402 lines
13 KiB
Python
402 lines
13 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from decimal import Decimal, ROUND_HALF_UP
|
||
|
|
from io import BytesIO
|
||
|
|
import math
|
||
|
|
|
||
|
|
from django.conf import settings
|
||
|
|
from django.core.files.base import ContentFile
|
||
|
|
|
||
|
|
|
||
|
|
EARTH_RADIUS_M = 6371008.8
|
||
|
|
COORD_PRECISION = Decimal("0.000001")
|
||
|
|
MAX_K = 10
|
||
|
|
RANDOM_STATE = 42
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True)
|
||
|
|
class GeoPoint:
|
||
|
|
lat: float
|
||
|
|
lon: float
|
||
|
|
|
||
|
|
|
||
|
|
def create_or_get_block_subdivision(
|
||
|
|
location,
|
||
|
|
block_code: str,
|
||
|
|
boundary: dict | list,
|
||
|
|
*,
|
||
|
|
chunk_size_sqm: int | None = None,
|
||
|
|
):
|
||
|
|
"""
|
||
|
|
اگر subdivision این بلوک قبلاً ساخته شده باشد همان را برمیگرداند؛
|
||
|
|
در غیر این صورت الگوریتم grid + KMeans را اجرا و ذخیره میکند.
|
||
|
|
"""
|
||
|
|
from .models import BlockSubdivision
|
||
|
|
|
||
|
|
existing = BlockSubdivision.objects.filter(
|
||
|
|
soil_location=location,
|
||
|
|
block_code=block_code,
|
||
|
|
).first()
|
||
|
|
if existing is not None:
|
||
|
|
return existing, False
|
||
|
|
|
||
|
|
payload = build_block_subdivision_payload(
|
||
|
|
boundary=boundary,
|
||
|
|
block_code=block_code,
|
||
|
|
chunk_size_sqm=chunk_size_sqm,
|
||
|
|
)
|
||
|
|
subdivision = BlockSubdivision.objects.create(
|
||
|
|
soil_location=location,
|
||
|
|
block_code=block_code,
|
||
|
|
source_boundary=payload["source_boundary"],
|
||
|
|
chunk_size_sqm=payload["chunk_size_sqm"],
|
||
|
|
grid_points=payload["grid_points"],
|
||
|
|
centroid_points=payload["centroid_points"],
|
||
|
|
grid_point_count=payload["grid_point_count"],
|
||
|
|
centroid_count=payload["centroid_count"],
|
||
|
|
status="created",
|
||
|
|
metadata=payload["metadata"],
|
||
|
|
)
|
||
|
|
plot_content = render_elbow_plot(
|
||
|
|
inertia_curve=payload["metadata"].get("inertia_curve", []),
|
||
|
|
optimal_k=payload["metadata"].get("optimal_k", 0),
|
||
|
|
block_code=block_code,
|
||
|
|
)
|
||
|
|
if plot_content is not None:
|
||
|
|
subdivision.elbow_plot.save(
|
||
|
|
f"{location.pk}_{block_code}_elbow.png",
|
||
|
|
plot_content,
|
||
|
|
save=False,
|
||
|
|
)
|
||
|
|
subdivision.save(update_fields=["elbow_plot", "updated_at"])
|
||
|
|
sync_block_layout_with_subdivision(location, subdivision)
|
||
|
|
return subdivision, True
|
||
|
|
|
||
|
|
|
||
|
|
def build_block_subdivision_payload(
|
||
|
|
boundary: dict | list,
|
||
|
|
block_code: str = "block-1",
|
||
|
|
chunk_size_sqm: int | None = None,
|
||
|
|
) -> dict:
|
||
|
|
"""
|
||
|
|
مرز یک بلوک را گرفته و ابتدا شبکه نقاط را میسازد، سپس با KMeans
|
||
|
|
تعداد بهینه خوشهها را از elbow point پیدا میکند و centroidها را برمیگرداند.
|
||
|
|
"""
|
||
|
|
chunk_size = int(chunk_size_sqm or getattr(settings, "SUBDIVISION_CHUNK_SQM", 100) or 100)
|
||
|
|
if chunk_size <= 0:
|
||
|
|
raise ValueError("chunk_size_sqm باید بزرگتر از صفر باشد.")
|
||
|
|
|
||
|
|
polygon = extract_polygon(boundary)
|
||
|
|
if len(polygon) < 3:
|
||
|
|
raise ValueError("مرز بلوک باید حداقل سه نقطه معتبر داشته باشد.")
|
||
|
|
|
||
|
|
projected_polygon = project_polygon_to_local_meters(polygon)
|
||
|
|
area_sqm = abs(polygon_area(projected_polygon))
|
||
|
|
grid_points, grid_vectors = generate_grid_points(
|
||
|
|
polygon=polygon,
|
||
|
|
projected_polygon=projected_polygon,
|
||
|
|
chunk_size_sqm=chunk_size,
|
||
|
|
)
|
||
|
|
clustering_result = cluster_grid_points(grid_vectors, polygon)
|
||
|
|
|
||
|
|
return {
|
||
|
|
"block_code": block_code,
|
||
|
|
"source_boundary": boundary if isinstance(boundary, dict) else {"points": boundary},
|
||
|
|
"chunk_size_sqm": chunk_size,
|
||
|
|
"grid_points": grid_points,
|
||
|
|
"centroid_points": clustering_result["centroid_points"],
|
||
|
|
"grid_point_count": len(grid_points),
|
||
|
|
"centroid_count": len(clustering_result["centroid_points"]),
|
||
|
|
"metadata": {
|
||
|
|
"estimated_area_sqm": round(area_sqm, 2),
|
||
|
|
"optimal_k": clustering_result["optimal_k"],
|
||
|
|
"inertia_curve": clustering_result["inertia_curve"],
|
||
|
|
},
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def cluster_grid_points(grid_vectors: list[tuple[float, float]], polygon: list[GeoPoint]) -> dict:
|
||
|
|
if not grid_vectors:
|
||
|
|
return {
|
||
|
|
"optimal_k": 0,
|
||
|
|
"inertia_curve": [],
|
||
|
|
"centroid_points": [],
|
||
|
|
}
|
||
|
|
|
||
|
|
if len(grid_vectors) == 1:
|
||
|
|
lat, lon = unproject_point(grid_vectors[0][0], grid_vectors[0][1], polygon)
|
||
|
|
return {
|
||
|
|
"optimal_k": 1,
|
||
|
|
"inertia_curve": [{"k": 1, "sse": 0.0}],
|
||
|
|
"centroid_points": [
|
||
|
|
{
|
||
|
|
"sub_block_code": "sub-block-1",
|
||
|
|
"centroid_lat": quantize_coordinate(lat),
|
||
|
|
"centroid_lon": quantize_coordinate(lon),
|
||
|
|
}
|
||
|
|
],
|
||
|
|
}
|
||
|
|
|
||
|
|
try:
|
||
|
|
from sklearn.cluster import KMeans
|
||
|
|
except ImportError as exc: # pragma: no cover - runtime dependency guard
|
||
|
|
raise ImportError("scikit-learn برای اجرای subdivision لازم است.") from exc
|
||
|
|
|
||
|
|
max_k = min(MAX_K, len(grid_vectors))
|
||
|
|
inertia_curve = []
|
||
|
|
trained_models = {}
|
||
|
|
for k in range(1, max_k + 1):
|
||
|
|
model = KMeans(
|
||
|
|
n_clusters=k,
|
||
|
|
n_init=10,
|
||
|
|
random_state=RANDOM_STATE,
|
||
|
|
)
|
||
|
|
model.fit(grid_vectors)
|
||
|
|
trained_models[k] = model
|
||
|
|
inertia_curve.append({"k": k, "sse": round(float(model.inertia_), 6)})
|
||
|
|
|
||
|
|
optimal_k = detect_elbow_point(inertia_curve)
|
||
|
|
final_model = trained_models[optimal_k]
|
||
|
|
centroid_points = []
|
||
|
|
for index, center in enumerate(final_model.cluster_centers_, start=1):
|
||
|
|
lat, lon = unproject_point(center[0], center[1], polygon)
|
||
|
|
centroid_points.append(
|
||
|
|
{
|
||
|
|
"sub_block_code": f"sub-block-{index}",
|
||
|
|
"centroid_lat": quantize_coordinate(lat),
|
||
|
|
"centroid_lon": quantize_coordinate(lon),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
return {
|
||
|
|
"optimal_k": optimal_k,
|
||
|
|
"inertia_curve": inertia_curve,
|
||
|
|
"centroid_points": centroid_points,
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def detect_elbow_point(inertia_curve: list[dict]) -> int:
|
||
|
|
if not inertia_curve:
|
||
|
|
return 0
|
||
|
|
if len(inertia_curve) <= 2:
|
||
|
|
return inertia_curve[-1]["k"] if len(inertia_curve) == 2 else inertia_curve[0]["k"]
|
||
|
|
|
||
|
|
sses = [item["sse"] for item in inertia_curve]
|
||
|
|
ks = [item["k"] for item in inertia_curve]
|
||
|
|
slopes = [sses[index] - sses[index + 1] for index in range(len(sses) - 1)]
|
||
|
|
|
||
|
|
best_k = ks[0]
|
||
|
|
best_change = float("-inf")
|
||
|
|
for index in range(len(slopes) - 1):
|
||
|
|
change = slopes[index] - slopes[index + 1]
|
||
|
|
candidate_k = ks[index + 1]
|
||
|
|
if change > best_change:
|
||
|
|
best_change = change
|
||
|
|
best_k = candidate_k
|
||
|
|
return best_k
|
||
|
|
|
||
|
|
|
||
|
|
def render_elbow_plot(
|
||
|
|
inertia_curve: list[dict],
|
||
|
|
optimal_k: int,
|
||
|
|
block_code: str,
|
||
|
|
) -> ContentFile | None:
|
||
|
|
if not inertia_curve:
|
||
|
|
return None
|
||
|
|
|
||
|
|
try:
|
||
|
|
import matplotlib
|
||
|
|
|
||
|
|
matplotlib.use("Agg")
|
||
|
|
import matplotlib.pyplot as plt
|
||
|
|
except ImportError as exc: # pragma: no cover - runtime dependency guard
|
||
|
|
raise ImportError("matplotlib برای ذخیره نمودار elbow لازم است.") from exc
|
||
|
|
|
||
|
|
ks = [item["k"] for item in inertia_curve]
|
||
|
|
sses = [item["sse"] for item in inertia_curve]
|
||
|
|
buffer = BytesIO()
|
||
|
|
fig, ax = plt.subplots(figsize=(8, 5))
|
||
|
|
try:
|
||
|
|
ax.plot(ks, sses, marker="o", linewidth=2, color="#2f6fed")
|
||
|
|
if optimal_k in ks:
|
||
|
|
elbow_index = ks.index(optimal_k)
|
||
|
|
ax.scatter(
|
||
|
|
[ks[elbow_index]],
|
||
|
|
[sses[elbow_index]],
|
||
|
|
color="#d62828",
|
||
|
|
s=90,
|
||
|
|
zorder=3,
|
||
|
|
label=f"Elbow K={optimal_k}",
|
||
|
|
)
|
||
|
|
ax.legend()
|
||
|
|
ax.set_title(f"Elbow Plot - {block_code}")
|
||
|
|
ax.set_xlabel("K")
|
||
|
|
ax.set_ylabel("SSE / Inertia")
|
||
|
|
ax.grid(True, linestyle="--", linewidth=0.5, alpha=0.6)
|
||
|
|
fig.tight_layout()
|
||
|
|
fig.savefig(buffer, format="png", dpi=150)
|
||
|
|
buffer.seek(0)
|
||
|
|
return ContentFile(buffer.getvalue())
|
||
|
|
finally:
|
||
|
|
buffer.close()
|
||
|
|
plt.close(fig)
|
||
|
|
|
||
|
|
|
||
|
|
def sync_block_layout_with_subdivision(location, subdivision) -> None:
|
||
|
|
layout = location.block_layout or {}
|
||
|
|
blocks = list(layout.get("blocks") or [])
|
||
|
|
target_block = None
|
||
|
|
for block in blocks:
|
||
|
|
if block.get("block_code") == subdivision.block_code:
|
||
|
|
target_block = block
|
||
|
|
break
|
||
|
|
|
||
|
|
if target_block is None:
|
||
|
|
target_block = {
|
||
|
|
"block_code": subdivision.block_code,
|
||
|
|
"order": len(blocks) + 1,
|
||
|
|
"source": "input",
|
||
|
|
"needs_subdivision": None,
|
||
|
|
"sub_blocks": [],
|
||
|
|
}
|
||
|
|
blocks.append(target_block)
|
||
|
|
|
||
|
|
target_block["needs_subdivision"] = subdivision.centroid_count > 1
|
||
|
|
target_block["sub_blocks"] = list(subdivision.centroid_points or [])
|
||
|
|
target_block["subdivision_summary"] = {
|
||
|
|
"chunk_size_sqm": subdivision.chunk_size_sqm,
|
||
|
|
"grid_point_count": subdivision.grid_point_count,
|
||
|
|
"centroid_count": subdivision.centroid_count,
|
||
|
|
"optimal_k": (subdivision.metadata or {}).get("optimal_k", subdivision.centroid_count),
|
||
|
|
}
|
||
|
|
layout["blocks"] = blocks
|
||
|
|
layout["algorithm_status"] = "completed"
|
||
|
|
location.block_layout = layout
|
||
|
|
location.save(update_fields=["block_layout", "updated_at"])
|
||
|
|
|
||
|
|
|
||
|
|
def generate_grid_points(
|
||
|
|
polygon: list[GeoPoint],
|
||
|
|
projected_polygon: list[tuple[float, float]],
|
||
|
|
chunk_size_sqm: int,
|
||
|
|
) -> tuple[list[dict], list[tuple[float, float]]]:
|
||
|
|
step_m = math.sqrt(chunk_size_sqm)
|
||
|
|
min_x, max_x, min_y, max_y = bounds(projected_polygon)
|
||
|
|
grid_points: list[dict] = []
|
||
|
|
grid_vectors: list[tuple[float, float]] = []
|
||
|
|
|
||
|
|
y = min_y + (step_m / 2.0)
|
||
|
|
point_index = 0
|
||
|
|
while y <= max_y:
|
||
|
|
x = min_x + (step_m / 2.0)
|
||
|
|
while x <= max_x:
|
||
|
|
if point_in_polygon((x, y), projected_polygon):
|
||
|
|
lat, lon = unproject_point(x, y, polygon)
|
||
|
|
point_index += 1
|
||
|
|
grid_vectors.append((x, y))
|
||
|
|
grid_points.append(
|
||
|
|
{
|
||
|
|
"point_code": f"pt-{point_index}",
|
||
|
|
"lat": quantize_coordinate(lat),
|
||
|
|
"lon": quantize_coordinate(lon),
|
||
|
|
}
|
||
|
|
)
|
||
|
|
x += step_m
|
||
|
|
y += step_m
|
||
|
|
return grid_points, grid_vectors
|
||
|
|
|
||
|
|
|
||
|
|
def extract_polygon(boundary: dict | list) -> list[GeoPoint]:
|
||
|
|
if isinstance(boundary, dict):
|
||
|
|
if boundary.get("type") == "Polygon":
|
||
|
|
coordinates = boundary.get("coordinates") or []
|
||
|
|
if coordinates and isinstance(coordinates[0], list):
|
||
|
|
points = coordinates[0]
|
||
|
|
else:
|
||
|
|
points = []
|
||
|
|
else:
|
||
|
|
points = boundary.get("corners") or []
|
||
|
|
elif isinstance(boundary, list):
|
||
|
|
points = boundary
|
||
|
|
else:
|
||
|
|
points = []
|
||
|
|
|
||
|
|
polygon: list[GeoPoint] = []
|
||
|
|
for point in points:
|
||
|
|
lat = lon = None
|
||
|
|
if isinstance(point, dict):
|
||
|
|
lat = point.get("lat", point.get("latitude"))
|
||
|
|
lon = point.get("lon", point.get("longitude"))
|
||
|
|
elif isinstance(point, (list, tuple)) and len(point) >= 2:
|
||
|
|
lon, lat = point[0], point[1]
|
||
|
|
|
||
|
|
if lat is None or lon is None:
|
||
|
|
continue
|
||
|
|
polygon.append(GeoPoint(lat=float(lat), lon=float(lon)))
|
||
|
|
|
||
|
|
if len(polygon) > 1 and polygon[0] == polygon[-1]:
|
||
|
|
polygon = polygon[:-1]
|
||
|
|
return polygon
|
||
|
|
|
||
|
|
|
||
|
|
def project_polygon_to_local_meters(polygon: list[GeoPoint]) -> list[tuple[float, float]]:
|
||
|
|
origin = polygon[0]
|
||
|
|
lat0 = math.radians(origin.lat)
|
||
|
|
lon0 = math.radians(origin.lon)
|
||
|
|
cos_lat0 = math.cos(lat0)
|
||
|
|
projected = []
|
||
|
|
for point in polygon:
|
||
|
|
lat = math.radians(point.lat)
|
||
|
|
lon = math.radians(point.lon)
|
||
|
|
x = (lon - lon0) * cos_lat0 * EARTH_RADIUS_M
|
||
|
|
y = (lat - lat0) * EARTH_RADIUS_M
|
||
|
|
projected.append((x, y))
|
||
|
|
return projected
|
||
|
|
|
||
|
|
|
||
|
|
def unproject_point(x: float, y: float, polygon: list[GeoPoint]) -> tuple[float, float]:
|
||
|
|
origin = polygon[0]
|
||
|
|
lat0 = math.radians(origin.lat)
|
||
|
|
lon0 = math.radians(origin.lon)
|
||
|
|
cos_lat0 = math.cos(lat0)
|
||
|
|
lat = math.degrees((y / EARTH_RADIUS_M) + lat0)
|
||
|
|
lon = math.degrees((x / (EARTH_RADIUS_M * cos_lat0)) + lon0)
|
||
|
|
return lat, lon
|
||
|
|
|
||
|
|
|
||
|
|
def polygon_area(points: list[tuple[float, float]]) -> float:
|
||
|
|
area = 0.0
|
||
|
|
closed = points + [points[0]]
|
||
|
|
for index in range(len(points)):
|
||
|
|
x1, y1 = closed[index]
|
||
|
|
x2, y2 = closed[index + 1]
|
||
|
|
area += (x1 * y2) - (x2 * y1)
|
||
|
|
return area / 2.0
|
||
|
|
|
||
|
|
|
||
|
|
def bounds(points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
|
||
|
|
xs = [point[0] for point in points]
|
||
|
|
ys = [point[1] for point in points]
|
||
|
|
return min(xs), max(xs), min(ys), max(ys)
|
||
|
|
|
||
|
|
|
||
|
|
def point_in_polygon(point: tuple[float, float], polygon: list[tuple[float, float]]) -> bool:
|
||
|
|
x, y = point
|
||
|
|
inside = False
|
||
|
|
j = len(polygon) - 1
|
||
|
|
for i in range(len(polygon)):
|
||
|
|
xi, yi = polygon[i]
|
||
|
|
xj, yj = polygon[j]
|
||
|
|
intersects = ((yi > y) != (yj > y)) and (
|
||
|
|
x < ((xj - xi) * (y - yi) / ((yj - yi) or 1e-12)) + xi
|
||
|
|
)
|
||
|
|
if intersects:
|
||
|
|
inside = not inside
|
||
|
|
j = i
|
||
|
|
return inside
|
||
|
|
|
||
|
|
|
||
|
|
def quantize_coordinate(value: float) -> float:
|
||
|
|
return float(Decimal(str(value)).quantize(COORD_PRECISION, rounding=ROUND_HALF_UP))
|