Rename manager.py to handlers.py. Add code to BfxWebsocketClient.py to handle authenticated channel. Update bfxapi/websocket/__init__.py imports.

This commit is contained in:
Davide Casale
2022-11-09 19:13:48 +01:00
parent 1fc9aacd86
commit 50644e8116
4 changed files with 43 additions and 33 deletions

View File

@@ -2,20 +2,22 @@ import json, asyncio, hmac, hashlib, time, websockets
from pyee.asyncio import AsyncIOEventEmitter from pyee.asyncio import AsyncIOEventEmitter
from .manager import Manager from .handlers import Channels, PublicChannelsHandler, AuthenticatedEventsHandler
from .errors import ConnectionNotOpen, AuthenticationCredentialsError from .errors import ConnectionNotOpen, AuthenticationCredentialsError
HEARTBEAT = "hb"
class BfxWebsocketClient(object): class BfxWebsocketClient(object):
def __init__(self, host, API_KEY=None, API_SECRET=None): def __init__(self, host, API_KEY=None, API_SECRET=None):
self.host, self.chanIds, self.event_emitter = host, dict(), AsyncIOEventEmitter() self.host, self.chanIds, self.event_emitter = host, dict(), AsyncIOEventEmitter()
self.manager, self.websocket = Manager(event_emitter=self.event_emitter), None self.websocket, self.API_KEY, self.API_SECRET = None, API_KEY, API_SECRET
self.API_KEY, self.API_SECRET = API_KEY, API_SECRET self.handlers = {
"public": PublicChannelsHandler(event_emitter=self.event_emitter),
def run_forever(self): "authenticated": AuthenticatedEventsHandler(event_emitter=self.event_emitter)
asyncio.run(self.connect()) }
async def connect(self): async def connect(self):
async for websocket in websockets.connect(self.host): async for websocket in websockets.connect(self.host):
@@ -25,31 +27,26 @@ class BfxWebsocketClient(object):
self.event_emitter.emit("open") self.event_emitter.emit("open")
if self.API_KEY != None and self.API_SECRET != None: if self.API_KEY != None and self.API_SECRET != None:
self.authenticate(self.API_KEY, self.API_SECRET) await self.authenticate(self.API_KEY, self.API_SECRET)
async for message in websocket: async for message in websocket:
message = json.loads(message) message = json.loads(message)
if isinstance(message, dict) and message["event"] == "subscribed": if isinstance(message, dict) and message["event"] == "subscribed":
del message["event"]
self.chanIds[message["chanId"]] = message self.chanIds[message["chanId"]] = message
self.event_emitter.emit("subscribed", message)
self.event_emitter.emit("subscribed", message)
elif isinstance(message, dict) and message["event"] == "unsubscribed": elif isinstance(message, dict) and message["event"] == "unsubscribed":
if message["status"] == "OK": if message["status"] == "OK":
del self.chanIds[message["chanId"]] del self.chanIds[message["chanId"]]
elif isinstance(message, dict) and message["event"] == "auth": elif isinstance(message, dict) and message["event"] == "auth":
if message["status"] == "OK": if message["status"] == "OK":
self.chanIds[message["chanId"]] = message
self.event_emitter.emit("authenticated", message) self.event_emitter.emit("authenticated", message)
else: raise AuthenticationCredentialsError("Cannot authenticate with given API-KEY and API-SECRET.") else: raise AuthenticationCredentialsError("Cannot authenticate with given API-KEY and API-SECRET.")
elif isinstance(message, list) and (chanId := message[0]) and message[1] != HEARTBEAT:
elif isinstance(message, list): if chanId == 0:
chanId, parameters = message[0], message[1:] self.handlers["authenticated"].handle(message[1], *message[2:])
else: self.handlers["public"].handle(self.chanIds[chanId], *message[1:])
self.manager.handle(self.chanIds[chanId], *parameters)
except websockets.ConnectionClosed: except websockets.ConnectionClosed:
continue continue
@@ -109,3 +106,6 @@ class BfxWebsocketClient(object):
self.event_emitter.once(event, function) self.event_emitter.once(event, function)
return handler return handler
def run(self):
asyncio.run(self.connect())

View File

@@ -1,3 +1,3 @@
from .BfxWebsocketClient import BfxWebsocketClient from .BfxWebsocketClient import BfxWebsocketClient
from .channels import Channels from .handlers import Channels
from .errors import ConnectionNotOpen from .errors import ConnectionNotOpen, AuthenticationCredentialsError

View File

@@ -1,8 +0,0 @@
from enum import Enum
class Channels(str, Enum):
TICKER = "ticker"
TRADES = "trades"
BOOK = "book"
CANDLES = "candles"
STATUS = "status"

View File

@@ -1,8 +1,13 @@
from .channels import Channels from enum import Enum
HEARTBEAT = "hb" class Channels(str, Enum):
TICKER = "ticker"
TRADES = "trades"
BOOK = "book"
CANDLES = "candles"
STATUS = "status"
class Manager(object): class PublicChannelsHandler(object):
def __init__(self, event_emitter): def __init__(self, event_emitter):
self.event_emitter = event_emitter self.event_emitter = event_emitter
@@ -15,8 +20,7 @@ class Manager(object):
} }
def handle(self, subscription, *parameters): def handle(self, subscription, *parameters):
if parameters[0] != HEARTBEAT: self.__handlers[subscription["channel"]](subscription, *parameters)
self.__handlers[subscription["channel"]](subscription, *parameters)
def __ticker_channel_handler(self, subscription, *parameters): def __ticker_channel_handler(self, subscription, *parameters):
self.event_emitter.emit("ticker", subscription, parameters[0]) self.event_emitter.emit("ticker", subscription, parameters[0])
@@ -40,3 +44,17 @@ class Manager(object):
def __status_channel_handler(self, subscription, *parameters): def __status_channel_handler(self, subscription, *parameters):
self.event_emitter.emit("status", subscription, parameters[0]) self.event_emitter.emit("status", subscription, parameters[0])
class AuthenticatedEventsHandler(object):
def __init__(self, event_emitter):
self.event_emitter = event_emitter
self.__handlers = {
"bu": self.__bu_event_handler
}
def handle(self, type, *parameters):
self.__handlers[type](*parameters)
def __bu_event_handler(self, AUM, AUM_NET):
self.event_emitter.emit("balance_update", AUM, AUM_NET)