Block new user registration if registering via MLC (#840)

* Block new user registration if registering via MLC
* Allow login with MLC while registration is disabled
selenium-screenshot-testing
Kevin Chung 2019-01-19 16:00:29 -05:00 committed by GitHub
parent f8607c3d5c
commit 3af036b4b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 182 additions and 8 deletions

View File

@ -21,6 +21,7 @@ from CTFd.utils.decorators.visibility import check_registration_visibility
from CTFd.utils.modes import TEAMS_MODE, USERS_MODE from CTFd.utils.modes import TEAMS_MODE, USERS_MODE
from CTFd.utils.security.signing import serialize, unserialize, SignatureExpired, BadSignature, BadTimeSignature from CTFd.utils.security.signing import serialize, unserialize, SignatureExpired, BadSignature, BadTimeSignature
from CTFd.utils.helpers import info_for, error_for, get_errors, get_infos from CTFd.utils.helpers import info_for, error_for, get_errors, get_infos
from CTFd.utils.config.visibility import registration_visible
import base64 import base64
import requests import requests
@ -319,6 +320,8 @@ def oauth_redirect():
user = Users.query.filter_by(email=user_email).first() user = Users.query.filter_by(email=user_email).first()
if user is None: if user is None:
# Check if we are allowing registration before creating users
if registration_visible():
user = Users( user = Users(
name=user_name, name=user_name,
email=user_email, email=user_email,
@ -327,6 +330,13 @@ def oauth_redirect():
) )
db.session.add(user) db.session.add(user)
db.session.commit() db.session.commit()
else:
log('logins', "[{date}] {ip} - Public registration via MLC blocked")
error_for(
endpoint='auth.login',
message='Public registration is disabled. Please try again later.'
)
return redirect(url_for('auth.login'))
if get_config('user_mode') == TEAMS_MODE: if get_config('user_mode') == TEAMS_MODE:
team_id = api_data['team']['id'] team_id = api_data['team']['id']
@ -344,6 +354,11 @@ def oauth_redirect():
team.members.append(user) team.members.append(user)
db.session.commit() db.session.commit()
if user.oauth_id is None:
user.oauth_id = user_id
user.verified = True
db.session.commit()
login_user(user) login_user(user)
return redirect(url_for('challenges.listing')) return redirect(url_for('challenges.listing'))

View File

@ -7,9 +7,11 @@ from CTFd.cache import cache
from sqlalchemy_utils import database_exists, create_database, drop_database from sqlalchemy_utils import database_exists, create_database, drop_database
from sqlalchemy.engine.url import make_url from sqlalchemy.engine.url import make_url
from collections import namedtuple from collections import namedtuple
from mock import Mock, patch
import datetime import datetime
import six import six
import gc import gc
import requests
if six.PY2: if six.PY2:
text_type = unicode text_type = unicode
@ -130,6 +132,59 @@ def login_as_user(app, name="user", password="password", raise_for_error=True):
return client return client
def login_with_mlc(app, name='user', scope='profile%20team', email='user@ctfd.io', oauth_id=1337, team_name='TestTeam', team_oauth_id=1234, raise_for_error=True):
with app.test_client() as client, \
patch.object(requests, 'get') as fake_get_request, \
patch.object(requests, 'post') as fake_post_request:
client.get('/login')
with client.session_transaction() as sess:
nonce = sess['nonce']
redirect_url = "{endpoint}?response_type=code&client_id={client_id}&scope={scope}&state={state}".format(
endpoint=app.config['OAUTH_AUTHORIZATION_ENDPOINT'],
client_id=app.config['OAUTH_CLIENT_ID'],
scope=scope,
state=nonce
)
r = client.get('/oauth', follow_redirects=False)
assert r.location == redirect_url
fake_post_response = Mock()
fake_post_request.return_value = fake_post_response
fake_post_response.status_code = 200
fake_post_response.json = lambda: {
'access_token': 'fake_mlc_access_token'
}
fake_get_response = Mock()
fake_get_request.return_value = fake_get_response
fake_get_response.status_code = 200
fake_get_response.json = lambda: {
'id': oauth_id,
'name': name,
'email': email,
'team': {
'id': team_oauth_id,
'name': team_name
}
}
client.get('/redirect?code={code}&state={state}'.format(
code='mlc_test_code',
state=nonce
), follow_redirects=False)
if raise_for_error:
with client.session_transaction() as sess:
assert sess['id']
assert sess['name']
assert sess['type']
assert sess['email']
assert sess['nonce']
return client
def get_scores(user): def get_scores(user):
r = user.get('/api/v1/scoreboard') r = user.get('/api/v1/scoreboard')
scores = r.get_json() scores = r.get_json()

0
tests/oauth/__init__.py Normal file
View File

View File

@ -0,0 +1,104 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from tests.helpers import *
from CTFd.utils import set_config
def test_oauth_not_configured():
"""Test that OAuth redirection fails if OAuth settings aren't configured"""
app = create_ctfd()
with app.app_context():
with app.test_client() as client:
r = client.get('/oauth', follow_redirects=False)
assert r.location == 'http://localhost/login'
r = client.get(r.location)
resp = r.get_data(as_text=True)
assert "OAuth Settings not configured" in resp
destroy_ctfd(app)
def test_oauth_configured_flow():
"""Test that MLC integration works properly but does not allow registration (account creation) if disabled"""
app = create_ctfd(user_mode="teams")
app.config.update({
'OAUTH_CLIENT_ID': 'ctfd_testing_client_id',
'OAUTH_CLIENT_SECRET': 'ctfd_testing_client_secret',
'OAUTH_AUTHORIZATION_ENDPOINT': 'http://auth.localhost/oauth/authorize',
'OAUTH_TOKEN_ENDPOINT': 'http://auth.localhost/oauth/token',
'OAUTH_API_ENDPOINT': 'http://api.localhost/user',
})
with app.app_context():
set_config('registration_visibility', 'private')
assert Users.query.count() == 1
assert Teams.query.count() == 0
client = login_with_mlc(app, raise_for_error=False)
assert Users.query.count() == 1
# Users shouldn't be able to register because registration is disabled
resp = client.get('http://localhost/login').get_data(as_text=True)
assert 'Public registration is disabled' in resp
set_config('registration_visibility', 'public')
client = login_with_mlc(app)
# Users should be able to register now
assert Users.query.count() == 2
user = Users.query.filter_by(email='user@ctfd.io').first()
assert user.oauth_id == 1337
assert user.team_id == 1
# Teams should be created
assert Teams.query.count() == 1
team = Teams.query.filter_by(id=1).first()
assert team.oauth_id == 1234
client.get('/logout')
# Users should still be able to login if registration is disabled
set_config('registration_visibility', 'private')
client = login_with_mlc(app)
with client.session_transaction() as sess:
assert sess['id']
assert sess['name']
assert sess['type']
assert sess['email']
assert sess['nonce']
destroy_ctfd(app)
def test_oauth_login_upgrade():
"""Test that users who use MLC after having registered will be associated with their MLC account"""
app = create_ctfd(user_mode="teams")
app.config.update({
'OAUTH_CLIENT_ID': 'ctfd_testing_client_id',
'OAUTH_CLIENT_SECRET': 'ctfd_testing_client_secret',
'OAUTH_AUTHORIZATION_ENDPOINT': 'http://auth.localhost/oauth/authorize',
'OAUTH_TOKEN_ENDPOINT': 'http://auth.localhost/oauth/token',
'OAUTH_API_ENDPOINT': 'http://api.localhost/user',
})
with app.app_context():
register_user(app)
assert Users.query.count() == 2
set_config('registration_visibility', 'private')
# Users should still be able to login
client = login_as_user(app)
client.get('/logout')
user = Users.query.filter_by(id=2).first()
assert user.oauth_id is None
assert user.team_id is None
login_with_mlc(app)
assert Users.query.count() == 2
# Logging in with MLC should insert an OAuth ID and team ID
user = Users.query.filter_by(id=2).first()
assert user.oauth_id
assert user.verified
assert user.team_id
destroy_ctfd(app)