""" تسک‌های Celery برای واکشی داده‌های خاک از API SoilGrids. """ from decimal import Decimal import requests from config.celery import app from django.db import transaction from .models import SoilDepthData, SoilLocation SOILGRIDS_BASE = "https://rest.isric.org/soilgrids/v2.0/properties/query" PROPERTIES = [ "bdod", "cec", "cfvo", "clay", "nitrogen", "ocd", "ocs", "phh2o", "sand", "silt", "soc", "wv0010", "wv0033", "wv1500", ] VALUES = ["Q0.5", "Q0.05", "Q0.95", "mean", "uncertainty"] DEPTHS = ["0-5cm", "5-15cm", "15-30cm"] def _fetch_soilgrids(lon: float, lat: float, depth: str) -> dict | None: """درخواست به API SoilGrids برای یک عمق.""" params = { "lon": lon, "lat": lat, "depth": depth, } for p in PROPERTIES: params.setdefault("property", []).append(p) for v in VALUES: params.setdefault("value", []).append(v) resp = requests.get( SOILGRIDS_BASE, params=params, headers={"accept": "application/json"}, timeout=60, ) resp.raise_for_status() return resp.json() def _parse_response_to_fields(data: dict) -> dict: """ استخراج مقادیر mean از response و ساخت dict مناسب برای SoilDepthData. """ fields = {p: None for p in PROPERTIES} layers = data.get("properties", {}).get("layers", []) for layer in layers: name = layer.get("name") if name not in fields: continue depths_list = layer.get("depths", []) if depths_list: values = depths_list[0].get("values", {}) mean_val = values.get("mean") if mean_val is not None: fields[name] = float(mean_val) return fields def fetch_soil_data_for_coordinates( latitude: float, longitude: float, task_id: str = "", progress_callback=None, ): """ واکشی سنکرون داده خاک برای مختصات داده‌شده و ذخیره در DB. این helper هم توسط Celery task و هم توسط endpointهای sync استفاده می‌شود. """ lat = Decimal(str(round(float(latitude), 6))) lon = Decimal(str(round(float(longitude), 6))) with transaction.atomic(): location, created = SoilLocation.objects.select_for_update().get_or_create( latitude=lat, longitude=lon, defaults={"task_id": task_id}, ) if not created and task_id: location.task_id = task_id location.save(update_fields=["task_id"]) for i, depth in enumerate(DEPTHS): if progress_callback is not None: progress_callback( state="PROGRESS", meta={ "current": i + 1, "total": len(DEPTHS), "message": f"در حال واکشی عمق {depth}...", }, ) data = _fetch_soilgrids(float(lon), float(lat), depth) fields = _parse_response_to_fields(data) with transaction.atomic(): SoilDepthData.objects.update_or_create( soil_location=location, depth_label=depth, defaults=fields, ) if task_id: with transaction.atomic(): location.task_id = "" location.save(update_fields=["task_id"]) return { "status": "completed", "location_id": location.id, "depths": DEPTHS, } @app.task(bind=True) def fetch_soil_data_task(self, latitude: float, longitude: float): """ واکشی داده‌های خاک برای مختصات داده‌شده از SoilGrids و ذخیره در DB. برای هر عمق (0-5cm, 5-15cm, 15-30cm) یک ریکوئست جدا زده می‌شود. """ try: return fetch_soil_data_for_coordinates( latitude=latitude, longitude=longitude, task_id=self.request.id, progress_callback=self.update_state, ) except requests.RequestException as e: lat = Decimal(str(round(float(latitude), 6))) lon = Decimal(str(round(float(longitude), 6))) location = SoilLocation.objects.filter(latitude=lat, longitude=lon).first() return { "status": "error", "location_id": getattr(location, "id", None), "error": str(e), }