view bzconsole/api.py @ 32:e843b7f1f400

fix up depends_on
author Jeff Hammel <jhammel@mozilla.com>
date Tue, 04 Dec 2012 13:47:50 -0800
parents f6513032ad28
children 1ce13b2b54a4
line wrap: on
line source

"""
console API to bugzilla
"""

import httplib
import json
import os
import subprocess
import sys
import urllib
import urllib2

from StringIO import StringIO

from utils import tmpbuffer


class BZapi(object):
    """
    console API to bugzilla
    """

    config_cache = '.bzconfiguration'
    # XXX currently this is hard coded for one server

    def __init__(self,
                 server='https://api-dev.bugzilla.mozilla.org/latest',
                 refresh=False,
                 print_request=None,
                 username=None,
                 password=None):
        """
        - refresh : refresh the (cached) configuration
        - print_request : log any request information to a file
        """
        self.server = server
        self.refresh = refresh
        self.print_request = print_request
        self.username = username
        self.password = password

    def products(self, classification=None):
        """list bugzilla products"""
        configuration = self.configuration()
        if classification:
            products = [i for i in configuration['product'] if configuration['product'][i]['classification'] == 'Components']
            return sorted(products)
        else:
            return sorted(configuration['product'].keys())

    def components(self, product):
        """list bugzilla components for a particular product"""
        configuration = self.configuration()
        assert product in configuration['product'], 'Product %s not found' % product
        return sorted(configuration['product'][product]['component'].keys())

    def _unique_components(self):
        """returns a dict of unique component, product"""
        retval = {}
        dupe = set()
        for product in self.products():
            for component in self.components(product):
                if component in retval:
                    dupe.add(component)
                else:
                    retval[component] = product
        for d in dupe:
            del retval[d]
        return retval, dupe

    def users(self, match):
        """returns users matching a search string"""
        assert self.username and self.password, "Must be authenticated"
        return self._request('/user?match=%s' % match)

    def bug(self, number):
        """display a bug"""
        number = int(number)
        return self._request('/bug/%d' % number)

    def new(self, component, title, product=None,
            version=None, description=None, whiteboard=(), cc=(),
            blocks=(), depends_on=()):
        """file a new bug. username and password must be set"""

        # sanity check
        if product:
            assert product in self.products(), "Product not found"
            assert component in self.components(product), "Component not found"
        else:
            unique, dupe = self._unique_components()
            assert component in unique, 'Unique component not found: %s' % component
            product = unique[component]
        assert title, 'Must provide a bug summary'
        assert self.username and self.password, "Must be authenticated"

        # infer version if not given
        if version is None:
            versions = self._configuration['product'][product]['version']
            if len(versions) == 1:
                version = versions[0]
            else:
                default_versions = ('unspecified', 'Trunk')
                for ver in default_versions:
                    version = ver
                    if version in self._configuration['product'][product]['version']:
                        break
        assert version in self._configuration['product'][product]['version'], 'Version not found'

        # create the needed data structure
        request = dict(product=product, component=component,
                       summary=title, version=version,
                       op_sys='All', platform='All',)

        # add CC, if given
        if cc:
            if isinstance(cc, basestring):
                cc=[cc]
            users = []
            for match in cc:
                user = self.users(match)['users']
                assert len(user) == 1, 'Non-unique user: %s' % match
                users.append(user[0])
            request['cc'] = users

        # add blocks, if given:
        if blocks:
            blocks = [int(i) for i in blocks]
            request['blocks'] = blocks

        # add depends_on, if given
        if depends_on:
            depends_on = [int(i) for i in depends_on]
            request['depends_on'] = depends_on

        # get the bug description
        if not description:
            description = tmpbuffer()
        assert description, "Must provide a non-empty description"
        request['comments'] = [self._comment(description)]

        # add whiteboard, if given
        if whiteboard:
            if isinstance(whiteboard, basestring):
                whiteboard=[whiteboard]
            whiteboard = ''.join(['[%s]' % i for i in whiteboard])
            request['whiteboard'] = whiteboard

        # POST the request
        try:
            results = self._request('/bug', request)
        except Exception, e:
            raise

        # return the URL
        return results['ref']

    def configuration(self):
        """bugzilla configuration"""

        if not hasattr(self, '_configuration'):
            config_cache = os.path.join(os.environ['HOME'], self.config_cache)
            if not self.refresh:
                try:
                    self._configuration = json.loads(file(config_cache).read())
                except:
                    pass
            if not getattr(self, '_configuration', None):
                self._configuration = self._request('/configuration')
                if not self.print_request:
                    f = file(config_cache, 'w')
                    print >> f, json.dumps(self._configuration)
            self.refresh = False
        return self._configuration

    ### internal methods

    def _comment(self, text):
        retval = {'is_private': False, 'text': text}
        return retval

    def _request(self, path, data=None):
        url = self.server + path
        query = {}
        if self.username:
            query['username'] = self.username
        if self.password:
            query['password'] = self.password
        if query:
            query = urllib.urlencode(query)
            joiner = '?' in url and '&' or '?'
            url += joiner + query
        headers = {'Accept': 'application/json',
                   'Content-Type': 'application/json'}
        if data:
            data = json.dumps(data)
        req = urllib2.Request(url, data, headers)

        # print out the request
        # from http://stackoverflow.com/questions/603856/how-do-you-get-default-headers-in-a-urllib2-request
        if self.print_request:

            f = file(self.print_request, 'a')
            class MyHTTPConnection(httplib.HTTPConnection):
                def send(self, s):
                    print >> f, s  # or save them, or whatever!
                    httplib.HTTPConnection.send(self, s)

            class MyHTTPSConnection(httplib.HTTPSConnection):
                def send(self, s):
                    print >> f, s
                    httplib.HTTPSConnection.send(self, s)

            class MyHTTPHandler(urllib2.HTTPHandler):
                def http_open(self, req):
                    return self.do_open(MyHTTPConnection, req)
            class MyHTTPSHandler(urllib2.HTTPSHandler):
                def https_open(self, req):
                    return self.do_open(MyHTTPSConnection, req)

            if self.server.startswith('https://'):
                opener = urllib2.build_opener(MyHTTPSHandler)
            else:
                opener = urllib2.build_opener(MyHTTPHandler)
            opener.open(req)
            return

        try:
            response = urllib2.urlopen(req)
        except Exception, e:
            print e
            if data:
                print data
            import pdb; pdb.set_trace()
        the_page = response.read()
        return json.loads(the_page)