Files
tdEngine_mqtt_mock/db/pool.py
wsy182 d5d1cb0b7d feat(config): 添加配置管理和MQTT模拟服务功能
- 实现了应用配置的数据类结构(MqttConfig, TmsConfig, AppConfig)
- 创建了配置加载和解析功能,支持从YAML文件读取配置
- 添加了TDengine数据库配置和连接池管理
- 实现了MQTT客户端依赖注入和服务构建
- 创建了钻孔实时数据的ORM映射和SQL构建功能
- 实现了TDengine Writer用于数据写入超级表
- 添加了MQTT模拟服务,支持发布、订阅和数据转发功能
- 创建了随机数据发送器用于测试
- 实现了消息持久化到本地文件功能
- 配置了数据库连接池和SQL执行功能
2026-03-12 09:58:00 +08:00

61 lines
1.9 KiB
Python

import base64
from queue import LifoQueue
from urllib.request import Request, urlopen
class TaosConnection:
def __init__(self, base_url, username, password, timeout=10):
self.base_url = base_url
self.username = username or ""
self.password = password or ""
self.timeout = timeout
def execute(self, sql):
auth = f"{self.username}:{self.password}"
auth_header = base64.b64encode(auth.encode("utf-8")).decode("ascii")
req = Request(
url=f"{self.base_url}/rest/sql",
data=sql.encode("utf-8"),
headers={
"Authorization": f"Basic {auth_header}",
"Content-Type": "text/plain",
},
method="POST",
)
with urlopen(req, timeout=self.timeout) as resp:
body = resp.read().decode("utf-8", errors="replace")
if resp.status != 200:
raise RuntimeError(f"HTTP {resp.status} {body}")
return body
class TaosConnectionPool:
def __init__(self, base_url, username, password, pool_size=2, timeout=10):
self.base_url = base_url
self.username = username
self.password = password
self.pool_size = max(int(pool_size or 1), 1)
self.timeout = timeout
self._pool = LifoQueue(maxsize=self.pool_size)
for _ in range(self.pool_size):
self._pool.put(TaosConnection(base_url, username, password, timeout=timeout))
def execute(self, sql):
conn = self._pool.get()
try:
return conn.execute(sql)
finally:
self._pool.put(conn)
def create_taos_pool(config):
if not config.enabled:
return None
return TaosConnectionPool(
config.base_url,
config.username,
config.password,
pool_size=config.pool_size,
timeout=config.timeout,
)