from minio import Minio from minio.error import S3Error def test_minio_connection(): # MinIO 配置 config = { "endpoint": "192.168.2.20:9002", "access_key": "zw4bi7qgyupVbouWonlr", "secret_key": "ETelFUk0mDf5ITpwJAeIqgLi8UQ1S953001eq6sq", "secure": False # 是否使用 HTTPS } try: # 创建 MinIO 客户端 client = Minio( config["endpoint"], access_key=config["access_key"], secret_key=config["secret_key"], secure=config["secure"] ) # 测试列出所有存储桶 buckets = client.list_buckets() print("成功连接到 MinIO!以下是现有的存储桶列表:") for bucket in buckets: print(f" - {bucket.name}") except S3Error as e: print(f"连接 MinIO 时发生错误: {e}") if __name__ == "__main__": test_minio_connection()