Add new websocket authenticated inputs in _BfxWebsocketInputs class. Add new type hinting inside bfxapi/websocket/typings.py.

This commit is contained in:
Davide Casale
2022-11-30 17:24:51 +01:00
parent d9ecbaa9f0
commit e71d4b6e26
2 changed files with 78 additions and 6 deletions

View File

@@ -4,8 +4,8 @@ from enum import Enum
from pyee.asyncio import AsyncIOEventEmitter
from .typings import Inputs
from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler
from .exceptions import ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion
from ..utils.logger import Formatter, CustomLogger
@@ -225,11 +225,20 @@ class _BfxWebsocketInputs(object):
def __init__(self, __handle_websocket_input):
self.__handle_websocket_input = __handle_websocket_input
async def order_new(self, data):
async def order_new(self, data: Inputs.Order.New):
await self.__handle_websocket_input("on", data)
async def order_update(self, data):
async def order_update(self, data: Inputs.Order.Update):
await self.__handle_websocket_input("ou", data)
async def order_cancel(self, data):
async def order_cancel(self, data: Inputs.Order.Cancel):
await self.__handle_websocket_input("oc", data)
async def offer_new(self, data: Inputs.Offer.New):
await self.__handle_websocket_input("fon", data)
async def offer_cancel(self, data: Inputs.Offer.Cancel):
await self.__handle_websocket_input("foc", data)
async def calc(self, *args: str):
await self.__handle_websocket_input("calc", list(map(lambda arg: [arg], args)))

View File

@@ -1,4 +1,10 @@
from typing import Type, List, Dict, TypedDict, Union, Optional
from decimal import Decimal
from datetime import datetime
from typing import Type, List, Dict, TypedDict, Union, Optional, Any
int16 = int32 = int45 = int64 = int
JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]]
@@ -293,3 +299,60 @@ BalanceInfo = TypedDict("BalanceInfo", {
})
#endregion
#region Type hinting for Websocket Authenticated Inputs
class Inputs:
class Order:
New = TypedDict("Inputs.Order.New", {
"gid": Optional[int32],
"cid": int45,
"type": str,
"symbol": str,
"amount": Union[Decimal, str],
"price": Union[Decimal, str],
"lev": int,
"price_trailing": Union[Decimal, str],
"price_aux_limit": Union[Decimal, str],
"price_oco_stop": Union[Decimal, str],
"flags": int16,
"tif": Union[datetime, str],
"meta": JSON
})
Update = TypedDict("Inputs.Order.Update", {
"id": int64,
"cid": int45,
"cid_date": str,
"gid": int32,
"price": Union[Decimal, str],
"amount": Union[Decimal, str],
"lev": int,
"delta": Union[Decimal, str],
"price_aux_limit": Union[Decimal, str],
"price_trailing": Union[Decimal, str],
"flags": int16,
"tif": Union[datetime, str]
})
Cancel = TypedDict("Inputs.Order.Cancel", {
"id": int64,
"cid": int45,
"cid_date": str
})
class Offer:
New = TypedDict("Inputs.Offer.New", {
"type": str,
"symbol": str,
"amount": Union[Decimal, str],
"rate": Union[Decimal, str],
"period": int,
"flags": int16
})
Cancel = TypedDict("Inputs.Offer.Cancel", {
"id": int
})
#endregion