Files
Ai/location_data/tasks.py
T

98 lines
3.1 KiB
Python
Raw Normal View History

"""
2026-04-29 01:27:29 +03:30
تسک‌های Celery برای واکشی داده‌های خاک.
"""
from decimal import Decimal
from config.celery import app
2026-04-29 01:27:29 +03:30
from django.apps import apps
from django.db import transaction
from .models import SoilDepthData, SoilLocation
2026-04-29 01:27:29 +03:30
from .soil_adapters import DEPTHS
2026-04-29 01:27:29 +03:30
try:
import requests
except ImportError: # pragma: no cover - handled in stripped envs
RequestException = Exception
else:
RequestException = requests.RequestException
2026-04-07 01:08:41 +03:30
def fetch_soil_data_for_coordinates(
latitude: float,
longitude: float,
task_id: str = "",
progress_callback=None,
):
"""
2026-04-07 01:08:41 +03:30
واکشی سنکرون داده خاک برای مختصات داده‌شده و ذخیره در DB.
این helper هم توسط Celery task و هم توسط endpointهای sync استفاده می‌شود.
"""
lat = Decimal(str(round(float(latitude), 6)))
lon = Decimal(str(round(float(longitude), 6)))
2026-04-29 01:27:29 +03:30
adapter = apps.get_app_config("location_data").get_soil_data_adapter()
with transaction.atomic():
location, created = SoilLocation.objects.select_for_update().get_or_create(
latitude=lat,
longitude=lon,
2026-04-07 01:08:41 +03:30
defaults={"task_id": task_id},
)
2026-04-07 01:08:41 +03:30
if not created and task_id:
location.task_id = task_id
location.save(update_fields=["task_id"])
2026-04-29 01:27:29 +03:30
for index, depth in enumerate(DEPTHS):
2026-04-07 01:08:41 +03:30
if progress_callback is not None:
progress_callback(
state="PROGRESS",
meta={
2026-04-29 01:27:29 +03:30
"current": index + 1,
2026-04-07 01:08:41 +03:30
"total": len(DEPTHS),
"message": f"در حال واکشی عمق {depth}...",
},
)
2026-04-29 01:27:29 +03:30
fields = adapter.fetch_depth_fields(float(lon), float(lat), depth)
with transaction.atomic():
SoilDepthData.objects.update_or_create(
soil_location=location,
depth_label=depth,
defaults=fields,
)
2026-04-07 01:08:41 +03:30
if task_id:
with transaction.atomic():
location.task_id = ""
location.save(update_fields=["task_id"])
return {
"status": "completed",
"location_id": location.id,
"depths": DEPTHS,
}
2026-04-07 01:08:41 +03:30
@app.task(bind=True)
def fetch_soil_data_task(self, latitude: float, longitude: float):
"""
2026-04-29 01:27:29 +03:30
واکشی داده‌های خاک برای مختصات داده‌شده و ذخیره در DB.
برای هر عمق (0-5cm, 5-15cm, 15-30cm) یک ریکوئست/شبیه‌سازی جدا انجام می‌شود.
2026-04-07 01:08:41 +03:30
"""
try:
return fetch_soil_data_for_coordinates(
latitude=latitude,
longitude=longitude,
task_id=self.request.id,
progress_callback=self.update_state,
)
2026-04-29 01:27:29 +03:30
except RequestException as exc:
2026-04-07 01:08:41 +03:30
lat = Decimal(str(round(float(latitude), 6)))
lon = Decimal(str(round(float(longitude), 6)))
location = SoilLocation.objects.filter(latitude=lat, longitude=lon).first()
return {
"status": "error",
"location_id": getattr(location, "id", None),
2026-04-29 01:27:29 +03:30
"error": str(exc),
2026-04-07 01:08:41 +03:30
}