Mercurial > hg > hq
view hq/main.py @ 15:6c4f258fae85
going forward with it
author | Jeff Hammel <jhammel@mozilla.com> |
---|---|
date | Fri, 17 May 2013 03:15:03 -0700 |
parents | 46a68cd554b5 |
children | b878f4ce93fc |
line wrap: on
line source
#!/usr/bin/env python """ mercurial queue extension front-end """ # TODO: migrate to http://k0s.org/hg/CommandParser/ import os import subprocess import sys from commandparser import CommandParser call = subprocess.check_output class HQ(object): """ mercurial queue extension front-end policy manager """ def __init__(self, network=True, root=None, binary='hg'): """initialize global options""" # TODO: look at hgrc file # for [defaults] repository_host # XXX ??? # check for network self.network = network # repository root self.root = root or call(['hg', 'root']).strip() assert os.path.isdir(self.root), "'%s': not a directory!" # hg binary self.binary = binary # patch repo; not guaranteed to exit self._patch_repo = os.path.join(self.root, '.hg', 'patches') ### subcommands def clone(self, repo, patch=None, queue=None): """ clone the repository and begin a patch queue - path: name of a new patch to initiate - queue: name of the remote queue """ directory = repo.rsplit('/', 1) call(['hg', 'clone', repo, directory]) os.chdir(directory) call(['hg', 'qinit', '-c']) if queue: # pull from the given repository self._patch_command(*['hg', 'pull', '--update', queue]) else: # (optionally) setup a new repo pass # TODO if patch: # create a new patch call(['hg', 'qnew', patch]) def commit(self, message): """ commit a patch and push it to the master repository - message : commit message """ call(['hg', 'qrefresh']) call(['hg', 'qcommit', '-m', message]) if self.network: self._patch_command(*['hg', 'push']) def pull(self, repo=None, mq=True): """ pull from the root repository if mq is true, update the patch queue, if versioned """ # check for outstanding changes output = self._call(['st']).strip() lines = [line for line in output.splitlines() if not line.startswith('?')] if lines: print "Outstanding changes:" print output raise AssertionError applied, unapplied = self._series() self._call(['qpop', '--all']) self._call(['pull'] + (repo and [repo] or [])) # TODO: pull queue repo for patch in applied: self._call(['qpush']) def goto(self, patch): """ go to a specific patch and apply it - patch: name of patch to go to """ # TODO process = subprocess.Popen(['hg', 'qapplied'], stdout=subprocess.PIPE) stdout, stderr = process.communicate() applied = [ i.strip() for i in stdout.splitlines() if i ] raise NotImplementedError def files(self): """ list the files added by the top patch """ # TODO: should only list top-level directories, otherwise it's silly process = subprocess.Popen("hg qdiff | grep '^+++ ' | sed 's/+++ b\///'", stdout=subprocess.PIPE, cwd=self._root) stdout, stderr = process.communicate() return stdout def status(self): """ display status """ return '\n'.join([self._call(i).strip() for i in ('root', 'status', 'qseries')]) def directory(self): """patch queue directory""" if os.path.isdir(self._patch_repo): return self._patch_repo def incoming(self): """are there incoming changes to the patch queue""" if not self._versioned(): return False try: call([self.binary, 'incoming'], cwd=self.directory()) return True except subprocess.CalledProcessError: return False ### internals def _call(self, *args, **kwargs): command = [self.binary] + list(args) kwargs.setdefault('cwd', self.root) return call(command, cwd=self.root) def _patch_command(self, *command, **kwargs): """perform a command in the patch repository""" kwargs.setdefault(cwd=self.directory()) return call(command, **kwargs) def _versioned(self): """is the patch queue versioned?""" return os.path.isdir(os.path.join(self.directory(), '.hg')) def _series(self): """returns a 2-tuple of applied, unapplied""" lines = self._command(['qseries']).strip() applied = [] unapplied = [] for line in lines: line.strip() index, status, name = line.split() if status == 'A': applied.append(name) else: assert status == 'U' unapplied.append(name) return applied, unapplied def main(args=sys.argv[1:]): parser = CommandParser(HQ) options, args = parser.parse_args(args) parser.invoke(args) if __name__ == '__main__': main()