"""Send notification mails through 126.com SMTP and read replies over IMAP.

The module sends plain-text mails from a fixed 126.com mailbox and inspects
that mailbox's INBOX for messages coming from a given sender, optionally
parsing a "price-share-fee" reply body.
"""

import base64
import email
import imaplib
import smtplib
from contextlib import contextmanager
from email.header import decode_header
from email.mime.text import MIMEText
from email.parser import Parser

from imapclient import IMAPClient

# NOTE(security): credentials are hard-coded in source. The authorization code
# should be rotated and loaded from the environment or a secret store instead.
FROM_EMAIL = 'yizeguo1@126.com'
MAIL_PASS = 'CHRIZKWQSRWYLBOL'  # 126.com authorization code

SMTP_HOST = 'smtp.126.com'
SMTP_PORT = 25
IMAP_HOST = "imap.126.com"
# Client identification sent with the IMAP ID command right after login.
IMAP_CLIENT_ID = {"name": "IMAPClient", "version": "2.1.0"}


@contextmanager
def _imap_session():
    """Yield a logged-in IMAP client and log out when the block exits."""
    with IMAPClient(IMAP_HOST) as client:
        client.login(FROM_EMAIL, MAIL_PASS)
        client.id_(IMAP_CLIENT_ID)
        yield client


def _search_from(mail, sender, since=None):
    """Return ids of messages in the selected folder sent by *sender*.

    When *since* is given it is passed as the IMAP ``SINCE`` criterion.
    """
    criteria = ['FROM', sender]
    if since is not None:
        criteria += ['SINCE', since]
    return mail.search(criteria)


def _decode_bytes(data, charset):
    """Decode *data* with *charset*, never raising on bad or unknown charsets."""
    try:
        return data.decode(charset or 'utf-8', errors='replace')
    except LookupError:
        # The declared charset is not known to Python; fall back to UTF-8.
        return data.decode('utf-8', errors='replace')


def _extract_body(email_message):
    """Return the text body of a parsed message, or "" if none is found.

    For multipart messages the first ``text/plain`` part that is not an
    attachment is used; otherwise the single payload is decoded as-is.
    """
    if not email_message.is_multipart():
        payload = email_message.get_payload(decode=True)
        if payload is None:
            return ""
        return _decode_bytes(payload, email_message.get_content_charset())

    for part in email_message.walk():
        disposition = str(part.get("Content-Disposition"))
        if part.get_content_type() != "text/plain" or "attachment" in disposition:
            continue
        payload = part.get_payload(decode=True)
        if payload is not None:
            return _decode_bytes(payload, part.get_content_charset())
    return ""


def _decode_subject(raw_subject):
    """Turn an ENVELOPE subject (bytes, possibly RFC 2047 encoded) into text."""
    if raw_subject is None:
        # Messages without a Subject header have no subject in the envelope.
        return ''
    if isinstance(raw_subject, bytes):
        raw_subject = _decode_bytes(raw_subject, 'utf-8')
    # decode_header expects a str and yields (chunk, charset) pairs.
    return ''.join(
        _decode_bytes(chunk, charset) if isinstance(chunk, bytes) else chunk
        for chunk, charset in decode_header(raw_subject)
    )


def _parse_trade_reply(body):
    """Parse a "price-share-fee" body into three floats.

    Raises:
        ValueError: if fewer than three "-"-separated fields are present or
            one of the first three fields is not a number.
    """
    fields = body.split("-")
    if len(fields) < 3:
        raise ValueError(
            f"expected 'price-share-fee', got {len(fields)} field(s)"
        )
    price, share, fee = (float(field) for field in fields[:3])
    return price, share, fee


def send_email(subject, body, to_email):
    """Send a plain-text mail from the configured 126.com account.

    Args:
        subject: Mail subject.
        body: Mail body text.
        to_email: Recipient e-mail address.
    """
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = FROM_EMAIL
    msg['To'] = to_email

    # The context manager issues QUIT and closes the socket even when
    # STARTTLS, login or sending fails.
    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
        server.starttls()
        server.login(FROM_EMAIL, MAIL_PASS)
        server.sendmail(FROM_EMAIL, to_email, msg.as_string())


def get_latest_email_body(to_email):
    """Print every INBOX message sent by *to_email* and return the latest body.

    Args:
        to_email: Address matched against the FROM header of INBOX messages.

    Returns:
        The text body of the matching message with the highest message id,
        or ``None`` when no message matches.
    """
    with _imap_session() as mail:
        print(mail.list_folders())
        mail.select_folder('INBOX')

        messages = _search_from(mail, to_email)
        print(f"Matching Emails: {messages}")
        if not messages:
            return None

        latest_body = None
        fetched = mail.fetch(messages, ['ENVELOPE', 'BODY[]'])
        # Ascending id order, so the last body kept is the newest message.
        for msgid in sorted(fetched):
            data = fetched[msgid]
            envelope = data[b'ENVELOPE']
            email_message = email.message_from_bytes(data[b'BODY[]'])

            body = _extract_body(email_message)
            subject = _decode_subject(envelope.subject)

            print(f'ID #{msgid}: "{subject}" received {envelope.date}')
            print('Body:', body)
            latest_body = body
        return latest_body


def parse_return_email(to_email, send_email_time):
    """Parse the body of the latest reply mail.

    The body is expected to follow the
    "actual buy price-shares bought-fee" format, e.g. ``"1.234-1000-0.5"``.

    Args:
        to_email: Address matched against the FROM header of INBOX messages.
        send_email_time: Value passed as the IMAP ``SINCE`` criterion.
            NOTE(review): IMAP ``SINCE`` compares dates only and ignores the
            time of day; confirm what type callers pass here.

    Returns:
        ``(True, price, share, fee)`` on success, otherwise
        ``(False, None, None, None)`` when the search fails, nothing matches
        or the body cannot be parsed.
    """
    failure = (False, None, None, None)

    with _imap_session() as mail:
        mail.select_folder('INBOX')

        try:
            messages = _search_from(mail, to_email, send_email_time)
        except imaplib.IMAP4.error as e:
            print(f"Error searching emails: {e}")
            return failure

        if not messages:
            print("No matching emails found.")
            return failure
        print(f"Matching Emails: {messages}")

        # Only the newest match (highest id) is relevant, so fetch just that.
        latest_id = max(messages)
        data = mail.fetch([latest_id], ['BODY[]']).get(latest_id)
        if data is None:
            print("No matching emails found.")
            return failure
        body = _extract_body(email.message_from_bytes(data[b'BODY[]']))

    try:
        price, share, fee = _parse_trade_reply(body)
    except ValueError as e:
        print(f"Error parsing email body: {e}")
        return failure
    return True, price, share, fee


def check_email(to_email, send_email_time):
    """Check whether a reply mail has been received.

    Args:
        to_email: Address matched against the FROM header of INBOX messages.
        send_email_time: Value passed as the IMAP ``SINCE`` criterion.

    Returns:
        ``True`` if at least one matching message exists, ``False`` if none
        exists or the search fails.
    """
    with _imap_session() as mail:
        mail.select_folder('INBOX')
        try:
            messages = _search_from(mail, to_email, send_email_time)
        except imaplib.IMAP4.error as e:
            print(f"Error searching emails: {e}")
            return False
    return bool(messages)


if __name__ == "__main__":
    # Example: send_email("测试", 'test', "guoyize2209@163.com")
    get_latest_email_body("guoyize2209@163.com")