diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..03ae281 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__ +*.pyc +.vscode +*.log diff --git a/README.md b/README.md index 96a662c..db8a087 100644 --- a/README.md +++ b/README.md @@ -1 +1,216 @@ -# bitfinex-api-py \ No newline at end of file + +``` +from bfxapi import Client + +bfx = Client( + API_KEY=API_KEY, + API_SECRET=API_SECRET, + logLevel='DEBUG' +) + +@bfx.ws.on('new_trade') +def log_trade(trade): + print ("New trade: {}".format(trade)) + +@bfx.ws.on('connected') +def start(): + bfx.ws.subscribe('trades', 'tBTCUSD') + +bfx.ws.run() +``` + + +# Official Python `bfxapi` +This is an official python library that is used to connect interact with the Bitfinex api. Currently it only has support for websockets but will soon have Rest functionality as well. + +Install dependencies +``` +pip3 install -r requirements.txt +``` +Run the trades/candles example: +``` +cd bfxapi/examples +python3 subsribe_trades_candles.py +``` + +# Features +- Fast websocket connection +- Event based routing +- Subscribe to trade, candles and orderbook channels +- Authenticate with api key/secret +- Orderbook checksum validation +- Create, update and close orders +- Track wallet updates + +# Quickstart + +### Authenticate + +``` +bfx = Client( + API_KEY='' + API_SECRET='' +) + +@bfx.ws.on('authenticated') +async def do_something(): + print ("Success!") + +bfx.ws.run() +``` + +### Submit limit order + +``` +await bfx.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE LIMIT') +``` + +### Listen for completion +``` +@bfx.ws.on('order_confirmed') +async def order_completed(order, trade): + print ("Order has been confrmed") +``` + +### Get wallets +``` +wallets = bfxapi.wallets.get_wallets() +# [ Wallet <'exchange_BTC' balance='41.25809589' unsettled='0'>, +# Wallet <'exchange_USD' balance='62761.86070104' unsettled='0'> ] +``` + +### Close all orders +``` +await bfx.ws.close_all_orders() +``` + + +## `bfxapi.ws` events +The websocket exposes a collection of events that are triggered when certain data is received. When subscribing to an event you are able to pass either a standard function or an asyncio co-routine Here is a full list of available events: + +- `all` (array|json): listen for all messages coming through +- `connected:` () called when a connection is made +- `authenticated` (): called when the websocket passes authentication +- `notification` (array): incoming account notification +- `error` (array): error from the websocket +- `order_closed` (Order, Trade): when an order has been closed +- `order_new` (Order, Trade): when an order has been created but not closed. Note: will not be called if order is executed and filled instantly +- `order_confirmed` (Order, Trade): When an order has been submitted and received +- `wallet_snapshot` (array[Wallet]): Initial wallet balances (Fired once) +- `order_snapshot` (array[Order]): Initial open orders (Fired once) +- `positions_snapshot` (array): Initial open positions (Fired once) +- `wallet_update` (Wallet): changes to the balance of wallets +- `seed_candle` (json): initial past candle to prime strategy +- `seed_trade` (json): initial past trade to prime strategy +- `funding_offer_snapshot` (array): opening funding offer balances +- `funding_loan_snapshot` (array): opening funding loan balances +- `funding_credit_snapshot` (array): opening funding credit balances +- `balance_update` (array): when the state of a balance is changed +- `new_trade` (array): a new trade on the market has been executed +- `new_candle` (array): a new candle has been produced +- `margin_info_updates` (array): new margin information has been broadcasted +- `funding_info_updates` (array): new funding information has been broadcasted +- `order_book_snapshot` (array): initial snapshot of the order book on connection +- `order_book_update` (array): a new order has been placed into the ordebrook +- `subscribed` (Subscription): a new channel has been subscribed to +- `unsubscribed` (Subscription): a channel has been un-subscribed + + + + +# `bfxapi.ws` interface + +#### `on(event, function)` + + Subscribe a given function to be called when an event fires + +#### `once(event, function)` + + Subscribes the function to the given event but only triggers once. + +#### `subscribe(channel_name, symbol, timeframe=None, **kwargs)` + + Subscribes the socket to a data feed such as 'trades' or 'candles'. + +#### `unsubscribe(chanId, onComplete=None)` + + Unsubscribes from the given data feed + +#### `resubscribe(chanId, onComplete=None)` + + Unsubscribes and then subscribes to the given data feed. Usually used to trigger a snapshot response from the API. + +#### `unsubscribe_all()` + + Unsubscribe from all data feeds. + +#### `resubscribe_all(chanId, onComplete=None)` + + Unsubscribe and subscribe to all data feeds + +#### `submit_order(symbol, price, amount, market_type, hidden=False, onComplete=None, onError=None, *args, **kwargs)` + + Submits an order to the Bitfinex api. If the order is successful then the order_closed event will be triggered and the onComplete function will also be called if provided in the parameters. + +#### `update_order(orderId, price=None, amount=None, delta=None, price_aux_limit=None, price_trailing=None, flags=None, time_in_force=None, onComplete=None, onError=None)` + + Attempts to update an order with the given values. If the order is no longer open then the update will be ignored. + +#### `close_order(self, orderId, onComplete=None, onError=None):` + + Close the order with the given orderId if it still open. + +#### `close_all_orders()` + + Tells the OrderManager to close all of the open orders + +#### `close_orders_multi(self, orderIds)` + + Takes an array of orderIds and closes them all. + +# `bfxapi.wallets` + +### `get_wallets()` + + Returns an array of wallets that represent the users balance in the different currencies + +# `Order obj` + +### `close()` + + Signals Bitfinex to close the order + +### `update(self, price=None, amount=None, delta=None, price_aux_limit=None, price_trailing=None, flags=None, time_in_force=None)` + + Signals Bitfinex to update the order with the given values + +### `isOpen()` + + Returns true if the order has not been closed + +### `isPending()` + + Returns true if Bitfinex has not responded to confirm the order has been received + +### `isConfirmed()` + + Returns true if Bitfinex has responded to confirm the order + +# `Subscription obj` + +### `subscribe()` + + Sends a subscribe command to start receiving data + +### `unsubscribe()` + + Sends a unsubscribe command to stop receiving data + +### `is_subscribed()` + + Returns true if the subscription is open and receiving data + + +# Examples + +For more info on how to use this library please see the example scripts in the `bfxapi/examples` directory. diff --git a/bfxapi/Client.py b/bfxapi/Client.py new file mode 100644 index 0000000..9253d0d --- /dev/null +++ b/bfxapi/Client.py @@ -0,0 +1,13 @@ +import asyncio + +from .websockets.BfxWebsocket import BfxWebsocket +from .rest.BfxRest import BfxRest + +class Client: + def __init__(self, API_KEY=None, API_SECRET=None, rest_host='https://api.bitfinex.com/v2', + ws_host='wss://api.bitfinex.com/ws/2', loop=None, logLevel='INFO', *args, **kwargs): + self.loop = loop or asyncio.get_event_loop() + self.ws = BfxWebsocket(API_KEY=API_KEY, API_SECRET=API_SECRET, host=ws_host, + loop=self.loop, logLevel=logLevel, *args, **kwargs) + self.rest = BfxRest(API_KEY=API_KEY, API_SECRET=API_SECRET, host=rest_host, + loop=self.loop, logLevel=logLevel, *args, **kwargs) diff --git a/bfxapi/__init__.py b/bfxapi/__init__.py new file mode 100644 index 0000000..71fb092 --- /dev/null +++ b/bfxapi/__init__.py @@ -0,0 +1,4 @@ +name = 'bfxapi' + +from bfxapi.Client import Client +from bfxapi.websockets.GenericWebsocket import GenericWebsocket diff --git a/bfxapi/examples/candel_order.py b/bfxapi/examples/candel_order.py new file mode 100644 index 0000000..6571e25 --- /dev/null +++ b/bfxapi/examples/candel_order.py @@ -0,0 +1,38 @@ +import os +import sys +sys.path.append('../') + +from bfxapi import Client + +API_KEY=os.getenv("BFX_KEY") +API_SECRET=os.getenv("BFX_SECRET") + +bfx = Client( + API_KEY=API_KEY, + API_SECRET=API_SECRET, + logLevel='INFO' +) + +@bfx.ws.on('order_closed') +def order_cancelled(order, trade): + print ("Order cancelled.") + print (order) + print (trade) + +@bfx.ws.on('order_confirmed') +async def trade_completed(order, trade): + print ("Order confirmed.") + print (order) + print (trade) + await bfx.ws.cancel_order(order.id) + +@bfx.ws.on('error') +def log_error(msg): + print ("Error: {}".format(msg)) + +@bfx.ws.once('authenticated') +async def submit_order(auth_message): + # create an inital order a really low price so it stays open + await bfx.ws.submit_order('tBTCUSD', 10, 1, 'EXCHANGE LIMIT') + +bfx.ws.run() diff --git a/bfxapi/examples/connect.py b/bfxapi/examples/connect.py new file mode 100644 index 0000000..c37de49 --- /dev/null +++ b/bfxapi/examples/connect.py @@ -0,0 +1,19 @@ +import os +import sys +sys.path.append('../') + +from bfxapi import Client + +bfx = Client( + logLevel='DEBUG' +) + +@bfx.ws.on('error') +def log_error(msg): + print ("Error: {}".format(msg)) + +@bfx.ws.on('all') +async def log_output(output): + print ("WS: {}".format(output)) + +bfx.ws.run() diff --git a/bfxapi/examples/get_seed_trades.py b/bfxapi/examples/get_seed_trades.py new file mode 100644 index 0000000..154c306 --- /dev/null +++ b/bfxapi/examples/get_seed_trades.py @@ -0,0 +1,16 @@ +import os +import sys +import asyncio +sys.path.append('../') + +from bfxapi import Client + +bfx = Client( + logLevel='INFO' +) + +async def get_seeds(): + candles = await bfx.rest.get_seed_candles('tBTCUSD') + print (candles) + +asyncio.get_event_loop().run_until_complete(get_seeds()) diff --git a/bfxapi/examples/resubscribe_orderbook.py b/bfxapi/examples/resubscribe_orderbook.py new file mode 100644 index 0000000..a48c3db --- /dev/null +++ b/bfxapi/examples/resubscribe_orderbook.py @@ -0,0 +1,37 @@ +import os +import sys +sys.path.append('../') + +from bfxapi import Client + +bfx = Client( + logLevel='INFO' +) + +@bfx.ws.on('error') +def log_error(err): + print ("Error: {}".format(err)) + +@bfx.ws.on('unsubscribed') +async def on_unsubscribe(subscription): + print ("Unsubscribed from {}".format(subscription.symbol)) + # await subscription.subscribe() + +@bfx.ws.on('subscribed') +async def on_subscribe(subscription): + print ("Subscribed to {}".format(subscription.symbol)) + # await subscription.unsubscribe() + # or + # await bfx.ws.unsubscribe(subscription.chanId) + +@bfx.ws.once('subscribed') +async def on_once_subscribe(subscription): + print ("Performig resubscribe") + await bfx.ws.resubscribe(subscription.chanId) + + +async def start(): + await bfx.ws.subscribe('book', 'tBTCUSD') + +bfx.ws.on('connected', start) +bfx.ws.run() diff --git a/bfxapi/examples/send_order.py b/bfxapi/examples/send_order.py new file mode 100644 index 0000000..a02d607 --- /dev/null +++ b/bfxapi/examples/send_order.py @@ -0,0 +1,48 @@ +import os +import sys +sys.path.append('../') + +from bfxapi import Client + +API_KEY=os.getenv("BFX_KEY") +API_SECRET=os.getenv("BFX_SECRET") + +bfx = Client( + API_KEY=API_KEY, + API_SECRET=API_SECRET, + logLevel='DEBUG' +) + +@bfx.ws.on('order_snapshot') +async def close_all(data): + await bfx.ws.close_all_orders() + +@bfx.ws.on('order_confirmed') +async def trade_completed(order, trade): + print ("Order confirmed.") + print (order) + print (trade) + ## close the order + # await order.close() + # or + # await bfx.ws.close_order(order.id) + # or + # await bfx.ws.close_all_orders() + +@bfx.ws.on('error') +def log_error(msg): + print ("Error: {}".format(msg)) + +@bfx.ws.on('authenticated') +async def submit_order(auth_message): + await bfx.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE LIMIT') + +# If you dont want to use a decorator +# ws.on('authenticated', submit_order) +# ws.on('error', log_error) + +# You can also provide a callback +# await ws.submit_order('tBTCUSD', 0, 0.01, +# 'EXCHANGE MARKET', onComplete=trade_complete) + +bfx.ws.run() diff --git a/bfxapi/examples/subscribe_orderbook.py b/bfxapi/examples/subscribe_orderbook.py new file mode 100644 index 0000000..153fef5 --- /dev/null +++ b/bfxapi/examples/subscribe_orderbook.py @@ -0,0 +1,31 @@ +import os +import sys +sys.path.append('../') + +from bfxapi import Client + +bfx = Client( + logLevel='DEBUG', + # Verifies that the local orderbook is up to date + # with the bitfinex servers + manageOrderBooks=True +) + +@bfx.ws.on('error') +def log_error(err): + print ("Error: {}".format(err)) + +@bfx.ws.on('order_book_update') +def log_update(data): + print ("Book update: {}".format(data)) + +@bfx.ws.on('order_book_snapshot') +def log_snapshot(data): + print ("Initial book: {}".format(data)) + +async def start(): + await bfx.ws.subscribe('book', 'tBTCUSD') + # bfx.ws.subscribe('book', 'tETHUSD') + +bfx.ws.on('connected', start) +bfx.ws.run() diff --git a/bfxapi/examples/subscribe_trades_candles.py b/bfxapi/examples/subscribe_trades_candles.py new file mode 100644 index 0000000..1b1dd27 --- /dev/null +++ b/bfxapi/examples/subscribe_trades_candles.py @@ -0,0 +1,28 @@ +import os +import sys +sys.path.append('../') + +from bfxapi import Client + +bfx = Client( + logLevel='DEBUG' +) + +@bfx.ws.on('error') +def log_error(err): + print ("Error: {}".format(err)) + +@bfx.ws.on('new_candle') +def log_candle(candle): + print ("New candle: {}".format(candle)) + +@bfx.ws.on('new_trade') +def log_trade(trade): + print ("New trade: {}".format(trade)) + +async def start(): + await bfx.ws.subscribe('candles', 'tBTCUSD', timeframe='1m') + await bfx.ws.subscribe('trades', 'tBTCUSD') + +bfx.ws.on('connected', start) +bfx.ws.run() diff --git a/bfxapi/examples/update_order.py b/bfxapi/examples/update_order.py new file mode 100644 index 0000000..2fe4f2a --- /dev/null +++ b/bfxapi/examples/update_order.py @@ -0,0 +1,43 @@ +import os +import sys +sys.path.append('../') + +from bfxapi import Client + +API_KEY=os.getenv("BFX_KEY") +API_SECRET=os.getenv("BFX_SECRET") + +bfx = Client( + API_KEY=API_KEY, + API_SECRET=API_SECRET, + logLevel='DEBUG' +) + +@bfx.ws.on('order_update') +def order_updated(order, trade): + print ("Order updated.") + print (order) + print (trade) + +@bfx.ws.once('order_update') +async def order_once_updated(order, trade): + # update a second time using the object function + await order.update(price=80, amount=0.02, flags="2nd update") + +@bfx.ws.once('order_confirmed') +async def trade_completed(order, trade): + print ("Order confirmed.") + print (order) + print (trade) + await bfx.ws.update_order(order.id, price=100, amount=0.01) + +@bfx.ws.on('error') +def log_error(msg): + print ("Error: {}".format(msg)) + +@bfx.ws.once('authenticated') +async def submit_order(auth_message): + # create an inital order a really low price so it stays open + await bfx.ws.submit_order('tBTCUSD', 10, 1, 'EXCHANGE LIMIT') + +bfx.ws.run() diff --git a/bfxapi/examples/wallet_balance.py b/bfxapi/examples/wallet_balance.py new file mode 100644 index 0000000..0d9ab35 --- /dev/null +++ b/bfxapi/examples/wallet_balance.py @@ -0,0 +1,31 @@ +import os +import sys +sys.path.append('../') + +from bfxapi import Client + +API_KEY=os.getenv("BFX_KEY") +API_SECRET=os.getenv("BFX_SECRET") + +bfx = Client( + API_KEY=API_KEY, + API_SECRET=API_SECRET, + logLevel='INFO' +) + +@bfx.ws.on('wallet_snapshot') +def log_snapshot(wallets): + for wallet in wallets: + print (wallet) + + # or bfx.ws.wallets.get_wallets() + +@bfx.ws.on('wallet_update') +def log_update(wallet): + print ("Balance updates: {}".format(wallet)) + +@bfx.ws.on('error') +def log_error(msg): + print ("Error: {}".format(msg)) + +bfx.ws.run() diff --git a/bfxapi/models/Order.py b/bfxapi/models/Order.py new file mode 100644 index 0000000..1da0a7c --- /dev/null +++ b/bfxapi/models/Order.py @@ -0,0 +1,92 @@ +import time + +class OrderClosedModel: + ID = 0 + GID = 1 + CID = 2 + SYMBOL = 3 + MTS_CREATE = 4 + MTS_UPDATE = 5 + AMOUNT = 6 + AMOUNT_ORIG = 7 + TYPE = 8 + TYPE_PREV = 9 + FLAGS = 12 + STATUS = 13 + PRICE = 16 + PRIVE_AVG = 17 + PRICE_TRAILING = 18 + PRICE_AUX_LIMIT = 19 + NOTIFY = 23 + PLACE_ID = 25 + +def now_in_mills(): + return int(round(time.time() * 1000)) + +class Order: + def __init__(self, bfxapi, closingOrderArray): + self.bfxapi = bfxapi + self.id = closingOrderArray[OrderClosedModel.ID] + self.gId = closingOrderArray[OrderClosedModel.GID] + self.cId = closingOrderArray[OrderClosedModel.CID] + self.symbol = closingOrderArray[OrderClosedModel.SYMBOL] + self.mtsCreate = closingOrderArray[OrderClosedModel.MTS_CREATE] + self.mtsUpdate = closingOrderArray[OrderClosedModel.MTS_UPDATE] + self.amount = closingOrderArray[OrderClosedModel.AMOUNT] + self.amountOrig = closingOrderArray[OrderClosedModel.AMOUNT_ORIG] + self.type = closingOrderArray[OrderClosedModel.TYPE] + self.typePrev = closingOrderArray[OrderClosedModel.TYPE_PREV] + self.flags = closingOrderArray[OrderClosedModel.FLAGS] + self.status = closingOrderArray[OrderClosedModel.STATUS] + self.price = closingOrderArray[OrderClosedModel.PRICE] + self.priceAvg = closingOrderArray[OrderClosedModel.PRIVE_AVG] + self.priceTrailing = closingOrderArray[OrderClosedModel.PRICE_TRAILING] + self.priceAuxLimit = closingOrderArray[OrderClosedModel.PRICE_AUX_LIMIT] + self.notfiy = closingOrderArray[OrderClosedModel.NOTIFY] + self.placeId = closingOrderArray[OrderClosedModel.PLACE_ID] + self.is_pending_bool = True + self.is_confirmed_bool = False + self.is_open_bool = False + + async def update(self, price=None, amount=None, delta=None, price_aux_limit=None, + price_trailing=None, flags=None, time_in_force=None): + payload = { "id": self.id } + if price is not None: + payload['price'] = str(price) + if amount is not None: + payload['amount'] = str(amount) + if delta is not None: + payload['delta'] = str(delta) + if price_aux_limit is not None: + payload['price_aux_limit'] = str(price_aux_limit) + if price_trailing is not None: + payload['price_trailing'] = str(price_trailing) + if flags is not None: + payload['flags'] = str(flags) + if time_in_force is not None: + payload['time_in_force'] = str(time_in_force) + await self.bfxapi._send_auth_command('ou', payload) + + async def close(self): + await self.bfxapi._send_auth_command('oc', { 'id': self.id }) + + def set_confirmed(self): + self.is_pending_bool = False + self.is_confirmed_bool = True + + def set_open_state(self, isOpen): + self.is_open_bool = isOpen + + def isOpen(self): + return self.is_open_bool + + def isPending(self): + return self.is_pending_bool + + def isConfirmed(self): + return self.is_confirmed_bool + + def __str__(self): + ''' Allow us to print the Order object in a pretty format ''' + return "Order <'{0}' mtsCreate={1} {2}>".format(self.symbol, self.mtsCreate, + self.status) diff --git a/bfxapi/models/OrderBook.py b/bfxapi/models/OrderBook.py new file mode 100644 index 0000000..93ca4d6 --- /dev/null +++ b/bfxapi/models/OrderBook.py @@ -0,0 +1,84 @@ +import zlib + +def preparePrice(price): + # convert to 4 significant figures + prepPrice = '{0:.4f}'.format(price) + # remove decimal place if zero float + return '{0:g}'.format(float(prepPrice)) + +class OrderBook: + + def __init__(self): + self.asks = [] + self.bids = [] + + def updateFromSnapshot(self, data): + # [[4642.3, 1, 4.192], [4641.5, 1, 1]] + for order in data: + if len(order) is 4: + if order[3] < 0: + self.bids += [order] + else: + self.asks += [order] + else: + if order[2] < 0: + self.asks += [order] + else: + self.bids += [order] + + def updateWith(self, order): + if len(order) is 4: + amount = order[3] + count = order[2] + side = self.bids if amount < 0 else self.asks + else: + amount = order[2] + side = self.asks if amount < 0 else self.bids + count = order[1] + price = order[0] + + # if first item in ordebook + if len(side) is 0: + side += [order] + return + + # match price level + for index, sOrder in enumerate(side): + sPrice = sOrder[0] + if sPrice == price: + if count is 0: + del side[index] + return + else: + # remove but add as new below + del side[index] + + # if ob is initialised w/o all price levels + if count is 0: + return + + # add to book and sort lowest to highest + side += [order] + side.sort(key=lambda x: x[0], reverse=not amount < 0) + return + + def checksum(self): + data = [] + # take set of top 25 bids/asks + for index in range(0, 25): + if index < len(self.bids): + bid = self.bids[index] + price = bid[0] + amount = bid[3] if len(bid) is 4 else bid[2] + data += [preparePrice(price)] + data += [str(amount)] + if index < len(self.asks): + ask = self.asks[index] + price = ask[0] + amount = ask[3] if len(ask) is 4 else ask[2] + data += [preparePrice(price)] + data += [str(amount)] + checksumStr = ':'.join(data) + # calculate checksum and force signed integer + checksum = zlib.crc32(checksumStr.encode('utf8')) & 0xffffffff + return checksum diff --git a/bfxapi/models/Subscription.py b/bfxapi/models/Subscription.py new file mode 100644 index 0000000..572575a --- /dev/null +++ b/bfxapi/models/Subscription.py @@ -0,0 +1,45 @@ +import time +import json + +class Subscription: + + def __init__(self, ws, channel_name, symbol, timeframe=None, **kwargs): + self.ws = ws + self.channel_name = channel_name + self.symbol = symbol + self.timeframe = timeframe + self.is_subscribed_bool = False + self.key = None + if timeframe: + self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol) + self.sub_id = int(round(time.time() * 1000)) + self.send_payload = self._generate_payload(**kwargs) + + async def subscribe(self): + await self.ws.send(json.dumps(self.get_send_payload())) + + async def unsubscribe(self): + if not self.is_subscribed(): + raise Exception("Subscription is not subscribed to websocket") + payload = { 'event': 'unsubscribe', 'chanId': self.chanId } + await self.ws.send(json.dumps(payload)) + + def confirm_subscription(self, chanId): + self.is_subscribed_bool = True + self.chanId = chanId + + def confirm_unsubscribe(self): + self.is_subscribed_bool = False + + def is_subscribed(self): + return self.is_subscribed_bool + + def _generate_payload(self, **kwargs): + payload = { 'event': 'subscribe', 'channel': self.channel_name, 'symbol': self.symbol } + if self.timeframe: + payload['key'] = self.key + payload.update(**kwargs) + return payload + + def get_send_payload(self): + return self.send_payload diff --git a/bfxapi/models/Trade.py b/bfxapi/models/Trade.py new file mode 100644 index 0000000..8fa60b7 --- /dev/null +++ b/bfxapi/models/Trade.py @@ -0,0 +1,20 @@ +import datetime + +class Trade: + SHORT = 'SHORT' + LONG = 'LONG' + + def __init__(self, order, tag=''): + self.order = order + self.amount = order.amount + self.price = order.priceAvg + self.fee = (order.priceAvg * abs(order.amount)) * 0.002 + self.mts = order.mtsCreate + self.date = datetime.datetime.fromtimestamp(order.mtsCreate/1000.0) + self.direction = self.SHORT if order.amount < 0 else self.LONG + self.tag = tag + + def __str__(self): + ''' Allow us to print the Trade object in a pretty format ''' + return "Trade {} @ {} fee={} ".format( + self.amount, self.price, self.fee, self.order) diff --git a/bfxapi/models/Wallet.py b/bfxapi/models/Wallet.py new file mode 100644 index 0000000..abb5c07 --- /dev/null +++ b/bfxapi/models/Wallet.py @@ -0,0 +1,19 @@ + +class Wallet: + + def __init__(self, wType, currency, balance, unsettled_interest): + self.type = wType + self.currency = currency + self.balance = balance + self.unsettled_interest = unsettled_interest + self.key = "{}_{}".format(wType, currency) + + def set_balance(self, data): + self.balance = data + + def set_unsettled_interest(self, data): + self.unsettled_interest = data + + def __str__(self): + return "Wallet <'{}_{}' balance='{}' unsettled='{}'>".format( + self.type, self.currency, self.balance, self.unsettled_interest) diff --git a/bfxapi/models/__init__.py b/bfxapi/models/__init__.py new file mode 100644 index 0000000..19d1e18 --- /dev/null +++ b/bfxapi/models/__init__.py @@ -0,0 +1,7 @@ +name = 'models' + +from .Order import * +from .Trade import * +from .OrderBook import * +from .Subscription import * +from .Wallet import * diff --git a/bfxapi/rest/BfxRest.py b/bfxapi/rest/BfxRest.py new file mode 100644 index 0000000..aad0f97 --- /dev/null +++ b/bfxapi/rest/BfxRest.py @@ -0,0 +1,45 @@ +import asyncio +import aiohttp +import time +import json + +from ..utils.CustomLogger import CustomLogger + +class BfxRest: + + def __init__(self, API_KEY, API_SECRET, host='https://api.bitfinex.com/v2', loop=None, + logLevel='INFO', *args, **kwargs): + self.loop = loop or asyncio.get_event_loop() + self.host = host + self.logger = CustomLogger('BfxRest', logLevel=logLevel) + + async def fetch(self, endpoint): + url = '{}/{}'.format(self.host, endpoint) + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + text = await resp.text() + if resp.status is not 200: + raise Exception('Unable to seed trades. Received status {} - {}' + .format(resp.status, text)) + return json.loads(text) + + async def get_seed_candles(self, symbol): + endpoint = 'candles/trade:1m:{}/hist?limit=5000&_bfx=1'.format(symbol) + time_difference = (1000 * 60) * 5000 + # get now to the nearest min + now = int(round((time.time() // 60 * 60) * 1000)) + task_batch = [] + for x in range(0, 10): + start = x * time_difference + end = now - (x * time_difference) - time_difference + e2 = endpoint + '&start={}&end={}'.format(start, end) + task_batch += [asyncio.ensure_future(self.fetch(e2))] + self.logger.info("Downloading seed candles from Bitfinex...") + # call all fetch requests async + done, _ = await asyncio.wait(*[ task_batch ]) + candles = [] + for task in done: + candles += task.result() + candles.sort(key=lambda x: x[0], reverse=True) + self.logger.info("Downloaded {} candles.".format(len(candles))) + return candles diff --git a/bfxapi/utils/CustomLogger.py b/bfxapi/utils/CustomLogger.py new file mode 100644 index 0000000..7ee622d --- /dev/null +++ b/bfxapi/utils/CustomLogger.py @@ -0,0 +1,80 @@ +import logging + +RESET_SEQ = "\033[0m" +COLOR_SEQ = "\033[1;%dm" +BOLD_SEQ = "\033[1m" +UNDERLINE_SEQ = "\033[04m" + +YELLOW = '\033[93m' +WHITE = '\33[37m' +BLUE = '\033[34m' +LIGHT_BLUE = '\033[94m' +RED = '\033[91m' +GREY = '\33[90m' + +KEYWORD_COLORS = { + 'WARNING': YELLOW, + 'INFO': LIGHT_BLUE, + 'DEBUG': WHITE, + 'CRITICAL': YELLOW, + 'ERROR': RED, + 'TRADE': '\33[102m\33[30m' +} + +def formatter_message(message, use_color = True): + if use_color: + message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ) + else: + message = message.replace("$RESET", "").replace("$BOLD", "") + return message + +def format_word(message, word, color_seq, bold=False, underline=False): + replacer = color_seq + word + RESET_SEQ + if underline: + replacer = UNDERLINE_SEQ + replacer + if bold: + replacer = BOLD_SEQ + replacer + return message.replace(word, replacer) + +class Formatter(logging.Formatter): + ''' + This Formatted simply colors in the levelname i.e 'INFO', 'DEBUG' + ''' + def __init__(self, msg, use_color = True): + logging.Formatter.__init__(self, msg) + self.use_color = use_color + + def format(self, record): + levelname = record.levelname + if self.use_color and levelname in KEYWORD_COLORS: + levelname_color = KEYWORD_COLORS[levelname] + levelname + RESET_SEQ + record.levelname = levelname_color + record.name = GREY + record.name + RESET_SEQ + return logging.Formatter.format(self, record) + +class CustomLogger(logging.Logger): + ''' + This adds extra logging functions such as logger.trade and also + sets the logger to use the custom formatter + ''' + FORMAT = "[$BOLD%(name)s$RESET] [%(levelname)s] %(message)s" + COLOR_FORMAT = formatter_message(FORMAT, True) + TRADE = 50 + + def __init__(self, name, logLevel='DEBUG'): + logging.Logger.__init__(self, name, logLevel) + color_formatter = Formatter(self.COLOR_FORMAT) + console = logging.StreamHandler() + console.setFormatter(color_formatter) + self.addHandler(console) + logging.addLevelName(self.TRADE, "TRADE") + return + + def trade(self, message, *args, **kws): + if self.isEnabledFor(self.TRADE): + message = format_word(message, 'CLOSED ', YELLOW, bold=True) + message = format_word(message, 'OPENED ', LIGHT_BLUE, bold=True) + message = format_word(message, 'UPDATED ', BLUE, bold=True) + message = format_word(message, 'CLOSED_ALL ', RED, bold=True) + # Yes, logger takes its '*args' as 'args'. + self._log(self.TRADE, message, args, **kws) diff --git a/bfxapi/websockets/BfxWebsocket.py b/bfxapi/websockets/BfxWebsocket.py new file mode 100644 index 0000000..e1081bd --- /dev/null +++ b/bfxapi/websockets/BfxWebsocket.py @@ -0,0 +1,458 @@ +import asyncio +import json +import time +import hashlib +import hmac +import random + +from .GenericWebsocket import GenericWebsocket, AuthError +from .SubscriptionManager import SubscriptionManager +from .WalletManager import WalletManager +from .OrderManager import OrderManager +from ..models import Order, Trade, OrderBook + +class Flags: + DEC_S = 9 + TIME_S = 32 + TIMESTAMP = 32768 + SEQ_ALL = 65536 + CHECKSUM = 131072 + + strings = { + 9: 'DEC_S', + 32: 'TIME_S', + 32768: 'TIMESTAMP', + 65536: 'SEQ_ALL', + 131072: 'CHECKSUM' + } + +def _parse_candle(cData, symbol, tf): + return { + 'mts': cData[0], + 'open': cData[1], + 'close': cData[2], + 'high': cData[3], + 'low': cData[4], + 'volume': cData[5], + 'symbol': symbol, + 'tf': tf + } + +def _parse_trade_snapshot_item(tData, symbol): + return { + 'mts': tData[3], + 'price': tData[4], + 'amount': tData[5], + 'symbol': symbol + } + + +def _parse_trade(tData, symbol): + return { + 'mts': tData[1], + 'price': tData[3], + 'amount': tData[2], + 'symbol': symbol + } + +class BfxWebsocket(GenericWebsocket): + ''' + More complex websocket that heavily relies on the btfxwss module. This websocket requires + authentication and is capable of handling orders. + https://github.com/Crypto-toolbox/btfxwss + + Translation names: + + translation table for channel names: + Data Channels + os - Orders + hos - Historical Orders + ps - Positions + hts - Trades (snapshot) + te - Trade Executed + tu - Trade Execution update + ws - Wallets + bu - Balance Info + miu - Margin Info + fiu - Funding Info + fos - Offers + hfos - Historical Offers + fcs - Credits + hfcs - Historical Credits + fls - Loans + hfls - Historical Loans + htfs - Funding Trades + n - Notifications (WIP) + + Events: + - all: listen for all messages coming through + - connected: called when a connection is made + - authenticated: called when the websocket passes authentication + - notification (array): incoming account notification + - error (string): error from the websocket + - order_closed (Order, Trade): when an order has been closed + - order_new (Order, Trade): when an order has been created but not closed. Note: will + not be called if order is executed and filled instantly + - order_confirmed (Order, Trade): when an order has been submitted and received + - wallet_snapshot (string): Initial wallet balances (Fired once) + - order_snapshot (string): Initial open orders (Fired once) + - positions_snapshot (string): Initial open positions (Fired once) + - wallet_update (string): changes to the balance of wallets + - seed_candle (candleArray): initial past candle to prime strategy + - seed_trade (tradeArray): initial past trade to prime strategy + - funding_offer_snapshot: + - funding_loan_snapshot: + - funding_credit_snapshot: + - balance_update when the state of a balance is changed + - new_trade: a new trade on the market has been executed + - new_candle: a new candle has been produced + - margin_info_update: new margin information has been broadcasted + - funding_info_update: new funding information has been broadcasted + ''' + + ERRORS = { + 10000: 'Unknown event', + 10001: 'Generic error', + 10008: 'Concurrency error', + 10020: 'Request parameters error', + 10050: 'Configuration setup failed', + 10100: 'Failed authentication', + 10111: 'Error in authentication request payload', + 10112: 'Error in authentication request signature', + 10113: 'Error in authentication request encryption', + 10114: 'Error in authentication request nonce', + 10200: 'Error in un-authentication request', + 10300: 'Subscription Failed (generic)', + 10301: 'Already Subscribed', + 10302: 'Unknown channel', + 10400: 'Subscription Failed (generic)', + 10401: 'Not subscribed', + 11000: 'Not ready, try again later', + 20000: 'User is invalid!', + 20051: 'Websocket server stopping', + 20060: 'Websocket server resyncing', + 20061: 'Websocket server resync complete' + } + + def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api.bitfinex.com/ws/2', + onSeedCandleHook=None, onSeedTradeHook=None, manageOrderBooks=False, logLevel='INFO', *args, **kwargs): + self.API_KEY=API_KEY + self.API_SECRET=API_SECRET + self.manageOrderBooks = manageOrderBooks + self.pendingOrders = {} + self.orderBooks = {} + + super(BfxWebsocket, self).__init__(host, *args, **kwargs) + self.subscriptionManager = SubscriptionManager(self, logLevel=logLevel) + self.orderManager = OrderManager(self, logLevel=logLevel) + self.wallets = WalletManager() + + self._WS_DATA_HANDLERS = { + 'tu': self._trade_update_handler, + 'wu': self._wallet_update_handler, + 'hb': self._heart_beat_handler, + 'te': self._trade_executed_handler, + 'oc': self._order_closed_handler, + 'ou': self._order_update_handler, + 'on': self._order_new_handler, + 'os': self._order_snapshot_handler, + 'ws': self._wallet_snapshot_handler, + 'ps': self._position_snapshot_handler, + 'fos': self._funding_offer_snapshot_handler, + 'fcs': self._funding_credit_snapshot_handler, + 'fls': self._funding_load_snapshot_handler, + 'bu': self._balance_update_handler, + 'n': self._notification_handler, + 'miu': self._margin_info_update_handler, + 'fiu': self._funding_info_update_handler + } + + self._WS_SYSTEM_HANDLERS = { + 'info': self._system_info_handler, + 'subscribed': self._system_subscribed_handler, + 'unsubscribed': self._system_unsubscribe_handler, + 'error': self._system_error_handler, + 'auth': self._system_auth_handler, + 'conf': self._system_conf_handler + } + + async def _ws_system_handler(self, msg): + eType = msg.get('event') + if eType in self._WS_SYSTEM_HANDLERS: + await self._WS_SYSTEM_HANDLERS[eType](msg) + else: + self.logger.warn("Unknown websocket event: '{}' {}".format(eType, msg)) + + async def _ws_data_handler(self, data): + dataEvent = data[1] + chanId = data[0] + + if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS: + return await self._WS_DATA_HANDLERS[dataEvent](data) + elif self.subscriptionManager.is_subscribed(chanId): + subscription = self.subscriptionManager.get(chanId) + # candles do not have an event + if subscription.channel_name == 'candles': + await self._candle_handler(data) + if subscription.channel_name == 'book': + await self._order_book_handler(data) + else: + self.logger.warn("Unknown data event: '{}' {}".format(dataEvent, data)) + + async def _system_info_handler(self, data): + self.logger.info(data) + if data.get('serverId', None): + ## connection has been established + await self.on_open() + + async def _system_conf_handler(self, data): + flag = data.get('flags') + status = data.get('status') + if flag not in Flags.strings: + self.logger.warn("Unknown config value set {}".format(flag)) + return + flagString = Flags.strings[flag] + if status == "OK": + self.logger.info("Enabled config flag {}".format(flagString)) + else: + self.logger.error("Unable to enable config flag {}".format(flagString)) + + async def _system_subscribed_handler(self, data): + await self.subscriptionManager.confirm_subscription(data) + + async def _system_unsubscribe_handler(self, data): + await self.subscriptionManager.confirm_unsubscribe(data) + + async def _system_error_handler(self, data): + self._emit('error', data) + + async def _system_auth_handler(self, data): + if data.get('status') == 'FAILED': + raise AuthError(self.ERRORS[data.get('code')]) + else: + self._emit('authenticated', data) + self.logger.info("Authentication successful.") + + async def _trade_update_handler(self, data): + tData = data[2] + # [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]] + if self.subscriptionManager.is_subscribed(data[0]): + symbol = self.subscriptionManager.get(data[0]).symbol + tradeObj = _parse_trade(tData, symbol) + self._emit('new_trade', tradeObj) + + async def _trade_executed_handler(self, data): + tData = data[2] + # [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]] + if self.subscriptionManager.is_subscribed(data[0]): + symbol = self.subscriptionManager.get(data[0]).symbol + tradeObj = _parse_trade(tData, symbol) + self._emit('new_trade', tradeObj) + + async def _wallet_update_handler(self, data): + # [0,"wu",["exchange","USD",89134.66933283,0]] + uw = self.wallets._update_from_event(data) + self._emit('wallet_update', uw) + self.logger.info("Wallet update: {}".format(uw)) + + async def _heart_beat_handler(self, data): + self.logger.debug("Heartbeat - {}".format(self.host)) + + async def _margin_info_update_handler(self, data): + self._emit('margin_info_update', data) + self.logger.info("Margin info update: {}".format(data)) + + async def _funding_info_update_handler(self, data): + self._emit('funding_info_update', data) + self.logger.info("Funding info update: {}".format(data)) + + async def _notification_handler(self, data): + # [0, 'n', [1542289340429, 'on-req', None, None, + # [1151350600, None, 1542289341196, 'tBTCUSD', None, None, 0.01, None, 'EXCHANGE MARKET', + # None, None, None, None, None, None, None, 18970, None, 0, 0, None, None, None, 0, None, + # None, None, None, None, None, None, None], None, 'SUCCESS', 'Submitting exchange market buy order for 0.01 BTC.']] + nInfo = data[2] + self._emit('notification', nInfo) + notificationType = nInfo[6] + notificationText = nInfo[7] + if notificationType == 'ERROR': + self._emit('error', notificationText) + self.logger.error("Notification ERROR: {}".format(notificationText)) + else: + self.logger.info("Notification SUCCESS: {}".format(notificationText)) + + async def _balance_update_handler(self, data): + self.logger.info('Balance update: {}'.format(data[2])) + self._emit('balance_update', data[2]) + + async def _order_closed_handler(self, data): + await self.orderManager.confirm_order_closed(data) + + async def _order_update_handler(self, data): + await self.orderManager.confirm_order_update(data) + + async def _order_new_handler(self, data): + await self.orderManager.confirm_order_new(data) + + async def _order_snapshot_handler(self, data): + await self.orderManager.build_from_order_snapshot(data) + + async def _wallet_snapshot_handler(self, data): + wallets = self.wallets._update_from_snapshot(data) + self._emit('wallet_snapshot', wallets) + + async def _position_snapshot_handler(self, data): + self._emit('position_snapshot', data) + self.logger.info("Position snapshot: {}".format(data)) + + async def _funding_offer_snapshot_handler(self, data): + self._emit('funding_offer_snapshot', data) + self.logger.info("Funding offer snapshot: {}".format(data)) + + async def _funding_load_snapshot_handler(self, data): + self._emit('funding_loan_snapshot', data[2]) + self.logger.info("Funding loan snapshot: {}".format(data)) + + async def _funding_credit_snapshot_handler(self, data): + self._emit('funding_credit_snapshot', data[2]) + self.logger.info("Funding credit snapshot: {}".format(data)) + + async def _trade_handler(self, data): + symbol = self.subscriptionManager.get(data[0]).symbol + if type(data[1]) is list: + data = data[1] + # Process the batch of seed trades on + # connection + data.reverse() + for t in data: + trade = { + 'mts': t[1], + 'price': t[2], + 'amount': t[3], + 'symbol': symbol + } + self._emit('seed_trade', trade) + else: + tradeObj = _parse_trade_snapshot_item(data, symbol) + self._emit('new_trade', tradeObj) + + async def _candle_handler(self, data): + subscription = self.subscriptionManager.get(data[0]) + if type(data[1][0]) is list: + # Process the batch of seed candles on + # websocket subscription + candlesSnapshot = data[1] + candlesSnapshot.reverse() + for c in candlesSnapshot: + candle = _parse_candle(c, subscription.symbol, subscription.timeframe) + self._emit('seed_candle', candle) + else: + candle = _parse_candle(data[1], subscription.symbol, subscription.timeframe) + self._emit('new_candle', candle) + + async def _order_book_handler(self, data): + obInfo = data[1] + chanId = data[0] + subscription = self.subscriptionManager.get(data[0]) + symbol = subscription.symbol + if data[1] == "cs": + dChecksum = data[2] & 0xffffffff # force to signed int + checksum = self.orderBooks[symbol].checksum() + # force checksums to signed integers + isValid = (dChecksum) == (checksum) + if isValid: + self.logger.debug("Checksum orderbook validation for '{}' successful." + .format(symbol)) + else: + self.logger.warn("Checksum orderbook invalid for '{}'. Resetting subscription." + .format(symbol)) + # re-build orderbook with snapshot + await self.subscriptionManager.resubscribe(chanId) + return + isSnapshot = type(obInfo[0]) is list + if isSnapshot: + self.orderBooks[symbol] = OrderBook() + self.orderBooks[symbol].updateFromSnapshot(obInfo) + self._emit('order_book_snapshot', { 'symbol': symbol, 'data': obInfo }) + else: + self.orderBooks[symbol].updateWith(obInfo) + self._emit('order_book_update', { 'symbol': symbol, 'data': obInfo }) + + async def on_message(self, message): + self.logger.debug(message) + msg = json.loads(message) + self._emit('all', msg) + if type(msg) is dict: + # System messages are received as json + await self._ws_system_handler(msg) + elif type(msg) is list: + # All data messages are received as a list + await self._ws_data_handler(msg) + else: + self.logger.warn('Unknown websocket response: {}'.format(msg)) + + async def _ws_authenticate_socket(self): + nonce = int(round(time.time() * 1000000)) + authMsg = 'AUTH{}'.format(nonce) + secret = self.API_SECRET.encode() + sig = hmac.new(secret, authMsg.encode(), hashlib.sha384).hexdigest() + hmac.new(secret, self.API_SECRET.encode('utf'), hashlib.sha384).hexdigest() + jdata = { + 'apiKey': self.API_KEY, + 'authSig': sig, + 'authNonce': nonce, + 'authPayload': authMsg, + 'event': 'auth' + } + await self.ws.send(json.dumps(jdata)) + + async def on_open(self): + self.logger.info("Websocket opened.") + self._emit('connected') + # Orders are simulated in backtest mode + if self.API_KEY and self.API_SECRET: + await self._ws_authenticate_socket() + # enable order book checksums + if self.manageOrderBooks: + await self.enable_flag(Flags.CHECKSUM) + + async def _send_auth_command(self, channel_name, data): + payload = [0, channel_name, None, data] + await self.ws.send(json.dumps(payload)) + + async def enable_flag(self, flag): + payload = { + "event": 'conf', + "flags": flag + } + await self.ws.send(json.dumps(payload)) + + async def subscribe(self, *args, **kwargs): + return await self.subscriptionManager.subscribe(*args, **kwargs) + + async def unsubscribe(self, *args, **kwargs): + return await self.subscriptionManager.unsubscribe(*args, **kwargs) + + async def resubscribe(self, *args, **kwargs): + return await self.subscriptionManager.resubscribe(*args, **kwargs) + + async def unsubscribe_all(self, *args, **kwargs): + return await self.subscriptionManager.unsubscribe_all(*args, **kwargs) + + async def resubscribe_all(self, *args, **kwargs): + return await self.subscriptionManager.resubscribe_all(*args, **kwargs) + + async def submit_order(self, *args, **kwargs): + return await self.orderManager.submit_order(*args, **kwargs) + + async def update_order(self, *args, **kwargs): + return await self.orderManager.update_order(*args, **kwargs) + + async def close_order(self, *args, **kwargs): + return await self.orderManager.close_order(*args, **kwargs) + + async def close_all_orders(self, *args, **kwargs): + return await self.orderManager.close_all_orders(*args, **kwargs) + + async def close_order_multi(self, *args, **kwargs): + return await self.close_order_multi(*args, **kwargs) diff --git a/bfxapi/websockets/GenericWebsocket.py b/bfxapi/websockets/GenericWebsocket.py new file mode 100644 index 0000000..b5c30fd --- /dev/null +++ b/bfxapi/websockets/GenericWebsocket.py @@ -0,0 +1,64 @@ +import asyncio +import websockets +import json + +from pyee import EventEmitter +from ..utils.CustomLogger import CustomLogger + +class AuthError(Exception): pass + +def is_json(myjson): + try: + json_object = json.loads(myjson) + except ValueError as e: + return False + return True + +class GenericWebsocket(object): + + def __init__(self, host, logLevel='INFO', loop=None): + self.host = host + self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel) + self.loop = loop or asyncio.get_event_loop() + self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=self.loop) + self.ws = None + + def run(self): + self.loop.run_until_complete(self._main(self.host)) + + async def _main(self, host): + async with websockets.connect(host) as websocket: + self.ws = websocket + self.logger.info("Wesocket connectedt to {}".format(self.host)) + while True: + await asyncio.sleep(0) + message = await websocket.recv() + await self.on_message(message) + + def on(self, event, func=None): + if not func: + return self.events.on(event) + self.events.on(event, func) + + def once(self, event, func=None): + if not func: + return self.events.once(event) + self.events.once(event, func) + + def _emit(self, event, *args, **kwargs): + self.events.emit(event, *args, **kwargs) + + async def on_error(self, error): + self.logger.error(error) + self.events.emit('error', error) + + async def on_close(self): + self.logger.info("Websocket closed.") + await self.ws.close() + self._emit('done') + + async def on_open(self): + pass + + async def on_message(self, message): + pass diff --git a/bfxapi/websockets/OrderManager.py b/bfxapi/websockets/OrderManager.py new file mode 100644 index 0000000..52df201 --- /dev/null +++ b/bfxapi/websockets/OrderManager.py @@ -0,0 +1,144 @@ +import time +import asyncio + +from ..utils.CustomLogger import CustomLogger +from ..models import Order, Trade + +class OrderManager: + + def __init__(self, bfxapi, logLevel='INFO'): + self.bfxapi = bfxapi + self.pending_orders = {} + self.pending_callbacks = {} + self.closed_orders = {} + self.open_orders = {} + self.logger = CustomLogger('BfxOrderManager', logLevel=logLevel) + + def get_open_orders(self): + return list(self.open_orders.values()) + + def get_closed_orders(self): + return list(self.closed_orders.values()) + + def get_pending_orders(self): + return list(self.pending_orders.values()) + + async def _confirm_order(self, order, trade): + ''' + Called once when we first recieve infomation back from the bitfinex api + that the order has been accepted. + ''' + if order.cId in self.pending_orders: + if self.pending_callbacks[order.cId][0]: + # call onComplete callback + await self.pending_callbacks[order.cId][0](order, trade) + order.set_confirmed() + # remove from pending orders list + del self.pending_orders[order.cId] + del self.pending_callbacks[order.cId] + self.bfxapi._emit('order_confirmed', order, trade) + + async def confirm_order_closed(self, raw_ws_data): + # order created and executed + # [0,"oc",[1151349678,null,1542203391995,"tBTCUSD",1542203389940,1542203389966,0,0.1, + # "EXCHANGE MARKET",null,null,null,0,"EXECUTED @ 18922.0(0.03299997): was PARTIALLY FILLED + # @ 18909.0(0.06700003)",null,null,18909,18913.2899961,0,0,null,null,null,0,0,null,null,null, + # "API>BFX",null,null,null]] + order = Order(self.bfxapi, raw_ws_data[2]) + trade = Trade(order) + order.set_open_state(False) + if order.id in self.open_orders: + del self.open_orders[order.id] + await self._confirm_order(order, trade) + self.logger.info("Order closed: {} {}".format(order.symbol, order.status)) + self.bfxapi._emit('order_closed', order, trade) + + async def build_from_order_snapshot(self, raw_ws_data): + ''' + Rebuild the user orderbook based on an incoming snapshot + ''' + osData = raw_ws_data[2] + self.open_orders = {} + for raw_order in osData: + order = Order(self.bfxapi, raw_order) + trade = Trade(order) + order.set_open_state(True) + self.open_orders[order.id] = order + # await self._confirm_order(order, trade) + self.bfxapi._emit('order_snapshot', self.get_open_orders()) + + async def confirm_order_update(self, raw_ws_data): + # order created but partially filled + # [0, 'ou', [1151351581, None, 1542629457873, 'tBTCUSD', 1542629458071, + # 1542629458189, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE', + # None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX', + # None, None, None]] + order = Order(self.bfxapi, raw_ws_data[2]) + order.set_open_state(True) + trade = Trade(order) + self.open_orders[order.id] = order + await self._confirm_order(order, trade) + self.logger.info("Order update: {} {}".format(order, trade)) + self.bfxapi._emit('order_update', order, trade) + + async def confirm_order_new(self, raw_ws_data): + # order created but not executed / created but partially filled + # [0, 'on', [1151351563, None, 1542624024383, 'tBTCUSD', 1542624024596, + # 1542624024617, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE', + # None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX', + # None, None, None]] + order = Order(self.bfxapi, raw_ws_data[2]) + order.set_open_state(True) + trade = Trade(order) + self.open_orders[order.id] = order + await self._confirm_order(order, trade) + self.logger.info("Order new: {} {}".format(order, trade)) + self.bfxapi._emit('order_new', order, trade) + + def _gen_unqiue_cid(self): + return int(round(time.time() * 1000)) + + async def submit_order(self, symbol, price, amount, market_type, + hidden=False, onComplete=None, onError=None, *args, **kwargs): + cId = self._gen_unqiue_cid() + # send order over websocket + payload = { + "cid": cId, + "type": str(market_type), + "symbol": symbol, + "amount": str(amount), + "price": str(price) + } + self.pending_orders[cId] = payload + self.pending_callbacks[cId] = (onComplete, onError) + await self.bfxapi._send_auth_command('on', payload) + self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( + cId, symbol, amount, price)) + + async def update_order(self, orderId, *args, onComplete=None, onError=None, **kwargs): + if orderId not in self.open_orders: + raise Exception("Order id={} is not open".format(orderId)) + order = self.open_orders[orderId] + self.pending_callbacks[order.cId] = (onComplete, onError) + await order.update(*args, **kwargs) + self.logger.info("Update Order order_id={} dispatched".format(orderId)) + + async def close_order(self, orderId, onComplete=None, onError=None): + if orderId not in self.open_orders: + raise Exception("Order id={} is not open".format(orderId)) + order = self.open_orders[orderId] + self.pending_callbacks[order.cId] = (onComplete, onError) + await order.cancel() + self.logger.info("Order cancel order_id={} dispatched".format(orderId)) + + async def close_all_orders(self): + ids = [self.open_orders[x].id for x in self.open_orders] + await self.close_order_multi(ids) + + async def close_order_multi(self, orderIds): + task_batch = [] + for oid in orderIds: + task_batch += [ + asyncio.ensure_future(self.open_orders[oid].close()) + ] + await asyncio.wait(*[ task_batch ]) diff --git a/bfxapi/websockets/SubscriptionManager.py b/bfxapi/websockets/SubscriptionManager.py new file mode 100644 index 0000000..af15bf9 --- /dev/null +++ b/bfxapi/websockets/SubscriptionManager.py @@ -0,0 +1,102 @@ +import json +import asyncio +import time + +from ..utils.CustomLogger import CustomLogger +from ..models import Subscription + +class SubscriptionManager: + + def __init__(self, bfxapi, logLevel='INFO'): + self.pending_subscriptions = {} + self.subscriptions_chanid = {} + self.subscriptions_subid = {} + self.unsubscribe_callbacks = {} + self.bfxapi = bfxapi + self.logger = CustomLogger('BfxSubscriptionManager', logLevel=logLevel) + + async def subscribe(self, channel_name, symbol, timeframe=None, **kwargs): + # create a new subscription + subscription = Subscription(self.bfxapi.ws, channel_name, symbol, timeframe, **kwargs) + self.logger.info("Subscribing to channel {}".format(channel_name)) + key = "{}_{}".format(channel_name, subscription.key or symbol) + self.pending_subscriptions[key] = subscription + await subscription.subscribe() + + async def confirm_subscription(self, raw_ws_data): + # {"event":"subscribed","channel":"trades","chanId":1,"symbol":"tBTCUSD","pair":"BTCUSD"} + # {"event":"subscribed","channel":"candles","chanId":351,"key":"trade:1m:tBTCUSD"} + # {"event":"subscribed","channel":"book","chanId":4,"symbol":"tBTCUSD","prec":"P0","freq":"F0","len":"25","pair":"BTCUSD"} + symbol = raw_ws_data.get("symbol", None) + channel = raw_ws_data.get("channel") + chanId = raw_ws_data.get("chanId") + key = raw_ws_data.get("key", None) + get_key = "{}_{}".format(channel, key or symbol) + + if chanId in self.subscriptions_chanid: + # subscription has already existed in the past + p_sub = self.subscriptions_chanid[chanId] + else: + # has just been created and is pending + p_sub = self.pending_subscriptions[get_key] + # remove from pending list + del self.pending_subscriptions[get_key] + p_sub.confirm_subscription(chanId) + # add to confirmed list + self.subscriptions_chanid[chanId] = p_sub + self.subscriptions_subid[p_sub.sub_id] = p_sub + self.bfxapi._emit('subscribed', p_sub) + + async def confirm_unsubscribe(self, raw_ws_data): + chanId = raw_ws_data.get("chanId") + sub = self.subscriptions_chanid[chanId] + sub.confirm_unsubscribe() + self.bfxapi._emit('unsubscribed', sub) + # call onComplete callback if exists + if sub.sub_id in self.unsubscribe_callbacks: + await self.unsubscribe_callbacks[sub.sub_id]() + del self.unsubscribe_callbacks[sub.sub_id] + + def get(self, chanId): + return self.subscriptions_chanid[chanId] + + async def unsubscribe(self, chanId, onComplete=None): + sub = self.subscriptions_chanid[chanId] + if onComplete: + self.unsubscribe_callbacks[sub.sub_id] = onComplete + if sub.is_subscribed(): + await self.subscriptions_chanid[chanId].unsubscribe() + + async def resubscribe(self, chanId): + sub = self.subscriptions_chanid[chanId] + async def re_sub(): + await sub.subscribe() + if sub.is_subscribed(): + # unsubscribe first and call callback to subscribe + await self.unsubscribe(chanId, re_sub) + else: + # already unsibscribed, so just subscribe + await sub.subscribe() + + def is_subscribed(self, chanId): + if chanId not in self.subscriptions_chanid: + return False + return self.subscriptions_chanid[chanId].is_subscribed() + + async def unsubscribe_all(self): + task_batch = [] + for chanId in self.subscriptions_chanid: + sub = self.get(chanId) + if sub.is_subscribed(): + task_batch += [ + asyncio.ensure_future(self.unsubscribe(chanId)) + ] + await asyncio.wait(*[ task_batch ]) + + async def resubscribe_all(self): + task_batch = [] + for chanId in self.subscriptions_chanid: + task_batch += [ + asyncio.ensure_future(self.resubscribe(chanId)) + ] + await asyncio.wait(*[ task_batch ]) diff --git a/bfxapi/websockets/WalletManager.py b/bfxapi/websockets/WalletManager.py new file mode 100644 index 0000000..e194263 --- /dev/null +++ b/bfxapi/websockets/WalletManager.py @@ -0,0 +1,27 @@ + +from ..models import Wallet + +class WalletManager: + + def __init__(self): + self.wallets = {} + + def _update_from_snapshot(self, raw_ws_data): + # [0, 'ws', [['exchange', 'BTC', 41.25809589, 0, None], ['exchange', 'USD', 62761.86070104, 0, None]]] + wData = raw_ws_data[2] + self.wallets = {} + for wallet in wData: + new_wallet = Wallet(wallet[0], wallet[1], wallet[2], wallet[3]) + self.wallets[new_wallet.key] = new_wallet + return self.get_wallets() + + def _update_from_event(self, raw_ws_data): + # [0,"wu",["exchange","USD",62761.86070104,0,61618.66070104]] + wallet = raw_ws_data[2] + new_wallet = Wallet(wallet[0], wallet[1], wallet[2], wallet[3]) + self.wallets[new_wallet.key] = new_wallet + return new_wallet + + def get_wallets(self): + return list(self.wallets.values()) + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..616cb50 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +eventemitter==0.2.0 +asyncio==3.4.3 +websockets==7.0