Mercurial > hg > speedtest
view speedtest/speedtest.py @ 0:26e919a36f86 default tip
speedtest containerized dispatching software
author | Jeff Hammel <k0scist@gmail.com> |
---|---|
date | Thu, 09 Feb 2017 09:47:04 -0800 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python

"""
measure download speed of a URL
"""

# imports
import argparse
import csv
import json
import os
import sys
import time

try:
    # Python 2: the byte-string buffer that `csv.writer` writes to there
    from StringIO import StringIO
except ImportError:
    # Python 3: the `StringIO` module was removed; `io.StringIO` replaces it
    from io import StringIO

try:
    # Python 2: both `str` and `unicode` are acceptable path strings
    string = (str, unicode)
except NameError:
    # Python 3: `unicode` does not exist and `str` is already text
    string = (str,)

# NOTE: `requests` is imported inside the two functions that perform HTTP
# (`SpeedtestResults.write_http` and `main`), so the results handlers and the
# argument parser can be imported and used without the network dependency.


### library functions

def is_url(path):
    """return True if `path` contains a '://' scheme separator"""
    return '://' in path


def _rate(nbytes, seconds):
    """return `nbytes / seconds` truncated to an int (bytes/s)

    Returns None when `seconds` is not positive: two readings of the wall
    clock can be equal (coarse resolution) or even go backwards, the rate is
    then undefined, and dividing would raise ZeroDivisionError.
    """
    if seconds <= 0:
        return None
    return int(nbytes / float(seconds))


def _samples(chunks, start, clock=time.time):
    """yield one measurement dict per chunk, keyed by the result columns

    chunks -- iterable of byte strings, in arrival order
    start -- `clock()` reading taken before the request was issued
    clock -- zero-argument callable returning the current time in seconds

    `duration` is seconds since `start`, `size` the running byte total,
    `speed` the rate of this chunk since the previous reading, and
    `cumulative` the average rate since `start` (both computed by `_rate`).
    """
    size = 0
    last = start
    for chunk in chunks:
        now = clock()
        size += len(chunk)
        yield {'duration': now - start,
               'size': size,
               'speed': _rate(len(chunk), now - last),
               'cumulative': _rate(size, now - start)}
        last = now


### Results handling

class SpeedtestResults(object):
    """ABC for speedtest results

    Subclasses implement `add` (record one measurement) and `body`
    (serialize every recorded measurement); `write` sends that body to a
    file path, an HTTP(S) URL, or an open file-like object.
    """

    columns = ['duration',  # seconds
               'size',  # bytes
               'speed',  # bytes/s
               'cumulative'  # bytes/s
               ]
    # TODO:
    # speed and cumulative are derived quantities;
    # this ABC should calculate them

    def __init__(self, url, method='POST'):
        """
        url -- the URL whose download speed is being measured
        method -- HTTP method `write_http` uses to upload the results
        """
        self.url = url
        self.rows = []
        self.method = method

    def _record(self, duration, size, speed, cumulative):
        """return [(column, value), ...] for one measurement, in `columns` order"""
        # An explicit mapping rather than `locals()[column]` inside a
        # comprehension: before Python 3.12 a comprehension has its own scope,
        # so `locals()` there does not see this method's arguments.
        values = {'duration': duration,
                  'size': size,
                  'speed': speed,
                  'cumulative': cumulative}
        return [(column, values[column]) for column in self.columns]

    def add(self, duration, size, speed, cumulative):
        """add a particular measurement"""
        raise NotImplementedError("Abstract Base Class")

    def body(self):
        """results body"""
        # TODO: e.g. post-processing
        raise NotImplementedError("Abstract Base Class")

    def write(self, output):
        """write `self.body()` to `output`

        output -- a URL (sent via `write_http`), a file path (overwritten),
                  or a file-like object with `write` and `flush`
        """
        # get body
        body = self.body()

        # write it!
        # `string` also matches `unicode` paths on Python 2, not only `str`
        if isinstance(output, string):
            if is_url(output):
                self.write_http(output, body)
            else:
                with open(output, 'w') as f:
                    f.write(body)
        else:
            output.write(body)
            output.flush()

    def write_http(self, url, data):
        """'write' to HTTP: send `data` to `url` using `self.method`

        Prints the response text, then raises `requests.HTTPError` for an
        error status via `raise_for_status`.
        """
        import requests  # only needed when results are sent to a URL
        response = requests.request(self.method, url, data=data)
        print(response.text)
        response.raise_for_status()


class CSVResults(SpeedtestResults):
    """output CSV speedtest results: one row per measurement, no header"""

    def add(self, duration, size, speed, cumulative):
        record = self._record(duration, size, speed, cumulative)
        self.rows.append([value for column, value in record])

    def body(self):
        buffer = StringIO()
        writer = csv.writer(buffer)
        writer.writerows(self.rows)
        return buffer.getvalue()


class JSONResults(SpeedtestResults):
    """output JSON speedtest results: one JSON object per line"""

    def __init__(self, url, data=None, method='POST'):
        """
        url -- the URL being measured; stored in every record as 'url'
        data -- extra key-value pairs merged into every record (a 'url' key
                here overrides the measured URL)
        method -- HTTP method `write_http` uses to upload the results
        """
        SpeedtestResults.__init__(self, url, method=method)
        self.base_data = {'url': self.url}
        self.base_data.update(data or {})

    def add(self, duration, size, speed, cumulative):
        data = self.base_data.copy()
        data.update(self._record(duration, size, speed, cumulative))
        self.rows.append(data)

    def body(self):
        return '\n'.join([json.dumps(row) for row in self.rows])


class ElasticsearchResults(JSONResults):
    """output Elasticsearch results"""

    def body(self):
        # see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
        # this assumes that the `index` and `type` are specified by the URL
        # while less than wonderful, I think that is fair
        master = {'index': {}}
        return '\n'.join([json.dumps(master) + '\n' + json.dumps(row)
                          for row in self.rows]) + '\n'


results_handlers = {'csv': CSVResults,
                    'json': JSONResults,
                    'elasticsearch': ElasticsearchResults}


### CLI

class SpeedtestParser(argparse.ArgumentParser):
    """parser for speedtest"""

    def __init__(self, **kwargs):
        kwargs.setdefault('description', __doc__)
        kwargs.setdefault('formatter_class',
                          argparse.ArgumentDefaultsHelpFormatter)
        argparse.ArgumentParser.__init__(self, **kwargs)
        self.add_arguments()
        self.options = None

    def keyvalue(self, string):
        """argparse key-value pair: 'key=value' -> [key, value]

        Splits on the first '=' only, so values may themselves contain '=';
        reports an argparse error (and exits) when there is no '='.
        """
        if '=' not in string:
            self.error("Expected '=' in {string}".format(string=string))
        return string.split('=', 1)

    def add_arguments(self):
        self.add_argument('url')
        self.add_argument('-o', '--output', dest='output',
                          help="output file or URL, or stdout by default")
        self.add_argument('--variables', dest='variables',
                          nargs='+', type=self.keyvalue, default=(),
                          help="additional key-value pairs to add to each record")
        # sorted so the choices appear in a stable order in --help
        self.add_argument('--fmt', '--format', dest='format',
                          choices=sorted(results_handlers), default='json',
                          help="results handler")
        self.add_argument('--chunk', '--chunk-size', dest='chunk_size',
                          type=int, default=1024,
                          help="chunk size in bytes for streaming")

    def parse_args(self, *args, **kw):
        options = argparse.ArgumentParser.parse_args(self, *args, **kw)
        self.validate(options)
        self.options = options
        return self.options

    def validate(self, options):
        """validate options"""

    def results_handler(self):
        """return appropriate results handler according to arguments

        Raises AssertionError if `parse_args` has not been called yet.
        """
        # explicit raise rather than `assert`, which `python -O` strips
        if self.options is None:
            raise AssertionError("`results_handler` must be called after arguments are parsed")

        # get kwargs for the chosen format
        fmt = self.options.format
        kwargs = {}
        if fmt in ('json', 'elasticsearch'):
            data = dict(self.options.variables)
            # only do hostname for now
            hostname = os.environ.get('HOSTNAME')
            if hostname:
                data['hostname'] = hostname
            kwargs['data'] = data

        # instantiate and return handler
        return results_handlers[fmt](self.options.url, **kwargs)


def main(args=None):
    """CLI: stream `url` and report one throughput measurement per chunk

    args -- command-line arguments; defaults to `sys.argv[1:]` read at call
            time rather than at import time
    """
    if args is None:
        args = sys.argv[1:]

    # parse CLI
    parser = SpeedtestParser()
    options = parser.parse_args(args)

    # setup output
    output = parser.results_handler()

    # start the request; `start` deliberately includes connection time
    import requests  # only needed once a download is actually performed
    start = time.time()
    response = requests.get(options.url, stream=True, timeout=30)

    # iterate through the response, releasing the connection when done
    try:
        chunks = response.iter_content(options.chunk_size)
        for sample in _samples(chunks, start):
            output.add(**sample)
    finally:
        response.close()

    # output
    output.write(options.output or sys.stdout)


if __name__ == '__main__':
    main()