Files
tdEngine_mqtt_mock/model/drilling.py
2026-03-13 09:46:16 +08:00

223 lines
5.0 KiB
Python

import time
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal, InvalidOperation
# Ordered names of every field carried in the "data" section of a payload.
# DrillingRealtimeData.from_payload and .to_payload iterate this list, so each
# entry must match a field name of that dataclass.
# NOTE(review): the mnemonics look like WITS record-1 item names - confirm.
DATA_KEYS = [
    # Record identification and timing.
    "ts", "wellid", "stknum", "recid", "seqid",
    "actual_date", "actual_time", "actcod",
    # dept* and block-position values.
    "deptbitm", "deptbitv", "deptmeas", "deptvert", "blkpos",
    # Per-record readings (names ending in "a"/"x" come in pairs).
    "ropa", "hkla", "hklx", "woba", "wobx", "torqa", "torqx",
    "rpma", "sppa", "chkp",
    "spm1", "spm2", "spm3",
    "tvolact", "tvolcact",
    # m* readings.
    "mfop", "mfoa", "mfia", "mdoa", "mdia", "mtoa", "mtia", "mcoa", "mcia",
    "stkc", "lagstks", "deptretm", "gasa",
    # Spare slots.
    "space1", "space2", "space3", "space4", "space5",
]
# Alternative spellings accepted for entries of the payload's "data" section.
# Each tuple is tried left to right; the snake_case name comes first, followed
# by its camelCase variant.
DATA_KEY_ALIASES = {
    "actual_date": ("actual_date", "actualDate"),
    "actual_time": ("actual_time", "actualTime"),
    "wellid": ("wellid", "wellId"),
}
# Alternative spellings accepted for entries of the payload's "meta" section.
# Each tuple is tried left to right; the snake_case name comes first, followed
# by its camelCase variant.
META_KEY_ALIASES = {
    "equipment_code": ("equipment_code", "equipmentCode"),
    "equipment_sn": ("equipment_sn", "equipmentSN"),
}
@dataclass(frozen=True)
class DrillingRealtimeData:
    """Immutable snapshot of one drilling record.

    The field names and their order mirror ``DATA_KEYS``. Instances are built
    either blank via :meth:`empty` or from an incoming message via
    :meth:`from_payload`, and serialised back with :meth:`to_payload`.
    """

    # Record identification and timing.
    ts: int  # epoch milliseconds (see empty()/from_payload())
    wellid: str
    stknum: int
    recid: int
    seqid: int
    actual_date: int  # YYYYMMDD packed into an int
    actual_time: int  # HHMMSS packed into an int
    actcod: int
    deptbitm: float
    deptbitv: float
    deptmeas: float
    deptvert: float
    blkpos: float
    ropa: float
    hkla: float
    hklx: float
    woba: float
    wobx: float
    torqa: float
    torqx: float
    rpma: int
    sppa: float
    chkp: float
    spm1: int
    spm2: int
    spm3: int
    tvolact: float
    tvolcact: float
    mfop: int
    mfoa: float
    mfia: float
    mdoa: float
    mdia: float
    mtoa: float
    mtia: float
    mcoa: float
    mcia: float
    stkc: int
    lagstks: int
    deptretm: float
    gasa: float
    space1: float
    space2: float
    space3: float
    space4: float
    space5: float

    @classmethod
    def empty(cls, wellid: str = "") -> "DrillingRealtimeData":
        """Return a zero-filled record stamped with the current UTC time.

        Args:
            wellid: Value stored in the ``wellid`` field.

        Returns:
            A record whose numeric fields are all zero and whose ``ts``,
            ``actual_date`` and ``actual_time`` describe the same instant.
        """
        # Read the clock once so ts, actual_date and actual_time can never
        # disagree (e.g. across a second or midnight boundary). time.gmtime
        # gives UTC without relying on the deprecated datetime.utcnow().
        now_ms = int(time.time() * 1000)
        utc = time.gmtime(now_ms // 1000)
        return cls(
            ts=now_ms,
            wellid=wellid,
            stknum=0,
            recid=0,
            seqid=0,
            actual_date=int(time.strftime("%Y%m%d", utc)),
            actual_time=int(time.strftime("%H%M%S", utc)),
            actcod=0,
            deptbitm=0.0,
            deptbitv=0.0,
            deptmeas=0.0,
            deptvert=0.0,
            blkpos=0.0,
            ropa=0.0,
            hkla=0.0,
            hklx=0.0,
            woba=0.0,
            wobx=0.0,
            torqa=0.0,
            torqx=0.0,
            rpma=0,
            sppa=0.0,
            chkp=0.0,
            spm1=0,
            spm2=0,
            spm3=0,
            tvolact=0.0,
            tvolcact=0.0,
            mfop=0,
            mfoa=0.0,
            mfia=0.0,
            mdoa=0.0,
            mdia=0.0,
            mtoa=0.0,
            mtia=0.0,
            mcoa=0.0,
            mcia=0.0,
            stkc=0,
            lagstks=0,
            deptretm=0.0,
            gasa=0.0,
            space1=0.0,
            space2=0.0,
            space3=0.0,
            space4=0.0,
            space5=0.0,
        )

    @classmethod
    def from_payload(cls, payload) -> "DrillingRealtimeData":
        """Build a record from a decoded message.

        ``payload`` is expected to be a dict with a ``"data"`` dict and an
        optional ``"meta"`` dict; anything else is treated as empty. Missing
        (or ``None``) data fields default to ``0``.

        Only ``ts``, ``actual_date`` and ``actual_time`` are coerced to int;
        every other field is stored exactly as received.

        Args:
            payload: Decoded message, normally a dict.

        Returns:
            The populated record.
        """
        # "mate" is accepted as an alternative key for the meta section.
        # NOTE(review): presumably tolerates a misspelling sent by some
        # publishers - confirm.
        meta = get_dict_alias(payload, "meta", "mate")
        data = get_dict_alias(payload, "data")

        # Single clock read shared by the lookup default and the parse default.
        now_ms = int(time.time() * 1000)

        values = {
            key: get_alias_value(data, DATA_KEY_ALIASES.get(key, (key,)), 0)
            for key in DATA_KEYS
        }
        values["ts"] = parse_int_value(
            get_alias_value(data, ("ts", "record_time", "recordTime"), now_ms),
            now_ms,
        )
        values["actual_date"] = parse_int_value(
            get_alias_value(data, DATA_KEY_ALIASES["actual_date"], 0)
        )
        values["actual_time"] = parse_int_value(
            get_alias_value(data, DATA_KEY_ALIASES["actual_time"], 0)
        )
        # Well id: prefer the data section, then fall back to the equipment
        # code and finally the equipment serial number from the meta section.
        values["wellid"] = (
            get_alias_value(data, DATA_KEY_ALIASES["wellid"], "")
            or get_alias_value(meta, META_KEY_ALIASES["equipment_code"], "")
            or get_alias_value(meta, META_KEY_ALIASES["equipment_sn"], "")
            or ""
        )
        return cls(**values)

    def to_payload(self, equipment_code) -> dict:
        """Serialise the record into the ``{"meta": ..., "data": ...}`` shape.

        Args:
            equipment_code: Written to both ``equipment_code`` and
                ``equipment_sn`` of the meta section.

        Returns:
            A dict with a ``"meta"`` section and a ``"data"`` section holding
            every field listed in ``DATA_KEYS``.
        """
        return {
            "meta": {
                "equipment_code": equipment_code,
                "equipment_sn": equipment_code,
            },
            "data": {key: getattr(self, key) for key in DATA_KEYS},
        }
def parse_int_value(value, default=0):
    """Convert ``value`` to an int, returning ``default`` when impossible.

    The value is stringified, stripped and parsed through ``Decimal`` so that
    inputs such as ``"12.0"``, ``" 7 "`` or ``"1e3"`` are accepted; any
    fractional part is truncated toward zero.

    Args:
        value: Anything with a meaningful ``str()`` form (int, float, str...).
        default: Returned for ``None``, an empty string, unparsable text,
            NaN or an infinite value.

    Returns:
        The parsed integer, or ``default``.
    """
    try:
        if value is None or value == "":
            return default
        return int(Decimal(str(value).strip()))
    except (InvalidOperation, TypeError, ValueError, OverflowError):
        # InvalidOperation: text Decimal cannot parse.
        # ValueError: int(Decimal("NaN")).
        # OverflowError: int(Decimal("Infinity")), e.g. from float("inf").
        return default
def get_alias_value(source, keys, default=None):
    """Return the first non-``None`` value found in ``source`` under ``keys``.

    Args:
        source: Mapping to search; anything that is not a dict yields
            ``default``.
        keys: Candidate key names, tried in order.
        default: Returned when no candidate key holds a non-``None`` value.

    Returns:
        The first matching value (falsy values such as ``0`` or ``""`` count
        as matches), otherwise ``default``.
    """
    if isinstance(source, dict):
        candidates = (source.get(alias) for alias in keys)
        return next((found for found in candidates if found is not None), default)
    return default
def get_dict_alias(source, *keys):
    """Return the first dict stored in ``source`` under any of ``keys``.

    The lookup is delegated to ``get_alias_value``. If the value it returns is
    not a dict (or nothing is found), an empty dict is returned instead.
    """
    candidate = get_alias_value(source, keys, {})
    if isinstance(candidate, dict):
        return candidate
    return {}