Files
tdEngine_mqtt_mock/app/mqtt_mock.py
wsy182 6557479a2f refactor(config): 重构配置模块并优化应用依赖注入
- 将配置相关类移动到model模块
- 实现依赖注入容器管理各组件依赖关系
- 重构配置加载逻辑支持多层级键值查找
- 更新主应用入口支持命令行参数解析
- 统一日志输出格式替换原有打印语句
- 引入钻井实时数据模型简化数据处理
- 移除硬编码字段映射改用动态配置方式
- 优化数据库写入逻辑基于新的数据模型
2026-03-12 10:41:26 +08:00

191 lines
6.8 KiB
Python

import argparse
import json
import logging
import os
import time
from datetime import datetime, timezone
from urllib.parse import urlparse

import paho.mqtt.client as mqtt

from config import build_mock_dependencies
from model import DrillingRealtimeData
# Module-level logger, named after this module, used by every function below.
logger = logging.getLogger(__name__)
def parse_broker(broker):
    """Split a broker address into ``(scheme, host, port)``.

    A bare ``host`` or ``host:port`` is treated as ``tcp://host[:port]``.

    Args:
        broker: Broker address, with or without a ``scheme://`` prefix.

    Returns:
        A ``(scheme, host, port)`` tuple. ``scheme`` is lower-cased, ``host``
        falls back to ``"localhost"``, and a missing port falls back to 8883
        for the TLS schemes (``ssl``, ``tls``, ``mqtts``) and 1883 otherwise.

    Raises:
        ValueError: If *broker* is empty, or if the address carries a port
            that ``urlparse`` cannot interpret.
    """
    if not broker:
        raise ValueError("broker is required")
    if "://" not in broker:
        broker = "tcp://" + broker
    parsed = urlparse(broker)
    scheme = (parsed.scheme or "tcp").lower()
    host = parsed.hostname or "localhost"
    # 1883 is the plaintext MQTT port; a TLS handshake against it would fail,
    # so TLS schemes without an explicit port use the registered 8883 instead.
    default_port = 8883 if scheme in ("ssl", "tls", "mqtts") else 1883
    port = parsed.port or default_port
    return scheme, host, port
def ensure_dir(path):
    """Make sure the directory that will contain *path* exists.

    Nothing happens when *path* is empty or has no directory component.
    Existing directories are left untouched.
    """
    parent = os.path.dirname(path) if path else ""
    if parent:
        os.makedirs(parent, exist_ok=True)
def build_server_payload(equipment_code):
    """Build the payload the mock publishes for *equipment_code*.

    The payload is produced from an empty ``DrillingRealtimeData`` record
    whose ``wellid`` is set to *equipment_code*.
    """
    blank_record = DrillingRealtimeData.empty(wellid=equipment_code)
    return blank_record.to_payload(equipment_code)
def parse_payload(raw_payload):
    """Decode *raw_payload* as JSON, returning it unchanged if that fails.

    The fallback is deliberately broad: any failure to decode yields the
    original value instead of raising.
    """
    try:
        decoded = json.loads(raw_payload)
    except Exception:
        return raw_payload
    else:
        return decoded
def build_ack_payload(topic, raw_payload):
    """Build the acknowledgement dict for a message received on *topic*.

    Args:
        topic: Topic the acknowledged message arrived on.
        raw_payload: Message body as text.

    Returns:
        A dict with ``ack`` (always ``True``), ``topic`` and ``ts`` (current
        UTC time, second precision, ``Z`` suffix). If *raw_payload* is valid
        JSON the parsed value is attached as ``src``; when it is a JSON
        object, ``id``, ``meta.equipment_code`` and ``meta.equipment_sn`` are
        copied to the top level if present. If it is not valid JSON the
        original text is attached as ``raw`` instead.
    """
    payload = {
        "ack": True,
        "topic": topic,
        # Aware "now" replaces the deprecated datetime.utcnow(); tzinfo is
        # dropped before formatting so the text stays "...T..Z" without "+00:00".
        "ts": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec="seconds") + "Z",
    }
    try:
        obj = json.loads(raw_payload)
    except Exception:
        # Best effort: an undecodable body is echoed back verbatim.
        payload["raw"] = raw_payload
        return payload

    if isinstance(obj, dict):
        meta = obj.get("meta")
        if not isinstance(meta, dict):
            meta = {}
        for key in ("equipment_code", "equipment_sn"):
            if key in meta:
                payload[key] = meta[key]
        if "id" in obj:
            payload["id"] = obj["id"]
    # NOTE(review): "src" is attached for every successfully parsed payload,
    # including non-object JSON - confirm this matches the intended nesting.
    payload["src"] = obj
    return payload
def write_message(path, topic, raw_payload):
    """Append one received message to the JSON-lines file at *path*.

    Each line is a JSON object with ``ts`` (current UTC time, second
    precision, ``Z`` suffix), ``topic`` and ``payload`` (the parsed body, or
    the raw text if it is not JSON). When the payload is an object whose
    ``data.ts`` is a number, that value is read as epoch milliseconds and
    also written as ``ts_iso``.

    Args:
        path: Destination file; its parent directory is created if needed.
        topic: Topic the message arrived on.
        raw_payload: Message body as text.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    ensure_dir(path)
    payload_obj = parse_payload(raw_payload)
    record = {
        # Aware "now" replaces the deprecated datetime.utcnow(); tzinfo is
        # dropped before formatting so the text keeps its plain "Z" suffix.
        "ts": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec="seconds") + "Z",
        "topic": topic,
        "payload": payload_obj,
    }
    data = payload_obj.get("data") if isinstance(payload_obj, dict) else None
    device_ts = data.get("ts") if isinstance(data, dict) else None
    if isinstance(device_ts, (int, float)):
        try:
            moment = datetime.fromtimestamp(device_ts / 1000.0, tz=timezone.utc)
        except (OverflowError, OSError, ValueError):
            # JSON allows NaN/Infinity and arbitrarily large numbers; a bogus
            # device timestamp must not prevent the message being recorded.
            logger.warning("Skipping ts_iso: data.ts=%r is not a representable timestamp", device_ts)
        else:
            record["ts_iso"] = moment.replace(tzinfo=None).isoformat(timespec="seconds") + "Z"
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=True) + "\n")
def _new_mqtt_client(mqtt_config, scheme):
    """Create the paho client and apply credentials and TLS from the config."""
    client_kwargs = {"client_id": mqtt_config.mock_client_id, "clean_session": True}
    # paho-mqtt 2.x added a leading ``callback_api_version`` constructor
    # argument. VERSION1 keeps the (client, userdata, flags, rc) style
    # callback signatures used by this module; 1.x has no such enum.
    callback_api = getattr(mqtt, "CallbackAPIVersion", None)
    if callback_api is None:
        client = mqtt.Client(**client_kwargs)
    else:
        client = mqtt.Client(callback_api.VERSION1, **client_kwargs)
    if mqtt_config.username is not None:
        client.username_pw_set(mqtt_config.username, mqtt_config.password)
    if scheme in ("ssl", "tls", "mqtts"):
        client.tls_set()
    return client


def run_mock_service(args, deps):
    """Run the MQTT mock until interrupted or the publish count is reached.

    In ``listen``/``both`` mode the client subscribes to ``pub_topic`` and,
    for every message on that topic, appends it to the data file, writes it
    to TDengine when enabled, republishes it on ``sub_topic`` and publishes
    an acknowledgement on ``ack_topic``. In ``publish``/``both`` mode it also
    publishes a generated payload on ``pub_topic`` every ``args.interval``
    seconds, stopping after ``args.count`` messages when that is non-zero.

    Args:
        args: Parsed command-line options; ``mode``, ``interval`` and
            ``count`` are read.
        deps: Dependency container; ``config.mqtt``, ``config.tms``,
            ``tdengine_config``, ``tdengine_writer`` and ``data_file`` are
            read.

    Raises:
        ValueError: If the configured broker address is empty or malformed.
    """
    mqtt_config = deps.config.mqtt
    tms_config = deps.config.tms
    tdengine_config = deps.tdengine_config
    tdengine_writer = deps.tdengine_writer

    scheme, host, port = parse_broker(mqtt_config.broker)
    logger.info("MQTT mock config broker=%s://%s:%s client_id=%s mode=%s", scheme, host, port, mqtt_config.mock_client_id, args.mode)
    logger.info("Topics ingest=%s forward=%s ack=%s", mqtt_config.pub_topic, mqtt_config.sub_topic, mqtt_config.ack_topic)
    logger.info("Data file=%s device_code=%s tdengine_enabled=%s", deps.data_file, tms_config.device_code, tdengine_writer.enabled)
    if tdengine_writer.enabled:
        logger.info("TDengine host=%s database=%s stable=%s pool_size=%s", tdengine_config.base_url, tdengine_config.database, tdengine_config.stable, tdengine_config.pool_size)

    client = _new_mqtt_client(mqtt_config, scheme)

    def on_connect(c, userdata, flags, rc):
        if rc != 0:
            logger.error("Connect failed rc=%s", rc)
            return
        logger.info("Connected")
        # Subscribing from the connect callback re-establishes the
        # subscription whenever the client connects again.
        if args.mode in ("listen", "both") and mqtt_config.pub_topic:
            c.subscribe(mqtt_config.pub_topic)
            logger.info("Subscribed %s", mqtt_config.pub_topic)

    def on_disconnect(c, userdata, rc):
        logger.info("Disconnected callback rc=%s", rc)

    def on_message(c, userdata, msg):
        payload = msg.payload.decode("utf-8", errors="replace")
        logger.info("RX topic=%s bytes=%s", msg.topic, len(msg.payload))
        if msg.topic != mqtt_config.pub_topic:
            return
        if deps.data_file:
            # Guarded like the TDengine write below: a disk error must not
            # escape into paho's network loop or skip the forward and ack.
            try:
                write_message(deps.data_file, msg.topic, payload)
                logger.info("Wrote file %s", deps.data_file)
            except Exception:
                logger.exception("Write file failed")
        if tdengine_writer.enabled:
            try:
                tdengine_writer.write_payload(parse_payload(payload))
                logger.info("Wrote TDengine")
            except Exception:
                logger.exception("Write TDengine failed")
        if mqtt_config.sub_topic:
            c.publish(mqtt_config.sub_topic, payload)
            logger.info("Forwarded %s", mqtt_config.sub_topic)
        if mqtt_config.ack_topic:
            ack_payload = build_ack_payload(msg.topic, payload)
            c.publish(mqtt_config.ack_topic, json.dumps(ack_payload, ensure_ascii=True))
            logger.info("TX ack %s", mqtt_config.ack_topic)

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    client.connect(host, port, keepalive=tms_config.keepalive)
    client.loop_start()
    try:
        if args.mode in ("publish", "both"):
            if not mqtt_config.pub_topic:
                logger.warning("pub-topic is empty; nothing to publish")
            else:
                seq = 0
                while True:
                    seq += 1
                    payload = build_server_payload(tms_config.device_code)
                    client.publish(mqtt_config.pub_topic, json.dumps(payload, ensure_ascii=True))
                    logger.info("TX %s #%s", mqtt_config.pub_topic, seq)
                    # A count of 0 means "publish forever".
                    if args.count and seq >= args.count:
                        break
                    time.sleep(args.interval)
        else:
            # Listen-only: keep the main thread alive while the network
            # loop started above handles incoming messages.
            while True:
                time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Mock interrupted")
    finally:
        client.loop_stop()
        client.disconnect()
        logger.info("Mock stopped")
def add_arguments(parser):
    """Register the mock service's command-line options on *parser*."""
    option_specs = (
        ("--config", {"default": "config.yaml", "help": "Path to config yaml"}),
        ("--mode", {"choices": ["publish", "listen", "both"], "default": "listen"}),
        ("--interval", {"type": float, "default": 2.0, "help": "Publish interval (seconds)"}),
        ("--count", {"type": int, "default": 0, "help": "Publish count (0 = forever)"}),
        ("--data-file", {"default": "", "help": "Override data-file in config"}),
    )
    for flag, settings in option_specs:
        parser.add_argument(flag, **settings)
def main(argv=None):
    """Parse the command line, build the dependencies and run the mock.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read the
            process arguments.
    """
    cli = argparse.ArgumentParser(description="MQTT mock service")
    add_arguments(cli)
    options = cli.parse_args(argv)
    run_mock_service(
        options,
        build_mock_dependencies(options.config, data_file_override=options.data_file),
    )
# Start the mock only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()