251 lines
9.7 KiB
Python
Executable File
251 lines
9.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from data.database import *
|
|
from datetime import datetime, timedelta
|
|
import config
|
|
import getpass
|
|
import hashlib
|
|
import os
|
|
import os.path
|
|
import shutil
|
|
import sys
|
|
import redis
|
|
import random
|
|
import utils
|
|
import utils.admin
|
|
import utils.misc
|
|
import yaml
|
|
import argparse
|
|
import logging
|
|
|
|
# Every model handled by the "database create-tables" and "database drop-tables"
# commands; both commands walk this list from first to last.
tables = [
    Team,
    User,
    UserAccess,
    Stage,
    Challenge,
    ChallengeSolve,
    ChallengeFailure,
    NewsItem,
    TroubleTicket,
    TicketComment,
    Notification,
    ScoreAdjustment,
    AdminUser,
    SshAccount,
]
|
|
|
|
def create_tables(args):
    """Create the database table for every model listed in ``tables``.

    ``args.fail`` is set by the ``-f/--fail`` flag ("Do not check if tables
    exist"). Each model's ``create_table`` receives ``True`` when the flag is
    absent and ``False`` when it is given.
    """
    check_existing = not args.fail
    for model in tables:
        model.create_table(check_existing)
    print("Tables created")
|
|
|
|
def drop_tables(args):
    """Drop the table of every model in ``tables`` after a confirmation prompt.

    Nothing is dropped unless the operator types exactly ``yes``.
    ``args`` is accepted so the function fits the command dispatch in
    ``main`` but is not read.
    """
    answer = input("Are you sure? Type yes to continue: ")
    if answer != "yes":
        print("Okay, nothing happened.")
        return

    for model in tables:
        model.drop_table()
    print("Done")
|
|
|
|
|
|
|
|
|
|
def gen_team(args):
    """Generate ``args.team_count`` dummy teams, each with one dummy user.

    For every team a ``Team`` row and a ``User`` row are created with
    placeholder profile data. The user's password is set to
    ``password<user id>`` and the credentials are printed so the accounts can
    be used right away.

    The previous version also built three locals that were never used, one of
    which loaded every ``Challenge`` row from the database; they are gone.
    """
    for number in range(1, args.team_count + 1):
        # A random suffix keeps team names distinct across repeated runs.
        team_name = "Team {} {}".format(number, utils.misc.generate_random_string(length=5))
        team_key = utils.misc.generate_team_key()
        team = Team.create(name=team_name, affiliation="Autogenerated", key=team_key)

        username = "username{}".format(number)
        user = User.create(
            username=username,
            email="nope@nope.nope",
            background="university",
            country="ISL",
            tshirt_size="XL",
            gender="M",
            email_confirmation_key="autogen",
            email_confirmed=True,
            team=team,
        )
        team.save()

        # The password embeds the user's id, which is only known after create().
        password = "password{}".format(user.id)
        user.set_password(password)
        user.save()

        print("Team added with id {}".format(team.id))
        print("User added with username {} and password {}".format(username, password))
|
|
|
|
def add_admin(args):
    """Interactively create an ``AdminUser`` with a freshly generated secret.

    Prompts for a username and, without echo, a password. The encoded password
    is passed through ``utils.admin.create_password`` and the result is stored
    together with a random 16-character secret (characters A-Z and 2-7), which
    is printed for entry into a TOTP application. ``args`` is not read.
    """
    secret_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"

    username = input("Username: ")
    raw_password = getpass.getpass().encode()
    password_hash = utils.admin.create_password(raw_password)

    # SystemRandom draws from the operating system's randomness source.
    rng = random.SystemRandom()
    secret = "".join(rng.choice(secret_alphabet) for _ in range(16))

    AdminUser.create(username=username, password=password_hash, secret=secret)
    print("AdminUser created; Enter the following key into your favorite TOTP application (Google Authenticator Recommended): {}".format(secret))
|
|
def scan_challenges_problem(d, files):
    """Publish a problem's static files and create or update its ``Challenge``.

    Args:
        d: Directory that contains ``problem.yml`` (and optionally
            ``static.yml``).
        files: Names of the files present in ``d``.

    If ``static.yml`` is present, every file it lists is copied into
    ``config.static_dir`` under a name that embeds the file's SHA-256 digest.
    Each ``|<file name>|`` placeholder in the problem description is then
    replaced by ``config.static_prefix`` followed by that published name.
    The challenge is matched on its ``alias``: an existing row is updated,
    otherwise a new one is created.
    """
    staticpaths = {}
    if "static.yml" in files:
        with open(os.path.join(d, "static.yml")) as f:
            # safe_load instead of a bare yaml.load(f): the latter can build
            # arbitrary Python objects on old PyYAML releases and is rejected
            # (missing Loader argument) by PyYAML 6+. An empty file parses to
            # None, which is treated as "no static files".
            statics = yaml.safe_load(f) or []

        for static in statics:
            source_path = os.path.join(d, static)

            # Hash in 4 KiB chunks so large files are never held in memory.
            digest = hashlib.sha256()
            with open(source_path, "rb") as staticfile:
                for chunk in iter(lambda: staticfile.read(4096), b""):
                    digest.update(chunk)

            # Put the digest before the first dot so the extension survives.
            if "." in static:
                name, ext = static.split(".", maxsplit=1)
                fn = "{}_{}.{}".format(name, digest.hexdigest(), ext)
            else:
                fn = "{}_{}".format(static, digest.hexdigest())

            staticpaths[static] = fn
            shutil.copy(source_path, os.path.join(config.static_dir, fn))
            print(fn)

    with open(os.path.join(d, "problem.yml")) as f:
        data = yaml.safe_load(f)

    print("Inserting problem in directory %s" % (d))
    for original_name, published_name in staticpaths.items():
        print("looking for |{}|".format(original_name))
        data["description"] = data["description"].replace(
            "|{}|".format(original_name),
            "{}{}".format(config.static_prefix, published_name),
        )

    # problem.yml names its stage by alias; swap in the Stage row itself.
    data["stage"] = Stage.get(Stage.alias == data["stage"])

    query = Challenge.select().where(Challenge.alias == data["alias"])
    if query.exists():
        print("Updating " + str(data["name"]) + "...")
        Challenge.update(**data).where(Challenge.alias == data["alias"]).execute()
    else:
        Challenge.create(**data)
|
|
|
|
|
|
def scan_challenges_stage(d, files):
    """Create or update the ``Stage`` described by ``stage.yml`` in ``d``.

    Args:
        d: Directory that contains ``stage.yml``.
        files: Names of the files present in ``d``; accepted but not used.

    The stage is matched on its ``alias``: an existing row is updated with the
    file's fields, otherwise a new row is created from them.
    """
    with open(os.path.join(d, "stage.yml")) as f:
        # safe_load instead of a bare yaml.load(f): the latter can build
        # arbitrary Python objects on old PyYAML releases and is rejected
        # (missing Loader argument) by PyYAML 6+.
        data = yaml.safe_load(f)

    query = Stage.select().where(Stage.alias == data["alias"])
    if query.exists():
        print("Updating %s..." % (data["alias"]))
        Stage.update(**data).where(Stage.alias == data["alias"]).execute()
    else:
        Stage.create(**data)
|
|
|
|
|
|
def scan_challenges(args):
    """Walk ``args.path`` and load every stage and problem definition found.

    A directory containing ``stage.yml`` is handed to
    ``scan_challenges_stage``; one containing ``problem.yml`` is handed to
    ``scan_challenges_problem``. ``config.static_dir`` is created first if it
    does not exist, and the number of problem directories is printed at the
    end.
    """
    loaded = 0
    os.makedirs(config.static_dir, exist_ok=True)

    for directory, _subdirs, filenames in os.walk(args.path):
        if "stage.yml" in filenames:
            scan_challenges_stage(directory, filenames)
        if "problem.yml" not in filenames:
            continue
        loaded += 1
        scan_challenges_problem(directory, filenames)

    print(loaded, "challenges loaded")
|
|
|
|
|
|
def add_challenge(args):
    """Create one ``Challenge`` from the YAML file at ``args.file``.

    The file's top-level mapping is passed to ``Challenge.create`` as keyword
    arguments, and the id of the new row is printed.
    """
    with open(args.file) as f:
        # safe_load instead of a bare yaml.load(f): the latter can build
        # arbitrary Python objects on old PyYAML releases and is rejected
        # (missing Loader argument) by PyYAML 6+.
        fields = yaml.safe_load(f)

    chal = Challenge.create(**fields)
    print("Challenge added with id {}".format(chal.id))
|
|
|
|
def gen_challenges(args):
    """Create ``args.challenge_count`` dummy challenges.

    Each challenge gets a random number that serves as both the suffix of its
    name and its flag, plus a random point value between 50 and 400. The id of
    every created row is printed.
    """
    for _ in range(args.challenge_count):
        flag = str(random.randint(0, 999999999))
        title = "Challenge {}".format(flag)
        blurb = "Lorem ipsum, dolor sit amet. The flag is {}".format(flag)
        points = random.randint(50, 400)

        chal = Challenge.create(
            name=title,
            category="Generated",
            description=blurb,
            points=points,
            flag=flag,
            author="autogen",
        )
        print("Challenge added with id {}".format(chal.id))
|
|
|
|
|
|
|
|
def list_challenges(args):
    """Print one line per challenge: its id right-aligned to 3 columns, then its name.

    ``args`` is not read.
    """
    for chal in Challenge.select():
        id_column = str(chal.id).rjust(3)
        print("{} {}".format(id_column, chal.name))
|
|
|
|
|
|
def del_challenge(args):
    """Delete the challenge whose id is ``args.id`` along with its attempts.

    The challenge's ``ChallengeFailure`` rows are removed first, then its
    ``ChallengeSolve`` rows, and finally the challenge itself.
    """
    challenge = Challenge.get(id=args.id)

    for attempt_model in (ChallengeFailure, ChallengeSolve):
        attempt_model.delete().where(attempt_model.challenge == challenge).execute()

    challenge.delete_instance()
|
|
|
|
def clear_challenges(args):
    """Delete every failure, every solve and every challenge, in that order.

    ``args`` is not read.
    """
    for model in (ChallengeFailure, ChallengeSolve, Challenge):
        model.delete().execute()
|
|
|
|
|
|
def recache_solves(args):
    """Rewrite the ``solves`` hash in redis from the database.

    For every challenge, the field named by the challenge id is set to the
    current count of ``chal.solves``. The hash's values are printed once all
    challenges have been written. ``args`` is not read.
    """
    settings = config.redis
    client = redis.StrictRedis(
        host=settings.host,
        port=settings.port,
        db=settings.db,
        password=settings.password,
    )

    for chal in Challenge.select():
        solve_count = chal.solves.count()
        client.hset("solves", chal.id, solve_count)

    print(client.hvals("solves"))
|
|
|
|
def main():
    """Parse the command line and run the selected management action.

    The command tree has three groups:

    * ``challenges``: ``scan``, ``generate``, ``add``, ``list``, ``del``, ``clear``
    * ``database``: ``create-tables``, ``drop-tables``, ``recache-solves``
    * ``users``: ``generate``, ``add-admin``

    Every leaf command stores its handler as ``func`` via ``set_defaults``.
    When no leaf command was chosen, the top-level help is printed instead.
    ``-v/--verbose`` and ``-s/--silent`` are mutually exclusive and select the
    logging level (DEBUG / CRITICAL; WARNING when neither is given).
    """
    parser = argparse.ArgumentParser(description="{} problem manager".format(config.ctf_name))
    debug_level = parser.add_mutually_exclusive_group()
    debug_level.add_argument('-v', '--verbose', help="Print intermediate results", action="store_true")
    debug_level.add_argument('-s', '--silent', help="Print out very little", action="store_true")
    subparser = parser.add_subparsers(help='Select one of the following actions')

    # Problems
    parser_problems = subparser.add_parser('challenges', help='Deal with challenges')
    subparser_problems = parser_problems.add_subparsers(help='Select one of the following actions')

    parser_problems_scan = subparser_problems.add_parser('scan', help='Scan a path for problems')
    parser_problems_scan.add_argument("path", help="Directory for the path")
    parser_problems_scan.set_defaults(func=scan_challenges)

    parser_problems_generate = subparser_problems.add_parser('generate', help='Generate some dummy problems')
    parser_problems_generate.add_argument("challenge_count", type=int, help="number of problems to generate")
    parser_problems_generate.set_defaults(func=gen_challenges)

    parser_problems_add = subparser_problems.add_parser('add', help='add a problem')
    parser_problems_add.add_argument("file", help="file to use")
    parser_problems_add.set_defaults(func=add_challenge)

    parser_problems_list = subparser_problems.add_parser('list', help='List problems in the database')
    parser_problems_list.set_defaults(func=list_challenges)

    parser_problems_del = subparser_problems.add_parser('del', help='Delete a problem')
    parser_problems_del.add_argument('id', help="ID of problem to remove")
    parser_problems_del.set_defaults(func=del_challenge)

    # 'clear' removes every challenge; its help text used to be a copy of the
    # 'del' text ('Delete a problem').
    parser_problems_clear = subparser_problems.add_parser('clear', help='Delete all problems')
    parser_problems_clear.set_defaults(func=clear_challenges)

    # Database
    parser_database = subparser.add_parser("database", help="Deal with database")
    subparser_database = parser_database.add_subparsers(help="Select one of the following actions")

    parser_database_create = subparser_database.add_parser("create-tables", help="create tables")
    parser_database_create.add_argument("-f", "--fail", help="Do not check if tables exist", action="store_true")
    parser_database_create.set_defaults(func=create_tables)

    parser_database_clear = subparser_database.add_parser("drop-tables", help="drop tables")
    parser_database_clear.set_defaults(func=drop_tables)

    parser_database_recache_solves = subparser_database.add_parser("recache-solves", help="recache solves")
    parser_database_recache_solves.set_defaults(func=recache_solves)

    # Users
    parser_user = subparser.add_parser("users", help="Deal with users")
    subparser_users = parser_user.add_subparsers(help="Select one of the following actions")

    parser_users_generate = subparser_users.add_parser('generate', help='Generate some dummy teams')
    parser_users_generate.add_argument("team_count", type=int, help="number of teams to generate")
    parser_users_generate.set_defaults(func=gen_team)

    parser_users_admin = subparser_users.add_parser('add-admin', help='Create a new administrator')
    parser_users_admin.set_defaults(func=add_admin)

    args = parser.parse_args()

    if args.silent:
        log_level = logging.CRITICAL
    elif args.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.WARNING
    logging.basicConfig(level=log_level, stream=sys.stdout)

    # 'func' is only present when a leaf command was selected; a bare
    # invocation (or a bare group name) falls through to the help text.
    if 'func' in args:
        args.func(args)
    else:
        parser.print_help()
|
|
|
|
# Only run the CLI when this file is executed as a script, so importing it
# (for example to reuse a helper) does not start parsing sys.argv.
if __name__ == "__main__":
    main()
|
|
|
|
# vim: syntax=python:ft=python
|