from __future__ import annotations import importlib from copy import deepcopy from dataclasses import dataclass from datetime import date, datetime from typing import Any from django.db import transaction from .models import SimulationRun, SimulationScenario DEFAULT_OUTPUT_VARS = ["DVS", "LAI", "TAGP", "TWSO", "SM"] DEFAULT_SUMMARY_VARS = ["TAGP", "TWSO", "CTRAT", "RD"] DEFAULT_TERMINAL_VARS = ["TAGP", "TWSO", "LAI", "DVS"] DEFAULT_PCSE_MODEL_NAME = "Wofost81_NWLP_CWB_CNB" DEFAULT_NAVAILI = 35.0 DEFAULT_WAV = 40.0 class CropSimulationError(Exception): pass def _json_ready(value: Any) -> Any: if isinstance(value, dict): return {str(key): _json_ready(item) for key, item in value.items()} if isinstance(value, list): return [_json_ready(item) for item in value] if isinstance(value, tuple): return [_json_ready(item) for item in value] if isinstance(value, (date, datetime)): return value.isoformat() return value def _coerce_date(value: Any) -> date: if isinstance(value, date) and not isinstance(value, datetime): return value if isinstance(value, datetime): return value.date() if isinstance(value, str): return date.fromisoformat(value) raise CropSimulationError(f"Unsupported date value: {value!r}") def _normalize_weather_records(weather: Any) -> list[dict[str, Any]]: if isinstance(weather, dict): if "records" in weather: records = weather["records"] else: records = [weather] else: records = weather if not isinstance(records, list) or not records: raise CropSimulationError("Weather input must contain at least one record.") normalized = [] for raw in records: if not isinstance(raw, dict): raise CropSimulationError("Weather records must be dictionaries.") current_date = _coerce_date(raw.get("DAY") or raw.get("day")) normalized.append( { "DAY": current_date, "LAT": float(raw.get("LAT", raw.get("lat", 0.0))), "LON": float(raw.get("LON", raw.get("lon", 0.0))), "ELEV": float(raw.get("ELEV", raw.get("elev", 0.0))), "IRRAD": float(raw.get("IRRAD", raw.get("irrad", 15_000_000.0))), 
"TMIN": float(raw.get("TMIN", raw.get("tmin", 10.0))), "TMAX": float(raw.get("TMAX", raw.get("tmax", 20.0))), "VAP": float(raw.get("VAP", raw.get("vap", 12.0))), "WIND": float(raw.get("WIND", raw.get("wind", 2.0))), "RAIN": float(raw.get("RAIN", raw.get("rain", 0.0))), "E0": float(raw.get("E0", raw.get("e0", 0.35))), "ES0": float(raw.get("ES0", raw.get("es0", 0.3))), "ET0": float(raw.get("ET0", raw.get("et0", 0.32))), } ) return normalized def _normalize_agromanagement(agromanagement: Any) -> list[dict[str, Any]]: if isinstance(agromanagement, dict) and "AgroManagement" in agromanagement: campaigns = agromanagement["AgroManagement"] elif isinstance(agromanagement, list): campaigns = agromanagement elif isinstance(agromanagement, dict): campaigns = [agromanagement] else: raise CropSimulationError("Agromanagement input must be a dict or list.") if not campaigns: raise CropSimulationError("Agromanagement input cannot be empty.") return campaigns def _deep_copy_json_like(value: Any) -> Any: if isinstance(value, dict): return {key: _deep_copy_json_like(item) for key, item in value.items()} if isinstance(value, list): return [_deep_copy_json_like(item) for item in value] return value def _parse_recommendation_events( recommendation: dict[str, Any] | None, *, event_signal: str, amount_keys: tuple[str, ...], extra_keys: tuple[str, ...], ) -> list[dict[str, Any]]: if not recommendation: return [] raw_events = recommendation.get("events") if raw_events is None: raw_events = recommendation.get("schedule") if raw_events is None: raw_events = recommendation.get("applications") if raw_events is None: raw_events = recommendation.get("plan") if not isinstance(raw_events, list): return [] events_table = [] for item in raw_events: if not isinstance(item, dict): continue raw_date = item.get("date") or item.get("day") if raw_date is None: continue payload = {} amount_value = None amount_key = None for candidate in amount_keys: if item.get(candidate) is not None: amount_value = 
item.get(candidate) amount_key = candidate break if amount_key is not None: payload[amount_key] = float(amount_value) for extra_key in extra_keys: if item.get(extra_key) is not None: payload[extra_key] = float(item[extra_key]) if payload: events_table.append({_coerce_date(raw_date): payload}) if not events_table: return [] return [ { "event_signal": event_signal, "name": recommendation.get("name", f"{event_signal} recommendation"), "comment": recommendation.get("comment", ""), "events_table": events_table, } ] def _merge_management_recommendations( agromanagement: Any, *, irrigation_recommendation: dict[str, Any] | None = None, fertilization_recommendation: dict[str, Any] | None = None, ) -> list[dict[str, Any]]: campaigns = _deep_copy_json_like(_normalize_agromanagement(agromanagement)) irrigation_events = _parse_recommendation_events( irrigation_recommendation, event_signal="irrigate", amount_keys=("amount", "irrigation_amount"), extra_keys=("efficiency",), ) fertilization_events = _parse_recommendation_events( fertilization_recommendation, event_signal="apply_n", amount_keys=("N_amount", "amount"), extra_keys=("N_recovery",), ) if not irrigation_events and not fertilization_events: return campaigns target_campaign = None for campaign in campaigns: if isinstance(campaign, dict) and campaign: target_campaign = campaign break if target_campaign is None: raise CropSimulationError( "Agromanagement must contain at least one non-empty campaign." 
) campaign_start = next(iter(target_campaign.keys())) campaign_payload = target_campaign[campaign_start] if not isinstance(campaign_payload, dict): raise CropSimulationError("Agromanagement campaign payload must be a dictionary.") timed_events = campaign_payload.get("TimedEvents") if timed_events in (None, ""): timed_events = [] if not isinstance(timed_events, list): raise CropSimulationError("TimedEvents must be a list when recommendations are merged.") timed_events.extend(irrigation_events) timed_events.extend(fertilization_events) campaign_payload["TimedEvents"] = timed_events return campaigns def _normalize_pcse_output_records(records: Any) -> list[dict[str, Any]]: if records is None: return [] if isinstance(records, dict): return [records] if isinstance(records, list): return records if isinstance(records, tuple): return list(records) return [records] def _pick_first_not_none(*values: Any) -> Any: for value in values: if value is not None: return value return None def _safe_float(value: Any, default: float = 0.0) -> float: try: if value in (None, ""): return default return float(value) except (TypeError, ValueError): return default def _clamp(value: float, lower: float, upper: float) -> float: return max(lower, min(value, upper)) def _sensor_metric(sensor: Any, metric_name: str) -> float | None: if sensor is None: return None if hasattr(sensor, metric_name): value = getattr(sensor, metric_name) if value is not None: return _safe_float(value) payload = getattr(sensor, "sensor_payload", None) or {} if not isinstance(payload, dict): return None for block in payload.values(): if isinstance(block, dict) and block.get(metric_name) is not None: return _safe_float(block.get(metric_name)) return None def _extract_plant_simulation_profile(plant: Any | None) -> dict[str, Any] | None: if plant is None: return None for attr in ("growth_profile", "irrigation_profile", "health_profile"): profile = getattr(plant, attr, None) or {} if not isinstance(profile, dict): continue 
simulation = profile.get("simulation") if isinstance(simulation, dict): return simulation return None def _build_default_crop_parameters(plant: Any | None, crop_name: str) -> dict[str, Any]: profile = getattr(plant, "growth_profile", None) or {} required_gdd = _safe_float(profile.get("required_gdd_for_maturity"), 1200.0) return { "crop_name": crop_name, "TSUM1": round(required_gdd * 0.45, 3), "TSUM2": round(required_gdd * 0.55, 3), "YIELD_SCALE": 1.0, "MAX_LAI": 5.0, "MAX_BIOMASS": 12000.0, } def _build_default_agromanagement(crop_name: str, weather: list[dict[str, Any]]) -> list[dict[str, Any]]: first_day = weather[0]["DAY"] last_day = weather[-1]["DAY"] crop_end = max(last_day, first_day + (last_day - first_day)) return [ { first_day: { "CropCalendar": { "crop_name": crop_name, "variety_name": "default", "crop_start_date": first_day, "crop_start_type": "sowing", "crop_end_date": crop_end, "crop_end_type": "harvest", "max_duration": max((crop_end - first_day).days, 1), }, "TimedEvents": [], "StateEvents": [], } } ] def _build_weather_from_forecasts(forecasts: list[Any], *, latitude: float, longitude: float) -> list[dict[str, Any]]: return [ { "DAY": forecast.forecast_date, "LAT": latitude, "LON": longitude, "ELEV": 1200.0, "IRRAD": 16_000_000.0, "TMIN": _safe_float( _pick_first_not_none(forecast.temperature_min, forecast.temperature_mean), 12.0, ), "TMAX": _safe_float( _pick_first_not_none(forecast.temperature_max, forecast.temperature_mean), 24.0, ), "VAP": max(_safe_float(forecast.humidity_mean, 55.0) / 5.0, 6.0), "WIND": _safe_float(forecast.wind_speed_max, 7.2) / 3.6, "RAIN": _safe_float(forecast.precipitation, 0.0), "E0": _safe_float(forecast.et0, 0.35), "ES0": max(_safe_float(forecast.et0, 0.35) * 0.9, 0.1), "ET0": _safe_float(forecast.et0, 0.35), } for forecast in forecasts ] def _normalize_site_parameters_for_model( model_name: str, site_parameters: dict[str, Any] | None, *, soil_parameters: dict[str, Any] | None = None, ) -> dict[str, Any]: site = 
dict(site_parameters or {}) soil = soil_parameters or {} site.setdefault("WAV", _safe_float(site.get("WAV"), DEFAULT_WAV)) if model_name.startswith("Wofost81_NWLP"): navaili = _pick_first_not_none( site.get("NAVAILI"), site.get("navaili"), site.get("nitrogen"), soil.get("NAVAILI"), soil.get("nitrogen"), ) site["NAVAILI"] = _safe_float(navaili, DEFAULT_NAVAILI) site.setdefault("BG_N_SUPPLY", 0.05) site.setdefault("NSOILBASE", max(site["NAVAILI"] * 0.35, 5.0)) site.setdefault("NSOILBASE_FR", 0.02) return site def build_simulation_payload_from_farm( *, farm_uuid: str, plant_name: str | None = None, weather: Any | None = None, soil: dict[str, Any] | None = None, crop_parameters: dict[str, Any] | None = None, agromanagement: Any | None = None, site_parameters: dict[str, Any] | None = None, ) -> dict[str, Any]: from farm_data.models import SensorData from weather.models import WeatherForecast farm = ( SensorData.objects.select_related("center_location", "irrigation_method") .prefetch_related("plants", "center_location__depths") .filter(farm_uuid=farm_uuid) .first() ) if farm is None: raise CropSimulationError(f"Farm with uuid={farm_uuid} not found.") plant = None if plant_name: plant = farm.plants.filter(name=plant_name).first() if plant is None: plant = farm.plants.first() if weather is not None: resolved_weather = _normalize_weather_records(weather) else: forecasts = list( WeatherForecast.objects.filter(location=farm.center_location) .order_by("forecast_date")[:14] ) if not forecasts: raise CropSimulationError( "Weather data for the selected farm is missing." 
) resolved_weather = _build_weather_from_forecasts( forecasts, latitude=float(farm.center_location.latitude), longitude=float(farm.center_location.longitude), ) depths = list(farm.center_location.depths.all()) top_depth = depths[0] if depths else None smfcf = _clamp(_safe_float(getattr(top_depth, "wv0033", None), 0.34), 0.2, 0.55) smw = _clamp(_safe_float(getattr(top_depth, "wv1500", None), 0.14), 0.05, max(smfcf - 0.02, 0.06)) soil_moisture = _sensor_metric(farm, "soil_moisture") wav = ( round(_clamp((soil_moisture or DEFAULT_WAV) / 100.0, smw, smfcf) * 100.0, 3) if soil_moisture is not None else DEFAULT_WAV ) nitrogen = _pick_first_not_none(_sensor_metric(farm, "nitrogen"), getattr(top_depth, "nitrogen", None)) phosphorus = _sensor_metric(farm, "phosphorus") potassium = _sensor_metric(farm, "potassium") soil_ph = _pick_first_not_none(_sensor_metric(farm, "soil_ph"), getattr(top_depth, "phh2o", None)) ec = _sensor_metric(farm, "electrical_conductivity") resolved_soil = { "SMFCF": round(smfcf, 3), "SMW": round(smw, 3), "RDMSOL": 120.0, "soil_moisture": soil_moisture, "nitrogen": _safe_float(nitrogen, DEFAULT_NAVAILI), "phosphorus": _safe_float(phosphorus, 0.0), "potassium": _safe_float(potassium, 0.0), "soil_ph": _safe_float(soil_ph, 7.0), "electrical_conductivity": _safe_float(ec, 0.0), "clay": _safe_float(getattr(top_depth, "clay", None), 0.0), "sand": _safe_float(getattr(top_depth, "sand", None), 0.0), "silt": _safe_float(getattr(top_depth, "silt", None), 0.0), "cec": _safe_float(getattr(top_depth, "cec", None), 0.0), "soc": _safe_float(getattr(top_depth, "soc", None), 0.0), } if soil: resolved_soil.update(soil) resolved_site = { "WAV": wav, "NAVAILI": _safe_float(nitrogen, DEFAULT_NAVAILI), "P_STATUS": _safe_float(phosphorus, 0.0), "K_STATUS": _safe_float(potassium, 0.0), "SOIL_PH": _safe_float(soil_ph, 7.0), "EC": _safe_float(ec, 0.0), } if site_parameters: resolved_site.update(site_parameters) simulation_profile = _extract_plant_simulation_profile(plant) 
default_crop = ( deepcopy(simulation_profile.get("crop_parameters")) if simulation_profile and isinstance(simulation_profile.get("crop_parameters"), dict) else _build_default_crop_parameters(plant, plant_name or getattr(plant, "name", "crop")) ) resolved_crop = default_crop if crop_parameters: resolved_crop.update(crop_parameters) resolved_crop.setdefault("crop_name", plant_name or getattr(plant, "name", "crop")) resolved_crop.setdefault("farm_uuid", str(farm_uuid)) resolved_crop.setdefault("soil_ph", _safe_float(soil_ph, 7.0)) resolved_crop.setdefault("soil_nitrogen", _safe_float(nitrogen, DEFAULT_NAVAILI)) resolved_crop.setdefault("soil_phosphorus", _safe_float(phosphorus, 0.0)) resolved_crop.setdefault("soil_potassium", _safe_float(potassium, 0.0)) default_agromanagement = ( deepcopy(simulation_profile.get("agromanagement")) if simulation_profile and simulation_profile.get("agromanagement") else _build_default_agromanagement(resolved_crop["crop_name"], resolved_weather) ) resolved_agromanagement = agromanagement if agromanagement is not None else default_agromanagement return { "farm": farm, "plant": plant, "weather": resolved_weather, "soil": resolved_soil, "site_parameters": resolved_site, "crop_parameters": resolved_crop, "agromanagement": resolved_agromanagement, } def _extract_total_n(agromanagement: list[dict[str, Any]]) -> float: total_n = 0.0 for campaign in agromanagement: if not isinstance(campaign, dict): continue for payload in campaign.values(): if not isinstance(payload, dict): continue for bucket_name in ("TimedEvents", "StateEvents"): for event_group in payload.get(bucket_name, []) or []: if not isinstance(event_group, dict): continue for event in event_group.get("events_table", []) or []: if not isinstance(event, dict): continue for event_payload in event.values(): if isinstance(event_payload, dict): total_n += float(event_payload.get("N_amount", 0.0)) return total_n def _estimate_pk_stress_factor( *, soil: dict[str, Any], site: dict[str, Any], 
crop: dict[str, Any], ) -> dict[str, float]: phosphorus = _safe_float( _pick_first_not_none(site.get("P_STATUS"), soil.get("phosphorus"), crop.get("soil_phosphorus")), 0.0, ) potassium = _safe_float( _pick_first_not_none(site.get("K_STATUS"), soil.get("potassium"), crop.get("soil_potassium")), 0.0, ) soil_ph = _safe_float( _pick_first_not_none(site.get("SOIL_PH"), soil.get("soil_ph"), crop.get("soil_ph")), 7.0, ) ec = _safe_float(_pick_first_not_none(site.get("EC"), soil.get("electrical_conductivity")), 0.0) phosphorus_target = _safe_float(crop.get("P_OPTIMAL"), 30.0) potassium_target = _safe_float(crop.get("K_OPTIMAL"), 45.0) p_factor = _clamp(phosphorus / max(phosphorus_target, 1.0), 0.45, 1.0) k_factor = _clamp(potassium / max(potassium_target, 1.0), 0.45, 1.0) ph_penalty = 1.0 if soil_ph < 5.8: ph_penalty = _clamp(1.0 - ((5.8 - soil_ph) * 0.08), 0.65, 1.0) elif soil_ph > 7.8: ph_penalty = _clamp(1.0 - ((soil_ph - 7.8) * 0.06), 0.7, 1.0) ec_penalty = 1.0 if ec > 2.5: ec_penalty = _clamp(1.0 - ((ec - 2.5) * 0.07), 0.72, 1.0) combined_factor = round(_clamp(p_factor * k_factor * ph_penalty * ec_penalty, 0.35, 1.0), 4) return { "phosphorus_factor": round(p_factor, 4), "potassium_factor": round(k_factor, 4), "ph_penalty": round(ph_penalty, 4), "ec_penalty": round(ec_penalty, 4), "combined_factor": combined_factor, } def _apply_pk_adjustment( result: dict[str, Any], *, soil: dict[str, Any], site: dict[str, Any], crop: dict[str, Any], ) -> dict[str, Any]: adjustment = _estimate_pk_stress_factor(soil=soil, site=site, crop=crop) factor = adjustment["combined_factor"] if factor >= 0.995: result["nutrient_adjustment"] = adjustment return result metrics = dict(result.get("metrics", {})) for key, scale in {"yield_estimate": factor, "biomass": factor, "max_lai": max(factor, 0.6)}.items(): if metrics.get(key) is not None: metrics[key] = round(_safe_float(metrics[key]) * scale, 4) result["metrics"] = metrics result["nutrient_adjustment"] = adjustment return result def 
_load_pcse_bindings() -> dict[str, Any] | None: try: base_module = importlib.import_module("pcse.base") models_module = importlib.import_module("pcse.models") except ImportError: return None parameter_provider = getattr(base_module, "ParameterProvider", None) weather_provider = getattr(base_module, "WeatherDataProvider", object) weather_container = getattr(base_module, "WeatherDataContainer", None) if weather_container is None or parameter_provider is None: return None return { "ParameterProvider": parameter_provider, "WeatherDataProvider": weather_provider, "WeatherDataContainer": weather_container, "models": models_module, } def _resolve_model_class(bindings: dict[str, Any], model_name: str): models_source = bindings["models"] if isinstance(models_source, dict): return models_source[model_name] return getattr(models_source, model_name) @dataclass class PreparedSimulationInput: weather: list[dict[str, Any]] soil: dict[str, Any] crop: dict[str, Any] site: dict[str, Any] agromanagement: list[dict[str, Any]] class PcseSimulationManager: def __init__(self, model_name: str = DEFAULT_PCSE_MODEL_NAME): self.model_name = model_name def run_simulation( self, *, weather: Any, soil: dict[str, Any], crop_parameters: dict[str, Any], agromanagement: Any, site_parameters: dict[str, Any] | None = None, ) -> dict[str, Any]: prepared = PreparedSimulationInput( weather=_normalize_weather_records(weather), soil=soil or {}, crop=crop_parameters or {}, site=_normalize_site_parameters_for_model( self.model_name, site_parameters or {}, soil_parameters=soil or {}, ), agromanagement=_normalize_agromanagement(agromanagement), ) bindings = _load_pcse_bindings() if bindings is None: raise CropSimulationError( "PCSE is not installed or required PCSE classes could not be loaded." 
) result = self._run_with_pcse(prepared, bindings) if self.model_name.startswith("Wofost81_NWLP"): result = _apply_pk_adjustment( result, soil=prepared.soil, site=prepared.site, crop=prepared.crop, ) return result def _run_with_pcse( self, prepared: PreparedSimulationInput, bindings: dict[str, Any], ) -> dict[str, Any]: weather_provider_base = bindings["WeatherDataProvider"] weather_container = bindings["WeatherDataContainer"] parameter_provider_cls = bindings["ParameterProvider"] model_cls = _resolve_model_class(bindings, self.model_name) class DictWeatherProvider(weather_provider_base): def __init__(self, records: list[dict[str, Any]]): super().__init__() self._records = { item["DAY"]: weather_container(**item) for item in records } def __call__(self, day): return self._records[_coerce_date(day)] parameter_provider = parameter_provider_cls( cropdata=prepared.crop, soildata=prepared.soil, sitedata=prepared.site, ) simulation = model_cls( parameterprovider=parameter_provider, weatherdataprovider=DictWeatherProvider(prepared.weather), agromanagement=prepared.agromanagement, output_vars=DEFAULT_OUTPUT_VARS, summary_vars=DEFAULT_SUMMARY_VARS, terminal_vars=DEFAULT_TERMINAL_VARS, ) if hasattr(simulation, "run_till_terminate"): simulation.run_till_terminate() elif hasattr(simulation, "run"): simulation.run(days=len(prepared.weather)) else: raise CropSimulationError("PCSE model does not expose a runnable interface.") daily_output = _normalize_pcse_output_records(simulation.get_output()) summary_output = _normalize_pcse_output_records(simulation.get_summary_output()) terminal_output = _normalize_pcse_output_records(simulation.get_terminal_output()) return self._build_result( engine="pcse", daily_output=daily_output, summary_output=summary_output, terminal_output=terminal_output, ) def _build_result( self, *, engine: str, daily_output: list[dict[str, Any]], summary_output: list[dict[str, Any]], terminal_output: list[dict[str, Any]], ) -> dict[str, Any]: terminal = 
terminal_output[-1] if terminal_output else {} summary = summary_output[-1] if summary_output else {} final_daily = daily_output[-1] if daily_output else {} metrics = { "yield_estimate": _pick_first_not_none( terminal.get("TWSO"), summary.get("TWSO"), final_daily.get("TWSO"), ), "biomass": _pick_first_not_none( terminal.get("TAGP"), summary.get("TAGP"), final_daily.get("TAGP"), ), "max_lai": _pick_first_not_none( terminal.get("LAI"), summary.get("LAIMAX"), final_daily.get("LAI"), ), } return { "engine": engine, "model_name": self.model_name, "metrics": _json_ready(metrics), "daily_output": _json_ready(daily_output), "summary_output": _json_ready(summary_output), "terminal_output": _json_ready(terminal_output), } class CropSimulationService: def __init__(self, manager: PcseSimulationManager | None = None): self.manager = manager or PcseSimulationManager() def _resolve_common_inputs( self, *, farm_uuid: str | None = None, plant_name: str | None = None, weather: Any | None = None, soil: dict[str, Any] | None = None, crop_parameters: dict[str, Any] | None = None, agromanagement: Any | None = None, site_parameters: dict[str, Any] | None = None, ) -> dict[str, Any]: if not farm_uuid: return { "weather": weather, "soil": soil or {}, "crop_parameters": crop_parameters or {}, "agromanagement": agromanagement, "site_parameters": _normalize_site_parameters_for_model( self.manager.model_name, site_parameters or {}, soil_parameters=soil or {}, ), "farm": None, "plant": None, } base = build_simulation_payload_from_farm( farm_uuid=str(farm_uuid), plant_name=plant_name or (crop_parameters or {}).get("crop_name"), weather=weather, soil=soil, crop_parameters=crop_parameters, agromanagement=agromanagement, site_parameters=site_parameters, ) base["site_parameters"] = _normalize_site_parameters_for_model( self.manager.model_name, base.get("site_parameters"), soil_parameters=base.get("soil"), ) return base def run_single_simulation( self, *, weather: Any | None = None, soil: dict[str, 
Any] | None = None, crop_parameters: dict[str, Any] | None = None, agromanagement: Any | None = None, site_parameters: dict[str, Any] | None = None, irrigation_recommendation: dict[str, Any] | None = None, fertilization_recommendation: dict[str, Any] | None = None, name: str = "", farm_uuid: str | None = None, plant_name: str | None = None, ) -> dict[str, Any]: resolved = self._resolve_common_inputs( farm_uuid=farm_uuid, plant_name=plant_name, weather=weather, soil=soil, crop_parameters=crop_parameters, agromanagement=agromanagement, site_parameters=site_parameters, ) merged_agromanagement = _merge_management_recommendations( resolved["agromanagement"], irrigation_recommendation=irrigation_recommendation, fertilization_recommendation=fertilization_recommendation, ) scenario = SimulationScenario.objects.create( name=name, scenario_type=SimulationScenario.ScenarioType.SINGLE, model_name=self.manager.model_name, input_payload=_json_ready( { "weather": resolved["weather"], "soil": resolved["soil"], "crop_parameters": resolved["crop_parameters"], "site_parameters": resolved["site_parameters"], "agromanagement": merged_agromanagement, "irrigation_recommendation": irrigation_recommendation or {}, "fertilization_recommendation": fertilization_recommendation or {}, "farm_uuid": farm_uuid, "plant_name": plant_name, } ), ) run = SimulationRun.objects.create( scenario=scenario, run_key="single", label=name or "single", weather_payload=_json_ready(resolved["weather"]), soil_payload=_json_ready(resolved["soil"]), crop_payload=_json_ready(resolved["crop_parameters"]), site_payload=_json_ready(resolved["site_parameters"]), agromanagement_payload=_json_ready(merged_agromanagement), ) return self._execute_scenario( scenario=scenario, run_specs=[ { "instance": run, "weather": resolved["weather"], "soil": resolved["soil"], "crop_parameters": resolved["crop_parameters"], "site_parameters": resolved["site_parameters"], "agromanagement": merged_agromanagement, } ], ) def compare_crops( 
self, *, weather: Any | None = None, soil: dict[str, Any] | None = None, crop_a: dict[str, Any], crop_b: dict[str, Any], agromanagement: Any | None = None, site_parameters: dict[str, Any] | None = None, irrigation_recommendation: dict[str, Any] | None = None, fertilization_recommendation: dict[str, Any] | None = None, name: str = "", farm_uuid: str | None = None, ) -> dict[str, Any]: resolved = self._resolve_common_inputs( farm_uuid=farm_uuid, weather=weather, soil=soil, crop_parameters=None, agromanagement=agromanagement, site_parameters=site_parameters, ) merged_agromanagement = _merge_management_recommendations( resolved["agromanagement"], irrigation_recommendation=irrigation_recommendation, fertilization_recommendation=fertilization_recommendation, ) scenario = SimulationScenario.objects.create( name=name, scenario_type=SimulationScenario.ScenarioType.CROP_COMPARISON, model_name=self.manager.model_name, input_payload=_json_ready( { "weather": resolved["weather"], "soil": resolved["soil"], "crop_a": crop_a, "crop_b": crop_b, "site_parameters": resolved["site_parameters"], "agromanagement": merged_agromanagement, "irrigation_recommendation": irrigation_recommendation or {}, "fertilization_recommendation": fertilization_recommendation or {}, "farm_uuid": farm_uuid, } ), ) runs = [ SimulationRun.objects.create( scenario=scenario, run_key="crop_a", label=crop_a.get("crop_name", "crop_a"), weather_payload=_json_ready(resolved["weather"]), soil_payload=_json_ready(resolved["soil"]), crop_payload=_json_ready(crop_a), site_payload=_json_ready(resolved["site_parameters"]), agromanagement_payload=_json_ready(merged_agromanagement), ), SimulationRun.objects.create( scenario=scenario, run_key="crop_b", label=crop_b.get("crop_name", "crop_b"), weather_payload=_json_ready(resolved["weather"]), soil_payload=_json_ready(resolved["soil"]), crop_payload=_json_ready(crop_b), site_payload=_json_ready(resolved["site_parameters"]), 
agromanagement_payload=_json_ready(merged_agromanagement), ), ] return self._execute_scenario( scenario=scenario, run_specs=[ { "instance": runs[0], "weather": resolved["weather"], "soil": resolved["soil"], "crop_parameters": crop_a, "site_parameters": resolved["site_parameters"], "agromanagement": merged_agromanagement, }, { "instance": runs[1], "weather": resolved["weather"], "soil": resolved["soil"], "crop_parameters": crop_b, "site_parameters": resolved["site_parameters"], "agromanagement": merged_agromanagement, }, ], ) def recommend_best_crop( self, *, weather: Any | None = None, soil: dict[str, Any] | None = None, crops: list[dict[str, Any]] | None = None, agromanagement: Any | None = None, site_parameters: dict[str, Any] | None = None, irrigation_recommendation: dict[str, Any] | None = None, fertilization_recommendation: dict[str, Any] | None = None, name: str = "", farm_uuid: str | None = None, ) -> dict[str, Any]: if not crops and farm_uuid: base = build_simulation_payload_from_farm(farm_uuid=str(farm_uuid)) crops = [] for plant in base["farm"].plants.all(): simulation_profile = _extract_plant_simulation_profile(plant) crop_payload = ( deepcopy(simulation_profile.get("crop_parameters")) if simulation_profile and isinstance(simulation_profile.get("crop_parameters"), dict) else _build_default_crop_parameters(plant, plant.name) ) crop_payload.setdefault("crop_name", plant.name) crop_payload.setdefault("label", plant.name) crops.append(crop_payload) crops = crops or [] if len(crops) < 2: raise CropSimulationError("At least two crop options are required.") resolved = self._resolve_common_inputs( farm_uuid=farm_uuid, weather=weather, soil=soil, crop_parameters=None, agromanagement=agromanagement, site_parameters=site_parameters, ) merged_agromanagement = _merge_management_recommendations( resolved["agromanagement"], irrigation_recommendation=irrigation_recommendation, fertilization_recommendation=fertilization_recommendation, ) scenario = 
SimulationScenario.objects.create( name=name, scenario_type=SimulationScenario.ScenarioType.CROP_COMPARISON, model_name=self.manager.model_name, input_payload=_json_ready( { "weather": resolved["weather"], "soil": resolved["soil"], "crops": crops, "site_parameters": resolved["site_parameters"], "agromanagement": merged_agromanagement, "irrigation_recommendation": irrigation_recommendation or {}, "fertilization_recommendation": fertilization_recommendation or {}, "farm_uuid": farm_uuid, } ), ) run_specs = [] for index, crop in enumerate(crops, start=1): label = ( crop.get("label") or crop.get("crop_name") or crop.get("name") or f"crop_{index}" ) run = SimulationRun.objects.create( scenario=scenario, run_key=f"crop_{index}", label=label, weather_payload=_json_ready(resolved["weather"]), soil_payload=_json_ready(resolved["soil"]), crop_payload=_json_ready(crop), site_payload=_json_ready(resolved["site_parameters"]), agromanagement_payload=_json_ready(merged_agromanagement), ) run_specs.append( { "instance": run, "weather": resolved["weather"], "soil": resolved["soil"], "crop_parameters": crop, "site_parameters": resolved["site_parameters"], "agromanagement": merged_agromanagement, } ) result = self._execute_scenario(scenario=scenario, run_specs=run_specs) comparison = result.get("comparison", {}) return { "scenario_id": result["scenario_id"], "scenario_type": result["scenario_type"], "recommended_crop": { "run_key": comparison.get("best_run_key"), "label": comparison.get("best_label"), "expected_yield_estimate": comparison.get("best_yield_estimate"), }, "candidates": comparison.get("runs", []), "raw_result": result, } def compare_fertilization_strategies( self, *, weather: Any | None = None, soil: dict[str, Any] | None = None, crop_parameters: dict[str, Any] | None = None, strategies: list[dict[str, Any]], site_parameters: dict[str, Any] | None = None, irrigation_recommendation: dict[str, Any] | None = None, fertilization_recommendation: dict[str, Any] | None = None, 
name: str = "", farm_uuid: str | None = None, plant_name: str | None = None, ) -> dict[str, Any]: if len(strategies) < 2: raise CropSimulationError("At least two fertilization strategies are required.") resolved = self._resolve_common_inputs( farm_uuid=farm_uuid, plant_name=plant_name, weather=weather, soil=soil, crop_parameters=crop_parameters, agromanagement=None, site_parameters=site_parameters, ) scenario = SimulationScenario.objects.create( name=name, scenario_type=SimulationScenario.ScenarioType.FERTILIZATION_COMPARISON, model_name=self.manager.model_name, input_payload=_json_ready( { "weather": resolved["weather"], "soil": resolved["soil"], "crop_parameters": resolved["crop_parameters"], "site_parameters": resolved["site_parameters"], "strategies": strategies, "irrigation_recommendation": irrigation_recommendation or {}, "fertilization_recommendation": fertilization_recommendation or {}, "farm_uuid": farm_uuid, "plant_name": plant_name, } ), ) run_specs = [] for index, strategy in enumerate(strategies, start=1): merged_agromanagement = _merge_management_recommendations( strategy["agromanagement"], irrigation_recommendation=irrigation_recommendation, fertilization_recommendation=fertilization_recommendation, ) run = SimulationRun.objects.create( scenario=scenario, run_key=f"strategy_{index}", label=strategy.get("label", f"strategy_{index}"), weather_payload=_json_ready(resolved["weather"]), soil_payload=_json_ready(resolved["soil"]), crop_payload=_json_ready(resolved["crop_parameters"]), site_payload=_json_ready(resolved["site_parameters"]), agromanagement_payload=_json_ready(merged_agromanagement), ) run_specs.append( { "instance": run, "weather": resolved["weather"], "soil": resolved["soil"], "crop_parameters": resolved["crop_parameters"], "site_parameters": resolved["site_parameters"], "agromanagement": merged_agromanagement, } ) return self._execute_scenario(scenario=scenario, run_specs=run_specs) def get_scenario_result(self, scenario_id: int) -> 
def _execute_scenario(
    self,
    *,
    scenario: SimulationScenario,
    run_specs: list[dict[str, Any]],
) -> dict[str, Any]:
    """Run every spec of a scenario and persist run and scenario status.

    Each entry of ``run_specs`` carries the ``SimulationRun`` row under
    ``"instance"`` plus the ``weather``, ``soil``, ``crop_parameters``,
    ``agromanagement`` and ``site_parameters`` inputs that are handed to
    ``self.manager.run_simulation``.

    Returns:
        The payload produced by ``_build_scenario_result``; the same payload
        is stored on ``scenario.result_payload``.

    Raises:
        Exception: Whatever the simulation or the result aggregation raised.
            Before re-raising, the run that was executing (if any) and the
            scenario are saved with a failure status, and the results
            collected so far are stored under ``"runs"``.
    """
    scenario.status = SimulationScenario.Status.RUNNING
    scenario.error_message = ""
    scenario.save(update_fields=["status", "error_message", "updated_at"])

    results: list[dict[str, Any]] = []
    # The run currently being simulated; None while no run is in flight so
    # that an aggregation error is not blamed on the last successful run.
    active_run = None
    try:
        for spec in run_specs:
            active_run = spec["instance"]
            active_run.status = SimulationScenario.Status.RUNNING
            active_run.error_message = ""
            active_run.save(update_fields=["status", "error_message", "updated_at"])

            result = self.manager.run_simulation(
                weather=spec["weather"],
                soil=spec["soil"],
                crop_parameters=spec["crop_parameters"],
                agromanagement=spec["agromanagement"],
                site_parameters=spec["site_parameters"],
            )

            active_run.status = SimulationScenario.Status.SUCCESS
            active_run.result_payload = result
            active_run.save(update_fields=["status", "result_payload", "updated_at"])
            results.append(
                {
                    "run_key": active_run.run_key,
                    "label": active_run.label,
                    "result": result,
                }
            )
            active_run = None

        # Aggregation stays inside the guarded region: a failure here must
        # mark the scenario as failed instead of leaving it RUNNING.
        scenario_result = self._build_scenario_result(scenario, results)
    except Exception as exc:
        # NOTE(review): the module-level entry points in this file are
        # wrapped in ``transaction.atomic``; when called through them the
        # re-raise below presumably rolls these failure writes back —
        # confirm that this is intended.
        message = str(exc)
        if active_run is not None:
            active_run.status = SimulationScenario.Status.FAILURE
            active_run.error_message = message
            active_run.save(update_fields=["status", "error_message", "updated_at"])
        scenario.status = SimulationScenario.Status.FAILURE
        scenario.error_message = message
        scenario.result_payload = {"runs": results}
        scenario.save(
            update_fields=["status", "error_message", "result_payload", "updated_at"]
        )
        raise

    scenario.status = SimulationScenario.Status.SUCCESS
    scenario.result_payload = scenario_result
    scenario.error_message = ""
    scenario.save(
        update_fields=["status", "result_payload", "error_message", "updated_at"]
    )
    return scenario_result


def _build_scenario_result(
    self,
    scenario: SimulationScenario,
    results: list[dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the scenario-level payload from the per-run results.

    Args:
        scenario: Supplies ``id`` and ``scenario_type`` for the payload.
        results: One ``{"run_key", "label", "result"}`` dictionary per
            completed run; for comparison scenarios each ``"result"`` must
            expose ``["metrics"]["yield_estimate"]`` and
            ``["metrics"]["biomass"]``.

    Returns:
        A dictionary with ``scenario_id``, ``scenario_type``, ``status`` and
        ``runs``. Single scenarios add ``result``; comparison scenarios add
        ``comparison`` (best run by yield estimate plus per-run metrics).
        Crop comparisons with at least two runs add ``yield_gap`` and
        fertilization comparisons add ``recommendation``.

    Raises:
        CropSimulationError: If ``results`` is empty.
    """
    if not results:
        raise CropSimulationError(
            "Cannot build a scenario result without any completed runs."
        )

    scenario_types = SimulationScenario.ScenarioType
    payload = {
        "scenario_id": scenario.id,
        "scenario_type": scenario.scenario_type,
        "status": SimulationScenario.Status.SUCCESS,
        "runs": results,
    }
    if scenario.scenario_type == scenario_types.SINGLE:
        payload["result"] = results[0]["result"]
        return payload

    run_metrics = []
    for item in results:
        metrics = item["result"]["metrics"]
        run_metrics.append(
            {
                "run_key": item["run_key"],
                "label": item["label"],
                # Missing/None metrics count as 0.0 so runs stay comparable.
                "yield_estimate": float(metrics["yield_estimate"] or 0.0),
                "biomass": float(metrics["biomass"] or 0.0),
            }
        )

    best = max(run_metrics, key=lambda entry: entry["yield_estimate"])
    payload["comparison"] = {
        "best_run_key": best["run_key"],
        "best_label": best["label"],
        "best_yield_estimate": best["yield_estimate"],
        "runs": run_metrics,
    }

    if (
        scenario.scenario_type == scenario_types.CROP_COMPARISON
        and len(run_metrics) >= 2
    ):
        # Gap between the best run and the runner-up. With exactly two runs
        # this equals the absolute difference between them; with more runs
        # it no longer depends on the order the candidates were submitted.
        top, runner_up = sorted(
            (entry["yield_estimate"] for entry in run_metrics), reverse=True
        )[:2]
        payload["comparison"]["yield_gap"] = round(top - runner_up, 3)

    if scenario.scenario_type == scenario_types.FERTILIZATION_COMPARISON:
        payload["recommendation"] = {
            "recommended_run_key": best["run_key"],
            "recommended_label": best["label"],
            "expected_yield_estimate": best["yield_estimate"],
        }
    return payload
compare_fertilization_strategies(**kwargs) -> dict[str, Any]: return CropSimulationService().compare_fertilization_strategies(**kwargs)