update code

main
wsy182 2024-12-26 13:07:55 +08:00
parent d64e709d3f
commit f77dc396db
4 changed files with 217 additions and 0 deletions

40
base64EncodeUrl.py Normal file
View File

@ -0,0 +1,40 @@
import base64
def generate_download_url(base_url: str, file_id: str, fullfilename: str, token: str) -> str:
"""
生成下载 URL 并对其进行 Base64 编码
:param base_url: 文件下载的基础 URL
:param file_id: 文件的 ID
:param fullfilename: 文件的完整名称
:param token: 授权的 Token
:return: Base64 编码后的完整下载 URL
"""
# 拼接完整的下载 URL
download_url = f"{base_url}/{file_id}?fullfilename={fullfilename}&token={token}"
print(f"Original URL: {download_url}")
# 对 URL 进行 Base64 编码
base64_encoded_url = base64.b64encode(download_url.encode("utf-8")).decode("utf-8")
print(f"Base64 Encoded URL: {base64_encoded_url}")
return base64_encoded_url
# 示例数据
pre_view_url = "http://192.168.1.12:8012/onlinePreview"
download_url = "http://192.168.1.28:8080/prod-api/file/download"
base_url = "http://192.168.1.28:8080/prod-api/file/download"
file_id = "a69ea8e0f8e28f92202975a3a4996e7b"
fullfilename = "文件预览.pptx"
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZW5hbnRfcm9sZV9rZXkiOiJzaHV6aXpob25neGluIiwidXNlcl9pZCI6IjEiLCJ1c2VyX25hbWUiOiJhZG1pbiIsInNjb3BlIjpbInNlcnZlciJdLCJleHAiOjE3MzUwMzIxMDMsImp0aSI6Ijl4Q3JrVDltXy1iZGh6czhKSWlrdkcySk5XcyIsImNsaWVudF9pZCI6IjFhN2JmY2M2MDI3NzRkNDk5NTkzNzU1MTFmYmIzYWYzIn0.T7jZ4bSmPqlwH2LjZeKLRXg_5Q4t7ihRUhd-zvdehak"
# 生成 Base64 编码的 URL
encoded_url = generate_download_url(base_url, file_id, fullfilename, token)
# 拼接下载和预览 URL
down_url = f"{download_url}/{file_id}"
final_url = f"{pre_view_url}?url={encoded_url}"
# 打印结果
print(f"Download URL: {down_url}")
print(f"Final Preview URL: {final_url}")

91
fastapiTest.py Normal file
View File

@ -0,0 +1,91 @@
from fastapi import FastAPI, HTTPException, Response
from minio import Minio
from minio.error import S3Error
from pydantic import BaseModel
from datetime import timedelta
app = FastAPI()
# 初始化 MinIO 客户端
minio_client = Minio(
endpoint="192.168.1.28:9500", # MinIO 服务器地址
access_key="UIiBRvUixj3GkJwvItmy", # Access Key
secret_key="wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm", # Secret Key
secure=False # 如果没有使用 HTTPS则设置为 False
)
# MinIO 配置
BUCKET_NAME = "test-bucket"
class FileRequest(BaseModel):
object_name: str # 文件对象名
expiry_in_seconds: int = 3600 # URL 有效期(默认 1 小时)
@app.post("/download-presigned-url")
def generate_presigned_url(file_request: FileRequest):
"""
生成用于下载文件的预签名 URL
"""
try:
# 检查存储桶是否存在
if not minio_client.bucket_exists(BUCKET_NAME):
raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.")
# 检查对象是否存在
try:
minio_client.stat_object(BUCKET_NAME, file_request.object_name)
except S3Error:
raise HTTPException(status_code=404, detail=f"Object '{file_request.object_name}' does not exist.")
# 将秒数转换为 timedelta
expiry_timedelta = timedelta(seconds=file_request.expiry_in_seconds)
# 生成预签名 URL
presigned_url = minio_client.presigned_get_object(
BUCKET_NAME,
file_request.object_name,
expires=expiry_timedelta # 传递 timedelta
)
return {"presigned_url": presigned_url}
except S3Error as e:
raise HTTPException(status_code=500, detail=f"Error generating presigned URL: {e}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")
@app.get("/download-file/{object_name:path}")
def download_file(object_name: str):
"""
提供文件对象并直接下载
"""
try:
print(f"Requested object: {object_name}")
# 检查存储桶是否存在
if not minio_client.bucket_exists(BUCKET_NAME):
raise HTTPException(status_code=404, detail=f"Bucket '{BUCKET_NAME}' does not exist.")
# 检查对象是否存在
try:
stat = minio_client.stat_object(BUCKET_NAME, object_name)
except S3Error:
raise HTTPException(status_code=404, detail=f"Object '{object_name}' does not exist.")
# 获取文件对象
response = minio_client.get_object(BUCKET_NAME, object_name)
# 设置响应头
headers = {
"Content-Disposition": f"attachment; filename*=UTF-8''{object_name.split('/')[-1]}",
"Content-Type": stat.content_type
}
return Response(content=response.read(), headers=headers, media_type=stat.content_type)
except S3Error as e:
raise HTTPException(status_code=500, detail=f"Error downloading file: {e}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}")

28
generaterCardId.py Normal file
View File

@ -0,0 +1,28 @@
import random
def generateCardId(length):
"""
生成指定长度的随机卡号
:param length: 卡号的长度
:return: 生成的随机卡号
"""
if length <= 0:
raise ValueError("Length must be a positive integer.")
# 初始化卡号
card_id = ""
# 数字范围
numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 生成随机卡号
for _ in range(length):
random_number = random.choice(numbers)
card_id += str(random_number)
return card_id
# 测试函数
if __name__ == "__main__":
length = 18 # 指定卡号长度
card_id = generateCardId(length)
print(f"Generated Card ID: {card_id}")

View File

@ -0,0 +1,58 @@
from minio import Minio
from minio.error import S3Error
import mimetypes
import os
def download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name):
try:
# 初始化 MinIO 客户端
minio_client = Minio(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
secure=False # 如果 MinIO 没有使用 HTTPS设置为 False
)
# 检查存储桶是否存在
if not minio_client.bucket_exists(bucket_name):
print(f"Bucket '{bucket_name}' does not exist.")
return
# 获取对象元数据
stat = minio_client.stat_object(bucket_name, object_name)
content_type = stat.content_type # 获取 Content-Type
print(f"Object '{object_name}' Content-Type: {content_type}")
# 根据 Content-Type 确定文件后缀
guessed_extension = mimetypes.guess_extension(content_type)
if guessed_extension is None:
guessed_extension = "" # 无法推断时默认不加后缀
print(f"Guessed file extension: {guessed_extension}")
# 提取文件名并设置保存路径
file_name = os.path.basename(object_name) # 提取路径中的文件名
if "." not in file_name: # 如果文件名中没有后缀,添加推断的后缀
file_name += guessed_extension
local_file_path = f"./{file_name}" # 设置保存路径
print(f"File will be saved as: {local_file_path}")
# 下载文件
print(f"Downloading '{object_name}' from bucket '{bucket_name}' to '{local_file_path}'...")
minio_client.fget_object(bucket_name, object_name, local_file_path)
print(f"File '{object_name}' downloaded successfully to '{local_file_path}'.")
except S3Error as e:
print(f"Error occurred: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
# 示例配置
endpoint = "192.168.1.28:9500" # MinIO 服务器地址
access_key = "UIiBRvUixj3GkJwvItmy" # MinIO Access Key
secret_key = "wgIPqYxaNlDhUds0d3n5v57Yvzg3u9iUXplO7Kdm" # MinIO Secret Key
bucket_name = "test-bucket" # 存储桶名称
object_name = "test/uploadedFiles/20241223/175_big.png" # 不带后缀的文件名
# 测试下载文件
download_file_from_minio(endpoint, access_key, secret_key, bucket_name, object_name)