import pymysql from dbutils.pooled_db import PooledDB from config import DB_CONFIG class MySQLPool: def __init__(self): # 初始化时建立数据库连接池 self.pool = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数 mincached=2, # 初始化时,连接池中至少创建的空闲的连接 maxcached=5, # 连接池中最多闲置的连接 blocking=True, # 连接池中如果没有可用连接后是否阻塞等待 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表 ping=0, **DB_CONFIG ) def get_connection(self): # 从连接池中获取一个连接 return self.pool.connection() # 使用上下文管理器自动处理连接的开启和关闭 def execute(self, sql, args=None): with self.get_connection() as connection: with connection.cursor() as cursor: cursor.execute(sql, args) if sql.strip().lower().startswith("select"): # 如果是查询操作,返回所有结果 return cursor.fetchall() else: # 如果是增、删、改操作,提交事务并返回影响的行数 connection.commit() return cursor.rowcount