Files
CTFd/tests/utils/test_events.py
Kevin Chung 76e5ad08a8 820 python 3 only (#1454)
* Remove Python 2 specific code
* Require imports to have a proper isort-supported order
* Only test/lint on Python 3
* Bump most dependencies to latest supported version
2020-05-30 02:43:49 -04:00

196 lines
6.3 KiB
Python

import json
from collections import defaultdict
from queue import Queue
from unittest.mock import patch
import redis
from redis.exceptions import ConnectionError
from CTFd.config import TestingConfig
from CTFd.utils.events import EventManager, RedisEventManager, ServerSentEvent
from tests.helpers import create_ctfd, destroy_ctfd, login_as_user, register_user
def test_event_manager_installed():
    """Test that EventManager is installed on the Flask app.

    Builds an app with ``create_ctfd()`` using its default arguments and
    checks that ``app.events_manager`` is exactly an ``EventManager``.
    """
    app = create_ctfd()
    try:
        # The exact-type comparison is deliberate: isinstance() would also
        # accept any subclass of EventManager and so weaken this check.
        assert type(app.events_manager) is EventManager
    finally:
        # Always destroy the app, even when the assertion above fails.
        destroy_ctfd(app)
def test_event_manager_subscription():
    """Test that EventManager subscribing works.

    ``Queue.get`` is patched to return a canned notification event, so any
    ``Queue.get`` call made while iterating the subscription returns that
    event immediately. The first item yielded must be a ping and the second
    the canned notification; one client must be registered throughout.
    """
    saved_data = {
        "user_id": None,
        "title": "asdf",
        "content": "asdf",
        "team_id": None,
        "user": None,
        "team": None,
        "date": "2019-01-28T01:20:46.017649+00:00",
        "id": 10,
    }
    saved_event = {"type": "notification", "data": saved_data}

    with patch.object(Queue, "get") as fake_queue:
        fake_queue.return_value = saved_event
        event_manager = EventManager()
        events = event_manager.subscribe()

        # First item: the ping event.
        message = next(events)
        assert isinstance(message, ServerSentEvent)
        assert message.to_dict() == {"data": "", "type": "ping"}
        assert str(message).startswith("event:ping")
        assert len(event_manager.clients) == 1

        # Second item: the event handed back by the patched Queue.get.
        message = next(events)
        assert isinstance(message, ServerSentEvent)
        assert message.to_dict() == saved_event
        assert str(message).startswith("event:notification\ndata:")
        assert len(event_manager.clients) == 1
def test_event_manager_publish():
    """Test that EventManager publishing to clients works.

    A client (a ``defaultdict`` of ``Queue`` objects keyed by channel name)
    is registered by hand, an event is published on the "ctf" channel, and
    the item taken from that client's "ctf" queue is rebuilt into a
    ``ServerSentEvent`` whose ``data`` must equal what was published.
    """
    saved_data = {
        "user_id": None,
        "title": "asdf",
        "content": "asdf",
        "team_id": None,
        "user": None,
        "team": None,
        "date": "2019-01-28T01:20:46.017649+00:00",
        "id": 10,
    }

    event_manager = EventManager()
    event_manager.clients.append(defaultdict(Queue))
    event_manager.publish(data=saved_data, type="notification", channel="ctf")

    # Bounded wait: a bare get() blocks forever when nothing was enqueued
    # (the defaultdict would simply create an empty queue for "ctf"), which
    # would hang the whole run. With a timeout, queue.Empty is raised and
    # this test fails instead.
    queued = event_manager.clients[0]["ctf"].get(timeout=5)
    event = ServerSentEvent(**queued)
    assert event.data == saved_data
def test_event_endpoint_is_event_stream():
    """Test that the /events endpoint is text/event-stream.

    ``Queue.get`` is patched to return a canned notification event while a
    registered, logged-in user requests ``/events``; the response's
    ``Content-Type`` header must contain ``text/event-stream``.
    """
    saved_data = {
        "user_id": None,
        "title": "asdf",
        "content": "asdf",
        "team_id": None,
        "user": None,
        "team": None,
        "date": "2019-01-28T01:20:46.017649+00:00",
        "id": 10,
    }
    saved_event = {"type": "notification", "data": saved_data}

    app = create_ctfd()
    try:
        with patch.object(Queue, "get") as fake_queue:
            fake_queue.return_value = saved_event
            with app.app_context():
                register_user(app)
                with login_as_user(app) as client:
                    r = client.get("/events")
                    assert "text/event-stream" in r.headers["Content-Type"]
    finally:
        # Always destroy the app, even when the request or assertion fails.
        destroy_ctfd(app)
def test_redis_event_manager_installed():
    """Test that RedisEventManager is installed on the Flask app.

    The app is created with a config pointing at Redis database 1 on
    localhost. If creating the app raises ``ConnectionError`` the test
    prints a message and returns without asserting anything.
    """

    class RedisConfig(TestingConfig):
        REDIS_URL = "redis://localhost:6379/1"
        CACHE_REDIS_URL = "redis://localhost:6379/1"
        CACHE_TYPE = "redis"

    try:
        app = create_ctfd(config=RedisConfig)
    except ConnectionError:
        # NOTE: returning normally means the runner records a pass, not a
        # skip, when Redis is unreachable.
        print("Failed to connect to redis. Skipping test.")
        return

    try:
        with app.app_context():
            assert isinstance(app.events_manager, RedisEventManager)
    finally:
        # Always destroy the app, even when the assertion above fails.
        destroy_ctfd(app)
def test_redis_event_manager_subscription():
    """Test that RedisEventManager subscribing works.

    The app is created with a config pointing at Redis database 2 on
    localhost. ``redis.client.PubSub.listen`` is patched to return a single
    pub/sub message whose ``data`` is a JSON-encoded notification event.
    The first item yielded by ``subscribe()`` must be a ping and the second
    must decode back to that notification. If creating the app raises
    ``ConnectionError`` the test prints a message and returns without
    asserting anything.
    """

    class RedisConfig(TestingConfig):
        REDIS_URL = "redis://localhost:6379/2"
        CACHE_REDIS_URL = "redis://localhost:6379/2"
        CACHE_TYPE = "redis"

    try:
        app = create_ctfd(config=RedisConfig)
    except ConnectionError:
        # NOTE: returning normally means the runner records a pass, not a
        # skip, when Redis is unreachable.
        print("Failed to connect to redis. Skipping test.")
        return

    try:
        with app.app_context():
            saved_data = {
                "data": {
                    "content": "asdf",
                    "date": "2019-01-28T05:02:19.830906+00:00",
                    "id": 13,
                    "team": None,
                    "team_id": None,
                    "title": "asdf",
                    "user": None,
                    "user_id": None,
                },
                "type": "notification",
            }
            # The pub/sub message that the patched listen() hands back.
            saved_event = {
                "pattern": None,
                "type": "message",
                "channel": "ctf",
                "data": json.dumps(saved_data),
            }
            with patch.object(redis.client.PubSub, "listen") as fake_pubsub_listen:
                fake_pubsub_listen.return_value = [saved_event]
                event_manager = RedisEventManager()
                events = event_manager.subscribe()

                # First item: the ping event.
                message = next(events)
                assert isinstance(message, ServerSentEvent)
                assert message.to_dict() == {"data": "", "type": "ping"}
                assert str(message).startswith("event:ping")

                # Second item: the decoded notification.
                message = next(events)
                assert isinstance(message, ServerSentEvent)
                assert message.to_dict() == saved_data
                assert str(message).startswith("event:notification\ndata:")
    finally:
        # Always destroy the app, even when an assertion above fails.
        destroy_ctfd(app)
def test_redis_event_manager_publish():
    """Test that RedisEventManager publishing to clients works.

    The app is created with a config pointing at Redis database 3 on
    localhost, then a notification is published on the "ctf" channel. The
    test makes no assertion on the result; it only requires that
    ``publish()`` does not raise. If creating the app raises
    ``ConnectionError`` the test prints a message and returns.
    """

    class RedisConfig(TestingConfig):
        REDIS_URL = "redis://localhost:6379/3"
        CACHE_REDIS_URL = "redis://localhost:6379/3"
        CACHE_TYPE = "redis"

    try:
        app = create_ctfd(config=RedisConfig)
    except ConnectionError:
        # NOTE: returning normally means the runner records a pass, not a
        # skip, when Redis is unreachable.
        print("Failed to connect to redis. Skipping test.")
        return

    try:
        with app.app_context():
            saved_data = {
                "user_id": None,
                "title": "asdf",
                "content": "asdf",
                "team_id": None,
                "user": None,
                "team": None,
                "date": "2019-01-28T01:20:46.017649+00:00",
                "id": 10,
            }
            event_manager = RedisEventManager()
            event_manager.publish(data=saved_data, type="notification", channel="ctf")
    finally:
        # Always destroy the app, even when publish() raises.
        destroy_ctfd(app)