Implement bfxapi/notifications.py in websocket subpackage.

This commit is contained in:
Davide Casale
2023-01-12 18:36:23 +01:00
parent ff58f049a7
commit e64c25bf19
4 changed files with 24 additions and 33 deletions

View File

@@ -146,7 +146,13 @@ class AuthenticatedChannelsHandler(object):
("bu",): serializers.BalanceInfo ("bu",): serializers.BalanceInfo
} }
EVENTS = [ "notification", *list(__abbreviations.values()) ] EVENTS = [
"notification",
"on-req-notification", "ou-req-notification", "oc-req-notification",
"oc_multi-notification",
"fon-req-notification", "foc-req-notification",
*list(__abbreviations.values())
]
def __init__(self, event_emitter, strict = False): def __init__(self, event_emitter, strict = False):
self.event_emitter, self.strict = event_emitter, strict self.event_emitter, self.strict = event_emitter, strict
@@ -168,4 +174,13 @@ class AuthenticatedChannelsHandler(object):
raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.") raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.")
def __notification(self, stream): def __notification(self, stream):
return self.event_emitter.emit("notification", serializers.Notification.parse(*stream)) if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req":
return self.event_emitter.emit(f"{stream[1]}-notification", serializers._Notification(serializer=serializers.Order).parse(*stream))
if stream[1] == "oc_multi-req":
return self.event_emitter.emit(f"{stream[1]}-notification", serializers._Notification(serializer=serializers.Order, iterate=True).parse(*stream))
if stream[1] == "fon-req" or stream[1] == "foc-req":
return self.event_emitter.emit(f"{stream[1]}-notification", serializers._Notification(serializer=serializers.FundingOffer).parse(*stream))
return self.event_emitter.emit("notification", serializers._Notification(serializer=None).parse(*stream))

View File

@@ -2,6 +2,8 @@ from . import typings
from .. labeler import _Serializer from .. labeler import _Serializer
from .. notification import _Notification
#region Serializers definition for Websocket Public Channels #region Serializers definition for Websocket Public Channels
TradingPairTicker = _Serializer[typings.TradingPairTicker]("TradingPairTicker", labels=[ TradingPairTicker = _Serializer[typings.TradingPairTicker]("TradingPairTicker", labels=[
@@ -292,19 +294,4 @@ BalanceInfo = _Serializer[typings.BalanceInfo]("BalanceInfo", labels=[
"AUM_NET", "AUM_NET",
]) ])
#endregion
#region Serializers definition for Notifications channel
Notification = _Serializer[typings.Notification]("Notification", labels=[
"MTS",
"TYPE",
"MESSAGE_ID",
"_PLACEHOLDER",
"NOTIFY_INFO",
"CODE",
"STATUS",
"TEXT"
])
#endregion #endregion

View File

@@ -1,5 +1,7 @@
from typing import Type, Tuple, List, Dict, TypedDict, Union, Optional, Any from typing import Type, Tuple, List, Dict, TypedDict, Union, Optional, Any
from .. notification import Notification
JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]] JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]]
#region Type hinting for subscription objects #region Type hinting for subscription objects
@@ -272,17 +274,4 @@ class BalanceInfo(TypedDict):
AUM: float AUM: float
AUM_NET: float AUM_NET: float
#endregion
#region Type hinting for Notifications channel
class Notification(TypedDict):
MTS: int
TYPE: str
MESSAGE_ID: int
NOTIFY_INFO: JSON
CODE: int
STATUS: str
TEXT: str
#endregion #endregion

View File

@@ -4,7 +4,7 @@ import os
from bfxapi.client import Client, Constants from bfxapi.client import Client, Constants
from bfxapi.websocket.enums import Error, OrderType from bfxapi.websocket.enums import Error, OrderType
from bfxapi.websocket.typings import Order from bfxapi.websocket.typings import Notification, Order
bfx = Client( bfx = Client(
WSS_HOST=Constants.WSS_HOST, WSS_HOST=Constants.WSS_HOST,
@@ -29,8 +29,8 @@ async def on_authenticated(event):
print("The order has been sent.") print("The order has been sent.")
@bfx.wss.on("notification") @bfx.wss.on("on-req-notification")
async def on_notification(notification): async def on_notification(notification: Notification):
print(f"Notification: {notification}.") print(f"Notification: {notification}.")
@bfx.wss.on("order_new") @bfx.wss.on("order_new")