118 lines
3.6 KiB
Python
118 lines
3.6 KiB
Python
|
|
"""
|
||
|
|
تسکهای Celery برای واکشی دادههای خاک از API SoilGrids.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from decimal import Decimal
|
||
|
|
|
||
|
|
import requests
|
||
|
|
from config.celery import app
|
||
|
|
from django.db import transaction
|
||
|
|
|
||
|
|
from .models import SoilDepthData, SoilLocation
|
||
|
|
|
||
|
|
# Endpoint that every SoilGrids request in this module is sent to.
SOILGRIDS_BASE = "https://rest.isric.org/soilgrids/v2.0/properties/query"

# Names sent as the repeated ``property`` query parameter; they are also the
# keys of the dict built by ``_parse_response_to_fields``.
PROPERTIES = [
    "bdod",
    "cec",
    "cfvo",
    "clay",
    "nitrogen",
    "ocd",
    "ocs",
    "phh2o",
    "sand",
    "silt",
    "soc",
    "wv0010",
    "wv0033",
    "wv1500",
]

# Names sent as the repeated ``value`` query parameter.
VALUES = [
    "Q0.5",
    "Q0.05",
    "Q0.95",
    "mean",
    "uncertainty",
]

# Depth labels; the task issues one request per entry.
DEPTHS = [
    "0-5cm",
    "5-15cm",
    "15-30cm",
]
|
||
|
|
|
||
|
|
|
||
|
|
def _fetch_soilgrids(lon: float, lat: float, depth: str) -> dict | None:
    """Query the SoilGrids endpoint for a single depth at one point.

    Sends one GET request to ``SOILGRIDS_BASE`` asking for every entry of
    ``PROPERTIES`` and every entry of ``VALUES``.

    Args:
        lon: Longitude sent as the ``lon`` query parameter.
        lat: Latitude sent as the ``lat`` query parameter.
        depth: Depth label sent as the ``depth`` query parameter.

    Returns:
        The decoded JSON body of the response. The annotation allows
        ``None``, but this implementation never returns it.

    Raises:
        requests.RequestException: Propagated from ``requests.get`` or from
            ``raise_for_status`` on an error HTTP status.
    """
    # List values are encoded by requests as repeated query keys.
    query = {
        "lon": lon,
        "lat": lat,
        "depth": depth,
        "property": list(PROPERTIES),
        "value": list(VALUES),
    }
    response = requests.get(
        SOILGRIDS_BASE,
        params=query,
        headers={"accept": "application/json"},
        timeout=60,
    )
    response.raise_for_status()
    return response.json()
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_response_to_fields(data: dict) -> dict:
    """Extract the ``mean`` value of each requested property from a response.

    Walks ``data["properties"]["layers"]`` and, for every layer whose
    ``name`` is one of ``PROPERTIES``, reads ``values["mean"]`` from the
    first entry of that layer's ``depths`` list.

    Args:
        data: Decoded JSON response from the SoilGrids query.

    Returns:
        A dict with one key per entry of ``PROPERTIES``. Each value is the
        layer's mean converted to ``float``, or ``None`` when the layer, its
        first depth entry, or its mean is missing or null.
    """
    fields = {prop: None for prop in PROPERTIES}

    # ``.get(key, default)`` only falls back when the key is absent; a key
    # that is present but JSON ``null`` yields None and would break the
    # chained lookup, hence the ``or`` fallbacks below.
    layers = (data.get("properties") or {}).get("layers") or []
    for layer in layers:
        name = layer.get("name")
        if name not in fields:
            continue
        depth_entries = layer.get("depths") or []
        if not depth_entries:
            continue
        # Only the first depth entry is read; the caller requests a single
        # depth per call.
        values = depth_entries[0].get("values") or {}
        mean_val = values.get("mean")
        if mean_val is not None:
            # NOTE(review): the value is stored exactly as returned; confirm
            # whether any unit scaling is expected before persisting.
            fields[name] = float(mean_val)
    return fields
|
||
|
|
|
||
|
|
|
||
|
|
@app.task(bind=True)
def fetch_soil_data_task(self, latitude: float, longitude: float):
    """Fetch SoilGrids data for a coordinate and store it per depth.

    The coordinates are rounded to six decimal places and used to get or
    create a ``SoilLocation`` row, which is tagged with this task's id. One
    request is then made for each entry of ``DEPTHS``; every response is
    reduced to its mean values and written through
    ``SoilDepthData.objects.update_or_create``. After the last depth the
    location's ``task_id`` is reset to an empty string.

    Args:
        latitude: Latitude of the point.
        longitude: Longitude of the point.

    Returns:
        ``{"status": "completed", "location_id": ..., "depths": DEPTHS}``
        when every depth was fetched, or
        ``{"status": "error", "location_id": ..., "depth": ..., "error": ...}``
        as soon as a request raises ``requests.RequestException`` (the task
        returns normally in that case instead of raising).
    """
    lat = Decimal(str(round(float(latitude), 6)))
    lon = Decimal(str(round(float(longitude), 6)))
    current_task_id = self.request.id

    # Get or create the location under a row lock and tag it with this task.
    with transaction.atomic():
        location, created = SoilLocation.objects.select_for_update().get_or_create(
            latitude=lat,
            longitude=lon,
            defaults={"task_id": current_task_id},
        )
        if not created:
            location.task_id = current_task_id
            location.save(update_fields=["task_id"])

    total = len(DEPTHS)
    for step, depth in enumerate(DEPTHS, start=1):
        # Publish progress before each request.
        progress = {
            "current": step,
            "total": total,
            "message": f"در حال واکشی عمق {depth}...",
        }
        self.update_state(state="PROGRESS", meta=progress)

        try:
            data = _fetch_soilgrids(float(lon), float(lat), depth)
        except requests.RequestException as exc:
            # NOTE(review): this early return leaves location.task_id set to
            # this task's id; confirm that is intended.
            return {
                "status": "error",
                "location_id": location.id,
                "depth": depth,
                "error": str(exc),
            }

        fields = _parse_response_to_fields(data)
        with transaction.atomic():
            SoilDepthData.objects.update_or_create(
                soil_location=location,
                depth_label=depth,
                defaults=fields,
            )

    # All depths stored: clear the task marker on the location.
    with transaction.atomic():
        location.task_id = ""
        location.save(update_fields=["task_id"])

    return {
        "status": "completed",
        "location_id": location.id,
        "depths": DEPTHS,
    }
|