diff --git a/.env.example b/.env.example index e9a0485..27134f0 100644 --- a/.env.example +++ b/.env.example @@ -13,3 +13,10 @@ DB_PORT=3306 # Optional: for running manage.py from host (local DB) # DB_HOST=127.0.0.1 + +# Cassandra +CASSANDRA_ENABLED=1 +CASSANDRA_HOSTS=cassandra +CASSANDRA_PORT=9042 +CASSANDRA_KEYSPACE=sensor_hub +CASSANDRA_REPLICATION={'class': 'SimpleStrategy', 'replication_factor': 1} diff --git a/config/settings.py b/config/settings.py index 3a4b2fe..467b490 100644 --- a/config/settings.py +++ b/config/settings.py @@ -18,10 +18,7 @@ INSTALLED_APPS = [ "django.contrib.sessions", "django.contrib.messages", "django.contrib.staticfiles", - "auth.apps.AuthConfig", - "account", - "sensor_hub", - "dashboard", + "ingest", "rest_framework", "corsheaders", ] @@ -98,9 +95,11 @@ REST_FRAMEWORK = { "DEFAULT_PERMISSION_CLASSES": [ "rest_framework.permissions.AllowAny", ], - "DEFAULT_AUTHENTICATION_CLASSES": [ - "rest_framework_simplejwt.authentication.JWTAuthentication", - ], } +if "rest_framework_simplejwt" in INSTALLED_APPS: + REST_FRAMEWORK["DEFAULT_AUTHENTICATION_CLASSES"] = [ + "rest_framework_simplejwt.authentication.JWTAuthentication", + ] + CORS_ALLOW_ALL_ORIGINS = DEBUG diff --git a/config/urls.py b/config/urls.py index 60db6b8..c215692 100644 --- a/config/urls.py +++ b/config/urls.py @@ -3,9 +3,5 @@ from django.urls import include, path urlpatterns = [ path("admin/", admin.site.urls), - path("api/auth/", include("auth.urls")), - path("api/account/", include("account.urls")), - path("api/sensor-hub/", include("sensor_hub.urls")), - path("api/farm-dashboard-config/", include("dashboard.urls_config")), - path("api/farm-dashboard/", include("dashboard.urls")), + path("api/ingest/", include("ingest.urls")), ] diff --git a/docker-compose.yaml b/docker-compose.yaml index 18b667d..3146e4c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -2,6 +2,19 @@ name: sensor-hub services: + cassandra: + image: docker-mirror.liara.ir/cassandra:5.0 + container_name: sensor-hub-cassandra + ports: + - "9042:9042" + volumes: + - sensor_hub_cassandra_data:/var/lib/cassandra + healthcheck: + test: ["CMD-SHELL", "cqlsh -e 'DESCRIBE KEYSPACES' || exit 1"] + interval: 20s + timeout: 10s + retries: 10 + db: image: docker-mirror.liara.ir/mysql:8.0 container_name: sensor-hub-db @@ -43,9 +56,13 @@ services: - .env environment: DB_HOST: db + CASSANDRA_HOSTS: cassandra depends_on: db: condition: service_healthy + cassandra: + condition: service_started volumes: sensor_hub_mysql_data: + sensor_hub_cassandra_data: diff --git a/ingest/__init__.py b/ingest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ingest/admin.py b/ingest/admin.py new file mode 100644 index 0000000..09de4be --- /dev/null +++ b/ingest/admin.py @@ -0,0 +1,10 @@ +from django.contrib import admin + +from .models import SensorDevice + + +@admin.register(SensorDevice) +class SensorDeviceAdmin(admin.ModelAdmin): + list_display = ("device_identifier", "sensor_uuid", "is_active", "created_at") + search_fields = ("device_identifier", "sensor_uuid") + list_filter = ("is_active", "created_at") diff --git a/ingest/apps.py b/ingest/apps.py new file mode 100644 index 0000000..3108d2e --- /dev/null +++ b/ingest/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class IngestConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "ingest" diff --git a/ingest/cassandra.py b/ingest/cassandra.py new file mode 100644 index 0000000..7a009a4 --- /dev/null +++ b/ingest/cassandra.py @@ -0,0 +1,108 @@ +import logging +import os +from typing import Any + +logger = logging.getLogger(__name__) + +try: + from cassandra.cluster import Cluster + from cassandra.query import dict_factory +except Exception: + Cluster = None + dict_factory = None + + +class CassandraService: + def __init__(self) -> None: + self.enabled = os.environ.get("CASSANDRA_ENABLED", "0") == "1" + self.contact_points = [ + item.strip() + for item in os.environ.get("CASSANDRA_HOSTS", "cassandra").split(",") + if item.strip() + ] + self.port = int(os.environ.get("CASSANDRA_PORT", "9042")) + self.keyspace = os.environ.get("CASSANDRA_KEYSPACE", "sensor_hub") + self.replication = os.environ.get( + "CASSANDRA_REPLICATION", + "{'class': 'SimpleStrategy', 'replication_factor': 1}", + ) + self._session = None + + def is_available(self) -> bool: + return self.enabled and Cluster is not None + + def connect(self): + if self._session is not None: + return self._session + if not self.is_available(): + return None + + cluster = Cluster(contact_points=self.contact_points, port=self.port) + session = cluster.connect() + if dict_factory is not None: + session.row_factory = dict_factory + session.execute( + f"CREATE KEYSPACE IF NOT EXISTS {self.keyspace} " + f"WITH replication = {self.replication}" + ) + session.set_keyspace(self.keyspace) + session.execute( + """ + CREATE TABLE IF NOT EXISTS sensor_payloads ( + sensor_uuid text, + reading_at timestamp, + payload_id uuid, + original_payload text, + translated_payload text, + translation_status text, + created_at timestamp, + PRIMARY KEY ((sensor_uuid), reading_at, payload_id) + ) WITH CLUSTERING ORDER BY (reading_at DESC, payload_id DESC) + """ + ) + self._session = session + return self._session + + def save_payload(self, *, sensor_uuid: str, payload_id: Any, reading_at, original_payload: str, translated_payload: str, translation_status: str, created_at): + session = self.connect() + if session is None: + logger.warning("Cassandra is disabled or unavailable; skipping payload persistence.") + return False + + session.execute( + """ + INSERT INTO sensor_payloads ( + sensor_uuid, + reading_at, + payload_id, + original_payload, + translated_payload, + translation_status, + created_at + ) VALUES (%s, %s, %s, %s, %s, %s, %s) + """, + ( + sensor_uuid, + reading_at, + payload_id, + original_payload, + translated_payload, + translation_status, + created_at, + ), + ) + return True + + def get_payloads(self, sensor_uuid: str, limit: int = 50): + session = self.connect() + if session is None: + return [] + rows = session.execute( + "SELECT sensor_uuid, reading_at, payload_id, original_payload, translated_payload, translation_status, created_at " + "FROM sensor_payloads WHERE sensor_uuid = %s LIMIT %s", + (sensor_uuid, limit), + ) + return list(rows) + + +cassandra_service = CassandraService() diff --git a/ingest/migrations/0001_initial.py b/ingest/migrations/0001_initial.py new file mode 100644 index 0000000..2fa08eb --- /dev/null +++ b/ingest/migrations/0001_initial.py @@ -0,0 +1,27 @@ +from django.db import migrations, models +import uuid + + +class Migration(migrations.Migration): + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="SensorDevice", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("sensor_uuid", models.UUIDField(default=uuid.uuid4, editable=False, unique=True)), + ("device_identifier", models.CharField(max_length=255, unique=True)), + ("device_name", models.CharField(blank=True, max_length=255)), + ("handshake_token", models.CharField(blank=True, max_length=255)), + ("metadata", models.JSONField(blank=True, default=dict)), + ("translation_config", models.JSONField(blank=True, default=dict)), + ("is_active", models.BooleanField(default=True)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ], + options={"ordering": ["-created_at"]}, + ), + ] diff --git a/ingest/migrations/__init__.py b/ingest/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ingest/models.py b/ingest/models.py new file mode 100644 index 0000000..96f7e53 --- /dev/null +++ b/ingest/models.py @@ -0,0 +1,21 @@ +import uuid + +from django.db import models + + +class SensorDevice(models.Model): + sensor_uuid = models.UUIDField(default=uuid.uuid4, unique=True, editable=False) + device_identifier = models.CharField(max_length=255, unique=True) + device_name = models.CharField(max_length=255, blank=True) + handshake_token = models.CharField(max_length=255, blank=True) + metadata = models.JSONField(default=dict, blank=True) + translation_config = models.JSONField(default=dict, blank=True) + is_active = models.BooleanField(default=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + ordering = ["-created_at"] + + def __str__(self) -> str: + return f"{self.device_identifier} ({self.sensor_uuid})" diff --git a/ingest/serializers.py b/ingest/serializers.py new file mode 100644 index 0000000..9f7e114 --- /dev/null +++ b/ingest/serializers.py @@ -0,0 +1,38 @@ +from rest_framework import serializers + +from .models import SensorDevice + + +class HandshakeSerializer(serializers.Serializer): + device_identifier = serializers.CharField(max_length=255) + device_name = serializers.CharField(max_length=255, required=False, allow_blank=True) + handshake_token = serializers.CharField(max_length=255, required=False, allow_blank=True) + metadata = serializers.JSONField(required=False) + translation_config = serializers.JSONField(required=False) + + +class SensorIngestSerializer(serializers.Serializer): + sensor_uuid = serializers.UUIDField() + reading_at = serializers.DateTimeField(required=False) + payload = serializers.JSONField() + + +class SensorPayloadQuerySerializer(serializers.Serializer): + limit = serializers.IntegerField(required=False, min_value=1, max_value=500, default=50) + + +class SensorDeviceSerializer(serializers.ModelSerializer): + class Meta: + model = SensorDevice + fields = ( + "sensor_uuid", + "device_identifier", + "device_name", + "handshake_token", + "metadata", + "translation_config", + "is_active", + "created_at", + "updated_at", + ) + read_only_fields = ("sensor_uuid", "created_at", "updated_at") diff --git a/ingest/services.py b/ingest/services.py new file mode 100644 index 0000000..3e1940b --- /dev/null +++ b/ingest/services.py @@ -0,0 +1,10 @@ +from copy import deepcopy + + +class TranslationService: + def translate(self, payload: dict, config: dict | None = None) -> tuple[dict, str]: + translated = deepcopy(payload) + return translated, "bypassed" + + +translation_service = TranslationService() diff --git a/ingest/tests.py b/ingest/tests.py new file mode 100644 index 0000000..c3a1cb9 --- /dev/null +++ b/ingest/tests.py @@ -0,0 +1,43 @@ +from unittest.mock import patch + +from django.test import TestCase +from rest_framework.test import APIClient + +from .models import SensorDevice + + +class IngestApiTests(TestCase): + def setUp(self): + self.client = APIClient() + + def test_handshake_returns_sensor_uuid(self): + response = self.client.post( + "/api/ingest/devices/handshake/", + { + "device_identifier": "device-001", + "device_name": "Field Sensor", + "metadata": {"farm": "north"}, + }, + format="json", + ) + + self.assertEqual(response.status_code, 201) + self.assertIn("sensor_uuid", response.data) + self.assertTrue(SensorDevice.objects.filter(device_identifier="device-001").exists()) + + @patch("ingest.views.cassandra_service.save_payload", return_value=True) + def test_ingest_uses_sensor_uuid(self, _mock_save): + device = SensorDevice.objects.create(device_identifier="device-002") + + response = self.client.post( + "/api/ingest/payloads/ingest/", + { + "sensor_uuid": str(device.sensor_uuid), + "payload": {"temperature": 24.1}, + }, + format="json", + ) + + self.assertEqual(response.status_code, 201) + self.assertEqual(response.data["sensor_uuid"], str(device.sensor_uuid)) + self.assertEqual(response.data["stored_in_cassandra"], True) diff --git a/ingest/urls.py b/ingest/urls.py new file mode 100644 index 0000000..5cb132e --- /dev/null +++ b/ingest/urls.py @@ -0,0 +1,9 @@ +from django.urls import path + +from .views import DeviceHandshakeView, SensorIngestView, SensorPayloadListView + +urlpatterns = [ + path("devices/handshake/", DeviceHandshakeView.as_view(), name="device-handshake"), + path("payloads/ingest/", SensorIngestView.as_view(), name="sensor-ingest"), + path("payloads//", SensorPayloadListView.as_view(), name="sensor-payload-list"), +] diff --git a/ingest/views.py b/ingest/views.py new file mode 100644 index 0000000..8465fbc --- /dev/null +++ b/ingest/views.py @@ -0,0 +1,106 @@ +import json +import uuid +from datetime import timezone as dt_timezone + +from django.shortcuts import get_object_or_404 +from django.utils import timezone +from rest_framework import status +from rest_framework.response import Response +from rest_framework.views import APIView + +from .cassandra import cassandra_service +from .models import SensorDevice +from .serializers import ( + HandshakeSerializer, + SensorDeviceSerializer, + SensorIngestSerializer, + SensorPayloadQuerySerializer, +) +from .services import translation_service + + +class DeviceHandshakeView(APIView): + def post(self, request): + serializer = HandshakeSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + data = serializer.validated_data + + device, created = SensorDevice.objects.update_or_create( + device_identifier=data["device_identifier"], + defaults={ + "device_name": data.get("device_name", ""), + "handshake_token": data.get("handshake_token", ""), + "metadata": data.get("metadata", {}), + "translation_config": data.get("translation_config", {}), + "is_active": True, + }, + ) + + response_status = status.HTTP_201_CREATED if created else status.HTTP_200_OK + return Response( + { + "message": "Handshake completed successfully.", + "sensor_uuid": str(device.sensor_uuid), + "device": SensorDeviceSerializer(device).data, + }, + status=response_status, + ) + + +class SensorIngestView(APIView): + def post(self, request): + serializer = SensorIngestSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + data = serializer.validated_data + + device = get_object_or_404(SensorDevice, sensor_uuid=data["sensor_uuid"], is_active=True) + reading_at = data.get("reading_at") or timezone.now() + if reading_at.tzinfo is None: + reading_at = reading_at.replace(tzinfo=dt_timezone.utc) + + translated_payload, translation_status = translation_service.translate( + data["payload"], + config=device.translation_config, + ) + payload_id = uuid.uuid4() + created_at = timezone.now() + + stored = cassandra_service.save_payload( + sensor_uuid=str(device.sensor_uuid), + payload_id=payload_id, + reading_at=reading_at, + original_payload=json.dumps(data["payload"], ensure_ascii=False), + translated_payload=json.dumps(translated_payload, ensure_ascii=False), + translation_status=translation_status, + created_at=created_at, + ) + + return Response( + { + "message": "Payload processed successfully.", + "sensor_uuid": str(device.sensor_uuid), + "payload_id": str(payload_id), + "translation_status": translation_status, + "stored_in_cassandra": stored, + }, + status=status.HTTP_201_CREATED, + ) + + +class SensorPayloadListView(APIView): + def get(self, request, sensor_uuid): + query_serializer = SensorPayloadQuerySerializer(data=request.query_params) + query_serializer.is_valid(raise_exception=True) + limit = query_serializer.validated_data["limit"] + + device = get_object_or_404(SensorDevice, sensor_uuid=sensor_uuid, is_active=True) + rows = cassandra_service.get_payloads(str(device.sensor_uuid), limit=limit) + + return Response( + { + "sensor_uuid": str(device.sensor_uuid), + "device_identifier": device.device_identifier, + "count": len(rows), + "results": rows, + } + ) diff --git a/requirements.txt b/requirements.txt index 1268019..e7d1cd8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,5 @@ gunicorn>=22,<23 python-dotenv>=1.0,<1.1 + +cassandra-driver>=3.29,<3.30