""" سرویس توصیه آبیاری — بدون API، قابل فراخوانی از سایر سرویس‌ها از RAG با پایگاه دانش irrigation و لحن مخصوص آبیاری استفاده می‌کند. """ import json import logging from django.apps import apps from django.db import transaction from irrigation.models import IrrigationMethod from irrigation.evapotranspiration import calculate_forecast_water_needs, resolve_crop_profile, resolve_kc from farm_data.models import SensorData from rag.api_provider import get_chat_client from rag.chat import ( _complete_audit_log, _create_audit_log, _fail_audit_log, _load_service_tone, build_rag_context, ) from rag.config import load_rag_config, RAGConfig, get_service_config from rag.user_data import build_plant_text, build_irrigation_method_text from weather.models import WeatherForecast logger = logging.getLogger(__name__) KB_NAME = "irrigation" SERVICE_ID = "irrigation" DEFAULT_IRRIGATION_PROMPT = ( "از محاسبات FAO-56 و خروجی بهینه ساز شبیه سازی برای ساخت توصیه آبیاری استفاده کن. " "اگر بلوک [خروجی بهینه ساز شبیه سازی] وجود داشت، همان را مرجع اصلی اعداد قرار بده. " "پاسخ فقط JSON معتبر با کلید sections باشد و عدد جدید متناقض نساز." ) def _get_optimizer(): return apps.get_app_config("crop_simulation").get_recommendation_optimizer() def _unique_items(items: list[str]) -> list[str]: seen = set() output = [] for item in items: normalized = (item or "").strip() if not normalized or normalized in seen: continue seen.add(normalized) output.append(normalized) return output def _find_section(sections: list[dict], section_type: str) -> dict | None: for section in sections: if isinstance(section, dict) and section.get("type") == section_type: return section return None def _field_sources(llm_section: dict, fallback_section: dict, merged_section: dict) -> dict[str, str]: sources: dict[str, str] = {} for key, value in merged_section.items(): if key == "provenance": continue llm_value = llm_section.get(key) fallback_value = fallback_section.get(key) if key in llm_section and value == llm_value and value != fallback_value: sources[key] = "llm" elif key in fallback_section and value == fallback_value and value != llm_value: sources[key] = "fallback" elif key in llm_section and key in fallback_section and llm_value == fallback_value == value: sources[key] = "shared" elif key in llm_section and key in fallback_section: sources[key] = "merged" else: sources[key] = "fallback" if key in fallback_section else "llm" return sources def _attach_provenance(section_type: str, llm_section: dict, fallback_section: dict, merged_section: dict) -> dict: merged = dict(merged_section) field_sources = _field_sources(llm_section, fallback_section, merged) merged["provenance"] = { "sectionType": section_type, "llmProvided": bool(llm_section), "fallbackUsed": any(source != "llm" for source in field_sources.values()), "fieldSources": field_sources, } return merged def _fallback_with_provenance(fallback: dict, reason: str) -> dict: sections = [] for section in fallback.get("sections", []): section_with_provenance = dict(section) section_with_provenance["provenance"] = { "sectionType": section.get("type"), "llmProvided": False, "fallbackUsed": True, "fieldSources": { key: "fallback" for key in section.keys() if key != "provenance" }, } sections.append(section_with_provenance) return { "sections": sections, "mergeMetadata": { "source": "fallback_only", "reason": reason, }, } def _build_irrigation_fallback( *, optimized_result: dict | None, daily_water_needs: list[dict], ) -> dict: if optimized_result: recommended = optimized_result["recommended_strategy"] content = ( f"{recommended['events']} نوبت آبیاری با حدود " f"{recommended['amount_per_event_mm']} میلی متر در هر نوبت اجرا شود." ) list_items = [ f"در بازه اعتبار حدود {recommended['total_irrigation_mm']} میلی متر آب توزیع شود.", f"نوبت های پیشنهادی: {', '.join(recommended['event_dates']) or 'بر اساس پایش روزانه مزرعه'}", f"رطوبت خاک نزدیک {recommended['moisture_target_percent']} درصد نگه داشته شود.", ] warning = optimized_result.get("alternatives", []) warning_text = ( f"اگر شرایط مزرعه تغییر کرد، سناریوی جایگزین {warning[0]['label']} هم قابل بررسی است." if warning else "در صورت تغییر ناگهانی بارش یا باد، برنامه را دوباره ارزیابی کنید." ) explanation = " ".join(recommended.get("reasoning", [])) return { "sections": [ { "type": "recommendation", "title": "برنامه آبیاری بهینه", "icon": "droplet", "content": content, "frequency": f"{recommended['events']} نوبت در بازه اعتبار", "amount": ( f"{recommended['amount_per_event_mm']} میلی متر در هر نوبت " f"(جمع کل {recommended['total_irrigation_mm']} میلی متر)" ), "timing": recommended["timing"], "validityPeriod": recommended["validity_period"], "expandableExplanation": explanation, }, { "type": "list", "title": "اقدامات اجرایی", "icon": "list", "items": _unique_items(list_items), }, { "type": "warning", "title": "هشدار آبیاری", "icon": "alert-triangle", "content": warning_text, }, ] } total_mm = round(sum(float(item.get("gross_irrigation_mm", 0.0) or 0.0) for item in daily_water_needs), 2) return { "sections": [ { "type": "recommendation", "title": "برنامه آبیاری پیشنهادی", "icon": "droplet", "content": "پایش رطوبت خاک ادامه پیدا کند و برنامه آبیاری بر اساس نیاز روزانه تنظیم شود.", "frequency": "بر اساس پایش روزانه", "amount": f"جمع نیاز تخمینی این بازه حدود {total_mm} میلی متر است.", "timing": "اوایل صبح یا نزدیک غروب", "validityPeriod": "معتبر برای 3 روز آینده", "expandableExplanation": "به دلیل محدود بودن داده ها، توصیه با اتکا به نیاز آبی روزانه ساخته شده است.", }, { "type": "list", "title": "اقدامات اجرایی", "icon": "list", "items": [ "قبل از هر نوبت آبیاری رطوبت خاک سطحی را دوباره بررسی کنید.", "اگر بارش موثر رخ داد، نوبت بعدی را به تعویق بیندازید.", ], }, { "type": "warning", "title": "هشدار آبیاری", "icon": "alert-triangle", "content": "با تغییر دما یا بارش پیش بینی شده، برنامه فعلی باید بازبینی شود.", }, ] } def _merge_irrigation_response( *, parsed_result: dict, optimized_result: dict | None, daily_water_needs: list[dict], ) -> dict: fallback = _build_irrigation_fallback( optimized_result=optimized_result, daily_water_needs=daily_water_needs, ) if not isinstance(parsed_result, dict): return _fallback_with_provenance(fallback, "invalid_llm_payload") sections = parsed_result.get("sections") if not isinstance(sections, list): return _fallback_with_provenance(fallback, "missing_sections") recommendation = _find_section(sections, "recommendation") or {} list_section = _find_section(sections, "list") or {} warning_section = _find_section(sections, "warning") or {} fallback_recommendation = fallback["sections"][0] fallback_list = fallback["sections"][1] fallback_warning = fallback["sections"][2] merged_recommendation = {**recommendation, **fallback_recommendation} merged_recommendation["expandableExplanation"] = ( recommendation.get("expandableExplanation") or fallback_recommendation["expandableExplanation"] ) merged_recommendation["content"] = recommendation.get("content") or fallback_recommendation["content"] merged_recommendation["title"] = recommendation.get("title") or fallback_recommendation["title"] merged_list = { **fallback_list, **list_section, "items": _unique_items( list(list_section.get("items", [])) + list(fallback_list["items"]) )[:5], } merged_warning = { **fallback_warning, **warning_section, "content": warning_section.get("content") or fallback_warning["content"], } merged_recommendation = _attach_provenance( "recommendation", recommendation, fallback_recommendation, merged_recommendation, ) merged_list = _attach_provenance( "list", list_section, fallback_list, merged_list, ) merged_warning = _attach_provenance( "warning", warning_section, fallback_warning, merged_warning, ) return { "sections": [merged_recommendation, merged_list, merged_warning], "mergeMetadata": { "source": "llm_with_fallback_merge", "llmSectionsDetected": [section.get("type") for section in sections if isinstance(section, dict)], "fallbackSectionsApplied": [ item["type"] for item in (fallback_recommendation, fallback_list, fallback_warning) ], }, } def _resolve_irrigation_method( sensor: SensorData | None, irrigation_method_name: str | None, ) -> IrrigationMethod | None: if irrigation_method_name: return IrrigationMethod.objects.filter(name=irrigation_method_name).first() if sensor is not None: return sensor.irrigation_method return None def _persist_irrigation_method_on_farm( sensor: SensorData | None, irrigation_method: IrrigationMethod | None, ) -> None: if sensor is None or irrigation_method is None: return if sensor.irrigation_method_id == irrigation_method.id: return with transaction.atomic(): sensor.irrigation_method = irrigation_method sensor.save(update_fields=["irrigation_method", "updated_at"]) def get_irrigation_recommendation( farm_uuid: str | None = None, plant_name: str | None = None, growth_stage: str | None = None, irrigation_method_name: str | None = None, query: str | None = None, config: RAGConfig | None = None, limit: int = 8, sensor_uuid: str | None = None, ) -> dict: """ توصیه آبیاری برای یک مزرعه. از RAG با پایگاه دانش irrigation استفاده می‌کند. Args: farm_uuid: شناسه مزرعه plant_name: نام گیاه (برای بارگذاری مشخصات از جدول Plant) growth_stage: مرحله رشد گیاه irrigation_method_name: نام روش آبیاری (برای بارگذاری از جدول IrrigationMethod) query: سوال اختیاری config: تنظیمات RAG limit: تعداد چانک‌های بازیابی‌شده Returns: dict ساختاریافته برای توصیه آبیاری """ cfg = config or load_rag_config() service = get_service_config(SERVICE_ID, cfg) service_cfg = RAGConfig( embedding=cfg.embedding, qdrant=cfg.qdrant, chunking=cfg.chunking, llm=service.llm, knowledge_bases=cfg.knowledge_bases, services=cfg.services, chromadb=cfg.chromadb, ) client = get_chat_client(service_cfg) model = service.llm.model resolved_farm_uuid = str(farm_uuid or sensor_uuid or "").strip() if not resolved_farm_uuid: raise ValueError("farm_uuid is required.") user_query = query or "توصیه آبیاری برای مزرعه من چیست؟" sensor = ( SensorData.objects.select_related("center_location", "irrigation_method") .prefetch_related("plants") .filter(farm_uuid=resolved_farm_uuid) .first() ) irrigation_method = _resolve_irrigation_method(sensor, irrigation_method_name) _persist_irrigation_method_on_farm(sensor, irrigation_method) plant = None resolved_plant_name = plant_name if sensor is not None and plant_name: plant = sensor.plants.filter(name=plant_name).first() elif sensor is not None: plant = sensor.plants.first() if plant is not None: resolved_plant_name = plant.name crop_profile = resolve_crop_profile(plant, growth_stage=growth_stage) active_kc = resolve_kc(crop_profile, growth_stage=growth_stage) forecasts = [] daily_water_needs = [] optimized_result = None if sensor is not None: forecasts = list( WeatherForecast.objects.filter(location=sensor.center_location, forecast_date__isnull=False) .order_by("forecast_date")[:7] ) efficiency_percent = ( getattr(irrigation_method, "water_efficiency_percent", None) if irrigation_method else None ) daily_water_needs = calculate_forecast_water_needs( forecasts=forecasts, latitude_deg=float(sensor.center_location.latitude), crop_profile=crop_profile, growth_stage=growth_stage, irrigation_efficiency_percent=efficiency_percent, ) if plant is not None and forecasts: optimized_result = _get_optimizer().optimize_irrigation( sensor=sensor, plant=plant, forecasts=forecasts, daily_water_needs=daily_water_needs, growth_stage=growth_stage, irrigation_method=irrigation_method, ) context = build_rag_context( user_query, resolved_farm_uuid, config=cfg, limit=limit, kb_name=KB_NAME, service_id=SERVICE_ID, ) extra_parts: list[str] = [] resolved_irrigation_method_name = ( irrigation_method.name if irrigation_method is not None else None ) if resolved_plant_name and growth_stage: plant_text = build_plant_text(resolved_plant_name, growth_stage) if plant_text: extra_parts.append("[اطلاعات گیاه]\n" + plant_text) if resolved_irrigation_method_name: method_text = build_irrigation_method_text(resolved_irrigation_method_name) if method_text: extra_parts.append("[روش آبیاری انتخابی]\n" + method_text) if daily_water_needs: total_mm = round(sum(item["gross_irrigation_mm"] for item in daily_water_needs), 2) schedule_lines = [ f"- {item['forecast_date']}: ET0={item['et0_mm']} mm, ETc={item['etc_mm']} mm, " f"بارش مؤثر={item['effective_rainfall_mm']} mm, نیاز آبی={item['gross_irrigation_mm']} mm, " f"زمان پیشنهادی={item['irrigation_timing']}" for item in daily_water_needs ] extra_parts.append( "[خروجی قطعی محاسبات FAO-56]\n" f"کل نیاز آبی ۷ روز آینده: {total_mm} mm\n" f"Kc مورد استفاده: {active_kc}\n" + "\n".join(schedule_lines) ) if optimized_result is not None: extra_parts.append( "[خروجی بهینه ساز شبیه سازی]\n" + optimized_result["context_text"] ) if extra_parts: context = "\n\n---\n\n".join(extra_parts) + ("\n\n---\n\n" + context if context else "") tone = _load_service_tone(service, cfg) system_parts = [tone] if tone else [] if service.system_prompt: system_parts.append(service.system_prompt) system_parts.append(DEFAULT_IRRIGATION_PROMPT) if context: system_parts.append("\n\n" + context) system_content = "\n".join(system_parts) messages = [ {"role": "system", "content": system_content}, {"role": "user", "content": user_query}, ] audit_log = _create_audit_log( farm_uuid=resolved_farm_uuid, service_id=SERVICE_ID, model=model, query=user_query, system_prompt=system_content, messages=messages, ) try: response = client.chat.completions.create( model=model, messages=messages, ) raw = response.choices[0].message.content.strip() except Exception as exc: logger.error("Irrigation recommendation error for %s: %s", resolved_farm_uuid, exc) result = _build_irrigation_fallback( optimized_result=optimized_result, daily_water_needs=daily_water_needs, ) result["error"] = f"خطا در دریافت توصیه: {exc}" result["raw_response"] = None _fail_audit_log( audit_log, str(exc), response_text=json.dumps(result, ensure_ascii=False, default=str), ) return result try: cleaned = raw if cleaned.startswith("```"): cleaned = cleaned.strip("`").removeprefix("json").strip() result = json.loads(cleaned) except (json.JSONDecodeError, ValueError): result = {} result = _merge_irrigation_response( parsed_result=result, optimized_result=optimized_result, daily_water_needs=daily_water_needs, ) result["raw_response"] = raw result["water_balance"] = { "daily": daily_water_needs, "crop_profile": crop_profile, "active_kc": active_kc, } result["simulation_optimizer"] = optimized_result result["selected_irrigation_method"] = ( { "id": irrigation_method.id, "name": irrigation_method.name, "category": irrigation_method.category, "water_efficiency_percent": irrigation_method.water_efficiency_percent, } if irrigation_method is not None else None ) _complete_audit_log( audit_log, json.dumps(result, ensure_ascii=False, default=str), ) return result