From be6544fdfe705581a766cfdbced7dd4ee49aa5ca Mon Sep 17 00:00:00 2001 From: gourx Date: Fri, 13 Mar 2026 09:46:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9acturetime/data=20=E5=85=A5?= =?UTF-8?q?=E5=BA=93=E6=97=A0=E5=80=BC=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/mqtt_mock.py | 1 + db/orm.py | 48 +++++++++++++++++++++++++++------- model/drilling.py | 64 ++++++++++++++++++++++++++++++++++++++------- model/wits.py | 8 +++--- sql/createTable.sql | 4 +-- 5 files changed, 101 insertions(+), 24 deletions(-) diff --git a/app/mqtt_mock.py b/app/mqtt_mock.py index 55fa725..8c5cf55 100644 --- a/app/mqtt_mock.py +++ b/app/mqtt_mock.py @@ -128,6 +128,7 @@ def run_mock_service(args, deps): if tdengine_writer.enabled: try: tdengine_writer.write_payload(parse_payload(payload)) + logger.info("payload: %s", payload) logger.info("Wrote TDengine") except Exception: logger.exception("Write TDengine failed") diff --git a/db/orm.py b/db/orm.py index 89a59bf..5fa1050 100644 --- a/db/orm.py +++ b/db/orm.py @@ -1,5 +1,6 @@ -import logging +import logging import re +from decimal import Decimal, InvalidOperation from model import DrillingRealtimeData @@ -52,7 +53,21 @@ DB_COLUMNS = [ "space5", ] -INT_COLUMNS = {"stknum", "recid", "seqid", "actcod", "rpma", "spm1", "spm2", "spm3", "mfop", "stkc", "lagstks"} +INT_COLUMNS = { + "stknum", + "recid", + "seqid", + "actual_date", + "actual_time", + "actcod", + "rpma", + "spm1", + "spm2", + "spm3", + "mfop", + "stkc", + "lagstks", +} def sanitize_identifier(value, fallback): @@ -78,12 +93,12 @@ class DrillingRealtimeORM: if not isinstance(payload, dict): raise ValueError("payload is not JSON object") entity = DrillingRealtimeData.from_payload(payload) - meta = payload.get("meta") if isinstance(payload.get("meta"), dict) else {} + meta = get_dict_alias(payload, "meta", "mate") equipment_code = ( - str(self.default_device_code).strip() - or str(meta.get("equipment_code", "")).strip() - or str(meta.get("equipment_sn", "")).strip() + str(get_alias_value(meta, ("equipment_code", "equipmentCode"), "")).strip() + or str(get_alias_value(meta, ("equipment_sn", "equipmentSN"), "")).strip() or entity.wellid + or str(self.default_device_code).strip() or "GJ-304-0088" ) table_name = sanitize_identifier(f"drilling_realtime_{equipment_code}", "drilling_realtime_default") @@ -108,8 +123,8 @@ def to_int(value, default=0): try: if value is None or value == "": return default - return int(value) - except Exception: + return int(Decimal(str(value).strip())) + except (InvalidOperation, TypeError, ValueError): return default @@ -119,4 +134,19 @@ def to_float(value, default=0.0): return default return float(value) except Exception: - return default \ No newline at end of file + return default + + +def get_alias_value(source, keys, default=None): + if not isinstance(source, dict): + return default + for key in keys: + value = source.get(key) + if value is not None: + return value + return default + + +def get_dict_alias(source, *keys): + value = get_alias_value(source, keys, {}) + return value if isinstance(value, dict) else {} diff --git a/model/drilling.py b/model/drilling.py index 67ef8c8..41a7289 100644 --- a/model/drilling.py +++ b/model/drilling.py @@ -1,6 +1,7 @@ -import time +import time from dataclasses import dataclass from datetime import datetime +from decimal import Decimal, InvalidOperation DATA_KEYS = [ @@ -52,6 +53,17 @@ DATA_KEYS = [ "space5", ] +DATA_KEY_ALIASES = { + "actual_date": ("actual_date", "actualDate"), + "actual_time": ("actual_time", "actualTime"), + "wellid": ("wellid", "wellId"), +} + +META_KEY_ALIASES = { + "equipment_code": ("equipment_code", "equipmentCode"), + "equipment_sn": ("equipment_sn", "equipmentSN"), +} + @dataclass(frozen=True) class DrillingRealtimeData: @@ -60,8 +72,8 @@ class DrillingRealtimeData: stknum: int recid: int seqid: int - actual_date: float - actual_time: float + actual_date: int + actual_time: int actcod: int deptbitm: float deptbitv: float @@ -111,8 +123,8 @@ class DrillingRealtimeData: stknum=0, recid=0, seqid=0, - actual_date=float(now.strftime("%Y%m%d")), - actual_time=float(now.strftime("%H%M%S")), + actual_date=int(now.strftime("%Y%m%d")), + actual_time=int(now.strftime("%H%M%S")), actcod=0, deptbitm=0.0, deptbitv=0.0, @@ -156,11 +168,21 @@ class DrillingRealtimeData: @classmethod def from_payload(cls, payload): - meta = payload.get("meta") if isinstance(payload, dict) and isinstance(payload.get("meta"), dict) else {} + meta = get_dict_alias(payload, "meta", "mate") data = payload.get("data") if isinstance(payload, dict) and isinstance(payload.get("data"), dict) else {} - values = {key: data.get(key, 0) for key in DATA_KEYS} - values["ts"] = int(data.get("ts", data.get("record_time", int(time.time() * 1000)))) - values["wellid"] = data.get("wellid") or meta.get("equipment_code") or meta.get("equipment_sn") or "" + values = {key: get_alias_value(data, DATA_KEY_ALIASES.get(key, (key,)), 0) for key in DATA_KEYS} + values["ts"] = parse_int_value( + get_alias_value(data, ("ts", "record_time", "recordTime"), int(time.time() * 1000)), + int(time.time() * 1000), + ) + values["actual_date"] = parse_int_value(get_alias_value(data, DATA_KEY_ALIASES["actual_date"], 0)) + values["actual_time"] = parse_int_value(get_alias_value(data, DATA_KEY_ALIASES["actual_time"], 0)) + values["wellid"] = ( + get_alias_value(data, DATA_KEY_ALIASES["wellid"], "") + or get_alias_value(meta, META_KEY_ALIASES["equipment_code"], "") + or get_alias_value(meta, META_KEY_ALIASES["equipment_sn"], "") + or "" + ) return cls(**values) def to_payload(self, equipment_code): @@ -174,3 +196,27 @@ class DrillingRealtimeData: for key in DATA_KEYS }, } + + +def parse_int_value(value, default=0): + try: + if value is None or value == "": + return default + return int(Decimal(str(value).strip())) + except (InvalidOperation, TypeError, ValueError): + return default + + +def get_alias_value(source, keys, default=None): + if not isinstance(source, dict): + return default + for key in keys: + value = source.get(key) + if value is not None: + return value + return default + + +def get_dict_alias(source, *keys): + value = get_alias_value(source, keys, {}) + return value if isinstance(value, dict) else {} diff --git a/model/wits.py b/model/wits.py index 0aa5841..482e7b7 100644 --- a/model/wits.py +++ b/model/wits.py @@ -43,8 +43,8 @@ class WitsData: stknum: int recid: int seqid: int - actual_date: str - actual_time: str + actual_date: int + actual_time: int actual_ts: int actcod: int actod_label: str @@ -122,8 +122,8 @@ WITS_CHANNEL_MAPPING = [ ("0102", "stknum", "int"), ("0103", "recid", "int"), ("0104", "seqid", "int"), - ("0105", "actual_date", "string"), - ("0106", "actual_time", "string"), + ("0105", "actual_date", "int"), + ("0106", "actual_time", "int"), ("0107", "actcod", "int"), ("0108", "deptbitm", "float6"), ("0109", "deptbitv", "float6"), diff --git a/sql/createTable.sql b/sql/createTable.sql index de89186..83b58e0 100644 --- a/sql/createTable.sql +++ b/sql/createTable.sql @@ -5,8 +5,8 @@ CREATE STABLE drilling_realtime_st ( stknum INT, recid INT, seqid INT, - actual_date FLOAT, - actual_time FLOAT, + actual_date INT, + actual_time INT, actcod INT, deptbitm FLOAT, deptbitv FLOAT,