From ac20a7530a37a10d1d28f5f3ca42df078c912334 Mon Sep 17 00:00:00 2001 From: Jacob Plaster Date: Fri, 19 Jul 2019 14:32:56 +0700 Subject: [PATCH] websocket: add subscriptions to status channel functionality --- bfxapi/models/subscription.py | 6 +-- bfxapi/websockets/bfx_websocket.py | 47 +++++++++++++++++++++++ bfxapi/websockets/subscription_manager.py | 4 +- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/bfxapi/models/subscription.py b/bfxapi/models/subscription.py index 19b5158..3db29de 100644 --- a/bfxapi/models/subscription.py +++ b/bfxapi/models/subscription.py @@ -21,13 +21,13 @@ class Subscription: such as unsibscribe and subscribe. """ - def __init__(self, socket, channel_name, symbol, timeframe=None, **kwargs): + def __init__(self, socket, channel_name, symbol, key=None, timeframe=None, **kwargs): self.socket = socket self.channel_name = channel_name self.symbol = symbol self.timeframe = timeframe self.is_subscribed_bool = False - self.key = None + self.key = key self.chan_id = None if timeframe: self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol) @@ -79,7 +79,7 @@ class Subscription: def _generate_payload(self, **kwargs): payload = {'event': 'subscribe', 'channel': self.channel_name, 'symbol': self.symbol} - if self.timeframe: + if self.timeframe or self.key: payload['key'] = self.key payload.update(**kwargs) return payload diff --git a/bfxapi/websockets/bfx_websocket.py b/bfxapi/websockets/bfx_websocket.py index 9bf2edd..f1e548f 100644 --- a/bfxapi/websockets/bfx_websocket.py +++ b/bfxapi/websockets/bfx_websocket.py @@ -65,6 +65,23 @@ def _parse_trade(tData, symbol): 'symbol': symbol } +def _parse_deriv_status_update(sData, symbol): + return { + 'symbol': symbol, + 'status_type': 'deriv', + 'mts': sData[0], + # placeholder + 'deriv_price': sData[2], + 'spot_price': sData[3], + # placeholder + 'insurance_fund_balance': sData[5], + # placeholder + # placeholder + 'funding_accrued': sData[8], + 'funding_step': sData[9], + # placeholder + } + class BfxWebsocket(GenericWebsocket): """ @@ -167,6 +184,8 @@ class BfxWebsocket(GenericWebsocket): await self._order_book_handler(data, raw_message_str) if subscription.channel_name == 'trades': await self._trade_handler(data) + if subscription.channel_name == 'status': + await self._status_handler(data) else: self.logger.warn( "Unknown data event: '{}' {}".format(dataEvent, data)) @@ -293,6 +312,18 @@ class BfxWebsocket(GenericWebsocket): self._emit('funding_credit_snapshot', data[2]) self.logger.info("Funding credit snapshot: {}".format(data)) + async def _status_handler(self, data): + sub = self.subscriptionManager.get(data[0]) + symbol = sub.symbol + status_type = sub.key.split(":")[0] + rstatus = data[1] + if status_type == "deriv": + status = _parse_deriv_status_update(rstatus, symbol) + if status: + self._emit('status_update', status) + else: + self.logger.warn('Unknown status data type: {}'.format(data)) + async def _trade_handler(self, data): symbol = self.subscriptionManager.get(data[0]).symbol if type(data[1]) is list: @@ -439,6 +470,22 @@ class BfxWebsocket(GenericWebsocket): if socket.isConnected: await socket.ws.send(json.dumps(payload)) + async def subscribe_order_book(self, symbol): + return await self.subscribe('book', symbol) + + async def subscribe_candles(self, symbol, timeframe): + return await self.subscribe('candles', symbol, timeframe=timeframe) + + async def subscribe_trades(self, symbol): + return await self.subscribe('trades', symbol) + + async def subscribe_ticker(self, symbol): + return await self.subscribe('ticker', symbol) + + async def subscribe_derivative_status(self, symbol): + key = 'deriv:{}'.format(symbol) + return await self.subscribe('status', symbol, key=key) + async def subscribe(self, *args, **kwargs): return await self.subscriptionManager.subscribe(*args, **kwargs) diff --git a/bfxapi/websockets/subscription_manager.py b/bfxapi/websockets/subscription_manager.py index 542a941..0a342f9 100644 --- a/bfxapi/websockets/subscription_manager.py +++ b/bfxapi/websockets/subscription_manager.py @@ -32,7 +32,7 @@ class SubscriptionManager: count += 1 return count - async def subscribe(self, channel_name, symbol, timeframe=None, **kwargs): + async def subscribe(self, channel_name, symbol, key=None, timeframe=None, **kwargs): """ Subscribe to a new channel @@ -51,7 +51,7 @@ class SubscriptionManager: socket = self.bfxapi.get_most_available_socket() # create a new subscription subscription = Subscription( - socket, channel_name, symbol, timeframe, **kwargs) + socket, channel_name, symbol, key, timeframe, **kwargs) self.logger.info("Subscribing to channel {}".format(channel_name)) self.pending_subscriptions[subscription.get_key()] = subscription