"""Command-line MQTT subscriber that logs every message received on a topic."""

import argparse
import json
import logging
import time
from datetime import datetime
from urllib.parse import urlparse

import paho.mqtt.client as mqtt

from config import build_subscriber_dependencies

logger = logging.getLogger(__name__)

# URL schemes for which run_subscriber enables TLS on the client.
_TLS_SCHEMES = frozenset({"ssl", "tls", "mqtts"})

# Registered MQTT ports: plaintext and MQTT-over-TLS respectively.
_DEFAULT_PORT = 1883
_DEFAULT_TLS_PORT = 8883


def parse_broker(broker):
    """Split a broker address into ``(scheme, host, port)``.

    A bare ``host`` or ``host:port`` is treated as ``tcp://host[:port]``.
    A missing host falls back to ``"localhost"``.  A missing port falls back
    to 8883 for TLS schemes (``ssl``, ``tls``, ``mqtts``) and to 1883 for
    everything else, so that ``ssl://broker`` does not end up attempting a
    TLS handshake against the plaintext port.

    Args:
        broker: Broker address such as ``"host"``, ``"host:1883"`` or
            ``"ssl://host:8883"``.

    Returns:
        A ``(scheme, host, port)`` tuple with a lower-cased scheme.

    Raises:
        ValueError: If ``broker`` is empty, or if ``urlparse`` rejects the
            port component.
    """
    if not broker:
        raise ValueError("broker is required")
    if "://" not in broker:
        broker = "tcp://" + broker

    parsed = urlparse(broker)
    scheme = (parsed.scheme or "tcp").lower()
    host = parsed.hostname or "localhost"
    fallback_port = _DEFAULT_TLS_PORT if scheme in _TLS_SCHEMES else _DEFAULT_PORT
    port = parsed.port or fallback_port
    return scheme, host, port


def print_flat_fields(title, value):
    """Log ``title`` followed by the contents of ``value``.

    A dict is logged one ``key: value`` pair per line; any other value
    (including ``None``) is logged as a single indented line.
    """
    logger.info(title)
    if isinstance(value, dict):
        for key, val in value.items():
            logger.info("  %s: %s", key, val)
    else:
        logger.info("  %s", value)


def run_subscriber(args, deps):
    """Connect to the configured broker and log incoming messages until Ctrl-C.

    The topic is resolved in order from ``args.topic``, the configured
    sub-topic and the configured pub-topic.

    Args:
        args: Parsed command-line namespace; only ``args.topic`` is read.
        deps: Dependency container; ``deps.config.mqtt`` and
            ``deps.config.tms`` are read.

    Raises:
        ValueError: If no topic can be resolved or the broker address is
            empty or malformed.
    """
    mqtt_config = deps.config.mqtt
    tms_config = deps.config.tms

    topic = args.topic or mqtt_config.sub_topic or mqtt_config.pub_topic
    if not topic:
        raise ValueError("No topic to subscribe. Set sub-topic or pub-topic, or pass --topic")

    scheme, host, port = parse_broker(mqtt_config.broker)
    logger.info(
        "MQTT subscriber config broker=%s://%s:%s client_id=%s topic=%s",
        scheme,
        host,
        port,
        mqtt_config.subscriber_client_id,
        topic,
    )

    # NOTE(review): paho-mqtt 2.x changed this constructor (callback_api_version);
    # confirm the pinned paho version before upgrading.
    client = mqtt.Client(client_id=mqtt_config.subscriber_client_id, clean_session=True)
    if mqtt_config.username is not None:
        client.username_pw_set(mqtt_config.username, mqtt_config.password)
    if scheme in _TLS_SCHEMES:
        client.tls_set()

    def on_connect(c, userdata, flags, rc):
        # Subscribing here (rather than once after connect()) means the
        # subscription is re-issued every time this callback fires.
        if rc != 0:
            logger.error("Connect failed rc=%s", rc)
            return
        logger.info("Connected")
        # subscribe() returns (result, mid); do not claim success when the
        # request could not even be handed to the client.
        result, _mid = c.subscribe(topic)
        if result != mqtt.MQTT_ERR_SUCCESS:
            logger.error("Subscribe failed topic=%s rc=%s", topic, result)
            return
        logger.info("Subscribed %s", topic)

    def on_disconnect(c, userdata, rc):
        logger.info("Disconnected callback rc=%s", rc)

    def on_message(c, userdata, msg):
        # errors="replace" keeps undecodable bytes from raising.
        raw_payload = msg.payload.decode("utf-8", errors="replace")
        logger.info(
            "RX time=%s topic=%s qos=%s retain=%s bytes=%s",
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            msg.topic,
            msg.qos,
            msg.retain,
            len(msg.payload),
        )
        try:
            obj = json.loads(raw_payload)
            logger.info("Raw JSON: %s", json.dumps(obj, ensure_ascii=False))
            if isinstance(obj, dict):
                print_flat_fields("meta:", obj.get("meta"))
                print_flat_fields("data:", obj.get("data"))
        except Exception:
            # Deliberately broad: any payload that cannot be parsed or
            # printed as JSON is still shown, as raw text.
            logger.info("Raw payload: %s", raw_payload)

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # NOTE(review): keepalive is read from the tms section, not the mqtt
    # section — confirm this is intended.
    client.connect(host, port, keepalive=tms_config.keepalive)
    client.loop_start()
    try:
        # The network loop runs in paho's background thread; this thread
        # only waits for Ctrl-C.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Subscriber interrupted")
    finally:
        client.loop_stop()
        client.disconnect()
        logger.info("Subscriber stopped")


def add_arguments(parser):
    """Register the subscriber's command-line options on ``parser``."""
    parser.add_argument("--config", default="config.yaml", help="Path to config yaml")
    parser.add_argument("--topic", default="", help="Override topic to subscribe")


def main(argv=None):
    """Parse ``argv``, build dependencies from the config file and run.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.
    """
    parser = argparse.ArgumentParser(description="MQTT subscriber")
    add_arguments(parser)
    args = parser.parse_args(argv)

    # NOTE(review): no logging handler is configured in this module;
    # presumably build_subscriber_dependencies sets logging up — confirm,
    # otherwise the INFO-level output above is dropped.
    deps = build_subscriber_dependencies(args.config)
    run_subscriber(args, deps)


if __name__ == "__main__":
    main()