diff --git a/.gitignore b/.gitignore index 355de5f..2ec0c6b 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ .pytest_cache/ .pytest_ -.vscode \ No newline at end of file +.vscode + +log/* \ No newline at end of file diff --git a/app/wits_sender.py b/app/wits_sender.py index 37b0db9..0946a75 100644 --- a/app/wits_sender.py +++ b/app/wits_sender.py @@ -7,74 +7,79 @@ from datetime import datetime from pathlib import Path from config import build_wits_sender_dependencies -from model import WITS_FIELD_MAPPING, WitsData +from model import WITS_CHANNEL_MAPPING, WitsData logger = logging.getLogger(__name__) BEGIN_MARK = "&&\r\n" END_MARK = "!!\r\n" +RECORD_TERMINATOR = "*\r\n" +RECONNECT_DELAY = 3 def rand_int(a, b): return random.randint(a, b) -def rand_float(a, b, digits=2): +def rand_float(a, b, digits=6): return round(random.uniform(a, b), digits) def build_random_wits_data(device_code): now = datetime.now() ts_ms = int(time.time() * 1000) + hook_load = rand_float(17.3, 18.8) + standpipe_pressure = rand_float(990.0, 1012.0) + mud_density = rand_float(1069.8, 1070.1) return WitsData( ts=ts_ms, - wellid=device_code, - stknum=rand_int(0, 500), - recid=0, - seqid=rand_int(1, 999999), - actual_date=float(now.strftime("%Y%m%d")), - actual_time=float(now.strftime("%H%M%S")), + wellid="???1", + stknum=0, + recid=1, + seqid=rand_int(1600, 9999), + actual_date=now.strftime("%y%m%d"), + actual_time=now.strftime("%H%M%S"), actual_ts=ts_ms, - actcod=rand_int(0, 9), + actcod=37, actod_label="AUTO", - deptbitm=rand_float(0, 5000), - deptbitv=rand_float(0, 5000), - deptmeas=rand_float(0, 5000), - deptvert=rand_float(0, 5000), - blkpos=rand_float(0, 100), - ropa=rand_float(0, 200), - hkla=rand_float(0, 500), - hklx=rand_float(0, 500), - woba=rand_float(0, 200), - wobx=rand_float(0, 200), - torqa=rand_float(0, 200), - torqx=rand_float(0, 200), - rpma=rand_int(0, 300), - sppa=rand_float(0, 5000), - chkp=rand_float(0, 5000), - spm1=rand_int(0, 200), - spm2=rand_int(0, 200), - spm3=rand_int(0, 200), - tvolact=rand_float(0, 20000), - tvolcact=rand_float(0, 20000), - mfop=rand_int(0, 1000), - mfoa=rand_float(0, 1000), - mfia=rand_float(0, 1000), - mdoa=rand_float(0, 1000), - mdia=rand_float(0, 1000), - mtoa=rand_float(0, 1000), - mtia=rand_float(0, 1000), - mcoa=rand_float(0, 1000), - mcia=rand_float(0, 1000), - stkc=rand_int(0, 200), - lagstks=rand_int(0, 200), - deptretm=rand_float(0, 5000), - gasa=rand_float(0, 100), - space1=rand_float(0, 10), - space2=rand_float(0, 10), - space3=rand_float(0, 10), - space4=rand_float(0, 10), - space5=rand_float(0, 10), + deptbitm=200.0, + deptbitv=198.551422, + deptmeas=200.0, + deptvert=198.551422, + blkpos=6.001850, + ropa=0.0, + hkla=hook_load, + hklx=hook_load, + woba=0.0, + wobx=-hook_load, + torqa=0.0, + torqx=0.0, + rpma=0, + sppa=standpipe_pressure, + chkp=0.0, + spm1=0, + spm2=0, + spm3=0, + tvolact=0.0, + tvolcact=0.0, + mfop=0, + mfoa=0.0, + mfia=0.0, + mdoa=mud_density, + mdia=26.846003, + mtoa=29.113855, + mtia=346.874634, + mcoa=241.874634, + mcia=0.0, + stkc=0, + lagstks=0, + deptretm=200.0, + gasa=0.0, + space1=0.0, + space2=0.0, + space3=0.0, + space4=0.0, + space5=0.0, ) @@ -83,15 +88,14 @@ def format_wits_value(value, kind): return str(value) if kind == "int": return str(int(value)) - return f"{float(value):.2f}" + if kind == "float6": + return f"{float(value):.6f}" + return str(value) def build_wits_packet(data): - lines = [] - for index, field_name, kind in WITS_FIELD_MAPPING: - value = getattr(data, field_name) - lines.append(f"{index:02d}{format_wits_value(value, kind)}") - return BEGIN_MARK + "\r\n".join(lines) + "\r\n" + END_MARK + lines = [f"{channel}{format_wits_value(getattr(data, field_name), kind)}" for channel, field_name, kind in WITS_CHANNEL_MAPPING] + return BEGIN_MARK + "\r\n".join(lines) + "\r\n" + END_MARK + RECORD_TERMINATOR def normalize_packet(text): @@ -99,49 +103,116 @@ def normalize_packet(text): lines = [line.rstrip() for line in body.split("\n") if line.strip()] if lines and lines[0] == "&&": lines = lines[1:] + if lines and lines[-1] == "*": + lines = lines[:-1] if lines and lines[-1] == "!!": lines = lines[:-1] - return BEGIN_MARK + "\r\n".join(lines) + "\r\n" + END_MARK + return BEGIN_MARK + "\r\n".join(lines) + "\r\n" + END_MARK + RECORD_TERMINATOR def load_packet_from_file(path): return normalize_packet(Path(path).read_text(encoding="utf-8-sig")) -def send_packet(host, port, timeout, packet): - with socket.create_connection((host, port), timeout=timeout) as sock: - sock.sendall(packet.encode("ascii", errors="strict")) +def open_connection(host, port, timeout): + sock = socket.create_connection((host, port), timeout=timeout) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.settimeout(timeout) + return sock + + +def send_packet(sock, packet): + sock.sendall(packet.encode("ascii", errors="strict")) def run_wits_sender(args, deps): wits_config = deps.config.wits - device_code = deps.config.tms.device_code source_file = args.source_file or wits_config.source_file host = args.host or wits_config.host port = args.port or wits_config.port timeout = args.timeout or wits_config.timeout + interval = args.interval or 2.0 + if not host or not port: raise ValueError("WITS target host/port is empty. Configure wits.host/wits.port or tms.server-ip/tms.server-port") - logger.info("WITS sender config host=%s port=%s timeout=%ss source_file=%s interval=%ss count=%s", host, port, timeout, source_file or "(generated)", args.interval, args.count or "forever") + logger.info( + "WITS sender config host=%s port=%s timeout=%ss source_file=%s interval=%ss count=%s", + host, + port, + timeout, + source_file or "(generated)", + interval, + args.count or "forever", + ) seq = 0 + sock = None try: while True: - seq += 1 - if source_file: - packet = load_packet_from_file(source_file) - else: - packet = build_wits_packet(build_random_wits_data(device_code)) - send_packet(host, port, timeout, packet) - logger.info("TX WITS #%s -> %s:%s", seq, host, port) - if logger.isEnabledFor(logging.DEBUG): - logger.debug("WITS packet:\n%s", packet) - if args.count and seq >= args.count: - break - time.sleep(args.interval) + if sock is None: + try: + sock = open_connection(host, port, timeout) + logger.info("WITS connected %s:%s", host, port) + except ConnectionRefusedError: + logger.warning("WITS target refused connection %s:%s, retry in %ss", host, port, RECONNECT_DELAY) + time.sleep(RECONNECT_DELAY) + continue + except TimeoutError: + logger.warning("WITS connect timeout %s:%s, retry in %ss", host, port, RECONNECT_DELAY) + time.sleep(RECONNECT_DELAY) + continue + except OSError as exc: + logger.error("WITS connect failed %s:%s (%s), retry in %ss", host, port, exc, RECONNECT_DELAY) + time.sleep(RECONNECT_DELAY) + continue + + try: + seq += 1 + if source_file: + packet = load_packet_from_file(source_file) + else: + packet = build_wits_packet(build_random_wits_data(deps.config.tms.device_code)) + send_packet(sock, packet) + logger.info("TX WITS #%s -> %s:%s", seq, host, port) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("WITS packet:\n%s", packet) + if args.count and seq >= args.count: + break + time.sleep(interval) + except (BrokenPipeError, ConnectionResetError): + logger.warning("WITS connection dropped by remote host, reconnecting in %ss", RECONNECT_DELAY) + try: + sock.close() + except OSError: + pass + sock = None + time.sleep(RECONNECT_DELAY) + except TimeoutError: + logger.warning("WITS send timeout, reconnecting in %ss", RECONNECT_DELAY) + try: + sock.close() + except OSError: + pass + sock = None + time.sleep(RECONNECT_DELAY) + except OSError as exc: + logger.error("WITS send failed (%s), reconnecting in %ss", exc, RECONNECT_DELAY) + try: + sock.close() + except OSError: + pass + sock = None + time.sleep(RECONNECT_DELAY) except KeyboardInterrupt: logger.info("WITS sender interrupted") + finally: + if sock is not None: + try: + sock.close() + except OSError: + pass + logger.info("WITS disconnected") def add_arguments(parser): @@ -150,8 +221,8 @@ def add_arguments(parser): parser.add_argument("--port", type=int, default=0, help="Override target port") parser.add_argument("--timeout", type=int, default=0, help="Override socket timeout") parser.add_argument("--source-file", default="", help="Send raw WITS packet from file") - parser.add_argument("--interval", type=float, default=3.0, help="Send interval in seconds") - parser.add_argument("--count", type=int, default=1, help="Send count (0 = forever)") + parser.add_argument("--interval", type=float, default=2.0, help="Send interval in seconds") + parser.add_argument("--count", type=int, default=0, help="Send count (0 = forever)") def main(argv=None): diff --git a/db/orm.py b/db/orm.py index 2e35022..89a59bf 100644 --- a/db/orm.py +++ b/db/orm.py @@ -1,4 +1,5 @@ -import re +import logging +import re from model import DrillingRealtimeData @@ -88,14 +89,34 @@ class DrillingRealtimeORM: table_name = sanitize_identifier(f"drilling_realtime_{equipment_code}", "drilling_realtime_default") values = [] for column in DB_COLUMNS: - raw = getattr(entity, column) + raw = getattr(entity, column, None) + if raw is None: + logging.debug("Column %s is None, default=0", column) if column in INT_COLUMNS or column == "ts": - values.append(str(int(raw))) + values.append(str(to_int(raw))) else: - values.append(str(float(raw))) + values.append(str(to_float(raw))) columns_sql = ", ".join([f"`{column}`" for column in DB_COLUMNS]) values_sql = ", ".join(values) return ( f"INSERT INTO `{self.database}`.`{table_name}` USING `{self.database}`.`{self.stable}` " f"TAGS ({sql_quote(equipment_code)}) ({columns_sql}) VALUES ({values_sql})" ) + + +def to_int(value, default=0): + try: + if value is None or value == "": + return default + return int(value) + except Exception: + return default + + +def to_float(value, default=0.0): + try: + if value is None or value == "": + return default + return float(value) + except Exception: + return default \ No newline at end of file diff --git a/main.py b/main.py index 943b697..23e8ac5 100644 --- a/main.py +++ b/main.py @@ -1,54 +1,117 @@ import argparse import logging +import os +import threading +import time +from pathlib import Path +from types import SimpleNamespace -from app import mqtt_mock, mqtt_sender, mqtt_subscriber, wits_sender +from app import mqtt_mock, wits_sender +from config import build_mock_dependencies, build_wits_sender_dependencies + + +logger = logging.getLogger(__name__) +BASE_DIR = Path(__file__).resolve().parent def configure_logging(level_name): + log_dir = BASE_DIR / "log" + os.makedirs(log_dir, exist_ok=True) level = getattr(logging, str(level_name).upper(), logging.INFO) - logging.basicConfig( - level=level, - format="%(asctime)s %(levelname)s %(name)s - %(message)s", - ) + formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s - %(message)s") + + root_logger = logging.getLogger() + root_logger.handlers.clear() + root_logger.setLevel(level) + + console_handler = logging.StreamHandler() + console_handler.setLevel(level) + console_handler.setFormatter(formatter) + + app_file_handler = logging.FileHandler(log_dir / "app.log", encoding="utf-8") + app_file_handler.setLevel(level) + app_file_handler.setFormatter(formatter) + + error_file_handler = logging.FileHandler(log_dir / "error.log", encoding="utf-8") + error_file_handler.setLevel(logging.ERROR) + error_file_handler.setFormatter(formatter) + + root_logger.addHandler(console_handler) + root_logger.addHandler(app_file_handler) + root_logger.addHandler(error_file_handler) def build_parser(): parser = argparse.ArgumentParser(description="tdEngine mqtt/wits mock application") + parser.add_argument("--config", default="config.yaml", help="Path to config yaml") parser.add_argument("--log-level", default="INFO", help="Logging level") - subparsers = parser.add_subparsers(dest="command", required=True) - - mqtt_mock_parser = subparsers.add_parser("mqtt-mock", help="Run MQTT mock service") - mqtt_mock.add_arguments(mqtt_mock_parser) - - mqtt_sender_parser = subparsers.add_parser("mqtt-sender", help="Run MQTT sender") - mqtt_sender.add_arguments(mqtt_sender_parser) - - mqtt_subscriber_parser = subparsers.add_parser("mqtt-subscriber", help="Run MQTT subscriber") - mqtt_subscriber.add_arguments(mqtt_subscriber_parser) - - wits_sender_parser = subparsers.add_parser("wits-sender", help="Run WITS sender") - wits_sender.add_arguments(wits_sender_parser) + parser.add_argument("--data-file", default="", help="Override data-file in config") + parser.add_argument("--wits-host", default="", help="Override WITS target host") + parser.add_argument("--wits-port", type=int, default=0, help="Override WITS target port") + parser.add_argument("--wits-timeout", type=int, default=0, help="Override WITS socket timeout") + parser.add_argument("--wits-source-file", default="", help="Send raw WITS packet from file") return parser +def build_mqtt_args(args): + return SimpleNamespace( + config=args.config, + mode="listen", + interval=2.0, + count=0, + data_file=args.data_file, + ) + + +def build_wits_args(args): + return SimpleNamespace( + config=args.config, + host=args.wits_host, + port=args.wits_port, + timeout=args.wits_timeout, + source_file=args.wits_source_file, + interval=2.0, + count=0, + ) + + +def start_worker(name, target, args, deps): + thread = threading.Thread(target=target, args=(args, deps), name=name, daemon=True) + thread.start() + return thread + + def main(argv=None): parser = build_parser() args = parser.parse_args(argv) configure_logging(args.log_level) - logging.getLogger(__name__).info("start app command=%s", args.command) - if args.command == "mqtt-mock": - deps = mqtt_mock.build_mock_dependencies(args.config, data_file_override=args.data_file) - mqtt_mock.run_mock_service(args, deps) - elif args.command == "mqtt-sender": - deps = mqtt_sender.build_sender_dependencies(args.config) - mqtt_sender.run_sender(args, deps) - elif args.command == "mqtt-subscriber": - deps = mqtt_subscriber.build_subscriber_dependencies(args.config) - mqtt_subscriber.run_subscriber(args, deps) - elif args.command == "wits-sender": - deps = wits_sender.build_wits_sender_dependencies(args.config) - wits_sender.run_wits_sender(args, deps) + logger.info("start app") + + mqtt_args = build_mqtt_args(args) + wits_args = build_wits_args(args) + + mqtt_deps = build_mock_dependencies(mqtt_args.config, data_file_override=mqtt_args.data_file) + wits_deps = build_wits_sender_dependencies(wits_args.config) + + threads = [ + start_worker("mqtt-mock", mqtt_mock.run_mock_service, mqtt_args, mqtt_deps), + start_worker("wits-sender", wits_sender.run_wits_sender, wits_args, wits_deps), + ] + + logger.info("services started: mqtt subscription+ingest and wits sender") + + try: + while True: + for thread in threads: + if not thread.is_alive(): + raise RuntimeError(f"worker stopped unexpectedly: {thread.name}") + time.sleep(1) + except KeyboardInterrupt: + logger.info("shutdown requested") + except Exception: + logger.exception("application stopped unexpectedly") + raise if __name__ == "__main__": diff --git a/model/__init__.py b/model/__init__.py index 5448725..9b2cb27 100644 --- a/model/__init__.py +++ b/model/__init__.py @@ -1,6 +1,9 @@ from model.config import AppConfig, MqttConfig, TdengineConfig, TmsConfig, WitsConfig from model.drilling import DrillingRealtimeData -from model.wits import WITS_FIELD_MAPPING, WitsData +from model.wits import WITS_CHANNEL_MAPPING, WitsData + +# Backward-compatible alias for older imports. +WITS_FIELD_MAPPING = WITS_CHANNEL_MAPPING __all__ = [ "AppConfig", @@ -8,6 +11,7 @@ __all__ = [ "MqttConfig", "TdengineConfig", "TmsConfig", + "WITS_CHANNEL_MAPPING", "WITS_FIELD_MAPPING", "WitsConfig", "WitsData", diff --git a/model/wits.py b/model/wits.py index b37d694..4e71361 100644 --- a/model/wits.py +++ b/model/wits.py @@ -8,8 +8,8 @@ class WitsData: stknum: int recid: int seqid: int - actual_date: float - actual_time: float + actual_date: str + actual_time: str actual_ts: int actcod: int actod_label: str @@ -53,50 +53,40 @@ class WitsData: space5: float -WITS_FIELD_MAPPING = [ - (1, "wellid", "string"), - (2, "stknum", "int"), - (3, "recid", "int"), - (4, "seqid", "int"), - (5, "actual_date", "float"), - (6, "actual_time", "float"), - (7, "actcod", "int"), - (8, "deptbitm", "float"), - (9, "deptbitv", "float"), - (10, "deptmeas", "float"), - (11, "deptvert", "float"), - (12, "blkpos", "float"), - (13, "ropa", "float"), - (14, "hkla", "float"), - (15, "hklx", "float"), - (16, "woba", "float"), - (17, "wobx", "float"), - (18, "torqa", "float"), - (19, "torqx", "float"), - (20, "rpma", "int"), - (21, "sppa", "float"), - (22, "chkp", "float"), - (23, "spm1", "int"), - (24, "spm2", "int"), - (25, "spm3", "int"), - (26, "tvolact", "float"), - (27, "tvolcact", "float"), - (28, "mfop", "int"), - (29, "mfoa", "float"), - (30, "mfia", "float"), - (31, "mdoa", "float"), - (32, "mdia", "float"), - (33, "mtoa", "float"), - (34, "mtia", "float"), - (35, "mcoa", "float"), - (36, "mcia", "float"), - (37, "stkc", "int"), - (38, "lagstks", "int"), - (39, "deptretm", "float"), - (40, "gasa", "float"), - (41, "space1", "float"), - (42, "space2", "float"), - (43, "space3", "float"), - (44, "space4", "float"), - (45, "space5", "float"), +WITS_CHANNEL_MAPPING = [ + ("0101", "wellid", "string"), + ("0102", "stknum", "int"), + ("0103", "recid", "int"), + ("0104", "seqid", "int"), + ("0105", "actual_date", "string"), + ("0106", "actual_time", "string"), + ("0107", "actcod", "int"), + ("0108", "deptbitm", "float6"), + ("0109", "deptbitv", "float6"), + ("0110", "deptmeas", "float6"), + ("0111", "deptvert", "float6"), + ("0112", "blkpos", "float6"), + ("0113", "ropa", "float6"), + ("0114", "hkla", "float6"), + ("0116", "woba", "float6"), + ("0117", "wobx", "float6"), + ("0118", "torqa", "float6"), + ("0119", "torqx", "float6"), + ("0120", "rpma", "int"), + ("0121", "sppa", "float6"), + ("0123", "spm1", "int"), + ("0124", "spm2", "int"), + ("0125", "spm3", "int"), + ("0126", "tvolact", "float6"), + ("0127", "tvolcact", "float6"), + ("0128", "mfop", "int"), + ("0130", "mfoa", "float6"), + ("0131", "mfia", "float6"), + ("0132", "mdoa", "float6"), + ("0133", "mdia", "float6"), + ("0134", "mtoa", "float6"), + ("0135", "mtia", "float6"), + ("0136", "mcoa", "float6"), + ("0137", "stkc", "int"), + ("0139", "deptretm", "float6"), ] diff --git a/requirements.md b/requirements.md index 8239844..d59da4f 100644 --- a/requirements.md +++ b/requirements.md @@ -21,6 +21,10 @@ 程序启动入口main.py,使用logging记录日志。 +启动是同时启动wits数据模拟和mqtt消息订阅和入库。不需要选择功能。 + 将业务对象抽取到model包下。 -数据库实体放在model包下。 \ No newline at end of file +数据库实体放在model包下。 + +将日志记录到log/app.log,错误日志记录到log/error.log