mirror of https://github.com/JohnHammond/CTFd.git
Improve imports/exports to reduce the likelihood of a conflict (#523)
* Improve imports/exports to reduce the likelihood of a conflict * To allow previous version imports of keys to work - fill in key `type` from `key_type`. (#520) * Adding an import_ctf testselenium-screenshot-testing
parent
f23fd627ed
commit
a571cf1baf
|
@ -32,7 +32,7 @@ from sqlalchemy.exc import InvalidRequestError, IntegrityError
|
|||
from socket import timeout
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from CTFd.models import db, WrongKeys, Pages, Config, Tracking, Teams, Files, ip2long, long2ip
|
||||
from CTFd.models import db, Challenges, WrongKeys, Pages, Config, Tracking, Teams, Files, ip2long, long2ip
|
||||
|
||||
from datafreeze.format import SERIALIZERS
|
||||
from datafreeze.format.fjson import JSONSerializer, JSONEncoder
|
||||
|
@ -793,6 +793,14 @@ def export_ctf(segments=None):
|
|||
result_file.seek(0)
|
||||
backup_zip.writestr('db/{}.json'.format(item), result_file.read())
|
||||
|
||||
# Guarantee that alembic_version is saved into the export
|
||||
if 'metadata' not in segments:
|
||||
result = db['alembic_version'].all()
|
||||
result_file = six.BytesIO()
|
||||
datafreeze.freeze(result, format='ctfd', fileobj=result_file)
|
||||
result_file.seek(0)
|
||||
backup_zip.writestr('db/alembic_version.json', result_file.read())
|
||||
|
||||
# Backup uploads
|
||||
upload_folder = os.path.join(os.path.normpath(app.root_path), get_config('UPLOAD_FOLDER'))
|
||||
for root, dirs, files in os.walk(upload_folder):
|
||||
|
@ -863,16 +871,23 @@ def import_ctf(backup, segments=None, erase=False):
|
|||
elif item == 'pages':
|
||||
saved = json.loads(data)
|
||||
for entry in saved['results']:
|
||||
# Support migration c12d2a1b0926_add_draft_and_title_to_pages
|
||||
route = entry['route']
|
||||
title = entry.get('title', route.title())
|
||||
html = entry['html']
|
||||
draft = entry.get('draft', False)
|
||||
auth_required = entry.get('auth_required', False)
|
||||
page = Pages.query.filter_by(route=route).first()
|
||||
if page:
|
||||
page.html = html
|
||||
else:
|
||||
page = Pages(route, html)
|
||||
page = Pages(title, route, html, draft=draft, auth_required=auth_required)
|
||||
db.session.add(page)
|
||||
db.session.commit()
|
||||
|
||||
teams_base = db.session.query(db.func.max(Teams.id)).scalar() or 0
|
||||
chals_base = db.session.query(db.func.max(Challenges.id)).scalar() or 0
|
||||
|
||||
for segment in segments:
|
||||
group = groups[segment]
|
||||
for item in group:
|
||||
|
@ -888,11 +903,31 @@ def import_ctf(backup, segments=None, erase=False):
|
|||
if get_config('SQLALCHEMY_DATABASE_URI').startswith('sqlite'):
|
||||
for k, v in entry.items():
|
||||
if isinstance(v, six.string_types):
|
||||
try:
|
||||
match = re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d", v)
|
||||
if match:
|
||||
entry[k] = datetime.datetime.strptime(v, '%Y-%m-%dT%H:%M:%S.%f')
|
||||
continue
|
||||
match = re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", v)
|
||||
if match:
|
||||
entry[k] = datetime.datetime.strptime(v, '%Y-%m-%dT%H:%M:%S')
|
||||
except ValueError as e:
|
||||
pass
|
||||
table.insert(entry)
|
||||
continue
|
||||
for k, v in entry.items():
|
||||
if k == 'chal' or k == 'chalid':
|
||||
entry[k] += chals_base
|
||||
if k == 'team' or k == 'teamid':
|
||||
entry[k] += teams_base
|
||||
|
||||
if item == 'teams':
|
||||
table.insert_ignore(entry, ['email'])
|
||||
elif item == 'keys':
|
||||
# Support migration 2539d8b5082e_rename_key_type_to_type
|
||||
key_type = entry.get('key_type', None)
|
||||
if key_type is not None:
|
||||
entry['type'] = key_type
|
||||
del entry['key_type']
|
||||
table.insert(entry)
|
||||
else:
|
||||
table.insert(entry)
|
||||
else:
|
||||
continue
|
||||
|
||||
|
@ -914,6 +949,6 @@ def import_ctf(backup, segments=None, erase=False):
|
|||
os.makedirs(dirname)
|
||||
|
||||
source = backup.open(f)
|
||||
target = file(full_path, "wb")
|
||||
target = open(full_path, "wb")
|
||||
with source, target:
|
||||
shutil.copyfileobj(source, target)
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
from tests.helpers import *
|
||||
from CTFd.models import ip2long, long2ip
|
||||
from CTFd.utils import get_config, set_config, override_template, sendmail, verify_email, ctf_started, ctf_ended, export_ctf
|
||||
from CTFd.utils import get_config, set_config, override_template, sendmail, verify_email, ctf_started, ctf_ended, export_ctf, import_ctf
|
||||
from CTFd.utils import register_plugin_script, register_plugin_stylesheet
|
||||
from CTFd.utils import base64encode, base64decode
|
||||
from CTFd.utils import check_email_format
|
||||
|
@ -11,6 +11,7 @@ from CTFd.utils import update_check
|
|||
from freezegun import freeze_time
|
||||
from mock import patch, Mock
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import six
|
||||
|
||||
|
@ -348,9 +349,49 @@ def test_export_ctf():
|
|||
output = json.loads(output)
|
||||
app.db.session.commit()
|
||||
backup = export_ctf()
|
||||
backup.seek(0)
|
||||
|
||||
with open('export.zip', 'wb') as f:
|
||||
f.write(backup.getvalue())
|
||||
os.remove('export.zip')
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
def test_import_ctf():
|
||||
"""Test that CTFd can import a CTF"""
|
||||
app = create_ctfd()
|
||||
# TODO: Unrelated to an in-memory database, imports in a test environment are not working with SQLite...
|
||||
if app.config['SQLALCHEMY_DATABASE_URI'].startswith('sqlite') is False:
|
||||
with app.app_context():
|
||||
base_user = 'user'
|
||||
for x in range(10):
|
||||
user = base_user + str(x)
|
||||
user_email = user + "@ctfd.io"
|
||||
gen_team(app.db, name=user, email=user_email)
|
||||
|
||||
for x in range(10):
|
||||
chal = gen_challenge(app.db, name='chal_name{}'.format(x))
|
||||
gen_flag(app.db, chal=chal.id, flag='flag')
|
||||
|
||||
app.db.session.commit()
|
||||
|
||||
backup = export_ctf()
|
||||
|
||||
with open('export.zip', 'wb') as f:
|
||||
f.write(backup.read())
|
||||
destroy_ctfd(app)
|
||||
|
||||
app = create_ctfd()
|
||||
with app.app_context():
|
||||
import_ctf('export.zip')
|
||||
|
||||
app.db.session.commit()
|
||||
|
||||
print(Teams.query.count())
|
||||
print(Challenges.query.count())
|
||||
|
||||
assert Teams.query.count() == 11
|
||||
assert Challenges.query.count() == 10
|
||||
assert Keys.query.count() == 10
|
||||
destroy_ctfd(app)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue