mirror of
https://github.com/aljazceru/CTFd.git
synced 2025-12-17 05:54:19 +01:00
Adds ondelete='CASCADE' to some models. (#979)
* Fixes `populate.py` to assign captains to teams.
* Adds `ondelete='CASCADE'` to most ForeignKeys in models
* Closes #794
* Test reset in team mode to test removing teams with captains
* Test deleting users/teams with awards to test cascading deletion
* `gen_team()` test helper now creates users for the team and assigns the first one as captain
* Added `Challenges.flags` relationship and moved the `Flags.challenge` relationship to a backref on `Challenges`
This commit is contained in:
@@ -76,6 +76,7 @@ class Challenges(db.Model):
|
|||||||
files = db.relationship("ChallengeFiles", backref="challenge")
|
files = db.relationship("ChallengeFiles", backref="challenge")
|
||||||
tags = db.relationship("Tags", backref="challenge")
|
tags = db.relationship("Tags", backref="challenge")
|
||||||
hints = db.relationship("Hints", backref="challenge")
|
hints = db.relationship("Hints", backref="challenge")
|
||||||
|
flags = db.relationship("Flags", backref="challenge")
|
||||||
|
|
||||||
__mapper_args__ = {
|
__mapper_args__ = {
|
||||||
'polymorphic_identity': 'standard',
|
'polymorphic_identity': 'standard',
|
||||||
@@ -93,7 +94,7 @@ class Hints(db.Model):
|
|||||||
__tablename__ = 'hints'
|
__tablename__ = 'hints'
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
type = db.Column(db.String(80), default='standard')
|
type = db.Column(db.String(80), default='standard')
|
||||||
challenge_id = db.Column(db.Integer, db.ForeignKey('challenges.id'))
|
challenge_id = db.Column(db.Integer, db.ForeignKey('challenges.id', ondelete='CASCADE'))
|
||||||
content = db.Column(db.Text)
|
content = db.Column(db.Text)
|
||||||
cost = db.Column(db.Integer, default=0)
|
cost = db.Column(db.Integer, default=0)
|
||||||
requirements = db.Column(db.JSON)
|
requirements = db.Column(db.JSON)
|
||||||
@@ -125,8 +126,8 @@ class Hints(db.Model):
|
|||||||
class Awards(db.Model):
|
class Awards(db.Model):
|
||||||
__tablename__ = 'awards'
|
__tablename__ = 'awards'
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
|
user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'))
|
||||||
team_id = db.Column(db.Integer, db.ForeignKey('teams.id'))
|
team_id = db.Column(db.Integer, db.ForeignKey('teams.id', ondelete='CASCADE'))
|
||||||
type = db.Column(db.String(80), default='standard')
|
type = db.Column(db.String(80), default='standard')
|
||||||
name = db.Column(db.String(80))
|
name = db.Column(db.String(80))
|
||||||
description = db.Column(db.Text)
|
description = db.Column(db.Text)
|
||||||
@@ -162,7 +163,7 @@ class Awards(db.Model):
|
|||||||
class Tags(db.Model):
|
class Tags(db.Model):
|
||||||
__tablename__ = 'tags'
|
__tablename__ = 'tags'
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
challenge_id = db.Column(db.Integer, db.ForeignKey('challenges.id'))
|
challenge_id = db.Column(db.Integer, db.ForeignKey('challenges.id', ondelete='CASCADE'))
|
||||||
value = db.Column(db.String(80))
|
value = db.Column(db.String(80))
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
@@ -191,7 +192,7 @@ class ChallengeFiles(Files):
|
|||||||
__mapper_args__ = {
|
__mapper_args__ = {
|
||||||
'polymorphic_identity': 'challenge'
|
'polymorphic_identity': 'challenge'
|
||||||
}
|
}
|
||||||
challenge_id = db.Column(db.Integer, db.ForeignKey('challenges.id'))
|
challenge_id = db.Column(db.Integer, db.ForeignKey('challenges.id', ondelete='CASCADE'))
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super(ChallengeFiles, self).__init__(**kwargs)
|
super(ChallengeFiles, self).__init__(**kwargs)
|
||||||
@@ -210,13 +211,11 @@ class PageFiles(Files):
|
|||||||
class Flags(db.Model):
|
class Flags(db.Model):
|
||||||
__tablename__ = 'flags'
|
__tablename__ = 'flags'
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
challenge_id = db.Column(db.Integer, db.ForeignKey('challenges.id'))
|
challenge_id = db.Column(db.Integer, db.ForeignKey('challenges.id', ondelete='CASCADE'))
|
||||||
type = db.Column(db.String(80))
|
type = db.Column(db.String(80))
|
||||||
content = db.Column(db.Text)
|
content = db.Column(db.Text)
|
||||||
data = db.Column(db.Text)
|
data = db.Column(db.Text)
|
||||||
|
|
||||||
challenge = db.relationship('Challenges', foreign_keys="Flags.challenge_id", lazy='select')
|
|
||||||
|
|
||||||
__mapper_args__ = {
|
__mapper_args__ = {
|
||||||
'polymorphic_on': type
|
'polymorphic_on': type
|
||||||
}
|
}
|
||||||
@@ -454,7 +453,7 @@ class Teams(db.Model):
|
|||||||
banned = db.Column(db.Boolean, default=False)
|
banned = db.Column(db.Boolean, default=False)
|
||||||
|
|
||||||
# Relationship for Users
|
# Relationship for Users
|
||||||
captain_id = db.Column(db.Integer, db.ForeignKey('users.id'))
|
captain_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='SET NULL'))
|
||||||
captain = db.relationship("Users", foreign_keys=[captain_id])
|
captain = db.relationship("Users", foreign_keys=[captain_id])
|
||||||
|
|
||||||
created = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
created = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
||||||
@@ -682,8 +681,8 @@ class Fails(Submissions):
|
|||||||
class Unlocks(db.Model):
|
class Unlocks(db.Model):
|
||||||
__tablename__ = 'unlocks'
|
__tablename__ = 'unlocks'
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
|
user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'))
|
||||||
team_id = db.Column(db.Integer, db.ForeignKey('teams.id'))
|
team_id = db.Column(db.Integer, db.ForeignKey('teams.id', ondelete='CASCADE'))
|
||||||
target = db.Column(db.Integer)
|
target = db.Column(db.Integer)
|
||||||
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
||||||
type = db.Column(db.String(32))
|
type = db.Column(db.String(32))
|
||||||
@@ -715,7 +714,7 @@ class Tracking(db.Model):
|
|||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
type = db.Column(db.String(32))
|
type = db.Column(db.String(32))
|
||||||
ip = db.Column(db.String(46))
|
ip = db.Column(db.String(46))
|
||||||
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
|
user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'))
|
||||||
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
||||||
|
|
||||||
user = db.relationship('Users', foreign_keys="Tracking.user_id", lazy='select')
|
user = db.relationship('Users', foreign_keys="Tracking.user_id", lazy='select')
|
||||||
|
|||||||
@@ -0,0 +1,140 @@
|
|||||||
|
"""Add ondelete cascade to foreign keys
|
||||||
|
|
||||||
|
Revision ID: b295b033364d
|
||||||
|
Revises: b5551cd26764
|
||||||
|
Create Date: 2019-05-03 19:26:57.746887
|
||||||
|
|
||||||
|
"""
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from sqlalchemy.dialects import mysql
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = 'b295b033364d'
|
||||||
|
down_revision = 'b5551cd26764'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
bind = op.get_bind()
|
||||||
|
url = str(bind.engine.url)
|
||||||
|
if url.startswith('mysql'):
|
||||||
|
op.drop_constraint('awards_ibfk_1', 'awards', type_='foreignkey')
|
||||||
|
op.drop_constraint('awards_ibfk_2', 'awards', type_='foreignkey')
|
||||||
|
op.create_foreign_key('awards_ibfk_1', 'awards', 'teams', ['team_id'], ['id'], ondelete='CASCADE')
|
||||||
|
op.create_foreign_key('awards_ibfk_2', 'awards', 'users', ['user_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('files_ibfk_1', 'files', type_='foreignkey')
|
||||||
|
op.create_foreign_key('files_ibfk_1', 'files', 'challenges', ['challenge_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('flags_ibfk_1', 'flags', type_='foreignkey')
|
||||||
|
op.create_foreign_key('flags_ibfk_1', 'flags', 'challenges', ['challenge_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('hints_ibfk_1', 'hints', type_='foreignkey')
|
||||||
|
op.create_foreign_key('hints_ibfk_1', 'hints', 'challenges', ['challenge_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('tags_ibfk_1', 'tags', type_='foreignkey')
|
||||||
|
op.create_foreign_key('tags_ibfk_1', 'tags', 'challenges', ['challenge_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('team_captain_id', 'teams', type_='foreignkey')
|
||||||
|
op.create_foreign_key('team_captain_id', 'teams', 'users', ['captain_id'], ['id'], ondelete='SET NULL')
|
||||||
|
|
||||||
|
op.drop_constraint('tracking_ibfk_1', 'tracking', type_='foreignkey')
|
||||||
|
op.create_foreign_key('tracking_ibfk_1', 'tracking', 'users', ['user_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('unlocks_ibfk_1', 'unlocks', type_='foreignkey')
|
||||||
|
op.drop_constraint('unlocks_ibfk_2', 'unlocks', type_='foreignkey')
|
||||||
|
op.create_foreign_key('unlocks_ibfk_1', 'unlocks', 'teams', ['team_id'], ['id'], ondelete='CASCADE')
|
||||||
|
op.create_foreign_key('unlocks_ibfk_2', 'unlocks', 'users', ['user_id'], ['id'], ondelete='CASCADE')
|
||||||
|
elif url.startswith('postgres'):
|
||||||
|
op.drop_constraint('awards_team_id_fkey', 'awards', type_='foreignkey')
|
||||||
|
op.drop_constraint('awards_user_id_fkey', 'awards', type_='foreignkey')
|
||||||
|
op.create_foreign_key('awards_team_id_fkey', 'awards', 'teams', ['team_id'], ['id'], ondelete='CASCADE')
|
||||||
|
op.create_foreign_key('awards_user_id_fkey', 'awards', 'users', ['user_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('files_challenge_id_fkey', 'files', type_='foreignkey')
|
||||||
|
op.create_foreign_key('files_challenge_id_fkey', 'files', 'challenges', ['challenge_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('flags_challenge_id_fkey', 'flags', type_='foreignkey')
|
||||||
|
op.create_foreign_key('flags_challenge_id_fkey', 'flags', 'challenges', ['challenge_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('hints_challenge_id_fkey', 'hints', type_='foreignkey')
|
||||||
|
op.create_foreign_key('hints_challenge_id_fkey', 'hints', 'challenges', ['challenge_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('tags_challenge_id_fkey', 'tags', type_='foreignkey')
|
||||||
|
op.create_foreign_key('tags_challenge_id_fkey', 'tags', 'challenges', ['challenge_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('team_captain_id', 'teams', type_='foreignkey')
|
||||||
|
op.create_foreign_key('team_captain_id', 'teams', 'users', ['captain_id'], ['id'], ondelete='SET NULL')
|
||||||
|
|
||||||
|
op.drop_constraint('tracking_user_id_fkey', 'tracking', type_='foreignkey')
|
||||||
|
op.create_foreign_key('tracking_user_id_fkey', 'tracking', 'users', ['user_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
op.drop_constraint('unlocks_team_id_fkey', 'unlocks', type_='foreignkey')
|
||||||
|
op.drop_constraint('unlocks_user_id_fkey', 'unlocks', type_='foreignkey')
|
||||||
|
op.create_foreign_key('unlocks_team_id_fkey', 'unlocks', 'teams', ['team_id'], ['id'], ondelete='CASCADE')
|
||||||
|
op.create_foreign_key('unlocks_user_id_fkey', 'unlocks', 'users', ['user_id'], ['id'], ondelete='CASCADE')
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
bind = op.get_bind()
|
||||||
|
url = str(bind.engine.url)
|
||||||
|
if url.startswith('mysql'):
|
||||||
|
op.drop_constraint('unlocks_ibfk_1', 'unlocks', type_='foreignkey')
|
||||||
|
op.drop_constraint('unlocks_ibfk_2', 'unlocks', type_='foreignkey')
|
||||||
|
op.create_foreign_key('unlocks_ibfk_1', 'unlocks', 'teams', ['team_id'], ['id'])
|
||||||
|
op.create_foreign_key('unlocks_ibfk_2', 'unlocks', 'users', ['user_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('tracking_ibfk_1', 'tracking', type_='foreignkey')
|
||||||
|
op.create_foreign_key('tracking_ibfk_1', 'tracking', 'users', ['user_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('team_captain_id', 'teams', type_='foreignkey')
|
||||||
|
op.create_foreign_key('team_captain_id', 'teams', 'users', ['captain_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('tags_ibfk_1', 'tags', type_='foreignkey')
|
||||||
|
op.create_foreign_key('tags_ibfk_1', 'tags', 'challenges', ['challenge_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('hints_ibfk_1', 'hints', type_='foreignkey')
|
||||||
|
op.create_foreign_key('hints_ibfk_1', 'hints', 'challenges', ['challenge_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('flags_ibfk_1', 'flags', type_='foreignkey')
|
||||||
|
op.create_foreign_key('flags_ibfk_1', 'flags', 'challenges', ['challenge_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('files_ibfk_1', 'files', type_='foreignkey')
|
||||||
|
op.create_foreign_key('files_ibfk_1', 'files', 'challenges', ['challenge_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('awards_ibfk_1', 'awards', type_='foreignkey')
|
||||||
|
op.drop_constraint('awards_ibfk_2', 'awards', type_='foreignkey')
|
||||||
|
op.create_foreign_key('awards_ibfk_1', 'awards', 'teams', ['team_id'], ['id'])
|
||||||
|
op.create_foreign_key('awards_ibfk_2', 'awards', 'users', ['user_id'], ['id'])
|
||||||
|
elif url.startswith('postgres'):
|
||||||
|
op.drop_constraint('unlocks_team_id_fkey', 'unlocks', type_='foreignkey')
|
||||||
|
op.drop_constraint('unlocks_user_id_fkey', 'unlocks', type_='foreignkey')
|
||||||
|
op.create_foreign_key('unlocks_team_id_fkey', 'unlocks', 'teams', ['team_id'], ['id'])
|
||||||
|
op.create_foreign_key('unlocks_user_id_fkey', 'unlocks', 'users', ['user_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('tracking_user_id_fkey', 'tracking', type_='foreignkey')
|
||||||
|
op.create_foreign_key('tracking_user_id_fkey', 'tracking', 'users', ['user_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('team_captain_id', 'teams', type_='foreignkey')
|
||||||
|
op.create_foreign_key('team_captain_id', 'teams', 'users', ['captain_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('tags_challenge_id_fkey', 'tags', type_='foreignkey')
|
||||||
|
op.create_foreign_key('tags_challenge_id_fkey', 'tags', 'challenges', ['challenge_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('hints_challenge_id_fkey', 'hints', type_='foreignkey')
|
||||||
|
op.create_foreign_key('hints_challenge_id_fkey', 'hints', 'challenges', ['challenge_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('flags_challenge_id_fkey', 'flags', type_='foreignkey')
|
||||||
|
op.create_foreign_key('flags_challenge_id_fkey', 'flags', 'challenges', ['challenge_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('files_challenge_id_fkey', 'files', type_='foreignkey')
|
||||||
|
op.create_foreign_key('files_challenge_id_fkey', 'files', 'challenges', ['challenge_id'], ['id'])
|
||||||
|
|
||||||
|
op.drop_constraint('awards_team_id_fkey', 'awards', type_='foreignkey')
|
||||||
|
op.drop_constraint('awards_user_id_fkey', 'awards', type_='foreignkey')
|
||||||
|
op.create_foreign_key('awards_team_id_fkey', 'awards', 'teams', ['team_id'], ['id'])
|
||||||
|
op.create_foreign_key('awards_user_id_fkey', 'awards', 'users', ['user_id'], ['id'])
|
||||||
|
|
||||||
|
|
||||||
10
populate.py
10
populate.py
@@ -304,6 +304,16 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
if mode == 'teams':
|
||||||
|
# Assign Team Captains
|
||||||
|
print("GENERATING TEAM CAPTAINS")
|
||||||
|
teams = Teams.query.all()
|
||||||
|
for team in teams:
|
||||||
|
captain = Users.query.filter_by(team_id=team.id).order_by(Users.id).limit(1).first()
|
||||||
|
if captain:
|
||||||
|
team.captain_id = captain.id
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
# Generating Solves
|
# Generating Solves
|
||||||
print("GENERATING SOLVES")
|
print("GENERATING SOLVES")
|
||||||
if mode == 'users':
|
if mode == 'users':
|
||||||
|
|||||||
@@ -1,15 +1,18 @@
|
|||||||
from CTFd.models import Users, Challenges, Fails, Solves, Tracking
|
from CTFd.models import Users, Teams, Challenges, Fails, Solves, Tracking
|
||||||
from tests.helpers import (create_ctfd,
|
from tests.helpers import (
|
||||||
destroy_ctfd,
|
create_ctfd,
|
||||||
register_user,
|
destroy_ctfd,
|
||||||
login_as_user,
|
register_user,
|
||||||
gen_challenge,
|
login_as_user,
|
||||||
gen_award,
|
gen_challenge,
|
||||||
gen_flag,
|
gen_award,
|
||||||
gen_user,
|
gen_flag,
|
||||||
gen_solve,
|
gen_user,
|
||||||
gen_fail,
|
gen_team,
|
||||||
gen_tracking)
|
gen_solve,
|
||||||
|
gen_fail,
|
||||||
|
gen_tracking
|
||||||
|
)
|
||||||
import random
|
import random
|
||||||
|
|
||||||
|
|
||||||
@@ -49,3 +52,48 @@ def test_reset():
|
|||||||
assert Fails.query.count() == 0
|
assert Fails.query.count() == 0
|
||||||
assert Tracking.query.count() == 0
|
assert Tracking.query.count() == 0
|
||||||
destroy_ctfd(app)
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
|
def test_reset_team_mode():
|
||||||
|
app = create_ctfd(user_mode="teams")
|
||||||
|
with app.app_context():
|
||||||
|
base_user = 'user'
|
||||||
|
base_team = 'team'
|
||||||
|
|
||||||
|
for x in range(10):
|
||||||
|
chal = gen_challenge(app.db, name='chal_name{}'.format(x))
|
||||||
|
gen_flag(app.db, challenge_id=chal.id, content='flag')
|
||||||
|
|
||||||
|
for x in range(10):
|
||||||
|
user = base_user + str(x)
|
||||||
|
user_email = user + "@ctfd.io"
|
||||||
|
user_obj = gen_user(app.db, name=user, email=user_email)
|
||||||
|
team_obj = gen_team(app.db, name=base_team + str(x), email=base_team + str(x) + '@ctfd.io')
|
||||||
|
team_obj.members.append(user_obj)
|
||||||
|
team_obj.captain_id = user_obj.id
|
||||||
|
app.db.session.commit()
|
||||||
|
gen_award(app.db, user_id=user_obj.id)
|
||||||
|
gen_solve(app.db, user_id=user_obj.id, challenge_id=random.randint(1, 10))
|
||||||
|
gen_fail(app.db, user_id=user_obj.id, challenge_id=random.randint(1, 10))
|
||||||
|
gen_tracking(app.db, user_id=user_obj.id)
|
||||||
|
|
||||||
|
assert Teams.query.count() == 10
|
||||||
|
assert Users.query.count() == 51 # 10 random users, 40 users (10 teams * 4), 1 admin user
|
||||||
|
assert Challenges.query.count() == 10
|
||||||
|
|
||||||
|
register_user(app)
|
||||||
|
client = login_as_user(app, name="admin", password="password")
|
||||||
|
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
data = {
|
||||||
|
"nonce": sess.get('nonce')
|
||||||
|
}
|
||||||
|
client.post('/admin/reset', data=data)
|
||||||
|
|
||||||
|
assert Teams.query.count() == 0
|
||||||
|
assert Users.query.count() == 0
|
||||||
|
assert Challenges.query.count() == 10
|
||||||
|
assert Solves.query.count() == 0
|
||||||
|
assert Fails.query.count() == 0
|
||||||
|
assert Tracking.query.count() == 0
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|||||||
@@ -14,11 +14,10 @@ def test_api_team_get_members():
|
|||||||
"""Can a user get /api/v1/teams/<team_id>/members only if admin"""
|
"""Can a user get /api/v1/teams/<team_id>/members only if admin"""
|
||||||
app = create_ctfd(user_mode="teams")
|
app = create_ctfd(user_mode="teams")
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
user = gen_user(app.db)
|
gen_team(app.db)
|
||||||
team = gen_team(app.db)
|
|
||||||
team.members.append(user)
|
|
||||||
user.team_id = team.id
|
|
||||||
app.db.session.commit()
|
app.db.session.commit()
|
||||||
|
|
||||||
|
gen_user(app.db, name="user_name")
|
||||||
with login_as_user(app, name="user_name") as client:
|
with login_as_user(app, name="user_name") as client:
|
||||||
r = client.get('/api/v1/teams/1/members', json="")
|
r = client.get('/api/v1/teams/1/members', json="")
|
||||||
assert r.status_code == 403
|
assert r.status_code == 403
|
||||||
@@ -28,7 +27,8 @@ def test_api_team_get_members():
|
|||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
|
||||||
resp = r.get_json()
|
resp = r.get_json()
|
||||||
assert resp['data'] == [2]
|
# The following data is sorted b/c in Postgres data isn't necessarily returned ordered.
|
||||||
|
assert sorted(resp['data']) == sorted([2, 3, 4, 5])
|
||||||
destroy_ctfd(app)
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
@@ -36,14 +36,11 @@ def test_api_team_remove_members():
|
|||||||
"""Can a user remove /api/v1/teams/<team_id>/members only if admin"""
|
"""Can a user remove /api/v1/teams/<team_id>/members only if admin"""
|
||||||
app = create_ctfd(user_mode="teams")
|
app = create_ctfd(user_mode="teams")
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
user1 = gen_user(app.db, name="user1", email="user1@ctfd.io") # ID 2
|
|
||||||
user2 = gen_user(app.db, name="user2", email="user2@ctfd.io") # ID 3
|
|
||||||
team = gen_team(app.db)
|
team = gen_team(app.db)
|
||||||
team.members.append(user1)
|
assert len(team.members) == 4
|
||||||
team.members.append(user2)
|
|
||||||
user1.team_id = team.id
|
|
||||||
user2.team_id = team.id
|
|
||||||
app.db.session.commit()
|
app.db.session.commit()
|
||||||
|
|
||||||
|
gen_user(app.db, name='user1')
|
||||||
with login_as_user(app, name="user1") as client:
|
with login_as_user(app, name="user1") as client:
|
||||||
r = client.delete('/api/v1/teams/1/members', json={
|
r = client.delete('/api/v1/teams/1/members', json={
|
||||||
'id': 2
|
'id': 2
|
||||||
@@ -57,7 +54,8 @@ def test_api_team_remove_members():
|
|||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
|
||||||
resp = r.get_json()
|
resp = r.get_json()
|
||||||
assert resp['data'] == [3]
|
# The following data is sorted b/c in Postgres data isn't necessarily returned ordered.
|
||||||
|
assert sorted(resp['data']) == sorted([3, 4, 5])
|
||||||
|
|
||||||
r = client.delete('/api/v1/teams/1/members', json={
|
r = client.delete('/api/v1/teams/1/members', json={
|
||||||
'id': 2
|
'id': 2
|
||||||
|
|||||||
@@ -1,18 +1,22 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
from CTFd.models import Users
|
from CTFd.models import Users, Challenges, Tags, Hints, Flags
|
||||||
from CTFd.utils import set_config
|
from CTFd.utils import set_config
|
||||||
from tests.helpers import (create_ctfd,
|
from tests.helpers import (
|
||||||
destroy_ctfd,
|
create_ctfd,
|
||||||
register_user,
|
destroy_ctfd,
|
||||||
login_as_user,
|
register_user,
|
||||||
gen_challenge,
|
login_as_user,
|
||||||
gen_flag,
|
gen_challenge,
|
||||||
gen_user,
|
gen_flag,
|
||||||
gen_team,
|
gen_tag,
|
||||||
gen_solve,
|
gen_hint,
|
||||||
gen_fail)
|
gen_user,
|
||||||
|
gen_team,
|
||||||
|
gen_solve,
|
||||||
|
gen_fail
|
||||||
|
)
|
||||||
from freezegun import freeze_time
|
from freezegun import freeze_time
|
||||||
|
|
||||||
|
|
||||||
@@ -283,7 +287,7 @@ def test_api_challenge_delete_non_admin():
|
|||||||
|
|
||||||
|
|
||||||
def test_api_challenge_delete_admin():
|
def test_api_challenge_delete_admin():
|
||||||
"""Can a user patch /api/v1/challenges/<challenge_id> if admin"""
|
"""Can a user delete /api/v1/challenges/<challenge_id> if admin"""
|
||||||
app = create_ctfd()
|
app = create_ctfd()
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
gen_challenge(app.db)
|
gen_challenge(app.db)
|
||||||
@@ -294,6 +298,32 @@ def test_api_challenge_delete_admin():
|
|||||||
destroy_ctfd(app)
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
|
def test_api_challenge_with_properties_delete_admin():
|
||||||
|
"""Can a user delete /api/v1/challenges/<challenge_id> if the challenge has other properties"""
|
||||||
|
app = create_ctfd()
|
||||||
|
with app.app_context():
|
||||||
|
challenge = gen_challenge(app.db)
|
||||||
|
gen_hint(app.db, challenge_id=challenge.id)
|
||||||
|
gen_tag(app.db, challenge_id=challenge.id)
|
||||||
|
gen_flag(app.db, challenge_id=challenge.id)
|
||||||
|
|
||||||
|
challenge = Challenges.query.filter_by(id=1).first()
|
||||||
|
assert len(challenge.hints) == 1
|
||||||
|
assert len(challenge.tags) == 1
|
||||||
|
assert len(challenge.flags) == 1
|
||||||
|
|
||||||
|
with login_as_user(app, 'admin') as client:
|
||||||
|
r = client.delete('/api/v1/challenges/1', json="")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.get_json().get('data') is None
|
||||||
|
|
||||||
|
assert Tags.query.count() == 0
|
||||||
|
assert Hints.query.count() == 0
|
||||||
|
assert Flags.query.count() == 0
|
||||||
|
|
||||||
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
def test_api_challenge_attempt_post_public():
|
def test_api_challenge_attempt_post_public():
|
||||||
"""Can a public user post /api/v1/challenges/attempt"""
|
"""Can a public user post /api/v1/challenges/attempt"""
|
||||||
app = create_ctfd()
|
app = create_ctfd()
|
||||||
|
|||||||
@@ -4,14 +4,17 @@
|
|||||||
from CTFd.models import Teams, Users
|
from CTFd.models import Teams, Users
|
||||||
from CTFd.utils import set_config
|
from CTFd.utils import set_config
|
||||||
from CTFd.utils.crypto import verify_password
|
from CTFd.utils.crypto import verify_password
|
||||||
from tests.helpers import (create_ctfd,
|
from tests.helpers import (
|
||||||
destroy_ctfd,
|
create_ctfd,
|
||||||
register_user,
|
destroy_ctfd,
|
||||||
login_as_user,
|
register_user,
|
||||||
gen_user,
|
login_as_user,
|
||||||
gen_team,
|
gen_user,
|
||||||
gen_challenge,
|
gen_team,
|
||||||
gen_flag)
|
gen_challenge,
|
||||||
|
gen_flag,
|
||||||
|
simulate_user_activity
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_api_teams_get_public():
|
def test_api_teams_get_public():
|
||||||
@@ -262,11 +265,21 @@ def test_api_team_delete_admin():
|
|||||||
"""Can a user patch /api/v1/teams/<team_id> if admin"""
|
"""Can a user patch /api/v1/teams/<team_id> if admin"""
|
||||||
app = create_ctfd(user_mode="teams")
|
app = create_ctfd(user_mode="teams")
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
gen_team(app.db)
|
team = gen_team(app.db)
|
||||||
|
|
||||||
|
assert len(team.members) == 4
|
||||||
|
|
||||||
|
members = team.members
|
||||||
|
for user in members:
|
||||||
|
simulate_user_activity(app.db, user=user)
|
||||||
|
|
||||||
with login_as_user(app, 'admin') as client:
|
with login_as_user(app, 'admin') as client:
|
||||||
r = client.delete('/api/v1/teams/1', json="")
|
r = client.delete('/api/v1/teams/1', json="")
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
assert r.get_json().get('data') is None
|
assert r.get_json().get('data') is None
|
||||||
|
|
||||||
|
for user in Users.query.all():
|
||||||
|
assert user.team_id is None
|
||||||
destroy_ctfd(app)
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -5,11 +5,14 @@ from CTFd.models import Users
|
|||||||
from CTFd.utils import set_config
|
from CTFd.utils import set_config
|
||||||
from CTFd.utils.crypto import verify_password
|
from CTFd.utils.crypto import verify_password
|
||||||
from CTFd.schemas.users import UserSchema
|
from CTFd.schemas.users import UserSchema
|
||||||
from tests.helpers import (create_ctfd,
|
from tests.helpers import (
|
||||||
destroy_ctfd,
|
create_ctfd,
|
||||||
register_user,
|
destroy_ctfd,
|
||||||
login_as_user,
|
register_user,
|
||||||
gen_user)
|
login_as_user,
|
||||||
|
gen_user,
|
||||||
|
simulate_user_activity
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_api_users_get_public():
|
def test_api_users_get_public():
|
||||||
@@ -322,10 +325,13 @@ def test_api_user_delete_admin():
|
|||||||
app = create_ctfd()
|
app = create_ctfd()
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
register_user(app)
|
register_user(app)
|
||||||
|
user = Users.query.filter_by(id=2).first()
|
||||||
|
simulate_user_activity(app.db, user=user)
|
||||||
with login_as_user(app, 'admin') as client:
|
with login_as_user(app, 'admin') as client:
|
||||||
r = client.delete('/api/v1/users/2', json="")
|
r = client.delete('/api/v1/users/2', json="")
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
assert r.get_json().get('data') is None
|
assert r.get_json().get('data') is None
|
||||||
|
assert Users.query.filter_by(id=2).first() is None
|
||||||
destroy_ctfd(app)
|
destroy_ctfd(app)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ import six
|
|||||||
import gc
|
import gc
|
||||||
import requests
|
import requests
|
||||||
import uuid
|
import uuid
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
|
||||||
if six.PY2:
|
if six.PY2:
|
||||||
text_type = unicode # noqa: F821
|
text_type = unicode # noqa: F821
|
||||||
@@ -222,6 +224,10 @@ def get_scores(user):
|
|||||||
return scores['data']
|
return scores['data']
|
||||||
|
|
||||||
|
|
||||||
|
def random_string(n=5):
|
||||||
|
return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(n))
|
||||||
|
|
||||||
|
|
||||||
def gen_challenge(db, name='chal_name', description='chal_description', value=100, category='chal_category', type='standard', state='visible', **kwargs):
|
def gen_challenge(db, name='chal_name', description='chal_description', value=100, category='chal_category', type='standard', state='visible', **kwargs):
|
||||||
chal = Challenges(name=name, description=description, value=value, category=category, type=type, state=state, **kwargs)
|
chal = Challenges(name=name, description=description, value=value, category=category, type=type, state=state, **kwargs)
|
||||||
db.session.add(chal)
|
db.session.add(chal)
|
||||||
@@ -272,8 +278,14 @@ def gen_user(db, name='user_name', email='user@ctfd.io', password='password', **
|
|||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
def gen_team(db, name='team_name', email='team@ctfd.io', password='password', **kwargs):
|
def gen_team(db, name='team_name', email='team@ctfd.io', password='password', member_count=4, **kwargs):
|
||||||
team = Teams(name=name, email=email, password=password, **kwargs)
|
team = Teams(name=name, email=email, password=password, **kwargs)
|
||||||
|
for i in range(member_count):
|
||||||
|
name = 'user-{}-{}'.format(random_string(), str(i))
|
||||||
|
user = gen_user(db, name=name, email=name + '@ctfd.io', team_id=team.id)
|
||||||
|
if i == 0:
|
||||||
|
team.captain_id = user.id
|
||||||
|
team.members.append(user)
|
||||||
db.session.add(team)
|
db.session.add(team)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return team
|
return team
|
||||||
@@ -286,7 +298,7 @@ def gen_hint(db, challenge_id, content="This is a hint", cost=0, type="standard"
|
|||||||
return hint
|
return hint
|
||||||
|
|
||||||
|
|
||||||
def gen_unlock(db, user_id, team_id, target, type):
|
def gen_unlock(db, user_id, team_id=None, target=None, type='hints'):
|
||||||
unlock = Unlocks(
|
unlock = Unlocks(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
team_id=team_id,
|
team_id=team_id,
|
||||||
@@ -332,3 +344,17 @@ def gen_notification(db, title='title', content='content'):
|
|||||||
notif = Notifications(title=title, content=content)
|
notif = Notifications(title=title, content=content)
|
||||||
db.session.add(notif)
|
db.session.add(notif)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def simulate_user_activity(db, user):
|
||||||
|
gen_tracking(db, user_id=user.id)
|
||||||
|
gen_award(db, user_id=user.id)
|
||||||
|
challenge = gen_challenge(db)
|
||||||
|
flag = gen_flag(db, challenge_id=challenge.id)
|
||||||
|
hint = gen_hint(db, challenge_id=challenge.id)
|
||||||
|
|
||||||
|
for _ in range(5):
|
||||||
|
gen_fail(db, user_id=user.id, challenge_id=challenge.id)
|
||||||
|
|
||||||
|
gen_unlock(db, user_id=user.id, target=hint.id, type='hints')
|
||||||
|
gen_solve(db, user_id=user.id, challenge_id=challenge.id, provided=flag.content)
|
||||||
|
|||||||
Reference in New Issue
Block a user