Add filter parameter to BfxWebsocketClient's __init__.py. Rewrite .notify coroutine adding new feature. Add Notifications channel handlers in handlers.py. Add Notification serializer in serializers.py.

This commit is contained in:
Davide Casale
2022-11-24 17:31:47 +01:00
parent 6f1befbcf0
commit 3a09ba2e90
3 changed files with 30 additions and 5 deletions

View File

@@ -146,12 +146,15 @@ class AuthenticatedChannelsHandler(object):
("bu",): serializers.BalanceInfo
}
EVENTS = list(__abbreviations.values())
EVENTS = [ "notification", *list(__abbreviations.values()) ]
def __init__(self, event_emitter, strict = False):
self.event_emitter, self.strict = event_emitter, strict
def handle(self, type, stream):
if type == "n":
return self.__notification(stream)
for types, serializer in AuthenticatedChannelsHandler.__serializers.items():
if type in types:
event = AuthenticatedChannelsHandler.__abbreviations[type]
@@ -162,4 +165,7 @@ class AuthenticatedChannelsHandler(object):
return self.event_emitter.emit(event, serializer.parse(*stream))
if self.strict == True:
raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.")
raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.")
def __notification(self, stream):
return self.event_emitter.emit("notification", serializers.Notification.parse(*stream))