Files

328 lines
10 KiB
Python
Raw Permalink Normal View History

2026-05-11 03:27:21 +03:30
from __future__ import annotations
from decimal import Decimal
import math
from django.conf import settings
from django.db import transaction
from .block_subdivision import (
GeoPoint,
bounds,
extract_polygon,
point_in_polygon,
project_polygon_to_local_meters,
quantize_coordinate,
unproject_point,
)
from .models import AnalysisGridCell, BlockSubdivision, SoilLocation
def create_or_get_analysis_grid_cells(
    location: SoilLocation,
    *,
    boundary: dict | list | None = None,
    block_code: str | None = None,
    block_subdivision: BlockSubdivision | None = None,
    chunk_size_sqm: int | None = None,
) -> dict:
    """Create the analysis grid cells for a farm or block, idempotently.

    The cell area is ``chunk_size_sqm`` when given (and truthy), otherwise the
    ``SUBDIVISION_CHUNK_SQM`` setting, otherwise 900 (a 30 m x 30 m cell).
    If ``AnalysisGridCell`` rows already exist for the same location, block
    code and chunk size, nothing is created and the existing count is
    reported instead.

    Args:
        location: Location the grid belongs to.
        boundary: Explicit boundary; takes precedence over the boundaries
            stored on ``block_subdivision`` and ``location``.
        block_code: Block identifier; falls back to
            ``block_subdivision.block_code`` and then to an empty string.
        block_subdivision: Optional subdivision linked to every created cell.
        chunk_size_sqm: Cell area in square metres.

    Returns:
        A dict with ``created_count``, ``existing_count``, ``total_count``,
        ``chunk_size_sqm``, ``block_code`` and ``created``.

    Raises:
        ValueError: If the chunk size is not positive, or the resolved
            boundary yields fewer than three polygon points.
    """
    cell_area_sqm = int(
        chunk_size_sqm or getattr(settings, "SUBDIVISION_CHUNK_SQM", 900) or 900
    )
    if cell_area_sqm <= 0:
        raise ValueError("chunk_size_sqm باید بزرگ‌تر از صفر باشد.")

    code = str(
        block_code or getattr(block_subdivision, "block_code", "") or ""
    ).strip()

    def _result(*, created_count: int, existing_count: int, created: bool) -> dict:
        # Both exit paths report the same shape; only the counts differ.
        return {
            "created_count": created_count,
            "existing_count": existing_count,
            "total_count": created_count + existing_count,
            "chunk_size_sqm": cell_area_sqm,
            "block_code": code,
            "created": created,
        }

    # The boundary is validated before the existence check, so an invalid
    # boundary raises even when cells are already stored.
    polygon = extract_polygon(
        _resolve_boundary(
            location=location,
            boundary=boundary,
            block_subdivision=block_subdivision,
        )
    )
    if len(polygon) < 3:
        raise ValueError("برای ساخت analysis grid باید حداقل سه نقطه معتبر در boundary وجود داشته باشد.")

    already_stored = (
        AnalysisGridCell.objects.filter(
            soil_location=location,
            block_code=code,
            chunk_size_sqm=cell_area_sqm,
        )
        .order_by("cell_code")
        .count()
    )
    if already_stored:
        return _result(
            created_count=0,
            existing_count=already_stored,
            created=False,
        )

    payloads = build_analysis_grid_payload(
        polygon=polygon,
        location=location,
        block_code=code,
        chunk_size_sqm=cell_area_sqm,
    )

    new_cells = []
    # Persist the cells and refresh the summary metadata in one transaction.
    with transaction.atomic():
        for cell in payloads:
            stored_cell = AnalysisGridCell.objects.create(
                soil_location=location,
                block_subdivision=block_subdivision,
                block_code=code,
                cell_code=cell["cell_code"],
                chunk_size_sqm=cell_area_sqm,
                geometry=cell["geometry"],
                # Go through str() so the Decimal gets the printed value of
                # the coordinate rather than its binary float expansion.
                centroid_lat=Decimal(str(cell["centroid_lat"])),
                centroid_lon=Decimal(str(cell["centroid_lon"])),
            )
            new_cells.append(stored_cell)
        _update_grid_summary_metadata(
            location=location,
            block_code=code,
            chunk_size_sqm=cell_area_sqm,
            total_count=len(new_cells),
            block_subdivision=block_subdivision,
        )

    return _result(
        created_count=len(new_cells),
        existing_count=0,
        created=True,
    )
def _iter_axis_offsets(start: float, stop: float, step: float):
    """Yield ``(index, offset)`` pairs, advancing by *step* while ``offset < stop``."""
    index = 0
    offset = start
    while offset < stop:
        yield index, offset
        index += 1
        offset += step


def build_analysis_grid_payload(
    *,
    polygon: list[GeoPoint],
    location: SoilLocation,
    block_code: str,
    chunk_size_sqm: int,
) -> list[dict]:
    """Build the payloads of all grid cells that intersect *polygon*.

    The polygon is projected to local metres and its bounding box is covered
    with square cells whose side is ``sqrt(chunk_size_sqm)``. Rows advance
    along y from the minimum y, columns along x from the minimum x; only
    cells for which ``_cell_intersects_polygon`` is true are kept.

    Args:
        polygon: Boundary points in geographic coordinates.
        location: Location whose id is embedded in each cell code.
        block_code: Block identifier embedded in each cell code.
        chunk_size_sqm: Cell area in square metres.

    Returns:
        One payload dict per kept cell, ordered row by row.
    """
    local_polygon = project_polygon_to_local_meters(polygon)
    side_m = math.sqrt(chunk_size_sqm)
    min_x, max_x, min_y, max_y = bounds(local_polygon)

    cells: list[dict] = []
    for row, bottom in _iter_axis_offsets(min_y, max_y, side_m):
        for col, left in _iter_axis_offsets(min_x, max_x, side_m):
            right = left + side_m
            top = bottom + side_m
            square = [
                (left, bottom),
                (right, bottom),
                (right, top),
                (left, top),
            ]
            if not _cell_intersects_polygon(square, local_polygon):
                continue
            cells.append(
                _build_cell_payload(
                    location=location,
                    block_code=block_code,
                    chunk_size_sqm=chunk_size_sqm,
                    polygon=polygon,
                    cell_polygon=square,
                    row_index=row,
                    col_index=col,
                )
            )
    return cells
def _build_cell_payload(
    *,
    location: SoilLocation,
    block_code: str,
    chunk_size_sqm: int,
    polygon: list[GeoPoint],
    cell_polygon: list[tuple[float, float]],
    row_index: int,
    col_index: int,
) -> dict:
    """Build the payload of a single grid cell.

    Each cell corner is unprojected with ``unproject_point`` (using the
    original *polygon* as reference) and quantized. The centroid is the mean
    of the corner coordinates, unprojected and quantized the same way.

    Returns:
        A dict with ``cell_code``, a ``geometry`` dict of type ``"Polygon"``
        whose single ring holds ``[lon, lat]`` pairs and repeats the first
        corner at the end, plus ``centroid_lat`` and ``centroid_lon``.
    """

    def _to_lon_lat(x: float, y: float) -> list:
        lat, lon = unproject_point(x, y, polygon)
        return [quantize_coordinate(lon), quantize_coordinate(lat)]

    # Repeat the first corner so the ring is closed.
    ring = [_to_lon_lat(x, y) for x, y in (*cell_polygon, cell_polygon[0])]

    corner_count = len(cell_polygon)
    mean_x = sum(corner[0] for corner in cell_polygon) / corner_count
    mean_y = sum(corner[1] for corner in cell_polygon) / corner_count
    centroid_lat, centroid_lon = unproject_point(mean_x, mean_y, polygon)

    cell_code = build_analysis_cell_code(
        location_id=location.id,
        block_code=block_code,
        chunk_size_sqm=chunk_size_sqm,
        row_index=row_index,
        col_index=col_index,
    )
    geometry = {
        "type": "Polygon",
        "coordinates": [ring],
    }
    return {
        "cell_code": cell_code,
        "geometry": geometry,
        "centroid_lat": quantize_coordinate(centroid_lat),
        "centroid_lon": quantize_coordinate(centroid_lon),
    }
def build_analysis_cell_code(
    *,
    location_id: int | None,
    block_code: str,
    chunk_size_sqm: int,
    row_index: int,
    col_index: int,
) -> str:
    """Compose the code of a grid cell from its identifying parts.

    The code has the form
    ``loc-<id>__block-<code>__chunk-<size>__r<row>c<col>`` with row and
    column zero-padded to four digits. A ``None`` location id is written as
    ``new`` and an empty block code as ``farm``.
    """
    location_part = "new" if location_id is None else location_id
    block_part = block_code if block_code else "farm"
    segments = (
        f"loc-{location_part}",
        f"block-{block_part}",
        f"chunk-{chunk_size_sqm}",
        f"r{row_index:04d}c{col_index:04d}",
    )
    return "__".join(segments)
def _resolve_boundary(
*,
location: SoilLocation,
boundary: dict | list | None,
block_subdivision: BlockSubdivision | None,
) -> dict | list:
if boundary:
return boundary
if block_subdivision is not None and block_subdivision.source_boundary:
return block_subdivision.source_boundary
if location.farm_boundary:
return location.farm_boundary
raise ValueError("هیچ boundary معتبری برای ساخت analysis grid پیدا نشد.")
def _cell_intersects_polygon(
    cell_polygon: list[tuple[float, float]],
    polygon: list[tuple[float, float]],
) -> bool:
    """Return True when the grid cell and the polygon touch or overlap.

    Three checks run in order and stop at the first hit:
    1. a cell corner is reported inside the polygon by ``point_in_polygon``;
    2. a polygon vertex lies inside the cell's bounding rectangle;
    3. a cell edge intersects a polygon edge.
    """
    for corner in cell_polygon:
        if point_in_polygon(corner, polygon):
            return True

    if any(_point_in_rect(vertex, cell_polygon) for vertex in polygon):
        return True

    # Neither shape holds a vertex of the other, so only an edge crossing
    # can still make them intersect.
    cell_edges = _polygon_edges(cell_polygon)
    boundary_edges = _polygon_edges(polygon)
    return any(
        _segments_intersect(cell_start, cell_end, edge_start, edge_end)
        for cell_start, cell_end in cell_edges
        for edge_start, edge_end in boundary_edges
    )
def _point_in_rect(point: tuple[float, float], rect: list[tuple[float, float]]) -> bool:
xs = [vertex[0] for vertex in rect]
ys = [vertex[1] for vertex in rect]
return min(xs) <= point[0] <= max(xs) and min(ys) <= point[1] <= max(ys)
def _polygon_edges(points: list[tuple[float, float]]) -> list[tuple[tuple[float, float], tuple[float, float]]]:
closed = points + [points[0]]
return [
(closed[index], closed[index + 1])
for index in range(len(points))
]
def _segments_intersect(
    p1: tuple[float, float],
    p2: tuple[float, float],
    q1: tuple[float, float],
    q2: tuple[float, float],
) -> bool:
    """Return True when segment ``p1-p2`` and segment ``q1-q2`` share a point.

    Uses the orientation test: the segments cross when the endpoints of each
    one fall on different sides of the other. Collinear endpoints are handled
    separately by checking whether the endpoint lies within the other
    segment's extent.
    """
    q1_side = _orientation(p1, p2, q1)
    q2_side = _orientation(p1, p2, q2)
    p1_side = _orientation(q1, q2, p1)
    p2_side = _orientation(q1, q2, p2)

    # General case: each segment straddles the line through the other.
    if q1_side != q2_side and p1_side != p2_side:
        return True

    # Special cases: an endpoint is collinear with the other segment and
    # falls within its extent.
    collinear_cases = (
        (q1_side, p1, q1, p2),
        (q2_side, p1, q2, p2),
        (p1_side, q1, p1, q2),
        (p2_side, q1, p2, q2),
    )
    return any(
        side == 0 and _on_segment(seg_start, candidate, seg_end)
        for side, seg_start, candidate, seg_end in collinear_cases
    )
def _orientation(a: tuple[float, float], b: tuple[float, float], c: tuple[float, float]) -> int:
value = ((b[1] - a[1]) * (c[0] - b[0])) - ((b[0] - a[0]) * (c[1] - b[1]))
if abs(value) < 1e-9:
return 0
return 1 if value > 0 else 2
def _on_segment(a: tuple[float, float], b: tuple[float, float], c: tuple[float, float]) -> bool:
return (
min(a[0], c[0]) <= b[0] <= max(a[0], c[0])
and min(a[1], c[1]) <= b[1] <= max(a[1], c[1])
)
def _update_grid_summary_metadata(
*,
location: SoilLocation,
block_code: str,
chunk_size_sqm: int,
total_count: int,
block_subdivision: BlockSubdivision | None,
) -> None:
if block_subdivision is not None:
metadata = dict(block_subdivision.metadata or {})
metadata["analysis_grid"] = {
"chunk_size_sqm": chunk_size_sqm,
"cell_count": total_count,
}
block_subdivision.metadata = metadata
block_subdivision.save(update_fields=["metadata", "updated_at"])
layout = dict(location.block_layout or {})
blocks = list(layout.get("blocks") or [])
for block in blocks:
if block.get("block_code") == block_code:
block["analysis_grid_summary"] = {
"chunk_size_sqm": chunk_size_sqm,
"cell_count": total_count,
}
break
else:
if not block_code:
layout["analysis_grid_summary"] = {
"chunk_size_sqm": chunk_size_sqm,
"cell_count": total_count,
}
if blocks:
layout["blocks"] = blocks
location.block_layout = layout
location.save(update_fields=["block_layout", "updated_at"])