Merge pull request #33 from hummingbot/feat/directional_backtesting

Feat/directional backtesting
This commit is contained in:
Michael Feng
2023-07-21 09:00:00 -04:00
committed by GitHub
22 changed files with 1082 additions and 98 deletions

2
.gitignore vendored
View File

@@ -6,8 +6,6 @@ __pycache__/
# C extensions
*.
*.ipynb
secrets.toml
.idea/*

View File

@@ -1,3 +1,5 @@
CANDLES_DATA_PATH = "data/candles"
DOWNLOAD_CANDLES_CONFIG_YML = "hummingbot_files/scripts_configs/data_downloader_config.yml"
BOTS_FOLDER = "hummingbot_files/bot_configs"
DIRECTIONAL_STRATEGIES_PATH = "quants_lab/strategy/experiments"
OPTIMIZATIONS_PATH = "quants_lab/optimizations"

View File

@@ -6,7 +6,6 @@ dependencies:
- sqlalchemy
- pip
- pip:
- ccxt
- streamlit
- watchdog
- plotly
@@ -17,5 +16,9 @@ dependencies:
- pandas_ta
- pyyaml
- commlib-py
- jupyter
- optuna
- optuna-dashboard
- streamlit-ace
- git+https://github.com/hummingbot/hbot-remote-client-py.git
- git+https://github.com/hummingbot/docker-manager.git

View File

@@ -191,4 +191,9 @@ color:
# The tick size is the frequency with which the clock notifies the time iterators by calling the
# c_tick() method, that means for example that if the tick size is 1, the logic of the strategy
# will run every second.
tick_size: 1.0
tick_size: 1.0
market_data_collection:
market_data_collection_enabled: true
market_data_collection_interval: 60
market_data_collection_depth: 20

View File

@@ -0,0 +1,194 @@
####################################
### client_config_map config ###
####################################
instance_id: e90c0d6f2b1e2d54fa0c0a69612b07174320963b
log_level: INFO
debug_console: false
strategy_report_interval: 900.0
logger_override_whitelist:
- hummingbot.strategy.arbitrage
- hummingbot.strategy.cross_exchange_market_making
- conf
log_file_path: /home/hummingbot/logs
kill_switch_mode: {}
# What to auto-fill in the prompt after each import command (start/config)
autofill_import: disabled
telegram_mode: {}
# MQTT Bridge configuration.
mqtt_bridge:
mqtt_host: localhost
mqtt_port: 1883
mqtt_username: ''
mqtt_password: ''
mqtt_namespace: hbot
mqtt_ssl: false
mqtt_logger: true
mqtt_notifier: true
mqtt_commands: true
mqtt_events: true
mqtt_external_events: true
mqtt_autostart: true
# Error log sharing
send_error_logs: true
# Can store the previous strategy ran for quick retrieval.
previous_strategy: null
# Advanced database options, currently supports SQLAlchemy's included dialects
# Reference: https://docs.sqlalchemy.org/en/13/dialects/
# To use an instance of SQLite DB the required configuration is
# db_engine: sqlite
# To use a DBMS the required configuration is
# db_host: 127.0.0.1
# db_port: 3306
# db_username: username
# db_password: password
# db_name: dbname
db_mode:
db_engine: sqlite
pmm_script_mode: {}
# Balance Limit Configurations
# e.g. Setting USDT and BTC limits on Binance.
# balance_asset_limit:
# binance:
# BTC: 0.1
# USDT: 1000
balance_asset_limit:
kucoin: {}
ciex: {}
ascend_ex_paper_trade: {}
crypto_com: {}
mock_paper_exchange: {}
btc_markets: {}
bitmart: {}
hitbtc: {}
loopring: {}
mexc: {}
polkadex: {}
bybit: {}
foxbit: {}
gate_io_paper_trade: {}
kucoin_paper_trade: {}
altmarkets: {}
ascend_ex: {}
bittrex: {}
probit_kr: {}
binance: {}
bybit_testnet: {}
okx: {}
bitmex: {}
binance_us: {}
probit: {}
gate_io: {}
lbank: {}
whitebit: {}
bitmex_testnet: {}
kraken: {}
huobi: {}
binance_paper_trade: {}
ndax_testnet: {}
coinbase_pro: {}
ndax: {}
bitfinex: {}
# Fixed gas price (in Gwei) for Ethereum transactions
manual_gas_price: 50.0
# Gateway API Configurations
# default host to only use localhost
# Port need to match the final installation port for Gateway
gateway:
gateway_api_host: localhost
gateway_api_port: '15888'
certs_path: /home/hummingbot/certs
# Whether to enable aggregated order and trade data collection
anonymized_metrics_mode:
anonymized_metrics_interval_min: 15.0
# Command Shortcuts
# Define abbreviations for often used commands
# or batch grouped commands together
command_shortcuts:
- command: spreads
help: Set bid and ask spread
arguments:
- Bid Spread
- Ask Spread
output:
- config bid_spread $1
- config ask_spread $2
# A source for rate oracle, currently ascend_ex, binance, coin_gecko, kucoin, gate_io
rate_oracle_source:
name: binance
# A universal token which to display tokens values in, e.g. USD,EUR,BTC
global_token:
global_token_name: USD
global_token_symbol: $
# Percentage of API rate limits (on any exchange and any end point) allocated to this bot instance.
# Enter 50 to indicate 50%. E.g. if the API rate limit is 100 calls per second, and you allocate
# 50% to this setting, the bot will have a maximum (limit) of 50 calls per second
rate_limits_share_pct: 100.0
commands_timeout:
create_command_timeout: 10.0
other_commands_timeout: 30.0
# Tabulate table format style (https://github.com/astanin/python-tabulate#table-format)
tables_format: psql
paper_trade:
paper_trade_exchanges:
- binance
- kucoin
- ascend_ex
- gate_io
paper_trade_account_balance:
BTC: 1.0
USDT: 1000.0
ONE: 1000.0
USDQ: 1000.0
TUSD: 1000.0
ETH: 10.0
WETH: 10.0
USDC: 1000.0
DAI: 1000.0
color:
top_pane: '#000000'
bottom_pane: '#000000'
output_pane: '#262626'
input_pane: '#1C1C1C'
logs_pane: '#121212'
terminal_primary: '#5FFFD7'
primary_label: '#5FFFD7'
secondary_label: '#FFFFFF'
success_label: '#5FFFD7'
warning_label: '#FFFF00'
info_label: '#5FD7FF'
error_label: '#FF0000'
gold_label: '#FFD700'
silver_label: '#C0C0C0'
bronze_label: '#CD7F32'
# The tick size is the frequency with which the clock notifies the time iterators by calling the
# c_tick() method, that means for example that if the tick size is 1, the logic of the strategy
# will run every second.
tick_size: 1.0

View File

@@ -2,7 +2,7 @@ version: "3.9"
services:
bot:
container_name: candles_downloader
image: dardonacci/hummingbot:development
image: hummingbot/hummingbot:development
volumes:
- "../../data/candles:/home/hummingbot/data"
- "../bot_configs/data_downloader/conf:/home/hummingbot/conf"

View File

@@ -1,5 +1,4 @@
import os
import ccxt
import pandas as pd
import streamlit as st

View File

@@ -0,0 +1,174 @@
import datetime
import threading
import webbrowser
import streamlit as st
from streamlit_ace import st_ace
import constants
from quants_lab.strategy.strategy_analysis import StrategyAnalysis
from utils import os_utils
from utils.file_templates import strategy_optimization_template, directional_strategy_template
from utils.os_utils import load_directional_strategies, save_file, get_function_from_file
import optuna
st.set_page_config(
page_title="Hummingbot Dashboard",
page_icon="🚀",
layout="wide",
)
if "strategy_params" not in st.session_state:
st.session_state.strategy_params = {}
st.title("⚙️ Backtesting")
create, modify, backtest, optimize, analyze = st.tabs(["Create", "Modify", "Backtest", "Optimize", "Analyze"])
with create:
# TODO:
# * Add videos explaining how to the triple barrier method works and how the backtesting is designed,
# link to video of how to create a strategy, etc in a toggle.
# * Add functionality to start strategy creation from scratch or by duplicating an existing one
c1, c2 = st.columns([4, 1])
with c1:
# TODO: Allow change the strategy name and see the effect in the code
strategy_name = st.text_input("Strategy class name", value="CustomStrategy")
with c2:
update_strategy_name = st.button("Update Strategy Name")
c1, c2 = st.columns([4, 1])
with c1:
# TODO: every time that we save and run the optimizations, we should save the code in a file
# so the user then can correlate the results with the code.
if update_strategy_name:
st.session_state.directional_strategy_code = st_ace(key="create_directional_strategy",
value=directional_strategy_template(strategy_name),
language='python',
keybinding='vscode',
theme='pastel_on_dark')
with c2:
if st.button("Save strategy"):
save_file(name=f"{strategy_name.lower()}.py", content=st.session_state.directional_strategy_code,
path=constants.DIRECTIONAL_STRATEGIES_PATH)
st.success(f"Strategy {strategy_name} saved successfully")
with modify:
pass
with backtest:
# TODO:
# * Add videos explaining how to the triple barrier method works and how the backtesting is designed,
# link to video of how to create a strategy, etc in a toggle.
# * Add performance analysis graphs of the backtesting run
strategies = load_directional_strategies(constants.DIRECTIONAL_STRATEGIES_PATH)
strategy_to_optimize = st.selectbox("Select strategy to backtest", strategies.keys())
strategy = strategies[strategy_to_optimize]
strategy_config = strategy["config"]
field_schema = strategy_config.schema()["properties"]
st.write("## Strategy parameters")
c1, c2 = st.columns([5, 1])
with c1:
columns = st.columns(4)
column_index = 0
for field_name, properties in field_schema.items():
field_type = properties["type"]
with columns[column_index]:
if field_type in ["number", "integer"]:
field_value = st.number_input(field_name,
value=properties["default"],
min_value=properties.get("minimum"),
max_value=properties.get("maximum"),
key=field_name)
elif field_type == "string":
field_value = st.text_input(field_name, value=properties["default"])
elif field_type == "boolean":
# TODO: Add support for boolean fields in optimize tab
field_value = st.checkbox(field_name, value=properties["default"])
else:
raise ValueError(f"Field type {field_type} not supported")
st.session_state["strategy_params"][field_name] = field_value
column_index = (column_index + 1) % 4
with c2:
add_positions = st.checkbox("Add positions", value=True)
add_volume = st.checkbox("Add volume", value=True)
add_pnl = st.checkbox("Add PnL", value=True)
run_backtesting_button = st.button("Run Backtesting!")
if run_backtesting_button:
config = strategy["config"](**st.session_state["strategy_params"])
strategy = strategy["class"](config=config)
# TODO: add form for order amount, leverage, tp, sl, etc.
market_data, positions = strategy.run_backtesting(
start='2021-04-01',
order_amount=50,
leverage=20,
initial_portfolio=100,
take_profit_multiplier=2.3,
stop_loss_multiplier=1.2,
time_limit=60 * 60 * 3,
std_span=None,
)
strategy_analysis = StrategyAnalysis(
positions=positions,
candles_df=market_data,
)
st.text(strategy_analysis.text_report())
# TODO: check why the pnl is not being plotted
strategy_analysis.create_base_figure(volume=add_volume, positions=add_positions, trade_pnl=add_pnl)
st.plotly_chart(strategy_analysis.figure(), use_container_width=True)
with optimize:
# TODO:
# * Add videos explaining how to use the optimization tool, quick intro to optuna, etc in a toggle
with st.container():
c1, c2, c3 = st.columns([1, 1, 1])
with c1:
strategies = load_directional_strategies(constants.DIRECTIONAL_STRATEGIES_PATH)
strategy_to_optimize = st.selectbox("Select strategy to optimize", strategies.keys())
with c2:
today = datetime.datetime.today()
# TODO: add hints about the study name
STUDY_NAME = st.text_input("Study name",
f"{strategy_to_optimize}_study_{today.day:02d}-{today.month:02d}-{today.year}")
with c3:
generate_optimization_code_button = st.button("Generate Optimization Code")
if st.button("Launch optuna dashboard"):
os_utils.execute_bash_command(f"optuna-dashboard sqlite:///data/backtesting/backtesting_report.db")
webbrowser.open("http://127.0.0.1:8080/dashboard", new=2)
c1, c2 = st.columns([4, 1])
if generate_optimization_code_button:
with c1:
# TODO: every time that we save and run the optimizations, we should save the code in a file
# so the user then can correlate the results with the code.
st.session_state.optimization_code = st_ace(key="create_optimization_code",
value=strategy_optimization_template(
strategy_info=strategies[strategy_to_optimize]),
language='python',
keybinding='vscode',
theme='pastel_on_dark')
if "optimization_code" in st.session_state:
with c2:
if st.button("Run optimization"):
save_file(name=f"{STUDY_NAME}.py", content=st.session_state.optimization_code, path=constants.OPTIMIZATIONS_PATH)
study = optuna.create_study(direction="maximize", study_name=STUDY_NAME,
storage="sqlite:///data/backtesting/backtesting_report.db",
load_if_exists=True)
objective = get_function_from_file(file_path=f"{constants.OPTIMIZATIONS_PATH}/{STUDY_NAME}.py",
function_name="objective")
def optimization_process():
study.optimize(objective, n_trials=2000)
optimization_thread = threading.Thread(target=optimization_process)
optimization_thread.start()
with analyze:
# TODO:
# * Add graphs for all backtesting results
# * Add management of backtesting results (delete, rename, save, share, upload s3, etc)
pass

0
quants_lab/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,84 @@
from typing import Optional
import numpy as np
import pandas as pd
def triple_barrier_method(df, tp=1.0, sl=1.0, tl=5, std_span: Optional[int] = 100, trade_cost=0.0006, max_executors: int = 1):
df.index = pd.to_datetime(df.timestamp, unit="ms")
if std_span:
df["target"] = df["close"].rolling(std_span).std() / df["close"]
else:
df["target"] = 1 / 100
df["tl"] = df.index + pd.Timedelta(seconds=tl)
df.dropna(subset="target", inplace=True)
df = apply_tp_sl_on_tl(df, tp=tp, sl=sl)
df = get_bins(df, trade_cost)
df['tp'] = df['close'] * (1 + df['target'] * tp * df["side"])
df['sl'] = df['close'] * (1 - df['target'] * sl * df["side"])
df = add_active_signals(df, max_executors)
return df
def add_active_signals(df, max_executors):
close_times = [pd.Timestamp.min] * max_executors
df["active_signal"] = 0
for index, row in df[(df["side"] != 0)].iterrows():
for close_time in close_times:
if row["timestamp"] > close_time:
df.loc[df.index == index, "active_signal"] = 1
close_times.remove(close_time)
close_times.append(row["close_time"])
break
return df
def get_bins(df, trade_cost):
# 1) prices aligned with events
px = df.index.union(df['tl'].values).drop_duplicates()
px = df.close.reindex(px, method='ffill')
# 2) create out object
df['ret'] = (px.loc[df['close_time'].values].values / px.loc[df.index] - 1) * df['side']
df['real_class'] = np.sign(df['ret'] - trade_cost)
return df
def apply_tp_sl_on_tl(df: pd.DataFrame, tp: float, sl: float):
events = df[df["side"] != 0].copy()
if tp > 0:
take_profit = tp * events['target']
else:
take_profit = pd.Series(index=df.index) # NaNs
if sl > 0:
stop_loss = - sl * events['target']
else:
stop_loss = pd.Series(index=df.index) # NaNs
for loc, tl in events['tl'].fillna(df.index[-1]).items():
# In the future we can think about including High and Low prices in the calculation
# side = events.at[loc, 'side'] # side (1 or -1)
# sl = stop_loss[loc]
# tp = take_profit[loc]
# close = df.close[loc] # path close price
# # path_close = df.close[loc:tl] # path prices
# path_high = (df.high[loc:tl] / close) - 1 # path high prices
# path_low = (df.low[loc:tl] / close) - 1 # path low prices
# if side == 1:
# df.loc[loc, 'stop_loss_time'] = path_low[path_low < sl].index.min() # earliest stop loss.
# df.loc[loc, 'take_profit_time'] = path_high[path_high > tp].index.min() # earliest profit taking.
# elif side == -1:
# df.loc[loc, 'stop_loss_time'] = path_high[path_high > -sl].index.min()
# df.loc[loc, 'take_profit_time'] = path_low[path_low < -tp].index.min()
df0 = df.close[loc:tl] # path prices
df0 = (df0 / df.close[loc] - 1) * events.at[loc, 'side'] # path returns
df.loc[loc, 'stop_loss_time'] = df0[df0 < stop_loss[loc]].index.min() # earliest stop loss.
df.loc[loc, 'take_profit_time'] = df0[df0 > take_profit[loc]].index.min() # earliest profit taking.
df["close_time"] = df[["tl", "take_profit_time", "stop_loss_time"]].dropna(how='all').min(axis=1)
df['close_type'] = df[['take_profit_time', 'stop_loss_time', 'tl']].dropna(how='all').idxmin(axis=1)
df['close_type'].replace({'take_profit_time': 'tp', 'stop_loss_time': 'sl'}, inplace=True)
return df

View File

View File

View File

@@ -0,0 +1,94 @@
import os
from datetime import datetime
from typing import Optional, TypeVar, Generic
import pandas as pd
from pydantic import BaseModel
from quants_lab.labeling.triple_barrier_method import triple_barrier_method
ConfigType = TypeVar("ConfigType", bound=BaseModel)
class DirectionalStrategyBase(Generic[ConfigType]):
# TODO:
# * Add a data structure to request candles from CSV files as part of the config
# * Evaluate to move the get data outside the backtesting to optimize the performance.
def __init__(self, config: ConfigType):
self.config = config
def get_data(self, start: Optional[str] = None, end: Optional[str] = None):
df = self.get_raw_data()
return self.filter_df_by_time(df, start, end)
def get_raw_data(self):
raise NotImplemented
def preprocessing(self, df):
raise NotImplemented
def predict(self, df):
raise NotImplemented
@staticmethod
def get_candles(exchange: str, trading_pair: str, interval: str) -> pd.DataFrame:
"""
Get a dataframe of market data from the database.
:param exchange: Exchange name
:param trading_pair: Trading pair
:param interval: Interval of the data
:return: Dataframe of market data
"""
script_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = os.path.join(script_dir, "../../data/candles")
filename = f"candles_{exchange}_{trading_pair.upper()}_{interval}.csv"
file_path = os.path.join(data_dir, filename)
if not os.path.exists(file_path):
raise FileNotFoundError(f"File '{file_path}' does not exist.")
df = pd.read_csv(file_path)
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
return df
@staticmethod
def filter_df_by_time(df, start: Optional[str] = None, end: Optional[str] = None):
if start is not None:
start_condition = df["timestamp"] >= datetime.strptime(start, "%Y-%m-%d")
else:
start_condition = pd.Series([True]*len(df))
if end is not None:
end_condition = df["timestamp"] <= datetime.strptime(end, "%Y-%m-%d")
else:
end_condition = pd.Series([True]*len(df))
return df[start_condition & end_condition]
def run_backtesting(self,
take_profit_multiplier, stop_loss_multiplier, time_limit,
std_span, order_amount=100, leverage=20, initial_portfolio=1000,
taker_fee=0.0003, maker_fee=0.00012,
start: Optional[str] = None, end: Optional[str] = None):
df = self.get_data(start=start, end=end)
df = self.preprocessing(df)
df = self.predict(df)
df = triple_barrier_method(
df=df,
std_span=std_span,
tp=take_profit_multiplier,
sl=stop_loss_multiplier,
tl=time_limit,
trade_cost=taker_fee * 2,
max_executors=1,
)
first_row = df.iloc[0].tolist()
first_row.extend([0, 0, 0, 0, 0, initial_portfolio])
active_signals = df[df["active_signal"] == 1].copy()
active_signals.loc[:, "amount"] = order_amount
active_signals.loc[:, "margin_used"] = order_amount / leverage
active_signals.loc[:, "fee_pct"] = active_signals["close_type"].apply(
lambda x: maker_fee + taker_fee if x == "tp" else taker_fee * 2)
active_signals.loc[:, "fee_usd"] = active_signals["fee_pct"] * active_signals["amount"]
active_signals.loc[:, "ret_usd"] = active_signals.apply(lambda x: (x["ret"] - x["fee_pct"]) * x["amount"],
axis=1)
active_signals.loc[:, "current_portfolio"] = initial_portfolio + active_signals["ret_usd"].cumsum()
active_signals.loc[:, "current_portfolio"].fillna(method='ffill', inplace=True)
positions = pd.concat([pd.DataFrame([first_row], columns=active_signals.columns), active_signals])
return df, positions.reset_index(drop=True)

View File

@@ -0,0 +1,36 @@
import pandas_ta as ta
from pydantic import BaseModel, Field
from quants_lab.strategy.directional_strategy_base import DirectionalStrategyBase
class BollingerConf(BaseModel):
exchange: str = Field(default="binance_perpetual")
trading_pair: str = Field(default="ETH-USDT")
interval: str = Field(default="1h")
bb_length: int = Field(default=100, ge=2, le=1000)
bb_std: float = Field(default=2.0, ge=0.5, le=4.0)
bb_long_threshold: float = Field(default=0.0, ge=-3.0, le=0.5)
bb_short_threshold: float = Field(default=1.0, ge=0.5, le=3.0)
class Bollinger(DirectionalStrategyBase[BollingerConf]):
def get_raw_data(self):
df = self.get_candles(
exchange=self.config.exchange,
trading_pair=self.config.trading_pair,
interval=self.config.interval,
)
return df
def preprocessing(self, df):
df.ta.bbands(length=self.config.bb_length, std=self.config.bb_std, append=True)
return df
def predict(self, df):
df["side"] = 0
long_condition = df[f"BBP_{self.config.bb_length}_{self.config.bb_std}"] < self.config.bb_long_threshold
short_condition = df[f"BBP_{self.config.bb_length}_{self.config.bb_std}"] > self.config.bb_short_threshold
df.loc[long_condition, "side"] = 1
df.loc[short_condition, "side"] = -1
return df

View File

@@ -0,0 +1,45 @@
import pandas_ta as ta
from pydantic import BaseModel, Field
from quants_lab.strategy.directional_strategy_base import DirectionalStrategyBase
class MACDBBConfig(BaseModel):
exchange: str = Field(default="binance_perpetual")
trading_pair: str = Field(default="ETH-USDT")
interval: str = Field(default="1h")
bb_length: int = Field(default=24, ge=2, le=1000)
bb_std: float = Field(default=2.0, ge=0.5, le=4.0)
bb_long_threshold: float = Field(default=0.0, ge=-3.0, le=0.5)
bb_short_threshold: float = Field(default=1.0, ge=0.5, le=3.0)
fast_macd: int = Field(default=21, ge=2, le=100)
slow_macd: int = Field(default=42, ge=30, le=1000)
signal_macd: int = Field(default=9, ge=2, le=100)
class MacdBollinger(DirectionalStrategyBase[MACDBBConfig]):
def get_raw_data(self):
df = self.get_candles(
exchange=self.config.exchange,
trading_pair=self.config.trading_pair,
interval=self.config.interval,
)
return df
def preprocessing(self, df):
df.ta.bbands(length=self.config.bb_length, std=self.config.bb_std, append=True)
df.ta.macd(fast=self.config.fast_macd, slow=self.config.slow_macd, signal=self.config.signal_macd, append=True)
return df
def predict(self, df):
bbp = df[f"BBP_{self.config.bb_length}_{self.config.bb_std}"]
macdh = df[f"MACDh_{self.config.fast_macd}_{self.config.slow_macd}_{self.config.signal_macd}"]
macd = df[f"MACD_{self.config.fast_macd}_{self.config.slow_macd}_{self.config.signal_macd}"]
long_condition = (bbp < self.config.bb_long_threshold) & (macdh > 0) & (macd < 0)
short_condition = (bbp > self.config.bb_short_threshold) & (macdh < 0) & (macd > 0)
df["side"] = 0
df.loc[long_condition, "side"] = 1
df.loc[short_condition, "side"] = -1
return df

View File

@@ -0,0 +1,47 @@
import pandas as pd
import pandas_ta as ta
from pydantic import BaseModel, Field
from quants_lab.strategy.directional_strategy_base import DirectionalStrategyBase
class StatArbConfig(BaseModel):
exchange: str = Field(default="binance_perpetual")
trading_pair: str = Field(default="ETH-USDT")
target_trading_pair: str = Field(default="BTC-USDT")
interval: str = Field(default="1h")
lookback: int = Field(default=100, ge=2, le=10000)
z_score_long: float = Field(default=2, ge=0, le=5)
z_score_short: float = Field(default=-2, ge=-5, le=0)
class StatArb(DirectionalStrategyBase[StatArbConfig]):
def get_raw_data(self):
df = self.get_candles(
exchange=self.config.exchange,
trading_pair=self.config.trading_pair,
interval=self.config.interval,
)
df_target = self.get_candles(
exchange=self.config.exchange,
trading_pair=self.config.target_trading_pair,
interval=self.config.interval,
)
df = pd.merge(df, df_target, on="timestamp", how='inner', suffixes=('', '_target'))
return df
def preprocessing(self, df):
df["pct_change_original"] = df["close"].pct_change()
df["pct_change_target"] = df["close_target"].pct_change()
df["spread"] = df["pct_change_target"] - df["pct_change_original"]
df["cum_spread"] = df["spread"].rolling(self.config.lookback).sum()
df["z_score"] = ta.zscore(df["cum_spread"], length=self.config.lookback)
return df
def predict(self, df):
df["side"] = 0
short_condition = df["z_score"] < - self.config.z_score_short
long_condition = df["z_score"] > self.config.z_score_long
df.loc[long_condition, "side"] = 1
df.loc[short_condition, "side"] = -1
return df

View File

@@ -0,0 +1,209 @@
from typing import Optional
import pandas as pd
from plotly.subplots import make_subplots
import pandas_ta as ta # noqa: F401
import plotly.graph_objs as go
import numpy as np
class StrategyAnalysis:
def __init__(self, positions: pd.DataFrame, candles_df: Optional[pd.DataFrame] = None):
self.candles_df = candles_df
self.positions = positions
self.base_figure = None
def create_base_figure(self, candlestick=True, volume=True, positions=False, trade_pnl=False, extra_rows=0):
rows, heights = self.get_n_rows_and_heights(extra_rows + volume + trade_pnl, volume)
self.rows = rows
specs = [[{"secondary_y": True}]] * rows
self.base_figure = make_subplots(rows=rows, cols=1, shared_xaxes=True, vertical_spacing=0.05,
row_heights=heights, specs=specs)
if candlestick:
self.add_candles_graph()
if volume:
self.add_volume()
if positions:
self.add_positions()
if trade_pnl:
self.add_trade_pnl()
self.update_layout(volume)
def add_positions(self):
# Add long and short positions
active_signals = self.positions.copy()
active_signals.loc[active_signals['side'] == -1, 'symbol'] = 'triangle-down'
active_signals.loc[active_signals['side'] == 1, 'symbol'] = 'triangle-up'
active_signals.loc[active_signals['real_class'] == 1, 'color'] = 'lightgreen'
active_signals.loc[active_signals['real_class'] == -1, 'color'] = 'red'
self.base_figure.add_trace(go.Scatter(x=active_signals.loc[(active_signals['side'] != 0), 'timestamp'],
y=active_signals.loc[active_signals['side'] != 0, 'close'],
name='Entry Price: $',
mode='markers',
marker_color=active_signals.loc[(active_signals['side'] != 0), 'color'],
marker_symbol=active_signals.loc[(active_signals['side'] != 0), 'symbol'],
marker_size=20,
marker_line={'color': 'black', 'width': 0.7}),
row=1, col=1)
for index, row in active_signals.iterrows():
self.base_figure.add_shape(type="rect",
fillcolor="green",
opacity=0.5,
x0=row.timestamp,
y0=row.close,
x1=row.close_time,
y1=row.tp,
line=dict(color="green"),
row=1, col=1)
# Add SL
self.base_figure.add_shape(type="rect",
fillcolor="red",
opacity=0.5,
x0=row.timestamp,
y0=row.close,
x1=row.close_time,
y1=row.sl,
line=dict(color="red"),
row=1, col=1)
def get_n_rows_and_heights(self, extra_rows, volume=True):
rows = 1 + extra_rows + volume
row_heights = [0.5] * (extra_rows)
if volume:
row_heights.insert(0, 0.2)
row_heights.insert(0, 0.8)
return rows, row_heights
def figure(self):
return self.base_figure
def add_candles_graph(self):
self.base_figure.add_trace(
go.Candlestick(
x=self.candles_df['timestamp'],
open=self.candles_df['open'],
high=self.candles_df['high'],
low=self.candles_df['low'],
close=self.candles_df['close'],
name="OHLC"
),
row=1, col=1,
)
def add_volume(self):
self.base_figure.add_trace(
go.Bar(
x=self.candles_df['timestamp'],
y=self.candles_df['volume'],
name="Volume",
opacity=0.5,
marker=dict(color='lightgreen')
),
row=2, col=1,
)
def add_trade_pnl(self, row=2):
self.base_figure.add_trace(
go.Scatter(
x=self.positions['timestamp'],
y=self.positions['ret_usd'].cumsum(),
name="Cumulative Trade PnL",
mode='lines',
line=dict(color='chocolate', width=2)),
row=row, col=1
)
self.base_figure.update_yaxes(title_text='Cum Trade PnL', row=row, col=1)
def update_layout(self, volume=True):
self.base_figure.update_layout(
title={
'text': "Backtesting Analysis",
'y': 0.95,
'x': 0.5,
'xanchor': 'center',
'yanchor': 'top'
},
legend=dict(
orientation="h",
yanchor="bottom",
y=-0.2,
xanchor="right",
x=1
),
height=1000,
xaxis_rangeslider_visible=False,
hovermode='x unified'
)
self.base_figure.update_yaxes(title_text="Price", row=1, col=1)
if volume:
self.base_figure.update_yaxes(title_text="Volume", row=2, col=1)
self.base_figure.update_xaxes(title_text="Time", row=self.rows, col=1)
def initial_portfolio(self):
return self.positions['current_portfolio'].dropna().values[0]
def final_portfolio(self):
return self.positions['current_portfolio'].dropna().values[-1]
def net_profit_usd(self):
return self.final_portfolio() - self.initial_portfolio()
def net_profit_pct(self):
return self.net_profit_usd() / self.initial_portfolio()
def returns(self):
return self.positions['ret_usd'] / self.initial_portfolio()
def total_positions(self):
return self.positions.shape[0] - 1
def win_signals(self):
return self.positions.loc[(self.positions['real_class'] > 0) & (self.positions["side"] != 0)]
def loss_signals(self):
return self.positions.loc[(self.positions['real_class'] < 0) & (self.positions["side"] != 0)]
def accuracy(self):
return self.win_signals().shape[0] / self.total_positions()
def max_drawdown_usd(self):
cumulative_returns = self.positions["ret_usd"].cumsum()
peak = np.maximum.accumulate(cumulative_returns)
drawdown = (cumulative_returns - peak)
max_draw_down = np.min(drawdown)
return max_draw_down
def max_drawdown_pct(self):
return self.max_drawdown_usd() / self.initial_portfolio()
def sharpe_ratio(self):
returns = self.returns()
return returns.mean() / returns.std()
def profit_factor(self):
total_won = self.win_signals().loc[:, 'ret_usd'].sum()
total_loss = - self.loss_signals().loc[:, 'ret_usd'].sum()
return total_won / total_loss
def duration_in_minutes(self):
return (self.positions['timestamp'].iloc[-1] - self.positions['timestamp'].iloc[0]).total_seconds() / 60
def avg_trading_time_in_minutes(self):
time_diff_minutes = (pd.to_datetime(self.positions['close_time']) - self.positions['timestamp']).dt.total_seconds() / 60
return time_diff_minutes.mean()
def text_report(self):
return f"""
Strategy Performance Report:
- Net Profit: {self.net_profit_usd():,.2f} USD ({self.net_profit_pct() * 100:,.2f}%)
- Total Positions: {self.total_positions()}
- Win Signals: {self.win_signals().shape[0]}
- Loss Signals: {self.loss_signals().shape[0]}
- Accuracy: {self.accuracy():,.2f}%
- Profit Factor: {self.profit_factor():,.2f}
- Max Drawdown: {self.max_drawdown_usd():,.2f} USD | {self.max_drawdown_pct() * 100:,.2f}%
- Sharpe Ratio: {self.sharpe_ratio():,.2f}
- Duration: {self.duration_in_minutes() / 60:,.2f} Hours
- Average Trade Duration: {self.avg_trading_time_in_minutes():,.2f} minutes
"""

View File

@@ -1,91 +0,0 @@
import subprocess
from typing import Dict
import yaml
import constants
from utils import os_utils
class DockerManager:
def __init__(self):
pass
@staticmethod
def get_active_containers():
cmd = "docker ps --format '{{.Names}}'"
output = subprocess.check_output(cmd, shell=True)
backtestings = [container for container in output.decode().split()]
return backtestings
@staticmethod
def get_exited_containers():
cmd = "docker ps --filter status=exited --format '{{.Names}}'"
output = subprocess.check_output(cmd, shell=True)
containers = output.decode().split()
return containers
@staticmethod
def clean_exited_containers():
cmd = "docker container prune --force"
subprocess.Popen(cmd, shell=True)
def stop_active_containers(self):
containers = self.get_active_containers()
for container in containers:
cmd = f"docker stop {container}"
subprocess.Popen(cmd, shell=True)
def stop_container(self, container_name):
cmd = f"docker stop {container_name}"
subprocess.Popen(cmd, shell=True)
def start_container(self, container_name):
cmd = f"docker start {container_name}"
subprocess.Popen(cmd, shell=True)
def remove_container(self, container_name):
cmd = f"docker rm {container_name}"
subprocess.Popen(cmd, shell=True)
def create_download_candles_container(self, candles_config: Dict):
os_utils.dump_dict_to_yaml(candles_config, constants.DOWNLOAD_CANDLES_CONFIG_YML)
command = ["docker", "compose", "-p", "data_downloader", "-f",
"hummingbot_files/compose_files/data-downloader-compose.yml", "up", "-d"]
subprocess.Popen(command)
def create_broker(self):
command = ["docker", "compose", "-p", "hummingbot-broker", "-f",
"hummingbot_files/compose_files/broker-compose.yml", "up", "-d", "--remove-orphans"]
subprocess.Popen(command)
def create_hummingbot_instance(self, instance_name):
bot_name = f"hummingbot-{instance_name}"
base_conf_folder = f"{constants.BOTS_FOLDER}/data_downloader/conf"
bot_folder = f"{constants.BOTS_FOLDER}/{bot_name}"
if not os_utils.directory_exists(bot_folder):
create_folder_command = ["mkdir", "-p", bot_folder]
create_folder_task = subprocess.Popen(create_folder_command)
create_folder_task.wait()
command = ["cp", "-rf", base_conf_folder, bot_folder]
copy_folder_task = subprocess.Popen(command)
copy_folder_task.wait()
conf_file_path = f"{bot_folder}/conf/conf_client.yml"
config = os_utils.read_yaml_file(conf_file_path)
config['instance_id'] = bot_name
os_utils.dump_dict_to_yaml(config, conf_file_path)
# TODO: Mount script folder for custom scripts
create_container_command = ["docker", "run", "-it", "-d", "--log-opt", "max-size=10m", "--log-opt",
"max-file=5",
"--name", bot_name,
"--network", "host",
"-v", f"./{bot_folder}/conf:/home/hummingbot/conf",
"-v", f"./{bot_folder}/conf/connectors:/home/hummingbot/conf/connectors",
"-v", f"./{bot_folder}/conf/strategies:/home/hummingbot/conf/strategies",
"-v", f"./{bot_folder}/logs:/home/hummingbot/logs",
"-v", "./data/:/home/hummingbot/data",
# "-v", f"./{bot_folder}/scripts:/home/hummingbot/scripts",
"-v", f"./{bot_folder}/certs:/home/hummingbot/certs",
"-e", "CONFIG_PASSWORD=a",
"dardonacci/hummingbot:development"]
subprocess.Popen(create_container_command)

118
utils/file_templates.py Normal file
View File

@@ -0,0 +1,118 @@
from typing import Dict
def directional_strategy_template(strategy_cls_name: str) -> str:
strategy_config_cls_name = f"{strategy_cls_name}Config"
sma_config_text = "{self.config.sma_length}"
return f"""import pandas_ta as ta
from pydantic import BaseModel, Field
from quants_lab.strategy.directional_strategy_base import DirectionalStrategyBase
class {strategy_config_cls_name}(BaseModel):
exchange: str = Field(default="binance_perpetual")
trading_pair: str = Field(default="ETH-USDT")
interval: str = Field(default="1h")
sma_length: int = Field(default=20, ge=10, le=200)
# ... Add more fields here
class {strategy_cls_name}(DirectionalStrategyBase[{strategy_config_cls_name}]):
def get_raw_data(self):
# The method get candles will search for the data in the folder data/candles
# If the data is not there, you can use the candles downloader to get the data
df = self.get_candles(
exchange=self.config.exchange,
trading_pair=self.config.trading_pair,
interval=self.config.interval,
)
return df
def preprocessing(self, df):
df.ta.sma(length=self.config.sma_length, append=True)
# ... Add more indicators here
# ... Check https://github.com/twopirllc/pandas-ta#indicators-by-category for more indicators
# ... Use help(ta.indicator_name) to get more info
return df
def predict(self, df):
# Generate long and short conditions
long_cond = (df['close'] > df[f'SMA_{sma_config_text}'])
short_cond = (df['close'] < df[f'SMA_{sma_config_text}'])
# Choose side
df['side'] = 0
df.loc[long_cond, 'side'] = 1
df.loc[short_cond, 'side'] = -1
return df
"""
def get_optuna_suggest_str(field_name: str, properties: Dict):
map_by_type = {
"number": "trial.suggest_float",
"integer": "trial.suggest_int",
"string": "trial.suggest_categorical",
}
config_num = f"('{field_name}', {properties.get('minimum', '_')}, {properties.get('maximum', '_')})"
config_cat = f"('{field_name}', ['{properties.get('default', '_')}',])"
optuna_trial_str = map_by_type[properties["type"]] + config_num if properties["type"] != "string" \
else map_by_type[properties["type"]] + config_cat
return f"{field_name}={optuna_trial_str}"
def strategy_optimization_template(strategy_info: dict):
strategy_cls = strategy_info["class"]
strategy_config = strategy_info["config"]
strategy_module = strategy_info["module"]
field_schema = strategy_config.schema()["properties"]
fields_str = [get_optuna_suggest_str(field_name, properties) for field_name, properties in field_schema.items()]
fields_str = "".join([f" {field_str},\n" for field_str in fields_str])
return f"""import traceback
from optuna import TrialPruned
from quants_lab.strategy.experiments.{strategy_module} import {strategy_cls.__name__}, {strategy_config.__name__}
from quants_lab.strategy.strategy_analysis import StrategyAnalysis
def objective(trial):
try:
config = {strategy_config.__name__}(
{fields_str}
)
strategy = {strategy_cls.__name__}(config=config)
market_data, positions = strategy.run_backtesting(
start='2021-04-01',
order_amount=50,
leverage=20,
initial_portfolio=100,
take_profit_multiplier=trial.suggest_float("take_profit_multiplier", 1.0, 3.0),
stop_loss_multiplier=trial.suggest_float("stop_loss_multiplier", 1.0, 3.0),
time_limit=60 * 60 * trial.suggest_int("time_limit", 1, 24),
std_span=None,
)
strategy_analysis = StrategyAnalysis(
positions=positions,
)
trial.set_user_attr("net_profit_usd", strategy_analysis.net_profit_usd())
trial.set_user_attr("net_profit_pct", strategy_analysis.net_profit_pct())
trial.set_user_attr("max_drawdown_usd", strategy_analysis.max_drawdown_usd())
trial.set_user_attr("max_drawdown_pct", strategy_analysis.max_drawdown_pct())
trial.set_user_attr("sharpe_ratio", strategy_analysis.sharpe_ratio())
trial.set_user_attr("accuracy", strategy_analysis.accuracy())
trial.set_user_attr("total_positions", strategy_analysis.total_positions())
trial.set_user_attr("win_signals", strategy_analysis.win_signals().shape[0])
trial.set_user_attr("loss_signals", strategy_analysis.loss_signals().shape[0])
trial.set_user_attr("profit_factor", strategy_analysis.profit_factor())
trial.set_user_attr("duration_in_hours", strategy_analysis.duration_in_minutes() / 60)
trial.set_user_attr("avg_trading_time_in_hours", strategy_analysis.avg_trading_time_in_minutes() / 60)
return strategy_analysis.net_profit_pct()
except Exception as e:
traceback.print_exc()
raise TrialPruned()
"""

View File

@@ -1,6 +1,12 @@
import os
import glob
import subprocess
import importlib.util
import inspect
import os
from pydantic import BaseModel
from quants_lab.strategy.directional_strategy_base import DirectionalStrategyBase # update this to the actual import
import yaml
@@ -26,3 +32,64 @@ def read_yaml_file(file_path):
def directory_exists(directory: str):
return os.path.exists(directory)
def save_file(name: str, content: str, path: str):
complete_file_path = os.path.join(path, name)
os.makedirs(path, exist_ok=True)
with open(complete_file_path, "w") as file:
file.write(content)
def load_file(path: str) -> str:
try:
with open(path, 'r') as file:
contents = file.read()
return contents
except FileNotFoundError:
print(f"File '{path}' not found.")
return ""
except IOError:
print(f"Error reading file '{path}'.")
return ""
def get_python_files_from_directory(directory: str) -> list:
py_files = glob.glob(directory + "/**/*.py", recursive=True)
py_files = [path for path in py_files if not path.endswith("__init__.py")]
return py_files
def load_directional_strategies(path):
strategies = {}
for filename in os.listdir(path):
if filename.endswith('.py') and "__init__" not in filename:
module_name = filename[:-3] # strip the .py to get the module name
strategies[module_name] = {"module": module_name}
file_path = os.path.join(path, filename)
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
for name, cls in inspect.getmembers(module, inspect.isclass):
if issubclass(cls, DirectionalStrategyBase) and cls is not DirectionalStrategyBase:
strategies[module_name]["class"] = cls
if issubclass(cls, BaseModel) and cls is not BaseModel:
strategies[module_name]["config"] = cls
return strategies
def get_function_from_file(file_path: str, function_name: str):
# Create a module specification from the file path and load it
spec = importlib.util.spec_from_file_location("module.name", file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Get the function from the module
function = getattr(module, function_name)
return function
def execute_bash_command(command: str, shell: bool = True, wait: bool = False):
process = subprocess.Popen(command, shell=shell)
if wait:
process.wait()