This commit is contained in:
2026-04-29 03:47:48 +03:30
parent 27784ee8b9
commit 8139a49756
11 changed files with 533 additions and 197 deletions
+4
View File
@@ -28,6 +28,10 @@ CROP_ZONE_CHUNK_AREA_SQM=10000
CELERY_BROKER_URL=redis://redis:6379/0 CELERY_BROKER_URL=redis://redis:6379/0
CELERY_RESULT_BACKEND=redis://redis:6379/0 CELERY_RESULT_BACKEND=redis://redis:6379/0
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP=true CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP=true
FARM_ALERTS_AI_SYNC_CRON_MINUTE=0
FARM_ALERTS_AI_SYNC_CRON_HOUR=*
QDRANT_HOST=qdrant QDRANT_HOST=qdrant
QDRANT_PORT=6333 QDRANT_PORT=6333
Binary file not shown.
+24
View File
@@ -3,6 +3,7 @@ from datetime import timedelta
from pathlib import Path from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
from celery.schedules import crontab
load_dotenv() load_dotenv()
@@ -227,6 +228,18 @@ CELERY_WORKER_PREFETCH_MULTIPLIER = int(os.getenv("CELERY_WORKER_PREFETCH_MULTIP
CELERY_TASK_TIME_LIMIT = int(os.getenv("CELERY_TASK_TIME_LIMIT", "120")) CELERY_TASK_TIME_LIMIT = int(os.getenv("CELERY_TASK_TIME_LIMIT", "120"))
CELERY_TASK_SOFT_TIME_LIMIT = int(os.getenv("CELERY_TASK_SOFT_TIME_LIMIT", "90")) CELERY_TASK_SOFT_TIME_LIMIT = int(os.getenv("CELERY_TASK_SOFT_TIME_LIMIT", "90"))
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = os.getenv("CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP", "true").lower() == "true" CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = os.getenv("CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP", "true").lower() == "true"
FARM_ALERTS_AI_SYNC_CRON_MINUTE = os.getenv("FARM_ALERTS_AI_SYNC_CRON_MINUTE", "0")
FARM_ALERTS_AI_SYNC_CRON_HOUR = os.getenv("FARM_ALERTS_AI_SYNC_CRON_HOUR", "*")
CELERY_BEAT_SCHEDULE = {
"sync-farm-alert-trackers": {
"task": "farm_alerts.tasks.sync_farm_alert_trackers",
"schedule": crontab(
minute=FARM_ALERTS_AI_SYNC_CRON_MINUTE,
hour=FARM_ALERTS_AI_SYNC_CRON_HOUR,
),
}
}
LOGGING = { LOGGING = {
"version": 1, "version": 1,
@@ -243,6 +256,12 @@ LOGGING = {
"filename": LOG_DIR / "farm_ai_assistant.log", "filename": LOG_DIR / "farm_ai_assistant.log",
"formatter": "standard", "formatter": "standard",
}, },
"farm_alerts_file": {
"level": "INFO",
"class": "logging.FileHandler",
"filename": LOG_DIR / "farm_alerts.log",
"formatter": "standard",
},
"external_api_adapter_file": { "external_api_adapter_file": {
"level": "WARNING", "level": "WARNING",
"class": "logging.FileHandler", "class": "logging.FileHandler",
@@ -256,6 +275,11 @@ LOGGING = {
"level": "WARNING", "level": "WARNING",
"propagate": False, "propagate": False,
}, },
"farm_alerts": {
"handlers": ["farm_alerts_file"],
"level": "INFO",
"propagate": False,
},
"external_api_adapter": { "external_api_adapter": {
"handlers": ["external_api_adapter_file"], "handlers": ["external_api_adapter_file"],
"level": "WARNING", "level": "WARNING",
+28
View File
@@ -102,6 +102,34 @@ services:
networks: networks:
- crop_network - crop_network
celery-beat:
container_name: backend-celery-beat
build: .
command: ["celery", "-A", "config", "beat", "-l", "info"]
env_file:
- .env
environment:
DOCKER_VERSION: ${DOCKER_VERSION:-production}
ALLOWED_HOSTS: ${ALLOWED_HOSTS:-localhost,127.0.0.1,0.0.0.0,web,backend-web}
AI_SERVICE_BASE_URL: ${AI_SERVICE_BASE_URL:-http://ai-web:8000}
AI_SERVICE_HOST_HEADER: ${AI_SERVICE_HOST_HEADER:-localhost}
DB_HOST: croplogic-db
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://backend-redis:6379/0}
CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://backend-redis:6379/0}
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP: "true"
SKIP_MIGRATE: "1"
ACCESS_CONTROL_AUTHZ_BASE_URL: http://croplogic-accsess-opa:8181
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
accsess:
condition: service_started
restart: unless-stopped
networks:
- crop_network
networks: networks:
crop_network: crop_network:
external: true external: true
+34
View File
@@ -121,6 +121,40 @@ services:
networks: networks:
- crop_network - crop_network
celery-beat:
build:
context: .
args:
APT_MIRROR: mirror2.chabokan.net
PIP_INDEX_URL: https://package-mirror.liara.ir/repository/pypi/simple
PIP_EXTRA_INDEX_URL: https://mirror2.chabokan.net/pypi/simple
PYTHON_MIRROR: mirror2.chabokan.net
container_name: backend-celery-beat
command: ["celery", "-A", "config", "beat", "-l", "info"]
volumes:
- .:/app
- ./logs:/app/logs
env_file:
- .env
environment:
DOCKER_VERSION: ${DOCKER_VERSION:-develop}
ALLOWED_HOSTS: ${ALLOWED_HOSTS:-localhost,127.0.0.1,0.0.0.0,web,backend-web}
AI_SERVICE_BASE_URL: ${AI_SERVICE_BASE_URL:-http://ai-web:8000}
DB_HOST: croplogic-db
CELERY_BROKER_URL: redis://backend-redis:6379/0
CELERY_RESULT_BACKEND: redis://backend-redis:6379/0
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP: "true"
SKIP_MIGRATE: "1"
ACCESS_CONTROL_AUTHZ_BASE_URL: http://croplogic-accsess-opa:8181
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
restart: unless-stopped
networks:
- crop_network
volumes: volumes:
backend_mysql_data: backend_mysql_data:
backend_redis_data: backend_redis_data:
@@ -0,0 +1,48 @@
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
("farm_hub", "0001_initial"),
("farm_alerts", "0003_farmalert_tracker_fields"),
]
operations = [
migrations.CreateModel(
name="FarmAlertTrackerSnapshot",
fields=[
("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("service_id", models.CharField(default="farm_alerts", max_length=64)),
("tracker", models.JSONField(blank=True, default=dict)),
("headline", models.CharField(blank=True, default="", max_length=255)),
("overview", models.TextField(blank=True, default="")),
(
"status_level",
models.CharField(
choices=[("info", "Info"), ("warning", "Warning"), ("error", "Error"), ("success", "Success")],
default="info",
max_length=32,
),
),
("raw_llm_response", models.TextField(blank=True, default="")),
("structured_context", models.JSONField(blank=True, default=dict)),
("last_ai_synced_at", models.DateTimeField(blank=True, null=True)),
("last_source_update_at", models.DateTimeField(blank=True, null=True)),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
(
"farm",
models.OneToOneField(
on_delete=django.db.models.deletion.CASCADE,
related_name="alert_tracker_snapshot",
to="farm_hub.farmhub",
),
),
],
options={
"db_table": "farm_alert_tracker_snapshots",
},
),
]
+25
View File
@@ -71,3 +71,28 @@ class Recommendation(models.Model):
def __str__(self): def __str__(self):
return self.title return self.title
class FarmAlertTrackerSnapshot(models.Model):
farm = models.OneToOneField(
FarmHub,
on_delete=models.CASCADE,
related_name="alert_tracker_snapshot",
)
service_id = models.CharField(max_length=64, default="farm_alerts")
tracker = models.JSONField(default=dict, blank=True)
headline = models.CharField(max_length=255, blank=True, default="")
overview = models.TextField(blank=True, default="")
status_level = models.CharField(max_length=32, default="info", choices=SEVERITY_CHOICES)
raw_llm_response = models.TextField(blank=True, default="")
structured_context = models.JSONField(default=dict, blank=True)
last_ai_synced_at = models.DateTimeField(null=True, blank=True)
last_source_update_at = models.DateTimeField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = "farm_alert_tracker_snapshots"
def __str__(self):
return f"Tracker snapshot for {self.farm_id}"
+208 -28
View File
@@ -1,7 +1,11 @@
from collections import Counter from collections import Counter
from copy import deepcopy from copy import deepcopy
import json import json
import logging
from django.utils import timezone
from external_api_adapter import request as external_api_request
from farm_hub.models import FarmHub from farm_hub.models import FarmHub
from notifications.models import FarmNotification from notifications.models import FarmNotification
from notifications.services import create_notification_for_farm_uuid, get_recent_notifications_for_farm from notifications.services import create_notification_for_farm_uuid, get_recent_notifications_for_farm
@@ -12,7 +16,7 @@ from .mock_data import (
FARM_ALERTS_TIMELINE, FARM_ALERTS_TIMELINE,
RECOMMENDATIONS_LIST, RECOMMENDATIONS_LIST,
) )
from .models import AnomalyDetection, FarmAlert, Recommendation from .models import AnomalyDetection, FarmAlert, FarmAlertTrackerSnapshot, Recommendation
LEVEL_ALIAS_MAP = { LEVEL_ALIAS_MAP = {
@@ -21,6 +25,9 @@ LEVEL_ALIAS_MAP = {
"warn": "warning", "warn": "warning",
} }
TRACKER_AI_NOTIFICATION_SOURCE = "farm_alerts_tracker_ai"
logger = logging.getLogger("farm_alerts")
class AlertService: class AlertService:
@staticmethod @staticmethod
@@ -100,8 +107,9 @@ class AlertService:
) )
def serialize_notifications_for_ai(*, farm, since_days=3, limit=10): def serialize_notifications_for_ai(*, farm, since_days=3, limit=5):
notifications = get_recent_notifications_for_farm(farm=farm, since_days=since_days, limit=limit) notifications = get_recent_notifications_for_farm(farm=farm, since_days=since_days, limit=limit)
notifications = [item for item in notifications if item.metadata.get("source") != TRACKER_AI_NOTIFICATION_SOURCE]
return [ return [
{ {
"id": notification.id, "id": notification.id,
@@ -152,55 +160,227 @@ def save_tracker_notifications(*, farm_uuid, notifications):
source_alert_id=source_alert_id, source_alert_id=source_alert_id,
source_metric_type=notification_data.get("source_metric_type", ""), source_metric_type=notification_data.get("source_metric_type", ""),
payload=notification_data.get("payload") or {}, payload=notification_data.get("payload") or {},
metadata={"source": "farm_alerts_tracker_ai"}, metadata={"source": TRACKER_AI_NOTIFICATION_SOURCE},
) )
) )
return saved_notifications return saved_notifications
def build_tracker_context(*, farm, alerts): def build_tracker_context(*, farm):
recent_notifications = serialize_notifications_for_ai(farm=farm, since_days=3, limit=10) recent_notifications = serialize_notifications_for_ai(farm=farm, since_days=3, limit=5)
payload = {"farm_uuid": str(farm.farm_uuid)}
if recent_notifications:
counts = Counter( counts = Counter(
AlertService.normalize_level(alert.get("level")) AlertService.normalize_level(notification.get("level"))
for alert in alerts for notification in recent_notifications
if alert.get("level") if notification.get("level")
) )
structured_context = { payload["recent_notifications"] = recent_notifications
payload["structured_context"] = {
"farm_uuid": str(farm.farm_uuid), "farm_uuid": str(farm.farm_uuid),
"alerts_count": len(alerts), "notifications_count": len(recent_notifications),
"recent_notifications_count": len(recent_notifications), "recent_notifications_count": len(recent_notifications),
"recent_notifications_window_days": 3, "recent_notifications_window_days": 3,
"recent_notifications_limit": 10, "recent_notifications_limit": 5,
"alert_levels": dict(counts), "notification_levels": dict(counts),
}
return {
"farm_uuid": str(farm.farm_uuid),
"alerts": alerts,
"recent_notifications": recent_notifications,
"structured_context": structured_context,
} }
return payload
def build_tracker_response(*, farm, adapter_payload):
def serialize_alerts_for_ai(*, farm, since=None, limit=50):
queryset = FarmAlert.objects.filter(farm=farm).order_by("-created_at", "-id")
if since is not None:
queryset = queryset.filter(created_at__gt=since)
alerts = queryset[:limit]
return [
{
"alert_id": alert.external_alert_id,
"level": alert.color,
"title": alert.title,
"message": alert.description,
"suggested_action": alert.suggested_action,
"source_metric_type": alert.source_metric_type,
"timestamp": alert.occurred_at.isoformat() if alert.occurred_at else None,
"payload": alert.payload,
}
for alert in alerts
]
def get_tracker_notifications(*, farm, limit=10):
return list(
FarmNotification.objects.filter(farm=farm, endpoint="tracker")
.order_by("-created_at", "-id")[:limit]
)
def get_tracker_source_updated_at(*, farm):
latest_alert = FarmAlert.objects.filter(farm=farm).order_by("-created_at", "-id").values_list("created_at", flat=True).first()
latest_notification = (
FarmNotification.objects.filter(farm=farm)
.exclude(metadata__source=TRACKER_AI_NOTIFICATION_SOURCE)
.order_by("-updated_at", "-id")
.values_list("updated_at", flat=True)
.first()
)
candidates = [item for item in (latest_alert, latest_notification) if item is not None]
if not candidates:
return None
return max(candidates)
def get_or_create_tracker_snapshot(*, farm):
snapshot, _ = FarmAlertTrackerSnapshot.objects.get_or_create(farm=farm)
return snapshot
def update_tracker_snapshot(*, farm, adapter_payload, source_updated_at):
snapshot = get_or_create_tracker_snapshot(farm=farm)
notifications_payload = adapter_payload.get("notifications") or [] notifications_payload = adapter_payload.get("notifications") or []
saved_notifications = save_tracker_notifications(farm_uuid=farm.farm_uuid, notifications=notifications_payload) save_tracker_notifications(farm_uuid=farm.farm_uuid, notifications=notifications_payload)
raw_llm_response = adapter_payload.get("raw_llm_response", "") raw_llm_response = adapter_payload.get("raw_llm_response", "")
if not raw_llm_response: if not raw_llm_response:
raw_llm_response = json.dumps(adapter_payload, ensure_ascii=False) raw_llm_response = json.dumps(adapter_payload, ensure_ascii=False)
snapshot.service_id = adapter_payload.get("service_id", "farm_alerts")
snapshot.tracker = adapter_payload.get("tracker") or {}
snapshot.headline = adapter_payload.get("headline", "")
snapshot.overview = adapter_payload.get("overview", "")
snapshot.status_level = AlertService.normalize_level(adapter_payload.get("status_level"))
snapshot.raw_llm_response = raw_llm_response
snapshot.structured_context = adapter_payload.get("structured_context") or {}
snapshot.last_ai_synced_at = timezone.now()
snapshot.last_source_update_at = source_updated_at
snapshot.save(
update_fields=[
"service_id",
"tracker",
"headline",
"overview",
"status_level",
"raw_llm_response",
"structured_context",
"last_ai_synced_at",
"last_source_update_at",
"updated_at",
]
)
return snapshot
def build_tracker_response_from_snapshot(*, farm):
snapshot = FarmAlertTrackerSnapshot.objects.filter(farm=farm).first()
notifications = get_tracker_notifications(farm=farm, limit=10)
if snapshot is None:
return { return {
"farm_uuid": str(farm.farm_uuid), "farm_uuid": str(farm.farm_uuid),
"service_id": adapter_payload.get("service_id", "farm_alerts"), "service_id": "farm_alerts",
"tracker": adapter_payload.get("tracker") or {}, "tracker": {},
"headline": adapter_payload.get("headline", ""), "headline": "",
"overview": adapter_payload.get("overview", ""), "overview": "",
"status_level": AlertService.normalize_level(adapter_payload.get("status_level")), "status_level": "info",
"notifications": saved_notifications, "notifications": notifications,
"raw_llm_response": raw_llm_response, "raw_llm_response": "",
"structured_context": adapter_payload.get("structured_context") or {}, "structured_context": {},
}
return {
"farm_uuid": str(farm.farm_uuid),
"service_id": snapshot.service_id,
"tracker": snapshot.tracker or {},
"headline": snapshot.headline,
"overview": snapshot.overview,
"status_level": AlertService.normalize_level(snapshot.status_level),
"notifications": notifications,
"raw_llm_response": snapshot.raw_llm_response,
"structured_context": snapshot.structured_context or {},
} }
def sync_farm_tracker_with_ai(*, farm):
snapshot = FarmAlertTrackerSnapshot.objects.filter(farm=farm).first()
source_updated_at = get_tracker_source_updated_at(farm=farm)
if source_updated_at is None:
logger.info(
"farm=%s tracker sync proceeding without source data snapshot_exists=%s",
farm.farm_uuid,
snapshot is not None,
)
if (
source_updated_at is not None
and snapshot is not None
and snapshot.last_source_update_at is not None
and source_updated_at <= snapshot.last_source_update_at
):
logger.info(
"farm=%s tracker sync skipped: no changes source_updated_at=%s last_source_update_at=%s",
farm.farm_uuid,
source_updated_at,
snapshot.last_source_update_at,
)
return {"farm_uuid": str(farm.farm_uuid), "status": "skipped", "reason": "no_changes"}
tracker_payload = build_tracker_context(farm=farm)
logger.info(
"farm=%s tracker sync sending AI request recent_notifications=%s payload=%s",
farm.farm_uuid,
len(tracker_payload.get("recent_notifications", [])),
tracker_payload,
)
adapter_response = external_api_request(
"ai",
"/api/farm-alerts/tracker/",
method="POST",
payload=tracker_payload,
)
if adapter_response.status_code >= 400:
logger.warning(
"farm=%s tracker sync failed status_code=%s response=%s",
farm.farm_uuid,
adapter_response.status_code,
adapter_response.data,
)
raise ValueError(f"AI tracker sync failed with status {adapter_response.status_code}.")
adapter_data = adapter_response.data if isinstance(adapter_response.data, dict) else {}
logger.info(
"farm=%s tracker sync received AI response status_code=%s response=%s",
farm.farm_uuid,
adapter_response.status_code,
adapter_data,
)
payload = adapter_data.get("data")
if isinstance(payload, dict) and isinstance(payload.get("result"), dict):
payload = payload["result"]
elif not isinstance(payload, dict):
payload = adapter_data.get("result") if isinstance(adapter_data.get("result"), dict) else adapter_data
logger.info(
"farm=%s tracker sync normalized AI payload=%s",
farm.farm_uuid,
payload,
)
update_tracker_snapshot(
farm=farm,
adapter_payload=payload or {},
source_updated_at=source_updated_at,
)
logger.info("farm=%s tracker sync completed successfully", farm.farm_uuid)
return {"farm_uuid": str(farm.farm_uuid), "status": "synced"}
def sync_all_farm_alert_trackers():
farms = FarmHub.objects.all().order_by("id")
logger.info("farm alerts sync discovered %s farm(s) to process", farms.count())
results = []
for farm in farms:
results.append(sync_farm_tracker_with_ai(farm=farm))
return {"processed": len(results), "results": results}
def get_alert_tracker_data(farm=None): def get_alert_tracker_data(farm=None):
if farm is None: if farm is None:
return deepcopy(ARM_ALERTS_TRACKER) return deepcopy(ARM_ALERTS_TRACKER)
+21
View File
@@ -0,0 +1,21 @@
import logging
from celery import shared_task
from .services import sync_all_farm_alert_trackers
logger = logging.getLogger("farm_alerts")
@shared_task(
bind=True,
autoretry_for=(Exception,),
retry_backoff=True,
retry_jitter=True,
retry_kwargs={"max_retries": 3},
)
def sync_farm_alert_trackers(self):
logger.info("farm alerts periodic sync task started task_id=%s", getattr(self.request, "id", ""))
result = sync_all_farm_alert_trackers()
logger.info("farm alerts periodic sync task finished result=%s", result)
return result
+123 -112
View File
@@ -10,8 +10,9 @@ from external_api_adapter.adapter import AdapterResponse
from farm_hub.models import FarmHub, FarmType from farm_hub.models import FarmHub, FarmType
from notifications.models import FarmNotification from notifications.models import FarmNotification
from .models import FarmAlert from .models import FarmAlert, FarmAlertTrackerSnapshot
from .serializers import FarmAlertsTrackerRequestSerializer from .serializers import FarmAlertsTrackerRequestSerializer
from .services import sync_farm_tracker_with_ai
from .views import AlertTrackerView from .views import AlertTrackerView
@@ -20,13 +21,7 @@ class FarmAlertsTrackerRequestSerializerTests(SimpleTestCase):
serializer = FarmAlertsTrackerRequestSerializer( serializer = FarmAlertsTrackerRequestSerializer(
data={ data={
"farm_uuid": "11111111-1111-1111-1111-111111111111", "farm_uuid": "11111111-1111-1111-1111-111111111111",
"alerts": [ "alerts": [],
{
"alert_id": "soil-1",
"level": "warning",
"title": "Low moisture",
}
],
} }
) )
@@ -62,112 +57,22 @@ class FarmAlertsTrackerViewTests(TestCase):
self.farm_type = FarmType.objects.create(name="مرکبات") self.farm_type = FarmType.objects.create(name="مرکبات")
self.farm = FarmHub.objects.create(owner=self.user, farm_type=self.farm_type, name="Farm Alerts") self.farm = FarmHub.objects.create(owner=self.user, farm_type=self.farm_type, name="Farm Alerts")
@patch("farm_alerts.views.external_api_request") def test_tracker_returns_cached_snapshot_without_accepting_alerts(self):
def test_tracker_persists_incoming_alerts_and_sends_recent_notifications_to_ai(self, mock_external_api_request):
recent_notification = FarmNotification.objects.create(
farm=self.farm,
endpoint="tracker",
title="Recent alert",
message="Recent notification",
level="warning",
)
old_notification = FarmNotification.objects.create(
farm=self.farm,
endpoint="tracker",
title="Old alert",
message="Old notification",
level="info",
)
FarmNotification.objects.filter(id=old_notification.id).update(created_at=timezone.now() - timedelta(days=4))
old_notification.refresh_from_db()
mock_external_api_request.return_value = AdapterResponse(
status_code=200,
data={
"data": {
"headline": "وضعیت هشدارها",
"overview": "دو مورد نیاز به پیگیری دارد.",
"status_level": "warning",
"tracker": {"active": 2},
"notifications": [
{
"title": "افت رطوبت خاک",
"message": "تنش رطوبتی ادامه دارد.",
"level": "warning",
"suggested_action": "آبیاری جبرانی انجام شود.",
"source_alert_id": "soil-1",
"source_metric_type": "moisture",
"payload": {"current_value": 38.5},
}
],
"structured_context": {"source": "ai"},
}
},
)
request = self.factory.post(
"/api/farm-alerts/tracker/",
{
"farm_uuid": str(self.farm.farm_uuid),
"alerts": [
{
"alert_id": "soil-1",
"level": "danger",
"title": "افت رطوبت خاک",
"message": "رطوبت خاک کمتر از حد مطلوب است.",
"suggested_action": "آبیاری اصلاحی بررسی شود.",
"source_metric_type": "moisture",
"payload": {"current_value": 38.5},
}
],
},
format="json",
)
force_authenticate(request, user=self.user)
response = AlertTrackerView.as_view()(request)
self.assertEqual(response.status_code, 200)
self.assertEqual(FarmAlert.objects.filter(farm=self.farm).count(), 1)
saved_alert = FarmAlert.objects.get(farm=self.farm)
self.assertEqual(saved_alert.external_alert_id, "soil-1")
self.assertEqual(saved_alert.color, "error")
self.assertEqual(saved_alert.source_metric_type, "moisture")
mock_external_api_request.assert_called_once()
outbound_payload = mock_external_api_request.call_args.kwargs["payload"]
self.assertEqual(outbound_payload["farm_uuid"], str(self.farm.farm_uuid))
self.assertEqual(len(outbound_payload["alerts"]), 1)
self.assertEqual(len(outbound_payload["recent_notifications"]), 1)
self.assertEqual(outbound_payload["recent_notifications"][0]["id"], recent_notification.id)
self.assertEqual(response.data["data"]["headline"], "وضعیت هشدارها")
self.assertEqual(response.data["data"]["status_level"], "warning")
self.assertEqual(len(response.data["data"]["notifications"]), 1)
self.assertEqual(response.data["data"]["notifications"][0]["endpoint"], "tracker")
persisted_notification = FarmNotification.objects.filter(
farm=self.farm,
title="افت رطوبت خاک",
endpoint="tracker",
).latest("id")
self.assertEqual(persisted_notification.source_alert_id, "soil-1")
self.assertEqual(persisted_notification.suggested_action, "آبیاری جبرانی انجام شود.")
@patch("farm_alerts.views.external_api_request")
def test_tracker_limits_recent_notifications_to_ten(self, mock_external_api_request):
for index in range(12):
FarmNotification.objects.create( FarmNotification.objects.create(
farm=self.farm, farm=self.farm,
endpoint="tracker", endpoint="tracker",
title=f"Notification {index}", title="AI alert",
message="msg", message="Cached notification",
level="warning",
) )
FarmAlertTrackerSnapshot.objects.create(
mock_external_api_request.return_value = AdapterResponse( farm=self.farm,
status_code=200, headline="وضعیت هشدارها",
data={"data": {"headline": "", "overview": "", "status_level": "info", "notifications": []}}, overview="دو مورد نیاز به پیگیری دارد.",
status_level="warning",
tracker={"active": 2},
raw_llm_response='{"headline":"cached"}',
structured_context={"source": "ai"},
) )
request = self.factory.post( request = self.factory.post(
@@ -180,8 +85,27 @@ class FarmAlertsTrackerViewTests(TestCase):
response = AlertTrackerView.as_view()(request) response = AlertTrackerView.as_view()(request)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
outbound_payload = mock_external_api_request.call_args.kwargs["payload"] self.assertEqual(response.data["data"]["headline"], "وضعیت هشدارها")
self.assertEqual(len(outbound_payload["recent_notifications"]), 10) self.assertEqual(response.data["data"]["status_level"], "warning")
self.assertEqual(len(response.data["data"]["notifications"]), 1)
self.assertEqual(response.data["data"]["notifications"][0]["endpoint"], "tracker")
def test_tracker_limits_cached_notifications_to_ten(self):
for index in range(12):
FarmNotification.objects.create(farm=self.farm, endpoint="tracker", title=f"Notification {index}", message="msg")
FarmAlertTrackerSnapshot.objects.create(farm=self.farm)
request = self.factory.post(
"/api/farm-alerts/tracker/",
{"farm_uuid": str(self.farm.farm_uuid)},
format="json",
)
force_authenticate(request, user=self.user)
response = AlertTrackerView.as_view()(request)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data["data"]["notifications"]), 10)
def test_tracker_rejects_unowned_farm(self): def test_tracker_rejects_unowned_farm(self):
request = self.factory.post( request = self.factory.post(
@@ -195,3 +119,90 @@ class FarmAlertsTrackerViewTests(TestCase):
self.assertEqual(response.status_code, 400) self.assertEqual(response.status_code, 400)
self.assertEqual(response.data["farm_uuid"][0], "Farm not found.") self.assertEqual(response.data["farm_uuid"][0], "Farm not found.")
@patch("farm_alerts.services.external_api_request")
def test_sync_task_sends_last_five_notifications_to_ai_and_updates_snapshot(self, mock_external_api_request):
for index in range(6):
FarmNotification.objects.create(
farm=self.farm,
endpoint="irrigation",
title=f"Irrigation reminder {index}",
message=f"Run irrigation cycle {index}",
level="info" if index % 2 == 0 else "warning",
)
FarmNotification.objects.create(
farm=self.farm,
endpoint="irrigation",
title="AI generated tracker notice",
message="Should be excluded from AI input",
level="info",
metadata={"source": "farm_alerts_tracker_ai"},
)
mock_external_api_request.return_value = AdapterResponse(
status_code=200,
data={
"data": {
"headline": "وضعیت جدید",
"overview": "یک تغییر جدید شناسایی شد.",
"status_level": "warning",
"tracker": {"active": 1},
"notifications": [
{
"title": "افت رطوبت خاک",
"message": "تنش رطوبتی ادامه دارد.",
"level": "warning",
"suggested_action": "آبیاری جبرانی انجام شود.",
"source_alert_id": "soil-1",
}
],
"structured_context": {"source": "ai"},
}
},
)
result = sync_farm_tracker_with_ai(farm=self.farm)
self.assertEqual(result["status"], "synced")
mock_external_api_request.assert_called_once()
outbound_payload = mock_external_api_request.call_args.kwargs["payload"]
self.assertEqual(outbound_payload["farm_uuid"], str(self.farm.farm_uuid))
self.assertNotIn("alerts", outbound_payload)
self.assertEqual(len(outbound_payload["recent_notifications"]), 5)
self.assertEqual(outbound_payload["recent_notifications"][0]["title"], "Irrigation reminder 5")
self.assertEqual(outbound_payload["recent_notifications"][-1]["title"], "Irrigation reminder 1")
snapshot = FarmAlertTrackerSnapshot.objects.get(farm=self.farm)
self.assertEqual(snapshot.headline, "وضعیت جدید")
self.assertEqual(snapshot.status_level, "warning")
self.assertIsNotNone(snapshot.last_ai_synced_at)
self.assertIsNotNone(snapshot.last_source_update_at)
persisted_notification = FarmNotification.objects.filter(
farm=self.farm,
title="افت رطوبت خاک",
endpoint="tracker",
).latest("id")
self.assertEqual(persisted_notification.metadata["source"], "farm_alerts_tracker_ai")
@patch("farm_alerts.services.external_api_request")
def test_sync_task_skips_ai_when_no_new_data_exists(self, mock_external_api_request):
snapshot = FarmAlertTrackerSnapshot.objects.create(
farm=self.farm,
last_ai_synced_at=timezone.now(),
last_source_update_at=timezone.now(),
)
notification = FarmNotification.objects.create(
farm=self.farm,
endpoint="irrigation",
title="Irrigation reminder",
message="Run irrigation cycle",
level="warning",
)
FarmNotification.objects.filter(id=notification.id).update(updated_at=snapshot.last_source_update_at - timedelta(minutes=1))
result = sync_farm_tracker_with_ai(farm=self.farm)
self.assertEqual(result["status"], "skipped")
self.assertEqual(result["reason"], "no_changes")
mock_external_api_request.assert_not_called()
+15 -54
View File
@@ -1,3 +1,5 @@
import logging
from drf_spectacular.utils import OpenApiExample, extend_schema from drf_spectacular.utils import OpenApiExample, extend_schema
from rest_framework import serializers, status from rest_framework import serializers, status
from rest_framework.permissions import IsAuthenticated from rest_framework.permissions import IsAuthenticated
@@ -5,45 +7,17 @@ from rest_framework.response import Response
from rest_framework.views import APIView from rest_framework.views import APIView
from config.swagger import code_response from config.swagger import code_response
from external_api_adapter import request as external_api_request
from farm_hub.models import FarmHub from farm_hub.models import FarmHub
from .serializers import AlertTrackerAIResponseSerializer, FarmAlertsTrackerRequestSerializer from .serializers import AlertTrackerAIResponseSerializer, FarmAlertsTrackerRequestSerializer
from .services import AlertService, build_tracker_context, build_tracker_response from .services import AlertService, build_tracker_response_from_snapshot
logger = logging.getLogger("farm_alerts")
class FarmAlertsBaseView(APIView): class FarmAlertsBaseView(APIView):
permission_classes = [IsAuthenticated] permission_classes = [IsAuthenticated]
@staticmethod
def _extract_result(adapter_data):
if not isinstance(adapter_data, dict):
return {}
data = adapter_data.get("data")
if isinstance(data, dict) and isinstance(data.get("result"), dict):
return data["result"]
if isinstance(data, dict):
return data
result = adapter_data.get("result")
if isinstance(result, dict):
return result
return adapter_data
@staticmethod
def _error_response(adapter_response):
response_data = (
adapter_response.data
if isinstance(adapter_response.data, dict)
else {"message": str(adapter_response.data)}
)
return Response(
{"code": adapter_response.status_code, "msg": "error", "data": response_data},
status=adapter_response.status_code,
)
@staticmethod @staticmethod
def _get_farm(request, farm_uuid): def _get_farm(request, farm_uuid):
if not farm_uuid: if not farm_uuid:
@@ -63,16 +37,6 @@ class AlertTrackerView(FarmAlertsBaseView):
"Tracker Request", "Tracker Request",
value={ value={
"farm_uuid": "11111111-1111-1111-1111-111111111111", "farm_uuid": "11111111-1111-1111-1111-111111111111",
"alerts": [
{
"alert_id": "soil-moisture-001",
"level": "warning",
"title": "افت رطوبت خاک",
"message": "رطوبت خاک کمتر از حد مطلوب گزارش شده است.",
"suggested_action": "آبیاری اصلاحی بررسی شود.",
"source_metric_type": "moisture",
}
],
}, },
request_only=True, request_only=True,
) )
@@ -84,20 +48,17 @@ class AlertTrackerView(FarmAlertsBaseView):
request_serializer.is_valid(raise_exception=True) request_serializer.is_valid(raise_exception=True)
farm = self._get_farm(request, request_serializer.validated_data["farm_uuid"]) farm = self._get_farm(request, request_serializer.validated_data["farm_uuid"])
incoming_alerts = request_serializer.validated_data.get("alerts", []) logger.info(
AlertService.persist_incoming_alerts(farm=farm, alerts=incoming_alerts) "tracker endpoint received request farm=%s payload=%s",
farm.farm_uuid,
tracker_payload = build_tracker_context(farm=farm, alerts=incoming_alerts) request.data,
adapter_response = external_api_request(
"ai",
"/api/farm-alerts/tracker/",
method="POST",
payload=tracker_payload,
) )
if adapter_response.status_code >= 400:
return self._error_response(adapter_response)
payload = self._extract_result(adapter_response.data) response_data = build_tracker_response_from_snapshot(farm=farm)
response_data = build_tracker_response(farm=farm, adapter_payload=payload) logger.info(
"tracker endpoint returning cached response farm=%s response=%s",
farm.farm_uuid,
response_data,
)
serializer = AlertTrackerAIResponseSerializer(instance=response_data) serializer = AlertTrackerAIResponseSerializer(instance=response_data)
return Response({"code": 200, "msg": "success", "data": serializer.data}, status=status.HTTP_200_OK) return Response({"code": 200, "msg": "success", "data": serializer.data}, status=status.HTTP_200_OK)