Mercurial > hg > Lemuriformes
view lemuriformes/json2csv.py @ 18:56596902e9ae default tip
add some setup + tests
author | Jeff Hammel <k0scist@gmail.com> |
---|---|
date | Sun, 10 Dec 2017 17:57:03 -0800 |
parents | 0d1b8bb1d97b |
children |
line wrap: on
line source
#!/usr/bin/env python

"""
convert JSON list of hashes to CSV
"""

# Note: we could use https://docs.python.org/2/library/csv.html#csv.DictWriter
# but we're being careful here since we actually want this data structure in code,
# not just for de/serialization

import argparse
import csv
import json
import sys
from .cast import unify
from .cli import ConfigurationParser


def flatten_list_of_dicts(list_of_dicts, header=None):
    """
    flattens a list of dicts into a list of lists.

    Every dict must have exactly the same set of keys as the first one.

    list_of_dicts -- sequence of dicts that all share one key set
    header -- optional sequence of keys selecting (and ordering) the
              columns; when omitted or empty, the keys of the first dict
              are used, in that dict's iteration order

    Returns (header, list_of_lists).  For empty input the result is
    (header or [], []) so that callers can always unpack two values.

    Raises AssertionError if `header` names a key the first dict does not
    have, or if any dict's keys differ from the first dict's keys.
    """

    if not list_of_dicts:
        # keep the documented 2-tuple shape even when there is nothing to do
        return (header or [], [])

    # sanity
    keys = list(list_of_dicts[0].keys())
    keyset = set(keys)  # built once; compared against every item below
    if header:
        missing = set(header).difference(keyset)
        if missing:
            # sort via str so the message is stable whatever the key types
            raise AssertionError("header contains elements not seen in the set: {}".format(', '.join(str(key) for key in sorted(missing, key=str))))
    for item in list_of_dicts:
        # ensure each item has the same keys
        if keyset != set(item.keys()):
            raise AssertionError("Keys not consistent! {} != {}".format(sorted(keys), sorted(item.keys())))

    if not header:
        # a plain list, not a live view onto the first dict
        header = keys  # to sort?

    # flatten it!
    retval = [[item[key] for key in header] for item in list_of_dicts]
    return (header, retval)


def main(args=sys.argv[1:]):
    """
    CLI: read a JSON list of hashes and write it as CSV

    args -- command line arguments; defaults to the arguments the
            process was started with
    """

    # parse command line
    # NOTE(review): ConfigurationParser is defined in .cli (not visible
    # here); it is used below exactly like argparse.ArgumentParser
    parser = ConfigurationParser(description=__doc__)
    parser.add_argument('json', type=argparse.FileType('r'),
                        help="JSON file of list of hashes")
    parser.add_argument('-H', '--header', dest='header', nargs='+',
                        help="use these fields for header")
    parser.add_argument('-o', '--output', dest='output',
                        type=argparse.FileType('w'), default=sys.stdout,
                        help="path to output, or stdout by default")
    options = parser.parse_args(args)

    try:
        # read
        try:
            data = json.load(options.json)
        finally:
            # argparse.FileType hands back sys.stdin for '-'; leave it open
            if options.json is not sys.stdin:
                options.json.close()

        # flatten
        header, flattened = flatten_list_of_dicts(data, header=options.header)

        # write
        writer = csv.writer(options.output)
        if header or flattened:
            # empty input without an explicit header: write nothing at all
            writer.writerow(header)
        for row in flattened:
            # NOTE(review): unify comes from .cast (not visible here);
            # presumably it coerces each value into a CSV-friendly form
            writer.writerow([unify(v) for v in row])
    finally:
        # the output file was opened during argument parsing; close it
        # on every path, but never close the process's stdout
        if options.output is not sys.stdout:
            options.output.close()


if __name__ == '__main__':
    main()