ColdCore/app.py

534 lines
19 KiB
Python
Raw Normal View History

2016-03-14 03:17:21 +00:00
from flask import Flask, render_template, session, redirect, url_for, request, g, flash, jsonify
2015-11-08 06:10:26 +00:00
app = Flask(__name__)
from database import User, Team, TeamAccess, Challenge, ChallengeSolve, ChallengeFailure, ScoreAdjustment, TroubleTicket, TicketComment, Notification, db
2015-11-08 06:10:26 +00:00
from datetime import datetime
from peewee import fn
2015-11-10 16:51:45 +00:00
2016-07-12 14:24:19 +00:00
from utils import decorators, flag, cache, misc, captcha, email, select
2015-11-10 16:51:45 +00:00
import utils.scoreboard
2015-11-08 06:10:26 +00:00
import config
import utils
import redis
2015-11-08 19:04:02 +00:00
import requests
2016-03-14 03:15:15 +00:00
import socket
2015-11-08 06:10:26 +00:00
2015-11-29 02:40:11 +00:00
app.secret_key = config.secret.key
2015-11-08 06:10:26 +00:00
import logging
logging.basicConfig(level=logging.DEBUG)
@app.before_request
def make_info_available():
if "user_id" in session:
2016-07-11 23:57:43 +00:00
try:
g.user = User.get(User.id == session["user_id"])
g.user_restricts = g.user.restricts.split(",")
2016-07-12 12:03:04 +00:00
g.team = g.user.team
g.team_restricts = g.team.restricts.split(",")
2016-07-11 23:57:43 +00:00
except User.DoesNotExist:
2016-07-12 14:24:19 +00:00
session.pop("user_id")
2016-07-11 23:57:43 +00:00
return render_template("login.html")
2015-11-08 06:10:26 +00:00
@app.context_processor
def scoreboard_variables():
2016-07-12 14:24:19 +00:00
var = dict(config=config, select=select)
if "user_id" in session:
2015-11-08 06:10:26 +00:00
var["logged_in"] = True
var["user"] = g.user
2016-07-12 12:03:04 +00:00
var["team"] = g.team
2016-07-11 23:57:43 +00:00
# TODO should this apply to users or teams?
# var["notifications"] = Notification.select().where(Notification.user == g.user)
var["notifications"] = []
2015-11-08 06:10:26 +00:00
else:
var["logged_in"] = False
var["notifications"] = []
2015-11-08 06:10:26 +00:00
return var
2015-11-12 03:41:04 +00:00
# Blueprints
from modules import api, admin
2015-11-12 03:41:04 +00:00
app.register_blueprint(api.api)
app.register_blueprint(admin.admin)
2015-11-12 03:41:04 +00:00
2015-11-08 06:10:26 +00:00
# Publically accessible things
@app.route('/')
def root():
return redirect(url_for('register'))
2015-11-08 06:10:26 +00:00
2016-04-17 17:18:44 +00:00
@app.route('/chat/')
def chat():
return render_template("chat.html")
2015-11-08 06:10:26 +00:00
@app.route('/scoreboard/')
def scoreboard():
2015-11-10 16:51:45 +00:00
data = cache.get_complex("scoreboard")
graphdata = cache.get_complex("graph")
2016-05-27 03:16:08 +00:00
if data is None or graphdata is None:
2016-05-27 01:58:31 +00:00
if config.immediate_scoreboard:
data = utils.scoreboard.calculate_scores()
graphdata = utils.scoreboard.calculate_graph(data)
utils.scoreboard.set_complex("scoreboard", data, 120)
utils.scoreboard.set_complex("graph", graphdata, 120)
else:
return "No scoreboard data available. Please contact an organizer."
2015-11-10 04:41:07 +00:00
2015-11-12 16:40:55 +00:00
return render_template("scoreboard.html", data=data, graphdata=graphdata)
2015-11-08 06:10:26 +00:00
@app.route('/login/', methods=["GET", "POST"])
def login():
if request.method == "GET":
return render_template("login.html")
elif request.method == "POST":
username = request.form["username"]
password = request.form["password"]
2015-11-08 06:10:26 +00:00
try:
user = User.get(User.username == username)
if(user.checkPassword(password)):
session["user_id"] = user.id
flash("Login successful.")
2016-07-12 12:03:04 +00:00
return redirect(url_for('team_dashboard'))
else:
flash("Incorrect username or password", "error")
return render_template("login.html")
except User.DoesNotExist:
flash("Incorrect username or password", "error")
2015-11-08 06:10:26 +00:00
return render_template("login.html")
@app.route('/register/', methods=["GET", "POST"])
def register():
2016-04-17 16:33:42 +00:00
if not config.registration:
2016-05-28 04:25:50 +00:00
if "admin" in session and session["admin"]:
pass
else:
2016-07-09 21:02:29 +00:00
return "Registration is currently disabled. Email icectf@icec.tf to create an account."
2016-04-17 16:33:42 +00:00
2015-11-08 06:10:26 +00:00
if request.method == "GET":
return render_template("register.html")
elif request.method == "POST":
error, message = captcha.verify_captcha()
if error:
flash(message)
return render_template("register.html")
2015-11-08 19:04:02 +00:00
username = request.form["username"].strip()
user_email = request.form["email"].strip()
password = request.form["password"].strip()
confirm_password = request.form["confirm_password"].strip()
2016-07-12 12:03:04 +00:00
background = request.form["background"].strip()
country = request.form["country"].strip()
2016-07-12 12:03:04 +00:00
tshirt_size = ""
gender = ""
if "tshirt_size" in request.form.keys():
tshirt_size = request.form["tshirt_size"].strip()
if "gender" in request.form.keys():
gender = request.form["gender"].strip()
2016-07-12 12:03:04 +00:00
join_team = bool(int(request.form["join_team"].strip()))
if join_team:
team_key = request.form["team_key"].strip()
else:
team_name = request.form["team_name"].strip()
team_affiliation = request.form["team_affiliation"].strip()
if len(username) > 50 or not username:
flash("You must have a username!")
return render_template("register.html")
2016-07-12 12:03:04 +00:00
try:
user = User.get(User.username == username)
flash("This username is already in use!")
return render_template("register.html")
except User.DoesNotExist:
pass
2016-07-12 14:24:19 +00:00
if password != confirm_password:
flash("Password does not match confirmation")
return render_template("register.html")
if not (user_email and "." in user_email and "@" in user_email):
flash("You must have a valid email!")
return render_template("register.html")
if not email.is_valid_email(user_email):
2016-05-22 20:16:52 +00:00
flash("You're lying")
return render_template("register.html")
if (not tshirt_size == "") and (not tshirt_size in select.TShirts):
2016-07-12 14:24:19 +00:00
flash("Invalid T-shirt size")
return render_template("register.html")
if not background in select.BackgroundKeys:
flash("Invalid Background")
return render_template("register.html")
if not country in select.CountryKeys:
flash("Invalid Background")
return render_template("register.html")
2016-07-12 19:17:24 +00:00
if (not gender == "") and (not gender in ["M", "F"]):
flash("Invalid gender")
return render_template("register.html")
2015-11-10 16:51:45 +00:00
confirmation_key = misc.generate_confirmation_key()
2015-11-11 12:41:01 +00:00
2016-07-12 12:03:04 +00:00
team=None
if join_team:
try:
team = Team.get(Team.key == team_key)
except Team.DoesNotExist:
flash("Couldn't find this team, check your team key.")
return rener_template("register.html")
else:
if not team_affiliation or len(team_affiliation) > 100:
team_affiliation = "No affiliation"
try:
team = Team.get(Team.name == team_name)
flash("This team name is already in use!")
return render_template("register.html")
except Team.DoesNotExist:
pass
team_key = misc.generate_team_key()
team = Team.create(name=team_name, affiliation=team_affiliation, key=team_key)
user = User.create(username=username, email=user_email,
2016-07-12 12:03:04 +00:00
background=background, country=country,
tshirt_size=tshirt_size, gender=gender,
email_confirmation_key=confirmation_key,
team=team)
user.setPassword(password)
2016-07-12 14:24:19 +00:00
user.save()
2015-11-09 02:44:04 +00:00
# print(confirmation_key)
2016-07-12 12:03:04 +00:00
email.send_confirmation_email(user_email, confirmation_key)
2015-11-09 02:44:04 +00:00
session["user_id"] = user.id
flash("Registration finished")
2016-07-12 14:24:19 +00:00
return redirect(url_for('user_dashboard'))
2015-11-08 06:10:26 +00:00
@app.route('/logout/')
def logout():
session.pop("user_id")
2015-11-08 08:19:44 +00:00
flash("You've successfully logged out.")
2015-11-08 06:10:26 +00:00
return redirect(url_for('root'))
# Things that require a team
2016-07-12 18:59:37 +00:00
@app.route('/confirm_email/<confirmation_key>', methods=["GET"])
2015-11-10 16:51:45 +00:00
@decorators.login_required
2016-07-12 18:59:37 +00:00
def confirm_email(confirmation_key):
if confirmation_key == g.user.email_confirmation_key:
2015-11-09 02:44:04 +00:00
flash("Email confirmed!")
g.user.email_confirmed = True
g.user.save()
2015-11-09 02:44:04 +00:00
else:
flash("Incorrect confirmation key.")
2016-07-12 12:03:04 +00:00
return redirect(url_for('user_dashboard'))
2015-11-09 02:44:04 +00:00
2016-07-11 23:57:43 +00:00
@app.route('/user/', methods=["GET", "POST"])
2015-11-10 16:51:45 +00:00
@decorators.login_required
2016-07-12 12:03:04 +00:00
def user_dashboard():
2015-11-08 06:10:26 +00:00
if request.method == "GET":
2015-11-08 19:04:02 +00:00
first_login = False
2016-07-11 23:57:43 +00:00
if g.user.first_login:
2015-11-08 19:04:02 +00:00
first_login = True
2016-07-11 23:57:43 +00:00
g.user.first_login = False
g.user.save()
2016-07-12 12:03:04 +00:00
return render_template("user.html", first_login=first_login)
2015-11-08 06:10:26 +00:00
elif request.method == "POST":
if g.redis.get("ul{}".format(session["user_id"])):
2015-11-10 04:41:07 +00:00
flash("You're changing your information too fast!")
2016-07-12 12:03:04 +00:00
return redirect(url_for('user_dashboard'))
2015-11-10 04:41:07 +00:00
2016-07-12 14:24:19 +00:00
username = request.form["username"].strip()
user_email = request.form["email"].strip()
password = request.form["password"].strip()
confirm_password = request.form["confirm_password"].strip()
background = request.form["background"].strip()
country = request.form["country"].strip()
2015-11-09 02:44:04 +00:00
2016-07-12 19:17:24 +00:00
tshirt_size = ""
gender = ""
if "tshirt_size" in request.form.keys():
tshirt_size = request.form["tshirt_size"].strip()
if "gender" in request.form.keys():
gender = request.form["gender"].strip()
2016-07-12 14:24:19 +00:00
if len(username) > 50 or not username:
flash("You must have a username!")
return redirect(url_for('user_dashboard'))
if g.user.username != username:
try:
user = User.get(User.username == username)
flash("This username is already in use!")
return redirect(url_for('user_dashboard'))
except User.DoesNotExist:
pass
if not (user_email and "." in user_email and "@" in user_email):
flash("You must have a valid email!")
2016-07-12 12:03:04 +00:00
return redirect(url_for('user_dashboard'))
2016-07-12 14:24:19 +00:00
if not email.is_valid_email(user_email):
flash("You're lying")
2016-07-12 12:03:04 +00:00
return redirect(url_for('user_dashboard'))
2016-07-12 19:17:24 +00:00
if (not tshirt_size == "") and (not tshirt_size in select.TShirts):
2016-07-12 14:24:19 +00:00
flash("Invalid T-shirt size")
return redirect(url_for('user_dashboard'))
2016-07-12 14:24:19 +00:00
if not background in select.BackgroundKeys:
flash("Invalid Background")
return redirect(url_for('user_dashboard'))
2015-11-09 02:44:04 +00:00
2016-07-12 14:24:19 +00:00
if not country in select.CountryKeys:
flash("Invalid Background")
return redirect(url_for('user_dashboard'))
2016-07-12 19:17:24 +00:00
if (not gender == "") and (not gender in ["M", "F"]):
flash("Invalid gender")
return redirect(url_for('user_dashboard'))
2016-07-12 14:24:19 +00:00
email_changed = (user_email != g.user.email)
g.user.username = username
g.user.email = user_email
g.user.background = background
g.user.country = country
g.user.gender = gender
g.user.tshirt_size = tshirt_size
g.redis.set("ul{}".format(session["user_id"]), str(datetime.now()), 120)
2016-07-12 14:24:19 +00:00
if password != "":
if password != confirm_password:
flash("Password does not match confirmation")
2016-07-12 12:03:04 +00:00
return redirect(url_for('user_dashboard'))
2016-07-12 14:24:19 +00:00
g.user.setPassword(password)
2016-05-22 20:16:52 +00:00
2016-07-12 14:24:19 +00:00
if email_changed:
g.user.email_confirmation_key = misc.generate_confirmation_key()
g.user.email_confirmed = False
2016-05-22 20:16:52 +00:00
2016-07-12 14:24:19 +00:00
email.send_confirmation_email(user_email, g.user.email_confirmation_key)
2015-11-09 02:44:04 +00:00
flash("Changes saved. Please check your email for a new confirmation key.")
else:
flash("Changes saved.")
2016-07-12 14:24:19 +00:00
g.user.save()
2015-11-09 02:44:04 +00:00
2016-07-12 12:03:04 +00:00
return redirect(url_for('user_dashboard'))
@app.route('/team/', methods=["GET", "POST"])
@decorators.login_required
def team_dashboard():
if request.method == "GET":
team_solves = ChallengeSolve.select(ChallengeSolve, Challenge).join(Challenge).where(ChallengeSolve.team == g.team)
team_adjustments = ScoreAdjustment.select().where(ScoreAdjustment.team == g.team)
team_score = sum([i.challenge.points for i in team_solves] + [i.value for i in team_adjustments])
return render_template("team.html", team_solves=team_solves, team_adjustments=team_adjustments, team_score=team_score)
elif request.method == "POST":
if g.redis.get("ul{}".format(session["user_id"])):
flash("You're changing your information too fast!")
return redirect(url_for('team_dashboard'))
team_name = request.form["team_name"].strip()
2016-07-12 14:24:19 +00:00
affiliation = request.form["team_affiliation"].strip()
2016-07-12 12:03:04 +00:00
if len(team_name) > 50 or not team_name:
flash("You must have a team name!")
return redirect(url_for('team_dashboard'))
if not affiliation or len(affiliation) > 100:
affiliation = "No affiliation"
2016-07-12 14:24:19 +00:00
if g.team_name != team_name:
try:
team = Team.get(Team.name == team_name)
flash("This team name is already in use!")
return redirect(url_for('team_dashboard'))
except Team.DoesNotExist:
pass
2016-07-12 12:03:04 +00:00
g.team.name = team_name
g.team.affiliation = affiliation
g.redis.set("ul{}".format(session["user_id"]), str(datetime.now()), 120)
flash("Changes saved.")
g.team.save()
return redirect(url_for('team_dashboard'))
2015-11-08 06:10:26 +00:00
2016-05-15 22:05:42 +00:00
@app.route('/teamconfirm/', methods=["POST"])
def teamconfirm():
2016-05-15 22:25:44 +00:00
if utils.misc.get_ip() in config.confirm_ip:
2016-05-15 22:05:42 +00:00
team_name = request.form["team_name"].strip()
team_key = request.form["team_key"].strip()
try:
team = Team.get(Team.name == team_name)
except Team.DoesNotExist:
return "invalid", 403
if team.key == team_key:
return "ok", 200
else:
return "invalid", 403
else:
return "unauthorized", 401
2015-11-08 06:10:26 +00:00
@app.route('/challenges/')
2015-12-17 12:57:35 +00:00
@decorators.must_be_allowed_to("view challenges")
2015-11-10 16:51:45 +00:00
@decorators.competition_running_required
@decorators.confirmed_email_required
2015-11-08 06:10:26 +00:00
def challenges():
2016-04-27 22:07:39 +00:00
chals = Challenge.select().order_by(Challenge.points, Challenge.name)
2015-11-08 06:10:26 +00:00
solved = Challenge.select().join(ChallengeSolve).where(ChallengeSolve.team == g.team)
2015-12-03 23:18:00 +00:00
solves = {i: int(g.redis.hget("solves", i).decode()) for i in [k.id for k in chals]}
categories = sorted(list({chal.category for chal in chals}))
2015-12-03 23:18:00 +00:00
return render_template("challenges.html", challenges=chals, solved=solved, categories=categories, solves=solves)
@app.route('/challenges/<int:challenge>/solves/')
2015-12-17 12:57:35 +00:00
@decorators.must_be_allowed_to("view challenge solves")
@decorators.must_be_allowed_to("view challenges")
2015-12-03 23:18:00 +00:00
@decorators.competition_running_required
@decorators.confirmed_email_required
def challenge_show_solves(challenge):
chal = Challenge.get(Challenge.id == challenge)
solves = ChallengeSolve.select(ChallengeSolve, Team).join(Team).order_by(ChallengeSolve.time).where(ChallengeSolve.challenge == chal)
return render_template("challenge_solves.html", challenge=chal, solves=solves)
2015-11-08 06:10:26 +00:00
@app.route('/submit/<int:challenge>/', methods=["POST"])
2015-12-17 12:57:35 +00:00
@decorators.must_be_allowed_to("solve challenges")
@decorators.must_be_allowed_to("view challenges")
2015-11-10 16:51:45 +00:00
@decorators.competition_running_required
@decorators.confirmed_email_required
2015-11-08 06:10:26 +00:00
def submit(challenge):
chal = Challenge.get(Challenge.id == challenge)
2015-11-10 16:51:45 +00:00
flagval = request.form["flag"]
2015-11-08 06:10:26 +00:00
2015-11-10 16:51:45 +00:00
code, message = flag.submit_flag(g.team, chal, flagval)
flash(message)
2015-11-08 06:10:26 +00:00
return redirect(url_for('challenges'))
# Trouble tickets
@app.route('/tickets/')
2015-12-17 12:57:35 +00:00
@decorators.must_be_allowed_to("view tickets")
@decorators.login_required
def team_tickets():
return render_template("tickets.html", tickets=list(g.team.tickets))
@app.route('/tickets/new/', methods=["GET", "POST"])
2015-12-17 12:57:35 +00:00
@decorators.must_be_allowed_to("submit tickets")
@decorators.must_be_allowed_to("view tickets")
@decorators.login_required
def open_ticket():
if request.method == "GET":
return render_template("open_ticket.html")
elif request.method == "POST":
if g.redis.get("ticketl{}".format(session["user_id"])):
2016-05-28 02:12:38 +00:00
return "You're doing that too fast."
g.redis.set("ticketl{}".format(g.team.id), "1", 30)
summary = request.form["summary"]
description = request.form["description"]
opened_at = datetime.now()
ticket = TroubleTicket.create(team=g.team, summary=summary, description=description, opened_at=opened_at)
flash("Ticket #{} opened.".format(ticket.id))
return redirect(url_for("team_ticket_detail", ticket=ticket.id))
@app.route('/tickets/<int:ticket>/')
2015-12-17 12:57:35 +00:00
@decorators.must_be_allowed_to("view tickets")
@decorators.login_required
def team_ticket_detail(ticket):
try:
ticket = TroubleTicket.get(TroubleTicket.id == ticket)
except TroubleTicket.DoesNotExist:
flash("Couldn't find ticket #{}.".format(ticket))
return redirect(url_for("team_tickets"))
if ticket.team != g.team:
flash("That's not your ticket.")
return redirect(url_for("team_tickets"))
2016-05-28 02:48:57 +00:00
comments = TicketComment.select().where(TicketComment.ticket == ticket).order_by(TicketComment.time)
return render_template("ticket_detail.html", ticket=ticket, comments=comments)
@app.route('/tickets/<int:ticket>/comment/', methods=["POST"])
2015-12-17 12:57:35 +00:00
@decorators.must_be_allowed_to("comment on tickets")
@decorators.must_be_allowed_to("view tickets")
def team_ticket_comment(ticket):
if g.redis.get("ticketl{}".format(session["user_id"])):
2016-05-28 02:12:38 +00:00
return "You're doing that too fast."
g.redis.set("ticketl{}".format(g.team.id), "1", 30)
try:
ticket = TroubleTicket.get(TroubleTicket.id == ticket)
except TroubleTicket.DoesNotExist:
flash("Couldn't find ticket #{}.".format(ticket))
return redirect(url_for("team_tickets"))
if ticket.team != g.team:
flash("That's not your ticket.")
return redirect(url_for("team_tickets"))
if request.form["comment"]:
TicketComment.create(ticket=ticket, comment_by=g.team.name, comment=request.form["comment"], time=datetime.now())
flash("Comment added.")
if ticket.active and "resolved" in request.form:
ticket.active = False
ticket.save()
flash("Ticket closed.")
elif not ticket.active and "resolved" not in request.form:
ticket.active = True
ticket.save()
flash("Ticket re-opened.")
return redirect(url_for("team_ticket_detail", ticket=ticket.id))
2016-03-14 03:15:15 +00:00
# Debug
@app.route('/debug/')
def debug_app():
return jsonify(hostname=socket.gethostname())
2015-11-08 06:10:26 +00:00
# Manage Peewee database sessions and Redis
@app.before_request
def before_request():
2016-07-12 15:27:39 +00:00
g.connected = True
2015-11-08 06:10:26 +00:00
db.connect()
g.redis = redis.StrictRedis()
@app.teardown_request
def teardown_request(exc):
2016-07-12 15:27:39 +00:00
if getattr(g, 'connected', False):
db.close()
g.redis.connection_pool.disconnect()
2015-11-08 06:10:26 +00:00
# CSRF things
@app.before_request
def csrf_protect():
2016-05-16 01:20:48 +00:00
csrf_exempt = ['/teamconfirm/']
2015-11-08 06:10:26 +00:00
if request.method == "POST":
2015-12-03 23:18:00 +00:00
token = session.get('_csrf_token', None)
2016-05-16 01:35:30 +00:00
if (not token or token != request.form["_csrf_token"]) and not request.path in csrf_exempt:
2015-11-08 06:10:26 +00:00
return "Invalid CSRF token!"
def generate_csrf_token():
if '_csrf_token' not in session:
2015-11-10 16:51:45 +00:00
session['_csrf_token'] = misc.generate_random_string(64)
2015-11-08 06:10:26 +00:00
return session['_csrf_token']
app.jinja_env.globals['csrf_token'] = generate_csrf_token
if __name__ == '__main__':
app.run(debug=True, port=8001)