feat(login): 实现动态密钥登录流程

- 移除默认静态密钥配置,改为从登录响应动态获取- 新增 login_and_get_dynamic_key_iv 函数处理未加密登录并提取密钥- 修改 get_des_encrypt 函数为必需传入密钥参数
- 更新 send_encrypted_request_with_dynamic_key 使用动态密钥发送加密请求
- 调整主程序逻辑,先执行未加密登录获取密钥再进行加密请求
- 修改测试账号为 test002 并移除手动切换加密模式的选项
main
wsy182 2025-10-27 11:19:37 +08:00
parent 3489709697
commit 9c6f5c3b63
1 changed files with 56 additions and 50 deletions

View File

@ -4,21 +4,20 @@ from Crypto.Cipher import DES
from Crypto.Util.Padding import pad from Crypto.Util.Padding import pad
import base64 import base64
# === 配置你的密钥和 IV必须是 8 字节!)=== # 全局变量,用于存储从登录响应中获取的 key 和 iv
DEFAULT_KEY = "12345678" # 替换为实际 key必须 8 字符 DYNAMIC_KEY = None
DEFAULT_IV = "abcdefgh" # 替换为实际 iv必须 8 字符 DYNAMIC_IV = None
def get_des_encrypt(data: str, _key: str = DEFAULT_KEY, _iv: str = DEFAULT_IV) -> str:
def get_des_encrypt(data: str, _key: str, _iv: str) -> str:
""" """
使用 DES/CBC/PKCS7 加密字符串 CryptoJS.DES.encrypt 兼容 使用 DES/CBC/PKCS7 加密字符串
""" """
key = _key.encode('utf-8') key = _key.encode('utf-8')
iv = _iv.encode('utf-8') iv = _iv.encode('utf-8')
if len(key) != 8: if len(key) != 8 or len(iv) != 8:
raise ValueError("DES key must be exactly 8 bytes long") raise ValueError("DES key and IV must be exactly 8 bytes")
if len(iv) != 8:
raise ValueError("DES IV must be exactly 8 bytes long")
plaintext = data.encode('utf-8') plaintext = data.encode('utf-8')
padded_data = pad(plaintext, DES.block_size) padded_data = pad(plaintext, DES.block_size)
@ -27,69 +26,76 @@ def get_des_encrypt(data: str, _key: str = DEFAULT_KEY, _iv: str = DEFAULT_IV) -
return base64.b64encode(encrypted_bytes).decode('utf-8') return base64.b64encode(encrypted_bytes).decode('utf-8')
def send_unencrypted_request(url, data): def login_and_get_dynamic_key_iv(url, login_data):
headers = { """
"Content-Type": "application/json" 发送未加密登录请求成功后提取 key iv
返回 (token, key, iv) (None, None, None)
"""
payload = {
"encType": 0,
"x_flag": "",
"data": login_data
} }
headers = {"Content-Type": "application/json"}
try: try:
response = requests.post(url, headers=headers, data=json.dumps(data)) response = requests.post(url, headers=headers, json=payload)
print("🔓 [未加密] Status Code:", response.status_code) print("🔓 [登录] 状态码:", response.status_code)
print("🔓 [未加密] Response Text:", response.text)
result = response.json() result = response.json()
token = result.get("data", {}).get("token") print("🔓 [登录] 响应:", json.dumps(result, indent=2, ensure_ascii=False))
if token:
print("✅ [未加密] 登录成功Token", token) data = result.get("data", {})
token = data.get("token")
key = data.get("key")
iv = data.get("iv")
if token and key and iv:
print(f"✅ 登录成功Token: {token}")
print(f"🔑 动态 Key: {key}, IV: {iv}")
return token, key, iv
else: else:
print("❌ [未加密] 未获取到 Token响应内容", result) print("登录成功但缺少 key 或 iv")
except requests.RequestException as e: return None, None, None
print("🌐 [未加密] 请求异常:", e) except Exception as e:
except json.JSONDecodeError: print("❌ 登录异常:", e)
print("🧩 [未加密] 响应不是合法的 JSON 格式") return None, None, None
def send_encrypted_request(url, data): def send_encrypted_request_with_dynamic_key(url, data, key, iv):
# 将原始数据转为紧凑 JSON 字符串 """
使用动态 key/iv 加密并发送请求
"""
data_str = json.dumps(data, separators=(',', ':')) data_str = json.dumps(data, separators=(',', ':'))
# 加密 encrypted_data = get_des_encrypt(data_str, key, iv)
encrypted_data = get_des_encrypt(data_str, DEFAULT_KEY, DEFAULT_IV)
# 构造最终 payload
payload = { payload = {
"encType": 0, "encType": 0,
"x_flag": "", "x_flag": "",
"data": encrypted_data "data": encrypted_data
} }
headers = { headers = {"Content-Type": "application/json"}
"Content-Type": "application/json"
}
try: try:
response = requests.post(url, headers=headers, data=json.dumps(payload)) response = requests.post(url, headers=headers, json=payload)
print("🔒 [已加密] Status Code:", response.status_code) print("🔒 [加密请求] 状态码:", response.status_code)
print("🔒 [已加密] Response Text:", response.text) print("🔒 [加密请求] 响应:", response.text)
result = response.json() result = response.json()
token = result.get("data", {}).get("token") token = result.get("data", {}).get("token")
if token: if token:
print("[已加密] 登录成功Token", token) print("加密请求成功Token:", token)
else: else:
print("❌ [已加密] 未获取到 Token响应内容", result) print("⚠️ 加密请求完成,但无新 token")
except requests.RequestException as e: except Exception as e:
print("🌐 [已加密] 请求异常:", e) print("❌ 加密请求异常:", e)
except json.JSONDecodeError:
print("🧩 [已加密] 响应不是合法的 JSON 格式")
except ValueError as ve:
print("⚠️ [已加密] 加密参数错误:", ve)
if __name__ == '__main__': if __name__ == '__main__':
url = "http://127.0.0.1:8084/c/v1/wellMudLogAndWireLine/system/privateManager/userLoginAggregate/other/login" url = "http://127.0.0.1:8084/c/v1/wellMudLogAndWireLine/system/privateManager/userLoginAggregate/other/login"
login_data = { login_data = {
"userId": "admin", "userId": "test002",
"password": "123456" "password": "123456"
} }
token, key, iv = login_and_get_dynamic_key_iv(url, login_data)
# ========== 选择模式 ========== if not (key and iv):
USE_ENCRYPTION = False # 改为 False 则发送未加密请求 print("🛑 无法获取动态 key/iv退出。")
exit(1)
if USE_ENCRYPTION:
send_encrypted_request(url, login_data)
else:
send_unencrypted_request(url, login_data)