Files
Ai/location_data/test_data_driven_subdivision.py
T
2026-05-11 04:38:44 +03:30

573 lines
23 KiB
Python

from datetime import date
import os
from tempfile import TemporaryDirectory
from unittest.mock import patch
from django.core.files.base import ContentFile
from django.test import TestCase
from location_data.data_driven_subdivision import (
ClusteringDataset,
EmptyObservationDatasetError,
_persist_remote_sensing_diagnostic_artifacts,
_build_observation_label,
_build_cluster_geometry,
build_cluster_summaries,
build_clustering_dataset,
create_remote_sensing_subdivision_result,
enforce_spatial_contiguity,
sync_block_subdivision_with_result,
)
from location_data.models import (
AnalysisGridCell,
AnalysisGridObservation,
BlockSubdivision,
RemoteSensingClusterBlock,
RemoteSensingRun,
RemoteSensingSubdivisionResult,
RemoteSensingSubdivisionOption,
SoilLocation,
)
class DataDrivenSubdivisionSyncTests(TestCase):
    """Database-backed tests for helpers from ``location_data.data_driven_subdivision``.

    Every test shares the ``SoilLocation``, ``BlockSubdivision`` and
    successful ``RemoteSensingRun`` rows created in ``setUp``.
    """

    # Observation window shared by the run, the observations and the results.
    WINDOW_START = date(2025, 1, 1)
    WINDOW_END = date(2025, 1, 31)

    @staticmethod
    def _box_polygon(west, south, east, north):
        """Return a GeoJSON-style polygon dict for an axis-aligned box.

        The ring starts at the (west, south) corner, visits (east, south),
        (east, north) and (west, north), and is closed with a fresh copy of
        the first vertex. Points are ``[longitude, latitude]`` pairs.
        """
        ring = [
            [west, south],
            [east, south],
            [east, north],
            [west, north],
            [west, south],
        ]
        return {"type": "Polygon", "coordinates": [ring]}

    @classmethod
    def _offset_cell_polygon(cls, index):
        """Return a 0.0001-degree box shifted ``index * 0.0001`` degrees in longitude."""
        shift = index * 0.0001
        return cls._box_polygon(51.3890 + shift, 35.6890, 51.3891 + shift, 35.6891)

    def _create_cell(
        self,
        cell_code,
        *,
        geometry=None,
        centroid_lat="35.689200",
        centroid_lon="51.389200",
    ):
        """Persist an ``AnalysisGridCell`` in ``block-1`` of the shared subdivision.

        ``geometry`` falls back to the shared boundary polygon when omitted.
        """
        return AnalysisGridCell.objects.create(
            soil_location=self.location,
            block_subdivision=self.subdivision,
            block_code="block-1",
            cell_code=cell_code,
            chunk_size_sqm=900,
            geometry=self.boundary if geometry is None else geometry,
            centroid_lat=centroid_lat,
            centroid_lon=centroid_lon,
        )

    def _create_observation(self, cell, **field_values):
        """Persist an ``AnalysisGridObservation`` of ``cell`` for the shared run and window.

        ``field_values`` are forwarded unchanged as extra model fields
        (for example ``ndvi=0.5`` or ``metadata={...}``).
        """
        return AnalysisGridObservation.objects.create(
            cell=cell,
            run=self.run,
            temporal_start=self.WINDOW_START,
            temporal_end=self.WINDOW_END,
            **field_values,
        )

    def _create_result(self, *, cluster_count, selected_features, metadata):
        """Persist a ``RemoteSensingSubdivisionResult`` bound to the shared fixtures."""
        return RemoteSensingSubdivisionResult.objects.create(
            soil_location=self.location,
            run=self.run,
            block_subdivision=self.subdivision,
            block_code="block-1",
            chunk_size_sqm=900,
            temporal_start=self.WINDOW_START,
            temporal_end=self.WINDOW_END,
            cluster_count=cluster_count,
            selected_features=selected_features,
            metadata=metadata,
        )

    def setUp(self):
        """Create the location, block subdivision and run used by every test."""
        self.boundary = self._box_polygon(51.3890, 35.6890, 51.3900, 35.6900)
        self.location = SoilLocation.objects.create(
            latitude="35.689200",
            longitude="51.389000",
            farm_boundary=self.boundary,
        )
        self.subdivision = BlockSubdivision.objects.create(
            soil_location=self.location,
            block_code="block-1",
            source_boundary=self.boundary,
            chunk_size_sqm=900,
            status="defined",
        )
        self.run = RemoteSensingRun.objects.create(
            soil_location=self.location,
            block_subdivision=self.subdivision,
            block_code="block-1",
            chunk_size_sqm=900,
            temporal_start=self.WINDOW_START,
            temporal_end=self.WINDOW_END,
            status=RemoteSensingRun.STATUS_SUCCESS,
        )

    @patch("location_data.data_driven_subdivision.render_elbow_plot", return_value=None)
    def test_sync_block_subdivision_with_result_updates_saved_sub_blocks(self, _mock_plot):
        """Syncing a two-cluster result marks the subdivision as subdivided.

        Checks the stored status, the grid/centroid counts and points, and the
        ``data_driven_subdivision`` metadata entry.
        """
        fixtures = (
            ("cell-1", "35.689200", "51.389200", 0.5),
            ("cell-2", "35.689700", "51.389700", 0.7),
        )
        # All cells are created before any observation.
        cells = [
            self._create_cell(code, centroid_lat=lat, centroid_lon=lon)
            for code, lat, lon, _ndvi in fixtures
        ]
        observations = [
            self._create_observation(cell, ndvi=ndvi)
            for cell, (_code, _lat, _lon, ndvi) in zip(cells, fixtures)
        ]
        result = self._create_result(
            cluster_count=2,
            selected_features=["ndvi"],
            metadata={
                "used_cell_count": 2,
                "skipped_cell_count": 0,
                "inertia_curve": [{"k": 1, "sse": 1.0}, {"k": 2, "sse": 0.1}],
            },
        )
        centroids = [
            (35.6892, 51.3892, "cell-1"),
            (35.6897, 51.3897, "cell-2"),
        ]
        cluster_summaries = [
            {
                "cluster_label": label,
                "centroid_lat": lat,
                "centroid_lon": lon,
                "cell_count": 1,
                "cell_codes": [code],
            }
            for label, (lat, lon, code) in enumerate(centroids)
        ]

        sync_block_subdivision_with_result(
            block_subdivision=self.subdivision,
            result=result,
            observations=observations,
            cluster_summaries=cluster_summaries,
        )

        self.subdivision.refresh_from_db()
        synced = self.subdivision
        self.assertEqual(synced.status, "subdivided")
        self.assertEqual(synced.grid_point_count, 2)
        self.assertEqual(synced.centroid_count, 2)
        self.assertEqual(synced.grid_points[0]["cell_code"], "cell-1")
        self.assertEqual(synced.centroid_points[0]["sub_block_code"], "cluster-0")
        data_driven = synced.metadata["data_driven_subdivision"]
        self.assertEqual(data_driven["cluster_count"], 2)
        self.assertIn("diagnostic_artifacts", data_driven)

    def test_persist_remote_sensing_diagnostic_artifacts_saves_expected_images(self):
        """Persisting artifacts with stubbed renderers writes one file per plot kind.

        The output directory is redirected to a temporary directory through
        the ``REMOTE_SENSING_DIAGNOSTIC_DIR`` environment variable.
        """
        feature_names = ["ndvi", "ndwi", "soil_vv_db"]
        cell = self._create_cell("cell-1")
        observation = self._create_observation(cell, ndvi=0.5, ndwi=0.2, soil_vv_db=-8.0)
        result = self._create_result(
            cluster_count=1,
            selected_features=list(feature_names),
            metadata={"inertia_curve": [{"k": 1, "sse": 0.0}]},
        )

        def stub_renderer(target, payload):
            # Each renderer is replaced by a mock returning a tiny in-memory file.
            return patch(target, return_value=ContentFile(payload))

        with TemporaryDirectory() as artifact_root:
            env_override = patch.dict(
                os.environ,
                {"REMOTE_SENSING_DIAGNOSTIC_DIR": artifact_root},
                clear=False,
            )
            elbow = stub_renderer(
                "location_data.data_driven_subdivision.render_elbow_plot",
                b"elbow",
            )
            cluster_map = stub_renderer(
                "location_data.data_driven_subdivision._render_cluster_map_plot",
                b"map",
            )
            cluster_sizes = stub_renderer(
                "location_data.data_driven_subdivision._render_cluster_size_plot",
                b"sizes",
            )
            feature_pairs = stub_renderer(
                "location_data.data_driven_subdivision._render_feature_pair_plot",
                b"pairs",
            )
            projection = stub_renderer(
                "location_data.data_driven_subdivision._render_feature_projection_plot",
                b"projection",
            )
            # Entered in the same order as listed: environment first, then renderers.
            with env_override, elbow, cluster_map, cluster_sizes, feature_pairs, projection:
                artifacts = _persist_remote_sensing_diagnostic_artifacts(
                    result=result,
                    observations=[observation],
                    labels=[0],
                    cluster_summaries=[
                        {
                            "cluster_label": 0,
                            "cell_count": 1,
                            "centroid_lat": 35.6892,
                            "centroid_lon": 51.3892,
                            "cell_codes": ["cell-1"],
                        }
                    ],
                    selected_features=list(feature_names),
                    scaled_matrix=[[0.0, 0.0, 0.0]],
                    inertia_curve=[{"k": 1, "sse": 0.0}],
                    requested_k=1,
                    effective_cluster_count=1,
                )

            # File checks must run while the temporary directory still exists.
            files = artifacts["files"]
            self.assertEqual(
                sorted(files),
                [
                    "cluster_map",
                    "cluster_sizes",
                    "elbow_plot",
                    "feature_pairs",
                    "feature_projection",
                ],
            )
            self.assertIn("k-1-effective-1", artifacts["directory"])
            for saved_path in files.values():
                self.assertTrue(os.path.exists(saved_path))
                self.assertIn("__k-1__effective-1__", saved_path)

    def test_build_clustering_dataset_raises_clear_error_when_all_selected_features_are_null(self):
        """An observation with no feature values raises ``EmptyObservationDatasetError``.

        Also checks that an ERROR log record naming the run and the location
        is emitted on the module logger.
        """
        cell = self._create_cell("cell-null")
        observation = self._create_observation(
            cell,
            metadata={"job_refs": {"ndvi": "job-1"}},
        )
        expected_message = (
            "Upstream processing completed but no usable feature values were persisted."
        )

        with self.assertLogs("location_data.data_driven_subdivision", level="ERROR") as captured:
            with self.assertRaisesRegex(EmptyObservationDatasetError, expected_message):
                build_clustering_dataset(
                    observations=[observation],
                    selected_features=["ndvi", "ndwi", "soil_vv_db"],
                    run=self.run,
                    location=self.location,
                )

        log_text = "\n".join(captured.output)
        self.assertIn("No usable observations available for clustering", log_text)
        self.assertIn(f'"run_id": {self.run.id}', log_text)
        self.assertIn(f'"region_id": {self.location.id}', log_text)

    def test_build_cluster_summaries_selects_middle_grid_as_k_center(self):
        """For three cells in one cluster, the summary's center cell is ``cell-1``."""
        observations = []
        for position in range(3):
            # Each cell is followed immediately by its observation.
            cell = self._create_cell(
                f"cell-{position}",
                geometry=self._offset_cell_polygon(position),
                centroid_lat="35.689200",
                centroid_lon=f"{51.3892 + (position * 0.0001):.6f}",
            )
            observations.append(self._create_observation(cell, ndvi=0.2 + position))

        cluster_summaries = build_cluster_summaries(
            observations=observations,
            labels=[0, 0, 0],
        )

        only_cluster = cluster_summaries[0]
        self.assertEqual(only_cluster["center_cell_code"], "cell-1")
        self.assertEqual(only_cluster["center_cell_lat"], 35.6892)
        self.assertEqual(only_cluster["center_cell_lon"], 51.3893)

    def test_build_observation_label_uses_numeric_index_for_30m_cells(self):
        """The label is the one-based index as a string, whatever the cell code is."""
        cell = self._create_cell("cell-arbitrary-name")
        observation = self._create_observation(cell, ndvi=0.5)

        for index, expected_label in ((0, "1"), (7, "8")):
            self.assertEqual(
                _build_observation_label(observation=observation, index=index),
                expected_label,
            )

    @patch("location_data.data_driven_subdivision.run_kmeans_labels", return_value=[0, 1, 1])
    @patch("location_data.data_driven_subdivision.choose_cluster_count", return_value=(2, []))
    @patch("location_data.data_driven_subdivision.build_clustering_dataset")
    @patch("location_data.data_driven_subdivision._persist_remote_sensing_diagnostic_artifacts", return_value={})
    @patch("location_data.data_driven_subdivision.render_elbow_plot", return_value=None)
    def test_create_remote_sensing_subdivision_result_persists_cluster_blocks_with_geometry(
        self,
        _mock_plot,
        _mock_artifacts,
        mock_build_dataset,
        _mock_choose_k,
        _mock_run_kmeans,
    ):
        """Creating a result with ``explicit_k=2`` persists cluster blocks, options and layout.

        Dataset building, k selection, k-means and artifact rendering are all
        mocked. Mock arguments arrive bottom-up relative to the decorators.
        """
        feature_names = ["ndvi", "ndwi", "soil_vv_db"]
        cells = [
            self._create_cell(
                f"cell-{index}",
                geometry=self._offset_cell_polygon(index),
                centroid_lat=f"{35.6892 + (index * 0.0001):.6f}",
                centroid_lon=f"{51.3892 + (index * 0.0001):.6f}",
            )
            for index in range(3)
        ]
        observations = [
            self._create_observation(
                cell,
                ndvi=0.2 + (index * 0.3),
                ndwi=0.1 + (index * 0.2),
                soil_vv_db=-8.0 + index,
            )
            for index, cell in enumerate(cells)
        ]
        raw_rows = [[0.2, 0.1, -8.0], [0.5, 0.3, -7.0], [0.8, 0.5, -6.0]]
        mock_build_dataset.return_value = ClusteringDataset(
            observations=observations,
            selected_features=list(feature_names),
            raw_feature_rows=raw_rows,
            raw_feature_maps=[dict(zip(feature_names, row)) for row in raw_rows],
            skipped_cell_codes=[],
            used_cell_codes=[cell.cell_code for cell in cells],
            # Independent copy so the imputed matrix never aliases the raw rows.
            imputed_matrix=[list(row) for row in raw_rows],
            scaled_matrix=[[-1.0, -1.0, -1.0], [0.0, 0.0, 0.0], [1.0, 1.0, 1.0]],
            imputer_statistics={"ndvi": 0.5, "ndwi": 0.3, "soil_vv_db": -7.0},
            scaler_means={"ndvi": 0.5, "ndwi": 0.3, "soil_vv_db": -7.0},
            scaler_scales={"ndvi": 0.1, "ndwi": 0.1, "soil_vv_db": 1.0},
            missing_value_counts={"ndvi": 0, "ndwi": 0, "soil_vv_db": 0},
            skipped_reasons={"all_features_missing": []},
        )

        result = create_remote_sensing_subdivision_result(
            location=self.location,
            run=self.run,
            observations=observations,
            block_subdivision=self.subdivision,
            block_code="block-1",
            selected_features=list(feature_names),
            explicit_k=2,
        )

        # Artifact persistence: four calls, covering requested_k values 1 to 3.
        self.assertEqual(_mock_artifacts.call_count, 4)
        requested_ks = set()
        for recorded_call in _mock_artifacts.call_args_list:
            requested_k = recorded_call.kwargs.get("requested_k")
            if requested_k is not None:
                requested_ks.add(requested_k)
        self.assertEqual(sorted(requested_ks), [1, 2, 3])

        # Persisted cluster blocks.
        cluster_blocks = list(result.cluster_blocks.order_by("cluster_label"))
        self.assertEqual(len(cluster_blocks), 2)
        self.assertTrue(all(block.uuid for block in cluster_blocks))
        self.assertTrue(all(block.geometry for block in cluster_blocks))
        self.assertEqual(sum(block.cell_count for block in cluster_blocks), 3)
        self.assertEqual(RemoteSensingClusterBlock.objects.filter(result=result).count(), 2)

        # Result metadata.
        result.refresh_from_db()
        cluster_summaries = result.metadata["cluster_summaries"]
        self.assertTrue(all(entry.get("cluster_uuid") for entry in cluster_summaries))
        self.assertTrue(all(entry.get("geometry") for entry in cluster_summaries))
        self.assertEqual(cluster_summaries[0]["center_cell_code"], "cell-0")
        self.assertEqual(cluster_summaries[1]["center_cell_code"], "cell-1")
        self.assertEqual(result.cluster_count, 2)
        self.assertEqual(
            result.metadata["spatial_constraint"]["final_cluster_count"],
            2,
        )

        # Subdivision options.
        option_ks = (
            RemoteSensingSubdivisionOption.objects.filter(result=result)
            .order_by("requested_k")
            .values_list("requested_k", flat=True)
        )
        self.assertEqual(list(option_ks), [1, 2, 3])
        self.assertEqual(result.options.filter(is_active=True).get().requested_k, 2)
        self.assertEqual(result.options.filter(is_recommended=True).get().requested_k, 2)

        # Block subdivision centroid points.
        self.subdivision.refresh_from_db()
        centroid_points = self.subdivision.centroid_points
        self.assertTrue(all(point.get("cluster_uuid") for point in centroid_points))
        self.assertEqual(centroid_points[1]["center_cell_code"], "cell-1")

        # Location block layout.
        self.location.refresh_from_db()
        block_layout = self.location.block_layout["blocks"][0]
        sub_blocks = block_layout["sub_blocks"]
        self.assertTrue(all(sub_block.get("cluster_uuid") for sub_block in sub_blocks))
        self.assertEqual(sub_blocks[1]["center_cell_code"], "cell-1")

        second_block = cluster_blocks[1]
        self.assertEqual(second_block.geometry["type"], "Polygon")
        self.assertEqual(second_block.center_cell_code, "cell-1")

    def test_enforce_spatial_contiguity_merges_diagonal_island_into_adjacent_cluster(self):
        """On a 2x2 grid labelled ``[0, 1, 1, 0]`` the last cell is relabelled to 1.

        ``cell-00`` and ``cell-11`` carry label 0 but touch only at one corner.
        """
        grid = [
            ("cell-00", self._box_polygon(51.3890, 35.6890, 51.3891, 35.6891)),
            ("cell-01", self._box_polygon(51.3891, 35.6890, 51.3892, 35.6891)),
            ("cell-10", self._box_polygon(51.3890, 35.6891, 51.3891, 35.6892)),
            ("cell-11", self._box_polygon(51.3891, 35.6891, 51.3892, 35.6892)),
        ]
        observations = []
        for index, (cell_code, polygon) in enumerate(grid):
            row, column = divmod(index, 2)
            cell = self._create_cell(
                cell_code,
                geometry=polygon,
                centroid_lat=f"{35.68905 + row * 0.0001:.6f}",
                centroid_lon=f"{51.38905 + column * 0.0001:.6f}",
            )
            observations.append(self._create_observation(cell, ndvi=0.1 + index))

        labels, metadata = enforce_spatial_contiguity(
            observations=observations,
            labels=[0, 1, 1, 0],
            scaled_matrix=[
                [0.0, 0.0, 0.0],
                [1.0, 1.0, 1.0],
                [1.1, 1.1, 1.1],
                [0.1, 0.1, 0.1],
            ],
        )

        self.assertEqual(labels, [0, 1, 1, 1])
        self.assertTrue(metadata["applied"])
        self.assertEqual(metadata["disconnected_components_merged"], 1)

    def test_build_cluster_geometry_returns_single_polygon_for_adjacent_cells(self):
        """Two edge-sharing cells produce one ``Polygon`` with a 7-point outer ring."""
        layout = (
            ("cell-left", self._box_polygon(51.3890, 35.6890, 51.3891, 35.6891), "51.389050", 0.4),
            ("cell-right", self._box_polygon(51.3891, 35.6890, 51.3892, 35.6891), "51.389150", 0.5),
        )
        # Both cells are created before either observation.
        cells = [
            self._create_cell(
                cell_code,
                geometry=polygon,
                centroid_lat="35.689050",
                centroid_lon=centroid_lon,
            )
            for cell_code, polygon, centroid_lon, _ndvi in layout
        ]
        observations = [
            self._create_observation(cell, ndvi=ndvi)
            for cell, (_code, _polygon, _lon, ndvi) in zip(cells, layout)
        ]

        geometry = _build_cluster_geometry(observations)

        self.assertEqual(geometry["type"], "Polygon")
        self.assertEqual(len(geometry["coordinates"][0]), 7)