Merge pull request #10 from JacobPlaster/improve-callbacks-and-checksum

Improve callbacks
This commit is contained in:
Paolo Ardoino
2019-02-08 18:54:23 +01:00
committed by GitHub
10 changed files with 295 additions and 77 deletions

View File

@@ -1,12 +1,8 @@
language: python language: python
python:
before_install: - "3.6"
- sudo apt-get update
- sudo apt-get install python3
- sudo apt-get install python3-pip
- sudo python3 -m pip install --upgrade pip
# command to install dependencies
install: install:
- python3 -m pip install -r requirements.txt --user - python3.6 -m pip install -r requirements.txt
# command to run tests script:
script: python3 -m pylint --rcfile=pylint.rc bfxapi - pylint --rcfile=pylint.rc bfxapi
- pytest

View File

@@ -102,6 +102,8 @@ The websocket exposes a collection of events that are triggered when certain dat
- `all` (array|json): listen for all messages coming through - `all` (array|json): listen for all messages coming through
- `connected:` () called when a connection is made - `connected:` () called when a connection is made
- `disconnected`: () called when a connection is ended (A reconnect attempt may follow)
- `stopped`: () called when max amount of connection retries is met and the socket is closed
- `authenticated` (): called when the websocket passes authentication - `authenticated` (): called when the websocket passes authentication
- `notification` (array): incoming account notification - `notification` (array): incoming account notification
- `error` (array): error from the websocket - `error` (array): error from the websocket

View File

@@ -5,7 +5,7 @@ import asyncio
from .. import Client, BfxWebsocket from .. import Client, BfxWebsocket
def get_now(): def get_now():
return int(round(time.time() * 1000)) return int(round(time.time() * 1000))
class StubbedWebsocket(BfxWebsocket): class StubbedWebsocket(BfxWebsocket):
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):

View File

@@ -110,3 +110,99 @@ async def test_events_on_cancel_order():
assert close_res.price == 10 assert close_res.price == 10
assert close_res.type == 'EXCHANGE LIMIT' assert close_res.type == 'EXCHANGE LIMIT'
@pytest.mark.asyncio
async def test_closed_callback_on_submit_order_closed():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
async def c(order):
client.ws._emit('c1', order)
callback_wait = EventWatcher.watch(client.ws, 'c1')
# override cid generation
client.ws.orderManager._gen_unqiue_cid = lambda: 123
await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onClose=c)
await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
callback_wait.wait_until_complete()
@pytest.mark.asyncio
async def test_confirmed_callback_on_submit_order_closed():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
async def c(order):
client.ws._emit('c1', order)
callback_wait = EventWatcher.watch(client.ws, 'c1')
# override cid generation
client.ws.orderManager._gen_unqiue_cid = lambda: 123
await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c)
await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
callback_wait.wait_until_complete()
@pytest.mark.asyncio
async def test_confirmed_callback_on_submit_new_order():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
async def c(order):
client.ws._emit('c1', order)
callback_wait = EventWatcher.watch(client.ws, 'c1')
# override cid generation
client.ws.orderManager._gen_unqiue_cid = lambda: 123
await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c)
await client.ws.publish([0,"on",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
callback_wait.wait_until_complete()
@pytest.mark.asyncio
async def test_confirmed_callback_on_submit_order_update():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
async def c(order):
client.ws._emit('c1', order)
callback_wait = EventWatcher.watch(client.ws, 'c1')
# override cid generation
client.ws.orderManager._gen_unqiue_cid = lambda: 123
await client.ws.update_order(123, price=100, onConfirm=c)
await client.ws.publish([0,"ou",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
callback_wait.wait_until_complete()
@pytest.mark.asyncio
async def test_confirmed_callback_on_submit_cancel_order():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
async def c(order):
client.ws._emit('c1', order)
callback_wait = EventWatcher.watch(client.ws, 'c1')
# override cid generation
client.ws.orderManager._gen_unqiue_cid = lambda: 123
await client.ws.cancel_order(123, onConfirm=c)
await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
callback_wait.wait_until_complete()
@pytest.mark.asyncio
async def test_confirmed_callback_on_submit_cancel_group_order():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
async def c(order):
client.ws._emit('c1', order)
callback_wait = EventWatcher.watch(client.ws, 'c1')
# override cid generation
client.ws.orderManager._gen_unqiue_cid = lambda: 123
await client.ws.cancel_order_group(123, onConfirm=c)
await client.ws.publish([0,"oc",[1548262833910,123,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
callback_wait.wait_until_complete()

View File

@@ -0,0 +1,90 @@
import time
import json
import asyncio
from .. import Client, BfxWebsocket
def get_now():
return int(round(time.time() * 1000))
class StubbedWebsocket(BfxWebsocket):
def __new__(cls, *args, **kwargs):
instance = super(StubbedWebsocket, cls).__new__(cls, *args, **kwargs)
instance.sent_items = []
instance.published_items = []
return instance
async def _main(self, host):
print ("Faking wesocket connection to {}".format(host))
def get_ws(self):
return self
async def publish(self, data, is_json=True):
self.published_items += [{
'time': get_now(),
'data': data
}]
# convert to string and push through the websocket
data = json.dumps(data) if is_json else data
return await self.on_message(data)
async def publish_auth_confirmation(self):
return self.publish({"event":"auth","status":"OK","chanId":0,"userId":269499,"auth_id":"58aa0472-b1a9-4690-8ab8-300d68e66aaf","caps":{"orders":{"read":1,"write":1},"account":{"read":1,"write":0},"funding":{"read":1,"write":1},"history":{"read":1,"write":0},"wallets":{"read":1,"write":1},"withdraw":{"read":0,"write":1},"positions":{"read":1,"write":1}}})
async def send(self, data_string):
self.sent_items += [{
'time': get_now(),
'data': data_string
}]
def get_published_items(self):
return self.published_items
def get_sent_items(self):
return self.sent_items
def get_last_sent_item(self):
return self.sent_items[-1:][0]
def get_sent_items_count(self):
return len(self.sent_items)
class EventWatcher():
def __init__(self, ws, event):
self.value = None
self.event = event
ws.once(event, self._finish)
def _finish(self, value):
self.value = value or {}
@classmethod
def watch(cls, ws, event):
return EventWatcher(ws, event)
def wait_until_complete(self, max_wait_time=5):
counter = 0
while self.value == None:
if counter > 5:
raise Exception('Wait time limit exceeded for event {}'.format(self.event))
time.sleep(1)
counter += 1
return self.value
def create_stubbed_client(*args, **kwargs):
client = Client(*args, **kwargs)
# no support for rest stubbing yet
client.rest = None
client.ws = StubbedWebsocket(*args, **kwargs)
return client
async def ws_publish_auth_accepted(ws):
return await ws.publish({"event":"auth","status":"OK","chanId":0,"userId":269499,"auth_id":"58aa0472-b1a9-4690-8ab8-300d68e66aaf","caps":{"orders":{"read":1,"write":1},"account":{"read":1,"write":0},"funding":{"read":1,"write":1},"history":{"read":1,"write":0},"wallets":{"read":1,"write":1},"withdraw":{"read":0,"write":1},"positions":{"read":1,"write":1}}})
async def ws_publish_connection_init(ws):
return await ws.publish({"event":"info","version":2,"serverId":"748c00f2-250b-46bb-8519-ce1d7d68e4f0","platform":{"status":1}})
async def ws_publish_conf_accepted(ws, flags_code):
return await ws.publish({"event":"conf","status":"OK","flags":flags_code})

View File

@@ -201,7 +201,7 @@ class BfxWebsocket(GenericWebsocket):
err_string = self.ERRORS[data.get('code', 10000)] err_string = self.ERRORS[data.get('code', 10000)]
err_string = "{} - {}".format(self.ERRORS[data.get('code', 10000)], err_string = "{} - {}".format(self.ERRORS[data.get('code', 10000)],
data.get("msg", "")) data.get("msg", ""))
self._emit('error', Exception(err_string)) self._emit('error', err_string)
async def _system_auth_handler(self, data): async def _system_auth_handler(self, data):
if data.get('status') == 'FAILED': if data.get('status') == 'FAILED':
@@ -310,6 +310,9 @@ class BfxWebsocket(GenericWebsocket):
async def _candle_handler(self, data): async def _candle_handler(self, data):
subscription = self.subscriptionManager.get(data[0]) subscription = self.subscriptionManager.get(data[0])
# if candle data is empty
if data[1] == []:
return
if type(data[1][0]) is list: if type(data[1][0]) is list:
# Process the batch of seed candles on # Process the batch of seed candles on
# websocket subscription # websocket subscription
@@ -385,7 +388,9 @@ class BfxWebsocket(GenericWebsocket):
# enable order book checksums # enable order book checksums
if self.manageOrderBooks: if self.manageOrderBooks:
await self.enable_flag(Flags.CHECKSUM) await self.enable_flag(Flags.CHECKSUM)
# resubscribe to any channels # set any existing subscriptions to not subscribed
self.subscriptionManager.set_all_unsubscribed()
# re-subscribe to existing channels
await self.subscriptionManager.resubscribe_all() await self.subscriptionManager.resubscribe_all()
async def _send_auth_command(self, channel_name, data): async def _send_auth_command(self, channel_name, data):
@@ -426,8 +431,11 @@ class BfxWebsocket(GenericWebsocket):
async def cancel_order(self, *args, **kwargs): async def cancel_order(self, *args, **kwargs):
return await self.orderManager.cancel_order(*args, **kwargs) return await self.orderManager.cancel_order(*args, **kwargs)
async def cancel_order_group(self, *args, **kwargs):
return await self.orderManager.cancel_order_group(*args, **kwargs)
async def cancel_all_orders(self, *args, **kwargs): async def cancel_all_orders(self, *args, **kwargs):
return await self.orderManager.cancel_all_orders(*args, **kwargs) return await self.orderManager.cancel_all_orders(*args, **kwargs)
async def cancel_order_multi(self, *args, **kwargs): async def cancel_order_multi(self, *args, **kwargs):
return await self.cancel_order_multi(*args, **kwargs) return await self.orderManager.cancel_order_multi(*args, **kwargs)

View File

@@ -38,8 +38,11 @@ class GenericWebsocket:
self.loop = loop or asyncio.get_event_loop() self.loop = loop or asyncio.get_event_loop()
self.events = EventEmitter( self.events = EventEmitter(
scheduler=asyncio.ensure_future, loop=self.loop) scheduler=asyncio.ensure_future, loop=self.loop)
# overide 'error' event to stop it raising an exception
# self.events.on('error', self.on_error)
self.ws = None self.ws = None
self.max_retries = max_retries self.max_retries = max_retries
self.attempt_retry = True
def run(self): def run(self):
""" """
@@ -67,18 +70,22 @@ class GenericWebsocket:
async def _main(self, host): async def _main(self, host):
retries = 0 retries = 0
while retries < self.max_retries: while retries < self.max_retries and self.attempt_retry:
try: try:
await self._connect(host) await self._connect(host)
retries = 0 retries = 0
except (ConnectionClosed, socket.error) as e: except (ConnectionClosed, socket.error) as e:
self._emit('disconnected')
if (not self.attempt_retry):
return
self.logger.error(str(e)) self.logger.error(str(e))
retries += 1 retries += 1
# wait 5 seconds befor retrying # wait 5 seconds befor retrying
self.logger.info("Waiting 5 seconds befor retrying...") self.logger.info("Waiting 5 seconds before retrying...")
await asyncio.sleep(5) await asyncio.sleep(5)
self.logger.info("Reconnect attempt {}/{}".format(retries, self.max_retries)) self.logger.info("Reconnect attempt {}/{}".format(retries, self.max_retries))
self.logger.info("Unable to connect to websocket.") self.logger.info("Unable to connect to websocket.")
self._emit('stopped')
def remove_all_listeners(self, event): def remove_all_listeners(self, event):
""" """
@@ -111,13 +118,13 @@ class GenericWebsocket:
On websocket error print and fire event On websocket error print and fire event
""" """
self.logger.error(error) self.logger.error(error)
self.events.emit('error', error)
async def on_close(self): async def on_close(self):
""" """
On websocket close print and fire event On websocket close print and fire event. This is used by the data server.
""" """
self.logger.info("Websocket closed.") self.logger.info("Websocket closed.")
self.attempt_retry = False
await self.ws.close() await self.ws.close()
self._emit('done') self._emit('done')

View File

@@ -19,9 +19,13 @@ class OrderManager:
def __init__(self, bfxapi, logLevel='INFO'): def __init__(self, bfxapi, logLevel='INFO'):
self.bfxapi = bfxapi self.bfxapi = bfxapi
self.pending_orders = {} self.pending_orders = {}
self.pending_callbacks = {}
self.closed_orders = {} self.closed_orders = {}
self.open_orders = {} self.open_orders = {}
self.pending_order_close_callbacks = {}
self.pending_order_confirm_callbacks = {}
self.pending_update_confirm_callbacks = {}
self.pending_cancel_confirm_callbacks = {}
self.logger = CustomLogger('BfxOrderManager', logLevel=logLevel) self.logger = CustomLogger('BfxOrderManager', logLevel=logLevel)
def get_open_orders(self): def get_open_orders(self):
@@ -33,30 +37,18 @@ class OrderManager:
def get_pending_orders(self): def get_pending_orders(self):
return list(self.pending_orders.values()) return list(self.pending_orders.values())
async def _confirm_order(self, order, isClosed=False):
"""
Called every time an order signal has been received. This function
manages the local list of open orders.
"""
if order.cid in self.pending_orders:
await self._execute_confirm_callback(order.cid, order)
if isClosed:
await self._execute_close_callback(order.cid, order)
order.set_confirmed()
# remove from pending orders list
del self.pending_orders[order.cid]
self.bfxapi._emit('order_confirmed', order)
else:
await self._execute_confirm_callback(order.id, order)
if isClosed:
await self._execute_close_callback(order.id, order)
async def confirm_order_closed(self, raw_ws_data): async def confirm_order_closed(self, raw_ws_data):
order = Order.from_raw_order(raw_ws_data[2]) order = Order.from_raw_order(raw_ws_data[2])
order.set_open_state(False) order.set_open_state(False)
if order.id in self.open_orders: if order.id in self.open_orders:
del self.open_orders[order.id] del self.open_orders[order.id]
await self._confirm_order(order, isClosed=True) if not order.is_confirmed():
order.set_confirmed()
self.bfxapi._emit('order_confirmed', order)
await self._execute_callback(order, self.pending_order_confirm_callbacks)
await self._execute_callback(order, self.pending_cancel_confirm_callbacks)
await self._execute_callback(order, self.pending_update_confirm_callbacks)
await self._execute_callback(order, self.pending_order_close_callbacks)
self.logger.info("Order closed: {} {}".format( self.logger.info("Order closed: {} {}".format(
order.symbol, order.status)) order.symbol, order.status))
self.bfxapi._emit('order_closed', order) self.bfxapi._emit('order_closed', order)
@@ -77,7 +69,7 @@ class OrderManager:
order = Order.from_raw_order(raw_ws_data[2]) order = Order.from_raw_order(raw_ws_data[2])
order.set_open_state(True) order.set_open_state(True)
self.open_orders[order.id] = order self.open_orders[order.id] = order
await self._confirm_order(order) await self._execute_callback(order, self.pending_update_confirm_callbacks)
self.logger.info("Order update: {}".format(order)) self.logger.info("Order update: {}".format(order))
self.bfxapi._emit('order_update', order) self.bfxapi._emit('order_update', order)
@@ -85,7 +77,9 @@ class OrderManager:
order = Order.from_raw_order(raw_ws_data[2]) order = Order.from_raw_order(raw_ws_data[2])
order.set_open_state(True) order.set_open_state(True)
self.open_orders[order.id] = order self.open_orders[order.id] = order
await self._confirm_order(order) order.set_confirmed()
self.bfxapi._emit('order_confirmed', order)
await self._execute_callback(order, self.pending_order_confirm_callbacks)
self.logger.info("Order new: {}".format(order)) self.logger.info("Order new: {}".format(order))
self.bfxapi._emit('order_new', order) self.bfxapi._emit('order_new', order)
@@ -96,10 +90,11 @@ class OrderManager:
hidden=False, price_trailing=None, price_aux_limit=None, hidden=False, price_trailing=None, price_aux_limit=None,
oco_stop_price=None, close=False, reduce_only=False, oco_stop_price=None, close=False, reduce_only=False,
post_only=False, oco=False, time_in_force=None, post_only=False, oco=False, time_in_force=None,
onConfirm=None, onClose=None, *args, **kwargs): onConfirm=None, onClose=None, gid=None, *args, **kwargs):
""" """
Submit a new order Submit a new order
@param gid: assign the order to a group identitfier
@param symbol: the name of the symbol i.e 'tBTCUSD @param symbol: the name of the symbol i.e 'tBTCUSD
@param price: the price you want to buy/sell at (must be positive) @param price: the price you want to buy/sell at (must be positive)
@param amount: order size: how much you want to buy/sell, @param amount: order size: how much you want to buy/sell,
@@ -133,8 +128,7 @@ class OrderManager:
"price": str(price), "price": str(price),
} }
# caclulate and add flags # caclulate and add flags
flags = self._calculate_flags( flags = self._calculate_flags(hidden, close, reduce_only, post_only, oco)
hidden, close, reduce_only, post_only, oco)
payload['flags'] = flags payload['flags'] = flags
# add extra parameters # add extra parameters
if (price_trailing): if (price_trailing):
@@ -142,19 +136,22 @@ class OrderManager:
if (price_aux_limit): if (price_aux_limit):
payload['price_aux_limit'] = price_aux_limit payload['price_aux_limit'] = price_aux_limit
if (oco_stop_price): if (oco_stop_price):
payload['price_oco_stop'] = oco_stop_price payload['price_oco_stop'] = str(oco_stop_price)
if (time_in_force): if (time_in_force):
payload['tif'] = time_in_force payload['tif'] = time_in_force
if (gid):
payload['gid'] = gid
# submit the order # submit the order
self.pending_orders[cid] = payload self.pending_orders[cid] = payload
self._create_callback(cid, onConfirm=onConfirm, onClose=onClose) self._create_callback(cid, onConfirm, self.pending_order_confirm_callbacks)
self._create_callback(cid, onClose, self.pending_order_close_callbacks)
await self.bfxapi._send_auth_command('on', payload) await self.bfxapi._send_auth_command('on', payload)
self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( self.logger.info("Order cid={} ({} {} @ {}) dispatched".format(
cid, symbol, amount, price)) cid, symbol, amount, price))
async def update_order(self, orderId, price=None, amount=None, delta=None, price_aux_limit=None, async def update_order(self, orderId, price=None, amount=None, delta=None, price_aux_limit=None,
price_trailing=None, hidden=False, close=False, reduce_only=False, price_trailing=None, hidden=False, close=False, reduce_only=False,
post_only=False, time_in_force=None, onConfirm=None, onClose=None): post_only=False, time_in_force=None, onConfirm=None):
""" """
Update an existing order Update an existing order
@@ -176,7 +173,7 @@ class OrderManager:
@param onClose: function called when the bitfinex websocket receives signal that the order @param onClose: function called when the bitfinex websocket receives signal that the order
was closed due to being filled or cancelled was closed due to being filled or cancelled
""" """
self._create_callback(orderId, onConfirm=onConfirm, onClose=onClose) self._create_callback(orderId, onConfirm, self.pending_update_confirm_callbacks)
payload = {"id": orderId} payload = {"id": orderId}
if price is not None: if price is not None:
payload['price'] = str(price) payload['price'] = str(price)
@@ -196,7 +193,7 @@ class OrderManager:
await self.bfxapi._send_auth_command('ou', payload) await self.bfxapi._send_auth_command('ou', payload)
self.logger.info("Update Order order_id={} dispatched".format(orderId)) self.logger.info("Update Order order_id={} dispatched".format(orderId))
async def cancel_order(self, orderId, onConfirm=None, onClose=None): async def cancel_order(self, orderId, onConfirm=None):
""" """
Cancel an existing open order Cancel an existing open order
@@ -207,8 +204,7 @@ class OrderManager:
@param onClose: function called when the bitfinex websocket receives signal that the order @param onClose: function called when the bitfinex websocket receives signal that the order
was closed due to being filled or cancelled was closed due to being filled or cancelled
""" """
# order = self.open_orders[orderId] self._create_callback(orderId, onConfirm, self.pending_cancel_confirm_callbacks)
self._create_callback(orderId, onConfirm=onConfirm, onClose=onClose)
await self.bfxapi._send_auth_command('oc', {'id': orderId}) await self.bfxapi._send_auth_command('oc', {'id': orderId})
self.logger.info("Order cancel order_id={} dispatched".format(orderId)) self.logger.info("Order cancel order_id={} dispatched".format(orderId))
@@ -216,42 +212,55 @@ class OrderManager:
""" """
Cancel all existing open orders Cancel all existing open orders
This function closes orders that have been tracked locally by the OrderManager. This function closes all open orders.
""" """
ids = [self.open_orders[x].id for x in self.open_orders] await self.bfxapi._send_auth_command('oc_multi', { 'all': 1 })
await self.cancel_order_multi(ids)
async def cancel_order_multi(self, orderIds): async def cancel_order_group(self, gid, onConfirm=None):
"""
Cancel a set of orders using a single group id.
"""
self._create_callback(gid, onConfirm, self.pending_cancel_confirm_callbacks)
await self.bfxapi._send_auth_command('oc_multi', { 'gid': [gid] })
async def cancel_order_multi(self, ids=None, gids=None):
""" """
Cancel existing open orders as a batch Cancel existing open orders as a batch
@param orderIds: an array of order ids @param ids: an array of order ids
@param gids: an array of group ids
""" """
task_batch = [] payload = {}
for oid in orderIds: if ids:
task_batch += [ payload['id'] = ids
asyncio.ensure_future(self.open_orders[oid].close()) if gids:
] payload['gid'] = gids
await asyncio.wait(*[task_batch]) await self.bfxapi._send_auth_command('oc_multi', payload)
def _create_callback(self, order_identifier, onConfirm=None, onClose=None): def _create_callback(self, identifier, func, callback_storage):
if order_identifier in self.pending_callbacks: if not func:
self.pending_callbacks[order_identifier] += [(onClose, onConfirm)] return
if identifier in callback_storage:
callback_storage[identifier] += [func]
else: else:
self.pending_callbacks[order_identifier] = [(onClose, onConfirm)] callback_storage[identifier] = [func]
async def _execute_close_callback(self, order_identifier, *args, **kwargs): async def _execute_callback(self, order, callback_storage):
if order_identifier in self.pending_callbacks: idents = [order.id, order.cid, order.gid]
for c in self.pending_callbacks[order_identifier]: tasks = []
if c[0]: key = None
await c[0](*args, **kwargs) for k in callback_storage.keys():
del self.pending_callbacks[order_identifier] if k in idents:
print (callback_storage[k])
async def _execute_confirm_callback(self, order_identifier, *args, **kwargs): key = k
if order_identifier in self.pending_callbacks: # call all callbacks associated with identifier
for c in self.pending_callbacks[order_identifier]: for callback in callback_storage[k]:
if c[1]: tasks += [callback(order)]
await c[1](*args, **kwargs) break
# remove from callbacks
if key:
del callback_storage[key]
await asyncio.gather(*tasks)
def _calculate_flags(self, hidden, close, reduce_only, post_only, oco): def _calculate_flags(self, hidden, close, reduce_only, post_only, oco):
flags = 0 flags = 0

View File

@@ -72,6 +72,13 @@ class SubscriptionManager:
def get(self, chan_id): def get(self, chan_id):
return self.subscriptions_chanid[chan_id] return self.subscriptions_chanid[chan_id]
def set_all_unsubscribed(self):
"""
Sets all f the subscriptions ot state 'unsubscribed'
"""
for sub in self.subscriptions_chanid.values():
sub.confirm_unsubscribe()
async def unsubscribe(self, chan_id, onComplete=None): async def unsubscribe(self, chan_id, onComplete=None):
""" """
Unsubscribe from the channel with the given chanId Unsubscribe from the channel with the given chanId

View File

@@ -3,3 +3,6 @@ asyncio==3.4.3
websockets==7.0 websockets==7.0
pylint==2.2.2 pylint==2.2.2
pytest-asyncio==0.6.0 pytest-asyncio==0.6.0
six==1.12.0
pyee==5.0.0
aiohttp==3.4.4