Using a Python 3 compatible base64 encoder and fixing verify_emails in Python 3 (#336)

This commit is contained in:
Kevin Chung
2017-08-09 04:17:10 -04:00
committed by GitHub
parent cfc87e7a2b
commit 1a077f72ed
4 changed files with 89 additions and 14 deletions

View File

@@ -2,7 +2,6 @@ import logging
import os import os
import re import re
import time import time
import urllib
from flask import current_app as app, render_template, request, redirect, url_for, session, Blueprint from flask import current_app as app, render_template, request, redirect, url_for, session, Blueprint
from itsdangerous import TimedSerializer, BadTimeSignature, Signer, BadSignature from itsdangerous import TimedSerializer, BadTimeSignature, Signer, BadSignature
@@ -25,8 +24,10 @@ def confirm_user(data=None):
# User is confirming email account # User is confirming email account
if data and request.method == "GET": if data and request.method == "GET":
try: try:
s = Signer(app.config['SECRET_KEY']) s = TimedSerializer(app.config['SECRET_KEY'])
email = s.unsign(urllib.unquote_plus(data.decode('base64'))) email = s.loads(utils.base64decode(data, urldecode=True), max_age=1800)
except BadTimeSignature:
return render_template('confirm.html', errors=['Your confirmation link has expired'])
except BadSignature: except BadSignature:
return render_template('confirm.html', errors=['Your confirmation link seems wrong']) return render_template('confirm.html', errors=['Your confirmation link seems wrong'])
except: except:
@@ -82,7 +83,7 @@ def reset_password(data=None):
if data is not None and request.method == "POST": if data is not None and request.method == "POST":
try: try:
s = TimedSerializer(app.config['SECRET_KEY']) s = TimedSerializer(app.config['SECRET_KEY'])
name = s.loads(urllib.unquote_plus(data.decode('base64')), max_age=1800) name = s.loads(utils.base64decode(data, urldecode=True), max_age=1800)
except BadTimeSignature: except BadTimeSignature:
return render_template('reset_password.html', errors=['Your link has expired']) return render_template('reset_password.html', errors=['Your link has expired'])
except: except:
@@ -110,7 +111,7 @@ Did you initiate a password reset?
{0}/{1} {0}/{1}
""".format(url_for('auth.reset_password', _external=True), urllib.quote_plus(token.encode('base64'))) """.format(url_for('auth.reset_password', _external=True), utils.base64encode(token, urlencode=True))
utils.sendmail(email, text) utils.sendmail(email, text)

View File

@@ -119,7 +119,9 @@ class Config(object):
class TestingConfig(Config): class TestingConfig(Config):
SECRET_KEY = 'AAAAAAAAAAAAAAAAAAAA'
PRESERVE_CONTEXT_ON_EXCEPTION = False PRESERVE_CONTEXT_ON_EXCEPTION = False
TESTING = True TESTING = True
DEBUG = True DEBUG = True
SQLALCHEMY_DATABASE_URI = 'sqlite://' SQLALCHEMY_DATABASE_URI = 'sqlite://'
SERVER_NAME = 'localhost'

View File

@@ -1,3 +1,4 @@
import base64
import datetime import datetime
import functools import functools
import hashlib import hashlib
@@ -16,7 +17,6 @@ import subprocess
import sys import sys
import tempfile import tempfile
import time import time
import urllib
import dataset import dataset
import zipfile import zipfile
import io import io
@@ -25,8 +25,8 @@ from email.mime.text import MIMEText
from flask import current_app as app, request, redirect, url_for, session, render_template, abort from flask import current_app as app, request, redirect, url_for, session, render_template, abort
from flask_caching import Cache from flask_caching import Cache
from flask_migrate import Migrate, upgrade as migrate_upgrade, stamp as migrate_stamp from flask_migrate import Migrate, upgrade as migrate_upgrade, stamp as migrate_stamp
from itsdangerous import Signer from itsdangerous import TimedSerializer, BadTimeSignature, Signer, BadSignature
from six.moves.urllib.parse import urlparse, urljoin from six.moves.urllib.parse import urlparse, urljoin, quote, unquote
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from CTFd.models import db, WrongKeys, Pages, Config, Tracking, Teams, Files, Containers, ip2long, long2ip from CTFd.models import db, WrongKeys, Pages, Config, Tracking, Teams, Files, Containers, ip2long, long2ip
@@ -495,11 +495,12 @@ def sendmail(addr, text):
def verify_email(addr): def verify_email(addr):
s = Signer(app.config['SECRET_KEY']) s = TimedSerializer(app.config['SECRET_KEY'])
token = s.sign(addr) token = s.dumps(addr)
text = """Please click the following link to confirm your email address for {}: {}""".format( text = """Please click the following link to confirm your email address for {ctf_name}: {url}/{token}""".format(
get_config('ctf_name'), ctf_name=get_config('ctf_name'),
url_for('auth.confirm_user', _external=True) + '/' + urllib.quote_plus(token.encode('base64')) url=url_for('auth.confirm_user', _external=True),
token=base64encode(token, urlencode=True)
) )
sendmail(addr, text) sendmail(addr, text)
@@ -522,6 +523,28 @@ def sha512(string):
return hashlib.sha512(string).hexdigest() return hashlib.sha512(string).hexdigest()
def base64encode(s, urlencode=False):
if six.PY3 and isinstance(s, six.string_types):
s = s.encode('utf-8')
encoded = base64.urlsafe_b64encode(s)
if six.PY3:
encoded = encoded.decode('utf-8')
if urlencode:
encoded = quote(encoded)
return encoded
def base64decode(s, urldecode=False):
if urldecode:
s = unquote(s)
if six.PY3 and isinstance(s, six.string_types):
s = s.encode('utf-8')
decoded = base64.urlsafe_b64decode(s)
if six.PY3:
decoded = decoded.decode('utf-8')
return decoded
@cache.memoize() @cache.memoize()
def can_create_container(): def can_create_container():
try: try:

View File

@@ -3,7 +3,8 @@
from tests.helpers import * from tests.helpers import *
from CTFd.models import ip2long, long2ip from CTFd.models import ip2long, long2ip
from CTFd.utils import get_config, set_config, override_template, sendmail from CTFd.utils import get_config, set_config, override_template, sendmail, verify_email
from CTFd.utils import base64encode, base64decode
from mock import patch from mock import patch
import json import json
@@ -41,6 +42,20 @@ def test_long2ip_ipv6():
assert long2ip(42540616829182469433547762482097946625) == '2001:658:22a:cafe:200::1' assert long2ip(42540616829182469433547762482097946625) == '2001:658:22a:cafe:200::1'
def test_base64encode():
"""The base64encode wrapper works properly"""
assert base64encode('abc123') == 'YWJjMTIz'
assert base64encode('😆') == '8J-Yhg=='
assert base64encode('😆', urlencode=True) == '8J-Yhg%3D%3D'
def test_base64decode():
"""The base64decode wrapper works properly"""
assert base64decode('YWJjMTIz') == 'abc123'
assert base64decode('8J-Yhg==') == '😆'
assert base64decode('8J-Yhg%3D%3D', urldecode=True) == '😆'
def test_override_template(): def test_override_template():
"""Does override_template work properly for regular themes""" """Does override_template work properly for regular themes"""
app = create_ctfd() app = create_ctfd()
@@ -93,3 +108,37 @@ def test_sendmail_with_smtp(mock_smtp):
mock_smtp.return_value.sendmail.assert_called_once_with(from_addr, [to_addr], email_msg.as_string()) mock_smtp.return_value.sendmail.assert_called_once_with(from_addr, [to_addr], email_msg.as_string())
destroy_ctfd(app) destroy_ctfd(app)
@patch('smtplib.SMTP')
def test_verify_email(mock_smtp):
"""Does verify_email send emails"""
from email.mime.text import MIMEText
app = create_ctfd()
with app.app_context():
set_config('mail_server', 'localhost')
set_config('mail_port', 25)
set_config('mail_username', 'username')
set_config('mail_password', 'password')
set_config('verify_emails', True)
from_addr = get_config('mailfrom_addr') or app.config.get('MAILFROM_ADDR')
to_addr = 'user@user.com'
verify_email(to_addr)
# This is currently not actually validated
msg = ("Please click the following link to confirm"
" your email address for CTFd:"
" http://localhost/confirm/InVzZXJAdXNlci5jb20iLkRHbGFZUS5XUURfQzBub3pGZkFMRlIyeGxDS1BCMjZETlk%3D")
ctf_name = get_config('ctf_name')
email_msg = MIMEText(msg)
email_msg['Subject'] = "Message from {0}".format(ctf_name)
email_msg['From'] = from_addr
email_msg['To'] = to_addr
# Need to freeze time to predict the value of the itsdangerous token.
# For now just assert that sendmail was called.
mock_smtp.return_value.sendmail.assert_called()
destroy_ctfd(app)