Rename AuthenticatedEventsHandler to AuthenticatedChannelsHandler. Rewrite handlers.py gathering events in channels. Update references in bfxapi.

This commit is contained in:
Davide Casale
2022-11-14 18:37:31 +01:00
parent 495b51a2f6
commit 7d6ba4302a
2 changed files with 238 additions and 423 deletions

View File

@@ -2,7 +2,7 @@ import json, asyncio, hmac, hashlib, time, websockets
from pyee.asyncio import AsyncIOEventEmitter from pyee.asyncio import AsyncIOEventEmitter
from .handlers import Channels, PublicChannelsHandler, AuthenticatedEventsHandler from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler
from .errors import BfxWebsocketException, ConnectionNotOpen, InvalidAuthenticationCredentials from .errors import BfxWebsocketException, ConnectionNotOpen, InvalidAuthenticationCredentials
@@ -16,7 +16,7 @@ class BfxWebsocketClient(object):
self.handlers = { self.handlers = {
"public": PublicChannelsHandler(event_emitter=self.event_emitter), "public": PublicChannelsHandler(event_emitter=self.event_emitter),
"authenticated": AuthenticatedEventsHandler(event_emitter=self.event_emitter) "authenticated": AuthenticatedChannelsHandler(event_emitter=self.event_emitter)
} }
async def connect(self): async def connect(self):

View File

@@ -47,456 +47,271 @@ class PublicChannelsHandler(object):
def __status_channel_handler(self, subscription, *parameters): def __status_channel_handler(self, subscription, *parameters):
self.event_emitter.emit("status", subscription, parameters[0]) self.event_emitter.emit("status", subscription, parameters[0])
class AuthenticatedEventsHandler(object): class AuthenticatedChannelsHandler(object):
def __init__(self, event_emitter, strict = False): def __init__(self, event_emitter, strict = False):
self.event_emitter, self.strict = event_emitter, strict self.event_emitter, self.strict = event_emitter, strict
self.__handlers = { self.__handlers = {
"bu": self.__bu_event_handler, ("os", "on", "ou", "oc",): self.__orders_channel_handler,
"ws": self.__ws_event_handler, ("ps", "pn", "pu", "pc",): self.__positions_channel_handler,
"wu": self.__wu_event_handler, ("te", "tu",): self.__trades_channel_handler,
"os": self.__os_event_handler, ("fos", "fon", "fou", "foc",): self.__funding_offers_channel_handler,
"on": self.__on_event_handler, ("fcs", "fcn", "fcu", "fcc",): self.__funding_credits_channel_handler,
"ou": self.__ou_event_handler, ("fls", "fln", "flu", "flc",): self.__funding_loans_channel_handler,
"oc": self.__oc_event_handler, ("ws", "wu",): self.__wallets_channel_handler,
"ps": self.__ps_event_handler, ("bu",): self.__balance_info_channel_handler
"pn": self.__pn_event_handler,
"pu": self.__pu_event_handler,
"pc": self.__pc_event_handler,
"fos": self.__fos_event_handler,
"fon": self.__fon_event_handler,
"fou": self.__fou_event_handler,
"foc": self.__foc_event_handler,
} }
def handle(self, type, stream): def handle(self, type, stream):
if type in self.__handlers: for abbreviations in self.__handlers.keys():
self.__handlers[type](*stream) if type in abbreviations:
elif self.strict == True: return self.__handlers[abbreviations](type, stream)
if self.strict == True:
raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.") raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.")
def __bu_event_handler(self, *stream): def __orders_channel_handler(self, type, stream):
self.event_emitter.emit("balance_update", _label_stream_data( _labels = [
[ "ID",
"AUM", "GID",
"AUM_NET" "CID",
], "SYMBOL",
*stream "MTS_CREATE",
)) "MTS_UPDATE",
"AMOUNT",
"AMOUNT_ORIG",
"ORDER_TYPE",
"TYPE_PREV",
"MTS_TIF",
"_PLACEHOLDER",
"FLAGS",
"ORDER_STATUS",
"_PLACEHOLDER",
"_PLACEHOLDER",
"PRICE",
"PRICE_AVG",
"PRICE_TRAILING",
"PRICE_AUX_LIMIT",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"NOTIFY",
"HIDDEN",
"PLACED_ID",
"_PLACEHOLDER",
"_PLACEHOLDER",
"ROUTING",
"_PLACEHOLDER",
"_PLACEHOLDER",
"META"
]
def __ws_event_handler(self, *stream): if type == "os":
self.event_emitter.emit("wallet_snapshot", [ self.event_emitter.emit("order_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ])
_label_stream_data(
[
"WALLET_TYPE",
"CURRENCY",
"BALANCE",
"UNSETTLED_INTEREST",
"BALANCE_AVAILABLE",
"DESCRIPTION",
"META"
],
*substream
) for substream in stream
])
def __wu_event_handler(self, *stream):
self.event_emitter.emit("wallet_update", _label_stream_data(
[
"WALLET_TYPE",
"CURRENCY",
"BALANCE",
"UNSETTLED_INTEREST",
"BALANCE_AVAILABLE",
"DESCRIPTION",
"META"
],
*stream
))
def __os_event_handler(self, *stream): if type == "on" or type == "ou" or type == "oc":
self.event_emitter.emit("order_snapshot", [ self.event_emitter.emit({
_label_stream_data( "on": "new_order",
[ "ou": "order_update",
"ID", "oc": "order_cancel"
"GID", }[type], _label_stream_data(_labels, *stream))
"CID",
"SYMBOL",
"MTS_CREATE",
"MTS_UPDATE",
"AMOUNT",
"AMOUNT_ORIG",
"ORDER_TYPE",
"TYPE_PREV",
"MTS_TIF",
"_PLACEHOLDER",
"FLAGS",
"STATUS",
"_PLACEHOLDER",
"_PLACEHOLDER",
"PRICE",
"PRICE_AVG",
"PRICE_TRAILING",
"PRICE_AUX_LIMIT",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"NOTIFY",
"HIDDEN",
"PLACED_ID",
"_PLACEHOLDER",
"_PLACEHOLDER",
"ROUTING",
"_PLACEHOLDER",
"_PLACEHOLDER",
"META"
],
*substream
) for substream in stream
])
def __on_event_handler(self, *stream): def __positions_channel_handler(self, type, stream):
self.event_emitter.emit("new_order", _label_stream_data( _labels = [
[ "SYMBOL",
"STATUS",
"AMOUNT",
"BASE_PRICE",
"MARGIN_FUNDING",
"MARGIN_FUNDING_TYPE",
"PL",
"PL_PERC",
"PRICE_LIQ",
"LEVERAGE",
"FLAG",
"POSITION_ID",
"MTS_CREATE",
"MTS_UPDATE",
"_PLACEHOLDER",
"TYPE",
"_PLACEHOLDER",
"COLLATERAL",
"COLLATERAL_MIN",
"META"
]
if type == "ps":
self.event_emitter.emit("position_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ])
if type == "pn" or type == "pu" or type == "pc":
self.event_emitter.emit({
"pn": "new_position",
"pu": "position_update",
"pc": "position_close"
}[type], _label_stream_data(_labels, *stream))
def __trades_channel_handler(self, type, stream):
if type == "te":
self.event_emitter.emit("trade_executed", _label_stream_data([
"ID", "ID",
"GID",
"CID",
"SYMBOL", "SYMBOL",
"MTS_CREATE", "MTS_CREATE",
"MTS_UPDATE", "ORDER_ID",
"AMOUNT", "EXEC_AMOUNT",
"AMOUNT_ORIG", "EXEC_PRICE",
"ORDER_TYPE", "ORDER_TYPE",
"TYPE_PREV", "ORDER_PRICE",
"MTS_TIF", "MAKER",
"_PLACEHOLDER", "_PLACEHOLDER",
"FLAGS", "_PLACEHOLDER",
"ORDER_STATUS", "CID"
"_PLACEHOLDER", ], *stream))
"_PLACEHOLDER",
"PRICE",
"PRICE_AVG",
"PRICE_TRAILING",
"PRICE_AUX_LIMIT",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"NOTIFY",
"HIDDEN",
"PLACED_ID",
"_PLACEHOLDER",
"_PLACEHOLDER",
"ROUTING",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER"
],
*stream
))
def __ou_event_handler(self, *stream): if type == "tu":
self.event_emitter.emit("order_update", _label_stream_data( self.event_emitter.emit("trade_execution_update", _label_stream_data([
[
"ID", "ID",
"GID",
"CID",
"SYMBOL", "SYMBOL",
"MTS_CREATE", "MTS_CREATE",
"MTS_UPDATE", "ORDER_ID",
"AMOUNT", "EXEC_AMOUNT",
"AMOUNT_ORIG", "EXEC_PRICE",
"ORDER_TYPE", "ORDER_TYPE",
"TYPE_PREV", "ORDER_PRICE",
"MTS_TIF", "MAKER",
"_PLACEHOLDER", "FEE",
"FLAGS", "FEE_CURRENCY",
"ORDER_STATUS", "CID"
"_PLACEHOLDER", ], *stream))
"_PLACEHOLDER",
"PRICE",
"PRICE_AVG",
"PRICE_TRAILING",
"PRICE_AUX_LIMIT",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"NOTIFY",
"HIDDEN",
"PLACED_ID",
"_PLACEHOLDER",
"_PLACEHOLDER",
"ROUTING",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER"
],
*stream
))
def __oc_event_handler(self, *stream): def __funding_offers_channel_handler(self, type, stream):
self.event_emitter.emit("order_cancel", _label_stream_data( _labels = [
[ "ID",
"ID", "SYMBOL",
"GID", "MTS_CREATED",
"CID", "MTS_UPDATED",
"SYMBOL", "AMOUNT",
"MTS_CREATE", "AMOUNT_ORIG",
"MTS_UPDATE", "OFFER_TYPE",
"AMOUNT", "_PLACEHOLDER",
"AMOUNT_ORIG", "_PLACEHOLDER",
"ORDER_TYPE", "FLAGS",
"TYPE_PREV", "STATUS",
"MTS_TIF", "_PLACEHOLDER",
"_PLACEHOLDER", "_PLACEHOLDER",
"FLAGS", "_PLACEHOLDER",
"ORDER_STATUS", "RATE",
"_PLACEHOLDER", "PERIOD",
"_PLACEHOLDER", "NOTIFY",
"PRICE", "HIDDEN",
"PRICE_AVG", "_PLACEHOLDER",
"PRICE_TRAILING", "RENEW",
"PRICE_AUX_LIMIT", "_PLACEHOLDER",
"_PLACEHOLDER", ]
"_PLACEHOLDER",
"_PLACEHOLDER",
"NOTIFY",
"HIDDEN",
"PLACED_ID",
"_PLACEHOLDER",
"_PLACEHOLDER",
"ROUTING",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER"
],
*stream
))
def __ps_event_handler(self, *stream): if type == "fos":
self.event_emitter.emit("position_snapshot", [ self.event_emitter.emit("funding_offer_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ])
_label_stream_data(
[
"SYMBOL",
"STATUS",
"AMOUNT",
"BASE_PRICE",
"MARGIN_FUNDING",
"MARGIN_FUNDING_TYPE",
"PL",
"PL_PERC",
"PRICE_LIQ",
"LEVERAGE",
"FLAG",
"POSITION_ID",
"MTS_CREATE",
"MTS_UPDATE",
"_PLACEHOLDER",
"TYPE",
"_PLACEHOLDER",
"COLLATERAL",
"COLLATERAL_MIN",
"META"
],
*substream
)
for substream in stream
])
def __pn_event_handler(self, *stream): if type == "fon" or type == "fou" or type == "foc":
self.event_emitter.emit("new_position", _label_stream_data( self.event_emitter.emit({
[ "fon": "funding_offer_new",
"SYMBOL", "fou": "funding_offer_update",
"STATUS", "foc": "funding_offer_cancel"
"AMOUNT", }[type], _label_stream_data(_labels, *stream))
"BASE_PRICE",
"MARGIN_FUNDING",
"MARGIN_FUNDING_TYPE",
"PL",
"PL_PERC",
"PRICE_LIQ",
"LEVERAGE",
"FLAG",
"POSITION_ID",
"MTS_CREATE",
"MTS_UPDATE",
"_PLACEHOLDER",
"TYPE",
"_PLACEHOLDER",
"COLLATERAL",
"COLLATERAL_MIN",
"META"
],
*stream
))
def __pu_event_handler(self, *stream): def __funding_credits_channel_handler(self, type, stream):
self.event_emitter.emit("position_update", _label_stream_data( _labels = [
[ "ID",
"SYMBOL", "SYMBOL",
"STATUS", "SIDE",
"AMOUNT", "MTS_CREATE",
"BASE_PRICE", "MTS_UPDATE",
"MARGIN_FUNDING", "AMOUNT",
"MARGIN_FUNDING_TYPE", "FLAGS",
"PL", "STATUS",
"PL_PERC", "_PLACEHOLDER",
"PRICE_LIQ", "_PLACEHOLDER",
"LEVERAGE", "_PLACEHOLDER",
"FLAG", "RATE",
"POSITION_ID", "PERIOD",
"MTS_CREATE", "MTS_OPENING",
"MTS_UPDATE", "MTS_LAST_PAYOUT",
"_PLACEHOLDER", "NOTIFY",
"TYPE", "HIDDEN",
"_PLACEHOLDER", "_PLACEHOLDER",
"COLLATERAL", "RENEW",
"COLLATERAL_MIN", "RATE_REAL",
"META" "NO_CLOSE",
], "POSITION_PAIR"
*stream ]
))
def __pc_event_handler(self, *stream): if type == "fcs":
self.event_emitter.emit("position_cancel", _label_stream_data( self.event_emitter.emit("funding_credit_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ])
[
"SYMBOL",
"STATUS",
"AMOUNT",
"BASE_PRICE",
"MARGIN_FUNDING",
"MARGIN_FUNDING_TYPE",
"PL",
"PL_PERC",
"PRICE_LIQ",
"LEVERAGE",
"FLAG",
"POSITION_ID",
"MTS_CREATE",
"MTS_UPDATE",
"_PLACEHOLDER",
"TYPE",
"_PLACEHOLDER",
"COLLATERAL",
"COLLATERAL_MIN",
"META"
],
*stream
))
def __fos_event_handler(self, *stream): if type == "fcn" or type == "fcu" or type == "fcc":
self.event_emitter.emit("funding_offer_snapshot", [ self.event_emitter.emit({
_label_stream_data( "fcn": "funding_credit_new",
[ "fcu": "funding_credit_update",
"ID", "fcc": "funding_credit_close"
"SYMBOL", }[type], _label_stream_data(_labels, *stream))
"MTS_CREATED",
"MTS_UPDATED",
"AMOUNT",
"AMOUNT_ORIG",
"OFFER_TYPE",
"_PLACEHOLDER",
"_PLACEHOLDER",
"FLAGS",
"STATUS",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"RATE",
"PERIOD",
"NOTIFY",
"HIDDEN",
"_PLACEHOLDER",
"RENEW",
"_PLACEHOLDER",
],
*substream
)
for substream in stream
])
def __fon_event_handler(self, *stream): def __funding_loans_channel_handler(self, type, stream):
self.event_emitter.emit("funding_offer_new", _label_stream_data( _labels = [
[ "ID",
"ID", "SYMBOL",
"SYMBOL", "SIDE",
"MTS_CREATED", "MTS_CREATE",
"MTS_UPDATED", "MTS_UPDATE",
"AMOUNT", "AMOUNT",
"AMOUNT_ORIG", "FLAGS",
"TYPE", "STATUS",
"_PLACEHOLDER", "_PLACEHOLDER",
"_PLACEHOLDER", "_PLACEHOLDER",
"FLAGS", "_PLACEHOLDER",
"STATUS", "RATE",
"_PLACEHOLDER", "PERIOD",
"_PLACEHOLDER", "MTS_OPENING",
"_PLACEHOLDER", "MTS_LAST_PAYOUT",
"RATE", "NOTIFY",
"PERIOD", "HIDDEN",
"NOTIFY", "_PLACEHOLDER",
"HIDDEN", "RENEW",
"_PLACEHOLDER", "RATE_REAL",
"RENEW", "NO_CLOSE"
"RATE_REAL" ]
],
*stream
))
def __fou_event_handler(self, *stream): if type == "fls":
self.event_emitter.emit("funding_offer_update", _label_stream_data( self.event_emitter.emit("funding_loan_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ])
[
"ID",
"SYMBOL",
"MTS_CREATED",
"MTS_UPDATED",
"AMOUNT",
"AMOUNT_ORIG",
"TYPE",
"_PLACEHOLDER",
"_PLACEHOLDER",
"FLAGS",
"STATUS",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"RATE",
"PERIOD",
"NOTIFY",
"HIDDEN",
"_PLACEHOLDER",
"RENEW",
"RATE_REAL"
],
*stream
))
def __foc_event_handler(self, *stream): if type == "fln" or type == "flu" or type == "flc":
self.event_emitter.emit("funding_offer_cancel", _label_stream_data( self.event_emitter.emit({
[ "fln": "funding_loan_new",
"ID", "flu": "funding_loan_update",
"SYMBOL", "flc": "funding_loan_close"
"MTS_CREATED", }[type], _label_stream_data(_labels, *stream))
"MTS_UPDATED",
"AMOUNT", def __wallets_channel_handler(self, type, stream):
"AMOUNT_ORIG", _labels = [
"TYPE", "WALLET_TYPE",
"_PLACEHOLDER", "CURRENCY",
"_PLACEHOLDER", "BALANCE",
"FLAGS", "UNSETTLED_INTEREST",
"STATUS", "BALANCE_AVAILABLE",
"_PLACEHOLDER", "DESCRIPTION",
"_PLACEHOLDER", "META"
"_PLACEHOLDER", ]
"RATE",
"PERIOD", if type == "ws":
"NOTIFY", self.event_emitter.emit("wallet_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ])
"HIDDEN",
"_PLACEHOLDER", if type == "wu":
"RENEW", self.event_emitter.emit("wallet_update", _label_stream_data(_labels, *stream))
"RATE_REAL"
], def __balance_info_channel_handler(self, type, stream):
*stream if type == "bu":
)) self.event_emitter.emit("balance_update", _label_stream_data([
"AUM",
"AUM_NET"
], *stream))
def _label_stream_data(labels, *args, IGNORE = [ "_PLACEHOLDER" ]): def _label_stream_data(labels, *args, IGNORE = [ "_PLACEHOLDER" ]):
if len(labels) != len(args): if len(labels) != len(args):