Compare commits
12 Commits
ce607c5637
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| c1fa662552 | |||
| bab29150ab | |||
| 8c4e48f0ef | |||
| 167d96b6ba | |||
| 123dfc4411 | |||
| ea2d12c74e | |||
| 9c6f5c3b63 | |||
| 3489709697 | |||
| 3ab5f15b0e | |||
| 5fa68924e3 | |||
| 945ac31725 | |||
| 5b1ce354b9 |
124
generateSign.py
Normal file
124
generateSign.py
Normal file
@@ -0,0 +1,124 @@
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
import urllib.parse
|
||||
import hashlib
|
||||
import sys
|
||||
|
||||
|
||||
# =================== 运行环境配置 ===================
|
||||
CONFIG = {
|
||||
"dev": {
|
||||
"platform_ifc_system_code": "ef5b17caff6e4da19d6af82d539e894d",
|
||||
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
|
||||
"appId": "57395d2bc668496c9c57d8f2b19bd516",
|
||||
"secret_key": "KMFHKo1Uzrl&MWXorbQIT&C$Qea$uQOY",
|
||||
"ifcUrl": "http://192.168.1.202:8083/dev-api/cenertech-interface-center/IFC2"
|
||||
},
|
||||
"prod": {
|
||||
"platform_ifc_system_code": "57395d2bc668496c9c57d8f2b19bd516",
|
||||
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
|
||||
"appId": "57395d2bc668496c9c57d8f2b19bd516",
|
||||
"secret_key": "opS=K9Parlf&p+JxBOQD2q+zNZa+uXEE",
|
||||
"ifcUrl": "https://dpc.cet.cnooc/prod-api/cenertech-interface-center/IFC2"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# =================== MD5签名计算 ===================
|
||||
def calculate_md5(param, secret_key):
|
||||
return hashlib.md5((secret_key + param).encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def create_sign(param_map, secret_key):
|
||||
sorted_keys = sorted(param_map.keys())
|
||||
parts = []
|
||||
|
||||
for key in sorted_keys:
|
||||
val = param_map[key]
|
||||
|
||||
if val is None:
|
||||
parts.append(f"{key}=")
|
||||
continue
|
||||
|
||||
if isinstance(val, list):
|
||||
for item in val:
|
||||
encoded_val = urllib.parse.quote(str(item), encoding='utf-8')
|
||||
parts.append(f"{key}={encoded_val}")
|
||||
continue
|
||||
|
||||
val_str = str(val)
|
||||
|
||||
if key != "REQUEST_BODY_CONTENT":
|
||||
val_str = urllib.parse.quote(val_str, encoding='utf-8')
|
||||
|
||||
parts.append(f"{key}={val_str}")
|
||||
|
||||
param = "&".join(parts)
|
||||
param = "".join(param.split())
|
||||
|
||||
print("参数明文 param:", param)
|
||||
|
||||
sign = calculate_md5(param, secret_key)
|
||||
print("生成签名 sign:", sign)
|
||||
|
||||
return sign
|
||||
|
||||
|
||||
# =================== Python 主程序 ===================
|
||||
if __name__ == '__main__':
|
||||
|
||||
# 选择 dev / prod
|
||||
env = "dev"
|
||||
if len(sys.argv) > 1:
|
||||
env = sys.argv[1]
|
||||
print(f"\n=== 当前环境:{env} ===\n")
|
||||
|
||||
if env not in CONFIG:
|
||||
print("❗ 错误:请使用 python sign.py dev 或 python sign.py prod")
|
||||
sys.exit(1)
|
||||
|
||||
cfg = CONFIG[env]
|
||||
|
||||
systemCode = cfg["systemCode"]
|
||||
appId = cfg["appId"]
|
||||
secret_key = cfg["secret_key"]
|
||||
platform_ifc_system_code = cfg["platform_ifc_system_code"]
|
||||
ifcUrl = cfg["ifcUrl"]
|
||||
|
||||
ts = str(int(time.time() * 1000))
|
||||
randomString = uuid.uuid4().hex
|
||||
|
||||
selector = {
|
||||
"searchKeys": ["9cb864213c6f48ceaf90e98e7ca375e9","3DC1B33E1B5B431E99FA163BF9E86E6A","13336","b353614a47e2425a8a8885d270267407"],
|
||||
"userType": "1",
|
||||
"hasCascade": True
|
||||
}
|
||||
|
||||
request_body_json = json.dumps(selector, separators=(',', ':'))
|
||||
|
||||
param_map = {
|
||||
"systemCode": systemCode,
|
||||
"timestamp": ts,
|
||||
"nonce": randomString,
|
||||
"REQUEST_BODY_CONTENT": request_body_json
|
||||
}
|
||||
|
||||
sign = create_sign(param_map, secret_key)
|
||||
|
||||
apiPath = "/userListByDeptSearch"
|
||||
fullUrl = f"{ifcUrl}/{platform_ifc_system_code}{apiPath}"
|
||||
|
||||
curl = f"""
|
||||
curl -X POST "{fullUrl}" \\
|
||||
-H "Content-Type: application/json" \\
|
||||
-H "systemCode: {systemCode}" \\
|
||||
-H "timestamp: {ts}" \\
|
||||
-H "nonce: {randomString}" \\
|
||||
-H "sign: {sign}" \\
|
||||
-H "App-Id: {appId}" \\
|
||||
-d '{request_body_json}'
|
||||
"""
|
||||
|
||||
print("\n===== 最终 curl 请求 =====\n")
|
||||
print(curl)
|
||||
59
recive_wits.py
Normal file
59
recive_wits.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import socket
|
||||
import time
|
||||
|
||||
HOST = "192.168.1.41"
|
||||
PORT = 9928
|
||||
|
||||
def connect():
|
||||
"""建立 TCP 连接(带重试)"""
|
||||
while True:
|
||||
try:
|
||||
print(f"Connecting to {HOST}:{PORT} ...")
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.connect((HOST, PORT))
|
||||
s.settimeout(5)
|
||||
print("Connected successfully!")
|
||||
return s
|
||||
except Exception as e:
|
||||
print(f"Connection failed: {e}, retrying in 3s...")
|
||||
time.sleep(3)
|
||||
|
||||
|
||||
def receive_wits_data(sock):
|
||||
"""持续接收 WITS 数据(自动处理黏包/拆包)"""
|
||||
buffer = ""
|
||||
|
||||
while True:
|
||||
try:
|
||||
data = sock.recv(4096)
|
||||
|
||||
# 服务器关闭
|
||||
if not data:
|
||||
print("Server closed connection.")
|
||||
return False
|
||||
|
||||
buffer += data.decode(errors="ignore")
|
||||
|
||||
# WITS 多为 \r\n 分隔
|
||||
while "\n" in buffer:
|
||||
line, buffer = buffer.split("\n", 1)
|
||||
line = line.strip()
|
||||
if line:
|
||||
print("Received:", line)
|
||||
|
||||
except socket.timeout:
|
||||
# 正常情况,继续接收即可
|
||||
continue
|
||||
except Exception as e:
|
||||
print("Error:", e)
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
sock = connect()
|
||||
ok = receive_wits_data(sock)
|
||||
sock.close()
|
||||
|
||||
print("Reconnecting in 3 seconds...")
|
||||
time.sleep(3)
|
||||
61
send_wtis.py
Normal file
61
send_wtis.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import socket
|
||||
import random
|
||||
import time
|
||||
|
||||
HOST = "192.168.1.5" # 目标地址
|
||||
PORT = 9929 # 目标端口
|
||||
|
||||
# 你给的示例里出现的所有前四位字段
|
||||
WITS_CODES = [
|
||||
"0105",
|
||||
"0106",
|
||||
"0108",
|
||||
"0112",
|
||||
"0114",
|
||||
"0116",
|
||||
"0118",
|
||||
"0120",
|
||||
"0121",
|
||||
"0122"
|
||||
]
|
||||
|
||||
def random_value(prefix):
|
||||
"""
|
||||
生成类似你收到的数据:
|
||||
- 有些是整数:例如 0105 251114
|
||||
- 有些是浮点:例如 0108 37.26745
|
||||
"""
|
||||
# 随机决定生成整数 or 小数
|
||||
if random.random() < 0.3:
|
||||
# 生成整数(6位左右)
|
||||
value = str(random.randint(100000, 999999))
|
||||
else:
|
||||
# 生成浮点(保留4~5位小数)
|
||||
value = f"{random.uniform(0, 500):.5f}"
|
||||
|
||||
return prefix + value
|
||||
|
||||
|
||||
def send_wits_data():
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect((HOST, PORT))
|
||||
print("Connected to target. Sending WITS data...")
|
||||
|
||||
try:
|
||||
while True:
|
||||
for code in WITS_CODES:
|
||||
msg = random_value(code)
|
||||
|
||||
sock.sendall((msg + "\r\n").encode())
|
||||
print("Sent:", msg)
|
||||
|
||||
time.sleep(0.2) # 每条间隔 200ms,可根据需要调整
|
||||
|
||||
except Exception as e:
|
||||
print("Error:", e)
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
send_wits_data()
|
||||
66
snowflake_generator.py
Normal file
66
snowflake_generator.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import time
|
||||
import threading
|
||||
EPOCH_JAVA_COMMON = 1288834974657
|
||||
class Snowflake:
|
||||
def __init__(self, datacenter_id: int = 0, worker_id: int = 0, epoch: int = 1480166465631):
|
||||
# 机器和数据中心配置
|
||||
self.worker_id_bits = 5
|
||||
self.datacenter_id_bits = 5
|
||||
self.sequence_bits = 12
|
||||
|
||||
self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)
|
||||
self.max_datacenter_id = -1 ^ (-1 << self.datacenter_id_bits)
|
||||
|
||||
if worker_id > self.max_worker_id or worker_id < 0:
|
||||
raise ValueError(f"worker_id 超出范围 (0 ~ {self.max_worker_id})")
|
||||
if datacenter_id > self.max_datacenter_id or datacenter_id < 0:
|
||||
raise ValueError(f"datacenter_id 超出范围 (0 ~ {self.max_datacenter_id})")
|
||||
|
||||
self.worker_id = worker_id
|
||||
self.datacenter_id = datacenter_id
|
||||
self.epoch = epoch
|
||||
|
||||
self.sequence = 0
|
||||
self.last_timestamp = -1
|
||||
|
||||
self.worker_id_shift = self.sequence_bits
|
||||
self.datacenter_id_shift = self.sequence_bits + self.worker_id_bits
|
||||
self.timestamp_left_shift = self.sequence_bits + self.worker_id_bits + self.datacenter_id_bits
|
||||
self.sequence_mask = -1 ^ (-1 << self.sequence_bits)
|
||||
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def _timestamp(self):
|
||||
return int(time.time() * 1000)
|
||||
|
||||
def _til_next_millis(self, last_timestamp):
|
||||
timestamp = self._timestamp()
|
||||
while timestamp <= last_timestamp:
|
||||
timestamp = self._timestamp()
|
||||
return timestamp
|
||||
|
||||
def next_id(self) -> int:
|
||||
with self.lock:
|
||||
timestamp = self._timestamp()
|
||||
if timestamp < self.last_timestamp:
|
||||
raise Exception("时钟回拨,拒绝生成ID")
|
||||
|
||||
if timestamp == self.last_timestamp:
|
||||
self.sequence = (self.sequence + 1) & self.sequence_mask
|
||||
if self.sequence == 0:
|
||||
timestamp = self._til_next_millis(self.last_timestamp)
|
||||
else:
|
||||
self.sequence = 0
|
||||
|
||||
self.last_timestamp = timestamp
|
||||
|
||||
return ((timestamp - self.epoch) << self.timestamp_left_shift) | \
|
||||
(self.datacenter_id << self.datacenter_id_shift) | \
|
||||
(self.worker_id << self.worker_id_shift) | \
|
||||
self.sequence
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
snowflake = Snowflake(datacenter_id=0, worker_id=0, epoch=EPOCH_JAVA_COMMON)
|
||||
for _ in range(10):
|
||||
print(str(snowflake.next_id()))
|
||||
@@ -1,28 +1,20 @@
|
||||
import ssl
|
||||
import websocket
|
||||
|
||||
URL = "ws://192.168.1.41:9516/ws/" # 注意走 443,不要再连 8080 了
|
||||
# 如果你的 WS 路径是 /ws/,就写成上面这样;若是别的路径自己改
|
||||
# URL = "ws://192.168.1.202:9100/well-tool-test-system/ws/"
|
||||
|
||||
def on_message(ws, msg): print("收到:", msg)
|
||||
def on_error(ws, err): print("错误:", err)
|
||||
def on_close(ws, code, reason): print("关闭:", code, reason)
|
||||
def on_open(ws):
|
||||
print("连接成功")
|
||||
ws.send("hello server mac")
|
||||
URL = "wss://192.168.1.87/ws/"
|
||||
ws = websocket.WebSocketApp(
|
||||
URL,
|
||||
on_open=lambda ws: print("连接成功"),
|
||||
on_message=lambda ws, msg: print("消息:", msg),
|
||||
on_error=lambda ws, err: print("错误:", err),
|
||||
on_close=lambda ws, code, reason: print("关闭:", code, reason),
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
websocket.enableTrace(True)
|
||||
ws = websocket.WebSocketApp(
|
||||
URL,
|
||||
on_open=on_open,
|
||||
on_message=on_message,
|
||||
on_error=on_error,
|
||||
on_close=on_close,
|
||||
# header=["Origin: https://192.168.1.3"] # 如后端不校验 Origin 可删
|
||||
header=[] # 如后端不校验 Origin 可删
|
||||
)
|
||||
ws.run_forever(sslopt={
|
||||
ws.run_forever(
|
||||
sslopt={
|
||||
"cert_reqs": ssl.CERT_NONE,
|
||||
"check_hostname": False,
|
||||
})
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1,43 +1,101 @@
|
||||
import requests
|
||||
import json
|
||||
from Crypto.Cipher import DES
|
||||
from Crypto.Util.Padding import pad
|
||||
import base64
|
||||
|
||||
# 接口地址
|
||||
url = "http://127.0.0.1:8084/c/v1/wellMudLogAndWireLine/system/privateManager/userLoginAggregate/other/login"
|
||||
# 全局变量,用于存储从登录响应中获取的 key 和 iv
|
||||
DYNAMIC_KEY = None
|
||||
DYNAMIC_IV = None
|
||||
|
||||
# 请求体
|
||||
payload = {
|
||||
"encType": 0,
|
||||
"x_flag": "",
|
||||
"data": {
|
||||
"userId": "admin",
|
||||
|
||||
def get_des_encrypt(data: str, _key: str, _iv: str) -> str:
|
||||
"""
|
||||
使用 DES/CBC/PKCS7 加密字符串。
|
||||
"""
|
||||
key = _key.encode('utf-8')
|
||||
iv = _iv.encode('utf-8')
|
||||
|
||||
if len(key) != 8 or len(iv) != 8:
|
||||
raise ValueError("DES key and IV must be exactly 8 bytes")
|
||||
|
||||
plaintext = data.encode('utf-8')
|
||||
padded_data = pad(plaintext, DES.block_size)
|
||||
cipher = DES.new(key, DES.MODE_CBC, iv)
|
||||
encrypted_bytes = cipher.encrypt(padded_data)
|
||||
return base64.b64encode(encrypted_bytes).decode('utf-8')
|
||||
|
||||
|
||||
def login_and_get_dynamic_key_iv(url, login_data):
|
||||
"""
|
||||
发送未加密登录请求,成功后提取 key 和 iv。
|
||||
返回 (token, key, iv) 或 (None, None, None)
|
||||
"""
|
||||
payload = {
|
||||
"encType": 0,
|
||||
"x_flag": "",
|
||||
"data": login_data
|
||||
}
|
||||
headers = {"Content-Type": "application/json"}
|
||||
try:
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
print("🔓 [登录] 状态码:", response.status_code)
|
||||
result = response.json()
|
||||
print("🔓 [登录] 响应:", json.dumps(result, indent=2, ensure_ascii=False))
|
||||
|
||||
data = result.get("data", {})
|
||||
token = data.get("token")
|
||||
key = data.get("key")
|
||||
iv = data.get("iv")
|
||||
|
||||
if token and key and iv:
|
||||
print(f"✅ 登录成功!Token: {token}")
|
||||
print(f"🔑 动态 Key: {key}, IV: {iv}")
|
||||
return token, key, iv
|
||||
else:
|
||||
print("❌ 登录成功但缺少 key 或 iv")
|
||||
return None, None, None
|
||||
except Exception as e:
|
||||
print("❌ 登录异常:", e)
|
||||
return None, None, None
|
||||
|
||||
|
||||
def send_encrypted_request_with_dynamic_key(url, data, key, iv):
|
||||
"""
|
||||
使用动态 key/iv 加密并发送请求。
|
||||
"""
|
||||
data_str = json.dumps(data, separators=(',', ':'))
|
||||
encrypted_data = get_des_encrypt(data_str, key, iv)
|
||||
|
||||
payload = {
|
||||
"encType": 0,
|
||||
"x_flag": "",
|
||||
"data": encrypted_data
|
||||
}
|
||||
headers = {"Content-Type": "application/json"}
|
||||
try:
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
print("🔒 [加密请求] 状态码:", response.status_code)
|
||||
print("🔒 [加密请求] 响应:", response.text)
|
||||
|
||||
result = response.json()
|
||||
token = result.get("data", {}).get("token")
|
||||
if token:
|
||||
print("✅ 加密请求成功,Token:", token)
|
||||
else:
|
||||
print("⚠️ 加密请求完成,但无新 token")
|
||||
except Exception as e:
|
||||
print("❌ 加密请求异常:", e)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
url = "http://127.0.0.1:8084/c/v1/wellMudLogAndWireLine/system/privateManager/userLoginAggregate/other/login"
|
||||
login_data = {
|
||||
"userId": "test002",
|
||||
"password": "123456"
|
||||
}
|
||||
}
|
||||
token, key, iv = login_and_get_dynamic_key_iv(url, login_data)
|
||||
|
||||
# 请求头
|
||||
headers = {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
try:
|
||||
# 发送 POST 请求
|
||||
response = requests.post(url, headers=headers, data=json.dumps(payload))
|
||||
|
||||
# 检查状态码
|
||||
response.raise_for_status()
|
||||
|
||||
# 转为 JSON
|
||||
result = response.json()
|
||||
|
||||
# 从响应中解析 token
|
||||
token = result.get("data", {}).get("token")
|
||||
|
||||
if token:
|
||||
print("登录成功,Token:", token)
|
||||
else:
|
||||
print("未获取到 Token,响应内容:", result)
|
||||
|
||||
except requests.RequestException as e:
|
||||
print("请求异常:", e)
|
||||
except ValueError:
|
||||
print("响应不是合法的 JSON 格式")
|
||||
if not (key and iv):
|
||||
print("🛑 无法获取动态 key/iv,退出。")
|
||||
exit(1)
|
||||
|
||||
Reference in New Issue
Block a user