From 26a587fe09f3d1216eb40fe06a25034bcdf4c5e5 Mon Sep 17 00:00:00 2001 From: Mohammad Sajad Pourajam Date: Sun, 5 Apr 2026 00:57:50 +0330 Subject: [PATCH] UPDATE --- config/urls.py | 3 + docker-compose-prod.yaml | 19 ++ docker-compose.yaml | 19 ++ ingest/admin.py | 10 - ingest/apps.py | 6 - ingest/cassandra.py | 108 ----------- ingest/constants.py | 14 ++ ingest/{migrations => management}/__init__.py | 0 ingest/management/commands/__init__.py | 0 .../management/commands/send_sensor_data.py | 72 +++++++ ingest/migrations/0001_initial.py | 27 --- ingest/models.py | 21 -- ingest/serializers.py | 38 ---- ingest/services.py | 10 - ingest/templates/ingest/index.html | 166 ++++++++++++++++ ingest/tests.py | 43 ----- ingest/urls.py | 6 +- ingest/views.py | 180 +++++++++--------- 18 files changed, 381 insertions(+), 361 deletions(-) delete mode 100644 ingest/admin.py delete mode 100644 ingest/apps.py delete mode 100644 ingest/cassandra.py create mode 100644 ingest/constants.py rename ingest/{migrations => management}/__init__.py (100%) create mode 100644 ingest/management/commands/__init__.py create mode 100644 ingest/management/commands/send_sensor_data.py delete mode 100644 ingest/migrations/0001_initial.py delete mode 100644 ingest/models.py delete mode 100644 ingest/serializers.py delete mode 100644 ingest/services.py create mode 100644 ingest/templates/ingest/index.html delete mode 100644 ingest/tests.py diff --git a/config/urls.py b/config/urls.py index c215692..4d33509 100644 --- a/config/urls.py +++ b/config/urls.py @@ -1,7 +1,10 @@ from django.contrib import admin from django.urls import include, path +from ingest.views import SensorSimulatorAppView + urlpatterns = [ path("admin/", admin.site.urls), + path("", SensorSimulatorAppView.as_view(), name="home"), path("api/ingest/", include("ingest.urls")), ] diff --git a/docker-compose-prod.yaml b/docker-compose-prod.yaml index 09dcd22..d971535 100644 --- a/docker-compose-prod.yaml +++ b/docker-compose-prod.yaml @@ -36,6 +36,25 @@ services: networks: - sensor-network + sensor-sender: + build: + context: . + dockerfile: Dockerfile + container_name: sensor-hub-sender + command: python manage.py send_sensor_data + restart: always + env_file: + - .env + environment: + DB_HOST: db + depends_on: + web: + condition: service_started + db: + condition: service_healthy + networks: + - sensor-network + volumes: sensor_hub_mysql_data: diff --git a/docker-compose.yaml b/docker-compose.yaml index 3146e4c..d6eee6f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -63,6 +63,25 @@ services: cassandra: condition: service_started + sensor-sender: + build: . + container_name: sensor-hub-sender + command: python manage.py send_sensor_data + volumes: + - .:/app + env_file: + - .env + environment: + DB_HOST: db + CASSANDRA_HOSTS: cassandra + depends_on: + web: + condition: service_started + db: + condition: service_healthy + cassandra: + condition: service_started + volumes: sensor_hub_mysql_data: sensor_hub_cassandra_data: diff --git a/ingest/admin.py b/ingest/admin.py deleted file mode 100644 index 09de4be..0000000 --- a/ingest/admin.py +++ /dev/null @@ -1,10 +0,0 @@ -from django.contrib import admin - -from .models import SensorDevice - - -@admin.register(SensorDevice) -class SensorDeviceAdmin(admin.ModelAdmin): - list_display = ("device_identifier", "sensor_uuid", "is_active", "created_at") - search_fields = ("device_identifier", "sensor_uuid") - list_filter = ("is_active", "created_at") diff --git a/ingest/apps.py b/ingest/apps.py deleted file mode 100644 index 3108d2e..0000000 --- a/ingest/apps.py +++ /dev/null @@ -1,6 +0,0 @@ -from django.apps import AppConfig - - -class IngestConfig(AppConfig): - default_auto_field = "django.db.models.BigAutoField" - name = "ingest" diff --git a/ingest/cassandra.py b/ingest/cassandra.py deleted file mode 100644 index 7a009a4..0000000 --- a/ingest/cassandra.py +++ /dev/null @@ -1,108 +0,0 @@ -import logging -import os -from typing import Any - -logger = logging.getLogger(__name__) - -try: - from cassandra.cluster import Cluster - from cassandra.query import dict_factory -except Exception: - Cluster = None - dict_factory = None - - -class CassandraService: - def __init__(self) -> None: - self.enabled = os.environ.get("CASSANDRA_ENABLED", "0") == "1" - self.contact_points = [ - item.strip() - for item in os.environ.get("CASSANDRA_HOSTS", "cassandra").split(",") - if item.strip() - ] - self.port = int(os.environ.get("CASSANDRA_PORT", "9042")) - self.keyspace = os.environ.get("CASSANDRA_KEYSPACE", "sensor_hub") - self.replication = os.environ.get( - "CASSANDRA_REPLICATION", - "{'class': 'SimpleStrategy', 'replication_factor': 1}", - ) - self._session = None - - def is_available(self) -> bool: - return self.enabled and Cluster is not None - - def connect(self): - if self._session is not None: - return self._session - if not self.is_available(): - return None - - cluster = Cluster(contact_points=self.contact_points, port=self.port) - session = cluster.connect() - if dict_factory is not None: - session.row_factory = dict_factory - session.execute( - f"CREATE KEYSPACE IF NOT EXISTS {self.keyspace} " - f"WITH replication = {self.replication}" - ) - session.set_keyspace(self.keyspace) - session.execute( - """ - CREATE TABLE IF NOT EXISTS sensor_payloads ( - sensor_uuid text, - reading_at timestamp, - payload_id uuid, - original_payload text, - translated_payload text, - translation_status text, - created_at timestamp, - PRIMARY KEY ((sensor_uuid), reading_at, payload_id) - ) WITH CLUSTERING ORDER BY (reading_at DESC, payload_id DESC) - """ - ) - self._session = session - return self._session - - def save_payload(self, *, sensor_uuid: str, payload_id: Any, reading_at, original_payload: str, translated_payload: str, translation_status: str, created_at): - session = self.connect() - if session is None: - logger.warning("Cassandra is disabled or unavailable; skipping payload persistence.") - return False - - session.execute( - """ - INSERT INTO sensor_payloads ( - sensor_uuid, - reading_at, - payload_id, - original_payload, - translated_payload, - translation_status, - created_at - ) VALUES (%s, %s, %s, %s, %s, %s, %s) - """, - ( - sensor_uuid, - reading_at, - payload_id, - original_payload, - translated_payload, - translation_status, - created_at, - ), - ) - return True - - def get_payloads(self, sensor_uuid: str, limit: int = 50): - session = self.connect() - if session is None: - return [] - rows = session.execute( - "SELECT sensor_uuid, reading_at, payload_id, original_payload, translated_payload, translation_status, created_at " - "FROM sensor_payloads WHERE sensor_uuid = %s LIMIT %s", - (sensor_uuid, limit), - ) - return list(rows) - - -cassandra_service = CassandraService() diff --git a/ingest/constants.py b/ingest/constants.py new file mode 100644 index 0000000..41f05fb --- /dev/null +++ b/ingest/constants.py @@ -0,0 +1,14 @@ +API_TARGET_URL = "http://backend-web:8000" +API_KEY = "12345" +REQUEST_INTERVAL_SECONDS = 10 + +STATIC_SENSOR_PAYLOAD = { + "uuid": "11111111111111111111", + "soil_moisture": 42.5, + "soil_temperature": 24.3, + "soil_ph": 6.8, + "soil_ec": 1.4, + "nitrogen": 32, + "phosphorus": 18, + "potassium": 27, +} diff --git a/ingest/migrations/__init__.py b/ingest/management/__init__.py similarity index 100% rename from ingest/migrations/__init__.py rename to ingest/management/__init__.py diff --git a/ingest/management/commands/__init__.py b/ingest/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ingest/management/commands/send_sensor_data.py b/ingest/management/commands/send_sensor_data.py new file mode 100644 index 0000000..9489807 --- /dev/null +++ b/ingest/management/commands/send_sensor_data.py @@ -0,0 +1,72 @@ +import json +import time +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + +from django.core.management.base import BaseCommand + +from ingest.constants import API_KEY, API_TARGET_URL, REQUEST_INTERVAL_SECONDS, STATIC_SENSOR_PAYLOAD + + +class Command(BaseCommand): + help = "Send the static soil sensor payload to the upstream API every 10 seconds." + + def add_arguments(self, parser): + parser.add_argument( + "--once", + action="store_true", + help="Send the request once and exit.", + ) + + def handle(self, *args, **options): + run_once = options["once"] + + self.stdout.write( + self.style.SUCCESS( + f"Starting sensor sender -> {API_TARGET_URL} (interval: {REQUEST_INTERVAL_SECONDS}s)" + ) + ) + + while True: + self.send_payload() + if run_once: + break + time.sleep(REQUEST_INTERVAL_SECONDS) + + def send_payload(self): + body = json.dumps(STATIC_SENSOR_PAYLOAD).encode("utf-8") + request = Request( + API_TARGET_URL, + data=body, + headers={ + "Content-Type": "application/json", + "api_key": API_KEY, + }, + method="POST", + ) + + try: + with urlopen(request, timeout=15) as response: + response_body = response.read().decode("utf-8", errors="replace") + self.stdout.write( + self.style.SUCCESS( + f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Sent payload successfully - status {response.status}" + ) + ) + if response_body: + self.stdout.write(response_body) + except HTTPError as exc: + error_body = exc.read().decode("utf-8", errors="replace") + self.stderr.write( + self.style.ERROR( + f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Upstream error - status {exc.code}" + ) + ) + if error_body: + self.stderr.write(error_body) + except URLError as exc: + self.stderr.write( + self.style.ERROR( + f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Connection error - {exc.reason}" + ) + ) diff --git a/ingest/migrations/0001_initial.py b/ingest/migrations/0001_initial.py deleted file mode 100644 index 2fa08eb..0000000 --- a/ingest/migrations/0001_initial.py +++ /dev/null @@ -1,27 +0,0 @@ -from django.db import migrations, models -import uuid - - -class Migration(migrations.Migration): - initial = True - - dependencies = [] - - operations = [ - migrations.CreateModel( - name="SensorDevice", - fields=[ - ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), - ("sensor_uuid", models.UUIDField(default=uuid.uuid4, editable=False, unique=True)), - ("device_identifier", models.CharField(max_length=255, unique=True)), - ("device_name", models.CharField(blank=True, max_length=255)), - ("handshake_token", models.CharField(blank=True, max_length=255)), - ("metadata", models.JSONField(blank=True, default=dict)), - ("translation_config", models.JSONField(blank=True, default=dict)), - ("is_active", models.BooleanField(default=True)), - ("created_at", models.DateTimeField(auto_now_add=True)), - ("updated_at", models.DateTimeField(auto_now=True)), - ], - options={"ordering": ["-created_at"]}, - ), - ] diff --git a/ingest/models.py b/ingest/models.py deleted file mode 100644 index 96f7e53..0000000 --- a/ingest/models.py +++ /dev/null @@ -1,21 +0,0 @@ -import uuid - -from django.db import models - - -class SensorDevice(models.Model): - sensor_uuid = models.UUIDField(default=uuid.uuid4, unique=True, editable=False) - device_identifier = models.CharField(max_length=255, unique=True) - device_name = models.CharField(max_length=255, blank=True) - handshake_token = models.CharField(max_length=255, blank=True) - metadata = models.JSONField(default=dict, blank=True) - translation_config = models.JSONField(default=dict, blank=True) - is_active = models.BooleanField(default=True) - created_at = models.DateTimeField(auto_now_add=True) - updated_at = models.DateTimeField(auto_now=True) - - class Meta: - ordering = ["-created_at"] - - def __str__(self) -> str: - return f"{self.device_identifier} ({self.sensor_uuid})" diff --git a/ingest/serializers.py b/ingest/serializers.py deleted file mode 100644 index 9f7e114..0000000 --- a/ingest/serializers.py +++ /dev/null @@ -1,38 +0,0 @@ -from rest_framework import serializers - -from .models import SensorDevice - - -class HandshakeSerializer(serializers.Serializer): - device_identifier = serializers.CharField(max_length=255) - device_name = serializers.CharField(max_length=255, required=False, allow_blank=True) - handshake_token = serializers.CharField(max_length=255, required=False, allow_blank=True) - metadata = serializers.JSONField(required=False) - translation_config = serializers.JSONField(required=False) - - -class SensorIngestSerializer(serializers.Serializer): - sensor_uuid = serializers.UUIDField() - reading_at = serializers.DateTimeField(required=False) - payload = serializers.JSONField() - - -class SensorPayloadQuerySerializer(serializers.Serializer): - limit = serializers.IntegerField(required=False, min_value=1, max_value=500, default=50) - - -class SensorDeviceSerializer(serializers.ModelSerializer): - class Meta: - model = SensorDevice - fields = ( - "sensor_uuid", - "device_identifier", - "device_name", - "handshake_token", - "metadata", - "translation_config", - "is_active", - "created_at", - "updated_at", - ) - read_only_fields = ("sensor_uuid", "created_at", "updated_at") diff --git a/ingest/services.py b/ingest/services.py deleted file mode 100644 index 3e1940b..0000000 --- a/ingest/services.py +++ /dev/null @@ -1,10 +0,0 @@ -from copy import deepcopy - - -class TranslationService: - def translate(self, payload: dict, config: dict | None = None) -> tuple[dict, str]: - translated = deepcopy(payload) - return translated, "bypassed" - - -translation_service = TranslationService() diff --git a/ingest/templates/ingest/index.html b/ingest/templates/ingest/index.html new file mode 100644 index 0000000..8e14672 --- /dev/null +++ b/ingest/templates/ingest/index.html @@ -0,0 +1,166 @@ + + + + + + شبیه ساز سنسور خاک + + + +
+
+

ارسال استاتیک داده سنسور خاک

+

+ این صفحه بدون هیچ اتصال به دیتابیس، یک payload استاتیک از داده های سنسور خاک را با متد POST + و هدر api_key به API مقصد ارسال می کند. +

+
+ +
+
+ + + + + + + + +

+ فیلدها شامل uuid، رطوبت خاک، دمای خاک، pH، EC، نیتروژن، فسفر و پتاسیم هستند و فعلا همه به صورت استاتیک تعریف شده اند. +

+ +
+ +
+
+ +
+ +
هنوز درخواستی ارسال نشده است.
+

+ در پاسخ، payload ارسالی، هدرهای ارسال شده و پاسخ API مقصد نمایش داده می شود. +

+
+
+
+ + + + diff --git a/ingest/tests.py b/ingest/tests.py deleted file mode 100644 index c3a1cb9..0000000 --- a/ingest/tests.py +++ /dev/null @@ -1,43 +0,0 @@ -from unittest.mock import patch - -from django.test import TestCase -from rest_framework.test import APIClient - -from .models import SensorDevice - - -class IngestApiTests(TestCase): - def setUp(self): - self.client = APIClient() - - def test_handshake_returns_sensor_uuid(self): - response = self.client.post( - "/api/ingest/devices/handshake/", - { - "device_identifier": "device-001", - "device_name": "Field Sensor", - "metadata": {"farm": "north"}, - }, - format="json", - ) - - self.assertEqual(response.status_code, 201) - self.assertIn("sensor_uuid", response.data) - self.assertTrue(SensorDevice.objects.filter(device_identifier="device-001").exists()) - - @patch("ingest.views.cassandra_service.save_payload", return_value=True) - def test_ingest_uses_sensor_uuid(self, _mock_save): - device = SensorDevice.objects.create(device_identifier="device-002") - - response = self.client.post( - "/api/ingest/payloads/ingest/", - { - "sensor_uuid": str(device.sensor_uuid), - "payload": {"temperature": 24.1}, - }, - format="json", - ) - - self.assertEqual(response.status_code, 201) - self.assertEqual(response.data["sensor_uuid"], str(device.sensor_uuid)) - self.assertEqual(response.data["stored_in_cassandra"], True) diff --git a/ingest/urls.py b/ingest/urls.py index 5cb132e..1a17e56 100644 --- a/ingest/urls.py +++ b/ingest/urls.py @@ -1,9 +1,7 @@ from django.urls import path -from .views import DeviceHandshakeView, SensorIngestView, SensorPayloadListView +from .views import ForwardSensorDataView urlpatterns = [ - path("devices/handshake/", DeviceHandshakeView.as_view(), name="device-handshake"), - path("payloads/ingest/", SensorIngestView.as_view(), name="sensor-ingest"), - path("payloads//", SensorPayloadListView.as_view(), name="sensor-payload-list"), + path("forward/", ForwardSensorDataView.as_view(), name="forward-sensor-data"), ] diff --git a/ingest/views.py b/ingest/views.py index 8465fbc..3db564e 100644 --- a/ingest/views.py +++ b/ingest/views.py @@ -1,106 +1,98 @@ import json -import uuid -from datetime import timezone as dt_timezone +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen -from django.shortcuts import get_object_or_404 -from django.utils import timezone -from rest_framework import status -from rest_framework.response import Response -from rest_framework.views import APIView +from django.http import HttpResponse, JsonResponse +from django.shortcuts import render +from django.views import View +from django.views.decorators.csrf import csrf_exempt +from django.utils.decorators import method_decorator -from .cassandra import cassandra_service -from .models import SensorDevice -from .serializers import ( - HandshakeSerializer, - SensorDeviceSerializer, - SensorIngestSerializer, - SensorPayloadQuerySerializer, -) -from .services import translation_service +from ingest.constants import API_KEY, API_TARGET_URL, STATIC_SENSOR_PAYLOAD -class DeviceHandshakeView(APIView): +class SensorSimulatorAppView(View): + def get(self, request): + return render( + request, + "ingest/index.html", + { + "default_payload": json.dumps(STATIC_SENSOR_PAYLOAD, indent=2), + "default_url": API_TARGET_URL, + "default_api_key": API_KEY, + }, + ) + + +@method_decorator(csrf_exempt, name="dispatch") +class ForwardSensorDataView(View): def post(self, request): - serializer = HandshakeSerializer(data=request.data) - serializer.is_valid(raise_exception=True) - data = serializer.validated_data + target_url = request.POST.get("target_url", "").strip() + api_key = request.POST.get("api_key", "").strip() - device, created = SensorDevice.objects.update_or_create( - device_identifier=data["device_identifier"], - defaults={ - "device_name": data.get("device_name", ""), - "handshake_token": data.get("handshake_token", ""), - "metadata": data.get("metadata", {}), - "translation_config": data.get("translation_config", {}), - "is_active": True, + if not target_url: + return JsonResponse({"error": "target_url is required"}, status=400) + if not api_key: + return JsonResponse({"error": "api_key is required"}, status=400) + + payload = STATIC_SENSOR_PAYLOAD + body = json.dumps(payload).encode("utf-8") + outbound_request = Request( + target_url, + data=body, + headers={ + "Content-Type": "application/json", + "api_key": api_key, }, + method="POST", ) - response_status = status.HTTP_201_CREATED if created else status.HTTP_200_OK - return Response( - { - "message": "Handshake completed successfully.", - "sensor_uuid": str(device.sensor_uuid), - "device": SensorDeviceSerializer(device).data, - }, - status=response_status, - ) + try: + with urlopen(outbound_request, timeout=15) as response: + response_body = response.read().decode("utf-8") + content_type = response.headers.get("Content-Type", "") + parsed_body = response_body + if "application/json" in content_type and response_body: + parsed_body = json.loads(response_body) + return JsonResponse( + { + "status": response.status, + "sent_headers": { + "Content-Type": "application/json", + "api_key": api_key, + }, + "sent_payload": payload, + "response": parsed_body, + } + ) + except HTTPError as exc: + error_body = exc.read().decode("utf-8", errors="replace") + return JsonResponse( + { + "error": "upstream returned an error", + "status": exc.code, + "sent_headers": { + "Content-Type": "application/json", + "api_key": api_key, + }, + "sent_payload": payload, + "response": error_body, + }, + status=502, + ) + except URLError as exc: + return JsonResponse( + { + "error": "could not reach upstream api", + "details": str(exc.reason), + "sent_headers": { + "Content-Type": "application/json", + "api_key": api_key, + }, + "sent_payload": payload, + }, + status=502, + ) -class SensorIngestView(APIView): - def post(self, request): - serializer = SensorIngestSerializer(data=request.data) - serializer.is_valid(raise_exception=True) - data = serializer.validated_data - - device = get_object_or_404(SensorDevice, sensor_uuid=data["sensor_uuid"], is_active=True) - reading_at = data.get("reading_at") or timezone.now() - if reading_at.tzinfo is None: - reading_at = reading_at.replace(tzinfo=dt_timezone.utc) - - translated_payload, translation_status = translation_service.translate( - data["payload"], - config=device.translation_config, - ) - payload_id = uuid.uuid4() - created_at = timezone.now() - - stored = cassandra_service.save_payload( - sensor_uuid=str(device.sensor_uuid), - payload_id=payload_id, - reading_at=reading_at, - original_payload=json.dumps(data["payload"], ensure_ascii=False), - translated_payload=json.dumps(translated_payload, ensure_ascii=False), - translation_status=translation_status, - created_at=created_at, - ) - - return Response( - { - "message": "Payload processed successfully.", - "sensor_uuid": str(device.sensor_uuid), - "payload_id": str(payload_id), - "translation_status": translation_status, - "stored_in_cassandra": stored, - }, - status=status.HTTP_201_CREATED, - ) - - -class SensorPayloadListView(APIView): - def get(self, request, sensor_uuid): - query_serializer = SensorPayloadQuerySerializer(data=request.query_params) - query_serializer.is_valid(raise_exception=True) - limit = query_serializer.validated_data["limit"] - - device = get_object_or_404(SensorDevice, sensor_uuid=sensor_uuid, is_active=True) - rows = cassandra_service.get_payloads(str(device.sensor_uuid), limit=limit) - - return Response( - { - "sensor_uuid": str(device.sensor_uuid), - "device_identifier": device.device_identifier, - "count": len(rows), - "results": rows, - } - ) + return HttpResponse(status=500)