Files
tdEngine_mqtt_mock/main.py
wsy182 0a123ba210 feat(core): 重构主应用启动逻辑并改进WITS数据发送
- 将主应用改为同时启动MQTT订阅入库和WITS数据发送两个服务
- 实现WITS发送器的自动重连机制和连接状态管理
- 添加日志记录到log/app.log和错误日志到log/error.log
- 更新WITS通道映射定义并支持字符串类型的日期时间字段
- 修改数据入库逻辑以支持空值处理和类型转换容错
- 移除命令行子命令模式,改为配置文件驱动的参数设置
- 添加.vscode和log目录到.gitignore忽略列表
2026-03-12 13:32:27 +08:00

119 lines
3.7 KiB
Python

import argparse
import logging
import os
import threading
import time
from pathlib import Path
from types import SimpleNamespace
from app import mqtt_mock, wits_sender
from config import build_mock_dependencies, build_wits_sender_dependencies
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Absolute path of the directory containing this file; configure_logging()
# places the "log" directory underneath it.
BASE_DIR = Path(__file__).resolve().parent
def configure_logging(level_name):
    """Configure the root logger with console, app-file and error-file handlers.

    Handlers already attached to the root logger are detached *and closed*
    before the new ones are installed, so calling this function more than
    once does not leak the file descriptors held by earlier ``FileHandler``
    instances.

    Args:
        level_name: Name of a ``logging`` level such as ``"INFO"`` or
            ``"debug"``. The value is passed through ``str()``, stripped and
            upper-cased. A name that does not resolve to an integer level
            constant falls back to ``logging.INFO``.

    Side effects:
        Creates ``BASE_DIR / "log"`` when missing and opens ``app.log`` and
        ``error.log`` inside it with UTF-8 encoding.
    """
    log_dir = BASE_DIR / "log"
    log_dir.mkdir(parents=True, exist_ok=True)

    # getattr() may find a non-level attribute (e.g. BASIC_FORMAT is a str);
    # only accept integer level constants, otherwise default to INFO.
    level = getattr(logging, str(level_name).strip().upper(), None)
    if not isinstance(level, int):
        level = logging.INFO

    formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s - %(message)s")

    root_logger = logging.getLogger()
    # Iterate over a copy: removeHandler() mutates root_logger.handlers.
    for stale_handler in list(root_logger.handlers):
        root_logger.removeHandler(stale_handler)
        stale_handler.close()
    root_logger.setLevel(level)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    console_handler.setFormatter(formatter)

    app_file_handler = logging.FileHandler(log_dir / "app.log", encoding="utf-8")
    app_file_handler.setLevel(level)
    app_file_handler.setFormatter(formatter)

    # error.log only receives records at ERROR and above, regardless of the
    # level requested for the console and app.log.
    error_file_handler = logging.FileHandler(log_dir / "error.log", encoding="utf-8")
    error_file_handler.setLevel(logging.ERROR)
    error_file_handler.setFormatter(formatter)

    root_logger.addHandler(console_handler)
    root_logger.addHandler(app_file_handler)
    root_logger.addHandler(error_file_handler)
def build_parser():
    """Create the command-line parser for the combined mqtt/wits mock app.

    Returns:
        argparse.ArgumentParser: Parser accepting ``--config``,
        ``--log-level``, ``--data-file``, ``--wits-host``, ``--wits-port``,
        ``--wits-timeout`` and ``--wits-source-file``.
    """
    # (flag, add_argument keyword options), registered in this order.
    option_specs = (
        ("--config", {"default": "config.yaml", "help": "Path to config yaml"}),
        ("--log-level", {"default": "INFO", "help": "Logging level"}),
        ("--data-file", {"default": "", "help": "Override data-file in config"}),
        ("--wits-host", {"default": "", "help": "Override WITS target host"}),
        ("--wits-port", {"type": int, "default": 0, "help": "Override WITS target port"}),
        ("--wits-timeout", {"type": int, "default": 0, "help": "Override WITS socket timeout"}),
        ("--wits-source-file", {"default": "", "help": "Send raw WITS packet from file"}),
    )

    cli = argparse.ArgumentParser(description="tdEngine mqtt/wits mock application")
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli
def build_mqtt_args(args):
    """Derive the argument namespace handed to the MQTT mock service.

    Args:
        args: Parsed CLI namespace; only ``config`` and ``data_file`` are read.

    Returns:
        SimpleNamespace: ``config`` and ``data_file`` copied from *args*,
        plus the fixed values ``mode="listen"``, ``interval=2.0`` and
        ``count=0``.
    """
    fields = {
        "config": args.config,
        "mode": "listen",
        "interval": 2.0,
        "count": 0,
        "data_file": args.data_file,
    }
    return SimpleNamespace(**fields)
def build_wits_args(args):
    """Derive the argument namespace handed to the WITS sender.

    Args:
        args: Parsed CLI namespace; ``config``, ``wits_host``, ``wits_port``,
            ``wits_timeout`` and ``wits_source_file`` are read.

    Returns:
        SimpleNamespace: The CLI values exposed as ``config``, ``host``,
        ``port``, ``timeout`` and ``source_file``, plus the fixed values
        ``interval=2.0`` and ``count=0``.
    """
    fields = {
        "config": args.config,
        "host": args.wits_host,
        "port": args.wits_port,
        "timeout": args.wits_timeout,
        "source_file": args.wits_source_file,
        "interval": 2.0,
        "count": 0,
    }
    return SimpleNamespace(**fields)
def start_worker(name, target, args, deps):
    """Start ``target(args, deps)`` on a named daemon thread.

    Args:
        name: Name assigned to the thread.
        target: Callable invoked as ``target(args, deps)`` on the new thread.
        args: First positional argument passed to *target*.
        deps: Second positional argument passed to *target*.

    Returns:
        threading.Thread: The thread, already started.
    """
    worker = threading.Thread(name=name, target=target, args=(args, deps))
    # Daemon threads do not keep the interpreter alive once the main thread ends.
    worker.daemon = True
    worker.start()
    return worker
def main(argv=None):
    """Run the MQTT mock service and the WITS sender until interrupted.

    Parses the command line, configures logging, builds the dependencies for
    both services and starts each on its own worker thread. The main thread
    then polls the workers once per second.

    Start-up (dependency construction and thread creation) runs inside the
    same guarded region as the supervision loop, so a failure there is also
    recorded through ``logger.exception`` before it propagates.

    Args:
        argv: Argument list handed to ``ArgumentParser.parse_args``. ``None``
            lets argparse read the process command line.

    Raises:
        RuntimeError: A worker thread is no longer alive.
        Exception: Any other error raised during start-up or supervision is
            logged and re-raised unchanged.
    """
    args = build_parser().parse_args(argv)
    # Logging must be configured before the guarded region so that
    # logger.exception() below reaches the configured handlers.
    configure_logging(args.log_level)
    logger.info("start app")

    try:
        mqtt_args = build_mqtt_args(args)
        wits_args = build_wits_args(args)
        mqtt_deps = build_mock_dependencies(
            mqtt_args.config, data_file_override=mqtt_args.data_file
        )
        wits_deps = build_wits_sender_dependencies(wits_args.config)

        threads = [
            start_worker("mqtt-mock", mqtt_mock.run_mock_service, mqtt_args, mqtt_deps),
            start_worker("wits-sender", wits_sender.run_wits_sender, wits_args, wits_deps),
        ]
        logger.info("services started: mqtt subscription+ingest and wits sender")

        # Supervise: a worker that has exited for any reason is treated as fatal.
        while True:
            for thread in threads:
                if not thread.is_alive():
                    raise RuntimeError(f"worker stopped unexpectedly: {thread.name}")
            time.sleep(1)
    except KeyboardInterrupt:
        # start_worker() creates daemon threads, so returning here lets the
        # process exit without joining them.
        logger.info("shutdown requested")
    except Exception:
        logger.exception("application stopped unexpectedly")
        raise
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()