Compare commits

...

24 Commits

Author SHA1 Message Date
c1fa662552 feat(sign): 支持多环境配置和签名生成优化
- 新增开发环境和生产环境的配置管理
- 移除参数中的空格替换逻辑
- 更新默认搜索键和用户类型
- 支持通过命令行参数切换运行环境
- 自动构建完整请求URL用于curl测试
- 添加环境选择提示和错误处理机制
2025-11-25 11:06:04 +08:00
bab29150ab fix(generateSign): 更新系统配置和搜索键值
- 修改 systemCode 和 appId 为新的标识符
- 更新 secret_key 以增强安全性
- 扩展 searchKeys 列表,增加多个新的搜索键值
- 调整 searchKeys 数组格式以提高可读性
2025-11-24 16:01:44 +08:00
8c4e48f0ef feat(utils): 添加MD5签名生成工具
- 实现了基于Java规则的MD5签名计算方法
- 支持参数映射排序与URL编码处理
- 添加了空格去除和特殊字符处理逻辑
- 集成了时间戳、随机字符串生成功能
- 提供完整的curl请求示例输出
- 支持复杂对象序列化为JSON请求体
- 实现了多层级参数拼接与签名验证
2025-11-24 11:12:25 +08:00
167d96b6ba feat(wits): 添加WITS数据模拟发送脚本- 实现随机生成WITS协议数据功能
- 支持整数和浮点数两种数据格式
- 配置目标主机和端口进行TCP连接
- 循环发送预定义的WITS字段码
- 添加发送间隔控制和异常处理机制- 提供命令行入口直接运行脚本
2025-11-24 10:23:46 +08:00
123dfc4411 feat(wits): 添加WITS数据模拟发送脚本- 实现随机生成WITS协议数据功能
- 支持整数和浮点数两种数据格式
- 配置目标主机和端口进行TCP连接
- 循环发送预定义的WITS字段码
- 添加发送间隔控制和异常处理机制- 提供命令行入口直接运行脚本
2025-11-17 09:58:16 +08:00
ea2d12c74e feat(network): 添加 WITS 数据接收功能并更新 WebSocket 地址
- 新增 recive_wits.py 文件用于通过 TCP 接收 WITS 数据- 实现自动重连机制和数据黏包/拆包处理- 更新 test_websocket.py 中的 WebSocket 连接地址- 修改端口号以适配新的服务配置
2025-11-14 09:55:29 +08:00
9c6f5c3b63 feat(login): 实现动态密钥登录流程
- 移除默认静态密钥配置,改为从登录响应动态获取- 新增 login_and_get_dynamic_key_iv 函数处理未加密登录并提取密钥- 修改 get_des_encrypt 函数为必需传入密钥参数
- 更新 send_encrypted_request_with_dynamic_key 使用动态密钥发送加密请求
- 调整主程序逻辑,先执行未加密登录获取密钥再进行加密请求
- 修改测试账号为 test002 并移除手动切换加密模式的选项
2025-10-27 11:19:37 +08:00
3489709697 feat(login): 支持发送加密和未加密登录请求
- 添加 send_unencrypted_request 函数用于发送未加密请求
- 添加 send_encrypted_request 函数用于发送加密请求- 重构主流程,通过 USE_ENCRYPTION 开关选择请求类型
- 优化日志输出,区分加密与未加密请求状态
- 统一异常处理逻辑,增强代码健壮性
- 保留原有加密逻辑并整合到新结构中
2025-10-27 11:05:59 +08:00
3ab5f15b0e feat(login): 实现DES加密登录功能- 添加 DES/CBC/PKCS7 加密函数,兼容 CryptoJS.DES.encrypt
- 使用默认密钥和 IV 对登录数据进行加密
- 修改请求 payload 结构,将加密后的字符串作为 data 字段值
- 更新接口地址并调整请求逻辑
- 增强错误处理与调试信息打印- 优化响应解析方式,正确提取 token 信息
2025-10-21 13:28:35 +08:00
5fa68924e3 feat(websocket): 更新WebSocket连接地址并添加认证令牌
- 将连接地址从 wss://192.168.1.3/ws/ 更改为 ws://192.168.1.41:9516/ws/- 添加了用于身份验证的 Bearer Token
- 启用 WebSocket 调试追踪功能
2025-10-15 11:15:58 +08:00
945ac31725 Merge remote-tracking branch 'origin/main'
# Conflicts:
#	test_websocket.py
2025-10-15 11:09:01 +08:00
5b1ce354b9 feat(websocket): 更新WebSocket连接地址并添加认证令牌
- 将连接地址从 wss://192.168.1.3/ws/ 更改为 ws://192.168.1.41:9516/ws/- 添加了用于身份验证的 Bearer Token
- 启用 WebSocket 调试追踪功能
2025-10-15 11:08:39 +08:00
ce607c5637 feat(auth): 添加系统登录并获取token功能
- 实现用户登录接口调用
- 解析响应数据提取token
- 添加异常处理机制
- 支持JSON格式请求体- 集成requests库进行HTTP通信
- 提供登录成功/失败状态提示
2025-10-15 11:07:33 +08:00
266ba6ad31 feat(whatsapp): 实现 WhatsApp 风格的端到端加密通信原型
- 添加了基于 X3DH 和 Double Ratchet 的加密会话逻辑
- 实现了客户端密钥生成、bundle 注册与获取
- 构建了极简中继服务器用于转发加密消息
- 支持消息加密、解密及 MAC 校验- 提供完整演示流程,包括双向通信和多消息发送
- 使用 AES-CBC 加密和 HMAC-SHA256 认证
- 引入 X25519 密钥交换和 HKDF 密钥派生函数
- 包含一次性预共享密钥(OPK)管理机制
2025-10-06 14:42:14 +08:00
f7eb913be3 变更 2025-08-08 10:17:31 +08:00
1211d2fca9 变更 2025-08-08 09:51:44 +08:00
8c58329bfc Merge remote-tracking branch 'origin/main' 2025-08-08 09:47:33 +08:00
4ac98d9791 变更 2025-08-08 09:47:27 +08:00
91bb40770a update code 2025-01-15 15:34:32 +08:00
80617d3c28 update code 2024-12-26 13:08:20 +08:00
f77dc396db update code 2024-12-26 13:07:55 +08:00
d64e709d3f update code 2024-12-24 13:38:59 +08:00
eb7316b837 update code 2024-12-09 13:39:29 +08:00
6140aae7f7 add kinbase connect 2024-12-09 10:00:15 +08:00
18 changed files with 1245 additions and 6 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
./venv/
.idea/
.*png

40
base64EncodeUrl.py Normal file
View File

@@ -0,0 +1,40 @@
import base64
def generate_download_url(base_url: str, file_id: str, fullfilename: str, token: str) -> str:
"""
生成下载 URL 并对其进行 Base64 编码
:param base_url: 文件下载的基础 URL
:param file_id: 文件的 ID
:param fullfilename: 文件的完整名称
:param token: 授权的 Token
:return: Base64 编码后的完整下载 URL
"""
# 拼接完整的下载 URL
download_url = f"{base_url}/{file_id}?fullfilename={fullfilename}&token={token}"
print(f"Original URL: {download_url}")
# 对 URL 进行 Base64 编码
base64_encoded_url = base64.b64encode(download_url.encode("utf-8")).decode("utf-8")
print(f"Base64 Encoded URL: {base64_encoded_url}")
return base64_encoded_url
# 示例数据
pre_view_url = "http://192.168.1.12:8012/onlinePreview"
download_url = "http://192.168.1.28:8080/prod-api/file/download"
base_url = "http://192.168.1.28:8080/prod-api/file/download"
file_id = "a69ea8e0f8e28f92202975a3a4996e7b"
fullfilename = "文件预览.pptx"
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzUwMzIxMDMsImp0aSI6Ijl4Q3JrVDltXy1iZGh6czhKSWlrdkcySk5XcyIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.T7jZ4bSmPqlwH2LjZeKLRXg_5Q4t7ihRUhd-zvdehak"
# 生成 Base64 编码的 URL
encoded_url = generate_download_url(base_url, file_id, fullfilename, token)
# 拼接下载和预览 URL
down_url = f"{download_url}/{file_id}"
final_url = f"{pre_view_url}?url={encoded_url}"
# 打印结果
print(f"Download URL: {down_url}")
print(f"Final Preview URL: {final_url}")

91
fastapiTest.py Normal file
View File

@@ -0,0 +1,91 @@
from fastapi import FastAPI, HTTPException, Response
from minio import Minio
from minio.error import S3Error
from pydantic import BaseModel
from datetime import timedelta
app = FastAPI()
# 初始化 MinIO 客户端
minio_client = Minio(
endpoint="192.168.1.28:9500", # MinIO 服务器地址
access_key="UIiBRvUixj3GkJwvItmy", # Access Key
secret_key="wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm", # Secret Key
secure=False # 如果没有使用 HTTPS则设置为 False
)
# MinIO 配置
BUCKET_NAME = "test-bucket"
class FileRequest(BaseModel):
object_name: str # 文件对象名
expiry_in_seconds: int = 3600 # URL 有效期(默认 1 小时)
@app.post("/download-presigned-url")
def generate_presigned_url(file_request: FileRequest):
"""
生成用于下载文件的预签名 URL
"""
try:
# 检查存储桶是否存在
if not minio_client.bucket_exists(BUCKET_NAME):
raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.")
# 检查对象是否存在
try:
minio_client.stat_object(BUCKET_NAME, file_request.object_name)
except S3Error:
raise HTTPException(status_code=404, detail=f"Object '{file_request.object_name}' does not exist.")
# 将秒数转换为 timedelta
expiry_timedelta = timedelta(seconds=file_request.expiry_in_seconds)
# 生成预签名 URL
presigned_url = minio_client.presigned_get_object(
BUCKET_NAME,
file_request.object_name,
expires=expiry_timedelta # 传递 timedelta
)
return {"presigned_url": presigned_url}
except S3Error as e:
raise HTTPException(status_code=500, detail=f"Error generating presigned URL: {e}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
@app.get("/download-file/{object_name:path}")
def download_file(object_name: str):
"""
提供文件对象并直接下载
"""
try:
print(f"Requested object: {object_name}")
# 检查存储桶是否存在
if not minio_client.bucket_exists(BUCKET_NAME):
raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.")
# 检查对象是否存在
try:
stat = minio_client.stat_object(BUCKET_NAME, object_name)
except S3Error:
raise HTTPException(status_code=404, detail=f"Object '{object_name}' does not exist.")
# 获取文件对象
response = minio_client.get_object(BUCKET_NAME, object_name)
# 设置响应头
headers = {
"Content-Disposition": f"attachment; filename*=UTF-8''{object_name.split('/')[-1]}",
"Content-Type": stat.content_type
}
return Response(content=response.read(), headers=headers, media_type=stat.content_type)
except S3Error as e:
raise HTTPException(status_code=500, detail=f"Error downloading file: {e}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")

124
generateSign.py Normal file
View File

@@ -0,0 +1,124 @@
import json
import time
import uuid
import urllib.parse
import hashlib
import sys
# =================== 运行环境配置 ===================
CONFIG = {
"dev": {
"platform_ifc_system_code": "ef5b17caff6e4da19d6af82d539e894d",
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
"appId": "57395d2bc668496c9c57d8f2b19bd516",
"secret_key": "KMFHKo1Uzrl&MWXorbQIT&C$Qea$uQOY",
"ifcUrl": "http://192.168.1.202:8083/dev-api/cenertech-interface-center/IFC2"
},
"prod": {
"platform_ifc_system_code": "57395d2bc668496c9c57d8f2b19bd516",
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
"appId": "57395d2bc668496c9c57d8f2b19bd516",
"secret_key": "opS=K9Parlf&p+JxBOQD2q+zNZa+uXEE",
"ifcUrl": "https://dpc.cet.cnooc/prod-api/cenertech-interface-center/IFC2"
}
}
# =================== MD5签名计算 ===================
def calculate_md5(param, secret_key):
return hashlib.md5((secret_key + param).encode('utf-8')).hexdigest()
def create_sign(param_map, secret_key):
sorted_keys = sorted(param_map.keys())
parts = []
for key in sorted_keys:
val = param_map[key]
if val is None:
parts.append(f"{key}=")
continue
if isinstance(val, list):
for item in val:
encoded_val = urllib.parse.quote(str(item), encoding='utf-8')
parts.append(f"{key}={encoded_val}")
continue
val_str = str(val)
if key != "REQUEST_BODY_CONTENT":
val_str = urllib.parse.quote(val_str, encoding='utf-8')
parts.append(f"{key}={val_str}")
param = "&".join(parts)
param = "".join(param.split())
print("参数明文 param:", param)
sign = calculate_md5(param, secret_key)
print("生成签名 sign:", sign)
return sign
# =================== Python 主程序 ===================
if __name__ == '__main__':
# 选择 dev / prod
env = "dev"
if len(sys.argv) > 1:
env = sys.argv[1]
print(f"\n=== 当前环境:{env} ===\n")
if env not in CONFIG:
print("❗ 错误:请使用 python sign.py dev 或 python sign.py prod")
sys.exit(1)
cfg = CONFIG[env]
systemCode = cfg["systemCode"]
appId = cfg["appId"]
secret_key = cfg["secret_key"]
platform_ifc_system_code = cfg["platform_ifc_system_code"]
ifcUrl = cfg["ifcUrl"]
ts = str(int(time.time() * 1000))
randomString = uuid.uuid4().hex
selector = {
"searchKeys": ["9cb864213c6f48ceaf90e98e7ca375e9","3DC1B33E1B5B431E99FA163BF9E86E6A","13336","b353614a47e2425a8a8885d270267407"],
"userType": "1",
"hasCascade": True
}
request_body_json = json.dumps(selector, separators=(',', ':'))
param_map = {
"systemCode": systemCode,
"timestamp": ts,
"nonce": randomString,
"REQUEST_BODY_CONTENT": request_body_json
}
sign = create_sign(param_map, secret_key)
apiPath = "/userListByDeptSearch"
fullUrl = f"{ifcUrl}/{platform_ifc_system_code}{apiPath}"
curl = f"""
curl -X POST "{fullUrl}" \\
-H "Content-Type: application/json" \\
-H "systemCode: {systemCode}" \\
-H "timestamp: {ts}" \\
-H "nonce: {randomString}" \\
-H "sign: {sign}" \\
-H "App-Id: {appId}" \\
-d '{request_body_json}'
"""
print("\n===== 最终 curl 请求 =====\n")
print(curl)

28
generaterCardId.py Normal file
View File

@@ -0,0 +1,28 @@
import random
def generateCardId(length):
"""
生成指定长度的随机卡号
:param length: 卡号的长度
:return: 生成的随机卡号
"""
if length <= 0:
raise ValueError("Length must be a positive integer.")
# 初始化卡号
card_id = ""
# 数字范围
numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 生成随机卡号
for _ in range(length):
random_number = random.choice(numbers)
card_id += str(random_number)
return card_id
# 测试函数
if __name__ == "__main__":
length = 18 # 指定卡号长度
card_id = generateCardId(length)
print(f"Generated Card ID: {card_id}")

59
recive_wits.py Normal file
View File

@@ -0,0 +1,59 @@
import socket
import time
HOST = "192.168.1.41"
PORT = 9928
def connect():
"""建立 TCP 连接(带重试)"""
while True:
try:
print(f"Connecting to {HOST}:{PORT} ...")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
s.settimeout(5)
print("Connected successfully!")
return s
except Exception as e:
print(f"Connection failed: {e}, retrying in 3s...")
time.sleep(3)
def receive_wits_data(sock):
"""持续接收 WITS 数据(自动处理黏包/拆包)"""
buffer = ""
while True:
try:
data = sock.recv(4096)
# 服务器关闭
if not data:
print("Server closed connection.")
return False
buffer += data.decode(errors="ignore")
# WITS 多为 \r\n 分隔
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
print("Received:", line)
except socket.timeout:
# 正常情况,继续接收即可
continue
except Exception as e:
print("Error:", e)
return False
if __name__ == "__main__":
while True:
sock = connect()
ok = receive_wits_data(sock)
sock.close()
print("Reconnecting in 3 seconds...")
time.sleep(3)

61
send_wtis.py Normal file
View File

@@ -0,0 +1,61 @@
import socket
import random
import time
HOST = "192.168.1.5" # 目标地址
PORT = 9929 # 目标端口
# 你给的示例里出现的所有前四位字段
WITS_CODES = [
"0105",
"0106",
"0108",
"0112",
"0114",
"0116",
"0118",
"0120",
"0121",
"0122"
]
def random_value(prefix):
"""
生成类似你收到的数据:
- 有些是整数:例如 0105 251114
- 有些是浮点:例如 0108 37.26745
"""
# 随机决定生成整数 or 小数
if random.random() < 0.3:
# 生成整数6位左右
value = str(random.randint(100000, 999999))
else:
# 生成浮点保留4~5位小数
value = f"{random.uniform(0, 500):.5f}"
return prefix + value
def send_wits_data():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
print("Connected to target. Sending WITS data...")
try:
while True:
for code in WITS_CODES:
msg = random_value(code)
sock.sendall((msg + "\r\n").encode())
print("Sent:", msg)
time.sleep(0.2) # 每条间隔 200ms可根据需要调整
except Exception as e:
print("Error:", e)
finally:
sock.close()
if __name__ == "__main__":
send_wits_data()

66
simulate_monty_hall.py Normal file
View File

@@ -0,0 +1,66 @@
import random
def simulate_monty_hall(switch, num_trials=100000):
"""
模拟三门悖论(蒙提霍尔问题)
参数:
- switch: bool, 是否在主持人打开门后选择换门
- num_trials: int, 模拟次数默认10万次
返回:
- 获胜次数
- 胜率
"""
win_count = 0
doors = [1, 2, 3] # 三扇门
for _ in range(num_trials):
# 随机放置汽车1 表示汽车0 表示山羊)
car_door = random.choice(doors)
# 参赛者初始选择
player_choice = random.choice(doors)
# 主持人要打开一扇门:不能是参赛者选的,也不能是有车的
remaining_doors = [d for d in doors if d != player_choice and d != car_door]
host_opens = random.choice(remaining_doors)
# 换门逻辑
if switch:
# 换到剩下的那一扇未被选、未被开的门
final_choice = [d for d in doors if d != player_choice and d != host_opens][0]
else:
# 不换,保持原选择
final_choice = player_choice
# 判断是否获胜
if final_choice == car_door:
win_count += 1
win_rate = win_count / num_trials
return win_count, win_rate
# === 运行模拟 ===
if __name__ == "__main__":
num_trials = 100000
print(f"模拟 {num_trials} 次三门问题:\n")
# 情况1坚持不换门
wins_stay, rate_stay = simulate_monty_hall(switch=False, num_trials=num_trials)
print(f"坚持原选择(不换门):")
print(f" 获胜次数: {wins_stay}")
print(f" 胜率: {rate_stay:.4f} ({rate_stay * 100:.2f}%)\n")
# 情况2总是换门
wins_switch, rate_switch = simulate_monty_hall(switch=True, num_trials=num_trials)
print(f"总是换门:")
print(f" 获胜次数: {wins_switch}")
print(f" 胜率: {rate_switch:.4f} ({rate_switch * 100:.2f}%)\n")
print("理论值对比:")
print(" 不换门胜率: 1/3 ≈ 33.33%")
print(" 换门胜率: 2/3 ≈ 66.67%")

66
snowflake_generator.py Normal file
View File

@@ -0,0 +1,66 @@
import time
import threading
EPOCH_JAVA_COMMON = 1288834974657
class Snowflake:
def __init__(self, datacenter_id: int = 0, worker_id: int = 0, epoch: int = 1480166465631):
# 机器和数据中心配置
self.worker_id_bits = 5
self.datacenter_id_bits = 5
self.sequence_bits = 12
self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)
self.max_datacenter_id = -1 ^ (-1 << self.datacenter_id_bits)
if worker_id > self.max_worker_id or worker_id < 0:
raise ValueError(f"worker_id 超出范围 (0 ~ {self.max_worker_id})")
if datacenter_id > self.max_datacenter_id or datacenter_id < 0:
raise ValueError(f"datacenter_id 超出范围 (0 ~ {self.max_datacenter_id})")
self.worker_id = worker_id
self.datacenter_id = datacenter_id
self.epoch = epoch
self.sequence = 0
self.last_timestamp = -1
self.worker_id_shift = self.sequence_bits
self.datacenter_id_shift = self.sequence_bits + self.worker_id_bits
self.timestamp_left_shift = self.sequence_bits + self.worker_id_bits + self.datacenter_id_bits
self.sequence_mask = -1 ^ (-1 << self.sequence_bits)
self.lock = threading.Lock()
def _timestamp(self):
return int(time.time() * 1000)
def _til_next_millis(self, last_timestamp):
timestamp = self._timestamp()
while timestamp <= last_timestamp:
timestamp = self._timestamp()
return timestamp
def next_id(self) -> int:
with self.lock:
timestamp = self._timestamp()
if timestamp < self.last_timestamp:
raise Exception("时钟回拨拒绝生成ID")
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & self.sequence_mask
if self.sequence == 0:
timestamp = self._til_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
return ((timestamp - self.epoch) << self.timestamp_left_shift) | \
(self.datacenter_id << self.datacenter_id_shift) | \
(self.worker_id << self.worker_id_shift) | \
self.sequence
if __name__ == "__main__":
snowflake = Snowflake(datacenter_id=0, worker_id=0, epoch=EPOCH_JAVA_COMMON)
for _ in range(10):
print(str(snowflake.next_id()))

56
testApi.py Normal file
View File

@@ -0,0 +1,56 @@
import requests
class NotificationAPIClient:
def __init__(self, environment="dev"):
# 定义不同环境的域名
self.environment_urls = {
"dev": "http://dev.example.com",
"test": "http://192.168.1.202:9100/downhole-tool-system",
"prod": "http://prod.example.com"
}
# 设置当前环境的域名
if environment not in self.environment_urls:
raise ValueError(f"Invalid environment '{environment}'. Choose from: {list(self.environment_urls.keys())}")
self.base_url = self.environment_urls[environment]
def get_my_task(self, token):
# 拼接请求的完整 URL
url = f"{self.base_url}/notification/v1/my-task"
# 设置请求头,包含 Authorization Token
headers = {
"Authorization": f"Bearer {token}"
}
try:
# 发起 GET 请求
response = requests.get(url, headers=headers)
# 检查 HTTP 状态码并返回结果
if response.status_code == 200:
return response.json() # 返回解析后的 JSON 数据
else:
return {"error": f"HTTP {response.status_code}", "details": response.text}
except requests.RequestException as e:
return {"error": "Request failed", "details": str(e)}
# 使用示例
if __name__ == "__main__":
# 切换环境:可选 "dev", "test", "prod"
environment = "test"
# 创建客户端
client = NotificationAPIClient(environment=environment)
# 设置 Token
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzY5Mjc5MTQsImp0aSI6InZQTm5iTUdiczJ6OHdPYi1VVTljcHVxd0oxVSIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.5Z0yXfoLvJc4qEPf6RMS8Xw6VtjaNMPAyfQxEp0DIkM"
# 发起请求
result = client.get_my_task(token)
# 输出结果
print(result)

34
testConnectKingBase.py Normal file
View File

@@ -0,0 +1,34 @@
import psycopg2
# 请根据实际情况修改以下参数
HOST = "192.168.1.28"
PORT = "54321"
DATABASE = "upp_demo"
USER = "system"
PASSWORD = "kingbase_ok"
try:
# 建立连接
conn = psycopg2.connect(
host=HOST,
port=PORT,
database=DATABASE,
user=USER,
password=PASSWORD
)
print("成功连接到人大金仓数据库!")
# 创建一个游标对象
cur = conn.cursor()
# 测试执行简单SQL语句如查询版本信息
cur.execute("SELECT version();")
version_info = cur.fetchone()
print("数据库版本信息:", version_info)
# 关闭游标和连接
cur.close()
conn.close()
except psycopg2.Error as e:
print("连接失败:", e)

View File

@@ -4,11 +4,11 @@ from mysql.connector import Error
def test_mysql_connection():
# MySQL 配置
config = {
'host': '192.168.2.20',
'port': 8006,
'user': 'mticloud',
'password': 'fT3KsNDahADGcWCZ',
'database': 'mti-cloud',
'host': '192.168.1.202',
'port': 3306,
'user': 'gitea',
'password': 'CYVVMMxrox3ThsGy',
'database': 'gitea',
'charset': 'utf8mb4'
}

33
testGeneratefile.py Normal file
View File

@@ -0,0 +1,33 @@
import os
import random
# 输出目录
output_dir = "./output_files"
os.makedirs(output_dir, exist_ok=True)
# 配置
file_count = 40
min_size_mb = 70
max_size_mb = 80
chunk_size = 1024 * 1024 # 每次写入 1MB
def generate_random_file(file_path, size_bytes):
with open(file_path, "wb") as f:
written = 0
while written < size_bytes:
chunk = os.urandom(min(chunk_size, size_bytes - written))
f.write(chunk)
written += len(chunk)
for i in range(1, file_count + 1):
file_size_mb = random.randint(min_size_mb, max_size_mb)
file_size_bytes = file_size_mb * 1024 * 1024
filename = f"file_{i}.bin"
filepath = os.path.join(output_dir, filename)
print(f"Creating {filename} ({file_size_mb} MB)...")
generate_random_file(filepath, file_size_bytes)
print("✅ 所有文件生成完毕。")

View File

@@ -0,0 +1,58 @@
from minio import Minio
from minio.error import S3Error
import mimetypes
import os
def download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name):
try:
# 初始化 MinIO 客户端
minio_client = Minio(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
secure=False # 如果 MinIO 没有使用 HTTPS设置为 False
)
# 检查存储桶是否存在
if not minio_client.bucket_exists(bucket_name):
print(f"Bucket '{bucket_name}' does not exist.")
return
# 获取对象元数据
stat = minio_client.stat_object(bucket_name, object_name)
content_type = stat.content_type # 获取 Content-Type
print(f"Object '{object_name}' Content-Type: {content_type}")
# 根据 Content-Type 确定文件后缀
guessed_extension = mimetypes.guess_extension(content_type)
if guessed_extension is None:
guessed_extension = "" # 无法推断时默认不加后缀
print(f"Guessed file extension: {guessed_extension}")
# 提取文件名并设置保存路径
file_name = os.path.basename(object_name) # 提取路径中的文件名
if "." not in file_name: # 如果文件名中没有后缀,添加推断的后缀
file_name += guessed_extension
local_file_path = f"./{file_name}" # 设置保存路径
print(f"File will be saved as: {local_file_path}")
# 下载文件
print(f"Downloading '{object_name}' from bucket '{bucket_name}' to '{local_file_path}'...")
minio_client.fget_object(bucket_name, object_name, local_file_path)
print(f"File '{object_name}' downloaded successfully to '{local_file_path}'.")
except S3Error as e:
print(f"Error occurred: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
# 示例配置
endpoint = "192.168.1.28:9500" # MinIO 服务器地址
access_key = "UIiBRvUixj3GkJwvItmy" # MinIO Access Key
secret_key = "wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm" # MinIO Secret Key
bucket_name = "test-bucket" # 存储桶名称
object_name = "test/uploadedFiles/20241223/175_big.png" # 不带后缀的文件名
# 测试下载文件
download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name)

20
test_websocket.py Normal file
View File

@@ -0,0 +1,20 @@
import ssl
import websocket
# URL = "ws://192.168.1.202:9100/well-tool-test-system/ws/"
URL = "wss://192.168.1.87/ws/"
ws = websocket.WebSocketApp(
URL,
on_open=lambda ws: print("连接成功"),
on_message=lambda ws, msg: print("消息:", msg),
on_error=lambda ws, err: print("错误:", err),
on_close=lambda ws, code, reason: print("关闭:", code, reason),
)
ws.run_forever(
sslopt={
"cert_reqs": ssl.CERT_NONE,
"check_hostname": False,
}
)

352
test_whatsapp.py Normal file
View File

@@ -0,0 +1,352 @@
import os, json, hmac, base64, hashlib, time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# ------------------------- Utils -------------------------
def b64e(b: bytes) -> str:
return base64.b64encode(b).decode()
def b64d(s: str) -> bytes:
return base64.b64decode(s.encode())
def hkdf(ikm: bytes, info: bytes, length: int = 32) -> bytes:
return HKDF(algorithm=hashes.SHA256(), length=length, salt=None, info=info).derive(ikm)
def aes_cbc_enc(key: bytes, iv: bytes, pt: bytes) -> bytes:
# PKCS#7 padding
pad = 16 - (len(pt) % 16)
pt = pt + bytes([pad])*pad
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
return cipher.encryptor().update(pt) + cipher.encryptor().finalize()
def aes_cbc_dec(key: bytes, iv: bytes, ct: bytes) -> bytes:
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
pt = cipher.decryptor().update(ct) + cipher.decryptor().finalize()
pad = pt[-1]
if pad < 1 or pad > 16 or pt[-pad:] != bytes([pad])*pad:
raise ValueError("Bad padding")
return pt[:-pad]
def hmac_sha256(key: bytes, data: bytes) -> bytes:
return hmac.new(key, data, hashlib.sha256).digest()
def x25519_shared(sk: X25519PrivateKey, pk: X25519PublicKey) -> bytes:
return sk.exchange(pk)
def pub_bytes(pk: X25519PublicKey) -> bytes:
return pk.public_bytes(encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw)
def priv_from_bytes(b: bytes) -> X25519PrivateKey:
return X25519PrivateKey.from_private_bytes(b)
def pub_from_bytes(b: bytes) -> X25519PublicKey:
return X25519PublicKey.from_public_bytes(b)
# ------------------ Data structures ----------------------
@dataclass
class PublicBundle:
identity_pub: str
signed_prekey_pub: str
signed_prekey_sig: str # 演示保留字段,未做真实签名校验
onetime_prekeys: Dict[str, str] # id -> pub
@dataclass
class SessionState:
# 简化:只做单向发送链(对称 ratchet 计数)
root_key: bytes
chain_key: bytes
counter: int
peer_identity_pub_b64: str
opk_id: Optional[str] # 用了哪个一次性预键(初始化时)
established_at: float
# ------------------------- Server ------------------------
class RelayServer:
"""
极简“服务器”:存放用户公开密钥束,提供查询;转发密文帧。
不保存/解密消息正文。
"""
def __init__(self):
self.directory: Dict[str, PublicBundle] = {}
self.mailbox: Dict[str, List[dict]] = {}
def register_bundle(self, user: str, bundle: PublicBundle):
self.directory[user] = bundle
def fetch_bundle(self, user: str) -> Optional[PublicBundle]:
return self.directory.get(user)
def mark_onetime_used(self, user: str, opk_id: str):
b = self.directory.get(user)
if not b: return
if opk_id in b.onetime_prekeys:
del b.onetime_prekeys[opk_id]
def send_frame(self, to_user: str, frame: dict):
self.mailbox.setdefault(to_user, []).append(frame)
def pull_frames(self, user: str) -> List[dict]:
return self.mailbox.pop(user, [])
# ------------------------- Client ------------------------
class Client:
def __init__(self, name: str, server: RelayServer):
self.name = name
self.server = server
# 长期身份密钥
self.ik_priv = X25519PrivateKey.generate()
self.ik_pub = self.ik_priv.public_key()
# 已签名预共享密钥(演示:不做真实签名)
self.spk_priv = X25519PrivateKey.generate()
self.spk_pub = self.spk_priv.public_key()
self.spk_sig = os.urandom(64) # 占位
# 一次性预共享密钥池
self.opk_priv_map: Dict[str, X25519PrivateKey] = {}
self.opk_pub_map: Dict[str, X25519PublicKey] = {}
for i in range(5):
sk = X25519PrivateKey.generate()
pk = sk.public_key()
opk_id = f"opk-{i}"
self.opk_priv_map[opk_id] = sk
self.opk_pub_map[opk_id] = pk
# 会话(按对端用户)
self.sessions: Dict[str, SessionState] = {}
def publish(self):
bundle = PublicBundle(
identity_pub=b64e(pub_bytes(self.ik_pub)),
signed_prekey_pub=b64e(pub_bytes(self.spk_pub)),
signed_prekey_sig=b64e(self.spk_sig),
onetime_prekeys={oid: b64e(pub_bytes(pk)) for oid, pk in self.opk_pub_map.items()}
)
self.server.register_bundle(self.name, bundle)
# ---------- 建会话(发起方)简化 X3DH ----------
def establish_session_as_initiator(self, peer: str) -> SessionState:
pb = self.server.fetch_bundle(peer)
assert pb, f"{peer} has no bundle"
IKb = pub_from_bytes(b64d(pb.identity_pub))
SPKb = pub_from_bytes(b64d(pb.signed_prekey_pub))
# 选一个对方的一次性预键
opk_items = list(pb.onetime_prekeys.items())
opk_id, OPKb_b64 = opk_items[0] if opk_items else (None, None)
OPKb = pub_from_bytes(b64d(OPKb_b64)) if OPKb_b64 else None
# 发起者生成临时密钥对
EKa_priv = X25519PrivateKey.generate()
EKa_pub = EKa_priv.public_key()
# X3DH 4次 ECDH无OPK则忽略第4项
s1 = x25519_shared(self.ik_priv, SPKb)
s2 = x25519_shared(EKa_priv, IKb)
s3 = x25519_shared(EKa_priv, SPKb)
parts = [s1, s2, s3]
if OPKb:
s4 = x25519_shared(EKa_priv, OPKb)
parts.append(s4)
master_secret = b"".join(parts)
root_key = hkdf(master_secret, info=b"ROOT", length=32)
chain_key = hkdf(root_key, info=b"CHAIN", length=32)
st = SessionState(
root_key=root_key,
chain_key=chain_key,
counter=0,
peer_identity_pub_b64=pb.identity_pub,
opk_id=opk_id,
established_at=time.time()
)
self.sessions[peer] = st
# 发送“建会话 + 第一条消息”的头信息(包含 EKa_pub、opk_id
self._pending_ephemeral_pub = b64e(pub_bytes(EKa_pub))
self._pending_opk_id = opk_id
return st
# ---------- 建会话(接收方) ----------
def _establish_session_as_responder(self, peer: str, ek_pub_b64: str, opk_id: Optional[str]) -> SessionState:
EKa = pub_from_bytes(b64d(ek_pub_b64)) # 对方临时公钥
# 自己的密钥
IKb_priv = self.ik_priv
SPKb_priv = self.spk_priv
OPKb_priv = self.opk_priv_map.get(opk_id) if opk_id else None
# 4次 ECDH无OPK则忽略第4项
s1 = x25519_shared(SPKb_priv, pub_from_bytes(b64d(self.server.directory[peer].identity_pub))) # = ECDH(Ia, SPKb)
# 注意:发起方 s1 是 ECDH(Ia, SPKb),接收方等价项应是 ECDH(SPKb, Ia)
# 但我们没有 Ia 私钥,这里换一种对称表达:按消息头与本地密钥构造相同串联
# 为确保与发起方一致,我们直接重算:
# 对于接收方ECDH(Iinitiator, Srecipient) == ECDH(Srecipient, Iinitiator)
# 需要 Iinitiator 公钥:来自会话第一帧中?为简化,我们用目录中对方 identity_pub。
Ia_pub = pub_from_bytes(b64d(self.server.directory[peer].identity_pub))
s1 = x25519_shared(self.spk_priv, Ia_pub)
s2 = x25519_shared(self.ik_priv, EKa)
s3 = x25519_shared(self.spk_priv, EKa)
parts = [s1, s2, s3]
if OPKb_priv:
s4 = x25519_shared(OPKb_priv, EKa)
parts.append(s4)
master_secret = b"".join(parts)
root_key = hkdf(master_secret, info=b"ROOT", length=32)
chain_key = hkdf(root_key, info=b"CHAIN", length=32)
st = SessionState(
root_key=root_key,
chain_key=chain_key,
counter=0,
peer_identity_pub_b64=self.server.directory[peer].identity_pub,
opk_id=opk_id,
established_at=time.time()
)
self.sessions[peer] = st
# 一次性预键被使用后,服务端目录也标记删除
if opk_id:
self.server.mark_onetime_used(self.name, opk_id)
return st
# ---------- 每条消息的派生与加密 ----------
def _derive_message_key(self, st: SessionState) -> Tuple[bytes, bytes, bytes]:
"""
从 chain_key 派生本条消息的 (aes_key, hmac_key, iv),然后更新 chain_keycounter+1
"""
info = b"MSG|" + st.counter.to_bytes(8, "big")
msg_key = hkdf(st.chain_key, info=info, length=80) # 32 AES + 32 HMAC + 16 IV
aes_key = msg_key[:32]
mac_key = msg_key[32:64]
iv = msg_key[64:80]
# 下一条链
st.chain_key = hkdf(st.chain_key, info=b"STEP", length=32)
st.counter += 1
return aes_key, mac_key, iv
# ---------- 发送 ----------
def send(self, to_user: str, plaintext: bytes):
# 若没有会话,先建立
if to_user not in self.sessions:
st = self.establish_session_as_initiator(to_user)
ek_pub_b64 = self._pending_ephemeral_pub
opk_id = self._pending_opk_id
else:
st = self.sessions[to_user]
ek_pub_b64 = None
opk_id = None
aes_key, mac_key, iv = self._derive_message_key(st)
ct = aes_cbc_enc(aes_key, iv, plaintext)
mac = hmac_sha256(mac_key, iv + ct)[:8]
frame = {
"from": self.name,
"to": to_user,
"hdr": {
"init": ek_pub_b64 is not None,
"ek_pub_b64": ek_pub_b64, # 仅首次带上
"opk_id": opk_id,
"counter": st.counter - 1, # 本条的计数
},
"body": {
"iv": b64e(iv),
"ct": b64e(ct),
"mac": b64e(mac),
}
}
self.server.send_frame(to_user, frame)
# ---------- 接收 ----------
def receive_all(self) -> List[Tuple[str, bytes]]:
frames = self.server.pull_frames(self.name)
outputs = []
for f in frames:
sender = f["from"]
hdr = f["hdr"]
body = f["body"]
if sender not in self.sessions:
# 首帧:建立被动会话
assert hdr["init"], "Missing init header"
st = self._establish_session_as_responder(
peer=sender,
ek_pub_b64=hdr["ek_pub_b64"],
opk_id=hdr["opk_id"]
)
else:
st = self.sessions[sender]
# 按对方 counter 对齐(演示:假设顺序到达)
aes_key, mac_key, iv = self._derive_message_key(st)
if st.counter - 1 != hdr["counter"]:
# 简化:严格顺序,真实实现需支持跳号、乱序恢复
raise ValueError("Out-of-order message (demo limitation)")
if b64e(iv) != body["iv"]:
# 教学演示:我们强制使用派生的 iv
iv = b64d(body["iv"]) # 宽松一点也可以直接信任对端 iv
ct = b64d(body["ct"])
mac = b64d(body["mac"])
calc = hmac_sha256(mac_key, iv + ct)[:8]
if not hmac.compare_digest(mac, calc):
raise ValueError("MAC verification failed")
pt = aes_cbc_dec(aes_key, iv, ct)
outputs.append((sender, pt))
return outputs
# ------------------------- Demo --------------------------
def main():
server = RelayServer()
alice = Client("alice", server)
bob = Client("bob", server)
# 发布各自的公钥包
alice.publish()
bob.publish()
# Alice 先发一条(会自动建会话)
alice.send("bob", b"Hello Bob, this is Alice.")
# Bob 拉取并解密
for frm in bob.receive_all():
print("[bob] got:", frm)
# Bob 回复
bob.send("alice", b"Hi Alice, Bob here. Message received.")
for frm in alice.receive_all():
print("[alice] got:", frm)
# 连发多条观察每条都换密钥server 无法解密)
for i in range(1, 4):
alice.send("bob", f"Msg#{i} from Alice".encode())
for frm in bob.receive_all():
print("[bob] got:", frm)
# 验证“服务器看不到明文”
print("\n[server] directory keys (truncated):")
print(json.dumps({
u: {
"identity_pub": v.identity_pub[:24] + "...",
"spk_pub": v.signed_prekey_pub[:24] + "...",
"opk_count": len(v.onetime_prekeys)
} for u, v in server.directory.items()
}, indent=2))
if __name__ == "__main__":
main()

49
testkkfileview.py Normal file
View File

@@ -0,0 +1,49 @@
import base64
import urllib.parse
import requests
def generate_preview_url():
# 原始 URL
origin_url = 'http://192.168.1.28:8080/prod-api/downhole-tool-system/common/files/v1/download/60cbe676ef181249af3fe2cbeea99746'
# 添加 fullfilename 参数
preview_url = origin_url
# Base64 编码
encoded_preview_url = base64.b64encode(preview_url.encode('utf-8')).decode('utf-8')
# URL 编码 Base64 编码的结果
final_url = f"http://127.0.0.1:8012/picturesPreview?urls={urllib.parse.quote(encoded_preview_url)}"
return final_url
def request_with_token():
# 生成完整的 URL
url = generate_preview_url()
print(f"完整的 URL: {url}")
# 设置请求头
headers = {
'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzQ2MDA4MDgsImp0aSI6IjJOT1hqbnRJdEh4YUJhQzRqcThJdlMtMVVSayIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.ytexuU6OPBycL6Zoh74YTYCJ_9f-u6emsFDy4-OsDvE', # 替换为实际的 token
'Content-Type': 'application/json'
}
# 发起 GET 请求
response = requests.get(url, headers=headers)
# 处理响应
if response.status_code == 200:
print("请求成功!")
print("响应内容:")
print(response.content) # 如果是图片/文件数据,可以保存到本地
else:
print(f"请求失败,状态码: {response.status_code}")
print("响应内容:")
print(response.text)
# 执行请求
request_with_token()

View File

@@ -0,0 +1,101 @@
import requests
import json
from Crypto.Cipher import DES
from Crypto.Util.Padding import pad
import base64
# 全局变量,用于存储从登录响应中获取的 key 和 iv
DYNAMIC_KEY = None
DYNAMIC_IV = None
def get_des_encrypt(data: str, _key: str, _iv: str) -> str:
"""
使用 DES/CBC/PKCS7 加密字符串。
"""
key = _key.encode('utf-8')
iv = _iv.encode('utf-8')
if len(key) != 8 or len(iv) != 8:
raise ValueError("DES key and IV must be exactly 8 bytes")
plaintext = data.encode('utf-8')
padded_data = pad(plaintext, DES.block_size)
cipher = DES.new(key, DES.MODE_CBC, iv)
encrypted_bytes = cipher.encrypt(padded_data)
return base64.b64encode(encrypted_bytes).decode('utf-8')
def login_and_get_dynamic_key_iv(url, login_data):
"""
发送未加密登录请求,成功后提取 key 和 iv。
返回 (token, key, iv) 或 (None, None, None)
"""
payload = {
"encType": 0,
"x_flag": "",
"data": login_data
}
headers = {"Content-Type": "application/json"}
try:
response = requests.post(url, headers=headers, json=payload)
print("🔓 [登录] 状态码:", response.status_code)
result = response.json()
print("🔓 [登录] 响应:", json.dumps(result, indent=2, ensure_ascii=False))
data = result.get("data", {})
token = data.get("token")
key = data.get("key")
iv = data.get("iv")
if token and key and iv:
print(f"✅ 登录成功Token: {token}")
print(f"🔑 动态 Key: {key}, IV: {iv}")
return token, key, iv
else:
print("❌ 登录成功但缺少 key 或 iv")
return None, None, None
except Exception as e:
print("❌ 登录异常:", e)
return None, None, None
def send_encrypted_request_with_dynamic_key(url, data, key, iv):
"""
使用动态 key/iv 加密并发送请求。
"""
data_str = json.dumps(data, separators=(',', ':'))
encrypted_data = get_des_encrypt(data_str, key, iv)
payload = {
"encType": 0,
"x_flag": "",
"data": encrypted_data
}
headers = {"Content-Type": "application/json"}
try:
response = requests.post(url, headers=headers, json=payload)
print("🔒 [加密请求] 状态码:", response.status_code)
print("🔒 [加密请求] 响应:", response.text)
result = response.json()
token = result.get("data", {}).get("token")
if token:
print("✅ 加密请求成功Token:", token)
else:
print("⚠️ 加密请求完成,但无新 token")
except Exception as e:
print("❌ 加密请求异常:", e)
if __name__ == '__main__':
url = "http://127.0.0.1:8084/c/v1/wellMudLogAndWireLine/system/privateManager/userLoginAggregate/other/login"
login_data = {
"userId": "test002",
"password": "123456"
}
token, key, iv = login_and_get_dynamic_key_iv(url, login_data)
if not (key and iv):
print("🛑 无法获取动态 key/iv退出。")
exit(1)