diff --git a/CTFd/challenges.py b/CTFd/challenges.py index 5763542..7cfd824 100644 --- a/CTFd/challenges.py +++ b/CTFd/challenges.py @@ -289,13 +289,13 @@ def chal(chalid): data = (time.strftime("%m/%d/%Y %X"), session['username'].encode('utf-8'), request.form['key'].encode('utf-8'), utils.get_kpm(session['id'])) print("[{0}] {1} submitted {2} with kpm {3}".format(*data)) + chal = Challenges.query.filter_by(id=chalid).first_or_404() + chal_class = get_chal_class(chal.type) + # Anti-bruteforce / submitting keys too quickly if utils.get_kpm(session['id']) > 10: if utils.ctftime(): - wrong = WrongKeys(teamid=session['id'], chalid=chalid, ip=utils.get_ip(), flag=request.form['key'].strip()) - db.session.add(wrong) - db.session.commit() - db.session.close() + chal_class.fail(team=team, chal=chal, request=request) logger.warn("[{0}] {1} submitted {2} with kpm {3} [TOO FAST]".format(*data)) # return '3' # Submitting too fast return jsonify({'status': 3, 'message': "You're submitting keys too fast. Slow down."}) @@ -304,7 +304,6 @@ def chal(chalid): # Challange not solved yet if not solves: - chal = Challenges.query.filter_by(id=chalid).first_or_404() provided_key = request.form['key'].strip() saved_keys = Keys.query.filter_by(chal=chal.id).all() @@ -316,7 +315,6 @@ def chal(chalid): 'message': "You have 0 tries remaining" }) - chal_class = get_chal_class(chal.type) status, message = chal_class.attempt(chal, request) if status: # The challenge plugin says the input is right if utils.ctftime() or utils.is_admin(): diff --git a/tests/user/test_challenges.py b/tests/user/test_challenges.py index 33c18be..a7b5e36 100644 --- a/tests/user/test_challenges.py +++ b/tests/user/test_challenges.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from CTFd.models import Teams, Solves, WrongKeys +from CTFd.models import Teams, Solves, WrongKeys, Challenges from CTFd.utils import get_config, set_config from CTFd import utils from tests.helpers import * @@ -144,6 +144,86 @@ def test_submitting_unicode_flag(): destroy_ctfd(app) +def test_challenges_with_max_attempts(): + """Test that users are locked out of a challenge after they reach max_attempts""" + app = create_ctfd() + with app.app_context(): + register_user(app) + client = login_as_user(app) + chal = gen_challenge(app.db) + chal = Challenges.query.filter_by(id=chal.id).first() + chal_id = chal.id + chal.max_attempts = 3 + app.db.session.commit() + + flag = gen_flag(app.db, chal=chal.id, flag=u'flag') + for x in range(3): + with client.session_transaction() as sess: + data = { + "key": 'notflag', + "nonce": sess.get('nonce') + } + r = client.post('/chal/{}'.format(chal_id), data=data) + + wrong_keys = WrongKeys.query.all() + assert len(wrong_keys) == 3 + + with client.session_transaction() as sess: + data = { + "key": 'flag', + "nonce": sess.get('nonce') + } + r = client.post('/chal/{}'.format(chal_id), data=data) + assert r.status_code == 200 + + resp = json.loads(r.data.decode('utf8')) + assert resp.get('status') == 0 and resp.get('message') == "You have 0 tries remaining" + + solves = Solves.query.all() + assert len(solves) == 0 + destroy_ctfd(app) + + +def test_challenge_kpm_limit(): + """Test that users are properly ratelimited when submitting flags""" + app = create_ctfd() + with app.app_context(): + register_user(app) + client = login_as_user(app) + chal = gen_challenge(app.db) + chal_id = chal.id + + flag = gen_flag(app.db, chal=chal.id, flag=u'flag') + for x in range(11): + with client.session_transaction() as sess: + data = { + "key": 'notflag', + "nonce": sess.get('nonce') + } + r = client.post('/chal/{}'.format(chal_id), data=data) + + wrong_keys = WrongKeys.query.all() + assert len(wrong_keys) == 11 + + with client.session_transaction() as sess: + data = { + "key": 'flag', + "nonce": sess.get('nonce') + } + r = client.post('/chal/{}'.format(chal_id), data=data) + assert r.status_code == 200 + + wrong_keys = WrongKeys.query.all() + assert len(wrong_keys) == 12 + + resp = json.loads(r.data.decode('utf8')) + assert resp.get('status') == 3 and resp.get('message') == "You're submitting keys too fast. Slow down." + + solves = Solves.query.all() + assert len(solves) == 0 + destroy_ctfd(app) + + def test_submitting_flags_with_large_ips(): '''Test that users with high octect IP addresses can submit flags''' app = create_ctfd()