Files
Ai/soil_data/tasks.py
T
sajad-dev 09e0c26c68 Add Redis service and Celery configuration to Docker setup
- Introduced Redis service in both docker-compose files for production and development.
- Updated web and celery services to use Redis as the broker and result backend.
- Added necessary environment variables for Celery in settings.py.
- Included new tasks and soil_data apps in Django settings and updated URL routing.
- Updated requirements.txt to include Celery and Redis dependencies.
2026-02-27 13:09:00 +03:30

118 lines
3.6 KiB
Python

"""
تسک‌های Celery برای واکشی داده‌های خاک از API SoilGrids.
"""
from decimal import Decimal
import requests
from config.celery import app
from django.db import transaction
from .models import SoilDepthData, SoilLocation
SOILGRIDS_BASE = "https://rest.isric.org/soilgrids/v2.0/properties/query"
PROPERTIES = [
"bdod", "cec", "cfvo", "clay", "nitrogen", "ocd", "ocs",
"phh2o", "sand", "silt", "soc", "wv0010", "wv0033", "wv1500",
]
VALUES = ["Q0.5", "Q0.05", "Q0.95", "mean", "uncertainty"]
DEPTHS = ["0-5cm", "5-15cm", "15-30cm"]
def _fetch_soilgrids(lon: float, lat: float, depth: str) -> dict:
    """Query the SoilGrids API for one depth layer and return the JSON body.

    Args:
        lon: Longitude in decimal degrees.
        lat: Latitude in decimal degrees.
        depth: Depth label accepted by SoilGrids, e.g. "0-5cm".

    Returns:
        The parsed JSON response as a dict. (Never None: a failed request
        raises instead of returning, so the annotation reflects that.)

    Raises:
        requests.HTTPError: For non-2xx responses (via raise_for_status).
        requests.RequestException: For network-level failures.
    """
    # requests encodes list values as repeated query parameters
    # (?property=bdod&property=cec&...), which is exactly what the
    # original setdefault/append loops produced.
    params = {
        "lon": lon,
        "lat": lat,
        "depth": depth,
        "property": PROPERTIES,
        "value": VALUES,
    }
    resp = requests.get(
        SOILGRIDS_BASE,
        params=params,
        headers={"accept": "application/json"},
        timeout=60,
    )
    resp.raise_for_status()
    return resp.json()
def _parse_response_to_fields(data: dict) -> dict:
    """Extract the "mean" statistic per property from a SoilGrids response.

    Builds a dict keyed by every known property name, suitable for use as
    SoilDepthData field defaults. Properties missing from the response (or
    lacking a mean value) remain None.
    """
    fields = dict.fromkeys(PROPERTIES)
    for layer in data.get("properties", {}).get("layers", []):
        prop = layer.get("name")
        if prop not in fields:
            continue
        depth_entries = layer.get("depths", [])
        if not depth_entries:
            continue
        # Each request targets a single depth, so only the first entry matters.
        mean_val = depth_entries[0].get("values", {}).get("mean")
        if mean_val is not None:
            fields[prop] = float(mean_val)
    return fields
@app.task(bind=True)
def fetch_soil_data_task(self, latitude: float, longitude: float):
    """Fetch soil data for the given coordinates from SoilGrids and persist it.

    One request is made per depth layer (0-5cm, 5-15cm, 15-30cm), reporting
    PROGRESS state between layers. Returns a status dict with either
    "completed" or "error" (plus the failing depth and error message).
    """
    # Normalize to 6 decimal places so equal coordinates always map to the
    # same SoilLocation row regardless of float noise in the input.
    lat = Decimal(str(round(float(latitude), 6)))
    lon = Decimal(str(round(float(longitude), 6)))
    with transaction.atomic():
        # select_for_update serializes concurrent tasks targeting the
        # same coordinates while the row is claimed.
        location, created = SoilLocation.objects.select_for_update().get_or_create(
            latitude=lat,
            longitude=lon,
            defaults={"task_id": self.request.id},
        )
        if not created:
            location.task_id = self.request.id
            location.save(update_fields=["task_id"])
    for i, depth in enumerate(DEPTHS):
        self.update_state(
            state="PROGRESS",
            meta={
                "current": i + 1,
                "total": len(DEPTHS),
                "message": f"در حال واکشی عمق {depth}...",
            },
        )
        try:
            data = _fetch_soilgrids(float(lon), float(lat), depth)
        except requests.RequestException as e:
            # BUG FIX: clear the stored task_id before bailing out, so a
            # failed fetch does not leave the location permanently marked
            # as in-progress (the success path already clears it).
            with transaction.atomic():
                location.task_id = ""
                location.save(update_fields=["task_id"])
            return {
                "status": "error",
                "location_id": location.id,
                "depth": depth,
                "error": str(e),
            }
        fields = _parse_response_to_fields(data)
        with transaction.atomic():
            # Idempotent per (location, depth): re-running the task
            # refreshes the stored values instead of duplicating rows.
            SoilDepthData.objects.update_or_create(
                soil_location=location,
                depth_label=depth,
                defaults=fields,
            )
    with transaction.atomic():
        # Release the claim now that all depths were stored.
        location.task_id = ""
        location.save(update_fields=["task_id"])
    return {
        "status": "completed",
        "location_id": location.id,
        "depths": DEPTHS,
    }