基础基本上写完了,开始写突破逻辑

This commit is contained in:
gyz 2025-04-03 17:26:52 +08:00
parent f0814c7a27
commit e36c9baa0e
4 changed files with 142 additions and 37 deletions

View File

@ -39,8 +39,16 @@ class TurtleTrading(object):
Args: Args:
object (_type_): _description_ object (_type_): _description_
""" """
def __init__(self, TradeCode) -> None: def __init__(self, TradeCode, type, riskcoe, Capital, cash) -> None:
self.TradeCode = TradeCode self.TradeCode = TradeCode
self.type = type
self.riskcoe = riskcoe
self.Capital = Capital
self.cash = cash
self.TrigerTime = 0
self.BuyStates = [[0, None, None, 0, 0, self.cash]]
self.tradeslog = [] # 交易记录
def GetRecentData(self): def GetRecentData(self):
"""获取某个标的的最近数据,从两年前到今天, 计算后的数据保存在self.CurrentData """获取某个标的的最近数据,从两年前到今天, 计算后的数据保存在self.CurrentData
@ -131,7 +139,8 @@ class TurtleTrading(object):
""" """
Donchian = pd.DataFrame() # 创建一个空的DataFrame用于存储唐奇安通道数据 Donchian = pd.DataFrame() # 创建一个空的DataFrame用于存储唐奇安通道数据
# 计算最高价和最低价的N日移动平均线 # 计算最高价和最低价的N日移动平均线
Donchian['Upper'] = self.CurrentData['最高'].rolling(n).max() # 使用rolling函数计算n日最高价的移动最大值 name = 'Donchian_' + str(n) + '_upper'
Donchian[name] = self.CurrentData['最高'].rolling(n).max() # 使用rolling函数计算n日最高价的移动最大值
# # 计算中间线 # # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2 # 计算上通道和下通道的中间线,但此行代码被注释掉了 # Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2 # 计算上通道和下通道的中间线,但此行代码被注释掉了
@ -151,13 +160,29 @@ class TurtleTrading(object):
""" """
Donchian = pd.DataFrame() Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线 # 计算最高价和最低价的N日移动平均线
Donchian['lower'] = self.CurrentData['最低'].rolling(n).min() name = 'Donchian_' + str(n) + '_lower'
Donchian[name] = self.CurrentData['最低'].rolling(n).min()
# # 计算中间线 # # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2 # Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
return Donchian return Donchian
def calc_atr_donchian_short(self):
"""计算ATR、短期唐奇安通道
"""
# 计算ATR
self.CalATR(self.CurrentData, 20)
# 计算唐奇安通道
self.Donchian_20_ups = self.calculate_donchian_channel_up(20)
self.Donchian_50_ups = self.calculate_donchian_channel_up(50)
self.Donchian_downs = self.calculate_donchian_channel_down(10)
# 画图
# self.DrawKLine(days)
# 把self.N, self.Donchian_up, self.Donchian_down, 添加到self.CurrentData后面保存到mysql数据库
self.CurrentData = pd.concat([self.CurrentData, self.Donchian_20_ups, self.Donchian_50_ups, self.Donchian_downs], axis=1)
def get_ready(self, days): def get_ready(self, days):
"""创建一个turtle对象获取数据计算ATR计算唐奇安通道 """创建一个turtle对象获取数据计算ATR计算唐奇安通道
@ -165,37 +190,82 @@ class TurtleTrading(object):
days (_type_): _description_ days (_type_): _description_
n (_type_): _description_ n (_type_): _description_
""" """
# 检查mysql数据库中是否存在该股票的数据 或者数据库最后一条的时间距离今天是否两天以上
current_date = date.today()
threshold_date = current_date - timedelta(days=2)
last_update = mysql_database.check_db_table_last_date(f"{self.TradeCode}")
if not mysql_database.check_db_table(f"{self.TradeCode}") or last_update < threshold_date: # if 不存在database
# 如果不存在则从akshare获取数据并保存到mysql数据库 if not mysql_database.check_db_table(f"{self.TradeCode}"):
if mysql_database.check_db_table(f"{self.TradeCode}") and last_update < threshold_date:
mysql_database.delete_table(f"{self.TradeCode}")
self.GetRecentData() self.GetRecentData()
self.calc_atr_donchian_short()
Code = f"{self.TradeCode}"
mysql_database.insert_db(self.CurrentData, Code, True, "日期")
else: else:
# 如果存在则从mysql数据库中读取数据
self.CurrentData = mysql_database.fetch_all_data(f"{self.TradeCode}")
# 计算ATR
self.CalATR(self.CurrentData, 20)
# 计算唐奇安通道
self.Donchian_up = self.calculate_donchian_channel_up(20)
self.Donchian_down = self.calculate_donchian_channel_down(10)
# 画图
# self.DrawKLine(days)
# 把self.N, self.Donchian_up, self.Donchian_down, 添加到self.CurrentData后面保存到mysql数据库
self.CurrentData = pd.concat([self.CurrentData, self.Donchian_up, self.Donchian_down], axis=1)
Code = f"{self.TradeCode}" # 检查数据库最后一条的时间距离今天是否两天以上
mysql_database.insert_db(self.CurrentData, Code, True, "日期") current_date = date.today()
threshold_date = current_date - timedelta(days=2)
last_update = mysql_database.check_db_table_last_date(f"{self.TradeCode}")
if last_update < threshold_date:
# 如果不存在则从akshare获取数据并保存到mysql数据库
mysql_database.delete_table(f"{self.TradeCode}")
self.GetRecentData()
self.calc_atr_donchian_short()
Code = f"{self.TradeCode}"
mysql_database.insert_db(self.CurrentData, Code, True, "日期")
else:
# 如果存在则从mysql数据库中读取数据
self.CurrentData = mysql_database.fetch_all_data(f"{self.TradeCode}")
def CalPositionSize(self):
"""根据风险系数、ATR计算仓位大小, 存于self.IntPositionSize
"""
PositionSize = self.riskcoe * self.Capital /(self.N) # 默认用股票形式了 100
self.IntPositionSize = int(PositionSize // 100) * 100
def system1EnterNormal(self, PriceNow, TempDonchian20Upper, BreakOutLog):
# 没有持仓且价格向上突破---此时包含两种情形1 对某标的首次使用系统2 已发生过突破,此时上次突破天然是失败的
if self.TrigerTime == 0 and PriceNow > TempDonchian20Upper[-1]:
# 买入
return True
elif self.TrigerTime != 0 and PriceNow > TempDonchian20Upper[-1]:
self.system1BreakoutValid(PriceNow)
if BreakOutLog[-1][5] == 'Lose': # TT!= 0且突破且上一次突破unseccessful
return True
else:
return False
else:
return False
def system1EnterSafe(self, PriceNow, TempDonchian55Upper):
if PriceNow > TempDonchian55Upper[-1]: # 保底的55日突破
return True
else:
return False
def system1BreakoutValid(self, priceNow):
"""判断前一次突破是否成功是log[-1][5]写入“win”否则写入“Lose”
"""
if priceNow < self.BreakOutLog[-1][3]:
self.BreakOutLog[-1][5] = 'Lose'
else:
self.BreakOutLog[-1][5] = 'None'
# 一天结束计算ATR计算唐奇安通道追加到已有的mysql数据库中 # 一天结束计算ATR计算唐奇安通道追加到已有的mysql数据库中
def system1Out(self, PriceNow, TempDonchian10Lower):
# 退出:低于20日最低价多头方向,空头以突破20日最高价为止损价格--有持仓且价格向下突破
if self.TrigerTime != 0 and PriceNow < TempDonchian10Lower[-1]:
# 退出
return True
else:
return False
def day_end(self): def day_end(self):
pass pass
@ -207,13 +277,14 @@ class TurtleTrading_OnTime(object):
3实时监测主流程 3实时监测主流程
''' '''
def __init__(self): def __init__(self, turtle: TurtleTrading):
pass self.turtle = turtle
def get_stocks_data(self): def get_stocks_data(self):
"""获取实时股票、基金数据,不保存 """获取实时股票、基金数据,不保存
""" """
stock_zh_a_spot_df = ak.stock_zh_a_spot_em() stock_data = ak.stock_zh_a_spot_em()
stock_zh_a_spot_df = stock_zh_a_spot_df.dropna(subset=['最新价']) stock_data = stock_data.dropna(subset=['最新价'])
# # print(stock_zh_a_spot_df) # # print(stock_zh_a_spot_df)
# # stock_zh_a_spot_df第一列加上时间精确到分钟 # # stock_zh_a_spot_df第一列加上时间精确到分钟
# stock_zh_a_spot_df['时间'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # stock_zh_a_spot_df['时间'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@ -224,14 +295,43 @@ class TurtleTrading_OnTime(object):
etf_data = etf_data.dropna(subset=['当前-单位净值']) etf_data = etf_data.dropna(subset=['当前-单位净值'])
# etf_data['时间'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # etf_data['时间'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# mysql_database.insert_db(etf_data, "etf_price", True, "代码") # mysql_database.insert_db(etf_data, "etf_price", True, "代码")
return stock_data, etf_data
def Start_S1_system(self):
"""启动S1系统
"""
# ------------------准备阶段--------------------
# 获取数据或读取数据 -- 计算ATR Donchian 20 50 up, 20 down
self.turtle.get_ready(100)
self.turtle.N = self.turtle.CurrentData['ATR'].iloc[-1]
self.turtle.Donchian_20_up = self.turtle.CurrentData['Donchian_20_upper'].iloc[-1]
self.turtle.Donchian_50_up = self.turtle.CurrentData['Donchian_50_upper'].iloc[-1]
self.turtle.Donchian_10_down = self.turtle.CurrentData['Donchian_10_lower'].iloc[-1]
# ------------------实时监测阶段--------------------
# 9:00 1、判断是否是新的一周是则重新计算Position Size
# 判断是否是新的一周
if datetime.now().weekday() == 0:
self.turtle.CalPositionSize()
# 每分钟获取一次数据,判断是否触发条件 9:30-11:30 13:00-15:00
stock_data, etf_data = self.get_stocks_data()
# 根据typecode, 取得实时价格self.turtle.PriceNow
# ------------------结束阶段--------------------
# 数据库更新当天数据增加ATR、donchian数据
pass
if __name__ == '__main__': if __name__ == '__main__':
# t = TurtleTrading('513300') t = TurtleTrading('513300', "etf", 0.25, 100000, 200000)
# t.get_ready(100) # t.get_ready(100)
a = TurtleTrading_OnTime() a = TurtleTrading_OnTime(t)
a.get_stocks_data() a.Start_S1_system()
# # 全是股票 # # 全是股票
# stock_zh_a_spot_df = ak.stock_zh_a_spot_em() # stock_zh_a_spot_df = ak.stock_zh_a_spot_em()

Binary file not shown.

Binary file not shown.

View File

@ -22,7 +22,7 @@ MYSQL_HOST = os.environ.get('MYSQL_HOST') if (os.environ.get('MYSQL_HOST') != No
MYSQL_USER = os.environ.get('MYSQL_USER') if (os.environ.get('MYSQL_USER') != None) else "root" MYSQL_USER = os.environ.get('MYSQL_USER') if (os.environ.get('MYSQL_USER') != None) else "root"
MYSQL_PWD = os.environ.get('MYSQL_PWD') if (os.environ.get('MYSQL_PWD') != None) else "1212" MYSQL_PWD = os.environ.get('MYSQL_PWD') if (os.environ.get('MYSQL_PWD') != None) else "1212"
MYSQL_DB = os.environ.get('MYSQL_DB') if (os.environ.get('MYSQL_DB') != None) else "stock_data" MYSQL_DB = os.environ.get('MYSQL_DB') if (os.environ.get('MYSQL_DB') != None) else "stock_data"
MYSQL_PORT = os.environ.get('MYSQL_PORT') if (os.environ.get('MYSQL_PORT') != None) else "3306" MYSQL_PORT = os.environ.get('MYSQL_PORT') if (os.environ.get('MYSQL_PORT') != None) else "3307"
print("MYSQL_HOST :", MYSQL_HOST, ",MYSQL_USER :", MYSQL_USER, ",MYSQL_DB :", MYSQL_DB) print("MYSQL_HOST :", MYSQL_HOST, ",MYSQL_USER :", MYSQL_USER, ",MYSQL_DB :", MYSQL_DB)
MYSQL_CONN_URL = "mysql+mysqldb://" + MYSQL_USER + ":" + MYSQL_PWD + "@" + MYSQL_HOST + ":" + MYSQL_PORT + "/" + MYSQL_DB + "?charset=utf8mb4" MYSQL_CONN_URL = "mysql+mysqldb://" + MYSQL_USER + ":" + MYSQL_PWD + "@" + MYSQL_HOST + ":" + MYSQL_PORT + "/" + MYSQL_DB + "?charset=utf8mb4"
@ -45,7 +45,12 @@ def engine_to_db(to_db):
# 通过数据库链接 engine。 # 通过数据库链接 engine。
def conn(): def conn():
try: try:
db = MySQLdb.connect(MYSQL_HOST, MYSQL_USER, MYSQL_PWD, MYSQL_DB, charset="utf8") db = MySQLdb.connect(host=MYSQL_HOST,
user=MYSQL_USER,
passwd=MYSQL_PWD,
db=MYSQL_DB,
port=int(MYSQL_PORT), # 确保转换为整数
charset="utf8")
# db.autocommit = True # db.autocommit = True
except Exception as e: except Exception as e:
print("conn error :", e) print("conn error :", e)