view autobot/changes/poller.py @ 116:c6fbb0d981e9

pass missing pointless argument
author Jeff Hammel <jhammel@mozilla.com>
date Sat, 22 Jan 2011 18:12:54 -0800
parents 3f94c56f8f47
children afc245e4e55d
line wrap: on
line source

# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members

import os
import time
import tempfile
from twisted.python import log
from twisted.internet import defer, utils

#from buildbot.util import deferredLocked
from buildbot.changes import base

class Poller(base.PollingChangeSource):
    """
    Base class for pollers that watch a remote repository and submit
    changes to the change master.

    Subclasses supply the VCS-specific methods this class calls but does not
    define: isInitialized(), initializationCommands(), _fetch(), _hash(),
    _change_list() and _catch_up().
    """

    compare_attrs = ["repourl", "branch", "workdir",
                     "pollInterval", "binary", "usetimestamps",
                     "category", "project"]

    def __init__(self, repourl, binary=None, branch=None,
                 workdir=None, pollInterval=10*60,
                 usetimestamps=True,
                 category=None, project=None,
                 pollinterval=-2):
        """
        :param repourl: URL of the remote repository to poll
        :param binary: the VCS executable that commands are run with
        :param branch: branch name (shown by describe())
        :param workdir: local working directory; when not given, a fresh
            temporary directory is created with tempfile.mkdtemp()
        :param pollInterval: seconds between polls
        :param usetimestamps: stored as self.usetimestamps for subclasses
        :param category: stored as self.category
        :param project: stored as self.project ('' when None)
        :param pollinterval: deprecated lower-case spelling of pollInterval;
            the sentinel -2 means "not given"
        """
        # for backward compatibility; the parameter used to be spelled with 'i'
        if pollinterval != -2:
            pollInterval = pollinterval

        if project is None:
            project = ''
        self.repourl = repourl
        self.branch = branch
        self.pollInterval = pollInterval
        self.lastChange = time.time()
        self.lastPoll = time.time()
        self.binary = binary
        self.workdir = workdir
        self.usetimestamps = usetimestamps
        self.category = category
        self.project = project
        self.changeCount = 0
        self.commitInfo = {}
        self.initLock = defer.DeferredLock()
        # Filled in by _setPreHash/_setPostHash during a poll. Initialized here
        # so that a failure early in the very first poll does not leave them
        # undefined for _process_changes/_catch_up.
        self.preHash = None
        self.postHash = None

        if not self.workdir:
            self.workdir = tempfile.mkdtemp()

        self.name = self.__class__.__name__

    # NOTE: a disabled startService() override is kept below for reference;
    # repository initialization currently happens in _get_changes().
    #
    # def startService(self):
    #
    #     base.PollingChangeSource.startService(self)
    #
    #     # initialize the repository we'll use to get changes; note that
    #     # startService is not an event-driven method, so this method will
    #     # instead acquire self.initLock immediately when it is called.
    #     if not self.isInitialized():
    #         log.msg('Initializing new repository')
    #         # d = self.initRepository()
    #         # d.addErrback(log.err, 'while initializing %s repository at %s' % (self.name, self.workdir))
    #     else:
    #         log.msg("%s repository already exists" % self.name)
    #
    #     log.msg("%s startService with repository %s" % (self.name, self.workdir))
    #
    #     # call this *after* initRepository, so that the initLock is locked first

    def initRepository(self, _):
        """
        Create the local working copy by running initializationCommands().

        The parent directory of workdir is created first if it is missing.
        Each command (a list of arguments for self.binary) then runs in
        order. A non-zero exit status fails the returned Deferred and stops
        the service if it is running.

        :param _: ignored; lets this method be used directly as a callback
        :returns: Deferred that fires once initialization has finished
        """
        d = defer.succeed(None)

        # make the parent directory, if necessary
        def make_dir(_):
            dirpath = os.path.dirname(self.workdir.rstrip(os.sep))
            # dirname() is '' for a bare relative workdir: nothing to create
            if dirpath and not os.path.exists(dirpath):
                log.msg('%s: creating parent directories for workdir' % self.name)
                os.makedirs(dirpath)
        d.addCallback(make_dir)

        # perform the initialization
        def processCommand(command):
            d = utils.getProcessOutputAndValue(self.binary,
                    command, env=dict(PATH=os.environ['PATH']))
            d.addCallback(self._convert_nonzero_to_failure)
            d.addErrback(self._stop_on_failure)
            return d

        for command in self.initializationCommands():
            # Bind the current command now. Once an earlier command's Deferred
            # is pending, later callbacks run after this loop has finished, and
            # a plain closure would then see only the last command.
            d.addCallback(lambda _, command=command: processCommand(command[:]))

        # finish up
        def log_finished(_):
            log.msg("%s: finished initializing working dir %s from %s" % (self.name, self.workdir, self.repourl))
        d.addCallback(log_finished)
        return d

    def describe(self):
        """Return a one-line human-readable description of this poller."""
        status = ""
        if not self.master:
            status = "[STOPPED - check log]"
        desc = '%s watching the remote repository %s, branch: %s %s' \
                % (self.name, self.repourl, self.branch, status)
        return desc

    def poll(self):
        """
        Poll for new changes.

        Fetches, processes any new revisions, then catches the working copy
        up. A failure while fetching/processing is logged and swallowed so
        that the catch-up step still runs.
        """
        d = self._get_changes()
        d.addCallback(self._process_changes)
        d.addErrback(self._process_changes_failure)
        d.addCallback(self._catch_up)
        d.addErrback(self._catch_up_failure)
        return d

    ### functions relating to hashing

    def _setPreHash(self, _hash):
        """Record the (stripped) repository hash seen before fetching."""
        self.preHash = _hash.strip()
        log.msg("The secret key is %s" % self.preHash)

    def _setPostHash(self, _hash):
        """Record the (stripped) repository hash seen after fetching."""
        self.postHash = _hash.strip()
        log.msg("The *NEW* secret key is %s" % self.postHash)

    def _get_changes(self):
        """
        Fetch from the remote, recording the hash before (self.preHash) and
        after (self.postHash) the fetch.

        The working copy is initialized first when isInitialized() reports
        that it does not exist yet.

        :returns: Deferred that fires once postHash has been recorded
        """
        self.lastPoll = time.time()
        log.msg('%s: polling repo at %s : %s' % (self.name, self.repourl, self.lastPoll))

        d = defer.succeed(None)
        # ensure the repository is initialized
        if not self.isInitialized():
            log.msg('Initializing new repository')
            d.addCallback(self.initRepository)

        # get the hash before updating
        log.msg("About to get prehash")
        d.addCallback(self._hash)
        d.addCallback(self._setPreHash)

        # update
        d.addCallback(self._fetch)

        # set the post hash
        d.addCallback(self._hash)
        d.addCallback(self._setPostHash)

        return d

    ### functions related to processing changes

    def _process_changes(self, _):
        """
        Process the revisions between preHash and postHash, if any.

        Resets self.changeCount to 0; when the hashes match there is nothing
        new and an already-fired Deferred is returned.
        """
        d = defer.succeed(None)

        self.changeCount = 0
        if self.preHash == self.postHash:
            return d

        # get the change list, then process it
        d.addCallback(self._change_list)
        d.addCallback(self._process_change_list)
        return d

    def _process_change_list(self, revList):
        """
        Process each revision in revList, in list order.

        :param revList: list of revision identifiers from _change_list()
        :returns: Deferred that fires after every revision has been processed
        """
        log.msg('this is the changelist: %s' % revList)
        self.changeCount = len(revList)
        log.msg('%s: processing %d changes: %s in "%s"'
                % (self.name, self.changeCount, revList, self.workdir))

        # get metadata for changes and send them to master
        d = defer.succeed(None)
        for rev in revList:
            # addCallback passes the previous result as the first argument,
            # but _process_change takes only the revision: drop the result and
            # bind rev now (a bare closure would see only the last rev).
            d.addCallback(lambda _, rev=rev: self._process_change(rev))

        return d

    def _process_change(self, rev):
        """
        Process a single revision.

        Currently this only resets self.commitInfo and logs the revision; the
        metadata gathering sketched in the comments below is not wired up.

        :returns: an already-fired Deferred
        """
        self.commitInfo = {}
        log.msg('%s: processing change %s' % (self.name, rev))
        d = defer.succeed(None)
        return d
        # TODO: gather metadata, then add the change, e.g.:
        #     dl = defer.DeferredList([
        #         self._get_commit_timestamp(rev),
        #         self._get_commit_name(rev),
        #         self._get_commit_files(rev),
        #         self._get_commit_comments(rev),
        #         ], consumeErrors=True)
        #     dl.addCallback(self._add_change, rev)

    def _add_change(self, change):
        """Placeholder for submitting a change; currently only logs."""
        log.msg("_add_change results")

        # TODO: send the change to the master, e.g.:
        #     timestamp, name, files, comments = [ r[1] for r in results ]
        #     d = self.master.addChange(
        #            who=name,
        #            revision=rev,
        #            files=files,
        #            comments=comments,
        #            when=timestamp,
        #            branch=self.branch,
        #            category=self.category,
        #            project=self.project,
        #            repository=self.repourl)
        #     wfd = defer.waitForDeferred(d)
        #     yield wfd
        #     results = wfd.getResult()

    def _process_changes_failure(self, f):
        """Log a failed poll and swallow it so the catch-up step still runs."""
        log.msg('%s: repo poll failed' % self.name)
        log.err(f)
        # eat the failure to continue along the deferred chain - we still want to catch up
        return None

    def _catch_up_failure(self, f):
        """Log a failed catch-up; the failure is swallowed (returns None)."""
        log.err(f)
        log.msg('%s: please resolve issues in local repo: %s' % (self.name, self.workdir))
        # this used to stop the service, but this is (a) unfriendly to tests and (b)
        # likely to leave the error message lost in a sea of other log messages

    def _convert_nonzero_to_failure(self, res):
        """
        Utility method to handle the result of getProcessOutputAndValue.

        :param res: (stdout, stderr, exit code) tuple
        :raises EnvironmentError: when the exit code is non-zero
        """
        (stdout, stderr, code) = res
        if code != 0:
            raise EnvironmentError('command failed with exit code %d: %s' % (code, stderr))

    def _stop_on_failure(self, f):
        """Stop the service (if running) and pass the failure along."""
        if self.running:
            d = defer.maybeDeferred(self.stopService)
            d.addErrback(log.err, 'while stopping broken %s service' % self.name)
        return f


class HgPoller(Poller):
    """poller for a mercurial source"""

    def __init__(self, repourl, binary='hg', branch='default', **kwargs):
        Poller.__init__(self, repourl, binary=binary, branch=branch, **kwargs)

    def _run_hg(self, args, errortoo=False):
        """
        Run self.binary with args inside self.workdir.

        Only PATH is passed through from the current environment.

        :param args: list of command-line arguments
        :param errortoo: forwarded to utils.getProcessOutput; when False, any
            output on stderr makes the returned Deferred fail
        :returns: Deferred firing with the command's output
        """
        return utils.getProcessOutput(self.binary, args, path=self.workdir,
                                      env=dict(PATH=os.environ['PATH']),
                                      errortoo=errortoo)

    def isInitialized(self):
        """True when the working directory already contains a .hg directory."""
        return os.path.exists(os.path.join(self.workdir, '.hg'))

    def initializationCommands(self):
        """Commands that create the working copy: a clone of repourl."""
        return [ [ 'clone', self.repourl, self.workdir ] ]

    def _fetch(self, _):
        """Pull from repourl into the working copy."""
        # This command always produces data on stderr, but we actually do not care
        # about the stderr or stdout from this command. We set errortoo=True to
        # avoid an errback from the deferred. The callback which will be added to this
        # deferred will not use the response.
        return self._run_hg(['pull', self.repourl], errortoo=True)

    def _hash(self, _):
        """Return a Deferred firing with the node hash of the tip revision."""
        return self._run_hg(['tip', '--template', '{node}\\n'])

    def _change_list(self, _):
        """
        Return a Deferred firing with the list of new revision hashes after
        self.preHash up to and including self.postHash.

        The list is in the order `hg log -r A:B` prints them, ascending by
        local revision number, so the oldest new revision comes first.
        """
        revrange = '%s:%s' % (self.preHash, self.postHash)
        # NOTE(review): a revision-number range is not filtered by
        # self.branch, so revisions pulled on other branches are included too
        # -- confirm that this is intended.
        d = self._run_hg(['log', '-r', revrange, '--template', '{node}\\n'])

        def split_changes(raw_changes):
            # `hg log -r A:B` includes A itself, which is the tip that was
            # already present before this poll's pull; drop it.
            return [node for node in raw_changes.split()
                    if node != self.preHash]
        d.addCallback(split_changes)

        return d

    def _catch_up(self, rev):
        """Run `hg update` when the last poll found changes; else do nothing."""
        log.msg('%s: catching up to %s' % (self.name, self.postHash))
        if self.changeCount == 0:
            log.msg('%s: no changes, no catch_up' % self.name)
            return
        args = ['update']
        d = utils.getProcessOutputAndValue(self.binary, args, path=self.workdir, env=dict(PATH=os.environ['PATH']))
        d.addCallback(self._convert_nonzero_to_failure)
        return d

    ### functions for retrieving various metadatas

    ### timestamp

    def _get_commit_timestamp(self, rev):
        """Return a Deferred firing with the commit time of rev (see below)."""
        # '{date|hgdate}' renders as "<unix timestamp> <timezone offset>"
        d = self._run_hg(['log', '-r', rev, '--template', '{date|hgdate}'])
        d.addCallback(self._get_commit_timestamp_from_output)
        return d

    def _get_commit_timestamp_from_output(self, output):
        """
        Parse `{date|hgdate}` output into commitInfo['timestamp'].

        The timestamp is a float Unix time, or None when usetimestamps is off.

        :returns: the stored timestamp (for tests, like the other helpers)
        :raises ValueError: when the output is not "<timestamp> <offset>"
        """
        stripped_output = output.strip()
        if self.usetimestamps:
            try:
                _stamp, _offset = stripped_output.split()
                stamp = float(_stamp)
            except ValueError:
                log.msg('%s: caught exception converting output "%s" to timestamp' % (self.name, stripped_output))
                # bare raise keeps the original traceback
                raise
            self.commitInfo['timestamp'] = stamp
        else:
            self.commitInfo['timestamp'] = None
        return self.commitInfo['timestamp']

    ### commit author ('name')

    def _get_commit_name(self, rev):
        """get the author of a commit"""
        d = self._run_hg(['log', '-r', rev, '--template', '{author}'])
        d.addCallback(self._get_commit_name_from_output)
        return d

    def _get_commit_name_from_output(self, output):
        """Store the stripped author in commitInfo['name'] and return it."""
        stripped_output = output.strip()
        if len(stripped_output) == 0:
            raise EnvironmentError('could not get commit name for rev')
        self.commitInfo['name'] = stripped_output
        return self.commitInfo['name'] # for tests, or so gitpoller says

    ### files

    def _get_commit_files(self, rev):
        """get the files associated with a commit"""
        d = self._run_hg(['log', '-r', rev, '--template', '{files}'])
        d.addCallback(self._get_commit_files_from_output)
        return d

    def _get_commit_files_from_output(self, output):
        """Split the whitespace-separated file list into commitInfo['files']."""
        fileList = output.strip().split()
        self.commitInfo['files'] = fileList
        return self.commitInfo['files']

    ### comments

    def _get_commit_comments(self, rev):
        """get the commit message"""
        d = self._run_hg(['log', '-r', rev, '--template', '{desc}'])
        d.addCallback(self._get_commit_comments_from_output)
        return d

    def _get_commit_comments_from_output(self, output):
        """Store the stripped commit message in commitInfo['comments']."""
        stripped_output = output.strip()
        self.commitInfo['comments'] = stripped_output
        return self.commitInfo['comments']