Rename bfxapi/websocket/typings.py to types.py. Replace TypedDicts with dataclasses (with _Type as base class). Update demos in examples/websocket to use new implementation.

This commit is contained in:
Davide Casale
2023-01-16 17:07:16 +01:00
parent 1613a56d81
commit e185da4cc9
4 changed files with 77 additions and 49 deletions

View File

@@ -1,3 +1,7 @@
from typing import List
from .types import *
from . import serializers
from .enums import Channels
from .exceptions import BfxWebsocketException
@@ -174,13 +178,15 @@ class AuthenticatedChannelsHandler(object):
raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.")
def __notification(self, stream):
type, serializer = "notification", serializers._Notification(serializer=None)
if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req":
return self.event_emitter.emit(f"{stream[1]}-notification", serializers._Notification(serializer=serializers.Order).parse(*stream))
type, serializer = f"{stream[1]}-notification", serializers._Notification[Order](serializer=serializers.Order)
if stream[1] == "oc_multi-req":
return self.event_emitter.emit(f"{stream[1]}-notification", serializers._Notification(serializer=serializers.Order, iterate=True).parse(*stream))
type, serializer = f"{stream[1]}-notification", serializers._Notification[List[Order]](serializer=serializers.Order, iterate=True)
if stream[1] == "fon-req" or stream[1] == "foc-req":
return self.event_emitter.emit(f"{stream[1]}-notification", serializers._Notification(serializer=serializers.FundingOffer).parse(*stream))
type, serializer = f"{stream[1]}-notification", serializers._Notification[FundingOffer](serializer=serializers.FundingOffer)
return self.event_emitter.emit("notification", serializers._Notification(serializer=None).parse(*stream))
return self.event_emitter.emit(type, serializer.parse(*stream))