Mercurial > hg > wsgintegrate
diff wsgintegrate/match.py @ 0:ec815b7cb142
initial commit of wsgintegrate
author | Jeff Hammel <jhammel@mozilla.com> |
---|---|
date | Tue, 07 Jun 2011 08:03:09 -0700 |
parents | |
children | 89047fd9ea8f |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/wsgintegrate/match.py Tue Jun 07 08:03:09 2011 -0700 @@ -0,0 +1,181 @@ +""" +utilities for matching requests +these are just a sample; you can add arbitrary match objects if desired +""" + +from webob import exc + +class RequestMatch(object): + """abstract base class for matching requests""" + + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + """WSGI app""" + if not self.condition(environ): + raise exc.HTTPNotFound + return self.app(environ, start_response) + + def condition(self, environ): + """match condition""" + return True + +class ConditionMatch(RequestMatch): + """generic environment-based condition-checker""" + # XXX unused + def __init__(self, app, condition): + RequestMatch.__init__(self, app) + self.condition = condition + +### aspect-based checkers + +class MatchPath(RequestMatch): + """match based on PATH INFO""" + + def __init__(self, app, path): + RequestMatch.__init__(self, app) + self.path = path + + def match(self, path): + if path.startswith(self.path): + # currently only matches with str.startswith; + # different types of matches could be considered + return self.path, path[len(self.path):] + + def condition(self, environ): # XXX unused + return self.match(environ['PATH_INFO']) is not None + + def __call__(self, environ, start_response): + match = self.match(environ['PATH_INFO']) + if match is None: + raise exc.HTTPNotFound + script_name, path_info = match + + # fix up the environment for downstream applications + environ['SCRIPT_NAME'] = script_name + environ['PATH_INFO'] = path_info + + return self.app(environ, start_response) + +class MatchMethod(RequestMatch): + """match based on request method""" + + def __init__(self, app, methods): + RequestMatch.__init__(self, app) + if isinstance(methods, basestring): + methods = methods.split() + self.methods = set(methods) + + def condition(self, environ): + return environ['REQUEST_METHOD'] in self.methods + +class MatchHost(RequestMatch): + """match based on the host and port""" + + def __init__(self, app, host, port=None): + RequestMatch.__init__(self, app) + self.host = host + self.port = port + def condition(self, environ): + host = environ['HTTP_HOST'] + port = None + if ':' in host: + host, port = host.rsplit(':', 1) + if self.port and port != self.port: + return False + return host == self.host + +class MatchAuthorized(RequestMatch): + """match if a user is authorized or not""" + def __init__(self, app, users=None): + RequestMatch.__init__(self, app) + self.authorized_users = users + def condition(self, environ): + raise NotImplementedError # TODO + +class MatchProtocol(RequestMatch): + """match a given protocol, i.e. http vs https://""" + def __init__(self, app, protocol): + self.protocol = protocol + RequestMatch.__init__(self, app) + def condition(self, environ): + raise NotImplementedError # TODO + +class MatchQueryString(RequestMatch): + """ + match a request based on if the right query string parameters are given + """ + def __init__(self, app, *tags, **kw): + self.app = app + self.tags = tags + self.kw = kw + def condition(self, environ): + raise NotImplementedError # TODO + +### logical checkers (currently unused) + +class AND(RequestMatch): + def __init__(self, app, condition1, condition2, *conditions): + RequestMatch.__init__(self, app) + self.conditions = [condition1, condition2] + self.conditions.extend(conditions) + def condition(self, environ): + for condition in self.conditions: + if isinstance(condition, RequestMatch): + if not condition.condition(environ): + return False + else: + if not condition(): + return False + return True + +class OR(RequestMatch): + def __init__(self, app, condition1, condition2, *conditions): + RequestMatch.__init__(self, app) + self.conditions = [condition1, condition2] + self.conditions.extend(conditions) + def condition(self, environ): + for condition in self.conditions: + if isinstance(condition, RequestMatch): + if condition.condition(environ): + return True + else: + if condition(): + return + return False + +# string accessible list of conditions +conditions = {'host': MatchHost, + 'method': MatchMethod, + 'path': MatchPath } + +class WrapApp(object): + """match string-based conditions""" + + def __init__(self, conditions=None): + self.conditions = conditions or globals()['conditions'] + + def __call__(self, app, *conditions, **kwargs): + """ + wrap an app in conditions + conditions should be a key, value 2-tuple of string, args; + kwargs should be a dictionary of unordered conditions, + likewise of the form string, args. + use *conditions if order is important, otherwise kwargs + """ + + # determine the condition + conditions = list(conditions) + if kwargs: + conditions.extend(kwargs.items()) + + # wrap the application + for condition, args in conditions: + assert condition in self.conditions, 'Condition "%s" unknown' % condition + app = self.conditions[condition](app, args) + return app + +# convenience invocation +wrap = WrapApp(conditions) +