TurtleTrade/TurtleOnTime.py

680 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import math
import akshare as ak
import os
from datetime import datetime, timedelta, date
import pandas as pd
import mplfinance as mpf
import sqlite3
import stock_database
import mysql_database
from EmailTest import send_email, parse_return_email
from dataclasses import dataclass
import time
@dataclass
class BuyState:
trigger_time: float # 触发次数
buy_price: float # 买入价格
add_price: float # 加仓价格
stop_price: float # 止损价格
shares: int # 买入股数
atr: int # ATR
available_cash: float # 可用资金
@dataclass
class TradeLog:
data: str # 时间
type: str # 操作类型
buy_price: float # 买入价格
shares: int # 买入股数
cost: float # 成本
atr: int # ATR
available_cash: float # 可用资金
all_shares: float # 总股数
all_cost: float # 总成本
Net_value: float # 净值
Net_return: float # 净收益
def calc_sma_atr_pd(kdf,period):
"""计算TR与ATR
Args:
kdf (_type_): 历史数据
period (_type_): ATR周期
Returns:
_type_: 返回kdf增加TR与ATR列
"""
kdf['最高'] = kdf['最高'].astype(float)
kdf['最低'] = kdf['最低'].astype(float)
kdf['收盘'] = kdf['收盘'].astype(float)
kdf['HL'] = kdf['最高'] - kdf['最低']
kdf['HC'] = np.abs(kdf['最高'] - kdf['收盘'].shift(1))
kdf['LC'] = np.abs(kdf['最低'] - kdf['收盘'].shift(1))
kdf['TR'] = np.round(kdf[['HL','HC','LC']].max(axis=1), 3)
# ranges = pd.concat([high_low, high_close, low_close], axis=1)
# true_range = np.max(ranges, axis=1)
kdf['ATR'] = np.round(kdf['TR'].rolling(period).mean(), 3)
return kdf.drop(['HL','HC','LC'], axis = 1)
class TurtleTrading(object):
"""对象范围较小对某一个标的创建一个海龟如513300
计算ATR、唐奇安通道线
基础数据
Args:
object (_type_): _description_
"""
def __init__(self, TradeCode, type, riskcoe, Capital, cash) -> None:
self.TradeCode = TradeCode
self.type = type
self.riskcoe = riskcoe
self.Capital = Capital
self.cash = cash
self.TrigerTime = 0
self.BuyStates = list[BuyState]
self.tradeslog = list[TradeLog] # 交易记录
def GetRecentData(self):
"""获取某个标的的最近数据,从两年前到今天, 计算后的数据保存在self.CurrentData
Returns:
_type_: _description_
"""
Today = datetime.today()
# print(Today)
formatted_date = Today.strftime("%Y%m%d")
two_years_ago = (date.today() - timedelta(days=365*2)).strftime("%Y%m%d")
# print(formatted_date)
Code = f"{self.TradeCode}"
CurrentData = ak.fund_etf_hist_em(symbol=Code, period="daily", start_date=two_years_ago, end_date=formatted_date, adjust="")
# 将日期列转换为datetime
CurrentData = pd.DataFrame(CurrentData)
CurrentData['日期'] = pd.to_datetime(CurrentData['日期'])
# print(type(CurrentData['日期'].iloc[0]))
CurrentData.set_index('日期', inplace=True)
# CurrentData.reset_index(inplace=True)
# print(type(CurrentData['日期'].iloc[0]))
# create table
# stock_database.create_table(Code)
# stock_database.insert_data(Code, CurrentData)
# mysql_database.insert_db(CurrentData, Code, True, "'日期'")
self.CurrentData = CurrentData
# return self.CurrentData
def CalATR(self, data, ATRday):
"""计算某个标的的ATR从上市日到今天, 计算后的数据保存在self.CurrentData
Args:
ATRday: 多少日ATR
SaveOrNot (_type_): 是否保存.csv数据
"""
self.CurrentData = calc_sma_atr_pd(data, ATRday)
self.N = self.CurrentData['ATR']
# return self.N
def ReadExistData(self, data):
"""除了通过发请求获取数据也可以读本地的数据库赋值给self.CurrentData
Args:
data (_type_): 本地csv名称
"""
self.CurrentData = pd.read_csv(data)
def DrawKLine(self, days):
"""画出k线图看看,画出最近days天的K线图
"""
# 日期部分
# dates = pd.to_datetime(self.CurrentData['日期'][-days:])
# # Klinedf['Data'] = pd.to_datetime(self.CurrentData['日期'])
Klinedf = pd.DataFrame()
# Klinedf.set_index = Klinedf['Data']
# 其他数据
Klinedf['Date'] = self.CurrentData['日期'][-days:]
Klinedf['Open'] = self.CurrentData['开盘'][-days:].astype(float)
Klinedf['High'] = self.CurrentData['最高'][-days:].astype(float)
Klinedf['Low'] = self.CurrentData['最低'][-days:].astype(float)
Klinedf['Close'] = self.CurrentData['收盘'][-days:].astype(float)
Klinedf['Volume'] = self.CurrentData['成交量'][-days:].astype(float)
Klinedf.set_index(pd.to_datetime(Klinedf['Date']), inplace=True)
# 画图
mpf.plot(Klinedf, type='candle', style='yahoo', volume=False, mav=(5,), addplot=[mpf.make_addplot(self.Donchian_up['Upper'][-days:]), mpf.make_addplot(self.Donchian_down['lower'][-days:])], title=f"{self.TradeCode} K线图")
def calculate_donchian_channel_up(self, n):
"""
计算n日唐奇安上通道
参数:
self.CurrentData (DataFrame): 包含价格数据的Pandas DataFrame包含"High"
n (int): 时间周期
返回:self.Donchian
DataFrame: 唐奇安通道的DataFrame包含"Upper"
"""
Donchian = pd.DataFrame() # 创建一个空的DataFrame用于存储唐奇安通道数据
# 计算最高价和最低价的N日移动平均线
name = 'Donchian_' + str(n) + '_upper'
Donchian[name] = self.CurrentData['最高'].rolling(n).max() # 使用rolling函数计算n日最高价的移动最大值
# # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2 # 计算上通道和下通道的中间线,但此行代码被注释掉了
return Donchian # 返回包含唐奇安上通道的DataFrame
def calculate_donchian_channel_down(self, n):
"""
计算n日唐奇安上通道
参数:
self.CurrentData (DataFrame): 包含价格数据的Pandas DataFrame包含"High"
n (int): 时间周期
返回:self.Donchian
DataFrame: 唐奇安通道的DataFrame包含"Upper"
"""
Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线
name = 'Donchian_' + str(n) + '_lower'
Donchian[name] = self.CurrentData['最低'].rolling(n).min()
# # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
return Donchian
def calc_atr_donchian_short(self):
"""计算ATR、短期唐奇安通道
"""
# 计算ATR
self.CalATR(self.CurrentData, 20)
# 计算唐奇安通道
self.Donchian_20_ups = self.calculate_donchian_channel_up(20)
self.Donchian_50_ups = self.calculate_donchian_channel_up(50)
self.Donchian_downs = self.calculate_donchian_channel_down(10)
# 画图
# self.DrawKLine(days)
# 把self.N, self.Donchian_up, self.Donchian_down, 添加到self.CurrentData后面保存到mysql数据库
self.CurrentData = pd.concat([self.CurrentData, self.Donchian_20_ups, self.Donchian_50_ups, self.Donchian_downs], axis=1)
def get_ready(self, days):
"""创建一个turtle对象获取数据计算ATR计算唐奇安通道
Args:
days (_type_): _description_
n (_type_): _description_
"""
# if 不存在database
if not mysql_database.check_db_table(f"{self.TradeCode}"):
self.GetRecentData()
self.calc_atr_donchian_short()
Code = f"{self.TradeCode}"
mysql_database.insert_db(self.CurrentData, Code, True, "日期")
else:
# 检查数据库最后一条的时间距离今天是否两天以上
current_date = date.today()
threshold_date = current_date - timedelta(days=2)
last_update = mysql_database.check_db_table_last_date(f"{self.TradeCode}")
if last_update < threshold_date:
# 如果不存在则从akshare获取数据并保存到mysql数据库
mysql_database.delete_table(f"{self.TradeCode}")
self.GetRecentData()
self.calc_atr_donchian_short()
Code = f"{self.TradeCode}"
mysql_database.insert_db(self.CurrentData, Code, True, "日期")
else:
# 如果存在则从mysql数据库中读取数据
self.CurrentData = mysql_database.fetch_all_data(f"{self.TradeCode}")
def CalPositionSize(self):
"""根据风险系数、ATR计算仓位大小, 存于self.IntPositionSize
"""
PositionSize = self.riskcoe * self.Capital /(self.N) # 默认用股票形式了 100
self.IntPositionSize = int(PositionSize // 100) * 100
def system1EnterNormal(self, PriceNow, TempDonchian20Upper, BreakOutLog):
# 没有持仓且价格向上突破---此时包含两种情形1 对某标的首次使用系统2 已发生过突破,此时上次突破天然是失败的
if self.TrigerTime == 0 and PriceNow > TempDonchian20Upper:
# 买入
return True
elif PriceNow > TempDonchian20Upper:#todo !=0不会满足条件 先跳过
self.system1BreakoutValid(PriceNow)
if BreakOutLog[-1][5] == 'Lose': # TT!= 0且突破且上一次突破unseccessful
return True
else:
return False
else:
return False
def system1EnterSafe(self, PriceNow, TempDonchian55Upper):
if PriceNow > TempDonchian55Upper[-1]: # 保底的55日突破
return True
else:
return False
def system1BreakoutValid(self, priceNow):
"""判断前一次突破是否成功是log[-1][5]写入“win”否则写入“Lose”
"""
if priceNow < self.BreakOutLog[-1][3]:
self.BreakOutLog[-1][5] = 'Lose'
else:
self.BreakOutLog[-1][5] = 'None'
# 一天结束计算ATR计算唐奇安通道追加到已有的mysql数据库中
def system_1_Out(self, PriceNow, TempDonchian10Lower):
# 退出:低于20日最低价多头方向,空头以突破20日最高价为止损价格--有持仓且价格向下突破
if self.TrigerTime != 0 and PriceNow < TempDonchian10Lower:
# 退出
return True
else:
return False
def add(self, PriceNow):
"""加仓
"""
if self.TrigerTime < 4 and PriceNow > self.BuyStates[self.TrigerTime - 1][2]:#todo BuyStates是空的
# 买入
return True
else:
return False
def system_1_stop(self, PriceNow):
"""止损判断:如果当前价格<上一次买入后的止损价格则止损
"""
if PriceNow < self.BuyStates[self.TrigerTime - 1][3]:
# 买入
return True
else:
return False
def day_end(self):
pass
class TurtleTrading_OnTime(object):
''' 实时监测主程序可以处理多个turtle
1、获取实时大盘数据
2、根据turtles的代码比较是否触发条件
3、实时监测主流程
'''
def __init__(self, turtle: TurtleTrading, user_email):
self.turtle = turtle
self.user_email = user_email
def get_stocks_data(self):
"""获取实时股票、基金数据,不保存
"""
stock_data = ak.stock_zh_a_spot_em()
stock_data = stock_data.dropna(subset=['最新价'])
# # print(stock_zh_a_spot_df)
# # stock_zh_a_spot_df第一列加上时间精确到分钟
# stock_zh_a_spot_df['时间'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# mysql_database.insert_db(stock_zh_a_spot_df, "stock_price", True, "代码")
etf_data = ak.fund_etf_spot_em()
# etf_data = ak.fund_etf_spot_ths()
# etf_data = etf_data.dropna(subset=['当前-单位净值'])
etf_data = etf_data.dropna(subset=['最新价'])
# etf_data['时间'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# mysql_database.insert_db(etf_data, "etf_price", True, "代码")
return stock_data, etf_data
def Buy_stock(self, price_now):
# 发送邮件 代码self.turtle.TradeCode, 建议买入价格price_now买入份额self.turtle.IntPositionSize
if self.turtle.TrigerTime == 0: # 第一次买入
subject = "买入"
body = f"{self.turtle.TradeCode},价格{price_now},份额{self.turtle.IntPositionSize} \n "
body += "回复:实际买入价格-买入份额-手续费"
send_email(subject, body, self.user_email)
send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#每隔1分钟检测回信解析邮件。
parsed_email_flag = False
while not parsed_email_flag:
time.sleep(60) # 每次尝试前等待 60 秒
parse_states, buy_price, buy_share, fee = parse_return_email(
self.user_email, send_email_time
)
if parse_states:
parsed_email_flag = True
break
# 成功买入
self.turtle.TrigerTime += 1
# 记录self.turtle.BuyStates
add_price = buy_price + 1/2 * self.turtle.N
stop_price = buy_price - 2 * self.turtle.N
cost = buy_price * buy_share - fee
available_cash = self.turtle.Capital - cost
buy_this_time = BuyState(self.turtle.TrigerTime,
buy_price,
add_price,
stop_price,
buy_share,
self.turtle.N,
available_cash)
self.turtle.BuyStates.append(buy_this_time)
today = datetime.now().strftime("%Y-%m-%d")
log_this_time = TradeLog(today,
"买入",
buy_price,
buy_share,
cost,
self.turtle.N,
available_cash,
all_shares=buy_share,
all_cost=cost,
Net_value=buy_price * buy_share,
Net_return=0)
self.turtle.tradeslog.append(log_this_time)
else:
# 加仓
subject = "加仓"
body = f"{self.turtle.TradeCode},价格{price_now},份额{self.turtle.IntPositionSize} \n "
body += "回复:实际买入价格-买入份额-手续费"
send_email(subject, body, self.user_email)
send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#每隔1分钟检测回信解析邮件。
parsed_email_flag = False
while not parsed_email_flag:
time.sleep(60) # 每次尝试前等待 60 秒
parse_states, buy_price, buy_share, fee = parse_return_email(
self.user_email, send_email_time
)
if parse_states:
parsed_email_flag = True
break
# 成功买入
self.turtle.TrigerTime += 1
# 记录self.turtle.BuyStates
add_price = buy_price + 1/2 * self.turtle.N
stop_price = buy_price - 2 * self.turtle.N
cost = buy_price * buy_share - fee
available_cash = self.turtle.BuyStates[-1].available_cash - cost
all_shares = buy_share + self.turtle.BuyStates[-1].all_shares
all_cost = cost + self.turtle.BuyStates[-1].all_cost
net_value = buy_price * all_shares
net_return = net_value - all_cost
buy_this_time = BuyState(self.turtle.TrigerTime,
buy_price,
add_price,
stop_price,
buy_share,
self.turtle.N,
available_cash)
self.turtle.BuyStates.append(buy_this_time)
today = datetime.now().strftime("%Y-%m-%d")
log_this_time = TradeLog(today,
"加仓",
buy_price,
buy_share,
cost,
self.turtle.N,
available_cash,
all_shares,
all_cost,
net_value,
net_return)
self.turtle.tradeslog.append(log_this_time)
pass
def stop_sale_stock(self, price_now):
"""止损卖出
Args:
price_now (_type_): 现价
"""
# 发送邮件 代码self.turtle.TradeCode, 建议卖出价格price_now卖出份额self.turtle.IntPositionSize
subject = "止损卖出"
body = f"{self.turtle.TradeCode},价格{price_now},份额{self.turtle.IntPositionSize} \n "
body += "回复:实际卖出价格-卖出份额-手续费"
send_email(subject, body, self.user_email)
send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 每隔1分钟检测回信解析邮件。
parsed_email_flag = False
while not parsed_email_flag:
time.sleep(60) # 每次尝试前等待 60 秒
parse_states, sale_price, sale_share, fee = parse_return_email(
self.user_email, send_email_time
)
if parse_states:
parsed_email_flag = True
break
# 成功卖出
self.turtle.TrigerTime = 0
# 记录self.turtle.BuyStates
available_cash = self.turtle.BuyStates[-1].available_cash + sale_price * sale_share - fee
self.turtle.BuyStates = []
sale_this_time = TradeLog(datetime.now().strftime("%Y-%m-%d"),
"止损",
sale_price,
sale_share,
sale_price * sale_share - fee,
self.turtle.N,
available_cash,
all_shares=0,
all_cost=0,
Net_value=sale_price * sale_share,
Net_return=abs(self.turtle.Capital - available_cash))
self.turtle.tradeslog.append(sale_this_time)
def out_sale_stock(self, price_now):
"""止盈卖出
Args:
price_now (_type_): 现价
"""
# 发送邮件 代码self.turtle.TradeCode, 建议卖出价格price_now卖出份额self.turtle.IntPositionSize
subject = "止盈卖出"
body = f"{self.turtle.TradeCode},价格{price_now},份额{self.turtle.IntPositionSize} \n "
body += "回复:实际卖出价格-卖出份额-手续费"
send_email(subject, body, self.user_email)
send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 每隔1分钟检测回信解析邮件。
parsed_email_flag = False
while not parsed_email_flag:
time.sleep(60) # 每次尝试前等待 60 秒
parse_states, sale_price, sale_share, fee = parse_return_email(
self.user_email, send_email_time
)
if parse_states:
parsed_email_flag = True
break
# 成功卖出
self.turtle.TrigerTime = 0
# 记录self.turtle.BuyStates
available_cash = self.turtle.BuyStates[-1].available_cash + sale_price * sale_share - fee
self.turtle.BuyStates = []
sale_this_time = TradeLog(datetime.now().strftime("%Y-%m-%d"),
"止盈",
sale_price,
sale_share,
sale_price * sale_share - fee,
self.turtle.N,
available_cash,
all_shares=0,
all_cost=0,
Net_value=sale_price * sale_share,
Net_return=abs(self.turtle.Capital - available_cash))
self.turtle.tradeslog.append(sale_this_time)
def run_short_trading_loop(self):
while True:
# 获取当前时间
now = datetime.now().time()
# 判断当前时间是否在交易时段内9:30-11:30 或 13:00-15:00
is_trading_time = (
(now.hour == 9 and now.minute >= 30) or
(now.hour == 10 and 0 <= now.minute <= 59) or
(now.hour == 11 and now.minute <= 30) or
(now.hour == 13 and 0 <= now.minute <= 59) or
(now.hour == 14 and 0 <= now.minute <= 59) or
(now.hour == 15 and now.minute <= 0)
)
# if not is_trading_time:
# # 非交易时间,等待 1 分钟后继续循环
# time.sleep(60)
# continue
# 获取股票和ETF数据
stock_data, etf_data = self.get_stocks_data()
# 根据类型获取当前价格
if self.turtle.type == "stock":
self.turtle.PriceNow = float(stock_data.loc[etf_data['代码'] == self.turtle.TradeCode, '最新价'].values[0])
elif self.turtle.type == "etf":
# self.turtle.PriceNow = float(etf_data.loc[etf_data['基金代码'] == self.turtle.TradeCode, '当前-单位净值'].values[0])
self.turtle.PriceNow = float(etf_data.loc[etf_data['代码'] == self.turtle.TradeCode, '最新价'].values[0])
# 判断当前仓位状态并执行相应操作
if self.turtle.TrigerTime == 0:
# 空仓状态
if self.turtle.system1EnterNormal(
self.turtle.PriceNow,
self.turtle.Donchian_20_up,
self.turtle.BreakOutLog
):
self.Buy_stock(self.turtle.PriceNow)
elif self.turtle.system1EnterSafe(
self.turtle.PriceNow,
self.turtle.Donchian_50_up
):
self.Buy_stock(self.turtle.PriceNow)
elif 1 <= self.turtle.TrigerTime <= 3:
# 加仓状态
if self.turtle.system1EnterNormal(
self.turtle.PriceNow,
self.turtle.Donchian_20_up,
self.turtle.BreakOutLog
):
self.Buy_stock(self.turtle.PriceNow)
elif self.turtle.system1EnterSafe(
self.turtle.PriceNow,
self.turtle.Donchian_50_up
):
self.Buy_stock(self.turtle.PriceNow)
elif self.turtle.add(self.turtle.PriceNow):
self.Buy_stock(self.turtle.PriceNow)
elif self.turtle.system_1_stop(self.turtle.PriceNow):
self.stop_sale_stock(self.turtle.PriceNow)
elif self.turtle.system_1_Out(
self.turtle.PriceNow,
self.turtle.Donchian_10_down
):
self.out_sale_stock(self.turtle.PriceNow)
elif self.turtle.TrigerTime == 4:
# 满仓状态
if self.turtle.system_1_stop(self.turtle.PriceNow):
self.stop_sale_stock(self.turtle.PriceNow)
elif self.turtle.system_1_Out(
self.turtle.PriceNow,
self.turtle.Donchian_10_down
):
self.out_sale_stock(self.turtle.PriceNow)
# 等待 1 分钟后下一次循环
time.sleep(60)
def Start_short_system(self):
"""启动short系统
"""
# ------------------准备阶段--------------------
# 获取数据或读取数据 -- 计算ATR Donchian 20 50 up, 20 down
self.turtle.get_ready(100)
self.turtle.N = float(self.turtle.CurrentData['ATR'].iloc[-1])
self.turtle.Donchian_20_up = float(self.turtle.CurrentData['Donchian_20_upper'].iloc[-1])
self.turtle.Donchian_50_up = float(self.turtle.CurrentData['Donchian_50_upper'].iloc[-1])
self.turtle.Donchian_10_down = float(self.turtle.CurrentData['Donchian_10_lower'].iloc[-1])
self.turtle.CalPositionSize()
# ------------------实时监测阶段--------------------
# 9:00 1、判断是否是新的一周是则重新计算Position Size
# 判断是否是新的一周
if datetime.now().weekday() == 0:
self.turtle.CalPositionSize()
# 每分钟获取一次数据,判断是否触发条件 9:30-11:30 13:00-15:00
self.run_short_trading_loop()
# ------------------结束阶段--------------------
# 数据库更新当天数据增加ATR、donchian数据
# 直接做个新表
mysql_database.delete_table(f"{self.turtle.TradeCode}")
self.turtle.get_ready(100)
time.sleep(16.5*600)
if __name__ == '__main__':
user_email = "guoyize2209@163.com"
t = TurtleTrading('513870', "etf", 0.0025, 100000, 200000)
# t.get_ready(100)
a = TurtleTrading_OnTime(t, user_email)
a.Start_short_system()
# # 全是股票
# stock_zh_a_spot_df = ak.stock_zh_a_spot_em()
# # stock_zh_a_spot_df.to_csv("stock_zh_a_spot.txt", sep="\t", index=False, encoding="utf-8")
# stock_zh_a_spot_df = stock_zh_a_spot_df.dropna(subset=['最新价'])
# print(stock_zh_a_spot_df)
# # 全是基金
# etf_data = ak.fund_etf_spot_em()
# etf_data = etf_data.dropna(subset=['最新价'])
# etf_data.to_csv("fund_etf_spot.txt", sep="\t", index=False, encoding="utf-8")
# print(etf_data)