(feat) remove labeling, controllers and strategy from quants lab

This commit is contained in:
cardosofede
2024-05-16 12:52:41 -04:00
parent 3e9cf2e7d9
commit a166540320
13 changed files with 0 additions and 980 deletions

View File

@@ -1,59 +0,0 @@
import time
import pandas as pd
import pandas_ta as ta
from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor
from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel
from hummingbot.smart_components.strategy_frameworks.directional_trading import DirectionalTradingControllerConfigBase, \
DirectionalTradingControllerBase
from pydantic import Field
class BollingerV1Conf(DirectionalTradingControllerConfigBase):
strategy_name = "bollinger_v1"
bb_length: int = Field(default=100, ge=100, le=400)
bb_std: float = Field(default=2.0, ge=2.0, le=3.0)
bb_long_threshold: float = Field(default=0.0, ge=-1.0, le=0.2)
bb_short_threshold: float = Field(default=1.0, ge=0.8, le=2.0)
class BollingerV1(DirectionalTradingControllerBase):
def __init__(self, config: BollingerV1Conf):
super().__init__(config)
self.config = config
def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
If an executor has an active position, should we close it based on a condition.
"""
return False
def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
After finishing an order, the executor will be in cooldown for a certain amount of time.
This prevents the executor from creating a new order immediately after finishing one and execute a lot
of orders in a short period of time from the same side.
"""
if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time():
return True
return False
def get_processed_data(self) -> pd.DataFrame:
df = self.candles[0].candles_df
# Add indicators
df.ta.bbands(length=self.config.bb_length, std=self.config.bb_std, append=True)
# Generate signal
long_condition = df[f"BBP_{self.config.bb_length}_{self.config.bb_std}"] < self.config.bb_long_threshold
short_condition = df[f"BBP_{self.config.bb_length}_{self.config.bb_std}"] > self.config.bb_short_threshold
# Generate signal
df["signal"] = 0
df.loc[long_condition, "signal"] = 1
df.loc[short_condition, "signal"] = -1
return df
def extra_columns_to_show(self):
return [f"BBP_{self.config.bb_length}_{self.config.bb_std}"]

View File

@@ -1,101 +0,0 @@
import time
from decimal import Decimal
import pandas_ta as ta # noqa: F401
from hummingbot.core.data_type.common import TradeType
from hummingbot.smart_components.executors.position_executor.data_types import PositionConfig, TrailingStop
from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor
from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel
from hummingbot.smart_components.strategy_frameworks.market_making.market_making_controller_base import (
MarketMakingControllerBase,
MarketMakingControllerConfigBase,
)
class DManV1Config(MarketMakingControllerConfigBase):
strategy_name: str = "dman_v1"
natr_length: int = 14
class DManV1(MarketMakingControllerBase):
"""
Directional Market Making Strategy making use of NATR indicator to make spreads dynamic.
"""
def __init__(self, config: DManV1Config):
super().__init__(config)
self.config = config
def refresh_order_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
Checks if the order needs to be refreshed.
You can reimplement this method to add more conditions.
"""
if executor.position_config.timestamp + order_level.order_refresh_time > time.time():
return False
return True
def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
If an executor has an active position, should we close it based on a condition.
"""
return False
def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
After finishing an order, the executor will be in cooldown for a certain amount of time.
This prevents the executor from creating a new order immediately after finishing one and execute a lot
of orders in a short period of time from the same side.
"""
if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time():
return True
return False
def get_processed_data(self):
"""
Gets the price and spread multiplier from the last candlestick.
"""
candles_df = self.candles[0].candles_df
natr = ta.natr(candles_df["high"], candles_df["low"], candles_df["close"], length=self.config.natr_length) / 100
candles_df["spread_multiplier"] = natr
candles_df["price_multiplier"] = 0.0
return candles_df
def get_position_config(self, order_level: OrderLevel) -> PositionConfig:
"""
Creates a PositionConfig object from an OrderLevel object.
Here you can use technical indicators to determine the parameters of the position config.
"""
close_price = self.get_close_price(self.config.trading_pair)
price_multiplier, spread_multiplier = self.get_price_and_spread_multiplier()
price_adjusted = close_price * (1 + price_multiplier)
side_multiplier = -1 if order_level.side == TradeType.BUY else 1
order_price = price_adjusted * (1 + order_level.spread_factor * spread_multiplier * side_multiplier)
amount = order_level.order_amount_usd / order_price
if order_level.triple_barrier_conf.trailing_stop_trailing_delta and order_level.triple_barrier_conf.trailing_stop_trailing_delta:
trailing_stop = TrailingStop(
activation_price_delta=order_level.triple_barrier_conf.trailing_stop_activation_price_delta,
trailing_delta=order_level.triple_barrier_conf.trailing_stop_trailing_delta,
)
else:
trailing_stop = None
position_config = PositionConfig(
timestamp=time.time(),
trading_pair=self.config.trading_pair,
exchange=self.config.exchange,
side=order_level.side,
amount=amount,
take_profit=order_level.triple_barrier_conf.take_profit,
stop_loss=order_level.triple_barrier_conf.stop_loss,
time_limit=order_level.triple_barrier_conf.time_limit,
entry_price=Decimal(order_price),
open_order_type=order_level.triple_barrier_conf.open_order_type,
take_profit_order_type=order_level.triple_barrier_conf.take_profit_order_type,
trailing_stop=trailing_stop,
leverage=self.config.leverage
)
return position_config

View File

@@ -1,112 +0,0 @@
import time
from decimal import Decimal
import pandas_ta as ta # noqa: F401
from hummingbot.core.data_type.common import TradeType
from hummingbot.smart_components.executors.position_executor.data_types import PositionConfig, TrailingStop
from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor
from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel
from hummingbot.smart_components.strategy_frameworks.market_making.market_making_controller_base import (
MarketMakingControllerBase,
MarketMakingControllerConfigBase,
)
class DManV2Config(MarketMakingControllerConfigBase):
strategy_name: str = "dman_v2"
macd_fast: int = 12
macd_slow: int = 26
macd_signal: int = 9
natr_length: int = 14
class DManV2(MarketMakingControllerBase):
"""
Directional Market Making Strategy making use of NATR indicator to make spreads dynamic and shift the mid price.
"""
def __init__(self, config: DManV2Config):
super().__init__(config)
self.config = config
def refresh_order_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
Checks if the order needs to be refreshed.
You can reimplement this method to add more conditions.
"""
if executor.position_config.timestamp + order_level.order_refresh_time > time.time():
return False
return True
def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
If an executor has an active position, should we close it based on a condition.
"""
return False
def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
After finishing an order, the executor will be in cooldown for a certain amount of time.
This prevents the executor from creating a new order immediately after finishing one and execute a lot
of orders in a short period of time from the same side.
"""
if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time():
return True
return False
def get_processed_data(self):
"""
Gets the price and spread multiplier from the last candlestick.
"""
candles_df = self.candles[0].candles_df
natr = ta.natr(candles_df["high"], candles_df["low"], candles_df["close"], length=self.config.natr_length) / 100
macd_output = ta.macd(candles_df["close"], fast=self.config.macd_fast, slow=self.config.macd_slow, signal=self.config.macd_signal)
macd = macd_output[f"MACD_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"]
macdh = macd_output[f"MACDh_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"]
macd_signal = - (macd - macd.mean()) / macd.std()
macdh_signal = macdh.apply(lambda x: 1 if x > 0 else -1)
max_price_shift = natr / 2
price_multiplier = (0.5 * macd_signal + 0.5 * macdh_signal) * max_price_shift
candles_df["spread_multiplier"] = natr
candles_df["price_multiplier"] = price_multiplier
return candles_df
def get_position_config(self, order_level: OrderLevel) -> PositionConfig:
"""
Creates a PositionConfig object from an OrderLevel object.
Here you can use technical indicators to determine the parameters of the position config.
"""
close_price = self.get_close_price(self.config.trading_pair)
price_multiplier, spread_multiplier = self.get_price_and_spread_multiplier()
price_adjusted = close_price * (1 + price_multiplier)
side_multiplier = -1 if order_level.side == TradeType.BUY else 1
order_price = price_adjusted * (1 + order_level.spread_factor * spread_multiplier * side_multiplier)
amount = order_level.order_amount_usd / order_price
if order_level.triple_barrier_conf.trailing_stop_trailing_delta and order_level.triple_barrier_conf.trailing_stop_trailing_delta:
trailing_stop = TrailingStop(
activation_price_delta=order_level.triple_barrier_conf.trailing_stop_activation_price_delta,
trailing_delta=order_level.triple_barrier_conf.trailing_stop_trailing_delta,
)
else:
trailing_stop = None
position_config = PositionConfig(
timestamp=time.time(),
trading_pair=self.config.trading_pair,
exchange=self.config.exchange,
side=order_level.side,
amount=amount,
take_profit=order_level.triple_barrier_conf.take_profit,
stop_loss=order_level.triple_barrier_conf.stop_loss,
time_limit=order_level.triple_barrier_conf.time_limit,
entry_price=Decimal(order_price),
open_order_type=order_level.triple_barrier_conf.open_order_type,
take_profit_order_type=order_level.triple_barrier_conf.take_profit_order_type,
trailing_stop=trailing_stop,
leverage=self.config.leverage
)
return position_config

View File

@@ -1,125 +0,0 @@
import time
from decimal import Decimal
import pandas_ta as ta # noqa: F401
from hummingbot.core.data_type.common import TradeType
from hummingbot.smart_components.executors.position_executor.data_types import PositionConfig, TrailingStop
from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor
from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel
from hummingbot.smart_components.strategy_frameworks.market_making.market_making_controller_base import (
MarketMakingControllerBase,
MarketMakingControllerConfigBase,
)
class DManV3Config(MarketMakingControllerConfigBase):
strategy_name: str = "dman_v3"
bb_length: int = 100
bb_std: float = 2.0
side_filter: bool = False
smart_activation: bool = False
activation_threshold: Decimal = Decimal("0.001")
dynamic_spread_factor: bool = True
dynamic_target_spread: bool = False
class DManV3(MarketMakingControllerBase):
"""
Mean reversion strategy with Grid execution making use of Bollinger Bands indicator to make spreads dynamic
and shift the mid price.
"""
def __init__(self, config: DManV3Config):
super().__init__(config)
self.config = config
def refresh_order_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
Checks if the order needs to be refreshed.
You can reimplement this method to add more conditions.
"""
if executor.position_config.timestamp + order_level.order_refresh_time > time.time():
return False
return True
def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
If an executor has an active position, should we close it based on a condition.
"""
return False
def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
After finishing an order, the executor will be in cooldown for a certain amount of time.
This prevents the executor from creating a new order immediately after finishing one and execute a lot
of orders in a short period of time from the same side.
"""
if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time():
return True
return False
def get_processed_data(self):
"""
Gets the price and spread multiplier from the last candlestick.
"""
candles_df = self.candles[0].candles_df
bbp = ta.bbands(candles_df["close"], length=self.config.bb_length, std=self.config.bb_std)
candles_df["price_multiplier"] = bbp[f"BBM_{self.config.bb_length}_{self.config.bb_std}"]
candles_df["spread_multiplier"] = bbp[f"BBB_{self.config.bb_length}_{self.config.bb_std}"] / 200
return candles_df
def get_position_config(self, order_level: OrderLevel) -> PositionConfig:
"""
Creates a PositionConfig object from an OrderLevel object.
Here you can use technical indicators to determine the parameters of the position config.
"""
close_price = self.get_close_price(self.config.trading_pair)
bollinger_mid_price, spread_multiplier = self.get_price_and_spread_multiplier()
if not self.config.dynamic_spread_factor:
spread_multiplier = 1
side_multiplier = -1 if order_level.side == TradeType.BUY else 1
order_spread_multiplier = order_level.spread_factor * spread_multiplier * side_multiplier
order_price = bollinger_mid_price * (1 + order_spread_multiplier)
amount = order_level.order_amount_usd / order_price
# Avoid placing the order from the opposite side
side_filter_condition = self.config.side_filter and (
(bollinger_mid_price > close_price and side_multiplier == 1) or
(bollinger_mid_price < close_price and side_multiplier == -1))
if side_filter_condition:
return
# Smart activation of orders
smart_activation_condition = self.config.smart_activation and (
side_multiplier == 1 and (close_price < order_price * (1 + self.config.activation_threshold)) or
(side_multiplier == -1 and (close_price > order_price * (1 - self.config.activation_threshold))))
if smart_activation_condition:
return
target_spread = spread_multiplier if self.config.dynamic_target_spread else 1
if order_level.triple_barrier_conf.trailing_stop_trailing_delta and order_level.triple_barrier_conf.trailing_stop_trailing_delta:
trailing_stop = TrailingStop(
activation_price_delta=order_level.triple_barrier_conf.trailing_stop_activation_price_delta * target_spread,
trailing_delta=order_level.triple_barrier_conf.trailing_stop_trailing_delta * target_spread,
)
else:
trailing_stop = None
position_config = PositionConfig(
timestamp=time.time(),
trading_pair=self.config.trading_pair,
exchange=self.config.exchange,
side=order_level.side,
amount=amount,
take_profit=order_level.triple_barrier_conf.take_profit * target_spread,
stop_loss=order_level.triple_barrier_conf.stop_loss * target_spread,
time_limit=order_level.triple_barrier_conf.time_limit,
entry_price=Decimal(order_price),
open_order_type=order_level.triple_barrier_conf.open_order_type,
take_profit_order_type=order_level.triple_barrier_conf.take_profit_order_type,
trailing_stop=trailing_stop,
leverage=self.config.leverage
)
return position_config

View File

@@ -1,72 +0,0 @@
import time
from typing import Optional
import pandas as pd
from pydantic import Field
from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor
from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel
from hummingbot.smart_components.strategy_frameworks.directional_trading.directional_trading_controller_base import (
DirectionalTradingControllerBase,
DirectionalTradingControllerConfigBase,
)
class MACDBBV1Config(DirectionalTradingControllerConfigBase):
strategy_name: str = "macd_bb_v1"
bb_length: int = Field(default=24, ge=100, le=200)
bb_std: float = Field(default=2.0, ge=2.0, le=3.0)
bb_long_threshold: float = Field(default=0.0, ge=-1.0, le=0.2)
bb_short_threshold: float = Field(default=1.0, ge=0.8, le=2.0)
macd_fast: int = Field(default=21, ge=12, le=60)
macd_slow: int = Field(default=42, ge=26, le=200)
macd_signal: int = Field(default=9, ge=8, le=20)
class MACDBBV1(DirectionalTradingControllerBase):
"""
Directional Market Making Strategy making use of NATR indicator to make spreads dynamic.
"""
def __init__(self, config: MACDBBV1Config):
super().__init__(config)
self.config = config
def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
If an executor has an active position, should we close it based on a condition.
"""
return False
def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
After finishing an order, the executor will be in cooldown for a certain amount of time.
This prevents the executor from creating a new order immediately after finishing one and execute a lot
of orders in a short period of time from the same side.
"""
if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time():
return True
return False
def get_processed_data(self) -> pd.DataFrame:
df = self.candles[0].candles_df
# Add indicators
df.ta.bbands(length=self.config.bb_length, std=self.config.bb_std, append=True)
df.ta.macd(fast=self.config.macd_fast, slow=self.config.macd_slow, signal=self.config.macd_signal, append=True)
bbp = df[f"BBP_{self.config.bb_length}_{self.config.bb_std}"]
macdh = df[f"MACDh_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"]
macd = df[f"MACD_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"]
# Generate signal
long_condition = (bbp < self.config.bb_long_threshold) & (macdh > 0) & (macd < 0)
short_condition = (bbp > self.config.bb_short_threshold) & (macdh < 0) & (macd > 0)
df["signal"] = 0
df.loc[long_condition, "signal"] = 1
df.loc[short_condition, "signal"] = -1
return df
def extra_columns_to_show(self):
return [f"BBP_{self.config.bb_length}_{self.config.bb_std}",
f"MACDh_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}",
f"MACD_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"]

View File

@@ -1,52 +0,0 @@
import time
import pandas as pd
from pydantic import Field
from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor
from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel
from hummingbot.smart_components.strategy_frameworks.directional_trading.directional_trading_controller_base import (
DirectionalTradingControllerBase,
DirectionalTradingControllerConfigBase,
)
class SuperTrendConfig(DirectionalTradingControllerConfigBase):
strategy_name: str = "supertrend"
length: int = Field(default=20, ge=5, le=200)
multiplier: float = Field(default=4.0, ge=2.0, le=7.0)
percentage_threshold: float = Field(default=0.01, ge=0.005, le=0.05)
class SuperTrend(DirectionalTradingControllerBase):
def __init__(self, config: SuperTrendConfig):
super().__init__(config)
self.config = config
def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
# If an executor has an active position, should we close it based on a condition. This feature is not available
# for the backtesting yet
return False
def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
# After finishing an order, the executor will be in cooldown for a certain amount of time.
# This prevents the executor from creating a new order immediately after finishing one and execute a lot
# of orders in a short period of time from the same side.
if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time():
return True
return False
def get_processed_data(self) -> pd.DataFrame:
df = self.candles[0].candles_df
df.ta.supertrend(length=self.config.length, multiplier=self.config.multiplier, append=True)
df["percentage_distance"] = abs(df["close"] - df[f"SUPERT_{self.config.length}_{self.config.multiplier}"]) / df["close"]
# Generate long and short conditions
long_condition = (df[f"SUPERTd_{self.config.length}_{self.config.multiplier}"] == 1) & (df["percentage_distance"] < self.config.percentage_threshold)
short_condition = (df[f"SUPERTd_{self.config.length}_{self.config.multiplier}"] == -1) & (df["percentage_distance"] < self.config.percentage_threshold)
# Choose side
df['signal'] = 0
df.loc[long_condition, 'signal'] = 1
df.loc[short_condition, 'signal'] = -1
return df

View File

@@ -1,89 +0,0 @@
import time
from typing import Optional, Callable
import pandas as pd
from pydantic import Field
from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor
from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel
from hummingbot.smart_components.strategy_frameworks.directional_trading.directional_trading_controller_base import (
DirectionalTradingControllerBase,
DirectionalTradingControllerConfigBase,
)
class SuperTrendMTConfig(DirectionalTradingControllerConfigBase):
strategy_name: str = "supertrend_multitimeframe"
length: int = Field(default=20, ge=5, le=200)
multiplier: float = Field(default=4.0, ge=2.0, le=7.0)
percentage_threshold: float = Field(default=0.01, ge=0.005, le=0.05)
class SuperTrendMT(DirectionalTradingControllerBase):
def __init__(self, config: SuperTrendMTConfig):
super().__init__(config)
self.config = config
def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
# If an executor has an active position, should we close it based on a condition. This feature is not available
# for the backtesting yet
return False
def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
# After finishing an order, the executor will be in cooldown for a certain amount of time.
# This prevents the executor from creating a new order immediately after finishing one and execute a lot
# of orders in a short period of time from the same side.
if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time():
return True
return False
@staticmethod
def get_minutes_from_interval(interval: str):
unit = interval[-1]
quantity = int(interval[:-1])
conversion = {"m": 1, "h": 60, "d": 1440}
return conversion[unit] * quantity
def ordered_market_data_dfs(self):
market_data = {f"{candles.name}_{candles.interval}": candles.candles_df for candles in self.candles}
return sorted(market_data.items(), key=lambda x: self.get_minutes_from_interval(x[0].split("_")[-1]))
def get_dataframes_merged_by_min_resolution(self, add_indicators_func: Optional[Callable] = None):
ordered_data = self.ordered_market_data_dfs()
if add_indicators_func:
processed_data = []
for interval, df in ordered_data:
processed_df = add_indicators_func(df)
processed_data.append((interval, processed_df))
else:
processed_data = ordered_data
interval_suffixes = {key: f'_{key.split("_")[-1]}' for key, _ in processed_data}
merged_df = None
for interval, df in processed_data:
if merged_df is None:
merged_df = df.copy()
else:
merged_df = pd.merge_asof(merged_df, df.add_suffix(interval_suffixes[interval]),
left_on=f"timestamp", right_on=f"timestamp{interval_suffixes[interval]}",
direction="backward")
return merged_df
def add_indicators(self, df):
df.ta.supertrend(length=self.config.length, multiplier=self.config.multiplier, append=True)
return df
def get_processed_data(self) -> pd.DataFrame:
df = self.get_dataframes_merged_by_min_resolution(self.add_indicators)
df["percentage_distance"] = abs(df["close"] - df[f"SUPERT_{self.config.length}_{self.config.multiplier}"]) / df["close"]
columns_with_supertrend = [col for col in df.columns if "SUPERTd" in col]
# Conditions for long and short signals
long_condition = df[columns_with_supertrend].apply(lambda x: all(item == 1 for item in x), axis=1)
short_condition = df[columns_with_supertrend].apply(lambda x: all(item == -1 for item in x), axis=1)
# Choose side
df['signal'] = 0
df.loc[long_condition, 'signal'] = 1
df.loc[short_condition, 'signal'] = -1
return df

View File

@@ -1,66 +0,0 @@
import time
from typing import Optional
import pandas as pd
from pydantic import Field
from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor
from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel
from hummingbot.smart_components.strategy_frameworks.directional_trading.directional_trading_controller_base import (
DirectionalTradingControllerBase,
DirectionalTradingControllerConfigBase,
)
class TrendFollowerV1Config(DirectionalTradingControllerConfigBase):
strategy_name: str = "trend_follower_v1"
sma_fast: int = Field(default=20, ge=10, le=150)
sma_slow: int = Field(default=100, ge=50, le=400)
bb_length: int = Field(default=100, ge=100, le=200)
bb_std: float = Field(default=2.0, ge=2.0, le=3.0)
bb_threshold: float = Field(default=0.2, ge=0.1, le=0.5)
class TrendFollowerV1(DirectionalTradingControllerBase):
def __init__(self, config: TrendFollowerV1Config):
super().__init__(config)
self.config = config
def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
# If an executor has an active position, should we close it based on a condition. This feature is not available
# for the backtesting yet
return False
def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
# After finishing an order, the executor will be in cooldown for a certain amount of time.
# This prevents the executor from creating a new order immediately after finishing one and execute a lot
# of orders in a short period of time from the same side.
if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time():
return True
return False
def get_processed_data(self) -> pd.DataFrame:
df = self.candles[0].candles_df
df.ta.sma(length=self.config.sma_fast, append=True)
df.ta.sma(length=self.config.sma_slow, append=True)
df.ta.bbands(length=self.config.bb_length, std=2.0, append=True)
# Generate long and short conditions
bbp = df[f"BBP_{self.config.bb_length}_2.0"]
inside_bounds_condition = (bbp < 0.5 + self.config.bb_threshold) & (bbp > 0.5 - self.config.bb_threshold)
long_cond = (df[f'SMA_{self.config.sma_fast}'] > df[f'SMA_{self.config.sma_slow}'])
short_cond = (df[f'SMA_{self.config.sma_fast}'] < df[f'SMA_{self.config.sma_slow}'])
# Choose side
df['signal'] = 0
df.loc[long_cond & inside_bounds_condition, 'signal'] = 1
df.loc[short_cond & inside_bounds_condition, 'signal'] = -1
return df
def extra_columns_to_show(self):
return [f"BBP_{self.config.bb_length}_{self.config.bb_std}",
f"SMA_{self.config.sma_fast}",
f"SMA_{self.config.sma_slow}"]

View File

@@ -1,70 +0,0 @@
from typing import Optional
import numpy as np
import pandas as pd
def triple_barrier_method(df, tp=1.0, sl=1.0, tl=5, std_span: Optional[int] = 100, trade_cost=0.0006, max_executors: int = 1):
df.index = pd.to_datetime(df.timestamp, unit="ms")
if std_span:
df["target"] = df["close"].rolling(std_span).std() / df["close"]
else:
df["target"] = 1 / 100
df["tl"] = df.index + pd.Timedelta(seconds=tl)
df.dropna(subset="target", inplace=True)
df = apply_tp_sl_on_tl(df, tp=tp, sl=sl)
df = get_bins(df, trade_cost)
df['tp'] = df['close'] * (1 + df['target'] * tp * df["side"])
df['sl'] = df['close'] * (1 - df['target'] * sl * df["side"])
df = add_active_signals(df, max_executors)
return df
def add_active_signals(df, max_executors):
close_times = [pd.Timestamp.min] * max_executors
df["active_signal"] = 0
for index, row in df[(df["side"] != 0)].iterrows():
for close_time in close_times:
if row["timestamp"] > close_time:
df.loc[df.index == index, "active_signal"] = 1
close_times.remove(close_time)
close_times.append(row["close_time"])
break
return df
def get_bins(df, trade_cost):
# 1) prices aligned with events
px = df.index.union(df['tl'].values).drop_duplicates()
px = df.close.reindex(px, method='ffill')
# 2) create out object
df['ret'] = (px.loc[df['close_time'].values].values / px.loc[df.index] - 1) * df['side']
df['real_class'] = np.sign(df['ret'] - trade_cost)
return df
def apply_tp_sl_on_tl(df: pd.DataFrame, tp: float, sl: float):
events = df[df["side"] != 0].copy()
if tp > 0:
take_profit = tp * events['target']
else:
take_profit = pd.Series(index=df.index) # NaNs
if sl > 0:
stop_loss = - sl * events['target']
else:
stop_loss = pd.Series(index=df.index) # NaNs
for loc, tl in events['tl'].fillna(df.index[-1]).items():
df0 = df.close[loc:tl] # path prices
df0 = (df0 / df.close[loc] - 1) * events.at[loc, 'side'] # path returns
df.loc[loc, 'stop_loss_time'] = df0[df0 < stop_loss[loc]].index.min() # earliest stop loss.
df.loc[loc, 'take_profit_time'] = df0[df0 > take_profit[loc]].index.min() # earliest profit taking.
df["close_time"] = df[["tl", "take_profit_time", "stop_loss_time"]].dropna(how='all').min(axis=1)
df['close_type'] = df[['take_profit_time', 'stop_loss_time', 'tl']].dropna(how='all').idxmin(axis=1)
df['close_type'].replace({'take_profit_time': 'tp', 'stop_loss_time': 'sl'}, inplace=True)
return df

View File

@@ -1,234 +0,0 @@
from typing import Optional
import pandas as pd
from plotly.subplots import make_subplots
import pandas_ta as ta # noqa: F401
import plotly.graph_objs as go
import numpy as np
class StrategyAnalysis:
def __init__(self, positions: pd.DataFrame, candles_df: Optional[pd.DataFrame] = None):
self.candles_df = candles_df
self.positions = positions
self.candles_df["timestamp"] = pd.to_datetime(self.candles_df["timestamp"], unit="ms")
self.positions["timestamp"] = pd.to_datetime(self.positions["timestamp"], unit="ms")
self.positions["close_time"] = pd.to_datetime(self.positions["close_time"], unit="ms")
self.base_figure = None
def create_base_figure(self, candlestick=True, volume=True, positions=False, trade_pnl=False, extra_rows=0):
rows, heights = self.get_n_rows_and_heights(extra_rows + trade_pnl, volume)
self.rows = rows
specs = [[{"secondary_y": True}]] * rows
self.base_figure = make_subplots(rows=rows, cols=1, shared_xaxes=True, vertical_spacing=0.01,
row_heights=heights, specs=specs)
if candlestick:
self.add_candles_graph(row=1, col=1)
if volume:
self.add_volume()
if positions:
self.add_positions()
if trade_pnl:
self.add_trade_pnl(row=rows)
self.update_layout(volume)
def add_positions(self):
# Add long and short positions
active_signals = self.positions.copy()
active_signals.loc[active_signals["signal"] == -1, "symbol"] = "triangle-down"
active_signals.loc[active_signals["signal"] == 1, "symbol"] = "triangle-up"
active_signals.loc[active_signals["profitable"] == 1, "color"] = "lightgreen"
active_signals.loc[active_signals["profitable"] == -1, "color"] = "red"
self.base_figure.add_trace(go.Scatter(x=active_signals.loc[(active_signals["side"] != 0), "timestamp"],
y=active_signals.loc[active_signals["side"] != 0, "close"],
name="Entry Price: $",
mode="markers",
marker_color=active_signals.loc[(active_signals["side"] != 0), "color"],
marker_symbol=active_signals.loc[(active_signals["side"] != 0), "symbol"],
marker_size=20,
marker_line={"color": "black", "width": 0.7}),
row=1, col=1)
for index, row in active_signals.iterrows():
self.base_figure.add_shape(type="rect",
fillcolor="green",
opacity=0.5,
x0=row.timestamp,
y0=row.close,
x1=row.close_time,
y1=row.take_profit_price,
line=dict(color="green"),
row=1, col=1)
# Add SL
self.base_figure.add_shape(type="rect",
fillcolor="red",
opacity=0.5,
x0=row.timestamp,
y0=row.close,
x1=row.close_time,
y1=row.stop_loss_price,
line=dict(color="red"),
row=1, col=1)
def get_n_rows_and_heights(self, extra_rows, volume=True):
rows = 1 + extra_rows + volume
row_heights = [0.5] * (extra_rows)
if volume:
row_heights.insert(0, 0.2)
row_heights.insert(0, 0.8)
return rows, row_heights
def figure(self):
return self.base_figure
def add_candles_graph(self, row, col, name_suffix='', timeframe_suffix=''):
self.base_figure.add_trace(
go.Candlestick(
x=self.candles_df[f"timestamp{timeframe_suffix}"],
open=self.candles_df[f"open{timeframe_suffix}"],
high=self.candles_df[f"high{timeframe_suffix}"],
low=self.candles_df[f"low{timeframe_suffix}"],
close=self.candles_df[f"close{timeframe_suffix}"],
name=f"OHLC_{name_suffix}"
),
row=row, col=col,
)
def add_volume(self):
self.base_figure.add_trace(
go.Bar(
x=self.candles_df["timestamp"],
y=self.candles_df["volume"],
name="Volume",
opacity=0.5,
marker=dict(color="lightgreen")
),
row=2, col=1,
)
def add_trade_pnl(self, row=2):
self.base_figure.add_trace(
go.Scatter(
x=self.positions["timestamp"],
y=self.positions["net_pnl_quote"].cumsum(),
name="Cumulative Trade PnL",
mode="lines",
line=dict(color="chocolate", width=2)),
row=row, col=1
)
self.base_figure.update_yaxes(title_text="Cum Trade PnL", row=row, col=1)
def update_layout(self, volume=True):
self.base_figure.update_layout(
title={
"text": "Backtesting Analysis",
"y": 0.95,
"x": 0.5,
"xanchor": "center",
"yanchor": "top"
},
legend=dict(
orientation="h",
yanchor="bottom",
y=-0.2,
xanchor="right",
x=1
),
height=1500,
xaxis_rangeslider_visible=False,
hovermode="x unified"
)
self.base_figure.update_yaxes(title_text="Price", row=1, col=1)
if volume:
self.base_figure.update_yaxes(title_text="Volume", row=2, col=1)
self.base_figure.update_xaxes(title_text="Time", row=self.rows, col=1)
def initial_portfolio(self):
return self.positions["inventory"].dropna().values[0]
def final_portfolio(self):
return self.positions["inventory"].dropna().values[-1]
def net_profit_usd(self):
return self.final_portfolio() - self.initial_portfolio()
def net_profit_pct(self):
return self.net_profit_usd() / self.initial_portfolio()
def returns(self):
return self.positions["net_pnl_quote"] / self.initial_portfolio()
def total_positions(self):
return self.positions.shape[0] - 1
def win_signals(self):
return self.positions.loc[(self.positions["profitable"] > 0) & (self.positions["side"] != 0)]
def loss_signals(self):
return self.positions.loc[(self.positions["profitable"] < 0) & (self.positions["side"] != 0)]
def accuracy(self):
return self.win_signals().shape[0] / self.total_positions()
def max_drawdown_usd(self):
cumulative_returns = self.positions["net_pnl_quote"].cumsum()
peak = np.maximum.accumulate(cumulative_returns)
drawdown = (cumulative_returns - peak)
max_draw_down = np.min(drawdown)
return max_draw_down
def max_drawdown_pct(self):
return self.max_drawdown_usd() / self.initial_portfolio()
def sharpe_ratio(self):
returns = self.returns()
return returns.mean() / returns.std()
def profit_factor(self):
total_won = self.win_signals().loc[:, "net_pnl_quote"].sum()
total_loss = - self.loss_signals().loc[:, "net_pnl_quote"].sum()
return total_won / total_loss
def duration_in_minutes(self):
return (self.positions["timestamp"].iloc[-1] - self.positions["timestamp"].iloc[0]).total_seconds() / 60
def avg_trading_time_in_minutes(self):
time_diff_minutes = (self.positions["close_time"] - self.positions["timestamp"]).dt.total_seconds() / 60
return time_diff_minutes.mean()
def start_date(self):
return pd.to_datetime(self.candles_df.timestamp.min(), unit="ms")
def end_date(self):
return pd.to_datetime(self.candles_df.timestamp.max(), unit="ms")
def avg_profit(self):
return self.positions.net_pnl_quote.mean()
def text_report(self):
return f"""
Strategy Performance Report:
- Net Profit: {self.net_profit_usd():,.2f} USD ({self.net_profit_pct() * 100:,.2f}%)
- Total Positions: {self.total_positions()}
- Win Signals: {self.win_signals().shape[0]}
- Loss Signals: {self.loss_signals().shape[0]}
- Accuracy: {self.accuracy():,.2f}%
- Profit Factor: {self.profit_factor():,.2f}
- Max Drawdown: {self.max_drawdown_usd():,.2f} USD | {self.max_drawdown_pct() * 100:,.2f}%
- Sharpe Ratio: {self.sharpe_ratio():,.2f}
- Duration: {self.duration_in_minutes() / 60:,.2f} Hours
- Average Trade Duration: {self.avg_trading_time_in_minutes():,.2f} minutes
"""
def pnl_over_time(self):
fig = go.Figure()
fig.add_trace(go.Scatter(name="PnL Over Time",
x=self.positions.index,
y=self.positions.net_pnl_quote.cumsum()))
# Update layout with the required attributes
fig.update_layout(
title="PnL Over Time",
xaxis_title="N° Position",
yaxis=dict(title="Net PnL USD", side="left", showgrid=False),
)
return fig