Files
Logic/Modules/Ai/crop_simulation/recommendation_optimizer.py
2026-05-11 03:27:21 +03:30

802 lines
33 KiB
Python

from __future__ import annotations

import logging
import math
import re
from dataclasses import dataclass
from datetime import date
from statistics import mean
from typing import Any

from django.apps import apps

from crop_simulation.services import CropSimulationService
from location_data.satellite_snapshot import build_location_satellite_snapshot


def _safe_float(value: Any, default: float = 0.0) -> float:
try:
if value is None or value == "":
return default
return float(value)
except (TypeError, ValueError):
return default
def _mm_to_cm_day(value: Any, default: float) -> float:
    """Divide a millimetre reading by ten, floor it at zero and round to 4 places.

    *default* is expressed in the output unit; it is multiplied by ten first
    so that it passes through the same division as a real reading.
    """
    fallback_mm = default * 10.0
    centimetres = _safe_float(value, fallback_mm) / 10.0
    if centimetres < 0.0:
        centimetres = 0.0
    return round(centimetres, 4)
def _clamp(value: float, lower: float, upper: float) -> float:
return max(lower, min(value, upper))
def _stage_key(growth_stage: str | None) -> str:
text = (growth_stage or "").strip().lower()
if any(token in text for token in ("flower", "گل", "anthesis")):
return "flowering"
if any(token in text for token in ("fruit", "میوه", "برداشت", "ripen", "harvest")):
return "fruiting"
if any(token in text for token in ("initial", "seed", "جوانه", "نشا", "استقرار")):
return "initial"
return "vegetative"
def _first_not_none(*values: Any) -> Any:
for value in values:
if value is not None:
return value
return None
def _sensor_metric(sensor: Any, metric: str) -> float | None:
    """Read a numeric *metric* from a sensor object.

    A direct attribute named *metric* takes precedence: when the sensor has
    it, its value is returned as a float (``None`` stays ``None``) and the
    payload is not consulted. Otherwise the dict-valued blocks of
    ``sensor.sensor_payload`` are scanned in order and the first value that is
    not ``None`` is used. Values that cannot be parsed become ``0.0``.
    """
    if sensor is None:
        return None

    if hasattr(sensor, metric):
        direct = getattr(sensor, metric)
        if direct is None:
            return None
        return _safe_float(direct, default=0.0)

    payload = getattr(sensor, "sensor_payload", None) or {}
    if not isinstance(payload, dict):
        return None

    readings = (
        block.get(metric) for block in payload.values() if isinstance(block, dict)
    )
    found = next((reading for reading in readings if reading is not None), None)
    return None if found is None else _safe_float(found, default=0.0)
def _parse_temperature_range(plant: Any) -> tuple[float, float]:
raw = (getattr(plant, "temperature", "") or "").replace("تا", "-")
digits = []
current = ""
for char in raw:
if char.isdigit() or char in ".-":
current += char
continue
if current:
digits.append(current)
current = ""
if current:
digits.append(current)
if len(digits) >= 2:
low = _safe_float(digits[0], 12.0)
high = _safe_float(digits[1], 28.0)
if low < high:
return low, high
return 14.0, 30.0
def _mean_forecast_value(forecasts: list[Any], attr: str, fallback: float = 0.0) -> float:
    """Average attribute *attr* over *forecasts*, rounded to three decimals.

    Entries whose attribute is missing or unusable contribute *fallback* to
    the average. When there are no entries, *fallback* is returned unrounded.
    """
    samples = []
    for entry in forecasts:
        samples.append(_safe_float(getattr(entry, attr, None), default=fallback))
    if not samples:
        return fallback
    return round(mean(samples), 3)
def _next_rain_date(forecasts: list[Any], threshold_mm: float) -> str | None:
    """Return the ISO date of the first forecast reaching *threshold_mm* of rain.

    Forecasts are examined in the order given; ``None`` is returned when no
    entry's ``precipitation`` reaches the threshold.
    """
    for entry in forecasts:
        rainfall = _safe_float(getattr(entry, "precipitation", None), 0.0)
        if not rainfall >= threshold_mm:
            continue
        return entry.forecast_date.isoformat()
    return None
def _best_timing(avg_temp: float, avg_wind: float) -> str:
if avg_temp >= 30 or avg_wind >= 18:
return "اوایل صبح"
if avg_temp <= 18:
return "اواخر صبح"
return "اوایل صبح یا نزدیک غروب"
def _build_weather_records(
    forecasts: list[Any],
    *,
    latitude: float,
    longitude: float,
    elevation: float = 1200.0,
    irradiation: float = 16_000_000.0,
) -> list[dict[str, Any]]:
    """Convert daily forecast objects into weather records for the simulation.

    Missing forecast attributes are replaced by fixed defaults, so every
    record is fully populated.

    Args:
        forecasts: Objects exposing ``forecast_date`` and, optionally,
            ``temperature_min``/``temperature_max``/``temperature_mean``,
            ``humidity_mean``, ``wind_speed_max``, ``et0`` and
            ``precipitation``.
        latitude: Stored as ``LAT`` on every record.
        longitude: Stored as ``LON`` on every record.
        elevation: Stored as ``ELEV`` on every record.
        irradiation: Stored as ``IRRAD`` on every record.
            NOTE(review): presumably J/m2/day as PCSE expects — confirm.

    Returns:
        One record dict per forecast, in input order.
    """
    records = []
    for forecast in forecasts:
        mean_temp = getattr(forecast, "temperature_mean", None)
        tmin = _safe_float(
            _first_not_none(getattr(forecast, "temperature_min", None), mean_temp),
            12.0,
        )
        tmax = _safe_float(
            _first_not_none(getattr(forecast, "temperature_max", None), mean_temp),
            24.0,
        )
        if tmin > tmax:
            # Mixing a real reading with a fallback (e.g. min=26 with the
            # default max of 24) can invert the pair; keep TMIN <= TMAX.
            tmin, tmax = tmax, tmin
        humidity = _safe_float(getattr(forecast, "humidity_mean", None), 55.0)
        # Vapour value scaled linearly from relative humidity, floored at 6.0.
        vap = max(6.0, round((humidity / 100.0) * 20.0, 3))
        wind_kmh = _safe_float(getattr(forecast, "wind_speed_max", None), 7.2)
        wind_ms = round(wind_kmh / 3.6, 3)
        et0 = _mm_to_cm_day(getattr(forecast, "et0", None), 0.35)
        records.append(
            {
                "DAY": forecast.forecast_date,
                "LAT": latitude,
                "LON": longitude,
                "ELEV": elevation,
                "IRRAD": irradiation,
                "TMIN": tmin,
                "TMAX": tmax,
                "VAP": vap,
                "WIND": wind_ms,
                "RAIN": _mm_to_cm_day(getattr(forecast, "precipitation", None), 0.0),
                "E0": et0,
                "ES0": max(et0 * 0.9, 0.1),
                "ET0": et0,
            }
        )
    return records
def _build_soil_parameters(sensor: Any) -> tuple[dict[str, Any], dict[str, Any]]:
    """Derive the simulation's soil and site parameter dicts from a sensor.

    The satellite snapshot's ``ndwi`` metric (default ``0.34`` when missing or
    non-positive) drives the field-capacity value ``SMFCF``; the wilting-point
    value ``SMW`` is derived from it. ``WAV`` comes from the sensor's
    ``soil_moisture`` reading clamped between the two, or from their midpoint
    when no reading is available.

    Args:
        sensor: Object optionally exposing ``center_location`` and a
            ``soil_moisture`` metric.

    Returns:
        ``(soil, site)`` where ``soil`` has ``SMFCF``, ``SMW`` and ``RDMSOL``
        and ``site`` has ``WAV``.
    """
    moisture_pct = _sensor_metric(sensor, "soil_moisture")
    center_location = getattr(sensor, "center_location", None)

    satellite_metrics: Any = {}
    if center_location is not None:
        # Tolerate a snapshot builder that returns None instead of a dict.
        snapshot = build_location_satellite_snapshot(center_location) or {}
        satellite_metrics = snapshot.get("resolved_metrics") or {}

    ndwi = _safe_float(satellite_metrics.get("ndwi"), 0.34)
    wv0033 = ndwi if ndwi > 0 else 0.34
    wv1500 = max(round(wv0033 * 0.45, 3), 0.14)
    # Both inputs are strictly positive here, so no further fallback is needed.
    smfcf = _clamp(wv0033, 0.2, 0.55)
    # Keep the wilting point at least 0.02 below field capacity.
    smw = _clamp(wv1500, 0.05, smfcf - 0.02)

    if moisture_pct is not None:
        wav = round(_clamp(moisture_pct / 100.0, smw, smfcf) * 100.0, 3)
    else:
        wav = round(((smfcf + smw) / 2.0) * 100.0, 3)

    soil = {
        "SMFCF": round(smfcf, 3),
        "SMW": round(smw, 3),
        "RDMSOL": 120.0,
    }
    site = {"WAV": wav}
    return soil, site
def _build_crop_parameters(plant: Any, growth_stage: str | None) -> tuple[dict[str, Any], list[dict[str, Any]]] | None:
profiles = []
for attr in ("growth_profile", "irrigation_profile", "health_profile"):
profile = getattr(plant, attr, None) or {}
if isinstance(profile, dict):
profiles.append(profile)
simulation_block = None
for profile in profiles:
candidate = profile.get("simulation")
if isinstance(candidate, dict):
simulation_block = candidate
break
if not simulation_block:
return None
crop_parameters = simulation_block.get("crop_parameters")
agromanagement = simulation_block.get("agromanagement")
if not isinstance(crop_parameters, dict) or not agromanagement:
return None
enriched_crop = dict(crop_parameters)
enriched_crop.setdefault("crop_name", getattr(plant, "name", "crop"))
if growth_stage:
enriched_crop.setdefault("growth_stage", growth_stage)
return enriched_crop, agromanagement
def _event_dates_for_frequency(forecasts: list[Any], count: int) -> list[str]:
    """Pick up to *count* forecast days with the highest water demand.

    Demand is ``et0 + temperature_max / 10 - precipitation`` with missing
    values counted as zero. The chosen days are returned as ISO date strings
    in chronological order.
    """
    if not forecasts:
        return []

    def demand(day: Any) -> float:
        evaporation = _safe_float(getattr(day, "et0", None), 0.0)
        heat = _safe_float(getattr(day, "temperature_max", None), 0.0) / 10.0
        rain = _safe_float(getattr(day, "precipitation", None), 0.0)
        return evaporation + heat - rain

    by_demand = list(forecasts)
    by_demand.sort(key=demand, reverse=True)
    chosen = by_demand[:count]
    chosen.sort(key=lambda day: day.forecast_date)
    return [day.forecast_date.isoformat() for day in chosen]
def _irrigation_context_text(result: dict[str, Any]) -> str:
recommended = result["recommended_strategy"]
alternative_lines = [
f"- {item['label']}: امتیاز {item['score']}, آب کل {item['total_irrigation_mm']} mm"
for item in result.get("alternatives", [])
]
lines = [
f"engine: {result['engine']}",
f"استراتژی منتخب: {recommended['label']}",
f"امتیاز شبیه سازی: {recommended['score']}",
f"شاخص عملکرد مورد انتظار: {recommended['expected_yield_index']}",
f"آب کل پیشنهادی: {recommended['total_irrigation_mm']} mm",
f"مقدار هر نوبت: {recommended['amount_per_event_mm']} mm",
f"تعداد نوبت: {recommended['events']}",
f"تقویم اجرای پیشنهادی: {', '.join(recommended['event_dates']) or 'نامشخص'}",
f"زمان انجام: {recommended['timing']}",
f"رطوبت هدف خاک: {recommended['moisture_target_percent']}%",
f"اعتبار: {recommended['validity_period']}",
"دلایل اصلی:",
*[f"- {item}" for item in recommended["reasoning"]],
]
if alternative_lines:
lines.extend(["گزینه های جایگزین:", *alternative_lines])
return "\n".join(lines)
def _fertilization_context_text(result: dict[str, Any]) -> str:
recommended = result["recommended_strategy"]
alternative_lines = [
f"- {item['label']}: امتیاز {item['score']}, دوز {item['amount_kg_per_ha']} kg/ha"
for item in result.get("alternatives", [])
]
lines = [
f"engine: {result['engine']}",
f"استراتژی منتخب: {recommended['label']}",
f"امتیاز شبیه سازی: {recommended['score']}",
f"شاخص عملکرد مورد انتظار: {recommended['expected_yield_index']}",
f"نوع کود: {recommended['fertilizer_type']}",
f"مقدار مصرف: {recommended['amount_kg_per_ha']} kg/ha",
f"روش مصرف: {recommended['application_method']}",
f"زمان مصرف: {recommended['timing']}",
f"اعتبار: {recommended['validity_period']}",
"دلایل اصلی:",
*[f"- {item}" for item in recommended["reasoning"]],
]
if alternative_lines:
lines.extend(["گزینه های جایگزین:", *alternative_lines])
return "\n".join(lines)
@dataclass
class StrategyResult:
    """One scored candidate strategy produced during an optimisation pass."""

    # Identifier and display label copied from the strategy profile.
    code: str
    label: str
    # Ranking score; the strategy with the highest score is recommended.
    score: float
    expected_yield_index: float
    # Strategy-specific output fields (amounts, dates, timing, ...).
    payload: dict[str, Any]
    # Human-readable explanation lines shown with the recommendation.
    reasoning: list[str]
class SimulationRecommendationOptimizer:
    """Optimizer for irrigation and fertilization recommendations inside the crop_simulation app.

    Both public entry points first try a simulation-backed ranking (engine
    ``"pcse"``) when the plant carries a simulation blueprint, and otherwise
    fall back to a rule-based heuristic (engine ``"crop_simulation_heuristic"``).
    """

    def __init__(self):
        self.simulation_service = CropSimulationService()

    def optimize_irrigation(
        self,
        *,
        sensor: Any,
        plant: Any,
        forecasts: list[Any],
        daily_water_needs: list[dict[str, Any]],
        growth_stage: str | None,
        irrigation_method: Any | None,
    ) -> dict[str, Any] | None:
        """Recommend an irrigation strategy for the forecast window.

        Returns ``None`` when the sensor, the plant or the forecasts are
        missing. Otherwise returns a dict with ``engine``,
        ``recommended_strategy``, ``alternatives`` and ``context_text``.
        """
        if sensor is None or plant is None or not forecasts:
            return None
        crop_blueprint = _build_crop_parameters(plant, growth_stage)
        if crop_blueprint:
            pcse_result = self._optimize_irrigation_with_pcse(
                sensor=sensor,
                plant=plant,
                forecasts=forecasts,
                daily_water_needs=daily_water_needs,
                growth_stage=growth_stage,
                crop_blueprint=crop_blueprint,
            )
            if pcse_result is not None:
                return pcse_result
        return self._optimize_irrigation_with_heuristic(
            sensor=sensor,
            plant=plant,
            forecasts=forecasts,
            daily_water_needs=daily_water_needs,
            growth_stage=growth_stage,
            irrigation_method=irrigation_method,
        )

    def optimize_fertilization(
        self,
        *,
        sensor: Any,
        plant: Any,
        forecasts: list[Any],
        growth_stage: str | None,
    ) -> dict[str, Any] | None:
        """Recommend a fertilization strategy.

        Returns ``None`` when the sensor or the plant is missing. Forecasts
        are optional: without them only the heuristic path is used.
        """
        if sensor is None or plant is None:
            return None
        crop_blueprint = _build_crop_parameters(plant, growth_stage)
        if crop_blueprint and forecasts:
            pcse_result = self._optimize_fertilization_with_pcse(
                sensor=sensor,
                plant=plant,
                forecasts=forecasts,
                growth_stage=growth_stage,
                crop_blueprint=crop_blueprint,
            )
            if pcse_result is not None:
                return pcse_result
        return self._optimize_fertilization_with_heuristic(
            sensor=sensor,
            plant=plant,
            forecasts=forecasts,
            growth_stage=growth_stage,
        )

    def _score_simulation(
        self,
        *,
        run_name: str,
        sensor: Any,
        plant: Any,
        weather: list[dict[str, Any]],
        crop_parameters: dict[str, Any],
        agromanagement: Any,
        soil: dict[str, Any],
        site: dict[str, Any],
        **extra_inputs: Any,
    ) -> float | None:
        """Run one simulation and turn its yield estimate into a 0-100 score.

        Returns ``None`` when the simulation raises, so callers can fall back
        to the heuristic path. ``extra_inputs`` are forwarded unchanged to
        ``run_single_simulation``.
        """
        try:
            outcome = self.simulation_service.run_single_simulation(
                farm_uuid=str(sensor.farm_uuid),
                plant_name=getattr(plant, "name", None),
                weather=weather,
                crop_parameters=crop_parameters,
                agromanagement=agromanagement,
                soil=soil,
                site_parameters=site,
                name=run_name,
                **extra_inputs,
            )
        except Exception:
            # Deliberate best-effort boundary: any simulation failure means
            # "use the heuristic instead", but it should not vanish silently.
            logging.getLogger(__name__).exception(
                "Crop simulation run %r failed; falling back to heuristic", run_name
            )
            return None
        # Tolerate missing or None sub-dicts in the service response.
        metrics = ((outcome or {}).get("result") or {}).get("metrics") or {}
        yield_estimate = _safe_float(metrics.get("yield_estimate"), 0.0)
        return round(_clamp(yield_estimate / 100.0, 0.0, 100.0), 2)

    @staticmethod
    def _irrigation_result(
        *,
        engine: str,
        best: StrategyResult,
        strategies: list[StrategyResult],
        frequency_per_week: int,
        moisture_target: Any,
        validity_period: str,
    ) -> dict[str, Any]:
        """Assemble the irrigation response dict, including ``context_text``."""
        result = {
            "engine": engine,
            "recommended_strategy": {
                "code": best.code,
                "label": best.label,
                "score": best.score,
                "expected_yield_index": best.expected_yield_index,
                "total_irrigation_mm": best.payload["total_irrigation_mm"],
                "amount_per_event_mm": best.payload["amount_per_event_mm"],
                "events": best.payload["events"],
                "frequency_per_week": frequency_per_week,
                "event_dates": best.payload["event_dates"],
                "timing": best.payload["timing"],
                "moisture_target_percent": moisture_target,
                "validity_period": validity_period,
                "reasoning": best.reasoning,
            },
            "alternatives": [
                {
                    "code": item.code,
                    "label": item.label,
                    "score": item.score,
                    "expected_yield_index": item.expected_yield_index,
                    "total_irrigation_mm": item.payload["total_irrigation_mm"],
                }
                for item in sorted(strategies, key=lambda value: value.score, reverse=True)
                if item.code != best.code
            ],
        }
        result["context_text"] = _irrigation_context_text(result)
        return result

    @staticmethod
    def _fertilization_result(
        *,
        engine: str,
        best: StrategyResult,
        strategies: list[StrategyResult],
        validity_period: str,
        nutrient_status: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Assemble the fertilization response dict, including ``context_text``.

        ``nutrient_status`` is added to the response only when provided.
        """
        result = {
            "engine": engine,
            "recommended_strategy": {
                "code": best.code,
                "label": best.label,
                "score": best.score,
                "expected_yield_index": best.expected_yield_index,
                "fertilizer_type": best.payload["fertilizer_type"],
                "amount_kg_per_ha": best.payload["amount_kg_per_ha"],
                "application_method": best.payload["application_method"],
                "timing": best.payload["timing"],
                "validity_period": validity_period,
                "reasoning": best.reasoning,
            },
            "alternatives": [
                {
                    "code": item.code,
                    "label": item.label,
                    "score": item.score,
                    "expected_yield_index": item.expected_yield_index,
                    "fertilizer_type": item.payload["fertilizer_type"],
                    "amount_kg_per_ha": item.payload["amount_kg_per_ha"],
                    "application_method": item.payload["application_method"],
                    "timing": item.payload["timing"],
                    "reasoning": item.reasoning,
                }
                for item in sorted(strategies, key=lambda value: value.score, reverse=True)
                if item.code != best.code
            ],
        }
        if nutrient_status is not None:
            result["nutrient_status"] = nutrient_status
        result["context_text"] = _fertilization_context_text(result)
        return result

    @staticmethod
    def _with_nitrogen_event(
        agromanagement: Any,
        *,
        label: str,
        event_date: Any,
        n_amount: float,
    ) -> Any:
        """Return agromanagement with one timed nitrogen application per campaign.

        Campaigns whose value is not a dict (for example a trailing ``None``
        campaign) are kept unchanged. Returns ``None`` when *agromanagement*
        is not a list/tuple of dicts, and the original object when it yields
        no campaigns at all.
        """
        if not isinstance(agromanagement, (list, tuple)):
            return None
        if not all(isinstance(entry, dict) for entry in agromanagement):
            return None
        campaigns = []
        for entry in agromanagement:
            for start, campaign in entry.items():
                if not isinstance(campaign, dict):
                    campaigns.append({start: campaign})
                    continue
                campaigns.append(
                    {
                        start: {
                            **campaign,
                            "TimedEvents": [
                                {
                                    "event_signal": "apply_n",
                                    "name": label,
                                    "events_table": [
                                        {
                                            event_date: {
                                                "N_amount": n_amount,
                                                "N_recovery": 0.7,
                                            }
                                        }
                                    ],
                                }
                            ],
                        }
                    }
                )
        return campaigns or agromanagement

    def _optimize_irrigation_with_pcse(
        self,
        *,
        sensor: Any,
        plant: Any,
        forecasts: list[Any],
        daily_water_needs: list[dict[str, Any]],
        growth_stage: str | None,
        crop_blueprint: tuple[dict[str, Any], list[dict[str, Any]]],
    ) -> dict[str, Any] | None:
        """Rank irrigation strategy profiles by simulated yield.

        Returns ``None`` (so the caller falls back to the heuristic) when no
        irrigation is needed, the sensor has no location, or any simulation
        run fails.
        """
        defaults = apps.get_app_config("irrigation").get_optimizer_defaults()
        total_mm = round(
            sum(_safe_float(item.get("gross_irrigation_mm"), 0.0) for item in daily_water_needs),
            3,
        )
        # Check the cheap exit conditions before building simulation inputs.
        if total_mm <= 0:
            return None
        center_location = getattr(sensor, "center_location", None)
        if center_location is None:
            return None

        crop_parameters, agromanagement = crop_blueprint
        soil, site = _build_soil_parameters(sensor)
        weather = _build_weather_records(
            forecasts,
            latitude=_safe_float(center_location.latitude),
            longitude=_safe_float(center_location.longitude),
        )
        # Timing depends only on the forecasts, not on the strategy.
        timing = _best_timing(
            _mean_forecast_value(forecasts, "temperature_mean", 22.0),
            _mean_forecast_value(forecasts, "wind_speed_max", 8.0),
        )

        strategies: list[StrategyResult] = []
        for spec in defaults["strategy_profiles"]:
            event_dates = _event_dates_for_frequency(forecasts, max(1, spec["event_count"]))
            amount_per_event = round(
                (total_mm * spec["multiplier"]) / max(len(event_dates), 1), 3
            )
            irrigation_events = [
                {"date": day, "amount": amount_per_event} for day in event_dates
            ]
            score = self._score_simulation(
                run_name=f"irrigation-{spec['code']}",
                sensor=sensor,
                plant=plant,
                weather=weather,
                crop_parameters=crop_parameters,
                agromanagement=agromanagement,
                soil=soil,
                site=site,
                irrigation_recommendation={"events": irrigation_events},
            )
            if score is None:
                return None
            strategies.append(
                StrategyResult(
                    code=spec["code"],
                    label=spec["label"],
                    score=score,
                    expected_yield_index=score,
                    payload={
                        "events": len(event_dates),
                        "event_dates": event_dates,
                        "amount_per_event_mm": amount_per_event,
                        "total_irrigation_mm": round(amount_per_event * len(event_dates), 3),
                        "timing": timing,
                    },
                    reasoning=[
                        "امتیاز بر اساس بیشترین عملکرد شبیه سازی شده انتخاب شد.",
                        f"عملکرد نسبی این سناریو {score} ارزیابی شد.",
                    ],
                )
            )
        if not strategies:
            return None

        best = max(strategies, key=lambda item: item.score)
        stage_targets = defaults["stage_targets"]
        moisture_target = stage_targets.get(_stage_key(growth_stage), stage_targets["vegetative"])
        return self._irrigation_result(
            engine="pcse",
            best=best,
            strategies=strategies,
            frequency_per_week=best.payload["events"],
            moisture_target=moisture_target,
            validity_period=f"معتبر برای {defaults['validity_days']} روز آینده",
        )

    def _optimize_irrigation_with_heuristic(
        self,
        *,
        sensor: Any,
        plant: Any,
        forecasts: list[Any],
        daily_water_needs: list[dict[str, Any]],
        growth_stage: str | None,
        irrigation_method: Any | None,
    ) -> dict[str, Any]:
        """Rank irrigation strategy profiles with rule-based penalties and bonuses.

        Each profile starts at 100 and is adjusted for deviation from the
        computed water need, current soil moisture, heat, upcoming rain and
        irrigation efficiency; the score is clamped to ``[35, 96]``.
        """
        defaults = apps.get_app_config("irrigation").get_optimizer_defaults()
        stage_targets = defaults["stage_targets"]
        moisture_target = stage_targets.get(_stage_key(growth_stage), stage_targets["vegetative"])

        daily_amounts = [
            _safe_float(item.get("gross_irrigation_mm"), 0.0) for item in daily_water_needs
        ]
        total_mm = round(sum(daily_amounts), 3)
        irrigated_days = max(sum(1 for amount in daily_amounts if amount > 0), 1)

        average_temp = _mean_forecast_value(forecasts, "temperature_mean", 22.0)
        average_wind = _mean_forecast_value(forecasts, "wind_speed_max", 8.0)
        heat_risk = _mean_forecast_value(forecasts, "temperature_max", 28.0) >= 32.0
        rain_date = _next_rain_date(forecasts, defaults["significant_rain_threshold_mm"])
        efficiency = _safe_float(getattr(irrigation_method, "water_efficiency_percent", None), 75.0)
        soil_moisture = _sensor_metric(sensor, "soil_moisture")
        timing = _best_timing(average_temp, average_wind)

        strategies: list[StrategyResult] = []
        for spec in defaults["strategy_profiles"]:
            code = spec["code"]
            event_count = max(1, min(7, round(irrigated_days * spec["frequency_factor"])))
            applied_total = round(max(total_mm * spec["multiplier"], 0.0), 3)
            # NOTE(review): the per-event minimum can make events * amount
            # exceed ``applied_total`` — confirm this is intended.
            amount_per_event = round(
                max(applied_total / event_count, defaults["minimum_event_mm"]), 3
            )

            if total_mm <= 0:
                # With no water need, only the conservative profile is unpenalised.
                water_penalty = 0.0 if code == "conservative" else 12.0
            else:
                water_penalty = abs(applied_total - total_mm) * 2.4

            soil_penalty = 0.0
            if soil_moisture is not None:
                if soil_moisture < 25 and code == "conservative":
                    soil_penalty += 8.0
                if soil_moisture > 55 and code == "protective":
                    soil_penalty += 7.0

            climate_bonus = 0.0
            if heat_risk and code == "protective":
                climate_bonus += 6.0
            if rain_date and code == "protective":
                climate_bonus -= 8.0
            if efficiency >= 85 and code == "balanced":
                climate_bonus += 4.0

            score = round(
                _clamp(100.0 - water_penalty - soil_penalty + climate_bonus, 35.0, 96.0), 2
            )
            event_dates = _event_dates_for_frequency(forecasts, event_count)

            reasoning = [
                f"نیاز آبی محاسبه شده برای بازه پیش رو حدود {total_mm} میلی متر است.",
                f"این سناریو {applied_total} میلی متر آب را در {event_count} نوبت پخش می کند.",
            ]
            if heat_risk:
                reasoning.append("به خاطر دمای بالاتر از حد مطلوب، تنش گرمایی در امتیازدهی لحاظ شده است.")
            if rain_date:
                reasoning.append(f"بارش معنی دار از تاریخ {rain_date} احتمال کاهش نیاز آبی را بالا می برد.")
            if soil_moisture is not None:
                reasoning.append(f"رطوبت فعلی خاک حدود {round(soil_moisture, 1)} درصد در نظر گرفته شده است.")

            strategies.append(
                StrategyResult(
                    code=code,
                    label=spec["label"],
                    score=score,
                    expected_yield_index=round(52.0 + (score * 0.48), 2),
                    payload={
                        "events": event_count,
                        "amount_per_event_mm": amount_per_event,
                        "total_irrigation_mm": applied_total,
                        "event_dates": event_dates,
                        "timing": timing,
                    },
                    reasoning=reasoning,
                )
            )

        best = max(strategies, key=lambda item: item.score)
        validity_period = f"معتبر برای {defaults['validity_days']} روز آینده"
        if rain_date:
            validity_period = f"معتبر تا قبل از بارش موثر پیش بینی شده در {rain_date}"
        return self._irrigation_result(
            engine="crop_simulation_heuristic",
            best=best,
            strategies=strategies,
            # NOTE(review): this path reports events + 1 (capped at 7) while
            # the simulation path reports events as-is — confirm intent.
            frequency_per_week=min(best.payload["events"] + 1, 7),
            moisture_target=moisture_target,
            validity_period=validity_period,
        )

    def _optimize_fertilization_with_pcse(
        self,
        *,
        sensor: Any,
        plant: Any,
        forecasts: list[Any],
        growth_stage: str | None,
        crop_blueprint: tuple[dict[str, Any], list[dict[str, Any]]],
    ) -> dict[str, Any] | None:
        """Rank fertilization strategy profiles by simulated yield.

        Each profile is simulated with a single nitrogen application on the
        first forecast date. Returns ``None`` (so the caller falls back to
        the heuristic) when forecasts or the sensor location are missing, the
        agromanagement blueprint is malformed, or any simulation run fails.
        """
        defaults = apps.get_app_config("fertilization").get_optimizer_defaults()
        center_location = getattr(sensor, "center_location", None)
        if not forecasts or center_location is None:
            return None

        crop_parameters, agromanagement = crop_blueprint
        soil, site = _build_soil_parameters(sensor)
        weather = _build_weather_records(
            forecasts,
            latitude=_safe_float(center_location.latitude),
            longitude=_safe_float(center_location.longitude),
        )
        stage_targets = defaults["stage_targets"]
        target = stage_targets.get(_stage_key(growth_stage), stage_targets["vegetative"])
        base_n = max(target["n"], 20)
        application_date = forecasts[0].forecast_date

        strategies: list[StrategyResult] = []
        for spec in defaults["strategy_profiles"]:
            n_amount = round(base_n * spec["multiplier"], 3)
            fertilizer_formula = spec["formula_override"] or target["formula"]
            strategy_agromanagement = self._with_nitrogen_event(
                agromanagement,
                label=spec["label"],
                event_date=application_date,
                n_amount=n_amount,
            )
            if strategy_agromanagement is None:
                return None
            score = self._score_simulation(
                run_name=f"fertilization-{spec['code']}",
                sensor=sensor,
                plant=plant,
                weather=weather,
                crop_parameters=crop_parameters,
                agromanagement=strategy_agromanagement,
                soil=soil,
                site=site,
            )
            if score is None:
                return None
            strategies.append(
                StrategyResult(
                    code=spec["code"],
                    label=spec["label"],
                    score=score,
                    expected_yield_index=score,
                    payload={
                        "amount_kg_per_ha": round(n_amount * 1.6, 3),
                        "fertilizer_type": fertilizer_formula,
                        "application_method": target["application_method"],
                        "timing": target["timing"],
                    },
                    reasoning=[
                        "سناریو برتر با بیشترین عملکرد شبیه سازی شده انتخاب شد.",
                        f"فرمول هدف برای این مرحله {target['formula']} در نظر گرفته شد.",
                    ],
                )
            )
        if not strategies:
            return None

        best = max(strategies, key=lambda item: item.score)
        return self._fertilization_result(
            engine="pcse",
            best=best,
            strategies=strategies,
            validity_period=f"معتبر برای {defaults['validity_days']} روز آینده",
        )

    def _optimize_fertilization_with_heuristic(
        self,
        *,
        sensor: Any,
        plant: Any,
        forecasts: list[Any],
        growth_stage: str | None,
    ) -> dict[str, Any]:
        """Rank fertilization strategy profiles with rule-based penalties.

        Nutrient deficits are measured against the stage target (a missing
        reading counts as 60% of the target). Profiles are penalised when
        their focus does not cover the dominant deficit, for phosphorus focus
        on high-pH soil, for foliar application before rain, and for
        multipliers away from 1.0; the score is clamped to ``[42, 95]``.
        """
        defaults = apps.get_app_config("fertilization").get_optimizer_defaults()
        # The public entry point allows missing forecasts; treat them as empty.
        forecasts = forecasts or []
        stage_key = _stage_key(growth_stage)
        stage_targets = defaults["stage_targets"]
        target = stage_targets.get(stage_key, stage_targets["vegetative"])

        current_n = _sensor_metric(sensor, "nitrogen")
        current_p = _sensor_metric(sensor, "phosphorus")
        current_k = _sensor_metric(sensor, "potassium")
        current_ph = _sensor_metric(sensor, "soil_ph")
        deficits = {
            "n": max(target["n"] - _safe_float(current_n, target["n"] * 0.6), 0.0),
            "p": max(target["p"] - _safe_float(current_p, target["p"] * 0.6), 0.0),
            "k": max(target["k"] - _safe_float(current_k, target["k"] * 0.6), 0.0),
        }
        dominant = max(deficits, key=deficits.get)
        severity = sum(deficits.values())
        next_rain = (
            _next_rain_date(forecasts, defaults["rain_delay_threshold_mm"]) if forecasts else None
        )
        avg_temp = _mean_forecast_value(forecasts, "temperature_mean", 22.0)

        # Values shared by every strategy profile.
        base_amount = _clamp(35.0 + (severity * 1.4), 30.0, 120.0)
        timing = target["timing"] if avg_temp < 30 else "صبح زود یا نزدیک غروب"
        # Token that a profile's focus text must contain to cover each deficit.
        focus_tokens = {"n": "ازت", "p": "فسفر", "k": "پتاس"}

        strategies: list[StrategyResult] = []
        for spec in defaults["strategy_profiles"]:
            amount = round(base_amount * spec["multiplier"], 2)

            mismatch_penalty = 0.0
            if focus_tokens[dominant] not in spec["focus"]:
                mismatch_penalty += 12.0
            if current_ph is not None and current_ph > 7.8 and "فسفر" in spec["focus"]:
                mismatch_penalty += 8.0
            if next_rain and spec["application_method"] == "محلول پاشی":
                mismatch_penalty += 10.0
            score = round(
                _clamp(
                    96.0 - mismatch_penalty - abs(spec["multiplier"] - 1.0) * 18.0,
                    42.0,
                    95.0,
                ),
                2,
            )

            reasoning = [
                f"کسری عناصر برای این مرحله با فرمول هدف {target['formula']} سنجیده شد.",
                f"بیشترین کمبود نسبی مربوط به عنصر {dominant.upper()} است.",
                f"دوز پیشنهادی این سناریو {amount} کیلوگرم در هکتار برآورد شد.",
            ]
            if current_ph is not None:
                reasoning.append(f"pH فعلی خاک حدود {round(current_ph, 2)} در تصمیم گیری لحاظ شد.")
            if next_rain:
                reasoning.append(f"به دلیل بارش موثر نزدیک در {next_rain} از مصرف سطحی پرریسک اجتناب شده است.")

            strategies.append(
                StrategyResult(
                    code=spec["code"],
                    label=spec["label"],
                    score=score,
                    expected_yield_index=round(50.0 + (score * 0.5), 2),
                    payload={
                        "fertilizer_type": spec["formula_override"] or target["formula"],
                        "amount_kg_per_ha": amount,
                        "application_method": spec["application_method"],
                        "timing": timing,
                    },
                    reasoning=reasoning,
                )
            )

        best = max(strategies, key=lambda item: item.score)
        validity_period = f"معتبر برای {defaults['validity_days']} روز آینده"
        if stage_key == "flowering":
            validity_period = "معتبر تا پایان پنجره گلدهی فعلی و حداکثر 5 روز آینده"
        return self._fertilization_result(
            engine="crop_simulation_heuristic",
            best=best,
            strategies=strategies,
            validity_period=validity_period,
            nutrient_status={
                "nitrogen": current_n,
                "phosphorus": current_p,
                "potassium": current_k,
                "soil_ph": current_ph,
                "dominant_gap": dominant,
            },
        )