- 实现了应用配置的数据类结构(MqttConfig, TmsConfig, AppConfig) - 创建了配置加载和解析功能,支持从YAML文件读取配置 - 添加了TDengine数据库配置和连接池管理 - 实现了MQTT客户端依赖注入和服务构建 - 创建了钻孔实时数据的ORM映射和SQL构建功能 - 实现了TDengine Writer用于数据写入超级表 - 添加了MQTT模拟服务,支持发布、订阅和数据转发功能 - 创建了随机数据发送器用于测试 - 实现了消息持久化到本地文件功能 - 配置了数据库连接池和SQL执行功能
61 lines
1.9 KiB
Python
61 lines
1.9 KiB
Python
import base64
|
|
from queue import LifoQueue
|
|
from urllib.request import Request, urlopen
|
|
|
|
|
|
class TaosConnection:
|
|
def __init__(self, base_url, username, password, timeout=10):
|
|
self.base_url = base_url
|
|
self.username = username or ""
|
|
self.password = password or ""
|
|
self.timeout = timeout
|
|
|
|
def execute(self, sql):
|
|
auth = f"{self.username}:{self.password}"
|
|
auth_header = base64.b64encode(auth.encode("utf-8")).decode("ascii")
|
|
req = Request(
|
|
url=f"{self.base_url}/rest/sql",
|
|
data=sql.encode("utf-8"),
|
|
headers={
|
|
"Authorization": f"Basic {auth_header}",
|
|
"Content-Type": "text/plain",
|
|
},
|
|
method="POST",
|
|
)
|
|
with urlopen(req, timeout=self.timeout) as resp:
|
|
body = resp.read().decode("utf-8", errors="replace")
|
|
if resp.status != 200:
|
|
raise RuntimeError(f"HTTP {resp.status} {body}")
|
|
return body
|
|
|
|
|
|
class TaosConnectionPool:
|
|
def __init__(self, base_url, username, password, pool_size=2, timeout=10):
|
|
self.base_url = base_url
|
|
self.username = username
|
|
self.password = password
|
|
self.pool_size = max(int(pool_size or 1), 1)
|
|
self.timeout = timeout
|
|
self._pool = LifoQueue(maxsize=self.pool_size)
|
|
for _ in range(self.pool_size):
|
|
self._pool.put(TaosConnection(base_url, username, password, timeout=timeout))
|
|
|
|
def execute(self, sql):
|
|
conn = self._pool.get()
|
|
try:
|
|
return conn.execute(sql)
|
|
finally:
|
|
self._pool.put(conn)
|
|
|
|
|
|
def create_taos_pool(config):
|
|
if not config.enabled:
|
|
return None
|
|
return TaosConnectionPool(
|
|
config.base_url,
|
|
config.username,
|
|
config.password,
|
|
pool_size=config.pool_size,
|
|
timeout=config.timeout,
|
|
)
|