Mercurial > hg > bzconsole
view bzconsole/api.py @ 41:5d09b4e9a21b
cleanup
author | Jeff Hammel <jhammel@mozilla.com> |
---|---|
date | Sun, 16 Dec 2012 20:04:03 -0800 |
parents | 99cdb343273f |
children | 678b82fedb40 |
line wrap: on
line source
""" console API to bugzilla """ import base64 import httplib import json import os import patch import subprocess import sys import urllib import urllib2 from StringIO import StringIO from utils import tmpbuffer class BZapi(object): """ console API to bugzilla """ # currently there is only one cache for configuration config_cache = '.bzconfiguration' def __init__(self, server='https://api-dev.bugzilla.mozilla.org/latest', refresh=False, print_request=None, username=None, password=None): """ - refresh : refresh the (cached) configuration - print_request : log any request information to a file """ self.server = server self.refresh = refresh self.print_request = print_request self.username = username self.password = password def products(self, classification=None): """list bugzilla products""" configuration = self.configuration() if classification: products = [i for i in configuration['product'] if configuration['product'][i]['classification'] == 'Components'] return sorted(products) else: return sorted(configuration['product'].keys()) def components(self, product): """list bugzilla components for a particular product""" configuration = self.configuration() assert product in configuration['product'], 'Product %s not found' % product return sorted(configuration['product'][product]['component'].keys()) def _unique_components(self): """returns a dict of unique component, product""" retval = {} dupe = set() for product in self.products(): for component in self.components(product): if component in retval: dupe.add(component) else: retval[component] = product for d in dupe: del retval[d] return retval, dupe def users(self, match): """returns users matching a search string""" assert self.username and self.password, "Must be authenticated" return self._request('/user?match=%s' % match) def bug(self, number): """display a bug""" number = int(number) return self._request('/bug/%d' % number) def new(self, component, title, product=None, version=None, description=None, whiteboard=(), cc=(), blocks=(), depends_on=()): """file a new bug. username and password must be set""" # sanity check if product: assert product in self.products(), "Product not found" assert component in self.components(product), "Component not found" else: unique, dupe = self._unique_components() assert component in unique, 'Unique component not found: %s' % component product = unique[component] assert title, 'Must provide a bug summary' assert self.username and self.password, "Must be authenticated" # infer version if not given if version is None: versions = self._configuration['product'][product]['version'] if len(versions) == 1: version = versions[0] else: default_versions = ('unspecified', 'Trunk') for ver in default_versions: version = ver if version in self._configuration['product'][product]['version']: break assert version in self._configuration['product'][product]['version'], 'Version not found' # create the needed data structure request = dict(product=product, component=component, summary=title, version=version, op_sys='All', platform='All',) # add CC, if given if cc: if isinstance(cc, basestring): cc=[cc] users = [] for match in cc: user = self.users(match)['users'] assert len(user) == 1, 'Non-unique user: %s' % match users.append(user[0]) request['cc'] = users # add blocks, if given: if blocks: blocks = [int(i) for i in blocks] request['blocks'] = blocks # add depends_on, if given if depends_on: depends_on = [int(i) for i in depends_on] request['depends_on'] = depends_on # get the bug description if not description: description = tmpbuffer() assert description, "Must provide a non-empty description" request['comments'] = [self._comment(description)] # add whiteboard, if given if whiteboard: if isinstance(whiteboard, basestring): whiteboard=[whiteboard] whiteboard = ''.join(['[%s]' % i for i in whiteboard]) request['whiteboard'] = whiteboard # POST the request try: results = self._request('/bug', request) except Exception, e: raise # return the URL return results['ref'] def attach(self, bug, attachment, description=None, reviewer=None, comment=None): """ add an attachment to a bug - bug: bug number to attach to - attachment: file or URL of attachment - reviewer: flag for review (r?) - comment: add this comment to the bug """ if not description: # fairly hacky :/ description = attachment # read contents if '://' in attachment: # URL basename = attachment.rstrip('/').rsplit('/', 1)[-1] contents = urllib2.urlopen(attachment).read() else: # file path basename = os.path.basename(attachment) contents = file(attachment).read() is_patch = bool(patch.fromstring(contents)) if is_patch: content_type = 'text/plain' else: # TODO: better content_type content_type = 'text/plain' # patch flags flags = [] if reviewer: # from # https://github.com/toolness/pybugzilla/blob/master/bzpatch.py#L177 flags.append({'name': 'review', 'requestee': {'name': reviewer}, 'status': '?', 'type_id': 4 # yay for magic numbers! :/ }) # create attachment data structure # https://wiki.mozilla.org/Bugzilla:REST_API:Objects#Attachment contents = contents.encode('base64') attachment= {'content_type': content_type, 'data': contents, 'description': description, 'encoding': 'base64', 'file_name': basename, 'flags': flags, 'is_patch': is_patch, 'is_private': False, 'size': len(contents) } return self._request('/bug/%s/attachment' % bug, attachment) def configuration(self): """bugzilla configuration""" if not hasattr(self, '_configuration'): config_cache = os.path.join(os.environ['HOME'], self.config_cache) if not self.refresh: try: self._configuration = json.loads(file(config_cache).read()) except: pass if not getattr(self, '_configuration', None): self._configuration = self._request('/configuration') if not self.print_request: f = file(config_cache, 'w') print >> f, json.dumps(self._configuration) self.refresh = False return self._configuration ### internal methods def _comment(self, text): retval = {'is_private': False, 'text': text} return retval def _request(self, path, data=None): url = self.server + path query = {} if self.username: query['username'] = self.username if self.password: query['password'] = self.password if query: query = urllib.urlencode(query) joiner = '?' in url and '&' or '?' url += joiner + query headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} if data: data = json.dumps(data) req = urllib2.Request(url, data, headers) # print out the request # from http://stackoverflow.com/questions/603856/how-do-you-get-default-headers-in-a-urllib2-request if self.print_request: f = file(self.print_request, 'a') class MyHTTPConnection(httplib.HTTPConnection): def send(self, s): print >> f, s # or save them, or whatever! httplib.HTTPConnection.send(self, s) class MyHTTPSConnection(httplib.HTTPSConnection): def send(self, s): print >> f, s httplib.HTTPSConnection.send(self, s) class MyHTTPHandler(urllib2.HTTPHandler): def http_open(self, req): return self.do_open(MyHTTPConnection, req) class MyHTTPSHandler(urllib2.HTTPSHandler): def https_open(self, req): return self.do_open(MyHTTPSConnection, req) if self.server.startswith('https://'): opener = urllib2.build_opener(MyHTTPSHandler) else: opener = urllib2.build_opener(MyHTTPHandler) opener.open(req) return try: response = urllib2.urlopen(req) except Exception, e: print e if data: print data import pdb; pdb.set_trace() the_page = response.read() return json.loads(the_page)