""" پایپ‌لاین ورودی RAG: خواندن، چانک، embed و ذخیره در vector store سه منبع: ۱. لحن (tone) — sensor_uuid=__global__ ۲. پایگاه دانش (knowledge base) — sensor_uuid=__global__ ۳. دیتای خاک هر کاربر از DB (sensor_data + soil_data) — sensor_uuid=uuid """ import uuid from pathlib import Path from .chunker import chunk_text, chunk_texts from .config import load_rag_config, RAGConfig from .embedding import embed_texts from .user_data import load_user_sources from .vector_store import QdrantVectorStore # پسوندهای قابل خواندن TEXT_EXTENSIONS = {".txt", ".md", ".rst", ".json"} SENSOR_UUID_GLOBAL = "__global__" def _resolve_path(base: Path, p: str) -> Path: """تبدیل مسیر نسبی به مطلق نسبت به base پروژه.""" path = Path(p) if not path.is_absolute(): path = base / path return path def _load_file(path: Path) -> str | None: """خواندن یک فایل متنی.""" if not path.exists() or not path.is_file(): return None try: return path.read_text(encoding="utf-8").strip() except Exception: return None def _load_files_from_dir(dir_path: Path, prefix: str = "kb") -> list[tuple[str, str]]: """ خواندن همه فایل‌های متنی از یک دایرکتوری. Returns: [(source_id, content), ...] """ if not dir_path.exists() or not dir_path.is_dir(): return [] out: list[tuple[str, str]] = [] for f in sorted(dir_path.rglob("*")): if f.is_file() and f.suffix.lower() in TEXT_EXTENSIONS: rel = f.relative_to(dir_path) source_id = f"{prefix}:{rel}" content = _load_file(f) if content: out.append((source_id, content)) return out def load_sources(config: RAGConfig | None = None) -> list[tuple[str, str, str]]: """ بارگذاری سه منبع: لحن، پایگاه دانش، دیتای کاربر از DB. Returns: [(source_id, content, sensor_uuid), ...] sensor_uuid: __global__ برای tone/kb، uuid سنسور برای user """ cfg = config or load_rag_config() base = Path(__file__).resolve().parent.parent sources: list[tuple[str, str, str]] = [] # ۱. لحن tone_path = _resolve_path(base, cfg.tone_file) content = _load_file(tone_path) if content: sources.append(("tone", content, SENSOR_UUID_GLOBAL)) # ۲. پایگاه دانش kb_path = _resolve_path(base, cfg.knowledge_base_path) for sid, c in _load_files_from_dir(kb_path, prefix="kb"): sources.append((sid, c, SENSOR_UUID_GLOBAL)) if kb_path.is_file(): content = _load_file(kb_path) if content: sources.append((f"kb:{kb_path.name}", content, SENSOR_UUID_GLOBAL)) # ۳. دیتای کاربران از sensor_data + soil_data for sid, content in load_user_sources(): sensor_uuid = sid.replace("user:", "") sources.append((sid, content, sensor_uuid)) return sources def ingest(recreate: bool = False, config: RAGConfig | None = None) -> dict: """ ورودی کامل: منابع را می‌خواند، چانک می‌کند، embed می‌کند و به vector store می‌فرستد. دیتای هر کاربر (sensor_uuid) جدا embed و با metadata ذخیره می‌شود. Args: recreate: اگر True باشد، collection را از نو می‌سازد config: تنظیمات RAG Returns: آمار ورودی (تعداد چانک، منبع‌ها، خطاها) """ cfg = config or load_rag_config() store = QdrantVectorStore(config=cfg) if recreate: store.ensure_collection(recreate=True) sources = load_sources(config=cfg) if not sources: return {"chunks_added": 0, "sources": [], "error": "هیچ منبعی یافت نشد"} all_chunks: list[str] = [] all_metas: list[dict] = [] all_ids: list[str] = [] for source_id, content, sensor_uuid in sources: chunks = chunk_text(content, config=cfg) for i, ch in enumerate(chunks): uid = str(uuid.uuid4()) all_ids.append(uid) all_chunks.append(ch) all_metas.append({ "source": source_id, "chunk_index": i, "sensor_uuid": sensor_uuid, }) if not all_chunks: return {"chunks_added": 0, "sources": [s[0] for s in sources], "error": "هیچ چانکی ساخته نشد"} embeddings = embed_texts(all_chunks, config=cfg) if len(embeddings) != len(all_chunks): return { "chunks_added": 0, "sources": [s[0] for s in sources], "error": f"تعداد embed با چانک‌ها مطابقت ندارد: {len(embeddings)} vs {len(all_chunks)}", } store.add_documents( ids=all_ids, embeddings=embeddings, documents=all_chunks, metadatas=all_metas, ) return { "chunks_added": len(all_chunks), "sources": [s[0] for s in sources], }