view bitsyauth/__init__.py @ 19:aac4a4ea70a7

whitespace
author Jeff Hammel <jhammel@mozilla.com>
date Thu, 17 Nov 2011 11:06:40 -0800
parents 814ed3208afa
children 9f4369b769d0
line wrap: on
line source

import markup
import random
import re
import sys

from cStringIO import StringIO
from markup.form import Form
from paste.auth import basic, cookie, digest, form, multi, auth_tkt
from webob import Request, Response, exc

try:
    from skimpyGimpy import skimpyAPI
    CAPTCHA = True
except ImportError:
    CAPTCHA = False

dictionary_file = '/usr/share/dict/american-english'

def random_word():
    """generate a random word for CAPTCHA auth"""
    min_length = 5 # minimum word length
    if not globals().has_key('dictionary'):
        # read the dictionary -- this may be platform dependent
        # XXX could use a backup dictionary
        _dictionary = file(dictionary_file).readlines()
        _dictionary = [ i.strip() for i in _dictionary ]
        _dictionary = [ i.lower() for i in _dictionary
                        if i.isalpha() and i > min_length ]
        globals()['dictionary'] = _dictionary
    return random.Random().choice(dictionary)

class BitsyAuthInnerWare(object):
    """inner auth;  does login checking"""

    def __init__(self, app, passwords, newuser=None, site=None, realm=None):
        """a simple reimplementation of auth
        * app: the WSGI app to be wrapped
        * passwords: callable that return a dictionary of {'user': 'password'}
        * newuser: callable to make a new user, taking name + pw
        * site: name of the site
        * realm: realm for HTTP digest authentication
        """

        self.app = app
        self.passwords = passwords
        self.site = site or ''
        self.realm = realm or self.site
        self.captcha = True
        self.urls = { 'login': '/login', 'join': '/join', }
        self.keys = {} # keys, words for CAPTCHA request

        self.content_type = { 'image_captcha': 'image/png',
                              'wav_captcha': 'audio/wav' }

        if newuser:
            self.newuser = newuser
        else:
            self.urls.pop('join') # don't do joining

        # WSGI app securely wrapped
        self.wrapped_app = self.security_wrapper()

        if not CAPTCHA:
            self.captcha = False
        
    ### WSGI/HTTP layer

    def __call__(self, environ, start_response):

        orig_environ = dict(environ)

        self.request = Request(environ)
        self.request.path_info = self.request.path_info.rstrip('/')
        
        self.redirect_to = '/' + self.request.script_name.lstrip('/')

        # URLs intrinsic to BitsyAuth
        if self.request.path_info == '/logout':
            response = self.redirect()
            return response(self.request.environ, start_response)

        if self.request.path_info in self.url_lookup():
            response = self.make_response()
            return response(self.request.environ, start_response)

        # digest auth
        if self.request.headers.has_key('Authorization'):
            return self.wrapped_app(self.request.environ, start_response)

        response = Request(orig_environ).get_response(self.app)

        # respond to 401s
        if response.status_int == 401: # Unauthorized
            if self.request.environ.get('REMOTE_USER'):
                return exc.HTTPForbidden()
            else:
                response = self.request.get_response(self.wrapped_app)

        user = self.request.environ.get('REMOTE_USER')
        if user:
            self.request.environ['paste.auth_tkt.set_user'](user)

        return response(self.request.environ, start_response)

    ### authentication function

    def digest_authfunc(self, environ, realm, user):
        return self.passwords()[user] # passwords stored in m5 digest

    def authfunc(self, environ, user, password):
        return self.hash(user, password) == self.passwords()[user]

    def hash(self, user, password):
        # use md5 digest for now
        return digest.digest_password(self.realm, user, password)

    def security_wrapper(self):
        """return the app securely wrapped"""

        multi_auth = multi.MultiHandler(self.app)

        # digest authentication
        multi_auth.add_method('digest', digest.middleware,
                              self.realm, self.digest_authfunc)
        multi_auth.set_query_argument('digest', key='auth')

        # form authentication
        template = self.login(wrap=True, action='%s')
        multi_auth.add_method('form', form.middleware, self.authfunc,
                              template=template)
        multi_auth.set_default('form')

        return multi_auth

        # might have to wrap cookie.middleware(BitsyAuth(multi(app))) ::shrug::
        return cookie.middleware(multi_auth)

    ### methods dealing with intrinsic URLs

    def url_lookup(self):
        retval = dict([ (value, key) for key, value
                        in self.urls.items() ])
        if self.captcha:
            retval.update(dict([(('/join/%s.png' % key), 'image_captcha')
                                for key in self.keys]))
        return retval
        
    def get_response(self, text, content_type='text/html'):
        res = Response(content_type=content_type, body=text)
        res.content_length = len(res.body)
        return res

    def make_response(self):
        url_lookup = self.url_lookup()
        path = self.request.path_info
        assert path in url_lookup

        # login and join shouldn't be accessible when logged in
        if self.request.environ.get('REMOTE_USER'):
            return self.redirect("You are already logged in")

        handler = url_lookup[path]
        function = getattr(self, handler)

        if self.request.method == 'GET':
            # XXX could/should do this with decorators            
            return self.get_response(function(wrap=True),
                                     content_type=self.content_type.get(handler,'text/html'))
        if self.request.method == 'POST':
            post_func = getattr(self, handler + "_post")
            errors = post_func()
            if errors:
                return self.get_response(function(errors=errors, wrap=True))
            else:
                location = self.request.POST.get('referer')
                return self.redirect("Welcome!", location=location)

    def redirect(self, message='', location=None):
        """redirect from instrinsic urls"""
        return exc.HTTPSeeOther(message, location=location or self.redirect_to)

    def image_captcha(self, wrap=True):
        """return data for the image"""
        key = self.request.path_info.split('/join/')[-1]
        key = int(key.split('.png')[0])
        return skimpyAPI.Png(self.keys[key], scale=3.0).data()
                
    ### forms and their display methods

    ### login

    def login_form(self, referer=None, action=None):
        form = Form(action=action or '', submit='Login')
        form.add_element('textfield', 'Name', input_name='username')
        form.add_element('password', 'Password', input_name='password')
        if referer:
            form.add_element('hidden', 'referer', value=referer)
        return form

    def login(self, errors=None, wrap=False, action=None):
        """login div"""
        referer = None
        if hasattr(self, 'request'):
            referer = self.request.referer
        form = self.login_form(action=action, referer=referer)
        join = self.urls.get('join')
        retval = form(errors)
        if join:        
            retval += '<br/>\n' + markup.a('join', href="%s" % join)
        retval = markup.div(retval)
        if wrap:
            title = 'login'
            if self.site:
                pagetitle = '%s - %s' % (title, self.site)
            retval = markup.wrap(markup.h1(title.title()) + retval,
                                 pagetitle=pagetitle)

        return retval

    def login_post(self):
        """handle a login POST request"""
        user = self.request.POST.get('username')
        password = self.request.POST.get('password')
        passwords = self.passwords()
        error = False
        if user not in passwords:
            error = True
        else:
            error = not self.authfunc(self.request.environ, user, password)
        if error:
            return { 'Name': 'Wrong username or password' }
        self.request.environ['REMOTE_USER'] = user
        self.request.environ['paste.auth_tkt.set_user'](user)

    ### join

    def captcha_pre(self, word, key):
        """CAPTCHA with pre-formatted text"""
        return skimpyAPI.Pre(word, scale=1.2).data()

    def captcha_png(self, word, key):
        """CAPTCHA with a PNG image"""
        return markup.image('/join/%s.png' % key)

    def join_form(self):
        captcha = ''
        if self.captcha:
            # data for CAPTCHA
            key = random.Random().randint(0, sys.maxint)
            word = random_word()

            self.keys[key] = word

            captcha = StringIO()

            captcha_text = "Please type the word below so I know you're not a computer:"
            captcha_help = "(please %s if the page is unreadable)" % markup.link('/join?captcha=image', 'go here')

            print >> captcha, markup.p('%s<br/> %s' % (captcha_text,
                                                       markup.i(captcha_help)))

            # determine type of CAPTCHA
            captchas = ' '.join(self.request.GET.getall('captcha'))
            if not captchas:
                captchas = 'pre'

            captcha_funcs=dict(pre=self.captcha_pre,
                               image=self.captcha_png,)
            captchas = [ captcha_funcs[i](word, key) for i in captchas.split()
                         if i in captcha_funcs ]
            captchas = '\n'.join([markup.p(i) for i in captchas])
            print >> captcha, captchas
            print >> captcha, markup.p(markup.input(None, **dict(name='captcha', type='text')))

            captcha = captcha.getvalue()

        form = Form(action=self.urls['join'], submit='Join', post_html=captcha)
        form.add_element('textfield', 'Name')
        form.add_password_confirmation()
        form.add_element('hidden', 'key', value=str(key))
        return form

    def join(self, errors=None, wrap=False):
        """join div or page if wrap"""
        form = self.join_form()
        retval = markup.div(form(errors))
        if wrap:
            pagetitle = title = 'join'
            if self.site:
                pagetitle = '%s - %s' % (title, self.site)
            if self.captcha:
                errors = errors or {}
                captcha_err = errors.get('CAPTCHA', '')
                if captcha_err:
                    captcha_err = markup.p(markup.em(captcha_err),
                                           **{'class': 'error'})
            retval = markup.wrap(markup.h1(title.title()) + captcha_err + retval,
                                 pagetitle=pagetitle)
        return retval

    def join_post(self):
        """handle a join POST request"""
        form = self.join_form()
        errors = form.validate(self.request.POST)

        # validate captcha
        if CAPTCHA:
            key = self.request.POST.get('key')
            try:
                key = int(key)
            except ValueError:
                key = None
            if not key:
                errors['CAPTCHA'] = 'Please type the funky looking word'
            word = self.keys.pop(key, None)
            if not word:
                errors['CAPTCHA'] = 'Please type the funky looking word'
            if word != self.request.POST.get('captcha','').lower():
                errors['CAPTCHA'] = 'Sorry, you typed the wrong word'

        name = self.request.POST.get('Name', '')
        if not name:
            if not errors.has_key('Name'):
                errors['Name'] = []
            errors['Name'].append('Please enter a user name')
        if name in self.passwords():
            if not errors.has_key('Name'):
                errors['Name'] = []
            errors['Name'].append('The name %s is already taken' % name)

        if not errors: # create a new user
            self.newuser(name,
                         self.hash(name, self.request.POST['Password']))
            self.request.environ['REMOTE_USER'] = name # login the new user
            self.request.environ['paste.auth_tkt.set_user'](name)

        return errors

class BitsyAuth(object):
    """outer middleware for auth;  does the cookie handling and wrapping"""

    def __init__(self, app, global_conf, passwords, newuser, site='', secret='secret'):
        self.app = app
        self.path = '/logout'
        self.cookie = '__ac'
        auth = BitsyAuthInnerWare(app, passwords=passwords, newuser=newuser, site=site)
        self.hash = auth.hash
        # paste.auth.cookie
        #        self.cookie_handler = cookie.middleware(auth, cookie_name=self.cookie, timeout=90) # minutes

        # paste.auth.auth_tkt
        self.cookie_handler = auth_tkt.make_auth_tkt_middleware(
            auth, global_conf, secret,
            cookie_name=self.cookie, logout_path='/logout')

    def __call__(self, environ, start_response):
        if environ['PATH_INFO'] == '/logout':
            pass
        try:
            return self.cookie_handler(environ, start_response)
        except auth_tkt.BadTicket:
            environ.pop('HTTP_COOKIE') # kill all cookies!  bad! XXX
            return self.cookie_handler(environ, start_response)
            return self.logout(environ, start_response)

    def logout(self, environ, start_response):
        req = Request(environ)
        keys = [ 'REMOTE_USER' ]
        #        keys = [ 'REMOTE_USER', 'AUTH_TYPE', 'paste.auth.cookie', 'paste.cookies', 'HTTP_COOKIE' ]  # XXX zealous kill
        for key in keys:
            req.environ.pop(key, None)

        body = '<html><head><title>logout</title></head><body>logout</body></html>'
        res = Response(content_type='text/html', body=body)
        res.content_length = len(res.body)
        req.cookies.pop(self.cookie, None)
        res.delete_cookie(self.cookie)
        res.unset_cookie(self.cookie)
        return res(environ, start_response)