Files
SensorHub/ingest/tests.py
T

44 lines
1.4 KiB
Python
Raw Normal View History

2026-03-25 01:54:43 +03:30
from unittest.mock import patch
from django.test import TestCase
from rest_framework.test import APIClient
from .models import SensorDevice
class IngestApiTests(TestCase):
def setUp(self):
self.client = APIClient()
def test_handshake_returns_sensor_uuid(self):
response = self.client.post(
"/api/ingest/devices/handshake/",
{
"device_identifier": "device-001",
"device_name": "Field Sensor",
"metadata": {"farm": "north"},
},
format="json",
)
self.assertEqual(response.status_code, 201)
self.assertIn("sensor_uuid", response.data)
self.assertTrue(SensorDevice.objects.filter(device_identifier="device-001").exists())
@patch("ingest.views.cassandra_service.save_payload", return_value=True)
def test_ingest_uses_sensor_uuid(self, _mock_save):
device = SensorDevice.objects.create(device_identifier="device-002")
response = self.client.post(
"/api/ingest/payloads/ingest/",
{
"sensor_uuid": str(device.sensor_uuid),
"payload": {"temperature": 24.1},
},
format="json",
)
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data["sensor_uuid"], str(device.sensor_uuid))
self.assertEqual(response.data["stored_in_cassandra"], True)