From 45870a2f73f564e23c763dd77c164ae5f168de57 Mon Sep 17 00:00:00 2001 From: wsy182 <2392948297@qq.com> Date: Thu, 12 Mar 2026 10:20:35 +0800 Subject: [PATCH] =?UTF-8?q?feat(wits):=20=E6=B7=BB=E5=8A=A0WITS=20TCP?= =?UTF-8?q?=E5=8F=91=E9=80=81=E5=8A=9F=E8=83=BD=E5=92=8C=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增WitsConfig数据类用于WITS配置管理 - 在AppConfig中集成wits配置选项 - 重命名dependencies.py为config.py并重构配置加载逻辑 - 移除db/config.py文件中的TDengine配置相关代码 - 创建新的model.py文件定义MqttConfig、TmsConfig和TdengineConfig模型 - 更新MQTT模块导入路径从config.dependencies到config.config - 添加WITS发送器脚本wits_sender.py实现TCP数据包发送 - 在README.md中添加WITS发送器使用说明和配置选项 - 添加WITS样本数据文件data/wits_sample.txt - 添加requirements.md文档说明项目需求 --- README.md | 47 +++-- config/__init__.py | 16 ++ config/config.py | 18 ++ config/dependencies.py | 41 ---- config/model.py | 40 ++++ config_utils.py | 3 - data/wits_sample.txt | 47 +++++ db/config.py | 65 ------ mqtt_mock.py | 2 +- mqtt_sender.py | 2 +- mqtt_subscriber.py | 2 +- requirements.md | 21 ++ createTable.sql => sql/createTable.sql | 0 wits_sender.py | 268 +++++++++++++++++++++++++ 14 files changed, 444 insertions(+), 128 deletions(-) create mode 100644 config/config.py delete mode 100644 config/dependencies.py create mode 100644 config/model.py delete mode 100644 config_utils.py create mode 100644 data/wits_sample.txt delete mode 100644 db/config.py create mode 100644 requirements.md rename createTable.sql => sql/createTable.sql (100%) create mode 100644 wits_sender.py diff --git a/README.md b/README.md index 09bd755..57d76be 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# MQTT Mock Service +# MQTT Mock Service Simulates a service that: - Subscribes to the ingest topic (server -> broker) @@ -10,6 +10,7 @@ Simulates a service that: Also includes: - A random data sender (every 3 seconds by default) - A simple subscriber that prints incoming payloads +- A WITS TCP sender for the Java TCP receiver ## Setup ``` @@ -23,11 +24,6 @@ pip install -r requirements.txt python mqtt_mock.py --config config.yaml --mode listen ``` -When TDengine config exists in `config.yaml`, each message on `pub-topic` is inserted into: -- Super table: `drilling_realtime_st` -- Sub table: auto-generated as `drilling_realtime_` -- Tag value: fixed `device_code` from config (`GJ-304-0088`) - ## Run (also publish test data) ``` python mqtt_mock.py --config config.yaml --mode both --interval 2 @@ -43,6 +39,22 @@ python mqtt_sender.py --config config.yaml --interval 3 python mqtt_subscriber.py --config config.yaml ``` +## Run WITS sender +Send one generated WITS packet to `wits.host/wits.port` or fallback `tms.server-ip/tms.server-port`: +``` +python wits_sender.py --config config.yaml +``` + +Send packets continuously every 2 seconds: +``` +python wits_sender.py --config config.yaml --interval 2 --count 0 +``` + +Send a raw packet from file: +``` +python wits_sender.py --config config.yaml --source-file data/wits_sample.txt +``` + ## Payload format (server -> broker) ``` { @@ -102,12 +114,15 @@ python mqtt_subscriber.py --config config.yaml ``` ## Config -- `pub-topic`: ingest topic from server -- `sub-topic`: forward topic for other services to subscribe -- `ack-topic`: optional ack topic -- `data-file`: local append-only file (JSON Lines) -- `equipment-sn`: default equipment_sn for simulated payloads -- `test-id`: default test_id for simulated payloads +- `mqtt.*`: MQTT broker and topic settings +- `tms.device-code`: default device code for generated data +- `tms.server-ip`: WITS target host fallback +- `tms.server-port`: WITS target port fallback +- `tms.timeout`: WITS socket timeout fallback +- `wits.host`: optional explicit WITS target host +- `wits.port`: optional explicit WITS target port +- `wits.timeout`: optional explicit WITS socket timeout +- `wits.source-file`: optional default WITS packet file path - `tdengine.url`: e.g. `jdbc:TAOS-RS://192.168.1.87:6041/tms` - `tdengine.username`: DB user - `tdengine.password`: DB password @@ -115,7 +130,7 @@ python mqtt_subscriber.py --config config.yaml - `tdengine.stable`: default `drilling_realtime_st` ## Options -- `--mode`: publish | listen | both -- `--interval`: publish interval seconds -- `--count`: publish count (0 = forever) -- `--data-file`: override data-file in config +- `mqtt_mock.py`: `--mode` `--interval` `--count` `--data-file` +- `mqtt_sender.py`: `--interval` `--count` +- `mqtt_subscriber.py`: `--topic` +- `wits_sender.py`: `--host` `--port` `--timeout` `--source-file` `--interval` `--count` diff --git a/config/__init__.py b/config/__init__.py index 2084059..8481632 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -28,10 +28,19 @@ class TmsConfig: server_port: int | None +@dataclass(frozen=True) +class WitsConfig: + host: str + port: int + timeout: int + source_file: str + + @dataclass(frozen=True) class AppConfig: mqtt: MqttConfig tms: TmsConfig + wits: WitsConfig raw: dict @@ -96,6 +105,12 @@ def load_app_config(path): server_ip=get_value(raw, ("tms", "server-ip"), ("server-ip",)), server_port=get_value(raw, ("tms", "server-port"), ("server-port",)), ), + wits=WitsConfig( + host=get_value(raw, ("wits", "host"), ("tms", "server-ip"), ("server-ip",), default=""), + port=int(get_value(raw, ("wits", "port"), ("tms", "server-port"), ("server-port",), default=0)), + timeout=int(get_value(raw, ("wits", "timeout"), ("tms", "timeout"), ("timeout",), default=10)), + source_file=get_value(raw, ("wits", "source-file"), default=""), + ), raw=raw, ) @@ -106,6 +121,7 @@ __all__ = [ "AppConfig", "MqttConfig", "TmsConfig", + "WitsConfig", "get_value", "load_app_config", "load_config", diff --git a/config/config.py b/config/config.py new file mode 100644 index 0000000..b588729 --- /dev/null +++ b/config/config.py @@ -0,0 +1,18 @@ +import yaml + +from config.model import * + + +def load(path: str) -> "Config": + with open(path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + mqtt_cfg = MqttConfig(**data["mqtt"]) + tms_cfg = TmsConfig(**data["tms"]) + tdengine_cfg = TdengineConfig(**data["tdengine"]) + + return Config( + mqtt=mqtt_cfg, + tms=tms_cfg, + tdengine=tdengine_cfg + ) diff --git a/config/dependencies.py b/config/dependencies.py deleted file mode 100644 index c14735b..0000000 --- a/config/dependencies.py +++ /dev/null @@ -1,41 +0,0 @@ -from dataclasses import dataclass - -from config import load_app_config -from db import TDengineWriter, load_tdengine_config - - -@dataclass(frozen=True) -class SenderDependencies: - config: object - - -@dataclass(frozen=True) -class SubscriberDependencies: - config: object - - -@dataclass(frozen=True) -class MockDependencies: - config: object - tdengine_config: object - tdengine_writer: object - data_file: str - - -def build_sender_dependencies(config_path): - return SenderDependencies(config=load_app_config(config_path)) - - -def build_subscriber_dependencies(config_path): - return SubscriberDependencies(config=load_app_config(config_path)) - - -def build_mock_dependencies(config_path, data_file_override=""): - app_config = load_app_config(config_path) - tdengine_config = load_tdengine_config(app_config, default_device_code=app_config.tms.device_code) - return MockDependencies( - config=app_config, - tdengine_config=tdengine_config, - tdengine_writer=TDengineWriter(tdengine_config), - data_file=data_file_override or app_config.mqtt.data_file, - ) diff --git a/config/model.py b/config/model.py new file mode 100644 index 0000000..2a67620 --- /dev/null +++ b/config/model.py @@ -0,0 +1,40 @@ +from dataclasses import dataclass + + +@dataclass +class MqttConfig: + broker: str + client_id: str + mock_client_id: str + sender_client_id: str + subscriber_client_id: str + username: str + password: str + pub_topic: str + + +@dataclass +class TmsConfig: + device_code: str + equipment_sn: str + timeout: int + keepalive: int + server_ip: str + server_port: int + + +@dataclass +class TdengineConfig: + url: str + username: str + password: str + database: str + stable: str + device_code: str + + +@dataclass +class Config: + mqtt: MqttConfig + tms: TmsConfig + tdengine: TdengineConfig diff --git a/config_utils.py b/config_utils.py deleted file mode 100644 index d988cb5..0000000 --- a/config_utils.py +++ /dev/null @@ -1,3 +0,0 @@ -from config import get_value, load_config - -__all__ = ["get_value", "load_config"] diff --git a/data/wits_sample.txt b/data/wits_sample.txt new file mode 100644 index 0000000..d7985df --- /dev/null +++ b/data/wits_sample.txt @@ -0,0 +1,47 @@ +&& +01GJ-304-0088 +020 +030 +041 +0520250312.00 +6120000.00 +70 +80.00 +90.00 +100.00 +110.00 +120.00 +130.00 +140.00 +150.00 +160.00 +170.00 +180.00 +190.00 +200 +210.00 +220.00 +230 +240 +250 +260.00 +270.00 +280 +290.00 +300.00 +310.00 +320.00 +330.00 +340.00 +350.00 +360.00 +370 +380 +390.00 +400.00 +410.00 +420.00 +430.00 +440.00 +450.00 +!! diff --git a/db/config.py b/db/config.py deleted file mode 100644 index 6750b33..0000000 --- a/db/config.py +++ /dev/null @@ -1,65 +0,0 @@ -from dataclasses import dataclass -from urllib.parse import urlparse - -from config import get_value - - -@dataclass(frozen=True) -class TDengineConfig: - url: str = "" - username: str = "" - password: str = "" - database: str = "" - stable: str = "drilling_realtime_st" - device_code: str = "GJ-304-0088" - pool_size: int = 2 - timeout: int = 10 - - @property - def enabled(self): - return bool(self.base_url and self.database and self.username) - - @property - def base_url(self): - base_url, _ = parse_taos_url(self.url) - return base_url - - -def parse_taos_url(jdbc_url): - if not jdbc_url: - return "", "" - raw = str(jdbc_url).strip() - if raw.lower().startswith("jdbc:taos-rs://"): - raw = "http://" + raw[len("jdbc:TAOS-RS://") :] - elif "://" not in raw: - raw = "http://" + raw - parsed = urlparse(raw) - base_url = f"{parsed.scheme or 'http'}://{parsed.hostname or '127.0.0.1'}:{parsed.port or 6041}" - database = (parsed.path or "").strip("/") - return base_url, database - - -def _resolve_raw_config(cfg_or_app): - raw = getattr(cfg_or_app, "raw", None) - return raw if isinstance(raw, dict) else cfg_or_app - - -def load_tdengine_config(cfg_or_app, default_device_code="GJ-304-0088"): - cfg = _resolve_raw_config(cfg_or_app) - url = get_value(cfg, ("tdengine", "url"), ("tdengine-url",), default="") - _, database_from_url = parse_taos_url(url) - return TDengineConfig( - url=url, - username=get_value(cfg, ("tdengine", "username"), ("tdengine-username",), default=""), - password=get_value(cfg, ("tdengine", "password"), ("tdengine-password",), default=""), - database=get_value(cfg, ("tdengine", "database"), ("tdengine-database",), default=database_from_url), - stable=get_value(cfg, ("tdengine", "stable"), ("tdengine-stable",), default="drilling_realtime_st"), - device_code=get_value( - cfg, - ("tdengine", "device-code"), - ("tdengine", "equipment-sn"), - default=default_device_code, - ), - pool_size=int(get_value(cfg, ("tdengine", "pool-size"), ("tdengine-pool-size",), default=2)), - timeout=int(get_value(cfg, ("tdengine", "timeout"), ("tdengine-timeout",), default=10)), - ) diff --git a/mqtt_mock.py b/mqtt_mock.py index cc9e2e1..1c2462e 100644 --- a/mqtt_mock.py +++ b/mqtt_mock.py @@ -7,7 +7,7 @@ from urllib.parse import urlparse import paho.mqtt.client as mqtt -from config.dependencies import build_mock_dependencies +from config.config import build_mock_dependencies DATA_KEYS = [ diff --git a/mqtt_sender.py b/mqtt_sender.py index 51e31e8..b500ea4 100644 --- a/mqtt_sender.py +++ b/mqtt_sender.py @@ -7,7 +7,7 @@ from urllib.parse import urlparse import paho.mqtt.client as mqtt -from config.dependencies import build_sender_dependencies +from config.config import build_sender_dependencies DATA_KEYS = [ diff --git a/mqtt_subscriber.py b/mqtt_subscriber.py index 699e126..251d725 100644 --- a/mqtt_subscriber.py +++ b/mqtt_subscriber.py @@ -6,7 +6,7 @@ from urllib.parse import urlparse import paho.mqtt.client as mqtt -from config.dependencies import build_subscriber_dependencies +from config.config import build_subscriber_dependencies def parse_broker(broker): diff --git a/requirements.md b/requirements.md new file mode 100644 index 0000000..f109e5b --- /dev/null +++ b/requirements.md @@ -0,0 +1,21 @@ +# tdEngine_mqtt_mock需求 + +## 功能 + +### wits数据模拟 + +1. 读取config.yaml中的server-ip和端口将模拟的wits数据发送到这个地址。wits数据要尽可能的拟真。 + + + + + +### mqtt消息订阅和入库 + +1. 订阅指定topic的消息,将消息写入tdengine数据库,tdengine的配置在config.yaml的tdengine。mqtt的配置在config.yaml的mqtt。 + + + +## 要求 + +程序启动入口main.py,使用logging记录日志。 \ No newline at end of file diff --git a/createTable.sql b/sql/createTable.sql similarity index 100% rename from createTable.sql rename to sql/createTable.sql diff --git a/wits_sender.py b/wits_sender.py new file mode 100644 index 0000000..3924bbe --- /dev/null +++ b/wits_sender.py @@ -0,0 +1,268 @@ +import argparse +import random +import socket +import time +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +from config.dependencies import build_wits_sender_dependencies + + +BEGIN_MARK = "&&\r\n" +END_MARK = "!!\r\n" + + +@dataclass(frozen=True) +class WitsData: + ts: int + wellid: str + stknum: int + recid: int + seqid: int + actual_date: float + actual_time: float + actual_ts: int + actcod: int + actod_label: str + deptbitm: float + deptbitv: float + deptmeas: float + deptvert: float + blkpos: float + ropa: float + hkla: float + hklx: float + woba: float + wobx: float + torqa: float + torqx: float + rpma: int + sppa: float + chkp: float + spm1: int + spm2: int + spm3: int + tvolact: float + tvolcact: float + mfop: int + mfoa: float + mfia: float + mdoa: float + mdia: float + mtoa: float + mtia: float + mcoa: float + mcia: float + stkc: int + lagstks: int + deptretm: float + gasa: float + space1: float + space2: float + space3: float + space4: float + space5: float + + +WITS_FIELD_MAPPING = [ + (1, "wellid", "string"), + (2, "stknum", "int"), + (3, "recid", "int"), + (4, "seqid", "int"), + (5, "actual_date", "float"), + (6, "actual_time", "float"), + (7, "actcod", "int"), + (8, "deptbitm", "float"), + (9, "deptbitv", "float"), + (10, "deptmeas", "float"), + (11, "deptvert", "float"), + (12, "blkpos", "float"), + (13, "ropa", "float"), + (14, "hkla", "float"), + (15, "hklx", "float"), + (16, "woba", "float"), + (17, "wobx", "float"), + (18, "torqa", "float"), + (19, "torqx", "float"), + (20, "rpma", "int"), + (21, "sppa", "float"), + (22, "chkp", "float"), + (23, "spm1", "int"), + (24, "spm2", "int"), + (25, "spm3", "int"), + (26, "tvolact", "float"), + (27, "tvolcact", "float"), + (28, "mfop", "int"), + (29, "mfoa", "float"), + (30, "mfia", "float"), + (31, "mdoa", "float"), + (32, "mdia", "float"), + (33, "mtoa", "float"), + (34, "mtia", "float"), + (35, "mcoa", "float"), + (36, "mcia", "float"), + (37, "stkc", "int"), + (38, "lagstks", "int"), + (39, "deptretm", "float"), + (40, "gasa", "float"), + (41, "space1", "float"), + (42, "space2", "float"), + (43, "space3", "float"), + (44, "space4", "float"), + (45, "space5", "float"), +] + + +def rand_int(a, b): + return random.randint(a, b) + + +def rand_float(a, b, digits=2): + return round(random.uniform(a, b), digits) + + +def build_random_wits_data(device_code): + now = datetime.now() + ts_ms = int(time.time() * 1000) + return WitsData( + ts=ts_ms, + wellid=device_code, + stknum=rand_int(0, 500), + recid=0, + seqid=rand_int(1, 999999), + actual_date=float(now.strftime("%Y%m%d")), + actual_time=float(now.strftime("%H%M%S")), + actual_ts=ts_ms, + actcod=rand_int(0, 9), + actod_label="AUTO", + deptbitm=rand_float(0, 5000), + deptbitv=rand_float(0, 5000), + deptmeas=rand_float(0, 5000), + deptvert=rand_float(0, 5000), + blkpos=rand_float(0, 100), + ropa=rand_float(0, 200), + hkla=rand_float(0, 500), + hklx=rand_float(0, 500), + woba=rand_float(0, 200), + wobx=rand_float(0, 200), + torqa=rand_float(0, 200), + torqx=rand_float(0, 200), + rpma=rand_int(0, 300), + sppa=rand_float(0, 5000), + chkp=rand_float(0, 5000), + spm1=rand_int(0, 200), + spm2=rand_int(0, 200), + spm3=rand_int(0, 200), + tvolact=rand_float(0, 20000), + tvolcact=rand_float(0, 20000), + mfop=rand_int(0, 1000), + mfoa=rand_float(0, 1000), + mfia=rand_float(0, 1000), + mdoa=rand_float(0, 1000), + mdia=rand_float(0, 1000), + mtoa=rand_float(0, 1000), + mtia=rand_float(0, 1000), + mcoa=rand_float(0, 1000), + mcia=rand_float(0, 1000), + stkc=rand_int(0, 200), + lagstks=rand_int(0, 200), + deptretm=rand_float(0, 5000), + gasa=rand_float(0, 100), + space1=rand_float(0, 10), + space2=rand_float(0, 10), + space3=rand_float(0, 10), + space4=rand_float(0, 10), + space5=rand_float(0, 10), + ) + + +def format_wits_value(value, kind): + if kind == "string": + return str(value) + if kind == "int": + return str(int(value)) + return f"{float(value):.2f}" + + +def build_wits_packet(data): + lines = [] + for index, field_name, kind in WITS_FIELD_MAPPING: + value = getattr(data, field_name) + lines.append(f"{index:02d}{format_wits_value(value, kind)}") + return BEGIN_MARK + "\r\n".join(lines) + "\r\n" + END_MARK + + +def normalize_packet(text): + body = text.replace("\r\n", "\n").replace("\r", "\n") + lines = [line.rstrip() for line in body.split("\n") if line.strip()] + if lines and lines[0] == "&&": + lines = lines[1:] + if lines and lines[-1] == "!!": + lines = lines[:-1] + return BEGIN_MARK + "\r\n".join(lines) + "\r\n" + END_MARK + + +def load_packet_from_file(path): + return normalize_packet(Path(path).read_text(encoding="utf-8-sig")) + + +def send_packet(host, port, timeout, packet): + with socket.create_connection((host, port), timeout=timeout) as sock: + sock.sendall(packet.encode("ascii", errors="strict")) + + +def run_wits_sender(args, deps): + wits_config = deps.config.wits + device_code = deps.config.tms.device_code + source_file = args.source_file or wits_config.source_file + host = args.host or wits_config.host + port = args.port or wits_config.port + timeout = args.timeout or wits_config.timeout + + if not host or not port: + raise ValueError("WITS target host/port is empty. Configure wits.host/wits.port or tms.server-ip/tms.server-port") + + print("WITS sender config:") + print(f" host: {host}") + print(f" port: {port}") + print(f" timeout: {timeout}s") + print(f" source-file: {source_file or '(generated)'}") + print(f" interval: {args.interval}s") + print(f" count: {args.count or 'forever'}") + + seq = 0 + try: + while True: + seq += 1 + if source_file: + packet = load_packet_from_file(source_file) + else: + packet = build_wits_packet(build_random_wits_data(device_code)) + send_packet(host, port, timeout, packet) + print(f"TX WITS #{seq} -> {host}:{port}") + print(packet) + if args.count and seq >= args.count: + break + time.sleep(args.interval) + except KeyboardInterrupt: + pass + + +def main(): + ap = argparse.ArgumentParser(description="WITS TCP sender") + ap.add_argument("--config", default="config.yaml", help="Path to config yaml") + ap.add_argument("--host", default="", help="Override target host") + ap.add_argument("--port", type=int, default=0, help="Override target port") + ap.add_argument("--timeout", type=int, default=0, help="Override socket timeout") + ap.add_argument("--source-file", default="", help="Send raw WITS packet from file") + ap.add_argument("--interval", type=float, default=3.0, help="Send interval in seconds") + ap.add_argument("--count", type=int, default=1, help="Send count (0 = forever)") + args = ap.parse_args() + deps = build_wits_sender_dependencies(args.config) + run_wits_sender(args, deps) + + +if __name__ == "__main__": + main() +