changeset 140:372315b3bb8e

author Jeff Hammel <>
date Thu, 06 Nov 2014 07:41:05 -0800
parents 30abbd61ea5e
children c6aea14a3e2b
files configuration/ tests/
diffstat 2 files changed, 866 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/configuration/	Thu Nov 06 07:41:05 2014 -0800
@@ -0,0 +1,639 @@
+#!/usr/bin/env python
+unified configuration with serialization/deserialization
+# imports
+import argparse
+import copy
+import os
+import sys
+# imports for configuration providers
+    import json
+except ImportError:
+    try:
+        import simplejson as json
+    except ImportError:
+        json = None
+    import yaml
+except ImportError:
+    yaml = None
+# module contents
+__all__ = ['Configuration',
+           'configuration_providers',
+           'types',
+           'UnknownOptionException',
+           'MissingValueException',
+           'ConfigurationProviderException',
+           'TypeCastException',
+### exceptions
+class UnknownOptionException(Exception):
+    """exception raised when a non-configuration value is present in the configuration"""
+class MissingValueException(Exception):
+    """exception raised when a required value is missing"""
+class ConfigurationProviderException(Exception):
+    """exception raised when a configuration provider is missing, etc"""
+class TypeCastException(Exception):
+    """exception raised when a configuration item cannot be coerced to a type"""
+### configuration providers for serialization/deserialization
+configuration_providers = []
+class ConfigurationProvider(object):
+    """
+    abstract base class for configuration providers for
+    serialization/deserialization
+    """
+    def read(self, filename):
+        raise NotImplementedError("Abstract base class")
+    def write(self, config, filename):
+        if isinstance(filename, basestring):
+            f = file(filename, 'w')
+            newfile = True
+        else:
+            f = filename
+            newfile = False
+        try:
+            self._write(f, config)
+        finally:
+            if newfile:
+                f.close()
+    def _write(self, fp, config):
+        raise NotImplementedError("Abstract base class")
+if json:
+    class JSON(ConfigurationProvider):
+        indent = 2
+        extensions = ['json']
+        def read(self, filename):
+            return json.loads(file(filename).read())
+        def _write(self, fp, config):
+            fp.write(json.dumps(config, indent=self.indent, sort_keys=True))
+    configuration_providers.append(JSON())
+if yaml:
+    class YAML(ConfigurationProvider):
+        extensions = ['yml', 'yaml']
+        dump_args = {'default_flow_style': False}
+        def read(self, filename):
+            f = file(filename)
+            config = yaml.load(f)
+            f.close()
+            return config
+        def _write(self, fp, config):
+            fp.write(yaml.dump(config, **self.dump_args))
+            # TODO: could use templates to get order down, etc
+    configuration_providers.append(YAML())
+# TODO: add configuration providers
+# - for taking command-line arguments from a file
+# - for .ini files
+__all__.extend([i.__class__.__name__ for i in configuration_providers])
+### optparse interface
+#class ConfigurationOption(optparse.Option):
+#    """option that keeps track if it is seen"""
+#    # TODO: this should be configurable or something
+#    def take_action(self, action, dest, opt, value, values, parser):
+#        # switch on types
+#        formatter = getattr(parser, 'cli_formatter')
+#        if formatter:
+#            formatter = formatter(dest)
+#            if formatter:
+#                value = formatter(value)
+        # call the optparse front-end
+#        optparse.Option.take_action(self, action, dest, opt, value, values, parser)
+#        # add the parsed option to the set of things parsed
+#        if not hasattr(parser, 'parsed'):
+#            parser.parsed = dict()
+#        parser.parsed[dest] = value
+### plugins for option types
+class BaseCLI(object):
+    """base_cli for all option types"""
+    def __call__(self, name, value):
+        """return args, kwargs needed to instantiate an optparse option"""
+        args = value.get('flags', ['--%s' % name])
+        if not args:
+            # No CLI interface
+            return (), {}
+        kw = {'dest': name}
+        help = value.get('help', name)
+        if 'default' in value:
+            kw['default'] = value['default']
+            help += ' [DEFAULT: %(default)s]'
+        kw['help'] = help
+        kw['action'] = 'store'
+        return args, kw
+    def take_action(self, value):
+        return value
+class BoolCLI(BaseCLI):
+    def __call__(self, name, value):
+        # preserve the default values
+        help = value.get('help')
+        flags = value.get('flags')
+        args, kw = BaseCLI.__call__(self, name, value)
+        kw['help'] = help # reset
+        if value.get('default'):
+            kw['action'] = 'store_false'
+            if not flags:
+                args = ['--no-%s' % name]
+            if not help:
+                kw['help'] = 'disable {}'.format(name)
+        else:
+            kw['action'] = 'store_true'
+            if not help:
+                kw['help'] = 'enable {}'.format(name)
+        return args, kw
+class IntCLI(BaseCLI):
+    def __call__(self, name, value):
+        pass # TODO
+class ListCLI(BaseCLI):
+    def __call__(self, name, value):
+        args, kw = BaseCLI.__call__(self, name, value)
+        kw['nargs'] = '+'
+        return args, kw
+class DictCLI(ListCLI):
+    delimeter = '='
+    def __call__(self, name, value):
+        # optparse can't handle dict types OOTB
+        default = value.get('default')
+        if isinstance(default, dict):
+            value = copy.deepcopy(value)
+            value['default'] = default.items()
+        return ListCLI.__call__(self, name, value)
+    def take_action(self, value):
+        if self.delimeter not in value:
+            raise AssertionError("Each value must be delimited by '%s': %s" % (self.delimeter, value))
+        return value.split(self.delimeter, 1)
+# types of CLI arguments
+types = {bool:  BoolCLI(),
+         list:  ListCLI(),
+         dict:  DictCLI(),
+         str:   BaseCLI(),
+         None:  BaseCLI()} # default
+__all__ += [i.__class__.__name__ for i in types.values()]
+class Configuration(argparse.ArgumentParser):
+    """declarative configuration object"""
+    options = {}         # configuration basis definition
+    extend = set()       # which dicts/lists should be extended
+    load_option = 'load' # where to put the load option
+    load_help = "load configuration from a file"
+    @classmethod
+    def parse(cls, args, *_args, **_kwargs):
+        """get the resultant config dictionary in a single call"""
+        conf = cls(*_args, **_kwargs)
+        conf.parse_args(*_args, **_kwargs)
+        return conf.config
+    def __init__(self,
+                 configuration_providers=configuration_providers,
+                 types=types,
+                 load=None,
+                 dump='--dump',
+                 **parser_args):
+        # sanity check
+        if isinstance(self.options, dict):
+            self.option_dict = self.options
+        elif isinstance(self.options, (list, tuple)):
+            self.option_dict = dict(self.options)
+        else:
+            raise NotImplementedError("Configuration: `options` should be castable to a dict")
+        # setup configuration
+        self.config = {}
+        self.configuration_providers = configuration_providers
+        self.types = types
+        self.added = set() # set of items added to the configuration
+        if 'description' not in parser_args:
+            parser_args['description'] = getattr(self, '__doc__', '')
+#            if 'formatter' not in parser_args:
+#                class PlainDescriptionFormatter(optparse.IndentedHelpFormatter):
+#                    """description formatter for console script entry point"""
+#                    def format_description(self, description):
+#                        if description:
+#                            return description.strip() + '\n'
+#                        else:
+#                            return ''
+#                parser_args['formatter'] = PlainDescriptionFormatter()
+#        parser_args.setdefault('option_class', ConfigurationOption)
+        # setup commandline parser
+        argparse.ArgumentParser.__init__(self, **parser_args)
+        self.parsed = dict()
+        self.add_arguments(self)
+        # add option(s) for configuration_providers
+        if load:
+            self.add_argument(load,
+                              dest=self.load_option,
+                              nargs='+',
+                              help=self.load_help)
+        elif load is None:
+            self.add_argument(self.load_option, nargs='*',
+                              help=self.load_help)
+        # add an option for dumping
+        formats = self.formats()
+        if formats and dump:
+            if isinstance(dump, basestring):
+                dump = [dump]
+            dump = list(dump)
+            self.add_argument(*dump,
+                              **dict(dest='dump',
+                                     help="Output configuration file; Formats: {}".format(formats)))
+    ### iteration
+    def items(self):
+        """items in options"""
+        # TODO: make the class a real iterator
+        # allow options to be a list of 2-tuples
+        if isinstance(self.options, dict):
+            return self.options.items()
+        return self.options
+    ### methods for validating configuration
+    def check(self, config):
+        """
+        check validity of configuration to be added
+        """
+        # ensure options in configuration are in self.options
+        unknown_options = [i for i in config if i not in self.option_dict]
+        if unknown_options:
+            raise UnknownOptionException("Unknown options: {}".format(', '.join(unknown_options)))
+        # ensure options are of the right type (if specified)
+        for key, value in config.items():
+            _type = self.option_dict[key].get('type')
+            if _type is None and 'default' in self.option_dict[key]:
+                _type = type(self.option_dict[key]['default'])
+            if _type is not None:
+                tocast = True
+                try:
+                    if isinstance(value, _type):
+                        tocast = False
+                except TypeError:
+                    # type is a type-casting function, not a proper type
+                    pass
+                if tocast:
+                    try:
+                        config[key] = _type(value)
+                    except BaseException, e:
+                        raise TypeCastException("Could not coerce '%s'=%s, to type %s: %s" % (key, value, _type.__name__, e))
+        return config
+    def validate(self):
+        """validate resultant configuration"""
+        for key, value in self.items():
+            if key not in self.config:
+                required = value.get('required')
+                if required:
+                    if isinstance(required, basestring):
+                        required_message = required
+                    else:
+                        required_message = "Parameter {} is required but not present".format(key)
+                    # TODO: this should probably raise all missing values vs
+                    # one by one
+                    raise MissingValueException(required_message)
+    ### methods for adding configuration
+    def default_config(self):
+        """configuration defaults"""
+        defaults = {}
+        for key, value in self.items():
+            if 'default' in value:
+                defaults[key] = value['default']
+        return copy.deepcopy(defaults)
+    def get(self, key, default=None):
+        return self.config.get(key, default)
+    def __getitem__(self, key):
+        return self.config[key]
+    def __call__(self, *args):
+        """add items to configuration and check it"""
+        # start with defaults
+        self.config = self.default_config()
+        # add the configuration
+        for config in args:
+            self.add(config)
+        # validate total configuration
+        self.validate()
+        # return the configuration
+        return self.config
+    def add(self, config, check=True):
+        """update configuration: not undoable"""
+        # check config to be added
+        self.check(config)
+        # add the configuration
+        for key, value in config.items():
+            value = copy.deepcopy(value)
+            if key in self.extend and key in self.config:
+                type1 = type(self.config[key])
+                type2 = type(value)
+                assert type1 == type2 # XXX hack
+                if type1 == dict:
+                    self.config[key].update(value)
+                elif type1 == list:
+                    self.config[key].extend(value)
+                else:
+                    raise NotImplementedError
+            else:
+                self.config[key] = value
+            self.added.add(key)
+    ### methods for argparse
+    ### XXX could go in a subclass
+    def help_formatter(self, option):
+        if option in self.option_dict:
+            handler = self.types[self.option_type(option)]
+            return getattr(handler, 'take_action', lambda x: x)
+    def option_type(self, name):
+        """get the type of an option named `name`"""
+        value = self.option_dict[name]
+        if 'type' in value:
+            return value['type']
+        if 'default' in value:
+            default = value['default']
+            if default is None:
+                return None  # not <type 'NoneType'>
+            return type(value['default'])
+    def add_arguments(self, parser):
+        """add argparse arguments to an ArgumentParser instance"""
+        for key, value in self.items():
+            try:
+                handler = self.types[self.option_type(key)]
+            except KeyError:
+                # if an option can't be coerced to a type
+                # we should just not add a CLI handler for it
+                continue
+            args, kw = handler(key, value)
+            if not args:
+                # No CLI interface
+                continue
+            parser.add_argument(*args, **kw)
+    ### functions for loading configuration
+    def configuration_files(self, options):
+        """configuration files to read"""
+        configuration_files = getattr(options, self.load_option, [])
+        if isinstance(configuration_files, basestring):
+            configuration_files = [configuration_files]
+        return configuration_files
+    def load_configuration_file(self, filename):
+        """load a configuration file"""
+        return self.deserialize(filename)
+    def read_configuration_files(self, options):
+        """deserialize configuration"""
+        configuration_files = self.configuration_files(options)
+        # ensure all files are present
+        missing = [i for i in configuration_files
+                   if not os.path.exists(i)]
+        if missing:
+            self.error("Missing files: {}".format(', '.join(missing)))
+        # load configuration files
+        config = []
+        for f in configuration_files:
+            try:
+                loaded_config = self.load_configuration_file(f)
+                if loaded_config:
+                    config.append(loaded_config)
+            except BaseException, e:
+                parser.error(str(e))
+        return config
+    def parse_args(self, *args, **kw):
+        """"""
+        # parse command line options
+        self.parsed = dict()
+        options = argparse.ArgumentParser.parse_args(self, *args, **kw)
+        # get CLI configuration options
+        cli_config = dict([(key, value) for key, value in options.__dict__.items()
+                           if key in self.option_dict and key in self.parsed])
+        # deserialize configuration
+        config = self.read_configuration_files(options)
+        config.append(cli_config)
+        missingvalues = None
+        try:
+            # generate configuration
+            self(*config)
+        except MissingValueException, missingvalues:
+            # errors are handled below
+            pass
+        # dump configuration
+        self.dump(options, missingvalues)
+        # update options from config
+        options.__dict__.update(self.config)
+        # return parsed arguments
+        return options
+    def dump(self, options, missingvalues):
+        """dump configuration, if specified"""
+        if missingvalues:
+            self.error(str(missingvalues))
+        dump = getattr(options, 'dump')
+        if dump:
+            # TODO: have a way of specifying format other than filename
+            self.serialize(dump)
+    ### serialization/deserialization
+    def formats(self):
+        """formats for deserialization"""
+        retval = []
+        for provider in self.configuration_providers:
+            if provider.extensions and hasattr(provider, 'write'):
+                retval.append(provider.extensions[0])
+        return retval
+    def configuration_provider(self, format):
+        """configuration provider guess for a given filename"""
+        for provider in self.configuration_providers:
+            if format in provider.extensions:
+                return provider
+    def filename2format(self, filename):
+        extension = os.path.splitext(filename)[-1]
+        return extension.lstrip('.') or None
+    def serialize(self, filename, format=None, full=False):
+        """
+        serialize configuration to a file
+        - filename: path of file to serialize to
+        - format: format of configuration provider
+        - full: whether to serialize non-set optional strings [TODO]
+        """
+        # TODO: allow file object vs file name
+        if not format:
+            format = self.filename2format(filename)
+            if not format:
+                raise Exception('Please specify a format')
+                # TODO: more specific exception type
+        provider = self.configuration_provider(format)
+        if not provider:
+            raise Exception("Provider not found for format: %s" % format)
+        config = copy.deepcopy(self.config)
+        provider.write(config, filename)
+    def deserialize(self, filename, format=None):
+        """load configuration from a file"""
+        # TODO: allow file object vs file name
+        assert os.path.exists(filename)
+        # get the format
+        format = format or self.filename2format(filename)
+        # get the providers in some sensible order
+        providers = self.configuration_providers[:]
+        if format:
+            providers.sort(key=lambda x: int(format in x.extensions), reverse=True)
+        # deserialize the data
+        for provider in providers:
+            try:
+                return
+            except:
+                continue
+        else:
+            raise ConfigurationProviderException("Could not load {}".format(filename))
+class UserConfiguration(Configuration):
+    """`Configuration` class driven by a config file in user-space"""
+    # configuration items to interpolate as paths
+    # TODO: integrate with above BaseCLI
+    paths = []
+    def __init__(self, config=None, load='--config', **kwargs):
+        # default configuration file
+        self.config_name = config or '.' + os.path.splitext(sys.argv[0])[0]
+        self.default_config_file = os.path.join('~', self.config_name)
+        self.default_config_file_path = os.path.expanduser(self.default_config_file)
+        if os.path.exists(self.default_config_file_path):
+            self.load_help += ' [DEFAULT: %s]' % self.default_config_file
+        Configuration.__init__(self, load=load, **kwargs)
+    def validate(self):
+        Configuration.validate(self)
+        for path in self.paths:
+            self.config[path] = os.path.expanduser(self.config[path])
+    def configuration_files(self, options, args):
+        configuration_files = Configuration.configuration_files(self, options, args)
+        if not configuration_files:
+            # load user config only if no config provided
+            if os.path.exists(self.default_config_file_path):
+                configuration_files = [self.default_config_file_path]
+        return configuration_files
+    def load_configuration_file(self, filename):
+        config = Configuration.load_configuration_file(self, filename)
+        # ignore options that we don't care about
+        config = dict([(key, value) for key, value in config.items()
+                       if key in self.option_dict])
+        return config
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/	Thu Nov 06 07:41:05 2014 -0800
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+unit tests for configuration package
+import configuration
+import datetime
+import json
+import os
+import sys
+import tempfile
+import unittest
+from example import ExampleConfiguration # example configuration to test
+# globals
+here = os.path.dirname(os.path.abspath(__file__))
+class ConfigurationUnitTest(unittest.TestCase):
+    def test_cli(self):
+        """test command line interface"""
+        example = ExampleConfiguration()
+        # parse command line arguments
+        options, args = example.parse_args(['-a', 'ts', '--develop', '-e', '/home/jhammel/bin/firefox'])
+        # ensure that the options appropriately get set
+        self.assertEqual(bool(args), False) # no arguments
+        self.assertEqual(options.develop, True)
+        self.assertEqual(options.activeTests, ['ts'])
+        self.assertEqual(options.browser_path, '/home/jhammel/bin/firefox')
+        # ensure that the configuration appropriately gets updated
+        self.assertEqual(example.config['develop'], True)
+        self.assertEqual(example.config['activeTests'], ['ts'])
+        self.assertEqual(example.config['browser_path'], '/home/jhammel/bin/firefox')
+    def test_dict(self):
+        """test dictionary parsing from the "command line" """
+        # test adding a preference
+        example = ExampleConfiguration()
+        options, args = example.parse_args(['-a', 'ts', '--develop', '-e', '/home/jhammel/bin/firefox',
+                                            '--pref', 'hangmonitor.timeout=0'])
+        self.assertTrue('hangmonitor.timeout' in options.preferences)
+        # test overriding a preference
+        example = ExampleConfiguration()
+        options = example.parse_args(['-a', 'ts', '--develop', '-e', '/home/jhammel/bin/firefox',
+                                      '--pref', 'browser.bookmarks.max_backups=1'])
+        self.assertTrue(options.preferences['browser.bookmarks.max_backups'] == '1')
+    def test_configuration_providers(self):
+        """test file-based configuration providers"""
+        # requires json/simplejson to be installed
+        example = ExampleConfiguration()
+        # see what providers you got
+        json_provider = example.configuration_provider('json')
+        self.assertTrue(isinstance(json_provider, configuration.JSON))
+        # serialize to a temporary file
+        filename = tempfile.mktemp(suffix='.json')
+        self.assertEqual(example.filename2format(filename), 'json')
+        self.assertFalse(os.path.exists(filename))
+        config = {'browser_path': '/home/jhammel/bin/firefox',
+                  'activeTests':  ['ts']}
+        example(config)
+        config['test_timeout'] = 1200 # default
+        config['preferences'] = {"browser.bookmarks.max_backups": 0,
+                                 "browser.cache.disk.smart_size.enabled": False}
+        # ensure they are equal
+        self.assertEqual(config, example.config)
+        example.serialize(filename)
+        self.assertTrue(os.path.exists(filename))
+        serialized = json.loads(file(filename).read())
+        self.assertEqual(serialized, config)
+        # deserialize
+        deserialized = example.deserialize(filename)
+        self.assertEqual(deserialized, config)
+        # cleanup
+        if os.path.exists(filename):
+            os.remove(filename)
+    def test_missing_values(self):
+        """ensure that Configuration raises a missing value exception"""
+        example = ExampleConfiguration()
+        # monkey-patch the error method from optparse.OptionParser
+        error_msg = []
+        def error(msg):
+            error_msg.append(msg)
+        example.error = error
+        # trigger it
+        example.parse_args(args=[])
+        self.assertEqual(error_msg, ['Parameter browser_path is required but not present'])
+    def test_required(self):
+        """ensure you have to have required values"""
+        example = ExampleConfiguration()
+        # ensure you get an exception
+        missingvalueexception = None
+        try:
+            example()
+        except configuration.MissingValueException, e:
+            missingvalueexception = e
+        self.assertTrue(isinstance(e, configuration.MissingValueException))
+    def test_multiple_configurations(self):
+        """test having multiple configurations"""
+        # simple override
+        args1 = ['-e', '/opt/bin/firefox']
+        # simple serialized file
+        json_file = os.path.join(here, 'base.json')
+        assert os.path.exists(json_file)
+        json_config = json.loads(file(json_file).read())
+        # parse the json file
+        example = ExampleConfiguration()
+        example.parse_args([json_file])
+        self.assertEqual(example.config, json_config)
+        # parse the json file with overrides
+        example = ExampleConfiguration()
+        example.parse_args([json_file] + args1)
+        config = json_config.copy()
+        config['browser_path'] = '/opt/bin/firefox'
+        self.assertEqual(example.config, config)
+        # it shouldn't matter in which order the arguments are
+        example = ExampleConfiguration()
+        example.parse_args(args1 + [json_file])
+        self.assertEqual(example.config, config)
+        # Now a tricky case:
+        # the default value for test_timeout is 1200:
+        example = ExampleConfiguration()
+        self.assertEqual(example.options['test_timeout']['default'], 1200)
+        # The value from base.json is 60:
+        self.assertEqual(json_config['test_timeout'], 60)
+        self.assertEqual(config['test_timeout'], 60)
+        # but we can override it back from the "command line"
+        example.parse_args(args1 + [json_file, '--test_timeout', '1200'])
+        config['test_timeout'] = 1200
+        self.assertEqual(example.config, config)
+    def test_extend(self):
+        # default preferences
+        example = ExampleConfiguration()
+        default_prefs = {"browser.bookmarks.max_backups": 0,
+                         "browser.cache.disk.smart_size.enabled": False}
+        example.parse_args(['-a', 'ts', '-e', '/opt/bin/firefox'])
+        self.assertEqual(example.config['preferences'], default_prefs)
+        # now extend them
+        example = ExampleConfiguration()
+        default_prefs['network.dns.ipv4OnlyDomains'] = 'localhost'
+        tf = tempfile.mktemp()
+        f = file(tf, 'w')
+        f.write(json.dumps({'preferences': {'network.dns.ipv4OnlyDomains': 'localhost'}}))
+        f.close()
+        example.parse_args(['-a', 'ts', '-e', '/opt/bin/firefox', tf])
+        self.assertEqual(example.config['preferences'], default_prefs)
+        os.remove(tf)
+    def test_typecast(self):
+        """casting example"""
+        def todate(string):
+            return datetime.datetime.strptime(string, "%Y%m%d")
+        # make an example class
+        class TypecastExample(configuration.Configuration):
+            options = {'datestring': {'type': todate}}
+        example = TypecastExample()
+        # parse a date string
+        example({'datestring': "20120704"})
+        # ensure it works correctly
+        expected = datetime.datetime(2012, 7, 4, 0, 0)
+        self.assertEqual(example.config['datestring'], expected)
+    def test_added(self):
+        """test that we keep track of things added to the configuration"""
+        # make an example class
+        class AddedExample(configuration.Configuration):
+            options = {'foo': {},
+                       'bar': {}}
+        # parse it; there should be nothing
+        instance = AddedExample()
+        instance()
+        self.assertEqual(instance.added, set())
+        # parse it; there should be one thing
+        instance = AddedExample()
+        instance({'foo': 'foo'})
+        self.assertEqual(instance.added, set(['foo']))
+        # parse it; there should be two things
+        instance = AddedExample()
+        instance({'foo': 'foo'}, {'foo': 'FOO', 'bar': 'bar'})
+        self.assertEqual(instance.added, set(['foo', 'bar']))
+if __name__ == '__main__':
+    unittest.main()