changeset 17:4793f99b73e0

[lemuriformes] utility functions
author Jeff Hammel <k0scist@gmail.com>
date Sun, 10 Dec 2017 17:42:52 -0800
parents 9b1bb9eee962
children 56596902e9ae
files lemuriformes/port.py lemuriformes/read.py lemuriformes/serialize.py lemuriformes/table2csv.py lemuriformes/table_size.py lemuriformes/transpose.py lemuriformes/uniques.py lemuriformes/url2sql.py lemuriformes/waiter.py lemuriformes/waiter.sh
diffstat 10 files changed, 436 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/port.py	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+
+import argparse
+import socket
+import sys
+
+
+def get_free_port():
+    """
+    get a free port on localhost
+
+    Binds a TCP socket to port 0 so that the operating system picks
+    an unused port, then closes the socket and returns the port number.
+    The port is only known to be free at the time of the call.
+    """
+
+    s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM)
+    try:
+        s.bind(('localhost', 0))
+        # for AF_INET, getsockname() returns an (address, port) pair
+        return s.getsockname()[1]
+    finally:
+        # always release the socket, even if bind() fails
+        s.close()
+
+
+def main(args=sys.argv[1:]):
+    """
+    CLI to get a free port on localhost
+
+    `args` -- command line arguments [DEFAULT: `sys.argv[1:]`]
+    """
+
+    # parse command line;  there are no options of our own, but this
+    # provides `--help` and rejects unexpected arguments.
+    # `args` is passed explicitly so that callers' arguments are honored
+    parser = argparse.ArgumentParser(description=get_free_port.__doc__)
+    parser.parse_args(args)
+
+    # print an open port
+    print(get_free_port())
+
+
+if __name__ == '__main__':
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/read.py	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,31 @@
+"""
+deserialization
+"""
+
+import csv
+import json
+from .cast import isstring
+
+
+def dictreader(csv_file):
+    """
+    load the CSV file at path `csv_file`;
+    returns a list with one dict per data row, keyed by the header row
+    """
+
+    with open(csv_file) as handle:
+        return list(csv.DictReader(handle))
+
+
+def read_list_of_dicts(path, format):
+    """
+    load a list of dicts from the file at `path`
+    (the shape of the content is not checked)
+
+    format -- either 'csv' or 'json'
+    """
+
+    assert format in ('csv', 'json')
+    assert isstring(path)
+
+    if format != 'csv':
+        # JSON: return whatever the document deserializes to
+        with open(path) as f:
+            return json.load(f)
+
+    return dictreader(path)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/serialize.py	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+
+"""
+serialization
+"""
+
+import argparse
+import csv
+import json
+import sys
+from StringIO import StringIO
+from .cast import isstring
+from .cast import unify
+
+
+def dictlist2csv(list_of_dicts, header=None, fp=None):
+    """
+    convert a `list_of_dicts` to CSV
+
+    `header` is the sequence of column names to write;  it defaults
+    to the keys of the first dict in `list_of_dicts`.
+
+    `fp` should be a file-like object or a path to a file
+    that will be overwritten.  If `fp` is not provided, a
+    string will be returned.
+
+    If `list_of_dicts` is empty no CSV is written and `None` is
+    returned, even when `fp` is not provided (a path given as `fp`
+    is still opened for writing and therefore truncated).
+
+    Unicode errors raised while writing a row are re-raised after
+    printing the offending row and the error.
+    """
+
+    if isstring(fp):
+        # open the path and recurse with the file object;
+        # pass by keyword so that `_fp` is not mistaken for `header`
+        with open(fp, 'w') as _fp:
+            return dictlist2csv(list_of_dicts, header=header, fp=_fp)
+
+    # without a destination, write to a buffer and return its contents
+    return_string = fp is None
+    if return_string:
+        fp = StringIO()
+
+    # get the header
+    if not list_of_dicts:
+        return   # XXX what about return_string?  Good or bad?
+    header = header or list(list_of_dicts[0].keys())
+
+    # instantiate a writer
+    writer = csv.DictWriter(fp, fieldnames=header)
+    writer.writeheader()
+    for row in list_of_dicts:
+        # pass each value through `unify` (from `.cast`) before writing
+        row = {key: unify(value)
+               for key, value in row.items()}
+        try:
+            writer.writerow(row)
+        except (UnicodeDecodeError, UnicodeEncodeError) as e:
+            # show the row that could not be written, then propagate
+            print(row)
+            print(e)
+            raise
+
+    if return_string:
+        return fp.getvalue()
+
+
+def dump_json(_json):
+    """serialize `_json` to a JSON string: 2-space indent, keys sorted"""
+
+    formatting = {'indent': 2, 'sort_keys': True}
+    return json.dumps(_json, **formatting)
+
+
+def append(filename, item):
+    """add `item`, formatted as a single line, to the end of `filename`"""
+
+    with open(filename, 'a') as handle:
+        handle.writelines(['{}\n'.format(item)])
+
+
+def main(args=sys.argv[1:]):
+    """
+    CLI: convert a list of dicts in JSON format to CSV
+
+    `args` -- command line arguments [DEFAULT: `sys.argv[1:]`]
+
+    Exits with a usage error if the input is not a JSON list.
+    """
+
+    # parse command line
+    description = "convert a list of dicts in JSON format to CSV"
+    parser = argparse.ArgumentParser(description=description)
+    parser.add_argument('input',
+                        type=argparse.FileType('r'),
+                        help="path to file containing a list of flat dicts")
+    parser.add_argument('-o', '--output', dest='output',
+                        type=argparse.FileType('w'), default=sys.stdout,
+                        help="file to write the CSV to [DEFAULT: stdout]")
+    options = parser.parse_args(args)
+
+    # parse input
+    data = json.load(options.input)
+    if not isinstance(data, list):
+        parser.error("input must contain a JSON list of dicts")
+
+    # write output;  `fp` must be given by keyword since the second
+    # positional argument of `dictlist2csv` is `header`
+    dictlist2csv(data, fp=options.output)
+
+
+if __name__ == '__main__':
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/table2csv.py	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+
+"""
+dump a MySQL table to CSV
+"""
+
+
+import argparse
+import csv
+import sys
+from .db import MySQLConnection, MySQLParser
+
+
+class MySQLParser(argparse.ArgumentParser):
+    """
+    argument parser carrying the options needed to connect to MySQL
+
+    Adds `host` and `db` positional arguments plus `--user` and
+    `--password` options, remembers the parsed options, and can
+    build a `MySQLConnection` from them.
+
+    NOTE: this definition shadows the `MySQLParser` name imported
+    from `.db` at the top of this module.
+    """
+
+    def __init__(self, **kwargs):
+        argparse.ArgumentParser.__init__(self, **kwargs)
+        self.add_arguments()
+        # parsed and validated options;  set by `parse_args`
+        self.options = None
+
+    def add_arguments(self):
+        """add the MySQL connection arguments to the parser"""
+        self.add_argument('host')
+        self.add_argument('db')
+        self.add_argument('-u', '--user', dest='user', default='root',
+                          help="MySQL user [DEFAULT: %(default)s]")
+        self.add_argument('-p', '--password', dest='password',
+                          help="MySQL password [DEFAULT: %(default)s]")
+
+    def parse_args(self, args=None, namespace=None):
+        """
+        parse and validate `args`, storing the result on `self.options`
+
+        Accepts the same `args` and `namespace` arguments as
+        `argparse.ArgumentParser.parse_args`.
+        """
+        options = argparse.ArgumentParser.parse_args(self, args, namespace)
+        self.options = self.validate(options)
+        return self.options
+
+    def validate(self, options):
+        """validate options;  a hook that returns `options` unchanged"""
+        return options
+
+    def connection(self):
+        """
+        return a `MySQLConnection` built from the parsed options
+
+        Raises `Exception` if `parse_args` has not yet succeeded.
+        """
+        if self.options is None:
+            raise Exception("parse_args not called successfully!")
+
+        return MySQLConnection(host=self.options.host,
+                               user=self.options.user,
+                               password=self.options.password,
+                               db=self.options.db)
+
+def main(args=sys.argv[1:]):
+    """CLI: dump every row of a MySQL table as CSV"""
+
+    # set up and parse the command line
+    parser = MySQLParser(description=__doc__)
+    parser.add_argument('table',
+                        help="table to dump")
+    parser.add_argument('-o', '--output', dest='output',
+                        type=argparse.FileType('w'), default=sys.stdout,
+                        help="path to put data to, or stdout by default")
+    options = parser.parse_args(args)
+
+    # select the whole table
+    query = "SELECT * FROM {table}".format(table=options.table)
+    rows = parser.connection()(query)
+
+    # write the rows as CSV and make sure they reach the output
+    output = options.output
+    csv.writer(output).writerows(rows)
+    output.flush()
+
+
+if __name__ == '__main__':
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/table_size.py	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+monitor SQL table size
+"""
+
+import argparse
+import csv
+import os
+import sys
+import time
+from .cli import ConfigurationParser
+from .url2sql import url2sql
+
+
+def main(args=sys.argv[1:]):
+    """
+    CLI: periodically write the size of a SQL table as CSV
+
+    Each output row is `[timestamp, count, rate]`, where `timestamp`
+    is `time.time()` when the count was taken and `rate` is the
+    change in count per second since the previous sample.
+    Runs until interrupted from the keyboard (Ctrl-C).
+    """
+
+    # parse command line
+    parser = ConfigurationParser(description=__doc__)
+    parser.add_argument('database', type=url2sql,
+                        help="URL of SQL database to connect to")
+    parser.add_argument('table',
+                        help="table to read sizes of")
+    parser.add_argument('-o', '--output', dest='output',
+                        type=argparse.FileType('a'), default=sys.stdout,
+                        help="CSV output file; stdout by default")
+    parser.add_argument('-w', '--wait', dest='wait',
+                        type=float, default=60.,
+                        help="how long to wait between calls in seconds [DEFAULT: %(default)s]")
+    options = parser.parse_args(args)
+
+    # ensure table is part of database
+    db = options.database
+    tables = db.tables()
+    if options.table not in tables:
+        parser.error("Table '{}' not in database tables: {}".format(options.table,
+                                                                    ', '.join(tables)))
+
+    # instantiate writer
+    writer = csv.writer(options.output)
+
+    # the whole sampling sequence, including the initial count and
+    # sleep, is inside the `try` so that Ctrl-C always exits quietly
+    try:
+        # get initial data
+        previous = db.count(options.table)
+        end = time.time()
+        time.sleep(options.wait)
+
+        while True:
+            start = time.time()
+            count = db.count(options.table)
+            rate = (count - previous)/(start-end)
+            writer.writerow([start, count, rate])
+            options.output.flush()
+            end = start
+            previous = count
+
+            # sleep for what is left of the interval;  if taking the
+            # sample took longer than `wait`, do not sleep at all
+            # (`time.sleep` rejects negative values)
+            remaining = options.wait - (time.time() - start)
+            time.sleep(max(0., remaining))
+    except KeyboardInterrupt:
+        # Ctrl-C is the normal way to stop monitoring
+        pass
+
+if __name__ == '__main__':
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/transpose.py	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,7 @@
+"""
+transpose a 2D iterable of iterables in python
+"""
+
+def transpose(array, type=list):
+    """
+    transpose a 2D iterable of iterables
+
+    array -- iterable of rows, each itself an iterable
+    type -- callable applied to each transposed column, which it
+            receives as a tuple [DEFAULT: list]
+
+    Returns a list with one item per column of `array`.  As with
+    `zip`, columns beyond the length of the shortest row are dropped.
+    """
+
+    # a list comprehension rather than `map` so that a list is
+    # returned on python 3 as well as on python 2
+    return [type(column) for column in zip(*array)]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/uniques.py	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+count uniques in each CSV file column
+"""
+
+import argparse
+import csv
+import sys
+from collections import OrderedDict
+from .columns import read_columns
+
+
+def uniques(fp):
+    """
+    get the unique values of each column in CSV file `fp`
+
+    Returns the mapping produced by `read_columns` (an `OrderedDict`
+    is requested) with each column's values replaced by the `set`
+    of those values;  take `len` of a value for the unique count.
+    """
+
+    # read columns
+    columns = read_columns(fp, type=OrderedDict)
+
+    # convert to sets;  iterate over a copy of the keys since the
+    # values are reassigned inside the loop.  This also avoids
+    # `iteritems`, which exists only on python 2
+    for key in list(columns):
+        columns[key] = set(columns[key])
+
+    return columns
+
+
+def main(args=sys.argv[1:]):
+    """
+    CLI: print the number of unique values in each CSV column
+
+    Writes one `column,count` CSV row per column to stdout.
+    """
+
+    # parse command line
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument('input',
+                        type=argparse.FileType('r'),
+                        help="input CSV file with headers")
+    options = parser.parse_args(args)
+
+    # determine sets
+    columns = uniques(options.input)
+
+    # output uniques;  `items` rather than the python 2-only
+    # `iteritems` so that this runs on python 2 and python 3
+    writer = csv.writer(sys.stdout)
+    for key, value in columns.items():
+        writer.writerow([key, len(value)])
+
+
+if __name__ == '__main__':
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/url2sql.py	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,40 @@
+"""
+instantiate a SQL interface from a URL
+"""
+
+from urlparse import urlparse
+
+# Local imports of types;  messy, currently
+from .db import MySQLConnection
+from .cli import ConfigurationParser
+from .csv2sqlite import  SQLiteConnection
+
+# connection class for each supported URL scheme
+sql_types = {
+    'mysql': MySQLConnection,
+    'sqlite': SQLiteConnection,
+}
+
+# default port per URL scheme, for the schemes that have one
+ports = {
+    'mysql': 3306,
+}
+
+
+def url2sql(url):
+    """
+    build a SQL connection object from `url`
+
+    The URL scheme selects the connection class from `sql_types`;
+    an `AssertionError` is raised for a scheme that is not listed there.
+    """
+
+    # split the URL into its parts
+    parsed = urlparse(url)
+    scheme = parsed.scheme
+
+    if scheme not in sql_types:
+        raise AssertionError("Unsupported SQL connector type: '{}'".format(scheme))
+
+    connector = sql_types[scheme]
+    default_port = ports.get(scheme)
+
+    # assemble the keyword arguments for the connection class
+    if scheme == 'mysql':
+        conn_data = {'host': parsed.hostname,
+                     'user': parsed.username,
+                     'password': parsed.password,
+                     'port': parsed.port or default_port,
+                     'db': parsed.path.strip('/')}
+    elif scheme == 'sqlite':
+        # the URL path is used as the database file
+        conn_data = {'db_file': parsed.path}
+    return connector(**conn_data)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/waiter.py	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,30 @@
+"""
+interpolate (etc) bash waiter script for subcommand parallelism
+"""
+
+import os
+import tempita
+
+here = os.path.dirname(os.path.abspath(__file__))
+
+
+class BashWaiter(object):
+    """
+    collect shell commands and render them into the `waiter.sh` template
+    """
+
+    # tempita template living next to this module
+    template_path = os.path.join(here, 'waiter.sh')
+
+    def __init__(self, *commands):
+        """load the template and register each of `commands`"""
+        assert os.path.exists(self.template_path)
+        self.template = tempita.Template.from_filename(self.template_path)
+
+        self.commands = []
+        for item in commands:
+            self.add(item)
+
+    def add(self, command):
+        """register another `command` to be rendered"""
+        self.commands.append(command)
+
+    def __str__(self):
+        """render the template with the registered commands"""
+        return self.template.substitute(commands=self.commands)
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lemuriformes/waiter.sh	Sun Dec 10 17:42:52 2017 -0800
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+# parallelize and wait for a set of jobs;
+# it's MapReduce for `bash`!
+# Each templated command is started in the background;  the script
+# exits 0 if every job succeeded and 1 otherwise.
+
+# See http://jeremy.zawodny.com/blog/archives/010717.html
+
+# number of jobs that exited with a non-zero status
+FAIL=0
+
+{{for command in commands}}
+{{command}} &
+{{endfor}}
+
+# wait for processes
+for job in $(jobs -p)
+do
+    echo "${job}"
+    wait "${job}" || FAIL=$((FAIL + 1))
+done
+
+# report failures, if any
+if [ "${FAIL}" != "0" ]
+then
+    echo "${FAIL} failing job(s)"
+    exit 1
+fi
+
+exit 0