This commit is contained in:
2026-04-07 01:08:41 +03:30
parent ff464cb4a5
commit 5acee1fa2c
5 changed files with 198 additions and 34 deletions
+49 -28
View File
@@ -60,11 +60,15 @@ def _parse_response_to_fields(data: dict) -> dict:
return fields
@app.task(bind=True)
def fetch_soil_data_task(self, latitude: float, longitude: float):
def fetch_soil_data_for_coordinates(
latitude: float,
longitude: float,
task_id: str = "",
progress_callback=None,
):
"""
واکشی داده‌های خاک برای مختصات داده‌شده از SoilGrids و ذخیره در DB.
برای هر عمق (0-5cm, 5-15cm, 15-30cm) یک ریکوئست جدا زده می‌شود.
واکشی سنکرون داده خاک برای مختصات داده‌شده و ذخیره در DB.
این helper هم توسط Celery task و هم توسط endpointهای sync استفاده می‌شود.
"""
lat = Decimal(str(round(float(latitude), 6)))
lon = Decimal(str(round(float(longitude), 6)))
@@ -73,31 +77,23 @@ def fetch_soil_data_task(self, latitude: float, longitude: float):
location, created = SoilLocation.objects.select_for_update().get_or_create(
latitude=lat,
longitude=lon,
defaults={"task_id": self.request.id},
defaults={"task_id": task_id},
)
if not created:
location.task_id = self.request.id
if not created and task_id:
location.task_id = task_id
location.save(update_fields=["task_id"])
for i, depth in enumerate(DEPTHS):
self.update_state(
state="PROGRESS",
meta={
"current": i + 1,
"total": len(DEPTHS),
"message": f"در حال واکشی عمق {depth}...",
},
)
try:
data = _fetch_soilgrids(float(lon), float(lat), depth)
except requests.RequestException as e:
return {
"status": "error",
"location_id": location.id,
"depth": depth,
"error": str(e),
}
if progress_callback is not None:
progress_callback(
state="PROGRESS",
meta={
"current": i + 1,
"total": len(DEPTHS),
"message": f"در حال واکشی عمق {depth}...",
},
)
data = _fetch_soilgrids(float(lon), float(lat), depth)
fields = _parse_response_to_fields(data)
with transaction.atomic():
SoilDepthData.objects.update_or_create(
@@ -106,12 +102,37 @@ def fetch_soil_data_task(self, latitude: float, longitude: float):
defaults=fields,
)
with transaction.atomic():
location.task_id = ""
location.save(update_fields=["task_id"])
if task_id:
with transaction.atomic():
location.task_id = ""
location.save(update_fields=["task_id"])
return {
"status": "completed",
"location_id": location.id,
"depths": DEPTHS,
}
@app.task(bind=True)
def fetch_soil_data_task(self, latitude: float, longitude: float):
"""
واکشی داده‌های خاک برای مختصات داده‌شده از SoilGrids و ذخیره در DB.
برای هر عمق (0-5cm, 5-15cm, 15-30cm) یک ریکوئست جدا زده می‌شود.
"""
try:
return fetch_soil_data_for_coordinates(
latitude=latitude,
longitude=longitude,
task_id=self.request.id,
progress_callback=self.update_state,
)
except requests.RequestException as e:
lat = Decimal(str(round(float(latitude), 6)))
lon = Decimal(str(round(float(longitude), 6)))
location = SoilLocation.objects.filter(latitude=lat, longitude=lon).first()
return {
"status": "error",
"location_id": getattr(location, "id", None),
"error": str(e),
}