mirror of
https://github.com/aljazceru/bitfinex-api-py.git
synced 2025-12-19 23:04:21 +01:00
Add support for SimpleNamespace (instead of TypedDict) in bfxapi/labeler.py and bfxapi/notifications.py. Add generics Notification type in notifications.py. Add support for new changes in bfxapi/rest/BfxRestInterface.py.
This commit is contained in:
@@ -1,17 +1,21 @@
|
||||
from typing import List, Dict, Union, Optional, Any, TypedDict, cast
|
||||
from typing import List, Dict, Union, Optional, Any, TypedDict, Generic, TypeVar, cast
|
||||
|
||||
from types import SimpleNamespace
|
||||
|
||||
from .labeler import _Serializer
|
||||
|
||||
class Notification(TypedDict):
|
||||
T = TypeVar("T")
|
||||
|
||||
class Notification(SimpleNamespace, Generic[T]):
|
||||
MTS: int
|
||||
TYPE: str
|
||||
MESSAGE_ID: Optional[int]
|
||||
NOTIFY_INFO: Union[Dict[str, Any], List[Dict[str, Any]]]
|
||||
NOTIFY_INFO: T
|
||||
CODE: Optional[int]
|
||||
STATUS: str
|
||||
TEXT: str
|
||||
|
||||
class _Notification(_Serializer):
|
||||
class _Notification(_Serializer, Generic[T]):
|
||||
__LABELS = [ "MTS", "TYPE", "MESSAGE_ID", "_PLACEHOLDER", "NOTIFY_INFO", "CODE", "STATUS", "TEXT" ]
|
||||
|
||||
def __init__(self, serializer: Optional[_Serializer] = None, iterate: bool = False):
|
||||
@@ -19,17 +23,17 @@ class _Notification(_Serializer):
|
||||
|
||||
self.serializer, self.iterate = serializer, iterate
|
||||
|
||||
def parse(self, *values: Any, skip: Optional[List[str]] = None) -> Notification:
|
||||
notification = dict(self._serialize(*values))
|
||||
def parse(self, *values: Any, skip: Optional[List[str]] = None) -> Notification[T]:
|
||||
notification = cast(Notification[T], SimpleNamespace(**dict(self._serialize(*values))))
|
||||
|
||||
if isinstance(self.serializer, _Serializer):
|
||||
if self.iterate == False:
|
||||
NOTIFY_INFO = notification["NOTIFY_INFO"]
|
||||
NOTIFY_INFO = cast(List[Any], notification.NOTIFY_INFO)
|
||||
|
||||
if self.iterate == False:
|
||||
if len(NOTIFY_INFO) == 1 and isinstance(NOTIFY_INFO[0], list):
|
||||
NOTIFY_INFO = NOTIFY_INFO[0]
|
||||
|
||||
notification["NOTIFY_INFO"] = dict(self.serializer._serialize(*NOTIFY_INFO, skip=skip))
|
||||
else: notification["NOTIFY_INFO"] = [ dict(self.serializer._serialize(*data, skip=skip)) for data in notification["NOTIFY_INFO"] ]
|
||||
notification.NOTIFY_INFO = cast(T, SimpleNamespace(**dict(self.serializer._serialize(*NOTIFY_INFO, skip=skip))))
|
||||
else: notification.NOTIFY_INFO = cast(T, [ SimpleNamespace(**dict(self.serializer._serialize(*data, skip=skip))) for data in NOTIFY_INFO ])
|
||||
|
||||
return cast(Notification, notification)
|
||||
return notification
|
||||
Reference in New Issue
Block a user