view hq/main.py @ 25:e6b1a435f00e default tip

from TODO
author Jeff Hammel <k0scist@gmail.com>
date Sat, 25 Feb 2017 13:09:51 -0800
parents 3593c85388a1
children
line wrap: on
line source

#!/usr/bin/env python

"""
mercurial queue extension front-end
"""

import os
import subprocess
import sys

from commandparser import CommandParser

call = subprocess.check_output

class HQ(object):
    """
    mercurial queue extension front-end policy manager
    """

    def __init__(self, network=True, root=None, binary='hg'):
        """initialize global options"""
        # TODO: look at hgrc file
        # for [defaults] repository_host

        # check for network
        self.network = network

        # repository root
        self.root = root or call(['hg', 'root']).strip()
        assert os.path.isdir(self.root), "'%s': not a directory!"

        # hg binary
        self.binary = binary

        # patch repo; not guaranteed to exit
        self._patch_repo = os.path.join(self.root, '.hg', 'patches')

    ### subcommands

    def clone(self, repo, patch=None, queue=None):
        """
        clone the repository and begin a patch queue
        - patch: name of a new patch to initiate
        - queue: name of the remote queue
        """

        ### TODO: initialize queue properly
        # hg qinit -c
        # cd `hg root`/
        # rm -rf patches
        # hg clone ${queue} patches
        # cd patches
        # (fix hgrc)

        directory = repo.rsplit('/', 1)
        call(['hg', 'clone', repo, directory])
        os.chdir(directory)
        call(['hg', 'qinit', '-c'])
        if queue:
            # pull from the given repository
            self._patch_command(*['hg', 'pull', '--update', queue])
        else:
            # (optionally) setup a new repo
            pass # TODO

        if patch:
            # create a new patch
            call(['hg', 'qnew', patch])

    def commit(self, message):
        """
        commit a patch and push it to the master repository
        - message : commit message
        """
        call(['hg', 'qrefresh'])
        call(['hg', 'qcommit', '-m', message])
        if self.network:
            self._patch_command(*['hg', 'push', '--mq'])

    def pull(self, repo=None, mq=True):
        """
        pull from the root repository
        if mq is true, update the patch queue, if versioned
        """

        print "Pulling %s" % self.root
        # check for outstanding changes
        output = self._status(self.root)
        if output:
            print "Outstanding changes:"
            print output
            raise AssertionError

        applied, unapplied = self._series()
        self._call('qpop', '--all')
        self._call(*(['pull'] + (repo and [repo] or [])))

        if self._versioned():
            # pull queue repo
            print "Pulling %s" % self.directory()
            if self.incoming():
                print >> sys.stderr, "Incoming changes"

            output = self._status(self.directory())
            if output:
                print >> sys.stderr, "Cannot pull %s; outstanding changes" % (self.directory())
                print output
            else:
                self._call('pull', '--mq')
        for patch in applied:
            self._call('qpush')

    def goto(self, patch):
        """
        go to a specific patch and apply it
        - patch: name of patch to go to
        """
        # TODO
        process = subprocess.Popen(['hg', 'qapplied'], stdout=subprocess.PIPE)
        stdout, stderr = process.communicate()
        applied = [ i.strip() for i in stdout.splitlines()
                    if i ]
        raise NotImplementedError

    def files(self):
        """
        list the files added by the top patch
        """
        # TODO: should only list top-level directories, otherwise it's silly
        process = subprocess.Popen("hg qdiff | grep '^+++ ' | sed 's/+++ b\///'", stdout=subprocess.PIPE, cwd=self._root)
        stdout, stderr = process.communicate()
        return stdout

    def status(self):
        """
        display status
        """
        return '\n'.join([self._call(i).strip() for i in ('root', 'status', 'qseries')])
    st = status

    def directory(self):
        """patch queue directory"""
        if os.path.isdir(self._patch_repo):
            return self._patch_repo

    def incoming(self):
        """are there incoming changes to the patch queue"""
        if not self._versioned():
            return False
        try:
            call([self.binary, 'incoming'], cwd=self.directory())
            return True
        except subprocess.CalledProcessError:
            return False

    ### internals

    def _call(self, *args, **kwargs):
        command = [self.binary] + list(args)
        kwargs.setdefault('cwd', self.root)
        return call(command, **kwargs)

    def _patch_command(self, *command, **kwargs):
        """perform a command in the patch repository"""
        kwargs.setdefault('cwd', self.directory())
        return call(command)

    def _versioned(self):
        """is the patch queue versioned?"""
        return os.path.isdir(os.path.join(self.directory(), '.hg'))

    def _series(self):
        """returns a 2-tuple of applied, unapplied"""
        lines = [line.strip()
                 for line in self._call('qseries').strip().splitlines()]
        applied = []
        unapplied = []
        for line in lines:

            try:
                index, status, name = line.split()
            except:
                print line
                raise

            if status == 'A':
                applied.append(name)
            else:
                assert status == 'U'
                unapplied.append(name)
        return applied, unapplied

    def _status(self, repo):
        """get the status of a repo; if clean, return None, else
        return the output of hg st"""

        output = call([self.binary, 'st'], cwd=repo).strip()
        lines = [line for line in output.splitlines()
                 if not line.startswith('?')]
        if lines:
            return output


def main(args=sys.argv[1:]):
    parser = CommandParser(HQ)
    options, args = parser.parse_args(args)
    parser.invoke(args)

if __name__ == '__main__':
    main()