Mercurial > hg > bzconsole
view bzconsole/api.py @ 55:a5b7ea791f64
bzconsole/api.py
author | Jeff Hammel <jhammel@mozilla.com> |
---|---|
date | Thu, 19 Sep 2013 13:04:03 -0700 |
parents | 39325717218d |
children | baecb5a6f05b |
line wrap: on
line source
""" console API to bugzilla """ import base64 import httplib import json import os import patch import subprocess import sys import urllib import urllib2 from StringIO import StringIO from utils import tmpbuffer class BZapi(object): """API to bugzilla""" # currently there is only one cache for configuration # TODO: should be one per server config_cache = '.bzconfiguration' def __init__(self, server='https://api-dev.bugzilla.mozilla.org/latest', base_url='https://bugzilla.mozilla.org', refresh=False, print_request=None, username=None, password=None): """ - refresh : refresh the (cached) configuration - print_request : log any request information to a file """ self.server = server self.base_url = base_url.rstrip('/') self.refresh = refresh self.print_request = print_request self.username = username self.password = password def products(self, classification=None): """list bugzilla products""" configuration = self.configuration() if classification: products = [i for i in configuration['product'] if configuration['product'][i]['classification'] == 'Components'] return sorted(products) else: return sorted(configuration['product'].keys()) def components(self, product): """list bugzilla components for a particular product""" configuration = self.configuration() assert product in configuration['product'], 'Product %s not found' % product return sorted(configuration['product'][product]['component'].keys()) def _unique_components(self): """returns a dict of unique component, product""" retval = {} dupe = set() for product in self.products(): for component in self.components(product): if component in retval: dupe.add(component) else: retval[component] = product for d in dupe: del retval[d] return retval, dupe def users(self, match): """returns users matching a search string""" assert self.username and self.password, "Must be authenticated" return self._request('/user?match=%s' % match) def bug(self, number): """display a bug""" number = int(number) return self._request('/bug/%d' % number) def comments(self, bug, comment=None): """ display a bug's comments - comment: comment # to display """ bug = int(bug) if comment: if isinstance(comment, basestring): comment = [comment] comment = [int(c) for c in comment] raise NotImplementedError("TODO") else: return self._request('/bug/%d/comment' % bug)['comments'] def log(self, bug, reverse=True, lines=2, limit=None): """brief comment log""" bug = int(bug) comments = list(enumerate(self.comments(bug))) if reverse: comments.reverse() if limit is not None: comments = comments[:limit] template = "%(url)s\n%(author)s %(datetime)s \n%(text)s" output = [] for index, comment in comments: text = '\n'.join([line.strip() if len(line) < 79 else line[:79] + '...' for line in comment['text'].splitlines()[:lines] if line.strip()]) vals = dict(url='%s/show_bug.cgi?id=%d#c%d' % (self.base_url, bug, index), datetime=comment['creation_time'], author=comment['creator']['name'], text=text) output.append(template % vals) return '\n\n'.join(output) def top(self, bug, lines=2, limit=4): """latest comments""" return self.log(bug, lines=lines, limit=limit, reverse=True) def summary(self, number): """ display bug summary: 'bug 123456 - dancing is forbidden now' """ number = int(number) bug = self._request('/bug/%d' % number) return '%d - %s' % (number, bug['summary']) def new(self, component, title, product=None, version=None, description=None, whiteboard=(), cc=(), blocks=(), depends_on=()): """file a new bug. username and password must be set""" # sanity check if product: assert product in self.products(), "Product not found" assert component in self.components(product), "Component not found" else: unique, dupe = self._unique_components() assert component in unique, 'Unique component not found: %s' % component product = unique[component] assert title, 'Must provide a bug summary' assert self.username and self.password, "Must be authenticated" # infer version if not given versions = set(self._configuration['product'][product]['version']) if version is None: if len(versions) == 1: version = list(versions)[0] else: default_versions = ('unspecified', 'Trunk') for ver in default_versions: if ver in versions: version = ver break assert version in versions, 'Version not found (Available versions for product "%s": %s)' % (product, versions) # create the needed data structure request = dict(product=product, component=component, summary=title, version=version, op_sys='All', platform='All',) # add CC, if given if cc: if isinstance(cc, basestring): cc=[cc] users = [] for match in cc: user = self.users(match)['users'] assert len(user) == 1, 'Non-unique user: %s' % match users.append(user[0]) request['cc'] = users # add blocks, if given: if blocks: blocks = [int(i) for i in blocks] request['blocks'] = blocks # add depends_on, if given if depends_on: depends_on = [int(i) for i in depends_on] request['depends_on'] = depends_on # get the bug description if not description: description = tmpbuffer() assert description, "Must provide a non-empty description" request['comments'] = [self._comment(description)] # add whiteboard, if given if whiteboard: if isinstance(whiteboard, basestring): whiteboard=[whiteboard] whiteboard = ''.join(['[%s]' % i for i in whiteboard]) request['whiteboard'] = whiteboard # POST the request try: results = self._request('/bug', request) except Exception, e: raise # return the URL return results['ref'] def attach(self, bug, attachment, description=None, reviewer=None, comment=None): """ add an attachment to a bug - bug: bug number to attach to - attachment: file or URL of attachment - reviewer: flag for review (r?) - comment: add this comment to the bug """ # see also: # https://github.com/toolness/pybugzilla/blob/master/bzpatch.py # TODO: infer bug # from attachment path if possible assert self.username and self.password, "Must be authenticated" # read contents if '://' in attachment: # URL basename = attachment.rstrip('/').rsplit('/', 1)[-1] contents = urllib2.urlopen(attachment).read() else: # file path basename = os.path.basename(attachment) contents = file(attachment).read() is_patch = bool(patch.fromstring(contents)) if is_patch: content_type = 'text/plain' else: # TODO: better content_type content_type = 'text/plain' if not description: description = basename # patch flags flags = [] if reviewer: # from # https://github.com/toolness/pybugzilla/blob/master/bzpatch.py#L177 flags.append({'name': 'review', 'requestee': {'name': reviewer}, 'status': '?', 'type_id': 4 # yay for magic numbers! :/ }) # create attachment data structure # https://wiki.mozilla.org/Bugzilla:REST_API:Objects#Attachment contents = contents.encode('base64') attachment= {'content_type': content_type, 'data': contents, 'description': description, 'encoding': 'base64', 'file_name': basename, 'flags': flags, 'is_patch': is_patch, 'is_private': False, 'size': len(contents) } if comment: attachment['comments'] = [self._comment(comment)] return self._request('/bug/%s/attachment' % bug, attachment) def configuration(self): """bugzilla configuration""" if not hasattr(self, '_configuration'): config_cache = os.path.join(os.environ['HOME'], self.config_cache) if not self.refresh: try: self._configuration = json.loads(file(config_cache).read()) except: pass if not getattr(self, '_configuration', None): self._configuration = self._request('/configuration') if not self.print_request: f = file(config_cache, 'w') print >> f, json.dumps(self._configuration) self.refresh = False return self._configuration ### internal methods def _comment(self, text): retval = {'is_private': False, 'text': text} return retval def _request(self, path, data=None): url = self.server + path query = {} if self.username: query['username'] = self.username if self.password: query['password'] = self.password if query: query = urllib.urlencode(query) joiner = '?' in url and '&' or '?' url += joiner + query headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} if data: data = json.dumps(data) req = urllib2.Request(url, data, headers) # print out the request # from http://stackoverflow.com/questions/603856/how-do-you-get-default-headers-in-a-urllib2-request if self.print_request: f = file(self.print_request, 'a') class MyHTTPConnection(httplib.HTTPConnection): def send(self, s): print >> f, s # or save them, or whatever! httplib.HTTPConnection.send(self, s) class MyHTTPSConnection(httplib.HTTPSConnection): def send(self, s): print >> f, s httplib.HTTPSConnection.send(self, s) class MyHTTPHandler(urllib2.HTTPHandler): def http_open(self, req): return self.do_open(MyHTTPConnection, req) class MyHTTPSHandler(urllib2.HTTPSHandler): def https_open(self, req): return self.do_open(MyHTTPSConnection, req) if self.server.startswith('https://'): opener = urllib2.build_opener(MyHTTPSHandler) else: opener = urllib2.build_opener(MyHTTPHandler) opener.open(req) return try: response = urllib2.urlopen(req) except Exception, e: print e if data: print data import pdb; pdb.set_trace() the_page = response.read() return json.loads(the_page)