# view bzconsole/ @ 32:e843b7f1f400
#
# fix up depends_on
# author Jeff Hammel <>
# date Tue, 04 Dec 2012 13:47:50 -0800
# parents f6513032ad28
# children 1ce13b2b54a4
# line wrap: on
# line source

"""console API to bugzilla"""

import httplib
import json
import os
import subprocess
import sys
import urllib
import urllib2

from StringIO import StringIO

from utils import tmpbuffer

class BZapi(object):
    """
    console API to bugzilla

    Small client for a Bugzilla REST server: it lists products and
    components from the server's ``/configuration`` document, looks up
    users, fetches bugs and files new ones.  The configuration document is
    cached in the user's home directory (see `config_cache`).
    """

    config_cache = '.bzconfiguration'
    # XXX currently this is hard coded for one server

    def __init__(self,
                 # NOTE(review): the default server was lost from the
                 # reviewed copy of this file; this value is assumed to be
                 # the historical BzAPI endpoint -- confirm before relying
                 # on it.
                 server='https://api-dev.bugzilla.mozilla.org/latest',
                 refresh=False,
                 print_request=None,
                 username=None,
                 password=None):
        """
        - server : base URL of the REST API; request paths are appended
        - refresh : refresh the (cached) configuration
        - print_request : log any request information to a file
        - username, password : credentials, sent as query parameters;
          required by `users` and `new`
        """
        self.server = server
        self.refresh = refresh
        self.print_request = print_request
        self.username = username
        self.password = password

    def products(self, classification=None):
        """
        list bugzilla products

        - classification : if given, only return products whose
          'classification' equals this value

        Returns a sorted list of product names.
        """
        products = self.configuration()['product']
        if classification:
            # compare against the requested classification; this used to
            # be hard coded to 'Components' regardless of the argument
            return sorted(name for name in products
                          if products[name]['classification'] == classification)
        return sorted(products)

    def components(self, product):
        """
        list bugzilla components for a particular product

        Raises AssertionError if the product is not in the configuration.
        """
        configuration = self.configuration()
        assert product in configuration['product'], 'Product %s not found' % product
        return sorted(configuration['product'][product]['component'].keys())

    def _unique_components(self):
        """
        returns a dict of unique component, product

        Returns a 2-tuple `(unique, dupe)`: `unique` maps each component
        name that exists in exactly one product to that product; `dupe` is
        the set of component names found in more than one product.
        """
        retval = {}
        dupe = set()
        for product in self.products():
            for component in self.components(product):
                if component in retval:
                    # seen under another product: ambiguous
                    dupe.add(component)
                else:
                    retval[component] = product
        # ambiguous names cannot be used to infer a product
        for d in dupe:
            del retval[d]
        return retval, dupe

    def users(self, match):
        """returns users matching a search string"""
        assert self.username and self.password, "Must be authenticated"
        # quote the search string so characters such as '&', '+' or
        # spaces cannot corrupt the query string
        return self._request('/user?match=%s' % urllib.quote(match))

    def bug(self, number):
        """display a bug"""
        number = int(number)
        return self._request('/bug/%d' % number)

    def new(self, component, title, product=None,
            version=None, description=None, whiteboard=(), cc=(),
            blocks=(), depends_on=()):
        """
        file a new bug. username and password must be set

        - component : component to file the bug in
        - title : bug summary (must be non-empty)
        - product : product name; if omitted it is inferred from the
          component, which must then belong to exactly one product
        - version : product version; if omitted, the product's only
          version is used, else the first of 'unspecified', 'Trunk' that
          the product has
        - description : first comment; if empty, `tmpbuffer()` is called
          to obtain one
        - whiteboard : a string or iterable of strings; each item is
          wrapped in [brackets]
        - cc : a search string or iterable of search strings, each of
          which must match exactly one user
        - blocks, depends_on : iterables of bug numbers

        Returns the 'ref' field of the server's response.
        Raises AssertionError when validation fails.
        """

        # sanity check
        if product:
            assert product in self.products(), "Product not found"
            assert component in self.components(product), "Component not found"
        else:
            unique, dupe = self._unique_components()
            assert component in unique, 'Unique component not found: %s' % component
            product = unique[component]
        assert title, 'Must provide a bug summary'
        assert self.username and self.password, "Must be authenticated"

        # infer version if not given
        versions = self.configuration()['product'][product]['version']
        if version is None:
            if len(versions) == 1:
                version = versions[0]
            else:
                # take the first well-known default the product has; if
                # none matches, the assertion below reports the failure
                for version in ('unspecified', 'Trunk'):
                    if version in versions:
                        break
        assert version in versions, 'Version not found'

        # create the needed data structure
        request = dict(product=product, component=component,
                       summary=title, version=version,
                       op_sys='All', platform='All',)

        # add CC, if given
        if cc:
            if isinstance(cc, basestring):
                cc = [cc]
            users = []
            for match in cc:
                user = self.users(match)['users']
                assert len(user) == 1, 'Non-unique user: %s' % match
                users.append(user[0])
            request['cc'] = users

        # add blocks, if given:
        if blocks:
            request['blocks'] = [int(i) for i in blocks]

        # add depends_on, if given
        if depends_on:
            request['depends_on'] = [int(i) for i in depends_on]

        # get the bug description
        if not description:
            description = tmpbuffer()
        assert description, "Must provide a non-empty description"
        request['comments'] = [self._comment(description)]

        # add whiteboard, if given
        if whiteboard:
            if isinstance(whiteboard, basestring):
                whiteboard = [whiteboard]
            request['whiteboard'] = ''.join(['[%s]' % i for i in whiteboard])

        # POST the request; any failure propagates to the caller
        results = self._request('/bug', request)

        # return the URL
        return results['ref']

    def configuration(self):
        """
        bugzilla configuration

        Returns the server's configuration document, loading it at most
        once per instance.  Unless `refresh` is set, the cache file in the
        home directory is tried first; otherwise (or if the cache is
        unusable) the document is requested from the server and, when
        requests are not being logged, written back to the cache.
        """

        if not hasattr(self, '_configuration'):
            config_cache = os.path.join(os.path.expanduser('~'), self.config_cache)
            if not self.refresh:
                try:
                    with open(config_cache) as f:
                        self._configuration = json.load(f)
                except (IOError, OSError, ValueError):
                    # missing, unreadable or corrupt cache: fall through
                    # and ask the server instead
                    pass
            if not getattr(self, '_configuration', None):
                self._configuration = self._request('/configuration')
                if not self.print_request:
                    with open(config_cache, 'w') as f:
                        f.write(json.dumps(self._configuration) + '\n')
            self.refresh = False
        return self._configuration

    ### internal methods

    def _comment(self, text):
        """wrap text in the structure the API expects for a public comment"""
        retval = {'is_private': False, 'text': text}
        return retval

    def _logging_opener(self, log):
        """build a urllib2 opener that echoes everything it sends to `log`"""

        class LoggingHTTPConnection(httplib.HTTPConnection):
            def send(self, s):
                log.write('%s\n' % s)
                httplib.HTTPConnection.send(self, s)

        class LoggingHTTPSConnection(httplib.HTTPSConnection):
            def send(self, s):
                log.write('%s\n' % s)
                httplib.HTTPSConnection.send(self, s)

        class LoggingHTTPHandler(urllib2.HTTPHandler):
            def http_open(self, req):
                return self.do_open(LoggingHTTPConnection, req)

        class LoggingHTTPSHandler(urllib2.HTTPSHandler):
            def https_open(self, req):
                return self.do_open(LoggingHTTPSConnection, req)

        if self.server.startswith('https://'):
            return urllib2.build_opener(LoggingHTTPSHandler)
        return urllib2.build_opener(LoggingHTTPHandler)

    def _request(self, path, data=None):
        """
        send a request to the server and return the decoded JSON response

        - path : path (optionally with a query string) appended to
          `self.server`
        - data : if truthy, JSON-encoded and sent as the request body
          (urllib2 then issues a POST); otherwise a GET is issued

        Credentials, when set, are appended as `username` / `password`
        query parameters.  If `print_request` is set, the raw bytes sent
        are appended to that file -- note this includes the credentials.

        Any exception raised while talking to the server is reported on
        stderr (together with the request body) and re-raised.
        """
        url = self.server + path
        query = {}
        if self.username:
            query['username'] = self.username
        if self.password:
            query['password'] = self.password
        if query:
            joiner = '&' if '?' in url else '?'
            url += joiner + urllib.urlencode(query)
        headers = {'Accept': 'application/json',
                   'Content-Type': 'application/json'}
        if data:
            data = json.dumps(data)
        req = urllib2.Request(url, data, headers)

        # When logging is requested, send through an opener whose
        # connections echo what they send to the log file.  The opener
        # used to be built and then never used.
        log = None
        if self.print_request:
            log = open(self.print_request, 'a')
        try:
            if log is None:
                response = urllib2.urlopen(req)
            else:
                response = self._logging_opener(log).open(req)
            the_page = response.read()
        except Exception as e:
            # report and re-raise; this used to drop into pdb and then
            # fail on the undefined response
            sys.stderr.write('%s\n' % e)
            if data:
                sys.stderr.write('%s\n' % data)
            raise
        finally:
            if log is not None:
                log.close()
        return json.loads(the_page)