Compare commits
24 Commits
046bbafa4e
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| c1fa662552 | |||
| bab29150ab | |||
| 8c4e48f0ef | |||
| 167d96b6ba | |||
| 123dfc4411 | |||
| ea2d12c74e | |||
| 9c6f5c3b63 | |||
| 3489709697 | |||
| 3ab5f15b0e | |||
| 5fa68924e3 | |||
| 945ac31725 | |||
| 5b1ce354b9 | |||
| ce607c5637 | |||
| 266ba6ad31 | |||
| f7eb913be3 | |||
| 1211d2fca9 | |||
| 8c58329bfc | |||
| 4ac98d9791 | |||
| 91bb40770a | |||
| 80617d3c28 | |||
| f77dc396db | |||
| d64e709d3f | |||
| eb7316b837 | |||
| 6140aae7f7 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,2 +1,3 @@
|
|||||||
./venv/
|
./venv/
|
||||||
.idea/
|
.idea/
|
||||||
|
.*png
|
||||||
40
base64EncodeUrl.py
Normal file
40
base64EncodeUrl.py
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
import base64
|
||||||
|
|
||||||
|
def generate_download_url(base_url: str, file_id: str, fullfilename: str, token: str) -> str:
|
||||||
|
"""
|
||||||
|
生成下载 URL 并对其进行 Base64 编码
|
||||||
|
:param base_url: 文件下载的基础 URL
|
||||||
|
:param file_id: 文件的 ID
|
||||||
|
:param fullfilename: 文件的完整名称
|
||||||
|
:param token: 授权的 Token
|
||||||
|
:return: Base64 编码后的完整下载 URL
|
||||||
|
"""
|
||||||
|
# 拼接完整的下载 URL
|
||||||
|
download_url = f"{base_url}/{file_id}?fullfilename={fullfilename}&token={token}"
|
||||||
|
print(f"Original URL: {download_url}")
|
||||||
|
|
||||||
|
# 对 URL 进行 Base64 编码
|
||||||
|
base64_encoded_url = base64.b64encode(download_url.encode("utf-8")).decode("utf-8")
|
||||||
|
print(f"Base64 Encoded URL: {base64_encoded_url}")
|
||||||
|
|
||||||
|
return base64_encoded_url
|
||||||
|
|
||||||
|
|
||||||
|
# 示例数据
|
||||||
|
pre_view_url = "http://192.168.1.12:8012/onlinePreview"
|
||||||
|
download_url = "http://192.168.1.28:8080/prod-api/file/download"
|
||||||
|
base_url = "http://192.168.1.28:8080/prod-api/file/download"
|
||||||
|
file_id = "a69ea8e0f8e28f92202975a3a4996e7b"
|
||||||
|
fullfilename = "文件预览.pptx"
|
||||||
|
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzUwMzIxMDMsImp0aSI6Ijl4Q3JrVDltXy1iZGh6czhKSWlrdkcySk5XcyIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.T7jZ4bSmPqlwH2LjZeKLRXg_5Q4t7ihRUhd-zvdehak"
|
||||||
|
|
||||||
|
# 生成 Base64 编码的 URL
|
||||||
|
encoded_url = generate_download_url(base_url, file_id, fullfilename, token)
|
||||||
|
|
||||||
|
# 拼接下载和预览 URL
|
||||||
|
down_url = f"{download_url}/{file_id}"
|
||||||
|
final_url = f"{pre_view_url}?url={encoded_url}"
|
||||||
|
|
||||||
|
# 打印结果
|
||||||
|
print(f"Download URL: {down_url}")
|
||||||
|
print(f"Final Preview URL: {final_url}")
|
||||||
91
fastapiTest.py
Normal file
91
fastapiTest.py
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
from fastapi import FastAPI, HTTPException, Response
|
||||||
|
from minio import Minio
|
||||||
|
from minio.error import S3Error
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
# 初始化 MinIO 客户端
|
||||||
|
minio_client = Minio(
|
||||||
|
endpoint="192.168.1.28:9500", # MinIO 服务器地址
|
||||||
|
access_key="UIiBRvUixj3GkJwvItmy", # Access Key
|
||||||
|
secret_key="wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm", # Secret Key
|
||||||
|
secure=False # 如果没有使用 HTTPS,则设置为 False
|
||||||
|
)
|
||||||
|
|
||||||
|
# MinIO 配置
|
||||||
|
BUCKET_NAME = "test-bucket"
|
||||||
|
|
||||||
|
|
||||||
|
class FileRequest(BaseModel):
|
||||||
|
object_name: str # 文件对象名
|
||||||
|
expiry_in_seconds: int = 3600 # URL 有效期(默认 1 小时)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/download-presigned-url")
|
||||||
|
def generate_presigned_url(file_request: FileRequest):
|
||||||
|
"""
|
||||||
|
生成用于下载文件的预签名 URL
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 检查存储桶是否存在
|
||||||
|
if not minio_client.bucket_exists(BUCKET_NAME):
|
||||||
|
raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.")
|
||||||
|
|
||||||
|
# 检查对象是否存在
|
||||||
|
try:
|
||||||
|
minio_client.stat_object(BUCKET_NAME, file_request.object_name)
|
||||||
|
except S3Error:
|
||||||
|
raise HTTPException(status_code=404, detail=f"Object '{file_request.object_name}' does not exist.")
|
||||||
|
|
||||||
|
# 将秒数转换为 timedelta
|
||||||
|
expiry_timedelta = timedelta(seconds=file_request.expiry_in_seconds)
|
||||||
|
|
||||||
|
# 生成预签名 URL
|
||||||
|
presigned_url = minio_client.presigned_get_object(
|
||||||
|
BUCKET_NAME,
|
||||||
|
file_request.object_name,
|
||||||
|
expires=expiry_timedelta # 传递 timedelta
|
||||||
|
)
|
||||||
|
return {"presigned_url": presigned_url}
|
||||||
|
|
||||||
|
except S3Error as e:
|
||||||
|
raise HTTPException(status_code=500, detail=f"Error generating presigned URL: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/download-file/{object_name:path}")
|
||||||
|
def download_file(object_name: str):
|
||||||
|
"""
|
||||||
|
提供文件对象并直接下载
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
print(f"Requested object: {object_name}")
|
||||||
|
|
||||||
|
# 检查存储桶是否存在
|
||||||
|
if not minio_client.bucket_exists(BUCKET_NAME):
|
||||||
|
raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.")
|
||||||
|
|
||||||
|
# 检查对象是否存在
|
||||||
|
try:
|
||||||
|
stat = minio_client.stat_object(BUCKET_NAME, object_name)
|
||||||
|
except S3Error:
|
||||||
|
raise HTTPException(status_code=404, detail=f"Object '{object_name}' does not exist.")
|
||||||
|
|
||||||
|
# 获取文件对象
|
||||||
|
response = minio_client.get_object(BUCKET_NAME, object_name)
|
||||||
|
|
||||||
|
# 设置响应头
|
||||||
|
headers = {
|
||||||
|
"Content-Disposition": f"attachment; filename*=UTF-8''{object_name.split('/')[-1]}",
|
||||||
|
"Content-Type": stat.content_type
|
||||||
|
}
|
||||||
|
return Response(content=response.read(), headers=headers, media_type=stat.content_type)
|
||||||
|
|
||||||
|
except S3Error as e:
|
||||||
|
raise HTTPException(status_code=500, detail=f"Error downloading file: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
|
||||||
|
|
||||||
124
generateSign.py
Normal file
124
generateSign.py
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
import json
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
import urllib.parse
|
||||||
|
import hashlib
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
# =================== 运行环境配置 ===================
|
||||||
|
CONFIG = {
|
||||||
|
"dev": {
|
||||||
|
"platform_ifc_system_code": "ef5b17caff6e4da19d6af82d539e894d",
|
||||||
|
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
|
||||||
|
"appId": "57395d2bc668496c9c57d8f2b19bd516",
|
||||||
|
"secret_key": "KMFHKo1Uzrl&MWXorbQIT&C$Qea$uQOY",
|
||||||
|
"ifcUrl": "http://192.168.1.202:8083/dev-api/cenertech-interface-center/IFC2"
|
||||||
|
},
|
||||||
|
"prod": {
|
||||||
|
"platform_ifc_system_code": "57395d2bc668496c9c57d8f2b19bd516",
|
||||||
|
"systemCode": "57395d2bc668496c9c57d8f2b19bd516",
|
||||||
|
"appId": "57395d2bc668496c9c57d8f2b19bd516",
|
||||||
|
"secret_key": "opS=K9Parlf&p+JxBOQD2q+zNZa+uXEE",
|
||||||
|
"ifcUrl": "https://dpc.cet.cnooc/prod-api/cenertech-interface-center/IFC2"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# =================== MD5签名计算 ===================
|
||||||
|
def calculate_md5(param, secret_key):
|
||||||
|
return hashlib.md5((secret_key + param).encode('utf-8')).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def create_sign(param_map, secret_key):
|
||||||
|
sorted_keys = sorted(param_map.keys())
|
||||||
|
parts = []
|
||||||
|
|
||||||
|
for key in sorted_keys:
|
||||||
|
val = param_map[key]
|
||||||
|
|
||||||
|
if val is None:
|
||||||
|
parts.append(f"{key}=")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if isinstance(val, list):
|
||||||
|
for item in val:
|
||||||
|
encoded_val = urllib.parse.quote(str(item), encoding='utf-8')
|
||||||
|
parts.append(f"{key}={encoded_val}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
val_str = str(val)
|
||||||
|
|
||||||
|
if key != "REQUEST_BODY_CONTENT":
|
||||||
|
val_str = urllib.parse.quote(val_str, encoding='utf-8')
|
||||||
|
|
||||||
|
parts.append(f"{key}={val_str}")
|
||||||
|
|
||||||
|
param = "&".join(parts)
|
||||||
|
param = "".join(param.split())
|
||||||
|
|
||||||
|
print("参数明文 param:", param)
|
||||||
|
|
||||||
|
sign = calculate_md5(param, secret_key)
|
||||||
|
print("生成签名 sign:", sign)
|
||||||
|
|
||||||
|
return sign
|
||||||
|
|
||||||
|
|
||||||
|
# =================== Python 主程序 ===================
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
# 选择 dev / prod
|
||||||
|
env = "dev"
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
env = sys.argv[1]
|
||||||
|
print(f"\n=== 当前环境:{env} ===\n")
|
||||||
|
|
||||||
|
if env not in CONFIG:
|
||||||
|
print("❗ 错误:请使用 python sign.py dev 或 python sign.py prod")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
cfg = CONFIG[env]
|
||||||
|
|
||||||
|
systemCode = cfg["systemCode"]
|
||||||
|
appId = cfg["appId"]
|
||||||
|
secret_key = cfg["secret_key"]
|
||||||
|
platform_ifc_system_code = cfg["platform_ifc_system_code"]
|
||||||
|
ifcUrl = cfg["ifcUrl"]
|
||||||
|
|
||||||
|
ts = str(int(time.time() * 1000))
|
||||||
|
randomString = uuid.uuid4().hex
|
||||||
|
|
||||||
|
selector = {
|
||||||
|
"searchKeys": ["9cb864213c6f48ceaf90e98e7ca375e9","3DC1B33E1B5B431E99FA163BF9E86E6A","13336","b353614a47e2425a8a8885d270267407"],
|
||||||
|
"userType": "1",
|
||||||
|
"hasCascade": True
|
||||||
|
}
|
||||||
|
|
||||||
|
request_body_json = json.dumps(selector, separators=(',', ':'))
|
||||||
|
|
||||||
|
param_map = {
|
||||||
|
"systemCode": systemCode,
|
||||||
|
"timestamp": ts,
|
||||||
|
"nonce": randomString,
|
||||||
|
"REQUEST_BODY_CONTENT": request_body_json
|
||||||
|
}
|
||||||
|
|
||||||
|
sign = create_sign(param_map, secret_key)
|
||||||
|
|
||||||
|
apiPath = "/userListByDeptSearch"
|
||||||
|
fullUrl = f"{ifcUrl}/{platform_ifc_system_code}{apiPath}"
|
||||||
|
|
||||||
|
curl = f"""
|
||||||
|
curl -X POST "{fullUrl}" \\
|
||||||
|
-H "Content-Type: application/json" \\
|
||||||
|
-H "systemCode: {systemCode}" \\
|
||||||
|
-H "timestamp: {ts}" \\
|
||||||
|
-H "nonce: {randomString}" \\
|
||||||
|
-H "sign: {sign}" \\
|
||||||
|
-H "App-Id: {appId}" \\
|
||||||
|
-d '{request_body_json}'
|
||||||
|
"""
|
||||||
|
|
||||||
|
print("\n===== 最终 curl 请求 =====\n")
|
||||||
|
print(curl)
|
||||||
28
generaterCardId.py
Normal file
28
generaterCardId.py
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
import random
|
||||||
|
|
||||||
|
def generateCardId(length):
|
||||||
|
"""
|
||||||
|
生成指定长度的随机卡号
|
||||||
|
:param length: 卡号的长度
|
||||||
|
:return: 生成的随机卡号
|
||||||
|
"""
|
||||||
|
if length <= 0:
|
||||||
|
raise ValueError("Length must be a positive integer.")
|
||||||
|
|
||||||
|
# 初始化卡号
|
||||||
|
card_id = ""
|
||||||
|
# 数字范围
|
||||||
|
numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
|
||||||
|
|
||||||
|
# 生成随机卡号
|
||||||
|
for _ in range(length):
|
||||||
|
random_number = random.choice(numbers)
|
||||||
|
card_id += str(random_number)
|
||||||
|
|
||||||
|
return card_id
|
||||||
|
|
||||||
|
# 测试函数
|
||||||
|
if __name__ == "__main__":
|
||||||
|
length = 18 # 指定卡号长度
|
||||||
|
card_id = generateCardId(length)
|
||||||
|
print(f"Generated Card ID: {card_id}")
|
||||||
59
recive_wits.py
Normal file
59
recive_wits.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import socket
|
||||||
|
import time
|
||||||
|
|
||||||
|
HOST = "192.168.1.41"
|
||||||
|
PORT = 9928
|
||||||
|
|
||||||
|
def connect():
|
||||||
|
"""建立 TCP 连接(带重试)"""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print(f"Connecting to {HOST}:{PORT} ...")
|
||||||
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
s.connect((HOST, PORT))
|
||||||
|
s.settimeout(5)
|
||||||
|
print("Connected successfully!")
|
||||||
|
return s
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Connection failed: {e}, retrying in 3s...")
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
|
||||||
|
def receive_wits_data(sock):
|
||||||
|
"""持续接收 WITS 数据(自动处理黏包/拆包)"""
|
||||||
|
buffer = ""
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
data = sock.recv(4096)
|
||||||
|
|
||||||
|
# 服务器关闭
|
||||||
|
if not data:
|
||||||
|
print("Server closed connection.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
buffer += data.decode(errors="ignore")
|
||||||
|
|
||||||
|
# WITS 多为 \r\n 分隔
|
||||||
|
while "\n" in buffer:
|
||||||
|
line, buffer = buffer.split("\n", 1)
|
||||||
|
line = line.strip()
|
||||||
|
if line:
|
||||||
|
print("Received:", line)
|
||||||
|
|
||||||
|
except socket.timeout:
|
||||||
|
# 正常情况,继续接收即可
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
print("Error:", e)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
while True:
|
||||||
|
sock = connect()
|
||||||
|
ok = receive_wits_data(sock)
|
||||||
|
sock.close()
|
||||||
|
|
||||||
|
print("Reconnecting in 3 seconds...")
|
||||||
|
time.sleep(3)
|
||||||
61
send_wtis.py
Normal file
61
send_wtis.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import socket
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
|
||||||
|
HOST = "192.168.1.5" # 目标地址
|
||||||
|
PORT = 9929 # 目标端口
|
||||||
|
|
||||||
|
# 你给的示例里出现的所有前四位字段
|
||||||
|
WITS_CODES = [
|
||||||
|
"0105",
|
||||||
|
"0106",
|
||||||
|
"0108",
|
||||||
|
"0112",
|
||||||
|
"0114",
|
||||||
|
"0116",
|
||||||
|
"0118",
|
||||||
|
"0120",
|
||||||
|
"0121",
|
||||||
|
"0122"
|
||||||
|
]
|
||||||
|
|
||||||
|
def random_value(prefix):
|
||||||
|
"""
|
||||||
|
生成类似你收到的数据:
|
||||||
|
- 有些是整数:例如 0105 251114
|
||||||
|
- 有些是浮点:例如 0108 37.26745
|
||||||
|
"""
|
||||||
|
# 随机决定生成整数 or 小数
|
||||||
|
if random.random() < 0.3:
|
||||||
|
# 生成整数(6位左右)
|
||||||
|
value = str(random.randint(100000, 999999))
|
||||||
|
else:
|
||||||
|
# 生成浮点(保留4~5位小数)
|
||||||
|
value = f"{random.uniform(0, 500):.5f}"
|
||||||
|
|
||||||
|
return prefix + value
|
||||||
|
|
||||||
|
|
||||||
|
def send_wits_data():
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.connect((HOST, PORT))
|
||||||
|
print("Connected to target. Sending WITS data...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
for code in WITS_CODES:
|
||||||
|
msg = random_value(code)
|
||||||
|
|
||||||
|
sock.sendall((msg + "\r\n").encode())
|
||||||
|
print("Sent:", msg)
|
||||||
|
|
||||||
|
time.sleep(0.2) # 每条间隔 200ms,可根据需要调整
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print("Error:", e)
|
||||||
|
finally:
|
||||||
|
sock.close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
send_wits_data()
|
||||||
66
simulate_monty_hall.py
Normal file
66
simulate_monty_hall.py
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
import random
|
||||||
|
|
||||||
|
|
||||||
|
def simulate_monty_hall(switch, num_trials=100000):
|
||||||
|
"""
|
||||||
|
模拟三门悖论(蒙提霍尔问题)
|
||||||
|
|
||||||
|
参数:
|
||||||
|
- switch: bool, 是否在主持人打开门后选择换门
|
||||||
|
- num_trials: int, 模拟次数(默认10万次)
|
||||||
|
|
||||||
|
返回:
|
||||||
|
- 获胜次数
|
||||||
|
- 胜率
|
||||||
|
"""
|
||||||
|
win_count = 0
|
||||||
|
doors = [1, 2, 3] # 三扇门
|
||||||
|
|
||||||
|
for _ in range(num_trials):
|
||||||
|
# 随机放置汽车(1 表示汽车,0 表示山羊)
|
||||||
|
car_door = random.choice(doors)
|
||||||
|
|
||||||
|
# 参赛者初始选择
|
||||||
|
player_choice = random.choice(doors)
|
||||||
|
|
||||||
|
# 主持人要打开一扇门:不能是参赛者选的,也不能是有车的
|
||||||
|
remaining_doors = [d for d in doors if d != player_choice and d != car_door]
|
||||||
|
host_opens = random.choice(remaining_doors)
|
||||||
|
|
||||||
|
# 换门逻辑
|
||||||
|
if switch:
|
||||||
|
# 换到剩下的那一扇未被选、未被开的门
|
||||||
|
final_choice = [d for d in doors if d != player_choice and d != host_opens][0]
|
||||||
|
else:
|
||||||
|
# 不换,保持原选择
|
||||||
|
final_choice = player_choice
|
||||||
|
|
||||||
|
# 判断是否获胜
|
||||||
|
if final_choice == car_door:
|
||||||
|
win_count += 1
|
||||||
|
|
||||||
|
win_rate = win_count / num_trials
|
||||||
|
return win_count, win_rate
|
||||||
|
|
||||||
|
|
||||||
|
# === 运行模拟 ===
|
||||||
|
if __name__ == "__main__":
|
||||||
|
num_trials = 100000
|
||||||
|
|
||||||
|
print(f"模拟 {num_trials} 次三门问题:\n")
|
||||||
|
|
||||||
|
# 情况1:坚持不换门
|
||||||
|
wins_stay, rate_stay = simulate_monty_hall(switch=False, num_trials=num_trials)
|
||||||
|
print(f"坚持原选择(不换门):")
|
||||||
|
print(f" 获胜次数: {wins_stay}")
|
||||||
|
print(f" 胜率: {rate_stay:.4f} ({rate_stay * 100:.2f}%)\n")
|
||||||
|
|
||||||
|
# 情况2:总是换门
|
||||||
|
wins_switch, rate_switch = simulate_monty_hall(switch=True, num_trials=num_trials)
|
||||||
|
print(f"总是换门:")
|
||||||
|
print(f" 获胜次数: {wins_switch}")
|
||||||
|
print(f" 胜率: {rate_switch:.4f} ({rate_switch * 100:.2f}%)\n")
|
||||||
|
|
||||||
|
print("理论值对比:")
|
||||||
|
print(" 不换门胜率: 1/3 ≈ 33.33%")
|
||||||
|
print(" 换门胜率: 2/3 ≈ 66.67%")
|
||||||
66
snowflake_generator.py
Normal file
66
snowflake_generator.py
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
import time
|
||||||
|
import threading
|
||||||
|
EPOCH_JAVA_COMMON = 1288834974657
|
||||||
|
class Snowflake:
|
||||||
|
def __init__(self, datacenter_id: int = 0, worker_id: int = 0, epoch: int = 1480166465631):
|
||||||
|
# 机器和数据中心配置
|
||||||
|
self.worker_id_bits = 5
|
||||||
|
self.datacenter_id_bits = 5
|
||||||
|
self.sequence_bits = 12
|
||||||
|
|
||||||
|
self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)
|
||||||
|
self.max_datacenter_id = -1 ^ (-1 << self.datacenter_id_bits)
|
||||||
|
|
||||||
|
if worker_id > self.max_worker_id or worker_id < 0:
|
||||||
|
raise ValueError(f"worker_id 超出范围 (0 ~ {self.max_worker_id})")
|
||||||
|
if datacenter_id > self.max_datacenter_id or datacenter_id < 0:
|
||||||
|
raise ValueError(f"datacenter_id 超出范围 (0 ~ {self.max_datacenter_id})")
|
||||||
|
|
||||||
|
self.worker_id = worker_id
|
||||||
|
self.datacenter_id = datacenter_id
|
||||||
|
self.epoch = epoch
|
||||||
|
|
||||||
|
self.sequence = 0
|
||||||
|
self.last_timestamp = -1
|
||||||
|
|
||||||
|
self.worker_id_shift = self.sequence_bits
|
||||||
|
self.datacenter_id_shift = self.sequence_bits + self.worker_id_bits
|
||||||
|
self.timestamp_left_shift = self.sequence_bits + self.worker_id_bits + self.datacenter_id_bits
|
||||||
|
self.sequence_mask = -1 ^ (-1 << self.sequence_bits)
|
||||||
|
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
|
def _timestamp(self):
|
||||||
|
return int(time.time() * 1000)
|
||||||
|
|
||||||
|
def _til_next_millis(self, last_timestamp):
|
||||||
|
timestamp = self._timestamp()
|
||||||
|
while timestamp <= last_timestamp:
|
||||||
|
timestamp = self._timestamp()
|
||||||
|
return timestamp
|
||||||
|
|
||||||
|
def next_id(self) -> int:
|
||||||
|
with self.lock:
|
||||||
|
timestamp = self._timestamp()
|
||||||
|
if timestamp < self.last_timestamp:
|
||||||
|
raise Exception("时钟回拨,拒绝生成ID")
|
||||||
|
|
||||||
|
if timestamp == self.last_timestamp:
|
||||||
|
self.sequence = (self.sequence + 1) & self.sequence_mask
|
||||||
|
if self.sequence == 0:
|
||||||
|
timestamp = self._til_next_millis(self.last_timestamp)
|
||||||
|
else:
|
||||||
|
self.sequence = 0
|
||||||
|
|
||||||
|
self.last_timestamp = timestamp
|
||||||
|
|
||||||
|
return ((timestamp - self.epoch) << self.timestamp_left_shift) | \
|
||||||
|
(self.datacenter_id << self.datacenter_id_shift) | \
|
||||||
|
(self.worker_id << self.worker_id_shift) | \
|
||||||
|
self.sequence
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
snowflake = Snowflake(datacenter_id=0, worker_id=0, epoch=EPOCH_JAVA_COMMON)
|
||||||
|
for _ in range(10):
|
||||||
|
print(str(snowflake.next_id()))
|
||||||
56
testApi.py
Normal file
56
testApi.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationAPIClient:
|
||||||
|
def __init__(self, environment="dev"):
|
||||||
|
# 定义不同环境的域名
|
||||||
|
self.environment_urls = {
|
||||||
|
"dev": "http://dev.example.com",
|
||||||
|
"test": "http://192.168.1.202:9100/downhole-tool-system",
|
||||||
|
"prod": "http://prod.example.com"
|
||||||
|
}
|
||||||
|
|
||||||
|
# 设置当前环境的域名
|
||||||
|
if environment not in self.environment_urls:
|
||||||
|
raise ValueError(f"Invalid environment '{environment}'. Choose from: {list(self.environment_urls.keys())}")
|
||||||
|
|
||||||
|
self.base_url = self.environment_urls[environment]
|
||||||
|
|
||||||
|
def get_my_task(self, token):
|
||||||
|
# 拼接请求的完整 URL
|
||||||
|
url = f"{self.base_url}/notification/v1/my-task"
|
||||||
|
|
||||||
|
# 设置请求头,包含 Authorization Token
|
||||||
|
headers = {
|
||||||
|
"Authorization": f"Bearer {token}"
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 发起 GET 请求
|
||||||
|
response = requests.get(url, headers=headers)
|
||||||
|
|
||||||
|
# 检查 HTTP 状态码并返回结果
|
||||||
|
if response.status_code == 200:
|
||||||
|
return response.json() # 返回解析后的 JSON 数据
|
||||||
|
else:
|
||||||
|
return {"error": f"HTTP {response.status_code}", "details": response.text}
|
||||||
|
except requests.RequestException as e:
|
||||||
|
return {"error": "Request failed", "details": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
# 使用示例
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# 切换环境:可选 "dev", "test", "prod"
|
||||||
|
environment = "test"
|
||||||
|
|
||||||
|
# 创建客户端
|
||||||
|
client = NotificationAPIClient(environment=environment)
|
||||||
|
|
||||||
|
# 设置 Token
|
||||||
|
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzY5Mjc5MTQsImp0aSI6InZQTm5iTUdiczJ6OHdPYi1VVTljcHVxd0oxVSIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.5Z0yXfoLvJc4qEPf6RMS8Xw6VtjaNMPAyfQxEp0DIkM"
|
||||||
|
|
||||||
|
# 发起请求
|
||||||
|
result = client.get_my_task(token)
|
||||||
|
|
||||||
|
# 输出结果
|
||||||
|
print(result)
|
||||||
34
testConnectKingBase.py
Normal file
34
testConnectKingBase.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
import psycopg2
|
||||||
|
|
||||||
|
# 请根据实际情况修改以下参数
|
||||||
|
HOST = "192.168.1.28"
|
||||||
|
PORT = "54321"
|
||||||
|
DATABASE = "upp_demo"
|
||||||
|
USER = "system"
|
||||||
|
PASSWORD = "kingbase_ok"
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 建立连接
|
||||||
|
conn = psycopg2.connect(
|
||||||
|
host=HOST,
|
||||||
|
port=PORT,
|
||||||
|
database=DATABASE,
|
||||||
|
user=USER,
|
||||||
|
password=PASSWORD
|
||||||
|
)
|
||||||
|
print("成功连接到人大金仓数据库!")
|
||||||
|
|
||||||
|
# 创建一个游标对象
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
# 测试执行简单SQL语句,如查询版本信息
|
||||||
|
cur.execute("SELECT version();")
|
||||||
|
version_info = cur.fetchone()
|
||||||
|
print("数据库版本信息:", version_info)
|
||||||
|
|
||||||
|
# 关闭游标和连接
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
except psycopg2.Error as e:
|
||||||
|
print("连接失败:", e)
|
||||||
@@ -4,11 +4,11 @@ from mysql.connector import Error
|
|||||||
def test_mysql_connection():
|
def test_mysql_connection():
|
||||||
# MySQL 配置
|
# MySQL 配置
|
||||||
config = {
|
config = {
|
||||||
'host': '192.168.2.20',
|
'host': '192.168.1.202',
|
||||||
'port': 8006,
|
'port': 3306,
|
||||||
'user': 'mticloud',
|
'user': 'gitea',
|
||||||
'password': 'fT3KsNDahADGcWCZ',
|
'password': 'CYVVMMxrox3ThsGy',
|
||||||
'database': 'mti-cloud',
|
'database': 'gitea',
|
||||||
'charset': 'utf8mb4'
|
'charset': 'utf8mb4'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
33
testGeneratefile.py
Normal file
33
testGeneratefile.py
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
import os
|
||||||
|
import random
|
||||||
|
|
||||||
|
# 输出目录
|
||||||
|
output_dir = "./output_files"
|
||||||
|
os.makedirs(output_dir, exist_ok=True)
|
||||||
|
|
||||||
|
# 配置
|
||||||
|
file_count = 40
|
||||||
|
min_size_mb = 70
|
||||||
|
max_size_mb = 80
|
||||||
|
chunk_size = 1024 * 1024 # 每次写入 1MB
|
||||||
|
|
||||||
|
|
||||||
|
def generate_random_file(file_path, size_bytes):
|
||||||
|
with open(file_path, "wb") as f:
|
||||||
|
written = 0
|
||||||
|
while written < size_bytes:
|
||||||
|
chunk = os.urandom(min(chunk_size, size_bytes - written))
|
||||||
|
f.write(chunk)
|
||||||
|
written += len(chunk)
|
||||||
|
|
||||||
|
|
||||||
|
for i in range(1, file_count + 1):
|
||||||
|
file_size_mb = random.randint(min_size_mb, max_size_mb)
|
||||||
|
file_size_bytes = file_size_mb * 1024 * 1024
|
||||||
|
filename = f"file_{i}.bin"
|
||||||
|
filepath = os.path.join(output_dir, filename)
|
||||||
|
|
||||||
|
print(f"Creating {filename} ({file_size_mb} MB)...")
|
||||||
|
generate_random_file(filepath, file_size_bytes)
|
||||||
|
|
||||||
|
print("✅ 所有文件生成完毕。")
|
||||||
58
testUserMinioDownloadFile.py
Normal file
58
testUserMinioDownloadFile.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
from minio import Minio
|
||||||
|
from minio.error import S3Error
|
||||||
|
import mimetypes
|
||||||
|
import os
|
||||||
|
|
||||||
|
def download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name):
|
||||||
|
try:
|
||||||
|
# 初始化 MinIO 客户端
|
||||||
|
minio_client = Minio(
|
||||||
|
endpoint=endpoint,
|
||||||
|
access_key=access_key,
|
||||||
|
secret_key=secret_key,
|
||||||
|
secure=False # 如果 MinIO 没有使用 HTTPS,设置为 False
|
||||||
|
)
|
||||||
|
|
||||||
|
# 检查存储桶是否存在
|
||||||
|
if not minio_client.bucket_exists(bucket_name):
|
||||||
|
print(f"Bucket '{bucket_name}' does not exist.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 获取对象元数据
|
||||||
|
stat = minio_client.stat_object(bucket_name, object_name)
|
||||||
|
content_type = stat.content_type # 获取 Content-Type
|
||||||
|
print(f"Object '{object_name}' Content-Type: {content_type}")
|
||||||
|
|
||||||
|
# 根据 Content-Type 确定文件后缀
|
||||||
|
guessed_extension = mimetypes.guess_extension(content_type)
|
||||||
|
if guessed_extension is None:
|
||||||
|
guessed_extension = "" # 无法推断时默认不加后缀
|
||||||
|
print(f"Guessed file extension: {guessed_extension}")
|
||||||
|
|
||||||
|
# 提取文件名并设置保存路径
|
||||||
|
file_name = os.path.basename(object_name) # 提取路径中的文件名
|
||||||
|
if "." not in file_name: # 如果文件名中没有后缀,添加推断的后缀
|
||||||
|
file_name += guessed_extension
|
||||||
|
local_file_path = f"./{file_name}" # 设置保存路径
|
||||||
|
print(f"File will be saved as: {local_file_path}")
|
||||||
|
|
||||||
|
# 下载文件
|
||||||
|
print(f"Downloading '{object_name}' from bucket '{bucket_name}' to '{local_file_path}'...")
|
||||||
|
minio_client.fget_object(bucket_name, object_name, local_file_path)
|
||||||
|
|
||||||
|
print(f"File '{object_name}' downloaded successfully to '{local_file_path}'.")
|
||||||
|
|
||||||
|
except S3Error as e:
|
||||||
|
print(f"Error occurred: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Unexpected error: {e}")
|
||||||
|
|
||||||
|
# 示例配置
|
||||||
|
endpoint = "192.168.1.28:9500" # MinIO 服务器地址
|
||||||
|
access_key = "UIiBRvUixj3GkJwvItmy" # MinIO Access Key
|
||||||
|
secret_key = "wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm" # MinIO Secret Key
|
||||||
|
bucket_name = "test-bucket" # 存储桶名称
|
||||||
|
object_name = "test/uploadedFiles/20241223/175_big.png" # 不带后缀的文件名
|
||||||
|
|
||||||
|
# 测试下载文件
|
||||||
|
download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name)
|
||||||
20
test_websocket.py
Normal file
20
test_websocket.py
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
import ssl
|
||||||
|
import websocket
|
||||||
|
|
||||||
|
# URL = "ws://192.168.1.202:9100/well-tool-test-system/ws/"
|
||||||
|
|
||||||
|
URL = "wss://192.168.1.87/ws/"
|
||||||
|
ws = websocket.WebSocketApp(
|
||||||
|
URL,
|
||||||
|
on_open=lambda ws: print("连接成功"),
|
||||||
|
on_message=lambda ws, msg: print("消息:", msg),
|
||||||
|
on_error=lambda ws, err: print("错误:", err),
|
||||||
|
on_close=lambda ws, code, reason: print("关闭:", code, reason),
|
||||||
|
)
|
||||||
|
|
||||||
|
ws.run_forever(
|
||||||
|
sslopt={
|
||||||
|
"cert_reqs": ssl.CERT_NONE,
|
||||||
|
"check_hostname": False,
|
||||||
|
}
|
||||||
|
)
|
||||||
352
test_whatsapp.py
Normal file
352
test_whatsapp.py
Normal file
@@ -0,0 +1,352 @@
|
|||||||
|
import os, json, hmac, base64, hashlib, time
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Dict, List, Optional, Tuple
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
|
||||||
|
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
||||||
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||||
|
|
||||||
|
# ------------------------- Utils -------------------------
|
||||||
|
|
||||||
|
def b64e(b: bytes) -> str:
|
||||||
|
return base64.b64encode(b).decode()
|
||||||
|
|
||||||
|
def b64d(s: str) -> bytes:
|
||||||
|
return base64.b64decode(s.encode())
|
||||||
|
|
||||||
|
def hkdf(ikm: bytes, info: bytes, length: int = 32) -> bytes:
|
||||||
|
return HKDF(algorithm=hashes.SHA256(), length=length, salt=None, info=info).derive(ikm)
|
||||||
|
|
||||||
|
def aes_cbc_enc(key: bytes, iv: bytes, pt: bytes) -> bytes:
|
||||||
|
# PKCS#7 padding
|
||||||
|
pad = 16 - (len(pt) % 16)
|
||||||
|
pt = pt + bytes([pad])*pad
|
||||||
|
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
|
||||||
|
return cipher.encryptor().update(pt) + cipher.encryptor().finalize()
|
||||||
|
|
||||||
|
def aes_cbc_dec(key: bytes, iv: bytes, ct: bytes) -> bytes:
|
||||||
|
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
|
||||||
|
pt = cipher.decryptor().update(ct) + cipher.decryptor().finalize()
|
||||||
|
pad = pt[-1]
|
||||||
|
if pad < 1 or pad > 16 or pt[-pad:] != bytes([pad])*pad:
|
||||||
|
raise ValueError("Bad padding")
|
||||||
|
return pt[:-pad]
|
||||||
|
|
||||||
|
def hmac_sha256(key: bytes, data: bytes) -> bytes:
|
||||||
|
return hmac.new(key, data, hashlib.sha256).digest()
|
||||||
|
|
||||||
|
def x25519_shared(sk: X25519PrivateKey, pk: X25519PublicKey) -> bytes:
|
||||||
|
return sk.exchange(pk)
|
||||||
|
|
||||||
|
def pub_bytes(pk: X25519PublicKey) -> bytes:
|
||||||
|
return pk.public_bytes(encoding=serialization.Encoding.Raw,
|
||||||
|
format=serialization.PublicFormat.Raw)
|
||||||
|
|
||||||
|
def priv_from_bytes(b: bytes) -> X25519PrivateKey:
|
||||||
|
return X25519PrivateKey.from_private_bytes(b)
|
||||||
|
|
||||||
|
def pub_from_bytes(b: bytes) -> X25519PublicKey:
|
||||||
|
return X25519PublicKey.from_public_bytes(b)
|
||||||
|
|
||||||
|
# ------------------ Data structures ----------------------
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PublicBundle:
|
||||||
|
identity_pub: str
|
||||||
|
signed_prekey_pub: str
|
||||||
|
signed_prekey_sig: str # 演示保留字段,未做真实签名校验
|
||||||
|
onetime_prekeys: Dict[str, str] # id -> pub
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SessionState:
|
||||||
|
# 简化:只做单向发送链(对称 ratchet 计数)
|
||||||
|
root_key: bytes
|
||||||
|
chain_key: bytes
|
||||||
|
counter: int
|
||||||
|
peer_identity_pub_b64: str
|
||||||
|
opk_id: Optional[str] # 用了哪个一次性预键(初始化时)
|
||||||
|
established_at: float
|
||||||
|
|
||||||
|
# ------------------------- Server ------------------------
|
||||||
|
|
||||||
|
class RelayServer:
|
||||||
|
"""
|
||||||
|
极简“服务器”:存放用户公开密钥束,提供查询;转发密文帧。
|
||||||
|
不保存/解密消息正文。
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
self.directory: Dict[str, PublicBundle] = {}
|
||||||
|
self.mailbox: Dict[str, List[dict]] = {}
|
||||||
|
|
||||||
|
def register_bundle(self, user: str, bundle: PublicBundle):
|
||||||
|
self.directory[user] = bundle
|
||||||
|
|
||||||
|
def fetch_bundle(self, user: str) -> Optional[PublicBundle]:
|
||||||
|
return self.directory.get(user)
|
||||||
|
|
||||||
|
def mark_onetime_used(self, user: str, opk_id: str):
|
||||||
|
b = self.directory.get(user)
|
||||||
|
if not b: return
|
||||||
|
if opk_id in b.onetime_prekeys:
|
||||||
|
del b.onetime_prekeys[opk_id]
|
||||||
|
|
||||||
|
def send_frame(self, to_user: str, frame: dict):
|
||||||
|
self.mailbox.setdefault(to_user, []).append(frame)
|
||||||
|
|
||||||
|
def pull_frames(self, user: str) -> List[dict]:
|
||||||
|
return self.mailbox.pop(user, [])
|
||||||
|
|
||||||
|
# ------------------------- Client ------------------------
|
||||||
|
|
||||||
|
class Client:
|
||||||
|
def __init__(self, name: str, server: RelayServer):
|
||||||
|
self.name = name
|
||||||
|
self.server = server
|
||||||
|
|
||||||
|
# 长期身份密钥
|
||||||
|
self.ik_priv = X25519PrivateKey.generate()
|
||||||
|
self.ik_pub = self.ik_priv.public_key()
|
||||||
|
|
||||||
|
# 已签名预共享密钥(演示:不做真实签名)
|
||||||
|
self.spk_priv = X25519PrivateKey.generate()
|
||||||
|
self.spk_pub = self.spk_priv.public_key()
|
||||||
|
self.spk_sig = os.urandom(64) # 占位
|
||||||
|
|
||||||
|
# 一次性预共享密钥池
|
||||||
|
self.opk_priv_map: Dict[str, X25519PrivateKey] = {}
|
||||||
|
self.opk_pub_map: Dict[str, X25519PublicKey] = {}
|
||||||
|
for i in range(5):
|
||||||
|
sk = X25519PrivateKey.generate()
|
||||||
|
pk = sk.public_key()
|
||||||
|
opk_id = f"opk-{i}"
|
||||||
|
self.opk_priv_map[opk_id] = sk
|
||||||
|
self.opk_pub_map[opk_id] = pk
|
||||||
|
|
||||||
|
# 会话(按对端用户)
|
||||||
|
self.sessions: Dict[str, SessionState] = {}
|
||||||
|
|
||||||
|
def publish(self):
|
||||||
|
bundle = PublicBundle(
|
||||||
|
identity_pub=b64e(pub_bytes(self.ik_pub)),
|
||||||
|
signed_prekey_pub=b64e(pub_bytes(self.spk_pub)),
|
||||||
|
signed_prekey_sig=b64e(self.spk_sig),
|
||||||
|
onetime_prekeys={oid: b64e(pub_bytes(pk)) for oid, pk in self.opk_pub_map.items()}
|
||||||
|
)
|
||||||
|
self.server.register_bundle(self.name, bundle)
|
||||||
|
|
||||||
|
# ---------- 建会话(发起方)简化 X3DH ----------
|
||||||
|
def establish_session_as_initiator(self, peer: str) -> SessionState:
|
||||||
|
pb = self.server.fetch_bundle(peer)
|
||||||
|
assert pb, f"{peer} has no bundle"
|
||||||
|
|
||||||
|
IKb = pub_from_bytes(b64d(pb.identity_pub))
|
||||||
|
SPKb = pub_from_bytes(b64d(pb.signed_prekey_pub))
|
||||||
|
|
||||||
|
# 选一个对方的一次性预键
|
||||||
|
opk_items = list(pb.onetime_prekeys.items())
|
||||||
|
opk_id, OPKb_b64 = opk_items[0] if opk_items else (None, None)
|
||||||
|
OPKb = pub_from_bytes(b64d(OPKb_b64)) if OPKb_b64 else None
|
||||||
|
|
||||||
|
# 发起者生成临时密钥对
|
||||||
|
EKa_priv = X25519PrivateKey.generate()
|
||||||
|
EKa_pub = EKa_priv.public_key()
|
||||||
|
|
||||||
|
# X3DH 4次 ECDH(无OPK则忽略第4项)
|
||||||
|
s1 = x25519_shared(self.ik_priv, SPKb)
|
||||||
|
s2 = x25519_shared(EKa_priv, IKb)
|
||||||
|
s3 = x25519_shared(EKa_priv, SPKb)
|
||||||
|
parts = [s1, s2, s3]
|
||||||
|
if OPKb:
|
||||||
|
s4 = x25519_shared(EKa_priv, OPKb)
|
||||||
|
parts.append(s4)
|
||||||
|
|
||||||
|
master_secret = b"".join(parts)
|
||||||
|
root_key = hkdf(master_secret, info=b"ROOT", length=32)
|
||||||
|
chain_key = hkdf(root_key, info=b"CHAIN", length=32)
|
||||||
|
|
||||||
|
st = SessionState(
|
||||||
|
root_key=root_key,
|
||||||
|
chain_key=chain_key,
|
||||||
|
counter=0,
|
||||||
|
peer_identity_pub_b64=pb.identity_pub,
|
||||||
|
opk_id=opk_id,
|
||||||
|
established_at=time.time()
|
||||||
|
)
|
||||||
|
self.sessions[peer] = st
|
||||||
|
|
||||||
|
# 发送“建会话 + 第一条消息”的头信息(包含 EKa_pub、opk_id)
|
||||||
|
self._pending_ephemeral_pub = b64e(pub_bytes(EKa_pub))
|
||||||
|
self._pending_opk_id = opk_id
|
||||||
|
return st
|
||||||
|
|
||||||
|
# ---------- 建会话(接收方) ----------
|
||||||
|
def _establish_session_as_responder(self, peer: str, ek_pub_b64: str, opk_id: Optional[str]) -> SessionState:
|
||||||
|
EKa = pub_from_bytes(b64d(ek_pub_b64)) # 对方临时公钥
|
||||||
|
# 自己的密钥
|
||||||
|
IKb_priv = self.ik_priv
|
||||||
|
SPKb_priv = self.spk_priv
|
||||||
|
OPKb_priv = self.opk_priv_map.get(opk_id) if opk_id else None
|
||||||
|
|
||||||
|
# 4次 ECDH(无OPK则忽略第4项)
|
||||||
|
s1 = x25519_shared(SPKb_priv, pub_from_bytes(b64d(self.server.directory[peer].identity_pub))) # = ECDH(Ia, SPKb)
|
||||||
|
# 注意:发起方 s1 是 ECDH(Ia, SPKb),接收方等价项应是 ECDH(SPKb, Ia)
|
||||||
|
# 但我们没有 Ia 私钥,这里换一种对称表达:按消息头与本地密钥构造相同串联
|
||||||
|
# 为确保与发起方一致,我们直接重算:
|
||||||
|
# 对于接收方:ECDH(Iinitiator, Srecipient) == ECDH(Srecipient, Iinitiator)
|
||||||
|
# 需要 Iinitiator 公钥:来自会话第一帧中?为简化,我们用目录中对方 identity_pub。
|
||||||
|
Ia_pub = pub_from_bytes(b64d(self.server.directory[peer].identity_pub))
|
||||||
|
|
||||||
|
s1 = x25519_shared(self.spk_priv, Ia_pub)
|
||||||
|
s2 = x25519_shared(self.ik_priv, EKa)
|
||||||
|
s3 = x25519_shared(self.spk_priv, EKa)
|
||||||
|
parts = [s1, s2, s3]
|
||||||
|
if OPKb_priv:
|
||||||
|
s4 = x25519_shared(OPKb_priv, EKa)
|
||||||
|
parts.append(s4)
|
||||||
|
|
||||||
|
master_secret = b"".join(parts)
|
||||||
|
root_key = hkdf(master_secret, info=b"ROOT", length=32)
|
||||||
|
chain_key = hkdf(root_key, info=b"CHAIN", length=32)
|
||||||
|
|
||||||
|
st = SessionState(
|
||||||
|
root_key=root_key,
|
||||||
|
chain_key=chain_key,
|
||||||
|
counter=0,
|
||||||
|
peer_identity_pub_b64=self.server.directory[peer].identity_pub,
|
||||||
|
opk_id=opk_id,
|
||||||
|
established_at=time.time()
|
||||||
|
)
|
||||||
|
self.sessions[peer] = st
|
||||||
|
# 一次性预键被使用后,服务端目录也标记删除
|
||||||
|
if opk_id:
|
||||||
|
self.server.mark_onetime_used(self.name, opk_id)
|
||||||
|
return st
|
||||||
|
|
||||||
|
# ---------- 每条消息的派生与加密 ----------
|
||||||
|
def _derive_message_key(self, st: SessionState) -> Tuple[bytes, bytes, bytes]:
|
||||||
|
"""
|
||||||
|
从 chain_key 派生本条消息的 (aes_key, hmac_key, iv),然后更新 chain_key,counter+1
|
||||||
|
"""
|
||||||
|
info = b"MSG|" + st.counter.to_bytes(8, "big")
|
||||||
|
msg_key = hkdf(st.chain_key, info=info, length=80) # 32 AES + 32 HMAC + 16 IV
|
||||||
|
aes_key = msg_key[:32]
|
||||||
|
mac_key = msg_key[32:64]
|
||||||
|
iv = msg_key[64:80]
|
||||||
|
# 下一条链
|
||||||
|
st.chain_key = hkdf(st.chain_key, info=b"STEP", length=32)
|
||||||
|
st.counter += 1
|
||||||
|
return aes_key, mac_key, iv
|
||||||
|
|
||||||
|
# ---------- 发送 ----------
|
||||||
|
def send(self, to_user: str, plaintext: bytes):
|
||||||
|
# 若没有会话,先建立
|
||||||
|
if to_user not in self.sessions:
|
||||||
|
st = self.establish_session_as_initiator(to_user)
|
||||||
|
ek_pub_b64 = self._pending_ephemeral_pub
|
||||||
|
opk_id = self._pending_opk_id
|
||||||
|
else:
|
||||||
|
st = self.sessions[to_user]
|
||||||
|
ek_pub_b64 = None
|
||||||
|
opk_id = None
|
||||||
|
|
||||||
|
aes_key, mac_key, iv = self._derive_message_key(st)
|
||||||
|
ct = aes_cbc_enc(aes_key, iv, plaintext)
|
||||||
|
mac = hmac_sha256(mac_key, iv + ct)[:8]
|
||||||
|
|
||||||
|
frame = {
|
||||||
|
"from": self.name,
|
||||||
|
"to": to_user,
|
||||||
|
"hdr": {
|
||||||
|
"init": ek_pub_b64 is not None,
|
||||||
|
"ek_pub_b64": ek_pub_b64, # 仅首次带上
|
||||||
|
"opk_id": opk_id,
|
||||||
|
"counter": st.counter - 1, # 本条的计数
|
||||||
|
},
|
||||||
|
"body": {
|
||||||
|
"iv": b64e(iv),
|
||||||
|
"ct": b64e(ct),
|
||||||
|
"mac": b64e(mac),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.server.send_frame(to_user, frame)
|
||||||
|
|
||||||
|
# ---------- 接收 ----------
|
||||||
|
def receive_all(self) -> List[Tuple[str, bytes]]:
|
||||||
|
frames = self.server.pull_frames(self.name)
|
||||||
|
outputs = []
|
||||||
|
for f in frames:
|
||||||
|
sender = f["from"]
|
||||||
|
hdr = f["hdr"]
|
||||||
|
body = f["body"]
|
||||||
|
|
||||||
|
if sender not in self.sessions:
|
||||||
|
# 首帧:建立被动会话
|
||||||
|
assert hdr["init"], "Missing init header"
|
||||||
|
st = self._establish_session_as_responder(
|
||||||
|
peer=sender,
|
||||||
|
ek_pub_b64=hdr["ek_pub_b64"],
|
||||||
|
opk_id=hdr["opk_id"]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
st = self.sessions[sender]
|
||||||
|
|
||||||
|
# 按对方 counter 对齐(演示:假设顺序到达)
|
||||||
|
aes_key, mac_key, iv = self._derive_message_key(st)
|
||||||
|
if st.counter - 1 != hdr["counter"]:
|
||||||
|
# 简化:严格顺序,真实实现需支持跳号、乱序恢复
|
||||||
|
raise ValueError("Out-of-order message (demo limitation)")
|
||||||
|
|
||||||
|
if b64e(iv) != body["iv"]:
|
||||||
|
# 教学演示:我们强制使用派生的 iv
|
||||||
|
iv = b64d(body["iv"]) # 宽松一点也可以直接信任对端 iv
|
||||||
|
ct = b64d(body["ct"])
|
||||||
|
mac = b64d(body["mac"])
|
||||||
|
calc = hmac_sha256(mac_key, iv + ct)[:8]
|
||||||
|
if not hmac.compare_digest(mac, calc):
|
||||||
|
raise ValueError("MAC verification failed")
|
||||||
|
|
||||||
|
pt = aes_cbc_dec(aes_key, iv, ct)
|
||||||
|
outputs.append((sender, pt))
|
||||||
|
return outputs
|
||||||
|
|
||||||
|
# ------------------------- Demo --------------------------
|
||||||
|
|
||||||
|
def main():
|
||||||
|
server = RelayServer()
|
||||||
|
|
||||||
|
alice = Client("alice", server)
|
||||||
|
bob = Client("bob", server)
|
||||||
|
|
||||||
|
# 发布各自的公钥包
|
||||||
|
alice.publish()
|
||||||
|
bob.publish()
|
||||||
|
|
||||||
|
# Alice 先发一条(会自动建会话)
|
||||||
|
alice.send("bob", b"Hello Bob, this is Alice.")
|
||||||
|
# Bob 拉取并解密
|
||||||
|
for frm in bob.receive_all():
|
||||||
|
print("[bob] got:", frm)
|
||||||
|
|
||||||
|
# Bob 回复
|
||||||
|
bob.send("alice", b"Hi Alice, Bob here. Message received.")
|
||||||
|
for frm in alice.receive_all():
|
||||||
|
print("[alice] got:", frm)
|
||||||
|
|
||||||
|
# 连发多条,观察每条都换密钥(server 无法解密)
|
||||||
|
for i in range(1, 4):
|
||||||
|
alice.send("bob", f"Msg#{i} from Alice".encode())
|
||||||
|
for frm in bob.receive_all():
|
||||||
|
print("[bob] got:", frm)
|
||||||
|
|
||||||
|
# 验证“服务器看不到明文”
|
||||||
|
print("\n[server] directory keys (truncated):")
|
||||||
|
print(json.dumps({
|
||||||
|
u: {
|
||||||
|
"identity_pub": v.identity_pub[:24] + "...",
|
||||||
|
"spk_pub": v.signed_prekey_pub[:24] + "...",
|
||||||
|
"opk_count": len(v.onetime_prekeys)
|
||||||
|
} for u, v in server.directory.items()
|
||||||
|
}, indent=2))
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
49
testkkfileview.py
Normal file
49
testkkfileview.py
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
import base64
|
||||||
|
import urllib.parse
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
def generate_preview_url():
|
||||||
|
# 原始 URL
|
||||||
|
origin_url = 'http://192.168.1.28:8080/prod-api/downhole-tool-system/common/files/v1/download/60cbe676ef181249af3fe2cbeea99746'
|
||||||
|
|
||||||
|
# 添加 fullfilename 参数
|
||||||
|
preview_url = origin_url
|
||||||
|
|
||||||
|
# Base64 编码
|
||||||
|
encoded_preview_url = base64.b64encode(preview_url.encode('utf-8')).decode('utf-8')
|
||||||
|
|
||||||
|
# URL 编码 Base64 编码的结果
|
||||||
|
final_url = f"http://127.0.0.1:8012/picturesPreview?urls={urllib.parse.quote(encoded_preview_url)}"
|
||||||
|
|
||||||
|
return final_url
|
||||||
|
|
||||||
|
|
||||||
|
def request_with_token():
|
||||||
|
# 生成完整的 URL
|
||||||
|
url = generate_preview_url()
|
||||||
|
|
||||||
|
print(f"完整的 URL: {url}")
|
||||||
|
|
||||||
|
# 设置请求头
|
||||||
|
headers = {
|
||||||
|
'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzQ2MDA4MDgsImp0aSI6IjJOT1hqbnRJdEh4YUJhQzRqcThJdlMtMVVSayIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.ytexuU6OPBycL6Zoh74YTYCJ_9f-u6emsFDy4-OsDvE', # 替换为实际的 token
|
||||||
|
'Content-Type': 'application/json'
|
||||||
|
}
|
||||||
|
|
||||||
|
# 发起 GET 请求
|
||||||
|
response = requests.get(url, headers=headers)
|
||||||
|
|
||||||
|
# 处理响应
|
||||||
|
if response.status_code == 200:
|
||||||
|
print("请求成功!")
|
||||||
|
print("响应内容:")
|
||||||
|
print(response.content) # 如果是图片/文件数据,可以保存到本地
|
||||||
|
else:
|
||||||
|
print(f"请求失败,状态码: {response.status_code}")
|
||||||
|
print("响应内容:")
|
||||||
|
print(response.text)
|
||||||
|
|
||||||
|
|
||||||
|
# 执行请求
|
||||||
|
request_with_token()
|
||||||
101
trian_system_login_and_get_token.py
Normal file
101
trian_system_login_and_get_token.py
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
import requests
|
||||||
|
import json
|
||||||
|
from Crypto.Cipher import DES
|
||||||
|
from Crypto.Util.Padding import pad
|
||||||
|
import base64
|
||||||
|
|
||||||
|
# 全局变量,用于存储从登录响应中获取的 key 和 iv
|
||||||
|
DYNAMIC_KEY = None
|
||||||
|
DYNAMIC_IV = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_des_encrypt(data: str, _key: str, _iv: str) -> str:
|
||||||
|
"""
|
||||||
|
使用 DES/CBC/PKCS7 加密字符串。
|
||||||
|
"""
|
||||||
|
key = _key.encode('utf-8')
|
||||||
|
iv = _iv.encode('utf-8')
|
||||||
|
|
||||||
|
if len(key) != 8 or len(iv) != 8:
|
||||||
|
raise ValueError("DES key and IV must be exactly 8 bytes")
|
||||||
|
|
||||||
|
plaintext = data.encode('utf-8')
|
||||||
|
padded_data = pad(plaintext, DES.block_size)
|
||||||
|
cipher = DES.new(key, DES.MODE_CBC, iv)
|
||||||
|
encrypted_bytes = cipher.encrypt(padded_data)
|
||||||
|
return base64.b64encode(encrypted_bytes).decode('utf-8')
|
||||||
|
|
||||||
|
|
||||||
|
def login_and_get_dynamic_key_iv(url, login_data):
|
||||||
|
"""
|
||||||
|
发送未加密登录请求,成功后提取 key 和 iv。
|
||||||
|
返回 (token, key, iv) 或 (None, None, None)
|
||||||
|
"""
|
||||||
|
payload = {
|
||||||
|
"encType": 0,
|
||||||
|
"x_flag": "",
|
||||||
|
"data": login_data
|
||||||
|
}
|
||||||
|
headers = {"Content-Type": "application/json"}
|
||||||
|
try:
|
||||||
|
response = requests.post(url, headers=headers, json=payload)
|
||||||
|
print("🔓 [登录] 状态码:", response.status_code)
|
||||||
|
result = response.json()
|
||||||
|
print("🔓 [登录] 响应:", json.dumps(result, indent=2, ensure_ascii=False))
|
||||||
|
|
||||||
|
data = result.get("data", {})
|
||||||
|
token = data.get("token")
|
||||||
|
key = data.get("key")
|
||||||
|
iv = data.get("iv")
|
||||||
|
|
||||||
|
if token and key and iv:
|
||||||
|
print(f"✅ 登录成功!Token: {token}")
|
||||||
|
print(f"🔑 动态 Key: {key}, IV: {iv}")
|
||||||
|
return token, key, iv
|
||||||
|
else:
|
||||||
|
print("❌ 登录成功但缺少 key 或 iv")
|
||||||
|
return None, None, None
|
||||||
|
except Exception as e:
|
||||||
|
print("❌ 登录异常:", e)
|
||||||
|
return None, None, None
|
||||||
|
|
||||||
|
|
||||||
|
def send_encrypted_request_with_dynamic_key(url, data, key, iv):
|
||||||
|
"""
|
||||||
|
使用动态 key/iv 加密并发送请求。
|
||||||
|
"""
|
||||||
|
data_str = json.dumps(data, separators=(',', ':'))
|
||||||
|
encrypted_data = get_des_encrypt(data_str, key, iv)
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"encType": 0,
|
||||||
|
"x_flag": "",
|
||||||
|
"data": encrypted_data
|
||||||
|
}
|
||||||
|
headers = {"Content-Type": "application/json"}
|
||||||
|
try:
|
||||||
|
response = requests.post(url, headers=headers, json=payload)
|
||||||
|
print("🔒 [加密请求] 状态码:", response.status_code)
|
||||||
|
print("🔒 [加密请求] 响应:", response.text)
|
||||||
|
|
||||||
|
result = response.json()
|
||||||
|
token = result.get("data", {}).get("token")
|
||||||
|
if token:
|
||||||
|
print("✅ 加密请求成功,Token:", token)
|
||||||
|
else:
|
||||||
|
print("⚠️ 加密请求完成,但无新 token")
|
||||||
|
except Exception as e:
|
||||||
|
print("❌ 加密请求异常:", e)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
url = "http://127.0.0.1:8084/c/v1/wellMudLogAndWireLine/system/privateManager/userLoginAggregate/other/login"
|
||||||
|
login_data = {
|
||||||
|
"userId": "test002",
|
||||||
|
"password": "123456"
|
||||||
|
}
|
||||||
|
token, key, iv = login_and_get_dynamic_key_iv(url, login_data)
|
||||||
|
|
||||||
|
if not (key and iv):
|
||||||
|
print("🛑 无法获取动态 key/iv,退出。")
|
||||||
|
exit(1)
|
||||||
Reference in New Issue
Block a user