This commit is contained in:
2026-04-01 17:28:24 +03:30
parent c97b90bfaf
commit 69dd005355
9 changed files with 558 additions and 97 deletions
+155 -44
View File
@@ -1,11 +1,14 @@
import math
from copy import deepcopy
from decimal import Decimal
from datetime import timedelta
from django.conf import settings
from celery.result import AsyncResult
from kombu.exceptions import OperationalError
from django.db import transaction
from django.db.models import Prefetch
from django.utils import timezone
from sensor_hub.models import Sensor
from external_api_adapter.adapter import request as external_request
@@ -48,6 +51,12 @@ RULE_BASED_PRODUCTS = {
},
}
RULE_BASED_CROP_IDS = tuple(RULE_BASED_PRODUCTS.keys())
TASK_STATE_PENDING = "PENDING"
TASK_STATE_STARTED = "STARTED"
TASK_STATE_RETRY = "RETRY"
TASK_STATE_SUCCESS = "SUCCESS"
TASK_STATE_FAILURE = "FAILURE"
TASK_STATE_REVOKED = "REVOKED"
def get_default_cell_side_km():
@@ -70,6 +79,15 @@ def get_default_cell_side_km():
return DEFAULT_CELL_SIDE_KM
def get_task_stale_seconds():
raw_value = getattr(settings, "CROP_ZONE_TASK_STALE_SECONDS", 300)
try:
stale_seconds = int(raw_value)
except (TypeError, ValueError):
stale_seconds = 300
return max(stale_seconds, 0)
def get_cell_side_km(cell_side_km=None):
if cell_side_km is None or cell_side_km == "":
resolved_value = get_default_cell_side_km()
@@ -471,6 +489,42 @@ def build_initial_zone_payload(zone):
}
def build_area_zone_payload(zone):
recommendation = getattr(zone, "recommendation", None)
water_need_layer = getattr(zone, "water_need_layer", None)
soil_quality_layer = getattr(zone, "soil_quality_layer", None)
cultivation_risk_layer = getattr(zone, "cultivation_risk_layer", None)
return {
"zoneId": zone.zone_id,
"zoneUuid": str(zone.uuid),
"geometry": zone.geometry,
"center": zone.center,
"area_sqm": zone.area_sqm,
"area_hectares": zone.area_hectares,
"sequence": zone.sequence,
"processing_status": zone.processing_status,
"processing_error": zone.processing_error,
"crop": recommendation.product.product_id if recommendation else "",
"matchPercent": recommendation.match_percent if recommendation else 0,
"waterNeed": recommendation.water_need if recommendation else "",
"estimatedProfit": recommendation.estimated_profit if recommendation else "",
"waterNeedLayer": {
"level": getattr(water_need_layer, "level", ""),
"value": getattr(water_need_layer, "value", ""),
"color": getattr(water_need_layer, "color", ""),
},
"soilQualityLayer": {
"level": getattr(soil_quality_layer, "level", ""),
"score": getattr(soil_quality_layer, "score", 0),
"color": getattr(soil_quality_layer, "color", ""),
},
"cultivationRiskLayer": {
"level": getattr(cultivation_risk_layer, "level", ""),
"color": getattr(cultivation_risk_layer, "color", ""),
},
}
def persist_zone_analysis_metrics(zone, metrics):
ensure_products_exist()
product = CropProduct.objects.get(product_id=metrics["recommended_crop"])
@@ -674,24 +728,56 @@ def analyze_and_store_zone_soil_data(zone_id):
return zone
def dispatch_zone_processing_tasks(crop_area_id=None, zone_ids=None):
def _get_stale_zone_ids(zones):
completed_task_ids = {
zone.task_id
for zone in zones
if zone.processing_status == CropZone.STATUS_COMPLETED and zone.task_id
}
stale_before = timezone.now() - timedelta(seconds=get_task_stale_seconds())
stale_zone_ids = []
for zone in zones:
if zone.processing_status == CropZone.STATUS_COMPLETED or not zone.task_id:
continue
if zone.task_id in completed_task_ids:
stale_zone_ids.append(zone.id)
continue
if zone.updated_at > stale_before:
continue
try:
task_state = AsyncResult(zone.task_id).state
except Exception:
task_state = TASK_STATE_PENDING
if task_state in {
TASK_STATE_PENDING,
TASK_STATE_SUCCESS,
TASK_STATE_FAILURE,
TASK_STATE_REVOKED,
}:
stale_zone_ids.append(zone.id)
return stale_zone_ids
def dispatch_zone_processing_tasks(crop_area_id=None, zone_ids=None, force=False):
from .tasks import process_zone_soil_data
queryset = CropZone.objects.select_related("crop_area").all()
queryset = CropZone.objects.all()
if crop_area_id is not None:
queryset = queryset.filter(crop_area_id=crop_area_id)
if zone_ids is not None:
queryset = queryset.filter(id__in=zone_ids)
zones = list(queryset.only("id", "task_id", "processing_status", "crop_area__sensor_id"))
sensor_task_ids = {}
zones = list(queryset.only("id", "task_id", "processing_status").order_by("sequence", "id"))
for zone in zones:
sensor_id = zone.crop_area.sensor_id
existing_task_id = sensor_task_ids.get(sensor_id) or zone.task_id
if existing_task_id and zone.processing_status in {CropZone.STATUS_PENDING, CropZone.STATUS_PROCESSING}:
sensor_task_ids[sensor_id] = existing_task_id
if zone.task_id != existing_task_id:
CropZone.objects.filter(id=zone.id).update(task_id=existing_task_id)
if zone.processing_status == CropZone.STATUS_COMPLETED:
continue
if not force and zone.processing_status == CropZone.STATUS_PROCESSING and zone.task_id:
continue
if not force and zone.processing_status == CropZone.STATUS_PENDING and zone.task_id:
continue
try:
@@ -705,16 +791,12 @@ def dispatch_zone_processing_tasks(crop_area_id=None, zone_ids=None):
task_identifier = str(uuid.uuid4())
processing_error = f"Celery dispatch failed: {exc}"
update_fields = {"task_id": task_identifier}
if zone.processing_status == CropZone.STATUS_FAILED:
update_fields["processing_status"] = CropZone.STATUS_PENDING
if processing_error:
update_fields["processing_error"] = processing_error
elif zone.processing_status == CropZone.STATUS_FAILED:
update_fields["processing_error"] = ""
update_fields = {
"task_id": task_identifier,
"processing_status": CropZone.STATUS_PENDING,
}
update_fields["processing_error"] = processing_error
CropZone.objects.filter(id=zone.id).update(**update_fields)
if sensor_id and task_identifier:
sensor_task_ids[sensor_id] = task_identifier
def create_missing_zones_for_area(crop_area):
@@ -766,23 +848,17 @@ def ensure_latest_area_ready_for_processing(sensor_uuid, area_feature=None):
for zone in zones:
ensure_rule_based_zone_data(zone)
active_task_id = next((zone.task_id for zone in zones if zone.task_id and zone.processing_status in {CropZone.STATUS_PENDING, CropZone.STATUS_PROCESSING}), "")
zones_to_dispatch = []
for zone in zones:
if zone.processing_status == CropZone.STATUS_COMPLETED:
continue
if active_task_id:
if not zone.task_id:
CropZone.objects.filter(id=zone.id).update(task_id=active_task_id)
continue
if zone.processing_status == CropZone.STATUS_PROCESSING and zone.task_id:
active_task_id = zone.task_id
continue
if zone.processing_status == CropZone.STATUS_PENDING and zone.task_id:
active_task_id = zone.task_id
continue
zones_to_dispatch.append(zone.id)
stale_zone_ids = _get_stale_zone_ids(zones)
zones_to_dispatch = [
zone.id
for zone in zones
if zone.processing_status != CropZone.STATUS_COMPLETED
and zone.id not in stale_zone_ids
and not (zone.processing_status in {CropZone.STATUS_PENDING, CropZone.STATUS_PROCESSING} and zone.task_id)
]
if stale_zone_ids:
dispatch_zone_processing_tasks(zone_ids=stale_zone_ids, force=True)
if zones_to_dispatch:
dispatch_zone_processing_tasks(zone_ids=zones_to_dispatch)
@@ -847,12 +923,13 @@ def _zones_queryset(zone_ids=None):
def get_latest_area_payload(area=None):
area = area or CropArea.objects.order_by("-created_at", "-id").first()
if area:
zones = list(area.zones.only("zone_id", "task_id", "processing_status", "processing_error"))
total_zones = len(zones)
completed_zones = sum(1 for zone in zones if zone.processing_status == CropZone.STATUS_COMPLETED)
processing_zones = sum(1 for zone in zones if zone.processing_status == CropZone.STATUS_PROCESSING)
failed_zones = sum(1 for zone in zones if zone.processing_status == CropZone.STATUS_FAILED)
pending_zones = sum(1 for zone in zones if zone.processing_status == CropZone.STATUS_PENDING)
status_zones = list(area.zones.only("zone_id", "task_id", "processing_status", "processing_error"))
total_zones = len(status_zones)
completed_zones = sum(1 for zone in status_zones if zone.processing_status == CropZone.STATUS_COMPLETED)
processing_zones = sum(1 for zone in status_zones if zone.processing_status == CropZone.STATUS_PROCESSING)
failed_zones = sum(1 for zone in status_zones if zone.processing_status == CropZone.STATUS_FAILED)
pending_zones = sum(1 for zone in status_zones if zone.processing_status == CropZone.STATUS_PENDING)
zones = _zones_queryset().filter(crop_area=area)
if failed_zones:
task_status = "FAILURE"
@@ -863,27 +940,61 @@ def get_latest_area_payload(area=None):
else:
task_status = "PENDING"
current_stage = "waiting_to_start"
if failed_zones:
current_stage = "failed"
elif total_zones and completed_zones == total_zones:
current_stage = "completed"
elif processing_zones:
current_stage = "processing_zones"
elif pending_zones and completed_zones:
current_stage = "continuing_processing"
elif pending_zones:
current_stage = "queued"
progress_percent = 0
if total_zones:
progress_percent = round((completed_zones / total_zones) * 100, 2)
return {
"task": {
"status": task_status,
"stage": current_stage,
"stage_label": {
"waiting_to_start": "در انتظار شروع پردازش",
"queued": "تسک ساخته شده و در صف پردازش است",
"processing_zones": "در حال پردازش زون‌ها",
"continuing_processing": "بخشی از زون‌ها پردازش شده و بقیه در صف هستند",
"completed": "پردازش همه زون‌ها کامل شده است",
"failed": "پردازش بعضی زون‌ها با خطا مواجه شده است",
}[current_stage],
"area_uuid": str(area.uuid),
"total_zones": total_zones,
"completed_zones": completed_zones,
"processing_zones": processing_zones,
"pending_zones": pending_zones,
"failed_zones": failed_zones,
"task_ids": [zone.task_id for zone in zones if zone.task_id],
"remaining_zones": max(total_zones - completed_zones, 0),
"progress_percent": progress_percent,
"summary": {
"done": completed_zones,
"in_progress": processing_zones,
"remaining": pending_zones,
"failed": failed_zones,
},
"message": f"از مجموع {total_zones} زون، {completed_zones} زون پردازش شده، {processing_zones} زون در حال پردازش و {pending_zones} زون باقی مانده است.",
"failed_zone_errors": [
{
"zoneId": zone.zone_id,
"error": zone.processing_error,
}
for zone in zones
for zone in status_zones
if zone.processing_status == CropZone.STATUS_FAILED and zone.processing_error
],
"cell_side_km": round(math.sqrt(max(area.chunk_area_sqm, 1)) / 1000.0, 4),
},
"area": area.geometry,
"zones": [build_area_zone_payload(zone) for zone in zones],
}
return {
"task": {
@@ -894,11 +1005,11 @@ def get_latest_area_payload(area=None):
"processing_zones": 0,
"pending_zones": 0,
"failed_zones": 0,
"task_ids": [],
"failed_zone_errors": [],
"cell_side_km": round(get_default_cell_side_km(), 4),
},
"area": get_default_area_feature(),
"zones": [],
}