Files
CTFd/tests/utils/test_events.py
Kevin Chung e0290cc67b Notifications improvements (#2166)
* Improve event `ping`s to actually include data so that they show up in devtools
* Improve Event publishers to take an `id` parameter that is sent to the browser
* Add a `since_id` parameter to `GET /api/v1/notifications` to get Notifications that have happened since a specific ID
* Add `HEAD /api/v1/notifications` to get a count of notifications that have happened. This also includes a `since_id` parameter to allow for a notification cursor.
2022-08-15 17:35:01 -04:00

251 lines
8.2 KiB
Python

from collections import defaultdict
from queue import Queue
from unittest.mock import patch
from redis.exceptions import ConnectionError
from CTFd.config import TestingConfig
from CTFd.utils.events import EventManager, RedisEventManager, ServerSentEvent
from tests.helpers import create_ctfd, destroy_ctfd, login_as_user, register_user
def test_event_manager_installed():
"""Test that EventManager is installed on the Flask app"""
app = create_ctfd()
assert type(app.events_manager) == EventManager
destroy_ctfd(app)
def test_event_manager_subscription():
"""Test that EventManager subscribing works"""
with patch.object(Queue, "get") as fake_queue:
saved_data = {
"user_id": None,
"title": "asdf",
"content": "asdf",
"team_id": None,
"user": None,
"team": None,
"date": "2019-01-28T01:20:46.017649+00:00",
"id": 10,
}
saved_event = {"type": "notification", "data": saved_data}
fake_queue.return_value = saved_event
event_manager = EventManager()
events = event_manager.subscribe()
message = next(events)
assert isinstance(message, ServerSentEvent)
assert message.to_dict() == {"data": "ping", "type": "ping"}
assert message.__str__().startswith("event:ping")
assert len(event_manager.clients) == 1
message = next(events)
assert isinstance(message, ServerSentEvent)
assert message.to_dict() == saved_event
assert message.__str__().startswith("event:notification\ndata:")
assert len(event_manager.clients) == 1
def test_event_manager_publish():
"""Test that EventManager publishing to clients works"""
saved_data = {
"user_id": None,
"title": "asdf",
"content": "asdf",
"team_id": None,
"user": None,
"team": None,
"date": "2019-01-28T01:20:46.017649+00:00",
"id": 10,
}
event_manager = EventManager()
q = defaultdict(Queue)
event_manager.clients[id(q)] = q
event_manager.publish(data=saved_data, type="notification", channel="ctf")
event = event_manager.clients[id(q)]["ctf"].get()
event = ServerSentEvent(**event)
assert event.data == saved_data
def test_event_endpoint_is_event_stream():
"""Test that the /events endpoint is text/event-stream"""
app = create_ctfd()
with patch.object(Queue, "get") as fake_queue:
saved_data = {
"user_id": None,
"title": "asdf",
"content": "asdf",
"team_id": None,
"user": None,
"team": None,
"date": "2019-01-28T01:20:46.017649+00:00",
"id": 10,
}
saved_event = {"type": "notification", "data": saved_data}
fake_queue.return_value = saved_event
with app.app_context():
register_user(app)
with login_as_user(app) as client:
r = client.get("/events")
assert "text/event-stream" in r.headers["Content-Type"]
destroy_ctfd(app)
def test_redis_event_manager_installed():
"""Test that RedisEventManager is installed on the Flask app"""
class RedisConfig(TestingConfig):
REDIS_URL = "redis://localhost:6379/1"
CACHE_REDIS_URL = "redis://localhost:6379/1"
CACHE_TYPE = "redis"
try:
app = create_ctfd(config=RedisConfig)
except ConnectionError:
print("Failed to connect to redis. Skipping test.")
else:
with app.app_context():
assert isinstance(app.events_manager, RedisEventManager)
destroy_ctfd(app)
def test_redis_event_manager_subscription():
"""Test that RedisEventManager subscribing works."""
class RedisConfig(TestingConfig):
REDIS_URL = "redis://localhost:6379/2"
CACHE_REDIS_URL = "redis://localhost:6379/2"
CACHE_TYPE = "redis"
try:
app = create_ctfd(config=RedisConfig)
except ConnectionError:
print("Failed to connect to redis. Skipping test.")
else:
with app.app_context():
saved_data = {
"user_id": None,
"title": "asdf",
"content": "asdf",
"team_id": None,
"user": None,
"team": None,
"date": "2019-01-28T01:20:46.017649+00:00",
"id": 10,
}
saved_event = {"type": "notification", "data": saved_data}
with patch.object(Queue, "get") as fake_queue:
fake_queue.return_value = saved_event
event_manager = RedisEventManager()
events = event_manager.subscribe()
message = next(events)
assert isinstance(message, ServerSentEvent)
assert message.to_dict() == {"data": "ping", "type": "ping"}
assert message.__str__().startswith("event:ping")
message = next(events)
assert isinstance(message, ServerSentEvent)
assert message.to_dict() == saved_event
assert message.__str__().startswith("event:notification\ndata:")
destroy_ctfd(app)
def test_redis_event_manager_publish():
"""Test that RedisEventManager publishing to clients works."""
class RedisConfig(TestingConfig):
REDIS_URL = "redis://localhost:6379/3"
CACHE_REDIS_URL = "redis://localhost:6379/3"
CACHE_TYPE = "redis"
try:
app = create_ctfd(config=RedisConfig)
except ConnectionError:
print("Failed to connect to redis. Skipping test.")
else:
with app.app_context():
saved_data = {
"user_id": None,
"title": "asdf",
"content": "asdf",
"team_id": None,
"user": None,
"team": None,
"date": "2019-01-28T01:20:46.017649+00:00",
"id": 10,
}
event_manager = RedisEventManager()
event_manager.publish(data=saved_data, type="notification", channel="ctf")
destroy_ctfd(app)
def test_redis_event_manager_listen():
"""Test that RedisEventManager listening pubsub works."""
# This test is nob currently working properly
# This test is sort of incomplete b/c we aren't also subscribing
# I wasnt able to get listening and subscribing to work at the same time
# But the code does work under gunicorn and serve.py
try:
# import importlib
# from gevent.monkey import patch_time, patch_socket
# from gevent import Timeout
# patch_time()
# patch_socket()
class RedisConfig(TestingConfig):
REDIS_URL = "redis://localhost:6379/4"
CACHE_REDIS_URL = "redis://localhost:6379/4"
CACHE_TYPE = "redis"
try:
app = create_ctfd(config=RedisConfig)
except ConnectionError:
print("Failed to connect to redis. Skipping test.")
else:
with app.app_context():
# saved_event = {
# "data": {
# "team_id": None,
# "user_id": None,
# "content": "asdf",
# "title": "asdf",
# "id": 1,
# "team": None,
# "user": None,
# "date": "2020-08-31T23:57:27.193081+00:00",
# "type": "toast",
# "sound": None,
# },
# "type": "notification",
# }
event_manager = RedisEventManager()
# def disable_retry(f, *args, **kwargs):
# return f()
# with patch("tenacity.retry", side_effect=disable_retry):
# with Timeout(10):
# event_manager.listen()
event_manager.listen()
# event_manager.publish(
# data=saved_event["data"], type="notification", channel="ctf"
# )
destroy_ctfd(app)
finally:
pass
# import socket
# import time
# importlib.reload(socket)
# importlib.reload(time)