Files
Ai/fertilization/views.py
T
2026-03-21 23:50:36 +03:30

109 lines
3.9 KiB
Python

from drf_spectacular.utils import (
OpenApiExample,
OpenApiResponse,
extend_schema,
)
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView
from .serializers import FertilizationRecommendRequestSerializer
class FertilizationRecommendView(APIView):
"""
توصیه کودهی با Celery.
POST با sensor_uuid، plant_name، growth_stage.
اطلاعات گیاه از plant app دریافت می‌شود.
نیازی به دریافت نوع آبیاری نیست.
"""
@extend_schema(
tags=["Fertilization Recommendation"],
summary="درخواست توصیه کودهی",
description=(
"داده‌های سنسور و گیاه را دریافت کرده و یک تسک Celery "
"برای تولید توصیه کودهی در صف قرار می‌دهد. "
"اطلاعات گیاه از جدول Plant بارگذاری می‌شود."
),
request=FertilizationRecommendRequestSerializer,
responses={
202: OpenApiResponse(description="تسک در صف قرار گرفت"),
400: OpenApiResponse(description="پارامتر ورودی نامعتبر"),
},
examples=[
OpenApiExample(
"نمونه درخواست",
value={
"sensor_uuid": "550e8400-e29b-41d4-a716-446655440000",
"plant_name": "گوجه‌فرنگی",
"growth_stage": "گلدهی",
},
request_only=True,
),
],
)
def post(self, request):
from rag.tasks import fertilization_recommendation_task
serializer = FertilizationRecommendRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{"code": 400, "msg": "داده نامعتبر.", "data": serializer.errors},
status=status.HTTP_400_BAD_REQUEST,
)
validated = serializer.validated_data
sensor_uuid = validated["sensor_uuid"]
plant_name = validated.get("plant_name")
growth_stage = validated.get("growth_stage")
query = validated.get("query")
task = fertilization_recommendation_task.delay(
sensor_uuid=sensor_uuid,
plant_name=plant_name,
growth_stage=growth_stage,
query=query,
)
return Response(
{
"code": 202,
"msg": "تسک توصیه کودهی در صف قرار گرفت.",
"data": {
"task_id": task.id,
"status_url": f"/api/fertilization/recommend/{task.id}/status/",
},
},
status=status.HTTP_202_ACCEPTED,
)
class FertilizationRecommendStatusView(APIView):
"""وضعیت تسک توصیه کودهی."""
@extend_schema(
tags=["Fertilization Recommendation"],
summary="وضعیت تسک توصیه کودهی",
description="وضعیت تسک Celery توصیه کودهی را برمی‌گرداند.",
responses={
200: OpenApiResponse(description="وضعیت تسک"),
},
)
def get(self, request, task_id):
from celery.result import AsyncResult
result = AsyncResult(task_id)
data = {"task_id": task_id, "status": result.state}
if result.state == "PENDING":
data["message"] = "تسک در صف یا یافت نشد."
elif result.state == "PROGRESS":
data["progress"] = result.info
elif result.state == "SUCCESS":
data["result"] = result.result
elif result.state == "FAILURE":
data["error"] = str(result.result)
return Response(
{"code": 200, "msg": "success", "data": data},
status=status.HTTP_200_OK,
)