from __future__ import annotations from copy import deepcopy from dataclasses import dataclass from datetime import date, datetime, timedelta from math import exp from typing import Any from django.core.paginator import EmptyPage, Paginator from farm_data.models import SensorData from plant.gdd import calculate_daily_gdd, resolve_growth_profile from weather.models import WeatherForecast from .services import CropSimulationService, build_simulation_payload_from_farm DEFAULT_DYNAMIC_PARAMETERS = ["DVS", "LAI", "TAGP", "TWSO", "SM"] DEFAULT_PAGE_SIZE = 10 MAX_PAGE_SIZE = 50 DEFAULT_STAGE_LABELS = { "pre_emergence": "پیش از سبز شدن", "establishment": "استقرار", "vegetative": "رشد رویشی", "flowering": "گلدهی", "reproductive": "پرشدن محصول", "maturity": "رسیدگی", } class GrowthSimulationError(Exception): pass @dataclass class GrowthSimulationContext: farm_uuid: str | None plant_name: str plant: Any dynamic_parameters: list[str] weather: list[dict[str, Any]] crop_parameters: dict[str, Any] soil_parameters: dict[str, Any] site_parameters: dict[str, Any] agromanagement: list[dict[str, Any]] page_size: int def _safe_float(value: Any, default: float = 0.0) -> float: try: if value in (None, ""): return default return float(value) except (TypeError, ValueError): return default def _coerce_date(value: Any) -> date: if isinstance(value, date) and not isinstance(value, datetime): return value if isinstance(value, datetime): return value.date() if isinstance(value, str): return date.fromisoformat(value) raise GrowthSimulationError(f"Invalid date value: {value!r}") def _json_ready(value: Any) -> Any: if isinstance(value, dict): return {str(key): _json_ready(item) for key, item in value.items()} if isinstance(value, list): return [_json_ready(item) for item in value] if isinstance(value, tuple): return [_json_ready(item) for item in value] if isinstance(value, (date, datetime)): return value.isoformat() return value def _normalize_weather_records(weather: Any) -> list[dict[str, Any]]: if not weather: return [] records = weather.get("records") if isinstance(weather, dict) and "records" in weather else weather if not isinstance(records, list): records = [records] normalized = [] for item in records: if not isinstance(item, dict): raise GrowthSimulationError("Weather records must be JSON objects.") current_date = _coerce_date(item.get("DAY") or item.get("day")) normalized.append( { "DAY": current_date, "LAT": _safe_float(item.get("LAT", item.get("lat")), 35.7), "LON": _safe_float(item.get("LON", item.get("lon")), 51.4), "ELEV": _safe_float(item.get("ELEV", item.get("elev")), 1200.0), "IRRAD": _safe_float(item.get("IRRAD", item.get("irrad")), 16_000_000.0), "TMIN": _safe_float(item.get("TMIN", item.get("tmin")), 12.0), "TMAX": _safe_float(item.get("TMAX", item.get("tmax")), 24.0), "VAP": _safe_float(item.get("VAP", item.get("vap")), 12.0), "WIND": _safe_float(item.get("WIND", item.get("wind")), 2.0), "RAIN": _safe_float(item.get("RAIN", item.get("rain")), 0.0), "E0": _safe_float(item.get("E0", item.get("e0")), 0.35), "ES0": _safe_float(item.get("ES0", item.get("es0")), 0.3), "ET0": _safe_float(item.get("ET0", item.get("et0")), 0.32), } ) if not normalized: raise GrowthSimulationError("At least one weather record is required.") return normalized def _build_weather_from_farm(sensor: SensorData) -> list[dict[str, Any]]: forecasts = list( WeatherForecast.objects.filter(location=sensor.center_location) .order_by("forecast_date")[:14] ) if not forecasts: raise GrowthSimulationError("No forecast data found for the selected farm.") records = [] for forecast in forecasts: records.append( { "DAY": forecast.forecast_date, "LAT": float(sensor.center_location.latitude), "LON": float(sensor.center_location.longitude), "ELEV": 1200.0, "IRRAD": 16_000_000.0, "TMIN": _safe_float(forecast.temperature_min, 12.0), "TMAX": _safe_float(forecast.temperature_max, 24.0), "VAP": max(_safe_float(forecast.humidity_mean, 55.0) / 5.0, 6.0), "WIND": _safe_float(forecast.wind_speed_max, 7.2) / 3.6, "RAIN": _safe_float(forecast.precipitation, 0.0), "E0": _safe_float(forecast.et0, 0.35), "ES0": max(_safe_float(forecast.et0, 0.35) * 0.9, 0.1), "ET0": _safe_float(forecast.et0, 0.35), } ) return records def _build_soil_and_site_from_farm(sensor: SensorData) -> tuple[dict[str, Any], dict[str, Any]]: depths = list(sensor.center_location.depths.all()) top_depth = depths[0] if depths else None smfcf = _safe_float(getattr(top_depth, "wv0033", None), 0.34) smw = _safe_float(getattr(top_depth, "wv1500", None), 0.14) soil_moisture = None payload = sensor.sensor_payload or {} if isinstance(payload, dict): for block in payload.values(): if isinstance(block, dict) and block.get("soil_moisture") is not None: soil_moisture = _safe_float(block.get("soil_moisture")) break site = {"WAV": soil_moisture if soil_moisture is not None else 40.0} soil = {"SMFCF": smfcf, "SMW": smw, "RDMSOL": 120.0} return soil, site def _build_default_crop_parameters(plant: Any) -> dict[str, Any]: profile = resolve_growth_profile(plant) required_gdd = _safe_float(profile.get("required_gdd_for_maturity"), 1200.0) return { "crop_name": plant.name, "TSUM1": round(required_gdd * 0.45, 3), "TSUM2": round(required_gdd * 0.55, 3), "YIELD_SCALE": 1.0, "MAX_LAI": 5.0, "MAX_BIOMASS": 12000.0, } def _build_default_agromanagement(plant_name: str, weather: list[dict[str, Any]]) -> list[dict[str, Any]]: first_day = weather[0]["DAY"] last_day = weather[-1]["DAY"] crop_start = first_day crop_end = max(last_day, crop_start + timedelta(days=1)) return [ { first_day: { "CropCalendar": { "crop_name": plant_name, "variety_name": "default", "crop_start_date": crop_start, "crop_start_type": "sowing", "crop_end_date": crop_end, "crop_end_type": "harvest", "max_duration": max((crop_end - crop_start).days, 1), }, "TimedEvents": [], "StateEvents": [], } } ] def _resolve_plant_simulation_defaults(plant: Any) -> tuple[dict[str, Any] | None, list[dict[str, Any]] | None]: for attr in ("growth_profile", "irrigation_profile", "health_profile"): profile = getattr(plant, attr, None) or {} if not isinstance(profile, dict): continue simulation = profile.get("simulation") if not isinstance(simulation, dict): continue crop_parameters = simulation.get("crop_parameters") agromanagement = simulation.get("agromanagement") if isinstance(crop_parameters, dict) and agromanagement: return deepcopy(crop_parameters), deepcopy(agromanagement) return None, None def build_growth_context(payload: dict[str, Any]) -> GrowthSimulationContext: plant_name = payload["plant_name"] from plant.models import Plant plant = Plant.objects.filter(name=plant_name).first() if plant is None: raise GrowthSimulationError("Plant not found.") dynamic_parameters = payload.get("dynamic_parameters") or DEFAULT_DYNAMIC_PARAMETERS page_size = min(max(int(payload.get("page_size") or DEFAULT_PAGE_SIZE), 1), MAX_PAGE_SIZE) sensor = None resolved_farm_uuid = str(payload["farm_uuid"]) if payload.get("farm_uuid") else None if payload.get("farm_uuid"): sensor = ( SensorData.objects.select_related("center_location") .prefetch_related("center_location__depths") .filter(farm_uuid=payload["farm_uuid"]) .first() ) if sensor is None: raise GrowthSimulationError("Farm not found.") if resolved_farm_uuid: farm_payload = build_simulation_payload_from_farm( farm_uuid=resolved_farm_uuid, plant_name=plant_name, weather=payload.get("weather"), soil=payload.get("soil_parameters"), crop_parameters=payload.get("crop_parameters"), agromanagement=payload.get("agromanagement"), site_parameters=payload.get("site_parameters"), ) weather = farm_payload["weather"] crop_parameters = farm_payload["crop_parameters"] soil_parameters = farm_payload["soil"] site_parameters = farm_payload["site_parameters"] agromanagement = farm_payload["agromanagement"] plant = farm_payload["plant"] or plant return GrowthSimulationContext( farm_uuid=resolved_farm_uuid, plant_name=plant_name, plant=plant, dynamic_parameters=dynamic_parameters, weather=weather, crop_parameters=crop_parameters, soil_parameters=soil_parameters, site_parameters=site_parameters, agromanagement=agromanagement, page_size=page_size, ) weather = ( _normalize_weather_records(payload["weather"]) if payload.get("weather") else _build_weather_from_farm(sensor) if sensor is not None else [] ) if not weather: raise GrowthSimulationError("Weather input is required.") default_crop_parameters, default_agromanagement = _resolve_plant_simulation_defaults(plant) crop_parameters = deepcopy(payload.get("crop_parameters") or default_crop_parameters or _build_default_crop_parameters(plant)) crop_parameters.setdefault("crop_name", plant.name) soil_parameters = deepcopy(payload.get("soil_parameters") or {}) site_parameters = deepcopy(payload.get("site_parameters") or {}) if sensor is not None: farm_soil, farm_site = _build_soil_and_site_from_farm(sensor) soil_parameters = {**farm_soil, **soil_parameters} site_parameters = {**farm_site, **site_parameters} soil_parameters.setdefault("SMFCF", 0.34) soil_parameters.setdefault("SMW", 0.14) soil_parameters.setdefault("RDMSOL", 120.0) site_parameters.setdefault("WAV", 40.0) agromanagement = deepcopy( payload.get("agromanagement") or default_agromanagement or _build_default_agromanagement(plant.name, weather) ) return GrowthSimulationContext( farm_uuid=resolved_farm_uuid, plant_name=plant_name, plant=plant, dynamic_parameters=dynamic_parameters, weather=weather, crop_parameters=crop_parameters, soil_parameters=soil_parameters, site_parameters=site_parameters, agromanagement=agromanagement, page_size=page_size, ) def _derive_stage(dvs: float) -> tuple[str, str]: if dvs < 0: return "pre_emergence", DEFAULT_STAGE_LABELS["pre_emergence"] if dvs < 0.2: return "establishment", DEFAULT_STAGE_LABELS["establishment"] if dvs < 1.0: return "vegetative", DEFAULT_STAGE_LABELS["vegetative"] if dvs < 1.3: return "flowering", DEFAULT_STAGE_LABELS["flowering"] if dvs < 2.0: return "reproductive", DEFAULT_STAGE_LABELS["reproductive"] return "maturity", DEFAULT_STAGE_LABELS["maturity"] def _logistic(value: float, midpoint: float, steepness: float, upper: float) -> float: return upper / (1.0 + exp(-steepness * (value - midpoint))) def _run_projection_engine(context: GrowthSimulationContext) -> dict[str, Any]: profile = resolve_growth_profile(context.plant) required_gdd = _safe_float(profile.get("required_gdd_for_maturity"), 1200.0) current_gdd = _safe_float(profile.get("current_cumulative_gdd"), 0.0) base_temperature = _safe_float(profile.get("base_temperature"), 10.0) max_lai = _safe_float(context.crop_parameters.get("MAX_LAI"), 5.0) max_biomass = _safe_float(context.crop_parameters.get("MAX_BIOMASS"), 12000.0) soil_moisture = _safe_float(context.site_parameters.get("WAV"), 40.0) daily_output = [] for record in context.weather: tmax = _safe_float(record.get("TMAX"), 24.0) tmin = _safe_float(record.get("TMIN"), 12.0) rain = _safe_float(record.get("RAIN"), 0.0) et0 = _safe_float(record.get("ET0"), 0.32) daily_gdd = calculate_daily_gdd(tmax=tmax, tmin=tmin, tbase=base_temperature) current_gdd += daily_gdd dvs = min(max((current_gdd / max(required_gdd, 1.0)) * 2.0, 0.0), 2.0) if dvs <= 1.0: lai = _logistic(dvs, midpoint=0.55, steepness=7.5, upper=max_lai) else: decline_factor = max(0.25, 1.0 - ((dvs - 1.0) / 1.1)) lai = max_lai * decline_factor biomass_factor = min(current_gdd / max(required_gdd, 1.0), 1.25) weather_modifier = max(0.65, min(1.15, 1.0 + (rain * 0.02) - (et0 * 0.08))) tagp = max_biomass * biomass_factor * weather_modifier twso = tagp * max(min((dvs - 0.95) / 0.85, 0.55), 0.0) soil_moisture = max(5.0, min(95.0, soil_moisture + (rain * 0.7) - (et0 * 8.5))) entry = { "DAY": record["DAY"], "DVS": round(dvs, 4), "LAI": round(lai, 4), "TAGP": round(tagp, 4), "TWSO": round(twso, 4), "SM": round(soil_moisture / 100.0, 4), "GDD": round(daily_gdd, 4), "TMIN": round(tmin, 4), "TMAX": round(tmax, 4), "RAIN": round(rain, 4), "ET0": round(et0, 4), } daily_output.append(entry) final_entry = daily_output[-1] if daily_output else {} return { "engine": "growth_projection", "model_name": "growth_projection_v1", "metrics": { "yield_estimate": round(_safe_float(final_entry.get("TWSO"), 0.0), 4), "biomass": round(_safe_float(final_entry.get("TAGP"), 0.0), 4), "max_lai": round(max((_safe_float(item.get("LAI"), 0.0) for item in daily_output), default=0.0), 4), }, "daily_output": _json_ready(daily_output), "summary_output": [], "terminal_output": [_json_ready(final_entry)] if final_entry else [], } def _run_simulation(context: GrowthSimulationContext) -> tuple[dict[str, Any], int | None, str | None]: try: response = CropSimulationService().run_single_simulation( farm_uuid=context.farm_uuid, plant_name=context.plant_name, weather=context.weather, soil=context.soil_parameters, crop_parameters=context.crop_parameters, agromanagement=context.agromanagement, site_parameters=context.site_parameters, name=f"growth:{context.plant_name}", ) return response["result"], response.get("scenario_id"), None except Exception as exc: raise GrowthSimulationError(f"Simulation engine failed: {exc}") from exc def summarize_growth_stages( daily_output: list[dict[str, Any]], dynamic_parameters: list[str], ) -> list[dict[str, Any]]: if not daily_output: return [] stage_items = [] current = None for raw in daily_output: record = dict(raw) day = _coerce_date(record.get("DAY") or record.get("day")) dvs = _safe_float(record.get("DVS"), 0.0) stage_code, stage_name = _derive_stage(dvs) parameter_values = {} for param in dynamic_parameters: if record.get(param) is not None: parameter_values[param] = _safe_float(record.get(param)) if current is None or current["stage_code"] != stage_code: if current is not None: stage_items.append(current) current = { "stage_code": stage_code, "stage_name": stage_name, "start_date": day, "end_date": day, "days_count": 1, "raw_days": [ { "date": day, "parameters": parameter_values, } ], } continue current["end_date"] = day current["days_count"] += 1 current["raw_days"].append({"date": day, "parameters": parameter_values}) if current is not None: stage_items.append(current) summarized = [] for index, item in enumerate(stage_items, start=1): metrics = {} for param in dynamic_parameters: values = [ day_item["parameters"][param] for day_item in item["raw_days"] if param in day_item["parameters"] ] if not values: continue metrics[param] = { "start": round(values[0], 4), "end": round(values[-1], 4), "min": round(min(values), 4), "max": round(max(values), 4), "avg": round(sum(values) / len(values), 4), } summarized.append( { "order": index, "stage_code": item["stage_code"], "stage_name": item["stage_name"], "start_date": item["start_date"].isoformat(), "end_date": item["end_date"].isoformat(), "days_count": item["days_count"], "metrics": metrics, } ) return summarized def paginate_growth_stages( stage_timeline: list[dict[str, Any]], *, page: int, page_size: int, ) -> dict[str, Any]: page_size = min(max(page_size, 1), MAX_PAGE_SIZE) if not stage_timeline: return { "items": [], "pagination": { "page": 1, "page_size": page_size, "total_items": 0, "total_pages": 0, "has_next": False, "has_previous": False, }, } paginator = Paginator(stage_timeline, page_size) try: page_obj = paginator.page(page) except EmptyPage: page_obj = paginator.page(paginator.num_pages or 1) return { "items": list(page_obj.object_list), "pagination": { "page": page_obj.number, "page_size": page_size, "total_items": paginator.count, "total_pages": paginator.num_pages, "has_next": page_obj.has_next(), "has_previous": page_obj.has_previous(), }, } def run_growth_simulation(payload: dict[str, Any], progress_callback=None) -> dict[str, Any]: context = build_growth_context(payload) if progress_callback is not None: progress_callback( state="PROGRESS", meta={"current": 1, "total": 3, "message": "simulation input resolved"}, ) simulation_result, scenario_id, simulation_error = _run_simulation(context) if progress_callback is not None: progress_callback( state="PROGRESS", meta={"current": 2, "total": 3, "message": "simulation finished"}, ) stage_timeline = summarize_growth_stages( daily_output=simulation_result.get("daily_output", []), dynamic_parameters=context.dynamic_parameters, ) if progress_callback is not None: progress_callback( state="PROGRESS", meta={"current": 3, "total": 3, "message": "growth stages prepared"}, ) paginated = paginate_growth_stages( stage_timeline, page=1, page_size=context.page_size, ) return { "plant_name": context.plant_name, "dynamic_parameters": context.dynamic_parameters, "engine": simulation_result.get("engine"), "model_name": simulation_result.get("model_name"), "scenario_id": scenario_id, "simulation_warning": simulation_error, "summary_metrics": simulation_result.get("metrics", {}), "stage_timeline": stage_timeline, "stages_page": paginated["items"], "pagination": paginated["pagination"], "daily_records_count": len(simulation_result.get("daily_output", [])), "default_page_size": context.page_size, } def _estimate_leaf_count(lai: float) -> float: return max(lai, 0.0) * 12000.0 def _build_current_farm_chart_payload( context: GrowthSimulationContext, simulation_result: dict[str, Any], scenario_id: int | None, simulation_warning: str | None, ) -> dict[str, Any]: daily_output = simulation_result.get("daily_output") or [] categories = [str(item.get("DAY")) for item in daily_output] leaf_count_series = [round(_estimate_leaf_count(_safe_float(item.get("LAI"))), 2) for item in daily_output] biomass_series = [round(_safe_float(item.get("TAGP")), 2) for item in daily_output] storage_weight_series = [round(_safe_float(item.get("TWSO")), 2) for item in daily_output] lai_series = [round(_safe_float(item.get("LAI")), 4) for item in daily_output] moisture_series = [round(_safe_float(item.get("SM")) * 100.0, 2) for item in daily_output] latest = daily_output[-1] if daily_output else {} latest_lai = _safe_float(latest.get("LAI"), 0.0) latest_biomass = _safe_float(latest.get("TAGP"), 0.0) latest_storage = _safe_float(latest.get("TWSO"), 0.0) latest_moisture = _safe_float(latest.get("SM"), 0.0) * 100.0 summary = [ { "title": "تعداد برگ تخمینی", "subtitle": "وضعیت فعلی", "amount": round(_estimate_leaf_count(latest_lai), 2), "unit": "leaf", "avatarColor": "success", "avatarIcon": "tabler-leaf", }, { "title": "وزن بیوماس", "subtitle": "برآورد فعلی", "amount": round(latest_biomass, 2), "unit": "kg/ha", "avatarColor": "primary", "avatarIcon": "tabler-chart-bar", }, { "title": "وزن محصول", "subtitle": "برآورد فعلی", "amount": round(latest_storage, 2), "unit": "kg/ha", "avatarColor": "warning", "avatarIcon": "tabler-scale", }, { "title": "رطوبت خاک", "subtitle": "آخرین روز", "amount": round(latest_moisture, 2), "unit": "%", "avatarColor": "info", "avatarIcon": "tabler-droplet", }, ] return { "farm_uuid": context.farm_uuid, "plant_name": context.plant_name, "engine": simulation_result.get("engine"), "model_name": simulation_result.get("model_name"), "scenario_id": scenario_id, "simulation_warning": simulation_warning, "categories": categories, "series": [ {"name": "تعداد برگ تخمینی", "key": "leaf_count_estimate", "data": leaf_count_series}, {"name": "وزن بیوماس", "key": "biomass_weight", "data": biomass_series}, {"name": "وزن محصول", "key": "storage_organ_weight", "data": storage_weight_series}, {"name": "شاخص سطح برگ", "key": "lai", "data": lai_series}, {"name": "رطوبت خاک", "key": "soil_moisture_percent", "data": moisture_series}, ], "summary": summary, "current_state": { "date": latest.get("DAY"), "leaf_count_estimate": round(_estimate_leaf_count(latest_lai), 2), "leaf_area_index": round(latest_lai, 4), "biomass_weight": round(latest_biomass, 2), "storage_organ_weight": round(latest_storage, 2), "soil_moisture_percent": round(latest_moisture, 2), "development_stage": round(_safe_float(latest.get("DVS"), 0.0), 4), "gdd": round(_safe_float(latest.get("GDD"), 0.0), 2), }, "metrics": simulation_result.get("metrics") or {}, "daily_output": daily_output, } class CurrentFarmChartSimulator: """سازنده chart وضعیت فعلی مزرعه برای خروجی مستقل از dashboard.""" def simulate(self, *, farm_uuid: str, plant_name: str | None = None) -> dict[str, Any]: if not farm_uuid: raise GrowthSimulationError("farm_uuid is required.") resolved_plant_name = plant_name if not resolved_plant_name: sensor = ( SensorData.objects.prefetch_related("plants") .filter(farm_uuid=farm_uuid) .first() ) if sensor is None: raise GrowthSimulationError("Farm not found.") plant = sensor.plants.first() if plant is None: raise GrowthSimulationError("Plant not found for the selected farm.") resolved_plant_name = plant.name context = build_growth_context( { "farm_uuid": farm_uuid, "plant_name": resolved_plant_name, "dynamic_parameters": DEFAULT_DYNAMIC_PARAMETERS, "page_size": DEFAULT_PAGE_SIZE, } ) simulation_result, scenario_id, simulation_warning = _run_simulation(context) return _build_current_farm_chart_payload( context, simulation_result, scenario_id, simulation_warning, )