view autobot/changes/poller.py @ 245:5f286f64ce6e

Convert to a datetime; I guess you have to do that now.
author Jeff Hammel <jhammel@mozilla.com>
date Thu, 22 Dec 2011 15:46:05 -0800
parents 714a7a7f4ea7
children 0927a07dfab3
line wrap: on
line source

# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members

import os
import time
import tempfile
from datetime import datetime
from twisted.python import log
from twisted.internet import defer, utils

from buildbot.changes import base, changes

class Poller(base.PollingChangeSource):
    """
    This will poll a remote resource for changes and submit
    them to the change master.

    This base class holds the VCS-agnostic polling pipeline. Subclasses
    provide isInitialized(), initializationCommands(), _fetch(), _hash(),
    _change_list(), _catch_up() and the _get_commit_timestamp/_name/_files/
    _comments getters, which fill in self.commitInfo for each revision.
    """

    src = ''

    compare_attrs = ["repourl", "branch", "workdir",
                     "pollInterval", "binary", "usetimestamps",
                     "category", "project"]

    def __init__(self, repourl, binary=None, branch=None,
                 workdir=None, pollInterval=10*60,
                 usetimestamps=True,
                 category=None, project=None,
                 pollinterval=-2):
        """
        repourl: remote repository to poll
        binary: VCS executable to run
        branch: branch to follow
        workdir: local working copy; a new temporary directory if not given
        pollInterval: seconds between polls ('pollinterval' is the old
                      spelling and, when given, takes precedence)
        usetimestamps: stored for the subclasses' timestamp getters
        category, project: passed through with every submitted change
        """

        # for backward compatibility; the parameter used to be spelled with 'i'
        if pollinterval != -2:
            pollInterval = pollinterval

        if project is None:
            project = ''
        self.repourl = repourl
        self.branch = branch
        self.pollInterval = pollInterval
        self.lastChange = time.time()
        self.lastPoll = time.time()
        self.binary = binary
        self.workdir = workdir
        self.usetimestamps = usetimestamps
        self.category = category
        self.project = project
        self.changeCount = 0
        self.commitInfo = {}
        # repository hashes before/after the latest fetch (see _get_changes).
        # Defined up front so a poll that fails before they are set cannot
        # raise AttributeError when they are logged (e.g. in _catch_up).
        self.preHash = None
        self.postHash = None
        self.initLock = defer.DeferredLock()

        if not self.workdir:
            self.workdir = tempfile.mkdtemp()

        self.name = '%s: %s#%s' % (self.__class__.__name__, repourl, branch)

    # NOTE: startService is deliberately not overridden. Initializing the
    # repository there caused race conditions on startup, so _get_changes
    # initializes the repository on demand instead, whenever
    # isInitialized() reports that it does not exist yet.

    def initRepository(self, _):
        """
        Create the workdir's parent directory if needed, then run each of
        initializationCommands() in turn.

        Used as a Deferred callback; the incoming result is ignored.
        Returns a Deferred that fires after the last command has finished.
        Non-zero exit codes are logged but do not abort the chain.
        """

        d = defer.succeed(None)

        # make the parent directory, if necessary
        def make_dir(_):
            dirpath = os.path.dirname(self.workdir.rstrip(os.sep))
            # dirpath is '' for a bare relative workdir: that means the
            # current directory, and os.makedirs('') would raise
            if dirpath and not os.path.exists(dirpath):
                log.msg('%s: creating parent directories for workdir' % self.name)
                os.makedirs(dirpath)
        d.addCallback(make_dir)

        def checkExitCode(res, command):
            # res is the (stdout, stderr, exitcode) tuple from
            # getProcessOutputAndValue
            stdout, stderr, code = res
            if code != 0:
                self.log('initialization command "%s %s" exited with code %d: %s'
                         % (self.binary, ' '.join(command), code, stderr.strip()))
            return res

        # perform the initialization
        def processCommand(_, command):
            self.log('%s %s' % (self.binary, ' '.join(command)))
            pd = utils.getProcessOutputAndValue(self.binary,
                    command, env=dict(PATH=os.environ['PATH']))
            pd.addCallback(checkExitCode, command)
            return pd

        for command in self.initializationCommands():
            # Hand the command to the callback as an argument. A lambda
            # closing over the loop variable would read `command` only when
            # it finally runs, and by then the loop would have moved on to a
            # later command.
            d.addCallback(processCommand, list(command))

        # finish up
        def log_finished(_):
            log.msg("%s: finished initializing working dir %s from %s" % (self.name, self.workdir, self.repourl))
        d.addCallback(log_finished)
        return d

    def log(self, msg):
        """log msg, prefixed with this poller's name"""
        log.msg('%s: %s' % (self.name, msg))

    def describe(self):
        """one-line status description; flags the poller as stopped when it
        has no parent"""
        status = ""
        if not self.parent:
            status = "[STOPPED - check log]"
        return '%s watching the remote repository %s, branch: %s %s' \
               % (self.name, self.repourl, self.branch, status)

    def poll(self):
        """
        poll for new changes

        Fetch, submit any new revisions to the master, then let the subclass
        catch its working copy up. Failures at either stage are logged by
        the errbacks and not propagated.
        """
        # Start every poll from zero. If fetching fails, _process_changes is
        # skipped, and _catch_up must not act on the previous poll's count.
        self.changeCount = 0
        d = self._get_changes()
        d.addCallback(self._process_changes)
        d.addErrback(self._process_changes_failure)
        d.addCallback(self._catch_up)
        d.addErrback(self._catch_up_failure)
        return d

    ### functions relating to hashing

    def _setPreHash(self, _hash):
        """record the repository hash taken before fetching"""
        self.preHash = _hash.strip()
        self.log("preHash is %s" % self.preHash)

    def _setPostHash(self, _hash):
        """record the repository hash taken after fetching"""
        self.postHash = _hash.strip()
        self.log("postHash is %s" % self.postHash)

    def _get_changes(self):
        """
        Initialize the repository if needed, then fetch, recording the hash
        before (preHash) and after (postHash) the fetch.
        """

        self.lastPoll = time.time()
        self.log('polling repo at %s : %s' % (self.repourl, self.lastPoll))

        d = defer.succeed(None)
        # ensure the repository is initialized
        if not self.isInitialized():
            self.log('Initializing new repository at %s' % self.workdir)
            d.addCallback(self.initRepository)

        # get the hash before updating
        d.addCallback(self._hash)
        d.addCallback(self._setPreHash)

        # update
        d.addCallback(self._fetch)

        # set the post hash
        d.addCallback(self._hash)
        d.addCallback(self._setPostHash)

        return d

    ### functions related to processing changes

    def _process_changes(self, _):
        """processes the changes between the preHash and the postHash"""

        d = defer.succeed(None)

        # nothing to do if the fetch did not move the repository
        self.changeCount = 0
        if self.preHash == self.postHash:
            return d

        d.addCallback(self._change_list)
        d.addCallback(self._process_change_list)
        return d

    def _process_change_list(self, revList):
        """submit each revision in revList, one after another, in list order"""

        self.changeCount = len(revList)
        self.log('processing %d changes: %s in "%s"'
                 % (self.changeCount, revList, self.workdir) )

        # get metadata for changes and send them to master
        d = defer.succeed(None)
        for rev in revList:
            d.addCallback(self._process_change, rev)
        return d

    def _process_change(self, something, rev):
        """gather the metadata for rev into self.commitInfo and submit it"""
        self.commitInfo = {}
        self.log('processing change %s (something=%s)' % (rev, something))
        d = defer.succeed(None)
        d.addCallback(self._get_commit_timestamp, rev)
        d.addCallback(self._get_commit_name, rev)
        d.addCallback(self._get_commit_files, rev)
        d.addCallback(self._get_commit_comments, rev)
        d.addCallback(self._add_change, rev)
        return d

    def _add_change(self, _, rev):
        """
        Submit revision rev, described by self.commitInfo, to the master.

        Returns a Deferred that fires with None once the master has accepted
        the change. Chaining on it keeps submissions ordered and lets
        submission errors reach the poll's errback.
        """
        log.msg("_add_change results: %s" % self.commitInfo)

        # Convert the timestamp into a datetime object (required in buildbot
        # 0.8.5). The timestamps are epoch seconds, so build a UTC datetime:
        # fromtimestamp() would give local wall-clock time, which buildbot
        # converts back to an epoch as though it were UTC.
        timestamp = self.commitInfo['timestamp']
        if isinstance(timestamp, (int, float)):
            timestamp = datetime.utcfromtimestamp(timestamp)

        self.log('timestamp: %s' % timestamp)

        log.msg("parent: %s, %s" % (self.parent, getattr(self.parent, 'module', None)))
        # send the change to the master; maybeDeferred copes with addChange
        # returning either a Deferred or a plain value
        d = defer.maybeDeferred(self.master.addChange,
                                author=self.commitInfo['name'],
                                revision=rev,
                                files=self.commitInfo['files'],
                                comments=self.commitInfo['comments'],
                                when_timestamp=timestamp,
                                branch=self.branch,
                                category=self.category,
                                project=self.project,
                                repository=self.repourl,
                                src=self.src)

        def recordChange(_):
            self.lastChange = self.lastPoll
        d.addCallback(recordChange)
        return d


    def _process_changes_failure(self, f):
        """log a failed poll and swallow the failure"""
        log.msg('%s: repo poll failed' % self.name)
        log.err(f)
        # eat the failure to continue along the deferred chain - we still want to catch up
        return None

    def _catch_up_failure(self, f):
        """log a failed catch-up; the service keeps running"""
        log.err(f)
        log.msg('%s: please resolve issues in local repo: %s' % (self.name, self.workdir))
        # this used to stop the service, but this is (a) unfriendly to tests and (b)
        # likely to leave the error message lost in a sea of other log messages

    def _convert_nonzero_to_failure(self, res):
        "utility method to handle the result of getProcessOutputAndValue"
        (stdout, stderr, code) = res
        if code != 0:
            raise EnvironmentError('command failed with exit code %d: %s' % (code, stderr))

    def _stop_on_failure(self, f):
        "utility method to stop the service when a failure occurs"
        if self.running:
            d = defer.maybeDeferred(self.stopService)
            d.addErrback(log.err, 'while stopping broken %s service' % self.name)
        return f


###

class HgPoller(Poller):
    """poller for a mercurial source"""

    src = 'hg'

    def __init__(self, repourl, binary='hg', branch='default', **kwargs):
        Poller.__init__(self, repourl, binary=binary, branch=branch, **kwargs)

    def _getOutput(self, args, errortoo=False):
        """
        Run `self.binary args` in the working directory, with only PATH in
        the environment, and return the Deferred from getProcessOutput.
        With errortoo=False, any output on stderr makes the Deferred fail.
        """
        return utils.getProcessOutput(self.binary, args,
                                      path=self.workdir,
                                      env=dict(PATH=os.environ['PATH']),
                                      errortoo=errortoo)

    def isInitialized(self):
        """is the repository initialized?"""
        return os.path.exists(os.path.join(self.workdir, '.hg'))

    def checkoutBranch(self, _):
        """check out self.branch; stderr is folded into the output so that
        hg's messages there do not fail the Deferred"""
        return self._getOutput(['checkout', self.branch], errortoo=True)

    def initializationCommands(self):
        """commands to initialize a mercurial repository: clone, then check
        out the branch when it is not 'default'"""
        commands = [['clone', self.repourl, self.workdir]]
        if self.branch != 'default':
            # -R/--repository selects the repository to act on. The previous
            # lowercase '-r' is not a global hg option, so hg rejected it.
            commands.append(['-R', self.workdir, 'checkout', self.branch])
        return commands

    def update(self, _):
        """pull new changesets from repourl (the working copy is not updated)"""
        return self._getOutput(['pull', self.repourl], errortoo=True)

    def _fetch(self, _):
        """check out the branch, then pull"""
        d = self.checkoutBranch(None)
        d.addCallback(self.update)
        return d

    def _hash(self, _):
        """node id of the repository tip"""
        return self._getOutput(['tip', '--template', '{node}\\n'])

    def _change_list(self, _):
        """
        Return a Deferred firing with the list of node ids in
        preHash:postHash, in the order `hg log` prints them. preHash itself
        is excluded because an earlier poll already handled it.
        """
        revrange = '%s:%s' % (self.preHash, self.postHash)
        preHash = self.preHash
        d = self._getOutput(['log', '-r', revrange, '--template', '{node}\\n'])

        def split_changes(raw_changes):
            # the range is inclusive, so it also lists preHash
            return [node for node in raw_changes.split() if node != preHash]
        d.addCallback(split_changes)

        return d

    def _catch_up(self, rev):
        """run `hg update` in the working copy if this poll found changes"""
        if self.changeCount == 0:
            self.log('%s: no changes, no catch_up' % self.postHash)
            return
        self.log('catching up to %s' % self.postHash)
        d = utils.getProcessOutputAndValue(self.binary, ['update'],
                                           path=self.workdir,
                                           env=dict(PATH=os.environ['PATH']))
        d.addCallback(self._convert_nonzero_to_failure)
        return d

    ### functions for retrieving various metadatas

    ### timestamp

    def _get_commit_timestamp(self, _, rev):
        """fetch the commit time of rev via the {date|hgdate} template"""
        d = self._getOutput(['log', '-r', rev, '--template', '{date|hgdate}'])
        d.addCallback(self._get_commit_timestamp_from_output)
        return d

    def _get_commit_timestamp_from_output(self, output):
        """
        Parse "<unixtime> <tzoffset>" output into commitInfo['timestamp'] as
        a float, or store None when usetimestamps is off. Returns the stored
        value. Raises ValueError if the output does not have that shape.
        """
        stripped_output = output.strip()
        if self.usetimestamps:
            try:
                _stamp, _offset = stripped_output.split()
                stamp = float(_stamp)
            except ValueError:
                log.msg('%s: caught exception converting output "%s" to timestamp' % (self.name, stripped_output))
                raise
            self.commitInfo['timestamp'] = stamp
        else:
            self.commitInfo['timestamp'] = None
        return self.commitInfo['timestamp'] # for tests

    ### commit author ('name')

    def _get_commit_name(self, _, rev):
        """get the author of a commit"""
        d = self._getOutput(['log', '-r', rev, '--template', '{author}'])
        d.addCallback(self._get_commit_name_from_output)
        return d

    def _get_commit_name_from_output(self, output):
        """store the stripped author in commitInfo['name']; an empty author
        raises EnvironmentError"""
        stripped_output = output.strip()
        if not stripped_output:
            raise EnvironmentError('could not get commit name for rev')
        self.commitInfo['name'] = stripped_output
        return self.commitInfo['name'] # for tests, or so gitpoller says

    ### files

    def _get_commit_files(self, _, rev):
        """get the files associated with a commit"""
        d = self._getOutput(['log', '-r', rev, '--template', '{files}'])
        d.addCallback(self._get_commit_files_from_output)
        return d

    def _get_commit_files_from_output(self, output):
        """store the whitespace-separated file names in commitInfo['files']
        (NOTE: names containing spaces are split apart)"""
        self.commitInfo['files'] = output.strip().split()
        return self.commitInfo['files']

    ### commit comments

    def _get_commit_comments(self, _, rev):
        """get the commit message"""
        d = self._getOutput(['log', '-r', rev, '--template', '{desc}'])
        d.addCallback(self._get_commit_comments_from_output)
        return d

    def _get_commit_comments_from_output(self, output):
        """store the stripped commit message in commitInfo['comments']"""
        self.commitInfo['comments'] = output.strip()
        return self.commitInfo['comments']


###

class GitPoller(Poller):
    """poller for a git source"""

    src = 'git'

    def __init__(self, repourl, binary='git', branch='master', **kwargs):
        Poller.__init__(self, repourl, binary=binary, branch=branch, **kwargs)

    def _getOutput(self, args, errortoo=False):
        """
        Run `self.binary args` in the working directory, with only PATH in
        the environment, and return the Deferred from getProcessOutput.
        With errortoo=False, any output on stderr makes the Deferred fail.
        """
        return utils.getProcessOutput(self.binary, args,
                                      path=self.workdir,
                                      env=dict(PATH=os.environ['PATH']),
                                      errortoo=errortoo)

    def isInitialized(self):
        """is the repository initialized?"""
        return os.path.exists(os.path.join(self.workdir, '.git'))

    def checkoutBranch(self, _):
        """
        Check out self.branch before every fetch, so that the following
        `git pull` merges into the intended branch. stderr is folded into the
        output because git writes informational messages there, which would
        otherwise fail the Deferred.
        """
        return self._getOutput(['checkout', self.branch], errortoo=True)

    def initializationCommands(self):
        """commands needed to initialize the repository: clone, then check
        out the branch when it is not 'master'"""
        commands = [['clone', self.repourl, self.workdir]]
        if self.branch != 'master':
            work_tree = self.workdir.rstrip('/')
            commands.append(['--git-dir', os.path.join(work_tree, '.git'),
                             '--work-tree', work_tree,
                             'checkout', self.branch])
        log.msg('GitPoller: initializationCommands: %s' % commands)
        return commands

    def update(self, _):
        """pull self.branch from origin into the working copy"""
        return self._getOutput(['pull', 'origin', self.branch], errortoo=True)

    def _fetch(self, _):
        """check out the branch, then pull"""
        d = self.checkoutBranch(None)
        d.addCallback(self.update)
        return d

    def _hash(self, _):
        """
        get hash of where you are now:
        git rev-parse HEAD
        """
        return self._getOutput(['rev-parse', 'HEAD'])

    def _change_list(self, _):
        """
        Return a Deferred firing with the commits in preHash..postHash,
        oldest first, so they are submitted in the order they were made.
        """
        revrange = '%s..%s' % (self.preHash, self.postHash)
        d = self._getOutput(['log', revrange, r'--format=%H'])

        def split_changes(raw_changes):
            # `git log` prints newest first; reverse to chronological order
            revs = raw_changes.split()
            revs.reverse()
            return revs
        d.addCallback(split_changes)
        return d

    def _catch_up(self, rev):
        """placeholder catch-up step; `update` has already pulled"""
        if self.changeCount == 0:
            self.log('%s: no changes, no catch_up' % self.postHash)
            return
        self.log('catching up to %s' % self.postHash)

        # TODO: this is a dummy for now (it only runs `git status`). It
        # should really do a fetch earlier and do the pull here.
        d = utils.getProcessOutputAndValue(self.binary, ['status'],
                                           path=self.workdir,
                                           env=dict(PATH=os.environ['PATH']))
        d.addCallback(self._convert_nonzero_to_failure)
        return d

    ### metadata for commits

    ### timestamp

    def _get_commit_timestamp(self, _, rev):
        """fetch the committer unix timestamp (%ct) of rev"""
        d = self._getOutput(['log', rev, '--no-walk', r'--format=%ct'])
        d.addCallback(self._get_commit_timestamp_from_output)
        return d

    def _get_commit_timestamp_from_output(self, output):
        """
        Store the timestamp in commitInfo['timestamp'] as a float, or None
        when usetimestamps is off, and return it. Raises ValueError when the
        output is not a number.
        """
        stripped_output = output.strip()
        if self.usetimestamps:
            try:
                stamp = float(stripped_output)
            except ValueError:
                self.log('caught exception converting output \'%s\' to timestamp' % stripped_output)
                raise
            self.commitInfo['timestamp'] = stamp
        else:
            self.commitInfo['timestamp'] = None
        return self.commitInfo['timestamp'] # for tests

    ### commit author ('name')

    def _get_commit_name(self, _, rev):
        """fetch the author e-mail (%aE) of rev"""
        d = self._getOutput(['log', rev, '--no-walk', r'--format=%aE'])
        d.addCallback(self._get_commit_name_from_output)
        return d

    def _get_commit_name_from_output(self, output):
        """store the stripped author in commitInfo['name']; an empty author
        raises EnvironmentError"""
        stripped_output = output.strip()
        if not stripped_output:
            raise EnvironmentError('could not get commit name for rev')
        self.commitInfo['name'] = stripped_output
        return self.commitInfo['name'] # for tests

    ### files

    def _get_commit_files(self, _, rev):
        """fetch the names of the files touched by rev"""
        d = self._getOutput(['log', rev, '--name-only', '--no-walk', r'--format=%n'])
        d.addCallback(self._get_commit_files_from_output)
        return d

    def _get_commit_files_from_output(self, output):
        """store the whitespace-separated file names in commitInfo['files']
        (NOTE: names containing spaces are split apart)"""
        self.commitInfo['files'] = output.split()
        return self.commitInfo['files'] # for tests

    ### comments

    def _get_commit_comments(self, _, rev):
        """fetch the subject and body of rev's commit message"""
        d = self._getOutput(['log', rev, '--no-walk', r'--format=%s%n%b'])
        d.addCallback(self._get_commit_comments_from_output)
        return d

    def _get_commit_comments_from_output(self, output):
        """store the stripped message in commitInfo['comments']; an empty
        message raises EnvironmentError"""
        stripped_output = output.strip()
        if not stripped_output:
            raise EnvironmentError('could not get commit comment for rev')
        self.commitInfo['comments'] = stripped_output
        return self.commitInfo['comments'] # for tests