Files
Ai/rag/services/fertilization_plan_parser.py
2026-05-13 16:45:54 +03:30

407 lines
16 KiB
Python

from __future__ import annotations
import json
import logging
from typing import Any, Literal
from pydantic import BaseModel, Field, ValidationError
from farm_data.services import build_ai_farm_snapshot
from rag.api_provider import get_chat_client
from rag.chat import (
_complete_audit_log,
_create_audit_log,
_fail_audit_log,
_load_service_tone,
build_rag_context,
)
from rag.config import RAGConfig, get_service_config, load_rag_config
logger = logging.getLogger(__name__)

# Identifier of this service, used for service-config lookup and audit logs.
SERVICE_ID = "fertilization_plan_parser"
# Knowledge base queried when building retrieval context.
KB_NAME = "fertilization_plan_parser"

# Fields that must be known before a plan may be reported as completed.
# The order is significant: generic follow-up questions are produced in this
# order (and truncated), so the most basic questions come first.
CORE_FIELDS = [
    # plan-level
    "crop_name", "growth_stage",
    # per fertilizer application
    "fertilizer_name", "formula", "amount",
    "application_method", "timing", "interval_days",
]
# System-prompt instructions (in Persian) telling the model how to extract a
# fertilization plan and which JSON shape to answer with. The pieces are
# concatenated without separators, so each one ends with its own space.
FERTILIZATION_PLAN_PROMPT = (
    # "You are a fertilization-plan analyst."
    "شما یک تحلیل گر برنامه کودهی هستی. "
    # "The user may describe the plan completely or only partially."
    "کاربر ممکن است برنامه کودهی را کامل یا ناقص توضیح دهد. "
    # "Return only valid JSON; never emit text outside the JSON, markdown, or extra keys."
    "فقط JSON معتبر برگردان و هرگز متن خارج از JSON، markdown یا کلید اضافه تولید نکن. "
    # "If the information is sufficient, set status to completed and fill final_plan."
    "اگر اطلاعات کافی بود status را completed بگذار و final_plan را تکمیل کن. "
    # "If it is incomplete, set status to needs_clarification, fill missing_fields
    #  and return short, precise questions in questions."
    "اگر اطلاعات ناقص بود status را needs_clarification بگذار، missing_fields را پر کن و در questions سوال های کوتاه و دقیق برگردان. "
    # "If several fertilizers are mentioned, list all of them in applications."
    "اگر چند کود در متن بود، همه را در applications لیست کن. "
    # "If any core field is empty, null or unknown, you must not set status to completed."
    "اگر هرکدام از فیلدهای اصلی خالی، null یا نامشخص بود، حق نداری status را completed بگذاری. "
    # "In the completed state no field in collected_data or final_plan may be null."
    "در حالت completed هیچ فیلد null در collected_data و final_plan نباید وجود داشته باشد. "
    # "Do not guess amounts, timing or application method."
    "از حدس زدن مقدار، زمان یا روش مصرف خودداری کن. "
    # Response schema; it mirrors FertilizationPlanParseResultSchema.
    "Schema: "
    "{"
    '"status": "completed" | "needs_clarification", '
    '"summary": string, '
    '"missing_fields": [string], '
    '"questions": [{"id": string, "field": string, "question": string, "rationale": string}], '
    '"collected_data": {'
    '"crop_name": string|null, '
    '"growth_stage": string|null, '
    '"objective": string|null, '
    '"applications": ['
    "{"
    '"fertilizer_name": string|null, '
    '"formula": string|null, '
    '"amount": string|null, '
    '"application_method": string|null, '
    '"timing": string|null, '
    '"interval_days": integer|null, '
    '"purpose": string|null'
    "}"
    "], "
    '"notes": [string]'
    "}, "
    '"final_plan": {same shape as collected_data} | null'
    "}."
)
# One clarification question, as returned by the model, asking the user for a
# missing piece of the plan. (Plain comments instead of a docstring: pydantic
# would copy a class docstring into the JSON-schema description.)
class ClarificationQuestionSchema(BaseModel):
    # Identifier of the question as supplied in the model response.
    id: str
    # Name of the plan field the question is meant to fill.
    field: str
    # Question text shown to the user.
    question: str
    # Optional explanation of why the question is asked; defaults to "".
    rationale: str = ""
# One fertilizer application inside a plan. Every attribute is optional so a
# partially described plan still validates; unknown values stay None.
class FertilizerApplicationSchema(BaseModel):
    # Name of the fertilizer product.
    fertilizer_name: str | None = None
    # Formula / analysis of the fertilizer, e.g. "20-20-20".
    formula: str | None = None
    # Amount used per application, kept as free text.
    amount: str | None = None
    # How the fertilizer is applied (e.g. fertigation, side-dressing, foliar spray).
    application_method: str | None = None
    # When the fertilizer is applied, as free text.
    timing: str | None = None
    # Number of days between successive applications.
    interval_days: int | None = None
    # Purpose of the application; not one of the core fields.
    purpose: str | None = None
# Plan data collected so far: crop information, the fertilizer applications
# and free-form notes. Used both for collected_data and final_plan.
class FertilizationPlanSchema(BaseModel):
    # Crop the plan is for.
    crop_name: str | None = None
    # Current growth stage of the crop.
    growth_stage: str | None = None
    # Goal of the plan; not one of the core fields.
    objective: str | None = None
    # The prompt asks the model to list every mentioned fertilizer here.
    applications: list[FertilizerApplicationSchema] = Field(default_factory=list)
    # Free-form notes; default_factory gives each instance its own list.
    notes: list[str] = Field(default_factory=list)
# Top-level shape of the JSON object the model is asked to return.
class FertilizationPlanParseResultSchema(BaseModel):
    # Status as reported by the model; the service re-checks core fields
    # before trusting "completed".
    status: Literal["completed", "needs_clarification"]
    # Short summary text produced by the model.
    summary: str
    # Fields the model itself considers missing.
    missing_fields: list[str] = Field(default_factory=list)
    # Follow-up questions proposed by the model.
    questions: list[ClarificationQuestionSchema] = Field(default_factory=list)
    # Everything extracted so far, complete or not.
    collected_data: FertilizationPlanSchema = Field(default_factory=FertilizationPlanSchema)
    # Finished plan, or None while clarification is still needed.
    final_plan: FertilizationPlanSchema | None = None
class FertilizationPlanParserService:
    """Extract a structured fertilization plan from free-text user input.

    The service's chat model is asked to answer in JSON following
    ``FERTILIZATION_PLAN_PROMPT``. The reply is validated with
    ``FertilizationPlanParseResultSchema`` and then normalized, so a plan is
    only reported as ``"completed"`` when every core field is filled in.
    Failures of the chat call or of response parsing degrade to a
    ``needs_clarification`` fallback result instead of raising.
    """

    # Core fields checked on the plan itself.
    _PLAN_CORE_FIELDS = ("crop_name", "growth_stage")
    # Core fields that every fertilizer application must carry.
    _APPLICATION_CORE_FIELDS = (
        "fertilizer_name",
        "formula",
        "amount",
        "application_method",
        "timing",
        "interval_days",
    )

    def parse_plan(
        self,
        *,
        message: str = "",
        answers: dict[str, Any] | None = None,
        partial_plan: dict[str, Any] | None = None,
        farm_uuid: str | None = None,
    ) -> dict[str, Any]:
        """Parse a (possibly partial) fertilization plan described by the user.

        Args:
            message: Free-text description of the plan; surrounding whitespace
                is stripped.
            answers: Answers to earlier clarification questions. Values that
                are not a dict are treated as ``{}``.
            partial_plan: Plan data collected in earlier rounds. Values that
                are not a dict are treated as ``{}``.
            farm_uuid: Optional farm identifier. When given, it is passed to
                retrieval as ``sensor_uuid`` and an audit log entry is
                attempted.

        Returns:
            A dict with the keys ``status``, ``status_fa``, ``summary``,
            ``missing_fields``, ``questions``, ``collected_data`` and
            ``final_plan``.

        Loading configuration, building the client and building the retrieval
        context happen before the guarded section, so errors there still
        propagate to the caller.
        """
        cfg = load_rag_config()
        service, client, model = self._build_service_client(cfg)

        normalized_message = (message or "").strip()
        normalized_answers = answers if isinstance(answers, dict) else {}
        normalized_partial = partial_plan if isinstance(partial_plan, dict) else {}

        structured_context: dict[str, Any] = {
            "message": normalized_message,
            "answers": normalized_answers,
            "partial_plan": normalized_partial,
            "required_core_fields": CORE_FIELDS,
            "service": SERVICE_ID,
            "endpoint_policy": "parser_first",
        }
        if farm_uuid:
            # Parser-first endpoint: farm context is optional enrichment only.
            # NOTE(review): only this metadata marker is added here; no farm
            # snapshot is actually built, even though build_ai_farm_snapshot is
            # imported. Confirm this is intentional.
            structured_context["farm_context_source_metadata"] = {
                "source": "build_ai_farm_snapshot",
                "optional": True,
            }

        rag_query = self._build_retrieval_query(
            message=normalized_message,
            answers=normalized_answers,
        )
        rag_context = build_rag_context(
            query=rag_query,
            sensor_uuid=farm_uuid,
            config=cfg,
            kb_name=KB_NAME,
            service_id=SERVICE_ID,
        )
        system_prompt, messages = self._build_messages(
            service=service,
            cfg=cfg,
            structured_context=structured_context,
            rag_context=rag_context,
        )
        audit_log = self._start_audit_log(
            farm_uuid=farm_uuid,
            model=model,
            query=rag_query,
            system_prompt=system_prompt,
            messages=messages,
        )

        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                response_format={"type": "json_object"},
            )
            raw = (response.choices[0].message.content or "").strip()
            validated = FertilizationPlanParseResultSchema.model_validate(self._clean_json(raw))
            normalized = self._normalize_result(validated)
        except (ValidationError, ValueError, KeyError, IndexError) as exc:
            # Malformed, empty or schema-violating model output.
            logger.warning("Fertilization plan parser parsing failed: %s", exc)
            self._fail_audit(audit_log, exc)
            return self._fallback_result(
                message=normalized_message,
                answers=normalized_answers,
                partial_plan=normalized_partial,
            )
        except Exception as exc:
            # Any other failure (e.g. the chat call itself); keep the traceback.
            logger.exception("Fertilization plan parser failed: %s", exc)
            self._fail_audit(audit_log, exc)
            return self._fallback_result(
                message=normalized_message,
                answers=normalized_answers,
                partial_plan=normalized_partial,
            )

        # Recorded outside the try block: a failure while completing the audit
        # entry must not throw away a successfully parsed plan.
        self._complete_audit(audit_log, raw)
        return normalized

    def _start_audit_log(
        self,
        *,
        farm_uuid: str | None,
        model: Any,
        query: str,
        system_prompt: str,
        messages: list[dict[str, str]],
    ) -> Any:
        """Create an audit log entry when ``farm_uuid`` is given.

        Auditing is best-effort. Without a farm, or when creation fails (the
        failure is logged), ``None`` is returned and parsing proceeds without
        an audit trail.
        """
        if not farm_uuid:
            return None
        try:
            return _create_audit_log(
                farm_uuid=farm_uuid,
                service_id=SERVICE_ID,
                model=model,
                query=query,
                system_prompt=system_prompt,
                messages=messages,
            )
        except Exception as exc:
            logger.warning("Fertilization plan parser audit log creation failed for %s: %s", farm_uuid, exc)
            return None

    def _complete_audit(self, audit_log: Any, raw: str) -> None:
        """Mark ``audit_log`` as completed with the raw model reply (best-effort)."""
        if audit_log is None:
            return
        try:
            _complete_audit_log(audit_log, raw)
        except Exception as exc:
            logger.warning("Fertilization plan parser audit log completion failed: %s", exc)

    def _fail_audit(self, audit_log: Any, error: BaseException) -> None:
        """Mark ``audit_log`` as failed with ``error`` (best-effort).

        This runs inside exception handlers, so it must not raise and mask
        the fallback result.
        """
        if audit_log is None:
            return
        try:
            _fail_audit_log(audit_log, str(error))
        except Exception as exc:
            logger.warning("Fertilization plan parser audit log failure update failed: %s", exc)

    def _build_service_client(self, cfg: RAGConfig):
        """Return ``(service, client, model)`` for this service.

        The chat client is built from a copy of ``cfg`` whose ``llm`` section
        is replaced by the service-specific one, so the service's own LLM
        settings are used.
        """
        service = get_service_config(SERVICE_ID, cfg)
        service_cfg = RAGConfig(
            embedding=cfg.embedding,
            qdrant=cfg.qdrant,
            chunking=cfg.chunking,
            llm=service.llm,
            knowledge_bases=cfg.knowledge_bases,
            services=cfg.services,
            chromadb=cfg.chromadb,
        )
        client = get_chat_client(service_cfg)
        return service, client, service.llm.model

    def _build_messages(
        self,
        *,
        service: Any,
        cfg: RAGConfig,
        structured_context: dict[str, Any],
        rag_context: str,
    ) -> tuple[str, list[dict[str, str]]]:
        """Assemble the system prompt and the chat messages.

        The system prompt is built from these parts, joined by blank lines:
        the service tone (if any), ``service.system_prompt`` (if set),
        ``FERTILIZATION_PLAN_PROMPT``, the structured context as JSON, and the
        retrieval context (if any). It is followed by a fixed user instruction.

        Returns:
            ``(system_prompt, messages)``.
        """
        tone = _load_service_tone(service, cfg)
        system_parts = [tone] if tone else []
        if service.system_prompt:
            system_parts.append(service.system_prompt)
        system_parts.append(FERTILIZATION_PLAN_PROMPT)
        system_parts.append(
            "[structured_context]\n"
            + json.dumps(structured_context, ensure_ascii=False, indent=2, default=str)
        )
        if rag_context:
            system_parts.append(rag_context)
        system_prompt = "\n\n".join(part for part in system_parts if part)
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": "برنامه کودهی را استخراج یا برای تکمیل آن سوال بپرس."},
        ]
        return system_prompt, messages

    def _build_retrieval_query(
        self,
        *,
        message: str,
        answers: dict[str, Any],
    ) -> str:
        """Build the retrieval query from the message and ``key: value`` answer lines.

        Falls back to a fixed generic query when both are empty.
        """
        answer_lines = [f"{key}: {value}" for key, value in answers.items()]
        parts = [part for part in [message, "\n".join(answer_lines)] if part]
        return "\n".join(parts) or "استخراج برنامه کودهی از متن کاربر"

    def _normalize_result(self, validated: FertilizationPlanParseResultSchema) -> dict[str, Any]:
        """Convert a validated model reply into the public result dict.

        ``missing_fields`` is the order-preserving union of the fields the
        model reported and the fields found blank locally. The result is
        ``"completed"`` only when the model said so AND nothing is missing.
        In every other case ``final_plan`` is ``None`` and clarification
        questions are returned; generic ones are used if the model supplied
        none.
        """
        collected = validated.collected_data.model_dump()
        final_plan = validated.final_plan.model_dump() if validated.final_plan is not None else None

        missing_fields = list(
            dict.fromkeys(
                [*validated.missing_fields, *self._find_missing_fields(final_plan or collected)]
            )
        )

        if validated.status == "completed" and not missing_fields:
            return {
                "status": "completed",
                "status_fa": "تکمیل شد",
                "summary": validated.summary,
                "missing_fields": missing_fields,
                "questions": [],
                "collected_data": collected,
                # The model may report "completed" without a separate final_plan.
                "final_plan": final_plan or collected,
            }

        questions = [item.model_dump() for item in validated.questions]
        if not questions and missing_fields:
            questions = self._build_generic_questions(missing_fields)
        return {
            "status": "needs_clarification",
            "status_fa": "نیازمند پرسش تکمیلی",
            "summary": validated.summary,
            "missing_fields": missing_fields,
            "questions": questions,
            "collected_data": collected,
            "final_plan": None,
        }

    def _fallback_result(
        self,
        *,
        message: str,
        answers: dict[str, Any],
        partial_plan: dict[str, Any],
    ) -> dict[str, Any]:
        """Build a ``needs_clarification`` result without the model.

        Used when the model call or its reply fails. The behavior is:
        - data the caller already collected in ``partial_plan`` is carried
          over;
        - the user's text, and the fact that answers were received, are
          recorded as notes;
        - every core field is reported as missing.

        Malformed ``applications``/``notes`` values are tolerated, so this
        recovery path does not itself raise on them.
        """
        raw_applications = partial_plan.get("applications")
        applications = list(raw_applications) if isinstance(raw_applications, list) else []

        raw_notes = partial_plan.get("notes")
        if isinstance(raw_notes, (list, tuple)):
            notes = list(raw_notes)
        elif isinstance(raw_notes, str) and raw_notes.strip():
            # A single note string must not be split into characters.
            notes = [raw_notes]
        else:
            notes = []

        if message:
            notes.append(f"متن اولیه کاربر: {message}")
        if answers:
            notes.append("پاسخ های تکمیلی کاربر دریافت شده است.")

        return {
            "status": "needs_clarification",
            "status_fa": "نیازمند پرسش تکمیلی",
            "summary": "اطلاعات برنامه کودهی برای ساخت JSON نهایی کافی نیست و به چند پاسخ تکمیلی نیاز است.",
            # A copy, so callers mutating the result cannot alter the module constant.
            "missing_fields": list(CORE_FIELDS),
            "questions": self._build_generic_questions(CORE_FIELDS),
            "collected_data": {
                "crop_name": partial_plan.get("crop_name"),
                "growth_stage": partial_plan.get("growth_stage"),
                "objective": partial_plan.get("objective"),
                "applications": applications,
                "notes": notes,
            },
            "final_plan": None,
        }

    def _build_generic_questions(self, missing_fields: list[str]) -> list[dict[str, str]]:
        """Return canned questions for ``missing_fields``.

        Questions keep the order of ``missing_fields`` and are capped at five.
        Unknown field names are skipped. The catalog is rebuilt on every call,
        so callers always receive fresh dicts.
        """
        catalog = {
            "crop_name": {
                "id": "crop_name",
                "field": "crop_name",
                "question": "این برنامه کودهی برای کدام محصول است؟",
                "rationale": "نام محصول برای ثبت برنامه لازم است.",
            },
            "growth_stage": {
                "id": "growth_stage",
                "field": "growth_stage",
                "question": "محصول الان در چه مرحله رشدی قرار دارد؟",
                "rationale": "مرحله رشد برای تکمیل برنامه لازم است.",
            },
            "fertilizer_name": {
                "id": "fertilizer_name",
                "field": "fertilizer_name",
                "question": "نام کود یا ترکیب کودی چیست؟",
                "rationale": "بدون نام کود نمی توان برنامه را نهایی کرد.",
            },
            "formula": {
                "id": "formula",
                "field": "formula",
                "question": "فرمول یا آنالیز کود چیست؟ مثلا 20-20-20.",
                "rationale": "ترکیب دقیق کود هنوز مشخص نشده است.",
            },
            "amount": {
                "id": "amount",
                "field": "amount",
                "question": "مقدار مصرف هر نوبت کود چقدر است؟",
                "rationale": "دوز مصرف در متن مشخص نشده است.",
            },
            "application_method": {
                "id": "application_method",
                "field": "application_method",
                "question": "روش مصرف کود چیست؟ مثلا کودآبیاری، سرک یا محلول پاشی.",
                "rationale": "روش اجرا هنوز معلوم نیست.",
            },
            "timing": {
                "id": "timing",
                "field": "timing",
                "question": "زمان مصرف کود چه موقع است؟ مثلا هر 10 روز یا بعد از آبیاری.",
                "rationale": "زمان بندی برنامه نیاز به شفاف سازی دارد.",
            },
            "interval_days": {
                "id": "interval_days",
                "field": "interval_days",
                "question": "فاصله بین نوبت های مصرف کود چند روز است؟",
                "rationale": "عدد فاصله بین نوبت ها برای JSON نهایی لازم است.",
            },
        }
        return [catalog[field] for field in missing_fields if field in catalog][:5]

    @staticmethod
    def _is_blank(value: Any) -> bool:
        """Return True for ``None`` and for empty or whitespace-only strings."""
        return value is None or (isinstance(value, str) and not value.strip())

    def _find_missing_fields(self, plan: dict[str, Any]) -> list[str]:
        """Return the core fields that are absent or blank in ``plan``.

        Plan-level fields are checked on the plan itself. Application fields
        are checked on EVERY application and reported once if any application
        lacks them; a non-dict application counts as lacking all of them.
        Without any application, all application fields are missing.
        """
        if not isinstance(plan, dict):
            return CORE_FIELDS[:]

        missing = [field for field in self._PLAN_CORE_FIELDS if self._is_blank(plan.get(field))]

        applications = plan.get("applications")
        if not isinstance(applications, list) or not applications:
            return missing + list(self._APPLICATION_CORE_FIELDS)

        for field in self._APPLICATION_CORE_FIELDS:
            if any(
                not isinstance(application, dict) or self._is_blank(application.get(field))
                for application in applications
            ):
                missing.append(field)
        return missing

    def _clean_json(self, raw: str) -> dict[str, Any]:
        """Parse the model reply as a JSON object.

        A surrounding markdown code fence and its optional ``json`` language
        tag (any case) are removed first.

        Raises:
            ValueError: if the reply is empty, is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``), or its root is
                not an object.
        """
        cleaned = (raw or "").strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.strip("`").strip()
            if cleaned[:4].lower() == "json":
                cleaned = cleaned[4:].strip()
        if not cleaned:
            raise ValueError("Fertilization plan parser response was empty.")
        parsed = json.loads(cleaned)
        if not isinstance(parsed, dict):
            raise ValueError("Fertilization plan parser response root must be an object.")
        return parsed