Remove BfxWebsocketClient::once. Replace all occurrences with BfxWebsocketClient::on. Add BfxWebsocketClient::ONCE_EVENTS class variable.

This commit is contained in:
Davide Casale
2023-04-19 04:20:53 +02:00
parent 3441d2af2f
commit 3d9e7c7b25
4 changed files with 15 additions and 27 deletions

View File

@@ -55,8 +55,10 @@ class BfxWebsocketClient:
MAXIMUM_CONNECTIONS_AMOUNT = 20 MAXIMUM_CONNECTIONS_AMOUNT = 20
ONCE_EVENTS = [ "open", "authenticated", "disconnection" ]
EVENTS = [ EVENTS = [
"open", "subscribed", "authenticated", "disconnection", "wss-error", *ONCE_EVENTS, "subscribed", "wss-error",
*PublicChannelsHandler.EVENTS, *PublicChannelsHandler.EVENTS,
*AuthenticatedChannelsHandler.EVENTS *AuthenticatedChannelsHandler.EVENTS
] ]
@@ -110,9 +112,6 @@ class BfxWebsocketClient:
def _on_wss_timeout(): def _on_wss_timeout():
on_timeout_event.set() on_timeout_event.set()
raise ReconnectionTimeoutError("Connection has been offline for too long " \
f"without being able to reconnect (wss_timeout: {self.wss_timeout}s).")
async def _connection(): async def _connection():
nonlocal reconnection, timer, tasks nonlocal reconnection, timer, tasks
@@ -173,7 +172,8 @@ class BfxWebsocketClient:
await asyncio.sleep(delay.next()) await asyncio.sleep(delay.next())
if on_timeout_event.is_set(): if on_timeout_event.is_set():
break raise ReconnectionTimeoutError("Connection has been offline for too long " \
f"without being able to reconnect (wss_timeout: {self.wss_timeout}s).")
try: try:
await _connection() await _connection()
@@ -259,30 +259,18 @@ class BfxWebsocketClient:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \ raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \
"of available events print BfxWebsocketClient.EVENTS") "of available events print BfxWebsocketClient.EVENTS")
def _register_event(event, function):
if event in BfxWebsocketClient.ONCE_EVENTS:
self.event_emitter.once(event, function)
else: self.event_emitter.on(event, function)
if callback is not None: if callback is not None:
for event in events: for event in events:
self.event_emitter.on(event, callback) _register_event(event, callback)
if callback is None: if callback is None:
def handler(function): def handler(function):
for event in events: for event in events:
self.event_emitter.on(event, function) _register_event(event, function)
return handler
def once(self, *events, callback = None):
for event in events:
if event not in BfxWebsocketClient.EVENTS:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \
"of available events print BfxWebsocketClient.EVENTS")
if callback is not None:
for event in events:
self.event_emitter.once(event, callback)
if callback is None:
def handler(function):
for event in events:
self.event_emitter.once(event, function)
return handler return handler

View File

@@ -16,7 +16,7 @@ def on_derivatives_status_update(subscription: subscriptions.Status, data: Deriv
def on_wss_error(code: Error, msg: str): def on_wss_error(code: Error, msg: str):
print(code, msg) print(code, msg)
@bfx.wss.once("open") @bfx.wss.on("open")
async def on_open(): async def on_open():
await bfx.wss.subscribe(Channel.STATUS, key="deriv:tBTCF0:USTF0") await bfx.wss.subscribe(Channel.STATUS, key="deriv:tBTCF0:USTF0")

View File

@@ -14,7 +14,7 @@ def on_t_ticker_update(subscription: subscriptions.Ticker, data: TradingPairTick
print(f"Data: {data}") print(f"Data: {data}")
@bfx.wss.once("open") @bfx.wss.on("open")
async def on_open(): async def on_open():
await bfx.wss.subscribe(Channel.TICKER, symbol="tBTCUSD") await bfx.wss.subscribe(Channel.TICKER, symbol="tBTCUSD")

View File

@@ -20,7 +20,7 @@ def on_t_trade_executed(_sub: subscriptions.Trades, trade: TradingPairTrade):
def on_wss_error(code: Error, msg: str): def on_wss_error(code: Error, msg: str):
print(code, msg) print(code, msg)
@bfx.wss.once("open") @bfx.wss.on("open")
async def on_open(): async def on_open():
await bfx.wss.subscribe(Channel.CANDLES, key="trade:1m:tBTCUSD") await bfx.wss.subscribe(Channel.CANDLES, key="trade:1m:tBTCUSD")