Compare commits

...

18 Commits

Author SHA1 Message Date
c1fa662552 feat(sign): 支持多环境配置和签名生成优化
- 新增开发环境和生产环境的配置管理
- 移除参数中的空格替换逻辑
- 更新默认搜索键和用户类型
- 支持通过命令行参数切换运行环境
- 自动构建完整请求URL用于curl测试
- 添加环境选择提示和错误处理机制
2025-11-25 11:06:04 +08:00
bab29150ab fix(generateSign): 更新系统配置和搜索键值
- 修改 systemCode 和 appId 为新的标识符
- 更新 secret_key 以增强安全性
- 扩展 searchKeys 列表,增加多个新的搜索键值
- 调整 searchKeys 数组格式以提高可读性
2025-11-24 16:01:44 +08:00
8c4e48f0ef feat(utils): 添加MD5签名生成工具
- 实现了基于Java规则的MD5签名计算方法
- 支持参数映射排序与URL编码处理
- 添加了空格去除和特殊字符处理逻辑
- 集成了时间戳、随机字符串生成功能
- 提供完整的curl请求示例输出
- 支持复杂对象序列化为JSON请求体
- 实现了多层级参数拼接与签名验证
2025-11-24 11:12:25 +08:00
167d96b6ba feat(wits): 添加WITS数据模拟发送脚本- 实现随机生成WITS协议数据功能
- 支持整数和浮点数两种数据格式
- 配置目标主机和端口进行TCP连接
- 循环发送预定义的WITS字段码
- 添加发送间隔控制和异常处理机制- 提供命令行入口直接运行脚本
2025-11-24 10:23:46 +08:00
123dfc4411 feat(wits): 添加WITS数据模拟发送脚本- 实现随机生成WITS协议数据功能
- 支持整数和浮点数两种数据格式
- 配置目标主机和端口进行TCP连接
- 循环发送预定义的WITS字段码
- 添加发送间隔控制和异常处理机制- 提供命令行入口直接运行脚本
2025-11-17 09:58:16 +08:00
ea2d12c74e feat(network): 添加 WITS 数据接收功能并更新 WebSocket 地址
- 新增 recive_wits.py 文件用于通过 TCP 接收 WITS 数据- 实现自动重连机制和数据黏包/拆包处理- 更新 test_websocket.py 中的 WebSocket 连接地址- 修改端口号以适配新的服务配置
2025-11-14 09:55:29 +08:00
9c6f5c3b63 feat(login): 实现动态密钥登录流程
- 移除默认静态密钥配置,改为从登录响应动态获取- 新增 login_and_get_dynamic_key_iv 函数处理未加密登录并提取密钥- 修改 get_des_encrypt 函数为必需传入密钥参数
- 更新 send_encrypted_request_with_dynamic_key 使用动态密钥发送加密请求
- 调整主程序逻辑,先执行未加密登录获取密钥再进行加密请求
- 修改测试账号为 test002 并移除手动切换加密模式的选项
2025-10-27 11:19:37 +08:00
3489709697 feat(login): 支持发送加密和未加密登录请求
- 添加 send_unencrypted_request 函数用于发送未加密请求
- 添加 send_encrypted_request 函数用于发送加密请求- 重构主流程,通过 USE_ENCRYPTION 开关选择请求类型
- 优化日志输出,区分加密与未加密请求状态
- 统一异常处理逻辑,增强代码健壮性
- 保留原有加密逻辑并整合到新结构中
2025-10-27 11:05:59 +08:00
3ab5f15b0e feat(login): 实现DES加密登录功能- 添加 DES/CBC/PKCS7 加密函数,兼容 CryptoJS.DES.encrypt
- 使用默认密钥和 IV 对登录数据进行加密
- 修改请求 payload 结构,将加密后的字符串作为 data 字段值
- 更新接口地址并调整请求逻辑
- 增强错误处理与调试信息打印- 优化响应解析方式,正确提取 token 信息
2025-10-21 13:28:35 +08:00
5fa68924e3 feat(websocket): 更新WebSocket连接地址并添加认证令牌
- 将连接地址从 wss://192.168.1.3/ws/ 更改为 ws://192.168.1.41:9516/ws/- 添加了用于身份验证的 Bearer Token
- 启用 WebSocket 调试追踪功能
2025-10-15 11:15:58 +08:00
945ac31725 Merge remote-tracking branch 'origin/main'
# Conflicts:
#	test_websocket.py
2025-10-15 11:09:01 +08:00
5b1ce354b9 feat(websocket): 更新WebSocket连接地址并添加认证令牌
- 将连接地址从 wss://192.168.1.3/ws/ 更改为 ws://192.168.1.41:9516/ws/- 添加了用于身份验证的 Bearer Token
- 启用 WebSocket 调试追踪功能
2025-10-15 11:08:39 +08:00
ce607c5637 feat(auth): 添加系统登录并获取token功能
- 实现用户登录接口调用
- 解析响应数据提取token
- 添加异常处理机制
- 支持JSON格式请求体- 集成requests库进行HTTP通信
- 提供登录成功/失败状态提示
2025-10-15 11:07:33 +08:00
266ba6ad31 feat(whatsapp): 实现 WhatsApp 风格的端到端加密通信原型
- 添加了基于 X3DH 和 Double Ratchet 的加密会话逻辑
- 实现了客户端密钥生成、bundle 注册与获取
- 构建了极简中继服务器用于转发加密消息
- 支持消息加密、解密及 MAC 校验- 提供完整演示流程,包括双向通信和多消息发送
- 使用 AES-CBC 加密和 HMAC-SHA256 认证
- 引入 X25519 密钥交换和 HKDF 密钥派生函数
- 包含一次性预共享密钥(OPK)管理机制
2025-10-06 14:42:14 +08:00
f7eb913be3 变更 2025-08-08 10:17:31 +08:00
1211d2fca9 变更 2025-08-08 09:51:44 +08:00
8c58329bfc Merge remote-tracking branch 'origin/main' 2025-08-08 09:47:33 +08:00
4ac98d9791 变更 2025-08-08 09:47:27 +08:00
9 changed files with 882 additions and 0 deletions

124
generateSign.py Normal file
View File

@@ -0,0 +1,124 @@
import json
import time
import uuid
import urllib.parse
import hashlib
import sys
# =================== 运行环境配置 ===================
CONFIG = {
"dev": {
"platform_ifc_system_code": "ef5b17caff6e4da19d6af82d539e894d",
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
"appId": "57395d2bc668496c9c57d8f2b19bd516",
"secret_key": "KMFHKo1Uzrl&MWXorbQIT&C$Qea$uQOY",
"ifcUrl": "http://192.168.1.202:8083/dev-api/cenertech-interface-center/IFC2"
},
"prod": {
"platform_ifc_system_code": "57395d2bc668496c9c57d8f2b19bd516",
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
"appId": "57395d2bc668496c9c57d8f2b19bd516",
"secret_key": "opS=K9Parlf&p+JxBOQD2q+zNZa+uXEE",
"ifcUrl": "https://dpc.cet.cnooc/prod-api/cenertech-interface-center/IFC2"
}
}
# =================== MD5签名计算 ===================
def calculate_md5(param, secret_key):
return hashlib.md5((secret_key + param).encode('utf-8')).hexdigest()
def create_sign(param_map, secret_key):
sorted_keys = sorted(param_map.keys())
parts = []
for key in sorted_keys:
val = param_map[key]
if val is None:
parts.append(f"{key}=")
continue
if isinstance(val, list):
for item in val:
encoded_val = urllib.parse.quote(str(item), encoding='utf-8')
parts.append(f"{key}={encoded_val}")
continue
val_str = str(val)
if key != "REQUEST_BODY_CONTENT":
val_str = urllib.parse.quote(val_str, encoding='utf-8')
parts.append(f"{key}={val_str}")
param = "&".join(parts)
param = "".join(param.split())
print("参数明文 param:", param)
sign = calculate_md5(param, secret_key)
print("生成签名 sign:", sign)
return sign
# =================== Python 主程序 ===================
if __name__ == '__main__':
# 选择 dev / prod
env = "dev"
if len(sys.argv) > 1:
env = sys.argv[1]
print(f"\n=== 当前环境:{env} ===\n")
if env not in CONFIG:
print("❗ 错误:请使用 python sign.py dev 或 python sign.py prod")
sys.exit(1)
cfg = CONFIG[env]
systemCode = cfg["systemCode"]
appId = cfg["appId"]
secret_key = cfg["secret_key"]
platform_ifc_system_code = cfg["platform_ifc_system_code"]
ifcUrl = cfg["ifcUrl"]
ts = str(int(time.time() * 1000))
randomString = uuid.uuid4().hex
selector = {
"searchKeys": ["9cb864213c6f48ceaf90e98e7ca375e9","3DC1B33E1B5B431E99FA163BF9E86E6A","13336","b353614a47e2425a8a8885d270267407"],
"userType": "1",
"hasCascade": True
}
request_body_json = json.dumps(selector, separators=(',', ':'))
param_map = {
"systemCode": systemCode,
"timestamp": ts,
"nonce": randomString,
"REQUEST_BODY_CONTENT": request_body_json
}
sign = create_sign(param_map, secret_key)
apiPath = "/userListByDeptSearch"
fullUrl = f"{ifcUrl}/{platform_ifc_system_code}{apiPath}"
curl = f"""
curl -X POST "{fullUrl}" \\
-H "Content-Type: application/json" \\
-H "systemCode: {systemCode}" \\
-H "timestamp: {ts}" \\
-H "nonce: {randomString}" \\
-H "sign: {sign}" \\
-H "App-Id: {appId}" \\
-d '{request_body_json}'
"""
print("\n===== 最终 curl 请求 =====\n")
print(curl)

59
recive_wits.py Normal file
View File

@@ -0,0 +1,59 @@
import socket
import time
HOST = "192.168.1.41"
PORT = 9928
def connect():
"""建立 TCP 连接(带重试)"""
while True:
try:
print(f"Connecting to {HOST}:{PORT} ...")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
s.settimeout(5)
print("Connected successfully!")
return s
except Exception as e:
print(f"Connection failed: {e}, retrying in 3s...")
time.sleep(3)
def receive_wits_data(sock):
"""持续接收 WITS 数据(自动处理黏包/拆包)"""
buffer = ""
while True:
try:
data = sock.recv(4096)
# 服务器关闭
if not data:
print("Server closed connection.")
return False
buffer += data.decode(errors="ignore")
# WITS 多为 \r\n 分隔
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
print("Received:", line)
except socket.timeout:
# 正常情况,继续接收即可
continue
except Exception as e:
print("Error:", e)
return False
if __name__ == "__main__":
while True:
sock = connect()
ok = receive_wits_data(sock)
sock.close()
print("Reconnecting in 3 seconds...")
time.sleep(3)

61
send_wtis.py Normal file
View File

@@ -0,0 +1,61 @@
import socket
import random
import time
HOST = "192.168.1.5" # 目标地址
PORT = 9929 # 目标端口
# 你给的示例里出现的所有前四位字段
WITS_CODES = [
"0105",
"0106",
"0108",
"0112",
"0114",
"0116",
"0118",
"0120",
"0121",
"0122"
]
def random_value(prefix):
"""
生成类似你收到的数据:
- 有些是整数:例如 0105 251114
- 有些是浮点:例如 0108 37.26745
"""
# 随机决定生成整数 or 小数
if random.random() < 0.3:
# 生成整数6位左右
value = str(random.randint(100000, 999999))
else:
# 生成浮点保留4~5位小数
value = f"{random.uniform(0, 500):.5f}"
return prefix + value
def send_wits_data():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
print("Connected to target. Sending WITS data...")
try:
while True:
for code in WITS_CODES:
msg = random_value(code)
sock.sendall((msg + "\r\n").encode())
print("Sent:", msg)
time.sleep(0.2) # 每条间隔 200ms可根据需要调整
except Exception as e:
print("Error:", e)
finally:
sock.close()
if __name__ == "__main__":
send_wits_data()

66
simulate_monty_hall.py Normal file
View File

@@ -0,0 +1,66 @@
import random
def simulate_monty_hall(switch, num_trials=100000):
"""
模拟三门悖论(蒙提霍尔问题)
参数:
- switch: bool, 是否在主持人打开门后选择换门
- num_trials: int, 模拟次数默认10万次
返回:
- 获胜次数
- 胜率
"""
win_count = 0
doors = [1, 2, 3] # 三扇门
for _ in range(num_trials):
# 随机放置汽车1 表示汽车0 表示山羊)
car_door = random.choice(doors)
# 参赛者初始选择
player_choice = random.choice(doors)
# 主持人要打开一扇门:不能是参赛者选的,也不能是有车的
remaining_doors = [d for d in doors if d != player_choice and d != car_door]
host_opens = random.choice(remaining_doors)
# 换门逻辑
if switch:
# 换到剩下的那一扇未被选、未被开的门
final_choice = [d for d in doors if d != player_choice and d != host_opens][0]
else:
# 不换,保持原选择
final_choice = player_choice
# 判断是否获胜
if final_choice == car_door:
win_count += 1
win_rate = win_count / num_trials
return win_count, win_rate
# === 运行模拟 ===
if __name__ == "__main__":
num_trials = 100000
print(f"模拟 {num_trials} 次三门问题:\n")
# 情况1坚持不换门
wins_stay, rate_stay = simulate_monty_hall(switch=False, num_trials=num_trials)
print(f"坚持原选择(不换门):")
print(f" 获胜次数: {wins_stay}")
print(f" 胜率: {rate_stay:.4f} ({rate_stay * 100:.2f}%)\n")
# 情况2总是换门
wins_switch, rate_switch = simulate_monty_hall(switch=True, num_trials=num_trials)
print(f"总是换门:")
print(f" 获胜次数: {wins_switch}")
print(f" 胜率: {rate_switch:.4f} ({rate_switch * 100:.2f}%)\n")
print("理论值对比:")
print(" 不换门胜率: 1/3 ≈ 33.33%")
print(" 换门胜率: 2/3 ≈ 66.67%")

66
snowflake_generator.py Normal file
View File

@@ -0,0 +1,66 @@
import time
import threading
EPOCH_JAVA_COMMON = 1288834974657
class Snowflake:
def __init__(self, datacenter_id: int = 0, worker_id: int = 0, epoch: int = 1480166465631):
# 机器和数据中心配置
self.worker_id_bits = 5
self.datacenter_id_bits = 5
self.sequence_bits = 12
self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)
self.max_datacenter_id = -1 ^ (-1 << self.datacenter_id_bits)
if worker_id > self.max_worker_id or worker_id < 0:
raise ValueError(f"worker_id 超出范围 (0 ~ {self.max_worker_id})")
if datacenter_id > self.max_datacenter_id or datacenter_id < 0:
raise ValueError(f"datacenter_id 超出范围 (0 ~ {self.max_datacenter_id})")
self.worker_id = worker_id
self.datacenter_id = datacenter_id
self.epoch = epoch
self.sequence = 0
self.last_timestamp = -1
self.worker_id_shift = self.sequence_bits
self.datacenter_id_shift = self.sequence_bits + self.worker_id_bits
self.timestamp_left_shift = self.sequence_bits + self.worker_id_bits + self.datacenter_id_bits
self.sequence_mask = -1 ^ (-1 << self.sequence_bits)
self.lock = threading.Lock()
def _timestamp(self):
return int(time.time() * 1000)
def _til_next_millis(self, last_timestamp):
timestamp = self._timestamp()
while timestamp <= last_timestamp:
timestamp = self._timestamp()
return timestamp
def next_id(self) -> int:
with self.lock:
timestamp = self._timestamp()
if timestamp < self.last_timestamp:
raise Exception("时钟回拨拒绝生成ID")
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & self.sequence_mask
if self.sequence == 0:
timestamp = self._til_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
return ((timestamp - self.epoch) << self.timestamp_left_shift) | \
(self.datacenter_id << self.datacenter_id_shift) | \
(self.worker_id << self.worker_id_shift) | \
self.sequence
if __name__ == "__main__":
snowflake = Snowflake(datacenter_id=0, worker_id=0, epoch=EPOCH_JAVA_COMMON)
for _ in range(10):
print(str(snowflake.next_id()))

33
testGeneratefile.py Normal file
View File

@@ -0,0 +1,33 @@
import os
import random
# 输出目录
output_dir = "./output_files"
os.makedirs(output_dir, exist_ok=True)
# 配置
file_count = 40
min_size_mb = 70
max_size_mb = 80
chunk_size = 1024 * 1024 # 每次写入 1MB
def generate_random_file(file_path, size_bytes):
with open(file_path, "wb") as f:
written = 0
while written < size_bytes:
chunk = os.urandom(min(chunk_size, size_bytes - written))
f.write(chunk)
written += len(chunk)
for i in range(1, file_count + 1):
file_size_mb = random.randint(min_size_mb, max_size_mb)
file_size_bytes = file_size_mb * 1024 * 1024
filename = f"file_{i}.bin"
filepath = os.path.join(output_dir, filename)
print(f"Creating {filename} ({file_size_mb} MB)...")
generate_random_file(filepath, file_size_bytes)
print("✅ 所有文件生成完毕。")

20
test_websocket.py Normal file
View File

@@ -0,0 +1,20 @@
import ssl
import websocket
# URL = "ws://192.168.1.202:9100/well-tool-test-system/ws/"
URL = "wss://192.168.1.87/ws/"
ws = websocket.WebSocketApp(
URL,
on_open=lambda ws: print("连接成功"),
on_message=lambda ws, msg: print("消息:", msg),
on_error=lambda ws, err: print("错误:", err),
on_close=lambda ws, code, reason: print("关闭:", code, reason),
)
ws.run_forever(
sslopt={
"cert_reqs": ssl.CERT_NONE,
"check_hostname": False,
}
)

352
test_whatsapp.py Normal file
View File

@@ -0,0 +1,352 @@
import os, json, hmac, base64, hashlib, time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# ------------------------- Utils -------------------------
def b64e(b: bytes) -> str:
return base64.b64encode(b).decode()
def b64d(s: str) -> bytes:
return base64.b64decode(s.encode())
def hkdf(ikm: bytes, info: bytes, length: int = 32) -> bytes:
return HKDF(algorithm=hashes.SHA256(), length=length, salt=None, info=info).derive(ikm)
def aes_cbc_enc(key: bytes, iv: bytes, pt: bytes) -> bytes:
# PKCS#7 padding
pad = 16 - (len(pt) % 16)
pt = pt + bytes([pad])*pad
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
return cipher.encryptor().update(pt) + cipher.encryptor().finalize()
def aes_cbc_dec(key: bytes, iv: bytes, ct: bytes) -> bytes:
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
pt = cipher.decryptor().update(ct) + cipher.decryptor().finalize()
pad = pt[-1]
if pad < 1 or pad > 16 or pt[-pad:] != bytes([pad])*pad:
raise ValueError("Bad padding")
return pt[:-pad]
def hmac_sha256(key: bytes, data: bytes) -> bytes:
return hmac.new(key, data, hashlib.sha256).digest()
def x25519_shared(sk: X25519PrivateKey, pk: X25519PublicKey) -> bytes:
return sk.exchange(pk)
def pub_bytes(pk: X25519PublicKey) -> bytes:
return pk.public_bytes(encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw)
def priv_from_bytes(b: bytes) -> X25519PrivateKey:
return X25519PrivateKey.from_private_bytes(b)
def pub_from_bytes(b: bytes) -> X25519PublicKey:
return X25519PublicKey.from_public_bytes(b)
# ------------------ Data structures ----------------------
@dataclass
class PublicBundle:
identity_pub: str
signed_prekey_pub: str
signed_prekey_sig: str # 演示保留字段,未做真实签名校验
onetime_prekeys: Dict[str, str] # id -> pub
@dataclass
class SessionState:
# 简化:只做单向发送链(对称 ratchet 计数)
root_key: bytes
chain_key: bytes
counter: int
peer_identity_pub_b64: str
opk_id: Optional[str] # 用了哪个一次性预键(初始化时)
established_at: float
# ------------------------- Server ------------------------
class RelayServer:
"""
极简“服务器”:存放用户公开密钥束,提供查询;转发密文帧。
不保存/解密消息正文。
"""
def __init__(self):
self.directory: Dict[str, PublicBundle] = {}
self.mailbox: Dict[str, List[dict]] = {}
def register_bundle(self, user: str, bundle: PublicBundle):
self.directory[user] = bundle
def fetch_bundle(self, user: str) -> Optional[PublicBundle]:
return self.directory.get(user)
def mark_onetime_used(self, user: str, opk_id: str):
b = self.directory.get(user)
if not b: return
if opk_id in b.onetime_prekeys:
del b.onetime_prekeys[opk_id]
def send_frame(self, to_user: str, frame: dict):
self.mailbox.setdefault(to_user, []).append(frame)
def pull_frames(self, user: str) -> List[dict]:
return self.mailbox.pop(user, [])
# ------------------------- Client ------------------------
class Client:
def __init__(self, name: str, server: RelayServer):
self.name = name
self.server = server
# 长期身份密钥
self.ik_priv = X25519PrivateKey.generate()
self.ik_pub = self.ik_priv.public_key()
# 已签名预共享密钥(演示:不做真实签名)
self.spk_priv = X25519PrivateKey.generate()
self.spk_pub = self.spk_priv.public_key()
self.spk_sig = os.urandom(64) # 占位
# 一次性预共享密钥池
self.opk_priv_map: Dict[str, X25519PrivateKey] = {}
self.opk_pub_map: Dict[str, X25519PublicKey] = {}
for i in range(5):
sk = X25519PrivateKey.generate()
pk = sk.public_key()
opk_id = f"opk-{i}"
self.opk_priv_map[opk_id] = sk
self.opk_pub_map[opk_id] = pk
# 会话(按对端用户)
self.sessions: Dict[str, SessionState] = {}
def publish(self):
bundle = PublicBundle(
identity_pub=b64e(pub_bytes(self.ik_pub)),
signed_prekey_pub=b64e(pub_bytes(self.spk_pub)),
signed_prekey_sig=b64e(self.spk_sig),
onetime_prekeys={oid: b64e(pub_bytes(pk)) for oid, pk in self.opk_pub_map.items()}
)
self.server.register_bundle(self.name, bundle)
# ---------- 建会话(发起方)简化 X3DH ----------
def establish_session_as_initiator(self, peer: str) -> SessionState:
pb = self.server.fetch_bundle(peer)
assert pb, f"{peer} has no bundle"
IKb = pub_from_bytes(b64d(pb.identity_pub))
SPKb = pub_from_bytes(b64d(pb.signed_prekey_pub))
# 选一个对方的一次性预键
opk_items = list(pb.onetime_prekeys.items())
opk_id, OPKb_b64 = opk_items[0] if opk_items else (None, None)
OPKb = pub_from_bytes(b64d(OPKb_b64)) if OPKb_b64 else None
# 发起者生成临时密钥对
EKa_priv = X25519PrivateKey.generate()
EKa_pub = EKa_priv.public_key()
# X3DH 4次 ECDH无OPK则忽略第4项
s1 = x25519_shared(self.ik_priv, SPKb)
s2 = x25519_shared(EKa_priv, IKb)
s3 = x25519_shared(EKa_priv, SPKb)
parts = [s1, s2, s3]
if OPKb:
s4 = x25519_shared(EKa_priv, OPKb)
parts.append(s4)
master_secret = b"".join(parts)
root_key = hkdf(master_secret, info=b"ROOT", length=32)
chain_key = hkdf(root_key, info=b"CHAIN", length=32)
st = SessionState(
root_key=root_key,
chain_key=chain_key,
counter=0,
peer_identity_pub_b64=pb.identity_pub,
opk_id=opk_id,
established_at=time.time()
)
self.sessions[peer] = st
# 发送“建会话 + 第一条消息”的头信息(包含 EKa_pub、opk_id
self._pending_ephemeral_pub = b64e(pub_bytes(EKa_pub))
self._pending_opk_id = opk_id
return st
# ---------- 建会话(接收方) ----------
def _establish_session_as_responder(self, peer: str, ek_pub_b64: str, opk_id: Optional[str]) -> SessionState:
EKa = pub_from_bytes(b64d(ek_pub_b64)) # 对方临时公钥
# 自己的密钥
IKb_priv = self.ik_priv
SPKb_priv = self.spk_priv
OPKb_priv = self.opk_priv_map.get(opk_id) if opk_id else None
# 4次 ECDH无OPK则忽略第4项
s1 = x25519_shared(SPKb_priv, pub_from_bytes(b64d(self.server.directory[peer].identity_pub))) # = ECDH(Ia, SPKb)
# 注意:发起方 s1 是 ECDH(Ia, SPKb),接收方等价项应是 ECDH(SPKb, Ia)
# 但我们没有 Ia 私钥,这里换一种对称表达:按消息头与本地密钥构造相同串联
# 为确保与发起方一致,我们直接重算:
# 对于接收方ECDH(Iinitiator, Srecipient) == ECDH(Srecipient, Iinitiator)
# 需要 Iinitiator 公钥:来自会话第一帧中?为简化,我们用目录中对方 identity_pub。
Ia_pub = pub_from_bytes(b64d(self.server.directory[peer].identity_pub))
s1 = x25519_shared(self.spk_priv, Ia_pub)
s2 = x25519_shared(self.ik_priv, EKa)
s3 = x25519_shared(self.spk_priv, EKa)
parts = [s1, s2, s3]
if OPKb_priv:
s4 = x25519_shared(OPKb_priv, EKa)
parts.append(s4)
master_secret = b"".join(parts)
root_key = hkdf(master_secret, info=b"ROOT", length=32)
chain_key = hkdf(root_key, info=b"CHAIN", length=32)
st = SessionState(
root_key=root_key,
chain_key=chain_key,
counter=0,
peer_identity_pub_b64=self.server.directory[peer].identity_pub,
opk_id=opk_id,
established_at=time.time()
)
self.sessions[peer] = st
# 一次性预键被使用后,服务端目录也标记删除
if opk_id:
self.server.mark_onetime_used(self.name, opk_id)
return st
# ---------- 每条消息的派生与加密 ----------
def _derive_message_key(self, st: SessionState) -> Tuple[bytes, bytes, bytes]:
"""
从 chain_key 派生本条消息的 (aes_key, hmac_key, iv),然后更新 chain_keycounter+1
"""
info = b"MSG|" + st.counter.to_bytes(8, "big")
msg_key = hkdf(st.chain_key, info=info, length=80) # 32 AES + 32 HMAC + 16 IV
aes_key = msg_key[:32]
mac_key = msg_key[32:64]
iv = msg_key[64:80]
# 下一条链
st.chain_key = hkdf(st.chain_key, info=b"STEP", length=32)
st.counter += 1
return aes_key, mac_key, iv
# ---------- 发送 ----------
def send(self, to_user: str, plaintext: bytes):
# 若没有会话,先建立
if to_user not in self.sessions:
st = self.establish_session_as_initiator(to_user)
ek_pub_b64 = self._pending_ephemeral_pub
opk_id = self._pending_opk_id
else:
st = self.sessions[to_user]
ek_pub_b64 = None
opk_id = None
aes_key, mac_key, iv = self._derive_message_key(st)
ct = aes_cbc_enc(aes_key, iv, plaintext)
mac = hmac_sha256(mac_key, iv + ct)[:8]
frame = {
"from": self.name,
"to": to_user,
"hdr": {
"init": ek_pub_b64 is not None,
"ek_pub_b64": ek_pub_b64, # 仅首次带上
"opk_id": opk_id,
"counter": st.counter - 1, # 本条的计数
},
"body": {
"iv": b64e(iv),
"ct": b64e(ct),
"mac": b64e(mac),
}
}
self.server.send_frame(to_user, frame)
# ---------- 接收 ----------
def receive_all(self) -> List[Tuple[str, bytes]]:
frames = self.server.pull_frames(self.name)
outputs = []
for f in frames:
sender = f["from"]
hdr = f["hdr"]
body = f["body"]
if sender not in self.sessions:
# 首帧:建立被动会话
assert hdr["init"], "Missing init header"
st = self._establish_session_as_responder(
peer=sender,
ek_pub_b64=hdr["ek_pub_b64"],
opk_id=hdr["opk_id"]
)
else:
st = self.sessions[sender]
# 按对方 counter 对齐(演示:假设顺序到达)
aes_key, mac_key, iv = self._derive_message_key(st)
if st.counter - 1 != hdr["counter"]:
# 简化:严格顺序,真实实现需支持跳号、乱序恢复
raise ValueError("Out-of-order message (demo limitation)")
if b64e(iv) != body["iv"]:
# 教学演示:我们强制使用派生的 iv
iv = b64d(body["iv"]) # 宽松一点也可以直接信任对端 iv
ct = b64d(body["ct"])
mac = b64d(body["mac"])
calc = hmac_sha256(mac_key, iv + ct)[:8]
if not hmac.compare_digest(mac, calc):
raise ValueError("MAC verification failed")
pt = aes_cbc_dec(aes_key, iv, ct)
outputs.append((sender, pt))
return outputs
# ------------------------- Demo --------------------------
def main():
server = RelayServer()
alice = Client("alice", server)
bob = Client("bob", server)
# 发布各自的公钥包
alice.publish()
bob.publish()
# Alice 先发一条(会自动建会话)
alice.send("bob", b"Hello Bob, this is Alice.")
# Bob 拉取并解密
for frm in bob.receive_all():
print("[bob] got:", frm)
# Bob 回复
bob.send("alice", b"Hi Alice, Bob here. Message received.")
for frm in alice.receive_all():
print("[alice] got:", frm)
# 连发多条观察每条都换密钥server 无法解密)
for i in range(1, 4):
alice.send("bob", f"Msg#{i} from Alice".encode())
for frm in bob.receive_all():
print("[bob] got:", frm)
# 验证“服务器看不到明文”
print("\n[server] directory keys (truncated):")
print(json.dumps({
u: {
"identity_pub": v.identity_pub[:24] + "...",
"spk_pub": v.signed_prekey_pub[:24] + "...",
"opk_count": len(v.onetime_prekeys)
} for u, v in server.directory.items()
}, indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,101 @@
import requests
import json
from Crypto.Cipher import DES
from Crypto.Util.Padding import pad
import base64
# 全局变量,用于存储从登录响应中获取的 key 和 iv
DYNAMIC_KEY = None
DYNAMIC_IV = None
def get_des_encrypt(data: str, _key: str, _iv: str) -> str:
"""
使用 DES/CBC/PKCS7 加密字符串。
"""
key = _key.encode('utf-8')
iv = _iv.encode('utf-8')
if len(key) != 8 or len(iv) != 8:
raise ValueError("DES key and IV must be exactly 8 bytes")
plaintext = data.encode('utf-8')
padded_data = pad(plaintext, DES.block_size)
cipher = DES.new(key, DES.MODE_CBC, iv)
encrypted_bytes = cipher.encrypt(padded_data)
return base64.b64encode(encrypted_bytes).decode('utf-8')
def login_and_get_dynamic_key_iv(url, login_data):
"""
发送未加密登录请求,成功后提取 key 和 iv。
返回 (token, key, iv) 或 (None, None, None)
"""
payload = {
"encType": 0,
"x_flag": "",
"data": login_data
}
headers = {"Content-Type": "application/json"}
try:
response = requests.post(url, headers=headers, json=payload)
print("🔓 [登录] 状态码:", response.status_code)
result = response.json()
print("🔓 [登录] 响应:", json.dumps(result, indent=2, ensure_ascii=False))
data = result.get("data", {})
token = data.get("token")
key = data.get("key")
iv = data.get("iv")
if token and key and iv:
print(f"✅ 登录成功Token: {token}")
print(f"🔑 动态 Key: {key}, IV: {iv}")
return token, key, iv
else:
print("❌ 登录成功但缺少 key 或 iv")
return None, None, None
except Exception as e:
print("❌ 登录异常:", e)
return None, None, None
def send_encrypted_request_with_dynamic_key(url, data, key, iv):
"""
使用动态 key/iv 加密并发送请求。
"""
data_str = json.dumps(data, separators=(',', ':'))
encrypted_data = get_des_encrypt(data_str, key, iv)
payload = {
"encType": 0,
"x_flag": "",
"data": encrypted_data
}
headers = {"Content-Type": "application/json"}
try:
response = requests.post(url, headers=headers, json=payload)
print("🔒 [加密请求] 状态码:", response.status_code)
print("🔒 [加密请求] 响应:", response.text)
result = response.json()
token = result.get("data", {}).get("token")
if token:
print("✅ 加密请求成功Token:", token)
else:
print("⚠️ 加密请求完成,但无新 token")
except Exception as e:
print("❌ 加密请求异常:", e)
if __name__ == '__main__':
url = "http://127.0.0.1:8084/c/v1/wellMudLogAndWireLine/system/privateManager/userLoginAggregate/other/login"
login_data = {
"userId": "test002",
"password": "123456"
}
token, key, iv = login_and_get_dynamic_key_iv(url, login_data)
if not (key and iv):
print("🛑 无法获取动态 key/iv退出。")
exit(1)