From 50644e811643c8767ab9f211bffa8e716c5a2bac Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Wed, 9 Nov 2022 19:13:48 +0100 Subject: [PATCH] Rename manager.py to handlers.py. Add code to BfxWebsocketClient.py to handle authenticated channel. Update bfxapi/websocket/__init__.py imports. --- bfxapi/websocket/BfxWebsocketClient.py | 36 ++++++++++---------- bfxapi/websocket/__init__.py | 4 +-- bfxapi/websocket/channels.py | 8 ----- bfxapi/websocket/{manager.py => handlers.py} | 28 ++++++++++++--- 4 files changed, 43 insertions(+), 33 deletions(-) delete mode 100644 bfxapi/websocket/channels.py rename bfxapi/websocket/{manager.py => handlers.py} (71%) diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/BfxWebsocketClient.py index 3b8ee70..1ac041e 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/BfxWebsocketClient.py @@ -2,20 +2,22 @@ import json, asyncio, hmac, hashlib, time, websockets from pyee.asyncio import AsyncIOEventEmitter -from .manager import Manager +from .handlers import Channels, PublicChannelsHandler, AuthenticatedEventsHandler from .errors import ConnectionNotOpen, AuthenticationCredentialsError +HEARTBEAT = "hb" + class BfxWebsocketClient(object): def __init__(self, host, API_KEY=None, API_SECRET=None): self.host, self.chanIds, self.event_emitter = host, dict(), AsyncIOEventEmitter() - self.manager, self.websocket = Manager(event_emitter=self.event_emitter), None + self.websocket, self.API_KEY, self.API_SECRET = None, API_KEY, API_SECRET - self.API_KEY, self.API_SECRET = API_KEY, API_SECRET - - def run_forever(self): - asyncio.run(self.connect()) + self.handlers = { + "public": PublicChannelsHandler(event_emitter=self.event_emitter), + "authenticated": AuthenticatedEventsHandler(event_emitter=self.event_emitter) + } async def connect(self): async for websocket in websockets.connect(self.host): @@ -25,31 +27,26 @@ class BfxWebsocketClient(object): self.event_emitter.emit("open") if self.API_KEY != None and self.API_SECRET != None: - self.authenticate(self.API_KEY, self.API_SECRET) + await self.authenticate(self.API_KEY, self.API_SECRET) async for message in websocket: message = json.loads(message) if isinstance(message, dict) and message["event"] == "subscribed": - del message["event"] self.chanIds[message["chanId"]] = message - self.event_emitter.emit("subscribed", message) + self.event_emitter.emit("subscribed", message) elif isinstance(message, dict) and message["event"] == "unsubscribed": if message["status"] == "OK": del self.chanIds[message["chanId"]] - elif isinstance(message, dict) and message["event"] == "auth": if message["status"] == "OK": - self.chanIds[message["chanId"]] = message - self.event_emitter.emit("authenticated", message) else: raise AuthenticationCredentialsError("Cannot authenticate with given API-KEY and API-SECRET.") - - elif isinstance(message, list): - chanId, parameters = message[0], message[1:] - - self.manager.handle(self.chanIds[chanId], *parameters) + elif isinstance(message, list) and (chanId := message[0]) and message[1] != HEARTBEAT: + if chanId == 0: + self.handlers["authenticated"].handle(message[1], *message[2:]) + else: self.handlers["public"].handle(self.chanIds[chanId], *message[1:]) except websockets.ConnectionClosed: continue @@ -108,4 +105,7 @@ class BfxWebsocketClient(object): def handler(function): self.event_emitter.once(event, function) - return handler \ No newline at end of file + return handler + + def run(self): + asyncio.run(self.connect()) \ No newline at end of file diff --git a/bfxapi/websocket/__init__.py b/bfxapi/websocket/__init__.py index ed83944..e18ac12 100644 --- a/bfxapi/websocket/__init__.py +++ b/bfxapi/websocket/__init__.py @@ -1,3 +1,3 @@ from .BfxWebsocketClient import BfxWebsocketClient -from .channels import Channels -from .errors import ConnectionNotOpen \ No newline at end of file +from .handlers import Channels +from .errors import ConnectionNotOpen, AuthenticationCredentialsError \ No newline at end of file diff --git a/bfxapi/websocket/channels.py b/bfxapi/websocket/channels.py deleted file mode 100644 index 45b3d8e..0000000 --- a/bfxapi/websocket/channels.py +++ /dev/null @@ -1,8 +0,0 @@ -from enum import Enum - -class Channels(str, Enum): - TICKER = "ticker" - TRADES = "trades" - BOOK = "book" - CANDLES = "candles" - STATUS = "status" \ No newline at end of file diff --git a/bfxapi/websocket/manager.py b/bfxapi/websocket/handlers.py similarity index 71% rename from bfxapi/websocket/manager.py rename to bfxapi/websocket/handlers.py index 6b57549..450b277 100644 --- a/bfxapi/websocket/manager.py +++ b/bfxapi/websocket/handlers.py @@ -1,8 +1,13 @@ -from .channels import Channels +from enum import Enum -HEARTBEAT = "hb" +class Channels(str, Enum): + TICKER = "ticker" + TRADES = "trades" + BOOK = "book" + CANDLES = "candles" + STATUS = "status" -class Manager(object): +class PublicChannelsHandler(object): def __init__(self, event_emitter): self.event_emitter = event_emitter @@ -15,8 +20,7 @@ class Manager(object): } def handle(self, subscription, *parameters): - if parameters[0] != HEARTBEAT: - self.__handlers[subscription["channel"]](subscription, *parameters) + self.__handlers[subscription["channel"]](subscription, *parameters) def __ticker_channel_handler(self, subscription, *parameters): self.event_emitter.emit("ticker", subscription, parameters[0]) @@ -40,3 +44,17 @@ class Manager(object): def __status_channel_handler(self, subscription, *parameters): self.event_emitter.emit("status", subscription, parameters[0]) + +class AuthenticatedEventsHandler(object): + def __init__(self, event_emitter): + self.event_emitter = event_emitter + + self.__handlers = { + "bu": self.__bu_event_handler + } + + def handle(self, type, *parameters): + self.__handlers[type](*parameters) + + def __bu_event_handler(self, AUM, AUM_NET): + self.event_emitter.emit("balance_update", AUM, AUM_NET) \ No newline at end of file