Files
Ai/location_data/data_driven_subdivision.py
T
2026-05-11 04:38:44 +03:30

2143 lines
83 KiB
Python

from __future__ import annotations
from io import BytesIO
import math
import os
from pathlib import Path
from dataclasses import dataclass
from decimal import Decimal
import json
import logging
from typing import Any
from django.conf import settings
from django.core.files.base import ContentFile
from django.db import transaction
from .block_subdivision import detect_elbow_point, point_in_polygon, render_elbow_plot
from .models import (
AnalysisGridObservation,
BlockSubdivision,
RemoteSensingClusterBlock,
RemoteSensingClusterAssignment,
RemoteSensingRun,
RemoteSensingSubdivisionResult,
RemoteSensingSubdivisionOption,
RemoteSensingSubdivisionOptionAssignment,
RemoteSensingSubdivisionOptionBlock,
SoilLocation,
)
DEFAULT_CLUSTER_FEATURES = [
"ndvi",
"ndwi",
"soil_vv_db",
]
SUPPORTED_CLUSTER_FEATURES = tuple(DEFAULT_CLUSTER_FEATURES)
DEFAULT_RANDOM_STATE = 42
DEFAULT_MAX_K = 10
DEFAULT_REMOTE_SENSING_DIAGNOSTIC_DIR = "artifacts/remote_sensing_charts"
logger = logging.getLogger(__name__)
class DataDrivenSubdivisionError(Exception):
"""Raised when remote-sensing-driven subdivision can not be computed."""
class EmptyObservationDatasetError(DataDrivenSubdivisionError):
"""Raised when upstream persistence completes without usable clustering features."""
@dataclass
class ClusteringDataset:
observations: list[AnalysisGridObservation]
selected_features: list[str]
raw_feature_rows: list[list[float | None]]
raw_feature_maps: list[dict[str, float | None]]
skipped_cell_codes: list[str]
used_cell_codes: list[str]
imputed_matrix: list[list[float]]
scaled_matrix: list[list[float]]
imputer_statistics: dict[str, float | None]
scaler_means: dict[str, float]
scaler_scales: dict[str, float]
missing_value_counts: dict[str, int]
skipped_reasons: dict[str, list[str]]
@dataclass
class SubdivisionOptionPayload:
requested_k: int
effective_cluster_count: int
labels: list[int]
cluster_summaries: list[dict[str, Any]]
spatial_constraint_metadata: dict[str, Any]
assignment_rows: list[dict[str, Any]]
cluster_block_rows: list[dict[str, Any]]
def create_remote_sensing_subdivision_result(
*,
location: SoilLocation,
run: RemoteSensingRun,
observations: list[AnalysisGridObservation],
block_subdivision: BlockSubdivision | None = None,
block_code: str = "",
selected_features: list[str] | None = None,
explicit_k: int | None = None,
max_k: int = DEFAULT_MAX_K,
random_state: int = DEFAULT_RANDOM_STATE,
) -> RemoteSensingSubdivisionResult:
"""
Build a data-driven subdivision result from stored remote sensing observations.
KMeans is applied on actual per-cell feature vectors, not geometric points.
"""
dataset = build_clustering_dataset(
observations=observations,
selected_features=selected_features,
run=run,
location=location,
)
if not dataset.observations:
raise DataDrivenSubdivisionError("هیچ observation قابل استفاده‌ای برای خوشه‌بندی باقی نماند.")
optimal_k, inertia_curve = choose_cluster_count(
scaled_matrix=dataset.scaled_matrix,
explicit_k=explicit_k,
max_k=max_k,
random_state=random_state,
)
cluster_selection_strategy = "explicit_k" if explicit_k is not None else "elbow"
option_payloads = build_subdivision_option_payloads(
dataset=dataset,
max_k=max_k,
random_state=random_state,
)
if not option_payloads:
raise DataDrivenSubdivisionError("هیچ گزینه K معتبری برای ذخیره‌سازی subdivision ساخته نشد.")
active_requested_k = min(
int(explicit_k if explicit_k is not None else optimal_k),
max(option_payload.requested_k for option_payload in option_payloads),
)
active_option_payload = next(
(
option_payload
for option_payload in option_payloads
if option_payload.requested_k == active_requested_k
),
option_payloads[-1],
)
effective_cluster_count = active_option_payload.effective_cluster_count
cluster_summaries = active_option_payload.cluster_summaries
with transaction.atomic():
result, _created = RemoteSensingSubdivisionResult.objects.update_or_create(
run=run,
defaults={
"soil_location": location,
"block_subdivision": block_subdivision,
"block_code": block_code,
"chunk_size_sqm": run.chunk_size_sqm,
"temporal_start": run.temporal_start,
"temporal_end": run.temporal_end,
"cluster_count": effective_cluster_count,
"selected_features": dataset.selected_features,
"skipped_cell_codes": dataset.skipped_cell_codes,
"metadata": {
"cell_count": len(observations),
"used_cell_count": len(dataset.observations),
"skipped_cell_count": len(dataset.skipped_cell_codes),
"used_cell_codes": dataset.used_cell_codes,
"skipped_reasons": dataset.skipped_reasons,
"selected_features": dataset.selected_features,
"imputer_strategy": "median",
"imputer_statistics": dataset.imputer_statistics,
"missing_value_counts": dataset.missing_value_counts,
"scaler_means": dataset.scaler_means,
"scaler_scales": dataset.scaler_scales,
"kmeans_params": {
"random_state": random_state,
"explicit_k": explicit_k,
"selected_k": optimal_k,
"recommended_k": optimal_k,
"active_requested_k": active_requested_k,
"effective_k": effective_cluster_count,
"max_k": max_k,
"n_init": 10,
"selection_strategy": cluster_selection_strategy,
},
"recommended_requested_k": optimal_k,
"active_requested_k": active_requested_k,
"available_k_options": [
{
"requested_k": option_payload.requested_k,
"effective_cluster_count": option_payload.effective_cluster_count,
}
for option_payload in option_payloads
],
"spatial_constraint": active_option_payload.spatial_constraint_metadata,
"inertia_curve": inertia_curve,
"cluster_summaries": cluster_summaries,
},
},
)
result.assignments.all().delete()
result.cluster_blocks.all().delete()
result.options.all().delete()
option_objects = persist_subdivision_options(
result=result,
location=location,
block_subdivision=block_subdivision,
option_payloads=option_payloads,
recommended_requested_k=optimal_k,
active_requested_k=active_requested_k,
chunk_size_sqm=run.chunk_size_sqm,
)
persist_subdivision_option_artifacts(
result=result,
option_payloads=option_payloads,
option_objects=option_objects,
observations=dataset.observations,
selected_features=dataset.selected_features,
scaled_matrix=dataset.scaled_matrix,
inertia_curve=inertia_curve,
)
active_option = option_objects[active_requested_k]
activate_subdivision_option(
option=active_option,
selection_source="system",
recommended_requested_k=optimal_k,
)
result.refresh_from_db()
diagnostic_artifacts = _persist_remote_sensing_diagnostic_artifacts(
result=result,
observations=dataset.observations,
labels=active_option_payload.labels,
cluster_summaries=active_option_payload.cluster_summaries,
selected_features=dataset.selected_features,
scaled_matrix=dataset.scaled_matrix,
inertia_curve=inertia_curve,
)
if diagnostic_artifacts:
metadata = dict(result.metadata or {})
metadata["diagnostic_artifacts"] = diagnostic_artifacts
result.metadata = metadata
result.save(update_fields=["metadata", "updated_at"])
return result
def build_clustering_dataset(
*,
observations: list[AnalysisGridObservation],
selected_features: list[str] | None = None,
run: RemoteSensingRun | None = None,
location: SoilLocation | None = None,
) -> ClusteringDataset:
selected_features = list(selected_features or DEFAULT_CLUSTER_FEATURES)
invalid_features = [
feature_name
for feature_name in selected_features
if feature_name not in SUPPORTED_CLUSTER_FEATURES
]
if invalid_features:
raise DataDrivenSubdivisionError(
"ویژگی‌های نامعتبر برای خوشه‌بندی: "
+ ", ".join(sorted(invalid_features))
)
log_context = _build_clustering_log_context(
observations=observations,
selected_features=selected_features,
run=run,
location=location,
)
logger.info(
"Preparing clustering dataset: %s",
_serialize_log_payload(
{
**log_context,
"total_observations": len(observations),
"non_null_counts": _count_non_null_features(observations),
}
),
)
raw_rows: list[list[float | None]] = []
raw_maps: list[dict[str, float | None]] = []
usable_observations: list[AnalysisGridObservation] = []
skipped_cell_codes: list[str] = []
used_cell_codes: list[str] = []
missing_value_counts = {feature_name: 0 for feature_name in selected_features}
skipped_reasons = {"all_features_missing": []}
for observation in observations:
feature_map = {
feature_name: _coerce_float(getattr(observation, feature_name, None))
for feature_name in selected_features
}
for feature_name, value in feature_map.items():
if value is None:
missing_value_counts[feature_name] += 1
if all(value is None for value in feature_map.values()):
logger.debug(
"Skipping observation cell=%s: all clustering features are null | context=%s",
observation.cell.cell_code,
_serialize_log_payload(log_context),
)
skipped_cell_codes.append(observation.cell.cell_code)
skipped_reasons["all_features_missing"].append(observation.cell.cell_code)
continue
usable_observations.append(observation)
used_cell_codes.append(observation.cell.cell_code)
raw_maps.append(feature_map)
raw_rows.append([feature_map[feature_name] for feature_name in selected_features])
logger.info(
"Clustering dataset filtered observations: %s",
_serialize_log_payload(
{
**log_context,
"remaining_observations": len(usable_observations),
"removed_observations": len(observations) - len(usable_observations),
}
)
)
zero_usable_feature_names = [
feature_name for feature_name, missing_count in missing_value_counts.items() if missing_count == len(observations)
]
if zero_usable_feature_names and len(zero_usable_feature_names) < len(selected_features):
for feature_name in zero_usable_feature_names:
logger.warning(
"Feature %s has zero usable values in dataset | context=%s",
feature_name,
_serialize_log_payload(log_context),
)
if not usable_observations:
error_context = {
**log_context,
"total_observations": len(observations),
"removed_observations": len(observations),
"null_counts_per_feature": missing_value_counts,
"selected_features": selected_features,
}
logger.error(
"No usable observations available for clustering: %s",
_serialize_log_payload(error_context),
)
raise EmptyObservationDatasetError(
"Upstream processing completed but no usable feature values were persisted."
)
try:
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
except ImportError as exc: # pragma: no cover - runtime dependency guard
raise DataDrivenSubdivisionError(
"scikit-learn و numpy برای خوشه‌بندی داده‌محور لازم هستند."
) from exc
raw_matrix = np.array(raw_rows, dtype=float)
imputer = SimpleImputer(strategy="median")
imputed_matrix = imputer.fit_transform(raw_matrix)
scaler = StandardScaler()
scaled_matrix = scaler.fit_transform(imputed_matrix)
return ClusteringDataset(
observations=usable_observations,
selected_features=selected_features,
raw_feature_rows=raw_rows,
raw_feature_maps=raw_maps,
skipped_cell_codes=skipped_cell_codes,
used_cell_codes=used_cell_codes,
imputed_matrix=imputed_matrix.tolist(),
scaled_matrix=scaled_matrix.tolist(),
imputer_statistics={
feature_name: _coerce_float(imputer.statistics_[index])
for index, feature_name in enumerate(selected_features)
},
scaler_means={
feature_name: float(scaler.mean_[index])
for index, feature_name in enumerate(selected_features)
},
scaler_scales={
feature_name: float(scaler.scale_[index] or 1.0)
for index, feature_name in enumerate(selected_features)
},
missing_value_counts=missing_value_counts,
skipped_reasons=skipped_reasons,
)
def choose_cluster_count(
*,
scaled_matrix: list[list[float]],
explicit_k: int | None,
max_k: int,
random_state: int,
) -> tuple[int, list[dict[str, float]]]:
sample_count = len(scaled_matrix)
if sample_count == 0:
raise DataDrivenSubdivisionError("هیچ نمونه‌ای برای خوشه‌بندی وجود ندارد.")
if sample_count == 1:
return 1, [{"k": 1, "sse": 0.0}]
if explicit_k is not None:
if explicit_k <= 0:
raise DataDrivenSubdivisionError("cluster_count باید بزرگ‌تر از صفر باشد.")
return min(explicit_k, sample_count), []
try:
from sklearn.cluster import KMeans
except ImportError as exc: # pragma: no cover
raise DataDrivenSubdivisionError("scikit-learn برای انتخاب تعداد خوشه لازم است.") from exc
max_allowed_k = min(max_k, sample_count)
inertia_curve = []
for k in range(1, max_allowed_k + 1):
model = KMeans(n_clusters=k, n_init=10, random_state=random_state)
model.fit(scaled_matrix)
inertia_curve.append({"k": k, "sse": round(float(model.inertia_), 6)})
return detect_elbow_point(inertia_curve), inertia_curve
def run_kmeans_labels(
*,
scaled_matrix: list[list[float]],
cluster_count: int,
random_state: int,
) -> list[int]:
if cluster_count <= 0:
raise DataDrivenSubdivisionError("cluster_count باید بزرگ‌تر از صفر باشد.")
if len(scaled_matrix) == 1:
return [0]
try:
from sklearn.cluster import KMeans
except ImportError as exc: # pragma: no cover
raise DataDrivenSubdivisionError("scikit-learn برای اجرای KMeans لازم است.") from exc
model = KMeans(n_clusters=cluster_count, n_init=10, random_state=random_state)
return [int(label) for label in model.fit_predict(scaled_matrix)]
def enforce_spatial_contiguity(
*,
observations: list[AnalysisGridObservation],
labels: list[int],
scaled_matrix: list[list[float]],
) -> tuple[list[int], dict[str, Any]]:
if not observations or len(observations) <= 1:
return labels, {
"applied": False,
"strategy": "shared_edge_component_merge",
"initial_cluster_count": len(set(labels)),
"final_cluster_count": len(set(labels)),
"disconnected_components_merged": 0,
"shared_border_required": True,
}
adjacency_map = _build_shared_border_adjacency(observations)
if not adjacency_map:
return labels, {
"applied": False,
"strategy": "shared_edge_component_merge",
"initial_cluster_count": len(set(labels)),
"final_cluster_count": len(set(labels)),
"disconnected_components_merged": 0,
"shared_border_required": True,
"note": "No shared-border adjacency detected.",
}
working_labels = [int(label) for label in labels]
merged_component_count = 0
max_iterations = max(len(observations), 1)
for _iteration in range(max_iterations):
disconnected_components = _find_disconnected_label_components(
labels=working_labels,
adjacency_map=adjacency_map,
)
if not disconnected_components:
normalized_labels = _normalize_cluster_labels(working_labels)
return normalized_labels, {
"applied": merged_component_count > 0,
"strategy": "shared_edge_component_merge",
"initial_cluster_count": len(set(labels)),
"final_cluster_count": len(set(normalized_labels)),
"disconnected_components_merged": merged_component_count,
"shared_border_required": True,
}
for disconnected_component in disconnected_components:
target_label = _choose_neighbor_label_for_component(
component_indexes=disconnected_component,
labels=working_labels,
adjacency_map=adjacency_map,
scaled_matrix=scaled_matrix,
)
if target_label is None:
continue
for component_index in disconnected_component:
working_labels[component_index] = target_label
merged_component_count += 1
break
else:
raise DataDrivenSubdivisionError(
"نمی‌توان قید اتصال فضایی خوشه‌ها را تضمین کرد؛ بعضی سلول‌ها برای تشکیل بلوکِ دارای مرز مشترک قابل انتساب نبودند."
)
raise DataDrivenSubdivisionError(
"اعمال قید اتصال فضایی خوشه‌ها در تعداد تکرار مجاز همگرا نشد."
)
def build_cluster_summaries(
*,
observations: list[AnalysisGridObservation],
labels: list[int],
) -> list[dict[str, Any]]:
clusters: dict[int, dict[str, Any]] = {}
for observation, label in zip(observations, labels):
cluster = clusters.setdefault(
int(label),
{
"cluster_label": int(label),
"observations": [],
"cell_codes": [],
"centroid_lat_sum": 0.0,
"centroid_lon_sum": 0.0,
"cell_count": 0,
},
)
cluster["observations"].append(observation)
cluster["cell_codes"].append(observation.cell.cell_code)
cluster["centroid_lat_sum"] += float(observation.cell.centroid_lat)
cluster["centroid_lon_sum"] += float(observation.cell.centroid_lon)
cluster["cell_count"] += 1
summaries = []
for cluster_label in sorted(clusters):
cluster = clusters[cluster_label]
cell_count = cluster["cell_count"] or 1
center_payload = _select_cluster_center_observation(
cluster_observations=cluster["observations"],
)
summaries.append(
{
"cluster_label": cluster_label,
"cell_count": cluster["cell_count"],
"centroid_lat": round(cluster["centroid_lat_sum"] / cell_count, 6),
"centroid_lon": round(cluster["centroid_lon_sum"] / cell_count, 6),
"cell_codes": cluster["cell_codes"],
"center_cell_code": center_payload["cell_code"],
"center_cell_lat": center_payload["centroid_lat"],
"center_cell_lon": center_payload["centroid_lon"],
"center_radius": center_payload["radius"],
"center_mean_distance": center_payload["mean_distance"],
}
)
return summaries
def _select_cluster_center_observation(
*,
cluster_observations: list[AnalysisGridObservation],
) -> dict[str, Any]:
if not cluster_observations:
return {
"cell_code": "",
"centroid_lat": None,
"centroid_lon": None,
"radius": 0.0,
"mean_distance": 0.0,
}
candidate_payloads: list[dict[str, Any]] = []
for candidate in cluster_observations:
candidate_lat = float(candidate.cell.centroid_lat)
candidate_lon = float(candidate.cell.centroid_lon)
distances = [
_euclidean_distance(
[candidate_lon, candidate_lat],
[float(member.cell.centroid_lon), float(member.cell.centroid_lat)],
)
for member in cluster_observations
]
radius = max(distances) if distances else 0.0
mean_distance = sum(distances) / len(distances) if distances else 0.0
candidate_payloads.append(
{
"cell_code": candidate.cell.cell_code,
"centroid_lat": round(candidate_lat, 6),
"centroid_lon": round(candidate_lon, 6),
"radius": round(radius, 8),
"mean_distance": round(mean_distance, 8),
}
)
return min(
candidate_payloads,
key=lambda payload: (
float(payload["radius"]),
float(payload["mean_distance"]),
str(payload["cell_code"]),
),
)
def build_subdivision_option_payloads(
*,
dataset: ClusteringDataset,
max_k: int,
random_state: int,
) -> list[SubdivisionOptionPayload]:
sample_count = len(dataset.observations)
if sample_count == 0:
return []
max_allowed_k = min(max_k, sample_count)
option_payloads: list[SubdivisionOptionPayload] = []
for requested_k in range(1, max_allowed_k + 1):
labels = run_kmeans_labels(
scaled_matrix=dataset.scaled_matrix,
cluster_count=requested_k,
random_state=random_state,
)
labels, spatial_constraint_metadata = enforce_spatial_contiguity(
observations=dataset.observations,
labels=labels,
scaled_matrix=dataset.scaled_matrix,
)
effective_cluster_count = len(set(labels))
cluster_summaries = build_cluster_summaries(
observations=dataset.observations,
labels=labels,
)
cluster_block_rows = build_cluster_block_rows(
observations=dataset.observations,
labels=labels,
cluster_summaries=cluster_summaries,
)
assignment_rows = [
{
"cell": observation.cell,
"cluster_label": int(labels[index]),
"raw_feature_values": dataset.raw_feature_maps[index],
"scaled_feature_values": {
feature_name: round(dataset.scaled_matrix[index][feature_index], 6)
for feature_index, feature_name in enumerate(dataset.selected_features)
},
}
for index, observation in enumerate(dataset.observations)
]
option_payloads.append(
SubdivisionOptionPayload(
requested_k=requested_k,
effective_cluster_count=effective_cluster_count,
labels=labels,
cluster_summaries=cluster_summaries,
spatial_constraint_metadata=spatial_constraint_metadata,
assignment_rows=assignment_rows,
cluster_block_rows=cluster_block_rows,
)
)
return option_payloads
def build_cluster_block_rows(
*,
observations: list[AnalysisGridObservation],
labels: list[int],
cluster_summaries: list[dict[str, Any]],
) -> list[dict[str, Any]]:
observations_by_label: dict[int, list[AnalysisGridObservation]] = {}
for observation, label in zip(observations, labels):
observations_by_label.setdefault(int(label), []).append(observation)
cluster_block_rows: list[dict[str, Any]] = []
for cluster_summary in cluster_summaries:
cluster_label = int(cluster_summary["cluster_label"])
cluster_observations = observations_by_label.get(cluster_label, [])
cluster_geometry = _build_cluster_geometry(cluster_observations)
cluster_metadata = {
"cell_geometry_type": cluster_geometry.get("type"),
"source": "analysis_grid_cells",
}
cluster_block_rows.append(
{
"cluster_label": cluster_label,
"sub_block_code": f"cluster-{cluster_label}",
"centroid_lat": Decimal(str(cluster_summary["centroid_lat"])),
"centroid_lon": Decimal(str(cluster_summary["centroid_lon"])),
"center_cell_code": str(cluster_summary.get("center_cell_code") or ""),
"center_cell_lat": _to_decimal_or_none(cluster_summary.get("center_cell_lat")),
"center_cell_lon": _to_decimal_or_none(cluster_summary.get("center_cell_lon")),
"geometry": cluster_geometry,
"cell_count": int(cluster_summary["cell_count"]),
"cell_codes": list(cluster_summary["cell_codes"]),
"metadata": {
**cluster_metadata,
"center_selection": {
"strategy": "coordinate_1_center",
"center_cell_code": cluster_summary.get("center_cell_code") or "",
"center_radius": cluster_summary.get("center_radius"),
"center_mean_distance": cluster_summary.get("center_mean_distance"),
},
},
}
)
return cluster_block_rows
def persist_subdivision_options(
*,
result: RemoteSensingSubdivisionResult,
location: SoilLocation,
block_subdivision: BlockSubdivision | None,
option_payloads: list[SubdivisionOptionPayload],
recommended_requested_k: int,
active_requested_k: int,
chunk_size_sqm: int,
) -> dict[int, RemoteSensingSubdivisionOption]:
option_objects: dict[int, RemoteSensingSubdivisionOption] = {}
for option_payload in option_payloads:
option = RemoteSensingSubdivisionOption.objects.create(
result=result,
requested_k=option_payload.requested_k,
effective_cluster_count=option_payload.effective_cluster_count,
is_active=option_payload.requested_k == active_requested_k,
is_recommended=option_payload.requested_k == recommended_requested_k,
selection_source="system",
metadata={
"requested_k": option_payload.requested_k,
"effective_cluster_count": option_payload.effective_cluster_count,
"spatial_constraint": option_payload.spatial_constraint_metadata,
"cluster_summaries": option_payload.cluster_summaries,
},
)
RemoteSensingSubdivisionOptionAssignment.objects.bulk_create(
[
RemoteSensingSubdivisionOptionAssignment(
option=option,
cell=assignment_row["cell"],
cluster_label=assignment_row["cluster_label"],
raw_feature_values=assignment_row["raw_feature_values"],
scaled_feature_values=assignment_row["scaled_feature_values"],
)
for assignment_row in option_payload.assignment_rows
]
)
option_block_objects = []
for block_row in option_payload.cluster_block_rows:
option_block_objects.append(
RemoteSensingSubdivisionOptionBlock(
option=option,
cluster_label=block_row["cluster_label"],
sub_block_code=block_row["sub_block_code"],
chunk_size_sqm=chunk_size_sqm,
centroid_lat=block_row["centroid_lat"],
centroid_lon=block_row["centroid_lon"],
center_cell_code=block_row["center_cell_code"],
center_cell_lat=block_row["center_cell_lat"],
center_cell_lon=block_row["center_cell_lon"],
geometry=block_row["geometry"],
cell_count=block_row["cell_count"],
cell_codes=block_row["cell_codes"],
metadata=block_row["metadata"],
)
)
RemoteSensingSubdivisionOptionBlock.objects.bulk_create(option_block_objects)
option_objects[option.requested_k] = option
return option_objects
def persist_subdivision_option_artifacts(
*,
result: RemoteSensingSubdivisionResult,
option_payloads: list[SubdivisionOptionPayload],
option_objects: dict[int, RemoteSensingSubdivisionOption],
observations: list[AnalysisGridObservation],
selected_features: list[str],
scaled_matrix: list[list[float]],
inertia_curve: list[dict[str, float]],
) -> None:
for option_payload in option_payloads:
option = option_objects.get(option_payload.requested_k)
if option is None:
continue
diagnostic_artifacts = _persist_remote_sensing_diagnostic_artifacts(
result=result,
observations=observations,
labels=option_payload.labels,
cluster_summaries=option_payload.cluster_summaries,
selected_features=selected_features,
scaled_matrix=scaled_matrix,
inertia_curve=inertia_curve,
requested_k=option_payload.requested_k,
effective_cluster_count=option_payload.effective_cluster_count,
)
if not diagnostic_artifacts:
continue
metadata = dict(option.metadata or {})
metadata["diagnostic_artifacts"] = diagnostic_artifacts
option.metadata = metadata
option.save(update_fields=["metadata", "updated_at"])
def activate_subdivision_option(
*,
option: RemoteSensingSubdivisionOption,
selection_source: str,
recommended_requested_k: int | None = None,
) -> RemoteSensingSubdivisionResult:
result = option.result
requested_k = int(option.requested_k)
if recommended_requested_k is None:
recommended_requested_k = (
result.options.filter(is_recommended=True)
.values_list("requested_k", flat=True)
.first()
)
result.options.exclude(pk=option.pk).update(is_active=False)
option.is_active = True
option.selection_source = selection_source
option.save(update_fields=["is_active", "selection_source", "updated_at"])
assignments = list(
option.assignments.select_related("cell").order_by("cell__cell_code")
)
result.assignments.all().delete()
RemoteSensingClusterAssignment.objects.bulk_create(
[
RemoteSensingClusterAssignment(
result=result,
cell=assignment.cell,
cluster_label=assignment.cluster_label,
raw_feature_values=assignment.raw_feature_values,
scaled_feature_values=assignment.scaled_feature_values,
)
for assignment in assignments
]
)
result.cluster_blocks.all().delete()
cluster_block_objects = []
cluster_summaries = []
for option_block in option.cluster_blocks.order_by("cluster_label", "id"):
cluster_block = RemoteSensingClusterBlock.objects.create(
result=result,
soil_location=result.soil_location,
block_subdivision=result.block_subdivision,
block_code=result.block_code,
sub_block_code=option_block.sub_block_code,
cluster_label=option_block.cluster_label,
chunk_size_sqm=option_block.chunk_size_sqm,
centroid_lat=option_block.centroid_lat,
centroid_lon=option_block.centroid_lon,
center_cell_code=option_block.center_cell_code,
center_cell_lat=option_block.center_cell_lat,
center_cell_lon=option_block.center_cell_lon,
geometry=option_block.geometry,
cell_count=option_block.cell_count,
cell_codes=option_block.cell_codes,
metadata=option_block.metadata,
)
cluster_block_objects.append(cluster_block)
cluster_summaries.append(
{
"cluster_uuid": str(cluster_block.uuid),
"cluster_label": option_block.cluster_label,
"sub_block_code": option_block.sub_block_code,
"centroid_lat": float(option_block.centroid_lat),
"centroid_lon": float(option_block.centroid_lon),
"center_cell_code": option_block.center_cell_code,
"center_cell_lat": float(option_block.center_cell_lat) if option_block.center_cell_lat is not None else None,
"center_cell_lon": float(option_block.center_cell_lon) if option_block.center_cell_lon is not None else None,
"center_radius": (option_block.metadata or {}).get("center_selection", {}).get("center_radius"),
"center_mean_distance": (option_block.metadata or {}).get("center_selection", {}).get("center_mean_distance"),
"cell_count": option_block.cell_count,
"cell_codes": list(option_block.cell_codes or []),
"geometry": option_block.geometry,
"metadata": option_block.metadata,
}
)
metadata = dict(result.metadata or {})
kmeans_params = dict(metadata.get("kmeans_params") or {})
kmeans_params["active_requested_k"] = requested_k
kmeans_params["effective_k"] = option.effective_cluster_count
if recommended_requested_k is not None:
kmeans_params["recommended_k"] = recommended_requested_k
metadata["kmeans_params"] = kmeans_params
metadata["active_requested_k"] = requested_k
metadata["recommended_requested_k"] = recommended_requested_k
metadata["cluster_summaries"] = cluster_summaries
metadata["active_option"] = {
"requested_k": requested_k,
"effective_cluster_count": option.effective_cluster_count,
"selection_source": selection_source,
}
metadata["available_k_options"] = [
{
"requested_k": subdivision_option.requested_k,
"effective_cluster_count": subdivision_option.effective_cluster_count,
"is_active": subdivision_option.pk == option.pk,
"is_recommended": subdivision_option.is_recommended,
"selection_source": option.selection_source if subdivision_option.pk == option.pk else subdivision_option.selection_source,
"diagnostic_artifacts": (subdivision_option.metadata or {}).get("diagnostic_artifacts", {}),
}
for subdivision_option in result.options.order_by("requested_k")
]
result.cluster_count = option.effective_cluster_count
result.metadata = metadata
result.save(update_fields=["cluster_count", "metadata", "updated_at"])
if result.block_subdivision is not None:
sync_block_subdivision_with_result(
block_subdivision=result.block_subdivision,
result=result,
observations=assignments,
cluster_summaries=cluster_summaries,
)
sync_location_block_layout_with_result(
location=result.soil_location,
result=result,
cluster_summaries=cluster_summaries,
)
return result
def sync_cluster_blocks_with_result(
*,
result: RemoteSensingSubdivisionResult,
location: SoilLocation,
block_subdivision: BlockSubdivision | None,
observations: list[AnalysisGridObservation],
labels: list[int],
cluster_summaries: list[dict[str, Any]],
) -> list[RemoteSensingClusterBlock]:
observations_by_label: dict[int, list[AnalysisGridObservation]] = {}
for observation, label in zip(observations, labels):
observations_by_label.setdefault(int(label), []).append(observation)
existing_blocks = {
cluster_block.cluster_label: cluster_block
for cluster_block in result.cluster_blocks.all()
}
active_labels: set[int] = set()
synced_blocks: list[RemoteSensingClusterBlock] = []
for cluster_summary in cluster_summaries:
cluster_label = int(cluster_summary["cluster_label"])
active_labels.add(cluster_label)
cluster_observations = observations_by_label.get(cluster_label, [])
cluster_geometry = _build_cluster_geometry(cluster_observations)
cluster_metadata = {
"cell_geometry_type": cluster_geometry.get("type"),
"source": "analysis_grid_cells",
}
cluster_block = existing_blocks.get(cluster_label)
defaults = {
"soil_location": location,
"block_subdivision": block_subdivision,
"block_code": result.block_code,
"sub_block_code": f"cluster-{cluster_label}",
"chunk_size_sqm": result.chunk_size_sqm,
"centroid_lat": Decimal(str(cluster_summary["centroid_lat"])),
"centroid_lon": Decimal(str(cluster_summary["centroid_lon"])),
"center_cell_code": str(cluster_summary.get("center_cell_code") or ""),
"center_cell_lat": _to_decimal_or_none(cluster_summary.get("center_cell_lat")),
"center_cell_lon": _to_decimal_or_none(cluster_summary.get("center_cell_lon")),
"geometry": cluster_geometry,
"cell_count": int(cluster_summary["cell_count"]),
"cell_codes": list(cluster_summary["cell_codes"]),
"metadata": {
**cluster_metadata,
"center_selection": {
"strategy": "coordinate_1_center",
"center_cell_code": cluster_summary.get("center_cell_code") or "",
"center_radius": cluster_summary.get("center_radius"),
"center_mean_distance": cluster_summary.get("center_mean_distance"),
},
},
}
if cluster_block is None:
cluster_block = RemoteSensingClusterBlock.objects.create(
result=result,
cluster_label=cluster_label,
**defaults,
)
else:
for field_name, value in defaults.items():
setattr(cluster_block, field_name, value)
cluster_block.save(
update_fields=[
"soil_location",
"block_subdivision",
"block_code",
"sub_block_code",
"chunk_size_sqm",
"centroid_lat",
"centroid_lon",
"center_cell_code",
"center_cell_lat",
"center_cell_lon",
"geometry",
"cell_count",
"cell_codes",
"metadata",
"updated_at",
]
)
cluster_summary["cluster_uuid"] = str(cluster_block.uuid)
cluster_summary["geometry"] = cluster_block.geometry
cluster_summary["metadata"] = cluster_block.metadata
synced_blocks.append(cluster_block)
stale_labels = set(existing_blocks) - active_labels
if stale_labels:
result.cluster_blocks.filter(cluster_label__in=stale_labels).delete()
return synced_blocks
def _build_cluster_geometry(
observations: list[AnalysisGridObservation],
) -> dict[str, Any]:
rings = _build_cluster_boundary_rings(observations)
if not rings:
return {}
if len(rings) == 1:
return {"type": "Polygon", "coordinates": [rings[0]]}
outer_ring_indexes = []
hole_mapping: dict[int, list[list[list[float]]]] = {}
for ring_index, ring in enumerate(rings):
sample_point = ring[0]
parent_indexes = [
candidate_index
for candidate_index, candidate_ring in enumerate(rings)
if candidate_index != ring_index and point_in_polygon(
(sample_point[0], sample_point[1]),
[(point[0], point[1]) for point in candidate_ring[:-1]],
)
]
if not parent_indexes:
outer_ring_indexes.append(ring_index)
continue
parent_index = min(
parent_indexes,
key=lambda candidate_index: abs(_signed_ring_area(rings[candidate_index])),
)
hole_mapping.setdefault(parent_index, []).append(ring)
if len(outer_ring_indexes) == 1:
outer_index = outer_ring_indexes[0]
return {
"type": "Polygon",
"coordinates": [rings[outer_index], *hole_mapping.get(outer_index, [])],
}
return {
"type": "MultiPolygon",
"coordinates": [
[rings[outer_index], *hole_mapping.get(outer_index, [])]
for outer_index in outer_ring_indexes
],
}
def _build_shared_border_adjacency(
observations: list[AnalysisGridObservation],
) -> dict[int, set[int]]:
adjacency_map: dict[int, set[int]] = {index: set() for index in range(len(observations))}
shared_edge_map: dict[tuple[tuple[float, float], tuple[float, float]], list[int]] = {}
for index, observation in enumerate(observations):
for edge_key in _extract_cell_shared_edge_keys(observation):
shared_edge_map.setdefault(edge_key, []).append(index)
for cell_indexes in shared_edge_map.values():
if len(cell_indexes) != 2:
continue
left_index, right_index = cell_indexes
adjacency_map[left_index].add(right_index)
adjacency_map[right_index].add(left_index)
return adjacency_map
def _extract_cell_shared_edge_keys(
observation: AnalysisGridObservation,
) -> set[tuple[tuple[float, float], tuple[float, float]]]:
geometry = dict(getattr(observation.cell, "geometry", {}) or {})
coordinates = geometry.get("coordinates") or []
polygons = []
if geometry.get("type") == "Polygon" and coordinates:
polygons = [coordinates]
elif geometry.get("type") == "MultiPolygon" and coordinates:
polygons = coordinates
edge_keys: set[tuple[tuple[float, float], tuple[float, float]]] = set()
for polygon in polygons:
outer_ring = polygon[0] if polygon else []
normalized_ring = [
(float(point[0]), float(point[1]))
for point in outer_ring
if len(point) >= 2
]
if len(normalized_ring) < 4:
continue
if normalized_ring[0] != normalized_ring[-1]:
normalized_ring.append(normalized_ring[0])
for start_point, end_point in zip(normalized_ring, normalized_ring[1:]):
if start_point == end_point:
continue
edge_keys.add(tuple(sorted((start_point, end_point))))
return edge_keys
def _find_disconnected_label_components(
*,
labels: list[int],
adjacency_map: dict[int, set[int]],
) -> list[list[int]]:
components_to_merge: list[list[int]] = []
for cluster_label in sorted(set(labels)):
label_indexes = [index for index, label in enumerate(labels) if int(label) == cluster_label]
if len(label_indexes) <= 1:
continue
label_index_set = set(label_indexes)
visited: set[int] = set()
connected_components: list[list[int]] = []
for start_index in label_indexes:
if start_index in visited:
continue
component = []
queue = [start_index]
visited.add(start_index)
while queue:
current_index = queue.pop()
component.append(current_index)
for neighbor_index in adjacency_map.get(current_index, set()):
if neighbor_index in visited or neighbor_index not in label_index_set:
continue
visited.add(neighbor_index)
queue.append(neighbor_index)
connected_components.append(sorted(component))
if len(connected_components) <= 1:
continue
connected_components.sort(key=lambda component: (-len(component), component[0]))
components_to_merge.extend(connected_components[1:])
return components_to_merge
def _choose_neighbor_label_for_component(
*,
component_indexes: list[int],
labels: list[int],
adjacency_map: dict[int, set[int]],
scaled_matrix: list[list[float]],
) -> int | None:
component_centroid = _mean_vector([scaled_matrix[index] for index in component_indexes])
candidate_scores: dict[int, float] = {}
for component_index in component_indexes:
for neighbor_index in adjacency_map.get(component_index, set()):
neighbor_label = int(labels[neighbor_index])
if neighbor_label == int(labels[component_index]):
continue
candidate_indexes = [
index
for index, label in enumerate(labels)
if int(label) == neighbor_label
]
if not candidate_indexes:
continue
candidate_centroid = _mean_vector([scaled_matrix[index] for index in candidate_indexes])
distance = _euclidean_distance(component_centroid, candidate_centroid)
best_distance = candidate_scores.get(neighbor_label)
if best_distance is None or distance < best_distance:
candidate_scores[neighbor_label] = distance
if not candidate_scores:
return None
return min(candidate_scores.items(), key=lambda item: (item[1], item[0]))[0]
def _normalize_cluster_labels(labels: list[int]) -> list[int]:
label_mapping: dict[int, int] = {}
normalized_labels: list[int] = []
next_label = 0
for label in labels:
label = int(label)
if label not in label_mapping:
label_mapping[label] = next_label
next_label += 1
normalized_labels.append(label_mapping[label])
return normalized_labels
def _mean_vector(vectors: list[list[float]]) -> list[float]:
if not vectors:
return []
dimensions = len(vectors[0])
return [
sum(vector[dimension_index] for vector in vectors) / len(vectors)
for dimension_index in range(dimensions)
]
def _euclidean_distance(left: list[float], right: list[float]) -> float:
return math.sqrt(
sum((float(left[index]) - float(right[index])) ** 2 for index in range(len(left)))
)
def _build_cluster_boundary_rings(
observations: list[AnalysisGridObservation],
) -> list[list[list[float]]]:
directed_edges: dict[tuple[tuple[float, float], tuple[float, float]], int] = {}
undirected_counts: dict[tuple[tuple[float, float], tuple[float, float]], int] = {}
point_lookup: dict[tuple[float, float], list[tuple[float, float]]] = {}
for observation in observations:
geometry = dict(getattr(observation.cell, "geometry", {}) or {})
coordinates = geometry.get("coordinates") or []
polygons = []
if geometry.get("type") == "Polygon" and coordinates:
polygons = [coordinates]
elif geometry.get("type") == "MultiPolygon" and coordinates:
polygons = coordinates
for polygon in polygons:
outer_ring = polygon[0] if polygon else []
normalized_ring = [
(float(point[0]), float(point[1]))
for point in outer_ring
if len(point) >= 2
]
if len(normalized_ring) < 4:
continue
if normalized_ring[0] != normalized_ring[-1]:
normalized_ring.append(normalized_ring[0])
for start_point, end_point in zip(normalized_ring, normalized_ring[1:]):
if start_point == end_point:
continue
directed_edges[(start_point, end_point)] = directed_edges.get((start_point, end_point), 0) + 1
undirected_key = tuple(sorted((start_point, end_point)))
undirected_counts[undirected_key] = undirected_counts.get(undirected_key, 0) + 1
boundary_edges = [
(start_point, end_point)
for (start_point, end_point), _count in directed_edges.items()
if undirected_counts.get(tuple(sorted((start_point, end_point))), 0) == 1
]
if not boundary_edges:
return []
for start_point, end_point in boundary_edges:
point_lookup.setdefault(start_point, []).append(end_point)
rings: list[list[list[float]]] = []
while point_lookup:
start_point = next(iter(point_lookup))
current_point = start_point
ring = [start_point]
visited_guard = 0
while visited_guard <= len(boundary_edges) + 1:
next_points = point_lookup.get(current_point) or []
if not next_points:
break
next_point = next_points.pop(0)
if not next_points:
point_lookup.pop(current_point, None)
current_point = next_point
ring.append(current_point)
if current_point == start_point:
break
visited_guard += 1
if len(ring) >= 4 and ring[0] == ring[-1]:
signed_area = _signed_ring_area(ring)
if signed_area < 0:
ring = [ring[0], *list(reversed(ring[1:-1])), ring[0]]
rings.append([[point[0], point[1]] for point in ring])
return rings
def _signed_ring_area(ring: list[list[float]] | list[tuple[float, float]]) -> float:
area = 0.0
for current_point, next_point in zip(ring, ring[1:]):
area += (float(current_point[0]) * float(next_point[1])) - (float(next_point[0]) * float(current_point[1]))
return area / 2.0
def sync_location_block_layout_with_result(
*,
location: SoilLocation,
result: RemoteSensingSubdivisionResult,
cluster_summaries: list[dict[str, Any]],
) -> None:
layout = dict(location.block_layout or {})
blocks = list(layout.get("blocks") or [])
target_block = None
for block in blocks:
if block.get("block_code") == result.block_code:
target_block = block
break
if target_block is None:
target_block = {
"block_code": result.block_code,
"order": len(blocks) + 1,
"source": "remote_sensing",
"needs_subdivision": None,
"sub_blocks": [],
}
blocks.append(target_block)
target_block["needs_subdivision"] = result.cluster_count > 1
target_block["sub_blocks"] = [
{
"cluster_uuid": cluster.get("cluster_uuid"),
"sub_block_code": f"cluster-{cluster['cluster_label']}",
"cluster_label": cluster["cluster_label"],
"centroid_lat": cluster["centroid_lat"],
"centroid_lon": cluster["centroid_lon"],
"center_cell_code": cluster.get("center_cell_code") or "",
"center_cell_lat": cluster.get("center_cell_lat"),
"center_cell_lon": cluster.get("center_cell_lon"),
"cell_count": cluster["cell_count"],
"geometry": cluster.get("geometry") or {},
"metadata": cluster.get("metadata") or {},
}
for cluster in cluster_summaries
]
target_block["subdivision_summary"] = {
"type": "data_driven_remote_sensing",
"cluster_count": result.cluster_count,
"selected_features": result.selected_features,
"used_cell_count": result.metadata.get("used_cell_count", 0),
"skipped_cell_count": result.metadata.get("skipped_cell_count", 0),
"run_id": result.run_id,
}
layout["blocks"] = blocks
layout["algorithm_status"] = "completed"
location.block_layout = layout
location.save(update_fields=["block_layout", "updated_at"])
def sync_block_subdivision_with_result(
*,
block_subdivision: BlockSubdivision,
result: RemoteSensingSubdivisionResult,
observations: list[AnalysisGridObservation],
cluster_summaries: list[dict[str, Any]],
) -> None:
metadata = dict(block_subdivision.metadata or {})
metadata["data_driven_subdivision"] = {
"run_id": result.run_id,
"result_id": result.id,
"cluster_count": result.cluster_count,
"selected_features": result.selected_features,
"used_cell_count": result.metadata.get("used_cell_count", 0),
"skipped_cell_count": result.metadata.get("skipped_cell_count", 0),
"temporal_extent": {
"start_date": result.temporal_start.isoformat() if result.temporal_start else None,
"end_date": result.temporal_end.isoformat() if result.temporal_end else None,
},
"inertia_curve": result.metadata.get("inertia_curve", []),
"diagnostic_artifacts": result.metadata.get("diagnostic_artifacts", {}),
}
block_subdivision.grid_points = [
{
"cell_code": observation.cell.cell_code,
"centroid_lat": round(float(observation.cell.centroid_lat), 6),
"centroid_lon": round(float(observation.cell.centroid_lon), 6),
}
for observation in observations
]
block_subdivision.centroid_points = [
{
"cluster_uuid": cluster.get("cluster_uuid"),
"sub_block_code": f"cluster-{cluster['cluster_label']}",
"cluster_label": cluster["cluster_label"],
"centroid_lat": cluster["centroid_lat"],
"centroid_lon": cluster["centroid_lon"],
"center_cell_code": cluster.get("center_cell_code") or "",
"center_cell_lat": cluster.get("center_cell_lat"),
"center_cell_lon": cluster.get("center_cell_lon"),
"cell_count": cluster["cell_count"],
"cell_codes": cluster["cell_codes"],
"geometry": cluster.get("geometry") or {},
"metadata": cluster.get("metadata") or {},
}
for cluster in cluster_summaries
]
block_subdivision.grid_point_count = len(observations)
block_subdivision.centroid_count = len(cluster_summaries)
block_subdivision.status = "subdivided"
block_subdivision.metadata = metadata
plot_content = render_elbow_plot(
inertia_curve=result.metadata.get("inertia_curve", []),
optimal_k=result.cluster_count,
block_code=result.block_code or block_subdivision.block_code,
)
if plot_content is not None:
block_subdivision.elbow_plot.save(
f"remote-sensing-{result.soil_location_id}-{result.block_code or block_subdivision.block_code}-elbow.png",
plot_content,
save=False,
)
block_subdivision.save(
update_fields=[
"grid_points",
"centroid_points",
"grid_point_count",
"centroid_count",
"status",
"metadata",
"elbow_plot",
"updated_at",
]
)
return
block_subdivision.save(
update_fields=[
"grid_points",
"centroid_points",
"grid_point_count",
"centroid_count",
"status",
"metadata",
"updated_at",
]
)
def _coerce_float(value: Any) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _to_decimal_or_none(value: Any) -> Decimal | None:
if value is None:
return None
try:
return Decimal(str(value))
except (ArithmeticError, ValueError):
return None
def _count_non_null_features(observations: list[AnalysisGridObservation]) -> dict[str, int]:
counts = {feature_name: 0 for feature_name in DEFAULT_CLUSTER_FEATURES}
for observation in observations:
for feature_name in DEFAULT_CLUSTER_FEATURES:
if _coerce_float(getattr(observation, feature_name, None)) is not None:
counts[feature_name] += 1
return counts
def _persist_remote_sensing_diagnostic_artifacts(
*,
result: RemoteSensingSubdivisionResult,
observations: list[AnalysisGridObservation],
labels: list[int],
cluster_summaries: list[dict[str, Any]],
selected_features: list[str],
scaled_matrix: list[list[float]],
inertia_curve: list[dict[str, float]],
requested_k: int | None = None,
effective_cluster_count: int | None = None,
) -> dict[str, Any]:
try:
artifact_dir = _build_remote_sensing_diagnostic_dir(
result=result,
requested_k=requested_k,
effective_cluster_count=effective_cluster_count,
)
artifact_dir.mkdir(parents=True, exist_ok=True)
specs = [
(
"elbow_plot",
render_elbow_plot(
inertia_curve=inertia_curve,
optimal_k=result.cluster_count,
block_code=result.block_code or "farm",
),
"elbow",
),
(
"cluster_map",
_render_cluster_map_plot(
observations=observations,
labels=labels,
cluster_summaries=cluster_summaries,
block_code=result.block_code or "farm",
),
"cluster-map",
),
(
"cluster_sizes",
_render_cluster_size_plot(
observations=observations,
cluster_summaries=cluster_summaries,
block_code=result.block_code or "farm",
),
"cluster-sizes",
),
(
"feature_pairs",
_render_feature_pair_plot(
observations=observations,
selected_features=selected_features,
scaled_matrix=scaled_matrix,
labels=labels,
cluster_summaries=cluster_summaries,
block_code=result.block_code or "farm",
),
"feature-pairs",
),
(
"feature_projection",
_render_feature_projection_plot(
observations=observations,
selected_features=selected_features,
scaled_matrix=scaled_matrix,
labels=labels,
cluster_summaries=cluster_summaries,
block_code=result.block_code or "farm",
),
"feature-projection",
),
]
files: dict[str, str] = {}
for artifact_key, content, suffix in specs:
if content is None:
continue
target_path = artifact_dir / (
f"{_build_remote_sensing_artifact_stem(result=result, requested_k=requested_k, effective_cluster_count=effective_cluster_count)}"
f"__{suffix}.png"
)
_write_content_file(target_path=target_path, content=content)
files[artifact_key] = _to_project_relative_path(target_path)
return {
"directory": _to_project_relative_path(artifact_dir),
"files": files,
"requested_k": requested_k,
"effective_cluster_count": effective_cluster_count,
}
except (DataDrivenSubdivisionError, OSError) as exc:
logger.warning(
"Failed to persist remote sensing diagnostic artifacts for result_id=%s: %s",
result.id,
exc,
)
return {}
def _build_remote_sensing_diagnostic_dir(
*,
result: RemoteSensingSubdivisionResult,
requested_k: int | None = None,
effective_cluster_count: int | None = None,
) -> Path:
configured_dir = str(
os.environ.get("REMOTE_SENSING_DIAGNOSTIC_DIR", DEFAULT_REMOTE_SENSING_DIAGNOSTIC_DIR)
).strip()
base_dir = Path(getattr(settings, "BASE_DIR", Path.cwd()))
target_dir = Path(configured_dir)
if not target_dir.is_absolute():
target_dir = base_dir / target_dir
block_component = _sanitize_path_component(result.block_code or "farm")
diagnostic_dir = target_dir / f"location-{result.soil_location_id}" / f"run-{result.run_id}-{block_component}"
if requested_k is not None:
effective_component = effective_cluster_count if effective_cluster_count is not None else requested_k
diagnostic_dir = diagnostic_dir / f"k-{requested_k}-effective-{effective_component}"
return diagnostic_dir
def _build_remote_sensing_artifact_stem(
*,
result: RemoteSensingSubdivisionResult,
requested_k: int | None = None,
effective_cluster_count: int | None = None,
) -> str:
stem = (
f"location-{result.soil_location_id}"
f"__run-{result.run_id}"
f"__{_sanitize_path_component(result.block_code or 'farm')}"
)
if requested_k is not None:
effective_component = effective_cluster_count if effective_cluster_count is not None else requested_k
stem = f"{stem}__k-{requested_k}__effective-{effective_component}"
return stem
def _write_content_file(*, target_path: Path, content: ContentFile) -> None:
target_path.parent.mkdir(parents=True, exist_ok=True)
content.open("rb")
try:
target_path.write_bytes(content.read())
finally:
content.close()
def _to_project_relative_path(path: Path) -> str:
base_dir = Path(getattr(settings, "BASE_DIR", Path.cwd()))
try:
return str(path.relative_to(base_dir))
except ValueError:
return str(path)
def _sanitize_path_component(value: str) -> str:
text = str(value or "").strip() or "unknown"
sanitized = "".join(character if character.isalnum() or character in {"-", "_", "."} else "_" for character in text)
return sanitized or "unknown"
def _render_cluster_map_plot(
*,
observations: list[AnalysisGridObservation],
labels: list[int],
cluster_summaries: list[dict[str, Any]],
block_code: str,
) -> ContentFile | None:
if not observations:
return None
plt = _import_matplotlib_pyplot()
unique_labels = sorted(set(int(label) for label in labels))
colors = plt.cm.get_cmap("tab10", max(len(unique_labels), 1))
center_indexes_by_label = _build_center_indexes_by_label(
observations=observations,
labels=labels,
cluster_summaries=cluster_summaries,
)
fig, ax = plt.subplots(figsize=(8, 6))
buffer = BytesIO()
try:
for color_index, cluster_label in enumerate(unique_labels):
cluster_points = [
(
float(observation.cell.centroid_lon),
float(observation.cell.centroid_lat),
_build_observation_label(observation=observation, index=index),
)
for index, (observation, label) in enumerate(zip(observations, labels))
if int(label) == cluster_label
]
if not cluster_points:
continue
xs = [point[0] for point in cluster_points]
ys = [point[1] for point in cluster_points]
ax.scatter(
xs,
ys,
s=70,
alpha=0.9,
color=colors(color_index),
edgecolors="white",
linewidths=0.8,
label=f"Cluster {cluster_label}",
)
_annotate_plot_points(
axis=ax,
x_values=xs,
y_values=ys,
point_labels=[point[2] for point in cluster_points],
)
center_index = center_indexes_by_label.get(cluster_label)
if center_index is not None:
_plot_cluster_center_marker(
axis=ax,
x_value=float(observations[center_index].cell.centroid_lon),
y_value=float(observations[center_index].cell.centroid_lat),
point_label=_build_center_label(
observations=observations,
cluster_summaries=cluster_summaries,
cluster_label=cluster_label,
),
color=colors(color_index),
)
ax.set_title(f"KMeans Spatial Cluster Map - {block_code}")
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
ax.grid(True, linestyle="--", linewidth=0.5, alpha=0.4)
if unique_labels:
ax.legend()
fig.tight_layout()
fig.savefig(buffer, format="png", dpi=150)
buffer.seek(0)
return ContentFile(buffer.getvalue())
finally:
buffer.close()
plt.close(fig)
def _render_cluster_size_plot(
*,
observations: list[AnalysisGridObservation],
cluster_summaries: list[dict[str, Any]],
block_code: str,
) -> ContentFile | None:
if not cluster_summaries:
return None
plt = _import_matplotlib_pyplot()
labels = [f"C{int(cluster['cluster_label'])}" for cluster in cluster_summaries]
counts = [int(cluster["cell_count"]) for cluster in cluster_summaries]
fig, ax = plt.subplots(figsize=(8, 5))
buffer = BytesIO()
try:
bars = ax.bar(labels, counts, color="#2f6fed", alpha=0.85)
point_numbers_by_cell_code = {
str(observation.cell.cell_code): index + 1
for index, observation in enumerate(observations)
}
for bar, count in zip(bars, counts):
ax.text(
bar.get_x() + bar.get_width() / 2.0,
bar.get_height(),
str(count),
ha="center",
va="bottom",
fontsize=9,
)
for bar, cluster_summary in zip(bars, cluster_summaries):
center_cell_code = str(cluster_summary.get("center_cell_code") or "").strip()
if center_cell_code:
center_point_number = point_numbers_by_cell_code.get(center_cell_code)
center_text = f"center: {center_point_number}" if center_point_number is not None else "center"
ax.text(
bar.get_x() + bar.get_width() / 2.0,
bar.get_height() / 2.0 if bar.get_height() else 0.05,
center_text,
ha="center",
va="center",
fontsize=8,
color="#16325c",
rotation=90,
)
ax.set_title(f"Cluster Sizes - {block_code}")
ax.set_xlabel("Cluster")
ax.set_ylabel("Cell Count")
ax.grid(True, axis="y", linestyle="--", linewidth=0.5, alpha=0.4)
fig.tight_layout()
fig.savefig(buffer, format="png", dpi=150)
buffer.seek(0)
return ContentFile(buffer.getvalue())
finally:
buffer.close()
plt.close(fig)
def _render_feature_pair_plot(
*,
observations: list[AnalysisGridObservation],
selected_features: list[str],
scaled_matrix: list[list[float]],
labels: list[int],
cluster_summaries: list[dict[str, Any]],
block_code: str,
) -> ContentFile | None:
if not scaled_matrix or not selected_features:
return None
plt = _import_matplotlib_pyplot()
feature_count = len(selected_features)
pair_indexes = [(0, 0)] if feature_count == 1 else [
(left_index, right_index)
for left_index in range(feature_count)
for right_index in range(left_index + 1, feature_count)
]
subplot_count = len(pair_indexes)
columns = 2 if subplot_count > 1 else 1
rows = math.ceil(subplot_count / columns)
fig, axes = plt.subplots(rows, columns, figsize=(7 * columns, 5 * rows))
axes_list = axes.flatten().tolist() if hasattr(axes, "flatten") else [axes]
unique_labels = sorted(set(int(label) for label in labels))
colors = plt.cm.get_cmap("tab10", max(len(unique_labels), 1))
observation_labels = [
_build_observation_label(observation=observation, index=index)
for index, observation in enumerate(observations)
]
center_indexes_by_label = _build_center_indexes_by_label(
observations=observations,
labels=labels,
cluster_summaries=cluster_summaries,
)
buffer = BytesIO()
try:
for axis, (left_index, right_index) in zip(axes_list, pair_indexes):
if feature_count == 1:
xs = list(range(1, len(scaled_matrix) + 1))
ys = [row[0] for row in scaled_matrix]
for color_index, cluster_label in enumerate(unique_labels):
filtered = [
(x_value, y_value, point_label)
for x_value, y_value, label, point_label in zip(xs, ys, labels, observation_labels)
if int(label) == cluster_label
]
axis.scatter(
[item[0] for item in filtered],
[item[1] for item in filtered],
s=55,
color=colors(color_index),
alpha=0.85,
label=f"Cluster {cluster_label}",
)
_annotate_plot_points(
axis=axis,
x_values=[item[0] for item in filtered],
y_values=[item[1] for item in filtered],
point_labels=[item[2] for item in filtered],
)
center_index = center_indexes_by_label.get(cluster_label)
if center_index is not None:
_plot_cluster_center_marker(
axis=axis,
x_value=float(center_index + 1),
y_value=float(scaled_matrix[center_index][0]),
point_label=_build_center_label(
observations=observations,
cluster_summaries=cluster_summaries,
cluster_label=cluster_label,
),
color=colors(color_index),
)
axis.set_xlabel("Observation Index")
axis.set_ylabel(f"{selected_features[0]} (scaled)")
axis.set_title(f"{selected_features[0]} distribution")
else:
x_values = [row[left_index] for row in scaled_matrix]
y_values = [row[right_index] for row in scaled_matrix]
for color_index, cluster_label in enumerate(unique_labels):
filtered = [
(x_value, y_value, point_label)
for x_value, y_value, label, point_label in zip(
x_values,
y_values,
labels,
observation_labels,
)
if int(label) == cluster_label
]
axis.scatter(
[item[0] for item in filtered],
[item[1] for item in filtered],
s=55,
color=colors(color_index),
alpha=0.85,
label=f"Cluster {cluster_label}",
)
_annotate_plot_points(
axis=axis,
x_values=[item[0] for item in filtered],
y_values=[item[1] for item in filtered],
point_labels=[item[2] for item in filtered],
)
center_index = center_indexes_by_label.get(cluster_label)
if center_index is not None:
_plot_cluster_center_marker(
axis=axis,
x_value=float(scaled_matrix[center_index][left_index]),
y_value=float(scaled_matrix[center_index][right_index]),
point_label=_build_center_label(
observations=observations,
cluster_summaries=cluster_summaries,
cluster_label=cluster_label,
),
color=colors(color_index),
)
axis.set_xlabel(f"{selected_features[left_index]} (scaled)")
axis.set_ylabel(f"{selected_features[right_index]} (scaled)")
axis.set_title(
f"{selected_features[left_index]} vs {selected_features[right_index]}"
)
axis.grid(True, linestyle="--", linewidth=0.5, alpha=0.4)
for axis in axes_list[subplot_count:]:
axis.remove()
if unique_labels and axes_list:
axes_list[0].legend()
fig.suptitle(f"KMeans Feature Diagnostics - {block_code}", fontsize=14)
fig.tight_layout(rect=(0, 0, 1, 0.97))
fig.savefig(buffer, format="png", dpi=150)
buffer.seek(0)
return ContentFile(buffer.getvalue())
finally:
buffer.close()
plt.close(fig)
def _render_feature_projection_plot(
*,
observations: list[AnalysisGridObservation],
selected_features: list[str],
scaled_matrix: list[list[float]],
labels: list[int],
cluster_summaries: list[dict[str, Any]],
block_code: str,
) -> ContentFile | None:
if not scaled_matrix:
return None
plt = _import_matplotlib_pyplot()
projected_points, x_label, y_label = _project_all_features_to_2d(
scaled_matrix=scaled_matrix,
selected_features=selected_features,
)
unique_labels = sorted(set(int(label) for label in labels))
colors = plt.cm.get_cmap("tab10", max(len(unique_labels), 1))
observation_labels = [
_build_observation_label(observation=observation, index=index)
for index, observation in enumerate(observations)
]
center_indexes_by_label = _build_center_indexes_by_label(
observations=observations,
labels=labels,
cluster_summaries=cluster_summaries,
)
fig, ax = plt.subplots(figsize=(8, 6))
buffer = BytesIO()
try:
for color_index, cluster_label in enumerate(unique_labels):
filtered = [
(x_value, y_value, point_label)
for (x_value, y_value), label, point_label in zip(projected_points, labels, observation_labels)
if int(label) == cluster_label
]
if not filtered:
continue
ax.scatter(
[item[0] for item in filtered],
[item[1] for item in filtered],
s=65,
color=colors(color_index),
alpha=0.9,
edgecolors="white",
linewidths=0.8,
label=f"Cluster {cluster_label}",
)
_annotate_plot_points(
axis=ax,
x_values=[item[0] for item in filtered],
y_values=[item[1] for item in filtered],
point_labels=[item[2] for item in filtered],
)
center_index = center_indexes_by_label.get(cluster_label)
if center_index is not None and center_index < len(projected_points):
_plot_cluster_center_marker(
axis=ax,
x_value=float(projected_points[center_index][0]),
y_value=float(projected_points[center_index][1]),
point_label=_build_center_label(
observations=observations,
cluster_summaries=cluster_summaries,
cluster_label=cluster_label,
),
color=colors(color_index),
)
ax.set_title(f"KMeans All-Feature Projection - {block_code}")
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
ax.grid(True, linestyle="--", linewidth=0.5, alpha=0.4)
if unique_labels:
ax.legend()
fig.tight_layout()
fig.savefig(buffer, format="png", dpi=150)
buffer.seek(0)
return ContentFile(buffer.getvalue())
finally:
buffer.close()
plt.close(fig)
def _project_all_features_to_2d(
*,
scaled_matrix: list[list[float]],
selected_features: list[str],
) -> tuple[list[tuple[float, float]], str, str]:
if not scaled_matrix:
return [], "Projection Axis 1", "Projection Axis 2"
matrix = [[float(value) for value in row] for row in scaled_matrix]
row_count = len(matrix)
column_count = len(matrix[0]) if matrix and matrix[0] else 0
if row_count >= 2 and column_count >= 2:
try:
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
projected_matrix = pca.fit_transform(matrix)
x_axis_label = "PC1"
y_axis_label = "PC2"
explained = list(getattr(pca, "explained_variance_ratio_", []) or [])
if len(explained) >= 2:
x_axis_label = f"PC1 ({explained[0] * 100:.1f}%)"
y_axis_label = f"PC2 ({explained[1] * 100:.1f}%)"
return (
[(float(row[0]), float(row[1])) for row in projected_matrix.tolist()],
x_axis_label,
y_axis_label,
)
except ImportError:
logger.warning(
"scikit-learn PCA is unavailable, falling back to the first scaled features for projection."
)
except ValueError as exc:
logger.warning("Failed to calculate PCA projection for remote sensing diagnostics: %s", exc)
x_values = [float(row[0]) if row else 0.0 for row in matrix]
if column_count >= 2:
y_values = [float(row[1]) for row in matrix]
else:
y_values = [0.0 for _ in matrix]
x_axis_label = f"{selected_features[0]} (scaled)" if selected_features else "Feature 1 (scaled)"
y_axis_label = (
f"{selected_features[1]} (scaled)"
if len(selected_features) >= 2
else "Projection Axis 2"
)
return list(zip(x_values, y_values)), x_axis_label, y_axis_label
def _build_observation_label(*, observation: AnalysisGridObservation, index: int) -> str:
_ = observation
return str(index + 1)
def _annotate_plot_points(
*,
axis: Any,
x_values: list[float],
y_values: list[float],
point_labels: list[str],
) -> None:
for x_value, y_value, point_label in zip(x_values, y_values, point_labels):
axis.annotate(
point_label,
xy=(x_value, y_value),
xytext=(4, 4),
textcoords="offset points",
fontsize=7,
alpha=0.85,
)
def _build_center_indexes_by_label(
*,
observations: list[AnalysisGridObservation],
labels: list[int],
cluster_summaries: list[dict[str, Any]],
) -> dict[int, int]:
cell_code_to_index = {
str(observation.cell.cell_code): index
for index, observation in enumerate(observations)
}
center_indexes_by_label: dict[int, int] = {}
for cluster_summary in cluster_summaries:
cluster_label = int(cluster_summary.get("cluster_label", -1))
center_cell_code = str(cluster_summary.get("center_cell_code") or "").strip()
center_index = cell_code_to_index.get(center_cell_code)
if center_index is None:
continue
if center_index < len(labels) and int(labels[center_index]) == cluster_label:
center_indexes_by_label[cluster_label] = center_index
return center_indexes_by_label
def _build_center_label(
*,
observations: list[AnalysisGridObservation],
cluster_summaries: list[dict[str, Any]],
cluster_label: int,
) -> str:
point_numbers_by_cell_code = {
str(observation.cell.cell_code): index + 1
for index, observation in enumerate(observations)
}
for cluster_summary in cluster_summaries:
if int(cluster_summary.get("cluster_label", -1)) == int(cluster_label):
center_cell_code = str(cluster_summary.get("center_cell_code") or "").strip()
if center_cell_code:
point_number = point_numbers_by_cell_code.get(center_cell_code)
if point_number is not None:
return f"K-center: {point_number}"
return "K-center"
break
return f"K-center C{cluster_label}"
def _plot_cluster_center_marker(
*,
axis: Any,
x_value: float,
y_value: float,
point_label: str,
color: Any,
) -> None:
axis.scatter(
[x_value],
[y_value],
s=220,
marker="*",
color=color,
edgecolors="black",
linewidths=1.2,
zorder=5,
)
axis.annotate(
point_label,
xy=(x_value, y_value),
xytext=(7, -10),
textcoords="offset points",
fontsize=8,
fontweight="bold",
color="black",
)
def _import_matplotlib_pyplot():
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
except ImportError as exc: # pragma: no cover - runtime dependency guard
raise DataDrivenSubdivisionError("matplotlib برای ذخیره نمودارهای KMeans لازم است.") from exc
return plt
def _build_clustering_log_context(
*,
observations: list[AnalysisGridObservation],
selected_features: list[str],
run: RemoteSensingRun | None,
location: SoilLocation | None,
) -> dict[str, Any]:
first_observation = observations[0] if observations else None
observation_metadata = dict(getattr(first_observation, "metadata", {}) or {})
resolved_run = run or getattr(first_observation, "run", None)
resolved_location = location or getattr(getattr(first_observation, "cell", None), "soil_location", None)
temporal_start = getattr(resolved_run, "temporal_start", None) or getattr(first_observation, "temporal_start", None)
temporal_end = getattr(resolved_run, "temporal_end", None) or getattr(first_observation, "temporal_end", None)
return {
"run_id": getattr(resolved_run, "id", None),
"job_ref": observation_metadata.get("job_refs", {}),
"region_id": getattr(resolved_location, "id", None),
"date_range": {
"temporal_start": temporal_start.isoformat() if hasattr(temporal_start, "isoformat") else temporal_start,
"temporal_end": temporal_end.isoformat() if hasattr(temporal_end, "isoformat") else temporal_end,
},
"selected_features": selected_features,
}
def _serialize_log_payload(payload: dict[str, Any]) -> str:
return json.dumps(payload, ensure_ascii=True, default=str, sort_keys=True)