Files
Ai/rag/ingest.py
T
sajad-dev 2c42ebe01c Refactor user data handling and enhance chat functionality
- Removed deprecated user_info files and paths from configuration.
- Added user soil data integration in chat context to improve response accuracy.
- Updated build_rag_context and chat_rag_stream functions to include sensor_uuid for user-specific data retrieval.
- Enhanced load_sources function to load user data from the database.
- Implemented filtering in search_with_query and QdrantVectorStore to isolate user data based on sensor_uuid.
- Introduced Celery Beat schedule for periodic user data ingestion.
2026-02-27 20:06:46 +03:30

153 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
پایپ‌لاین ورودی RAG: خواندن، چانک، embed و ذخیره در vector store
سه منبع:
۱. لحن (tone) — sensor_uuid=__global__
۲. پایگاه دانش (knowledge base) — sensor_uuid=__global__
۳. دیتای خاک هر کاربر از DB (sensor_data + soil_data) — sensor_uuid=uuid
"""
import uuid
from pathlib import Path
from .chunker import chunk_text, chunk_texts
from .config import load_rag_config, RAGConfig
from .embedding import embed_texts
from .user_data import load_user_sources
from .vector_store import QdrantVectorStore
# پسوندهای قابل خواندن
TEXT_EXTENSIONS = {".txt", ".md", ".rst", ".json"}
SENSOR_UUID_GLOBAL = "__global__"
def _resolve_path(base: Path, p: str) -> Path:
"""تبدیل مسیر نسبی به مطلق نسبت به base پروژه."""
path = Path(p)
if not path.is_absolute():
path = base / path
return path
def _load_file(path: Path) -> str | None:
"""خواندن یک فایل متنی."""
if not path.exists() or not path.is_file():
return None
try:
return path.read_text(encoding="utf-8").strip()
except Exception:
return None
def _load_files_from_dir(dir_path: Path, prefix: str = "kb") -> list[tuple[str, str]]:
"""
خواندن همه فایل‌های متنی از یک دایرکتوری.
Returns: [(source_id, content), ...]
"""
if not dir_path.exists() or not dir_path.is_dir():
return []
out: list[tuple[str, str]] = []
for f in sorted(dir_path.rglob("*")):
if f.is_file() and f.suffix.lower() in TEXT_EXTENSIONS:
rel = f.relative_to(dir_path)
source_id = f"{prefix}:{rel}"
content = _load_file(f)
if content:
out.append((source_id, content))
return out
def load_sources(config: RAGConfig | None = None) -> list[tuple[str, str, str]]:
"""
بارگذاری سه منبع: لحن، پایگاه دانش، دیتای کاربر از DB.
Returns:
[(source_id, content, sensor_uuid), ...]
sensor_uuid: __global__ برای tone/kb، uuid سنسور برای user
"""
cfg = config or load_rag_config()
base = Path(__file__).resolve().parent.parent
sources: list[tuple[str, str, str]] = []
# ۱. لحن
tone_path = _resolve_path(base, cfg.tone_file)
content = _load_file(tone_path)
if content:
sources.append(("tone", content, SENSOR_UUID_GLOBAL))
# ۲. پایگاه دانش
kb_path = _resolve_path(base, cfg.knowledge_base_path)
for sid, c in _load_files_from_dir(kb_path, prefix="kb"):
sources.append((sid, c, SENSOR_UUID_GLOBAL))
if kb_path.is_file():
content = _load_file(kb_path)
if content:
sources.append((f"kb:{kb_path.name}", content, SENSOR_UUID_GLOBAL))
# ۳. دیتای کاربران از sensor_data + soil_data
for sid, content in load_user_sources():
sensor_uuid = sid.replace("user:", "")
sources.append((sid, content, sensor_uuid))
return sources
def ingest(recreate: bool = False, config: RAGConfig | None = None) -> dict:
"""
ورودی کامل: منابع را می‌خواند، چانک می‌کند، embed می‌کند و به vector store می‌فرستد.
دیتای هر کاربر (sensor_uuid) جدا embed و با metadata ذخیره می‌شود.
Args:
recreate: اگر True باشد، collection را از نو می‌سازد
config: تنظیمات RAG
Returns:
آمار ورودی (تعداد چانک، منبع‌ها، خطاها)
"""
cfg = config or load_rag_config()
store = QdrantVectorStore(config=cfg)
if recreate:
store.ensure_collection(recreate=True)
sources = load_sources(config=cfg)
if not sources:
return {"chunks_added": 0, "sources": [], "error": "هیچ منبعی یافت نشد"}
all_chunks: list[str] = []
all_metas: list[dict] = []
all_ids: list[str] = []
for source_id, content, sensor_uuid in sources:
chunks = chunk_text(content, config=cfg)
for i, ch in enumerate(chunks):
uid = str(uuid.uuid4())
all_ids.append(uid)
all_chunks.append(ch)
all_metas.append({
"source": source_id,
"chunk_index": i,
"sensor_uuid": sensor_uuid,
})
if not all_chunks:
return {"chunks_added": 0, "sources": [s[0] for s in sources], "error": "هیچ چانکی ساخته نشد"}
embeddings = embed_texts(all_chunks, config=cfg)
if len(embeddings) != len(all_chunks):
return {
"chunks_added": 0,
"sources": [s[0] for s in sources],
"error": f"تعداد embed با چانک‌ها مطابقت ندارد: {len(embeddings)} vs {len(all_chunks)}",
}
store.add_documents(
ids=all_ids,
embeddings=embeddings,
documents=all_chunks,
metadatas=all_metas,
)
return {
"chunks_added": len(all_chunks),
"sources": [s[0] for s in sources],
}