Improved CSV exports (#1960)

* Closes #1940
* Create Scoreboard CSV export with support for custom fields
* Create Users CSV export with support for custom fields
* Create Teams CSV export with support for custom fields
This commit is contained in:
Kevin Chung
2021-07-26 02:20:54 -04:00
committed by GitHub
parent 995ef0a6e2
commit 0ba70aa92d
3 changed files with 238 additions and 36 deletions

View File

@@ -1,7 +1,7 @@
import csv
import datetime
import os
from io import BytesIO, StringIO
from io import StringIO
from flask import Blueprint, abort
from flask import current_app as app
@@ -39,11 +39,10 @@ from CTFd.models import (
Unlocks,
Users,
db,
get_class_by_tablename,
)
from CTFd.utils import config as ctf_config
from CTFd.utils import get_config, set_config
from CTFd.utils.csv import load_challenges_csv, load_teams_csv, load_users_csv
from CTFd.utils.csv import dump_csv, load_challenges_csv, load_teams_csv, load_users_csv
from CTFd.utils.decorators import admins_only
from CTFd.utils.exports import export_ctf as export_ctf_util
from CTFd.utils.exports import import_ctf as import_ctf_util
@@ -149,31 +148,7 @@ def import_csv():
def export_csv():
table = request.args.get("table")
# TODO: It might make sense to limit dumpable tables. Config could potentially leak sensitive information.
model = get_class_by_tablename(table)
if model is None:
abort(404)
temp = StringIO()
writer = csv.writer(temp)
header = [column.name for column in model.__mapper__.columns]
writer.writerow(header)
responses = model.query.all()
for curr in responses:
writer.writerow(
[getattr(curr, column.name) for column in model.__mapper__.columns]
)
temp.seek(0)
# In Python 3 send_file requires bytes
output = BytesIO()
output.write(temp.getvalue().encode("utf-8"))
output.seek(0)
temp.close()
output = dump_csv(name=table)
return send_file(
output,

View File

@@ -4,7 +4,7 @@ from wtforms.widgets.html5 import NumberInput
from CTFd.forms import BaseForm
from CTFd.forms.fields import SubmitField
from CTFd.models import db
from CTFd.utils.csv import get_dumpable_tables
class ResetInstanceForm(BaseForm):
@@ -74,12 +74,7 @@ class AccountSettingsForm(BaseForm):
class ExportCSVForm(BaseForm):
table = SelectField(
"Database Table",
choices=list(
zip(sorted(db.metadata.tables.keys()), sorted(db.metadata.tables.keys()))
),
)
table = SelectField("Database Table", choices=get_dumpable_tables())
submit = SubmitField("Download CSV")

View File

@@ -1,7 +1,232 @@
import csv
import json
from io import BytesIO, StringIO
from CTFd.models import Flags, Hints, Tags, Teams, Users, db
from CTFd.models import (
Flags,
Hints,
Tags,
TeamFields,
Teams,
UserFields,
Users,
db,
get_class_by_tablename,
)
from CTFd.plugins.challenges import get_chal_class
from CTFd.utils.config import is_teams_mode, is_users_mode
from CTFd.utils.scores import get_standings
def get_dumpable_tables():
csv_keys = list(CSV_KEYS.keys())
db_keys = list(db.metadata.tables.keys())
tables = csv_keys + db_keys
table_keys = list(zip(tables, tables))
return table_keys
def dump_csv(name):
dump_func = CSV_KEYS.get(name)
if dump_func:
return dump_func()
elif get_class_by_tablename(name):
return dump_database_table(tablename=name)
else:
raise KeyError
def dump_scoreboard_csv():
# TODO: Add fields to scoreboard data
temp = StringIO()
writer = csv.writer(temp)
standings = get_standings()
# Get all user fields in a specific order
user_fields = UserFields.query.all()
user_field_ids = [f.id for f in user_fields]
user_field_names = [f.name for f in user_fields]
if is_teams_mode():
team_fields = TeamFields.query.all()
team_field_ids = [f.id for f in team_fields]
team_field_names = [f.name for f in team_fields]
header = (
[
"place",
"team",
"team id",
"score",
"member name",
"member id",
"member email",
"member score",
]
+ user_field_names
+ team_field_names
)
writer.writerow(header)
for i, standing in enumerate(standings):
team = Teams.query.filter_by(id=standing.account_id).first()
# Build field entries using the order of the field values
team_field_entries = {f.field_id: f.value for f in team.field_entries}
team_field_values = [
team_field_entries.get(f_id, "") for f_id in team_field_ids
]
team_row = [
i + 1,
team.name,
team.id,
standing.score,
"",
"",
] + team_field_values
writer.writerow(team_row)
for member in team.members:
user_field_entries = {f.field_id: f.value for f in member.field_entries}
user_field_values = [
user_field_entries.get(f_id, "") for f_id in user_field_ids
]
user_row = [
"",
"",
"",
"",
member.name,
member.id,
member.email,
member.score,
] + user_field_values
writer.writerow(user_row)
elif is_users_mode():
header = ["place", "user", "score"] + user_field_names
writer.writerow(header)
for i, standing in enumerate(standings):
user = Users.query.filter_by(id=standing.account_id).first()
# Build field entries using the order of the field values
user_field_entries = {f.field_id: f.value for f in user.field_entries}
user_field_values = [
user_field_entries.get(f_id, "") for f_id in user_field_ids
]
user_row = [i + 1, user.name, standing.score] + user_field_values
writer.writerow(user_row)
# In Python 3 send_file requires bytes
output = BytesIO()
output.write(temp.getvalue().encode("utf-8"))
output.seek(0)
temp.close()
return output
def dump_users_with_fields_csv():
temp = StringIO()
writer = csv.writer(temp)
user_fields = UserFields.query.all()
user_field_ids = [f.id for f in user_fields]
user_field_names = [f.name for f in user_fields]
header = [column.name for column in Users.__mapper__.columns] + user_field_names
writer.writerow(header)
responses = Users.query.all()
for curr in responses:
user_field_entries = {f.field_id: f.value for f in curr.field_entries}
user_field_values = [
user_field_entries.get(f_id, "") for f_id in user_field_ids
]
user_row = [
getattr(curr, column.name) for column in Users.__mapper__.columns
] + user_field_values
writer.writerow(user_row)
temp.seek(0)
# In Python 3 send_file requires bytes
output = BytesIO()
output.write(temp.getvalue().encode("utf-8"))
output.seek(0)
temp.close()
return output
def dump_teams_with_fields_csv():
temp = StringIO()
writer = csv.writer(temp)
team_fields = TeamFields.query.all()
team_field_ids = [f.id for f in team_fields]
team_field_names = [f.name for f in team_fields]
header = [column.name for column in Teams.__mapper__.columns] + team_field_names
writer.writerow(header)
responses = Teams.query.all()
for curr in responses:
team_field_entries = {f.field_id: f.value for f in curr.field_entries}
team_field_values = [
team_field_entries.get(f_id, "") for f_id in team_field_ids
]
team_row = [
getattr(curr, column.name) for column in Teams.__mapper__.columns
] + team_field_values
writer.writerow(team_row)
temp.seek(0)
# In Python 3 send_file requires bytes
output = BytesIO()
output.write(temp.getvalue().encode("utf-8"))
output.seek(0)
temp.close()
return output
def dump_database_table(tablename):
# TODO: It might make sense to limit dumpable tables. Config could potentially leak sensitive information.
model = get_class_by_tablename(tablename)
if model is None:
raise KeyError("Unknown database table")
temp = StringIO()
writer = csv.writer(temp)
header = [column.name for column in model.__mapper__.columns]
writer.writerow(header)
responses = model.query.all()
for curr in responses:
writer.writerow(
[getattr(curr, column.name) for column in model.__mapper__.columns]
)
temp.seek(0)
# In Python 3 send_file requires bytes
output = BytesIO()
output.write(temp.getvalue().encode("utf-8"))
output.seek(0)
temp.close()
return output
def load_users_csv(dict_reader):
@@ -57,3 +282,10 @@ def load_challenges_csv(dict_reader):
db.session.add(h)
db.session.commit()
return True
CSV_KEYS = {
"scoreboard": dump_scoreboard_csv,
"users+fields": dump_users_with_fields_csv,
"teams+fields": dump_teams_with_fields_csv,
}