diff --git a/.gitignore b/.gitignore index 9610f72..afb2cb7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ ./venv/ -.idea/ \ No newline at end of file +.idea/ +.*png \ No newline at end of file diff --git a/base64EncodeUrl.py b/base64EncodeUrl.py new file mode 100644 index 0000000..3ef68e1 --- /dev/null +++ b/base64EncodeUrl.py @@ -0,0 +1,40 @@ +import base64 + +def generate_download_url(base_url: str, file_id: str, fullfilename: str, token: str) -> str: + """ + 生成下载 URL 并对其进行 Base64 编码 + :param base_url: 文件下载的基础 URL + :param file_id: 文件的 ID + :param fullfilename: 文件的完整名称 + :param token: 授权的 Token + :return: Base64 编码后的完整下载 URL + """ + # 拼接完整的下载 URL + download_url = f"{base_url}/{file_id}?fullfilename={fullfilename}&token={token}" + print(f"Original URL: {download_url}") + + # 对 URL 进行 Base64 编码 + base64_encoded_url = base64.b64encode(download_url.encode("utf-8")).decode("utf-8") + print(f"Base64 Encoded URL: {base64_encoded_url}") + + return base64_encoded_url + + +# 示例数据 +pre_view_url = "http://192.168.1.12:8012/onlinePreview" +download_url = "http://192.168.1.28:8080/prod-api/file/download" +base_url = "http://192.168.1.28:8080/prod-api/file/download" +file_id = "a69ea8e0f8e28f92202975a3a4996e7b" +fullfilename = "文件预览.pptx" +token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzUwMzIxMDMsImp0aSI6Ijl4Q3JrVDltXy1iZGh6czhKSWlrdkcySk5XcyIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.T7jZ4bSmPqlwH2LjZeKLRXg_5Q4t7ihRUhd-zvdehak" + +# 生成 Base64 编码的 URL +encoded_url = generate_download_url(base_url, file_id, fullfilename, token) + +# 拼接下载和预览 URL +down_url = f"{download_url}/{file_id}" +final_url = f"{pre_view_url}?url={encoded_url}" + +# 打印结果 +print(f"Download URL: {down_url}") +print(f"Final Preview URL: {final_url}") diff --git a/fastapiTest.py b/fastapiTest.py new file mode 100644 index 0000000..b2b9d1d --- /dev/null +++ b/fastapiTest.py @@ -0,0 +1,91 @@ +from fastapi import FastAPI, HTTPException, Response +from minio import Minio +from minio.error import S3Error +from pydantic import BaseModel +from datetime import timedelta + +app = FastAPI() + +# 初始化 MinIO 客户端 +minio_client = Minio( + endpoint="192.168.1.28:9500", # MinIO 服务器地址 + access_key="UIiBRvUixj3GkJwvItmy", # Access Key + secret_key="wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm", # Secret Key + secure=False # 如果没有使用 HTTPS,则设置为 False +) + +# MinIO 配置 +BUCKET_NAME = "test-bucket" + + +class FileRequest(BaseModel): + object_name: str # 文件对象名 + expiry_in_seconds: int = 3600 # URL 有效期(默认 1 小时) + + +@app.post("/download-presigned-url") +def generate_presigned_url(file_request: FileRequest): + """ + 生成用于下载文件的预签名 URL + """ + try: + # 检查存储桶是否存在 + if not minio_client.bucket_exists(BUCKET_NAME): + raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.") + + # 检查对象是否存在 + try: + minio_client.stat_object(BUCKET_NAME, file_request.object_name) + except S3Error: + raise HTTPException(status_code=404, detail=f"Object '{file_request.object_name}' does not exist.") + + # 将秒数转换为 timedelta + expiry_timedelta = timedelta(seconds=file_request.expiry_in_seconds) + + # 生成预签名 URL + presigned_url = minio_client.presigned_get_object( + BUCKET_NAME, + file_request.object_name, + expires=expiry_timedelta # 传递 timedelta + ) + return {"presigned_url": presigned_url} + + except S3Error as e: + raise HTTPException(status_code=500, detail=f"Error generating presigned URL: {e}") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Unexpected error: {e}") + + +@app.get("/download-file/{object_name:path}") +def download_file(object_name: str): + """ + 提供文件对象并直接下载 + """ + try: + print(f"Requested object: {object_name}") + + # 检查存储桶是否存在 + if not minio_client.bucket_exists(BUCKET_NAME): + raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.") + + # 检查对象是否存在 + try: + stat = minio_client.stat_object(BUCKET_NAME, object_name) + except S3Error: + raise HTTPException(status_code=404, detail=f"Object '{object_name}' does not exist.") + + # 获取文件对象 + response = minio_client.get_object(BUCKET_NAME, object_name) + + # 设置响应头 + headers = { + "Content-Disposition": f"attachment; filename*=UTF-8''{object_name.split('/')[-1]}", + "Content-Type": stat.content_type + } + return Response(content=response.read(), headers=headers, media_type=stat.content_type) + + except S3Error as e: + raise HTTPException(status_code=500, detail=f"Error downloading file: {e}") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Unexpected error: {e}") + diff --git a/generaterCardId.py b/generaterCardId.py new file mode 100644 index 0000000..562ea46 --- /dev/null +++ b/generaterCardId.py @@ -0,0 +1,28 @@ +import random + +def generateCardId(length): + """ + 生成指定长度的随机卡号 + :param length: 卡号的长度 + :return: 生成的随机卡号 + """ + if length <= 0: + raise ValueError("Length must be a positive integer.") + + # 初始化卡号 + card_id = "" + # 数字范围 + numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + # 生成随机卡号 + for _ in range(length): + random_number = random.choice(numbers) + card_id += str(random_number) + + return card_id + +# 测试函数 +if __name__ == "__main__": + length = 18 # 指定卡号长度 + card_id = generateCardId(length) + print(f"Generated Card ID: {card_id}") diff --git a/testApi.py b/testApi.py new file mode 100644 index 0000000..ea40ee5 --- /dev/null +++ b/testApi.py @@ -0,0 +1,56 @@ +import requests + + +class NotificationAPIClient: + def __init__(self, environment="dev"): + # 定义不同环境的域名 + self.environment_urls = { + "dev": "http://dev.example.com", + "test": "http://192.168.1.202:9100/downhole-tool-system", + "prod": "http://prod.example.com" + } + + # 设置当前环境的域名 + if environment not in self.environment_urls: + raise ValueError(f"Invalid environment '{environment}'. Choose from: {list(self.environment_urls.keys())}") + + self.base_url = self.environment_urls[environment] + + def get_my_task(self, token): + # 拼接请求的完整 URL + url = f"{self.base_url}/notification/v1/my-task" + + # 设置请求头,包含 Authorization Token + headers = { + "Authorization": f"Bearer {token}" + } + + try: + # 发起 GET 请求 + response = requests.get(url, headers=headers) + + # 检查 HTTP 状态码并返回结果 + if response.status_code == 200: + return response.json() # 返回解析后的 JSON 数据 + else: + return {"error": f"HTTP {response.status_code}", "details": response.text} + except requests.RequestException as e: + return {"error": "Request failed", "details": str(e)} + + +# 使用示例 +if __name__ == "__main__": + # 切换环境:可选 "dev", "test", "prod" + environment = "test" + + # 创建客户端 + client = NotificationAPIClient(environment=environment) + + # 设置 Token + token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzY5Mjc5MTQsImp0aSI6InZQTm5iTUdiczJ6OHdPYi1VVTljcHVxd0oxVSIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.5Z0yXfoLvJc4qEPf6RMS8Xw6VtjaNMPAyfQxEp0DIkM" + + # 发起请求 + result = client.get_my_task(token) + + # 输出结果 + print(result) diff --git a/testConnectKingBase.py b/testConnectKingBase.py new file mode 100644 index 0000000..f8d9ee8 --- /dev/null +++ b/testConnectKingBase.py @@ -0,0 +1,34 @@ +import psycopg2 + +# 请根据实际情况修改以下参数 +HOST = "192.168.1.28" +PORT = "54321" +DATABASE = "upp_demo" +USER = "system" +PASSWORD = "kingbase_ok" + +try: + # 建立连接 + conn = psycopg2.connect( + host=HOST, + port=PORT, + database=DATABASE, + user=USER, + password=PASSWORD + ) + print("成功连接到人大金仓数据库!") + + # 创建一个游标对象 + cur = conn.cursor() + + # 测试执行简单SQL语句,如查询版本信息 + cur.execute("SELECT version();") + version_info = cur.fetchone() + print("数据库版本信息:", version_info) + + # 关闭游标和连接 + cur.close() + conn.close() + +except psycopg2.Error as e: + print("连接失败:", e) diff --git a/testConnectMySQL.py b/testConnectMySQL.py index 6e67223..8f110b7 100644 --- a/testConnectMySQL.py +++ b/testConnectMySQL.py @@ -4,11 +4,11 @@ from mysql.connector import Error def test_mysql_connection(): # MySQL 配置 config = { - 'host': '192.168.2.20', - 'port': 8006, - 'user': 'mticloud', - 'password': 'fT3KsNDahADGcWCZ', - 'database': 'mti-cloud', + 'host': '192.168.1.202', + 'port': 3306, + 'user': 'gitea', + 'password': 'CYVVMMxrox3ThsGy', + 'database': 'gitea', 'charset': 'utf8mb4' } diff --git a/testUserMinioDownloadFile.py b/testUserMinioDownloadFile.py new file mode 100644 index 0000000..1157a09 --- /dev/null +++ b/testUserMinioDownloadFile.py @@ -0,0 +1,58 @@ +from minio import Minio +from minio.error import S3Error +import mimetypes +import os + +def download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name): + try: + # 初始化 MinIO 客户端 + minio_client = Minio( + endpoint=endpoint, + access_key=access_key, + secret_key=secret_key, + secure=False # 如果 MinIO 没有使用 HTTPS,设置为 False + ) + + # 检查存储桶是否存在 + if not minio_client.bucket_exists(bucket_name): + print(f"Bucket '{bucket_name}' does not exist.") + return + + # 获取对象元数据 + stat = minio_client.stat_object(bucket_name, object_name) + content_type = stat.content_type # 获取 Content-Type + print(f"Object '{object_name}' Content-Type: {content_type}") + + # 根据 Content-Type 确定文件后缀 + guessed_extension = mimetypes.guess_extension(content_type) + if guessed_extension is None: + guessed_extension = "" # 无法推断时默认不加后缀 + print(f"Guessed file extension: {guessed_extension}") + + # 提取文件名并设置保存路径 + file_name = os.path.basename(object_name) # 提取路径中的文件名 + if "." not in file_name: # 如果文件名中没有后缀,添加推断的后缀 + file_name += guessed_extension + local_file_path = f"./{file_name}" # 设置保存路径 + print(f"File will be saved as: {local_file_path}") + + # 下载文件 + print(f"Downloading '{object_name}' from bucket '{bucket_name}' to '{local_file_path}'...") + minio_client.fget_object(bucket_name, object_name, local_file_path) + + print(f"File '{object_name}' downloaded successfully to '{local_file_path}'.") + + except S3Error as e: + print(f"Error occurred: {e}") + except Exception as e: + print(f"Unexpected error: {e}") + +# 示例配置 +endpoint = "192.168.1.28:9500" # MinIO 服务器地址 +access_key = "UIiBRvUixj3GkJwvItmy" # MinIO Access Key +secret_key = "wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm" # MinIO Secret Key +bucket_name = "test-bucket" # 存储桶名称 +object_name = "test/uploadedFiles/20241223/175_big.png" # 不带后缀的文件名 + +# 测试下载文件 +download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name) diff --git a/testkkfileview.py b/testkkfileview.py new file mode 100644 index 0000000..3f837a0 --- /dev/null +++ b/testkkfileview.py @@ -0,0 +1,49 @@ +import base64 +import urllib.parse +import requests + + +def generate_preview_url(): + # 原始 URL + origin_url = 'http://192.168.1.28:8080/prod-api/downhole-tool-system/common/files/v1/download/60cbe676ef181249af3fe2cbeea99746' + + # 添加 fullfilename 参数 + preview_url = origin_url + + # Base64 编码 + encoded_preview_url = base64.b64encode(preview_url.encode('utf-8')).decode('utf-8') + + # URL 编码 Base64 编码的结果 + final_url = f"http://127.0.0.1:8012/picturesPreview?urls={urllib.parse.quote(encoded_preview_url)}" + + return final_url + + +def request_with_token(): + # 生成完整的 URL + url = generate_preview_url() + + print(f"完整的 URL: {url}") + + # 设置请求头 + headers = { + 'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzQ2MDA4MDgsImp0aSI6IjJOT1hqbnRJdEh4YUJhQzRqcThJdlMtMVVSayIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.ytexuU6OPBycL6Zoh74YTYCJ_9f-u6emsFDy4-OsDvE', # 替换为实际的 token + 'Content-Type': 'application/json' + } + + # 发起 GET 请求 + response = requests.get(url, headers=headers) + + # 处理响应 + if response.status_code == 200: + print("请求成功!") + print("响应内容:") + print(response.content) # 如果是图片/文件数据,可以保存到本地 + else: + print(f"请求失败,状态码: {response.status_code}") + print("响应内容:") + print(response.text) + + +# 执行请求 +request_with_token()