view pyloader/factory.py @ 98:d59f0425d705 default tip

version bump
author Jeff Hammel <k0scist@gmail.com>
date Mon, 27 May 2024 16:20:52 -0700
parents 5fc59dfa7aec
children
line wrap: on
line source

#!/usr/bin/env python

"""
abstract factories
"""

import os
import sys
from . import cast
from . import loader
from copy import deepcopy
from optparse import OptionParser
try:
    # python 2
    from ConfigParser import InterpolationDepthError
    from ConfigParser import InterpolationMissingOptionError
    from ConfigParser import InterpolationSyntaxError
    from ConfigParser import SafeConfigParser as ConfigParser
except ImportError:
    # python 3
    from configparser import InterpolationDepthError
    from configparser import InterpolationMissingOptionError
    from configparser import InterpolationSyntaxError
    from configparser import SafeConfigParser as ConfigParser


string = (str,)


__all__ = ['CircularReferenceError', 'PyFactory', 'IniFactory']


class CircularReferenceError(Exception):
    """factory has detected a circular reference"""


class PyFactory(object):

    # to evaluate arguments as objects
    delimeters = ('%(', ')s')

    def __init__(self, config=None, main=''):
        self.main = main  # main section
        self.configure(config or {})

    def configure(self, config):
        """load a new configuration"""
        # TODO: this should really be a configuration update.  If you keep
        # track of all "apps" and their parents (i.e. as a ADG)
        # you should be able to update only relevent apps
        self.config = config
        self.seen = set() # already seen apps to note cyclic dependencies
        self.parsed = {}  # instantiated apps

    def load(self, name=None):
        """load an object"""

        name = name or self.main # load main section by default
        assert name in self.config, "'%s' not found in configuration"
        if name in self.parsed:
            return self.parsed[name]
        if name in self.seen:
            raise CircularReferenceError('Circular reference! : %s' % name)
        self.seen.add(name)

        # get section
        section = self.config[name]
        assert 'path' in section

        # load object
        obj = loader.load(section['path'])

        # get the object's arguments (if any)
        args = section.get('args', None)
        kwargs = section.get('kwargs', None)

        # if args and kwargs aren't there, you're done!
        if args is None and kwargs is None:
            self.parsed[name] = obj
            return obj

        # interpolate arguments
        if args:
            args = [self.interpolate(arg) for arg in args]
        else:
            args = []
        if kwargs:
            kwargs = dict([(key, self.interpolate(value))
                           for key, value in kwargs.items()])
        else:
            kwargs = {}

        # invoke
        self.parsed[name] = obj(*args, **kwargs)
        return self.parsed[name]

    def interpolate(self, value):

        # only interpolate strings
        if not isinstance(value, string):
            return value

        if value.startswith(self.delimeters[0]) and value.endswith(self.delimeters[1]):
            value = value[len(self.delimeters[0]):-len(self.delimeters[1])]
            if value in self.config:
                return self.load(value)
        return value


class IniFactory(PyFactory):
    """
    load a python object from an .ini file
    """

    def __init__(self, inifile, main=''):
        assert os.path.exists(inifile), "File not found: %s" % inifile
        self.inifile = inifile
        config = self.read(inifile)
        PyFactory.__init__(self, config, main)

    @classmethod
    def configuration(cls, iniconfig, **defaults):
        """interpret configuration from raw .ini syntax"""

        config = {}
        interpolated = set()
        seen = set()
        object_string = '%(object)s'

        # create a hash of section names
        names = {}
        for section in iniconfig:

            # sanity check
            assert ':' in section, "No : in section: %s" % section

            name = section.split(':',1)[0]
            names[name] = section

        def create_section(section, options):

            # split up the section identifier
            name, path = section.split(':', 1)

            # make a dict for the section
            sect = {}

            # interpret decorators
            if ':' in path:
                wrapper, _path = path.split(':', 1)
                # TODO: could interpolate wrapper
                if wrapper in names:

                    # inline wrapper arguments:
                    # [extended-fibonacci:@:four=4,five=5:fibonacci]
                    _wrapper_args = None
                    _wrapper_kwargs = None
                    if ':' in _path:
                        _wrapper_options, __path = _path.split(':', 1)
                        if ',' in _wrapper_options or '=' in _wrapper_options:
                            # ,= : tokens to ensure these are wrapper options
                            # as these shouldn't be found in a real path (dotted path or file path)
                            _wrapper_args, _wrapper_kwargs = cast.str2args(_wrapper_options)
                            _path = __path

                    if _path in names:
                        # [foo:bar:fleem]
                        wrapped_name = _path
                    else:
                        # stub value for "anonymous" name
                        # [foo:bar:%(here)s/objects.py:MyClass]
                        wrapped_name = section

                    # get wrapper options
                    if wrapper not in config:
                        # load wrapper configuration
                        wrapper_section = names[wrapper]
                        if wrapper_section in seen:
                            pass # TODO
                        create_section(wrapper_section, iniconfig[wrapper_section])
                    wrapper_options = deepcopy(config[wrapper])

                    # add inline wrapper args, kwargs
                    if _wrapper_args is not None:
                        if 'args' in wrapper_options:
                            wrapper_options['args'].extend(_wrapper_args)
                        else:
                            wrapper_options['args'] = _wrapper_args
                    if _wrapper_kwargs is not None:
                        if 'kwargs' in wrapper_options:
                            wrapper_options['kwargs'].update(_wrapper_kwargs)
                        else:
                            wrapper_options['kwargs'] = _wrapper_kwargs

                    # interpolate wrapper_options
                    def interpolate(option):
                        if option == object_string:
                            return '%(' + wrapped_name + ')s'
                        return option
                    if 'args' in wrapper_options:
                        args = wrapper_options['args'][:]
                        args = [interpolate(i) for i in args]
                        wrapper_options['args'] = args
                    if 'kwargs' in wrapper_options:
                        kwargs = wrapper_options['kwargs'].copy()
                        kwargs = dict([(i,interpolate(j)) for i, j in kwargs.items()])
                        wrapper_options['kwargs'] = kwargs

                    # create wrapper
                    config[name] = wrapper_options
                    if _path == wrapped_name:
                        return
                    name = wrapped_name
                    path = _path

            elif path in names:
                # override section: [foo:bar]
                if path not in config:
                    # load overridden section
                    overridden_section = names[path]
                    if overridden_section in seen:
                        pass # TODO
                    create_section(overridden_section, iniconfig[overridden_section])
                sect = deepcopy(config[path])

            if 'path' not in sect:
                # add the path to section dict
                path = path % defaults
                sect['path'] = path

            # get arguments from .ini options
            for option, value in options.items():
                if option == '.': # positional arguments
                    sect['args'] = cast.str2list(value)
                else: # keyword arguments
                    sect.setdefault('kwargs', {})[option] = value

            # set the configuration
            config[name] = sect

        # get the object definitions
        for section, options in iniconfig.items():
            seen.add(section)
            if section not in interpolated:
                create_section(section, options)
            interpolated.add(section)
                    
        return config
        
    @classmethod
    def read(cls, inifile):
        """reads configuration from an .ini file"""

        here = os.path.dirname(os.path.abspath(inifile))
        
        # read configuration
        defaults={'here': here,
                  'this': os.path.abspath(inifile)}
        parser = ConfigParser(defaults=defaults)
        parser.optionxform = str # use whole case
        parser.read(inifile)

        # parse configuration
        config = {}
        for section in parser.sections():

            config[section] = {}

            # read the options
            for option in parser.options(section):

                if option in parser.defaults():
                    # don't include the defaults
                    continue

                # try to interpolate the option
                # otherwise, use the raw value
                try:
                    value = parser.get(section, option)
                except (InterpolationMissingOptionError, InterpolationSyntaxError, InterpolationDepthError):
                    value = parser.get(section, option, raw=True)

                config[section][option] = value

        return cls.configuration(config, **parser.defaults())


def main(args=sys.argv[1:]):
    """command line entry point"""

    usage = '%prog file1.ini -arg1 -arg2 --key1=value1 --key2=value2'
    parser = OptionParser(usage=usage, description=IniFactory.__doc__)
    options, args = parser.parse_args(args)

    if len(args) != 1:
        parser.print_usage()
        parser.exit()

    factory = IniFactory(args[0])
    obj = factory.load()
    print (obj)


if __name__ == '__main__':
    main()