Compare commits

..

10 Commits

Author SHA1 Message Date
c1fa662552 feat(sign): 支持多环境配置和签名生成优化
- 新增开发环境和生产环境的配置管理
- 移除参数中的空格替换逻辑
- 更新默认搜索键和用户类型
- 支持通过命令行参数切换运行环境
- 自动构建完整请求URL用于curl测试
- 添加环境选择提示和错误处理机制
2025-11-25 11:06:04 +08:00
bab29150ab fix(generateSign): 更新系统配置和搜索键值
- 修改 systemCode 和 appId 为新的标识符
- 更新 secret_key 以增强安全性
- 扩展 searchKeys 列表,增加多个新的搜索键值
- 调整 searchKeys 数组格式以提高可读性
2025-11-24 16:01:44 +08:00
8c4e48f0ef feat(utils): 添加MD5签名生成工具
- 实现了基于Java规则的MD5签名计算方法
- 支持参数映射排序与URL编码处理
- 添加了空格去除和特殊字符处理逻辑
- 集成了时间戳、随机字符串生成功能
- 提供完整的curl请求示例输出
- 支持复杂对象序列化为JSON请求体
- 实现了多层级参数拼接与签名验证
2025-11-24 11:12:25 +08:00
167d96b6ba feat(wits): 添加WITS数据模拟发送脚本- 实现随机生成WITS协议数据功能
- 支持整数和浮点数两种数据格式
- 配置目标主机和端口进行TCP连接
- 循环发送预定义的WITS字段码
- 添加发送间隔控制和异常处理机制- 提供命令行入口直接运行脚本
2025-11-24 10:23:46 +08:00
123dfc4411 feat(wits): 添加WITS数据模拟发送脚本- 实现随机生成WITS协议数据功能
- 支持整数和浮点数两种数据格式
- 配置目标主机和端口进行TCP连接
- 循环发送预定义的WITS字段码
- 添加发送间隔控制和异常处理机制- 提供命令行入口直接运行脚本
2025-11-17 09:58:16 +08:00
ea2d12c74e feat(network): 添加 WITS 数据接收功能并更新 WebSocket 地址
- 新增 recive_wits.py 文件用于通过 TCP 接收 WITS 数据- 实现自动重连机制和数据黏包/拆包处理- 更新 test_websocket.py 中的 WebSocket 连接地址- 修改端口号以适配新的服务配置
2025-11-14 09:55:29 +08:00
9c6f5c3b63 feat(login): 实现动态密钥登录流程
- 移除默认静态密钥配置,改为从登录响应动态获取- 新增 login_and_get_dynamic_key_iv 函数处理未加密登录并提取密钥- 修改 get_des_encrypt 函数为必需传入密钥参数
- 更新 send_encrypted_request_with_dynamic_key 使用动态密钥发送加密请求
- 调整主程序逻辑,先执行未加密登录获取密钥再进行加密请求
- 修改测试账号为 test002 并移除手动切换加密模式的选项
2025-10-27 11:19:37 +08:00
3489709697 feat(login): 支持发送加密和未加密登录请求
- 添加 send_unencrypted_request 函数用于发送未加密请求
- 添加 send_encrypted_request 函数用于发送加密请求- 重构主流程,通过 USE_ENCRYPTION 开关选择请求类型
- 优化日志输出,区分加密与未加密请求状态
- 统一异常处理逻辑,增强代码健壮性
- 保留原有加密逻辑并整合到新结构中
2025-10-27 11:05:59 +08:00
3ab5f15b0e feat(login): 实现DES加密登录功能- 添加 DES/CBC/PKCS7 加密函数,兼容 CryptoJS.DES.encrypt
- 使用默认密钥和 IV 对登录数据进行加密
- 修改请求 payload 结构,将加密后的字符串作为 data 字段值
- 更新接口地址并调整请求逻辑
- 增强错误处理与调试信息打印- 优化响应解析方式,正确提取 token 信息
2025-10-21 13:28:35 +08:00
5fa68924e3 feat(websocket): 更新WebSocket连接地址并添加认证令牌
- 将连接地址从 wss://192.168.1.3/ws/ 更改为 ws://192.168.1.41:9516/ws/- 添加了用于身份验证的 Bearer Token
- 启用 WebSocket 调试追踪功能
2025-10-15 11:15:58 +08:00
6 changed files with 417 additions and 57 deletions

124
generateSign.py Normal file
View File

@@ -0,0 +1,124 @@
import json
import time
import uuid
import urllib.parse
import hashlib
import sys
# =================== 运行环境配置 ===================
CONFIG = {
"dev": {
"platform_ifc_system_code": "ef5b17caff6e4da19d6af82d539e894d",
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
"appId": "57395d2bc668496c9c57d8f2b19bd516",
"secret_key": "KMFHKo1Uzrl&MWXorbQIT&C$Qea$uQOY",
"ifcUrl": "http://192.168.1.202:8083/dev-api/cenertech-interface-center/IFC2"
},
"prod": {
"platform_ifc_system_code": "57395d2bc668496c9c57d8f2b19bd516",
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
"appId": "57395d2bc668496c9c57d8f2b19bd516",
"secret_key": "opS=K9Parlf&p+JxBOQD2q+zNZa+uXEE",
"ifcUrl": "https://dpc.cet.cnooc/prod-api/cenertech-interface-center/IFC2"
}
}
# =================== MD5签名计算 ===================
def calculate_md5(param, secret_key):
return hashlib.md5((secret_key + param).encode('utf-8')).hexdigest()
def create_sign(param_map, secret_key):
sorted_keys = sorted(param_map.keys())
parts = []
for key in sorted_keys:
val = param_map[key]
if val is None:
parts.append(f"{key}=")
continue
if isinstance(val, list):
for item in val:
encoded_val = urllib.parse.quote(str(item), encoding='utf-8')
parts.append(f"{key}={encoded_val}")
continue
val_str = str(val)
if key != "REQUEST_BODY_CONTENT":
val_str = urllib.parse.quote(val_str, encoding='utf-8')
parts.append(f"{key}={val_str}")
param = "&".join(parts)
param = "".join(param.split())
print("参数明文 param:", param)
sign = calculate_md5(param, secret_key)
print("生成签名 sign:", sign)
return sign
# =================== Python 主程序 ===================
if __name__ == '__main__':
# 选择 dev / prod
env = "dev"
if len(sys.argv) > 1:
env = sys.argv[1]
print(f"\n=== 当前环境:{env} ===\n")
if env not in CONFIG:
print("❗ 错误:请使用 python sign.py dev 或 python sign.py prod")
sys.exit(1)
cfg = CONFIG[env]
systemCode = cfg["systemCode"]
appId = cfg["appId"]
secret_key = cfg["secret_key"]
platform_ifc_system_code = cfg["platform_ifc_system_code"]
ifcUrl = cfg["ifcUrl"]
ts = str(int(time.time() * 1000))
randomString = uuid.uuid4().hex
selector = {
"searchKeys": ["9cb864213c6f48ceaf90e98e7ca375e9","3DC1B33E1B5B431E99FA163BF9E86E6A","13336","b353614a47e2425a8a8885d270267407"],
"userType": "1",
"hasCascade": True
}
request_body_json = json.dumps(selector, separators=(',', ':'))
param_map = {
"systemCode": systemCode,
"timestamp": ts,
"nonce": randomString,
"REQUEST_BODY_CONTENT": request_body_json
}
sign = create_sign(param_map, secret_key)
apiPath = "/userListByDeptSearch"
fullUrl = f"{ifcUrl}/{platform_ifc_system_code}{apiPath}"
curl = f"""
curl -X POST "{fullUrl}" \\
-H "Content-Type: application/json" \\
-H "systemCode: {systemCode}" \\
-H "timestamp: {ts}" \\
-H "nonce: {randomString}" \\
-H "sign: {sign}" \\
-H "App-Id: {appId}" \\
-d '{request_body_json}'
"""
print("\n===== 最终 curl 请求 =====\n")
print(curl)

59
recive_wits.py Normal file
View File

@@ -0,0 +1,59 @@
import socket
import time
HOST = "192.168.1.41"
PORT = 9928
def connect():
"""建立 TCP 连接(带重试)"""
while True:
try:
print(f"Connecting to {HOST}:{PORT} ...")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
s.settimeout(5)
print("Connected successfully!")
return s
except Exception as e:
print(f"Connection failed: {e}, retrying in 3s...")
time.sleep(3)
def receive_wits_data(sock):
"""持续接收 WITS 数据(自动处理黏包/拆包)"""
buffer = ""
while True:
try:
data = sock.recv(4096)
# 服务器关闭
if not data:
print("Server closed connection.")
return False
buffer += data.decode(errors="ignore")
# WITS 多为 \r\n 分隔
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
print("Received:", line)
except socket.timeout:
# 正常情况,继续接收即可
continue
except Exception as e:
print("Error:", e)
return False
if __name__ == "__main__":
while True:
sock = connect()
ok = receive_wits_data(sock)
sock.close()
print("Reconnecting in 3 seconds...")
time.sleep(3)

61
send_wtis.py Normal file
View File

@@ -0,0 +1,61 @@
import socket
import random
import time
HOST = "192.168.1.5" # 目标地址
PORT = 9929 # 目标端口
# 你给的示例里出现的所有前四位字段
WITS_CODES = [
"0105",
"0106",
"0108",
"0112",
"0114",
"0116",
"0118",
"0120",
"0121",
"0122"
]
def random_value(prefix):
"""
生成类似你收到的数据:
- 有些是整数:例如 0105 251114
- 有些是浮点:例如 0108 37.26745
"""
# 随机决定生成整数 or 小数
if random.random() < 0.3:
# 生成整数6位左右
value = str(random.randint(100000, 999999))
else:
# 生成浮点保留4~5位小数
value = f"{random.uniform(0, 500):.5f}"
return prefix + value
def send_wits_data():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
print("Connected to target. Sending WITS data...")
try:
while True:
for code in WITS_CODES:
msg = random_value(code)
sock.sendall((msg + "\r\n").encode())
print("Sent:", msg)
time.sleep(0.2) # 每条间隔 200ms可根据需要调整
except Exception as e:
print("Error:", e)
finally:
sock.close()
if __name__ == "__main__":
send_wits_data()

66
snowflake_generator.py Normal file
View File

@@ -0,0 +1,66 @@
import time
import threading
EPOCH_JAVA_COMMON = 1288834974657
class Snowflake:
def __init__(self, datacenter_id: int = 0, worker_id: int = 0, epoch: int = 1480166465631):
# 机器和数据中心配置
self.worker_id_bits = 5
self.datacenter_id_bits = 5
self.sequence_bits = 12
self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)
self.max_datacenter_id = -1 ^ (-1 << self.datacenter_id_bits)
if worker_id > self.max_worker_id or worker_id < 0:
raise ValueError(f"worker_id 超出范围 (0 ~ {self.max_worker_id})")
if datacenter_id > self.max_datacenter_id or datacenter_id < 0:
raise ValueError(f"datacenter_id 超出范围 (0 ~ {self.max_datacenter_id})")
self.worker_id = worker_id
self.datacenter_id = datacenter_id
self.epoch = epoch
self.sequence = 0
self.last_timestamp = -1
self.worker_id_shift = self.sequence_bits
self.datacenter_id_shift = self.sequence_bits + self.worker_id_bits
self.timestamp_left_shift = self.sequence_bits + self.worker_id_bits + self.datacenter_id_bits
self.sequence_mask = -1 ^ (-1 << self.sequence_bits)
self.lock = threading.Lock()
def _timestamp(self):
return int(time.time() * 1000)
def _til_next_millis(self, last_timestamp):
timestamp = self._timestamp()
while timestamp <= last_timestamp:
timestamp = self._timestamp()
return timestamp
def next_id(self) -> int:
with self.lock:
timestamp = self._timestamp()
if timestamp < self.last_timestamp:
raise Exception("时钟回拨拒绝生成ID")
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & self.sequence_mask
if self.sequence == 0:
timestamp = self._til_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
return ((timestamp - self.epoch) << self.timestamp_left_shift) | \
(self.datacenter_id << self.datacenter_id_shift) | \
(self.worker_id << self.worker_id_shift) | \
self.sequence
if __name__ == "__main__":
snowflake = Snowflake(datacenter_id=0, worker_id=0, epoch=EPOCH_JAVA_COMMON)
for _ in range(10):
print(str(snowflake.next_id()))

View File

@@ -1,28 +1,20 @@
import ssl
import websocket
URL = "ws://192.168.1.41:9516/ws/" # 注意走 443不要再连 8080 了
# 如果你的 WS 路径是 /ws/,就写成上面这样;若是别的路径自己改
# URL = "ws://192.168.1.202:9100/well-tool-test-system/ws/"
def on_message(ws, msg): print("收到:", msg)
def on_error(ws, err): print("错误:", err)
def on_close(ws, code, reason): print("关闭:", code, reason)
def on_open(ws):
print("连接成功")
ws.send("hello server mac")
URL = "wss://192.168.1.87/ws/"
ws = websocket.WebSocketApp(
URL,
on_open=lambda ws: print("连接成功"),
on_message=lambda ws, msg: print("消息:", msg),
on_error=lambda ws, err: print("错误:", err),
on_close=lambda ws, code, reason: print("关闭:", code, reason),
)
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp(
URL,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
# header=["Origin: https://192.168.1.3"] # 如后端不校验 Origin 可删
header=[] # 如后端不校验 Origin 可删
)
ws.run_forever(sslopt={
ws.run_forever(
sslopt={
"cert_reqs": ssl.CERT_NONE,
"check_hostname": False,
})
}
)

View File

@@ -1,43 +1,101 @@
import requests
import json
from Crypto.Cipher import DES
from Crypto.Util.Padding import pad
import base64
# 接口地址
url = "http://127.0.0.1:8084/c/v1/wellMudLogAndWireLine/system/privateManager/userLoginAggregate/other/login"
# 全局变量,用于存储从登录响应中获取的 key 和 iv
DYNAMIC_KEY = None
DYNAMIC_IV = None
# 请求体
payload = {
"encType": 0,
"x_flag": "",
"data": {
"userId": "admin",
def get_des_encrypt(data: str, _key: str, _iv: str) -> str:
"""
使用 DES/CBC/PKCS7 加密字符串。
"""
key = _key.encode('utf-8')
iv = _iv.encode('utf-8')
if len(key) != 8 or len(iv) != 8:
raise ValueError("DES key and IV must be exactly 8 bytes")
plaintext = data.encode('utf-8')
padded_data = pad(plaintext, DES.block_size)
cipher = DES.new(key, DES.MODE_CBC, iv)
encrypted_bytes = cipher.encrypt(padded_data)
return base64.b64encode(encrypted_bytes).decode('utf-8')
def login_and_get_dynamic_key_iv(url, login_data):
"""
发送未加密登录请求,成功后提取 key 和 iv。
返回 (token, key, iv) 或 (None, None, None)
"""
payload = {
"encType": 0,
"x_flag": "",
"data": login_data
}
headers = {"Content-Type": "application/json"}
try:
response = requests.post(url, headers=headers, json=payload)
print("🔓 [登录] 状态码:", response.status_code)
result = response.json()
print("🔓 [登录] 响应:", json.dumps(result, indent=2, ensure_ascii=False))
data = result.get("data", {})
token = data.get("token")
key = data.get("key")
iv = data.get("iv")
if token and key and iv:
print(f"✅ 登录成功Token: {token}")
print(f"🔑 动态 Key: {key}, IV: {iv}")
return token, key, iv
else:
print("❌ 登录成功但缺少 key 或 iv")
return None, None, None
except Exception as e:
print("❌ 登录异常:", e)
return None, None, None
def send_encrypted_request_with_dynamic_key(url, data, key, iv):
"""
使用动态 key/iv 加密并发送请求。
"""
data_str = json.dumps(data, separators=(',', ':'))
encrypted_data = get_des_encrypt(data_str, key, iv)
payload = {
"encType": 0,
"x_flag": "",
"data": encrypted_data
}
headers = {"Content-Type": "application/json"}
try:
response = requests.post(url, headers=headers, json=payload)
print("🔒 [加密请求] 状态码:", response.status_code)
print("🔒 [加密请求] 响应:", response.text)
result = response.json()
token = result.get("data", {}).get("token")
if token:
print("✅ 加密请求成功Token:", token)
else:
print("⚠️ 加密请求完成,但无新 token")
except Exception as e:
print("❌ 加密请求异常:", e)
if __name__ == '__main__':
url = "http://127.0.0.1:8084/c/v1/wellMudLogAndWireLine/system/privateManager/userLoginAggregate/other/login"
login_data = {
"userId": "test002",
"password": "123456"
}
}
token, key, iv = login_and_get_dynamic_key_iv(url, login_data)
# 请求头
headers = {
"Content-Type": "application/json"
}
try:
# 发送 POST 请求
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查状态码
response.raise_for_status()
# 转为 JSON
result = response.json()
# 从响应中解析 token
token = result.get("data", {}).get("token")
if token:
print("登录成功Token", token)
else:
print("未获取到 Token响应内容", result)
except requests.RequestException as e:
print("请求异常:", e)
except ValueError:
print("响应不是合法的 JSON 格式")
if not (key and iv):
print("🛑 无法获取动态 key/iv退出。")
exit(1)