邮箱信息解析

# !/usr/bin/python
# -*- coding: utf-8 -*-
"""
# @Author  : RanKe
# @Time    : 2024/10/18 22:16
# @File      : get_mail_account.py
# @Desc   : 
"""

# 卡网发货格式如下所示
mail_info = '账号----密码------------------------------------------------------------clientid----授权码'
mail_info_list = mail_info.split('----')
mail_name = mail_info_list[0]
mail_password = mail_info_list[1]
client_id = mail_info_list[16]
refresh_token = mail_info_list[17]

print(f"邮箱账号:{mail_name}")
print(f"邮箱密码:{mail_password}")
print(f"client_id:{client_id}")
print(f"refresh_token:{refresh_token}")

IMAP取件示例

# !/usr/bin/python
# -*- coding: utf-8 -*-
"""
# @Author  : RanKe
# @Time    : 2024/10/18 22:16
# @File      : get_mail_info.py
# @Desc   : 
"""

import imaplib
import email
import requests
from email.utils import parsedate_to_datetime
from email.header import decode_header, make_header


def get_access_token(client_id,refresh_token):
    url = 'https://login.microsoftonline.com/common/oauth2/v2.0/token'
    data = {
        'client_id': client_id,
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
    }
    response = requests.post(url, data=data)
    result_status = response.json().get('error')
    if result_status is not None:
        print(result_status)
        return [False, f"邮箱状态异常:{result_status}"]
    else:
        new_access_token = response.json()['access_token']
        return [True, new_access_token]

def generate_auth_string(email_name,access_token):
    auth_string = f"user={email_name}\1auth=Bearer {access_token}\1\1"
    print(auth_string)
    return auth_string

def get_mail_info(email_name,access_token):
    result_list = []
    mail = imaplib.IMAP4_SSL('outlook.live.com')
    mail.authenticate('XOAUTH2', lambda x: generate_auth_string(email_name, access_token))
    mail.select('inbox') #选择收件箱
    # mail.select('Junk')  #选择垃圾箱
    result, data = mail.search(None, 'ALL')
    if result == "OK":
        mail_ids = sorted(data[0].split(), reverse=True)
        last_mail_id_list = mail_ids[:3]
        for last_mail_id in last_mail_id_list:
            result, msg_data = mail.fetch(last_mail_id, "(RFC822)")
            body = ""
            if result == 'OK':
                # 解析邮件内容
                raw_email = msg_data[0][1]
                email_message = email.message_from_bytes(raw_email)
                subject = str(make_header(decode_header(email_message['SUBJECT'])))  # 主题
                mail_from = str(make_header(decode_header(email_message['From']))).replace('<', '(').replace('>',
                                                                                                             ')')  # 发件人
                mail_to = str(make_header(decode_header(email_message['To']))).replace('<', '(').replace('>',
                                                                                                         ')')  # 收件人
                mail_dt = parsedate_to_datetime(email_message['Date']).strftime("%Y-%m-%d %H:%M:%S")  # 收件时间
                if email_message.is_multipart():
                    for part in email_message.walk():
                        content_type = part.get_content_type()
                        if content_type in ["text/html"]:
                            payload = part.get_payload(decode=True)
                            body += payload.decode('utf-8', errors='ignore')

                else:
                    body = email_message.get_payload(decode=True).decode('utf-8', errors='ignore')
                res_dict = {"subject": subject, "mail_from": mail_from, "mail_to": mail_to, "mail_dt": mail_dt,
                            "body": body}
                result_list.append(res_dict)
            else:
                res_dict = {"error_key": "解析失败", "error_msg": "邮件信息解析失败,请联系管理员优化处理!"}
                return res_dict
        return result_list
    else:
        res_dict = {"error_key": "登录失败", "error_msg": "登录失败,账号异常!"}
        return res_dict

if __name__ == '__main__':
    client_id = '<CLIENT_ID>'
    email_name = '<EMAIL>'
    refresh_token = '<PASSWORD>'
    access_res = get_access_token(client_id,refresh_token)
    if access_res[0]:
        access_token = access_res[1]
        mail_info_res = get_mail_info(email_name,access_token)
        if isinstance(mail_info_res, list):
            for mail_info in mail_info_res:
                print(f"邮件主题:{mail_info['subject']}")
                print(f"发件时间:{mail_info['mail_dt']}")
                print(f"发件人:{mail_info['mail_from']}")
                print(f"收件人:{mail_info['mail_to']}")
                print(f"邮件正文:{mail_info['body']}")
        else:
            print(mail_info_res.get("error_msg"))
    else:
        print(access_res[1])