mirror of
https://github.com/aljazceru/bitfinex-api-py.git
synced 2025-12-19 14:54:21 +01:00
207 lines
7.4 KiB
Python
207 lines
7.4 KiB
Python
"""
Module used to house all of the functions/classes used to handle
subscriptions
"""
|
|
|
|
import json
|
|
import asyncio
|
|
import time
|
|
|
|
from ..utils.custom_logger import CustomLogger
|
|
from ..models import Subscription
|
|
|
|
# NOTE(review): not referenced anywhere in this module; presumably the maximum
# number of channels allowed on one websocket connection - confirm against the
# code that imports it.
MAX_CHANNEL_COUNT = 25
|
|
|
|
class SubscriptionManager:
    """
    Keeps track of the channel subscriptions of a websocket client.

    Subscriptions that have been requested but not confirmed yet are held in
    `pending_subscriptions` (keyed by `Subscription.get_key()`); once they are
    confirmed they are indexed both by channel id (`subscriptions_chanid`)
    and by subscription id (`subscriptions_subid`).
    """

    def __init__(self, bfxapi, logLevel='INFO'):
        """
        @param bfxapi: the owning client object; used to pick or start sockets
          and to emit the 'subscribed' / 'unsubscribed' events
        @param logLevel: log level handed to the CustomLogger
        """
        # requested but not yet confirmed, keyed by Subscription.get_key()
        self.pending_subscriptions = {}
        # confirmed subscriptions keyed by channel id
        self.subscriptions_chanid = {}
        # confirmed subscriptions keyed by subscription id
        self.subscriptions_subid = {}
        # sub_id -> coroutine function awaited once the unsubscribe is confirmed
        self.unsubscribe_callbacks = {}
        self.bfxapi = bfxapi
        self.logger = CustomLogger('BfxSubscriptionManager', logLevel=logLevel)

    def get_sub_count_by_socket(self, socket_id):
        """
        Returns the number of subscriptions attached to the given socket,
        counting the confirmed ones that are currently subscribed plus every
        pending one

        @param socket_id: id of the socket to count the subscriptions of
        """
        active = sum(
            1 for sub in self.subscriptions_chanid.values()
            if sub.socket.id == socket_id and sub.is_subscribed())
        pending = sum(
            1 for sub in self.pending_subscriptions.values()
            if sub.socket.id == socket_id)
        return active + pending

    async def subscribe(self, channel_name, symbol, key=None, timeframe=None, **kwargs):
        """
        Subscribe to a new channel

        @param channel_name: the name of the channel i.e 'books', 'candles'
        @param symbol: the trading symbol i.e 'tBTCUSD'
        @param key: optional key forwarded to the Subscription
        @param timeframe: specifies the data timeframe between each candle (only required
          for the candles channel)
        @param kwargs: any extra keyword arguments are forwarded to the Subscription
        """
        if self.bfxapi.get_total_available_capcity() < 2:
            # less than 2 free slots reported, so open a new socket and use it
            socket_id = self.bfxapi._start_new_socket()
            self.bfxapi._wait_for_socket(socket_id)
            socket = self.bfxapi.sockets[socket_id]
        else:
            # get the socket with the least amount of subscriptions
            socket = self.bfxapi.get_most_available_socket()
        # create a new subscription
        subscription = Subscription(
            socket, channel_name, symbol, key, timeframe, **kwargs)
        self.logger.info("Subscribing to channel {}".format(channel_name))
        # stays pending until confirm_subscription is called for it
        self.pending_subscriptions[subscription.get_key()] = subscription

        await subscription.subscribe()

    async def confirm_subscription(self, socket_id, raw_ws_data):
        """
        Marks the subscription described by the given confirmation message as
        subscribed, indexes it by channel id and subscription id and emits the
        'subscribed' event. A confirmation that matches no known subscription
        is logged and ignored.

        @param socket_id: id of the socket (not used by this method)
        @param raw_ws_data: dict of the confirmation message; the 'symbol',
          'channel', 'chanId' and 'key' entries are read
        """
        symbol = raw_ws_data.get("symbol", None)
        channel = raw_ws_data.get("channel")
        chan_id = raw_ws_data.get("chanId")
        key = raw_ws_data.get("key", None)
        get_key = "{}_{}".format(channel, key or symbol)
        # must be bound up front: the last branch below may not find a match,
        # and the 'is None' check would otherwise raise UnboundLocalError
        p_sub = None
        if chan_id in self.subscriptions_chanid:
            # subscription has already existed in the past
            p_sub = self.subscriptions_chanid[chan_id]
        elif get_key in self.pending_subscriptions:
            # has just been created and is pending, so remove from pending list
            p_sub = self.pending_subscriptions.pop(get_key)
        else:
            # might have been disconnected, so we need to check if exists
            # as subscribed but with a new channel ID
            for sub in self.subscriptions_chanid.values():
                if sub.get_key() == get_key and not sub.is_subscribed():
                    # delete old channelId (safe while iterating because we
                    # break out of the loop straight away)
                    del self.subscriptions_chanid[sub.chan_id]
                    p_sub = sub
                    break
        if p_sub is None:
            # no sub matches confirmation
            self.logger.warn("unknown subscription confirmed {}".format(get_key))
            return

        p_sub.confirm_subscription(chan_id)
        # add to confirmed list
        self.subscriptions_chanid[chan_id] = p_sub
        self.subscriptions_subid[p_sub.sub_id] = p_sub
        self.bfxapi._emit('subscribed', p_sub)

    async def confirm_unsubscribe(self, socket_id, raw_ws_data):
        """
        Marks the subscription with the channel id found in the message as
        unsubscribed, awaits its onComplete callback (if one was registered
        through unsubscribe) and emits the 'unsubscribed' event.

        Raises KeyError if the channel id is not known.

        @param socket_id: id of the socket (not used by this method)
        @param raw_ws_data: dict of the confirmation message; only the
          'chanId' entry is read
        """
        chan_id = raw_ws_data.get("chanId")
        sub = self.subscriptions_chanid[chan_id]
        sub.confirm_unsubscribe()
        # call onComplete callback if exists; it is removed *before* being
        # awaited so that a failing callback is not left registered and a
        # callback registered while it runs is not thrown away
        on_complete = self.unsubscribe_callbacks.pop(sub.sub_id, None)
        if on_complete is not None:
            await on_complete()
        self.bfxapi._emit('unsubscribed', sub)

    def get(self, chan_id):
        """
        Returns the confirmed subscription with the given chanId

        Raises KeyError if the channel id is not known.
        """
        return self.subscriptions_chanid[chan_id]

    def set_unsubscribed_by_socket(self, socket_id):
        """
        Sets all of the subscriptions that belong to the given socket to
        state 'unsubscribed'

        @param socket_id: id of the socket whose subscriptions are updated
        """
        for sub in self.subscriptions_chanid.values():
            if sub.socket.id == socket_id:
                sub.confirm_unsubscribe()

    def set_all_unsubscribed(self):
        """
        Sets all of the subscriptions to state 'unsubscribed'
        """
        for sub in self.subscriptions_chanid.values():
            sub.confirm_unsubscribe()

    async def unsubscribe(self, chan_id, onComplete=None):
        """
        Unsubscribe from the channel with the given chanId

        Raises KeyError if the channel id is not known. The unsubscribe
        request is only sent when the channel is currently subscribed.

        @param chan_id: channel id of the subscription to unsubscribe from
        @param onComplete: coroutine function (called without arguments) that is
          awaited when the bitfinex websocket responds with a signal that
          confirms the subscription has been unsubscribed to
        """
        sub = self.subscriptions_chanid[chan_id]
        if onComplete:
            self.unsubscribe_callbacks[sub.sub_id] = onComplete
        if sub.is_subscribed():
            await sub.unsubscribe()

    async def resubscribe(self, chan_id):
        """
        Unsubscribes and then subscribes to the channel with the given Id

        This function is mostly used to force the channel to produce a fresh snapshot.

        Raises KeyError if the channel id is not known.
        """
        sub = self.subscriptions_chanid[chan_id]

        async def re_sub():
            await sub.subscribe()

        if sub.is_subscribed():
            # unsubscribe first and call callback to subscribe
            await self.unsubscribe(chan_id, re_sub)
        else:
            # already unsubscribed, so just subscribe
            await sub.subscribe()

    def channel_count(self):
        """
        Returns the number of channels (pending and confirmed)
        """
        return len(self.pending_subscriptions) + len(self.subscriptions_chanid)

    def is_subscribed(self, chan_id):
        """
        Returns True if the channel with the given chanId is currently subscribed to
        """
        if chan_id not in self.subscriptions_chanid:
            return False
        return self.subscriptions_chanid[chan_id].is_subscribed()

    @staticmethod
    async def _wait_for_batch(task_batch):
        """
        Waits until every task of the batch has finished; an empty batch
        returns straight away because asyncio.wait rejects an empty collection
        """
        if not task_batch:
            return
        await asyncio.wait(task_batch)

    async def unsubscribe_all(self):
        """
        Unsubscribe from all channels.
        """
        task_batch = [
            asyncio.ensure_future(self.unsubscribe(chan_id))
            for chan_id, sub in self.subscriptions_chanid.items()
            if sub.is_subscribed()
        ]
        await self._wait_for_batch(task_batch)

    async def resubscribe_by_socket(self, socket_id):
        """
        Unsubscribe channels on socket and then subscribe to all channels

        @param socket_id: id of the socket whose channels are resubscribed
        """
        task_batch = [
            asyncio.ensure_future(self.resubscribe(sub.chan_id))
            for sub in self.subscriptions_chanid.values()
            if sub.socket.id == socket_id
        ]
        await self._wait_for_batch(task_batch)

    async def resubscribe_all(self):
        """
        Unsubscribe and then subscribe to all channels
        """
        task_batch = [
            asyncio.ensure_future(self.resubscribe(chan_id))
            for chan_id in self.subscriptions_chanid
        ]
        await self._wait_for_batch(task_batch)
|