From 5acee1fa2c416cfde646740b5f4c8377db23ded2 Mon Sep 17 00:00:00 2001 From: Mohammad Sajad Pourajam Date: Tue, 7 Apr 2026 01:08:41 +0330 Subject: [PATCH] UPDATE --- farm_data/services.py | 38 ++++++++++++ farm_data/tests/test_farm_detail_api.py | 66 +++++++++++++++++++++ farm_data/views.py | 17 +++++- location_data/tasks.py | 77 ++++++++++++++++--------- weather/services.py | 34 +++++++++-- 5 files changed, 198 insertions(+), 34 deletions(-) diff --git a/farm_data/services.py b/farm_data/services.py index 38e5183..7c4e99e 100644 --- a/farm_data/services.py +++ b/farm_data/services.py @@ -6,7 +6,9 @@ from django.db import transaction from location_data.models import SoilLocation from location_data.serializers import SoilDepthDataSerializer +from location_data.tasks import fetch_soil_data_for_coordinates from plant.serializers import PlantSerializer +from weather.services import update_weather_for_location from weather.models import WeatherForecast from .models import SensorData @@ -17,6 +19,10 @@ DEPTH_PRIORITY = ["0-5cm", "5-15cm", "15-30cm"] DECIMAL_PRECISION = Decimal("0.000001") +class ExternalDataSyncError(Exception): + """خطا در همگام‌سازی داده از سرویس‌های بیرونی.""" + + def get_farm_details(farm_uuid: str): farm = ( SensorData.objects.select_related("center_location", "weather_forecast") @@ -104,6 +110,38 @@ def resolve_weather_for_location(location: SoilLocation) -> WeatherForecast | No ) +def ensure_location_and_weather_data(location: SoilLocation) -> tuple[SoilLocation, WeatherForecast | None]: + """ + اگر داده خاک یا آب‌وهوا برای location موجود نباشد، از سرویس مربوطه + واکشی و در دیتابیس ذخیره می‌شود. + """ + if not location.is_complete: + try: + soil_result = fetch_soil_data_for_coordinates( + latitude=float(location.latitude), + longitude=float(location.longitude), + ) + except Exception as exc: + raise ExternalDataSyncError(f"خطا در واکشی داده خاک: {exc}") from exc + + if soil_result.get("status") != "completed": + raise ExternalDataSyncError( + soil_result.get("error") or "واکشی داده خاک کامل نشد." + ) + location.refresh_from_db() + + weather_forecast = resolve_weather_for_location(location) + if weather_forecast is None: + weather_result = update_weather_for_location(location) + if weather_result.get("status") not in {"success", "no_data"}: + raise ExternalDataSyncError( + weather_result.get("error") or "واکشی داده آب‌وهوا کامل نشد." + ) + weather_forecast = resolve_weather_for_location(location) + + return location, weather_forecast + + def _flatten_sensor_metrics(sensor_payload: dict | None) -> dict: if not isinstance(sensor_payload, dict): return {} diff --git a/farm_data/tests/test_farm_detail_api.py b/farm_data/tests/test_farm_detail_api.py index 67b3322..e07510b 100644 --- a/farm_data/tests/test_farm_detail_api.py +++ b/farm_data/tests/test_farm_detail_api.py @@ -1,4 +1,5 @@ from datetime import date +from unittest.mock import patch import uuid from django.test import TestCase @@ -193,3 +194,68 @@ class FarmDataUpsertApiTests(TestCase): self.assertEqual(str(farm.center_location.latitude), "50.010000") self.assertEqual(str(farm.center_location.longitude), "50.010000") self.assertIsNone(farm.weather_forecast_id) + + @patch("farm_data.services.update_weather_for_location") + @patch("farm_data.services.fetch_soil_data_for_coordinates") + def test_post_fetches_missing_location_and_weather_data( + self, + mock_fetch_soil_data_for_coordinates, + mock_update_weather_for_location, + ): + missing_boundary = square_boundary_for_center(36.0, 52.0) + farm_uuid = uuid.uuid4() + + def soil_side_effect(latitude, longitude, task_id="", progress_callback=None): + location = SoilLocation.objects.get( + latitude="36.000000", + longitude="52.000000", + ) + SoilDepthData.objects.update_or_create( + soil_location=location, + depth_label="0-5cm", + defaults={"clay": 20.0}, + ) + SoilDepthData.objects.update_or_create( + soil_location=location, + depth_label="5-15cm", + defaults={"clay": 18.0}, + ) + SoilDepthData.objects.update_or_create( + soil_location=location, + depth_label="15-30cm", + defaults={"clay": 16.0}, + ) + return {"status": "completed", "location_id": location.id, "depths": ["0-5cm", "5-15cm", "15-30cm"]} + + def weather_side_effect(location): + WeatherForecast.objects.update_or_create( + location=location, + forecast_date=date(2026, 4, 12), + defaults={ + "temperature_min": 10.0, + "temperature_max": 20.0, + "temperature_mean": 15.0, + }, + ) + return {"status": "success", "location_id": location.id, "days_updated": 1} + + mock_fetch_soil_data_for_coordinates.side_effect = soil_side_effect + mock_update_weather_for_location.side_effect = weather_side_effect + + response = self.client.post( + "/api/farm-data/", + data={ + "farm_uuid": str(farm_uuid), + "farm_boundary": missing_boundary, + "sensor_payload": {"sensor-7-1": {"soil_moisture": 44.0}}, + }, + format="json", + ) + + self.assertEqual(response.status_code, 201) + mock_fetch_soil_data_for_coordinates.assert_called_once() + mock_update_weather_for_location.assert_called_once() + + farm = SensorData.objects.get(farm_uuid=farm_uuid) + self.assertEqual(farm.center_location.depths.count(), 3) + self.assertIsNotNone(farm.weather_forecast_id) diff --git a/farm_data/views.py b/farm_data/views.py index b68fc3a..8d66b56 100644 --- a/farm_data/views.py +++ b/farm_data/views.py @@ -21,9 +21,10 @@ from .serializers import ( SensorParameterSerializer, ) from .services import ( + ExternalDataSyncError, + ensure_location_and_weather_data, get_farm_details, resolve_center_location_from_boundary, - resolve_weather_for_location, ) @@ -93,6 +94,10 @@ class FarmDataUpsertView(APIView): SensorDataValidationErrorSerializer, "داده ورودی نامعتبر است.", ), + 502: build_response( + SensorDataNotFoundSerializer, + "واکشی داده خاک یا آب‌وهوا از سرویس بیرونی ناموفق بود.", + ), }, examples=[ OpenApiExample( @@ -171,7 +176,15 @@ class FarmDataUpsertView(APIView): {"code": 400, "msg": "داده نامعتبر.", "data": {"farm_boundary": [str(exc)]}}, status=status.HTTP_400_BAD_REQUEST, ) - weather_forecast = resolve_weather_for_location(center_location) + try: + center_location, weather_forecast = ensure_location_and_weather_data( + center_location + ) + except ExternalDataSyncError as exc: + return Response( + {"code": 502, "msg": str(exc), "data": None}, + status=status.HTTP_502_BAD_GATEWAY, + ) with transaction.atomic(): farm_data, created = SensorData.objects.get_or_create( diff --git a/location_data/tasks.py b/location_data/tasks.py index f52f91f..fb8f047 100644 --- a/location_data/tasks.py +++ b/location_data/tasks.py @@ -60,11 +60,15 @@ def _parse_response_to_fields(data: dict) -> dict: return fields -@app.task(bind=True) -def fetch_soil_data_task(self, latitude: float, longitude: float): +def fetch_soil_data_for_coordinates( + latitude: float, + longitude: float, + task_id: str = "", + progress_callback=None, +): """ - واکشی داده‌های خاک برای مختصات داده‌شده از SoilGrids و ذخیره در DB. - برای هر عمق (0-5cm, 5-15cm, 15-30cm) یک ریکوئست جدا زده می‌شود. + واکشی سنکرون داده خاک برای مختصات داده‌شده و ذخیره در DB. + این helper هم توسط Celery task و هم توسط endpointهای sync استفاده می‌شود. """ lat = Decimal(str(round(float(latitude), 6))) lon = Decimal(str(round(float(longitude), 6))) @@ -73,31 +77,23 @@ def fetch_soil_data_task(self, latitude: float, longitude: float): location, created = SoilLocation.objects.select_for_update().get_or_create( latitude=lat, longitude=lon, - defaults={"task_id": self.request.id}, + defaults={"task_id": task_id}, ) - if not created: - location.task_id = self.request.id + if not created and task_id: + location.task_id = task_id location.save(update_fields=["task_id"]) for i, depth in enumerate(DEPTHS): - self.update_state( - state="PROGRESS", - meta={ - "current": i + 1, - "total": len(DEPTHS), - "message": f"در حال واکشی عمق {depth}...", - }, - ) - try: - data = _fetch_soilgrids(float(lon), float(lat), depth) - except requests.RequestException as e: - return { - "status": "error", - "location_id": location.id, - "depth": depth, - "error": str(e), - } - + if progress_callback is not None: + progress_callback( + state="PROGRESS", + meta={ + "current": i + 1, + "total": len(DEPTHS), + "message": f"در حال واکشی عمق {depth}...", + }, + ) + data = _fetch_soilgrids(float(lon), float(lat), depth) fields = _parse_response_to_fields(data) with transaction.atomic(): SoilDepthData.objects.update_or_create( @@ -106,12 +102,37 @@ def fetch_soil_data_task(self, latitude: float, longitude: float): defaults=fields, ) - with transaction.atomic(): - location.task_id = "" - location.save(update_fields=["task_id"]) + if task_id: + with transaction.atomic(): + location.task_id = "" + location.save(update_fields=["task_id"]) return { "status": "completed", "location_id": location.id, "depths": DEPTHS, } + + +@app.task(bind=True) +def fetch_soil_data_task(self, latitude: float, longitude: float): + """ + واکشی داده‌های خاک برای مختصات داده‌شده از SoilGrids و ذخیره در DB. + برای هر عمق (0-5cm, 5-15cm, 15-30cm) یک ریکوئست جدا زده می‌شود. + """ + try: + return fetch_soil_data_for_coordinates( + latitude=latitude, + longitude=longitude, + task_id=self.request.id, + progress_callback=self.update_state, + ) + except requests.RequestException as e: + lat = Decimal(str(round(float(latitude), 6))) + lon = Decimal(str(round(float(longitude), 6))) + location = SoilLocation.objects.filter(latitude=lat, longitude=lon).first() + return { + "status": "error", + "location_id": getattr(location, "id", None), + "error": str(e), + } diff --git a/weather/services.py b/weather/services.py index 741e823..59a9bbc 100644 --- a/weather/services.py +++ b/weather/services.py @@ -5,6 +5,7 @@ import logging from datetime import date, timedelta +import requests from django.conf import settings from django.db import transaction @@ -42,10 +43,35 @@ def fetch_weather_from_api(latitude: float, longitude: float) -> dict | None: } } """ - # TODO: اتصال واقعی به API هواشناسی - # api_url = settings.WEATHER_API_BASE_URL - # api_key = settings.WEATHER_API_KEY - return None + params = { + "latitude": latitude, + "longitude": longitude, + "forecast_days": 7, + "timezone": "auto", + "daily": [ + "temperature_2m_max", + "temperature_2m_min", + "temperature_2m_mean", + "precipitation_sum", + "precipitation_probability_max", + "relative_humidity_2m_mean", + "wind_speed_10m_max", + "et0_fao_evapotranspiration", + "weather_code", + ], + } + headers = {"accept": "application/json"} + if settings.WEATHER_API_KEY: + headers["Authorization"] = f"Bearer {settings.WEATHER_API_KEY}" + + response = requests.get( + settings.WEATHER_API_BASE_URL, + params=params, + headers=headers, + timeout=60, + ) + response.raise_for_status() + return response.json() def parse_weather_response(data: dict) -> list[dict]: