Compare commits
6 Commits
046bbafa4e
...
91bb40770a
| Author | SHA1 | Date |
|---|---|---|
|
|
91bb40770a | |
|
|
80617d3c28 | |
|
|
f77dc396db | |
|
|
d64e709d3f | |
|
|
eb7316b837 | |
|
|
6140aae7f7 |
|
|
@ -1,2 +1,3 @@
|
|||
./venv/
|
||||
.idea/
|
||||
.idea/
|
||||
.*png
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
import base64
|
||||
|
||||
def generate_download_url(base_url: str, file_id: str, fullfilename: str, token: str) -> str:
|
||||
"""
|
||||
生成下载 URL 并对其进行 Base64 编码
|
||||
:param base_url: 文件下载的基础 URL
|
||||
:param file_id: 文件的 ID
|
||||
:param fullfilename: 文件的完整名称
|
||||
:param token: 授权的 Token
|
||||
:return: Base64 编码后的完整下载 URL
|
||||
"""
|
||||
# 拼接完整的下载 URL
|
||||
download_url = f"{base_url}/{file_id}?fullfilename={fullfilename}&token={token}"
|
||||
print(f"Original URL: {download_url}")
|
||||
|
||||
# 对 URL 进行 Base64 编码
|
||||
base64_encoded_url = base64.b64encode(download_url.encode("utf-8")).decode("utf-8")
|
||||
print(f"Base64 Encoded URL: {base64_encoded_url}")
|
||||
|
||||
return base64_encoded_url
|
||||
|
||||
|
||||
# 示例数据
|
||||
pre_view_url = "http://192.168.1.12:8012/onlinePreview"
|
||||
download_url = "http://192.168.1.28:8080/prod-api/file/download"
|
||||
base_url = "http://192.168.1.28:8080/prod-api/file/download"
|
||||
file_id = "a69ea8e0f8e28f92202975a3a4996e7b"
|
||||
fullfilename = "文件预览.pptx"
|
||||
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzUwMzIxMDMsImp0aSI6Ijl4Q3JrVDltXy1iZGh6czhKSWlrdkcySk5XcyIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.T7jZ4bSmPqlwH2LjZeKLRXg_5Q4t7ihRUhd-zvdehak"
|
||||
|
||||
# 生成 Base64 编码的 URL
|
||||
encoded_url = generate_download_url(base_url, file_id, fullfilename, token)
|
||||
|
||||
# 拼接下载和预览 URL
|
||||
down_url = f"{download_url}/{file_id}"
|
||||
final_url = f"{pre_view_url}?url={encoded_url}"
|
||||
|
||||
# 打印结果
|
||||
print(f"Download URL: {down_url}")
|
||||
print(f"Final Preview URL: {final_url}")
|
||||
|
|
@ -0,0 +1,91 @@
|
|||
from fastapi import FastAPI, HTTPException, Response
|
||||
from minio import Minio
|
||||
from minio.error import S3Error
|
||||
from pydantic import BaseModel
|
||||
from datetime import timedelta
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
# 初始化 MinIO 客户端
|
||||
minio_client = Minio(
|
||||
endpoint="192.168.1.28:9500", # MinIO 服务器地址
|
||||
access_key="UIiBRvUixj3GkJwvItmy", # Access Key
|
||||
secret_key="wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm", # Secret Key
|
||||
secure=False # 如果没有使用 HTTPS,则设置为 False
|
||||
)
|
||||
|
||||
# MinIO 配置
|
||||
BUCKET_NAME = "test-bucket"
|
||||
|
||||
|
||||
class FileRequest(BaseModel):
|
||||
object_name: str # 文件对象名
|
||||
expiry_in_seconds: int = 3600 # URL 有效期(默认 1 小时)
|
||||
|
||||
|
||||
@app.post("/download-presigned-url")
|
||||
def generate_presigned_url(file_request: FileRequest):
|
||||
"""
|
||||
生成用于下载文件的预签名 URL
|
||||
"""
|
||||
try:
|
||||
# 检查存储桶是否存在
|
||||
if not minio_client.bucket_exists(BUCKET_NAME):
|
||||
raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.")
|
||||
|
||||
# 检查对象是否存在
|
||||
try:
|
||||
minio_client.stat_object(BUCKET_NAME, file_request.object_name)
|
||||
except S3Error:
|
||||
raise HTTPException(status_code=404, detail=f"Object '{file_request.object_name}' does not exist.")
|
||||
|
||||
# 将秒数转换为 timedelta
|
||||
expiry_timedelta = timedelta(seconds=file_request.expiry_in_seconds)
|
||||
|
||||
# 生成预签名 URL
|
||||
presigned_url = minio_client.presigned_get_object(
|
||||
BUCKET_NAME,
|
||||
file_request.object_name,
|
||||
expires=expiry_timedelta # 传递 timedelta
|
||||
)
|
||||
return {"presigned_url": presigned_url}
|
||||
|
||||
except S3Error as e:
|
||||
raise HTTPException(status_code=500, detail=f"Error generating presigned URL: {e}")
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
|
||||
|
||||
|
||||
@app.get("/download-file/{object_name:path}")
|
||||
def download_file(object_name: str):
|
||||
"""
|
||||
提供文件对象并直接下载
|
||||
"""
|
||||
try:
|
||||
print(f"Requested object: {object_name}")
|
||||
|
||||
# 检查存储桶是否存在
|
||||
if not minio_client.bucket_exists(BUCKET_NAME):
|
||||
raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.")
|
||||
|
||||
# 检查对象是否存在
|
||||
try:
|
||||
stat = minio_client.stat_object(BUCKET_NAME, object_name)
|
||||
except S3Error:
|
||||
raise HTTPException(status_code=404, detail=f"Object '{object_name}' does not exist.")
|
||||
|
||||
# 获取文件对象
|
||||
response = minio_client.get_object(BUCKET_NAME, object_name)
|
||||
|
||||
# 设置响应头
|
||||
headers = {
|
||||
"Content-Disposition": f"attachment; filename*=UTF-8''{object_name.split('/')[-1]}",
|
||||
"Content-Type": stat.content_type
|
||||
}
|
||||
return Response(content=response.read(), headers=headers, media_type=stat.content_type)
|
||||
|
||||
except S3Error as e:
|
||||
raise HTTPException(status_code=500, detail=f"Error downloading file: {e}")
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
|
||||
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
import random
|
||||
|
||||
def generateCardId(length):
|
||||
"""
|
||||
生成指定长度的随机卡号
|
||||
:param length: 卡号的长度
|
||||
:return: 生成的随机卡号
|
||||
"""
|
||||
if length <= 0:
|
||||
raise ValueError("Length must be a positive integer.")
|
||||
|
||||
# 初始化卡号
|
||||
card_id = ""
|
||||
# 数字范围
|
||||
numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
|
||||
|
||||
# 生成随机卡号
|
||||
for _ in range(length):
|
||||
random_number = random.choice(numbers)
|
||||
card_id += str(random_number)
|
||||
|
||||
return card_id
|
||||
|
||||
# 测试函数
|
||||
if __name__ == "__main__":
|
||||
length = 18 # 指定卡号长度
|
||||
card_id = generateCardId(length)
|
||||
print(f"Generated Card ID: {card_id}")
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
import requests
|
||||
|
||||
|
||||
class NotificationAPIClient:
|
||||
def __init__(self, environment="dev"):
|
||||
# 定义不同环境的域名
|
||||
self.environment_urls = {
|
||||
"dev": "http://dev.example.com",
|
||||
"test": "http://192.168.1.202:9100/downhole-tool-system",
|
||||
"prod": "http://prod.example.com"
|
||||
}
|
||||
|
||||
# 设置当前环境的域名
|
||||
if environment not in self.environment_urls:
|
||||
raise ValueError(f"Invalid environment '{environment}'. Choose from: {list(self.environment_urls.keys())}")
|
||||
|
||||
self.base_url = self.environment_urls[environment]
|
||||
|
||||
def get_my_task(self, token):
|
||||
# 拼接请求的完整 URL
|
||||
url = f"{self.base_url}/notification/v1/my-task"
|
||||
|
||||
# 设置请求头,包含 Authorization Token
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}"
|
||||
}
|
||||
|
||||
try:
|
||||
# 发起 GET 请求
|
||||
response = requests.get(url, headers=headers)
|
||||
|
||||
# 检查 HTTP 状态码并返回结果
|
||||
if response.status_code == 200:
|
||||
return response.json() # 返回解析后的 JSON 数据
|
||||
else:
|
||||
return {"error": f"HTTP {response.status_code}", "details": response.text}
|
||||
except requests.RequestException as e:
|
||||
return {"error": "Request failed", "details": str(e)}
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
|
||||
# 切换环境:可选 "dev", "test", "prod"
|
||||
environment = "test"
|
||||
|
||||
# 创建客户端
|
||||
client = NotificationAPIClient(environment=environment)
|
||||
|
||||
# 设置 Token
|
||||
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzY5Mjc5MTQsImp0aSI6InZQTm5iTUdiczJ6OHdPYi1VVTljcHVxd0oxVSIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.5Z0yXfoLvJc4qEPf6RMS8Xw6VtjaNMPAyfQxEp0DIkM"
|
||||
|
||||
# 发起请求
|
||||
result = client.get_my_task(token)
|
||||
|
||||
# 输出结果
|
||||
print(result)
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
import psycopg2
|
||||
|
||||
# 请根据实际情况修改以下参数
|
||||
HOST = "192.168.1.28"
|
||||
PORT = "54321"
|
||||
DATABASE = "upp_demo"
|
||||
USER = "system"
|
||||
PASSWORD = "kingbase_ok"
|
||||
|
||||
try:
|
||||
# 建立连接
|
||||
conn = psycopg2.connect(
|
||||
host=HOST,
|
||||
port=PORT,
|
||||
database=DATABASE,
|
||||
user=USER,
|
||||
password=PASSWORD
|
||||
)
|
||||
print("成功连接到人大金仓数据库!")
|
||||
|
||||
# 创建一个游标对象
|
||||
cur = conn.cursor()
|
||||
|
||||
# 测试执行简单SQL语句,如查询版本信息
|
||||
cur.execute("SELECT version();")
|
||||
version_info = cur.fetchone()
|
||||
print("数据库版本信息:", version_info)
|
||||
|
||||
# 关闭游标和连接
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
except psycopg2.Error as e:
|
||||
print("连接失败:", e)
|
||||
|
|
@ -4,11 +4,11 @@ from mysql.connector import Error
|
|||
def test_mysql_connection():
|
||||
# MySQL 配置
|
||||
config = {
|
||||
'host': '192.168.2.20',
|
||||
'port': 8006,
|
||||
'user': 'mticloud',
|
||||
'password': 'fT3KsNDahADGcWCZ',
|
||||
'database': 'mti-cloud',
|
||||
'host': '192.168.1.202',
|
||||
'port': 3306,
|
||||
'user': 'gitea',
|
||||
'password': 'CYVVMMxrox3ThsGy',
|
||||
'database': 'gitea',
|
||||
'charset': 'utf8mb4'
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,58 @@
|
|||
from minio import Minio
|
||||
from minio.error import S3Error
|
||||
import mimetypes
|
||||
import os
|
||||
|
||||
def download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name):
|
||||
try:
|
||||
# 初始化 MinIO 客户端
|
||||
minio_client = Minio(
|
||||
endpoint=endpoint,
|
||||
access_key=access_key,
|
||||
secret_key=secret_key,
|
||||
secure=False # 如果 MinIO 没有使用 HTTPS,设置为 False
|
||||
)
|
||||
|
||||
# 检查存储桶是否存在
|
||||
if not minio_client.bucket_exists(bucket_name):
|
||||
print(f"Bucket '{bucket_name}' does not exist.")
|
||||
return
|
||||
|
||||
# 获取对象元数据
|
||||
stat = minio_client.stat_object(bucket_name, object_name)
|
||||
content_type = stat.content_type # 获取 Content-Type
|
||||
print(f"Object '{object_name}' Content-Type: {content_type}")
|
||||
|
||||
# 根据 Content-Type 确定文件后缀
|
||||
guessed_extension = mimetypes.guess_extension(content_type)
|
||||
if guessed_extension is None:
|
||||
guessed_extension = "" # 无法推断时默认不加后缀
|
||||
print(f"Guessed file extension: {guessed_extension}")
|
||||
|
||||
# 提取文件名并设置保存路径
|
||||
file_name = os.path.basename(object_name) # 提取路径中的文件名
|
||||
if "." not in file_name: # 如果文件名中没有后缀,添加推断的后缀
|
||||
file_name += guessed_extension
|
||||
local_file_path = f"./{file_name}" # 设置保存路径
|
||||
print(f"File will be saved as: {local_file_path}")
|
||||
|
||||
# 下载文件
|
||||
print(f"Downloading '{object_name}' from bucket '{bucket_name}' to '{local_file_path}'...")
|
||||
minio_client.fget_object(bucket_name, object_name, local_file_path)
|
||||
|
||||
print(f"File '{object_name}' downloaded successfully to '{local_file_path}'.")
|
||||
|
||||
except S3Error as e:
|
||||
print(f"Error occurred: {e}")
|
||||
except Exception as e:
|
||||
print(f"Unexpected error: {e}")
|
||||
|
||||
# 示例配置
|
||||
endpoint = "192.168.1.28:9500" # MinIO 服务器地址
|
||||
access_key = "UIiBRvUixj3GkJwvItmy" # MinIO Access Key
|
||||
secret_key = "wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm" # MinIO Secret Key
|
||||
bucket_name = "test-bucket" # 存储桶名称
|
||||
object_name = "test/uploadedFiles/20241223/175_big.png" # 不带后缀的文件名
|
||||
|
||||
# 测试下载文件
|
||||
download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name)
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
import base64
|
||||
import urllib.parse
|
||||
import requests
|
||||
|
||||
|
||||
def generate_preview_url():
|
||||
# 原始 URL
|
||||
origin_url = 'http://192.168.1.28:8080/prod-api/downhole-tool-system/common/files/v1/download/60cbe676ef181249af3fe2cbeea99746'
|
||||
|
||||
# 添加 fullfilename 参数
|
||||
preview_url = origin_url
|
||||
|
||||
# Base64 编码
|
||||
encoded_preview_url = base64.b64encode(preview_url.encode('utf-8')).decode('utf-8')
|
||||
|
||||
# URL 编码 Base64 编码的结果
|
||||
final_url = f"http://127.0.0.1:8012/picturesPreview?urls={urllib.parse.quote(encoded_preview_url)}"
|
||||
|
||||
return final_url
|
||||
|
||||
|
||||
def request_with_token():
|
||||
# 生成完整的 URL
|
||||
url = generate_preview_url()
|
||||
|
||||
print(f"完整的 URL: {url}")
|
||||
|
||||
# 设置请求头
|
||||
headers = {
|
||||
'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzQ2MDA4MDgsImp0aSI6IjJOT1hqbnRJdEh4YUJhQzRqcThJdlMtMVVSayIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.ytexuU6OPBycL6Zoh74YTYCJ_9f-u6emsFDy4-OsDvE', # 替换为实际的 token
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发起 GET 请求
|
||||
response = requests.get(url, headers=headers)
|
||||
|
||||
# 处理响应
|
||||
if response.status_code == 200:
|
||||
print("请求成功!")
|
||||
print("响应内容:")
|
||||
print(response.content) # 如果是图片/文件数据,可以保存到本地
|
||||
else:
|
||||
print(f"请求失败,状态码: {response.status_code}")
|
||||
print("响应内容:")
|
||||
print(response.text)
|
||||
|
||||
|
||||
# 执行请求
|
||||
request_with_token()
|
||||
Loading…
Reference in New Issue