Mercurial > hg > Lemuriformes
view lemuriformes/csv2sqlite.py @ 18:56596902e9ae default tip
add some setup + tests
author | Jeff Hammel <k0scist@gmail.com> |
---|---|
date | Sun, 10 Dec 2017 17:57:03 -0800 |
parents | afc259799019 |
children |
line wrap: on
line source
#!/usr/bin/env python """ convert CSV files to a SQLite DB file """ # For SQLite types see # http://www.sqlite.org/datatype3.html import argparse import csv import os import sqlite3 import sys from collections import OrderedDict from .cast import string from .sql import SQLConnection def read_csv(csv_file): """read csv file with header and return list of dicts""" reader = csv.DictReader(csv_file) return [row for row in reader] def path_root(path): """return basename file root sans extension""" return os.path.splitext(os.path.basename(path))[0] class SQLiteConnection(SQLConnection): """connection class to SQLite database""" # mapping of python types to SQLite types types = {int: "INTEGER", float: "REAL", str: "TEXT"} def __init__(self, db_file): if not os.path.exists(db_file): with open(db_file, 'wb') as f: # touch file pass self.db_file = db_file self.conn = sqlite3.connect(self.db_file) def __call__(self, sql, *args): c = self.conn.cursor() c.execute(sql, args) retval = c.fetchall() self.conn.commit() return retval def __del__(self): self.conn.close() def tables(self): """ return tables in the SQLite database Ref: https://stackoverflow.com/questions/82875/how-to-list-the-tables-in-an-sqlite-database-file-that-was-opened-with-attach """ sql = "SELECT name FROM sqlite_master WHERE type='table';" results = self(sql) return [row[0] for row in results] def create_table(self, table, **columns): """add a table to the database""" columns = ', '.join(["{key} {value}".format(key=key, value=value) for key, value in columns.items()]) self("CREATE TABLE IF NOT EXISTS {table} ({columns});".format(table=table, columns=columns)) def schema(self, table): """get dictionary of typing for a table""" # http://www.sqlite.org/pragma.html#pragma_table_info sql = "PRAGMA table_info('{}')".format(table) results = self(sql) # TODO: get data types from e.g. # [(0, u'how', u'', 0, None, 0), (1, u'are', u'', 0, None, 0), (2, u'you', u'', 0, None, 0)] # ... now # [(0, u'how', u'TEXT', 0, None, 0), (1, u'you', u'TEXT', 0, None, 0), (2, u'are', u'TEXT', 0, None, 0)] return OrderedDict([(result[1], result[2]) for result in results]) def columns(self, table): """return ordered list of column names""" return self.schema(table).keys() def select(self, table): """just `select *` for now""" sql = "SELECT * FROM {table};" sql = sql.format(table=table) data = self(sql) columns = self.columns(table) return [OrderedDict(zip(columns, row)) for row in data] select_all = select class CSV2SQLite(object): """ convert Comma Separated Value input to SQLite """ # TODO: allow conversion to arbitrary SQL def __init__(self, output): self.conn = SQLiteConnection(output) def __call__(self, *csv_files, **csv_dict): # allow tables of default(ing) name csv_dict.update({csv_file: None for csv_file in csv_files}) for csv_file, tablename in csv_dict.items(): self.csv2table(csv_file, tablename=tablename) def csv2table(self, csv_file, tablename=None): if isinstance(csv_file, string): with open(csv_file) as f: return self.csv2table(f, tablename=tablename) if tablename is None: # TODO: allow lookup from scheme tablename = path_root(csv_file.name) # read csv data = read_csv(csv_file) assert data # infer schema from data # TODO: allow lookup from scheme columns = {column: "TEXT" for column in data[0].keys()} # create table self.conn.create_table(tablename, **columns) # inseert data for row in data: self.conn.insert_row(tablename, **row) def main(args=sys.argv[1:]): """CLI""" # parse command line parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('input', nargs='+', type=argparse.FileType('r'), help="input CSV files; table names taken from file names") parser.add_argument('-o', '--output', dest='output', required=True, help="output SQLite file") options = parser.parse_args(args) # overwrite the file # TODO: deprecate and allow appending with open(options.output, 'w') as f: pass # instantiate converter conn = CSV2SQLite(options.output) # convert input CSV to SQLite tables conn(*options.input) if __name__ == '__main__': main()