mirror of
https://github.com/aljazceru/CTFd.git
synced 2025-12-17 05:54:19 +01:00
333 lines
9.9 KiB
Python
333 lines
9.9 KiB
Python
from flask import abort, request
|
|
from flask_restx import Namespace, Resource
|
|
|
|
from CTFd.cache import clear_standings, clear_user_session
|
|
from CTFd.models import (
|
|
Awards,
|
|
Notifications,
|
|
Solves,
|
|
Submissions,
|
|
Tracking,
|
|
Unlocks,
|
|
Users,
|
|
db,
|
|
)
|
|
from CTFd.schemas.awards import AwardSchema
|
|
from CTFd.schemas.submissions import SubmissionSchema
|
|
from CTFd.schemas.users import UserSchema
|
|
from CTFd.utils.config import get_mail_provider
|
|
from CTFd.utils.decorators import admins_only, authed_only, ratelimit
|
|
from CTFd.utils.decorators.visibility import (
|
|
check_account_visibility,
|
|
check_score_visibility,
|
|
)
|
|
from CTFd.utils.email import sendmail, user_created_notification
|
|
from CTFd.utils.security.auth import update_user
|
|
from CTFd.utils.user import get_current_user, get_current_user_type, is_admin
|
|
|
|
users_namespace = Namespace("users", description="Endpoint to retrieve Users")
|
|
|
|
|
|
@users_namespace.route("")
|
|
class UserList(Resource):
|
|
@check_account_visibility
|
|
def get(self):
|
|
if is_admin() and request.args.get("view") == "admin":
|
|
users = Users.query.filter_by().paginate(max_per_page=100)
|
|
else:
|
|
users = Users.query.filter_by(banned=False, hidden=False).paginate(
|
|
max_per_page=100
|
|
)
|
|
|
|
response = UserSchema(view="user", many=True).dump(users.items)
|
|
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
return {
|
|
"meta": {
|
|
"pagination": {
|
|
"page": users.page,
|
|
"next": users.next_num,
|
|
"prev": users.prev_num,
|
|
"pages": users.pages,
|
|
"per_page": users.per_page,
|
|
"total": users.total,
|
|
}
|
|
},
|
|
"success": True,
|
|
"data": response.data,
|
|
}
|
|
|
|
@users_namespace.doc(
|
|
params={
|
|
"notify": "Whether to send the created user an email with their credentials"
|
|
}
|
|
)
|
|
@admins_only
|
|
def post(self):
|
|
req = request.get_json()
|
|
schema = UserSchema("admin")
|
|
response = schema.load(req)
|
|
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
db.session.add(response.data)
|
|
db.session.commit()
|
|
|
|
if request.args.get("notify"):
|
|
name = response.data.name
|
|
email = response.data.email
|
|
password = req.get("password")
|
|
|
|
user_created_notification(addr=email, name=name, password=password)
|
|
|
|
clear_standings()
|
|
|
|
response = schema.dump(response.data)
|
|
|
|
return {"success": True, "data": response.data}
|
|
|
|
|
|
@users_namespace.route("/<int:user_id>")
|
|
@users_namespace.param("user_id", "User ID")
|
|
class UserPublic(Resource):
|
|
@check_account_visibility
|
|
def get(self, user_id):
|
|
user = Users.query.filter_by(id=user_id).first_or_404()
|
|
|
|
if (user.banned or user.hidden) and is_admin() is False:
|
|
abort(404)
|
|
|
|
user_type = get_current_user_type(fallback="user")
|
|
response = UserSchema(view=user_type).dump(user)
|
|
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
response.data["place"] = user.place
|
|
response.data["score"] = user.score
|
|
|
|
return {"success": True, "data": response.data}
|
|
|
|
@admins_only
|
|
def patch(self, user_id):
|
|
user = Users.query.filter_by(id=user_id).first_or_404()
|
|
data = request.get_json()
|
|
data["id"] = user_id
|
|
schema = UserSchema(view="admin", instance=user, partial=True)
|
|
response = schema.load(data)
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
db.session.commit()
|
|
|
|
response = schema.dump(response.data)
|
|
|
|
db.session.close()
|
|
|
|
clear_user_session(user_id=user_id)
|
|
clear_standings()
|
|
|
|
return {"success": True, "data": response}
|
|
|
|
@admins_only
|
|
def delete(self, user_id):
|
|
Notifications.query.filter_by(user_id=user_id).delete()
|
|
Awards.query.filter_by(user_id=user_id).delete()
|
|
Unlocks.query.filter_by(user_id=user_id).delete()
|
|
Submissions.query.filter_by(user_id=user_id).delete()
|
|
Solves.query.filter_by(user_id=user_id).delete()
|
|
Tracking.query.filter_by(user_id=user_id).delete()
|
|
Users.query.filter_by(id=user_id).delete()
|
|
db.session.commit()
|
|
db.session.close()
|
|
|
|
clear_user_session(user_id=user_id)
|
|
clear_standings()
|
|
|
|
return {"success": True}
|
|
|
|
|
|
@users_namespace.route("/me")
|
|
class UserPrivate(Resource):
|
|
@authed_only
|
|
def get(self):
|
|
user = get_current_user()
|
|
response = UserSchema("self").dump(user).data
|
|
response["place"] = user.place
|
|
response["score"] = user.score
|
|
return {"success": True, "data": response}
|
|
|
|
@authed_only
|
|
def patch(self):
|
|
user = get_current_user()
|
|
data = request.get_json()
|
|
schema = UserSchema(view="self", instance=user, partial=True)
|
|
response = schema.load(data)
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
db.session.commit()
|
|
|
|
# Update user's session for the new session hash
|
|
update_user(user)
|
|
|
|
response = schema.dump(response.data)
|
|
db.session.close()
|
|
|
|
clear_standings()
|
|
|
|
return {"success": True, "data": response.data}
|
|
|
|
|
|
@users_namespace.route("/me/solves")
|
|
class UserPrivateSolves(Resource):
|
|
@authed_only
|
|
def get(self):
|
|
user = get_current_user()
|
|
solves = user.get_solves(admin=True)
|
|
|
|
view = "user" if not is_admin() else "admin"
|
|
response = SubmissionSchema(view=view, many=True).dump(solves)
|
|
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
return {"success": True, "data": response.data}
|
|
|
|
|
|
@users_namespace.route("/me/fails")
|
|
class UserPrivateFails(Resource):
|
|
@authed_only
|
|
def get(self):
|
|
user = get_current_user()
|
|
fails = user.get_fails(admin=True)
|
|
|
|
view = "user" if not is_admin() else "admin"
|
|
response = SubmissionSchema(view=view, many=True).dump(fails)
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
if is_admin():
|
|
data = response.data
|
|
else:
|
|
data = []
|
|
count = len(response.data)
|
|
|
|
return {"success": True, "data": data, "meta": {"count": count}}
|
|
|
|
|
|
@users_namespace.route("/me/awards")
|
|
@users_namespace.param("user_id", "User ID")
|
|
class UserPrivateAwards(Resource):
|
|
@authed_only
|
|
def get(self):
|
|
user = get_current_user()
|
|
awards = user.get_awards(admin=True)
|
|
|
|
view = "user" if not is_admin() else "admin"
|
|
response = AwardSchema(view=view, many=True).dump(awards)
|
|
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
return {"success": True, "data": response.data}
|
|
|
|
|
|
@users_namespace.route("/<user_id>/solves")
|
|
@users_namespace.param("user_id", "User ID")
|
|
class UserPublicSolves(Resource):
|
|
@check_account_visibility
|
|
@check_score_visibility
|
|
def get(self, user_id):
|
|
user = Users.query.filter_by(id=user_id).first_or_404()
|
|
|
|
if (user.banned or user.hidden) and is_admin() is False:
|
|
abort(404)
|
|
|
|
solves = user.get_solves(admin=is_admin())
|
|
|
|
view = "user" if not is_admin() else "admin"
|
|
response = SubmissionSchema(view=view, many=True).dump(solves)
|
|
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
return {"success": True, "data": response.data}
|
|
|
|
|
|
@users_namespace.route("/<user_id>/fails")
|
|
@users_namespace.param("user_id", "User ID")
|
|
class UserPublicFails(Resource):
|
|
@check_account_visibility
|
|
@check_score_visibility
|
|
def get(self, user_id):
|
|
user = Users.query.filter_by(id=user_id).first_or_404()
|
|
|
|
if (user.banned or user.hidden) and is_admin() is False:
|
|
abort(404)
|
|
fails = user.get_fails(admin=is_admin())
|
|
|
|
view = "user" if not is_admin() else "admin"
|
|
response = SubmissionSchema(view=view, many=True).dump(fails)
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
if is_admin():
|
|
data = response.data
|
|
else:
|
|
data = []
|
|
count = len(response.data)
|
|
|
|
return {"success": True, "data": data, "meta": {"count": count}}
|
|
|
|
|
|
@users_namespace.route("/<user_id>/awards")
|
|
@users_namespace.param("user_id", "User ID or 'me'")
|
|
class UserPublicAwards(Resource):
|
|
@check_account_visibility
|
|
@check_score_visibility
|
|
def get(self, user_id):
|
|
user = Users.query.filter_by(id=user_id).first_or_404()
|
|
|
|
if (user.banned or user.hidden) and is_admin() is False:
|
|
abort(404)
|
|
awards = user.get_awards(admin=is_admin())
|
|
|
|
view = "user" if not is_admin() else "admin"
|
|
response = AwardSchema(view=view, many=True).dump(awards)
|
|
|
|
if response.errors:
|
|
return {"success": False, "errors": response.errors}, 400
|
|
|
|
return {"success": True, "data": response.data}
|
|
|
|
|
|
@users_namespace.route("/<int:user_id>/email")
|
|
@users_namespace.param("user_id", "User ID")
|
|
class UserEmails(Resource):
|
|
@admins_only
|
|
@ratelimit(method="POST", limit=10, interval=60)
|
|
def post(self, user_id):
|
|
req = request.get_json()
|
|
text = req.get("text", "").strip()
|
|
user = Users.query.filter_by(id=user_id).first_or_404()
|
|
|
|
if get_mail_provider() is None:
|
|
return (
|
|
{"success": False, "errors": {"": ["Email settings not configured"]}},
|
|
400,
|
|
)
|
|
|
|
if not text:
|
|
return (
|
|
{"success": False, "errors": {"text": ["Email text cannot be empty"]}},
|
|
400,
|
|
)
|
|
|
|
result, response = sendmail(addr=user.email, text=text)
|
|
|
|
return {"success": result, "data": {}}
|