This commit is contained in:
2026-04-25 17:22:41 +03:30
parent 569d520a5c
commit aa24fc22b0
124 changed files with 8491 additions and 2582 deletions
+1
View File
@@ -0,0 +1 @@
+262
View File
@@ -0,0 +1,262 @@
from __future__ import annotations
from math import sqrt
from statistics import mean
from typing import Any
METRIC_CONFIG = {
"soil_moisture": {
"label": "رطوبت خاک",
"unit": "%",
"source": "history",
"current_field": "soil_moisture",
},
"soil_temperature": {
"label": "دمای خاک",
"unit": "°C",
"source": "history",
"current_field": "soil_temperature",
},
"humidity": {
"label": "رطوبت هوا",
"unit": "%",
"source": "forecast",
"forecast_field": "humidity_mean",
},
"soil_ph": {
"label": "pH خاک",
"unit": "pH",
"source": "history",
"current_field": "soil_ph",
},
"electrical_conductivity": {
"label": "هدایت الکتریکی",
"unit": "dS/m",
"source": "history",
"current_field": "electrical_conductivity",
},
}
METHOD_PRIORITY = {"IQR": 2, "Z_SCORE": 1}
def _percentile(sorted_values: list[float], percentile: float) -> float:
if not sorted_values:
return 0.0
if len(sorted_values) == 1:
return sorted_values[0]
index = (len(sorted_values) - 1) * percentile
lower = int(index)
upper = min(lower + 1, len(sorted_values) - 1)
fraction = index - lower
return sorted_values[lower] + ((sorted_values[upper] - sorted_values[lower]) * fraction)
def _population_std(values: list[float]) -> float:
if len(values) < 2:
return 0.0
center = mean(values)
variance = sum((value - center) ** 2 for value in values) / len(values)
return sqrt(variance)
def _severity_from_score(score: float) -> str:
absolute = abs(score)
if absolute >= 3.5:
return "critical"
if absolute >= 2.5:
return "high"
if absolute >= 1.5:
return "medium"
return "low"
def _history_series(history: list[Any], field_name: str) -> tuple[list[float], str | None, float | None]:
values: list[float] = []
latest_timestamp = None
latest_value = None
for item in history:
value = getattr(item, field_name, None)
if value is None:
continue
numeric = float(value)
values.append(numeric)
if latest_timestamp is None:
recorded_at = getattr(item, "recorded_at", None)
latest_timestamp = recorded_at.isoformat() if recorded_at is not None else None
latest_value = numeric
return list(reversed(values)), latest_timestamp, latest_value
def _forecast_series(forecasts: list[Any], field_name: str) -> tuple[list[float], str | None, float | None]:
values: list[float] = []
latest_timestamp = None
latest_value = None
for forecast in forecasts[:7]:
value = getattr(forecast, field_name, None)
if value is None:
continue
numeric = float(value)
values.append(numeric)
if latest_timestamp is None:
forecast_date = getattr(forecast, "forecast_date", None)
latest_timestamp = forecast_date.isoformat() if forecast_date is not None else None
latest_value = numeric
return values, latest_timestamp, latest_value
def _detect_with_z_score(values: list[float], observed_value: float) -> dict[str, Any] | None:
if len(values) < 5:
return None
center = mean(values)
std = _population_std(values)
if std == 0:
return None
score = (observed_value - center) / std
if abs(score) < 2.0:
return None
return {
"anomaly_method": "Z_SCORE",
"deviation_score": round(score, 3),
"expected_range": [round(center - (2 * std), 2), round(center + (2 * std), 2)],
"severity": _severity_from_score(score),
}
def _detect_with_iqr(values: list[float], observed_value: float) -> dict[str, Any] | None:
if len(values) < 5:
return None
sorted_values = sorted(values)
q1 = _percentile(sorted_values, 0.25)
q3 = _percentile(sorted_values, 0.75)
iqr = q3 - q1
if iqr == 0:
return None
lower = q1 - (1.5 * iqr)
upper = q3 + (1.5 * iqr)
if lower <= observed_value <= upper:
return None
if observed_value < lower:
score = (observed_value - lower) / iqr
else:
score = (observed_value - upper) / iqr
return {
"anomaly_method": "IQR",
"deviation_score": round(score, 3),
"expected_range": [round(lower, 2), round(upper, 2)],
"severity": _severity_from_score(score),
}
def _select_detection_result(results: list[dict[str, Any]]) -> dict[str, Any] | None:
if not results:
return None
return sorted(
results,
key=lambda item: (METHOD_PRIORITY[item["anomaly_method"]], abs(item["deviation_score"])),
reverse=True,
)[0]
def _build_contextual_interpretation(anomalies: list[dict[str, Any]], ai_bundle: dict | None = None) -> dict[str, Any]:
ai_bundle = ai_bundle or {}
ai_payload = ai_bundle.get("anomalyDetectionCard", {}) if isinstance(ai_bundle, dict) else {}
if isinstance(ai_payload, dict) and all(ai_payload.get(key) for key in ("explanation", "likely_cause", "recommended_action")):
return {
"explanation": ai_payload["explanation"],
"likely_cause": ai_payload["likely_cause"],
"recommended_action": ai_payload["recommended_action"],
}
metric_types = {item["metric_type"] for item in anomalies}
if {"soil_temperature", "soil_moisture"} <= metric_types:
return {
"explanation": "هم‌زمانی ناهنجاری دمای خاک و رطوبت خاک نشان می‌دهد تنش ترکیبی در ناحیه ریشه در حال شکل‌گیری است.",
"likely_cause": "احتمالاً الگوی آبیاری، موج گرما یا افت ناگهانی ظرفیت نگهداشت رطوبت خاک عامل اصلی است.",
"recommended_action": "زمان‌بندی آبیاری و وضعیت زهکشی/تبخیر بررسی و قرائت‌های سنسور در ۲۴ ساعت آینده دوباره پایش شود.",
}
if "electrical_conductivity" in metric_types and "soil_moisture" in metric_types:
return {
"explanation": "هم‌زمانی ناهنجاری EC و رطوبت می‌تواند نشان‌دهنده فشار شوری یا تجمع نمک در بستر باشد.",
"likely_cause": "کیفیت آب آبیاری، کوددهی اخیر یا کاهش شست‌وشوی خاک می‌تواند عامل این الگو باشد.",
"recommended_action": "EC آب و برنامه کوددهی بازبینی و در صورت نیاز شست‌وشوی کنترل‌شده خاک بررسی شود.",
}
if anomalies:
top = anomalies[0]
return {
"explanation": f"در شاخص {top['label']} یک ناهنجاری آماری با روش {top['anomaly_method']} شناسایی شده است.",
"likely_cause": "این رخداد می‌تواند ناشی از تغییر ناگهانی شرایط محیطی، خطای فرایندی یا نیاز به کالیبراسیون سنسور باشد.",
"recommended_action": "روند همان شاخص و داده‌های پیرامونی بازبینی و در صورت تداوم، اقدام اصلاحی مزرعه اجرا شود.",
}
return {
"explanation": "ناهنجاری آماری معناداری در داده‌های اخیر شناسایی نشد.",
"likely_cause": "داده‌های فعلی با الگوی تاریخی سازگار هستند.",
"recommended_action": "پایش عادی ادامه یابد.",
}
def build_anomaly_detection_card(sensor_id: str, context: dict | None = None, ai_bundle: dict | None = None) -> dict:
context = context or {}
sensor = context.get("sensor")
history = context.get("history", [])
forecasts = context.get("forecasts", [])
if sensor is None:
return {"anomalies": [], "interpretation": None}
anomalies: list[dict[str, Any]] = []
for metric_type, config in METRIC_CONFIG.items():
if config["source"] == "history":
values, timestamp, observed_value = _history_series(history, config["current_field"])
current_value = getattr(sensor, config["current_field"], None)
if current_value is not None:
observed_value = float(current_value)
timestamp = getattr(sensor, "updated_at", None)
timestamp = timestamp.isoformat() if timestamp is not None else timestamp
else:
values, timestamp, observed_value = _forecast_series(forecasts, config["forecast_field"])
if observed_value is None or len(values) < 5:
continue
detection = _select_detection_result(
[
result
for result in (
_detect_with_z_score(values, observed_value),
_detect_with_iqr(values, observed_value),
)
if result is not None
]
)
if detection is None:
continue
anomalies.append(
{
"metric_type": metric_type,
"label": config["label"],
"timestamp": timestamp,
"observed_value": round(observed_value, 2),
"expected_range": detection["expected_range"],
"deviation_score": detection["deviation_score"],
"anomaly_method": detection["anomaly_method"],
"severity": detection["severity"],
"unit": config["unit"],
}
)
anomalies.sort(key=lambda item: abs(item["deviation_score"]), reverse=True)
interpretation = _build_contextual_interpretation(anomalies, ai_bundle=ai_bundle)
return {
"anomalies": anomalies,
"interpretation": interpretation,
}
+36
View File
@@ -0,0 +1,36 @@
from functools import cached_property
from django.apps import AppConfig
class SoileConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "soile"
verbose_name = "Soile"
@cached_property
def soil_moisture_service(self):
from .services import SoilMoistureHeatmapService
return SoilMoistureHeatmapService()
def get_soil_moisture_service(self):
return self.soil_moisture_service
@cached_property
def soil_health_service(self):
from .services import SoilHealthService
return SoilHealthService()
def get_soil_health_service(self):
return self.soil_health_service
@cached_property
def soil_anomaly_service(self):
from .services import SoilAnomalyDetectionService
return SoilAnomalyDetectionService()
def get_soil_anomaly_service(self):
return self.soil_anomaly_service
+147
View File
@@ -0,0 +1,147 @@
from __future__ import annotations
from typing import Any
DEFAULT_HEALTH_PROFILE = {
"moisture": {"ideal_value": 65.0, "min_range": 45.0, "max_range": 75.0, "weight": 0.45},
"ph": {"ideal_value": 6.6, "min_range": 6.0, "max_range": 7.5, "weight": 0.30},
"ec": {"ideal_value": 1.2, "min_range": 0.2, "max_range": 3.0, "weight": 0.25},
}
METRIC_SPECS = {
"moisture": {"sensor_field": "soil_moisture", "label": "رطوبت خاک", "unit": "%"},
"ph": {"sensor_field": "soil_ph", "label": "pH خاک", "unit": "pH"},
"ec": {"sensor_field": "electrical_conductivity", "label": "هدایت الکتریکی", "unit": "dS/m"},
}
def _safe_number(value: Any, default: float = 0.0) -> float:
return default if value is None else float(value)
def _normalize_metric(value: float, ideal_value: float, min_range: float, max_range: float) -> float:
if max_range <= min_range:
return 0.0
if value <= min_range or value >= max_range:
return 0.0
if value == ideal_value:
return 1.0
if value < ideal_value:
span = ideal_value - min_range
if span <= 0:
return 0.0
return max(0.0, min(1.0, (value - min_range) / span))
span = max_range - ideal_value
if span <= 0:
return 0.0
return max(0.0, min(1.0, (max_range - value) / span))
def resolve_plant_profile(plants: list[Any]) -> tuple[dict[str, dict[str, float]], str]:
for plant in plants:
profile = getattr(plant, "health_profile", None) or {}
if profile:
merged = {
metric: {
**DEFAULT_HEALTH_PROFILE.get(metric, {}),
**profile.get(metric, {}),
}
for metric in set(DEFAULT_HEALTH_PROFILE) | set(profile)
}
return merged, getattr(plant, "name", "گیاه")
return DEFAULT_HEALTH_PROFILE, (plants[0].name if plants else "پروفایل پیش‌فرض")
def compute_health_score(sensor: Any, profile: dict[str, dict[str, float]]) -> tuple[int, list[dict[str, Any]]]:
weighted_sum = 0.0
total_weight = 0.0
components: list[dict[str, Any]] = []
for metric_type, config in profile.items():
spec = METRIC_SPECS.get(metric_type)
if spec is None:
continue
sensor_value = getattr(sensor, spec["sensor_field"], None)
if sensor_value is None:
continue
current_value = _safe_number(sensor_value, 0)
defaults = DEFAULT_HEALTH_PROFILE.get(metric_type, {})
ideal_value = float(config.get("ideal_value", defaults.get("ideal_value", 0)))
min_range = float(config.get("min_range", defaults.get("min_range", 0)))
max_range = float(config.get("max_range", defaults.get("max_range", 0)))
weight = float(config.get("weight", defaults.get("weight", 0)))
if weight <= 0:
continue
normalized_value = _normalize_metric(current_value, ideal_value, min_range, max_range)
weighted_sum += weight * normalized_value
total_weight += weight
components.append(
{
"metricType": metric_type,
"label": spec["label"],
"unit": spec["unit"],
"currentValue": round(current_value, 2),
"idealValue": round(ideal_value, 2),
"minRange": round(min_range, 2),
"maxRange": round(max_range, 2),
"weight": round(weight, 3),
"normalizedValue": round(normalized_value, 4),
"weightedContribution": round(weight * normalized_value, 4),
}
)
if total_weight <= 0:
return 0, components
score = round((weighted_sum / total_weight) * 100)
return max(0, min(100, score)), components
def health_language(health_score: int) -> dict[str, str]:
if health_score >= 85:
return {
"short_chip_text": "بسیار خوب",
"action_hint": "برنامه فعلی پایش و نگهداری حفظ شود.",
"explanation": "بیشتر شاخص های کلیدی نزدیک به پروفایل ایده آل گیاه هستند.",
}
if health_score >= 70:
return {
"short_chip_text": "پایدار",
"action_hint": "تنظیمات فعلی حفظ و فقط شاخص های مرزی پایش شوند.",
"explanation": "وضعیت کلی مزرعه قابل قبول است اما بعضی شاخص ها هنوز جای بهبود دارند.",
}
if health_score >= 50:
return {
"short_chip_text": "نیازمند تنظیم",
"action_hint": "پارامترهای دور از محدوده ایده آل در اولویت اصلاح قرار گیرند.",
"explanation": "بخشی از شرایط محیطی از پروفایل مطلوب گیاه فاصله گرفته است.",
}
return {
"short_chip_text": "تنش بالا",
"action_hint": "اصلاح فوری رطوبت، تغذیه يا شوری بر اساس اجزای امتیاز انجام شود.",
"explanation": "چند شاخص اصلی خارج از بازه قابل قبول گیاه هستند.",
}
def build_soil_health_summary(sensor: Any, plants: list[Any]) -> dict[str, Any]:
profile, profile_source = resolve_plant_profile(plants)
health_score, health_components = compute_health_score(sensor, profile)
moisture = _safe_number(getattr(sensor, "soil_moisture", None), 0)
language = health_language(health_score)
return {
"healthScore": health_score,
"profileSource": profile_source,
"healthScoreDetails": {
"method": "normalized_weighted_average",
"profileSource": profile_source,
"components": health_components,
},
"healthLanguage": language,
"avgSoilMoisture": round(moisture),
"avgSoilMoistureRaw": round(moisture, 2),
"avgSoilMoistureStatus": "بهینه" if 45 <= moisture <= 75 else "نیازمند بررسی",
}
+46
View File
@@ -0,0 +1,46 @@
from rest_framework import serializers
class SoilMoistureHeatmapRequestSerializer(serializers.Serializer):
farm_uuid = serializers.UUIDField(required=True, help_text="شناسه یکتای مزرعه")
class SoilMoistureHeatmapResponseSerializer(serializers.Serializer):
farm_uuid = serializers.CharField()
location = serializers.JSONField()
current_sensor = serializers.JSONField()
soil_profile = serializers.JSONField()
timestamp = serializers.CharField(allow_null=True)
grid_resolution = serializers.JSONField(allow_null=True)
grid_cells = serializers.JSONField()
sensor_points = serializers.JSONField()
quality_legend = serializers.JSONField()
class SoilAnomalyDetectionRequestSerializer(serializers.Serializer):
farm_uuid = serializers.UUIDField(required=True, help_text="شناسه یکتای مزرعه")
class SoilHealthSummaryRequestSerializer(serializers.Serializer):
farm_uuid = serializers.UUIDField(required=True, help_text="شناسه یکتای مزرعه")
class SoilHealthSummaryResponseSerializer(serializers.Serializer):
farm_uuid = serializers.CharField()
healthScore = serializers.IntegerField()
profileSource = serializers.CharField()
healthScoreDetails = serializers.JSONField()
healthLanguage = serializers.JSONField()
avgSoilMoisture = serializers.IntegerField()
avgSoilMoistureRaw = serializers.FloatField()
avgSoilMoistureStatus = serializers.CharField()
class SoilAnomalyDetectionResponseSerializer(serializers.Serializer):
farm_uuid = serializers.CharField()
generated_at = serializers.CharField()
anomalies = serializers.JSONField()
interpretation = serializers.JSONField()
knowledge_base = serializers.CharField(allow_null=True, required=False)
tone_file = serializers.CharField(allow_null=True, required=False)
raw_response = serializers.CharField(allow_null=True, required=False)
+473
View File
@@ -0,0 +1,473 @@
from __future__ import annotations
from datetime import datetime
from math import sqrt
from statistics import median
from typing import Any
from django.utils import timezone
from farm_data.context import load_farm_context
from farm_data.models import SensorData
from rag.services import get_soil_anomaly_insight
from .anomaly_detection import build_anomaly_detection_card
from .health_summary import build_soil_health_summary
QUALITY_REAL = "REAL"
QUALITY_INTERPOLATED = "INTERPOLATED"
QUALITY_MISSING = "MISSING"
QUALITY_EXTRAPOLATED = "EXTRAPOLATED"
IDW_POWER = 2
MAX_GRID_STEPS = 10
FRESHNESS_HALF_LIFE_HOURS = 24.0
MAX_SENSOR_INFLUENCE_DISTANCE = 0.08
def _safe_float(value: Any) -> float | None:
try:
if value in (None, ""):
return None
return float(value)
except (TypeError, ValueError):
return None
def _sensor_time_series(sensor: Any) -> list[dict[str, Any]]:
sensor_block = sensor.get_sensor_block() if hasattr(sensor, "get_sensor_block") else {}
soil_moisture = _safe_float(getattr(sensor, "soil_moisture", None))
measured_at = sensor_block.get("timestamp") or sensor_block.get("measured_at")
if measured_at is None and getattr(sensor, "updated_at", None):
measured_at = sensor.updated_at.isoformat()
return [
{
"timestamp": measured_at,
"value": soil_moisture,
"quality_flag": QUALITY_REAL if soil_moisture is not None else QUALITY_MISSING,
}
]
def _parse_timestamp(value: Any) -> datetime | None:
if isinstance(value, datetime):
return value
if not value:
return None
if isinstance(value, str):
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
if timezone.is_naive(parsed):
return timezone.make_aware(parsed, timezone.get_current_timezone())
return parsed
return None
def _hours_since(timestamp: Any) -> float | None:
parsed = _parse_timestamp(timestamp)
if parsed is None:
return None
delta = timezone.now() - parsed
return max(delta.total_seconds() / 3600.0, 0.0)
def _freshness_weight(timestamp: Any) -> float:
age_hours = _hours_since(timestamp)
if age_hours is None:
return 0.65
return 1.0 / (1.0 + (age_hours / FRESHNESS_HALF_LIFE_HOURS))
def _sensor_anomaly_penalty(value: float | None, network_values: list[float]) -> float:
if value is None or len(network_values) < 3:
return 1.0
center = median(network_values)
deviations = [abs(item - center) for item in network_values]
typical_deviation = median(deviations) or 1.0
normalized_distance = abs(value - center) / typical_deviation
return max(0.45, min(1.0, 1.15 - (normalized_distance * 0.18)))
def _boundary_points(sensor: Any) -> list[tuple[float, float]]:
boundary = getattr(sensor.center_location, "farm_boundary", None) or {}
coordinates = []
if isinstance(boundary, dict) and boundary.get("type") == "Polygon":
coordinates = boundary.get("coordinates") or []
if coordinates and isinstance(coordinates[0], list):
return [(float(point[1]), float(point[0])) for point in coordinates[0] if len(point) >= 2]
corners = boundary.get("corners") if isinstance(boundary, dict) else boundary if isinstance(boundary, list) else []
points = []
for point in corners or []:
if isinstance(point, dict) and point.get("lat") is not None and point.get("lon") is not None:
points.append((float(point["lat"]), float(point["lon"])))
return points
def _point_in_polygon(lat: float, lon: float, polygon: list[tuple[float, float]]) -> bool:
if len(polygon) < 3:
return True
inside = False
for index in range(len(polygon)):
lat1, lon1 = polygon[index]
lat2, lon2 = polygon[(index + 1) % len(polygon)]
intersects = ((lon1 > lon) != (lon2 > lon)) and (
lat < ((lat2 - lat1) * (lon - lon1) / max(lon2 - lon1, 1e-12)) + lat1
)
if intersects:
inside = not inside
return inside
def _latest_sensor_measurement(sensor: Any, network_values: list[float]) -> dict[str, Any]:
series = _sensor_time_series(sensor)
latest = series[-1] if series else {"timestamp": None, "value": None, "quality_flag": QUALITY_MISSING}
reliability = _sensor_anomaly_penalty(latest["value"], network_values) * _freshness_weight(latest["timestamp"])
return {
"sensor_id": str(sensor.farm_uuid),
"latitude": float(sensor.center_location.latitude),
"longitude": float(sensor.center_location.longitude),
"depth": None,
"timestamp": latest["timestamp"],
"soil_moisture_value": latest["value"],
"quality_flag": latest["quality_flag"],
"freshness_weight": round(_freshness_weight(latest["timestamp"]), 4),
"reliability_score": round(reliability, 4),
}
def _spatial_weight(distance: float) -> float:
if distance == 0:
return 1.0
if distance > MAX_SENSOR_INFLUENCE_DISTANCE:
return 0.0
return 1 / (distance**IDW_POWER)
def _interpolate_cell(
lat: float,
lon: float,
sensor_points: list[dict[str, Any]],
) -> tuple[float | None, str, float]:
weighted_sum = 0.0
weight_total = 0.0
min_distance = None
for point in sensor_points:
value = point["soil_moisture_value"]
if value is None:
continue
distance = sqrt(((lat - point["latitude"]) ** 2) + ((lon - point["longitude"]) ** 2))
min_distance = distance if min_distance is None else min(min_distance, distance)
if distance == 0:
return round(float(value), 2), point["quality_flag"], 1.0
spatial_weight = _spatial_weight(distance)
if spatial_weight == 0.0:
continue
composite_weight = spatial_weight * float(point.get("reliability_score", 1.0))
weighted_sum += composite_weight * float(value)
weight_total += composite_weight
if weight_total == 0.0:
return None, QUALITY_MISSING, 0.0
uncertainty = 1.0 - min(weight_total / (weight_total + 6.0), 1.0)
quality_flag = QUALITY_INTERPOLATED
if min_distance is not None and min_distance > (MAX_SENSOR_INFLUENCE_DISTANCE / 2):
quality_flag = QUALITY_EXTRAPOLATED
return round(weighted_sum / weight_total, 2), quality_flag, round(max(0.0, min(1.0, uncertainty)), 4)
def _grid_axis(min_value: float, max_value: float) -> list[float]:
if min_value == max_value:
return [round(min_value, 6)]
step_count = min(MAX_GRID_STEPS, max(int((max_value - min_value) / 0.0001) + 1, 2))
step = (max_value - min_value) / (step_count - 1)
return [round(min_value + (step * index), 6) for index in range(step_count)]
def _load_sensor_network(current_sensor: Any) -> list[Any]:
plant_ids = list(current_sensor.plants.values_list("id", flat=True))
queryset = SensorData.objects.select_related("center_location").prefetch_related("plants", "center_location__depths")
if plant_ids:
queryset = queryset.filter(plants__id__in=plant_ids).distinct()
return list(queryset)
def _soil_profile(sensor: Any) -> list[dict[str, Any]]:
depths = sensor.center_location.depths.all()
return [
{
"depth_label": depth.depth_label,
"field_capacity": depth.wv0033,
"wilting_point": depth.wv1500,
"saturation": depth.wv0010,
"nitrogen": depth.nitrogen,
"ph": depth.phh2o,
"sand": depth.sand,
"silt": depth.silt,
"clay": depth.clay,
}
for depth in depths
]
def _depth_layers(soil_profile: list[dict[str, Any]], grid_cells: list[dict[str, Any]]) -> list[dict[str, Any]]:
layers = []
if not soil_profile or not grid_cells:
return layers
for index, depth in enumerate(soil_profile):
depth_factor = max(0.72, 1.0 - (index * 0.08))
layer_cells = []
for cell in grid_cells:
if cell["moisture_value"] is None:
moisture_value = None
else:
moisture_value = round(cell["moisture_value"] * depth_factor, 2)
layer_cells.append(
{
"lat": cell["lat"],
"lon": cell["lon"],
"moisture_value": moisture_value,
"quality_flag": cell["quality_flag"],
"uncertainty": cell.get("uncertainty"),
}
)
layers.append(
{
"depth_label": depth.get("depth_label"),
"estimated_from_surface": True,
"cells": layer_cells,
}
)
return layers
def _heatmap_summary(sensor_points: list[dict[str, Any]], grid_cells: list[dict[str, Any]]) -> dict[str, Any]:
sensor_values = [point["soil_moisture_value"] for point in sensor_points if point["soil_moisture_value"] is not None]
uncertainties = [cell["uncertainty"] for cell in grid_cells if cell.get("uncertainty") is not None]
return {
"sensor_count": len(sensor_points),
"active_sensor_count": len(sensor_values),
"interpolation_model": "boundary_aware_weighted_idw",
"uses_sensor_history": False,
"uses_freshness_weighting": True,
"uses_boundary_mask": True,
"uses_outlier_penalty": True,
"avg_sensor_moisture": round(sum(sensor_values) / len(sensor_values), 2) if sensor_values else None,
"avg_uncertainty": round(sum(uncertainties) / len(uncertainties), 4) if uncertainties else None,
}
class SoilMoistureHeatmapService:
def get_heatmap(self, *, farm_uuid: str) -> dict[str, Any]:
current_sensor = (
SensorData.objects.select_related("center_location")
.prefetch_related("plants", "center_location__depths")
.filter(farm_uuid=farm_uuid)
.first()
)
if current_sensor is None:
raise ValueError("Farm not found.")
sensors = _load_sensor_network(current_sensor)
raw_network_values = [
_safe_float(getattr(sensor, "soil_moisture", None))
for sensor in sensors
if _safe_float(getattr(sensor, "soil_moisture", None)) is not None
]
sensor_points = [_latest_sensor_measurement(sensor, raw_network_values) for sensor in sensors]
valid_sensor_points = [point for point in sensor_points if point["soil_moisture_value"] is not None]
soil_profile = _soil_profile(current_sensor)
farm_polygon = _boundary_points(current_sensor)
if not valid_sensor_points:
return {
"farm_uuid": str(current_sensor.farm_uuid),
"location": {
"lat": float(current_sensor.center_location.latitude),
"lon": float(current_sensor.center_location.longitude),
},
"current_sensor": {
"soil_moisture": current_sensor.soil_moisture,
"soil_temperature": current_sensor.soil_temperature,
"soil_ph": current_sensor.soil_ph,
"electrical_conductivity": current_sensor.electrical_conductivity,
},
"soil_profile": soil_profile,
"depth_layers": [],
"timestamp": current_sensor.updated_at.isoformat() if getattr(current_sensor, "updated_at", None) else None,
"grid_resolution": None,
"grid_cells": [],
"sensor_points": sensor_points,
"model_metadata": {
"interpolation_model": "boundary_aware_weighted_idw",
"uses_sensor_history": False,
"limitations": [
"history واقعی سنسورها در مدل حاضر در دسترس نیست",
"depth layers از surface estimate مشتق می‌شوند",
],
},
"summary": _heatmap_summary(sensor_points, []),
"quality_legend": {
QUALITY_REAL: "اندازه گیری واقعی سنسور",
QUALITY_INTERPOLATED: "مقدار برآوردشده با وزن دهی زمانی/فضایی",
QUALITY_MISSING: "داده معتبر در دسترس نیست",
QUALITY_EXTRAPOLATED: "برآورد در ناحیه دور از سنسورها",
},
}
if farm_polygon:
min_lat = min(point[0] for point in farm_polygon)
max_lat = max(point[0] for point in farm_polygon)
min_lon = min(point[1] for point in farm_polygon)
max_lon = max(point[1] for point in farm_polygon)
else:
min_lat = min(point["latitude"] for point in valid_sensor_points)
max_lat = max(point["latitude"] for point in valid_sensor_points)
min_lon = min(point["longitude"] for point in valid_sensor_points)
max_lon = max(point["longitude"] for point in valid_sensor_points)
lat_axis = _grid_axis(min_lat, max_lat)
lon_axis = _grid_axis(min_lon, max_lon)
grid_cells = []
for lat in lat_axis:
for lon in lon_axis:
if farm_polygon and not _point_in_polygon(lat, lon, farm_polygon):
grid_cells.append(
{
"lat": lat,
"lon": lon,
"moisture_value": None,
"quality_flag": QUALITY_MISSING,
"uncertainty": None,
"inside_farm_boundary": False,
}
)
continue
direct_sensor = next(
(
point
for point in valid_sensor_points
if point["latitude"] == lat and point["longitude"] == lon
),
None,
)
if direct_sensor is not None:
moisture_value = direct_sensor["soil_moisture_value"]
quality_flag = direct_sensor["quality_flag"]
uncertainty = round(1.0 - float(direct_sensor.get("reliability_score", 1.0)), 4)
else:
moisture_value, quality_flag, uncertainty = _interpolate_cell(lat, lon, valid_sensor_points)
grid_cells.append(
{
"lat": lat,
"lon": lon,
"moisture_value": moisture_value,
"quality_flag": quality_flag,
"uncertainty": uncertainty if moisture_value is not None else None,
"inside_farm_boundary": True,
}
)
lat_step = round(abs(lat_axis[1] - lat_axis[0]), 6) if len(lat_axis) > 1 else 0.0
lon_step = round(abs(lon_axis[1] - lon_axis[0]), 6) if len(lon_axis) > 1 else 0.0
timestamps = [point["timestamp"] for point in sensor_points if point["timestamp"]]
depth_layers = _depth_layers(soil_profile, [cell for cell in grid_cells if cell["inside_farm_boundary"]])
return {
"farm_uuid": str(current_sensor.farm_uuid),
"location": {
"lat": float(current_sensor.center_location.latitude),
"lon": float(current_sensor.center_location.longitude),
},
"current_sensor": {
"soil_moisture": current_sensor.soil_moisture,
"soil_temperature": current_sensor.soil_temperature,
"soil_ph": current_sensor.soil_ph,
"electrical_conductivity": current_sensor.electrical_conductivity,
"nitrogen": current_sensor.nitrogen,
"phosphorus": current_sensor.phosphorus,
"potassium": current_sensor.potassium,
},
"soil_profile": soil_profile,
"depth_layers": depth_layers,
"timestamp": max(timestamps) if timestamps else None,
"grid_resolution": {
"lat_step": lat_step,
"lon_step": lon_step,
"rows": len(lat_axis),
"cols": len(lon_axis),
},
"grid_cells": grid_cells,
"sensor_points": sensor_points,
"model_metadata": {
"interpolation_model": "boundary_aware_weighted_idw",
"uses_sensor_history": False,
"uses_freshness_weighting": True,
"uses_outlier_penalty": True,
"uses_depth_estimation": True,
"uses_boundary_mask": bool(farm_polygon),
"limitations": [
"history واقعی سنسورها در مدل حاضر ذخیره نشده است",
"depth layers از داده سطحی و پروفایل خاک مشتق شده‌اند",
"uncertainty به صورت heuristic برآورد می‌شود",
],
},
"summary": _heatmap_summary(sensor_points, [cell for cell in grid_cells if cell["inside_farm_boundary"]]),
"quality_legend": {
QUALITY_REAL: "اندازه گیری واقعی سنسور",
QUALITY_INTERPOLATED: "مقدار برآوردشده با وزن دهی زمانی/فضایی",
QUALITY_MISSING: "داده معتبر در دسترس نیست",
QUALITY_EXTRAPOLATED: "برآورد در ناحیه دور از سنسورها",
},
}
class SoilHealthService:
def get_health_summary(self, *, farm_uuid: str) -> dict[str, Any]:
sensor = (
SensorData.objects.select_related("center_location")
.prefetch_related("plants")
.filter(farm_uuid=farm_uuid)
.first()
)
if sensor is None:
raise ValueError("Farm not found.")
return {
"farm_uuid": str(sensor.farm_uuid),
**build_soil_health_summary(sensor, list(sensor.plants.all())),
}
class SoilAnomalyDetectionService:
def get_anomaly_detection(self, *, farm_uuid: str) -> dict[str, Any]:
context = load_farm_context(farm_uuid)
if context is None:
raise ValueError("Farm not found.")
anomaly_payload = build_anomaly_detection_card(
sensor_id=farm_uuid,
context=context,
ai_bundle=None,
)
rag_payload = get_soil_anomaly_insight(
farm_uuid=farm_uuid,
anomaly_payload=anomaly_payload,
ai_bundle=None,
)
return {
"farm_uuid": farm_uuid,
**anomaly_payload,
**rag_payload,
}
+258
View File
@@ -0,0 +1,258 @@
from __future__ import annotations
from datetime import timedelta
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from django.test import TestCase, override_settings
from django.utils import timezone
from rest_framework.test import APIClient
from soile.services import SoilMoistureHeatmapService
@override_settings(ROOT_URLCONF="soile.urls")
class SoilMoistureHeatmapApiTests(TestCase):
def setUp(self):
self.client = APIClient()
@patch("soile.views.apps.get_app_config")
def test_heatmap_api_returns_payload(self, mock_get_app_config):
mock_service = SimpleNamespace(
get_heatmap=lambda **_kwargs: {
"farm_uuid": "550e8400-e29b-41d4-a716-446655440000",
"location": {"lat": 35.7, "lon": 51.4},
"current_sensor": {"soil_moisture": 22.5},
"soil_profile": [{"depth_label": "0-5cm", "field_capacity": 0.34}],
"timestamp": "2026-04-01T00:00:00",
"grid_resolution": {"lat_step": 0.001, "lon_step": 0.001, "rows": 2, "cols": 2},
"grid_cells": [{"lat": 35.7, "lon": 51.4, "moisture_value": 22.5, "quality_flag": "REAL"}],
"sensor_points": [{"sensor_id": "farm-1", "soil_moisture_value": 22.5}],
"quality_legend": {"REAL": "اندازه گیری واقعی سنسور"},
}
)
mock_get_app_config.return_value = SimpleNamespace(
get_soil_moisture_service=lambda: mock_service
)
response = self.client.post(
"/moisture-heatmap/",
data={"farm_uuid": "550e8400-e29b-41d4-a716-446655440000"},
format="json",
)
self.assertEqual(response.status_code, 200)
payload = response.json()["data"]
self.assertEqual(payload["farm_uuid"], "550e8400-e29b-41d4-a716-446655440000")
self.assertEqual(payload["current_sensor"]["soil_moisture"], 22.5)
@patch("soile.views.apps.get_app_config")
def test_heatmap_api_returns_404_for_missing_farm(self, mock_get_app_config):
mock_service = SimpleNamespace(
get_heatmap=lambda **_kwargs: (_ for _ in ()).throw(ValueError("Farm not found."))
)
mock_get_app_config.return_value = SimpleNamespace(
get_soil_moisture_service=lambda: mock_service
)
response = self.client.post(
"/moisture-heatmap/",
data={"farm_uuid": "550e8400-e29b-41d4-a716-446655440000"},
format="json",
)
self.assertEqual(response.status_code, 404)
self.assertEqual(response.json()["msg"], "Farm not found.")
@override_settings(ROOT_URLCONF="soile.urls")
class SoilAnomalyDetectionApiTests(TestCase):
def setUp(self):
self.client = APIClient()
@patch("soile.views.apps.get_app_config")
def test_anomaly_api_returns_payload(self, mock_get_app_config):
mock_service = SimpleNamespace(
get_anomaly_detection=lambda **_kwargs: {
"farm_uuid": "550e8400-e29b-41d4-a716-446655440000",
"generated_at": "2026-04-01T00:00:00",
"anomalies": [
{
"metric_type": "soil_moisture",
"label": "رطوبت خاک",
"severity": "high",
"observed_value": 21.4,
}
],
"interpretation": {
"summary": "ناهنجاري در رطوبت خاک شناسايي شد.",
"explanation": "رطوبت خاک از الگوي معمول فاصله گرفته است.",
"likely_cause": "احتمال اختلال در آبياري يا افزايش تبخير.",
"recommended_action": "آبياري و قرائت سنسور بازبيني شود.",
"monitoring_priority": "urgent",
"confidence": 0.84,
},
"knowledge_base": "soil_anomaly",
"tone_file": "config/tones/soil_anomaly_tone.txt",
"raw_response": "{\"summary\":\"ok\"}",
}
)
mock_get_app_config.return_value = SimpleNamespace(
get_soil_anomaly_service=lambda: mock_service
)
response = self.client.post(
"/anomaly-detection/",
data={"farm_uuid": "550e8400-e29b-41d4-a716-446655440000"},
format="json",
)
self.assertEqual(response.status_code, 200)
payload = response.json()["data"]
self.assertEqual(payload["farm_uuid"], "550e8400-e29b-41d4-a716-446655440000")
self.assertEqual(payload["knowledge_base"], "soil_anomaly")
self.assertEqual(payload["interpretation"]["monitoring_priority"], "urgent")
@patch("soile.views.apps.get_app_config")
def test_anomaly_api_returns_404_for_missing_farm(self, mock_get_app_config):
mock_service = SimpleNamespace(
get_anomaly_detection=lambda **_kwargs: (_ for _ in ()).throw(ValueError("Farm not found."))
)
mock_get_app_config.return_value = SimpleNamespace(
get_soil_anomaly_service=lambda: mock_service
)
response = self.client.post(
"/anomaly-detection/",
data={"farm_uuid": "550e8400-e29b-41d4-a716-446655440000"},
format="json",
)
self.assertEqual(response.status_code, 404)
self.assertEqual(response.json()["msg"], "Farm not found.")
class SoilMoistureHeatmapServiceTests(TestCase):
@patch("soile.services.SensorData.objects")
def test_heatmap_service_builds_boundary_aware_weighted_output(self, mock_objects):
now = timezone.now()
boundary = {
"type": "Polygon",
"coordinates": [
[
[51.39, 35.70],
[51.41, 35.70],
[51.41, 35.72],
[51.39, 35.72],
[51.39, 35.70],
]
],
}
depth = SimpleNamespace(
depth_label="0-5cm",
wv0033=0.34,
wv1500=0.14,
wv0010=0.40,
nitrogen=12.0,
phh2o=7.1,
sand=40.0,
silt=35.0,
clay=25.0,
)
plants = SimpleNamespace(values_list=lambda *args, **kwargs: [1])
center_a = SimpleNamespace(latitude=35.70, longitude=51.39, farm_boundary=boundary, depths=SimpleNamespace(all=lambda: [depth]))
center_b = SimpleNamespace(latitude=35.72, longitude=51.41, farm_boundary=boundary, depths=SimpleNamespace(all=lambda: [depth]))
sensor_a = SimpleNamespace(
farm_uuid="farm-a",
center_location=center_a,
plants=plants,
sensor_payload={"sensor-1": {"soil_moisture": 20.0, "timestamp": (now - timedelta(hours=2)).isoformat()}},
updated_at=now - timedelta(hours=2),
get_sensor_block=lambda: {"soil_moisture": 20.0, "timestamp": (now - timedelta(hours=2)).isoformat()},
soil_moisture=20.0,
soil_temperature=18.0,
soil_ph=7.0,
electrical_conductivity=1.2,
nitrogen=10.0,
phosphorus=8.0,
potassium=12.0,
)
sensor_b = SimpleNamespace(
farm_uuid="farm-b",
center_location=center_b,
plants=plants,
sensor_payload={"sensor-1": {"soil_moisture": 36.0, "timestamp": (now - timedelta(hours=30)).isoformat()}},
updated_at=now - timedelta(hours=30),
get_sensor_block=lambda: {"soil_moisture": 36.0, "timestamp": (now - timedelta(hours=30)).isoformat()},
soil_moisture=36.0,
soil_temperature=19.0,
soil_ph=7.2,
electrical_conductivity=1.3,
nitrogen=11.0,
phosphorus=8.5,
potassium=12.5,
)
current_first = MagicMock()
current_first.first.return_value = sensor_a
current_filter = MagicMock()
current_filter.filter.return_value = current_first
current_qs = MagicMock()
current_qs.prefetch_related.return_value = current_filter
network_distinct = MagicMock()
network_distinct.distinct.return_value = [sensor_a, sensor_b]
network_filter = MagicMock()
network_filter.filter.return_value = network_distinct
network_qs = MagicMock()
network_qs.prefetch_related.return_value = network_filter
mock_objects.select_related.side_effect = [current_qs, network_qs]
payload = SoilMoistureHeatmapService().get_heatmap(farm_uuid="farm-a")
self.assertEqual(payload["model_metadata"]["interpolation_model"], "boundary_aware_weighted_idw")
self.assertTrue(payload["model_metadata"]["uses_freshness_weighting"])
self.assertTrue(payload["model_metadata"]["uses_boundary_mask"])
self.assertEqual(payload["summary"]["active_sensor_count"], 2)
self.assertEqual(payload["depth_layers"][0]["depth_label"], "0-5cm")
self.assertGreater(payload["sensor_points"][0]["reliability_score"], payload["sensor_points"][1]["reliability_score"])
outside_cells = [cell for cell in payload["grid_cells"] if not cell["inside_farm_boundary"]]
self.assertTrue(outside_cells)
self.assertTrue(all(cell["moisture_value"] is None for cell in outside_cells))
self.assertIn("uncertainty", payload["grid_cells"][0])
@override_settings(ROOT_URLCONF="soile.urls")
class SoilHealthSummaryApiTests(TestCase):
def setUp(self):
self.client = APIClient()
@patch("soile.views.apps.get_app_config")
def test_health_summary_api_returns_payload(self, mock_get_app_config):
mock_service = SimpleNamespace(
get_health_summary=lambda **_kwargs: {
"farm_uuid": "550e8400-e29b-41d4-a716-446655440000",
"healthScore": 82,
"profileSource": "گوجه فرنگی",
"healthScoreDetails": {"components": []},
"healthLanguage": {"short_chip_text": "پایدار"},
"avgSoilMoisture": 46,
"avgSoilMoistureRaw": 45.8,
"avgSoilMoistureStatus": "بهینه",
}
)
mock_get_app_config.return_value = SimpleNamespace(
get_soil_health_service=lambda: mock_service
)
response = self.client.post(
"/health-summary/",
data={"farm_uuid": "550e8400-e29b-41d4-a716-446655440000"},
format="json",
)
self.assertEqual(response.status_code, 200)
payload = response.json()["data"]
self.assertEqual(payload["healthScore"], 82)
self.assertEqual(payload["avgSoilMoistureStatus"], "بهینه")
+10
View File
@@ -0,0 +1,10 @@
from django.urls import path
from .views import SoilAnomalyDetectionView, SoilHealthSummaryView, SoilMoistureHeatmapView
urlpatterns = [
path("anomaly-detection/", SoilAnomalyDetectionView.as_view(), name="soil-anomaly-detection"),
path("health-summary/", SoilHealthSummaryView.as_view(), name="soil-health-summary"),
path("moisture-heatmap/", SoilMoistureHeatmapView.as_view(), name="soil-moisture-heatmap"),
]
+191
View File
@@ -0,0 +1,191 @@
from django.apps import apps
from drf_spectacular.utils import OpenApiExample, extend_schema
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView
from config.openapi import build_envelope_serializer, build_response
from .serializers import (
SoilAnomalyDetectionRequestSerializer,
SoilAnomalyDetectionResponseSerializer,
SoilHealthSummaryRequestSerializer,
SoilHealthSummaryResponseSerializer,
SoilMoistureHeatmapRequestSerializer,
SoilMoistureHeatmapResponseSerializer,
)
SoileHeatmapEnvelopeSerializer = build_envelope_serializer(
"SoileHeatmapEnvelopeSerializer",
SoilMoistureHeatmapResponseSerializer,
)
SoileErrorSerializer = build_envelope_serializer(
"SoileErrorSerializer",
data_required=False,
allow_null=True,
)
SoileAnomalyEnvelopeSerializer = build_envelope_serializer(
"SoileAnomalyEnvelopeSerializer",
SoilAnomalyDetectionResponseSerializer,
)
SoileHealthEnvelopeSerializer = build_envelope_serializer(
"SoileHealthEnvelopeSerializer",
SoilHealthSummaryResponseSerializer,
)
class SoilMoistureHeatmapView(APIView):
@extend_schema(
tags=["Soile"],
summary="دریافت heatmap رطوبت خاک مزرعه",
description=(
"با دریافت farm_uuid، heatmap رطوبت خاک را با وزن دهی زمانی/فضایی، "
"mask مرز مزرعه و برآورد عدم قطعیت از app مستقل soile برمی گرداند."
),
request=SoilMoistureHeatmapRequestSerializer,
responses={
200: build_response(
SoileHeatmapEnvelopeSerializer,
"داده heatmap رطوبت خاک مزرعه با موفقیت بازگردانده شد.",
),
400: build_response(
SoileErrorSerializer,
"داده ورودی نامعتبر است.",
),
404: build_response(
SoileErrorSerializer,
"مزرعه یافت نشد.",
),
},
examples=[
OpenApiExample(
"نمونه درخواست soile",
value={"farm_uuid": "11111111-1111-1111-1111-111111111111"},
request_only=True,
)
],
)
def post(self, request):
serializer = SoilMoistureHeatmapRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{"code": 400, "msg": "داده نامعتبر.", "data": serializer.errors},
status=status.HTTP_400_BAD_REQUEST,
)
service = apps.get_app_config("soile").get_soil_moisture_service()
try:
data = service.get_heatmap(farm_uuid=str(serializer.validated_data["farm_uuid"]))
except ValueError as exc:
return Response(
{"code": 404, "msg": str(exc), "data": None},
status=status.HTTP_404_NOT_FOUND,
)
return Response(
{"code": 200, "msg": "success", "data": data},
status=status.HTTP_200_OK,
)
class SoilHealthSummaryView(APIView):
@extend_schema(
tags=["Soile"],
summary="خلاصه سلامت و رطوبت خاک مزرعه",
description="با دریافت farm_uuid، امتیاز سلامت خاک/سنسور و میانگین رطوبت فعلی خاک را برمی گرداند.",
request=SoilHealthSummaryRequestSerializer,
responses={
200: build_response(SoileHealthEnvelopeSerializer, "خلاصه سلامت خاک با موفقیت بازگردانده شد."),
400: build_response(SoileErrorSerializer, "داده ورودی نامعتبر است."),
404: build_response(SoileErrorSerializer, "مزرعه یافت نشد."),
},
examples=[
OpenApiExample(
"نمونه درخواست soil health",
value={"farm_uuid": "11111111-1111-1111-1111-111111111111"},
request_only=True,
)
],
)
def post(self, request):
serializer = SoilHealthSummaryRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{"code": 400, "msg": "داده نامعتبر.", "data": serializer.errors},
status=status.HTTP_400_BAD_REQUEST,
)
service = apps.get_app_config("soile").get_soil_health_service()
try:
data = service.get_health_summary(farm_uuid=str(serializer.validated_data["farm_uuid"]))
except ValueError as exc:
return Response(
{"code": 404, "msg": str(exc), "data": None},
status=status.HTTP_404_NOT_FOUND,
)
return Response({"code": 200, "msg": "success", "data": data}, status=status.HTTP_200_OK)
class SoilAnomalyDetectionView(APIView):
@extend_schema(
tags=["Soile"],
summary="تحلیل ناهنجاری خاک با کمک RAG",
description="با دریافت farm_uuid، ناهنجاری های آماری داده های خاک را استخراج می کند و تفسیر تخصصی آن را با پایگاه دانش و tone مستقل برمی گرداند.",
request=SoilAnomalyDetectionRequestSerializer,
responses={
200: build_response(
SoileAnomalyEnvelopeSerializer,
"خروجی تحلیل ناهنجاری خاک با موفقیت بازگردانده شد.",
),
400: build_response(
SoileErrorSerializer,
"داده ورودی نامعتبر است.",
),
404: build_response(
SoileErrorSerializer,
"مزرعه یافت نشد.",
),
500: build_response(
SoileErrorSerializer,
"خطا در تحلیل ناهنجاری خاک.",
),
},
examples=[
OpenApiExample(
"نمونه درخواست anomaly",
value={"farm_uuid": "11111111-1111-1111-1111-111111111111"},
request_only=True,
)
],
)
def post(self, request):
serializer = SoilAnomalyDetectionRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{"code": 400, "msg": "داده نامعتبر.", "data": serializer.errors},
status=status.HTTP_400_BAD_REQUEST,
)
service = apps.get_app_config("soile").get_soil_anomaly_service()
try:
data = service.get_anomaly_detection(
farm_uuid=str(serializer.validated_data["farm_uuid"])
)
except ValueError as exc:
return Response(
{"code": 404, "msg": str(exc), "data": None},
status=status.HTTP_404_NOT_FOUND,
)
except Exception as exc:
return Response(
{"code": 500, "msg": f"خطا در تحلیل ناهنجاری خاک: {exc}", "data": None},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
return Response(
{"code": 200, "msg": "success", "data": data},
status=status.HTTP_200_OK,
)