feat(wits): 添加WITS TCP发送功能和配置重构

- 新增WitsConfig数据类用于WITS配置管理
- 在AppConfig中集成wits配置选项
- 重命名dependencies.py为config.py并重构配置加载逻辑
- 移除db/config.py文件中的TDengine配置相关代码
- 创建新的model.py文件定义MqttConfig、TmsConfig和TdengineConfig模型
- 更新MQTT模块导入路径从config.dependencies到config.config
- 添加WITS发送器脚本wits_sender.py实现TCP数据包发送
- 在README.md中添加WITS发送器使用说明和配置选项
- 添加WITS样本数据文件data/wits_sample.txt
- 添加requirements.md文档说明项目需求
This commit is contained in:
2026-03-12 10:20:35 +08:00
parent d5d1cb0b7d
commit 45870a2f73
14 changed files with 444 additions and 128 deletions

View File

@@ -1,4 +1,4 @@
# MQTT Mock Service
# MQTT Mock Service
Simulates a service that:
- Subscribes to the ingest topic (server -> broker)
@@ -10,6 +10,7 @@ Simulates a service that:
Also includes:
- A random data sender (every 3 seconds by default)
- A simple subscriber that prints incoming payloads
- A WITS TCP sender for the Java TCP receiver
## Setup
```
@@ -23,11 +24,6 @@ pip install -r requirements.txt
python mqtt_mock.py --config config.yaml --mode listen
```
When TDengine config exists in `config.yaml`, each message on `pub-topic` is inserted into:
- Super table: `drilling_realtime_st`
- Sub table: auto-generated as `drilling_realtime_<device_code>`
- Tag value: fixed `device_code` from config (`GJ-304-0088`)
## Run (also publish test data)
```
python mqtt_mock.py --config config.yaml --mode both --interval 2
@@ -43,6 +39,22 @@ python mqtt_sender.py --config config.yaml --interval 3
python mqtt_subscriber.py --config config.yaml
```
## Run WITS sender
Send one generated WITS packet to `wits.host/wits.port` or fallback `tms.server-ip/tms.server-port`:
```
python wits_sender.py --config config.yaml
```
Send packets continuously every 2 seconds:
```
python wits_sender.py --config config.yaml --interval 2 --count 0
```
Send a raw packet from file:
```
python wits_sender.py --config config.yaml --source-file data/wits_sample.txt
```
## Payload format (server -> broker)
```
{
@@ -102,12 +114,15 @@ python mqtt_subscriber.py --config config.yaml
```
## Config
- `pub-topic`: ingest topic from server
- `sub-topic`: forward topic for other services to subscribe
- `ack-topic`: optional ack topic
- `data-file`: local append-only file (JSON Lines)
- `equipment-sn`: default equipment_sn for simulated payloads
- `test-id`: default test_id for simulated payloads
- `mqtt.*`: MQTT broker and topic settings
- `tms.device-code`: default device code for generated data
- `tms.server-ip`: WITS target host fallback
- `tms.server-port`: WITS target port fallback
- `tms.timeout`: WITS socket timeout fallback
- `wits.host`: optional explicit WITS target host
- `wits.port`: optional explicit WITS target port
- `wits.timeout`: optional explicit WITS socket timeout
- `wits.source-file`: optional default WITS packet file path
- `tdengine.url`: e.g. `jdbc:TAOS-RS://192.168.1.87:6041/tms`
- `tdengine.username`: DB user
- `tdengine.password`: DB password
@@ -115,7 +130,7 @@ python mqtt_subscriber.py --config config.yaml
- `tdengine.stable`: default `drilling_realtime_st`
## Options
- `--mode`: publish | listen | both
- `--interval`: publish interval seconds
- `--count`: publish count (0 = forever)
- `--data-file`: override data-file in config
- `mqtt_mock.py`: `--mode` `--interval` `--count` `--data-file`
- `mqtt_sender.py`: `--interval` `--count`
- `mqtt_subscriber.py`: `--topic`
- `wits_sender.py`: `--host` `--port` `--timeout` `--source-file` `--interval` `--count`

View File

@@ -28,10 +28,19 @@ class TmsConfig:
server_port: int | None
@dataclass(frozen=True)
class WitsConfig:
host: str
port: int
timeout: int
source_file: str
@dataclass(frozen=True)
class AppConfig:
mqtt: MqttConfig
tms: TmsConfig
wits: WitsConfig
raw: dict
@@ -96,6 +105,12 @@ def load_app_config(path):
server_ip=get_value(raw, ("tms", "server-ip"), ("server-ip",)),
server_port=get_value(raw, ("tms", "server-port"), ("server-port",)),
),
wits=WitsConfig(
host=get_value(raw, ("wits", "host"), ("tms", "server-ip"), ("server-ip",), default=""),
port=int(get_value(raw, ("wits", "port"), ("tms", "server-port"), ("server-port",), default=0)),
timeout=int(get_value(raw, ("wits", "timeout"), ("tms", "timeout"), ("timeout",), default=10)),
source_file=get_value(raw, ("wits", "source-file"), default=""),
),
raw=raw,
)
@@ -106,6 +121,7 @@ __all__ = [
"AppConfig",
"MqttConfig",
"TmsConfig",
"WitsConfig",
"get_value",
"load_app_config",
"load_config",

18
config/config.py Normal file
View File

@@ -0,0 +1,18 @@
import yaml
from config.model import *
def load(path: str) -> "Config":
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
mqtt_cfg = MqttConfig(**data["mqtt"])
tms_cfg = TmsConfig(**data["tms"])
tdengine_cfg = TdengineConfig(**data["tdengine"])
return Config(
mqtt=mqtt_cfg,
tms=tms_cfg,
tdengine=tdengine_cfg
)

View File

@@ -1,41 +0,0 @@
from dataclasses import dataclass
from config import load_app_config
from db import TDengineWriter, load_tdengine_config
@dataclass(frozen=True)
class SenderDependencies:
config: object
@dataclass(frozen=True)
class SubscriberDependencies:
config: object
@dataclass(frozen=True)
class MockDependencies:
config: object
tdengine_config: object
tdengine_writer: object
data_file: str
def build_sender_dependencies(config_path):
return SenderDependencies(config=load_app_config(config_path))
def build_subscriber_dependencies(config_path):
return SubscriberDependencies(config=load_app_config(config_path))
def build_mock_dependencies(config_path, data_file_override=""):
app_config = load_app_config(config_path)
tdengine_config = load_tdengine_config(app_config, default_device_code=app_config.tms.device_code)
return MockDependencies(
config=app_config,
tdengine_config=tdengine_config,
tdengine_writer=TDengineWriter(tdengine_config),
data_file=data_file_override or app_config.mqtt.data_file,
)

40
config/model.py Normal file
View File

@@ -0,0 +1,40 @@
from dataclasses import dataclass
@dataclass
class MqttConfig:
broker: str
client_id: str
mock_client_id: str
sender_client_id: str
subscriber_client_id: str
username: str
password: str
pub_topic: str
@dataclass
class TmsConfig:
device_code: str
equipment_sn: str
timeout: int
keepalive: int
server_ip: str
server_port: int
@dataclass
class TdengineConfig:
url: str
username: str
password: str
database: str
stable: str
device_code: str
@dataclass
class Config:
mqtt: MqttConfig
tms: TmsConfig
tdengine: TdengineConfig

View File

@@ -1,3 +0,0 @@
from config import get_value, load_config
__all__ = ["get_value", "load_config"]

47
data/wits_sample.txt Normal file
View File

@@ -0,0 +1,47 @@
&&
01GJ-304-0088
020
030
041
0520250312.00
6120000.00
70
80.00
90.00
100.00
110.00
120.00
130.00
140.00
150.00
160.00
170.00
180.00
190.00
200
210.00
220.00
230
240
250
260.00
270.00
280
290.00
300.00
310.00
320.00
330.00
340.00
350.00
360.00
370
380
390.00
400.00
410.00
420.00
430.00
440.00
450.00
!!

View File

@@ -1,65 +0,0 @@
from dataclasses import dataclass
from urllib.parse import urlparse
from config import get_value
@dataclass(frozen=True)
class TDengineConfig:
url: str = ""
username: str = ""
password: str = ""
database: str = ""
stable: str = "drilling_realtime_st"
device_code: str = "GJ-304-0088"
pool_size: int = 2
timeout: int = 10
@property
def enabled(self):
return bool(self.base_url and self.database and self.username)
@property
def base_url(self):
base_url, _ = parse_taos_url(self.url)
return base_url
def parse_taos_url(jdbc_url):
if not jdbc_url:
return "", ""
raw = str(jdbc_url).strip()
if raw.lower().startswith("jdbc:taos-rs://"):
raw = "http://" + raw[len("jdbc:TAOS-RS://") :]
elif "://" not in raw:
raw = "http://" + raw
parsed = urlparse(raw)
base_url = f"{parsed.scheme or 'http'}://{parsed.hostname or '127.0.0.1'}:{parsed.port or 6041}"
database = (parsed.path or "").strip("/")
return base_url, database
def _resolve_raw_config(cfg_or_app):
raw = getattr(cfg_or_app, "raw", None)
return raw if isinstance(raw, dict) else cfg_or_app
def load_tdengine_config(cfg_or_app, default_device_code="GJ-304-0088"):
cfg = _resolve_raw_config(cfg_or_app)
url = get_value(cfg, ("tdengine", "url"), ("tdengine-url",), default="")
_, database_from_url = parse_taos_url(url)
return TDengineConfig(
url=url,
username=get_value(cfg, ("tdengine", "username"), ("tdengine-username",), default=""),
password=get_value(cfg, ("tdengine", "password"), ("tdengine-password",), default=""),
database=get_value(cfg, ("tdengine", "database"), ("tdengine-database",), default=database_from_url),
stable=get_value(cfg, ("tdengine", "stable"), ("tdengine-stable",), default="drilling_realtime_st"),
device_code=get_value(
cfg,
("tdengine", "device-code"),
("tdengine", "equipment-sn"),
default=default_device_code,
),
pool_size=int(get_value(cfg, ("tdengine", "pool-size"), ("tdengine-pool-size",), default=2)),
timeout=int(get_value(cfg, ("tdengine", "timeout"), ("tdengine-timeout",), default=10)),
)

View File

@@ -7,7 +7,7 @@ from urllib.parse import urlparse
import paho.mqtt.client as mqtt
from config.dependencies import build_mock_dependencies
from config.config import build_mock_dependencies
DATA_KEYS = [

View File

@@ -7,7 +7,7 @@ from urllib.parse import urlparse
import paho.mqtt.client as mqtt
from config.dependencies import build_sender_dependencies
from config.config import build_sender_dependencies
DATA_KEYS = [

View File

@@ -6,7 +6,7 @@ from urllib.parse import urlparse
import paho.mqtt.client as mqtt
from config.dependencies import build_subscriber_dependencies
from config.config import build_subscriber_dependencies
def parse_broker(broker):

21
requirements.md Normal file
View File

@@ -0,0 +1,21 @@
# tdEngine_mqtt_mock需求
## 功能
### wits数据模拟
1. 读取config.yaml中的server-ip和端口将模拟的wits数据发送到这个地址。wits数据要尽可能的拟真。
### mqtt消息订阅和入库
1. 订阅指定topic的消息将消息写入tdengine数据库tdengine的配置在config.yaml的tdengine。mqtt的配置在config.yaml的mqtt。
## 要求
程序启动入口main.py,使用logging记录日志。

268
wits_sender.py Normal file
View File

@@ -0,0 +1,268 @@
import argparse
import random
import socket
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from config.dependencies import build_wits_sender_dependencies
BEGIN_MARK = "&&\r\n"
END_MARK = "!!\r\n"
@dataclass(frozen=True)
class WitsData:
ts: int
wellid: str
stknum: int
recid: int
seqid: int
actual_date: float
actual_time: float
actual_ts: int
actcod: int
actod_label: str
deptbitm: float
deptbitv: float
deptmeas: float
deptvert: float
blkpos: float
ropa: float
hkla: float
hklx: float
woba: float
wobx: float
torqa: float
torqx: float
rpma: int
sppa: float
chkp: float
spm1: int
spm2: int
spm3: int
tvolact: float
tvolcact: float
mfop: int
mfoa: float
mfia: float
mdoa: float
mdia: float
mtoa: float
mtia: float
mcoa: float
mcia: float
stkc: int
lagstks: int
deptretm: float
gasa: float
space1: float
space2: float
space3: float
space4: float
space5: float
WITS_FIELD_MAPPING = [
(1, "wellid", "string"),
(2, "stknum", "int"),
(3, "recid", "int"),
(4, "seqid", "int"),
(5, "actual_date", "float"),
(6, "actual_time", "float"),
(7, "actcod", "int"),
(8, "deptbitm", "float"),
(9, "deptbitv", "float"),
(10, "deptmeas", "float"),
(11, "deptvert", "float"),
(12, "blkpos", "float"),
(13, "ropa", "float"),
(14, "hkla", "float"),
(15, "hklx", "float"),
(16, "woba", "float"),
(17, "wobx", "float"),
(18, "torqa", "float"),
(19, "torqx", "float"),
(20, "rpma", "int"),
(21, "sppa", "float"),
(22, "chkp", "float"),
(23, "spm1", "int"),
(24, "spm2", "int"),
(25, "spm3", "int"),
(26, "tvolact", "float"),
(27, "tvolcact", "float"),
(28, "mfop", "int"),
(29, "mfoa", "float"),
(30, "mfia", "float"),
(31, "mdoa", "float"),
(32, "mdia", "float"),
(33, "mtoa", "float"),
(34, "mtia", "float"),
(35, "mcoa", "float"),
(36, "mcia", "float"),
(37, "stkc", "int"),
(38, "lagstks", "int"),
(39, "deptretm", "float"),
(40, "gasa", "float"),
(41, "space1", "float"),
(42, "space2", "float"),
(43, "space3", "float"),
(44, "space4", "float"),
(45, "space5", "float"),
]
def rand_int(a, b):
return random.randint(a, b)
def rand_float(a, b, digits=2):
return round(random.uniform(a, b), digits)
def build_random_wits_data(device_code):
now = datetime.now()
ts_ms = int(time.time() * 1000)
return WitsData(
ts=ts_ms,
wellid=device_code,
stknum=rand_int(0, 500),
recid=0,
seqid=rand_int(1, 999999),
actual_date=float(now.strftime("%Y%m%d")),
actual_time=float(now.strftime("%H%M%S")),
actual_ts=ts_ms,
actcod=rand_int(0, 9),
actod_label="AUTO",
deptbitm=rand_float(0, 5000),
deptbitv=rand_float(0, 5000),
deptmeas=rand_float(0, 5000),
deptvert=rand_float(0, 5000),
blkpos=rand_float(0, 100),
ropa=rand_float(0, 200),
hkla=rand_float(0, 500),
hklx=rand_float(0, 500),
woba=rand_float(0, 200),
wobx=rand_float(0, 200),
torqa=rand_float(0, 200),
torqx=rand_float(0, 200),
rpma=rand_int(0, 300),
sppa=rand_float(0, 5000),
chkp=rand_float(0, 5000),
spm1=rand_int(0, 200),
spm2=rand_int(0, 200),
spm3=rand_int(0, 200),
tvolact=rand_float(0, 20000),
tvolcact=rand_float(0, 20000),
mfop=rand_int(0, 1000),
mfoa=rand_float(0, 1000),
mfia=rand_float(0, 1000),
mdoa=rand_float(0, 1000),
mdia=rand_float(0, 1000),
mtoa=rand_float(0, 1000),
mtia=rand_float(0, 1000),
mcoa=rand_float(0, 1000),
mcia=rand_float(0, 1000),
stkc=rand_int(0, 200),
lagstks=rand_int(0, 200),
deptretm=rand_float(0, 5000),
gasa=rand_float(0, 100),
space1=rand_float(0, 10),
space2=rand_float(0, 10),
space3=rand_float(0, 10),
space4=rand_float(0, 10),
space5=rand_float(0, 10),
)
def format_wits_value(value, kind):
if kind == "string":
return str(value)
if kind == "int":
return str(int(value))
return f"{float(value):.2f}"
def build_wits_packet(data):
lines = []
for index, field_name, kind in WITS_FIELD_MAPPING:
value = getattr(data, field_name)
lines.append(f"{index:02d}{format_wits_value(value, kind)}")
return BEGIN_MARK + "\r\n".join(lines) + "\r\n" + END_MARK
def normalize_packet(text):
body = text.replace("\r\n", "\n").replace("\r", "\n")
lines = [line.rstrip() for line in body.split("\n") if line.strip()]
if lines and lines[0] == "&&":
lines = lines[1:]
if lines and lines[-1] == "!!":
lines = lines[:-1]
return BEGIN_MARK + "\r\n".join(lines) + "\r\n" + END_MARK
def load_packet_from_file(path):
return normalize_packet(Path(path).read_text(encoding="utf-8-sig"))
def send_packet(host, port, timeout, packet):
with socket.create_connection((host, port), timeout=timeout) as sock:
sock.sendall(packet.encode("ascii", errors="strict"))
def run_wits_sender(args, deps):
wits_config = deps.config.wits
device_code = deps.config.tms.device_code
source_file = args.source_file or wits_config.source_file
host = args.host or wits_config.host
port = args.port or wits_config.port
timeout = args.timeout or wits_config.timeout
if not host or not port:
raise ValueError("WITS target host/port is empty. Configure wits.host/wits.port or tms.server-ip/tms.server-port")
print("WITS sender config:")
print(f" host: {host}")
print(f" port: {port}")
print(f" timeout: {timeout}s")
print(f" source-file: {source_file or '(generated)'}")
print(f" interval: {args.interval}s")
print(f" count: {args.count or 'forever'}")
seq = 0
try:
while True:
seq += 1
if source_file:
packet = load_packet_from_file(source_file)
else:
packet = build_wits_packet(build_random_wits_data(device_code))
send_packet(host, port, timeout, packet)
print(f"TX WITS #{seq} -> {host}:{port}")
print(packet)
if args.count and seq >= args.count:
break
time.sleep(args.interval)
except KeyboardInterrupt:
pass
def main():
ap = argparse.ArgumentParser(description="WITS TCP sender")
ap.add_argument("--config", default="config.yaml", help="Path to config yaml")
ap.add_argument("--host", default="", help="Override target host")
ap.add_argument("--port", type=int, default=0, help="Override target port")
ap.add_argument("--timeout", type=int, default=0, help="Override socket timeout")
ap.add_argument("--source-file", default="", help="Send raw WITS packet from file")
ap.add_argument("--interval", type=float, default=3.0, help="Send interval in seconds")
ap.add_argument("--count", type=int, default=1, help="Send count (0 = forever)")
args = ap.parse_args()
deps = build_wits_sender_dependencies(args.config)
run_wits_sender(args, deps)
if __name__ == "__main__":
main()