Add bfxapi/websocket/errors.py script. Add __require_websocket_connection decorator inside BfxWebsocketClient class. Implement unsubscribe and clear methods.

This commit is contained in:
Davide Casale
2022-11-08 17:46:45 +01:00
parent 5dfe3d6b13
commit a37b7dda33
3 changed files with 37 additions and 9 deletions

View File

@@ -3,8 +3,8 @@ import json, asyncio, websockets
from pyee.asyncio import AsyncIOEventEmitter from pyee.asyncio import AsyncIOEventEmitter
from .manager import Manager from .manager import Manager
from .channels import Channels from .channels import Channels
from .errors import ConnectionNotOpen
class BfxWebsocketClient(object): class BfxWebsocketClient(object):
def __init__(self, host, channels=None): def __init__(self, host, channels=None):
@@ -34,23 +34,44 @@ class BfxWebsocketClient(object):
self.chanIds[message["chanId"]] = message self.chanIds[message["chanId"]] = message
self.event_emitter.emit("subscribed", message) self.event_emitter.emit("subscribed", message)
if isinstance(message, list): elif isinstance(message, dict) and message["event"] == "unsubscribed":
if message["status"] == "OK":
del self.chanIds[message["chanId"]]
elif isinstance(message, list):
chanId, parameters = message[0], message[1:] chanId, parameters = message[0], message[1:]
subscription = self.chanIds[chanId] self.manager.handle(self.chanIds[chanId], *parameters)
self.manager.handle(subscription, *parameters)
except websockets.ConnectionClosed: except websockets.ConnectionClosed:
continue continue
async def subscribe(self, channel, **kwargs): def __require_websocket_connection(function):
if self.websocket == None: async def wrapper(self, *args, **kwargs):
return self.channels.append((channel, kwargs)) if self.websocket == None or self.websocket.open == False:
raise ConnectionNotOpen("No open connection with the server.")
await function(self, *args, **kwargs)
return wrapper
@__require_websocket_connection
async def subscribe(self, channel, **kwargs):
await self.websocket.send(json.dumps({ await self.websocket.send(json.dumps({
"event": "subscribe", "event": "subscribe",
"channel": channel, "channel": channel,
**kwargs **kwargs
})) }))
@__require_websocket_connection
async def unsubscribe(self, chanId):
await self.websocket.send(json.dumps({
"event": "unsubscribe",
"chanId": chanId
}))
async def clear(self):
for chanId in self.chanIds.keys():
await self.unsubscribe(chanId)
def on(self, event): def on(self, event):
def handler(function): def handler(function):
self.event_emitter.on(event, function) self.event_emitter.on(event, function)

View File

@@ -1,3 +1,3 @@
from .BfxWebsocketClient import BfxWebsocketClient from .BfxWebsocketClient import BfxWebsocketClient
from .channels import Channels from .channels import Channels
from .errors import ConnectionNotOpen

View File

@@ -0,0 +1,7 @@
class ConnectionNotOpen(Exception):
"""
This error indicates an attempt to communicate via websocket before starting the connection with the servers.
"""
pass