diff --git a/Dockerfile b/Dockerfile index 0e05676..58f7b6b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,4 +19,5 @@ COPY . . EXPOSE 8000 +ENTRYPOINT ["sh", "/app/entrypoint.sh"] CMD ["gunicorn", "config.wsgi:application", "--bind", "0.0.0.0:8000"] diff --git a/config/__init__.py b/config/__init__.py index e69de29..53f4ccb 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -0,0 +1,3 @@ +from .celery import app as celery_app + +__all__ = ("celery_app",) diff --git a/config/celery.py b/config/celery.py new file mode 100644 index 0000000..3a079f1 --- /dev/null +++ b/config/celery.py @@ -0,0 +1,9 @@ +import os + +from celery import Celery + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") + +app = Celery("config") +app.config_from_object("django.conf:settings", namespace="CELERY") +app.autodiscover_tasks() diff --git a/config/settings.py b/config/settings.py index 60825b9..e51981e 100644 --- a/config/settings.py +++ b/config/settings.py @@ -18,12 +18,10 @@ INSTALLED_APPS = [ "django.contrib.sessions", "django.contrib.messages", "django.contrib.staticfiles", - "auth.apps.AuthConfig", - "account", - "sensor_hub", - "dashboard", "rest_framework", "corsheaders", + "tasks", + "soil_data", ] MIDDLEWARE = [ @@ -98,9 +96,12 @@ REST_FRAMEWORK = { "DEFAULT_PERMISSION_CLASSES": [ "rest_framework.permissions.AllowAny", ], - "DEFAULT_AUTHENTICATION_CLASSES": [ - "rest_framework_simplejwt.authentication.JWTAuthentication", - ], } CORS_ALLOW_ALL_ORIGINS = DEBUG + +# Celery +CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0") +CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0") +CELERY_ACCEPT_CONTENT = ["json"] +CELERY_TASK_SERIALIZER = "json" diff --git a/config/urls.py b/config/urls.py index 60db6b8..ff5d222 100644 --- a/config/urls.py +++ b/config/urls.py @@ -3,9 +3,6 @@ from django.urls import include, path urlpatterns = [ path("admin/", admin.site.urls), - path("api/auth/", include("auth.urls")), - path("api/account/", include("account.urls")), - path("api/sensor-hub/", include("sensor_hub.urls")), - path("api/farm-dashboard-config/", include("dashboard.urls_config")), - path("api/farm-dashboard/", include("dashboard.urls")), + path("api/tasks/", include("tasks.urls")), + path("api/soil-data/", include("soil_data.urls")), ] diff --git a/docker-compose-prod.yaml b/docker-compose-prod.yaml index 9c8e771..56c5d39 100644 --- a/docker-compose-prod.yaml +++ b/docker-compose-prod.yaml @@ -33,6 +33,11 @@ services: condition: service_healthy restart: unless-stopped + redis: + image: redis:7-alpine + container_name: ai-redis + restart: unless-stopped + web: build: . container_name: ai-web @@ -40,12 +45,33 @@ services: - .env environment: DB_HOST: db + CELERY_BROKER_URL: redis://redis:6379/0 + CELERY_RESULT_BACKEND: redis://redis:6379/0 depends_on: db: condition: service_healthy + redis: + condition: service_started restart: unless-stopped ports: - "8020:8000" + celery: + build: . + container_name: ai-celery + command: celery -A config worker -l info + env_file: + - .env + environment: + DB_HOST: db + CELERY_BROKER_URL: redis://redis:6379/0 + CELERY_RESULT_BACKEND: redis://redis:6379/0 + depends_on: + db: + condition: service_healthy + redis: + condition: service_started + restart: unless-stopped + volumes: ai_mysql_data: diff --git a/docker-compose.yaml b/docker-compose.yaml index ae19087..fc5b043 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -31,10 +31,16 @@ services: db: condition: service_healthy + redis: + image: redis:7-alpine + container_name: ai-redis + ports: + - "6380:6379" # host:container — سرویس‌ها داخل شبکه از redis:6379 استفاده می‌کنند + web: build: . container_name: ai-web - command: python manage.py runserver 0.0.0.0:8000 + command: ["python", "manage.py", "runserver", "0.0.0.0:8000"] volumes: - .:/app ports: @@ -43,9 +49,32 @@ services: - .env environment: DB_HOST: db + CELERY_BROKER_URL: redis://redis:6379/0 + CELERY_RESULT_BACKEND: redis://redis:6379/0 depends_on: db: condition: service_healthy + redis: + condition: service_started + + celery: + build: . + container_name: ai-celery + command: celery -A config worker -l info + volumes: + - .:/app + env_file: + - .env + environment: + DB_HOST: db + CELERY_BROKER_URL: redis://redis:6379/0 + CELERY_RESULT_BACKEND: redis://redis:6379/0 + SKIP_MIGRATE: "1" + depends_on: + db: + condition: service_healthy + redis: + condition: service_started volumes: ai_mysql_data: diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 0000000..5eaa21d --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,9 @@ +#!/bin/sh +set -e +if [ "${SKIP_MIGRATE}" != "1" ]; then + echo "Running migrations..." + python manage.py migrate --noinput --fake-initial + echo "Migrations done." +fi +echo "Starting command: $*" +exec "$@" diff --git a/requirements.txt b/requirements.txt index 36cb173..0b286ea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,6 @@ django-cors-headers>=4.3,<5 mysqlclient>=2.2,<3 gunicorn>=22,<25 python-dotenv>=1.0,<2 +celery[redis]>=5.4,<6 +redis>=5.0,<6 +requests>=2.31,<3 diff --git a/soil_data/__init__.py b/soil_data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/soil_data/admin.py b/soil_data/admin.py new file mode 100644 index 0000000..620297e --- /dev/null +++ b/soil_data/admin.py @@ -0,0 +1,24 @@ +from django.contrib import admin +from .models import SoilDepthData, SoilLocation + + +class SoilDepthDataInline(admin.TabularInline): + model = SoilDepthData + extra = 0 + readonly_fields = ("depth_label", "bdod", "cec", "cfvo", "clay", "nitrogen", "ocd", "ocs", "phh2o", "sand", "silt", "soc", "wv0010", "wv0033", "wv1500") + + +@admin.register(SoilLocation) +class SoilLocationAdmin(admin.ModelAdmin): + list_display = ("id", "latitude", "longitude", "is_complete", "created_at") + list_filter = ("created_at",) + search_fields = ("latitude", "longitude") + readonly_fields = ("created_at", "updated_at") + inlines = [SoilDepthDataInline] + + +@admin.register(SoilDepthData) +class SoilDepthDataAdmin(admin.ModelAdmin): + list_display = ("id", "soil_location", "depth_label", "bdod", "cec", "phh2o", "clay", "sand", "silt") + list_filter = ("depth_label",) + search_fields = ("soil_location__latitude", "soil_location__longitude") diff --git a/soil_data/apps.py b/soil_data/apps.py new file mode 100644 index 0000000..71a697d --- /dev/null +++ b/soil_data/apps.py @@ -0,0 +1,7 @@ +from django.apps import AppConfig + + +class SoilDataConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "soil_data" + verbose_name = "Soil Data (SoilGrids)" diff --git a/soil_data/migrations/0001_initial.py b/soil_data/migrations/0001_initial.py new file mode 100644 index 0000000..7225a2d --- /dev/null +++ b/soil_data/migrations/0001_initial.py @@ -0,0 +1,34 @@ +# Generated manually for soil_data + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="SoilLocation", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("latitude", models.DecimalField(db_index=True, decimal_places=6, help_text="عرض جغرافیایی (lat)", max_digits=9)), + ("longitude", models.DecimalField(db_index=True, decimal_places=6, help_text="طول جغرافیایی (lon)", max_digits=9)), + ("depth_0_5cm", models.JSONField(blank=True, help_text="داده‌های لایه ۰–۵ سانتی‌متر از API SoilGrids", null=True)), + ("depth_5_15cm", models.JSONField(blank=True, help_text="داده‌های لایه ۵–۱۵ سانتی‌متر از API SoilGrids", null=True)), + ("depth_15_30cm", models.JSONField(blank=True, help_text="داده‌های لایه ۱۵–۳۰ سانتی‌متر از API SoilGrids", null=True)), + ("task_id", models.CharField(blank=True, help_text="شناسه تسک Celery در حال پردازش", max_length=255)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ], + options={ + "ordering": ["-updated_at"], + }, + ), + migrations.AddConstraint( + model_name="soillocation", + constraint=models.UniqueConstraint(fields=("latitude", "longitude"), name="soil_location_unique_lat_lon"), + ), + ] diff --git a/soil_data/migrations/0002_soildepthdata_refactor.py b/soil_data/migrations/0002_soildepthdata_refactor.py new file mode 100644 index 0000000..711bbb7 --- /dev/null +++ b/soil_data/migrations/0002_soildepthdata_refactor.py @@ -0,0 +1,77 @@ +# Generated manually: refactor to SoilDepthData table + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("soil_data", "0001_initial"), + ] + + operations = [ + migrations.CreateModel( + name="SoilDepthData", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "depth_label", + models.CharField( + choices=[ + ("0-5cm", "۰–۵ سانتی‌متر"), + ("5-15cm", "۵–۱۵ سانتی‌متر"), + ("15-30cm", "۱۵–۳۰ سانتی‌متر"), + ], + db_index=True, + max_length=10, + ), + ), + ("bdod", models.FloatField(blank=True, null=True)), + ("cec", models.FloatField(blank=True, null=True)), + ("cfvo", models.FloatField(blank=True, null=True)), + ("clay", models.FloatField(blank=True, null=True)), + ("nitrogen", models.FloatField(blank=True, null=True)), + ("ocd", models.FloatField(blank=True, null=True)), + ("ocs", models.FloatField(blank=True, null=True)), + ("phh2o", models.FloatField(blank=True, null=True)), + ("sand", models.FloatField(blank=True, null=True)), + ("silt", models.FloatField(blank=True, null=True)), + ("soc", models.FloatField(blank=True, null=True)), + ("wv0010", models.FloatField(blank=True, null=True)), + ("wv0033", models.FloatField(blank=True, null=True)), + ("wv1500", models.FloatField(blank=True, null=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "soil_location", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="depths", + to="soil_data.soillocation", + ), + ), + ], + options={ + "ordering": ["soil_location", "depth_label"], + }, + ), + migrations.AddConstraint( + model_name="soildepthdata", + constraint=models.UniqueConstraint( + fields=("soil_location", "depth_label"), + name="soil_depth_unique_location_depth", + ), + ), + migrations.RemoveField( + model_name="soillocation", + name="depth_0_5cm", + ), + migrations.RemoveField( + model_name="soillocation", + name="depth_5_15cm", + ), + migrations.RemoveField( + model_name="soillocation", + name="depth_15_30cm", + ), + ] diff --git a/soil_data/migrations/__init__.py b/soil_data/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/soil_data/models.py b/soil_data/models.py new file mode 100644 index 0000000..b2e6aac --- /dev/null +++ b/soil_data/models.py @@ -0,0 +1,100 @@ +from django.db import models + + +class SoilLocation(models.Model): + """ + مختصات جغرافیایی برای داده‌های خاک. + هر مختصات سه سطر در SoilDepthData دارد (۰–۵، ۵–۱۵، ۱۵–۳۰ سانتی‌متر). + """ + + latitude = models.DecimalField( + max_digits=9, + decimal_places=6, + db_index=True, + help_text="عرض جغرافیایی (lat)", + ) + longitude = models.DecimalField( + max_digits=9, + decimal_places=6, + db_index=True, + help_text="طول جغرافیایی (lon)", + ) + task_id = models.CharField( + max_length=255, + blank=True, + help_text="شناسه تسک Celery در حال پردازش", + ) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["latitude", "longitude"], + name="soil_location_unique_lat_lon", + ) + ] + ordering = ["-updated_at"] + + def __str__(self): + return f"SoilLocation({self.latitude}, {self.longitude})" + + @property + def is_complete(self): + """آیا هر سه عمق ذخیره شده‌اند؟""" + return self.depths.count() == 3 + + +class SoilDepthData(models.Model): + """ + داده‌های خاک برای یک عمق مشخص، مرتبط با یک SoilLocation. + مقادیر خام از API SoilGrids (قبل از اعمال d_factor). + """ + + DEPTH_0_5 = "0-5cm" + DEPTH_5_15 = "5-15cm" + DEPTH_15_30 = "15-30cm" + DEPTH_CHOICES = [ + (DEPTH_0_5, "۰–۵ سانتی‌متر"), + (DEPTH_5_15, "۵–۱۵ سانتی‌متر"), + (DEPTH_15_30, "۱۵–۳۰ سانتی‌متر"), + ] + + soil_location = models.ForeignKey( + SoilLocation, + on_delete=models.CASCADE, + related_name="depths", + ) + depth_label = models.CharField( + max_length=10, + choices=DEPTH_CHOICES, + db_index=True, + ) + # خواص خاک — مقادیر mean از API (raw) + bdod = models.FloatField(null=True, blank=True) + cec = models.FloatField(null=True, blank=True) + cfvo = models.FloatField(null=True, blank=True) + clay = models.FloatField(null=True, blank=True) + nitrogen = models.FloatField(null=True, blank=True) + ocd = models.FloatField(null=True, blank=True) + ocs = models.FloatField(null=True, blank=True) + phh2o = models.FloatField(null=True, blank=True) + sand = models.FloatField(null=True, blank=True) + silt = models.FloatField(null=True, blank=True) + soc = models.FloatField(null=True, blank=True) + wv0010 = models.FloatField(null=True, blank=True) + wv0033 = models.FloatField(null=True, blank=True) + wv1500 = models.FloatField(null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["soil_location", "depth_label"], + name="soil_depth_unique_location_depth", + ) + ] + ordering = ["soil_location", "depth_label"] + + def __str__(self): + return f"SoilDepthData({self.soil_location_id}, {self.depth_label})" diff --git a/soil_data/postman/soil_data.json b/soil_data/postman/soil_data.json new file mode 100644 index 0000000..0c1d060 --- /dev/null +++ b/soil_data/postman/soil_data.json @@ -0,0 +1,93 @@ +{ + "info": { + "name": "Soil Data", + "description": "API داده‌های خاک (SoilGrids) بر اساس مختصات جغرافیایی", + "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json" + }, + "variable": [ + { + "key": "baseUrl", + "value": "http://localhost:8020" + }, + { + "key": "task_id", + "value": "" + } + ], + "item": [ + { + "name": "Get Soil Data (query)", + "request": { + "method": "GET", + "header": [ + { + "key": "Accept", + "value": "application/json" + } + ], + "url": { + "raw": "{{baseUrl}}/api/soil-data/?lon=52.42&lat=36.38", + "host": ["{{baseUrl}}"], + "path": ["api", "soil-data", ""], + "query": [ + { + "key": "lon", + "value": "52.42", + "description": "طول جغرافیایی" + }, + { + "key": "lat", + "value": "36.38", + "description": "عرض جغرافیایی" + } + ] + }, + "description": "دریافت داده خاک با lon و lat در query. اگر داده در DB باشد 200، وگرنه 202 با task_id برمی‌گردد." + } + }, + { + "name": "Get Soil Data (POST)", + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + }, + { + "key": "Accept", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"lon\": 52.42,\n \"lat\": 36.38\n}" + }, + "url": { + "raw": "{{baseUrl}}/api/soil-data/", + "host": ["{{baseUrl}}"], + "path": ["api", "soil-data", ""] + }, + "description": "دریافت داده خاک با lon و lat در body. اگر داده در DB باشد 200، وگرنه 202 با task_id برمی‌گردد." + } + }, + { + "name": "Task Status", + "request": { + "method": "GET", + "header": [ + { + "key": "Accept", + "value": "application/json" + } + ], + "url": { + "raw": "{{baseUrl}}/api/soil-data/tasks/{{task_id}}/status/", + "host": ["{{baseUrl}}"], + "path": ["api", "soil-data", "tasks", "{{task_id}}", "status", ""] + }, + "description": "بررسی وضعیت تسک واکشی خاک. task_id را از پاسخ 202 دریافت می‌کنید." + } + } + ] +} diff --git a/soil_data/serializers.py b/soil_data/serializers.py new file mode 100644 index 0000000..829320e --- /dev/null +++ b/soil_data/serializers.py @@ -0,0 +1,76 @@ +from rest_framework import serializers + +from .models import SoilDepthData, SoilLocation + + +class SoilDataRequestSerializer(serializers.Serializer): + """سریالایزر ورودی: lon و lat برای درخواست داده خاک.""" + + lon = serializers.DecimalField(max_digits=9, decimal_places=6, required=True) + lat = serializers.DecimalField(max_digits=9, decimal_places=6, required=True) + + +class SoilDepthDataSerializer(serializers.ModelSerializer): + """سریالایزر خروجی برای هر عمق خاک.""" + + class Meta: + model = SoilDepthData + fields = [ + "depth_label", + "bdod", + "cec", + "cfvo", + "clay", + "nitrogen", + "ocd", + "ocs", + "phh2o", + "sand", + "silt", + "soc", + "wv0010", + "wv0033", + "wv1500", + ] + + +class SoilLocationResponseSerializer(serializers.ModelSerializer): + """سریالایزر خروجی برای SoilLocation همراه با depths.""" + + lon = serializers.DecimalField( + source="longitude", + max_digits=9, + decimal_places=6, + read_only=True, + ) + lat = serializers.DecimalField( + source="latitude", + max_digits=9, + decimal_places=6, + read_only=True, + ) + depths = serializers.SerializerMethodField() + + class Meta: + model = SoilLocation + fields = ["id", "lon", "lat", "depths"] + + def get_depths(self, obj): + from .tasks import DEPTHS + + depth_qs = obj.depths.all() + order = {d: i for i, d in enumerate(DEPTHS)} + sorted_depths = sorted( + depth_qs, key=lambda d: order.get(d.depth_label, 99) + ) + return SoilDepthDataSerializer(sorted_depths, many=True).data + + +class SoilDataTaskResponseSerializer(serializers.Serializer): + """سریالایزر خروجی وقتی تسک در صف قرار گرفته (۲۰۲).""" + + source = serializers.CharField(default="task") + task_id = serializers.CharField() + lon = serializers.FloatField(source="longitude") + lat = serializers.FloatField(source="latitude") + status_url = serializers.URLField(required=False) diff --git a/soil_data/tasks.py b/soil_data/tasks.py new file mode 100644 index 0000000..f52f91f --- /dev/null +++ b/soil_data/tasks.py @@ -0,0 +1,117 @@ +""" +تسک‌های Celery برای واکشی داده‌های خاک از API SoilGrids. +""" + +from decimal import Decimal + +import requests +from config.celery import app +from django.db import transaction + +from .models import SoilDepthData, SoilLocation + +SOILGRIDS_BASE = "https://rest.isric.org/soilgrids/v2.0/properties/query" +PROPERTIES = [ + "bdod", "cec", "cfvo", "clay", "nitrogen", "ocd", "ocs", + "phh2o", "sand", "silt", "soc", "wv0010", "wv0033", "wv1500", +] +VALUES = ["Q0.5", "Q0.05", "Q0.95", "mean", "uncertainty"] +DEPTHS = ["0-5cm", "5-15cm", "15-30cm"] + + +def _fetch_soilgrids(lon: float, lat: float, depth: str) -> dict | None: + """درخواست به API SoilGrids برای یک عمق.""" + params = { + "lon": lon, + "lat": lat, + "depth": depth, + } + for p in PROPERTIES: + params.setdefault("property", []).append(p) + for v in VALUES: + params.setdefault("value", []).append(v) + + resp = requests.get( + SOILGRIDS_BASE, + params=params, + headers={"accept": "application/json"}, + timeout=60, + ) + resp.raise_for_status() + return resp.json() + + +def _parse_response_to_fields(data: dict) -> dict: + """ + استخراج مقادیر mean از response و ساخت dict مناسب برای SoilDepthData. + """ + fields = {p: None for p in PROPERTIES} + layers = data.get("properties", {}).get("layers", []) + for layer in layers: + name = layer.get("name") + if name not in fields: + continue + depths_list = layer.get("depths", []) + if depths_list: + values = depths_list[0].get("values", {}) + mean_val = values.get("mean") + if mean_val is not None: + fields[name] = float(mean_val) + return fields + + +@app.task(bind=True) +def fetch_soil_data_task(self, latitude: float, longitude: float): + """ + واکشی داده‌های خاک برای مختصات داده‌شده از SoilGrids و ذخیره در DB. + برای هر عمق (0-5cm, 5-15cm, 15-30cm) یک ریکوئست جدا زده می‌شود. + """ + lat = Decimal(str(round(float(latitude), 6))) + lon = Decimal(str(round(float(longitude), 6))) + + with transaction.atomic(): + location, created = SoilLocation.objects.select_for_update().get_or_create( + latitude=lat, + longitude=lon, + defaults={"task_id": self.request.id}, + ) + if not created: + location.task_id = self.request.id + location.save(update_fields=["task_id"]) + + for i, depth in enumerate(DEPTHS): + self.update_state( + state="PROGRESS", + meta={ + "current": i + 1, + "total": len(DEPTHS), + "message": f"در حال واکشی عمق {depth}...", + }, + ) + try: + data = _fetch_soilgrids(float(lon), float(lat), depth) + except requests.RequestException as e: + return { + "status": "error", + "location_id": location.id, + "depth": depth, + "error": str(e), + } + + fields = _parse_response_to_fields(data) + with transaction.atomic(): + SoilDepthData.objects.update_or_create( + soil_location=location, + depth_label=depth, + defaults=fields, + ) + + with transaction.atomic(): + location.task_id = "" + location.save(update_fields=["task_id"]) + + return { + "status": "completed", + "location_id": location.id, + "depths": DEPTHS, + } diff --git a/soil_data/urls.py b/soil_data/urls.py new file mode 100644 index 0000000..35d9c96 --- /dev/null +++ b/soil_data/urls.py @@ -0,0 +1,8 @@ +from django.urls import path + +from .views import SoilDataTaskStatusView, SoilDataView + +urlpatterns = [ + path("", SoilDataView.as_view(), name="soil-data"), + path("tasks//status/", SoilDataTaskStatusView.as_view(), name="soil-data-task-status"), +] diff --git a/soil_data/views.py b/soil_data/views.py new file mode 100644 index 0000000..ffbe701 --- /dev/null +++ b/soil_data/views.py @@ -0,0 +1,120 @@ +from rest_framework import status +from rest_framework.response import Response +from rest_framework.views import APIView + +from .models import SoilLocation +from .serializers import ( + SoilDataRequestSerializer, + SoilDataTaskResponseSerializer, + SoilLocationResponseSerializer, +) +from .tasks import fetch_soil_data_task + + +class SoilDataView(APIView): + """ + API خاک: مختصات جغرافیایی را می‌گیرد. + اگر داده در DB موجود باشد، برگردانده می‌شود؛ در غیر این صورت + تسک Celery صف می‌شود و task_id برمی‌گردد. + """ + + def _get_request_data(self, request): + return request.data if request.method == "POST" else request.query_params + + def get(self, request): + return self._process(request) + + def post(self, request): + return self._process(request) + + def _process(self, request): + data = self._get_request_data(request) + serializer = SoilDataRequestSerializer(data=data) + if not serializer.is_valid(): + return Response( + {"code": 400, "msg": "داده نامعتبر.", "data": serializer.errors}, + status=status.HTTP_400_BAD_REQUEST, + ) + lat = serializer.validated_data["lat"] + lon = serializer.validated_data["lon"] + lat_rounded = round(lat, 6) + lon_rounded = round(lon, 6) + + location = ( + SoilLocation.objects.filter( + latitude=lat_rounded, + longitude=lon_rounded, + ) + .prefetch_related("depths") + .first() + ) + + if location and location.is_complete: + data_serializer = SoilLocationResponseSerializer(location) + return Response( + { + "code": 200, + "msg": "success", + "data": { + "source": "database", + **data_serializer.data, + }, + }, + status=status.HTTP_200_OK, + ) + + result = fetch_soil_data_task.delay(float(lat_rounded), float(lon_rounded)) + task_data = SoilDataTaskResponseSerializer( + { + "task_id": result.id, + "longitude": float(lon_rounded), + "latitude": float(lat_rounded), + "status_url": f"/api/soil-data/tasks/{result.id}/status/", + } + ).data + return Response( + { + "code": 202, + "msg": "تسک در صف. وضعیت را با task_id بررسی کنید.", + "data": task_data, + }, + status=status.HTTP_202_ACCEPTED, + ) + + +class SoilDataTaskStatusView(APIView): + """وضعیت تسک واکشی خاک. در صورت SUCCESS لیست اطلاعات هر سه عمق برگردانده می‌شود.""" + + def get(self, request, task_id): + from celery.result import AsyncResult + + result = AsyncResult(task_id) + state = result.state + data = {"task_id": task_id, "status": state} + + if state == "PENDING": + data["message"] = "تسک در صف یا یافت نشد." + elif state == "PROGRESS": + data["progress"] = result.info + elif state == "SUCCESS": + task_result = result.result + if isinstance(task_result, dict) and task_result.get("status") == "completed": + location_id = task_result.get("location_id") + location = ( + SoilLocation.objects.filter(pk=location_id) + .prefetch_related("depths") + .first() + ) + if location and location.is_complete: + data["result"] = SoilLocationResponseSerializer(location).data + else: + data["result"] = task_result + else: + data["result"] = task_result + elif state == "FAILURE": + data["error"] = str(result.result) + + return Response( + {"code": 200, "msg": "success", "data": data}, + status=status.HTTP_200_OK, + ) diff --git a/tasks/__init__.py b/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tasks/apps.py b/tasks/apps.py new file mode 100644 index 0000000..b5dc464 --- /dev/null +++ b/tasks/apps.py @@ -0,0 +1,7 @@ +from django.apps import AppConfig + + +class TasksConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "tasks" + verbose_name = "Celery Tasks" diff --git a/tasks/celery_tasks.py b/tasks/celery_tasks.py new file mode 100644 index 0000000..1e0f755 --- /dev/null +++ b/tasks/celery_tasks.py @@ -0,0 +1,15 @@ +import time + +from config.celery import app + + +@app.task(bind=True) +def sample_task(self, duration: int = 1): + """تسک نمونه برای تست. duration تعداد ثانیه‌ای که تسک طول می‌کشه.""" + for i in range(duration): + self.update_state( + state="PROGRESS", + meta={"current": i + 1, "total": duration, "message": "در حال پردازش..."}, + ) + time.sleep(1) + return {"status": "completed", "duration": duration} diff --git a/tasks/urls.py b/tasks/urls.py new file mode 100644 index 0000000..67c59b6 --- /dev/null +++ b/tasks/urls.py @@ -0,0 +1,8 @@ +from django.urls import path + +from .views import TaskStatusView, TaskTriggerView + +urlpatterns = [ + path("", TaskTriggerView.as_view(), name="task-trigger"), + path("/status/", TaskStatusView.as_view(), name="task-status"), +] diff --git a/tasks/views.py b/tasks/views.py new file mode 100644 index 0000000..4abf251 --- /dev/null +++ b/tasks/views.py @@ -0,0 +1,51 @@ +from celery.result import AsyncResult + +from rest_framework import status +from rest_framework.response import Response +from rest_framework.views import APIView + +from .celery_tasks import sample_task + + +class TaskTriggerView(APIView): + """ + ثبت و اجرای تسک. + POST با بدنه اختیاری: {"duration": 3} - مدت زمان تسک به ثانیه. + """ + + def post(self, request): + duration = request.data.get("duration", 1) + try: + duration = int(duration) + duration = max(1, min(duration, 60)) + except (TypeError, ValueError): + duration = 1 + result = sample_task.delay(duration) + return Response( + {"code": 200, "msg": "success", "data": {"task_id": result.id}}, + status=status.HTTP_200_OK, + ) + + +class TaskStatusView(APIView): + """ + وضعیت تسک بر اساس task_id. + GET /api/tasks//status/ + """ + + def get(self, request, task_id): + result = AsyncResult(task_id) + state = result.state + data = {"task_id": task_id, "status": state} + if state == "PENDING": + data["message"] = "تسک در صف یا یافت نشد." + elif state == "PROGRESS": + data["progress"] = result.info + elif state == "SUCCESS": + data["result"] = result.result + elif state == "FAILURE": + data["error"] = str(result.result) + return Response( + {"code": 200, "msg": "success", "data": data}, + status=status.HTTP_200_OK, + )