Move subscriptions type hinting from bfxapi/websocket/types.py to bfxapi/websocket/subscriptions.py.

This commit is contained in:
Davide Casale
2023-01-19 18:00:51 +01:00
parent c471a3b52b
commit 5fe4d83902
7 changed files with 54 additions and 69 deletions

View File

@@ -5,9 +5,11 @@ from typing import Type, Generic, TypeVar, Iterable, Optional, List, Tuple, Any,
T = TypeVar("T", bound="_Type")
class _Type(object):
def __init__(self, **kwargs):
for key, value in kwargs.items():
self.__setattr__(key, value)
"""
Base class for any dataclass serializable by the _Serializer generic class.
"""
pass
class _Serializer(Generic[T]):
def __init__(self, name: str, klass: Type[_Type], labels: List[str], IGNORE: List[str] = [ "_PLACEHOLDER" ]):

View File

@@ -6,9 +6,6 @@ from . import serializers
from .enums import Channels
from .exceptions import BfxWebsocketException
def _get_sub_dictionary(dictionary, keys):
return { key: dictionary[key] for key in dictionary if key in keys }
class PublicChannelsHandler(object):
EVENTS = [
"t_ticker_update", "f_ticker_update",
@@ -30,21 +27,23 @@ class PublicChannelsHandler(object):
}
def handle(self, subscription, *stream):
_clear = lambda dictionary, *args: { key: value for key, value in dictionary.items() if key not in args }
if channel := subscription["channel"] or channel in self.__handlers.keys():
return self.__handlers[channel](subscription, *stream)
return self.__handlers[channel](_clear(subscription, "event", "channel"), *stream)
def __ticker_channel_handler(self, subscription, *stream):
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
"t_ticker_update",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
subscription,
serializers.TradingPairTicker.parse(*stream[0])
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
"f_ticker_update",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
subscription,
serializers.FundingCurrencyTicker.parse(*stream[0])
)
@@ -53,34 +52,32 @@ class PublicChannelsHandler(object):
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
{ "te": "t_trade_executed", "tu": "t_trade_execution_update" }[type],
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
subscription,
serializers.TradingPairTrade.parse(*stream[1])
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
{ "fte": "f_trade_executed", "ftu": "f_trade_execution_update" }[type],
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
subscription,
serializers.FundingCurrencyTrade.parse(*stream[1])
)
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
"t_trades_snapshot",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
subscription,
[ serializers.TradingPairTrade.parse(*substream) for substream in stream[0] ]
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
"f_trades_snapshot",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
subscription,
[ serializers.FundingCurrencyTrade.parse(*substream) for substream in stream[0] ]
)
def __book_channel_handler(self, subscription, *stream):
subscription = _get_sub_dictionary(subscription, [ "chanId", "symbol", "prec", "freq", "len", "subId", "pair" ])
type = subscription["symbol"][0]
if subscription["prec"] == "R0":
@@ -101,8 +98,6 @@ class PublicChannelsHandler(object):
)
def __candles_channel_handler(self, subscription, *stream):
subscription = _get_sub_dictionary(subscription, [ "chanId", "key" ])
if all(isinstance(substream, list) for substream in stream[0]):
return self.event_emitter.emit(
"candles_snapshot",
@@ -117,8 +112,6 @@ class PublicChannelsHandler(object):
)
def __status_channel_handler(self, subscription, *stream):
subscription = _get_sub_dictionary(subscription, [ "chanId", "key" ])
if subscription["key"].startswith("deriv:"):
return self.event_emitter.emit(
"derivatives_status_update",

View File

@@ -0,0 +1,28 @@
from typing import TypedDict, Optional
class Ticker(TypedDict):
chanId: int; symbol: str
pair: Optional[str]
currency: Optional[str]
class Trades(TypedDict):
chanId: int; symbol: str
pair: Optional[str]
currency: Optional[str]
class Book(TypedDict):
chanId: int
symbol: str
prec: str
freq: str
len: str
subId: int
pair: str
class Candles(TypedDict):
chanId: int
key: str
class Status(TypedDict):
chanId: int
key: str

View File

@@ -8,48 +8,6 @@ from ..notification import Notification
JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]]
#region Type hinting for subscription objects
class Subscriptions:
class TradingPairTicker(TypedDict):
chanId: int
symbol: str
pair: str
class FundingCurrencyTicker(TypedDict):
chanId: int
symbol: str
currency: str
class TradingPairTrades(TypedDict):
chanId: int
symbol: str
pair: str
class FundingCurrencyTrades(TypedDict):
chanId: int
symbol: str
currency: str
class Book(TypedDict):
chanId: int
symbol: str
prec: str
freq: str
len: str
subId: int
pair: str
class Candles(TypedDict):
chanId: int
key: str
class DerivativesStatus(TypedDict):
chanId: int
key: str
#endregion
#region Type hinting for Websocket Public Channels
@dataclass

View File

@@ -6,8 +6,9 @@ from typing import List
from bfxapi import Client, Constants
from bfxapi.websocket import subscriptions
from bfxapi.websocket.enums import Channels, Error
from bfxapi.websocket.types import Subscriptions, TradingPairBook
from bfxapi.websocket.types import TradingPairBook
class OrderBook(object):
def __init__(self, symbols: List[str]):
@@ -53,12 +54,12 @@ def on_subscribed(subscription):
print(f"Subscription successful for pair <{subscription['pair']}>")
@bfx.wss.on("t_book_snapshot")
def on_t_book_snapshot(subscription: Subscriptions.Book, snapshot: List[TradingPairBook]):
def on_t_book_snapshot(subscription: subscriptions.Book, snapshot: List[TradingPairBook]):
for data in snapshot:
order_book.update(subscription["symbol"], data)
@bfx.wss.on("t_book_update")
def on_t_book_update(subscription: Subscriptions.Book, data: TradingPairBook):
def on_t_book_update(subscription: subscriptions.Book, data: TradingPairBook):
order_book.update(subscription["symbol"], data)
bfx.wss.run()

View File

@@ -6,8 +6,9 @@ from typing import List
from bfxapi import Client, Constants
from bfxapi.websocket import subscriptions
from bfxapi.websocket.enums import Channels, Error
from bfxapi.websocket.types import Subscriptions, TradingPairRawBook
from bfxapi.websocket.types import TradingPairRawBook
class RawOrderBook(object):
def __init__(self, symbols: List[str]):
@@ -53,12 +54,12 @@ def on_subscribed(subscription):
print(f"Subscription successful for pair <{subscription['pair']}>")
@bfx.wss.on("t_raw_book_snapshot")
def on_t_raw_book_snapshot(subscription: Subscriptions.Book, snapshot: List[TradingPairRawBook]):
def on_t_raw_book_snapshot(subscription: subscriptions.Book, snapshot: List[TradingPairRawBook]):
for data in snapshot:
raw_order_book.update(subscription["symbol"], data)
@bfx.wss.on("t_raw_book_update")
def on_t_raw_book_update(subscription: Subscriptions.Book, data: TradingPairRawBook):
def on_t_raw_book_update(subscription: subscriptions.Book, data: TradingPairRawBook):
raw_order_book.update(subscription["symbol"], data)
bfx.wss.run()

View File

@@ -1,13 +1,15 @@
# python -c "from examples.websocket.ticker import *"
from bfxapi import Client, Constants
from bfxapi.websocket import subscriptions
from bfxapi.websocket.enums import Channels
from bfxapi.websocket.types import Subscriptions, TradingPairTicker
from bfxapi.websocket.types import TradingPairTicker
bfx = Client(WSS_HOST=Constants.PUB_WSS_HOST)
@bfx.wss.on("t_ticker_update")
def on_t_ticker_update(subscription: Subscriptions.TradingPairTicker, data: TradingPairTicker):
def on_t_ticker_update(subscription: subscriptions.Ticker, data: TradingPairTicker):
print(f"Subscription with channel ID: {subscription['chanId']}")
print(f"Data: {data}")