Files
Ai/rag/services/fertilization.py
T
2026-04-28 19:00:38 +03:30

744 lines
30 KiB
Python

"""
سرویس توصیه کودهی — بدون API، قابل فراخوانی از سایر سرویس‌ها.
از RAG با پایگاه دانش fertilization و خروجی optimizer برای ساخت پاسخ ساختاریافته استفاده می‌کند.
"""
from __future__ import annotations
import json
import logging
import re
from typing import Any
from django.apps import apps
from farm_data.models import SensorData
from rag.api_provider import get_chat_client
from rag.chat import (
_complete_audit_log,
_create_audit_log,
_fail_audit_log,
_load_service_tone,
build_rag_context,
)
from rag.config import RAGConfig, get_service_config, load_rag_config
from rag.user_data import build_plant_text
logger = logging.getLogger(__name__)
KB_NAME = "fertilization"
SERVICE_ID = "fertilization"
HECTARE_TO_SQUARE_METER = 10000.0
DEFAULT_FERTILIZATION_PROMPT = (
"از RAG و خروجی بهینه ساز شبیه سازی برای ساخت پاسخ ساختاریافته کودهی استفاده کن. "
"اگر بلوک [خروجی بهینه ساز شبیه سازی] وجود داشت، همان مرجع قطعی اعداد، فرمول، روش مصرف و زمان بندی است. "
"پاسخ فقط JSON معتبر بر اساس قرارداد status/data برگردان."
)
DEFAULT_MACRO_DESCRIPTIONS = {
"n": "نیتروژن برای حفظ رشد رویشی، رنگ سبز برگ و بازسازی سریع بوته مهم است.",
"p": "فسفر به توسعه ریشه، انتقال انرژی و پشتیبانی از گلدهی و استقرار کمک می کند.",
"k": "پتاسیم به تنظیم آب، کیفیت محصول و مقاومت گیاه در برابر تنش محیطی کمک می کند.",
}
DEFAULT_MICRO_NAMES = {
"fe": "آهن",
"zn": "روی",
"mn": "منگنز",
"b": "بر",
"cu": "مس",
"mg": "منیزیم",
"ca": "کلسیم",
"mo": "مولیبدن",
}
DEFAULT_MICRO_DESCRIPTIONS = {
"fe": "آهن در ساخت کلروفیل و کاهش زردی بین رگبرگی نقش دارد.",
"zn": "روی در رشد متعادل، تشکیل هورمون ها و فعالیت آنزیمی موثر است.",
"mn": "منگنز در فتوسنتز و فعالیت آنزیم های متابولیکی نقش پشتیبان دارد.",
"b": "بر در گرده افشانی، تشکیل گل و انتقال قندها اهمیت دارد.",
"cu": "مس به فعالیت آنزیمی و استحکام نسبی بافت های گیاه کمک می کند.",
"mg": "منیزیم بخش مرکزی کلروفیل است و در فتوسنتز اهمیت دارد.",
"ca": "کلسیم در استحکام دیواره سلولی و کیفیت رشد بافت های جوان موثر است.",
"mo": "مولیبدن در متابولیسم نیتروژن و کارایی جذب آن نقش دارد.",
}
DEFAULT_STAGE_LABELS = {
"initial": "استقرار",
"vegetative": "رشد رویشی",
"flowering": "گلدهی",
"fruiting": "میوه دهی",
}
def _get_optimizer():
return apps.get_app_config("crop_simulation").get_recommendation_optimizer()
def _safe_float(value: Any, default: float | None = None) -> float | None:
try:
if value is None or value == "":
return default
return float(value)
except (TypeError, ValueError):
return default
def _stage_key(growth_stage: str | None) -> str:
text = (growth_stage or "").strip().lower()
if any(token in text for token in ("flower", "گل", "anthesis")):
return "flowering"
if any(token in text for token in ("fruit", "میوه", "برداشت", "ripen", "harvest")):
return "fruiting"
if any(token in text for token in ("initial", "seed", "جوانه", "نشا", "استقرار")):
return "initial"
return "vegetative"
def _clean_json_response(raw: str) -> dict[str, Any]:
cleaned = raw.strip()
if cleaned.startswith("```"):
cleaned = cleaned.strip("`").removeprefix("json").strip()
try:
parsed = json.loads(cleaned)
return parsed if isinstance(parsed, dict) else {}
except (json.JSONDecodeError, ValueError):
return {}
def _normalize_label(value: float) -> str:
if float(value).is_integer():
return str(int(value))
return f"{value:.2f}".rstrip("0").rstrip(".")
def _parse_npk_ratio(formula: str | None) -> dict[str, float | str]:
if not formula:
return {"n": 0.0, "p": 0.0, "k": 0.0, "label": "0-0-0"}
parts = re.findall(r"\d+(?:\.\d+)?", formula)
if len(parts) < 3:
return {"n": 0.0, "p": 0.0, "k": 0.0, "label": formula}
n, p, k = (_safe_float(part, 0.0) or 0.0 for part in parts[:3])
return {
"n": round(n, 3),
"p": round(p, 3),
"k": round(k, 3),
"label": f"{_normalize_label(n)}-{_normalize_label(p)}-{_normalize_label(k)}",
}
def _method_id(label: str) -> str:
text = (label or "").strip()
if "محلول" in text and ("آبیاری" in text or "کودآبیاری" in text):
return "foliar_fertigation"
if "محلول" in text:
return "foliar_spray"
if "آبیاری" in text or "کودآبیاری" in text:
return "fertigation"
if "سرک" in text or "خاک" in text or "نواری" in text:
return "soil_application"
return "custom_application"
def _slug_value(value: str) -> str:
token = re.sub(r"[^a-zA-Z0-9]+", "-", (value or "").strip().lower()).strip("-")
return token or "fertilizer"
def _fertilizer_display_name(formula: str | None) -> str:
ratio = _parse_npk_ratio(formula)
label = ratio["label"] if ratio["label"] else (formula or "کود پیشنهادی")
if label and label != "0-0-0":
return f"کود کامل {label}"
return formula or "کود پیشنهادی"
def _fertilizer_type_label(formula: str | None) -> str:
ratio = _parse_npk_ratio(formula)
if ratio["label"] and ratio["label"] != "0-0-0":
return "NPK"
return formula or "Fertilizer"
def _first_text(*values: Any) -> str:
for value in values:
if isinstance(value, str) and value.strip():
return value.strip()
return ""
def _default_application_steps(application_method: str) -> list[dict[str, Any]]:
if "محلول" in application_method:
return [
{
"step_number": 1,
"title": "آماده سازی",
"description": "دوز توصیه شده را در مقدار کمی آب تمیز حل کنید تا محلول یکنواخت به دست آید.",
},
{
"step_number": 2,
"title": "اختلاط",
"description": "محلول را به مخزن اصلی اضافه کنید و همزمان هم بزنید تا ته نشینی رخ ندهد.",
},
{
"step_number": 3,
"title": "مصرف",
"description": "در ساعات خنک روز به صورت یکنواخت محلول پاشی کنید و پس از اجرا بوته را پایش کنید.",
},
]
return [
{
"step_number": 1,
"title": "آماده سازی",
"description": "مقدار توصیه شده را بر اساس مساحت مزرعه اندازه گیری و پیش از اجرا یکنواخت تقسیم کنید.",
},
{
"step_number": 2,
"title": "تزریق یا پخش",
"description": "کود را از طریق کودآبیاری یا مصرف خاکی سبک مطابق روش پیشنهادی وارد مزرعه کنید.",
},
{
"step_number": 3,
"title": "پایش",
"description": "پس از اجرا رطوبت خاک، وضعیت برگ و پاسخ بوته را تا نوبت بعدی بررسی کنید.",
},
]
def _warning_from_weather(forecasts: list[Any], application_method: str) -> str:
if not forecasts:
return "هنگام مصرف از دستکش و ماسک استفاده کنید و قبل از اختلاط آزمون سازگاری در مقیاس کوچک انجام دهید."
rainy = next(
(
item
for item in forecasts
if (_safe_float(getattr(item, "precipitation", None), 0.0) or 0.0) >= 3.0
),
None,
)
hot = next(
(
item
for item in forecasts
if (_safe_float(getattr(item, "temperature_max", None), 0.0) or 0.0) >= 32.0
),
None,
)
if rainy is not None and "محلول" in application_method:
return (
f"به دلیل احتمال بارش موثر در {rainy.forecast_date} محلول پاشی را به پنجره خشک منتقل کنید و "
"در زمان اجرا از ماسک و دستکش استفاده شود."
)
if hot is not None:
return (
"به دلیل گرمای پیش رو، مصرف را فقط در صبح زود یا نزدیک غروب انجام دهید و از اختلاط غلیظ خودداری کنید."
)
return "هنگام مصرف از دستکش و ماسک استفاده کنید و پیش از اختلاط با سایر نهاده ها آزمون سازگاری انجام دهید."
def _fallback_optimizer_result(growth_stage: str | None) -> dict[str, Any]:
defaults = apps.get_app_config("fertilization").get_optimizer_defaults()
stage_key = _stage_key(growth_stage)
target = defaults["stage_targets"].get(stage_key, defaults["stage_targets"]["vegetative"])
base_amount = round(max(40.0, (target["n"] * 1.25)), 2)
return {
"engine": "defaults",
"recommended_strategy": {
"code": stage_key,
"label": DEFAULT_STAGE_LABELS.get(stage_key, stage_key),
"score": 0.0,
"expected_yield_index": 0.0,
"fertilizer_type": target["formula"],
"amount_kg_per_ha": base_amount,
"application_method": target["application_method"],
"timing": target["timing"],
"validity_period": f"معتبر برای {defaults['validity_days']} روز آینده",
"reasoning": [
"پیشنهاد از تنظیمات پایه مرحله رشد ساخته شد زیرا خروجی کامل optimizer در دسترس نبود.",
f"فرمول هدف مرحله {DEFAULT_STAGE_LABELS.get(stage_key, stage_key)} برابر با {target['formula']} در نظر گرفته شد.",
],
},
"alternatives": [],
"context_text": "fallback fertilization context",
}
def _build_legacy_sections(
structured_data: dict[str, Any],
recommended_strategy: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
primary = structured_data.get("primary_recommendation", {})
guide = structured_data.get("application_guide", {})
recommended_strategy = recommended_strategy or {}
return [
{
"type": "recommendation",
"title": primary.get("display_title") or "برنامه کودهی",
"icon": "leaf",
"content": primary.get("summary", ""),
"fertilizerType": primary.get("npk_ratio", {}).get("label") or primary.get("fertilizer_type", ""),
"amount": primary.get("dosage", {}).get("label", ""),
"applicationMethod": primary.get("application_method", {}).get("label", ""),
"timing": recommended_strategy.get("timing", ""),
"validityPeriod": recommended_strategy.get("validity_period", ""),
"expandableExplanation": primary.get("reasoning", ""),
},
{
"type": "list",
"title": "مراحل مصرف",
"icon": "list",
"items": [step.get("title", "") for step in guide.get("steps", []) if step.get("title")],
},
{
"type": "warning",
"title": "هشدار کودهی",
"icon": "alert-triangle",
"content": guide.get("safety_warning", ""),
},
]
def _coerce_steps(value: Any, application_method: str) -> list[dict[str, Any]]:
if not isinstance(value, list):
return _default_application_steps(application_method)
steps = []
for index, item in enumerate(value, start=1):
if isinstance(item, dict):
title = _first_text(item.get("title"), f"مرحله {index}")
description = _first_text(item.get("description"), item.get("content"))
if not description:
continue
steps.append(
{
"step_number": int(item.get("step_number") or index),
"title": title,
"description": description,
}
)
elif isinstance(item, str) and item.strip():
steps.append(
{
"step_number": index,
"title": f"مرحله {index}",
"description": item.strip(),
}
)
return steps or _default_application_steps(application_method)
def _normalize_micro_items(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
items = []
for item in value:
if not isinstance(item, dict):
continue
key = _first_text(item.get("key")).lower()
if not key:
continue
nutrient_value = _safe_float(item.get("value"))
if nutrient_value is None:
continue
items.append(
{
"key": key,
"name": _first_text(item.get("name"), DEFAULT_MICRO_NAMES.get(key, key.upper())),
"value": round(nutrient_value, 3),
"unit": "percent",
"description": _first_text(item.get("description"), DEFAULT_MICRO_DESCRIPTIONS.get(key, "")),
}
)
return items
def _build_nutrient_analysis(llm_analysis: dict[str, Any] | None, npk_ratio: dict[str, Any]) -> dict[str, Any]:
llm_analysis = llm_analysis if isinstance(llm_analysis, dict) else {}
macro_by_key: dict[str, dict[str, Any]] = {}
for item in llm_analysis.get("macro", []):
if not isinstance(item, dict):
continue
key = _first_text(item.get("key")).lower()
if key:
macro_by_key[key] = item
macro = []
for key, name in (("n", "نیتروژن (N)"), ("p", "فسفر (P)"), ("k", "پتاسیم (K)")):
source = macro_by_key.get(key, {})
macro.append(
{
"key": key,
"name": name,
"value": round(_safe_float(npk_ratio.get(key), 0.0) or 0.0, 3),
"unit": "percent",
"description": _first_text(source.get("description"), DEFAULT_MACRO_DESCRIPTIONS[key]),
}
)
return {"macro": macro, "micro": _normalize_micro_items(llm_analysis.get("micro"))}
def _build_application_guide(
llm_guide: dict[str, Any] | None,
*,
application_method: str,
warning_text: str,
) -> dict[str, Any]:
llm_guide = llm_guide if isinstance(llm_guide, dict) else {}
return {
"safety_warning": _first_text(llm_guide.get("safety_warning"), warning_text),
"steps": _coerce_steps(llm_guide.get("steps"), application_method),
}
def _build_alternative_recommendations(
llm_alternatives: Any,
optimizer_alternatives: list[dict[str, Any]],
recommended_strategy: dict[str, Any],
) -> list[dict[str, Any]]:
llm_items = llm_alternatives if isinstance(llm_alternatives, list) else []
alternatives = []
for index, optimizer_item in enumerate(optimizer_alternatives[:3]):
llm_item = llm_items[index] if index < len(llm_items) and isinstance(llm_items[index], dict) else {}
formula = _first_text(
llm_item.get("fertilizer_code"),
optimizer_item.get("fertilizer_type"),
recommended_strategy.get("fertilizer_type"),
)
display_name = _first_text(llm_item.get("fertilizer_name"), _fertilizer_display_name(formula), optimizer_item.get("label"))
description = _first_text(
llm_item.get("description"),
*(optimizer_item.get("reasoning") or []),
f"این گزینه با امتیاز {optimizer_item.get('score', 0)} برای شرایط مشابه قابل استفاده است.",
)
alternatives.append(
{
"fertilizer_code": _slug_value(formula or optimizer_item.get("code", f"alt-{index + 1}")),
"fertilizer_name": display_name,
"fertilizer_type": _first_text(llm_item.get("fertilizer_type"), _fertilizer_type_label(formula)),
"usage_method": _first_text(
llm_item.get("usage_method"),
optimizer_item.get("application_method"),
recommended_strategy.get("application_method"),
),
"description": description,
}
)
for llm_item in llm_items[len(alternatives):3]:
if not isinstance(llm_item, dict):
continue
fertilizer_name = _first_text(llm_item.get("fertilizer_name"))
fertilizer_code = _first_text(llm_item.get("fertilizer_code"), fertilizer_name)
if not fertilizer_name or not fertilizer_code:
continue
alternatives.append(
{
"fertilizer_code": _slug_value(fertilizer_code),
"fertilizer_name": fertilizer_name,
"fertilizer_type": _first_text(llm_item.get("fertilizer_type"), "Fertilizer"),
"usage_method": _first_text(llm_item.get("usage_method"), recommended_strategy.get("application_method", "")),
"description": _first_text(llm_item.get("description"), "گزینه جایگزین در صورت محدودیت تامین یا تغییر شرایط مزرعه."),
}
)
return alternatives
def _normalize_llm_payload(parsed_result: dict[str, Any]) -> dict[str, Any]:
if not isinstance(parsed_result, dict):
return {"status": "success", "data": {}}
if isinstance(parsed_result.get("data"), dict):
status = parsed_result.get("status") or "success"
return {"status": status, "data": parsed_result["data"]}
if any(key in parsed_result for key in ("primary_recommendation", "nutrient_analysis", "application_guide")):
status = parsed_result.get("status") or "success"
return {"status": status, "data": parsed_result}
sections = parsed_result.get("sections")
if isinstance(sections, list):
recommendation = next((item for item in sections if isinstance(item, dict) and item.get("type") == "recommendation"), {})
list_section = next((item for item in sections if isinstance(item, dict) and item.get("type") == "list"), {})
warning = next((item for item in sections if isinstance(item, dict) and item.get("type") == "warning"), {})
return {
"status": "success",
"data": {
"primary_recommendation": {
"display_title": _first_text(recommendation.get("title"), recommendation.get("fertilizerType")),
"reasoning": _first_text(recommendation.get("expandableExplanation"), recommendation.get("content")),
"summary": _first_text(recommendation.get("content"), recommendation.get("title")),
},
"application_guide": {
"safety_warning": _first_text(warning.get("content")),
"steps": list_section.get("items", []),
},
"alternative_recommendations": [],
},
}
return {"status": "success", "data": {}}
def _build_final_response(
*,
llm_payload: dict[str, Any],
optimized_result: dict[str, Any] | None,
plant_name: str | None,
crop_id: str | None,
growth_stage: str | None,
forecasts: list[Any],
) -> dict[str, Any]:
normalized_llm = _normalize_llm_payload(llm_payload)
advisory = normalized_llm.get("data", {}) if isinstance(normalized_llm.get("data"), dict) else {}
optimizer_payload = optimized_result or _fallback_optimizer_result(growth_stage)
recommended = optimizer_payload.get("recommended_strategy", {})
defaults = apps.get_app_config("fertilization").get_optimizer_defaults()
stage_key = _stage_key(growth_stage)
stage_target = defaults["stage_targets"].get(stage_key, defaults["stage_targets"]["vegetative"])
formula = _first_text(recommended.get("fertilizer_type"), stage_target.get("formula"))
npk_ratio = _parse_npk_ratio(formula)
application_method_label = _first_text(recommended.get("application_method"), stage_target.get("application_method"))
amount_kg_per_ha = round(_safe_float(recommended.get("amount_kg_per_ha"), 0.0) or 0.0, 3)
amount_per_square_meter = round(amount_kg_per_ha / HECTARE_TO_SQUARE_METER, 6)
interval_days = int(
stage_target.get(
"application_interval_days",
defaults.get("default_application_interval_days", 14),
)
)
primary_advisory = advisory.get("primary_recommendation") if isinstance(advisory.get("primary_recommendation"), dict) else {}
reasoning = _first_text(primary_advisory.get("reasoning"), " ".join(recommended.get("reasoning", [])))
if not reasoning:
reasoning = "این توصیه با اتکا به مرحله رشد، وضعیت خاک و خروجی بهینه ساز شبیه سازی تنظیم شده است."
summary = _first_text(primary_advisory.get("summary"))
if not summary:
summary = f"{_fertilizer_display_name(formula)} برای مرحله {DEFAULT_STAGE_LABELS.get(stage_key, stage_key)} مناسب ارزیابی شده است."
warning_text = _warning_from_weather(forecasts, application_method_label)
nutrient_analysis = _build_nutrient_analysis(advisory.get("nutrient_analysis"), npk_ratio)
application_guide = _build_application_guide(
advisory.get("application_guide"),
application_method=application_method_label,
warning_text=warning_text,
)
alternatives = _build_alternative_recommendations(
advisory.get("alternative_recommendations"),
optimizer_payload.get("alternatives", []),
recommended,
)
structured_data = {
"primary_recommendation": {
"fertilizer_code": _slug_value(formula),
"fertilizer_name": _first_text(primary_advisory.get("fertilizer_name"), _fertilizer_display_name(formula)),
"display_title": _first_text(primary_advisory.get("display_title"), _fertilizer_display_name(formula)),
"fertilizer_type": _first_text(primary_advisory.get("fertilizer_type"), _fertilizer_type_label(formula)),
"npk_ratio": npk_ratio,
"application_method": {
"id": _method_id(application_method_label),
"label": application_method_label,
},
"application_interval": {
"value": interval_days,
"unit": "day",
"label": f"هر {interval_days} روز",
},
"dosage": {
"base_amount_per_hectare": amount_kg_per_ha,
"base_amount_per_square_meter": amount_per_square_meter,
"unit": "kg",
"label": f"{_normalize_label(amount_kg_per_ha)} کیلوگرم در هکتار",
"calculation_basis": optimizer_payload.get("engine", "product"),
},
"reasoning": reasoning,
"summary": summary,
},
"nutrient_analysis": nutrient_analysis,
"application_guide": application_guide,
"alternative_recommendations": alternatives,
}
structured_data["sections"] = _build_legacy_sections(structured_data, recommended)
return {"status": normalized_llm.get("status") or "success", "data": structured_data}
def _validate_fertilization_response(parsed_result: dict[str, Any]) -> dict[str, Any]:
if not isinstance(parsed_result, dict):
raise ValueError("Fertilization recommendation response is not a JSON object.")
data = parsed_result.get("data")
if not isinstance(data, dict):
raise ValueError("Fertilization recommendation response is missing data.")
if not isinstance(data.get("primary_recommendation"), dict):
raise ValueError("Fertilization recommendation response is missing primary_recommendation.")
return parsed_result
def get_fertilization_recommendation(
farm_uuid: str | None = None,
plant_name: str | None = None,
growth_stage: str | None = None,
crop_id: str | None = None,
query: str | None = None,
config: RAGConfig | None = None,
limit: int = 8,
sensor_uuid: str | None = None,
) -> dict[str, Any]:
"""
توصیه کودهی برای یک مزرعه.
از RAG با پایگاه دانش fertilization استفاده می کند و خروجی نهایی را با optimizer ترکیب می کند.
"""
cfg = config or load_rag_config()
service = get_service_config(SERVICE_ID, cfg)
service_cfg = RAGConfig(
embedding=cfg.embedding,
qdrant=cfg.qdrant,
chunking=cfg.chunking,
llm=service.llm,
knowledge_bases=cfg.knowledge_bases,
services=cfg.services,
chromadb=cfg.chromadb,
)
client = get_chat_client(service_cfg)
model = service.llm.model
resolved_farm_uuid = str(farm_uuid or sensor_uuid or "").strip()
if not resolved_farm_uuid:
raise ValueError("farm_uuid is required.")
user_query = query or "توصیه کودهی بهینه برای مزرعه من چیست؟"
sensor = (
SensorData.objects.select_related("center_location")
.prefetch_related("plants")
.filter(farm_uuid=resolved_farm_uuid)
.first()
)
plant_config = apps.get_app_config("plant")
resolved_plant_name = plant_config.resolve_plant_name(plant_name)
if not resolved_plant_name and crop_id:
resolved_plant_name = plant_config.resolve_plant_name(crop_id)
resolved_growth_stage = plant_config.resolve_growth_stage(growth_stage)
plant = None
if not resolved_plant_name and sensor is not None:
plant = sensor.plants.first()
if plant is not None:
resolved_plant_name = plant.name
elif resolved_plant_name:
if sensor is not None:
plant = sensor.plants.filter(name=resolved_plant_name).first()
if plant is None:
Plant = apps.get_model("plant", "Plant")
plant = Plant.objects.filter(name=resolved_plant_name).first()
if plant is None and sensor is not None:
plant = sensor.plants.first()
if plant is not None:
resolved_plant_name = plant.name
forecasts = []
optimized_result = None
if sensor is not None and getattr(sensor, "center_location", None) is not None:
from weather.models import WeatherForecast
forecasts = list(
WeatherForecast.objects.filter(
location=sensor.center_location,
forecast_date__isnull=False,
).order_by("forecast_date")[:7]
)
if sensor is not None and plant is not None:
optimized_result = _get_optimizer().optimize_fertilization(
sensor=sensor,
plant=plant,
forecasts=forecasts,
growth_stage=resolved_growth_stage,
)
context = build_rag_context(
user_query,
resolved_farm_uuid,
config=cfg,
limit=limit,
kb_name=KB_NAME,
service_id=SERVICE_ID,
)
extra_parts: list[str] = []
if resolved_plant_name and resolved_growth_stage:
plant_text = build_plant_text(resolved_plant_name, resolved_growth_stage)
if plant_text:
extra_parts.append("[اطلاعات گیاه]\n" + plant_text)
if optimized_result is not None:
extra_parts.append("[خروجی بهینه ساز شبیه سازی]\n" + optimized_result["context_text"])
if extra_parts:
context = "\n\n---\n\n".join(extra_parts) + ("\n\n---\n\n" + context if context else "")
tone = _load_service_tone(service, cfg)
system_parts = [tone] if tone else []
if service.system_prompt:
system_parts.append(service.system_prompt)
system_parts.append(DEFAULT_FERTILIZATION_PROMPT)
if context:
system_parts.append("\n\n" + context)
system_content = "\n".join(system_parts)
messages = [
{"role": "system", "content": system_content},
{"role": "user", "content": user_query},
]
audit_log = _create_audit_log(
farm_uuid=resolved_farm_uuid,
service_id=SERVICE_ID,
model=model,
query=user_query,
system_prompt=system_content,
messages=messages,
)
try:
response = client.chat.completions.create(
model=model,
messages=messages,
)
raw = response.choices[0].message.content.strip()
except Exception as exc:
logger.error("Fertilization recommendation error for %s: %s", resolved_farm_uuid, exc)
_fail_audit_log(audit_log, str(exc))
raise RuntimeError(
f"Fertilization recommendation failed for farm {resolved_farm_uuid}."
) from exc
llm_payload = _clean_json_response(raw)
result = _build_final_response(
llm_payload=llm_payload,
optimized_result=optimized_result,
plant_name=resolved_plant_name,
crop_id=crop_id,
growth_stage=resolved_growth_stage,
forecasts=forecasts,
)
result = _validate_fertilization_response(result)
result["raw_response"] = raw
result["simulation_optimizer"] = optimized_result
result["sections"] = result["data"].get("sections", [])
_complete_audit_log(
audit_log,
json.dumps(result, ensure_ascii=False, default=str),
)
return result