Mercurial > hg > hq
view hq/main.py @ 25:e6b1a435f00e default tip
from TODO
author | Jeff Hammel <k0scist@gmail.com> |
---|---|
date | Sat, 25 Feb 2017 13:09:51 -0800 |
parents | 3593c85388a1 |
children |
line wrap: on
line source
#!/usr/bin/env python """ mercurial queue extension front-end """ import os import subprocess import sys from commandparser import CommandParser call = subprocess.check_output class HQ(object): """ mercurial queue extension front-end policy manager """ def __init__(self, network=True, root=None, binary='hg'): """initialize global options""" # TODO: look at hgrc file # for [defaults] repository_host # check for network self.network = network # repository root self.root = root or call(['hg', 'root']).strip() assert os.path.isdir(self.root), "'%s': not a directory!" # hg binary self.binary = binary # patch repo; not guaranteed to exit self._patch_repo = os.path.join(self.root, '.hg', 'patches') ### subcommands def clone(self, repo, patch=None, queue=None): """ clone the repository and begin a patch queue - patch: name of a new patch to initiate - queue: name of the remote queue """ ### TODO: initialize queue properly # hg qinit -c # cd `hg root`/ # rm -rf patches # hg clone ${queue} patches # cd patches # (fix hgrc) directory = repo.rsplit('/', 1) call(['hg', 'clone', repo, directory]) os.chdir(directory) call(['hg', 'qinit', '-c']) if queue: # pull from the given repository self._patch_command(*['hg', 'pull', '--update', queue]) else: # (optionally) setup a new repo pass # TODO if patch: # create a new patch call(['hg', 'qnew', patch]) def commit(self, message): """ commit a patch and push it to the master repository - message : commit message """ call(['hg', 'qrefresh']) call(['hg', 'qcommit', '-m', message]) if self.network: self._patch_command(*['hg', 'push', '--mq']) def pull(self, repo=None, mq=True): """ pull from the root repository if mq is true, update the patch queue, if versioned """ print "Pulling %s" % self.root # check for outstanding changes output = self._status(self.root) if output: print "Outstanding changes:" print output raise AssertionError applied, unapplied = self._series() self._call('qpop', '--all') self._call(*(['pull'] + (repo and [repo] or []))) if self._versioned(): # pull queue repo print "Pulling %s" % self.directory() if self.incoming(): print >> sys.stderr, "Incoming changes" output = self._status(self.directory()) if output: print >> sys.stderr, "Cannot pull %s; outstanding changes" % (self.directory()) print output else: self._call('pull', '--mq') for patch in applied: self._call('qpush') def goto(self, patch): """ go to a specific patch and apply it - patch: name of patch to go to """ # TODO process = subprocess.Popen(['hg', 'qapplied'], stdout=subprocess.PIPE) stdout, stderr = process.communicate() applied = [ i.strip() for i in stdout.splitlines() if i ] raise NotImplementedError def files(self): """ list the files added by the top patch """ # TODO: should only list top-level directories, otherwise it's silly process = subprocess.Popen("hg qdiff | grep '^+++ ' | sed 's/+++ b\///'", stdout=subprocess.PIPE, cwd=self._root) stdout, stderr = process.communicate() return stdout def status(self): """ display status """ return '\n'.join([self._call(i).strip() for i in ('root', 'status', 'qseries')]) st = status def directory(self): """patch queue directory""" if os.path.isdir(self._patch_repo): return self._patch_repo def incoming(self): """are there incoming changes to the patch queue""" if not self._versioned(): return False try: call([self.binary, 'incoming'], cwd=self.directory()) return True except subprocess.CalledProcessError: return False ### internals def _call(self, *args, **kwargs): command = [self.binary] + list(args) kwargs.setdefault('cwd', self.root) return call(command, **kwargs) def _patch_command(self, *command, **kwargs): """perform a command in the patch repository""" kwargs.setdefault('cwd', self.directory()) return call(command) def _versioned(self): """is the patch queue versioned?""" return os.path.isdir(os.path.join(self.directory(), '.hg')) def _series(self): """returns a 2-tuple of applied, unapplied""" lines = [line.strip() for line in self._call('qseries').strip().splitlines()] applied = [] unapplied = [] for line in lines: try: index, status, name = line.split() except: print line raise if status == 'A': applied.append(name) else: assert status == 'U' unapplied.append(name) return applied, unapplied def _status(self, repo): """get the status of a repo; if clean, return None, else return the output of hg st""" output = call([self.binary, 'st'], cwd=repo).strip() lines = [line for line in output.splitlines() if not line.startswith('?')] if lines: return output def main(args=sys.argv[1:]): parser = CommandParser(HQ) options, args = parser.parse_args(args) parser.invoke(args) if __name__ == '__main__': main()