328 lines
10 KiB
Python
328 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
from decimal import Decimal
|
|
import math
|
|
|
|
from django.conf import settings
|
|
from django.db import transaction
|
|
|
|
from .block_subdivision import (
|
|
GeoPoint,
|
|
bounds,
|
|
extract_polygon,
|
|
point_in_polygon,
|
|
project_polygon_to_local_meters,
|
|
quantize_coordinate,
|
|
unproject_point,
|
|
)
|
|
from .models import AnalysisGridCell, BlockSubdivision, SoilLocation
|
|
|
|
|
|
def create_or_get_analysis_grid_cells(
    location: SoilLocation,
    *,
    boundary: dict | list | None = None,
    block_code: str | None = None,
    block_subdivision: BlockSubdivision | None = None,
    chunk_size_sqm: int | None = None,
) -> dict:
    """Build the analysis grid for a farm or block and store it idempotently.

    The cell area is ``chunk_size_sqm`` when given, otherwise
    ``settings.SUBDIVISION_CHUNK_SQM``, otherwise 900. If
    ``AnalysisGridCell`` rows already exist for the same location, block code
    and chunk size, nothing is written and only their count is reported.

    Args:
        location: Location the grid belongs to.
        boundary: Explicit boundary; takes precedence over the boundaries
            stored on ``block_subdivision`` and ``location``.
        block_code: Block identifier; falls back to
            ``block_subdivision.block_code`` and then to an empty string.
        block_subdivision: Optional subdivision linked to every created cell.
        chunk_size_sqm: Requested cell area.

    Returns:
        A dict with ``created_count``, ``existing_count``, ``total_count``,
        ``chunk_size_sqm``, ``block_code`` and ``created``.

    Raises:
        ValueError: If the resolved chunk size is not positive, no boundary
            can be resolved, or the boundary yields fewer than three points.
    """
    cell_area = int(
        chunk_size_sqm or getattr(settings, "SUBDIVISION_CHUNK_SQM", 900) or 900
    )
    if cell_area <= 0:
        raise ValueError("chunk_size_sqm باید بزرگتر از صفر باشد.")

    grid_block_code = str(
        block_code or getattr(block_subdivision, "block_code", "") or ""
    ).strip()

    def report(created_count: int, existing_count: int, created: bool) -> dict:
        # One place that shapes the result so both exits stay in sync.
        return {
            "created_count": created_count,
            "existing_count": existing_count,
            "total_count": created_count + existing_count,
            "chunk_size_sqm": cell_area,
            "block_code": grid_block_code,
            "created": created,
        }

    source_boundary = _resolve_boundary(
        location=location,
        boundary=boundary,
        block_subdivision=block_subdivision,
    )
    polygon = extract_polygon(source_boundary)
    if len(polygon) < 3:
        raise ValueError("برای ساخت analysis grid باید حداقل سه نقطه معتبر در boundary وجود داشته باشد.")

    stored_cells = AnalysisGridCell.objects.filter(
        soil_location=location,
        block_code=grid_block_code,
        chunk_size_sqm=cell_area,
    ).order_by("cell_code")
    already_stored = stored_cells.count()
    if already_stored:
        # The grid was generated before: report it instead of duplicating it.
        return report(0, already_stored, False)

    payloads = build_analysis_grid_payload(
        polygon=polygon,
        location=location,
        block_code=grid_block_code,
        chunk_size_sqm=cell_area,
    )

    # Cell rows and the summary metadata are written in a single transaction.
    with transaction.atomic():
        new_cells = [
            AnalysisGridCell.objects.create(
                soil_location=location,
                block_subdivision=block_subdivision,
                block_code=grid_block_code,
                cell_code=payload["cell_code"],
                chunk_size_sqm=cell_area,
                geometry=payload["geometry"],
                centroid_lat=Decimal(str(payload["centroid_lat"])),
                centroid_lon=Decimal(str(payload["centroid_lon"])),
            )
            for payload in payloads
        ]
        _update_grid_summary_metadata(
            location=location,
            block_code=grid_block_code,
            chunk_size_sqm=cell_area,
            total_count=len(new_cells),
            block_subdivision=block_subdivision,
        )

    return report(len(new_cells), 0, True)
|
|
|
|
|
|
def build_analysis_grid_payload(
    *,
    polygon: list[GeoPoint],
    location: SoilLocation,
    block_code: str,
    chunk_size_sqm: int,
) -> list[dict]:
    """Tile the polygon's bounding box with square cells and describe each hit.

    The polygon is projected with ``project_polygon_to_local_meters`` and its
    bounding box is walked row by row with a step of ``sqrt(chunk_size_sqm)``.
    Every square that ``_cell_intersects_polygon`` accepts is turned into a
    payload by ``_build_cell_payload``. Row and column indices count grid
    positions, so they advance for skipped squares as well.

    Args:
        polygon: Boundary points in geographic coordinates.
        location: Location whose ``id`` is embedded in the cell codes.
        block_code: Block identifier embedded in the cell codes.
        chunk_size_sqm: Area of one cell; must be greater than zero.

    Returns:
        One payload dict per intersecting cell, ordered row by row.

    Raises:
        ValueError: If ``chunk_size_sqm`` is not positive.
    """
    # A zero step would never advance the scan below (endless loop with an
    # ever-growing result list); a negative area has no real square root.
    if chunk_size_sqm <= 0:
        raise ValueError("chunk_size_sqm باید بزرگتر از صفر باشد.")

    projected_polygon = project_polygon_to_local_meters(polygon)
    step_m = math.sqrt(chunk_size_sqm)
    min_x, max_x, min_y, max_y = bounds(projected_polygon)

    payloads: list[dict] = []
    row_index = 0
    y = min_y
    while y < max_y:
        col_index = 0
        x = min_x
        while x < max_x:
            cell_polygon = [
                (x, y),
                (x + step_m, y),
                (x + step_m, y + step_m),
                (x, y + step_m),
            ]
            if _cell_intersects_polygon(cell_polygon, projected_polygon):
                payloads.append(
                    _build_cell_payload(
                        location=location,
                        block_code=block_code,
                        chunk_size_sqm=chunk_size_sqm,
                        polygon=polygon,
                        cell_polygon=cell_polygon,
                        row_index=row_index,
                        col_index=col_index,
                    )
                )
            col_index += 1
            # Derive the position from the index instead of accumulating
            # `+= step_m`: repeated float addition drifts and can add or drop
            # a sliver column at the far edge of the bounding box.
            x = min_x + col_index * step_m
        row_index += 1
        y = min_y + row_index * step_m
    return payloads
|
|
|
|
|
|
def _build_cell_payload(
    *,
    location: SoilLocation,
    block_code: str,
    chunk_size_sqm: int,
    polygon: list[GeoPoint],
    cell_polygon: list[tuple[float, float]],
    row_index: int,
    col_index: int,
) -> dict:
    """Describe one grid cell: its code, polygon geometry and centroid.

    Each corner of ``cell_polygon`` is mapped back through ``unproject_point``
    and stored as a quantized ``[lon, lat]`` pair; the ring is closed by
    repeating the first corner. The centroid is the arithmetic mean of the
    corners, unprojected and quantized the same way.

    Returns:
        A dict with ``cell_code``, ``geometry`` (a ``"Polygon"`` mapping with
        a single ring) and ``centroid_lat`` / ``centroid_lon``.
    """
    ring = []
    for corner_x, corner_y in [*cell_polygon, cell_polygon[0]]:
        lat, lon = unproject_point(corner_x, corner_y, polygon)
        ring.append([quantize_coordinate(lon), quantize_coordinate(lat)])

    corner_count = len(cell_polygon)
    mean_x = sum(corner[0] for corner in cell_polygon) / corner_count
    mean_y = sum(corner[1] for corner in cell_polygon) / corner_count
    centroid_lat, centroid_lon = unproject_point(mean_x, mean_y, polygon)

    cell_code = build_analysis_cell_code(
        location_id=location.id,
        block_code=block_code,
        chunk_size_sqm=chunk_size_sqm,
        row_index=row_index,
        col_index=col_index,
    )
    geometry = {
        "type": "Polygon",
        "coordinates": [ring],
    }
    return {
        "cell_code": cell_code,
        "geometry": geometry,
        "centroid_lat": quantize_coordinate(centroid_lat),
        "centroid_lon": quantize_coordinate(centroid_lon),
    }
|
|
|
|
|
|
def build_analysis_cell_code(
    *,
    location_id: int | None,
    block_code: str,
    chunk_size_sqm: int,
    row_index: int,
    col_index: int,
) -> str:
    """Compose the identifier of a grid cell.

    The code joins four segments with ``__``: the location id (``new`` when
    it is ``None``), the block code (``farm`` when empty), the chunk size and
    the zero-padded row/column position.

    Returns:
        A string such as ``loc-7__block-A1__chunk-900__r0003c0012``.
    """
    location_segment = "new" if location_id is None else location_id
    block_segment = block_code if block_code else "farm"
    segments = (
        f"loc-{location_segment}",
        f"block-{block_segment}",
        f"chunk-{chunk_size_sqm}",
        f"r{row_index:04d}c{col_index:04d}",
    )
    return "__".join(segments)
|
|
|
|
|
|
def _resolve_boundary(
    *,
    location: SoilLocation,
    boundary: dict | list | None,
    block_subdivision: BlockSubdivision | None,
) -> dict | list:
    """Pick the boundary to grid, preferring the most specific source.

    Candidates are tried in order and the first truthy one wins: the explicit
    ``boundary``, then ``block_subdivision.source_boundary`` (when a
    subdivision is given), then ``location.farm_boundary``.

    Raises:
        ValueError: If none of the candidates is truthy.
    """
    resolved = boundary
    if not resolved and block_subdivision is not None:
        resolved = block_subdivision.source_boundary
    if not resolved:
        resolved = location.farm_boundary
    if not resolved:
        raise ValueError("هیچ boundary معتبری برای ساخت analysis grid پیدا نشد.")
    return resolved
|
|
|
|
|
|
def _cell_intersects_polygon(
    cell_polygon: list[tuple[float, float]],
    polygon: list[tuple[float, float]],
) -> bool:
    """Tell whether a grid cell and the boundary polygon overlap or touch.

    Three checks run in order, stopping at the first hit: a cell corner lies
    in the polygon, a polygon vertex lies in the cell rectangle, or any cell
    edge intersects any polygon edge.
    """
    if any(point_in_polygon(corner, polygon) for corner in cell_polygon):
        return True

    if any(_point_in_rect(vertex, cell_polygon) for vertex in polygon):
        return True

    # Neither shape has a vertex inside the other, so only crossing edges
    # can still make them overlap.
    cell_edges = _polygon_edges(cell_polygon)
    boundary_edges = _polygon_edges(polygon)
    return any(
        _segments_intersect(cell_start, cell_end, edge_start, edge_end)
        for cell_start, cell_end in cell_edges
        for edge_start, edge_end in boundary_edges
    )
|
|
|
|
|
|
def _point_in_rect(point: tuple[float, float], rect: list[tuple[float, float]]) -> bool:
    """Return True when ``point`` lies in the bounding box of ``rect``.

    The box is spanned by the extreme x and y values of the vertices in
    ``rect``; points on its border count as inside.
    """
    left = min(vertex[0] for vertex in rect)
    right = max(vertex[0] for vertex in rect)
    bottom = min(vertex[1] for vertex in rect)
    top = max(vertex[1] for vertex in rect)
    if not left <= point[0] <= right:
        return False
    return bottom <= point[1] <= top
|
|
|
|
|
|
def _polygon_edges(points: list[tuple[float, float]]) -> list[tuple[tuple[float, float], tuple[float, float]]]:
    """List the edges of a polygon as ``(start, end)`` vertex pairs.

    The ring is closed implicitly: the last edge runs from the final vertex
    back to the first one, so there is one edge per vertex.
    """
    ring = points + [points[0]]
    # Pair every vertex with its successor along the closed ring.
    return list(zip(ring, ring[1:]))
|
|
|
|
|
|
def _segments_intersect(
    p1: tuple[float, float],
    p2: tuple[float, float],
    q1: tuple[float, float],
    q2: tuple[float, float],
) -> bool:
    """Return True when segment ``p1-p2`` and segment ``q1-q2`` share a point.

    The general case compares the orientations of each segment's endpoints
    relative to the other segment. Collinear endpoints are handled separately
    by checking whether they fall within the other segment's extent.
    """
    turn_q1 = _orientation(p1, p2, q1)
    turn_q2 = _orientation(p1, p2, q2)
    turn_p1 = _orientation(q1, q2, p1)
    turn_p2 = _orientation(q1, q2, p2)

    # Proper crossing: each segment has its endpoints on different sides of
    # the other one.
    if turn_q1 != turn_q2 and turn_p1 != turn_p2:
        return True

    # Collinear cases: (orientation, segment start, probed point, segment end).
    collinear_cases = (
        (turn_q1, p1, q1, p2),
        (turn_q2, p1, q2, p2),
        (turn_p1, q1, p1, q2),
        (turn_p2, q1, p2, q2),
    )
    return any(
        turn == 0 and _on_segment(start, probe, end)
        for turn, start, probe, end in collinear_cases
    )
|
|
|
|
|
|
def _orientation(a: tuple[float, float], b: tuple[float, float], c: tuple[float, float]) -> int:
    """Classify the turn made by the ordered points ``a``, ``b``, ``c``.

    Returns:
        ``0`` when the cross-product term is within ``1e-9`` of zero (treated
        as collinear), ``1`` when it is positive and ``2`` otherwise.
    """
    cross = (b[1] - a[1]) * (c[0] - b[0]) - (b[0] - a[0]) * (c[1] - b[1])
    if abs(cross) < 1e-9:
        return 0
    if cross > 0:
        return 1
    return 2
|
|
|
|
|
|
def _on_segment(a: tuple[float, float], b: tuple[float, float], c: tuple[float, float]) -> bool:
    """Return True when ``b`` lies in the axis-aligned box spanned by ``a`` and ``c``.

    Only the coordinate ranges are compared; bounds are inclusive.
    """
    within_x = min(a[0], c[0]) <= b[0] <= max(a[0], c[0])
    if not within_x:
        return False
    return min(a[1], c[1]) <= b[1] <= max(a[1], c[1])
|
|
|
|
|
|
def _update_grid_summary_metadata(
    *,
    location: SoilLocation,
    block_code: str,
    chunk_size_sqm: int,
    total_count: int,
    block_subdivision: BlockSubdivision | None,
) -> None:
    """Record the grid's chunk size and cell count on the related records.

    When a subdivision is given, the summary is stored under
    ``metadata["analysis_grid"]`` and the subdivision is saved. On the
    location, the summary goes into the first ``block_layout["blocks"]`` entry
    whose ``block_code`` matches; if none matches and ``block_code`` is empty,
    it is stored at the top level of the layout. The location is then saved.
    """

    def fresh_summary() -> dict:
        # A new dict per target so the stored structures never share state.
        return {
            "chunk_size_sqm": chunk_size_sqm,
            "cell_count": total_count,
        }

    if block_subdivision is not None:
        block_subdivision.metadata = {
            **(block_subdivision.metadata or {}),
            "analysis_grid": fresh_summary(),
        }
        block_subdivision.save(update_fields=["metadata", "updated_at"])

    layout = dict(location.block_layout or {})
    blocks = list(layout.get("blocks") or [])
    matching_block = next(
        (entry for entry in blocks if entry.get("block_code") == block_code),
        None,
    )
    if matching_block is not None:
        matching_block["analysis_grid_summary"] = fresh_summary()
    elif not block_code:
        # Farm-level grid (no block code): keep the summary on the layout.
        layout["analysis_grid_summary"] = fresh_summary()

    if blocks:
        layout["blocks"] = blocks
    location.block_layout = layout
    location.save(update_fields=["block_layout", "updated_at"])
|