from __future__ import annotations from dataclasses import dataclass from datetime import date from math import acos, cos, exp, pi, sin, sqrt, tan from typing import Any DEFAULT_CROP_PROFILE = { "kc_initial": 0.6, "kc_mid": 1.05, "kc_end": 0.8, "growth_stage_duration": { "initial": 20, "mid": 30, "late": 25, }, "current_stage": "mid", } DEFAULT_STAGE_KC = { "initial": "kc_initial", "development": "kc_mid", "mid": "kc_mid", "late": "kc_end", "end": "kc_end", } @dataclass class DailyWaterNeed: forecast_date: str et0_mm: float etc_mm: float effective_rainfall_mm: float net_irrigation_mm: float gross_irrigation_mm: float kc: float irrigation_timing: str def _saturation_vapor_pressure(temperature_c: float) -> float: return 0.6108 * exp((17.27 * temperature_c) / (temperature_c + 237.3)) def _slope_vapor_pressure_curve(temperature_c: float) -> float: es = _saturation_vapor_pressure(temperature_c) return (4098 * es) / ((temperature_c + 237.3) ** 2) def _psychrometric_constant(elevation_m: float = 0.0) -> float: pressure = 101.3 * (((293.0 - (0.0065 * elevation_m)) / 293.0) ** 5.26) return 0.000665 * pressure def _extraterrestrial_radiation(day_of_year: int, latitude_deg: float) -> float: latitude_rad = latitude_deg * pi / 180.0 dr = 1 + (0.033 * cos((2 * pi / 365) * day_of_year)) solar_declination = 0.409 * sin(((2 * pi / 365) * day_of_year) - 1.39) ws = acos(max(-1.0, min(1.0, -tan(latitude_rad) * tan(solar_declination)))) return ( (24 * 60 / pi) * 0.0820 * dr * ( (ws * sin(latitude_rad) * sin(solar_declination)) + (cos(latitude_rad) * cos(solar_declination) * sin(ws)) ) ) def _estimate_net_radiation( forecast: Any, latitude_deg: float, elevation_m: float = 0.0, ) -> float: day_of_year = forecast.forecast_date.timetuple().tm_yday ra = _extraterrestrial_radiation(day_of_year, latitude_deg) temp_max = float(getattr(forecast, "temperature_max", None) or getattr(forecast, "temperature_mean", 25.0)) temp_min = float(getattr(forecast, "temperature_min", None) or getattr(forecast, "temperature_mean", 15.0)) rh_mean = float(getattr(forecast, "humidity_mean", None) or 50.0) temp_range = max(temp_max - temp_min, 0.1) rs = 0.16 * sqrt(temp_range) * ra rso = (0.75 + (2e-5 * elevation_m)) * ra ea = (rh_mean / 100.0) * _saturation_vapor_pressure((temp_max + temp_min) / 2.0) rns = (1 - 0.23) * rs rs_rso_ratio = min(rs / rso, 1.0) if rso else 0.0 rnl = 4.903e-9 * ( (((temp_max + 273.16) ** 4) + ((temp_min + 273.16) ** 4)) / 2 ) * (0.34 - (0.14 * sqrt(max(ea, 0.0)))) * ((1.35 * rs_rso_ratio) - 0.35) return max(rns - rnl, 0.0) def calculate_daily_et0(forecast: Any, latitude_deg: float, elevation_m: float = 0.0) -> float: temp_mean = float(getattr(forecast, "temperature_mean", None) or 20.0) temp_max = float(getattr(forecast, "temperature_max", None) or temp_mean + 3.0) temp_min = float(getattr(forecast, "temperature_min", None) or temp_mean - 3.0) wind_speed_kmh = float(getattr(forecast, "wind_speed_max", None) or 7.2) wind_speed_ms = wind_speed_kmh / 3.6 rh_mean = float(getattr(forecast, "humidity_mean", None) or 50.0) delta = _slope_vapor_pressure_curve(temp_mean) gamma = _psychrometric_constant(elevation_m) rn = _estimate_net_radiation(forecast, latitude_deg=latitude_deg, elevation_m=elevation_m) es = (_saturation_vapor_pressure(temp_max) + _saturation_vapor_pressure(temp_min)) / 2.0 ea = (rh_mean / 100.0) * _saturation_vapor_pressure(temp_mean) g = 0.0 numerator = (0.408 * delta * (rn - g)) + (gamma * (900.0 / (temp_mean + 273.0)) * wind_speed_ms * (es - ea)) denominator = delta + (gamma * (1 + (0.34 * wind_speed_ms))) if denominator == 0: return 0.0 return round(max(numerator / denominator, 0.0), 3) def resolve_crop_profile(plant: Any | None, growth_stage: str | None = None) -> dict: profile = getattr(plant, "irrigation_profile", None) or {} merged = {**DEFAULT_CROP_PROFILE, **profile} durations = {**DEFAULT_CROP_PROFILE["growth_stage_duration"], **profile.get("growth_stage_duration", {})} merged["growth_stage_duration"] = durations merged["current_stage"] = (growth_stage or profile.get("current_stage") or DEFAULT_CROP_PROFILE["current_stage"]).lower() return merged def resolve_kc(profile: dict, growth_stage: str | None = None) -> float: stage = (growth_stage or profile.get("current_stage") or "mid").lower() kc_key = DEFAULT_STAGE_KC.get(stage, "kc_mid") return float(profile.get(kc_key, DEFAULT_CROP_PROFILE[kc_key])) def effective_rainfall(precipitation_mm: float, etc_mm: float) -> float: if precipitation_mm <= 0: return 0.0 return round(min(precipitation_mm * 0.8, etc_mm), 3) def recommend_irrigation_timing(forecast: Any) -> str: temp_mean = float(getattr(forecast, "temperature_mean", None) or 20.0) wind_speed = float(getattr(forecast, "wind_speed_max", None) or 0.0) if temp_mean >= 30 or wind_speed >= 20: return "اوایل صبح" if temp_mean <= 18: return "اواخر صبح" return "اوایل صبح یا نزدیک غروب" def calculate_daily_water_need( forecast: Any, latitude_deg: float, crop_profile: dict, growth_stage: str | None = None, irrigation_efficiency_percent: float | None = None, elevation_m: float = 0.0, ) -> DailyWaterNeed: et0_mm = calculate_daily_et0(forecast, latitude_deg=latitude_deg, elevation_m=elevation_m) kc = resolve_kc(crop_profile, growth_stage=growth_stage) etc_mm = round(kc * et0_mm, 3) rainfall_mm = float(getattr(forecast, "precipitation", None) or 0.0) effective_rain_mm = effective_rainfall(rainfall_mm, etc_mm) net_irrigation_mm = round(max(etc_mm - effective_rain_mm, 0.0), 3) efficiency = max((irrigation_efficiency_percent or 100.0) / 100.0, 0.01) gross_irrigation_mm = round(net_irrigation_mm / efficiency, 3) return DailyWaterNeed( forecast_date=forecast.forecast_date.isoformat() if isinstance(forecast.forecast_date, date) else str(forecast.forecast_date), et0_mm=et0_mm, etc_mm=etc_mm, effective_rainfall_mm=effective_rain_mm, net_irrigation_mm=net_irrigation_mm, gross_irrigation_mm=gross_irrigation_mm, kc=round(kc, 3), irrigation_timing=recommend_irrigation_timing(forecast), ) def calculate_forecast_water_needs( forecasts: list[Any], latitude_deg: float, crop_profile: dict, growth_stage: str | None = None, irrigation_efficiency_percent: float | None = None, elevation_m: float = 0.0, ) -> list[dict]: return [ calculate_daily_water_need( forecast=forecast, latitude_deg=latitude_deg, crop_profile=crop_profile, growth_stage=growth_stage, irrigation_efficiency_percent=irrigation_efficiency_percent, elevation_m=elevation_m, ).__dict__ for forecast in forecasts ]