""" سرویس توصیه آبیاری — بدون API، قابل فراخوانی از سایر سرویس‌ها از RAG با پایگاه دانش irrigation و لحن مخصوص آبیاری استفاده می‌کند. """ import json import logging from typing import Any from django.apps import apps from django.db import transaction from farm_data.models import SensorData from irrigation.evapotranspiration import ( calculate_forecast_water_needs, resolve_crop_profile, resolve_kc, ) from irrigation.models import IrrigationMethod from rag.api_provider import get_chat_client from rag.chat import ( _complete_audit_log, _create_audit_log, _fail_audit_log, _load_service_tone, build_rag_context, ) from rag.config import RAGConfig, get_service_config, load_rag_config from rag.user_data import build_irrigation_method_text, build_plant_text from weather.models import WeatherForecast logger = logging.getLogger(__name__) KB_NAME = "irrigation" SERVICE_ID = "irrigation" DEFAULT_IRRIGATION_PROMPT = ( "از محاسبات FAO-56 و خروجی بهینه ساز شبیه سازی برای ساخت توصیه آبیاری استفاده کن. " "اگر بلوک [خروجی بهینه ساز شبیه سازی] وجود داشت، همان را مرجع اصلی اعداد قرار بده. " "پاسخ را در قالب JSON معتبر با کلیدهای plan، timeline و sections برگردان و عدد جدید متناقض نساز." ) def _get_optimizer(): return apps.get_app_config("crop_simulation").get_recommendation_optimizer() def _safe_float(value: Any, default: float = 0.0) -> float: try: return float(value) except (TypeError, ValueError): return default def _sensor_metric(sensor: SensorData | None, metric: str) -> float | None: if sensor is None or not isinstance(sensor.sensor_payload, dict): return None for payload in sensor.sensor_payload.values(): if isinstance(payload, dict) and payload.get(metric) is not None: return _safe_float(payload.get(metric), default=0.0) return None def _coerce_list(value: Any) -> list[Any]: return value if isinstance(value, list) else [] def _coerce_dict(value: Any) -> dict[str, Any]: return value if isinstance(value, dict) else {} def _estimate_duration_minutes(amount_per_event_mm: float, efficiency_percent: float | None) -> int: normalized_efficiency = max(_safe_float(efficiency_percent, 75.0), 30.0) estimated_minutes = round(max(amount_per_event_mm, 1.0) * (2400 / normalized_efficiency)) return max(10, min(estimated_minutes, 240)) def _default_warning( optimizer_result: dict[str, Any] | None, daily_water_needs: list[dict[str, Any]], soil_moisture: float | None, ) -> str: strategy = _coerce_dict((optimizer_result or {}).get("recommended_strategy")) reasoning = _coerce_list(strategy.get("reasoning")) if reasoning: return str(reasoning[0]) if soil_moisture is not None and soil_moisture < 25: return "رطوبت خاک پایین است و نباید آبیاری به تعویق بیفتد." if soil_moisture is not None and soil_moisture > 80: return "رطوبت خاک بالاست و باید از آبیاری اضافی خودداری شود." if any(_safe_float(item.get("effective_rainfall_mm")) > 0 for item in daily_water_needs): return "با توجه به بارش موثر پیش بینی شده، برنامه آبیاری را قبل از اجرا دوباره بررسی کنید." return "در ساعات گرم روز آبیاری انجام نشود." def _normalize_plan( llm_result: dict[str, Any], optimizer_result: dict[str, Any] | None, daily_water_needs: list[dict[str, Any]], irrigation_method: IrrigationMethod | None, soil_moisture: float | None, ) -> dict[str, Any]: llm_plan = _coerce_dict(llm_result.get("plan")) strategy = _coerce_dict((optimizer_result or {}).get("recommended_strategy")) frequency = llm_plan.get("frequencyPerWeek") if frequency is None: frequency = strategy.get("frequency_per_week") or strategy.get("events") or len(daily_water_needs) or 1 duration = llm_plan.get("durationMinutes") if duration is None: duration = _estimate_duration_minutes( _safe_float(strategy.get("amount_per_event_mm"), 6.0), getattr(irrigation_method, "water_efficiency_percent", None), ) best_time = llm_plan.get("bestTimeOfDay") if not best_time: best_time = strategy.get("timing") or ( daily_water_needs[0].get("irrigation_timing") if daily_water_needs else "05:30 تا 08:00 صبح" ) moisture_level = llm_plan.get("moistureLevel") if moisture_level is None: moisture_level = round( soil_moisture if soil_moisture is not None else _safe_float(strategy.get("moisture_target_percent"), 70.0) ) warning = llm_plan.get("warning") if not warning: warning = _default_warning(optimizer_result, daily_water_needs, soil_moisture) return { "frequencyPerWeek": int(max(_safe_float(frequency, 1), 1)), "durationMinutes": int(max(_safe_float(duration, 10), 10)), "bestTimeOfDay": str(best_time), "moistureLevel": int(max(min(_safe_float(moisture_level, 70), 100), 0)), "warning": str(warning), } def _normalize_timeline( llm_result: dict[str, Any], optimizer_result: dict[str, Any] | None, daily_water_needs: list[dict[str, Any]], ) -> list[dict[str, Any]]: raw_timeline = _coerce_list(llm_result.get("timeline")) timeline: list[dict[str, Any]] = [] for index, item in enumerate(raw_timeline, start=1): item_dict = _coerce_dict(item) title = item_dict.get("title") description = item_dict.get("description") if title and description: timeline.append( { "step_number": int(item_dict.get("step_number") or index), "title": str(title), "description": str(description), } ) if timeline: return timeline strategy = _coerce_dict((optimizer_result or {}).get("recommended_strategy")) event_dates = _coerce_list(strategy.get("event_dates")) best_timing = strategy.get("timing") or ( daily_water_needs[0].get("irrigation_timing") if daily_water_needs else "صبح زود" ) generated = [ { "step_number": 1, "title": "بررسی فشار", "description": "فشار ابتدا و انتهای لاین کنترل شود.", }, { "step_number": 2, "title": "اجرای آبیاری", "description": f"آبیاری در بازه {best_timing} انجام شود.", }, ] if event_dates: generated.append( { "step_number": 3, "title": "پیگیری برنامه", "description": f"نوبت های پیشنهادی برای تاریخ های {', '.join(map(str, event_dates))} بررسی شوند.", } ) else: generated.append( { "step_number": 3, "title": "بازبینی رطوبت", "description": "بعد از هر نوبت، رطوبت خاک و یکنواختی توزیع آب کنترل شود.", } ) return generated def _normalize_sections( llm_result: dict[str, Any], optimizer_result: dict[str, Any] | None, daily_water_needs: list[dict[str, Any]], plan_warning: str, ) -> list[dict[str, Any]]: raw_sections = _coerce_list(llm_result.get("sections")) sections: list[dict[str, Any]] = [] for section in raw_sections: item = _coerce_dict(section) section_type = str(item.get("type") or "").strip().lower() if section_type not in {"warning", "tip"}: continue content = item.get("content") title = item.get("title") if not content or not title: continue icon = item.get("icon") or ( "tabler-alert-triangle" if section_type == "warning" else "tabler-bulb" ) sections.append( { "title": str(title), "icon": str(icon), "type": section_type, "content": str(content), } ) if not any(item["type"] == "warning" for item in sections): sections.insert( 0, { "title": "هشدار آبیاری", "icon": "tabler-alert-triangle", "type": "warning", "content": plan_warning, }, ) if not any(item["type"] == "tip" for item in sections): strategy = _coerce_dict((optimizer_result or {}).get("recommended_strategy")) reasoning = _coerce_list(strategy.get("reasoning")) tip_content = ( str(reasoning[-1]) if reasoning else "شست وشوی فیلترها و بازبینی یکنواختی پخش آب به پایداری برنامه آبیاری کمک می کند." ) if any(_safe_float(item.get("effective_rainfall_mm")) > 0 for item in daily_water_needs): tip_content = "قبل از نوبت بعدی، مقدار بارش موثر و رطوبت خاک را دوباره با برنامه تطبیق دهید." sections.append( { "title": "نکته بهره وری", "icon": "tabler-bulb", "type": "tip", "content": tip_content, } ) return sections[:4] def _build_irrigation_ui_payload( llm_result: dict[str, Any], optimizer_result: dict[str, Any] | None, daily_water_needs: list[dict[str, Any]], crop_profile: dict[str, Any], active_kc: float, irrigation_method: IrrigationMethod | None, sensor: SensorData | None, ) -> dict[str, Any]: soil_moisture = _sensor_metric(sensor, "soil_moisture") plan = _normalize_plan( llm_result, optimizer_result, daily_water_needs, irrigation_method, soil_moisture, ) payload = { "plan": plan, "water_balance": { "daily": daily_water_needs, "crop_profile": crop_profile, "active_kc": active_kc, }, "timeline": _normalize_timeline(llm_result, optimizer_result, daily_water_needs), "sections": _normalize_sections( llm_result, optimizer_result, daily_water_needs, plan["warning"], ), } return payload def _resolve_irrigation_method( sensor: SensorData | None, irrigation_method_name: str | None, ) -> IrrigationMethod | None: if irrigation_method_name: return IrrigationMethod.objects.filter(name=irrigation_method_name).first() if sensor is not None: return sensor.irrigation_method return None def _persist_irrigation_method_on_farm( sensor: SensorData | None, irrigation_method: IrrigationMethod | None, ) -> None: if sensor is None or irrigation_method is None: return if sensor.irrigation_method_id == irrigation_method.id: return with transaction.atomic(): sensor.irrigation_method = irrigation_method sensor.save(update_fields=["irrigation_method", "updated_at"]) def get_irrigation_recommendation( farm_uuid: str | None = None, plant_name: str | None = None, growth_stage: str | None = None, irrigation_method_name: str | None = None, query: str | None = None, config: RAGConfig | None = None, limit: int = 8, sensor_uuid: str | None = None, ) -> dict: """ توصیه آبیاری برای یک مزرعه. از RAG با پایگاه دانش irrigation استفاده می‌کند. Args: farm_uuid: شناسه مزرعه plant_name: نام گیاه (برای بارگذاری مشخصات از جدول Plant) growth_stage: مرحله رشد گیاه irrigation_method_name: نام روش آبیاری (برای بارگذاری از جدول IrrigationMethod) query: سوال اختیاری config: تنظیمات RAG limit: تعداد چانک‌های بازیابی‌شده Returns: dict ساختاریافته برای توصیه آبیاری """ cfg = config or load_rag_config() service = get_service_config(SERVICE_ID, cfg) service_cfg = RAGConfig( embedding=cfg.embedding, qdrant=cfg.qdrant, chunking=cfg.chunking, llm=service.llm, knowledge_bases=cfg.knowledge_bases, services=cfg.services, chromadb=cfg.chromadb, ) client = get_chat_client(service_cfg) model = service.llm.model resolved_farm_uuid = str(farm_uuid or sensor_uuid or "").strip() if not resolved_farm_uuid: raise ValueError("farm_uuid is required.") user_query = query or "توصیه آبیاری برای مزرعه من چیست؟" sensor = ( SensorData.objects.select_related("center_location", "irrigation_method") .prefetch_related("plants") .filter(farm_uuid=resolved_farm_uuid) .first() ) irrigation_method = _resolve_irrigation_method(sensor, irrigation_method_name) _persist_irrigation_method_on_farm(sensor, irrigation_method) plant = None resolved_plant_name = plant_name if sensor is not None and plant_name: plant = sensor.plants.filter(name=plant_name).first() elif sensor is not None: plant = sensor.plants.first() if plant is not None: resolved_plant_name = plant.name crop_profile = resolve_crop_profile(plant, growth_stage=growth_stage) active_kc = resolve_kc(crop_profile, growth_stage=growth_stage) forecasts = [] daily_water_needs = [] optimized_result = None if sensor is not None: forecasts = list( WeatherForecast.objects.filter( location=sensor.center_location, forecast_date__isnull=False, ).order_by("forecast_date")[:7] ) efficiency_percent = ( getattr(irrigation_method, "water_efficiency_percent", None) if irrigation_method else None ) daily_water_needs = calculate_forecast_water_needs( forecasts=forecasts, latitude_deg=float(sensor.center_location.latitude), crop_profile=crop_profile, growth_stage=growth_stage, irrigation_efficiency_percent=efficiency_percent, ) if plant is not None and forecasts: optimized_result = _get_optimizer().optimize_irrigation( sensor=sensor, plant=plant, forecasts=forecasts, daily_water_needs=daily_water_needs, growth_stage=growth_stage, irrigation_method=irrigation_method, ) context = build_rag_context( user_query, resolved_farm_uuid, config=cfg, limit=limit, kb_name=KB_NAME, service_id=SERVICE_ID, ) extra_parts: list[str] = [] resolved_irrigation_method_name = irrigation_method.name if irrigation_method is not None else None if resolved_plant_name and growth_stage: plant_text = build_plant_text(resolved_plant_name, growth_stage) if plant_text: extra_parts.append("[اطلاعات گیاه]\n" + plant_text) if resolved_irrigation_method_name: method_text = build_irrigation_method_text(resolved_irrigation_method_name) if method_text: extra_parts.append("[روش آبیاری انتخابی]\n" + method_text) if daily_water_needs: total_mm = round(sum(item["gross_irrigation_mm"] for item in daily_water_needs), 2) schedule_lines = [ f"- {item['forecast_date']}: ET0={item['et0_mm']} mm, ETc={item['etc_mm']} mm, " f"بارش مؤثر={item['effective_rainfall_mm']} mm, نیاز آبی={item['gross_irrigation_mm']} mm, " f"زمان پیشنهادی={item['irrigation_timing']}" for item in daily_water_needs ] extra_parts.append( "[خروجی قطعی محاسبات FAO-56]\n" f"کل نیاز آبی ۷ روز آینده: {total_mm} mm\n" f"Kc مورد استفاده: {active_kc}\n" + "\n".join(schedule_lines) ) if optimized_result is not None: extra_parts.append("[خروجی بهینه ساز شبیه سازی]\n" + optimized_result["context_text"]) if extra_parts: context = "\n\n---\n\n".join(extra_parts) + ("\n\n---\n\n" + context if context else "") tone = _load_service_tone(service, cfg) system_parts = [tone] if tone else [] if service.system_prompt: system_parts.append(service.system_prompt) system_parts.append(DEFAULT_IRRIGATION_PROMPT) if context: system_parts.append("\n\n" + context) system_content = "\n".join(system_parts) messages = [ {"role": "system", "content": system_content}, {"role": "user", "content": user_query}, ] audit_log = _create_audit_log( farm_uuid=resolved_farm_uuid, service_id=SERVICE_ID, model=model, query=user_query, system_prompt=system_content, messages=messages, ) try: response = client.chat.completions.create( model=model, messages=messages, ) raw = response.choices[0].message.content.strip() except Exception as exc: logger.error("Irrigation recommendation error for %s: %s", resolved_farm_uuid, exc) _fail_audit_log(audit_log, str(exc)) raise RuntimeError( f"Irrigation recommendation failed for farm {resolved_farm_uuid}." ) from exc try: cleaned = raw if cleaned.startswith("```"): cleaned = cleaned.strip("`").removeprefix("json").strip() llm_result = json.loads(cleaned) except (json.JSONDecodeError, ValueError): llm_result = {} result = _build_irrigation_ui_payload( _coerce_dict(llm_result), optimized_result, daily_water_needs, crop_profile, active_kc, irrigation_method, sensor, ) result["raw_response"] = raw result["simulation_optimizer"] = optimized_result result["selected_irrigation_method"] = ( { "id": irrigation_method.id, "name": irrigation_method.name, "category": irrigation_method.category, "water_efficiency_percent": irrigation_method.water_efficiency_percent, } if irrigation_method is not None else None ) _complete_audit_log( audit_log, json.dumps(result, ensure_ascii=False, default=str), ) return result