UPDATE
This commit is contained in:
+86
-94
@@ -1,106 +1,98 @@
|
||||
import json
|
||||
import uuid
|
||||
from datetime import timezone as dt_timezone
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.utils import timezone
|
||||
from rest_framework import status
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.shortcuts import render
|
||||
from django.views import View
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.utils.decorators import method_decorator
|
||||
|
||||
from .cassandra import cassandra_service
|
||||
from .models import SensorDevice
|
||||
from .serializers import (
|
||||
HandshakeSerializer,
|
||||
SensorDeviceSerializer,
|
||||
SensorIngestSerializer,
|
||||
SensorPayloadQuerySerializer,
|
||||
)
|
||||
from .services import translation_service
|
||||
from ingest.constants import API_KEY, API_TARGET_URL, STATIC_SENSOR_PAYLOAD
|
||||
|
||||
|
||||
class DeviceHandshakeView(APIView):
|
||||
class SensorSimulatorAppView(View):
|
||||
def get(self, request):
|
||||
return render(
|
||||
request,
|
||||
"ingest/index.html",
|
||||
{
|
||||
"default_payload": json.dumps(STATIC_SENSOR_PAYLOAD, indent=2),
|
||||
"default_url": API_TARGET_URL,
|
||||
"default_api_key": API_KEY,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@method_decorator(csrf_exempt, name="dispatch")
|
||||
class ForwardSensorDataView(View):
|
||||
def post(self, request):
|
||||
serializer = HandshakeSerializer(data=request.data)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
data = serializer.validated_data
|
||||
target_url = request.POST.get("target_url", "").strip()
|
||||
api_key = request.POST.get("api_key", "").strip()
|
||||
|
||||
device, created = SensorDevice.objects.update_or_create(
|
||||
device_identifier=data["device_identifier"],
|
||||
defaults={
|
||||
"device_name": data.get("device_name", ""),
|
||||
"handshake_token": data.get("handshake_token", ""),
|
||||
"metadata": data.get("metadata", {}),
|
||||
"translation_config": data.get("translation_config", {}),
|
||||
"is_active": True,
|
||||
if not target_url:
|
||||
return JsonResponse({"error": "target_url is required"}, status=400)
|
||||
if not api_key:
|
||||
return JsonResponse({"error": "api_key is required"}, status=400)
|
||||
|
||||
payload = STATIC_SENSOR_PAYLOAD
|
||||
body = json.dumps(payload).encode("utf-8")
|
||||
outbound_request = Request(
|
||||
target_url,
|
||||
data=body,
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"api_key": api_key,
|
||||
},
|
||||
method="POST",
|
||||
)
|
||||
|
||||
response_status = status.HTTP_201_CREATED if created else status.HTTP_200_OK
|
||||
return Response(
|
||||
{
|
||||
"message": "Handshake completed successfully.",
|
||||
"sensor_uuid": str(device.sensor_uuid),
|
||||
"device": SensorDeviceSerializer(device).data,
|
||||
},
|
||||
status=response_status,
|
||||
)
|
||||
try:
|
||||
with urlopen(outbound_request, timeout=15) as response:
|
||||
response_body = response.read().decode("utf-8")
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
parsed_body = response_body
|
||||
if "application/json" in content_type and response_body:
|
||||
parsed_body = json.loads(response_body)
|
||||
|
||||
return JsonResponse(
|
||||
{
|
||||
"status": response.status,
|
||||
"sent_headers": {
|
||||
"Content-Type": "application/json",
|
||||
"api_key": api_key,
|
||||
},
|
||||
"sent_payload": payload,
|
||||
"response": parsed_body,
|
||||
}
|
||||
)
|
||||
except HTTPError as exc:
|
||||
error_body = exc.read().decode("utf-8", errors="replace")
|
||||
return JsonResponse(
|
||||
{
|
||||
"error": "upstream returned an error",
|
||||
"status": exc.code,
|
||||
"sent_headers": {
|
||||
"Content-Type": "application/json",
|
||||
"api_key": api_key,
|
||||
},
|
||||
"sent_payload": payload,
|
||||
"response": error_body,
|
||||
},
|
||||
status=502,
|
||||
)
|
||||
except URLError as exc:
|
||||
return JsonResponse(
|
||||
{
|
||||
"error": "could not reach upstream api",
|
||||
"details": str(exc.reason),
|
||||
"sent_headers": {
|
||||
"Content-Type": "application/json",
|
||||
"api_key": api_key,
|
||||
},
|
||||
"sent_payload": payload,
|
||||
},
|
||||
status=502,
|
||||
)
|
||||
|
||||
class SensorIngestView(APIView):
|
||||
def post(self, request):
|
||||
serializer = SensorIngestSerializer(data=request.data)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
data = serializer.validated_data
|
||||
|
||||
device = get_object_or_404(SensorDevice, sensor_uuid=data["sensor_uuid"], is_active=True)
|
||||
reading_at = data.get("reading_at") or timezone.now()
|
||||
if reading_at.tzinfo is None:
|
||||
reading_at = reading_at.replace(tzinfo=dt_timezone.utc)
|
||||
|
||||
translated_payload, translation_status = translation_service.translate(
|
||||
data["payload"],
|
||||
config=device.translation_config,
|
||||
)
|
||||
payload_id = uuid.uuid4()
|
||||
created_at = timezone.now()
|
||||
|
||||
stored = cassandra_service.save_payload(
|
||||
sensor_uuid=str(device.sensor_uuid),
|
||||
payload_id=payload_id,
|
||||
reading_at=reading_at,
|
||||
original_payload=json.dumps(data["payload"], ensure_ascii=False),
|
||||
translated_payload=json.dumps(translated_payload, ensure_ascii=False),
|
||||
translation_status=translation_status,
|
||||
created_at=created_at,
|
||||
)
|
||||
|
||||
return Response(
|
||||
{
|
||||
"message": "Payload processed successfully.",
|
||||
"sensor_uuid": str(device.sensor_uuid),
|
||||
"payload_id": str(payload_id),
|
||||
"translation_status": translation_status,
|
||||
"stored_in_cassandra": stored,
|
||||
},
|
||||
status=status.HTTP_201_CREATED,
|
||||
)
|
||||
|
||||
|
||||
class SensorPayloadListView(APIView):
|
||||
def get(self, request, sensor_uuid):
|
||||
query_serializer = SensorPayloadQuerySerializer(data=request.query_params)
|
||||
query_serializer.is_valid(raise_exception=True)
|
||||
limit = query_serializer.validated_data["limit"]
|
||||
|
||||
device = get_object_or_404(SensorDevice, sensor_uuid=sensor_uuid, is_active=True)
|
||||
rows = cassandra_service.get_payloads(str(device.sensor_uuid), limit=limit)
|
||||
|
||||
return Response(
|
||||
{
|
||||
"sensor_uuid": str(device.sensor_uuid),
|
||||
"device_identifier": device.device_identifier,
|
||||
"count": len(rows),
|
||||
"results": rows,
|
||||
}
|
||||
)
|
||||
return HttpResponse(status=500)
|
||||
|
||||
Reference in New Issue
Block a user