view speedtest/speedtest.py @ 0:26e919a36f86 default tip

speedtest containerized dispatching software
author Jeff Hammel <k0scist@gmail.com>
date Thu, 09 Feb 2017 09:47:04 -0800
parents
children
line wrap: on
line source

#!/usr/bin/env python

"""
measure download speed of a URL
"""

# imports
import argparse
import csv
import json
import os
import requests
import sys
import time
from StringIO import StringIO

# tuple of the built-in text types, usable in isinstance() checks;
# Python 2 only (the name ``unicode`` does not exist on Python 3)
string = (str, unicode)

### library functions

def is_url(path):
    """return True if `path` contains a URL scheme separator ('://')"""
    scheme_separator = '://'
    return scheme_separator in path


### Results handling

class SpeedtestResults(object):
    """ABC for speedtest results

    Subclasses implement ``add`` (record one measurement) and ``body``
    (serialize all recorded measurements to text); ``write`` then sends
    that text to a file, a stream, or an HTTP endpoint.
    """

    columns = ['duration',  # seconds
               'size',      # bytes
               'speed',     # bytes/s
               'cumulative' # bytes/s
    ]
    # TODO:
    # speed and cumulative are derived quantities;
    # this ABC should calculate them

    def __init__(self, url, method='POST'):
        """
        url -- the URL whose download speed is being measured
        method -- HTTP method used by ``write_http`` when output is a URL
        """
        self.url = url
        self.rows = []
        self.method = method

    def add(self, duration, size, speed, cumulative):
        """add a particular measurement"""

        raise NotImplementedError("Abstract Base Class")

    def body(self):
        """results body"""
        # TODO: e.g. post-processing
        raise NotImplementedError("Abstract Base Class")

    def write(self, output):
        """serialize results with ``body`` and send them to ``output``

        output -- a path or URL string (anything containing '://' is sent
                  via ``write_http``; anything else is opened as a file and
                  overwritten), or a file-like object with ``write`` and
                  ``flush`` methods
        """

        # get body
        body = self.body()

        # write it!
        # ``string`` covers both ``str`` and ``unicode`` on Python 2, so a
        # unicode path or URL is not mistaken for a stream object
        if isinstance(output, string):
            if is_url(output):
                self.write_http(output, body)
            else:
                with open(output, 'w') as f:
                    f.write(body)
        else:
            output.write(body)
            output.flush()

    def write_http(self, url, data):
        """'write' to HTTP

        Sends ``data`` to ``url`` using ``self.method`` and prints the
        response text. It raises ``requests.HTTPError`` (via
        ``raise_for_status``) if the server returns an error status.
        """

        response = requests.request(self.method, url, data=data)
        print (response.text)
        response.raise_for_status()


class CSVResults(SpeedtestResults):
    """output CSV speedtest results

    Each measurement becomes one CSV row whose fields follow the order of
    ``columns``. No header row is written.
    """

    def add(self, duration, size, speed, cumulative):
        """record one measurement as a row ordered by ``self.columns``"""
        # Snapshot the arguments explicitly rather than calling ``locals()``
        # inside the comprehension. On Python 3 (before 3.12) a list
        # comprehension runs in its own scope, so ``locals()`` there does
        # not see the function arguments.
        measurement = {'duration': duration,
                       'size': size,
                       'speed': speed,
                       'cumulative': cumulative}
        self.rows.append([measurement[column] for column in self.columns])

    def body(self):
        """render all recorded rows as CSV text"""
        buffer = StringIO()
        writer = csv.writer(buffer)
        writer.writerows(self.rows)
        return buffer.getvalue()


class JSONResults(SpeedtestResults):
    """output JSON speedtest results

    Each measurement is one JSON object per line. Each object starts as a
    copy of the base record (``url`` plus any extra ``data``), and the
    measurement columns are then merged on top of it.
    """

    def __init__(self, url, data=None, method='POST'):
        """
        url -- URL under test; stored in every record under the key ``url``
        data -- optional mapping of extra key/value pairs added to every record
        method -- HTTP method used when the results are written to a URL
        """
        SpeedtestResults.__init__(self, url, method=method)
        self.base_data = {'url': self.url}
        self.base_data.update(data or {})

    def add(self, duration, size, speed, cumulative):
        """record one measurement merged over a copy of the base record"""
        data = self.base_data.copy()
        # explicit snapshot instead of ``locals()`` inside a comprehension,
        # which does not see the arguments on Python 3 before 3.12
        measurement = {'duration': duration,
                       'size': size,
                       'speed': speed,
                       'cumulative': cumulative}
        data.update((column, measurement[column]) for column in self.columns)
        self.rows.append(data)

    def body(self):
        """render the records as newline-separated JSON objects"""
        return '\n'.join(json.dumps(row) for row in self.rows)


class ElasticsearchResults(JSONResults):
    """output results in Elasticsearch bulk format

    Every record is preceded by an ``index`` action line with an empty
    body. The whole payload ends with a trailing newline.
    """

    def body(self):
        # bulk format reference:
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
        # The action line names no index or type, so both are expected to be
        # part of the URL the results are sent to.
        action_line = json.dumps({'index': {}})
        pairs = []
        for record in self.rows:
            pairs.append(action_line + '\n' + json.dumps(record))
        return '\n'.join(pairs) + '\n'


# maps each --format choice to the results handler class implementing it
results_handlers = {
    'csv': CSVResults,
    'json': JSONResults,
    'elasticsearch': ElasticsearchResults,
}


### CLI

class SpeedtestParser(argparse.ArgumentParser):
    """parser for speedtest

    Besides parsing, it keeps the last parsed options in ``self.options``.
    From those options, ``results_handler`` builds the matching results
    handler.
    """

    def __init__(self, **kwargs):
        kwargs.setdefault('description', __doc__)
        kwargs.setdefault('formatter_class',
                          argparse.ArgumentDefaultsHelpFormatter)
        argparse.ArgumentParser.__init__(self, **kwargs)
        self.add_arguments()
        self.options = None

    def keyvalue(self, string):
        """argparse ``type`` for ``key=value`` pairs

        Returns ``[key, value]``. It splits on the first '=' only, so values
        may themselves contain '='. It raises ``argparse.ArgumentTypeError``
        when no '=' is present; argparse turns that into a usage error that
        names the offending option.
        """
        if '=' not in string:
            raise argparse.ArgumentTypeError(
                "Expected '=' in {string}".format(string=string))
        return string.split('=', 1)

    def add_arguments(self):
        self.add_argument('url')
        self.add_argument('-o', '--output', dest='output',
                          help="output file or URL, or stdout by default")
        self.add_argument('--variables', dest='variables',
                          nargs='+', type=self.keyvalue, default=(),
                          help="additional key-value pairs to add to each record")
        # sorted so the choices are listed in a stable order in --help
        self.add_argument('--fmt', '--format', dest='format',
                          choices=sorted(results_handlers), default='json',
                          help="results handler")
        self.add_argument('--chunk', '--chunk-size', dest='chunk_size',
                          type=int, default=1024,
                          help="chunk size in bytes for streaming")

    def parse_args(self, *args, **kw):
        options = argparse.ArgumentParser.parse_args(self, *args, **kw)
        self.validate(options)
        self.options = options
        return self.options

    def validate(self, options):
        """validate options; exits via ``self.error`` on invalid values"""
        # a chunk size below one byte is meaningless for streaming; reject
        # it here instead of handing it to the download loop
        if options.chunk_size < 1:
            self.error("--chunk-size must be a positive integer, got {0}".format(
                options.chunk_size))

    def results_handler(self):
        """return appropriate results handler according to arguments

        For the ``json`` and ``elasticsearch`` formats, every record also gets
        the ``--variables`` pairs. If the ``HOSTNAME`` environment variable is
        set, it also gets a ``hostname`` field, which overrides any
        ``hostname`` given in ``--variables``. ``--variables`` is not used for
        ``csv``.

        Raises AssertionError if called before ``parse_args``.
        """

        # ensure that we've called `parse_args` to parse the arguments;
        # an explicit raise (not `assert`) keeps the message and survives -O
        if self.options is None:
            raise AssertionError("`results_handler` must be called after arguments are parsed")

        # get kwargs for the chosen format
        fmt = self.options.format
        kwargs = {}
        if fmt in ('json', 'elasticsearch'):
            data = dict(self.options.variables)
            # only do hostname for now
            hostname = os.environ.get('HOSTNAME')
            if hostname:
                data['hostname'] = hostname
            kwargs['data'] = data
        # instantiate and return handler
        return results_handlers[fmt](self.options.url, **kwargs)


def main(args=sys.argv[1:]):
    """CLI"""

    # parse CLI
    parser = SpeedtestParser()
    options = parser.parse_args(args)

    # setup output
    output = parser.results_handler()

    # start the request
    start = time.time()
    response = requests.get(options.url, stream=True, timeout=30)

    # iterate through the response
    size = 0
    last = start
    for chunk in response.iter_content(options.chunk_size):
        now = time.time()
        duration = now - start
        size += len(chunk)
        speed = len(chunk)/(now - last)
        cumulative = size/duration
        last = now

        # add results to output
        output.add(duration=duration,
                   size=size,
                   speed=int(speed),
                   cumulative=int(cumulative))

    # output
    output.write(options.output or sys.stdout)


# run the CLI only when executed as a script, not when imported
if __name__ == '__main__':
    main()