Mercurial > hg > Lemuriformes
changeset 15:0d1b8bb1d97b
SQL + data related functionality
author | Jeff Hammel <k0scist@gmail.com> |
---|---|
date | Sun, 10 Dec 2017 17:16:52 -0800 (2017-12-11) |
parents | 756dbd3e391e |
children | 9b1bb9eee962 |
files | lemuriformes/issubset.py lemuriformes/json2csv.py lemuriformes/log.py lemuriformes/mysql.py lemuriformes/sql.py lemuriformes/test_server.py |
diffstat | 6 files changed, 499 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# file: lemuriformes/issubset.py

"""
determine if one CSV column is a subset of another.
If it is a subset, output nothing and exit 0.
If it is not a subset, output all elements that are
in the asserted subset but not the total set
and exit 1.
Exits 2 on error.
"""

import os
import sys
from collections import OrderedDict
from .cli import ConfigurationParser
from .uniques import uniques


def main(args=sys.argv[1:]):
    """
    CLI

    args -- command line arguments: `PATH COLUMN` of the asserted subset
            followed by `PATH COLUMN` of the total set

    Returns None if the subset column is contained in the total column;
    otherwise prints the sorted missing elements, one per line,
    and exits with status 1.
    """

    # positional arguments, in command line order, with their help text
    sets = OrderedDict([('subset', "the `PATH` to the CSV and `COLUMN` of the asserted subset"),
                        ('total', "the `PATH` to the CSV and `COLUMN` of the total set")])

    # parse command line
    parser = ConfigurationParser(description=__doc__)
    for key, description in sets.items():
        parser.add_argument(key, nargs=2,
                            help=description)
    options = parser.parse_args(args)

    # sanity
    filenames = set()
    for key in sets:
        # - ensure the values are lists (the path is rewritten in place below)
        value = list(getattr(options, key))
        setattr(options, key, value)
        # - make files absolute
        #   (so the same file given twice is only read once)
        filename = value[0] = os.path.realpath(value[0])
        # - ensure files exist
        if not os.path.isfile(filename):
            parser.error("Not a file: {}".format(filename))
        filenames.add(filename)

    # read the files
    # NOTE(review): `uniques` presumably maps each column name to the set of
    # its distinct values; it is used that way below -- confirm in `.uniques`
    columns = {filename: uniques(filename)
               for filename in filenames}

    # assert that the columns are in the files they have been ascribed to
    for key in sets:
        filename, column = getattr(options, key)
        if column not in columns[filename]:
            parser.error("Column '{}' not found in file '{}'".format(column, filename))

    # calculate the difference
    subset_file, subset_column = options.subset
    total_file, total_column = options.total
    difference = columns[subset_file][subset_column].difference(
        columns[total_file][total_column])
    if not difference:
        return
    print("\n".join([str(i) for i in sorted(difference)]))
    sys.exit(1)


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# file: lemuriformes/json2csv.py

"""
convert JSON list of hashes to CSV
"""
# Note: we could use https://docs.python.org/2/library/csv.html#csv.DictWriter
# but we're being careful here since we actually want this data structure in code,
# not just for de/serialization


import argparse
import csv
import json
import sys
from .cast import unify
from .cli import ConfigurationParser


def flatten_list_of_dicts(list_of_dicts, header=None):
    """
    flattens a list of dicts into a list of lists.

    list_of_dicts -- dicts that must all have the same set of keys
    header -- keys to extract, in output order;
              defaults to the keys of the first dict

    Returns (header, list_of_lists);
    for empty input the list of lists is empty and the header
    is the given `header` (or an empty list).

    Raises AssertionError if `header` contains unknown keys
    or if the dicts do not all share the same keys.
    """

    if not list_of_dicts:
        # keep the documented 2-tuple shape so callers can always unpack
        return (list(header) if header else [], [])

    # sanity
    keys = list(list_of_dicts[0].keys())
    keyset = set(keys)
    if header:
        unknown = set(header).difference(keyset)
        if unknown:
            raise AssertionError("header contains elements not seen in the set: {}".format(
                ', '.join(sorted(str(key) for key in unknown))))
    for item in list_of_dicts:
        # ensure each item has the same keys
        if keyset != set(item.keys()):
            raise AssertionError("Keys not consistent! {} != {}".format(sorted(keys),
                                                                        sorted(item.keys())))

    if not header:
        header = keys  # to sort?

    # flatten it!
    retval = [[item[key] for key in header]
              for item in list_of_dicts]

    return (header, retval)


def main(args=sys.argv[1:]):
    """
    CLI

    args -- command line arguments: the JSON file to read,
            optionally `-H` header fields and `-o` output path
    """

    # parse command line
    parser = ConfigurationParser(description=__doc__)
    parser.add_argument('json', type=argparse.FileType('r'),
                        help="JSON file of list of hashes")
    parser.add_argument('-H', '--header', dest='header', nargs='+',
                        help="use these fields for header")
    parser.add_argument('-o', '--output', dest='output',
                        type=argparse.FileType('w'), default=sys.stdout,
                        help="path to output, or stdout by default")
    options = parser.parse_args(args)

    # read
    data = json.load(options.json)

    # flatten
    header, flattened = flatten_list_of_dicts(data, header=options.header)

    # write
    writer = csv.writer(options.output)
    if header:
        # an empty header (e.g. empty input) would only emit a blank line
        writer.writerow(header)
    for row in flattened:
        writer.writerow([unify(v) for v in row])


if __name__ == '__main__':
    main()
# file: lemuriformes/log.py
"""
ElasticSearch-style logging
"""

import json
import sys
import time
from .cli import ConfigurationParser

stdout = sys.stdout

try:
    # python 2
    string = (str, unicode)
except NameError:
    # python 3
    string = (str, )

try:
    # python 2
    _read_line = raw_input
except NameError:
    # python 3
    _read_line = input


def read_logfile(f):
    """
    read a JSON-per-line log file's contents and return the value

    f -- log file pointer or name

    Returns a list with one decoded JSON value per non-blank line.
    """

    if isinstance(f, string):
        with open(f) as _f:
            return read_logfile(_f)
    # blank lines carry no event; skip them instead of failing to decode
    return [json.loads(line)
            for line in f.read().splitlines()
            if line.strip()]


class ElasticLogger(object):
    """Elasticsearch-compatible log dispatcher"""

    def __init__(self, logfilepath=None, **data):
        """
        logfilepath -- path to logfile
        data -- data to be included with each logging event
        """

        self.logfilepath = logfilepath
        self.data = data

    def write(self, f, **data):
        """
        write JSON `data` to file-like object `f`
        as a single line, then flush
        """

        f.write(json.dumps(data, sort_keys=True) + '\n')
        f.flush()

    def __call__(self, message, **kw):
        """
        log `message` to stdout and, if configured, to the logfile

        message -- the log message
        kw -- additional data for this event; overrides
              the instance data and the `time`/`message` keys
        """

        # create log data JSON blob
        now = time.time()
        data = self.data.copy()  # shallow copy
        data.update({'time': now,
                     'message': message})
        data.update(kw)

        # log to stdout
        self.write(stdout, **data)

        if self.logfilepath:
            # log to a file
            with open(self.logfilepath, 'a') as logfile:
                self.write(logfile, **data)


class ElasticLoggerParser(ConfigurationParser):
    """command line parser that can build an `ElasticLogger`"""

    def add_arguments(self):
        # NOTE(review): presumably invoked by `ConfigurationParser`; `self.keyvalue`
        # is also expected to come from that base class -- confirm in `.cli`
        self.add_argument('-l', '--log', '--logfile', dest='logfile',
                          help="where to log events to in addition to stdout")
        self.add_argument('--tag', dest='tags', nargs='+', default=(),
                          type=self.keyvalue, metavar="KEY=VALUE",
                          help="set of key, values to tag all log lines with")

    def logger(self):
        """return elastic logger instance"""

        # requires that the command line has already been parsed
        assert self.options is not None
        return ElasticLogger(self.options.logfile,
                             **dict(self.options.tags))


def main(args=sys.argv[1:]):
    """
    example CLI program

    args -- command line arguments
    """

    # parse command line
    parser = ElasticLoggerParser(description="my elastic diary")
    parser.parse_args(args)

    # example: timestamped diary
    logger = parser.logger()

    # main loop; ends on Ctrl-C or end of input
    try:
        while True:
            logger(_read_line())
    except (KeyboardInterrupt, EOFError):
        pass


if __name__ == '__main__':
    main()
# file: lemuriformes/mysql.py
"""
MySQL database connection class + helpers
"""

import pymysql  # https://github.com/PyMySQL/PyMySQL
import pymysql.cursors
import sys
from contextlib import contextmanager
from .cli import ConfigurationParser
from .sql import SQLConnection


class MySQLConnection(SQLConnection):
    """connection to a MySQL database"""

    placeholder = '%s'  # VALUE placeholder
    connect_data_keys = ['host', 'user', 'password', 'db', 'port', 'charset']

    def __init__(self, host, user, password, db, port=3306, charset='utf8mb4'):
        """
        store the connection parameters;
        a connection is only opened when a statement is executed
        """

        given = {'host': host,
                 'user': user,
                 'password': password,
                 'db': db,
                 'port': port,
                 'charset': charset}
        self.connect_data = {key: given[key]
                             for key in self.connect_data_keys}

    def connect(self):
        """return a new connection to the database"""
        return pymysql.connect(**self.connect_data)

    @contextmanager
    def _cursor(self):
        """
        yield a cursor on a fresh connection;
        commit on success, always close cursor and connection
        """
        # The connection is managed explicitly rather than with
        # `with self.connect()`, so the commit happens on the same
        # connection that executed the statement.

        connection = self.connect()
        try:
            cursor = connection.cursor()
            try:
                yield cursor
            finally:
                cursor.close()
            # only reached if the body did not raise
            connection.commit()
        finally:
            connection.close()

    def __call__(self, sql, *args):
        """
        execute `sql` with values in `args` and commit

        Returns the fetched rows, or None if they could not be fetched.
        """

        with self._cursor() as cursor:
            try:
                # pass None rather than an empty tuple so that SQL without
                # parameters is not subjected to `%` formatting
                cursor.execute(sql, args or None)
            except TypeError:
                print((sql, args))
                raise
            try:
                result = cursor.fetchall()
            except pymysql.Error:
                result = None
        return result

    def tables(self):
        """return tables"""

        data = self("show tables")
        return [item[0] for item in data]

    def drop(self, table):
        """drop `table` if it exists"""

        # `if exists` already makes this a no-op for a missing table
        self("drop table if exists {table}".format(table=table))

    def create(self, table, *columns):
        """
        create `table`
        columns -- each column should be a 2-tuple of (name, SQL type)
        """

        sql = "CREATE TABLE {table} ({columns}) DEFAULT CHARSET=utf8mb4"

        # format columns
        _columns = ', '.join(["{0} {1}".format(column, _type)
                              for column, _type in columns])

        # execute query
        self(sql.format(table=table, columns=_columns))

    def insert(self, table, **row):
        """
        insert a `row` into `table`

        Raises AssertionError if no column values are given.
        """

        if not row:
            raise AssertionError("no column values given to insert")
        keys = list(row)
        values = [row[key] for key in keys]
        self(self.insert_sql(table=table,
                             columns=keys),
             *values)

    def insert_many(self, table, columns, values):
        """
        insert many rows into `table`
        columns -- list of columns to insert
        values -- sequence of rows, each a sequence of values for `columns`
        """

        # https://stackoverflow.com/questions/13020908/sql-multiple-inserts-with-python
        # It may be more efficient to flatten the string
        # instead of using `.executemany`; see
        # https://stackoverflow.com/questions/14011160/how-to-use-python-mysqldb-to-insert-many-rows-at-once

        sql = self.insert_sql(table=table, columns=columns)
        with self._cursor() as cursor:
            cursor.executemany(sql, values)


class MySQLParser(ConfigurationParser):
    """command line parser for MySQL"""
    # TODO: obsolete!

    def add_arguments(self):
        self.add_argument('host', help="SQL host")
        self.add_argument('db', help="database to use")
        self.add_argument('-u', '--user', dest='user', default='root',
                          help="MySQL user [DEFAULT: %(default)s]")
        self.add_argument('-p', '--password', dest='password',
                          help="MySQL password [DEFAULT: %(default)s]")

    def connection(self):
        """return a `MySQLConnection` built from the parsed options"""

        if self.options is None:
            raise Exception("parse_args not called successfully!")

        return MySQLConnection(host=self.options.host,
                               user=self.options.user,
                               password=self.options.password,
                               db=self.options.db)
# file: lemuriformes/sql.py
"""
abstract SQL functionality
"""


from abc import abstractmethod


class SQLConnection(object):
    """abstract base class for SQL connection"""
    # NOTE: no ABCMeta metaclass is declared here, so the `@abstractmethod`
    # markers below document intent but are not enforced on instantiation

    placeholder = '?'  # VALUE placeholder

    @abstractmethod
    def __call__(self, sql, *args, **kwargs):
        """
        execute `sql` against connection cursor with values in `args`;
        `kwargs` should be passed to the connection
        """

    @abstractmethod
    def tables(self):
        """return list of tables in the database"""

    @abstractmethod
    def columns(self, table):
        """return the columns in `table`"""

    @abstractmethod
    def create_table(self, table, **columns):
        """
        add a table to the database for the specific SQL type
        """

    @abstractmethod
    def pytype2sql(self, pytype):
        """return the SQL type for the python type `pytype`"""

    @abstractmethod
    def sqltype2py(self, sqltype):
        """return the python type for the SQL type `sqltype`"""

    def drop(self, table):
        """drop `table` if exists"""

        if table not in self.tables():
            return  # nothing to do

        sql = "DROP TABLE {table}"
        self(sql.format(table=table))

    def placeholders(self, number):
        """
        return placeholder string appropriate to INSERT SQL;
        `number` should be an integer or an iterable with a `len`
        """

        try:
            number = len(number)  # iterable
        except TypeError:
            pass  # assume integer

        return ','.join([self.placeholder] * number)

    def insert_sql(self, table, columns):
        """
        return insert SQL statement

        table -- name of the table to insert into
        columns -- names of the columns to insert, in order
        """

        # materialize once: `columns` is both iterated and counted
        columns = list(columns)
        sql = "INSERT INTO `{table}` ({columns}) VALUES ({values})"
        column_str = ', '.join(["`{}`".format(key) for key in columns])
        return sql.format(table=table,
                          columns=column_str,
                          values=self.placeholders(columns))

    def insert_row(self, table, **row):
        """insert one `row` into `table`"""

        columns = list(row)
        sql = "INSERT INTO {table} ({columns}) VALUES ({placeholders})".format(
            table=table,
            placeholders=self.placeholders(columns),
            columns=', '.join(columns))
        values = tuple([row[column] for column in columns])
        self(sql, *values)

    def count(self, table):
        """
        count + return number of rows in `table`

        Raises AssertionError if the result is not a single row
        with a single value.
        """
        # https://docs.microsoft.com/en-us/sql/t-sql/functions/count-transact-sql

        sql = "select count(*) from {table}".format(table=table)
        data = self(sql)
        if len(data) != 1 or len(data[0]) != 1:
            raise AssertionError("unexpected result for `{}`: {!r}".format(sql, data))
        return data[0][0]
# file: lemuriformes/test_server.py
"""
mock server for testing purposes
"""

# see https://realpython.com/blog/python/testing-third-party-apis-with-mock-servers/

# TODO:
# - use multiprocessing as threading does not allow hanging thread deletion

from threading import Thread
from .port import get_free_port

try:
    # python 3
    from http.server import HTTPServer
except ImportError:
    # python 2
    from BaseHTTPServer import HTTPServer


class MockServer(HTTPServer):
    """
    HTTP server on a free local port, served from a background thread;
    usable as a context manager
    """

    hostname = 'localhost'

    def __init__(self, handler):
        """
        handler -- request handler class passed to `HTTPServer`
        """
        HTTPServer.__init__(self,
                            (self.hostname, get_free_port()),
                            handler)
        self.thread = None

    def base_url(self):
        """return the URL of the server root, built from the bound address"""
        return 'http://{0}:{1}/'.format(*self.server_address)

    def start_thread(self):
        """serve requests from a daemon thread"""
        self.thread = Thread(target=self.serve_forever)
        self.thread.daemon = True
        self.thread.start()

    def stop_thread(self):
        """stop serving and wait for the serving thread to finish"""

        if self.thread is None:
            # never started; `shutdown()` would block waiting for a
            # `serve_forever` loop that is not running
            return
        self.shutdown()
        self.thread.join()
        self.thread = None

    def __enter__(self):
        self.start_thread()
        return self  # or perhaps `self.thread`?

    def __exit__(self, *args):
        # stop the serving thread and release the listening socket
        self.stop_thread()
        self.server_close()