Files
Ai/location_data/block_subdivision.py
T

402 lines
13 KiB
Python
Raw Normal View History

2026-05-09 16:55:06 +03:30
from __future__ import annotations
from dataclasses import dataclass
from decimal import Decimal, ROUND_HALF_UP
from io import BytesIO
import math
from django.conf import settings
from django.core.files.base import ContentFile
EARTH_RADIUS_M = 6371008.8
COORD_PRECISION = Decimal("0.000001")
MAX_K = 10
RANDOM_STATE = 42
@dataclass(frozen=True)
class GeoPoint:
lat: float
lon: float
def create_or_get_block_subdivision(
location,
block_code: str,
boundary: dict | list,
*,
chunk_size_sqm: int | None = None,
):
"""
اگر subdivision این بلوک قبلاً ساخته شده باشد همان را برمی‌گرداند؛
در غیر این صورت الگوریتم grid + KMeans را اجرا و ذخیره می‌کند.
"""
from .models import BlockSubdivision
existing = BlockSubdivision.objects.filter(
soil_location=location,
block_code=block_code,
).first()
if existing is not None:
return existing, False
payload = build_block_subdivision_payload(
boundary=boundary,
block_code=block_code,
chunk_size_sqm=chunk_size_sqm,
)
subdivision = BlockSubdivision.objects.create(
soil_location=location,
block_code=block_code,
source_boundary=payload["source_boundary"],
chunk_size_sqm=payload["chunk_size_sqm"],
grid_points=payload["grid_points"],
centroid_points=payload["centroid_points"],
grid_point_count=payload["grid_point_count"],
centroid_count=payload["centroid_count"],
status="created",
metadata=payload["metadata"],
)
plot_content = render_elbow_plot(
inertia_curve=payload["metadata"].get("inertia_curve", []),
optimal_k=payload["metadata"].get("optimal_k", 0),
block_code=block_code,
)
if plot_content is not None:
subdivision.elbow_plot.save(
f"{location.pk}_{block_code}_elbow.png",
plot_content,
save=False,
)
subdivision.save(update_fields=["elbow_plot", "updated_at"])
sync_block_layout_with_subdivision(location, subdivision)
return subdivision, True
def build_block_subdivision_payload(
boundary: dict | list,
block_code: str = "block-1",
chunk_size_sqm: int | None = None,
) -> dict:
"""
مرز یک بلوک را گرفته و ابتدا شبکه نقاط را می‌سازد، سپس با KMeans
تعداد بهینه خوشه‌ها را از elbow point پیدا می‌کند و centroidها را برمی‌گرداند.
"""
chunk_size = int(chunk_size_sqm or getattr(settings, "SUBDIVISION_CHUNK_SQM", 900) or 900)
if chunk_size <= 0:
raise ValueError("chunk_size_sqm باید بزرگ‌تر از صفر باشد.")
polygon = extract_polygon(boundary)
if len(polygon) < 3:
raise ValueError("مرز بلوک باید حداقل سه نقطه معتبر داشته باشد.")
projected_polygon = project_polygon_to_local_meters(polygon)
area_sqm = abs(polygon_area(projected_polygon))
grid_points, grid_vectors = generate_grid_points(
polygon=polygon,
projected_polygon=projected_polygon,
chunk_size_sqm=chunk_size,
)
clustering_result = cluster_grid_points(grid_vectors, polygon)
return {
"block_code": block_code,
"source_boundary": boundary if isinstance(boundary, dict) else {"points": boundary},
"chunk_size_sqm": chunk_size,
"grid_points": grid_points,
"centroid_points": clustering_result["centroid_points"],
"grid_point_count": len(grid_points),
"centroid_count": len(clustering_result["centroid_points"]),
"metadata": {
"estimated_area_sqm": round(area_sqm, 2),
"optimal_k": clustering_result["optimal_k"],
"inertia_curve": clustering_result["inertia_curve"],
},
}
def cluster_grid_points(grid_vectors: list[tuple[float, float]], polygon: list[GeoPoint]) -> dict:
if not grid_vectors:
return {
"optimal_k": 0,
"inertia_curve": [],
"centroid_points": [],
}
if len(grid_vectors) == 1:
lat, lon = unproject_point(grid_vectors[0][0], grid_vectors[0][1], polygon)
return {
"optimal_k": 1,
"inertia_curve": [{"k": 1, "sse": 0.0}],
"centroid_points": [
{
"sub_block_code": "sub-block-1",
"centroid_lat": quantize_coordinate(lat),
"centroid_lon": quantize_coordinate(lon),
}
],
}
try:
from sklearn.cluster import KMeans
except ImportError as exc: # pragma: no cover - runtime dependency guard
raise ImportError("scikit-learn برای اجرای subdivision لازم است.") from exc
max_k = min(MAX_K, len(grid_vectors))
inertia_curve = []
trained_models = {}
for k in range(1, max_k + 1):
model = KMeans(
n_clusters=k,
n_init=10,
random_state=RANDOM_STATE,
)
model.fit(grid_vectors)
trained_models[k] = model
inertia_curve.append({"k": k, "sse": round(float(model.inertia_), 6)})
optimal_k = detect_elbow_point(inertia_curve)
final_model = trained_models[optimal_k]
centroid_points = []
for index, center in enumerate(final_model.cluster_centers_, start=1):
lat, lon = unproject_point(center[0], center[1], polygon)
centroid_points.append(
{
"sub_block_code": f"sub-block-{index}",
"centroid_lat": quantize_coordinate(lat),
"centroid_lon": quantize_coordinate(lon),
}
)
return {
"optimal_k": optimal_k,
"inertia_curve": inertia_curve,
"centroid_points": centroid_points,
}
def detect_elbow_point(inertia_curve: list[dict]) -> int:
if not inertia_curve:
return 0
if len(inertia_curve) <= 2:
return inertia_curve[-1]["k"] if len(inertia_curve) == 2 else inertia_curve[0]["k"]
sses = [item["sse"] for item in inertia_curve]
ks = [item["k"] for item in inertia_curve]
slopes = [sses[index] - sses[index + 1] for index in range(len(sses) - 1)]
best_k = ks[0]
best_change = float("-inf")
for index in range(len(slopes) - 1):
change = slopes[index] - slopes[index + 1]
candidate_k = ks[index + 1]
if change > best_change:
best_change = change
best_k = candidate_k
return best_k
def render_elbow_plot(
inertia_curve: list[dict],
optimal_k: int,
block_code: str,
) -> ContentFile | None:
if not inertia_curve:
return None
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
except ImportError as exc: # pragma: no cover - runtime dependency guard
raise ImportError("matplotlib برای ذخیره نمودار elbow لازم است.") from exc
ks = [item["k"] for item in inertia_curve]
sses = [item["sse"] for item in inertia_curve]
buffer = BytesIO()
fig, ax = plt.subplots(figsize=(8, 5))
try:
ax.plot(ks, sses, marker="o", linewidth=2, color="#2f6fed")
if optimal_k in ks:
elbow_index = ks.index(optimal_k)
ax.scatter(
[ks[elbow_index]],
[sses[elbow_index]],
color="#d62828",
s=90,
zorder=3,
label=f"Elbow K={optimal_k}",
)
ax.legend()
ax.set_title(f"Elbow Plot - {block_code}")
ax.set_xlabel("K")
ax.set_ylabel("SSE / Inertia")
ax.grid(True, linestyle="--", linewidth=0.5, alpha=0.6)
fig.tight_layout()
fig.savefig(buffer, format="png", dpi=150)
buffer.seek(0)
return ContentFile(buffer.getvalue())
finally:
buffer.close()
plt.close(fig)
def sync_block_layout_with_subdivision(location, subdivision) -> None:
layout = location.block_layout or {}
blocks = list(layout.get("blocks") or [])
target_block = None
for block in blocks:
if block.get("block_code") == subdivision.block_code:
target_block = block
break
if target_block is None:
target_block = {
"block_code": subdivision.block_code,
"order": len(blocks) + 1,
"source": "input",
"needs_subdivision": None,
"sub_blocks": [],
}
blocks.append(target_block)
target_block["needs_subdivision"] = subdivision.centroid_count > 1
target_block["sub_blocks"] = list(subdivision.centroid_points or [])
target_block["subdivision_summary"] = {
"chunk_size_sqm": subdivision.chunk_size_sqm,
"grid_point_count": subdivision.grid_point_count,
"centroid_count": subdivision.centroid_count,
"optimal_k": (subdivision.metadata or {}).get("optimal_k", subdivision.centroid_count),
}
layout["blocks"] = blocks
layout["algorithm_status"] = "completed"
location.block_layout = layout
location.save(update_fields=["block_layout", "updated_at"])
def generate_grid_points(
polygon: list[GeoPoint],
projected_polygon: list[tuple[float, float]],
chunk_size_sqm: int,
) -> tuple[list[dict], list[tuple[float, float]]]:
step_m = math.sqrt(chunk_size_sqm)
min_x, max_x, min_y, max_y = bounds(projected_polygon)
grid_points: list[dict] = []
grid_vectors: list[tuple[float, float]] = []
y = min_y + (step_m / 2.0)
point_index = 0
while y <= max_y:
x = min_x + (step_m / 2.0)
while x <= max_x:
if point_in_polygon((x, y), projected_polygon):
lat, lon = unproject_point(x, y, polygon)
point_index += 1
grid_vectors.append((x, y))
grid_points.append(
{
"point_code": f"pt-{point_index}",
"lat": quantize_coordinate(lat),
"lon": quantize_coordinate(lon),
}
)
x += step_m
y += step_m
return grid_points, grid_vectors
def extract_polygon(boundary: dict | list) -> list[GeoPoint]:
if isinstance(boundary, dict):
if boundary.get("type") == "Polygon":
coordinates = boundary.get("coordinates") or []
if coordinates and isinstance(coordinates[0], list):
points = coordinates[0]
else:
points = []
else:
points = boundary.get("corners") or []
elif isinstance(boundary, list):
points = boundary
else:
points = []
polygon: list[GeoPoint] = []
for point in points:
lat = lon = None
if isinstance(point, dict):
lat = point.get("lat", point.get("latitude"))
lon = point.get("lon", point.get("longitude"))
elif isinstance(point, (list, tuple)) and len(point) >= 2:
lon, lat = point[0], point[1]
if lat is None or lon is None:
continue
polygon.append(GeoPoint(lat=float(lat), lon=float(lon)))
if len(polygon) > 1 and polygon[0] == polygon[-1]:
polygon = polygon[:-1]
return polygon
def project_polygon_to_local_meters(polygon: list[GeoPoint]) -> list[tuple[float, float]]:
origin = polygon[0]
lat0 = math.radians(origin.lat)
lon0 = math.radians(origin.lon)
cos_lat0 = math.cos(lat0)
projected = []
for point in polygon:
lat = math.radians(point.lat)
lon = math.radians(point.lon)
x = (lon - lon0) * cos_lat0 * EARTH_RADIUS_M
y = (lat - lat0) * EARTH_RADIUS_M
projected.append((x, y))
return projected
def unproject_point(x: float, y: float, polygon: list[GeoPoint]) -> tuple[float, float]:
origin = polygon[0]
lat0 = math.radians(origin.lat)
lon0 = math.radians(origin.lon)
cos_lat0 = math.cos(lat0)
lat = math.degrees((y / EARTH_RADIUS_M) + lat0)
lon = math.degrees((x / (EARTH_RADIUS_M * cos_lat0)) + lon0)
return lat, lon
def polygon_area(points: list[tuple[float, float]]) -> float:
area = 0.0
closed = points + [points[0]]
for index in range(len(points)):
x1, y1 = closed[index]
x2, y2 = closed[index + 1]
area += (x1 * y2) - (x2 * y1)
return area / 2.0
def bounds(points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
xs = [point[0] for point in points]
ys = [point[1] for point in points]
return min(xs), max(xs), min(ys), max(ys)
def point_in_polygon(point: tuple[float, float], polygon: list[tuple[float, float]]) -> bool:
x, y = point
inside = False
j = len(polygon) - 1
for i in range(len(polygon)):
xi, yi = polygon[i]
xj, yj = polygon[j]
intersects = ((yi > y) != (yj > y)) and (
x < ((xj - xi) * (y - yi) / ((yj - yi) or 1e-12)) + xi
)
if intersects:
inside = not inside
j = i
return inside
def quantize_coordinate(value: float) -> float:
return float(Decimal(str(value)).quantize(COORD_PRECISION, rounding=ROUND_HALF_UP))