Mercurial > hg > Lemuriformes
changeset 17:4793f99b73e0
[lemuriformes] utility functions
author | Jeff Hammel <k0scist@gmail.com> |
---|---|
date | Sun, 10 Dec 2017 17:42:52 -0800 (2017-12-11) |
parents | 9b1bb9eee962 |
children | 56596902e9ae |
files | lemuriformes/port.py lemuriformes/read.py lemuriformes/serialize.py lemuriformes/table2csv.py lemuriformes/table_size.py lemuriformes/transpose.py lemuriformes/uniques.py lemuriformes/url2sql.py lemuriformes/waiter.py lemuriformes/waiter.sh |
diffstat | 10 files changed, 436 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/port.py Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,30 @@ +#!/usr/bin/env python + +import argparse +import socket +import sys + + +def get_free_port(): + """get a free port on localhost""" + + s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM) + s.bind(('localhost', 0)) + address, port = s.getsockname() + s.close() + return port + + +def main(args=sys.argv[1:]): + """CLI to get a free port on localhost""" + + # parse command line + parser = argparse.ArgumentParser(description=get_free_port.__doc__) + options = parser.parse_args() + + # print an open port + print (get_free_port()) + + +if __name__ == '__main__': + main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/read.py Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,31 @@ +""" +deserialization +""" + +import csv +import json +from .cast import isstring + + +def dictreader(csv_file): + """read csv file into list of dicts""" + + with open(csv_file) as f: + reader = csv.DictReader(f) + return [row for row in reader] + + +def read_list_of_dicts(path, format): + """ + read a list of dicts (not enforced) + + format -- should be 'csv' or 'json' + """ + + assert format in ('csv', 'json') + assert isstring(path) + if format == 'csv': + return dictreader(path) + + with open(path) as f: + return json.load(f)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/serialize.py Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,91 @@ +#!/usr/bin/env python + +""" +serialization +""" + +import argparse +import csv +import json +import sys +from StringIO import StringIO +from .cast import isstring +from .cast import unify + + +def dictlist2csv(list_of_dicts, header=None, fp=None): + """ + convert a `list_of_dicts` to CSV + + `fp` should be a file-like object or a path to a file + that will be overwritten. If `fp` is not provided, a + string will be returned + """ + + if isstring(fp): + with open(fp, 'w') as _fp: + return dictlist2csv(list_of_dicts, _fp) + + return_string = False + if fp is None: + return_string = True + fp = StringIO() + + # get the header + if not list_of_dicts: + return # XXX what about return_string? Good or bad? + header = header or list_of_dicts[0].keys() + + # instantiate a writer + writer = csv.DictWriter(fp, fieldnames=header) + writer.writeheader() + for row in list_of_dicts: + row = {key: unify(value) + for key, value in row.items()} + try: + writer.writerow(row) + except (UnicodeDecodeError, UnicodeEncodeError) as e: + print (row ) + print(e) + raise + + if return_string: + return fp.getvalue() + + +def dump_json(_json): + """general purpose JSON front-end""" + return json.dumps(_json, indent=2, sort_keys=True) + + +def append(filename, item): + """append line-`item` to `filename`""" + + with open(filename, 'a') as f: + f.write('{}\n'.format(item)) + + +def main(args=sys.argv[1:]): + """CLI""" + + # parse command line + description = "convert a list of dicts in JSON format to CSV" + parser = argparse.ArgumentParser(description=description) + parser.add_argument('input', + type=argparse.FileType('r'), + help="path to file containing a list of flat dicts") + parser.add_argument('-o', '--output', dest='output', + type=argparse.FileType('w'), default=sys.stdout, + help="file to write the CSV to [DEFAULT: stdout]") + options = parser.parse_args(args) + + # parse input + data = json.loads(options.input.read()) + assert type(data) == list + + # write output + dictlist2csv(data, options.output) + + +if __name__ == '__main__': + main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/table2csv.py Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,69 @@ +#!/usr/bin/env python + +""" +dump a MySQL table to CSV +""" + + +import argparse +import csv +import sys +from .db import MySQLConnection, MySQLParser + + +class MySQLParser(argparse.ArgumentParser): + + def __init__(self, **kwargs): + argparse.ArgumentParser.__init__(self, **kwargs) + self.add_arguments() + self.options = None + + def add_arguments(self): + self.add_argument('host') + self.add_argument('db') + self.add_argument('-u', '--user', dest='user', default='root', + help="MySQL user [DEFAULT: %(default)s]") + self.add_argument('-p', '--password', dest='password', + help="MySQL password [DEFAULT: %(default)s]") + + def parse_args(self, args): + options = argparse.ArgumentParser.parse_args(self, args) + self.options = self.validate(options) + return self.options + + def validate(self, options): + """validate options""" + return options + + def connection(self): + if self.options is None: + raise Exception("parse_args not called successfully!") + + return MySQLConnection(host=self.options.host, + user=self.options.user, + password=self.options.password, + db=self.options.db) + +def main(args=sys.argv[1:]): + + # parse command line + parser = MySQLParser(description=__doc__) + parser.add_argument('table', + help="table to dump") + parser.add_argument('-o', '--output', dest='output', + type=argparse.FileType('w'), default=sys.stdout, + help="path to put data to, or stdout by default") + options = parser.parse_args(args) + + # read table + connection = parser.connection() + data = connection("SELECT * FROM {table}".format(table=options.table)) + + # dump table + writer = csv.writer(options.output) + writer.writerows(data) + options.output.flush() + + +if __name__ == '__main__': + main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/table_size.py Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +monitor SQL table size +""" + +import argparse +import csv +import os +import sys +import time +from .cli import ConfigurationParser +from .url2sql import url2sql + + +def main(args=sys.argv[1:]): + """CLI""" + + # parse command line + parser = ConfigurationParser(description=__doc__) + parser.add_argument('database', type=url2sql, + help="URL of SQL database to connect to") + parser.add_argument('table', + help="table to read sizes of") + parser.add_argument('-o', '--output', dest='output', + type=argparse.FileType('a'), default=sys.stdout, + help="CSV output file; stdout by default") + parser.add_argument('-w', '--wait', dest='wait', + type=float, default=60., + help="how long to wait between calls in seconds [DEFAULT: %(default)s]") + options = parser.parse_args(args) + + # ensure table is part of database + db = options.database + tables = db.tables() + if options.table not in tables: + parser.error("Table '{}' not in database tables: {}".format(options.table, + ', '.join(tables))) + + # instantiate writer + writer = csv.writer(options.output) + + # get initial data + previous = db.count(options.table) + end = time.time() + time.sleep(options.wait) + + while True: + try: + start = time.time() + count = db.count(options.table) + rate = (count - previous)/(start-end) + writer.writerow([start, count, rate]) + options.output.flush() + end = start + previous = count + time.sleep(options.wait - (time.time() - start)) + except KeyboardInterrupt: + break + +if __name__ == '__main__': + main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/transpose.py Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,7 @@ +""" +transpose an 2D iterable of iterables in python +""" + +def transpose(array, type=list): + + return map(type, zip(*array))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/uniques.py Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,48 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +count uniques in each CSV file column +""" + +import argparse +import csv +import sys +from collections import OrderedDict +from .columns import read_columns + + +def uniques(fp): + """get unique counts for columns in CSV file `fp`""" + + # read columns + columns = read_columns(fp, type=OrderedDict) + + # convert to sets + for key, value in columns.iteritems(): + columns[key] = set(value) + + return columns + + +def main(args=sys.argv[1:]): + """CLI""" + + # parse command line + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('input', + type=argparse.FileType('r'), + help="input CSV file with headers") + options = parser.parse_args(args) + + # determine sets + columns = uniques(options.input) + + # output uniques + writer = csv.writer(sys.stdout) + for key, value in columns.iteritems(): + writer.writerow([key, len(value)]) + + +if __name__ == '__main__': + main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/url2sql.py Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,40 @@ +""" +instantiate a SQL interface from a URL +""" + +from urlparse import urlparse + +# Local imports of types; messy, currently +from .db import MySQLConnection +from .cli import ConfigurationParser +from .csv2sqlite import SQLiteConnection + +sql_types = {'mysql': MySQLConnection, + 'sqlite': SQLiteConnection +} + +ports = {'mysql': 3306, +} + + +def url2sql(url): + """instantiate SQL connection based on URL""" + + # parse URL + parsed = urlparse(url) + + if parsed.scheme not in sql_types: + raise AssertionError("Unsupported SQL connector type: '{}'".format(parsed.scheme)) + + default_port = ports.get(parsed.scheme) + + # instantiate MySQL connection + if parsed.scheme == 'mysql': + conn_data = dict(host=parsed.hostname, + user=parsed.username, + password=parsed.password, + port=parsed.port or default_port, + db=parsed.path.strip('/')) + elif parsed.scheme == 'sqlite': + conn_data = dict(db_file=parsed.path) + return sql_types[parsed.scheme](**conn_data)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/waiter.py Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,30 @@ +""" +interpolate (etc) bash waiter script for subcommand parallelism +""" + +import os +import tempita + +here = os.path.dirname(os.path.abspath(__file__)) + +class BashWaiter(object): + + template_path = os.path.join(here, 'waiter.sh') + + def __init__(self, *commands): + assert os.path.exists(self.template_path) + self.template = tempita.Template.from_filename(self.template_path) + + self.commands = [] + for command in commands: + self.add(command) + + def add(self, command): + self.commands.append(command) + + + def __str__(self): + """render the template""" + variables = {'commands': self.commands} + return self.template.substitute(**variables) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/waiter.sh Sun Dec 10 17:42:52 2017 -0800 @@ -0,0 +1,27 @@ +#!/bin/bash + +# parallelize and wait for a set of jobs; +# it's MapReduce for `bash`! + +FAIL=0 + +# See http://jeremy.zawodny.com/blog/archives/010717.html + +{{for command in commands}} +{{command}} & +{{endfor}} + +# wait for processes +for job in `jobs -p` +do + echo ${job} + wait ${job} || let "FAIL+=1" +done + +if [ "${FAIL}" == "0" ] +then + exit 0 +fi + +echo "${FAIL} failing job(s)" +exit 1