from __future__ import annotations

from math import sqrt
from statistics import mean
from typing import Any


# Each metric entry records its display label, unit, and the data source
# ("history" sensor readings or "forecast" rows) that feeds its series.
METRIC_CONFIG = {
    "soil_moisture": {
        "label": "Soil moisture",
        "unit": "%",
        "source": "history",
        "current_field": "soil_moisture",
    },
    "soil_temperature": {
        "label": "Soil temperature",
        "unit": "°C",
        "source": "history",
        "current_field": "soil_temperature",
    },
    "humidity": {
        "label": "Air humidity",
        "unit": "%",
        "source": "forecast",
        "forecast_field": "humidity_mean",
    },
    "soil_ph": {
        "label": "Soil pH",
        "unit": "pH",
        "source": "history",
        "current_field": "soil_ph",
    },
    "electrical_conductivity": {
        "label": "Electrical conductivity",
        "unit": "dS/m",
        "source": "history",
        "current_field": "electrical_conductivity",
    },
}


# When both detectors flag the same metric, IQR findings outrank Z-score ones.
METHOD_PRIORITY = {"IQR": 2, "Z_SCORE": 1}


def _percentile(sorted_values: list[float], percentile: float) -> float:
    if not sorted_values:
        return 0.0
    if len(sorted_values) == 1:
        return sorted_values[0]
    # Linear interpolation between the two nearest ranks.
    index = (len(sorted_values) - 1) * percentile
    lower = int(index)
    upper = min(lower + 1, len(sorted_values) - 1)
    fraction = index - lower
    return sorted_values[lower] + ((sorted_values[upper] - sorted_values[lower]) * fraction)
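# Worked example (illustration only): _percentile([1.0, 2.0, 3.0, 4.0], 0.25)
# computes index = 3 * 0.25 = 0.75, then interpolates between 1.0 and 2.0:
# 1.0 + (2.0 - 1.0) * 0.75 = 1.75.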


def _population_std(values: list[float]) -> float:
    if len(values) < 2:
        return 0.0
    center = mean(values)
    variance = sum((value - center) ** 2 for value in values) / len(values)
    return sqrt(variance)
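# Worked example (illustration only): _population_std([2.0, 4.0, 4.0, 4.0,
# 5.0, 5.0, 7.0, 9.0]) has mean 5.0 and variance 32 / 8 = 4.0, so it
# returns 2.0.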


def _severity_from_score(score: float) -> str:
    absolute = abs(score)
    if absolute >= 3.5:
        return "critical"
    if absolute >= 2.5:
        return "high"
    if absolute >= 1.5:
        return "medium"
    return "low"


def _history_series(history: list[Any], field_name: str) -> tuple[list[float], str | None, float | None]:
    values: list[float] = []
    latest_timestamp = None
    latest_value = None

    for item in history:
        value = getattr(item, field_name, None)
        if value is None:
            continue
        numeric = float(value)
        values.append(numeric)
        if latest_timestamp is None:
            recorded_at = getattr(item, "recorded_at", None)
            latest_timestamp = recorded_at.isoformat() if recorded_at is not None else None
            latest_value = numeric

    # History arrives newest-first, so reverse into chronological order.
    return list(reversed(values)), latest_timestamp, latest_value


def _forecast_series(forecasts: list[Any], field_name: str) -> tuple[list[float], str | None, float | None]:
    values: list[float] = []
    latest_timestamp = None
    latest_value = None

    # Only the next seven forecast rows contribute to the series.
    for forecast in forecasts[:7]:
        value = getattr(forecast, field_name, None)
        if value is None:
            continue
        numeric = float(value)
        values.append(numeric)
        if latest_timestamp is None:
            forecast_date = getattr(forecast, "forecast_date", None)
            latest_timestamp = forecast_date.isoformat() if forecast_date is not None else None
            latest_value = numeric

    return values, latest_timestamp, latest_value


def _detect_with_z_score(values: list[float], observed_value: float) -> dict[str, Any] | None:
    # Too few samples make the statistics unreliable.
    if len(values) < 5:
        return None
    center = mean(values)
    std = _population_std(values)
    if std == 0:
        return None
    score = (observed_value - center) / std
    if abs(score) < 2.0:
        return None
    return {
        "anomaly_method": "Z_SCORE",
        "deviation_score": round(score, 3),
        "expected_range": [round(center - (2 * std), 2), round(center + (2 * std), 2)],
        "severity": _severity_from_score(score),
    }
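# Worked example (illustration only): _detect_with_z_score([10.0] * 9 + [20.0], 20.0)
# sees mean 11.0 and population std 3.0, so the score is (20 - 11) / 3 = 3.0,
# yielding a "high" Z_SCORE anomaly with expected_range [5.0, 17.0].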


def _detect_with_iqr(values: list[float], observed_value: float) -> dict[str, Any] | None:
    if len(values) < 5:
        return None
    sorted_values = sorted(values)
    q1 = _percentile(sorted_values, 0.25)
    q3 = _percentile(sorted_values, 0.75)
    iqr = q3 - q1
    if iqr == 0:
        return None
    lower = q1 - (1.5 * iqr)
    upper = q3 + (1.5 * iqr)
    if lower <= observed_value <= upper:
        return None

    # Express how far past the violated fence the value sits, in IQR units.
    if observed_value < lower:
        score = (observed_value - lower) / iqr
    else:
        score = (observed_value - upper) / iqr

    return {
        "anomaly_method": "IQR",
        "deviation_score": round(score, 3),
        "expected_range": [round(lower, 2), round(upper, 2)],
        "severity": _severity_from_score(score),
    }
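# Worked example (illustration only): _detect_with_iqr([1.0, 2.0, 3.0, 4.0, 100.0], 100.0)
# gives q1 = 2.0, q3 = 4.0, iqr = 2.0, and fences [-1.0, 7.0]; the score is
# (100 - 7) / 2 = 46.5, a "critical" IQR anomaly.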


def _select_detection_result(results: list[dict[str, Any]]) -> dict[str, Any] | None:
    if not results:
        return None
    return sorted(
        results,
        key=lambda item: (METHOD_PRIORITY[item["anomaly_method"]], abs(item["deviation_score"])),
        reverse=True,
    )[0]
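# Note: when both detectors fire for a metric, the IQR result always wins
# here, since METHOD_PRIORITY ranks IQR (2) above Z_SCORE (1); the absolute
# deviation score only breaks ties between results of the same method.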


def _build_contextual_interpretation(anomalies: list[dict[str, Any]], ai_bundle: dict | None = None) -> dict[str, Any]:
    ai_bundle = ai_bundle or {}
    ai_payload = ai_bundle.get("anomalyDetectionCard", {}) if isinstance(ai_bundle, dict) else {}
    # Prefer a complete AI-authored interpretation when one was supplied.
    if isinstance(ai_payload, dict) and all(ai_payload.get(key) for key in ("explanation", "likely_cause", "recommended_action")):
        return {
            "explanation": ai_payload["explanation"],
            "likely_cause": ai_payload["likely_cause"],
            "recommended_action": ai_payload["recommended_action"],
        }

    metric_types = {item["metric_type"] for item in anomalies}
    if {"soil_temperature", "soil_moisture"} <= metric_types:
        return {
            "explanation": "Concurrent soil temperature and soil moisture anomalies indicate that combined stress is forming in the root zone.",
            "likely_cause": "The main driver is likely the irrigation pattern, a heat wave, or a sudden drop in the soil's water-holding capacity.",
            "recommended_action": "Review the irrigation schedule and the drainage/evaporation conditions, and re-check the sensor readings over the next 24 hours.",
        }
    if "electrical_conductivity" in metric_types and "soil_moisture" in metric_types:
        return {
            "explanation": "Concurrent EC and moisture anomalies can point to salinity stress or salt buildup in the growing bed.",
            "likely_cause": "Irrigation water quality, recent fertilization, or reduced soil leaching could be behind this pattern.",
            "recommended_action": "Review the irrigation water EC and the fertilization schedule, and consider controlled soil leaching if needed.",
        }
    if anomalies:
        top = anomalies[0]
        return {
            "explanation": f"A statistical anomaly in the {top['label']} metric was detected using the {top['anomaly_method']} method.",
            "likely_cause": "This event could stem from a sudden change in environmental conditions, a process error, or a sensor in need of calibration.",
            "recommended_action": "Review the metric's trend and the surrounding data, and take corrective action in the field if the anomaly persists.",
        }
    return {
        "explanation": "No statistically significant anomaly was detected in the recent data.",
        "likely_cause": "The current data are consistent with the historical pattern.",
        "recommended_action": "Continue routine monitoring.",
    }


def build_anomaly_detection_card(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
    context = context or {}
    sensor = context.get("sensor")
    history = context.get("history", [])
    forecasts = context.get("forecasts", [])
    if sensor is None:
        return {"anomalies": [], "interpretation": None}

    anomalies: list[dict[str, Any]] = []

    for metric_type, config in METRIC_CONFIG.items():
        if config["source"] == "history":
            values, timestamp, observed_value = _history_series(history, config["current_field"])
            # Prefer the live sensor reading over the newest history row.
            current_value = getattr(sensor, config["current_field"], None)
            if current_value is not None:
                observed_value = float(current_value)
                timestamp = getattr(sensor, "updated_at", None)
                timestamp = timestamp.isoformat() if timestamp is not None else timestamp
        else:
            values, timestamp, observed_value = _forecast_series(forecasts, config["forecast_field"])

        if observed_value is None or len(values) < 5:
            continue

        detection = _select_detection_result(
            [
                result
                for result in (
                    _detect_with_z_score(values, observed_value),
                    _detect_with_iqr(values, observed_value),
                )
                if result is not None
            ]
        )
        if detection is None:
            continue

        anomalies.append(
            {
                "metric_type": metric_type,
                "label": config["label"],
                "timestamp": timestamp,
                "observed_value": round(observed_value, 2),
                "expected_range": detection["expected_range"],
                "deviation_score": detection["deviation_score"],
                "anomaly_method": detection["anomaly_method"],
                "severity": detection["severity"],
                "unit": config["unit"],
            }
        )

    # Most severe deviations first.
    anomalies.sort(key=lambda item: abs(item["deviation_score"]), reverse=True)
    interpretation = _build_contextual_interpretation(anomalies, ai_bundle=ai_bundle)

    return {
        "anomalies": anomalies,
        "interpretation": interpretation,
    }
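

# A minimal usage sketch (hypothetical, not part of the original module): the
# SimpleNamespace objects below stand in for whatever ORM rows real callers
# put into `context`; only the attributes read by this module are populated.
if __name__ == "__main__":
    from datetime import datetime, timedelta
    from types import SimpleNamespace

    now = datetime(2024, 7, 1, 12, 0)
    # _history_series treats history as newest-first, so index 0 is "now".
    history = [
        SimpleNamespace(
            soil_moisture=moisture,
            soil_temperature=22.0,
            soil_ph=6.8,
            electrical_conductivity=1.1,
            recorded_at=now - timedelta(hours=age),
        )
        for age, moisture in enumerate([12.0, 41.0, 40.0, 42.0, 39.0, 41.0, 40.0])
    ]
    sensor = SimpleNamespace(
        soil_moisture=12.0,
        soil_temperature=22.0,
        soil_ph=6.8,
        electrical_conductivity=1.1,
        updated_at=now,
    )

    card = build_anomaly_detection_card(
        "sensor-1",
        context={"sensor": sensor, "history": history, "forecasts": []},
    )
    # The 12.0% reading sits far below the ~40% norm, so soil_moisture is
    # flagged (IQR wins via METHOD_PRIORITY); the flat series for the other
    # metrics yield no anomalies, and humidity is skipped (no forecasts).
    for anomaly in card["anomalies"]:
        print(anomaly["label"], anomaly["severity"], anomaly["deviation_score"])
    print(card["interpretation"]["explanation"])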