Files
Ai/rag/ingest.py
T
sajad-dev 197f70ee12 Add Qdrant and ChromaDB support to the project
- Added Qdrant service to both docker-compose files for production and development.
- Updated environment variables in .env.example and settings.py to include Qdrant configuration.
- Included necessary dependencies for Qdrant and ChromaDB in requirements.txt.
- Updated .gitignore to exclude ChromaDB data files.
2026-02-27 19:37:02 +03:30

149 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
پایپ‌لاین ورودی RAG: خواندن، چانک، embed و ذخیره در vector store
سه منبع:
۱. لحن (tone)
۲. پایگاه دانش (knowledge base)
۳. اطلاعات هر کاربر (user info)
"""
import uuid
from pathlib import Path
from .chunker import chunk_text, chunk_texts
from .config import load_rag_config, RAGConfig
from .embedding import embed_texts
from .vector_store import QdrantVectorStore
# پسوندهای قابل خواندن
TEXT_EXTENSIONS = {".txt", ".md", ".rst", ".json"}
def _resolve_path(base: Path, p: str) -> Path:
"""تبدیل مسیر نسبی به مطلق نسبت به base پروژه."""
path = Path(p)
if not path.is_absolute():
path = base / path
return path
def _load_file(path: Path) -> str | None:
"""خواندن یک فایل متنی."""
if not path.exists() or not path.is_file():
return None
try:
return path.read_text(encoding="utf-8").strip()
except Exception:
return None
def _load_files_from_dir(dir_path: Path, prefix: str = "kb") -> list[tuple[str, str]]:
"""
خواندن همه فایل‌های متنی از یک دایرکتوری.
Returns: [(source_id, content), ...]
"""
if not dir_path.exists() or not dir_path.is_dir():
return []
out: list[tuple[str, str]] = []
for f in sorted(dir_path.rglob("*")):
if f.is_file() and f.suffix.lower() in TEXT_EXTENSIONS:
rel = f.relative_to(dir_path)
source_id = f"{prefix}:{rel}"
content = _load_file(f)
if content:
out.append((source_id, content))
return out
def load_sources(config: RAGConfig | None = None) -> list[tuple[str, str]]:
"""
بارگذاری سه منبع: لحن، پایگاه دانش، اطلاعات کاربر.
Returns:
[(source_id, content), ...]
source_id مثال: tone, kb:file.txt, user:profile.txt
"""
cfg = config or load_rag_config()
base = Path(__file__).resolve().parent.parent
sources: list[tuple[str, str]] = []
# ۱. لحن
tone_path = _resolve_path(base, cfg.tone_file)
content = _load_file(tone_path)
if content:
sources.append(("tone", content))
# ۲. پایگاه دانش
kb_path = _resolve_path(base, cfg.knowledge_base_path)
for sid, c in _load_files_from_dir(kb_path, prefix="kb"):
sources.append((sid, c))
if kb_path.is_file():
content = _load_file(kb_path)
if content:
sources.append((f"kb:{kb_path.name}", content))
# ۳. اطلاعات کاربر
user_path = _resolve_path(base, cfg.user_info_path)
for sid, c in _load_files_from_dir(user_path, prefix="user"):
sources.append((sid, c))
if user_path.is_file():
content = _load_file(user_path)
if content:
sources.append((f"user:{user_path.name}", content))
return sources
def ingest(recreate: bool = False, config: RAGConfig | None = None) -> dict:
"""
ورودی کامل: منابع را می‌خواند، چانک می‌کند، embed می‌کند و به vector store می‌فرستد.
Args:
recreate: اگر True باشد، collection را از نو می‌سازد
config: تنظیمات RAG
Returns:
آمار ورودی (تعداد چانک، منبع‌ها، خطاها)
"""
cfg = config or load_rag_config()
store = QdrantVectorStore(config=cfg)
if recreate:
store.ensure_collection(recreate=True)
sources = load_sources(config=cfg)
if not sources:
return {"chunks_added": 0, "sources": [], "error": "هیچ منبعی یافت نشد"}
all_chunks: list[str] = []
all_metas: list[dict] = []
all_ids: list[str] = []
for source_id, content in sources:
chunks = chunk_text(content, config=cfg)
for i, ch in enumerate(chunks):
uid = str(uuid.uuid4())
all_ids.append(uid)
all_chunks.append(ch)
all_metas.append({"source": source_id, "chunk_index": i})
if not all_chunks:
return {"chunks_added": 0, "sources": [s[0] for s in sources], "error": "هیچ چانکی ساخته نشد"}
embeddings = embed_texts(all_chunks, config=cfg)
if len(embeddings) != len(all_chunks):
return {
"chunks_added": 0,
"sources": [s[0] for s in sources],
"error": f"تعداد embed با چانک‌ها مطابقت ندارد: {len(embeddings)} vs {len(all_chunks)}",
}
store.add_documents(
ids=all_ids,
embeddings=embeddings,
documents=all_chunks,
metadatas=all_metas,
)
return {
"chunks_added": len(all_chunks),
"sources": [s[0] for s in sources],
}