"""MQTT mock service.

Depending on ``--mode`` the service publishes synthetic equipment payloads to
the ingest topic, listens on that topic, or does both. Every message received
on the ingest topic is appended to a JSON-lines file, optionally written to
TDengine, forwarded to the forward topic and acknowledged on the ack topic.
"""

import argparse
import json
import os
import time
from datetime import datetime, timezone
from urllib.parse import urlparse

import paho.mqtt.client as mqtt

from config.config import build_mock_dependencies

# Field names emitted in the ``data`` section of a synthetic payload.
DATA_KEYS = [
    "ts",
    "wellid",
    "stknum",
    "recid",
    "seqid",
    "actual_date",
    "actual_time",
    "actcod",
    "deptbitm",
    "deptbitv",
    "deptmeas",
    "deptvert",
    "blkpos",
    "ropa",
    "hkla",
    "hklx",
    "woba",
    "wobx",
    "torqa",
    "torqx",
    "rpma",
    "sppa",
    "chkp",
    "spm1",
    "spm2",
    "spm3",
    "tvolact",
    "tvolcact",
    "mfop",
    "mfoa",
    "mfia",
    "mdoa",
    "mdia",
    "mtoa",
    "mtia",
    "mcoa",
    "mcia",
    "stkc",
    "lagstks",
    "deptretm",
    "gasa",
    "space1",
    "space2",
    "space3",
    "space4",
    "space5",
]

# Broker URL schemes for which TLS is enabled on the client.
_TLS_SCHEMES = ("ssl", "tls", "mqtts")

# Errors ``json.loads`` can raise for input that is not usable JSON text.
_JSON_ERRORS = (TypeError, ValueError, RecursionError)


def _utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    # Aware "now" replaces the deprecated datetime.utcnow(); tzinfo is dropped
    # so isoformat() does not append "+00:00" before the literal "Z".
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now.isoformat(timespec="seconds") + "Z"


def _epoch_ms_to_iso(epoch_ms):
    """Convert epoch milliseconds to ``YYYY-MM-DDTHH:MM:SSZ`` (UTC).

    Returns ``None`` when the value cannot be represented as a datetime
    (too large, NaN/infinite, or rejected by the platform).
    """
    try:
        moment = datetime.fromtimestamp(epoch_ms / 1000.0, tz=timezone.utc)
    except (OverflowError, OSError, ValueError):
        return None
    return moment.replace(tzinfo=None).isoformat(timespec="seconds") + "Z"


def parse_broker(broker: str) -> tuple:
    """Split a broker address into ``(scheme, host, port)``.

    A bare ``host[:port]`` is treated as ``tcp://host[:port]``. Missing parts
    default to scheme ``tcp``, host ``localhost`` and port ``1883``.

    Raises:
        ValueError: if ``broker`` is empty, or the port is not a valid number
            (raised by ``urlparse(...).port``).
    """
    if not broker:
        raise ValueError("broker is required")
    if "://" not in broker:
        broker = "tcp://" + broker
    parsed = urlparse(broker)
    host = parsed.hostname or "localhost"
    # NOTE(review): 1883 is also the fallback for TLS schemes; confirm whether
    # 8883 would be the better default when no port is given.
    port = parsed.port or 1883
    scheme = (parsed.scheme or "tcp").lower()
    return scheme, host, port


def ensure_dir(path: str) -> None:
    """Create the parent directory of ``path`` if it has one and it is missing."""
    if not path:
        return
    dir_path = os.path.dirname(path)
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)


def build_server_payload(equipment_code) -> dict:
    """Build a synthetic payload with every ``DATA_KEYS`` field zeroed.

    ``data["ts"]`` is the current time in epoch milliseconds and
    ``data["wellid"]`` is an empty string. ``equipment_code`` is used for both
    ``meta.equipment_code`` and ``meta.equipment_sn``.
    """
    data = {key: 0 for key in DATA_KEYS}
    data["ts"] = int(time.time() * 1000)
    data["wellid"] = ""
    return {
        "meta": {
            "equipment_code": equipment_code,
            "equipment_sn": equipment_code,
        },
        "data": data,
    }


def parse_payload(raw_payload):
    """Return ``raw_payload`` decoded as JSON, or unchanged if it is not JSON."""
    try:
        return json.loads(raw_payload)
    except _JSON_ERRORS:
        return raw_payload


def build_ack_payload(topic, raw_payload) -> dict:
    """Build the acknowledgement for a message received on ``topic``.

    The ack always carries ``ack``, ``topic`` and a UTC ``ts``. When the
    payload is JSON the decoded value is echoed under ``src``; for a JSON
    object, ``meta.equipment_code``, ``meta.equipment_sn`` and ``id`` are also
    copied to the top level when present. Otherwise the original text is
    echoed under ``raw``.
    """
    payload = {
        "ack": True,
        "topic": topic,
        "ts": _utc_now_iso(),
    }
    try:
        obj = json.loads(raw_payload)
    except _JSON_ERRORS:
        payload["raw"] = raw_payload
        return payload

    if isinstance(obj, dict):
        meta = obj.get("meta")
        if not isinstance(meta, dict):
            meta = {}
        for key in ("equipment_code", "equipment_sn"):
            if key in meta:
                payload[key] = meta[key]
        if "id" in obj:
            payload["id"] = obj["id"]
    # NOTE(review): the original layout was lost; "src" is attached for every
    # payload that parses as JSON, not only for objects - confirm.
    payload["src"] = obj
    return payload


def write_message(path, topic, raw_payload) -> None:
    """Append one received message to the JSON-lines file at ``path``.

    Each line holds the receive time (``ts``), the ``topic`` and the decoded
    ``payload``. When ``payload.data.ts`` is a number it is interpreted as
    epoch milliseconds and additionally written as ``ts_iso``; values that
    cannot be converted are skipped rather than raised.

    Raises:
        OSError: if the directory cannot be created or the file not written.
    """
    ensure_dir(path)
    parsed = parse_payload(raw_payload)
    record = {
        "ts": _utc_now_iso(),
        "topic": topic,
        "payload": parsed,
    }
    if isinstance(parsed, dict):
        data = parsed.get("data")
        if isinstance(data, dict):
            device_ts = data.get("ts")
            # bool is a subclass of int but is never a meaningful timestamp.
            is_number = isinstance(device_ts, (int, float)) and not isinstance(device_ts, bool)
            ts_iso = _epoch_ms_to_iso(device_ts) if is_number else None
            if ts_iso is not None:
                record["ts_iso"] = ts_iso
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=True) + "\n")


def _print_startup_config(args, deps, scheme, host, port) -> None:
    """Print the effective MQTT / TDengine configuration."""
    mqtt_config = deps.config.mqtt
    tdengine_config = deps.tdengine_config
    tdengine_writer = deps.tdengine_writer

    print("MQTT mock config:")
    print(f" broker: {scheme}://{host}:{port}")
    print(f" client-id: {mqtt_config.mock_client_id}")
    print(f" pub-topic (ingest): {mqtt_config.pub_topic}")
    print(f" sub-topic (forward): {mqtt_config.sub_topic}")
    print(f" ack-topic: {mqtt_config.ack_topic}")
    print(f" data-file: {deps.data_file}")
    print(f" device-code: {deps.config.tms.device_code}")
    print(f" tdengine enabled: {tdengine_writer.enabled}")
    if tdengine_writer.enabled:
        print(f" tdengine host: {tdengine_config.base_url}")
        print(f" tdengine database: {tdengine_config.database}")
        print(f" tdengine stable: {tdengine_config.stable}")
        print(f" tdengine pool-size: {tdengine_config.pool_size}")
    print(f" mode: {args.mode}")


def _publish_loop(client, args, topic, device_code) -> None:
    """Publish synthetic payloads to ``topic`` every ``args.interval`` seconds.

    Stops after ``args.count`` messages; a count of 0 publishes forever.
    """
    seq = 0
    while True:
        seq += 1
        payload = build_server_payload(device_code)
        client.publish(topic, json.dumps(payload, ensure_ascii=True))
        print(f"TX {topic}: {payload}")
        if args.count and seq >= args.count:
            break
        time.sleep(args.interval)


def run_mock_service(args, deps) -> None:
    """Connect to the broker and run the mock in the mode given by ``args.mode``.

    * ``listen`` - subscribe to the ingest topic and block until Ctrl-C.
    * ``publish`` - publish synthetic payloads to the ingest topic.
    * ``both`` - subscribe and publish; returns once publishing finishes.

    With an empty ingest topic, ``publish``/``both`` print a notice and return.
    The client loop is stopped and the client disconnected on every exit path
    that follows a successful ``connect``.

    Args:
        args: parsed CLI arguments; ``mode``, ``interval`` and ``count`` are read.
        deps: dependency bundle; ``config.mqtt``, ``config.tms``,
            ``tdengine_config``, ``tdengine_writer`` and ``data_file`` are read.
    """
    mqtt_config = deps.config.mqtt
    tms_config = deps.config.tms
    tdengine_writer = deps.tdengine_writer

    scheme, host, port = parse_broker(mqtt_config.broker)
    _print_startup_config(args, deps, scheme, host, port)

    client = mqtt.Client(client_id=mqtt_config.mock_client_id, clean_session=True)
    if mqtt_config.username is not None:
        client.username_pw_set(mqtt_config.username, mqtt_config.password)
    if scheme in _TLS_SCHEMES:
        client.tls_set()

    def on_connect(c, userdata, flags, rc):
        if rc != 0:
            print(f"Connect failed rc={rc}")
            return
        print("Connected")
        if args.mode in ("listen", "both") and mqtt_config.pub_topic:
            c.subscribe(mqtt_config.pub_topic)
            print(f"Subscribed: {mqtt_config.pub_topic}")

    def on_disconnect(c, userdata, rc):
        print(f"Disconnected callback rc={rc}")

    def on_message(c, userdata, msg):
        payload = msg.payload.decode("utf-8", errors="replace")
        print(f"RX {msg.topic}: {payload}")
        if msg.topic != mqtt_config.pub_topic:
            return

        # Persistence is best-effort: a failing sink must neither stop the
        # message from being forwarded/acked nor escape the callback.
        if deps.data_file:
            try:
                write_message(deps.data_file, msg.topic, payload)
                print(f"Wrote to file: {deps.data_file}")
            except OSError as exc:
                print(f"Write file failed: {exc}")

        if tdengine_writer.enabled:
            try:
                tdengine_writer.write_payload(parse_payload(payload))
                print("Wrote to TDengine")
            except Exception as exc:
                print(f"Write TDengine failed: {exc}")

        if mqtt_config.sub_topic:
            c.publish(mqtt_config.sub_topic, payload)
            print(f"Forwarded to {mqtt_config.sub_topic}")

        if mqtt_config.ack_topic:
            ack_payload = build_ack_payload(msg.topic, payload)
            c.publish(mqtt_config.ack_topic, json.dumps(ack_payload, ensure_ascii=True))
            print(f"TX {mqtt_config.ack_topic}: {ack_payload}")

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    client.connect(host, port, keepalive=tms_config.keepalive)
    client.loop_start()
    try:
        if args.mode in ("publish", "both"):
            if not mqtt_config.pub_topic:
                print("pub-topic is empty; nothing to publish")
            else:
                _publish_loop(client, args, mqtt_config.pub_topic, tms_config.device_code)
        else:
            # Listen-only: keep the main thread alive while the network loop
            # thread handles traffic.
            while True:
                time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        client.loop_stop()
        client.disconnect()
        print("Disconnected")


def main() -> None:
    """Parse command-line arguments, build dependencies and run the service."""
    ap = argparse.ArgumentParser(description="MQTT mock service")
    ap.add_argument("--config", default="config.yaml", help="Path to config yaml")
    ap.add_argument("--mode", choices=["publish", "listen", "both"], default="listen")
    ap.add_argument("--interval", type=float, default=2.0, help="Publish interval (seconds)")
    ap.add_argument("--count", type=int, default=0, help="Publish count (0 = forever)")
    ap.add_argument("--data-file", default="", help="Override data-file in config")
    args = ap.parse_args()

    deps = build_mock_dependencies(args.config, data_file_override=args.data_file)
    run_mock_service(args, deps)


if __name__ == "__main__":
    main()