mirror of
https://github.com/aljazceru/CTFd.git
synced 2025-12-18 06:24:23 +01:00
Improve Challenge type plugin staticmethods (#394)
This passes more logic into the challenge classes so that it can be accessed by plugins
This commit is contained in:
@@ -281,6 +281,7 @@ def chal(chalid):
|
||||
if not utils.user_can_view_challenges():
|
||||
return redirect(url_for('auth.login', next=request.path))
|
||||
if (utils.authed() and utils.is_verified() and (utils.ctf_started() or utils.view_after_ctf())) or utils.is_admin():
|
||||
team = Teams.query.filter_by(id=session['id']).first()
|
||||
fails = WrongKeys.query.filter_by(teamid=session['id'], chalid=chalid).count()
|
||||
logger = logging.getLogger('keys')
|
||||
data = (time.strftime("%m/%d/%Y %X"), session['username'].encode('utf-8'), request.form['key'].encode('utf-8'), utils.get_kpm(session['id']))
|
||||
@@ -314,21 +315,15 @@ def chal(chalid):
|
||||
})
|
||||
|
||||
chal_class = get_chal_class(chal.type)
|
||||
status, message = chal_class.solve(chal, provided_key)
|
||||
status, message = chal_class.attempt(chal, request)
|
||||
if status: # The challenge plugin says the input is right
|
||||
if utils.ctftime() or utils.is_admin():
|
||||
solve = Solves(teamid=session['id'], chalid=chalid, ip=utils.get_ip(), flag=provided_key)
|
||||
db.session.add(solve)
|
||||
db.session.commit()
|
||||
db.session.close()
|
||||
chal_class.solve(team=team, chal=chal, request=request)
|
||||
logger.info("[{0}] {1} submitted {2} with kpm {3} [CORRECT]".format(*data))
|
||||
return jsonify({'status': 1, 'message': message})
|
||||
else: # The challenge plugin says the input is wrong
|
||||
if utils.ctftime() or utils.is_admin():
|
||||
wrong = WrongKeys(teamid=session['id'], chalid=chalid, ip=utils.get_ip(), flag=provided_key)
|
||||
db.session.add(wrong)
|
||||
db.session.commit()
|
||||
db.session.close()
|
||||
chal_class.fail(team=team, chal=chal, request=request)
|
||||
logger.info("[{0}] {1} submitted {2} with kpm {3} [WRONG]".format(*data))
|
||||
# return '0' # key was wrong
|
||||
if max_tries:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from CTFd.plugins.keys import get_key_class
|
||||
from CTFd.models import db, Keys
|
||||
from CTFd.models import db, Solves, WrongKeys, Keys
|
||||
from CTFd import utils
|
||||
|
||||
|
||||
class BaseChallenge(object):
|
||||
@@ -12,13 +13,30 @@ class CTFdStandardChallenge(BaseChallenge):
|
||||
name = "standard"
|
||||
|
||||
@staticmethod
|
||||
def solve(chal, provided_key):
|
||||
def attempt(chal, request):
|
||||
provided_key = request.form['key'].strip()
|
||||
chal_keys = Keys.query.filter_by(chal=chal.id).all()
|
||||
for chal_key in chal_keys:
|
||||
if get_key_class(chal_key.key_type).compare(chal_key.flag, provided_key):
|
||||
return True, 'Correct'
|
||||
return False, 'Incorrect'
|
||||
|
||||
@staticmethod
|
||||
def solve(team, chal, request):
|
||||
provided_key = request.form['key'].strip()
|
||||
solve = Solves(teamid=team.id, chalid=chal.id, ip=utils.get_ip(req=request), flag=provided_key)
|
||||
db.session.add(solve)
|
||||
db.session.commit()
|
||||
db.session.close()
|
||||
|
||||
@staticmethod
|
||||
def fail(team, chal, request):
|
||||
provided_key = request.form['key'].strip()
|
||||
wrong = WrongKeys(teamid=team.id, chalid=chal.id, ip=utils.get_ip(request), flag=provided_key)
|
||||
db.session.add(wrong)
|
||||
db.session.commit()
|
||||
db.session.close()
|
||||
|
||||
|
||||
CHALLENGE_CLASSES = {
|
||||
0: CTFdStandardChallenge
|
||||
|
||||
@@ -333,7 +333,7 @@ def unix_time_to_utc(t):
|
||||
return datetime.datetime.utcfromtimestamp(t)
|
||||
|
||||
|
||||
def get_ip():
|
||||
def get_ip(req=None):
|
||||
""" Returns the IP address of the currently in scope request. The approach is to define a list of trusted proxies
|
||||
(in this case the local network), and only trust the most recently defined untrusted IP address.
|
||||
Taken from http://stackoverflow.com/a/22936947/4285524 but the generator there makes no sense.
|
||||
@@ -344,15 +344,17 @@ def get_ip():
|
||||
CTFd does not use IP address for anything besides cursory tracking of teams and it is ill-advised to do much
|
||||
more than that if you do not know what you're doing.
|
||||
"""
|
||||
if req is None:
|
||||
req = request
|
||||
trusted_proxies = app.config['TRUSTED_PROXIES']
|
||||
combined = "(" + ")|(".join(trusted_proxies) + ")"
|
||||
route = request.access_route + [request.remote_addr]
|
||||
route = req.access_route + [req.remote_addr]
|
||||
for addr in reversed(route):
|
||||
if not re.match(combined, addr): # IP is not trusted but we trust the proxies
|
||||
remote_addr = addr
|
||||
break
|
||||
else:
|
||||
remote_addr = request.remote_addr
|
||||
remote_addr = req.remote_addr
|
||||
return remote_addr
|
||||
|
||||
|
||||
|
||||
@@ -155,7 +155,9 @@ def test_submitting_flags_with_large_ips():
|
||||
ip_addresses = ['172.18.0.1', '255.255.255.255', '2001:0db8:85a3:0000:0000:8a2e:0370:7334']
|
||||
for ip_address in ip_addresses:
|
||||
# Monkeypatch get_ip
|
||||
utils.get_ip = lambda: ip_address
|
||||
def get_ip_fake(req=None):
|
||||
return ip_address
|
||||
utils.get_ip = get_ip_fake
|
||||
|
||||
# Generate challenge and flag
|
||||
chal = gen_challenge(app.db)
|
||||
|
||||
Reference in New Issue
Block a user