From a1665403200876e2a039d6da102ceb0d03d6c834 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Thu, 16 May 2024 12:52:41 -0400 Subject: [PATCH] (feat) remove labeling, controllers and strategy from quants lab --- quants_lab/controllers/__init__.py | 0 quants_lab/controllers/bollinger_v1.py | 59 ----- quants_lab/controllers/dman_v1.py | 101 -------- quants_lab/controllers/dman_v2.py | 112 --------- quants_lab/controllers/dman_v3.py | 125 ---------- quants_lab/controllers/macd_bb_v1.py | 72 ------ quants_lab/controllers/supertrend.py | 52 ---- .../controllers/supertrend_multitimeframe.py | 89 ------- quants_lab/controllers/trend_follower_v1.py | 66 ----- quants_lab/labeling/__init__.py | 0 quants_lab/labeling/triple_barrier_method.py | 70 ------ quants_lab/strategy/__init__.py | 0 quants_lab/strategy/strategy_analysis.py | 234 ------------------ 13 files changed, 980 deletions(-) delete mode 100644 quants_lab/controllers/__init__.py delete mode 100644 quants_lab/controllers/bollinger_v1.py delete mode 100644 quants_lab/controllers/dman_v1.py delete mode 100644 quants_lab/controllers/dman_v2.py delete mode 100644 quants_lab/controllers/dman_v3.py delete mode 100644 quants_lab/controllers/macd_bb_v1.py delete mode 100644 quants_lab/controllers/supertrend.py delete mode 100644 quants_lab/controllers/supertrend_multitimeframe.py delete mode 100644 quants_lab/controllers/trend_follower_v1.py delete mode 100644 quants_lab/labeling/__init__.py delete mode 100644 quants_lab/labeling/triple_barrier_method.py delete mode 100644 quants_lab/strategy/__init__.py delete mode 100644 quants_lab/strategy/strategy_analysis.py diff --git a/quants_lab/controllers/__init__.py b/quants_lab/controllers/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/quants_lab/controllers/bollinger_v1.py b/quants_lab/controllers/bollinger_v1.py deleted file mode 100644 index 30c1372..0000000 --- a/quants_lab/controllers/bollinger_v1.py +++ /dev/null @@ -1,59 +0,0 @@ -import time - -import pandas as pd -import pandas_ta as ta -from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor -from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel -from hummingbot.smart_components.strategy_frameworks.directional_trading import DirectionalTradingControllerConfigBase, \ - DirectionalTradingControllerBase -from pydantic import Field - - -class BollingerV1Conf(DirectionalTradingControllerConfigBase): - strategy_name = "bollinger_v1" - bb_length: int = Field(default=100, ge=100, le=400) - bb_std: float = Field(default=2.0, ge=2.0, le=3.0) - bb_long_threshold: float = Field(default=0.0, ge=-1.0, le=0.2) - bb_short_threshold: float = Field(default=1.0, ge=0.8, le=2.0) - - -class BollingerV1(DirectionalTradingControllerBase): - - def __init__(self, config: BollingerV1Conf): - super().__init__(config) - self.config = config - - def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - If an executor has an active position, should we close it based on a condition. - """ - return False - - def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - After finishing an order, the executor will be in cooldown for a certain amount of time. - This prevents the executor from creating a new order immediately after finishing one and execute a lot - of orders in a short period of time from the same side. - """ - if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time(): - return True - return False - - def get_processed_data(self) -> pd.DataFrame: - df = self.candles[0].candles_df - - # Add indicators - df.ta.bbands(length=self.config.bb_length, std=self.config.bb_std, append=True) - - # Generate signal - long_condition = df[f"BBP_{self.config.bb_length}_{self.config.bb_std}"] < self.config.bb_long_threshold - short_condition = df[f"BBP_{self.config.bb_length}_{self.config.bb_std}"] > self.config.bb_short_threshold - - # Generate signal - df["signal"] = 0 - df.loc[long_condition, "signal"] = 1 - df.loc[short_condition, "signal"] = -1 - return df - - def extra_columns_to_show(self): - return [f"BBP_{self.config.bb_length}_{self.config.bb_std}"] \ No newline at end of file diff --git a/quants_lab/controllers/dman_v1.py b/quants_lab/controllers/dman_v1.py deleted file mode 100644 index 782e055..0000000 --- a/quants_lab/controllers/dman_v1.py +++ /dev/null @@ -1,101 +0,0 @@ -import time -from decimal import Decimal - -import pandas_ta as ta # noqa: F401 - -from hummingbot.core.data_type.common import TradeType -from hummingbot.smart_components.executors.position_executor.data_types import PositionConfig, TrailingStop -from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor -from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel -from hummingbot.smart_components.strategy_frameworks.market_making.market_making_controller_base import ( - MarketMakingControllerBase, - MarketMakingControllerConfigBase, -) - - -class DManV1Config(MarketMakingControllerConfigBase): - strategy_name: str = "dman_v1" - natr_length: int = 14 - - -class DManV1(MarketMakingControllerBase): - """ - Directional Market Making Strategy making use of NATR indicator to make spreads dynamic. - """ - - def __init__(self, config: DManV1Config): - super().__init__(config) - self.config = config - - def refresh_order_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - Checks if the order needs to be refreshed. - You can reimplement this method to add more conditions. - """ - if executor.position_config.timestamp + order_level.order_refresh_time > time.time(): - return False - return True - - def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - If an executor has an active position, should we close it based on a condition. - """ - return False - - def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - After finishing an order, the executor will be in cooldown for a certain amount of time. - This prevents the executor from creating a new order immediately after finishing one and execute a lot - of orders in a short period of time from the same side. - """ - if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time(): - return True - return False - - def get_processed_data(self): - """ - Gets the price and spread multiplier from the last candlestick. - """ - candles_df = self.candles[0].candles_df - natr = ta.natr(candles_df["high"], candles_df["low"], candles_df["close"], length=self.config.natr_length) / 100 - - candles_df["spread_multiplier"] = natr - candles_df["price_multiplier"] = 0.0 - return candles_df - - def get_position_config(self, order_level: OrderLevel) -> PositionConfig: - """ - Creates a PositionConfig object from an OrderLevel object. - Here you can use technical indicators to determine the parameters of the position config. - """ - close_price = self.get_close_price(self.config.trading_pair) - price_multiplier, spread_multiplier = self.get_price_and_spread_multiplier() - - price_adjusted = close_price * (1 + price_multiplier) - side_multiplier = -1 if order_level.side == TradeType.BUY else 1 - order_price = price_adjusted * (1 + order_level.spread_factor * spread_multiplier * side_multiplier) - amount = order_level.order_amount_usd / order_price - - if order_level.triple_barrier_conf.trailing_stop_trailing_delta and order_level.triple_barrier_conf.trailing_stop_trailing_delta: - trailing_stop = TrailingStop( - activation_price_delta=order_level.triple_barrier_conf.trailing_stop_activation_price_delta, - trailing_delta=order_level.triple_barrier_conf.trailing_stop_trailing_delta, - ) - else: - trailing_stop = None - position_config = PositionConfig( - timestamp=time.time(), - trading_pair=self.config.trading_pair, - exchange=self.config.exchange, - side=order_level.side, - amount=amount, - take_profit=order_level.triple_barrier_conf.take_profit, - stop_loss=order_level.triple_barrier_conf.stop_loss, - time_limit=order_level.triple_barrier_conf.time_limit, - entry_price=Decimal(order_price), - open_order_type=order_level.triple_barrier_conf.open_order_type, - take_profit_order_type=order_level.triple_barrier_conf.take_profit_order_type, - trailing_stop=trailing_stop, - leverage=self.config.leverage - ) - return position_config diff --git a/quants_lab/controllers/dman_v2.py b/quants_lab/controllers/dman_v2.py deleted file mode 100644 index 402d479..0000000 --- a/quants_lab/controllers/dman_v2.py +++ /dev/null @@ -1,112 +0,0 @@ -import time -from decimal import Decimal - -import pandas_ta as ta # noqa: F401 - -from hummingbot.core.data_type.common import TradeType -from hummingbot.smart_components.executors.position_executor.data_types import PositionConfig, TrailingStop -from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor -from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel -from hummingbot.smart_components.strategy_frameworks.market_making.market_making_controller_base import ( - MarketMakingControllerBase, - MarketMakingControllerConfigBase, -) - - -class DManV2Config(MarketMakingControllerConfigBase): - strategy_name: str = "dman_v2" - macd_fast: int = 12 - macd_slow: int = 26 - macd_signal: int = 9 - natr_length: int = 14 - - -class DManV2(MarketMakingControllerBase): - """ - Directional Market Making Strategy making use of NATR indicator to make spreads dynamic and shift the mid price. - """ - - def __init__(self, config: DManV2Config): - super().__init__(config) - self.config = config - - def refresh_order_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - Checks if the order needs to be refreshed. - You can reimplement this method to add more conditions. - """ - if executor.position_config.timestamp + order_level.order_refresh_time > time.time(): - return False - return True - - def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - If an executor has an active position, should we close it based on a condition. - """ - return False - - def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - After finishing an order, the executor will be in cooldown for a certain amount of time. - This prevents the executor from creating a new order immediately after finishing one and execute a lot - of orders in a short period of time from the same side. - """ - if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time(): - return True - return False - - def get_processed_data(self): - """ - Gets the price and spread multiplier from the last candlestick. - """ - candles_df = self.candles[0].candles_df - natr = ta.natr(candles_df["high"], candles_df["low"], candles_df["close"], length=self.config.natr_length) / 100 - - macd_output = ta.macd(candles_df["close"], fast=self.config.macd_fast, slow=self.config.macd_slow, signal=self.config.macd_signal) - macd = macd_output[f"MACD_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"] - macdh = macd_output[f"MACDh_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"] - macd_signal = - (macd - macd.mean()) / macd.std() - macdh_signal = macdh.apply(lambda x: 1 if x > 0 else -1) - max_price_shift = natr / 2 - - price_multiplier = (0.5 * macd_signal + 0.5 * macdh_signal) * max_price_shift - - candles_df["spread_multiplier"] = natr - candles_df["price_multiplier"] = price_multiplier - return candles_df - - def get_position_config(self, order_level: OrderLevel) -> PositionConfig: - """ - Creates a PositionConfig object from an OrderLevel object. - Here you can use technical indicators to determine the parameters of the position config. - """ - close_price = self.get_close_price(self.config.trading_pair) - price_multiplier, spread_multiplier = self.get_price_and_spread_multiplier() - - price_adjusted = close_price * (1 + price_multiplier) - side_multiplier = -1 if order_level.side == TradeType.BUY else 1 - order_price = price_adjusted * (1 + order_level.spread_factor * spread_multiplier * side_multiplier) - amount = order_level.order_amount_usd / order_price - if order_level.triple_barrier_conf.trailing_stop_trailing_delta and order_level.triple_barrier_conf.trailing_stop_trailing_delta: - trailing_stop = TrailingStop( - activation_price_delta=order_level.triple_barrier_conf.trailing_stop_activation_price_delta, - trailing_delta=order_level.triple_barrier_conf.trailing_stop_trailing_delta, - ) - else: - trailing_stop = None - position_config = PositionConfig( - timestamp=time.time(), - trading_pair=self.config.trading_pair, - exchange=self.config.exchange, - side=order_level.side, - amount=amount, - take_profit=order_level.triple_barrier_conf.take_profit, - stop_loss=order_level.triple_barrier_conf.stop_loss, - time_limit=order_level.triple_barrier_conf.time_limit, - entry_price=Decimal(order_price), - open_order_type=order_level.triple_barrier_conf.open_order_type, - take_profit_order_type=order_level.triple_barrier_conf.take_profit_order_type, - trailing_stop=trailing_stop, - leverage=self.config.leverage - ) - return position_config diff --git a/quants_lab/controllers/dman_v3.py b/quants_lab/controllers/dman_v3.py deleted file mode 100644 index caea6f2..0000000 --- a/quants_lab/controllers/dman_v3.py +++ /dev/null @@ -1,125 +0,0 @@ -import time -from decimal import Decimal - -import pandas_ta as ta # noqa: F401 - -from hummingbot.core.data_type.common import TradeType -from hummingbot.smart_components.executors.position_executor.data_types import PositionConfig, TrailingStop -from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor -from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel -from hummingbot.smart_components.strategy_frameworks.market_making.market_making_controller_base import ( - MarketMakingControllerBase, - MarketMakingControllerConfigBase, -) - - -class DManV3Config(MarketMakingControllerConfigBase): - strategy_name: str = "dman_v3" - bb_length: int = 100 - bb_std: float = 2.0 - side_filter: bool = False - smart_activation: bool = False - activation_threshold: Decimal = Decimal("0.001") - dynamic_spread_factor: bool = True - dynamic_target_spread: bool = False - - -class DManV3(MarketMakingControllerBase): - """ - Mean reversion strategy with Grid execution making use of Bollinger Bands indicator to make spreads dynamic - and shift the mid price. - """ - - def __init__(self, config: DManV3Config): - super().__init__(config) - self.config = config - - def refresh_order_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - Checks if the order needs to be refreshed. - You can reimplement this method to add more conditions. - """ - if executor.position_config.timestamp + order_level.order_refresh_time > time.time(): - return False - return True - - def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - If an executor has an active position, should we close it based on a condition. - """ - return False - - def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - After finishing an order, the executor will be in cooldown for a certain amount of time. - This prevents the executor from creating a new order immediately after finishing one and execute a lot - of orders in a short period of time from the same side. - """ - if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time(): - return True - return False - - def get_processed_data(self): - """ - Gets the price and spread multiplier from the last candlestick. - """ - candles_df = self.candles[0].candles_df - bbp = ta.bbands(candles_df["close"], length=self.config.bb_length, std=self.config.bb_std) - - candles_df["price_multiplier"] = bbp[f"BBM_{self.config.bb_length}_{self.config.bb_std}"] - candles_df["spread_multiplier"] = bbp[f"BBB_{self.config.bb_length}_{self.config.bb_std}"] / 200 - return candles_df - - def get_position_config(self, order_level: OrderLevel) -> PositionConfig: - """ - Creates a PositionConfig object from an OrderLevel object. - Here you can use technical indicators to determine the parameters of the position config. - """ - close_price = self.get_close_price(self.config.trading_pair) - - bollinger_mid_price, spread_multiplier = self.get_price_and_spread_multiplier() - if not self.config.dynamic_spread_factor: - spread_multiplier = 1 - side_multiplier = -1 if order_level.side == TradeType.BUY else 1 - order_spread_multiplier = order_level.spread_factor * spread_multiplier * side_multiplier - order_price = bollinger_mid_price * (1 + order_spread_multiplier) - amount = order_level.order_amount_usd / order_price - - # Avoid placing the order from the opposite side - side_filter_condition = self.config.side_filter and ( - (bollinger_mid_price > close_price and side_multiplier == 1) or - (bollinger_mid_price < close_price and side_multiplier == -1)) - if side_filter_condition: - return - - # Smart activation of orders - smart_activation_condition = self.config.smart_activation and ( - side_multiplier == 1 and (close_price < order_price * (1 + self.config.activation_threshold)) or - (side_multiplier == -1 and (close_price > order_price * (1 - self.config.activation_threshold)))) - if smart_activation_condition: - return - - target_spread = spread_multiplier if self.config.dynamic_target_spread else 1 - if order_level.triple_barrier_conf.trailing_stop_trailing_delta and order_level.triple_barrier_conf.trailing_stop_trailing_delta: - trailing_stop = TrailingStop( - activation_price_delta=order_level.triple_barrier_conf.trailing_stop_activation_price_delta * target_spread, - trailing_delta=order_level.triple_barrier_conf.trailing_stop_trailing_delta * target_spread, - ) - else: - trailing_stop = None - position_config = PositionConfig( - timestamp=time.time(), - trading_pair=self.config.trading_pair, - exchange=self.config.exchange, - side=order_level.side, - amount=amount, - take_profit=order_level.triple_barrier_conf.take_profit * target_spread, - stop_loss=order_level.triple_barrier_conf.stop_loss * target_spread, - time_limit=order_level.triple_barrier_conf.time_limit, - entry_price=Decimal(order_price), - open_order_type=order_level.triple_barrier_conf.open_order_type, - take_profit_order_type=order_level.triple_barrier_conf.take_profit_order_type, - trailing_stop=trailing_stop, - leverage=self.config.leverage - ) - return position_config diff --git a/quants_lab/controllers/macd_bb_v1.py b/quants_lab/controllers/macd_bb_v1.py deleted file mode 100644 index 834362f..0000000 --- a/quants_lab/controllers/macd_bb_v1.py +++ /dev/null @@ -1,72 +0,0 @@ -import time -from typing import Optional - -import pandas as pd -from pydantic import Field - -from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor -from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel -from hummingbot.smart_components.strategy_frameworks.directional_trading.directional_trading_controller_base import ( - DirectionalTradingControllerBase, - DirectionalTradingControllerConfigBase, -) - - -class MACDBBV1Config(DirectionalTradingControllerConfigBase): - strategy_name: str = "macd_bb_v1" - bb_length: int = Field(default=24, ge=100, le=200) - bb_std: float = Field(default=2.0, ge=2.0, le=3.0) - bb_long_threshold: float = Field(default=0.0, ge=-1.0, le=0.2) - bb_short_threshold: float = Field(default=1.0, ge=0.8, le=2.0) - macd_fast: int = Field(default=21, ge=12, le=60) - macd_slow: int = Field(default=42, ge=26, le=200) - macd_signal: int = Field(default=9, ge=8, le=20) - - -class MACDBBV1(DirectionalTradingControllerBase): - """ - Directional Market Making Strategy making use of NATR indicator to make spreads dynamic. - """ - - def __init__(self, config: MACDBBV1Config): - super().__init__(config) - self.config = config - - def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - If an executor has an active position, should we close it based on a condition. - """ - return False - - def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - """ - After finishing an order, the executor will be in cooldown for a certain amount of time. - This prevents the executor from creating a new order immediately after finishing one and execute a lot - of orders in a short period of time from the same side. - """ - if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time(): - return True - return False - - def get_processed_data(self) -> pd.DataFrame: - df = self.candles[0].candles_df - - # Add indicators - df.ta.bbands(length=self.config.bb_length, std=self.config.bb_std, append=True) - df.ta.macd(fast=self.config.macd_fast, slow=self.config.macd_slow, signal=self.config.macd_signal, append=True) - bbp = df[f"BBP_{self.config.bb_length}_{self.config.bb_std}"] - macdh = df[f"MACDh_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"] - macd = df[f"MACD_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"] - - # Generate signal - long_condition = (bbp < self.config.bb_long_threshold) & (macdh > 0) & (macd < 0) - short_condition = (bbp > self.config.bb_short_threshold) & (macdh < 0) & (macd > 0) - df["signal"] = 0 - df.loc[long_condition, "signal"] = 1 - df.loc[short_condition, "signal"] = -1 - return df - - def extra_columns_to_show(self): - return [f"BBP_{self.config.bb_length}_{self.config.bb_std}", - f"MACDh_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}", - f"MACD_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"] diff --git a/quants_lab/controllers/supertrend.py b/quants_lab/controllers/supertrend.py deleted file mode 100644 index 6da96c4..0000000 --- a/quants_lab/controllers/supertrend.py +++ /dev/null @@ -1,52 +0,0 @@ -import time - -import pandas as pd -from pydantic import Field - -from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor -from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel -from hummingbot.smart_components.strategy_frameworks.directional_trading.directional_trading_controller_base import ( - DirectionalTradingControllerBase, - DirectionalTradingControllerConfigBase, -) - - -class SuperTrendConfig(DirectionalTradingControllerConfigBase): - strategy_name: str = "supertrend" - length: int = Field(default=20, ge=5, le=200) - multiplier: float = Field(default=4.0, ge=2.0, le=7.0) - percentage_threshold: float = Field(default=0.01, ge=0.005, le=0.05) - - -class SuperTrend(DirectionalTradingControllerBase): - def __init__(self, config: SuperTrendConfig): - super().__init__(config) - self.config = config - - def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - # If an executor has an active position, should we close it based on a condition. This feature is not available - # for the backtesting yet - return False - - def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - # After finishing an order, the executor will be in cooldown for a certain amount of time. - # This prevents the executor from creating a new order immediately after finishing one and execute a lot - # of orders in a short period of time from the same side. - if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time(): - return True - return False - - def get_processed_data(self) -> pd.DataFrame: - df = self.candles[0].candles_df - df.ta.supertrend(length=self.config.length, multiplier=self.config.multiplier, append=True) - df["percentage_distance"] = abs(df["close"] - df[f"SUPERT_{self.config.length}_{self.config.multiplier}"]) / df["close"] - - # Generate long and short conditions - long_condition = (df[f"SUPERTd_{self.config.length}_{self.config.multiplier}"] == 1) & (df["percentage_distance"] < self.config.percentage_threshold) - short_condition = (df[f"SUPERTd_{self.config.length}_{self.config.multiplier}"] == -1) & (df["percentage_distance"] < self.config.percentage_threshold) - - # Choose side - df['signal'] = 0 - df.loc[long_condition, 'signal'] = 1 - df.loc[short_condition, 'signal'] = -1 - return df diff --git a/quants_lab/controllers/supertrend_multitimeframe.py b/quants_lab/controllers/supertrend_multitimeframe.py deleted file mode 100644 index 2e389d3..0000000 --- a/quants_lab/controllers/supertrend_multitimeframe.py +++ /dev/null @@ -1,89 +0,0 @@ -import time -from typing import Optional, Callable - -import pandas as pd -from pydantic import Field - -from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor -from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel -from hummingbot.smart_components.strategy_frameworks.directional_trading.directional_trading_controller_base import ( - DirectionalTradingControllerBase, - DirectionalTradingControllerConfigBase, -) - - -class SuperTrendMTConfig(DirectionalTradingControllerConfigBase): - strategy_name: str = "supertrend_multitimeframe" - length: int = Field(default=20, ge=5, le=200) - multiplier: float = Field(default=4.0, ge=2.0, le=7.0) - percentage_threshold: float = Field(default=0.01, ge=0.005, le=0.05) - - -class SuperTrendMT(DirectionalTradingControllerBase): - def __init__(self, config: SuperTrendMTConfig): - super().__init__(config) - self.config = config - - def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - # If an executor has an active position, should we close it based on a condition. This feature is not available - # for the backtesting yet - return False - - def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - # After finishing an order, the executor will be in cooldown for a certain amount of time. - # This prevents the executor from creating a new order immediately after finishing one and execute a lot - # of orders in a short period of time from the same side. - if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time(): - return True - return False - - @staticmethod - def get_minutes_from_interval(interval: str): - unit = interval[-1] - quantity = int(interval[:-1]) - conversion = {"m": 1, "h": 60, "d": 1440} - return conversion[unit] * quantity - - def ordered_market_data_dfs(self): - market_data = {f"{candles.name}_{candles.interval}": candles.candles_df for candles in self.candles} - return sorted(market_data.items(), key=lambda x: self.get_minutes_from_interval(x[0].split("_")[-1])) - - def get_dataframes_merged_by_min_resolution(self, add_indicators_func: Optional[Callable] = None): - ordered_data = self.ordered_market_data_dfs() - if add_indicators_func: - processed_data = [] - for interval, df in ordered_data: - processed_df = add_indicators_func(df) - processed_data.append((interval, processed_df)) - else: - processed_data = ordered_data - interval_suffixes = {key: f'_{key.split("_")[-1]}' for key, _ in processed_data} - merged_df = None - for interval, df in processed_data: - if merged_df is None: - merged_df = df.copy() - else: - merged_df = pd.merge_asof(merged_df, df.add_suffix(interval_suffixes[interval]), - left_on=f"timestamp", right_on=f"timestamp{interval_suffixes[interval]}", - direction="backward") - return merged_df - - def add_indicators(self, df): - df.ta.supertrend(length=self.config.length, multiplier=self.config.multiplier, append=True) - return df - - def get_processed_data(self) -> pd.DataFrame: - df = self.get_dataframes_merged_by_min_resolution(self.add_indicators) - df["percentage_distance"] = abs(df["close"] - df[f"SUPERT_{self.config.length}_{self.config.multiplier}"]) / df["close"] - - columns_with_supertrend = [col for col in df.columns if "SUPERTd" in col] - - # Conditions for long and short signals - long_condition = df[columns_with_supertrend].apply(lambda x: all(item == 1 for item in x), axis=1) - short_condition = df[columns_with_supertrend].apply(lambda x: all(item == -1 for item in x), axis=1) - - # Choose side - df['signal'] = 0 - df.loc[long_condition, 'signal'] = 1 - df.loc[short_condition, 'signal'] = -1 - return df diff --git a/quants_lab/controllers/trend_follower_v1.py b/quants_lab/controllers/trend_follower_v1.py deleted file mode 100644 index 167de8f..0000000 --- a/quants_lab/controllers/trend_follower_v1.py +++ /dev/null @@ -1,66 +0,0 @@ -import time -from typing import Optional - -import pandas as pd -from pydantic import Field - -from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor -from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel -from hummingbot.smart_components.strategy_frameworks.directional_trading.directional_trading_controller_base import ( - DirectionalTradingControllerBase, - DirectionalTradingControllerConfigBase, -) - - -class TrendFollowerV1Config(DirectionalTradingControllerConfigBase): - strategy_name: str = "trend_follower_v1" - sma_fast: int = Field(default=20, ge=10, le=150) - sma_slow: int = Field(default=100, ge=50, le=400) - bb_length: int = Field(default=100, ge=100, le=200) - bb_std: float = Field(default=2.0, ge=2.0, le=3.0) - bb_threshold: float = Field(default=0.2, ge=0.1, le=0.5) - - -class TrendFollowerV1(DirectionalTradingControllerBase): - - def __init__(self, config: TrendFollowerV1Config): - super().__init__(config) - self.config = config - - def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - # If an executor has an active position, should we close it based on a condition. This feature is not available - # for the backtesting yet - return False - - def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool: - # After finishing an order, the executor will be in cooldown for a certain amount of time. - # This prevents the executor from creating a new order immediately after finishing one and execute a lot - # of orders in a short period of time from the same side. - if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time(): - return True - return False - - def get_processed_data(self) -> pd.DataFrame: - df = self.candles[0].candles_df - df.ta.sma(length=self.config.sma_fast, append=True) - df.ta.sma(length=self.config.sma_slow, append=True) - df.ta.bbands(length=self.config.bb_length, std=2.0, append=True) - - - # Generate long and short conditions - bbp = df[f"BBP_{self.config.bb_length}_2.0"] - inside_bounds_condition = (bbp < 0.5 + self.config.bb_threshold) & (bbp > 0.5 - self.config.bb_threshold) - - long_cond = (df[f'SMA_{self.config.sma_fast}'] > df[f'SMA_{self.config.sma_slow}']) - short_cond = (df[f'SMA_{self.config.sma_fast}'] < df[f'SMA_{self.config.sma_slow}']) - - # Choose side - df['signal'] = 0 - df.loc[long_cond & inside_bounds_condition, 'signal'] = 1 - df.loc[short_cond & inside_bounds_condition, 'signal'] = -1 - return df - - def extra_columns_to_show(self): - return [f"BBP_{self.config.bb_length}_{self.config.bb_std}", - f"SMA_{self.config.sma_fast}", - f"SMA_{self.config.sma_slow}"] diff --git a/quants_lab/labeling/__init__.py b/quants_lab/labeling/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/quants_lab/labeling/triple_barrier_method.py b/quants_lab/labeling/triple_barrier_method.py deleted file mode 100644 index 0650d56..0000000 --- a/quants_lab/labeling/triple_barrier_method.py +++ /dev/null @@ -1,70 +0,0 @@ -from typing import Optional - -import numpy as np -import pandas as pd - - -def triple_barrier_method(df, tp=1.0, sl=1.0, tl=5, std_span: Optional[int] = 100, trade_cost=0.0006, max_executors: int = 1): - df.index = pd.to_datetime(df.timestamp, unit="ms") - if std_span: - df["target"] = df["close"].rolling(std_span).std() / df["close"] - else: - df["target"] = 1 / 100 - df["tl"] = df.index + pd.Timedelta(seconds=tl) - df.dropna(subset="target", inplace=True) - - df = apply_tp_sl_on_tl(df, tp=tp, sl=sl) - - df = get_bins(df, trade_cost) - - df['tp'] = df['close'] * (1 + df['target'] * tp * df["side"]) - df['sl'] = df['close'] * (1 - df['target'] * sl * df["side"]) - - df = add_active_signals(df, max_executors) - return df - - -def add_active_signals(df, max_executors): - close_times = [pd.Timestamp.min] * max_executors - df["active_signal"] = 0 - for index, row in df[(df["side"] != 0)].iterrows(): - for close_time in close_times: - if row["timestamp"] > close_time: - df.loc[df.index == index, "active_signal"] = 1 - close_times.remove(close_time) - close_times.append(row["close_time"]) - break - return df - - -def get_bins(df, trade_cost): - # 1) prices aligned with events - px = df.index.union(df['tl'].values).drop_duplicates() - px = df.close.reindex(px, method='ffill') - - # 2) create out object - df['ret'] = (px.loc[df['close_time'].values].values / px.loc[df.index] - 1) * df['side'] - df['real_class'] = np.sign(df['ret'] - trade_cost) - return df - - -def apply_tp_sl_on_tl(df: pd.DataFrame, tp: float, sl: float): - events = df[df["side"] != 0].copy() - if tp > 0: - take_profit = tp * events['target'] - else: - take_profit = pd.Series(index=df.index) # NaNs - if sl > 0: - stop_loss = - sl * events['target'] - else: - stop_loss = pd.Series(index=df.index) # NaNs - - for loc, tl in events['tl'].fillna(df.index[-1]).items(): - df0 = df.close[loc:tl] # path prices - df0 = (df0 / df.close[loc] - 1) * events.at[loc, 'side'] # path returns - df.loc[loc, 'stop_loss_time'] = df0[df0 < stop_loss[loc]].index.min() # earliest stop loss. - df.loc[loc, 'take_profit_time'] = df0[df0 > take_profit[loc]].index.min() # earliest profit taking. - df["close_time"] = df[["tl", "take_profit_time", "stop_loss_time"]].dropna(how='all').min(axis=1) - df['close_type'] = df[['take_profit_time', 'stop_loss_time', 'tl']].dropna(how='all').idxmin(axis=1) - df['close_type'].replace({'take_profit_time': 'tp', 'stop_loss_time': 'sl'}, inplace=True) - return df diff --git a/quants_lab/strategy/__init__.py b/quants_lab/strategy/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/quants_lab/strategy/strategy_analysis.py b/quants_lab/strategy/strategy_analysis.py deleted file mode 100644 index bd4ca0b..0000000 --- a/quants_lab/strategy/strategy_analysis.py +++ /dev/null @@ -1,234 +0,0 @@ -from typing import Optional - -import pandas as pd -from plotly.subplots import make_subplots -import pandas_ta as ta # noqa: F401 -import plotly.graph_objs as go -import numpy as np - - -class StrategyAnalysis: - def __init__(self, positions: pd.DataFrame, candles_df: Optional[pd.DataFrame] = None): - self.candles_df = candles_df - self.positions = positions - self.candles_df["timestamp"] = pd.to_datetime(self.candles_df["timestamp"], unit="ms") - self.positions["timestamp"] = pd.to_datetime(self.positions["timestamp"], unit="ms") - self.positions["close_time"] = pd.to_datetime(self.positions["close_time"], unit="ms") - self.base_figure = None - - def create_base_figure(self, candlestick=True, volume=True, positions=False, trade_pnl=False, extra_rows=0): - rows, heights = self.get_n_rows_and_heights(extra_rows + trade_pnl, volume) - self.rows = rows - specs = [[{"secondary_y": True}]] * rows - self.base_figure = make_subplots(rows=rows, cols=1, shared_xaxes=True, vertical_spacing=0.01, - row_heights=heights, specs=specs) - if candlestick: - self.add_candles_graph(row=1, col=1) - if volume: - self.add_volume() - if positions: - self.add_positions() - if trade_pnl: - self.add_trade_pnl(row=rows) - self.update_layout(volume) - - def add_positions(self): - # Add long and short positions - active_signals = self.positions.copy() - active_signals.loc[active_signals["signal"] == -1, "symbol"] = "triangle-down" - active_signals.loc[active_signals["signal"] == 1, "symbol"] = "triangle-up" - active_signals.loc[active_signals["profitable"] == 1, "color"] = "lightgreen" - active_signals.loc[active_signals["profitable"] == -1, "color"] = "red" - self.base_figure.add_trace(go.Scatter(x=active_signals.loc[(active_signals["side"] != 0), "timestamp"], - y=active_signals.loc[active_signals["side"] != 0, "close"], - name="Entry Price: $", - mode="markers", - marker_color=active_signals.loc[(active_signals["side"] != 0), "color"], - marker_symbol=active_signals.loc[(active_signals["side"] != 0), "symbol"], - marker_size=20, - marker_line={"color": "black", "width": 0.7}), - row=1, col=1) - - for index, row in active_signals.iterrows(): - self.base_figure.add_shape(type="rect", - fillcolor="green", - opacity=0.5, - x0=row.timestamp, - y0=row.close, - x1=row.close_time, - y1=row.take_profit_price, - line=dict(color="green"), - row=1, col=1) - # Add SL - self.base_figure.add_shape(type="rect", - fillcolor="red", - opacity=0.5, - x0=row.timestamp, - y0=row.close, - x1=row.close_time, - y1=row.stop_loss_price, - line=dict(color="red"), - row=1, col=1) - - def get_n_rows_and_heights(self, extra_rows, volume=True): - rows = 1 + extra_rows + volume - row_heights = [0.5] * (extra_rows) - if volume: - row_heights.insert(0, 0.2) - row_heights.insert(0, 0.8) - return rows, row_heights - - def figure(self): - return self.base_figure - - def add_candles_graph(self, row, col, name_suffix='', timeframe_suffix=''): - self.base_figure.add_trace( - go.Candlestick( - x=self.candles_df[f"timestamp{timeframe_suffix}"], - open=self.candles_df[f"open{timeframe_suffix}"], - high=self.candles_df[f"high{timeframe_suffix}"], - low=self.candles_df[f"low{timeframe_suffix}"], - close=self.candles_df[f"close{timeframe_suffix}"], - name=f"OHLC_{name_suffix}" - ), - row=row, col=col, - ) - - def add_volume(self): - self.base_figure.add_trace( - go.Bar( - x=self.candles_df["timestamp"], - y=self.candles_df["volume"], - name="Volume", - opacity=0.5, - marker=dict(color="lightgreen") - ), - row=2, col=1, - ) - - def add_trade_pnl(self, row=2): - self.base_figure.add_trace( - go.Scatter( - x=self.positions["timestamp"], - y=self.positions["net_pnl_quote"].cumsum(), - name="Cumulative Trade PnL", - mode="lines", - line=dict(color="chocolate", width=2)), - row=row, col=1 - ) - self.base_figure.update_yaxes(title_text="Cum Trade PnL", row=row, col=1) - - def update_layout(self, volume=True): - self.base_figure.update_layout( - title={ - "text": "Backtesting Analysis", - "y": 0.95, - "x": 0.5, - "xanchor": "center", - "yanchor": "top" - }, - legend=dict( - orientation="h", - yanchor="bottom", - y=-0.2, - xanchor="right", - x=1 - ), - height=1500, - xaxis_rangeslider_visible=False, - hovermode="x unified" - ) - self.base_figure.update_yaxes(title_text="Price", row=1, col=1) - if volume: - self.base_figure.update_yaxes(title_text="Volume", row=2, col=1) - self.base_figure.update_xaxes(title_text="Time", row=self.rows, col=1) - - def initial_portfolio(self): - return self.positions["inventory"].dropna().values[0] - - def final_portfolio(self): - return self.positions["inventory"].dropna().values[-1] - - def net_profit_usd(self): - return self.final_portfolio() - self.initial_portfolio() - - def net_profit_pct(self): - return self.net_profit_usd() / self.initial_portfolio() - - def returns(self): - return self.positions["net_pnl_quote"] / self.initial_portfolio() - - def total_positions(self): - return self.positions.shape[0] - 1 - - def win_signals(self): - return self.positions.loc[(self.positions["profitable"] > 0) & (self.positions["side"] != 0)] - - def loss_signals(self): - return self.positions.loc[(self.positions["profitable"] < 0) & (self.positions["side"] != 0)] - - def accuracy(self): - return self.win_signals().shape[0] / self.total_positions() - - def max_drawdown_usd(self): - cumulative_returns = self.positions["net_pnl_quote"].cumsum() - peak = np.maximum.accumulate(cumulative_returns) - drawdown = (cumulative_returns - peak) - max_draw_down = np.min(drawdown) - return max_draw_down - - def max_drawdown_pct(self): - return self.max_drawdown_usd() / self.initial_portfolio() - - def sharpe_ratio(self): - returns = self.returns() - return returns.mean() / returns.std() - - def profit_factor(self): - total_won = self.win_signals().loc[:, "net_pnl_quote"].sum() - total_loss = - self.loss_signals().loc[:, "net_pnl_quote"].sum() - return total_won / total_loss - - def duration_in_minutes(self): - return (self.positions["timestamp"].iloc[-1] - self.positions["timestamp"].iloc[0]).total_seconds() / 60 - - def avg_trading_time_in_minutes(self): - time_diff_minutes = (self.positions["close_time"] - self.positions["timestamp"]).dt.total_seconds() / 60 - return time_diff_minutes.mean() - - def start_date(self): - return pd.to_datetime(self.candles_df.timestamp.min(), unit="ms") - - def end_date(self): - return pd.to_datetime(self.candles_df.timestamp.max(), unit="ms") - - def avg_profit(self): - return self.positions.net_pnl_quote.mean() - - def text_report(self): - return f""" -Strategy Performance Report: - - Net Profit: {self.net_profit_usd():,.2f} USD ({self.net_profit_pct() * 100:,.2f}%) - - Total Positions: {self.total_positions()} - - Win Signals: {self.win_signals().shape[0]} - - Loss Signals: {self.loss_signals().shape[0]} - - Accuracy: {self.accuracy():,.2f}% - - Profit Factor: {self.profit_factor():,.2f} - - Max Drawdown: {self.max_drawdown_usd():,.2f} USD | {self.max_drawdown_pct() * 100:,.2f}% - - Sharpe Ratio: {self.sharpe_ratio():,.2f} - - Duration: {self.duration_in_minutes() / 60:,.2f} Hours - - Average Trade Duration: {self.avg_trading_time_in_minutes():,.2f} minutes - """ - - def pnl_over_time(self): - fig = go.Figure() - fig.add_trace(go.Scatter(name="PnL Over Time", - x=self.positions.index, - y=self.positions.net_pnl_quote.cumsum())) - # Update layout with the required attributes - fig.update_layout( - title="PnL Over Time", - xaxis_title="N° Position", - yaxis=dict(title="Net PnL USD", side="left", showgrid=False), - ) - return fig