Files
hummingbot-dashboard/utils/database_manager.py

208 lines
9.6 KiB
Python

import logging
import os

import pandas as pd
import streamlit as st
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from utils.data_manipulation import StrategyData


class DatabaseManager:
    """Load trading tables from a SQLite database into pandas DataFrames.

    The database file is looked up inside the local ``data`` directory.
    Executor CSV exports are read from ``executors_path``.
    """

    # File name skipped when collecting position-executor CSV files.
    _EXCLUDED_EXECUTOR_FILE = "trades_market_making_.csv"

    def __init__(self, db_name: str, executors_path: str = "data"):
        """Create the engine and session factory for ``db_name``.

        Args:
            db_name: File name of the SQLite database inside ``data``.
            executors_path: Directory holding position-executor CSV files.
        """
        self.db_name = db_name
        # TODO: Create db path for all types of db
        self.db_path = f'sqlite:///{os.path.join("data", db_name)}'
        self.executors_path = executors_path
        self.engine = create_engine(self.db_path, connect_args={'check_same_thread': False})
        self.session_maker = sessionmaker(bind=self.engine)

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _sql_literal(value) -> str:
        """Render ``value`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so the value cannot terminate the
        literal early (e.g. a config path such as ``o'brien.yml``).
        """
        escaped = f"{value}".replace("'", "''")
        return f"'{escaped}'"

    @staticmethod
    def _build_select(table: str, conditions) -> str:
        """Return ``SELECT * FROM table`` plus an AND-joined WHERE clause."""
        query = f"SELECT * FROM {table}"
        if conditions:
            query += f" WHERE {' AND '.join(conditions)}"
        return query

    @staticmethod
    def _created_at_conditions(start_date=None, end_date=None):
        """Return the ``created_at`` range conditions for the given bounds."""
        conditions = []
        if start_date:
            conditions.append(f"created_at >= {DatabaseManager._sql_literal(start_date)}")
        if end_date:
            conditions.append(f"created_at <= {DatabaseManager._sql_literal(end_date)}")
        return conditions

    @staticmethod
    def _clean_market_name(market: str) -> str:
        """Lower-case a market name and drop the ``_papertrade`` suffix."""
        return market.lower().replace("_papertrade", "")

    def _read_sql(self, query: str) -> pd.DataFrame:
        """Run ``query`` in a short-lived session and return the result."""
        with self.session_maker() as session:
            return pd.read_sql_query(query, session.connection())

    # ------------------------------------------------------------------ #
    # Database overview
    # ------------------------------------------------------------------ #
    @property
    def status(self):
        """Return ``"OK"`` when TradeFill has rows, else a description of the problem."""
        try:
            config_files = self.get_config_files()
        except Exception as e:
            return f"Error: {str(e)}"
        if len(config_files) > 0:
            # TODO: improve error handling, think what to do with other cases
            return "OK"
        return "No records found in the TradeFill table with non-null config_file_path"

    @property
    def config_files(self):
        """Distinct ``config_file_path`` values found in TradeFill."""
        return self.get_config_files()

    @property
    def configs(self):
        """Map every config file to its ``{market: [symbols]}`` dictionary."""
        return {
            config_file: self.get_exchanges_trading_pairs_by_config_file(config_file)
            for config_file in self.config_files
        }

    def get_config_files(self):
        """Return the distinct ``config_file_path`` values of TradeFill as a list."""
        config_files = self._read_sql('SELECT DISTINCT config_file_path FROM TradeFill')
        return config_files['config_file_path'].tolist()

    def get_exchanges_trading_pairs_by_config_file(self, config_file_path):
        """Return ``{market: [symbols]}`` traded under ``config_file_path``."""
        query = (
            "SELECT DISTINCT market, symbol FROM TradeFill "
            f"WHERE config_file_path = {self._sql_literal(config_file_path)}"
        )
        pairs = self._read_sql(query)
        pairs["market"] = pairs["market"].apply(self._clean_market_name)
        return pairs.groupby("market")["symbol"].apply(list).to_dict()

    # ------------------------------------------------------------------ #
    # Query builders
    # ------------------------------------------------------------------ #
    @staticmethod
    def _get_orders_query(config_file_path=None, start_date=None, end_date=None):
        """Build the SELECT statement for the Order table."""
        conditions = []
        if config_file_path:
            conditions.append(f"config_file_path = {DatabaseManager._sql_literal(config_file_path)}")
        conditions.extend(DatabaseManager._created_at_conditions(start_date, end_date))
        # ORDER is an SQL keyword, hence the quoted table name.
        return DatabaseManager._build_select("'Order'", conditions)

    @staticmethod
    def _get_order_status_query(order_ids=None, start_date=None, end_date=None):
        """Build the SELECT statement for the OrderStatus table."""
        conditions = []
        # An empty/None id collection adds no filter, so every row is selected.
        if order_ids:
            order_ids_string = ",".join(DatabaseManager._sql_literal(order_id) for order_id in order_ids)
            conditions.append(f"order_id IN ({order_ids_string})")
        conditions.extend(DatabaseManager._created_at_conditions(start_date, end_date))
        return DatabaseManager._build_select("OrderStatus", conditions)

    @staticmethod
    def _get_trade_fills_query(config_file_path=None, start_date=None, end_date=None):
        """Build the SELECT statement for the TradeFill table."""
        conditions = []
        if config_file_path:
            conditions.append(f"config_file_path = {DatabaseManager._sql_literal(config_file_path)}")
        conditions.extend(DatabaseManager._created_at_conditions(start_date, end_date))
        return DatabaseManager._build_select("TradeFill", conditions)

    @staticmethod
    def _get_market_data_query(start_date=None, end_date=None):
        """Build the SELECT statement for the MarketData table.

        The bounds are multiplied by 1e6 before comparison with the
        ``timestamp`` column, so they must be numeric.
        """
        conditions = []
        if start_date:
            conditions.append(f"timestamp >= '{start_date * 1e6}'")
        if end_date:
            conditions.append(f"timestamp <= '{end_date * 1e6}'")
        return DatabaseManager._build_select("MarketData", conditions)

    # ------------------------------------------------------------------ #
    # Table loaders
    # ------------------------------------------------------------------ #
    def get_orders(self, config_file_path=None, start_date=None, end_date=None):
        """Load the Order table with scaled amounts/prices and parsed timestamps."""
        orders = self._read_sql(self._get_orders_query(config_file_path, start_date, end_date))
        orders["market"] = orders["market"].apply(self._clean_market_name)
        # Stored values are divided by 1e6 (presumably fixed-point integers).
        for column in ("amount", "price"):
            orders[column] = orders[column] / 1e6
        for column in ("creation_timestamp", "last_update_timestamp"):
            orders[column] = pd.to_datetime(orders[column], unit="ms")
        return orders

    def get_trade_fills(self, config_file_path=None, start_date=None, end_date=None):
        """Load TradeFill rows and append cumulative fee/position/PnL columns.

        Rows are sorted by ``timestamp``; all cumulative columns are computed
        per (config_file_path, market, symbol) group.
        """
        groupers = ["config_file_path", "market", "symbol"]
        trade_fills = self._read_sql(self._get_trade_fills_query(config_file_path, start_date, end_date))
        trade_fills = trade_fills.sort_values(by="timestamp", ascending=True)
        for column in ("amount", "price", "trade_fee_in_quote"):
            trade_fills[column] = trade_fills[column] / 1e6
        trade_fills["cum_fees_in_quote"] = trade_fills.groupby(groupers)["trade_fee_in_quote"].cumsum()
        # Buys add to the position, every other trade type subtracts from it.
        side = trade_fills['trade_type'].apply(lambda x: 1 if x == 'BUY' else -1)
        trade_fills["net_amount"] = trade_fills['amount'] * side
        trade_fills["net_amount_quote"] = trade_fills['net_amount'] * trade_fills['price']
        trade_fills["cum_net_amount"] = trade_fills.groupby(groupers)["net_amount"].cumsum()
        trade_fills["unrealized_trade_pnl"] = -1 * trade_fills.groupby(groupers)["net_amount_quote"].cumsum()
        # Open position valued at the price of the current fill.
        trade_fills["inventory_cost"] = trade_fills["cum_net_amount"] * trade_fills["price"]
        trade_fills["realized_trade_pnl"] = trade_fills["unrealized_trade_pnl"] + trade_fills["inventory_cost"]
        trade_fills["net_realized_pnl"] = trade_fills["realized_trade_pnl"] - trade_fills["cum_fees_in_quote"]
        trade_fills["timestamp"] = pd.to_datetime(trade_fills["timestamp"], unit="ms")
        trade_fills["market"] = trade_fills["market"].apply(self._clean_market_name)
        return trade_fills

    def get_order_status(self, order_ids=None, start_date=None, end_date=None):
        """Load OrderStatus rows, optionally restricted to ``order_ids``."""
        return self._read_sql(self._get_order_status_query(order_ids, start_date, end_date))

    def get_market_data(self, start_date=None, end_date=None):
        """Load MarketData indexed by timestamp with scaled price columns."""
        market_data = self._read_sql(self._get_market_data_query(start_date, end_date))
        market_data["timestamp"] = pd.to_datetime(market_data["timestamp"] / 1e6, unit="ms")
        market_data = market_data.set_index("timestamp")
        for column in ("mid_price", "best_bid", "best_ask"):
            market_data[column] = market_data[column] / 1e6
        return market_data

    def get_position_executor_data(self, start_date=None, end_date=None) -> pd.DataFrame:
        """Concatenate the executor CSV files found in ``executors_path``.

        Every file whose name contains ``.csv`` is read, except
        ``trades_market_making_.csv``. A ``datetime`` column is derived from
        ``timestamp`` (seconds) and used for the optional date filtering.

        Raises:
            KeyError: If no CSV file was found, because the resulting frame
                has no ``timestamp`` column.
        """
        # Compare names with != : an identity test against a string literal
        # never excludes the file.
        files = [
            file for file in os.listdir(self.executors_path)
            if ".csv" in file and file != self._EXCLUDED_EXECUTOR_FILE
        ]
        frames = [pd.read_csv(os.path.join(self.executors_path, file)) for file in files]
        # Concatenate once instead of re-copying the accumulated frame per file.
        df = pd.concat(frames) if frames else pd.DataFrame()
        df["datetime"] = pd.to_datetime(df["timestamp"], unit="s")
        if start_date:
            df = df[df["datetime"] >= start_date]
        if end_date:
            df = df[df["datetime"] <= end_date]
        return df

    # ------------------------------------------------------------------ #
    # Aggregation
    # ------------------------------------------------------------------ #
    @staticmethod
    def _safe_table_loading(func, *args, **kwargs):
        """Call ``func`` and return its result, or ``None`` if it raises.

        Loading is best effort: a failure is logged as a warning instead of
        being propagated, so one missing table does not block the others.
        """
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            logging.getLogger(__name__).warning(
                "Could not load table via %s: %s", getattr(func, "__name__", repr(func)), exc
            )
            return None

    def get_strategy_data(self, config_file_path=None, start_date=None, end_date=None):
        """Load every table (best effort) and bundle them into a StrategyData.

        Tables that fail to load are passed to StrategyData as ``None``.
        """
        load = self._safe_table_loading

        orders = load(self.get_orders, config_file_path, start_date, end_date)
        trade_fills = load(self.get_trade_fills, config_file_path, start_date, end_date)

        def load_order_status():
            # Depends on the orders loaded above; fails (and yields None)
            # when the orders table itself could not be loaded.
            return self.get_order_status(orders['id'].tolist(), start_date, end_date)

        order_status = load(load_order_status)
        market_data = load(self.get_market_data, start_date, end_date)
        position_executor = load(self.get_position_executor_data, start_date, end_date)
        return StrategyData(orders, order_status, trade_fills, market_data, position_executor)