AI UPDATE

This commit is contained in:
2026-03-22 03:08:27 +03:30
parent 3ee14ca977
commit d977a583c6
37 changed files with 3525 additions and 263 deletions
+872
View File
@@ -0,0 +1,872 @@
# مستند فرمول‌های `dashboard_data/cards`
این فایل توضیح می‌دهد هر کارت در `dashboard_data/cards` چطور داده‌های خروجی خود را محاسبه می‌کند، از چه فیلدهایی استفاده می‌کند، و چه fallbackهایی دارد.
## منبع داده‌های مشترک
کانتکست بیشتر کارت‌ها از `dashboard_data/context.py` می‌آید:
- `sensor`: رکورد اصلی سنسور
- `location`: لوکیشن سنسور
- `depths`: داده‌های عمق خاک از `SoilDepthData`
- `forecasts`: حداکثر ۷ پیش‌بینی آب‌وهوا از امروز به بعد
- `history`: حداکثر ۳۰ رکورد تاریخچه سنسور، مرتب‌شده از جدید به قدیم
- `plants`: گیاه‌های متصل به سنسور
- `irrigation_methods`: حداکثر ۵ روش آبیاری
## توابع کمکی مشترک
این توابع در `dashboard_data/card_utils.py` استفاده می‌شوند:
- `safe_number(value, default=0)`: اگر مقدار `None` باشد، `default` برمی‌گرداند.
- `average(values, default=0)`: میانگین مقادیر غیر `None` را می‌دهد؛ اگر هیچ مقداری نبود `default` برمی‌گرداند.
- `latest_history_value(history, field_name, default=None)`: مقدار `field_name` را از جدیدترین رکورد history می‌گیرد.
- `compute_trend(current, previous)`:
- `diff = round(current_value - previous_value, 1)`
- `trend = "positive"` اگر `diff >= 0`، وگرنه `"negative"`
---
## 1) `farm_overview_kpis.py`
تابع سازنده: `build_farm_overview_kpis`
### ورودی‌های اصلی
- `sensor.soil_moisture``moisture`
- `sensor.soil_ph``ph`
- `sensor.electrical_conductivity``ec`
- `sensor.soil_temperature`
- میانگین `forecast.humidity_mean` برای ۳ پیش‌بینی اول → `humidity`
### فرمول‌ها
#### 1. امتیاز سلامت مزرعه (`farm_health_score`)
```text
health_score = clamp(
round(
100
- abs(65 - moisture)
- (abs(6.8 - ph) * 10)
- (ec * 5)
),
0,
100
)
```
توضیح:
- رطوبت ایده‌آل ۶۵٪ فرض شده.
- pH ایده‌آل ۶.۸ فرض شده.
- EC بالاتر، امتیاز را کم می‌کند.
آستانه‌های نمایش:
- اگر `health_score >= 70` → وضعیت `خوب` و رنگ `success`
- در غیر این صورت → `متوسط` و رنگ `warning`
#### 2. شاخص تنش آبی (`water_stress_index`)
```text
water_stress = clamp(round(35 - (moisture / 2)), 0, 100)
```
توضیح:
- هرچه رطوبت خاک بیشتر شود، تنش آبی کمتر می‌شود.
آستانه نمایش:
- اگر `water_stress <= 20``پایین`
- در غیر این صورت → `متوسط`
#### 3. ریسک بیماری (`disease_risk`)
```text
disease_risk = clamp(
round((humidity * 0.4) + (soil_temperature * 0.6) - 20),
0,
100
)
```
توضیح:
- دمای خاک وزن ۶۰٪ دارد.
- رطوبت هوا وزن ۴۰٪ دارد.
آستانه نمایش:
- اگر `disease_risk < 30``پایین`
- در غیر این صورت → `متوسط`
#### 4. میانگین رطوبت خاک (`avg_soil_moisture`)
```text
avg_soil_moisture = round(moisture)
```
نکته:
- اینجا عملاً فقط از `sensor.soil_moisture` فعلی استفاده می‌شود و واقعاً میانگین چند سنسور یا چند ناحیه محاسبه نمی‌شود.
آستانه نمایش:
- اگر `45 <= moisture <= 75``بهینه`
- در غیر این صورت → `نیازمند بررسی`
#### 5. پیش‌بینی عملکرد (`yield_prediction`)
```text
yield_prediction = round(max(5, health_score / 2.1), 1)
```
فرمول متن chip:
```text
yield_chip = "+" + str(max(0, health_score - 50)) + "%"
```
#### 6. ریسک آفات (`pest_risk`)
```text
pest_risk = max(5, round(disease_risk * 0.7))
```
نکته:
- ریسک آفات به‌صورت مستقیم از ۷۰٪ ریسک بیماری ساخته شده.
---
## 2) `farm_weather_card.py`
تابع سازنده: `build_farm_weather_card`
### منطق کلی
اگر `forecasts` خالی باشد:
- `condition = "نامشخص"`
- `temperature = 0`
- `humidity = 0`
- `windSpeed = 0`
- `chartData.labels = []`
- `chartData.series = [[]]`
در غیر این صورت:
### فرمول‌ها
#### 1. وضعیت آب‌وهوا
```text
condition = weather_condition(current_forecast.weather_code)
```
که `weather_code` با جدول `WMO_CONDITIONS` به متن فارسی تبدیل می‌شود.
#### 2. دما
```text
temperature = round(
safe_number(current_forecast.temperature_mean, current_forecast.temperature_max)
)
```
یعنی:
- اول `temperature_mean`
- اگر `None` بود، `temperature_max`
#### 3. رطوبت
```text
humidity = round(average([current_forecast.humidity_mean], default=0))
```
نکته:
- چون فقط یک مقدار داخل `average` قرار می‌گیرد، عملاً همان `humidity_mean` فعلی است.
#### 4. سرعت باد
```text
windSpeed = round(safe_number(current_forecast.wind_speed_max, 0))
```
#### 5. نمودار دما
برای ۷ روز اول:
```text
labels = [str(forecast.forecast_date) for forecast in forecasts[:7]]
series = [[round(safe_number(forecast.temperature_mean, 0)) for forecast in forecasts[:7]]]
```
---
## 3) `farm_alerts_tracker.py`
تابع سازنده: `build_farm_alerts_tracker`
### ورودی‌ها
- `sensor.soil_moisture``moisture`
- میانگین `humidity_mean` برای ۳ forecast اول → `humidity`
- `temperature_min` برای ۳ forecast اول
### فرمول‌ها
#### 1. هشدار کمبود آب
```text
low_water_count = 2 if moisture < 45 else 0
```
#### 2. هشدار ریسک قارچی
```text
fungal_count = 1 if (humidity > 70 and moisture > 60) else 0
```
#### 3. هشدار یخبندان
```text
frost_count = count(
forecast for first 3 forecasts
if temperature_min <= 0
)
```
در کد:
```text
frost_count = sum(
1 for forecast in forecasts[:3]
if safe_number(forecast.temperature_min, 10) <= 0
)
```
#### 4. مجموع هشدارها
```text
totalAlerts = low_water_count + fungal_count + frost_count
```
#### 5. مقدار radial bar
```text
radialBarValue = min(100, totalAlerts * 10)
```
---
## 4) `sensor_values_list.py`
تابع سازنده: `build_sensor_values_list`
این کارت برای هر آیتم، مقدار فعلی و trend را می‌سازد.
### فرمول trend
برای هر سنسور که از `compute_trend` استفاده می‌کند:
```text
trendNumber = round(current - previous, 1)
trend = "positive" if trendNumber >= 0 else "negative"
```
### آیتم‌ها
#### 1. دمای هوا
```text
title = round(current_weather.temperature_mean or 0) + "°C"
previous = latest_history_value(history, "soil_temperature", 0)
```
نکته مهم:
- trend دمای هوا با `soil_temperature` از history مقایسه می‌شود، نه با history دمای هوا.
#### 2. دمای خاک
```text
current = sensor.soil_temperature
previous = latest_history_value(history, "soil_temperature", 0)
```
#### 3. رطوبت هوا
```text
current = current_weather.humidity_mean or 0
previous = 0
```
نکته:
- همیشه نسبت به صفر trend می‌گیرد، نه history.
#### 4. رطوبت خاک
```text
current = sensor.soil_moisture
previous = latest_history_value(history, "soil_moisture", 0)
```
#### 5. pH خاک
```text
current = sensor.soil_ph
previous = latest_history_value(history, "soil_ph", 0)
```
#### 6. هدایت الکتریکی
```text
current = sensor.electrical_conductivity
previous = latest_history_value(history, "electrical_conductivity", 0)
```
#### 7. شدت نور
```text
title = "850"
trendNumber = 0
trend = "positive"
```
نکته:
- این مقدار کاملاً ثابت (hard-coded) است و فرمولی ندارد.
#### 8. سرعت باد
```text
current = current_weather.wind_speed_max or 0
previous = 0
```
نکته:
- trend سرعت باد هم نسبت به صفر محاسبه می‌شود.
---
## 5) `sensor_radar_chart.py`
تابع سازنده: `build_sensor_radar_chart`
### تابع نرمال‌سازی
```text
to_score(value, lower, upper):
if value is None -> 0
if value <= lower -> 0
if value >= upper -> 100
else -> round(((value - lower) / (upper - lower)) * 100)
```
### سری «امروز»
به‌ترتیب:
```text
soil_temperature_score = to_score(sensor.soil_temperature, 0, 40)
soil_moisture_score = to_score(sensor.soil_moisture, 0, 100)
soil_ph_score = to_score(sensor.soil_ph, 0, 14)
ec_score = to_score(sensor.electrical_conductivity, 0, 5)
light_score = 85
wind_score = to_score(current_weather.wind_speed_max if current_weather else 0, 0, 30)
```
خروجی:
```text
series[0].data = [
soil_temperature_score,
soil_moisture_score,
soil_ph_score,
ec_score,
85,
wind_score
]
```
### سری «ایده‌آل»
```text
[80, 70, 75, 75, 90, 50]
```
نکته:
- این مقادیر ثابت هستند و از دیتابیس محاسبه نمی‌شوند.
---
## 6) `sensor_comparison_chart.py`
تابع سازنده: `build_sensor_comparison_chart`
### ورودی‌ها
- `current_sensor.soil_moisture``current_value`
- `history[:7]` → داده‌های هفته جاری
- `history[7:14]` → داده‌های هفته قبل
### فرمول‌ها
#### 1. مقدار فعلی
```text
currentValue = round(sensor.soil_moisture)
```
#### 2. سری هفته جاری
```text
recent = reversed(history[:7])
this_week = [round(item.soil_moisture or current_value) for item in recent]
```
اگر کمتر از ۷ مقدار باشد:
```text
while len(this_week) < 7:
this_week.append(current_value)
```
#### 3. سری هفته قبل
```text
previous = reversed(history[7:14])
last_week = [round(item.soil_moisture or (current_value - 5)) for item in previous]
```
اگر کمتر از ۷ مقدار باشد:
```text
while len(last_week) < 7:
last_week.append(max(0, current_value - 5))
```
#### 4. درصد تغییر نسبت به هفته قبل
```text
avg_this = sum(this_week) / len(this_week)
avg_last = sum(last_week) / len(last_week)
delta = round(((avg_this - avg_last) / avg_last) * 100) if avg_last else 0
```
نمایش متن:
```text
vsLastWeek = f"{'+' if delta >= 0 else ''}{delta}%"
vsLastWeekValue = delta
```
#### 5. دسته‌بندی روزها
روزهای ۷ روز اخیر با نام فارسی weekday ساخته می‌شوند:
```text
categories = [
PERSIAN_WEEKDAYS[(today - offset_days).weekday()]
for offset_days in range(6, -1, -1)
]
```
---
## 7) `anomaly_detection_card.py`
تابع سازنده: `build_anomaly_detection_card`
این کارت anomalyها را فقط برای دو شاخص تولید می‌کند: رطوبت خاک و pH خاک.
### 1. anomaly رطوبت خاک
فقط وقتی ساخته می‌شود که:
```text
moisture < 45
```
ساختار خروجی:
```text
value = round(moisture) + "%"
expected = "45-65%"
deviation = round(moisture - 55) + "%"
severity = "warning"
```
نکته:
- deviation نسبت به نقطه مرجع ۵۵٪ محاسبه می‌شود، نه مرز ۴۵٪.
### 2. anomaly pH خاک
فقط وقتی ساخته می‌شود که:
```text
soil_ph < 6 or soil_ph > 7
```
ساختار خروجی:
```text
value = format(soil_ph, ".1f")
expected = "6.0-7.0"
deviation = round(soil_ph - 6.5, 1)
severity = "error" if (soil_ph < 5.5 or soil_ph > 7.5) else "warning"
```
---
## 8) `farm_alerts_timeline.py`
تابع سازنده: `build_farm_alerts_timeline`
### منطق
این کارت هیچ فرمول داخلی ندارد و داده را مستقیماً از `ai_bundle` برمی‌دارد:
```text
alerts = ai_bundle.get("timeline", [])
```
---
## 9) `water_need_prediction.py`
تابع سازنده: `build_water_need_prediction`
برای ۷ forecast اول:
### فرمول نیاز آبی روزانه
```text
et0 = safe_number(forecast.et0, 4)
rain = safe_number(forecast.precipitation, 0)
need = max(0, round((et0 * 100) - (rain * 20)))
```
توضیح:
- `ET0` در ۱۰۰ ضرب می‌شود.
- بارش در ۲۰ ضرب و از آن کم می‌شود.
- مقدار منفی به صفر clamp می‌شود.
### خروجی نهایی
```text
daily_needs = [need for first 7 forecasts]
totalNext7Days = sum(daily_needs)
categories = ["روز 1", "روز 2", ...]
series = [{"name": "نیاز آبی", "data": daily_needs}]
```
واحد خروجی:
```text
unit = "m³"
```
---
## 10) `harvest_prediction_card.py`
تابع سازنده: `build_harvest_prediction_card`
### ورودی‌ها
- میانگین `temperature_mean` تمام forecastها → `avg_temp`
- `sensor.soil_moisture``moisture_factor`
- نام اولین گیاه → `plant_name`
### فرمول‌ها
#### 1. میانگین دما
```text
avg_temp = average([forecast.temperature_mean for forecast in forecasts], default=24)
```
#### 2. فاکتور رطوبت
```text
moisture_factor = sensor.soil_moisture if available else 50
```
#### 3. روز باقی‌مانده تا برداشت
```text
days_until = max(10, int(90 - avg_temp - (moisture_factor / 5)))
```
توضیح:
- هرچه دمای متوسط بیشتر باشد، `days_until` کمتر می‌شود.
- هرچه رطوبت خاک بیشتر باشد، `days_until` کمتر می‌شود.
- حداقل ۱۰ روز است.
#### 4. تاریخ برداشت و بازه بهینه
```text
target_date = today + days_until
optimalWindowStart = target_date - 3 days
optimalWindowEnd = target_date + 3 days
```
#### 5. توضیح متنی
اگر گیاه وجود داشته باشد:
```text
description = "بر اساس دمای فعلی، رطوبت خاک و اطلاعات <plant_name>. بازه بهینه برداشت محاسبه شده است."
```
اگر گیاهی وجود نداشته باشد:
```text
plant_name = "محصول"
```
---
## 11) `yield_prediction_chart.py`
تابع سازنده: `build_yield_prediction_chart`
### فرمول‌ها
#### 1. مقدار پایه
```text
base = max(10, round(sensor.soil_moisture * 0.6))
```
#### 2. سری سال جاری
```text
current_year = [
base + 0,
base + 2,
base + 4,
base + 6,
base + 8,
base + 10,
base + 12,
base + 11,
base + 9,
base + 7,
base + 5,
base + 4
]
```
#### 3. سری سال گذشته
```text
last_year = [value - 3 for value in current_year]
```
#### 4. خلاصه کارت
عملکرد پیش‌بینی‌شده:
```text
summary[0].amount = current_year[9] + " تن"
```
یعنی مقدار ماه دهم لیست (اندیس ۹).
تاریخ برداشت:
```text
harvest_month = "حدود " + str(today.month)
summary[1].amount = "+8%"
```
نکته:
- `+8%` مقدار ثابت است و از فرمول نیامده.
---
## 12) `soil_moisture_heatmap.py`
تابع سازنده: `build_soil_moisture_heatmap`
### ورودی‌ها
- `sensor.soil_moisture``base_moisture`
- `depth.wv0033` برای هر لایه عمق خاک
### منطق اولیه
```text
hours = ["۶ ص", "۸ ص", "۱۰ ص", "۱۲ ظ", "۱۴ ع", "۱۶ ع", "۱۸ ع"]
```
اگر `depths` خالی باشد:
```text
depths = [None, None]
```
### فرمول zone offset
برای هر depth:
```text
depth_offset = 0 if depth is None else round(depth.wv0033 / 10)
```
### فرمول هر خانه heatmap
برای هر zone و هر ساعت:
```text
value = clamp(
round(base_moisture + depth_offset - abs(3 - hour_index) * 2),
0,
100
)
```
توضیح:
- `hour_index = 3` مرکز نمودار است و بیشترین مقدار را می‌دهد.
- هرچه از مرکز دورتر شویم، به ازای هر پله ۲ واحد کم می‌شود.
ساختار خروجی هر نقطه:
```text
{"x": hour, "y": value}
```
---
## 13) `ndvi_health_card.py`
تابع سازنده: `build_ndvi_health_card`
### ورودی‌ها
- `sensor.nitrogen``nitrogen`
- `sensor.soil_moisture``moisture`
### فرمول NDVI
```text
ndvi = round(
clamp(((nitrogen / 100) * 0.4) + ((moisture / 100) * 0.6), 0.1, 0.95),
2
)
```
توضیح:
- نیتروژن ۴۰٪ وزن دارد.
- رطوبت خاک ۶۰٪ وزن دارد.
- خروجی بین `0.1` و `0.95` محدود می‌شود.
### وضعیت‌های متنی
#### 1. تنش نیتروژن
```text
"پایین" if nitrogen >= 30 else "بالا"
```
#### 2. سلامت محصول
```text
"خوب" if ndvi >= 0.65 else "متوسط"
```
---
## 14) `recommendations_list.py`
تابع سازنده: `build_recommendations_list`
### منطق
این کارت فرمول داخلی ندارد:
```text
recommendations = ai_bundle.get("recommendations", [])
```
---
## 15) `economic_overview.py`
تابع سازنده: `build_economic_overview`
### ورودی‌ها
- `forecast.et0` برای ۶ forecast اول
- `sensor.nitrogen`
- `sensor.phosphorus`
- `sensor.potassium`
### فرمول‌ها
#### 1. هزینه آب
```text
water_cost = round(sum(max(0, forecast.et0 * 20) for first 6 forecasts))
```
در کد با fallback:
```text
water_cost = round(
sum(max(0, safe_number(forecast.et0, 0) * 20) for forecast in forecasts[:6])
)
```
#### 2. نیاز کودی
```text
fertilizer_need = round((nitrogen + phosphorus + potassium) / 3)
```
با fallback صفر برای هر کدام.
#### 3. پیش‌بینی درآمد
```text
revenue = round(max(1000, water_cost * 4.5))
```
#### 4. صرفه‌جویی آب هوشمند
```text
smart_saving = round(water_cost * 0.18)
```
### chartSeries
#### سری هزینه آب
```text
water_series = [max(1, round(water_cost / 6)) for _ in range(6)]
```
#### سری کود
```text
fertilizer_series = [max(1, round(fertilizer_need / 6)) for _ in range(6)]
```
نکته:
- هر دو سری، ۶ مقدار تکراری یکسان تولید می‌کنند و روند ماهانه واقعی ندارند.
---
## کارت‌های بدون فرمول محاسباتی
این کارت‌ها فقط داده را از `ai_bundle` می‌خوانند:
- `farm_alerts_timeline.py`
- `recommendations_list.py`
---
## نکات مهم برای تیم
- چند اسم کارت با واقعیت محاسبه‌شان دقیقاً منطبق نیست؛ مثلاً `avg_soil_moisture` واقعاً average چند منبع نیست.
- بعضی trendها نسبت به history درستِ همان شاخص محاسبه نمی‌شوند؛ مخصوصاً در `sensor_values_list.py`.
- چند مقدار hard-coded هستند، مثل:
- `light_score = 85`
- `sensor_values_list` برای نور = `850`
- `yield_prediction_chart` برای برداشت = `+8%`
- سری ایده‌آل در `sensor_radar_chart`
- چند کارت به‌جای مدل تحلیلی واقعی، از فرمول‌های heuristic ساده استفاده می‌کنند.
+251 -23
View File
@@ -1,34 +1,262 @@
from dashboard_data.card_utils import safe_number
from __future__ import annotations
from math import sqrt
from statistics import mean
from typing import Any
METRIC_CONFIG = {
"soil_moisture": {
"label": "رطوبت خاک",
"unit": "%",
"source": "history",
"current_field": "soil_moisture",
},
"soil_temperature": {
"label": "دمای خاک",
"unit": "°C",
"source": "history",
"current_field": "soil_temperature",
},
"humidity": {
"label": "رطوبت هوا",
"unit": "%",
"source": "forecast",
"forecast_field": "humidity_mean",
},
"soil_ph": {
"label": "pH خاک",
"unit": "pH",
"source": "history",
"current_field": "soil_ph",
},
"electrical_conductivity": {
"label": "هدایت الکتریکی",
"unit": "dS/m",
"source": "history",
"current_field": "electrical_conductivity",
},
}
METHOD_PRIORITY = {"IQR": 2, "Z_SCORE": 1}
def _percentile(sorted_values: list[float], percentile: float) -> float:
if not sorted_values:
return 0.0
if len(sorted_values) == 1:
return sorted_values[0]
index = (len(sorted_values) - 1) * percentile
lower = int(index)
upper = min(lower + 1, len(sorted_values) - 1)
fraction = index - lower
return sorted_values[lower] + ((sorted_values[upper] - sorted_values[lower]) * fraction)
def _population_std(values: list[float]) -> float:
if len(values) < 2:
return 0.0
center = mean(values)
variance = sum((value - center) ** 2 for value in values) / len(values)
return sqrt(variance)
def _severity_from_score(score: float) -> str:
absolute = abs(score)
if absolute >= 3.5:
return "critical"
if absolute >= 2.5:
return "high"
if absolute >= 1.5:
return "medium"
return "low"
def _history_series(history: list[Any], field_name: str) -> tuple[list[float], str | None, float | None]:
values: list[float] = []
latest_timestamp = None
latest_value = None
for item in history:
value = getattr(item, field_name, None)
if value is None:
continue
numeric = float(value)
values.append(numeric)
if latest_timestamp is None:
recorded_at = getattr(item, "recorded_at", None)
latest_timestamp = recorded_at.isoformat() if recorded_at is not None else None
latest_value = numeric
return list(reversed(values)), latest_timestamp, latest_value
def _forecast_series(forecasts: list[Any], field_name: str) -> tuple[list[float], str | None, float | None]:
values: list[float] = []
latest_timestamp = None
latest_value = None
for forecast in forecasts[:7]:
value = getattr(forecast, field_name, None)
if value is None:
continue
numeric = float(value)
values.append(numeric)
if latest_timestamp is None:
forecast_date = getattr(forecast, "forecast_date", None)
latest_timestamp = forecast_date.isoformat() if forecast_date is not None else None
latest_value = numeric
return values, latest_timestamp, latest_value
def _detect_with_z_score(values: list[float], observed_value: float) -> dict[str, Any] | None:
if len(values) < 5:
return None
center = mean(values)
std = _population_std(values)
if std == 0:
return None
score = (observed_value - center) / std
if abs(score) < 2.0:
return None
return {
"anomaly_method": "Z_SCORE",
"deviation_score": round(score, 3),
"expected_range": [round(center - (2 * std), 2), round(center + (2 * std), 2)],
"severity": _severity_from_score(score),
}
def _detect_with_iqr(values: list[float], observed_value: float) -> dict[str, Any] | None:
if len(values) < 5:
return None
sorted_values = sorted(values)
q1 = _percentile(sorted_values, 0.25)
q3 = _percentile(sorted_values, 0.75)
iqr = q3 - q1
if iqr == 0:
return None
lower = q1 - (1.5 * iqr)
upper = q3 + (1.5 * iqr)
if lower <= observed_value <= upper:
return None
if observed_value < lower:
score = (observed_value - lower) / iqr
else:
score = (observed_value - upper) / iqr
return {
"anomaly_method": "IQR",
"deviation_score": round(score, 3),
"expected_range": [round(lower, 2), round(upper, 2)],
"severity": _severity_from_score(score),
}
def _select_detection_result(results: list[dict[str, Any]]) -> dict[str, Any] | None:
if not results:
return None
return sorted(
results,
key=lambda item: (METHOD_PRIORITY[item["anomaly_method"]], abs(item["deviation_score"])),
reverse=True,
)[0]
def _build_contextual_interpretation(anomalies: list[dict[str, Any]], ai_bundle: dict | None = None) -> dict[str, Any]:
ai_bundle = ai_bundle or {}
ai_payload = ai_bundle.get("anomalyDetectionCard", {}) if isinstance(ai_bundle, dict) else {}
if isinstance(ai_payload, dict) and all(ai_payload.get(key) for key in ("explanation", "likely_cause", "recommended_action")):
return {
"explanation": ai_payload["explanation"],
"likely_cause": ai_payload["likely_cause"],
"recommended_action": ai_payload["recommended_action"],
}
metric_types = {item["metric_type"] for item in anomalies}
if {"soil_temperature", "soil_moisture"} <= metric_types:
return {
"explanation": "هم‌زمانی ناهنجاری دمای خاک و رطوبت خاک نشان می‌دهد تنش ترکیبی در ناحیه ریشه در حال شکل‌گیری است.",
"likely_cause": "احتمالاً الگوی آبیاری، موج گرما یا افت ناگهانی ظرفیت نگهداشت رطوبت خاک عامل اصلی است.",
"recommended_action": "زمان‌بندی آبیاری و وضعیت زهکشی/تبخیر بررسی و قرائت‌های سنسور در ۲۴ ساعت آینده دوباره پایش شود.",
}
if "electrical_conductivity" in metric_types and "soil_moisture" in metric_types:
return {
"explanation": "هم‌زمانی ناهنجاری EC و رطوبت می‌تواند نشان‌دهنده فشار شوری یا تجمع نمک در بستر باشد.",
"likely_cause": "کیفیت آب آبیاری، کوددهی اخیر یا کاهش شست‌وشوی خاک می‌تواند عامل این الگو باشد.",
"recommended_action": "EC آب و برنامه کوددهی بازبینی و در صورت نیاز شست‌وشوی کنترل‌شده خاک بررسی شود.",
}
if anomalies:
top = anomalies[0]
return {
"explanation": f"در شاخص {top['label']} یک ناهنجاری آماری با روش {top['anomaly_method']} شناسایی شده است.",
"likely_cause": "این رخداد می‌تواند ناشی از تغییر ناگهانی شرایط محیطی، خطای فرایندی یا نیاز به کالیبراسیون سنسور باشد.",
"recommended_action": "روند همان شاخص و داده‌های پیرامونی بازبینی و در صورت تداوم، اقدام اصلاحی مزرعه اجرا شود.",
}
return {
"explanation": "ناهنجاری آماری معناداری در داده‌های اخیر شناسایی نشد.",
"likely_cause": "داده‌های فعلی با الگوی تاریخی سازگار هستند.",
"recommended_action": "پایش عادی ادامه یابد.",
}
def build_anomaly_detection_card(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
sensor = (context or {}).get("sensor")
context = context or {}
sensor = context.get("sensor")
history = context.get("history", [])
forecasts = context.get("forecasts", [])
if sensor is None:
return {"anomalies": []}
return {"anomalies": [], "interpretation": None}
anomalies: list[dict[str, Any]] = []
for metric_type, config in METRIC_CONFIG.items():
if config["source"] == "history":
values, timestamp, observed_value = _history_series(history, config["current_field"])
current_value = getattr(sensor, config["current_field"], None)
if current_value is not None:
observed_value = float(current_value)
timestamp = getattr(sensor, "updated_at", None)
timestamp = timestamp.isoformat() if timestamp is not None else timestamp
else:
values, timestamp, observed_value = _forecast_series(forecasts, config["forecast_field"])
if observed_value is None or len(values) < 5:
continue
detection = _select_detection_result(
[
result
for result in (
_detect_with_z_score(values, observed_value),
_detect_with_iqr(values, observed_value),
)
if result is not None
]
)
if detection is None:
continue
anomalies = []
moisture = safe_number(sensor.soil_moisture, 0)
if moisture < 45:
anomalies.append(
{
"sensor": "رطوبت خاک",
"value": f"{round(moisture)}%",
"expected": "45-65%",
"deviation": f"{round(moisture - 55)}%",
"severity": "warning",
"metric_type": metric_type,
"label": config["label"],
"timestamp": timestamp,
"observed_value": round(observed_value, 2),
"expected_range": detection["expected_range"],
"deviation_score": detection["deviation_score"],
"anomaly_method": detection["anomaly_method"],
"severity": detection["severity"],
"unit": config["unit"],
}
)
soil_ph = safe_number(sensor.soil_ph, 7)
if soil_ph < 6 or soil_ph > 7:
anomalies.append(
{
"sensor": "pH خاک",
"value": f"{soil_ph:.1f}",
"expected": "6.0-7.0",
"deviation": f"{round(soil_ph - 6.5, 1)}",
"severity": "error" if soil_ph < 5.5 or soil_ph > 7.5 else "warning",
}
)
anomalies.sort(key=lambda item: abs(item["deviation_score"]), reverse=True)
interpretation = _build_contextual_interpretation(anomalies, ai_bundle=ai_bundle)
return {"anomalies": anomalies}
return {
"anomalies": anomalies,
"interpretation": interpretation,
}
+4 -1
View File
@@ -1,3 +1,6 @@
def build_farm_alerts_timeline(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
ai_bundle = ai_bundle or {}
return {"alerts": ai_bundle.get("timeline", [])}
return {
"alerts": ai_bundle.get("timeline", []),
"structuredContext": ai_bundle.get("structured_context", {}),
}
+550 -31
View File
@@ -1,41 +1,560 @@
from dashboard_data.card_utils import average, safe_number
from __future__ import annotations
from collections import defaultdict
from datetime import datetime
from typing import Any
from django.utils import timezone
from dashboard_data.card_utils import safe_number
SEVERITY_ORDER = {"low": 1, "medium": 2, "high": 3, "critical": 4}
SEVERITY_UI = {
"low": {"avatarColor": "info", "chipColor": "info"},
"medium": {"avatarColor": "warning", "chipColor": "warning"},
"high": {"avatarColor": "error", "chipColor": "error"},
"critical": {"avatarColor": "error", "chipColor": "error"},
}
METRIC_META = {
"moisture": {
"title": "تنش رطوبتی",
"icon": "tabler-droplet-half-2",
"unit": "%",
"domain": "water_balance",
"threshold": 45.0,
"danger_span": 20.0,
"direction": "below",
},
"temperature": {
"title": "تنش دمایی",
"icon": "tabler-snowflake",
"unit": "°C",
"domain": "temperature_stress",
"threshold": 0.0,
"danger_span": 8.0,
"direction": "below",
},
"ph": {
"title": "عدم تعادل pH",
"icon": "tabler-flask",
"unit": "pH",
"domain": "root_chemistry",
"threshold_low": 6.0,
"threshold_high": 7.5,
"danger_span": 1.5,
},
"ec": {
"title": "شوری / EC بالا",
"icon": "tabler-bolt",
"unit": "dS/m",
"domain": "root_chemistry",
"threshold": 3.0,
"danger_span": 2.0,
"direction": "above",
},
"fungal_risk": {
"title": "ریسک قارچی",
"icon": "tabler-mushroom",
"unit": "%",
"domain": "disease_pressure",
"threshold": 70.0,
"danger_span": 20.0,
"direction": "above",
},
}
SUMMARY_TEMPLATES = {
"moisture": {
"low": "افت رطوبت خاک ثبت شده و نیاز به پایش نزدیک‌تر دارد.",
"medium": "رطوبت خاک پایین‌تر از محدوده مطلوب است و برنامه آبیاری باید بازبینی شود.",
"high": "تنش آبی قابل‌توجه شناسایی شده و مزرعه به اقدام آبیاری سریع نیاز دارد.",
"critical": "کمبود شدید رطوبت فعال است و خطر افت رشد یا آسیب ریشه بالا رفته است.",
},
"temperature": {
"low": "دمای پایین ثبت شده و باید روند شبانه پایش شود.",
"medium": "ریسک سرمازدگی ایجاد شده و اقدامات محافظتی باید آماده شود.",
"high": "سرما به محدوده پرخطر رسیده و حفاظت دمایی باید در اولویت باشد.",
"critical": "یخبندان بحرانی پیش‌بینی یا مشاهده شده و اقدام فوری حفاظتی لازم است.",
},
"ph": {
"low": "pH از محدوده مطلوب فاصله گرفته و نیاز به بررسی اصلاحی دارد.",
"medium": "عدم تعادل pH می‌تواند جذب عناصر را مختل کند و باید اصلاح شود.",
"high": "انحراف pH شدید است و ریسک اختلال تغذیه گیاه بالا رفته است.",
"critical": "pH در وضعیت بحرانی قرار دارد و مداخله سریع برای جلوگیری از تنش تغذیه‌ای لازم است.",
},
"ec": {
"low": "EC بالاتر از حد مرجع است و باید روند شوری پیگیری شود.",
"medium": "شوری خاک می‌تواند رشد را محدود کند و نیاز به تعدیل دارد.",
"high": "EC بالا به سطح پرخطر رسیده و مدیریت شوری باید انجام شود.",
"critical": "شوری بحرانی فعال است و احتمال آسیب ریشه و افت جذب آب بسیار بالاست.",
},
"fungal_risk": {
"low": "شرایط اولیه برای فشار بیماری قارچی مشاهده شده است.",
"medium": "رطوبت و خیس‌ماندگی بستر، ریسک بیماری قارچی را افزایش داده است.",
"high": "فشار بیماری قارچی بالا است و عملیات پیشگیرانه باید در اولویت قرار گیرد.",
"critical": "الگوی بسیار پرخطر بیماری قارچی فعال است و اقدام فوری محافظتی لازم است.",
},
}
ACTION_TEMPLATES = {
"moisture": {
"low": "روند رطوبت را در نوبت بعدی کنترل و یکنواختی آبیاری را بررسی کنید.",
"medium": "یک نوبت آبیاری اصلاحی برنامه‌ریزی و افت رطوبت در عمق‌های مختلف پایش شود.",
"high": "آبیاری جبرانی کوتاه‌مدت اجرا و راندمان روش آبیاری بازبینی شود.",
"critical": "آبیاری اضطراری، بررسی انسداد سامانه و پایش مجدد سنسور فوراً انجام شود.",
},
"temperature": {
"low": "پوشش یا برنامه محافظتی شبانه آماده نگه داشته شود.",
"medium": "زمان‌بندی آبیاری و پوشش حفاظتی برای ساعات سرد تنظیم شود.",
"high": "اقدامات ضدیخبندان مانند آبیاری حفاظتی یا پوشش فوری اجرا شود.",
"critical": "پروتکل کامل حفاظت سرما فوراً فعال و وضعیت مزرعه در چند ساعت بعدی بازبینی شود.",
},
"ph": {
"low": "نمونه‌برداری تکمیلی انجام و روند pH برای چند قرائت بعدی کنترل شود.",
"medium": "برنامه اصلاح pH با توجه به نوع خاک و کود مصرفی بازتنظیم شود.",
"high": "اصلاح‌کننده مناسب خاک در اولویت قرار گیرد و تغذیه گیاه بازبینی شود.",
"critical": "مداخله اصلاحی فوری برای pH انجام و مصرف نهاده‌های تشدیدکننده متوقف شود.",
},
"ec": {
"low": "منبع آب و روند EC در روزهای آینده کنترل شود.",
"medium": "شست‌وشوی محدود خاک یا اصلاح برنامه کوددهی بررسی شود.",
"high": "کاهش بار نمکی، بازبینی کوددهی و ارزیابی زهکشی در اولویت قرار گیرد.",
"critical": "اقدام فوری برای کاهش شوری و توقف نهاده‌های شورکننده انجام شود.",
},
"fungal_risk": {
"low": "تهویه و رطوبت بستر پایش شود و نشانه‌های اولیه بیماری بررسی گردد.",
"medium": "فاصله آبیاری و تهویه مزرعه تنظیم و بازدید بیماری انجام شود.",
"high": "اقدامات پیشگیرانه بیماری و کاهش رطوبت ماندگار فوراً اجرا شود.",
"critical": "پروتکل فوری مدیریت بیماری فعال و مزرعه از نظر آلودگی کانونی بررسی شود.",
},
}
EXPLANATION_TEMPLATES = {
"moisture": {
"low": "رطوبت فعلی {current_value}{unit} به زیر آستانه {threshold_value}{unit} رسیده است و این وضعیت {duration_text} ادامه داشته است.",
"medium": "رطوبت خاک {current_value}{unit} است؛ فاصله از آستانه {threshold_value}{unit} و تداوم {duration_text} نشان‌دهنده تنش آبی است.",
"high": "رطوبت خاک در {current_value}{unit} ثبت شده که به‌طور معنی‌دار پایین‌تر از آستانه {threshold_value}{unit} است و {duration_text} پایدار مانده است.",
"critical": "رطوبت خاک به {current_value}{unit} سقوط کرده و با عبور شدید از آستانه {threshold_value}{unit}، {duration_text} در وضعیت بحرانی باقی مانده است.",
},
"temperature": {
"low": "دما به {current_value}{unit} رسیده که از حد هشدار {threshold_value}{unit} پایین‌تر است و {duration_text} تداوم داشته است.",
"medium": "دمای ثبت‌شده {current_value}{unit} کمتر از آستانه {threshold_value}{unit} است و تداوم {duration_text} ریسک تنش سرما را بالا برده است.",
"high": "افت دما تا {current_value}{unit} همراه با ماندگاری {duration_text} شرایط پرخطر سرما را ایجاد کرده است.",
"critical": "دمای {current_value}{unit} با ماندگاری {duration_text} نشان می‌دهد مزرعه در معرض یخبندان بحرانی قرار دارد.",
},
"ph": {
"low": "pH فعلی {current_value}{unit} از محدوده مرجع {threshold_value} خارج شده و این انحراف {duration_text} ادامه داشته است.",
"medium": "انحراف pH تا {current_value}{unit} نسبت به حد مجاز {threshold_value} همراه با تداوم {duration_text} می‌تواند جذب عناصر را مختل کند.",
"high": "pH {current_value}{unit} با فاصله زیاد از محدوده مرجع و پایداری {duration_text} یک تنش شیمیایی مهم ایجاد کرده است.",
"critical": "وضعیت بحرانی pH در سطح {current_value}{unit} و با تداوم {duration_text} نیاز به اصلاح فوری دارد.",
},
"ec": {
"low": "EC فعلی {current_value}{unit} از آستانه {threshold_value}{unit} عبور کرده و {duration_text} پایدار مانده است.",
"medium": "EC برابر {current_value}{unit} است؛ عبور از حد {threshold_value}{unit} با ماندگاری {duration_text} فشار شوری را افزایش داده است.",
"high": "شوری ثبت‌شده در {current_value}{unit} با تداوم {duration_text} به سطح پرخطر رسیده است.",
"critical": "EC در {current_value}{unit} و با پایداری {duration_text} نشان‌دهنده شوری بحرانی خاک است.",
},
"fungal_risk": {
"low": "رطوبت هوا و خاک شرایط اولیه فشار قارچی را ایجاد کرده و این الگو {duration_text} ادامه داشته است.",
"medium": "ترکیب رطوبت {current_value}{unit} و ماندگاری {duration_text} از آستانه {threshold_value}{unit} عبور کرده و ریسک قارچی را بالا برده است.",
"high": "شرایط مرطوب پایدار در {current_value}{unit} و تداوم {duration_text} فشار قارچی جدی ایجاد کرده است.",
"critical": "ماندگاری طولانی شرایط بسیار مرطوب ({current_value}{unit}) در برابر حد {threshold_value}{unit} نشان‌دهنده ریسک بحرانی بیماری قارچی است.",
},
}
CLUSTER_TITLES = {
"water_balance": "تعادل آب",
"temperature_stress": "تنش دمایی",
"root_chemistry": "شیمی ناحیه ریشه",
"disease_pressure": "فشار بیماری",
}
def _now() -> datetime:
return timezone.now()
def _timestamp_for(obj: Any, fallback: datetime) -> datetime:
for attr in ("recorded_at", "updated_at", "created_at", "forecast_date"):
value = getattr(obj, attr, None)
if value is not None:
if isinstance(value, datetime):
return value
return datetime.combine(value, datetime.min.time(), tzinfo=fallback.tzinfo)
return fallback
def _format_timestamp(value: datetime) -> str:
if timezone.is_naive(value):
value = timezone.make_aware(value, timezone.get_current_timezone())
return value.isoformat()
def _format_duration(hours: float) -> str:
rounded = max(1, round(hours))
if rounded >= 24:
days = rounded // 24
rem_hours = rounded % 24
if rem_hours == 0:
return f"{days} روز"
return f"{days} روز و {rem_hours} ساعت"
return f"{rounded} ساعت"
def _severity_from_score(score: float) -> str:
if score >= 0.85:
return "critical"
if score >= 0.55:
return "high"
if score >= 0.3:
return "medium"
return "low"
def _build_severity(distance_ratio: float, duration_hours: float) -> str:
duration_ratio = min(duration_hours / 72.0, 1.0)
score = min((distance_ratio * 0.7) + (duration_ratio * 0.3), 1.0)
return _severity_from_score(score)
def _collect_active_history_duration(
current_value: float,
history: list[Any],
field_name: str,
threshold: float,
direction: str,
fallback_timestamp: datetime,
) -> tuple[float, datetime]:
if direction == "below":
is_violating = lambda value: value < threshold
else:
is_violating = lambda value: value > threshold
if not is_violating(current_value):
return 0.0, fallback_timestamp
violating_times = [fallback_timestamp]
for item in history:
value = getattr(item, field_name, None)
if value is None:
break
if not is_violating(value):
break
violating_times.append(_timestamp_for(item, fallback_timestamp))
oldest_violation = min(violating_times)
duration_hours = max((_now() - oldest_violation).total_seconds() / 3600, 1.0)
return duration_hours, oldest_violation
def _make_alert(
metric_type: str,
current_value: float,
threshold_value: float | str,
severity: str,
duration_hours: float,
timestamp: datetime,
sensor_id: str,
zone_id: str | None = None,
direction: str | None = None,
metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
meta = METRIC_META[metric_type]
unit = meta["unit"]
threshold_display = threshold_value
if isinstance(threshold_value, float):
threshold_display = round(threshold_value, 2)
explanation = EXPLANATION_TEMPLATES[metric_type][severity].format(
current_value=round(current_value, 2),
threshold_value=threshold_display,
unit=unit,
duration_text=_format_duration(duration_hours),
)
return {
"metric_type": metric_type,
"title": meta["title"],
"current_value": round(current_value, 2),
"threshold_value": threshold_display,
"severity": severity,
"duration_hours": round(duration_hours, 1),
"duration": _format_duration(duration_hours),
"timestamp": _format_timestamp(timestamp),
"sensor_id": sensor_id,
"zone_id": zone_id,
"domain": meta["domain"],
"direction": direction,
"unit": unit,
"icon": meta["icon"],
"summary": SUMMARY_TEMPLATES[metric_type][severity],
"recommended_action": ACTION_TEMPLATES[metric_type][severity],
"explanation": explanation,
"metadata": metadata or {},
}
def _detect_moisture_alert(sensor: Any, history: list[Any], sensor_id: str) -> list[dict[str, Any]]:
current_value = safe_number(getattr(sensor, "soil_moisture", None), 0)
meta = METRIC_META["moisture"]
threshold = meta["threshold"]
if current_value >= threshold:
return []
timestamp = _timestamp_for(sensor, _now())
duration_hours, started_at = _collect_active_history_duration(
current_value=current_value,
history=history,
field_name="soil_moisture",
threshold=threshold,
direction=meta["direction"],
fallback_timestamp=timestamp,
)
distance_ratio = min((threshold - current_value) / meta["danger_span"], 1.0)
severity = _build_severity(distance_ratio, duration_hours)
return [
_make_alert(
metric_type="moisture",
current_value=current_value,
threshold_value=threshold,
severity=severity,
duration_hours=duration_hours,
timestamp=started_at,
sensor_id=sensor_id,
direction=meta["direction"],
)
]
def _detect_ph_alert(sensor: Any, history: list[Any], sensor_id: str) -> list[dict[str, Any]]:
current_value = safe_number(getattr(sensor, "soil_ph", None), 7)
meta = METRIC_META["ph"]
low = meta["threshold_low"]
high = meta["threshold_high"]
if low <= current_value <= high:
return []
direction = "below" if current_value < low else "above"
threshold = low if direction == "below" else high
timestamp = _timestamp_for(sensor, _now())
duration_hours, started_at = _collect_active_history_duration(
current_value=current_value,
history=history,
field_name="soil_ph",
threshold=threshold,
direction=direction,
fallback_timestamp=timestamp,
)
distance_ratio = min(abs(current_value - threshold) / meta["danger_span"], 1.0)
severity = _build_severity(distance_ratio, duration_hours)
threshold_display = f"{low}-{high}"
return [
_make_alert(
metric_type="ph",
current_value=current_value,
threshold_value=threshold_display,
severity=severity,
duration_hours=duration_hours,
timestamp=started_at,
sensor_id=sensor_id,
direction=direction,
metadata={"boundary_threshold": threshold},
)
]
def _detect_ec_alert(sensor: Any, history: list[Any], sensor_id: str) -> list[dict[str, Any]]:
current_value = safe_number(getattr(sensor, "electrical_conductivity", None), 0)
meta = METRIC_META["ec"]
threshold = meta["threshold"]
if current_value <= threshold:
return []
timestamp = _timestamp_for(sensor, _now())
duration_hours, started_at = _collect_active_history_duration(
current_value=current_value,
history=history,
field_name="electrical_conductivity",
threshold=threshold,
direction=meta["direction"],
fallback_timestamp=timestamp,
)
distance_ratio = min((current_value - threshold) / meta["danger_span"], 1.0)
severity = _build_severity(distance_ratio, duration_hours)
return [
_make_alert(
metric_type="ec",
current_value=current_value,
threshold_value=threshold,
severity=severity,
duration_hours=duration_hours,
timestamp=started_at,
sensor_id=sensor_id,
direction=meta["direction"],
)
]
def _detect_frost_alert(forecasts: list[Any], sensor_id: str) -> list[dict[str, Any]]:
violating = [forecast for forecast in forecasts[:3] if safe_number(getattr(forecast, "temperature_min", None), 10) < 0]
if not violating:
return []
first = violating[0]
coldest = min(safe_number(getattr(item, "temperature_min", None), 0) for item in violating)
duration_hours = max(len(violating) * 24.0, 24.0)
meta = METRIC_META["temperature"]
distance_ratio = min((meta["threshold"] - coldest) / meta["danger_span"], 1.0)
severity = _build_severity(distance_ratio, duration_hours)
timestamp = _timestamp_for(first, _now())
return [
_make_alert(
metric_type="temperature",
current_value=coldest,
threshold_value=meta["threshold"],
severity=severity,
duration_hours=duration_hours,
timestamp=timestamp,
sensor_id=sensor_id,
direction=meta["direction"],
metadata={"forecast_days_impacted": len(violating)},
)
]
def _detect_fungal_risk(sensor: Any, forecasts: list[Any], history: list[Any], sensor_id: str) -> list[dict[str, Any]]:
humidity_values = [safe_number(getattr(forecast, "humidity_mean", None), None) for forecast in forecasts[:3]]
humidity_values = [value for value in humidity_values if value is not None]
if not humidity_values:
return []
humidity = sum(humidity_values) / len(humidity_values)
moisture = safe_number(getattr(sensor, "soil_moisture", None), 0)
meta = METRIC_META["fungal_risk"]
threshold = meta["threshold"]
if humidity <= threshold or moisture <= 60:
return []
timestamp = _timestamp_for(sensor, _now())
duration_hours, started_at = _collect_active_history_duration(
current_value=moisture,
history=history,
field_name="soil_moisture",
threshold=60.0,
direction="above",
fallback_timestamp=timestamp,
)
duration_hours = max(duration_hours, len(forecasts[:3]) * 12.0)
humidity_ratio = min((humidity - threshold) / meta["danger_span"], 1.0)
moisture_ratio = min((moisture - 60.0) / 20.0, 1.0)
severity = _build_severity((humidity_ratio * 0.6) + (moisture_ratio * 0.4), duration_hours)
return [
_make_alert(
metric_type="fungal_risk",
current_value=humidity,
threshold_value=threshold,
severity=severity,
duration_hours=duration_hours,
timestamp=started_at,
sensor_id=sensor_id,
direction=meta["direction"],
metadata={"soil_moisture": round(moisture, 2)},
)
]
def _sort_alerts(alerts: list[dict[str, Any]]) -> list[dict[str, Any]]:
return sorted(
alerts,
key=lambda alert: (
SEVERITY_ORDER[alert["severity"]],
alert["duration_hours"],
abs(float(alert["current_value"])) if isinstance(alert["current_value"], (int, float)) else 0,
),
reverse=True,
)
def _build_clusters(alerts: list[dict[str, Any]]) -> list[dict[str, Any]]:
grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
for alert in alerts:
grouped[alert["domain"]].append(alert)
clusters: list[dict[str, Any]] = []
for domain, items in grouped.items():
ordered = _sort_alerts(items)
top = ordered[0]
clusters.append(
{
"domain": domain,
"title": CLUSTER_TITLES.get(domain, domain),
"alert_count": len(items),
"highest_severity": top["severity"],
"primary_metric": top["metric_type"],
"summary": top["summary"],
"alert_ids": [f"{item['metric_type']}:{item['timestamp']}" for item in ordered],
}
)
return sorted(clusters, key=lambda cluster: SEVERITY_ORDER[cluster["highest_severity"]], reverse=True)
def _build_alert_stats(alerts: list[dict[str, Any]]) -> list[dict[str, Any]]:
stats: list[dict[str, Any]] = []
for metric_type, meta in METRIC_META.items():
matches = [alert for alert in alerts if alert["metric_type"] == metric_type]
if not matches:
continue
top = _sort_alerts(matches)[0]
ui = SEVERITY_UI[top["severity"]]
stats.append(
{
"title": meta["title"],
"count": str(len(matches)),
"avatarColor": ui["avatarColor"],
"avatarIcon": meta["icon"],
"severity": top["severity"],
"topSummary": top["summary"],
}
)
return stats
def build_farm_alerts_tracker(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
context = context or {}
sensor = context.get("sensor")
forecasts = context.get("forecasts", [])
if sensor is None:
return {"totalAlerts": 0, "radialBarValue": 0, "alertStats": []}
history = context.get("history", [])
moisture = safe_number(sensor.soil_moisture, 0)
humidity = average([forecast.humidity_mean for forecast in forecasts[:3]], default=0)
frost_count = sum(1 for forecast in forecasts[:3] if safe_number(forecast.temperature_min, 10) <= 0)
low_water_count = 2 if moisture < 45 else 0
fungal_count = 1 if humidity > 70 and moisture > 60 else 0
total = low_water_count + fungal_count + frost_count
if sensor is None:
return {
"totalAlerts": 0,
"alerts": [],
"alertStats": [],
"alertClusters": [],
"mostCriticalIssue": None,
"prioritizedAlertSummaries": [],
"recommendedOperationalActions": [],
"humanReadableExplanations": [],
}
alerts = []
alerts.extend(_detect_moisture_alert(sensor, history, sensor_id))
alerts.extend(_detect_ph_alert(sensor, history, sensor_id))
alerts.extend(_detect_ec_alert(sensor, history, sensor_id))
alerts.extend(_detect_frost_alert(forecasts, sensor_id))
alerts.extend(_detect_fungal_risk(sensor, forecasts, history, sensor_id))
ordered_alerts = _sort_alerts(alerts)
clusters = _build_clusters(ordered_alerts)
top_alert = ordered_alerts[0] if ordered_alerts else None
return {
"totalAlerts": total,
"radialBarValue": min(100, total * 10),
"alertStats": [
{
"title": "کمبود آب",
"count": str(low_water_count),
"avatarColor": "error",
"avatarIcon": "tabler-droplet-half-2",
},
{
"title": "ریسک قارچی",
"count": str(fungal_count),
"avatarColor": "warning",
"avatarIcon": "tabler-mushroom",
},
{
"title": "هشدار یخبندان",
"count": str(frost_count),
"avatarColor": "info",
"avatarIcon": "tabler-snowflake",
},
],
"totalAlerts": len(ordered_alerts),
"alerts": ordered_alerts,
"alertStats": _build_alert_stats(ordered_alerts),
"alertClusters": clusters,
"mostCriticalIssue": top_alert,
"prioritizedAlertSummaries": [alert["summary"] for alert in ordered_alerts],
"recommendedOperationalActions": [alert["recommended_action"] for alert in ordered_alerts],
"humanReadableExplanations": [alert["explanation"] for alert in ordered_alerts],
}
+174 -8
View File
@@ -1,6 +1,160 @@
from __future__ import annotations
from typing import Any
from dashboard_data.card_utils import average, safe_number
DEFAULT_HEALTH_PROFILE = {
"moisture": {"ideal_value": 65.0, "min_range": 45.0, "max_range": 75.0, "weight": 0.45},
"ph": {"ideal_value": 6.6, "min_range": 6.0, "max_range": 7.5, "weight": 0.30},
"ec": {"ideal_value": 1.2, "min_range": 0.2, "max_range": 3.0, "weight": 0.25},
}
METRIC_SPECS = {
"moisture": {
"sensor_field": "soil_moisture",
"label": "رطوبت خاک",
"unit": "%",
},
"ph": {
"sensor_field": "soil_ph",
"label": "pH خاک",
"unit": "pH",
},
"ec": {
"sensor_field": "electrical_conductivity",
"label": "هدایت الکتریکی",
"unit": "dS/m",
},
}
def _normalize_metric(value: float, ideal_value: float, min_range: float, max_range: float) -> float:
if max_range <= min_range:
return 0.0
if value <= min_range or value >= max_range:
return 0.0
if value == ideal_value:
return 1.0
if value < ideal_value:
span = ideal_value - min_range
if span <= 0:
return 0.0
return max(0.0, min(1.0, (value - min_range) / span))
span = max_range - ideal_value
if span <= 0:
return 0.0
return max(0.0, min(1.0, (max_range - value) / span))
def _resolve_plant_profile(context: dict[str, Any]) -> tuple[dict[str, dict[str, float]], str]:
plants = context.get("plants", [])
for plant in plants:
profile = getattr(plant, "health_profile", None) or {}
if profile:
merged = {
metric: {
**DEFAULT_HEALTH_PROFILE.get(metric, {}),
**profile.get(metric, {}),
}
for metric in set(DEFAULT_HEALTH_PROFILE) | set(profile)
}
return merged, getattr(plant, "name", "گیاه")
return DEFAULT_HEALTH_PROFILE, (plants[0].name if plants else "پروفایل پیش‌فرض")
def _compute_health_score(sensor: Any, profile: dict[str, dict[str, float]]) -> tuple[int, list[dict[str, Any]]]:
weighted_sum = 0.0
total_weight = 0.0
components: list[dict[str, Any]] = []
for metric_type, config in profile.items():
spec = METRIC_SPECS.get(metric_type)
if spec is None:
continue
sensor_value = getattr(sensor, spec["sensor_field"], None)
if sensor_value is None:
continue
current_value = float(safe_number(sensor_value, 0))
ideal_value = float(config.get("ideal_value", DEFAULT_HEALTH_PROFILE.get(metric_type, {}).get("ideal_value", 0)))
min_range = float(config.get("min_range", DEFAULT_HEALTH_PROFILE.get(metric_type, {}).get("min_range", 0)))
max_range = float(config.get("max_range", DEFAULT_HEALTH_PROFILE.get(metric_type, {}).get("max_range", 0)))
weight = float(config.get("weight", DEFAULT_HEALTH_PROFILE.get(metric_type, {}).get("weight", 0)))
if weight <= 0:
continue
normalized_value = _normalize_metric(
value=current_value,
ideal_value=ideal_value,
min_range=min_range,
max_range=max_range,
)
weighted_sum += weight * normalized_value
total_weight += weight
components.append(
{
"metricType": metric_type,
"label": spec["label"],
"unit": spec["unit"],
"currentValue": round(current_value, 2),
"idealValue": round(ideal_value, 2),
"minRange": round(min_range, 2),
"maxRange": round(max_range, 2),
"weight": round(weight, 3),
"normalizedValue": round(normalized_value, 4),
"weightedContribution": round(weight * normalized_value, 4),
}
)
if total_weight <= 0:
return 0, components
score = round((weighted_sum / total_weight) * 100)
return max(0, min(100, score)), components
def _health_language(health_score: int, ai_bundle: dict | None = None) -> dict[str, str]:
ai_bundle = ai_bundle or {}
ai_health = ai_bundle.get("farmOverviewKpis", {}) if isinstance(ai_bundle, dict) else {}
short_chip_text = ai_health.get("short_chip_text")
action_hint = ai_health.get("action_hint")
explanation = ai_health.get("explanation")
if isinstance(short_chip_text, str) and short_chip_text.strip() and isinstance(action_hint, str) and action_hint.strip() and isinstance(explanation, str) and explanation.strip():
return {
"short_chip_text": short_chip_text.strip(),
"action_hint": action_hint.strip(),
"explanation": explanation.strip(),
}
if health_score >= 85:
return {
"short_chip_text": "بسیار خوب",
"action_hint": "برنامه فعلی پایش و نگهداری حفظ شود.",
"explanation": "شاخص سلامت مزرعه به محدوده بسیار خوب رسیده و بیشتر پارامترهای کلیدی نزدیک به پروفایل ایده‌آل گیاه هستند.",
}
if health_score >= 70:
return {
"short_chip_text": "پایدار",
"action_hint": "تنظیمات فعلی حفظ و فقط شاخص‌های مرزی پایش شوند.",
"explanation": "سلامت مزرعه در محدوده قابل قبول است، اما برخی پارامترها هنوز با مقدار ایده‌آل فاصله دارند.",
}
if health_score >= 50:
return {
"short_chip_text": "نیازمند تنظیم",
"action_hint": "پارامترهای دور از محدوده ایده‌آل در اولویت اصلاح قرار گیرند.",
"explanation": "امتیاز سلامت نشان می‌دهد بخشی از شرایط محیطی از پروفایل مطلوب گیاه فاصله گرفته و باید تنظیم شود.",
}
return {
"short_chip_text": "تنش بالا",
"action_hint": "اصلاح فوری رطوبت، تغذیه یا شوری بر اساس اجزای امتیاز انجام شود.",
"explanation": "سلامت مزرعه در محدوده ضعیف قرار دارد و چند شاخص اصلی خارج از بازه قابل قبول گیاه هستند.",
}
def build_farm_overview_kpis(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
context = context or {}
sensor = context.get("sensor")
@@ -8,26 +162,36 @@ def build_farm_overview_kpis(sensor_id: str, context: dict | None = None, ai_bun
if sensor is None:
return {"kpis": []}
profile, profile_source = _resolve_plant_profile(context)
health_score, health_components = _compute_health_score(sensor, profile)
health_language = _health_language(health_score, ai_bundle=ai_bundle)
moisture = safe_number(sensor.soil_moisture, 0)
ph = safe_number(sensor.soil_ph, 7)
ec = safe_number(sensor.electrical_conductivity, 0)
humidity = average([forecast.humidity_mean for forecast in forecasts[:3]], default=45)
health_score = max(0, min(100, round(100 - abs(65 - moisture) - (abs(6.8 - ph) * 10) - (ec * 5))))
water_stress = max(0, min(100, round(35 - (moisture / 2))))
disease_risk = max(0, min(100, round((humidity * 0.4) + (safe_number(sensor.soil_temperature, 0) * 0.6) - 20)))
yield_prediction = round(max(5, (health_score / 2.1)), 1)
primary_gap = min(health_components, key=lambda item: item["normalizedValue"], default=None)
return {
"kpis": [
{
"id": "farm_health_score",
"title": "امتیاز سلامت مزرعه",
"subtitle": "تحلیل هوشمند",
"subtitle": f"پروفایل {profile_source}",
"stats": f"{health_score}%",
"avatarColor": "success" if health_score >= 70 else "warning",
"avatarColor": "success" if health_score >= 70 else "warning" if health_score >= 50 else "error",
"avatarIcon": "tabler-heartbeat",
"chipText": "خوب" if health_score >= 70 else "متوسط",
"chipColor": "success" if health_score >= 70 else "warning",
"chipText": health_language["short_chip_text"],
"chipColor": "success" if health_score >= 70 else "warning" if health_score >= 50 else "error",
"actionHint": health_language["action_hint"],
"explanation": health_language["explanation"],
"healthScoreDetails": {
"method": "normalized_weighted_average",
"profileSource": profile_source,
"components": health_components,
},
},
{
"id": "water_stress_index",
@@ -66,8 +230,10 @@ def build_farm_overview_kpis(sensor_id: str, context: dict | None = None, ai_bun
"stats": f"{yield_prediction} تن",
"avatarColor": "secondary",
"avatarIcon": "tabler-chart-bar",
"chipText": f"+{max(0, health_score - 50)}%",
"chipColor": "success",
"chipText": (
primary_gap["label"] if primary_gap else "پایدار"
),
"chipColor": "warning" if primary_gap and primary_gap["normalizedValue"] < 0.6 else "success",
},
{
"id": "pest_risk",
+30 -14
View File
@@ -1,6 +1,22 @@
from datetime import date, timedelta
from __future__ import annotations
from dashboard_data.card_utils import average, safe_number
from datetime import date
from plant.gdd import predict_harvest_from_forecasts
def _harvest_language(prediction: dict, plant_name: str, ai_bundle: dict | None = None) -> str:
ai_bundle = ai_bundle or {}
ai_payload = ai_bundle.get("harvestPredictionCard", {}) if isinstance(ai_bundle, dict) else {}
description = ai_payload.get("description")
if isinstance(description, str) and description.strip():
return description.strip()
return (
f"برای {plant_name}، رشد تجمعی بر اساس مدل GDD محاسبه شده است. "
f"تا امروز {prediction['current_cumulative_gdd']} واحد-روز رشد ثبت شده و "
f"برای رسیدن به بلوغ حدود {prediction['remaining_gdd']} واحد-روز دیگر نیاز است."
)
def build_harvest_prediction_card(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
@@ -8,19 +24,19 @@ def build_harvest_prediction_card(sensor_id: str, context: dict | None = None, a
forecasts = context.get("forecasts", [])
plants = context.get("plants", [])
avg_temp = average([forecast.temperature_mean for forecast in forecasts], default=24)
moisture_factor = safe_number(getattr(context.get("sensor"), "soil_moisture", None), 50)
days_until = max(10, int(90 - avg_temp - (moisture_factor / 5)))
target_date = date.today() + timedelta(days=days_until)
window_start = target_date - timedelta(days=3)
window_end = target_date + timedelta(days=3)
plant_name = plants[0].name if plants else "محصول"
plant = plants[0] if plants else None
plant_name = plant.name if plant else "محصول"
prediction = predict_harvest_from_forecasts(forecasts=forecasts, plant=plant).__dict__
target_date = date.fromisoformat(prediction["predicted_harvest_date"])
window_start = date.fromisoformat(prediction["predicted_harvest_window"]["start"])
window_end = date.fromisoformat(prediction["predicted_harvest_window"]["end"])
return {
"date": str(target_date),
"date": prediction["predicted_harvest_date"],
"dateFormatted": f"{target_date.day} {target_date.strftime('%B')} {target_date.year}",
"daysUntil": days_until,
"description": f"بر اساس دمای فعلی، رطوبت خاک و اطلاعات {plant_name}. بازه بهینه برداشت محاسبه شده است.",
"optimalWindowStart": str(window_start),
"optimalWindowEnd": str(window_end),
"daysUntil": prediction["estimated_days_to_harvest"],
"description": _harvest_language(prediction, plant_name=plant_name, ai_bundle=ai_bundle),
"optimalWindowStart": window_start.isoformat(),
"optimalWindowEnd": window_end.isoformat(),
"gddDetails": prediction,
}
+64 -16
View File
@@ -1,30 +1,78 @@
from dashboard_data.card_utils import safe_number
from __future__ import annotations
from dashboard_data.remote_sensing import fetch_or_get_ndvi_observation
def _ndvi_explanation(observation, ai_bundle: dict | None = None) -> str:
ai_bundle = ai_bundle or {}
ai_payload = ai_bundle.get("ndviHealthCard", {}) if isinstance(ai_bundle, dict) else {}
explanation = ai_payload.get("explanation")
if isinstance(explanation, str) and explanation.strip():
return explanation.strip()
return (
f"میانگین NDVI مزرعه {observation.mean_ndvi} ثبت شده و کلاس سلامت پوشش گیاهی "
f"در وضعیت {observation.vegetation_health_class} قرار دارد."
)
def build_ndvi_health_card(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
context = context or {}
sensor = context.get("sensor")
if sensor is None:
return {"ndviIndex": 0, "healthData": []}
location = context.get("location")
if location is None:
return {
"mean_ndvi": None,
"ndvi_map": {},
"vegetation_health_class": None,
"observation_date": None,
"satellite_source": None,
"healthData": [],
}
nitrogen = safe_number(sensor.nitrogen, 0)
moisture = safe_number(sensor.soil_moisture, 0)
ndvi = round(min(0.95, max(0.1, ((nitrogen / 100) * 0.4) + ((moisture / 100) * 0.6))), 2)
observation = fetch_or_get_ndvi_observation(location)
if observation is None:
return {
"mean_ndvi": None,
"ndvi_map": {},
"vegetation_health_class": "Unavailable",
"observation_date": None,
"satellite_source": None,
"healthData": [
{
"title": "وضعیت NDVI",
"value": "داده ماهواره‌ای موجود نیست",
"color": "warning",
"icon": "tabler-satellite-off",
},
],
}
mean_value = round(observation.mean_ndvi, 2)
vegetation_class = observation.vegetation_health_class
return {
"ndviIndex": ndvi,
"ndviIndex": mean_value,
"mean_ndvi": mean_value,
"ndvi_map": observation.ndvi_map,
"vegetation_health_class": vegetation_class,
"observation_date": observation.observation_date.isoformat(),
"satellite_source": observation.satellite_source,
"healthData": [
{
"title": "تنش نیتروژن",
"value": "پایین" if nitrogen >= 30 else "بالا",
"color": "success" if nitrogen >= 30 else "warning",
"icon": "tabler-leaf",
"title": "سلامت پوشش گیاهی",
"value": vegetation_class,
"color": "success" if mean_value > 0.6 else "warning" if mean_value >= 0.4 else "error",
"icon": "tabler-plant",
},
{
"title": "سلامت محصول",
"value": "خوب" if ndvi >= 0.65 else "متوسط",
"color": "success" if ndvi >= 0.65 else "warning",
"icon": "tabler-plant",
"title": "تاریخ مشاهده",
"value": observation.observation_date.isoformat(),
"color": "info",
"icon": "tabler-calendar",
},
{
"title": "تفسیر",
"value": _ndvi_explanation(observation, ai_bundle=ai_bundle),
"color": "primary",
"icon": "tabler-message-2",
},
],
}
+4 -1
View File
@@ -1,3 +1,6 @@
def build_recommendations_list(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
ai_bundle = ai_bundle or {}
return {"recommendations": ai_bundle.get("recommendations", [])}
return {
"recommendations": ai_bundle.get("recommendations", []),
"structuredContext": ai_bundle.get("structured_context", {}),
}
+107 -17
View File
@@ -1,35 +1,125 @@
from datetime import date, timedelta
from __future__ import annotations
from dashboard_data.card_utils import PERSIAN_WEEKDAYS, safe_number
from datetime import date, timedelta
from typing import Any
from dashboard_data.card_utils import PERSIAN_WEEKDAYS
INTERPOLATION_LIMIT = 3
QUALITY_REAL = "REAL"
QUALITY_INTERPOLATED = "INTERPOLATED"
QUALITY_MISSING = "MISSING"
def _day_categories() -> list[date]:
return [date.today() - timedelta(days=offset) for offset in range(6, -1, -1)]
def _day_label(day: date) -> str:
return PERSIAN_WEEKDAYS[day.weekday()]
def _history_value_map(history: list[Any], field_name: str) -> dict[date, float | None]:
value_map: dict[date, float | None] = {}
for item in history:
timestamp = getattr(item, "recorded_at", None)
if timestamp is None:
continue
day = timestamp.date()
if day in value_map:
continue
value = getattr(item, field_name, None)
value_map[day] = float(value) if value is not None else None
return value_map
def _apply_linear_interpolation(points: list[dict[str, Any]], limit: int = INTERPOLATION_LIMIT) -> list[dict[str, Any]]:
output = [dict(point) for point in points]
known_indexes = [index for index, point in enumerate(output) if point["value"] is not None]
for start_index, end_index in zip(known_indexes, known_indexes[1:]):
gap = end_index - start_index - 1
if gap <= 0 or gap > limit:
continue
start_value = output[start_index]["value"]
end_value = output[end_index]["value"]
if start_value is None or end_value is None:
continue
step = (end_value - start_value) / (gap + 1)
for offset in range(1, gap + 1):
target_index = start_index + offset
output[target_index]["value"] = round(start_value + (step * offset), 2)
output[target_index]["quality_flag"] = QUALITY_INTERPOLATED
return output
def _build_week_points(
history: list[Any],
field_name: str,
day_offset_start: int,
) -> list[dict[str, Any]]:
days = [date.today() - timedelta(days=offset) for offset in range(day_offset_start + 6, day_offset_start - 1, -1)]
value_map = _history_value_map(history, field_name)
raw_points = [
{
"timestamp": day.isoformat(),
"value": round(value_map[day], 2) if day in value_map and value_map[day] is not None else None,
"quality_flag": QUALITY_REAL if day in value_map and value_map[day] is not None else QUALITY_MISSING,
}
for day in days
]
return _apply_linear_interpolation(raw_points)
def _average_known(points: list[dict[str, Any]]) -> float | None:
values = [point["value"] for point in points if point["value"] is not None]
if not values:
return None
return sum(values) / len(values)
def build_sensor_comparison_chart(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
history = (context or {}).get("history", [])
current_sensor = (context or {}).get("sensor")
current_value = round(safe_number(getattr(current_sensor, "soil_moisture", None), 0))
current_value = getattr(current_sensor, "soil_moisture", None) if current_sensor is not None else None
current_value = round(float(current_value), 2) if current_value is not None else None
recent = list(reversed(history[:7]))
previous = list(reversed(history[7:14]))
this_week = [round(safe_number(item.soil_moisture, current_value)) for item in recent]
last_week = [round(safe_number(item.soil_moisture, current_value - 5)) for item in previous]
this_week_points = _build_week_points(history, "soil_moisture", day_offset_start=0)
last_week_points = _build_week_points(history, "soil_moisture", day_offset_start=7)
while len(this_week) < 7:
this_week.append(current_value)
while len(last_week) < 7:
last_week.append(max(0, current_value - 5))
this_week_avg = _average_known(this_week_points)
last_week_avg = _average_known(last_week_points)
delta = 0
if this_week_avg is not None and last_week_avg not in (None, 0):
delta = round(((this_week_avg - last_week_avg) / last_week_avg) * 100)
categories = [PERSIAN_WEEKDAYS[(date.today() - timedelta(days=offset)).weekday()] for offset in range(6, -1, -1)]
avg_this = sum(this_week) / len(this_week)
avg_last = sum(last_week) / len(last_week)
delta = round(((avg_this - avg_last) / avg_last) * 100) if avg_last else 0
categories = [_day_label(day) for day in _day_categories()]
return {
"currentValue": current_value,
"currentValueQuality": QUALITY_REAL if current_value is not None else QUALITY_MISSING,
"vsLastWeek": f"{'+' if delta >= 0 else ''}{delta}%",
"vsLastWeekValue": delta,
"categories": categories,
"series": [
{"name": "امروز", "data": this_week},
{"name": "هفته قبل", "data": last_week},
{
"name": "هفته جاری",
"data": [point["value"] for point in this_week_points],
"points": this_week_points,
},
{
"name": "هفته قبل",
"data": [point["value"] for point in last_week_points],
"points": last_week_points,
},
],
"qualityLegend": {
QUALITY_REAL: "اندازه‌گیری واقعی سنسور",
QUALITY_INTERPOLATED: "برآورد خطی برای شکاف کوتاه داده",
QUALITY_MISSING: "داده معتبر در دسترس نیست",
},
}
+122 -19
View File
@@ -1,14 +1,92 @@
from dashboard_data.card_utils import safe_number
from __future__ import annotations
from typing import Any
from dashboard_data.card_utils import average, safe_number
def _to_score(value, lower, upper):
if value is None:
DEFAULT_IDEAL_SENSOR_PROFILE = {
"temperature": {"ideal": 24.0, "min": 18.0, "max": 30.0},
"moisture": {"ideal": 65.0, "min": 45.0, "max": 80.0},
"ph": {"ideal": 6.5, "min": 6.0, "max": 7.2},
"ec": {"ideal": 1.4, "min": 1.0, "max": 2.0},
"humidity": {"ideal": 60.0, "min": 45.0, "max": 75.0},
}
METRIC_ORDER = [
("temperature", "دما"),
("moisture", "رطوبت"),
("ph", "pH"),
("ec", "هدایت الکتریکی"),
("humidity", "رطوبت هوا"),
]
def _normalize_to_ideal_score(value: float | None, minimum: float, ideal: float, maximum: float) -> int:
if value is None or maximum <= minimum:
return 0
if value <= lower:
if value <= minimum or value >= maximum:
return 0
if value >= upper:
if value == ideal:
return 100
return round(((value - lower) / (upper - lower)) * 100)
if value < ideal:
span = ideal - minimum
if span <= 0:
return 0
return round(max(0.0, min(1.0, (value - minimum) / span)) * 100)
span = maximum - ideal
if span <= 0:
return 0
return round(max(0.0, min(1.0, (maximum - value) / span)) * 100)
def _resolve_profile(context: dict[str, Any]) -> tuple[dict[str, dict[str, float]], str]:
location = context.get("location")
if location is not None:
location_profile = getattr(location, "ideal_sensor_profile", None) or {}
if location_profile:
merged = {
metric: {**DEFAULT_IDEAL_SENSOR_PROFILE.get(metric, {}), **location_profile.get(metric, {})}
for metric in set(DEFAULT_IDEAL_SENSOR_PROFILE) | set(location_profile)
}
return merged, "location"
plants = context.get("plants", [])
for plant in plants:
plant_profile = getattr(plant, "health_profile", None) or {}
if plant_profile:
translated: dict[str, dict[str, float]] = {}
for metric in DEFAULT_IDEAL_SENSOR_PROFILE:
metric_data = plant_profile.get(metric)
if not metric_data:
continue
translated[metric] = {
"ideal": float(metric_data.get("ideal_value", DEFAULT_IDEAL_SENSOR_PROFILE[metric]["ideal"])),
"min": float(metric_data.get("min_range", DEFAULT_IDEAL_SENSOR_PROFILE[metric]["min"])),
"max": float(metric_data.get("max_range", DEFAULT_IDEAL_SENSOR_PROFILE[metric]["max"])),
}
merged = {
metric: {**DEFAULT_IDEAL_SENSOR_PROFILE.get(metric, {}), **translated.get(metric, {})}
for metric in DEFAULT_IDEAL_SENSOR_PROFILE
}
return merged, f"plant:{getattr(plant, 'name', 'unknown')}"
return DEFAULT_IDEAL_SENSOR_PROFILE, "default"
def _current_metric_values(sensor: Any, forecasts: list[Any]) -> dict[str, float]:
current_forecast = forecasts[0] if forecasts else None
humidity = average(
[getattr(forecast, "humidity_mean", None) for forecast in forecasts[:3]],
default=safe_number(getattr(current_forecast, "humidity_mean", None), 0),
)
return {
"temperature": float(safe_number(getattr(sensor, "soil_temperature", None), 0)),
"moisture": float(safe_number(getattr(sensor, "soil_moisture", None), 0)),
"ph": float(safe_number(getattr(sensor, "soil_ph", None), 0)),
"ec": float(safe_number(getattr(sensor, "electrical_conductivity", None), 0)),
"humidity": float(safe_number(humidity, 0)),
}
def build_sensor_radar_chart(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
@@ -16,22 +94,47 @@ def build_sensor_radar_chart(sensor_id: str, context: dict | None = None, ai_bun
sensor = context.get("sensor")
forecasts = context.get("forecasts", [])
if sensor is None:
return {"labels": [], "series": []}
return {"labels": [], "series": [], "profileSource": None, "profile": {}}
current_weather = forecasts[0] if forecasts else None
current = [
_to_score(sensor.soil_temperature, 0, 40),
_to_score(sensor.soil_moisture, 0, 100),
_to_score(sensor.soil_ph, 0, 14),
_to_score(sensor.electrical_conductivity, 0, 5),
85,
_to_score(current_weather.wind_speed_max if current_weather else 0, 0, 30),
]
profile, profile_source = _resolve_profile(context)
current_values = _current_metric_values(sensor, forecasts)
labels: list[str] = []
current_series: list[int] = []
ideal_series: list[int] = []
metric_details: list[dict[str, Any]] = []
for metric_key, label in METRIC_ORDER:
metric_profile = profile.get(metric_key, DEFAULT_IDEAL_SENSOR_PROFILE.get(metric_key, {}))
minimum = float(metric_profile.get("min", DEFAULT_IDEAL_SENSOR_PROFILE[metric_key]["min"]))
ideal = float(metric_profile.get("ideal", DEFAULT_IDEAL_SENSOR_PROFILE[metric_key]["ideal"]))
maximum = float(metric_profile.get("max", DEFAULT_IDEAL_SENSOR_PROFILE[metric_key]["max"]))
current_value = current_values.get(metric_key)
labels.append(label)
current_score = _normalize_to_ideal_score(current_value, minimum, ideal, maximum)
current_series.append(current_score)
ideal_series.append(100)
metric_details.append(
{
"metricType": metric_key,
"label": label,
"currentValue": round(current_value, 2) if current_value is not None else None,
"idealValue": round(ideal, 2),
"minRange": round(minimum, 2),
"maxRange": round(maximum, 2),
"currentScore": current_score,
"idealScore": 100,
}
)
return {
"labels": ["دما", "رطوبت", "pH", "هدایت الکتریکی", "نور", "باد"],
"labels": labels,
"profileSource": profile_source,
"profile": profile,
"metricDetails": metric_details,
"series": [
{"name": "امروز", "data": current},
{"name": "ایده‌آل", "data": [80, 70, 75, 75, 90, 50]},
{"name": "امروز", "data": current_series},
{"name": "پروفایل ایده‌آل", "data": ideal_series},
],
}
+191 -22
View File
@@ -1,32 +1,201 @@
from dashboard_data.card_utils import safe_number
from __future__ import annotations
from math import sqrt
from typing import Any
from sensor_data.models import SensorData, SensorDataHistory
QUALITY_REAL = "REAL"
QUALITY_INTERPOLATED = "INTERPOLATED"
QUALITY_MISSING = "MISSING"
INTERPOLATION_LIMIT = 3
IDW_POWER = 2
MAX_GRID_STEPS = 10
def _interpolate_series(points: list[dict[str, Any]], limit: int = INTERPOLATION_LIMIT) -> list[dict[str, Any]]:
output = [dict(point) for point in points]
known_indexes = [index for index, point in enumerate(output) if point["value"] is not None]
for start_index, end_index in zip(known_indexes, known_indexes[1:]):
gap = end_index - start_index - 1
if gap <= 0 or gap > limit:
continue
start_value = output[start_index]["value"]
end_value = output[end_index]["value"]
if start_value is None or end_value is None:
continue
step = (end_value - start_value) / (gap + 1)
for offset in range(1, gap + 1):
target_index = start_index + offset
output[target_index]["value"] = round(start_value + (step * offset), 2)
output[target_index]["quality_flag"] = QUALITY_INTERPOLATED
return output
def _sensor_time_series(sensor: Any, histories: list[Any]) -> list[dict[str, Any]]:
points = []
for item in reversed(histories):
points.append(
{
"timestamp": item.recorded_at.isoformat(),
"value": float(item.soil_moisture) if item.soil_moisture is not None else None,
"quality_flag": QUALITY_REAL if item.soil_moisture is not None else QUALITY_MISSING,
}
)
points.append(
{
"timestamp": sensor.updated_at.isoformat() if getattr(sensor, "updated_at", None) else None,
"value": float(sensor.soil_moisture) if getattr(sensor, "soil_moisture", None) is not None else None,
"quality_flag": QUALITY_REAL if getattr(sensor, "soil_moisture", None) is not None else QUALITY_MISSING,
}
)
return _interpolate_series(points)
def _latest_sensor_measurement(sensor: Any, histories: list[Any]) -> dict[str, Any]:
series = _sensor_time_series(sensor, histories)
latest = series[-1] if series else {"timestamp": None, "value": None, "quality_flag": QUALITY_MISSING}
return {
"sensor_id": str(sensor.uuid_sensor),
"latitude": float(sensor.location.latitude),
"longitude": float(sensor.location.longitude),
"depth": None,
"timestamp": latest["timestamp"],
"soil_moisture_value": latest["value"],
"quality_flag": latest["quality_flag"],
}
def _idw_value(lat: float, lon: float, sensor_points: list[dict[str, Any]]) -> float | None:
weighted_sum = 0.0
weight_total = 0.0
for point in sensor_points:
value = point["soil_moisture_value"]
if value is None:
continue
distance = sqrt(((lat - point["latitude"]) ** 2) + ((lon - point["longitude"]) ** 2))
if distance == 0:
return round(float(value), 2)
weight = 1 / (distance**IDW_POWER)
weighted_sum += weight * float(value)
weight_total += weight
if weight_total == 0:
return None
return round(weighted_sum / weight_total, 2)
def _grid_axis(min_value: float, max_value: float) -> list[float]:
if min_value == max_value:
return [round(min_value, 6)]
step_count = min(MAX_GRID_STEPS, max(int((max_value - min_value) / 0.0001) + 1, 2))
step = (max_value - min_value) / (step_count - 1)
return [round(min_value + (step * index), 6) for index in range(step_count)]
def _load_sensor_network(current_sensor: Any) -> list[Any]:
plant_ids = list(current_sensor.plants.values_list("id", flat=True))
queryset = SensorData.objects.select_related("location").prefetch_related("plants")
if plant_ids:
queryset = queryset.filter(plants__id__in=plant_ids).distinct()
return list(queryset)
def build_soil_moisture_heatmap(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
context = context or {}
sensor = context.get("sensor")
depths = context.get("depths", [])
if sensor is None:
return {"zones": [], "hours": [], "series": []}
current_sensor = context.get("sensor")
if current_sensor is None:
return {
"timestamp": None,
"grid_resolution": None,
"grid_cells": [],
"sensor_points": [],
"quality_legend": {},
}
hours = ["۶ ص", "۸ ص", "۱۰ ص", "۱۲ ظ", "۱۴ ع", "۱۶ ع", "۱۸ ع"]
base_moisture = safe_number(sensor.soil_moisture, 0)
series = []
zones = []
sensors = _load_sensor_network(current_sensor)
sensor_ids = [sensor.uuid_sensor for sensor in sensors]
history_rows = SensorDataHistory.objects.filter(uuid_sensor__in=sensor_ids).order_by("-recorded_at")[:200]
history_map: dict[Any, list[Any]] = {}
for row in history_rows:
history_map.setdefault(row.uuid_sensor, []).append(row)
if not depths:
depths = [None, None]
sensor_points = [
_latest_sensor_measurement(sensor, history_map.get(sensor.uuid_sensor, []))
for sensor in sensors
]
valid_sensor_points = [point for point in sensor_points if point["soil_moisture_value"] is not None]
for index, depth in enumerate(depths[:7], start=1):
zones.append(f"زون {index}")
depth_offset = 0 if depth is None else round(safe_number(getattr(depth, "wv0033", None), 0) / 10)
data = []
for hour_index, hour in enumerate(hours):
value = max(0, min(100, round(base_moisture + depth_offset - abs(3 - hour_index) * 2)))
data.append({"x": hour, "y": value})
series.append({"name": f"زون {index}", "data": data})
if not valid_sensor_points:
return {
"timestamp": current_sensor.updated_at.isoformat() if getattr(current_sensor, "updated_at", None) else None,
"grid_resolution": None,
"grid_cells": [],
"sensor_points": sensor_points,
"quality_legend": {
QUALITY_REAL: "اندازه‌گیری واقعی سنسور",
QUALITY_INTERPOLATED: "مقدار سنسور با درون‌یابی زمانی کوتاه‌مدت",
QUALITY_MISSING: "داده معتبر برای سنسور موجود نیست",
},
}
min_lat = min(point["latitude"] for point in valid_sensor_points)
max_lat = max(point["latitude"] for point in valid_sensor_points)
min_lon = min(point["longitude"] for point in valid_sensor_points)
max_lon = max(point["longitude"] for point in valid_sensor_points)
lat_axis = _grid_axis(min_lat, max_lat)
lon_axis = _grid_axis(min_lon, max_lon)
grid_cells = []
for lat in lat_axis:
for lon in lon_axis:
direct_sensor = next(
(
point for point in valid_sensor_points
if point["latitude"] == lat and point["longitude"] == lon
),
None,
)
if direct_sensor is not None:
moisture_value = direct_sensor["soil_moisture_value"]
quality_flag = direct_sensor["quality_flag"]
elif len(valid_sensor_points) >= 2:
moisture_value = _idw_value(lat, lon, valid_sensor_points)
quality_flag = QUALITY_INTERPOLATED if moisture_value is not None else QUALITY_MISSING
else:
moisture_value = None
quality_flag = QUALITY_MISSING
grid_cells.append(
{
"lat": lat,
"lon": lon,
"moisture_value": moisture_value,
"quality_flag": quality_flag,
}
)
lat_step = round(abs(lat_axis[1] - lat_axis[0]), 6) if len(lat_axis) > 1 else 0.0
lon_step = round(abs(lon_axis[1] - lon_axis[0]), 6) if len(lon_axis) > 1 else 0.0
return {
"zones": zones,
"hours": hours,
"series": series,
"timestamp": max(point["timestamp"] for point in sensor_points if point["timestamp"]),
"grid_resolution": {
"lat_step": lat_step,
"lon_step": lon_step,
"rows": len(lat_axis),
"cols": len(lon_axis),
},
"grid_cells": grid_cells,
"sensor_points": sensor_points,
"quality_legend": {
QUALITY_REAL: "اندازه‌گیری واقعی سنسور",
QUALITY_INTERPOLATED: "مقدار برآوردشده با درون‌یابی زمانی/فضایی",
QUALITY_MISSING: "داده معتبر در دسترس نیست",
},
}
+32 -12
View File
@@ -1,18 +1,38 @@
from dashboard_data.card_utils import safe_number
from irrigation.evapotranspiration import calculate_forecast_water_needs, resolve_crop_profile
def build_water_need_prediction(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
forecasts = (context or {}).get("forecasts", [])
daily_needs = []
for forecast in forecasts[:7]:
et0 = safe_number(forecast.et0, 4)
rain = safe_number(forecast.precipitation, 0)
need = max(0, round((et0 * 100) - (rain * 20)))
daily_needs.append(need)
context = context or {}
forecasts = context.get("forecasts", [])
location = context.get("location")
plants = context.get("plants", [])
irrigation_methods = context.get("irrigation_methods", [])
if not forecasts or location is None:
return {
"totalNext7Days": 0,
"unit": "mm",
"categories": [],
"series": [],
"dailyBreakdown": [],
}
plant = plants[0] if plants else None
crop_profile = resolve_crop_profile(plant)
efficiency = getattr(irrigation_methods[0], "water_efficiency_percent", None) if irrigation_methods else None
daily = calculate_forecast_water_needs(
forecasts=forecasts[:7],
latitude_deg=float(location.latitude),
crop_profile=crop_profile,
growth_stage=crop_profile.get("current_stage"),
irrigation_efficiency_percent=efficiency,
)
daily_requirements = [round(item["gross_irrigation_mm"], 2) for item in daily]
return {
"totalNext7Days": sum(daily_needs),
"unit": "",
"categories": [f"روز {index}" for index in range(1, len(daily_needs) + 1)],
"series": [{"name": "نیاز آبی", "data": daily_needs}],
"totalNext7Days": round(sum(daily_requirements), 2),
"unit": "mm",
"categories": [f"روز {index}" for index in range(1, len(daily_requirements) + 1)],
"series": [{"name": "نیاز آبی تعدیل‌شده", "data": daily_requirements}],
"dailyBreakdown": daily,
}