From d4d420a03a3f40d38f29349d2b081b73d93b491f Mon Sep 17 00:00:00 2001 From: Aljaz Ceru Date: Tue, 23 Sep 2025 15:47:33 +0200 Subject: [PATCH] rag improvements 2 --- .gitignore | 66 + backend/app/modules/chatbot/__init__.py | 21 + .../app/modules/chatbot/config_schema.json | 126 ++ .../examples/customer_support_workflow.json | 182 ++ backend/app/modules/chatbot/main.py | 936 ++++++++ backend/app/modules/chatbot/module.yaml | 110 + backend/app/modules/factory.py | 225 ++ backend/app/modules/protocols.py | 258 +++ backend/app/modules/rag/__init__.py | 6 + backend/app/modules/rag/main.py | 1922 +++++++++++++++++ backend/app/modules/rag/module.yaml | 82 + backend/app/services/jsonl_processor.py | 211 ++ backend/app/services/qdrant_stats_service.py | 163 ++ 13 files changed, 4308 insertions(+) create mode 100644 backend/app/modules/chatbot/__init__.py create mode 100644 backend/app/modules/chatbot/config_schema.json create mode 100644 backend/app/modules/chatbot/examples/customer_support_workflow.json create mode 100644 backend/app/modules/chatbot/main.py create mode 100644 backend/app/modules/chatbot/module.yaml create mode 100644 backend/app/modules/factory.py create mode 100644 backend/app/modules/protocols.py create mode 100644 backend/app/modules/rag/__init__.py create mode 100644 backend/app/modules/rag/main.py create mode 100644 backend/app/modules/rag/module.yaml create mode 100644 backend/app/services/jsonl_processor.py create mode 100644 backend/app/services/qdrant_stats_service.py diff --git a/.gitignore b/.gitignore index e69de29..6642c56 100644 --- a/.gitignore +++ b/.gitignore @@ -0,0 +1,66 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd +*.env +*.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +*.sqlite3 +*.db + +# FastAPI logs +*.log + +# Node.js +node_modules/ +npm-debug.log* +yarn-debug.log* +yarn-error.log* +pnpm-debug.log* + +# Next.js build +frontend/.next/ +frontend/out/ +frontend/.env.local +frontend/.env.production +frontend/.env.development + + +backend/storage/ +# TypeScript +*.tsbuildinfo + +# Coverage reports +htmlcov/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.pyc +*.pyo +*.pyd +.pytest_cache/ +backend/.pytest_cache/ +backend/.mypy_cache/ +.mypy_cache/ +*.prof + +backend/_to_delete/ +backend/__pycache__/ +backend/app/core/__pycache__/ +backend/app/services/__pycache__/ +backend/app/services/llm/__pycache__/ +backend/app/services/llm/providers/__pycache__/ +backend/app/utils/__pycache__/ +backend/modules/rag/__pycache__/ +frontend/.next/ +frontend/node_modules/ +node_modules/ +venv/ \ No newline at end of file diff --git a/backend/app/modules/chatbot/__init__.py b/backend/app/modules/chatbot/__init__.py new file mode 100644 index 0000000..5131eeb --- /dev/null +++ b/backend/app/modules/chatbot/__init__.py @@ -0,0 +1,21 @@ +""" +Chatbot Module - AI Chatbot with RAG Integration + +This module provides AI chatbot capabilities with: +- Multiple personality types (Assistant, Customer Support, Teacher, etc.) 
+- RAG integration for knowledge-based responses +- Conversation memory and context management +- Workflow integration as building blocks +- UI-configurable settings +""" + +from .main import ChatbotModule, create_module + +__version__ = "1.0.0" +__author__ = "Enclava Team" + +# Export main classes for easy importing +__all__ = [ + "ChatbotModule", + "create_module" +] \ No newline at end of file diff --git a/backend/app/modules/chatbot/config_schema.json b/backend/app/modules/chatbot/config_schema.json new file mode 100644 index 0000000..d4a5dc7 --- /dev/null +++ b/backend/app/modules/chatbot/config_schema.json @@ -0,0 +1,126 @@ +{ + "title": "Chatbot Configuration", + "type": "object", + "properties": { + "name": { + "type": "string", + "title": "Chatbot Name", + "description": "Display name for this chatbot instance", + "minLength": 1, + "maxLength": 100 + }, + "chatbot_type": { + "type": "string", + "title": "Chatbot Type", + "description": "Select the type of chatbot personality", + "enum": ["assistant", "customer_support", "teacher", "researcher", "creative_writer", "custom"], + "enumNames": ["General Assistant", "Customer Support", "Teacher", "Researcher", "Creative Writer", "Custom"], + "default": "assistant" + }, + "model": { + "type": "string", + "title": "AI Model", + "description": "Choose the LLM model for responses", + "enum": ["gpt-4", "gpt-3.5-turbo", "claude-3-sonnet", "claude-3-opus", "llama-70b"], + "default": "gpt-3.5-turbo" + }, + "system_prompt": { + "type": "string", + "title": "System Prompt", + "description": "Define the chatbot's personality and behavior instructions", + "ui:widget": "textarea", + "ui:options": { + "rows": 6, + "placeholder": "You are a helpful AI assistant..." + } + }, + "use_rag": { + "type": "boolean", + "title": "Enable Knowledge Base", + "description": "Use RAG to search knowledge base for context", + "default": false + }, + "rag_collection": { + "type": "string", + "title": "Knowledge Base Collection", + "description": "Select which document collection to search", + "ui:widget": "rag-collection-selector", + "ui:condition": "use_rag === true" + }, + "rag_top_k": { + "type": "integer", + "title": "Knowledge Base Results", + "description": "Number of relevant documents to include", + "minimum": 1, + "maximum": 10, + "default": 5, + "ui:condition": "use_rag === true" + }, + "temperature": { + "type": "number", + "title": "Response Creativity", + "description": "Controls randomness (0.0 = focused, 1.0 = creative)", + "minimum": 0, + "maximum": 1, + "default": 0.7, + "ui:widget": "range", + "ui:options": { + "step": 0.1 + } + }, + "max_tokens": { + "type": "integer", + "title": "Maximum Response Length", + "description": "Maximum number of tokens in response", + "minimum": 50, + "maximum": 4000, + "default": 1000, + "ui:widget": "range", + "ui:options": { + "step": 50 + } + }, + "memory_length": { + "type": "integer", + "title": "Conversation Memory", + "description": "Number of previous message pairs to remember", + "minimum": 1, + "maximum": 50, + "default": 10, + "ui:widget": "range" + }, + "fallback_responses": { + "type": "array", + "title": "Fallback Responses", + "description": "Responses to use when the AI cannot answer", + "items": { + "type": "string", + "title": "Fallback Response" + }, + "default": [ + "I'm not sure how to help with that. Could you please rephrase your question?", + "I don't have enough information to answer that question accurately.", + "That's outside my knowledge area. Is there something else I can help you with?" 
+ ], + "ui:options": { + "orderable": true, + "addable": true, + "removable": true + } + } + }, + "required": ["name", "chatbot_type", "model"], + "ui:order": [ + "name", + "chatbot_type", + "model", + "system_prompt", + "use_rag", + "rag_collection", + "rag_top_k", + "temperature", + "max_tokens", + "memory_length", + "fallback_responses" + ] +} \ No newline at end of file diff --git a/backend/app/modules/chatbot/examples/customer_support_workflow.json b/backend/app/modules/chatbot/examples/customer_support_workflow.json new file mode 100644 index 0000000..7d22781 --- /dev/null +++ b/backend/app/modules/chatbot/examples/customer_support_workflow.json @@ -0,0 +1,182 @@ +{ + "name": "Customer Support Workflow", + "description": "Intelligent customer support workflow with intent classification, knowledge base search, and chatbot response generation", + "version": "1.0", + "variables": { + "support_chatbot_id": "cs-bot-001", + "escalation_threshold": 0.3, + "max_attempts": 3 + }, + "steps": [ + { + "id": "classify_intent", + "name": "Classify Customer Intent", + "type": "llm_call", + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "system", + "content": "You are an intent classifier for customer support. Classify the customer message into one of these categories: technical_issue, billing_question, feature_request, complaint, general_inquiry. Also provide a confidence score between 0 and 1. Respond with JSON: {\"intent\": \"category\", \"confidence\": 0.95, \"reasoning\": \"explanation\"}" + }, + { + "role": "user", + "content": "{{ inputs.customer_message }}" + } + ], + "output_variable": "intent_classification" + }, + + { + "id": "search_knowledge_base", + "name": "Search Knowledge Base", + "type": "workflow_step", + "module": "rag", + "action": "search", + "config": { + "query": "{{ inputs.customer_message }}", + "collection": "support_documentation", + "top_k": 5, + "include_metadata": true + }, + "output_variable": "knowledge_results" + }, + + { + "id": "check_confidence", + "name": "Check Intent Confidence", + "type": "condition", + "condition": "JSON.parse(steps.classify_intent.result).confidence > variables.escalation_threshold", + "true_steps": [ + { + "id": "generate_chatbot_response", + "name": "Generate Chatbot Response", + "type": "workflow_step", + "module": "chatbot", + "action": "workflow_chat_step", + "config": { + "message": "{{ inputs.customer_message }}", + "chatbot_id": "{{ variables.support_chatbot_id }}", + "use_rag": true, + "context": { + "intent": "{{ steps.classify_intent.result }}", + "knowledge_base_results": "{{ steps.search_knowledge_base.result }}", + "customer_history": "{{ inputs.customer_history }}", + "additional_instructions": "Be empathetic and professional. If you cannot fully resolve the issue, offer to escalate to a human agent." + } + }, + "output_variable": "chatbot_response" + }, + + { + "id": "analyze_response_quality", + "name": "Analyze Response Quality", + "type": "llm_call", + "model": "gpt-3.5-turbo", + "messages": [ + { + "role": "system", + "content": "Analyze if this customer support response adequately addresses the customer's question. Consider completeness, accuracy, and helpfulness. 
Respond with JSON: {\"quality_score\": 0.85, \"is_adequate\": true, \"requires_escalation\": false, \"reasoning\": \"explanation\"}" + }, + { + "role": "user", + "content": "Customer Question: {{ inputs.customer_message }}\\n\\nChatbot Response: {{ steps.generate_chatbot_response.result.response }}\\n\\nKnowledge Base Context: {{ steps.search_knowledge_base.result }}" + } + ], + "output_variable": "response_quality" + }, + + { + "id": "final_response_decision", + "name": "Final Response Decision", + "type": "condition", + "condition": "JSON.parse(steps.analyze_response_quality.result).is_adequate === true", + "true_steps": [ + { + "id": "send_chatbot_response", + "name": "Send Chatbot Response", + "type": "output", + "config": { + "response_type": "chatbot_response", + "message": "{{ steps.generate_chatbot_response.result.response }}", + "sources": "{{ steps.generate_chatbot_response.result.sources }}", + "confidence": "{{ JSON.parse(steps.classify_intent.result).confidence }}", + "quality_score": "{{ JSON.parse(steps.analyze_response_quality.result).quality_score }}" + } + } + ], + "false_steps": [ + { + "id": "escalate_to_human", + "name": "Escalate to Human Agent", + "type": "output", + "config": { + "response_type": "human_escalation", + "message": "I'd like to connect you with one of our human support agents who can better assist with your specific situation. Please hold on while I transfer you.", + "escalation_reason": "Response quality below threshold", + "intent": "{{ steps.classify_intent.result }}", + "attempted_response": "{{ steps.generate_chatbot_response.result.response }}", + "priority": "normal" + } + } + ] + } + ], + "false_steps": [ + { + "id": "low_confidence_escalation", + "name": "Low Confidence Escalation", + "type": "output", + "config": { + "response_type": "human_escalation", + "message": "I want to make sure you get the best possible help. Let me connect you with one of our human support agents.", + "escalation_reason": "Low intent classification confidence", + "intent": "{{ steps.classify_intent.result }}", + "priority": "high" + } + } + ] + }, + + { + "id": "log_interaction", + "name": "Log Customer Interaction", + "type": "workflow_step", + "module": "analytics", + "action": "log_event", + "config": { + "event_type": "customer_support_interaction", + "data": { + "customer_message": "{{ inputs.customer_message }}", + "intent_classification": "{{ steps.classify_intent.result }}", + "response_generated": "{{ steps.generate_chatbot_response.result.response }}", + "knowledge_base_used": "{{ steps.search_knowledge_base.result }}", + "escalated": "{{ outputs.response_type === 'human_escalation' }}", + "workflow_execution_time": "{{ execution_time }}", + "timestamp": "{{ current_timestamp }}" + } + } + } + ], + + "outputs": { + "response_type": "string", + "message": "string", + "sources": "array", + "escalation_reason": "string", + "confidence": "number", + "quality_score": "number" + }, + + "error_handling": { + "retry_failed_steps": true, + "max_retries": 2, + "fallback_response": "I apologize, but I'm experiencing technical difficulties. Please contact our support team directly for assistance." 
+ }, + + "metadata": { + "created_by": "support_team", + "use_case": "customer_support_automation", + "tags": ["customer_support", "chatbot", "rag", "escalation"], + "estimated_execution_time": "5-15 seconds" + } +} \ No newline at end of file diff --git a/backend/app/modules/chatbot/main.py b/backend/app/modules/chatbot/main.py new file mode 100644 index 0000000..3f9b8dc --- /dev/null +++ b/backend/app/modules/chatbot/main.py @@ -0,0 +1,936 @@ +""" +Chatbot Module Implementation + +Provides AI chatbot capabilities with: +- RAG integration for knowledge-based responses +- Custom prompts and personalities +- Conversation memory and context +- Workflow integration as building blocks +- UI-configurable settings +""" + +import json +from pprint import pprint +import uuid +from datetime import datetime, timedelta +from typing import Dict, List, Any, Optional, Union +from dataclasses import dataclass +from pydantic import BaseModel, Field +from enum import Enum + +from fastapi import APIRouter, HTTPException, Depends +from sqlalchemy.orm import Session + +from app.core.logging import get_logger +from app.services.llm.service import llm_service +from app.services.llm.models import ChatRequest as LLMChatRequest, ChatMessage as LLMChatMessage +from app.services.llm.exceptions import LLMError, ProviderError, SecurityError +from app.services.base_module import BaseModule, Permission +from app.models.user import User +from app.models.chatbot import ChatbotInstance as DBChatbotInstance, ChatbotConversation as DBConversation, ChatbotMessage as DBMessage, ChatbotAnalytics +from app.core.security import get_current_user +from app.db.database import get_db +from app.core.config import settings + +# Import protocols for type hints and dependency injection +from ..protocols import RAGServiceProtocol +# Note: LiteLLMClientProtocol replaced with direct LLM service usage + +logger = get_logger(__name__) + + +class ChatbotType(str, Enum): + """Types of chatbot personalities""" + ASSISTANT = "assistant" + CUSTOMER_SUPPORT = "customer_support" + TEACHER = "teacher" + RESEARCHER = "researcher" + CREATIVE_WRITER = "creative_writer" + CUSTOM = "custom" + + +class MessageRole(str, Enum): + """Message roles in conversation""" + USER = "user" + ASSISTANT = "assistant" + SYSTEM = "system" + + +@dataclass +class ChatbotConfig: + """Chatbot configuration""" + name: str + chatbot_type: str # Changed from ChatbotType enum to str to allow custom types + model: str + rag_collection: Optional[str] = None + system_prompt: str = "" + temperature: float = 0.7 + max_tokens: int = 1000 + memory_length: int = 10 # Number of previous messages to remember + use_rag: bool = False + rag_top_k: int = 5 + rag_score_threshold: float = 0.02 # Lowered from default 0.3 to allow more results + fallback_responses: List[str] = None + + def __post_init__(self): + if self.fallback_responses is None: + self.fallback_responses = [ + "I'm not sure how to help with that. Could you please rephrase your question?", + "I don't have enough information to answer that question accurately.", + "That's outside my knowledge area. Is there something else I can help you with?" 
+ ] + + +class ChatMessage(BaseModel): + """Individual chat message""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + role: MessageRole + content: str + timestamp: datetime = Field(default_factory=datetime.utcnow) + metadata: Dict[str, Any] = Field(default_factory=dict) + sources: Optional[List[Dict[str, Any]]] = None + + +class Conversation(BaseModel): + """Conversation state""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + chatbot_id: str + user_id: str + messages: List[ChatMessage] = Field(default_factory=list) + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class ChatRequest(BaseModel): + """Chat completion request""" + message: str + conversation_id: Optional[str] = None + chatbot_id: str + use_rag: Optional[bool] = None + context: Optional[Dict[str, Any]] = None + + +class ChatResponse(BaseModel): + """Chat completion response""" + response: str + conversation_id: str + message_id: str + sources: Optional[List[Dict[str, Any]]] = None + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class ChatbotInstance(BaseModel): + """Configured chatbot instance""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + name: str + config: ChatbotConfig + created_by: str + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + is_active: bool = True + + +class ChatbotModule(BaseModule): + """Main chatbot module implementation""" + + def __init__(self, rag_service: Optional[RAGServiceProtocol] = None): + super().__init__("chatbot") + self.rag_module = rag_service # Keep same name for compatibility + self.db_session = None + + # System prompts will be loaded from database + self.system_prompts = {} + + async def initialize(self, **kwargs): + """Initialize the chatbot module""" + await super().initialize(**kwargs) + + # Initialize the LLM service + await llm_service.initialize() + + # Get RAG module dependency if not already injected + if not self.rag_module: + try: + # Try to get RAG module from module manager + from app.services.module_manager import module_manager + if hasattr(module_manager, 'modules') and 'rag' in module_manager.modules: + self.rag_module = module_manager.modules['rag'] + logger.info("RAG module injected from module manager") + except Exception as e: + logger.warning(f"Could not inject RAG module: {e}") + + # Load prompt templates from database + await self._load_prompt_templates() + + logger.info("Chatbot module initialized") + logger.info(f"LLM service available: {llm_service._initialized}") + logger.info(f"RAG module available after init: {self.rag_module is not None}") + logger.info(f"Loaded {len(self.system_prompts)} prompt templates") + + async def _ensure_dependencies(self): + """Lazy load dependencies if not available""" + # Ensure LLM service is initialized + if not llm_service._initialized: + await llm_service.initialize() + logger.info("LLM service lazy loaded") + + if not self.rag_module: + try: + # Try to get RAG module from module manager + from app.services.module_manager import module_manager + if hasattr(module_manager, 'modules') and 'rag' in module_manager.modules: + self.rag_module = module_manager.modules['rag'] + logger.info("RAG module lazy loaded from module manager") + except Exception as e: + logger.warning(f"Could not lazy load RAG module: {e}") + + async def _load_prompt_templates(self): + 
"""Load prompt templates from database""" + try: + from app.db.database import SessionLocal + from app.models.prompt_template import PromptTemplate + from sqlalchemy import select + + db = SessionLocal() + try: + result = db.execute( + select(PromptTemplate) + .where(PromptTemplate.is_active == True) + ) + templates = result.scalars().all() + + for template in templates: + self.system_prompts[template.type_key] = template.system_prompt + + logger.info(f"Loaded {len(self.system_prompts)} prompt templates from database") + + finally: + db.close() + + except Exception as e: + logger.warning(f"Could not load prompt templates from database: {e}") + # Fallback to hardcoded prompts + self.system_prompts = { + "assistant": "You are a helpful AI assistant. Provide accurate, concise, and friendly responses. Always aim to be helpful while being honest about your limitations.", + "customer_support": "You are a professional customer support representative. Be empathetic, professional, and solution-focused in all interactions.", + "teacher": "You are an experienced educational tutor. Break down complex concepts into understandable parts. Be patient, supportive, and encouraging.", + "researcher": "You are a thorough research assistant with a focus on accuracy and evidence-based information.", + "creative_writer": "You are an experienced creative writing mentor and storytelling expert.", + "custom": "You are a helpful AI assistant. Your personality and behavior will be defined by custom instructions." + } + + async def get_system_prompt_for_type(self, chatbot_type: str) -> str: + """Get system prompt for a specific chatbot type""" + if chatbot_type in self.system_prompts: + return self.system_prompts[chatbot_type] + + # If not found, try to reload templates + await self._load_prompt_templates() + + return self.system_prompts.get(chatbot_type, self.system_prompts.get("assistant", + "You are a helpful AI assistant. 
Provide accurate, concise, and friendly responses.")) + + async def create_chatbot(self, config: ChatbotConfig, user_id: str, db: Session) -> ChatbotInstance: + """Create a new chatbot instance""" + + # Set system prompt based on type if not provided or empty + if not config.system_prompt or config.system_prompt.strip() == "": + config.system_prompt = await self.get_system_prompt_for_type(config.chatbot_type) + + # Create database record + db_chatbot = DBChatbotInstance( + name=config.name, + description=f"{config.chatbot_type.replace('_', ' ').title()} chatbot", + config=config.__dict__, + created_by=user_id + ) + + db.add(db_chatbot) + db.commit() + db.refresh(db_chatbot) + + # Convert to response model + chatbot = ChatbotInstance( + id=db_chatbot.id, + name=db_chatbot.name, + config=ChatbotConfig(**db_chatbot.config), + created_by=db_chatbot.created_by, + created_at=db_chatbot.created_at, + updated_at=db_chatbot.updated_at, + is_active=db_chatbot.is_active + ) + + logger.info(f"Created new chatbot: {chatbot.name} ({chatbot.id})") + return chatbot + + async def chat_completion(self, request: ChatRequest, user_id: str, db: Session) -> ChatResponse: + """Generate chat completion response""" + + # Get chatbot configuration from database + db_chatbot = db.query(DBChatbotInstance).filter(DBChatbotInstance.id == request.chatbot_id).first() + if not db_chatbot: + raise HTTPException(status_code=404, detail="Chatbot not found") + + chatbot_config = ChatbotConfig(**db_chatbot.config) + + # Get or create conversation + conversation = await self._get_or_create_conversation( + request.conversation_id, request.chatbot_id, user_id, db + ) + + # Create user message + user_message = DBMessage( + conversation_id=conversation.id, + role=MessageRole.USER.value, + content=request.message + ) + db.add(user_message) + db.commit() + db.refresh(user_message) + + logger.info(f"Created user message with ID {user_message.id} for conversation {conversation.id}") + + try: + # Force the session to see the committed changes + db.expire_all() + + # Get conversation history for context - includes the current message we just created + # Fetch up to memory_length pairs of messages (user + assistant) + # The +1 ensures we include the current message if we're at the limit + messages = db.query(DBMessage).filter( + DBMessage.conversation_id == conversation.id + ).order_by(DBMessage.timestamp.desc()).limit(chatbot_config.memory_length * 2 + 1).all() + + logger.info(f"Query for conversation_id={conversation.id}, memory_length={chatbot_config.memory_length}") + logger.info(f"Found {len(messages)} messages in conversation history") + + # If we don't have any messages, manually add the user message we just created + if len(messages) == 0: + logger.warning(f"No messages found in query, but we just created message {user_message.id}") + logger.warning(f"Using the user message we just created") + messages = [user_message] + + for idx, msg in enumerate(messages): + logger.info(f"Message {idx}: id={msg.id}, role={msg.role}, content_preview={msg.content[:50] if msg.content else 'None'}...") + + # Generate response + response_content, sources = await self._generate_response( + request.message, messages, chatbot_config, request.context, db + ) + + # Create assistant message + assistant_message = DBMessage( + conversation_id=conversation.id, + role=MessageRole.ASSISTANT.value, + content=response_content, + sources=sources, + metadata={"model": chatbot_config.model, "temperature": chatbot_config.temperature} + ) + db.add(assistant_message) 
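+            # Persist the assistant reply right away; the conversation's updated_at timestamp is committed separately below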
+ db.commit() + db.refresh(assistant_message) + + # Update conversation timestamp + conversation.updated_at = datetime.utcnow() + db.commit() + + return ChatResponse( + response=response_content, + conversation_id=conversation.id, + message_id=assistant_message.id, + sources=sources + ) + + except Exception as e: + logger.error(f"Chat completion failed: {e}") + # Return fallback response + fallback = chatbot_config.fallback_responses[0] if chatbot_config.fallback_responses else "I'm having trouble responding right now." + + assistant_message = DBMessage( + conversation_id=conversation.id, + role=MessageRole.ASSISTANT.value, + content=fallback, + metadata={"error": str(e), "fallback": True} + ) + db.add(assistant_message) + db.commit() + db.refresh(assistant_message) + + return ChatResponse( + response=fallback, + conversation_id=conversation.id, + message_id=assistant_message.id, + metadata={"error": str(e), "fallback": True} + ) + + async def _generate_response(self, message: str, db_messages: List[DBMessage], + config: ChatbotConfig, context: Optional[Dict] = None, db: Session = None) -> tuple[str, Optional[List]]: + """Generate response using LLM with optional RAG""" + + # Lazy load dependencies if not available + await self._ensure_dependencies() + + sources = None + rag_context = "" + + # Helper: detect encryption-related queries for extra care + def _is_encryption_query(q: str) -> bool: + ql = (q or "").lower() + return any(k in ql for k in ["encrypt", "encryption", "encrypted", "decrypt", "decryption", "sd card", "microsd", "micro-sd"]) + + is_encryption = _is_encryption_query(message) + + # RAG search if enabled + if config.use_rag and config.rag_collection and self.rag_module: + logger.info(f"RAG search enabled for collection: {config.rag_collection}") + try: + # Get the Qdrant collection name from RAG collection + qdrant_collection_name = await self._get_qdrant_collection_name(config.rag_collection, db) + logger.info(f"Qdrant collection name: {qdrant_collection_name}") + + if qdrant_collection_name: + logger.info(f"Searching RAG documents: query='{message[:50]}...', max_results={config.rag_top_k}") + rag_results = await self.rag_module.search_documents( + query=message, + max_results=config.rag_top_k, + collection_name=qdrant_collection_name, + score_threshold=config.rag_score_threshold + ) + + # If the user asks about encryption, prefer results that explicitly mention it + if rag_results and is_encryption: + kw = ["encrypt", "encryption", "encrypted", "decrypt", "decryption"] + filtered = [r for r in rag_results if any(k in (r.document.content or "").lower() for k in kw)] + if filtered: + rag_results = filtered + [r for r in rag_results if r not in filtered] + + if rag_results: + logger.info(f"RAG search found {len(rag_results)} results") + sources = [{"title": f"Document {i+1}", "content": result.document.content[:200]} + for i, result in enumerate(rag_results)] + + # Build full RAG context from all results + rag_context = "\n\nRelevant information from knowledge base:\n" + "\n\n".join([ + f"[Document {i+1}]:\n{result.document.content}" for i, result in enumerate(rag_results) + ]) + + # Detailed RAG logging - ALWAYS log for debugging + logger.info("=== COMPREHENSIVE RAG SEARCH RESULTS ===") + logger.info(f"Query: '{message}'") + logger.info(f"Collection: {qdrant_collection_name}") + logger.info(f"Number of results: {len(rag_results)}") + for i, result in enumerate(rag_results): + logger.info(f"\n--- RAG Result {i+1} ---") + logger.info(f"Score: {getattr(result, 'score', 
'N/A')}") + logger.info(f"Document ID: {getattr(result.document, 'id', 'N/A')}") + logger.info(f"Full Content ({len(result.document.content)} chars):") + logger.info(f"{result.document.content}") + if hasattr(result.document, 'metadata'): + logger.info(f"Metadata: {result.document.metadata}") + logger.info(f"\n=== RAG CONTEXT BEING ADDED TO PROMPT ({len(rag_context)} chars) ===") + logger.info(rag_context) + logger.info("=== END RAG SEARCH RESULTS ===") + else: + logger.warning("RAG search returned no results") + else: + logger.warning(f"RAG collection '{config.rag_collection}' not found in database") + + except Exception as e: + logger.warning(f"RAG search failed: {e}") + import traceback + logger.warning(f"RAG search traceback: {traceback.format_exc()}") + + # Build conversation context (includes the current message from db_messages) + # Inject strict grounding instructions when RAG is used, especially for encryption questions + extra_instructions = {} + if config.use_rag: + guardrails = ( + "Answer strictly using the 'Relevant information' provided. " + "If the information does not explicitly answer the question, say you don't have enough information instead of guessing. " + ) + if is_encryption: + guardrails += ( + "When asked about encryption or SD-card backups, do not claim that backups are encrypted unless the provided context explicitly uses wording like 'encrypt', 'encrypted', or 'encryption'. " + "If such wording is absent, state clearly that the SD-card backup is not encrypted. " + ) + extra_instructions["additional_instructions"] = guardrails + + messages = self._build_conversation_messages(db_messages, config, rag_context, extra_instructions) + + # Note: Current user message is already included in db_messages from the query + logger.info(f"Built conversation context with {len(messages)} messages") + + # LLM completion + logger.info(f"Attempting LLM completion with model: {config.model}") + logger.info(f"Messages to send: {len(messages)} messages") + + # Always log detailed prompts for debugging + logger.info("=== COMPREHENSIVE LLM REQUEST ===") + logger.info(f"Model: {config.model}") + logger.info(f"Temperature: {config.temperature}") + logger.info(f"Max tokens: {config.max_tokens}") + logger.info(f"RAG enabled: {config.use_rag}") + logger.info(f"RAG collection: {config.rag_collection}") + if config.use_rag and rag_context: + logger.info(f"RAG context added: {len(rag_context)} characters") + logger.info(f"RAG sources: {len(sources) if sources else 0} documents") + logger.info("\n=== COMPLETE MESSAGES SENT TO LLM ===") + for i, msg in enumerate(messages): + logger.info(f"\n--- Message {i+1} ---") + logger.info(f"Role: {msg['role']}") + logger.info(f"Content ({len(msg['content'])} chars):") + # Truncate long content for logging (full RAG context can be very long) + if len(msg['content']) > 500: + logger.info(f"{msg['content'][:500]}... 
[truncated, total {len(msg['content'])} chars]") + else: + logger.info(msg['content']) + logger.info("=== END COMPREHENSIVE LLM REQUEST ===") + + try: + logger.info("Calling LLM service create_chat_completion...") + + # Convert messages to LLM service format + llm_messages = [LLMChatMessage(role=msg["role"], content=msg["content"]) for msg in messages] + + # Create LLM service request + llm_request = LLMChatRequest( + model=config.model, + messages=llm_messages, + temperature=config.temperature, + max_tokens=config.max_tokens, + user_id="chatbot_user", + api_key_id=0 # Chatbot module uses internal service + ) + + # Make request to LLM service + llm_response = await llm_service.create_chat_completion(llm_request) + + # Extract response content + if llm_response.choices: + content = llm_response.choices[0].message.content + logger.info(f"Response content length: {len(content)}") + + # Always log response for debugging + logger.info("=== COMPREHENSIVE LLM RESPONSE ===") + logger.info(f"Response content ({len(content)} chars):") + logger.info(content) + if llm_response.usage: + usage = llm_response.usage + logger.info(f"Token usage - Prompt: {usage.prompt_tokens}, Completion: {usage.completion_tokens}, Total: {usage.total_tokens}") + if sources: + logger.info(f"RAG sources included: {len(sources)} documents") + logger.info("=== END COMPREHENSIVE LLM RESPONSE ===") + + return content, sources + else: + logger.warning("No choices in LLM response") + return "I received an empty response from the AI model.", sources + + except SecurityError as e: + logger.error(f"Security error in LLM completion: {e}") + raise HTTPException(status_code=400, detail=f"Security validation failed: {e.message}") + except ProviderError as e: + logger.error(f"Provider error in LLM completion: {e}") + raise HTTPException(status_code=503, detail="LLM service temporarily unavailable") + except LLMError as e: + logger.error(f"LLM service error: {e}") + raise HTTPException(status_code=500, detail="LLM service error") + except Exception as e: + logger.error(f"LLM completion failed: {e}") + # Return fallback if available + return "I'm currently unable to process your request. Please try again later.", None + + def _build_conversation_messages(self, db_messages: List[DBMessage], config: ChatbotConfig, + rag_context: str = "", context: Optional[Dict] = None) -> List[Dict]: + """Build messages array for LLM completion""" + + messages = [] + + # System prompt + system_prompt = config.system_prompt + if rag_context: + # Add explicit instruction to use RAG context + system_prompt += "\n\nIMPORTANT: Use the following information from the knowledge base to answer the user's question. " \ + "This information is directly relevant to their query and should be your primary source:\n" + rag_context + if context and context.get('additional_instructions'): + system_prompt += f"\n\nAdditional instructions: {context['additional_instructions']}" + + messages.append({"role": "system", "content": system_prompt}) + + logger.info(f"Building messages from {len(db_messages)} database messages") + + # Conversation history (messages are already limited by memory_length in the query) + # Reverse to get chronological order + # Include ALL messages - the current user message is needed for the LLM to respond! 
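+        # Note: chat_completion passes db_messages newest-first (timestamp descending), so reversed() below restores chronological order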
+ for idx, msg in enumerate(reversed(db_messages)): + logger.info(f"Processing message {idx}: role={msg.role}, content_preview={msg.content[:50] if msg.content else 'None'}...") + if msg.role in ["user", "assistant"]: + messages.append({ + "role": msg.role, + "content": msg.content + }) + logger.info(f"Added message with role {msg.role} to LLM messages") + else: + logger.info(f"Skipped message with role {msg.role}") + + logger.info(f"Final messages array has {len(messages)} messages") # For debugging, can be removed in production + return messages + + async def _get_or_create_conversation(self, conversation_id: Optional[str], + chatbot_id: str, user_id: str, db: Session) -> DBConversation: + """Get existing conversation or create new one""" + + if conversation_id: + conversation = db.query(DBConversation).filter(DBConversation.id == conversation_id).first() + if conversation: + return conversation + + # Create new conversation + conversation = DBConversation( + chatbot_id=chatbot_id, + user_id=user_id, + title="New Conversation" + ) + + db.add(conversation) + db.commit() + db.refresh(conversation) + return conversation + + def get_router(self) -> APIRouter: + """Get FastAPI router for chatbot endpoints""" + router = APIRouter(prefix="/chatbot", tags=["chatbot"]) + + @router.post("/chat", response_model=ChatResponse) + async def chat_endpoint( + request: ChatRequest, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) + ): + """Chat completion endpoint""" + return await self.chat_completion(request, str(current_user['id']), db) + + @router.post("/create", response_model=ChatbotInstance) + async def create_chatbot_endpoint( + config: ChatbotConfig, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) + ): + """Create new chatbot instance""" + return await self.create_chatbot(config, str(current_user['id']), db) + + @router.get("/list", response_model=List[ChatbotInstance]) + async def list_chatbots_endpoint( + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) + ): + """List user's chatbots""" + db_chatbots = db.query(DBChatbotInstance).filter( + (DBChatbotInstance.created_by == str(current_user['id'])) | + (DBChatbotInstance.created_by == "system") + ).all() + + chatbots = [] + for db_chatbot in db_chatbots: + chatbot = ChatbotInstance( + id=db_chatbot.id, + name=db_chatbot.name, + config=ChatbotConfig(**db_chatbot.config), + created_by=db_chatbot.created_by, + created_at=db_chatbot.created_at, + updated_at=db_chatbot.updated_at, + is_active=db_chatbot.is_active + ) + chatbots.append(chatbot) + + return chatbots + + @router.get("/conversations/{conversation_id}", response_model=Conversation) + async def get_conversation_endpoint( + conversation_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) + ): + """Get conversation history""" + conversation = db.query(DBConversation).filter( + DBConversation.id == conversation_id + ).first() + + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found") + + # Check if user owns this conversation + if conversation.user_id != str(current_user['id']): + raise HTTPException(status_code=403, detail="Not authorized") + + # Get messages + messages = db.query(DBMessage).filter( + DBMessage.conversation_id == conversation_id + ).order_by(DBMessage.timestamp).all() + + # Convert to response model + chat_messages = [] + for msg in messages: + chat_message = ChatMessage( + id=msg.id, + 
role=MessageRole(msg.role), + content=msg.content, + timestamp=msg.timestamp, + metadata=msg.metadata or {}, + sources=msg.sources + ) + chat_messages.append(chat_message) + + response_conversation = Conversation( + id=conversation.id, + chatbot_id=conversation.chatbot_id, + user_id=conversation.user_id, + messages=chat_messages, + created_at=conversation.created_at, + updated_at=conversation.updated_at, + metadata=conversation.context_data or {} + ) + + return response_conversation + + @router.get("/types", response_model=List[Dict[str, str]]) + async def get_chatbot_types_endpoint(): + """Get available chatbot types and their descriptions""" + return [ + {"type": "assistant", "name": "General Assistant", "description": "Helpful AI assistant for general questions"}, + {"type": "customer_support", "name": "Customer Support", "description": "Professional customer service chatbot"}, + {"type": "teacher", "name": "Teacher", "description": "Educational tutor and learning assistant"}, + {"type": "researcher", "name": "Researcher", "description": "Research assistant with fact-checking focus"}, + {"type": "creative_writer", "name": "Creative Writer", "description": "Creative writing and storytelling assistant"}, + {"type": "custom", "name": "Custom", "description": "Custom chatbot with user-defined personality"} + ] + + return router + + # API Compatibility Methods + async def chat(self, chatbot_config: Dict[str, Any], message: str, + conversation_history: List = None, user_id: str = "anonymous") -> Dict[str, Any]: + """Chat method for API compatibility""" + logger.info(f"Chat method called with message: {message[:50]}... by user: {user_id}") + + # Lazy load dependencies + await self._ensure_dependencies() + + logger.info(f"LLM service available: {llm_service._initialized}") + logger.info(f"RAG module available: {self.rag_module is not None}") + + try: + # Create a minimal database session for the chat + from app.db.database import SessionLocal + db = SessionLocal() + + try: + # Convert config dict to ChatbotConfig + config = ChatbotConfig( + name=chatbot_config.get("name", "Unknown"), + chatbot_type=chatbot_config.get("chatbot_type", "assistant"), + model=chatbot_config.get("model", "gpt-3.5-turbo"), + system_prompt=chatbot_config.get("system_prompt", ""), + temperature=chatbot_config.get("temperature", 0.7), + max_tokens=chatbot_config.get("max_tokens", 1000), + memory_length=chatbot_config.get("memory_length", 10), + use_rag=chatbot_config.get("use_rag", False), + rag_collection=chatbot_config.get("rag_collection"), + rag_top_k=chatbot_config.get("rag_top_k", 5), + fallback_responses=chatbot_config.get("fallback_responses", []) + ) + + # Generate response using internal method + # Create a temporary message object for the current user message + temp_messages = [ + DBMessage( + id=0, + conversation_id=0, + role="user", + content=message, + timestamp=datetime.utcnow(), + metadata={} + ) + ] + + response_content, sources = await self._generate_response( + message, temp_messages, config, None, db + ) + + return { + "response": response_content, + "sources": sources, + "conversation_id": None, + "message_id": f"msg_{uuid.uuid4()}" + } + + finally: + db.close() + + except Exception as e: + logger.error(f"Chat method failed: {e}") + fallback_responses = chatbot_config.get("fallback_responses", [ + "I'm sorry, I'm having trouble processing your request right now." 
+ ]) + return { + "response": fallback_responses[0] if fallback_responses else "I'm sorry, I couldn't process your request.", + "sources": None, + "conversation_id": None, + "message_id": f"msg_{uuid.uuid4()}" + } + + # Workflow Integration Methods + async def workflow_chat_step(self, context: Dict[str, Any], step_config: Dict[str, Any], db: Session) -> Dict[str, Any]: + """Execute chatbot as a workflow step""" + + message = step_config.get('message', '') + chatbot_id = step_config.get('chatbot_id') + use_rag = step_config.get('use_rag', False) + + # Template substitution from context + message = self._substitute_template_variables(message, context) + + request = ChatRequest( + message=message, + chatbot_id=chatbot_id, + use_rag=use_rag, + context=step_config.get('context', {}) + ) + + # Use system user for workflow executions + response = await self.chat_completion(request, "workflow_system", db) + + return { + "response": response.response, + "conversation_id": response.conversation_id, + "sources": response.sources, + "metadata": response.metadata + } + + def _substitute_template_variables(self, template: str, context: Dict[str, Any]) -> str: + """Simple template variable substitution""" + import re + + def replace_var(match): + var_path = match.group(1) + try: + # Simple dot notation support: context.user.name + value = context + for part in var_path.split('.'): + value = value[part] + return str(value) + except (KeyError, TypeError): + return match.group(0) # Return original if not found + + return re.sub(r'\\{\\{\\s*([^}]+)\\s*\\}\\}', replace_var, template) + + async def _get_qdrant_collection_name(self, collection_identifier: str, db: Session) -> Optional[str]: + """Get Qdrant collection name from RAG collection ID, name, or direct Qdrant collection""" + try: + from app.models.rag_collection import RagCollection + from sqlalchemy import select + + logger.info(f"Looking up RAG collection with identifier: '{collection_identifier}'") + + # First check if this might be a direct Qdrant collection name + # (e.g., starts with "ext_", "rag_", or contains specific patterns) + if collection_identifier.startswith(("ext_", "rag_", "test_")) or "_" in collection_identifier: + # Check if this collection exists in Qdrant directly + actual_collection_name = collection_identifier + # Remove "ext_" prefix if present + if collection_identifier.startswith("ext_"): + actual_collection_name = collection_identifier[4:] + + logger.info(f"Checking if '{actual_collection_name}' exists in Qdrant directly") + if self.rag_module: + try: + # Try to verify the collection exists in Qdrant + from qdrant_client import QdrantClient + qdrant_client = QdrantClient(host="enclava-qdrant", port=6333) + collections = qdrant_client.get_collections() + collection_names = [c.name for c in collections.collections] + + if actual_collection_name in collection_names: + logger.info(f"Found Qdrant collection directly: {actual_collection_name}") + return actual_collection_name + except Exception as e: + logger.warning(f"Error checking Qdrant collections: {e}") + + rag_collection = None + + # Then try PostgreSQL lookup by ID if numeric + if collection_identifier.isdigit(): + logger.info(f"Treating '{collection_identifier}' as collection ID") + stmt = select(RagCollection).where( + RagCollection.id == int(collection_identifier), + RagCollection.is_active == True + ) + result = db.execute(stmt) + rag_collection = result.scalar_one_or_none() + + # If not found by ID, try to look up by name in PostgreSQL + if not rag_collection: + 
logger.info(f"Collection not found by ID, trying by name: '{collection_identifier}'") + stmt = select(RagCollection).where( + RagCollection.name == collection_identifier, + RagCollection.is_active == True + ) + result = db.execute(stmt) + rag_collection = result.scalar_one_or_none() + + if rag_collection: + logger.info(f"Found RAG collection: ID={rag_collection.id}, name='{rag_collection.name}', qdrant_collection='{rag_collection.qdrant_collection_name}'") + return rag_collection.qdrant_collection_name + else: + logger.warning(f"RAG collection '{collection_identifier}' not found in database (tried both ID and name)") + return None + + except Exception as e: + logger.error(f"Error looking up RAG collection '{collection_identifier}': {e}") + import traceback + logger.error(f"Traceback: {traceback.format_exc()}") + return None + + # Required abstract methods from BaseModule + + async def cleanup(self): + """Cleanup chatbot module resources""" + logger.info("Chatbot module cleanup completed") + + def get_required_permissions(self) -> List[Permission]: + """Get required permissions for chatbot module""" + return [ + Permission("chatbots", "create", "Create chatbot instances"), + Permission("chatbots", "configure", "Configure chatbot settings"), + Permission("chatbots", "chat", "Use chatbot for conversations"), + Permission("chatbots", "manage", "Manage all chatbots") + ] + + async def process_request(self, request_type: str, data: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Process chatbot requests""" + if request_type == "chat": + # Handle chat requests + chat_request = ChatRequest(**data) + user_id = context.get("user_id", "anonymous") + db = context.get("db") + + if db: + response = await self.chat_completion(chat_request, user_id, db) + return { + "success": True, + "response": response.response, + "conversation_id": response.conversation_id, + "sources": response.sources + } + + return {"success": False, "error": f"Unknown request type: {request_type}"} + + +# Module factory function +def create_module(rag_service: Optional[RAGServiceProtocol] = None) -> ChatbotModule: + """Factory function to create chatbot module instance""" + return ChatbotModule(rag_service=rag_service) + +# Create module instance (dependencies will be injected via factory) +chatbot_module = ChatbotModule() diff --git a/backend/app/modules/chatbot/module.yaml b/backend/app/modules/chatbot/module.yaml new file mode 100644 index 0000000..7d9fbd3 --- /dev/null +++ b/backend/app/modules/chatbot/module.yaml @@ -0,0 +1,110 @@ +name: chatbot +version: 1.0.0 +description: "AI Chatbot with RAG integration and customizable prompts" +author: "Enclava Team" +category: "conversation" + +# Module lifecycle +enabled: true +auto_start: true +dependencies: + - rag +optional_dependencies: + - analytics + +# Configuration +config_schema: "./config_schema.json" +ui_components: "./ui_components/" + +# Module capabilities +provides: + - "chat_completion" + - "conversation_management" + - "chatbot_configuration" + +consumes: + - "rag_search" + - "llm_completion" + +# API endpoints +endpoints: + - path: "/chatbot/chat" + method: "POST" + description: "Generate chat completion" + + - path: "/chatbot/create" + method: "POST" + description: "Create new chatbot instance" + + - path: "/chatbot/list" + method: "GET" + description: "List user chatbots" + +# UI Configuration +ui_config: + icon: "message-circle" + color: "#10B981" + category: "AI & ML" + + # Configuration forms + forms: + - name: "basic_config" + title: "Basic 
Settings" + fields: ["name", "chatbot_type", "model"] + + - name: "personality" + title: "Personality & Prompts" + fields: ["system_prompt", "temperature", "fallback_responses"] + + - name: "knowledge_base" + title: "Knowledge Base" + fields: ["use_rag", "rag_collection", "rag_top_k"] + + - name: "advanced" + title: "Advanced Settings" + fields: ["max_tokens", "memory_length"] + +# Permissions +permissions: + - name: "chatbot.create" + description: "Create new chatbot instances" + + - name: "chatbot.configure" + description: "Configure chatbot settings" + + - name: "chatbot.chat" + description: "Use chatbot for conversations" + + - name: "chatbot.manage" + description: "Manage all chatbots (admin)" + +# Analytics events +analytics_events: + - name: "chatbot_created" + description: "New chatbot instance created" + + - name: "chat_message_sent" + description: "User sent message to chatbot" + + - name: "chat_response_generated" + description: "Chatbot generated response" + + - name: "rag_context_used" + description: "RAG context was used in response" + +# Health checks +health_checks: + - name: "llm_connectivity" + description: "Check LLM client connection" + + - name: "rag_availability" + description: "Check RAG module availability" + + - name: "conversation_memory" + description: "Check conversation storage health" + +# Documentation +documentation: + readme: "./README.md" + examples: "./examples/" + api_docs: "./docs/api.md" \ No newline at end of file diff --git a/backend/app/modules/factory.py b/backend/app/modules/factory.py new file mode 100644 index 0000000..7e4590a --- /dev/null +++ b/backend/app/modules/factory.py @@ -0,0 +1,225 @@ +""" +Module Factory for Confidential Empire + +This factory creates and wires up all modules with their dependencies. +It ensures proper dependency injection while maintaining optimal performance +through direct method calls and minimal indirection. 
+""" + +from typing import Dict, Optional, Any +import logging + +# Import all modules +from .rag.main import RAGModule +from .chatbot.main import ChatbotModule, create_module as create_chatbot_module +from .workflow.main import WorkflowModule + +# Import services that modules depend on +from app.services.litellm_client import LiteLLMClient + +# Import protocols for type safety +from .protocols import ( + RAGServiceProtocol, + ChatbotServiceProtocol, + LiteLLMClientProtocol, + WorkflowServiceProtocol, + ServiceRegistry +) + +logger = logging.getLogger(__name__) + + +class ModuleFactory: + """Factory for creating and wiring module dependencies""" + + def __init__(self): + self.modules: Dict[str, Any] = {} + self.initialized = False + + async def create_all_modules(self, config: Optional[Dict[str, Any]] = None) -> ServiceRegistry: + """ + Create all modules with proper dependency injection + + Args: + config: Optional configuration for modules + + Returns: + Dictionary of created modules with their dependencies wired + """ + config = config or {} + + logger.info("Creating modules with dependency injection...") + + # Step 1: Create LiteLLM client (shared dependency) + litellm_client = LiteLLMClient() + + # Step 2: Create RAG module (no dependencies on other modules) + rag_module = RAGModule(config=config.get("rag", {})) + + # Step 3: Create chatbot module with RAG dependency + chatbot_module = create_chatbot_module( + litellm_client=litellm_client, + rag_service=rag_module # RAG module implements RAGServiceProtocol + ) + + # Step 4: Create workflow module with chatbot dependency + workflow_module = WorkflowModule( + chatbot_service=chatbot_module # Chatbot module implements ChatbotServiceProtocol + ) + + # Store all modules + modules = { + "rag": rag_module, + "chatbot": chatbot_module, + "workflow": workflow_module + } + + logger.info(f"Created {len(modules)} modules with dependencies wired") + + # Initialize all modules + await self._initialize_modules(modules, config) + + self.modules = modules + self.initialized = True + + return modules + + async def _initialize_modules(self, modules: Dict[str, Any], config: Dict[str, Any]): + """Initialize all modules in dependency order""" + + # Initialize in dependency order (modules with no deps first) + initialization_order = [ + ("rag", modules["rag"]), + ("chatbot", modules["chatbot"]), # Depends on RAG + ("workflow", modules["workflow"]) # Depends on Chatbot + ] + + for module_name, module in initialization_order: + try: + logger.info(f"Initializing {module_name} module...") + module_config = config.get(module_name, {}) + + # Different modules have different initialization patterns + if hasattr(module, 'initialize'): + if module_name == "rag": + await module.initialize() + else: + await module.initialize(**module_config) + + logger.info(f"✅ {module_name} module initialized successfully") + + except Exception as e: + logger.error(f"❌ Failed to initialize {module_name} module: {e}") + raise RuntimeError(f"Module initialization failed: {module_name}") from e + + async def cleanup_all_modules(self): + """Cleanup all modules in reverse dependency order""" + if not self.initialized: + return + + # Cleanup in reverse order + cleanup_order = ["workflow", "chatbot", "rag"] + + for module_name in cleanup_order: + if module_name in self.modules: + try: + logger.info(f"Cleaning up {module_name} module...") + module = self.modules[module_name] + if hasattr(module, 'cleanup'): + await module.cleanup() + logger.info(f"✅ {module_name} module cleaned up") + 
except Exception as e: + logger.error(f"❌ Error cleaning up {module_name}: {e}") + + self.modules.clear() + self.initialized = False + + def get_module(self, name: str) -> Optional[Any]: + """Get a module by name""" + return self.modules.get(name) + + def is_initialized(self) -> bool: + """Check if factory is initialized""" + return self.initialized + + +# Global factory instance +module_factory = ModuleFactory() + + +# Convenience functions for external use +async def create_modules(config: Optional[Dict[str, Any]] = None) -> ServiceRegistry: + """Create all modules with dependencies wired""" + return await module_factory.create_all_modules(config) + + +async def cleanup_modules(): + """Cleanup all modules""" + await module_factory.cleanup_all_modules() + + +def get_module(name: str) -> Optional[Any]: + """Get a module by name""" + return module_factory.get_module(name) + + +def get_all_modules() -> Dict[str, Any]: + """Get all modules""" + return module_factory.modules.copy() + + +# Factory functions for individual modules (for testing/special cases) +def create_rag_module(config: Optional[Dict[str, Any]] = None) -> RAGModule: + """Create RAG module""" + return RAGModule(config=config or {}) + + +def create_chatbot_with_rag(rag_service: RAGServiceProtocol, + litellm_client: LiteLLMClientProtocol) -> ChatbotModule: + """Create chatbot module with RAG dependency""" + return create_chatbot_module(litellm_client=litellm_client, rag_service=rag_service) + + +def create_workflow_with_chatbot(chatbot_service: ChatbotServiceProtocol) -> WorkflowModule: + """Create workflow module with chatbot dependency""" + return WorkflowModule(chatbot_service=chatbot_service) + + +# Module registry for backward compatibility +class ModuleRegistry: + """Registry that provides access to modules (for backward compatibility)""" + + def __init__(self, factory: ModuleFactory): + self._factory = factory + + @property + def modules(self) -> Dict[str, Any]: + """Get all modules (compatible with existing module_manager interface)""" + return self._factory.modules + + def get(self, name: str) -> Optional[Any]: + """Get module by name""" + return self._factory.get_module(name) + + def __getitem__(self, name: str) -> Any: + """Support dictionary-style access""" + module = self.get(name) + if module is None: + raise KeyError(f"Module '{name}' not found") + return module + + def keys(self): + """Get module names""" + return self._factory.modules.keys() + + def values(self): + """Get module instances""" + return self._factory.modules.values() + + def items(self): + """Get module name-instance pairs""" + return self._factory.modules.items() + + +# Create registry instance for backward compatibility +module_registry = ModuleRegistry(module_factory) \ No newline at end of file diff --git a/backend/app/modules/protocols.py b/backend/app/modules/protocols.py new file mode 100644 index 0000000..2aec3b2 --- /dev/null +++ b/backend/app/modules/protocols.py @@ -0,0 +1,258 @@ +""" +Module Protocols for Confidential Empire + +This file defines the interface contracts that modules must implement for inter-module communication. +Using Python protocols provides compile-time type checking with zero runtime overhead. 
+""" + +from typing import Protocol, Dict, List, Any, Optional, Union +from datetime import datetime +from abc import abstractmethod + + +class RAGServiceProtocol(Protocol): + """Protocol for RAG (Retrieval-Augmented Generation) service interface""" + + @abstractmethod + async def search(self, query: str, collection_name: str, top_k: int) -> Dict[str, Any]: + """ + Search for relevant documents + + Args: + query: Search query string + collection_name: Name of the collection to search in + top_k: Number of top results to return + + Returns: + Dictionary containing search results with 'results' key + """ + ... + + @abstractmethod + async def index_document(self, content: str, metadata: Dict[str, Any] = None) -> str: + """ + Index a document in the vector database + + Args: + content: Document content to index + metadata: Optional metadata for the document + + Returns: + Document ID + """ + ... + + @abstractmethod + async def delete_document(self, document_id: str) -> bool: + """ + Delete a document from the vector database + + Args: + document_id: ID of document to delete + + Returns: + True if successfully deleted + """ + ... + + +class ChatbotServiceProtocol(Protocol): + """Protocol for Chatbot service interface""" + + @abstractmethod + async def chat_completion(self, request: Any, user_id: str, db: Any) -> Any: + """ + Generate chat completion response + + Args: + request: Chat request object + user_id: ID of the user making the request + db: Database session + + Returns: + Chat response object + """ + ... + + @abstractmethod + async def create_chatbot(self, config: Any, user_id: str, db: Any) -> Any: + """ + Create a new chatbot instance + + Args: + config: Chatbot configuration + user_id: ID of the user creating the chatbot + db: Database session + + Returns: + Created chatbot instance + """ + ... + + +class LiteLLMClientProtocol(Protocol): + """Protocol for LiteLLM client interface""" + + @abstractmethod + async def completion(self, model: str, messages: List[Dict[str, str]], **kwargs) -> Any: + """ + Create a completion using the specified model + + Args: + model: Model name to use + messages: List of messages for the conversation + **kwargs: Additional parameters for the completion + + Returns: + Completion response object + """ + ... + + @abstractmethod + async def create_chat_completion(self, model: str, messages: List[Dict[str, str]], + user_id: str, api_key_id: str, **kwargs) -> Any: + """ + Create a chat completion with user tracking + + Args: + model: Model name to use + messages: List of messages for the conversation + user_id: ID of the user making the request + api_key_id: API key identifier + **kwargs: Additional parameters + + Returns: + Chat completion response + """ + ... + + +class CacheServiceProtocol(Protocol): + """Protocol for Cache service interface""" + + @abstractmethod + async def get(self, key: str, default: Any = None) -> Any: + """ + Get value from cache + + Args: + key: Cache key + default: Default value if key not found + + Returns: + Cached value or default + """ + ... + + @abstractmethod + async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: + """ + Set value in cache + + Args: + key: Cache key + value: Value to cache + ttl: Time to live in seconds + + Returns: + True if successfully cached + """ + ... + + @abstractmethod + async def delete(self, key: str) -> bool: + """ + Delete key from cache + + Args: + key: Cache key to delete + + Returns: + True if successfully deleted + """ + ... 
+ + +class SecurityServiceProtocol(Protocol): + """Protocol for Security service interface""" + + @abstractmethod + async def analyze_request(self, request: Any) -> Any: + """ + Perform security analysis on a request + + Args: + request: Request object to analyze + + Returns: + Security analysis result + """ + ... + + @abstractmethod + async def validate_request(self, request: Any) -> bool: + """ + Validate request for security compliance + + Args: + request: Request object to validate + + Returns: + True if request is valid/safe + """ + ... + + +class WorkflowServiceProtocol(Protocol): + """Protocol for Workflow service interface""" + + @abstractmethod + async def execute_workflow(self, workflow: Any, input_data: Dict[str, Any] = None) -> Any: + """ + Execute a workflow definition + + Args: + workflow: Workflow definition to execute + input_data: Optional input data for the workflow + + Returns: + Workflow execution result + """ + ... + + @abstractmethod + async def get_execution(self, execution_id: str) -> Any: + """ + Get workflow execution status + + Args: + execution_id: ID of the execution to retrieve + + Returns: + Execution status object + """ + ... + + +class ModuleServiceProtocol(Protocol): + """Base protocol for all module services""" + + @abstractmethod + async def initialize(self, **kwargs) -> None: + """Initialize the module""" + ... + + @abstractmethod + async def cleanup(self) -> None: + """Cleanup module resources""" + ... + + @abstractmethod + def get_required_permissions(self) -> List[Any]: + """Get required permissions for this module""" + ... + + +# Type aliases for common service combinations +ServiceRegistry = Dict[str, ModuleServiceProtocol] +ServiceDependencies = Dict[str, Optional[ModuleServiceProtocol]] \ No newline at end of file diff --git a/backend/app/modules/rag/__init__.py b/backend/app/modules/rag/__init__.py new file mode 100644 index 0000000..7cd6008 --- /dev/null +++ b/backend/app/modules/rag/__init__.py @@ -0,0 +1,6 @@ +""" +RAG (Retrieval-Augmented Generation) module for Confidential Empire platform +""" +from .main import RAGModule + +__all__ = ["RAGModule"] \ No newline at end of file diff --git a/backend/app/modules/rag/main.py b/backend/app/modules/rag/main.py new file mode 100644 index 0000000..b708d97 --- /dev/null +++ b/backend/app/modules/rag/main.py @@ -0,0 +1,1922 @@ +""" +RAG module implementation with vector database and document processing +Includes comprehensive document processing, content extraction, and NLP analysis +""" +import asyncio +import io +import json +import logging +import mimetypes +import re +from typing import Any, Dict, List, Optional, Tuple, Union +from datetime import datetime +from dataclasses import dataclass, asdict +from pathlib import Path +import hashlib +import base64 +import numpy as np +import uuid + +# Initialize logger early +logger = logging.getLogger(__name__) + +# Document processing libraries (with graceful fallbacks) +try: + import nltk + from nltk.tokenize import sent_tokenize, word_tokenize + from nltk.corpus import stopwords + from nltk.stem import WordNetLemmatizer + NLTK_AVAILABLE = True +except ImportError: + logger.warning("NLTK not available - NLP features will be limited") + NLTK_AVAILABLE = False + +try: + import spacy + SPACY_AVAILABLE = True +except ImportError: + logger.warning("spaCy not available - entity extraction will be disabled") + SPACY_AVAILABLE = False + +try: + from markitdown import MarkItDown + MARKITDOWN_AVAILABLE = True +except ImportError: + logger.warning("MarkItDown not 
available - document conversion will be limited") + MARKITDOWN_AVAILABLE = False + +try: + from docx import Document as DocxDocument + PYTHON_DOCX_AVAILABLE = True +except ImportError: + logger.warning("python-docx not available - DOCX processing will be limited") + PYTHON_DOCX_AVAILABLE = False + +from qdrant_client import QdrantClient +from qdrant_client.models import Distance, VectorParams, PointStruct, ScoredPoint, Filter, FieldCondition, MatchValue +from qdrant_client.http import models +import tiktoken + +from app.core.config import settings +from app.core.logging import log_module_event +from app.services.base_module import BaseModule, Permission + + +@dataclass +class ProcessedDocument: + """Processed document data structure""" + id: str + original_filename: str + file_type: str + mime_type: str + content: str + extracted_text: str + metadata: Dict[str, Any] + word_count: int + sentence_count: int + language: str + entities: List[Dict[str, Any]] + keywords: List[str] + processing_time: float + processed_at: datetime + file_hash: str + file_size: int + embedding: Optional[List[float]] = None + created_at: datetime = None + + def __post_init__(self): + if self.created_at is None: + self.created_at = datetime.utcnow() + + +@dataclass +class ContentValidationResult: + """Content validation result""" + is_valid: bool + issues: List[str] + security_score: float + content_type: str + language_confidence: float + + +# Keep Document class for backward compatibility +@dataclass +class Document: + """Simple document data structure for backward compatibility""" + id: str + content: str + metadata: Dict[str, Any] + embedding: Optional[List[float]] = None + created_at: datetime = None + + def __post_init__(self): + if self.created_at is None: + self.created_at = datetime.utcnow() + + +@dataclass +class SearchResult: + """Search result data structure""" + document: Document + score: float + relevance_score: float + + +class RAGModule(BaseModule): + """RAG module for document storage, retrieval, and augmented generation with integrated content processing""" + + def __init__(self, config: Dict[str, Any] = None): + super().__init__(module_id="rag", config=config) + self.enabled = False + self.qdrant_client: Optional[QdrantClient] = None + self.default_collection_name = "documents" # Keep for backward compatibility + self.embedding_model = None + self.embedding_service = None + self.tokenizer = None + + # Set improved default configuration + self.config = { + "chunk_size": 300, # Reduced from 400 for better precision + "chunk_overlap": 50, # Added overlap for context preservation + "max_results": 10, + "score_threshold": 0.3, # Increased from 0.0 to filter low-quality results + "enable_hybrid": True, # Enable hybrid search (vector + BM25) + "hybrid_weights": {"vector": 0.7, "bm25": 0.3} # Weight for hybrid scoring + } + # Update with any provided config + if config: + self.config.update(config) + + # Content processing components + self.nlp_model = None + self.lemmatizer = None + self.stop_words = set() + self.markitdown = None + self.supported_types = { + 'text/plain': self._process_text, + 'application/pdf': self._process_with_markitdown, + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': self._process_docx, + 'application/msword': self._process_docx, + 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': self._process_with_markitdown, + 'application/vnd.ms-excel': self._process_with_markitdown, + 'text/html': self._process_html, + 'application/json': 
self._process_json, + 'application/x-ndjson': self._process_jsonl, # JSONL support + 'text/markdown': self._process_markdown, + 'text/csv': self._process_csv + } + + self.stats = { + "documents_indexed": 0, + "documents_processed": 0, + "total_processing_time": 0, + "average_processing_time": 0, + "searches_performed": 0, + "average_search_time": 0.0, + "cache_hits": 0, + "errors": 0, + "supported_types": len(self.supported_types) + } + self.search_cache = {} + + def get_required_permissions(self) -> List[Permission]: + """Return list of permissions this module requires""" + return [ + Permission("documents", "index", "Index new documents"), + Permission("documents", "search", "Search documents"), + Permission("documents", "delete", "Delete documents"), + Permission("collections", "manage", "Manage collections"), + Permission("settings", "configure", "Configure RAG settings") + ] + + async def initialize(self): + """Initialize the RAG module with content processing capabilities""" + + try: + # Initialize Qdrant client + qdrant_host = getattr(settings, 'QDRANT_HOST', 'localhost') + qdrant_port = getattr(settings, 'QDRANT_PORT', 6333) + qdrant_url = f"http://{qdrant_host}:{qdrant_port}" + self.qdrant_client = QdrantClient(url=qdrant_url) + + # Initialize tokenizer + self.tokenizer = tiktoken.get_encoding("cl100k_base") + + # Initialize embedding model + self.embedding_model = await self._initialize_embedding_model() + + # Initialize content processing components + await self._initialize_content_processing() + + # Create default collection if it doesn't exist + await self._ensure_collection_exists(self.default_collection_name) + + self.enabled = True + self.initialized = True + log_module_event("rag", "initialized", { + "vector_db": self.config.get("vector_db", "qdrant"), + "embedding_model": self.embedding_model.get("model_name", "intfloat/multilingual-e5-large-instruct"), + "chunk_size": self.config.get("chunk_size", 400), + "max_results": self.config.get("max_results", 10), + "supported_file_types": list(self.supported_types.keys()), + "nltk_ready": True, + "spacy_ready": self.nlp_model is not None, + "markitdown_ready": self.markitdown is not None + }) + + except Exception as e: + logger.error(f"Failed to initialize RAG module: {e}") + log_module_event("rag", "initialization_failed", {"error": str(e)}) + self.enabled = False + raise + + def _generate_file_hash(self, content: bytes) -> str: + """Generate SHA-256 hash of file content""" + return hashlib.sha256(content).hexdigest() + + def _detect_mime_type(self, filename: str, content: bytes) -> str: + """Detect MIME type of file""" + # Try to detect from filename + mime_type, _ = mimetypes.guess_type(filename) + if mime_type: + return mime_type + + # Check for JSONL file extension + if filename.lower().endswith('.jsonl'): + return 'application/x-ndjson' + + # Try to detect from content + if content.startswith(b'%PDF'): + return 'application/pdf' + elif content.startswith(b'PK'): + # This could be DOCX, XLSX, or other Office formats + if filename.lower().endswith(('.docx', '.docm')): + return 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' + elif filename.lower().endswith(('.xlsx', '.xlsm')): + return 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' + else: + return 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' + elif content.startswith(b'\xd0\xcf\x11\xe0'): + # Old Office format (DOC, XLS) + if filename.lower().endswith('.xls'): + return 'application/vnd.ms-excel' + 
else: + return 'application/msword' + elif content.startswith(b' 1 and all(line.strip().startswith('{') for line in lines[:3] if line.strip()): + return 'application/x-ndjson' + except: + pass + return 'application/json' + else: + return 'text/plain' + + def _detect_language(self, text: str) -> Tuple[str, float]: + """Detect language of text (simplified implementation)""" + if len(text) < 50: + return 'unknown', 0.0 + + # Simple heuristic based on common English words + english_words = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'shall'} + + if NLTK_AVAILABLE: + words = word_tokenize(text.lower()) + else: + # Fallback to simple whitespace tokenization + words = text.lower().split() + + english_count = sum(1 for word in words if word in english_words) + confidence = min(english_count / len(words), 1.0) if words else 0.0 + + return 'en' if confidence > 0.1 else 'unknown', confidence + + def _extract_entities(self, text: str) -> List[Dict[str, Any]]: + """Extract named entities from text""" + if not self.nlp_model: + return [] + + try: + doc = self.nlp_model(text[:10000]) # Limit text length for performance + entities = [] + + for ent in doc.ents: + entities.append({ + "text": ent.text, + "label": ent.label_, + "start": ent.start_char, + "end": ent.end_char, + "confidence": float(ent._.get("score", 0.0)) if hasattr(ent._, "score") else 0.0 + }) + + return entities + + except Exception as e: + logger.error(f"Error extracting entities: {e}") + return [] + + def _extract_keywords(self, text: str, max_keywords: int = 20) -> List[str]: + """Extract keywords from text""" + try: + if NLTK_AVAILABLE: + words = word_tokenize(text.lower()) + else: + # Fallback to simple whitespace tokenization + words = text.lower().split() + + words = [word for word in words if word.isalpha() and word not in self.stop_words] + + if self.lemmatizer and NLTK_AVAILABLE: + words = [self.lemmatizer.lemmatize(word) for word in words] + + # Simple frequency-based keyword extraction + word_freq = {} + for word in words: + word_freq[word] = word_freq.get(word, 0) + 1 + + # Sort by frequency and return top keywords + keywords = sorted(word_freq.items(), key=lambda x: x[1], reverse=True) + return [word for word, freq in keywords[:max_keywords] if freq > 1] + + except Exception as e: + logger.error(f"Error extracting keywords: {e}") + return [] + + def _clean_text(self, text: str) -> str: + """Clean and normalize text""" + if not text: + return "" + + # Remove excessive whitespace + text = re.sub(r'\s+', ' ', text) + + # Remove control characters except newlines and tabs + text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text) + + # Normalize quotes + text = re.sub(r'[""''`]', '"', text) + + # Remove excessive punctuation + text = re.sub(r'[.]{3,}', '...', text) + text = re.sub(r'[!]{2,}', '!', text) + text = re.sub(r'[?]{2,}', '?', text) + + return text.strip() + + def _validate_content(self, content: str, file_type: str) -> ContentValidationResult: + """Validate and score content for security and quality""" + issues = [] + security_score = 100.0 + + # Check for potentially malicious content + if ' 1000000: # 1MB limit + issues.append("Content exceeds maximum size limit") + security_score -= 10 + + # Detect language + language, lang_confidence = self._detect_language(content) + + return ContentValidationResult( + 
is_valid=len(issues) == 0, + issues=issues, + security_score=max(0, security_score), + content_type=file_type, + language_confidence=lang_confidence + ) + + async def cleanup(self): + """Cleanup RAG resources""" + if self.qdrant_client: + self.qdrant_client.close() + self.qdrant_client = None + + if self.embedding_service: + await self.embedding_service.cleanup() + self.embedding_service = None + + # Cleanup content processing resources + self.nlp_model = None + self.lemmatizer = None + self.markitdown = None + self.stop_words.clear() + + self.enabled = False + self.search_cache.clear() + log_module_event("rag", "cleanup", {"success": True}) + + async def _initialize_embedding_model(self): + """Initialize embedding model""" + # Prefer enhanced embedding service (rate limiting + retry) + from app.services.enhanced_embedding_service import enhanced_embedding_service as embedding_service + + # Use intfloat/multilingual-e5-large-instruct for LLM service integration + model_name = self.config.get("embedding_model", "intfloat/multilingual-e5-large-instruct") + embedding_service.model_name = model_name + + # Initialize the embedding service + success = await embedding_service.initialize() + + if success: + self.embedding_service = embedding_service + logger.info(f"Successfully initialized embedding service with {model_name}") + return { + "model_name": model_name, + "dimension": embedding_service.dimension or 768 + } + else: + # Fallback to mock implementation + logger.warning("Failed to initialize embedding model, using fallback") + self.embedding_service = None + return { + "model_name": model_name, + "dimension": 1024 # Default dimension for intfloat/multilingual-e5-large-instruct + } + + async def _initialize_content_processing(self): + """Initialize content processing components""" + try: + # Download required NLTK data + await self._download_nltk_data() + + # Initialize NLP components + if NLTK_AVAILABLE: + self.lemmatizer = WordNetLemmatizer() + self.stop_words = set(stopwords.words('english')) + else: + self.lemmatizer = None + self.stop_words = set() + + # Initialize spaCy model + await self._initialize_spacy_model() + + # Initialize MarkItDown + if MARKITDOWN_AVAILABLE: + self.markitdown = MarkItDown() + else: + self.markitdown = None + + except Exception as e: + logger.warning(f"Failed to initialize some content processing components: {e}") + + async def _download_nltk_data(self): + """Download required NLTK data""" + if not NLTK_AVAILABLE: + return + + try: + nltk.download('punkt', quiet=True) + nltk.download('stopwords', quiet=True) + nltk.download('wordnet', quiet=True) + nltk.download('averaged_perceptron_tagger', quiet=True) + nltk.download('omw-1.4', quiet=True) + except Exception as e: + logger.warning(f"Failed to download NLTK data: {e}") + + async def _initialize_spacy_model(self): + """Initialize spaCy model for NLP tasks""" + if not SPACY_AVAILABLE: + self.nlp_model = None + return + + try: + self.nlp_model = spacy.load("en_core_web_sm") + except OSError: + logger.warning("spaCy model 'en_core_web_sm' not found. 
NLP features will be limited.") + self.nlp_model = None + + async def _get_collections_safely(self) -> List[str]: + """Get list of collections using raw HTTP to avoid Pydantic validation issues""" + try: + import httpx + qdrant_host = getattr(settings, 'QDRANT_HOST', 'localhost') + qdrant_port = getattr(settings, 'QDRANT_PORT', 6333) + qdrant_url = f"http://{qdrant_host}:{qdrant_port}" + + async with httpx.AsyncClient() as client: + response = await client.get(f"{qdrant_url}/collections") + if response.status_code == 200: + data = response.json() + result = data.get("result", {}) + collections = result.get("collections", []) + return [col.get("name", "") for col in collections if col.get("name")] + else: + logger.warning(f"Failed to get collections via HTTP: {response.status_code}") + return [] + except Exception as e: + logger.error(f"Error getting collections safely: {e}") + # Fallback to direct client call with error handling + try: + collections = self.qdrant_client.get_collections() + return [col.name for col in collections.collections] + except Exception as fallback_error: + logger.error(f"Fallback collection fetch also failed: {fallback_error}") + return [] + + async def _get_collection_info_safely(self, collection_name: str) -> Dict[str, Any]: + """Get collection information using raw HTTP to avoid Pydantic validation issues""" + try: + import httpx + qdrant_host = getattr(settings, 'QDRANT_HOST', 'localhost') + qdrant_port = getattr(settings, 'QDRANT_PORT', 6333) + qdrant_url = f"http://{qdrant_host}:{qdrant_port}" + + async with httpx.AsyncClient() as client: + response = await client.get(f"{qdrant_url}/collections/{collection_name}") + if response.status_code == 200: + data = response.json() + result = data.get("result", {}) + + # Extract relevant information safely + collection_info = { + "points_count": result.get("points_count", 0), + "status": result.get("status", "unknown"), + "vector_size": 384 # Default fallback + } + + # Try to get vector dimension from config + try: + config = result.get("config", {}) + params = config.get("params", {}) + vectors = params.get("vectors", {}) + + if isinstance(vectors, dict) and "size" in vectors: + collection_info["vector_size"] = vectors["size"] + elif isinstance(vectors, dict): + # Handle named vectors or default vector + if 'default' in vectors: + collection_info["vector_size"] = vectors['default'].get('size', 384) + else: + # Take first vector config if no default + first_vector = next(iter(vectors.values()), {}) + collection_info["vector_size"] = first_vector.get('size', 384) + except Exception: + # Keep default fallback + pass + + return collection_info + else: + logger.warning(f"Failed to get collection info via HTTP: {response.status_code}") + return {"points_count": 0, "status": "error", "vector_size": 384} + except Exception as e: + logger.error(f"Error getting collection info safely: {e}") + return {"points_count": 0, "status": "error", "vector_size": 384} + + async def _ensure_collection_exists(self, collection_name: str = None): + """Ensure the specified collection exists""" + collection_name = collection_name or self.default_collection_name + + try: + # Use safe collection fetching to avoid Pydantic validation errors + collection_names = await self._get_collections_safely() + + if collection_name not in collection_names: + # Create collection + self.qdrant_client.create_collection( + collection_name=collection_name, + vectors_config=VectorParams( + size=self.embedding_model.get("dimension", 768), + distance=Distance.COSINE 
+ ) + ) + log_module_event("rag", "collection_created", {"collection": collection_name}) + + except Exception as e: + logger.error(f"Error ensuring collection exists: {e}") + raise + + async def create_collection(self, collection_name: str) -> bool: + """Create a new Qdrant collection""" + try: + await self._ensure_collection_exists(collection_name) + return True + except Exception as e: + logger.error(f"Error creating collection {collection_name}: {e}") + return False + + async def delete_collection(self, collection_name: str) -> bool: + """Delete a Qdrant collection""" + try: + # Use safe collection fetching to avoid Pydantic validation errors + collection_names = await self._get_collections_safely() + + if collection_name in collection_names: + self.qdrant_client.delete_collection(collection_name) + log_module_event("rag", "collection_deleted", {"collection": collection_name}) + return True + else: + logger.warning(f"Collection {collection_name} does not exist") + return False + + except Exception as e: + logger.error(f"Error deleting collection {collection_name}: {e}") + return False + + async def _generate_embedding(self, text: str) -> List[float]: + """Generate embedding for text""" + if self.embedding_service: + # Use real embedding service + return await self.embedding_service.get_embedding(text) + else: + # Fallback to deterministic random embedding for consistency + np.random.seed(hash(text) % 2**32) + return np.random.random(self.embedding_model.get("dimension", 768)).tolist() + + async def _generate_embeddings(self, texts: List[str], is_document: bool = True) -> List[List[float]]: + """Generate embeddings for multiple texts (batch processing)""" + if self.embedding_service: + # Add task-specific prefixes for better E5 model performance + if is_document: + # For document passages, use "passage:" prefix + prefixed_texts = [f"passage: {text}" for text in texts] + else: + # For queries, use "query:" prefix (handled in search method) + prefixed_texts = texts + + # Use real embedding service for batch processing + return await self.embedding_service.get_embeddings(prefixed_texts) + else: + # Fallback to individual processing + embeddings = [] + for text in texts: + embedding = await self._generate_embedding(text) + embeddings.append(embedding) + return embeddings + + def _chunk_text(self, text: str, chunk_size: int = None) -> List[str]: + """Split text into overlapping chunks for better context preservation""" + chunk_size = chunk_size or self.config.get("chunk_size", 300) + chunk_overlap = self.config.get("chunk_overlap", 50) + + # Tokenize text + tokens = self.tokenizer.encode(text) + + # Split into chunks with overlap + chunks = [] + start_idx = 0 + + while start_idx < len(tokens): + end_idx = min(start_idx + chunk_size, len(tokens)) + chunk_tokens = tokens[start_idx:end_idx] + chunk_text = self.tokenizer.decode(chunk_tokens) + + # Only add non-empty chunks + if chunk_text.strip(): + chunks.append(chunk_text) + + # Move to next chunk with overlap + start_idx = end_idx - chunk_overlap + + # Ensure progress (in case overlap >= chunk_size) + if start_idx >= end_idx: + start_idx = end_idx + + return chunks + + async def _process_text(self, content: bytes, filename: str) -> str: + """Process plain text files""" + try: + # Try different encodings + for encoding in ['utf-8', 'latin-1', 'cp1252']: + try: + return content.decode(encoding) + except UnicodeDecodeError: + continue + + # Fallback to utf-8 with error handling + return content.decode('utf-8', errors='replace') + + except 
Exception as e: + logger.error(f"Error processing text file: {e}") + return "" + + async def _process_with_markitdown(self, content: bytes, filename: str) -> str: + """Process documents using MarkItDown (PDF, DOCX, DOC, XLSX, XLS)""" + try: + if not self.markitdown: + raise RuntimeError("MarkItDown not initialized") + + # Create a temporary file path for the content + import tempfile + import os + + # Get file extension from filename + file_ext = Path(filename).suffix.lower() + if not file_ext: + # Try to determine extension from mime type + mime_type = self._detect_mime_type(filename, content) + if mime_type == 'application/pdf': + file_ext = '.pdf' + elif mime_type in ['application/vnd.openxmlformats-officedocument.wordprocessingml.document']: + file_ext = '.docx' + elif mime_type == 'application/msword': + file_ext = '.doc' + elif mime_type == 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': + file_ext = '.xlsx' + elif mime_type == 'application/vnd.ms-excel': + file_ext = '.xls' + else: + file_ext = '.bin' + + # Write content to temporary file + with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as temp_file: + temp_file.write(content) + temp_path = temp_file.name + + try: + # Convert document to markdown using MarkItDown in a thread pool to avoid blocking + import concurrent.futures + import asyncio + + logger.info(f"Starting MarkItDown conversion for {filename}") + + def convert_sync(): + """Synchronous conversion function to run in thread pool""" + return self.markitdown.convert(temp_path) + + # Run the synchronous conversion in a thread pool with timeout + loop = asyncio.get_event_loop() + with concurrent.futures.ThreadPoolExecutor() as executor: + try: + result = await asyncio.wait_for( + loop.run_in_executor(executor, convert_sync), + timeout=120.0 # 2 minute timeout for MarkItDown conversion + ) + except asyncio.TimeoutError: + logger.error(f"MarkItDown conversion timed out for {filename}") + raise RuntimeError(f"Document conversion timed out after 2 minutes for {filename}") + + if result and hasattr(result, 'text_content'): + converted_text = result.text_content + elif result and isinstance(result, str): + converted_text = result + else: + # Fallback if result format is unexpected + converted_text = str(result) if result else "" + + logger.info(f"Successfully converted {filename} using MarkItDown ({len(converted_text)} characters)") + return converted_text + + finally: + # Clean up temporary file + try: + os.unlink(temp_path) + except OSError: + pass + + except Exception as e: + logger.error(f"Error processing {filename} with MarkItDown: {e}") + # Fallback to basic text extraction attempt + try: + return content.decode('utf-8', errors='replace') + except: + return f"Error processing {filename}: {str(e)}" + + async def _process_docx(self, content: bytes, filename: str) -> str: + """Process DOCX files using python-docx (more reliable than MarkItDown)""" + try: + if not PYTHON_DOCX_AVAILABLE: + logger.warning(f"python-docx not available, falling back to MarkItDown for {filename}") + return await self._process_with_markitdown(content, filename) + + # Create a temporary file for python-docx processing + import tempfile + import os + + logger.info(f"Starting DOCX processing for {filename} using python-docx") + + with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as temp_file: + temp_file.write(content) + temp_path = temp_file.name + + try: + # Process in a thread pool to avoid blocking + import concurrent.futures + import asyncio + + def 
extract_docx_text(): + """Extract text from DOCX file synchronously""" + doc = DocxDocument(temp_path) + text_parts = [] + + # Extract paragraphs + for paragraph in doc.paragraphs: + if paragraph.text.strip(): + text_parts.append(paragraph.text.strip()) + + # Extract text from tables + for table in doc.tables: + for row in table.rows: + row_text = [] + for cell in row.cells: + if cell.text.strip(): + row_text.append(cell.text.strip()) + if row_text: + text_parts.append(" | ".join(row_text)) + + return "\n\n".join(text_parts) + + # Run extraction in thread pool with timeout + loop = asyncio.get_event_loop() + with concurrent.futures.ThreadPoolExecutor() as executor: + try: + extracted_text = await asyncio.wait_for( + loop.run_in_executor(executor, extract_docx_text), + timeout=30.0 # 30 second timeout for DOCX processing + ) + except asyncio.TimeoutError: + logger.error(f"DOCX processing timed out for {filename}") + raise RuntimeError(f"DOCX processing timed out after 30 seconds for {filename}") + + logger.info(f"Successfully processed {filename} using python-docx ({len(extracted_text)} characters)") + return extracted_text + + finally: + # Clean up temporary file + try: + os.unlink(temp_path) + except OSError: + pass + + except Exception as e: + logger.error(f"Error processing DOCX file {filename}: {e}") + # Fallback to MarkItDown if python-docx fails + try: + logger.info(f"Falling back to MarkItDown for {filename}") + return await self._process_with_markitdown(content, filename) + except Exception as fallback_error: + logger.error(f"Both python-docx and MarkItDown failed for {filename}: {fallback_error}") + return f"Error processing DOCX {filename}: {str(e)}" + + async def _process_html(self, content: bytes, filename: str) -> str: + """Process HTML files""" + try: + html_content = content.decode('utf-8', errors='replace') + # Simple HTML tag removal + text = re.sub(r'<[^>]+>', '', html_content) + # Decode HTML entities + text = text.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"').replace(''', "'") + return text + + except Exception as e: + logger.error(f"Error processing HTML file: {e}") + return "" + + async def _process_json(self, content: bytes, filename: str) -> str: + """Process JSON files""" + try: + json_data = json.loads(content.decode('utf-8')) + # Convert JSON to readable text + return json.dumps(json_data, indent=2) + + except Exception as e: + logger.error(f"Error processing JSON file: {e}") + return "" + + async def _process_markdown(self, content: bytes, filename: str) -> str: + """Process Markdown files""" + try: + md_content = content.decode('utf-8', errors='replace') + # Simple markdown processing - remove formatting + text = re.sub(r'#+\s*', '', md_content) # Remove headers + text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) # Bold + text = re.sub(r'\*(.+?)\*', r'\1', text) # Italic + text = re.sub(r'`(.+?)`', r'\1', text) # Code + text = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', text) # Links + return text + + except Exception as e: + logger.error(f"Error processing Markdown file: {e}") + return "" + + async def _process_csv(self, content: bytes, filename: str) -> str: + """Process CSV files""" + try: + csv_content = content.decode('utf-8', errors='replace') + # Convert CSV to readable text + lines = csv_content.split('\n') + processed_lines = [] + + for line in lines[:100]: # Limit to first 100 lines + if line.strip(): + processed_lines.append(line.replace(',', ' | ')) + + return '\n'.join(processed_lines) + + except Exception as e: + logger.error(f"Error 
processing CSV file: {e}") + return "" + + async def _process_jsonl(self, content: bytes, filename: str) -> str: + """Process JSONL files (newline-delimited JSON) + + Specifically optimized for helpjuice-export.jsonl format: + - Each line contains a JSON object with 'id' and 'payload' + - Payload contains 'question', 'language', and 'answer' fields + - Combines question and answer into searchable content + + Performance optimizations: + - Processes articles in smaller batches to reduce memory usage + - Uses streaming approach for large files + """ + try: + # Use streaming approach for large files + jsonl_content = content.decode('utf-8', errors='replace') + lines = jsonl_content.strip().split('\n') + + processed_articles = [] + batch_size = 50 # Process in batches of 50 articles + + for line_num, line in enumerate(lines, 1): + if not line.strip(): + continue + + try: + # Parse each JSON line + data = json.loads(line) + + # Handle helpjuice export format + if 'payload' in data: + payload = data['payload'] + article_id = data.get('id', f'article_{line_num}') + + # Extract fields + question = payload.get('question', '') + answer = payload.get('answer', '') + language = payload.get('language', 'EN') + + # Combine question and answer for better search + if question or answer: + # Format as Q&A for better context + article_text = f"## {question}\n\n{answer}\n\n" + + # Add language tag if not English + if language != 'EN': + article_text = f"[{language}] {article_text}" + + # Add metadata separator + article_text += f"---\nArticle ID: {article_id}\nLanguage: {language}\n\n" + + processed_articles.append(article_text) + + # Handle generic JSONL format + else: + # Convert the entire JSON object to readable text + json_text = json.dumps(data, indent=2, ensure_ascii=False) + processed_articles.append(json_text + "\n\n") + + except json.JSONDecodeError as e: + logger.warning(f"Error parsing JSONL line {line_num}: {e}") + continue + except Exception as e: + logger.warning(f"Error processing JSONL line {line_num}: {e}") + continue + + # Combine all articles + combined_text = '\n'.join(processed_articles) + + logger.info(f"Successfully processed {len(processed_articles)} articles from JSONL file {filename}") + return combined_text + + except Exception as e: + logger.error(f"Error processing JSONL file {filename}: {e}") + return "" + + def _generate_document_id(self, content: str, metadata: Dict[str, Any]) -> str: + """Generate unique document ID""" + content_hash = hashlib.sha256(content.encode()).hexdigest()[:16] + metadata_hash = hashlib.sha256(json.dumps(metadata, sort_keys=True).encode()).hexdigest()[:8] + return f"{content_hash}_{metadata_hash}" + + async def process_document(self, file_data: bytes, filename: str, metadata: Dict[str, Any] = None) -> ProcessedDocument: + """Process a document and extract content""" + if not self.enabled: + raise RuntimeError("RAG module not initialized") + + import time + start_time = time.time() + + try: + logger.info(f"Starting document processing pipeline for {filename}") + + # Generate file hash and ID + file_hash = self._generate_file_hash(file_data) + doc_id = f"{file_hash}_{int(time.time())}" + logger.info(f"Generated document ID: {doc_id}") + + # Detect MIME type + mime_type = self._detect_mime_type(filename, file_data) + file_type = mime_type.split('/')[0] + logger.info(f"Detected MIME type: {mime_type}, file type: {file_type}") + + # Check if file type is supported + if mime_type not in self.supported_types: + raise ValueError(f"Unsupported file type: 
{mime_type}") + + # Extract content using appropriate processor + processor = self.supported_types[mime_type] + logger.info(f"Using processor: {processor.__name__} for {filename}") + extracted_text = await processor(file_data, filename) + logger.info(f"Content extraction completed for {filename}, extracted {len(extracted_text)} characters") + + # Clean the extracted text + logger.info(f"Starting text cleaning for {filename}") + cleaned_text = self._clean_text(extracted_text) + logger.info(f"Text cleaning completed for {filename}, final text length: {len(cleaned_text)}") + + # Validate content + logger.info(f"Starting content validation for {filename}") + validation_result = self._validate_content(cleaned_text, file_type) + logger.info(f"Content validation completed for {filename}") + + if not validation_result.is_valid: + logger.warning(f"Content validation issues: {validation_result.issues}") + + # Extract linguistic features + logger.info(f"Starting linguistic analysis for {filename}") + if NLTK_AVAILABLE and cleaned_text: + logger.info(f"Using NLTK for tokenization of {filename}") + sentences = sent_tokenize(cleaned_text) + words = word_tokenize(cleaned_text) + elif cleaned_text: + logger.info(f"Using fallback tokenization for {filename}") + # Fallback to simple tokenization + sentences = cleaned_text.split('.') + words = cleaned_text.split() + else: + logger.warning(f"No text content for linguistic analysis in {filename}") + sentences = [] + words = [] + + logger.info(f"Tokenization completed for {filename}: {len(sentences)} sentences, {len(words)} words") + + # Detect language + logger.info(f"Starting language detection for {filename}") + language, lang_confidence = self._detect_language(cleaned_text) + logger.info(f"Language detection completed for {filename}: {language} (confidence: {lang_confidence:.2f})") + + # Extract entities and keywords + logger.info(f"Starting entity extraction for {filename}") + entities = self._extract_entities(cleaned_text) + logger.info(f"Entity extraction completed for {filename}: found {len(entities)} entities") + + logger.info(f"Starting keyword extraction for {filename}") + keywords = self._extract_keywords(cleaned_text) + logger.info(f"Keyword extraction completed for {filename}: found {len(keywords)} keywords") + + # Calculate processing time + processing_time = time.time() - start_time + + # Create processed document + logger.info(f"Creating ProcessedDocument object for {filename}") + processed_doc = ProcessedDocument( + id=doc_id, + original_filename=filename, + file_type=file_type, + mime_type=mime_type, + content=cleaned_text, + extracted_text=extracted_text, + metadata={ + **(metadata or {}), + "validation": asdict(validation_result), + "file_size": len(file_data), + "processing_stats": { + "processing_time": processing_time, + "processor_used": processor.__name__ + } + }, + word_count=len(words), + sentence_count=len(sentences), + language=language, + entities=entities, + keywords=keywords, + processing_time=processing_time, + processed_at=datetime.utcnow(), + file_hash=file_hash, + file_size=len(file_data) + ) + logger.info(f"ProcessedDocument created for {filename}") + + # Update stats + self.stats["documents_processed"] += 1 + self.stats["total_processing_time"] += processing_time + self.stats["average_processing_time"] = ( + self.stats["total_processing_time"] / self.stats["documents_processed"] + ) + + log_module_event("rag", "document_processed", { + "document_id": doc_id, + "filename": filename, + "file_type": file_type, + "word_count": 
len(words), + "processing_time": processing_time, + "language": language, + "entities_count": len(entities), + "keywords_count": len(keywords) + }) + + logger.info(f"Document processing completed successfully for {filename} in {processing_time:.2f} seconds") + return processed_doc + + except Exception as e: + self.stats["errors"] += 1 + logger.error(f"Error processing document {filename}: {e}") + log_module_event("rag", "processing_failed", { + "filename": filename, + "error": str(e) + }) + raise + + async def index_document(self, content: str, metadata: Dict[str, Any] = None, collection_name: str = None) -> str: + """Index a document in the vector database (backward compatibility method)""" + if not self.enabled: + raise RuntimeError("RAG module not initialized") + + collection_name = collection_name or self.default_collection_name + metadata = metadata or {} + + try: + # Ensure collection exists + await self._ensure_collection_exists(collection_name) + + # Generate document ID + doc_id = self._generate_document_id(content, metadata) + + # Check if document already exists + if await self._document_exists(doc_id, collection_name): + log_module_event("rag", "document_exists", {"document_id": doc_id, "collection": collection_name}) + return doc_id + + # Chunk the document + chunks = self._chunk_text(content) + + # Generate embeddings for all chunks in batch (more efficient) + embeddings = await self._generate_embeddings(chunks, is_document=True) + + # Create document points + points = [] + for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)): + chunk_id = str(uuid.uuid4()) + + chunk_metadata = { + **metadata, + "document_id": doc_id, + "chunk_index": i, + "chunk_count": len(chunks), + "content": chunk, + "indexed_at": datetime.utcnow().isoformat() + } + + points.append(PointStruct( + id=chunk_id, + vector=embedding, + payload=chunk_metadata + )) + + # Insert points into Qdrant + self.qdrant_client.upsert( + collection_name=collection_name, + points=points + ) + + self.stats["documents_indexed"] += 1 + log_module_event("rag", "document_indexed", { + "document_id": doc_id, + "collection": collection_name, + "chunks": len(chunks), + "metadata": metadata + }) + + return doc_id + + except Exception as e: + logger.error(f"Error indexing document: {e}") + log_module_event("rag", "indexing_failed", {"error": str(e)}) + raise + + async def index_processed_document(self, processed_doc: ProcessedDocument, collection_name: str = None) -> str: + """Index a processed document in the vector database""" + if not self.enabled: + raise RuntimeError("RAG module not initialized") + + collection_name = collection_name or self.default_collection_name + + try: + # Special handling for JSONL files + if processed_doc.file_type == 'jsonl': + # Import the optimized JSONL processor + from app.services.jsonl_processor import JSONLProcessor + jsonl_processor = JSONLProcessor(self) + + # Read the original file content + with open(processed_doc.metadata.get('file_path', ''), 'rb') as f: + file_content = f.read() + + # Process using the optimized JSONL processor + return await jsonl_processor.process_and_index_jsonl( + collection_name=collection_name, + content=file_content, + filename=processed_doc.original_filename, + metadata=processed_doc.metadata + ) + + # Ensure collection exists + await self._ensure_collection_exists(collection_name) + + # Check if document already exists + if await self._document_exists(processed_doc.id, collection_name): + log_module_event("rag", "document_exists", {"document_id": 
processed_doc.id, "collection": collection_name}) + return processed_doc.id + + # Chunk the document + chunks = self._chunk_text(processed_doc.content) + + # Generate embeddings for all chunks in batch (more efficient) + embeddings = await self._generate_embeddings(chunks, is_document=True) + + # Create document points with enhanced metadata + points = [] + for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)): + chunk_id = str(uuid.uuid4()) + + chunk_metadata = { + **processed_doc.metadata, + "document_id": processed_doc.id, + "original_filename": processed_doc.original_filename, + "file_type": processed_doc.file_type, + "mime_type": processed_doc.mime_type, + "language": processed_doc.language, + "entities": processed_doc.entities, + "keywords": processed_doc.keywords, + "word_count": processed_doc.word_count, + "sentence_count": processed_doc.sentence_count, + "file_hash": processed_doc.file_hash, + "processed_at": processed_doc.processed_at.isoformat(), + "chunk_index": i, + "chunk_count": len(chunks), + "content": chunk, + "indexed_at": datetime.utcnow().isoformat() + } + + points.append(PointStruct( + id=chunk_id, + vector=embedding, + payload=chunk_metadata + )) + + # Insert points into Qdrant + self.qdrant_client.upsert( + collection_name=collection_name, + points=points + ) + + self.stats["documents_indexed"] += 1 + log_module_event("rag", "processed_document_indexed", { + "document_id": processed_doc.id, + "filename": processed_doc.original_filename, + "collection": collection_name, + "chunks": len(chunks), + "file_type": processed_doc.file_type, + "language": processed_doc.language + }) + + return processed_doc.id + + except Exception as e: + logger.error(f"Error indexing processed document: {e}") + log_module_event("rag", "indexing_failed", {"error": str(e)}) + raise + + async def _document_exists(self, document_id: str, collection_name: str = None) -> bool: + """Check if document exists in the collection""" + collection_name = collection_name or self.default_collection_name + + try: + result = self.qdrant_client.search( + collection_name=collection_name, + query_filter=Filter( + must=[FieldCondition(key="document_id", match=MatchValue(value=document_id))] + ), + limit=1 + ) + return len(result) > 0 + except Exception: + return False + + async def _hybrid_search(self, collection_name: str, query: str, query_vector: List[float], + query_filter: Optional[Filter], limit: int, score_threshold: float) -> List[Any]: + """Perform hybrid search combining vector similarity and BM25 scoring""" + + # Preprocess query for BM25 + query_terms = self._preprocess_text_for_bm25(query) + + # Get all documents from the collection (for BM25 scoring) + # Note: In production, you'd want to optimize this with a proper BM25 index + scroll_filter = query_filter or Filter() + all_points = [] + + # Use scroll to get all points + offset = None + batch_size = 100 + while True: + search_result = self.qdrant_client.scroll( + collection_name=collection_name, + scroll_filter=scroll_filter, + limit=batch_size, + offset=offset, + with_payload=True, + with_vectors=False + ) + + points = search_result[0] + all_points.extend(points) + + if len(points) < batch_size: + break + + offset = points[-1].id + + # Calculate BM25 scores for each document + bm25_scores = {} + for point in all_points: + doc_id = point.payload.get("document_id", "") + content = point.payload.get("content", "") + + # Calculate BM25 score + bm25_score = self._calculate_bm25_score(query_terms, content) + bm25_scores[doc_id] = bm25_score 
+ + # Perform vector search + vector_results = self.qdrant_client.search( + collection_name=collection_name, + query_vector=query_vector, + query_filter=query_filter, + limit=limit * 2, # Get more results for re-ranking + score_threshold=score_threshold / 2 # Lower threshold for initial search + ) + + # Combine scores with improved normalization + hybrid_weights = self.config.get("hybrid_weights", {"vector": 0.7, "bm25": 0.3}) + vector_weight = hybrid_weights.get("vector", 0.7) + bm25_weight = hybrid_weights.get("bm25", 0.3) + + # Get score distributions for better normalization + vector_scores = [r.score for r in vector_results] + bm25_scores_list = list(bm25_scores.values()) + + # Calculate statistics for normalization + if vector_scores: + v_max = max(vector_scores) + v_min = min(vector_scores) + v_range = v_max - v_min if v_max != v_min else 1 + else: + v_max, v_min, v_range = 1, 0, 1 + + if bm25_scores_list: + bm25_max = max(bm25_scores_list) + bm25_min = min(bm25_scores_list) + bm25_range = bm25_max - bm25_min if bm25_max != bm25_min else 1 + else: + bm25_max, bm25_min, bm25_range = 1, 0, 1 + + # Create hybrid results with improved scoring + hybrid_results = [] + for result in vector_results: + doc_id = result.payload.get("document_id", "") + vector_score = result.score + bm25_score = bm25_scores.get(doc_id, 0.0) + + # Improved normalization using actual score distributions + vector_norm = (vector_score - v_min) / v_range if v_range > 0 else 0.5 + bm25_norm = (bm25_score - bm25_min) / bm25_range if bm25_range > 0 else 0.5 + + # Apply reciprocal rank fusion for better combination + # This gives more weight to documents that rank highly in both methods + rrf_vector = 1.0 / (1.0 + vector_results.index(result) + 1) # +1 to avoid division by zero + rrf_bm25 = 1.0 / (1.0 + sorted(bm25_scores_list, reverse=True).index(bm25_score) + 1) if bm25_score in bm25_scores_list else 0 + + # Calculate hybrid score using both normalized scores and RRF + hybrid_score = (vector_weight * vector_norm + bm25_weight * bm25_norm) * 0.7 + (rrf_vector + rrf_bm25) * 0.3 + + # Create new point with hybrid score + hybrid_point = ScoredPoint( + id=result.id, + payload=result.payload, + score=hybrid_score, + vector=result.vector, + shard_key=None, + order_value=None + ) + hybrid_results.append(hybrid_point) + + # Sort by hybrid score and apply final threshold + hybrid_results.sort(key=lambda x: x.score, reverse=True) + final_results = [r for r in hybrid_results if r.score >= score_threshold][:limit] + + logger.info(f"Hybrid search: {len(vector_results)} vector results, {len(final_results)} final results") + return final_results + + def _preprocess_text_for_bm25(self, text: str) -> List[str]: + """Preprocess text for BM25 scoring""" + if not NLTK_AVAILABLE: + return text.lower().split() + + try: + # Tokenize + tokens = word_tokenize(text.lower()) + + # Remove stopwords and non-alphabetic tokens + stop_words = set(stopwords.words('english')) + filtered_tokens = [ + token for token in tokens + if token.isalpha() and token not in stop_words and len(token) > 2 + ] + + return filtered_tokens + except: + # Fallback to simple splitting + return text.lower().split() + + def _calculate_bm25_score(self, query_terms: List[str], document: str) -> float: + """Calculate BM25 score for a document against query terms""" + if not query_terms: + return 0.0 + + # Preprocess document + doc_terms = self._preprocess_text_for_bm25(document) + if not doc_terms: + return 0.0 + + # Calculate term frequencies + doc_len = len(doc_terms) + 
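# ---------------------------------------------------------------------------
# Reviewer note (annotation, not part of this patch): the block below applies
# the standard BM25 term formula
#   score(q, d) = sum over t in q of idf(t) * tf(t, d) * (k1 + 1)
#                 / (tf(t, d) + k1 * (1 - b + b * |d| / avgdl))
# with k1 = 1.2 and b = 0.75, but simplifies two corpus statistics: idf is
# fixed at 2.0 and avgdl at 300 tokens instead of being derived from the
# indexed collection, and the raw score is divided by 10 and clipped to [0, 1].
# ---------------------------------------------------------------------------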
avg_doc_len = 300 # Average document length (configurable) + + # BM25 parameters + k1 = 1.2 # Controls term frequency saturation + b = 0.75 # Controls document length normalization + + score = 0.0 + + # Calculate IDF for each query term + for term in set(query_terms): + # Term frequency in document + tf = doc_terms.count(term) + + # Simple IDF (log(N/n) + 1) + # In production, you'd use the actual document frequency + idf = 2.0 # Simplified IDF + + # BM25 formula + numerator = tf * (k1 + 1) + denominator = tf + k1 * (1 - b + b * (doc_len / avg_doc_len)) + + score += idf * (numerator / denominator) + + # Normalize score to 0-1 range + return min(score / 10.0, 1.0) # Simple normalization + + async def search_documents(self, query: str, max_results: int = None, filters: Dict[str, Any] = None, collection_name: str = None, score_threshold: float = None) -> List[SearchResult]: + """Search for relevant documents""" + if not self.enabled: + raise RuntimeError("RAG module not initialized") + + collection_name = collection_name or self.default_collection_name + max_results = max_results or self.config.get("max_results", 10) + + # Check cache (include collection name in cache key) + cache_key = f"{collection_name}_{query}_{max_results}_{hash(str(filters))}" + if cache_key in self.search_cache: + self.stats["cache_hits"] += 1 + return self.search_cache[cache_key] + + try: + import time + start_time = time.time() + + # Generate query embedding with task-specific prefix for better retrieval + # The E5 model works better with "query:" prefix for search queries + optimized_query = f"query: {query}" + query_embedding = await self._generate_embedding(optimized_query) + + # Build filter + search_filter = None + if filters: + conditions = [] + for key, value in filters.items(): + conditions.append(FieldCondition(key=key, match=MatchValue(value=value))) + search_filter = Filter(must=conditions) + + # Enhanced debugging for search + logger.info("=== ENHANCED RAG SEARCH DEBUGGING ===") + logger.info(f"Collection: {collection_name}") + logger.info(f"Query: '{query}'") + logger.info(f"Max results requested: {max_results}") + logger.info(f"Query embedding (first 10 values): {query_embedding[:10] if query_embedding else 'None'}") + logger.info(f"Embedding service available: {self.embedding_service is not None}") + + # Check if hybrid search is enabled + enable_hybrid = self.config.get("enable_hybrid", False) + # Use provided score_threshold or fall back to config + search_score_threshold = score_threshold if score_threshold is not None else self.config.get("score_threshold", 0.3) + + if enable_hybrid and NLTK_AVAILABLE: + # Perform hybrid search (vector + BM25) + search_results = await self._hybrid_search( + collection_name=collection_name, + query=query, + query_vector=query_embedding, + query_filter=search_filter, + limit=max_results, + score_threshold=search_score_threshold + ) + else: + # Pure vector search with improved threshold + search_results = self.qdrant_client.search( + collection_name=collection_name, + query_vector=query_embedding, + query_filter=search_filter, + limit=max_results, + score_threshold=search_score_threshold + ) + + logger.info(f"Raw search results count: {len(search_results)}") + + # Process results + results = [] + document_scores = {} + + for i, result in enumerate(search_results): + doc_id = result.payload.get("document_id") + content = result.payload.get("content", "") + score = result.score + + # Log each raw result for debugging + logger.info(f"\n--- Raw Result {i+1} ---") + 
logger.info(f"Score: {score}") + logger.info(f"Document ID: {doc_id}") + logger.info(f"Content preview (first 200 chars): {content[:200]}") + logger.info(f"Metadata keys: {list(result.payload.keys())}") + + # Aggregate scores by document + if doc_id in document_scores: + document_scores[doc_id]["score"] = max(document_scores[doc_id]["score"], score) + document_scores[doc_id]["content"] += "\n" + content + else: + document_scores[doc_id] = { + "score": score, + "content": content, + "metadata": {k: v for k, v in result.payload.items() if k not in ["content", "document_id"]} + } + + logger.info(f"\nAggregated documents count: {len(document_scores)}") + logger.info("=== END ENHANCED RAG SEARCH DEBUGGING ===") + + # Create SearchResult objects + for doc_id, data in document_scores.items(): + document = Document( + id=doc_id, + content=data["content"], + metadata=data["metadata"] + ) + + search_result = SearchResult( + document=document, + score=data["score"], + relevance_score=min(data["score"] * 100, 100) + ) + + results.append(search_result) + + # Sort by score + results.sort(key=lambda x: x.score, reverse=True) + + # Update stats + search_time = time.time() - start_time + self.stats["searches_performed"] += 1 + self.stats["average_search_time"] = ( + (self.stats["average_search_time"] * (self.stats["searches_performed"] - 1) + search_time) / + self.stats["searches_performed"] + ) + + # Cache results + self.search_cache[cache_key] = results + + log_module_event("rag", "search_completed", { + "query": query, + "collection": collection_name, + "results_count": len(results), + "search_time": search_time + }) + + return results + + except Exception as e: + logger.error(f"Error searching documents in collection {collection_name}: {e}") + log_module_event("rag", "search_failed", {"error": str(e), "collection": collection_name}) + raise + + async def delete_document(self, document_id: str, collection_name: str = None) -> bool: + """Delete a document from the vector database""" + if not self.enabled: + raise RuntimeError("RAG module not initialized") + + collection_name = collection_name or self.default_collection_name + + try: + # Delete all chunks for this document + self.qdrant_client.delete( + collection_name=collection_name, + points_selector=models.FilterSelector( + filter=Filter( + must=[FieldCondition(key="document_id", match=MatchValue(value=document_id))] + ) + ) + ) + + log_module_event("rag", "document_deleted", {"document_id": document_id, "collection": collection_name}) + return True + + except Exception as e: + logger.error(f"Error deleting document from collection {collection_name}: {e}") + log_module_event("rag", "deletion_failed", {"error": str(e), "collection": collection_name}) + return False + + async def get_stats(self) -> Dict[str, Any]: + """Get RAG module statistics""" + stats = self.stats.copy() + + if self.enabled: + try: + # Use raw HTTP call to avoid Pydantic validation issues + import httpx + + # Direct HTTP call to Qdrant API instead of using client to avoid Pydantic issues + qdrant_url = f"http://{settings.QDRANT_HOST}:{settings.QDRANT_PORT}" + + async with httpx.AsyncClient() as client: + response = await client.get(f"{qdrant_url}/collections/{self.default_collection_name}") + + if response.status_code == 200: + collection_data = response.json() + + # Safely extract stats from raw JSON + result = collection_data.get("result", {}) + + basic_stats = { + "total_points": result.get("points_count", 0), + "collection_status": result.get("status", "unknown"), + } + + # Try 
to get vector dimension from config + try: + config = result.get("config", {}) + params = config.get("params", {}) + vectors = params.get("vectors", {}) + + if isinstance(vectors, dict) and "size" in vectors: + basic_stats["vector_dimension"] = vectors["size"] + else: + basic_stats["vector_dimension"] = "unknown" + except Exception as config_error: + logger.debug(f"Could not get vector dimension: {config_error}") + basic_stats["vector_dimension"] = "unknown" + + stats.update(basic_stats) + else: + # Collection doesn't exist or error + stats.update({ + "total_points": 0, + "collection_status": "not_found", + "vector_dimension": "unknown" + }) + + except Exception as e: + logger.debug(f"Could not get Qdrant stats (using fallback): {e}") + # Add basic fallback stats without logging as error since this is not critical + stats.update({ + "total_points": 0, + "collection_status": "unavailable", + "vector_dimension": "unknown" + }) + else: + stats.update({ + "total_points": 0, + "collection_status": "disabled", + "vector_dimension": "unknown" + }) + + return stats + + async def process_request(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Process a module request through the interceptor pattern""" + if not self.enabled: + raise RuntimeError("RAG module not initialized") + + action = request.get("action", "search") + + if action == "search": + query = request.get("query") + if not query: + raise ValueError("Query is required for search action") + + max_results = request.get("max_results", self.config.get("max_results", 10)) + filters = request.get("filters", {}) + + results = await self.search_documents(query, max_results, filters) + + return { + "action": "search", + "query": query, + "results": [ + { + "document_id": result.document.id, + "content": result.document.content, + "metadata": result.document.metadata, + "score": result.score, + "relevance_score": result.relevance_score + } + for result in results + ], + "total_results": len(results), + "cache_hit": False # Would be determined by search logic + } + + elif action == "index": + content = request.get("content") + if not content: + raise ValueError("Content is required for index action") + + metadata = request.get("metadata", {}) + document_id = await self.index_document(content, metadata) + + return { + "action": "index", + "document_id": document_id, + "status": "success", + "message": "Document indexed successfully" + } + + elif action == "process": + file_data = request.get("file_data") + filename = request.get("filename") + if not file_data or not filename: + raise ValueError("File data and filename are required for process action") + + # Decode base64 file data if provided as string + if isinstance(file_data, str): + import base64 + file_data = base64.b64decode(file_data) + + metadata = request.get("metadata", {}) + processed_doc = await self.process_document(file_data, filename, metadata) + + return { + "action": "process", + "document_id": processed_doc.id, + "filename": processed_doc.original_filename, + "file_type": processed_doc.file_type, + "mime_type": processed_doc.mime_type, + "word_count": processed_doc.word_count, + "sentence_count": processed_doc.sentence_count, + "language": processed_doc.language, + "entities_count": len(processed_doc.entities), + "keywords_count": len(processed_doc.keywords), + "processing_time": processed_doc.processing_time, + "status": "success", + "message": "Document processed successfully" + } + + elif action == "delete": + document_id = 
request.get("document_id") + if not document_id: + raise ValueError("Document ID is required for delete action") + + success = await self.delete_document(document_id) + + return { + "action": "delete", + "document_id": document_id, + "status": "success" if success else "failed", + "message": "Document deleted successfully" if success else "Failed to delete document" + } + + elif action == "stats": + stats = await self.get_stats() + + return { + "action": "stats", + "statistics": stats + } + + else: + raise ValueError(f"Unsupported action: {action}") + + async def pre_request_interceptor(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Pre-request interceptor for RAG enhancement""" + if not self.enabled: + return context + + request = context.get("request") + if not request: + return context + + # Check if this is a request that could benefit from RAG + if request.url.path.startswith("/api/v1/chat") or request.url.path.startswith("/api/v1/completions"): + # Extract query/prompt from request + request_body = await request.body() if hasattr(request, 'body') else b"" + + if request_body: + try: + data = json.loads(request_body.decode()) + query = data.get("message", data.get("prompt", "")) + + if query: + # Search for relevant documents + search_results = await self.search_documents(query, max_results=3) + + if search_results: + # Add context to request + context["rag_context"] = [ + { + "content": result.document.content, + "metadata": result.document.metadata, + "relevance_score": result.relevance_score + } + for result in search_results + ] + + log_module_event("rag", "context_added", { + "query": query[:100], + "results_count": len(search_results) + }) + + except Exception as e: + logger.error(f"Error processing RAG request: {e}") + + return context + +# Global RAG instance +rag_module = RAGModule() + +# Module interface functions +async def initialize(config: Dict[str, Any]): + """Initialize RAG module""" + await rag_module.initialize(config) + +async def cleanup(): + """Cleanup RAG module""" + await rag_module.cleanup() + +async def pre_request_interceptor(context: Dict[str, Any]) -> Dict[str, Any]: + """Pre-request interceptor""" + return await rag_module.pre_request_interceptor(context) + +# Additional exported functions +async def process_document(file_data: bytes, filename: str, metadata: Dict[str, Any] = None) -> ProcessedDocument: + """Process a document with full content analysis""" + return await rag_module.process_document(file_data, filename, metadata) + +async def index_document(content: str, metadata: Dict[str, Any] = None, collection_name: str = None) -> str: + """Index a document (backward compatibility)""" + return await rag_module.index_document(content, metadata, collection_name) + +async def index_processed_document(processed_doc: ProcessedDocument, collection_name: str = None) -> str: + """Index a processed document""" + return await rag_module.index_processed_document(processed_doc, collection_name) + +async def search_documents(query: str, max_results: int = None, filters: Dict[str, Any] = None, collection_name: str = None, score_threshold: float = None) -> List[SearchResult]: + """Search documents""" + return await rag_module.search_documents(query, max_results, filters, collection_name, score_threshold) + +async def delete_document(document_id: str, collection_name: str = None) -> bool: + """Delete a document""" + return await rag_module.delete_document(document_id, collection_name) + +async def create_collection(collection_name: str) -> bool: + """Create a 
new Qdrant collection""" + return await rag_module.create_collection(collection_name) + +async def delete_collection(collection_name: str) -> bool: + """Delete a Qdrant collection""" + return await rag_module.delete_collection(collection_name) + +async def get_supported_types() -> List[str]: + """Get list of supported file types""" + return list(rag_module.supported_types.keys()) diff --git a/backend/app/modules/rag/module.yaml b/backend/app/modules/rag/module.yaml new file mode 100644 index 0000000..cfe8b53 --- /dev/null +++ b/backend/app/modules/rag/module.yaml @@ -0,0 +1,82 @@ +name: rag +version: 1.0.0 +description: "Document search, retrieval, and vector storage" +author: "Enclava Team" +category: "ai" + +# Module lifecycle +enabled: true +auto_start: true +dependencies: [] +optional_dependencies: + - cache + +# Module capabilities +provides: + - "document_storage" + - "semantic_search" + - "vector_embeddings" + - "document_processing" + +consumes: + - "qdrant_connection" + - "llm_embeddings" + - "document_parsing" + +# API endpoints +endpoints: + - path: "/rag/collections" + method: "GET" + description: "List document collections" + + - path: "/rag/upload" + method: "POST" + description: "Upload and process documents" + + - path: "/rag/search" + method: "POST" + description: "Semantic search in documents" + + - path: "/rag/collections/{collection_id}/documents" + method: "GET" + description: "List documents in collection" + +# UI Configuration +ui_config: + icon: "search" + color: "#8B5CF6" + category: "AI & ML" + + forms: + - name: "collection_config" + title: "Collection Settings" + fields: ["name", "description", "embedding_model"] + + - name: "search_config" + title: "Search Configuration" + fields: ["top_k", "similarity_threshold", "rerank_enabled"] + +# Permissions +permissions: + - name: "rag.create" + description: "Create document collections" + + - name: "rag.upload" + description: "Upload documents to collections" + + - name: "rag.search" + description: "Search document collections" + + - name: "rag.manage" + description: "Manage all collections (admin)" + +# Health checks +health_checks: + - name: "qdrant_connectivity" + description: "Check Qdrant vector database connection" + + - name: "embeddings_service" + description: "Check LLM embeddings service" + + - name: "document_processing" + description: "Check document parsing capabilities" \ No newline at end of file diff --git a/backend/app/services/jsonl_processor.py b/backend/app/services/jsonl_processor.py new file mode 100644 index 0000000..001cf08 --- /dev/null +++ b/backend/app/services/jsonl_processor.py @@ -0,0 +1,211 @@ +""" +Optimized JSONL Processor for RAG Module +Handles JSONL files efficiently to prevent resource exhaustion +""" + +import json +import logging +import asyncio +from typing import Dict, Any, List +from datetime import datetime +import uuid + +from qdrant_client.models import PointStruct, Filter, FieldCondition, MatchValue +from qdrant_client.http.models import Batch + +from app.modules.rag.main import ProcessedDocument +# from app.core.analytics import log_module_event # Analytics module not available + +logger = logging.getLogger(__name__) + + +class JSONLProcessor: + """Specialized processor for JSONL files""" + + def __init__(self, rag_module): + self.rag_module = rag_module + self.config = rag_module.config + + async def process_and_index_jsonl(self, collection_name: str, content: bytes, + filename: str, metadata: Dict[str, Any]) -> str: + """Process and index a JSONL file efficiently + + 
Processes each JSON line as a separate document to avoid + creating thousands of chunks from a single large document. + """ + try: + # Decode content + jsonl_content = content.decode('utf-8', errors='replace') + lines = jsonl_content.strip().split('\n') + + logger.info(f"Processing JSONL file {filename} with {len(lines)} lines") + + # Generate base document ID + base_doc_id = self.rag_module._generate_document_id(jsonl_content, metadata) + + # Process lines in batches + batch_size = 10 # Smaller batches for better memory management + processed_count = 0 + + for batch_start in range(0, len(lines), batch_size): + batch_end = min(batch_start + batch_size, len(lines)) + batch_lines = lines[batch_start:batch_end] + + # Process batch + await self._process_jsonl_batch( + collection_name, + batch_lines, + batch_start, + base_doc_id, + filename, + metadata + ) + + processed_count += len(batch_lines) + + # Log progress + if processed_count % 50 == 0: + logger.info(f"Processed {processed_count}/{len(lines)} lines from {filename}") + + # Small delay to prevent resource exhaustion + await asyncio.sleep(0.05) + + logger.info(f"Successfully processed JSONL file {filename} with {len(lines)} lines") + return base_doc_id + + except Exception as e: + logger.error(f"Error processing JSONL file {filename}: {e}") + raise + + async def _process_jsonl_batch(self, collection_name: str, lines: List[str], + start_idx: int, base_doc_id: str, + filename: str, metadata: Dict[str, Any]) -> None: + """Process a batch of JSONL lines""" + try: + points = [] + + for line_idx, line in enumerate(lines, start=start_idx + 1): + if not line.strip(): + continue + + try: + # Parse JSON line + data = json.loads(line) + + # Debug: check if data is None + if data is None: + logger.warning(f"JSON line {line_idx} parsed as None") + continue + + # Handle helpjuice export format + if 'payload' in data and data['payload'] is not None: + payload = data['payload'] + article_id = data.get('id', f'article_{line_idx}') + + # Extract Q&A + question = payload.get('question', '') + answer = payload.get('answer', '') + language = payload.get('language', 'EN') + + if question or answer: + # Create Q&A content + content = f"Question: {question}\n\nAnswer: {answer}" + + # Create metadata + doc_metadata = { + **metadata, + "article_id": article_id, + "language": language, + "filename": filename, + "line_number": line_idx, + "content_type": "qa_pair", + "question": question[:100], # Truncate for metadata + "processed_at": datetime.utcnow().isoformat() + } + + # Generate single embedding for the Q&A pair + embeddings = await self.rag_module._generate_embeddings([content]) + + # Create point + point_id = str(uuid.uuid4()) + points.append(PointStruct( + id=point_id, + vector=embeddings[0], + payload={ + **doc_metadata, + "document_id": f"{base_doc_id}_{article_id}", + "content": content, + "chunk_index": 0, + "chunk_count": 1 + } + )) + + # Handle generic JSON format + else: + content = json.dumps(data, indent=2, ensure_ascii=False) + + # For larger JSON objects, we might need to chunk + if len(content) > 1000: + chunks = self.rag_module._chunk_text(content, chunk_size=500) + embeddings = await self.rag_module._generate_embeddings(chunks) + + for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)): + point_id = str(uuid.uuid4()) + points.append(PointStruct( + id=point_id, + vector=embedding, + payload={ + **metadata, + "filename": filename, + "line_number": line_idx, + "content_type": "json_object", + "document_id": 
f"{base_doc_id}_line_{line_idx}", + "content": chunk, + "chunk_index": i, + "chunk_count": len(chunks) + } + )) + else: + # Small JSON - no chunking needed + embeddings = await self.rag_module._generate_embeddings([content]) + point_id = str(uuid.uuid4()) + points.append(PointStruct( + id=point_id, + vector=embeddings[0], + payload={ + **metadata, + "filename": filename, + "line_number": line_idx, + "content_type": "json_object", + "document_id": f"{base_doc_id}_line_{line_idx}", + "content": content, + "chunk_index": 0, + "chunk_count": 1 + } + )) + + except json.JSONDecodeError as e: + logger.warning(f"Error parsing JSONL line {line_idx}: {e}") + continue + except Exception as e: + logger.warning(f"Error processing JSONL line {line_idx}: {e}") + continue + + # Insert all points in this batch + if points: + self.rag_module.qdrant_client.upsert( + collection_name=collection_name, + points=points + ) + + # Update stats + self.rag_module.stats["documents_indexed"] += len(points) + # log_module_event("rag", "jsonl_batch_processed", { # Analytics module not available + # "filename": filename, + # "lines_processed": len(lines), + # "points_created": len(points) + # }) + + except Exception as e: + logger.error(f"Error processing JSONL batch: {e}") + raise \ No newline at end of file diff --git a/backend/app/services/qdrant_stats_service.py b/backend/app/services/qdrant_stats_service.py new file mode 100644 index 0000000..b89d446 --- /dev/null +++ b/backend/app/services/qdrant_stats_service.py @@ -0,0 +1,163 @@ +""" +Qdrant Stats Service +Provides direct, live statistics from Qdrant vector database +This is the single source of truth for all RAG collection statistics +""" + +import httpx +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime + +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class QdrantStatsService: + """Service for getting live statistics from Qdrant""" + + def __init__(self): + self.qdrant_host = getattr(settings, 'QDRANT_HOST', 'enclava-qdrant') + self.qdrant_port = getattr(settings, 'QDRANT_PORT', 6333) + self.qdrant_url = f"http://{self.qdrant_host}:{self.qdrant_port}" + + async def get_collections_stats(self) -> Dict[str, Any]: + """Get live collection statistics directly from Qdrant""" + try: + async with httpx.AsyncClient(timeout=10.0) as client: + # Get all collections + response = await client.get(f"{self.qdrant_url}/collections") + if response.status_code != 200: + logger.error(f"Failed to get collections: {response.status_code}") + return {"collections": [], "total_documents": 0, "total_size_bytes": 0} + + data = response.json() + result = data.get("result", {}) + collections_data = result.get("collections", []) + + collections = [] + total_documents = 0 + total_size_bytes = 0 + + # Get detailed info for each collection + for col_info in collections_data: + collection_name = col_info.get("name", "") + # Include all collections, not just rag_ ones + + # Get detailed collection info + try: + detail_response = await client.get(f"{self.qdrant_url}/collections/{collection_name}") + if detail_response.status_code == 200: + detail_data = detail_response.json() + detail_result = detail_data.get("result", {}) + + points_count = detail_result.get("points_count", 0) + status = detail_result.get("status", "unknown") + + # Get vector size for size calculation + vector_size = 1024 # Default for multilingual-e5-large + try: + config = detail_result.get("config", {}) + params = config.get("params", {}) + vectors 
= params.get("vectors", {}) + if isinstance(vectors, dict) and "size" in vectors: + vector_size = vectors["size"] + elif isinstance(vectors, dict) and "default" in vectors: + vector_size = vectors["default"].get("size", 1024) + except Exception: + pass + + # Estimate size (points * vector_size * 4 bytes + 20% metadata overhead) + estimated_size = int(points_count * vector_size * 4 * 1.2) + + # Extract collection metadata for user-friendly name + display_name = collection_name + description = "" + + # Parse collection name to get original name + if collection_name.startswith("rag_"): + parts = collection_name[4:].split("_") + if len(parts) > 1: + # Remove the UUID suffix + uuid_parts = [p for p in parts if len(p) == 8 and all(c in '0123456789abcdef' for c in p)] + for uuid_part in uuid_parts: + parts.remove(uuid_part) + display_name = " ".join(parts).replace("_", " ").title() + + collection_stat = { + "id": collection_name, + "name": display_name, + "description": description, + "document_count": points_count, + "vector_count": points_count, + "size_bytes": estimated_size, + "status": status, + "qdrant_collection_name": collection_name, + "created_at": "", # Not available from Qdrant + "updated_at": datetime.utcnow().isoformat(), + "is_active": status == "green", + "is_managed": True, + "source": "qdrant" + } + + collections.append(collection_stat) + total_documents += points_count + total_size_bytes += estimated_size + + except Exception as e: + logger.error(f"Error getting details for collection {collection_name}: {e}") + continue + + return { + "collections": collections, + "total_documents": total_documents, + "total_size_bytes": total_size_bytes, + "total_collections": len(collections) + } + + except Exception as e: + logger.error(f"Error getting Qdrant stats: {e}") + return {"collections": [], "total_documents": 0, "total_size_bytes": 0, "total_collections": 0} + + async def get_collection_stats(self, collection_name: str) -> Optional[Dict[str, Any]]: + """Get statistics for a specific collection""" + try: + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.get(f"{self.qdrant_url}/collections/{collection_name}") + if response.status_code != 200: + return None + + data = response.json() + result = data.get("result", {}) + + points_count = result.get("points_count", 0) + status = result.get("status", "unknown") + + # Get vector size + vector_size = 1024 + try: + config = result.get("config", {}) + params = config.get("params", {}) + vectors = params.get("vectors", {}) + if isinstance(vectors, dict) and "size" in vectors: + vector_size = vectors["size"] + except Exception: + pass + + estimated_size = int(points_count * vector_size * 4 * 1.2) + + return { + "document_count": points_count, + "vector_count": points_count, + "size_bytes": estimated_size, + "status": status + } + + except Exception as e: + logger.error(f"Error getting collection stats for {collection_name}: {e}") + return None + + +# Global instance +qdrant_stats_service = QdrantStatsService() \ No newline at end of file
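
For reviewers who want to exercise the new module interface locally, the following is a minimal sketch built only from the helpers this patch exports: the module-level functions at the bottom of backend/app/modules/rag/main.py and the qdrant_stats_service singleton from backend/app/services/qdrant_stats_service.py. The config dict passed to initialize() is an illustrative assumption (the accepted schema is defined earlier in main.py, outside these hunks), and the script assumes a reachable Qdrant instance and embeddings backend.

import asyncio

from app.modules.rag import main as rag
from app.services.qdrant_stats_service import qdrant_stats_service


async def smoke_test():
    # Illustrative config only; the real keys are validated by RAGModule.initialize()
    await rag.initialize({"max_results": 10})

    # Index a small document, then search it back via the exported helpers
    doc_id = await rag.index_document(
        "Qdrant stores vectors and payloads for semantic search.",
        metadata={"source": "smoke-test"},
    )
    results = await rag.search_documents("how are vectors stored?", max_results=3)
    for r in results:
        print(r.document.id, round(r.score, 3), r.document.content[:80])

    # Live collection statistics read straight from Qdrant's HTTP API
    stats = await qdrant_stats_service.get_collections_stats()
    print(stats["total_collections"], "collections,", stats["total_documents"], "documents")

    # Remove the test document and shut the module down
    await rag.delete_document(doc_id)
    await rag.cleanup()


if __name__ == "__main__":
    asyncio.run(smoke_test())

The same flow can be driven through RAGModule.process_request() with the "index", "search", "stats", and "delete" actions shown above, which is how other modules are expected to call into RAG through the interceptor pattern.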