TurtleTrade/EmailTest.py
2025-05-09 22:42:59 +08:00

175 lines
6.7 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import imaplib
import smtplib
import email
from email.header import decode_header
from email.mime.text import MIMEText
from imapclient import IMAPClient
from email.parser import Parser
import base64
def send_email(subject, body, to_email):
    """Send a plain-text email through the 126.com SMTP server.

    Args:
        subject: Subject line of the message.
        body: Plain-text content of the message.
        to_email: Email address of the recipient.

    Raises:
        smtplib.SMTPException: If the TLS upgrade, the login or the delivery
            is rejected by the server.
        OSError: If the connection to the server cannot be established.
    """
    # SECURITY(review): the account name and its authorization code are
    # committed in source. Rotate the code and load it from configuration
    # outside the repository.
    from_email = 'yizeguo1@126.com'
    mail_pass = 'CHRIZKWQSRWYLBOL'  # 126 authorization code

    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = from_email
    msg['To'] = to_email

    # The context manager sends QUIT and closes the socket even when
    # starttls/login/sendmail raises, so a failed send does not leak the
    # connection.
    with smtplib.SMTP('smtp.126.com', 25) as server:
        server.starttls()
        server.login(from_email, mail_pass)
        server.sendmail(from_email, to_email, msg.as_string())
def get_latest_email_body(to_email):
    """Print the INBOX messages sent from ``to_email`` and return the newest body.

    Logs in to the 126.com IMAP server, searches INBOX for messages whose
    sender matches ``to_email`` and prints the id, decoded subject, date and
    plain-text body of every match.

    Args:
        to_email: Address of the correspondent whose messages are listed
            (used as the IMAP ``FROM`` search key).

    Returns:
        The plain-text body of the matching message with the highest message
        id, or ``None`` when nothing matches. The body is ``""`` for a
        multipart message that has no non-attachment ``text/plain`` part.
    """
    # SECURITY(review): the account name and its authorization code are
    # committed in source. Rotate the code and load it from configuration
    # outside the repository.
    from_email = 'yizeguo1@126.com'
    mail_pass = 'CHRIZKWQSRWYLBOL'  # 126 authorization code

    latest_body = None
    # The context manager logs out and closes the connection on every exit
    # path, including exceptions raised while fetching or decoding.
    with IMAPClient("imap.126.com") as mail:
        mail.login(from_email, mail_pass)
        # NOTE(review): presumably 126.com requires the client to identify
        # itself before a folder can be selected -- confirm.
        mail.id_({"name": "IMAPClient", "version": "2.1.0"})
        print(mail.list_folders())
        mail.select_folder('INBOX')

        # List criteria let IMAPClient quote the address itself instead of
        # relying on hand-written quotes inside a raw search string.
        messages = mail.search(['FROM', to_email])
        print(f"Matching Emails: {messages}")

        fetched = mail.fetch(messages, ['ENVELOPE', 'BODY[]'])
        # Ascending id order, so the last body seen is the newest one.
        for msgid in sorted(fetched):
            data = fetched[msgid]
            envelope = data[b'ENVELOPE']
            email_message = email.message_from_bytes(data[b'BODY[]'])

            if email_message.is_multipart():
                # First text/plain part that is not an attachment, if any.
                text_part = next(
                    (
                        part
                        for part in email_message.walk()
                        if part.get_content_type() == "text/plain"
                        and "attachment" not in str(part.get("Content-Disposition"))
                    ),
                    None,
                )
            else:
                text_part = email_message

            body = ""
            if text_part is not None:
                payload = text_part.get_payload(decode=True) or b""
                # errors='replace' keeps one malformed byte from aborting
                # the whole listing.
                body = payload.decode(
                    text_part.get_content_charset() or 'utf-8', errors='replace'
                )

            # The envelope subject is None when the message has no Subject
            # header; treat that as an empty subject.
            raw_subject = (envelope.subject or b'').decode('utf-8', errors='replace')
            subject = ''.join(
                chunk.decode(charset or 'utf-8', errors='replace')
                if isinstance(chunk, bytes)
                else chunk
                for chunk, charset in decode_header(raw_subject)
            )

            print('ID #%d: "%s" received %s' % (msgid, subject, envelope.date))
            print('Body:', body)
            latest_body = body

    return latest_body
def parse_return_email(to_email, send_email_time):
    """Parse the trade details out of a reply email.

    Searches INBOX on the 126.com IMAP server for messages from ``to_email``
    received since ``send_email_time`` and parses the plain-text body, which
    is expected to have the form
    ``"<actual buy price>-<shares bought>-<fee>"``.
    Matching messages are tried from the highest message id downwards and
    the first body that yields three non-empty fields is used.

    Args:
        to_email: Address the reply is expected from (IMAP ``FROM`` key).
        send_email_time: Lower bound for the IMAP ``SINCE`` key. A
            ``date``/``datetime`` object is formatted by IMAPClient; a string
            is sent as given and must already be a valid IMAP date.

    Returns:
        ``(True, price, share, fee)`` on success, the three fields being
        whitespace-stripped strings. ``(False, None, None, None)`` when the
        search fails, nothing matches, or no matching body has the expected
        format.
    """
    # SECURITY(review): the account name and its authorization code are
    # committed in source. Rotate the code and load it from configuration
    # outside the repository.
    from_email = 'yizeguo1@126.com'
    mail_pass = 'CHRIZKWQSRWYLBOL'  # 126 authorization code

    not_parsed = (False, None, None, None)

    # The context manager logs out and closes the connection on every exit
    # path, including the early returns below.
    with IMAPClient("imap.126.com") as mail:
        mail.login(from_email, mail_pass)
        mail.id_({"name": "IMAPClient", "version": "2.1.0"})
        mail.select_folder('INBOX')

        # List criteria let IMAPClient do the quoting and date formatting.
        try:
            messages = mail.search(['FROM', to_email, 'SINCE', send_email_time])
        except imaplib.IMAP4.error as e:
            print(f"Error searching emails: {e}")
            return not_parsed

        if not messages:
            print("No matching emails found.")
            return not_parsed
        print(f"Matching Emails: {messages}")

        fetched = mail.fetch(messages, ['BODY[]'])
        # Newest first, so a later reply takes precedence over an older one.
        for msgid in sorted(fetched, reverse=True):
            email_message = email.message_from_bytes(fetched[msgid][b'BODY[]'])

            if email_message.is_multipart():
                # First text/plain part that is not an attachment, if any.
                text_part = next(
                    (
                        part
                        for part in email_message.walk()
                        if part.get_content_type() == "text/plain"
                        and "attachment" not in str(part.get("Content-Disposition"))
                    ),
                    None,
                )
            else:
                text_part = email_message

            if text_part is None:
                continue
            payload = text_part.get_payload(decode=True) or b""
            body = payload.decode(
                text_part.get_content_charset() or 'utf-8', errors='replace'
            )

            # A body without three non-empty fields is skipped; indexing it
            # directly would raise IndexError.
            fields = [field.strip() for field in body.split("-")]
            if len(fields) >= 3 and all(fields[:3]):
                price, share, fee = fields[:3]
                return True, price, share, fee

    return not_parsed
def check_email(to_email, send_email_time):
    """Check whether the reply email has been received (not implemented yet).

    NOTE(review): this is still a stub. It only opens a connection to the
    126.com IMAP server; ``to_email`` and ``send_email_time`` are unused and
    the function returns ``None``.
    """
    # Close the connection again instead of leaving an open socket behind
    # for the garbage collector.
    with IMAPClient("imap.126.com"):
        pass
if __name__ == "__main__":
    # Manual check against the live mailbox: list the INBOX messages that
    # were received from this address. To exercise sending instead, call
    # send_email(subject, body, recipient) here.
    get_latest_email_body(to_email="guoyize2209@163.com")