view numerics/table.py @ 180:69543d62ae7a

more split stubbing
author Jeff Hammel <k0scist@gmail.com>
date Tue, 09 Aug 2016 14:34:31 -0700
parents f63194f81f7d
children
line wrap: on
line source

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
parse header-based CSV
"""

# imports
import argparse
import csv
import json
import os
import sys
import time
from collections import OrderedDict

string = (str, unicode)


def duplicates(*values):
    """return all duplicates in `values`"""

    counts = {value: values.count(value)
              for value in set(values)}
    retval = []
    for value in values:
        if counts[value] > 1 and value not in retval:
            retval.append(value)
    return retval


def read_table(fp, verbose=False):
    """read table with header and return list of dictionaries"""

    if isinstance(fp, string):
        with open(fp, 'r') as _fp:
            return read_table(_fp, verbose)

    # read CSV
    data = [row for row in csv.reader(fp)]

    # check data
    columns =  set([len(row) for row in data])
    if len(columns) != 1:
        raise AssertionError("Expected: a constant number of columns, instead got: {}".format(', '.join([str(column)
                                                                                                         for column in sorted(columns)])))
    columns = columns.pop()
    if verbose:
        print "{} columns".format(columns)
    data = [[item.strip() for item in row]
            for row in data]

    # xform to JSON-format structure
    header = data.pop(0)
    if verbose:
        print "Header:\n{header}".format(header=json.dumps(header, indent=1))
    duplicate_fields = duplicates(*header)
    if duplicate_fields:
        raise AssertionError("Duplicate header fields found: {duplicates}".format(duplicates=', '.join(duplicate_fields)))
    return [OrderedDict(zip(header, row))
            for row in data]


class TableParser(argparse.ArgumentParser):
    """CLI option parser"""

    def __init__(self, **kwargs):
        kwargs.setdefault('formatter_class', argparse.RawTextHelpFormatter)
        kwargs.setdefault('description', __doc__)
        argparse.ArgumentParser.__init__(self, **kwargs)
        self.add_arguments()
        self.options = None

    def add_arguments(self):
        self.add_argument('input', type=argparse.FileType('r'),
                          help="input CSV file")
        self.add_argument('-o', '--output', dest='output',
                          type=argparse.FileType('w'), default=sys.stdout,
                          help="output file to write to, or stdout by default")
        self.add_argument('-c', '--column', dest='columns', nargs='+',
                          help="column names to output")
        self.add_argument('--format', dest='format',
                          choices=('json', 'csv'), default='json',
                          help="output in this format")
        self.add_argument('-v', '--verbose', dest='verbose',
                          action='store_true', default=False,
                          help="be verbose")

    def parse_args(self, *args, **kw):
        options = argparse.ArgumentParser.parse_args(self, *args, **kw)
        self.validate(options)
        self.options = options
        return options

    def validate(self, options):
        """validate options"""

    def read_table(self):

        assert self.options

        data = read_table(self.options.input,
                          verbose=self.options.verbose)
        if not data:
            parser.error("No data found: {}".format(self.options.intput))
        if self.options.columns:
            header = data[0].keys()
            missing = [column
                       for column in self.options.columns
                       if column not in header]
            if missing:
                self.error("Columns not found in header: {0}".format(", ".join(missing)))
            header = options.columns
            data = [OrderedDict(zip(header,
                                    [row[column] for column in header]))
                    for row in data]
        return data

def main(args=sys.argv[1:]):
    """CLI"""

    # parse command line options
    parser = TableParser()
    options = parser.parse_args(args)

    # read table
    data = parser.read_table()

    # output to JSON
    if options.verbose:
        print ("Output {format}:".format(format=options.format))
    if options.format == 'json':
        options.output.write(json.dumps(data, indent=1))
    elif options.format == 'csv':
        writer = csv.writer(options.output)
        for row in data:
            writer.writerow([row[column] for column in header])


if __name__ == '__main__':
    main()