mirror of
https://github.com/aljazceru/CTFd.git
synced 2025-12-18 22:44:24 +01:00
Copy schemas over from users
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
from marshmallow import ValidationError, pre_load, validate
|
||||
from marshmallow import ValidationError, post_dump, pre_load, validate
|
||||
from marshmallow.fields import Nested
|
||||
from marshmallow_sqlalchemy import field_for
|
||||
from sqlalchemy.orm import load_only
|
||||
|
||||
from CTFd.models import TeamFieldEntries, TeamFields, Teams, Users, ma
|
||||
from CTFd.schemas.fields import TeamFieldEntriesSchema
|
||||
@@ -191,6 +192,116 @@ class TeamSchema(ma.ModelSchema):
|
||||
field_names=["captain_id"],
|
||||
)
|
||||
|
||||
@pre_load
|
||||
def validate_fields(self, data):
|
||||
"""
|
||||
This validator is used to only allow users to update the field entry for their user.
|
||||
It's not possible to exclude it because without the PK Marshmallow cannot load the right instance
|
||||
"""
|
||||
fields = data.get("fields")
|
||||
if fields is None:
|
||||
return
|
||||
|
||||
current_user = get_current_user()
|
||||
|
||||
if is_admin():
|
||||
user_id = data.get("id")
|
||||
if user_id:
|
||||
target_user = Users.query.filter_by(id=data["id"]).first()
|
||||
else:
|
||||
target_user = current_user
|
||||
|
||||
provided_ids = []
|
||||
for f in fields:
|
||||
f.pop("id", None)
|
||||
field_id = f.get("field_id")
|
||||
|
||||
# # Check that we have an existing field for this. May be unnecessary b/c the foriegn key should enforce
|
||||
field = TeamFields.query.filter_by(id=field_id).first_or_404()
|
||||
|
||||
# Get the existing field entry if one exists
|
||||
entry = TeamFieldEntries.query.filter_by(
|
||||
field_id=field.id, user_id=target_user.id
|
||||
).first()
|
||||
if entry:
|
||||
f["id"] = entry.id
|
||||
provided_ids.append(entry.id)
|
||||
|
||||
# Extremely dirty hack to prevent deleting previously provided data.
|
||||
# This needs a better soln.
|
||||
entries = (
|
||||
TeamFieldEntries.query.options(load_only("id"))
|
||||
.filter_by(user_id=target_user.id)
|
||||
.all()
|
||||
)
|
||||
for entry in entries:
|
||||
if entry.id not in provided_ids:
|
||||
fields.append({"id": entry.id})
|
||||
else:
|
||||
provided_ids = []
|
||||
for f in fields:
|
||||
# Remove any existing set
|
||||
f.pop("id", None)
|
||||
field_id = f.get("field_id")
|
||||
|
||||
# # Check that we have an existing field for this. May be unnecessary b/c the foriegn key should enforce
|
||||
field = TeamFields.query.filter_by(id=field_id).first_or_404()
|
||||
|
||||
if field.editable is False:
|
||||
raise ValidationError(
|
||||
f"Field {field.name} cannot be editted", field_names=["fields"]
|
||||
)
|
||||
|
||||
# Get the existing field entry if one exists
|
||||
entry = TeamFieldEntries.query.filter_by(
|
||||
field_id=field.id, user_id=current_user.id
|
||||
).first()
|
||||
|
||||
if entry:
|
||||
f["id"] = entry.id
|
||||
provided_ids.append(entry.id)
|
||||
|
||||
# Extremely dirty hack to prevent deleting previously provided data.
|
||||
# This needs a better soln.
|
||||
entries = (
|
||||
TeamFieldEntries.query.options(load_only("id"))
|
||||
.filter_by(user_id=current_user.id)
|
||||
.all()
|
||||
)
|
||||
for entry in entries:
|
||||
if entry.id not in provided_ids:
|
||||
fields.append({"id": entry.id})
|
||||
|
||||
@post_dump
|
||||
def process_fields(self, data):
|
||||
"""
|
||||
Handle permissions levels for fields.
|
||||
This is post_dump to manipulate JSON instead of the raw db object
|
||||
|
||||
Admins can see all fields.
|
||||
Users (self) can see their edittable and public fields
|
||||
Public (user) can only see public fields
|
||||
"""
|
||||
# Gather all possible fields
|
||||
removed_field_ids = []
|
||||
fields = TeamFields.query.all()
|
||||
|
||||
# Select fields for removal based on current view and properties of the field
|
||||
for field in fields:
|
||||
if self.view == "user":
|
||||
if field.public is False:
|
||||
removed_field_ids.append(field.id)
|
||||
elif self.view == "self":
|
||||
if field.editable is False and field.public is False:
|
||||
removed_field_ids.append(field.id)
|
||||
|
||||
# Rebuild fuilds
|
||||
fields = data.get("fields")
|
||||
if fields:
|
||||
data["fields"] = [
|
||||
field for field in fields if field["field_id"] not in removed_field_ids
|
||||
]
|
||||
|
||||
views = {
|
||||
"user": [
|
||||
"website",
|
||||
|
||||
Reference in New Issue
Block a user