import pandas as pd
import numpy as np
from datetime import datetime
import time
class MACDHistogramBot:
    """Backtesting bot that trades MACD-histogram zero crossings.

    A long position is opened when the histogram crosses from non-positive
    to positive, a short position when it crosses from non-negative to
    negative. At most one position is open at a time; an opposite signal
    closes the open position and opens a new one in the signal's direction.
    Positions are also closed when the bar's close breaches the stop-loss
    or take-profit level.

    Accounting model: opening a position (long or short) debits
    ``size * entry_price`` from ``capital``; closing it credits that
    cost basis back plus the realised PnL.
    """

    def __init__(self, initial_capital=10000, fast=12, slow=26, signal=9,
                 stop_loss_pct=0.02, take_profit_pct=0.04):
        """Store the strategy parameters and initialise the account state.

        Args:
            initial_capital: Starting cash balance.
            fast: Span of the fast EMA.
            slow: Span of the slow EMA.
            signal: Span of the EMA applied to the MACD line.
            stop_loss_pct: Stop-loss distance as a fraction of entry price.
            take_profit_pct: Take-profit distance as a fraction of entry price.
        """
        self.initial_capital = initial_capital
        self.capital = initial_capital
        self.fast = fast
        self.slow = slow
        self.signal = signal
        self.stop_loss_pct = stop_loss_pct
        self.take_profit_pct = take_profit_pct
        self.positions = []
        self.trades = []
        self.equity_curve = [initial_capital]

    def _reset_state(self):
        """Restore cash, open positions, trade log and equity curve to their initial values."""
        self.capital = self.initial_capital
        self.positions = []
        self.trades = []
        self.equity_curve = [self.initial_capital]

    @staticmethod
    def _position_pnl(position, price):
        """Return the profit or loss of ``position`` if it were valued at ``price``."""
        if position['direction'] > 0:
            return (price - position['entry_price']) * position['size']
        return (position['entry_price'] - price) * position['size']

    def calculate_macd(self, data):
        """Compute the MACD line, signal line and histogram.

        Args:
            data: DataFrame with a ``'Close'`` column.

        Returns:
            Tuple ``(macd_line, signal_line, histogram)`` of Series aligned
            with ``data``.
        """
        close = data['Close']
        ema_fast = close.ewm(span=self.fast, adjust=False).mean()
        ema_slow = close.ewm(span=self.slow, adjust=False).mean()
        macd_line = ema_fast - ema_slow
        signal_line = macd_line.ewm(span=self.signal, adjust=False).mean()
        histogram = macd_line - signal_line
        return macd_line, signal_line, histogram

    def generate_signals(self, data):
        """Add indicator and signal columns to ``data`` and return it.

        The columns ``'MACD'``, ``'Signal_Line'``, ``'Histogram'`` and
        ``'Signal'`` are written into ``data`` itself (the frame is modified
        in place). ``'Signal'`` is 1 on a buy bar, -1 on a sell bar and 0
        otherwise.

        Args:
            data: DataFrame with a ``'Close'`` column.

        Returns:
            The same DataFrame object, with the added columns.
        """
        macd, signal_line, histogram = self.calculate_macd(data)
        data['MACD'] = macd
        data['Signal_Line'] = signal_line
        data['Histogram'] = histogram
        data['Signal'] = 0

        previous = histogram.shift(1)
        # histogram is macd - signal_line, so its sign already tells which
        # line is on top; no separate macd/signal_line comparison is needed.
        buy_condition = (histogram > 0) & (previous <= 0)
        sell_condition = (histogram < 0) & (previous >= 0)

        data.loc[buy_condition, 'Signal'] = 1
        data.loc[sell_condition, 'Signal'] = -1
        return data

    def calculate_position_size(self, price, risk_pct=0.02):
        """Size a position so that a stop-out loses about ``risk_pct`` of capital.

        The result is capped at 95% of what the current capital can buy.

        Args:
            price: Entry price.
            risk_pct: Fraction of current capital to risk on the trade.

        Returns:
            Number of units to trade; 0.0 when ``price`` is not positive.
        """
        if price <= 0:
            return 0.0
        affordable = self.capital / price * 0.95
        stop_loss_distance = price * self.stop_loss_pct
        if stop_loss_distance <= 0:
            # Without a stop distance risk-based sizing is undefined;
            # only the capital cap applies.
            return affordable
        risk_amount = self.capital * risk_pct
        return min(risk_amount / stop_loss_distance, affordable)

    def execute_trade(self, signal, price, timestamp):
        """Act on a trading signal at the given price.

        Positions opposite to ``signal`` are closed first. A new position in
        the direction of ``signal`` is then opened if none remains open.

        Args:
            signal: Positive for buy, negative for sell, 0 for no action.
            price: Execution price.
            timestamp: Label recorded as the entry/exit time.
        """
        if signal == 0:
            return

        # Iterate over a copy: close_position removes from self.positions.
        for position in self.positions[:]:
            if (position['direction'] > 0) != (signal > 0):
                self.close_position(position, price, timestamp)

        if self.positions:
            return  # already positioned in the signal's direction

        position_size = self.calculate_position_size(price)
        if not position_size > 0:
            return

        if signal > 0:
            stop_loss = price * (1 - self.stop_loss_pct)
            take_profit = price * (1 + self.take_profit_pct)
        else:
            stop_loss = price * (1 + self.stop_loss_pct)
            take_profit = price * (1 - self.take_profit_pct)

        self.positions.append({
            'entry_price': price,
            'size': position_size,
            'direction': signal,
            'entry_time': timestamp,
            'stop_loss': stop_loss,
            'take_profit': take_profit,
        })
        self.capital -= position_size * price

    def close_position(self, position, exit_price, exit_time, reason='Signal'):
        """Close ``position``, record the trade and settle the cash balance.

        Args:
            position: Position dict taken from ``self.positions``.
            exit_price: Price at which the position is closed.
            exit_time: Label recorded as the exit time.
            reason: Text stored in the trade's ``'exit_reason'`` field.
        """
        pnl = self._position_pnl(position, exit_price)
        cost_basis = position['entry_price'] * position['size']
        pnl_pct = pnl / cost_basis * 100 if cost_basis else 0.0

        self.trades.append({
            'entry_time': position['entry_time'],
            'exit_time': exit_time,
            'entry_price': position['entry_price'],
            'exit_price': exit_price,
            'direction': 'Long' if position['direction'] > 0 else 'Short',
            'size': position['size'],
            'pnl': pnl,
            'pnl_pct': pnl_pct,
            'exit_reason': reason,
        })
        # Refund the amount debited at entry plus the realised PnL.
        # Crediting size * exit_price + pnl would count a long's PnL twice
        # and cancel a short's PnL entirely.
        self.capital += cost_basis + pnl
        self.positions.remove(position)

    def check_stop_loss_take_profit(self, current_price, timestamp):
        """Close every open position whose stop-loss or take-profit is breached.

        The position is closed at ``current_price`` (not at the stop or
        target level). The stop-loss is tested before the take-profit.

        Args:
            current_price: Price to test against the position levels.
            timestamp: Label recorded as the exit time.
        """
        # Iterate over a copy: close_position removes from self.positions.
        for position in self.positions[:]:
            if position['direction'] > 0:
                stopped = current_price <= position['stop_loss']
                target_hit = current_price >= position['take_profit']
            else:
                stopped = current_price >= position['stop_loss']
                target_hit = current_price <= position['take_profit']

            if stopped:
                self.close_position(position, current_price, timestamp, 'Stop Loss')
            elif target_hit:
                self.close_position(position, current_price, timestamp, 'Take Profit')

    def run_backtest(self, data):
        """Run the strategy over ``data`` and return trades and equity.

        The account state (capital, positions, trades, equity curve) is reset
        at the start, so each call is an independent run. ``data`` itself is
        not modified; signals are computed on a copy. Positions still open
        after the last bar are closed at the final close with reason
        ``'End of Data'``.

        Args:
            data: DataFrame with a ``'Close'`` column.

        Returns:
            Tuple ``(trades, equity)``: a DataFrame with one row per closed
            trade, and a Series of mark-to-market equity with one value per
            bar, indexed like ``data``.
        """
        self._reset_state()
        if len(data) == 0:
            return pd.DataFrame(self.trades), pd.Series([], index=data.index, dtype=float)

        data = self.generate_signals(data.copy())
        closes = data['Close'].to_numpy()
        signals = data['Signal'].to_numpy()

        for timestamp, current_price, signal in zip(data.index, closes, signals):
            self.check_stop_loss_take_profit(current_price, timestamp)
            self.execute_trade(signal, current_price, timestamp)

            # Equity = cash + cost basis of open positions + unrealised PnL.
            open_value = sum(
                position['size'] * position['entry_price']
                + self._position_pnl(position, current_price)
                for position in self.positions
            )
            self.equity_curve.append(self.capital + open_value)

        final_price = closes[-1]
        final_time = data.index[-1]
        for position in self.positions[:]:
            self.close_position(position, final_price, final_time, 'End of Data')

        # self.equity_curve[0] is the starting capital before the first bar;
        # drop it so the series has exactly one value per bar.
        equity = pd.Series(self.equity_curve[1:], index=data.index, dtype=float)
        return pd.DataFrame(self.trades), equity

    def get_performance_metrics(self, trades_df, equity_curve):
        """Summarise a backtest as a dict of rounded performance figures.

        Args:
            trades_df: DataFrame of closed trades with a ``'pnl'`` column.
            equity_curve: Series of equity values.

        Returns:
            Dict of metrics, or an empty dict when there are no trades.
            ``'Profit Factor'`` is 0 when there are no losing trades.
        """
        trade_count = len(trades_df)
        if trade_count == 0:
            return {}

        total_return = (equity_curve.iloc[-1] / self.initial_capital - 1) * 100

        winning_trades = trades_df[trades_df['pnl'] > 0]
        losing_trades = trades_df[trades_df['pnl'] < 0]
        win_rate = len(winning_trades) / trade_count * 100

        avg_win = winning_trades['pnl'].mean() if len(winning_trades) > 0 else 0
        avg_loss = abs(losing_trades['pnl'].mean()) if len(losing_trades) > 0 else 0

        gross_profit = winning_trades['pnl'].sum()
        gross_loss = abs(losing_trades['pnl'].sum())
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else 0

        returns = equity_curve.pct_change().dropna()
        volatility = returns.std()
        # sqrt(252) annualises the ratio assuming one bar per trading day.
        # NOTE(review): confirm the bar frequency of the data fed in.
        # A NaN volatility (fewer than two returns) compares False and gives 0.
        sharpe_ratio = np.sqrt(252) * returns.mean() / volatility if volatility > 0 else 0

        peak = equity_curve.expanding().max()
        drawdown = (equity_curve - peak) / peak
        max_drawdown = drawdown.min() * 100

        return {
            'Total Return (%)': round(total_return, 2),
            'Win Rate (%)': round(win_rate, 2),
            'Profit Factor': round(profit_factor, 2),
            'Average Win': round(avg_win, 2),
            'Average Loss': round(avg_loss, 2),
            'Sharpe Ratio': round(sharpe_ratio, 2),
            'Max Drawdown (%)': round(max_drawdown, 2),
            'Total Trades': trade_count,
        }