- attachment:2fa.py of OneTimePasswords
Attachment '2fa.py'
Download 1 """Module to implement two-factor autentication using
2 rfc4226 HOTP or rfc6238 TOTP on your tracker.
3
4 Templating functions including QR code generator and two
5 actions:
6
7 * login using TOTP/HOTP
8 * Manage TOTP/HOTP - enroll using HOTP or TOTP, unenroll 2FA
9
10
11 Configuration is done using the extensions/config.ini file.
12 Settings with default values:
13
14 [HOTP]
15
16 # Presence of the key disables this mechanism. User will
17 # not be able to enroll using the mechanism. They will be
18 # able to login using the method unless the value is
19 # 'login_deny' (no quotes). This will tell the user to see
20 # an administrator.
21 disable =
22
23 # If the counter on the token generator and the tracker get
24 # out of sync, increment the counter up to window times to
25 # try to match the token the user entered.
26 window = 10
27
28 # Number of digits in the OTP token. 6 or 8 are usually supported by
29 # authenticators.
30 length = 6
31
32 # Name for this tracker used for issuer in the QR code.
33 # If empty, the string before the last / in the TRACKER_WEB
34 # config.ini setting.
35 issuer =
36
37 [TOTP]
38
39 # Same as HOTP.
40 disable =
41
42 # If the time sync on the token generator and this server
43 # are out of sync calculate the token for window 30 second
44 # periods before and after now. 0 means the user must
45 # generate and the server must process the token within the
46 # current 30 second window.
47 window = 0
48
49 # Same as HOTP.
50 length = 6
51
52 # Same as HOTP.
53 issuer =
54
55 """
56
57 from roundup.anypy.strings import b2s
58 from roundup.anypy import urllib_
59 from roundup.i18n import _
60
61 import logging
62
63 # import for generator
64 from roundup.cgi.actions import EditItemAction
65 from roundup.configuration import InvalidOptionError
66
67 import random
68
69 # imports for LoginAction replacement
70 from roundup.cgi.actions import LoginAction
71 from roundup.cgi.exceptions import Redirect, LoginError
72
73 import cgi
74 import math
75 import re
76 import time
77
78 # If this fails OTP doesn't work at all.
79 # So fail on import rather than at runtime.
80 from onetimepass import get_hotp, valid_hotp, valid_totp
81
82 try:
83 # Hopefully this works for python2
84 from StringIO import cStringIO as IOBuff
85 except ImportError:
86 # python 3
87 from io import StringIO as IOBuff
88
89 try:
90 # qrcode only supports python 3
91 import qrcode
92 import qrcode.image.svg
93 except ImportError:
94 qrcode = None
95
96 logger = logging.getLogger('extension')
97
98
99 def chunk_text(text_or_password, chunksize=4, errormsg=None):
100 """ Template function to chunk text by breaking one of
101 three types of text source:
102
103 plain text
104
105 HTML form field with _value
106
107 Password hyperdb class with _value.password
108 """
109 if not errormsg:
110 errormsg = _("No value present.")
111
112 if not isinstance(text_or_password, str):
113 try:
114 text = text_or_password._value.password
115 except AttributeError:
116 text = text_or_password._value
117 else:
118 text = text_or_password
119
120 if text == '-':
121 return errormsg
122
123 return ' '.join([text[x*chunksize:x*chunksize+chunksize] for x in
124 range(0, math.ceil(len(text)/chunksize))])
125
126
127 def format_current_count(count):
128 if count == -1:
129 return _("2FA disabled. No counter.")
130 if count == -2:
131 return _("Using time based 2FA. Counter unused.")
132 return count
133
134
135 def hotpIntegrityValue(user_context):
136 '''Return the OTP for counter value 0. Google Authenticator
137 uses this as the Integrity Check value for the key.
138 The first OTP that will be accepted starts at counter
139 value 1.
140 '''
141
142 if user_context.secret_counter._value == -2:
143 return _("Using time based 2FA. Check value not available.")
144
145 try:
146 key = user_context.secret_2fa._value.password
147 except AttributeError:
148 key = user_context.secret_2fa._value
149
150 if key == '-':
151 return _("2FA disabled. Check value not available.")
152 value = str(get_hotp(key, 0))
153 return chunk_text(value.rjust(6, '0'), chunksize=3)
154
155
156 def qrify(user_context, output="svg"):
157 '''Generate a QR code to enroll the user using Google
158 Authenticator and other client apps.
159 The QR code is a special url to enroll the user.
160 We set the following info in the QR:
161
162 identity: issuer:username@TRACKER_NAME
163 issuer: last element of TRACKER_WEB, HOTP_ISSUER or
164 TOTP_ISSUER from extensions/config.ini
165 All values are url encoded.
166 Generates an svg or ascii qr code. Default output=svg.
167 Ascii is useful for text browsers.
168 '''
169
170 if qrcode is None:
171 return _("QR code is not available.")
172
173 if user_context.secret_2fa._value == '-':
174 return _("No secret key is set.")
175
176 def get_ext_setting(setting, default=None):
177 try:
178 return user_context._db.config.ext[setting]
179 except InvalidOptionError:
180 return default
181
182 counter = int(user_context.secret_counter._value)
183
184 # can be a Password or web text item.
185 try:
186 secret = urllib_.quote(user_context.secret_2fa._value.password)
187 except AttributeError:
188 secret = urllib_.quote(user_context.secret_2fa._value)
189
190 user = urllib_.quote(user_context.username._value)
191 web_url = urllib_.quote(user_context._db.config['TRACKER_WEB'])
192
193 if user_context.secret_counter == -2:
194 # TOTP
195 issuer = urllib_.quote(get_ext_setting('TOTP_ISSUER', ''))
196 else:
197 issuer = urllib_.quote(get_ext_setting('HOTP_ISSUER', ''))
198
199 if not issuer:
200 # take last component of / terminated web url
201 issuer = urllib_.quote(web_url.split('/')[-2])
202 tracker = urllib_.quote(user_context._db.config['TRACKER_NAME'])
203
204 # neither sissuer nor user can have a ':'
205 issuer.replace(':', '_')
206 user.replace(':', '_')
207
208 # https://github.com/google/google-authenticator/wiki/Key-Uri-Format
209 if counter == 0: # HOTP
210 url = (("otpauth://hotp/%(issuer)s%%3A%(user)s@%(tracker)s?"
211 "secret=%(secret)s&issuer=%(issuer)s&counter=0") %
212 locals())
213 else: # counter = -2 TOTP
214 url = (("otpauth://totp/%(issuer)s%%3A%(user)s@%(tracker)s?"
215 "secret=%(secret)s&issuer=%(issuer)s") % locals())
216
217 if output == "svg":
218 factory = qrcode.image.svg.SvgPathImage
219 qr_svg = qrcode.make(url, image_factory=factory)
220 return b2s(qr_svg.to_string())
221 elif output == "ascii":
222 qr = qrcode.QRCode()
223 qr.add_data(url)
224 file_mock = IOBuff()
225 qr.print_ascii(out=file_mock)
226 file_mock.seek(0)
227 qr_text = file_mock.read()
228 # wrap it in pre tags to keep formatting
229 return '<pre class="ascii_otp">%s</pre>' % qr_text
230 # The following css is required to make it work on
231 # browsers that implement css. If the line height is
232 # not 1, blank space is created between the lines.
233 # The space makes QR recognition fail. It works fine
234 # without it on text browsers.
235 # pre.ascii_otp {line-height: 1em;}
236 else:
237 return "Error generating QR code."
238
239
240 def set_form_wins(client, value):
241 '''set client's form_wins attribute to the boolean value
242 passed in. If value is False, the templating code displays
243 values from the database rather than from the current form.
244
245 By default, the value is True so that a rejected form
246 displays the values the user entered rather than overwriting
247 them with the values from the database.
248
249 However we want to display the database values on the
250 user.2fa.html template as there is no user entered data
251 on the form that changes the secret key.
252 '''
253 client.form_wins = value
254
255
256 class Generate2faSecret(EditItemAction):
257 '''Change users HOTP secret or disable 2FA.
258
259 Values for schema properties:
260 secret_2fa: 32 characters from base32 alphabet or - if disabled
261 secret_counter:
262 integer >= 0 2fa HOTP enabled
263 integer = -1 2fa disabled
264 integer = -2 reserved for 2fa TOTP enabled (not yet implmented)
265
266 To prevent a user from manually setting the key by submitting
267 a form, we add a property to the db handle that the detector
268 looks for. If a web request comes in without that property on
269 the db, it raises an error.
270 '''
271 def handle(self):
272 ''' Generate new 2FA HOTP secret or remove 2FA enrollment.
273 '''
274
275 if __debug__:
276 logger.debug("Generate2faSecret: enter")
277
278 def get_ext_setting(setting, default=None):
279 try:
280 return self.db.config.ext[setting]
281 except InvalidOptionError:
282 return default
283
284 # add property on db object to be used by detector
285 # so it knows request came from here.
286 self.db.Generate2faSecret = "Generate2faSecret"
287
288 # Process unenrolling request.
289
290 # 'remove_2fa' is the name of the un-enrolling button
291 # in web interface.
292 if 'remove_2fa' in self.form:
293 # zero out 2FA creds and counter
294 self.form.list.append(
295 cgi.MiniFieldStorage("secret_2fa", "-")
296 )
297 # all password changes need confirmation element
298 self.form.list.append(
299 cgi.MiniFieldStorage("@confirm@secret_2fa", "-")
300 )
301
302 # empty reset counter when removing 2fa
303 self.form.list.append(
304 cgi.MiniFieldStorage("secret_counter", "-1")
305 )
306 # call the core EditItemAction to process the edit.
307 EditItemAction.handle(self)
308 return
309
310 # Now all we have is to enroll the user.
311
312 # define base32 alphabet for random string generation
313 alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
314
315 # get available otp methods
316 otp_methods = []
317
318 if get_ext_setting("HOTP_DISABLE") is None:
319 otp_methods.append('hotp')
320 if get_ext_setting("TOTP_DISABLE") is None:
321 otp_methods.append('totp')
322
323 if not otp_methods:
324 raise ValueError(_("All two factor methods are disabled."))
325
326 if '__enable_TOTP' in self.form:
327 if 'totp' in otp_methods:
328 use_totp = True
329 else:
330 raise ValueError(_("Time based method disabled"))
331 else:
332 if 'hotp' in otp_methods:
333 use_totp = False
334 else:
335 raise ValueError(_("Counter based method disabled"))
336
337 # Is this the best we can do?? How random is it?
338 # 160 bits (32 characters with 5 bits) of entropy.
339 new_secret = ''.join(
340 [random.choice(alphabet) for x in range(0, 32)])
341
342 self.form.list.append(
343 cgi.MiniFieldStorage("secret_2fa", new_secret)
344 )
345 self.form.list.append(
346 cgi.MiniFieldStorage("@confirm@secret_2fa", new_secret)
347 )
348
349 if use_totp is False:
350 # always reset counter to 0 when changing HOTP secret
351 self.form.list.append(
352 cgi.MiniFieldStorage("secret_counter", "0")
353 )
354 else:
355 self.form.list.append(
356 cgi.MiniFieldStorage("secret_counter", "-2")
357 )
358
359 # call the core EditItemAction to process the edit.
360 EditItemAction.handle(self)
361
362
363 def make_redirect_url(urlparse_dict):
364 """As part of 2FA login redirect to new url composed from source
365 URL plus new query parameters.
366 """
367 return urllib_.urlunparse((urlparse_dict['scheme'],
368 urlparse_dict['netloc'],
369 urlparse_dict['path'],
370 urlparse_dict['params'],
371 urllib_.urlencode(
372 list(
373 sorted(
374 urlparse_dict['query'].items()
375 )
376 ),
377 doseq=True
378 ),
379 urlparse_dict['fragment'])
380 )
381
382
383 def clean_url_dict(self):
384 """Return a dict with cleaned up components of source URL.
385
386 As part of login code, we need to redirect back to source page
387 but without @ok_message or @error_message query params.
388 """
389
390 if '__came_from' in self.form:
391 # On valid or invalid login, redirect the user back to the page
392 # they started on. Searches, issue, and other pages
393 # are all preserved in __came_from. Clean out any old feedback
394 # @error_message, @ok_message from the __came_from url.
395 #
396 # 1. Split the url into components.
397 # 2. Split the query string into parts.
398 # 3. Delete @error_message and @ok_message if present.
399 # 4. Define a new redirect_url missing the @...message entries.
400 # This will be redefined if there is a login error to include
401 # a new error message
402
403 clean_url = self.examine_url(self.form['__came_from'].value)
404 redirect_url_tuple = urllib_.urlparse(clean_url)
405 # now I have a tuple form for the __came_from url
406 try:
407 query = urllib_.parse_qs(redirect_url_tuple.query)
408 if "@error_message" in query:
409 del query["@error_message"]
410 if "@ok_message" in query:
411 del query["@ok_message"]
412 if "@action" in query:
413 # also remove the action from the redirect
414 # there is only ever one @action value.
415 if query['@action'] == ["logout"]:
416 del query["@action"]
417 except AttributeError:
418 # no query param so nothing to remove. Just define.
419 query = {}
420 pass
421
422 urlparse_dict = {"scheme": redirect_url_tuple.scheme,
423 "netloc": redirect_url_tuple.netloc,
424 "path": redirect_url_tuple.path,
425 "params": redirect_url_tuple.params,
426 "query": query,
427 "fragment": redirect_url_tuple.fragment
428 }
429 return urlparse_dict
430 else:
431 return None
432
433
434 class OtpLoginAction(LoginAction):
435 '''Login Action requiring one time password (HOTP/TOTP)
436 to successfully log in.
437 '''
438
439 # Calculate TOTP for an interval of this many seconds
440 # Changing this INVALIDATES all TOTP's.
441 # Some authenticators won't allow a value other than 30.
442 # Read the rfc before changing this.
443 totp_interval_in_sec = 30
444
445 def handle(self):
446
447 if __debug__:
448 logger.debug("OtpLoginAction: enter")
449
450 redirect_url_dict = clean_url_dict(self)
451
452 def get_ext_setting(setting, default=None):
453 try:
454 return self.db.config.ext[setting]
455 except InvalidOptionError:
456 return default
457
458 # store exceptions thrown by verifyLogin to expose
459 # error message later
460 verifyLoginException = None
461
462 # Find enabled methods.
463 otp_methods = []
464 if get_ext_setting("HOTP_DISABLE") is None:
465 otp_methods.append('hotp')
466 if get_ext_setting("TOTP_DISABLE") is None:
467 otp_methods.append('totp')
468
469 if '__login_name' in self.form:
470 username = self.form['__login_name'].value
471 try:
472 # get the info for the user
473 userid = self.db.user.lookup(username)
474
475 try:
476 secret = self.db.user.get(userid, 'secret_2fa',
477 "").password
478 except AttributeError:
479 # thrown if secret is empty
480 secret = '-'
481
482 last = self.db.user.get(userid, 'secret_counter')
483
484 if __debug__:
485 logger.debug("Login username %s is valid", username)
486 except KeyError:
487 # No such user so login will fail.
488 # Set variables as though OTP is disabled.
489 # Don't return here as a timing attack can find
490 # valid usernames.
491 secret = "-" # user's TOTP/HOTP secret
492 last = -1 # sequence number of last used HOTP -1 invalid
493
494 try:
495 last = int(last)
496 except (ValueError, TypeError):
497 # string but not an int or non-string
498 last = -1 # sentinal value - indicates OTP disabled
499
500 # sanity check
501 if (secret == "-" and last != -1) or \
502 (secret != "-" and last == -1):
503 # somebody is playing games. This should never happen.
504 # abort immediately.
505 errormsg = _("Internal error. Secret key and counter mismatch.")
506 # keep user on original page and report error.
507 if redirect_url_dict is None:
508 self.client.add_error_message(errormsg)
509 return
510 else:
511 redirect_url_dict['query']['@error_message'] = errormsg
512 raise Redirect(make_redirect_url(redirect_url_dict))
513
514 if '__login_password' in self.form:
515 password = self.form['__login_password'].value
516 else:
517 password = ""
518
519 if '__otp' in self.form:
520 token = self.form['__otp'].value
521 # remove spaces/tabs. Some auth generators
522 # chunk the token, copy/paste includes the spaces
523 token = re.sub(r'\s', '', token)
524 else:
525 if secret == '-':
526 # 2fa not enabled, do a normal login
527 LoginAction.handle(self)
528 # LoginAction "returns" an exception
529 # so the return below should not be reached
530 return
531 else:
532 token = ""
533
534 if last == -2:
535 # TOTP - window is the number of 30 second
536 # (interval) periods before/after the current 30
537 # second period. 0 only matches the code for
538 # this (30) second period.
539 windows = int(get_ext_setting('TOTP_WINDOW', 0))
540 else:
541 # HOTP - window we will search for a match so
542 # up to 10 keys past the last used one can be valid.
543 windows = int(get_ext_setting('HOTP_WINDOW', 10))
544
545 if last == -2:
546 # TOTP - token length number of numbers in token
547 # 6 or 8 are usually supported
548 token_length = int(get_ext_setting('TOTP_LENGTH', 6))
549 else:
550 token_length = int(get_ext_setting('HOTP_LENGTH', 6))
551
552 # Verify TOTP/HOTP and prevent replay
553 if last >= 0:
554 # uppercase because it's used for retreiving a config setting.
555 otp_method = "HOTP"
556
557 # valid_hotp returns: integer > 0 (match sequence number) or False
558 otp_ok = valid_hotp(token,
559 secret,
560 token_length=token_length,
561 trials=windows,
562 last=last)
563
564 # FIXME: should the TOTP stuff be done here to prevent
565 # leaking that the user is using HOTP and not TOTP. TOTP
566 # validation will take longer. Try sleeping
567 # random.random()/10 seconds.
568 time.sleep(random.random()/10)
569
570 # invalidate an exposed HOTP to make it single
571 # use even if username/password fails.
572 if otp_ok:
573 self.db.user.set(userid, secret_counter=otp_ok)
574 self.db.commit()
575
576 elif last == -2:
577 # uppercase because it's used for retreiving a config setting.
578 otp_method = "TOTP"
579
580 interval = self.totp_interval_in_sec
581
582 # FIXME: consider setting clock argument for
583 # valid_otp to (now - interval) and then None.
584 # Allow one interval look back to account for
585 # entry delay and network transmission between
586 # synchronized clocks. This is a look back
587 # window only. Note setting windows=1 looks
588 # forward and back one interval.
589
590 # valid_totp returns: True or False
591 otp_ok = valid_totp(token,
592 secret,
593 token_length=token_length,
594 window=windows,
595 interval_length=interval)
596
597 # verify that token has not been used
598 # check against last TOTP to prevent replay
599 # Use session db so multiple tokens
600 # (TOTP_used-<user>-<token>) with lifetime of
601 # now + (window * 30 seconds or (interval))
602 # can be saved and expired by usual method?
603 otk = self.client.db.Otk
604
605 totp_key = "TOTP_used-%s-%s" % (username, token)
606
607 try:
608 _discard = otk.getall(totp_key)
609 # token has been seen before, login must fail.
610 otp_ok = False
611 except KeyError:
612 # ignore if key not found
613 pass
614
615 # Calculate a timestamp that will make OTK.clean()
616 # expire the entry 1.1 x interval plus windows * interval
617 # after the current time. 1.1 is just to make sure we record
618 # the TOTP through it's entire interval.
619 # (FYI expiration is 1 week, so we subtract a week first)
620 ts = time.time() - (60 * 60 * 24 * 7) \
621 + (interval * windows) + 1.1*interval
622
623 if otp_ok:
624 # this is the only code that depends on a valid otp
625 otk.set(totp_key, token=token, __timestamp=ts)
626
627 # The time spent cleaning the OTK db depends on
628 # on number of records in database. Attempts to
629 # determine if the TOTP was correct via timing attack
630 # may be made more difficult by doing housekeeping.
631 otk.clean()
632 otk.commit()
633
634 else:
635 # process the token so we take the same amount of
636 # time as when 2fa is enabled but the token is wrong.
637 _discard = valid_hotp(token, 'A'*32, last=0, trials=windows)
638 time.sleep(random.random()/10) # combat timing attack
639
640 # Since 2fa not enabled: set otp_ok False
641 otp_ok = False
642
643 login_ok = False
644 try:
645 LoginAction.verifyLogin(self, username, password)
646 login_ok = True
647 except LoginError as e:
648 # store for later use
649 verifyLoginException = e
650
651 # If user supplied the next OTP in the HOTP sequence
652 # or if it passes with a TOTP, call core LoginAction.
653 if otp_ok and ((otp_ok == last + 1) or (last == -2)):
654 # if otp method is not supported and logins disabled
655 # using it, reject with a suitable error.
656
657 if get_ext_setting(otp_method + "_DISABLE") == 'login_deny':
658 # implement part of workflow from core LoginAction.handle
659 self.client.make_user_anonymous()
660 self.client.session_api.destroy()
661
662 errormsg = _('Login disabled please see Administrator')
663 if redirect_url_dict is None:
664 self.client.add_error_message(errormsg)
665 return
666 else:
667 redirect_url_dict['query']['@error_message'] = errormsg
668 raise Redirect(make_redirect_url(redirect_url_dict))
669
670 # perform normal user login
671 LoginAction.handle(self)
672 return # just in case control returns here
673
674 if (last != -2) and otp_ok and login_ok:
675 # User has skipped one or more HOTP's save the sequence
676 # number and reject the login. Request they enter the next
677 # number in the sequence.
678 #
679 # FIXME how to detect hotp and not login_ok to force two
680 # hotp keys in a row. Maybe a new field secret_missing
681 # to store state between login attempts?
682
683 # implement part of workflow from core LoginAction.handle
684 self.client.make_user_anonymous()
685 self.client.session_api.destroy()
686
687 errormsg = _(
688 'Multiple One Time Passwords were skipped. '
689 'Please log in again with your next One Time Password.')
690
691 if redirect_url_dict is None:
692 self.client.add_error_message(errormsg)
693 return
694 else:
695 redirect_url_dict['query']['@error_message'] = errormsg
696 raise Redirect(make_redirect_url(redirect_url_dict))
697
698 if not login_ok:
699 # implement part of workflow from core LoginAction.handle
700 self.client.make_user_anonymous()
701
702 # keep user on same page, report error from verifyLogin.
703 if redirect_url_dict is None:
704 for arg in verifyLoginException.args:
705 self.client.add_error_message(arg)
706 return
707 else:
708 redirect_url_dict['query']['@error_message'] = \
709 verifyLoginException.args
710 raise Redirect(make_redirect_url(redirect_url_dict))
711
712 # If hotp/totp was not matched, fail login.
713 if otp_ok is False:
714 # implement part of workflow from core LoginAction.handle
715 self.client.make_user_anonymous()
716 self.client.session_api.destroy()
717 # Match LoginAction.handle error so we don't give away info
718 errormsg = _('Invalid login')
719
720 # keep user on original page and report error.
721 if redirect_url_dict is None:
722 self.client.add_error_message(errormsg)
723 return
724 else:
725 redirect_url_dict['query']['@error_message'] = errormsg
726 raise Redirect(make_redirect_url(redirect_url_dict))
727
728
729 def init(instance):
730 instance.registerUtil('chunk_text', chunk_text)
731 instance.registerUtil('format_current_count', format_current_count)
732 instance.registerUtil('hotpIntegrityValue', hotpIntegrityValue)
733 instance.registerUtil('qrify', qrify)
734 if 'set_form_wins' not in instance.templating_utils:
735 instance.registerUtil('set_form_wins', set_form_wins)
736
737 instance.registerAction('gen2fasecret', Generate2faSecret)
738 instance.registerAction('login', OtpLoginAction)
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.You are not allowed to attach a file to this page.