From 0add7426afdbbb326f5cf7c9d2898604b8c26b2f Mon Sep 17 00:00:00 2001 From: Jacob Plaster Date: Tue, 27 Nov 2018 15:12:09 +0000 Subject: [PATCH] Adds subscription manager to handle channels --- README.md | 2 + bfxapi/websockets/BfxWebsocket.py | 96 +++++++-------- bfxapi/websockets/GenericWebsocket.py | 1 + bfxapi/websockets/SubscriptionManager.py | 144 +++++++++++++++++++++++ 4 files changed, 196 insertions(+), 47 deletions(-) create mode 100644 bfxapi/websockets/SubscriptionManager.py diff --git a/README.md b/README.md index d4a48fd..33aea98 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,8 @@ The websocket exposes a collection of events that are triggered when certain dat - `funding_info_updates` (array): new funding information has been broadcasted - `order_book_snapshot` (array): initial snapshot of the order book on connection - `order_book_update` (array): a new order has been placed into the ordebrook +- `subscribed` (Subscription): a new channel has been subscribed to +- `unsubscribed` (Subscription): a channel has been un-subscribed For example. If you wanted to subscribe to all of the trades on the `tBTCUSD` market, then you can simply listen to the `new_trade` event. For Example: diff --git a/bfxapi/websockets/BfxWebsocket.py b/bfxapi/websockets/BfxWebsocket.py index dca1fac..f8cb0a8 100644 --- a/bfxapi/websockets/BfxWebsocket.py +++ b/bfxapi/websockets/BfxWebsocket.py @@ -6,6 +6,7 @@ import hmac import random from .GenericWebsocket import GenericWebsocket, AuthError +from .SubscriptionManager import SubscriptionManager from ..models import Order, Trade, OrderBook class Flags: @@ -133,7 +134,6 @@ class BfxWebsocket(GenericWebsocket): def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api.bitfinex.com/ws/2', onSeedCandleHook=None, onSeedTradeHook=None, manageOrderBooks=False, *args, **kwargs): - self.channels = {} self.API_KEY=API_KEY self.API_SECRET=API_SECRET self.manageOrderBooks = manageOrderBooks @@ -141,6 +141,7 @@ class BfxWebsocket(GenericWebsocket): self.orderBooks = {} super(BfxWebsocket, self).__init__(host, *args, **kwargs) + self.subscriptionManager = SubscriptionManager(self) self._WS_DATA_HANDLERS = { 'tu': self._trade_update_handler, @@ -165,6 +166,7 @@ class BfxWebsocket(GenericWebsocket): self._WS_SYSTEM_HANDLERS = { 'info': self._system_info_handler, 'subscribed': self._system_subscribed_handler, + 'unsubscribed': self._system_unsubscribe_handler, 'error': self._system_error_handler, 'auth': self._system_auth_handler, 'conf': self._system_conf_handler @@ -183,14 +185,15 @@ class BfxWebsocket(GenericWebsocket): if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS: return await self._WS_DATA_HANDLERS[dataEvent](data) - elif chanId in self.channels: + elif self.subscriptionManager.is_subscribed(chanId): + subscription = self.subscriptionManager.get(chanId) # candles do not have an event - if self.channels[chanId].get('channel') == 'candles': + if subscription.channel_name == 'candles': await self._candle_handler(data) - if self.channels[chanId].get('channel') == 'book': + if subscription.channel_name == 'book': await self._order_book_handler(data) else: - self.logger.warn("Unknow data event: '{}' {}".format(dataEvent, data)) + self.logger.warn("Unknown data event: '{}' {}".format(dataEvent, data)) async def _system_info_handler(self, data): self.logger.info(data) @@ -211,18 +214,10 @@ class BfxWebsocket(GenericWebsocket): self.logger.error("Unable to enable config flag {}".format(flagString)) async def _system_subscribed_handler(self, data): - chanEvent = data.get('channel') - self.logger.info("Subscribed to channel '{}'".format(chanEvent)) - ## add channel to known list - chanId = data.get('chanId') - ## if is a candles subscribption, then get the symbol - ## from the key - if data.get('key'): - kd = data.get('key').split(':') - data['tf'] = kd[1] - data['symbol'] = kd[2] + await self.subscriptionManager.confirm_subscription(data) - self.channels[chanId] = data + async def _system_unsubscribe_handler(self, data): + await self.subscriptionManager.confirm_unsubscribe(data) async def _system_error_handler(self, data): self._emit('error', data) @@ -237,17 +232,17 @@ class BfxWebsocket(GenericWebsocket): async def _trade_update_handler(self, data): tData = data[2] # [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]] - if data[0] in self.channels: - channelData = self.channels[data[0]] - tradeObj = _parse_trade(tData, channelData.get('symbol')) + if self.subscriptionManager.is_subscribed(data[0]): + symbol = self.subscriptionManager.get(data[0]).symbol + tradeObj = _parse_trade(tData, symbol) self._emit('new_trade', tradeObj) async def _trade_executed_handler(self, data): tData = data[2] # [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]] - if data[0] in self.channels: - channelData = self.channels[data[0]] - tradeObj = _parse_trade(tData, channelData.get('symbol')) + if self.subscriptionManager.is_subscribed(data[0]): + symbol = self.subscriptionManager.get(data[0]).symbol + tradeObj = _parse_trade(tData, symbol) self._emit('new_trade', tradeObj) async def _wallet_update_handler(self, data): @@ -362,7 +357,7 @@ class BfxWebsocket(GenericWebsocket): self.logger.info("Funding credit snapshot: {}".format(data)) async def _trade_handler(self, data): - channelData = self.channels[data[0]] + symbol = self.subscriptionManager.get(data[0]).symbol if type(data[1]) is list: data = data[1] # Process the batch of seed trades on @@ -373,44 +368,45 @@ class BfxWebsocket(GenericWebsocket): 'mts': t[1], 'price': t[2], 'amount': t[3], - 'symbol': channelData['symbol'] + 'symbol': symbol } self._emit('seed_trade', trade) else: - tradeObj = _parse_trade_snapshot_item(data, channelData['symbol']) + tradeObj = _parse_trade_snapshot_item(data, symbol) self._emit('new_trade', tradeObj) async def _candle_handler(self, data): - chanId = data[0] - channelData = self.channels[chanId] + subscription = self.subscriptionManager.get(data[0]) if type(data[1][0]) is list: # Process the batch of seed candles on # websocket subscription candlesSnapshot = data[1] candlesSnapshot.reverse() for c in candlesSnapshot: - candle = _parse_candle(c, channelData['symbol'], channelData['tf']) + candle = _parse_candle(c, subscription.symbol, subscription.timeframe) self._emit('seed_candle', candle) else: - candle = _parse_candle(data[1], channelData['symbol'], channelData['tf']) + candle = _parse_candle(data[1], subscription.symbol, subscription.timeframe) self._emit('new_candle', candle) async def _order_book_handler(self, data): obInfo = data[1] - channelData = self.channels[data[0]] - symbol = channelData.get('symbol') + chanId = data[0] + subscription = self.subscriptionManager.get(data[0]) + symbol = subscription.symbol if data[1] == "cs": dChecksum = data[2] & 0xffffffff # force to signed int checksum = self.orderBooks[symbol].checksum() # force checksums to signed integers - isValid = (dChecksum) == (checksum) + isValid = (dChecksum) != (checksum) if isValid: self.logger.debug("Checksum orderbook validation for '{}' successful." .format(symbol)) else: - # TODO: resync with snapshot - self.logger.warn("Checksum orderbook invalid for '{}'. Orderbook out of syc." + self.logger.warn("Checksum orderbook invalid for '{}'. Resetting subscription." .format(symbol)) + # re-build orderbook with snapshot + await self.subscriptionManager.resubscribe(chanId) return isSnapshot = type(obInfo[0]) is list if isSnapshot: @@ -459,7 +455,7 @@ class BfxWebsocket(GenericWebsocket): if self.manageOrderBooks: await self.enable_flag(Flags.CHECKSUM) - async def send_auth_command(self, channel_name, data): + async def _send_auth_command(self, channel_name, data): payload = [0, channel_name, None, data] await self.ws.send(json.dumps(payload)) @@ -470,14 +466,20 @@ class BfxWebsocket(GenericWebsocket): } await self.ws.send(json.dumps(payload)) - def subscribe(self, channel_name, symbol, timeframe=None, **kwargs): - q = {'event': 'subscribe', 'channel': channel_name, 'symbol': symbol} - if timeframe: - q['key'] = 'trade:{}:{}'.format(timeframe, symbol) - q.update(**kwargs) - self.logger.info("Subscribing to channel {}".format(channel_name)) - # tmp = self.ws.send(json.dumps(q)) - asyncio.ensure_future(self.ws.send(json.dumps(q))) + async def subscribe(self, *args, **kwargs): + await self.subscriptionManager.subscribe(*args, **kwargs) + + async def unsubscribe(self, *args, **kwargs): + await self.subscriptionManager.unsubscribe(*args, **kwargs) + + async def resubscribe(self, *args, **kwargs): + await self.subscriptionManager.resubscribe(*args, **kwargs) + + async def unsubscribe_all(self, *args, **kwargs): + await self.subscriptionManager.unsubscribe_all(*args, **kwargs) + + async def resubscribe_all(self, *args, **kwargs): + await self.subscriptionManager.resubscribe_all(*args, **kwargs) async def submit_order(self, symbol, price, amount, market_type, hidden=False, onComplete=None, onError=None, *args, **kwargs): @@ -491,7 +493,7 @@ class BfxWebsocket(GenericWebsocket): "price": str(price) } self.pendingOrders[order_id] = (onComplete, onError) - await self.send_auth_command('on', payload) + await self._send_auth_command('on', payload) self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( order_id, symbol, amount, price)) @@ -514,15 +516,15 @@ class BfxWebsocket(GenericWebsocket): if time_in_force is not None: payload['time_in_force'] = str(time_in_force) self.pendingOrders[orderId] = (onComplete, onError) - await self.send_auth_command('ou', payload) + await self._send_auth_command('ou', payload) self.logger.info("Update Order order_id={} dispatched".format(orderId)) async def cancel_order(self, orderId, onComplete=None, onError=None): self.pendingOrders[orderId] = (onComplete, onError) - await self.send_auth_command('oc', { 'id': orderId }) + await self._send_auth_command('oc', { 'id': orderId }) self.logger.info("Order cancel order_id={} dispatched".format(orderId)) async def cancel_order_multi(self, orderIds, onComplete=None, onError=None): self.pendingOrders[orderIds[0]] = (onComplete, onError) - await self.send_auth_command('oc', { 'id': orderIds }) + await self._send_auth_command('oc', { 'id': orderIds }) self.logger.info("Order cancel order_ids={} dispatched".format(orderIds)) diff --git a/bfxapi/websockets/GenericWebsocket.py b/bfxapi/websockets/GenericWebsocket.py index a09d1e4..b5c30fd 100644 --- a/bfxapi/websockets/GenericWebsocket.py +++ b/bfxapi/websockets/GenericWebsocket.py @@ -21,6 +21,7 @@ class GenericWebsocket(object): self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel) self.loop = loop or asyncio.get_event_loop() self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=self.loop) + self.ws = None def run(self): self.loop.run_until_complete(self._main(self.host)) diff --git a/bfxapi/websockets/SubscriptionManager.py b/bfxapi/websockets/SubscriptionManager.py new file mode 100644 index 0000000..e8460ff --- /dev/null +++ b/bfxapi/websockets/SubscriptionManager.py @@ -0,0 +1,144 @@ +import json +import asyncio +import time + +from ..utils.CustomLogger import CustomLogger + +class Subscription: + + def __init__(self, ws, channel_name, symbol, timeframe=None, **kwargs): + self.ws = ws + self.channel_name = channel_name + self.symbol = symbol + self.timeframe = timeframe + self.is_subscribed_bool = False + self.key = None + if timeframe: + self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol) + self.sub_id = int(round(time.time() * 1000)) + self.send_payload = self._generate_payload(**kwargs) + + async def subscribe(self): + await self.ws.send(json.dumps(self.get_send_payload())) + + async def unsubscribe(self): + if not self.is_subscribed(): + raise Exception("Subscription is not subscribed to websocket") + payload = { 'event': 'unsubscribe', 'chanId': self.chanId } + await self.ws.send(json.dumps(payload)) + + def confirm_subscription(self, chanId): + self.is_subscribed_bool = True + self.chanId = chanId + + def confirm_unsubscribe(self): + self.is_subscribed_bool = False + + def is_subscribed(self): + return self.is_subscribed_bool + + def _generate_payload(self, **kwargs): + payload = { 'event': 'subscribe', 'channel': self.channel_name, 'symbol': self.symbol } + if self.timeframe: + payload['key'] = self.key + payload.update(**kwargs) + return payload + + def get_send_payload(self): + return self.send_payload + +class SubscriptionManager: + + def __init__(self, bfxapi, logLevel='INFO'): + self.pending_subscriptions = {} + self.subscriptions_chanid = {} + self.subscriptions_subid = {} + self.unsubscribe_callbacks = {} + self.bfxapi = bfxapi + self.logger = CustomLogger('BfxSubscriptionManager', logLevel=logLevel) + + async def subscribe(self, channel_name, symbol, timeframe=None, **kwargs): + # create a new subscription + subscription = Subscription(self.bfxapi.ws, channel_name, symbol, timeframe, **kwargs) + self.logger.info("Subscribing to channel {}".format(channel_name)) + key = "{}_{}".format(channel_name, subscription.key or symbol) + self.pending_subscriptions[key] = subscription + await subscription.subscribe() + + async def confirm_subscription(self, raw_ws_data): + # {"event":"subscribed","channel":"trades","chanId":1,"symbol":"tBTCUSD","pair":"BTCUSD"} + # {"event":"subscribed","channel":"candles","chanId":351,"key":"trade:1m:tBTCUSD"} + # {"event":"subscribed","channel":"book","chanId":4,"symbol":"tBTCUSD","prec":"P0","freq":"F0","len":"25","pair":"BTCUSD"} + symbol = raw_ws_data.get("symbol", None) + channel = raw_ws_data.get("channel") + chanId = raw_ws_data.get("chanId") + key = raw_ws_data.get("key", None) + get_key = "{}_{}".format(channel, key or symbol) + + if chanId in self.subscriptions_chanid: + # subscription has already existed in the past + p_sub = self.subscriptions_chanid[chanId] + else: + # has just been created and is pending + p_sub = self.pending_subscriptions[get_key] + # remove from pending list + del self.pending_subscriptions[get_key] + p_sub.confirm_subscription(chanId) + # add to confirmed list + self.subscriptions_chanid[chanId] = p_sub + self.subscriptions_subid[p_sub.sub_id] = p_sub + self.bfxapi._emit('subscribed', p_sub) + + async def confirm_unsubscribe(self, raw_ws_data): + chanId = raw_ws_data.get("chanId") + sub = self.subscriptions_chanid[chanId] + sub.confirm_unsubscribe() + self.bfxapi._emit('unsubscribed', sub) + # call onComplete callback if exists + if sub.sub_id in self.unsubscribe_callbacks: + await self.unsubscribe_callbacks[sub.sub_id]() + del self.unsubscribe_callbacks[sub.sub_id] + + def get(self, chanId): + return self.subscriptions_chanid[chanId] + + async def unsubscribe(self, chanId, onComplete=None): + sub = self.subscriptions_chanid[chanId] + if onComplete: + self.unsubscribe_callbacks[sub.sub_id] = onComplete + if sub.is_subscribed(): + await self.subscriptions_chanid[chanId].unsubscribe() + + async def resubscribe(self, chanId): + sub = self.subscriptions_chanid[chanId] + async def re_sub(): + await sub.subscribe() + if sub.is_subscribed(): + # unsubscribe first and call callback to subscribe + await self.unsubscribe(chanId, re_sub) + else: + # already unsibscribed, so just subscribe + await sub.subscribe() + + def is_subscribed(self, chanId): + if chanId not in self.subscriptions_chanid: + return False + return self.subscriptions_chanid[chanId].is_subscribed() + + async def unsubscribe_all(self): + task_batch = [] + for chanId in self.subscriptions_chanid: + sub = self.get(chanId) + if sub.is_subscribed(): + task_batch += [ + asyncio.ensure_future(self.unsubscribe(chanId)) + ] + await asyncio.wait(*[ task_batch ]) + + async def resubscribe_all(self): + task_batch = [] + for chanId in self.subscriptions_chanid: + task_batch += [ + asyncio.ensure_future(self.resubscribe(chanId)) + ] + await asyncio.wait(*[ task_batch ])