from flask import Flask, render_template, session, redirect, url_for, request, g, flash, jsonify app = Flask(__name__) from database import User, Team, UserAccess, Challenge, ChallengeSolve, ChallengeFailure, ScoreAdjustment, TroubleTicket, TicketComment, Notification, db from datetime import datetime, timedelta from peewee import fn from utils import decorators, flag, cache, misc, captcha, email, select import utils.scoreboard import config import utils import redis import requests import socket app.secret_key = config.secret.key import logging if config.production: logging.basicConfig(level=logging.INFO) else: logging.basicConfig(level=logging.DEBUG) @app.before_request def make_info_available(): if "user_id" in session: try: g.user = User.get(User.id == session["user_id"]) g.user_restricts = g.user.restricts.split(",") g.team = g.user.team g.team_restricts = g.team.restricts.split(",") except User.DoesNotExist: session.pop("user_id") return render_template("login.html") else: g.logged_in = False @app.context_processor def scoreboard_variables(): var = dict(config=config, select=select) if "user_id" in session: var["logged_in"] = True var["user"] = g.user var["team"] = g.team # TODO should this apply to users or teams? # var["notifications"] = Notification.select().where(Notification.user == g.user) var["notifications"] = [] else: var["logged_in"] = False var["notifications"] = [] return var # Blueprints from modules import api, admin app.register_blueprint(api.api) app.register_blueprint(admin.admin) # Publically accessible things @app.route('/') def root(): if g.logged_in: return redirect(url_for('team_dashboard')) return redirect(url_for('register')) @app.route('/chat/') def chat(): return render_template("chat.html") @app.route('/scoreboard/') def scoreboard(): data = cache.get_complex("scoreboard") graphdata = cache.get_complex("graph") if data is None or graphdata is None: if config.immediate_scoreboard: data = utils.scoreboard.calculate_scores() graphdata = utils.scoreboard.calculate_graph(data) utils.scoreboard.set_complex("scoreboard", data, 120) utils.scoreboard.set_complex("graph", graphdata, 120) else: return "No scoreboard data available. Please contact an organizer." return render_template("scoreboard.html", data=data, graphdata=graphdata) @app.route('/login/', methods=["GET", "POST"]) def login(): if request.method == "GET": return render_template("login.html") elif request.method == "POST": username = request.form["username"] password = request.form["password"] try: user = User.get(User.username == username) if(user.checkPassword(password)): UserAccess.create(user=user, ip=misc.get_ip(), time=datetime.now()) session["user_id"] = user.id flash("Login successful.") return redirect(url_for('team_dashboard')) else: flash("Incorrect username or password", "error") return render_template("login.html") except User.DoesNotExist: flash("Incorrect username or password", "error") return render_template("login.html") @app.route('/register/', methods=["GET", "POST"]) def register(): if not config.registration: if "admin" in session and session["admin"]: pass else: return "Registration is currently disabled. Email icectf@icec.tf to create an account." if request.method == "GET": return render_template("register.html") elif request.method == "POST": error, message = captcha.verify_captcha() if error: flash(message) return render_template("register.html") username = request.form["username"].strip() user_email = request.form["email"].strip() password = request.form["password"].strip() confirm_password = request.form["confirm_password"].strip() background = request.form["background"].strip() country = request.form["country"].strip() tshirt_size = "" gender = "" if "tshirt_size" in request.form.keys(): tshirt_size = request.form["tshirt_size"].strip() if "gender" in request.form.keys(): gender = request.form["gender"].strip() join_team = bool(int(request.form["join_team"].strip())) if join_team: team_key = request.form["team_key"].strip() else: team_name = request.form["team_name"].strip() team_affiliation = request.form["team_affiliation"].strip() if len(username) > 50 or not username: flash("You must have a username!") return render_template("register.html") try: user = User.get(User.username == username) flash("This username is already in use!") return render_template("register.html") except User.DoesNotExist: pass if password != confirm_password: flash("Password does not match confirmation") return render_template("register.html") if not (user_email and "." in user_email and "@" in user_email): flash("You must have a valid email!") return render_template("register.html") if not email.is_valid_email(user_email): flash("You're lying") return render_template("register.html") if (not tshirt_size == "") and (not tshirt_size in select.TShirts): flash("Invalid T-shirt size") return render_template("register.html") if not background in select.BackgroundKeys: flash("Invalid Background") return render_template("register.html") if not country in select.CountryKeys: flash("Invalid Background") return render_template("register.html") if (not gender == "") and (not gender in ["M", "F"]): flash("Invalid gender") return render_template("register.html") confirmation_key = misc.generate_confirmation_key() team=None if join_team: try: team = Team.get(Team.key == team_key) except Team.DoesNotExist: flash("Couldn't find this team, check your team key.") return render_template("register.html") else: if not team_name or len(team_name) > 50: flash("Missing team name") return render_template("register.html") if not team_affiliation or len(team_affiliation) > 100: team_affiliation = "No affiliation" try: team = Team.get(Team.name == team_name) flash("This team name is already in use!") return render_template("register.html") except Team.DoesNotExist: pass team_key = misc.generate_team_key() team = Team.create(name=team_name, affiliation=team_affiliation, key=team_key) user = User.create(username=username, email=user_email, background=background, country=country, tshirt_size=tshirt_size, gender=gender, email_confirmation_key=confirmation_key, team=team) user.setPassword(password) user.save() UserAccess.create(user=user, ip=misc.get_ip(), time=datetime.now()) # print(confirmation_key) email.send_confirmation_email(user_email, confirmation_key) session["user_id"] = user.id flash("Registration finished") return redirect(url_for('user_dashboard')) @app.route('/logout/') def logout(): session.pop("user_id") flash("You've successfully logged out.") return redirect(url_for('login')) # Things that require a team @app.route('/confirm_email/', methods=["GET"]) @decorators.login_required def confirm_email(confirmation_key): if confirmation_key == g.user.email_confirmation_key: flash("Email confirmed!") g.user.email_confirmed = True g.user.save() else: flash("Incorrect confirmation key.") return redirect(url_for('user_dashboard')) @app.route('/forgot_password/', methods=["GET", "POST"]) def forgot_password(): if request.method == "GET": return render_template("forgot_password.html") elif request.method == "POST": username = request.form["username"].strip() if len(username) > 50 or not username: flash("You must have a username!") return redirect(url_for('forgot_password')) try: user = User.get(User.username == username) user.password_reset_token = misc.generate_confirmation_key() user.password_reset_expired = datetime.now() + timedelta(days=1) user.save() email.send_password_reset_email(user.email, user.password_reset_token) flash("Forgot password email sent! Check your email.") return render_template("forgot_password.html") except User.DoesNotExist: flash("Username is not registered", "error") return render_template("forgot_password.html") @app.route('/reset_password/', methods=["GET", "POST"]) def reset_password(password_reset_token): if request.method == "GET": return render_template("reset_password.html") elif request.method == "POST": password = request.form["password"].strip() confirm_password = request.form["confirm_password"].strip() if not password == confirm_password: flash("Password does not match") return render_template("reset_password.html", password_reset_token=password_reset_token) if not password_reset_token: flash("Reset Token is invalid", "error") return redirect(url_for("forgot_password")) try: user = User.get(User.password_reset_token == password_reset_token) if user.password_reset_expired < datetime.now(): flash("Token expired") return redirect(url_for("forgot_password")) user.setPassword(password) user.password_reset_token = None user.save() flash("Password successfully reset") return redirect(url_for("login")) except User.DoesNotExist: flash("Reset Token is invalid", "error") return redirect(url_for("forgot_password")) @app.route('/user/', methods=["GET", "POST"]) @decorators.login_required def user_dashboard(): if request.method == "GET": first_login = False if g.user.first_login: first_login = True g.user.first_login = False g.user.save() return render_template("user.html", first_login=first_login) elif request.method == "POST": if g.redis.get("ul{}".format(session["user_id"])): flash("You're changing your information too fast!") return redirect(url_for('user_dashboard')) username = request.form["username"].strip() user_email = request.form["email"].strip() password = request.form["password"].strip() confirm_password = request.form["confirm_password"].strip() background = request.form["background"].strip() country = request.form["country"].strip() tshirt_size = "" gender = "" if "tshirt_size" in request.form.keys(): tshirt_size = request.form["tshirt_size"].strip() if "gender" in request.form.keys(): gender = request.form["gender"].strip() if len(username) > 50 or not username: flash("You must have a username!") return redirect(url_for('user_dashboard')) if g.user.username != username: try: user = User.get(User.username == username) flash("This username is already in use!") return redirect(url_for('user_dashboard')) except User.DoesNotExist: pass if not (user_email and "." in user_email and "@" in user_email): flash("You must have a valid email!") return redirect(url_for('user_dashboard')) if not email.is_valid_email(user_email): flash("You're lying") return redirect(url_for('user_dashboard')) if (not tshirt_size == "") and (not tshirt_size in select.TShirts): flash("Invalid T-shirt size") return redirect(url_for('user_dashboard')) if not background in select.BackgroundKeys: flash("Invalid Background") return redirect(url_for('user_dashboard')) if not country in select.CountryKeys: flash("Invalid Background") return redirect(url_for('user_dashboard')) if (not gender == "") and (not gender in ["M", "F"]): flash("Invalid gender") return redirect(url_for('user_dashboard')) email_changed = (user_email != g.user.email) g.user.username = username g.user.email = user_email g.user.background = background g.user.country = country g.user.gender = gender g.user.tshirt_size = tshirt_size g.redis.set("ul{}".format(session["user_id"]), str(datetime.now()), 120) if password != "": if password != confirm_password: flash("Password does not match confirmation") return redirect(url_for('user_dashboard')) g.user.setPassword(password) if email_changed: g.user.email_confirmation_key = misc.generate_confirmation_key() g.user.email_confirmed = False email.send_confirmation_email(user_email, g.user.email_confirmation_key) flash("Changes saved. Please check your email for a new confirmation key.") else: flash("Changes saved.") g.user.save() return redirect(url_for('user_dashboard')) @app.route('/team/', methods=["GET", "POST"]) @decorators.login_required def team_dashboard(): if request.method == "GET": team_solves = ChallengeSolve.select(ChallengeSolve, Challenge).join(Challenge).where(ChallengeSolve.team == g.team) team_adjustments = ScoreAdjustment.select().where(ScoreAdjustment.team == g.team) team_score = sum([i.challenge.points for i in team_solves] + [i.value for i in team_adjustments]) return render_template("team.html", team_solves=team_solves, team_adjustments=team_adjustments, team_score=team_score) elif request.method == "POST": if g.redis.get("ul{}".format(session["user_id"])): flash("You're changing your information too fast!") return redirect(url_for('team_dashboard')) team_name = request.form["team_name"].strip() affiliation = request.form["team_affiliation"].strip() if len(team_name) > 50 or not team_name: flash("You must have a team name!") return redirect(url_for('team_dashboard')) if not affiliation or len(affiliation) > 100: affiliation = "No affiliation" if g.team.name != team_name: try: team = Team.get(Team.name == team_name) flash("This team name is already in use!") return redirect(url_for('team_dashboard')) except Team.DoesNotExist: pass g.team.name = team_name g.team.affiliation = affiliation g.redis.set("ul{}".format(session["user_id"]), str(datetime.now()), 120) flash("Changes saved.") g.team.save() return redirect(url_for('team_dashboard')) @app.route('/teamconfirm/', methods=["POST"]) def teamconfirm(): if utils.misc.get_ip() in config.confirm_ip: team_name = request.form["team_name"].strip() team_key = request.form["team_key"].strip() try: team = Team.get(Team.name == team_name) except Team.DoesNotExist: return "invalid", 403 if team.key == team_key: return "ok", 200 else: return "invalid", 403 else: return "unauthorized", 401 @app.route('/challenges/') @decorators.must_be_allowed_to("view challenges") @decorators.competition_running_required @decorators.confirmed_email_required def challenges(): chals = Challenge.select().order_by(Challenge.points, Challenge.name) solved = Challenge.select().join(ChallengeSolve).where(ChallengeSolve.team == g.team) solves = {i: int(g.redis.hget("solves", i).decode()) for i in [k.id for k in chals]} categories = sorted(list({chal.category for chal in chals})) return render_template("challenges.html", challenges=chals, solved=solved, categories=categories, solves=solves) @app.route('/challenges//solves/') @decorators.must_be_allowed_to("view challenge solves") @decorators.must_be_allowed_to("view challenges") @decorators.competition_running_required @decorators.confirmed_email_required def challenge_show_solves(challenge): chal = Challenge.get(Challenge.id == challenge) solves = ChallengeSolve.select(ChallengeSolve, Team).join(Team).order_by(ChallengeSolve.time).where(ChallengeSolve.challenge == chal) return render_template("challenge_solves.html", challenge=chal, solves=solves) @app.route('/submit//', methods=["POST"]) @decorators.must_be_allowed_to("solve challenges") @decorators.must_be_allowed_to("view challenges") @decorators.competition_running_required @decorators.confirmed_email_required def submit(challenge): chal = Challenge.get(Challenge.id == challenge) flagval = request.form["flag"] code, message = flag.submit_flag(g.team, chal, flagval) flash(message) return redirect(url_for('challenges')) # Trouble tickets @app.route('/tickets/') @decorators.must_be_allowed_to("view tickets") @decorators.login_required def team_tickets(): return render_template("tickets.html", tickets=list(g.team.tickets)) @app.route('/tickets/new/', methods=["GET", "POST"]) @decorators.must_be_allowed_to("submit tickets") @decorators.must_be_allowed_to("view tickets") @decorators.login_required def open_ticket(): if request.method == "GET": return render_template("open_ticket.html") elif request.method == "POST": if g.redis.get("ticketl{}".format(session["user_id"])): return "You're doing that too fast." g.redis.set("ticketl{}".format(g.team.id), "1", 30) summary = request.form["summary"] description = request.form["description"] opened_at = datetime.now() ticket = TroubleTicket.create(team=g.team, summary=summary, description=description, opened_at=opened_at) flash("Ticket #{} opened.".format(ticket.id)) return redirect(url_for("team_ticket_detail", ticket=ticket.id)) @app.route('/tickets//') @decorators.must_be_allowed_to("view tickets") @decorators.login_required def team_ticket_detail(ticket): try: ticket = TroubleTicket.get(TroubleTicket.id == ticket) except TroubleTicket.DoesNotExist: flash("Couldn't find ticket #{}.".format(ticket)) return redirect(url_for("team_tickets")) if ticket.team != g.team: flash("That's not your ticket.") return redirect(url_for("team_tickets")) comments = TicketComment.select().where(TicketComment.ticket == ticket).order_by(TicketComment.time) return render_template("ticket_detail.html", ticket=ticket, comments=comments) @app.route('/tickets//comment/', methods=["POST"]) @decorators.must_be_allowed_to("comment on tickets") @decorators.must_be_allowed_to("view tickets") def team_ticket_comment(ticket): if g.redis.get("ticketl{}".format(session["user_id"])): return "You're doing that too fast." g.redis.set("ticketl{}".format(g.team.id), "1", 30) try: ticket = TroubleTicket.get(TroubleTicket.id == ticket) except TroubleTicket.DoesNotExist: flash("Couldn't find ticket #{}.".format(ticket)) return redirect(url_for("team_tickets")) if ticket.team != g.team: flash("That's not your ticket.") return redirect(url_for("team_tickets")) if request.form["comment"]: TicketComment.create(ticket=ticket, comment_by=g.team.name, comment=request.form["comment"], time=datetime.now()) flash("Comment added.") if ticket.active and "resolved" in request.form: ticket.active = False ticket.save() flash("Ticket closed.") elif not ticket.active and "resolved" not in request.form: ticket.active = True ticket.save() flash("Ticket re-opened.") return redirect(url_for("team_ticket_detail", ticket=ticket.id)) # Debug @app.route('/debug/') def debug_app(): return jsonify(hostname=socket.gethostname()) # Manage Peewee database sessions and Redis @app.before_request def before_request(): g.connected = True db.connect() g.redis = redis.StrictRedis(host=config.redis.host, port=config.redis.port, db=config.redis.db) @app.teardown_request def teardown_request(exc): if getattr(g, 'connected', False): db.close() g.redis.connection_pool.disconnect() # CSRF things @app.before_request def csrf_protect(): csrf_exempt = ['/teamconfirm/'] if request.method == "POST": token = session.get('_csrf_token', None) if (not token or token != request.form["_csrf_token"]) and not request.path in csrf_exempt: return "Invalid CSRF token!" def generate_csrf_token(): if '_csrf_token' not in session: session['_csrf_token'] = misc.generate_random_string(64) return session['_csrf_token'] app.jinja_env.globals['csrf_token'] = generate_csrf_token if __name__ == '__main__': app.run(debug=True, port=8001)