diff --git a/README.md b/README.md index 8182dba..b664387 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,7 @@ The websocket exposes a collection of events that are triggered when certain dat - `funding_credit_snapshot` (array): opening funding credit balances - `balance_update` (array): when the state of a balance is changed - `new_trade` (array): a new trade on the market has been executed +- `trade_update` (array): a trade on the market has been updated - `new_candle` (array): a new candle has been produced - `margin_info_updates` (array): new margin information has been broadcasted - `funding_info_updates` (array): new funding information has been broadcasted diff --git a/bfxapi/Client.py b/bfxapi/Client.py deleted file mode 100644 index cad2bbd..0000000 --- a/bfxapi/Client.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -This module exposes the core bitfinex clients which includes both -a websocket client and a rest interface client -""" - -# pylint: disable-all - -import asyncio - -from .websockets.BfxWebsocket import BfxWebsocket -from .rest.BfxRest import BfxRest - -REST_HOST = 'https://api-pub.bitfinex.com/v2' -WS_HOST = 'wss://api-pub.bitfinex.com/ws/2' - -class Client: - """ - The bfx client exposes rest and websocket objects - """ - - def __init__(self, API_KEY=None, API_SECRET=None, rest_host=REST_HOST, - ws_host=WS_HOST, loop=None, logLevel='INFO', dead_man_switch=False, - ws_capacity=25, *args, **kwargs): - self.loop = loop or asyncio.get_event_loop() - self.ws = BfxWebsocket(API_KEY=API_KEY, API_SECRET=API_SECRET, host=ws_host, - loop=self.loop, logLevel=logLevel, dead_man_switch=dead_man_switch, - ws_capacity=ws_capacity, *args, **kwargs) - self.rest = BfxRest(API_KEY=API_KEY, API_SECRET=API_SECRET, host=rest_host, - loop=self.loop, logLevel=logLevel, *args, **kwargs) diff --git a/bfxapi/__init__.py b/bfxapi/__init__.py index 350f17c..f5fbc80 100644 --- a/bfxapi/__init__.py +++ b/bfxapi/__init__.py @@ -6,8 +6,8 @@ from .version import __version__ from .client import Client from .models import (Order, Trade, OrderBook, Subscription, Wallet, Position, FundingLoan, FundingOffer, FundingCredit) -from .websockets.GenericWebsocket import GenericWebsocket, Socket -from .websockets.BfxWebsocket import BfxWebsocket -from .utils.Decimal import Decimal +from .websockets.generic_websocket import GenericWebsocket, Socket +from .websockets.bfx_websocket import BfxWebsocket +from .utils.decimal import Decimal NAME = 'bfxapi' diff --git a/bfxapi/client.py b/bfxapi/client.py index cad2bbd..0ac9fd6 100644 --- a/bfxapi/client.py +++ b/bfxapi/client.py @@ -7,8 +7,8 @@ a websocket client and a rest interface client import asyncio -from .websockets.BfxWebsocket import BfxWebsocket -from .rest.BfxRest import BfxRest +from .websockets.bfx_websocket import BfxWebsocket +from .rest.bfx_rest import BfxRest REST_HOST = 'https://api-pub.bitfinex.com/v2' WS_HOST = 'wss://api-pub.bitfinex.com/ws/2' diff --git a/bfxapi/models/Order.py b/bfxapi/models/Order.py deleted file mode 100644 index f209bb8..0000000 --- a/bfxapi/models/Order.py +++ /dev/null @@ -1,222 +0,0 @@ -""" -Module used to describe all of the different data types -""" - -import time -import datetime - -class OrderType: - """ - Enum used to describe all of the different order types available for use - """ - MARKET = 'MARKET' - LIMIT = 'LIMIT' - STOP = 'STOP' - STOP_LIMIT = 'STOP LIMIT' - TRAILING_STOP = 'TRAILING STOP' - FILL_OR_KILL = 'FOK' - EXCHANGE_MARKET = 'EXCHANGE MARKET' - EXCHANGE_LIMIT = 'EXCHANGE LIMIT' - EXCHANGE_STOP = 'EXCHANGE STOP' - EXCHANGE_STOP_LIMIT = 'EXCHANGE STOP LIMIT' - EXCHANGE_TRAILING_STOP = 'EXCHANGE TRAILING STOP' - EXCHANGE_FILL_OR_KILL = 'EXCHANGE FOK' - - -LIMIT_ORDERS = [OrderType.LIMIT, OrderType.STOP_LIMIT, OrderType.EXCHANGE_LIMIT, - OrderType.EXCHANGE_STOP_LIMIT, OrderType.FILL_OR_KILL, - OrderType.EXCHANGE_FILL_OR_KILL] - - -class OrderSide: - """ - Enum used to describe the different directions of an order - """ - BUY = 'buy' - SELL = 'sell' - - -class OrderClosedModel: - """ - Enum used ad an index match to locate the different values in a - raw order array - """ - ID = 0 - GID = 1 - CID = 2 - SYMBOL = 3 - MTS_CREATE = 4 - MTS_UPDATE = 5 - AMOUNT = 6 - AMOUNT_ORIG = 7 - TYPE = 8 - TYPE_PREV = 9 - FLAGS = 12 - STATUS = 13 - PRICE = 16 - PRICE_AVG = 17 - PRICE_TRAILING = 18 - PRICE_AUX_LIMIT = 19 - NOTIFY = 23 - PLACE_ID = 25 - - -class OrderFlags: - """ - Enum used to explain the different values that can be passed in - as flags - """ - HIDDEN = 64 - CLOSE = 12 - REDUCE_ONLY = 1024 - POST_ONLY = 4096 - OCO = 16384 - - -def now_in_mills(): - """ - Gets the current time in milliseconds - """ - return int(round(time.time() * 1000)) - - -class Order: - """ - ID int64 Order ID - GID int Group ID - CID int Client Order ID - SYMBOL string Pair (tBTCUSD, ...) - MTS_CREATE int Millisecond timestamp of creation - MTS_UPDATE int Millisecond timestamp of update - AMOUNT float Positive means buy, negative means sell. - AMOUNT_ORIG float Original amount - TYPE string The type of the order: LIMIT, MARKET, STOP, TRAILING STOP, - EXCHANGE MARKET, EXCHANGE LIMIT, EXCHANGE STOP, EXCHANGE TRAILING STOP, FOK, EXCHANGE FOK. - TYPE_PREV string Previous order type - FLAGS int Upcoming Params Object (stay tuned) - ORDER_STATUS string Order Status: ACTIVE, EXECUTED, PARTIALLY FILLED, CANCELED - PRICE float Price - PRICE_AVG float Average price - PRICE_TRAILING float The trailing price - PRICE_AUX_LIMIT float Auxiliary Limit price (for STOP LIMIT) - HIDDEN int 1 if Hidden, 0 if not hidden - PLACED_ID int If another order caused this order to be placed (OCO) this will be that other - order's ID - """ - - Type = OrderType() - Side = OrderSide() - Flags = OrderFlags() - - def __init__(self, oid, gid, cid, symbol, mts_create, mts_update, amount, - amount_orig, o_type, typePrev, flags, status, price, price_avg, - price_trailing, price_aux_limit, notfiy, place_id): - self.id = oid # pylint: disable=invalid-name - self.gid = gid - self.cid = cid - self.symbol = symbol - self.mts_create = mts_create - self.mts_update = mts_update - self.amount = amount - self.amount_orig = amount_orig - if self.amount_orig > 0: - self.amount_filled = amount_orig - amount - else: - self.amount_filled = -(abs(amount_orig) - abs(amount)) - self.type = o_type - self.type_prev = typePrev - self.flags = flags - self.status = status - self.price = price - self.price_avg = price_avg - self.price_trailing = price_trailing - self.price_aux_limit = price_aux_limit - self.notfiy = notfiy - self.place_id = place_id - self.tag = "" - self.fee = 0 - self.is_pending_bool = True - self.is_confirmed_bool = False - self.is_open_bool = False - - self.date = datetime.datetime.fromtimestamp(mts_create/1000.0) - # if cancelled then priceAvg wont exist - if price_avg: - # check if order is taker or maker - if self.type in LIMIT_ORDERS: - self.fee = (price_avg * abs(self.amount_filled)) * 0.001 - else: - self.fee = (price_avg * abs(self.amount_filled)) * 0.002 - - @staticmethod - def from_raw_order(raw_order): - """ - Parse a raw order object into an Order oject - - @return Order - """ - oid = raw_order[OrderClosedModel.ID] - gid = raw_order[OrderClosedModel.GID] - cid = raw_order[OrderClosedModel.CID] - symbol = raw_order[OrderClosedModel.SYMBOL] - mts_create = raw_order[OrderClosedModel.MTS_CREATE] - mts_update = raw_order[OrderClosedModel.MTS_UPDATE] - amount = raw_order[OrderClosedModel.AMOUNT] - amount_orig = raw_order[OrderClosedModel.AMOUNT_ORIG] - o_type = raw_order[OrderClosedModel.TYPE] - type_prev = raw_order[OrderClosedModel.TYPE_PREV] - flags = raw_order[OrderClosedModel.FLAGS] - status = raw_order[OrderClosedModel.STATUS] - price = raw_order[OrderClosedModel.PRICE] - price_avg = raw_order[OrderClosedModel.PRICE_AVG] - price_trailing = raw_order[OrderClosedModel.PRICE_TRAILING] - price_aux_limit = raw_order[OrderClosedModel.PRICE_AUX_LIMIT] - notfiy = raw_order[OrderClosedModel.NOTIFY] - place_id = raw_order[OrderClosedModel.PLACE_ID] - - return Order(oid, gid, cid, symbol, mts_create, mts_update, amount, - amount_orig, o_type, type_prev, flags, status, price, price_avg, - price_trailing, price_aux_limit, notfiy, place_id) - - def set_confirmed(self): - """ - Set the state of the order to be confirmed - """ - self.is_pending_bool = False - self.is_confirmed_bool = True - - def set_open_state(self, is_open): - """ - Set the is_open state of the order - """ - self.is_open_bool = is_open - - def is_open(self): - """ - Check if the order is still open - - @return bool: Ture if order open else False - """ - return self.is_open_bool - - def is_pending(self): - """ - Check if the state of the order is still pending - - @return bool: True if is pending else False - """ - return self.is_pending_bool - - def is_confirmed(self): - """ - Check if the order has been confirmed by the bitfinex api - - @return bool: True if has been confirmed else False - """ - return self.is_confirmed_bool - - def __str__(self): - ''' Allow us to print the Order object in a pretty format ''' - text = "Order <'{}' amount_orig={} amount_filled={} mts_create={} status='{}' id={}>" - return text.format(self.symbol, self.amount_orig, self.amount_filled, - self.mts_create, self.status, self.id) diff --git a/bfxapi/models/Position.py b/bfxapi/models/Position.py deleted file mode 100644 index 43fc1ef..0000000 --- a/bfxapi/models/Position.py +++ /dev/null @@ -1,47 +0,0 @@ -""" -Module used to describe all of the different data types -""" - - -class Position: - """ - SYMBOL string Pair (tBTCUSD, ...). - STATUS string Status (ACTIVE, CLOSED). - AMOUNT float Size of the position. Positive values means a long position, - negative values means a short position. - BASE_PRICE float The price at which you entered your position. - MARGIN_FUNDING float The amount of funding being used for this position. - MARGIN_FUNDING_TYPE int 0 for daily, 1 for term. - PL float Profit & Loss - PL_PERC float Profit & Loss Percentage - PRICE_LIQ float Liquidation price - LEVERAGE float Beta value - """ - - def __init__(self, symbol, status, amount, b_price, m_funding, m_funding_type, - profit_loss, profit_loss_perc, l_price, lev): - self.symbol = symbol - self.status = status - self.amount = amount - self.base_price = b_price - self.margin_funding = m_funding - self.margin_funding_type = m_funding_type - self.profit_loss = profit_loss - self.profit_loss_percentage = profit_loss_perc - self.liquidation_price = l_price - self.leverage = lev - - @staticmethod - def from_raw_rest_position(raw_position): - """ - Generate a Position object from a raw position array - - @return Position - """ - return Position(*raw_position) - - def __str__(self): - ''' Allow us to print the Trade object in a pretty format ''' - text = "Position '{}' {} x {} " - return text.format(self.symbol, self.base_price, self.amount, - self.status, self.profit_loss) diff --git a/bfxapi/models/Subscription.py b/bfxapi/models/Subscription.py deleted file mode 100644 index 19b5158..0000000 --- a/bfxapi/models/Subscription.py +++ /dev/null @@ -1,88 +0,0 @@ -""" -Module used to describe all of the different data types -""" - -import time -import json -from random import randint - -def generate_sub_id(): - """ - Generates a unique id in the form of 12345566-12334556 - """ - prefix = str(int(round(time.time() * 1000))) - suffix = str(randint(0, 9999999)) - return "{}-{}".format(prefix, suffix) - -class Subscription: - """ - Object used to represent an individual subscription to the websocket. - This class also exposes certain functions which helps to manage the subscription - such as unsibscribe and subscribe. - """ - - def __init__(self, socket, channel_name, symbol, timeframe=None, **kwargs): - self.socket = socket - self.channel_name = channel_name - self.symbol = symbol - self.timeframe = timeframe - self.is_subscribed_bool = False - self.key = None - self.chan_id = None - if timeframe: - self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol) - self.sub_id = generate_sub_id() - self.send_payload = self._generate_payload(**kwargs) - - def get_key(self): - """ - Generates a unique key string for the subscription - """ - return "{}_{}".format(self.channel_name, self.key or self.symbol) - - def confirm_subscription(self, chan_id): - """ - Update the subscription to confirmed state - """ - self.is_subscribed_bool = True - self.chan_id = chan_id - - async def unsubscribe(self): - """ - Send an unsubscription request to the bitfinex socket - """ - if not self.is_subscribed(): - raise Exception("Subscription is not subscribed to websocket") - payload = {'event': 'unsubscribe', 'chanId': self.chan_id} - await self.socket.ws.send(json.dumps(payload)) - - async def subscribe(self): - """ - Send a subscription request to the bitfinex socket - """ - await self.socket.ws.send(json.dumps(self._get_send_payload())) - - def confirm_unsubscribe(self): - """ - Update the subscription to unsubscribed state - """ - self.is_subscribed_bool = False - - def is_subscribed(self): - """ - Check if the subscription is currently subscribed - - @return bool: True if subscribed else False - """ - return self.is_subscribed_bool - - def _generate_payload(self, **kwargs): - payload = {'event': 'subscribe', - 'channel': self.channel_name, 'symbol': self.symbol} - if self.timeframe: - payload['key'] = self.key - payload.update(**kwargs) - return payload - - def _get_send_payload(self): - return self.send_payload diff --git a/bfxapi/models/Trade.py b/bfxapi/models/Trade.py deleted file mode 100644 index a4325f3..0000000 --- a/bfxapi/models/Trade.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -Module used to describe all of the different data types -""" - -import datetime - - -class Trade: - """ - ID integer Trade database id - PAIR string Pair (BTCUSD, ...) - MTS_CREATE integer Execution timestamp - ORDER_ID integer Order id - EXEC_AMOUNT float Positive means buy, negative means sell - EXEC_PRICE float Execution price - ORDER_TYPE string Order type - ORDER_PRICE float Order price - MAKER int 1 if true, 0 if false - FEE float Fee - FEE_CURRENCY string Fee currency - """ - - SHORT = 'SHORT' - LONG = 'LONG' - - def __init__(self, tid, pair, mts_create, order_id, amount, price, order_type, - order_price, maker, fee, fee_currency): - # pylint: disable=invalid-name - self.id = tid - self.pair = pair - self.mts_create = mts_create - self.date = datetime.datetime.fromtimestamp(mts_create/1000.0) - self.order_id = order_id - self.amount = amount - self.direction = Trade.SHORT if amount < 0 else Trade.LONG - self.price = price - self.order_type = order_type - self.order_price = order_price - self.maker = maker - self.fee = fee - self.fee_currency = fee_currency - - @staticmethod - def from_raw_rest_trade(raw_trade): - """ - Generate a Trade object from a raw trade array - """ - # [24224048, 'tBTCUSD', 1542800024000, 1151353484, 0.09399997, 19963, None, None, - # -1, -0.000188, 'BTC'] - return Trade(*raw_trade) - - def __str__(self): - return "Trade '{}' x {} @ {} ".format( - self.pair, self.amount, self.price, self.direction, self.fee) diff --git a/bfxapi/models/Wallet.py b/bfxapi/models/Wallet.py deleted file mode 100644 index 028d5ac..0000000 --- a/bfxapi/models/Wallet.py +++ /dev/null @@ -1,33 +0,0 @@ -""" -Module used to describe all of the different data types -""" - - -class Wallet: - """ - Stores data relevant to a users wallet such as balance and - currency - """ - - def __init__(self, wType, currency, balance, unsettled_interest): - self.type = wType - self.currency = currency - self.balance = balance - self.unsettled_interest = unsettled_interest - self.key = "{}_{}".format(wType, currency) - - def set_balance(self, data): - """ - Set the balance of the wallet - """ - self.balance = data - - def set_unsettled_interest(self, data): - """ - Set the unsettled interest of the wallet - """ - self.unsettled_interest = data - - def __str__(self): - return "Wallet <'{}_{}' balance='{}' unsettled='{}'>".format( - self.type, self.currency, self.balance, self.unsettled_interest) diff --git a/bfxapi/rest/BfxRest.py b/bfxapi/rest/bfx_rest.py similarity index 99% rename from bfxapi/rest/BfxRest.py rename to bfxapi/rest/bfx_rest.py index ee3d8d1..cd71b54 100644 --- a/bfxapi/rest/BfxRest.py +++ b/bfxapi/rest/bfx_rest.py @@ -7,7 +7,7 @@ import aiohttp import time import json -from ..utils.CustomLogger import CustomLogger +from ..utils.custom_logger import CustomLogger from ..utils.auth import generate_auth_headers from ..models import Wallet, Order, Position, Trade, FundingLoan, FundingOffer from ..models import FundingCredit @@ -27,7 +27,7 @@ class BfxRest: self.API_KEY = API_KEY self.API_SECRET = API_SECRET self.host = host - # this value can also be set to bfxapi.Decimal for much higher precision + # this value can also be set to bfxapi.decimal for much higher precision self.parse_float = parse_float self.logger = CustomLogger('BfxRest', logLevel=logLevel) diff --git a/bfxapi/utils/CustomLogger.py b/bfxapi/utils/custom_logger.py similarity index 96% rename from bfxapi/utils/CustomLogger.py rename to bfxapi/utils/custom_logger.py index c2b76a0..52669cb 100644 --- a/bfxapi/utils/CustomLogger.py +++ b/bfxapi/utils/custom_logger.py @@ -1,96 +1,96 @@ -""" -Module used to describe all of the different data types -""" - -import logging - -RESET_SEQ = "\033[0m" -COLOR_SEQ = "\033[1;%dm" -BOLD_SEQ = "\033[1m" -UNDERLINE_SEQ = "\033[04m" - -YELLOW = '\033[93m' -WHITE = '\33[37m' -BLUE = '\033[34m' -LIGHT_BLUE = '\033[94m' -RED = '\033[91m' -GREY = '\33[90m' - -KEYWORD_COLORS = { - 'WARNING': YELLOW, - 'INFO': LIGHT_BLUE, - 'DEBUG': WHITE, - 'CRITICAL': YELLOW, - 'ERROR': RED, - 'TRADE': '\33[102m\33[30m' -} - -def formatter_message(message, use_color = True): - """ - Syntax highlight certain keywords - """ - if use_color: - message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ) - else: - message = message.replace("$RESET", "").replace("$BOLD", "") - return message - -def format_word(message, word, color_seq, bold=False, underline=False): - """ - Surround the fiven word with a sequence - """ - replacer = color_seq + word + RESET_SEQ - if underline: - replacer = UNDERLINE_SEQ + replacer - if bold: - replacer = BOLD_SEQ + replacer - return message.replace(word, replacer) - -class Formatter(logging.Formatter): - ''' - This Formatted simply colors in the levelname i.e 'INFO', 'DEBUG' - ''' - def __init__(self, msg, use_color = True): - logging.Formatter.__init__(self, msg) - self.use_color = use_color - - def format(self, record): - """ - Format and highlight certain keywords - """ - levelname = record.levelname - if self.use_color and levelname in KEYWORD_COLORS: - levelname_color = KEYWORD_COLORS[levelname] + levelname + RESET_SEQ - record.levelname = levelname_color - record.name = GREY + record.name + RESET_SEQ - return logging.Formatter.format(self, record) - -class CustomLogger(logging.Logger): - ''' - This adds extra logging functions such as logger.trade and also - sets the logger to use the custom formatter - ''' - FORMAT = "[$BOLD%(name)s$RESET] [%(levelname)s] %(message)s" - COLOR_FORMAT = formatter_message(FORMAT, True) - TRADE = 50 - - def __init__(self, name, logLevel='DEBUG'): - logging.Logger.__init__(self, name, logLevel) - color_formatter = Formatter(self.COLOR_FORMAT) - console = logging.StreamHandler() - console.setFormatter(color_formatter) - self.addHandler(console) - logging.addLevelName(self.TRADE, "TRADE") - return - - def trade(self, message, *args, **kws): - """ - Print a syntax highlighted trade signal - """ - if self.isEnabledFor(self.TRADE): - message = format_word(message, 'CLOSED ', YELLOW, bold=True) - message = format_word(message, 'OPENED ', LIGHT_BLUE, bold=True) - message = format_word(message, 'UPDATED ', BLUE, bold=True) - message = format_word(message, 'CLOSED_ALL ', RED, bold=True) - # Yes, logger takes its '*args' as 'args'. - self._log(self.TRADE, message, args, **kws) +""" +Module used to describe all of the different data types +""" + +import logging + +RESET_SEQ = "\033[0m" +COLOR_SEQ = "\033[1;%dm" +BOLD_SEQ = "\033[1m" +UNDERLINE_SEQ = "\033[04m" + +YELLOW = '\033[93m' +WHITE = '\33[37m' +BLUE = '\033[34m' +LIGHT_BLUE = '\033[94m' +RED = '\033[91m' +GREY = '\33[90m' + +KEYWORD_COLORS = { + 'WARNING': YELLOW, + 'INFO': LIGHT_BLUE, + 'DEBUG': WHITE, + 'CRITICAL': YELLOW, + 'ERROR': RED, + 'TRADE': '\33[102m\33[30m' +} + +def formatter_message(message, use_color = True): + """ + Syntax highlight certain keywords + """ + if use_color: + message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ) + else: + message = message.replace("$RESET", "").replace("$BOLD", "") + return message + +def format_word(message, word, color_seq, bold=False, underline=False): + """ + Surround the fiven word with a sequence + """ + replacer = color_seq + word + RESET_SEQ + if underline: + replacer = UNDERLINE_SEQ + replacer + if bold: + replacer = BOLD_SEQ + replacer + return message.replace(word, replacer) + +class Formatter(logging.Formatter): + ''' + This Formatted simply colors in the levelname i.e 'INFO', 'DEBUG' + ''' + def __init__(self, msg, use_color = True): + logging.Formatter.__init__(self, msg) + self.use_color = use_color + + def format(self, record): + """ + Format and highlight certain keywords + """ + levelname = record.levelname + if self.use_color and levelname in KEYWORD_COLORS: + levelname_color = KEYWORD_COLORS[levelname] + levelname + RESET_SEQ + record.levelname = levelname_color + record.name = GREY + record.name + RESET_SEQ + return logging.Formatter.format(self, record) + +class CustomLogger(logging.Logger): + ''' + This adds extra logging functions such as logger.trade and also + sets the logger to use the custom formatter + ''' + FORMAT = "[$BOLD%(name)s$RESET] [%(levelname)s] %(message)s" + COLOR_FORMAT = formatter_message(FORMAT, True) + TRADE = 50 + + def __init__(self, name, logLevel='DEBUG'): + logging.Logger.__init__(self, name, logLevel) + color_formatter = Formatter(self.COLOR_FORMAT) + console = logging.StreamHandler() + console.setFormatter(color_formatter) + self.addHandler(console) + logging.addLevelName(self.TRADE, "TRADE") + return + + def trade(self, message, *args, **kws): + """ + Print a syntax highlighted trade signal + """ + if self.isEnabledFor(self.TRADE): + message = format_word(message, 'CLOSED ', YELLOW, bold=True) + message = format_word(message, 'OPENED ', LIGHT_BLUE, bold=True) + message = format_word(message, 'UPDATED ', BLUE, bold=True) + message = format_word(message, 'CLOSED_ALL ', RED, bold=True) + # Yes, logger takes its '*args' as 'args'. + self._log(self.TRADE, message, args, **kws) diff --git a/bfxapi/utils/Decimal.py b/bfxapi/utils/decimal.py similarity index 100% rename from bfxapi/utils/Decimal.py rename to bfxapi/utils/decimal.py diff --git a/bfxapi/websockets/BfxWebsocket.py b/bfxapi/websockets/bfx_websocket.py similarity index 98% rename from bfxapi/websockets/BfxWebsocket.py rename to bfxapi/websockets/bfx_websocket.py index cb4ab60..9bf2edd 100644 --- a/bfxapi/websockets/BfxWebsocket.py +++ b/bfxapi/websockets/bfx_websocket.py @@ -7,10 +7,10 @@ import json import time import random -from .GenericWebsocket import GenericWebsocket, AuthError -from .SubscriptionManager import SubscriptionManager -from .WalletManager import WalletManager -from .OrderManager import OrderManager +from .generic_websocket import GenericWebsocket, AuthError +from .subscription_manager import SubscriptionManager +from .wallet_manager import WalletManager +from .order_manager import OrderManager from ..utils.auth import generate_auth_payload from ..models import Order, Trade, OrderBook @@ -107,7 +107,7 @@ class BfxWebsocket(GenericWebsocket): self.pendingOrders = {} self.orderBooks = {} self.ws_capacity = ws_capacity - # How should we store float values? could also be bfxapi.Decimal + # How should we store float values? could also be bfxapi.decimal # which is slower but has higher precision. self.parse_float = parse_float super(BfxWebsocket, self).__init__(host, logLevel=logLevel, *args, **kwargs) @@ -217,7 +217,7 @@ class BfxWebsocket(GenericWebsocket): if self.subscriptionManager.is_subscribed(data[0]): symbol = self.subscriptionManager.get(data[0]).symbol tradeObj = _parse_trade(tData, symbol) - self._emit('new_trade', tradeObj) + self._emit('trade_update', tradeObj) async def _trade_executed_handler(self, data): tData = data[2] diff --git a/bfxapi/websockets/GenericWebsocket.py b/bfxapi/websockets/generic_websocket.py similarity index 96% rename from bfxapi/websockets/GenericWebsocket.py rename to bfxapi/websockets/generic_websocket.py index 4af3b09..1f4db35 100644 --- a/bfxapi/websockets/GenericWebsocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -1,214 +1,214 @@ -""" -Module used as a interfeace to describe a generick websocket client -""" - -import asyncio -import websockets -import socket -import json -import time -from threading import Thread - -from pyee import EventEmitter -from ..utils.CustomLogger import CustomLogger - -# websocket exceptions -from websockets.exceptions import ConnectionClosed - -class AuthError(Exception): - """ - Thrown whenever there is a problem with the authentication packet - """ - pass - -def is_json(myjson): - try: - json_object = json.loads(myjson) - except ValueError as e: - return False - return True - -class Socket(): - def __init__(self, sId): - self.ws = None - self.isConnected = False - self.isAuthenticated = False - self.id = sId - - def set_connected(self): - self.isConnected = True - - def set_disconnected(self): - self.isConnected = False - - def set_authenticated(self): - self.isAuthenticated = True - - def set_websocket(self, ws): - self.ws = ws - -def _start_event_worker(): - async def event_sleep_process(): - """ - sleeping process for event emitter to schedule on - """ - while True: - await asyncio.sleep(0) - def start_loop(loop): - asyncio.set_event_loop(loop) - loop.run_until_complete(event_sleep_process()) - event_loop = asyncio.new_event_loop() - worker = Thread(target=start_loop, args=(event_loop,)) - worker.start() - return event_loop - -class GenericWebsocket: - """ - Websocket object used to contain the base functionality of a websocket. - Inlcudes an event emitter and a standard websocket client. - """ - - def __init__(self, host, logLevel='INFO', loop=None, max_retries=5, - create_event_emitter=_start_event_worker): - self.host = host - self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel) - self.loop = loop or asyncio.get_event_loop() - # overide 'error' event to stop it raising an exception - # self.events.on('error', self.on_error) - self.ws = None - self.max_retries = max_retries - self.attempt_retry = True - self.sockets = {} - # start seperate process for the even emitter - eventLoop = create_event_emitter() - self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=eventLoop) - - def run(self): - """ - Starte the websocket connection. This functions spawns the initial socket - thread and connection. - """ - self._start_new_socket() - - def get_task_executable(self): - """ - Get the run indefinitely asyncio task - """ - return self._run_socket() - - def _start_new_socket(self, socketId=None): - if not socketId: - socketId = len(self.sockets) - def start_loop(loop): - asyncio.set_event_loop(loop) - loop.run_until_complete(self._run_socket()) - worker_loop = asyncio.new_event_loop() - worker = Thread(target=start_loop, args=(worker_loop,)) - worker.start() - return socketId - - def _wait_for_socket(self, socket_id): - """ - Block until the given socket connection is open - """ - while True: - socket = self.sockets.get(socket_id, False) - if socket: - if socket.isConnected and socket.ws: - return - time.sleep(0.01) - - async def _connect(self, socket): - async with websockets.connect(self.host) as websocket: - self.sockets[socket.id].set_websocket(websocket) - self.sockets[socket.id].set_connected() - self.logger.info("Wesocket connected to {}".format(self.host)) - while True: - await asyncio.sleep(0) - message = await websocket.recv() - await self.on_message(socket.id, message) - - def get_socket(self, socketId): - return self.sockets[socketId] - - def get_authenticated_socket(self): - for socketId in self.sockets: - if self.sockets[socketId].isAuthenticated: - return self.sockets[socketId] - return None - - async def _run_socket(self): - retries = 0 - sId = len(self.sockets) - s = Socket(sId) - self.sockets[sId] = s - while retries < self.max_retries and self.attempt_retry: - try: - await self._connect(s) - retries = 0 - except (ConnectionClosed, socket.error) as e: - self.sockets[sId].set_disconnected() - self._emit('disconnected') - if (not self.attempt_retry): - return - self.logger.error(str(e)) - retries += 1 - # wait 5 seconds befor retrying - self.logger.info("Waiting 5 seconds before retrying...") - await asyncio.sleep(5) - self.logger.info("Reconnect attempt {}/{}".format(retries, self.max_retries)) - self.logger.info("Unable to connect to websocket.") - self._emit('stopped') - - def remove_all_listeners(self, event): - """ - Remove all listeners from event emitter - """ - self.events.remove_all_listeners(event) - - def on(self, event, func=None): - """ - Add a new event to the event emitter - """ - if not func: - return self.events.on(event) - self.events.on(event, func) - - def once(self, event, func=None): - """ - Add a new event to only fire once to the event - emitter - """ - if not func: - return self.events.once(event) - self.events.once(event, func) - - def _emit(self, event, *args, **kwargs): - self.events.emit(event, *args, **kwargs) - - async def on_error(self, error): - """ - On websocket error print and fire event - """ - self.logger.error(error) - - async def on_close(self): - """ - On websocket close print and fire event. This is used by the data server. - """ - self.logger.info("Websocket closed.") - self.attempt_retry = False - await self.ws.close() - self._emit('done') - - async def on_open(self): - """ - On websocket open - """ - pass - - async def on_message(self, message): - """ - On websocket message - """ - pass +""" +Module used as a interfeace to describe a generick websocket client +""" + +import asyncio +import websockets +import socket +import json +import time +from threading import Thread + +from pyee import EventEmitter +from ..utils.custom_logger import CustomLogger + +# websocket exceptions +from websockets.exceptions import ConnectionClosed + +class AuthError(Exception): + """ + Thrown whenever there is a problem with the authentication packet + """ + pass + +def is_json(myjson): + try: + json_object = json.loads(myjson) + except ValueError as e: + return False + return True + +class Socket(): + def __init__(self, sId): + self.ws = None + self.isConnected = False + self.isAuthenticated = False + self.id = sId + + def set_connected(self): + self.isConnected = True + + def set_disconnected(self): + self.isConnected = False + + def set_authenticated(self): + self.isAuthenticated = True + + def set_websocket(self, ws): + self.ws = ws + +def _start_event_worker(): + async def event_sleep_process(): + """ + sleeping process for event emitter to schedule on + """ + while True: + await asyncio.sleep(0) + def start_loop(loop): + asyncio.set_event_loop(loop) + loop.run_until_complete(event_sleep_process()) + event_loop = asyncio.new_event_loop() + worker = Thread(target=start_loop, args=(event_loop,)) + worker.start() + return event_loop + +class GenericWebsocket: + """ + Websocket object used to contain the base functionality of a websocket. + Inlcudes an event emitter and a standard websocket client. + """ + + def __init__(self, host, logLevel='INFO', loop=None, max_retries=5, + create_event_emitter=_start_event_worker): + self.host = host + self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel) + self.loop = loop or asyncio.get_event_loop() + # overide 'error' event to stop it raising an exception + # self.events.on('error', self.on_error) + self.ws = None + self.max_retries = max_retries + self.attempt_retry = True + self.sockets = {} + # start seperate process for the even emitter + eventLoop = create_event_emitter() + self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=eventLoop) + + def run(self): + """ + Starte the websocket connection. This functions spawns the initial socket + thread and connection. + """ + self._start_new_socket() + + def get_task_executable(self): + """ + Get the run indefinitely asyncio task + """ + return self._run_socket() + + def _start_new_socket(self, socketId=None): + if not socketId: + socketId = len(self.sockets) + def start_loop(loop): + asyncio.set_event_loop(loop) + loop.run_until_complete(self._run_socket()) + worker_loop = asyncio.new_event_loop() + worker = Thread(target=start_loop, args=(worker_loop,)) + worker.start() + return socketId + + def _wait_for_socket(self, socket_id): + """ + Block until the given socket connection is open + """ + while True: + socket = self.sockets.get(socket_id, False) + if socket: + if socket.isConnected and socket.ws: + return + time.sleep(0.01) + + async def _connect(self, socket): + async with websockets.connect(self.host) as websocket: + self.sockets[socket.id].set_websocket(websocket) + self.sockets[socket.id].set_connected() + self.logger.info("Wesocket connected to {}".format(self.host)) + while True: + await asyncio.sleep(0) + message = await websocket.recv() + await self.on_message(socket.id, message) + + def get_socket(self, socketId): + return self.sockets[socketId] + + def get_authenticated_socket(self): + for socketId in self.sockets: + if self.sockets[socketId].isAuthenticated: + return self.sockets[socketId] + return None + + async def _run_socket(self): + retries = 0 + sId = len(self.sockets) + s = Socket(sId) + self.sockets[sId] = s + while retries < self.max_retries and self.attempt_retry: + try: + await self._connect(s) + retries = 0 + except (ConnectionClosed, socket.error) as e: + self.sockets[sId].set_disconnected() + self._emit('disconnected') + if (not self.attempt_retry): + return + self.logger.error(str(e)) + retries += 1 + # wait 5 seconds befor retrying + self.logger.info("Waiting 5 seconds before retrying...") + await asyncio.sleep(5) + self.logger.info("Reconnect attempt {}/{}".format(retries, self.max_retries)) + self.logger.info("Unable to connect to websocket.") + self._emit('stopped') + + def remove_all_listeners(self, event): + """ + Remove all listeners from event emitter + """ + self.events.remove_all_listeners(event) + + def on(self, event, func=None): + """ + Add a new event to the event emitter + """ + if not func: + return self.events.on(event) + self.events.on(event, func) + + def once(self, event, func=None): + """ + Add a new event to only fire once to the event + emitter + """ + if not func: + return self.events.once(event) + self.events.once(event, func) + + def _emit(self, event, *args, **kwargs): + self.events.emit(event, *args, **kwargs) + + async def on_error(self, error): + """ + On websocket error print and fire event + """ + self.logger.error(error) + + async def on_close(self): + """ + On websocket close print and fire event. This is used by the data server. + """ + self.logger.info("Websocket closed.") + self.attempt_retry = False + await self.ws.close() + self._emit('done') + + async def on_open(self): + """ + On websocket open + """ + pass + + async def on_message(self, message): + """ + On websocket message + """ + pass diff --git a/bfxapi/websockets/OrderManager.py b/bfxapi/websockets/order_manager.py similarity index 99% rename from bfxapi/websockets/OrderManager.py rename to bfxapi/websockets/order_manager.py index a1dffed..bb478ae 100644 --- a/bfxapi/websockets/OrderManager.py +++ b/bfxapi/websockets/order_manager.py @@ -5,7 +5,7 @@ Module used to house all of the functions/classes used to handle orders import time import asyncio -from ..utils.CustomLogger import CustomLogger +from ..utils.custom_logger import CustomLogger from ..models import Order diff --git a/bfxapi/websockets/SubscriptionManager.py b/bfxapi/websockets/subscription_manager.py similarity index 99% rename from bfxapi/websockets/SubscriptionManager.py rename to bfxapi/websockets/subscription_manager.py index fa22da7..542a941 100644 --- a/bfxapi/websockets/SubscriptionManager.py +++ b/bfxapi/websockets/subscription_manager.py @@ -7,7 +7,7 @@ import json import asyncio import time -from ..utils.CustomLogger import CustomLogger +from ..utils.custom_logger import CustomLogger from ..models import Subscription MAX_CHANNEL_COUNT = 25 diff --git a/bfxapi/websockets/WalletManager.py b/bfxapi/websockets/wallet_manager.py similarity index 100% rename from bfxapi/websockets/WalletManager.py rename to bfxapi/websockets/wallet_manager.py