diff --git a/CTFd/schemas/teams.py b/CTFd/schemas/teams.py index 2b8b428f..54405144 100644 --- a/CTFd/schemas/teams.py +++ b/CTFd/schemas/teams.py @@ -1,6 +1,7 @@ -from marshmallow import ValidationError, pre_load, validate +from marshmallow import ValidationError, post_dump, pre_load, validate from marshmallow.fields import Nested from marshmallow_sqlalchemy import field_for +from sqlalchemy.orm import load_only from CTFd.models import TeamFieldEntries, TeamFields, Teams, Users, ma from CTFd.schemas.fields import TeamFieldEntriesSchema @@ -191,6 +192,116 @@ class TeamSchema(ma.ModelSchema): field_names=["captain_id"], ) + @pre_load + def validate_fields(self, data): + """ + This validator is used to only allow users to update the field entry for their user. + It's not possible to exclude it because without the PK Marshmallow cannot load the right instance + """ + fields = data.get("fields") + if fields is None: + return + + current_user = get_current_user() + + if is_admin(): + user_id = data.get("id") + if user_id: + target_user = Users.query.filter_by(id=data["id"]).first() + else: + target_user = current_user + + provided_ids = [] + for f in fields: + f.pop("id", None) + field_id = f.get("field_id") + + # # Check that we have an existing field for this. May be unnecessary b/c the foriegn key should enforce + field = TeamFields.query.filter_by(id=field_id).first_or_404() + + # Get the existing field entry if one exists + entry = TeamFieldEntries.query.filter_by( + field_id=field.id, user_id=target_user.id + ).first() + if entry: + f["id"] = entry.id + provided_ids.append(entry.id) + + # Extremely dirty hack to prevent deleting previously provided data. + # This needs a better soln. + entries = ( + TeamFieldEntries.query.options(load_only("id")) + .filter_by(user_id=target_user.id) + .all() + ) + for entry in entries: + if entry.id not in provided_ids: + fields.append({"id": entry.id}) + else: + provided_ids = [] + for f in fields: + # Remove any existing set + f.pop("id", None) + field_id = f.get("field_id") + + # # Check that we have an existing field for this. May be unnecessary b/c the foriegn key should enforce + field = TeamFields.query.filter_by(id=field_id).first_or_404() + + if field.editable is False: + raise ValidationError( + f"Field {field.name} cannot be editted", field_names=["fields"] + ) + + # Get the existing field entry if one exists + entry = TeamFieldEntries.query.filter_by( + field_id=field.id, user_id=current_user.id + ).first() + + if entry: + f["id"] = entry.id + provided_ids.append(entry.id) + + # Extremely dirty hack to prevent deleting previously provided data. + # This needs a better soln. + entries = ( + TeamFieldEntries.query.options(load_only("id")) + .filter_by(user_id=current_user.id) + .all() + ) + for entry in entries: + if entry.id not in provided_ids: + fields.append({"id": entry.id}) + + @post_dump + def process_fields(self, data): + """ + Handle permissions levels for fields. + This is post_dump to manipulate JSON instead of the raw db object + + Admins can see all fields. + Users (self) can see their edittable and public fields + Public (user) can only see public fields + """ + # Gather all possible fields + removed_field_ids = [] + fields = TeamFields.query.all() + + # Select fields for removal based on current view and properties of the field + for field in fields: + if self.view == "user": + if field.public is False: + removed_field_ids.append(field.id) + elif self.view == "self": + if field.editable is False and field.public is False: + removed_field_ids.append(field.id) + + # Rebuild fuilds + fields = data.get("fields") + if fields: + data["fields"] = [ + field for field in fields if field["field_id"] not in removed_field_ids + ] + views = { "user": [ "website",