changeset 15:0d1b8bb1d97b

SQL + data related functionality
author Jeff Hammel <k0scist@gmail.com>
date Sun, 10 Dec 2017 17:16:52 -0800 (2017-12-11)
parents 756dbd3e391e
children 9b1bb9eee962
files lemuriformes/issubset.py lemuriformes/json2csv.py lemuriformes/log.py lemuriformes/mysql.py lemuriformes/sql.py lemuriformes/test_server.py
diffstat 6 files changed, 499 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/issubset.py	Sun Dec 10 17:16:52 2017 -0800
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+determine if one CSV column is a subsset of another.
+If it is a subset, output nothing and exit 0.
+If it is not a subset, output all elements that are
+in the asserted subset but not the total set
+and exits 1.
+Exits 2 on error.
+"""
+
+import os
+import sys
+from collections import OrderedDict
+from .cli import ConfigurationParser
+from .uniques import uniques
+
+def main(args=sys.argv[1:]):
+    """CLI"""
+
+    sets = OrderedDict([('subset', "the `PATH` to the CSV and `COLUMN` of the asserted subset"),
+                        ('total', "the `PATH` to the CSV and `COLUMN` of the total set")])
+
+    # parse command line
+    parser = ConfigurationParser(description=__doc__)
+    for key, description in sets.items():
+        parser.add_argument(key, nargs=2,
+                            help=description)
+    options = parser.parse_args(args)
+
+    # sanity
+    filenames = set()
+    for key in sets.keys():
+        # - ensure the values are listw
+        setattr(options, key, list(getattr(options, key)))
+        value = getattr(options, key)
+        # - make files absolute
+        value[0] = os.path.realpath(value[0])
+        # - ensure files exist
+        filename = value[0]
+        if not os.path.isfile(filename):
+            parser.error("Not a file: {}".format(filename))
+        filenames.add(filename)
+
+    # read the files
+    columns = {filename: uniques(filename)
+               for filename in filenames}
+
+    # assert that the columns are in the files they have been ascribed to
+    for key in sets.keys():
+        filename, column = getattr(options, key)
+        if column not in columns[filename]:
+            parser.error("Column '{}' not found in file '{}'".format(column, filename))
+
+    # calculate the difference
+    difference = columns[options.subset[0]][options.subset[1]].difference(
+        columns[options.total[0]][options.total[1]])
+    if not difference:
+        return
+    print ("\n".join([str(i) for i in sorted(difference)]))
+    sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/json2csv.py	Sun Dec 10 17:16:52 2017 -0800
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+
+"""
+convert JSON list of hashes to CSV
+"""
+# Note: we could use https://docs.python.org/2/library/csv.html#csv.DictWriter
+# but we're being careful here since we actually want this data structure in code,
+# not just for de/serialization
+
+
+import argparse
+import csv
+import json
+import sys
+from .cast import unify
+from .cli import ConfigurationParser
+
+
+def flatten_list_of_dicts(list_of_dicts, header=None):
+    """
+    flattens a list of dicts into a list of lists.
+
+    Returns (header, list_of_lists)
+    """
+
+    if not list_of_dicts:
+        return []
+
+    # sanity
+    keys = list_of_dicts[0].keys()
+    if header:
+        if not set(header).issubset(keys):
+            raise AssertionError("header contains elements not seen in the set: {}".format(', '.format(set(header).difference(keys))))
+    for item in list_of_dicts:
+        # ensure each item has the same keys
+        if set(keys) != set(item.keys()):
+            raise AssertionError("Keys not consistent! {} != {}".format(sorted(keys),
+                                                                        sorted(item.keys())))
+
+    if not header:
+        header = keys  # to sort?
+
+    # flatten it!
+    retval = []
+    for item in list_of_dicts:
+        retval.append([item[key] for key in header])
+
+    return (header, retval)
+
+
+def main(args=sys.argv[1:]):
+    """CLI"""
+
+    # parse command line
+    parser = ConfigurationParser(description=__doc__)
+    parser.add_argument('json', type=argparse.FileType('r'),
+                        help="JSON file of list of hashes")
+    parser.add_argument('-H', '--header', dest='header', nargs='+',
+                        help="use these fields for header")
+    parser.add_argument('-o', '--output', dest='output',
+                        type=argparse.FileType('w'), default=sys.stdout,
+                        help="path to output, or stdout by default")
+    options = parser.parse_args(args)
+
+    # read
+    data = json.load(options.json)
+
+    # flatten
+    header, flattened = flatten_list_of_dicts(data, header=options.header)
+
+    # write
+    writer = csv.writer(options.output)
+    writer.writerow(header)
+    for row in flattened:
+        writer.writerow([unify(v) for v in row])
+
+if __name__ == '__main__':
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/log.py	Sun Dec 10 17:16:52 2017 -0800
@@ -0,0 +1,108 @@
+"""
+ElasticSearch-style logging
+"""
+
+import json
+import sys
+import time
+from .cli import ConfigurationParser
+
+stdout = sys.stdout
+
+try:
+    # python 2
+    string = (str, unicode)
+except NameError:
+    # python 3
+    string = (str, )
+
+
+def read_logfile(f):
+    """
+    read a JSON-per-line log file's contents and return the value
+
+    f -- log file pointer or name
+    """
+
+    if isinstance(f, string):
+        with open(f) as _f:
+            return read_logfile(_f)
+    lines = f.read().strip().splitlines()
+    return [json.loads(line) for line in lines]
+
+
+class ElasticLogger(object):
+    """Elasticsearch-compatible log dispatcher"""
+
+    def __init__(self, logfilepath=None, **data):
+        """
+        logfilepath -- path to logfile
+        data -- data to be included with each logging event
+        """
+
+        self.logfilepath = logfilepath
+        self.data = data
+
+    def write(self, f, **data):
+        """
+        write JSON `data` to file-like object `f`
+        """
+
+        f.write(json.dumps(data, sort_keys=True) + '\n')
+        f.flush()
+
+    def __call__(self, message, **kw):
+
+        # create log data JSON blob
+        now = time.time()
+        data = self.data.copy()   # shallow copy
+        data.update({'time': now,
+                     'message': message})
+        data.update(kw)
+
+        # log to stdout
+        self.write(stdout, **data)
+
+        if self.logfilepath:
+            # log to a file
+            with open(self.logfilepath, 'a') as logfile:
+                self.write(logfile, **data)
+
+
+class ElasticLoggerParser(ConfigurationParser):
+
+    def add_arguments(self):
+        self.add_argument('-l', '--log', '--logfile', dest='logfile',
+                          help="where to log events to in addition to stdout")
+        self.add_argument('--tag', dest='tags', nargs='+', default=(),
+                          type=self.keyvalue, metavar="KEY=VALUE",
+                          help="set of key, values to tag all log lines with")
+
+    def logger(self):
+        """return elastic logger instance"""
+
+        assert self.options is not None
+        return ElasticLogger(self.options.logfile,
+                             **dict(self.options.tags))
+
+
+def main(args=sys.argv[1:]):
+    """example CLI program"""
+
+    # parse command line
+    parser = ElasticLoggerParser(description="my elastic diary")
+    options = parser.parse_args()
+
+    # example: timestamped diary
+    logger = parser.logger()
+
+    # main loop
+    try:
+        while True:
+            logger(raw_input())
+    except KeyboardInterrupt:
+        pass
+
+
+if __name__ == '__main__':
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/mysql.py	Sun Dec 10 17:16:52 2017 -0800
@@ -0,0 +1,114 @@
+"""
+MySQL database connection class + helpers
+"""
+
+import pymysql  # https://github.com/PyMySQL/PyMySQL
+import pymysql.cursors
+import sys
+from .cli import ConfigurationParser
+from .sql import SQLConnection
+
+
+class MySQLConnection(SQLConnection):
+    """connection to a MySQL database"""
+
+    placeholder = '%s'  # VALUE placeholder
+    connect_data_keys = ['host', 'user', 'password', 'db', 'port', 'charset']
+
+    def __init__(self, host, user, password, db, port=3306, charset='utf8mb4'):
+        self.connect_data = {}
+        for key in self.connect_data_keys:
+            self.connect_data[key] = locals()[key]
+
+
+    def connect(self):
+        return pymysql.connect(**self.connect_data)
+
+    def __call__(self, sql, *args):
+
+        with self.connect() as cursor:
+            try:
+                cursor.execute(sql, args)
+            except TypeError:
+                print ((sql, args))
+                raise
+            try:
+                result = cursor.fetchall()
+            except:
+                result = None
+        self.connect().commit()
+        return result
+
+    def tables(self):
+        """return tables"""
+
+        data = self("show tables")
+        return [item[0] for item in data]
+
+    def drop(self, table):
+
+        if table not in self.tables():
+            return
+        self("drop table if exists {table}".format(table=table))
+
+    def create(self, table, *columns):
+        """
+        columns -- each column should be a 2-tuple
+        """
+
+        sql = "CREATE TABLE {table} ({columns}) DEFAULT CHARSET=utf8mb4"
+
+        # format columns
+        _columns = ', '.join(["{0} {1}".format(column, _type)
+                             for column, _type in columns])
+
+        # execute query
+        self(sql.format(table=table, columns=_columns))
+
+    def insert(self, table, **row):
+        """insert a `row` into `table`"""
+
+        assert row
+        keys = row.keys()
+        values = [row[key] for key in keys]
+        self(self.insert_sql(table=table,
+                             columns=keys),
+             *values)
+
+    def insert_many(self, table, columns, values):
+        """
+        insert many rows into `table`
+        columns -- list of columns to insert
+        """
+
+        # https://stackoverflow.com/questions/13020908/sql-multiple-inserts-with-python
+        # It may be more efficient to flatten the string
+        # instead of using `.executemany`; see
+        # https://stackoverflow.com/questions/14011160/how-to-use-python-mysqldb-to-insert-many-rows-at-once
+
+        with self.connect() as connection:
+            sql = self.insert_sql(table=table, columns=columns)
+            connection.executemany(sql, values)
+        self.connect().commit()
+
+
+class MySQLParser(ConfigurationParser):
+    """command line parser for MySQL"""
+    # TODO: obsolete!
+
+    def add_arguments(self):
+        self.add_argument('host', help="SQL host")
+        self.add_argument('db', help="database to use")
+        self.add_argument('-u', '--user', dest='user', default='root',
+                          help="MySQL user [DEFAULT: %(default)s]")
+        self.add_argument('-p', '--password', dest='password',
+                          help="MySQL password [DEFAULT: %(default)s]")
+
+    def connection(self):
+        if self.options is None:
+            raise Exception("parse_args not called successfully!")
+
+        return MySQLConnection(host=self.options.host,
+                               user=self.options.user,
+                               password=self.options.password,
+                               db=self.options.db)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/sql.py	Sun Dec 10 17:16:52 2017 -0800
@@ -0,0 +1,92 @@
+"""
+abstract SQL functionality
+"""
+
+
+from abc import abstractmethod
+
+
+class SQLConnection(object):
+    """abstract base class for SQL connection"""
+
+    placeholder = '?'  # VALUE placeholder
+
+    @abstractmethod
+    def __call__(self, sql, *args, **kwargs):
+        """
+        execute `sql` against connection cursor with values in `args`;
+        `kwargs` should be passed to the connection
+        """
+
+    @abstractmethod
+    def tables(self):
+        """return list of tables in the database"""
+
+    @abstractmethod
+    def columns(self, table):
+        """return the columns in `table`"""
+
+    @abstractmethod
+    def create_table(self, table, **columns):
+        """
+        add a table to the database for the specific SQL type
+        """
+
+    @abstractmethod
+    def pytype2sql(self, pytype):
+        """return the SQL type for the python type `pytype`"""
+
+    @abstractmethod
+    def sqltype2py(self, sqltype):
+        """return the python type for the SQL type `sqltype`"""
+
+    def drop(self, table):
+        """drop `table` if exists"""
+
+        if table in self.tables():
+            return  # nothing to do
+
+        sql = "DROP TABLE {table}"
+        self(sql.format(table=table))
+
+    def placeholders(self, number):
+        """
+        return placeholder string appropriate to INSERT SQL;
+        `number` should be an integer or an iterable with a `len`
+        """
+
+        try:
+            number = len(number)  # iterable
+        except TypeError:
+            pass  # assume integer
+
+        return ','.join([self.placeholder for placeholder in range(number)])
+
+    def insert_sql(self, table, columns):
+        """return insert SQL statement"""
+        sql = "INSERT INTO `{table}` ({columns}) VALUES ({values})"
+        column_str = ', '.join(["`{}`".format(key) for key in columns])
+        return sql.format(table=table,
+                          columns=column_str,
+                          values=self.placeholders(columns))
+
+    def insert_row(self, table, **row):
+        """insert one `row` into `table`"""
+
+        columns = row.keys()
+        sql = "INSERT INTO {table} ({columns}) VALUES ({placeholders})".format(table=table,
+                                                                               placeholders=self.placeholders(columns),
+                                                                               columns=', '.join(columns))
+        values = tuple([row[column] for column in columns])
+        self(sql, *values)
+
+    def count(self, table):
+        """count + return number of rows in `table`"""
+        # https://docs.microsoft.com/en-us/sql/t-sql/functions/count-transact-sql
+
+        sql = "select count(*) from {table}".format(table=table)
+        data = self(sql)
+        assert len(data) == 1
+        if len(data[0]) != 1:
+            raise AssertionError
+        return data[0][0]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/test_server.py	Sun Dec 10 17:16:52 2017 -0800
@@ -0,0 +1,41 @@
+"""
+mock server for testing purposes
+"""
+
+# see https://realpython.com/blog/python/testing-third-party-apis-with-mock-servers/
+
+# TODO:
+# - python3 compatability
+# - use multiprocessing as threading does not allow hangin thread deletion
+from BaseHTTPServer import HTTPServer
+from threading import Thread
+from .port import get_free_port
+
+
+class MockServer(HTTPServer):
+
+    hostname = 'localhost'
+
+    def __init__(self, handler):
+        HTTPServer.__init__(self,
+                            (self.hostname, get_free_port()),
+                            handler)
+        self.thread = None
+
+    def base_url(self):
+        return 'http://{0}:{1}/'.format(*self.server_address)
+
+    def start_thread(self):
+        self.thread = Thread(target=self.serve_forever)
+        self.thread.setDaemon(True)
+        self.thread.start()
+
+    def stop_thread(self):
+        raise NotImplementedError('TODO')
+
+    def __enter__(self):
+        self.start_thread()
+        return self  # or perhaps `self.thread`?
+
+    def __exit__(self, *args):
+        pass   # nothing to do? really?