Add a session invalidation strategy inspired by Django (#1449)

* Use a session invalidation strategy inspired by Django. https://docs.djangoproject.com/en/3.0/topics/auth/default/#session-invalidation-on-password-change
* Closes #1430
This commit is contained in:
Kevin Chung
2020-05-29 14:01:21 -04:00
committed by GitHub
parent 50f75be5eb
commit 90adffcbdd
5 changed files with 88 additions and 3 deletions

View File

@@ -22,6 +22,7 @@ from CTFd.utils.decorators.visibility import (
check_score_visibility,
)
from CTFd.utils.email import sendmail, user_created_notification
from CTFd.utils.security.auth import update_user
from CTFd.utils.user import get_current_user, get_current_user_type, is_admin
users_namespace = Namespace("users", description="Endpoint to retrieve Users")
@@ -155,7 +156,9 @@ class UserPrivate(Resource):
db.session.commit()
clear_user_session(user_id=user.id)
# Update user's session for the new session hash
update_user(user)
response = schema.dump(response.data)
db.session.close()

View File

@@ -8,6 +8,7 @@ from CTFd.exceptions import UserNotFoundException, UserTokenExpiredException
from CTFd.models import UserTokens, db
from CTFd.utils.encoding import hexencode
from CTFd.utils.security.csrf import generate_nonce
from CTFd.utils.security.signing import hmac
def login_user(user):
@@ -15,6 +16,17 @@ def login_user(user):
session["name"] = user.name
session["email"] = user.email
session["nonce"] = generate_nonce()
session["hash"] = hmac(user.password)
# Clear out any currently cached user attributes
clear_user_session(user_id=user.id)
def update_user(user):
session["id"] = user.id
session["name"] = user.name
session["email"] = user.email
session["hash"] = hmac(user.password)
# Clear out any currently cached user attributes
clear_user_session(user_id=user.id)

View File

@@ -1,3 +1,7 @@
import hashlib
import hmac as _hmac
import six
from flask import current_app
from itsdangerous import Signer
from itsdangerous.exc import ( # noqa: F401
@@ -7,6 +11,8 @@ from itsdangerous.exc import ( # noqa: F401
)
from itsdangerous.url_safe import URLSafeTimedSerializer
from CTFd.utils import string_types
def serialize(data, secret=None):
if secret is None:
@@ -34,3 +40,16 @@ def unsign(data, secret=None):
secret = current_app.config["SECRET_KEY"]
s = Signer(secret)
return s.unsign(data)
def hmac(data, secret=None, digest=hashlib.sha1):
if secret is None:
secret = current_app.config["SECRET_KEY"]
if six.PY3:
if isinstance(data, string_types):
data = data.encode("utf-8")
if isinstance(secret, string_types):
secret = secret.encode("utf-8")
return _hmac.new(key=secret, msg=data, digestmod=digest).hexdigest()

View File

@@ -2,18 +2,28 @@ import datetime
import re
from flask import current_app as app
from flask import request, session
from flask import abort, redirect, request, session, url_for
from CTFd.cache import cache
from CTFd.constants.users import UserAttrs
from CTFd.constants.teams import TeamAttrs
from CTFd.models import Fails, Users, db, Teams, Tracking
from CTFd.utils import get_config
from CTFd.utils.security.signing import hmac
from CTFd.utils.security.auth import logout_user
def get_current_user():
if authed():
user = Users.query.filter_by(id=session["id"]).first()
# Check if the session is still valid
session_hash = session.get("hash")
if session_hash:
if session_hash != hmac(user.password):
logout_user()
abort(redirect(url_for("auth.login", next=request.full_path)))
return user
else:
return None

View File

@@ -1,4 +1,4 @@
from tests.helpers import create_ctfd, destroy_ctfd
from tests.helpers import create_ctfd, destroy_ctfd, login_as_user, register_user
def test_sessions_set_httponly():
@@ -19,3 +19,44 @@ def test_sessions_set_samesite():
cookie = dict(r.headers)["Set-Cookie"]
assert "SameSite=" in cookie
destroy_ctfd(app)
def test_session_invalidation_on_admin_password_change():
app = create_ctfd()
with app.app_context():
register_user(app)
with login_as_user(app, name="admin") as admin, login_as_user(app) as user:
r = user.get("/settings")
assert r.status_code == 200
r = admin.patch("/api/v1/users/2", json={"password": "password2"})
assert r.status_code == 200
r = user.get("/settings")
# User's password was changed
# They should be logged out
assert r.location.startswith("http://localhost/login")
assert r.status_code == 302
destroy_ctfd(app)
def test_session_invalidation_on_user_password_change():
app = create_ctfd()
with app.app_context():
register_user(app)
with login_as_user(app) as user:
r = user.get("/settings")
assert r.status_code == 200
data = {"confirm": "password", "password": "new_password"}
r = user.patch("/api/v1/users/me", json=data)
assert r.status_code == 200
r = user.get("/settings")
# User initiated their own password change
# They should not be logged out
assert r.status_code == 200
destroy_ctfd(app)