Files
Ai/crop_simulation/growth_simulation.py
T

569 lines
21 KiB
Python
Raw Normal View History

2026-04-24 18:34:17 +03:30
from __future__ import annotations
from copy import deepcopy
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from math import exp
from typing import Any
from django.core.paginator import EmptyPage, Paginator
from farm_data.models import SensorData
from plant.gdd import calculate_daily_gdd, resolve_growth_profile
from weather.models import WeatherForecast
2026-04-24 22:20:15 +03:30
from .services import CropSimulationService, build_simulation_payload_from_farm
2026-04-24 18:34:17 +03:30
# Simulation output keys summarised per growth stage when the caller names none.
DEFAULT_DYNAMIC_PARAMETERS = ["DVS", "LAI", "TAGP", "TWSO", "SM"]

# Bounds applied to the stage-timeline page size.
DEFAULT_PAGE_SIZE = 10
MAX_PAGE_SIZE = 50

# Display label for every stage code produced by ``_derive_stage``.
DEFAULT_STAGE_LABELS = {
    "pre_emergence": "پیش از سبز شدن",
    "establishment": "استقرار",
    "vegetative": "رشد رویشی",
    "flowering": "گلدهی",
    "reproductive": "پرشدن محصول",
    "maturity": "رسیدگی",
}
class GrowthSimulationError(Exception):
    """Raised when growth-simulation input is missing, invalid or cannot be resolved."""
@dataclass
class GrowthSimulationContext:
    """Fully resolved inputs for a single growth-simulation run.

    Instances are produced by ``build_growth_context`` and consumed by the
    simulation runners in this module.
    """

    # Stringified farm identifier, or None when the run is not tied to a farm.
    farm_uuid: str | None
    # Plant name exactly as supplied in the request payload.
    plant_name: str
    # Plant object the simulation is run for.
    plant: Any
    # Output keys to summarise per growth stage.
    dynamic_parameters: list[str]
    # Daily weather records, one dict per day.
    weather: list[dict[str, Any]]
    crop_parameters: dict[str, Any]
    soil_parameters: dict[str, Any]
    site_parameters: dict[str, Any]
    agromanagement: list[dict[str, Any]]
    # Page size for the stage timeline, already clamped to the allowed range.
    page_size: int
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
if value in (None, ""):
return default
return float(value)
except (TypeError, ValueError):
return default
def _coerce_date(value: Any) -> date:
if isinstance(value, date) and not isinstance(value, datetime):
return value
if isinstance(value, datetime):
return value.date()
if isinstance(value, str):
return date.fromisoformat(value)
raise GrowthSimulationError(f"Invalid date value: {value!r}")
def _json_ready(value: Any) -> Any:
if isinstance(value, dict):
return {str(key): _json_ready(item) for key, item in value.items()}
if isinstance(value, list):
return [_json_ready(item) for item in value]
if isinstance(value, tuple):
return [_json_ready(item) for item in value]
if isinstance(value, (date, datetime)):
return value.isoformat()
return value
def _normalize_weather_records(weather: Any) -> list[dict[str, Any]]:
    """Normalise caller-supplied weather into a list of uniform daily records.

    *weather* may be a list of record dicts, a single record dict, or a dict
    wrapping the records under a ``"records"`` key. Each field is read from
    its upper-case key or, when that key is absent, its lower-case spelling.
    Missing or unparsable numeric fields receive fixed fallback values.

    Returns:
        One dict per record with ``DAY`` as a ``date`` and the remaining
        fields as floats; an empty list when *weather* is falsy.

    Raises:
        GrowthSimulationError: If a record is not a dict, a record's day is
            invalid, or the resolved record list is empty.
    """
    if not weather:
        return []

    if isinstance(weather, dict) and "records" in weather:
        records = weather.get("records")
    else:
        records = weather
    if not isinstance(records, list):
        # A lone record (or any other non-list value) is treated as one item.
        records = [records]

    # (output key, fallback) pairs in output order.
    numeric_fields = (
        ("LAT", 35.7),
        ("LON", 51.4),
        ("ELEV", 1200.0),
        ("IRRAD", 16_000_000.0),
        ("TMIN", 12.0),
        ("TMAX", 24.0),
        ("VAP", 12.0),
        ("WIND", 2.0),
        ("RAIN", 0.0),
        ("E0", 0.35),
        ("ES0", 0.3),
        ("ET0", 0.32),
    )

    normalized = []
    for item in records:
        if not isinstance(item, dict):
            raise GrowthSimulationError("Weather records must be JSON objects.")
        row = {"DAY": _coerce_date(item.get("DAY") or item.get("day"))}
        for field, fallback in numeric_fields:
            row[field] = _safe_float(item.get(field, item.get(field.lower())), fallback)
        normalized.append(row)

    if not normalized:
        raise GrowthSimulationError("At least one weather record is required.")
    return normalized
def _build_weather_from_farm(sensor: SensorData) -> list[dict[str, Any]]:
    """Build daily weather records from the stored forecasts of a farm.

    Reads at most 14 ``WeatherForecast`` rows for the sensor's centre
    location, ordered by ``forecast_date``. Elevation and irradiation are
    fixed constants; the remaining fields are derived from the forecast row.

    Raises:
        GrowthSimulationError: If no forecast exists for the location.
    """
    forecast_rows = list(
        WeatherForecast.objects.filter(location=sensor.center_location)
        .order_by("forecast_date")[:14]
    )
    if not forecast_rows:
        raise GrowthSimulationError("No forecast data found for the selected farm.")

    location = sensor.center_location
    latitude = float(location.latitude)
    longitude = float(location.longitude)

    def to_record(forecast: Any) -> dict[str, Any]:
        """Translate one forecast row into a weather record."""
        et0 = _safe_float(forecast.et0, 0.35)
        return {
            "DAY": forecast.forecast_date,
            "LAT": latitude,
            "LON": longitude,
            "ELEV": 1200.0,
            "IRRAD": 16_000_000.0,
            "TMIN": _safe_float(forecast.temperature_min, 12.0),
            "TMAX": _safe_float(forecast.temperature_max, 24.0),
            # Mean humidity scaled by 1/5, never below 6.0.
            "VAP": max(_safe_float(forecast.humidity_mean, 55.0) / 5.0, 6.0),
            # NOTE(review): dividing by 3.6 looks like km/h -> m/s; confirm units.
            "WIND": _safe_float(forecast.wind_speed_max, 7.2) / 3.6,
            "RAIN": _safe_float(forecast.precipitation, 0.0),
            "E0": et0,
            "ES0": max(et0 * 0.9, 0.1),
            "ET0": et0,
        }

    return [to_record(forecast) for forecast in forecast_rows]
def _build_soil_and_site_from_farm(sensor: SensorData) -> tuple[dict[str, Any], dict[str, Any]]:
    """Derive soil and site parameters from a farm's stored data.

    ``SMFCF`` and ``SMW`` come from the ``wv0033`` / ``wv1500`` attributes of
    the first depth row of the centre location (fallbacks 0.34 / 0.14).
    ``WAV`` is the first non-null ``soil_moisture`` found in the blocks of the
    sensor payload, or 40.0 when none is present.

    Returns:
        A ``(soil, site)`` tuple of parameter dicts.
    """
    depth_rows = list(sensor.center_location.depths.all())
    first_depth = depth_rows[0] if depth_rows else None
    soil = {
        "SMFCF": _safe_float(getattr(first_depth, "wv0033", None), 0.34),
        "SMW": _safe_float(getattr(first_depth, "wv1500", None), 0.14),
        "RDMSOL": 120.0,
    }

    available_water = 40.0
    sensor_blocks = sensor.sensor_payload or {}
    if isinstance(sensor_blocks, dict):
        for block in sensor_blocks.values():
            if isinstance(block, dict) and block.get("soil_moisture") is not None:
                # Only the first block that reports a reading is used.
                available_water = _safe_float(block.get("soil_moisture"))
                break

    return soil, {"WAV": available_water}
def _build_default_crop_parameters(plant: Any) -> dict[str, Any]:
    """Build fallback crop parameters from the plant's growth profile.

    The profile's ``required_gdd_for_maturity`` (1200.0 when absent or
    invalid) is split 45 % / 55 % into ``TSUM1`` and ``TSUM2``; the other
    entries are fixed constants.
    """
    growth_profile = resolve_growth_profile(plant)
    gdd_to_maturity = _safe_float(growth_profile.get("required_gdd_for_maturity"), 1200.0)

    defaults: dict[str, Any] = {"crop_name": plant.name}
    defaults["TSUM1"] = round(gdd_to_maturity * 0.45, 3)
    defaults["TSUM2"] = round(gdd_to_maturity * 0.55, 3)
    defaults["YIELD_SCALE"] = 1.0
    defaults["MAX_LAI"] = 5.0
    defaults["MAX_BIOMASS"] = 12000.0
    return defaults
def _build_default_agromanagement(plant_name: str, weather: list[dict[str, Any]]) -> list[dict[str, Any]]:
first_day = weather[0]["DAY"]
last_day = weather[-1]["DAY"]
crop_start = first_day
crop_end = max(last_day, crop_start + timedelta(days=1))
return [
{
first_day: {
"CropCalendar": {
"crop_name": plant_name,
"variety_name": "default",
"crop_start_date": crop_start,
"crop_start_type": "sowing",
"crop_end_date": crop_end,
"crop_end_type": "harvest",
"max_duration": max((crop_end - crop_start).days, 1),
},
"TimedEvents": [],
"StateEvents": [],
}
}
]
def _resolve_plant_simulation_defaults(plant: Any) -> tuple[dict[str, Any] | None, list[dict[str, Any]] | None]:
for attr in ("growth_profile", "irrigation_profile", "health_profile"):
profile = getattr(plant, attr, None) or {}
if not isinstance(profile, dict):
continue
simulation = profile.get("simulation")
if not isinstance(simulation, dict):
continue
crop_parameters = simulation.get("crop_parameters")
agromanagement = simulation.get("agromanagement")
if isinstance(crop_parameters, dict) and agromanagement:
return deepcopy(crop_parameters), deepcopy(agromanagement)
return None, None
def build_growth_context(payload: dict[str, Any]) -> GrowthSimulationContext:
    """Resolve a request payload into a complete ``GrowthSimulationContext``.

    With a ``farm_uuid`` in the payload, every simulation input is resolved
    by ``build_simulation_payload_from_farm`` (payload values are passed
    through to it). Without one, inputs come from the payload, then from
    defaults stored on the plant, then from built-in defaults.

    Args:
        payload: Request data. ``plant_name`` is mandatory; ``farm_uuid``,
            ``weather``, ``crop_parameters``, ``soil_parameters``,
            ``site_parameters``, ``agromanagement``, ``dynamic_parameters``
            and ``page_size`` are optional.

    Returns:
        The resolved context.

    Raises:
        KeyError: If ``plant_name`` is missing from *payload*.
        GrowthSimulationError: If the plant or the farm cannot be found, or
            no farm is given and the payload carries no weather.
    """
    plant_name = payload["plant_name"]
    # Kept as a function-level import, exactly where the module had it.
    from plant.models import Plant

    plant = Plant.objects.filter(name=plant_name).first()
    if plant is None:
        raise GrowthSimulationError("Plant not found.")

    # Hand out a copy of the default so a consumer mutating the context
    # cannot alter the module-level list.
    dynamic_parameters = payload.get("dynamic_parameters") or list(DEFAULT_DYNAMIC_PARAMETERS)

    # A non-numeric page size falls back to the default instead of crashing;
    # out-of-range values are clamped as before.
    try:
        requested_page_size = int(payload.get("page_size") or DEFAULT_PAGE_SIZE)
    except (TypeError, ValueError):
        requested_page_size = DEFAULT_PAGE_SIZE
    page_size = min(max(requested_page_size, 1), MAX_PAGE_SIZE)

    farm_uuid = payload.get("farm_uuid")
    resolved_farm_uuid = str(farm_uuid) if farm_uuid else None

    if resolved_farm_uuid:
        # The sensor row is only needed to confirm that the farm exists; the
        # inputs themselves are resolved by the service helper below.
        if not SensorData.objects.filter(farm_uuid=farm_uuid).exists():
            raise GrowthSimulationError("Farm not found.")
        farm_payload = build_simulation_payload_from_farm(
            farm_uuid=resolved_farm_uuid,
            plant_name=plant_name,
            weather=payload.get("weather"),
            soil=payload.get("soil_parameters"),
            crop_parameters=payload.get("crop_parameters"),
            agromanagement=payload.get("agromanagement"),
            site_parameters=payload.get("site_parameters"),
        )
        weather = farm_payload["weather"]
        crop_parameters = farm_payload["crop_parameters"]
        soil_parameters = farm_payload["soil"]
        site_parameters = farm_payload["site_parameters"]
        agromanagement = farm_payload["agromanagement"]
        plant = farm_payload["plant"] or plant
    else:
        # Without a farm the weather must come from the payload.
        if not payload.get("weather"):
            raise GrowthSimulationError("Weather input is required.")
        weather = _normalize_weather_records(payload["weather"])

        default_crop_parameters, default_agromanagement = _resolve_plant_simulation_defaults(plant)
        crop_parameters = deepcopy(
            payload.get("crop_parameters")
            or default_crop_parameters
            or _build_default_crop_parameters(plant)
        )
        crop_parameters.setdefault("crop_name", plant.name)

        soil_parameters = deepcopy(payload.get("soil_parameters") or {})
        soil_parameters.setdefault("SMFCF", 0.34)
        soil_parameters.setdefault("SMW", 0.14)
        soil_parameters.setdefault("RDMSOL", 120.0)

        site_parameters = deepcopy(payload.get("site_parameters") or {})
        site_parameters.setdefault("WAV", 40.0)

        agromanagement = deepcopy(
            payload.get("agromanagement")
            or default_agromanagement
            or _build_default_agromanagement(plant.name, weather)
        )

    return GrowthSimulationContext(
        farm_uuid=resolved_farm_uuid,
        plant_name=plant_name,
        plant=plant,
        dynamic_parameters=dynamic_parameters,
        weather=weather,
        crop_parameters=crop_parameters,
        soil_parameters=soil_parameters,
        site_parameters=site_parameters,
        agromanagement=agromanagement,
        page_size=page_size,
    )
def _derive_stage(dvs: float) -> tuple[str, str]:
    """Map a development-stage value to its ``(stage_code, display_label)`` pair.

    Values below 0 are ``pre_emergence``; 2.0 and above is ``maturity``.
    """
    # Exclusive upper DVS bound of every stage before maturity, ascending.
    stage_upper_bounds = (
        (0, "pre_emergence"),
        (0.2, "establishment"),
        (1.0, "vegetative"),
        (1.3, "flowering"),
        (2.0, "reproductive"),
    )
    for upper_bound, stage_code in stage_upper_bounds:
        if dvs < upper_bound:
            return stage_code, DEFAULT_STAGE_LABELS[stage_code]
    return "maturity", DEFAULT_STAGE_LABELS["maturity"]
def _logistic(value: float, midpoint: float, steepness: float, upper: float) -> float:
return upper / (1.0 + exp(-steepness * (value - midpoint)))
def _run_projection_engine(context: GrowthSimulationContext) -> dict[str, Any]:
    """Project crop growth day by day from accumulated growing degree days.

    This is the in-module projection that ``_run_simulation`` uses when the
    simulation service fails. For every weather record it accumulates GDD,
    derives DVS (0..2) from the fraction of the maturity requirement reached,
    and from that estimates LAI, total biomass (TAGP), storage-organ weight
    (TWSO) and a soil-moisture balance driven by rain and ET0.

    Returns:
        A result dict with ``engine``, ``model_name``, ``metrics``,
        ``daily_output``, ``summary_output`` and ``terminal_output`` keys.
    """
    profile = resolve_growth_profile(context.plant)
    required_gdd = _safe_float(profile.get("required_gdd_for_maturity"), 1200.0)
    cumulative_gdd = _safe_float(profile.get("current_cumulative_gdd"), 0.0)
    base_temperature = _safe_float(profile.get("base_temperature"), 10.0)

    crop = context.crop_parameters
    max_lai = _safe_float(crop.get("MAX_LAI"), 5.0)
    max_biomass = _safe_float(crop.get("MAX_BIOMASS"), 12000.0)
    soil_moisture = _safe_float(context.site_parameters.get("WAV"), 40.0)

    # Loop-invariant divisor; floored at 1.0 to avoid dividing by zero.
    gdd_divisor = max(required_gdd, 1.0)

    daily_output = []
    for record in context.weather:
        tmax = _safe_float(record.get("TMAX"), 24.0)
        tmin = _safe_float(record.get("TMIN"), 12.0)
        rain = _safe_float(record.get("RAIN"), 0.0)
        et0 = _safe_float(record.get("ET0"), 0.32)

        daily_gdd = calculate_daily_gdd(tmax=tmax, tmin=tmin, tbase=base_temperature)
        cumulative_gdd += daily_gdd
        maturity_fraction = cumulative_gdd / gdd_divisor
        dvs = min(max(maturity_fraction * 2.0, 0.0), 2.0)

        if dvs <= 1.0:
            # Canopy builds up along a logistic curve up to DVS 1.0.
            lai = _logistic(dvs, midpoint=0.55, steepness=7.5, upper=max_lai)
        else:
            # Afterwards it declines linearly, never below 25 % of the maximum.
            lai = max_lai * max(0.25, 1.0 - ((dvs - 1.0) / 1.1))

        biomass_factor = min(maturity_fraction, 1.25)
        weather_modifier = max(0.65, min(1.15, 1.0 + (rain * 0.02) - (et0 * 0.08)))
        tagp = max_biomass * biomass_factor * weather_modifier
        storage_share = max(min((dvs - 0.95) / 0.85, 0.55), 0.0)
        twso = tagp * storage_share
        soil_moisture = max(5.0, min(95.0, soil_moisture + (rain * 0.7) - (et0 * 8.5)))

        daily_output.append(
            {
                "DAY": record["DAY"],
                "DVS": round(dvs, 4),
                "LAI": round(lai, 4),
                "TAGP": round(tagp, 4),
                "TWSO": round(twso, 4),
                # Tracked as a percentage, reported as a fraction.
                "SM": round(soil_moisture / 100.0, 4),
                "GDD": round(daily_gdd, 4),
                "TMIN": round(tmin, 4),
                "TMAX": round(tmax, 4),
                "RAIN": round(rain, 4),
                "ET0": round(et0, 4),
            }
        )

    final_entry = daily_output[-1] if daily_output else {}
    peak_lai = max((_safe_float(day.get("LAI"), 0.0) for day in daily_output), default=0.0)
    metrics = {
        "yield_estimate": round(_safe_float(final_entry.get("TWSO"), 0.0), 4),
        "biomass": round(_safe_float(final_entry.get("TAGP"), 0.0), 4),
        "max_lai": round(peak_lai, 4),
    }
    return {
        "engine": "growth_projection",
        "model_name": "growth_projection_v1",
        "metrics": metrics,
        "daily_output": _json_ready(daily_output),
        "summary_output": [],
        "terminal_output": [_json_ready(final_entry)] if final_entry else [],
    }
def _run_simulation(context: GrowthSimulationContext) -> tuple[dict[str, Any], int | None, str | None]:
    """Run the crop simulation service, falling back to the local projection.

    Returns:
        ``(result, scenario_id, error)``. On success ``error`` is ``None``.
        If the service call fails for any reason, ``result`` comes from
        ``_run_projection_engine``, ``scenario_id`` is ``None`` and ``error``
        is the text of the exception.
    """
    try:
        service_inputs = {
            "farm_uuid": context.farm_uuid,
            "plant_name": context.plant_name,
            "weather": context.weather,
            "soil": context.soil_parameters,
            "crop_parameters": context.crop_parameters,
            "agromanagement": context.agromanagement,
            "site_parameters": context.site_parameters,
            "name": f"growth:{context.plant_name}",
        }
        response = CropSimulationService().run_single_simulation(**service_inputs)
        return response["result"], response.get("scenario_id"), None
    except Exception as exc:
        # Deliberately broad: any service failure degrades to the projection
        # and is passed back to the caller as text rather than raised.
        return _run_projection_engine(context), None, str(exc)
def summarize_growth_stages(
    daily_output: list[dict[str, Any]],
    dynamic_parameters: list[str],
) -> list[dict[str, Any]]:
    """Collapse daily simulation output into a timeline of growth stages.

    Consecutive days whose DVS maps to the same stage form one timeline
    entry. For every requested parameter present in a stage, the start, end,
    min, max and average of its daily values are reported.

    Args:
        daily_output: Daily records with a ``DAY`` (or ``day``) date, a
            ``DVS`` value and any of the dynamic parameters.
        dynamic_parameters: Keys to aggregate per stage.

    Returns:
        Stage entries in chronological order, numbered from 1, with ISO
        formatted dates.
    """
    if not daily_output:
        return []

    # Pass 1: group consecutive days that share a stage code.
    stage_runs: list[dict[str, Any]] = []
    for raw in daily_output:
        record = dict(raw)
        day = _coerce_date(record.get("DAY") or record.get("day"))
        stage_code, stage_name = _derive_stage(_safe_float(record.get("DVS"), 0.0))
        day_entry = {
            "date": day,
            "parameters": {
                param: _safe_float(record.get(param))
                for param in dynamic_parameters
                if record.get(param) is not None
            },
        }

        if stage_runs and stage_runs[-1]["stage_code"] == stage_code:
            open_run = stage_runs[-1]
            open_run["end_date"] = day
            open_run["days_count"] += 1
            open_run["raw_days"].append(day_entry)
        else:
            stage_runs.append(
                {
                    "stage_code": stage_code,
                    "stage_name": stage_name,
                    "start_date": day,
                    "end_date": day,
                    "days_count": 1,
                    "raw_days": [day_entry],
                }
            )

    # Pass 2: aggregate each run into its public summary.
    summarized = []
    for order, run in enumerate(stage_runs, start=1):
        metrics = {}
        for param in dynamic_parameters:
            series = [
                entry["parameters"][param]
                for entry in run["raw_days"]
                if param in entry["parameters"]
            ]
            if series:
                metrics[param] = {
                    "start": round(series[0], 4),
                    "end": round(series[-1], 4),
                    "min": round(min(series), 4),
                    "max": round(max(series), 4),
                    "avg": round(sum(series) / len(series), 4),
                }
        summarized.append(
            {
                "order": order,
                "stage_code": run["stage_code"],
                "stage_name": run["stage_name"],
                "start_date": run["start_date"].isoformat(),
                "end_date": run["end_date"].isoformat(),
                "days_count": run["days_count"],
                "metrics": metrics,
            }
        )
    return summarized
def paginate_growth_stages(
    stage_timeline: list[dict[str, Any]],
    *,
    page: int,
    page_size: int,
) -> dict[str, Any]:
    """Return one page of the stage timeline together with paging metadata.

    Args:
        stage_timeline: Stage entries to paginate.
        page: 1-based page number. Out-of-range numbers resolve to the last
            page; values that are not integers resolve to the first page.
        page_size: Requested page size, clamped to ``1..MAX_PAGE_SIZE``.

    Returns:
        A dict with ``items`` (the entries of the page) and ``pagination``
        (page, page_size, total_items, total_pages, has_next, has_previous).
        An empty timeline yields page 1 of 0 pages.
    """
    page_size = min(max(page_size, 1), MAX_PAGE_SIZE)
    if not stage_timeline:
        return {
            "items": [],
            "pagination": {
                "page": 1,
                "page_size": page_size,
                "total_items": 0,
                "total_pages": 0,
                "has_next": False,
                "has_previous": False,
            },
        }

    paginator = Paginator(stage_timeline, page_size)
    # get_page() maps out-of-range numbers to the last page, as the previous
    # EmptyPage handling did, and additionally maps non-integer input to
    # page 1 instead of letting PageNotAnInteger escape.
    page_obj = paginator.get_page(page)
    return {
        "items": list(page_obj.object_list),
        "pagination": {
            "page": page_obj.number,
            "page_size": page_size,
            "total_items": paginator.count,
            "total_pages": paginator.num_pages,
            "has_next": page_obj.has_next(),
            "has_previous": page_obj.has_previous(),
        },
    }
def run_growth_simulation(payload: dict[str, Any], progress_callback=None) -> dict[str, Any]:
    """Run a growth simulation end to end and assemble the response.

    Resolves the inputs, runs the simulation (with its built-in fallback),
    summarises the daily output into growth stages and returns the first
    page of that timeline together with summary data.

    Args:
        payload: Request data accepted by ``build_growth_context``.
        progress_callback: Optional callable invoked after each of the three
            steps as ``progress_callback(state="PROGRESS", meta={...})``.

    Returns:
        The response dict: plant and engine information, summary metrics, the
        full stage timeline, its first page and the pagination metadata.

    Raises:
        GrowthSimulationError: Propagated from input resolution.
    """
    total_steps = 3

    def _report(step: int, message: str) -> None:
        """Forward one progress update when a callback was supplied."""
        if progress_callback is not None:
            progress_callback(
                state="PROGRESS",
                meta={"current": step, "total": total_steps, "message": message},
            )

    context = build_growth_context(payload)
    _report(1, "simulation input resolved")

    simulation_result, scenario_id, simulation_error = _run_simulation(context)
    _report(2, "simulation finished")

    # ``or []`` also covers a result whose key is present but None, which
    # would otherwise break the ``len()`` call below.
    daily_output = simulation_result.get("daily_output") or []
    stage_timeline = summarize_growth_stages(
        daily_output=daily_output,
        dynamic_parameters=context.dynamic_parameters,
    )
    _report(3, "growth stages prepared")

    paginated = paginate_growth_stages(
        stage_timeline,
        page=1,
        page_size=context.page_size,
    )
    return {
        "plant_name": context.plant_name,
        "dynamic_parameters": context.dynamic_parameters,
        "engine": simulation_result.get("engine"),
        "model_name": simulation_result.get("model_name"),
        "scenario_id": scenario_id,
        "simulation_warning": simulation_error,
        "summary_metrics": simulation_result.get("metrics") or {},
        "stage_timeline": stage_timeline,
        "stages_page": paginated["items"],
        "pagination": paginated["pagination"],
        "daily_records_count": len(daily_output),
        "default_page_size": context.page_size,
    }