Mercurial > hg > Lemuriformes
changeset 11:afc259799019
[CSV] add script to convert to SQL
author | Jeff Hammel <k0scist@gmail.com> |
---|---|
date | Sun, 10 Dec 2017 13:58:55 -0800 |
parents | ad1bf59eedb0 |
children | 82cd4e0b66cf |
files | lemuriformes/csv2sqlite.py |
diffstat | 1 files changed, 174 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lemuriformes/csv2sqlite.py Sun Dec 10 13:58:55 2017 -0800 @@ -0,0 +1,174 @@ +#!/usr/bin/env python + +""" +convert CSV files to a SQLite DB file +""" + +# For SQLite types see +# http://www.sqlite.org/datatype3.html + +import argparse +import csv +import os +import sqlite3 +import sys +from collections import OrderedDict +from .cast import string +from .sql import SQLConnection + + +def read_csv(csv_file): + """read csv file with header and return list of dicts""" + + reader = csv.DictReader(csv_file) + return [row for row in reader] + + +def path_root(path): + """return basename file root sans extension""" + + return os.path.splitext(os.path.basename(path))[0] + + +class SQLiteConnection(SQLConnection): + """connection class to SQLite database""" + + # mapping of python types to SQLite types + types = {int: "INTEGER", + float: "REAL", + str: "TEXT"} + + def __init__(self, db_file): + if not os.path.exists(db_file): + with open(db_file, 'wb') as f: + # touch file + pass + self.db_file = db_file + self.conn = sqlite3.connect(self.db_file) + + def __call__(self, sql, *args): + c = self.conn.cursor() + c.execute(sql, args) + retval = c.fetchall() + self.conn.commit() + return retval + + def __del__(self): + self.conn.close() + + def tables(self): + """ + return tables in the SQLite database + Ref: https://stackoverflow.com/questions/82875/how-to-list-the-tables-in-an-sqlite-database-file-that-was-opened-with-attach + """ + sql = "SELECT name FROM sqlite_master WHERE type='table';" + results = self(sql) + return [row[0] for row in results] + + def create_table(self, table, **columns): + """add a table to the database""" + + columns = ', '.join(["{key} {value}".format(key=key, value=value) + for key, value in columns.items()]) + self("CREATE TABLE IF NOT EXISTS {table} ({columns});".format(table=table, + columns=columns)) + + def schema(self, table): + """get dictionary of typing for a table""" + # http://www.sqlite.org/pragma.html#pragma_table_info + + sql = "PRAGMA table_info('{}')".format(table) + results = self(sql) + # TODO: get data types from e.g. + # [(0, u'how', u'', 0, None, 0), (1, u'are', u'', 0, None, 0), (2, u'you', u'', 0, None, 0)] + # ... now + # [(0, u'how', u'TEXT', 0, None, 0), (1, u'you', u'TEXT', 0, None, 0), (2, u'are', u'TEXT', 0, None, 0)] + return OrderedDict([(result[1], result[2]) + for result in results]) + + def columns(self, table): + """return ordered list of column names""" + + return self.schema(table).keys() + + def select(self, table): + """just `select *` for now""" + + sql = "SELECT * FROM {table};" + sql = sql.format(table=table) + data = self(sql) + columns = self.columns(table) + return [OrderedDict(zip(columns, row)) + for row in data] + + select_all = select + + +class CSV2SQLite(object): + """ + convert Comma Separated Value input to SQLite + """ + # TODO: allow conversion to arbitrary SQL + + def __init__(self, output): + self.conn = SQLiteConnection(output) + + def __call__(self, *csv_files, **csv_dict): + + # allow tables of default(ing) name + csv_dict.update({csv_file: None + for csv_file in csv_files}) + + for csv_file, tablename in csv_dict.items(): + self.csv2table(csv_file, tablename=tablename) + + def csv2table(self, csv_file, tablename=None): + if isinstance(csv_file, string): + with open(csv_file) as f: + return self.csv2table(f, tablename=tablename) + if tablename is None: + # TODO: allow lookup from scheme + tablename = path_root(csv_file.name) + # read csv + data = read_csv(csv_file) + assert data + + # infer schema from data + # TODO: allow lookup from scheme + columns = {column: "TEXT" for column in data[0].keys()} + + # create table + self.conn.create_table(tablename, **columns) + + # inseert data + for row in data: + self.conn.insert_row(tablename, **row) + + +def main(args=sys.argv[1:]): + """CLI""" + + # parse command line + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('input', nargs='+', + type=argparse.FileType('r'), + help="input CSV files; table names taken from file names") + parser.add_argument('-o', '--output', dest='output', + required=True, + help="output SQLite file") + options = parser.parse_args(args) + + # overwrite the file + # TODO: deprecate and allow appending + with open(options.output, 'w') as f: + pass + + # instantiate converter + conn = CSV2SQLite(options.output) + + # convert input CSV to SQLite tables + conn(*options.input) + + +if __name__ == '__main__': + main()