view bzconsole/api.py @ 58:cb3330bcff99

bzconsole/api.py
author Jeff Hammel <jhammel@mozilla.com>
date Fri, 27 Sep 2013 11:40:23 -0700
parents baecb5a6f05b
children 1ccb1bb4e641
line wrap: on
line source

"""
console API to bugzilla
"""

import base64
import httplib
import json
import os
import patch
import subprocess
import sys
import urllib
import urllib2

from StringIO import StringIO
from utils import tmpbuffer


class BZapi(object):
    """API to bugzilla"""

    # currently there is only one cache for configuration
    # TODO: should be one per server
    config_cache = '.bzconfiguration'

    def __init__(self,
                 server='https://api-dev.bugzilla.mozilla.org/latest',
                 base_url='https://bugzilla.mozilla.org',
                 refresh=False,
                 print_request=None,
                 username=None,
                 password=None):
        """
        - refresh : refresh the (cached) configuration
        - print_request : log any request information to a file
        """
        self.server = server
        self.base_url = base_url.rstrip('/')
        self.refresh = refresh
        self.print_request = print_request
        self.username = username
        self.password = password

    def products(self, classification=None):
        """list bugzilla products"""
        configuration = self.configuration()
        if classification:
            products = [i for i in configuration['product'] if configuration['product'][i]['classification'] == 'Components']
            return sorted(products)
        else:
            return sorted(configuration['product'].keys())

    def components(self, product):
        """list bugzilla components for a particular product"""
        configuration = self.configuration()
        assert product in configuration['product'], 'Product %s not found' % product
        return sorted(configuration['product'][product]['component'].keys())

    def _unique_components(self):
        """returns a dict of unique component, product"""
        retval = {}
        dupe = set()
        for product in self.products():
            for component in self.components(product):
                if component in retval:
                    dupe.add(component)
                else:
                    retval[component] = product
        for d in dupe:
            del retval[d]
        return retval, dupe

    def users(self, match):
        """returns users matching a search string"""
        assert self.username and self.password, "Must be authenticated"
        return self._request('/user?match=%s' % match)

    def bug(self, number):
        """display a bug"""
        number = int(number)
        return self._request('/bug/%d' % number)

    def comments(self, bug, comment=None):
        """
        display a bug's comments
        - comment: comment # to display
        """

        bug = int(bug)
        if comment:
            if isinstance(comment, basestring):
                comment = [comment]
            comment = [int(c) for c in comment]
            raise NotImplementedError("TODO")
        else:
            return self._request('/bug/%d/comment' % bug)['comments']

    def log(self, bug, reverse=True, lines=2, limit=None):
        """brief comment log"""
        bug = int(bug)
        comments = list(enumerate(self.comments(bug)))
        if reverse:
            comments.reverse()
        if limit is not None:
            comments = comments[:limit]
        template = "%(url)s\n%(author)s %(datetime)s \n%(text)s"
        output = []
        for index, comment in comments:
            text = '\n'.join([line.strip() if len(line) < 79 else line[:79] + '...'
                              for line in comment['text'].splitlines()[:lines]
                              if line.strip()])
            vals = dict(url='%s/show_bug.cgi?id=%d#c%d' % (self.base_url, bug, index),
                        datetime=comment['creation_time'],
                        author=comment['creator']['name'],
                        text=text)
            output.append(template % vals)

        return '\n\n'.join(output)

    def top(self, bug, lines=2, limit=4):
        """latest comments"""
        return self.log(bug, lines=lines, limit=limit, reverse=True)

    def summary(self, number):
        """
        display bug summary:
        'bug 123456 - dancing is forbidden now'
        """
        number = int(number)
        bug = self._request('/bug/%d' % number)
        return '%d - %s' % (number, bug['summary'])

    def new(self, component, title, product=None,
            version=None, description=None, whiteboard=(), cc=(),
            blocks=(), depends_on=(), see_also=()):
        """file a new bug. username and password must be set"""

        # sanity check
        if product:
            assert product in self.products(), "Product not found"
            assert component in self.components(product), "Component '%s' not found in product '%s'" % (component, product)
        else:
            unique, dupe = self._unique_components()
            assert component in unique, 'Unique component not found: %s' % component
            product = unique[component]
        assert title, 'Must provide a bug summary'
        assert self.username and self.password, "Must be authenticated"

        # infer version if not given
        versions = set(self._configuration['product'][product]['version'])
        if version is None:
            if len(versions) == 1:
                version = list(versions)[0]
            else:
                default_versions = ('unspecified', 'Trunk')
                for ver in default_versions:
                    if ver in versions:
                        version = ver
                        break
        assert version in versions, 'Version not found (Available versions for product "%s": %s)' % (product, versions)

        # create the needed data structure
        request = dict(product=product, component=component,
                       summary=title, version=version,
                       op_sys='All', platform='All',)

        # add CC, if given
        if cc:
            if isinstance(cc, basestring):
                cc=[cc]
            users = []
            for match in cc:
                user = self.users(match)['users']
                assert len(user) == 1, 'Non-unique user: %s' % match
                users.append(user[0])
            request['cc'] = users

        # add blocks, if given:
        if blocks:
            blocks = [int(i) for i in blocks]
            request['blocks'] = blocks

        # add depends_on, if given
        if depends_on:
            depends_on = [int(i) for i in depends_on]
            request['depends_on'] = depends_on

        if see_also:
            if isinstance(see_also, string):
                see_also = [see_also]
            request['see_also'] = list(see_also)

        # get the bug description
        if os.path.exists(description):
            description = file(description).read()
            description = tmpbuffer(contents=description)
        if not description:
            description = tmpbuffer()
        assert description, "Must provide a non-empty description"
        request['comments'] = [self._comment(description)]

        # add whiteboard, if given
        if whiteboard:
            if isinstance(whiteboard, basestring):
                whiteboard=[whiteboard]
            whiteboard = ''.join(['[%s]' % i for i in whiteboard])
            request['whiteboard'] = whiteboard

        # POST the request
        try:
            results = self._request('/bug', request)
        except Exception, e:
            raise

        # return the URL
        return results['ref']

    def attach(self, bug, attachment, description=None, reviewer=None, comment=None):
        """
        add an attachment to a bug

        - bug: bug number to attach to
        - attachment: file or URL of attachment
        - reviewer: flag for review (r?)
        - comment: add this comment to the bug
        """
        # see also:
        # https://github.com/toolness/pybugzilla/blob/master/bzpatch.py

        # TODO: infer bug # from attachment path if possible

        assert self.username and self.password, "Must be authenticated"

        # read contents
        if '://' in attachment:
            # URL
            basename = attachment.rstrip('/').rsplit('/', 1)[-1]
            contents = urllib2.urlopen(attachment).read()
        else:
            # file path
            basename  = os.path.basename(attachment)
            contents = file(attachment).read()

        is_patch = bool(patch.fromstring(contents))
        if is_patch:
            content_type = 'text/plain'
        else:
            # TODO: better content_type
            content_type = 'text/plain'
        if not description:
            description = basename

        # patch flags
        flags = []
        if reviewer:
            # from
            # https://github.com/toolness/pybugzilla/blob/master/bzpatch.py#L177
            flags.append({'name': 'review',
                          'requestee': {'name': reviewer},
                          'status': '?',
                          'type_id': 4 # yay for magic numbers! :/
                          })

        # create attachment data structure
        # https://wiki.mozilla.org/Bugzilla:REST_API:Objects#Attachment
        contents = contents.encode('base64')
        attachment= {'content_type': content_type,
                     'data': contents,
                     'description': description,
                     'encoding': 'base64',
                     'file_name': basename,
                     'flags': flags,
                     'is_patch': is_patch,
                     'is_private': False,
                     'size': len(contents)
                     }
        if comment:
            attachment['comments'] = [self._comment(comment)]

        return self._request('/bug/%s/attachment' % bug, attachment)

    def configuration(self):
        """bugzilla configuration"""

        if not hasattr(self, '_configuration'):
            config_cache = os.path.join(os.environ['HOME'], self.config_cache)
            if not self.refresh:
                try:
                    self._configuration = json.loads(file(config_cache).read())
                except:
                    pass
            if not getattr(self, '_configuration', None):
                self._configuration = self._request('/configuration')
                if not self.print_request:
                    f = file(config_cache, 'w')
                    print >> f, json.dumps(self._configuration)
            self.refresh = False
        return self._configuration

    ### internal methods

    def _comment(self, text):
        retval = {'is_private': False, 'text': text}
        return retval

    def _request(self, path, data=None):
        url = self.server + path
        query = {}
        if self.username:
            query['username'] = self.username
        if self.password:
            query['password'] = self.password
        if query:
            query = urllib.urlencode(query)
            joiner = '?' in url and '&' or '?'
            url += joiner + query
        headers = {'Accept': 'application/json',
                   'Content-Type': 'application/json'}
        if data:
            data = json.dumps(data)
        req = urllib2.Request(url, data, headers)

        # print out the request
        # from http://stackoverflow.com/questions/603856/how-do-you-get-default-headers-in-a-urllib2-request
        if self.print_request:

            f = file(self.print_request, 'a')
            class MyHTTPConnection(httplib.HTTPConnection):
                def send(self, s):
                    print >> f, s  # or save them, or whatever!
                    httplib.HTTPConnection.send(self, s)

            class MyHTTPSConnection(httplib.HTTPSConnection):
                def send(self, s):
                    print >> f, s
                    httplib.HTTPSConnection.send(self, s)

            class MyHTTPHandler(urllib2.HTTPHandler):
                def http_open(self, req):
                    return self.do_open(MyHTTPConnection, req)
            class MyHTTPSHandler(urllib2.HTTPSHandler):
                def https_open(self, req):
                    return self.do_open(MyHTTPSConnection, req)

            if self.server.startswith('https://'):
                opener = urllib2.build_opener(MyHTTPSHandler)
            else:
                opener = urllib2.build_opener(MyHTTPHandler)
            opener.open(req)
            return

        try:
            response = urllib2.urlopen(req)
        except Exception, e:
            print e
            if data:
                print data
            import pdb; pdb.set_trace()
        the_page = response.read()
        return json.loads(the_page)