- 将配置相关类移动到model模块 - 实现依赖注入容器管理各组件依赖关系 - 重构配置加载逻辑支持多层级键值查找 - 更新主应用入口支持命令行参数解析 - 统一日志输出格式替换原有打印语句 - 引入钻井实时数据模型简化数据处理 - 移除硬编码字段映射改用动态配置方式 - 优化数据库写入逻辑基于新的数据模型
108 lines
3.4 KiB
Python
108 lines
3.4 KiB
Python
import argparse
|
|
import json
|
|
import logging
|
|
import time
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
|
|
import paho.mqtt.client as mqtt
|
|
|
|
from config import build_subscriber_dependencies
|
|
|
|
|
|
# Module-level logger named after this module; every function below logs through it.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_broker(broker):
    """Split an MQTT broker address into ``(scheme, host, port)``.

    A bare ``host[:port]`` address (no ``://``) is treated as
    ``tcp://host[:port]``.

    Args:
        broker: Broker address, e.g. ``"localhost:1883"`` or
            ``"mqtts://broker.example.com"``.

    Returns:
        A ``(scheme, host, port)`` tuple. ``scheme`` is lower-cased, ``host``
        falls back to ``"localhost"``, and a missing port falls back to 8883
        for the TLS schemes (``ssl``, ``tls``, ``mqtts``) and to 1883 for
        every other scheme.

    Raises:
        ValueError: If ``broker`` is empty, or if the address contains a
            port that ``urlparse`` rejects.
    """
    if not broker:
        raise ValueError("broker is required")
    if "://" not in broker:
        broker = "tcp://" + broker

    parsed = urlparse(broker)
    scheme = (parsed.scheme or "tcp").lower()
    host = parsed.hostname or "localhost"

    # Without an explicit port, pick the registered MQTT port for the
    # transport: 8883 for MQTT over TLS, 1883 for plain TCP. Falling back to
    # 1883 for a TLS scheme would point a TLS handshake at the plaintext port.
    default_port = 8883 if scheme in ("ssl", "tls", "mqtts") else 1883
    port = parsed.port or default_port
    return scheme, host, port
|
|
|
|
|
|
def print_flat_fields(title, value):
    """Log ``title`` and then the contents of ``value`` at INFO level.

    A dict is written as one ``key: value`` line per item. Any other value
    (including ``None``) is written on a single line.

    Args:
        title: Heading logged before the value.
        value: Dict whose items are listed, or any other object to log as-is.
    """
    logger.info(title)

    # Non-dict values have no fields to enumerate: log them whole and stop.
    if not isinstance(value, dict):
        logger.info(" %s", value)
        return

    for field_name, field_value in value.items():
        logger.info(" %s: %s", field_name, field_value)
|
|
|
|
|
|
def run_subscriber(args, deps):
    """Connect to the configured MQTT broker and log every message received.

    The topic is the first non-empty value among ``args.topic``,
    ``deps.config.mqtt.sub_topic`` and ``deps.config.mqtt.pub_topic``. After
    connecting, the function starts the client's background network loop and
    sleeps in one-second steps until a ``KeyboardInterrupt`` arrives; it then
    stops the loop and disconnects.

    Args:
        args: Parsed command-line arguments; only ``topic`` is read.
        deps: Dependency container; ``config.mqtt`` supplies the broker,
            client id, credentials and topics, ``config.tms`` supplies the
            keepalive value passed to ``connect``.

    Raises:
        ValueError: If no topic can be determined, or if ``parse_broker``
            rejects the broker address.
    """
    mqtt_config = deps.config.mqtt
    tms_config = deps.config.tms

    topic = args.topic or mqtt_config.sub_topic or mqtt_config.pub_topic
    if not topic:
        raise ValueError("No topic to subscribe. Set sub-topic or pub-topic, or pass --topic")

    scheme, host, port = parse_broker(mqtt_config.broker)
    client_id = mqtt_config.subscriber_client_id
    logger.info(
        "MQTT subscriber config broker=%s://%s:%s client_id=%s topic=%s",
        scheme,
        host,
        port,
        client_id,
        topic,
    )

    def handle_connect(mqtt_client, userdata, flags, rc):
        # rc == 0 is the only result treated as a successful connection.
        if rc != 0:
            logger.error("Connect failed rc=%s", rc)
            return
        logger.info("Connected")
        # Subscribing from the connect callback means it runs on every
        # successful connect.
        mqtt_client.subscribe(topic)
        logger.info("Subscribed %s", topic)

    def handle_disconnect(mqtt_client, userdata, rc):
        logger.info("Disconnected callback rc=%s", rc)

    def handle_message(mqtt_client, userdata, msg):
        # Undecodable bytes are replaced rather than raising, so any payload
        # can be logged.
        raw_payload = msg.payload.decode("utf-8", errors="replace")
        received_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        logger.info(
            "RX time=%s topic=%s qos=%s retain=%s bytes=%s",
            received_at,
            msg.topic,
            msg.qos,
            msg.retain,
            len(msg.payload),
        )
        try:
            decoded = json.loads(raw_payload)
            logger.info("Raw JSON: %s", json.dumps(decoded, ensure_ascii=False))
            if isinstance(decoded, dict):
                for heading, section in (("meta:", "meta"), ("data:", "data")):
                    print_flat_fields(heading, decoded.get(section))
        except Exception:
            # Best effort: anything that fails above falls back to logging
            # the payload text unparsed.
            logger.info("Raw payload: %s", raw_payload)

    client = mqtt.Client(client_id=client_id, clean_session=True)
    if mqtt_config.username is not None:
        client.username_pw_set(mqtt_config.username, mqtt_config.password)
    if scheme in ("ssl", "tls", "mqtts"):
        client.tls_set()

    client.on_connect = handle_connect
    client.on_disconnect = handle_disconnect
    client.on_message = handle_message

    client.connect(host, port, keepalive=tms_config.keepalive)
    client.loop_start()

    # Keep the calling thread alive while the network loop started above
    # delivers callbacks.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Subscriber interrupted")
    finally:
        client.loop_stop()
        client.disconnect()
        logger.info("Subscriber stopped")
|
|
|
|
|
|
def add_arguments(parser):
    """Register the subscriber's command-line options on ``parser``.

    Options added:
        ``--config``: path to the YAML config file (default ``config.yaml``).
        ``--topic``: topic override (default empty string).

    Args:
        parser: An ``argparse`` parser (or compatible object exposing
            ``add_argument``).
    """
    option_specs = (
        ("--config", "config.yaml", "Path to config yaml"),
        ("--topic", "", "Override topic to subscribe"),
    )
    for flag, default_value, help_text in option_specs:
        parser.add_argument(flag, default=default_value, help=help_text)
|
|
|
|
|
|
def main(argv=None):
    """Parse command-line arguments, build dependencies and run the subscriber.

    Args:
        argv: Argument list handed to ``parse_args``. ``None`` makes argparse
            read the process arguments.
    """
    cli = argparse.ArgumentParser(description="MQTT subscriber")
    add_arguments(cli)
    options = cli.parse_args(argv)

    # Dependencies are built from the config file named on the command line.
    dependencies = build_subscriber_dependencies(options.config)
    run_subscriber(options, dependencies)
|
|
|
|
|
|
# Run the subscriber only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()
|