# Files
# Ai/location_data/test_data_driven_subdivision.py
# T
# 2026-05-10 22:49:07 +03:30
#
# 176 lines
# 6.2 KiB
# Python

from datetime import date
from django.test import TestCase
from location_data.data_driven_subdivision import (
EmptyObservationDatasetError,
build_clustering_dataset,
sync_block_subdivision_with_result,
)
from location_data.models import (
AnalysisGridCell,
AnalysisGridObservation,
BlockSubdivision,
RemoteSensingRun,
RemoteSensingSubdivisionResult,
SoilLocation,
)
class DataDrivenSubdivisionSyncTests(TestCase):
    """Database-backed tests for the data-driven subdivision helpers.

    Covers ``sync_block_subdivision_with_result`` and
    ``build_clustering_dataset`` using a single location, block subdivision
    and remote-sensing run created in ``setUp``.
    """

    # Values shared by every fixture row created in this test case.
    BLOCK_CODE = "block-1"
    CHUNK_SIZE_SQM = 900
    PERIOD_START = date(2025, 1, 1)
    PERIOD_END = date(2025, 1, 31)
    SUBDIVISION_LOGGER = "location_data.data_driven_subdivision"

    def setUp(self):
        # Closed rectangular ring: the last vertex repeats the first one.
        west, east = 51.3890, 51.3900
        south, north = 35.6890, 35.6900
        ring = [
            [west, south],
            [east, south],
            [east, north],
            [west, north],
            [west, south],
        ]
        self.boundary = {"type": "Polygon", "coordinates": [ring]}

        self.location = SoilLocation.objects.create(
            latitude="35.689200",
            longitude="51.389000",
            farm_boundary=self.boundary,
        )
        self.subdivision = BlockSubdivision.objects.create(
            soil_location=self.location,
            block_code=self.BLOCK_CODE,
            source_boundary=self.boundary,
            chunk_size_sqm=self.CHUNK_SIZE_SQM,
            status="defined",
        )
        self.run = RemoteSensingRun.objects.create(
            soil_location=self.location,
            block_subdivision=self.subdivision,
            block_code=self.BLOCK_CODE,
            chunk_size_sqm=self.CHUNK_SIZE_SQM,
            temporal_start=self.PERIOD_START,
            temporal_end=self.PERIOD_END,
            status=RemoteSensingRun.STATUS_SUCCESS,
        )

    def _create_cell(self, cell_code, centroid_lat, centroid_lon):
        """Create a grid cell in the fixture block with the given code and centroid."""
        return AnalysisGridCell.objects.create(
            soil_location=self.location,
            block_subdivision=self.subdivision,
            block_code=self.BLOCK_CODE,
            cell_code=cell_code,
            chunk_size_sqm=self.CHUNK_SIZE_SQM,
            geometry=self.boundary,
            centroid_lat=centroid_lat,
            centroid_lon=centroid_lon,
        )

    def _create_observation(self, cell, **field_values):
        """Create an observation for ``cell`` on the fixture run and period.

        ``field_values`` are forwarded unchanged as extra model fields.
        """
        return AnalysisGridObservation.objects.create(
            cell=cell,
            run=self.run,
            temporal_start=self.PERIOD_START,
            temporal_end=self.PERIOD_END,
            **field_values
        )

    def test_sync_block_subdivision_with_result_updates_saved_sub_blocks(self):
        # Syncing a two-cluster result must persist status, grid points,
        # centroid points and cluster metadata on the block subdivision.
        # Creation order (cells, then observations, then result) is kept fixed.
        first_cell = self._create_cell("cell-1", "35.689200", "51.389200")
        second_cell = self._create_cell("cell-2", "35.689700", "51.389700")
        first_observation = self._create_observation(first_cell, ndvi=0.5)
        second_observation = self._create_observation(second_cell, ndvi=0.7)

        clustering_result = RemoteSensingSubdivisionResult.objects.create(
            soil_location=self.location,
            run=self.run,
            block_subdivision=self.subdivision,
            block_code=self.BLOCK_CODE,
            chunk_size_sqm=self.CHUNK_SIZE_SQM,
            temporal_start=self.PERIOD_START,
            temporal_end=self.PERIOD_END,
            cluster_count=2,
            selected_features=["ndvi"],
            metadata={
                "used_cell_count": 2,
                "skipped_cell_count": 0,
                "inertia_curve": [{"k": 1, "sse": 1.0}, {"k": 2, "sse": 0.1}],
            },
        )

        # One single-cell cluster per grid cell: (label, lat, lon, cell code).
        cluster_specs = (
            (0, 35.6892, 51.3892, "cell-1"),
            (1, 35.6897, 51.3897, "cell-2"),
        )
        summaries = [
            {
                "cluster_label": label,
                "centroid_lat": lat,
                "centroid_lon": lon,
                "cell_count": 1,
                "cell_codes": [cell_code],
            }
            for label, lat, lon, cell_code in cluster_specs
        ]

        sync_block_subdivision_with_result(
            block_subdivision=self.subdivision,
            result=clustering_result,
            observations=[first_observation, second_observation],
            cluster_summaries=summaries,
        )

        # Reload so the assertions read what was actually saved.
        self.subdivision.refresh_from_db()
        saved = self.subdivision
        self.assertEqual(saved.status, "subdivided")
        self.assertEqual(saved.grid_point_count, 2)
        self.assertEqual(saved.centroid_count, 2)
        self.assertEqual(saved.grid_points[0]["cell_code"], "cell-1")
        self.assertEqual(saved.centroid_points[0]["sub_block_code"], "cluster-0")
        self.assertEqual(
            saved.metadata["data_driven_subdivision"]["cluster_count"],
            2,
        )

    def test_build_clustering_dataset_raises_clear_error_when_all_selected_features_are_null(self):
        # The observation is created without any feature value, only metadata.
        # NOTE(review): presumably the feature fields then default to null;
        # confirm against the AnalysisGridObservation model.
        null_cell = self._create_cell("cell-null", "35.689200", "51.389200")
        empty_observation = self._create_observation(
            null_cell,
            metadata={"job_refs": {"ndvi": "job-1"}},
        )

        with self.assertLogs(self.SUBDIVISION_LOGGER, level="ERROR") as captured:
            with self.assertRaisesRegex(
                EmptyObservationDatasetError,
                "Upstream processing completed but no usable feature values were persisted.",
            ):
                build_clustering_dataset(
                    observations=[empty_observation],
                    selected_features=["ndvi", "ndwi", "lst_c", "soil_vv_db"],
                    run=self.run,
                    location=self.location,
                )

        # The ERROR log must carry the message plus the run and location ids.
        log_text = "\n".join(captured.output)
        expected_fragments = (
            "No usable observations available for clustering",
            '"run_id": {}'.format(self.run.id),
            '"region_id": {}'.format(self.location.id),
        )
        for fragment in expected_fragments:
            self.assertIn(fragment, log_text)