from __future__ import annotations from decimal import Decimal import math from django.conf import settings from django.db import transaction from .block_subdivision import ( GeoPoint, bounds, extract_polygon, point_in_polygon, project_polygon_to_local_meters, quantize_coordinate, unproject_point, ) from .models import AnalysisGridCell, BlockSubdivision, SoilLocation def create_or_get_analysis_grid_cells( location: SoilLocation, *, boundary: dict | list | None = None, block_code: str | None = None, block_subdivision: BlockSubdivision | None = None, chunk_size_sqm: int | None = None, ) -> dict: """ شبکه تحلیل 30x30 متری یا هر chunk size تنظیم‌شده را برای مزرعه/بلوک می‌سازد و رکوردهای AnalysisGridCell را به‌صورت idempotent ذخیره می‌کند. """ normalized_chunk_size = int( chunk_size_sqm or getattr(settings, "SUBDIVISION_CHUNK_SQM", 900) or 900 ) if normalized_chunk_size <= 0: raise ValueError("chunk_size_sqm باید بزرگ‌تر از صفر باشد.") resolved_block_code = str(block_code or getattr(block_subdivision, "block_code", "") or "").strip() resolved_boundary = _resolve_boundary( location=location, boundary=boundary, block_subdivision=block_subdivision, ) polygon = extract_polygon(resolved_boundary) if len(polygon) < 3: raise ValueError("برای ساخت analysis grid باید حداقل سه نقطه معتبر در boundary وجود داشته باشد.") existing_qs = AnalysisGridCell.objects.filter( soil_location=location, block_code=resolved_block_code, chunk_size_sqm=normalized_chunk_size, ).order_by("cell_code") existing_count = existing_qs.count() if existing_count: return { "created_count": 0, "existing_count": existing_count, "total_count": existing_count, "chunk_size_sqm": normalized_chunk_size, "block_code": resolved_block_code, "created": False, } cell_payloads = build_analysis_grid_payload( polygon=polygon, location=location, block_code=resolved_block_code, chunk_size_sqm=normalized_chunk_size, ) created_cells = [] with transaction.atomic(): for payload in cell_payloads: created_cells.append( AnalysisGridCell.objects.create( soil_location=location, block_subdivision=block_subdivision, block_code=resolved_block_code, cell_code=payload["cell_code"], chunk_size_sqm=normalized_chunk_size, geometry=payload["geometry"], centroid_lat=Decimal(str(payload["centroid_lat"])), centroid_lon=Decimal(str(payload["centroid_lon"])), ) ) _update_grid_summary_metadata( location=location, block_code=resolved_block_code, chunk_size_sqm=normalized_chunk_size, total_count=len(created_cells), block_subdivision=block_subdivision, ) return { "created_count": len(created_cells), "existing_count": 0, "total_count": len(created_cells), "chunk_size_sqm": normalized_chunk_size, "block_code": resolved_block_code, "created": True, } def build_analysis_grid_payload( *, polygon: list[GeoPoint], location: SoilLocation, block_code: str, chunk_size_sqm: int, ) -> list[dict]: projected_polygon = project_polygon_to_local_meters(polygon) step_m = math.sqrt(chunk_size_sqm) min_x, max_x, min_y, max_y = bounds(projected_polygon) payloads: list[dict] = [] row_index = 0 y = min_y while y < max_y: col_index = 0 x = min_x while x < max_x: cell_polygon = [ (x, y), (x + step_m, y), (x + step_m, y + step_m), (x, y + step_m), ] if _cell_intersects_polygon(cell_polygon, projected_polygon): payloads.append( _build_cell_payload( location=location, block_code=block_code, chunk_size_sqm=chunk_size_sqm, polygon=polygon, cell_polygon=cell_polygon, row_index=row_index, col_index=col_index, ) ) col_index += 1 x += step_m row_index += 1 y += step_m return payloads def _build_cell_payload( *, location: SoilLocation, block_code: str, chunk_size_sqm: int, polygon: list[GeoPoint], cell_polygon: list[tuple[float, float]], row_index: int, col_index: int, ) -> dict: closed_polygon = cell_polygon + [cell_polygon[0]] geometry_coordinates = [] for x, y in closed_polygon: lat, lon = unproject_point(x, y, polygon) geometry_coordinates.append( [quantize_coordinate(lon), quantize_coordinate(lat)] ) centroid_x = sum(point[0] for point in cell_polygon) / len(cell_polygon) centroid_y = sum(point[1] for point in cell_polygon) / len(cell_polygon) centroid_lat, centroid_lon = unproject_point(centroid_x, centroid_y, polygon) return { "cell_code": build_analysis_cell_code( location_id=location.id, block_code=block_code, chunk_size_sqm=chunk_size_sqm, row_index=row_index, col_index=col_index, ), "geometry": { "type": "Polygon", "coordinates": [geometry_coordinates], }, "centroid_lat": quantize_coordinate(centroid_lat), "centroid_lon": quantize_coordinate(centroid_lon), } def build_analysis_cell_code( *, location_id: int | None, block_code: str, chunk_size_sqm: int, row_index: int, col_index: int, ) -> str: block_segment = block_code or "farm" location_segment = location_id if location_id is not None else "new" return ( f"loc-{location_segment}__" f"block-{block_segment}__" f"chunk-{chunk_size_sqm}__" f"r{row_index:04d}c{col_index:04d}" ) def _resolve_boundary( *, location: SoilLocation, boundary: dict | list | None, block_subdivision: BlockSubdivision | None, ) -> dict | list: if boundary: return boundary if block_subdivision is not None and block_subdivision.source_boundary: return block_subdivision.source_boundary if location.farm_boundary: return location.farm_boundary raise ValueError("هیچ boundary معتبری برای ساخت analysis grid پیدا نشد.") def _cell_intersects_polygon( cell_polygon: list[tuple[float, float]], polygon: list[tuple[float, float]], ) -> bool: if any(point_in_polygon(point, polygon) for point in cell_polygon): return True for polygon_point in polygon: if _point_in_rect(polygon_point, cell_polygon): return True cell_edges = _polygon_edges(cell_polygon) polygon_edges = _polygon_edges(polygon) for edge_a in cell_edges: for edge_b in polygon_edges: if _segments_intersect(edge_a[0], edge_a[1], edge_b[0], edge_b[1]): return True return False def _point_in_rect(point: tuple[float, float], rect: list[tuple[float, float]]) -> bool: xs = [vertex[0] for vertex in rect] ys = [vertex[1] for vertex in rect] return min(xs) <= point[0] <= max(xs) and min(ys) <= point[1] <= max(ys) def _polygon_edges(points: list[tuple[float, float]]) -> list[tuple[tuple[float, float], tuple[float, float]]]: closed = points + [points[0]] return [ (closed[index], closed[index + 1]) for index in range(len(points)) ] def _segments_intersect( p1: tuple[float, float], p2: tuple[float, float], q1: tuple[float, float], q2: tuple[float, float], ) -> bool: o1 = _orientation(p1, p2, q1) o2 = _orientation(p1, p2, q2) o3 = _orientation(q1, q2, p1) o4 = _orientation(q1, q2, p2) if o1 != o2 and o3 != o4: return True if o1 == 0 and _on_segment(p1, q1, p2): return True if o2 == 0 and _on_segment(p1, q2, p2): return True if o3 == 0 and _on_segment(q1, p1, q2): return True if o4 == 0 and _on_segment(q1, p2, q2): return True return False def _orientation(a: tuple[float, float], b: tuple[float, float], c: tuple[float, float]) -> int: value = ((b[1] - a[1]) * (c[0] - b[0])) - ((b[0] - a[0]) * (c[1] - b[1])) if abs(value) < 1e-9: return 0 return 1 if value > 0 else 2 def _on_segment(a: tuple[float, float], b: tuple[float, float], c: tuple[float, float]) -> bool: return ( min(a[0], c[0]) <= b[0] <= max(a[0], c[0]) and min(a[1], c[1]) <= b[1] <= max(a[1], c[1]) ) def _update_grid_summary_metadata( *, location: SoilLocation, block_code: str, chunk_size_sqm: int, total_count: int, block_subdivision: BlockSubdivision | None, ) -> None: if block_subdivision is not None: metadata = dict(block_subdivision.metadata or {}) metadata["analysis_grid"] = { "chunk_size_sqm": chunk_size_sqm, "cell_count": total_count, } block_subdivision.metadata = metadata block_subdivision.save(update_fields=["metadata", "updated_at"]) layout = dict(location.block_layout or {}) blocks = list(layout.get("blocks") or []) for block in blocks: if block.get("block_code") == block_code: block["analysis_grid_summary"] = { "chunk_size_sqm": chunk_size_sqm, "cell_count": total_count, } break else: if not block_code: layout["analysis_grid_summary"] = { "chunk_size_sqm": chunk_size_sqm, "cell_count": total_count, } if blocks: layout["blocks"] = blocks location.block_layout = layout location.save(update_fields=["block_layout", "updated_at"])