diff --git a/.env.example b/.env.example index 87dfb09..f29ff05 100644 --- a/.env.example +++ b/.env.example @@ -5,8 +5,8 @@ ALLOWED_HOSTS=node.crop-logic.ir,crop-logic.ir,localhost,127.0.0.1,0.0.0.0 # Database (MySQL) DB_ENGINE=django.db.backends.mysql -DB_NAME=croplogic -DB_USER=croplogic +DB_NAME=backend +DB_USER=backend DB_PASSWORD=changeme DB_HOST=db DB_PORT=3306 @@ -28,3 +28,10 @@ SENSOR_HUB_SERVICE_BASE_URL=https://sensor-hub.example.com SENSOR_HUB_SERVICE_API_KEY= CROP_ZONE_CHUNK_AREA_SQM=10000 +CROP_ZONE_TASK_STALE_SECONDS=300 + +CELERY_BROKER_URL=redis://redis:6379/0 +CELERY_RESULT_BACKEND=redis://redis:6379/0 +CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP=true +QDRANT_HOST=qdrant +QDRANT_PORT=6333 diff --git a/config/settings.py b/config/settings.py index 6ba4a3f..25191fe 100644 --- a/config/settings.py +++ b/config/settings.py @@ -161,11 +161,13 @@ SIMPLE_JWT = { } CROP_ZONE_CHUNK_AREA_SQM = float(os.getenv("CROP_ZONE_CHUNK_AREA_SQM", "10000")) +CROP_ZONE_TASK_STALE_SECONDS = int(os.getenv("CROP_ZONE_TASK_STALE_SECONDS", "300")) -CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0") +CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0") CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", CELERY_BROKER_URL) CELERY_TASK_DEFAULT_QUEUE = os.getenv("CELERY_TASK_DEFAULT_QUEUE", "default") CELERY_TASK_ACKS_LATE = True CELERY_WORKER_PREFETCH_MULTIPLIER = int(os.getenv("CELERY_WORKER_PREFETCH_MULTIPLIER", "1")) CELERY_TASK_TIME_LIMIT = int(os.getenv("CELERY_TASK_TIME_LIMIT", "120")) CELERY_TASK_SOFT_TIME_LIMIT = int(os.getenv("CELERY_TASK_SOFT_TIME_LIMIT", "90")) +CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = os.getenv("CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP", "true").lower() == "true" diff --git a/crop_zoning/CROP_ZONING_FRONTEND_API.md b/crop_zoning/CROP_ZONING_FRONTEND_API.md new file mode 100644 index 0000000..4d726ce --- /dev/null +++ b/crop_zoning/CROP_ZONING_FRONTEND_API.md @@ -0,0 +1,229 @@ +# Crop Zoning API Guide For Frontend + +این فایل برای تیم فرانت نوشته شده و رفتار endpointهای ماژول `crop-zoning` را به صورت کاربردی توضیح می‌دهد. + +## Base Path + +```text +/api/crop-zoning/ +``` + +## Authentication + +- همه endpointها با تنظیم فعلی پروژه نیاز به احراز هویت دارند. +- هدر پیشنهادی: + +```http +Authorization: Bearer +Content-Type: application/json +``` + +## Flow پیشنهادی فرانت + +1. ابتدا `GET /area/` را با `sensor_uuid` صدا بزنید. +2. اگر `task.status` برابر `PENDING` یا `PROCESSING` بود، polling انجام دهید. +3. وقتی `task.status` برابر `SUCCESS` شد: + - `area` را برای polygon اصلی زمین استفاده کنید. + - `zones` را برای grid map و کارت‌های overview استفاده کنید. +4. برای legend محصولات، `GET /products/` را بزنید. + +## وضعیت‌های Task + +- `IDLE`: هنوز area/taskی برای سنسور وجود ندارد. +- `PENDING`: تسک ساخته شده ولی پردازش هنوز شروع نشده یا در صف است. +- `PROCESSING`: بخشی از زون‌ها در حال پردازش هستند یا برخی کامل شده‌اند. +- `SUCCESS`: همه زون‌ها کامل پردازش شده‌اند. +- `FAILURE`: یک یا چند زون با خطا مواجه شده‌اند. + +## Stageهای Task + +- `waiting_to_start` +- `queued` +- `processing_zones` +- `continuing_processing` +- `completed` +- `failed` + +فیلد `stage_label` متن فارسی آماده برای نمایش در UI است. + +--- + +## 1) Get Area + +```http +GET /api/crop-zoning/area/?sensor_uuid= +``` + +### Query Params + +- `sensor_uuid`: اجباری، UUID سنسور + +### کاربرد + +- گرفتن آخرین area مربوط به سنسور +- ساخت area و zoneها در صورت نبود داده +- دریافت وضعیت task +- دریافت لیست کامل `zones` برای نمایش روی نقشه + +### نمونه پاسخ موفق + +```json +{ + "status": "success", + "data": { + "task": { + "status": "SUCCESS", + "stage": "completed", + "stage_label": "پردازش همه زون‌ها کامل شده است", + "area_uuid": "c0eaa4d7-92bf-4542-a60d-6010b45e7c96", + "total_zones": 364, + "completed_zones": 364, + "processing_zones": 0, + "pending_zones": 0, + "failed_zones": 0, + "remaining_zones": 0, + "progress_percent": 100, + "summary": { + "done": 364, + "in_progress": 0, + "remaining": 0, + "failed": 0 + }, + "message": "از مجموع 364 زون، 364 زون پردازش شده، 0 زون در حال پردازش و 0 زون باقی مانده است.", + "failed_zone_errors": [], + "cell_side_km": 0.1 + }, + "area": { + "type": "Feature", + "geometry": { + "type": "Polygon", + "coordinates": [[[51.418934, 35.706815], [51.423054, 35.691062], [51.384258, 35.689389], [51.418934, 35.706815]]] + }, + "properties": { + "center": { + "latitude": 35.69575533, + "longitude": 51.40874867 + }, + "area_sqm": 3109868.97, + "cell_side_km": 0.1, + "area_hectares": 310.9869 + } + }, + "zones": [ + { + "zoneId": "zone-0", + "zoneUuid": "d7a9a19b-b3ec-4721-b514-9aae5c9ea940", + "geometry": { + "type": "Polygon", + "coordinates": [[[51.384258, 35.689389], [51.38536404, 35.689389], [51.38536404, 35.69028731], [51.384258, 35.69028731], [51.384258, 35.689389]]] + }, + "center": { + "latitude": 35.68983816, + "longitude": 51.38481102 + }, + "area_sqm": 9999.91, + "area_hectares": 1, + "sequence": 0, + "processing_status": "completed", + "processing_error": "", + "crop": "wheat", + "matchPercent": 89, + "waterNeed": "4820-5820 m³/ha", + "estimatedProfit": "۱۵-۲۵ میلیون/هکتار", + "waterNeedLayer": { + "level": "medium", + "value": "4820-5820 m³/ha", + "color": "#0ea5e9" + }, + "soilQualityLayer": { + "level": "high", + "score": 89, + "color": "#22c55e" + }, + "cultivationRiskLayer": { + "level": "low", + "color": "#22c55e" + } + } + ] + } +} +``` + +### فیلدهای مهم `zones` + +- `zoneId`: شناسه نمایشی زون، مثل `zone-0` +- `zoneUuid`: UUID داخلی زون +- `geometry`: polygon زون +- `center`: مرکز زون +- `area_sqm`: مساحت به متر مربع +- `area_hectares`: مساحت به هکتار +- `sequence`: ترتیب زون +- `processing_status`: یکی از `pending`, `processing`, `completed`, `failed` +- `processing_error`: متن خطا در صورت failure +- `crop`: محصول پیشنهادی +- `matchPercent`: درصد تطابق +- `waterNeed`: نیاز آبی پیشنهادی +- `estimatedProfit`: سود تخمینی +- `waterNeedLayer`: داده layer نیاز آبی +- `soilQualityLayer`: داده layer کیفیت خاک +- `cultivationRiskLayer`: داده layer ریسک کشت + +### خطاها + +#### وقتی `sensor_uuid` ارسال نشود + +```json +{ + "status": "error", + "message": "sensor_uuid is required." +} +``` + +#### وقتی سنسور پیدا نشود + +```json +{ + "status": "error", + "message": "Sensor not found." +} +``` + +--- + +## 2) Get Products + +```http +GET /api/crop-zoning/products/ +``` + +### کاربرد + +- گرفتن لیست محصولات برای legend و labelها + +### نمونه پاسخ + +```json +{ + "status": "success", + "data": { + "products": [ + { + "id": "wheat", + "label": "گندم", + "color": "#6bcb77" + }, + { + "id": "canola", + "label": "کلزا", + "color": "#ffd93d" + }, + { + "id": "saffron", + "label": "زعفران", + "color": "#9b59b6" + } + ] + } +} +``` diff --git a/crop_zoning/services.py b/crop_zoning/services.py index 4339e50..7d3484f 100644 --- a/crop_zoning/services.py +++ b/crop_zoning/services.py @@ -1,11 +1,14 @@ import math from copy import deepcopy from decimal import Decimal +from datetime import timedelta from django.conf import settings +from celery.result import AsyncResult from kombu.exceptions import OperationalError from django.db import transaction from django.db.models import Prefetch +from django.utils import timezone from sensor_hub.models import Sensor from external_api_adapter.adapter import request as external_request @@ -48,6 +51,12 @@ RULE_BASED_PRODUCTS = { }, } RULE_BASED_CROP_IDS = tuple(RULE_BASED_PRODUCTS.keys()) +TASK_STATE_PENDING = "PENDING" +TASK_STATE_STARTED = "STARTED" +TASK_STATE_RETRY = "RETRY" +TASK_STATE_SUCCESS = "SUCCESS" +TASK_STATE_FAILURE = "FAILURE" +TASK_STATE_REVOKED = "REVOKED" def get_default_cell_side_km(): @@ -70,6 +79,15 @@ def get_default_cell_side_km(): return DEFAULT_CELL_SIDE_KM +def get_task_stale_seconds(): + raw_value = getattr(settings, "CROP_ZONE_TASK_STALE_SECONDS", 300) + try: + stale_seconds = int(raw_value) + except (TypeError, ValueError): + stale_seconds = 300 + return max(stale_seconds, 0) + + def get_cell_side_km(cell_side_km=None): if cell_side_km is None or cell_side_km == "": resolved_value = get_default_cell_side_km() @@ -471,6 +489,42 @@ def build_initial_zone_payload(zone): } +def build_area_zone_payload(zone): + recommendation = getattr(zone, "recommendation", None) + water_need_layer = getattr(zone, "water_need_layer", None) + soil_quality_layer = getattr(zone, "soil_quality_layer", None) + cultivation_risk_layer = getattr(zone, "cultivation_risk_layer", None) + return { + "zoneId": zone.zone_id, + "zoneUuid": str(zone.uuid), + "geometry": zone.geometry, + "center": zone.center, + "area_sqm": zone.area_sqm, + "area_hectares": zone.area_hectares, + "sequence": zone.sequence, + "processing_status": zone.processing_status, + "processing_error": zone.processing_error, + "crop": recommendation.product.product_id if recommendation else "", + "matchPercent": recommendation.match_percent if recommendation else 0, + "waterNeed": recommendation.water_need if recommendation else "", + "estimatedProfit": recommendation.estimated_profit if recommendation else "", + "waterNeedLayer": { + "level": getattr(water_need_layer, "level", ""), + "value": getattr(water_need_layer, "value", ""), + "color": getattr(water_need_layer, "color", ""), + }, + "soilQualityLayer": { + "level": getattr(soil_quality_layer, "level", ""), + "score": getattr(soil_quality_layer, "score", 0), + "color": getattr(soil_quality_layer, "color", ""), + }, + "cultivationRiskLayer": { + "level": getattr(cultivation_risk_layer, "level", ""), + "color": getattr(cultivation_risk_layer, "color", ""), + }, + } + + def persist_zone_analysis_metrics(zone, metrics): ensure_products_exist() product = CropProduct.objects.get(product_id=metrics["recommended_crop"]) @@ -674,24 +728,56 @@ def analyze_and_store_zone_soil_data(zone_id): return zone -def dispatch_zone_processing_tasks(crop_area_id=None, zone_ids=None): +def _get_stale_zone_ids(zones): + completed_task_ids = { + zone.task_id + for zone in zones + if zone.processing_status == CropZone.STATUS_COMPLETED and zone.task_id + } + stale_before = timezone.now() - timedelta(seconds=get_task_stale_seconds()) + stale_zone_ids = [] + + for zone in zones: + if zone.processing_status == CropZone.STATUS_COMPLETED or not zone.task_id: + continue + if zone.task_id in completed_task_ids: + stale_zone_ids.append(zone.id) + continue + if zone.updated_at > stale_before: + continue + + try: + task_state = AsyncResult(zone.task_id).state + except Exception: + task_state = TASK_STATE_PENDING + + if task_state in { + TASK_STATE_PENDING, + TASK_STATE_SUCCESS, + TASK_STATE_FAILURE, + TASK_STATE_REVOKED, + }: + stale_zone_ids.append(zone.id) + + return stale_zone_ids + + +def dispatch_zone_processing_tasks(crop_area_id=None, zone_ids=None, force=False): from .tasks import process_zone_soil_data - queryset = CropZone.objects.select_related("crop_area").all() + queryset = CropZone.objects.all() if crop_area_id is not None: queryset = queryset.filter(crop_area_id=crop_area_id) if zone_ids is not None: queryset = queryset.filter(id__in=zone_ids) - zones = list(queryset.only("id", "task_id", "processing_status", "crop_area__sensor_id")) - sensor_task_ids = {} + zones = list(queryset.only("id", "task_id", "processing_status").order_by("sequence", "id")) for zone in zones: - sensor_id = zone.crop_area.sensor_id - existing_task_id = sensor_task_ids.get(sensor_id) or zone.task_id - if existing_task_id and zone.processing_status in {CropZone.STATUS_PENDING, CropZone.STATUS_PROCESSING}: - sensor_task_ids[sensor_id] = existing_task_id - if zone.task_id != existing_task_id: - CropZone.objects.filter(id=zone.id).update(task_id=existing_task_id) + if zone.processing_status == CropZone.STATUS_COMPLETED: + continue + if not force and zone.processing_status == CropZone.STATUS_PROCESSING and zone.task_id: + continue + if not force and zone.processing_status == CropZone.STATUS_PENDING and zone.task_id: continue try: @@ -705,16 +791,12 @@ def dispatch_zone_processing_tasks(crop_area_id=None, zone_ids=None): task_identifier = str(uuid.uuid4()) processing_error = f"Celery dispatch failed: {exc}" - update_fields = {"task_id": task_identifier} - if zone.processing_status == CropZone.STATUS_FAILED: - update_fields["processing_status"] = CropZone.STATUS_PENDING - if processing_error: - update_fields["processing_error"] = processing_error - elif zone.processing_status == CropZone.STATUS_FAILED: - update_fields["processing_error"] = "" + update_fields = { + "task_id": task_identifier, + "processing_status": CropZone.STATUS_PENDING, + } + update_fields["processing_error"] = processing_error CropZone.objects.filter(id=zone.id).update(**update_fields) - if sensor_id and task_identifier: - sensor_task_ids[sensor_id] = task_identifier def create_missing_zones_for_area(crop_area): @@ -766,23 +848,17 @@ def ensure_latest_area_ready_for_processing(sensor_uuid, area_feature=None): for zone in zones: ensure_rule_based_zone_data(zone) - active_task_id = next((zone.task_id for zone in zones if zone.task_id and zone.processing_status in {CropZone.STATUS_PENDING, CropZone.STATUS_PROCESSING}), "") - zones_to_dispatch = [] - for zone in zones: - if zone.processing_status == CropZone.STATUS_COMPLETED: - continue - if active_task_id: - if not zone.task_id: - CropZone.objects.filter(id=zone.id).update(task_id=active_task_id) - continue - if zone.processing_status == CropZone.STATUS_PROCESSING and zone.task_id: - active_task_id = zone.task_id - continue - if zone.processing_status == CropZone.STATUS_PENDING and zone.task_id: - active_task_id = zone.task_id - continue - zones_to_dispatch.append(zone.id) + stale_zone_ids = _get_stale_zone_ids(zones) + zones_to_dispatch = [ + zone.id + for zone in zones + if zone.processing_status != CropZone.STATUS_COMPLETED + and zone.id not in stale_zone_ids + and not (zone.processing_status in {CropZone.STATUS_PENDING, CropZone.STATUS_PROCESSING} and zone.task_id) + ] + if stale_zone_ids: + dispatch_zone_processing_tasks(zone_ids=stale_zone_ids, force=True) if zones_to_dispatch: dispatch_zone_processing_tasks(zone_ids=zones_to_dispatch) @@ -847,12 +923,13 @@ def _zones_queryset(zone_ids=None): def get_latest_area_payload(area=None): area = area or CropArea.objects.order_by("-created_at", "-id").first() if area: - zones = list(area.zones.only("zone_id", "task_id", "processing_status", "processing_error")) - total_zones = len(zones) - completed_zones = sum(1 for zone in zones if zone.processing_status == CropZone.STATUS_COMPLETED) - processing_zones = sum(1 for zone in zones if zone.processing_status == CropZone.STATUS_PROCESSING) - failed_zones = sum(1 for zone in zones if zone.processing_status == CropZone.STATUS_FAILED) - pending_zones = sum(1 for zone in zones if zone.processing_status == CropZone.STATUS_PENDING) + status_zones = list(area.zones.only("zone_id", "task_id", "processing_status", "processing_error")) + total_zones = len(status_zones) + completed_zones = sum(1 for zone in status_zones if zone.processing_status == CropZone.STATUS_COMPLETED) + processing_zones = sum(1 for zone in status_zones if zone.processing_status == CropZone.STATUS_PROCESSING) + failed_zones = sum(1 for zone in status_zones if zone.processing_status == CropZone.STATUS_FAILED) + pending_zones = sum(1 for zone in status_zones if zone.processing_status == CropZone.STATUS_PENDING) + zones = _zones_queryset().filter(crop_area=area) if failed_zones: task_status = "FAILURE" @@ -863,27 +940,61 @@ def get_latest_area_payload(area=None): else: task_status = "PENDING" + current_stage = "waiting_to_start" + if failed_zones: + current_stage = "failed" + elif total_zones and completed_zones == total_zones: + current_stage = "completed" + elif processing_zones: + current_stage = "processing_zones" + elif pending_zones and completed_zones: + current_stage = "continuing_processing" + elif pending_zones: + current_stage = "queued" + + progress_percent = 0 + if total_zones: + progress_percent = round((completed_zones / total_zones) * 100, 2) + return { "task": { "status": task_status, + "stage": current_stage, + "stage_label": { + "waiting_to_start": "در انتظار شروع پردازش", + "queued": "تسک ساخته شده و در صف پردازش است", + "processing_zones": "در حال پردازش زون‌ها", + "continuing_processing": "بخشی از زون‌ها پردازش شده و بقیه در صف هستند", + "completed": "پردازش همه زون‌ها کامل شده است", + "failed": "پردازش بعضی زون‌ها با خطا مواجه شده است", + }[current_stage], "area_uuid": str(area.uuid), "total_zones": total_zones, "completed_zones": completed_zones, "processing_zones": processing_zones, "pending_zones": pending_zones, "failed_zones": failed_zones, - "task_ids": [zone.task_id for zone in zones if zone.task_id], + "remaining_zones": max(total_zones - completed_zones, 0), + "progress_percent": progress_percent, + "summary": { + "done": completed_zones, + "in_progress": processing_zones, + "remaining": pending_zones, + "failed": failed_zones, + }, + "message": f"از مجموع {total_zones} زون، {completed_zones} زون پردازش شده، {processing_zones} زون در حال پردازش و {pending_zones} زون باقی مانده است.", "failed_zone_errors": [ { "zoneId": zone.zone_id, "error": zone.processing_error, } - for zone in zones + for zone in status_zones if zone.processing_status == CropZone.STATUS_FAILED and zone.processing_error ], "cell_side_km": round(math.sqrt(max(area.chunk_area_sqm, 1)) / 1000.0, 4), }, "area": area.geometry, + "zones": [build_area_zone_payload(zone) for zone in zones], } return { "task": { @@ -894,11 +1005,11 @@ def get_latest_area_payload(area=None): "processing_zones": 0, "pending_zones": 0, "failed_zones": 0, - "task_ids": [], "failed_zone_errors": [], "cell_side_km": round(get_default_cell_side_km(), 4), }, "area": get_default_area_feature(), + "zones": [], } diff --git a/crop_zoning/tests.py b/crop_zoning/tests.py index a9856c7..d80b0e8 100644 --- a/crop_zoning/tests.py +++ b/crop_zoning/tests.py @@ -4,7 +4,9 @@ from kombu.exceptions import OperationalError from django.contrib.auth import get_user_model from django.test import TestCase, override_settings +from django.utils import timezone from rest_framework.test import APIRequestFactory +from datetime import timedelta from crop_zoning.models import CropArea, CropZone from crop_zoning.views import AreaView, ZonesInitialView @@ -126,6 +128,9 @@ class AreaViewTests(TestCase): self.assertEqual(response.data["data"]["task"]["status"], "PROCESSING") self.assertEqual(response.data["data"]["task"]["total_zones"], 2) self.assertEqual(response.data["data"]["area"], AREA_GEOJSON) + self.assertEqual(len(response.data["data"]["zones"]), 2) + self.assertEqual(response.data["data"]["zones"][0]["zoneId"], "zone-0") + self.assertIn("processing_status", response.data["data"]["zones"][0]) def test_get_returns_area_when_all_tasks_complete(self): crop_area = self._create_area() @@ -148,6 +153,10 @@ class AreaViewTests(TestCase): self.assertEqual(response.status_code, 200) self.assertEqual(response.data["data"]["task"]["status"], "SUCCESS") self.assertEqual(response.data["data"]["area"], AREA_GEOJSON) + self.assertEqual(len(response.data["data"]["zones"]), 2) + self.assertEqual(response.data["data"]["zones"][1]["zoneId"], "zone-1") + self.assertIn("crop", response.data["data"]["zones"][0]) + self.assertIn("waterNeedLayer", response.data["data"]["zones"][0]) @patch("crop_zoning.services.dispatch_zone_processing_tasks") def test_get_dispatches_zone_task_when_task_id_is_missing(self, mock_dispatch): @@ -183,7 +192,7 @@ class AreaViewTests(TestCase): self.assertEqual(mock_create.call_args.kwargs["sensor"], self.sensor) @patch("crop_zoning.tasks.process_zone_soil_data.delay") - def test_only_one_active_task_is_created_per_sensor(self, mock_delay): + def test_each_zone_gets_its_own_task(self, mock_delay): crop_area = self._create_area() zone0 = CropZone.objects.create( crop_area=crop_area, @@ -211,18 +220,19 @@ class AreaViewTests(TestCase): ) class Result: - id = "shared-task-id" + def __init__(self, task_id): + self.id = task_id - mock_delay.return_value = Result() + mock_delay.side_effect = [Result("task-zone-0"), Result("task-zone-1")] response = AreaView.as_view()(self._request()) self.assertEqual(response.status_code, 200) - self.assertEqual(mock_delay.call_count, 1) + self.assertEqual(mock_delay.call_count, 2) zone0.refresh_from_db() zone1.refresh_from_db() - self.assertEqual(zone0.task_id, "shared-task-id") - self.assertEqual(zone1.task_id, "shared-task-id") + self.assertEqual(zone0.task_id, "task-zone-0") + self.assertEqual(zone1.task_id, "task-zone-1") @patch("crop_zoning.tasks.process_zone_soil_data.delay", side_effect=OperationalError("redis down")) def test_get_generates_local_task_id_when_broker_is_unavailable(self, mock_delay): @@ -245,7 +255,8 @@ class AreaViewTests(TestCase): self.assertEqual(response.status_code, 200) zone.refresh_from_db() self.assertTrue(zone.task_id) - self.assertEqual(response.data["data"]["task"]["task_ids"], [zone.task_id]) + self.assertEqual(response.data["data"]["task"]["summary"]["remaining"], 1) + self.assertEqual(response.data["data"]["task"]["remaining_zones"], 1) self.assertEqual(response.data["data"]["task"]["status"], "PENDING") self.assertIn("Celery broker unavailable", zone.processing_error) @@ -274,12 +285,56 @@ class AreaViewTests(TestCase): self.assertEqual(first_response.status_code, 200) zone.refresh_from_db() self.assertEqual(zone.task_id, "persisted-task-id") - self.assertEqual(first_response.data["data"]["task"]["task_ids"], ["persisted-task-id"]) + self.assertEqual(first_response.data["data"]["task"]["summary"]["done"], 0) + self.assertEqual(first_response.data["data"]["task"]["summary"]["remaining"], 1) self.assertEqual(mock_delay.call_count, 1) second_response = AreaView.as_view()(self._request()) self.assertEqual(second_response.status_code, 200) - self.assertEqual(second_response.data["data"]["task"]["task_ids"], ["persisted-task-id"]) + self.assertEqual(second_response.data["data"]["task"]["summary"]["remaining"], 1) self.assertEqual(second_response.data["data"]["task"]["status"], "PENDING") self.assertEqual(mock_delay.call_count, 1) + @patch("crop_zoning.services.AsyncResult") + @patch("crop_zoning.tasks.process_zone_soil_data.delay") + def test_get_redispatches_pending_zone_when_shared_task_already_completed(self, mock_delay, mock_async_result): + crop_area = self._create_area() + CropZone.objects.create( + crop_area=crop_area, + zone_id="zone-0", + geometry=AREA_GEOJSON["geometry"], + points=AREA_GEOJSON["geometry"]["coordinates"][0][:-1], + center={"longitude": 51.4087, "latitude": 35.6957}, + area_sqm=150000, + area_hectares=15, + sequence=0, + processing_status=CropZone.STATUS_COMPLETED, + task_id="legacy-shared-task-id", + ) + stale_zone = CropZone.objects.create( + crop_area=crop_area, + zone_id="zone-1", + geometry=AREA_GEOJSON["geometry"], + points=AREA_GEOJSON["geometry"]["coordinates"][0][:-1], + center={"longitude": 51.4088, "latitude": 35.6958}, + area_sqm=150000, + area_hectares=15, + sequence=1, + processing_status=CropZone.STATUS_PENDING, + task_id="legacy-shared-task-id", + ) + stale_zone.updated_at = timezone.now() - timedelta(minutes=10) + stale_zone.save(update_fields=["updated_at"]) + + class Result: + id = "requeued-zone-1" + + mock_delay.return_value = Result() + mock_async_result.return_value.state = "SUCCESS" + + response = AreaView.as_view()(self._request()) + + self.assertEqual(response.status_code, 200) + self.assertEqual(mock_delay.call_count, 1) + stale_zone.refresh_from_db() + self.assertEqual(stale_zone.task_id, "requeued-zone-1") diff --git a/crop_zoning/urls.py b/crop_zoning/urls.py index 75983c5..9234db9 100644 --- a/crop_zoning/urls.py +++ b/crop_zoning/urls.py @@ -13,22 +13,22 @@ from .views import ( urlpatterns = [ path("area/", AreaView.as_view(), name="crop-zoning-area"), path("products/", ProductsView.as_view(), name="crop-zoning-products"), - path("zones/initial/", ZonesInitialView.as_view(), name="crop-zoning-zones-initial"), - path( - "zones/water-need/", - ZonesWaterNeedView.as_view(), - name="crop-zoning-zones-water-need", - ), - path( - "zones/soil-quality/", - ZonesSoilQualityView.as_view(), - name="crop-zoning-zones-soil-quality", - ), - path( - "zones/cultivation-risk/", - ZonesCultivationRiskView.as_view(), - name="crop-zoning-zones-cultivation-risk", - ), + # path("zones/initial/", ZonesInitialView.as_view(), name="crop-zoning-zones-initial"), + # path( + # "zones/water-need/", + # ZonesWaterNeedView.as_view(), + # name="crop-zoning-zones-water-need", + # ), + # path( + # "zones/soil-quality/", + # ZonesSoilQualityView.as_view(), + # name="crop-zoning-zones-soil-quality", + # ), + # path( + # "zones/cultivation-risk/", + # ZonesCultivationRiskView.as_view(), + # name="crop-zoning-zones-cultivation-risk", + # ), path( "zones//details/", ZoneDetailsView.as_view(), diff --git a/docker-compose-prod.yaml b/docker-compose-prod.yaml index b7254c8..4fa9d97 100644 --- a/docker-compose-prod.yaml +++ b/docker-compose-prod.yaml @@ -1,14 +1,14 @@ -# Production: no source mount; image contains code services: db: image: docker.iranserver.com/mysql:8 + container_name: backend-db environment: MYSQL_DATABASE: ${DB_NAME} MYSQL_USER: ${DB_USER} MYSQL_PASSWORD: ${DB_PASSWORD} MYSQL_ROOT_PASSWORD: ${DB_ROOT_PASSWORD} volumes: - - mysql_data:/var/lib/mysql + - backend_mysql_data:/var/lib/mysql restart: unless-stopped healthcheck: test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p${DB_ROOT_PASSWORD}"] @@ -16,7 +16,23 @@ services: timeout: 5s retries: 5 networks: - - crop_network + - crop_network + + redis: + image: redis:7-alpine + container_name: backend-redis + command: ["redis-server", "--appendonly", "yes", "--save", "60", "1"] + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 10 + volumes: + - backend_redis_data:/data + networks: + - crop_network + web: container_name: backend-web build: . @@ -24,16 +40,44 @@ services: - .env environment: DB_HOST: db + CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis:6379/0} + CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/0} + QDRANT_HOST: ${QDRANT_HOST:-qdrant} + QDRANT_PORT: ${QDRANT_PORT:-6333} depends_on: db: condition: service_healthy + redis: + condition: service_healthy restart: unless-stopped - networks: - - crop_network + - crop_network + + celery: + container_name: backend-celery + build: . + command: ["celery", "-A", "config", "worker", "-l", "info"] + env_file: + - .env + environment: + DB_HOST: db + CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis:6379/0} + CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/0} + CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP: "true" + SKIP_MIGRATE: "1" + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + restart: unless-stopped + networks: + - crop_network networks: crop_network: - external: true + external: true + volumes: - mysql_data: + backend_mysql_data: + backend_redis_data: diff --git a/docker-compose.yaml b/docker-compose.yaml index d71cb4a..33c6a5d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,23 +1,23 @@ services: db: image: docker.iranserver.com/mysql:8 - container_name: ai-db + container_name: backend-db environment: - MYSQL_DATABASE: ${DB_NAME:-ai} - MYSQL_USER: ${DB_USER:-ai} + MYSQL_DATABASE: ${DB_NAME:-backend} + MYSQL_USER: ${DB_USER:-backend} MYSQL_PASSWORD: ${DB_PASSWORD:-changeme} - MYSQL_ROOT_PASSWORD: ${DB_PASSWORD:-changeme} + MYSQL_ROOT_PASSWORD: ${DB_ROOT_PASSWORD:-changeme} volumes: - - ai_mysql_data:/var/lib/mysql + - backend_mysql_data:/var/lib/mysql healthcheck: - test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p${DB_PASSWORD:-changeme}"] + test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p${DB_ROOT_PASSWORD:-changeme}"] interval: 5s timeout: 5s retries: 5 phpmyadmin: image: docker-mirror.liara.ir/phpmyadmin:latest - container_name: ai-phpmyadmin + container_name: backend-phpmyadmin environment: PMA_HOST: db PMA_PORT: 3306 @@ -30,18 +30,27 @@ services: redis: image: redis:7-alpine - container_name: ai-redis + container_name: backend-redis + command: ["redis-server", "--appendonly", "yes", "--save", "60", "1"] + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 10 ports: - "6380:6379" + volumes: + - backend_redis_data:/data qdrant: image: qdrant/qdrant:latest - container_name: ai-qdrant + container_name: backend-qdrant ports: - "6333:6333" - "6334:6334" volumes: - - qdrant_data:/qdrant/storage + - backend_qdrant_data:/qdrant/storage restart: unless-stopped web: @@ -52,7 +61,7 @@ services: PIP_INDEX_URL: https://package-mirror.liara.ir/repository/pypi/simple PIP_EXTRA_INDEX_URL: https://mirror2.chabokan.net/pypi/simple PYTHON_MIRROR: mirror2.chabokan.net - container_name: ai-web + container_name: backend-web command: ["python", "manage.py", "runserver", "0.0.0.0:8000"] volumes: - .:/app @@ -71,9 +80,10 @@ services: db: condition: service_healthy redis: - condition: service_started + condition: service_healthy qdrant: condition: service_started + restart: unless-stopped celery: build: @@ -83,8 +93,8 @@ services: PIP_INDEX_URL: https://package-mirror.liara.ir/repository/pypi/simple PIP_EXTRA_INDEX_URL: https://mirror2.chabokan.net/pypi/simple PYTHON_MIRROR: mirror2.chabokan.net - container_name: ai-celery - command: celery -A config worker -l info + container_name: backend-celery + command: ["celery", "-A", "config", "worker", "-l", "info"] volumes: - .:/app - ./logs:/app/logs @@ -94,13 +104,16 @@ services: DB_HOST: db CELERY_BROKER_URL: redis://redis:6379/0 CELERY_RESULT_BACKEND: redis://redis:6379/0 + CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP: "true" SKIP_MIGRATE: "1" depends_on: db: condition: service_healthy redis: - condition: service_started + condition: service_healthy + restart: unless-stopped volumes: - ai_mysql_data: - qdrant_data: + backend_mysql_data: + backend_redis_data: + backend_qdrant_data: diff --git a/sensor_hub/seeds.py b/sensor_hub/seeds.py index b2af338..170ee8e 100644 --- a/sensor_hub/seeds.py +++ b/sensor_hub/seeds.py @@ -107,5 +107,5 @@ def seed_admin_sensor(): }, ) if created: - dispatch_sensor_zoning(ADMIN_SENSOR_AREA_GEOJSON) + dispatch_sensor_zoning(ADMIN_SENSOR_AREA_GEOJSON, sensor) return sensor, created