# Files
# tdEngine_mqtt_mock/db/orm.py
# 2026-03-13 09:46:16 +08:00
#
# 153 lines
# 3.6 KiB
# Python

import logging
import math
import re
from decimal import Decimal, InvalidOperation

from model import DrillingRealtimeData
# Ordered column names written by DrillingRealtimeORM.build_insert_sql: the
# INSERT statement lists columns, and renders their values, in exactly this
# order, reading each value from the attribute of the same name on the entity.
# NOTE(review): the names look like WITS record-1 mnemonics -- confirm against
# the table schema before adding or reordering entries.
DB_COLUMNS = [
    "ts", "stknum", "recid", "seqid", "actual_date", "actual_time", "actcod",
    "deptbitm", "deptbitv", "deptmeas", "deptvert", "blkpos",
    "ropa", "hkla", "hklx", "woba", "wobx", "torqa", "torqx", "rpma",
    "sppa", "chkp", "spm1", "spm2", "spm3",
    "tvolact", "tvolcact",
    "mfop", "mfoa", "mfia", "mdoa", "mdia", "mtoa", "mtia", "mcoa", "mcia",
    "stkc", "lagstks", "deptretm", "gasa",
    "space1", "space2", "space3", "space4", "space5",
]
# Columns whose values DrillingRealtimeORM.build_insert_sql renders through
# to_int. "ts" is also rendered as an integer but is special-cased there
# rather than listed here; every remaining column goes through to_float.
INT_COLUMNS = {
    "stknum", "recid", "seqid",
    "actual_date", "actual_time", "actcod",
    "rpma",
    "spm1", "spm2", "spm3",
    "mfop",
    "stkc", "lagstks",
}
def sanitize_identifier(value, fallback):
    """Turn *value* into a lowercase identifier made of ``[a-z0-9_]`` only.

    Every character outside ``A-Za-z0-9_`` is replaced by ``_``. When *value*
    is falsy (``None``, ``""``, ``0`` ...) *fallback* is used instead; the
    fallback is taken as-is and is NOT run through the character filter, so
    callers must pass an already-safe name. A result that starts with a digit
    is prefixed with ``t_``.

    Args:
        value: Anything convertible with ``str``; typically a name built from
            external data.
        fallback: Name to use when *value* yields an empty string.

    Returns:
        The sanitized, lower-cased identifier.

    Raises:
        ValueError: If both *value* and *fallback* are empty, since no usable
            identifier can be produced.
    """
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", str(value or ""))
    if not cleaned:
        # Coerce so a non-string fallback cannot fail on indexing below.
        cleaned = str(fallback or "")
    if not cleaned:
        # Previously this surfaced as an opaque IndexError from cleaned[0].
        raise ValueError("cannot build identifier: value and fallback are both empty")
    if cleaned[0].isdigit():
        cleaned = f"t_{cleaned}"
    return cleaned.lower()
def sql_quote(value):
    """Return *value* as a single-quoted SQL string literal.

    The value is converted with ``str`` and then escaped so it cannot
    terminate the literal early:

    * backslashes are doubled, because the target engine treats ``\\`` as an
      escape character inside string literals (a trailing backslash would
      otherwise swallow the closing quote);
    * single quotes are doubled.

    Args:
        value: Any object; its ``str`` form is quoted.

    Returns:
        The escaped text wrapped in single quotes.
    """
    text = str(value)
    escaped = text.replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"
class DrillingRealtimeORM:
    """Render drilling real-time payloads as ``INSERT ... USING ... TAGS`` SQL.

    The instance only holds naming configuration; no connection is opened and
    no statement is executed here.

    ``database`` and ``stable`` are interpolated into the statement verbatim
    (wrapped in backticks), so they must come from trusted configuration.
    """

    def __init__(self, database, stable="drilling_realtime_st", default_device_code="GJ-304-0088"):
        """Store the database name, super-table name and default device code.

        A falsy ``default_device_code`` is replaced by ``"GJ-304-0088"``.
        """
        self.database = database
        self.stable = stable
        self.default_device_code = default_device_code if default_device_code else "GJ-304-0088"

    def build_insert_sql(self, payload):
        """Build the INSERT statement for one payload.

        Args:
            payload: Decoded message; must be a ``dict``.

        Returns:
            The SQL text. The sub-table name is derived from the equipment
            code, which is also passed as the single tag value.

        Raises:
            ValueError: If *payload* is not a ``dict``.
        """
        if not isinstance(payload, dict):
            raise ValueError("payload is not JSON object")

        entity = DrillingRealtimeData.from_payload(payload)
        # "mate" is accepted as an alias of "meta".
        meta = get_dict_alias(payload, "meta", "mate")
        equipment_code = self._resolve_equipment_code(meta, entity)
        table_name = sanitize_identifier(
            f"drilling_realtime_{equipment_code}",
            "drilling_realtime_default",
        )

        values_sql = ", ".join([self._render_value(entity, name) for name in DB_COLUMNS])
        columns_sql = ", ".join(f"`{name}`" for name in DB_COLUMNS)

        target = f"`{self.database}`.`{table_name}`"
        template = f"`{self.database}`.`{self.stable}`"
        return (
            f"INSERT INTO {target} USING {template} "
            f"TAGS ({sql_quote(equipment_code)}) ({columns_sql}) VALUES ({values_sql})"
        )

    def _resolve_equipment_code(self, meta, entity):
        """Pick the equipment code: meta code, meta serial, well id, then defaults."""
        for aliases in (("equipment_code", "equipmentCode"), ("equipment_sn", "equipmentSN")):
            candidate = str(get_alias_value(meta, aliases, "")).strip()
            if candidate:
                return candidate
        # entity.wellid is only consulted once both meta lookups came up empty.
        return entity.wellid or str(self.default_device_code).strip() or "GJ-304-0088"

    @staticmethod
    def _render_value(entity, column):
        """Return the SQL text for one column; missing attributes render as 0 / 0.0."""
        raw = getattr(entity, column, None)
        if raw is None:
            logging.debug("Column %s is None, default=0", column)
        convert = to_int if (column == "ts" or column in INT_COLUMNS) else to_float
        return str(convert(raw))
def to_int(value, default=0):
    """Convert *value* to ``int``, returning *default* when that is impossible.

    The value is parsed through ``Decimal`` from its stripped ``str`` form, so
    numeric strings such as ``"12.9"`` or ``"1e3"`` are accepted; the
    fractional part is truncated toward zero.

    Args:
        value: Number, numeric string, or anything else.
        default: Returned for ``None``, ``""``, unparsable input, and
            non-finite numbers (NaN, +/-Infinity).

    Returns:
        The integer value, or *default*.
    """
    try:
        if value is None or value == "":
            return default
        number = Decimal(str(value).strip())
        # "inf"/"nan" parse as valid Decimals but have no integer value;
        # int() on Infinity raises OverflowError, so reject them up front.
        if not number.is_finite():
            return default
        return int(number)
    except (InvalidOperation, TypeError, ValueError, OverflowError):
        return default
def to_float(value, default=0.0):
    """Convert *value* to a finite ``float``, returning *default* otherwise.

    Args:
        value: Number, numeric string, or anything else.
        default: Returned for ``None``, ``""``, input that ``float`` rejects,
            and non-finite results (NaN, +/-Infinity).

    Returns:
        The float value, or *default*.
    """
    try:
        if value is None or value == "":
            return default
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type; ValueError: unparsable text;
        # OverflowError: integer too large for a float.
        return default
    # NaN/Infinity would be rendered as the bare words "nan"/"inf" when the
    # result is formatted into a SQL VALUES list, so treat them as missing.
    return number if math.isfinite(number) else default
def get_alias_value(source, keys, default=None):
    """Return the first non-``None`` value found in *source* under any of *keys*.

    Args:
        source: Mapping to search; anything that is not a ``dict`` yields
            *default*.
        keys: Iterable of alternative key names, tried in order.
        default: Returned when no key holds a non-``None`` value.

    Returns:
        The first matching value (falsy values such as ``0`` or ``""`` count
        as found), or *default*.
    """
    if not isinstance(source, dict):
        return default
    looked_up = (source.get(alias) for alias in keys)
    return next((item for item in looked_up if item is not None), default)
def get_dict_alias(source, *keys):
    """Return the first dict stored in *source* under one of *keys*.

    The lookup is delegated to ``get_alias_value``. If nothing is found, or
    the first non-``None`` value found is not a ``dict``, a new empty dict is
    returned.
    """
    candidate = get_alias_value(source, keys, {})
    if isinstance(candidate, dict):
        return candidate
    return {}