From 2c42ebe01ca7b9ca24a625888afbf21b1e4d6741 Mon Sep 17 00:00:00 2001 From: Mohammad Sajad Pourajam Date: Fri, 27 Feb 2026 20:06:46 +0330 Subject: [PATCH] Refactor user data handling and enhance chat functionality - Removed deprecated user_info files and paths from configuration. - Added user soil data integration in chat context to improve response accuracy. - Updated build_rag_context and chat_rag_stream functions to include sensor_uuid for user-specific data retrieval. - Enhanced load_sources function to load user data from the database. - Implemented filtering in search_with_query and QdrantVectorStore to isolate user data based on sensor_uuid. - Introduced Celery Beat schedule for periodic user data ingestion. --- config/rag_config.yaml | 1 - config/settings.py | 8 ++ config/user_info/README.md | 3 - config/user_info/farm_soil_mock.json | 44 ---------- rag/__init__.py | 3 + rag/chat.py | 49 +++++++---- rag/config.py | 2 - rag/ingest.py | 46 ++++++----- rag/retrieve.py | 6 ++ rag/tasks.py | 17 ++++ rag/user_data.py | 117 +++++++++++++++++++++++++++ rag/vector_store.py | 18 +++++ rag/views.py | 21 ++++- 13 files changed, 246 insertions(+), 89 deletions(-) delete mode 100644 config/user_info/README.md delete mode 100644 config/user_info/farm_soil_mock.json create mode 100644 rag/tasks.py create mode 100644 rag/user_data.py diff --git a/config/rag_config.yaml b/config/rag_config.yaml index a083764..d39fa1b 100644 --- a/config/rag_config.yaml +++ b/config/rag_config.yaml @@ -26,4 +26,3 @@ llm: tone_file: "config/tone.txt" knowledge_base_path: "config/knowledge_base" -user_info_path: "config/user_info" diff --git a/config/settings.py b/config/settings.py index 514c96b..3437c1c 100644 --- a/config/settings.py +++ b/config/settings.py @@ -107,3 +107,11 @@ CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/ CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0") CELERY_ACCEPT_CONTENT = ["json"] CELERY_TASK_SERIALIZER = "json" + +# Celery Beat — embed دیتای کاربران هر ۶ ساعت +CELERY_BEAT_SCHEDULE = { + "rag-ingest-periodic": { + "task": "rag.tasks.rag_ingest_task", + "schedule": 6 * 60 * 60, # ۶ ساعت + }, +} diff --git a/config/user_info/README.md b/config/user_info/README.md deleted file mode 100644 index 4542240..0000000 --- a/config/user_info/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# اطلاعات کاربران - -فایل‌های `.txt` و `.md` این پوشه به‌عنوان اطلاعات هر کاربر embed و ذخیره می‌شوند. diff --git a/config/user_info/farm_soil_mock.json b/config/user_info/farm_soil_mock.json deleted file mode 100644 index d8b2479..0000000 --- a/config/user_info/farm_soil_mock.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "farm": { - "name": "مزرعه نمونه گلستان", - "location": { - "latitude": 36.2, - "longitude": 52.5 - } - }, - "soil_data": { - "0-5cm": { - "phh2o": 7.2, - "clay": 25, - "sand": 45, - "silt": 30, - "soc": 1.4, - "nitrogen": 0.12 - }, - "5-15cm": { - "phh2o": 7.4, - "clay": 28, - "sand": 42, - "silt": 30, - "soc": 1.1, - "nitrogen": 0.09 - }, - "15-30cm": { - "phh2o": 7.5, - "clay": 30, - "sand": 40, - "silt": 30, - "soc": 0.8, - "nitrogen": 0.07 - } - }, - "sensor_readings": { - "soil_moisture": 32, - "soil_temperature": 24.5, - "soil_ph": 7.1, - "electrical_conductivity": 2.1, - "nitrogen": 15, - "phosphorus": 8, - "potassium": 180 - } -} diff --git a/rag/__init__.py b/rag/__init__.py index 72f8864..5c534ee 100644 --- a/rag/__init__.py +++ b/rag/__init__.py @@ -10,6 +10,7 @@ from .config import load_rag_config from .embedding import embed_single, embed_texts from .ingest import ingest, load_sources from .retrieve import search_with_query +from .user_data import build_user_soil_text, load_user_sources from .vector_store import QdrantVectorStore __all__ = [ @@ -22,6 +23,8 @@ __all__ = [ "ingest", "load_rag_config", "load_sources", + "load_user_sources", + "build_user_soil_text", "QdrantVectorStore", "search_with_query", ] diff --git a/rag/chat.py b/rag/chat.py index 0dae738..8d2628e 100644 --- a/rag/chat.py +++ b/rag/chat.py @@ -8,6 +8,7 @@ from openai import OpenAI from .config import load_rag_config, RAGConfig from .retrieve import search_with_query +from .user_data import build_user_soil_text def _get_chat_client(config: RAGConfig | None) -> OpenAI: @@ -32,32 +33,49 @@ def _load_tone(config: RAGConfig | None) -> str: return "" -def build_rag_context(query: str, config: RAGConfig | None = None, limit: int = 5) -> str: +def build_rag_context( + query: str, + sensor_uuid: str, + config: RAGConfig | None = None, + limit: int = 8, +) -> str: """ - بازیابی متن‌های مرتبط از RAG برای کوئری کاربر. + ساخت context برای LLM: دیتای فعلی خاک کاربر + متن‌های مرتبط از RAG. + دیتای کاربر همیشه اول می‌آید تا LLM مقادیر واقعی (مثل pH) را ببیند. """ - results = search_with_query(query, limit=limit, config=config) - if not results: - return "" - parts = [] - for r in results: - text = r.get("text", "").strip() - if text: - parts.append(text) - return "\n\n---\n\n".join(parts) + parts: list[str] = [] + + # ۱. دیتای فعلی خاک کاربر از DB — همیشه اول (برای سوالاتی مثل «pH خاک من چند است») + user_soil = build_user_soil_text(sensor_uuid) + if user_soil and user_soil.strip(): + parts.append("[داده‌های فعلی خاک شما]\n" + user_soil.strip()) + + # ۲. متن‌های مرتبط از RAG + results = search_with_query( + query, sensor_uuid=sensor_uuid, limit=limit, config=config + ) + if results: + rag_texts = [r.get("text", "").strip() for r in results if r.get("text")] + if rag_texts: + parts.append("[متن‌های مرجع]\n" + "\n\n---\n\n".join(rag_texts)) + + return "\n\n---\n\n".join(parts) if parts else "" def chat_rag_stream( query: str, + sensor_uuid: str, config: RAGConfig | None = None, limit: int = 5, system_override: str | None = None, ): """ چت RAG با استریم: دیتای embed شده را بازیابی می‌کند و با LLM جواب می‌دهد. + فقط دیتای همان کاربر (sensor_uuid) قابل دسترسی است. Args: query: پیام کاربر + sensor_uuid: شناسه سنسور کاربر — اجباری config: تنظیمات RAG limit: تعداد چانک‌های بازیابی‌شده system_override: جایگزین system prompt (اختیاری) @@ -69,7 +87,7 @@ def chat_rag_stream( client = _get_chat_client(cfg) model = cfg.llm.model - context = build_rag_context(query, config=cfg, limit=limit) + context = build_rag_context(query, sensor_uuid, config=cfg, limit=limit) if system_override is not None: system_content = system_override @@ -77,11 +95,12 @@ def chat_rag_stream( tone = _load_tone(cfg) system_parts = [tone] if tone else [] system_parts.append( - "با استفاده از بخش «متن‌های مرجع» زیر به سوال کاربر پاسخ بده. " - "فقط در حد نیاز از مرجع استفاده کن و پاسخ را به زبان کاربر بنویس." + "با استفاده از بخش «داده‌های فعلی خاک شما» و «متن‌های مرجع» زیر به سوال کاربر پاسخ بده. " + "برای سوالاتی درباره خاک کاربر (مثل pH، رطوبت، NPK) حتماً از داده‌های فعلی استفاده کن. " + "پاسخ را به زبان کاربر بنویس." ) if context: - system_parts.append("\n\nمتن‌های مرجع:\n" + context) + system_parts.append("\n\n" + context) system_content = "\n".join(system_parts) messages = [ diff --git a/rag/config.py b/rag/config.py index 908f718..c1d3584 100644 --- a/rag/config.py +++ b/rag/config.py @@ -47,7 +47,6 @@ class RAGConfig: llm: LLMConfig = field(default_factory=LLMConfig) tone_file: str = "config/tone.txt" knowledge_base_path: str = "config/knowledge_base" - user_info_path: str = "config/user_info" chromadb: dict[str, Any] = field(default_factory=dict) @@ -104,6 +103,5 @@ def load_rag_config(config_path: str | Path | None = None) -> RAGConfig: llm=llm, tone_file=data.get("tone_file", "config/tone.txt"), knowledge_base_path=data.get("knowledge_base_path", "config/knowledge_base"), - user_info_path=data.get("user_info_path", "config/user_info"), chromadb=data.get("chromadb", {}), ) diff --git a/rag/ingest.py b/rag/ingest.py index 174896f..e668dd3 100644 --- a/rag/ingest.py +++ b/rag/ingest.py @@ -2,9 +2,9 @@ پایپ‌لاین ورودی RAG: خواندن، چانک، embed و ذخیره در vector store سه منبع: -۱. لحن (tone) -۲. پایگاه دانش (knowledge base) -۳. اطلاعات هر کاربر (user info) +۱. لحن (tone) — sensor_uuid=__global__ +۲. پایگاه دانش (knowledge base) — sensor_uuid=__global__ +۳. دیتای خاک هر کاربر از DB (sensor_data + soil_data) — sensor_uuid=uuid """ import uuid from pathlib import Path @@ -12,11 +12,14 @@ from pathlib import Path from .chunker import chunk_text, chunk_texts from .config import load_rag_config, RAGConfig from .embedding import embed_texts +from .user_data import load_user_sources from .vector_store import QdrantVectorStore # پسوندهای قابل خواندن TEXT_EXTENSIONS = {".txt", ".md", ".rst", ".json"} +SENSOR_UUID_GLOBAL = "__global__" + def _resolve_path(base: Path, p: str) -> Path: """تبدیل مسیر نسبی به مطلق نسبت به base پروژه.""" @@ -54,41 +57,37 @@ def _load_files_from_dir(dir_path: Path, prefix: str = "kb") -> list[tuple[str, return out -def load_sources(config: RAGConfig | None = None) -> list[tuple[str, str]]: +def load_sources(config: RAGConfig | None = None) -> list[tuple[str, str, str]]: """ - بارگذاری سه منبع: لحن، پایگاه دانش، اطلاعات کاربر. + بارگذاری سه منبع: لحن، پایگاه دانش، دیتای کاربر از DB. Returns: - [(source_id, content), ...] - source_id مثال: tone, kb:file.txt, user:profile.txt + [(source_id, content, sensor_uuid), ...] + sensor_uuid: __global__ برای tone/kb، uuid سنسور برای user """ cfg = config or load_rag_config() base = Path(__file__).resolve().parent.parent - sources: list[tuple[str, str]] = [] + sources: list[tuple[str, str, str]] = [] # ۱. لحن tone_path = _resolve_path(base, cfg.tone_file) content = _load_file(tone_path) if content: - sources.append(("tone", content)) + sources.append(("tone", content, SENSOR_UUID_GLOBAL)) # ۲. پایگاه دانش kb_path = _resolve_path(base, cfg.knowledge_base_path) for sid, c in _load_files_from_dir(kb_path, prefix="kb"): - sources.append((sid, c)) + sources.append((sid, c, SENSOR_UUID_GLOBAL)) if kb_path.is_file(): content = _load_file(kb_path) if content: - sources.append((f"kb:{kb_path.name}", content)) + sources.append((f"kb:{kb_path.name}", content, SENSOR_UUID_GLOBAL)) - # ۳. اطلاعات کاربر - user_path = _resolve_path(base, cfg.user_info_path) - for sid, c in _load_files_from_dir(user_path, prefix="user"): - sources.append((sid, c)) - if user_path.is_file(): - content = _load_file(user_path) - if content: - sources.append((f"user:{user_path.name}", content)) + # ۳. دیتای کاربران از sensor_data + soil_data + for sid, content in load_user_sources(): + sensor_uuid = sid.replace("user:", "") + sources.append((sid, content, sensor_uuid)) return sources @@ -96,6 +95,7 @@ def load_sources(config: RAGConfig | None = None) -> list[tuple[str, str]]: def ingest(recreate: bool = False, config: RAGConfig | None = None) -> dict: """ ورودی کامل: منابع را می‌خواند، چانک می‌کند، embed می‌کند و به vector store می‌فرستد. + دیتای هر کاربر (sensor_uuid) جدا embed و با metadata ذخیره می‌شود. Args: recreate: اگر True باشد، collection را از نو می‌سازد @@ -117,13 +117,17 @@ def ingest(recreate: bool = False, config: RAGConfig | None = None) -> dict: all_metas: list[dict] = [] all_ids: list[str] = [] - for source_id, content in sources: + for source_id, content, sensor_uuid in sources: chunks = chunk_text(content, config=cfg) for i, ch in enumerate(chunks): uid = str(uuid.uuid4()) all_ids.append(uid) all_chunks.append(ch) - all_metas.append({"source": source_id, "chunk_index": i}) + all_metas.append({ + "source": source_id, + "chunk_index": i, + "sensor_uuid": sensor_uuid, + }) if not all_chunks: return {"chunks_added": 0, "sources": [s[0] for s in sources], "error": "هیچ چانکی ساخته نشد"} diff --git a/rag/retrieve.py b/rag/retrieve.py index 20423a4..c64fce0 100644 --- a/rag/retrieve.py +++ b/rag/retrieve.py @@ -8,12 +8,17 @@ from .vector_store import QdrantVectorStore def search_with_query( query: str, + sensor_uuid: str, limit: int = 5, score_threshold: float | None = None, config: RAGConfig | None = None, ) -> list[dict]: """ کوئری را embed می‌کند و در vector store جستجو می‌کند. + فقط chunks مربوط به sensor_uuid یا __global__ برمی‌گردد (ایزوله‌سازی کاربر). + + Args: + sensor_uuid: شناسه سنسور کاربر — اجباری برای امنیت Returns: لیست نتایج با id, score, text, metadata @@ -25,4 +30,5 @@ def search_with_query( query_vector=query_vector, limit=limit, score_threshold=score_threshold, + sensor_uuid=sensor_uuid, ) diff --git a/rag/tasks.py b/rag/tasks.py new file mode 100644 index 0000000..a7cf177 --- /dev/null +++ b/rag/tasks.py @@ -0,0 +1,17 @@ +""" +تسک‌های Celery برای RAG +""" +from config.celery import app + +from .ingest import ingest + + +@app.task +def rag_ingest_task(recreate: bool = True): + """ + embed و ذخیره دیتای همه کاربران در Qdrant. + هر چند ساعت یکبار اجرا شود (از طریق Celery Beat). + recreate=True: collection از نو ساخته می‌شود تا دیتای قدیمی حذف شود. + """ + result = ingest(recreate=recreate) + return result diff --git a/rag/user_data.py b/rag/user_data.py new file mode 100644 index 0000000..d184b04 --- /dev/null +++ b/rag/user_data.py @@ -0,0 +1,117 @@ +""" +ساخت دیتای خاک کاربر از sensor_data و soil_data — Schema-agnostic +هر سنسور = یک کاربر. شناسایی با uuid_sensor. + +مدل‌های Django داخل توابع import می‌شوند تا از AppRegistryNotReady جلوگیری شود. +""" +from django.db.models import Model + + +# فیلدهایی که در متن embed نباید بیایند (شناسه‌ها، رابطه‌ها) +EXCLUDE_FIELD_NAMES = {"id", "created_at", "updated_at", "task_id", "recorded_at"} + + +def _model_to_data_fields(instance: Model, exclude: set[str] | None = None) -> dict: + """ + استخراج فیلدهای داده از یک instance با استفاده از introspection. + تغییرات بعدی در مدل باعث شکستن نمی‌شود. + """ + exclude = exclude or set() + out: dict = {} + for f in instance._meta.get_fields(): + if f.many_to_many or f.one_to_many or f.one_to_one and f.auto_created: + continue + if f.name in exclude or f.name in EXCLUDE_FIELD_NAMES: + continue + if hasattr(f, "related_model") and f.related_model: + continue # FK + try: + val = getattr(instance, f.name, None) + if val is not None: + out[f.name] = val + except Exception: + pass + return out + + +def build_user_soil_text(sensor_uuid: str) -> str | None: + """ + ساخت متن قابل embed برای یک سنسور (کاربر). + از SensorData → SoilLocation → SoilDepthData خوانده می‌شود. + + Returns: + متن متنی قابل چانک، یا None اگر سنسور یافت نشد. + """ + from sensor_data.models import SensorData + from soil_data.models import SoilDepthData + + try: + sensor = SensorData.objects.select_related("location").get( + uuid_sensor=sensor_uuid + ) + except SensorData.DoesNotExist: + return None + + parts: list[str] = [] + + # شناسه سنسور + parts.append(f"سنسور: {sensor.uuid_sensor}") + + # موقعیت مزرعه + loc = sensor.location + parts.append( + f"موقعیت مزرعه: عرض {loc.latitude}، طول {loc.longitude}" + ) + + # خوانش‌های سنسور (schema-agnostic) + sensor_fields = _model_to_data_fields( + sensor, exclude={"uuid_sensor", "location_id", "location"} + ) + if sensor_fields: + sensor_lines = [f" {k}: {v}" for k, v in sorted(sensor_fields.items())] + parts.append("خوانش‌های سنسور:\n" + "\n".join(sensor_lines)) + + # داده‌های خاک به تفکیک عمق + depths = ( + SoilDepthData.objects.filter(soil_location=loc) + .order_by("depth_label") + .all() + ) + if depths: + depth_parts = [] + for d in depths: + d_data = _model_to_data_fields( + d, exclude={"soil_location", "soil_location_id"} + ) + if d_data: + lines = [f" {k}: {v}" for k, v in sorted(d_data.items())] + depth_parts.append(f" عمق {d.depth_label}:\n" + "\n".join(lines)) + if depth_parts: + parts.append("داده‌های خاک:\n" + "\n".join(depth_parts)) + + return "\n\n".join(parts) if parts else None + + +def get_all_sensor_uuids() -> list[str]: + """لیست همه uuid_sensor های موجود.""" + from sensor_data.models import SensorData + + return [ + str(u) for u in + SensorData.objects.values_list("uuid_sensor", flat=True).distinct() + ] + + +def load_user_sources() -> list[tuple[str, str]]: + """ + بارگذاری منابع دیتای کاربران از DB. + Returns: [(source_id, content), ...] + source_id = user:{sensor_uuid} + """ + uuids = get_all_sensor_uuids() + sources: list[tuple[str, str]] = [] + for uid in uuids: + text = build_user_soil_text(str(uid)) + if text and text.strip(): + sources.append((f"user:{uid}", text)) + return sources diff --git a/rag/vector_store.py b/rag/vector_store.py index 11cbb57..01c2259 100644 --- a/rag/vector_store.py +++ b/rag/vector_store.py @@ -95,16 +95,34 @@ class QdrantVectorStore: query_vector: list[float], limit: int = 5, score_threshold: float | None = None, + sensor_uuid: str | None = None, ) -> list[dict]: """ جستجوی شباهت بر اساس query vector. از query_points استفاده می‌کند (qdrant-client >= 2.0). + sensor_uuid: اجباری — فقط chunks مربوط به این سنسور یا __global__ برگردانده می‌شود. """ + query_filter = None + if sensor_uuid: + query_filter = qmodels.Filter( + should=[ + qmodels.FieldCondition( + key="sensor_uuid", + match=qmodels.MatchValue(value=sensor_uuid), + ), + qmodels.FieldCondition( + key="sensor_uuid", + match=qmodels.MatchValue(value="__global__"), + ), + ] + ) + response = self.client.query_points( collection_name=self.qdrant.collection_name, query=query_vector, limit=limit, score_threshold=score_threshold, + query_filter=query_filter, ) points = getattr(response, "points", []) or [] diff --git a/rag/views.py b/rag/views.py index e33c038..8c66a51 100644 --- a/rag/views.py +++ b/rag/views.py @@ -13,11 +13,15 @@ from .chat import chat_rag_stream class ChatView(APIView): """ چت RAG با استریم. - POST با {"message": "متن سوال"} یا query param message + POST با {"message": "متن سوال", "sensor_uuid": "uuid-سنسور"} + sensor_uuid اجباری — هر کاربر فقط به دیتای خودش دسترسی دارد. """ def post(self, request: Request): - message = request.data.get("message") or request.query_params.get("message") + data = request.data if request.method == "POST" else request.query_params + message = data.get("message") + sensor_uuid = data.get("sensor_uuid") + if not message or not isinstance(message, str): return Response( {"code": 400, "msg": "پارامتر message الزامی است."}, @@ -29,10 +33,21 @@ class ChatView(APIView): {"code": 400, "msg": "پیام نباید خالی باشد."}, status=status.HTTP_400_BAD_REQUEST, ) + if not sensor_uuid or not isinstance(sensor_uuid, str): + return Response( + {"code": 400, "msg": "پارامتر sensor_uuid الزامی است."}, + status=status.HTTP_400_BAD_REQUEST, + ) + sensor_uuid = str(sensor_uuid).strip() + if not sensor_uuid: + return Response( + {"code": 400, "msg": "sensor_uuid نباید خالی باشد."}, + status=status.HTTP_400_BAD_REQUEST, + ) def generate(): try: - for chunk in chat_rag_stream(message): + for chunk in chat_rag_stream(message, sensor_uuid=sensor_uuid): yield chunk except Exception as e: yield f"\n[خطا: {e}]"