Files
tdEngine_mqtt_mock/app/wits_sender.py
wsy182 0a123ba210 feat(core): 重构主应用启动逻辑并改进WITS数据发送
- 将主应用改为同时启动MQTT订阅入库和WITS数据发送两个服务
- 实现WITS发送器的自动重连机制和连接状态管理
- 添加日志记录到log/app.log和错误日志到log/error.log
- 更新WITS通道映射定义并支持字符串类型的日期时间字段
- 修改数据入库逻辑以支持空值处理和类型转换容错
- 移除命令行子命令模式,改为配置文件驱动的参数设置
- 添加.vscode和log目录到.gitignore忽略列表
2026-03-12 13:32:27 +08:00

238 lines
7.4 KiB
Python

import argparse
import logging
import random
import socket
import time
from datetime import datetime
from pathlib import Path
from config import build_wits_sender_dependencies
from model import WITS_CHANNEL_MAPPING, WitsData
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Framing strings used when a packet is assembled: the body lines are preceded
# by BEGIN_MARK and followed by END_MARK and then RECORD_TERMINATOR.
BEGIN_MARK = "&&\r\n"
END_MARK = "!!\r\n"
RECORD_TERMINATOR = "*\r\n"
# Pause, in seconds, passed to time.sleep() before another connection attempt.
RECONNECT_DELAY = 3
def rand_int(a, b):
    """Return a random integer N such that ``a <= N <= b`` (both ends inclusive)."""
    picked = random.randint(a, b)
    return picked
def rand_float(a, b, digits=6):
    """Draw a uniform random float between *a* and *b*, rounded to *digits* places."""
    sample = random.uniform(a, b)
    return round(sample, digits)
def build_random_wits_data(device_code):
    """Build one ``WitsData`` sample made of fixed values plus a few random ones.

    The hook-load, standpipe-pressure and mud-density inputs are drawn with
    ``rand_float`` and ``seqid`` with ``rand_int``; every other field is a
    hard-coded constant or derived from the current time.

    Args:
        device_code: Accepted for interface compatibility; the value is not
            used when building the sample.

    Returns:
        A populated ``WitsData`` instance.
    """
    # Read the clock exactly once so that ts/actual_ts (epoch milliseconds) and
    # the actual_date/actual_time strings always describe the same instant.
    epoch_seconds = time.time()
    ts_ms = int(epoch_seconds * 1000)
    now = datetime.fromtimestamp(epoch_seconds)

    hook_load = rand_float(17.3, 18.8)
    standpipe_pressure = rand_float(990.0, 1012.0)
    mud_density = rand_float(1069.8, 1070.1)

    return WitsData(
        ts=ts_ms,
        # NOTE(review): this literal looks like it may have lost non-ASCII
        # characters in an earlier encoding step - confirm the intended id.
        wellid="???1",
        stknum=0,
        recid=1,
        seqid=rand_int(1600, 9999),
        actual_date=now.strftime("%y%m%d"),
        actual_time=now.strftime("%H%M%S"),
        actual_ts=ts_ms,
        actcod=37,
        actod_label="AUTO",
        deptbitm=200.0,
        deptbitv=198.551422,
        deptmeas=200.0,
        deptvert=198.551422,
        blkpos=6.001850,
        ropa=0.0,
        hkla=hook_load,
        hklx=hook_load,
        woba=0.0,
        # Negated copy of the same random hook-load sample.
        wobx=-hook_load,
        torqa=0.0,
        torqx=0.0,
        rpma=0,
        sppa=standpipe_pressure,
        chkp=0.0,
        spm1=0,
        spm2=0,
        spm3=0,
        tvolact=0.0,
        tvolcact=0.0,
        mfop=0,
        mfoa=0.0,
        mfia=0.0,
        mdoa=mud_density,
        mdia=26.846003,
        mtoa=29.113855,
        mtia=346.874634,
        mcoa=241.874634,
        mcia=0.0,
        stkc=0,
        lagstks=0,
        deptretm=200.0,
        gasa=0.0,
        space1=0.0,
        space2=0.0,
        space3=0.0,
        space4=0.0,
        space5=0.0,
    )
def format_wits_value(value, kind):
    """Render *value* as text according to the formatting *kind*.

    ``"int"`` converts through ``int()``, ``"float6"`` prints a float with
    exactly six decimals, and ``"string"`` or any other kind falls back to
    ``str(value)``.
    """
    if kind == "int":
        return str(int(value))
    if kind == "float6":
        return format(float(value), ".6f")
    # "string" and unrecognised kinds share the plain str() rendering.
    return str(value)
def build_wits_packet(data):
    """Serialise *data* into one framed packet string.

    Each ``(channel, field_name, kind)`` entry of ``WITS_CHANNEL_MAPPING``
    becomes one line: the channel followed by the formatted attribute value.
    Lines are CRLF-separated and wrapped in BEGIN_MARK / END_MARK, followed by
    RECORD_TERMINATOR.
    """
    body = []
    for channel, field_name, kind in WITS_CHANNEL_MAPPING:
        raw_value = getattr(data, field_name)
        body.append(f"{channel}{format_wits_value(raw_value, kind)}")
    pieces = (BEGIN_MARK, "\r\n".join(body), "\r\n", END_MARK, RECORD_TERMINATOR)
    return "".join(pieces)
def normalize_packet(text):
    """Re-frame raw packet text with canonical markers and CRLF line endings.

    Line endings are unified, blank lines are dropped and trailing whitespace
    is stripped from each line. An existing leading ``&&`` line and trailing
    ``*`` / ``!!`` lines are removed so the framing is not duplicated, then the
    remaining lines are wrapped in BEGIN_MARK / END_MARK + RECORD_TERMINATOR.

    Args:
        text: Raw packet text using any mix of CRLF, CR or LF line endings.

    Returns:
        The framed packet string. When no body lines remain, the result is
        just the framing markers with nothing in between.
    """
    unified = text.replace("\r\n", "\n").replace("\r", "\n")
    lines = [line.rstrip() for line in unified.split("\n") if line.strip()]

    # Peel off framing that is already present in the input.
    if lines and lines[0] == "&&":
        lines = lines[1:]
    if lines and lines[-1] == "*":
        lines = lines[:-1]
    if lines and lines[-1] == "!!":
        lines = lines[:-1]

    # Terminate every body line individually so that an empty body does not
    # leave a stray blank line between the begin and end markers.
    body = "".join(f"{line}\r\n" for line in lines)
    return BEGIN_MARK + body + END_MARK + RECORD_TERMINATOR
def load_packet_from_file(path):
    """Read *path* as UTF-8 (``utf-8-sig`` codec) and return it re-framed by ``normalize_packet``."""
    raw_text = Path(path).read_text(encoding="utf-8-sig")
    return normalize_packet(raw_text)
def open_connection(host, port, timeout):
    """Open a TCP connection to ``(host, port)`` and configure the socket.

    The socket gets ``TCP_NODELAY`` enabled and *timeout* applied to
    subsequent operations.

    Args:
        host: Target host name or address.
        port: Target TCP port.
        timeout: Timeout used for the connect and set on the returned socket.

    Returns:
        The connected, configured socket.

    Raises:
        OSError: If connecting or configuring the socket fails.
    """
    sock = socket.create_connection((host, port), timeout=timeout)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.settimeout(timeout)
    except BaseException:
        # The caller never receives the socket on this path, so close it here
        # instead of leaking a descriptor on every failed attempt.
        sock.close()
        raise
    return sock
def send_packet(sock, packet):
    """Encode *packet* as strict ASCII and hand all of it to ``sock.sendall``."""
    payload = packet.encode("ascii", errors="strict")
    sock.sendall(payload)
def _close_quietly(sock):
    """Close *sock* on a best-effort basis, ignoring any OSError from close()."""
    try:
        sock.close()
    except OSError:
        pass


def _log_connect_failure(exc, host, port):
    """Log a failed connection attempt with a message chosen by exception type."""
    if isinstance(exc, ConnectionRefusedError):
        logger.warning("WITS target refused connection %s:%s, retry in %ss", host, port, RECONNECT_DELAY)
    elif isinstance(exc, TimeoutError):
        logger.warning("WITS connect timeout %s:%s, retry in %ss", host, port, RECONNECT_DELAY)
    else:
        logger.error("WITS connect failed %s:%s (%s), retry in %ss", host, port, exc, RECONNECT_DELAY)


def _log_send_failure(exc):
    """Log a failed send with a message chosen by exception type."""
    if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
        logger.warning("WITS connection dropped by remote host, reconnecting in %ss", RECONNECT_DELAY)
    elif isinstance(exc, TimeoutError):
        logger.warning("WITS send timeout, reconnecting in %ss", RECONNECT_DELAY)
    else:
        logger.error("WITS send failed (%s), reconnecting in %ss", exc, RECONNECT_DELAY)


def run_wits_sender(args, deps):
    """Send WITS packets over TCP in a loop, reconnecting whenever the link fails.

    Command-line values take precedence over the ``deps.config.wits``
    settings for ``source_file``, ``host``, ``port`` and ``timeout``; a falsy
    ``args.interval`` falls back to 2.0 seconds. Each iteration sends either
    the packet loaded from ``source_file`` or a freshly generated one.

    Args:
        args: Parsed options providing ``source_file``, ``host``, ``port``,
            ``timeout``, ``interval`` and ``count``.
        deps: Dependency container providing ``config.wits`` and
            ``config.tms.device_code``.

    Raises:
        ValueError: If no target host or port is configured.
    """
    wits_config = deps.config.wits
    source_file = args.source_file or wits_config.source_file
    host = args.host or wits_config.host
    port = args.port or wits_config.port
    timeout = args.timeout or wits_config.timeout
    interval = args.interval or 2.0
    if not host or not port:
        raise ValueError("WITS target host/port is empty. Configure wits.host/wits.port or tms.server-ip/tms.server-port")
    logger.info(
        "WITS sender config host=%s port=%s timeout=%ss source_file=%s interval=%ss count=%s",
        host,
        port,
        timeout,
        source_file or "(generated)",
        interval,
        args.count or "forever",
    )

    seq = 0  # number of packets sent successfully so far
    sock = None
    try:
        while True:
            if sock is None:
                try:
                    sock = open_connection(host, port, timeout)
                except OSError as exc:
                    _log_connect_failure(exc, host, port)
                    time.sleep(RECONNECT_DELAY)
                    continue
                logger.info("WITS connected %s:%s", host, port)

            try:
                # NOTE(review): a failure reading source_file is an OSError as
                # well, so it also takes the reconnect path below.
                if source_file:
                    packet = load_packet_from_file(source_file)
                else:
                    packet = build_wits_packet(build_random_wits_data(deps.config.tms.device_code))
                send_packet(sock, packet)
            except OSError as exc:
                _log_send_failure(exc)
                _close_quietly(sock)
                sock = None
                time.sleep(RECONNECT_DELAY)
                continue

            # Count only after a successful send so failed attempts neither
            # consume the --count budget nor leave gaps in the TX numbering.
            seq += 1
            logger.info("TX WITS #%s -> %s:%s", seq, host, port)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("WITS packet:\n%s", packet)
            if args.count and seq >= args.count:
                break
            time.sleep(interval)
    except KeyboardInterrupt:
        logger.info("WITS sender interrupted")
    finally:
        if sock is not None:
            _close_quietly(sock)
            logger.info("WITS disconnected")
def add_arguments(parser):
    """Register the sender's command-line options on *parser*."""
    option_specs = (
        ("--config", {"default": "config.yaml", "help": "Path to config yaml"}),
        ("--host", {"default": "", "help": "Override target host"}),
        ("--port", {"type": int, "default": 0, "help": "Override target port"}),
        ("--timeout", {"type": int, "default": 0, "help": "Override socket timeout"}),
        ("--source-file", {"default": "", "help": "Send raw WITS packet from file"}),
        ("--interval", {"type": float, "default": 2.0, "help": "Send interval in seconds"}),
        ("--count", {"type": int, "default": 0, "help": "Send count (0 = forever)"}),
    )
    for flag, settings in option_specs:
        parser.add_argument(flag, **settings)
def main(argv=None):
    """Parse command-line options, build the dependencies and run the sender.

    Args:
        argv: Optional argument list handed to ``parse_args``; ``None`` lets
            argparse read the process arguments.
    """
    cli = argparse.ArgumentParser(description="WITS TCP sender")
    add_arguments(cli)
    options = cli.parse_args(argv)
    dependencies = build_wits_sender_dependencies(options.config)
    run_wits_sender(options, dependencies)


# Run the sender only when this file is executed as a script.
if __name__ == "__main__":
    main()