- 将主应用改为同时启动MQTT订阅入库和WITS数据发送两个服务 - 实现WITS发送器的自动重连机制和连接状态管理 - 添加日志记录到log/app.log和错误日志到log/error.log - 更新WITS通道映射定义并支持字符串类型的日期时间字段 - 修改数据入库逻辑以支持空值处理和类型转换容错 - 移除命令行子命令模式,改为配置文件驱动的参数设置 - 添加.vscode和log目录到.gitignore忽略列表
119 lines
3.7 KiB
Python
119 lines
3.7 KiB
Python
import argparse
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
|
|
from app import mqtt_mock, wits_sender
|
|
from config import build_mock_dependencies, build_wits_sender_dependencies
|
|
|
|
|
|
# Absolute directory that contains this file; the "log" directory used by
# configure_logging() is created beneath it.
BASE_DIR = Path(__file__).resolve().parent

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def configure_logging(level_name):
    """Configure the root logger with console, app-file and error-file handlers.

    Three handlers are attached to the root logger:

    * a ``StreamHandler`` at the requested level,
    * ``log/app.log`` (UTF-8) at the requested level,
    * ``log/error.log`` (UTF-8) at ``ERROR`` and above.

    The ``log`` directory is created under ``BASE_DIR`` if it is missing.
    Handlers that were already attached to the root logger are removed and
    closed first, so calling this function again does not leave earlier log
    files open.

    Args:
        level_name: Name of a ``logging`` level such as ``"INFO"`` or
            ``"debug"`` (case-insensitive). A name that does not resolve to
            an integer level constant falls back to ``logging.INFO``.
    """
    log_dir = BASE_DIR / "log"
    log_dir.mkdir(parents=True, exist_ok=True)

    level = getattr(logging, str(level_name).upper(), logging.INFO)
    if not isinstance(level, int):
        # Some upper-case attributes of the logging module are not levels
        # (e.g. BASIC_FORMAT is a string); setLevel() would reject them.
        level = logging.INFO

    formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s - %(message)s")

    root_logger = logging.getLogger()
    # Detach AND close previous handlers; merely clearing the list would keep
    # any previously opened log files open.
    for stale_handler in list(root_logger.handlers):
        root_logger.removeHandler(stale_handler)
        stale_handler.close()
    root_logger.setLevel(level)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    console_handler.setFormatter(formatter)

    app_file_handler = logging.FileHandler(log_dir / "app.log", encoding="utf-8")
    app_file_handler.setLevel(level)
    app_file_handler.setFormatter(formatter)

    # Separate file that only receives ERROR and above.
    error_file_handler = logging.FileHandler(log_dir / "error.log", encoding="utf-8")
    error_file_handler.setLevel(logging.ERROR)
    error_file_handler.setFormatter(formatter)

    root_logger.addHandler(console_handler)
    root_logger.addHandler(app_file_handler)
    root_logger.addHandler(error_file_handler)
|
|
|
|
|
|
def build_parser():
    """Create the command-line parser for the application.

    Returns:
        An ``argparse.ArgumentParser`` exposing ``--config``, ``--log-level``,
        ``--data-file``, ``--wits-host``, ``--wits-port``, ``--wits-timeout``
        and ``--wits-source-file``. Every option has a default, so an empty
        argument list parses successfully.
    """
    # (flag, keyword arguments for add_argument), in registration order.
    option_table = (
        ("--config", {"default": "config.yaml", "help": "Path to config yaml"}),
        ("--log-level", {"default": "INFO", "help": "Logging level"}),
        ("--data-file", {"default": "", "help": "Override data-file in config"}),
        ("--wits-host", {"default": "", "help": "Override WITS target host"}),
        ("--wits-port", {"type": int, "default": 0, "help": "Override WITS target port"}),
        ("--wits-timeout", {"type": int, "default": 0, "help": "Override WITS socket timeout"}),
        ("--wits-source-file", {"default": "", "help": "Send raw WITS packet from file"}),
    )

    cli = argparse.ArgumentParser(description="tdEngine mqtt/wits mock application")
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli
|
|
|
|
|
|
def build_mqtt_args(args):
    """Derive the argument namespace for the MQTT worker from CLI arguments.

    Args:
        args: Parsed CLI arguments; ``config`` and ``data_file`` are read.

    Returns:
        A ``SimpleNamespace`` with ``config`` and ``data_file`` copied from
        ``args`` and ``mode``, ``interval`` and ``count`` fixed to
        ``"listen"``, ``2.0`` and ``0``.
    """
    mqtt_settings = {
        "config": args.config,
        "mode": "listen",
        "interval": 2.0,
        "count": 0,
        "data_file": args.data_file,
    }
    return SimpleNamespace(**mqtt_settings)
|
|
|
|
|
|
def build_wits_args(args):
    """Derive the argument namespace for the WITS worker from CLI arguments.

    Args:
        args: Parsed CLI arguments; ``config``, ``wits_host``, ``wits_port``,
            ``wits_timeout`` and ``wits_source_file`` are read.

    Returns:
        A ``SimpleNamespace`` with ``config``, ``host``, ``port``, ``timeout``
        and ``source_file`` copied from ``args`` and ``interval`` and
        ``count`` fixed to ``2.0`` and ``0``.
    """
    wits_settings = {
        "config": args.config,
        "host": args.wits_host,
        "port": args.wits_port,
        "timeout": args.wits_timeout,
        "source_file": args.wits_source_file,
        "interval": 2.0,
        "count": 0,
    }
    return SimpleNamespace(**wits_settings)
|
|
|
|
|
|
def start_worker(name, target, args, deps):
    """Start ``target(args, deps)`` on a named daemon thread.

    Args:
        name: Name given to the thread.
        target: Callable invoked on the new thread with ``args`` and ``deps``
            as its two positional arguments.
        args: First positional argument passed to ``target``.
        deps: Second positional argument passed to ``target``.

    Returns:
        The ``threading.Thread`` object, already started.
    """
    worker = threading.Thread(name=name, target=target, args=(args, deps))
    # Daemon threads do not keep the interpreter alive once the main thread ends.
    worker.daemon = True
    worker.start()
    return worker
|
|
|
|
|
|
def main(argv=None):
    """Run the MQTT worker and the WITS sender until interrupted.

    Parses the command line, configures logging, builds the dependencies for
    both workers, starts each worker on its own daemon thread and then polls
    the threads once per second.

    Start-up (dependency construction and thread creation) runs inside the
    same guarded region as the supervision loop, so a failure while loading
    configuration is logged with a traceback before being re-raised instead
    of bypassing the configured log handlers.

    Args:
        argv: Argument list to parse. ``None`` is passed straight to
            ``ArgumentParser.parse_args``.

    Raises:
        RuntimeError: If a worker thread is found to be no longer alive.
        Exception: Any other error raised during start-up or supervision is
            logged and re-raised unchanged.
    """
    args = build_parser().parse_args(argv)
    configure_logging(args.log_level)

    logger.info("start app")

    try:
        mqtt_args = build_mqtt_args(args)
        wits_args = build_wits_args(args)

        mqtt_deps = build_mock_dependencies(mqtt_args.config, data_file_override=mqtt_args.data_file)
        # NOTE(review): only the config path is passed here; the host/port/
        # timeout overrides travel in wits_args to the worker. How the worker
        # applies them is not visible from this module.
        wits_deps = build_wits_sender_dependencies(wits_args.config)

        threads = [
            start_worker("mqtt-mock", mqtt_mock.run_mock_service, mqtt_args, mqtt_deps),
            start_worker("wits-sender", wits_sender.run_wits_sender, wits_args, wits_deps),
        ]

        logger.info("services started: mqtt subscription+ingest and wits sender")

        # Supervision loop: both workers are expected to run forever, so a
        # thread that has finished is treated as a fatal condition.
        while True:
            for thread in threads:
                if not thread.is_alive():
                    raise RuntimeError(f"worker stopped unexpectedly: {thread.name}")
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop; return without an error.
        logger.info("shutdown requested")
    except Exception:
        logger.exception("application stopped unexpectedly")
        raise
|
|
|
|
|
|
# Script entry point: run the application only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|