mirror of
https://github.com/aljazceru/CTFd.git
synced 2025-12-17 14:04:20 +01:00
Fixing large ip solves (#307)
* Switching to using strings to store IP addresses because of the issues with storing them properly, Fixes #302, Fixes $306 * Only runs migrations when using a real database because SQlite is too inflexible to support migrations properly * Removes calls to the ip helper functions but they can remain in case someone needs them in a plugin.
This commit is contained in:
@@ -7,6 +7,7 @@ from jinja2 import FileSystemLoader
|
|||||||
from sqlalchemy.engine.url import make_url
|
from sqlalchemy.engine.url import make_url
|
||||||
from sqlalchemy.exc import OperationalError, ProgrammingError
|
from sqlalchemy.exc import OperationalError, ProgrammingError
|
||||||
from sqlalchemy_utils import database_exists, create_database
|
from sqlalchemy_utils import database_exists, create_database
|
||||||
|
from sqlalchemy_utils.functions import get_tables
|
||||||
from six.moves import input
|
from six.moves import input
|
||||||
|
|
||||||
from CTFd.utils import cache, migrate, migrate_upgrade, migrate_stamp
|
from CTFd.utils import cache, migrate, migrate_upgrade, migrate_stamp
|
||||||
@@ -74,13 +75,14 @@ def create_app(config='CTFd.config.Config'):
|
|||||||
# Register Flask-Migrate
|
# Register Flask-Migrate
|
||||||
migrate.init_app(app, db)
|
migrate.init_app(app, db)
|
||||||
|
|
||||||
# This creates tables instead of db.create_all()
|
|
||||||
# Allows migrations to happen properly
|
|
||||||
migrate_upgrade()
|
|
||||||
|
|
||||||
# Alembic sqlite support is lacking so we should just create_all anyway
|
# Alembic sqlite support is lacking so we should just create_all anyway
|
||||||
if url.drivername.startswith('sqlite'):
|
if url.drivername.startswith('sqlite'):
|
||||||
db.create_all()
|
db.create_all()
|
||||||
|
else:
|
||||||
|
if 'alembic_version' not in db.engine.table_names():
|
||||||
|
# This creates tables instead of db.create_all()
|
||||||
|
# Allows migrations to happen properly
|
||||||
|
migrate_upgrade()
|
||||||
|
|
||||||
app.db = db
|
app.db = db
|
||||||
|
|
||||||
|
|||||||
@@ -221,7 +221,7 @@ def admin_fails(teamid):
|
|||||||
@admin_teams.route('/admin/solves/<int:teamid>/<int:chalid>/solve', methods=['POST'])
|
@admin_teams.route('/admin/solves/<int:teamid>/<int:chalid>/solve', methods=['POST'])
|
||||||
@admins_only
|
@admins_only
|
||||||
def create_solve(teamid, chalid):
|
def create_solve(teamid, chalid):
|
||||||
solve = Solves(chalid=chalid, teamid=teamid, ip='127.0.0.1', flag='MARKED_AS_SOLVED_BY_ADMIN')
|
solve = Solves(teamid=teamid, chalid=chalid, ip='127.0.0.1', flag='MARKED_AS_SOLVED_BY_ADMIN')
|
||||||
db.session.add(solve)
|
db.session.add(solve)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
db.session.close()
|
db.session.close()
|
||||||
|
|||||||
@@ -262,7 +262,7 @@ def chal(chalid):
|
|||||||
# Anti-bruteforce / submitting keys too quickly
|
# Anti-bruteforce / submitting keys too quickly
|
||||||
if utils.get_kpm(session['id']) > 10:
|
if utils.get_kpm(session['id']) > 10:
|
||||||
if utils.ctftime():
|
if utils.ctftime():
|
||||||
wrong = WrongKeys(session['id'], chalid, request.form['key'])
|
wrong = WrongKeys(teamid=session['id'], chalid=chalid, ip=utils.get_ip(), flag=request.form['key'].strip())
|
||||||
db.session.add(wrong)
|
db.session.add(wrong)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
db.session.close()
|
db.session.close()
|
||||||
@@ -289,7 +289,7 @@ def chal(chalid):
|
|||||||
chal_class = get_chal_class(chal.type)
|
chal_class = get_chal_class(chal.type)
|
||||||
if chal_class.solve(chal, provided_key):
|
if chal_class.solve(chal, provided_key):
|
||||||
if utils.ctftime():
|
if utils.ctftime():
|
||||||
solve = Solves(chalid=chalid, teamid=session['id'], ip=utils.get_ip(), flag=provided_key)
|
solve = Solves(teamid=session['id'], chalid=chalid, ip=utils.get_ip(), flag=provided_key)
|
||||||
db.session.add(solve)
|
db.session.add(solve)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
db.session.close()
|
db.session.close()
|
||||||
@@ -297,7 +297,7 @@ def chal(chalid):
|
|||||||
return jsonify({'status': 1, 'message': 'Correct'})
|
return jsonify({'status': 1, 'message': 'Correct'})
|
||||||
|
|
||||||
if utils.ctftime():
|
if utils.ctftime():
|
||||||
wrong = WrongKeys(teamid=session['id'], chalid=chalid, flag=provided_key)
|
wrong = WrongKeys(teamid=session['id'], chalid=chalid, ip=utils.get_ip(), flag=provided_key)
|
||||||
db.session.add(wrong)
|
db.session.add(wrong)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
db.session.close()
|
db.session.close()
|
||||||
|
|||||||
@@ -225,39 +225,41 @@ class Solves(db.Model):
|
|||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
chalid = db.Column(db.Integer, db.ForeignKey('challenges.id'))
|
chalid = db.Column(db.Integer, db.ForeignKey('challenges.id'))
|
||||||
teamid = db.Column(db.Integer, db.ForeignKey('teams.id'))
|
teamid = db.Column(db.Integer, db.ForeignKey('teams.id'))
|
||||||
ip = db.Column(db.Integer)
|
ip = db.Column(db.String(46))
|
||||||
flag = db.Column(db.Text)
|
flag = db.Column(db.Text)
|
||||||
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
||||||
team = db.relationship('Teams', foreign_keys="Solves.teamid", lazy='joined')
|
team = db.relationship('Teams', foreign_keys="Solves.teamid", lazy='joined')
|
||||||
chal = db.relationship('Challenges', foreign_keys="Solves.chalid", lazy='joined')
|
chal = db.relationship('Challenges', foreign_keys="Solves.chalid", lazy='joined')
|
||||||
# value = db.Column(db.Integer)
|
# value = db.Column(db.Integer)
|
||||||
|
|
||||||
def __init__(self, chalid, teamid, ip, flag):
|
def __init__(self, teamid, chalid, ip, flag):
|
||||||
self.ip = ip2long(ip)
|
self.ip = ip
|
||||||
self.chalid = chalid
|
self.chalid = chalid
|
||||||
self.teamid = teamid
|
self.teamid = teamid
|
||||||
self.flag = flag
|
self.flag = flag
|
||||||
# self.value = value
|
# self.value = value
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<solves %r>' % self.chal
|
return '<solve {}, {}, {}, {}>'.format(self.teamid, self.chalid, self.ip, self.flag)
|
||||||
|
|
||||||
|
|
||||||
class WrongKeys(db.Model):
|
class WrongKeys(db.Model):
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
chalid = db.Column(db.Integer, db.ForeignKey('challenges.id'))
|
chalid = db.Column(db.Integer, db.ForeignKey('challenges.id'))
|
||||||
teamid = db.Column(db.Integer, db.ForeignKey('teams.id'))
|
teamid = db.Column(db.Integer, db.ForeignKey('teams.id'))
|
||||||
|
ip = db.Column(db.String(46))
|
||||||
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
||||||
flag = db.Column(db.Text)
|
flag = db.Column(db.Text)
|
||||||
chal = db.relationship('Challenges', foreign_keys="WrongKeys.chalid", lazy='joined')
|
chal = db.relationship('Challenges', foreign_keys="WrongKeys.chalid", lazy='joined')
|
||||||
|
|
||||||
def __init__(self, teamid, chalid, flag):
|
def __init__(self, teamid, chalid, ip, flag):
|
||||||
|
self.ip = ip
|
||||||
self.teamid = teamid
|
self.teamid = teamid
|
||||||
self.chalid = chalid
|
self.chalid = chalid
|
||||||
self.flag = flag
|
self.flag = flag
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<wrong %r>' % self.flag
|
return '<wrong {}, {}, {}, {}>'.format(self.teamid, self.chalid, self.ip, self.flag)
|
||||||
|
|
||||||
|
|
||||||
class Unlocks(db.Model):
|
class Unlocks(db.Model):
|
||||||
@@ -278,12 +280,12 @@ class Unlocks(db.Model):
|
|||||||
|
|
||||||
class Tracking(db.Model):
|
class Tracking(db.Model):
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
ip = db.Column(db.BigInteger)
|
ip = db.Column(db.String(46))
|
||||||
team = db.Column(db.Integer, db.ForeignKey('teams.id'))
|
team = db.Column(db.Integer, db.ForeignKey('teams.id'))
|
||||||
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
||||||
|
|
||||||
def __init__(self, ip, team):
|
def __init__(self, ip, team):
|
||||||
self.ip = ip2long(ip)
|
self.ip = ip
|
||||||
self.team = team
|
self.team = team
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
|
|||||||
@@ -104,7 +104,7 @@
|
|||||||
<tbody>
|
<tbody>
|
||||||
{% for addr in addrs %}
|
{% for addr in addrs %}
|
||||||
<tr>
|
<tr>
|
||||||
<td class="text-center">{{ addr[0]|long2ip }}</td>
|
<td class="text-center">{{ addr[0] }}</td>
|
||||||
<td class="text-center solve-time"><script>document.write( moment({{ addr[1]|unix_time_millis }}).local().format('MMMM Do, h:mm:ss A'))</script></td>
|
<td class="text-center solve-time"><script>document.write( moment({{ addr[1]|unix_time_millis }}).local().format('MMMM Do, h:mm:ss A'))</script></td>
|
||||||
</tr>
|
</tr>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
|||||||
@@ -126,7 +126,7 @@ def init_utils(app):
|
|||||||
@app.before_request
|
@app.before_request
|
||||||
def tracker():
|
def tracker():
|
||||||
if authed():
|
if authed():
|
||||||
track = Tracking.query.filter_by(ip=ip2long(get_ip()), team=session['id']).first()
|
track = Tracking.query.filter_by(ip=get_ip(), team=session['id']).first()
|
||||||
if not track:
|
if not track:
|
||||||
visit = Tracking(ip=get_ip(), team=session['id'])
|
visit = Tracking(ip=get_ip(), team=session['id'])
|
||||||
db.session.add(visit)
|
db.session.add(visit)
|
||||||
|
|||||||
@@ -0,0 +1,94 @@
|
|||||||
|
"""Change Solves IP to String and add IP to WrongKeys
|
||||||
|
|
||||||
|
Revision ID: 1ec4a28fe0ff
|
||||||
|
Revises: c7225db614c1
|
||||||
|
Create Date: 2017-07-08 17:08:59.098805
|
||||||
|
|
||||||
|
"""
|
||||||
|
from CTFd.models import db, Solves, WrongKeys
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from sqlalchemy.sql import text, table, column
|
||||||
|
import netaddr
|
||||||
|
|
||||||
|
def ip2long(ip):
|
||||||
|
'''Converts a user's IP address into an integer/long'''
|
||||||
|
return int(netaddr.IPAddress(ip))
|
||||||
|
|
||||||
|
|
||||||
|
def long2ip(ip_int):
|
||||||
|
'''Converts a saved integer/long back into an IP address'''
|
||||||
|
return str(netaddr.IPAddress(ip_int))
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '1ec4a28fe0ff'
|
||||||
|
down_revision = 'c7225db614c1'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
solves_table = table('solves',
|
||||||
|
column('id', db.Integer),
|
||||||
|
column('ip', db.Integer),
|
||||||
|
)
|
||||||
|
|
||||||
|
tracking_table = table('tracking',
|
||||||
|
column('id', db.Integer),
|
||||||
|
column('ip', db.String(46)),
|
||||||
|
column('team', db.Integer),
|
||||||
|
column('date', db.DateTime),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
connection = op.get_bind()
|
||||||
|
|
||||||
|
op.alter_column('solves', 'ip', existing_type=sa.Integer(), type_=sa.String(length=46))
|
||||||
|
for solve in connection.execute(solves_table.select()):
|
||||||
|
connection.execute(
|
||||||
|
solves_table.update().where(
|
||||||
|
solves_table.c.id == solve.id
|
||||||
|
).values(
|
||||||
|
ip=long2ip(solve.ip),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
op.alter_column('tracking', 'ip', existing_type=sa.Integer(), type_=sa.String(length=46))
|
||||||
|
for track in connection.execute(tracking_table.select()):
|
||||||
|
connection.execute(
|
||||||
|
tracking_table.update().where(
|
||||||
|
tracking_table.c.id == track.id
|
||||||
|
).values(
|
||||||
|
ip=long2ip(track.ip),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
op.add_column('wrong_keys', sa.Column('ip', sa.String(length=46), nullable=True))
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
connection = op.get_bind()
|
||||||
|
for solve in connection.execute(solves_table.select()):
|
||||||
|
connection.execute(
|
||||||
|
solves_table.update().where(
|
||||||
|
solves_table.c.id == solve.id
|
||||||
|
).values(
|
||||||
|
ip=ip2long(solve.ip),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
for track in connection.execute(tracking_table.select()):
|
||||||
|
connection.execute(
|
||||||
|
tracking_table.update().where(
|
||||||
|
tracking_table.c.id == track.id
|
||||||
|
).values(
|
||||||
|
ip=ip2long(track.ip),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
op.alter_column('solves', 'ip', existing_type=sa.String(length=46), type_=sa.Integer())
|
||||||
|
op.alter_column('tracking', 'ip', existing_type=sa.String(length=46), type_=sa.Integer())
|
||||||
|
op.drop_column('wrong_keys', 'ip')
|
||||||
|
# ### end Alembic commands ###
|
||||||
@@ -260,7 +260,7 @@ if __name__ == '__main__':
|
|||||||
chalid = random.randint(1, CHAL_AMOUNT)
|
chalid = random.randint(1, CHAL_AMOUNT)
|
||||||
if chalid not in used:
|
if chalid not in used:
|
||||||
used.append(chalid)
|
used.append(chalid)
|
||||||
solve = Solves(chalid, x + 1, '127.0.0.1', gen_word())
|
solve = Solves(x + 1, chalid, '127.0.0.1', gen_word())
|
||||||
|
|
||||||
new_base = random_date(base_time, base_time + datetime.timedelta(minutes=random.randint(30, 60)))
|
new_base = random_date(base_time, base_time + datetime.timedelta(minutes=random.randint(30, 60)))
|
||||||
solve.date = new_base
|
solve.date = new_base
|
||||||
@@ -293,7 +293,7 @@ if __name__ == '__main__':
|
|||||||
chalid = random.randint(1, CHAL_AMOUNT)
|
chalid = random.randint(1, CHAL_AMOUNT)
|
||||||
if chalid not in used:
|
if chalid not in used:
|
||||||
used.append(chalid)
|
used.append(chalid)
|
||||||
wrong = WrongKeys(x + 1, chalid, gen_word())
|
wrong = WrongKeys(x + 1, chalid, '127.0.0.1', gen_word())
|
||||||
|
|
||||||
new_base = random_date(base_time, base_time + datetime.timedelta(minutes=random.randint(30, 60)))
|
new_base = random_date(base_time, base_time + datetime.timedelta(minutes=random.randint(30, 60)))
|
||||||
wrong.date = new_base
|
wrong.date = new_base
|
||||||
|
|||||||
@@ -1,16 +1,16 @@
|
|||||||
Flask==0.12
|
Flask==0.12.2
|
||||||
Flask-SQLAlchemy==2.2
|
Flask-SQLAlchemy==2.2
|
||||||
Flask-Session==0.3.1
|
Flask-Session==0.3.1
|
||||||
Flask-Caching==1.2.0
|
Flask-Caching==1.2.0
|
||||||
Flask-Migrate==2.0.3
|
Flask-Migrate==2.0.4
|
||||||
SQLAlchemy==1.1.6
|
SQLAlchemy==1.1.11
|
||||||
SQLAlchemy-Utils==0.32.12
|
SQLAlchemy-Utils==0.32.14
|
||||||
passlib==1.7.1
|
passlib==1.7.1
|
||||||
bcrypt==3.1.3
|
bcrypt==3.1.3
|
||||||
six==1.10.0
|
six==1.10.0
|
||||||
itsdangerous==0.24
|
itsdangerous==0.24
|
||||||
requests==2.13.0
|
requests==2.18.1
|
||||||
PyMySQL==0.7.10
|
PyMySQL==0.7.11
|
||||||
gunicorn==19.7.0
|
gunicorn==19.7.0
|
||||||
dataset==0.8.0
|
dataset==0.8.0
|
||||||
mistune==0.7.4
|
mistune==0.7.4
|
||||||
|
|||||||
@@ -7,16 +7,6 @@ from sqlalchemy.engine.url import make_url
|
|||||||
def create_ctfd(ctf_name="CTFd", name="admin", email="admin@ctfd.io", password="password", setup=True):
|
def create_ctfd(ctf_name="CTFd", name="admin", email="admin@ctfd.io", password="password", setup=True):
|
||||||
app = create_app('CTFd.config.TestingConfig')
|
app = create_app('CTFd.config.TestingConfig')
|
||||||
|
|
||||||
url = make_url(app.config['SQLALCHEMY_DATABASE_URI'])
|
|
||||||
if url.drivername == 'postgres':
|
|
||||||
url.drivername = 'postgresql'
|
|
||||||
|
|
||||||
if database_exists(url):
|
|
||||||
drop_database(url)
|
|
||||||
create_database(url)
|
|
||||||
with app.app_context():
|
|
||||||
app.db.create_all()
|
|
||||||
|
|
||||||
if setup:
|
if setup:
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
with app.test_client() as client:
|
with app.test_client() as client:
|
||||||
@@ -34,6 +24,10 @@ def create_ctfd(ctf_name="CTFd", name="admin", email="admin@ctfd.io", password="
|
|||||||
return app
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
def destroy_ctfd(app):
|
||||||
|
drop_database(app.config['SQLALCHEMY_DATABASE_URI'])
|
||||||
|
|
||||||
|
|
||||||
def register_user(app, name="user", email="user@ctfd.io", password="password"):
|
def register_user(app, name="user", email="user@ctfd.io", password="password"):
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
with app.test_client() as client:
|
with app.test_client() as client:
|
||||||
@@ -101,15 +95,15 @@ def gen_team(db, name='name', email='user@ctfd.io', password='password'):
|
|||||||
return team
|
return team
|
||||||
|
|
||||||
|
|
||||||
def gen_solve(db, chalid, teamid, ip='127.0.0.1', flag='rightkey'):
|
def gen_solve(db, teamid, chalid, ip='127.0.0.1', flag='rightkey'):
|
||||||
solve = Solves(chalid, teamid, ip, flag)
|
solve = Solves(teamid, chalid, ip, flag)
|
||||||
db.session.add(solve)
|
db.session.add(solve)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return solve
|
return solve
|
||||||
|
|
||||||
|
|
||||||
def gen_wrongkey(db, teamid, chalid, flag='wrongkey'):
|
def gen_wrongkey(db, teamid, chalid, ip='127.0.0.1', flag='wrongkey'):
|
||||||
wrongkey = WrongKeys(teamid, chalid, flag)
|
wrongkey = WrongKeys(teamid, chalid, ip, flag)
|
||||||
db.session.add(wrongkey)
|
db.session.add(wrongkey)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return wrongkey
|
return wrongkey
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from tests.helpers import create_ctfd, register_user, login_as_user
|
from tests.helpers import *
|
||||||
from CTFd.models import Teams
|
from CTFd.models import Teams
|
||||||
|
|
||||||
|
|
||||||
@@ -11,6 +11,7 @@ def test_admin_panel():
|
|||||||
assert r.status_code == 302
|
assert r.status_code == 302
|
||||||
r = client.get('/admin/graphs')
|
r = client.get('/admin/graphs')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_pages():
|
def test_admin_pages():
|
||||||
@@ -20,6 +21,7 @@ def test_admin_pages():
|
|||||||
client = login_as_user(app, name="admin", password="password")
|
client = login_as_user(app, name="admin", password="password")
|
||||||
r = client.get('/admin/pages')
|
r = client.get('/admin/pages')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_teams():
|
def test_admin_teams():
|
||||||
@@ -29,6 +31,7 @@ def test_admin_teams():
|
|||||||
client = login_as_user(app, name="admin", password="password")
|
client = login_as_user(app, name="admin", password="password")
|
||||||
r = client.get('/admin/teams')
|
r = client.get('/admin/teams')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_scoreboard():
|
def test_admin_scoreboard():
|
||||||
@@ -38,6 +41,7 @@ def test_admin_scoreboard():
|
|||||||
client = login_as_user(app, name="admin", password="password")
|
client = login_as_user(app, name="admin", password="password")
|
||||||
r = client.get('/admin/scoreboard')
|
r = client.get('/admin/scoreboard')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_containers():
|
def test_admin_containers():
|
||||||
@@ -47,6 +51,7 @@ def test_admin_containers():
|
|||||||
client = login_as_user(app, name="admin", password="password")
|
client = login_as_user(app, name="admin", password="password")
|
||||||
r = client.get('/admin/containers')
|
r = client.get('/admin/containers')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_chals():
|
def test_admin_chals():
|
||||||
@@ -56,6 +61,7 @@ def test_admin_chals():
|
|||||||
client = login_as_user(app, name="admin", password="password")
|
client = login_as_user(app, name="admin", password="password")
|
||||||
r = client.get('/admin/chals')
|
r = client.get('/admin/chals')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_statistics():
|
def test_admin_statistics():
|
||||||
@@ -65,6 +71,7 @@ def test_admin_statistics():
|
|||||||
client = login_as_user(app, name="admin", password="password")
|
client = login_as_user(app, name="admin", password="password")
|
||||||
r = client.get('/admin/statistics')
|
r = client.get('/admin/statistics')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_config():
|
def test_admin_config():
|
||||||
@@ -74,3 +81,4 @@ def test_admin_config():
|
|||||||
client = login_as_user(app, name="admin", password="password")
|
client = login_as_user(app, name="admin", password="password")
|
||||||
r = client.get('/admin/config')
|
r = client.get('/admin/config')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|||||||
@@ -2,8 +2,9 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
from tests.helpers import *
|
from tests.helpers import *
|
||||||
from CTFd.models import Teams
|
from CTFd.models import Teams, Solves, WrongKeys
|
||||||
import json
|
import json
|
||||||
|
from CTFd import utils
|
||||||
|
|
||||||
|
|
||||||
def test_index():
|
def test_index():
|
||||||
@@ -13,6 +14,7 @@ def test_index():
|
|||||||
with app.test_client() as client:
|
with app.test_client() as client:
|
||||||
r = client.get('/')
|
r = client.get('/')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_register_user():
|
def test_register_user():
|
||||||
@@ -22,6 +24,7 @@ def test_register_user():
|
|||||||
register_user(app)
|
register_user(app)
|
||||||
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
|
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
|
||||||
assert team_count == 2 # There's the admin user and the created user
|
assert team_count == 2 # There's the admin user and the created user
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_register_duplicate_teamname():
|
def test_register_duplicate_teamname():
|
||||||
@@ -32,6 +35,7 @@ def test_register_duplicate_teamname():
|
|||||||
register_user(app, name="user1", email="user2@ctfd.io", password="password")
|
register_user(app, name="user1", email="user2@ctfd.io", password="password")
|
||||||
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
|
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
|
||||||
assert team_count == 2 # There's the admin user and the first created user
|
assert team_count == 2 # There's the admin user and the first created user
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_register_duplicate_email():
|
def test_register_duplicate_email():
|
||||||
@@ -42,6 +46,7 @@ def test_register_duplicate_email():
|
|||||||
register_user(app, name="user2", email="user1@ctfd.io", password="password")
|
register_user(app, name="user2", email="user1@ctfd.io", password="password")
|
||||||
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
|
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
|
||||||
assert team_count == 2 # There's the admin user and the first created user
|
assert team_count == 2 # There's the admin user and the first created user
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_bad_login():
|
def test_user_bad_login():
|
||||||
@@ -52,6 +57,7 @@ def test_user_bad_login():
|
|||||||
client = login_as_user(app, name="user", password="wrong_password")
|
client = login_as_user(app, name="user", password="wrong_password")
|
||||||
r = client.get('/profile')
|
r = client.get('/profile')
|
||||||
assert r.location.startswith("http://localhost/login") # We got redirected to login
|
assert r.location.startswith("http://localhost/login") # We got redirected to login
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_login():
|
def test_user_login():
|
||||||
@@ -63,6 +69,7 @@ def test_user_login():
|
|||||||
r = client.get('/profile')
|
r = client.get('/profile')
|
||||||
assert r.location != "http://localhost/login" # We didn't get redirected to login
|
assert r.location != "http://localhost/login" # We didn't get redirected to login
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_isnt_admin():
|
def test_user_isnt_admin():
|
||||||
@@ -74,6 +81,7 @@ def test_user_isnt_admin():
|
|||||||
r = client.get('/admin/graphs')
|
r = client.get('/admin/graphs')
|
||||||
assert r.location == "http://localhost/login"
|
assert r.location == "http://localhost/login"
|
||||||
assert r.status_code == 302
|
assert r.status_code == 302
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_teams():
|
def test_user_get_teams():
|
||||||
@@ -84,6 +92,7 @@ def test_user_get_teams():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/teams')
|
r = client.get('/teams')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_scoreboard():
|
def test_user_get_scoreboard():
|
||||||
@@ -94,6 +103,7 @@ def test_user_get_scoreboard():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/scoreboard')
|
r = client.get('/scoreboard')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_scores():
|
def test_user_get_scores():
|
||||||
@@ -104,6 +114,7 @@ def test_user_get_scores():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/scores')
|
r = client.get('/scores')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_topteams():
|
def test_user_get_topteams():
|
||||||
@@ -114,6 +125,7 @@ def test_user_get_topteams():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/top/10')
|
r = client.get('/top/10')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_challenges():
|
def test_user_get_challenges():
|
||||||
@@ -124,6 +136,7 @@ def test_user_get_challenges():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/challenges')
|
r = client.get('/challenges')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_chals():
|
def test_user_get_chals():
|
||||||
@@ -134,6 +147,7 @@ def test_user_get_chals():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/chals')
|
r = client.get('/chals')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_solves_per_chal():
|
def test_user_get_solves_per_chal():
|
||||||
@@ -144,6 +158,7 @@ def test_user_get_solves_per_chal():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/chals/solves')
|
r = client.get('/chals/solves')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_solves():
|
def test_user_get_solves():
|
||||||
@@ -154,6 +169,7 @@ def test_user_get_solves():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/solves')
|
r = client.get('/solves')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_team_page():
|
def test_user_get_team_page():
|
||||||
@@ -164,6 +180,7 @@ def test_user_get_team_page():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/team/2')
|
r = client.get('/team/2')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_profile():
|
def test_user_get_profile():
|
||||||
@@ -174,6 +191,7 @@ def test_user_get_profile():
|
|||||||
client = login_as_user(app)
|
client = login_as_user(app)
|
||||||
r = client.get('/profile')
|
r = client.get('/profile')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_logout():
|
def test_user_get_logout():
|
||||||
@@ -186,6 +204,7 @@ def test_user_get_logout():
|
|||||||
r = client.get('/challenges')
|
r = client.get('/challenges')
|
||||||
assert r.location == "http://localhost/login?next=challenges"
|
assert r.location == "http://localhost/login?next=challenges"
|
||||||
assert r.status_code == 302
|
assert r.status_code == 302
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_user_get_reset_password():
|
def test_user_get_reset_password():
|
||||||
@@ -196,6 +215,7 @@ def test_user_get_reset_password():
|
|||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
r = client.get('/reset_password')
|
r = client.get('/reset_password')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_viewing_challenges():
|
def test_viewing_challenges():
|
||||||
@@ -208,6 +228,7 @@ def test_viewing_challenges():
|
|||||||
r = client.get('/chals')
|
r = client.get('/chals')
|
||||||
chals = json.loads(r.get_data(as_text=True))
|
chals = json.loads(r.get_data(as_text=True))
|
||||||
assert len(chals['game']) == 1
|
assert len(chals['game']) == 1
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_submitting_correct_flag():
|
def test_submitting_correct_flag():
|
||||||
@@ -227,6 +248,7 @@ def test_submitting_correct_flag():
|
|||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
resp = json.loads(r.data.decode('utf8'))
|
resp = json.loads(r.data.decode('utf8'))
|
||||||
assert resp.get('status') == 1 and resp.get('message') == "Correct"
|
assert resp.get('status') == 1 and resp.get('message') == "Correct"
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_submitting_incorrect_flag():
|
def test_submitting_incorrect_flag():
|
||||||
@@ -246,6 +268,7 @@ def test_submitting_incorrect_flag():
|
|||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
resp = json.loads(r.data.decode('utf8'))
|
resp = json.loads(r.data.decode('utf8'))
|
||||||
assert resp.get('status') == 0 and resp.get('message') == "Incorrect"
|
assert resp.get('status') == 0 and resp.get('message') == "Incorrect"
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_submitting_unicode_flag():
|
def test_submitting_unicode_flag():
|
||||||
@@ -265,6 +288,50 @@ def test_submitting_unicode_flag():
|
|||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
resp = json.loads(r.data.decode('utf8'))
|
resp = json.loads(r.data.decode('utf8'))
|
||||||
assert resp.get('status') == 1 and resp.get('message') == "Correct"
|
assert resp.get('status') == 1 and resp.get('message') == "Correct"
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
|
def test_submitting_flags_with_large_ips():
|
||||||
|
app = create_ctfd()
|
||||||
|
with app.app_context():
|
||||||
|
register_user(app)
|
||||||
|
client = login_as_user(app)
|
||||||
|
|
||||||
|
# SQLite doesn't support BigInteger well so we can't test it properly
|
||||||
|
ip_addresses = ['172.18.0.1', '255.255.255.255', '2001:0db8:85a3:0000:0000:8a2e:0370:7334']
|
||||||
|
for ip_address in ip_addresses:
|
||||||
|
# Monkeypatch get_ip
|
||||||
|
utils.get_ip = lambda: ip_address
|
||||||
|
|
||||||
|
# Generate challenge and flag
|
||||||
|
chal = gen_challenge(app.db)
|
||||||
|
chal_id = chal.id
|
||||||
|
flag = gen_flag(app.db, chal=chal.id, flag=u'correct_key')
|
||||||
|
|
||||||
|
# Submit wrong_key
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
data = {
|
||||||
|
"key": 'wrong_key',
|
||||||
|
"nonce": sess.get('nonce')
|
||||||
|
}
|
||||||
|
r = client.post('/chal/{}'.format(chal_id), data=data)
|
||||||
|
assert r.status_code == 200
|
||||||
|
resp = json.loads(r.data.decode('utf8'))
|
||||||
|
assert resp.get('status') == 0 and resp.get('message') == "Incorrect"
|
||||||
|
assert WrongKeys.query.filter_by(ip=ip_address).first()
|
||||||
|
|
||||||
|
# Submit correct key
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
data = {
|
||||||
|
"key": 'correct_key',
|
||||||
|
"nonce": sess.get('nonce')
|
||||||
|
}
|
||||||
|
r = client.post('/chal/{}'.format(chal_id), data=data)
|
||||||
|
assert r.status_code == 200
|
||||||
|
resp = json.loads(r.data.decode('utf8'))
|
||||||
|
assert resp.get('status') == 1 and resp.get('message') == "Correct"
|
||||||
|
assert Solves.query.filter_by(ip=ip_address).first()
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_pages_routing_and_rendering():
|
def test_pages_routing_and_rendering():
|
||||||
@@ -279,6 +346,7 @@ def test_pages_routing_and_rendering():
|
|||||||
r = client.get('/test')
|
r = client.get('/test')
|
||||||
output = r.get_data(as_text=True)
|
output = r.get_data(as_text=True)
|
||||||
assert "<h2>The quick brown fox jumped over the lazy dog</h2>" in output
|
assert "<h2>The quick brown fox jumped over the lazy dog</h2>" in output
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_themes_handler():
|
def test_themes_handler():
|
||||||
@@ -298,6 +366,7 @@ def test_themes_handler():
|
|||||||
assert r.status_code == 404
|
assert r.status_code == 404
|
||||||
r = client.get('/themes/original/static/../../../utils.py')
|
r = client.get('/themes/original/static/../../../utils.py')
|
||||||
assert r.status_code == 404
|
assert r.status_code == 404
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_ctfd_setup_redirect():
|
def test_ctfd_setup_redirect():
|
||||||
@@ -312,3 +381,4 @@ def test_ctfd_setup_redirect():
|
|||||||
# Files in /themes load properly
|
# Files in /themes load properly
|
||||||
r = client.get('/themes/original/static/css/style.css')
|
r = client.get('/themes/original/static/css/style.css')
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ def test_get_config_and_set_config():
|
|||||||
config = set_config('TEST_CONFIG_ENTRY', 'test_config_entry')
|
config = set_config('TEST_CONFIG_ENTRY', 'test_config_entry')
|
||||||
assert config.value == 'test_config_entry'
|
assert config.value == 'test_config_entry'
|
||||||
assert get_config('TEST_CONFIG_ENTRY') == 'test_config_entry'
|
assert get_config('TEST_CONFIG_ENTRY') == 'test_config_entry'
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_ip2long_ipv4():
|
def test_ip2long_ipv4():
|
||||||
@@ -50,6 +51,7 @@ def test_override_template():
|
|||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
output = r.get_data(as_text=True)
|
output = r.get_data(as_text=True)
|
||||||
assert 'LOGIN OVERRIDE' in output
|
assert 'LOGIN OVERRIDE' in output
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_admin_override_template():
|
def test_admin_override_template():
|
||||||
@@ -63,6 +65,7 @@ def test_admin_override_template():
|
|||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
output = r.get_data(as_text=True)
|
output = r.get_data(as_text=True)
|
||||||
assert 'ADMIN TEAM OVERRIDE' in output
|
assert 'ADMIN TEAM OVERRIDE' in output
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
@patch('smtplib.SMTP')
|
@patch('smtplib.SMTP')
|
||||||
@@ -89,3 +92,4 @@ def test_sendmail_with_smtp(mock_smtp):
|
|||||||
email_msg['To'] = to_addr
|
email_msg['To'] = to_addr
|
||||||
|
|
||||||
mock_smtp.return_value.sendmail.assert_called_once_with(from_addr, [to_addr], email_msg.as_string())
|
mock_smtp.return_value.sendmail.assert_called_once_with(from_addr, [to_addr], email_msg.as_string())
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|||||||
Reference in New Issue
Block a user