"""LLM-backed narrative generation for the Yield & Harvest Summary dashboard."""

from __future__ import annotations

import json
import logging
from collections.abc import Mapping
from typing import Any

from pydantic import BaseModel, Field, ValidationError

from rag.api_provider import get_chat_client
from rag.chat import (
    _complete_audit_log,
    _create_audit_log,
    _fail_audit_log,
    _load_service_tone,
)
from rag.config import RAGConfig, get_service_config, load_rag_config

logger = logging.getLogger(__name__)

SERVICE_ID = "yield_harvest"

YIELD_HARVEST_PROMPT = (
    "You are an expert agronomist writing concise dashboard narratives for farmers. "
    "Return only valid JSON matching this schema exactly: "
    "{"
    '"season_highlights_subtitle": string, '
    '"yield_prediction_explanation": string, '
    '"harvest_readiness_summary": string, '
    '"operation_notes": [string, ...]'
    "}. "
    "Do not add markdown, explanations, or extra keys. "
    "Strict Golden Rule: do not invent numbers, dates, prices, revenues, percentages, KPIs, scores, or measurements. "
    "Use only values already present in the deterministic context. "
    "If a fact is missing from the context, say less rather than guessing."
)


class YieldHarvestNarrativeSchema(BaseModel):
    """Shape the model's JSON reply must satisfy (mirrors YIELD_HARVEST_PROMPT)."""

    season_highlights_subtitle: str
    yield_prediction_explanation: str
    harvest_readiness_summary: str
    # Optional in the reply; defaults to an empty list when the key is absent.
    operation_notes: list[str] = Field(default_factory=list)


class YieldHarvestRAGService:
    """Generate dashboard narrative fields from a deterministic context dict."""

    def generate_narrative(
        self,
        deterministic_context: dict[str, Any],
    ) -> dict[str, Any]:
        """Ask the chat model for narrative text and return the validated fields.

        Args:
            deterministic_context: Pre-computed dashboard data. Only the keys
                read by ``_build_structured_context`` plus ``farm_uuid`` are
                used here.

        Returns:
            A dict with ``season_highlights_subtitle``,
            ``yield_prediction_explanation``, ``harvest_readiness_summary`` and
            ``operation_notes`` on success, or an empty dict when the model
            call fails or its reply cannot be parsed/validated.
        """
        cfg = load_rag_config()
        service, client, model = self._build_service_client(cfg)
        structured_context = self._build_structured_context(
            deterministic_context=deterministic_context,
        )
        user_prompt = (
            "Generate short user-friendly narrative fields for the Yield & Harvest Summary dashboard "
            "using only the deterministic context. Keep the language practical and agronomy-focused."
        )
        system_prompt, messages = self._build_messages(
            service=service,
            cfg=cfg,
            structured_context=structured_context,
            query=user_prompt,
        )

        farm_uuid = str(deterministic_context.get("farm_uuid") or "")
        audit_log = None
        if farm_uuid:
            try:
                audit_log = _create_audit_log(
                    farm_uuid=farm_uuid,
                    service_id=SERVICE_ID,
                    model=model,
                    query=user_prompt,
                    system_prompt=system_prompt,
                    messages=messages,
                )
            except Exception as exc:
                logger.warning("Yield harvest audit log creation failed for %s: %s", farm_uuid, exc)

        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                response_format={"type": "json_object"},
            )
            raw = (response.choices[0].message.content or "").strip()
            parsed = self._clean_json(raw)
            validated = YieldHarvestNarrativeSchema.model_validate(parsed)
        except (ValidationError, ValueError, KeyError, IndexError) as exc:
            logger.warning("Yield harvest narrative parsing failed for farm_uuid=%s: %s", farm_uuid, exc)
            self._record_audit_failure(audit_log, exc, farm_uuid)
            return {}
        except Exception as exc:
            logger.error("Yield harvest narrative LLM call failed for farm_uuid=%s: %s", farm_uuid, exc)
            self._record_audit_failure(audit_log, exc, farm_uuid)
            return {}

        # Completion is recorded outside the LLM try-block so that an audit
        # failure can neither discard a valid narrative nor be misreported as
        # an LLM failure.
        self._record_audit_success(audit_log, raw, farm_uuid)
        return {
            "season_highlights_subtitle": validated.season_highlights_subtitle,
            "yield_prediction_explanation": validated.yield_prediction_explanation,
            "harvest_readiness_summary": validated.harvest_readiness_summary,
            "operation_notes": validated.operation_notes,
        }

    @staticmethod
    def _record_audit_success(audit_log: Any, raw: str, farm_uuid: str) -> None:
        """Mark the audit log complete; log (never raise) if that fails.

        Audit logging is treated as best-effort here, matching the guard
        around audit log creation in ``generate_narrative``.
        """
        if audit_log is None:
            return
        try:
            _complete_audit_log(audit_log, raw)
        except Exception as exc:
            logger.warning("Yield harvest audit log completion failed for %s: %s", farm_uuid, exc)

    @staticmethod
    def _record_audit_failure(audit_log: Any, error: Exception, farm_uuid: str) -> None:
        """Mark the audit log failed; log (never raise) if that fails."""
        if audit_log is None:
            return
        try:
            _fail_audit_log(audit_log, str(error))
        except Exception as exc:
            logger.warning("Yield harvest audit log failure update failed for %s: %s", farm_uuid, exc)

    def _build_service_client(self, cfg: RAGConfig) -> tuple[Any, Any, Any]:
        """Return ``(service, client, model)`` for this service.

        A copy of ``cfg`` is built with ``llm`` replaced by the service's own
        LLM settings, and the chat client is created from that copy.
        """
        service = get_service_config(SERVICE_ID, cfg)
        service_cfg = RAGConfig(
            embedding=cfg.embedding,
            qdrant=cfg.qdrant,
            chunking=cfg.chunking,
            llm=service.llm,
            knowledge_bases=cfg.knowledge_bases,
            services=cfg.services,
            chromadb=cfg.chromadb,
        )
        client = get_chat_client(service_cfg)
        return service, client, service.llm.model

    def _build_messages(
        self,
        *,
        service: Any,
        cfg: RAGConfig,
        structured_context: dict[str, Any],
        query: str,
    ) -> tuple[str, list[dict[str, str]]]:
        """Assemble the system prompt and the two-message chat payload.

        The system prompt joins, in order and skipping empty parts: the
        service tone, the service's own system prompt, ``YIELD_HARVEST_PROMPT``
        and the JSON-serialised structured context.

        Returns:
            ``(system_prompt, messages)`` where ``messages`` holds one system
            message and one user message carrying ``query``.
        """
        tone = _load_service_tone(service, cfg)
        system_parts = [tone] if tone else []
        if service.system_prompt:
            system_parts.append(service.system_prompt)
        system_parts.append(YIELD_HARVEST_PROMPT)
        system_parts.append(
            "[deterministic_context]\n"
            # default=str stringifies values json cannot serialise natively.
            + json.dumps(structured_context, ensure_ascii=False, indent=2, default=str)
        )
        system_prompt = "\n\n".join(part for part in system_parts if part)
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ]
        return system_prompt, messages

    @staticmethod
    def _as_mapping(value: Any) -> Mapping[str, Any]:
        """Return ``value`` if it is a mapping, otherwise an empty dict."""
        return value if isinstance(value, Mapping) else {}

    def _build_structured_context(
        self,
        *,
        deterministic_context: dict[str, Any],
    ) -> dict[str, Any]:
        """Project the deterministic context onto the fields sent to the model.

        Missing sections and sections that are not mappings are treated as
        empty, so absent values surface as ``None`` instead of raising.
        Operation steps that are not dicts are skipped.
        """
        section = self._as_mapping
        season = section(deterministic_context.get("season_highlights_card"))
        harvest = section(deterministic_context.get("harvest_prediction_card"))
        operations = section(deterministic_context.get("harvest_operations_card"))
        yield_prediction = section(deterministic_context.get("yield_prediction"))
        readiness = section(deterministic_context.get("harvest_readiness_zones"))

        operation_steps = [
            {
                "key": step.get("key"),
                "title": step.get("title"),
                "status": step.get("status"),
            }
            for step in operations.get("steps") or []
            if isinstance(step, dict)
        ]

        return {
            "farm_context": deterministic_context.get("farm_context") or {},
            "yield_prediction": {
                "predicted_yield_tons": yield_prediction.get("predicted_yield_tons"),
                "unit": yield_prediction.get("unit"),
                "simulation_warning": yield_prediction.get("simulation_warning"),
                "supporting_metrics": yield_prediction.get("supporting_metrics"),
            },
            "season_highlights_card": {
                "title": season.get("title"),
                "subtitle": season.get("subtitle"),
                "total_predicted_yield": season.get("total_predicted_yield"),
                "yield_unit": season.get("yield_unit"),
                "target_harvest_date": season.get("target_harvest_date"),
                "days_until_harvest": season.get("days_until_harvest"),
                "average_readiness": season.get("average_readiness"),
                "primary_quality_grade": season.get("primary_quality_grade"),
                "estimated_revenue": season.get("estimated_revenue"),
            },
            "harvest_prediction_card": {
                "harvest_date": harvest.get("harvest_date"),
                "harvest_date_formatted": harvest.get("harvest_date_formatted"),
                "days_until": harvest.get("days_until"),
                "optimal_window_start": harvest.get("optimal_window_start"),
                "optimal_window_end": harvest.get("optimal_window_end"),
                "description": harvest.get("description"),
            },
            # NOTE(review): this section is read with camelCase keys, unlike
            # the snake_case keys of the other sections; confirm against the
            # producer of harvest_readiness_zones.
            "harvest_readiness_zones": {
                "average_readiness": readiness.get("averageReadiness"),
                "mean_ndvi": readiness.get("meanNdvi"),
                "ndvi_trend": readiness.get("ndviTrend"),
                "zones": readiness.get("zones"),
            },
            "harvest_operations_card": {
                "stage_label": operations.get("stage_label"),
                "days_until_harvest": operations.get("days_until_harvest"),
                "current_dvs": operations.get("current_dvs"),
                "summary": operations.get("summary"),
                "steps": operation_steps,
            },
        }

    def _clean_json(self, raw: str) -> dict[str, Any]:
        """Parse the model reply into a dict, tolerating a Markdown code fence.

        Args:
            raw: Reply text; ``None`` or empty is treated as an empty reply.

        Returns:
            The decoded JSON object.

        Raises:
            ValueError: If the reply is empty, is not valid JSON, or its root
                is not a JSON object.
        """
        cleaned = (raw or "").strip()
        if cleaned.startswith("```"):
            # Drop the surrounding backticks, then an optional "json" language
            # tag in any letter case, with or without a space after the fence.
            cleaned = cleaned.strip("`").lstrip()
            if cleaned[:4].lower() == "json":
                cleaned = cleaned[4:]
            cleaned = cleaned.strip()
        if not cleaned:
            raise ValueError("Yield harvest narrative response was empty.")
        try:
            parsed = json.loads(cleaned)
        except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
            raise ValueError("Yield harvest narrative response was not valid JSON.") from exc
        if not isinstance(parsed, dict):
            raise ValueError("Yield harvest narrative response root must be a JSON object.")
        return parsed