TurtleTrade/回测/TurtleClassNew.py

861 lines
37 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import math
import akshare as ak
import os
from datetime import datetime, timedelta
import pandas as pd
import mplfinance as mpf
def CalTrueFluc(data, day):
H_L = data.iloc[day]['最高'] - data.iloc[day]['最低']
H_PDC = data.iloc[day]['最高'] - data.iloc[day-1]['收盘']
PDC_L = data.iloc[day-1]['收盘'] - data.iloc[day]['最低']
TrueFluc = np.max([H_L, H_PDC, PDC_L])
print('high', data.iloc[day]['最高'], 'low', data.iloc[day]['最低'], 'TrueRange', TrueFluc)
return TrueFluc
def calc_sma_atr_pd(kdf,period):
"""计算TR与ATR
Args:
kdf (_type_): 历史数据
period (_type_): ATR周期
Returns:
_type_: 返回kdf增加TR与ATR列
"""
kdf['HL'] = kdf['最高'] - kdf['最低']
kdf['HC'] = np.abs(kdf['最高'] - kdf['收盘'].shift(1))
kdf['LC'] = np.abs(kdf['最低'] - kdf['收盘'].shift(1))
kdf['TR'] = np.round(kdf[['HL','HC','LC']].max(axis=1), 3)
# ranges = pd.concat([high_low, high_close, low_close], axis=1)
# true_range = np.max(ranges, axis=1)
kdf['ATR'] = np.round(kdf['TR'].rolling(period).mean(), 3)
return kdf.drop(['HL','HC','LC'], axis = 1)
# A股数据 东方财富网
# all_data = ak.stock_zh_a_spot_em()
# 基金实时数据 东方财富网
# fund_etf_spot_em_df = ak.fund_etf_spot_em()
# 后复权历史数据
# fund_etf_hist_em_df = ak.fund_etf_hist_em(symbol="513300", period="daily", start_date="20130101", end_date="20240408", adjust="hfq")
# fund_etf_hist_em_df.to_csv('513300data.csv', index=False)
# data = pd.read_csv('513300data.csv')
# # 一、计算头寸规模
# # 真实波动幅度 = max (H-L, H-pdc, pdc-L)
# today = datetime.today()
# # print(today)
# # print(data.iloc[-1]['成交额'])
# TrueFlucs = []
# Nserious = np.zeros(101)
# last120days = np.arange(-120, -100)
# for i in last120days:
# H_L = data.iloc[i]['最高'] - data.iloc[i]['最低']
# H_PDC = data.iloc[i]['最高'] - data.iloc[i-1]['收盘']
# PDC_L = data.iloc[i-1]['收盘'] - data.iloc[i]['最低']
# TrueFlucs.append(np.max([H_L, H_PDC, PDC_L]))
# # 求简单平均,放入N序列第一个
# Nsimple = np.average(TrueFlucs)
# Nserious[0] = Nsimple
# # 计算-21到-1的N
# last100days = np.arange(-100, 0)
# for i in range(0,100):
# day = last100days[i]
# H_L = data.iloc[day]['最高'] - data.iloc[day]['最低']
# H_PDC = data.iloc[day]['最高'] - data.iloc[day-1]['收盘']
# PDC_L = data.iloc[day-1]['收盘'] - data.iloc[day]['最低']
# TrueFluc = np.max([H_L, H_PDC, PDC_L])
# Ntemp = (19 * Nserious[i] + TrueFluc)/20
# Nserious[i+1] = Ntemp
# # print(Nserious)
# total_rows = len(data)
# Ndata = np.zeros(total_rows)
# Ndata[total_rows-101:] = Nserious
# # NewColumn = [0]*(total_rows-101) + Nserious
# data['N'] = Ndata
# data.to_csv('513300data-N.csv', index=False)
# pass
# -----------------------更新atr----------------------
"""已有数据与新数据对比补充新的N,同时更新数据库
"""
# Today = datetime.today()
# # print(Today)
# formatted_date = Today.strftime("%Y%m%d")
# # print(formatted_date)
# CurrentData = ak.fund_etf_hist_em(symbol="513300", period="daily", start_date="20130101", end_date=formatted_date, adjust="hfq")
# CurrentData = calc_sma_atr_pd(CurrentData, 20)
# CurrentData.to_csv('513300data-N.csv', index=False)
# pass
# ------------------计算头寸规模 资金10w, 1%波动------------
# money = 100000
# OldData = pd.read_csv('513300data-N.csv')
# N = OldData.iloc[-1]['ATR']
# # N = 0.473
# Price = OldData.iloc[-1]['收盘']
# # Price = 5.60
# EveryUnit = 0.0025 * money /(N*100*Price)
# print('单位',EveryUnit)
# print(113*100*Price)
class TurtleTrading(object):
"""对象范围较小对某一个标的创建一个海龟如513300
计算ATR、
Position Size
买入、卖出、加仓等行为
Args:
object (_type_): _description_
"""
def __init__(self, TradeCode) -> None:
self.TradeCode = TradeCode
def GetRecentData(self):
Today = datetime.today()
# print(Today)
formatted_date = Today.strftime("%Y%m%d")
# print(formatted_date)
Code = f"{self.TradeCode}"
CurrentData = ak.fund_etf_hist_em(symbol=Code, period="daily", start_date="20130101", end_date=formatted_date, adjust="")
return CurrentData
def CalATR(self, data, ATRday, SaveOrNot):
"""计算某个标的的ATR从上市日到今天, 计算后的数据保存在self.CurrentData
Args:
ATRday: 多少日ATR
SaveOrNot (_type_): 是否保存.csv数据
"""
self.CurrentData = calc_sma_atr_pd(data, ATRday)
self.N = self.CurrentData['ATR']
if SaveOrNot:
self.CurrentData.to_csv('513300data-N.csv', index=False)
print("csv保存成功")
return self.N
def CalPositionSize(self, RiskCoef, Capital):
"""计算PosizionSize 持有的单位该单位某标的1N波动对应RiskCoef * Capital资金
Args:
RiskCoef (_type_): 风险系数
Capital (_type_): 资金
"""
N = self.CurrentData.iloc[-1]['ATR']
# N = 0.473
Price = self.CurrentData.iloc[-1]['收盘']
# Price = 5.60
self.PositionSize = RiskCoef * Capital /( N*100*Price) # 默认用股票形式了 100
return self.PositionSize
def ReadExistData(self, data):
"""除了通过发请求获取数据也可以读本地的数据库赋值给self.CurrentData
Args:
data (_type_): 本地csv名称
"""
self.CurrentData = pd.read_csv(data)
def DrawKLine(self, days):
"""画出k线图看看,画出最近days天的K线图
"""
# 日期部分
dates = pd.to_datetime(self.CurrentData['日期'][-days:])
# Klinedf['Data'] = pd.to_datetime(self.CurrentData['日期'])
Klinedf = pd.DataFrame()
# Klinedf.set_index = Klinedf['Data']
# 其他数据
Klinedf['Open'] = self.CurrentData['开盘'][-days:]
Klinedf['High'] = self.CurrentData['最高'][-days:]
Klinedf['Low'] = self.CurrentData['最低'][-days:]
Klinedf['Close'] = self.CurrentData['收盘'][-days:]
Klinedf['Volume'] = self.CurrentData['成交量'][-days:]
Klinedf.set_index(dates, inplace=True)
# 画图
mpf.plot(Klinedf, type='candle', style='yahoo', volume=False, mav=(5,), addplot=[mpf.make_addplot(self.Donchian[['Upper', 'Lower']])])
def calculate_donchian_channel(self, days, n):
"""
计算唐奇安通道days一共多少日 n多少日唐奇安
参数:
self.CurrentData (DataFrame): 包含价格数据的Pandas DataFrame至少包含"High""Low"
n (int): 时间周期
返回:self.Donchian
DataFrame: 唐奇安通道的DataFrame包含"Upper", "Lower", 和 "Middle"
"""
Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线
Donchian['Upper'] = self.CurrentData['最高'][-days:].rolling(n).max()
Donchian['Lower'] = self.CurrentData['最低'][-days:].rolling(n).min()
# # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
return Donchian
class Trade(object):
"""具有以下功能:
接收Turtle Class作为输入
1 数据准备:
a 获取数据
b 计算atr
c 计算55日、20日、10日Donchian
2 总持有单位判断
3 定义系统2 超过55日xxx分段加仓
4 回测功能
5 实时功能 问题实时功能如何同时监控几个item
78
Args:
object (_type_): _description_
"""
def __init__(self, turtles, riskcoe, cash, StartTime, EndTime) -> None:
"""接收所有的turtles
Args:
turtles (_type_): _description_
"""
self.turtles = turtles
self.riskcoe = riskcoe
self.cash = cash
self.Capital = cash
self.StartTime = StartTime
self.EndTime = EndTime
self.TrigerTime = 0
# self.BuyStates = {
# "trigger_count": 0, # 触发次数
# "BuyPrice": None, # 买入/持有价格
# "StopPrice":None, # 止损价格
# "quantity": 0, # 持有份数
# "N": 0, # ATR
# "available_cash": self.cash # 可用资金
# }
# 0"trigger_count", 1"BuyPrice", 2"StopPrice", 3"quantity", 4"N", 5"available_cash"
self.BuyStates = [[0, None, None, 0, 0, self.cash]]
self.tradeslog = [] # 交易记录
self.current_week = None # 当前周数
# def TurtleDataPre(self):
# for turtle in self.turtles:
# turtle.CalATR(20, True)
# turtle.Donchian20 = turtle.calculate_donchian_channel(500, 20)
# turtle.Donchian10 = turtle.calculate_donchian_channel(500, 10)
# turtle.Donchian55 = turtle.calculate_donchian_channel(500, 55)
# pass
def PortfolioPositon(self):
"""总共能持有多少个单位
"""
pass
def TestBuyStocks(self, PriceNow, date):
# 回测中的买入函数 如果开盘价大于55日最高价大于四份价格执行加满
# 返回self.BuyStates
# 实盘中应该是触发买入信号发送买入邮件价格份数。当前Turtle程序暂停收到邮件返回更新买入价格份数
# 更新BuyStates:"触发次数"、"买入/持有价格"、"持有份数"、"N"、"可用资金"
N = self.ThisWeekN
Shares = self.ThisWeekPosizionSize
# 更新 BuyStates
if self.TrigerTime == 0: # 第一次买入是直接修改
# if self.BuyStates[0] == 0:
self.TrigerTime += 1
BuyPrice = PriceNow
AddPrice = PriceNow + 1/2 * N
StopPrice = PriceNow - 2 * N
available_cash = self.cash - Shares * BuyPrice
# 0"trigger_count", 1"BuyPrice", 2"AddPrice", 3"StopPrice", 4"quantity", 5"N", 6"available_cash"
self.BuyStates = [[self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash]]
# 更新log
#
AllShares = sum(row[4] for row in self.BuyStates)
NetValue = available_cash + AllShares * BuyPrice
self.tradeslog.append([date, 'Buy', Shares, PriceNow, N, available_cash, NetValue])
else: # 加仓的操作在BuyStates后边追加
self.TrigerTime += 1
BuyPrice = PriceNow
AddPrice = PriceNow + 1/2 * N
StopPrice = PriceNow - 2 * N
available_cash = self.BuyStates[self.TrigerTime-2][6] - Shares * BuyPrice
self.BuyStates.append([self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash])
# 更新log
AllShares = sum(row[4] for row in self.BuyStates)
NetValue = available_cash + AllShares * BuyPrice
self.tradeslog.append([date, 'Buy', Shares, PriceNow, N, available_cash, NetValue])
pass
def TestStopSaleStocks(self, PriceNow, date):
# 回测中的卖出函数,仓位全卖
N = self.ThisWeekN
# Shares应该等于所有持仓的和
Shares = sum(row[4] for row in self.BuyStates)
# Shares = sum(self.BuyStates[:, 4])
available_cash = self.BuyStates[-1][6] + Shares * PriceNow
# 更新log
NetValue = available_cash
self.tradeslog.append([date, 'StopSale', Shares, PriceNow, N, available_cash, NetValue])
# self.trades.append((date, 'Sale', Shares, PriceNow, N, available_cash))
# TrigerTime归0
self.TrigerTime = 0
# self.cash更新
self.cash = available_cash
# 回到初始状态
self.BuyStates = [[0, None, None, None, 0, 0, self.cash]]
def TestOutSaleStocks(self, PriceNow, date):
# 回测中的卖出函数,仓位全卖
N = self.ThisWeekN
# Shares应该等于所有持仓的和
Shares = sum(row[4] for row in self.BuyStates)
# Shares = sum(self.BuyStates[:, 4])
available_cash = self.BuyStates[-1][6] + Shares * PriceNow
# 更新log
NetValue = available_cash
self.tradeslog.append([date, 'OutSale', Shares, PriceNow, N, available_cash, NetValue])
# self.trades.append((date, 'Sale', Shares, PriceNow, N, available_cash))
# TrigerTime归0
self.TrigerTime = 0
# self.cash更新
self.cash = available_cash
# 回到初始状态
self.BuyStates = [[0, None, None, None, 0, 0, self.cash]]
def system2Enter(self, PriceNow, TempDonchian55Upper):
"""以50日突破为基础的较简单的长线系统
入市价格超过了前55日的最高价或最低价就建立头寸。
- 如果价格超过55日最高价买入一个单位建立多头头寸。
- 如果价格跌破55日最低价卖出一个单位建立空头头寸。
退出:
增加单位突破时只建立一个单位建立后以1/2N的间隔增加头寸以前面指令的实际成交价为基础。实际成交价+1/2N
单个品种最大4个单位。
以多头编写
"""
# # 触发次数
# Trigertime = 0
# 只有没买入过,且 现价超过55日最高价是一次突破直接以突破价格买入--没有持仓且价格向上突破
if self.TrigerTime == 0 and PriceNow > TempDonchian55Upper[-1]:
# 买入
return True
else:
return False
def system1EnterNormal(self, PriceNow, TempDonchian20Upper, BreakOutLog):
# 没有持仓且价格向上突破---此时包含两种情形1 对某标的首次使用系统2 已发生过突破,此时上次突破天然是失败的
if self.TrigerTime == 0 and PriceNow > TempDonchian20Upper[-1]:
# 买入
return True
elif self.TrigerTime != 0 and PriceNow > TempDonchian20Upper[-1]:
self.system1BreakoutValid(PriceNow)
if BreakOutLog[-1][5] == 'Lose': # TrigerTime != 0且突破且上一次突破unseccessful
return True
else:
return False
else:
return False
def system1EnterSafe(self, PriceNow, TempDonchian55Upper):
if PriceNow > TempDonchian55Upper[-1]: # 保底的55日突破
return True
else:
return False
def system1BreakoutValid(self, priceNow):
"""判断前一次突破是否成功是log[-1][5]写入“win”否则写入“Lose”
"""
if priceNow < self.BreakOutLog[-1][3]:
self.BreakOutLog[-1][5] = 'Lose'
else:
self.BreakOutLog[-1][5] = 'None'
pass
def system2Out(self, PriceNow, TempDonchian20Lower):
# 退出:低于20日最低价多头方向,空头以突破20日最高价为止损价格--有持仓且价格向下突破
if self.TrigerTime != 0 and PriceNow < TempDonchian20Lower[-1]:
# 退出
return True
else:
return False
def system1Out(self, PriceNow, TempDonchian10Lower):
# 退出:低于20日最低价多头方向,空头以突破20日最高价为止损价格--有持仓且价格向下突破
if self.TrigerTime != 0 and PriceNow < TempDonchian10Lower[-1]:
# 退出
return True
else:
return False
def system2Add(self, PriceNow):
"""加仓判断:如果当前价格>上一次买入后的加仓价格则加仓
"""
if self.TrigerTime < 4 and PriceNow > self.BuyStates[self.TrigerTime - 1][2]:
# 买入
return True
else:
return False
def system2Stop(self, PriceNow):
"""止损判断:如果当前价格<上一次买入后的止损价格则止损
"""
if PriceNow < self.BuyStates[self.TrigerTime - 1][3]:
# 买入
return True
else:
return False
def system2CalDonchian(self):
# 按照system2的要求计算唐奇安通道上下边界--55日上界20日下界
Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线
Donchian['Upper'] = self.RecentAlldata['最高'].rolling(55).max()
Donchian['Lower'] = self.RecentAlldata['最低'].rolling(20).min()
# # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
return Donchian
def system1CalDonchian(self):
# 按照system2的要求计算唐奇安通道上下边界--55日上界20日下界
Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线
Donchian['Upper55'] = self.RecentAlldata['最高'].rolling(55).max()
Donchian['Upper20'] = self.RecentAlldata['最高'].rolling(20).max()
Donchian['Lower'] = self.RecentAlldata['最低'].rolling(10).min()
# # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
return Donchian
def CalPositionSize(self, N, price):
PositionSize = self.riskcoe * self.Capital /(N) # 默认用股票形式了 100
IntPositionSize = int(PositionSize // 100) * 100
return IntPositionSize
def TestSys2Function(self):
"""回测函数,对于某个标的,
准备:获取价格数据计算ATR,N,头寸单位,唐奇安通道上下边界
运行过程:按时间推进,开始时间-结束时间按照系统1或2或组合形式执行买入、加仓、退出、止损
操作记录:形成一个log
"""
# ------------------准备阶段----------------
# 1、获取标的价格
self.RecentAlldata = self.turtles.GetRecentData()
self.RecentAlldata['日期'] = pd.to_datetime(self.RecentAlldata['日期'])
self.RecentAlldata.set_index('日期', inplace=True)
# 2、计算ATR
self.N = self.turtles.CalATR(self.RecentAlldata, 20, None)
# self.PositionSize = self.turtles.CalPositionSize(self.riskcoe, self.cash)
# 3、计算唐奇安通道上下边界--system2
self.Donchian = self.system2CalDonchian()
# 开始、结束日期
RowStart = self.RecentAlldata.index.get_loc(self.StartTime)
RowEnd = self.RecentAlldata.index.get_loc(self.EndTime)
# 准备迭代使用的数据
TempData = pd.DataFrame() # 存储最高、最低价等信息
TempN = []
TempDonchian55Upper = []
TempDonchian20Lower = []
# ------------------运行阶段----------------
for date, row in self.RecentAlldata[RowStart:RowEnd].iterrows():
# 增加一个迭代更新的数据
TempData = pd.concat([TempData, row.to_frame().T], ignore_index=True)
# day <=55不执行任何操作跳过,等待
# day < 55,<20刚开始运行N、Donchian数据从总数据拿, 不用框时间,直接从总数据拿
# if TempData.shape[0] < 20:
# 每天更新上下边界
TempDonchian20Lower.append(self.Donchian['Lower'].iloc[RowStart + TempData.shape[0]-2])
# if TempData.shape[0] < 55:
TempDonchian55Upper.append(self.Donchian['Upper'].iloc[RowStart + TempData.shape[0]-2])
# 检查是否是新的一周
if self.current_week is None or date.week != self.current_week:
self.current_week = date.week
# 更新atr、头寸规模
self.ThisWeekN = self.N.iloc[RowStart + TempData.shape[0]-2]
TempN.append(self.ThisWeekN)
self.ThisWeekPosizionSize = self.CalPositionSize(self.ThisWeekN, TempData.iloc[-1]['收盘'])
# 如果空仓,判断当天开盘价是否突破,收盘价是否突破,突破则买入
if self.TrigerTime == 0:
if self.system2Enter(row["开盘"], TempDonchian55Upper):
self.TestBuyStocks(row["开盘"], date)
# 开盘突破后,最高价能否加仓?
# 价格差取整是能加仓几手,循环加仓
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
# 最高价突破,买入
elif self.system2Enter(row["最高"], TempDonchian55Upper):
self.TestBuyStocks(TempDonchian55Upper[-1]+0.001, date)
delta = math.floor((row['最高']-TempDonchian55Upper[-1]) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((TempDonchian55Upper[-1] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((TempDonchian55Upper[-1] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
# 0"trigger_count", 1"BuyPrice", 2"AddPrice", 3"StopPrice", 4"quantity", 5"N", 6"available_cash"
# self.BuyStates = [self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash]
elif 1 <= self.TrigerTime < 4: # 加仓1-3次考虑止损和加仓
# 开盘价加仓
if self.system2Add(row["开盘"]):
self.TestBuyStocks(row["开盘"], date)
if self.TrigerTime == 4:
# 满仓了,不能再加
pass
# 开盘加仓后,最高价能否继续加仓?
else:
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 4-self.TrigerTime:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
# 最高价加仓
elif self.system2Add(row["最高"]):
self.TestBuyStocks(self.BuyStates[self.TrigerTime - 1][2] + 0.001, date)
if self.TrigerTime == 4:
# 满仓了,不能再加
pass
# 开盘加仓后,最高价能否继续加仓?
else:
delta = math.floor((row['最高']-self.BuyStates[self.TrigerTime - 1][2]) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 4-self.TrigerTime:
for i in range(delta):
self.TestBuyStocks((self.BuyStates[self.TrigerTime - 1][2] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
# 止损
elif self.system2Stop(row["收盘"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
elif self.system2Stop(row["最低"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
# 止盈
elif self.system2Out(row["收盘"], TempDonchian20Lower):
self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date)
elif self.system2Out(row["最低"], TempDonchian20Lower):
self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date)
elif self.TrigerTime == 4: # 满仓 考虑止损和退出
# 止损
if self.system2Stop(row["收盘"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
elif self.system2Stop(row["最低"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
# 止盈
elif self.system2Out(row["收盘"], TempDonchian20Lower):
self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date)
elif self.system2Out(row["最低"], TempDonchian20Lower):
self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date)
# 交易日结束,重新计算高低突破点:
print("---------------回测结束,回测日志如下----------------")
for sublist in self.tradeslog:
print(sublist)
def TestSys1Function(self):
"""回测函数,对于某个标的,
准备:获取价格数据计算ATR,N,头寸单位,唐奇安通道上下边界
运行过程:按时间推进,开始时间-结束时间按照系统1或2或组合形式执行买入、加仓、退出、止损
操作记录:形成一个log
"""
# ------------------准备阶段----------------
# 1、获取标的价格
self.RecentAlldata = self.turtles.GetRecentData()
self.RecentAlldata['日期'] = pd.to_datetime(self.RecentAlldata['日期'])
self.RecentAlldata.set_index('日期', inplace=True)
# 2、计算ATR
self.N = self.turtles.CalATR(self.RecentAlldata, 20, None)
# self.PositionSize = self.turtles.CalPositionSize(self.riskcoe, self.cash)
# 3、计算唐奇安通道上下边界--system2
self.Donchian = self.system1CalDonchian()
# 开始、结束日期
RowStart = self.RecentAlldata.index.get_loc(self.StartTime)
RowEnd = self.RecentAlldata.index.get_loc(self.EndTime)
# 准备迭代使用的数据
TempData = pd.DataFrame() # 存储最高、最低价等信息
TempN = []
TempDonchian55Upper = []
TempDonchian20Upper = []
TempDonchian10Lower = []
self.BreakOutLog = [['date', 'breakout', 'BOprice', 'LosePrice', 'ValidOrNot', 'WinOrLose']]
# ------------------运行阶段----------------
for date, row in self.RecentAlldata[RowStart:RowEnd].iterrows():
# 增加一个迭代更新的数据
TempData = pd.concat([TempData, row.to_frame().T], ignore_index=True)
# day <=55不执行任何操作跳过,等待
# day < 55,<20刚开始运行N、Donchian数据从总数据拿, 不用框时间,直接从总数据拿
# if TempData.shape[0] < 20:
# 每天更新上下边界
TempDonchian10Lower.append(self.Donchian['Lower'].iloc[RowStart + TempData.shape[0]-2])
# if TempData.shape[0] < 55:
TempDonchian55Upper.append(self.Donchian['Upper55'].iloc[RowStart + TempData.shape[0]-2])
TempDonchian20Upper.append(self.Donchian['Upper20'].iloc[RowStart + TempData.shape[0]-2])
# 检查是否是新的一周
if self.current_week is None or date.week != self.current_week:
self.current_week = date.week
# 更新atr、头寸规模
self.ThisWeekN = self.N.iloc[RowStart + TempData.shape[0]-2]
TempN.append(self.ThisWeekN)
self.ThisWeekPosizionSize = self.CalPositionSize(self.ThisWeekN, TempData.iloc[-1]['收盘'])
# 如果空仓,判断当天开盘价是否突破,收盘价是否突破,突破则买入
if self.TrigerTime == 0:
if self.system1EnterNormal(row["开盘"], TempDonchian20Upper, self.BreakOutLog):
self.TestBuyStocks(row["开盘"], date)
# 写入BreakOut状态
self.BreakOutLog.append([date, 'breakout', row["开盘"], row["开盘"] - 2*self.ThisWeekN, 'Valid','None'])
# 开盘突破后,最高价能否加仓?
# 价格差取整是能加仓几手,循环加仓
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif self.system1EnterSafe(row["开盘"], TempDonchian55Upper):
self.TestBuyStocks(row["开盘"], date)
# 价格差取整是能加仓几手,循环加仓
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
# 最高价突破买入
elif self.system1EnterNormal(row["最高"], TempDonchian20Upper, self.BreakOutLog):
self.TestBuyStocks(TempDonchian20Upper[-1] + 0.001, date)
# 写入BreakOut状态
self.BreakOutLog.append([date, 'breakout', TempDonchian20Upper[-1] + 0.001, TempDonchian20Upper[-1] + 0.001 - 2*self.ThisWeekN, 'Valid','None'])
# 价格差取整是能加仓几手,循环加仓
delta = math.floor((row['最高']-TempDonchian20Upper[-1]) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((TempDonchian20Upper[-1] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((TempDonchian20Upper[-1] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif self.system1EnterSafe(row["最高"], TempDonchian55Upper):
self.TestBuyStocks(TempDonchian55Upper[-1] + 0.001, date)
# 0"trigger_count", 1"BuyPrice", 2"AddPrice", 3"StopPrice", 4"quantity", 5"N", 6"available_cash"
# self.BuyStates = [self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash]
elif 1 <= self.TrigerTime < 4: # 加仓1-3次考虑止损和加仓
# 开盘价突破
if self.system1EnterNormal(row["开盘"], TempDonchian20Upper, self.BreakOutLog):
self.TestBuyStocks(row["开盘"], date)
self.BreakOutLog.append([date, 'breakout', TempDonchian20Upper[-1] + 0.001, TempDonchian20Upper[-1] + 0.001 - 2*self.ThisWeekN, 'Valid','None'])
elif self.system1EnterSafe(row["开盘"], TempDonchian55Upper):
self.TestBuyStocks(row["开盘"], date)
# 开盘价加仓
if self.system2Add(row["开盘"]):
self.TestBuyStocks(row["开盘"], date)
if self.TrigerTime == 4:
# 满仓了,不能再加
pass
# 开盘加仓后,最高价能否继续加仓?
else:
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 4-self.TrigerTime:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
# 最高价加仓
elif self.system2Add(row["最高"]):
self.TestBuyStocks(self.BuyStates[self.TrigerTime - 1][2] + 0.001, date)
if self.TrigerTime == 4:
# 满仓了,不能再加
pass
# 开盘加仓后,最高价能否继续加仓?
else:
delta = math.floor((row['最高']-self.BuyStates[self.TrigerTime - 1][2]) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 4-self.TrigerTime:
for i in range(delta):
self.TestBuyStocks((self.BuyStates[self.TrigerTime - 1][2] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
# 止损
elif self.system2Stop(row["收盘"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
elif self.system2Stop(row["最低"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
# 止盈
elif self.system2Out(row["收盘"], TempDonchian10Lower):
self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date)
elif self.system2Out(row["最低"], TempDonchian10Lower):
self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date)
elif self.TrigerTime == 4: # 满仓 考虑止损和退出
# 止损
if self.system2Stop(row["收盘"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
elif self.system2Stop(row["最低"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
# 止盈
elif self.system2Out(row["收盘"], TempDonchian10Lower):
self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date)
elif self.system2Out(row["最低"], TempDonchian10Lower):
self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date)
# 交易日结束,重新计算高低突破点:
print("---------------回测结束,回测日志如下----------------")
for sublist in self.tradeslog:
print(sublist)
for sublist in self.BreakOutLog:
print(sublist)
# nsdk513300 = TurtleTrading(513300)
# # 每周更新
# nsdk513300.GetRecentData()
# nsdk513300.CalATR(20, True)
# nsdk513300.ReadExistData('513300data-N.csv')
# nsdk513300.CalPositionSize(0.0025, 100000)
# # 每天更新
# nsdk513300.Donchian20 = nsdk513300.calculate_donchian_channel(500, 20)
# nsdk513300.Donchian10 = nsdk513300.calculate_donchian_channel(500, 10)
# nsdk513300.Donchian55 = nsdk513300.calculate_donchian_channel(500, 55)
# nsdk513300.DrawKLine(500)
# print(nsdk513300.PositionSize)
if __name__ == "__main__":
nsdk513300 = TurtleTrading(513300)
# nsdk513300test = Trade(nsdk513300, 0.0025, 100000, '2023-1-3', '2024-05-9')
nsdk513300test = Trade(nsdk513300, 0.0025, 100000, '2014-1-15', '2025-02-18')
nsdk513300test.TestSys2Function()
# nsdk513300test.TestSys1Function()