import numpy as np import math import akshare as ak import os from datetime import datetime, timedelta import pandas as pd import mplfinance as mpf def CalTrueFluc(data, day): H_L = data.iloc[day]['最高'] - data.iloc[day]['最低'] H_PDC = data.iloc[day]['最高'] - data.iloc[day-1]['收盘'] PDC_L = data.iloc[day-1]['收盘'] - data.iloc[day]['最低'] TrueFluc = np.max([H_L, H_PDC, PDC_L]) print('high', data.iloc[day]['最高'], 'low', data.iloc[day]['最低'], 'TrueRange', TrueFluc) return TrueFluc def calc_sma_atr_pd(kdf,period): """计算TR与ATR Args: kdf (_type_): 历史数据 period (_type_): ATR周期 Returns: _type_: 返回kdf,增加TR与ATR列 """ kdf['HL'] = kdf['最高'] - kdf['最低'] kdf['HC'] = np.abs(kdf['最高'] - kdf['收盘'].shift(1)) kdf['LC'] = np.abs(kdf['最低'] - kdf['收盘'].shift(1)) kdf['TR'] = np.round(kdf[['HL','HC','LC']].max(axis=1), 3) # ranges = pd.concat([high_low, high_close, low_close], axis=1) # true_range = np.max(ranges, axis=1) kdf['ATR'] = np.round(kdf['TR'].rolling(period).mean(), 3) return kdf.drop(['HL','HC','LC'], axis = 1) # A股数据 东方财富网 # all_data = ak.stock_zh_a_spot_em() # 基金实时数据 东方财富网 # fund_etf_spot_em_df = ak.fund_etf_spot_em() # 后复权历史数据 # fund_etf_hist_em_df = ak.fund_etf_hist_em(symbol="513300", period="daily", start_date="20130101", end_date="20240408", adjust="hfq") # fund_etf_hist_em_df.to_csv('513300data.csv', index=False) # data = pd.read_csv('513300data.csv') # # 一、计算头寸规模 # # 真实波动幅度 = max (H-L, H-pdc, pdc-L) # today = datetime.today() # # print(today) # # print(data.iloc[-1]['成交额']) # TrueFlucs = [] # Nserious = np.zeros(101) # last120days = np.arange(-120, -100) # for i in last120days: # H_L = data.iloc[i]['最高'] - data.iloc[i]['最低'] # H_PDC = data.iloc[i]['最高'] - data.iloc[i-1]['收盘'] # PDC_L = data.iloc[i-1]['收盘'] - data.iloc[i]['最低'] # TrueFlucs.append(np.max([H_L, H_PDC, PDC_L])) # # 求简单平均,放入N序列第一个 # Nsimple = np.average(TrueFlucs) # Nserious[0] = Nsimple # # 计算-21到-1的N # last100days = np.arange(-100, 0) # for i in range(0,100): # day = last100days[i] # H_L = data.iloc[day]['最高'] - data.iloc[day]['最低'] # H_PDC = data.iloc[day]['最高'] - data.iloc[day-1]['收盘'] # PDC_L = data.iloc[day-1]['收盘'] - data.iloc[day]['最低'] # TrueFluc = np.max([H_L, H_PDC, PDC_L]) # Ntemp = (19 * Nserious[i] + TrueFluc)/20 # Nserious[i+1] = Ntemp # # print(Nserious) # total_rows = len(data) # Ndata = np.zeros(total_rows) # Ndata[total_rows-101:] = Nserious # # NewColumn = [0]*(total_rows-101) + Nserious # data['N'] = Ndata # data.to_csv('513300data-N.csv', index=False) # pass # -----------------------更新atr---------------------- """已有数据与新数据对比,补充新的N,同时更新数据库 """ # Today = datetime.today() # # print(Today) # formatted_date = Today.strftime("%Y%m%d") # # print(formatted_date) # CurrentData = ak.fund_etf_hist_em(symbol="513300", period="daily", start_date="20130101", end_date=formatted_date, adjust="hfq") # CurrentData = calc_sma_atr_pd(CurrentData, 20) # CurrentData.to_csv('513300data-N.csv', index=False) # pass # ------------------计算头寸规模 资金10w, 1%波动------------ # money = 100000 # OldData = pd.read_csv('513300data-N.csv') # N = OldData.iloc[-1]['ATR'] # # N = 0.473 # Price = OldData.iloc[-1]['收盘'] # # Price = 5.60 # EveryUnit = 0.0025 * money /(N*100*Price) # print('单位',EveryUnit) # print(113*100*Price) class TurtleTrading(object): """对象范围较小,对某一个标的创建一个海龟,如513300, 计算ATR、 Position Size, 买入、卖出、加仓等行为 Args: object (_type_): _description_ """ def __init__(self, TradeCode) -> None: self.TradeCode = TradeCode def GetRecentData(self): Today = datetime.today() # print(Today) formatted_date = Today.strftime("%Y%m%d") # print(formatted_date) Code = f"{self.TradeCode}" CurrentData = ak.fund_etf_hist_em(symbol=Code, period="daily", start_date="20130101", end_date=formatted_date, adjust="") return CurrentData def CalATR(self, data, ATRday, SaveOrNot): """计算某个标的的ATR,从上市日到今天, 计算后的数据保存在self.CurrentData Args: ATRday: 多少日ATR SaveOrNot (_type_): 是否保存.csv数据 """ self.CurrentData = calc_sma_atr_pd(data, ATRday) self.N = self.CurrentData['ATR'] if SaveOrNot: self.CurrentData.to_csv('513300data-N.csv', index=False) print("csv保存成功") return self.N def CalPositionSize(self, RiskCoef, Capital): """计算PosizionSize 持有的单位,该单位某标的,1N波动对应RiskCoef * Capital资金 Args: RiskCoef (_type_): 风险系数 Capital (_type_): 资金 """ N = self.CurrentData.iloc[-1]['ATR'] # N = 0.473 Price = self.CurrentData.iloc[-1]['收盘'] # Price = 5.60 self.PositionSize = RiskCoef * Capital /( N*100*Price) # 默认用股票形式了 100 return self.PositionSize def ReadExistData(self, data): """除了通过发请求获取数据,也可以读本地的数据库,赋值给self.CurrentData Args: data (_type_): 本地csv名称 """ self.CurrentData = pd.read_csv(data) def DrawKLine(self, days): """画出k线图看看,画出最近days天的K线图 """ # 日期部分 dates = pd.to_datetime(self.CurrentData['日期'][-days:]) # Klinedf['Data'] = pd.to_datetime(self.CurrentData['日期']) Klinedf = pd.DataFrame() # Klinedf.set_index = Klinedf['Data'] # 其他数据 Klinedf['Open'] = self.CurrentData['开盘'][-days:] Klinedf['High'] = self.CurrentData['最高'][-days:] Klinedf['Low'] = self.CurrentData['最低'][-days:] Klinedf['Close'] = self.CurrentData['收盘'][-days:] Klinedf['Volume'] = self.CurrentData['成交量'][-days:] Klinedf.set_index(dates, inplace=True) # 画图 mpf.plot(Klinedf, type='candle', style='yahoo', volume=False, mav=(5,), addplot=[mpf.make_addplot(self.Donchian[['Upper', 'Lower']])]) def calculate_donchian_channel(self, days, n): """ 计算唐奇安通道days一共多少日, n多少日唐奇安 参数: self.CurrentData (DataFrame): 包含价格数据的Pandas DataFrame,至少包含"High"和"Low"列 n (int): 时间周期 返回:self.Donchian DataFrame: 唐奇安通道的DataFrame,包含"Upper", "Lower", 和 "Middle"列 """ Donchian = pd.DataFrame() # 计算最高价和最低价的N日移动平均线 Donchian['Upper'] = self.CurrentData['最高'][-days:].rolling(n).max() Donchian['Lower'] = self.CurrentData['最低'][-days:].rolling(n).min() # # 计算中间线 # Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2 return Donchian class Trade(object): """具有以下功能: 接收Turtle Class作为输入 1 数据准备: a 获取数据 b 计算atr c 计算55日、20日、10日Donchian 2 总持有单位判断 3 定义系统2 超过55日xxx,分段加仓 4 回测功能 5 实时功能 问题实时功能如何同时监控几个item 78 Args: object (_type_): _description_ """ def __init__(self, turtles, riskcoe, cash, StartTime, EndTime) -> None: """接收所有的turtles Args: turtles (_type_): _description_ """ self.turtles = turtles self.riskcoe = riskcoe self.cash = cash self.Capital = cash self.StartTime = StartTime self.EndTime = EndTime self.TrigerTime = 0 # self.BuyStates = { # "trigger_count": 0, # 触发次数 # "BuyPrice": None, # 买入/持有价格 # "StopPrice":None, # 止损价格 # "quantity": 0, # 持有份数 # "N": 0, # ATR # "available_cash": self.cash # 可用资金 # } # 0"trigger_count", 1"BuyPrice", 2"StopPrice", 3"quantity", 4"N", 5"available_cash" self.BuyStates = [[0, None, None, 0, 0, self.cash]] self.tradeslog = [] # 交易记录 self.current_week = None # 当前周数 # def TurtleDataPre(self): # for turtle in self.turtles: # turtle.CalATR(20, True) # turtle.Donchian20 = turtle.calculate_donchian_channel(500, 20) # turtle.Donchian10 = turtle.calculate_donchian_channel(500, 10) # turtle.Donchian55 = turtle.calculate_donchian_channel(500, 55) # pass def PortfolioPositon(self): """总共能持有多少个单位 """ pass def TestBuyStocks(self, PriceNow, date): # 回测中的买入函数 如果开盘价大于55日,最高价大于四份价格,执行加满 # 返回self.BuyStates # 实盘中应该是触发买入信号,发送买入邮件,价格,份数。当前Turtle程序暂停,收到邮件返回,更新买入价格,份数 # 更新BuyStates:"触发次数"、"买入/持有价格"、"持有份数"、"N"、"可用资金" N = self.ThisWeekN Shares = self.ThisWeekPosizionSize # 更新 BuyStates if self.TrigerTime == 0: # 第一次买入是直接修改 # if self.BuyStates[0] == 0: self.TrigerTime += 1 BuyPrice = PriceNow AddPrice = PriceNow + 1/2 * N StopPrice = PriceNow - 2 * N available_cash = self.cash - Shares * BuyPrice # 0"trigger_count", 1"BuyPrice", 2"AddPrice", 3"StopPrice", 4"quantity", 5"N", 6"available_cash" self.BuyStates = [[self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash]] # 更新log # AllShares = sum(row[4] for row in self.BuyStates) NetValue = available_cash + AllShares * BuyPrice self.tradeslog.append([date, 'Buy', Shares, PriceNow, N, available_cash, NetValue]) else: # 加仓的操作,在BuyStates后边追加 self.TrigerTime += 1 BuyPrice = PriceNow AddPrice = PriceNow + 1/2 * N StopPrice = PriceNow - 2 * N available_cash = self.BuyStates[self.TrigerTime-2][6] - Shares * BuyPrice self.BuyStates.append([self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash]) # 更新log AllShares = sum(row[4] for row in self.BuyStates) NetValue = available_cash + AllShares * BuyPrice self.tradeslog.append([date, 'Buy', Shares, PriceNow, N, available_cash, NetValue]) pass def TestStopSaleStocks(self, PriceNow, date): # 回测中的卖出函数,仓位全卖 N = self.ThisWeekN # Shares应该等于所有持仓的和 Shares = sum(row[4] for row in self.BuyStates) # Shares = sum(self.BuyStates[:, 4]) available_cash = self.BuyStates[-1][6] + Shares * PriceNow # 更新log NetValue = available_cash self.tradeslog.append([date, 'StopSale', Shares, PriceNow, N, available_cash, NetValue]) # self.trades.append((date, 'Sale', Shares, PriceNow, N, available_cash)) # TrigerTime归0 self.TrigerTime = 0 # self.cash更新 self.cash = available_cash # 回到初始状态 self.BuyStates = [[0, None, None, None, 0, 0, self.cash]] def TestOutSaleStocks(self, PriceNow, date): # 回测中的卖出函数,仓位全卖 N = self.ThisWeekN # Shares应该等于所有持仓的和 Shares = sum(row[4] for row in self.BuyStates) # Shares = sum(self.BuyStates[:, 4]) available_cash = self.BuyStates[-1][6] + Shares * PriceNow # 更新log NetValue = available_cash self.tradeslog.append([date, 'OutSale', Shares, PriceNow, N, available_cash, NetValue]) # self.trades.append((date, 'Sale', Shares, PriceNow, N, available_cash)) # TrigerTime归0 self.TrigerTime = 0 # self.cash更新 self.cash = available_cash # 回到初始状态 self.BuyStates = [[0, None, None, None, 0, 0, self.cash]] def system2Enter(self, PriceNow, TempDonchian55Upper): """以50日突破为基础的较简单的长线系统 入市:价格超过了前55日的最高价或最低价就建立头寸。 - 如果价格超过55日最高价,买入一个单位建立多头头寸。 - 如果价格跌破55日最低价,卖出一个单位建立空头头寸。 退出: 增加单位:突破时只建立一个单位,建立后以1/2N的间隔增加头寸,以前面指令的实际成交价为基础。实际成交价+1/2N 单个品种,最大4个单位。 以多头编写 """ # # 触发次数 # Trigertime = 0 # 只有没买入过,且 现价超过55日最高价,是一次突破,直接以突破价格买入--没有持仓且价格向上突破 if self.TrigerTime == 0 and PriceNow > TempDonchian55Upper[-1]: # 买入 return True else: return False def system1EnterNormal(self, PriceNow, TempDonchian20Upper, BreakOutLog): # 没有持仓且价格向上突破---此时包含两种情形:1 对某标的首次使用系统,2 已发生过突破,此时上次突破天然是失败的 if self.TrigerTime == 0 and PriceNow > TempDonchian20Upper[-1]: # 买入 return True elif self.TrigerTime != 0 and PriceNow > TempDonchian20Upper[-1]: self.system1BreakoutValid(PriceNow) if BreakOutLog[-1][5] == 'Lose': # TT!= 0且突破且上一次突破unseccessful return True else: return False else: return False def system1EnterSafe(self, PriceNow, TempDonchian55Upper): if PriceNow > TempDonchian55Upper[-1]: # 保底的55日突破 return True else: return False def system1BreakoutValid(self, priceNow): """判断前一次突破是否成功,是log[-1][5]写入“win”,否则写入“Lose” """ if priceNow < self.BreakOutLog[-1][3]: self.BreakOutLog[-1][5] = 'Lose' else: self.BreakOutLog[-1][5] = 'None' pass def system2Out(self, PriceNow, TempDonchian20Lower): # 退出:低于20日最低价(多头方向),空头以突破20日最高价为止损价格--有持仓且价格向下突破 if self.TrigerTime != 0 and PriceNow < TempDonchian20Lower[-1]: # 退出 return True else: return False def system1Out(self, PriceNow, TempDonchian10Lower): # 退出:低于20日最低价(多头方向),空头以突破20日最高价为止损价格--有持仓且价格向下突破 if self.TrigerTime != 0 and PriceNow < TempDonchian10Lower[-1]: # 退出 return True else: return False def system2Add(self, PriceNow): """加仓判断:如果当前价格>上一次买入后的加仓价格则加仓 """ if self.TrigerTime < 4 and PriceNow > self.BuyStates[self.TrigerTime - 1][2]: # 买入 return True else: return False def system2Stop(self, PriceNow): """止损判断:如果当前价格<上一次买入后的止损价格则止损 """ if PriceNow < self.BuyStates[self.TrigerTime - 1][3]: # 买入 return True else: return False def system2CalDonchian(self): # 按照system2的要求计算唐奇安通道上下边界--55日上界,20日下界 Donchian = pd.DataFrame() # 计算最高价和最低价的N日移动平均线 Donchian['Upper'] = self.RecentAlldata['最高'].rolling(55).max() Donchian['Lower'] = self.RecentAlldata['最低'].rolling(20).min() # # 计算中间线 # Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2 return Donchian def system1CalDonchian(self): # 按照system2的要求计算唐奇安通道上下边界--55日上界,20日下界 Donchian = pd.DataFrame() # 计算最高价和最低价的N日移动平均线 Donchian['Upper55'] = self.RecentAlldata['最高'].rolling(55).max() Donchian['Upper20'] = self.RecentAlldata['最高'].rolling(20).max() Donchian['Lower'] = self.RecentAlldata['最低'].rolling(10).min() # # 计算中间线 # Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2 return Donchian def CalPositionSize(self, N, price): PositionSize = self.riskcoe * self.Capital /(N) # 默认用股票形式了 100 IntPositionSize = int(PositionSize // 100) * 100 return IntPositionSize def TestSys2Function(self): """回测函数,对于某个标的, 准备:获取价格数据,计算ATR,N,头寸单位,唐奇安通道上下边界 运行过程:按时间推进,开始时间-结束时间,按照系统1或2或组合形式,执行买入、加仓、退出、止损 操作记录:形成一个log """ # ------------------准备阶段---------------- # 1、获取标的价格 self.RecentAlldata = self.turtles.GetRecentData() self.RecentAlldata['日期'] = pd.to_datetime(self.RecentAlldata['日期']) self.RecentAlldata.set_index('日期', inplace=True) # 2、计算ATR self.N = self.turtles.CalATR(self.RecentAlldata, 20, None) # self.PositionSize = self.turtles.CalPositionSize(self.riskcoe, self.cash) # 3、计算唐奇安通道上下边界--system2 self.Donchian = self.system2CalDonchian() # 开始、结束日期 RowStart = self.RecentAlldata.index.get_loc(self.StartTime) RowEnd = self.RecentAlldata.index.get_loc(self.EndTime) # 准备迭代使用的数据 TempData = pd.DataFrame() # 存储最高、最低价等信息 TempN = [] TempDonchian55Upper = [] TempDonchian20Lower = [] # ------------------运行阶段---------------- for date, row in self.RecentAlldata[RowStart:RowEnd].iterrows(): # 增加一个迭代更新的数据 TempData = pd.concat([TempData, row.to_frame().T], ignore_index=True) # day <=55,不执行任何操作,跳过,等待 # day < 55,<20刚开始运行,N、Donchian数据从总数据拿, 不用框时间,直接从总数据拿 # if TempData.shape[0] < 20: # 每天更新上下边界 TempDonchian20Lower.append(self.Donchian['Lower'].iloc[RowStart + TempData.shape[0]-2]) # if TempData.shape[0] < 55: TempDonchian55Upper.append(self.Donchian['Upper'].iloc[RowStart + TempData.shape[0]-2]) # 检查是否是新的一周 if self.current_week is None or date.week != self.current_week: self.current_week = date.week # 更新atr、头寸规模 self.ThisWeekN = self.N.iloc[RowStart + TempData.shape[0]-2] TempN.append(self.ThisWeekN) self.ThisWeekPosizionSize = self.CalPositionSize(self.ThisWeekN, TempData.iloc[-1]['收盘']) # 如果空仓,判断当天开盘价是否突破,收盘价是否突破,突破则买入 if self.TrigerTime == 0: if self.system2Enter(row["开盘"], TempDonchian55Upper): self.TestBuyStocks(row["开盘"], date) # 开盘突破后,最高价能否加仓? # 价格差取整是能加仓几手,循环加仓 delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN)) if 1 <= delta <= 3: for i in range(delta): self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN +0.001), date) elif 3 < delta: for i in range(3): self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN +0.001), date) # 最高价突破,买入 elif self.system2Enter(row["最高"], TempDonchian55Upper): self.TestBuyStocks(TempDonchian55Upper[-1]+0.001, date) delta = math.floor((row['最高']-TempDonchian55Upper[-1]) / (1/2 * self.ThisWeekN)) if 1 <= delta <= 3: for i in range(delta): self.TestBuyStocks((TempDonchian55Upper[-1] + (i+1) * 1/2 * self.ThisWeekN +0.001), date) elif 3 < delta: for i in range(3): self.TestBuyStocks((TempDonchian55Upper[-1] + (i+1) * 1/2 * self.ThisWeekN +0.001), date) # 0"trigger_count", 1"BuyPrice", 2"AddPrice", 3"StopPrice", 4"quantity", 5"N", 6"available_cash" # self.BuyStates = [self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash] elif 1 <= self.TrigerTime < 4: # 加仓1-3次,考虑止损和加仓 # 开盘价加仓 if self.system2Add(row["开盘"]): self.TestBuyStocks(row["开盘"], date) if self.TrigerTime == 4: # 满仓了,不能再加 pass # 开盘加仓后,最高价能否继续加仓? else: delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN)) if 1 <= delta <= 4-self.TrigerTime: for i in range(delta): self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN +0.001), date) # 最高价加仓 elif self.system2Add(row["最高"]): self.TestBuyStocks(self.BuyStates[self.TrigerTime - 1][2] + 0.001, date) if self.TrigerTime == 4: # 满仓了,不能再加 pass # 开盘加仓后,最高价能否继续加仓? else: delta = math.floor((row['最高']-self.BuyStates[self.TrigerTime - 1][2]) / (1/2 * self.ThisWeekN)) if 1 <= delta <= 4-self.TrigerTime: for i in range(delta): self.TestBuyStocks((self.BuyStates[self.TrigerTime - 1][2] + (i+1) * 1/2 * self.ThisWeekN +0.001), date) # 止损 elif self.system2Stop(row["收盘"]): self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date) elif self.system2Stop(row["最低"]): self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date) # 止盈 elif self.system2Out(row["收盘"], TempDonchian20Lower): self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date) elif self.system2Out(row["最低"], TempDonchian20Lower): self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date) elif self.TrigerTime == 4: # 满仓 考虑止损和退出 # 止损 if self.system2Stop(row["收盘"]): self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date) elif self.system2Stop(row["最低"]): self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date) # 止盈 elif self.system2Out(row["收盘"], TempDonchian20Lower): self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date) elif self.system2Out(row["最低"], TempDonchian20Lower): self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date) # 交易日结束,重新计算高低突破点: print("---------------回测结束,回测日志如下----------------") for sublist in self.tradeslog: print(sublist) def TestSys1Function(self): """回测函数,对于某个标的, 准备:获取价格数据,计算ATR,N,头寸单位,唐奇安通道上下边界 运行过程:按时间推进,开始时间-结束时间,按照系统1或2或组合形式,执行买入、加仓、退出、止损 操作记录:形成一个log """ # ------------------准备阶段---------------- # 1、获取标的价格 self.RecentAlldata = self.turtles.GetRecentData() self.RecentAlldata['日期'] = pd.to_datetime(self.RecentAlldata['日期']) self.RecentAlldata.set_index('日期', inplace=True) # 2、计算ATR self.N = self.turtles.CalATR(self.RecentAlldata, 20, None) # self.PositionSize = self.turtles.CalPositionSize(self.riskcoe, self.cash) # 3、计算唐奇安通道上下边界--system2 self.Donchian = self.system1CalDonchian() # 开始、结束日期 RowStart = self.RecentAlldata.index.get_loc(self.StartTime) RowEnd = self.RecentAlldata.index.get_loc(self.EndTime) # 准备迭代使用的数据 TempData = pd.DataFrame() # 存储最高、最低价等信息 TempN = [] TempDonchian55Upper = [] TempDonchian20Upper = [] TempDonchian10Lower = [] self.BreakOutLog = [['date', 'breakout', 'BOprice', 'LosePrice', 'ValidOrNot', 'WinOrLose']] # ------------------运行阶段---------------- for date, row in self.RecentAlldata[RowStart:RowEnd].iterrows(): # 增加一个迭代更新的数据 TempData = pd.concat([TempData, row.to_frame().T], ignore_index=True) # day <=55,不执行任何操作,跳过,等待 # day < 55,<20刚开始运行,N、Donchian数据从总数据拿, 不用框时间,直接从总数据拿 # if TempData.shape[0] < 20: # 每天更新上下边界 TempDonchian10Lower.append(self.Donchian['Lower'].iloc[RowStart + TempData.shape[0]-2]) # if TempData.shape[0] < 55: TempDonchian55Upper.append(self.Donchian['Upper55'].iloc[RowStart + TempData.shape[0]-2]) TempDonchian20Upper.append(self.Donchian['Upper20'].iloc[RowStart + TempData.shape[0]-2]) # 检查是否是新的一周 if self.current_week is None or date.week != self.current_week: self.current_week = date.week # 更新atr、头寸规模 self.ThisWeekN = self.N.iloc[RowStart + TempData.shape[0]-2] TempN.append(self.ThisWeekN) self.ThisWeekPosizionSize = self.CalPositionSize(self.ThisWeekN, TempData.iloc[-1]['收盘']) # 如果空仓,判断当天开盘价是否突破,收盘价是否突破,突破则买入 if self.TrigerTime == 0: if self.system1EnterNormal(row["开盘"], TempDonchian20Upper, self.BreakOutLog): self.TestBuyStocks(row["开盘"], date) # 写入BreakOut状态 self.BreakOutLog.append([date, 'breakout', row["开盘"], row["开盘"] - 2*self.ThisWeekN, 'Valid','None']) # 开盘突破后,最高价能否加仓? # 价格差取整是能加仓几手,循环加仓 delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN)) if 1 <= delta <= 3: for i in range(delta): self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date) elif 3 < delta: for i in range(3): self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date) elif self.system1EnterSafe(row["开盘"], TempDonchian55Upper): self.TestBuyStocks(row["开盘"], date) # 价格差取整是能加仓几手,循环加仓 delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN)) if 1 <= delta <= 3: for i in range(delta): self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date) elif 3 < delta: for i in range(3): self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date) # 最高价突破买入 elif self.system1EnterNormal(row["最高"], TempDonchian20Upper, self.BreakOutLog): self.TestBuyStocks(TempDonchian20Upper[-1] + 0.001, date) # 写入BreakOut状态 self.BreakOutLog.append([date, 'breakout', TempDonchian20Upper[-1] + 0.001, TempDonchian20Upper[-1] + 0.001 - 2*self.ThisWeekN, 'Valid','None']) # 价格差取整是能加仓几手,循环加仓 delta = math.floor((row['最高']-TempDonchian20Upper[-1]) / (1/2 * self.ThisWeekN)) if 1 <= delta <= 3: for i in range(delta): self.TestBuyStocks((TempDonchian20Upper[-1] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date) elif 3 < delta: for i in range(3): self.TestBuyStocks((TempDonchian20Upper[-1] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date) elif self.system1EnterSafe(row["最高"], TempDonchian55Upper): self.TestBuyStocks(TempDonchian55Upper[-1] + 0.001, date) # 0"trigger_count", 1"BuyPrice", 2"AddPrice", 3"StopPrice", 4"quantity", 5"N", 6"available_cash" # self.BuyStates = [self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash] elif 1 <= self.TrigerTime < 4: # 加仓1-3次,考虑止损和加仓 # 开盘价突破 if self.system1EnterNormal(row["开盘"], TempDonchian20Upper, self.BreakOutLog): self.TestBuyStocks(row["开盘"], date) self.BreakOutLog.append([date, 'breakout', TempDonchian20Upper[-1] + 0.001, TempDonchian20Upper[-1] + 0.001 - 2*self.ThisWeekN, 'Valid','None']) elif self.system1EnterSafe(row["开盘"], TempDonchian55Upper): self.TestBuyStocks(row["开盘"], date) # 开盘价加仓 if self.system2Add(row["开盘"]): self.TestBuyStocks(row["开盘"], date) if self.TrigerTime == 4: # 满仓了,不能再加 pass # 开盘加仓后,最高价能否继续加仓? else: delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN)) if 1 <= delta <= 4-self.TrigerTime: for i in range(delta): self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date) # 最高价加仓 elif self.system2Add(row["最高"]): self.TestBuyStocks(self.BuyStates[self.TrigerTime - 1][2] + 0.001, date) if self.TrigerTime == 4: # 满仓了,不能再加 pass # 开盘加仓后,最高价能否继续加仓? else: delta = math.floor((row['最高']-self.BuyStates[self.TrigerTime - 1][2]) / (1/2 * self.ThisWeekN)) if 1 <= delta <= 4-self.TrigerTime: for i in range(delta): self.TestBuyStocks((self.BuyStates[self.TrigerTime - 1][2] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date) # 止损 elif self.system2Stop(row["收盘"]): self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date) elif self.system2Stop(row["最低"]): self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date) # 止盈 elif self.system2Out(row["收盘"], TempDonchian10Lower): self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date) elif self.system2Out(row["最低"], TempDonchian10Lower): self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date) elif self.TrigerTime == 4: # 满仓 考虑止损和退出 # 止损 if self.system2Stop(row["收盘"]): self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date) elif self.system2Stop(row["最低"]): self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date) # 止盈 elif self.system2Out(row["收盘"], TempDonchian10Lower): self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date) elif self.system2Out(row["最低"], TempDonchian10Lower): self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date) # 交易日结束,重新计算高低突破点: print("---------------回测结束,回测日志如下----------------") for sublist in self.tradeslog: print(sublist) for sublist in self.BreakOutLog: print(sublist) # nsdk513300 = TurtleTrading(513300) # # 每周更新 # nsdk513300.GetRecentData() # nsdk513300.CalATR(20, True) # nsdk513300.ReadExistData('513300data-N.csv') # nsdk513300.CalPositionSize(0.0025, 100000) # # 每天更新 # nsdk513300.Donchian20 = nsdk513300.calculate_donchian_channel(500, 20) # nsdk513300.Donchian10 = nsdk513300.calculate_donchian_channel(500, 10) # nsdk513300.Donchian55 = nsdk513300.calculate_donchian_channel(500, 55) # nsdk513300.DrawKLine(500) # print(nsdk513300.PositionSize) if __name__ == "__main__": nsdk513300 = TurtleTrading(513300) # nsdk513300test = Trade(nsdk513300, 0.0025, 100000, '2023-1-3', '2024-05-9') nsdk513300test = Trade(nsdk513300, 0.0025, 100000, '2014-1-15', '2025-02-18') nsdk513300test.TestSys2Function() # nsdk513300test.TestSys1Function()