增加email功能

This commit is contained in:
gyz 2025-05-06 22:27:23 +08:00
parent d2272641f2
commit 1994e91d72
2 changed files with 247 additions and 15 deletions

View File

@ -83,8 +83,65 @@ def get_latest_email_body(to_email):
print('ID #%d: "%s" received %s' % (msgid, subject, envelope.date))
print('Body:', body)
def parse_return_email(to_email, send_email_time):
"""解析回信邮件body中的内容
根据"实际买入价格-买入份额-手续费"
格式解析body中的内容
"""
mail = IMAPClient("imap.126.com")
from_email = 'yizeguo1@126.com'
mail_pass = 'CHRIZKWQSRWYLBOL'# 126授权码
mail.login(from_email, mail_pass)
mail.id_({"name": "IMAPClient", "version": "2.1.0"})
mail.select_folder('INBOX')
# 搜索发件人为指定邮箱的所有邮件
encoded_email = to_email.encode('utf-8')
# 构建搜索条件HEADER FROM "<发件人邮箱>, 时间大于 <send_email_time>"
search_criteria = f'FROM "{encoded_email.decode("utf-8")}" SINCE "{send_email_time}"'
# search_criteria = f'FROM "{encoded_email.decode("utf-8")}"'
# 执行搜索操作
messages = mail.search(search_criteria)
for msgid, data in mail.fetch(messages, ['ENVELOPE', 'BODY[]']).items():
envelope = data[b'ENVELOPE']
raw_email = data[b'BODY[]']
# 使用 email 库解析原始内容
email_message = email.message_from_bytes(raw_email)
# 获取邮件正文(处理多部分的情况)
body = ""
if email_message.is_multipart():
for part in email_message.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
body = part.get_payload(decode=True).decode(part.get_content_charset() or 'utf-8')
break
else:
body = email_message.get_payload(decode=True).decode(email_message.get_content_charset() or 'utf-8')
raw_subject = envelope.subject.decode()
# 然后 decode_header 接受 str 类型
decoded_parts = decode_header(raw_subject)
subject = ''.join(
part.decode(encoding or 'utf-8') if isinstance(part, bytes) else part
for part, encoding in decoded_parts
)
# 根据“实际买入价格-买入份额-手续费”格式解析body中的内容
body.split("-")
price = body.split("-")[0]
share = body.split("-")[1]
fee = body.split("-")[2]
return price, share, fee
if __name__ == "__main__":
# send_email("测试", 'test', "guoyize2209@163.com")

View File

@ -8,19 +8,33 @@ import mplfinance as mpf
import sqlite3
import stock_database
import mysql_database
from EmailTest import send_email
from EmailTest import send_email, parse_return_email
from dataclasses import dataclass
@dataclass
class BuyState:
trigger_time: float
buy_price: float
add_price: float
stop_price: float
quantity: int
n: int
available_cash: float
trigger_time: float # 触发次数
buy_price: float # 买入价格
add_price: float # 加仓价格
stop_price: float # 止损价格
shares: int # 买入股数
atr: int # ATR
available_cash: float # 可用资金
@dataclass
class TradeLog:
data: str # 时间
type: str # 操作类型
buy_price: float # 买入价格
shares: int # 买入股数
cost: float # 成本
atr: int # ATR
available_cash: float # 可用资金
all_shares: float # 总股数
all_cost: float # 总成本
Net_value: float # 净值
Net_return: float # 净收益
def calc_sma_atr_pd(kdf,period):
"""计算TR与ATR
@ -60,7 +74,7 @@ class TurtleTrading(object):
self.TrigerTime = 0
self.BuyStates = list[BuyState] = []
self.tradeslog = [] # 交易记录
self.tradeslog = list[TradeLog] # 交易记录
def GetRecentData(self):
"""获取某个标的的最近数据,从两年前到今天, 计算后的数据保存在self.CurrentData
@ -306,8 +320,9 @@ class TurtleTrading_OnTime(object):
3实时监测主流程
'''
def __init__(self, turtle: TurtleTrading):
def __init__(self, turtle: TurtleTrading, user_email):
self.turtle = turtle
self.user_email = user_email
def get_stocks_data(self):
"""获取实时股票、基金数据,不保存
@ -328,13 +343,173 @@ class TurtleTrading_OnTime(object):
def Buy_stock(self, price_now):
# 发送邮件 代码self.turtle.TradeCode, 建议买入价格price_now买入份额self.turtle.IntPositionSize
send_email()
# 每隔1分钟检测回信解析邮件。
if self.turtle.TrigerTime == 0: # 第一次买入
# 记录self.turtle.BuyStates
pass
subject = "买入"
body = f"{self.turtle.TradeCode},价格{price_now},份额{self.turtle.IntPositionSize} \n "
body += "回复:实际买入价格-买入份额-手续费"
send_email(subject, body, self.user_email)
send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#todo 每隔1分钟检测回信解析邮件。
buy_price, buy_share, fee = parse_return_email(self.user_email, send_email_time)
# 成功买入
self.turtle.TrigerTime += 1
# 记录self.turtle.BuyStates
add_price = buy_price + 1/2 * self.turtle.N
stop_price = buy_price - 2 * self.turtle.N
cost = buy_price * buy_share - fee
available_cash = self.turtle.Capital - cost
buy_this_time = BuyState(self.turtle.TrigerTime,
buy_price,
add_price,
stop_price,
buy_share,
self.turtle.N,
available_cash)
self.turtle.BuyStates.append(buy_this_time)
today = datetime.now().strftime("%Y-%m-%d")
log_this_time = TradeLog(today,
"买入",
buy_price,
buy_share,
cost,
self.turtle.N,
available_cash,
all_shares=buy_share,
all_cost=cost,
Net_value=buy_price * buy_share,
Net_return=0)
self.turtle.tradeslog.append(log_this_time)
else:
# 加仓
subject = "加仓"
body = f"{self.turtle.TradeCode},价格{price_now},份额{self.turtle.IntPositionSize} \n "
body += "回复:实际买入价格-买入份额-手续费"
send_email(subject, body, self.user_email)
send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#todo 每隔1分钟检测回信解析邮件。
buy_price, buy_share, fee = parse_return_email(self.user_email, send_email_time)
# 成功买入
self.turtle.TrigerTime += 1
# 记录self.turtle.BuyStates
add_price = buy_price + 1/2 * self.turtle.N
stop_price = buy_price - 2 * self.turtle.N
cost = buy_price * buy_share - fee
available_cash = self.turtle.BuyStates[-1].available_cash - cost
all_shares = buy_share + self.turtle.BuyStates[-1].all_shares
all_cost = cost + self.turtle.BuyStates[-1].all_cost
net_value = buy_price * all_shares
net_return = net_value - all_cost
buy_this_time = BuyState(self.turtle.TrigerTime,
buy_price,
add_price,
stop_price,
buy_share,
self.turtle.N,
available_cash)
self.turtle.BuyStates.append(buy_this_time)
today = datetime.now().strftime("%Y-%m-%d")
log_this_time = TradeLog(today,
"加仓",
buy_price,
buy_share,
cost,
self.turtle.N,
available_cash,
all_shares,
all_cost,
net_value,
net_return)
self.turtle.tradeslog.append(log_this_time)
pass
def stop_sale_stock(self, price_now):
"""止损卖出
Args:
price_now (_type_): 现价
"""
# 发送邮件 代码self.turtle.TradeCode, 建议卖出价格price_now卖出份额self.turtle.IntPositionSize
subject = "止损卖出"
body = f"{self.turtle.TradeCode},价格{price_now},份额{self.turtle.IntPositionSize} \n "
body += "回复:实际卖出价格-卖出份额-手续费"
send_email(subject, body, self.user_email)
send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# todo 每隔1分钟检测回信解析邮件。
sale_price, sale_share, fee = parse_return_email(self.user_email, send_email_time)
# 成功卖出
self.turtle.TrigerTime = 0
# 记录self.turtle.BuyStates
available_cash = self.turtle.BuyStates[-1].available_cash + sale_price * sale_share - fee
self.turtle.BuyStates = []
sale_this_time = TradeLog(datetime.now().strftime("%Y-%m-%d"),
"止损",
sale_price,
sale_share,
sale_price * sale_share - fee,
self.turtle.N,
available_cash,
all_shares=0,
all_cost=0,
Net_value=sale_price * sale_share,
Net_return=abs(self.turtle.Capital - available_cash))
self.turtle.tradeslog.append(sale_this_time)
def out_sale_stock(self, price_now):
"""止盈卖出
Args:
price_now (_type_): 现价
"""
# 发送邮件 代码self.turtle.TradeCode, 建议卖出价格price_now卖出份额self.turtle.IntPositionSize
subject = "止盈卖出"
body = f"{self.turtle.TradeCode},价格{price_now},份额{self.turtle.IntPositionSize} \n "
body += "回复:实际卖出价格-卖出份额-手续费"
send_email(subject, body, self.user_email)
send_email_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# todo 每隔1分钟检测回信解析邮件。
sale_price, sale_share, fee = parse_return_email(self.user_email, send_email_time)
# 成功卖出
self.turtle.TrigerTime = 0
# 记录self.turtle.BuyStates
available_cash = self.turtle.BuyStates[-1].available_cash + sale_price * sale_share - fee
self.turtle.BuyStates = []
sale_this_time = TradeLog(datetime.now().strftime("%Y-%m-%d"),
"止盈",
sale_price,
sale_share,
sale_price * sale_share - fee,
self.turtle.N,
available_cash,
all_shares=0,
all_cost=0,
Net_value=sale_price * sale_share,
Net_return=abs(self.turtle.Capital - available_cash))
self.turtle.tradeslog.append(sale_this_time)
def Start_short_system(self):
"""启动short系统
"""