Mercurial > hg > schema
changeset 0:f7edadebb1de
initial commit
author | Jeff Hammel <jhammel@mozilla.com> |
---|---|
date | Fri, 17 Feb 2012 12:20:51 -0800 |
parents | |
children | 5baa23a8d32f |
files | INSTALL.py README.txt schema/__init__.py schema/main.py schema/model.py schema/sql.py schema/web.py setup.py tests/doctest.txt tests/test.py |
diffstat | 10 files changed, 326 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/INSTALL.py Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +""" +installation script for schema +playing with schemas +""" + +import os +import sys +import urllib2 +import subprocess +try: + from subprocess import check_call as call +except: + from subprocess import call + +REPO='' +DEST='schema' # name of the virtualenv +VIRTUALENV='https://raw.github.com/pypa/virtualenv/develop/virtualenv.py' + +def which(binary, path=os.environ['PATH']): + dirs = path.split(os.pathsep) + for dir in dirs: + if os.path.isfile(os.path.join(dir, fileName)): + return os.path.join(dir, fileName) + if os.path.isfile(os.path.join(dir, fileName + ".exe")): + return os.path.join(dir, fileName + ".exe") + +def main(args=sys.argv[1:]): + + # create a virtualenv + virtualenv = which('virtualenv') or which('virtualenv.py') + if virtualenv: + call([virtualenv, DEST]) + else: + process = subproces.Popen([sys.executable, '-', DEST], stdin=subprocess.PIPE) + process.communicate(stdin=urllib2.urlopen(VIRTUALENV).read()) + + # create a src directory + src = os.path.join(DEST, 'src') + os.mkdir(src) + + # clone the repository + call(['hg', 'clone', REPO], cwd=src) + +""" +XXX unfinished + +hg clone ${REPO} +cd schema +python setup.py develop +""" +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/README.txt Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,11 @@ +schema +=========== + +playing with schemas + +---- + +Jeff Hammel + + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/schema/__init__.py Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,2 @@ +# +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/schema/main.py Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +""" +playing with schemas +""" + +import sys +import optparse + +def main(args=sys.argv[:]): + + # parse command line options + usage = '%prog [options]' + + # description formatter + class PlainDescriptionFormatter(optparse.IndentedHelpFormatter): + def format_description(self, description): + if description: + return description + '\n' + else: + return '' + + parser = optparse.OptionParser(usage=usage, description=__doc__, formatter=PlainDescriptionFormatter()) + options, args = parser.parse_args(args) + +if __name__ == '__main__': + main() +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/schema/model.py Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,23 @@ +""" +<jmaher:> I would like to view the data more like this: +testrun: + testname: + modifiers: [chrome, responsiveness, etc.] + machine: + name: tools-r3-fed-314 + os: fedora12 + platform: x86 + product: + name: firefox + rev: 31415926535 + branch: mozilla-central +results: + testrun: id + counter: [values] + counter: [values] + test: [values] + test: [values] +""" + +class Model(object): + pass
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/schema/sql.py Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,79 @@ +import sqlite3 +import sys +import tempfile + +class SQL(object): + converters = {int: 'INT', + str: 'TEXT', + unicode: 'TEXT'} + + def __init__(self, database=None): + self.database = database or tempfile.mktemp() + + def __call__(self, statement, *parameters): + con = None + e = None + try: + con = sqlite3.connect(self.database) + cursor = con.cursor() + cursor.execute(statement, *parameters) + data = cursor.fetchall() + con.commit() + except sqlite3.Error, e: + print >> sys.stderr, "Error %s:" % e.args[0] + if con: + con.rollback() + raise + if con: + con.close() + if e: + raise + return data + + def update(self, table, **where): + pass + + def select(self, table, id, **where): + if id is None: + id = '*' + return self("SELECT ? FROM ? WHERE %s" % 'AND '.join(['%s=%s' % (i, repr(j)) for i, j in where.items()])) + + def tables(self): + """return the tables available in the db""" + # XXX sqlite specific + return set([i[0] for i in self("SELECT name FROM sqlite_master WHERE type='table'")]) + + def create(self, name, *values): + """ + create a new table + - name: name of the table + - values: 2-tuples of (column name, type) + """ + # sanity checks + assert not [i for i in values if len(i) != 2], "Values should be 2-tuples" + missing = set([i[1] for i in values]).difference(self.converters.keys()) + assert not missing, "Unknown types found: %s" % missing + + self("DROP TABLE IF EXISTS %s" % name) + self("CREATE TABLE %s (%s)" % (name, ', '.join(["%s %s" % (i, self.converters[j]) for i, j in values]))) + + def columns(self, table): + """ + return the column names in a table + """ + + info = self("PRAGMA table_info(%s)" % table) + return [i[1] for i in info] + +if __name__ == '__main__': + db = SQL() + + # there should be no tables + assert not db.tables() + + # add a table + db.create('foo', ('bar', int), ('blarg', str)) + db.create('fleem', ('baz', int)) + assert db.tables() == set(['foo', 'fleem']) + columns = db.columns('foo') + assert columns == ['bar', 'blarg']
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/schema/web.py Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +""" +web handler for schema +""" + +from webob import Request, Response, exc + +class Handler(object): + + def __init__(self, **kw): + pass + + def __call__(self, environ, start_response): + request = Request(environ) + response = Response(content_type='text/plain', + body="schema") + return response(environ, start_response) + +if __name__ == '__main__': + from wsgiref import simple_server + app = Handler() + server = simple_server.make_server(host='0.0.0.0', port=8080, app=app) + server.serve_forever() + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/setup.py Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,33 @@ +import os +from setuptools import setup, find_packages + +try: + here = os.path.dirname(os.path.abspath(__file__)) + description = file(os.path.join(here, 'README.txt')).read() +except IOError: + description = '' + +version = "0.0" + +setup(name='graphserver-schema', + version=version, + description="playing with schemas", + long_description=description, + classifiers=[], # Get strings from http://www.python.org/pypi?%3Aaction=list_classifiers + author='Jeff Hammel', + author_email='jhammel@mozilla.com', + url='', + license='', + packages=find_packages(exclude=['ez_setup', 'examples', 'tests']), + include_package_data=True, + zip_safe=False, + install_requires=dependencies, + entry_points=""" + # -*- Entry points: -*- + + [console_scripts] + schema = schema.main:main + """, + ) + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/doctest.txt Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,11 @@ +Test schema +================ + +The obligatory imports: + + >>> import schema + +Run some tests. This test will fail, please fix it: + + >>> assert True == False +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test.py Fri Feb 17 12:20:51 2012 -0800 @@ -0,0 +1,60 @@ +#!/usr/bin/env python + +""" +doctest runner +""" + +import doctest +import os +import sys +from optparse import OptionParser + + +def run_tests(raise_on_error=False, report_first=False): + + # add results here + results = {} + + # doctest arguments + directory = os.path.dirname(os.path.abspath(__file__)) + extraglobs = {'here': directory} + doctest_args = dict(extraglobs=extraglobs, raise_on_error=raise_on_error) + if report_first: + doctest_args['optionflags'] = doctest.REPORT_ONLY_FIRST_FAILURE + + # gather tests + tests = [ test for test in os.listdir(directory) + if test.endswith('.txt') ] + + # run the tests + for test in tests: + try: + results[test] = doctest.testfile(test, **doctest_args) + except doctest.DocTestFailure, failure: + raise + except doctest.UnexpectedException, failure: + raise failure.exc_info[0], failure.exc_info[1], failure.exc_info[2] + + return results + +def main(args=sys.argv[1:]): + + # parse command line args + parser = OptionParser(description=__doc__) + parser.add_option('--raise', dest='raise_on_error', + default=False, action='store_true', + help="raise on first error") + parser.add_option('--report-first', dest='report_first', + default=False, action='store_true', + help="report the first error only (all tests will still run)") + options, args = parser.parse_args(args) + + # run the tests + results = run_tests(**options.__dict__) + if sum([i.failed for i in results.values()]): + sys.exit(1) # error + + +if __name__ == '__main__': + main() +