diff --git a/.env.example b/.env.example index 6e18aa1..25e0292 100644 --- a/.env.example +++ b/.env.example @@ -28,6 +28,10 @@ CROP_ZONE_CHUNK_AREA_SQM=10000 CELERY_BROKER_URL=redis://redis:6379/0 CELERY_RESULT_BACKEND=redis://redis:6379/0 CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP=true + +FARM_ALERTS_AI_SYNC_CRON_MINUTE=0 +FARM_ALERTS_AI_SYNC_CRON_HOUR=* + QDRANT_HOST=qdrant QDRANT_PORT=6333 diff --git a/celerybeat-schedule b/celerybeat-schedule new file mode 100644 index 0000000..086bbb4 Binary files /dev/null and b/celerybeat-schedule differ diff --git a/config/settings.py b/config/settings.py index 3244bfa..133c327 100644 --- a/config/settings.py +++ b/config/settings.py @@ -3,6 +3,7 @@ from datetime import timedelta from pathlib import Path from dotenv import load_dotenv +from celery.schedules import crontab load_dotenv() @@ -227,6 +228,18 @@ CELERY_WORKER_PREFETCH_MULTIPLIER = int(os.getenv("CELERY_WORKER_PREFETCH_MULTIP CELERY_TASK_TIME_LIMIT = int(os.getenv("CELERY_TASK_TIME_LIMIT", "120")) CELERY_TASK_SOFT_TIME_LIMIT = int(os.getenv("CELERY_TASK_SOFT_TIME_LIMIT", "90")) CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = os.getenv("CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP", "true").lower() == "true" +FARM_ALERTS_AI_SYNC_CRON_MINUTE = os.getenv("FARM_ALERTS_AI_SYNC_CRON_MINUTE", "0") +FARM_ALERTS_AI_SYNC_CRON_HOUR = os.getenv("FARM_ALERTS_AI_SYNC_CRON_HOUR", "*") + +CELERY_BEAT_SCHEDULE = { + "sync-farm-alert-trackers": { + "task": "farm_alerts.tasks.sync_farm_alert_trackers", + "schedule": crontab( + minute=FARM_ALERTS_AI_SYNC_CRON_MINUTE, + hour=FARM_ALERTS_AI_SYNC_CRON_HOUR, + ), + } +} LOGGING = { "version": 1, @@ -243,6 +256,12 @@ LOGGING = { "filename": LOG_DIR / "farm_ai_assistant.log", "formatter": "standard", }, + "farm_alerts_file": { + "level": "INFO", + "class": "logging.FileHandler", + "filename": LOG_DIR / "farm_alerts.log", + "formatter": "standard", + }, "external_api_adapter_file": { "level": "WARNING", "class": "logging.FileHandler", @@ -256,6 +275,11 @@ LOGGING = { "level": "WARNING", "propagate": False, }, + "farm_alerts": { + "handlers": ["farm_alerts_file"], + "level": "INFO", + "propagate": False, + }, "external_api_adapter": { "handlers": ["external_api_adapter_file"], "level": "WARNING", diff --git a/docker-compose-prod.yaml b/docker-compose-prod.yaml index e14ef45..3214fb1 100644 --- a/docker-compose-prod.yaml +++ b/docker-compose-prod.yaml @@ -102,6 +102,34 @@ services: networks: - crop_network + celery-beat: + container_name: backend-celery-beat + build: . + command: ["celery", "-A", "config", "beat", "-l", "info"] + env_file: + - .env + environment: + DOCKER_VERSION: ${DOCKER_VERSION:-production} + ALLOWED_HOSTS: ${ALLOWED_HOSTS:-localhost,127.0.0.1,0.0.0.0,web,backend-web} + AI_SERVICE_BASE_URL: ${AI_SERVICE_BASE_URL:-http://ai-web:8000} + AI_SERVICE_HOST_HEADER: ${AI_SERVICE_HOST_HEADER:-localhost} + DB_HOST: croplogic-db + CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://backend-redis:6379/0} + CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://backend-redis:6379/0} + CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP: "true" + SKIP_MIGRATE: "1" + ACCESS_CONTROL_AUTHZ_BASE_URL: http://croplogic-accsess-opa:8181 + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + accsess: + condition: service_started + restart: unless-stopped + networks: + - crop_network + networks: crop_network: external: true diff --git a/docker-compose.yaml b/docker-compose.yaml index b6ac482..14dfd87 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -121,6 +121,40 @@ services: networks: - crop_network + celery-beat: + build: + context: . + args: + APT_MIRROR: mirror2.chabokan.net + PIP_INDEX_URL: https://package-mirror.liara.ir/repository/pypi/simple + PIP_EXTRA_INDEX_URL: https://mirror2.chabokan.net/pypi/simple + PYTHON_MIRROR: mirror2.chabokan.net + container_name: backend-celery-beat + command: ["celery", "-A", "config", "beat", "-l", "info"] + volumes: + - .:/app + - ./logs:/app/logs + env_file: + - .env + environment: + DOCKER_VERSION: ${DOCKER_VERSION:-develop} + ALLOWED_HOSTS: ${ALLOWED_HOSTS:-localhost,127.0.0.1,0.0.0.0,web,backend-web} + AI_SERVICE_BASE_URL: ${AI_SERVICE_BASE_URL:-http://ai-web:8000} + DB_HOST: croplogic-db + CELERY_BROKER_URL: redis://backend-redis:6379/0 + CELERY_RESULT_BACKEND: redis://backend-redis:6379/0 + CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP: "true" + SKIP_MIGRATE: "1" + ACCESS_CONTROL_AUTHZ_BASE_URL: http://croplogic-accsess-opa:8181 + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + restart: unless-stopped + networks: + - crop_network + volumes: backend_mysql_data: backend_redis_data: diff --git a/farm_alerts/migrations/0004_farmalerttrackersnapshot.py b/farm_alerts/migrations/0004_farmalerttrackersnapshot.py new file mode 100644 index 0000000..03604f6 --- /dev/null +++ b/farm_alerts/migrations/0004_farmalerttrackersnapshot.py @@ -0,0 +1,48 @@ +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("farm_hub", "0001_initial"), + ("farm_alerts", "0003_farmalert_tracker_fields"), + ] + + operations = [ + migrations.CreateModel( + name="FarmAlertTrackerSnapshot", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("service_id", models.CharField(default="farm_alerts", max_length=64)), + ("tracker", models.JSONField(blank=True, default=dict)), + ("headline", models.CharField(blank=True, default="", max_length=255)), + ("overview", models.TextField(blank=True, default="")), + ( + "status_level", + models.CharField( + choices=[("info", "Info"), ("warning", "Warning"), ("error", "Error"), ("success", "Success")], + default="info", + max_length=32, + ), + ), + ("raw_llm_response", models.TextField(blank=True, default="")), + ("structured_context", models.JSONField(blank=True, default=dict)), + ("last_ai_synced_at", models.DateTimeField(blank=True, null=True)), + ("last_source_update_at", models.DateTimeField(blank=True, null=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ( + "farm", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + related_name="alert_tracker_snapshot", + to="farm_hub.farmhub", + ), + ), + ], + options={ + "db_table": "farm_alert_tracker_snapshots", + }, + ), + ] diff --git a/farm_alerts/models.py b/farm_alerts/models.py index cb81f5b..8c313f8 100644 --- a/farm_alerts/models.py +++ b/farm_alerts/models.py @@ -71,3 +71,28 @@ class Recommendation(models.Model): def __str__(self): return self.title + + +class FarmAlertTrackerSnapshot(models.Model): + farm = models.OneToOneField( + FarmHub, + on_delete=models.CASCADE, + related_name="alert_tracker_snapshot", + ) + service_id = models.CharField(max_length=64, default="farm_alerts") + tracker = models.JSONField(default=dict, blank=True) + headline = models.CharField(max_length=255, blank=True, default="") + overview = models.TextField(blank=True, default="") + status_level = models.CharField(max_length=32, default="info", choices=SEVERITY_CHOICES) + raw_llm_response = models.TextField(blank=True, default="") + structured_context = models.JSONField(default=dict, blank=True) + last_ai_synced_at = models.DateTimeField(null=True, blank=True) + last_source_update_at = models.DateTimeField(null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + db_table = "farm_alert_tracker_snapshots" + + def __str__(self): + return f"Tracker snapshot for {self.farm_id}" diff --git a/farm_alerts/services.py b/farm_alerts/services.py index 3d43a8e..83520b2 100644 --- a/farm_alerts/services.py +++ b/farm_alerts/services.py @@ -1,7 +1,11 @@ from collections import Counter from copy import deepcopy import json +import logging +from django.utils import timezone + +from external_api_adapter import request as external_api_request from farm_hub.models import FarmHub from notifications.models import FarmNotification from notifications.services import create_notification_for_farm_uuid, get_recent_notifications_for_farm @@ -12,7 +16,7 @@ from .mock_data import ( FARM_ALERTS_TIMELINE, RECOMMENDATIONS_LIST, ) -from .models import AnomalyDetection, FarmAlert, Recommendation +from .models import AnomalyDetection, FarmAlert, FarmAlertTrackerSnapshot, Recommendation LEVEL_ALIAS_MAP = { @@ -21,6 +25,9 @@ LEVEL_ALIAS_MAP = { "warn": "warning", } +TRACKER_AI_NOTIFICATION_SOURCE = "farm_alerts_tracker_ai" +logger = logging.getLogger("farm_alerts") + class AlertService: @staticmethod @@ -100,8 +107,9 @@ class AlertService: ) -def serialize_notifications_for_ai(*, farm, since_days=3, limit=10): +def serialize_notifications_for_ai(*, farm, since_days=3, limit=5): notifications = get_recent_notifications_for_farm(farm=farm, since_days=since_days, limit=limit) + notifications = [item for item in notifications if item.metadata.get("source") != TRACKER_AI_NOTIFICATION_SOURCE] return [ { "id": notification.id, @@ -152,55 +160,227 @@ def save_tracker_notifications(*, farm_uuid, notifications): source_alert_id=source_alert_id, source_metric_type=notification_data.get("source_metric_type", ""), payload=notification_data.get("payload") or {}, - metadata={"source": "farm_alerts_tracker_ai"}, + metadata={"source": TRACKER_AI_NOTIFICATION_SOURCE}, ) ) return saved_notifications -def build_tracker_context(*, farm, alerts): - recent_notifications = serialize_notifications_for_ai(farm=farm, since_days=3, limit=10) - counts = Counter( - AlertService.normalize_level(alert.get("level")) +def build_tracker_context(*, farm): + recent_notifications = serialize_notifications_for_ai(farm=farm, since_days=3, limit=5) + payload = {"farm_uuid": str(farm.farm_uuid)} + + if recent_notifications: + counts = Counter( + AlertService.normalize_level(notification.get("level")) + for notification in recent_notifications + if notification.get("level") + ) + payload["recent_notifications"] = recent_notifications + payload["structured_context"] = { + "farm_uuid": str(farm.farm_uuid), + "notifications_count": len(recent_notifications), + "recent_notifications_count": len(recent_notifications), + "recent_notifications_window_days": 3, + "recent_notifications_limit": 5, + "notification_levels": dict(counts), + } + + return payload + + +def serialize_alerts_for_ai(*, farm, since=None, limit=50): + queryset = FarmAlert.objects.filter(farm=farm).order_by("-created_at", "-id") + if since is not None: + queryset = queryset.filter(created_at__gt=since) + + alerts = queryset[:limit] + return [ + { + "alert_id": alert.external_alert_id, + "level": alert.color, + "title": alert.title, + "message": alert.description, + "suggested_action": alert.suggested_action, + "source_metric_type": alert.source_metric_type, + "timestamp": alert.occurred_at.isoformat() if alert.occurred_at else None, + "payload": alert.payload, + } for alert in alerts - if alert.get("level") + ] + + +def get_tracker_notifications(*, farm, limit=10): + return list( + FarmNotification.objects.filter(farm=farm, endpoint="tracker") + .order_by("-created_at", "-id")[:limit] ) - structured_context = { - "farm_uuid": str(farm.farm_uuid), - "alerts_count": len(alerts), - "recent_notifications_count": len(recent_notifications), - "recent_notifications_window_days": 3, - "recent_notifications_limit": 10, - "alert_levels": dict(counts), - } - return { - "farm_uuid": str(farm.farm_uuid), - "alerts": alerts, - "recent_notifications": recent_notifications, - "structured_context": structured_context, - } -def build_tracker_response(*, farm, adapter_payload): +def get_tracker_source_updated_at(*, farm): + latest_alert = FarmAlert.objects.filter(farm=farm).order_by("-created_at", "-id").values_list("created_at", flat=True).first() + latest_notification = ( + FarmNotification.objects.filter(farm=farm) + .exclude(metadata__source=TRACKER_AI_NOTIFICATION_SOURCE) + .order_by("-updated_at", "-id") + .values_list("updated_at", flat=True) + .first() + ) + candidates = [item for item in (latest_alert, latest_notification) if item is not None] + if not candidates: + return None + return max(candidates) + + +def get_or_create_tracker_snapshot(*, farm): + snapshot, _ = FarmAlertTrackerSnapshot.objects.get_or_create(farm=farm) + return snapshot + + +def update_tracker_snapshot(*, farm, adapter_payload, source_updated_at): + snapshot = get_or_create_tracker_snapshot(farm=farm) notifications_payload = adapter_payload.get("notifications") or [] - saved_notifications = save_tracker_notifications(farm_uuid=farm.farm_uuid, notifications=notifications_payload) + save_tracker_notifications(farm_uuid=farm.farm_uuid, notifications=notifications_payload) + raw_llm_response = adapter_payload.get("raw_llm_response", "") if not raw_llm_response: raw_llm_response = json.dumps(adapter_payload, ensure_ascii=False) + snapshot.service_id = adapter_payload.get("service_id", "farm_alerts") + snapshot.tracker = adapter_payload.get("tracker") or {} + snapshot.headline = adapter_payload.get("headline", "") + snapshot.overview = adapter_payload.get("overview", "") + snapshot.status_level = AlertService.normalize_level(adapter_payload.get("status_level")) + snapshot.raw_llm_response = raw_llm_response + snapshot.structured_context = adapter_payload.get("structured_context") or {} + snapshot.last_ai_synced_at = timezone.now() + snapshot.last_source_update_at = source_updated_at + snapshot.save( + update_fields=[ + "service_id", + "tracker", + "headline", + "overview", + "status_level", + "raw_llm_response", + "structured_context", + "last_ai_synced_at", + "last_source_update_at", + "updated_at", + ] + ) + return snapshot + + +def build_tracker_response_from_snapshot(*, farm): + snapshot = FarmAlertTrackerSnapshot.objects.filter(farm=farm).first() + notifications = get_tracker_notifications(farm=farm, limit=10) + if snapshot is None: + return { + "farm_uuid": str(farm.farm_uuid), + "service_id": "farm_alerts", + "tracker": {}, + "headline": "", + "overview": "", + "status_level": "info", + "notifications": notifications, + "raw_llm_response": "", + "structured_context": {}, + } + return { "farm_uuid": str(farm.farm_uuid), - "service_id": adapter_payload.get("service_id", "farm_alerts"), - "tracker": adapter_payload.get("tracker") or {}, - "headline": adapter_payload.get("headline", ""), - "overview": adapter_payload.get("overview", ""), - "status_level": AlertService.normalize_level(adapter_payload.get("status_level")), - "notifications": saved_notifications, - "raw_llm_response": raw_llm_response, - "structured_context": adapter_payload.get("structured_context") or {}, + "service_id": snapshot.service_id, + "tracker": snapshot.tracker or {}, + "headline": snapshot.headline, + "overview": snapshot.overview, + "status_level": AlertService.normalize_level(snapshot.status_level), + "notifications": notifications, + "raw_llm_response": snapshot.raw_llm_response, + "structured_context": snapshot.structured_context or {}, } +def sync_farm_tracker_with_ai(*, farm): + snapshot = FarmAlertTrackerSnapshot.objects.filter(farm=farm).first() + source_updated_at = get_tracker_source_updated_at(farm=farm) + if source_updated_at is None: + logger.info( + "farm=%s tracker sync proceeding without source data snapshot_exists=%s", + farm.farm_uuid, + snapshot is not None, + ) + + if ( + source_updated_at is not None + and snapshot is not None + and snapshot.last_source_update_at is not None + and source_updated_at <= snapshot.last_source_update_at + ): + logger.info( + "farm=%s tracker sync skipped: no changes source_updated_at=%s last_source_update_at=%s", + farm.farm_uuid, + source_updated_at, + snapshot.last_source_update_at, + ) + return {"farm_uuid": str(farm.farm_uuid), "status": "skipped", "reason": "no_changes"} + + tracker_payload = build_tracker_context(farm=farm) + logger.info( + "farm=%s tracker sync sending AI request recent_notifications=%s payload=%s", + farm.farm_uuid, + len(tracker_payload.get("recent_notifications", [])), + tracker_payload, + ) + adapter_response = external_api_request( + "ai", + "/api/farm-alerts/tracker/", + method="POST", + payload=tracker_payload, + ) + if adapter_response.status_code >= 400: + logger.warning( + "farm=%s tracker sync failed status_code=%s response=%s", + farm.farm_uuid, + adapter_response.status_code, + adapter_response.data, + ) + raise ValueError(f"AI tracker sync failed with status {adapter_response.status_code}.") + + adapter_data = adapter_response.data if isinstance(adapter_response.data, dict) else {} + logger.info( + "farm=%s tracker sync received AI response status_code=%s response=%s", + farm.farm_uuid, + adapter_response.status_code, + adapter_data, + ) + payload = adapter_data.get("data") + if isinstance(payload, dict) and isinstance(payload.get("result"), dict): + payload = payload["result"] + elif not isinstance(payload, dict): + payload = adapter_data.get("result") if isinstance(adapter_data.get("result"), dict) else adapter_data + logger.info( + "farm=%s tracker sync normalized AI payload=%s", + farm.farm_uuid, + payload, + ) + + update_tracker_snapshot( + farm=farm, + adapter_payload=payload or {}, + source_updated_at=source_updated_at, + ) + logger.info("farm=%s tracker sync completed successfully", farm.farm_uuid) + return {"farm_uuid": str(farm.farm_uuid), "status": "synced"} + + +def sync_all_farm_alert_trackers(): + farms = FarmHub.objects.all().order_by("id") + logger.info("farm alerts sync discovered %s farm(s) to process", farms.count()) + results = [] + for farm in farms: + results.append(sync_farm_tracker_with_ai(farm=farm)) + return {"processed": len(results), "results": results} def get_alert_tracker_data(farm=None): if farm is None: return deepcopy(ARM_ALERTS_TRACKER) diff --git a/farm_alerts/tasks.py b/farm_alerts/tasks.py new file mode 100644 index 0000000..485528b --- /dev/null +++ b/farm_alerts/tasks.py @@ -0,0 +1,21 @@ +import logging + +from celery import shared_task + +from .services import sync_all_farm_alert_trackers + +logger = logging.getLogger("farm_alerts") + + +@shared_task( + bind=True, + autoretry_for=(Exception,), + retry_backoff=True, + retry_jitter=True, + retry_kwargs={"max_retries": 3}, +) +def sync_farm_alert_trackers(self): + logger.info("farm alerts periodic sync task started task_id=%s", getattr(self.request, "id", "")) + result = sync_all_farm_alert_trackers() + logger.info("farm alerts periodic sync task finished result=%s", result) + return result diff --git a/farm_alerts/tests.py b/farm_alerts/tests.py index ba3d1b8..bb0daf3 100644 --- a/farm_alerts/tests.py +++ b/farm_alerts/tests.py @@ -10,8 +10,9 @@ from external_api_adapter.adapter import AdapterResponse from farm_hub.models import FarmHub, FarmType from notifications.models import FarmNotification -from .models import FarmAlert +from .models import FarmAlert, FarmAlertTrackerSnapshot from .serializers import FarmAlertsTrackerRequestSerializer +from .services import sync_farm_tracker_with_ai from .views import AlertTrackerView @@ -20,13 +21,7 @@ class FarmAlertsTrackerRequestSerializerTests(SimpleTestCase): serializer = FarmAlertsTrackerRequestSerializer( data={ "farm_uuid": "11111111-1111-1111-1111-111111111111", - "alerts": [ - { - "alert_id": "soil-1", - "level": "warning", - "title": "Low moisture", - } - ], + "alerts": [], } ) @@ -62,112 +57,22 @@ class FarmAlertsTrackerViewTests(TestCase): self.farm_type = FarmType.objects.create(name="مرکبات") self.farm = FarmHub.objects.create(owner=self.user, farm_type=self.farm_type, name="Farm Alerts") - @patch("farm_alerts.views.external_api_request") - def test_tracker_persists_incoming_alerts_and_sends_recent_notifications_to_ai(self, mock_external_api_request): - recent_notification = FarmNotification.objects.create( + def test_tracker_returns_cached_snapshot_without_accepting_alerts(self): + FarmNotification.objects.create( farm=self.farm, endpoint="tracker", - title="Recent alert", - message="Recent notification", + title="AI alert", + message="Cached notification", level="warning", ) - old_notification = FarmNotification.objects.create( + FarmAlertTrackerSnapshot.objects.create( farm=self.farm, - endpoint="tracker", - title="Old alert", - message="Old notification", - level="info", - ) - FarmNotification.objects.filter(id=old_notification.id).update(created_at=timezone.now() - timedelta(days=4)) - old_notification.refresh_from_db() - - mock_external_api_request.return_value = AdapterResponse( - status_code=200, - data={ - "data": { - "headline": "وضعیت هشدارها", - "overview": "دو مورد نیاز به پیگیری دارد.", - "status_level": "warning", - "tracker": {"active": 2}, - "notifications": [ - { - "title": "افت رطوبت خاک", - "message": "تنش رطوبتی ادامه دارد.", - "level": "warning", - "suggested_action": "آبیاری جبرانی انجام شود.", - "source_alert_id": "soil-1", - "source_metric_type": "moisture", - "payload": {"current_value": 38.5}, - } - ], - "structured_context": {"source": "ai"}, - } - }, - ) - - request = self.factory.post( - "/api/farm-alerts/tracker/", - { - "farm_uuid": str(self.farm.farm_uuid), - "alerts": [ - { - "alert_id": "soil-1", - "level": "danger", - "title": "افت رطوبت خاک", - "message": "رطوبت خاک کمتر از حد مطلوب است.", - "suggested_action": "آبیاری اصلاحی بررسی شود.", - "source_metric_type": "moisture", - "payload": {"current_value": 38.5}, - } - ], - }, - format="json", - ) - force_authenticate(request, user=self.user) - - response = AlertTrackerView.as_view()(request) - - self.assertEqual(response.status_code, 200) - self.assertEqual(FarmAlert.objects.filter(farm=self.farm).count(), 1) - - saved_alert = FarmAlert.objects.get(farm=self.farm) - self.assertEqual(saved_alert.external_alert_id, "soil-1") - self.assertEqual(saved_alert.color, "error") - self.assertEqual(saved_alert.source_metric_type, "moisture") - - mock_external_api_request.assert_called_once() - outbound_payload = mock_external_api_request.call_args.kwargs["payload"] - self.assertEqual(outbound_payload["farm_uuid"], str(self.farm.farm_uuid)) - self.assertEqual(len(outbound_payload["alerts"]), 1) - self.assertEqual(len(outbound_payload["recent_notifications"]), 1) - self.assertEqual(outbound_payload["recent_notifications"][0]["id"], recent_notification.id) - - self.assertEqual(response.data["data"]["headline"], "وضعیت هشدارها") - self.assertEqual(response.data["data"]["status_level"], "warning") - self.assertEqual(len(response.data["data"]["notifications"]), 1) - self.assertEqual(response.data["data"]["notifications"][0]["endpoint"], "tracker") - - persisted_notification = FarmNotification.objects.filter( - farm=self.farm, - title="افت رطوبت خاک", - endpoint="tracker", - ).latest("id") - self.assertEqual(persisted_notification.source_alert_id, "soil-1") - self.assertEqual(persisted_notification.suggested_action, "آبیاری جبرانی انجام شود.") - - @patch("farm_alerts.views.external_api_request") - def test_tracker_limits_recent_notifications_to_ten(self, mock_external_api_request): - for index in range(12): - FarmNotification.objects.create( - farm=self.farm, - endpoint="tracker", - title=f"Notification {index}", - message="msg", - ) - - mock_external_api_request.return_value = AdapterResponse( - status_code=200, - data={"data": {"headline": "", "overview": "", "status_level": "info", "notifications": []}}, + headline="وضعیت هشدارها", + overview="دو مورد نیاز به پیگیری دارد.", + status_level="warning", + tracker={"active": 2}, + raw_llm_response='{"headline":"cached"}', + structured_context={"source": "ai"}, ) request = self.factory.post( @@ -180,8 +85,27 @@ class FarmAlertsTrackerViewTests(TestCase): response = AlertTrackerView.as_view()(request) self.assertEqual(response.status_code, 200) - outbound_payload = mock_external_api_request.call_args.kwargs["payload"] - self.assertEqual(len(outbound_payload["recent_notifications"]), 10) + self.assertEqual(response.data["data"]["headline"], "وضعیت هشدارها") + self.assertEqual(response.data["data"]["status_level"], "warning") + self.assertEqual(len(response.data["data"]["notifications"]), 1) + self.assertEqual(response.data["data"]["notifications"][0]["endpoint"], "tracker") + + def test_tracker_limits_cached_notifications_to_ten(self): + for index in range(12): + FarmNotification.objects.create(farm=self.farm, endpoint="tracker", title=f"Notification {index}", message="msg") + FarmAlertTrackerSnapshot.objects.create(farm=self.farm) + + request = self.factory.post( + "/api/farm-alerts/tracker/", + {"farm_uuid": str(self.farm.farm_uuid)}, + format="json", + ) + force_authenticate(request, user=self.user) + + response = AlertTrackerView.as_view()(request) + + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data["data"]["notifications"]), 10) def test_tracker_rejects_unowned_farm(self): request = self.factory.post( @@ -195,3 +119,90 @@ class FarmAlertsTrackerViewTests(TestCase): self.assertEqual(response.status_code, 400) self.assertEqual(response.data["farm_uuid"][0], "Farm not found.") + + @patch("farm_alerts.services.external_api_request") + def test_sync_task_sends_last_five_notifications_to_ai_and_updates_snapshot(self, mock_external_api_request): + for index in range(6): + FarmNotification.objects.create( + farm=self.farm, + endpoint="irrigation", + title=f"Irrigation reminder {index}", + message=f"Run irrigation cycle {index}", + level="info" if index % 2 == 0 else "warning", + ) + FarmNotification.objects.create( + farm=self.farm, + endpoint="irrigation", + title="AI generated tracker notice", + message="Should be excluded from AI input", + level="info", + metadata={"source": "farm_alerts_tracker_ai"}, + ) + + mock_external_api_request.return_value = AdapterResponse( + status_code=200, + data={ + "data": { + "headline": "وضعیت جدید", + "overview": "یک تغییر جدید شناسایی شد.", + "status_level": "warning", + "tracker": {"active": 1}, + "notifications": [ + { + "title": "افت رطوبت خاک", + "message": "تنش رطوبتی ادامه دارد.", + "level": "warning", + "suggested_action": "آبیاری جبرانی انجام شود.", + "source_alert_id": "soil-1", + } + ], + "structured_context": {"source": "ai"}, + } + }, + ) + + result = sync_farm_tracker_with_ai(farm=self.farm) + + self.assertEqual(result["status"], "synced") + mock_external_api_request.assert_called_once() + outbound_payload = mock_external_api_request.call_args.kwargs["payload"] + self.assertEqual(outbound_payload["farm_uuid"], str(self.farm.farm_uuid)) + self.assertNotIn("alerts", outbound_payload) + self.assertEqual(len(outbound_payload["recent_notifications"]), 5) + self.assertEqual(outbound_payload["recent_notifications"][0]["title"], "Irrigation reminder 5") + self.assertEqual(outbound_payload["recent_notifications"][-1]["title"], "Irrigation reminder 1") + + snapshot = FarmAlertTrackerSnapshot.objects.get(farm=self.farm) + self.assertEqual(snapshot.headline, "وضعیت جدید") + self.assertEqual(snapshot.status_level, "warning") + self.assertIsNotNone(snapshot.last_ai_synced_at) + self.assertIsNotNone(snapshot.last_source_update_at) + + persisted_notification = FarmNotification.objects.filter( + farm=self.farm, + title="افت رطوبت خاک", + endpoint="tracker", + ).latest("id") + self.assertEqual(persisted_notification.metadata["source"], "farm_alerts_tracker_ai") + + @patch("farm_alerts.services.external_api_request") + def test_sync_task_skips_ai_when_no_new_data_exists(self, mock_external_api_request): + snapshot = FarmAlertTrackerSnapshot.objects.create( + farm=self.farm, + last_ai_synced_at=timezone.now(), + last_source_update_at=timezone.now(), + ) + notification = FarmNotification.objects.create( + farm=self.farm, + endpoint="irrigation", + title="Irrigation reminder", + message="Run irrigation cycle", + level="warning", + ) + FarmNotification.objects.filter(id=notification.id).update(updated_at=snapshot.last_source_update_at - timedelta(minutes=1)) + + result = sync_farm_tracker_with_ai(farm=self.farm) + + self.assertEqual(result["status"], "skipped") + self.assertEqual(result["reason"], "no_changes") + mock_external_api_request.assert_not_called() diff --git a/farm_alerts/views.py b/farm_alerts/views.py index cbf974f..39ed222 100644 --- a/farm_alerts/views.py +++ b/farm_alerts/views.py @@ -1,3 +1,5 @@ +import logging + from drf_spectacular.utils import OpenApiExample, extend_schema from rest_framework import serializers, status from rest_framework.permissions import IsAuthenticated @@ -5,45 +7,17 @@ from rest_framework.response import Response from rest_framework.views import APIView from config.swagger import code_response -from external_api_adapter import request as external_api_request from farm_hub.models import FarmHub from .serializers import AlertTrackerAIResponseSerializer, FarmAlertsTrackerRequestSerializer -from .services import AlertService, build_tracker_context, build_tracker_response +from .services import AlertService, build_tracker_response_from_snapshot + +logger = logging.getLogger("farm_alerts") class FarmAlertsBaseView(APIView): permission_classes = [IsAuthenticated] - @staticmethod - def _extract_result(adapter_data): - if not isinstance(adapter_data, dict): - return {} - - data = adapter_data.get("data") - if isinstance(data, dict) and isinstance(data.get("result"), dict): - return data["result"] - if isinstance(data, dict): - return data - - result = adapter_data.get("result") - if isinstance(result, dict): - return result - - return adapter_data - - @staticmethod - def _error_response(adapter_response): - response_data = ( - adapter_response.data - if isinstance(adapter_response.data, dict) - else {"message": str(adapter_response.data)} - ) - return Response( - {"code": adapter_response.status_code, "msg": "error", "data": response_data}, - status=adapter_response.status_code, - ) - @staticmethod def _get_farm(request, farm_uuid): if not farm_uuid: @@ -63,16 +37,6 @@ class AlertTrackerView(FarmAlertsBaseView): "Tracker Request", value={ "farm_uuid": "11111111-1111-1111-1111-111111111111", - "alerts": [ - { - "alert_id": "soil-moisture-001", - "level": "warning", - "title": "افت رطوبت خاک", - "message": "رطوبت خاک کمتر از حد مطلوب گزارش شده است.", - "suggested_action": "آبیاری اصلاحی بررسی شود.", - "source_metric_type": "moisture", - } - ], }, request_only=True, ) @@ -84,20 +48,17 @@ class AlertTrackerView(FarmAlertsBaseView): request_serializer.is_valid(raise_exception=True) farm = self._get_farm(request, request_serializer.validated_data["farm_uuid"]) - incoming_alerts = request_serializer.validated_data.get("alerts", []) - AlertService.persist_incoming_alerts(farm=farm, alerts=incoming_alerts) - - tracker_payload = build_tracker_context(farm=farm, alerts=incoming_alerts) - adapter_response = external_api_request( - "ai", - "/api/farm-alerts/tracker/", - method="POST", - payload=tracker_payload, + logger.info( + "tracker endpoint received request farm=%s payload=%s", + farm.farm_uuid, + request.data, ) - if adapter_response.status_code >= 400: - return self._error_response(adapter_response) - payload = self._extract_result(adapter_response.data) - response_data = build_tracker_response(farm=farm, adapter_payload=payload) + response_data = build_tracker_response_from_snapshot(farm=farm) + logger.info( + "tracker endpoint returning cached response farm=%s response=%s", + farm.farm_uuid, + response_data, + ) serializer = AlertTrackerAIResponseSerializer(instance=response_data) return Response({"code": 200, "msg": "success", "data": serializer.data}, status=status.HTTP_200_OK)