Rewrite all logic regarding connection multiplexing.

This commit is contained in:
Davide Casale
2023-10-13 05:43:42 +02:00
parent 378e89b504
commit 122d692684
3 changed files with 18 additions and 38 deletions

View File

@@ -29,7 +29,6 @@ from bfxapi.websocket.exceptions import \
InvalidCredentialError, \
ReconnectionTimeoutError, \
VersionMismatchError, \
ZeroConnectionsError, \
UnknownChannelError, \
UnknownSubscriptionError
@@ -246,27 +245,35 @@ class BfxWebSocketClient(Connection):
message[0] == 0 and message[1] != Connection._HEARTBEAT:
self.__handler.handle(message[1], message[2])
async def __new_bucket(self) -> BfxWebSocketBucket:
bucket = BfxWebSocketBucket( \
self._host, self.__event_emitter)
self.__buckets[bucket] = asyncio \
.create_task(bucket.start())
await bucket.wait()
return bucket
@Connection.require_websocket_connection
async def subscribe(self,
channel: str,
sub_id: Optional[str] = None,
**kwargs: Any) -> None:
if len(self.__buckets) == 0:
raise ZeroConnectionsError("Unable to subscribe: " \
"the number of connections must be greater than 0.")
if not channel in ["ticker", "trades", "book", "candles", "status"]:
raise UnknownChannelError("Available channels are: " + \
"ticker, trades, book, candles and status.")
_buckets = list(self.__buckets.keys())
for bucket in self.__buckets:
if not bucket.is_full:
return await bucket.subscribe( \
channel, sub_id, **kwargs)
counters = [ bucket.count for bucket in _buckets ]
bucket = await self.__new_bucket()
index = counters.index(min(counters))
await _buckets[index] \
.subscribe(channel, sub_id, **kwargs)
return await bucket.subscribe( \
channel, sub_id, **kwargs)
@Connection.require_websocket_connection
async def unsubscribe(self, sub_id: str) -> None:

View File

@@ -11,11 +11,6 @@ class FullBucketError(BfxBaseException):
Thrown when a user attempts a subscription but all buckets are full.
"""
class ZeroConnectionsError(BfxBaseException):
"""
This error indicates an attempt to subscribe to a public channel while the number of connections is 0.
"""
class ReconnectionTimeoutError(BfxBaseException):
"""
This error indicates that the connection has been offline for too long without being able to reconnect.