diff --git a/config/urls.py b/config/urls.py
index c215692..4d33509 100644
--- a/config/urls.py
+++ b/config/urls.py
@@ -1,7 +1,10 @@
from django.contrib import admin
from django.urls import include, path
+from ingest.views import SensorSimulatorAppView
+
urlpatterns = [
path("admin/", admin.site.urls),
+ path("", SensorSimulatorAppView.as_view(), name="home"),
path("api/ingest/", include("ingest.urls")),
]
diff --git a/docker-compose-prod.yaml b/docker-compose-prod.yaml
index 09dcd22..d971535 100644
--- a/docker-compose-prod.yaml
+++ b/docker-compose-prod.yaml
@@ -36,6 +36,25 @@ services:
networks:
- sensor-network
+ sensor-sender:
+ build:
+ context: .
+ dockerfile: Dockerfile
+ container_name: sensor-hub-sender
+ command: python manage.py send_sensor_data
+ restart: always
+ env_file:
+ - .env
+ environment:
+ DB_HOST: db
+ depends_on:
+ web:
+ condition: service_started
+ db:
+ condition: service_healthy
+ networks:
+ - sensor-network
+
volumes:
sensor_hub_mysql_data:
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 3146e4c..d6eee6f 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -63,6 +63,25 @@ services:
cassandra:
condition: service_started
+ sensor-sender:
+ build: .
+ container_name: sensor-hub-sender
+ command: python manage.py send_sensor_data
+ volumes:
+ - .:/app
+ env_file:
+ - .env
+ environment:
+ DB_HOST: db
+ CASSANDRA_HOSTS: cassandra
+ depends_on:
+ web:
+ condition: service_started
+ db:
+ condition: service_healthy
+ cassandra:
+ condition: service_started
+
volumes:
sensor_hub_mysql_data:
sensor_hub_cassandra_data:
diff --git a/ingest/admin.py b/ingest/admin.py
deleted file mode 100644
index 09de4be..0000000
--- a/ingest/admin.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from django.contrib import admin
-
-from .models import SensorDevice
-
-
-@admin.register(SensorDevice)
-class SensorDeviceAdmin(admin.ModelAdmin):
- list_display = ("device_identifier", "sensor_uuid", "is_active", "created_at")
- search_fields = ("device_identifier", "sensor_uuid")
- list_filter = ("is_active", "created_at")
diff --git a/ingest/apps.py b/ingest/apps.py
deleted file mode 100644
index 3108d2e..0000000
--- a/ingest/apps.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from django.apps import AppConfig
-
-
-class IngestConfig(AppConfig):
- default_auto_field = "django.db.models.BigAutoField"
- name = "ingest"
diff --git a/ingest/cassandra.py b/ingest/cassandra.py
deleted file mode 100644
index 7a009a4..0000000
--- a/ingest/cassandra.py
+++ /dev/null
@@ -1,108 +0,0 @@
-import logging
-import os
-from typing import Any
-
-logger = logging.getLogger(__name__)
-
-try:
- from cassandra.cluster import Cluster
- from cassandra.query import dict_factory
-except Exception:
- Cluster = None
- dict_factory = None
-
-
-class CassandraService:
- def __init__(self) -> None:
- self.enabled = os.environ.get("CASSANDRA_ENABLED", "0") == "1"
- self.contact_points = [
- item.strip()
- for item in os.environ.get("CASSANDRA_HOSTS", "cassandra").split(",")
- if item.strip()
- ]
- self.port = int(os.environ.get("CASSANDRA_PORT", "9042"))
- self.keyspace = os.environ.get("CASSANDRA_KEYSPACE", "sensor_hub")
- self.replication = os.environ.get(
- "CASSANDRA_REPLICATION",
- "{'class': 'SimpleStrategy', 'replication_factor': 1}",
- )
- self._session = None
-
- def is_available(self) -> bool:
- return self.enabled and Cluster is not None
-
- def connect(self):
- if self._session is not None:
- return self._session
- if not self.is_available():
- return None
-
- cluster = Cluster(contact_points=self.contact_points, port=self.port)
- session = cluster.connect()
- if dict_factory is not None:
- session.row_factory = dict_factory
- session.execute(
- f"CREATE KEYSPACE IF NOT EXISTS {self.keyspace} "
- f"WITH replication = {self.replication}"
- )
- session.set_keyspace(self.keyspace)
- session.execute(
- """
- CREATE TABLE IF NOT EXISTS sensor_payloads (
- sensor_uuid text,
- reading_at timestamp,
- payload_id uuid,
- original_payload text,
- translated_payload text,
- translation_status text,
- created_at timestamp,
- PRIMARY KEY ((sensor_uuid), reading_at, payload_id)
- ) WITH CLUSTERING ORDER BY (reading_at DESC, payload_id DESC)
- """
- )
- self._session = session
- return self._session
-
- def save_payload(self, *, sensor_uuid: str, payload_id: Any, reading_at, original_payload: str, translated_payload: str, translation_status: str, created_at):
- session = self.connect()
- if session is None:
- logger.warning("Cassandra is disabled or unavailable; skipping payload persistence.")
- return False
-
- session.execute(
- """
- INSERT INTO sensor_payloads (
- sensor_uuid,
- reading_at,
- payload_id,
- original_payload,
- translated_payload,
- translation_status,
- created_at
- ) VALUES (%s, %s, %s, %s, %s, %s, %s)
- """,
- (
- sensor_uuid,
- reading_at,
- payload_id,
- original_payload,
- translated_payload,
- translation_status,
- created_at,
- ),
- )
- return True
-
- def get_payloads(self, sensor_uuid: str, limit: int = 50):
- session = self.connect()
- if session is None:
- return []
- rows = session.execute(
- "SELECT sensor_uuid, reading_at, payload_id, original_payload, translated_payload, translation_status, created_at "
- "FROM sensor_payloads WHERE sensor_uuid = %s LIMIT %s",
- (sensor_uuid, limit),
- )
- return list(rows)
-
-
-cassandra_service = CassandraService()
diff --git a/ingest/constants.py b/ingest/constants.py
new file mode 100644
index 0000000..41f05fb
--- /dev/null
+++ b/ingest/constants.py
@@ -0,0 +1,14 @@
+API_TARGET_URL = "http://backend-web:8000"
+API_KEY = "12345"
+REQUEST_INTERVAL_SECONDS = 10
+
+STATIC_SENSOR_PAYLOAD = {
+ "uuid": "11111111111111111111",
+ "soil_moisture": 42.5,
+ "soil_temperature": 24.3,
+ "soil_ph": 6.8,
+ "soil_ec": 1.4,
+ "nitrogen": 32,
+ "phosphorus": 18,
+ "potassium": 27,
+}
diff --git a/ingest/migrations/__init__.py b/ingest/management/__init__.py
similarity index 100%
rename from ingest/migrations/__init__.py
rename to ingest/management/__init__.py
diff --git a/ingest/management/commands/__init__.py b/ingest/management/commands/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/ingest/management/commands/send_sensor_data.py b/ingest/management/commands/send_sensor_data.py
new file mode 100644
index 0000000..9489807
--- /dev/null
+++ b/ingest/management/commands/send_sensor_data.py
@@ -0,0 +1,72 @@
+import json
+import time
+from urllib.error import HTTPError, URLError
+from urllib.request import Request, urlopen
+
+from django.core.management.base import BaseCommand
+
+from ingest.constants import API_KEY, API_TARGET_URL, REQUEST_INTERVAL_SECONDS, STATIC_SENSOR_PAYLOAD
+
+
+class Command(BaseCommand):
+ help = "Send the static soil sensor payload to the upstream API every 10 seconds."
+
+ def add_arguments(self, parser):
+ parser.add_argument(
+ "--once",
+ action="store_true",
+ help="Send the request once and exit.",
+ )
+
+ def handle(self, *args, **options):
+ run_once = options["once"]
+
+ self.stdout.write(
+ self.style.SUCCESS(
+ f"Starting sensor sender -> {API_TARGET_URL} (interval: {REQUEST_INTERVAL_SECONDS}s)"
+ )
+ )
+
+ while True:
+ self.send_payload()
+ if run_once:
+ break
+ time.sleep(REQUEST_INTERVAL_SECONDS)
+
+ def send_payload(self):
+ body = json.dumps(STATIC_SENSOR_PAYLOAD).encode("utf-8")
+ request = Request(
+ API_TARGET_URL,
+ data=body,
+ headers={
+ "Content-Type": "application/json",
+ "api_key": API_KEY,
+ },
+ method="POST",
+ )
+
+ try:
+ with urlopen(request, timeout=15) as response:
+ response_body = response.read().decode("utf-8", errors="replace")
+ self.stdout.write(
+ self.style.SUCCESS(
+ f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Sent payload successfully - status {response.status}"
+ )
+ )
+ if response_body:
+ self.stdout.write(response_body)
+ except HTTPError as exc:
+ error_body = exc.read().decode("utf-8", errors="replace")
+ self.stderr.write(
+ self.style.ERROR(
+ f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Upstream error - status {exc.code}"
+ )
+ )
+ if error_body:
+ self.stderr.write(error_body)
+ except URLError as exc:
+ self.stderr.write(
+ self.style.ERROR(
+ f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Connection error - {exc.reason}"
+ )
+ )
diff --git a/ingest/migrations/0001_initial.py b/ingest/migrations/0001_initial.py
deleted file mode 100644
index 2fa08eb..0000000
--- a/ingest/migrations/0001_initial.py
+++ /dev/null
@@ -1,27 +0,0 @@
-from django.db import migrations, models
-import uuid
-
-
-class Migration(migrations.Migration):
- initial = True
-
- dependencies = []
-
- operations = [
- migrations.CreateModel(
- name="SensorDevice",
- fields=[
- ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
- ("sensor_uuid", models.UUIDField(default=uuid.uuid4, editable=False, unique=True)),
- ("device_identifier", models.CharField(max_length=255, unique=True)),
- ("device_name", models.CharField(blank=True, max_length=255)),
- ("handshake_token", models.CharField(blank=True, max_length=255)),
- ("metadata", models.JSONField(blank=True, default=dict)),
- ("translation_config", models.JSONField(blank=True, default=dict)),
- ("is_active", models.BooleanField(default=True)),
- ("created_at", models.DateTimeField(auto_now_add=True)),
- ("updated_at", models.DateTimeField(auto_now=True)),
- ],
- options={"ordering": ["-created_at"]},
- ),
- ]
diff --git a/ingest/models.py b/ingest/models.py
deleted file mode 100644
index 96f7e53..0000000
--- a/ingest/models.py
+++ /dev/null
@@ -1,21 +0,0 @@
-import uuid
-
-from django.db import models
-
-
-class SensorDevice(models.Model):
- sensor_uuid = models.UUIDField(default=uuid.uuid4, unique=True, editable=False)
- device_identifier = models.CharField(max_length=255, unique=True)
- device_name = models.CharField(max_length=255, blank=True)
- handshake_token = models.CharField(max_length=255, blank=True)
- metadata = models.JSONField(default=dict, blank=True)
- translation_config = models.JSONField(default=dict, blank=True)
- is_active = models.BooleanField(default=True)
- created_at = models.DateTimeField(auto_now_add=True)
- updated_at = models.DateTimeField(auto_now=True)
-
- class Meta:
- ordering = ["-created_at"]
-
- def __str__(self) -> str:
- return f"{self.device_identifier} ({self.sensor_uuid})"
diff --git a/ingest/serializers.py b/ingest/serializers.py
deleted file mode 100644
index 9f7e114..0000000
--- a/ingest/serializers.py
+++ /dev/null
@@ -1,38 +0,0 @@
-from rest_framework import serializers
-
-from .models import SensorDevice
-
-
-class HandshakeSerializer(serializers.Serializer):
- device_identifier = serializers.CharField(max_length=255)
- device_name = serializers.CharField(max_length=255, required=False, allow_blank=True)
- handshake_token = serializers.CharField(max_length=255, required=False, allow_blank=True)
- metadata = serializers.JSONField(required=False)
- translation_config = serializers.JSONField(required=False)
-
-
-class SensorIngestSerializer(serializers.Serializer):
- sensor_uuid = serializers.UUIDField()
- reading_at = serializers.DateTimeField(required=False)
- payload = serializers.JSONField()
-
-
-class SensorPayloadQuerySerializer(serializers.Serializer):
- limit = serializers.IntegerField(required=False, min_value=1, max_value=500, default=50)
-
-
-class SensorDeviceSerializer(serializers.ModelSerializer):
- class Meta:
- model = SensorDevice
- fields = (
- "sensor_uuid",
- "device_identifier",
- "device_name",
- "handshake_token",
- "metadata",
- "translation_config",
- "is_active",
- "created_at",
- "updated_at",
- )
- read_only_fields = ("sensor_uuid", "created_at", "updated_at")
diff --git a/ingest/services.py b/ingest/services.py
deleted file mode 100644
index 3e1940b..0000000
--- a/ingest/services.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from copy import deepcopy
-
-
-class TranslationService:
- def translate(self, payload: dict, config: dict | None = None) -> tuple[dict, str]:
- translated = deepcopy(payload)
- return translated, "bypassed"
-
-
-translation_service = TranslationService()
diff --git a/ingest/templates/ingest/index.html b/ingest/templates/ingest/index.html
new file mode 100644
index 0000000..8e14672
--- /dev/null
+++ b/ingest/templates/ingest/index.html
@@ -0,0 +1,166 @@
+
+
+
+
+
+ شبیه ساز سنسور خاک
+
+
+
+
+
+ ارسال استاتیک داده سنسور خاک
+
+ این صفحه بدون هیچ اتصال به دیتابیس، یک payload استاتیک از داده های سنسور خاک را با متد POST
+ و هدر api_key به API مقصد ارسال می کند.
+
+
+
+
+
+
+
+
+ هنوز درخواستی ارسال نشده است.
+
+ در پاسخ، payload ارسالی، هدرهای ارسال شده و پاسخ API مقصد نمایش داده می شود.
+
+
+
+
+
+
+
+
diff --git a/ingest/tests.py b/ingest/tests.py
deleted file mode 100644
index c3a1cb9..0000000
--- a/ingest/tests.py
+++ /dev/null
@@ -1,43 +0,0 @@
-from unittest.mock import patch
-
-from django.test import TestCase
-from rest_framework.test import APIClient
-
-from .models import SensorDevice
-
-
-class IngestApiTests(TestCase):
- def setUp(self):
- self.client = APIClient()
-
- def test_handshake_returns_sensor_uuid(self):
- response = self.client.post(
- "/api/ingest/devices/handshake/",
- {
- "device_identifier": "device-001",
- "device_name": "Field Sensor",
- "metadata": {"farm": "north"},
- },
- format="json",
- )
-
- self.assertEqual(response.status_code, 201)
- self.assertIn("sensor_uuid", response.data)
- self.assertTrue(SensorDevice.objects.filter(device_identifier="device-001").exists())
-
- @patch("ingest.views.cassandra_service.save_payload", return_value=True)
- def test_ingest_uses_sensor_uuid(self, _mock_save):
- device = SensorDevice.objects.create(device_identifier="device-002")
-
- response = self.client.post(
- "/api/ingest/payloads/ingest/",
- {
- "sensor_uuid": str(device.sensor_uuid),
- "payload": {"temperature": 24.1},
- },
- format="json",
- )
-
- self.assertEqual(response.status_code, 201)
- self.assertEqual(response.data["sensor_uuid"], str(device.sensor_uuid))
- self.assertEqual(response.data["stored_in_cassandra"], True)
diff --git a/ingest/urls.py b/ingest/urls.py
index 5cb132e..1a17e56 100644
--- a/ingest/urls.py
+++ b/ingest/urls.py
@@ -1,9 +1,7 @@
from django.urls import path
-from .views import DeviceHandshakeView, SensorIngestView, SensorPayloadListView
+from .views import ForwardSensorDataView
urlpatterns = [
- path("devices/handshake/", DeviceHandshakeView.as_view(), name="device-handshake"),
- path("payloads/ingest/", SensorIngestView.as_view(), name="sensor-ingest"),
- path("payloads//", SensorPayloadListView.as_view(), name="sensor-payload-list"),
+ path("forward/", ForwardSensorDataView.as_view(), name="forward-sensor-data"),
]
diff --git a/ingest/views.py b/ingest/views.py
index 8465fbc..3db564e 100644
--- a/ingest/views.py
+++ b/ingest/views.py
@@ -1,106 +1,98 @@
import json
-import uuid
-from datetime import timezone as dt_timezone
+from urllib.error import HTTPError, URLError
+from urllib.request import Request, urlopen
-from django.shortcuts import get_object_or_404
-from django.utils import timezone
-from rest_framework import status
-from rest_framework.response import Response
-from rest_framework.views import APIView
+from django.http import HttpResponse, JsonResponse
+from django.shortcuts import render
+from django.views import View
+from django.views.decorators.csrf import csrf_exempt
+from django.utils.decorators import method_decorator
-from .cassandra import cassandra_service
-from .models import SensorDevice
-from .serializers import (
- HandshakeSerializer,
- SensorDeviceSerializer,
- SensorIngestSerializer,
- SensorPayloadQuerySerializer,
-)
-from .services import translation_service
+from ingest.constants import API_KEY, API_TARGET_URL, STATIC_SENSOR_PAYLOAD
-class DeviceHandshakeView(APIView):
+class SensorSimulatorAppView(View):
+ def get(self, request):
+ return render(
+ request,
+ "ingest/index.html",
+ {
+ "default_payload": json.dumps(STATIC_SENSOR_PAYLOAD, indent=2),
+ "default_url": API_TARGET_URL,
+ "default_api_key": API_KEY,
+ },
+ )
+
+
+@method_decorator(csrf_exempt, name="dispatch")
+class ForwardSensorDataView(View):
def post(self, request):
- serializer = HandshakeSerializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- data = serializer.validated_data
+ target_url = request.POST.get("target_url", "").strip()
+ api_key = request.POST.get("api_key", "").strip()
- device, created = SensorDevice.objects.update_or_create(
- device_identifier=data["device_identifier"],
- defaults={
- "device_name": data.get("device_name", ""),
- "handshake_token": data.get("handshake_token", ""),
- "metadata": data.get("metadata", {}),
- "translation_config": data.get("translation_config", {}),
- "is_active": True,
+ if not target_url:
+ return JsonResponse({"error": "target_url is required"}, status=400)
+ if not api_key:
+ return JsonResponse({"error": "api_key is required"}, status=400)
+
+ payload = STATIC_SENSOR_PAYLOAD
+ body = json.dumps(payload).encode("utf-8")
+ outbound_request = Request(
+ target_url,
+ data=body,
+ headers={
+ "Content-Type": "application/json",
+ "api_key": api_key,
},
+ method="POST",
)
- response_status = status.HTTP_201_CREATED if created else status.HTTP_200_OK
- return Response(
- {
- "message": "Handshake completed successfully.",
- "sensor_uuid": str(device.sensor_uuid),
- "device": SensorDeviceSerializer(device).data,
- },
- status=response_status,
- )
+ try:
+ with urlopen(outbound_request, timeout=15) as response:
+ response_body = response.read().decode("utf-8")
+ content_type = response.headers.get("Content-Type", "")
+ parsed_body = response_body
+ if "application/json" in content_type and response_body:
+ parsed_body = json.loads(response_body)
+ return JsonResponse(
+ {
+ "status": response.status,
+ "sent_headers": {
+ "Content-Type": "application/json",
+ "api_key": api_key,
+ },
+ "sent_payload": payload,
+ "response": parsed_body,
+ }
+ )
+ except HTTPError as exc:
+ error_body = exc.read().decode("utf-8", errors="replace")
+ return JsonResponse(
+ {
+ "error": "upstream returned an error",
+ "status": exc.code,
+ "sent_headers": {
+ "Content-Type": "application/json",
+ "api_key": api_key,
+ },
+ "sent_payload": payload,
+ "response": error_body,
+ },
+ status=502,
+ )
+ except URLError as exc:
+ return JsonResponse(
+ {
+ "error": "could not reach upstream api",
+ "details": str(exc.reason),
+ "sent_headers": {
+ "Content-Type": "application/json",
+ "api_key": api_key,
+ },
+ "sent_payload": payload,
+ },
+ status=502,
+ )
-class SensorIngestView(APIView):
- def post(self, request):
- serializer = SensorIngestSerializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- data = serializer.validated_data
-
- device = get_object_or_404(SensorDevice, sensor_uuid=data["sensor_uuid"], is_active=True)
- reading_at = data.get("reading_at") or timezone.now()
- if reading_at.tzinfo is None:
- reading_at = reading_at.replace(tzinfo=dt_timezone.utc)
-
- translated_payload, translation_status = translation_service.translate(
- data["payload"],
- config=device.translation_config,
- )
- payload_id = uuid.uuid4()
- created_at = timezone.now()
-
- stored = cassandra_service.save_payload(
- sensor_uuid=str(device.sensor_uuid),
- payload_id=payload_id,
- reading_at=reading_at,
- original_payload=json.dumps(data["payload"], ensure_ascii=False),
- translated_payload=json.dumps(translated_payload, ensure_ascii=False),
- translation_status=translation_status,
- created_at=created_at,
- )
-
- return Response(
- {
- "message": "Payload processed successfully.",
- "sensor_uuid": str(device.sensor_uuid),
- "payload_id": str(payload_id),
- "translation_status": translation_status,
- "stored_in_cassandra": stored,
- },
- status=status.HTTP_201_CREATED,
- )
-
-
-class SensorPayloadListView(APIView):
- def get(self, request, sensor_uuid):
- query_serializer = SensorPayloadQuerySerializer(data=request.query_params)
- query_serializer.is_valid(raise_exception=True)
- limit = query_serializer.validated_data["limit"]
-
- device = get_object_or_404(SensorDevice, sensor_uuid=sensor_uuid, is_active=True)
- rows = cassandra_service.get_payloads(str(device.sensor_uuid), limit=limit)
-
- return Response(
- {
- "sensor_uuid": str(device.sensor_uuid),
- "device_identifier": device.device_identifier,
- "count": len(rows),
- "results": rows,
- }
- )
+ return HttpResponse(status=500)