Files
Ai/rag/services/irrigation.py
T
2026-04-24 03:02:22 +03:30

262 lines
9.3 KiB
Python

"""
سرویس توصیه آبیاری — بدون API، قابل فراخوانی از سایر سرویس‌ها
از RAG با پایگاه دانش irrigation و لحن مخصوص آبیاری استفاده می‌کند.
"""
import json
import logging
from django.db import transaction
from irrigation.models import IrrigationMethod
from irrigation.evapotranspiration import calculate_forecast_water_needs, resolve_crop_profile, resolve_kc
from farm_data.models import SensorData
from rag.api_provider import get_chat_client
from rag.chat import (
_complete_audit_log,
_create_audit_log,
_fail_audit_log,
_load_service_tone,
build_rag_context,
)
from rag.config import load_rag_config, RAGConfig, get_service_config
from rag.user_data import build_plant_text, build_irrigation_method_text
from weather.models import WeatherForecast
logger = logging.getLogger(__name__)
KB_NAME = "irrigation"
SERVICE_ID = "irrigation"
DEFAULT_IRRIGATION_PROMPT = (
"بر اساس محاسبات نهایی تبخیر-تعرق و نیاز آبی که در ورودی آمده، "
"یک برنامه آبیاری قابل‌فهم برای کشاورز تولید کن. "
"پاسخ حتماً به فرمت JSON با ساختار زیر باشد:\n"
'{\n'
' "plan": {\n'
' "frequencyPerWeek": <int>,\n'
' "durationMinutes": <int>,\n'
' "bestTimeOfDay": "<str>",\n'
' "moistureLevel": <int>,\n'
' "warning": "<str>"\n'
' }\n'
'}\n'
"فقط JSON خروجی بده، بدون توضیح اضافی. "
"از انجام هرگونه محاسبه عددی جدید خودداری کن و فقط از داده‌های ساختاریافته ورودی استفاده کن."
)
def _resolve_irrigation_method(
sensor: SensorData | None,
irrigation_method_name: str | None,
) -> IrrigationMethod | None:
if irrigation_method_name:
return IrrigationMethod.objects.filter(name=irrigation_method_name).first()
if sensor is not None:
return sensor.irrigation_method
return None
def _persist_irrigation_method_on_farm(
sensor: SensorData | None,
irrigation_method: IrrigationMethod | None,
) -> None:
if sensor is None or irrigation_method is None:
return
if sensor.irrigation_method_id == irrigation_method.id:
return
with transaction.atomic():
sensor.irrigation_method = irrigation_method
sensor.save(update_fields=["irrigation_method", "updated_at"])
def get_irrigation_recommendation(
sensor_uuid: str,
plant_name: str | None = None,
growth_stage: str | None = None,
irrigation_method_name: str | None = None,
query: str | None = None,
config: RAGConfig | None = None,
limit: int = 8,
) -> dict:
"""
توصیه آبیاری برای یک سنسور (کاربر).
از RAG با پایگاه دانش irrigation استفاده می‌کند.
Args:
sensor_uuid: شناسه سنسور کاربر
plant_name: نام گیاه (برای بارگذاری مشخصات از جدول Plant)
growth_stage: مرحله رشد گیاه
irrigation_method_name: نام روش آبیاری (برای بارگذاری از جدول IrrigationMethod)
query: سوال اختیاری
config: تنظیمات RAG
limit: تعداد چانک‌های بازیابی‌شده
Returns:
dict با کلیدهای irrigation_needed, amount_mm, reason, next_check_date, raw_response
"""
cfg = config or load_rag_config()
service = get_service_config(SERVICE_ID, cfg)
service_cfg = RAGConfig(
embedding=cfg.embedding,
qdrant=cfg.qdrant,
chunking=cfg.chunking,
llm=service.llm,
knowledge_bases=cfg.knowledge_bases,
services=cfg.services,
chromadb=cfg.chromadb,
)
client = get_chat_client(service_cfg)
model = service.llm.model
user_query = query or "توصیه آبیاری برای مزرعه من چیست؟"
sensor = (
SensorData.objects.select_related("center_location", "irrigation_method")
.prefetch_related("plants")
.filter(farm_uuid=sensor_uuid)
.first()
)
irrigation_method = _resolve_irrigation_method(sensor, irrigation_method_name)
_persist_irrigation_method_on_farm(sensor, irrigation_method)
plant = None
resolved_plant_name = plant_name
if sensor is not None and plant_name:
plant = sensor.plants.filter(name=plant_name).first()
elif sensor is not None:
plant = sensor.plants.first()
if plant is not None:
resolved_plant_name = plant.name
crop_profile = resolve_crop_profile(plant, growth_stage=growth_stage)
active_kc = resolve_kc(crop_profile, growth_stage=growth_stage)
forecasts = []
daily_water_needs = []
if sensor is not None:
forecasts = list(
WeatherForecast.objects.filter(location=sensor.center_location, forecast_date__isnull=False)
.order_by("forecast_date")[:7]
)
efficiency_percent = (
getattr(irrigation_method, "water_efficiency_percent", None)
if irrigation_method
else None
)
daily_water_needs = calculate_forecast_water_needs(
forecasts=forecasts,
latitude_deg=float(sensor.center_location.latitude),
crop_profile=crop_profile,
growth_stage=growth_stage,
irrigation_efficiency_percent=efficiency_percent,
)
context = build_rag_context(
user_query, sensor_uuid, config=cfg, limit=limit, kb_name=KB_NAME, service_id=SERVICE_ID,
)
extra_parts: list[str] = []
resolved_irrigation_method_name = (
irrigation_method.name if irrigation_method is not None else None
)
if resolved_plant_name and growth_stage:
plant_text = build_plant_text(resolved_plant_name, growth_stage)
if plant_text:
extra_parts.append("[اطلاعات گیاه]\n" + plant_text)
if resolved_irrigation_method_name:
method_text = build_irrigation_method_text(resolved_irrigation_method_name)
if method_text:
extra_parts.append("[روش آبیاری انتخابی]\n" + method_text)
if daily_water_needs:
total_mm = round(sum(item["gross_irrigation_mm"] for item in daily_water_needs), 2)
schedule_lines = [
f"- {item['forecast_date']}: ET0={item['et0_mm']} mm, ETc={item['etc_mm']} mm, "
f"بارش مؤثر={item['effective_rainfall_mm']} mm, نیاز آبی={item['gross_irrigation_mm']} mm, "
f"زمان پیشنهادی={item['irrigation_timing']}"
for item in daily_water_needs
]
extra_parts.append(
"[خروجی قطعی محاسبات FAO-56]\n"
f"کل نیاز آبی ۷ روز آینده: {total_mm} mm\n"
f"Kc مورد استفاده: {active_kc}\n"
+ "\n".join(schedule_lines)
)
if extra_parts:
context = "\n\n---\n\n".join(extra_parts) + ("\n\n---\n\n" + context if context else "")
tone = _load_service_tone(service, cfg)
system_parts = [tone] if tone else []
if service.system_prompt:
system_parts.append(service.system_prompt)
system_parts.append(DEFAULT_IRRIGATION_PROMPT)
if context:
system_parts.append("\n\n" + context)
system_content = "\n".join(system_parts)
messages = [
{"role": "system", "content": system_content},
{"role": "user", "content": user_query},
]
audit_log = _create_audit_log(
farm_uuid=sensor_uuid,
service_id=SERVICE_ID,
model=model,
query=user_query,
system_prompt=system_content,
messages=messages,
)
try:
response = client.chat.completions.create(
model=model,
messages=messages,
)
raw = response.choices[0].message.content.strip()
except Exception as exc:
logger.error("Irrigation recommendation error for %s: %s", sensor_uuid, exc)
result = {
"irrigation_needed": None,
"amount_mm": None,
"reason": f"خطا در دریافت توصیه: {exc}",
"next_check_date": None,
"raw_response": None,
}
_fail_audit_log(
audit_log,
str(exc),
response_text=json.dumps(result, ensure_ascii=False, default=str),
)
return result
try:
cleaned = raw
if cleaned.startswith("```"):
cleaned = cleaned.strip("`").removeprefix("json").strip()
result = json.loads(cleaned)
except (json.JSONDecodeError, ValueError):
result = {
"plan": {
"warning": raw,
},
}
result["raw_response"] = raw
result["water_balance"] = {
"daily": daily_water_needs,
"crop_profile": crop_profile,
"active_kc": active_kc,
}
result["selected_irrigation_method"] = (
{
"id": irrigation_method.id,
"name": irrigation_method.name,
"category": irrigation_method.category,
"water_efficiency_percent": irrigation_method.water_efficiency_percent,
}
if irrigation_method is not None
else None
)
_complete_audit_log(
audit_log,
json.dumps(result, ensure_ascii=False, default=str),
)
return result