from __future__ import annotations import importlib from dataclasses import dataclass from datetime import date, datetime from typing import Any from django.db import transaction from .models import SimulationRun, SimulationScenario DEFAULT_OUTPUT_VARS = ["DVS", "LAI", "TAGP", "TWSO", "SM"] DEFAULT_SUMMARY_VARS = ["TAGP", "TWSO", "CTRAT", "RD"] DEFAULT_TERMINAL_VARS = ["TAGP", "TWSO", "LAI", "DVS"] class CropSimulationError(Exception): pass def _json_ready(value: Any) -> Any: if isinstance(value, dict): return {str(key): _json_ready(item) for key, item in value.items()} if isinstance(value, list): return [_json_ready(item) for item in value] if isinstance(value, tuple): return [_json_ready(item) for item in value] if isinstance(value, (date, datetime)): return value.isoformat() return value def _coerce_date(value: Any) -> date: if isinstance(value, date) and not isinstance(value, datetime): return value if isinstance(value, datetime): return value.date() if isinstance(value, str): return date.fromisoformat(value) raise CropSimulationError(f"Unsupported date value: {value!r}") def _normalize_weather_records(weather: Any) -> list[dict[str, Any]]: if isinstance(weather, dict): if "records" in weather: records = weather["records"] else: records = [weather] else: records = weather if not isinstance(records, list) or not records: raise CropSimulationError("Weather input must contain at least one record.") normalized = [] for raw in records: if not isinstance(raw, dict): raise CropSimulationError("Weather records must be dictionaries.") current_date = _coerce_date(raw.get("DAY") or raw.get("day")) normalized.append( { "DAY": current_date, "LAT": float(raw.get("LAT", raw.get("lat", 0.0))), "LON": float(raw.get("LON", raw.get("lon", 0.0))), "ELEV": float(raw.get("ELEV", raw.get("elev", 0.0))), "IRRAD": float(raw.get("IRRAD", raw.get("irrad", 15_000_000.0))), "TMIN": float(raw.get("TMIN", raw.get("tmin", 10.0))), "TMAX": float(raw.get("TMAX", raw.get("tmax", 20.0))), "VAP": float(raw.get("VAP", raw.get("vap", 12.0))), "WIND": float(raw.get("WIND", raw.get("wind", 2.0))), "RAIN": float(raw.get("RAIN", raw.get("rain", 0.0))), "E0": float(raw.get("E0", raw.get("e0", 0.35))), "ES0": float(raw.get("ES0", raw.get("es0", 0.3))), "ET0": float(raw.get("ET0", raw.get("et0", 0.32))), } ) return normalized def _normalize_agromanagement(agromanagement: Any) -> list[dict[str, Any]]: if isinstance(agromanagement, dict) and "AgroManagement" in agromanagement: campaigns = agromanagement["AgroManagement"] elif isinstance(agromanagement, list): campaigns = agromanagement elif isinstance(agromanagement, dict): campaigns = [agromanagement] else: raise CropSimulationError("Agromanagement input must be a dict or list.") if not campaigns: raise CropSimulationError("Agromanagement input cannot be empty.") return campaigns def _normalize_pcse_output_records(records: Any) -> list[dict[str, Any]]: if records is None: return [] if isinstance(records, dict): return [records] if isinstance(records, list): return records if isinstance(records, tuple): return list(records) return [records] def _pick_first_not_none(*values: Any) -> Any: for value in values: if value is not None: return value return None def _extract_total_n(agromanagement: list[dict[str, Any]]) -> float: total_n = 0.0 for campaign in agromanagement: if not isinstance(campaign, dict): continue for payload in campaign.values(): if not isinstance(payload, dict): continue for bucket_name in ("TimedEvents", "StateEvents"): for event_group in payload.get(bucket_name, []) or []: if not isinstance(event_group, dict): continue for event in event_group.get("events_table", []) or []: if not isinstance(event, dict): continue for event_payload in event.values(): if isinstance(event_payload, dict): total_n += float(event_payload.get("N_amount", 0.0)) return total_n def _load_pcse_bindings() -> dict[str, Any] | None: try: base_module = importlib.import_module("pcse.base") models_module = importlib.import_module("pcse.models") except ImportError: return None parameter_provider = getattr(base_module, "ParameterProvider", None) weather_provider = getattr(base_module, "WeatherDataProvider", object) weather_container = getattr(base_module, "WeatherDataContainer", None) if weather_container is None or parameter_provider is None: return None return { "ParameterProvider": parameter_provider, "WeatherDataProvider": weather_provider, "WeatherDataContainer": weather_container, "models": models_module, } def _resolve_model_class(bindings: dict[str, Any], model_name: str): models_source = bindings["models"] if isinstance(models_source, dict): return models_source[model_name] return getattr(models_source, model_name) @dataclass class PreparedSimulationInput: weather: list[dict[str, Any]] soil: dict[str, Any] crop: dict[str, Any] site: dict[str, Any] agromanagement: list[dict[str, Any]] class PcseSimulationManager: def __init__(self, model_name: str = "Wofost72_WLP_CWB"): self.model_name = model_name def run_simulation( self, *, weather: Any, soil: dict[str, Any], crop_parameters: dict[str, Any], agromanagement: Any, site_parameters: dict[str, Any] | None = None, ) -> dict[str, Any]: prepared = PreparedSimulationInput( weather=_normalize_weather_records(weather), soil=soil or {}, crop=crop_parameters or {}, site=site_parameters or {}, agromanagement=_normalize_agromanagement(agromanagement), ) bindings = _load_pcse_bindings() if bindings is None: raise CropSimulationError( "PCSE is not installed or required PCSE classes could not be loaded." ) return self._run_with_pcse(prepared, bindings) def _run_with_pcse( self, prepared: PreparedSimulationInput, bindings: dict[str, Any], ) -> dict[str, Any]: weather_provider_base = bindings["WeatherDataProvider"] weather_container = bindings["WeatherDataContainer"] parameter_provider_cls = bindings["ParameterProvider"] model_cls = _resolve_model_class(bindings, self.model_name) class DictWeatherProvider(weather_provider_base): def __init__(self, records: list[dict[str, Any]]): super().__init__() self._records = { item["DAY"]: weather_container(**item) for item in records } def __call__(self, day): return self._records[_coerce_date(day)] parameter_provider = parameter_provider_cls( cropdata=prepared.crop, soildata=prepared.soil, sitedata=prepared.site, ) simulation = model_cls( parameterprovider=parameter_provider, weatherdataprovider=DictWeatherProvider(prepared.weather), agromanagement=prepared.agromanagement, output_vars=DEFAULT_OUTPUT_VARS, summary_vars=DEFAULT_SUMMARY_VARS, terminal_vars=DEFAULT_TERMINAL_VARS, ) if hasattr(simulation, "run_till_terminate"): simulation.run_till_terminate() elif hasattr(simulation, "run"): simulation.run(days=len(prepared.weather)) else: raise CropSimulationError("PCSE model does not expose a runnable interface.") daily_output = _normalize_pcse_output_records(simulation.get_output()) summary_output = _normalize_pcse_output_records(simulation.get_summary_output()) terminal_output = _normalize_pcse_output_records(simulation.get_terminal_output()) return self._build_result( engine="pcse", daily_output=daily_output, summary_output=summary_output, terminal_output=terminal_output, ) def _build_result( self, *, engine: str, daily_output: list[dict[str, Any]], summary_output: list[dict[str, Any]], terminal_output: list[dict[str, Any]], ) -> dict[str, Any]: terminal = terminal_output[-1] if terminal_output else {} summary = summary_output[-1] if summary_output else {} final_daily = daily_output[-1] if daily_output else {} metrics = { "yield_estimate": _pick_first_not_none( terminal.get("TWSO"), summary.get("TWSO"), final_daily.get("TWSO"), ), "biomass": _pick_first_not_none( terminal.get("TAGP"), summary.get("TAGP"), final_daily.get("TAGP"), ), "max_lai": _pick_first_not_none( terminal.get("LAI"), summary.get("LAIMAX"), final_daily.get("LAI"), ), } return { "engine": engine, "model_name": self.model_name, "metrics": _json_ready(metrics), "daily_output": _json_ready(daily_output), "summary_output": _json_ready(summary_output), "terminal_output": _json_ready(terminal_output), } class CropSimulationService: def __init__(self, manager: PcseSimulationManager | None = None): self.manager = manager or PcseSimulationManager() def run_single_simulation( self, *, weather: Any, soil: dict[str, Any], crop_parameters: dict[str, Any], agromanagement: Any, site_parameters: dict[str, Any] | None = None, name: str = "", ) -> dict[str, Any]: scenario = SimulationScenario.objects.create( name=name, scenario_type=SimulationScenario.ScenarioType.SINGLE, model_name=self.manager.model_name, input_payload=_json_ready( { "weather": weather, "soil": soil, "crop_parameters": crop_parameters, "site_parameters": site_parameters or {}, "agromanagement": agromanagement, } ), ) run = SimulationRun.objects.create( scenario=scenario, run_key="single", label=name or "single", weather_payload=_json_ready(weather), soil_payload=_json_ready(soil), crop_payload=_json_ready(crop_parameters), site_payload=_json_ready(site_parameters or {}), agromanagement_payload=_json_ready(agromanagement), ) return self._execute_scenario( scenario=scenario, run_specs=[ { "instance": run, "weather": weather, "soil": soil, "crop_parameters": crop_parameters, "site_parameters": site_parameters or {}, "agromanagement": agromanagement, } ], ) def compare_crops( self, *, weather: Any, soil: dict[str, Any], crop_a: dict[str, Any], crop_b: dict[str, Any], agromanagement: Any, site_parameters: dict[str, Any] | None = None, name: str = "", ) -> dict[str, Any]: scenario = SimulationScenario.objects.create( name=name, scenario_type=SimulationScenario.ScenarioType.CROP_COMPARISON, model_name=self.manager.model_name, input_payload=_json_ready( { "weather": weather, "soil": soil, "crop_a": crop_a, "crop_b": crop_b, "site_parameters": site_parameters or {}, "agromanagement": agromanagement, } ), ) runs = [ SimulationRun.objects.create( scenario=scenario, run_key="crop_a", label=crop_a.get("crop_name", "crop_a"), weather_payload=_json_ready(weather), soil_payload=_json_ready(soil), crop_payload=_json_ready(crop_a), site_payload=_json_ready(site_parameters or {}), agromanagement_payload=_json_ready(agromanagement), ), SimulationRun.objects.create( scenario=scenario, run_key="crop_b", label=crop_b.get("crop_name", "crop_b"), weather_payload=_json_ready(weather), soil_payload=_json_ready(soil), crop_payload=_json_ready(crop_b), site_payload=_json_ready(site_parameters or {}), agromanagement_payload=_json_ready(agromanagement), ), ] return self._execute_scenario( scenario=scenario, run_specs=[ { "instance": runs[0], "weather": weather, "soil": soil, "crop_parameters": crop_a, "site_parameters": site_parameters or {}, "agromanagement": agromanagement, }, { "instance": runs[1], "weather": weather, "soil": soil, "crop_parameters": crop_b, "site_parameters": site_parameters or {}, "agromanagement": agromanagement, }, ], ) def compare_fertilization_strategies( self, *, weather: Any, soil: dict[str, Any], crop_parameters: dict[str, Any], strategies: list[dict[str, Any]], site_parameters: dict[str, Any] | None = None, name: str = "", ) -> dict[str, Any]: if len(strategies) < 2: raise CropSimulationError("At least two fertilization strategies are required.") scenario = SimulationScenario.objects.create( name=name, scenario_type=SimulationScenario.ScenarioType.FERTILIZATION_COMPARISON, model_name=self.manager.model_name, input_payload=_json_ready( { "weather": weather, "soil": soil, "crop_parameters": crop_parameters, "site_parameters": site_parameters or {}, "strategies": strategies, } ), ) run_specs = [] for index, strategy in enumerate(strategies, start=1): run = SimulationRun.objects.create( scenario=scenario, run_key=f"strategy_{index}", label=strategy.get("label", f"strategy_{index}"), weather_payload=_json_ready(weather), soil_payload=_json_ready(soil), crop_payload=_json_ready(crop_parameters), site_payload=_json_ready(site_parameters or {}), agromanagement_payload=_json_ready(strategy["agromanagement"]), ) run_specs.append( { "instance": run, "weather": weather, "soil": soil, "crop_parameters": crop_parameters, "site_parameters": site_parameters or {}, "agromanagement": strategy["agromanagement"], } ) return self._execute_scenario(scenario=scenario, run_specs=run_specs) def get_scenario_result(self, scenario_id: int) -> dict[str, Any]: scenario = SimulationScenario.objects.prefetch_related("runs").get(pk=scenario_id) return { "id": scenario.id, "name": scenario.name, "scenario_type": scenario.scenario_type, "status": scenario.status, "model_name": scenario.model_name, "input_payload": scenario.input_payload, "result_payload": scenario.result_payload, "error_message": scenario.error_message, "runs": [ { "id": run.id, "run_key": run.run_key, "label": run.label, "status": run.status, "result_payload": run.result_payload, "error_message": run.error_message, } for run in scenario.runs.all() ], } def _execute_scenario( self, *, scenario: SimulationScenario, run_specs: list[dict[str, Any]], ) -> dict[str, Any]: scenario.status = SimulationScenario.Status.RUNNING scenario.error_message = "" scenario.save(update_fields=["status", "error_message", "updated_at"]) results = [] try: for spec in run_specs: run = spec["instance"] run.status = SimulationScenario.Status.RUNNING run.error_message = "" run.save(update_fields=["status", "error_message", "updated_at"]) result = self.manager.run_simulation( weather=spec["weather"], soil=spec["soil"], crop_parameters=spec["crop_parameters"], agromanagement=spec["agromanagement"], site_parameters=spec["site_parameters"], ) run.status = SimulationScenario.Status.SUCCESS run.result_payload = result run.save(update_fields=["status", "result_payload", "updated_at"]) results.append( { "run_key": run.run_key, "label": run.label, "result": result, } ) except Exception as exc: message = str(exc) run = spec["instance"] run.status = SimulationScenario.Status.FAILURE run.error_message = message run.save(update_fields=["status", "error_message", "updated_at"]) scenario.status = SimulationScenario.Status.FAILURE scenario.error_message = message scenario.result_payload = {"runs": results} scenario.save( update_fields=["status", "error_message", "result_payload", "updated_at"] ) raise scenario_result = self._build_scenario_result(scenario, results) scenario.status = SimulationScenario.Status.SUCCESS scenario.result_payload = scenario_result scenario.error_message = "" scenario.save( update_fields=["status", "result_payload", "error_message", "updated_at"] ) return scenario_result def _build_scenario_result( self, scenario: SimulationScenario, results: list[dict[str, Any]], ) -> dict[str, Any]: payload = { "scenario_id": scenario.id, "scenario_type": scenario.scenario_type, "status": SimulationScenario.Status.SUCCESS, "runs": results, } if scenario.scenario_type == SimulationScenario.ScenarioType.SINGLE: payload["result"] = results[0]["result"] return payload run_metrics = [ { "run_key": item["run_key"], "label": item["label"], "yield_estimate": float(item["result"]["metrics"]["yield_estimate"] or 0.0), "biomass": float(item["result"]["metrics"]["biomass"] or 0.0), } for item in results ] best = max(run_metrics, key=lambda item: item["yield_estimate"]) payload["comparison"] = { "best_run_key": best["run_key"], "best_label": best["label"], "best_yield_estimate": best["yield_estimate"], "runs": run_metrics, } if scenario.scenario_type == SimulationScenario.ScenarioType.CROP_COMPARISON: payload["comparison"]["yield_gap"] = round( abs(run_metrics[0]["yield_estimate"] - run_metrics[1]["yield_estimate"]), 3, ) if ( scenario.scenario_type == SimulationScenario.ScenarioType.FERTILIZATION_COMPARISON ): payload["recommendation"] = { "recommended_run_key": best["run_key"], "recommended_label": best["label"], "expected_yield_estimate": best["yield_estimate"], } return payload @transaction.atomic def run_single_simulation(**kwargs) -> dict[str, Any]: return CropSimulationService().run_single_simulation(**kwargs) @transaction.atomic def compare_crops(**kwargs) -> dict[str, Any]: return CropSimulationService().compare_crops(**kwargs) @transaction.atomic def compare_fertilization_strategies(**kwargs) -> dict[str, Any]: return CropSimulationService().compare_fertilization_strategies(**kwargs)