from __future__ import annotations

import math
from copy import deepcopy
from dataclasses import dataclass
from datetime import date, datetime
from decimal import Decimal
from typing import Any

from django.db.models import Avg

from crop_simulation.growth_simulation import GrowthSimulationContext, _run_projection_engine
from crop_simulation.services import PcseSimulationManager, build_simulation_payload_from_farm
from farm_data.services import get_canonical_farm_record, get_farm_plant_assignments

from .models import AnalysisGridObservation, RemoteSensingClusterBlock, RemoteSensingSubdivisionResult
from .satellite_snapshot import build_location_block_satellite_snapshots


class ClusterRecommendationNotFound(Exception):
    """Raised when the farm or usable cluster data cannot be found."""


class ClusterRecommendationValidationError(Exception):
    """Raised when the farm lacks plants or a simulation payload cannot be built."""


@dataclass
class ClusterPlantCandidate:
    """One plant evaluated for one cluster, with its simulation-derived score."""

    plant_id: int | None
    plant_name: str
    position: int | None
    stage: str
    score: float
    predicted_yield: float | None
    predicted_yield_tons: float | None
    biomass: float | None
    max_lai: float | None
    simulation_engine: str | None
    simulation_model_name: str | None
    simulation_warning: str | None
    supporting_metrics: dict[str, Any]

    def as_dict(self) -> dict[str, Any]:
        """Return the candidate as a plain dict (``supporting_metrics`` is not copied)."""
        return {
            "plant_id": self.plant_id,
            "plant_name": self.plant_name,
            "position": self.position,
            "stage": self.stage,
            "score": self.score,
            "predicted_yield": self.predicted_yield,
            "predicted_yield_tons": self.predicted_yield_tons,
            "biomass": self.biomass,
            "max_lai": self.max_lai,
            "simulation_engine": self.simulation_engine,
            "simulation_model_name": self.simulation_model_name,
            "simulation_warning": self.simulation_warning,
            "supporting_metrics": self.supporting_metrics,
        }


def _safe_float(value: Any) -> float | None:
    """Convert ``value`` to a finite float, or return None when that is impossible.

    None, the empty string, unparseable values, values too large for a float
    and non-finite results (NaN / +-Infinity) all yield None.
    """
    try:
        if value in (None, ""):
            return None
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    # NaN compares false against everything, which would make the candidate
    # ranking order undefined; infinities are equally useless as metrics.
    return number if math.isfinite(number) else None


def _clamp(value: float, minimum: float, maximum: float) -> float:
    """Limit ``value`` to the closed range; the bounds are swapped if reversed."""
    if minimum > maximum:
        minimum, maximum = maximum, minimum
    return max(minimum, min(value, maximum))


def _json_safe(value: Any) -> Any:
    """Recursively convert ``value`` into JSON-serializable primitives.

    Decimals become floats, non-finite floats become None, datetimes and dates
    become ISO-8601 strings (a ``+00:00`` offset is rewritten as ``Z``), dict
    keys are stringified and tuples become lists. Anything else is returned
    unchanged.
    """
    if isinstance(value, Decimal):
        value = float(value)
    if isinstance(value, float):
        # NaN / Infinity are not valid JSON tokens, so they must not reach
        # the stored payload.
        return value if math.isfinite(value) else None
    if isinstance(value, datetime):
        formatted = value.isoformat()
        if formatted.endswith("+00:00"):
            return formatted[:-6] + "Z"
        return formatted
    if isinstance(value, date):
        return value.isoformat()
    if isinstance(value, dict):
        return {str(key): _json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_json_safe(item) for item in value]
    return value


def _build_cluster_entries(
    snapshots: list[dict[str, Any]],
    *,
    cluster_blocks_by_uuid: dict[str, RemoteSensingClusterBlock],
) -> list[dict[str, Any]]:
    """Merge satellite and sensor sub-blocks of all snapshots into cluster entries.

    Satellite sub-blocks create the entries. Each entry is keyed by cluster
    UUID when present, else by ``block_code::sub_block_code``, else by
    ``block_code::cluster-<label>``. Sensor sub-blocks only enrich an entry
    that already exists under one of those keys; unmatched sensor sub-blocks
    are ignored. Sensor values override satellite values in
    ``resolved_metrics`` because they are merged afterwards.

    Returns:
        The entries in first-seen order.
    """
    entries_by_key: dict[str, dict[str, Any]] = {}
    for snapshot in snapshots:
        block_code = str(snapshot.get("block_code") or "").strip()
        temporal_extent = snapshot.get("temporal_extent")

        for satellite_sub_block in snapshot.get("satellite_sub_blocks") or []:
            cluster_uuid = str(satellite_sub_block.get("cluster_uuid") or "").strip()
            sub_block_code = str(satellite_sub_block.get("sub_block_code") or "").strip()
            cluster_label = satellite_sub_block.get("cluster_label")
            if cluster_uuid:
                entry_key = cluster_uuid
            elif sub_block_code:
                entry_key = f"{block_code}::{sub_block_code}"
            else:
                entry_key = f"{block_code}::cluster-{cluster_label}"
            entry = entries_by_key.setdefault(
                entry_key,
                {
                    "block_code": block_code,
                    "cluster_uuid": cluster_uuid or None,
                    "sub_block_code": sub_block_code,
                    "cluster_label": cluster_label,
                    "temporal_extent": temporal_extent,
                    "cluster_block": None,
                    "satellite_metrics": {},
                    "sensor_metrics": {},
                    "resolved_metrics": {},
                    "source_metadata": {
                        "block_status": snapshot.get("status") or "missing",
                        "aggregation_strategy": snapshot.get("aggregation_strategy") or "missing",
                        "has_satellite_metrics": False,
                        "has_sensor_metrics": False,
                    },
                },
            )
            entry["satellite_metrics"] = dict(satellite_sub_block.get("resolved_metrics") or {})
            entry["resolved_metrics"].update(entry["satellite_metrics"])
            entry["source_metadata"]["has_satellite_metrics"] = True
            if cluster_uuid and cluster_uuid in cluster_blocks_by_uuid:
                entry["cluster_block"] = cluster_blocks_by_uuid[cluster_uuid]

        for sensor_sub_block in snapshot.get("sensor_sub_blocks") or []:
            cluster_uuid = str(sensor_sub_block.get("cluster_uuid") or "").strip()
            sub_block_code = str(sensor_sub_block.get("sub_block_code") or "").strip()
            cluster_label = sensor_sub_block.get("cluster_label")
            # Same key precedence as the satellite pass; empty strings mark
            # keys that cannot be built for this sub-block.
            candidate_keys = [
                cluster_uuid,
                f"{block_code}::{sub_block_code}" if sub_block_code else "",
                f"{block_code}::cluster-{cluster_label}" if cluster_label is not None else "",
            ]
            entry = None
            for candidate_key in candidate_keys:
                if candidate_key and candidate_key in entries_by_key:
                    entry = entries_by_key[candidate_key]
                    break
            if entry is None:
                continue
            entry["sensor_metrics"] = dict(sensor_sub_block.get("resolved_metrics") or {})
            entry["resolved_metrics"].update(entry["sensor_metrics"])
            entry["source_metadata"]["has_sensor_metrics"] = True

    return list(entries_by_key.values())


def _attach_missing_satellite_metrics(cluster_entries: list[dict[str, Any]]) -> None:
    """Fill in ``soil_vv`` for entries that lack it, mutating the entries in place.

    The value is the mean ``soil_vv`` of the grid observations belonging to
    the cluster's cells within the temporal window of the cluster's result,
    rounded to six decimals. Entries without a cluster block, without a
    linked result, without cell codes, or without matching observations are
    left untouched.
    """
    for cluster_entry in cluster_entries:
        cluster_block = cluster_entry.get("cluster_block")
        if cluster_block is None:
            continue
        if "soil_vv" in (cluster_entry.get("resolved_metrics") or {}):
            continue
        # The caller already treats ``result_id`` as possibly unset; without a
        # result there is no temporal window to query.
        if not cluster_block.result_id:
            continue
        cell_codes = list(cluster_block.cell_codes or [])
        if not cell_codes:
            # An empty ``__in`` matches nothing, so skip the query entirely.
            continue
        observation_summary = AnalysisGridObservation.objects.filter(
            cell__cell_code__in=cell_codes,
            temporal_start=cluster_block.result.temporal_start,
            temporal_end=cluster_block.result.temporal_end,
        ).aggregate(soil_vv_mean=Avg("soil_vv"))
        soil_vv_mean = _safe_float(observation_summary.get("soil_vv_mean"))
        if soil_vv_mean is None:
            continue
        rounded_soil_vv = round(soil_vv_mean, 6)
        cluster_entry.setdefault("satellite_metrics", {})["soil_vv"] = rounded_soil_vv
        cluster_entry.setdefault("resolved_metrics", {})["soil_vv"] = rounded_soil_vv


def _build_cluster_overrides(
    base_payload: dict[str, Any],
    *,
    cluster_metrics: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Derive per-cluster soil and site parameters from the cluster metrics.

    Starts from deep copies of ``base_payload["soil"]`` and
    ``base_payload["site_parameters"]`` so the shared base payload is never
    mutated, then overrides values for every metric that is present.

    Returns:
        ``(soil_parameters, site_parameters)``.
    """
    soil_parameters = deepcopy(base_payload.get("soil") or {})
    site_parameters = deepcopy(base_payload.get("site_parameters") or {})

    ndwi = _safe_float(cluster_metrics.get("ndwi"))
    if ndwi is not None:
        # NOTE(review): SMFCF / SMW / SM0 / SMLIM look like PCSE/WOFOST soil
        # moisture parameters (field capacity, wilting point, saturation,
        # initial limit) -- confirm. The clamps keep SMW < SMFCF < SM0.
        smfcf = _clamp(ndwi, 0.2, 0.55)
        smw = _clamp(smfcf * 0.45, 0.05, max(smfcf - 0.02, 0.06))
        sm0 = _clamp(
            min(max(smfcf + 0.08, smw + 0.12), 0.6),
            max(smfcf + 0.02, smw + 0.05),
            0.8,
        )
        soil_parameters["SMFCF"] = round(smfcf, 3)
        soil_parameters["SMW"] = round(smw, 3)
        soil_parameters["SM0"] = round(sm0, 3)
        site_parameters["SMLIM"] = round(_clamp(smfcf, smw, sm0), 3)

    soil_moisture = _safe_float(cluster_metrics.get("soil_moisture"))
    if soil_moisture is not None:
        soil_parameters["soil_moisture"] = soil_moisture
        # Negative readings are floored at zero for the site parameter only.
        site_parameters["WAV"] = round(max(soil_moisture, 0.0), 3)

    # (metric name in cluster_metrics, site parameter key, soil parameter key)
    nutrient_mappings = (
        ("nitrogen", "NAVAILI", "nitrogen"),
        ("phosphorus", "P_STATUS", "phosphorus"),
        ("potassium", "K_STATUS", "potassium"),
        ("soil_ph", "SOIL_PH", "soil_ph"),
        ("electrical_conductivity", "EC", "electrical_conductivity"),
    )
    for metric_name, site_key, soil_key in nutrient_mappings:
        value = _safe_float(cluster_metrics.get(metric_name))
        if value is None:
            continue
        soil_parameters[soil_key] = value
        site_parameters[site_key] = value

    return soil_parameters, site_parameters


def _serialize_cluster_block(cluster_block: RemoteSensingClusterBlock | None) -> dict[str, Any] | None:
    """Return a dict view of ``cluster_block``, or None when there is no block.

    ``created_at`` / ``updated_at`` are passed through as stored on the model;
    they are only converted to strings when the payload goes through
    ``_json_safe``.
    """
    if cluster_block is None:
        return None
    return {
        "uuid": str(cluster_block.uuid),
        "sub_block_code": cluster_block.sub_block_code,
        "cluster_label": cluster_block.cluster_label,
        "chunk_size_sqm": cluster_block.chunk_size_sqm,
        "centroid_lat": cluster_block.centroid_lat,
        "centroid_lon": cluster_block.centroid_lon,
        "center_cell_code": cluster_block.center_cell_code,
        "center_cell_lat": cluster_block.center_cell_lat,
        "center_cell_lon": cluster_block.center_cell_lon,
        "cell_count": cluster_block.cell_count,
        "cell_codes": list(cluster_block.cell_codes or []),
        "geometry": cluster_block.geometry,
        "metadata": dict(cluster_block.metadata or {}),
        "created_at": cluster_block.created_at,
        "updated_at": cluster_block.updated_at,
    }


def _simulate_candidate(
    *,
    base_payload: dict[str, Any],
    soil_parameters: dict[str, Any],
    site_parameters: dict[str, Any],
) -> tuple[dict[str, Any], str | None]:
    """Run the crop simulation for one plant with cluster-specific parameters.

    ``PcseSimulationManager.run_simulation`` is tried first. If it raises, the
    projection engine is run with the same inputs instead and the original
    error text is returned as a warning.

    Returns:
        ``(simulation_result, warning)`` where ``warning`` is None on the
        primary path and ``"simulation_fallback:<error>"`` on the fallback.
    """
    manager = PcseSimulationManager()
    try:
        return (
            manager.run_simulation(
                weather=base_payload.get("weather") or [],
                soil=soil_parameters,
                crop_parameters=base_payload.get("crop_parameters") or {},
                agromanagement=base_payload.get("agromanagement") or [],
                site_parameters=site_parameters,
            ),
            None,
        )
    except Exception as exc:
        # Deliberate best-effort: any simulation failure degrades to the
        # projection engine rather than aborting the whole recommendation.
        context = GrowthSimulationContext(
            farm_uuid=None,
            plant_name=str((base_payload.get("crop_parameters") or {}).get("crop_name") or ""),
            plant=base_payload.get("plant"),
            dynamic_parameters=[],
            weather=base_payload.get("weather") or [],
            crop_parameters=base_payload.get("crop_parameters") or {},
            soil_parameters=soil_parameters,
            site_parameters=site_parameters,
            agromanagement=base_payload.get("agromanagement") or [],
            page_size=10,
        )
        fallback_result = _run_projection_engine(context)
        return fallback_result, f"simulation_fallback:{exc}"


def _rank_cluster_plants(
    cluster_entry: dict[str, Any],
    *,
    plant_assignments: list[Any],
    base_payloads: dict[str, dict[str, Any]],
) -> list[dict[str, Any]]:
    """Simulate every assigned plant for one cluster and rank the results.

    Assignments whose plant has no name are skipped. Candidates are ordered by
    score (the predicted yield, or -1.0 when unknown) descending, then biomass
    descending, then assignment position ascending.

    Returns:
        Candidate dicts, best first.
    """
    candidates: list[ClusterPlantCandidate] = []
    for assignment in plant_assignments:
        plant_name = str(getattr(assignment.plant, "name", "") or "").strip()
        if not plant_name:
            continue
        base_payload = base_payloads[plant_name]
        soil_parameters, site_parameters = _build_cluster_overrides(
            base_payload,
            cluster_metrics=dict(cluster_entry.get("resolved_metrics") or {}),
        )
        simulation_result, simulation_warning = _simulate_candidate(
            base_payload=base_payload,
            soil_parameters=soil_parameters,
            site_parameters=site_parameters,
        )
        metrics = dict(simulation_result.get("metrics") or {})
        predicted_yield = _safe_float(metrics.get("yield_estimate"))
        biomass = _safe_float(metrics.get("biomass"))
        max_lai = _safe_float(metrics.get("max_lai"))
        # NOTE(review): the /1000 presumably converts kg to tonnes -- confirm
        # the unit of ``yield_estimate``.
        predicted_yield_tons = None if predicted_yield is None else round(max(predicted_yield, 0.0) / 1000.0, 4)
        score = round(predicted_yield if predicted_yield is not None else -1.0, 4)
        candidates.append(
            ClusterPlantCandidate(
                plant_id=getattr(assignment.plant, "backend_plant_id", None),
                plant_name=plant_name,
                position=getattr(assignment, "position", None),
                stage=str(getattr(assignment, "stage", "") or ""),
                score=score,
                predicted_yield=round(predicted_yield, 4) if predicted_yield is not None else None,
                predicted_yield_tons=predicted_yield_tons,
                biomass=round(biomass, 4) if biomass is not None else None,
                max_lai=round(max_lai, 4) if max_lai is not None else None,
                simulation_engine=simulation_result.get("engine"),
                simulation_model_name=simulation_result.get("model_name"),
                simulation_warning=simulation_warning,
                supporting_metrics=metrics,
            )
        )

    ranked_candidates = sorted(
        candidates,
        key=lambda item: (
            item.score,
            item.biomass if item.biomass is not None else float("-inf"),
            # Negated so that, under reverse=True, lower positions rank first;
            # a missing position sorts last.
            -1 * (item.position if item.position is not None else 10_000),
        ),
        reverse=True,
    )
    return [candidate.as_dict() for candidate in ranked_candidates]


def build_cluster_crop_recommendations(farm_uuid: str) -> dict[str, Any]:
    """Build per-cluster crop recommendations for a farm.

    Collects the satellite/sensor snapshots of the farm's location, merges
    them into cluster entries, simulates every assigned plant per cluster and
    picks the best-ranked plant as the suggestion. The resulting payload is
    cached on the related subdivision results and reused while the plant
    assignments and the set of results stay the same.

    Args:
        farm_uuid: Identifier passed to ``get_canonical_farm_record``.

    Returns:
        The recommendation payload. A cached payload has gone through
        ``_json_safe``; a freshly computed one has not.

    Raises:
        ClusterRecommendationNotFound: The farm does not exist, no cluster
            UUID is present in the snapshots, or no cluster entry was built.
        ClusterRecommendationValidationError: The farm has no plant
            assignments, or a simulation payload could not be built.
    """
    farm = get_canonical_farm_record(farm_uuid)
    if farm is None:
        raise ClusterRecommendationNotFound("مزرعه پیدا نشد.")

    plant_assignments = get_farm_plant_assignments(farm)
    if not plant_assignments:
        raise ClusterRecommendationValidationError("برای این مزرعه هنوز هیچ گیاهی در farm_data ثبت نشده است.")

    location = farm.center_location
    snapshots = build_location_block_satellite_snapshots(
        location,
        sensor_payload=farm.sensor_payload,
    )
    cluster_uuids = {
        str(sub_block.get("cluster_uuid") or "").strip()
        for snapshot in snapshots
        for sub_block in (snapshot.get("satellite_sub_blocks") or [])
        if str(sub_block.get("cluster_uuid") or "").strip()
    }
    if not cluster_uuids:
        raise ClusterRecommendationNotFound("برای این مزرعه هنوز خروجی KMeans در location_data ثبت نشده است.")

    cluster_blocks_by_uuid = {
        str(cluster_block.uuid): cluster_block
        for cluster_block in RemoteSensingClusterBlock.objects.filter(uuid__in=list(cluster_uuids)).select_related("result")
    }
    cluster_entries = _build_cluster_entries(
        snapshots,
        cluster_blocks_by_uuid=cluster_blocks_by_uuid,
    )
    if not cluster_entries:
        raise ClusterRecommendationNotFound("برای این مزرعه هنوز کلاستر قابل استفاده پیدا نشد.")

    recommendation_result_ids = sorted(
        {
            int(cluster_block.result_id)
            for cluster_block in cluster_blocks_by_uuid.values()
            if cluster_block.result_id
        }
    )
    cached_payload = _load_cached_cluster_recommendations(
        farm_uuid=str(farm.farm_uuid),
        result_ids=recommendation_result_ids,
        plant_assignments=plant_assignments,
    )
    if cached_payload is not None:
        return cached_payload

    # Only needed when recomputing: this issues one aggregate query per
    # cluster, so it runs after the cache lookup instead of before it.
    _attach_missing_satellite_metrics(cluster_entries)

    base_payloads: dict[str, dict[str, Any]] = {}
    for assignment in plant_assignments:
        plant_name = str(getattr(assignment.plant, "name", "") or "").strip()
        if not plant_name or plant_name in base_payloads:
            continue
        try:
            base_payloads[plant_name] = build_simulation_payload_from_farm(
                farm_uuid=str(farm.farm_uuid),
                plant_name=plant_name,
            )
        except Exception as exc:
            raise ClusterRecommendationValidationError(
                f"مقایسه گیاه‌ها با crop_simulation انجام نشد: {exc}"
            ) from exc

    response_clusters: list[dict[str, Any]] = []
    for cluster_entry in cluster_entries:
        candidate_plants = _rank_cluster_plants(
            cluster_entry,
            plant_assignments=plant_assignments,
            base_payloads=base_payloads,
        )
        response_clusters.append(
            {
                "block_code": cluster_entry.get("block_code") or "",
                "cluster_uuid": cluster_entry.get("cluster_uuid"),
                "sub_block_code": cluster_entry.get("sub_block_code") or "",
                "cluster_label": cluster_entry.get("cluster_label"),
                "temporal_extent": cluster_entry.get("temporal_extent"),
                "cluster_block": _serialize_cluster_block(cluster_entry.get("cluster_block")),
                "satellite_metrics": dict(cluster_entry.get("satellite_metrics") or {}),
                "sensor_metrics": dict(cluster_entry.get("sensor_metrics") or {}),
                "resolved_metrics": dict(cluster_entry.get("resolved_metrics") or {}),
                "candidate_plants": candidate_plants,
                "suggested_plant": candidate_plants[0] if candidate_plants else None,
                "source_metadata": dict(cluster_entry.get("source_metadata") or {}),
            }
        )

    payload = {
        "farm_uuid": str(farm.farm_uuid),
        "location_id": location.id,
        "evaluated_plant_count": len(base_payloads),
        "cluster_count": len(response_clusters),
        "registered_plants": [
            {
                # getattr keeps this consistent with the rest of the module,
                # which tolerates assignments without a plant.
                "plant_id": getattr(assignment.plant, "backend_plant_id", None),
                "plant_name": getattr(assignment.plant, "name", None),
                "position": assignment.position,
                "stage": assignment.stage,
            }
            for assignment in plant_assignments
        ],
        "clusters": response_clusters,
        "source_metadata": {
            "source": "location_data+kmeans+farm_data+crop_simulation",
            "location_id": location.id,
            "snapshot_block_count": len(snapshots),
        },
    }
    _store_cached_cluster_recommendations(
        farm_uuid=str(farm.farm_uuid),
        result_ids=recommendation_result_ids,
        plant_assignments=plant_assignments,
        payload=payload,
    )
    return payload


def _build_assignment_cache_signature(plant_assignments: list[Any]) -> list[dict[str, Any]]:
    """Return the JSON-comparable fingerprint of the plant assignments.

    The fingerprint holds plant id, position and stage per assignment, in
    order; a cached payload is only reused when it matches exactly.
    """
    return [
        {
            "plant_id": getattr(assignment.plant, "backend_plant_id", None),
            "position": int(assignment.position or 0),
            "stage": str(assignment.stage or ""),
        }
        for assignment in plant_assignments
    ]


def _load_cached_cluster_recommendations(
    *,
    farm_uuid: str,
    result_ids: list[int],
    plant_assignments: list[Any],
) -> dict[str, Any] | None:
    """Return a cached payload for the farm, or None when no valid entry exists.

    An entry is valid when its assignment signature and its recorded
    ``result_ids`` both equal the current ones. Entries that do not record
    ``result_ids`` cannot be validated against the current cluster set and
    are treated as stale.

    NOTE(review): the cache does not track the farm's sensor payload or the
    underlying observations; confirm that invalidation for those happens
    elsewhere.
    """
    if not result_ids:
        return None
    cache_key = f"farm::{farm_uuid}"
    assignment_signature = _build_assignment_cache_signature(plant_assignments)
    expected_result_ids = list(result_ids)
    for result in RemoteSensingSubdivisionResult.objects.filter(id__in=result_ids):
        metadata = dict(result.metadata or {})
        recommendation_cache = dict(metadata.get("cluster_recommendations") or {})
        cached_entry = recommendation_cache.get(cache_key)
        if not isinstance(cached_entry, dict):
            continue
        if cached_entry.get("assignment_signature") != assignment_signature:
            continue
        # A payload computed for a different set of results would describe a
        # different set of clusters, so it must not be reused.
        if cached_entry.get("result_ids") != expected_result_ids:
            continue
        payload = cached_entry.get("payload")
        if isinstance(payload, dict):
            return payload
    return None


def _store_cached_cluster_recommendations(
    *,
    farm_uuid: str,
    result_ids: list[int],
    plant_assignments: list[Any],
    payload: dict[str, Any],
) -> None:
    """Persist ``payload`` in the metadata of every result in ``result_ids``.

    The entry is stored under ``metadata["cluster_recommendations"]`` keyed by
    farm, together with the assignment signature and the result ids it was
    computed for. Nothing is written when ``result_ids`` is empty.
    """
    if not result_ids:
        return
    cache_key = f"farm::{farm_uuid}"
    assignment_signature = _build_assignment_cache_signature(plant_assignments)
    cached_entry = {
        "assignment_signature": assignment_signature,
        "result_ids": list(result_ids),
        "payload": _json_safe(payload),
    }
    for result in RemoteSensingSubdivisionResult.objects.filter(id__in=result_ids):
        metadata = dict(result.metadata or {})
        recommendation_cache = dict(metadata.get("cluster_recommendations") or {})
        recommendation_cache[cache_key] = cached_entry
        metadata["cluster_recommendations"] = recommendation_cache
        result.metadata = metadata
        result.save(update_fields=["metadata", "updated_at"])