Merge pull request #61 from hummingbot/feat/backtesting-frontend-v2

Feat/backtesting frontend v2
This commit is contained in:
dardonacci
2023-08-08 13:32:23 +02:00
committed by GitHub
5 changed files with 568 additions and 1 deletions

View File

@@ -1,4 +1,156 @@
"""Streamlit "Analyze" page: browse Optuna backtesting databases, inspect a
study's trials and re-run a backtest for a selected trial."""
import constants
import os
import json
import streamlit as st
from quants_lab.strategy.strategy_analysis import StrategyAnalysis
from utils.graphs import BacktestingGraphs
from utils.optuna_database_manager import OptunaDBManager
from utils.os_utils import load_directional_strategies
from utils.st_utils import initialize_st_page

# Must run before any other st.* call so the page config takes effect.
initialize_st_page(title="Analyze", icon="🔬", initial_sidebar_state="collapsed")
@st.cache_resource
def get_databases():
    """Return the names of usable Optuna SQLite databases in data/backtesting.

    A database is usable when its ``status`` property reports "OK", i.e. it
    contains at least one completed trial.  Cached as a Streamlit resource so
    the directory scan and status probes run once per session.
    """
    sqlite_files = [db_name for db_name in os.listdir("data/backtesting") if db_name.endswith(".db")]
    # os.listdir already yields unique entries, so the intermediate dict the
    # original built for de-duplication was a no-op and has been removed.
    databases = [OptunaDBManager(db) for db in sqlite_files]
    return [db.db_name for db in databases if db.status == 'OK']
def initialize_session_state_vars():
    """Make sure the per-session parameter dicts exist in Streamlit state."""
    for state_key in ("strategy_params", "backtesting_params"):
        if state_key not in st.session_state:
            st.session_state[state_key] = {}
# ---------------------------------------------------------------------------
# Page body: pick a database / study / trial, tweak its parameters, and
# re-run the backtest for the selected trial.
# ---------------------------------------------------------------------------
initialize_session_state_vars()
dbs = get_databases()
if not dbs:
    st.warning("We couldn't find any Optuna database.")
    selected_db_name = None  # kept for parity with the selection path below
    selected_db = None
else:
    # --- Database / study / trial selection --------------------------------
    selected_db = st.selectbox("Select your database:", dbs)
    opt_db = OptunaDBManager(selected_db)
    studies = opt_db.load_studies()
    study_selected = st.selectbox("Select a study:", studies.keys())
    # Keep only the trials that belong to the chosen study.
    merged_df = opt_db.merged_df[opt_db.merged_df["study_name"] == study_selected]
    bt_graphs = BacktestingGraphs(merged_df)
    # Scatter chart comparing every trial of the study at a glance.
    st.plotly_chart(bt_graphs.pnl_vs_maxdrawdown(), use_container_width=True)
    trials = studies[study_selected]
    trial_selected = st.selectbox("Select a trial to backtest", list(trials.keys()))
    trial = trials[trial_selected]
    # The trial's strategy config is persisted as a JSON string.
    trial_config = json.loads(trial["config"])

    # --- Strategy parameters section ---------------------------------------
    st.write("## Strategy parameters")
    # Load strategies (class, config, module) and pick the trial's one.
    strategies = load_directional_strategies(constants.DIRECTIONAL_STRATEGIES_PATH)
    strategy = strategies[trial_config["name"]]
    # The pydantic schema decides which widget each field gets.
    field_schema = strategy["config"].schema()["properties"]
    c1, c2 = st.columns([5, 1])
    with c1:
        columns = st.columns(4)
        column_index = 0
        for field_name, properties in field_schema.items():
            field_type = properties["type"]
            field_value = trial_config[field_name]
            with columns[column_index]:
                if field_type in ["number", "integer"]:
                    field_value = st.number_input(field_name,
                                                  value=field_value,
                                                  min_value=properties.get("minimum"),
                                                  max_value=properties.get("maximum"),
                                                  key=field_name)
                elif field_type == "string":
                    field_value = st.text_input(field_name, value=field_value)
                elif field_type == "boolean":
                    # TODO: Add support for boolean fields in optimize tab
                    field_value = st.checkbox(field_name, value=field_value)
                else:
                    raise ValueError(f"Field type {field_type} not supported")
                # Plain dict item assignment cannot raise KeyError, so the
                # original try/except around this line was dead code.
                st.session_state["strategy_params"][field_name] = field_value
            column_index = (column_index + 1) % 4
    with c2:
        add_positions = st.checkbox("Add positions", value=True)
        add_volume = st.checkbox("Add volume", value=True)
        add_pnl = st.checkbox("Add PnL", value=True)

    # --- Backtesting parameters section ------------------------------------
    st.write("## Backtesting parameters")
    # TODO: Filter only from selected study
    backtesting_configs = opt_db.load_params()
    backtesting_params = backtesting_configs[trial_selected]
    col1, col2, col3 = st.columns(3)
    with col1:
        selected_order_amount = st.number_input("Order amount",
                                                value=50.0,
                                                min_value=0.1,
                                                max_value=999999999.99)
        selected_leverage = st.number_input("Leverage",
                                            value=10,
                                            min_value=1,
                                            max_value=200)
    with col2:
        selected_initial_portfolio = st.number_input("Initial portfolio",
                                                     value=10000.00,
                                                     min_value=1.00,
                                                     max_value=999999999.99)
        # time_limit is scaled by 3600 — presumably stored in hours while the
        # widget works in seconds; TODO confirm against the optimizer.
        selected_time_limit = st.number_input("Time Limit",
                                              value=60 * 60 * backtesting_params["time_limit"]["param_value"],
                                              min_value=60 * 60 * float(backtesting_params["time_limit"]["low"]),
                                              max_value=60 * 60 * float(backtesting_params["time_limit"]["high"]))
    with col3:
        selected_tp_multiplier = st.number_input("Take Profit Multiplier",
                                                 value=backtesting_params["take_profit_multiplier"]["param_value"],
                                                 min_value=backtesting_params["take_profit_multiplier"]["low"],
                                                 max_value=backtesting_params["take_profit_multiplier"]["high"])
        selected_sl_multiplier = st.number_input("Stop Loss Multiplier",
                                                 value=backtesting_params["stop_loss_multiplier"]["param_value"],
                                                 min_value=backtesting_params["stop_loss_multiplier"]["low"],
                                                 max_value=backtesting_params["stop_loss_multiplier"]["high"])

    # --- Run the backtest ---------------------------------------------------
    if st.button("Run Backtesting!"):
        config = strategy["config"](**st.session_state["strategy_params"])
        # Bind to a new name instead of clobbering ``strategy`` (which still
        # holds the {class, config, module} mapping used above).
        directional_strategy = strategy["class"](config=config)
        try:
            market_data, positions = directional_strategy.run_backtesting(
                order_amount=selected_order_amount,
                # Bug fix: this previously passed selected_order_amount,
                # silently ignoring the Leverage widget.
                leverage=selected_leverage,
                initial_portfolio=selected_initial_portfolio,
                take_profit_multiplier=selected_tp_multiplier,
                stop_loss_multiplier=selected_sl_multiplier,
                time_limit=selected_time_limit,
                std_span=None,
            )
            strategy_analysis = StrategyAnalysis(
                positions=positions,
                candles_df=market_data,
            )
            metrics_container = bt_graphs.get_trial_metrics(strategy_analysis,
                                                            add_positions=add_positions,
                                                            add_volume=add_volume,
                                                            add_pnl=add_pnl)
        except FileNotFoundError:
            st.warning("The requested candles could not be found.")

View File

@@ -193,6 +193,15 @@ class StrategyAnalysis:
time_diff_minutes = (pd.to_datetime(self.positions['close_time']) - self.positions['timestamp']).dt.total_seconds() / 60
return time_diff_minutes.mean()
def start_date(self):
return self.candles_df.timestamp.min()
def end_date(self):
return self.candles_df.timestamp.max()
def avg_profit(self):
return self.positions.ret_usd.mean()
def text_report(self):
return f"""
Strategy Performance Report:
@@ -207,3 +216,16 @@ Strategy Performance Report:
- Duration: {self.duration_in_minutes() / 60:,.2f} Hours
- Average Trade Duration: {self.avg_trading_time_in_minutes():,.2f} minutes
"""
def pnl_over_time(self):
    """Line chart of cumulative net PnL (USD), indexed by position number."""
    cumulative_pnl = self.positions.ret_usd.cumsum()
    pnl_trace = go.Scatter(name="PnL Over Time",
                           x=self.positions.index,
                           y=cumulative_pnl)
    fig = go.Figure(data=[pnl_trace])
    # Layout: left-hand USD axis, grid lines off.
    fig.update_layout(
        title="PnL Over Time",
        xaxis_title="N° Position",
        yaxis=dict(title="Net PnL USD", side="left", showgrid=False),
    )
    return fig

View File

@@ -11,6 +11,7 @@ from quants_lab.strategy.directional_strategy_base import DirectionalStrategyBas
class {strategy_config_cls_name}(BaseModel):
name: str = "{strategy_cls_name.lower()}"
exchange: str = Field(default="binance_perpetual")
trading_pair: str = Field(default="ETH-USDT")
interval: str = Field(default="1h")

View File

@@ -4,6 +4,7 @@ import pandas_ta as ta # noqa: F401
import streamlit as st
from utils.data_manipulation import StrategyData, SingleMarketStrategyData
from quants_lab.strategy.strategy_analysis import StrategyAnalysis
import plotly.graph_objs as go
@@ -248,3 +249,97 @@ class CandlesGraph:
merged_df["trade_pnl_continuos"] = merged_df["unrealized_trade_pnl"] + merged_df["cum_net_amount"] * merged_df["close"]
merged_df["net_pnl_continuos"] = merged_df["trade_pnl_continuos"] - merged_df["cum_fees_in_quote"]
return merged_df
class BacktestingGraphs:
    """Plotly/Streamlit visualisations for the trials of one Optuna study."""

    def __init__(self, study_df: pd.DataFrame):
        # One row per trial (shape produced by OptunaDBManager.merged_df).
        self.study_df = study_df

    def pnl_vs_maxdrawdown(self):
        """Scatter of net profit [%] vs max drawdown [%], one marker per trial."""
        fig = go.Figure()
        fig.add_trace(go.Scatter(name="Pnl vs Max Drawdown",
                                 x=-100 * self.study_df["max_drawdown_pct"],
                                 y=100 * self.study_df["net_profit_pct"],
                                 mode="markers",
                                 text=None,
                                 hovertext=self.study_df["hover_text"]))
        fig.update_layout(
            title="PnL vs Max Drawdown",
            xaxis_title="Max Drawdown [%]",
            yaxis_title="Net Profit [%]",
            height=800
        )
        # NOTE(review): text=None above plus this reset looks redundant —
        # presumably meant to suppress default point labels; confirm.
        fig.data[0].text = []
        return fig

    @staticmethod
    def get_trial_metrics(strategy_analysis: StrategyAnalysis,
                          add_volume: bool = True,
                          add_positions: bool = True,
                          add_pnl: bool = True):
        """Isolated method because it needs to be called from analyze and simulate pages"""
        # Reads exchange/trading_pair from st.session_state["strategy_params"],
        # so the calling page must populate that dict before calling this.
        metrics_container = st.container()
        with metrics_container:
            col1, col2 = st.columns(2)
            with col1:
                st.subheader("🏦 Market")
            with col2:
                st.subheader("📋 General stats")
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Exchange", st.session_state["strategy_params"]["exchange"])
            with col2:
                st.metric("Trading Pair", st.session_state["strategy_params"]["trading_pair"])
            with col3:
                st.metric("Start date", strategy_analysis.start_date().strftime("%Y-%m-%d %H:%M"))
                st.metric("End date", strategy_analysis.end_date().strftime("%Y-%m-%d %H:%M"))
            with col4:
                st.metric("Duration (hours)", f"{strategy_analysis.duration_in_minutes() / 60:.2f}")
                # NOTE(review): label says "Price change" but the value shown is
                # the trading pair — looks like a copy/paste bug; confirm intent.
                st.metric("Price change", st.session_state["strategy_params"]["trading_pair"])
            st.subheader("📈 Performance")
            col1, col2, col3, col4, col5, col6, col7, col8 = st.columns(8)
            with col1:
                st.metric("Net PnL USD",
                          f"{strategy_analysis.net_profit_usd():.2f}",
                          delta=f"{100 * strategy_analysis.net_profit_pct():.2f}%",
                          help="The overall profit or loss achieved.")
            with col2:
                st.metric("Total positions",
                          f"{strategy_analysis.total_positions()}",
                          help="The total number of closed trades, winning and losing.")
            with col3:
                st.metric("Accuracy",
                          f"{100 * (len(strategy_analysis.win_signals()) / strategy_analysis.total_positions()):.2f} %",
                          help="The percentage of winning trades, the number of winning trades divided by the"
                               " total number of closed trades")
            with col4:
                st.metric("Profit factor",
                          f"{strategy_analysis.profit_factor():.2f}",
                          help="The amount of money the strategy made for every unit of money it lost, "
                               "gross profits divided by gross losses.")
            with col5:
                st.metric("Max Drawdown",
                          f"{strategy_analysis.max_drawdown_usd():.2f}",
                          delta=f"{100 * strategy_analysis.max_drawdown_pct():.2f}%",
                          help="The greatest loss drawdown, i.e., the greatest possible loss the strategy had compared "
                               "to its highest profits")
            with col6:
                st.metric("Avg Profit",
                          f"{strategy_analysis.avg_profit():.2f}",
                          help="The sum of money gained or lost by the average trade, Net Profit divided by "
                               "the overall number of closed trades.")
            with col7:
                st.metric("Avg Minutes",
                          f"{strategy_analysis.avg_trading_time_in_minutes():.2f}",
                          help="The average number of minutes that elapsed during trades for all closed trades.")
            with col8:
                st.metric("Sharpe Ratio",
                          f"{strategy_analysis.sharpe_ratio():.2f}",
                          help="The Sharpe ratio is a measure that quantifies the risk-adjusted return of an investment"
                               " or portfolio. It compares the excess return earned above a risk-free rate per unit of"
                               " risk taken.")
            st.plotly_chart(strategy_analysis.pnl_over_time(), use_container_width=True)
            strategy_analysis.create_base_figure(volume=add_volume, positions=add_positions, trade_pnl=add_pnl)
            st.plotly_chart(strategy_analysis.figure(), use_container_width=True)
        return metrics_container

View File

@@ -0,0 +1,297 @@
import os
import json
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from utils.data_manipulation import StrategyData
class OptunaDBManager:
    """Read-only access layer over an Optuna SQLite results database.

    Exposes every Optuna table as a pandas DataFrame property and provides
    convenience loaders (:meth:`load_studies`, :meth:`load_params`) and a
    joined view (:attr:`merged_df`) used by the Streamlit pages.

    Legacy error contract kept on purpose: table accessors return an
    ``"Error: ..."`` string (not an exception) on failure, because callers
    display that string directly.
    """

    def __init__(self, db_name):
        # db_name is a file name inside data/backtesting (e.g. "study.db").
        self.db_name = db_name
        self.db_path = f'sqlite:///{os.path.join("data/backtesting", db_name)}'
        # check_same_thread=False: Streamlit reruns may touch the engine from
        # different threads.
        self.engine = create_engine(self.db_path, connect_args={'check_same_thread': False})
        self.session_maker = sessionmaker(bind=self.engine)

    @property
    def status(self):
        """Return "OK" when the DB holds at least one completed trial,
        otherwise a human-readable diagnostic string."""
        try:
            with self.session_maker() as session:
                # Standard single-quoted SQL string literal; the original
                # relied on SQLite's legacy double-quoted-string fallback.
                query = "SELECT * FROM trials WHERE state = 'COMPLETE'"
                completed_trials = pd.read_sql_query(query, session.connection())
            if len(completed_trials) > 0:
                # TODO: improve error handling, think what to do with other cases
                return "OK"
            return "No records found in the trials table with completed state"
        except Exception as e:
            return f"Error: {str(e)}"

    def _read_table(self, table_name):
        """Fetch one whole table as a DataFrame, or an "Error: ..." string.

        Replaces thirteen near-identical private getters from the original
        (which also defined ``trial_system_attributes`` twice, the second
        definition silently shadowing the first).
        """
        try:
            with self.session_maker() as session:
                # table_name is always one of the hard-coded Optuna table
                # names below, so f-string interpolation is injection-safe.
                return pd.read_sql_query(f"SELECT * FROM {table_name}", session.connection())
        except Exception as e:
            return f"Error: {str(e)}"

    @property
    def tables(self):
        return self._get_tables()

    def _get_tables(self):
        """List the table names present in the SQLite file (or error string)."""
        try:
            with self.session_maker() as session:
                query = "SELECT name FROM sqlite_master WHERE type='table';"
                tables = pd.read_sql_query(query, session.connection())
            return tables["name"].tolist()
        except Exception as e:
            return f"Error: {str(e)}"

    # --- one property per Optuna table -------------------------------------
    @property
    def trials(self):
        return self._read_table("trials")

    @property
    def studies(self):
        return self._read_table("studies")

    @property
    def trial_params(self):
        return self._read_table("trial_params")

    @property
    def trial_values(self):
        return self._read_table("trial_values")

    @property
    def trial_system_attributes(self):
        return self._read_table("trial_system_attributes")

    @property
    def version_info(self):
        return self._read_table("version_info")

    @property
    def study_directions(self):
        return self._read_table("study_directions")

    @property
    def study_user_attributes(self):
        return self._read_table("study_user_attributes")

    @property
    def study_system_attributes(self):
        return self._read_table("study_system_attributes")

    @property
    def trial_user_attributes(self):
        return self._read_table("trial_user_attributes")

    @property
    def trial_intermediate_values(self):
        return self._read_table("trial_intermediate_values")

    @property
    def trial_heartbeats(self):
        return self._read_table("trial_heartbeats")

    @property
    def alembic_version(self):
        return self._read_table("alembic_version")

    # --- joined / derived views ---------------------------------------------
    @property
    def merged_df(self):
        return self._get_merged_df()

    @staticmethod
    def _add_hovertext(x):
        """Build the HTML hover label shown for one trial in the scatter plot."""
        summary_label = (f"<b>Trial ID: {x['trial_id']}</b><br>"
                         f"<b>Study: {x['study_name']}</b><br>"
                         f"--------------------<br>"
                         f"Accuracy: {100 * x['accuracy']:.2f} %<br>"
                         f"Avg Trading Time in Hours: {x['avg_trading_time_in_hours']:.2f}<br>"
                         f"Duration in Hours: {x['duration_in_hours']:.2f}<br>"
                         f"Loss Signals: {x['loss_signals']}<br>"
                         f"Max Drawdown [%]: {100 * x['max_drawdown_pct']:.2f} %<br>"
                         f"Max Drawdown [USD]: $ {x['max_drawdown_usd']:.2f}<br>"
                         f"Net Profit [%]: {100 * x['net_profit_pct']:.2f} %<br>"
                         f"Net Profit [$]: $ {x['net_profit_usd']:.2f}<br>"
                         f"Profit Factor: {x['profit_factor']:.2f}<br>"
                         f"Sharpe Ratio: {x['sharpe_ratio']:.4f}<br>"
                         f"Total Positions: {x['total_positions']}<br>"
                         f"Win Signals: {x['win_signals']}<br>"
                         f"Trial value: {x['value']}<br>"
                         f"Direction: {x['direction']}<br>"
                         )
        return summary_label

    def _get_merged_df(self):
        """Join trials with their study, pivoted user attributes, value and
        direction; cast metric columns and attach the hover label."""
        float_cols = ["accuracy", "avg_trading_time_in_hours", "duration_in_hours", "max_drawdown_pct",
                      "max_drawdown_usd", "net_profit_pct", "net_profit_usd", "profit_factor", "sharpe_ratio", "value"]
        int_cols = ["loss_signals", "total_positions", "win_signals"]
        # trial_user_attributes is long-format (trial_id, key, value_json);
        # pivot it so each attribute becomes a column before joining.
        merged_df = self.trials\
            .merge(self.studies, on="study_id")\
            .merge(pd.pivot(self.trial_user_attributes, index="trial_id", columns="key", values="value_json"),
                   on="trial_id")\
            .merge(self.trial_values, on="trial_id")\
            .merge(self.study_directions, on="study_id")
        # value_json entries are assumed to hold plain numbers — TODO confirm.
        merged_df[float_cols] = merged_df[float_cols].astype("float")
        merged_df[int_cols] = merged_df[int_cols].astype("int64")
        merged_df["hover_text"] = merged_df.apply(self._add_hovertext, axis=1)
        return merged_df

    def load_studies(self):
        """Return {study_name: {trial_id: row-dict}} built from merged_df."""
        df = self.merged_df
        nested_dict = {}
        for _, row in df.iterrows():
            study_name = row['study_name']
            trial_id = row['trial_id']
            data_dict = row.drop(['study_name', 'trial_id']).to_dict()
            nested_dict.setdefault(study_name, {})[trial_id] = data_dict
        return nested_dict

    def load_params(self):
        """Return {trial_id: {param_name: details}} from trial_params.

        ``details`` carries the sampled value plus the distribution bounds
        (step/low/high/log) parsed from Optuna's distribution JSON; bounds
        missing from the distribution default to None.
        """
        nested_dict = {}
        for _, row in self.trial_params.iterrows():
            trial_id = row['trial_id']
            param_name = row['param_name']
            attributes = json.loads(row['distribution_json'])["attributes"]
            nested_dict.setdefault(trial_id, {})[param_name] = {
                'param_name': param_name,
                'param_value': row['param_value'],
                'step': attributes.get("step"),
                'low': attributes.get("low"),
                'high': attributes.get("high"),
                'log': attributes.get("log"),
            }
        return nested_dict