TurtleTrade/TurtleOnTime.py
2025-05-25 15:27:01 +08:00

972 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import math
import akshare as ak
import os
from datetime import datetime, timedelta, date
import pandas as pd
import mplfinance as mpf
import mysql_database
from EmailTest import send_email, parse_return_email
from dataclasses import dataclass
import time
import threading
import yaml # 添加YAML支持
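'''
TODO
1. Restructure the run loop so that several turtles can be monitored at the same time.
2. Persist the running state to a YAML file and restore it from that file on start-up.
'''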
@dataclass
class BuyState:
    """State recorded for one unit (initial buy or add-on) of an open position.

    Instances are built positionally elsewhere in this file, so the field
    order must not change.
    """
    trigger_time: int       # ordinal of this entry within the position (1 = first buy)
    buy_price: float        # actual fill price
    add_price: float        # price above which the next unit is added
    stop_price: float       # stop-loss price for this unit
    is_gap_up: bool         # whether the session opened with a gap up
    shares: int             # number of shares bought
    atr: float              # ATR at the time of entry (a fractional value, not an int)
    available_cash: float   # cash left after this purchase
@dataclass
class TradeLog:
    """One row of the trade journal, appended after each confirmed buy, add-on or sale.

    The first seven fields are passed positionally by the callers in this
    file and the last four partly by keyword, so names and order are fixed.
    """
    data: str               # trade date, formatted "%Y-%m-%d"
    type: str               # action label, e.g. "买入", "加仓", "止损", "止盈"
    buy_price: float        # executed price (callers also store the sale price here)
    shares: int             # shares traded in this action
    cost: float             # cash amount of this action
    atr: float              # ATR when the action happened (fractional, not an int)
    available_cash: float   # cash left after the action
    all_shares: int         # running total of shares held
    all_cost: float         # running total cost of the position
    Net_value: float        # market value recorded by the caller
    Net_return: float       # profit/loss figure recorded by the caller
@dataclass
class BreakOutLog:
    """Record of a single channel breakout and how it was later judged."""
    data: str              # date of the breakout
    breakout_price: float  # price level that was broken
    lose_price: float      # level below which the breakout is treated as failed
    valid_or_not: str      # validity marker (this file writes 'valid')
    win_or_lose: bool      # outcome flag; callers in this file also store None here
def calc_sma_atr_pd(kdf, period):
    """Return a copy of ``kdf`` with true range (TR) and simple-average ATR columns.

    The input frame is left untouched; the previous implementation cast its
    price columns in place and left scratch columns behind on the caller's
    object.

    Args:
        kdf: DataFrame of historical bars with '最高' (high), '最低' (low)
            and '收盘' (close) columns convertible to float.
        period: window length of the rolling mean used for ATR.

    Returns:
        A new DataFrame holding the original columns (the three price columns
        cast to float) followed by 'TR' and 'ATR', both rounded to 3 decimals.
        The first ``period - 1`` ATR values are NaN.
    """
    out = kdf.copy()
    for column in ('最高', '最低', '收盘'):
        out[column] = out[column].astype(float)

    previous_close = out['收盘'].shift(1)
    ranges = pd.concat(
        [
            out['最高'] - out['最低'],
            (out['最高'] - previous_close).abs(),
            (out['最低'] - previous_close).abs(),
        ],
        axis=1,
    )
    # max(axis=1) skips NaN, so the first row (no previous close) falls back
    # to the plain high-low range.
    out['TR'] = ranges.max(axis=1).round(3)
    out['ATR'] = out['TR'].rolling(period).mean().round(3)
    return out
class TurtleTrading(object):
    """Turtle-trading state and indicators for a single instrument (e.g. 513300).

    Holds the basic data for one traded code and computes its ATR and
    Donchian channel lines.
    """

    def __init__(self, TradeCode, type, riskcoe, Capital, cash) -> None:
        """Create an empty (flat) turtle for one instrument.

        Args:
            TradeCode: instrument code used for data requests and table names.
            type: instrument kind; the monitor in this file checks for
                "stock" and "etf".
            riskcoe: risk coefficient multiplied by ``Capital`` when sizing.
            Capital: capital figure used for position sizing.
            cash: cash figure stored on the instance.
        """
        self.TradeCode = TradeCode
        self.type = type
        self.riskcoe = riskcoe
        self.Capital = Capital
        self.cash = cash
        self.TrigerTime = 0
        # These must be real, per-instance lists: the rest of the file appends
        # to them and indexes [-1]. Assigning ``list[BuyState]`` stored a type
        # alias instead of a list.
        self.BuyStates = []      # BuyState entries, one per open unit
        self.tradeslog = []      # TradeLog entries
        self.BreakOutLog = []    # BreakOutLog entries
        self.PriceNow = 0.0
        self.Donchian_20_up = 0.0
        self.Donchian_10_down = 0.0
        self.Donchian_50_up = 0.0
        self.is_gap_up = False   # whether the session opened with a gap up
        self.prev_heigh = 0.0    # previous day's high
def GetRecentData(self):
    """Download daily bars from two years ago until today into ``self.CurrentData``.

    The frame returned by ``ak.fund_etf_hist_em`` gets its '日期' column
    converted to datetimes and promoted to the index before being stored.
    """
    # NOTE(review): the ETF history endpoint is used regardless of self.type.
    end_date = datetime.today().strftime("%Y%m%d")
    start_date = (date.today() - timedelta(days=365 * 2)).strftime("%Y%m%d")
    history = ak.fund_etf_hist_em(
        symbol=f"{self.TradeCode}",
        period="daily",
        start_date=start_date,
        end_date=end_date,
        adjust="",
    )
    frame = pd.DataFrame(history)
    frame['日期'] = pd.to_datetime(frame['日期'])
    self.CurrentData = frame.set_index('日期')
def CalATR(self, data, ATRday):
    """Compute TR/ATR for ``data`` and keep the result on the instance.

    Args:
        data: bar data handed to ``calc_sma_atr_pd``.
        ATRday: ATR window length in days.

    Side effects:
        ``self.CurrentData`` becomes the frame returned by
        ``calc_sma_atr_pd`` and ``self.N`` its 'ATR' column.
    """
    enriched = calc_sma_atr_pd(data, ATRday)
    self.CurrentData = enriched
    self.N = enriched['ATR']
def ReadExistData(self, data):
    """Load bar data from a CSV source instead of requesting it remotely.

    Args:
        data: anything ``pd.read_csv`` accepts (typically a local CSV path).
            The parsed frame is stored in ``self.CurrentData``.
    """
    loaded = pd.read_csv(data)
    self.CurrentData = loaded
def DrawKLine(self, days):
    """Plot a candlestick chart of the last ``days`` rows with Donchian bands.

    Requires ``self.CurrentData`` to already contain the
    'Donchian_20_upper' and 'Donchian_10_lower' columns that
    ``calc_atr_donchian_short`` appends. The previous version read
    ``self.Donchian_up`` / ``self.Donchian_down``, which nothing in this
    file defines.

    Args:
        days: number of most recent rows to draw.
    """
    recent = self.CurrentData.iloc[-days:]

    # GetRecentData moves '日期' into the index, while other sources may keep
    # it as a regular column; accept both layouts.
    if '日期' in recent.columns:
        dates = pd.to_datetime(recent['日期'])
    else:
        dates = pd.to_datetime(recent.index)

    Klinedf = pd.DataFrame(
        {
            'Open': recent['开盘'].astype(float).to_numpy(),
            'High': recent['最高'].astype(float).to_numpy(),
            'Low': recent['最低'].astype(float).to_numpy(),
            'Close': recent['收盘'].astype(float).to_numpy(),
            'Volume': recent['成交量'].astype(float).to_numpy(),
        },
        index=pd.DatetimeIndex(dates, name='Date'),
    )

    overlays = [
        mpf.make_addplot(recent['Donchian_20_upper'].astype(float).to_numpy()),
        mpf.make_addplot(recent['Donchian_10_lower'].astype(float).to_numpy()),
    ]
    mpf.plot(Klinedf, type='candle', style='yahoo', volume=False, mav=(5,),
             addplot=overlays, title=f"{self.TradeCode} K线图")
def calculate_donchian_channel_up(self, n):
    """Build the n-day upper Donchian band from ``self.CurrentData['最高']``.

    Args:
        n (int): rolling window length.

    Returns:
        DataFrame with one column named ``Donchian_<n>_upper`` holding the
        rolling maximum of the high price; the first ``n - 1`` rows are NaN.
    """
    column = f"Donchian_{n}_upper"
    rolling_high = self.CurrentData['最高'].rolling(n).max()
    return pd.DataFrame({column: rolling_high})
def calculate_donchian_channel_down(self, n):
    """Build the n-day lower Donchian band from ``self.CurrentData['最低']``.

    Args:
        n (int): rolling window length.

    Returns:
        DataFrame with one column named ``Donchian_<n>_lower`` holding the
        rolling minimum of the low price; the first ``n - 1`` rows are NaN.
    """
    column = f"Donchian_{n}_lower"
    rolling_low = self.CurrentData['最低'].rolling(n).min()
    return pd.DataFrame({column: rolling_low})
def calc_atr_donchian_short(self):
    """Compute the 20-day ATR and the short-term Donchian bands.

    After the call ``self.CurrentData`` carries the ATR columns plus the
    20-day upper, 50-day upper and 10-day lower band columns, and the three
    band frames are also kept on ``self.Donchian_20_ups``,
    ``self.Donchian_50_ups`` and ``self.Donchian_downs``.
    """
    # ATR first: CalATR replaces self.CurrentData, and the bands below are
    # derived from that refreshed frame.
    self.CalATR(self.CurrentData, 20)

    self.Donchian_20_ups = self.calculate_donchian_channel_up(20)
    self.Donchian_50_ups = self.calculate_donchian_channel_up(50)
    self.Donchian_downs = self.calculate_donchian_channel_down(10)

    pieces = [
        self.CurrentData,
        self.Donchian_20_ups,
        self.Donchian_50_ups,
        self.Donchian_downs,
    ]
    self.CurrentData = pd.concat(pieces, axis=1)
def get_ready(self, days):
    """Make sure ``self.CurrentData`` is populated, refreshing the DB table if needed.

    Three cases:
      * the table for this code does not exist: download, compute the
        indicators and insert;
      * the table exists but its last date is more than two days old: drop
        it, then download, compute and insert again;
      * otherwise: read everything back from the database.

    Args:
        days: currently unused.
    """
    table = f"{self.TradeCode}"

    def rebuild_table():
        # Fetch from akshare, add ATR/Donchian columns, then persist.
        self.GetRecentData()
        self.calc_atr_donchian_short()
        mysql_database.insert_db(self.CurrentData, table, True, "日期")

    if not mysql_database.check_db_table(table):
        rebuild_table()
        return

    stale_before = date.today() - timedelta(days=2)
    last_update = mysql_database.check_db_table_last_date(table)
    if last_update < stale_before:
        mysql_database.delete_table(table)
        rebuild_table()
    else:
        self.CurrentData = mysql_database.fetch_all_data(table)
def CalPositionSize(self):
    """Derive the unit size from risk coefficient, capital and ATR.

    The raw size ``riskcoe * Capital / N`` is floored to a multiple of 100
    shares and stored in ``self.IntPositionSize``.
    """
    raw_size = self.riskcoe * self.Capital / self.N
    whole_lots = int(raw_size // 100)
    self.IntPositionSize = whole_lots * 100
def system1EnterNormal(self, PriceNow, TempDonchian20Upper, BreakOutLog):
    """Decide whether the 20-day breakout entry fires.

    Args:
        PriceNow: current price.
        TempDonchian20Upper: 20-day upper Donchian level.
        BreakOutLog: list of breakout records; only its last element is
            inspected, and only when a position is already open.

    Returns:
        True when the price is above the upper band and either no position
        is open, or the most recent breakout is marked as failed
        (``win_or_lose is None`` after re-evaluation); False otherwise.
    """
    if not PriceNow > TempDonchian20Upper:
        return False
    if self.TrigerTime == 0:
        # Flat and breaking out: enter.
        return True
    # TODO (carried over from the original): this branch is not reached by
    # the current monitor, which only calls this method while flat.
    self.system1BreakoutValid(PriceNow)
    return BreakOutLog[-1].win_or_lose is None
def system1EnterSafe(self, PriceNow, TempDonchian55Upper):
    """Fallback long-window breakout entry.

    Args:
        PriceNow: current price.
        TempDonchian55Upper: upper band of the long Donchian channel, given
            either as a single number or as a sequence/Series whose last
            element is the current level. The monitor in this file passes a
            plain float, which the previous ``[-1]`` indexing rejected.

    Returns:
        True if the price is strictly above the band level, else False.
    """
    if np.ndim(TempDonchian55Upper) > 0:
        # Positional last element, independent of any Series index labels.
        upper = np.asarray(TempDonchian55Upper)[-1]
    else:
        upper = TempDonchian55Upper
    return bool(PriceNow > upper)
def system1BreakoutValid(self, priceNow):
    """Re-evaluate the most recent breakout against the current price.

    The last entry of ``self.BreakOutLog`` gets ``win_or_lose = None`` when
    the price has dropped below its ``lose_price``, and ``True`` otherwise.
    """
    latest = self.BreakOutLog[-1]
    latest.win_or_lose = None if priceNow < latest.lose_price else True
# TODO: at the end of the day, compute ATR and the Donchian channels and append them to the existing MySQL table.
def system_1_Out(self, PriceNow, TempDonchian10Lower):
    """Exit signal for a long position.

    Returns:
        True only when a position is open (``TrigerTime != 0``) and the
        price has fallen below the supplied lower Donchian level.
    """
    return bool(self.TrigerTime != 0 and PriceNow < TempDonchian10Lower)
def add(self, PriceNow):
    """Add-on signal.

    Returns:
        True when fewer than four units are held and the price is above the
        ``add_price`` of the unit at index ``TrigerTime - 1``; else False.
    """
    if self.TrigerTime >= 4:
        return False
    # TODO (carried over from the original): BuyStates may still be empty here.
    reference_unit = self.BuyStates[self.TrigerTime - 1]
    return bool(PriceNow > reference_unit.add_price)
def system_1_stop(self, PriceNow):
    """Stop-loss signal.

    Returns:
        True when the price is below the ``stop_price`` of the unit at index
        ``TrigerTime - 1`` in ``self.BuyStates``; else False.
    """
    active_stop = self.BuyStates[self.TrigerTime - 1].stop_price
    return bool(PriceNow < active_stop)
def day_end(self):
    """Save the current state to ``state/<YYYY-MM-DD>.yaml`` at the end of the day.

    The method reads ``self.turtles``, ``self.user_email`` and
    ``self.email_events``. When it is invoked on an object without a
    ``turtles`` attribute (a single turtle), that object itself is saved as
    the only turtle instead of raising AttributeError.

    Only plain data is written, via ``yaml.safe_dump``, so the file can be
    read back with ``yaml.safe_load``:
      * ``email_events`` is stored as the list of trade codes with a
        registered event (the event objects themselves cannot be dumped);
      * log entries are stored as dicts of their fields;
      * the live price / channel / gap attributes are included as well.
    """
    state_dir = "state"
    os.makedirs(state_dir, exist_ok=True)

    today = datetime.now().strftime("%Y-%m-%d")
    filename = os.path.join(state_dir, f"{today}.yaml")

    turtles = getattr(self, "turtles", None)
    if turtles is None:
        turtles = [self]

    state_data = {
        "main_state": {
            "user_email": getattr(self, "user_email", None),
            "email_events": sorted(getattr(self, "email_events", {})),
            "turtles": [
                {
                    "TradeCode": t.TradeCode,
                    "type": t.type,
                    "riskcoe": t.riskcoe,
                    "Capital": t.Capital,
                    "cash": t.cash,
                    "TrigerTime": t.TrigerTime,
                    "BuyStates": [dict(vars(bs)) for bs in t.BuyStates],
                    "tradeslog": [dict(vars(tl)) for tl in t.tradeslog],
                    "BreakOutLog": [dict(vars(bol)) for bol in t.BreakOutLog],
                    "PriceNow": t.PriceNow,
                    "Donchian_20_up": t.Donchian_20_up,
                    "Donchian_10_down": t.Donchian_10_down,
                    "Donchian_50_up": t.Donchian_50_up,
                    "is_gap_up": t.is_gap_up,
                    "prev_heigh": t.prev_heigh,
                }
                for t in turtles
            ],
        }
    }

    # Non-ASCII text is escaped by default, so the file is pure ASCII and
    # readable under any locale encoding.
    with open(filename, 'w', encoding='utf-8') as f:
        yaml.safe_dump(state_data, f)
class TurtleTrading_OnTime(object):
    """Real-time monitoring driver that can handle several turtles.

    1. Fetch live market quotes.
    2. Compare them against each turtle's trigger conditions.
    3. Run the main real-time monitoring loop.
    """

    def load_previous_state(self):
        """Restore turtle state from yesterday's YAML snapshot, if it exists.

        Looks for ``state/<yesterday>.yaml`` and does nothing when the file
        is absent. Turtles are matched to ``self.turtles`` by ``TradeCode``;
        an unknown code gets a new ``TurtleTrading`` appended.

        The turtle list is read from under the ``main_state`` key when
        present (the layout ``day_end`` writes) and from the top level
        otherwise. The live price/channel attributes are optional in the
        file and keep their current values when missing.
        """
        # NOTE(review): only the calendar day before today is checked, so a
        # snapshot taken before a weekend or holiday is not picked up.
        state_dir = "state"
        yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
        filename = os.path.join(state_dir, f"{yesterday}.yaml")
        if not os.path.exists(filename):
            return

        with open(filename, 'r') as f:
            # safe_load returns None for an empty document.
            state_data = yaml.safe_load(f) or {}

        main_state = state_data.get('main_state', state_data)
        optional_attrs = (
            'PriceNow', 'Donchian_20_up', 'Donchian_10_down',
            'Donchian_50_up', 'is_gap_up', 'prev_heigh',
        )

        for turtle_data in main_state.get('turtles', []):
            turtle = next(
                (t for t in self.turtles if t.TradeCode == turtle_data['TradeCode']),
                None,
            )
            if not turtle:
                # Pass only the constructor arguments; the snapshot dict also
                # carries state keys that __init__ does not accept.
                turtle = TurtleTrading(
                    turtle_data['TradeCode'],
                    turtle_data['type'],
                    turtle_data['riskcoe'],
                    turtle_data['Capital'],
                    turtle_data['cash'],
                )
                self.turtles.append(turtle)

            turtle.TradeCode = turtle_data['TradeCode']
            turtle.type = turtle_data['type']
            turtle.riskcoe = turtle_data['riskcoe']
            turtle.Capital = turtle_data['Capital']
            turtle.cash = turtle_data['cash']
            turtle.TrigerTime = turtle_data['TrigerTime']
            turtle.BuyStates = [BuyState(**bs) for bs in turtle_data['BuyStates']]
            turtle.tradeslog = [TradeLog(**tl) for tl in turtle_data['tradeslog']]
            turtle.BreakOutLog = [BreakOutLog(**bol) for bol in turtle_data['BreakOutLog']]
            for attr in optional_attrs:
                if attr in turtle_data:
                    setattr(turtle, attr, turtle_data[attr])
def __init__(self, turtles: list[TurtleTrading], user_email):
    """Set up the monitor and restore any saved state.

    Args:
        turtles: list of ``TurtleTrading`` instances to monitor. A single
            ``TurtleTrading`` is also accepted and wrapped in a list, since
            every other method iterates over ``self.turtles``.
        user_email: address that trade notifications are sent to and whose
            replies are parsed.
    """
    if isinstance(turtles, TurtleTrading):
        turtles = [turtles]
    self.turtles = turtles
    self.user_email = user_email
    self.email_events = {}  # per-turtle threading.Event, keyed by TradeCode
    # Load previous state from YAML if it exists.
    self.load_previous_state()
def get_stocks_data(self):
    """Fetch live stock and ETF quotes without persisting them.

    Returns:
        ``(stock_data, etf_data)``: the frames returned by
        ``ak.stock_zh_a_spot_em`` and ``ak.fund_etf_spot_em``, each with the
        rows lacking a '最新价' value removed.
    """
    price_column = '最新价'
    stock_data = ak.stock_zh_a_spot_em().dropna(subset=[price_column])
    etf_data = ak.fund_etf_spot_em().dropna(subset=[price_column])
    return stock_data, etf_data
def Buy_stock(self, turtle: TurtleTrading, price_now):
    """Ask the user by e-mail to open a position and record the confirmed fill.

    Sends a buy suggestion, then polls ``parse_return_email`` once a minute
    until it reports a parsed reply (this blocks the calling thread). The
    reply supplies the actual price, share count and fee.

    Afterwards ``turtle.TrigerTime`` is incremented and a ``BuyState`` and a
    ``TradeLog`` entry are appended to the turtle.

    Args:
        turtle: the turtle being traded.
        price_now: suggested price quoted in the e-mail.
    """
    subject = "买入"
    body = f"{turtle.TradeCode},价格{price_now},份额{turtle.IntPositionSize} \n "
    body += "回复:实际买入价格-买入份额-手续费"
    send_email(subject, body, self.user_email)
    send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Poll for the reply once a minute until it can be parsed.
    while True:
        time.sleep(60)
        parse_states, buy_price, buy_share, fee = parse_return_email(
            self.user_email, send_email_time
        )
        if parse_states:
            break

    # Purchase confirmed.
    turtle.TrigerTime += 1
    add_price = buy_price + 1/2 * turtle.N
    stop_price = buy_price - 2 * turtle.N
    # The fee is money paid on top of the shares, so it raises the cost
    # (the sale paths subtract it from proceeds; subtracting it here too
    # understated the cost).
    cost = buy_price * buy_share + fee
    available_cash = turtle.Capital - cost

    turtle.BuyStates.append(
        BuyState(turtle.TrigerTime,
                 buy_price,
                 add_price,
                 stop_price,
                 False,
                 buy_share,
                 turtle.N,
                 available_cash)
    )

    today = datetime.now().strftime("%Y-%m-%d")
    turtle.tradeslog.append(
        TradeLog(today,
                 "买入",
                 buy_price,
                 buy_share,
                 cost,
                 turtle.N,
                 available_cash,
                 all_shares=buy_share,
                 all_cost=cost,
                 Net_value=buy_price * buy_share,
                 Net_return=0)
    )
def add_stock(self, turtle: TurtleTrading, price_now):
    """Ask the user by e-mail to add a unit and record the confirmed fill.

    Sends the add-on suggestion, polls ``parse_return_email`` once a minute
    until a reply is parsed (blocking), then appends a ``BuyState`` and a
    ``TradeLog`` entry and adjusts the stop prices of the held units.

    Args:
        turtle: the turtle being traded.
        price_now: suggested price quoted in the e-mail.
    """
    subject = "加仓"
    body = f"{turtle.TradeCode},价格{price_now},份额{turtle.IntPositionSize} \n "
    body += "回复:实际买入价格-买入份额-手续费"
    send_email(subject, body, self.user_email)
    send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Poll for the reply once a minute until it can be parsed.
    while True:
        time.sleep(60)
        parse_states, buy_price, buy_share, fee = parse_return_email(
            self.user_email, send_email_time
        )
        if parse_states:
            break

    # Purchase confirmed.
    turtle.TrigerTime += 1
    add_price = buy_price + 1/2 * turtle.N
    stop_price = buy_price - 2 * turtle.N
    # The fee is paid on top of the shares, so it increases the cost.
    cost = buy_price * buy_share + fee
    available_cash = turtle.BuyStates[-1].available_cash - cost

    # Running totals live on TradeLog, not on BuyState (which has no
    # all_shares/all_cost fields), so continue from the last journal row.
    last_log = turtle.tradeslog[-1] if turtle.tradeslog else None
    held_shares = last_log.all_shares if last_log is not None else 0
    held_cost = last_log.all_cost if last_log is not None else 0
    all_shares = buy_share + held_shares
    all_cost = cost + held_cost
    net_value = buy_price * all_shares
    net_return = net_value - all_cost

    turtle.BuyStates.append(
        BuyState(turtle.TrigerTime,
                 buy_price,
                 add_price,
                 stop_price,
                 turtle.is_gap_up,
                 buy_share,
                 turtle.N,
                 available_cash)
    )

    today = datetime.now().strftime("%Y-%m-%d")
    turtle.tradeslog.append(
        TradeLog(today,
                 "加仓",
                 buy_price,
                 buy_share,
                 cost,
                 turtle.N,
                 available_cash,
                 all_shares,
                 all_cost,
                 net_value,
                 net_return)
    )

    # Adjust the stop prices of the units already held. Which units are
    # moved depends on how many held units were bought on a gap-up day and
    # where they sit in the list; the branch structure below is kept exactly
    # as originally written.
    gap_up_index = [i for i, state in enumerate(turtle.BuyStates) if state.is_gap_up]
    gap_up_num = len(gap_up_index)

    if gap_up_num == 0:
        # No gap-up units: every unit shares the new stop.
        for state in turtle.BuyStates:
            state.stop_price = stop_price
    if not turtle.is_gap_up and gap_up_num == 1:
        if gap_up_index[0] == 1:
            number_tobe_change = turtle.TrigerTime - 1 - gap_up_index[0]
            for k in range(number_tobe_change):
                turtle.BuyStates[k+1].stop_price = stop_price
        elif gap_up_index[0] == 2:
            turtle.BuyStates[2].stop_price = stop_price
    elif not turtle.is_gap_up and gap_up_num == 2:
        number_tobe_change = 2
        for k in range(number_tobe_change):
            turtle.BuyStates[k+1].stop_price = stop_price
def stop_sale_stock(self, turtle: TurtleTrading, price_now):
    """Ask the user by e-mail to sell on a stop-loss and record the confirmed sale.

    One unit is sold when at least one held unit has a stop price at or
    above ``price_now``. If no unit qualifies (or nothing is held) the
    method returns without sending anything: continuing with a count of
    zero would slice ``BuyStates[:-0]`` and wipe every held unit.

    Args:
        turtle: the turtle being traded.
        price_now: current price, quoted in the e-mail as the suggested price.
    """
    # Number of units to sell: the original scan stops at the first unit
    # whose stop is hit, so the count is either 0 or 1.
    stop_hit = any(price_now <= state.stop_price for state in turtle.BuyStates)
    sale_shares = 1 if stop_hit else 0
    if sale_shares == 0:
        return

    subject = "止损卖出"
    body = f"{turtle.TradeCode},价格{price_now},份额{turtle.IntPositionSize * sale_shares} \n "
    body += "回复:实际卖出价格-卖出份额-手续费"
    send_email(subject, body, self.user_email)
    send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Poll for the reply once a minute until it can be parsed.
    while True:
        time.sleep(60)
        parse_states, sale_price, sale_share, fee = parse_return_email(
            self.user_email, send_email_time
        )
        if parse_states:
            break

    # Sale confirmed.
    turtle.TrigerTime -= sale_shares
    available_cash = turtle.BuyStates[-1].available_cash + sale_price * sale_share - fee
    # Drop the most recent unit(s) from the open-position list.
    turtle.BuyStates = turtle.BuyStates[:-sale_shares]

    turtle.tradeslog.append(
        TradeLog(datetime.now().strftime("%Y-%m-%d"),
                 "止损",
                 sale_price,
                 sale_share,
                 sale_price * sale_share - fee,
                 turtle.N,
                 available_cash,
                 all_shares=0,
                 all_cost=0,
                 Net_value=sale_price * sale_share,
                 Net_return=abs(turtle.Capital - available_cash))
    )
def out_sale_stock(self, turtle: TurtleTrading, price_now):
    """Ask the user by e-mail to exit the position and record the confirmed sale.

    Sends the exit suggestion, polls ``parse_return_email`` once a minute
    until a reply is parsed (blocking), then resets ``TrigerTime`` to 0,
    clears ``BuyStates`` and appends a ``TradeLog`` entry.

    Args:
        turtle: the turtle being traded.
        price_now: current price, quoted in the e-mail as the suggested price.
    """
    # NOTE(review): the e-mail quotes a single unit (IntPositionSize) even
    # though the whole position is cleared below; confirm the intended quantity.
    subject = "止盈卖出"
    body = f"{turtle.TradeCode},价格{price_now},份额{turtle.IntPositionSize} \n "
    body += "回复:实际卖出价格-卖出份额-手续费"
    send_email(subject, body, self.user_email)
    send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Poll for the reply once a minute until it can be parsed.
    while True:
        time.sleep(60)
        parse_states, sale_price, sale_share, fee = parse_return_email(
            self.user_email, send_email_time
        )
        if parse_states:
            break

    # Sale confirmed: the position is flat again.
    turtle.TrigerTime = 0
    proceeds = sale_price * sale_share - fee
    available_cash = turtle.BuyStates[-1].available_cash + sale_price * sale_share - fee
    turtle.BuyStates = []

    log_entry = TradeLog(datetime.now().strftime("%Y-%m-%d"),
                         "止盈",
                         sale_price,
                         sale_share,
                         proceeds,
                         turtle.N,
                         available_cash,
                         all_shares=0,
                         all_cost=0,
                         Net_value=sale_price * sale_share,
                         Net_return=abs(turtle.Capital - available_cash))
    turtle.tradeslog.append(log_entry)
# def run_short_trading_loop(self, stock_data, etf_data):
# now = datetime.now().time()
# # 根据类型获取当前价格
# if self.turtle.type == "stock":
# self.turtle.PriceNow = float(stock_data.loc[etf_data['代码'] == self.turtle.TradeCode, '最新价'].values[0])
# elif self.turtle.type == "etf":
# # self.turtle.PriceNow = float(etf_data.loc[etf_data['基金代码'] == self.turtle.TradeCode, '当前-单位净值'].values[0])
# self.turtle.PriceNow = float(etf_data.loc[etf_data['代码'] == self.turtle.TradeCode, '最新价'].values[0])
# # # 9点30 判断是否跳空高开
# if now.hour == 9 and now.minute == 30 and self.turtle.PriceNow > self.turtle.prev_heigh:
# self.turtle.is_gap_up = True
# # 判断当前仓位状态并执行相应操作
# if self.turtle.TrigerTime == 0:
# # 空仓状态
# if self.turtle.system1EnterNormal(
# self.turtle.PriceNow,
# self.turtle.Donchian_20_up,
# self.turtle.BreakOutLog
# ):
# self.Buy_stock(self.turtle.PriceNow)
# # 突破 记录self.turtle.breakoutlog
# today = datetime.now().strftime("%Y-%m-%d")
# breakout_this_time = BreakOutLog(today,
# self.turtle.Donchian_20_up,
# self.turtle.Donchian_20_up - 2 * self.turtle.N,
# 'valid',
# None)
# self.turtle.BreakOutLog.append(breakout_this_time)
# elif self.turtle.system1EnterSafe(
# self.turtle.PriceNow,
# self.turtle.Donchian_50_up
# ):
# self.Buy_stock(self.turtle.PriceNow)
# elif 1 <= self.turtle.TrigerTime <= 3:
# # # 突破状态
# # if self.turtle.system1EnterNormal(
# # self.turtle.PriceNow,
# # self.turtle.Donchian_20_up,
# # self.turtle.BreakOutLog
# # ):
# # self.Buy_stock(self.turtle.PriceNow)
# # elif self.turtle.system1EnterSafe(
# # self.turtle.PriceNow,
# # self.turtle.Donchian_50_up
# # ):
# # self.Buy_stock(self.turtle.PriceNow)
# # 加仓状态
# if self.turtle.add(self.turtle.PriceNow):
# self.add_stock(self.turtle.PriceNow)
# # 止损状态
# elif self.turtle.system_1_stop(self.turtle.PriceNow):
# self.stop_sale_stock(self.turtle.PriceNow)
# # 止盈
# elif self.turtle.system_1_Out(
# self.turtle.PriceNow,
# self.turtle.Donchian_10_down
# ):
# self.out_sale_stock(self.turtle.PriceNow)
# elif self.turtle.TrigerTime == 4:
# # 满仓 止损 止盈
# if self.turtle.system_1_stop(self.turtle.PriceNow):
# self.stop_sale_stock(self.turtle.PriceNow)
# elif self.turtle.system_1_Out(
# self.turtle.PriceNow,
# self.turtle.Donchian_10_down
# ):
# self.out_sale_stock(self.turtle.PriceNow)
# # 等待 1 分钟后下一次循环
# time.sleep(60)
def Start_short_system(self):
    """Run one trading day of the short-term system for every turtle.

    Phases:
      1. Preparation: load data for each turtle and cache the latest ATR,
         previous high and Donchian levels, then size the position.
      2. Monitoring: once a minute during 09:30-11:30 and 13:00-15:00
         (local clock) evaluate all turtles; outside those windows just
         wait. The loop ends once the clock is past 15:00.
      3. Wrap-up: rebuild each turtle's database table so that it includes
         today's bar and indicators.

    The previous loop could never reach its stop check: outside trading
    hours it hit ``continue`` first, and the check itself required
    ``hour > 15``. The wrap-up phase therefore never ran.
    """
    # ------------------ preparation ------------------
    for turtle in self.turtles:
        turtle.get_ready(100)
        data = turtle.CurrentData
        turtle.N = float(data['ATR'].iloc[-1])
        # The rest of this file names the high column '最高'; fall back to
        # the previously used '最高价' in case a data source provides that.
        high_column = '最高' if '最高' in data.columns else '最高价'
        turtle.prev_heigh = float(data[high_column].iloc[-1])
        turtle.Donchian_20_up = float(data['Donchian_20_upper'].iloc[-1])
        turtle.Donchian_50_up = float(data['Donchian_50_upper'].iloc[-1])
        turtle.Donchian_10_down = float(data['Donchian_10_lower'].iloc[-1])
        turtle.CalPositionSize()

    # ------------------ real-time monitoring ------------------
    # On the first day of a new week (Monday) recompute the position size.
    if datetime.now().weekday() == 0:
        for turtle in self.turtles:
            turtle.CalPositionSize()

    morning = (9 * 60 + 30, 11 * 60 + 30)
    afternoon = (13 * 60, 15 * 60)
    while True:
        now = datetime.now().time()
        minute_of_day = now.hour * 60 + now.minute

        # Market closed for the day: leave the loop and wrap up.
        if minute_of_day > afternoon[1]:
            break

        in_session = (
            morning[0] <= minute_of_day <= morning[1]
            or afternoon[0] <= minute_of_day <= afternoon[1]
        )
        if in_session:
            self.monitor_all_turtles()

        # Check once a minute, both inside and outside trading hours.
        time.sleep(60)

    # ------------------ wrap-up ------------------
    # Rebuild each table from scratch so it contains today's data together
    # with the ATR and Donchian columns.
    for turtle in self.turtles:
        mysql_database.delete_table(f"{turtle.TradeCode}")
        turtle.get_ready(100)
    # NOTE(review): this is 9900 seconds (2.75 hours); confirm the intended
    # wait before returning.
    time.sleep(16.5*600)
def monitor_all_turtles(self):
    """Fetch one snapshot of live quotes and evaluate every turtle against it."""
    quotes = self.get_stocks_data()
    stock_data, etf_data = quotes
    for current in self.turtles:
        self.monitor_single_turtle(current, stock_data, etf_data)
def monitor_single_turtle(self, turtle: "TurtleTrading", stock_data, etf_data):
    """Evaluate the trading conditions of one turtle against the latest quotes.

    Updates ``turtle.PriceNow`` from the quote table matching the turtle's
    type, flags a gap-up at 09:30, then starts an e-mail worker for the
    first action whose condition holds (buy / add / stop-loss / exit).

    Args:
        turtle: the turtle to evaluate (the previous code referred to a
            non-existent ``self.turtle`` instead).
        stock_data: live stock quotes with '代码' and '最新价' columns.
        etf_data: live ETF quotes with '代码' and '最新价' columns.
    """
    now = datetime.now().time()

    if turtle.type == "stock":
        quotes = stock_data
    elif turtle.type == "etf":
        quotes = etf_data
    else:
        quotes = None

    if quotes is not None:
        # Filter each table with its own code column (the stock lookup used
        # to be masked with the ETF frame).
        matched = quotes.loc[quotes['代码'] == turtle.TradeCode, '最新价']
        if matched.empty:
            # No live quote for this code in the snapshot: nothing to act on.
            return
        turtle.PriceNow = float(matched.iloc[0])

    if now.hour == 9 and now.minute == 30 and turtle.PriceNow > turtle.prev_heigh:
        turtle.is_gap_up = True

    # Act according to the current number of units held.
    if turtle.TrigerTime == 0:
        if turtle.system1EnterNormal(
            turtle.PriceNow,
            turtle.Donchian_20_up,
            turtle.BreakOutLog
        ):
            self.start_email_thread(turtle, "买入", turtle.PriceNow)
            # Record the breakout that triggered the entry.
            today = datetime.now().strftime("%Y-%m-%d")
            breakout_this_time = BreakOutLog(today,
                                             turtle.Donchian_20_up,
                                             turtle.Donchian_20_up - 2 * turtle.N,
                                             'valid',
                                             None)
            turtle.BreakOutLog.append(breakout_this_time)
        elif turtle.system1EnterSafe(
            turtle.PriceNow,
            turtle.Donchian_50_up
        ):
            self.start_email_thread(turtle, "买入", turtle.PriceNow)
    elif 1 <= turtle.TrigerTime <= 4:
        # Adding is only considered below four units; at four units only the
        # stop-loss and exit checks apply.
        if turtle.TrigerTime <= 3 and turtle.add(turtle.PriceNow):
            self.start_email_thread(turtle, "加仓", turtle.PriceNow)
        elif turtle.system_1_stop(turtle.PriceNow):
            self.start_email_thread(turtle, "止损", turtle.PriceNow)
        elif turtle.system_1_Out(
            turtle.PriceNow,
            turtle.Donchian_10_down
        ):
            self.start_email_thread(turtle, "止盈", turtle.PriceNow)
def start_email_thread(self, turtle: "TurtleTrading", action, price_now):
    """Start a background worker that e-mails the user and waits for the reply.

    The monitor calls this every minute while a condition holds, but the
    turtle's state only changes once the user has answered. To avoid
    sending a new e-mail (and leaving another waiting thread) each minute,
    the call is ignored while an earlier request for the same turtle is
    still in flight.

    ``self.email_events[turtle.TradeCode]`` holds an event that is set when
    the worker for that turtle has finished.

    Args:
        turtle: the turtle the action applies to.
        action: action label forwarded to ``handle_email_response``.
        price_now: price forwarded to ``handle_email_response``.
    """
    pending = self.email_events.get(turtle.TradeCode)
    if pending is not None and not pending.is_set():
        return

    finished = threading.Event()
    self.email_events[turtle.TradeCode] = finished

    # The handler receives its own, already-set event so that it returns as
    # soon as the action is done, whether it waits on the event or sets it.
    released = threading.Event()
    released.set()

    def worker():
        try:
            self.handle_email_response(turtle, action, price_now, released)
        finally:
            # Always release the turtle, even if the handler raised.
            finished.set()

    thread = threading.Thread(target=worker)
    thread.start()
def handle_email_response(self, turtle: "TurtleTrading", action, price_now, event):
    """Worker body: run the e-mail round trip for one action.

    Each action method sends the e-mail and blocks until the reply has been
    parsed, so when it returns the action is complete. ``event`` is then
    set to signal completion. Previously the method waited on ``event``,
    which nothing ever set, so every worker thread hung forever.

    Args:
        turtle: the turtle the action applies to.
        action: one of "买入", "加仓", "止损", "止盈"; any other value does
            nothing.
        price_now: price passed to the action method.
        event: ``threading.Event`` set when the action has finished, also
            when it raised.
    """
    try:
        handlers = {
            "买入": self.Buy_stock,
            "加仓": self.add_stock,
            "止损": self.stop_sale_stock,
            "止盈": self.out_sale_stock,
        }
        handler = handlers.get(action)
        if handler is not None:
            handler(turtle, price_now)
    finally:
        event.set()
if __name__ == '__main__':
    user_email = "guoyize2209@163.com"
    t = TurtleTrading('513870', "etf", 0.0025, 100000, 200000)
    # t.get_ready(100)
    # The monitor iterates over its turtles, so hand it a list even when
    # there is only one.
    a = TurtleTrading_OnTime([t], user_email)
    a.Start_short_system()
    # # Stocks only
    # stock_zh_a_spot_df = ak.stock_zh_a_spot_em()
    # # stock_zh_a_spot_df.to_csv("stock_zh_a_spot.txt", sep="\t", index=False, encoding="utf-8")
    # stock_zh_a_spot_df = stock_zh_a_spot_df.dropna(subset=['最新价'])
    # print(stock_zh_a_spot_df)
    # # Funds only
    # etf_data = ak.fund_etf_spot_em()
    # etf_data = etf_data.dropna(subset=['最新价'])
    # etf_data.to_csv("fund_etf_spot.txt", sep="\t", index=False, encoding="utf-8")
    # print(etf_data)