This commit is contained in:
2026-03-26 15:39:31 +03:30
parent f305e00cfe
commit 32a0e3f3d9
26 changed files with 2188 additions and 265 deletions
+81 -98
View File
@@ -1,64 +1,15 @@
from rest_framework import status
from rest_framework import serializers
from rest_framework import serializers, status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema, extend_schema_view
from drf_spectacular.utils import extend_schema
from config.swagger import code_response
from .models import Sensor
from .serializers import SensorCreateSerializer, SensorSerializer
from .serializers import SensorCreateSerializer, SensorSerializer, SensorToggleSerializer
@extend_schema_view(
get=extend_schema(
tags=["Sensor Hub"],
responses={
200: code_response("SensorHubGetResponse", data=serializers.JSONField()),
404: code_response("SensorHubNotFoundResponse"),
},
),
post=extend_schema(
tags=["Sensor Hub"],
request=OpenApiTypes.OBJECT,
responses={
201: code_response("SensorCreateResponse", data=serializers.JSONField()),
200: code_response("SensorToggleResponse"),
400: code_response("SensorToggleValidationResponse"),
404: code_response("SensorToggleNotFoundResponse"),
},
),
patch=extend_schema(
tags=["Sensor Hub"],
request=SensorCreateSerializer,
responses={
200: code_response("SensorUpdateResponse", data=SensorSerializer()),
404: code_response("SensorUpdateNotFoundResponse"),
},
),
delete=extend_schema(
tags=["Sensor Hub"],
responses={
200: code_response("SensorDeleteResponse"),
404: code_response("SensorDeleteNotFoundResponse"),
},
),
)
class SensorHubView(APIView):
"""
Sensor-hub CRUD endpoints connected to the database.
Routes:
- GET "" → List sensors for authenticated user.
- GET "<uuid>/" → Detail of a single sensor.
- POST "" → Create a new sensor.
- PATCH "<uuid>/" → Update an existing sensor.
- DELETE "<uuid>/" → Delete a sensor.
- POST "active/" → Activate a sensor (requires uuid_sensor in body).
- POST "deactive/" → Deactivate a sensor (requires uuid_sensor in body).
"""
class SensorHubBaseView(APIView):
permission_classes = [IsAuthenticated]
def _get_sensor(self, request, uuid):
@@ -67,74 +18,106 @@ class SensorHubView(APIView):
except Sensor.DoesNotExist:
return None
def get(self, request, *args, **kwargs):
uuid = kwargs.get("uuid")
if uuid is not None:
sensor = self._get_sensor(request, uuid)
if sensor is None:
return Response(
{"code": 404, "msg": "Sensor not found."},
status=status.HTTP_404_NOT_FOUND,
)
data = SensorSerializer(sensor).data
return Response({"code": 200, "msg": "success", "data": data}, status=status.HTTP_200_OK)
class SensorListCreateView(SensorHubBaseView):
@extend_schema(
tags=["Sensor Hub"],
responses={200: code_response("SensorListResponse", data=SensorSerializer(many=True))},
)
def get(self, request):
sensors = Sensor.objects.filter(owner=request.user)
data = SensorSerializer(sensors, many=True).data
return Response({"code": 200, "msg": "success", "data": data}, status=status.HTTP_200_OK)
def post(self, request, *args, **kwargs):
action = kwargs.get("action")
if action in ("active", "deactive"):
return self._toggle_active(request, is_active=(action == "active"))
@extend_schema(
tags=["Sensor Hub"],
request=SensorCreateSerializer,
responses={201: code_response("SensorCreateResponse", data=SensorSerializer())},
)
def post(self, request):
serializer = SensorCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
sensor = serializer.save(owner=request.user)
data = SensorSerializer(sensor).data
return Response(
{"code": 201, "msg": "success", "data": data},
status=status.HTTP_201_CREATED,
)
return Response({"code": 201, "msg": "success", "data": data}, status=status.HTTP_201_CREATED)
def patch(self, request, *args, **kwargs):
uuid = kwargs.get("uuid")
class SensorDetailView(SensorHubBaseView):
@extend_schema(
tags=["Sensor Hub"],
responses={
200: code_response("SensorDetailResponse", data=SensorSerializer()),
404: code_response("SensorNotFoundResponse"),
},
)
def get(self, request, uuid):
sensor = self._get_sensor(request, uuid)
if sensor is None:
return Response(
{"code": 404, "msg": "Sensor not found."},
status=status.HTTP_404_NOT_FOUND,
)
return Response({"code": 404, "msg": "Sensor not found."}, status=status.HTTP_404_NOT_FOUND)
data = SensorSerializer(sensor).data
return Response({"code": 200, "msg": "success", "data": data}, status=status.HTTP_200_OK)
@extend_schema(
tags=["Sensor Hub"],
request=SensorCreateSerializer,
responses={
200: code_response("SensorUpdateResponse", data=SensorSerializer()),
404: code_response("SensorUpdateNotFoundResponse"),
},
)
def patch(self, request, uuid):
sensor = self._get_sensor(request, uuid)
if sensor is None:
return Response({"code": 404, "msg": "Sensor not found."}, status=status.HTTP_404_NOT_FOUND)
serializer = SensorCreateSerializer(sensor, data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
serializer.save()
data = SensorSerializer(sensor).data
return Response({"code": 200, "msg": "success", "data": data}, status=status.HTTP_200_OK)
def delete(self, request, *args, **kwargs):
uuid = kwargs.get("uuid")
@extend_schema(
tags=["Sensor Hub"],
responses={
200: code_response("SensorDeleteResponse"),
404: code_response("SensorDeleteNotFoundResponse"),
},
)
def delete(self, request, uuid):
sensor = self._get_sensor(request, uuid)
if sensor is None:
return Response(
{"code": 404, "msg": "Sensor not found."},
status=status.HTTP_404_NOT_FOUND,
)
return Response({"code": 404, "msg": "Sensor not found."}, status=status.HTTP_404_NOT_FOUND)
sensor.delete()
return Response({"code": 200, "msg": "success"}, status=status.HTTP_200_OK)
def _toggle_active(self, request, is_active):
uuid_sensor = request.data.get("uuid_sensor")
if not uuid_sensor:
return Response(
{"code": 400, "msg": "uuid_sensor is required."},
status=status.HTTP_400_BAD_REQUEST,
)
sensor = self._get_sensor(request, uuid_sensor)
class SensorToggleView(SensorHubBaseView):
action = None
@extend_schema(
tags=["Sensor Hub"],
request=SensorToggleSerializer,
responses={
200: code_response("SensorToggleResponse"),
400: code_response("SensorToggleValidationResponse"),
404: code_response("SensorToggleNotFoundResponse"),
},
)
def post(self, request):
serializer = SensorToggleSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
sensor = self._get_sensor(request, serializer.validated_data["uuid_sensor"])
if sensor is None:
return Response(
{"code": 404, "msg": "Sensor not found."},
status=status.HTTP_404_NOT_FOUND,
)
sensor.is_active = is_active
return Response({"code": 404, "msg": "Sensor not found."}, status=status.HTTP_404_NOT_FOUND)
sensor.is_active = self.action == "active"
sensor.save(update_fields=["is_active", "updated_at"])
return Response({"code": 200, "msg": "success"}, status=status.HTTP_200_OK)
class SensorActiveView(SensorToggleView):
action = "active"
class SensorDeactiveView(SensorToggleView):
action = "deactive"