Files
Ai/location_data/tasks.py
T
2026-03-19 22:54:29 +03:30

118 lines
3.6 KiB
Python

"""
تسک‌های Celery برای واکشی داده‌های خاک از API SoilGrids.
"""
from decimal import Decimal
import requests
from config.celery import app
from django.db import transaction
from .models import SoilDepthData, SoilLocation
# URL that every SoilGrids query in this module is sent to.
SOILGRIDS_BASE = "https://rest.isric.org/soilgrids/v2.0/properties/query"

# Property codes sent as repeated ``property`` query parameters; the response
# parser also uses them as the keys of the field dict it builds.
PROPERTIES = [
    "bdod",
    "cec",
    "cfvo",
    "clay",
    "nitrogen",
    "ocd",
    "ocs",
    "phh2o",
    "sand",
    "silt",
    "soc",
    "wv0010",
    "wv0033",
    "wv1500",
]

# Statistic names sent as repeated ``value`` query parameters.
VALUES = [
    "Q0.5",
    "Q0.05",
    "Q0.95",
    "mean",
    "uncertainty",
]

# Depth labels; the task issues one request per entry.
DEPTHS = [
    "0-5cm",
    "5-15cm",
    "15-30cm",
]
def _fetch_soilgrids(lon: float, lat: float, depth: str) -> dict | None:
    """Query the SoilGrids API for a single depth.

    Sends one GET request to ``SOILGRIDS_BASE`` asking for every entry of
    ``PROPERTIES`` and ``VALUES`` at the given point and depth.

    Args:
        lon: Longitude, forwarded as the ``lon`` query parameter.
        lat: Latitude, forwarded as the ``lat`` query parameter.
        depth: Depth label, forwarded as the ``depth`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: When ``raise_for_status`` rejects the response.
    """
    # A list value makes requests emit the key once per item.
    query = {
        "lon": lon,
        "lat": lat,
        "depth": depth,
        "property": list(PROPERTIES),
        "value": list(VALUES),
    }
    response = requests.get(
        SOILGRIDS_BASE,
        params=query,
        headers={"accept": "application/json"},
        timeout=60,
    )
    response.raise_for_status()
    return response.json()
def _parse_response_to_fields(data: dict) -> dict:
"""
استخراج مقادیر mean از response و ساخت dict مناسب برای SoilDepthData.
"""
fields = {p: None for p in PROPERTIES}
layers = data.get("properties", {}).get("layers", [])
for layer in layers:
name = layer.get("name")
if name not in fields:
continue
depths_list = layer.get("depths", [])
if depths_list:
values = depths_list[0].get("values", {})
mean_val = values.get("mean")
if mean_val is not None:
fields[name] = float(mean_val)
return fields
@app.task(bind=True)
def fetch_soil_data_task(self, latitude: float, longitude: float):
    """Fetch SoilGrids data for a coordinate and store it in the database.

    One request is sent per entry of ``DEPTHS`` (0-5cm, 5-15cm, 15-30cm) and
    each result is written to its own ``SoilDepthData`` row. While the task
    runs, the ``SoilLocation`` row holds this task's id in ``task_id``. The
    field is reset to ``""`` whenever the task stops: on completion, on a
    fetch error, or on an unexpected exception.

    Args:
        latitude: Latitude of the point; rounded to 6 decimal places.
        longitude: Longitude of the point; rounded to 6 decimal places.

    Returns:
        On success, a dict with ``status`` ``"completed"``, ``location_id``
        and ``depths``. If a fetch fails, a dict with ``status`` ``"error"``,
        ``location_id``, the failing ``depth`` and the ``error`` text; depths
        stored before the failure are kept.
    """
    # Go through str() so the Decimal holds the rounded decimal text rather
    # than the float's binary expansion.
    lat = Decimal(str(round(float(latitude), 6)))
    lon = Decimal(str(round(float(longitude), 6)))

    with transaction.atomic():
        location, created = SoilLocation.objects.select_for_update().get_or_create(
            latitude=lat,
            longitude=lon,
            defaults={"task_id": self.request.id},
        )
        if not created:
            # Existing location: record this task's id on the row.
            location.task_id = self.request.id
            location.save(update_fields=["task_id"])

    try:
        for step, depth in enumerate(DEPTHS, start=1):
            self.update_state(
                state="PROGRESS",
                meta={
                    "current": step,
                    "total": len(DEPTHS),
                    "message": f"در حال واکشی عمق {depth}...",
                },
            )
            try:
                data = _fetch_soilgrids(float(lon), float(lat), depth)
            except (requests.RequestException, ValueError) as exc:
                # ValueError covers an undecodable JSON body on requests
                # versions whose JSON error is not a RequestException.
                return {
                    "status": "error",
                    "location_id": location.id,
                    "depth": depth,
                    "error": str(exc),
                }
            fields = _parse_response_to_fields(data)
            with transaction.atomic():
                SoilDepthData.objects.update_or_create(
                    soil_location=location,
                    depth_label=depth,
                    defaults=fields,
                )
    finally:
        # Clear the task id on every exit path so the row never keeps the id
        # of a task that is no longer running.
        with transaction.atomic():
            location.task_id = ""
            location.save(update_fields=["task_id"])

    return {
        "status": "completed",
        "location_id": location.id,
        "depths": DEPTHS,
    }