Mercurial > hg > bitsyauth
comparison bitsyauth/bitsyauth.py @ 0:284621b3effd
initial commit of bitsyauth, initially from bitsyblog
author | k0s <k0scist@gmail.com> |
---|---|
date | Mon, 02 Nov 2009 21:37:40 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:284621b3effd |
---|---|
1 import markup | |
2 import random | |
3 import re | |
4 import sys | |
5 | |
6 from cStringIO import StringIO | |
7 from markup.form import Form | |
8 from paste.auth import basic, cookie, digest, form, multi, auth_tkt | |
9 from webob import Request, Response, exc | |
10 | |
11 try: | |
12 from skimpyGimpy import skimpyAPI | |
13 CAPTCHA = True | |
14 except ImportError: | |
15 CAPTCHA = False | |
16 | |
# word list used for CAPTCHA words -- this path may be platform dependent
dictionary_file = '/usr/share/dict/american-english'

def random_word():
    """generate a random word for CAPTCHA auth

    The word list is read from ``dictionary_file`` the first time this is
    called and cached in the module-level global ``dictionary``; later
    calls reuse the cached list.  Only purely alphabetic words that are at
    least ``min_length`` characters long are kept, lower-cased.

    Raises IOError if ``dictionary_file`` cannot be read and IndexError if
    no word in the file qualifies.
    """
    min_length = 5 # minimum word length
    if 'dictionary' not in globals():
        # read the dictionary -- this may be platform dependent
        # XXX could use a backup dictionary
        wordfile = open(dictionary_file)
        try:
            lines = wordfile.readlines()
        finally:
            # always release the file handle, even if reading fails
            wordfile.close()
        words = [ line.strip() for line in lines ]
        # compare the *length* of the word against the minimum; comparing
        # the word itself to an integer never filters anything out
        globals()['dictionary'] = [ word.lower() for word in words
                                    if word.isalpha()
                                    and len(word) >= min_length ]
    return random.Random().choice(globals()['dictionary'])
31 | |
class BitsyAuthInnerWare(object):
    """inner auth; does login checking

    WSGI middleware that answers the login/join/CAPTCHA-image URLs itself,
    passes requests carrying an ``Authorization`` header (and requests the
    wrapped app answered with a 401) to a paste ``MultiHandler``, and
    records the authenticated user through the
    ``paste.auth_tkt.set_user`` callable found in the WSGI environ.

    NOTE(review): the current request is stored on ``self.request``, so an
    instance keeps per-request state -- confirm the deployment does not
    serve concurrent requests through one instance.
    """

    def __init__(self, app, passwords, newuser=None, site=None, realm=None):
        """a simple reimplementation of auth
        * app: the WSGI app to be wrapped
        * passwords: callable that return a dictionary of {'user': 'password'}
        * newuser: callable to make a new user, taking name + pw
        * site: name of the site
        * realm: realm for HTTP digest authentication
        """

        self.app = app
        self.passwords = passwords
        self.site = site or ''
        self.realm = realm or self.site
        self.captcha = bool(CAPTCHA) # CAPTCHA needs skimpyGimpy
        self.redirect_to = '/' # redirect to site root
        self.urls = { 'login': '/login', 'join': '/join', }
        self.keys = {} # keys, words for CAPTCHA request

        self.content_type = { 'image_captcha': 'image/png',
                              'wav_captcha': 'audio/wav' }

        if newuser:
            self.newuser = newuser
        else:
            self.urls.pop('join') # don't do joining

        # WSGI app securely wrapped
        self.wrapped_app = self.security_wrapper()

    ### WSGI/HTTP layer

    def __call__(self, environ, start_response):
        """WSGI entry point; returns the response iterable"""

        self.request = Request(environ)
        self.request.path_info = self.request.path_info.rstrip('/')
        environ = self.request.environ

        # URLs intrinsic to BitsyAuth
        if self.request.path_info == '/logout':
            response = self.redirect()
            return response(environ, start_response)

        if self.request.path_info in self.url_lookup():
            response = self.make_response()
            return response(environ, start_response)

        # digest auth
        if 'Authorization' in self.request.headers:
            return self.wrapped_app(environ, start_response)

        response = self.request.get_response(self.app)
        # respond to 401s
        if response.status_int == 401: # Unauthorized
            if environ.get('REMOTE_USER'):
                # already logged in, so logging in again will not help;
                # the exception is itself a WSGI app and must be invoked
                # so that a proper response iterable is returned
                response = exc.HTTPForbidden()
                return response(environ, start_response)
            response = self.request.get_response(self.wrapped_app)

        user = environ.get('REMOTE_USER')
        if user:
            environ['paste.auth_tkt.set_user'](user)

        return response(environ, start_response)

    ### authentication function

    def digest_authfunc(self, environ, realm, user):
        """return the stored digest for `user`, or None if unknown

        NOTE(review): presumably paste's digest middleware treats a false
        return value as a failed login -- confirm against paste.auth.digest
        """
        return self.passwords().get(user) # passwords stored in md5 digest

    def authfunc(self, environ, user, password):
        """return True if `password` is the right password for `user`"""
        stored = self.passwords().get(user)
        if stored is None:
            return False # unknown user: fail instead of raising KeyError
        return self.hash(user, password) == stored

    def hash(self, user, password):
        """return the digest under which the password is stored"""
        # use md5 digest for now
        return digest.digest_password(self.realm, user, password)

    def security_wrapper(self):
        """return the app securely wrapped"""

        multi_auth = multi.MultiHandler(self.app)

        # digest authentication
        multi_auth.add_method('digest', digest.middleware,
                              self.realm, self.digest_authfunc)
        multi_auth.set_query_argument('digest', key='auth')

        # form authentication
        template = self.login(wrap=True, action='%s')
        multi_auth.add_method('form', form.middleware, self.authfunc,
                              template=template)
        multi_auth.set_default('form')

        # might have to wrap cookie.middleware(BitsyAuth(multi(app))) ::shrug::
        return multi_auth

    ### methods dealing with intrinsic URLs

    def url_lookup(self):
        """return a dictionary mapping intrinsic URL paths to handler names"""
        retval = dict([ (path, handler) for handler, path
                        in self.urls.items() ])
        if self.captcha:
            # one image URL per outstanding CAPTCHA key
            for key in self.keys:
                retval['/join/%s.png' % key] = 'image_captcha'
        return retval

    def get_response(self, text, content_type='text/html'):
        """return a Response with `text` as its body"""
        res = Response(content_type=content_type, body=text)
        res.content_length = len(res.body)
        return res

    def make_response(self):
        """return the response for a request to an intrinsic URL"""
        path = self.request.path_info
        handler = self.url_lookup().get(path)
        if handler is None:
            # e.g. a CAPTCHA key that was used up since the URL was matched
            return exc.HTTPNotFound()

        # login and join shouldn't be accessible when logged in
        if self.request.environ.get('REMOTE_USER'):
            return self.redirect("You are already logged in")

        function = getattr(self, handler)

        if self.request.method == 'GET':
            # XXX could/should do this with decorators
            return self.get_response(function(wrap=True),
                                     content_type=self.content_type.get(handler,'text/html'))

        post_func = None
        allowed = 'GET'
        if self.request.method == 'POST':
            post_func = getattr(self, handler + "_post", None)
        if getattr(self, handler + "_post", None) is not None:
            allowed = 'GET, POST'
        if post_func is None:
            # unsupported method, or a handler (the CAPTCHA image) that
            # takes no POST: answer explicitly rather than returning None
            return exc.HTTPMethodNotAllowed(headers=[('Allow', allowed)])

        errors = post_func()
        if errors:
            return self.get_response(function(errors=errors, wrap=True))
        return self.redirect("Welcome!")

    def redirect(self, message=''):
        """redirect from instrinsic urls"""
        return exc.HTTPSeeOther(message, location=self.redirect_to)

    def image_captcha(self, wrap=True):
        """return data for the image"""
        # the path looks like /join/<key>.png
        key = self.request.path_info.split('/join/')[-1]
        key = int(key.split('.png')[0])
        return skimpyAPI.Png(self.keys[key], scale=3.0).data()

    ### forms and their display methods

    ### login

    def login_form(self, referer=None, action=None):
        """return the login Form

        * referer: if given, carried along in a hidden field
        * action: URL the form posts to; defaults to the login URL
        """
        if action is None:
            action = self.urls['login']
        # the action must reach the Form: security_wrapper passes '%s' so
        # that the rendered page can be used as a template
        form = Form(action=action, submit='Login')
        form.add_element('textfield', 'Name', input_name='username')
        form.add_element('password', 'Password', input_name='password')
        if referer is not None:
            form.add_element('hidden', 'referer', value=referer)
        return form

    def login(self, errors=None, wrap=False, action=None):
        """login div or page if wrap"""
        form = self.login_form(action=action)
        join = self.urls.get('join')
        retval = form(errors)
        if join:
            retval += '<br/>\n' + markup.a('join', href="%s" % join)
        retval = markup.div(retval)
        if wrap:
            # the page title falls back to the bare title without a site name
            pagetitle = title = 'login'
            if self.site:
                pagetitle = '%s - %s' % (title, self.site)
            retval = markup.wrap(markup.h1(title.title()) + retval,
                                 pagetitle=pagetitle)

        return retval

    def login_post(self):
        """handle a login POST request

        returns a dictionary of errors on failure, None on success
        """
        user = self.request.POST.get('username')
        password = self.request.POST.get('password')
        if not self.authfunc(self.request.environ, user, password):
            # same message for unknown user and wrong password
            return { 'Name': 'Wrong username or password' }
        self.request.environ['REMOTE_USER'] = user
        self.request.environ['paste.auth_tkt.set_user'](user)

    ### join

    def captcha_pre(self, word, key):
        """CAPTCHA with pre-formatted text"""
        return skimpyAPI.Pre(word, scale=1.2).data()

    def captcha_png(self, word, key):
        """CAPTCHA with a PNG image"""
        return markup.image('/join/%s.png' % key)

    def join_form(self):
        """return the join Form, with a fresh CAPTCHA if enabled

        NOTE(review): every call with CAPTCHA enabled stores a new
        key/word pair in self.keys, and pairs are only removed by
        join_post; the dictionary can grow without bound.
        """
        captcha = ''
        key = None # stays None when CAPTCHA is disabled
        if self.captcha:
            # data for CAPTCHA
            key = random.Random().randint(0, sys.maxint)
            word = random_word()

            self.keys[key] = word

            captcha_text = "Please type the word below so I know you're not a computer:"
            captcha_help = "(please %s if the page is unreadable)" % markup.link('/join?captcha=image', 'go here')

            pieces = [ markup.p('%s<br/> %s' % (captcha_text,
                                                markup.i(captcha_help))) ]

            # determine type of CAPTCHA
            captcha_funcs = dict(pre=self.captcha_pre,
                                 image=self.captcha_png,)
            requested = ' '.join(self.request.GET.getall('captcha')).split()
            requested = [ i for i in requested if i in captcha_funcs ]
            if not requested:
                # nothing (recognizable) asked for: show the default so
                # the form can always be completed
                requested = [ 'pre' ]

            captchas = [ captcha_funcs[i](word, key) for i in requested ]
            pieces.append('\n'.join([markup.p(i) for i in captchas]))

            pieces.append(markup.p(markup.input(None, **dict(name='captcha', type='text'))))

            # one piece per line
            captcha = ''.join([ '%s\n' % piece for piece in pieces ])

        form = Form(action=self.urls['join'], submit='Join', post_html=captcha)
        form.add_element('textfield', 'Name')
        form.add_password_confirmation()
        if key is not None:
            # the key ties the POST back to the CAPTCHA word
            form.add_element('hidden', 'key', value=str(key))
        return form

    def join(self, errors=None, wrap=False):
        """join div or page if wrap"""
        form = self.join_form()
        retval = markup.div(form(errors))
        if wrap:
            pagetitle = title = 'join'
            if self.site:
                pagetitle = '%s - %s' % (title, self.site)
            captcha_err = '' # nothing to show when CAPTCHA is disabled
            if self.captcha:
                captcha_err = (errors or {}).get('CAPTCHA', '')
                if captcha_err:
                    captcha_err = markup.p(markup.em(captcha_err),
                                           **{'class': 'error'})
            retval = markup.wrap(markup.h1(title.title()) + captcha_err + retval,
                                 pagetitle=pagetitle)
        return retval

    def join_post(self):
        """handle a join POST request

        returns a dictionary of errors; if it is empty the user has been
        created and logged in
        """
        form = self.join_form()
        errors = form.validate(self.request.POST)

        # validate captcha
        if self.captcha:
            try:
                key = int(self.request.POST.get('key'))
            except (TypeError, ValueError):
                key = None # missing or not a number
            word = None
            if key is not None:
                # each key may be tried once only
                word = self.keys.pop(key, None)
            if not word:
                errors['CAPTCHA'] = 'Please type the funky looking word'
            elif word != self.request.POST.get('captcha','').lower():
                errors['CAPTCHA'] = 'Sorry, you typed the wrong word'

        name = self.request.POST.get('Name', '')
        if not name:
            errors.setdefault('Name', []).append('Please enter a user name')
        if name in self.passwords():
            errors.setdefault('Name', []).append('The name %s is already taken' % name)

        if not errors: # create a new user
            self.newuser(name,
                         self.hash(name, self.request.POST['Password']))
            self.request.environ['REMOTE_USER'] = name # login the new user
            self.request.environ['paste.auth_tkt.set_user'](name)

        return errors
333 | |
class BitsyAuth(object):
    """outer middleware for auth; does the cookie handling and wrapping

    Wraps the app in BitsyAuthInnerWare and that in turn in paste's
    auth_tkt middleware, which is handed the cookie name and logout path.
    """

    def __init__(self, app, global_conf, passwords, newuser, site='', secret='secret'):
        """
        * app: the WSGI app to be wrapped
        * global_conf: passed through to make_auth_tkt_middleware
        * passwords: callable returning a dictionary of {'user': 'password'}
        * newuser: callable to make a new user, taking name + pw
        * site: name of the site
        * secret: secret passed to the auth_tkt middleware
        """
        # NOTE(review): the default secret is a well-known constant;
        # deployments should always pass their own
        self.app = app
        self.path = '/logout'
        self.cookie = '__ac'
        auth = BitsyAuthInnerWare(app, passwords=passwords, newuser=newuser, site=site)
        self.hash = auth.hash

        # paste.auth.auth_tkt
        self.cookie_handler = auth_tkt.make_auth_tkt_middleware(auth, global_conf, secret, cookie_name=self.cookie, logout_path=self.path)

    def __call__(self, environ, start_response):
        """WSGI entry point; everything is delegated to the cookie handler"""
        # the logout path was given to the auth_tkt middleware in
        # __init__, so it needs no special-casing here
        return self.cookie_handler(environ, start_response)

    def logout(self, environ, start_response=None):
        """forget the user and expire the auth cookie

        * environ: the WSGI environ of the request
        * start_response: optional WSGI start_response callable

        Returns the logout Response if start_response is not given,
        otherwise the result of running that response as a WSGI app.
        """
        req = Request(environ)
        keys = [ 'REMOTE_USER' ]
        for key in keys:
            req.environ.pop(key, None)

        body = '<html><head><title>logout</title></head><body>logout</body></html>'
        res = Response(content_type='text/html', body=body)
        res.content_length = len(res.body)
        req.cookies.pop(self.cookie, None)
        # only delete_cookie: a following unset_cookie would take the
        # expiring Set-Cookie header off the response again
        res.delete_cookie(self.cookie)
        if start_response is None:
            return res
        return res(environ, start_response)
368 |