4
|
1 #!/usr/bin/env python
|
|
2
|
|
3
|
|
4 """
|
|
5 interpolation
|
|
6 """
|
|
7
|
36
|
8 import argparse
|
37
|
9 import sys
|
36
|
10
|
10
|
11 __all__ = ['neighbors', 'linear_interpolation', 'InterpolateParser']
|
4
|
12
|
|
13 def neighbors(start, finish):
|
|
14 """
|
|
15 returns the neighbors in finish from start
|
|
16 assumes both are sorted
|
|
17 """
|
|
18 assert finish
|
|
19 index = 0
|
|
20 retval = []
|
|
21 for x in start:
|
|
22 # endpoints
|
|
23 if x < finish[0]:
|
|
24 retval.append((None, 0))
|
|
25 continue
|
|
26 if x > finish[-1]:
|
|
27 retval.append((len(finish)-1, None))
|
|
28 continue
|
|
29 # traverse
|
|
30 try:
|
|
31 while True:
|
|
32 if x < finish[index] or x > finish[index+1]:
|
|
33 index += 1
|
|
34 continue
|
|
35 else:
|
|
36 break
|
|
37 retval.append((index, index+1))
|
|
38 except IndexError:
|
|
39 retval.append((len(finish)-2, len(finish)-1))
|
|
40
|
|
41 return retval
|
|
42
|
|
43
|
|
44 def linear_interpolation(data, points):
|
|
45 """
|
|
46 linearly interpolate data to points
|
|
47
|
|
48 data -- iterable of 2-tuples (or equivalent) of `x,y`
|
|
49 points -- `x`-values to interpolate to
|
|
50 """
|
|
51
|
|
52 # ensure we are sorted
|
|
53 data = sorted(data, key=lambda x: x[0])
|
|
54 points = sorted(points)
|
|
55
|
|
56 # get the neighbors
|
|
57 x = [value[0] for value in data]
|
|
58 nearest_neighbors = neighbors(points, x)
|
|
59
|
|
60 # we don't support endpoints yet; this is interpolation, not extrapolation
|
|
61 assert not any([(neighbor[0] is None or neighbor[1] is None)
|
|
62 for neighbor in nearest_neighbors])
|
|
63
|
|
64 retval = []
|
|
65 for index, (left, right) in enumerate(nearest_neighbors):
|
|
66 # linearly interpolate
|
|
67 ratio = (points[index] - data[left][0])/float(data[right][0] - data[left][0])
|
|
68 retval.append(ratio*data[right][1] + (1.-ratio)*data[left][1])
|
|
69 return retval
|
|
70
|
10
|
71 class InterpolateParser(argparse.ArgumentParser):
|
|
72 """CLI option parser"""
|
4
|
73
|
10
|
74 def __init__(self, **kwargs):
|
|
75 kwargs.setdefault('description', __doc__)
|
|
76 argparse.ArgumentParser.__init__(self, **kwargs)
|
|
77 self.add_argument('input', nargs='?',
|
|
78 type=argparse.FileType('r'), default=sys.stdin,
|
|
79 help='input file, or read from stdin if ommitted')
|
|
80 self.add_argument('-o', '--output', dest='output',
|
|
81 type=argparse.FileType('w'), default=sys.stdout,
|
|
82 help="output file, or stdout if ommitted")
|
|
83 self.add_argument('--points', '--print-points', dest='print_points',
|
|
84 action='store_true', default=False,
|
|
85 help="print the points to interpolate to and exit")
|
|
86 self.options = None
|
|
87
|
|
88 def parse_args(self, *args, **kw):
|
|
89 options = argparse.ArgumentParser.parse_args(self, *args, **kw)
|
|
90 self.validate(options)
|
|
91 self.options = options
|
|
92 return options
|
|
93
|
|
94 def validate(self, options):
|
|
95 """validate options"""
|
|
96
|
|
97 def main(args=sys.argv[1:]):
|
|
98 """CLI"""
|
|
99
|
|
100 # parse command line options
|
|
101 parser = InterpolateParser()
|
|
102 options = parser.parse_args(args)
|
|
103
|
|
104 # read the CSV
|
|
105 reader = csv.reader(options.input)
|
|
106 data = [[float(col) for col in row] for row in reader]
|
|
107 ncols = set([len(row) for row in data])
|
|
108 assert len(ncols) == 1
|
|
109 ncols = ncols.pop()
|
|
110 assert ncols > 1
|
|
111
|
|
112 # get `x` values
|
|
113 data = sorted(data, key=lambda x: x[0])
|
|
114 x = [row[0] for row in data]
|
|
115 xmin = int(x[0]) + 1
|
|
116 xmax = int(x[-1])
|
|
117 points = range(xmin, xmax+1)
|
|
118 if options.print_points:
|
|
119 print ('\n'.join([str(point) for point in points]))
|
|
120 return
|
|
121
|
|
122 # make into x,y series
|
|
123 series = [[(row[0], row[col]) for row in data]
|
|
124 for col in range(1,ncols)]
|
|
125
|
|
126 # interpolate
|
|
127 interpolated = [linear_interpolation(s, points) for s in series]
|
|
128
|
|
129 # output interpolated data
|
|
130 writer = csv.writer(options.output)
|
|
131 for row in zip(points, *interpolated):
|
|
132 writer.writerow(row)
|
|
133
|
|
134 if __name__ == '__main__':
|
|
135 main()
|