# HG changeset patch # User Jeff Hammel # Date 1415288465 28800 # Node ID 372315b3bb8eb9764d50ca9185d409cc50248475 # Parent 30abbd61ea5e7c8e5a7ef37163ea9306444c4716 stubbing diff -r 30abbd61ea5e -r 372315b3bb8e configuration/configuration2.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/configuration/configuration2.py Thu Nov 06 07:41:05 2014 -0800 @@ -0,0 +1,639 @@ +#!/usr/bin/env python + +""" +unified configuration with serialization/deserialization +""" + +# imports +import argparse +import copy +import os +import sys + +# imports for configuration providers +try: + import json +except ImportError: + try: + import simplejson as json + except ImportError: + json = None +try: + import yaml +except ImportError: + yaml = None + +# module contents +__all__ = ['Configuration', + 'configuration_providers', + 'types', + 'UnknownOptionException', + 'MissingValueException', + 'ConfigurationProviderException', + 'TypeCastException', +] + + + +### exceptions + +class UnknownOptionException(Exception): + """exception raised when a non-configuration value is present in the configuration""" + +class MissingValueException(Exception): + """exception raised when a required value is missing""" + +class ConfigurationProviderException(Exception): + """exception raised when a configuration provider is missing, etc""" + +class TypeCastException(Exception): + """exception raised when a configuration item cannot be coerced to a type""" + + +### configuration providers for serialization/deserialization + +configuration_providers = [] + +class ConfigurationProvider(object): + """ + abstract base class for configuration providers for + serialization/deserialization + """ + def read(self, filename): + raise NotImplementedError("Abstract base class") + + def write(self, config, filename): + if isinstance(filename, basestring): + f = file(filename, 'w') + newfile = True + else: + f = filename + newfile = False + try: + self._write(f, config) + finally: + if newfile: + f.close() + def _write(self, fp, config): + raise NotImplementedError("Abstract base class") + +if json: + class JSON(ConfigurationProvider): + indent = 2 + extensions = ['json'] + def read(self, filename): + return json.loads(file(filename).read()) + def _write(self, fp, config): + fp.write(json.dumps(config, indent=self.indent, sort_keys=True)) + configuration_providers.append(JSON()) + +if yaml: + class YAML(ConfigurationProvider): + extensions = ['yml', 'yaml'] + dump_args = {'default_flow_style': False} + def read(self, filename): + f = file(filename) + config = yaml.load(f) + f.close() + return config + def _write(self, fp, config): + fp.write(yaml.dump(config, **self.dump_args)) + # TODO: could use templates to get order down, etc + + configuration_providers.append(YAML()) + +# TODO: add configuration providers +# - for taking command-line arguments from a file +# - for .ini files + +__all__.extend([i.__class__.__name__ for i in configuration_providers]) + + +### optparse interface + +#class ConfigurationOption(optparse.Option): +# """option that keeps track if it is seen""" +# # TODO: this should be configurable or something +# def take_action(self, action, dest, opt, value, values, parser): +# +# # switch on types +# formatter = getattr(parser, 'cli_formatter') +# if formatter: +# formatter = formatter(dest) +# if formatter: +# value = formatter(value) + + # call the optparse front-end +# optparse.Option.take_action(self, action, dest, opt, value, values, parser) + +# # add the parsed option to the set of things parsed +# if not hasattr(parser, 'parsed'): +# parser.parsed = dict() +# parser.parsed[dest] = value + + +### plugins for option types + +class BaseCLI(object): + """base_cli for all option types""" + + def __call__(self, name, value): + """return args, kwargs needed to instantiate an optparse option""" + + args = value.get('flags', ['--%s' % name]) + if not args: + # No CLI interface + return (), {} + + kw = {'dest': name} + help = value.get('help', name) + if 'default' in value: + kw['default'] = value['default'] + help += ' [DEFAULT: %(default)s]' + kw['help'] = help + kw['action'] = 'store' + return args, kw + + def take_action(self, value): + return value + + +class BoolCLI(BaseCLI): + + def __call__(self, name, value): + + # preserve the default values + help = value.get('help') + flags = value.get('flags') + + args, kw = BaseCLI.__call__(self, name, value) + kw['help'] = help # reset + if value.get('default'): + kw['action'] = 'store_false' + if not flags: + args = ['--no-%s' % name] + if not help: + kw['help'] = 'disable {}'.format(name) + else: + kw['action'] = 'store_true' + if not help: + kw['help'] = 'enable {}'.format(name) + return args, kw + +class IntCLI(BaseCLI): + + def __call__(self, name, value): + pass # TODO + +class ListCLI(BaseCLI): + + def __call__(self, name, value): + args, kw = BaseCLI.__call__(self, name, value) + kw['nargs'] = '+' + return args, kw + + +class DictCLI(ListCLI): + + delimeter = '=' + + def __call__(self, name, value): + + # optparse can't handle dict types OOTB + default = value.get('default') + if isinstance(default, dict): + value = copy.deepcopy(value) + value['default'] = default.items() + + return ListCLI.__call__(self, name, value) + + def take_action(self, value): + if self.delimeter not in value: + raise AssertionError("Each value must be delimited by '%s': %s" % (self.delimeter, value)) + return value.split(self.delimeter, 1) + +# types of CLI arguments +types = {bool: BoolCLI(), + list: ListCLI(), + dict: DictCLI(), + str: BaseCLI(), + None: BaseCLI()} # default + +__all__ += [i.__class__.__name__ for i in types.values()] + + +class Configuration(argparse.ArgumentParser): + """declarative configuration object""" + + options = {} # configuration basis definition + extend = set() # which dicts/lists should be extended + load_option = 'load' # where to put the load option + load_help = "load configuration from a file" + + @classmethod + def parse(cls, args, *_args, **_kwargs): + """get the resultant config dictionary in a single call""" + conf = cls(*_args, **_kwargs) + conf.parse_args(*_args, **_kwargs) + return conf.config + + def __init__(self, + configuration_providers=configuration_providers, + types=types, + load=None, + dump='--dump', + **parser_args): + + # sanity check + if isinstance(self.options, dict): + self.option_dict = self.options + elif isinstance(self.options, (list, tuple)): + self.option_dict = dict(self.options) + else: + raise NotImplementedError("Configuration: `options` should be castable to a dict") + + # setup configuration + self.config = {} + self.configuration_providers = configuration_providers + self.types = types + self.added = set() # set of items added to the configuration + + + if 'description' not in parser_args: + parser_args['description'] = getattr(self, '__doc__', '') +# if 'formatter' not in parser_args: +# class PlainDescriptionFormatter(optparse.IndentedHelpFormatter): +# """description formatter for console script entry point""" +# def format_description(self, description): +# if description: +# return description.strip() + '\n' +# else: +# return '' +# parser_args['formatter'] = PlainDescriptionFormatter() +# parser_args.setdefault('option_class', ConfigurationOption) + + # setup commandline parser + argparse.ArgumentParser.__init__(self, **parser_args) + self.parsed = dict() + self.add_arguments(self) + + # add option(s) for configuration_providers + if load: + self.add_argument(load, + dest=self.load_option, + nargs='+', + help=self.load_help) + elif load is None: + self.add_argument(self.load_option, nargs='*', + help=self.load_help) + + # add an option for dumping + formats = self.formats() + if formats and dump: + if isinstance(dump, basestring): + dump = [dump] + dump = list(dump) + self.add_argument(*dump, + **dict(dest='dump', + help="Output configuration file; Formats: {}".format(formats))) + + + ### iteration + + def items(self): + """items in options""" + # TODO: make the class a real iterator + + # allow options to be a list of 2-tuples + if isinstance(self.options, dict): + return self.options.items() + return self.options + + ### methods for validating configuration + + def check(self, config): + """ + check validity of configuration to be added + """ + + # ensure options in configuration are in self.options + unknown_options = [i for i in config if i not in self.option_dict] + if unknown_options: + raise UnknownOptionException("Unknown options: {}".format(', '.join(unknown_options))) + + # ensure options are of the right type (if specified) + for key, value in config.items(): + _type = self.option_dict[key].get('type') + if _type is None and 'default' in self.option_dict[key]: + _type = type(self.option_dict[key]['default']) + if _type is not None: + tocast = True + try: + if isinstance(value, _type): + tocast = False + except TypeError: + # type is a type-casting function, not a proper type + pass + if tocast: + try: + config[key] = _type(value) + except BaseException, e: + raise TypeCastException("Could not coerce '%s'=%s, to type %s: %s" % (key, value, _type.__name__, e)) + + return config + + def validate(self): + """validate resultant configuration""" + + for key, value in self.items(): + if key not in self.config: + required = value.get('required') + if required: + if isinstance(required, basestring): + required_message = required + else: + required_message = "Parameter {} is required but not present".format(key) + # TODO: this should probably raise all missing values vs + # one by one + raise MissingValueException(required_message) + + + ### methods for adding configuration + + def default_config(self): + """configuration defaults""" + defaults = {} + for key, value in self.items(): + if 'default' in value: + defaults[key] = value['default'] + return copy.deepcopy(defaults) + + def get(self, key, default=None): + return self.config.get(key, default) + + def __getitem__(self, key): + return self.config[key] + + def __call__(self, *args): + """add items to configuration and check it""" + + # start with defaults + self.config = self.default_config() + + # add the configuration + for config in args: + self.add(config) + + # validate total configuration + self.validate() + + # return the configuration + return self.config + + def add(self, config, check=True): + """update configuration: not undoable""" + + # check config to be added + self.check(config) + + # add the configuration + for key, value in config.items(): + value = copy.deepcopy(value) + if key in self.extend and key in self.config: + type1 = type(self.config[key]) + type2 = type(value) + assert type1 == type2 # XXX hack + if type1 == dict: + self.config[key].update(value) + elif type1 == list: + self.config[key].extend(value) + else: + raise NotImplementedError + else: + self.config[key] = value + self.added.add(key) + + + ### methods for argparse + ### XXX could go in a subclass + + def help_formatter(self, option): + if option in self.option_dict: + handler = self.types[self.option_type(option)] + return getattr(handler, 'take_action', lambda x: x) + + def option_type(self, name): + """get the type of an option named `name`""" + + value = self.option_dict[name] + if 'type' in value: + return value['type'] + if 'default' in value: + default = value['default'] + if default is None: + return None # not + return type(value['default']) + + def add_arguments(self, parser): + """add argparse arguments to an ArgumentParser instance""" + + for key, value in self.items(): + try: + handler = self.types[self.option_type(key)] + except KeyError: + # if an option can't be coerced to a type + # we should just not add a CLI handler for it + continue + args, kw = handler(key, value) + if not args: + # No CLI interface + continue + parser.add_argument(*args, **kw) + + + ### functions for loading configuration + + def configuration_files(self, options): + """configuration files to read""" + + configuration_files = getattr(options, self.load_option, []) + if isinstance(configuration_files, basestring): + configuration_files = [configuration_files] + return configuration_files + + def load_configuration_file(self, filename): + """load a configuration file""" + return self.deserialize(filename) + + def read_configuration_files(self, options): + """deserialize configuration""" + + configuration_files = self.configuration_files(options) + + # ensure all files are present + missing = [i for i in configuration_files + if not os.path.exists(i)] + if missing: + self.error("Missing files: {}".format(', '.join(missing))) + + # load configuration files + config = [] + for f in configuration_files: + try: + loaded_config = self.load_configuration_file(f) + if loaded_config: + config.append(loaded_config) + except BaseException, e: + parser.error(str(e)) + return config + + def parse_args(self, *args, **kw): + """""" + + # parse command line options + self.parsed = dict() + options = argparse.ArgumentParser.parse_args(self, *args, **kw) + + # get CLI configuration options + cli_config = dict([(key, value) for key, value in options.__dict__.items() + if key in self.option_dict and key in self.parsed]) + + # deserialize configuration + config = self.read_configuration_files(options) + config.append(cli_config) + + missingvalues = None + try: + # generate configuration + self(*config) + except MissingValueException, missingvalues: + # errors are handled below + pass + + # dump configuration + self.dump(options, missingvalues) + + # update options from config + options.__dict__.update(self.config) + + # return parsed arguments + return options + + def dump(self, options, missingvalues): + """dump configuration, if specified""" + + if missingvalues: + self.error(str(missingvalues)) + + dump = getattr(options, 'dump') + if dump: + # TODO: have a way of specifying format other than filename + self.serialize(dump) + + + ### serialization/deserialization + + def formats(self): + """formats for deserialization""" + retval = [] + for provider in self.configuration_providers: + if provider.extensions and hasattr(provider, 'write'): + retval.append(provider.extensions[0]) + return retval + + def configuration_provider(self, format): + """configuration provider guess for a given filename""" + for provider in self.configuration_providers: + if format in provider.extensions: + return provider + + def filename2format(self, filename): + extension = os.path.splitext(filename)[-1] + return extension.lstrip('.') or None + + def serialize(self, filename, format=None, full=False): + """ + serialize configuration to a file + - filename: path of file to serialize to + - format: format of configuration provider + - full: whether to serialize non-set optional strings [TODO] + """ + # TODO: allow file object vs file name + + if not format: + format = self.filename2format(filename) + if not format: + raise Exception('Please specify a format') + # TODO: more specific exception type + + provider = self.configuration_provider(format) + if not provider: + raise Exception("Provider not found for format: %s" % format) + + config = copy.deepcopy(self.config) + + provider.write(config, filename) + + def deserialize(self, filename, format=None): + """load configuration from a file""" + # TODO: allow file object vs file name + + assert os.path.exists(filename) + + # get the format + format = format or self.filename2format(filename) + + # get the providers in some sensible order + providers = self.configuration_providers[:] + if format: + providers.sort(key=lambda x: int(format in x.extensions), reverse=True) + + # deserialize the data + for provider in providers: + try: + return provider.read(filename) + except: + continue + else: + raise ConfigurationProviderException("Could not load {}".format(filename)) + + +class UserConfiguration(Configuration): + """`Configuration` class driven by a config file in user-space""" + + # configuration items to interpolate as paths + # TODO: integrate with above BaseCLI + paths = [] + + def __init__(self, config=None, load='--config', **kwargs): + + # default configuration file + self.config_name = config or '.' + os.path.splitext(sys.argv[0])[0] + self.default_config_file = os.path.join('~', self.config_name) + self.default_config_file_path = os.path.expanduser(self.default_config_file) + if os.path.exists(self.default_config_file_path): + self.load_help += ' [DEFAULT: %s]' % self.default_config_file + + Configuration.__init__(self, load=load, **kwargs) + + def validate(self): + Configuration.validate(self) + for path in self.paths: + self.config[path] = os.path.expanduser(self.config[path]) + + def configuration_files(self, options, args): + configuration_files = Configuration.configuration_files(self, options, args) + if not configuration_files: + # load user config only if no config provided + if os.path.exists(self.default_config_file_path): + configuration_files = [self.default_config_file_path] + return configuration_files + + def load_configuration_file(self, filename): + config = Configuration.load_configuration_file(self, filename) + + # ignore options that we don't care about + config = dict([(key, value) for key, value in config.items() + if key in self.option_dict]) + + return config diff -r 30abbd61ea5e -r 372315b3bb8e tests/unit2.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/unit2.py Thu Nov 06 07:41:05 2014 -0800 @@ -0,0 +1,227 @@ +#!/usr/bin/env python + +""" +unit tests for configuration package +""" + +import configuration +import datetime +import json +import os +import sys +import tempfile +import unittest + +from example import ExampleConfiguration # example configuration to test + +# globals +here = os.path.dirname(os.path.abspath(__file__)) + +class ConfigurationUnitTest(unittest.TestCase): + + def test_cli(self): + """test command line interface""" + + example = ExampleConfiguration() + + # parse command line arguments + options, args = example.parse_args(['-a', 'ts', '--develop', '-e', '/home/jhammel/bin/firefox']) + + # ensure that the options appropriately get set + self.assertEqual(bool(args), False) # no arguments + self.assertEqual(options.develop, True) + self.assertEqual(options.activeTests, ['ts']) + self.assertEqual(options.browser_path, '/home/jhammel/bin/firefox') + + # ensure that the configuration appropriately gets updated + self.assertEqual(example.config['develop'], True) + self.assertEqual(example.config['activeTests'], ['ts']) + self.assertEqual(example.config['browser_path'], '/home/jhammel/bin/firefox') + + def test_dict(self): + """test dictionary parsing from the "command line" """ + + + # test adding a preference + example = ExampleConfiguration() + options, args = example.parse_args(['-a', 'ts', '--develop', '-e', '/home/jhammel/bin/firefox', + '--pref', 'hangmonitor.timeout=0']) + self.assertTrue('hangmonitor.timeout' in options.preferences) + + # test overriding a preference + example = ExampleConfiguration() + options = example.parse_args(['-a', 'ts', '--develop', '-e', '/home/jhammel/bin/firefox', + '--pref', 'browser.bookmarks.max_backups=1']) + self.assertTrue(options.preferences['browser.bookmarks.max_backups'] == '1') + + + def test_configuration_providers(self): + """test file-based configuration providers""" + # requires json/simplejson to be installed + + example = ExampleConfiguration() + + # see what providers you got + json_provider = example.configuration_provider('json') + self.assertTrue(isinstance(json_provider, configuration.JSON)) + + # serialize to a temporary file + filename = tempfile.mktemp(suffix='.json') + self.assertEqual(example.filename2format(filename), 'json') + self.assertFalse(os.path.exists(filename)) + config = {'browser_path': '/home/jhammel/bin/firefox', + 'activeTests': ['ts']} + example(config) + config['test_timeout'] = 1200 # default + config['preferences'] = {"browser.bookmarks.max_backups": 0, + "browser.cache.disk.smart_size.enabled": False} + + # ensure they are equal + self.assertEqual(config, example.config) + example.serialize(filename) + self.assertTrue(os.path.exists(filename)) + serialized = json.loads(file(filename).read()) + self.assertEqual(serialized, config) + + # deserialize + deserialized = example.deserialize(filename) + self.assertEqual(deserialized, config) + + # cleanup + if os.path.exists(filename): + os.remove(filename) + + def test_missing_values(self): + """ensure that Configuration raises a missing value exception""" + + example = ExampleConfiguration() + + # monkey-patch the error method from optparse.OptionParser + error_msg = [] + def error(msg): + error_msg.append(msg) + example.error = error + + # trigger it + example.parse_args(args=[]) + self.assertEqual(error_msg, ['Parameter browser_path is required but not present']) + + def test_required(self): + """ensure you have to have required values""" + + example = ExampleConfiguration() + + # ensure you get an exception + missingvalueexception = None + try: + example() + except configuration.MissingValueException, e: + missingvalueexception = e + self.assertTrue(isinstance(e, configuration.MissingValueException)) + + + def test_multiple_configurations(self): + """test having multiple configurations""" + + # simple override + args1 = ['-e', '/opt/bin/firefox'] + + # simple serialized file + json_file = os.path.join(here, 'base.json') + assert os.path.exists(json_file) + json_config = json.loads(file(json_file).read()) + + # parse the json file + example = ExampleConfiguration() + example.parse_args([json_file]) + self.assertEqual(example.config, json_config) + + # parse the json file with overrides + example = ExampleConfiguration() + example.parse_args([json_file] + args1) + config = json_config.copy() + config['browser_path'] = '/opt/bin/firefox' + self.assertEqual(example.config, config) + + # it shouldn't matter in which order the arguments are + example = ExampleConfiguration() + example.parse_args(args1 + [json_file]) + self.assertEqual(example.config, config) + + # Now a tricky case: + # the default value for test_timeout is 1200: + example = ExampleConfiguration() + self.assertEqual(example.options['test_timeout']['default'], 1200) + # The value from base.json is 60: + self.assertEqual(json_config['test_timeout'], 60) + self.assertEqual(config['test_timeout'], 60) + # but we can override it back from the "command line" + example.parse_args(args1 + [json_file, '--test_timeout', '1200']) + config['test_timeout'] = 1200 + self.assertEqual(example.config, config) + + def test_extend(self): + + # default preferences + example = ExampleConfiguration() + default_prefs = {"browser.bookmarks.max_backups": 0, + "browser.cache.disk.smart_size.enabled": False} + example.parse_args(['-a', 'ts', '-e', '/opt/bin/firefox']) + self.assertEqual(example.config['preferences'], default_prefs) + + # now extend them + example = ExampleConfiguration() + default_prefs['network.dns.ipv4OnlyDomains'] = 'localhost' + tf = tempfile.mktemp() + f = file(tf, 'w') + f.write(json.dumps({'preferences': {'network.dns.ipv4OnlyDomains': 'localhost'}})) + f.close() + example.parse_args(['-a', 'ts', '-e', '/opt/bin/firefox', tf]) + self.assertEqual(example.config['preferences'], default_prefs) + os.remove(tf) + + def test_typecast(self): + """casting example""" + + def todate(string): + return datetime.datetime.strptime(string, "%Y%m%d") + + # make an example class + class TypecastExample(configuration.Configuration): + options = {'datestring': {'type': todate}} + example = TypecastExample() + + # parse a date string + example({'datestring': "20120704"}) + + # ensure it works correctly + expected = datetime.datetime(2012, 7, 4, 0, 0) + self.assertEqual(example.config['datestring'], expected) + + + def test_added(self): + """test that we keep track of things added to the configuration""" + + # make an example class + class AddedExample(configuration.Configuration): + options = {'foo': {}, + 'bar': {}} + + # parse it; there should be nothing + instance = AddedExample() + instance() + self.assertEqual(instance.added, set()) + + # parse it; there should be one thing + instance = AddedExample() + instance({'foo': 'foo'}) + self.assertEqual(instance.added, set(['foo'])) + + # parse it; there should be two things + instance = AddedExample() + instance({'foo': 'foo'}, {'foo': 'FOO', 'bar': 'bar'}) + self.assertEqual(instance.added, set(['foo', 'bar'])) + + +if __name__ == '__main__': + unittest.main()