Files

1372 lines
51 KiB
Python
Raw Permalink Normal View History

2026-04-24 17:40:25 +03:30
from __future__ import annotations
import importlib
2026-04-24 22:20:15 +03:30
from copy import deepcopy
2026-04-24 17:40:25 +03:30
from dataclasses import dataclass
2026-04-30 00:53:47 +03:30
from datetime import date, datetime, timedelta
2026-04-24 17:40:25 +03:30
from typing import Any
from django.db import transaction
2026-05-13 16:45:54 +03:30
from farm_data.services import build_ai_farm_snapshot, get_ai_snapshot_weather
2026-04-24 17:40:25 +03:30
from .models import SimulationRun, SimulationScenario
DEFAULT_OUTPUT_VARS = ["DVS", "LAI", "TAGP", "TWSO", "SM"]
DEFAULT_SUMMARY_VARS = ["TAGP", "TWSO", "CTRAT", "RD"]
DEFAULT_TERMINAL_VARS = ["TAGP", "TWSO", "LAI", "DVS"]
2026-04-24 22:20:15 +03:30
DEFAULT_PCSE_MODEL_NAME = "Wofost81_NWLP_CWB_CNB"
DEFAULT_NAVAILI = 35.0
DEFAULT_WAV = 40.0
2026-04-24 17:40:25 +03:30
class CropSimulationError(Exception):
pass
def _json_ready(value: Any) -> Any:
if isinstance(value, dict):
return {str(key): _json_ready(item) for key, item in value.items()}
if isinstance(value, list):
return [_json_ready(item) for item in value]
if isinstance(value, tuple):
return [_json_ready(item) for item in value]
if isinstance(value, (date, datetime)):
return value.isoformat()
return value
def _coerce_date(value: Any) -> date:
if isinstance(value, date) and not isinstance(value, datetime):
return value
if isinstance(value, datetime):
return value.date()
if isinstance(value, str):
return date.fromisoformat(value)
raise CropSimulationError(f"Unsupported date value: {value!r}")
def _normalize_weather_records(weather: Any) -> list[dict[str, Any]]:
if isinstance(weather, dict):
if "records" in weather:
records = weather["records"]
else:
records = [weather]
else:
records = weather
if not isinstance(records, list) or not records:
raise CropSimulationError("Weather input must contain at least one record.")
normalized = []
for raw in records:
if not isinstance(raw, dict):
raise CropSimulationError("Weather records must be dictionaries.")
current_date = _coerce_date(raw.get("DAY") or raw.get("day"))
normalized.append(
{
"DAY": current_date,
"LAT": float(raw.get("LAT", raw.get("lat", 0.0))),
"LON": float(raw.get("LON", raw.get("lon", 0.0))),
"ELEV": float(raw.get("ELEV", raw.get("elev", 0.0))),
"IRRAD": float(raw.get("IRRAD", raw.get("irrad", 15_000_000.0))),
"TMIN": float(raw.get("TMIN", raw.get("tmin", 10.0))),
"TMAX": float(raw.get("TMAX", raw.get("tmax", 20.0))),
"VAP": float(raw.get("VAP", raw.get("vap", 12.0))),
"WIND": float(raw.get("WIND", raw.get("wind", 2.0))),
"RAIN": float(raw.get("RAIN", raw.get("rain", 0.0))),
"E0": float(raw.get("E0", raw.get("e0", 0.35))),
"ES0": float(raw.get("ES0", raw.get("es0", 0.3))),
"ET0": float(raw.get("ET0", raw.get("et0", 0.32))),
}
)
return normalized
2026-04-30 00:53:47 +03:30
def _mm_to_cm_day(value: Any, default: float) -> float:
scaled = _safe_float(value, default * 10.0) / 10.0
return round(max(scaled, 0.0), 4)
2026-04-24 17:40:25 +03:30
def _normalize_agromanagement(agromanagement: Any) -> list[dict[str, Any]]:
if isinstance(agromanagement, dict) and "AgroManagement" in agromanagement:
campaigns = agromanagement["AgroManagement"]
elif isinstance(agromanagement, list):
campaigns = agromanagement
elif isinstance(agromanagement, dict):
campaigns = [agromanagement]
else:
raise CropSimulationError("Agromanagement input must be a dict or list.")
if not campaigns:
raise CropSimulationError("Agromanagement input cannot be empty.")
2026-04-30 00:53:47 +03:30
return _ensure_trailing_empty_campaign(campaigns)
def _ensure_trailing_empty_campaign(campaigns: list[dict[str, Any]]) -> list[dict[str, Any]]:
normalized = list(campaigns)
if not normalized:
return normalized
last_campaign = normalized[-1]
if _is_explicit_empty_campaign(last_campaign):
return normalized
trailing = _build_trailing_empty_campaign(normalized)
if last_campaign == {}:
normalized[-1] = trailing
else:
normalized.append(trailing)
return normalized
def _is_explicit_empty_campaign(campaign: dict[str, Any]) -> bool:
if not isinstance(campaign, dict) or len(campaign) != 1:
return False
start_date, payload = next(iter(campaign.items()))
return isinstance(start_date, date) and payload is None
def _build_trailing_empty_campaign(campaigns: list[dict[str, Any]]) -> dict[date, None]:
last_campaign = next((item for item in reversed(campaigns) if isinstance(item, dict) and item), None)
if not last_campaign:
return {date.today(): None}
campaign_start, campaign_payload = next(iter(last_campaign.items()))
candidate_dates = [_coerce_date(campaign_start)]
if isinstance(campaign_payload, dict):
crop_calendar = campaign_payload.get("CropCalendar") or {}
for field_name in ("crop_end_date", "crop_start_date"):
value = crop_calendar.get(field_name)
if value:
candidate_dates.append(_coerce_date(value))
for bucket_name in ("TimedEvents",):
for event_group in campaign_payload.get(bucket_name, []) or []:
if not isinstance(event_group, dict):
continue
for event in event_group.get("events_table", []) or []:
if not isinstance(event, dict) or not event:
continue
event_date = next(iter(event.keys()))
candidate_dates.append(_coerce_date(event_date))
return {max(candidate_dates) + timedelta(days=1): None}
2026-04-24 17:40:25 +03:30
2026-04-24 18:34:17 +03:30
def _deep_copy_json_like(value: Any) -> Any:
if isinstance(value, dict):
return {key: _deep_copy_json_like(item) for key, item in value.items()}
if isinstance(value, list):
return [_deep_copy_json_like(item) for item in value]
return value
def _parse_recommendation_events(
recommendation: dict[str, Any] | None,
*,
event_signal: str,
amount_keys: tuple[str, ...],
extra_keys: tuple[str, ...],
) -> list[dict[str, Any]]:
if not recommendation:
return []
raw_events = recommendation.get("events")
if raw_events is None:
raw_events = recommendation.get("schedule")
if raw_events is None:
raw_events = recommendation.get("applications")
if raw_events is None:
raw_events = recommendation.get("plan")
if not isinstance(raw_events, list):
return []
events_table = []
for item in raw_events:
if not isinstance(item, dict):
continue
raw_date = item.get("date") or item.get("day")
if raw_date is None:
continue
payload = {}
amount_value = None
amount_key = None
for candidate in amount_keys:
if item.get(candidate) is not None:
amount_value = item.get(candidate)
amount_key = candidate
break
if amount_key is not None:
payload[amount_key] = float(amount_value)
for extra_key in extra_keys:
if item.get(extra_key) is not None:
payload[extra_key] = float(item[extra_key])
if payload:
events_table.append({_coerce_date(raw_date): payload})
if not events_table:
return []
return [
{
"event_signal": event_signal,
"name": recommendation.get("name", f"{event_signal} recommendation"),
"comment": recommendation.get("comment", ""),
"events_table": events_table,
}
]
def _merge_management_recommendations(
agromanagement: Any,
*,
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
campaigns = _deep_copy_json_like(_normalize_agromanagement(agromanagement))
irrigation_events = _parse_recommendation_events(
irrigation_recommendation,
event_signal="irrigate",
amount_keys=("amount", "irrigation_amount"),
extra_keys=("efficiency",),
)
fertilization_events = _parse_recommendation_events(
fertilization_recommendation,
event_signal="apply_n",
amount_keys=("N_amount", "amount"),
extra_keys=("N_recovery",),
)
if not irrigation_events and not fertilization_events:
return campaigns
target_campaign = None
for campaign in campaigns:
if isinstance(campaign, dict) and campaign:
target_campaign = campaign
break
if target_campaign is None:
raise CropSimulationError(
"Agromanagement must contain at least one non-empty campaign."
)
campaign_start = next(iter(target_campaign.keys()))
campaign_payload = target_campaign[campaign_start]
if not isinstance(campaign_payload, dict):
raise CropSimulationError("Agromanagement campaign payload must be a dictionary.")
timed_events = campaign_payload.get("TimedEvents")
if timed_events in (None, ""):
timed_events = []
if not isinstance(timed_events, list):
raise CropSimulationError("TimedEvents must be a list when recommendations are merged.")
timed_events.extend(irrigation_events)
timed_events.extend(fertilization_events)
campaign_payload["TimedEvents"] = timed_events
return campaigns
2026-04-24 17:40:25 +03:30
def _normalize_pcse_output_records(records: Any) -> list[dict[str, Any]]:
if records is None:
return []
if isinstance(records, dict):
return [records]
if isinstance(records, list):
return records
if isinstance(records, tuple):
return list(records)
return [records]
def _pick_first_not_none(*values: Any) -> Any:
for value in values:
if value is not None:
return value
return None
2026-04-24 22:20:15 +03:30
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
if value in (None, ""):
return default
return float(value)
except (TypeError, ValueError):
return default
def _clamp(value: float, lower: float, upper: float) -> float:
return max(lower, min(value, upper))
2026-05-13 16:45:54 +03:30
def _snapshot_metric(snapshot: dict[str, Any] | None, metric_name: str) -> float | None:
if not isinstance(snapshot, dict):
return None
farm_metrics = snapshot.get("farm_metrics") or {}
resolved_metrics = farm_metrics.get("resolved_metrics") if isinstance(farm_metrics, dict) else {}
if not isinstance(resolved_metrics, dict):
2026-04-24 22:20:15 +03:30
return None
2026-05-13 16:45:54 +03:30
return _safe_float(resolved_metrics.get(metric_name))
2026-04-24 22:20:15 +03:30
2026-05-13 16:45:54 +03:30
def _snapshot_source_metadata(snapshot: dict[str, Any] | None) -> dict[str, Any]:
if not isinstance(snapshot, dict):
return {}
source_metadata = snapshot.get("source_metadata") or {}
return source_metadata if isinstance(source_metadata, dict) else {}
2026-04-24 22:20:15 +03:30
def _extract_plant_simulation_profile(plant: Any | None) -> dict[str, Any] | None:
if plant is None:
return None
for attr in ("growth_profile", "irrigation_profile", "health_profile"):
profile = getattr(plant, attr, None) or {}
if not isinstance(profile, dict):
continue
simulation = profile.get("simulation")
if isinstance(simulation, dict):
return simulation
return None
def _build_default_crop_parameters(plant: Any | None, crop_name: str) -> dict[str, Any]:
profile = getattr(plant, "growth_profile", None) or {}
required_gdd = _safe_float(profile.get("required_gdd_for_maturity"), 1200.0)
return {
"crop_name": crop_name,
"TSUM1": round(required_gdd * 0.45, 3),
"TSUM2": round(required_gdd * 0.55, 3),
"YIELD_SCALE": 1.0,
"MAX_LAI": 5.0,
"MAX_BIOMASS": 12000.0,
}
def _build_default_agromanagement(crop_name: str, weather: list[dict[str, Any]]) -> list[dict[str, Any]]:
first_day = weather[0]["DAY"]
last_day = weather[-1]["DAY"]
crop_end = max(last_day, first_day + (last_day - first_day))
2026-04-30 00:53:47 +03:30
return _ensure_trailing_empty_campaign([
2026-04-24 22:20:15 +03:30
{
first_day: {
"CropCalendar": {
"crop_name": crop_name,
"variety_name": "default",
"crop_start_date": first_day,
"crop_start_type": "sowing",
"crop_end_date": crop_end,
"crop_end_type": "harvest",
"max_duration": max((crop_end - first_day).days, 1),
},
"TimedEvents": [],
"StateEvents": [],
}
}
2026-04-30 00:53:47 +03:30
])
2026-04-24 22:20:15 +03:30
def _build_weather_from_forecasts(forecasts: list[Any], *, latitude: float, longitude: float) -> list[dict[str, Any]]:
return [
{
"DAY": forecast.forecast_date,
"LAT": latitude,
"LON": longitude,
"ELEV": 1200.0,
"IRRAD": 16_000_000.0,
"TMIN": _safe_float(
_pick_first_not_none(forecast.temperature_min, forecast.temperature_mean),
12.0,
),
"TMAX": _safe_float(
_pick_first_not_none(forecast.temperature_max, forecast.temperature_mean),
24.0,
),
"VAP": max(_safe_float(forecast.humidity_mean, 55.0) / 5.0, 6.0),
"WIND": _safe_float(forecast.wind_speed_max, 7.2) / 3.6,
2026-04-30 00:53:47 +03:30
# WeatherForecast stores precipitation/ET0 in mm/day, while PCSE expects cm/day.
"RAIN": _mm_to_cm_day(forecast.precipitation, 0.0),
"E0": _mm_to_cm_day(forecast.et0, 0.35),
"ES0": max(round(_mm_to_cm_day(forecast.et0, 0.35) * 0.9, 4), 0.1),
"ET0": _mm_to_cm_day(forecast.et0, 0.35),
2026-04-24 22:20:15 +03:30
}
for forecast in forecasts
]
def _normalize_site_parameters_for_model(
model_name: str,
site_parameters: dict[str, Any] | None,
*,
soil_parameters: dict[str, Any] | None = None,
) -> dict[str, Any]:
site = dict(site_parameters or {})
soil = soil_parameters or {}
site.setdefault("WAV", _safe_float(site.get("WAV"), DEFAULT_WAV))
2026-04-30 00:53:47 +03:30
smw = _safe_float(soil.get("SMW"), 0.14)
smfcf = _safe_float(soil.get("SMFCF"), 0.34)
sm0 = _safe_float(
_pick_first_not_none(soil.get("SM0"), soil.get("SMMAX")),
min(max(smfcf + 0.08, smw + 0.12), 0.6),
)
site.setdefault("IFUNRN", 0)
site.setdefault("NOTINF", 0.0)
site.setdefault("SSI", 0.0)
site.setdefault("SSMAX", 0.0)
site.setdefault("SMLIM", round(_clamp(_safe_float(site.get("SMLIM"), smfcf), smw, sm0), 3))
2026-04-24 22:20:15 +03:30
if model_name.startswith("Wofost81_NWLP"):
navaili = _pick_first_not_none(
site.get("NAVAILI"),
site.get("navaili"),
site.get("nitrogen"),
soil.get("NAVAILI"),
soil.get("nitrogen"),
)
site["NAVAILI"] = _safe_float(navaili, DEFAULT_NAVAILI)
site.setdefault("BG_N_SUPPLY", 0.05)
site.setdefault("NSOILBASE", max(site["NAVAILI"] * 0.35, 5.0))
site.setdefault("NSOILBASE_FR", 0.02)
return site
def build_simulation_payload_from_farm(
*,
farm_uuid: str,
plant_name: str | None = None,
weather: Any | None = None,
soil: dict[str, Any] | None = None,
crop_parameters: dict[str, Any] | None = None,
agromanagement: Any | None = None,
site_parameters: dict[str, Any] | None = None,
) -> dict[str, Any]:
2026-05-05 21:02:12 +03:30
from farm_data.services import (
get_canonical_farm_record,
get_runtime_plant_for_farm,
list_runtime_plants_for_farm,
)
2026-04-24 22:20:15 +03:30
from weather.models import WeatherForecast
2026-05-05 21:02:12 +03:30
farm = get_canonical_farm_record(farm_uuid)
2026-04-24 22:20:15 +03:30
if farm is None:
raise CropSimulationError(f"Farm with uuid={farm_uuid} not found.")
2026-05-05 21:02:12 +03:30
plant = get_runtime_plant_for_farm(farm, plant_name=plant_name)
2026-05-13 16:45:54 +03:30
ai_snapshot = build_ai_farm_snapshot(str(farm_uuid))
if ai_snapshot is None:
raise CropSimulationError(f"Canonical AI snapshot for farm uuid={farm_uuid} is missing.")
2026-04-24 22:20:15 +03:30
if weather is not None:
resolved_weather = _normalize_weather_records(weather)
else:
forecasts = list(
WeatherForecast.objects.filter(location=farm.center_location)
.order_by("forecast_date")[:14]
)
if not forecasts:
raise CropSimulationError(
"Weather data for the selected farm is missing."
)
resolved_weather = _build_weather_from_forecasts(
forecasts,
latitude=float(farm.center_location.latitude),
longitude=float(farm.center_location.longitude),
)
2026-05-13 16:45:54 +03:30
ndwi = _snapshot_metric(ai_snapshot, "ndwi")
2026-05-09 16:55:06 +03:30
smfcf = _clamp(ndwi if ndwi is not None else 0.34, 0.2, 0.55)
smw = _clamp(smfcf * 0.45, 0.05, max(smfcf - 0.02, 0.06))
2026-04-30 00:53:47 +03:30
sm0 = _clamp(
2026-05-09 16:55:06 +03:30
min(max(smfcf + 0.08, smw + 0.12), 0.6),
2026-04-30 00:53:47 +03:30
max(smfcf + 0.02, smw + 0.05),
0.8,
)
2026-05-13 16:45:54 +03:30
soil_moisture = _snapshot_metric(ai_snapshot, "soil_moisture")
2026-04-24 22:20:15 +03:30
wav = (
round(_clamp((soil_moisture or DEFAULT_WAV) / 100.0, smw, smfcf) * 100.0, 3)
if soil_moisture is not None
else DEFAULT_WAV
)
2026-05-13 16:45:54 +03:30
nitrogen = _pick_first_not_none(_snapshot_metric(ai_snapshot, "nitrogen"), _snapshot_metric(ai_snapshot, "soil_vv_db"))
phosphorus = _snapshot_metric(ai_snapshot, "phosphorus")
potassium = _snapshot_metric(ai_snapshot, "potassium")
soil_ph = _pick_first_not_none(_snapshot_metric(ai_snapshot, "soil_ph"), None)
ec = _snapshot_metric(ai_snapshot, "electrical_conductivity")
2026-04-24 22:20:15 +03:30
resolved_soil = {
"SMFCF": round(smfcf, 3),
"SMW": round(smw, 3),
2026-04-30 00:53:47 +03:30
"SM0": round(sm0, 3),
2026-04-24 22:20:15 +03:30
"RDMSOL": 120.0,
2026-04-30 00:53:47 +03:30
"CRAIRC": 0.06,
"SOPE": 10.0,
"KSUB": 10.0,
2026-04-24 22:20:15 +03:30
"soil_moisture": soil_moisture,
"nitrogen": _safe_float(nitrogen, DEFAULT_NAVAILI),
"phosphorus": _safe_float(phosphorus, 0.0),
"potassium": _safe_float(potassium, 0.0),
"soil_ph": _safe_float(soil_ph, 7.0),
"electrical_conductivity": _safe_float(ec, 0.0),
2026-05-09 16:55:06 +03:30
"clay": 0.0,
"sand": 0.0,
"silt": 0.0,
"cec": 0.0,
"soc": 0.0,
2026-04-24 22:20:15 +03:30
}
if soil:
resolved_soil.update(soil)
resolved_site = {
"WAV": wav,
"NAVAILI": _safe_float(nitrogen, DEFAULT_NAVAILI),
"P_STATUS": _safe_float(phosphorus, 0.0),
"K_STATUS": _safe_float(potassium, 0.0),
"SOIL_PH": _safe_float(soil_ph, 7.0),
"EC": _safe_float(ec, 0.0),
2026-04-30 00:53:47 +03:30
"IFUNRN": 0,
"NOTINF": 0.0,
"SSI": 0.0,
"SSMAX": 0.0,
"SMLIM": round(_clamp(_safe_float(_pick_first_not_none(site_parameters and site_parameters.get("SMLIM"), smfcf), smfcf), smw, sm0), 3),
2026-04-24 22:20:15 +03:30
}
if site_parameters:
resolved_site.update(site_parameters)
simulation_profile = _extract_plant_simulation_profile(plant)
default_crop = (
deepcopy(simulation_profile.get("crop_parameters"))
if simulation_profile and isinstance(simulation_profile.get("crop_parameters"), dict)
else _build_default_crop_parameters(plant, plant_name or getattr(plant, "name", "crop"))
)
resolved_crop = default_crop
if crop_parameters:
resolved_crop.update(crop_parameters)
resolved_crop.setdefault("crop_name", plant_name or getattr(plant, "name", "crop"))
resolved_crop.setdefault("farm_uuid", str(farm_uuid))
resolved_crop.setdefault("soil_nitrogen", _safe_float(nitrogen, DEFAULT_NAVAILI))
resolved_crop.setdefault("soil_phosphorus", _safe_float(phosphorus, 0.0))
resolved_crop.setdefault("soil_potassium", _safe_float(potassium, 0.0))
2026-04-30 00:53:47 +03:30
# Keep pH in soil/site payloads only; duplicating it in cropdata breaks some PCSE parameter providers.
resolved_crop.pop("soil_ph", None)
2026-04-24 22:20:15 +03:30
default_agromanagement = (
deepcopy(simulation_profile.get("agromanagement"))
if simulation_profile and simulation_profile.get("agromanagement")
else _build_default_agromanagement(resolved_crop["crop_name"], resolved_weather)
)
resolved_agromanagement = agromanagement if agromanagement is not None else default_agromanagement
return {
"farm": farm,
2026-05-05 21:02:12 +03:30
"runtime_plants": list_runtime_plants_for_farm(farm),
2026-04-24 22:20:15 +03:30
"plant": plant,
"weather": resolved_weather,
"soil": resolved_soil,
"site_parameters": resolved_site,
"crop_parameters": resolved_crop,
"agromanagement": resolved_agromanagement,
2026-05-13 16:45:54 +03:30
"source_metadata": {
"farm_metrics": _snapshot_source_metadata(ai_snapshot).get("farm_metrics", {}),
"weather": {
"source": "center_location_forecast",
"policy": "center_location_latest_forecast",
},
},
2026-04-24 22:20:15 +03:30
}
2026-04-24 17:40:25 +03:30
def _extract_total_n(agromanagement: list[dict[str, Any]]) -> float:
total_n = 0.0
for campaign in agromanagement:
if not isinstance(campaign, dict):
continue
for payload in campaign.values():
if not isinstance(payload, dict):
continue
for bucket_name in ("TimedEvents", "StateEvents"):
for event_group in payload.get(bucket_name, []) or []:
if not isinstance(event_group, dict):
continue
for event in event_group.get("events_table", []) or []:
if not isinstance(event, dict):
continue
for event_payload in event.values():
if isinstance(event_payload, dict):
total_n += float(event_payload.get("N_amount", 0.0))
return total_n
2026-04-24 22:20:15 +03:30
def _estimate_pk_stress_factor(
*,
soil: dict[str, Any],
site: dict[str, Any],
crop: dict[str, Any],
) -> dict[str, float]:
phosphorus = _safe_float(
_pick_first_not_none(site.get("P_STATUS"), soil.get("phosphorus"), crop.get("soil_phosphorus")),
0.0,
)
potassium = _safe_float(
_pick_first_not_none(site.get("K_STATUS"), soil.get("potassium"), crop.get("soil_potassium")),
0.0,
)
soil_ph = _safe_float(
_pick_first_not_none(site.get("SOIL_PH"), soil.get("soil_ph"), crop.get("soil_ph")),
7.0,
)
ec = _safe_float(_pick_first_not_none(site.get("EC"), soil.get("electrical_conductivity")), 0.0)
phosphorus_target = _safe_float(crop.get("P_OPTIMAL"), 30.0)
potassium_target = _safe_float(crop.get("K_OPTIMAL"), 45.0)
p_factor = _clamp(phosphorus / max(phosphorus_target, 1.0), 0.45, 1.0)
k_factor = _clamp(potassium / max(potassium_target, 1.0), 0.45, 1.0)
ph_penalty = 1.0
if soil_ph < 5.8:
ph_penalty = _clamp(1.0 - ((5.8 - soil_ph) * 0.08), 0.65, 1.0)
elif soil_ph > 7.8:
ph_penalty = _clamp(1.0 - ((soil_ph - 7.8) * 0.06), 0.7, 1.0)
ec_penalty = 1.0
if ec > 2.5:
ec_penalty = _clamp(1.0 - ((ec - 2.5) * 0.07), 0.72, 1.0)
combined_factor = round(_clamp(p_factor * k_factor * ph_penalty * ec_penalty, 0.35, 1.0), 4)
return {
"phosphorus_factor": round(p_factor, 4),
"potassium_factor": round(k_factor, 4),
"ph_penalty": round(ph_penalty, 4),
"ec_penalty": round(ec_penalty, 4),
"combined_factor": combined_factor,
}
def _apply_pk_adjustment(
result: dict[str, Any],
*,
soil: dict[str, Any],
site: dict[str, Any],
crop: dict[str, Any],
) -> dict[str, Any]:
adjustment = _estimate_pk_stress_factor(soil=soil, site=site, crop=crop)
factor = adjustment["combined_factor"]
if factor >= 0.995:
result["nutrient_adjustment"] = adjustment
return result
metrics = dict(result.get("metrics", {}))
for key, scale in {"yield_estimate": factor, "biomass": factor, "max_lai": max(factor, 0.6)}.items():
if metrics.get(key) is not None:
metrics[key] = round(_safe_float(metrics[key]) * scale, 4)
result["metrics"] = metrics
result["nutrient_adjustment"] = adjustment
return result
2026-04-24 17:40:25 +03:30
def _load_pcse_bindings() -> dict[str, Any] | None:
try:
base_module = importlib.import_module("pcse.base")
models_module = importlib.import_module("pcse.models")
except ImportError:
return None
parameter_provider = getattr(base_module, "ParameterProvider", None)
weather_provider = getattr(base_module, "WeatherDataProvider", object)
weather_container = getattr(base_module, "WeatherDataContainer", None)
if weather_container is None or parameter_provider is None:
return None
return {
"ParameterProvider": parameter_provider,
"WeatherDataProvider": weather_provider,
"WeatherDataContainer": weather_container,
"models": models_module,
}
def _resolve_model_class(bindings: dict[str, Any], model_name: str):
models_source = bindings["models"]
if isinstance(models_source, dict):
return models_source[model_name]
return getattr(models_source, model_name)
@dataclass
class PreparedSimulationInput:
weather: list[dict[str, Any]]
soil: dict[str, Any]
crop: dict[str, Any]
site: dict[str, Any]
agromanagement: list[dict[str, Any]]
class PcseSimulationManager:
2026-04-24 22:20:15 +03:30
def __init__(self, model_name: str = DEFAULT_PCSE_MODEL_NAME):
2026-04-24 17:40:25 +03:30
self.model_name = model_name
def run_simulation(
self,
*,
weather: Any,
soil: dict[str, Any],
crop_parameters: dict[str, Any],
agromanagement: Any,
site_parameters: dict[str, Any] | None = None,
) -> dict[str, Any]:
prepared = PreparedSimulationInput(
weather=_normalize_weather_records(weather),
soil=soil or {},
crop=crop_parameters or {},
2026-04-24 22:20:15 +03:30
site=_normalize_site_parameters_for_model(
self.model_name,
site_parameters or {},
soil_parameters=soil or {},
),
2026-04-24 17:40:25 +03:30
agromanagement=_normalize_agromanagement(agromanagement),
)
bindings = _load_pcse_bindings()
if bindings is None:
raise CropSimulationError(
"PCSE is not installed or required PCSE classes could not be loaded."
)
2026-04-24 22:20:15 +03:30
result = self._run_with_pcse(prepared, bindings)
if self.model_name.startswith("Wofost81_NWLP"):
result = _apply_pk_adjustment(
result,
soil=prepared.soil,
site=prepared.site,
crop=prepared.crop,
)
return result
2026-04-24 17:40:25 +03:30
def _run_with_pcse(
self,
prepared: PreparedSimulationInput,
bindings: dict[str, Any],
) -> dict[str, Any]:
weather_provider_base = bindings["WeatherDataProvider"]
weather_container = bindings["WeatherDataContainer"]
parameter_provider_cls = bindings["ParameterProvider"]
model_cls = _resolve_model_class(bindings, self.model_name)
class DictWeatherProvider(weather_provider_base):
def __init__(self, records: list[dict[str, Any]]):
super().__init__()
self._records = {
item["DAY"]: weather_container(**item)
for item in records
}
def __call__(self, day):
return self._records[_coerce_date(day)]
parameter_provider = parameter_provider_cls(
cropdata=prepared.crop,
soildata=prepared.soil,
sitedata=prepared.site,
)
simulation = model_cls(
parameterprovider=parameter_provider,
weatherdataprovider=DictWeatherProvider(prepared.weather),
agromanagement=prepared.agromanagement,
output_vars=DEFAULT_OUTPUT_VARS,
summary_vars=DEFAULT_SUMMARY_VARS,
terminal_vars=DEFAULT_TERMINAL_VARS,
)
if hasattr(simulation, "run_till_terminate"):
simulation.run_till_terminate()
elif hasattr(simulation, "run"):
simulation.run(days=len(prepared.weather))
else:
raise CropSimulationError("PCSE model does not expose a runnable interface.")
daily_output = _normalize_pcse_output_records(simulation.get_output())
summary_output = _normalize_pcse_output_records(simulation.get_summary_output())
terminal_output = _normalize_pcse_output_records(simulation.get_terminal_output())
return self._build_result(
engine="pcse",
daily_output=daily_output,
summary_output=summary_output,
terminal_output=terminal_output,
)
def _build_result(
self,
*,
engine: str,
daily_output: list[dict[str, Any]],
summary_output: list[dict[str, Any]],
terminal_output: list[dict[str, Any]],
) -> dict[str, Any]:
terminal = terminal_output[-1] if terminal_output else {}
summary = summary_output[-1] if summary_output else {}
final_daily = daily_output[-1] if daily_output else {}
metrics = {
"yield_estimate": _pick_first_not_none(
terminal.get("TWSO"),
summary.get("TWSO"),
final_daily.get("TWSO"),
),
"biomass": _pick_first_not_none(
terminal.get("TAGP"),
summary.get("TAGP"),
final_daily.get("TAGP"),
),
"max_lai": _pick_first_not_none(
terminal.get("LAI"),
summary.get("LAIMAX"),
final_daily.get("LAI"),
),
}
return {
"engine": engine,
"model_name": self.model_name,
"metrics": _json_ready(metrics),
"daily_output": _json_ready(daily_output),
"summary_output": _json_ready(summary_output),
"terminal_output": _json_ready(terminal_output),
}
class CropSimulationService:
def __init__(self, manager: PcseSimulationManager | None = None):
self.manager = manager or PcseSimulationManager()
2026-04-24 22:20:15 +03:30
def _resolve_common_inputs(
self,
*,
farm_uuid: str | None = None,
plant_name: str | None = None,
weather: Any | None = None,
soil: dict[str, Any] | None = None,
crop_parameters: dict[str, Any] | None = None,
agromanagement: Any | None = None,
site_parameters: dict[str, Any] | None = None,
) -> dict[str, Any]:
if not farm_uuid:
return {
"weather": weather,
"soil": soil or {},
"crop_parameters": crop_parameters or {},
"agromanagement": agromanagement,
"site_parameters": _normalize_site_parameters_for_model(
self.manager.model_name,
site_parameters or {},
soil_parameters=soil or {},
),
"farm": None,
"plant": None,
}
base = build_simulation_payload_from_farm(
farm_uuid=str(farm_uuid),
plant_name=plant_name or (crop_parameters or {}).get("crop_name"),
weather=weather,
soil=soil,
crop_parameters=crop_parameters,
agromanagement=agromanagement,
site_parameters=site_parameters,
)
base["site_parameters"] = _normalize_site_parameters_for_model(
self.manager.model_name,
base.get("site_parameters"),
soil_parameters=base.get("soil"),
)
return base
2026-04-24 17:40:25 +03:30
def run_single_simulation(
self,
*,
2026-04-24 22:20:15 +03:30
weather: Any | None = None,
soil: dict[str, Any] | None = None,
crop_parameters: dict[str, Any] | None = None,
agromanagement: Any | None = None,
2026-04-24 17:40:25 +03:30
site_parameters: dict[str, Any] | None = None,
2026-04-24 18:34:17 +03:30
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
2026-04-24 17:40:25 +03:30
name: str = "",
2026-04-24 22:20:15 +03:30
farm_uuid: str | None = None,
plant_name: str | None = None,
2026-04-24 17:40:25 +03:30
) -> dict[str, Any]:
2026-04-24 22:20:15 +03:30
resolved = self._resolve_common_inputs(
farm_uuid=farm_uuid,
plant_name=plant_name,
weather=weather,
soil=soil,
crop_parameters=crop_parameters,
agromanagement=agromanagement,
site_parameters=site_parameters,
)
2026-04-24 18:34:17 +03:30
merged_agromanagement = _merge_management_recommendations(
2026-04-24 22:20:15 +03:30
resolved["agromanagement"],
2026-04-24 18:34:17 +03:30
irrigation_recommendation=irrigation_recommendation,
fertilization_recommendation=fertilization_recommendation,
)
2026-04-24 17:40:25 +03:30
scenario = SimulationScenario.objects.create(
name=name,
scenario_type=SimulationScenario.ScenarioType.SINGLE,
model_name=self.manager.model_name,
input_payload=_json_ready(
{
2026-04-24 22:20:15 +03:30
"weather": resolved["weather"],
"soil": resolved["soil"],
"crop_parameters": resolved["crop_parameters"],
"site_parameters": resolved["site_parameters"],
2026-04-24 18:34:17 +03:30
"agromanagement": merged_agromanagement,
"irrigation_recommendation": irrigation_recommendation or {},
"fertilization_recommendation": fertilization_recommendation or {},
2026-04-24 22:20:15 +03:30
"farm_uuid": farm_uuid,
"plant_name": plant_name,
2026-05-13 16:45:54 +03:30
"source_metadata": resolved.get("source_metadata") or {},
2026-04-24 17:40:25 +03:30
}
),
)
run = SimulationRun.objects.create(
scenario=scenario,
run_key="single",
label=name or "single",
2026-04-24 22:20:15 +03:30
weather_payload=_json_ready(resolved["weather"]),
soil_payload=_json_ready(resolved["soil"]),
crop_payload=_json_ready(resolved["crop_parameters"]),
site_payload=_json_ready(resolved["site_parameters"]),
2026-04-24 18:34:17 +03:30
agromanagement_payload=_json_ready(merged_agromanagement),
2026-04-24 17:40:25 +03:30
)
return self._execute_scenario(
scenario=scenario,
run_specs=[
{
"instance": run,
2026-04-24 22:20:15 +03:30
"weather": resolved["weather"],
"soil": resolved["soil"],
"crop_parameters": resolved["crop_parameters"],
"site_parameters": resolved["site_parameters"],
2026-04-24 18:34:17 +03:30
"agromanagement": merged_agromanagement,
2026-05-13 16:45:54 +03:30
"source_metadata": resolved.get("source_metadata") or {},
2026-04-24 17:40:25 +03:30
}
],
)
def compare_crops(
self,
*,
2026-04-24 22:20:15 +03:30
weather: Any | None = None,
soil: dict[str, Any] | None = None,
2026-04-24 17:40:25 +03:30
crop_a: dict[str, Any],
crop_b: dict[str, Any],
2026-04-24 22:20:15 +03:30
agromanagement: Any | None = None,
2026-04-24 17:40:25 +03:30
site_parameters: dict[str, Any] | None = None,
2026-04-24 18:34:17 +03:30
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
2026-04-24 17:40:25 +03:30
name: str = "",
2026-04-24 22:20:15 +03:30
farm_uuid: str | None = None,
2026-04-24 17:40:25 +03:30
) -> dict[str, Any]:
2026-04-24 22:20:15 +03:30
resolved = self._resolve_common_inputs(
farm_uuid=farm_uuid,
weather=weather,
soil=soil,
crop_parameters=None,
agromanagement=agromanagement,
site_parameters=site_parameters,
)
2026-04-24 18:34:17 +03:30
merged_agromanagement = _merge_management_recommendations(
2026-04-24 22:20:15 +03:30
resolved["agromanagement"],
2026-04-24 18:34:17 +03:30
irrigation_recommendation=irrigation_recommendation,
fertilization_recommendation=fertilization_recommendation,
)
2026-04-24 17:40:25 +03:30
scenario = SimulationScenario.objects.create(
name=name,
scenario_type=SimulationScenario.ScenarioType.CROP_COMPARISON,
model_name=self.manager.model_name,
input_payload=_json_ready(
{
2026-04-24 22:20:15 +03:30
"weather": resolved["weather"],
"soil": resolved["soil"],
2026-04-24 17:40:25 +03:30
"crop_a": crop_a,
"crop_b": crop_b,
2026-04-24 22:20:15 +03:30
"site_parameters": resolved["site_parameters"],
2026-04-24 18:34:17 +03:30
"agromanagement": merged_agromanagement,
"irrigation_recommendation": irrigation_recommendation or {},
"fertilization_recommendation": fertilization_recommendation or {},
2026-04-24 22:20:15 +03:30
"farm_uuid": farm_uuid,
2026-04-24 17:40:25 +03:30
}
),
)
runs = [
SimulationRun.objects.create(
scenario=scenario,
run_key="crop_a",
label=crop_a.get("crop_name", "crop_a"),
2026-04-24 22:20:15 +03:30
weather_payload=_json_ready(resolved["weather"]),
soil_payload=_json_ready(resolved["soil"]),
2026-04-24 17:40:25 +03:30
crop_payload=_json_ready(crop_a),
2026-04-24 22:20:15 +03:30
site_payload=_json_ready(resolved["site_parameters"]),
2026-04-24 18:34:17 +03:30
agromanagement_payload=_json_ready(merged_agromanagement),
2026-04-24 17:40:25 +03:30
),
SimulationRun.objects.create(
scenario=scenario,
run_key="crop_b",
label=crop_b.get("crop_name", "crop_b"),
2026-04-24 22:20:15 +03:30
weather_payload=_json_ready(resolved["weather"]),
soil_payload=_json_ready(resolved["soil"]),
2026-04-24 17:40:25 +03:30
crop_payload=_json_ready(crop_b),
2026-04-24 22:20:15 +03:30
site_payload=_json_ready(resolved["site_parameters"]),
2026-04-24 18:34:17 +03:30
agromanagement_payload=_json_ready(merged_agromanagement),
2026-04-24 17:40:25 +03:30
),
]
return self._execute_scenario(
scenario=scenario,
run_specs=[
{
"instance": runs[0],
2026-04-24 22:20:15 +03:30
"weather": resolved["weather"],
"soil": resolved["soil"],
2026-04-24 17:40:25 +03:30
"crop_parameters": crop_a,
2026-04-24 22:20:15 +03:30
"site_parameters": resolved["site_parameters"],
2026-04-24 18:34:17 +03:30
"agromanagement": merged_agromanagement,
2026-04-24 17:40:25 +03:30
},
{
"instance": runs[1],
2026-04-24 22:20:15 +03:30
"weather": resolved["weather"],
"soil": resolved["soil"],
2026-04-24 17:40:25 +03:30
"crop_parameters": crop_b,
2026-04-24 22:20:15 +03:30
"site_parameters": resolved["site_parameters"],
2026-04-24 18:34:17 +03:30
"agromanagement": merged_agromanagement,
2026-04-24 17:40:25 +03:30
},
],
)
2026-04-24 18:34:17 +03:30
def recommend_best_crop(
self,
*,
2026-04-24 22:20:15 +03:30
weather: Any | None = None,
soil: dict[str, Any] | None = None,
crops: list[dict[str, Any]] | None = None,
agromanagement: Any | None = None,
2026-04-24 18:34:17 +03:30
site_parameters: dict[str, Any] | None = None,
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
name: str = "",
2026-04-24 22:20:15 +03:30
farm_uuid: str | None = None,
2026-04-24 18:34:17 +03:30
) -> dict[str, Any]:
2026-04-24 22:20:15 +03:30
if not crops and farm_uuid:
base = build_simulation_payload_from_farm(farm_uuid=str(farm_uuid))
crops = []
2026-05-05 21:02:12 +03:30
for plant in base["runtime_plants"]:
2026-04-24 22:20:15 +03:30
simulation_profile = _extract_plant_simulation_profile(plant)
crop_payload = (
deepcopy(simulation_profile.get("crop_parameters"))
if simulation_profile and isinstance(simulation_profile.get("crop_parameters"), dict)
else _build_default_crop_parameters(plant, plant.name)
)
crop_payload.setdefault("crop_name", plant.name)
crop_payload.setdefault("label", plant.name)
crops.append(crop_payload)
crops = crops or []
2026-04-24 18:34:17 +03:30
if len(crops) < 2:
raise CropSimulationError("At least two crop options are required.")
2026-04-24 22:20:15 +03:30
resolved = self._resolve_common_inputs(
farm_uuid=farm_uuid,
weather=weather,
soil=soil,
crop_parameters=None,
agromanagement=agromanagement,
site_parameters=site_parameters,
)
2026-04-24 18:34:17 +03:30
merged_agromanagement = _merge_management_recommendations(
2026-04-24 22:20:15 +03:30
resolved["agromanagement"],
2026-04-24 18:34:17 +03:30
irrigation_recommendation=irrigation_recommendation,
fertilization_recommendation=fertilization_recommendation,
)
scenario = SimulationScenario.objects.create(
name=name,
scenario_type=SimulationScenario.ScenarioType.CROP_COMPARISON,
model_name=self.manager.model_name,
input_payload=_json_ready(
{
2026-04-24 22:20:15 +03:30
"weather": resolved["weather"],
"soil": resolved["soil"],
2026-04-24 18:34:17 +03:30
"crops": crops,
2026-04-24 22:20:15 +03:30
"site_parameters": resolved["site_parameters"],
2026-04-24 18:34:17 +03:30
"agromanagement": merged_agromanagement,
"irrigation_recommendation": irrigation_recommendation or {},
"fertilization_recommendation": fertilization_recommendation or {},
2026-04-24 22:20:15 +03:30
"farm_uuid": farm_uuid,
2026-04-24 18:34:17 +03:30
}
),
)
run_specs = []
for index, crop in enumerate(crops, start=1):
label = (
crop.get("label")
or crop.get("crop_name")
or crop.get("name")
or f"crop_{index}"
)
run = SimulationRun.objects.create(
scenario=scenario,
run_key=f"crop_{index}",
label=label,
2026-04-24 22:20:15 +03:30
weather_payload=_json_ready(resolved["weather"]),
soil_payload=_json_ready(resolved["soil"]),
2026-04-24 18:34:17 +03:30
crop_payload=_json_ready(crop),
2026-04-24 22:20:15 +03:30
site_payload=_json_ready(resolved["site_parameters"]),
2026-04-24 18:34:17 +03:30
agromanagement_payload=_json_ready(merged_agromanagement),
)
run_specs.append(
{
"instance": run,
2026-04-24 22:20:15 +03:30
"weather": resolved["weather"],
"soil": resolved["soil"],
2026-04-24 18:34:17 +03:30
"crop_parameters": crop,
2026-04-24 22:20:15 +03:30
"site_parameters": resolved["site_parameters"],
2026-04-24 18:34:17 +03:30
"agromanagement": merged_agromanagement,
}
)
result = self._execute_scenario(scenario=scenario, run_specs=run_specs)
comparison = result.get("comparison", {})
return {
"scenario_id": result["scenario_id"],
"scenario_type": result["scenario_type"],
"recommended_crop": {
"run_key": comparison.get("best_run_key"),
"label": comparison.get("best_label"),
"expected_yield_estimate": comparison.get("best_yield_estimate"),
},
"candidates": comparison.get("runs", []),
"raw_result": result,
}
2026-04-24 17:40:25 +03:30
def compare_fertilization_strategies(
self,
*,
2026-04-24 22:20:15 +03:30
weather: Any | None = None,
soil: dict[str, Any] | None = None,
crop_parameters: dict[str, Any] | None = None,
2026-04-24 17:40:25 +03:30
strategies: list[dict[str, Any]],
site_parameters: dict[str, Any] | None = None,
2026-04-24 18:34:17 +03:30
irrigation_recommendation: dict[str, Any] | None = None,
fertilization_recommendation: dict[str, Any] | None = None,
2026-04-24 17:40:25 +03:30
name: str = "",
2026-04-24 22:20:15 +03:30
farm_uuid: str | None = None,
plant_name: str | None = None,
2026-04-24 17:40:25 +03:30
) -> dict[str, Any]:
if len(strategies) < 2:
raise CropSimulationError("At least two fertilization strategies are required.")
2026-04-24 22:20:15 +03:30
resolved = self._resolve_common_inputs(
farm_uuid=farm_uuid,
plant_name=plant_name,
weather=weather,
soil=soil,
crop_parameters=crop_parameters,
agromanagement=None,
site_parameters=site_parameters,
)
2026-04-24 17:40:25 +03:30
scenario = SimulationScenario.objects.create(
name=name,
scenario_type=SimulationScenario.ScenarioType.FERTILIZATION_COMPARISON,
model_name=self.manager.model_name,
input_payload=_json_ready(
{
2026-04-24 22:20:15 +03:30
"weather": resolved["weather"],
"soil": resolved["soil"],
"crop_parameters": resolved["crop_parameters"],
"site_parameters": resolved["site_parameters"],
2026-04-24 17:40:25 +03:30
"strategies": strategies,
2026-04-24 18:34:17 +03:30
"irrigation_recommendation": irrigation_recommendation or {},
"fertilization_recommendation": fertilization_recommendation or {},
2026-04-24 22:20:15 +03:30
"farm_uuid": farm_uuid,
"plant_name": plant_name,
2026-04-24 17:40:25 +03:30
}
),
)
run_specs = []
for index, strategy in enumerate(strategies, start=1):
2026-04-24 18:34:17 +03:30
merged_agromanagement = _merge_management_recommendations(
strategy["agromanagement"],
irrigation_recommendation=irrigation_recommendation,
fertilization_recommendation=fertilization_recommendation,
)
2026-04-24 17:40:25 +03:30
run = SimulationRun.objects.create(
scenario=scenario,
run_key=f"strategy_{index}",
label=strategy.get("label", f"strategy_{index}"),
2026-04-24 22:20:15 +03:30
weather_payload=_json_ready(resolved["weather"]),
soil_payload=_json_ready(resolved["soil"]),
crop_payload=_json_ready(resolved["crop_parameters"]),
site_payload=_json_ready(resolved["site_parameters"]),
2026-04-24 18:34:17 +03:30
agromanagement_payload=_json_ready(merged_agromanagement),
2026-04-24 17:40:25 +03:30
)
run_specs.append(
{
"instance": run,
2026-04-24 22:20:15 +03:30
"weather": resolved["weather"],
"soil": resolved["soil"],
"crop_parameters": resolved["crop_parameters"],
"site_parameters": resolved["site_parameters"],
2026-04-24 18:34:17 +03:30
"agromanagement": merged_agromanagement,
2026-05-13 16:45:54 +03:30
"source_metadata": resolved.get("source_metadata") or {},
2026-04-24 17:40:25 +03:30
}
)
return self._execute_scenario(scenario=scenario, run_specs=run_specs)
def get_scenario_result(self, scenario_id: int) -> dict[str, Any]:
scenario = SimulationScenario.objects.prefetch_related("runs").get(pk=scenario_id)
return {
"id": scenario.id,
"name": scenario.name,
"scenario_type": scenario.scenario_type,
"status": scenario.status,
"model_name": scenario.model_name,
"input_payload": scenario.input_payload,
"result_payload": scenario.result_payload,
"error_message": scenario.error_message,
"runs": [
{
"id": run.id,
"run_key": run.run_key,
"label": run.label,
"status": run.status,
"result_payload": run.result_payload,
"error_message": run.error_message,
}
for run in scenario.runs.all()
],
}
def _execute_scenario(
self,
*,
scenario: SimulationScenario,
run_specs: list[dict[str, Any]],
) -> dict[str, Any]:
scenario.status = SimulationScenario.Status.RUNNING
scenario.error_message = ""
scenario.save(update_fields=["status", "error_message", "updated_at"])
results = []
try:
for spec in run_specs:
run = spec["instance"]
run.status = SimulationScenario.Status.RUNNING
run.error_message = ""
run.save(update_fields=["status", "error_message", "updated_at"])
result = self.manager.run_simulation(
weather=spec["weather"],
soil=spec["soil"],
crop_parameters=spec["crop_parameters"],
agromanagement=spec["agromanagement"],
site_parameters=spec["site_parameters"],
)
run.status = SimulationScenario.Status.SUCCESS
2026-05-13 16:45:54 +03:30
run.result_payload = {**result, "source_metadata": spec.get("source_metadata") or {}}
2026-04-24 17:40:25 +03:30
run.save(update_fields=["status", "result_payload", "updated_at"])
results.append(
{
"run_key": run.run_key,
"label": run.label,
"result": result,
}
)
except Exception as exc:
message = str(exc)
run = spec["instance"]
run.status = SimulationScenario.Status.FAILURE
run.error_message = message
run.save(update_fields=["status", "error_message", "updated_at"])
scenario.status = SimulationScenario.Status.FAILURE
scenario.error_message = message
scenario.result_payload = {"runs": results}
scenario.save(
update_fields=["status", "error_message", "result_payload", "updated_at"]
)
raise
scenario_result = self._build_scenario_result(scenario, results)
scenario.status = SimulationScenario.Status.SUCCESS
scenario.result_payload = scenario_result
scenario.error_message = ""
scenario.save(
update_fields=["status", "result_payload", "error_message", "updated_at"]
)
return scenario_result
def _build_scenario_result(
self,
scenario: SimulationScenario,
results: list[dict[str, Any]],
) -> dict[str, Any]:
payload = {
"scenario_id": scenario.id,
"scenario_type": scenario.scenario_type,
"status": SimulationScenario.Status.SUCCESS,
"runs": results,
}
if scenario.scenario_type == SimulationScenario.ScenarioType.SINGLE:
payload["result"] = results[0]["result"]
return payload
run_metrics = [
{
"run_key": item["run_key"],
"label": item["label"],
"yield_estimate": float(item["result"]["metrics"]["yield_estimate"] or 0.0),
"biomass": float(item["result"]["metrics"]["biomass"] or 0.0),
}
for item in results
]
best = max(run_metrics, key=lambda item: item["yield_estimate"])
payload["comparison"] = {
"best_run_key": best["run_key"],
"best_label": best["label"],
"best_yield_estimate": best["yield_estimate"],
"runs": run_metrics,
}
if scenario.scenario_type == SimulationScenario.ScenarioType.CROP_COMPARISON:
2026-04-24 22:20:15 +03:30
if len(run_metrics) >= 2:
payload["comparison"]["yield_gap"] = round(
abs(run_metrics[0]["yield_estimate"] - run_metrics[1]["yield_estimate"]),
3,
)
2026-04-24 17:40:25 +03:30
if (
scenario.scenario_type
== SimulationScenario.ScenarioType.FERTILIZATION_COMPARISON
):
payload["recommendation"] = {
"recommended_run_key": best["run_key"],
"recommended_label": best["label"],
"expected_yield_estimate": best["yield_estimate"],
}
return payload
@transaction.atomic
def run_single_simulation(**kwargs) -> dict[str, Any]:
return CropSimulationService().run_single_simulation(**kwargs)
@transaction.atomic
def compare_crops(**kwargs) -> dict[str, Any]:
return CropSimulationService().compare_crops(**kwargs)
2026-04-24 18:34:17 +03:30
@transaction.atomic
def recommend_best_crop(**kwargs) -> dict[str, Any]:
return CropSimulationService().recommend_best_crop(**kwargs)
2026-04-24 17:40:25 +03:30
@transaction.atomic
def compare_fertilization_strategies(**kwargs) -> dict[str, Any]:
return CropSimulationService().compare_fertilization_strategies(**kwargs)