from __future__ import annotations from math import sqrt from typing import Any from sensor_data.models import SensorData, SensorDataHistory QUALITY_REAL = "REAL" QUALITY_INTERPOLATED = "INTERPOLATED" QUALITY_MISSING = "MISSING" INTERPOLATION_LIMIT = 3 IDW_POWER = 2 MAX_GRID_STEPS = 10 def _interpolate_series(points: list[dict[str, Any]], limit: int = INTERPOLATION_LIMIT) -> list[dict[str, Any]]: output = [dict(point) for point in points] known_indexes = [index for index, point in enumerate(output) if point["value"] is not None] for start_index, end_index in zip(known_indexes, known_indexes[1:]): gap = end_index - start_index - 1 if gap <= 0 or gap > limit: continue start_value = output[start_index]["value"] end_value = output[end_index]["value"] if start_value is None or end_value is None: continue step = (end_value - start_value) / (gap + 1) for offset in range(1, gap + 1): target_index = start_index + offset output[target_index]["value"] = round(start_value + (step * offset), 2) output[target_index]["quality_flag"] = QUALITY_INTERPOLATED return output def _sensor_time_series(sensor: Any, histories: list[Any]) -> list[dict[str, Any]]: points = [] for item in reversed(histories): points.append( { "timestamp": item.recorded_at.isoformat(), "value": float(item.soil_moisture) if item.soil_moisture is not None else None, "quality_flag": QUALITY_REAL if item.soil_moisture is not None else QUALITY_MISSING, } ) points.append( { "timestamp": sensor.updated_at.isoformat() if getattr(sensor, "updated_at", None) else None, "value": float(sensor.soil_moisture) if getattr(sensor, "soil_moisture", None) is not None else None, "quality_flag": QUALITY_REAL if getattr(sensor, "soil_moisture", None) is not None else QUALITY_MISSING, } ) return _interpolate_series(points) def _latest_sensor_measurement(sensor: Any, histories: list[Any]) -> dict[str, Any]: series = _sensor_time_series(sensor, histories) latest = series[-1] if series else {"timestamp": None, "value": None, "quality_flag": QUALITY_MISSING} return { "sensor_id": str(sensor.uuid_sensor), "latitude": float(sensor.location.latitude), "longitude": float(sensor.location.longitude), "depth": None, "timestamp": latest["timestamp"], "soil_moisture_value": latest["value"], "quality_flag": latest["quality_flag"], } def _idw_value(lat: float, lon: float, sensor_points: list[dict[str, Any]]) -> float | None: weighted_sum = 0.0 weight_total = 0.0 for point in sensor_points: value = point["soil_moisture_value"] if value is None: continue distance = sqrt(((lat - point["latitude"]) ** 2) + ((lon - point["longitude"]) ** 2)) if distance == 0: return round(float(value), 2) weight = 1 / (distance**IDW_POWER) weighted_sum += weight * float(value) weight_total += weight if weight_total == 0: return None return round(weighted_sum / weight_total, 2) def _grid_axis(min_value: float, max_value: float) -> list[float]: if min_value == max_value: return [round(min_value, 6)] step_count = min(MAX_GRID_STEPS, max(int((max_value - min_value) / 0.0001) + 1, 2)) step = (max_value - min_value) / (step_count - 1) return [round(min_value + (step * index), 6) for index in range(step_count)] def _load_sensor_network(current_sensor: Any) -> list[Any]: plant_ids = list(current_sensor.plants.values_list("id", flat=True)) queryset = SensorData.objects.select_related("location").prefetch_related("plants") if plant_ids: queryset = queryset.filter(plants__id__in=plant_ids).distinct() return list(queryset) def build_soil_moisture_heatmap(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict: context = context or {} current_sensor = context.get("sensor") if current_sensor is None: return { "timestamp": None, "grid_resolution": None, "grid_cells": [], "sensor_points": [], "quality_legend": {}, } sensors = _load_sensor_network(current_sensor) sensor_ids = [sensor.uuid_sensor for sensor in sensors] history_rows = SensorDataHistory.objects.filter(uuid_sensor__in=sensor_ids).order_by("-recorded_at")[:200] history_map: dict[Any, list[Any]] = {} for row in history_rows: history_map.setdefault(row.uuid_sensor, []).append(row) sensor_points = [ _latest_sensor_measurement(sensor, history_map.get(sensor.uuid_sensor, [])) for sensor in sensors ] valid_sensor_points = [point for point in sensor_points if point["soil_moisture_value"] is not None] if not valid_sensor_points: return { "timestamp": current_sensor.updated_at.isoformat() if getattr(current_sensor, "updated_at", None) else None, "grid_resolution": None, "grid_cells": [], "sensor_points": sensor_points, "quality_legend": { QUALITY_REAL: "اندازه‌گیری واقعی سنسور", QUALITY_INTERPOLATED: "مقدار سنسور با درون‌یابی زمانی کوتاه‌مدت", QUALITY_MISSING: "داده معتبر برای سنسور موجود نیست", }, } min_lat = min(point["latitude"] for point in valid_sensor_points) max_lat = max(point["latitude"] for point in valid_sensor_points) min_lon = min(point["longitude"] for point in valid_sensor_points) max_lon = max(point["longitude"] for point in valid_sensor_points) lat_axis = _grid_axis(min_lat, max_lat) lon_axis = _grid_axis(min_lon, max_lon) grid_cells = [] for lat in lat_axis: for lon in lon_axis: direct_sensor = next( ( point for point in valid_sensor_points if point["latitude"] == lat and point["longitude"] == lon ), None, ) if direct_sensor is not None: moisture_value = direct_sensor["soil_moisture_value"] quality_flag = direct_sensor["quality_flag"] elif len(valid_sensor_points) >= 2: moisture_value = _idw_value(lat, lon, valid_sensor_points) quality_flag = QUALITY_INTERPOLATED if moisture_value is not None else QUALITY_MISSING else: moisture_value = None quality_flag = QUALITY_MISSING grid_cells.append( { "lat": lat, "lon": lon, "moisture_value": moisture_value, "quality_flag": quality_flag, } ) lat_step = round(abs(lat_axis[1] - lat_axis[0]), 6) if len(lat_axis) > 1 else 0.0 lon_step = round(abs(lon_axis[1] - lon_axis[0]), 6) if len(lon_axis) > 1 else 0.0 return { "timestamp": max(point["timestamp"] for point in sensor_points if point["timestamp"]), "grid_resolution": { "lat_step": lat_step, "lon_step": lon_step, "rows": len(lat_axis), "cols": len(lon_axis), }, "grid_cells": grid_cells, "sensor_points": sensor_points, "quality_legend": { QUALITY_REAL: "اندازه‌گیری واقعی سنسور", QUALITY_INTERPOLATED: "مقدار برآوردشده با درون‌یابی زمانی/فضایی", QUALITY_MISSING: "داده معتبر در دسترس نیست", }, }