General fixups

This commit is contained in:
Jacob Plaster
2018-12-14 16:02:33 +00:00
parent c1aea594a8
commit 1cb33692a0
4 changed files with 39 additions and 10 deletions

View File

@@ -148,6 +148,8 @@ class BfxWebsocket(GenericWebsocket):
await self._candle_handler(data)
if subscription.channel_name == 'book':
await self._order_book_handler(data)
if subscription.channel_name == 'trades':
await self._trade_handler(data)
else:
self.logger.warn("Unknown data event: '{}' {}".format(dataEvent, data))
@@ -228,7 +230,7 @@ class BfxWebsocket(GenericWebsocket):
notificationType = nInfo[6]
notificationText = nInfo[7]
if notificationType == 'ERROR':
self._emit('error', notificationText)
# self._emit('error', notificationText)
self.logger.error("Notification ERROR: {}".format(notificationText))
else:
self.logger.info("Notification SUCCESS: {}".format(notificationText))
@@ -279,14 +281,11 @@ class BfxWebsocket(GenericWebsocket):
for t in data:
trade = {
'mts': t[1],
'price': t[2],
'amount': t[3],
'amount': t[2],
'price': t[3],
'symbol': symbol
}
self._emit('seed_trade', trade)
else:
tradeObj = _parse_trade_snapshot_item(data, symbol)
self._emit('new_trade', tradeObj)
async def _candle_handler(self, data):
subscription = self.subscriptionManager.get(data[0])
@@ -321,6 +320,9 @@ class BfxWebsocket(GenericWebsocket):
# re-build orderbook with snapshot
await self.subscriptionManager.resubscribe(chanId)
return
if obInfo == []:
self.orderBooks[symbol] = OrderBook()
return
isSnapshot = type(obInfo[0]) is list
if isSnapshot:
self.orderBooks[symbol] = OrderBook()
@@ -370,6 +372,9 @@ class BfxWebsocket(GenericWebsocket):
}
await self.ws.send(json.dumps(payload))
def get_orderbook(self, symbol):
return self.orderBooks.get(symbol, None)
async def subscribe(self, *args, **kwargs):
return await self.subscriptionManager.subscribe(*args, **kwargs)

View File

@@ -26,6 +26,9 @@ class GenericWebsocket(object):
def run(self):
self.loop.run_until_complete(self._main(self.host))
def get_task_executable(self):
return self._main(self.host)
async def _main(self, host):
async with websockets.connect(host) as websocket:
self.ws = websocket
@@ -35,6 +38,9 @@ class GenericWebsocket(object):
message = await websocket.recv()
await self.on_message(message)
def remove_all_listeners(self, event):
self.events.remove_all_listeners(event)
def on(self, event, func=None):
if not func:
return self.events.on(event)