feat(config): 添加配置管理和MQTT模拟服务功能

- 实现了应用配置的数据类结构(MqttConfig, TmsConfig, AppConfig)
- 创建了配置加载和解析功能,支持从YAML文件读取配置
- 添加了TDengine数据库配置和连接池管理
- 实现了MQTT客户端依赖注入和服务构建
- 创建了钻孔实时数据的ORM映射和SQL构建功能
- 实现了TDengine Writer用于数据写入超级表
- 添加了MQTT模拟服务,支持发布、订阅和数据转发功能
- 创建了随机数据发送器用于测试
- 实现了消息持久化到本地文件功能
- 配置了数据库连接池和SQL执行功能
This commit is contained in:
2026-03-12 09:58:00 +08:00
commit d5d1cb0b7d
19 changed files with 1224 additions and 0 deletions

12
.gitignore vendored Normal file
View File

@@ -0,0 +1,12 @@
.venv
.idea
.pytest_cache
.tox
.coverage
.coverage.*
.pytest_cache/
.pytest_
.vscode

121
README.md Normal file
View File

@@ -0,0 +1,121 @@
# MQTT Mock Service
Simulates a service that:
- Subscribes to the ingest topic (server -> broker)
- Writes received messages to a local file (JSONL)
- Writes received messages to TDengine (super table)
- Forwards messages to another topic for downstream subscribers
- Optionally sends ack messages
Also includes:
- A random data sender (every 3 seconds by default)
- A simple subscriber that prints incoming payloads
## Setup
```
python -m venv .venv
.\.venv\Scripts\activate
pip install -r requirements.txt
```
## Run (service mode)
```
python mqtt_mock.py --config config.yaml --mode listen
```
When TDengine config exists in `config.yaml`, each message on `pub-topic` is inserted into:
- Super table: `drilling_realtime_st`
- Sub table: auto-generated as `drilling_realtime_<device_code>`
- Tag value: fixed `device_code` from config (`GJ-304-0088`)
## Run (also publish test data)
```
python mqtt_mock.py --config config.yaml --mode both --interval 2
```
## Run random sender (3s interval)
```
python mqtt_sender.py --config config.yaml --interval 3
```
## Run subscriber
```
python mqtt_subscriber.py --config config.yaml
```
## Payload format (server -> broker)
```
{
"meta": {
"test_id": "550e8400e29b41d4a716446655440000",
"equipment_sn": "GJ-304-0088"
},
"data": {
"record_time": 1751964764000,
"wellid": "",
"stknum": 0,
"recid": 0,
"seqid": 0,
"actual_date": 0,
"actual_time": 0,
"actcod": 0,
"deptbitm": 0,
"deptbitv": 0,
"deptmeas": 0,
"deptvert": 0,
"blkpos": 0,
"ropa": 0,
"hkla": 0,
"hklx": 0,
"woba": 0,
"wobx": 0,
"torqa": 0,
"torqx": 0,
"rpma": 0,
"sppa": 0,
"chkp": 0,
"spm1": 0,
"spm2": 0,
"spm3": 0,
"tvolact": 0,
"tvolcact": 0,
"mfop": 0,
"mfoa": 0,
"mfia": 0,
"mdoa": 0,
"mdia": 0,
"mtoa": 0,
"mtia": 0,
"mcoa": 0,
"mcia": 0,
"stkc": 0,
"lagstks": 0,
"deptretm": 0,
"gasa": 0,
"space1": 0,
"space2": 0,
"space3": 0,
"space4": 0,
"space5": 0
}
}
```
## Config
- `pub-topic`: ingest topic from server
- `sub-topic`: forward topic for other services to subscribe
- `ack-topic`: optional ack topic
- `data-file`: local append-only file (JSON Lines)
- `equipment-sn`: default equipment_sn for simulated payloads
- `test-id`: default test_id for simulated payloads
- `tdengine.url`: e.g. `jdbc:TAOS-RS://192.168.1.87:6041/tms`
- `tdengine.username`: DB user
- `tdengine.password`: DB password
- `tdengine.database`: DB name (optional if URL already has `/tms`)
- `tdengine.stable`: default `drilling_realtime_st`
## Options
- `--mode`: publish | listen | both
- `--interval`: publish interval seconds
- `--count`: publish count (0 = forever)
- `--data-file`: override data-file in config

29
config.yaml Normal file
View File

@@ -0,0 +1,29 @@
mqtt:
broker: tcp://192.168.1.87:1883
client-id: tms-client-dev
mock-client-id: tms-client-dev-mock
sender-client-id: tms-client-dev-sender
subscriber-client-id: tms-client-dev-subscriber
username: test
password: S55HtwFZvhf67VpS
# Server publishes here; mock service subscribes and ingests
pub-topic: rules_gj_jxjs_wtt
# Mock service forwards here for other subscribers
tms:
device-code: GJ-304-0088
equipment-sn: GJ-304-0088
timeout: 10
keepalive: 20
server-ip: 192.168.1.41
server-port: 9929
tdengine:
url: jdbc:TAOS-RS://192.168.1.87:6041/tms
username: root
password: wDvfFffdwbm5U15K
# If omitted, database is parsed from url path: /tms
database: tms
stable: drilling_realtime_st
device-code: GJ-304-0088

113
config/__init__.py Normal file
View File

@@ -0,0 +1,113 @@
from dataclasses import dataclass
import yaml
@dataclass(frozen=True)
class MqttConfig:
broker: str
client_id: str
mock_client_id: str
sender_client_id: str
subscriber_client_id: str
username: str | None
password: str | None
pub_topic: str | None
sub_topic: str | None
ack_topic: str | None
data_file: str
@dataclass(frozen=True)
class TmsConfig:
device_code: str
equipment_sn: str
timeout: int
keepalive: int
server_ip: str | None
server_port: int | None
@dataclass(frozen=True)
class AppConfig:
mqtt: MqttConfig
tms: TmsConfig
raw: dict
def load_raw_config(path):
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
def get_value(cfg, *paths, default=None):
for path in paths:
current = cfg
found = True
for key in path:
if not isinstance(current, dict) or key not in current:
found = False
break
current = current[key]
if found and current is not None:
return current
return default
def load_app_config(path):
raw = load_raw_config(path)
base_client_id = get_value(raw, ("mqtt", "client-id"), ("client-id",), default="mqtt")
device_code = get_value(
raw,
("tms", "device-code"),
("tms", "equipment-sn"),
("device-code",),
("equipment-sn",),
default="GJ-304-0088",
)
equipment_sn = get_value(
raw,
("tms", "equipment-sn"),
("tms", "device-code"),
("equipment-sn",),
("device-code",),
default=device_code,
)
return AppConfig(
mqtt=MqttConfig(
broker=get_value(raw, ("mqtt", "broker"), ("broker",), default=""),
client_id=base_client_id,
mock_client_id=get_value(raw, ("mqtt", "mock-client-id"), ("mock-client-id",), default=f"{base_client_id}-mock"),
sender_client_id=get_value(raw, ("mqtt", "sender-client-id"), ("sender-client-id",), default=f"{base_client_id}-sender"),
subscriber_client_id=get_value(raw, ("mqtt", "subscriber-client-id"), ("subscriber-client-id",), default=f"{base_client_id}-subscriber"),
username=get_value(raw, ("mqtt", "username"), ("username",)),
password=get_value(raw, ("mqtt", "password"), ("password",)),
pub_topic=get_value(raw, ("mqtt", "pub-topic"), ("pub-topic",)),
sub_topic=get_value(raw, ("mqtt", "sub-topic"), ("sub-topic",)),
ack_topic=get_value(raw, ("mqtt", "ack-topic"), ("ack-topic",)),
data_file=get_value(raw, ("mqtt", "data-file"), ("data-file",), default=""),
),
tms=TmsConfig(
device_code=device_code,
equipment_sn=equipment_sn,
timeout=int(get_value(raw, ("tms", "timeout"), ("timeout",), default=10)),
keepalive=int(get_value(raw, ("tms", "keepalive"), ("keepalive",), default=20)),
server_ip=get_value(raw, ("tms", "server-ip"), ("server-ip",)),
server_port=get_value(raw, ("tms", "server-port"), ("server-port",)),
),
raw=raw,
)
load_config = load_app_config
__all__ = [
"AppConfig",
"MqttConfig",
"TmsConfig",
"get_value",
"load_app_config",
"load_config",
"load_raw_config",
]

41
config/dependencies.py Normal file
View File

@@ -0,0 +1,41 @@
from dataclasses import dataclass
from config import load_app_config
from db import TDengineWriter, load_tdengine_config
@dataclass(frozen=True)
class SenderDependencies:
config: object
@dataclass(frozen=True)
class SubscriberDependencies:
config: object
@dataclass(frozen=True)
class MockDependencies:
config: object
tdengine_config: object
tdengine_writer: object
data_file: str
def build_sender_dependencies(config_path):
return SenderDependencies(config=load_app_config(config_path))
def build_subscriber_dependencies(config_path):
return SubscriberDependencies(config=load_app_config(config_path))
def build_mock_dependencies(config_path, data_file_override=""):
app_config = load_app_config(config_path)
tdengine_config = load_tdengine_config(app_config, default_device_code=app_config.tms.device_code)
return MockDependencies(
config=app_config,
tdengine_config=tdengine_config,
tdengine_writer=TDengineWriter(tdengine_config),
data_file=data_file_override or app_config.mqtt.data_file,
)

3
config_utils.py Normal file
View File

@@ -0,0 +1,3 @@
from config import get_value, load_config
__all__ = ["get_value", "load_config"]

51
createTable.sql Normal file
View File

@@ -0,0 +1,51 @@
DROP TABLE IF EXISTS drilling_realtime_st;
CREATE STABLE drilling_realtime_st (
ts TIMESTAMP,
stknum INT,
recid INT,
seqid INT,
actual_date FLOAT,
actual_time FLOAT,
actcod INT,
deptbitm FLOAT,
deptbitv FLOAT,
deptmeas FLOAT,
deptvert FLOAT,
blkpos FLOAT,
ropa FLOAT,
hkla FLOAT,
hklx FLOAT,
woba FLOAT,
wobx FLOAT,
torqa FLOAT,
torqx FLOAT,
rpma INT,
sppa FLOAT,
chkp FLOAT,
spm1 INT,
spm2 INT,
spm3 INT,
tvolact FLOAT,
tvolcact FLOAT,
mfop INT,
mfoa FLOAT,
mfia FLOAT,
mdoa FLOAT,
mdia FLOAT,
mtoa FLOAT,
mtia FLOAT,
mcoa FLOAT,
mcia FLOAT,
stkc INT,
lagstks INT,
deptretm FLOAT,
gasa FLOAT,
space1 FLOAT,
space2 FLOAT,
space3 FLOAT,
space4 FLOAT,
space5 FLOAT
) TAGS (
equipment_code VARCHAR(50)
);

14
db/__init__.py Normal file
View File

@@ -0,0 +1,14 @@
from db.config import TDengineConfig, load_tdengine_config, parse_taos_url
from db.orm import DrillingRealtimeORM
from db.pool import TaosConnectionPool, create_taos_pool
from db.writer import TDengineWriter
__all__ = [
"DrillingRealtimeORM",
"TDengineConfig",
"TDengineWriter",
"TaosConnectionPool",
"create_taos_pool",
"load_tdengine_config",
"parse_taos_url",
]

65
db/config.py Normal file
View File

@@ -0,0 +1,65 @@
from dataclasses import dataclass
from urllib.parse import urlparse
from config import get_value
@dataclass(frozen=True)
class TDengineConfig:
url: str = ""
username: str = ""
password: str = ""
database: str = ""
stable: str = "drilling_realtime_st"
device_code: str = "GJ-304-0088"
pool_size: int = 2
timeout: int = 10
@property
def enabled(self):
return bool(self.base_url and self.database and self.username)
@property
def base_url(self):
base_url, _ = parse_taos_url(self.url)
return base_url
def parse_taos_url(jdbc_url):
if not jdbc_url:
return "", ""
raw = str(jdbc_url).strip()
if raw.lower().startswith("jdbc:taos-rs://"):
raw = "http://" + raw[len("jdbc:TAOS-RS://") :]
elif "://" not in raw:
raw = "http://" + raw
parsed = urlparse(raw)
base_url = f"{parsed.scheme or 'http'}://{parsed.hostname or '127.0.0.1'}:{parsed.port or 6041}"
database = (parsed.path or "").strip("/")
return base_url, database
def _resolve_raw_config(cfg_or_app):
raw = getattr(cfg_or_app, "raw", None)
return raw if isinstance(raw, dict) else cfg_or_app
def load_tdengine_config(cfg_or_app, default_device_code="GJ-304-0088"):
cfg = _resolve_raw_config(cfg_or_app)
url = get_value(cfg, ("tdengine", "url"), ("tdengine-url",), default="")
_, database_from_url = parse_taos_url(url)
return TDengineConfig(
url=url,
username=get_value(cfg, ("tdengine", "username"), ("tdengine-username",), default=""),
password=get_value(cfg, ("tdengine", "password"), ("tdengine-password",), default=""),
database=get_value(cfg, ("tdengine", "database"), ("tdengine-database",), default=database_from_url),
stable=get_value(cfg, ("tdengine", "stable"), ("tdengine-stable",), default="drilling_realtime_st"),
device_code=get_value(
cfg,
("tdengine", "device-code"),
("tdengine", "equipment-sn"),
default=default_device_code,
),
pool_size=int(get_value(cfg, ("tdengine", "pool-size"), ("tdengine-pool-size",), default=2)),
timeout=int(get_value(cfg, ("tdengine", "timeout"), ("tdengine-timeout",), default=10)),
)

122
db/orm.py Normal file
View File

@@ -0,0 +1,122 @@
import re
import time
DB_COLUMNS = [
"ts",
"stknum",
"recid",
"seqid",
"actual_date",
"actual_time",
"actcod",
"deptbitm",
"deptbitv",
"deptmeas",
"deptvert",
"blkpos",
"ropa",
"hkla",
"hklx",
"woba",
"wobx",
"torqa",
"torqx",
"rpma",
"sppa",
"chkp",
"spm1",
"spm2",
"spm3",
"tvolact",
"tvolcact",
"mfop",
"mfoa",
"mfia",
"mdoa",
"mdia",
"mtoa",
"mtia",
"mcoa",
"mcia",
"stkc",
"lagstks",
"deptretm",
"gasa",
"space1",
"space2",
"space3",
"space4",
"space5",
]
INT_COLUMNS = {"stknum", "recid", "seqid", "actcod", "rpma", "spm1", "spm2", "spm3", "mfop", "stkc", "lagstks"}
def sanitize_identifier(value, fallback):
cleaned = re.sub(r"[^A-Za-z0-9_]", "_", str(value or ""))
if not cleaned:
cleaned = fallback
if cleaned[0].isdigit():
cleaned = f"t_{cleaned}"
return cleaned.lower()
def sql_quote(value):
return "'" + str(value).replace("'", "''") + "'"
def to_int(value, default=0):
try:
return int(value)
except Exception:
return default
def to_float(value, default=0.0):
try:
return float(value)
except Exception:
return default
class DrillingRealtimeORM:
def __init__(self, database, stable="drilling_realtime_st", default_device_code="GJ-304-0088"):
self.database = database
self.stable = stable
self.default_device_code = default_device_code or "GJ-304-0088"
def build_insert_sql(self, payload):
if not isinstance(payload, dict):
raise ValueError("payload is not JSON object")
meta = payload.get("meta") if isinstance(payload.get("meta"), dict) else {}
data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
equipment_code = (
str(self.default_device_code).strip()
or str(meta.get("equipment_code", "")).strip()
or str(meta.get("equipment_sn", "")).strip()
or "GJ-304-0088"
)
table_name = sanitize_identifier(
f"drilling_realtime_{equipment_code}",
"drilling_realtime_default",
)
values = []
for col in DB_COLUMNS:
if col == "ts":
raw = data.get("ts", data.get("record_time", int(time.time() * 1000)))
values.append(str(to_int(raw, int(time.time() * 1000))))
elif col in INT_COLUMNS:
values.append(str(to_int(data.get(col, 0), 0)))
else:
values.append(str(to_float(data.get(col, 0), 0.0)))
columns_sql = ", ".join([f"`{column}`" for column in DB_COLUMNS])
values_sql = ", ".join(values)
return (
f"INSERT INTO `{self.database}`.`{table_name}` USING `{self.database}`.`{self.stable}` "
f"TAGS ({sql_quote(equipment_code)}) ({columns_sql}) VALUES ({values_sql})"
)

60
db/pool.py Normal file
View File

@@ -0,0 +1,60 @@
import base64
from queue import LifoQueue
from urllib.request import Request, urlopen
class TaosConnection:
def __init__(self, base_url, username, password, timeout=10):
self.base_url = base_url
self.username = username or ""
self.password = password or ""
self.timeout = timeout
def execute(self, sql):
auth = f"{self.username}:{self.password}"
auth_header = base64.b64encode(auth.encode("utf-8")).decode("ascii")
req = Request(
url=f"{self.base_url}/rest/sql",
data=sql.encode("utf-8"),
headers={
"Authorization": f"Basic {auth_header}",
"Content-Type": "text/plain",
},
method="POST",
)
with urlopen(req, timeout=self.timeout) as resp:
body = resp.read().decode("utf-8", errors="replace")
if resp.status != 200:
raise RuntimeError(f"HTTP {resp.status} {body}")
return body
class TaosConnectionPool:
def __init__(self, base_url, username, password, pool_size=2, timeout=10):
self.base_url = base_url
self.username = username
self.password = password
self.pool_size = max(int(pool_size or 1), 1)
self.timeout = timeout
self._pool = LifoQueue(maxsize=self.pool_size)
for _ in range(self.pool_size):
self._pool.put(TaosConnection(base_url, username, password, timeout=timeout))
def execute(self, sql):
conn = self._pool.get()
try:
return conn.execute(sql)
finally:
self._pool.put(conn)
def create_taos_pool(config):
if not config.enabled:
return None
return TaosConnectionPool(
config.base_url,
config.username,
config.password,
pool_size=config.pool_size,
timeout=config.timeout,
)

20
db/writer.py Normal file
View File

@@ -0,0 +1,20 @@
from db.orm import DrillingRealtimeORM
from db.pool import create_taos_pool
class TDengineWriter:
def __init__(self, config, pool=None, orm=None):
self.config = config
self.pool = pool if pool is not None else create_taos_pool(config)
self.orm = orm if orm is not None else DrillingRealtimeORM(
config.database,
stable=config.stable,
default_device_code=config.device_code,
)
self.enabled = bool(config.enabled and self.pool)
def write_payload(self, payload):
if not self.enabled:
return None
sql = self.orm.build_insert_sql(payload)
return self.pool.execute(sql)

1
install.bat Normal file
View File

@@ -0,0 +1 @@
pip install pyyaml paho-mqtt

10
main.py Normal file
View File

@@ -0,0 +1,10 @@
import logging
def main():
logging.info("start app....")
if __name__ == '__main__':
main()

255
mqtt_mock.py Normal file
View File

@@ -0,0 +1,255 @@
import argparse
import json
import os
import time
from datetime import datetime
from urllib.parse import urlparse
import paho.mqtt.client as mqtt
from config.dependencies import build_mock_dependencies
DATA_KEYS = [
"ts",
"wellid",
"stknum",
"recid",
"seqid",
"actual_date",
"actual_time",
"actcod",
"deptbitm",
"deptbitv",
"deptmeas",
"deptvert",
"blkpos",
"ropa",
"hkla",
"hklx",
"woba",
"wobx",
"torqa",
"torqx",
"rpma",
"sppa",
"chkp",
"spm1",
"spm2",
"spm3",
"tvolact",
"tvolcact",
"mfop",
"mfoa",
"mfia",
"mdoa",
"mdia",
"mtoa",
"mtia",
"mcoa",
"mcia",
"stkc",
"lagstks",
"deptretm",
"gasa",
"space1",
"space2",
"space3",
"space4",
"space5",
]
def parse_broker(broker):
if not broker:
raise ValueError("broker is required")
if "://" not in broker:
broker = "tcp://" + broker
parsed = urlparse(broker)
host = parsed.hostname or "localhost"
port = parsed.port or 1883
scheme = (parsed.scheme or "tcp").lower()
return scheme, host, port
def ensure_dir(path):
if not path:
return
dir_path = os.path.dirname(path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
def build_server_payload(equipment_code):
data = {key: 0 for key in DATA_KEYS}
data["ts"] = int(time.time() * 1000)
data["wellid"] = ""
return {
"meta": {
"equipment_code": equipment_code,
"equipment_sn": equipment_code,
},
"data": data,
}
def parse_payload(raw_payload):
try:
return json.loads(raw_payload)
except Exception:
return raw_payload
def build_ack_payload(topic, raw_payload):
payload = {
"ack": True,
"topic": topic,
"ts": datetime.utcnow().isoformat(timespec="seconds") + "Z",
}
try:
obj = json.loads(raw_payload)
if isinstance(obj, dict):
meta = obj.get("meta") if isinstance(obj.get("meta"), dict) else {}
if "equipment_code" in meta:
payload["equipment_code"] = meta["equipment_code"]
if "equipment_sn" in meta:
payload["equipment_sn"] = meta["equipment_sn"]
if "id" in obj:
payload["id"] = obj["id"]
payload["src"] = obj
except Exception:
payload["raw"] = raw_payload
return payload
def write_message(path, topic, raw_payload):
ensure_dir(path)
record = {
"ts": datetime.utcnow().isoformat(timespec="seconds") + "Z",
"topic": topic,
"payload": parse_payload(raw_payload),
}
payload_obj = record["payload"]
if isinstance(payload_obj, dict):
data = payload_obj.get("data")
if isinstance(data, dict) and isinstance(data.get("ts"), (int, float)):
record["ts_iso"] = datetime.utcfromtimestamp(data["ts"] / 1000.0).isoformat(timespec="seconds") + "Z"
with open(path, "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=True))
f.write("\n")
def run_mock_service(args, deps):
mqtt_config = deps.config.mqtt
tms_config = deps.config.tms
tdengine_config = deps.tdengine_config
tdengine_writer = deps.tdengine_writer
scheme, host, port = parse_broker(mqtt_config.broker)
print("MQTT mock config:")
print(f" broker: {scheme}://{host}:{port}")
print(f" client-id: {mqtt_config.mock_client_id}")
print(f" pub-topic (ingest): {mqtt_config.pub_topic}")
print(f" sub-topic (forward): {mqtt_config.sub_topic}")
print(f" ack-topic: {mqtt_config.ack_topic}")
print(f" data-file: {deps.data_file}")
print(f" device-code: {tms_config.device_code}")
print(f" tdengine enabled: {tdengine_writer.enabled}")
if tdengine_writer.enabled:
print(f" tdengine host: {tdengine_config.base_url}")
print(f" tdengine database: {tdengine_config.database}")
print(f" tdengine stable: {tdengine_config.stable}")
print(f" tdengine pool-size: {tdengine_config.pool_size}")
print(f" mode: {args.mode}")
client = mqtt.Client(client_id=mqtt_config.mock_client_id, clean_session=True)
if mqtt_config.username is not None:
client.username_pw_set(mqtt_config.username, mqtt_config.password)
if scheme in ("ssl", "tls", "mqtts"):
client.tls_set()
def on_connect(c, userdata, flags, rc):
if rc == 0:
print("Connected")
else:
print(f"Connect failed rc={rc}")
return
if args.mode in ("listen", "both") and mqtt_config.pub_topic:
c.subscribe(mqtt_config.pub_topic)
print(f"Subscribed: {mqtt_config.pub_topic}")
def on_disconnect(c, userdata, rc):
print(f"Disconnected callback rc={rc}")
def on_message(c, userdata, msg):
payload = msg.payload.decode("utf-8", errors="replace")
print(f"RX {msg.topic}: {payload}")
if msg.topic != mqtt_config.pub_topic:
return
if deps.data_file:
write_message(deps.data_file, msg.topic, payload)
print(f"Wrote to file: {deps.data_file}")
if tdengine_writer.enabled:
try:
tdengine_writer.write_payload(parse_payload(payload))
print("Wrote to TDengine")
except Exception as exc:
print(f"Write TDengine failed: {exc}")
if mqtt_config.sub_topic:
c.publish(mqtt_config.sub_topic, payload)
print(f"Forwarded to {mqtt_config.sub_topic}")
if mqtt_config.ack_topic:
ack_payload = build_ack_payload(msg.topic, payload)
c.publish(mqtt_config.ack_topic, json.dumps(ack_payload, ensure_ascii=True))
print(f"TX {mqtt_config.ack_topic}: {ack_payload}")
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
client.connect(host, port, keepalive=tms_config.keepalive)
client.loop_start()
try:
if args.mode in ("publish", "both"):
if not mqtt_config.pub_topic:
print("pub-topic is empty; nothing to publish")
else:
seq = 0
while True:
seq += 1
payload = build_server_payload(tms_config.device_code)
client.publish(mqtt_config.pub_topic, json.dumps(payload, ensure_ascii=True))
print(f"TX {mqtt_config.pub_topic}: {payload}")
if args.count and seq >= args.count:
break
time.sleep(args.interval)
else:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
client.loop_stop()
client.disconnect()
print("Disconnected")
def main():
ap = argparse.ArgumentParser(description="MQTT mock service")
ap.add_argument("--config", default="config.yaml", help="Path to config yaml")
ap.add_argument("--mode", choices=["publish", "listen", "both"], default="listen")
ap.add_argument("--interval", type=float, default=2.0, help="Publish interval (seconds)")
ap.add_argument("--count", type=int, default=0, help="Publish count (0 = forever)")
ap.add_argument("--data-file", default="", help="Override data-file in config")
args = ap.parse_args()
deps = build_mock_dependencies(args.config, data_file_override=args.data_file)
run_mock_service(args, deps)
if __name__ == "__main__":
main()

193
mqtt_sender.py Normal file
View File

@@ -0,0 +1,193 @@
import argparse
import json
import random
import time
from datetime import datetime
from urllib.parse import urlparse
import paho.mqtt.client as mqtt
from config.dependencies import build_sender_dependencies
DATA_KEYS = [
"ts",
"wellid",
"stknum",
"recid",
"seqid",
"actual_date",
"actual_time",
"actcod",
"deptbitm",
"deptbitv",
"deptmeas",
"deptvert",
"blkpos",
"ropa",
"hkla",
"hklx",
"woba",
"wobx",
"torqa",
"torqx",
"rpma",
"sppa",
"chkp",
"spm1",
"spm2",
"spm3",
"tvolact",
"tvolcact",
"mfop",
"mfoa",
"mfia",
"mdoa",
"mdia",
"mtoa",
"mtia",
"mcoa",
"mcia",
"stkc",
"lagstks",
"deptretm",
"gasa",
"space1",
"space2",
"space3",
"space4",
"space5",
]
def parse_broker(broker):
if not broker:
raise ValueError("broker is required")
if "://" not in broker:
broker = "tcp://" + broker
parsed = urlparse(broker)
host = parsed.hostname or "localhost"
port = parsed.port or 1883
scheme = (parsed.scheme or "tcp").lower()
return scheme, host, port
def rand_int(a, b):
return random.randint(a, b)
def build_random_payload(equipment_code):
data = {key: 0 for key in DATA_KEYS}
data["ts"] = int(time.time() * 1000)
data["wellid"] = random.choice(["", f"WELL-{rand_int(1, 9999):04d}"])
data["stknum"] = rand_int(0, 500)
data["recid"] = rand_int(0, 100000)
data["seqid"] = rand_int(0, 100000)
data["actual_date"] = int(datetime.utcnow().strftime("%Y%m%d"))
data["actual_time"] = int(datetime.utcnow().strftime("%H%M%S"))
data["actcod"] = rand_int(0, 9)
data["deptbitm"] = rand_int(0, 5000)
data["deptbitv"] = rand_int(0, 5000)
data["deptmeas"] = rand_int(0, 5000)
data["deptvert"] = rand_int(0, 5000)
data["blkpos"] = rand_int(0, 100)
data["ropa"] = rand_int(0, 200)
data["hkla"] = rand_int(0, 500)
data["hklx"] = rand_int(0, 500)
data["woba"] = rand_int(0, 200)
data["wobx"] = rand_int(0, 200)
data["torqa"] = rand_int(0, 200)
data["torqx"] = rand_int(0, 200)
data["rpma"] = rand_int(0, 300)
data["sppa"] = rand_int(0, 5000)
data["chkp"] = rand_int(0, 5000)
data["spm1"] = rand_int(0, 200)
data["spm2"] = rand_int(0, 200)
data["spm3"] = rand_int(0, 200)
data["tvolact"] = rand_int(0, 20000)
data["tvolcact"] = rand_int(0, 20000)
data["mfop"] = rand_int(0, 1000)
data["mfoa"] = rand_int(0, 1000)
data["mfia"] = rand_int(0, 1000)
data["mdoa"] = rand_int(0, 1000)
data["mdia"] = rand_int(0, 1000)
data["mtoa"] = rand_int(0, 1000)
data["mtia"] = rand_int(0, 1000)
data["mcoa"] = rand_int(0, 1000)
data["mcia"] = rand_int(0, 1000)
data["stkc"] = rand_int(0, 200)
data["lagstks"] = rand_int(0, 200)
data["deptretm"] = rand_int(0, 5000)
data["gasa"] = rand_int(0, 100)
data["space1"] = rand_int(0, 10)
data["space2"] = rand_int(0, 10)
data["space3"] = rand_int(0, 10)
data["space4"] = rand_int(0, 10)
data["space5"] = rand_int(0, 10)
return {
"meta": {
"equipment_code": equipment_code,
"equipment_sn": equipment_code,
},
"data": data,
}
def run_sender(args, deps):
mqtt_config = deps.config.mqtt
tms_config = deps.config.tms
scheme, host, port = parse_broker(mqtt_config.broker)
print("MQTT sender config:")
print(f" broker: {scheme}://{host}:{port}")
print(f" client-id: {mqtt_config.sender_client_id}")
print(f" pub-topic: {mqtt_config.pub_topic}")
print(f" interval: {args.interval}s")
client = mqtt.Client(client_id=mqtt_config.sender_client_id, clean_session=True)
if mqtt_config.username is not None:
client.username_pw_set(mqtt_config.username, mqtt_config.password)
if scheme in ("ssl", "tls", "mqtts"):
client.tls_set()
def on_disconnect(c, userdata, rc):
print(f"Disconnected callback rc={rc}")
client.on_disconnect = on_disconnect
client.connect(host, port, keepalive=tms_config.keepalive)
client.loop_start()
try:
if not mqtt_config.pub_topic:
print("pub-topic is empty; nothing to publish")
return
seq = 0
while True:
seq += 1
payload = build_random_payload(tms_config.device_code)
client.publish(mqtt_config.pub_topic, json.dumps(payload, ensure_ascii=True))
print(f"TX {mqtt_config.pub_topic}: {payload}")
if args.count and seq >= args.count:
break
time.sleep(args.interval)
except KeyboardInterrupt:
pass
finally:
client.loop_stop()
client.disconnect()
print("Disconnected")
def main():
ap = argparse.ArgumentParser(description="MQTT random data sender")
ap.add_argument("--config", default="config.yaml", help="Path to config yaml")
ap.add_argument("--interval", type=float, default=3.0, help="Publish interval (seconds)")
ap.add_argument("--count", type=int, default=0, help="Publish count (0 = forever)")
args = ap.parse_args()
deps = build_sender_dependencies(args.config)
run_sender(args, deps)
if __name__ == "__main__":
main()

112
mqtt_subscriber.py Normal file
View File

@@ -0,0 +1,112 @@
import argparse
import json
import time
from datetime import datetime
from urllib.parse import urlparse
import paho.mqtt.client as mqtt
from config.dependencies import build_subscriber_dependencies
def parse_broker(broker):
if not broker:
raise ValueError("broker is required")
if "://" not in broker:
broker = "tcp://" + broker
parsed = urlparse(broker)
host = parsed.hostname or "localhost"
port = parsed.port or 1883
scheme = (parsed.scheme or "tcp").lower()
return scheme, host, port
def print_flat_fields(title, value):
print(title)
if isinstance(value, dict):
for k, v in value.items():
print(f" {k}: {v}")
else:
print(f" {value}")
def run_subscriber(args, deps):
mqtt_config = deps.config.mqtt
tms_config = deps.config.tms
topic = args.topic or mqtt_config.sub_topic or mqtt_config.pub_topic
if not topic:
raise ValueError("No topic to subscribe. Set sub-topic or pub-topic, or pass --topic")
scheme, host, port = parse_broker(mqtt_config.broker)
print("MQTT subscriber config:")
print(f" broker: {scheme}://{host}:{port}")
print(f" client-id: {mqtt_config.subscriber_client_id}")
print(f" topic: {topic}")
client = mqtt.Client(client_id=mqtt_config.subscriber_client_id, clean_session=True)
if mqtt_config.username is not None:
client.username_pw_set(mqtt_config.username, mqtt_config.password)
if scheme in ("ssl", "tls", "mqtts"):
client.tls_set()
def on_connect(c, userdata, flags, rc):
if rc == 0:
print("Connected")
c.subscribe(topic)
print(f"Subscribed: {topic}")
else:
print(f"Connect failed rc={rc}")
def on_disconnect(c, userdata, rc):
print(f"Disconnected callback rc={rc}")
def on_message(c, userdata, msg):
raw_payload = msg.payload.decode("utf-8", errors="replace")
print("=" * 80)
print(f"RX Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Topic: {msg.topic}")
print(f"QoS: {msg.qos}, Retain: {msg.retain}, Bytes: {len(msg.payload)}")
try:
obj = json.loads(raw_payload)
print("Raw JSON:")
print(json.dumps(obj, ensure_ascii=False, indent=2))
if isinstance(obj, dict):
print_flat_fields("meta:", obj.get("meta"))
print_flat_fields("data:", obj.get("data"))
extra_keys = [key for key in obj.keys() if key not in ("meta", "data")]
for key in extra_keys:
print_flat_fields(f"{key}:", obj.get(key))
except Exception:
print("Raw payload:")
print(raw_payload)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
client.connect(host, port, keepalive=tms_config.keepalive)
client.loop_start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
client.loop_stop()
client.disconnect()
print("Disconnected")
def main():
ap = argparse.ArgumentParser(description="MQTT subscriber")
ap.add_argument("--config", default="config.yaml", help="Path to config yaml")
ap.add_argument("--topic", default="", help="Override topic to subscribe")
args = ap.parse_args()
deps = build_subscriber_dependencies(args.config)
run_subscriber(args, deps)
if __name__ == "__main__":
main()

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
paho-mqtt>=1.6.0
PyYAML>=6.0

0
utils/__init__.py Normal file
View File