42 lines
1.5 KiB
Python
42 lines
1.5 KiB
Python
|
|
from __future__ import annotations

import math
from typing import Any

from django.apps import apps

from farm_data.models import SensorData
|
||
|
|
|
||
|
|
|
||
|
|
def build_water_stress_summary(sensor: Any) -> dict[str, Any]:
    """Derive a coarse water-stress summary from a sensor's soil moisture.

    The index is ``clamp(round(35 - (soil_moisture / 2)), 0, 100)``. A
    missing, ``None``, zero or non-finite (NaN / +-inf) ``soil_moisture`` is
    treated as ``0.0`` so the summary can always be produced.

    Args:
        sensor: Any object; only its optional ``soil_moisture`` attribute is
            read.

    Returns:
        A dict with:
        - ``waterStressIndex``: integer in ``[0, 100]``.
        - ``level``: Persian label, "پایین" (low) for an index <= 20,
          "متوسط" (medium) for an index <= 45, otherwise "بالا" (high).
        - ``sourceMetric``: the moisture value used (rounded to 2 decimals)
          and the formula string.
    """
    moisture = float(getattr(sensor, "soil_moisture", None) or 0.0)
    if not math.isfinite(moisture):
        # round() raises ValueError on NaN and OverflowError on infinity;
        # treat such readings like a missing value instead of crashing.
        moisture = 0.0

    water_stress = max(0, min(100, round(35 - (moisture / 2))))

    if water_stress <= 20:
        level = "پایین"
    elif water_stress <= 45:
        level = "متوسط"
    else:
        level = "بالا"

    return {
        "waterStressIndex": water_stress,
        "level": level,
        "sourceMetric": {
            "soilMoisture": round(moisture, 2),
            "formula": "clamp(round(35 - (soil_moisture / 2)), 0, 100)",
        },
    }
|
||
|
|
|
||
|
|
|
||
|
|
class WaterStressService:
    """Resolve a farm's water stress via the crop-simulation service, with a sensor-based fallback."""

    def get_water_stress(self, *, farm_uuid: str, plant_name: str | None = None) -> dict[str, Any]:
        """Return the water-stress payload for the farm identified by ``farm_uuid``.

        The first ``SensorData`` row matching ``farm_uuid`` is loaded, then the
        request is delegated to the water-stress service exposed by the
        ``crop_simulation`` app config. If that delegated call raises any
        ``Exception``, a summary built from the sensor row is returned instead,
        tagged with ``engine="sensor_fallback"`` and the stringified error as
        ``fallbackReason`` inside ``sourceMetric``.

        Args:
            farm_uuid: Identifier used to filter ``SensorData``.
            plant_name: Optional plant name forwarded to the delegated service.

        Returns:
            The delegated service's result, or the fallback dict described
            above.

        Raises:
            ValueError: If no ``SensorData`` row matches ``farm_uuid``.
        """
        reading = SensorData.objects.filter(farm_uuid=farm_uuid).first()
        if reading is None:
            raise ValueError("Farm not found.")

        # Resolved outside the try block: failures while looking up the
        # service itself propagate rather than triggering the fallback.
        crop_simulation_config = apps.get_app_config("crop_simulation")
        engine = crop_simulation_config.get_water_stress_service()

        try:
            return engine.get_water_stress(farm_uuid=str(reading.farm_uuid), plant_name=plant_name)
        except Exception as error:
            payload: dict[str, Any] = {"farm_uuid": str(reading.farm_uuid)}
            payload.update(build_water_stress_summary(reading))
            # Annotate the metric block so consumers can tell this result
            # did not come from the simulation service.
            payload["sourceMetric"].update(
                engine="sensor_fallback",
                fallbackReason=str(error),
            )
            return payload
|