diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/BfxWebsocketClient.py index 587576d..5d5d033 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/BfxWebsocketClient.py @@ -2,13 +2,15 @@ import json, asyncio, websockets from pyee.asyncio import AsyncIOEventEmitter +from .manager import Manager + from .channels import Channels class BfxWebsocketClient(object): def __init__(self, host, channels=None): - self.host = host + self.host, self.chanIds, self.event_emitter = host, dict(), AsyncIOEventEmitter() - self.chanIds, self.event_emitter = dict(), AsyncIOEventEmitter() + self.manager, self.websocket = Manager(event_emitter=self.event_emitter), None self.channels = channels or list() @@ -28,33 +30,21 @@ class BfxWebsocketClient(object): message = json.loads(message) if isinstance(message, dict) and message["event"] == "subscribed": + del message["event"] self.chanIds[message["chanId"]] = message - self.event_emitter.emit("subscribed", message) if isinstance(message, list): chanId, parameters = message[0], message[1:] - subscription = self.chanIds[chanId] - - if subscription["channel"] == Channels.TICKER: - self.event_emitter.emit("ticker", subscription, parameters[0]) - - if subscription["channel"] == Channels.TRADES: - if len(parameters) == 1: - self.event_emitter.emit("trades_snapshot", subscription, parameters[0]) - - if len(parameters) == 2: - self.event_emitter.emit("trades_update", subscription, parameters[0], parameters[1]) - - if subscription["channel"] == Channels.BOOK: - if all(isinstance(element, list) for element in parameters[0]): - self.event_emitter.emit("book_snapshot", subscription, parameters[0]) - else: self.event_emitter.emit("book_update", subscription, parameters[0]) + self.manager.handle(subscription, *parameters) except websockets.ConnectionClosed: continue async def subscribe(self, channel, **kwargs): + if self.websocket == None: + return self.channels.append((channel, kwargs)) + await self.websocket.send(json.dumps({ "event": "subscribe", "channel": channel, @@ -65,4 +55,4 @@ class BfxWebsocketClient(object): def handler(function): self.event_emitter.on(event, function) - return handler + return handler \ No newline at end of file diff --git a/bfxapi/websocket/manager.py b/bfxapi/websocket/manager.py new file mode 100644 index 0000000..3e3e9cd --- /dev/null +++ b/bfxapi/websocket/manager.py @@ -0,0 +1,29 @@ +from .channels import Channels + +class Manager(object): + def __init__(self, event_emitter): + self.event_emitter = event_emitter + + self.__handlers = { + Channels.TICKER: self.__ticker_channel_handler, + Channels.TRADES: self.__trades_channel_handler, + Channels.BOOK: self.__book_channel_handler + } + + def handle(self, subscription, *parameters): + return self.__handlers[subscription["channel"]](subscription, *parameters) + + def __ticker_channel_handler(self, subscription, *parameters): + self.event_emitter.emit("ticker", subscription, parameters[0]) + + def __trades_channel_handler(self, subscription, *parameters): + if len(parameters) == 1: + self.event_emitter.emit("trades_snapshot", subscription, parameters[0]) + + if len(parameters) == 2: + self.event_emitter.emit("trades_update", subscription, parameters[0], parameters[1]) + + def __book_channel_handler(self, subscription, *parameters): + if all(isinstance(element, list) for element in parameters[0]): + self.event_emitter.emit("book_snapshot", subscription, parameters[0]) + else: self.event_emitter.emit("book_update", subscription, parameters[0]) \ No newline at end of file