Files
Backend/dashboard/services.py
T
2026-04-11 03:54:15 +03:30

124 lines
4.6 KiB
Python

from copy import deepcopy
from water.services import (
get_farm_weather_card_data,
get_water_need_prediction_data,
get_water_stress_index_data,
)
from crop_health.services import get_crop_health_summary_data
from economic_overview.services import get_economic_overview_data
from farm_alerts.services import (
get_alert_timeline_data,
get_alert_tracker_data,
get_recommendations_list_data,
)
from fertilization_recommendation.services import get_fertilization_dashboard_recommendation
from irrigation_recommendation.services import get_irrigation_dashboard_recommendation
from pest_detection.services import get_risk_summary_data
from sensor_7_in_1.services import (
get_sensor_7_in_1_summary_data,
)
from yield_harvest.services import get_yield_harvest_summary_data
from .mock_data import ALL_CARDS
def _update_kpi(card_lookup, card_data):
if not card_data:
return
card_id = card_data.get("id")
if not card_id or card_id not in card_lookup:
return
details = card_data.get("details")
clean_data = {key: value for key, value in card_data.items() if key != "details"}
card_lookup[card_id].update(clean_data)
if details is not None:
card_lookup[card_id]["details"] = details
def _build_overview_kpis(base_cards, crop_health_summary, water_stress_index, avg_soil_moisture, risk_summary, yield_summary):
kpis = [crop_health_summary["farmHealthScore"], water_stress_index, avg_soil_moisture, *deepcopy(base_cards["kpis"])]
card_lookup = {item["id"]: item for item in kpis}
_update_kpi(card_lookup, water_stress_index)
_update_kpi(card_lookup, avg_soil_moisture)
_update_kpi(card_lookup, risk_summary.get("disease_risk", {}))
_update_kpi(card_lookup, risk_summary.get("pest_risk", {}))
_update_kpi(card_lookup, yield_summary.get("yield_prediction_card", {}))
return {"kpis": kpis}
def _build_recommendations_list(farm, fallback_data, harvest_card):
recommendations = []
recommendations.extend(get_recommendations_list_data(farm).get("recommendations", []))
recommendations.append(get_irrigation_dashboard_recommendation(farm))
recommendations.append(get_fertilization_dashboard_recommendation(farm))
if harvest_card:
recommendations.append(
{
"title": f"بازه برداشت: {harvest_card.get('optimalWindowStart', '')} تا {harvest_card.get('optimalWindowEnd', '')}",
"subtitle": harvest_card.get("description", ""),
"avatarIcon": "tabler-calendar-event",
"avatarColor": "info",
}
)
deduped = []
seen_titles = set()
for item in recommendations:
title = item.get("title")
if not title or title in seen_titles:
continue
seen_titles.add(title)
deduped.append(item)
if deduped:
return {"recommendations": deduped[:4]}
return deepcopy(fallback_data)
def get_farm_dashboard_cards(farm):
cards = deepcopy(ALL_CARDS)
weather_card = get_farm_weather_card_data(farm)
crop_health_summary = get_crop_health_summary_data(farm)
risk_summary = get_risk_summary_data(farm)
yield_summary = get_yield_harvest_summary_data(farm)
water_stress_index = get_water_stress_index_data(farm)
sensor_summary = get_sensor_7_in_1_summary_data(farm)
avg_soil_moisture = sensor_summary["avgSoilMoisture"]
cards["farmWeatherCard"] = weather_card
cards["farmAlertsTracker"] = get_alert_tracker_data(farm)
cards["farmAlertsTimeline"] = get_alert_timeline_data(farm)
cards["sensorValuesList"] = sensor_summary["sensorValuesList"]
cards["anomalyDetectionCard"] = sensor_summary["anomalyDetectionCard"]
cards["waterNeedPrediction"] = get_water_need_prediction_data(farm)
cards["harvestPredictionCard"] = yield_summary["harvest_prediction_card"]
cards["yieldPredictionChart"] = yield_summary["yield_prediction_chart"]
cards["sensorRadarChart"] = sensor_summary["sensorRadarChart"]
cards["sensorComparisonChart"] = sensor_summary["sensorComparisonChart"]
cards["soilMoistureHeatmap"] = sensor_summary["soilMoistureHeatmap"]
cards["ndviHealthCard"] = crop_health_summary["ndviHealthCard"]
cards["economicOverview"] = get_economic_overview_data(farm)
cards["farmOverviewKpis"] = _build_overview_kpis(
cards["farmOverviewKpis"],
crop_health_summary,
water_stress_index,
avg_soil_moisture,
risk_summary,
yield_summary,
)
cards["recommendationsList"] = _build_recommendations_list(
farm,
cards["recommendationsList"],
yield_summary.get("harvest_prediction_card", {}),
)
return cards