Latest set of changes (#190)

* PEP 8 compliance (#183)

* Group imports: standard library, third party, local
* Remove unnecessary spaces
* Comments should start with a # and a single space

* Adding tests for GETs on user facing pages

* Adding more user facing tests

51% test coverage

* Fixes #182

* Cleaning up Pages

Fixes a bug with CSS updating
selenium-screenshot-testing
Kevin Chung 2017-01-10 03:35:48 -05:00 committed by GitHub
parent 397eb95dd7
commit fa788fe3d0
21 changed files with 583 additions and 406 deletions

View File

@ -1,14 +1,12 @@
from flask import Flask, render_template, request, redirect, abort, session, jsonify, json as json_mod, url_for
from flask_sqlalchemy import SQLAlchemy
from logging.handlers import RotatingFileHandler
from flask_session import Session
from sqlalchemy_utils import database_exists, create_database
from jinja2 import FileSystemLoader, TemplateNotFound
from utils import get_config, set_config, cache
import os
import sqlalchemy
from flask import Flask
from jinja2 import FileSystemLoader
from sqlalchemy.engine.url import make_url
from sqlalchemy.exc import OperationalError
from sqlalchemy_utils import database_exists, create_database
from utils import get_config, set_config, cache
class ThemeLoader(FileSystemLoader):
@ -20,7 +18,7 @@ class ThemeLoader(FileSystemLoader):
return super(ThemeLoader, self).get_source(environment, template)
def create_app(config='CTFd.config'):
def create_app(config='CTFd.config.Config'):
app = Flask(__name__)
with app.app_context():
app.config.from_object(config)

View File

@ -1,26 +1,17 @@
from flask import render_template, request, redirect, abort, jsonify, url_for, session, Blueprint
from CTFd.utils import sha512, is_safe_url, authed, admins_only, is_admin, unix_time, unix_time_millis, get_config, \
import hashlib
import json
import os
from flask import current_app as app, render_template, request, redirect, jsonify, url_for, Blueprint
from passlib.hash import bcrypt_sha256
from sqlalchemy.sql import not_
from werkzeug.utils import secure_filename
from CTFd.utils import admins_only, is_admin, unix_time, get_config, \
set_config, sendmail, rmdir, create_image, delete_image, run_image, container_status, container_ports, \
container_stop, container_start, get_themes, cache
from CTFd.models import db, Teams, Solves, Awards, Containers, Challenges, WrongKeys, Keys, Tags, Files, Tracking, Pages, Config, DatabaseError
from CTFd.scoreboard import get_standings
from itsdangerous import TimedSerializer, BadTimeSignature
from sqlalchemy.sql import and_, or_, not_
from sqlalchemy.sql.expression import union_all
from sqlalchemy.sql.functions import coalesce
from werkzeug.utils import secure_filename
from socket import inet_aton, inet_ntoa
from passlib.hash import bcrypt_sha256
from flask import current_app as app
import logging
import hashlib
import time
import re
import os
import json
import datetime
import calendar
admin = Blueprint('admin', __name__)
@ -175,8 +166,10 @@ def admin_css():
if request.method == 'POST':
css = request.form['css']
css = set_config('css', css)
return "1"
return "0"
with app.app_context():
cache.clear()
return '1'
return '0'
@admin.route('/admin/pages', defaults={'route': None}, methods=['GET', 'POST'])
@ -196,7 +189,7 @@ def admin_pages(route):
if not route:
errors.append('Missing URL route')
if errors:
page = Pages(html, "")
page = Pages(html, '')
return render_template('/admin/editor.html', page=page)
if page:
page.route = route
@ -229,7 +222,7 @@ def list_container():
containers = Containers.query.all()
for c in containers:
c.status = container_status(c.name)
c.ports = ", ".join(container_ports(c.name, verbose=True))
c.ports = ', '.join(container_ports(c.name, verbose=True))
return render_template('admin/containers.html', containers=containers)
@ -283,7 +276,6 @@ def new_container():
return redirect(url_for('admin.list_container'))
@admin.route('/admin/chals', methods=['POST', 'GET'])
@admins_only
def admin_chals():
@ -291,16 +283,17 @@ def admin_chals():
chals = Challenges.query.add_columns('id', 'name', 'value', 'description', 'category', 'hidden').order_by(Challenges.value).all()
teams_with_points = db.session.query(Solves.teamid).join(Teams).filter(
Teams.banned == False).group_by(
Solves.teamid).count()
Teams.banned == False).group_by(Solves.teamid).count()
json_data = {'game':[]}
json_data = {'game': []}
for x in chals:
solve_count = Solves.query.join(Teams, Solves.teamid == Teams.id).filter(Solves.chalid == x[1], Teams.banned == False).count()
solve_count = Solves.query.join(Teams, Solves.teamid == Teams.id).filter(
Solves.chalid == x[1], Teams.banned == False).count()
if teams_with_points > 0:
percentage = (float(solve_count) / float(teams_with_points))
else:
percentage = 0.0
json_data['game'].append({
'id': x.id,
'name': x.name,
@ -322,10 +315,10 @@ def admin_chals():
def admin_keys(chalid):
if request.method == 'GET':
chal = Challenges.query.filter_by(id=chalid).first_or_404()
json_data = {'keys':[]}
json_data = {'keys': []}
flags = json.loads(chal.flags)
for i, x in enumerate(flags):
json_data['keys'].append({'id':i, 'key':x['flag'], 'type':x['type']})
json_data['keys'].append({'id': i, 'key': x['flag'], 'type': x['type']})
return jsonify(json_data)
elif request.method == 'POST':
chal = Challenges.query.filter_by(id=chalid).first()
@ -334,7 +327,7 @@ def admin_keys(chalid):
newvals = request.form.getlist('vals[]')
flags = []
for flag, val in zip(newkeys, newvals):
flag_dict = {'flag':flag, 'type':int(val)}
flag_dict = {'flag': flag, 'type': int(val)}
flags.append(flag_dict)
json_data = json.dumps(flags)
@ -350,9 +343,9 @@ def admin_keys(chalid):
def admin_tags(chalid):
if request.method == 'GET':
tags = Tags.query.filter_by(chal=chalid).all()
json_data = {'tags':[]}
json_data = {'tags': []}
for x in tags:
json_data['tags'].append({'id':x.id, 'chal':x.chal, 'tag':x.tag})
json_data['tags'].append({'id': x.id, 'chal': x.chal, 'tag': x.tag})
return jsonify(json_data)
elif request.method == 'POST':
@ -373,7 +366,7 @@ def admin_delete_tags(tagid):
db.session.delete(tag)
db.session.commit()
db.session.close()
return "1"
return '1'
@admin.route('/admin/files/<chalid>', methods=['GET', 'POST'])
@ -381,19 +374,19 @@ def admin_delete_tags(tagid):
def admin_files(chalid):
if request.method == 'GET':
files = Files.query.filter_by(chal=chalid).all()
json_data = {'files':[]}
json_data = {'files': []}
for x in files:
json_data['files'].append({'id':x.id, 'file':x.location})
json_data['files'].append({'id': x.id, 'file': x.location})
return jsonify(json_data)
if request.method == 'POST':
if request.form['method'] == "delete":
f = Files.query.filter_by(id=request.form['file']).first_or_404()
if os.path.exists(os.path.join(app.root_path, 'uploads', f.location)): ## Some kind of os.path.isfile issue on Windows...
if os.path.exists(os.path.join(app.root_path, 'uploads', f.location)): # Some kind of os.path.isfile issue on Windows...
os.unlink(os.path.join(app.root_path, 'uploads', f.location))
db.session.delete(f)
db.session.commit()
db.session.close()
return "1"
return '1'
elif request.form['method'] == "upload":
files = request.files.getlist('files[]')
@ -417,14 +410,14 @@ def admin_files(chalid):
return redirect(url_for('admin.admin_chals'))
@admin.route('/admin/teams', defaults={'page':'1'})
@admin.route('/admin/teams', defaults={'page': '1'})
@admin.route('/admin/teams/<page>')
@admins_only
def admin_teams(page):
page = abs(int(page))
results_per_page = 50
page_start = results_per_page * ( page - 1 )
page_end = results_per_page * ( page - 1 ) + results_per_page
page_start = results_per_page * (page - 1)
page_end = results_per_page * (page - 1) + results_per_page
teams = Teams.query.order_by(Teams.id.asc()).slice(page_start, page_end).all()
count = db.session.query(db.func.count(Teams.id)).first()[0]
@ -440,12 +433,12 @@ def admin_team(teamid):
if request.method == 'GET':
solves = Solves.query.filter_by(teamid=teamid).all()
solve_ids = [s.chalid for s in solves]
missing = Challenges.query.filter( not_(Challenges.id.in_(solve_ids) ) ).all()
missing = Challenges.query.filter(not_(Challenges.id.in_(solve_ids))).all()
last_seen = db.func.max(Tracking.date).label('last_seen')
addrs = db.session.query(Tracking.ip, last_seen) \
.filter_by(team=teamid) \
.group_by(Tracking.ip) \
.order_by(last_seen.desc()).all()
.filter_by(team=teamid) \
.group_by(Tracking.ip) \
.order_by(last_seen.desc()).all()
wrong_keys = WrongKeys.query.filter_by(teamid=teamid).order_by(WrongKeys.date.asc()).all()
awards = Awards.query.filter_by(teamid=teamid).order_by(Awards.date.asc()).all()
score = user.score()
@ -490,7 +483,7 @@ def admin_team(teamid):
if errors:
db.session.close()
return jsonify({'data':errors})
return jsonify({'data': errors})
else:
user.name = name
user.email = email
@ -501,7 +494,7 @@ def admin_team(teamid):
user.country = country
db.session.commit()
db.session.close()
return jsonify({'data':['success']})
return jsonify({'data': ['success']})
@admin.route('/admin/team/<teamid>/mail', methods=['POST'])
@ -511,8 +504,8 @@ def email_user(teamid):
team = Teams.query.filter(Teams.id == teamid).first()
if message and team:
if sendmail(team.email, message):
return "1"
return "0"
return '1'
return '0'
@admin.route('/admin/team/<teamid>/ban', methods=['POST'])
@ -556,16 +549,16 @@ def delete_team(teamid):
def admin_graph(graph_type):
if graph_type == 'categories':
categories = db.session.query(Challenges.category, db.func.count(Challenges.category)).group_by(Challenges.category).all()
json_data = {'categories':[]}
json_data = {'categories': []}
for category, count in categories:
json_data['categories'].append({'category':category, 'count':count})
json_data['categories'].append({'category': category, 'count': count})
return jsonify(json_data)
elif graph_type == "solves":
solves_sub = db.session.query(Solves.chalid, db.func.count(Solves.chalid).label('solves_cnt')) \
.join(Teams, Solves.teamid == Teams.id).filter(Teams.banned == False) \
.group_by(Solves.chalid).subquery()
.join(Teams, Solves.teamid == Teams.id).filter(Teams.banned == False) \
.group_by(Solves.chalid).subquery()
solves = db.session.query(solves_sub.columns.chalid, solves_sub.columns.solves_cnt, Challenges.name) \
.join(Challenges, solves_sub.columns.chalid == Challenges.id).all()
.join(Challenges, solves_sub.columns.chalid == Challenges.id).all()
json_data = {}
for chal, count, name in solves:
json_data[name] = count
@ -587,15 +580,15 @@ def admin_awards(teamid):
awards_list = []
for award in awards:
awards_list.append({
'id':award.id,
'name':award.name,
'description':award.description,
'date':award.date,
'value':award.value,
'category':award.category,
'icon':award.icon
})
json_data = {'awards':awards_list}
'id': award.id,
'name': award.name,
'description': award.description,
'date': award.date,
'value': award.value,
'category': award.category,
'icon': award.icon
})
json_data = {'awards': awards_list}
return jsonify(json_data)
@ -612,10 +605,10 @@ def create_award():
db.session.add(award)
db.session.commit()
db.session.close()
return "1"
return '1'
except Exception as e:
print e
return "0"
print(e)
return '0'
@admin.route('/admin/awards/<award_id>/delete', methods=['POST'])
@ -628,8 +621,8 @@ def delete_award(award_id):
db.session.close()
return '1'
except Exception as e:
print e
return "0"
print(e)
return '0'
@admin.route('/admin/scores')
@ -639,9 +632,9 @@ def admin_scores():
quickest = db.func.max(Solves.date).label('quickest')
teams = db.session.query(Solves.teamid, Teams.name, score).join(Teams).join(Challenges).filter(Teams.banned == False).group_by(Solves.teamid).order_by(score.desc(), quickest)
db.session.close()
json_data = {'teams':[]}
json_data = {'teams': []}
for i, x in enumerate(teams):
json_data['teams'].append({'place':i+1, 'id':x.teamid, 'name':x.name,'score':int(x.score)})
json_data['teams'].append({'place': i + 1, 'id': x.teamid, 'name': x.name, 'score': int(x.score)})
return jsonify(json_data)
@ -654,7 +647,7 @@ def admin_solves(teamid="all"):
solves = Solves.query.filter_by(teamid=teamid).all()
awards = Awards.query.filter_by(teamid=teamid).all()
db.session.close()
json_data = {'solves':[]}
json_data = {'solves': []}
for x in solves:
json_data['solves'].append({
'id': x.id,
@ -674,7 +667,7 @@ def admin_solves(teamid="all"):
'category': award.category,
'time': unix_time(award.date)
})
json_data['solves'].sort(key=lambda k:k['time'])
json_data['solves'].sort(key=lambda k: k['time'])
return jsonify(json_data)
@ -687,6 +680,7 @@ def create_solve(teamid, chalid):
db.session.close()
return '1'
@admin.route('/admin/solves/<keyid>/delete', methods=['POST'])
@admins_only
def delete_solve(keyid):
@ -707,7 +701,6 @@ def delete_wrong_key(keyid):
return '1'
@admin.route('/admin/statistics', methods=['GET'])
@admins_only
def admin_stats():
@ -717,10 +710,10 @@ def admin_stats():
challenge_count = db.session.query(db.func.count(Challenges.id)).first()[0]
solves_sub = db.session.query(Solves.chalid, db.func.count(Solves.chalid).label('solves_cnt')) \
.join(Teams, Solves.teamid == Teams.id).filter(Teams.banned == False) \
.group_by(Solves.chalid).subquery()
.join(Teams, Solves.teamid == Teams.id).filter(Teams.banned == False) \
.group_by(Solves.chalid).subquery()
solves = db.session.query(solves_sub.columns.chalid, solves_sub.columns.solves_cnt, Challenges.name) \
.join(Challenges, solves_sub.columns.chalid == Challenges.id).all()
.join(Challenges, solves_sub.columns.chalid == Challenges.id).all()
solve_data = {}
for chal, count, name in solves:
solve_data[name] = count
@ -736,13 +729,12 @@ def admin_stats():
db.session.close()
return render_template('admin/statistics.html', team_count=teams_registered,
wrong_count=wrong_count,
solve_count=solve_count,
challenge_count=challenge_count,
solve_data=solve_data,
most_solved=most_solved,
least_solved=least_solved
)
wrong_count=wrong_count,
solve_count=solve_count,
challenge_count=challenge_count,
solve_data=solve_data,
most_solved=most_solved,
least_solved=least_solved)
@admin.route('/admin/wrong_keys/<page>', methods=['GET'])
@ -750,12 +742,16 @@ def admin_stats():
def admin_wrong_key(page='1'):
page = abs(int(page))
results_per_page = 50
page_start = results_per_page * ( page - 1 )
page_end = results_per_page * ( page - 1 ) + results_per_page
page_start = results_per_page * (page - 1)
page_end = results_per_page * (page - 1) + results_per_page
wrong_keys = WrongKeys.query.add_columns(WrongKeys.id, WrongKeys.chalid, WrongKeys.flag, WrongKeys.teamid, WrongKeys.date,\
Challenges.name.label('chal_name'), Teams.name.label('team_name')).\
join(Challenges).join(Teams).order_by(WrongKeys.date.desc()).slice(page_start, page_end).all()
wrong_keys = WrongKeys.query.add_columns(WrongKeys.id, WrongKeys.chalid, WrongKeys.flag, WrongKeys.teamid, WrongKeys.date,
Challenges.name.label('chal_name'), Teams.name.label('team_name')) \
.join(Challenges) \
.join(Teams) \
.order_by(WrongKeys.date.desc()) \
.slice(page_start, page_end) \
.all()
wrong_count = db.session.query(db.func.count(WrongKeys.id)).first()[0]
pages = int(wrong_count / results_per_page) + (wrong_count % results_per_page > 0)
@ -771,9 +767,13 @@ def admin_correct_key(page='1'):
page_start = results_per_page * (page - 1)
page_end = results_per_page * (page - 1) + results_per_page
solves = Solves.query.add_columns(Solves.id, Solves.chalid, Solves.teamid, Solves.date, Solves.flag, \
Challenges.name.label('chal_name'), Teams.name.label('team_name')).\
join(Challenges).join(Teams).order_by(Solves.date.desc()).slice(page_start, page_end).all()
solves = Solves.query.add_columns(Solves.id, Solves.chalid, Solves.teamid, Solves.date, Solves.flag,
Challenges.name.label('chal_name'), Teams.name.label('team_name')) \
.join(Challenges) \
.join(Teams) \
.order_by(Solves.date.desc()) \
.slice(page_start, page_end) \
.all()
solve_count = db.session.query(db.func.count(Solves.id)).first()[0]
pages = int(solve_count / results_per_page) + (solve_count % results_per_page > 0)
@ -788,13 +788,13 @@ def admin_fails(teamid='all'):
fails = WrongKeys.query.join(Teams, WrongKeys.teamid == Teams.id).filter(Teams.banned == False).count()
solves = Solves.query.join(Teams, Solves.teamid == Teams.id).filter(Teams.banned == False).count()
db.session.close()
json_data = {'fails':str(fails), 'solves': str(solves)}
json_data = {'fails': str(fails), 'solves': str(solves)}
return jsonify(json_data)
else:
fails = WrongKeys.query.filter_by(teamid=teamid).count()
solves = Solves.query.filter_by(teamid=teamid).count()
db.session.close()
json_data = {'fails':str(fails), 'solves': str(solves)}
json_data = {'fails': str(fails), 'solves': str(solves)}
return jsonify(json_data)
@ -803,8 +803,8 @@ def admin_fails(teamid='all'):
def admin_create_chal():
files = request.files.getlist('files[]')
## TODO: Expand to support multiple flags
flags = [{'flag':request.form['key'], 'type':int(request.form['key_type[0]'])}]
# TODO: Expand to support multiple flags
flags = [{'flag': request.form['key'], 'type':int(request.form['key_type[0]'])}]
# Create challenge
chal = Challenges(request.form['name'], request.form['desc'], request.form['value'], request.form['category'], flags)

View File

@ -1,16 +1,15 @@
from flask import render_template, request, redirect, abort, jsonify, url_for, session, Blueprint
from CTFd.utils import sha512, is_safe_url, authed, can_send_mail, sendmail, can_register, get_config, verify_email
from CTFd.models import db, Teams
import logging
import os
import re
import time
import urllib
from flask import current_app as app, render_template, request, redirect, url_for, session, Blueprint
from itsdangerous import TimedSerializer, BadTimeSignature, Signer, BadSignature
from passlib.hash import bcrypt_sha256
from flask import current_app as app
import logging
import time
import re
import os
import urllib
from CTFd.utils import sha512, is_safe_url, authed, can_send_mail, sendmail, can_register, get_config, verify_email
from CTFd.models import db, Teams
auth = Blueprint('auth', __name__)
@ -20,7 +19,7 @@ auth = Blueprint('auth', __name__)
def confirm_user(data=None):
if not get_config('verify_emails'):
return redirect(url_for('challenges.challenges_view'))
if data and request.method == "GET": ## User is confirming email account
if data and request.method == "GET": # User is confirming email account
try:
s = Signer(app.config['SECRET_KEY'])
email = s.unsign(urllib.unquote_plus(data.decode('base64')))
@ -37,7 +36,7 @@ def confirm_user(data=None):
if authed():
return redirect(url_for('challenges.challenges_view'))
return redirect(url_for('auth.login'))
if not data and request.method == "GET": ## User has been directed to the confirm page because his account is not verified
if not data and request.method == "GET": # User has been directed to the confirm page because his account is not verified
if not authed():
return redirect(url_for('auth.login'))
team = Teams.query.filter_by(id=session['id']).first()
@ -48,7 +47,6 @@ def confirm_user(data=None):
return render_template('confirm.html', team=team)
@auth.route('/reset_password', methods=['POST', 'GET'])
@auth.route('/reset_password/<data>', methods=['POST', 'GET'])
def reset_password(data=None):
@ -76,7 +74,7 @@ def reset_password(data=None):
s = TimedSerializer(app.config['SECRET_KEY'])
token = s.dumps(team.name)
text = """
Did you initiate a password reset?
Did you initiate a password reset?
{0}/{1}
@ -132,15 +130,15 @@ def register():
session['admin'] = team.admin
session['nonce'] = sha512(os.urandom(10))
if can_send_mail() and get_config('verify_emails'): ## Confirming users is enabled and we can send email.
if can_send_mail() and get_config('verify_emails'): # Confirming users is enabled and we can send email.
db.session.close()
logger = logging.getLogger('regs')
logger.warn("[{0}] {1} registered (UNCONFIRMED) with {2}".format(time.strftime("%m/%d/%Y %X"),
request.form['name'].encode('utf-8'),
request.form['email'].encode('utf-8')))
request.form['name'].encode('utf-8'),
request.form['email'].encode('utf-8')))
return redirect(url_for('auth.confirm_user'))
else: ## Don't care about confirming users
if can_send_mail(): ## We want to notify the user that they have registered.
else: # Don't care about confirming users
if can_send_mail(): # We want to notify the user that they have registered.
sendmail(request.form['email'], "You've successfully registered for {}".format(get_config('ctf_name')))
db.session.close()

View File

@ -1,14 +1,13 @@
from flask import current_app as app, render_template, request, redirect, abort, jsonify, json as json_mod, url_for, session, Blueprint
import json
import logging
import re
import time
from flask import render_template, request, redirect, jsonify, url_for, session, Blueprint
from sqlalchemy.sql import or_
from CTFd.utils import ctftime, view_after_ctf, authed, unix_time, get_kpm, user_can_view_challenges, is_admin, get_config, get_ip, is_verified, ctf_started, ctf_ended, ctf_name
from CTFd.models import db, Challenges, Files, Solves, WrongKeys, Keys, Tags, Teams, Awards
from sqlalchemy.sql import and_, or_, not_
import time
import re
import logging
import json
from CTFd.models import db, Challenges, Files, Solves, WrongKeys, Tags, Teams, Awards
challenges = Blueprint('challenges', __name__)
@ -52,11 +51,11 @@ def chals():
if user_can_view_challenges() and (ctf_started() or is_admin()):
chals = Challenges.query.filter(or_(Challenges.hidden != True, Challenges.hidden == None)).add_columns('id', 'name', 'value', 'description', 'category').order_by(Challenges.value).all()
json = {'game':[]}
json = {'game': []}
for x in chals:
tags = [tag.tag for tag in Tags.query.add_columns('tag').filter_by(chal=x[1]).all()]
files = [ str(f.location) for f in Files.query.filter_by(chal=x.id).all() ]
json['game'].append({'id':x[1], 'name':x[2], 'value':x[3], 'description':x[4], 'category':x[5], 'files':files, 'tags':tags})
files = [str(f.location) for f in Files.query.filter_by(chal=x.id).all()]
json['game'].append({'id': x[1], 'name': x[2], 'value': x[3], 'description': x[4], 'category': x[5], 'files': files, 'tags': tags})
db.session.close()
return jsonify(json)
@ -66,12 +65,12 @@ def chals():
@challenges.route('/chals/solves')
def chals_per_solves():
def solves_per_chal():
if not user_can_view_challenges():
return redirect(url_for('auth.login', next=request.path))
solves_sub = db.session.query(Solves.chalid, db.func.count(Solves.chalid).label('solves')).join(Teams, Solves.teamid == Teams.id).filter(Teams.banned == False).group_by(Solves.chalid).subquery()
solves = db.session.query(solves_sub.columns.chalid, solves_sub.columns.solves, Challenges.name) \
.join(Challenges, solves_sub.columns.chalid == Challenges.id).all()
.join(Challenges, solves_sub.columns.chalid == Challenges.id).all()
json = {}
for chal, count, name in solves:
json[chal] = count
@ -79,7 +78,6 @@ def chals_per_solves():
return jsonify(json)
@challenges.route('/solves')
@challenges.route('/solves/<teamid>')
def solves(teamid=None):
@ -88,7 +86,7 @@ def solves(teamid=None):
if teamid is None:
if is_admin():
solves = Solves.query.filter_by(teamid=session['id']).all()
elif authed():
elif user_can_view_challenges():
solves = Solves.query.join(Teams, Solves.teamid == Teams.id).filter(Solves.teamid == session['id'], Teams.banned == False).all()
else:
return redirect(url_for('auth.login', next='solves'))
@ -96,7 +94,7 @@ def solves(teamid=None):
solves = Solves.query.filter_by(teamid=teamid).all()
awards = Awards.query.filter_by(teamid=teamid).all()
db.session.close()
json = {'solves':[]}
json = {'solves': []}
for solve in solves:
json['solves'].append({
'chal': solve.chal.name,
@ -125,11 +123,11 @@ def attempts():
if not user_can_view_challenges():
return redirect(url_for('auth.login', next=request.path))
chals = Challenges.query.add_columns('id').all()
json = {'maxattempts':[]}
json = {'maxattempts': []}
for chal, chalid in chals:
fails = WrongKeys.query.filter_by(teamid=session['id'], chalid=chalid).count()
if fails >= int(get_config("max_tries")) and int(get_config("max_tries")) > 0:
json['maxattempts'].append({'chalid':chalid})
json['maxattempts'].append({'chalid': chalid})
return jsonify(json)
@ -138,7 +136,7 @@ def fails(teamid):
fails = WrongKeys.query.filter_by(teamid=teamid).count()
solves = Solves.query.filter_by(teamid=teamid).count()
db.session.close()
json = {'fails':str(fails), 'solves': str(solves)}
json = {'fails': str(fails), 'solves': str(solves)}
return jsonify(json)
@ -147,9 +145,9 @@ def who_solved(chalid):
if not user_can_view_challenges():
return redirect(url_for('auth.login', next=request.path))
solves = Solves.query.join(Teams, Solves.teamid == Teams.id).filter(Solves.chalid == chalid, Teams.banned == False).order_by(Solves.date.asc())
json = {'teams':[]}
json = {'teams': []}
for solve in solves:
json['teams'].append({'id':solve.team.id, 'name':solve.team.name, 'date':solve.date})
json['teams'].append({'id': solve.team.id, 'name': solve.team.name, 'date': solve.date})
return jsonify(json)
@ -173,7 +171,7 @@ def chal(chalid):
db.session.commit()
db.session.close()
logger.warn("[{0}] {1} submitted {2} with kpm {3} [TOO FAST]".format(*data))
# return "3" # Submitting too fast
# return '3' # Submitting too fast
return jsonify({'status': '3', 'message': "You're submitting keys too fast. Slow down."})
solves = Solves.query.filter_by(teamid=session['id'], chalid=chalid).first()
@ -193,7 +191,7 @@ def chal(chalid):
})
for x in keys:
if x['type'] == 0: #static key
if x['type'] == 0: # static key
print(x['flag'], key.strip().lower())
if x['flag'] and x['flag'].strip().lower() == key.strip().lower():
if ctftime():
@ -202,9 +200,9 @@ def chal(chalid):
db.session.commit()
db.session.close()
logger.info("[{0}] {1} submitted {2} with kpm {3} [CORRECT]".format(*data))
# return "1" # key was correct
return jsonify({'status':'1', 'message':'Correct'})
elif x['type'] == 1: #regex
# return '1' # key was correct
return jsonify({'status': '1', 'message': 'Correct'})
elif x['type'] == 1: # regex
res = re.match(x['flag'], key, re.IGNORECASE)
if res and res.group() == key:
if ctftime():
@ -213,7 +211,7 @@ def chal(chalid):
db.session.commit()
db.session.close()
logger.info("[{0}] {1} submitted {2} with kpm {3} [CORRECT]".format(*data))
# return "1" # key was correct
# return '1' # key was correct
return jsonify({'status': '1', 'message': 'Correct'})
if ctftime():
@ -232,11 +230,10 @@ def chal(chalid):
else:
return jsonify({'status': '0', 'message': 'Incorrect'})
# Challenge already solved
else:
logger.info("{0} submitted {1} with kpm {2} [ALREADY SOLVED]".format(*data))
# return "2" # challenge was already solved
# return '2' # challenge was already solved
return jsonify({'status': '2', 'message': 'You already solved this'})
else:
return "-1"
return '-1'

View File

@ -1,6 +1,7 @@
import os
##### GENERATE SECRET KEY #####
with open('.ctfd_secret_key', 'a+') as secret:
secret.seek(0) # Seek to beginning of file since a+ mode leaves you at the end and w+ deletes the file
key = secret.read()
@ -11,118 +12,125 @@ with open('.ctfd_secret_key', 'a+') as secret:
##### SERVER SETTINGS #####
class Config(object):
'''
SECRET_KEY is the secret value used to creation sessions and sign strings. This should be set to a random string. In the
interest of ease, CTFd will automatically create a secret key file for you. If you wish to add this secret key to
your instance you should hard code this value to a random static value.
'''
SECRET_KEY is the secret value used to creation sessions and sign strings. This should be set to a random string. In the
interest of ease, CTFd will automatically create a secret key file for you. If you wish to add this secret key to
your instance you should hard code this value to a random static value.
You can also remove .ctfd_secret_key from the .gitignore file and commit this file into whatever repository
you are using.
You can also remove .ctfd_secret_key from the .gitignore file and commit this file into whatever repository
you are using.
http://flask.pocoo.org/docs/0.11/quickstart/#sessions
'''
SECRET_KEY = key
http://flask.pocoo.org/docs/0.11/quickstart/#sessions
'''
SECRET_KEY = key
'''
SQLALCHEMY_DATABASE_URI is the URI that specifies the username, password, hostname, port, and database of the server
used to hold the CTFd database.
'''
SQLALCHEMY_DATABASE_URI is the URI that specifies the username, password, hostname, port, and database of the server
used to hold the CTFd database.
http://flask-sqlalchemy.pocoo.org/2.1/config/#configuration-keys
'''
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///ctfd.db'
http://flask-sqlalchemy.pocoo.org/2.1/config/#configuration-keys
'''
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///ctfd.db'
'''
SQLALCHEMY_TRACK_MODIFICATIONS is automatically disabled to suppress warnings and save memory. You should only enable
this if you need it.
'''
SQLALCHEMY_TRACK_MODIFICATIONS = False
'''
SQLALCHEMY_TRACK_MODIFICATIONS is automatically disabled to suppress warnings and save memory. You should only enable
this if you need it.
'''
SQLALCHEMY_TRACK_MODIFICATIONS = False
'''
SESSION_TYPE is a configuration value used for Flask-Session. It is currently unused in CTFd.
http://pythonhosted.org/Flask-Session/#configuration
'''
SESSION_TYPE = "filesystem"
'''
SESSION_TYPE is a configuration value used for Flask-Session. It is currently unused in CTFd.
http://pythonhosted.org/Flask-Session/#configuration
'''
SESSION_TYPE = "filesystem"
'''
SESSION_FILE_DIR is a configuration value used for Flask-Session. It is currently unused in CTFd.
http://pythonhosted.org/Flask-Session/#configuration
'''
SESSION_FILE_DIR = "/tmp/flask_session"
'''
SESSION_FILE_DIR is a configuration value used for Flask-Session. It is currently unused in CTFd.
http://pythonhosted.org/Flask-Session/#configuration
'''
SESSION_FILE_DIR = "/tmp/flask_session"
'''
SESSION_COOKIE_HTTPONLY controls if cookies should be set with the HttpOnly flag.
'''
SESSION_COOKIE_HTTPONLY = True
'''
SESSION_COOKIE_HTTPONLY controls if cookies should be set with the HttpOnly flag.
'''
SESSION_COOKIE_HTTPONLY = True
'''
PERMANENT_SESSION_LIFETIME is the lifetime of a session.
'''
PERMANENT_SESSION_LIFETIME = 604800 # 7 days in seconds
'''
PERMANENT_SESSION_LIFETIME is the lifetime of a session.
'''
PERMANENT_SESSION_LIFETIME = 604800 # 7 days in seconds
'''
HOST specifies the hostname where the CTFd instance will exist. It is currently unused.
'''
HOST = ".ctfd.io"
'''
HOST specifies the hostname where the CTFd instance will exist. It is currently unused.
'''
HOST = ".ctfd.io"
'''
MAILFROM_ADDR is the email address that emails are sent from if not overridden in the configuration panel.
'''
MAILFROM_ADDR = "noreply@ctfd.io"
'''
MAILFROM_ADDR is the email address that emails are sent from if not overridden in the configuration panel.
'''
MAILFROM_ADDR = "noreply@ctfd.io"
'''
UPLOAD_FOLDER is the location where files are uploaded.
The default destination is the CTFd/static/uploads folder. If you need Amazon S3 files
you can use the CTFd S3 plugin: https://github.com/ColdHeat/CTFd-S3-plugin
'''
UPLOAD_FOLDER = os.path.normpath('static/uploads')
'''
UPLOAD_FOLDER is the location where files are uploaded.
The default destination is the CTFd/static/uploads folder. If you need Amazon S3 files
you can use the CTFd S3 plugin: https://github.com/ColdHeat/CTFd-S3-plugin
'''
UPLOAD_FOLDER = os.path.normpath('static/uploads')
'''
TEMPLATES_AUTO_RELOAD specifies whether Flask should check for modifications to templates and
reload them automatically
'''
TEMPLATES_AUTO_RELOAD = True
'''
TEMPLATES_AUTO_RELOAD specifies whether Flask should check for modifications to templates and
reload them automatically
'''
TEMPLATES_AUTO_RELOAD = True
'''
TRUSTED_PROXIES defines a set of regular expressions used for finding a user's IP address if the CTFd instance
is behind a proxy. If you are running a CTF and users are on the same network as you, you may choose to remove
some proxies from the list.
'''
TRUSTED_PROXIES defines a set of regular expressions used for finding a user's IP address if the CTFd instance
is behind a proxy. If you are running a CTF and users are on the same network as you, you may choose to remove
some proxies from the list.
CTFd only uses IP addresses for cursory tracking purposes. It is ill-advised to do anything complicated based
solely on IP addresses.
'''
TRUSTED_PROXIES = [
'^127\.0\.0\.1$',
## Remove the following proxies if you do not trust the local network
## For example if you are running a CTF on your laptop and the teams are all on the same network
'^::1$',
'^fc00:',
'^10\.',
'^172\.(1[6-9]|2[0-9]|3[0-1])\.',
'^192\.168\.'
]
CTFd only uses IP addresses for cursory tracking purposes. It is ill-advised to do anything complicated based
solely on IP addresses.
'''
TRUSTED_PROXIES = [
'^127\.0\.0\.1$',
# Remove the following proxies if you do not trust the local network
# For example if you are running a CTF on your laptop and the teams are all on the same network
'^::1$',
'^fc00:',
'^10\.',
'^172\.(1[6-9]|2[0-9]|3[0-1])\.',
'^192\.168\.'
]
'''
CACHE_TYPE specifies how CTFd should cache configuration values. If CACHE_TYPE is set to 'redis', CTFd will make use
of the REDIS_URL specified in environment variables. You can also choose to hardcode the REDIS_URL here.
'''
CACHE_TYPE specifies how CTFd should cache configuration values. If CACHE_TYPE is set to 'redis', CTFd will make use
of the REDIS_URL specified in environment variables. You can also choose to hardcode the REDIS_URL here.
CACHE_REDIS_URL is the URL to connect to Redis server.
Example: redis://user:password@localhost:6379/2.
CACHE_REDIS_URL is the URL to connect to Redis server.
Example: redis://user:password@localhost:6379/2.
http://pythonhosted.org/Flask-Caching/#configuring-flask-caching
'''
CACHE_TYPE = "simple"
if CACHE_TYPE == 'redis':
CACHE_REDIS_URL = os.environ.get('REDIS_URL')
http://pythonhosted.org/Flask-Caching/#configuring-flask-caching
'''
CACHE_TYPE = "simple"
if CACHE_TYPE == 'redis':
CACHE_REDIS_URL = os.environ.get('REDIS_URL')
class TestingConfig(Config):
PRESERVE_CONTEXT_ON_EXCEPTION = False
TESTING = True
DEBUG = True
SQLALCHEMY_DATABASE_URI = 'sqlite://'

View File

@ -1,14 +1,12 @@
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import DatabaseError
from sqlalchemy.sql import func
from socket import inet_aton, inet_ntoa
from struct import unpack, pack, error as struct_error
from passlib.hash import bcrypt_sha256
import datetime
import hashlib
import json
from socket import inet_aton, inet_ntoa
from struct import unpack, pack, error as struct_error
from flask_sqlalchemy import SQLAlchemy
from passlib.hash import bcrypt_sha256
from sqlalchemy.exc import DatabaseError
def sha512(string):
@ -26,6 +24,7 @@ def long2ip(ip_int):
# Backwards compatibility with old CTFd databases
return inet_ntoa(pack('!I', ip_int))
db = SQLAlchemy()
@ -159,7 +158,7 @@ class Teams(db.Model):
def score(self):
score = db.func.sum(Challenges.value).label('score')
team = db.session.query(Solves.teamid, score).join(Teams).join(Challenges).filter(Teams.banned == False, Teams.id==self.id).group_by(Solves.teamid).first()
team = db.session.query(Solves.teamid, score).join(Teams).join(Challenges).filter(Teams.banned == False, Teams.id == self.id).group_by(Solves.teamid).first()
award_score = db.func.sum(Awards.value).label('award_score')
award = db.session.query(award_score).filter_by(teamid=self.id).first()
if team:
@ -171,7 +170,7 @@ class Teams(db.Model):
score = db.func.sum(Challenges.value).label('score')
quickest = db.func.max(Solves.date).label('quickest')
teams = db.session.query(Solves.teamid).join(Teams).join(Challenges).filter(Teams.banned == False).group_by(Solves.teamid).order_by(score.desc(), quickest).all()
#http://codegolf.stackexchange.com/a/4712
# http://codegolf.stackexchange.com/a/4712
try:
i = teams.index((self.id,)) + 1
k = i % 10

View File

@ -1,8 +1,6 @@
from CTFd.models import db, WrongKeys, Pages, Config, Tracking, Teams, Containers, ip2long, long2ip
from flask import current_app as app, g, request, redirect, url_for, session, render_template, abort
import os
import importlib
import glob
import importlib
import os
def init_plugins(app):
@ -12,4 +10,4 @@ def init_plugins(app):
module = '.' + os.path.basename(module)
module = importlib.import_module(module, package='CTFd.plugins')
module.load(app)
print " * Loaded module,", module
print(" * Loaded module, %s" % module)

View File

@ -1,10 +1,12 @@
from flask import current_app as app, session, render_template, jsonify, Blueprint, redirect, url_for, request
from CTFd.utils import unix_time, authed, get_config
from CTFd.models import db, Teams, Solves, Awards, Challenges
from flask import render_template, jsonify, Blueprint, redirect, url_for, request
from sqlalchemy.sql.expression import union_all
from CTFd.utils import unix_time, authed, get_config
from CTFd.models import db, Teams, Solves, Awards, Challenges
scoreboard = Blueprint('scoreboard', __name__)
def get_standings(admin=False, count=None):
score = db.func.sum(Challenges.value).label('score')
date = db.func.max(Solves.date).label('date')
@ -16,13 +18,13 @@ def get_standings(admin=False, count=None):
.group_by(results.columns.teamid).subquery()
if admin:
standings_query = db.session.query(Teams.id.label('teamid'), Teams.name.label('name'), Teams.banned, sumscores.columns.score) \
.join(sumscores, Teams.id == sumscores.columns.teamid) \
.order_by(sumscores.columns.score.desc(), sumscores.columns.date)
.join(sumscores, Teams.id == sumscores.columns.teamid) \
.order_by(sumscores.columns.score.desc(), sumscores.columns.date)
else:
standings_query = db.session.query(Teams.id.label('teamid'), Teams.name.label('name'), sumscores.columns.score) \
.join(sumscores, Teams.id == sumscores.columns.teamid) \
.filter(Teams.banned == False) \
.order_by(sumscores.columns.score.desc(), sumscores.columns.date)
.join(sumscores, Teams.id == sumscores.columns.teamid) \
.filter(Teams.banned == False) \
.order_by(sumscores.columns.score.desc(), sumscores.columns.date)
if count is None:
standings = standings_query.all()
else:
@ -44,9 +46,9 @@ def scores():
if get_config('view_scoreboard_if_authed') and not authed():
return redirect(url_for('auth.login', next=request.path))
standings = get_standings()
json = {'standings':[]}
json = {'standings': []}
for i, x in enumerate(standings):
json['standings'].append({'pos':i+1, 'id':x.teamid, 'team':x.name,'score':int(x.score)})
json['standings'].append({'pos': i + 1, 'id': x.teamid, 'team': x.name, 'score': int(x.score)})
return jsonify(json)
@ -56,19 +58,18 @@ def topteams(count):
return redirect(url_for('auth.login', next=request.path))
try:
count = int(count)
except:
except ValueError:
count = 10
if count > 20 or count < 0:
count = 10
json = {'scores':{}}
json = {'scores': {}}
standings = get_standings(count=count)
for team in standings:
solves = Solves.query.filter_by(teamid=team.teamid).all()
awards = Awards.query.filter_by(teamid=team.teamid).all()
json['scores'][team.name] = []
scores = []
for x in solves:
json['scores'][team.name].append({
'chal': x.chalid,

View File

@ -36,7 +36,14 @@
<div class="navbar-collapse collapse" aria-expanded="false" style="height: 0px">
<ul class="nav navbar-nav navbar-nav-right">
<li><a href="{{ request.script_root }}/admin/graphs">Graphs</a></li>
<li><a href="{{ request.script_root }}/admin/pages">Pages</a></li>
<li>
<a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true"
aria-expanded="false">Pages <span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="{{ request.script_root }}/admin/pages">All Pages</a></li>
<li><a href="{{ request.script_root }}/admin/pages?mode=create">New Page</a></li>
</ul>
</li>
<li><a href="{{ request.script_root }}/admin/teams">Teams</a></li>
<li><a href="{{ request.script_root }}/admin/scoreboard">Scoreboard</a></li>
{% if can_create_container() %}

View File

@ -23,14 +23,23 @@
<form id="page-edit" method="POST">
<div class="row-fluid">
<div class="col-md-12">
<strong>Route: </strong><input name='nonce' type='hidden' value="{{ nonce }}">
<input class="radius" id="route" type="text" name="route" value="{% if page is defined %}{{ page.route }}{% endif %}" placeholder="Route">
<h3>Route: </h3>
<input name='nonce' type='hidden' value="{{ nonce }}">
<input class="form-control radius" id="route" type="text" name="route" value="{% if page is defined %}{{ page.route }}{% endif %}" placeholder="Route">
</div>
</div>
<br>
<div class="row-fluid">
<div class="col-md-12">
<h3>Content: </h3>
<textarea id="admin-pages-editor" name="html">{% if page is defined %}{{ page.html }}{% endif %}</textarea><br>
<button class="btn btn-theme btn-outlined create-challenge pull-right">Create</button>
<button class="btn btn-theme btn-outlined create-challenge pull-right">
{% if page is defined %}
Update
{% else %}
Create
{% endif %}
</button>
</div>
</div>
</form>

View File

@ -2,6 +2,12 @@
{% block stylesheets %}
<link rel="stylesheet" type="text/css" href="{{ request.script_root }}/static/admin/css/vendor/codemirror.min.css">
<style>
.CodeMirror {
padding-left: 15px;
margin-bottom: 10px;
}
</style>
{% endblock %}
{% block content %}
@ -32,29 +38,41 @@
</div>
</div>
<div class="col-md-9">
<h3>CSS editor <a onclick="save_css()"><i class="fa fa-floppy-o"></i></a></h3>
<textarea id="pages-editor" name="html">{{ css }}</textarea>
</div>
<div class="col-md-3">
<h3>HTML Pages <a href="{{ request.script_root }}/admin/pages?mode=create"><i class="fa fa-plus"></i></a></h3>
<div class="col-md-6">
<h3>Pages</h3>
<table id="pages" class="table table-striped">
<thead>
<tr>
<td><b>Route</b></td>
<td class="text-center" style="width: 150px;"><b>Settings</b></td>
</tr>
<tr>
<td><b>Route</b></td>
<td class="text-center" style="width: 150px;"><b>Settings</b></td>
</tr>
</thead>
<tbody>
{% for route in routes %}
<tr name="{{ route.route }}">
<td class="route-name"><a href="{{ request.script_root }}/admin/pages/{{ route.route }}">{{ route.route }}</a></td>
<td class="route-name"><a
href="{{ request.script_root }}/admin/pages/{{ route.route }}">{{ route.route }}</a></td>
<td class="text-center"><i class="fa fa-times"></i></td>
</tr>
{% endfor %}
</tbody>
</table>
<a href="{{ request.script_root }}/admin/pages?mode=create">
<button type="button" id="submit" class="btn btn-md btn-primary btn-theme btn-outlined">
Add Page
</button>
</a>
</div>
<div class="col-md-6">
<h3>CSS editor</h3>
<form method="POST">
<textarea id="pages-editor" name="css" style="padding-left:20px;">{{ css }}</textarea>
<button onclick="save_css()" type="button" id="submit" class="btn btn-md btn-primary btn-theme btn-outlined">
Update CSS
</button>
</form>
</div>
</div>
{% endblock %}

View File

@ -1,33 +1,29 @@
from CTFd.models import db, WrongKeys, Pages, Config, Tracking, Teams, Containers, ip2long, long2ip
from six.moves.urllib.parse import urlparse, urljoin
import six
from werkzeug.utils import secure_filename
from functools import wraps
from flask import current_app as app, g, request, redirect, url_for, session, render_template, abort
from flask_caching import Cache
from itsdangerous import Signer, BadSignature
from socket import inet_aton, inet_ntoa, socket
from struct import unpack, pack, error
from sqlalchemy.engine.url import make_url
from sqlalchemy import create_engine
import time
import datetime
import hashlib
import shutil
import requests
import logging
import os
import sys
import re
import time
import smtplib
import email
import tempfile
import subprocess
import urllib
import functools
import hashlib
import json
import logging
import logging.handlers
import os
import re
import requests
import shutil
import smtplib
import socket
import subprocess
import sys
import tempfile
import time
import urllib
from flask import current_app as app, request, redirect, url_for, session, render_template, abort
from flask_caching import Cache
from itsdangerous import Signer
import six
from six.moves.urllib.parse import urlparse, urljoin
from CTFd.models import db, WrongKeys, Pages, Config, Tracking, Teams, Containers, ip2long, long2ip
cache = Cache()
@ -150,7 +146,7 @@ def ctf_theme():
def pages():
pages = Pages.query.filter(Pages.route!="index").all()
pages = Pages.query.filter(Pages.route != "index").all()
return pages
@ -191,7 +187,7 @@ def can_register():
def admins_only(f):
@wraps(f)
@functools.wraps(f)
def decorated_function(*args, **kwargs):
if session.get('admin'):
return f(*args, **kwargs)
@ -368,7 +364,7 @@ def sendmail(addr, text):
if mailgun():
mg_api_key = get_config('mg_api_key') or app.config.get('MAILGUN_API_KEY')
mg_base_url = get_config('mg_base_url') or app.config.get('MAILGUN_BASE_URL')
r = requests.post(
mg_base_url + '/messages',
auth=("api", mg_api_key),
@ -438,14 +434,14 @@ def sha512(string):
@cache.memoize()
def can_create_container():
try:
output = subprocess.check_output(['docker', 'version'])
subprocess.check_output(['docker', 'version'])
return True
except (subprocess.CalledProcessError, OSError):
return False
def is_port_free(port):
s = socket()
s = socket.socket()
result = s.connect_ex(('127.0.0.1', port))
if result == 0:
s.close()
@ -469,7 +465,7 @@ def create_image(name, buildfile, files):
# docker build -f tmpfile.name -t name
try:
cmd = ['docker', 'build', '-f', tmpfile.name, '-t', name, folder]
print cmd
print(cmd)
subprocess.call(cmd)
container = Containers(name, buildfile)
db.session.add(container)
@ -510,7 +506,7 @@ def run_image(name):
cmd.append('-p')
ports_used.append('{}'.format(port))
cmd += ['--name', name, name]
print cmd
print(cmd)
subprocess.call(cmd)
return True
except subprocess.CalledProcessError:

View File

@ -1,19 +1,13 @@
from flask import current_app as app, render_template, render_template_string, request, redirect, abort, jsonify, json as json_mod, url_for, session, Blueprint, Response, send_file
from CTFd.utils import authed, ip2long, long2ip, is_setup, validate_url, get_config, set_config, sha512, get_ip, cache, ctftime, view_after_ctf, ctf_started, \
is_admin
from CTFd.models import db, Teams, Solves, Awards, Challenges, WrongKeys, Keys, Tags, Files, Tracking, Pages, Config
from jinja2.exceptions import TemplateNotFound
from passlib.hash import bcrypt_sha256
from collections import OrderedDict
import logging
import os
import re
import sys
import json
import os
import datetime
from flask import current_app as app, render_template, request, redirect, abort, jsonify, url_for, session, Blueprint, Response, send_file
from jinja2.exceptions import TemplateNotFound
from passlib.hash import bcrypt_sha256
from CTFd.utils import authed, is_setup, validate_url, get_config, set_config, sha512, cache, ctftime, view_after_ctf, ctf_started, \
is_admin
from CTFd.models import db, Teams, Solves, Awards, Files, Pages
views = Blueprint('views', __name__)
@ -38,10 +32,10 @@ def setup():
ctf_name = request.form['ctf_name']
ctf_name = set_config('ctf_name', ctf_name)
## CSS
# CSS
css = set_config('start', '')
## Admin user
# Admin user
name = request.form['name']
email = request.form['email']
password = request.form['password']
@ -49,7 +43,7 @@ def setup():
admin.admin = True
admin.banned = True
## Index page
# Index page
page = Pages('index', """<div class="container main-container">
<img class="logo" src="{0}/static/original/img/logo.png" />
<h3 class="text-center">
@ -61,20 +55,20 @@ def setup():
</h4>
</div>""".format(request.script_root))
#max attempts per challenge
max_tries = set_config("max_tries",0)
# max attempts per challenge
max_tries = set_config("max_tries", 0)
## Start time
# Start time
start = set_config('start', None)
end = set_config('end', None)
## Challenges cannot be viewed by unregistered users
# Challenges cannot be viewed by unregistered users
view_challenges_unregistered = set_config('view_challenges_unregistered', None)
## Allow/Disallow registration
# Allow/Disallow registration
prevent_registration = set_config('prevent_registration', None)
## Verify emails
# Verify emails
verify_emails = set_config('verify_emails', None)
mail_server = set_config('mail_server', None)
@ -118,13 +112,13 @@ def static_html(template):
abort(404)
@views.route('/teams', defaults={'page':'1'})
@views.route('/teams', defaults={'page': '1'})
@views.route('/teams/<page>')
def teams(page):
page = abs(int(page))
results_per_page = 50
page_start = results_per_page * ( page - 1 )
page_end = results_per_page * ( page - 1 ) + results_per_page
page_start = results_per_page * (page - 1)
page_end = results_per_page * (page - 1) + results_per_page
if get_config('verify_emails'):
count = Teams.query.filter_by(verified=True, banned=False).count()
@ -150,9 +144,9 @@ def team(teamid):
if request.method == 'GET':
return render_template('team.html', solves=solves, awards=awards, team=user, score=score, place=place)
elif request.method == 'POST':
json = {'solves':[]}
json = {'solves': []}
for x in solves:
json['solves'].append({'id':x.id, 'chal':x.chalid, 'team':x.teamid})
json['solves'].append({'id': x.id, 'chal': x.chalid, 'team': x.teamid})
return jsonify(json)
@ -182,7 +176,7 @@ def profile():
errors.append("Your old password doesn't match what we have.")
if not valid_email:
errors.append("That email doesn't look right")
if not get_config('prevent_name_change') and names and name!=session['username']:
if not get_config('prevent_name_change') and names and name != session['username']:
errors.append('That team name is already taken')
if emails and emails.id != session['id']:
errors.append('That email has already been used')
@ -238,4 +232,4 @@ def file_handler(path):
pass
else:
abort(403)
return send_file(os.path.join(app.root_path, 'uploads', f.location))
return send_file(os.path.join(app.root_path, 'uploads', f.location))

View File

@ -1,15 +1,12 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from CTFd.models import Teams, Solves, Challenges, WrongKeys, Keys, Tags, Files, Tracking
from CTFd import create_app
from random import randint
import datetime
import random
import hashlib
import os
import sys
import random
from CTFd import create_app
from CTFd.models import Teams, Solves, Challenges, WrongKeys, Keys, Files
app = create_app()
@ -211,14 +208,14 @@ def gen_file():
def random_date(start, end):
return start + datetime.timedelta(
seconds=randint(0, int((end - start).total_seconds())))
seconds=random.randint(0, int((end - start).total_seconds())))
if __name__ == '__main__':
with app.app_context():
db = app.db
### Generating Challenges
# Generating Challenges
print("GENERATING CHALLENGES")
for x in range(CHAL_AMOUNT):
word = gen_word()
@ -228,7 +225,7 @@ if __name__ == '__main__':
db.session.add(Keys(x + 1, word, 0))
db.session.commit()
### Generating Files
# Generating Files
print("GENERATING FILES")
AMT_CHALS_WITH_FILES = int(CHAL_AMOUNT * (3.0 / 4.0))
for x in range(AMT_CHALS_WITH_FILES):
@ -236,9 +233,10 @@ if __name__ == '__main__':
filename = gen_file()
md5hash = hashlib.md5(filename).hexdigest()
db.session.add(Files(chal, md5hash + '/' + filename))
db.session.commit()
### Generating Users
# Generating Users
print("GENERATING USERS")
used = []
count = 0
@ -250,9 +248,10 @@ if __name__ == '__main__':
team.verified = True
db.session.add(team)
count += 1
db.session.commit()
### Generating Solves
# Generating Solves
print("GENERATING SOLVES")
for x in range(USER_AMOUNT):
used = []
@ -268,9 +267,10 @@ if __name__ == '__main__':
base_time = new_base
db.session.add(solve)
db.session.commit()
### Generating Wrong Keys
# Generating Wrong Keys
print("GENERATING WRONG KEYS")
for x in range(USER_AMOUNT):
used = []
@ -286,5 +286,6 @@ if __name__ == '__main__':
base_time = new_base
db.session.add(wrong)
db.session.commit()
db.session.close()

View File

@ -1,3 +1,4 @@
from CTFd import create_app
app = create_app()
app.run(debug=True, threaded=True, host="0.0.0.0", port=4000)

View File

@ -2,11 +2,9 @@ from CTFd import create_app
from sqlalchemy_utils import database_exists, create_database, drop_database
from sqlalchemy.engine.url import make_url
def create_ctfd(ctf_name="CTFd", name="admin", email="admin@ctfd.io", password="password"):
app = create_app()
app.config['PRESERVE_CONTEXT_ON_EXCEPTION'] = False
app.config['TESTING'] = True
app.config['DEBUG'] = True
app = create_app('CTFd.config.TestingConfig')
url = make_url(app.config['SQLALCHEMY_DATABASE_URI'])
if url.drivername == 'postgres':
@ -15,7 +13,8 @@ def create_ctfd(ctf_name="CTFd", name="admin", email="admin@ctfd.io", password="
if database_exists(url):
drop_database(url)
create_database(url)
app.db.create_all()
with app.app_context():
app.db.create_all()
with app.app_context():
with app.test_client() as client:

View File

View File

@ -1,42 +0,0 @@
from helpers import create_ctfd, register_user, login_as_user
from CTFd.models import Teams
def test_index():
"""Does the index page return a 200 by default"""
app = create_ctfd()
with app.app_context():
with app.test_client() as client:
r = client.get('/')
assert r.status_code == 200
def test_register_user():
"""Tests whether a user can be registered"""
app = create_ctfd()
with app.app_context():
register_user(app)
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
assert team_count == 2 # There's the admin user and the created user
def test_user_login():
"""Tests to see if a registered user can login"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/profile')
assert r.location != "http://localhost/login" # We didn't get redirected to login
assert r.status_code == 200
def test_user_isnt_admin():
"""Tests to see if a registered user cannot access admin pages"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/admin/graphs')
assert r.location == "http://localhost/login"
assert r.status_code == 302

194
tests/test_user_facing.py Normal file
View File

@ -0,0 +1,194 @@
from helpers import create_ctfd, register_user, login_as_user
from CTFd.models import Teams
def test_index():
"""Does the index page return a 200 by default"""
app = create_ctfd()
with app.app_context():
with app.test_client() as client:
r = client.get('/')
assert r.status_code == 200
def test_register_user():
"""Can a user can be registered"""
app = create_ctfd()
with app.app_context():
register_user(app)
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
assert team_count == 2 # There's the admin user and the created user
def test_register_duplicate_teamname():
"""A user shouldn't be able to use and already registered team name"""
app = create_ctfd()
with app.app_context():
register_user(app, name="user1", email="user1@ctfd.io", password="password")
register_user(app, name="user1", email="user2@ctfd.io", password="password")
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
assert team_count == 2 # There's the admin user and the first created user
def test_register_duplicate_email():
"""A user shouldn't be able to use an already registered email address"""
app = create_ctfd()
with app.app_context():
register_user(app, name="user1", email="user1@ctfd.io", password="password")
register_user(app, name="user2", email="user1@ctfd.io", password="password")
team_count = app.db.session.query(app.db.func.count(Teams.id)).first()[0]
assert team_count == 2 # There's the admin user and the first created user
def test_user_bad_login():
"""A user should not be able to login with an incorrect password"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app, name="user", password="wrong_password")
r = client.get('/profile')
assert r.location.startswith("http://localhost/login") # We got redirected to login
def test_user_login():
"""Can a registered user can login"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/profile')
assert r.location != "http://localhost/login" # We didn't get redirected to login
assert r.status_code == 200
def test_user_isnt_admin():
"""A registered user cannot access admin pages"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/admin/graphs')
assert r.location == "http://localhost/login"
assert r.status_code == 302
def test_user_get_teams():
"""Can a registered user can load /teams"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/teams')
assert r.status_code == 200
def test_user_get_scoreboard():
"""Can a registered user can load /scoreboard"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/scoreboard')
assert r.status_code == 200
def test_user_get_scores():
"""Can a registered user can load /scores"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/scores')
assert r.status_code == 200
def test_user_get_topteams():
"""Can a registered user can load /top/10"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/top/10')
assert r.status_code == 200
def test_user_get_challenges():
"""Can a registered user can load /challenges"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/challenges')
assert r.status_code == 200
def test_user_get_chals():
"""Can a registered user can load /chals"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/chals')
assert r.status_code == 200
def test_user_get_solves_per_chal():
"""Can a registered user can load /chals/solves"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/chals/solves')
assert r.status_code == 200
def test_user_get_solves():
"""Can a registered user can load /solves"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/solves')
assert r.status_code == 200
def test_user_get_team_page():
"""Can a registered user can load their public profile (/team/2)"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/team/2')
assert r.status_code == 200
def test_user_get_profile():
"""Can a registered user can load their private profile (/profile)"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
r = client.get('/profile')
assert r.status_code == 200
def test_user_get_logout():
"""Can a registered user can load /logout"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = login_as_user(app)
client.get('/logout', follow_redirects=True)
r = client.get('/challenges')
assert r.location == "http://localhost/login?next=challenges"
assert r.status_code == 302
def test_user_get_reset_password():
"""Can an unregistered user can load /reset_password"""
app = create_ctfd()
with app.app_context():
register_user(app)
client = app.test_client()
r = client.get('/reset_password')
assert r.status_code == 200

0
tests/test_utils.py Normal file
View File

View File

@ -1,2 +1,3 @@
from CTFd import create_app
app = create_app()