UPDATE
This commit is contained in:
@@ -0,0 +1,401 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from decimal import Decimal, ROUND_HALF_UP
|
||||
from io import BytesIO
|
||||
import math
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.files.base import ContentFile
|
||||
|
||||
|
||||
EARTH_RADIUS_M = 6371008.8
|
||||
COORD_PRECISION = Decimal("0.000001")
|
||||
MAX_K = 10
|
||||
RANDOM_STATE = 42
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GeoPoint:
|
||||
lat: float
|
||||
lon: float
|
||||
|
||||
|
||||
def create_or_get_block_subdivision(
|
||||
location,
|
||||
block_code: str,
|
||||
boundary: dict | list,
|
||||
*,
|
||||
chunk_size_sqm: int | None = None,
|
||||
):
|
||||
"""
|
||||
اگر subdivision این بلوک قبلاً ساخته شده باشد همان را برمیگرداند؛
|
||||
در غیر این صورت الگوریتم grid + KMeans را اجرا و ذخیره میکند.
|
||||
"""
|
||||
from .models import BlockSubdivision
|
||||
|
||||
existing = BlockSubdivision.objects.filter(
|
||||
soil_location=location,
|
||||
block_code=block_code,
|
||||
).first()
|
||||
if existing is not None:
|
||||
return existing, False
|
||||
|
||||
payload = build_block_subdivision_payload(
|
||||
boundary=boundary,
|
||||
block_code=block_code,
|
||||
chunk_size_sqm=chunk_size_sqm,
|
||||
)
|
||||
subdivision = BlockSubdivision.objects.create(
|
||||
soil_location=location,
|
||||
block_code=block_code,
|
||||
source_boundary=payload["source_boundary"],
|
||||
chunk_size_sqm=payload["chunk_size_sqm"],
|
||||
grid_points=payload["grid_points"],
|
||||
centroid_points=payload["centroid_points"],
|
||||
grid_point_count=payload["grid_point_count"],
|
||||
centroid_count=payload["centroid_count"],
|
||||
status="created",
|
||||
metadata=payload["metadata"],
|
||||
)
|
||||
plot_content = render_elbow_plot(
|
||||
inertia_curve=payload["metadata"].get("inertia_curve", []),
|
||||
optimal_k=payload["metadata"].get("optimal_k", 0),
|
||||
block_code=block_code,
|
||||
)
|
||||
if plot_content is not None:
|
||||
subdivision.elbow_plot.save(
|
||||
f"{location.pk}_{block_code}_elbow.png",
|
||||
plot_content,
|
||||
save=False,
|
||||
)
|
||||
subdivision.save(update_fields=["elbow_plot", "updated_at"])
|
||||
sync_block_layout_with_subdivision(location, subdivision)
|
||||
return subdivision, True
|
||||
|
||||
|
||||
def build_block_subdivision_payload(
|
||||
boundary: dict | list,
|
||||
block_code: str = "block-1",
|
||||
chunk_size_sqm: int | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
مرز یک بلوک را گرفته و ابتدا شبکه نقاط را میسازد، سپس با KMeans
|
||||
تعداد بهینه خوشهها را از elbow point پیدا میکند و centroidها را برمیگرداند.
|
||||
"""
|
||||
chunk_size = int(chunk_size_sqm or getattr(settings, "SUBDIVISION_CHUNK_SQM", 900) or 900)
|
||||
if chunk_size <= 0:
|
||||
raise ValueError("chunk_size_sqm باید بزرگتر از صفر باشد.")
|
||||
|
||||
polygon = extract_polygon(boundary)
|
||||
if len(polygon) < 3:
|
||||
raise ValueError("مرز بلوک باید حداقل سه نقطه معتبر داشته باشد.")
|
||||
|
||||
projected_polygon = project_polygon_to_local_meters(polygon)
|
||||
area_sqm = abs(polygon_area(projected_polygon))
|
||||
grid_points, grid_vectors = generate_grid_points(
|
||||
polygon=polygon,
|
||||
projected_polygon=projected_polygon,
|
||||
chunk_size_sqm=chunk_size,
|
||||
)
|
||||
clustering_result = cluster_grid_points(grid_vectors, polygon)
|
||||
|
||||
return {
|
||||
"block_code": block_code,
|
||||
"source_boundary": boundary if isinstance(boundary, dict) else {"points": boundary},
|
||||
"chunk_size_sqm": chunk_size,
|
||||
"grid_points": grid_points,
|
||||
"centroid_points": clustering_result["centroid_points"],
|
||||
"grid_point_count": len(grid_points),
|
||||
"centroid_count": len(clustering_result["centroid_points"]),
|
||||
"metadata": {
|
||||
"estimated_area_sqm": round(area_sqm, 2),
|
||||
"optimal_k": clustering_result["optimal_k"],
|
||||
"inertia_curve": clustering_result["inertia_curve"],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def cluster_grid_points(grid_vectors: list[tuple[float, float]], polygon: list[GeoPoint]) -> dict:
|
||||
if not grid_vectors:
|
||||
return {
|
||||
"optimal_k": 0,
|
||||
"inertia_curve": [],
|
||||
"centroid_points": [],
|
||||
}
|
||||
|
||||
if len(grid_vectors) == 1:
|
||||
lat, lon = unproject_point(grid_vectors[0][0], grid_vectors[0][1], polygon)
|
||||
return {
|
||||
"optimal_k": 1,
|
||||
"inertia_curve": [{"k": 1, "sse": 0.0}],
|
||||
"centroid_points": [
|
||||
{
|
||||
"sub_block_code": "sub-block-1",
|
||||
"centroid_lat": quantize_coordinate(lat),
|
||||
"centroid_lon": quantize_coordinate(lon),
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
try:
|
||||
from sklearn.cluster import KMeans
|
||||
except ImportError as exc: # pragma: no cover - runtime dependency guard
|
||||
raise ImportError("scikit-learn برای اجرای subdivision لازم است.") from exc
|
||||
|
||||
max_k = min(MAX_K, len(grid_vectors))
|
||||
inertia_curve = []
|
||||
trained_models = {}
|
||||
for k in range(1, max_k + 1):
|
||||
model = KMeans(
|
||||
n_clusters=k,
|
||||
n_init=10,
|
||||
random_state=RANDOM_STATE,
|
||||
)
|
||||
model.fit(grid_vectors)
|
||||
trained_models[k] = model
|
||||
inertia_curve.append({"k": k, "sse": round(float(model.inertia_), 6)})
|
||||
|
||||
optimal_k = detect_elbow_point(inertia_curve)
|
||||
final_model = trained_models[optimal_k]
|
||||
centroid_points = []
|
||||
for index, center in enumerate(final_model.cluster_centers_, start=1):
|
||||
lat, lon = unproject_point(center[0], center[1], polygon)
|
||||
centroid_points.append(
|
||||
{
|
||||
"sub_block_code": f"sub-block-{index}",
|
||||
"centroid_lat": quantize_coordinate(lat),
|
||||
"centroid_lon": quantize_coordinate(lon),
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
"optimal_k": optimal_k,
|
||||
"inertia_curve": inertia_curve,
|
||||
"centroid_points": centroid_points,
|
||||
}
|
||||
|
||||
|
||||
def detect_elbow_point(inertia_curve: list[dict]) -> int:
|
||||
if not inertia_curve:
|
||||
return 0
|
||||
if len(inertia_curve) <= 2:
|
||||
return inertia_curve[-1]["k"] if len(inertia_curve) == 2 else inertia_curve[0]["k"]
|
||||
|
||||
sses = [item["sse"] for item in inertia_curve]
|
||||
ks = [item["k"] for item in inertia_curve]
|
||||
slopes = [sses[index] - sses[index + 1] for index in range(len(sses) - 1)]
|
||||
|
||||
best_k = ks[0]
|
||||
best_change = float("-inf")
|
||||
for index in range(len(slopes) - 1):
|
||||
change = slopes[index] - slopes[index + 1]
|
||||
candidate_k = ks[index + 1]
|
||||
if change > best_change:
|
||||
best_change = change
|
||||
best_k = candidate_k
|
||||
return best_k
|
||||
|
||||
|
||||
def render_elbow_plot(
|
||||
inertia_curve: list[dict],
|
||||
optimal_k: int,
|
||||
block_code: str,
|
||||
) -> ContentFile | None:
|
||||
if not inertia_curve:
|
||||
return None
|
||||
|
||||
try:
|
||||
import matplotlib
|
||||
|
||||
matplotlib.use("Agg")
|
||||
import matplotlib.pyplot as plt
|
||||
except ImportError as exc: # pragma: no cover - runtime dependency guard
|
||||
raise ImportError("matplotlib برای ذخیره نمودار elbow لازم است.") from exc
|
||||
|
||||
ks = [item["k"] for item in inertia_curve]
|
||||
sses = [item["sse"] for item in inertia_curve]
|
||||
buffer = BytesIO()
|
||||
fig, ax = plt.subplots(figsize=(8, 5))
|
||||
try:
|
||||
ax.plot(ks, sses, marker="o", linewidth=2, color="#2f6fed")
|
||||
if optimal_k in ks:
|
||||
elbow_index = ks.index(optimal_k)
|
||||
ax.scatter(
|
||||
[ks[elbow_index]],
|
||||
[sses[elbow_index]],
|
||||
color="#d62828",
|
||||
s=90,
|
||||
zorder=3,
|
||||
label=f"Elbow K={optimal_k}",
|
||||
)
|
||||
ax.legend()
|
||||
ax.set_title(f"Elbow Plot - {block_code}")
|
||||
ax.set_xlabel("K")
|
||||
ax.set_ylabel("SSE / Inertia")
|
||||
ax.grid(True, linestyle="--", linewidth=0.5, alpha=0.6)
|
||||
fig.tight_layout()
|
||||
fig.savefig(buffer, format="png", dpi=150)
|
||||
buffer.seek(0)
|
||||
return ContentFile(buffer.getvalue())
|
||||
finally:
|
||||
buffer.close()
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
def sync_block_layout_with_subdivision(location, subdivision) -> None:
|
||||
layout = location.block_layout or {}
|
||||
blocks = list(layout.get("blocks") or [])
|
||||
target_block = None
|
||||
for block in blocks:
|
||||
if block.get("block_code") == subdivision.block_code:
|
||||
target_block = block
|
||||
break
|
||||
|
||||
if target_block is None:
|
||||
target_block = {
|
||||
"block_code": subdivision.block_code,
|
||||
"order": len(blocks) + 1,
|
||||
"source": "input",
|
||||
"needs_subdivision": None,
|
||||
"sub_blocks": [],
|
||||
}
|
||||
blocks.append(target_block)
|
||||
|
||||
target_block["needs_subdivision"] = subdivision.centroid_count > 1
|
||||
target_block["sub_blocks"] = list(subdivision.centroid_points or [])
|
||||
target_block["subdivision_summary"] = {
|
||||
"chunk_size_sqm": subdivision.chunk_size_sqm,
|
||||
"grid_point_count": subdivision.grid_point_count,
|
||||
"centroid_count": subdivision.centroid_count,
|
||||
"optimal_k": (subdivision.metadata or {}).get("optimal_k", subdivision.centroid_count),
|
||||
}
|
||||
layout["blocks"] = blocks
|
||||
layout["algorithm_status"] = "completed"
|
||||
location.block_layout = layout
|
||||
location.save(update_fields=["block_layout", "updated_at"])
|
||||
|
||||
|
||||
def generate_grid_points(
|
||||
polygon: list[GeoPoint],
|
||||
projected_polygon: list[tuple[float, float]],
|
||||
chunk_size_sqm: int,
|
||||
) -> tuple[list[dict], list[tuple[float, float]]]:
|
||||
step_m = math.sqrt(chunk_size_sqm)
|
||||
min_x, max_x, min_y, max_y = bounds(projected_polygon)
|
||||
grid_points: list[dict] = []
|
||||
grid_vectors: list[tuple[float, float]] = []
|
||||
|
||||
y = min_y + (step_m / 2.0)
|
||||
point_index = 0
|
||||
while y <= max_y:
|
||||
x = min_x + (step_m / 2.0)
|
||||
while x <= max_x:
|
||||
if point_in_polygon((x, y), projected_polygon):
|
||||
lat, lon = unproject_point(x, y, polygon)
|
||||
point_index += 1
|
||||
grid_vectors.append((x, y))
|
||||
grid_points.append(
|
||||
{
|
||||
"point_code": f"pt-{point_index}",
|
||||
"lat": quantize_coordinate(lat),
|
||||
"lon": quantize_coordinate(lon),
|
||||
}
|
||||
)
|
||||
x += step_m
|
||||
y += step_m
|
||||
return grid_points, grid_vectors
|
||||
|
||||
|
||||
def extract_polygon(boundary: dict | list) -> list[GeoPoint]:
|
||||
if isinstance(boundary, dict):
|
||||
if boundary.get("type") == "Polygon":
|
||||
coordinates = boundary.get("coordinates") or []
|
||||
if coordinates and isinstance(coordinates[0], list):
|
||||
points = coordinates[0]
|
||||
else:
|
||||
points = []
|
||||
else:
|
||||
points = boundary.get("corners") or []
|
||||
elif isinstance(boundary, list):
|
||||
points = boundary
|
||||
else:
|
||||
points = []
|
||||
|
||||
polygon: list[GeoPoint] = []
|
||||
for point in points:
|
||||
lat = lon = None
|
||||
if isinstance(point, dict):
|
||||
lat = point.get("lat", point.get("latitude"))
|
||||
lon = point.get("lon", point.get("longitude"))
|
||||
elif isinstance(point, (list, tuple)) and len(point) >= 2:
|
||||
lon, lat = point[0], point[1]
|
||||
|
||||
if lat is None or lon is None:
|
||||
continue
|
||||
polygon.append(GeoPoint(lat=float(lat), lon=float(lon)))
|
||||
|
||||
if len(polygon) > 1 and polygon[0] == polygon[-1]:
|
||||
polygon = polygon[:-1]
|
||||
return polygon
|
||||
|
||||
|
||||
def project_polygon_to_local_meters(polygon: list[GeoPoint]) -> list[tuple[float, float]]:
|
||||
origin = polygon[0]
|
||||
lat0 = math.radians(origin.lat)
|
||||
lon0 = math.radians(origin.lon)
|
||||
cos_lat0 = math.cos(lat0)
|
||||
projected = []
|
||||
for point in polygon:
|
||||
lat = math.radians(point.lat)
|
||||
lon = math.radians(point.lon)
|
||||
x = (lon - lon0) * cos_lat0 * EARTH_RADIUS_M
|
||||
y = (lat - lat0) * EARTH_RADIUS_M
|
||||
projected.append((x, y))
|
||||
return projected
|
||||
|
||||
|
||||
def unproject_point(x: float, y: float, polygon: list[GeoPoint]) -> tuple[float, float]:
|
||||
origin = polygon[0]
|
||||
lat0 = math.radians(origin.lat)
|
||||
lon0 = math.radians(origin.lon)
|
||||
cos_lat0 = math.cos(lat0)
|
||||
lat = math.degrees((y / EARTH_RADIUS_M) + lat0)
|
||||
lon = math.degrees((x / (EARTH_RADIUS_M * cos_lat0)) + lon0)
|
||||
return lat, lon
|
||||
|
||||
|
||||
def polygon_area(points: list[tuple[float, float]]) -> float:
|
||||
area = 0.0
|
||||
closed = points + [points[0]]
|
||||
for index in range(len(points)):
|
||||
x1, y1 = closed[index]
|
||||
x2, y2 = closed[index + 1]
|
||||
area += (x1 * y2) - (x2 * y1)
|
||||
return area / 2.0
|
||||
|
||||
|
||||
def bounds(points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
|
||||
xs = [point[0] for point in points]
|
||||
ys = [point[1] for point in points]
|
||||
return min(xs), max(xs), min(ys), max(ys)
|
||||
|
||||
|
||||
def point_in_polygon(point: tuple[float, float], polygon: list[tuple[float, float]]) -> bool:
|
||||
x, y = point
|
||||
inside = False
|
||||
j = len(polygon) - 1
|
||||
for i in range(len(polygon)):
|
||||
xi, yi = polygon[i]
|
||||
xj, yj = polygon[j]
|
||||
intersects = ((yi > y) != (yj > y)) and (
|
||||
x < ((xj - xi) * (y - yi) / ((yj - yi) or 1e-12)) + xi
|
||||
)
|
||||
if intersects:
|
||||
inside = not inside
|
||||
j = i
|
||||
return inside
|
||||
|
||||
|
||||
def quantize_coordinate(value: float) -> float:
|
||||
return float(Decimal(str(value)).quantize(COORD_PRECISION, rounding=ROUND_HALF_UP))
|
||||
Reference in New Issue
Block a user