from __future__ import annotations from dataclasses import dataclass from decimal import Decimal, ROUND_HALF_UP from io import BytesIO import math from django.conf import settings from django.core.files.base import ContentFile EARTH_RADIUS_M = 6371008.8 COORD_PRECISION = Decimal("0.000001") MAX_K = 10 RANDOM_STATE = 42 @dataclass(frozen=True) class GeoPoint: lat: float lon: float def create_or_get_block_subdivision( location, block_code: str, boundary: dict | list, *, chunk_size_sqm: int | None = None, ): """ اگر subdivision این بلوک قبلاً ساخته شده باشد همان را برمی‌گرداند؛ در غیر این صورت الگوریتم grid + KMeans را اجرا و ذخیره می‌کند. """ from .models import BlockSubdivision existing = BlockSubdivision.objects.filter( soil_location=location, block_code=block_code, ).first() if existing is not None: return existing, False payload = build_block_subdivision_payload( boundary=boundary, block_code=block_code, chunk_size_sqm=chunk_size_sqm, ) subdivision = BlockSubdivision.objects.create( soil_location=location, block_code=block_code, source_boundary=payload["source_boundary"], chunk_size_sqm=payload["chunk_size_sqm"], grid_points=payload["grid_points"], centroid_points=payload["centroid_points"], grid_point_count=payload["grid_point_count"], centroid_count=payload["centroid_count"], status="created", metadata=payload["metadata"], ) plot_content = render_elbow_plot( inertia_curve=payload["metadata"].get("inertia_curve", []), optimal_k=payload["metadata"].get("optimal_k", 0), block_code=block_code, ) if plot_content is not None: subdivision.elbow_plot.save( f"{location.pk}_{block_code}_elbow.png", plot_content, save=False, ) subdivision.save(update_fields=["elbow_plot", "updated_at"]) sync_block_layout_with_subdivision(location, subdivision) return subdivision, True def build_block_subdivision_payload( boundary: dict | list, block_code: str = "block-1", chunk_size_sqm: int | None = None, ) -> dict: """ مرز یک بلوک را گرفته و ابتدا شبکه نقاط را می‌سازد، سپس با KMeans تعداد بهینه خوشه‌ها را از elbow point پیدا می‌کند و centroidها را برمی‌گرداند. """ chunk_size = int(chunk_size_sqm or getattr(settings, "SUBDIVISION_CHUNK_SQM", 900) or 900) if chunk_size <= 0: raise ValueError("chunk_size_sqm باید بزرگ‌تر از صفر باشد.") polygon = extract_polygon(boundary) if len(polygon) < 3: raise ValueError("مرز بلوک باید حداقل سه نقطه معتبر داشته باشد.") projected_polygon = project_polygon_to_local_meters(polygon) area_sqm = abs(polygon_area(projected_polygon)) grid_points, grid_vectors = generate_grid_points( polygon=polygon, projected_polygon=projected_polygon, chunk_size_sqm=chunk_size, ) clustering_result = cluster_grid_points(grid_vectors, polygon) return { "block_code": block_code, "source_boundary": boundary if isinstance(boundary, dict) else {"points": boundary}, "chunk_size_sqm": chunk_size, "grid_points": grid_points, "centroid_points": clustering_result["centroid_points"], "grid_point_count": len(grid_points), "centroid_count": len(clustering_result["centroid_points"]), "metadata": { "estimated_area_sqm": round(area_sqm, 2), "optimal_k": clustering_result["optimal_k"], "inertia_curve": clustering_result["inertia_curve"], }, } def cluster_grid_points(grid_vectors: list[tuple[float, float]], polygon: list[GeoPoint]) -> dict: if not grid_vectors: return { "optimal_k": 0, "inertia_curve": [], "centroid_points": [], } if len(grid_vectors) == 1: lat, lon = unproject_point(grid_vectors[0][0], grid_vectors[0][1], polygon) return { "optimal_k": 1, "inertia_curve": [{"k": 1, "sse": 0.0}], "centroid_points": [ { "sub_block_code": "sub-block-1", "centroid_lat": quantize_coordinate(lat), "centroid_lon": quantize_coordinate(lon), } ], } try: from sklearn.cluster import KMeans except ImportError as exc: # pragma: no cover - runtime dependency guard raise ImportError("scikit-learn برای اجرای subdivision لازم است.") from exc max_k = min(MAX_K, len(grid_vectors)) inertia_curve = [] trained_models = {} for k in range(1, max_k + 1): model = KMeans( n_clusters=k, n_init=10, random_state=RANDOM_STATE, ) model.fit(grid_vectors) trained_models[k] = model inertia_curve.append({"k": k, "sse": round(float(model.inertia_), 6)}) optimal_k = detect_elbow_point(inertia_curve) final_model = trained_models[optimal_k] centroid_points = [] for index, center in enumerate(final_model.cluster_centers_, start=1): lat, lon = unproject_point(center[0], center[1], polygon) centroid_points.append( { "sub_block_code": f"sub-block-{index}", "centroid_lat": quantize_coordinate(lat), "centroid_lon": quantize_coordinate(lon), } ) return { "optimal_k": optimal_k, "inertia_curve": inertia_curve, "centroid_points": centroid_points, } def detect_elbow_point(inertia_curve: list[dict]) -> int: if not inertia_curve: return 0 if len(inertia_curve) <= 2: return inertia_curve[-1]["k"] if len(inertia_curve) == 2 else inertia_curve[0]["k"] sses = [item["sse"] for item in inertia_curve] ks = [item["k"] for item in inertia_curve] slopes = [sses[index] - sses[index + 1] for index in range(len(sses) - 1)] best_k = ks[0] best_change = float("-inf") for index in range(len(slopes) - 1): change = slopes[index] - slopes[index + 1] candidate_k = ks[index + 1] if change > best_change: best_change = change best_k = candidate_k return best_k def render_elbow_plot( inertia_curve: list[dict], optimal_k: int, block_code: str, ) -> ContentFile | None: if not inertia_curve: return None try: import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt except ImportError as exc: # pragma: no cover - runtime dependency guard raise ImportError("matplotlib برای ذخیره نمودار elbow لازم است.") from exc ks = [item["k"] for item in inertia_curve] sses = [item["sse"] for item in inertia_curve] buffer = BytesIO() fig, ax = plt.subplots(figsize=(8, 5)) try: ax.plot(ks, sses, marker="o", linewidth=2, color="#2f6fed") if optimal_k in ks: elbow_index = ks.index(optimal_k) ax.scatter( [ks[elbow_index]], [sses[elbow_index]], color="#d62828", s=90, zorder=3, label=f"Elbow K={optimal_k}", ) ax.legend() ax.set_title(f"Elbow Plot - {block_code}") ax.set_xlabel("K") ax.set_ylabel("SSE / Inertia") ax.grid(True, linestyle="--", linewidth=0.5, alpha=0.6) fig.tight_layout() fig.savefig(buffer, format="png", dpi=150) buffer.seek(0) return ContentFile(buffer.getvalue()) finally: buffer.close() plt.close(fig) def sync_block_layout_with_subdivision(location, subdivision) -> None: layout = location.block_layout or {} blocks = list(layout.get("blocks") or []) target_block = None for block in blocks: if block.get("block_code") == subdivision.block_code: target_block = block break if target_block is None: target_block = { "block_code": subdivision.block_code, "order": len(blocks) + 1, "source": "input", "needs_subdivision": None, "sub_blocks": [], } blocks.append(target_block) target_block["needs_subdivision"] = subdivision.centroid_count > 1 target_block["sub_blocks"] = list(subdivision.centroid_points or []) target_block["subdivision_summary"] = { "chunk_size_sqm": subdivision.chunk_size_sqm, "grid_point_count": subdivision.grid_point_count, "centroid_count": subdivision.centroid_count, "optimal_k": (subdivision.metadata or {}).get("optimal_k", subdivision.centroid_count), } layout["blocks"] = blocks layout["algorithm_status"] = "completed" location.block_layout = layout location.save(update_fields=["block_layout", "updated_at"]) def generate_grid_points( polygon: list[GeoPoint], projected_polygon: list[tuple[float, float]], chunk_size_sqm: int, ) -> tuple[list[dict], list[tuple[float, float]]]: step_m = math.sqrt(chunk_size_sqm) min_x, max_x, min_y, max_y = bounds(projected_polygon) grid_points: list[dict] = [] grid_vectors: list[tuple[float, float]] = [] y = min_y + (step_m / 2.0) point_index = 0 while y <= max_y: x = min_x + (step_m / 2.0) while x <= max_x: if point_in_polygon((x, y), projected_polygon): lat, lon = unproject_point(x, y, polygon) point_index += 1 grid_vectors.append((x, y)) grid_points.append( { "point_code": f"pt-{point_index}", "lat": quantize_coordinate(lat), "lon": quantize_coordinate(lon), } ) x += step_m y += step_m return grid_points, grid_vectors def extract_polygon(boundary: dict | list) -> list[GeoPoint]: if isinstance(boundary, dict): if boundary.get("type") == "Polygon": coordinates = boundary.get("coordinates") or [] if coordinates and isinstance(coordinates[0], list): points = coordinates[0] else: points = [] else: points = boundary.get("corners") or [] elif isinstance(boundary, list): points = boundary else: points = [] polygon: list[GeoPoint] = [] for point in points: lat = lon = None if isinstance(point, dict): lat = point.get("lat", point.get("latitude")) lon = point.get("lon", point.get("longitude")) elif isinstance(point, (list, tuple)) and len(point) >= 2: lon, lat = point[0], point[1] if lat is None or lon is None: continue polygon.append(GeoPoint(lat=float(lat), lon=float(lon))) if len(polygon) > 1 and polygon[0] == polygon[-1]: polygon = polygon[:-1] return polygon def project_polygon_to_local_meters(polygon: list[GeoPoint]) -> list[tuple[float, float]]: origin = polygon[0] lat0 = math.radians(origin.lat) lon0 = math.radians(origin.lon) cos_lat0 = math.cos(lat0) projected = [] for point in polygon: lat = math.radians(point.lat) lon = math.radians(point.lon) x = (lon - lon0) * cos_lat0 * EARTH_RADIUS_M y = (lat - lat0) * EARTH_RADIUS_M projected.append((x, y)) return projected def unproject_point(x: float, y: float, polygon: list[GeoPoint]) -> tuple[float, float]: origin = polygon[0] lat0 = math.radians(origin.lat) lon0 = math.radians(origin.lon) cos_lat0 = math.cos(lat0) lat = math.degrees((y / EARTH_RADIUS_M) + lat0) lon = math.degrees((x / (EARTH_RADIUS_M * cos_lat0)) + lon0) return lat, lon def polygon_area(points: list[tuple[float, float]]) -> float: area = 0.0 closed = points + [points[0]] for index in range(len(points)): x1, y1 = closed[index] x2, y2 = closed[index + 1] area += (x1 * y2) - (x2 * y1) return area / 2.0 def bounds(points: list[tuple[float, float]]) -> tuple[float, float, float, float]: xs = [point[0] for point in points] ys = [point[1] for point in points] return min(xs), max(xs), min(ys), max(ys) def point_in_polygon(point: tuple[float, float], polygon: list[tuple[float, float]]) -> bool: x, y = point inside = False j = len(polygon) - 1 for i in range(len(polygon)): xi, yi = polygon[i] xj, yj = polygon[j] intersects = ((yi > y) != (yj > y)) and ( x < ((xj - xi) * (y - yi) / ((yj - yi) or 1e-12)) + xi ) if intersects: inside = not inside j = i return inside def quantize_coordinate(value: float) -> float: return float(Decimal(str(value)).quantize(COORD_PRECISION, rounding=ROUND_HALF_UP))