import json from collections import defaultdict import redis from mock import patch from redis.exceptions import ConnectionError from six.moves.queue import Queue from CTFd.config import TestingConfig from CTFd.utils.events import EventManager, RedisEventManager, ServerSentEvent from tests.helpers import create_ctfd, destroy_ctfd, login_as_user, register_user def test_event_manager_installed(): """Test that EventManager is installed on the Flask app""" app = create_ctfd() assert type(app.events_manager) == EventManager destroy_ctfd(app) def test_event_manager_subscription(): """Test that EventManager subscribing works""" with patch.object(Queue, "get") as fake_queue: saved_data = { "user_id": None, "title": "asdf", "content": "asdf", "team_id": None, "user": None, "team": None, "date": "2019-01-28T01:20:46.017649+00:00", "id": 10, } saved_event = {"type": "notification", "data": saved_data} fake_queue.return_value = saved_event event_manager = EventManager() for message in event_manager.subscribe(): assert message.to_dict() == saved_event assert message.__str__().startswith("event:notification\ndata:") assert len(event_manager.clients) == 1 break def test_event_manager_publish(): """Test that EventManager publishing to clients works""" saved_data = { "user_id": None, "title": "asdf", "content": "asdf", "team_id": None, "user": None, "team": None, "date": "2019-01-28T01:20:46.017649+00:00", "id": 10, } event_manager = EventManager() event_manager.clients.append(defaultdict(Queue)) event_manager.publish(data=saved_data, type="notification", channel="ctf") event = event_manager.clients[0]["ctf"].get() event = ServerSentEvent(**event) assert event.data == saved_data def test_event_endpoint_is_event_stream(): """Test that the /events endpoint is text/event-stream""" app = create_ctfd() with patch.object(Queue, "get") as fake_queue: saved_data = { "user_id": None, "title": "asdf", "content": "asdf", "team_id": None, "user": None, "team": None, "date": "2019-01-28T01:20:46.017649+00:00", "id": 10, } saved_event = {"type": "notification", "data": saved_data} fake_queue.return_value = saved_event with app.app_context(): register_user(app) with login_as_user(app) as client: r = client.get("/events") assert "text/event-stream" in r.headers["Content-Type"] destroy_ctfd(app) def test_redis_event_manager_installed(): """Test that RedisEventManager is installed on the Flask app""" class RedisConfig(TestingConfig): REDIS_URL = "redis://localhost:6379/1" CACHE_REDIS_URL = "redis://localhost:6379/1" CACHE_TYPE = "redis" try: app = create_ctfd(config=RedisConfig) except ConnectionError: print("Failed to connect to redis. Skipping test.") else: with app.app_context(): assert isinstance(app.events_manager, RedisEventManager) destroy_ctfd(app) def test_redis_event_manager_subscription(): """Test that RedisEventManager subscribing works.""" class RedisConfig(TestingConfig): REDIS_URL = "redis://localhost:6379/2" CACHE_REDIS_URL = "redis://localhost:6379/2" CACHE_TYPE = "redis" try: app = create_ctfd(config=RedisConfig) except ConnectionError: print("Failed to connect to redis. Skipping test.") else: with app.app_context(): saved_data = { u"data": { u"content": u"asdf", u"date": u"2019-01-28T05:02:19.830906+00:00", u"id": 13, u"team": None, u"team_id": None, u"title": u"asdf", u"user": None, u"user_id": None, }, u"type": u"notification", } saved_event = { "pattern": None, "type": "message", "channel": "ctf", "data": json.dumps(saved_data), } with patch.object(redis.client.PubSub, "listen") as fake_pubsub_listen: fake_pubsub_listen.return_value = [saved_event] event_manager = RedisEventManager() for message in event_manager.subscribe(): assert isinstance(message, ServerSentEvent) assert message.to_dict() == saved_data assert message.__str__().startswith("event:notification\ndata:") break destroy_ctfd(app) def test_redis_event_manager_publish(): """Test that RedisEventManager publishing to clients works.""" class RedisConfig(TestingConfig): REDIS_URL = "redis://localhost:6379/3" CACHE_REDIS_URL = "redis://localhost:6379/3" CACHE_TYPE = "redis" try: app = create_ctfd(config=RedisConfig) except ConnectionError: print("Failed to connect to redis. Skipping test.") else: with app.app_context(): saved_data = { "user_id": None, "title": "asdf", "content": "asdf", "team_id": None, "user": None, "team": None, "date": "2019-01-28T01:20:46.017649+00:00", "id": 10, } event_manager = RedisEventManager() event_manager.publish(data=saved_data, type="notification", channel="ctf") destroy_ctfd(app)