From d5d1cb0b7d54e4d923aefa79a8dbf8b374dd75e7 Mon Sep 17 00:00:00 2001 From: wsy182 <2392948297@qq.com> Date: Thu, 12 Mar 2026 09:58:00 +0800 Subject: [PATCH] =?UTF-8?q?feat(config):=20=E6=B7=BB=E5=8A=A0=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E7=AE=A1=E7=90=86=E5=92=8CMQTT=E6=A8=A1=E6=8B=9F?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现了应用配置的数据类结构(MqttConfig, TmsConfig, AppConfig) - 创建了配置加载和解析功能,支持从YAML文件读取配置 - 添加了TDengine数据库配置和连接池管理 - 实现了MQTT客户端依赖注入和服务构建 - 创建了钻孔实时数据的ORM映射和SQL构建功能 - 实现了TDengine Writer用于数据写入超级表 - 添加了MQTT模拟服务,支持发布、订阅和数据转发功能 - 创建了随机数据发送器用于测试 - 实现了消息持久化到本地文件功能 - 配置了数据库连接池和SQL执行功能 --- .gitignore | 12 ++ README.md | 121 +++++++++++++++++++ config.yaml | 29 +++++ config/__init__.py | 113 ++++++++++++++++++ config/dependencies.py | 41 +++++++ config_utils.py | 3 + createTable.sql | 51 +++++++++ db/__init__.py | 14 +++ db/config.py | 65 +++++++++++ db/orm.py | 122 ++++++++++++++++++++ db/pool.py | 60 ++++++++++ db/writer.py | 20 ++++ install.bat | 1 + main.py | 10 ++ mqtt_mock.py | 255 +++++++++++++++++++++++++++++++++++++++++ mqtt_sender.py | 193 +++++++++++++++++++++++++++++++ mqtt_subscriber.py | 112 ++++++++++++++++++ requirements.txt | 2 + utils/__init__.py | 0 19 files changed, 1224 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config.yaml create mode 100644 config/__init__.py create mode 100644 config/dependencies.py create mode 100644 config_utils.py create mode 100644 createTable.sql create mode 100644 db/__init__.py create mode 100644 db/config.py create mode 100644 db/orm.py create mode 100644 db/pool.py create mode 100644 db/writer.py create mode 100644 install.bat create mode 100644 main.py create mode 100644 mqtt_mock.py create mode 100644 mqtt_sender.py create mode 100644 mqtt_subscriber.py create mode 100644 requirements.txt create mode 100644 utils/__init__.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..355de5f --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +.venv +.idea + + +.pytest_cache +.tox +.coverage +.coverage.* +.pytest_cache/ +.pytest_ + +.vscode \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..09bd755 --- /dev/null +++ b/README.md @@ -0,0 +1,121 @@ +# MQTT Mock Service + +Simulates a service that: +- Subscribes to the ingest topic (server -> broker) +- Writes received messages to a local file (JSONL) +- Writes received messages to TDengine (super table) +- Forwards messages to another topic for downstream subscribers +- Optionally sends ack messages + +Also includes: +- A random data sender (every 3 seconds by default) +- A simple subscriber that prints incoming payloads + +## Setup +``` +python -m venv .venv +.\.venv\Scripts\activate +pip install -r requirements.txt +``` + +## Run (service mode) +``` +python mqtt_mock.py --config config.yaml --mode listen +``` + +When TDengine config exists in `config.yaml`, each message on `pub-topic` is inserted into: +- Super table: `drilling_realtime_st` +- Sub table: auto-generated as `drilling_realtime_` +- Tag value: fixed `device_code` from config (`GJ-304-0088`) + +## Run (also publish test data) +``` +python mqtt_mock.py --config config.yaml --mode both --interval 2 +``` + +## Run random sender (3s interval) +``` +python mqtt_sender.py --config config.yaml --interval 3 +``` + +## Run subscriber +``` +python mqtt_subscriber.py --config config.yaml +``` + +## Payload format (server -> broker) +``` +{ + "meta": { + "test_id": "550e8400e29b41d4a716446655440000", + "equipment_sn": "GJ-304-0088" + }, + "data": { + "record_time": 1751964764000, + "wellid": "", + "stknum": 0, + "recid": 0, + "seqid": 0, + "actual_date": 0, + "actual_time": 0, + "actcod": 0, + "deptbitm": 0, + "deptbitv": 0, + "deptmeas": 0, + "deptvert": 0, + "blkpos": 0, + "ropa": 0, + "hkla": 0, + "hklx": 0, + "woba": 0, + "wobx": 0, + "torqa": 0, + "torqx": 0, + "rpma": 0, + "sppa": 0, + "chkp": 0, + "spm1": 0, + "spm2": 0, + "spm3": 0, + "tvolact": 0, + "tvolcact": 0, + "mfop": 0, + "mfoa": 0, + "mfia": 0, + "mdoa": 0, + "mdia": 0, + "mtoa": 0, + "mtia": 0, + "mcoa": 0, + "mcia": 0, + "stkc": 0, + "lagstks": 0, + "deptretm": 0, + "gasa": 0, + "space1": 0, + "space2": 0, + "space3": 0, + "space4": 0, + "space5": 0 + } +} +``` + +## Config +- `pub-topic`: ingest topic from server +- `sub-topic`: forward topic for other services to subscribe +- `ack-topic`: optional ack topic +- `data-file`: local append-only file (JSON Lines) +- `equipment-sn`: default equipment_sn for simulated payloads +- `test-id`: default test_id for simulated payloads +- `tdengine.url`: e.g. `jdbc:TAOS-RS://192.168.1.87:6041/tms` +- `tdengine.username`: DB user +- `tdengine.password`: DB password +- `tdengine.database`: DB name (optional if URL already has `/tms`) +- `tdengine.stable`: default `drilling_realtime_st` + +## Options +- `--mode`: publish | listen | both +- `--interval`: publish interval seconds +- `--count`: publish count (0 = forever) +- `--data-file`: override data-file in config diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..bd2b082 --- /dev/null +++ b/config.yaml @@ -0,0 +1,29 @@ +mqtt: + broker: tcp://192.168.1.87:1883 + client-id: tms-client-dev + mock-client-id: tms-client-dev-mock + sender-client-id: tms-client-dev-sender + subscriber-client-id: tms-client-dev-subscriber + username: test + password: S55HtwFZvhf67VpS + # Server publishes here; mock service subscribes and ingests + pub-topic: rules_gj_jxjs_wtt + # Mock service forwards here for other subscribers + +tms: + device-code: GJ-304-0088 + equipment-sn: GJ-304-0088 + timeout: 10 + keepalive: 20 + server-ip: 192.168.1.41 + server-port: 9929 + +tdengine: + url: jdbc:TAOS-RS://192.168.1.87:6041/tms + username: root + password: wDvfFffdwbm5U15K + # If omitted, database is parsed from url path: /tms + database: tms + stable: drilling_realtime_st + device-code: GJ-304-0088 + diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..2084059 --- /dev/null +++ b/config/__init__.py @@ -0,0 +1,113 @@ +from dataclasses import dataclass + +import yaml + + +@dataclass(frozen=True) +class MqttConfig: + broker: str + client_id: str + mock_client_id: str + sender_client_id: str + subscriber_client_id: str + username: str | None + password: str | None + pub_topic: str | None + sub_topic: str | None + ack_topic: str | None + data_file: str + + +@dataclass(frozen=True) +class TmsConfig: + device_code: str + equipment_sn: str + timeout: int + keepalive: int + server_ip: str | None + server_port: int | None + + +@dataclass(frozen=True) +class AppConfig: + mqtt: MqttConfig + tms: TmsConfig + raw: dict + + +def load_raw_config(path): + with open(path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) or {} + + +def get_value(cfg, *paths, default=None): + for path in paths: + current = cfg + found = True + for key in path: + if not isinstance(current, dict) or key not in current: + found = False + break + current = current[key] + if found and current is not None: + return current + return default + + +def load_app_config(path): + raw = load_raw_config(path) + base_client_id = get_value(raw, ("mqtt", "client-id"), ("client-id",), default="mqtt") + device_code = get_value( + raw, + ("tms", "device-code"), + ("tms", "equipment-sn"), + ("device-code",), + ("equipment-sn",), + default="GJ-304-0088", + ) + equipment_sn = get_value( + raw, + ("tms", "equipment-sn"), + ("tms", "device-code"), + ("equipment-sn",), + ("device-code",), + default=device_code, + ) + + return AppConfig( + mqtt=MqttConfig( + broker=get_value(raw, ("mqtt", "broker"), ("broker",), default=""), + client_id=base_client_id, + mock_client_id=get_value(raw, ("mqtt", "mock-client-id"), ("mock-client-id",), default=f"{base_client_id}-mock"), + sender_client_id=get_value(raw, ("mqtt", "sender-client-id"), ("sender-client-id",), default=f"{base_client_id}-sender"), + subscriber_client_id=get_value(raw, ("mqtt", "subscriber-client-id"), ("subscriber-client-id",), default=f"{base_client_id}-subscriber"), + username=get_value(raw, ("mqtt", "username"), ("username",)), + password=get_value(raw, ("mqtt", "password"), ("password",)), + pub_topic=get_value(raw, ("mqtt", "pub-topic"), ("pub-topic",)), + sub_topic=get_value(raw, ("mqtt", "sub-topic"), ("sub-topic",)), + ack_topic=get_value(raw, ("mqtt", "ack-topic"), ("ack-topic",)), + data_file=get_value(raw, ("mqtt", "data-file"), ("data-file",), default=""), + ), + tms=TmsConfig( + device_code=device_code, + equipment_sn=equipment_sn, + timeout=int(get_value(raw, ("tms", "timeout"), ("timeout",), default=10)), + keepalive=int(get_value(raw, ("tms", "keepalive"), ("keepalive",), default=20)), + server_ip=get_value(raw, ("tms", "server-ip"), ("server-ip",)), + server_port=get_value(raw, ("tms", "server-port"), ("server-port",)), + ), + raw=raw, + ) + + +load_config = load_app_config + +__all__ = [ + "AppConfig", + "MqttConfig", + "TmsConfig", + "get_value", + "load_app_config", + "load_config", + "load_raw_config", +] diff --git a/config/dependencies.py b/config/dependencies.py new file mode 100644 index 0000000..c14735b --- /dev/null +++ b/config/dependencies.py @@ -0,0 +1,41 @@ +from dataclasses import dataclass + +from config import load_app_config +from db import TDengineWriter, load_tdengine_config + + +@dataclass(frozen=True) +class SenderDependencies: + config: object + + +@dataclass(frozen=True) +class SubscriberDependencies: + config: object + + +@dataclass(frozen=True) +class MockDependencies: + config: object + tdengine_config: object + tdengine_writer: object + data_file: str + + +def build_sender_dependencies(config_path): + return SenderDependencies(config=load_app_config(config_path)) + + +def build_subscriber_dependencies(config_path): + return SubscriberDependencies(config=load_app_config(config_path)) + + +def build_mock_dependencies(config_path, data_file_override=""): + app_config = load_app_config(config_path) + tdengine_config = load_tdengine_config(app_config, default_device_code=app_config.tms.device_code) + return MockDependencies( + config=app_config, + tdengine_config=tdengine_config, + tdengine_writer=TDengineWriter(tdengine_config), + data_file=data_file_override or app_config.mqtt.data_file, + ) diff --git a/config_utils.py b/config_utils.py new file mode 100644 index 0000000..d988cb5 --- /dev/null +++ b/config_utils.py @@ -0,0 +1,3 @@ +from config import get_value, load_config + +__all__ = ["get_value", "load_config"] diff --git a/createTable.sql b/createTable.sql new file mode 100644 index 0000000..de89186 --- /dev/null +++ b/createTable.sql @@ -0,0 +1,51 @@ +DROP TABLE IF EXISTS drilling_realtime_st; + +CREATE STABLE drilling_realtime_st ( + ts TIMESTAMP, + stknum INT, + recid INT, + seqid INT, + actual_date FLOAT, + actual_time FLOAT, + actcod INT, + deptbitm FLOAT, + deptbitv FLOAT, + deptmeas FLOAT, + deptvert FLOAT, + blkpos FLOAT, + ropa FLOAT, + hkla FLOAT, + hklx FLOAT, + woba FLOAT, + wobx FLOAT, + torqa FLOAT, + torqx FLOAT, + rpma INT, + sppa FLOAT, + chkp FLOAT, + spm1 INT, + spm2 INT, + spm3 INT, + tvolact FLOAT, + tvolcact FLOAT, + mfop INT, + mfoa FLOAT, + mfia FLOAT, + mdoa FLOAT, + mdia FLOAT, + mtoa FLOAT, + mtia FLOAT, + mcoa FLOAT, + mcia FLOAT, + stkc INT, + lagstks INT, + deptretm FLOAT, + gasa FLOAT, + space1 FLOAT, + space2 FLOAT, + space3 FLOAT, + space4 FLOAT, + space5 FLOAT +) TAGS ( + equipment_code VARCHAR(50) +); \ No newline at end of file diff --git a/db/__init__.py b/db/__init__.py new file mode 100644 index 0000000..4f74d45 --- /dev/null +++ b/db/__init__.py @@ -0,0 +1,14 @@ +from db.config import TDengineConfig, load_tdengine_config, parse_taos_url +from db.orm import DrillingRealtimeORM +from db.pool import TaosConnectionPool, create_taos_pool +from db.writer import TDengineWriter + +__all__ = [ + "DrillingRealtimeORM", + "TDengineConfig", + "TDengineWriter", + "TaosConnectionPool", + "create_taos_pool", + "load_tdengine_config", + "parse_taos_url", +] diff --git a/db/config.py b/db/config.py new file mode 100644 index 0000000..6750b33 --- /dev/null +++ b/db/config.py @@ -0,0 +1,65 @@ +from dataclasses import dataclass +from urllib.parse import urlparse + +from config import get_value + + +@dataclass(frozen=True) +class TDengineConfig: + url: str = "" + username: str = "" + password: str = "" + database: str = "" + stable: str = "drilling_realtime_st" + device_code: str = "GJ-304-0088" + pool_size: int = 2 + timeout: int = 10 + + @property + def enabled(self): + return bool(self.base_url and self.database and self.username) + + @property + def base_url(self): + base_url, _ = parse_taos_url(self.url) + return base_url + + +def parse_taos_url(jdbc_url): + if not jdbc_url: + return "", "" + raw = str(jdbc_url).strip() + if raw.lower().startswith("jdbc:taos-rs://"): + raw = "http://" + raw[len("jdbc:TAOS-RS://") :] + elif "://" not in raw: + raw = "http://" + raw + parsed = urlparse(raw) + base_url = f"{parsed.scheme or 'http'}://{parsed.hostname or '127.0.0.1'}:{parsed.port or 6041}" + database = (parsed.path or "").strip("/") + return base_url, database + + +def _resolve_raw_config(cfg_or_app): + raw = getattr(cfg_or_app, "raw", None) + return raw if isinstance(raw, dict) else cfg_or_app + + +def load_tdengine_config(cfg_or_app, default_device_code="GJ-304-0088"): + cfg = _resolve_raw_config(cfg_or_app) + url = get_value(cfg, ("tdengine", "url"), ("tdengine-url",), default="") + _, database_from_url = parse_taos_url(url) + return TDengineConfig( + url=url, + username=get_value(cfg, ("tdengine", "username"), ("tdengine-username",), default=""), + password=get_value(cfg, ("tdengine", "password"), ("tdengine-password",), default=""), + database=get_value(cfg, ("tdengine", "database"), ("tdengine-database",), default=database_from_url), + stable=get_value(cfg, ("tdengine", "stable"), ("tdengine-stable",), default="drilling_realtime_st"), + device_code=get_value( + cfg, + ("tdengine", "device-code"), + ("tdengine", "equipment-sn"), + default=default_device_code, + ), + pool_size=int(get_value(cfg, ("tdengine", "pool-size"), ("tdengine-pool-size",), default=2)), + timeout=int(get_value(cfg, ("tdengine", "timeout"), ("tdengine-timeout",), default=10)), + ) diff --git a/db/orm.py b/db/orm.py new file mode 100644 index 0000000..7810fbc --- /dev/null +++ b/db/orm.py @@ -0,0 +1,122 @@ +import re +import time + + +DB_COLUMNS = [ + "ts", + "stknum", + "recid", + "seqid", + "actual_date", + "actual_time", + "actcod", + "deptbitm", + "deptbitv", + "deptmeas", + "deptvert", + "blkpos", + "ropa", + "hkla", + "hklx", + "woba", + "wobx", + "torqa", + "torqx", + "rpma", + "sppa", + "chkp", + "spm1", + "spm2", + "spm3", + "tvolact", + "tvolcact", + "mfop", + "mfoa", + "mfia", + "mdoa", + "mdia", + "mtoa", + "mtia", + "mcoa", + "mcia", + "stkc", + "lagstks", + "deptretm", + "gasa", + "space1", + "space2", + "space3", + "space4", + "space5", +] + + +INT_COLUMNS = {"stknum", "recid", "seqid", "actcod", "rpma", "spm1", "spm2", "spm3", "mfop", "stkc", "lagstks"} + + +def sanitize_identifier(value, fallback): + cleaned = re.sub(r"[^A-Za-z0-9_]", "_", str(value or "")) + if not cleaned: + cleaned = fallback + if cleaned[0].isdigit(): + cleaned = f"t_{cleaned}" + return cleaned.lower() + + +def sql_quote(value): + return "'" + str(value).replace("'", "''") + "'" + + +def to_int(value, default=0): + try: + return int(value) + except Exception: + return default + + +def to_float(value, default=0.0): + try: + return float(value) + except Exception: + return default + + +class DrillingRealtimeORM: + def __init__(self, database, stable="drilling_realtime_st", default_device_code="GJ-304-0088"): + self.database = database + self.stable = stable + self.default_device_code = default_device_code or "GJ-304-0088" + + def build_insert_sql(self, payload): + if not isinstance(payload, dict): + raise ValueError("payload is not JSON object") + meta = payload.get("meta") if isinstance(payload.get("meta"), dict) else {} + data = payload.get("data") if isinstance(payload.get("data"), dict) else {} + + equipment_code = ( + str(self.default_device_code).strip() + or str(meta.get("equipment_code", "")).strip() + or str(meta.get("equipment_sn", "")).strip() + or "GJ-304-0088" + ) + table_name = sanitize_identifier( + f"drilling_realtime_{equipment_code}", + "drilling_realtime_default", + ) + + values = [] + for col in DB_COLUMNS: + if col == "ts": + raw = data.get("ts", data.get("record_time", int(time.time() * 1000))) + values.append(str(to_int(raw, int(time.time() * 1000)))) + elif col in INT_COLUMNS: + values.append(str(to_int(data.get(col, 0), 0))) + else: + values.append(str(to_float(data.get(col, 0), 0.0))) + + columns_sql = ", ".join([f"`{column}`" for column in DB_COLUMNS]) + values_sql = ", ".join(values) + return ( + f"INSERT INTO `{self.database}`.`{table_name}` USING `{self.database}`.`{self.stable}` " + f"TAGS ({sql_quote(equipment_code)}) ({columns_sql}) VALUES ({values_sql})" + ) diff --git a/db/pool.py b/db/pool.py new file mode 100644 index 0000000..e021145 --- /dev/null +++ b/db/pool.py @@ -0,0 +1,60 @@ +import base64 +from queue import LifoQueue +from urllib.request import Request, urlopen + + +class TaosConnection: + def __init__(self, base_url, username, password, timeout=10): + self.base_url = base_url + self.username = username or "" + self.password = password or "" + self.timeout = timeout + + def execute(self, sql): + auth = f"{self.username}:{self.password}" + auth_header = base64.b64encode(auth.encode("utf-8")).decode("ascii") + req = Request( + url=f"{self.base_url}/rest/sql", + data=sql.encode("utf-8"), + headers={ + "Authorization": f"Basic {auth_header}", + "Content-Type": "text/plain", + }, + method="POST", + ) + with urlopen(req, timeout=self.timeout) as resp: + body = resp.read().decode("utf-8", errors="replace") + if resp.status != 200: + raise RuntimeError(f"HTTP {resp.status} {body}") + return body + + +class TaosConnectionPool: + def __init__(self, base_url, username, password, pool_size=2, timeout=10): + self.base_url = base_url + self.username = username + self.password = password + self.pool_size = max(int(pool_size or 1), 1) + self.timeout = timeout + self._pool = LifoQueue(maxsize=self.pool_size) + for _ in range(self.pool_size): + self._pool.put(TaosConnection(base_url, username, password, timeout=timeout)) + + def execute(self, sql): + conn = self._pool.get() + try: + return conn.execute(sql) + finally: + self._pool.put(conn) + + +def create_taos_pool(config): + if not config.enabled: + return None + return TaosConnectionPool( + config.base_url, + config.username, + config.password, + pool_size=config.pool_size, + timeout=config.timeout, + ) diff --git a/db/writer.py b/db/writer.py new file mode 100644 index 0000000..7d72518 --- /dev/null +++ b/db/writer.py @@ -0,0 +1,20 @@ +from db.orm import DrillingRealtimeORM +from db.pool import create_taos_pool + + +class TDengineWriter: + def __init__(self, config, pool=None, orm=None): + self.config = config + self.pool = pool if pool is not None else create_taos_pool(config) + self.orm = orm if orm is not None else DrillingRealtimeORM( + config.database, + stable=config.stable, + default_device_code=config.device_code, + ) + self.enabled = bool(config.enabled and self.pool) + + def write_payload(self, payload): + if not self.enabled: + return None + sql = self.orm.build_insert_sql(payload) + return self.pool.execute(sql) diff --git a/install.bat b/install.bat new file mode 100644 index 0000000..d2a62d3 --- /dev/null +++ b/install.bat @@ -0,0 +1 @@ +pip install pyyaml paho-mqtt \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..9620e74 --- /dev/null +++ b/main.py @@ -0,0 +1,10 @@ +import logging + + +def main(): + logging.info("start app....") + + + +if __name__ == '__main__': + main() diff --git a/mqtt_mock.py b/mqtt_mock.py new file mode 100644 index 0000000..cc9e2e1 --- /dev/null +++ b/mqtt_mock.py @@ -0,0 +1,255 @@ +import argparse +import json +import os +import time +from datetime import datetime +from urllib.parse import urlparse + +import paho.mqtt.client as mqtt + +from config.dependencies import build_mock_dependencies + + +DATA_KEYS = [ + "ts", + "wellid", + "stknum", + "recid", + "seqid", + "actual_date", + "actual_time", + "actcod", + "deptbitm", + "deptbitv", + "deptmeas", + "deptvert", + "blkpos", + "ropa", + "hkla", + "hklx", + "woba", + "wobx", + "torqa", + "torqx", + "rpma", + "sppa", + "chkp", + "spm1", + "spm2", + "spm3", + "tvolact", + "tvolcact", + "mfop", + "mfoa", + "mfia", + "mdoa", + "mdia", + "mtoa", + "mtia", + "mcoa", + "mcia", + "stkc", + "lagstks", + "deptretm", + "gasa", + "space1", + "space2", + "space3", + "space4", + "space5", +] + + +def parse_broker(broker): + if not broker: + raise ValueError("broker is required") + if "://" not in broker: + broker = "tcp://" + broker + parsed = urlparse(broker) + host = parsed.hostname or "localhost" + port = parsed.port or 1883 + scheme = (parsed.scheme or "tcp").lower() + return scheme, host, port + + +def ensure_dir(path): + if not path: + return + dir_path = os.path.dirname(path) + if dir_path: + os.makedirs(dir_path, exist_ok=True) + + +def build_server_payload(equipment_code): + data = {key: 0 for key in DATA_KEYS} + data["ts"] = int(time.time() * 1000) + data["wellid"] = "" + return { + "meta": { + "equipment_code": equipment_code, + "equipment_sn": equipment_code, + }, + "data": data, + } + + +def parse_payload(raw_payload): + try: + return json.loads(raw_payload) + except Exception: + return raw_payload + + +def build_ack_payload(topic, raw_payload): + payload = { + "ack": True, + "topic": topic, + "ts": datetime.utcnow().isoformat(timespec="seconds") + "Z", + } + try: + obj = json.loads(raw_payload) + if isinstance(obj, dict): + meta = obj.get("meta") if isinstance(obj.get("meta"), dict) else {} + if "equipment_code" in meta: + payload["equipment_code"] = meta["equipment_code"] + if "equipment_sn" in meta: + payload["equipment_sn"] = meta["equipment_sn"] + if "id" in obj: + payload["id"] = obj["id"] + payload["src"] = obj + except Exception: + payload["raw"] = raw_payload + return payload + + +def write_message(path, topic, raw_payload): + ensure_dir(path) + record = { + "ts": datetime.utcnow().isoformat(timespec="seconds") + "Z", + "topic": topic, + "payload": parse_payload(raw_payload), + } + payload_obj = record["payload"] + if isinstance(payload_obj, dict): + data = payload_obj.get("data") + if isinstance(data, dict) and isinstance(data.get("ts"), (int, float)): + record["ts_iso"] = datetime.utcfromtimestamp(data["ts"] / 1000.0).isoformat(timespec="seconds") + "Z" + with open(path, "a", encoding="utf-8") as f: + f.write(json.dumps(record, ensure_ascii=True)) + f.write("\n") + + +def run_mock_service(args, deps): + mqtt_config = deps.config.mqtt + tms_config = deps.config.tms + tdengine_config = deps.tdengine_config + tdengine_writer = deps.tdengine_writer + scheme, host, port = parse_broker(mqtt_config.broker) + + print("MQTT mock config:") + print(f" broker: {scheme}://{host}:{port}") + print(f" client-id: {mqtt_config.mock_client_id}") + print(f" pub-topic (ingest): {mqtt_config.pub_topic}") + print(f" sub-topic (forward): {mqtt_config.sub_topic}") + print(f" ack-topic: {mqtt_config.ack_topic}") + print(f" data-file: {deps.data_file}") + print(f" device-code: {tms_config.device_code}") + print(f" tdengine enabled: {tdengine_writer.enabled}") + if tdengine_writer.enabled: + print(f" tdengine host: {tdengine_config.base_url}") + print(f" tdengine database: {tdengine_config.database}") + print(f" tdengine stable: {tdengine_config.stable}") + print(f" tdengine pool-size: {tdengine_config.pool_size}") + print(f" mode: {args.mode}") + + client = mqtt.Client(client_id=mqtt_config.mock_client_id, clean_session=True) + if mqtt_config.username is not None: + client.username_pw_set(mqtt_config.username, mqtt_config.password) + + if scheme in ("ssl", "tls", "mqtts"): + client.tls_set() + + def on_connect(c, userdata, flags, rc): + if rc == 0: + print("Connected") + else: + print(f"Connect failed rc={rc}") + return + if args.mode in ("listen", "both") and mqtt_config.pub_topic: + c.subscribe(mqtt_config.pub_topic) + print(f"Subscribed: {mqtt_config.pub_topic}") + + def on_disconnect(c, userdata, rc): + print(f"Disconnected callback rc={rc}") + + def on_message(c, userdata, msg): + payload = msg.payload.decode("utf-8", errors="replace") + print(f"RX {msg.topic}: {payload}") + if msg.topic != mqtt_config.pub_topic: + return + + if deps.data_file: + write_message(deps.data_file, msg.topic, payload) + print(f"Wrote to file: {deps.data_file}") + + if tdengine_writer.enabled: + try: + tdengine_writer.write_payload(parse_payload(payload)) + print("Wrote to TDengine") + except Exception as exc: + print(f"Write TDengine failed: {exc}") + + if mqtt_config.sub_topic: + c.publish(mqtt_config.sub_topic, payload) + print(f"Forwarded to {mqtt_config.sub_topic}") + + if mqtt_config.ack_topic: + ack_payload = build_ack_payload(msg.topic, payload) + c.publish(mqtt_config.ack_topic, json.dumps(ack_payload, ensure_ascii=True)) + print(f"TX {mqtt_config.ack_topic}: {ack_payload}") + + client.on_connect = on_connect + client.on_disconnect = on_disconnect + client.on_message = on_message + client.connect(host, port, keepalive=tms_config.keepalive) + client.loop_start() + + try: + if args.mode in ("publish", "both"): + if not mqtt_config.pub_topic: + print("pub-topic is empty; nothing to publish") + else: + seq = 0 + while True: + seq += 1 + payload = build_server_payload(tms_config.device_code) + client.publish(mqtt_config.pub_topic, json.dumps(payload, ensure_ascii=True)) + print(f"TX {mqtt_config.pub_topic}: {payload}") + if args.count and seq >= args.count: + break + time.sleep(args.interval) + else: + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + finally: + client.loop_stop() + client.disconnect() + print("Disconnected") + + +def main(): + ap = argparse.ArgumentParser(description="MQTT mock service") + ap.add_argument("--config", default="config.yaml", help="Path to config yaml") + ap.add_argument("--mode", choices=["publish", "listen", "both"], default="listen") + ap.add_argument("--interval", type=float, default=2.0, help="Publish interval (seconds)") + ap.add_argument("--count", type=int, default=0, help="Publish count (0 = forever)") + ap.add_argument("--data-file", default="", help="Override data-file in config") + args = ap.parse_args() + deps = build_mock_dependencies(args.config, data_file_override=args.data_file) + run_mock_service(args, deps) + + +if __name__ == "__main__": + main() diff --git a/mqtt_sender.py b/mqtt_sender.py new file mode 100644 index 0000000..51e31e8 --- /dev/null +++ b/mqtt_sender.py @@ -0,0 +1,193 @@ +import argparse +import json +import random +import time +from datetime import datetime +from urllib.parse import urlparse + +import paho.mqtt.client as mqtt + +from config.dependencies import build_sender_dependencies + + +DATA_KEYS = [ + "ts", + "wellid", + "stknum", + "recid", + "seqid", + "actual_date", + "actual_time", + "actcod", + "deptbitm", + "deptbitv", + "deptmeas", + "deptvert", + "blkpos", + "ropa", + "hkla", + "hklx", + "woba", + "wobx", + "torqa", + "torqx", + "rpma", + "sppa", + "chkp", + "spm1", + "spm2", + "spm3", + "tvolact", + "tvolcact", + "mfop", + "mfoa", + "mfia", + "mdoa", + "mdia", + "mtoa", + "mtia", + "mcoa", + "mcia", + "stkc", + "lagstks", + "deptretm", + "gasa", + "space1", + "space2", + "space3", + "space4", + "space5", +] + + +def parse_broker(broker): + if not broker: + raise ValueError("broker is required") + if "://" not in broker: + broker = "tcp://" + broker + parsed = urlparse(broker) + host = parsed.hostname or "localhost" + port = parsed.port or 1883 + scheme = (parsed.scheme or "tcp").lower() + return scheme, host, port + + +def rand_int(a, b): + return random.randint(a, b) + + +def build_random_payload(equipment_code): + data = {key: 0 for key in DATA_KEYS} + data["ts"] = int(time.time() * 1000) + data["wellid"] = random.choice(["", f"WELL-{rand_int(1, 9999):04d}"]) + data["stknum"] = rand_int(0, 500) + data["recid"] = rand_int(0, 100000) + data["seqid"] = rand_int(0, 100000) + data["actual_date"] = int(datetime.utcnow().strftime("%Y%m%d")) + data["actual_time"] = int(datetime.utcnow().strftime("%H%M%S")) + data["actcod"] = rand_int(0, 9) + data["deptbitm"] = rand_int(0, 5000) + data["deptbitv"] = rand_int(0, 5000) + data["deptmeas"] = rand_int(0, 5000) + data["deptvert"] = rand_int(0, 5000) + data["blkpos"] = rand_int(0, 100) + data["ropa"] = rand_int(0, 200) + data["hkla"] = rand_int(0, 500) + data["hklx"] = rand_int(0, 500) + data["woba"] = rand_int(0, 200) + data["wobx"] = rand_int(0, 200) + data["torqa"] = rand_int(0, 200) + data["torqx"] = rand_int(0, 200) + data["rpma"] = rand_int(0, 300) + data["sppa"] = rand_int(0, 5000) + data["chkp"] = rand_int(0, 5000) + data["spm1"] = rand_int(0, 200) + data["spm2"] = rand_int(0, 200) + data["spm3"] = rand_int(0, 200) + data["tvolact"] = rand_int(0, 20000) + data["tvolcact"] = rand_int(0, 20000) + data["mfop"] = rand_int(0, 1000) + data["mfoa"] = rand_int(0, 1000) + data["mfia"] = rand_int(0, 1000) + data["mdoa"] = rand_int(0, 1000) + data["mdia"] = rand_int(0, 1000) + data["mtoa"] = rand_int(0, 1000) + data["mtia"] = rand_int(0, 1000) + data["mcoa"] = rand_int(0, 1000) + data["mcia"] = rand_int(0, 1000) + data["stkc"] = rand_int(0, 200) + data["lagstks"] = rand_int(0, 200) + data["deptretm"] = rand_int(0, 5000) + data["gasa"] = rand_int(0, 100) + data["space1"] = rand_int(0, 10) + data["space2"] = rand_int(0, 10) + data["space3"] = rand_int(0, 10) + data["space4"] = rand_int(0, 10) + data["space5"] = rand_int(0, 10) + return { + "meta": { + "equipment_code": equipment_code, + "equipment_sn": equipment_code, + }, + "data": data, + } + + +def run_sender(args, deps): + mqtt_config = deps.config.mqtt + tms_config = deps.config.tms + scheme, host, port = parse_broker(mqtt_config.broker) + + print("MQTT sender config:") + print(f" broker: {scheme}://{host}:{port}") + print(f" client-id: {mqtt_config.sender_client_id}") + print(f" pub-topic: {mqtt_config.pub_topic}") + print(f" interval: {args.interval}s") + + client = mqtt.Client(client_id=mqtt_config.sender_client_id, clean_session=True) + if mqtt_config.username is not None: + client.username_pw_set(mqtt_config.username, mqtt_config.password) + + if scheme in ("ssl", "tls", "mqtts"): + client.tls_set() + + def on_disconnect(c, userdata, rc): + print(f"Disconnected callback rc={rc}") + + client.on_disconnect = on_disconnect + client.connect(host, port, keepalive=tms_config.keepalive) + client.loop_start() + + try: + if not mqtt_config.pub_topic: + print("pub-topic is empty; nothing to publish") + return + seq = 0 + while True: + seq += 1 + payload = build_random_payload(tms_config.device_code) + client.publish(mqtt_config.pub_topic, json.dumps(payload, ensure_ascii=True)) + print(f"TX {mqtt_config.pub_topic}: {payload}") + if args.count and seq >= args.count: + break + time.sleep(args.interval) + except KeyboardInterrupt: + pass + finally: + client.loop_stop() + client.disconnect() + print("Disconnected") + + +def main(): + ap = argparse.ArgumentParser(description="MQTT random data sender") + ap.add_argument("--config", default="config.yaml", help="Path to config yaml") + ap.add_argument("--interval", type=float, default=3.0, help="Publish interval (seconds)") + ap.add_argument("--count", type=int, default=0, help="Publish count (0 = forever)") + args = ap.parse_args() + deps = build_sender_dependencies(args.config) + run_sender(args, deps) + + +if __name__ == "__main__": + main() diff --git a/mqtt_subscriber.py b/mqtt_subscriber.py new file mode 100644 index 0000000..699e126 --- /dev/null +++ b/mqtt_subscriber.py @@ -0,0 +1,112 @@ +import argparse +import json +import time +from datetime import datetime +from urllib.parse import urlparse + +import paho.mqtt.client as mqtt + +from config.dependencies import build_subscriber_dependencies + + +def parse_broker(broker): + if not broker: + raise ValueError("broker is required") + if "://" not in broker: + broker = "tcp://" + broker + parsed = urlparse(broker) + host = parsed.hostname or "localhost" + port = parsed.port or 1883 + scheme = (parsed.scheme or "tcp").lower() + return scheme, host, port + + +def print_flat_fields(title, value): + print(title) + if isinstance(value, dict): + for k, v in value.items(): + print(f" {k}: {v}") + else: + print(f" {value}") + + +def run_subscriber(args, deps): + mqtt_config = deps.config.mqtt + tms_config = deps.config.tms + topic = args.topic or mqtt_config.sub_topic or mqtt_config.pub_topic + if not topic: + raise ValueError("No topic to subscribe. Set sub-topic or pub-topic, or pass --topic") + + scheme, host, port = parse_broker(mqtt_config.broker) + + print("MQTT subscriber config:") + print(f" broker: {scheme}://{host}:{port}") + print(f" client-id: {mqtt_config.subscriber_client_id}") + print(f" topic: {topic}") + + client = mqtt.Client(client_id=mqtt_config.subscriber_client_id, clean_session=True) + if mqtt_config.username is not None: + client.username_pw_set(mqtt_config.username, mqtt_config.password) + + if scheme in ("ssl", "tls", "mqtts"): + client.tls_set() + + def on_connect(c, userdata, flags, rc): + if rc == 0: + print("Connected") + c.subscribe(topic) + print(f"Subscribed: {topic}") + else: + print(f"Connect failed rc={rc}") + + def on_disconnect(c, userdata, rc): + print(f"Disconnected callback rc={rc}") + + def on_message(c, userdata, msg): + raw_payload = msg.payload.decode("utf-8", errors="replace") + print("=" * 80) + print(f"RX Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"Topic: {msg.topic}") + print(f"QoS: {msg.qos}, Retain: {msg.retain}, Bytes: {len(msg.payload)}") + try: + obj = json.loads(raw_payload) + print("Raw JSON:") + print(json.dumps(obj, ensure_ascii=False, indent=2)) + if isinstance(obj, dict): + print_flat_fields("meta:", obj.get("meta")) + print_flat_fields("data:", obj.get("data")) + extra_keys = [key for key in obj.keys() if key not in ("meta", "data")] + for key in extra_keys: + print_flat_fields(f"{key}:", obj.get(key)) + except Exception: + print("Raw payload:") + print(raw_payload) + + client.on_connect = on_connect + client.on_disconnect = on_disconnect + client.on_message = on_message + client.connect(host, port, keepalive=tms_config.keepalive) + client.loop_start() + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + finally: + client.loop_stop() + client.disconnect() + print("Disconnected") + + +def main(): + ap = argparse.ArgumentParser(description="MQTT subscriber") + ap.add_argument("--config", default="config.yaml", help="Path to config yaml") + ap.add_argument("--topic", default="", help="Override topic to subscribe") + args = ap.parse_args() + deps = build_subscriber_dependencies(args.config) + run_subscriber(args, deps) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ce7a79e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +paho-mqtt>=1.6.0 +PyYAML>=6.0 diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29