view lemuriformes/csv2sqlite.py @ 14:756dbd3e391e

JSON deduplication module
author Jeff Hammel <k0scist@gmail.com>
date Sun, 10 Dec 2017 17:10:25 -0800
parents afc259799019
children
line wrap: on
line source

#!/usr/bin/env python

"""
convert CSV files to a SQLite DB file
"""

# For SQLite types see
# http://www.sqlite.org/datatype3.html

import argparse
import csv
import os
import sqlite3
import sys
from collections import OrderedDict
from .cast import string
from .sql import SQLConnection


def read_csv(csv_file):
    """
    parse an open CSV file whose first row is the header;
    returns a list holding one dict per data row, keyed by header name
    """

    return list(csv.DictReader(csv_file))


def path_root(path):
    """strip the directory part and the last extension from `path`"""

    filename = os.path.basename(path)
    root, _extension = os.path.splitext(filename)
    return root


class SQLiteConnection(SQLConnection):
    """connection class to SQLite database"""

    # mapping of python types to SQLite types
    types = {int: "INTEGER",
             float: "REAL",
             str: "TEXT"}

    def __init__(self, db_file):
        """
        connect to the SQLite database at path `db_file`;
        an empty file is created first if the path does not exist
        """
        if not os.path.exists(db_file):
            # touch file
            with open(db_file, 'wb'):
                pass
        self.db_file = db_file
        self.conn = sqlite3.connect(self.db_file)

    def __call__(self, sql, *args):
        """
        execute `sql` with `args` bound as its parameters,
        commit, and return the list of all fetched rows
        """
        c = self.conn.cursor()
        try:
            c.execute(sql, args)
            retval = c.fetchall()
        finally:
            # release the cursor even when the statement fails
            c.close()
        self.conn.commit()
        return retval

    def __del__(self):
        """close the connection when the object is collected"""
        # `conn` is absent if __init__ raised before connecting;
        # guard so that finalization does not raise AttributeError
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()

    def tables(self):
        """
        return tables in the SQLite database
        Ref: https://stackoverflow.com/questions/82875/how-to-list-the-tables-in-an-sqlite-database-file-that-was-opened-with-attach
        """
        sql = "SELECT name FROM sqlite_master WHERE type='table';"
        results = self(sql)
        return [row[0] for row in results]

    def create_table(self, table, **columns):
        """
        add a table to the database (no-op if it already exists)

        `columns` maps column name to the SQL type text for that column.
        NOTE: `table` and `columns` are interpolated into the statement
        unquoted, so they must be trusted, valid SQL identifiers/types.
        """

        columns = ', '.join(["{key} {value}".format(key=key, value=value)
                             for key, value in columns.items()])
        self("CREATE TABLE IF NOT EXISTS {table} ({columns});".format(table=table,
                                                                      columns=columns))

    def schema(self, table):
        """
        get ordered dictionary of typing for a table:
        column name -> declared type, in `PRAGMA table_info` row order
        """
        # http://www.sqlite.org/pragma.html#pragma_table_info

        sql = "PRAGMA table_info('{}')".format(table)
        results = self(sql)
        # TODO: get data types from e.g.
        # [(0, u'how', u'', 0, None, 0), (1, u'are', u'', 0, None, 0), (2, u'you', u'', 0, None, 0)]
        # ... now
        # [(0, u'how', u'TEXT', 0, None, 0), (1, u'you', u'TEXT', 0, None, 0), (2, u'are', u'TEXT', 0, None, 0)]
        # each result row: index 1 is the column name, index 2 its type
        return OrderedDict([(result[1], result[2])
                            for result in results])

    def columns(self, table):
        """return ordered list of column names"""

        # materialize: on Python 3 `.keys()` is a view, not a list
        return list(self.schema(table))

    def select(self, table):
        """
        just `select *` for now;
        returns a list with one OrderedDict (column name -> value) per row
        """

        sql = "SELECT * FROM {table};"
        sql = sql.format(table=table)
        data = self(sql)
        columns = self.columns(table)
        return [OrderedDict(zip(columns, row))
                for row in data]

    select_all = select


class CSV2SQLite(object):
    """
    convert Comma Separated Value input to SQLite
    """
    # TODO: allow conversion to arbitrary SQL

    def __init__(self, output):
        """`output` is the path of the SQLite file to write to"""
        self.conn = SQLiteConnection(output)

    def __call__(self, *csv_files, **csv_dict):
        """
        convert every given CSV input to a table

        csv_files -- CSV inputs (paths or open files) whose table name
                     is derived from the file name
        csv_dict -- CSV path -> explicit table name
        """

        # allow tables of default(ing) name
        csv_dict.update({csv_file: None
                         for csv_file in csv_files})

        for csv_file, tablename in csv_dict.items():
            self.csv2table(csv_file, tablename=tablename)

    def csv2table(self, csv_file, tablename=None):
        """
        load one CSV input into the table `tablename`

        csv_file -- path or open file object of a CSV with a header row
        tablename -- target table; defaults to the file's base name
                     without extension

        Raises AssertionError if the CSV contains no data rows.
        """
        if isinstance(csv_file, string):
            # given a path: open it and recurse with the file object
            with open(csv_file) as f:
                return self.csv2table(f, tablename=tablename)
        if tablename is None:
            # TODO: allow lookup from scheme
            tablename = path_root(csv_file.name)
        # read csv
        data = read_csv(csv_file)
        if not data:
            # explicit raise rather than `assert`, which is stripped
            # under `python -O`; the columns are inferred from the first
            # row, so an empty file cannot be converted
            raise AssertionError(
                "no data rows found in CSV for table '{}'".format(tablename))

        # infer schema from data
        # TODO:  allow lookup from scheme
        columns = {column: "TEXT" for column in data[0]}

        # create table
        self.conn.create_table(tablename, **columns)

        # insert data
        for row in data:
            self.conn.insert_row(tablename, **row)


def main(args=sys.argv[1:]):
    """command line entry point"""

    # build the argument parser and read the command line
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        'input',
        nargs='+',
        type=argparse.FileType('r'),
        help="input CSV files; table names taken from file names",
    )
    cli.add_argument(
        '-o', '--output',
        dest='output',
        required=True,
        help="output SQLite file",
    )
    opts = cli.parse_args(args)

    # start from an empty output file (truncates any existing one)
    # TODO:  deprecate and allow appending
    open(opts.output, 'w').close()

    # build the converter, then turn each input CSV into a table
    converter = CSV2SQLite(opts.output)
    converter(*opts.input)


# run the command line interface only when executed as a script,
# not when the module is imported
if __name__ == '__main__':
    main()