820 python 3 only (#1454)

* Remove Python 2 specific code
* Require imports to have a proper isort-supported order
* Only test/lint on Python 3
* Bump most dependencies to latest supported version
This commit is contained in:
Kevin Chung
2020-05-30 02:43:49 -04:00
committed by GitHub
parent 72be918e06
commit 76e5ad08a8
45 changed files with 213 additions and 323 deletions

View File

@@ -32,7 +32,7 @@ jobs:
strategy:
matrix:
python-version: ['2.7', '3.6']
python-version: ['3.6']
TESTING_DATABASE_URL: ['mysql+pymysql://root@localhost/ctfd', 'sqlite://', 'postgres://postgres@localhost/ctfd']
name: Python ${{ matrix.python-version }}

2
.gitignore vendored
View File

@@ -36,6 +36,7 @@ pip-delete-this-directory.txt
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
@@ -57,6 +58,7 @@ target/
*.db
*.log
*.log.*
.idea/
.vscode/
CTFd/static/uploads

7
.isort.cfg Normal file
View File

@@ -0,0 +1,7 @@
[settings]
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
use_parentheses=True
line_length=88
skip=migrations

View File

@@ -19,13 +19,11 @@ env:
- TESTING_DATABASE_URL='sqlite://'
- TESTING_DATABASE_URL='postgres://postgres@localhost/ctfd'
python:
- 2.7
- 3.6
before_install:
- sudo rm -f /etc/boto.cfg
- export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
- export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
- python3.6 -m pip install black==19.3b0
install:
- pip install -r development.txt
- yarn install --non-interactive

View File

@@ -8,7 +8,6 @@ from flask import Flask, Request
from flask_migrate import upgrade
from jinja2 import FileSystemLoader
from jinja2.sandbox import SandboxedEnvironment
from six.moves import input
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.utils import cached_property
@@ -129,7 +128,7 @@ def confirm_upgrade():
print("/*\\ CTFd has updated and must update the database! /*\\")
print("/*\\ Please backup your database before proceeding! /*\\")
print("/*\\ CTFd maintainers are not responsible for any data loss! /*\\")
if input("Run database migrations (Y/N)").lower().strip() == "y":
if input("Run database migrations (Y/N)").lower().strip() == "y": # nosec B322
return True
else:
print("/*\\ Ignored database migrations... /*\\")
@@ -215,16 +214,10 @@ def create_app(config="CTFd.config.Config"):
if reverse_proxy:
if type(reverse_proxy) is str and "," in reverse_proxy:
proxyfix_args = [int(i) for i in reverse_proxy.split(",")]
app.wsgi_app = ProxyFix(app.wsgi_app, None, *proxyfix_args)
app.wsgi_app = ProxyFix(app.wsgi_app, *proxyfix_args)
else:
app.wsgi_app = ProxyFix(
app.wsgi_app,
num_proxies=None,
x_for=1,
x_proto=1,
x_host=1,
x_port=1,
x_prefix=1,
app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1
)
version = utils.get_config("ctf_version")

View File

@@ -1,8 +1,8 @@
import csv
import datetime
import os
from io import BytesIO, StringIO
import six
from flask import Blueprint, abort
from flask import current_app as app
from flask import (
@@ -14,7 +14,18 @@ from flask import (
url_for,
)
from CTFd.cache import cache, clear_config, clear_standings, clear_pages
admin = Blueprint("admin", __name__)
# isort:imports-firstparty
from CTFd.admin import challenges # noqa: F401
from CTFd.admin import notifications # noqa: F401
from CTFd.admin import pages # noqa: F401
from CTFd.admin import scoreboard # noqa: F401
from CTFd.admin import statistics # noqa: F401
from CTFd.admin import submissions # noqa: F401
from CTFd.admin import teams # noqa: F401
from CTFd.admin import users # noqa: F401
from CTFd.cache import cache, clear_config, clear_pages, clear_standings
from CTFd.models import (
Awards,
Challenges,
@@ -40,17 +51,6 @@ from CTFd.utils.security.auth import logout_user
from CTFd.utils.uploads import delete_file
from CTFd.utils.user import is_admin
admin = Blueprint("admin", __name__)
from CTFd.admin import challenges # noqa: F401
from CTFd.admin import notifications # noqa: F401
from CTFd.admin import pages # noqa: F401
from CTFd.admin import scoreboard # noqa: F401
from CTFd.admin import statistics # noqa: F401
from CTFd.admin import submissions # noqa: F401
from CTFd.admin import teams # noqa: F401
from CTFd.admin import users # noqa: F401
@admin.route("/admin", methods=["GET"])
def view():
@@ -126,7 +126,7 @@ def export_csv():
if model is None:
abort(404)
temp = six.StringIO()
temp = StringIO()
writer = csv.writer(temp)
header = [column.name for column in model.__mapper__.columns]
@@ -142,7 +142,7 @@ def export_csv():
temp.seek(0)
# In Python 3 send_file requires bytes
output = six.BytesIO()
output = BytesIO()
output.write(temp.getvalue().encode("utf-8"))
output.seek(0)
temp.close()

View File

@@ -1,6 +1,5 @@
import os
import six
from flask import current_app as app
from flask import render_template, render_template_string, request, url_for
@@ -56,7 +55,7 @@ def challenges_detail(challenge_id):
"rb",
) as update:
tpl = update.read()
if six.PY3 and isinstance(tpl, binary_type):
if isinstance(tpl, binary_type):
tpl = tpl.decode("utf-8")
update_j2 = render_template_string(tpl, challenge=challenge)

View File

@@ -2,9 +2,9 @@ from flask import request
from flask_restx import Namespace, Resource
from CTFd.cache import clear_standings
from CTFd.utils.config import is_teams_mode
from CTFd.models import Awards, db, Users
from CTFd.models import Awards, Users, db
from CTFd.schemas.awards import AwardSchema
from CTFd.utils.config import is_teams_mode
from CTFd.utils.decorators import admins_only
awards_namespace = Namespace("awards", description="Endpoint to retrieve Awards")

View File

@@ -4,8 +4,9 @@ statistics_namespace = Namespace(
"statistics", description="Endpoint to retrieve Statistics"
)
# isort:imports-firstparty
from CTFd.api.v1.statistics import challenges # noqa: F401
from CTFd.api.v1.statistics import scores # noqa: F401
from CTFd.api.v1.statistics import submissions # noqa: F401
from CTFd.api.v1.statistics import teams # noqa: F401
from CTFd.api.v1.statistics import users # noqa: F401
from CTFd.api.v1.statistics import scores # noqa: F401

View File

@@ -3,7 +3,7 @@ from collections import defaultdict
from flask_restx import Resource
from CTFd.api.v1.statistics import statistics_namespace
from CTFd.models import db, Challenges
from CTFd.models import Challenges, db
from CTFd.utils.decorators import admins_only
from CTFd.utils.scores import get_standings

View File

@@ -6,10 +6,10 @@ from flask import current_app as app
from flask import redirect, render_template, request, session, url_for
from itsdangerous.exc import BadSignature, BadTimeSignature, SignatureExpired
from CTFd.cache import clear_team_session, clear_user_session
from CTFd.models import Teams, Users, db
from CTFd.utils import config, email, get_app_config, get_config
from CTFd.utils import user as current_user
from CTFd.cache import clear_user_session, clear_team_session
from CTFd.utils import validators
from CTFd.utils.config import is_teams_mode
from CTFd.utils.config.integrations import mlc_registration

View File

@@ -1,4 +1,5 @@
from enum import Enum
from flask import current_app
JS_ENUMS = {}

View File

@@ -5,9 +5,6 @@ from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import column_property, validates
from CTFd.utils.crypto import hash_password
from CTFd.utils.humanize.numbers import ordinalize
db = SQLAlchemy()
ma = Marshmallow()
@@ -255,6 +252,8 @@ class Users(db.Model):
@validates("password")
def validate_password(self, key, plaintext):
from CTFd.utils.crypto import hash_password
return hash_password(str(plaintext))
@hybrid_property
@@ -362,6 +361,7 @@ class Users(db.Model):
application itself will result in a circular import.
"""
from CTFd.utils.scores import get_user_standings
from CTFd.utils.humanize.numbers import ordinalize
standings = get_user_standings(admin=admin)
@@ -412,6 +412,8 @@ class Teams(db.Model):
@validates("password")
def validate_password(self, key, plaintext):
from CTFd.utils.crypto import hash_password
return hash_password(str(plaintext))
@property
@@ -501,6 +503,7 @@ class Teams(db.Model):
application itself will result in a circular import.
"""
from CTFd.utils.scores import get_team_standings
from CTFd.utils.humanize.numbers import ordinalize
standings = get_team_standings(admin=admin)

View File

@@ -15,9 +15,9 @@ from CTFd.models import (
db,
)
from CTFd.plugins import register_plugin_assets_directory
from CTFd.plugins.migrations import upgrade
from CTFd.plugins.challenges import CHALLENGE_CLASSES, BaseChallenge
from CTFd.plugins.flags import get_flag_class
from CTFd.plugins.migrations import upgrade
from CTFd.utils.modes import get_model
from CTFd.utils.uploads import delete_file
from CTFd.utils.user import get_ip

View File

@@ -1,6 +1,6 @@
from flask import Blueprint, redirect, render_template, request, url_for
from CTFd.cache import clear_user_session, clear_team_session
from CTFd.cache import clear_team_session, clear_user_session
from CTFd.models import Teams, db
from CTFd.utils import config, get_config
from CTFd.utils.crypto import verify_password

View File

@@ -1,21 +1,19 @@
import cmarkgfm
import six
from flask import current_app as app
# isort:imports-firstparty
from CTFd.cache import cache
from CTFd.models import Configs, db
if six.PY2:
string_types = (str, unicode) # noqa: F821
text_type = unicode # noqa: F821
binary_type = str
else:
string_types = (str,)
text_type = str
binary_type = bytes
string_types = (str,)
text_type = str
binary_type = bytes
markdown = lambda md: cmarkgfm.markdown_to_html_with_extensions(
md, extensions=["autolink", "table", "strikethrough"]
)
def markdown(md):
return cmarkgfm.markdown_to_html_with_extensions(
md, extensions=["autolink", "table", "strikethrough"]
)
def get_app_config(key, default=None):
@@ -32,7 +30,7 @@ def _get_config(key):
value = config.value
if value and value.isdigit():
return int(value)
elif value and isinstance(value, six.string_types):
elif value and isinstance(value, string_types):
if value.lower() == "true":
return True
elif value.lower() == "false":
@@ -62,6 +60,3 @@ def set_config(key, value):
db.session.commit()
cache.delete_memoized(_get_config, key)
return config
from CTFd.models import Configs, db # noqa: E402

View File

@@ -1,5 +1,5 @@
from CTFd.cache import cache
from CTFd.models import db, Pages
from CTFd.models import Pages, db
@cache.memoize()

View File

@@ -1,9 +1,5 @@
import six
import smtplib
from email.mime.text import MIMEText
if six.PY3:
from email.message import EmailMessage
from email.message import EmailMessage
from socket import timeout
from CTFd.utils import get_app_config, get_config
@@ -52,20 +48,14 @@ def sendmail(addr, text, subject):
try:
smtp = get_smtp(**data)
if six.PY2:
msg = MIMEText(text)
else:
msg = EmailMessage()
msg.set_content(text)
msg = EmailMessage()
msg.set_content(text)
msg["Subject"] = subject
msg["From"] = mailfrom_addr
msg["To"] = addr
if six.PY2:
smtp.sendmail(msg["From"], [msg["To"]], msg.as_string())
else:
smtp.send_message(msg)
smtp.send_message(msg)
smtp.quit()
return True, "Email sent"

View File

@@ -1,60 +1,48 @@
import base64
import codecs
import six
from CTFd.utils import string_types
def hexencode(s):
if six.PY3 and isinstance(s, string_types):
if isinstance(s, string_types):
s = s.encode("utf-8")
encoded = codecs.encode(s, "hex")
if six.PY3:
try:
encoded = encoded.decode("utf-8")
except UnicodeDecodeError:
pass
try:
encoded = encoded.decode("utf-8")
except UnicodeDecodeError:
pass
return encoded
def hexdecode(s):
decoded = codecs.decode(s, "hex")
if six.PY3:
try:
decoded = decoded.decode("utf-8")
except UnicodeDecodeError:
pass
try:
decoded = decoded.decode("utf-8")
except UnicodeDecodeError:
pass
return decoded
def base64encode(s):
if six.PY3 and isinstance(s, string_types):
if isinstance(s, string_types):
s = s.encode("utf-8")
else:
# Python 2 support because the base64 module doesnt like unicode
s = str(s)
encoded = base64.urlsafe_b64encode(s).rstrip(b"\n=")
if six.PY3:
try:
encoded = encoded.decode("utf-8")
except UnicodeDecodeError:
pass
try:
encoded = encoded.decode("utf-8")
except UnicodeDecodeError:
pass
return encoded
def base64decode(s):
if six.PY3 and isinstance(s, string_types):
if isinstance(s, string_types):
s = s.encode("utf-8")
else:
# Python 2 support because the base64 module doesnt like unicode
s = str(s)
decoded = base64.urlsafe_b64decode(s.ljust(len(s) + len(s) % 4, b"="))
if six.PY3:
try:
decoded = decoded.decode("utf-8")
except UnicodeDecodeError:
pass
try:
decoded = decoded.decode("utf-8")
except UnicodeDecodeError:
pass
return decoded

View File

@@ -1,14 +1,13 @@
import json
from collections import defaultdict
from queue import Queue
import six
from gevent import Timeout
from six.moves.queue import Queue
from CTFd.cache import cache
from CTFd.utils import string_types
@six.python_2_unicode_compatible
class ServerSentEvent(object):
def __init__(self, data, type=None, id=None):
self.data = data
@@ -16,7 +15,7 @@ class ServerSentEvent(object):
self.id = id
def __str__(self):
if isinstance(self.data, six.string_types):
if isinstance(self.data, string_types):
data = self.data
else:
data = json.dumps(self.data)
@@ -68,7 +67,7 @@ class EventManager(object):
class RedisEventManager(EventManager):
def __init__(self):
super(EventManager, self).__init__()
self.client = cache.cache._client
self.client = cache.cache._write_client
def publish(self, data, type=None, channel="ctf"):
event = ServerSentEvent(data, type=type)

View File

@@ -4,9 +4,9 @@ import os
import re
import tempfile
import zipfile
from io import BytesIO
import dataset
import six
from alembic.util import CommandError
from flask import current_app as app
from flask_migrate import upgrade as migration_upgrade
@@ -17,8 +17,9 @@ from CTFd import __version__ as CTFD_VERSION
from CTFd.cache import cache
from CTFd.models import db, get_class_by_tablename
from CTFd.plugins import get_plugin_names
from CTFd.plugins.migrations import upgrade as plugin_upgrade, current as plugin_current
from CTFd.utils import get_app_config, set_config
from CTFd.plugins.migrations import current as plugin_current
from CTFd.plugins.migrations import upgrade as plugin_upgrade
from CTFd.utils import get_app_config, set_config, string_types
from CTFd.utils.exports.freeze import freeze_export
from CTFd.utils.migrations import (
create_database,
@@ -42,7 +43,7 @@ def export_ctf():
tables = db.tables
for table in tables:
result = db[table].all()
result_file = six.BytesIO()
result_file = BytesIO()
freeze_export(result, fileobj=result_file)
result_file.seek(0)
backup_zip.writestr("db/{}.json".format(table), result_file.read())
@@ -54,7 +55,7 @@ def export_ctf():
"results": [{"version_num": get_current_revision()}],
"meta": {},
}
result_file = six.StringIO()
result_file = BytesIO()
json.dump(result, result_file)
result_file.seek(0)
backup_zip.writestr("db/alembic_version.json", result_file.read())
@@ -209,7 +210,7 @@ def import_ctf(backup, erase=True):
if sqlite:
direct_table = get_class_by_tablename(table.name)
for k, v in entry.items():
if isinstance(v, six.string_types):
if isinstance(v, string_types):
# We only want to apply this hack to columns that are expecting a datetime object
try:
is_dt_column = (
@@ -246,9 +247,7 @@ def import_ctf(backup, erase=True):
"db/awards.json",
):
requirements = entry.get("requirements")
if requirements and isinstance(
requirements, six.string_types
):
if requirements and isinstance(requirements, string_types):
entry["requirements"] = json.loads(requirements)
try:

View File

@@ -1,5 +1,6 @@
from sqlalchemy.exc import OperationalError, ProgrammingError
from CTFd.utils.exports.serializers import JSONSerializer
from sqlalchemy.exc import ProgrammingError, OperationalError
def freeze_export(result, fileobj):

View File

@@ -1,9 +1,10 @@
import json
import six
from collections import defaultdict, OrderedDict
from datetime import datetime, date
from collections import OrderedDict, defaultdict
from datetime import date, datetime
from decimal import Decimal
from CTFd.utils import string_types
class JSONEncoder(json.JSONEncoder):
def default(self, obj):
@@ -44,7 +45,7 @@ class JSONSerializer(object):
data = r.get("requirements")
if data:
try:
if isinstance(data, six.string_types):
if isinstance(data, string_types):
result["results"][i]["requirements"] = json.loads(data)
except ValueError:
pass

View File

@@ -41,9 +41,9 @@ from CTFd.utils.security.auth import login_user, logout_user, lookup_user_token
from CTFd.utils.security.csrf import generate_nonce
from CTFd.utils.user import (
authed,
get_current_team_attrs,
get_current_user_attrs,
get_current_user_recent_ips,
get_current_team_attrs,
get_ip,
is_admin,
)

View File

@@ -1,6 +1,5 @@
import hashlib
import hmac as _hmac
import six
from flask import current_app
from itsdangerous import Signer
@@ -46,10 +45,9 @@ def hmac(data, secret=None, digest=hashlib.sha1):
if secret is None:
secret = current_app.config["SECRET_KEY"]
if six.PY3:
if isinstance(data, string_types):
data = data.encode("utf-8")
if isinstance(secret, string_types):
secret = secret.encode("utf-8")
if isinstance(data, string_types):
data = data.encode("utf-8")
if isinstance(secret, string_types):
secret = secret.encode("utf-8")
return _hmac.new(key=secret, msg=data, digestmod=digest).hexdigest()

View File

@@ -1,6 +1,5 @@
from uuid import uuid4
import six
from flask.json.tag import TaggedJSONSerializer
from flask.sessions import SessionInterface, SessionMixin
from itsdangerous import BadSignature, want_bytes
@@ -73,7 +72,7 @@ class CachingSessionInterface(SessionInterface):
sid = self._generate_sid()
return self.session_class(sid=sid, permanent=self.permanent)
if not six.PY2 and not isinstance(sid, text_type):
if isinstance(sid, text_type) is False:
sid = sid.decode("utf-8", "strict")
val = cache.get(self.key_prefix + sid)
if val is not None:

View File

@@ -1,16 +1,17 @@
import datetime
import re
from flask import abort
from flask import current_app as app
from flask import abort, redirect, request, session, url_for
from flask import redirect, request, session, url_for
from CTFd.cache import cache
from CTFd.constants.users import UserAttrs
from CTFd.constants.teams import TeamAttrs
from CTFd.models import Fails, Users, db, Teams, Tracking
from CTFd.constants.users import UserAttrs
from CTFd.models import Fails, Teams, Tracking, Users, db
from CTFd.utils import get_config
from CTFd.utils.security.signing import hmac
from CTFd.utils.security.auth import logout_user
from CTFd.utils.security.signing import hmac
def get_current_user():

View File

@@ -1,8 +1,8 @@
import re
from urllib.parse import urljoin, urlparse
from flask import request
from marshmallow import ValidationError
from six.moves.urllib.parse import urljoin, urlparse
from CTFd.models import Users
from CTFd.utils.countries import lookup_country_code

View File

@@ -151,9 +151,7 @@ def setup():
<a href="admin">Click here</a> to login and setup your CTF
</h4>
</div>
</div>""".format(
request.script_root
)
</div>"""
page = Pages(title=None, route="index", content=index, draft=False)

View File

@@ -1,15 +1,16 @@
lint:
flake8 --ignore=E402,E501,E712,W503,E203,I002 --exclude=CTFd/uploads CTFd/ migrations/ tests/
flake8 --ignore=E402,E501,E712,W503,E203 --exclude=CTFd/uploads CTFd/ migrations/ tests/
black --check --exclude=CTFd/uploads --exclude=node_modules .
prettier --check 'CTFd/themes/**/assets/**/*'
format:
isort --skip=CTFd/uploads -rc CTFd/ tests/
black --exclude=CTFd/uploads --exclude=node_modules .
prettier --write 'CTFd/themes/**/assets/**/*'
test:
pytest -rf --cov=CTFd --cov-context=test --ignore=node_modules/ --disable-warnings -n auto
bandit -r CTFd -x CTFd/uploads
bandit -r CTFd -x CTFd/uploads --skip B105,B322
pipdeptree
yarn verify

View File

@@ -1,20 +1,20 @@
-r requirements.txt
pytest==4.4.0
pytest-randomly==1.2.3
coverage==5.0.3
mock==2.0.0
flake8==3.7.7
freezegun==0.3.11
pytest==5.4.2
pytest-randomly==3.4.0
coverage==5.1
flake8==3.8.2
freezegun==0.3.15
psycopg2==2.7.5
psycopg2-binary==2.7.5
codecov==2.0.15
moto==1.3.7
bandit==1.5.1
moto==1.3.14
bandit==1.6.2
flask_profiler==1.8.1
pytest-xdist==1.28.0
pytest-cov==2.8.1
pytest-xdist==1.32.0
pytest-cov==2.9.0
sphinx_rtd_theme==0.4.3
flask-debugtoolbar==0.10.1
flake8-isort==2.8.0
Faker==3.0.1
flask-debugtoolbar==0.11.0
flake8-isort==3.0.0
Faker==4.1.0
pipdeptree==0.13.2
black==19.3b0

View File

@@ -4,10 +4,10 @@ import os
import sys
import dataset
from six.moves import input, string_types
from sqlalchemy_utils import drop_database
from CTFd import config, create_app
from CTFd.utils import string_types
# This is important to allow access to the CTFd application factory
sys.path.append(os.getcwd())

View File

@@ -1,26 +1,23 @@
Flask==1.1.1
Werkzeug==0.16.0
Flask-SQLAlchemy==2.4.1
Flask-Caching==1.4.0
Flask==1.1.2
Werkzeug==1.0.1
Flask-SQLAlchemy==2.4.3
Flask-Caching==1.8.0
Flask-Migrate==2.5.3
Flask-Script==2.0.6
SQLAlchemy==1.3.11
SQLAlchemy-Utils==0.36.0
SQLAlchemy==1.3.17
SQLAlchemy-Utils==0.36.6
passlib==1.7.2
bcrypt==3.1.7
six==1.13.0
itsdangerous==1.1.0
requests>=2.20.0
requests==2.23.0
PyMySQL==0.9.3
gunicorn==19.10.0
dataset==1.1.2
gunicorn==20.0.4
dataset==1.3.1
cmarkgfm==0.4.2
netaddr==0.7.19
redis==3.3.11
gevent==1.4.0
python-dotenv==0.10.3
flask-restx==0.1.1
pathlib2==2.3.5
redis==3.5.2
gevent==20.5.2
python-dotenv==0.13.0
flask-restx==0.2.0
flask-marshmallow==0.10.1
marshmallow-sqlalchemy==0.17.0
boto3==1.13.9

View File

@@ -1,10 +1,10 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from CTFd.cache import clear_user_session, clear_all_user_sessions
from CTFd.cache import clear_all_user_sessions, clear_user_session
from CTFd.models import Users
from CTFd.utils.user import is_admin, get_current_user
from CTFd.utils.security.auth import login_user
from CTFd.utils.user import get_current_user, is_admin
from tests.helpers import create_ctfd, destroy_ctfd, register_user

View File

@@ -1,4 +1,4 @@
from CTFd.constants import RawEnum, JSEnum, JinjaEnum
from CTFd.constants import JinjaEnum, JSEnum, RawEnum
from tests.helpers import create_ctfd, destroy_ctfd

View File

@@ -4,11 +4,10 @@ import random
import string
import uuid
from collections import namedtuple
from unittest.mock import Mock, patch
import requests
import six
from flask.testing import FlaskClient
from mock import Mock, patch
from sqlalchemy.engine.url import make_url
from sqlalchemy_utils import drop_database
from werkzeug.datastructures import Headers
@@ -36,12 +35,8 @@ from CTFd.models import (
Users,
)
if six.PY2:
text_type = unicode # noqa: F821
binary_type = str
else:
text_type = str
binary_type = bytes
text_type = str
binary_type = bytes
FakeRequest = namedtuple("FakeRequest", ["form"])

View File

@@ -162,6 +162,7 @@ def test_pages_routing_and_rendering():
with app.test_client() as client:
r = client.get("/test")
output = r.get_data(as_text=True)
print(output)
assert "<h2>The quick brown fox jumped over the lazy dog</h2>" in output
destroy_ctfd(app)

View File

@@ -1,9 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import six
from unittest.mock import patch
from freezegun import freeze_time
from mock import patch
from CTFd.models import Users, db
from CTFd.utils import get_config, set_config
@@ -306,10 +306,7 @@ def test_user_can_confirm_email(mock_smtp):
assert "Need to resend the confirmation email?" in r.get_data(as_text=True)
# smtp send message function was called
if six.PY2:
mock_smtp.return_value.sendmail.assert_called()
else:
mock_smtp.return_value.send_message.assert_called()
mock_smtp.return_value.send_message.assert_called()
with client.session_transaction() as sess:
data = {"nonce": sess.get("nonce")}
@@ -336,10 +333,7 @@ def test_user_can_confirm_email(mock_smtp):
@patch("smtplib.SMTP")
def test_user_can_reset_password(mock_smtp):
"""Test that a user is capable of resetting their password"""
from email.mime.text import MIMEText
if six.PY3:
from email.message import EmailMessage
from email.message import EmailMessage
app = create_ctfd()
with app.app_context(), freeze_time("2012-01-14 03:21:34"):
@@ -377,11 +371,8 @@ def test_user_can_reset_password(mock_smtp):
)
ctf_name = get_config("ctf_name")
if six.PY2:
email_msg = MIMEText(msg)
else:
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg["Subject"] = "Password Reset Request from {ctf_name}".format(
ctf_name=ctf_name
@@ -390,15 +381,10 @@ def test_user_can_reset_password(mock_smtp):
email_msg["To"] = to_addr
# Make sure that the reset password email is sent
if six.PY2:
mock_smtp.return_value.sendmail.assert_called_with(
from_addr, [to_addr], email_msg.as_string()
)
else:
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
# Get user's original password
user = Users.query.filter_by(email="user@user.com").first()

View File

@@ -208,7 +208,7 @@ def test_submitting_unicode_flag():
gen_flag(app.db, challenge_id=chal.id, content=u"你好")
with client.session_transaction():
data = {"submission": "你好", "challenge_id": chal.id}
r = client.post("/api/v1/challenges/attempt".format(chal.id), json=data)
r = client.post("/api/v1/challenges/attempt", json=data)
assert r.status_code == 200
resp = r.get_json()["data"]
assert resp.get("status") == "correct"
@@ -231,7 +231,7 @@ def test_challenges_with_max_attempts():
gen_flag(app.db, challenge_id=chal.id, content=u"flag")
for x in range(3):
data = {"submission": "notflag", "challenge_id": chal_id}
r = client.post("/api/v1/challenges/attempt".format(chal_id), json=data)
r = client.post("/api/v1/challenges/attempt", json=data)
wrong_keys = Fails.query.count()
assert wrong_keys == 3
@@ -307,7 +307,7 @@ def test_that_view_challenges_unregistered_works():
assert r.get_json().get("data") is not None
data = {"submission": "not_flag", "challenge_id": chal_id}
r = client.post("/api/v1/challenges/attempt".format(chal_id), json=data)
r = client.post("/api/v1/challenges/attempt", json=data)
assert r.status_code == 403
assert r.get_json().get("data").get("status") == "authentication_required"
assert r.get_json().get("data").get("message") is None

View File

@@ -247,10 +247,10 @@ def test_scoring_logic_with_zero_point_challenges():
with client2.session_transaction():
# solve chal1
data = {"submission": "flag", "challenge_id": chal1_id}
client2.post("/api/v1/challenges/attempt".format(chal1_id), json=data)
client2.post("/api/v1/challenges/attempt", json=data)
# solve chal2
data = {"submission": "flag", "challenge_id": chal2_id}
client2.post("/api/v1/challenges/attempt".format(chal2_id), json=data)
client2.post("/api/v1/challenges/attempt", json=data)
# user2 is now on top
scores = get_scores(admin)
@@ -260,7 +260,7 @@ def test_scoring_logic_with_zero_point_challenges():
with freeze_time("2017-10-5 03:50:34"):
with client1.session_transaction():
data = {"submission": "flag", "challenge_id": chal2_id}
client1.post("/api/v1/challenges/attempt".format(chal2_id), json=data)
client1.post("/api/v1/challenges/attempt", json=data)
# user2 should still be on top because they solved chal2 first
scores = get_scores(admin)
@@ -270,7 +270,7 @@ def test_scoring_logic_with_zero_point_challenges():
with freeze_time("2017-10-5 03:55:34"):
with client2.session_transaction():
data = {"submission": "flag", "challenge_id": chal0_id}
client2.post("/api/v1/challenges/attempt".format(chal0_id), json=data)
client2.post("/api/v1/challenges/attempt", json=data)
# user2 should still be on top because 0 point challenges should not tie break
scores = get_scores(admin)

View File

@@ -1,18 +1,14 @@
import six
from email.mime.text import MIMEText
if six.PY3:
from email.message import EmailMessage
from email.message import EmailMessage
from unittest.mock import Mock, patch
import requests
from freezegun import freeze_time
from mock import Mock, patch
from CTFd.utils import get_config, set_config
from CTFd.utils.email import (
sendmail,
verify_email_address,
successful_registration_notification,
verify_email_address,
)
from tests.helpers import create_ctfd, destroy_ctfd
@@ -38,25 +34,18 @@ def test_sendmail_with_smtp_from_config_file(mock_smtp):
sendmail(to_addr, msg)
ctf_name = get_config("ctf_name")
if six.PY2:
email_msg = MIMEText(msg)
else:
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg["Subject"] = "Message from {0}".format(ctf_name)
email_msg["From"] = from_addr
email_msg["To"] = to_addr
if six.PY2:
mock_smtp.return_value.sendmail.assert_called_with(
from_addr, [to_addr], email_msg.as_string()
)
else:
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
destroy_ctfd(app)
@@ -81,24 +70,16 @@ def test_sendmail_with_smtp_from_db_config(mock_smtp):
sendmail(to_addr, msg)
ctf_name = get_config("ctf_name")
if six.PY2:
email_msg = MIMEText(msg)
else:
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg["Subject"] = "Message from {0}".format(ctf_name)
email_msg["From"] = from_addr
email_msg["To"] = to_addr
if six.PY2:
mock_smtp.return_value.sendmail.assert_called_with(
from_addr, [to_addr], email_msg.as_string()
)
else:
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
destroy_ctfd(app)
@@ -207,26 +188,18 @@ def test_verify_email(mock_smtp):
)
ctf_name = get_config("ctf_name")
if six.PY2:
email_msg = MIMEText(msg)
else:
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg["Subject"] = "Confirm your account for {ctf_name}".format(
ctf_name=ctf_name
)
email_msg["From"] = from_addr
email_msg["To"] = to_addr
if six.PY2:
mock_smtp.return_value.sendmail.assert_called_with(
from_addr, [to_addr], email_msg.as_string()
)
else:
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
destroy_ctfd(app)
@@ -252,24 +225,16 @@ def test_successful_registration_email(mock_smtp):
msg = "You've successfully registered for CTFd!"
if six.PY2:
email_msg = MIMEText(msg)
else:
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg = EmailMessage()
email_msg.set_content(msg)
email_msg["Subject"] = "Successfully registered for {ctf_name}".format(
ctf_name=ctf_name
)
email_msg["From"] = from_addr
email_msg["To"] = to_addr
if six.PY2:
mock_smtp.return_value.sendmail.assert_called_with(
from_addr, [to_addr], email_msg.as_string()
)
else:
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
mock_smtp.return_value.send_message.assert_called()
assert str(mock_smtp.return_value.send_message.call_args[0][0]) == str(
email_msg
)
destroy_ctfd(app)

View File

@@ -2,8 +2,6 @@
import string
import six
from CTFd.utils.encoding import base64decode, base64encode, hexdecode, hexencode
@@ -27,50 +25,23 @@ def test_hexdecode():
def test_base64encode():
"""The base64encode wrapper works properly"""
if six.PY2:
assert base64encode("abc123") == "YWJjMTIz"
assert base64encode(unicode("abc123")) == "YWJjMTIz" # noqa: F821
assert (
base64encode(
unicode( # noqa: F821
'"test@mailinator.com".DGxeoA.lCssU3M2QuBfohO-FtdgDQLKbU4'
)
)
== "InRlc3RAbWFpbGluYXRvci5jb20iLkRHeGVvQS5sQ3NzVTNNMlF1QmZvaE8tRnRkZ0RRTEtiVTQ"
)
assert base64encode("user+user@ctfd.io") == "dXNlcit1c2VyQGN0ZmQuaW8"
assert base64encode("😆") == "8J-Yhg"
else:
assert base64encode("abc123") == "YWJjMTIz"
assert (
base64encode('"test@mailinator.com".DGxeoA.lCssU3M2QuBfohO-FtdgDQLKbU4')
== "InRlc3RAbWFpbGluYXRvci5jb20iLkRHeGVvQS5sQ3NzVTNNMlF1QmZvaE8tRnRkZ0RRTEtiVTQ"
)
assert base64encode("user+user@ctfd.io") == "dXNlcit1c2VyQGN0ZmQuaW8"
assert base64encode("😆") == "8J-Yhg"
assert base64encode("abc123") == "YWJjMTIz"
assert (
base64encode('"test@mailinator.com".DGxeoA.lCssU3M2QuBfohO-FtdgDQLKbU4')
== "InRlc3RAbWFpbGluYXRvci5jb20iLkRHeGVvQS5sQ3NzVTNNMlF1QmZvaE8tRnRkZ0RRTEtiVTQ"
)
assert base64encode("user+user@ctfd.io") == "dXNlcit1c2VyQGN0ZmQuaW8"
assert base64encode("😆") == "8J-Yhg"
def test_base64decode():
"""The base64decode wrapper works properly"""
if six.PY2:
assert base64decode("YWJjMTIz") == "abc123"
assert base64decode(unicode("YWJjMTIz")) == "abc123" # noqa: F821
assert (
base64decode(
unicode( # noqa: F821
"InRlc3RAbWFpbGluYXRvci5jb20iLkRHeGVvQS5sQ3NzVTNNMlF1QmZvaE8tRnRkZ0RRTEtiVTQ"
)
)
== '"test@mailinator.com".DGxeoA.lCssU3M2QuBfohO-FtdgDQLKbU4'
assert base64decode("YWJjMTIz") == "abc123"
assert (
base64decode(
"InRlc3RAbWFpbGluYXRvci5jb20iLkRHeGVvQS5sQ3NzVTNNMlF1QmZvaE8tRnRkZ0RRTEtiVTQ"
)
assert base64decode("8J-Yhg") == "😆"
else:
assert base64decode("YWJjMTIz") == "abc123"
assert (
base64decode(
"InRlc3RAbWFpbGluYXRvci5jb20iLkRHeGVvQS5sQ3NzVTNNMlF1QmZvaE8tRnRkZ0RRTEtiVTQ"
)
== '"test@mailinator.com".DGxeoA.lCssU3M2QuBfohO-FtdgDQLKbU4'
)
assert base64decode("dXNlcit1c2VyQGN0ZmQuaW8") == "user+user@ctfd.io"
assert base64decode("8J-Yhg") == "😆"
== '"test@mailinator.com".DGxeoA.lCssU3M2QuBfohO-FtdgDQLKbU4'
)
assert base64decode("dXNlcit1c2VyQGN0ZmQuaW8") == "user+user@ctfd.io"
assert base64decode("8J-Yhg") == "😆"

View File

@@ -1,10 +1,10 @@
import json
from collections import defaultdict
from queue import Queue
from unittest.mock import patch
import redis
from mock import patch
from redis.exceptions import ConnectionError
from six.moves.queue import Queue
from CTFd.config import TestingConfig
from CTFd.utils.events import EventManager, RedisEventManager, ServerSentEvent

View File

@@ -1,5 +1,6 @@
from unittest.mock import Mock, patch
import requests
from mock import Mock, patch
from CTFd.utils import get_config, set_config
from CTFd.utils.updates import update_check

View File

@@ -1,8 +1,8 @@
import os
from io import BytesIO
import boto3
from moto import mock_s3
from six import BytesIO
from CTFd.utils.uploads import S3Uploader, rmdir
from tests.helpers import create_ctfd, destroy_ctfd