Files
Logic/Modules/Ai/irrigation/evapotranspiration.py
T
2026-05-11 03:27:21 +03:30

195 lines
7.1 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from math import acos, cos, exp, pi, sin, sqrt, tan
from typing import Any
DEFAULT_CROP_PROFILE = {
"kc_initial": 0.6,
"kc_mid": 1.05,
"kc_end": 0.8,
"growth_stage_duration": {
"initial": 20,
"mid": 30,
"late": 25,
},
"current_stage": "mid",
}
DEFAULT_STAGE_KC = {
"initial": "kc_initial",
"development": "kc_mid",
"mid": "kc_mid",
"late": "kc_end",
"end": "kc_end",
}
@dataclass
class DailyWaterNeed:
forecast_date: str
et0_mm: float
etc_mm: float
effective_rainfall_mm: float
net_irrigation_mm: float
gross_irrigation_mm: float
kc: float
irrigation_timing: str
def _saturation_vapor_pressure(temperature_c: float) -> float:
return 0.6108 * exp((17.27 * temperature_c) / (temperature_c + 237.3))
def _slope_vapor_pressure_curve(temperature_c: float) -> float:
es = _saturation_vapor_pressure(temperature_c)
return (4098 * es) / ((temperature_c + 237.3) ** 2)
def _psychrometric_constant(elevation_m: float = 0.0) -> float:
pressure = 101.3 * (((293.0 - (0.0065 * elevation_m)) / 293.0) ** 5.26)
return 0.000665 * pressure
def _extraterrestrial_radiation(day_of_year: int, latitude_deg: float) -> float:
latitude_rad = latitude_deg * pi / 180.0
dr = 1 + (0.033 * cos((2 * pi / 365) * day_of_year))
solar_declination = 0.409 * sin(((2 * pi / 365) * day_of_year) - 1.39)
ws = acos(max(-1.0, min(1.0, -tan(latitude_rad) * tan(solar_declination))))
return (
(24 * 60 / pi)
* 0.0820
* dr
* (
(ws * sin(latitude_rad) * sin(solar_declination))
+ (cos(latitude_rad) * cos(solar_declination) * sin(ws))
)
)
def _estimate_net_radiation(
forecast: Any,
latitude_deg: float,
elevation_m: float = 0.0,
) -> float:
day_of_year = forecast.forecast_date.timetuple().tm_yday
ra = _extraterrestrial_radiation(day_of_year, latitude_deg)
temp_max = float(getattr(forecast, "temperature_max", None) or getattr(forecast, "temperature_mean", 25.0))
temp_min = float(getattr(forecast, "temperature_min", None) or getattr(forecast, "temperature_mean", 15.0))
rh_mean = float(getattr(forecast, "humidity_mean", None) or 50.0)
temp_range = max(temp_max - temp_min, 0.1)
rs = 0.16 * sqrt(temp_range) * ra
rso = (0.75 + (2e-5 * elevation_m)) * ra
ea = (rh_mean / 100.0) * _saturation_vapor_pressure((temp_max + temp_min) / 2.0)
rns = (1 - 0.23) * rs
rs_rso_ratio = min(rs / rso, 1.0) if rso else 0.0
rnl = 4.903e-9 * (
(((temp_max + 273.16) ** 4) + ((temp_min + 273.16) ** 4)) / 2
) * (0.34 - (0.14 * sqrt(max(ea, 0.0)))) * ((1.35 * rs_rso_ratio) - 0.35)
return max(rns - rnl, 0.0)
def calculate_daily_et0(forecast: Any, latitude_deg: float, elevation_m: float = 0.0) -> float:
temp_mean = float(getattr(forecast, "temperature_mean", None) or 20.0)
temp_max = float(getattr(forecast, "temperature_max", None) or temp_mean + 3.0)
temp_min = float(getattr(forecast, "temperature_min", None) or temp_mean - 3.0)
wind_speed_kmh = float(getattr(forecast, "wind_speed_max", None) or 7.2)
wind_speed_ms = wind_speed_kmh / 3.6
rh_mean = float(getattr(forecast, "humidity_mean", None) or 50.0)
delta = _slope_vapor_pressure_curve(temp_mean)
gamma = _psychrometric_constant(elevation_m)
rn = _estimate_net_radiation(forecast, latitude_deg=latitude_deg, elevation_m=elevation_m)
es = (_saturation_vapor_pressure(temp_max) + _saturation_vapor_pressure(temp_min)) / 2.0
ea = (rh_mean / 100.0) * _saturation_vapor_pressure(temp_mean)
g = 0.0
numerator = (0.408 * delta * (rn - g)) + (gamma * (900.0 / (temp_mean + 273.0)) * wind_speed_ms * (es - ea))
denominator = delta + (gamma * (1 + (0.34 * wind_speed_ms)))
if denominator == 0:
return 0.0
return round(max(numerator / denominator, 0.0), 3)
def resolve_crop_profile(plant: Any | None, growth_stage: str | None = None) -> dict:
profile = getattr(plant, "irrigation_profile", None) or {}
merged = {**DEFAULT_CROP_PROFILE, **profile}
durations = {**DEFAULT_CROP_PROFILE["growth_stage_duration"], **profile.get("growth_stage_duration", {})}
merged["growth_stage_duration"] = durations
merged["current_stage"] = (growth_stage or profile.get("current_stage") or DEFAULT_CROP_PROFILE["current_stage"]).lower()
return merged
def resolve_kc(profile: dict, growth_stage: str | None = None) -> float:
stage = (growth_stage or profile.get("current_stage") or "mid").lower()
kc_key = DEFAULT_STAGE_KC.get(stage, "kc_mid")
return float(profile.get(kc_key, DEFAULT_CROP_PROFILE[kc_key]))
def effective_rainfall(precipitation_mm: float, etc_mm: float) -> float:
if precipitation_mm <= 0:
return 0.0
return round(min(precipitation_mm * 0.8, etc_mm), 3)
def recommend_irrigation_timing(forecast: Any) -> str:
temp_mean = float(getattr(forecast, "temperature_mean", None) or 20.0)
wind_speed = float(getattr(forecast, "wind_speed_max", None) or 0.0)
if temp_mean >= 30 or wind_speed >= 20:
return "اوایل صبح"
if temp_mean <= 18:
return "اواخر صبح"
return "اوایل صبح یا نزدیک غروب"
def calculate_daily_water_need(
forecast: Any,
latitude_deg: float,
crop_profile: dict,
growth_stage: str | None = None,
irrigation_efficiency_percent: float | None = None,
elevation_m: float = 0.0,
) -> DailyWaterNeed:
et0_mm = calculate_daily_et0(forecast, latitude_deg=latitude_deg, elevation_m=elevation_m)
kc = resolve_kc(crop_profile, growth_stage=growth_stage)
etc_mm = round(kc * et0_mm, 3)
rainfall_mm = float(getattr(forecast, "precipitation", None) or 0.0)
effective_rain_mm = effective_rainfall(rainfall_mm, etc_mm)
net_irrigation_mm = round(max(etc_mm - effective_rain_mm, 0.0), 3)
efficiency = max((irrigation_efficiency_percent or 100.0) / 100.0, 0.01)
gross_irrigation_mm = round(net_irrigation_mm / efficiency, 3)
return DailyWaterNeed(
forecast_date=forecast.forecast_date.isoformat() if isinstance(forecast.forecast_date, date) else str(forecast.forecast_date),
et0_mm=et0_mm,
etc_mm=etc_mm,
effective_rainfall_mm=effective_rain_mm,
net_irrigation_mm=net_irrigation_mm,
gross_irrigation_mm=gross_irrigation_mm,
kc=round(kc, 3),
irrigation_timing=recommend_irrigation_timing(forecast),
)
def calculate_forecast_water_needs(
forecasts: list[Any],
latitude_deg: float,
crop_profile: dict,
growth_stage: str | None = None,
irrigation_efficiency_percent: float | None = None,
elevation_m: float = 0.0,
) -> list[dict]:
return [
calculate_daily_water_need(
forecast=forecast,
latitude_deg=latitude_deg,
crop_profile=crop_profile,
growth_stage=growth_stage,
irrigation_efficiency_percent=irrigation_efficiency_percent,
elevation_m=elevation_m,
).__dict__
for forecast in forecasts
]