Adds subscription manager to handle channels

This commit is contained in:
Jacob Plaster
2018-11-27 15:12:09 +00:00
parent fa9aed0a4e
commit 0add7426af
4 changed files with 196 additions and 47 deletions

View File

@@ -38,6 +38,8 @@ The websocket exposes a collection of events that are triggered when certain dat
- `funding_info_updates` (array): new funding information has been broadcasted - `funding_info_updates` (array): new funding information has been broadcasted
- `order_book_snapshot` (array): initial snapshot of the order book on connection - `order_book_snapshot` (array): initial snapshot of the order book on connection
- `order_book_update` (array): a new order has been placed into the ordebrook - `order_book_update` (array): a new order has been placed into the ordebrook
- `subscribed` (Subscription): a new channel has been subscribed to
- `unsubscribed` (Subscription): a channel has been un-subscribed
For example. If you wanted to subscribe to all of the trades on the `tBTCUSD` market, then you can simply listen to the `new_trade` event. For Example: For example. If you wanted to subscribe to all of the trades on the `tBTCUSD` market, then you can simply listen to the `new_trade` event. For Example:

View File

@@ -6,6 +6,7 @@ import hmac
import random import random
from .GenericWebsocket import GenericWebsocket, AuthError from .GenericWebsocket import GenericWebsocket, AuthError
from .SubscriptionManager import SubscriptionManager
from ..models import Order, Trade, OrderBook from ..models import Order, Trade, OrderBook
class Flags: class Flags:
@@ -133,7 +134,6 @@ class BfxWebsocket(GenericWebsocket):
def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api.bitfinex.com/ws/2', def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api.bitfinex.com/ws/2',
onSeedCandleHook=None, onSeedTradeHook=None, manageOrderBooks=False, *args, **kwargs): onSeedCandleHook=None, onSeedTradeHook=None, manageOrderBooks=False, *args, **kwargs):
self.channels = {}
self.API_KEY=API_KEY self.API_KEY=API_KEY
self.API_SECRET=API_SECRET self.API_SECRET=API_SECRET
self.manageOrderBooks = manageOrderBooks self.manageOrderBooks = manageOrderBooks
@@ -141,6 +141,7 @@ class BfxWebsocket(GenericWebsocket):
self.orderBooks = {} self.orderBooks = {}
super(BfxWebsocket, self).__init__(host, *args, **kwargs) super(BfxWebsocket, self).__init__(host, *args, **kwargs)
self.subscriptionManager = SubscriptionManager(self)
self._WS_DATA_HANDLERS = { self._WS_DATA_HANDLERS = {
'tu': self._trade_update_handler, 'tu': self._trade_update_handler,
@@ -165,6 +166,7 @@ class BfxWebsocket(GenericWebsocket):
self._WS_SYSTEM_HANDLERS = { self._WS_SYSTEM_HANDLERS = {
'info': self._system_info_handler, 'info': self._system_info_handler,
'subscribed': self._system_subscribed_handler, 'subscribed': self._system_subscribed_handler,
'unsubscribed': self._system_unsubscribe_handler,
'error': self._system_error_handler, 'error': self._system_error_handler,
'auth': self._system_auth_handler, 'auth': self._system_auth_handler,
'conf': self._system_conf_handler 'conf': self._system_conf_handler
@@ -183,14 +185,15 @@ class BfxWebsocket(GenericWebsocket):
if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS: if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS:
return await self._WS_DATA_HANDLERS[dataEvent](data) return await self._WS_DATA_HANDLERS[dataEvent](data)
elif chanId in self.channels: elif self.subscriptionManager.is_subscribed(chanId):
subscription = self.subscriptionManager.get(chanId)
# candles do not have an event # candles do not have an event
if self.channels[chanId].get('channel') == 'candles': if subscription.channel_name == 'candles':
await self._candle_handler(data) await self._candle_handler(data)
if self.channels[chanId].get('channel') == 'book': if subscription.channel_name == 'book':
await self._order_book_handler(data) await self._order_book_handler(data)
else: else:
self.logger.warn("Unknow data event: '{}' {}".format(dataEvent, data)) self.logger.warn("Unknown data event: '{}' {}".format(dataEvent, data))
async def _system_info_handler(self, data): async def _system_info_handler(self, data):
self.logger.info(data) self.logger.info(data)
@@ -211,18 +214,10 @@ class BfxWebsocket(GenericWebsocket):
self.logger.error("Unable to enable config flag {}".format(flagString)) self.logger.error("Unable to enable config flag {}".format(flagString))
async def _system_subscribed_handler(self, data): async def _system_subscribed_handler(self, data):
chanEvent = data.get('channel') await self.subscriptionManager.confirm_subscription(data)
self.logger.info("Subscribed to channel '{}'".format(chanEvent))
## add channel to known list
chanId = data.get('chanId')
## if is a candles subscribption, then get the symbol
## from the key
if data.get('key'):
kd = data.get('key').split(':')
data['tf'] = kd[1]
data['symbol'] = kd[2]
self.channels[chanId] = data async def _system_unsubscribe_handler(self, data):
await self.subscriptionManager.confirm_unsubscribe(data)
async def _system_error_handler(self, data): async def _system_error_handler(self, data):
self._emit('error', data) self._emit('error', data)
@@ -237,17 +232,17 @@ class BfxWebsocket(GenericWebsocket):
async def _trade_update_handler(self, data): async def _trade_update_handler(self, data):
tData = data[2] tData = data[2]
# [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]] # [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]]
if data[0] in self.channels: if self.subscriptionManager.is_subscribed(data[0]):
channelData = self.channels[data[0]] symbol = self.subscriptionManager.get(data[0]).symbol
tradeObj = _parse_trade(tData, channelData.get('symbol')) tradeObj = _parse_trade(tData, symbol)
self._emit('new_trade', tradeObj) self._emit('new_trade', tradeObj)
async def _trade_executed_handler(self, data): async def _trade_executed_handler(self, data):
tData = data[2] tData = data[2]
# [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]] # [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]]
if data[0] in self.channels: if self.subscriptionManager.is_subscribed(data[0]):
channelData = self.channels[data[0]] symbol = self.subscriptionManager.get(data[0]).symbol
tradeObj = _parse_trade(tData, channelData.get('symbol')) tradeObj = _parse_trade(tData, symbol)
self._emit('new_trade', tradeObj) self._emit('new_trade', tradeObj)
async def _wallet_update_handler(self, data): async def _wallet_update_handler(self, data):
@@ -362,7 +357,7 @@ class BfxWebsocket(GenericWebsocket):
self.logger.info("Funding credit snapshot: {}".format(data)) self.logger.info("Funding credit snapshot: {}".format(data))
async def _trade_handler(self, data): async def _trade_handler(self, data):
channelData = self.channels[data[0]] symbol = self.subscriptionManager.get(data[0]).symbol
if type(data[1]) is list: if type(data[1]) is list:
data = data[1] data = data[1]
# Process the batch of seed trades on # Process the batch of seed trades on
@@ -373,44 +368,45 @@ class BfxWebsocket(GenericWebsocket):
'mts': t[1], 'mts': t[1],
'price': t[2], 'price': t[2],
'amount': t[3], 'amount': t[3],
'symbol': channelData['symbol'] 'symbol': symbol
} }
self._emit('seed_trade', trade) self._emit('seed_trade', trade)
else: else:
tradeObj = _parse_trade_snapshot_item(data, channelData['symbol']) tradeObj = _parse_trade_snapshot_item(data, symbol)
self._emit('new_trade', tradeObj) self._emit('new_trade', tradeObj)
async def _candle_handler(self, data): async def _candle_handler(self, data):
chanId = data[0] subscription = self.subscriptionManager.get(data[0])
channelData = self.channels[chanId]
if type(data[1][0]) is list: if type(data[1][0]) is list:
# Process the batch of seed candles on # Process the batch of seed candles on
# websocket subscription # websocket subscription
candlesSnapshot = data[1] candlesSnapshot = data[1]
candlesSnapshot.reverse() candlesSnapshot.reverse()
for c in candlesSnapshot: for c in candlesSnapshot:
candle = _parse_candle(c, channelData['symbol'], channelData['tf']) candle = _parse_candle(c, subscription.symbol, subscription.timeframe)
self._emit('seed_candle', candle) self._emit('seed_candle', candle)
else: else:
candle = _parse_candle(data[1], channelData['symbol'], channelData['tf']) candle = _parse_candle(data[1], subscription.symbol, subscription.timeframe)
self._emit('new_candle', candle) self._emit('new_candle', candle)
async def _order_book_handler(self, data): async def _order_book_handler(self, data):
obInfo = data[1] obInfo = data[1]
channelData = self.channels[data[0]] chanId = data[0]
symbol = channelData.get('symbol') subscription = self.subscriptionManager.get(data[0])
symbol = subscription.symbol
if data[1] == "cs": if data[1] == "cs":
dChecksum = data[2] & 0xffffffff # force to signed int dChecksum = data[2] & 0xffffffff # force to signed int
checksum = self.orderBooks[symbol].checksum() checksum = self.orderBooks[symbol].checksum()
# force checksums to signed integers # force checksums to signed integers
isValid = (dChecksum) == (checksum) isValid = (dChecksum) != (checksum)
if isValid: if isValid:
self.logger.debug("Checksum orderbook validation for '{}' successful." self.logger.debug("Checksum orderbook validation for '{}' successful."
.format(symbol)) .format(symbol))
else: else:
# TODO: resync with snapshot self.logger.warn("Checksum orderbook invalid for '{}'. Resetting subscription."
self.logger.warn("Checksum orderbook invalid for '{}'. Orderbook out of syc."
.format(symbol)) .format(symbol))
# re-build orderbook with snapshot
await self.subscriptionManager.resubscribe(chanId)
return return
isSnapshot = type(obInfo[0]) is list isSnapshot = type(obInfo[0]) is list
if isSnapshot: if isSnapshot:
@@ -459,7 +455,7 @@ class BfxWebsocket(GenericWebsocket):
if self.manageOrderBooks: if self.manageOrderBooks:
await self.enable_flag(Flags.CHECKSUM) await self.enable_flag(Flags.CHECKSUM)
async def send_auth_command(self, channel_name, data): async def _send_auth_command(self, channel_name, data):
payload = [0, channel_name, None, data] payload = [0, channel_name, None, data]
await self.ws.send(json.dumps(payload)) await self.ws.send(json.dumps(payload))
@@ -470,14 +466,20 @@ class BfxWebsocket(GenericWebsocket):
} }
await self.ws.send(json.dumps(payload)) await self.ws.send(json.dumps(payload))
def subscribe(self, channel_name, symbol, timeframe=None, **kwargs): async def subscribe(self, *args, **kwargs):
q = {'event': 'subscribe', 'channel': channel_name, 'symbol': symbol} await self.subscriptionManager.subscribe(*args, **kwargs)
if timeframe:
q['key'] = 'trade:{}:{}'.format(timeframe, symbol) async def unsubscribe(self, *args, **kwargs):
q.update(**kwargs) await self.subscriptionManager.unsubscribe(*args, **kwargs)
self.logger.info("Subscribing to channel {}".format(channel_name))
# tmp = self.ws.send(json.dumps(q)) async def resubscribe(self, *args, **kwargs):
asyncio.ensure_future(self.ws.send(json.dumps(q))) await self.subscriptionManager.resubscribe(*args, **kwargs)
async def unsubscribe_all(self, *args, **kwargs):
await self.subscriptionManager.unsubscribe_all(*args, **kwargs)
async def resubscribe_all(self, *args, **kwargs):
await self.subscriptionManager.resubscribe_all(*args, **kwargs)
async def submit_order(self, symbol, price, amount, market_type, async def submit_order(self, symbol, price, amount, market_type,
hidden=False, onComplete=None, onError=None, *args, **kwargs): hidden=False, onComplete=None, onError=None, *args, **kwargs):
@@ -491,7 +493,7 @@ class BfxWebsocket(GenericWebsocket):
"price": str(price) "price": str(price)
} }
self.pendingOrders[order_id] = (onComplete, onError) self.pendingOrders[order_id] = (onComplete, onError)
await self.send_auth_command('on', payload) await self._send_auth_command('on', payload)
self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( self.logger.info("Order cid={} ({} {} @ {}) dispatched".format(
order_id, symbol, amount, price)) order_id, symbol, amount, price))
@@ -514,15 +516,15 @@ class BfxWebsocket(GenericWebsocket):
if time_in_force is not None: if time_in_force is not None:
payload['time_in_force'] = str(time_in_force) payload['time_in_force'] = str(time_in_force)
self.pendingOrders[orderId] = (onComplete, onError) self.pendingOrders[orderId] = (onComplete, onError)
await self.send_auth_command('ou', payload) await self._send_auth_command('ou', payload)
self.logger.info("Update Order order_id={} dispatched".format(orderId)) self.logger.info("Update Order order_id={} dispatched".format(orderId))
async def cancel_order(self, orderId, onComplete=None, onError=None): async def cancel_order(self, orderId, onComplete=None, onError=None):
self.pendingOrders[orderId] = (onComplete, onError) self.pendingOrders[orderId] = (onComplete, onError)
await self.send_auth_command('oc', { 'id': orderId }) await self._send_auth_command('oc', { 'id': orderId })
self.logger.info("Order cancel order_id={} dispatched".format(orderId)) self.logger.info("Order cancel order_id={} dispatched".format(orderId))
async def cancel_order_multi(self, orderIds, onComplete=None, onError=None): async def cancel_order_multi(self, orderIds, onComplete=None, onError=None):
self.pendingOrders[orderIds[0]] = (onComplete, onError) self.pendingOrders[orderIds[0]] = (onComplete, onError)
await self.send_auth_command('oc', { 'id': orderIds }) await self._send_auth_command('oc', { 'id': orderIds })
self.logger.info("Order cancel order_ids={} dispatched".format(orderIds)) self.logger.info("Order cancel order_ids={} dispatched".format(orderIds))

View File

@@ -21,6 +21,7 @@ class GenericWebsocket(object):
self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel) self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel)
self.loop = loop or asyncio.get_event_loop() self.loop = loop or asyncio.get_event_loop()
self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=self.loop) self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=self.loop)
self.ws = None
def run(self): def run(self):
self.loop.run_until_complete(self._main(self.host)) self.loop.run_until_complete(self._main(self.host))

View File

@@ -0,0 +1,144 @@
import json
import asyncio
import time
from ..utils.CustomLogger import CustomLogger
class Subscription:
def __init__(self, ws, channel_name, symbol, timeframe=None, **kwargs):
self.ws = ws
self.channel_name = channel_name
self.symbol = symbol
self.timeframe = timeframe
self.is_subscribed_bool = False
self.key = None
if timeframe:
self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol)
self.sub_id = int(round(time.time() * 1000))
self.send_payload = self._generate_payload(**kwargs)
async def subscribe(self):
await self.ws.send(json.dumps(self.get_send_payload()))
async def unsubscribe(self):
if not self.is_subscribed():
raise Exception("Subscription is not subscribed to websocket")
payload = { 'event': 'unsubscribe', 'chanId': self.chanId }
await self.ws.send(json.dumps(payload))
def confirm_subscription(self, chanId):
self.is_subscribed_bool = True
self.chanId = chanId
def confirm_unsubscribe(self):
self.is_subscribed_bool = False
def is_subscribed(self):
return self.is_subscribed_bool
def _generate_payload(self, **kwargs):
payload = { 'event': 'subscribe', 'channel': self.channel_name, 'symbol': self.symbol }
if self.timeframe:
payload['key'] = self.key
payload.update(**kwargs)
return payload
def get_send_payload(self):
return self.send_payload
class SubscriptionManager:
def __init__(self, bfxapi, logLevel='INFO'):
self.pending_subscriptions = {}
self.subscriptions_chanid = {}
self.subscriptions_subid = {}
self.unsubscribe_callbacks = {}
self.bfxapi = bfxapi
self.logger = CustomLogger('BfxSubscriptionManager', logLevel=logLevel)
async def subscribe(self, channel_name, symbol, timeframe=None, **kwargs):
# create a new subscription
subscription = Subscription(self.bfxapi.ws, channel_name, symbol, timeframe, **kwargs)
self.logger.info("Subscribing to channel {}".format(channel_name))
key = "{}_{}".format(channel_name, subscription.key or symbol)
self.pending_subscriptions[key] = subscription
await subscription.subscribe()
async def confirm_subscription(self, raw_ws_data):
# {"event":"subscribed","channel":"trades","chanId":1,"symbol":"tBTCUSD","pair":"BTCUSD"}
# {"event":"subscribed","channel":"candles","chanId":351,"key":"trade:1m:tBTCUSD"}
# {"event":"subscribed","channel":"book","chanId":4,"symbol":"tBTCUSD","prec":"P0","freq":"F0","len":"25","pair":"BTCUSD"}
symbol = raw_ws_data.get("symbol", None)
channel = raw_ws_data.get("channel")
chanId = raw_ws_data.get("chanId")
key = raw_ws_data.get("key", None)
get_key = "{}_{}".format(channel, key or symbol)
if chanId in self.subscriptions_chanid:
# subscription has already existed in the past
p_sub = self.subscriptions_chanid[chanId]
else:
# has just been created and is pending
p_sub = self.pending_subscriptions[get_key]
# remove from pending list
del self.pending_subscriptions[get_key]
p_sub.confirm_subscription(chanId)
# add to confirmed list
self.subscriptions_chanid[chanId] = p_sub
self.subscriptions_subid[p_sub.sub_id] = p_sub
self.bfxapi._emit('subscribed', p_sub)
async def confirm_unsubscribe(self, raw_ws_data):
chanId = raw_ws_data.get("chanId")
sub = self.subscriptions_chanid[chanId]
sub.confirm_unsubscribe()
self.bfxapi._emit('unsubscribed', sub)
# call onComplete callback if exists
if sub.sub_id in self.unsubscribe_callbacks:
await self.unsubscribe_callbacks[sub.sub_id]()
del self.unsubscribe_callbacks[sub.sub_id]
def get(self, chanId):
return self.subscriptions_chanid[chanId]
async def unsubscribe(self, chanId, onComplete=None):
sub = self.subscriptions_chanid[chanId]
if onComplete:
self.unsubscribe_callbacks[sub.sub_id] = onComplete
if sub.is_subscribed():
await self.subscriptions_chanid[chanId].unsubscribe()
async def resubscribe(self, chanId):
sub = self.subscriptions_chanid[chanId]
async def re_sub():
await sub.subscribe()
if sub.is_subscribed():
# unsubscribe first and call callback to subscribe
await self.unsubscribe(chanId, re_sub)
else:
# already unsibscribed, so just subscribe
await sub.subscribe()
def is_subscribed(self, chanId):
if chanId not in self.subscriptions_chanid:
return False
return self.subscriptions_chanid[chanId].is_subscribed()
async def unsubscribe_all(self):
task_batch = []
for chanId in self.subscriptions_chanid:
sub = self.get(chanId)
if sub.is_subscribed():
task_batch += [
asyncio.ensure_future(self.unsubscribe(chanId))
]
await asyncio.wait(*[ task_batch ])
async def resubscribe_all(self):
task_batch = []
for chanId in self.subscriptions_chanid:
task_batch += [
asyncio.ensure_future(self.resubscribe(chanId))
]
await asyncio.wait(*[ task_batch ])