Files

437 lines
16 KiB
Python
Raw Permalink Normal View History

2026-05-11 03:27:21 +03:30
from __future__ import annotations
import hashlib
import json
import logging
from typing import Any
from django.apps import apps
from django.core.serializers.json import DjangoJSONEncoder
from farm_data.services import get_farm_details
from farm_data.context import load_farm_context
from rag.api_provider import get_chat_client
from rag.chat import (
_complete_audit_log,
_create_audit_log,
_fail_audit_log,
_load_service_tone,
build_rag_context,
)
from rag.config import RAGConfig, get_service_config, load_rag_config
from .models import FarmAlertNotification
from .alerts_tracker import build_farm_alerts_tracker
logger = logging.getLogger(__name__)
KB_NAME = "farm_alerts"
SERVICE_ID = "farm_alerts"
TRACKER_PROMPT = (
"وضعیت هشدارهای مزرعه را فقط بر اساس داده های ساختاریافته، اطلاعات مزرعه، alertهاي ورودي، و متون بازیابی شده از پایگاه دانش تحلیل کن. "
"پاسخ فقط JSON معتبر باشد و این کلیدها را داشته باشد: headline, overview, status_level, notifications. "
"status_level فقط یکی از danger, warning, info باشد. "
"notifications باید آرایه ای از آبجکت ها با کلیدهای level, title, message, suggested_action, source_alert_id, source_metric_type باشد. "
"سطوح level فقط یکی از danger, warning, info باشند. "
"اگر هشدار مهمی وجود ندارد، notifications را خالی برگردان."
)
TIMELINE_PROMPT = (
"بر اساس داده های هشدار مزرعه و alertهاي ورودي، یک timeline عملیاتی بساز. "
"پاسخ فقط JSON معتبر باشد و این کلیدها را داشته باشد: headline, overview, timeline, notifications. "
"timeline باید آرایه ای از آبجکت ها با کلیدهای timestamp, level, title, description, source_alert_id, source_metric_type باشد. "
"level فقط danger, warning, info باشد. "
"notifications باید آرایه ای از آبجکت ها با کلیدهای level, title, message, suggested_action, source_alert_id, source_metric_type باشد."
)
def _json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, indent=2, cls=DjangoJSONEncoder)
def _clean_json_response(raw: str) -> dict[str, Any]:
cleaned = (raw or "").strip()
if cleaned.startswith("```"):
cleaned = cleaned.strip("`")
if cleaned.startswith("json"):
cleaned = cleaned[4:]
cleaned = cleaned.strip()
if not cleaned:
return {}
try:
return json.loads(cleaned)
except (json.JSONDecodeError, ValueError):
logger.warning("Invalid JSON returned by farm_alerts LLM: %s", cleaned[:500])
return {}
def _severity_to_level(severity: str) -> str:
normalized = (severity or "").strip().lower()
if normalized in {"critical", "high", "danger"}:
return FarmAlertNotification.LEVEL_DANGER
if normalized in {"medium", "warning"}:
return FarmAlertNotification.LEVEL_WARNING
return FarmAlertNotification.LEVEL_INFO
def _normalize_level(level: str | None) -> str:
normalized = (level or "").strip().lower()
if normalized in {
FarmAlertNotification.LEVEL_DANGER,
FarmAlertNotification.LEVEL_WARNING,
FarmAlertNotification.LEVEL_INFO,
}:
return normalized
if normalized in {"high", "critical"}:
return FarmAlertNotification.LEVEL_DANGER
if normalized in {"medium", "alert"}:
return FarmAlertNotification.LEVEL_WARNING
return FarmAlertNotification.LEVEL_INFO
def _alert_identifier(alert: dict[str, Any]) -> str:
metric_type = alert.get("metric_type", "alert")
timestamp = alert.get("timestamp", "")
return f"{metric_type}:{timestamp}"
def _forecast_summary(context: dict[str, Any]) -> list[dict[str, Any]]:
forecasts = context.get("forecasts", [])
return [
{
"date": getattr(item, "forecast_date", None),
"temperature_min": getattr(item, "temperature_min", None),
"temperature_max": getattr(item, "temperature_max", None),
"humidity_mean": getattr(item, "humidity_mean", None),
"precipitation": getattr(item, "precipitation", None),
"et0": getattr(item, "et0", None),
}
for item in forecasts[:7]
]
def _farm_profile(context: dict[str, Any], farm_uuid: str) -> dict[str, Any]:
sensor = context.get("sensor")
location = context.get("location")
plants = context.get("plants", [])
irrigation_method = getattr(sensor, "irrigation_method", None) if sensor else None
return {
"farm_uuid": farm_uuid,
"location": {
"latitude": float(location.latitude) if location else None,
"longitude": float(location.longitude) if location else None,
},
"plant_names": [getattr(plant, "name", "") for plant in plants],
"irrigation_method": getattr(irrigation_method, "name", None),
"last_sensor_update": getattr(sensor, "updated_at", None),
}
def _normalize_incoming_alerts(alerts: list[dict[str, Any]] | None) -> list[dict[str, Any]]:
normalized: list[dict[str, Any]] = []
for item in alerts or []:
if not isinstance(item, dict):
continue
normalized.append(
{
"alert_id": item.get("alert_id") or None,
"level": item.get("level") or None,
"title": item.get("title") or None,
"message": item.get("message") or None,
"suggested_action": item.get("suggested_action") or None,
"source_metric_type": item.get("source_metric_type") or None,
"timestamp": item.get("timestamp"),
"payload": item.get("payload") if isinstance(item.get("payload"), dict) else {},
}
)
return normalized
def _build_structured_context(
farm_uuid: str,
incoming_alerts: list[dict[str, Any]] | None = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
context = load_farm_context(farm_uuid)
if context is None:
raise ValueError("farm_uuid نامعتبر است یا اطلاعات هشدار مزرعه پیدا نشد.")
tracker = build_farm_alerts_tracker(sensor_id=farm_uuid, context=context, ai_bundle=None)
structured = {
"farm_profile": _farm_profile(context, farm_uuid),
"tracker": tracker,
"forecasts": _forecast_summary(context),
"incoming_alerts": _normalize_incoming_alerts(incoming_alerts),
}
return context, structured
def _validate_tracker_response(payload: dict[str, Any]) -> dict[str, Any]:
required_keys = {"headline", "overview", "status_level", "notifications"}
missing = [key for key in required_keys if key not in payload]
if missing:
raise ValueError(
"Farm alerts tracker response is missing required fields: "
+ ", ".join(missing)
)
if not isinstance(payload.get("notifications"), list):
raise ValueError("Farm alerts tracker notifications must be a list.")
return payload
def _validate_timeline_response(payload: dict[str, Any]) -> dict[str, Any]:
required_keys = {"headline", "overview", "timeline", "notifications"}
missing = [key for key in required_keys if key not in payload]
if missing:
raise ValueError(
"Farm alerts timeline response is missing required fields: "
+ ", ".join(missing)
)
if not isinstance(payload.get("timeline"), list):
raise ValueError("Farm alerts timeline must be a list.")
if not isinstance(payload.get("notifications"), list):
raise ValueError("Farm alerts timeline notifications must be a list.")
return payload
def _notification_fingerprint(
*,
farm_uuid: str,
endpoint: str,
level: str,
title: str,
source_alert_id: str,
source_metric_type: str,
) -> str:
raw = "|".join([
str(farm_uuid),
endpoint,
level,
source_alert_id or "-",
source_metric_type or "-",
title.strip(),
])
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def _save_notifications(
*,
farm_uuid: str,
endpoint: str,
notifications: list[dict[str, Any]],
) -> list[FarmAlertNotification]:
saved: list[FarmAlertNotification] = []
for item in notifications:
level = _normalize_level(item.get("level"))
title = (item.get("title") or "هشدار مزرعه").strip()
source_alert_id = (item.get("source_alert_id") or "").strip()
source_metric_type = (item.get("source_metric_type") or "").strip()
fingerprint = _notification_fingerprint(
farm_uuid=farm_uuid,
endpoint=endpoint,
level=level,
title=title,
source_alert_id=source_alert_id,
source_metric_type=source_metric_type,
)
payload = item.get("payload") if isinstance(item.get("payload"), dict) else {}
notification, _ = FarmAlertNotification.objects.update_or_create(
fingerprint=fingerprint,
defaults={
"farm_uuid": farm_uuid,
"endpoint": endpoint,
"level": level,
"title": title,
"message": item.get("message") or "",
"suggested_action": item.get("suggested_action") or "",
"source_alert_id": source_alert_id,
"source_metric_type": source_metric_type,
"payload": payload,
},
)
saved.append(notification)
return saved
def _serialize_notification(notification: FarmAlertNotification) -> dict[str, Any]:
return {
"id": notification.id,
"farm_uuid": str(notification.farm_uuid),
"endpoint": notification.endpoint,
"level": notification.level,
"title": notification.title,
"message": notification.message,
"suggested_action": notification.suggested_action,
"source_alert_id": notification.source_alert_id,
"source_metric_type": notification.source_metric_type,
"payload": notification.payload,
"created_at": notification.created_at.isoformat(),
"updated_at": notification.updated_at.isoformat(),
}
def _build_service_config(cfg: RAGConfig, service_id: str) -> tuple[Any, Any, str, Any]:
service = get_service_config(service_id, cfg)
service_cfg = RAGConfig(
embedding=cfg.embedding,
qdrant=cfg.qdrant,
chunking=cfg.chunking,
llm=service.llm,
knowledge_bases=cfg.knowledge_bases,
services=cfg.services,
chromadb=cfg.chromadb,
)
client = get_chat_client(service_cfg)
model = service.llm.model
return service, service_cfg, model, client
def _build_messages(
*,
prompt: str,
service: Any,
cfg: RAGConfig,
query: str,
rag_context: str,
structured_context: dict[str, Any],
) -> tuple[str, list[dict[str, str]]]:
tone = _load_service_tone(service, cfg)
system_parts = [tone] if tone else []
if service.system_prompt:
system_parts.append(service.system_prompt)
system_parts.append(prompt)
system_parts.append("[کانتکست ساختاریافته هشدار مزرعه]\n" + _json_dumps(structured_context))
if rag_context:
system_parts.append(rag_context)
system_prompt = "\n\n".join(part for part in system_parts if part)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": query},
]
return system_prompt, messages
def _llm_response(
*,
farm_uuid: str,
service_id: str,
prompt: str,
query: str,
structured_context: dict[str, Any],
) -> tuple[dict[str, Any], str, str]:
cfg = load_rag_config()
service, service_cfg, model, client = _build_service_config(cfg, service_id)
farm_details = get_farm_details(farm_uuid)
rag_context = build_rag_context(
query=query,
sensor_uuid=farm_uuid,
config=cfg,
kb_name=KB_NAME,
service_id=service_id,
farm_details=farm_details,
)
system_prompt, messages = _build_messages(
prompt=prompt,
service=service,
cfg=cfg,
query=query,
rag_context=rag_context,
structured_context=structured_context,
)
audit_log = _create_audit_log(
farm_uuid=farm_uuid,
service_id=service_id,
model=model,
query=query,
system_prompt=system_prompt,
messages=messages,
)
try:
response = client.chat.completions.create(model=model, messages=messages)
raw = response.choices[0].message.content.strip()
parsed = _clean_json_response(raw)
if not parsed:
raise ValueError("farm_alerts LLM returned an empty or invalid JSON payload.")
_complete_audit_log(audit_log, raw)
return parsed, raw, service.tone_file or ""
except Exception as exc:
logger.error("farm_alerts llm error for %s: %s", farm_uuid, exc)
_fail_audit_log(audit_log, str(exc))
raise RuntimeError(f"Farm alerts generation failed for farm {farm_uuid}.") from exc
def get_farm_alerts_tracker(
*,
farm_uuid: str,
query: str | None = None,
alerts: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
_, structured_context = _build_structured_context(farm_uuid, incoming_alerts=alerts)
tracker = structured_context["tracker"]
user_query = query or "وضعیت فعلی هشدارهای مزرعه را ارزیابی کن و اگر لازم است notification بساز."
llm_result, raw_response, tone_file = _llm_response(
farm_uuid=farm_uuid,
service_id=SERVICE_ID,
prompt=TRACKER_PROMPT,
query=user_query,
structured_context=structured_context,
)
llm_result = _validate_tracker_response(llm_result)
notifications_input = llm_result["notifications"]
saved_notifications = _save_notifications(
farm_uuid=farm_uuid,
endpoint=FarmAlertNotification.ENDPOINT_TRACKER,
notifications=notifications_input,
)
return {
"farm_uuid": farm_uuid,
"service_id": SERVICE_ID,
"tracker": tracker,
"headline": llm_result["headline"],
"overview": llm_result["overview"],
"status_level": _normalize_level(llm_result.get("status_level")),
"notifications": [_serialize_notification(item) for item in saved_notifications],
"raw_llm_response": raw_response or None,
"structured_context": structured_context,
}
def get_farm_alerts_timeline(
*,
farm_uuid: str,
query: str | None = None,
alerts: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
_, structured_context = _build_structured_context(farm_uuid, incoming_alerts=alerts)
tracker = structured_context["tracker"]
user_query = query or "برای هشدارهای مزرعه یک timeline عملیاتی بساز و اگر لازم است notification ثبت کن."
llm_result, raw_response, tone_file = _llm_response(
farm_uuid=farm_uuid,
service_id=SERVICE_ID,
prompt=TIMELINE_PROMPT,
query=user_query,
structured_context=structured_context,
)
llm_result = _validate_timeline_response(llm_result)
timeline = llm_result["timeline"]
notifications_input = llm_result["notifications"]
saved_notifications = _save_notifications(
farm_uuid=farm_uuid,
endpoint=FarmAlertNotification.ENDPOINT_TIMELINE,
notifications=notifications_input,
)
return {
"farm_uuid": farm_uuid,
"service_id": SERVICE_ID,
"tracker": tracker,
"headline": llm_result["headline"],
"overview": llm_result["overview"],
"timeline": timeline,
"notifications": [_serialize_notification(item) for item in saved_notifications],
"raw_llm_response": raw_response or None,
"structured_context": structured_context,
}