# HG changeset patch # User Jeff Hammel # Date 1295021160 28800 # Node ID 34b1d30503fad7577e13c1f2210b92396965f3e4 # Parent 9ba5c3df5e30b2951d74529c632a99571c783501 add a stub for an hg poller; doesnt work yet diff -r 9ba5c3df5e30 -r 34b1d30503fa autobot/changes/__init__.py diff -r 9ba5c3df5e30 -r 34b1d30503fa autobot/changes/poller.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/autobot/changes/poller.py Fri Jan 14 08:06:00 2011 -0800 @@ -0,0 +1,335 @@ +# This file is part of Buildbot. Buildbot is free software: you can +# redistribute it and/or modify it under the terms of the GNU General Public +# License as published by the Free Software Foundation, version 2. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 51 +# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Copyright Buildbot Team Members + +import time +import tempfile +import os +from twisted.python import log +from twisted.internet import defer, utils + +from buildbot.util import deferredLocked +from buildbot.changes import base + +class Poller(base.PollingChangeSource): + """This source will poll a remote git repo for changes and submit + them to the change master.""" + + compare_attrs = ["repourl", "branch", "workdir", + "pollInterval", "binary", "usetimestamps", + "category", "project"] + + def __init__(self, repourl, binary=None, branch=None, + workdir=None, pollInterval=10*60, + binary='git', usetimestamps=True, + category=None, project=None, + pollinterval=-2): + # for backward compatibility; the parameter used to be spelled with 'i' + if pollinterval != -2: + pollInterval = pollinterval + if project is None: project = '' + + self.repourl = repourl + self.branch = branch + self.pollInterval = pollInterval + self.lastChange = time.time() + self.lastPoll = time.time() + self.binary = binary + self.workdir = workdir + self.usetimestamps = usetimestamps + self.category = category + self.project = project + self.changeCount = 0 + self.commitInfo = {} + self.initLock = defer.DeferredLock() + + if self.workdir == None: + self.workdir = tempfile.gettempdir() + + self.name = self.__class__.__name__ + + def startService(self): + # initialize the repository we'll use to get changes; note that + # startService is not an event-driven method, so this method will + # instead acquire self.initLock immediately when it is called. + if not self.isInitialized(): + d = self.initRepository() + d.addErrback(log.err, 'while initializing %s repository' % self.name) + else: + log.msg("%s repository already exists" % self.name) + + # call this *after* initRepository, so that the initLock is locked first + base.PollingChangeSource.startService(self) + + @deferredLocked('initLock') + def initRepository(self): + d = defer.succeed(None) + def make_dir(_): + dirpath = os.path.dirname(self.workdir.rstrip(os.sep)) + if not os.path.exists(dirpath): + log.msg('%s: creating parent directories for workdir' % self.name) + os.makedirs(dirpath) + d.addCallback(make_dir) + + def processCommand(command): + d = utils.getProcessOutputAndValue(self.gitbin, + command, env=dict(PATH=os.environ['PATH'])) + d.addCallback(self._convert_nonzero_to_failure) + d.addErrback(self._stop_on_failure) + return d + + for command in self.initializationCommands(): + d.addCallback(lambda _: processCommand(command[:])) + + def log_finished(_): + log.msg("%s: finished initializing working dir from %s" % (self.name, self.repourl) + d.addCallback(log_finished) + return d + + def describe(self): + status = "" + if not self.master: + status = "[STOPPED - check log]" + str = '%s watching the remote git repository %s, branch: %s %s' \ + % (self.name, self.repourl, self.branch, status) + return str + + @deferredLocked('initLock') + def poll(self): + d = self._get_changes() + d.addCallback(self._process_changes) + d.addErrback(self._process_changes_failure) + d.addCallback(self._catch_up) + d.addErrback(self._catch_up_failure) + return d + + def _get_commit_comments(self, rev): + args = ['log', rev, '--no-walk', r'--format=%s%n%b'] + d = utils.getProcessOutput(self.gitbin, args, path=self.workdir, env=dict(PATH=os.environ['PATH']), errortoo=False ) + def process(git_output): + stripped_output = git_output.strip() + if len(stripped_output) == 0: + raise EnvironmentError('could not get commit comment for rev') + return stripped_output + d.addCallback(process) + return d + + def _get_commit_timestamp(self, rev): + # unix timestamp + args = ['log', rev, '--no-walk', r'--format=%ct'] + d = utils.getProcessOutput(self.gitbin, args, path=self.workdir, env=dict(PATH=os.environ['PATH']), errortoo=False ) + def process(git_output): + stripped_output = git_output.strip() + if self.usetimestamps: + try: + stamp = float(stripped_output) + except Exception, e: + log.msg('gitpoller: caught exception converting output \'%s\' to timestamp' % stripped_output) + raise e + return stamp + else: + return None + d.addCallback(process) + return d + + def _get_commit_files(self, rev): + args = ['log', rev, '--name-only', '--no-walk', r'--format=%n'] + d = utils.getProcessOutput(self.gitbin, args, path=self.workdir, env=dict(PATH=os.environ['PATH']), errortoo=False ) + def process(git_output): + fileList = git_output.split() + return fileList + d.addCallback(process) + return d + + def _get_commit_name(self, rev): + args = ['log', rev, '--no-walk', r'--format=%aE'] + d = utils.getProcessOutput(self.gitbin, args, path=self.workdir, env=dict(PATH=os.environ['PATH']), errortoo=False ) + def process(git_output): + stripped_output = git_output.strip() + if len(stripped_output) == 0: + raise EnvironmentError('could not get commit name for rev') + return stripped_output + d.addCallback(process) + return d + + @defer.defferedGenerator + def _get_changes(self): + log.msg('%s: polling repo at %s' % (self.name, self.repourl)) + + self.lastPoll = time.time() + + # get the hash before updating + d = self._hash() + wfd = defer.waitForDeferred(d) + yield wfd + results = wfd.getResult() + self.preHash = results.strip() + + # get a deferred object that performs the fetch + yield self._fetch() + + # get the hash after updating + d = self._hash() + wfd = defer.waitForDeferred(d) + yield wfd + results = wfd.getResult() + self.postHash = results.strip() + + + @defer.deferredGenerator + def _process_changes(self, unused_output): + # get the change list + revListArgs = ['log', 'HEAD..FETCH_HEAD', r'--format=%H'] + self.changeCount = 0 + d = utils.getProcessOutput(self.gitbin, revListArgs, path=self.workdir, + env=dict(PATH=os.environ['PATH']), errortoo=False ) + wfd = defer.waitForDeferred(d) + yield wfd + results = wfd.getResult() + + # process oldest change first + revList = results.split() + if not revList: + return + + revList.reverse() + self.changeCount = len(revList) + + log.msg('%s: processing %d changes: %s in "%s"' + % (self.name, self.changeCount, revList, self.workdir) ) + + for rev in revList: + dl = defer.DeferredList([ + self._get_commit_timestamp(rev), + self._get_commit_name(rev), + self._get_commit_files(rev), + self._get_commit_comments(rev), + ], consumeErrors=True) + + wfd = defer.waitForDeferred(dl) + yield wfd + results = wfd.getResult() + + # check for failures + failures = [ r[1] for r in results if not r[0] ] + if failures: + # just fail on the first error; they're probably all related! + raise failures[0] + + timestamp, name, files, comments = [ r[1] for r in results ] + d = self.master.addChange( + who=name, + revision=rev, + files=files, + comments=comments, + when=timestamp, + branch=self.branch, + category=self.category, + project=self.project, + repository=self.repourl) + wfd = defer.waitForDeferred(d) + yield wfd + results = wfd.getResult() + + def _process_changes_failure(self, f): + log.msg('%s: repo poll failed' % self.name) + log.err(f) + # eat the failure to continue along the defered chain - we still want to catch up + return None + + def _catch_up(self, res): + if self.changeCount == 0: + log.msg('gitpoller: no changes, no catch_up') + return + log.msg('gitpoller: catching up to FETCH_HEAD') + args = ['reset', '--hard', 'FETCH_HEAD'] + d = utils.getProcessOutputAndValue(self.gitbin, args, path=self.workdir, env=dict(PATH=os.environ['PATH'])) + d.addCallback(self._convert_nonzero_to_failure) + return d + + def _catch_up_failure(self, f): + log.err(f) + log.msg('%s: please resolve issues in local repo: %s' % (self.name, self.workdir)) + # this used to stop the service, but this is (a) unfriendly to tests and (b) + # likely to leave the error message lost in a sea of other log messages + + def _convert_nonzero_to_failure(self, res): + "utility method to handle the result of getProcessOutputAndValue" + (stdout, stderr, code) = res + if code != 0: + raise EnvironmentError('command failed with exit code %d: %s' % (code, stderr)) + + def _stop_on_failure(self, f): + "utility method to stop the service when a failure occurs" + if self.running: + d = defer.maybeDeferred(lambda : self.stopService()) + d.addErrback(log.err, 'while stopping broken %s service' % self.name) + return f + +class HGPoller(Poller): + def __init__(self, repourl, binary='hg', branch='default', **kwargs): + Poller.__init__(self, repourl, binary=binary, branch=branch, **kwargs) + + def isInitialized(self): + return os.path.exists(os.path.join(self.workdir), '.hg') + + def itializationCommands(self): + return [ [ 'clone', self.repourl ] ] + + def _fetch(self): + + # get a deferred object that performs the fetch + args = ['pull'] + # This command always produces data on stderr, but we actually do not care + # about the stderr or stdout from this command. We set errortoo=True to + # avoid an errback from the deferred. The callback which will be added to this + # deferred will not use the response. + d = utils.getProcessOutput(self.gitbin, args, path=self.workdir, env=dict(PATH=os.environ['PATH']), errortoo=True ) + + return d + + @defer.deferredGenerator + def _process_changes(self, unused_ouput): + self.changeCount = 0 + if self.preHash == self.postHash: + return + range = '%s:%s' % (preHash, postHash) + d = utils.getProcessOutput(self.binary, ['log', '-r', range, '--template', '{node}\\n'], + path=self.workdir, + env=dict(PATH=os.environ['PATH']), errortoo=False ) + wfd = defer.waitForDeferred(d) + yield wfd + results = wfd.getResult() + + # process oldest change first + revList = results.split() + if not revList: + return + + revList.reverse() + self.changeCount = len(revList) + + log.msg('%s: processing %d changes: %s in "%s"' + % (self.name, self.changeCount, revList, self.workdir) ) + + for rev in revList: pass + + @defer.deferredGenerator + def _hash(self) + d = utils.getProcessOutput(self.binary, ['tip', '--template', '{node}\\n'], + path=self.workdir, + env=dict(PATH=os.environ['PATH']), errortoo=False ) + return d + + diff -r 9ba5c3df5e30 -r 34b1d30503fa autobot/process/factory.py --- a/autobot/process/factory.py Thu Jan 13 11:20:57 2011 -0800 +++ b/autobot/process/factory.py Fri Jan 14 08:06:00 2011 -0800 @@ -16,7 +16,6 @@ command = 'if %s; else false; fi' % '; elif '.join(args) return ['bash', '-c', command] - class VirtualenvFactory(BuildFactory): """ create a virtualenv and install some python packages in it diff -r 9ba5c3df5e30 -r 34b1d30503fa autobot/template/master/master.cfg --- a/autobot/template/master/master.cfg Thu Jan 13 11:20:57 2011 -0800 +++ b/autobot/template/master/master.cfg Fri Jan 14 08:06:00 2011 -0800 @@ -15,7 +15,7 @@ ####### CHANGESOURCES from buildbot.changes.pb import PBChangeSource -c['change_source'] = PBChangeSource() +c['change_source'] = [PBChangeSource()] ####### BUILDERS