"""
# @Author : RanKe
# @Time : 2024/10/18 22:16
# @File : get_mail_info.py
# @Desc : Refresh a Microsoft OAuth2 access token and read recent Outlook inbox mail over IMAP (XOAUTH2)
"""
import imaplib
import email
import requests
from email.utils import parsedate_to_datetime
from email.header import decode_header, make_header
def get_access_token(client_id, refresh_token):
    """Exchange a refresh token for a new access token.

    Posts a ``refresh_token`` grant to the Microsoft identity platform
    token endpoint and reports the outcome as a two-element list.

    Args:
        client_id: Application (client) id sent as ``client_id``.
        refresh_token: Refresh token sent as ``refresh_token``.

    Returns:
        ``[True, access_token]`` on success, otherwise ``[False, message]``
        where ``message`` describes the failure.
    """
    url = 'https://login.microsoftonline.com/common/oauth2/v2.0/token'
    data = {
        'client_id': client_id,
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
    }
    try:
        # A timeout keeps an unresponsive endpoint from blocking forever.
        response = requests.post(url, data=data, timeout=30)
        # Parse the body once; a non-JSON body raises a ValueError subclass.
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Report transport/parse failures through the same [False, msg]
        # contract the caller already handles.
        return [False, f"邮箱状态异常:{exc}"]

    result_status = payload.get('error')
    if result_status is not None:
        print(result_status)
        return [False, f"邮箱状态异常:{result_status}"]

    new_access_token = payload.get('access_token')
    if new_access_token is None:
        # Neither an error nor a token: treat as a failure, not a KeyError.
        return [False, "邮箱状态异常:响应中缺少 access_token"]
    return [True, new_access_token]
def generate_auth_string(email_name, access_token):
    """Build the XOAUTH2 authentication string for an IMAP login.

    The fields are separated by ``\\x01`` bytes and the string ends with
    two of them. The value is returned only and never printed, because it
    embeds the bearer token.

    Args:
        email_name: Mailbox address placed in the ``user=`` field.
        access_token: OAuth2 access token placed after ``auth=Bearer``.

    Returns:
        The assembled authentication string.
    """
    # "\x01" is the same character the octal escape "\1" produces.
    return f"user={email_name}\x01auth=Bearer {access_token}\x01\x01"
def _decode_mail_header(raw_value):
    """Decode an RFC 2047 encoded header to text; '' when it is absent."""
    if raw_value is None:
        return ""
    from email.errors import HeaderParseError
    try:
        return str(make_header(decode_header(raw_value)))
    except (LookupError, UnicodeError, HeaderParseError):
        # Unknown charset or malformed encoded-word: keep the raw text.
        return str(raw_value)


def _format_mail_date(raw_value):
    """Format a Date header as 'YYYY-MM-DD HH:MM:SS'; raw text if unparsable."""
    if raw_value is None:
        return ""
    try:
        return parsedate_to_datetime(raw_value).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        return str(raw_value)


def _decode_part(part):
    """Decode one message part to text using its declared charset."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='ignore')
    except LookupError:
        # Declared charset is unknown to Python: fall back to UTF-8.
        return payload.decode('utf-8', errors='ignore')


def _extract_body(email_message):
    """Return the HTML body of a message, or its plain text if no HTML exists."""
    if not email_message.is_multipart():
        return _decode_part(email_message)

    parts = list(email_message.walk())
    html_body = "".join(
        _decode_part(part) for part in parts
        if part.get_content_type() == "text/html"
    )
    if html_body:
        return html_body
    # No HTML alternative: use the plain-text parts so text-only multipart
    # mails do not come back with an empty body. Attachments are skipped.
    return "".join(
        _decode_part(part) for part in parts
        if part.get_content_type() == "text/plain"
        and part.get_content_disposition() != "attachment"
    )


def get_mail_info(email_name, access_token):
    """Fetch the three highest-numbered messages from the Outlook inbox.

    Connects to ``outlook.live.com`` over IMAP/SSL, authenticates with
    XOAUTH2, and parses subject, sender, recipient, date and body of each
    message. The connection is logged out when the function returns.

    Args:
        email_name: Mailbox address used for the XOAUTH2 login.
        access_token: OAuth2 access token used for the XOAUTH2 login.

    Returns:
        A list of dicts with keys ``subject``, ``mail_from``, ``mail_to``,
        ``mail_dt`` and ``body``. On failure, a single dict with keys
        ``error_key`` and ``error_msg`` instead.

    Raises:
        OSError: If the connection to the server cannot be established.
        imaplib.IMAP4.error: If select/search/fetch fail at protocol level.
    """
    login_failed = {"error_key": "登录失败", "error_msg": "登录失败,账号异常!"}
    parse_failed = {"error_key": "解析失败", "error_msg": "邮件信息解析失败,请联系管理员优化处理!"}

    # The context manager logs out on exit, including on early returns.
    with imaplib.IMAP4_SSL('outlook.live.com') as mail:
        try:
            mail.authenticate(
                'XOAUTH2',
                lambda _challenge: generate_auth_string(email_name, access_token),
            )
        except imaplib.IMAP4.error:
            # A rejected login raises here; report it through the error dict.
            return login_failed

        mail.select('inbox')
        result, data = mail.search(None, 'ALL')
        if result != "OK":
            return login_failed

        # Ids arrive as bytes; sort numerically, since a bytes sort would
        # rank b'9' above b'10'.
        mail_ids = sorted((data[0] or b"").split(), key=int, reverse=True)

        result_list = []
        for mail_id in mail_ids[:3]:
            result, msg_data = mail.fetch(mail_id, "(RFC822)")
            if result != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
                return parse_failed

            email_message = email.message_from_bytes(msg_data[0][1])
            result_list.append({
                "subject": _decode_mail_header(email_message['SUBJECT']),
                # Angle brackets become parentheses, as callers expect.
                "mail_from": _decode_mail_header(email_message['From'])
                .replace('<', '(').replace('>', ')'),
                "mail_to": _decode_mail_header(email_message['To'])
                .replace('<', '(').replace('>', ')'),
                "mail_dt": _format_mail_date(email_message['Date']),
                "body": _extract_body(email_message),
            })
        return result_list
if __name__ == '__main__':
    # Demo entry point: refresh the token, then print the fetched mails.
    # The three values below are placeholders to be filled in by the user.
    client_id = '<CLIENT_ID>'
    email_name = '<EMAIL>'
    refresh_token = '<PASSWORD>'

    # get_access_token returns [ok_flag, access_token_or_error_message].
    token_ok, token_or_error = get_access_token(client_id, refresh_token)
    if not token_ok:
        print(token_or_error)
    else:
        mail_info_res = get_mail_info(email_name, token_or_error)
        if not isinstance(mail_info_res, list):
            # A non-list result is an error dict carrying "error_msg".
            print(mail_info_res.get("error_msg"))
        else:
            for mail_info in mail_info_res:
                print(f"邮件主题:{mail_info['subject']}")
                print(f"发件时间:{mail_info['mail_dt']}")
                print(f"发件人:{mail_info['mail_from']}")
                print(f"收件人:{mail_info['mail_to']}")
                print(f"邮件正文:{mail_info['body']}")