mirror of
https://github.com/aljazceru/CTFd.git
synced 2025-12-17 14:04:20 +01:00
Fix creating users, teams from the API (#768)
* Fix creating users, teams from the API, hash password in models vs in schemas, stop caching CSS at the decorator level, fix tests * Fix whitelisted emails and add test * Set proper defaults in accounts config
This commit is contained in:
@@ -153,7 +153,7 @@ def register():
|
||||
if not valid_email:
|
||||
errors.append("Please enter a valid email address")
|
||||
if domain_whitelist:
|
||||
domain_whitelist = domain_whitelist.split(',')
|
||||
domain_whitelist = [d.strip() for d in domain_whitelist.split(',')]
|
||||
if domain not in domain_whitelist:
|
||||
errors.append(
|
||||
"Only email addresses under {domains} may register".format(
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_marshmallow import Marshmallow
|
||||
from passlib.hash import bcrypt_sha256
|
||||
from sqlalchemy import TypeDecorator, String, func, types, CheckConstraint, and_
|
||||
from sqlalchemy.sql.expression import union_all
|
||||
from sqlalchemy.types import JSON, NullType
|
||||
from sqlalchemy.orm import validates, column_property
|
||||
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
|
||||
from sqlalchemy.sql import or_, and_, any_
|
||||
from CTFd.utils.crypto import hash_password
|
||||
from CTFd.cache import cache
|
||||
import datetime
|
||||
@@ -301,8 +299,10 @@ class Users(db.Model):
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super(Users, self).__init__(**kwargs)
|
||||
if kwargs.get('password'):
|
||||
self.password = hash_password(str(kwargs['password']))
|
||||
|
||||
@validates('password')
|
||||
def validate_password(self, key, plaintext):
|
||||
return hash_password(str(plaintext))
|
||||
|
||||
@hybrid_property
|
||||
def account_id(self):
|
||||
@@ -491,8 +491,10 @@ class Teams(db.Model):
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super(Teams, self).__init__(**kwargs)
|
||||
if kwargs.get('password'):
|
||||
self.password = hash_password(str(kwargs['password']))
|
||||
|
||||
@validates('password')
|
||||
def validate_password(self, key, plaintext):
|
||||
return hash_password(str(plaintext))
|
||||
|
||||
@property
|
||||
def solves(self):
|
||||
|
||||
@@ -4,8 +4,10 @@ from marshmallow import validate, ValidationError, pre_load
|
||||
from marshmallow_sqlalchemy import field_for
|
||||
from CTFd.models import ma, Teams
|
||||
from CTFd.utils.validators import validate_country_code
|
||||
from CTFd.utils.user import is_admin, get_current_team
|
||||
from CTFd.utils.countries import lookup_country_code
|
||||
from CTFd.utils.user import is_admin, get_current_team
|
||||
from CTFd.utils.crypto import verify_password, hash_password
|
||||
|
||||
|
||||
class TeamSchema(ma.ModelSchema):
|
||||
@@ -90,6 +92,25 @@ class TeamSchema(ma.ModelSchema):
|
||||
if obj.id != get_current_team().id:
|
||||
raise ValidationError('Email address has already been used', field_names=['email'])
|
||||
|
||||
@pre_load
|
||||
def validate_password_confirmation(self, data):
|
||||
password = data.get('password')
|
||||
confirm = data.get('confirm')
|
||||
target_team = get_current_team()
|
||||
|
||||
if is_admin():
|
||||
pass
|
||||
else:
|
||||
if password and (confirm is None):
|
||||
raise ValidationError('Please confirm your current password', field_names=['confirm'])
|
||||
|
||||
if password and confirm:
|
||||
test = verify_password(plaintext=confirm, ciphertext=target_team.password)
|
||||
if test is True:
|
||||
return data
|
||||
else:
|
||||
raise ValidationError('Your previous password is incorrect', field_names=['confirm'])
|
||||
|
||||
views = {
|
||||
'user': [
|
||||
'website',
|
||||
|
||||
@@ -105,9 +105,7 @@ class UserSchema(ma.ModelSchema):
|
||||
user_id = data.get('id')
|
||||
|
||||
if is_admin():
|
||||
if password:
|
||||
data['password'] = hash_password(data['password'])
|
||||
return data
|
||||
pass
|
||||
else:
|
||||
if password and (confirm is None):
|
||||
raise ValidationError('Please confirm your current password', field_names=['confirm'])
|
||||
@@ -115,7 +113,6 @@ class UserSchema(ma.ModelSchema):
|
||||
if password and confirm:
|
||||
test = verify_password(plaintext=confirm, ciphertext=target_user.password)
|
||||
if test is True:
|
||||
data['password'] = hash_password(data['password'])
|
||||
return data
|
||||
else:
|
||||
raise ValidationError('Your previous password is incorrect', field_names=['confirm'])
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
</label>
|
||||
<select class="form-control" name="verify_emails">
|
||||
<option value="true" {% if verify_emails == True %}selected{% endif %}>Enabled</option>
|
||||
<option value="false" {% if verify_emails == False %}selected{% endif %}>Disabled</option>
|
||||
<option value="false" {% if verify_emails == False or verify_emails == None %}selected{% endif %}>Disabled</option>
|
||||
</select>
|
||||
</div>
|
||||
|
||||
@@ -37,26 +37,11 @@
|
||||
</small>
|
||||
</label>
|
||||
<select class="form-control" name="name_changes">
|
||||
<option value="true" {% if name_changes== True %}selected{% endif %}>Enabled</option>
|
||||
<option value="true" {% if name_changes == True or name_changes == None %}selected{% endif %}>Enabled</option>
|
||||
<option value="false" {% if name_changes == False %}selected{% endif %}>Disabled</option>
|
||||
</select>
|
||||
</div>
|
||||
|
||||
{# <div class="form-check">#}
|
||||
{# <label>#}
|
||||
{# <input id="verify_emails" name="verify_emails" type="checkbox" {% if verify_emails %}checked{% endif %}>#}
|
||||
{# Only allow users with verified emails#}
|
||||
{# </label>#}
|
||||
{# </div>#}
|
||||
|
||||
{# <div class="form-check">#}
|
||||
{# <label>#}
|
||||
{# <input id="prevent_name_change" name="prevent_name_change" type="checkbox"#}
|
||||
{# {% if prevent_name_change %}checked{% endif %}>#}
|
||||
{# Prevent team name changes#}
|
||||
{# </label>#}
|
||||
{# </div>#}
|
||||
|
||||
<button type="submit" class="btn btn-md btn-primary float-right">Update</button>
|
||||
</form>
|
||||
</div>
|
||||
@@ -152,7 +152,6 @@ def settings():
|
||||
|
||||
|
||||
@views.route('/static/user.css')
|
||||
@cache.cached(timeout=300)
|
||||
def custom_css():
|
||||
"""
|
||||
Custom CSS Handler route
|
||||
|
||||
@@ -2,12 +2,13 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from CTFd.utils import set_config
|
||||
from CTFd.utils.crypto import verify_password
|
||||
from tests.helpers import *
|
||||
|
||||
|
||||
def test_api_teams_get_public():
|
||||
"""Can a user get /api/v1/teams if teams are public"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with app.test_client() as client:
|
||||
set_config('account_visibility', 'public')
|
||||
@@ -24,7 +25,7 @@ def test_api_teams_get_public():
|
||||
|
||||
def test_api_teams_get_private():
|
||||
"""Can a user get /api/v1/teams if teams are private"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
register_user(app)
|
||||
with login_as_user(app) as client:
|
||||
@@ -43,7 +44,7 @@ def test_api_teams_get_private():
|
||||
|
||||
def test_api_teams_get_admin():
|
||||
"""Can a user get /api/v1/teams if teams are viewed by admins only"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with login_as_user(app, 'admin') as client:
|
||||
set_config('account_visibility', 'public')
|
||||
@@ -60,7 +61,7 @@ def test_api_teams_get_admin():
|
||||
|
||||
def test_api_teams_post_non_admin():
|
||||
"""Can a user post /api/v1/teams if not admin"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with app.test_client() as client:
|
||||
r = client.post('/api/v1/teams', json="")
|
||||
@@ -70,24 +71,47 @@ def test_api_teams_post_non_admin():
|
||||
|
||||
def test_api_teams_post_admin():
|
||||
"""Can a user post /api/v1/teams if admin"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with login_as_user(app, 'admin') as client:
|
||||
r = client.post('/api/v1/teams', json={
|
||||
"website": "http://www.team.com",
|
||||
"name": "team",
|
||||
"country": "TW",
|
||||
"email": "team@team.com",
|
||||
"affiliation": "team",
|
||||
"password": "pass"
|
||||
})
|
||||
# Create team
|
||||
r = client.post(
|
||||
'/api/v1/teams',
|
||||
json={
|
||||
"website": "http://www.team.com",
|
||||
"name": "team",
|
||||
"country": "TW",
|
||||
"email": "team@team.com",
|
||||
"affiliation": "team",
|
||||
"password": "password"
|
||||
}
|
||||
)
|
||||
assert r.status_code == 200
|
||||
|
||||
# Make sure password was hashed properly
|
||||
team = Teams.query.filter_by(email='team@team.com').first()
|
||||
assert team
|
||||
assert verify_password('password', team.password)
|
||||
|
||||
# Make sure team can actually be joined
|
||||
register_user(app)
|
||||
client = login_as_user(app)
|
||||
|
||||
with client.session_transaction() as sess:
|
||||
data = {
|
||||
"name": "team",
|
||||
"password": "password",
|
||||
"nonce": sess.get('nonce')
|
||||
}
|
||||
r = client.post('/teams/join', data=data)
|
||||
user = Users.query.filter_by(id=2).first()
|
||||
assert user.team_id == 1
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_api_team_get_public():
|
||||
"""Can a user get /api/v1/team/<team_id> if teams are public"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with app.test_client() as client:
|
||||
set_config('account_visibility', 'public')
|
||||
@@ -105,7 +129,7 @@ def test_api_team_get_public():
|
||||
|
||||
def test_api_team_get_private():
|
||||
"""Can a user get /api/v1/teams/<team_id> if teams are private"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
register_user(app)
|
||||
with login_as_user(app) as client:
|
||||
@@ -125,7 +149,7 @@ def test_api_team_get_private():
|
||||
|
||||
def test_api_team_get_admin():
|
||||
"""Can a user get /api/v1/teams/<team_id> if teams are viewed by admins only"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with login_as_user(app, 'admin') as client:
|
||||
gen_team(app.db)
|
||||
@@ -143,7 +167,7 @@ def test_api_team_get_admin():
|
||||
|
||||
def test_api_team_patch_non_admin():
|
||||
"""Can a user patch /api/v1/teams/<team_id> if not admin"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
gen_team(app.db)
|
||||
with app.test_client() as client:
|
||||
@@ -154,22 +178,26 @@ def test_api_team_patch_non_admin():
|
||||
|
||||
def test_api_team_patch_admin():
|
||||
"""Can a user patch /api/v1/teams/<team_id> if admin"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
gen_team(app.db)
|
||||
with login_as_user(app, 'admin') as client:
|
||||
r = client.patch('/api/v1/teams/1', json={
|
||||
"name": "team_name",
|
||||
"password": "password",
|
||||
"affiliation": "changed"
|
||||
})
|
||||
team = Teams.query.filter_by(id=1).first()
|
||||
assert r.status_code == 200
|
||||
assert r.get_json()['data']['affiliation'] == 'changed'
|
||||
assert verify_password('password', team.password)
|
||||
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_api_team_delete_non_admin():
|
||||
"""Can a user delete /api/v1/teams/<team_id> if not admin"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
gen_team(app.db)
|
||||
with app.test_client() as client:
|
||||
@@ -180,7 +208,7 @@ def test_api_team_delete_non_admin():
|
||||
|
||||
def test_api_team_delete_admin():
|
||||
"""Can a user patch /api/v1/teams/<team_id> if admin"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
gen_team(app.db)
|
||||
with login_as_user(app, 'admin') as client:
|
||||
@@ -192,7 +220,7 @@ def test_api_team_delete_admin():
|
||||
|
||||
def test_api_team_get_me_not_logged_in():
|
||||
"""Can a user get /api/v1/teams/me if not logged in"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with app.test_client() as client:
|
||||
r = client.get('/api/v1/teams/me')
|
||||
@@ -217,7 +245,7 @@ def test_api_team_get_me_logged_in():
|
||||
|
||||
def test_api_team_patch_me_not_logged_in():
|
||||
"""Can a user patch /api/v1/teams/me if not logged in"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with app.test_client() as client:
|
||||
r = client.patch('/api/v1/teams/me', json="")
|
||||
@@ -242,7 +270,7 @@ def test_api_team_patch_me_logged_in():
|
||||
|
||||
def test_api_team_get_me_solves_not_logged_in():
|
||||
"""Can a user get /api/v1/teams/me/solves if not logged in"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with app.test_client() as client:
|
||||
r = client.get('/api/v1/teams/me/solves')
|
||||
@@ -284,7 +312,7 @@ def test_api_team_get_solves():
|
||||
|
||||
def test_api_team_get_me_fails_not_logged_in():
|
||||
"""Can a user get /api/v1/teams/me/fails if not logged in"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with app.test_client() as client:
|
||||
r = client.get('/api/v1/teams/me/fails')
|
||||
@@ -326,7 +354,7 @@ def test_api_team_get_fails():
|
||||
|
||||
def test_api_team_get_me_awards_not_logged_in():
|
||||
"""Can a user get /api/v1/teams/me/awards if not logged in"""
|
||||
app = create_ctfd()
|
||||
app = create_ctfd(user_mode="teams")
|
||||
with app.app_context():
|
||||
with app.test_client() as client:
|
||||
r = client.get('/api/v1/teams/me/awards')
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from CTFd.utils import set_config
|
||||
from CTFd.utils.crypto import verify_password
|
||||
from tests.helpers import *
|
||||
|
||||
|
||||
@@ -71,12 +72,23 @@ def test_api_users_post_admin():
|
||||
app = create_ctfd()
|
||||
with app.app_context():
|
||||
with login_as_user(app, 'admin') as client:
|
||||
# Create user
|
||||
r = client.post('/api/v1/users', json={
|
||||
"name": "user",
|
||||
"email": "user@user.com",
|
||||
"password": "pass"
|
||||
"password": "password"
|
||||
})
|
||||
assert r.status_code == 200
|
||||
|
||||
# Make sure password was hashed properly
|
||||
user = Users.query.filter_by(email='user@user.com').first()
|
||||
assert user
|
||||
assert verify_password('password', user.password)
|
||||
|
||||
# Make sure user can login with the creds
|
||||
client = login_as_user(app)
|
||||
r = client.get('/profile')
|
||||
assert r.status_code == 200
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
|
||||
@@ -41,3 +41,24 @@ def test_themes_escape_html():
|
||||
assert r.status_code == 200
|
||||
assert "<script>alert(1)</script>" not in r.get_data(as_text=True)
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_custom_css():
|
||||
"""Config should be able to properly set CSS"""
|
||||
app = create_ctfd()
|
||||
with app.app_context():
|
||||
|
||||
with login_as_user(app, "admin") as admin:
|
||||
css_value = """.test{}"""
|
||||
css_value2 = """.test2{}"""
|
||||
r = admin.patch('/api/v1/configs', json={"css": css_value})
|
||||
assert r.status_code == 200
|
||||
assert get_config('css') == css_value
|
||||
|
||||
r = admin.get('/static/user.css')
|
||||
assert r.get_data(as_text=True) == css_value
|
||||
|
||||
r = admin.patch('/api/v1/configs', json={"css": css_value2})
|
||||
r = admin.get('/static/user.css')
|
||||
assert r.get_data(as_text=True) == css_value2
|
||||
destroy_ctfd(app)
|
||||
|
||||
@@ -51,6 +51,25 @@ def test_register_duplicate_email():
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_register_whitelisted_email():
|
||||
"""A user shouldn't be able to register with an email that isn't on the whitelist"""
|
||||
app = create_ctfd()
|
||||
with app.app_context():
|
||||
set_config('domain_whitelist', 'whitelisted.com, whitelisted.org, whitelisted.net')
|
||||
register_user(app, name="not_whitelisted", email='user@nope.com')
|
||||
assert Users.query.count() == 1
|
||||
|
||||
register_user(app, name="user1", email='user@whitelisted.com')
|
||||
assert Users.query.count() == 2
|
||||
|
||||
register_user(app, name="user2", email='user@whitelisted.org')
|
||||
assert Users.query.count() == 3
|
||||
|
||||
register_user(app, name="user3", email='user@whitelisted.net')
|
||||
assert Users.query.count() == 4
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_user_bad_login():
|
||||
"""A user should not be able to login with an incorrect password"""
|
||||
app = create_ctfd()
|
||||
|
||||
Reference in New Issue
Block a user