Files
Ai/location_data/satellite_snapshot.py
T
2026-05-11 00:36:02 +03:30

116 lines
3.2 KiB
Python

from __future__ import annotations
from typing import Any
from django.db.models import Avg, QuerySet
from .models import AnalysisGridObservation, RemoteSensingRun, SoilLocation
# Observation fields averaged into a satellite snapshot. The fields are
# iterated in this order when observations are summarized.
SATELLITE_METRIC_FIELDS = ("ndvi", "ndwi", "soil_vv_db", "dem_m", "slope_deg")
def build_location_satellite_snapshot(
    location: SoilLocation,
    *,
    block_code: str = "",
) -> dict[str, Any]:
    """Build the satellite snapshot for one block of a location.

    Looks up the latest completed remote-sensing run for ``location`` and
    ``block_code`` and summarizes the observations matching that run.

    Args:
        location: Soil location whose remote-sensing data is summarized.
        block_code: Block code used for the run lookup; defaults to ``""``.

    Returns:
        A dict with the keys ``status``, ``block_code``, ``run_id``,
        ``temporal_extent``, ``cell_count``, ``resolved_metrics`` and
        ``metric_sources``. When no run is found, ``status`` is
        ``"missing"`` and the remaining fields hold empty placeholders;
        otherwise ``status`` is ``"completed"``.
    """
    latest_run = get_latest_completed_remote_sensing_run(
        location,
        block_code=block_code,
    )
    if latest_run is None:
        # Nothing to summarize: echo the requested block code back.
        return {
            "status": "missing",
            "block_code": block_code,
            "run_id": None,
            "temporal_extent": None,
            "cell_count": 0,
            "resolved_metrics": {},
            "metric_sources": {},
        }

    run_observations = get_run_observations(latest_run)
    resolved_metrics = summarize_observations(run_observations)

    extent_start = latest_run.temporal_start
    extent_end = latest_run.temporal_end
    temporal_extent = {
        "start_date": extent_start.isoformat() if extent_start else None,
        "end_date": extent_end.isoformat() if extent_end else None,
    }

    return {
        "status": "completed",
        "block_code": latest_run.block_code,
        "run_id": latest_run.id,
        "temporal_extent": temporal_extent,
        "cell_count": run_observations.count(),
        "resolved_metrics": resolved_metrics,
        # Every metric that was resolved is attributed to remote sensing.
        "metric_sources": dict.fromkeys(resolved_metrics, "remote_sensing"),
    }
def build_location_block_satellite_snapshots(location: SoilLocation) -> list[dict[str, Any]]:
    """Build one satellite snapshot per block listed in the location layout.

    Reads ``location.block_layout["blocks"]`` and builds a snapshot for each
    entry, using the entry's stripped ``block_code`` (``""`` when absent or
    falsy).

    Args:
        location: Soil location whose ``block_layout`` is read.

    Returns:
        A list of snapshot dicts in the order the blocks are listed. When
        the layout is missing or lists no blocks, the list holds a single
        snapshot built with the default block code.
    """
    layout = location.block_layout or {}
    block_entries = layout.get("blocks") or []
    if block_entries:
        return [
            build_location_satellite_snapshot(
                location,
                block_code=str(entry.get("block_code") or "").strip(),
            )
            for entry in block_entries
        ]
    # No blocks listed: fall back to a single location-level snapshot.
    return [build_location_satellite_snapshot(location)]
def get_latest_completed_remote_sensing_run(
    location: SoilLocation,
    *,
    block_code: str = "",
) -> RemoteSensingRun | None:
    """Return the most recent successful remote-sensing run for a block.

    Runs are filtered by ``location``, ``block_code`` and
    ``RemoteSensingRun.STATUS_SUCCESS``, then ordered by descending
    ``temporal_end``, ``created_at`` and ``id``.

    Args:
        location: Soil location the run must belong to.
        block_code: Block code to match; a falsy value is treated as ``""``.

    Returns:
        The first run in that ordering, or ``None`` when no run matches.
    """
    successful_runs = RemoteSensingRun.objects.filter(
        soil_location=location,
        block_code=block_code or "",
        status=RemoteSensingRun.STATUS_SUCCESS,
    )
    newest_first = successful_runs.order_by("-temporal_end", "-created_at", "-id")
    return newest_first.first()
def get_run_observations(run: RemoteSensingRun) -> QuerySet[AnalysisGridObservation]:
    """Return the grid observations that match a run's location and period.

    Observations are matched on the run's soil location, block code and
    temporal start/end (via the related ``cell`` for the first two), not on
    the ``run`` relation itself.

    Args:
        run: Remote-sensing run whose location, block and period are used.

    Returns:
        A queryset ordered by ``cell__cell_code`` with ``cell`` and ``run``
        selected as related objects.
    """
    run_window = {
        "cell__soil_location": run.soil_location,
        # A falsy block code on the run is matched as the empty string.
        "cell__block_code": run.block_code or "",
        "temporal_start": run.temporal_start,
        "temporal_end": run.temporal_end,
    }
    observations = AnalysisGridObservation.objects.select_related("cell", "run")
    return observations.filter(**run_window).order_by("cell__cell_code")
def summarize_observations(
    observations: QuerySet[AnalysisGridObservation],
) -> dict[str, float]:
    """Average each satellite metric over a set of observations.

    Args:
        observations: Queryset to aggregate over.

    Returns:
        A dict mapping each name in ``SATELLITE_METRIC_FIELDS`` to its mean,
        rounded to 6 decimal places. Metrics whose aggregate is ``None``
        are left out.
    """
    mean_expressions = {
        f"{field}_mean": Avg(field) for field in SATELLITE_METRIC_FIELDS
    }
    means_by_alias = observations.aggregate(**mean_expressions)

    field_means = (
        (field, means_by_alias.get(f"{field}_mean"))
        for field in SATELLITE_METRIC_FIELDS
    )
    return {
        field: round(float(mean), 6)
        for field, mean in field_means
        if mean is not None
    }