diff --git a/CTFd/api/v1/challenges.py b/CTFd/api/v1/challenges.py index f27b7a92..bf8a5d5b 100644 --- a/CTFd/api/v1/challenges.py +++ b/CTFd/api/v1/challenges.py @@ -4,8 +4,7 @@ from typing import List from flask import abort, render_template, request, url_for from flask_restx import Namespace, Resource from sqlalchemy import func as sa_func -from sqlalchemy import types as sa_types -from sqlalchemy.sql import and_, cast, false, true +from sqlalchemy.sql import and_, false, true from CTFd.api.v1.helpers.request import validate_args from CTFd.api.v1.helpers.schemas import sqlalchemy_to_pydantic @@ -85,10 +84,16 @@ challenges_namespace.schema_model( def _build_solves_query(extra_filters=(), admin_view=False): + """Returns queries and data that that are used for showing an account's solves. + It returns a tuple of + - SQLAlchemy query with (challenge_id, solve_count_for_challenge_id) + - Current user's solved challenge IDs + """ # This can return None (unauth) if visibility is set to public user = get_current_user() # We only set a condition for matching user solves if there is a user and # they have an account ID (user mode or in a team in teams mode) + AccountModel = get_model() if user is not None and user.account_id is not None: user_solved_cond = Solves.account_id == user.account_id else: @@ -102,7 +107,6 @@ def _build_solves_query(extra_filters=(), admin_view=False): freeze_cond = true() # Finally, we never count solves made by hidden or banned users/teams, even # if we are an admin. This is to match the challenge detail API. - AccountModel = get_model() exclude_solves_cond = and_( AccountModel.banned == false(), AccountModel.hidden == false(), ) @@ -110,16 +114,19 @@ def _build_solves_query(extra_filters=(), admin_view=False): # of correct solves made by the current user per the condition above (which # should probably only be 0 or 1!) solves_q = ( - db.session.query( - Solves.challenge_id, - sa_func.count(Solves.challenge_id), - sa_func.sum(cast(user_solved_cond, sa_types.Integer)), - ) + db.session.query(Solves.challenge_id, sa_func.count(Solves.challenge_id),) .join(AccountModel) .filter(*extra_filters, freeze_cond, exclude_solves_cond) .group_by(Solves.challenge_id) ) - return solves_q + # Also gather the user's solve items which can be different from above query + # Even if we are a hidden user, we should see that we have solved the challenge + # But as a hidden user we are not included in the count + solve_ids = ( + Solves.query.with_entities(Solves.challenge_id).filter(user_solved_cond).all() + ) + solve_ids = {value for value, in solve_ids} + return solves_q, solve_ids @challenges_namespace.route("") @@ -181,20 +188,18 @@ class ChallengeList(Resource): # Admins get a shortcut to see all challenges despite pre-requisites admin_view = is_admin() and request.args.get("view") == "admin" - solve_counts, user_solves = {}, set() + solve_counts = {} # Build a query for to show challenge solve information. We only # give an admin view if the request argument has been provided. # # NOTE: This is different behaviour to the challenge detail # endpoint which only needs the current user to be an admin rather # than also also having to provide `view=admin` as a query arg. - solves_q = _build_solves_query(admin_view=admin_view) + solves_q, user_solves = _build_solves_query(admin_view=admin_view) # Aggregate the query results into the hashes defined at the top of # this block for later use - for chal_id, solve_count, solved_by_user in solves_q: + for chal_id, solve_count in solves_q: solve_counts[chal_id] = solve_count - if solved_by_user: - user_solves.add(chal_id) if scores_visible() and accounts_visible(): solve_count_dfl = 0 else: @@ -435,14 +440,14 @@ class Challenge(Resource): response = chal_class.read(challenge=chal) - solves_q = _build_solves_query( + solves_q, user_solves = _build_solves_query( admin_view=is_admin(), extra_filters=(Solves.challenge_id == chal.id,) ) # If there are no solves for this challenge ID then we have 0 rows maybe_row = solves_q.first() if maybe_row: - _, solve_count, solved_by_user = maybe_row - solved_by_user = bool(solved_by_user) + challenge_id, solve_count = maybe_row + solved_by_user = challenge_id in user_solves else: solve_count, solved_by_user = 0, False diff --git a/tests/api/v1/test_challenges.py b/tests/api/v1/test_challenges.py index b5bc6df7..a952a09b 100644 --- a/tests/api/v1/test_challenges.py +++ b/tests/api/v1/test_challenges.py @@ -364,6 +364,13 @@ def test_api_challenges_get_solve_count_hidden_user(): assert r.status_code == 200 chal_data = r.get_json()["data"].pop() assert chal_data["solves"] == 0 + # We expect the admin to be able to see their own solve + with login_as_user(app, "admin") as admin: + r = admin.get("/api/v1/challenges") + assert r.status_code == 200 + chal_data = r.get_json()["data"].pop() + assert chal_data["solves"] == 0 + assert chal_data["solved_by_me"] is True destroy_ctfd(app) @@ -398,6 +405,110 @@ def test_api_challenges_get_solve_count_banned_user(): destroy_ctfd(app) +def test_api_challenges_challenge_with_requirements(): + """Does the challenge list API show challenges with requirements met?""" + app = create_ctfd() + with app.app_context(): + prereq_id = gen_challenge(app.db).id + chal_obj = gen_challenge(app.db) + chal_obj.requirements = {"prerequisites": [prereq_id]} + chal_id = chal_obj.id + # Create a new user which will solve the prerequisite + register_user(app) + # Confirm that only the prerequisite challenge is listed initially + with login_as_user(app) as client: + r = client.get("/api/v1/challenges") + assert r.status_code == 200 + (chal_data,) = r.get_json()["data"] + assert chal_data["id"] == prereq_id + # Generate a solve and then confirm the second challenge is visible + gen_solve(app.db, user_id=2, challenge_id=prereq_id) + with login_as_user(app) as client: + r = client.get("/api/v1/challenges") + assert r.status_code == 200 + data = r.get_json()["data"] + assert len(data) == 2 + chal_ids = {c["id"] for c in r.get_json()["data"]} + assert chal_ids == {prereq_id, chal_id} + destroy_ctfd(app) + + +def test_api_challenges_challenge_with_requirements_hidden_user(): + """Does the challenge list API show gated challenges to a hidden user?""" + app = create_ctfd() + with app.app_context(): + prereq_id = gen_challenge(app.db).id + chal_obj = gen_challenge(app.db) + chal_obj.requirements = {"prerequisites": [prereq_id]} + chal_id = chal_obj.id + # Create a new user which will solve the prerequisite and hide them + register_user(app) + Users.query.get(2).hidden = True + app.db.session.commit() + # Confirm that only the prerequisite challenge is listed initially + with login_as_user(app) as client: + r = client.get("/api/v1/challenges") + assert r.status_code == 200 + (chal_data,) = r.get_json()["data"] + assert chal_data["id"] == prereq_id + # Generate a solve and then confirm the second challenge is visible + gen_solve(app.db, user_id=2, challenge_id=prereq_id) + with login_as_user(app) as client: + r = client.get("/api/v1/challenges") + assert r.status_code == 200 + data = r.get_json()["data"] + assert len(data) == 2 + chal_ids = {c["id"] for c in r.get_json()["data"]} + assert chal_ids == {prereq_id, chal_id} + destroy_ctfd(app) + + +def test_api_challenges_challenge_with_requirements_banned_user(): + """Does the challenge list API show gated challenges to a banned user?""" + app = create_ctfd() + with app.app_context(): + prereq_id = gen_challenge(app.db).id + chal_obj = gen_challenge(app.db) + chal_obj.requirements = {"prerequisites": [prereq_id]} + # Create a new user which will solve the prerequisite and ban them + register_user(app) + Users.query.get(2).banned = True + app.db.session.commit() + # Generate a solve just in case and confirm the API 403s + gen_solve(app.db, user_id=2, challenge_id=prereq_id) + with login_as_user(app) as client: + assert client.get("/api/v1/challenges").status_code == 403 + destroy_ctfd(app) + + +def test_api_challenges_challenge_with_requirements_no_user(): + """Does the challenge list API show gated challenges to the public?""" + app = create_ctfd() + with app.app_context(): + set_config("challenge_visibility", "public") + prereq_id = gen_challenge(app.db).id + chal_obj = gen_challenge(app.db) + chal_obj.requirements = {"prerequisites": [prereq_id]} + # Create a new user which will solve the prerequisite + register_user(app) + # Confirm that only the prerequisite challenge is listed publicly + with app.test_client() as client: + r = client.get("/api/v1/challenges") + assert r.status_code == 200 + initial_data = r.get_json()["data"] + (chal_data,) = initial_data + assert chal_data["id"] == prereq_id + # Fix up the solve count for later comparison with `initial_data` + chal_data["solves"] += 1 + # Generate a solve and then confirm the response is unchanged + gen_solve(app.db, user_id=2, challenge_id=prereq_id) + with app.test_client() as client: + r = client.get("/api/v1/challenges") + assert r.status_code == 200 + assert r.get_json()["data"] == initial_data + destroy_ctfd(app) + + def test_api_challenges_post_admin(): """Can a user post /api/v1/challenges if admin""" app = create_ctfd()