CTFd/tests/users/test_challenges.py

579 lines
18 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from CTFd.models import Teams, Solves, Fails, Challenges
from CTFd.utils import get_config, set_config
from tests.helpers import *
from freezegun import freeze_time
from mock import patch
import json
def test_user_get_challenges():
    """
    A registered, logged-in user receives HTTP 200 from /challenges
    """
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        response = user_client.get('/challenges')
        assert response.status_code == 200
    destroy_ctfd(app)
def test_user_get_chals():
    """
    A registered, logged-in user receives HTTP 200 from the
    /api/v1/challenges listing endpoint
    """
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        listing = user_client.get('/api/v1/challenges')
        assert listing.status_code == 200
    destroy_ctfd(app)
def test_viewing_challenges():
    """
    A generated challenge shows up in the /api/v1/challenges listing
    """
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        gen_challenge(app.db)
        response = user_client.get('/api/v1/challenges')
        listed = response.get_json()['data']
        # Exactly one challenge was generated, so exactly one is listed.
        assert len(listed) == 1
    destroy_ctfd(app)
def test_viewing_challenge():
    """Requesting a single challenge by id yields a non-empty JSON body"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        gen_challenge(app.db)
        detail = user_client.get('/api/v1/challenges/1')
        assert detail.get_json()
    destroy_ctfd(app)
# def test_chals_solves():
# """Test that the /chals/solves endpoint works properly"""
# app = create_ctfd()
# with app.app_context():
# # Generate 5 users
# for c in range(1, 6):
# name = "user{}".format(c)
# email = "user{}@ctfd.io".format(c)
# register_user(app, name=name, email=email, password="password")
#
# # Generate 6 challenges (the sixth is never solved)
# for c in range(6):
# chal1 = gen_challenge(app.db, value=100)
#
# user_ids = list(range(2, 7))
# chal_ids = list(range(1, 6))
# for u in user_ids:
# for c in chal_ids:
# gen_solve(app.db, teamid=u, chalid=c)
# chal_ids.pop()
#
# client = login_as_user(app, name="user1")
#
# with client.session_transaction() as sess:
# r = client.get('/chals/solves')
# output = r.get_data(as_text=True)
# saved = json.loads('''{
# "1": 5,
# "2": 4,
# "3": 3,
# "4": 2,
# "5": 1,
# "6": 0
# }
# ''')
# received = json.loads(output)
# assert saved == received
# set_config('hide_scores', True)
# with client.session_transaction() as sess:
# r = client.get('/chals/solves')
# output = r.get_data(as_text=True)
# saved = json.loads('''{
# "1": -1,
# "2": -1,
# "3": -1,
# "4": -1,
# "5": -1,
# "6": -1
# }
# ''')
# received = json.loads(output)
# assert saved == received
# destroy_ctfd(app)
def test_submitting_correct_flag():
    """Submitting the exact flag content is reported as a correct attempt"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        challenge = gen_challenge(app.db)
        gen_flag(app.db, challenge_id=challenge.id, content='flag')
        payload = {"submission": 'flag', "challenge_id": challenge.id}
        response = user_client.post('/api/v1/challenges/attempt', json=payload)
        assert response.status_code == 200
        result = response.get_json()['data']
        assert result.get('status') == "correct"
        assert result.get('message') == "Correct"
    destroy_ctfd(app)
def test_submitting_correct_static_case_insensitive_flag():
    """A static flag created with data="case_insensitive" accepts a submission that differs only in case"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        challenge = gen_challenge(app.db)
        gen_flag(
            app.db,
            challenge_id=challenge.id,
            content='flag',
            data="case_insensitive",
        )
        # Upper-case submission against a lower-case stored flag.
        payload = {"submission": 'FLAG', "challenge_id": challenge.id}
        response = user_client.post('/api/v1/challenges/attempt', json=payload)
        assert response.status_code == 200
        result = response.get_json()['data']
        assert result.get('status') == "correct"
        assert result.get('message') == "Correct"
    destroy_ctfd(app)
def test_submitting_correct_regex_case_insensitive_flag():
    """A regex flag created with data="case_insensitive" accepts a submission that differs only in case"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        challenge = gen_challenge(app.db)
        gen_flag(
            app.db,
            challenge_id=challenge.id,
            type='regex',
            content='flag',
            data="case_insensitive",
        )
        # Upper-case submission against a lower-case regex pattern.
        payload = {"submission": 'FLAG', "challenge_id": challenge.id}
        response = user_client.post('/api/v1/challenges/attempt', json=payload)
        assert response.status_code == 200
        result = response.get_json()['data']
        assert result.get('status') == "correct"
        assert result.get('message') == "Correct"
    destroy_ctfd(app)
def test_submitting_incorrect_flag():
    """A submission that does not match the flag is reported as incorrect"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        challenge = gen_challenge(app.db)
        gen_flag(app.db, challenge_id=challenge.id, content='flag')
        payload = {"submission": 'notflag', "challenge_id": challenge.id}
        response = user_client.post('/api/v1/challenges/attempt', json=payload)
        # A wrong answer is still a successfully processed request.
        assert response.status_code == 200
        result = response.get_json()['data']
        assert result.get('status') == "incorrect"
        assert result.get('message') == "Incorrect"
    destroy_ctfd(app)
def test_submitting_unicode_flag():
    """Test that users can submit a unicode flag and have it judged correct"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        client = login_as_user(app)
        chal = gen_challenge(app.db)
        gen_flag(app.db, challenge_id=chal.id, content=u'你好')
        # The session transaction is kept from the original test; the session
        # object itself is never read, so it is not bound to a name.
        with client.session_transaction():
            data = {
                "submission": '你好',
                "challenge_id": chal.id,
            }
        # The attempt URL has no placeholder; the challenge id travels in the
        # JSON body, so the former no-op ``.format(chal.id)`` is dropped.
        r = client.post('/api/v1/challenges/attempt', json=data)
        assert r.status_code == 200
        resp = r.get_json()['data']
        assert resp.get('status') == "correct"
        assert resp.get('message') == "Correct"
    destroy_ctfd(app)
def test_challenges_with_max_attempts():
    """Test that users are locked out of a challenge after they reach max_attempts"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        client = login_as_user(app)
        chal = gen_challenge(app.db)
        chal = Challenges.query.filter_by(id=chal.id).first()
        # Capture the id before committing so later code uses a plain value.
        chal_id = chal.id
        chal.max_attempts = 3
        app.db.session.commit()
        gen_flag(app.db, challenge_id=chal_id, content=u'flag')

        # Use up all three allowed attempts with wrong submissions.
        for _ in range(3):
            data = {
                "submission": 'notflag',
                "challenge_id": chal_id,
            }
            # The URL has no placeholder, so no ``.format`` call is needed.
            client.post('/api/v1/challenges/attempt', json=data)

        wrong_keys = Fails.query.count()
        assert wrong_keys == 3

        # Even the correct flag is refused once the attempts are exhausted.
        data = {
            "submission": 'flag',
            "challenge_id": chal_id,
        }
        r = client.post('/api/v1/challenges/attempt', json=data)
        assert r.status_code == 403
        resp = r.get_json()['data']
        assert resp.get('status') == "incorrect"
        assert resp.get('message') == "You have 0 tries remaining"

        solves = Solves.query.count()
        assert solves == 0
    destroy_ctfd(app)
def test_challenge_kpm_limit():
    """Eleven wrong submissions followed by a twelfth attempt triggers the flag rate limit"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        challenge = gen_challenge(app.db)
        challenge_id = challenge.id
        gen_flag(app.db, challenge_id=challenge.id, content=u'flag')

        for _ in range(11):
            with user_client.session_transaction():
                wrong = {
                    "submission": 'notflag',
                    "challenge_id": challenge_id,
                }
            response = user_client.post('/api/v1/challenges/attempt', json=wrong)

        assert Fails.query.count() == 11

        # The twelfth attempt carries the right flag but is rejected anyway.
        right = {
            "submission": 'flag',
            "challenge_id": challenge_id,
        }
        response = user_client.post('/api/v1/challenges/attempt', json=right)
        assert response.status_code == 429

        # The rate-limited attempt is itself recorded as a fail.
        assert Fails.query.count() == 12

        result = response.get_json()['data']
        assert result.get('status') == "ratelimited"
        assert result.get('message') == "You're submitting flags too fast. Slow down."

        assert Solves.query.count() == 0
    destroy_ctfd(app)
def test_that_view_challenges_unregistered_works():
    """Test that anonymous visitors can view, but not attempt, challenges once visibility is public"""
    app = create_ctfd()
    with app.app_context():
        chal = gen_challenge(app.db, name=text_type('🐺'))
        chal_id = chal.id
        gen_hint(app.db, chal_id)

        # Before challenges are made public: a JSON request is refused with
        # 403, while a plain request is redirected (302).
        client = app.test_client()
        r = client.get('/api/v1/challenges', json='')
        assert r.status_code == 403
        r = client.get('/api/v1/challenges')
        assert r.status_code == 302

        set_config('challenge_visibility', 'public')

        # With public visibility an unauthenticated client can read the
        # listing and the solves of the generated challenge.
        client = app.test_client()
        r = client.get('/api/v1/challenges')
        assert r.get_json()['data']

        r = client.get('/api/v1/challenges/{}/solves'.format(chal_id))
        assert r.get_json().get('data') is not None

        # Submitting still requires authentication.
        data = {
            "submission": 'not_flag',
            "challenge_id": chal_id
        }
        # The attempt URL has no placeholder, so no ``.format`` call is needed.
        r = client.post('/api/v1/challenges/attempt', json=data)
        assert r.status_code == 403
        assert r.get_json().get('data').get('status') == "authentication_required"
        assert r.get_json().get('data').get('message') is None
    destroy_ctfd(app)
def test_hidden_challenge_is_unreachable():
    """A hidden challenge is absent from the listing, 404s on detail and attempt, and records no solve or fail"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        challenge = gen_challenge(app.db, state='hidden')
        gen_flag(app.db, challenge_id=challenge.id, content='flag')
        challenge_id = challenge.id

        # The challenge exists in the database...
        assert Challenges.query.count() == 1

        # ...but the listing is empty for a regular user.
        response = user_client.get('/api/v1/challenges', json='')
        listing = response.get_json().get('data')
        assert listing == []

        response = user_client.get('/api/v1/challenges/1', json='')
        assert response.status_code == 404
        detail = response.get_json().get('data')
        assert detail is None

        payload = {
            "submission": 'flag',
            "challenge_id": challenge_id
        }

        response = user_client.post('/api/v1/challenges/attempt', json=payload)
        assert response.status_code == 404

        # The preview variant of the attempt endpoint is refused as well.
        response = user_client.post('/api/v1/challenges/attempt?preview=true', json=payload)
        assert response.status_code == 404
        assert response.get_json().get('data') is None

        assert Solves.query.count() == 0
        assert Fails.query.count() == 0
    destroy_ctfd(app)
def test_hidden_challenge_is_unsolveable():
    """Submitting the right flag to a hidden challenge returns 404 and stores neither a solve nor a fail"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        user_client = login_as_user(app)
        challenge = gen_challenge(app.db, state='hidden')
        gen_flag(app.db, challenge_id=challenge.id, content='flag')

        payload = {
            "submission": 'flag',
            "challenge_id": challenge.id
        }
        response = user_client.post('/api/v1/challenges/attempt', json=payload)
        assert response.status_code == 404

        assert Solves.query.count() == 0
        assert Fails.query.count() == 0
    destroy_ctfd(app)
def test_challenge_with_requirements_is_unsolveable():
    """Test that a challenge with a requirement is unsolveable without first solving the requirement"""
    app = create_ctfd()
    with app.app_context():
        register_user(app)
        client = login_as_user(app)

        chal1 = gen_challenge(app.db)
        gen_flag(app.db, challenge_id=chal1.id, content='flag')

        # Challenge 2 lists Challenge 1 (id 1) as its prerequisite.
        requirements = {
            'prerequisites': [1]
        }
        chal2 = gen_challenge(app.db, requirements=requirements)
        app.db.session.commit()
        gen_flag(app.db, challenge_id=chal2.id, content='flag')

        # Only Challenge 1 is listed while the prerequisite is unsolved.
        r = client.get('/api/v1/challenges')
        challenges = r.get_json()['data']
        assert len(challenges) == 1
        assert challenges[0]['id'] == 1

        r = client.get('/api/v1/challenges/2')
        assert r.status_code == 403
        assert r.get_json().get('data') is None

        # Attempt to solve hidden Challenge 2
        data = {
            "submission": 'flag',
            "challenge_id": 2
        }
        r = client.post('/api/v1/challenges/attempt', json=data)
        assert r.status_code == 403
        assert r.get_json().get('data') is None

        # Solve Challenge 1
        data = {
            "submission": 'flag',
            "challenge_id": 1
        }
        r = client.post('/api/v1/challenges/attempt', json=data)
        resp = r.get_json()['data']
        assert resp['status'] == 'correct'

        # Challenge 2 should now be visible
        r = client.get('/api/v1/challenges')
        challenges = r.get_json()['data']
        assert len(challenges) == 2

        r = client.get('/api/v1/challenges/2')
        assert r.status_code == 200
        assert r.get_json().get('data')['id'] == 2

        # Attempt to solve the now-visible Challenge 2
        data = {
            "submission": 'flag',
            "challenge_id": 2
        }
        r = client.post('/api/v1/challenges/attempt', json=data)
        assert r.status_code == 200
        # Re-read the body of THIS response; the earlier ``resp`` belongs to
        # the Challenge 1 attempt and would make the assertion vacuous.
        resp = r.get_json()['data']
        assert resp['status'] == 'correct'
    destroy_ctfd(app)
def test_challenges_cannot_be_solved_while_paused():
    """With the 'paused' config set, /challenges shows a paused notice and attempts are refused and not recorded"""
    app = create_ctfd()
    with app.app_context():
        set_config('paused', True)

        register_user(app)
        user_client = login_as_user(app)

        page = user_client.get('/challenges')
        assert page.status_code == 200

        # The rendered page mentions that the CTF is paused.
        page_text = page.get_data(as_text=True)
        assert 'paused' in page_text

        challenge = gen_challenge(app.db)
        gen_flag(app.db, challenge_id=challenge.id, content='flag')

        payload = {
            "submission": 'flag',
            "challenge_id": challenge.id
        }
        response = user_client.post('/api/v1/challenges/attempt', json=payload)

        # The JSON body reports the paused state alongside a 403.
        result = response.get_json()['data']
        assert response.status_code == 403
        assert result['status'] == 'paused'
        assert result['message'] == 'CTFd is paused'

        # Neither a solve nor a wrong key is stored for the refused attempt.
        assert Solves.query.count() == 0
        assert Fails.query.count() == 0
    destroy_ctfd(app)
def test_challenges_under_view_after_ctf():
    """After the CTF end time, challenge endpoints return 403 until 'view_after_ctf' is set; attempts are then judged but never recorded"""
    app = create_ctfd()
    # Time is frozen at 2017-10-07, after the configured end timestamp.
    with app.app_context(), freeze_time("2017-10-7"):
        set_config('start', '1507089600')  # Wednesday, October 4, 2017 12:00:00 AM GMT-04:00 DST
        set_config('end', '1507262400')  # Friday, October 6, 2017 12:00:00 AM GMT-04:00 DST

        register_user(app)
        user_client = login_as_user(app)

        gen_challenge(app.db)
        gen_flag(app.db, challenge_id=1, content='flag')

        right_answer = {
            "submission": 'flag',
            "challenge_id": 1
        }
        wrong_answer = {
            "submission": 'notflag',
            "challenge_id": 1
        }

        # --- view_after_ctf not set: everything is forbidden ---
        response = user_client.get('/challenges')
        assert response.status_code == 403

        response = user_client.get('/api/v1/challenges')
        assert response.status_code == 403
        assert response.get_json().get('data') is None

        response = user_client.get('/api/v1/challenges/1')
        assert response.status_code == 403
        assert response.get_json().get('data') is None

        response = user_client.post('/api/v1/challenges/attempt', json=right_answer)
        assert response.status_code == 403
        assert response.get_json().get('data') is None
        assert Solves.query.count() == 0

        response = user_client.post('/api/v1/challenges/attempt', json=wrong_answer)
        assert response.status_code == 403
        assert response.get_json().get('data') is None
        assert Fails.query.count() == 0

        # --- view_after_ctf set: readable, attempts judged but not stored ---
        set_config('view_after_ctf', True)

        response = user_client.get('/challenges')
        assert response.status_code == 200

        response = user_client.get('/api/v1/challenges')
        assert response.status_code == 200
        assert response.get_json()['data'][0]['id'] == 1

        response = user_client.get('/api/v1/challenges/1')
        assert response.status_code == 200
        assert response.get_json()['data']['id'] == 1

        response = user_client.post('/api/v1/challenges/attempt', json=right_answer)
        assert response.status_code == 200
        assert response.get_json()['data']['status'] == "correct"
        assert Solves.query.count() == 0

        response = user_client.post('/api/v1/challenges/attempt', json=wrong_answer)
        assert response.status_code == 200
        assert response.get_json()['data']['status'] == "incorrect"
        assert Fails.query.count() == 0
    destroy_ctfd(app)