153 lines
3.6 KiB
Python
153 lines
3.6 KiB
Python
import logging
import math
import re
from decimal import Decimal, InvalidOperation

from model import DrillingRealtimeData
|
|
|
|
|
|
# Column names in the exact order DrillingRealtimeORM.build_insert_sql emits
# them, both in the INSERT column list and in the VALUES tuple. Each name is
# also the attribute read from the entity, so order and spelling matter.
DB_COLUMNS = [
    "ts", "stknum", "recid", "seqid", "actual_date", "actual_time", "actcod",
    "deptbitm", "deptbitv", "deptmeas", "deptvert", "blkpos", "ropa",
    "hkla", "hklx", "woba", "wobx", "torqa", "torqx", "rpma", "sppa", "chkp",
    "spm1", "spm2", "spm3", "tvolact", "tvolcact",
    "mfop", "mfoa", "mfia", "mdoa", "mdia", "mtoa", "mtia", "mcoa", "mcia",
    "stkc", "lagstks", "deptretm", "gasa",
    "space1", "space2", "space3", "space4", "space5",
]
|
|
|
|
# Columns that build_insert_sql renders through to_int. "ts" is treated the
# same way there even though it is not listed here; every other column is
# rendered through to_float.
INT_COLUMNS = {
    "stknum", "recid", "seqid",
    "actual_date", "actual_time", "actcod",
    "rpma", "spm1", "spm2", "spm3",
    "mfop", "stkc", "lagstks",
}
|
|
|
|
|
|
def sanitize_identifier(value, fallback):
    """Turn *value* into a lowercase identifier made of ``[a-z0-9_]``.

    Every character outside ``A-Za-z0-9_`` is replaced by ``_``. A falsy
    *value* (``None``, ``""``, ``0``) is treated as missing and *fallback* is
    used instead, as-is (the fallback is not itself filtered). A result that
    starts with a digit is prefixed with ``t_``.

    Args:
        value: Anything convertible with ``str()``.
        fallback: Identifier to use when *value* yields an empty string.

    Returns:
        The sanitized, lower-cased identifier. An empty string is returned
        only when both *value* and *fallback* are empty.
    """
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", str(value or ""))
    if not cleaned:
        cleaned = fallback
    # Guard the index: an empty fallback used to raise IndexError here.
    if cleaned and cleaned[0].isdigit():
        cleaned = f"t_{cleaned}"
    return cleaned.lower()
|
|
|
|
|
|
def sql_quote(value):
    """Render *value* as a single-quoted SQL string literal.

    The value is converted with ``str()``. Backslashes are doubled and single
    quotes are doubled, so neither can terminate or alter the literal.

    Args:
        value: Any object; its ``str()`` form is quoted.

    Returns:
        The escaped text wrapped in single quotes.
    """
    text = str(value)
    # Backslash must be escaped first: the target dialect (the INSERT ...
    # USING ... TAGS statements built in this module are TDengine syntax)
    # treats "\" as an escape character inside string literals, so a value
    # ending in "\" would otherwise swallow the closing quote.
    text = text.replace("\\", "\\\\")
    text = text.replace("'", "''")
    return "'" + text + "'"
|
|
|
|
|
|
class DrillingRealtimeORM:
    """Builds ``INSERT ... USING ... TAGS`` statements for drilling records.

    The generated SQL writes one row into a per-equipment table and names the
    super table it belongs to (this is the auto-create-subtable form of
    TDengine's INSERT syntax).

    NOTE(review): ``database`` and ``stable`` are interpolated into the SQL
    between backticks without any escaping, so they must come from trusted
    configuration, never from the payload.
    """

    def __init__(self, database, stable="drilling_realtime_st", default_device_code="GJ-304-0088"):
        """Store the target database, super table and fallback device code.

        Args:
            database: Database name used to qualify both table names.
            stable: Super table name referenced in the ``USING`` clause.
            default_device_code: Equipment code used when the payload does
                not provide one; a falsy value falls back to "GJ-304-0088".
        """
        self.database = database
        self.stable = stable
        self.default_device_code = default_device_code or "GJ-304-0088"

    def build_insert_sql(self, payload):
        """Build the INSERT statement for one decoded payload.

        Args:
            payload: The decoded message; must be a ``dict``.

        Returns:
            A single SQL string covering every column in ``DB_COLUMNS``.

        Raises:
            ValueError: If *payload* is not a ``dict``.
        """
        if not isinstance(payload, dict):
            raise ValueError("payload is not JSON object")

        entity = DrillingRealtimeData.from_payload(payload)
        # "mate" is accepted alongside "meta" (presumably a misspelling some
        # producers send; confirm against upstream).
        meta = get_dict_alias(payload, "meta", "mate")

        # First non-blank candidate wins. Every candidate is stringified and
        # stripped so a whitespace-only value (including wellid) cannot become
        # the tag or produce a table named "drilling_realtime__".
        equipment_code = (
            str(get_alias_value(meta, ("equipment_code", "equipmentCode"), "")).strip()
            or str(get_alias_value(meta, ("equipment_sn", "equipmentSN"), "")).strip()
            or str(entity.wellid or "").strip()
            or str(self.default_device_code).strip()
            or "GJ-304-0088"
        )
        table_name = sanitize_identifier(
            f"drilling_realtime_{equipment_code}", "drilling_realtime_default"
        )

        values = []
        for column in DB_COLUMNS:
            raw = getattr(entity, column, None)
            if raw is None:
                logging.debug("Column %s is None, default=0", column)
            # Values are rendered as bare numeric literals, so each one is
            # coerced to int or float and is never copied verbatim.
            if column in INT_COLUMNS or column == "ts":
                values.append(str(to_int(raw)))
            else:
                values.append(str(to_float(raw)))

        columns_sql = ", ".join(f"`{column}`" for column in DB_COLUMNS)
        values_sql = ", ".join(values)
        return (
            f"INSERT INTO `{self.database}`.`{table_name}` USING `{self.database}`.`{self.stable}` "
            f"TAGS ({sql_quote(equipment_code)}) ({columns_sql}) VALUES ({values_sql})"
        )
|
|
|
|
|
|
def to_int(value, default=0):
    """Convert *value* to ``int``, returning *default* when that fails.

    The value is stringified, stripped and parsed as a ``Decimal``, so decimal
    and exponent notation such as "12.9" or "1e3" are accepted. The fractional
    part is truncated toward zero.

    Args:
        value: Number or numeric string; ``None`` and ``""`` count as missing.
        default: Value returned for missing, unparsable or non-finite input.

    Returns:
        The truncated integer, or *default*.
    """
    try:
        if value is None or value == "":
            return default
        number = Decimal(str(value).strip())
        # NaN and +/-Infinity parse as valid Decimals; int() would then raise
        # ValueError (NaN) or the previously uncaught OverflowError (Infinity).
        if not number.is_finite():
            return default
        return int(number)
    except (InvalidOperation, TypeError, ValueError, OverflowError):
        return default
|
|
|
|
|
|
def to_float(value, default=0.0):
    """Convert *value* to a finite ``float``, returning *default* otherwise.

    Args:
        value: Number or numeric string; ``None`` and ``""`` count as missing.
        default: Value returned for missing, unparsable or non-finite input.

    Returns:
        The parsed float, or *default*.
    """
    try:
        if value is None or value == "":
            return default
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # float() raises TypeError for unsupported types, ValueError for bad
        # text, and OverflowError for integers too large to represent.
        return default
    # NaN and +/-inf would be rendered as "nan"/"inf" when the result is
    # formatted into SQL, so they are treated like any other invalid input.
    if not math.isfinite(number):
        return default
    return number
|
|
|
|
|
|
def get_alias_value(source, keys, default=None):
    """Return the first non-``None`` value stored under any of *keys*.

    Keys are tried in the given order. *default* is returned when *source* is
    not a ``dict`` or when every key is absent or maps to ``None``.
    """
    if isinstance(source, dict):
        # map() is lazy, so lookups stop at the first usable value.
        for candidate in map(source.get, keys):
            if candidate is not None:
                return candidate
    return default
|
|
|
|
|
|
def get_dict_alias(source, *keys):
    """Return the first ``dict`` found under any of *keys*, else a new ``{}``.

    The lookup is delegated to ``get_alias_value``. Its result is discarded
    when it is not a ``dict``.
    """
    found = get_alias_value(source, keys, {})
    if isinstance(found, dict):
        return found
    return {}
|