mirror of
https://github.com/aljazceru/CTFd.git
synced 2025-12-17 05:54:19 +01:00
* Closes #1839 Co-authored-by: maybe-sybr <58414429+maybe-sybr@users.noreply.github.com>
This commit is contained in:
@@ -4,8 +4,7 @@ from typing import List
|
||||
from flask import abort, render_template, request, url_for
|
||||
from flask_restx import Namespace, Resource
|
||||
from sqlalchemy import func as sa_func
|
||||
from sqlalchemy import types as sa_types
|
||||
from sqlalchemy.sql import and_, cast, false, true
|
||||
from sqlalchemy.sql import and_, false, true
|
||||
|
||||
from CTFd.api.v1.helpers.request import validate_args
|
||||
from CTFd.api.v1.helpers.schemas import sqlalchemy_to_pydantic
|
||||
@@ -85,10 +84,16 @@ challenges_namespace.schema_model(
|
||||
|
||||
|
||||
def _build_solves_query(extra_filters=(), admin_view=False):
|
||||
"""Returns queries and data that that are used for showing an account's solves.
|
||||
It returns a tuple of
|
||||
- SQLAlchemy query with (challenge_id, solve_count_for_challenge_id)
|
||||
- Current user's solved challenge IDs
|
||||
"""
|
||||
# This can return None (unauth) if visibility is set to public
|
||||
user = get_current_user()
|
||||
# We only set a condition for matching user solves if there is a user and
|
||||
# they have an account ID (user mode or in a team in teams mode)
|
||||
AccountModel = get_model()
|
||||
if user is not None and user.account_id is not None:
|
||||
user_solved_cond = Solves.account_id == user.account_id
|
||||
else:
|
||||
@@ -102,7 +107,6 @@ def _build_solves_query(extra_filters=(), admin_view=False):
|
||||
freeze_cond = true()
|
||||
# Finally, we never count solves made by hidden or banned users/teams, even
|
||||
# if we are an admin. This is to match the challenge detail API.
|
||||
AccountModel = get_model()
|
||||
exclude_solves_cond = and_(
|
||||
AccountModel.banned == false(), AccountModel.hidden == false(),
|
||||
)
|
||||
@@ -110,16 +114,19 @@ def _build_solves_query(extra_filters=(), admin_view=False):
|
||||
# of correct solves made by the current user per the condition above (which
|
||||
# should probably only be 0 or 1!)
|
||||
solves_q = (
|
||||
db.session.query(
|
||||
Solves.challenge_id,
|
||||
sa_func.count(Solves.challenge_id),
|
||||
sa_func.sum(cast(user_solved_cond, sa_types.Integer)),
|
||||
)
|
||||
db.session.query(Solves.challenge_id, sa_func.count(Solves.challenge_id),)
|
||||
.join(AccountModel)
|
||||
.filter(*extra_filters, freeze_cond, exclude_solves_cond)
|
||||
.group_by(Solves.challenge_id)
|
||||
)
|
||||
return solves_q
|
||||
# Also gather the user's solve items which can be different from above query
|
||||
# Even if we are a hidden user, we should see that we have solved the challenge
|
||||
# But as a hidden user we are not included in the count
|
||||
solve_ids = (
|
||||
Solves.query.with_entities(Solves.challenge_id).filter(user_solved_cond).all()
|
||||
)
|
||||
solve_ids = {value for value, in solve_ids}
|
||||
return solves_q, solve_ids
|
||||
|
||||
|
||||
@challenges_namespace.route("")
|
||||
@@ -181,20 +188,18 @@ class ChallengeList(Resource):
|
||||
# Admins get a shortcut to see all challenges despite pre-requisites
|
||||
admin_view = is_admin() and request.args.get("view") == "admin"
|
||||
|
||||
solve_counts, user_solves = {}, set()
|
||||
solve_counts = {}
|
||||
# Build a query for to show challenge solve information. We only
|
||||
# give an admin view if the request argument has been provided.
|
||||
#
|
||||
# NOTE: This is different behaviour to the challenge detail
|
||||
# endpoint which only needs the current user to be an admin rather
|
||||
# than also also having to provide `view=admin` as a query arg.
|
||||
solves_q = _build_solves_query(admin_view=admin_view)
|
||||
solves_q, user_solves = _build_solves_query(admin_view=admin_view)
|
||||
# Aggregate the query results into the hashes defined at the top of
|
||||
# this block for later use
|
||||
for chal_id, solve_count, solved_by_user in solves_q:
|
||||
for chal_id, solve_count in solves_q:
|
||||
solve_counts[chal_id] = solve_count
|
||||
if solved_by_user:
|
||||
user_solves.add(chal_id)
|
||||
if scores_visible() and accounts_visible():
|
||||
solve_count_dfl = 0
|
||||
else:
|
||||
@@ -435,14 +440,14 @@ class Challenge(Resource):
|
||||
|
||||
response = chal_class.read(challenge=chal)
|
||||
|
||||
solves_q = _build_solves_query(
|
||||
solves_q, user_solves = _build_solves_query(
|
||||
admin_view=is_admin(), extra_filters=(Solves.challenge_id == chal.id,)
|
||||
)
|
||||
# If there are no solves for this challenge ID then we have 0 rows
|
||||
maybe_row = solves_q.first()
|
||||
if maybe_row:
|
||||
_, solve_count, solved_by_user = maybe_row
|
||||
solved_by_user = bool(solved_by_user)
|
||||
challenge_id, solve_count = maybe_row
|
||||
solved_by_user = challenge_id in user_solves
|
||||
else:
|
||||
solve_count, solved_by_user = 0, False
|
||||
|
||||
|
||||
@@ -364,6 +364,13 @@ def test_api_challenges_get_solve_count_hidden_user():
|
||||
assert r.status_code == 200
|
||||
chal_data = r.get_json()["data"].pop()
|
||||
assert chal_data["solves"] == 0
|
||||
# We expect the admin to be able to see their own solve
|
||||
with login_as_user(app, "admin") as admin:
|
||||
r = admin.get("/api/v1/challenges")
|
||||
assert r.status_code == 200
|
||||
chal_data = r.get_json()["data"].pop()
|
||||
assert chal_data["solves"] == 0
|
||||
assert chal_data["solved_by_me"] is True
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
@@ -398,6 +405,110 @@ def test_api_challenges_get_solve_count_banned_user():
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_api_challenges_challenge_with_requirements():
|
||||
"""Does the challenge list API show challenges with requirements met?"""
|
||||
app = create_ctfd()
|
||||
with app.app_context():
|
||||
prereq_id = gen_challenge(app.db).id
|
||||
chal_obj = gen_challenge(app.db)
|
||||
chal_obj.requirements = {"prerequisites": [prereq_id]}
|
||||
chal_id = chal_obj.id
|
||||
# Create a new user which will solve the prerequisite
|
||||
register_user(app)
|
||||
# Confirm that only the prerequisite challenge is listed initially
|
||||
with login_as_user(app) as client:
|
||||
r = client.get("/api/v1/challenges")
|
||||
assert r.status_code == 200
|
||||
(chal_data,) = r.get_json()["data"]
|
||||
assert chal_data["id"] == prereq_id
|
||||
# Generate a solve and then confirm the second challenge is visible
|
||||
gen_solve(app.db, user_id=2, challenge_id=prereq_id)
|
||||
with login_as_user(app) as client:
|
||||
r = client.get("/api/v1/challenges")
|
||||
assert r.status_code == 200
|
||||
data = r.get_json()["data"]
|
||||
assert len(data) == 2
|
||||
chal_ids = {c["id"] for c in r.get_json()["data"]}
|
||||
assert chal_ids == {prereq_id, chal_id}
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_api_challenges_challenge_with_requirements_hidden_user():
|
||||
"""Does the challenge list API show gated challenges to a hidden user?"""
|
||||
app = create_ctfd()
|
||||
with app.app_context():
|
||||
prereq_id = gen_challenge(app.db).id
|
||||
chal_obj = gen_challenge(app.db)
|
||||
chal_obj.requirements = {"prerequisites": [prereq_id]}
|
||||
chal_id = chal_obj.id
|
||||
# Create a new user which will solve the prerequisite and hide them
|
||||
register_user(app)
|
||||
Users.query.get(2).hidden = True
|
||||
app.db.session.commit()
|
||||
# Confirm that only the prerequisite challenge is listed initially
|
||||
with login_as_user(app) as client:
|
||||
r = client.get("/api/v1/challenges")
|
||||
assert r.status_code == 200
|
||||
(chal_data,) = r.get_json()["data"]
|
||||
assert chal_data["id"] == prereq_id
|
||||
# Generate a solve and then confirm the second challenge is visible
|
||||
gen_solve(app.db, user_id=2, challenge_id=prereq_id)
|
||||
with login_as_user(app) as client:
|
||||
r = client.get("/api/v1/challenges")
|
||||
assert r.status_code == 200
|
||||
data = r.get_json()["data"]
|
||||
assert len(data) == 2
|
||||
chal_ids = {c["id"] for c in r.get_json()["data"]}
|
||||
assert chal_ids == {prereq_id, chal_id}
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_api_challenges_challenge_with_requirements_banned_user():
|
||||
"""Does the challenge list API show gated challenges to a banned user?"""
|
||||
app = create_ctfd()
|
||||
with app.app_context():
|
||||
prereq_id = gen_challenge(app.db).id
|
||||
chal_obj = gen_challenge(app.db)
|
||||
chal_obj.requirements = {"prerequisites": [prereq_id]}
|
||||
# Create a new user which will solve the prerequisite and ban them
|
||||
register_user(app)
|
||||
Users.query.get(2).banned = True
|
||||
app.db.session.commit()
|
||||
# Generate a solve just in case and confirm the API 403s
|
||||
gen_solve(app.db, user_id=2, challenge_id=prereq_id)
|
||||
with login_as_user(app) as client:
|
||||
assert client.get("/api/v1/challenges").status_code == 403
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_api_challenges_challenge_with_requirements_no_user():
|
||||
"""Does the challenge list API show gated challenges to the public?"""
|
||||
app = create_ctfd()
|
||||
with app.app_context():
|
||||
set_config("challenge_visibility", "public")
|
||||
prereq_id = gen_challenge(app.db).id
|
||||
chal_obj = gen_challenge(app.db)
|
||||
chal_obj.requirements = {"prerequisites": [prereq_id]}
|
||||
# Create a new user which will solve the prerequisite
|
||||
register_user(app)
|
||||
# Confirm that only the prerequisite challenge is listed publicly
|
||||
with app.test_client() as client:
|
||||
r = client.get("/api/v1/challenges")
|
||||
assert r.status_code == 200
|
||||
initial_data = r.get_json()["data"]
|
||||
(chal_data,) = initial_data
|
||||
assert chal_data["id"] == prereq_id
|
||||
# Fix up the solve count for later comparison with `initial_data`
|
||||
chal_data["solves"] += 1
|
||||
# Generate a solve and then confirm the response is unchanged
|
||||
gen_solve(app.db, user_id=2, challenge_id=prereq_id)
|
||||
with app.test_client() as client:
|
||||
r = client.get("/api/v1/challenges")
|
||||
assert r.status_code == 200
|
||||
assert r.get_json()["data"] == initial_data
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_api_challenges_post_admin():
|
||||
"""Can a user post /api/v1/challenges if admin"""
|
||||
app = create_ctfd()
|
||||
|
||||
Reference in New Issue
Block a user