import numpy as np import math import akshare as ak import os from datetime import datetime, timedelta, date import pandas as pd import mplfinance as mpf import mysql_database from EmailTest import send_email, parse_return_email from dataclasses import dataclass import time import threading import yaml # 添加YAML支持 import logging from stock_em import stock_zh_a_spot_em from etf_em import fund_etf_spot_em import random import urllib.parse import requests ''' todo v0.1.1 运行过程框架调整,支持多个turtle同时监测 done v0.1.2 增加运行状态写入yaml文件,读取文件恢复状态 done v0.1.3 测试每分钟获取实时信息,是否稳定。 done v0.1.4 etf实时数据使用异步方式,获取速度加快了 done v0.1.5 获取数据,调用clash代理 done v0.1.6 把东方财富网写进规则代理 done v0.1.7 每天定时启动 done ''' @dataclass class BuyState: trigger_time: float # 触发次数 buy_price: float # 买入价格 add_price: float # 加仓价格 stop_price: float # 止损价格 is_gap_up: bool # 是否跳空高开 shares: int # 买入股数 atr: int # ATR available_cash: float # 可用资金 @dataclass class TradeLog: data: str # 时间 type: str # 操作类型 buy_price: float # 买入价格 shares: int # 买入股数 cost: float # 成本 atr: int # ATR available_cash: float # 可用资金 all_shares: float # 总股数 all_cost: float # 总成本 Net_value: float # 净值 Net_return: float # 净收益 @dataclass class BreakOutLog: # 记录突破信息 data: str # 时间 breakout_price: float # 突破价格 lose_price: float # 亏损价格 valid_or_not: str # 是否有效 win_or_lose: bool # 是否盈利 def calc_sma_atr_pd(kdf,period): """计算TR与ATR Args: kdf (_type_): 历史数据 period (_type_): ATR周期 Returns: _type_: 返回kdf,增加TR与ATR列 """ kdf['最高'] = kdf['最高'].astype(float) kdf['最低'] = kdf['最低'].astype(float) kdf['收盘'] = kdf['收盘'].astype(float) kdf['HL'] = kdf['最高'] - kdf['最低'] kdf['HC'] = np.abs(kdf['最高'] - kdf['收盘'].shift(1)) kdf['LC'] = np.abs(kdf['最低'] - kdf['收盘'].shift(1)) kdf['TR'] = np.round(kdf[['HL','HC','LC']].max(axis=1), 3) # ranges = pd.concat([high_low, high_close, low_close], axis=1) # true_range = np.max(ranges, axis=1) kdf['ATR'] = np.round(kdf['TR'].rolling(period).mean(), 3) return kdf.drop(['HL','HC','LC'], axis = 1) class TurtleTrading(object): """对象范围较小,对某一个标的创建一个海龟,如513300, 计算ATR、唐奇安通道线 基础数据 Args: object (_type_): _description_ """ def __init__(self, TradeCode, type, riskcoe, Capital, cash) -> None: self.TradeCode = TradeCode self.type = type self.riskcoe = riskcoe self.Capital = Capital self.cash = cash self.TrigerTime = 0 self.BuyStates = [] self.tradeslog = [] self.BreakOutLog = [] self.PriceNow = 0.0 self.Donchian_20_up = 0.0 self.Donchian_10_down = 0.0 self.Donchian_50_up = 0.0 self.is_gap_up = False # 是否跳空高开 self.prev_heigh = 0.0 # 前一天最高价 def GetRecentData(self): """获取某个标的的最近数据,从两年前到今天, 计算后的数据保存在self.CurrentData Returns: _type_: _description_ """ Today = datetime.today() # print(Today) formatted_date = Today.strftime("%Y%m%d") two_years_ago = (date.today() - timedelta(days=365*2)).strftime("%Y%m%d") # print(formatted_date) Code = f"{self.TradeCode}" CurrentData = ak.fund_etf_hist_em(symbol=Code, period="daily", start_date=two_years_ago, end_date=formatted_date, adjust="") # 将日期列转换为datetime CurrentData = pd.DataFrame(CurrentData) CurrentData['日期'] = pd.to_datetime(CurrentData['日期']) # print(type(CurrentData['日期'].iloc[0])) CurrentData.set_index('日期', inplace=True) # CurrentData.reset_index(inplace=True) # print(type(CurrentData['日期'].iloc[0])) # create table # stock_database.create_table(Code) # stock_database.insert_data(Code, CurrentData) # mysql_database.insert_db(CurrentData, Code, True, "'日期'") self.CurrentData = CurrentData # return self.CurrentData def CalATR(self, data, ATRday): """计算某个标的的ATR,从上市日到今天, 计算后的数据保存在self.CurrentData Args: ATRday: 多少日ATR SaveOrNot (_type_): 是否保存.csv数据 """ self.CurrentData = calc_sma_atr_pd(data, ATRday) self.N = self.CurrentData['ATR'] # return self.N def ReadExistData(self, data): """除了通过发请求获取数据,也可以读本地的数据库,赋值给self.CurrentData Args: data (_type_): 本地csv名称 """ self.CurrentData = pd.read_csv(data) def DrawKLine(self, days): """画出k线图看看,画出最近days天的K线图 """ # 日期部分 # dates = pd.to_datetime(self.CurrentData['日期'][-days:]) # # Klinedf['Data'] = pd.to_datetime(self.CurrentData['日期']) Klinedf = pd.DataFrame() # Klinedf.set_index = Klinedf['Data'] # 其他数据 Klinedf['Date'] = self.CurrentData['日期'][-days:] Klinedf['Open'] = self.CurrentData['开盘'][-days:].astype(float) Klinedf['High'] = self.CurrentData['最高'][-days:].astype(float) Klinedf['Low'] = self.CurrentData['最低'][-days:].astype(float) Klinedf['Close'] = self.CurrentData['收盘'][-days:].astype(float) Klinedf['Volume'] = self.CurrentData['成交量'][-days:].astype(float) Klinedf.set_index(pd.to_datetime(Klinedf['Date']), inplace=True) # 画图 mpf.plot(Klinedf, type='candle', style='yahoo', volume=False, mav=(5,), addplot=[mpf.make_addplot(self.Donchian_up['Upper'][-days:]), mpf.make_addplot(self.Donchian_down['lower'][-days:])], title=f"{self.TradeCode} K线图") def calculate_donchian_channel_up(self, n): """ 计算n日唐奇安上通道 参数: self.CurrentData (DataFrame): 包含价格数据的Pandas DataFrame,包含"High" n (int): 时间周期 返回:self.Donchian DataFrame: 唐奇安通道的DataFrame,包含"Upper" """ Donchian = pd.DataFrame() # 创建一个空的DataFrame用于存储唐奇安通道数据 # 计算最高价和最低价的N日移动平均线 name = 'Donchian_' + str(n) + '_upper' Donchian[name] = self.CurrentData['最高'].rolling(n).max() # 使用rolling函数计算n日最高价的移动最大值 # # 计算中间线 # Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2 # 计算上通道和下通道的中间线,但此行代码被注释掉了 return Donchian # 返回包含唐奇安上通道的DataFrame def calculate_donchian_channel_down(self, n): """ 计算n日唐奇安上通道 参数: self.CurrentData (DataFrame): 包含价格数据的Pandas DataFrame,包含"High" n (int): 时间周期 返回:self.Donchian DataFrame: 唐奇安通道的DataFrame,包含"Upper" """ Donchian = pd.DataFrame() # 计算最高价和最低价的N日移动平均线 name = 'Donchian_' + str(n) + '_lower' Donchian[name] = self.CurrentData['最低'].rolling(n).min() # # 计算中间线 # Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2 return Donchian def calc_atr_donchian_short(self): """计算ATR、短期唐奇安通道 """ # 计算ATR self.CalATR(self.CurrentData, 20) # 计算唐奇安通道 self.Donchian_20_ups = self.calculate_donchian_channel_up(20) self.Donchian_50_ups = self.calculate_donchian_channel_up(50) self.Donchian_downs = self.calculate_donchian_channel_down(10) # 画图 # self.DrawKLine(days) # 把self.N, self.Donchian_up, self.Donchian_down, 添加到self.CurrentData后面,保存到mysql数据库 self.CurrentData = pd.concat([self.CurrentData, self.Donchian_20_ups, self.Donchian_50_ups, self.Donchian_downs], axis=1) def get_ready(self, days): """创建一个turtle对象,获取数据,计算ATR,计算唐奇安通道 Args: days (_type_): _description_ n (_type_): _description_ """ # if 不存在database if not mysql_database.check_db_table(f"{self.TradeCode}"): self.GetRecentData() self.calc_atr_donchian_short() Code = f"{self.TradeCode}" mysql_database.insert_db(self.CurrentData, Code, True, "日期") else: # 检查数据库最后一条的时间距离今天是否两天以上 current_date = date.today() threshold_date = current_date - timedelta(days=2) last_update = mysql_database.check_db_table_last_date(f"{self.TradeCode}") if last_update < threshold_date: # 如果不存在,则从akshare获取数据并保存到mysql数据库 mysql_database.delete_table(f"{self.TradeCode}") self.GetRecentData() self.calc_atr_donchian_short() Code = f"{self.TradeCode}" mysql_database.insert_db(self.CurrentData, Code, True, "日期") else: # 如果存在,则从mysql数据库中读取数据 self.CurrentData = mysql_database.fetch_all_data(f"{self.TradeCode}") def CalPositionSize(self): """根据风险系数、ATR,计算仓位大小, 存于self.IntPositionSize """ PositionSize = self.riskcoe * self.Capital /(self.N) # 默认用股票形式了 100 self.IntPositionSize = int(PositionSize // 100) * 100 def system1EnterNormal(self, PriceNow, TempDonchian20Upper, BreakOutLog): # 没有持仓且价格向上突破---此时包含两种情形:1 对某标的首次使用系统,2 已发生过突破,此时上次突破天然是失败的 if self.TrigerTime == 0 and PriceNow > TempDonchian20Upper: # 买入 return True elif PriceNow > TempDonchian20Upper:#todo !=0不会满足条件 先跳过 self.system1BreakoutValid(PriceNow) if BreakOutLog[-1].win_or_lose == None: # TT!= 0且突破且上一次突破unseccessful return True else: return False else: return False def system1EnterSafe(self, PriceNow, TempDonchian55Upper): if PriceNow > TempDonchian55Upper: # 保底的55日突破 return True else: return False def system1BreakoutValid(self, priceNow): """判断前一次突破是否成功,是log[-1][5]写入“win”,否则写入“Lose” """ if priceNow < self.BreakOutLog[-1].lose_price: self.BreakOutLog[-1].win_or_lose = None else: self.BreakOutLog[-1].win_or_lose = True # 一天结束,计算ATR,计算唐奇安通道,追加到已有的mysql数据库中 def system_1_Out(self, PriceNow, TempDonchian10Lower): # 退出:低于20日最低价(多头方向),空头以突破20日最高价为止损价格--有持仓且价格向下突破 if self.TrigerTime != 0 and PriceNow < TempDonchian10Lower: # 退出 return True else: return False def add(self, PriceNow): """加仓 """ if self.TrigerTime < 4 and PriceNow > self.BuyStates[self.TrigerTime - 1].add_price:#todo BuyStates是空的 # 买入 return True else: return False def system_1_stop(self, PriceNow): """止损判断:如果当前价格<上一次买入后的止损价格则止损 """ if PriceNow < self.BuyStates[self.TrigerTime - 1].stop_price: # 买入 return True else: return False class TurtleTrading_OnTime(object): ''' 实时监测主程序,可以处理多个turtle 1、获取实时大盘数据 2、根据turtles的代码,比较是否触发条件 3、实时监测主流程 ''' def __init__(self, turtles: list[TurtleTrading], user_email): self.turtles = turtles # List of TurtleTrading instances self.user_email = user_email self.email_events = {} # Track email response events for each turtle logging.basicConfig(level=logging.INFO) # Load previous state from YAML if exists self.load_previous_state() self.clash_api = "http://127.0.0.1:9090" self.node_group_name_encoded = urllib.parse.quote("🚀代理线路", safe='') # 根据 Clash 中策略组的名字填写 self.clash_secret = "6Gp-fdt-veS-ugv" self.node_names = [ "R1-0|香港-NF|深|负载均衡", "R1-1|香港-NF|粤|负载均衡", "R1-2|香港-NF|沪|负载均衡", "R1-3|香港-NF|湘|负载均衡", "R2-1|香港-NF|HKT家宽", "R2-2|香港-NF|HKT家宽", "R2-3|香港-NF|HKT家宽", "R2-5|香港-NF|HKT家宽", "R3-1|香港-NF|BGP静态", "R5-1|日本-NF|GMO|原生IP", "R5-4|日本-NF|IIJ|精品", "R9-1|狮城-NF|FDC", "D1-1直连|香港-NF", "D1-2直连|香港-NF", "D1-3直连|香港-NF" ] def load_previous_state(self): """Load previous state from YAML file if exists""" state_dir = "state" today = datetime.now().strftime("%Y-%m-%d") yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") filename = os.path.join(state_dir, f"{yesterday}.yaml") if os.path.exists(filename): with open(filename, 'r') as f: state_data = yaml.safe_load(f) main_state = state_data.get('main_state', {}) # Restore state try: for turtle_data in main_state.get('turtles', []): # Find or create TurtleTrading instance turtle = next((t for t in self.turtles if t.TradeCode == turtle_data['TradeCode']), None) if not turtle: # Create new instance if not found (should not happen) turtle = TurtleTrading(**turtle_data) self.turtles.append(turtle) # Restore attributes turtle.TradeCode = turtle_data['TradeCode'] turtle.type = turtle_data['type'] turtle.riskcoe = turtle_data['riskcoe'] turtle.Capital = turtle_data['Capital'] turtle.cash = turtle_data['cash'] turtle.TrigerTime = turtle_data['TrigerTime'] turtle.BuyStates = [BuyState(**bs) for bs in turtle_data['BuyStates']] turtle.tradeslog = [TradeLog(**tl) for tl in turtle_data['tradeslog']] turtle.BreakOutLog = [BreakOutLog(**bol) for bol in turtle_data['BreakOutLog']] except Exception as e: logging.error(f"Error loading previous state: {e}") def switch_random_node(self): '''随机切换clash节点 ''' selected_node = random.choice(self.node_names) url = f"{self.clash_api}/proxies/{self.node_group_name_encoded}" headers = { "Authorization": f"Bearer {self.clash_secret}" } try: res = requests.put(url, headers=headers, json={"name": selected_node}) if res.status_code == 204: print(f"[✓] 成功切换节点为:{selected_node}") else: print(f"[✗] 切换失败:{res.status_code} - {res.text}") except Exception as e: print(f"[!] 请求失败: {e}") def day_end(self): """Save current state to YAML file at the end of the day""" # Create state directory if not exists state_dir = "state" if not os.path.exists(state_dir): os.makedirs(state_dir) # Generate filename with current date today = datetime.now().strftime("%Y-%m-%d") filename = os.path.join(state_dir, f"{today}.yaml") # Save state to YAML state_data = { "main_state": { "user_email": self.user_email, "email_events": self.email_events, "turtles": [ { "TradeCode": t.TradeCode, "type": t.type, "riskcoe": t.riskcoe, "Capital": t.Capital, "cash": t.cash, "TrigerTime": t.TrigerTime, "BuyStates": [vars(bs) for bs in t.BuyStates], "tradeslog": [vars(tl) for tl in t.tradeslog], "BreakOutLog": [vars(bol) for bol in t.BreakOutLog] } for t in self.turtles ] } } with open(filename, 'w') as f: yaml.dump(state_data, f) def get_stocks_data(self): """获取实时股票、基金数据,不保存 """ try: self.switch_random_node() stock_data = stock_zh_a_spot_em() stock_data = stock_data.dropna(subset=['最新价']) etf_data = fund_etf_spot_em() etf_data = etf_data.dropna(subset=['最新价']) # 成功调用,使用logging记录日志,加上时间 logging.info(f"Successfully fetched stock and ETF data at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") except Exception as e: logging.error(f"Error occurred while getting stock data: {e}") return None, None return stock_data, etf_data def Buy_stock(self, turtle: TurtleTrading, price_now): # 发送邮件 代码self.turtle.TradeCode, 建议买入价格price_now,买入份额self.turtle.IntPositionSize subject = "买入" body = f"{turtle.TradeCode},价格{price_now},份额{turtle.IntPositionSize} \n " body += "回复:实际买入价格-买入份额-手续费" send_email(subject, body, self.user_email) # send_email_time = datetime.strftime(datetime.now(),"%Y%m%d %H:%M:%S").date() send_email_time = datetime.now().date() #每隔1分钟检测回信,解析邮件。 parsed_email_flag = False while not parsed_email_flag: time.sleep(60) # 每次尝试前等待 60 秒 parse_states, buy_price, buy_share, fee = parse_return_email( self.user_email, send_email_time ) if parse_states: parsed_email_flag = True break # 成功买入 turtle.TrigerTime += 1 # 记录self.turtle.BuyStates add_price = buy_price + 1/2 * turtle.N stop_price = buy_price - 2 * turtle.N cost = buy_price * buy_share - fee available_cash = turtle.Capital - cost buy_this_time = BuyState(turtle.TrigerTime, buy_price, add_price, stop_price, False, buy_share, turtle.N, available_cash) turtle.BuyStates.append(buy_this_time) # 记录self.turtle.tradeslog today = datetime.now().strftime("%Y-%m-%d") log_this_time = TradeLog(today, "买入", buy_price, buy_share, cost, turtle.N, available_cash, all_shares=buy_share, all_cost=cost, Net_value=buy_price * buy_share, Net_return=0) turtle.tradeslog.append(log_this_time) def add_stock(self, turtle: TurtleTrading, price_now): """加仓 Args: price_now (_type_): 现价 """ # 加仓 subject = "加仓" body = f"{turtle.TradeCode},价格{price_now},份额{turtle.IntPositionSize} \n " body += "回复:实际买入价格-买入份额-手续费" send_email(subject, body, self.user_email) send_email_time = datetime.now().date() #每隔1分钟检测回信,解析邮件。 parsed_email_flag = False while not parsed_email_flag: time.sleep(60) # 每次尝试前等待 60 秒 parse_states, buy_price, buy_share, fee = parse_return_email( self.user_email, send_email_time ) if parse_states: parsed_email_flag = True break # 成功买入 turtle.TrigerTime += 1 # 记录self.turtle.BuyStates add_price = buy_price + 1/2 * turtle.N stop_price = buy_price - 2 * turtle.N cost = buy_price * buy_share - fee available_cash = turtle.BuyStates[-1].available_cash - cost all_shares = buy_share + turtle.BuyStates[-1].all_shares all_cost = cost + turtle.BuyStates[-1].all_cost net_value = buy_price * all_shares net_return = net_value - all_cost buy_this_time = BuyState(turtle.TrigerTime, buy_price, add_price, stop_price, turtle.is_gap_up, buy_share, turtle.N, available_cash) turtle.BuyStates.append(buy_this_time) today = datetime.now().strftime("%Y-%m-%d") log_this_time = TradeLog(today, "加仓", buy_price, buy_share, cost, turtle.N, available_cash, all_shares, all_cost, net_value, net_return) turtle.tradeslog.append(log_this_time) # 处理其他次买入的止损价格 # 检查BuyStates中有几个gap_up,返回个数和索引 gap_up_num = 0 gap_up_index = [] for i in range(len(turtle.BuyStates)): if turtle.BuyStates[i].is_gap_up: gap_up_num += 1 gap_up_index.append(i) if gap_up_num == 0: # 之前BuyStates中的stop_price = stop_price for j in range(len(turtle.BuyStates)): turtle.BuyStates[j].stop_price = stop_price if not turtle.is_gap_up and gap_up_num == 1: if gap_up_index[0] == 1: number_tobe_change = turtle.TrigerTime -1 - gap_up_index[0] for k in range(number_tobe_change): turtle.BuyStates[k+1].stop_price = stop_price elif gap_up_index[0] == 2: turtle.BuyStates[2].stop_price = stop_price elif not turtle.is_gap_up and gap_up_num == 2: number_tobe_change = 2 for k in range(number_tobe_change): turtle.BuyStates[k+1].stop_price = stop_price def stop_sale_stock(self, turtle: TurtleTrading, price_now): """止损卖出 Args: price_now (_type_): 现价 """ # 判断需要卖出几份 sale_shares = 0 for i in range(len(turtle.BuyStates)): if price_now <= turtle.BuyStates[i].stop_price: sale_shares += 1 break # 比较price_now与self.turtle.BuyStates[-1].stop_price # 发送邮件 代码self.turtle.TradeCode, 建议卖出价格price_now,卖出份额self.turtle.IntPositionSize subject = "止损卖出" body = f"{turtle.TradeCode},价格{price_now},份额{turtle.IntPositionSize * sale_shares} \n " body += "回复:实际卖出价格-卖出份额-手续费" send_email(subject, body, self.user_email) send_email_time = datetime.now().date() # 每隔1分钟检测回信,解析邮件。 parsed_email_flag = False while not parsed_email_flag: time.sleep(60) # 每次尝试前等待 60 秒 parse_states, sale_price, sale_share, fee = parse_return_email( self.user_email, send_email_time ) if parse_states: parsed_email_flag = True break # 成功卖出 turtle.TrigerTime -= sale_shares # 记录self.turtle.BuyStates available_cash = turtle.BuyStates[-1].available_cash + sale_price * sale_share - fee # 删除BuyStates中卖出股票的记录 turtle.BuyStates = turtle.BuyStates[:-sale_shares] sale_this_time = TradeLog(datetime.now().strftime("%Y-%m-%d"), "止损", sale_price, sale_share, sale_price * sale_share - fee, turtle.N, available_cash, all_shares=0, all_cost=0, Net_value=sale_price * sale_share, Net_return=abs(turtle.Capital - available_cash)) turtle.tradeslog.append(sale_this_time) def out_sale_stock(self, turtle: TurtleTrading, price_now): """止盈卖出 Args: price_now (_type_): 现价 """ # 发送邮件 代码self.turtle.TradeCode, 建议卖出价格price_now,卖出份额self.turtle.IntPositionSize subject = "止盈卖出" body = f"{turtle.TradeCode},价格{price_now},份额{turtle.IntPositionSize} \n " body += "回复:实际卖出价格-卖出份额-手续费" send_email(subject, body, self.user_email) send_email_time = datetime.now().date() # 每隔1分钟检测回信,解析邮件。 parsed_email_flag = False while not parsed_email_flag: time.sleep(60) # 每次尝试前等待 60 秒 parse_states, sale_price, sale_share, fee = parse_return_email( self.user_email, send_email_time ) if parse_states: parsed_email_flag = True break # 成功卖出 turtle.TrigerTime = 0 # 记录self.turtle.BuyStates available_cash = turtle.BuyStates[-1].available_cash + sale_price * sale_share - fee turtle.BuyStates = [] sale_this_time = TradeLog(datetime.now().strftime("%Y-%m-%d"), "止盈", sale_price, sale_share, sale_price * sale_share - fee, turtle.N, available_cash, all_shares=0, all_cost=0, Net_value=sale_price * sale_share, Net_return=abs(turtle.Capital - available_cash)) turtle.tradeslog.append(sale_this_time) def Start_short_system(self): """启动short系统 """ # ------------------准备阶段-------------------- # 获取数据或读取数据 -- 计算ATR Donchian 20 50 up, 20 down # 初始化所有turtle for turtle in self.turtles: # 准备数据 turtle.get_ready(100) turtle.N = float(turtle.CurrentData['ATR'].iloc[-1]) turtle.prev_heigh = float(turtle.CurrentData['最高'].iloc[-1]) turtle.Donchian_20_up = float(turtle.CurrentData['Donchian_20_upper'].iloc[-1]) turtle.Donchian_50_up = float(turtle.CurrentData['Donchian_50_upper'].iloc[-1]) turtle.Donchian_10_down = float(turtle.CurrentData['Donchian_10_lower'].iloc[-1]) turtle.CalPositionSize() # ------------------实时监测阶段-------------------- # 9:00 1、判断是否是新的一周,是则重新计算Position Size # 判断是否是新的一周 if datetime.now().weekday() == 0: for turtle in self.turtles: turtle.CalPositionSize() # 每分钟获取一次数据,判断是否触发条件 9:30-11:30 13:00-15:00 while True: # 获取当前时间 now = datetime.now().time() # 优先判断是否收盘 is_stop_time = (now.hour >= 15 and now.minute > 5) #收盘时间 # is_stop_time = (now.hour >= 18 and now.minute > 32) #收盘时间 if is_stop_time: break # 判断当前时间是否在交易时段内(9:30-11:30 或 13:00-15:00) is_trading_time = ( (now.hour == 9 and now.minute >= 30) or (now.hour == 10 and 0 <= now.minute <= 59) or (now.hour == 11 and now.minute <= 30) or (now.hour == 13 and 0 <= now.minute <= 59) or (now.hour == 14 and 0 <= now.minute <= 59) or (now.hour == 15 and now.minute <= 0) ) # is_trading_time = True if not is_trading_time: # 非交易时间,等待 1 分钟后继续循环 time.sleep(60) continue # 获取股票和ETF数据 self.monitor_all_turtles() # 等待一段时间后再次检查 time.sleep(random.randint(60, 65))# 每分钟检查一次 # ------------------结束阶段-------------------- # 数据库更新当天数据,增加ATR、donchian数据 # 直接做个新表 for turtle in self.turtles: mysql_database.delete_table(f"{turtle.TradeCode}") turtle.get_ready(100) self.day_end() time.sleep(16.5*600) def monitor_all_turtles(self): """主监控循环""" # 获取实时数据 stock_data, etf_data = self.get_stocks_data() # 遍历所有turtle进行监控 # 为每个 Turtle 启动一个线程 threads = [] for turtle in self.turtles: thread = threading.Thread( target=self.monitor_single_turtle, args=(turtle, stock_data, etf_data) ) thread.start() threads.append(thread) # 可选:等待所有线程完成(如果需要) for thread in threads: thread.join() def monitor_single_turtle(self, turtle: TurtleTrading, stock_data, etf_data): """监控单个turtle的交易条件""" now = datetime.now().time() if turtle.type == "stock": turtle.PriceNow = float(stock_data.loc[stock_data['代码'] == turtle.TradeCode, '最新价'].values[0]) elif turtle.type == "etf": # self.turtle.PriceNow = float(etf_data.loc[etf_data['基金代码'] == self.turtle.TradeCode, '当前-单位净值'].values[0]) turtle.PriceNow = float(etf_data.loc[etf_data['代码'] == turtle.TradeCode, '最新价'].values[0]) if now.hour == 9 and now.minute == 30 and turtle.PriceNow > turtle.prev_heigh: turtle.is_gap_up = True # fake_price = 1.492 # turtle.PriceNow = fake_price # 判断当前仓位状态并执行相应操作 if turtle.TrigerTime == 0: if turtle.system1EnterNormal( turtle.PriceNow, turtle.Donchian_20_up, turtle.BreakOutLog ): self.start_email_thread(turtle, "买入", turtle.PriceNow) # 突破 记录self.turtle.breakoutlog today = datetime.now().strftime("%Y-%m-%d") breakout_this_time = BreakOutLog(today, turtle.Donchian_20_up, turtle.Donchian_20_up - 2 * turtle.N, 'valid', None) turtle.BreakOutLog.append(breakout_this_time) elif turtle.system1EnterSafe( turtle.PriceNow, turtle.Donchian_50_up ): self.start_email_thread(turtle, "买入", turtle.PriceNow) elif 1 <= turtle.TrigerTime <= 3: # 加仓状态 if turtle.add(turtle.PriceNow): self.start_email_thread(turtle, "加仓", turtle.PriceNow) # 止损状态 elif turtle.system_1_stop(turtle.PriceNow): self.start_email_thread(turtle, "止损", turtle.PriceNow) # 止盈 elif turtle.system_1_Out( turtle.PriceNow, turtle.Donchian_10_down ): self.start_email_thread(turtle, "止盈", turtle.PriceNow) elif turtle.TrigerTime == 4: # 满仓 止损 止盈 if turtle.system_1_stop(turtle.PriceNow): self.start_email_thread(turtle, "止损", turtle.PriceNow) elif turtle.system_1_Out( turtle.PriceNow, turtle.Donchian_10_down ): self.start_email_thread(turtle, "止盈", turtle.PriceNow) def start_email_thread(self, turtle:TurtleTrading, action, price_now): """启动邮件处理线程""" self.handle_email_response(turtle, action, price_now) def handle_email_response(self, turtle:TurtleTrading, action, price_now): """处理邮件响应的线程""" try: logging.info("handle_email_response is called with action: {}".format(action)) # 发送邮件 if action == "买入": self.Buy_stock(turtle, price_now) elif action == "加仓": self.add_stock(turtle, price_now) elif action == "止损": self.stop_sale_stock(turtle, price_now) elif action == "止盈": self.out_sale_stock(turtle, price_now) else: logging.warning(f"Unknown action: {action} for TradeCode: {turtle.TradeCode}") except Exception as e: logging.error(f"Error in handle_email_response for TradeCode: {turtle.TradeCode}, Error: {e}") if __name__ == '__main__': user_email = "guoyize2209@163.com" nsdk = TurtleTrading('513870', "etf", 0.0025, 50000, 200000) cjdl = TurtleTrading('600900', "stock", 0.0025, 50000, 200000) # t.get_ready(100) a = TurtleTrading_OnTime([nsdk, cjdl], user_email) a.Start_short_system() # # 全是股票 # stock_zh_a_spot_df = ak.stock_zh_a_spot_em() # # stock_zh_a_spot_df.to_csv("stock_zh_a_spot.txt", sep="\t", index=False, encoding="utf-8") # stock_zh_a_spot_df = stock_zh_a_spot_df.dropna(subset=['最新价']) # print(stock_zh_a_spot_df) # # 全是基金 # etf_data = ak.fund_etf_spot_em() # etf_data = etf_data.dropna(subset=['最新价']) # etf_data.to_csv("fund_etf_spot.txt", sep="\t", index=False, encoding="utf-8") # print(etf_data)