""" تسک‌های Celery برای واکشی داده‌های خاک از API SoilGrids. """ from decimal import Decimal import requests from config.celery import app from django.db import transaction from .models import SoilDepthData, SoilLocation SOILGRIDS_BASE = "https://rest.isric.org/soilgrids/v2.0/properties/query" PROPERTIES = [ "bdod", "cec", "cfvo", "clay", "nitrogen", "ocd", "ocs", "phh2o", "sand", "silt", "soc", "wv0010", "wv0033", "wv1500", ] VALUES = ["Q0.5", "Q0.05", "Q0.95", "mean", "uncertainty"] DEPTHS = ["0-5cm", "5-15cm", "15-30cm"] def _fetch_soilgrids(lon: float, lat: float, depth: str) -> dict | None: """درخواست به API SoilGrids برای یک عمق.""" params = { "lon": lon, "lat": lat, "depth": depth, } for p in PROPERTIES: params.setdefault("property", []).append(p) for v in VALUES: params.setdefault("value", []).append(v) resp = requests.get( SOILGRIDS_BASE, params=params, headers={"accept": "application/json"}, timeout=60, ) resp.raise_for_status() return resp.json() def _parse_response_to_fields(data: dict) -> dict: """ استخراج مقادیر mean از response و ساخت dict مناسب برای SoilDepthData. """ fields = {p: None for p in PROPERTIES} layers = data.get("properties", {}).get("layers", []) for layer in layers: name = layer.get("name") if name not in fields: continue depths_list = layer.get("depths", []) if depths_list: values = depths_list[0].get("values", {}) mean_val = values.get("mean") if mean_val is not None: fields[name] = float(mean_val) return fields @app.task(bind=True) def fetch_soil_data_task(self, latitude: float, longitude: float): """ واکشی داده‌های خاک برای مختصات داده‌شده از SoilGrids و ذخیره در DB. برای هر عمق (0-5cm, 5-15cm, 15-30cm) یک ریکوئست جدا زده می‌شود. """ lat = Decimal(str(round(float(latitude), 6))) lon = Decimal(str(round(float(longitude), 6))) with transaction.atomic(): location, created = SoilLocation.objects.select_for_update().get_or_create( latitude=lat, longitude=lon, defaults={"task_id": self.request.id}, ) if not created: location.task_id = self.request.id location.save(update_fields=["task_id"]) for i, depth in enumerate(DEPTHS): self.update_state( state="PROGRESS", meta={ "current": i + 1, "total": len(DEPTHS), "message": f"در حال واکشی عمق {depth}...", }, ) try: data = _fetch_soilgrids(float(lon), float(lat), depth) except requests.RequestException as e: return { "status": "error", "location_id": location.id, "depth": depth, "error": str(e), } fields = _parse_response_to_fields(data) with transaction.atomic(): SoilDepthData.objects.update_or_create( soil_location=location, depth_label=depth, defaults=fields, ) with transaction.atomic(): location.task_id = "" location.save(update_fields=["task_id"]) return { "status": "completed", "location_id": location.id, "depths": DEPTHS, }