Files

501 lines
20 KiB
Python
Raw Permalink Normal View History

2026-04-25 17:22:41 +03:30
from __future__ import annotations
from datetime import datetime
from math import sqrt
from statistics import median
from typing import Any
from django.utils import timezone
from farm_data.context import load_farm_context
from farm_data.models import SensorData
2026-05-09 16:55:06 +03:30
from location_data.satellite_snapshot import build_location_satellite_snapshot
2026-04-25 17:22:41 +03:30
from rag.services import get_soil_anomaly_insight
from .anomaly_detection import build_anomaly_detection_card
from .health_summary import build_soil_health_summary
# Quality flags attached to sensor readings and to interpolated grid cells.
QUALITY_REAL = "REAL"
QUALITY_INTERPOLATED = "INTERPOLATED"
QUALITY_MISSING = "MISSING"
QUALITY_EXTRAPOLATED = "EXTRAPOLATED"
# Exponent applied to the distance in the inverse-distance weight: 1 / distance**IDW_POWER.
IDW_POWER = 2
# Upper bound on the number of points generated along each grid axis.
MAX_GRID_STEPS = 10
# Reading age (in hours) at which the freshness weight 1 / (1 + age / this) equals 0.5.
FRESHNESS_HALF_LIFE_HOURS = 24.0
# Sensors farther than this from a cell contribute zero weight. Distances are computed
# directly on latitude/longitude differences, so the value is in coordinate degrees.
MAX_SENSOR_INFLUENCE_DISTANCE = 0.08
def _safe_float(value: Any) -> float | None:
try:
if value in (None, ""):
return None
return float(value)
except (TypeError, ValueError):
return None
def _sensor_time_series(sensor: Any) -> list[dict[str, Any]]:
    """Build the one-sample soil-moisture series for ``sensor``.

    Only the sensor's current ``soil_moisture`` attribute is read, so the
    returned list always holds exactly one entry. The timestamp comes from the
    sensor block (``timestamp`` first, then ``measured_at``) and falls back to
    ``updated_at`` in ISO format when the block provides neither.
    """
    if hasattr(sensor, "get_sensor_block"):
        block_data = sensor.get_sensor_block()
    else:
        block_data = {}
    moisture = _safe_float(getattr(sensor, "soil_moisture", None))

    stamp = block_data.get("timestamp") or block_data.get("measured_at")
    if stamp is None and getattr(sensor, "updated_at", None):
        stamp = sensor.updated_at.isoformat()

    flag = QUALITY_MISSING if moisture is None else QUALITY_REAL
    return [{"timestamp": stamp, "value": moisture, "quality_flag": flag}]
def _parse_timestamp(value: Any) -> datetime | None:
if isinstance(value, datetime):
return value
if not value:
return None
if isinstance(value, str):
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
if timezone.is_naive(parsed):
return timezone.make_aware(parsed, timezone.get_current_timezone())
return parsed
return None
def _hours_since(timestamp: Any) -> float | None:
    """Return the age of ``timestamp`` in hours, or ``None`` if it cannot be parsed.

    Timestamps in the future are clamped to an age of ``0.0``.
    """
    moment = _parse_timestamp(timestamp)
    if moment is None:
        return None
    elapsed = timezone.now() - moment
    elapsed_hours = elapsed.total_seconds() / 3600.0
    return max(elapsed_hours, 0.0)
def _freshness_weight(timestamp: Any) -> float:
    """Weight a reading by its age: 1.0 when brand new, decaying toward 0 as it ages.

    The weight is ``1 / (1 + age / FRESHNESS_HALF_LIFE_HOURS)``. Readings whose
    timestamp is unknown or unparseable get a fixed weight of ``0.65``.
    """
    age = _hours_since(timestamp)
    if age is not None:
        return 1.0 / (1.0 + (age / FRESHNESS_HALF_LIFE_HOURS))
    return 0.65
def _sensor_anomaly_penalty(value: float | None, network_values: list[float]) -> float:
if value is None or len(network_values) < 3:
return 1.0
center = median(network_values)
deviations = [abs(item - center) for item in network_values]
typical_deviation = median(deviations) or 1.0
normalized_distance = abs(value - center) / typical_deviation
return max(0.45, min(1.0, 1.15 - (normalized_distance * 0.18)))
def _boundary_points(sensor: Any) -> list[tuple[float, float]]:
    """Extract the farm boundary of ``sensor.center_location`` as ``(lat, lon)`` pairs.

    Two shapes of ``farm_boundary`` are understood:

    * a dict with ``"type": "Polygon"`` whose first ``coordinates`` ring is a
      list of positions; index 1 of each position is used as latitude and
      index 0 as longitude (presumably GeoJSON ``[lon, lat]`` order; verify
      against the producer of this field);
    * a dict with a ``"corners"`` list, or a bare list, of ``{"lat", "lon"}``
      dicts; entries that are not dicts or lack either key are skipped.

    Returns an empty list when no boundary is available.
    """
    shape = getattr(sensor.center_location, "farm_boundary", None) or {}
    is_mapping = isinstance(shape, dict)

    if is_mapping and shape.get("type") == "Polygon":
        rings = shape.get("coordinates") or []
        if rings and isinstance(rings[0], list):
            return [(float(pos[1]), float(pos[0])) for pos in rings[0] if len(pos) >= 2]

    if is_mapping:
        corner_items = shape.get("corners")
    elif isinstance(shape, list):
        corner_items = shape
    else:
        corner_items = []

    return [
        (float(item["lat"]), float(item["lon"]))
        for item in corner_items or []
        if isinstance(item, dict) and item.get("lat") is not None and item.get("lon") is not None
    ]
def _point_in_polygon(lat: float, lon: float, polygon: list[tuple[float, float]]) -> bool:
    """Even-odd (ray casting) test of whether ``(lat, lon)`` lies inside ``polygon``.

    ``polygon`` is a sequence of ``(lat, lon)`` vertices; the closing edge from
    the last vertex back to the first is implied. A polygon with fewer than
    three vertices is treated as "no boundary" and every point counts as inside.
    """
    if len(polygon) < 3:
        return True

    inside = False
    previous = polygon[-1]
    for current in polygon:
        lat1, lon1 = previous
        lat2, lon2 = current
        previous = current
        # An edge can only be crossed when its endpoints lie on opposite sides of
        # the point's longitude. That also guarantees lon1 != lon2, so the division
        # below is safe without clamping the denominator. Clamping it would corrupt
        # the result for every edge running toward decreasing longitude.
        if (lon1 > lon) != (lon2 > lon):
            crossing_lat = (lat2 - lat1) * (lon - lon1) / (lon2 - lon1) + lat1
            if lat < crossing_lat:
                inside = not inside
    return inside
def _latest_sensor_measurement(sensor: Any, network_values: list[float]) -> dict[str, Any]:
    """Describe the most recent moisture reading of ``sensor`` as a plain dict.

    Args:
        sensor: object exposing ``farm_uuid`` and ``center_location`` (with
            ``latitude`` and ``longitude``) plus whatever ``_sensor_time_series`` reads.
        network_values: moisture values of the whole sensor network, used to
            penalise readings that deviate strongly from the rest.

    Returns:
        A dict with the sensor position, the latest value and quality flag, the
        freshness weight and the combined ``reliability_score`` (anomaly penalty
        multiplied by freshness weight), both rounded to four decimals.
    """
    series = _sensor_time_series(sensor)
    latest = series[-1] if series else {"timestamp": None, "value": None, "quality_flag": QUALITY_MISSING}

    # Evaluate freshness once: it depends on the current time, so two separate
    # calls would let the reported weight and the reliability score disagree.
    freshness = _freshness_weight(latest["timestamp"])
    reliability = _sensor_anomaly_penalty(latest["value"], network_values) * freshness

    return {
        "sensor_id": str(sensor.farm_uuid),
        "latitude": float(sensor.center_location.latitude),
        "longitude": float(sensor.center_location.longitude),
        "depth": None,
        "timestamp": latest["timestamp"],
        "soil_moisture_value": latest["value"],
        "quality_flag": latest["quality_flag"],
        "freshness_weight": round(freshness, 4),
        "reliability_score": round(reliability, 4),
    }
def _spatial_weight(distance: float) -> float:
    """Inverse-distance weight of a sensor located ``distance`` away from a cell.

    A zero distance yields ``1.0``; anything beyond
    ``MAX_SENSOR_INFLUENCE_DISTANCE`` yields ``0.0``; otherwise the weight is
    ``1 / distance**IDW_POWER``.
    """
    if distance == 0:
        return 1.0
    return 0.0 if distance > MAX_SENSOR_INFLUENCE_DISTANCE else 1 / (distance**IDW_POWER)
def _interpolate_cell(
lat: float,
lon: float,
sensor_points: list[dict[str, Any]],
) -> tuple[float | None, str, float]:
weighted_sum = 0.0
weight_total = 0.0
min_distance = None
for point in sensor_points:
value = point["soil_moisture_value"]
if value is None:
continue
distance = sqrt(((lat - point["latitude"]) ** 2) + ((lon - point["longitude"]) ** 2))
min_distance = distance if min_distance is None else min(min_distance, distance)
if distance == 0:
return round(float(value), 2), point["quality_flag"], 1.0
spatial_weight = _spatial_weight(distance)
if spatial_weight == 0.0:
continue
composite_weight = spatial_weight * float(point.get("reliability_score", 1.0))
weighted_sum += composite_weight * float(value)
weight_total += composite_weight
if weight_total == 0.0:
return None, QUALITY_MISSING, 0.0
uncertainty = 1.0 - min(weight_total / (weight_total + 6.0), 1.0)
quality_flag = QUALITY_INTERPOLATED
if min_distance is not None and min_distance > (MAX_SENSOR_INFLUENCE_DISTANCE / 2):
quality_flag = QUALITY_EXTRAPOLATED
return round(weighted_sum / weight_total, 2), quality_flag, round(max(0.0, min(1.0, uncertainty)), 4)
def _grid_axis(min_value: float, max_value: float) -> list[float]:
    """Return evenly spaced coordinates from ``min_value`` to ``max_value``.

    A degenerate range collapses to a single point. Otherwise the axis has
    between 2 and ``MAX_GRID_STEPS`` points, aiming for roughly one point per
    0.0001 of range. All coordinates are rounded to six decimals.
    """
    if min_value == max_value:
        return [round(min_value, 6)]
    span = max_value - min_value
    point_count = min(MAX_GRID_STEPS, max(int(span / 0.0001) + 1, 2))
    increment = span / (point_count - 1)
    return [round(min_value + (increment * position), 6) for position in range(point_count)]
def _load_sensor_network(current_sensor: Any) -> list[Any]:
    """Load the sensors that form the interpolation network for ``current_sensor``.

    The network is every ``SensorData`` row assigned to at least one of the
    plants (by ``backend_plant_id``) assigned to ``current_sensor``.

    NOTE(review): when ``current_sensor`` has no plant assignments no filter is
    applied and every ``SensorData`` row is returned; confirm this is intended.
    """
    plant_ids = list(
        current_sensor.plant_assignments.values_list("plant__backend_plant_id", flat=True)
    )
    network = SensorData.objects.select_related("center_location").prefetch_related(
        "plant_assignments__plant",
    )
    if not plant_ids:
        return list(network)
    sharing_plants = network.filter(
        plant_assignments__plant__backend_plant_id__in=plant_ids
    ).distinct()
    return list(sharing_plants)
def _soil_profile(sensor: Any) -> list[dict[str, Any]]:
    """Build a single-layer soil profile from the location's satellite snapshot.

    Reads ``resolved_metrics`` from the snapshot built for
    ``sensor.center_location``. Returns an empty list when no metrics were
    resolved, otherwise a one-element list describing the surface layer.
    Properties the snapshot does not provide are reported as ``None``.

    NOTE(review): ``field_capacity`` is populated from the ``ndwi`` metric;
    confirm that this mapping is intentional.
    """
    snapshot = build_location_satellite_snapshot(sensor.center_location)
    metrics = snapshot.get("resolved_metrics") or {}
    if not metrics:
        return []

    read = metrics.get
    surface_layer = {
        "depth_label": "surface_30x30_remote_sensing",
        "field_capacity": read("ndwi"),
        "wilting_point": None,
        "saturation": None,
        "nitrogen": None,
        "ph": None,
        "sand": None,
        "silt": None,
        "clay": None,
        "ndvi": read("ndvi"),
        "soil_vv_db": read("soil_vv_db"),
        "dem_m": read("dem_m"),
        "slope_deg": read("slope_deg"),
    }
    return [surface_layer]
def _depth_layers(soil_profile: list[dict[str, Any]], grid_cells: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Derive one grid layer per soil-profile entry from the surface grid.

    The layer at position ``i`` scales every surface moisture value by
    ``max(0.72, 1 - 0.08 * i)`` and rounds it to two decimals; cells without a
    value stay ``None``. Returns an empty list when either input is empty.
    """
    if not (soil_profile and grid_cells):
        return []

    def scaled_cell(source: dict[str, Any], factor: float) -> dict[str, Any]:
        # Copy one surface cell, attenuating its moisture by ``factor``.
        surface_value = source["moisture_value"]
        scaled = None if surface_value is None else round(surface_value * factor, 2)
        return {
            "lat": source["lat"],
            "lon": source["lon"],
            "moisture_value": scaled,
            "quality_flag": source["quality_flag"],
            "uncertainty": source.get("uncertainty"),
        }

    result = []
    for position, profile_entry in enumerate(soil_profile):
        attenuation = max(0.72, 1.0 - (position * 0.08))
        result.append(
            {
                "depth_label": profile_entry.get("depth_label"),
                "estimated_from_surface": True,
                "cells": [scaled_cell(source, attenuation) for source in grid_cells],
            }
        )
    return result
def _heatmap_summary(sensor_points: list[dict[str, Any]], grid_cells: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarise the sensor network and the grid that was produced from it.

    Reports how many sensors exist and how many carry a moisture value, the
    mean of those values (two decimals) and the mean cell uncertainty (four
    decimals). Either mean is ``None`` when there is nothing to average. The
    model-description flags are fixed values.
    """
    readings = [
        moisture
        for moisture in (point["soil_moisture_value"] for point in sensor_points)
        if moisture is not None
    ]
    cell_uncertainties = [
        cell["uncertainty"] for cell in grid_cells if cell.get("uncertainty") is not None
    ]

    mean_moisture = None
    if readings:
        mean_moisture = round(sum(readings) / len(readings), 2)

    mean_uncertainty = None
    if cell_uncertainties:
        mean_uncertainty = round(sum(cell_uncertainties) / len(cell_uncertainties), 4)

    return {
        "sensor_count": len(sensor_points),
        "active_sensor_count": len(readings),
        "interpolation_model": "boundary_aware_weighted_idw",
        "uses_sensor_history": False,
        "uses_freshness_weighting": True,
        "uses_boundary_mask": True,
        "uses_outlier_penalty": True,
        "avg_sensor_moisture": mean_moisture,
        "avg_uncertainty": mean_uncertainty,
    }
class SoilMoistureHeatmapService:
    """Assemble the spatial soil-moisture heatmap payload for one farm."""

    def get_heatmap(self, *, farm_uuid: str) -> dict[str, Any]:
        """Return the heatmap payload for the farm identified by ``farm_uuid``.

        The sensor network is loaded, each sensor's latest reading is scored,
        and a grid spanning the farm boundary (or, without a boundary, the
        bounding box of the sensors with a value) is filled with direct or
        interpolated moisture values. When no sensor has a usable value a
        reduced payload with an empty grid is returned instead.

        Raises:
            ValueError: if no ``SensorData`` row matches ``farm_uuid``.
        """
        # Spatial-first endpoint: preserve grid/cluster-aware output and never flatten the
        # heatmap to a single farm-average metric. Aggregated summaries are supplementary only.
        current_sensor = (
            SensorData.objects.select_related("center_location")
            .prefetch_related("plant_assignments__plant")
            .filter(farm_uuid=farm_uuid)
            .first()
        )
        if current_sensor is None:
            raise ValueError("Farm not found.")

        sensors = _load_sensor_network(current_sensor)
        raw_network_values = [
            reading
            for reading in (_safe_float(getattr(member, "soil_moisture", None)) for member in sensors)
            if reading is not None
        ]
        sensor_points = [_latest_sensor_measurement(member, raw_network_values) for member in sensors]
        valid_sensor_points = [entry for entry in sensor_points if entry["soil_moisture_value"] is not None]
        soil_profile = _soil_profile(current_sensor)
        farm_polygon = _boundary_points(current_sensor)

        if not valid_sensor_points:
            return self._payload_without_grid(current_sensor, soil_profile, sensor_points)

        min_lat, max_lat, min_lon, max_lon = self._bounds(farm_polygon, valid_sensor_points)
        lat_axis = _grid_axis(min_lat, max_lat)
        lon_axis = _grid_axis(min_lon, max_lon)
        grid_cells = [
            self._grid_cell(lat, lon, farm_polygon, valid_sensor_points)
            for lat in lat_axis
            for lon in lon_axis
        ]

        lat_step = round(abs(lat_axis[1] - lat_axis[0]), 6) if len(lat_axis) > 1 else 0.0
        lon_step = round(abs(lon_axis[1] - lon_axis[0]), 6) if len(lon_axis) > 1 else 0.0
        timestamps = [entry["timestamp"] for entry in sensor_points if entry["timestamp"]]
        inside_cells = [cell for cell in grid_cells if cell["inside_farm_boundary"]]
        depth_layers = _depth_layers(soil_profile, inside_cells)

        return {
            "farm_uuid": str(current_sensor.farm_uuid),
            "location": self._location(current_sensor),
            "current_sensor": {
                **self._core_readings(current_sensor),
                "nitrogen": current_sensor.nitrogen,
                "phosphorus": current_sensor.phosphorus,
                "potassium": current_sensor.potassium,
            },
            "soil_profile": soil_profile,
            "depth_layers": depth_layers,
            "timestamp": max(timestamps) if timestamps else None,
            "grid_resolution": {
                "lat_step": lat_step,
                "lon_step": lon_step,
                "rows": len(lat_axis),
                "cols": len(lon_axis),
            },
            "grid_cells": grid_cells,
            "sensor_points": sensor_points,
            "model_metadata": {
                "interpolation_model": "boundary_aware_weighted_idw",
                "uses_sensor_history": False,
                "uses_freshness_weighting": True,
                "uses_outlier_penalty": True,
                "uses_depth_estimation": True,
                "uses_boundary_mask": bool(farm_polygon),
                "limitations": [
                    "history واقعی سنسورها در مدل حاضر ذخیره نشده است",
                    "depth layers از داده سطحی و پروفایل خاک مشتق شده‌اند",
                    "uncertainty به صورت heuristic برآورد می‌شود",
                ],
            },
            "summary": _heatmap_summary(sensor_points, inside_cells),
            "source_metadata": self._source_metadata(),
            "quality_legend": self._quality_legend(),
        }

    def _payload_without_grid(
        self,
        current_sensor: Any,
        soil_profile: list[dict[str, Any]],
        sensor_points: list[dict[str, Any]],
    ) -> dict[str, Any]:
        """Build the reduced payload returned when no sensor carries a moisture value."""
        updated_at = getattr(current_sensor, "updated_at", None)
        return {
            "farm_uuid": str(current_sensor.farm_uuid),
            "location": self._location(current_sensor),
            "current_sensor": self._core_readings(current_sensor),
            "soil_profile": soil_profile,
            "depth_layers": [],
            "timestamp": current_sensor.updated_at.isoformat() if updated_at else None,
            "grid_resolution": None,
            "grid_cells": [],
            "sensor_points": sensor_points,
            "model_metadata": {
                "interpolation_model": "boundary_aware_weighted_idw",
                "uses_sensor_history": False,
                "limitations": [
                    "history واقعی سنسورها در مدل حاضر در دسترس نیست",
                    "depth layers از surface estimate مشتق می‌شوند",
                ],
            },
            "summary": _heatmap_summary(sensor_points, []),
            "source_metadata": self._source_metadata(),
            "quality_legend": self._quality_legend(),
        }

    @staticmethod
    def _location(current_sensor: Any) -> dict[str, float]:
        """Return the farm centre as ``{"lat", "lon"}`` floats."""
        return {
            "lat": float(current_sensor.center_location.latitude),
            "lon": float(current_sensor.center_location.longitude),
        }

    @staticmethod
    def _core_readings(current_sensor: Any) -> dict[str, Any]:
        """Return the sensor readings that both payload variants expose."""
        return {
            "soil_moisture": current_sensor.soil_moisture,
            "soil_temperature": current_sensor.soil_temperature,
            "soil_ph": current_sensor.soil_ph,
            "electrical_conductivity": current_sensor.electrical_conductivity,
        }

    @staticmethod
    def _source_metadata() -> dict[str, str]:
        """Return the fixed description of where the heatmap data comes from."""
        return {
            "endpoint_policy": "spatial_first",
            "primary_source": "sensor_network_spatial_interpolation",
            "agronomic_aggregation": "supplementary_only",
        }

    @staticmethod
    def _quality_legend() -> dict[str, str]:
        """Return the user-facing explanation of every quality flag."""
        return {
            QUALITY_REAL: "اندازه گیری واقعی سنسور",
            QUALITY_INTERPOLATED: "مقدار برآوردشده با وزن دهی زمانی/فضایی",
            QUALITY_MISSING: "داده معتبر در دسترس نیست",
            QUALITY_EXTRAPOLATED: "برآورد در ناحیه دور از سنسورها",
        }

    @staticmethod
    def _bounds(
        farm_polygon: list[tuple[float, float]],
        valid_sensor_points: list[dict[str, Any]],
    ) -> tuple[float, float, float, float]:
        """Return ``(min_lat, max_lat, min_lon, max_lon)`` of the area to grid.

        The farm polygon is used when present; otherwise the extent of the
        sensors that carry a value.
        """
        if farm_polygon:
            lats = [vertex[0] for vertex in farm_polygon]
            lons = [vertex[1] for vertex in farm_polygon]
        else:
            lats = [entry["latitude"] for entry in valid_sensor_points]
            lons = [entry["longitude"] for entry in valid_sensor_points]
        return min(lats), max(lats), min(lons), max(lons)

    @staticmethod
    def _grid_cell(
        lat: float,
        lon: float,
        farm_polygon: list[tuple[float, float]],
        valid_sensor_points: list[dict[str, Any]],
    ) -> dict[str, Any]:
        """Build one grid cell: masked, read directly from a sensor, or interpolated."""
        if farm_polygon and not _point_in_polygon(lat, lon, farm_polygon):
            return {
                "lat": lat,
                "lon": lon,
                "moisture_value": None,
                "quality_flag": QUALITY_MISSING,
                "uncertainty": None,
                "inside_farm_boundary": False,
            }

        for candidate in valid_sensor_points:
            if candidate["latitude"] == lat and candidate["longitude"] == lon:
                # A sensor sits exactly on this grid node: use its reading as-is.
                moisture_value = candidate["soil_moisture_value"]
                quality_flag = candidate["quality_flag"]
                uncertainty = round(1.0 - float(candidate.get("reliability_score", 1.0)), 4)
                break
        else:
            moisture_value, quality_flag, uncertainty = _interpolate_cell(lat, lon, valid_sensor_points)

        return {
            "lat": lat,
            "lon": lon,
            "moisture_value": moisture_value,
            "quality_flag": quality_flag,
            "uncertainty": uncertainty if moisture_value is not None else None,
            "inside_farm_boundary": True,
        }
class SoilHealthService:
    """Expose the soil health summary of a farm."""

    def get_health_summary(self, *, farm_uuid: str) -> dict[str, Any]:
        """Return the soil health summary for ``farm_uuid``.

        The result is the output of ``build_soil_health_summary`` for the farm's
        sensor and its plant snapshots, with ``farm_uuid`` added.

        Raises:
            ValueError: if no ``SensorData`` row matches ``farm_uuid``.
        """
        candidates = SensorData.objects.select_related("center_location").prefetch_related(
            "plant_assignments__plant"
        )
        sensor = candidates.filter(farm_uuid=farm_uuid).first()
        if sensor is None:
            raise ValueError("Farm not found.")

        farm_id = str(sensor.farm_uuid)
        summary = build_soil_health_summary(sensor, list(sensor.plant_snapshots))
        return {"farm_uuid": farm_id, **summary}
class SoilAnomalyDetectionService:
    """Combine the anomaly-detection card of a farm with its RAG insight."""

    def get_anomaly_detection(self, *, farm_uuid: str) -> dict[str, Any]:
        """Return the anomaly card for ``farm_uuid`` merged with the RAG insight.

        Keys of the insight payload take precedence over equally named keys of
        the anomaly card, because the insight is merged last.

        Raises:
            ValueError: if no farm context can be loaded for ``farm_uuid``.
        """
        farm_context = load_farm_context(farm_uuid)
        if farm_context is None:
            raise ValueError("Farm not found.")

        card = build_anomaly_detection_card(sensor_id=farm_uuid, context=farm_context, ai_bundle=None)
        insight = get_soil_anomaly_insight(farm_uuid=farm_uuid, anomaly_payload=card)

        merged: dict[str, Any] = {"farm_uuid": farm_uuid, **card, **insight}
        return merged