# File-viewer listing metadata (not part of the program): Python, 196 lines, 7.4 KiB.
from __future__ import annotations
|
|
|
|
from math import sqrt
|
|
from typing import Any
|
|
|
|
from farm_data.models import SensorData
|
|
|
|
|
|
# Quality flags attached to every sensor point and heatmap grid cell.
QUALITY_REAL = "REAL"  # value taken directly from a sensor reading
QUALITY_INTERPOLATED = "INTERPOLATED"  # value estimated by interpolation
QUALITY_MISSING = "MISSING"  # no usable value available

# Default for the largest run of consecutive missing samples that
# series interpolation is allowed to fill.
INTERPOLATION_LIMIT = 3

# Exponent applied to the distance in inverse-distance weighting.
IDW_POWER = 2

# Upper bound on the number of grid nodes generated along one axis.
MAX_GRID_STEPS = 10
|
|
|
|
|
|
def _interpolate_series(points: list[dict[str, Any]], limit: int = INTERPOLATION_LIMIT) -> list[dict[str, Any]]:
    """Fill short interior gaps of an ordered series by linear interpolation.

    Interpolation is done by list position, i.e. samples are treated as
    evenly spaced.

    Args:
        points: Samples as dicts that each carry a ``"value"`` key; ``None``
            marks a missing sample. The input list and its dicts are not
            mutated.
        limit: Largest number of consecutive missing samples that is filled.
            Longer gaps are left untouched.

    Returns:
        A new list of shallow-copied points. Filled samples receive a value
        rounded to two decimals and ``"quality_flag"`` set to
        ``QUALITY_INTERPOLATED``. Missing samples before the first or after
        the last known value are never filled.
    """
    output = [dict(point) for point in points]
    known_indexes = [index for index, point in enumerate(output) if point["value"] is not None]

    # Each pair of adjacent known samples brackets one (possibly empty) gap.
    # Both endpoints are non-None by construction of ``known_indexes``, so no
    # additional None check is required here.
    for start_index, end_index in zip(known_indexes, known_indexes[1:]):
        gap = end_index - start_index - 1
        if gap <= 0 or gap > limit:
            continue

        start_value = output[start_index]["value"]
        step = (output[end_index]["value"] - start_value) / (gap + 1)
        for offset in range(1, gap + 1):
            filled = output[start_index + offset]
            filled["value"] = round(start_value + (step * offset), 2)
            filled["quality_flag"] = QUALITY_INTERPOLATED

    return output
|
|
|
|
|
|
def _sensor_time_series(sensor: Any, histories: list[Any]) -> list[dict[str, Any]]:
    """Assemble one sensor's moisture series and fill its short gaps.

    The history records are walked in reverse order, the sensor's own current
    reading is appended as the final sample, and the result is passed through
    ``_interpolate_series``.

    Args:
        sensor: Object whose optional ``updated_at`` and ``soil_moisture``
            attributes describe the current reading.
        histories: Records exposing ``recorded_at`` and ``soil_moisture``.
            NOTE(review): the reversal suggests callers pass these
            newest-first; confirm against the caller.

    Returns:
        A list of ``{"timestamp", "value", "quality_flag"}`` dicts.
    """
    samples = [
        {
            "timestamp": record.recorded_at.isoformat(),
            "value": None if record.soil_moisture is None else float(record.soil_moisture),
            "quality_flag": QUALITY_MISSING if record.soil_moisture is None else QUALITY_REAL,
        }
        for record in reversed(histories)
    ]

    # The sensor's live reading always closes the series, even when empty.
    updated_at = getattr(sensor, "updated_at", None)
    stamp = updated_at.isoformat() if updated_at else None
    current = getattr(sensor, "soil_moisture", None)
    if current is None:
        current_value, current_flag = None, QUALITY_MISSING
    else:
        current_value, current_flag = float(current), QUALITY_REAL
    samples.append({"timestamp": stamp, "value": current_value, "quality_flag": current_flag})

    return _interpolate_series(samples)
|
|
|
|
|
|
def _latest_sensor_measurement(sensor: Any, histories: list[Any]) -> dict[str, Any]:
    """Describe a sensor's most recent sample together with its location.

    Args:
        sensor: Object exposing ``farm_uuid`` and a ``center_location`` with
            ``latitude`` / ``longitude``.
        histories: History records forwarded to ``_sensor_time_series``.

    Returns:
        A dict with ``sensor_id``, ``latitude``, ``longitude``, ``depth``
        (always ``None``), ``timestamp``, ``soil_moisture_value`` and
        ``quality_flag`` taken from the last element of the series.
    """
    series = _sensor_time_series(sensor, histories)
    if series:
        newest = series[-1]
    else:
        # Fallback sample used when the series comes back empty.
        newest = {"timestamp": None, "value": None, "quality_flag": QUALITY_MISSING}

    measurement: dict[str, Any] = {"sensor_id": str(sensor.farm_uuid)}
    location = sensor.center_location
    measurement["latitude"] = float(location.latitude)
    measurement["longitude"] = float(location.longitude)
    measurement["depth"] = None
    measurement["timestamp"] = newest["timestamp"]
    measurement["soil_moisture_value"] = newest["value"]
    measurement["quality_flag"] = newest["quality_flag"]
    return measurement
|
|
|
|
|
|
def _idw_value(lat: float, lon: float, sensor_points: list[dict[str, Any]]) -> float | None:
    """Estimate moisture at a coordinate by inverse-distance weighting.

    Distance is the plain Euclidean distance in coordinate space; each
    contributing sensor is weighted by ``1 / distance ** IDW_POWER``.

    Args:
        lat: Latitude of the target location.
        lon: Longitude of the target location.
        sensor_points: Dicts with ``latitude``, ``longitude`` and
            ``soil_moisture_value``; entries whose value is ``None`` are
            ignored.

    Returns:
        The weighted mean rounded to two decimals, the sensor's own value
        (rounded) when the target coincides exactly with a sensor, or
        ``None`` when no sensor contributes.
    """
    numerator = 0.0
    denominator = 0.0

    for sample in sensor_points:
        reading = sample["soil_moisture_value"]
        if reading is not None:
            d_lat = lat - sample["latitude"]
            d_lon = lon - sample["longitude"]
            separation = sqrt((d_lat ** 2) + (d_lon ** 2))
            if separation == 0:
                # Exactly on a sensor: its reading wins outright.
                return round(float(reading), 2)
            influence = 1 / (separation**IDW_POWER)
            numerator += influence * float(reading)
            denominator += influence

    return None if denominator == 0 else round(numerator / denominator, 2)
|
|
|
|
|
|
def _grid_axis(min_value: float, max_value: float) -> list[float]:
    """Return evenly spaced coordinates covering ``[min_value, max_value]``.

    The node count is one per 0.0001 of span plus one, clamped to at least 2
    and at most ``MAX_GRID_STEPS``. A zero-width range yields a single node.

    Args:
        min_value: Lower end of the axis.
        max_value: Upper end of the axis.

    Returns:
        Axis coordinates rounded to six decimals.
    """
    if min_value == max_value:
        return [round(min_value, 6)]

    span = max_value - min_value
    wanted = max(int(span / 0.0001) + 1, 2)
    node_count = min(MAX_GRID_STEPS, wanted)
    spacing = span / (node_count - 1)

    axis = []
    for position in range(node_count):
        axis.append(round(min_value + (spacing * position), 6))
    return axis
|
|
|
|
|
|
def _load_sensor_network(current_sensor: Any) -> list[Any]:
    """Load the sensors that share at least one plant with ``current_sensor``.

    Args:
        current_sensor: Sensor whose ``plants`` relation supplies the plant
            ids used for filtering.

    Returns:
        A list of ``SensorData`` rows with ``center_location`` joined and
        ``plants`` prefetched.
    """
    plant_ids = list(current_sensor.plants.values_list("id", flat=True))
    network = SensorData.objects.select_related("center_location").prefetch_related("plants")

    if not plant_ids:
        # NOTE(review): with no plants attached the query is left unfiltered,
        # so every SensorData row is returned - confirm this is intended.
        return list(network)

    # distinct() collapses duplicates produced by the many-valued plants join.
    return list(network.filter(plants__id__in=plant_ids).distinct())
|
|
|
|
|
|
def build_soil_moisture_heatmap(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
    """Build a soil-moisture heatmap grid from the current sensor's network.

    Sensors related to the current sensor are loaded, their latest readings
    are collected, and a regular latitude/longitude grid spanning the valid
    sensors is filled: a node that coincides with a sensor (compared at the
    grid's six-decimal precision) takes that sensor's value, every other node
    is estimated by inverse-distance weighting when at least two valid
    sensors exist, and is marked missing otherwise.

    Args:
        sensor_id: Not read by this function; kept for interface
            compatibility. The sensor object is taken from ``context``.
        context: Mapping that holds the current sensor under ``"sensor"``.
        ai_bundle: Not read by this function; kept for interface
            compatibility.

    Returns:
        A dict with the keys ``timestamp``, ``grid_resolution``,
        ``grid_cells``, ``sensor_points`` and ``quality_legend``. When no
        sensor is supplied every field is empty/``None``; when no sensor has
        a valid reading the grid is empty but ``sensor_points`` is still
        reported.
    """
    current_sensor = (context or {}).get("sensor")
    if current_sensor is None:
        return {
            "timestamp": None,
            "grid_resolution": None,
            "grid_cells": [],
            "sensor_points": [],
            "quality_legend": {},
        }

    sensors = _load_sensor_network(current_sensor)
    # No history is passed, so each point reflects the sensor's current reading.
    sensor_points = [_latest_sensor_measurement(sensor, []) for sensor in sensors]
    valid_sensor_points = [point for point in sensor_points if point["soil_moisture_value"] is not None]

    if not valid_sensor_points:
        updated_at = getattr(current_sensor, "updated_at", None)
        return {
            "timestamp": updated_at.isoformat() if updated_at else None,
            "grid_resolution": None,
            "grid_cells": [],
            "sensor_points": sensor_points,
            "quality_legend": {
                QUALITY_REAL: "اندازهگیری واقعی سنسور",
                QUALITY_INTERPOLATED: "مقدار سنسور با درونیابی زمانی کوتاهمدت",
                QUALITY_MISSING: "داده معتبر برای سنسور موجود نیست",
            },
        }

    latitudes = [point["latitude"] for point in valid_sensor_points]
    longitudes = [point["longitude"] for point in valid_sensor_points]
    lat_axis = _grid_axis(min(latitudes), max(latitudes))
    lon_axis = _grid_axis(min(longitudes), max(longitudes))

    # Grid nodes are rounded to six decimals, so sensors are indexed at the
    # same precision; comparing raw coordinates would miss any sensor whose
    # position carries more digits. setdefault keeps the first sensor per node.
    sensors_by_node: dict = {}
    for point in valid_sensor_points:
        node = (round(point["latitude"], 6), round(point["longitude"], 6))
        sensors_by_node.setdefault(node, point)

    # Spatial interpolation needs at least two valid sensors.
    can_interpolate = len(valid_sensor_points) >= 2

    grid_cells = []
    for lat in lat_axis:
        for lon in lon_axis:
            direct_sensor = sensors_by_node.get((lat, lon))
            if direct_sensor is not None:
                moisture_value = direct_sensor["soil_moisture_value"]
                quality_flag = direct_sensor["quality_flag"]
            elif can_interpolate:
                moisture_value = _idw_value(lat, lon, valid_sensor_points)
                quality_flag = QUALITY_INTERPOLATED if moisture_value is not None else QUALITY_MISSING
            else:
                moisture_value = None
                quality_flag = QUALITY_MISSING

            grid_cells.append(
                {
                    "lat": lat,
                    "lon": lon,
                    "moisture_value": moisture_value,
                    "quality_flag": quality_flag,
                }
            )

    lat_step = round(abs(lat_axis[1] - lat_axis[0]), 6) if len(lat_axis) > 1 else 0.0
    lon_step = round(abs(lon_axis[1] - lon_axis[0]), 6) if len(lon_axis) > 1 else 0.0

    # Sensors may have a reading but no timestamp; default=None avoids the
    # ValueError that max() raises on an empty sequence.
    latest_timestamp = max(
        (point["timestamp"] for point in sensor_points if point["timestamp"]),
        default=None,
    )

    return {
        "timestamp": latest_timestamp,
        "grid_resolution": {
            "lat_step": lat_step,
            "lon_step": lon_step,
            "rows": len(lat_axis),
            "cols": len(lon_axis),
        },
        "grid_cells": grid_cells,
        "sensor_points": sensor_points,
        "quality_legend": {
            QUALITY_REAL: "اندازهگیری واقعی سنسور",
            QUALITY_INTERPOLATED: "مقدار برآوردشده با درونیابی زمانی/فضایی",
            QUALITY_MISSING: "داده معتبر در دسترس نیست",
        },
    }
|