Mercurial > hg > bzconsole
changeset 20:c819c8388ad1
* separate api class
* read from ~/.bz dotfile
author | Jeff Hammel <jhammel@mozilla.com> |
---|---|
date | Thu, 09 Dec 2010 15:16:32 -0800 |
parents | 9090fe0cdb72 |
children | e04729acf5be |
files | bzconsole/api.py bzconsole/command.py bzconsole/main.py |
diffstat | 3 files changed, 235 insertions(+), 176 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bzconsole/api.py Thu Dec 09 15:16:32 2010 -0800 @@ -0,0 +1,179 @@ +""" +console API to bugzilla +""" + +import httplib +import json +import os +import subprocess +import sys +import urllib +import urllib2 + +from StringIO import StringIO + +from utils import tmpbuffer + + +class BZapi(object): + """ + console API to bugzilla + """ + + config_cache = '.bzconfiguration' + # XXX currently this is hard coded for one server + + def __init__(self, + server='https://api-dev.bugzilla.mozilla.org/latest', + refresh=False, + print_request=None, + username=None, + password=None): + """ + - refresh : refresh the (cached) configuration + - print_request : log any request information to a file + """ + self.server = server + self.refresh = refresh + self.print_request = print_request + self.username = username + self.password = password + + def products(self, classification=None): + """list bugzilla products""" + configuration = self.configuration() + if classification: + products = [i for i in configuration['product'] if configuration['product'][i]['classification'] == 'Components'] + return sorted(products) + else: + return sorted(configuration['product'].keys()) + + def components(self, product): + """list bugzilla components for a particular product""" + configuration = self.configuration() + assert product in configuration['product'], 'Product %s not found' % product + return sorted(configuration['product'][product]['component'].keys()) + + def _unique_components(self): + """returns a dict of unique component, product""" + retval = {} + dupe = set() + for product in self.products(): + for component in self.components(product): + if component in retval: + dupe.add(component) + else: + retval[component] = product + for d in dupe: + del retval[d] + return retval, dupe + + def new(self, product, component, title, version=None, description=None): + """file a new bug. username and password must be set""" + + # sanity check + assert product in self.products(), "Product not found" + assert component in self.components(product), "Component not found" + assert title, 'Must provide a bug summary' + assert self.username and self.password, "Must be authenticated" + + # infer version if not given + if version is None: + version = 'unspecified' + assert version in self._configuration['product'][product]['version'], 'Version not found' + + # get the bug description + if not description: + description = tmpbuffer() + assert description, "Must provide a non-empty description" + + # create the needed data structure + request = dict(product=product, component=component, + summary=title, version=version, + comments=[self._comment(description)], + op_sys='All', platform='All',) + + # POST the request + try: + results = self._request('/bug', request) + except Exception, e: + raise + + # return the URL + return results['ref'] + + def configuration(self): + """bugzilla configuration""" + + if not hasattr(self, '_configuration'): + config_cache = os.path.join(os.environ['HOME'], self.config_cache) + if not self.refresh: + try: + self._configuration = json.loads(file(config_cache).read()) + except: + pass + if not getattr(self, '_configuration', None): + self._configuration = self._request('/configuration') + if not self.print_request: + f = file(config_cache, 'w') + print >> f, json.dumps(self._configuration) + self.refresh = False + return self._configuration + + ### internal methods + + def _comment(self, text): + retval = {'is_private': False, 'text': text} + return retval + + def _request(self, path, data=None): + url = self.server + path + query = {} + if self.username: + query['username'] = self.username + if self.password: + query['password'] = self.password + if query: + query = urllib.urlencode(query) + url += '?' + query + headers = {'Accept': 'application/json', + 'Content-Type': 'application/json'} + if data: + data = json.dumps(data) + req = urllib2.Request(url, data, headers) + + # print out the request + # from http://stackoverflow.com/questions/603856/how-do-you-get-default-headers-in-a-urllib2-request + if self.print_request: + + f = file(self.print_request, 'a') + class MyHTTPConnection(httplib.HTTPConnection): + def send(self, s): + print >> f, s # or save them, or whatever! + httplib.HTTPConnection.send(self, s) + + class MyHTTPSConnection(httplib.HTTPSConnection): + def send(self, s): + print >> f, s + httplib.HTTPSConnection.send(self, s) + + class MyHTTPHandler(urllib2.HTTPHandler): + def http_open(self, req): + return self.do_open(MyHTTPConnection, req) + class MyHTTPSHandler(urllib2.HTTPSHandler): + def https_open(self, req): + return self.do_open(MyHTTPSConnection, req) + + if self.server.startswith('https://'): + opener = urllib2.build_opener(MyHTTPSHandler) + else: + opener = urllib2.build_opener(MyHTTPHandler) + opener.open(req) + return + + try: + response = urllib2.urlopen(req) + except Exception, e: + import pdb; pdb.set_trace() + the_page = response.read() + return json.loads(the_page)
--- a/bzconsole/command.py Thu Dec 09 13:51:46 2010 -0800 +++ b/bzconsole/command.py Thu Dec 09 15:16:32 2010 -0800 @@ -4,9 +4,14 @@ import inspect import json +import os import sys from optparse import OptionParser from pprint import pprint + +class Undefined(object): + def __init__(self, default): + self.default=default class CommandParser(OptionParser): # TODO: add `help` command @@ -31,6 +36,10 @@ self.disable_interspersed_args() + def add_option(self, *args, **kwargs): + kwargs['default'] = Undefined(kwargs.get('default')) + OptionParser.add_option(self, *args, **kwargs) + def print_help(self): OptionParser.print_help(self) @@ -84,7 +93,28 @@ name, args = self.parse(args) # setup - _object = self._class(**self.options.__dict__) + options = {} + dotfile = os.path.join(os.environ['HOME'], '.' + self.get_prog_name()) + if os.path.exists(dotfile): + f = file(dotfile) + for line in f.readlines(): + line = line.strip() + if not line: + continue + if ':' in line: + key, value = [i.strip() + for i in line.split(':', 1)] + options[key] = value + else: + print >> sys.stderr, "Bad option line: " + line + for key, value in self.options.__dict__.items(): + if isinstance(value, Undefined): + if key in options: + continue + options[key] = value.default + else: + options[key] = value.default + _object = self._class(**options) # command specific args command = self.commands[name]
--- a/bzconsole/main.py Thu Dec 09 13:51:46 2010 -0800 +++ b/bzconsole/main.py Thu Dec 09 15:16:32 2010 -0800 @@ -1,187 +1,37 @@ #!/usr/bin/env python -""" -console API to bugzilla -""" -import httplib -import json -import os -import subprocess import sys -import urllib -import urllib2 - +from api import BZapi +from command import CommandParser from StringIO import StringIO -from command import CommandParser -from utils import tmpbuffer - - -class BZapi(object): - """ - console API to bugzilla - """ - - config_cache = '.bzconfiguration' - # XXX currently this is hard coded for one server - - def __init__(self, - server='https://api-dev.bugzilla.mozilla.org/latest', - refresh=False, - print_request=None, - username=None, - password=None): - """ - - refresh : refresh the (cached) configuration - - print_request : log any request information to a file - """ - self.server = server - self.refresh = refresh - self.print_request = print_request - self.username = username - self.password = password - - def products(self, classification=None): - """list bugzilla products""" - configuration = self.configuration() - if classification: - products = [i for i in configuration['product'] if configuration['product'][i]['classification'] == 'Components'] - return sorted(products) - else: - return sorted(configuration['product'].keys()) +class BZcli(BZapi): + """command line interface front-end for the API class""" - def components(self, product): - """list bugzilla components for a particular product""" - configuration = self.configuration() - assert product in configuration['product'], 'Product %s not found' % product - return sorted(configuration['product'][product]['component'].keys()) - - def _unique_components(self): - """returns a dict of unique component, product""" - retval = {} - dupe = set() - for product in self.products(): - for component in self.components(product): - if component in retval: - dupe.add(component) - else: - retval[component] = product - for d in dupe: - del retval[d] - return retval - - def new(self, product, component, title, version=None, description=None): - """file a new bug. username and password must be set""" - - # sanity check - assert product in self.products(), "Product not found" - assert component in self.components(product), "Component not found" - assert title, 'Must provide a bug summary' - assert self.username and self.password, "Must be authenticated" - - # infer version if not given - if version is None: - version = 'unspecified' - assert version in self._configuration['product'][product]['version'], 'Version not found' - - # get the bug description - if not description: - description = tmpbuffer() - assert description, "Must provide a non-empty description" - - # create the needed data structure - request = dict(product=product, component=component, - summary=title, version=version, - comments=[self._comment(description)], - op_sys='All', platform='All',) + def unique(self, component=None): + """display unique and duplicated components""" + unique, dupe = self._unique_components() - # POST the request - try: - results = self._request('/bug', request) - except Exception, e: - raise - - # return the URL - return results['ref'] - - def configuration(self): - """bugzilla configuration""" - - if not hasattr(self, '_configuration'): - config_cache = os.path.join(os.environ['HOME'], self.config_cache) - if not self.refresh: - try: - self._configuration = json.loads(file(config_cache).read()) - except: - pass - if not getattr(self, '_configuration', None): - self._configuration = self._request('/configuration') - if not self.print_request: - f = file(config_cache, 'w') - print >> f, json.dumps(self._configuration) - self.refresh = False - return self._configuration - - ### internal methods - - def _comment(self, text): - retval = {'is_private': False, 'text': text} - return retval - - def _request(self, path, data=None): - url = self.server + path - query = {} - if self.username: - query['username'] = self.username - if self.password: - query['password'] = self.password - if query: - query = urllib.urlencode(query) - url += '?' + query - headers = {'Accept': 'application/json', - 'Content-Type': 'application/json'} - if data: - data = json.dumps(data) - req = urllib2.Request(url, data, headers) - - # print out the request - # from http://stackoverflow.com/questions/603856/how-do-you-get-default-headers-in-a-urllib2-request - if self.print_request: - - f = file(self.print_request, 'a') - class MyHTTPConnection(httplib.HTTPConnection): - def send(self, s): - print >> f, s # or save them, or whatever! - httplib.HTTPConnection.send(self, s) - - class MyHTTPSConnection(httplib.HTTPSConnection): - def send(self, s): - print >> f, s - httplib.HTTPSConnection.send(self, s) - - class MyHTTPHandler(urllib2.HTTPHandler): - def http_open(self, req): - return self.do_open(MyHTTPConnection, req) - class MyHTTPSHandler(urllib2.HTTPSHandler): - def https_open(self, req): - return self.do_open(MyHTTPSConnection, req) - - if self.server.startswith('https://'): - opener = urllib2.build_opener(MyHTTPSHandler) - else: - opener = urllib2.build_opener(MyHTTPHandler) - opener.open(req) - return - - try: - response = urllib2.urlopen(req) - except Exception, e: - import pdb; pdb.set_trace() - the_page = response.read() - return json.loads(the_page) + if component: + # determine if the component is unique + if component in unique: + return '%s: %s' % (component, unique[component]) + if component in dupe: + return '>>>DUPLICATE<<<' + return 'Component "%s" not found' % component + + buffer = StringIO() + print >> buffer, 'Unique:' + for key in sorted(unique.keys()): + print >> buffer, ' %s: %s' % (key, unique[key]) + print >> buffer + print >> buffer, 'Duplicate:' + for value in sorted(dupe): + print >> buffer, ' %s' % value + return buffer.getvalue() def main(args=sys.argv[1:]): - parser = CommandParser(BZapi) + parser = CommandParser(BZcli) parser.invoke(args) if __name__ == '__main__':