From 6557479a2f1082ea31874881aa51aadfb5c91e4c Mon Sep 17 00:00:00 2001 From: wsy182 <2392948297@qq.com> Date: Thu, 12 Mar 2026 10:41:26 +0800 Subject: [PATCH] =?UTF-8?q?refactor(config):=20=E9=87=8D=E6=9E=84=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=A8=A1=E5=9D=97=E5=B9=B6=E4=BC=98=E5=8C=96=E5=BA=94?= =?UTF-8?q?=E7=94=A8=E4=BE=9D=E8=B5=96=E6=B3=A8=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将配置相关类移动到model模块 - 实现依赖注入容器管理各组件依赖关系 - 重构配置加载逻辑支持多层级键值查找 - 更新主应用入口支持命令行参数解析 - 统一日志输出格式替换原有打印语句 - 引入钻井实时数据模型简化数据处理 - 移除硬编码字段映射改用动态配置方式 - 优化数据库写入逻辑基于新的数据模型 --- app/mqtt_mock.py | 137 ++++++++--------------------- app/mqtt_sender.py | 195 ++++++++++++++++------------------------- app/mqtt_subscriber.py | 61 ++++++------- app/wits_sender.py | 148 +++++-------------------------- config/__init__.py | 145 +++++------------------------- config/config.py | 87 +++++++++++++++--- config/dependencies.py | 51 +++++++++++ config/model.py | 48 +++------- db/config.py | 41 +++++++++ db/orm.py | 41 +++------ main.py | 53 ++++++++++- model/__init__.py | 14 +++ model/config.py | 72 +++++++++++++++ model/drilling.py | 176 +++++++++++++++++++++++++++++++++++++ model/wits.py | 102 +++++++++++++++++++++ requirements.md | 1 + 16 files changed, 783 insertions(+), 589 deletions(-) create mode 100644 config/dependencies.py create mode 100644 db/config.py create mode 100644 model/config.py create mode 100644 model/drilling.py create mode 100644 model/wits.py diff --git a/app/mqtt_mock.py b/app/mqtt_mock.py index 1c2462e..55fa725 100644 --- a/app/mqtt_mock.py +++ b/app/mqtt_mock.py @@ -1,5 +1,6 @@ import argparse import json +import logging import os import time from datetime import datetime @@ -7,57 +8,11 @@ from urllib.parse import urlparse import paho.mqtt.client as mqtt -from config.config import build_mock_dependencies +from config import build_mock_dependencies +from model import DrillingRealtimeData -DATA_KEYS = [ - "ts", - "wellid", - "stknum", - "recid", - "seqid", - "actual_date", - "actual_time", - "actcod", - "deptbitm", - "deptbitv", - "deptmeas", - "deptvert", - "blkpos", - "ropa", - "hkla", - "hklx", - "woba", - "wobx", - "torqa", - "torqx", - "rpma", - "sppa", - "chkp", - "spm1", - "spm2", - "spm3", - "tvolact", - "tvolcact", - "mfop", - "mfoa", - "mfia", - "mdoa", - "mdia", - "mtoa", - "mtia", - "mcoa", - "mcia", - "stkc", - "lagstks", - "deptretm", - "gasa", - "space1", - "space2", - "space3", - "space4", - "space5", -] +logger = logging.getLogger(__name__) def parse_broker(broker): @@ -81,16 +36,7 @@ def ensure_dir(path): def build_server_payload(equipment_code): - data = {key: 0 for key in DATA_KEYS} - data["ts"] = int(time.time() * 1000) - data["wellid"] = "" - return { - "meta": { - "equipment_code": equipment_code, - "equipment_sn": equipment_code, - }, - "data": data, - } + return DrillingRealtimeData.empty(wellid=equipment_code).to_payload(equipment_code) def parse_payload(raw_payload): @@ -146,67 +92,52 @@ def run_mock_service(args, deps): tdengine_writer = deps.tdengine_writer scheme, host, port = parse_broker(mqtt_config.broker) - print("MQTT mock config:") - print(f" broker: {scheme}://{host}:{port}") - print(f" client-id: {mqtt_config.mock_client_id}") - print(f" pub-topic (ingest): {mqtt_config.pub_topic}") - print(f" sub-topic (forward): {mqtt_config.sub_topic}") - print(f" ack-topic: {mqtt_config.ack_topic}") - print(f" data-file: {deps.data_file}") - print(f" device-code: {tms_config.device_code}") - print(f" tdengine enabled: {tdengine_writer.enabled}") + logger.info("MQTT mock config broker=%s://%s:%s client_id=%s mode=%s", scheme, host, port, mqtt_config.mock_client_id, args.mode) + logger.info("Topics ingest=%s forward=%s ack=%s", mqtt_config.pub_topic, mqtt_config.sub_topic, mqtt_config.ack_topic) + logger.info("Data file=%s device_code=%s tdengine_enabled=%s", deps.data_file, tms_config.device_code, tdengine_writer.enabled) if tdengine_writer.enabled: - print(f" tdengine host: {tdengine_config.base_url}") - print(f" tdengine database: {tdengine_config.database}") - print(f" tdengine stable: {tdengine_config.stable}") - print(f" tdengine pool-size: {tdengine_config.pool_size}") - print(f" mode: {args.mode}") + logger.info("TDengine host=%s database=%s stable=%s pool_size=%s", tdengine_config.base_url, tdengine_config.database, tdengine_config.stable, tdengine_config.pool_size) client = mqtt.Client(client_id=mqtt_config.mock_client_id, clean_session=True) if mqtt_config.username is not None: client.username_pw_set(mqtt_config.username, mqtt_config.password) - if scheme in ("ssl", "tls", "mqtts"): client.tls_set() def on_connect(c, userdata, flags, rc): if rc == 0: - print("Connected") + logger.info("Connected") else: - print(f"Connect failed rc={rc}") + logger.error("Connect failed rc=%s", rc) return if args.mode in ("listen", "both") and mqtt_config.pub_topic: c.subscribe(mqtt_config.pub_topic) - print(f"Subscribed: {mqtt_config.pub_topic}") + logger.info("Subscribed %s", mqtt_config.pub_topic) def on_disconnect(c, userdata, rc): - print(f"Disconnected callback rc={rc}") + logger.info("Disconnected callback rc=%s", rc) def on_message(c, userdata, msg): payload = msg.payload.decode("utf-8", errors="replace") - print(f"RX {msg.topic}: {payload}") + logger.info("RX topic=%s bytes=%s", msg.topic, len(msg.payload)) if msg.topic != mqtt_config.pub_topic: return - if deps.data_file: write_message(deps.data_file, msg.topic, payload) - print(f"Wrote to file: {deps.data_file}") - + logger.info("Wrote file %s", deps.data_file) if tdengine_writer.enabled: try: tdengine_writer.write_payload(parse_payload(payload)) - print("Wrote to TDengine") - except Exception as exc: - print(f"Write TDengine failed: {exc}") - + logger.info("Wrote TDengine") + except Exception: + logger.exception("Write TDengine failed") if mqtt_config.sub_topic: c.publish(mqtt_config.sub_topic, payload) - print(f"Forwarded to {mqtt_config.sub_topic}") - + logger.info("Forwarded %s", mqtt_config.sub_topic) if mqtt_config.ack_topic: ack_payload = build_ack_payload(msg.topic, payload) c.publish(mqtt_config.ack_topic, json.dumps(ack_payload, ensure_ascii=True)) - print(f"TX {mqtt_config.ack_topic}: {ack_payload}") + logger.info("TX ack %s", mqtt_config.ack_topic) client.on_connect = on_connect client.on_disconnect = on_disconnect @@ -217,14 +148,14 @@ def run_mock_service(args, deps): try: if args.mode in ("publish", "both"): if not mqtt_config.pub_topic: - print("pub-topic is empty; nothing to publish") + logger.warning("pub-topic is empty; nothing to publish") else: seq = 0 while True: seq += 1 payload = build_server_payload(tms_config.device_code) client.publish(mqtt_config.pub_topic, json.dumps(payload, ensure_ascii=True)) - print(f"TX {mqtt_config.pub_topic}: {payload}") + logger.info("TX %s #%s", mqtt_config.pub_topic, seq) if args.count and seq >= args.count: break time.sleep(args.interval) @@ -232,21 +163,25 @@ def run_mock_service(args, deps): while True: time.sleep(1) except KeyboardInterrupt: - pass + logger.info("Mock interrupted") finally: client.loop_stop() client.disconnect() - print("Disconnected") + logger.info("Mock stopped") -def main(): - ap = argparse.ArgumentParser(description="MQTT mock service") - ap.add_argument("--config", default="config.yaml", help="Path to config yaml") - ap.add_argument("--mode", choices=["publish", "listen", "both"], default="listen") - ap.add_argument("--interval", type=float, default=2.0, help="Publish interval (seconds)") - ap.add_argument("--count", type=int, default=0, help="Publish count (0 = forever)") - ap.add_argument("--data-file", default="", help="Override data-file in config") - args = ap.parse_args() +def add_arguments(parser): + parser.add_argument("--config", default="config.yaml", help="Path to config yaml") + parser.add_argument("--mode", choices=["publish", "listen", "both"], default="listen") + parser.add_argument("--interval", type=float, default=2.0, help="Publish interval (seconds)") + parser.add_argument("--count", type=int, default=0, help="Publish count (0 = forever)") + parser.add_argument("--data-file", default="", help="Override data-file in config") + + +def main(argv=None): + parser = argparse.ArgumentParser(description="MQTT mock service") + add_arguments(parser) + args = parser.parse_args(argv) deps = build_mock_dependencies(args.config, data_file_override=args.data_file) run_mock_service(args, deps) diff --git a/app/mqtt_sender.py b/app/mqtt_sender.py index b500ea4..a939be0 100644 --- a/app/mqtt_sender.py +++ b/app/mqtt_sender.py @@ -1,63 +1,17 @@ import argparse import json +import logging import random import time -from datetime import datetime from urllib.parse import urlparse import paho.mqtt.client as mqtt -from config.config import build_sender_dependencies +from config import build_sender_dependencies +from model import DrillingRealtimeData -DATA_KEYS = [ - "ts", - "wellid", - "stknum", - "recid", - "seqid", - "actual_date", - "actual_time", - "actcod", - "deptbitm", - "deptbitv", - "deptmeas", - "deptvert", - "blkpos", - "ropa", - "hkla", - "hklx", - "woba", - "wobx", - "torqa", - "torqx", - "rpma", - "sppa", - "chkp", - "spm1", - "spm2", - "spm3", - "tvolact", - "tvolcact", - "mfop", - "mfoa", - "mfia", - "mdoa", - "mdia", - "mtoa", - "mtia", - "mcoa", - "mcia", - "stkc", - "lagstks", - "deptretm", - "gasa", - "space1", - "space2", - "space3", - "space4", - "space5", -] +logger = logging.getLogger(__name__) def parse_broker(broker): @@ -76,61 +30,61 @@ def rand_int(a, b): return random.randint(a, b) +def rand_float(a, b, digits=2): + return round(random.uniform(a, b), digits) + + def build_random_payload(equipment_code): - data = {key: 0 for key in DATA_KEYS} - data["ts"] = int(time.time() * 1000) - data["wellid"] = random.choice(["", f"WELL-{rand_int(1, 9999):04d}"]) - data["stknum"] = rand_int(0, 500) - data["recid"] = rand_int(0, 100000) - data["seqid"] = rand_int(0, 100000) - data["actual_date"] = int(datetime.utcnow().strftime("%Y%m%d")) - data["actual_time"] = int(datetime.utcnow().strftime("%H%M%S")) - data["actcod"] = rand_int(0, 9) - data["deptbitm"] = rand_int(0, 5000) - data["deptbitv"] = rand_int(0, 5000) - data["deptmeas"] = rand_int(0, 5000) - data["deptvert"] = rand_int(0, 5000) - data["blkpos"] = rand_int(0, 100) - data["ropa"] = rand_int(0, 200) - data["hkla"] = rand_int(0, 500) - data["hklx"] = rand_int(0, 500) - data["woba"] = rand_int(0, 200) - data["wobx"] = rand_int(0, 200) - data["torqa"] = rand_int(0, 200) - data["torqx"] = rand_int(0, 200) - data["rpma"] = rand_int(0, 300) - data["sppa"] = rand_int(0, 5000) - data["chkp"] = rand_int(0, 5000) - data["spm1"] = rand_int(0, 200) - data["spm2"] = rand_int(0, 200) - data["spm3"] = rand_int(0, 200) - data["tvolact"] = rand_int(0, 20000) - data["tvolcact"] = rand_int(0, 20000) - data["mfop"] = rand_int(0, 1000) - data["mfoa"] = rand_int(0, 1000) - data["mfia"] = rand_int(0, 1000) - data["mdoa"] = rand_int(0, 1000) - data["mdia"] = rand_int(0, 1000) - data["mtoa"] = rand_int(0, 1000) - data["mtia"] = rand_int(0, 1000) - data["mcoa"] = rand_int(0, 1000) - data["mcia"] = rand_int(0, 1000) - data["stkc"] = rand_int(0, 200) - data["lagstks"] = rand_int(0, 200) - data["deptretm"] = rand_int(0, 5000) - data["gasa"] = rand_int(0, 100) - data["space1"] = rand_int(0, 10) - data["space2"] = rand_int(0, 10) - data["space3"] = rand_int(0, 10) - data["space4"] = rand_int(0, 10) - data["space5"] = rand_int(0, 10) - return { - "meta": { - "equipment_code": equipment_code, - "equipment_sn": equipment_code, - }, - "data": data, - } + entity = DrillingRealtimeData.empty(wellid=equipment_code) + entity = DrillingRealtimeData( + ts=entity.ts, + wellid=entity.wellid, + stknum=rand_int(0, 500), + recid=rand_int(0, 100000), + seqid=rand_int(0, 100000), + actual_date=entity.actual_date, + actual_time=entity.actual_time, + actcod=rand_int(0, 9), + deptbitm=rand_float(0, 5000), + deptbitv=rand_float(0, 5000), + deptmeas=rand_float(0, 5000), + deptvert=rand_float(0, 5000), + blkpos=rand_float(0, 100), + ropa=rand_float(0, 200), + hkla=rand_float(0, 500), + hklx=rand_float(0, 500), + woba=rand_float(0, 200), + wobx=rand_float(0, 200), + torqa=rand_float(0, 200), + torqx=rand_float(0, 200), + rpma=rand_int(0, 300), + sppa=rand_float(0, 5000), + chkp=rand_float(0, 5000), + spm1=rand_int(0, 200), + spm2=rand_int(0, 200), + spm3=rand_int(0, 200), + tvolact=rand_float(0, 20000), + tvolcact=rand_float(0, 20000), + mfop=rand_int(0, 1000), + mfoa=rand_float(0, 1000), + mfia=rand_float(0, 1000), + mdoa=rand_float(0, 1000), + mdia=rand_float(0, 1000), + mtoa=rand_float(0, 1000), + mtia=rand_float(0, 1000), + mcoa=rand_float(0, 1000), + mcia=rand_float(0, 1000), + stkc=rand_int(0, 200), + lagstks=rand_int(0, 200), + deptretm=rand_float(0, 5000), + gasa=rand_float(0, 100), + space1=rand_float(0, 10), + space2=rand_float(0, 10), + space3=rand_float(0, 10), + space4=rand_float(0, 10), + space5=rand_float(0, 10), + ) + return entity.to_payload(equipment_code) def run_sender(args, deps): @@ -138,21 +92,16 @@ def run_sender(args, deps): tms_config = deps.config.tms scheme, host, port = parse_broker(mqtt_config.broker) - print("MQTT sender config:") - print(f" broker: {scheme}://{host}:{port}") - print(f" client-id: {mqtt_config.sender_client_id}") - print(f" pub-topic: {mqtt_config.pub_topic}") - print(f" interval: {args.interval}s") + logger.info("MQTT sender config broker=%s://%s:%s client_id=%s pub_topic=%s interval=%ss", scheme, host, port, mqtt_config.sender_client_id, mqtt_config.pub_topic, args.interval) client = mqtt.Client(client_id=mqtt_config.sender_client_id, clean_session=True) if mqtt_config.username is not None: client.username_pw_set(mqtt_config.username, mqtt_config.password) - if scheme in ("ssl", "tls", "mqtts"): client.tls_set() def on_disconnect(c, userdata, rc): - print(f"Disconnected callback rc={rc}") + logger.info("Disconnected callback rc=%s", rc) client.on_disconnect = on_disconnect client.connect(host, port, keepalive=tms_config.keepalive) @@ -160,31 +109,35 @@ def run_sender(args, deps): try: if not mqtt_config.pub_topic: - print("pub-topic is empty; nothing to publish") + logger.warning("pub-topic is empty; nothing to publish") return seq = 0 while True: seq += 1 payload = build_random_payload(tms_config.device_code) client.publish(mqtt_config.pub_topic, json.dumps(payload, ensure_ascii=True)) - print(f"TX {mqtt_config.pub_topic}: {payload}") + logger.info("TX %s #%s", mqtt_config.pub_topic, seq) if args.count and seq >= args.count: break time.sleep(args.interval) except KeyboardInterrupt: - pass + logger.info("Sender interrupted") finally: client.loop_stop() client.disconnect() - print("Disconnected") + logger.info("Sender stopped") -def main(): - ap = argparse.ArgumentParser(description="MQTT random data sender") - ap.add_argument("--config", default="config.yaml", help="Path to config yaml") - ap.add_argument("--interval", type=float, default=3.0, help="Publish interval (seconds)") - ap.add_argument("--count", type=int, default=0, help="Publish count (0 = forever)") - args = ap.parse_args() +def add_arguments(parser): + parser.add_argument("--config", default="config.yaml", help="Path to config yaml") + parser.add_argument("--interval", type=float, default=3.0, help="Publish interval (seconds)") + parser.add_argument("--count", type=int, default=0, help="Publish count (0 = forever)") + + +def main(argv=None): + parser = argparse.ArgumentParser(description="MQTT random data sender") + add_arguments(parser) + args = parser.parse_args(argv) deps = build_sender_dependencies(args.config) run_sender(args, deps) diff --git a/app/mqtt_subscriber.py b/app/mqtt_subscriber.py index 251d725..46d7cf0 100644 --- a/app/mqtt_subscriber.py +++ b/app/mqtt_subscriber.py @@ -1,12 +1,16 @@ import argparse import json +import logging import time from datetime import datetime from urllib.parse import urlparse import paho.mqtt.client as mqtt -from config.config import build_subscriber_dependencies +from config import build_subscriber_dependencies + + +logger = logging.getLogger(__name__) def parse_broker(broker): @@ -22,12 +26,12 @@ def parse_broker(broker): def print_flat_fields(title, value): - print(title) + logger.info(title) if isinstance(value, dict): - for k, v in value.items(): - print(f" {k}: {v}") + for key, val in value.items(): + logger.info(" %s: %s", key, val) else: - print(f" {value}") + logger.info(" %s", value) def run_subscriber(args, deps): @@ -38,49 +42,36 @@ def run_subscriber(args, deps): raise ValueError("No topic to subscribe. Set sub-topic or pub-topic, or pass --topic") scheme, host, port = parse_broker(mqtt_config.broker) - - print("MQTT subscriber config:") - print(f" broker: {scheme}://{host}:{port}") - print(f" client-id: {mqtt_config.subscriber_client_id}") - print(f" topic: {topic}") + logger.info("MQTT subscriber config broker=%s://%s:%s client_id=%s topic=%s", scheme, host, port, mqtt_config.subscriber_client_id, topic) client = mqtt.Client(client_id=mqtt_config.subscriber_client_id, clean_session=True) if mqtt_config.username is not None: client.username_pw_set(mqtt_config.username, mqtt_config.password) - if scheme in ("ssl", "tls", "mqtts"): client.tls_set() def on_connect(c, userdata, flags, rc): if rc == 0: - print("Connected") + logger.info("Connected") c.subscribe(topic) - print(f"Subscribed: {topic}") + logger.info("Subscribed %s", topic) else: - print(f"Connect failed rc={rc}") + logger.error("Connect failed rc=%s", rc) def on_disconnect(c, userdata, rc): - print(f"Disconnected callback rc={rc}") + logger.info("Disconnected callback rc=%s", rc) def on_message(c, userdata, msg): raw_payload = msg.payload.decode("utf-8", errors="replace") - print("=" * 80) - print(f"RX Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - print(f"Topic: {msg.topic}") - print(f"QoS: {msg.qos}, Retain: {msg.retain}, Bytes: {len(msg.payload)}") + logger.info("RX time=%s topic=%s qos=%s retain=%s bytes=%s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'), msg.topic, msg.qos, msg.retain, len(msg.payload)) try: obj = json.loads(raw_payload) - print("Raw JSON:") - print(json.dumps(obj, ensure_ascii=False, indent=2)) + logger.info("Raw JSON: %s", json.dumps(obj, ensure_ascii=False)) if isinstance(obj, dict): print_flat_fields("meta:", obj.get("meta")) print_flat_fields("data:", obj.get("data")) - extra_keys = [key for key in obj.keys() if key not in ("meta", "data")] - for key in extra_keys: - print_flat_fields(f"{key}:", obj.get(key)) except Exception: - print("Raw payload:") - print(raw_payload) + logger.info("Raw payload: %s", raw_payload) client.on_connect = on_connect client.on_disconnect = on_disconnect @@ -92,18 +83,22 @@ def run_subscriber(args, deps): while True: time.sleep(1) except KeyboardInterrupt: - pass + logger.info("Subscriber interrupted") finally: client.loop_stop() client.disconnect() - print("Disconnected") + logger.info("Subscriber stopped") -def main(): - ap = argparse.ArgumentParser(description="MQTT subscriber") - ap.add_argument("--config", default="config.yaml", help="Path to config yaml") - ap.add_argument("--topic", default="", help="Override topic to subscribe") - args = ap.parse_args() +def add_arguments(parser): + parser.add_argument("--config", default="config.yaml", help="Path to config yaml") + parser.add_argument("--topic", default="", help="Override topic to subscribe") + + +def main(argv=None): + parser = argparse.ArgumentParser(description="MQTT subscriber") + add_arguments(parser) + args = parser.parse_args(argv) deps = build_subscriber_dependencies(args.config) run_subscriber(args, deps) diff --git a/app/wits_sender.py b/app/wits_sender.py index 3924bbe..37b0db9 100644 --- a/app/wits_sender.py +++ b/app/wits_sender.py @@ -1,119 +1,20 @@ import argparse +import logging import random import socket import time -from dataclasses import dataclass from datetime import datetime from pathlib import Path -from config.dependencies import build_wits_sender_dependencies +from config import build_wits_sender_dependencies +from model import WITS_FIELD_MAPPING, WitsData +logger = logging.getLogger(__name__) BEGIN_MARK = "&&\r\n" END_MARK = "!!\r\n" -@dataclass(frozen=True) -class WitsData: - ts: int - wellid: str - stknum: int - recid: int - seqid: int - actual_date: float - actual_time: float - actual_ts: int - actcod: int - actod_label: str - deptbitm: float - deptbitv: float - deptmeas: float - deptvert: float - blkpos: float - ropa: float - hkla: float - hklx: float - woba: float - wobx: float - torqa: float - torqx: float - rpma: int - sppa: float - chkp: float - spm1: int - spm2: int - spm3: int - tvolact: float - tvolcact: float - mfop: int - mfoa: float - mfia: float - mdoa: float - mdia: float - mtoa: float - mtia: float - mcoa: float - mcia: float - stkc: int - lagstks: int - deptretm: float - gasa: float - space1: float - space2: float - space3: float - space4: float - space5: float - - -WITS_FIELD_MAPPING = [ - (1, "wellid", "string"), - (2, "stknum", "int"), - (3, "recid", "int"), - (4, "seqid", "int"), - (5, "actual_date", "float"), - (6, "actual_time", "float"), - (7, "actcod", "int"), - (8, "deptbitm", "float"), - (9, "deptbitv", "float"), - (10, "deptmeas", "float"), - (11, "deptvert", "float"), - (12, "blkpos", "float"), - (13, "ropa", "float"), - (14, "hkla", "float"), - (15, "hklx", "float"), - (16, "woba", "float"), - (17, "wobx", "float"), - (18, "torqa", "float"), - (19, "torqx", "float"), - (20, "rpma", "int"), - (21, "sppa", "float"), - (22, "chkp", "float"), - (23, "spm1", "int"), - (24, "spm2", "int"), - (25, "spm3", "int"), - (26, "tvolact", "float"), - (27, "tvolcact", "float"), - (28, "mfop", "int"), - (29, "mfoa", "float"), - (30, "mfia", "float"), - (31, "mdoa", "float"), - (32, "mdia", "float"), - (33, "mtoa", "float"), - (34, "mtia", "float"), - (35, "mcoa", "float"), - (36, "mcia", "float"), - (37, "stkc", "int"), - (38, "lagstks", "int"), - (39, "deptretm", "float"), - (40, "gasa", "float"), - (41, "space1", "float"), - (42, "space2", "float"), - (43, "space3", "float"), - (44, "space4", "float"), - (45, "space5", "float"), -] - - def rand_int(a, b): return random.randint(a, b) @@ -219,17 +120,10 @@ def run_wits_sender(args, deps): host = args.host or wits_config.host port = args.port or wits_config.port timeout = args.timeout or wits_config.timeout - if not host or not port: raise ValueError("WITS target host/port is empty. Configure wits.host/wits.port or tms.server-ip/tms.server-port") - print("WITS sender config:") - print(f" host: {host}") - print(f" port: {port}") - print(f" timeout: {timeout}s") - print(f" source-file: {source_file or '(generated)'}") - print(f" interval: {args.interval}s") - print(f" count: {args.count or 'forever'}") + logger.info("WITS sender config host=%s port=%s timeout=%ss source_file=%s interval=%ss count=%s", host, port, timeout, source_file or "(generated)", args.interval, args.count or "forever") seq = 0 try: @@ -240,29 +134,33 @@ def run_wits_sender(args, deps): else: packet = build_wits_packet(build_random_wits_data(device_code)) send_packet(host, port, timeout, packet) - print(f"TX WITS #{seq} -> {host}:{port}") - print(packet) + logger.info("TX WITS #%s -> %s:%s", seq, host, port) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("WITS packet:\n%s", packet) if args.count and seq >= args.count: break time.sleep(args.interval) except KeyboardInterrupt: - pass + logger.info("WITS sender interrupted") -def main(): - ap = argparse.ArgumentParser(description="WITS TCP sender") - ap.add_argument("--config", default="config.yaml", help="Path to config yaml") - ap.add_argument("--host", default="", help="Override target host") - ap.add_argument("--port", type=int, default=0, help="Override target port") - ap.add_argument("--timeout", type=int, default=0, help="Override socket timeout") - ap.add_argument("--source-file", default="", help="Send raw WITS packet from file") - ap.add_argument("--interval", type=float, default=3.0, help="Send interval in seconds") - ap.add_argument("--count", type=int, default=1, help="Send count (0 = forever)") - args = ap.parse_args() +def add_arguments(parser): + parser.add_argument("--config", default="config.yaml", help="Path to config yaml") + parser.add_argument("--host", default="", help="Override target host") + parser.add_argument("--port", type=int, default=0, help="Override target port") + parser.add_argument("--timeout", type=int, default=0, help="Override socket timeout") + parser.add_argument("--source-file", default="", help="Send raw WITS packet from file") + parser.add_argument("--interval", type=float, default=3.0, help="Send interval in seconds") + parser.add_argument("--count", type=int, default=1, help="Send count (0 = forever)") + + +def main(argv=None): + parser = argparse.ArgumentParser(description="WITS TCP sender") + add_arguments(parser) + args = parser.parse_args(argv) deps = build_wits_sender_dependencies(args.config) run_wits_sender(args, deps) if __name__ == "__main__": main() - diff --git a/config/__init__.py b/config/__init__.py index 8481632..e7aa00c 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,129 +1,24 @@ -from dataclasses import dataclass - -import yaml - - -@dataclass(frozen=True) -class MqttConfig: - broker: str - client_id: str - mock_client_id: str - sender_client_id: str - subscriber_client_id: str - username: str | None - password: str | None - pub_topic: str | None - sub_topic: str | None - ack_topic: str | None - data_file: str - - -@dataclass(frozen=True) -class TmsConfig: - device_code: str - equipment_sn: str - timeout: int - keepalive: int - server_ip: str | None - server_port: int | None - - -@dataclass(frozen=True) -class WitsConfig: - host: str - port: int - timeout: int - source_file: str - - -@dataclass(frozen=True) -class AppConfig: - mqtt: MqttConfig - tms: TmsConfig - wits: WitsConfig - raw: dict - - -def load_raw_config(path): - with open(path, "r", encoding="utf-8") as f: - return yaml.safe_load(f) or {} - - -def get_value(cfg, *paths, default=None): - for path in paths: - current = cfg - found = True - for key in path: - if not isinstance(current, dict) or key not in current: - found = False - break - current = current[key] - if found and current is not None: - return current - return default - - -def load_app_config(path): - raw = load_raw_config(path) - base_client_id = get_value(raw, ("mqtt", "client-id"), ("client-id",), default="mqtt") - device_code = get_value( - raw, - ("tms", "device-code"), - ("tms", "equipment-sn"), - ("device-code",), - ("equipment-sn",), - default="GJ-304-0088", - ) - equipment_sn = get_value( - raw, - ("tms", "equipment-sn"), - ("tms", "device-code"), - ("equipment-sn",), - ("device-code",), - default=device_code, - ) - - return AppConfig( - mqtt=MqttConfig( - broker=get_value(raw, ("mqtt", "broker"), ("broker",), default=""), - client_id=base_client_id, - mock_client_id=get_value(raw, ("mqtt", "mock-client-id"), ("mock-client-id",), default=f"{base_client_id}-mock"), - sender_client_id=get_value(raw, ("mqtt", "sender-client-id"), ("sender-client-id",), default=f"{base_client_id}-sender"), - subscriber_client_id=get_value(raw, ("mqtt", "subscriber-client-id"), ("subscriber-client-id",), default=f"{base_client_id}-subscriber"), - username=get_value(raw, ("mqtt", "username"), ("username",)), - password=get_value(raw, ("mqtt", "password"), ("password",)), - pub_topic=get_value(raw, ("mqtt", "pub-topic"), ("pub-topic",)), - sub_topic=get_value(raw, ("mqtt", "sub-topic"), ("sub-topic",)), - ack_topic=get_value(raw, ("mqtt", "ack-topic"), ("ack-topic",)), - data_file=get_value(raw, ("mqtt", "data-file"), ("data-file",), default=""), - ), - tms=TmsConfig( - device_code=device_code, - equipment_sn=equipment_sn, - timeout=int(get_value(raw, ("tms", "timeout"), ("timeout",), default=10)), - keepalive=int(get_value(raw, ("tms", "keepalive"), ("keepalive",), default=20)), - server_ip=get_value(raw, ("tms", "server-ip"), ("server-ip",)), - server_port=get_value(raw, ("tms", "server-port"), ("server-port",)), - ), - wits=WitsConfig( - host=get_value(raw, ("wits", "host"), ("tms", "server-ip"), ("server-ip",), default=""), - port=int(get_value(raw, ("wits", "port"), ("tms", "server-port"), ("server-port",), default=0)), - timeout=int(get_value(raw, ("wits", "timeout"), ("tms", "timeout"), ("timeout",), default=10)), - source_file=get_value(raw, ("wits", "source-file"), default=""), - ), - raw=raw, - ) - - -load_config = load_app_config +from config.config import get_value, load +from config.dependencies import ( + MockDependencies, + SenderDependencies, + SubscriberDependencies, + WitsSenderDependencies, + build_mock_dependencies, + build_sender_dependencies, + build_subscriber_dependencies, + build_wits_sender_dependencies, +) __all__ = [ - "AppConfig", - "MqttConfig", - "TmsConfig", - "WitsConfig", + "MockDependencies", + "SenderDependencies", + "SubscriberDependencies", + "WitsSenderDependencies", + "build_mock_dependencies", + "build_sender_dependencies", + "build_subscriber_dependencies", + "build_wits_sender_dependencies", "get_value", - "load_app_config", - "load_config", - "load_raw_config", + "load", ] diff --git a/config/config.py b/config/config.py index b588729..77bfca3 100644 --- a/config/config.py +++ b/config/config.py @@ -1,18 +1,83 @@ import yaml -from config.model import * +from model import AppConfig, MqttConfig, TdengineConfig, TmsConfig, WitsConfig -def load(path: str) -> "Config": +def get_value(cfg, *paths, default=None): + for path in paths: + current = cfg + found = True + for key in path: + if not isinstance(current, dict) or key not in current: + found = False + break + current = current[key] + if found and current is not None: + return current + return default + + +def load(path: str) -> AppConfig: with open(path, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) + raw = yaml.safe_load(f) or {} - mqtt_cfg = MqttConfig(**data["mqtt"]) - tms_cfg = TmsConfig(**data["tms"]) - tdengine_cfg = TdengineConfig(**data["tdengine"]) - - return Config( - mqtt=mqtt_cfg, - tms=tms_cfg, - tdengine=tdengine_cfg + base_client_id = get_value(raw, ("mqtt", "client-id"), ("client-id",), default="mqtt") + device_code = get_value( + raw, + ("tms", "device-code"), + ("tms", "equipment-sn"), + ("device-code",), + ("equipment-sn",), + default="GJ-304-0088", + ) + equipment_sn = get_value( + raw, + ("tms", "equipment-sn"), + ("tms", "device-code"), + ("equipment-sn",), + ("device-code",), + default=device_code, + ) + td_url = get_value(raw, ("tdengine", "url"), ("tdengine-url",), default="") + td_database = get_value(raw, ("tdengine", "database"), ("tdengine-database",), default="") + + return AppConfig( + mqtt=MqttConfig( + broker=get_value(raw, ("mqtt", "broker"), ("broker",), default=""), + client_id=base_client_id, + mock_client_id=get_value(raw, ("mqtt", "mock-client-id"), ("mock-client-id",), default=f"{base_client_id}-mock"), + sender_client_id=get_value(raw, ("mqtt", "sender-client-id"), ("sender-client-id",), default=f"{base_client_id}-sender"), + subscriber_client_id=get_value(raw, ("mqtt", "subscriber-client-id"), ("subscriber-client-id",), default=f"{base_client_id}-subscriber"), + username=get_value(raw, ("mqtt", "username"), ("username",)), + password=get_value(raw, ("mqtt", "password"), ("password",)), + pub_topic=get_value(raw, ("mqtt", "pub-topic"), ("pub-topic",)), + sub_topic=get_value(raw, ("mqtt", "sub-topic"), ("sub-topic",)), + ack_topic=get_value(raw, ("mqtt", "ack-topic"), ("ack-topic",)), + data_file=get_value(raw, ("mqtt", "data-file"), ("data-file",), default=""), + ), + tms=TmsConfig( + device_code=device_code, + equipment_sn=equipment_sn, + timeout=int(get_value(raw, ("tms", "timeout"), ("timeout",), default=10)), + keepalive=int(get_value(raw, ("tms", "keepalive"), ("keepalive",), default=20)), + server_ip=get_value(raw, ("tms", "server-ip"), ("server-ip",)), + server_port=get_value(raw, ("tms", "server-port"), ("server-port",)), + ), + wits=WitsConfig( + host=get_value(raw, ("wits", "host"), ("tms", "server-ip"), ("server-ip",), default=""), + port=int(get_value(raw, ("wits", "port"), ("tms", "server-port"), ("server-port",), default=0)), + timeout=int(get_value(raw, ("wits", "timeout"), ("tms", "timeout"), ("timeout",), default=10)), + source_file=get_value(raw, ("wits", "source-file"), default=""), + ), + tdengine=TdengineConfig( + url=td_url, + username=get_value(raw, ("tdengine", "username"), ("tdengine-username",), default=""), + password=get_value(raw, ("tdengine", "password"), ("tdengine-password",), default=""), + database=td_database, + stable=get_value(raw, ("tdengine", "stable"), ("tdengine-stable",), default="drilling_realtime_st"), + device_code=get_value(raw, ("tdengine", "device-code"), ("tdengine", "equipment-sn"), default=device_code), + pool_size=int(get_value(raw, ("tdengine", "pool-size"), ("tdengine-pool-size",), default=2)), + timeout=int(get_value(raw, ("tdengine", "timeout"), ("tdengine-timeout",), default=10)), + ), + raw=raw, ) diff --git a/config/dependencies.py b/config/dependencies.py new file mode 100644 index 0000000..5e660ed --- /dev/null +++ b/config/dependencies.py @@ -0,0 +1,51 @@ +from dataclasses import dataclass + +from config.config import load +from db import TDengineWriter, load_tdengine_config +from model import AppConfig + + +@dataclass(frozen=True) +class SenderDependencies: + config: AppConfig + + +@dataclass(frozen=True) +class SubscriberDependencies: + config: AppConfig + + +@dataclass(frozen=True) +class WitsSenderDependencies: + config: AppConfig + + +@dataclass(frozen=True) +class MockDependencies: + config: AppConfig + tdengine_config: object + tdengine_writer: object + data_file: str + + +def build_sender_dependencies(config_path): + return SenderDependencies(config=load(config_path)) + + +def build_subscriber_dependencies(config_path): + return SubscriberDependencies(config=load(config_path)) + + +def build_wits_sender_dependencies(config_path): + return WitsSenderDependencies(config=load(config_path)) + + +def build_mock_dependencies(config_path, data_file_override=""): + app_config = load(config_path) + tdengine_config = load_tdengine_config(app_config, default_device_code=app_config.tms.device_code) + return MockDependencies( + config=app_config, + tdengine_config=tdengine_config, + tdengine_writer=TDengineWriter(tdengine_config), + data_file=data_file_override or app_config.mqtt.data_file, + ) diff --git a/config/model.py b/config/model.py index 2a67620..32e785e 100644 --- a/config/model.py +++ b/config/model.py @@ -1,40 +1,12 @@ -from dataclasses import dataclass +from model import AppConfig, MqttConfig, TdengineConfig, TmsConfig, WitsConfig +Config = AppConfig -@dataclass -class MqttConfig: - broker: str - client_id: str - mock_client_id: str - sender_client_id: str - subscriber_client_id: str - username: str - password: str - pub_topic: str - - -@dataclass -class TmsConfig: - device_code: str - equipment_sn: str - timeout: int - keepalive: int - server_ip: str - server_port: int - - -@dataclass -class TdengineConfig: - url: str - username: str - password: str - database: str - stable: str - device_code: str - - -@dataclass -class Config: - mqtt: MqttConfig - tms: TmsConfig - tdengine: TdengineConfig +__all__ = [ + "Config", + "AppConfig", + "MqttConfig", + "TdengineConfig", + "TmsConfig", + "WitsConfig", +] diff --git a/db/config.py b/db/config.py new file mode 100644 index 0000000..456751c --- /dev/null +++ b/db/config.py @@ -0,0 +1,41 @@ +from urllib.parse import urlparse + +from model import TdengineConfig + + +def parse_taos_url(jdbc_url): + if not jdbc_url: + return "", "" + raw = str(jdbc_url).strip() + if raw.lower().startswith("jdbc:taos-rs://"): + raw = "http://" + raw[len("jdbc:TAOS-RS://") :] + elif "://" not in raw: + raw = "http://" + raw + parsed = urlparse(raw) + base_url = f"{parsed.scheme or 'http'}://{parsed.hostname or '127.0.0.1'}:{parsed.port or 6041}" + database = (parsed.path or "").strip("/") + return base_url, database + + +def load_tdengine_config(cfg_or_app, default_device_code="GJ-304-0088"): + config = getattr(cfg_or_app, "tdengine", None) + if isinstance(config, TdengineConfig): + url = config.url + _, db_from_url = parse_taos_url(url) + if config.database: + return config + return TdengineConfig( + url=config.url, + username=config.username, + password=config.password, + database=db_from_url, + stable=config.stable, + device_code=config.device_code or default_device_code, + pool_size=config.pool_size, + timeout=config.timeout, + ) + _, db_from_url = parse_taos_url("") + return TdengineConfig(database=db_from_url, device_code=default_device_code) + + +TDengineConfig = TdengineConfig diff --git a/db/orm.py b/db/orm.py index 7810fbc..2e35022 100644 --- a/db/orm.py +++ b/db/orm.py @@ -1,5 +1,6 @@ import re -import time + +from model import DrillingRealtimeData DB_COLUMNS = [ @@ -50,7 +51,6 @@ DB_COLUMNS = [ "space5", ] - INT_COLUMNS = {"stknum", "recid", "seqid", "actcod", "rpma", "spm1", "spm2", "spm3", "mfop", "stkc", "lagstks"} @@ -67,20 +67,6 @@ def sql_quote(value): return "'" + str(value).replace("'", "''") + "'" -def to_int(value, default=0): - try: - return int(value) - except Exception: - return default - - -def to_float(value, default=0.0): - try: - return float(value) - except Exception: - return default - - class DrillingRealtimeORM: def __init__(self, database, stable="drilling_realtime_st", default_device_code="GJ-304-0088"): self.database = database @@ -90,30 +76,23 @@ class DrillingRealtimeORM: def build_insert_sql(self, payload): if not isinstance(payload, dict): raise ValueError("payload is not JSON object") + entity = DrillingRealtimeData.from_payload(payload) meta = payload.get("meta") if isinstance(payload.get("meta"), dict) else {} - data = payload.get("data") if isinstance(payload.get("data"), dict) else {} - equipment_code = ( str(self.default_device_code).strip() or str(meta.get("equipment_code", "")).strip() or str(meta.get("equipment_sn", "")).strip() + or entity.wellid or "GJ-304-0088" ) - table_name = sanitize_identifier( - f"drilling_realtime_{equipment_code}", - "drilling_realtime_default", - ) - + table_name = sanitize_identifier(f"drilling_realtime_{equipment_code}", "drilling_realtime_default") values = [] - for col in DB_COLUMNS: - if col == "ts": - raw = data.get("ts", data.get("record_time", int(time.time() * 1000))) - values.append(str(to_int(raw, int(time.time() * 1000)))) - elif col in INT_COLUMNS: - values.append(str(to_int(data.get(col, 0), 0))) + for column in DB_COLUMNS: + raw = getattr(entity, column) + if column in INT_COLUMNS or column == "ts": + values.append(str(int(raw))) else: - values.append(str(to_float(data.get(col, 0), 0.0))) - + values.append(str(float(raw))) columns_sql = ", ".join([f"`{column}`" for column in DB_COLUMNS]) values_sql = ", ".join(values) return ( diff --git a/main.py b/main.py index 9620e74..943b697 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,55 @@ +import argparse import logging - -def main(): - logging.info("start app....") +from app import mqtt_mock, mqtt_sender, mqtt_subscriber, wits_sender +def configure_logging(level_name): + level = getattr(logging, str(level_name).upper(), logging.INFO) + logging.basicConfig( + level=level, + format="%(asctime)s %(levelname)s %(name)s - %(message)s", + ) -if __name__ == '__main__': + +def build_parser(): + parser = argparse.ArgumentParser(description="tdEngine mqtt/wits mock application") + parser.add_argument("--log-level", default="INFO", help="Logging level") + subparsers = parser.add_subparsers(dest="command", required=True) + + mqtt_mock_parser = subparsers.add_parser("mqtt-mock", help="Run MQTT mock service") + mqtt_mock.add_arguments(mqtt_mock_parser) + + mqtt_sender_parser = subparsers.add_parser("mqtt-sender", help="Run MQTT sender") + mqtt_sender.add_arguments(mqtt_sender_parser) + + mqtt_subscriber_parser = subparsers.add_parser("mqtt-subscriber", help="Run MQTT subscriber") + mqtt_subscriber.add_arguments(mqtt_subscriber_parser) + + wits_sender_parser = subparsers.add_parser("wits-sender", help="Run WITS sender") + wits_sender.add_arguments(wits_sender_parser) + return parser + + +def main(argv=None): + parser = build_parser() + args = parser.parse_args(argv) + configure_logging(args.log_level) + logging.getLogger(__name__).info("start app command=%s", args.command) + + if args.command == "mqtt-mock": + deps = mqtt_mock.build_mock_dependencies(args.config, data_file_override=args.data_file) + mqtt_mock.run_mock_service(args, deps) + elif args.command == "mqtt-sender": + deps = mqtt_sender.build_sender_dependencies(args.config) + mqtt_sender.run_sender(args, deps) + elif args.command == "mqtt-subscriber": + deps = mqtt_subscriber.build_subscriber_dependencies(args.config) + mqtt_subscriber.run_subscriber(args, deps) + elif args.command == "wits-sender": + deps = wits_sender.build_wits_sender_dependencies(args.config) + wits_sender.run_wits_sender(args, deps) + + +if __name__ == "__main__": main() diff --git a/model/__init__.py b/model/__init__.py index e69de29..5448725 100644 --- a/model/__init__.py +++ b/model/__init__.py @@ -0,0 +1,14 @@ +from model.config import AppConfig, MqttConfig, TdengineConfig, TmsConfig, WitsConfig +from model.drilling import DrillingRealtimeData +from model.wits import WITS_FIELD_MAPPING, WitsData + +__all__ = [ + "AppConfig", + "DrillingRealtimeData", + "MqttConfig", + "TdengineConfig", + "TmsConfig", + "WITS_FIELD_MAPPING", + "WitsConfig", + "WitsData", +] diff --git a/model/config.py b/model/config.py new file mode 100644 index 0000000..f82b947 --- /dev/null +++ b/model/config.py @@ -0,0 +1,72 @@ +from dataclasses import dataclass +from urllib.parse import urlparse + + +@dataclass(frozen=True) +class MqttConfig: + broker: str + client_id: str + mock_client_id: str + sender_client_id: str + subscriber_client_id: str + username: str | None + password: str | None + pub_topic: str | None + sub_topic: str | None + ack_topic: str | None + data_file: str + + +@dataclass(frozen=True) +class TmsConfig: + device_code: str + equipment_sn: str + timeout: int + keepalive: int + server_ip: str | None + server_port: int | None + + +@dataclass(frozen=True) +class WitsConfig: + host: str + port: int + timeout: int + source_file: str + + +@dataclass(frozen=True) +class TdengineConfig: + url: str = "" + username: str = "" + password: str = "" + database: str = "" + stable: str = "drilling_realtime_st" + device_code: str = "GJ-304-0088" + pool_size: int = 2 + timeout: int = 10 + + @property + def base_url(self): + if not self.url: + return "" + raw = str(self.url).strip() + if raw.lower().startswith("jdbc:taos-rs://"): + raw = "http://" + raw[len("jdbc:TAOS-RS://") :] + elif "://" not in raw: + raw = "http://" + raw + parsed = urlparse(raw) + return f"{parsed.scheme or 'http'}://{parsed.hostname or '127.0.0.1'}:{parsed.port or 6041}" + + @property + def enabled(self): + return bool(self.base_url and self.database and self.username) + + +@dataclass(frozen=True) +class AppConfig: + mqtt: MqttConfig + tms: TmsConfig + wits: WitsConfig + tdengine: TdengineConfig + raw: dict diff --git a/model/drilling.py b/model/drilling.py new file mode 100644 index 0000000..67ef8c8 --- /dev/null +++ b/model/drilling.py @@ -0,0 +1,176 @@ +import time +from dataclasses import dataclass +from datetime import datetime + + +DATA_KEYS = [ + "ts", + "wellid", + "stknum", + "recid", + "seqid", + "actual_date", + "actual_time", + "actcod", + "deptbitm", + "deptbitv", + "deptmeas", + "deptvert", + "blkpos", + "ropa", + "hkla", + "hklx", + "woba", + "wobx", + "torqa", + "torqx", + "rpma", + "sppa", + "chkp", + "spm1", + "spm2", + "spm3", + "tvolact", + "tvolcact", + "mfop", + "mfoa", + "mfia", + "mdoa", + "mdia", + "mtoa", + "mtia", + "mcoa", + "mcia", + "stkc", + "lagstks", + "deptretm", + "gasa", + "space1", + "space2", + "space3", + "space4", + "space5", +] + + +@dataclass(frozen=True) +class DrillingRealtimeData: + ts: int + wellid: str + stknum: int + recid: int + seqid: int + actual_date: float + actual_time: float + actcod: int + deptbitm: float + deptbitv: float + deptmeas: float + deptvert: float + blkpos: float + ropa: float + hkla: float + hklx: float + woba: float + wobx: float + torqa: float + torqx: float + rpma: int + sppa: float + chkp: float + spm1: int + spm2: int + spm3: int + tvolact: float + tvolcact: float + mfop: int + mfoa: float + mfia: float + mdoa: float + mdia: float + mtoa: float + mtia: float + mcoa: float + mcia: float + stkc: int + lagstks: int + deptretm: float + gasa: float + space1: float + space2: float + space3: float + space4: float + space5: float + + @classmethod + def empty(cls, wellid=""): + now = datetime.utcnow() + return cls( + ts=int(time.time() * 1000), + wellid=wellid, + stknum=0, + recid=0, + seqid=0, + actual_date=float(now.strftime("%Y%m%d")), + actual_time=float(now.strftime("%H%M%S")), + actcod=0, + deptbitm=0.0, + deptbitv=0.0, + deptmeas=0.0, + deptvert=0.0, + blkpos=0.0, + ropa=0.0, + hkla=0.0, + hklx=0.0, + woba=0.0, + wobx=0.0, + torqa=0.0, + torqx=0.0, + rpma=0, + sppa=0.0, + chkp=0.0, + spm1=0, + spm2=0, + spm3=0, + tvolact=0.0, + tvolcact=0.0, + mfop=0, + mfoa=0.0, + mfia=0.0, + mdoa=0.0, + mdia=0.0, + mtoa=0.0, + mtia=0.0, + mcoa=0.0, + mcia=0.0, + stkc=0, + lagstks=0, + deptretm=0.0, + gasa=0.0, + space1=0.0, + space2=0.0, + space3=0.0, + space4=0.0, + space5=0.0, + ) + + @classmethod + def from_payload(cls, payload): + meta = payload.get("meta") if isinstance(payload, dict) and isinstance(payload.get("meta"), dict) else {} + data = payload.get("data") if isinstance(payload, dict) and isinstance(payload.get("data"), dict) else {} + values = {key: data.get(key, 0) for key in DATA_KEYS} + values["ts"] = int(data.get("ts", data.get("record_time", int(time.time() * 1000)))) + values["wellid"] = data.get("wellid") or meta.get("equipment_code") or meta.get("equipment_sn") or "" + return cls(**values) + + def to_payload(self, equipment_code): + return { + "meta": { + "equipment_code": equipment_code, + "equipment_sn": equipment_code, + }, + "data": { + key: getattr(self, key) + for key in DATA_KEYS + }, + } diff --git a/model/wits.py b/model/wits.py new file mode 100644 index 0000000..b37d694 --- /dev/null +++ b/model/wits.py @@ -0,0 +1,102 @@ +from dataclasses import dataclass + + +@dataclass(frozen=True) +class WitsData: + ts: int + wellid: str + stknum: int + recid: int + seqid: int + actual_date: float + actual_time: float + actual_ts: int + actcod: int + actod_label: str + deptbitm: float + deptbitv: float + deptmeas: float + deptvert: float + blkpos: float + ropa: float + hkla: float + hklx: float + woba: float + wobx: float + torqa: float + torqx: float + rpma: int + sppa: float + chkp: float + spm1: int + spm2: int + spm3: int + tvolact: float + tvolcact: float + mfop: int + mfoa: float + mfia: float + mdoa: float + mdia: float + mtoa: float + mtia: float + mcoa: float + mcia: float + stkc: int + lagstks: int + deptretm: float + gasa: float + space1: float + space2: float + space3: float + space4: float + space5: float + + +WITS_FIELD_MAPPING = [ + (1, "wellid", "string"), + (2, "stknum", "int"), + (3, "recid", "int"), + (4, "seqid", "int"), + (5, "actual_date", "float"), + (6, "actual_time", "float"), + (7, "actcod", "int"), + (8, "deptbitm", "float"), + (9, "deptbitv", "float"), + (10, "deptmeas", "float"), + (11, "deptvert", "float"), + (12, "blkpos", "float"), + (13, "ropa", "float"), + (14, "hkla", "float"), + (15, "hklx", "float"), + (16, "woba", "float"), + (17, "wobx", "float"), + (18, "torqa", "float"), + (19, "torqx", "float"), + (20, "rpma", "int"), + (21, "sppa", "float"), + (22, "chkp", "float"), + (23, "spm1", "int"), + (24, "spm2", "int"), + (25, "spm3", "int"), + (26, "tvolact", "float"), + (27, "tvolcact", "float"), + (28, "mfop", "int"), + (29, "mfoa", "float"), + (30, "mfia", "float"), + (31, "mdoa", "float"), + (32, "mdia", "float"), + (33, "mtoa", "float"), + (34, "mtia", "float"), + (35, "mcoa", "float"), + (36, "mcia", "float"), + (37, "stkc", "int"), + (38, "lagstks", "int"), + (39, "deptretm", "float"), + (40, "gasa", "float"), + (41, "space1", "float"), + (42, "space2", "float"), + (43, "space3", "float"), + (44, "space4", "float"), + (45, "space5", "float"), +] diff --git a/requirements.md b/requirements.md index 9b25aec..8239844 100644 --- a/requirements.md +++ b/requirements.md @@ -5,6 +5,7 @@ ### wits数据模拟 1. 读取config.yaml中的server-ip和端口将模拟的wits数据发送到这个地址。wits数据要尽可能的拟真。 +2. 发送间隔两秒一条。