from __future__ import annotations from decimal import Decimal, ROUND_HALF_UP from numbers import Number import logging from django.conf import settings from django.db import transaction from django.utils.dateparse import parse_datetime import requests from location_data.models import SoilLocation from location_data.serializers import SoilDepthDataSerializer from location_data.tasks import fetch_soil_data_for_coordinates from irrigation.serializers import IrrigationMethodSerializer from weather.services import update_weather_for_location from weather.models import WeatherForecast from .models import ( FarmPlantAssignment, ParameterUpdateLog, PlantCatalogSnapshot, SensorData, SensorParameter, ) from .serializers import PlantCatalogSnapshotSerializer, WeatherForecastDetailSerializer DEPTH_PRIORITY = ["0-5cm", "5-15cm", "15-30cm"] DECIMAL_PRECISION = Decimal("0.000001") logger = logging.getLogger(__name__) class ExternalDataSyncError(Exception): """خطا در همگام‌سازی داده از سرویس‌های بیرونی.""" class BackendSyncError(Exception): """خطا در همگام سازی کاتالوگ گیاه و assignmentها از Backend.""" PARAMETER_LABEL_OVERRIDES = { "soil_moisture": "رطوبت خاک", "soil_temperature": "دمای خاک", "soil_ph": "pH خاک", "electrical_conductivity": "هدایت الکتریکی", "nitrogen": "نیتروژن", "phosphorus": "فسفر", "potassium": "پتاسیم", } PARAMETER_UNIT_OVERRIDES = { "soil_moisture": "%", "soil_temperature": "°C", "soil_ph": "", "electrical_conductivity": "dS/m", "nitrogen": "mg/kg", "phosphorus": "mg/kg", "potassium": "mg/kg", } def get_backend_plant_base_url() -> str: return getattr(settings, "BACKEND_PLANT_SYNC_BASE_URL", "").rstrip("/") def get_backend_plant_timeout() -> int: return int(getattr(settings, "BACKEND_PLANT_SYNC_TIMEOUT", 20)) def get_backend_plant_headers() -> dict[str, str]: headers = {"Accept": "application/json"} api_key = getattr(settings, "BACKEND_PLANT_SYNC_API_KEY", "").strip() if api_key: headers["X-API-Key"] = api_key headers["Authorization"] = f"Api-Key {api_key}" return headers def _extract_envelope_list(payload): if isinstance(payload, list): return payload if not isinstance(payload, dict): return [] data = payload.get("data") if isinstance(data, list): return data result = payload.get("result") if isinstance(result, list): return result if isinstance(data, dict) and isinstance(data.get("result"), list): return data["result"] return [] def _normalize_growth_stages(item: dict) -> list[str]: stages = item.get("growth_stages") if isinstance(stages, list): return [str(stage).strip() for stage in stages if str(stage).strip()] growth_stage = str(item.get("growth_stage") or "").strip() if not growth_stage: return [] return [part.strip() for part in growth_stage.replace("،", ",").split(",") if part.strip()] def _snapshot_defaults_from_payload(item: dict) -> dict: source_updated_at = parse_datetime(str(item.get("updated_at") or "").strip()) if item.get("updated_at") else None return { "name": str(item.get("name") or "").strip(), "slug": str(item.get("slug") or "").strip(), "icon": str(item.get("icon") or "leaf").strip() or "leaf", "description": str(item.get("description") or "").strip(), "metadata": item.get("metadata") if isinstance(item.get("metadata"), dict) else {}, "light": str(item.get("light") or "").strip(), "watering": str(item.get("watering") or "").strip(), "soil": str(item.get("soil") or "").strip(), "temperature": str(item.get("temperature") or "").strip(), "growth_stage": str(item.get("growth_stage") or "").strip(), "growth_stages": _normalize_growth_stages(item), "planting_season": str(item.get("planting_season") or "").strip(), "harvest_time": str(item.get("harvest_time") or "").strip(), "spacing": str(item.get("spacing") or "").strip(), "fertilizer": str(item.get("fertilizer") or "").strip(), "health_profile": item.get("health_profile") if isinstance(item.get("health_profile"), dict) else {}, "irrigation_profile": item.get("irrigation_profile") if isinstance(item.get("irrigation_profile"), dict) else {}, "growth_profile": item.get("growth_profile") if isinstance(item.get("growth_profile"), dict) else {}, "is_active": bool(item.get("is_active", True)), "source_updated_at": source_updated_at, } def sync_plant_catalog_from_backend(plant_payloads: list[dict] | None = None) -> list[PlantCatalogSnapshot]: if plant_payloads is None: base_url = get_backend_plant_base_url() if not base_url: raise BackendSyncError("BACKEND_PLANT_SYNC_BASE_URL is not configured.") try: response = requests.get( f"{base_url}/api/plants/", headers=get_backend_plant_headers(), timeout=get_backend_plant_timeout(), ) except requests.RequestException as exc: raise BackendSyncError(f"Backend plant catalog request failed: {exc}") from exc if response.status_code >= 400: raise BackendSyncError(f"Backend plant catalog returned status {response.status_code}.") plant_payloads = _extract_envelope_list(response.json()) snapshots: list[PlantCatalogSnapshot] = [] with transaction.atomic(): for item in plant_payloads or []: if not isinstance(item, dict): continue plant_id = item.get("id") or item.get("backend_plant_id") if plant_id in (None, ""): continue snapshot, _ = PlantCatalogSnapshot.objects.update_or_create( backend_plant_id=int(plant_id), defaults=_snapshot_defaults_from_payload(item), ) snapshots.append(snapshot) return snapshots def assign_farm_plants_from_backend_ids(farm: SensorData, backend_plant_ids: list[int] | None) -> list[PlantCatalogSnapshot]: if backend_plant_ids is None: return list(get_farm_plant_snapshots(farm)) normalized_ids = [int(plant_id) for plant_id in backend_plant_ids] snapshots = list(PlantCatalogSnapshot.objects.filter(backend_plant_id__in=normalized_ids)) snapshot_by_backend_id = {snapshot.backend_plant_id: snapshot for snapshot in snapshots} missing_ids = [plant_id for plant_id in normalized_ids if plant_id not in snapshot_by_backend_id] if missing_ids: raise BackendSyncError( "Plant catalog snapshot missing for backend ids: " + ", ".join(str(plant_id) for plant_id in missing_ids) ) with transaction.atomic(): FarmPlantAssignment.objects.filter(farm=farm).exclude( plant__backend_plant_id__in=normalized_ids ).delete() for position, backend_plant_id in enumerate(normalized_ids): FarmPlantAssignment.objects.update_or_create( farm=farm, plant=snapshot_by_backend_id[backend_plant_id], defaults={"position": position}, ) return [snapshot_by_backend_id[backend_plant_id] for backend_plant_id in normalized_ids] def get_farm_plant_assignments(farm: SensorData) -> list[FarmPlantAssignment]: return list( farm.plant_assignments.select_related("plant").order_by("position", "id") ) def get_farm_plant_snapshots(farm: SensorData) -> list[PlantCatalogSnapshot]: return [assignment.plant for assignment in get_farm_plant_assignments(farm)] def get_primary_plant_snapshot(farm: SensorData) -> PlantCatalogSnapshot | None: assignments = get_farm_plant_assignments(farm) return assignments[0].plant if assignments else None def get_farm_plant_snapshot_by_name( farm: SensorData, plant_name: str | None, ) -> PlantCatalogSnapshot | None: normalized_name = str(plant_name or "").strip().lower() if not normalized_name: return get_primary_plant_snapshot(farm) for assignment in get_farm_plant_assignments(farm): if assignment.plant.name.strip().lower() == normalized_name: return assignment.plant return get_primary_plant_snapshot(farm) def clone_snapshot_as_runtime_plant( snapshot: PlantCatalogSnapshot | None, *, growth_stage: str | None = None, ): if snapshot is None: return None class RuntimePlant: pass runtime = RuntimePlant() for field_name in ( "backend_plant_id", "name", "slug", "icon", "description", "metadata", "light", "watering", "soil", "temperature", "growth_stage", "growth_stages", "planting_season", "harvest_time", "spacing", "fertilizer", "health_profile", "irrigation_profile", "growth_profile", "is_active", ): setattr(runtime, field_name, getattr(snapshot, field_name)) if growth_stage: runtime.growth_stage = growth_stage runtime.id = snapshot.backend_plant_id return runtime def build_plant_text_from_snapshot( plant: PlantCatalogSnapshot | None, growth_stage: str, ) -> str | None: if plant is None: return None lines = [ f"نام گیاه: {plant.name}", f"مرحله رشد: {growth_stage}", ] if plant.light: lines.append(f"نور مورد نیاز: {plant.light}") if plant.watering: lines.append(f"آبیاری: {plant.watering}") if plant.soil: lines.append(f"خاک مناسب: {plant.soil}") if plant.temperature: lines.append(f"دمای مناسب: {plant.temperature}") if plant.planting_season: lines.append(f"فصل کاشت: {plant.planting_season}") if plant.harvest_time: lines.append(f"زمان برداشت: {plant.harvest_time}") if plant.spacing: lines.append(f"فاصله کاشت: {plant.spacing}") if plant.fertilizer: lines.append(f"کود مناسب: {plant.fertilizer}") return "\n".join(lines) def build_farm_plant_context(farm_uuid: str) -> dict | None: farm = ( SensorData.objects.select_related( "center_location", "weather_forecast", "irrigation_method", ) .prefetch_related("plant_assignments__plant", "center_location__depths") .filter(farm_uuid=farm_uuid) .first() ) if farm is None: return None assignments = get_farm_plant_assignments(farm) snapshots = [assignment.plant for assignment in assignments] return { "farm": farm, "plant_ids": [plant.backend_plant_id for plant in snapshots], "plants": PlantCatalogSnapshotSerializer(snapshots, many=True).data, "plant_snapshots": snapshots, "plant_assignments": assignments, "primary_plant": snapshots[0] if snapshots else None, } def infer_sensor_parameter_data_type(value: object) -> str: if isinstance(value, bool): return "bool" if isinstance(value, int) and not isinstance(value, bool): return "int" if isinstance(value, float): return "float" if isinstance(value, str): return "string" if isinstance(value, list): return "list" if isinstance(value, dict): return "object" return "string" def build_parameter_defaults(sensor_key: str, code: str, value: object) -> dict[str, object]: return { "name_fa": PARAMETER_LABEL_OVERRIDES.get(code) or code.replace("_", " ").strip(), "unit": PARAMETER_UNIT_OVERRIDES.get(code, ""), "data_type": infer_sensor_parameter_data_type(value), "metadata": { "source": "auto_discovered", "sensor_key": sensor_key, "code": code, }, } def sync_sensor_parameters_from_payload(sensor_payload: dict | None) -> list[SensorParameter]: if not isinstance(sensor_payload, dict): return [] synced_parameters: list[SensorParameter] = [] with transaction.atomic(): for sensor_key, sensor_values in sensor_payload.items(): if not isinstance(sensor_values, dict): continue for code, value in sensor_values.items(): defaults = build_parameter_defaults(sensor_key, code, value) parameter, created = SensorParameter.objects.get_or_create( sensor_key=sensor_key, code=code, defaults=defaults, ) if created: ParameterUpdateLog.objects.create( parameter=parameter, action=ParameterUpdateLog.ACTION_ADDED, payload={ "sensor_key": parameter.sensor_key, "code": parameter.code, "name_fa": parameter.name_fa, "unit": parameter.unit, "data_type": parameter.data_type, "metadata": parameter.metadata, "source": "farm_data_auto_sync", }, ) synced_parameters.append(parameter) return synced_parameters def get_sensor_parameter_catalog(sensor_payload: dict | None = None) -> dict[str, list[dict[str, object]]]: parameter_queryset = SensorParameter.objects.order_by("sensor_key", "code") if sensor_payload and isinstance(sensor_payload, dict): parameter_queryset = parameter_queryset.filter(sensor_key__in=list(sensor_payload.keys())) catalog: dict[str, list[dict[str, object]]] = {} for parameter in parameter_queryset: catalog.setdefault(parameter.sensor_key, []).append( { "code": parameter.code, "name_fa": parameter.name_fa, "unit": parameter.unit, "data_type": parameter.data_type, "metadata": parameter.metadata, } ) return catalog def get_farm_details(farm_uuid: str): farm = ( SensorData.objects.select_related( "center_location", "weather_forecast", "irrigation_method", ) .prefetch_related("plant_assignments__plant", "center_location__depths") .filter(farm_uuid=farm_uuid) .first() ) if farm is None: return None sync_sensor_parameters_from_payload(farm.sensor_payload) center_location = farm.center_location weather = farm.weather_forecast if weather is None: weather = ( center_location.weather_forecasts.order_by("-forecast_date", "-id").first() ) depths = list(center_location.depths.all()) depths.sort(key=lambda item: DEPTH_PRIORITY.index(item.depth_label) if item.depth_label in DEPTH_PRIORITY else 99) soil_metrics = _surface_soil_metrics(depths) sensor_metrics, sensor_metric_sources = _resolve_sensor_metrics(farm.sensor_payload) resolved_metrics = dict(soil_metrics) metric_sources = {key: "soil" for key in soil_metrics} for key, value in sensor_metrics.items(): resolved_metrics[key] = value metric_sources[key] = sensor_metric_sources[key] plant_assignments = get_farm_plant_assignments(farm) plant_snapshots = [assignment.plant for assignment in plant_assignments] return { "center_location": { "id": center_location.id, "lat": center_location.latitude, "lon": center_location.longitude, "farm_boundary": center_location.farm_boundary, }, "weather": WeatherForecastDetailSerializer(weather).data if weather else None, "sensor_payload": farm.sensor_payload or {}, "sensor_schema": get_sensor_parameter_catalog(farm.sensor_payload), "soil": { "resolved_metrics": resolved_metrics, "metric_sources": metric_sources, "depths": SoilDepthDataSerializer(depths, many=True).data, }, "plant_ids": [plant.backend_plant_id for plant in plant_snapshots], "plants": PlantCatalogSnapshotSerializer(plant_snapshots, many=True).data, "plant_assignments": [ { "plant_id": assignment.plant.backend_plant_id, "position": assignment.position, "stage": assignment.stage, "metadata": assignment.metadata, "assigned_at": assignment.assigned_at, "updated_at": assignment.updated_at, "plant": PlantCatalogSnapshotSerializer(assignment.plant).data, } for assignment in plant_assignments ], "irrigation_method_id": farm.irrigation_method_id, "irrigation_method": ( IrrigationMethodSerializer(farm.irrigation_method).data if farm.irrigation_method else None ), "created_at": farm.created_at, "updated_at": farm.updated_at, } def resolve_center_location_from_boundary(farm_boundary: dict | list) -> SoilLocation: """ مرز مزرعه را می‌گیرد، مرکز را محاسبه می‌کند و رکورد SoilLocation را ایجاد/به‌روزرسانی می‌کند. """ points = _extract_boundary_points(farm_boundary) if not points: raise ValueError("farm_boundary باید حداقل 3 گوشه معتبر داشته باشد.") normalized_points = _normalize_points(points) if len(normalized_points) < 3: raise ValueError("farm_boundary باید حداقل 3 گوشه معتبر داشته باشد.") center_lat, center_lon = _compute_polygon_centroid(normalized_points) with transaction.atomic(): location, _ = SoilLocation.objects.update_or_create( latitude=center_lat, longitude=center_lon, defaults={"farm_boundary": _serialize_boundary(farm_boundary)}, ) return location def resolve_weather_for_location(location: SoilLocation) -> WeatherForecast | None: return ( WeatherForecast.objects.filter(location=location) .order_by("-forecast_date", "-id") .first() ) def ensure_location_and_weather_data(location: SoilLocation) -> tuple[SoilLocation, WeatherForecast | None]: """ اگر داده خاک یا آب‌وهوا برای location موجود نباشد، از سرویس مربوطه واکشی و در دیتابیس ذخیره می‌شود. """ if not location.is_complete: try: soil_result = fetch_soil_data_for_coordinates( latitude=float(location.latitude), longitude=float(location.longitude), ) except Exception as exc: raise ExternalDataSyncError(f"خطا در واکشی داده خاک: {exc}") from exc if soil_result.get("status") != "completed": raise ExternalDataSyncError( soil_result.get("error") or "واکشی داده خاک کامل نشد." ) location.refresh_from_db() weather_forecast = resolve_weather_for_location(location) if weather_forecast is None: weather_result = update_weather_for_location(location) if weather_result.get("status") not in {"success", "no_data"}: raise ExternalDataSyncError( weather_result.get("error") or "واکشی داده آب‌وهوا کامل نشد." ) weather_forecast = resolve_weather_for_location(location) return location, weather_forecast def _resolve_sensor_metrics(sensor_payload: dict | None) -> tuple[dict, dict]: if not isinstance(sensor_payload, dict): return {}, {} readings_by_metric: dict[str, list[tuple[str, object]]] = {} for sensor_key, sensor_values in sorted(sensor_payload.items()): if not isinstance(sensor_values, dict): continue for metric_key, metric_value in sensor_values.items(): readings_by_metric.setdefault(metric_key, []).append((sensor_key, metric_value)) resolved_metrics = {} metric_sources = {} for metric_key, readings in readings_by_metric.items(): resolved_value, source = _resolve_metric_readings(readings) resolved_metrics[metric_key] = resolved_value metric_sources[metric_key] = source return resolved_metrics, metric_sources def _resolve_metric_readings(readings: list[tuple[str, object]]) -> tuple[object, dict[str, object]]: if not readings: return None, {"type": "sensor", "strategy": "empty", "sensor_keys": []} sensor_keys = [sensor_key for sensor_key, _value in readings] distinct_values: list[object] = [] for _sensor_key, value in readings: if value not in distinct_values: distinct_values.append(value) if len(distinct_values) == 1: return distinct_values[0], { "type": "sensor", "strategy": "single_value", "sensor_keys": sensor_keys, "sensor_count": len(sensor_keys), } numeric_values = [_coerce_numeric(value) for value in distinct_values] if all(value is not None for value in numeric_values): average = sum(numeric_values) / len(numeric_values) resolved_value = _normalize_numeric_result(average, distinct_values) return resolved_value, { "type": "sensor", "strategy": "average", "sensor_keys": sensor_keys, "sensor_count": len(sensor_keys), "conflict": True, "distinct_values": distinct_values, } return distinct_values, { "type": "sensor", "strategy": "distinct_values", "sensor_keys": sensor_keys, "sensor_count": len(sensor_keys), "conflict": True, "distinct_values": distinct_values, } def _coerce_numeric(value: object) -> float | None: if isinstance(value, bool): return None if isinstance(value, Number): return float(value) try: return float(value) except (TypeError, ValueError): return None def _normalize_numeric_result(value: float, source_values: list[object]) -> int | float: if all(isinstance(item, int) and not isinstance(item, bool) for item in source_values): if value.is_integer(): return int(value) return float(Decimal(str(value)).quantize(Decimal("0.0001"), rounding=ROUND_HALF_UP)) def _surface_soil_metrics(depths) -> dict: if not depths: return {} primary_depth = depths[0] fields = [ "bdod", "cec", "cfvo", "clay", "nitrogen", "ocd", "ocs", "phh2o", "sand", "silt", "soc", "wv0010", "wv0033", "wv1500", ] return { field: getattr(primary_depth, field) for field in fields if getattr(primary_depth, field) is not None } def _extract_boundary_points(boundary: dict | list) -> list: if isinstance(boundary, dict): if boundary.get("type") == "Polygon": coordinates = boundary.get("coordinates") or [] if coordinates and isinstance(coordinates[0], list): return coordinates[0] return [] if "corners" in boundary: return boundary.get("corners") or [] if isinstance(boundary, list): return boundary return [] def _normalize_points(points: list) -> list[tuple[Decimal, Decimal]]: normalized: list[tuple[Decimal, Decimal]] = [] for point in points: lat = lon = None if isinstance(point, dict): lat = point.get("lat", point.get("latitude")) lon = point.get("lon", point.get("longitude")) elif isinstance(point, (list, tuple)) and len(point) >= 2: lon, lat = point[0], point[1] if lat is None or lon is None: continue lat_decimal = Decimal(str(lat)) lon_decimal = Decimal(str(lon)) normalized.append((lat_decimal, lon_decimal)) if len(normalized) > 1 and normalized[0] == normalized[-1]: normalized = normalized[:-1] return normalized def _serialize_boundary(boundary: dict | list) -> dict: if isinstance(boundary, dict) and boundary.get("type") == "Polygon": return boundary raw_points = boundary.get("corners") if isinstance(boundary, dict) else boundary normalized = _normalize_points(raw_points or []) coordinates = [[float(lon), float(lat)] for lat, lon in normalized] if coordinates and coordinates[0] != coordinates[-1]: coordinates.append(coordinates[0]) return { "type": "Polygon", "coordinates": [coordinates], } def _compute_polygon_centroid(points: list[tuple[Decimal, Decimal]]) -> tuple[Decimal, Decimal]: polygon = list(points) if polygon[0] != polygon[-1]: polygon.append(polygon[0]) twice_area = Decimal("0") centroid_lon = Decimal("0") centroid_lat = Decimal("0") for index in range(len(polygon) - 1): lat1, lon1 = polygon[index] lat2, lon2 = polygon[index + 1] cross = (lon1 * lat2) - (lon2 * lat1) twice_area += cross centroid_lon += (lon1 + lon2) * cross centroid_lat += (lat1 + lat2) * cross if twice_area == 0: return _compute_average_center(points) factor = Decimal("3") * twice_area return ( (centroid_lat / factor).quantize(DECIMAL_PRECISION, rounding=ROUND_HALF_UP), (centroid_lon / factor).quantize(DECIMAL_PRECISION, rounding=ROUND_HALF_UP), ) def _compute_average_center(points: list[tuple[Decimal, Decimal]]) -> tuple[Decimal, Decimal]: lat_sum = sum(lat for lat, _ in points) lon_sum = sum(lon for _, lon in points) count = Decimal(len(points)) return ( (lat_sum / count).quantize(DECIMAL_PRECISION, rounding=ROUND_HALF_UP), (lon_sum / count).quantize(DECIMAL_PRECISION, rounding=ROUND_HALF_UP), )