195 lines
7.1 KiB
Python
195 lines
7.1 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from datetime import date
|
||
|
|
from math import acos, cos, exp, pi, sin, sqrt, tan
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
|
||
|
|
DEFAULT_CROP_PROFILE = {
|
||
|
|
"kc_initial": 0.6,
|
||
|
|
"kc_mid": 1.05,
|
||
|
|
"kc_end": 0.8,
|
||
|
|
"growth_stage_duration": {
|
||
|
|
"initial": 20,
|
||
|
|
"mid": 30,
|
||
|
|
"late": 25,
|
||
|
|
},
|
||
|
|
"current_stage": "mid",
|
||
|
|
}
|
||
|
|
|
||
|
|
DEFAULT_STAGE_KC = {
|
||
|
|
"initial": "kc_initial",
|
||
|
|
"development": "kc_mid",
|
||
|
|
"mid": "kc_mid",
|
||
|
|
"late": "kc_end",
|
||
|
|
"end": "kc_end",
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class DailyWaterNeed:
|
||
|
|
forecast_date: str
|
||
|
|
et0_mm: float
|
||
|
|
etc_mm: float
|
||
|
|
effective_rainfall_mm: float
|
||
|
|
net_irrigation_mm: float
|
||
|
|
gross_irrigation_mm: float
|
||
|
|
kc: float
|
||
|
|
irrigation_timing: str
|
||
|
|
|
||
|
|
|
||
|
|
def _saturation_vapor_pressure(temperature_c: float) -> float:
|
||
|
|
return 0.6108 * exp((17.27 * temperature_c) / (temperature_c + 237.3))
|
||
|
|
|
||
|
|
|
||
|
|
def _slope_vapor_pressure_curve(temperature_c: float) -> float:
|
||
|
|
es = _saturation_vapor_pressure(temperature_c)
|
||
|
|
return (4098 * es) / ((temperature_c + 237.3) ** 2)
|
||
|
|
|
||
|
|
|
||
|
|
def _psychrometric_constant(elevation_m: float = 0.0) -> float:
|
||
|
|
pressure = 101.3 * (((293.0 - (0.0065 * elevation_m)) / 293.0) ** 5.26)
|
||
|
|
return 0.000665 * pressure
|
||
|
|
|
||
|
|
|
||
|
|
def _extraterrestrial_radiation(day_of_year: int, latitude_deg: float) -> float:
|
||
|
|
latitude_rad = latitude_deg * pi / 180.0
|
||
|
|
dr = 1 + (0.033 * cos((2 * pi / 365) * day_of_year))
|
||
|
|
solar_declination = 0.409 * sin(((2 * pi / 365) * day_of_year) - 1.39)
|
||
|
|
ws = acos(max(-1.0, min(1.0, -tan(latitude_rad) * tan(solar_declination))))
|
||
|
|
return (
|
||
|
|
(24 * 60 / pi)
|
||
|
|
* 0.0820
|
||
|
|
* dr
|
||
|
|
* (
|
||
|
|
(ws * sin(latitude_rad) * sin(solar_declination))
|
||
|
|
+ (cos(latitude_rad) * cos(solar_declination) * sin(ws))
|
||
|
|
)
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def _estimate_net_radiation(
|
||
|
|
forecast: Any,
|
||
|
|
latitude_deg: float,
|
||
|
|
elevation_m: float = 0.0,
|
||
|
|
) -> float:
|
||
|
|
day_of_year = forecast.forecast_date.timetuple().tm_yday
|
||
|
|
ra = _extraterrestrial_radiation(day_of_year, latitude_deg)
|
||
|
|
temp_max = float(getattr(forecast, "temperature_max", None) or getattr(forecast, "temperature_mean", 25.0))
|
||
|
|
temp_min = float(getattr(forecast, "temperature_min", None) or getattr(forecast, "temperature_mean", 15.0))
|
||
|
|
rh_mean = float(getattr(forecast, "humidity_mean", None) or 50.0)
|
||
|
|
|
||
|
|
temp_range = max(temp_max - temp_min, 0.1)
|
||
|
|
rs = 0.16 * sqrt(temp_range) * ra
|
||
|
|
rso = (0.75 + (2e-5 * elevation_m)) * ra
|
||
|
|
ea = (rh_mean / 100.0) * _saturation_vapor_pressure((temp_max + temp_min) / 2.0)
|
||
|
|
rns = (1 - 0.23) * rs
|
||
|
|
rs_rso_ratio = min(rs / rso, 1.0) if rso else 0.0
|
||
|
|
rnl = 4.903e-9 * (
|
||
|
|
(((temp_max + 273.16) ** 4) + ((temp_min + 273.16) ** 4)) / 2
|
||
|
|
) * (0.34 - (0.14 * sqrt(max(ea, 0.0)))) * ((1.35 * rs_rso_ratio) - 0.35)
|
||
|
|
return max(rns - rnl, 0.0)
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_daily_et0(forecast: Any, latitude_deg: float, elevation_m: float = 0.0) -> float:
|
||
|
|
temp_mean = float(getattr(forecast, "temperature_mean", None) or 20.0)
|
||
|
|
temp_max = float(getattr(forecast, "temperature_max", None) or temp_mean + 3.0)
|
||
|
|
temp_min = float(getattr(forecast, "temperature_min", None) or temp_mean - 3.0)
|
||
|
|
wind_speed_kmh = float(getattr(forecast, "wind_speed_max", None) or 7.2)
|
||
|
|
wind_speed_ms = wind_speed_kmh / 3.6
|
||
|
|
rh_mean = float(getattr(forecast, "humidity_mean", None) or 50.0)
|
||
|
|
|
||
|
|
delta = _slope_vapor_pressure_curve(temp_mean)
|
||
|
|
gamma = _psychrometric_constant(elevation_m)
|
||
|
|
rn = _estimate_net_radiation(forecast, latitude_deg=latitude_deg, elevation_m=elevation_m)
|
||
|
|
es = (_saturation_vapor_pressure(temp_max) + _saturation_vapor_pressure(temp_min)) / 2.0
|
||
|
|
ea = (rh_mean / 100.0) * _saturation_vapor_pressure(temp_mean)
|
||
|
|
g = 0.0
|
||
|
|
|
||
|
|
numerator = (0.408 * delta * (rn - g)) + (gamma * (900.0 / (temp_mean + 273.0)) * wind_speed_ms * (es - ea))
|
||
|
|
denominator = delta + (gamma * (1 + (0.34 * wind_speed_ms)))
|
||
|
|
if denominator == 0:
|
||
|
|
return 0.0
|
||
|
|
return round(max(numerator / denominator, 0.0), 3)
|
||
|
|
|
||
|
|
|
||
|
|
def resolve_crop_profile(plant: Any | None, growth_stage: str | None = None) -> dict:
|
||
|
|
profile = getattr(plant, "irrigation_profile", None) or {}
|
||
|
|
merged = {**DEFAULT_CROP_PROFILE, **profile}
|
||
|
|
durations = {**DEFAULT_CROP_PROFILE["growth_stage_duration"], **profile.get("growth_stage_duration", {})}
|
||
|
|
merged["growth_stage_duration"] = durations
|
||
|
|
merged["current_stage"] = (growth_stage or profile.get("current_stage") or DEFAULT_CROP_PROFILE["current_stage"]).lower()
|
||
|
|
return merged
|
||
|
|
|
||
|
|
|
||
|
|
def resolve_kc(profile: dict, growth_stage: str | None = None) -> float:
|
||
|
|
stage = (growth_stage or profile.get("current_stage") or "mid").lower()
|
||
|
|
kc_key = DEFAULT_STAGE_KC.get(stage, "kc_mid")
|
||
|
|
return float(profile.get(kc_key, DEFAULT_CROP_PROFILE[kc_key]))
|
||
|
|
|
||
|
|
|
||
|
|
def effective_rainfall(precipitation_mm: float, etc_mm: float) -> float:
|
||
|
|
if precipitation_mm <= 0:
|
||
|
|
return 0.0
|
||
|
|
return round(min(precipitation_mm * 0.8, etc_mm), 3)
|
||
|
|
|
||
|
|
|
||
|
|
def recommend_irrigation_timing(forecast: Any) -> str:
|
||
|
|
temp_mean = float(getattr(forecast, "temperature_mean", None) or 20.0)
|
||
|
|
wind_speed = float(getattr(forecast, "wind_speed_max", None) or 0.0)
|
||
|
|
if temp_mean >= 30 or wind_speed >= 20:
|
||
|
|
return "اوایل صبح"
|
||
|
|
if temp_mean <= 18:
|
||
|
|
return "اواخر صبح"
|
||
|
|
return "اوایل صبح یا نزدیک غروب"
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_daily_water_need(
|
||
|
|
forecast: Any,
|
||
|
|
latitude_deg: float,
|
||
|
|
crop_profile: dict,
|
||
|
|
growth_stage: str | None = None,
|
||
|
|
irrigation_efficiency_percent: float | None = None,
|
||
|
|
elevation_m: float = 0.0,
|
||
|
|
) -> DailyWaterNeed:
|
||
|
|
et0_mm = calculate_daily_et0(forecast, latitude_deg=latitude_deg, elevation_m=elevation_m)
|
||
|
|
kc = resolve_kc(crop_profile, growth_stage=growth_stage)
|
||
|
|
etc_mm = round(kc * et0_mm, 3)
|
||
|
|
rainfall_mm = float(getattr(forecast, "precipitation", None) or 0.0)
|
||
|
|
effective_rain_mm = effective_rainfall(rainfall_mm, etc_mm)
|
||
|
|
net_irrigation_mm = round(max(etc_mm - effective_rain_mm, 0.0), 3)
|
||
|
|
efficiency = max((irrigation_efficiency_percent or 100.0) / 100.0, 0.01)
|
||
|
|
gross_irrigation_mm = round(net_irrigation_mm / efficiency, 3)
|
||
|
|
return DailyWaterNeed(
|
||
|
|
forecast_date=forecast.forecast_date.isoformat() if isinstance(forecast.forecast_date, date) else str(forecast.forecast_date),
|
||
|
|
et0_mm=et0_mm,
|
||
|
|
etc_mm=etc_mm,
|
||
|
|
effective_rainfall_mm=effective_rain_mm,
|
||
|
|
net_irrigation_mm=net_irrigation_mm,
|
||
|
|
gross_irrigation_mm=gross_irrigation_mm,
|
||
|
|
kc=round(kc, 3),
|
||
|
|
irrigation_timing=recommend_irrigation_timing(forecast),
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def calculate_forecast_water_needs(
|
||
|
|
forecasts: list[Any],
|
||
|
|
latitude_deg: float,
|
||
|
|
crop_profile: dict,
|
||
|
|
growth_stage: str | None = None,
|
||
|
|
irrigation_efficiency_percent: float | None = None,
|
||
|
|
elevation_m: float = 0.0,
|
||
|
|
) -> list[dict]:
|
||
|
|
return [
|
||
|
|
calculate_daily_water_need(
|
||
|
|
forecast=forecast,
|
||
|
|
latitude_deg=latitude_deg,
|
||
|
|
crop_profile=crop_profile,
|
||
|
|
growth_stage=growth_stage,
|
||
|
|
irrigation_efficiency_percent=irrigation_efficiency_percent,
|
||
|
|
elevation_m=elevation_m,
|
||
|
|
).__dict__
|
||
|
|
for forecast in forecasts
|
||
|
|
]
|