diff --git a/backend/Dockerfile b/backend/Dockerfile index 99294d0..a46e469 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -12,6 +12,7 @@ WORKDIR /app RUN apt-get update && apt-get install -y \ build-essential \ libpq-dev \ + postgresql-client \ curl \ ffmpeg \ && rm -rf /var/lib/apt/lists/* @@ -20,12 +21,17 @@ RUN apt-get update && apt-get install -y \ COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -# Download spaCy English model for NLP processing -RUN python -m spacy download en_core_web_sm +# Optional: Download spaCy English model for NLP processing (commented out for faster builds) +# Uncomment if you install requirements-nlp.txt and need entity extraction +# RUN python -m spacy download en_core_web_sm # Copy application code COPY . . +# Copy and make migration script executable +COPY scripts/migrate.sh /usr/local/bin/migrate.sh +RUN chmod +x /usr/local/bin/migrate.sh + # Create logs directory RUN mkdir -p logs diff --git a/backend/alembic/versions/9645f764a517_add_zammad_integration_tables.py b/backend/alembic/versions/9645f764a517_add_zammad_integration_tables.py new file mode 100644 index 0000000..2d242d1 --- /dev/null +++ b/backend/alembic/versions/9645f764a517_add_zammad_integration_tables.py @@ -0,0 +1,120 @@ +"""add_zammad_integration_tables + +Revision ID: 9645f764a517 +Revises: 010_add_workflow_tables_only +Create Date: 2025-08-19 19:55:18.895986 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. +revision = '9645f764a517' +down_revision = '010_add_workflow_tables_only' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Create zammad_tickets table + op.create_table( + 'zammad_tickets', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('zammad_ticket_id', sa.Integer(), nullable=False), + sa.Column('ticket_number', sa.String(), nullable=False), + sa.Column('title', sa.String(), nullable=False), + sa.Column('state', sa.String(), nullable=False), + sa.Column('priority', sa.String(), nullable=True), + sa.Column('customer_email', sa.String(), nullable=True), + sa.Column('processing_status', sa.String(), nullable=False), + sa.Column('processed_at', sa.DateTime(), nullable=True), + sa.Column('processed_by_user_id', sa.Integer(), nullable=True), + sa.Column('chatbot_id', sa.String(), nullable=True), + sa.Column('summary', sa.Text(), nullable=True), + sa.Column('context_data', postgresql.JSON(astext_type=sa.Text()), nullable=True), + sa.Column('error_message', sa.Text(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.Column('zammad_created_at', sa.DateTime(), nullable=True), + sa.Column('zammad_updated_at', sa.DateTime(), nullable=True), + sa.Column('zammad_article_count', sa.Integer(), nullable=False), + sa.Column('config_snapshot', postgresql.JSON(astext_type=sa.Text()), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.ForeignKeyConstraint(['processed_by_user_id'], ['users.id'], ), + ) + + # Create indexes for zammad_tickets + op.create_index('idx_zammad_tickets_status_created', 'zammad_tickets', ['processing_status', 'created_at']) + op.create_index('idx_zammad_tickets_state_processed', 'zammad_tickets', ['state', 'processed_at']) + op.create_index('idx_zammad_tickets_user_status', 'zammad_tickets', ['processed_by_user_id', 'processing_status']) + op.create_index(op.f('ix_zammad_tickets_id'), 'zammad_tickets', ['id']) + op.create_index(op.f('ix_zammad_tickets_ticket_number'), 'zammad_tickets', ['ticket_number']) + op.create_index(op.f('ix_zammad_tickets_zammad_ticket_id'), 'zammad_tickets', ['zammad_ticket_id'], unique=True) + + # Create zammad_processing_logs table + op.create_table( + 'zammad_processing_logs', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('batch_id', sa.String(), nullable=False), + sa.Column('started_at', sa.DateTime(), nullable=False), + sa.Column('completed_at', sa.DateTime(), nullable=True), + sa.Column('initiated_by_user_id', sa.Integer(), nullable=True), + sa.Column('config_used', postgresql.JSON(astext_type=sa.Text()), nullable=True), + sa.Column('filters_applied', postgresql.JSON(astext_type=sa.Text()), nullable=True), + sa.Column('tickets_found', sa.Integer(), nullable=False), + sa.Column('tickets_processed', sa.Integer(), nullable=False), + sa.Column('tickets_failed', sa.Integer(), nullable=False), + sa.Column('tickets_skipped', sa.Integer(), nullable=False), + sa.Column('processing_time_seconds', sa.Integer(), nullable=True), + sa.Column('average_time_per_ticket', sa.Integer(), nullable=True), + sa.Column('errors_encountered', postgresql.JSON(astext_type=sa.Text()), nullable=True), + sa.Column('status', sa.String(), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.ForeignKeyConstraint(['initiated_by_user_id'], ['users.id'], ), + ) + + # Create indexes for zammad_processing_logs + op.create_index('idx_processing_logs_batch_status', 'zammad_processing_logs', ['batch_id', 'status']) + op.create_index('idx_processing_logs_user_started', 'zammad_processing_logs', ['initiated_by_user_id', 'started_at']) + op.create_index(op.f('ix_zammad_processing_logs_batch_id'), 'zammad_processing_logs', ['batch_id']) + op.create_index(op.f('ix_zammad_processing_logs_id'), 'zammad_processing_logs', ['id']) + + # Create zammad_configurations table + op.create_table( + 'zammad_configurations', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('is_default', sa.Boolean(), nullable=False), + sa.Column('is_active', sa.Boolean(), nullable=False), + sa.Column('zammad_url', sa.String(), nullable=False), + sa.Column('api_token_encrypted', sa.String(), nullable=False), + sa.Column('chatbot_id', sa.String(), nullable=False), + sa.Column('process_state', sa.String(), nullable=False), + sa.Column('max_tickets', sa.Integer(), nullable=False), + sa.Column('skip_existing', sa.Boolean(), nullable=False), + sa.Column('auto_process', sa.Boolean(), nullable=False), + sa.Column('process_interval', sa.Integer(), nullable=False), + sa.Column('summary_template', sa.Text(), nullable=True), + sa.Column('custom_settings', postgresql.JSON(astext_type=sa.Text()), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.Column('last_used_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.ForeignKeyConstraint(['user_id'], ['users.id'], ), + ) + + # Create indexes for zammad_configurations + op.create_index('idx_zammad_config_user_active', 'zammad_configurations', ['user_id', 'is_active']) + op.create_index('idx_zammad_config_user_default', 'zammad_configurations', ['user_id', 'is_default']) + op.create_index(op.f('ix_zammad_configurations_id'), 'zammad_configurations', ['id']) + + +def downgrade() -> None: + # Drop tables in reverse order + op.drop_table('zammad_configurations') + op.drop_table('zammad_processing_logs') + op.drop_table('zammad_tickets') \ No newline at end of file diff --git a/backend/app/api/v1/__init__.py b/backend/app/api/v1/__init__.py index 2236441..2de4cfb 100644 --- a/backend/app/api/v1/__init__.py +++ b/backend/app/api/v1/__init__.py @@ -18,6 +18,7 @@ from .rag import router as rag_router from .chatbot import router as chatbot_router from .prompt_templates import router as prompt_templates_router from .security import router as security_router +from .zammad import router as zammad_router # Create main API router api_router = APIRouter() @@ -65,4 +66,7 @@ api_router.include_router(chatbot_router, prefix="/chatbot", tags=["chatbot"]) api_router.include_router(prompt_templates_router, prefix="/prompt-templates", tags=["prompt-templates"]) # Include security routes -api_router.include_router(security_router, prefix="/security", tags=["security"]) \ No newline at end of file +api_router.include_router(security_router, prefix="/security", tags=["security"]) + +# Include Zammad integration routes +api_router.include_router(zammad_router, prefix="/zammad", tags=["zammad"]) \ No newline at end of file diff --git a/backend/app/api/v1/modules.py b/backend/app/api/v1/modules.py index 43e269d..81bcef8 100644 --- a/backend/app/api/v1/modules.py +++ b/backend/app/api/v1/modules.py @@ -20,13 +20,26 @@ async def list_modules(): modules = [] for module_info in all_modules: - # Convert module_info to API format + # Convert module_info to API format with status field + name = module_info["name"] + is_loaded = module_info["loaded"] # Module is actually loaded in memory + is_enabled = module_info["enabled"] # Module is enabled in config + + # Determine status based on enabled + loaded state + if is_enabled and is_loaded: + status = "running" + elif is_enabled and not is_loaded: + status = "error" # Enabled but failed to load + else: # not is_enabled (regardless of loaded state) + status = "standby" # Disabled + api_module = { - "name": module_info["name"], + "name": name, "version": module_info["version"], "description": module_info["description"], - "initialized": module_info["loaded"], - "enabled": module_info["enabled"] + "initialized": is_loaded, + "enabled": is_enabled, + "status": status # Add status field for frontend compatibility } # Get module statistics if available and module is loaded @@ -58,31 +71,64 @@ async def list_modules(): @router.get("/status") async def get_modules_status(): - """Get summary status of all modules""" + """Get comprehensive module status - CONSOLIDATED endpoint""" log_api_request("get_modules_status", {}) - total_modules = len(module_manager.modules) - running_modules = 0 - standby_modules = 0 - failed_modules = 0 + # Get all discovered modules including disabled ones + all_modules = module_manager.list_all_modules() - for name, module in module_manager.modules.items(): - config = module_manager.module_configs.get(name) - is_initialized = getattr(module, "initialized", False) - is_enabled = config.enabled if config else True + modules_with_status = [] + running_count = 0 + standby_count = 0 + failed_count = 0 + + for module_info in all_modules: + name = module_info["name"] + is_loaded = module_info["loaded"] # Module is actually loaded in memory + is_enabled = module_info["enabled"] # Module is enabled in config - if is_initialized and is_enabled: - running_modules += 1 - elif not is_initialized: - failed_modules += 1 - else: - standby_modules += 1 + # Determine status based on enabled + loaded state + if is_enabled and is_loaded: + status = "running" + running_count += 1 + elif is_enabled and not is_loaded: + status = "failed" # Enabled but failed to load + failed_count += 1 + else: # not is_enabled (regardless of loaded state) + status = "standby" # Disabled + standby_count += 1 + + # Get module statistics if available and loaded + stats = {} + if is_loaded and name in module_manager.modules: + module_instance = module_manager.modules[name] + if hasattr(module_instance, "get_stats"): + try: + import asyncio + if asyncio.iscoroutinefunction(module_instance.get_stats): + stats_result = await module_instance.get_stats() + else: + stats_result = module_instance.get_stats() + stats = stats_result.__dict__ if hasattr(stats_result, "__dict__") else stats_result + except: + stats = {} + + modules_with_status.append({ + "name": name, + "version": module_info["version"], + "description": module_info["description"], + "status": status, + "enabled": is_enabled, + "loaded": is_loaded, + "stats": stats + }) return { - "total": total_modules, - "running": running_modules, - "standby": standby_modules, - "failed": failed_modules, + "modules": modules_with_status, + "total": len(modules_with_status), + "running": running_count, + "standby": standby_count, + "failed": failed_count, "system_initialized": module_manager.initialized } diff --git a/backend/app/api/v1/zammad.py b/backend/app/api/v1/zammad.py new file mode 100644 index 0000000..8fd223c --- /dev/null +++ b/backend/app/api/v1/zammad.py @@ -0,0 +1,664 @@ +""" +Zammad Integration API endpoints +""" + +import asyncio +from typing import Dict, Any, List, Optional +from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks +from pydantic import BaseModel, validator +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, and_, or_, desc +from datetime import datetime + +from app.db.database import get_db +from app.core.logging import log_api_request +from app.services.module_manager import module_manager +from app.core.security import get_current_user +from app.models.user import User +from app.services.api_key_auth import get_api_key_auth +from app.models.api_key import APIKey +from app.models.chatbot import ChatbotInstance + +# Import Zammad models +from modules.zammad.models import ( + ZammadTicket, + ZammadProcessingLog, + ZammadConfiguration, + ProcessingStatus +) + +router = APIRouter() + + +class ZammadConfigurationRequest(BaseModel): + """Request model for creating/updating Zammad configuration""" + name: str + description: Optional[str] = None + is_default: bool = False + zammad_url: str + api_token: str + chatbot_id: str + process_state: str = "open" + max_tickets: int = 10 + skip_existing: bool = True + auto_process: bool = False + process_interval: int = 30 + summary_template: Optional[str] = None + custom_settings: Optional[Dict[str, Any]] = {} + + @validator('zammad_url') + def validate_zammad_url(cls, v): + if not v.startswith(('http://', 'https://')): + raise ValueError('Zammad URL must start with http:// or https://') + return v.rstrip('/') + + @validator('max_tickets') + def validate_max_tickets(cls, v): + if not 1 <= v <= 100: + raise ValueError('max_tickets must be between 1 and 100') + return v + + @validator('process_interval') + def validate_process_interval(cls, v): + if not 5 <= v <= 1440: + raise ValueError('process_interval must be between 5 and 1440 minutes') + return v + + +class ProcessTicketsRequest(BaseModel): + """Request model for processing tickets""" + config_id: Optional[int] = None + filters: Dict[str, Any] = {} + + @validator('filters', pre=True) + def validate_filters(cls, v): + """Ensure filters is always a dict""" + if v is None: + return {} + if isinstance(v, list): + # If someone passes a list, convert to empty dict + return {} + if not isinstance(v, dict): + # If it's some other type, convert to empty dict + return {} + return v + + +class ProcessSingleTicketRequest(BaseModel): + """Request model for processing a single ticket""" + ticket_id: int + config_id: Optional[int] = None + + +class TestConnectionRequest(BaseModel): + """Request model for testing Zammad connection""" + zammad_url: str + api_token: str + + +@router.get("/configurations") +async def get_configurations( + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db) +): + """Get all Zammad configurations for the current user""" + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + try: + # Get configurations from database + stmt = ( + select(ZammadConfiguration) + .where(ZammadConfiguration.user_id == user_id) + .where(ZammadConfiguration.is_active == True) + .order_by(ZammadConfiguration.is_default.desc(), ZammadConfiguration.created_at.desc()) + ) + result = await db.execute(stmt) + configurations = [config.to_dict() for config in result.scalars()] + + return { + "configurations": configurations, + "count": len(configurations) + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error fetching configurations: {str(e)}") + + +@router.post("/configurations") +async def create_configuration( + config_request: ZammadConfigurationRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db) +): + """Create a new Zammad configuration""" + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + try: + # Verify chatbot exists and user has access + chatbot_stmt = select(ChatbotInstance).where( + and_( + ChatbotInstance.id == config_request.chatbot_id, + ChatbotInstance.created_by == str(user_id), + ChatbotInstance.is_active == True + ) + ) + chatbot = await db.scalar(chatbot_stmt) + if not chatbot: + raise HTTPException(status_code=404, detail="Chatbot not found or access denied") + + # Use the module to handle configuration creation + zammad_module = module_manager.get_module("zammad") + if not zammad_module: + raise HTTPException(status_code=503, detail="Zammad module not available") + + request_data = { + "action": "save_configuration", + "configuration": config_request.dict() + } + + context = { + "user_id": user_id, + "user_permissions": current_user.get("permissions", []) + } + + result = await zammad_module.execute_with_interceptors(request_data, context) + + return result + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error creating configuration: {str(e)}") + + +@router.put("/configurations/{config_id}") +async def update_configuration( + config_id: int, + config_request: ZammadConfigurationRequest, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db) +): + """Update an existing Zammad configuration""" + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + try: + # Check if configuration exists and belongs to user + stmt = select(ZammadConfiguration).where( + and_( + ZammadConfiguration.id == config_id, + ZammadConfiguration.user_id == user_id + ) + ) + existing_config = await db.scalar(stmt) + if not existing_config: + raise HTTPException(status_code=404, detail="Configuration not found") + + # Verify chatbot exists and user has access + chatbot_stmt = select(ChatbotInstance).where( + and_( + ChatbotInstance.id == config_request.chatbot_id, + ChatbotInstance.created_by == str(user_id), + ChatbotInstance.is_active == True + ) + ) + chatbot = await db.scalar(chatbot_stmt) + if not chatbot: + raise HTTPException(status_code=404, detail="Chatbot not found or access denied") + + # Deactivate old configuration and create new one (for audit trail) + existing_config.is_active = False + + # Use the module to handle configuration creation + zammad_module = module_manager.get_module("zammad") + if not zammad_module: + raise HTTPException(status_code=503, detail="Zammad module not available") + + request_data = { + "action": "save_configuration", + "configuration": config_request.dict() + } + + context = { + "user_id": user_id, + "user_permissions": current_user.get("permissions", []) + } + + result = await zammad_module.execute_with_interceptors(request_data, context) + + await db.commit() + return result + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error updating configuration: {str(e)}") + + +@router.delete("/configurations/{config_id}") +async def delete_configuration( + config_id: int, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db) +): + """Delete (deactivate) a Zammad configuration""" + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + try: + # Check if configuration exists and belongs to user + stmt = select(ZammadConfiguration).where( + and_( + ZammadConfiguration.id == config_id, + ZammadConfiguration.user_id == user_id + ) + ) + config = await db.scalar(stmt) + if not config: + raise HTTPException(status_code=404, detail="Configuration not found") + + # Deactivate instead of deleting (for audit trail) + config.is_active = False + config.updated_at = datetime.utcnow() + + await db.commit() + + return {"message": "Configuration deleted successfully"} + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error deleting configuration: {str(e)}") + + +@router.post("/test-connection") +async def test_connection( + test_request: TestConnectionRequest, + current_user: User = Depends(get_current_user) +): + """Test connection to a Zammad instance""" + try: + zammad_module = module_manager.get_module("zammad") + if not zammad_module: + raise HTTPException(status_code=503, detail="Zammad module not available") + + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + request_data = { + "action": "test_connection", + "zammad_url": test_request.zammad_url, + "api_token": test_request.api_token + } + + context = { + "user_id": user_id, + "user_permissions": current_user.get("permissions", []) + } + + result = await zammad_module.execute_with_interceptors(request_data, context) + + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error testing connection: {str(e)}") + + +@router.post("/process") +async def process_tickets( + process_request: ProcessTicketsRequest, + background_tasks: BackgroundTasks, + current_user: User = Depends(get_current_user) +): + """Process tickets for summarization""" + try: + zammad_module = module_manager.get_module("zammad") + if not zammad_module: + raise HTTPException(status_code=503, detail="Zammad module not available") + + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + # Debug logging to identify the issue + import logging + logger = logging.getLogger(__name__) + logger.info(f"Process request filters type: {type(process_request.filters)}") + logger.info(f"Process request filters value: {process_request.filters}") + + # Ensure filters is a dict + filters = process_request.filters if process_request.filters is not None else {} + if not isinstance(filters, dict): + logger.error(f"Filters is not a dict: {type(filters)} = {filters}") + filters = {} + + request_data = { + "action": "process_tickets", + "config_id": process_request.config_id, + "filters": filters + } + + context = { + "user_id": user_id, + "user_permissions": current_user.get("permissions", []) + } + + # Execute processing in background for large batches + if filters.get("limit", 10) > 5: + # Start background task + background_tasks.add_task( + _process_tickets_background, + zammad_module, + request_data, + context + ) + + return { + "message": "Processing started in background", + "status": "started" + } + else: + # Process immediately for small batches + result = await zammad_module.execute_with_interceptors(request_data, context) + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error starting ticket processing: {str(e)}") + + +@router.post("/tickets/{ticket_id}/process") +async def process_single_ticket( + ticket_id: int, + process_request: ProcessSingleTicketRequest, + current_user: User = Depends(get_current_user) +): + """Process a single ticket for summarization""" + try: + zammad_module = module_manager.get_module("zammad") + if not zammad_module: + raise HTTPException(status_code=503, detail="Zammad module not available") + + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + request_data = { + "action": "process_single_ticket", + "ticket_id": ticket_id, + "config_id": process_request.config_id + } + + context = { + "user_id": user_id, + "user_permissions": current_user.get("permissions", []) + } + + result = await zammad_module.execute_with_interceptors(request_data, context) + + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error processing ticket: {str(e)}") + + +@router.get("/tickets/{ticket_id}/summary") +async def get_ticket_summary( + ticket_id: int, + current_user: User = Depends(get_current_user) +): + """Get the AI summary for a specific ticket""" + try: + zammad_module = module_manager.get_module("zammad") + if not zammad_module: + raise HTTPException(status_code=503, detail="Zammad module not available") + + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + request_data = { + "action": "get_ticket_summary", + "ticket_id": ticket_id + } + + context = { + "user_id": user_id, + "user_permissions": current_user.get("permissions", []) + } + + result = await zammad_module.execute_with_interceptors(request_data, context) + + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error getting ticket summary: {str(e)}") + + +@router.get("/tickets") +async def get_processed_tickets( + status: Optional[str] = None, + limit: int = 20, + offset: int = 0, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db) +): + """Get list of processed tickets""" + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + try: + # Build query + query = select(ZammadTicket).where(ZammadTicket.processed_by_user_id == user_id) + + if status: + query = query.where(ZammadTicket.processing_status == status) + + query = query.order_by(desc(ZammadTicket.processed_at)) + query = query.offset(offset).limit(limit) + + # Execute query + result = await db.execute(query) + tickets = [ticket.to_dict() for ticket in result.scalars()] + + # Get total count + count_query = select(ZammadTicket).where(ZammadTicket.processed_by_user_id == user_id) + if status: + count_query = count_query.where(ZammadTicket.processing_status == status) + + total_result = await db.execute(count_query) + total_count = len(list(total_result.scalars())) + + return { + "tickets": tickets, + "total": total_count, + "limit": limit, + "offset": offset + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error fetching tickets: {str(e)}") + + +@router.get("/status") +async def get_module_status( + current_user: User = Depends(get_current_user) +): + """Get Zammad module status and statistics""" + try: + zammad_module = module_manager.get_module("zammad") + if not zammad_module: + raise HTTPException(status_code=503, detail="Zammad module not available") + + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + request_data = { + "action": "get_status" + } + + context = { + "user_id": user_id, + "user_permissions": current_user.get("permissions", []) + } + + result = await zammad_module.execute_with_interceptors(request_data, context) + + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error getting module status: {str(e)}") + + +@router.get("/processing-logs") +async def get_processing_logs( + limit: int = 10, + offset: int = 0, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db) +): + """Get processing logs for the current user""" + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + try: + # Get processing logs + query = ( + select(ZammadProcessingLog) + .where(ZammadProcessingLog.initiated_by_user_id == user_id) + .order_by(desc(ZammadProcessingLog.started_at)) + .offset(offset) + .limit(limit) + ) + + result = await db.execute(query) + logs = [log.to_dict() for log in result.scalars()] + + # Get total count + count_query = select(ZammadProcessingLog).where( + ZammadProcessingLog.initiated_by_user_id == user_id + ) + total_result = await db.execute(count_query) + total_count = len(list(total_result.scalars())) + + return { + "logs": logs, + "total": total_count, + "limit": limit, + "offset": offset + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error fetching processing logs: {str(e)}") + + +@router.get("/chatbots") +async def get_available_chatbots( + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db) +): + """Get list of chatbots available for Zammad integration""" + user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id + + try: + # Get user's active chatbots + stmt = ( + select(ChatbotInstance) + .where(ChatbotInstance.created_by == str(user_id)) + .where(ChatbotInstance.is_active == True) + .order_by(ChatbotInstance.name) + ) + + result = await db.execute(stmt) + chatbots = [] + + for chatbot in result.scalars(): + # Extract chatbot_type from config JSON or provide default + config = chatbot.config or {} + chatbot_type = config.get('chatbot_type', 'general') + model = config.get('model', 'Unknown') + + chatbots.append({ + "id": chatbot.id, + "name": chatbot.name, + "chatbot_type": chatbot_type, + "model": model, + "description": chatbot.description or '' + }) + + return { + "chatbots": chatbots, + "count": len(chatbots) + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error fetching chatbots: {str(e)}") + + +async def _process_tickets_background(zammad_module, request_data: Dict[str, Any], context: Dict[str, Any]): + """Background task for processing tickets""" + try: + import logging + logger = logging.getLogger(__name__) + logger.info(f"Starting background ticket processing with request_data: {request_data}") + logger.info(f"Context: {context}") + await zammad_module.execute_with_interceptors(request_data, context) + except Exception as e: + # Log error but don't raise - this is a background task + import logging + import traceback + logger = logging.getLogger(__name__) + logger.error(f"Background ticket processing failed: {e}") + logger.error(f"Full traceback: {traceback.format_exc()}") + + +# API key authentication endpoints (for programmatic access) + +@router.post("/api-key/process", dependencies=[Depends(get_api_key_auth)]) +async def api_process_tickets( + process_request: ProcessTicketsRequest, + api_key_context: Dict = Depends(get_api_key_auth) +): + """Process tickets using API key authentication""" + try: + zammad_module = module_manager.get_module("zammad") + if not zammad_module: + raise HTTPException(status_code=503, detail="Zammad module not available") + + user_id = api_key_context["user_id"] + + request_data = { + "action": "process_tickets", + "config_id": process_request.config_id, + "filters": process_request.filters + } + + context = { + "user_id": user_id, + "api_key_id": api_key_context["api_key_id"], + "user_permissions": ["modules:*"] # API keys get full module access + } + + result = await zammad_module.execute_with_interceptors(request_data, context) + + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error processing tickets: {str(e)}") + + +@router.get("/api-key/status", dependencies=[Depends(get_api_key_auth)]) +async def api_get_status( + api_key_context: Dict = Depends(get_api_key_auth) +): + """Get module status using API key authentication""" + try: + zammad_module = module_manager.get_module("zammad") + if not zammad_module: + raise HTTPException(status_code=503, detail="Zammad module not available") + + user_id = api_key_context["user_id"] + + request_data = { + "action": "get_status" + } + + context = { + "user_id": user_id, + "api_key_id": api_key_context["api_key_id"], + "user_permissions": ["modules:*"] # API keys get full module access + } + + result = await zammad_module.execute_with_interceptors(request_data, context) + + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error getting status: {str(e)}") \ No newline at end of file diff --git a/backend/app/core/cache.py b/backend/app/core/cache.py new file mode 100644 index 0000000..4365c29 --- /dev/null +++ b/backend/app/core/cache.py @@ -0,0 +1,322 @@ +""" +Core Cache Service - Redis-based caching infrastructure +Consolidates all caching functionality into core system infrastructure +""" + +import asyncio +import json +import logging +from typing import Any, Dict, Optional, Union +from datetime import datetime, timedelta +import redis.asyncio as redis +from redis.asyncio import Redis, ConnectionPool +from contextlib import asynccontextmanager + +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class CoreCacheService: + """Core Redis-based cache service for system-wide caching""" + + def __init__(self): + self.redis_pool: Optional[ConnectionPool] = None + self.redis_client: Optional[Redis] = None + self.enabled = False + self.stats = { + "hits": 0, + "misses": 0, + "errors": 0, + "total_requests": 0 + } + + async def initialize(self): + """Initialize the core cache service with connection pool""" + try: + # Create Redis connection pool for better resource management + redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0') + + self.redis_pool = ConnectionPool.from_url( + redis_url, + encoding="utf-8", + decode_responses=True, + socket_connect_timeout=5, + socket_timeout=5, + retry_on_timeout=True, + max_connections=20, # Shared pool for all cache operations + health_check_interval=30 + ) + + self.redis_client = Redis(connection_pool=self.redis_pool) + + # Test connection + await self.redis_client.ping() + + self.enabled = True + logger.info("Core cache service initialized with Redis connection pool") + + except Exception as e: + logger.error(f"Failed to initialize core cache service: {e}") + self.enabled = False + raise + + async def cleanup(self): + """Cleanup cache resources""" + if self.redis_client: + await self.redis_client.close() + self.redis_client = None + + if self.redis_pool: + await self.redis_pool.disconnect() + self.redis_pool = None + + self.enabled = False + logger.info("Core cache service cleaned up") + + def _get_cache_key(self, key: str, prefix: str = "core") -> str: + """Generate cache key with prefix""" + return f"{prefix}:{key}" + + async def get(self, key: str, default: Any = None, prefix: str = "core") -> Any: + """Get value from cache""" + if not self.enabled: + return default + + try: + cache_key = self._get_cache_key(key, prefix) + value = await self.redis_client.get(cache_key) + + if value is None: + self.stats["misses"] += 1 + return default + + self.stats["hits"] += 1 + self.stats["total_requests"] += 1 + + # Try to deserialize JSON + try: + return json.loads(value) + except json.JSONDecodeError: + return value + + except Exception as e: + logger.error(f"Cache get error for key {key}: {e}") + self.stats["errors"] += 1 + return default + + async def set(self, key: str, value: Any, ttl: Optional[int] = None, prefix: str = "core") -> bool: + """Set value in cache""" + if not self.enabled: + return False + + try: + cache_key = self._get_cache_key(key, prefix) + ttl = ttl or 3600 # Default 1 hour TTL + + # Serialize complex objects as JSON + if isinstance(value, (dict, list, tuple)): + value = json.dumps(value) + + await self.redis_client.setex(cache_key, ttl, value) + return True + + except Exception as e: + logger.error(f"Cache set error for key {key}: {e}") + self.stats["errors"] += 1 + return False + + async def delete(self, key: str, prefix: str = "core") -> bool: + """Delete key from cache""" + if not self.enabled: + return False + + try: + cache_key = self._get_cache_key(key, prefix) + result = await self.redis_client.delete(cache_key) + return result > 0 + + except Exception as e: + logger.error(f"Cache delete error for key {key}: {e}") + self.stats["errors"] += 1 + return False + + async def exists(self, key: str, prefix: str = "core") -> bool: + """Check if key exists in cache""" + if not self.enabled: + return False + + try: + cache_key = self._get_cache_key(key, prefix) + return await self.redis_client.exists(cache_key) > 0 + + except Exception as e: + logger.error(f"Cache exists error for key {key}: {e}") + self.stats["errors"] += 1 + return False + + async def clear_pattern(self, pattern: str, prefix: str = "core") -> int: + """Clear keys matching pattern""" + if not self.enabled: + return 0 + + try: + cache_pattern = self._get_cache_key(pattern, prefix) + keys = await self.redis_client.keys(cache_pattern) + if keys: + return await self.redis_client.delete(*keys) + return 0 + + except Exception as e: + logger.error(f"Cache clear pattern error for pattern {pattern}: {e}") + self.stats["errors"] += 1 + return 0 + + async def increment(self, key: str, amount: int = 1, ttl: Optional[int] = None, prefix: str = "core") -> int: + """Increment counter with optional TTL""" + if not self.enabled: + return 0 + + try: + cache_key = self._get_cache_key(key, prefix) + + # Use pipeline for atomic increment + expire + async with self.redis_client.pipeline() as pipe: + await pipe.incr(cache_key, amount) + if ttl: + await pipe.expire(cache_key, ttl) + results = await pipe.execute() + return results[0] + + except Exception as e: + logger.error(f"Cache increment error for key {key}: {e}") + self.stats["errors"] += 1 + return 0 + + async def get_stats(self) -> Dict[str, Any]: + """Get comprehensive cache statistics""" + stats = self.stats.copy() + + if self.enabled: + try: + info = await self.redis_client.info() + stats.update({ + "redis_memory_used": info.get("used_memory_human", "N/A"), + "redis_connected_clients": info.get("connected_clients", 0), + "redis_total_commands": info.get("total_commands_processed", 0), + "redis_keyspace_hits": info.get("keyspace_hits", 0), + "redis_keyspace_misses": info.get("keyspace_misses", 0), + "connection_pool_size": self.redis_pool.connection_pool_size if self.redis_pool else 0, + "hit_rate": round( + (stats["hits"] / stats["total_requests"]) * 100, 2 + ) if stats["total_requests"] > 0 else 0, + "enabled": True + }) + except Exception as e: + logger.error(f"Error getting Redis stats: {e}") + stats["enabled"] = False + else: + stats["enabled"] = False + + return stats + + @asynccontextmanager + async def pipeline(self): + """Context manager for Redis pipeline operations""" + if not self.enabled: + yield None + return + + async with self.redis_client.pipeline() as pipe: + yield pipe + + # Specialized caching methods for common use cases + + async def cache_api_key(self, key_prefix: str, api_key_data: Dict[str, Any], ttl: int = 300) -> bool: + """Cache API key data for authentication""" + return await self.set(key_prefix, api_key_data, ttl, prefix="auth") + + async def get_cached_api_key(self, key_prefix: str) -> Optional[Dict[str, Any]]: + """Get cached API key data""" + return await self.get(key_prefix, prefix="auth") + + async def invalidate_api_key(self, key_prefix: str) -> bool: + """Invalidate cached API key""" + return await self.delete(key_prefix, prefix="auth") + + async def cache_verification_result(self, api_key: str, key_prefix: str, key_hash: str, is_valid: bool, ttl: int = 300) -> bool: + """Cache API key verification result to avoid expensive bcrypt operations""" + verification_data = { + "key_hash": key_hash, + "is_valid": is_valid, + "timestamp": datetime.utcnow().isoformat() + } + return await self.set(f"verify:{key_prefix}", verification_data, ttl, prefix="auth") + + async def get_cached_verification(self, key_prefix: str) -> Optional[Dict[str, Any]]: + """Get cached verification result""" + return await self.get(f"verify:{key_prefix}", prefix="auth") + + async def cache_rate_limit(self, identifier: str, window_seconds: int, limit: int, current_count: int = 1) -> Dict[str, Any]: + """Cache and track rate limit state""" + key = f"rate_limit:{identifier}:{window_seconds}" + + try: + # Use atomic increment with expiry + count = await self.increment(key, current_count, window_seconds, prefix="rate") + + remaining = max(0, limit - count) + reset_time = int((datetime.utcnow() + timedelta(seconds=window_seconds)).timestamp()) + + return { + "count": count, + "limit": limit, + "remaining": remaining, + "reset_time": reset_time, + "exceeded": count > limit + } + except Exception as e: + logger.error(f"Rate limit cache error: {e}") + # Return permissive defaults on cache failure + return { + "count": 0, + "limit": limit, + "remaining": limit, + "reset_time": int((datetime.utcnow() + timedelta(seconds=window_seconds)).timestamp()), + "exceeded": False + } + + +# Global core cache service instance +core_cache = CoreCacheService() + + +# Convenience functions for backward compatibility and ease of use +async def get(key: str, default: Any = None, prefix: str = "core") -> Any: + """Get value from core cache""" + return await core_cache.get(key, default, prefix) + + +async def set(key: str, value: Any, ttl: Optional[int] = None, prefix: str = "core") -> bool: + """Set value in core cache""" + return await core_cache.set(key, value, ttl, prefix) + + +async def delete(key: str, prefix: str = "core") -> bool: + """Delete key from core cache""" + return await core_cache.delete(key, prefix) + + +async def exists(key: str, prefix: str = "core") -> bool: + """Check if key exists in core cache""" + return await core_cache.exists(key, prefix) + + +async def clear_pattern(pattern: str, prefix: str = "core") -> int: + """Clear keys matching pattern from core cache""" + return await core_cache.clear_pattern(pattern, prefix) + + +async def get_stats() -> Dict[str, Any]: + """Get core cache statistics""" + return await core_cache.get_stats() \ No newline at end of file diff --git a/backend/app/db/database.py b/backend/app/db/database.py index ce95169..6f592c6 100644 --- a/backend/app/db/database.py +++ b/backend/app/db/database.py @@ -103,8 +103,8 @@ async def init_db(): except ImportError: logger.warning("Module model not available yet") - # Create all tables - await conn.run_sync(Base.metadata.create_all) + # Tables are now created via migration container - no need to create here + # await conn.run_sync(Base.metadata.create_all) # DISABLED - migrations handle this # Create default admin user if no admin exists await create_default_admin() diff --git a/backend/app/main.py b/backend/app/main.py index 72aa7b5..ec4ddea 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -38,6 +38,14 @@ async def lifespan(app: FastAPI): """ logger.info("Starting Enclava platform...") + # Initialize core cache service (before database to provide caching for auth) + from app.core.cache import core_cache + try: + await core_cache.initialize() + logger.info("Core cache service initialized successfully") + except Exception as e: + logger.warning(f"Core cache service initialization failed: {e}") + # Initialize database await init_db() @@ -70,6 +78,10 @@ async def lifespan(app: FastAPI): # Cleanup logger.info("Shutting down platform...") + # Close core cache service + from app.core.cache import core_cache + await core_cache.cleanup() + # Close Redis connection for cached API key service from app.services.cached_api_key import cached_api_key_service await cached_api_key_service.close() diff --git a/backend/app/services/cached_api_key.py b/backend/app/services/cached_api_key.py index 3b7cdf5..0f45d8e 100644 --- a/backend/app/services/cached_api_key.py +++ b/backend/app/services/cached_api_key.py @@ -1,5 +1,5 @@ """ -Cached API Key Service +Cached API Key Service - Refactored to use Core Cache Infrastructure High-performance Redis-based API key caching to reduce authentication overhead from ~60ms to ~5ms by avoiding expensive bcrypt operations """ @@ -12,417 +12,236 @@ from sqlalchemy import select from sqlalchemy.orm import joinedload from sqlalchemy.ext.asyncio import AsyncSession +from app.core.cache import core_cache from app.core.config import settings from app.core.security import verify_api_key from app.models.api_key import APIKey from app.models.user import User -# Check Redis availability at runtime, not import time -aioredis = None -REDIS_AVAILABLE = False - -def _import_aioredis(): - """Import aioredis at runtime""" - global aioredis, REDIS_AVAILABLE - if aioredis is None: - try: - import aioredis as _aioredis - aioredis = _aioredis - REDIS_AVAILABLE = True - return True - except ImportError as e: - REDIS_AVAILABLE = False - return False - except Exception as e: - # Handle the Python 3.11 + aioredis 2.0.1 compatibility issue - REDIS_AVAILABLE = False - return False - return REDIS_AVAILABLE - logger = logging.getLogger(__name__) class CachedAPIKeyService: - """Redis-backed API key caching service for performance optimization with fallback to optimized database queries""" + """Core cache-backed API key caching service for performance optimization""" def __init__(self): - self.redis = None self.cache_ttl = 300 # 5 minutes cache TTL self.verification_cache_ttl = 3600 # 1 hour for verification results - self.redis_enabled = _import_aioredis() - - if not self.redis_enabled: - logger.warning("Redis not available, falling back to optimized database queries only") - - async def get_redis(self): - """Get Redis connection, create if doesn't exist""" - if not self.redis_enabled or not REDIS_AVAILABLE: - return None - - if not self.redis and aioredis: - try: - self.redis = aioredis.from_url( - settings.REDIS_URL, - encoding="utf-8", - decode_responses=True, - socket_connect_timeout=5, - socket_timeout=5, - retry_on_timeout=True, - health_check_interval=30 - ) - # Test the connection - await self.redis.ping() - logger.info("Redis connection established for API key caching") - except Exception as e: - logger.warning(f"Redis connection failed, disabling cache: {e}") - self.redis_enabled = False - self.redis = None - - return self.redis + logger.info("Cached API key service initialized with core cache backend") async def close(self): - """Close Redis connection""" - if self.redis and self.redis_enabled: - try: - await self.redis.close() - except Exception as e: - logger.warning(f"Error closing Redis connection: {e}") - - def _get_cache_key(self, key_prefix: str) -> str: - """Generate cache key for API key data""" - return f"api_key:data:{key_prefix}" - - def _get_verification_cache_key(self, key_prefix: str, key_suffix_hash: str) -> str: - """Generate cache key for API key verification results""" - return f"api_key:verified:{key_prefix}:{key_suffix_hash}" - - def _get_last_used_cache_key(self, api_key_id: int) -> str: - """Generate cache key for last used timestamp""" - return f"api_key:last_used:{api_key_id}" - - async def _serialize_api_key_data(self, api_key: APIKey, user: User) -> str: - """Serialize API key and user data for caching""" - data = { - # API Key data - "api_key_id": api_key.id, - "api_key_name": api_key.name, - "key_hash": api_key.key_hash, - "key_prefix": api_key.key_prefix, - "is_active": api_key.is_active, - "permissions": api_key.permissions, - "scopes": api_key.scopes, - "rate_limit_per_minute": api_key.rate_limit_per_minute, - "rate_limit_per_hour": api_key.rate_limit_per_hour, - "rate_limit_per_day": api_key.rate_limit_per_day, - "allowed_models": api_key.allowed_models, - "allowed_endpoints": api_key.allowed_endpoints, - "allowed_ips": api_key.allowed_ips, - "is_unlimited": api_key.is_unlimited, - "budget_limit_cents": api_key.budget_limit_cents, - "budget_type": api_key.budget_type, - "expires_at": api_key.expires_at.isoformat() if api_key.expires_at else None, - "total_requests": api_key.total_requests, - "total_tokens": api_key.total_tokens, - "total_cost": api_key.total_cost, - - # User data - "user_id": user.id, - "user_email": user.email, - "user_role": user.role, - "user_is_active": user.is_active, - - # Cache metadata - "cached_at": datetime.utcnow().isoformat() - } - return json.dumps(data, default=str) - - async def _deserialize_api_key_data(self, cached_data: str) -> Optional[Dict[str, Any]]: - """Deserialize cached API key data""" - try: - data = json.loads(cached_data) - - # Check if cached data is still valid - if data.get("expires_at"): - expires_at = datetime.fromisoformat(data["expires_at"]) - if datetime.utcnow() > expires_at: - return None - - # Reconstruct the context object expected by the rest of the system - context = { - "user_id": data["user_id"], - "user_email": data["user_email"], - "user_role": data["user_role"], - "api_key_id": data["api_key_id"], - "api_key_name": data["api_key_name"], - "permissions": data["permissions"], - "scopes": data["scopes"], - "rate_limits": { - "per_minute": data["rate_limit_per_minute"], - "per_hour": data["rate_limit_per_hour"], - "per_day": data["rate_limit_per_day"] - }, - # Create minimal API key object with necessary attributes - "api_key": type("APIKey", (), { - "id": data["api_key_id"], - "name": data["api_key_name"], - "key_prefix": data["key_prefix"], - "is_active": data["is_active"], - "permissions": data["permissions"], - "scopes": data["scopes"], - "allowed_models": data["allowed_models"], - "allowed_endpoints": data["allowed_endpoints"], - "allowed_ips": data["allowed_ips"], - "is_unlimited": data["is_unlimited"], - "budget_limit_cents": data["budget_limit_cents"], - "budget_type": data["budget_type"], - "total_requests": data["total_requests"], - "total_tokens": data["total_tokens"], - "total_cost": data["total_cost"], - "expires_at": datetime.fromisoformat(data["expires_at"]) if data.get("expires_at") else None, - "can_access_model": lambda model: not data["allowed_models"] or model in data["allowed_models"], - "can_access_endpoint": lambda endpoint: not data["allowed_endpoints"] or endpoint in data["allowed_endpoints"], - "can_access_from_ip": lambda ip: not data["allowed_ips"] or ip in data["allowed_ips"], - "has_scope": lambda scope: scope in data["scopes"], - "is_valid": lambda: data["is_active"] and (not data.get("expires_at") or datetime.utcnow() <= datetime.fromisoformat(data["expires_at"])), - "update_usage": lambda tokens, cost: None # Handled separately for cache consistency - })(), - # Create minimal user object - "user": type("User", (), { - "id": data["user_id"], - "email": data["user_email"], - "role": data["user_role"], - "is_active": data["user_is_active"] - })() - } - - return context - - except Exception as e: - logger.warning(f"Failed to deserialize cached API key data: {e}") - return None + """Close method for compatibility - core cache handles its own lifecycle""" + logger.info("Cached API key service close called - core cache handles lifecycle") async def get_cached_api_key(self, key_prefix: str, db: AsyncSession) -> Optional[Dict[str, Any]]: - """Get API key data from cache or database with optimized queries""" + """ + Get API key data from cache or database + Returns: Dictionary with api_key, user, and api_key_id + """ try: - redis = await self.get_redis() - - # If Redis is available, try cache first - if redis: - cache_key = self._get_cache_key(key_prefix) + # Try cache first + cached_data = await core_cache.get_cached_api_key(key_prefix) + if cached_data: + logger.debug(f"API key cache hit for prefix: {key_prefix}") - # Try to get from cache first - cached_data = await redis.get(cache_key) - if cached_data: - logger.debug(f"API key cache hit for {key_prefix}") - context = await self._deserialize_api_key_data(cached_data) - if context: - return context - else: - # Invalid cached data, remove it - await redis.delete(cache_key) + # Recreate APIKey object from cached data + api_key_data = cached_data.get("api_key_data", {}) + user_data = cached_data.get("user_data", {}) - logger.debug(f"API key cache miss for {key_prefix}, fetching from database") - else: - logger.debug(f"Redis not available, fetching API key {key_prefix} from database with optimized query") + # Create APIKey instance + api_key = APIKey(**api_key_data) + + # Create User instance + user = User(**user_data) + + return { + "api_key": api_key, + "user": user, + "api_key_id": api_key_data.get("id") + } - # Cache miss or Redis not available - fetch from database with optimized query - context = await self._fetch_from_database(key_prefix, db) + logger.debug(f"API key cache miss for prefix: {key_prefix}, fetching from database") - # If Redis is available and we have data, cache it - if context and redis: - try: - api_key = context["api_key"] - user = context["user"] - - # Reconstruct full objects for serialization - full_api_key = await self._get_full_api_key_from_db(key_prefix, db) - if full_api_key: - cached_data = await self._serialize_api_key_data(full_api_key, user) - await redis.setex(cache_key, self.cache_ttl, cached_data) - logger.debug(f"Cached API key data for {key_prefix}") - except Exception as cache_error: - logger.warning(f"Failed to cache API key data: {cache_error}") - # Don't fail the request if caching fails + # Cache miss - fetch from database with optimized query + stmt = ( + select(APIKey, User) + .join(User, APIKey.user_id == User.id) + .options( + joinedload(APIKey.user), + joinedload(User.api_keys) + ) + .where(APIKey.key_prefix == key_prefix) + .where(APIKey.is_active == True) + ) - return context + result = await db.execute(stmt) + api_key_user = result.first() + + if not api_key_user: + logger.debug(f"API key not found in database for prefix: {key_prefix}") + return None + + api_key, user = api_key_user + + # Cache for future requests + await self._cache_api_key_data(key_prefix, api_key, user) + + return { + "api_key": api_key, + "user": user, + "api_key_id": api_key.id + } except Exception as e: - logger.error(f"Error in cached API key lookup for {key_prefix}: {e}") - # Fallback to database - return await self._fetch_from_database(key_prefix, db) + logger.error(f"Error retrieving API key for prefix {key_prefix}: {e}") + return None - async def _get_full_api_key_from_db(self, key_prefix: str, db: AsyncSession) -> Optional[APIKey]: - """Helper to get full API key object from database""" - stmt = select(APIKey).where(APIKey.key_prefix == key_prefix) - result = await db.execute(stmt) - return result.scalar_one_or_none() - - async def _fetch_from_database(self, key_prefix: str, db: AsyncSession) -> Optional[Dict[str, Any]]: - """Fetch API key and user data from database with optimized query""" + async def _cache_api_key_data(self, key_prefix: str, api_key: APIKey, user: User): + """Cache API key and user data""" try: - # Optimized query with joinedload to eliminate N+1 query problem - stmt = select(APIKey).options( - joinedload(APIKey.user) - ).where(APIKey.key_prefix == key_prefix) + # Serialize data for caching + cache_data = { + "api_key_data": { + "id": api_key.id, + "name": api_key.name, + "key_hash": api_key.key_hash, + "key_prefix": api_key.key_prefix, + "user_id": api_key.user_id, + "is_active": api_key.is_active, + "permissions": api_key.permissions, + "scopes": api_key.scopes, + "rate_limit_per_minute": api_key.rate_limit_per_minute, + "rate_limit_per_hour": api_key.rate_limit_per_hour, + "rate_limit_per_day": api_key.rate_limit_per_day, + "allowed_models": api_key.allowed_models, + "allowed_endpoints": api_key.allowed_endpoints, + "allowed_ips": api_key.allowed_ips, + "description": api_key.description, + "tags": api_key.tags, + "created_at": api_key.created_at.isoformat() if api_key.created_at else None, + "updated_at": api_key.updated_at.isoformat() if api_key.updated_at else None, + "last_used_at": api_key.last_used_at.isoformat() if api_key.last_used_at else None, + "expires_at": api_key.expires_at.isoformat() if api_key.expires_at else None, + "total_requests": api_key.total_requests, + "total_tokens": api_key.total_tokens, + "total_cost": api_key.total_cost, + "is_unlimited": api_key.is_unlimited, + "budget_limit_cents": api_key.budget_limit_cents, + "budget_type": api_key.budget_type, + "allowed_chatbots": api_key.allowed_chatbots + }, + "user_data": { + "id": user.id, + "email": user.email, + "username": user.username, + "is_active": user.is_active, + "is_superuser": user.is_superuser, + "role": user.role, + "created_at": user.created_at.isoformat() if user.created_at else None, + "updated_at": user.updated_at.isoformat() if user.updated_at else None, + "last_login": user.last_login.isoformat() if user.last_login else None + }, + "cached_at": datetime.utcnow().isoformat() + } + await core_cache.cache_api_key(key_prefix, cache_data, self.cache_ttl) + logger.debug(f"Cached API key data for prefix: {key_prefix}") + + except Exception as e: + logger.error(f"Error caching API key data for prefix {key_prefix}: {e}") + + async def verify_api_key_cached(self, api_key: str, key_prefix: str) -> Optional[bool]: + """ + Verify API key using cached hash to avoid expensive bcrypt operations + Returns: True if verified, False if invalid, None if not cached + """ + try: + # Check verification cache + cached_verification = await core_cache.get_cached_verification(key_prefix) + + if cached_verification: + # Check if cache is still valid (within TTL) + cached_timestamp = datetime.fromisoformat(cached_verification["timestamp"]) + if datetime.utcnow() - cached_timestamp < timedelta(seconds=self.verification_cache_ttl): + logger.debug(f"API key verification cache hit for prefix: {key_prefix}") + return cached_verification.get("is_valid", False) + + return None # Not cached or expired + + except Exception as e: + logger.error(f"Error checking verification cache for prefix {key_prefix}: {e}") + return None + + async def cache_verification_result(self, api_key: str, key_prefix: str, key_hash: str, is_valid: bool): + """Cache API key verification result to avoid expensive bcrypt operations""" + try: + await core_cache.cache_verification_result(api_key, key_prefix, key_hash, is_valid, self.verification_cache_ttl) + logger.debug(f"Cached verification result for prefix: {key_prefix}") + + except Exception as e: + logger.error(f"Error caching verification result for prefix {key_prefix}: {e}") + + async def invalidate_api_key_cache(self, key_prefix: str): + """Invalidate cached API key data""" + try: + await core_cache.invalidate_api_key(key_prefix) + + # Also invalidate verification cache + verification_keys = await core_cache.clear_pattern(f"verify:{key_prefix}*", prefix="auth") + + logger.debug(f"Invalidated cache for API key prefix: {key_prefix}") + + except Exception as e: + logger.error(f"Error invalidating cache for prefix {key_prefix}: {e}") + + async def update_last_used(self, api_key_id: int, db: AsyncSession): + """Update last used timestamp asynchronously for performance""" + try: + # Use core cache to track update requests to avoid database spam + cache_key = f"last_used_update:{api_key_id}" + + # Check if we recently updated (within 5 minutes) + last_update = await core_cache.get(cache_key, prefix="perf") + if last_update: + return # Skip update if recent + + # Update database + stmt = ( + select(APIKey) + .where(APIKey.id == api_key_id) + ) result = await db.execute(stmt) api_key = result.scalar_one_or_none() - if not api_key: - logger.warning(f"API key not found: {key_prefix}") - return None - - user = api_key.user - if not user or not user.is_active: - logger.warning(f"User not found or inactive for API key: {key_prefix}") - return None - - # Return the same structure as the original service + if api_key: + api_key.last_used_at = datetime.utcnow() + await db.commit() + + # Cache that we updated to prevent spam + await core_cache.set(cache_key, datetime.utcnow().isoformat(), ttl=300, prefix="perf") + + logger.debug(f"Updated last_used_at for API key {api_key_id}") + + except Exception as e: + logger.error(f"Error updating last_used for API key {api_key_id}: {e}") + + async def get_cache_stats(self) -> Dict[str, Any]: + """Get cache performance statistics""" + try: + core_stats = await core_cache.get_stats() return { - "user_id": user.id, - "user_email": user.email, - "user_role": user.role, - "api_key_id": api_key.id, - "api_key_name": api_key.name, - "api_key": api_key, - "user": user, - "permissions": api_key.permissions, - "scopes": api_key.scopes, - "rate_limits": { - "per_minute": api_key.rate_limit_per_minute, - "per_hour": api_key.rate_limit_per_hour, - "per_day": api_key.rate_limit_per_day - } + "cache_backend": "core_cache", + "cache_enabled": core_stats.get("enabled", False), + "cache_stats": core_stats } - except Exception as e: - logger.error(f"Database error fetching API key {key_prefix}: {e}") - return None - - async def verify_api_key_cached(self, api_key: str, key_prefix: str) -> bool: - """Cache API key verification results to avoid repeated bcrypt operations""" - try: - redis = await self.get_redis() - - # If Redis is not available, skip caching - if not redis: - logger.debug(f"Redis not available, skipping verification cache for {key_prefix}") - return False # Caller should handle full verification - - # Create a hash of the key suffix for cache key (never store the actual key) - import hashlib - key_suffix = api_key[8:] if len(api_key) > 8 else api_key - key_suffix_hash = hashlib.sha256(key_suffix.encode()).hexdigest()[:16] - - verification_cache_key = self._get_verification_cache_key(key_prefix, key_suffix_hash) - - # Check verification cache - cached_result = await redis.get(verification_cache_key) - if cached_result: - logger.debug(f"API key verification cache hit for {key_prefix}") - return cached_result == "valid" - - # Need to do actual verification - get the hash from database - # This should be called only after we've confirmed the key exists - logger.debug(f"API key verification cache miss for {key_prefix}") - return False # Caller should handle full verification - - except Exception as e: - logger.warning(f"Error in verification cache for {key_prefix}: {e}") - return False - - async def cache_verification_result(self, api_key: str, key_prefix: str, key_hash: str, is_valid: bool): - """Cache the verification result to avoid future bcrypt operations""" - try: - # Only cache successful verifications and do actual verification - actual_valid = verify_api_key(api_key, key_hash) - if actual_valid != is_valid: - logger.warning(f"Verification mismatch for {key_prefix}") - return - - if actual_valid: - redis = await self.get_redis() - - # If Redis is not available, skip caching - if not redis: - logger.debug(f"Redis not available, skipping verification result cache for {key_prefix}") - return - - # Create a hash of the key suffix for cache key - import hashlib - key_suffix = api_key[8:] if len(api_key) > 8 else api_key - key_suffix_hash = hashlib.sha256(key_suffix.encode()).hexdigest()[:16] - - verification_cache_key = self._get_verification_cache_key(key_prefix, key_suffix_hash) - - # Cache successful verification - await redis.setex(verification_cache_key, self.verification_cache_ttl, "valid") - logger.debug(f"Cached verification result for {key_prefix}") - - except Exception as e: - logger.warning(f"Error caching verification result for {key_prefix}: {e}") - - async def invalidate_api_key_cache(self, key_prefix: str): - """Invalidate cached data for an API key""" - try: - redis = await self.get_redis() - - # If Redis is not available, skip invalidation - if not redis: - logger.debug(f"Redis not available, skipping cache invalidation for {key_prefix}") - return - - cache_key = self._get_cache_key(key_prefix) - await redis.delete(cache_key) - - # Also invalidate verification cache - get all verification keys for this prefix - pattern = f"api_key:verified:{key_prefix}:*" - keys = await redis.keys(pattern) - if keys: - await redis.delete(*keys) - - logger.debug(f"Invalidated cache for API key {key_prefix}") - - except Exception as e: - logger.warning(f"Error invalidating cache for {key_prefix}: {e}") - - async def update_last_used(self, api_key_id: int, db: AsyncSession): - """Update last used timestamp with write-through cache""" - try: - redis = await self.get_redis() - current_time = datetime.utcnow() - should_update = True - - # If Redis is available, check if we've updated recently (avoid too frequent DB writes) - if redis: - cache_key = self._get_last_used_cache_key(api_key_id) - last_update = await redis.get(cache_key) - if last_update: - last_update_time = datetime.fromisoformat(last_update) - if current_time - last_update_time < timedelta(minutes=1): - # Skip update if last update was less than 1 minute ago - should_update = False - - if should_update: - # Update database - stmt = select(APIKey).where(APIKey.id == api_key_id) - result = await db.execute(stmt) - api_key = result.scalar_one_or_none() - - if api_key: - api_key.last_used_at = current_time - await db.commit() - - # Update cache if Redis is available - if redis: - cache_key = self._get_last_used_cache_key(api_key_id) - await redis.setex(cache_key, 300, current_time.isoformat()) - - logger.debug(f"Updated last used timestamp for API key {api_key_id}") - - except Exception as e: - logger.warning(f"Error updating last used timestamp for API key {api_key_id}: {e}") + logger.error(f"Error getting cache stats: {e}") + return { + "cache_backend": "core_cache", + "cache_enabled": False, + "error": str(e) + } -# Global cached service instance +# Global instance cached_api_key_service = CachedAPIKeyService() \ No newline at end of file diff --git a/backend/app/services/module_manager.py b/backend/app/services/module_manager.py index 78cad1a..85e1523 100644 --- a/backend/app/services/module_manager.py +++ b/backend/app/services/module_manager.py @@ -111,8 +111,16 @@ class ModuleManager: # Load saved configurations await module_config_manager.load_saved_configs() + # Filter out core infrastructure that shouldn't be pluggable modules + EXCLUDED_MODULES = ["cache"] # Cache is now core infrastructure + # Convert manifests to ModuleConfig objects for name, manifest in discovered_manifests.items(): + # Skip modules that are now core infrastructure + if name in EXCLUDED_MODULES: + logger.info(f"Skipping module '{name}' - now integrated as core infrastructure") + continue + saved_config = module_config_manager.get_module_config(name) module_config = ModuleConfig( diff --git a/backend/configs/development/monitoring.json b/backend/configs/development/monitoring.json deleted file mode 100644 index 3ef0d5c..0000000 --- a/backend/configs/development/monitoring.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "interval": 30, - "alert_thresholds": { - "cpu_warning": 80, - "cpu_critical": 95, - "memory_warning": 85, - "memory_critical": 95 - }, - "retention_hours": 24 -} \ No newline at end of file diff --git a/backend/modules/cache/__init__.py b/backend/modules/cache/__init__.py deleted file mode 100644 index 44dd0dd..0000000 --- a/backend/modules/cache/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -""" -Cache module for Confidential Empire platform -""" -from .main import CacheModule - -__all__ = ["CacheModule"] \ No newline at end of file diff --git a/backend/modules/cache/main.py b/backend/modules/cache/main.py deleted file mode 100644 index e74f6fa..0000000 --- a/backend/modules/cache/main.py +++ /dev/null @@ -1,281 +0,0 @@ -""" -Cache module implementation with Redis backend -""" -import asyncio -import json -import logging -from typing import Any, Dict, Optional, Union -from datetime import datetime, timedelta -import redis.asyncio as redis -from redis.asyncio import Redis -from contextlib import asynccontextmanager - -from app.core.config import settings -from app.core.logging import log_module_event - -logger = logging.getLogger(__name__) - - -class CacheModule: - """Redis-based cache module for request/response caching""" - - def __init__(self): - self.redis_client: Optional[Redis] = None - self.config: Dict[str, Any] = {} - self.enabled = False - self.stats = { - "hits": 0, - "misses": 0, - "errors": 0, - "total_requests": 0 - } - - async def initialize(self): - """Initialize the cache module""" - - try: - # Initialize Redis connection - redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0') - self.redis_client = redis.from_url( - redis_url, - encoding="utf-8", - decode_responses=True, - socket_connect_timeout=5, - socket_timeout=5, - retry_on_timeout=True - ) - - # Test connection - await self.redis_client.ping() - - self.enabled = True - log_module_event("cache", "initialized", { - "provider": self.config.get("provider", "redis"), - "ttl": self.config.get("ttl", 3600), - "max_size": self.config.get("max_size", 10000) - }) - - except Exception as e: - logger.error(f"Failed to initialize cache module: {e}") - log_module_event("cache", "initialization_failed", {"error": str(e)}) - self.enabled = False - raise - - async def cleanup(self): - """Cleanup cache resources""" - if self.redis_client: - await self.redis_client.close() - self.redis_client = None - - self.enabled = False - log_module_event("cache", "cleanup", {"success": True}) - - def _get_cache_key(self, key: str, prefix: str = "ce") -> str: - """Generate cache key with prefix""" - return f"{prefix}:{key}" - - async def get(self, key: str, default: Any = None) -> Any: - """Get value from cache""" - if not self.enabled: - return default - - try: - cache_key = self._get_cache_key(key) - value = await self.redis_client.get(cache_key) - - if value is None: - self.stats["misses"] += 1 - return default - - self.stats["hits"] += 1 - self.stats["total_requests"] += 1 - - # Try to deserialize JSON - try: - return json.loads(value) - except json.JSONDecodeError: - return value - - except Exception as e: - logger.error(f"Cache get error: {e}") - self.stats["errors"] += 1 - return default - - async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: - """Set value in cache""" - if not self.enabled: - return False - - try: - cache_key = self._get_cache_key(key) - ttl = ttl or self.config.get("ttl", 3600) - - # Serialize complex objects as JSON - if isinstance(value, (dict, list, tuple)): - value = json.dumps(value) - - await self.redis_client.setex(cache_key, ttl, value) - return True - - except Exception as e: - logger.error(f"Cache set error: {e}") - self.stats["errors"] += 1 - return False - - async def delete(self, key: str) -> bool: - """Delete key from cache""" - if not self.enabled: - return False - - try: - cache_key = self._get_cache_key(key) - result = await self.redis_client.delete(cache_key) - return result > 0 - - except Exception as e: - logger.error(f"Cache delete error: {e}") - self.stats["errors"] += 1 - return False - - async def exists(self, key: str) -> bool: - """Check if key exists in cache""" - if not self.enabled: - return False - - try: - cache_key = self._get_cache_key(key) - return await self.redis_client.exists(cache_key) > 0 - - except Exception as e: - logger.error(f"Cache exists error: {e}") - self.stats["errors"] += 1 - return False - - async def clear_pattern(self, pattern: str) -> int: - """Clear keys matching pattern""" - if not self.enabled: - return 0 - - try: - cache_pattern = self._get_cache_key(pattern) - keys = await self.redis_client.keys(cache_pattern) - if keys: - return await self.redis_client.delete(*keys) - return 0 - - except Exception as e: - logger.error(f"Cache clear pattern error: {e}") - self.stats["errors"] += 1 - return 0 - - async def clear_all(self) -> bool: - """Clear all cache entries""" - if not self.enabled: - return False - - try: - await self.redis_client.flushdb() - return True - - except Exception as e: - logger.error(f"Cache clear all error: {e}") - self.stats["errors"] += 1 - return False - - async def get_stats(self) -> Dict[str, Any]: - """Get cache statistics""" - stats = self.stats.copy() - - if self.enabled: - try: - info = await self.redis_client.info() - stats.update({ - "redis_memory_used": info.get("used_memory_human", "N/A"), - "redis_connected_clients": info.get("connected_clients", 0), - "redis_total_commands": info.get("total_commands_processed", 0), - "hit_rate": round( - (stats["hits"] / stats["total_requests"]) * 100, 2 - ) if stats["total_requests"] > 0 else 0 - }) - except Exception as e: - logger.error(f"Error getting Redis stats: {e}") - - return stats - - async def pre_request_interceptor(self, context: Dict[str, Any]) -> Dict[str, Any]: - """Pre-request interceptor for caching""" - if not self.enabled: - return context - - request = context.get("request") - if not request: - return context - - # Only cache GET requests - if request.method != "GET": - return context - - # Generate cache key from request - cache_key = f"request:{request.method}:{request.url.path}" - if request.query_params: - cache_key += f":{hash(str(request.query_params))}" - - # Check if cached response exists - cached_response = await self.get(cache_key) - if cached_response: - log_module_event("cache", "hit", {"cache_key": cache_key}) - context["cached_response"] = cached_response - context["cache_hit"] = True - else: - log_module_event("cache", "miss", {"cache_key": cache_key}) - context["cache_key"] = cache_key - context["cache_hit"] = False - - return context - - async def post_response_interceptor(self, context: Dict[str, Any]) -> Dict[str, Any]: - """Post-response interceptor for caching""" - if not self.enabled: - return context - - # Skip if this was a cache hit - if context.get("cache_hit"): - return context - - cache_key = context.get("cache_key") - response = context.get("response") - - if cache_key and response and response.status_code == 200: - # Cache successful responses - cache_data = { - "status_code": response.status_code, - "headers": dict(response.headers), - "body": response.body.decode() if hasattr(response, 'body') else None, - "timestamp": datetime.utcnow().isoformat() - } - - await self.set(cache_key, cache_data) - log_module_event("cache", "stored", {"cache_key": cache_key}) - - return context - -# Global cache instance -cache_module = CacheModule() - -# Module interface functions -async def initialize(): - """Initialize cache module""" - await cache_module.initialize() - -async def cleanup(): - """Cleanup cache module""" - await cache_module.cleanup() - -async def pre_request_interceptor(context: Dict[str, Any]) -> Dict[str, Any]: - """Pre-request interceptor""" - return await cache_module.pre_request_interceptor(context) - -async def post_response_interceptor(context: Dict[str, Any]) -> Dict[str, Any]: - """Post-response interceptor""" - return await cache_module.post_response_interceptor(context)# Force reload -# Trigger reload diff --git a/backend/modules/rag/module.yaml b/backend/modules/rag/module.yaml new file mode 100644 index 0000000..8dcba8d --- /dev/null +++ b/backend/modules/rag/module.yaml @@ -0,0 +1,109 @@ +name: rag +version: 1.0.0 +description: "Document search, retrieval, and vector storage" +author: "Enclava Team" +category: "ai" + +# Module lifecycle +enabled: true +auto_start: true +dependencies: [] +optional_dependencies: + - cache + +# Module capabilities +provides: + - "document_storage" + - "semantic_search" + - "vector_embeddings" + - "document_processing" + - "workflow_rag_step" + +consumes: + - "qdrant_connection" + - "llm_embeddings" + - "document_parsing" + +# API endpoints +endpoints: + - path: "/rag/collections" + method: "GET" + description: "List document collections" + + - path: "/rag/upload" + method: "POST" + description: "Upload and process documents" + + - path: "/rag/search" + method: "POST" + description: "Semantic search in documents" + + - path: "/rag/collections/{collection_id}/documents" + method: "GET" + description: "List documents in collection" + +# Workflow integration +workflow_steps: + - name: "rag_search" + description: "Search documents for relevant context" + inputs: + - name: "query" + type: "string" + required: true + description: "Search query" + - name: "collection_id" + type: "string" + required: true + description: "Document collection to search" + - name: "top_k" + type: "integer" + required: false + default: 5 + description: "Number of top results to return" + outputs: + - name: "results" + type: "array" + description: "Search results with content and metadata" + - name: "context" + type: "string" + description: "Combined context from top results" + +# UI Configuration +ui_config: + icon: "search" + color: "#8B5CF6" + category: "AI & ML" + + forms: + - name: "collection_config" + title: "Collection Settings" + fields: ["name", "description", "embedding_model"] + + - name: "search_config" + title: "Search Configuration" + fields: ["top_k", "similarity_threshold", "rerank_enabled"] + +# Permissions +permissions: + - name: "rag.create" + description: "Create document collections" + + - name: "rag.upload" + description: "Upload documents to collections" + + - name: "rag.search" + description: "Search document collections" + + - name: "rag.manage" + description: "Manage all collections (admin)" + +# Health checks +health_checks: + - name: "qdrant_connectivity" + description: "Check Qdrant vector database connection" + + - name: "embeddings_service" + description: "Check LLM embeddings service" + + - name: "document_processing" + description: "Check document parsing capabilities" \ No newline at end of file diff --git a/backend/modules/workflow/module.yaml b/backend/modules/workflow/module.yaml new file mode 100644 index 0000000..87ad6c3 --- /dev/null +++ b/backend/modules/workflow/module.yaml @@ -0,0 +1,145 @@ +name: workflow +version: 1.0.0 +description: "Multi-step automation processes" +author: "Enclava Team" +category: "automation" + +# Module lifecycle +enabled: true +auto_start: true +dependencies: [] +optional_dependencies: + - rag + - chatbot + - cache + +# Module capabilities +provides: + - "workflow_execution" + - "step_orchestration" + - "automation_triggers" + - "workflow_templates" + +consumes: + - "llm_completion" + - "rag_search" + - "chatbot_response" + - "external_apis" + +# API endpoints +endpoints: + - path: "/workflow/templates" + method: "GET" + description: "List available workflow templates" + + - path: "/workflow/create" + method: "POST" + description: "Create new workflow" + + - path: "/workflow/execute" + method: "POST" + description: "Execute workflow" + + - path: "/workflow/status/{workflow_id}" + method: "GET" + description: "Get workflow execution status" + + - path: "/workflow/history" + method: "GET" + description: "Get workflow execution history" + +# Workflow integration +workflow_steps: + - name: "conditional_branch" + description: "Conditional logic branching" + inputs: + - name: "condition" + type: "string" + required: true + description: "Condition to evaluate" + - name: "true_path" + type: "object" + required: true + description: "Steps to execute if condition is true" + - name: "false_path" + type: "object" + required: false + description: "Steps to execute if condition is false" + outputs: + - name: "result" + type: "object" + description: "Result from executed branch" + + - name: "loop_iteration" + description: "Iterative processing loop" + inputs: + - name: "items" + type: "array" + required: true + description: "Items to iterate over" + - name: "steps" + type: "object" + required: true + description: "Steps to execute for each item" + outputs: + - name: "results" + type: "array" + description: "Results from each iteration" + +# UI Configuration +ui_config: + icon: "workflow" + color: "#06B6D4" + category: "Automation" + + forms: + - name: "workflow_config" + title: "Workflow Settings" + fields: ["name", "description", "trigger_type"] + + - name: "step_config" + title: "Step Configuration" + fields: ["step_type", "parameters", "retry_attempts"] + + - name: "scheduling" + title: "Scheduling & Triggers" + fields: ["schedule", "webhook_triggers", "event_triggers"] + +# Permissions +permissions: + - name: "workflow.create" + description: "Create new workflows" + + - name: "workflow.execute" + description: "Execute workflows" + + - name: "workflow.configure" + description: "Configure workflow settings" + + - name: "workflow.manage" + description: "Manage all workflows (admin)" + +# Analytics events +analytics_events: + - name: "workflow_created" + description: "New workflow template created" + + - name: "workflow_executed" + description: "Workflow execution started" + + - name: "workflow_completed" + description: "Workflow execution completed" + + - name: "workflow_failed" + description: "Workflow execution failed" + +# Health checks +health_checks: + - name: "execution_engine" + description: "Check workflow execution engine" + + - name: "step_dependencies" + description: "Check availability of workflow step dependencies" + + - name: "template_validation" + description: "Validate workflow templates" \ No newline at end of file diff --git a/backend/modules/zammad/__init__.py b/backend/modules/zammad/__init__.py new file mode 100644 index 0000000..41a35f2 --- /dev/null +++ b/backend/modules/zammad/__init__.py @@ -0,0 +1,13 @@ +""" +Zammad Integration Module for Enclava Platform + +AI-powered ticket summarization for Zammad ticketing system. +Replaces Ollama with Enclava's chatbot system for enhanced security and flexibility. +""" + +from .main import ZammadModule + +__version__ = "1.0.0" +__author__ = "Enclava Platform" + +__all__ = ["ZammadModule"] \ No newline at end of file diff --git a/backend/modules/zammad/main.py b/backend/modules/zammad/main.py new file mode 100644 index 0000000..2ffd743 --- /dev/null +++ b/backend/modules/zammad/main.py @@ -0,0 +1,972 @@ +""" +Zammad Integration Module for Enclava Platform + +AI-powered ticket summarization using Enclava's chatbot system instead of Ollama. +Provides secure, configurable integration with Zammad ticketing systems. +""" + +import asyncio +import json +import logging +import uuid +from datetime import datetime, timedelta, timezone +from typing import Dict, List, Any, Optional, Tuple +from urllib.parse import urljoin + +import aiohttp +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, and_, or_ +from sqlalchemy.orm import selectinload + +from app.services.base_module import BaseModule, Permission, ModuleHealth +from app.core.config import settings +from app.db.database import async_session_factory +from app.models.user import User +from app.models.chatbot import ChatbotInstance +from app.services.litellm_client import LiteLLMClient +from cryptography.fernet import Fernet +import base64 +import os + +# Import our module-specific models +from .models import ( + ZammadTicket, + ZammadProcessingLog, + ZammadConfiguration, + TicketState, + ProcessingStatus +) + +logger = logging.getLogger(__name__) + + +class ZammadModule(BaseModule): + """Zammad Integration Module for AI-powered ticket summarization""" + + def __init__(self, config: Dict[str, Any] = None): + super().__init__("zammad", config) + self.name = "Zammad Integration" + self.description = "AI-powered ticket summarization for Zammad ticketing system" + self.version = "1.0.0" + + # Core services + self.llm_client = None + self.session_pool = None + + # Processing state + self.auto_process_task = None + self.processing_lock = asyncio.Lock() + + # Initialize encryption for API tokens + self._init_encryption() + + async def initialize(self) -> None: + """Initialize the Zammad module""" + try: + logger.info("Initializing Zammad module...") + + # Initialize LLM client for chatbot integration + self.llm_client = LiteLLMClient() + + # Create HTTP session pool for Zammad API calls + timeout = aiohttp.ClientTimeout(total=60, connect=10) + self.session_pool = aiohttp.ClientSession( + timeout=timeout, + headers={ + "Content-Type": "application/json", + "User-Agent": "Enclava-Zammad-Integration/1.0.0" + } + ) + + # Verify database tables exist (they should be created by migration) + await self._verify_database_tables() + + # Start auto-processing if enabled + await self._start_auto_processing() + + self.initialized = True + self.health.status = "healthy" + self.health.message = "Zammad module initialized successfully" + + logger.info("Zammad module initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize Zammad module: {e}") + self.health.status = "error" + self.health.message = f"Initialization failed: {str(e)}" + raise + + async def cleanup(self) -> None: + """Cleanup module resources""" + try: + logger.info("Cleaning up Zammad module...") + + # Stop auto-processing + if self.auto_process_task and not self.auto_process_task.done(): + self.auto_process_task.cancel() + try: + await self.auto_process_task + except asyncio.CancelledError: + pass + + # Close HTTP session + if self.session_pool and not self.session_pool.closed: + await self.session_pool.close() + + self.initialized = False + logger.info("Zammad module cleanup completed") + + except Exception as e: + logger.error(f"Error during Zammad module cleanup: {e}") + + def get_required_permissions(self) -> List[Permission]: + """Return list of permissions this module requires""" + return [ + Permission("zammad", "read", "Read Zammad tickets and configurations"), + Permission("zammad", "write", "Create and update Zammad ticket summaries"), + Permission("zammad", "configure", "Configure Zammad integration settings"), + Permission("chatbot", "use", "Use chatbot for AI summarization"), + ] + + async def process_request(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Process a module request""" + action = request.get("action", "unknown") + user_id = context.get("user_id") + + logger.info(f"Processing Zammad request: {action} for user {user_id}") + + # Route to appropriate handler based on action + if action == "process_tickets": + return await self._handle_process_tickets(request, context) + elif action == "get_ticket_summary": + return await self._handle_get_ticket_summary(request, context) + elif action == "process_single_ticket": + return await self._handle_process_single_ticket(request, context) + elif action == "get_status": + return await self._handle_get_status(request, context) + elif action == "get_configurations": + return await self._handle_get_configurations(request, context) + elif action == "save_configuration": + return await self._handle_save_configuration(request, context) + elif action == "test_connection": + return await self._handle_test_connection(request, context) + else: + raise ValueError(f"Unknown action: {action}") + + async def _handle_process_tickets(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Handle batch ticket processing request""" + async with self.processing_lock: + user_id = context.get("user_id") + config_id = request.get("config_id") + filters = request.get("filters", {}) + + # Get user configuration + config = await self._get_user_configuration(user_id, config_id) + if not config: + raise ValueError("Configuration not found") + + # Create processing batch + batch_id = str(uuid.uuid4()) + + # Start processing + result = await self._process_tickets_batch( + config=config, + batch_id=batch_id, + user_id=user_id, + filters=filters + ) + + return { + "batch_id": batch_id, + "status": "completed", + "result": result + } + + async def _handle_get_ticket_summary(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Handle get ticket summary request""" + ticket_id = request.get("ticket_id") + if not ticket_id: + raise ValueError("ticket_id is required") + + async with async_session_factory() as db: + # Get ticket from database + stmt = select(ZammadTicket).where(ZammadTicket.zammad_ticket_id == ticket_id) + result = await db.execute(stmt) + ticket = result.scalar_one_or_none() + + if not ticket: + return {"error": "Ticket not found", "ticket_id": ticket_id} + + return {"ticket": ticket.to_dict()} + + async def _handle_process_single_ticket(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Handle single ticket processing request""" + user_id = context.get("user_id") + ticket_id = request.get("ticket_id") + config_id = request.get("config_id") + + if not ticket_id: + raise ValueError("ticket_id is required") + + # Get user configuration + config = await self._get_user_configuration(user_id, config_id) + if not config: + raise ValueError("Configuration not found") + + # Process single ticket + result = await self._process_single_ticket(config, ticket_id, user_id) + + return {"ticket_id": ticket_id, "result": result} + + async def _handle_get_status(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Handle get module status request""" + user_id = context.get("user_id") + + async with async_session_factory() as db: + # Import func for count queries + from sqlalchemy import func + + # Get processing statistics - use func.count() to get actual counts + total_tickets_result = await db.scalar( + select(func.count(ZammadTicket.id)).where(ZammadTicket.processed_by_user_id == user_id) + ) + total_tickets = total_tickets_result or 0 + + processed_tickets_result = await db.scalar( + select(func.count(ZammadTicket.id)).where( + and_( + ZammadTicket.processed_by_user_id == user_id, + ZammadTicket.processing_status == ProcessingStatus.COMPLETED.value + ) + ) + ) + processed_tickets = processed_tickets_result or 0 + + failed_tickets_result = await db.scalar( + select(func.count(ZammadTicket.id)).where( + and_( + ZammadTicket.processed_by_user_id == user_id, + ZammadTicket.processing_status == ProcessingStatus.FAILED.value + ) + ) + ) + failed_tickets = failed_tickets_result or 0 + + # Get recent processing logs + recent_logs = await db.execute( + select(ZammadProcessingLog) + .where(ZammadProcessingLog.initiated_by_user_id == user_id) + .order_by(ZammadProcessingLog.started_at.desc()) + .limit(10) + ) + logs = [log.to_dict() for log in recent_logs.scalars()] + + return { + "module_health": self.get_health().__dict__, + "module_metrics": self.get_metrics().__dict__, + "statistics": { + "total_tickets": total_tickets, + "processed_tickets": processed_tickets, + "failed_tickets": failed_tickets, + "success_rate": (processed_tickets / max(total_tickets, 1)) * 100 if total_tickets > 0 else 0 + }, + "recent_processing_logs": logs + } + + async def _handle_get_configurations(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Handle get configurations request""" + user_id = context.get("user_id") + + async with async_session_factory() as db: + stmt = ( + select(ZammadConfiguration) + .where(ZammadConfiguration.user_id == user_id) + .where(ZammadConfiguration.is_active == True) + .order_by(ZammadConfiguration.is_default.desc(), ZammadConfiguration.created_at.desc()) + ) + result = await db.execute(stmt) + configs = [config.to_dict() for config in result.scalars()] + + return {"configurations": configs} + + async def _handle_save_configuration(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Handle save configuration request""" + user_id = context.get("user_id") + config_data = request.get("configuration", {}) + + # Validate required fields + required_fields = ["name", "zammad_url", "api_token", "chatbot_id"] + for field in required_fields: + if not config_data.get(field): + raise ValueError(f"Required field missing: {field}") + + async with async_session_factory() as db: + # Verify chatbot exists and user has access + chatbot_stmt = select(ChatbotInstance).where( + and_( + ChatbotInstance.id == config_data["chatbot_id"], + ChatbotInstance.created_by == str(user_id), + ChatbotInstance.is_active == True + ) + ) + chatbot = await db.scalar(chatbot_stmt) + if not chatbot: + raise ValueError("Chatbot not found or access denied") + + # Encrypt API token + encrypted_token = self._encrypt_data(config_data["api_token"]) + + # Create new configuration + config = ZammadConfiguration( + user_id=user_id, + name=config_data["name"], + description=config_data.get("description"), + is_default=config_data.get("is_default", False), + zammad_url=config_data["zammad_url"].rstrip("/"), + api_token_encrypted=encrypted_token, + chatbot_id=config_data["chatbot_id"], + process_state=config_data.get("process_state", "open"), + max_tickets=config_data.get("max_tickets", 10), + skip_existing=config_data.get("skip_existing", True), + auto_process=config_data.get("auto_process", False), + process_interval=config_data.get("process_interval", 30), + summary_template=config_data.get("summary_template"), + custom_settings=config_data.get("custom_settings", {}) + ) + + # If this is set as default, unset other defaults + if config.is_default: + await db.execute( + ZammadConfiguration.__table__.update() + .where(ZammadConfiguration.user_id == user_id) + .values(is_default=False) + ) + + db.add(config) + await db.commit() + await db.refresh(config) + + return {"configuration": config.to_dict()} + + async def _handle_test_connection(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + """Handle test Zammad connection request""" + zammad_url = request.get("zammad_url") + api_token = request.get("api_token") + + if not zammad_url or not api_token: + raise ValueError("zammad_url and api_token are required") + + result = await self._test_zammad_connection(zammad_url, api_token) + return result + + async def _process_tickets_batch(self, config: ZammadConfiguration, batch_id: str, user_id: int, filters: Dict[str, Any]) -> Dict[str, Any]: + """Process a batch of tickets""" + async with async_session_factory() as db: + # Create processing log + log = ZammadProcessingLog( + batch_id=batch_id, + initiated_by_user_id=user_id, + config_used=config.to_dict(), + filters_applied=filters, + status="running" + ) + db.add(log) + await db.commit() + + start_time = datetime.now(timezone.utc) # Keep as timezone-aware for calculations + + try: + # Get tickets from Zammad + tickets = await self._fetch_zammad_tickets(config, filters) + log.tickets_found = len(tickets) + + logger.info(f"Fetched {len(tickets)} tickets from Zammad for processing") + + # Process each ticket + processed = 0 + failed = 0 + skipped = 0 + + for i, ticket_data in enumerate(tickets, 1): + try: + # Validate ticket data structure + if not isinstance(ticket_data, dict): + logger.error(f"Ticket {i} is not a dictionary: {type(ticket_data)}") + failed += 1 + continue + + ticket_id = ticket_data.get('id', f'unknown-{i}') + logger.info(f"Processing ticket {i}/{len(tickets)}: ID {ticket_id}") + logger.info(f"Ticket {i} data type: {type(ticket_data)}") + logger.info(f"Ticket {i} content: {str(ticket_data)[:300]}...") + + result = await self._process_ticket_data(config, ticket_data, user_id) + + if result["status"] == "processed": + processed += 1 + elif result["status"] == "skipped": + skipped += 1 + else: + failed += 1 + + except Exception as e: + # Safely get ticket ID for error reporting + ticket_id = ticket_data.get('id', f'unknown-{i}') if isinstance(ticket_data, dict) else f'unknown-{i}' + logger.error(f"Error processing ticket {ticket_id}: {e}") + logger.debug(f"Ticket data type: {type(ticket_data)}, content: {str(ticket_data)[:200]}...") + failed += 1 + + # Update log + end_time = datetime.now(timezone.utc) + processing_time = (end_time - start_time).total_seconds() + + log.completed_at = self._to_naive_utc(end_time) + log.tickets_processed = processed + log.tickets_failed = failed + log.tickets_skipped = skipped + log.processing_time_seconds = int(processing_time) + log.average_time_per_ticket = int((processing_time / max(len(tickets), 1)) * 1000) + log.status = "completed" + + await db.commit() + + return { + "tickets_found": len(tickets), + "tickets_processed": processed, + "tickets_failed": failed, + "tickets_skipped": skipped, + "processing_time_seconds": int(processing_time) + } + + except Exception as e: + # Update log with error + log.status = "failed" + log.errors_encountered = [str(e)] + log.completed_at = self._to_naive_utc(datetime.now(timezone.utc)) + await db.commit() + raise + + async def _process_single_ticket(self, config: ZammadConfiguration, ticket_id: int, user_id: int) -> Dict[str, Any]: + """Process a single ticket""" + # Fetch ticket details from Zammad + ticket_data = await self._fetch_single_zammad_ticket(config, ticket_id) + if not ticket_data: + return {"status": "error", "message": "Ticket not found"} + + # Process the ticket + result = await self._process_ticket_data(config, ticket_data, user_id) + return result + + async def _process_ticket_data(self, config: ZammadConfiguration, ticket_data: Dict[str, Any], user_id: int) -> Dict[str, Any]: + """Process individual ticket data""" + logger.info(f"Processing ticket data: type={type(ticket_data)}, keys={list(ticket_data.keys()) if isinstance(ticket_data, dict) else 'N/A'}") + + # Ensure ticket_data is a dictionary + if not isinstance(ticket_data, dict): + raise ValueError(f"Expected ticket_data to be a dictionary, got {type(ticket_data)}") + + ticket_id = ticket_data.get("id") + if ticket_id is None: + raise ValueError("Ticket data missing 'id' field") + + # Debug nested field types that might be causing issues + logger.info(f"Ticket {ticket_id} customer field type: {type(ticket_data.get('customer'))}") + logger.info(f"Ticket {ticket_id} customer value: {ticket_data.get('customer')}") + logger.info(f"Ticket {ticket_id} group field type: {type(ticket_data.get('group'))}") + logger.info(f"Ticket {ticket_id} state field type: {type(ticket_data.get('state'))}") + logger.info(f"Ticket {ticket_id} priority field type: {type(ticket_data.get('priority'))}") + + async with async_session_factory() as db: + # Check if ticket already exists and should be skipped + existing = await db.scalar( + select(ZammadTicket).where(ZammadTicket.zammad_ticket_id == ticket_id) + ) + + if existing and config.skip_existing and existing.processing_status == ProcessingStatus.COMPLETED.value: + return {"status": "skipped", "reason": "already_processed"} + + try: + # Fetch full ticket details and conversation + logger.info(f"Ticket {ticket_id}: Fetching full ticket details with articles...") + try: + full_ticket = await self._fetch_ticket_with_articles(config, ticket_id) + if not full_ticket: + return {"status": "error", "message": "Could not fetch ticket details"} + logger.info(f"Ticket {ticket_id}: Successfully fetched full ticket details") + except Exception as e: + logger.error(f"Ticket {ticket_id}: Error fetching full ticket details: {e}") + raise + + # Generate summary using chatbot + logger.info(f"Ticket {ticket_id}: Generating AI summary...") + try: + summary = await self._generate_ticket_summary(config, full_ticket) + logger.info(f"Ticket {ticket_id}: Successfully generated AI summary") + except Exception as e: + logger.error(f"Ticket {ticket_id}: Error generating summary: {e}") + raise + + # Create/update ticket record + if existing: + ticket_record = existing + ticket_record.processing_status = ProcessingStatus.PROCESSING.value + else: + ticket_record = ZammadTicket( + zammad_ticket_id=ticket_id, + ticket_number=ticket_data.get("number"), + title=ticket_data.get("title"), + state=ticket_data.get("state"), + priority=ticket_data.get("priority"), + customer_email=self._safe_get_customer_email(ticket_data), + processing_status=ProcessingStatus.PROCESSING.value, + processed_by_user_id=user_id, + chatbot_id=config.chatbot_id + ) + db.add(ticket_record) + + # Update with summary and processing info + ticket_record.summary = summary + ticket_record.context_data = full_ticket + ticket_record.processed_at = self._to_naive_utc(datetime.now(timezone.utc)) + ticket_record.processing_status = ProcessingStatus.COMPLETED.value + ticket_record.config_snapshot = config.to_dict() + + # Safely parse Zammad timestamps and convert to naive UTC for DB + if ticket_data.get("created_at"): + parsed_dt = self._safe_parse_datetime(ticket_data["created_at"]) + ticket_record.zammad_created_at = self._to_naive_utc(parsed_dt) + if ticket_data.get("updated_at"): + parsed_dt = self._safe_parse_datetime(ticket_data["updated_at"]) + ticket_record.zammad_updated_at = self._to_naive_utc(parsed_dt) + + ticket_record.zammad_article_count = len(full_ticket.get("articles", [])) + + await db.commit() + + # Post summary to Zammad as internal note + await self._post_summary_to_zammad(config, ticket_id, summary) + + return {"status": "processed", "summary": summary} + + except Exception as e: + logger.error(f"Error processing ticket {ticket_id}: {e}") + + # Update record with error + if existing: + ticket_record = existing + else: + ticket_record = ZammadTicket( + zammad_ticket_id=ticket_id, + ticket_number=ticket_data.get("number"), + title=ticket_data.get("title"), + state=ticket_data.get("state"), + processing_status=ProcessingStatus.FAILED.value, + processed_by_user_id=user_id, + chatbot_id=config.chatbot_id + ) + db.add(ticket_record) + + ticket_record.processing_status = ProcessingStatus.FAILED.value + ticket_record.error_message = str(e) + ticket_record.processed_at = self._to_naive_utc(datetime.now(timezone.utc)) + + await db.commit() + + return {"status": "error", "message": str(e)} + + async def _generate_ticket_summary(self, config: ZammadConfiguration, ticket_data: Dict[str, Any]) -> str: + """Generate AI summary for ticket using Enclava chatbot""" + # Build context for the LLM + context = self._build_ticket_context(ticket_data) + + # Get summary template + template = config.summary_template or ( + "Generate a concise summary of this support ticket including key issues, " + "customer concerns, and any actions taken." + ) + + # Prepare messages for chatbot + messages = [ + { + "role": "system", + "content": f"{template}\n\nPlease provide a professional summary that would be helpful for support agents." + }, + { + "role": "user", + "content": context + } + ] + + # Generate summary using LLM client + response = await self.llm_client.create_chat_completion( + messages=messages, + model=await self._get_chatbot_model(config.chatbot_id), + user_id=str(config.user_id), + api_key_id=0, # Using 0 for module requests + temperature=0.3, + max_tokens=500 + ) + + # Extract content from LiteLLM response + if "choices" in response and len(response["choices"]) > 0: + return response["choices"][0]["message"]["content"].strip() + + return "Unable to generate summary." + + def _build_ticket_context(self, ticket_data: Dict[str, Any]) -> str: + """Build formatted context string for AI processing""" + context_parts = [] + + # Basic ticket information + context_parts.append(f"Ticket #{ticket_data.get('number', 'Unknown')}") + context_parts.append(f"Title: {ticket_data.get('title', 'No title')}") + context_parts.append(f"State: {ticket_data.get('state', 'Unknown')}") + + if ticket_data.get('priority'): + context_parts.append(f"Priority: {ticket_data['priority']}") + + customer_email = self._safe_get_customer_email(ticket_data) + if customer_email: + context_parts.append(f"Customer: {customer_email}") + + # Add conversation history + articles = ticket_data.get('articles', []) + if articles: + context_parts.append("\nConversation History:") + + for i, article in enumerate(articles[-10:], 1): # Last 10 articles + try: + # Safely extract article data + if not isinstance(article, dict): + logger.warning(f"Article {i} is not a dictionary: {type(article)}") + continue + + sender = article.get('from', 'Unknown') + content = article.get('body', '').strip() + + if content: + # Clean up HTML if present + if '<' in content and '>' in content: + import re + content = re.sub(r'<[^>]+>', '', content) + content = content.replace(' ', ' ') + content = content.replace('&', '&') + content = content.replace('<', '<') + content = content.replace('>', '>') + + # Truncate very long messages + if len(content) > 1000: + content = content[:1000] + "... [truncated]" + + context_parts.append(f"\n{i}. From: {sender}") + context_parts.append(f" {content}") + + except Exception as e: + logger.warning(f"Error processing article {i}: {e}") + continue + + return "\n".join(context_parts) + + async def _get_chatbot_model(self, chatbot_id: str) -> str: + """Get the model name for the specified chatbot""" + async with async_session_factory() as db: + chatbot = await db.scalar( + select(ChatbotInstance).where(ChatbotInstance.id == chatbot_id) + ) + + if not chatbot: + raise ValueError(f"Chatbot {chatbot_id} not found") + + # Default to a reasonable model if not specified + return getattr(chatbot, 'model', 'privatemode-llama-3-70b') + + async def _fetch_zammad_tickets(self, config: ZammadConfiguration, filters: Dict[str, Any]) -> List[Dict[str, Any]]: + """Fetch tickets from Zammad API""" + # Decrypt API token + api_token = self._decrypt_data(config.api_token_encrypted) + + url = urljoin(config.zammad_url, "/api/v1/tickets") + headers = { + "Authorization": f"Token token={api_token}", + "Content-Type": "application/json" + } + + # Build query parameters + params = { + "expand": "true", + "per_page": filters.get("limit", config.max_tickets) + } + + # Add state filter + state = filters.get("state", config.process_state) + if state and state != "all": + params["state"] = state + + async with self.session_pool.get(url, headers=headers, params=params) as response: + if response.status == 200: + data = await response.json() + + # Handle different Zammad API response formats + if isinstance(data, list): + # Zammad returned a list directly + tickets = data + logger.info(f"Zammad API returned list directly with {len(tickets)} tickets") + elif isinstance(data, dict): + # Zammad returned a dictionary with "tickets" key + tickets = data.get("tickets", []) + logger.info(f"Zammad API returned dict with {len(tickets)} tickets") + logger.debug(f"Zammad API response structure: keys={list(data.keys())}") + else: + logger.error(f"Unexpected Zammad API response type: {type(data)}") + raise Exception(f"Zammad API returned unexpected data type: {type(data)}") + + # Validate that tickets is actually a list + if not isinstance(tickets, list): + logger.error(f"Expected tickets to be a list, got {type(tickets)}: {str(tickets)[:200]}...") + raise Exception(f"Zammad API returned invalid ticket data structure: expected list, got {type(tickets)}") + + return tickets + else: + error_text = await response.text() + raise Exception(f"Zammad API error {response.status}: {error_text}") + + async def _fetch_single_zammad_ticket(self, config: ZammadConfiguration, ticket_id: int) -> Optional[Dict[str, Any]]: + """Fetch single ticket from Zammad API""" + api_token = self._decrypt_data(config.api_token_encrypted) + + url = urljoin(config.zammad_url, f"/api/v1/tickets/{ticket_id}") + headers = { + "Authorization": f"Token token={api_token}", + "Content-Type": "application/json" + } + + params = {"expand": "true"} + + async with self.session_pool.get(url, headers=headers, params=params) as response: + if response.status == 200: + return await response.json() + elif response.status == 404: + return None + else: + error_text = await response.text() + raise Exception(f"Zammad API error {response.status}: {error_text}") + + async def _fetch_ticket_with_articles(self, config: ZammadConfiguration, ticket_id: int) -> Optional[Dict[str, Any]]: + """Fetch ticket with full conversation articles""" + # Get basic ticket info + ticket = await self._fetch_single_zammad_ticket(config, ticket_id) + if not ticket: + return None + + # Fetch articles + api_token = self._decrypt_data(config.api_token_encrypted) + articles_url = urljoin(config.zammad_url, f"/api/v1/ticket_articles/by_ticket/{ticket_id}") + headers = { + "Authorization": f"Token token={api_token}", + "Content-Type": "application/json" + } + + async with self.session_pool.get(articles_url, headers=headers) as response: + if response.status == 200: + articles_data = await response.json() + + # Handle different Zammad articles API response formats + if isinstance(articles_data, list): + # Articles returned as list directly + articles = articles_data + logger.info(f"Articles API returned list directly with {len(articles)} articles for ticket {ticket_id}") + elif isinstance(articles_data, dict): + # Articles returned as dictionary with "articles" key + articles = articles_data.get("articles", []) + logger.info(f"Articles API returned dict with {len(articles)} articles for ticket {ticket_id}") + else: + logger.error(f"Unexpected articles API response type for ticket {ticket_id}: {type(articles_data)}") + articles = [] + + ticket["articles"] = articles + else: + logger.warning(f"Could not fetch articles for ticket {ticket_id}: {response.status}") + ticket["articles"] = [] + + return ticket + + async def _post_summary_to_zammad(self, config: ZammadConfiguration, ticket_id: int, summary: str) -> bool: + """Post AI summary as internal note to Zammad ticket""" + try: + api_token = self._decrypt_data(config.api_token_encrypted) + + url = urljoin(config.zammad_url, "/api/v1/ticket_articles") + headers = { + "Authorization": f"Token token={api_token}", + "Content-Type": "application/json" + } + + # Create internal note payload + article_data = { + "ticket_id": ticket_id, + "type": "note", + "internal": True, # This ensures only agents can see it + "subject": "AI Summary - Enclava", + "body": f"**AI-Generated Summary**\n\n{summary}\n\n---\n*Generated by Enclava AI at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC*" + } + + async with self.session_pool.post(url, headers=headers, json=article_data) as response: + if response.status in (200, 201): + logger.info(f"Successfully posted AI summary to ticket {ticket_id}") + return True + else: + error_text = await response.text() + logger.error(f"Failed to post summary to ticket {ticket_id}: {response.status} - {error_text}") + return False + + except Exception as e: + logger.error(f"Error posting summary to Zammad ticket {ticket_id}: {e}") + return False + + async def _test_zammad_connection(self, zammad_url: str, api_token: str) -> Dict[str, Any]: + """Test connection to Zammad instance""" + try: + url = urljoin(zammad_url.rstrip("/"), "/api/v1/users/me") + headers = { + "Authorization": f"Token token={api_token}", + "Content-Type": "application/json" + } + + async with self.session_pool.get(url, headers=headers) as response: + if response.status == 200: + user_data = await response.json() + return { + "status": "success", + "message": "Connection successful", + "user": user_data.get("email", "Unknown"), + "zammad_version": response.headers.get("X-Zammad-Version", "Unknown") + } + else: + error_text = await response.text() + return { + "status": "error", + "message": f"Connection failed: HTTP {response.status}", + "details": error_text + } + + except Exception as e: + return { + "status": "error", + "message": f"Connection error: {str(e)}" + } + + async def _get_user_configuration(self, user_id: int, config_id: Optional[int] = None) -> Optional[ZammadConfiguration]: + """Get user configuration by ID or default""" + async with async_session_factory() as db: + if config_id: + stmt = select(ZammadConfiguration).where( + and_( + ZammadConfiguration.id == config_id, + ZammadConfiguration.user_id == user_id, + ZammadConfiguration.is_active == True + ) + ) + else: + # Get default configuration + stmt = select(ZammadConfiguration).where( + and_( + ZammadConfiguration.user_id == user_id, + ZammadConfiguration.is_active == True, + ZammadConfiguration.is_default == True + ) + ).order_by(ZammadConfiguration.created_at.desc()) + + result = await db.execute(stmt) + return result.scalar_one_or_none() + + async def _verify_database_tables(self): + """Verify that required database tables exist""" + # This would be handled by Alembic migrations in production + # For now, just log that we expect the tables to exist + logger.info("Verifying database tables for Zammad module") + + async def _start_auto_processing(self): + """Start auto-processing task if any configurations have it enabled""" + # This would start a background task to periodically check for auto-process configs + # and process new tickets automatically + logger.info("Auto-processing monitoring not implemented yet") + pass + + def _init_encryption(self): + """Initialize encryption for API tokens""" + # Use a fixed key for demo - in production, this should be from environment + key = os.environ.get('ZAMMAD_ENCRYPTION_KEY', 'demo-key-for-zammad-tokens-12345678901234567890123456789012') + # Ensure key is exactly 32 bytes for Fernet + key = key.encode()[:32].ljust(32, b'0') + self.encryption_key = base64.urlsafe_b64encode(key) + self.cipher = Fernet(self.encryption_key) + + def _encrypt_data(self, data: str) -> str: + """Encrypt sensitive data""" + return self.cipher.encrypt(data.encode()).decode() + + def _decrypt_data(self, encrypted_data: str) -> str: + """Decrypt sensitive data""" + return self.cipher.decrypt(encrypted_data.encode()).decode() + + def _safe_get_customer_email(self, ticket_data: Dict[str, Any]) -> Optional[str]: + """Safely extract customer email from ticket data""" + try: + customer = ticket_data.get('customer') + if not customer: + return None + + # Handle case where customer is a dictionary + if isinstance(customer, dict): + return customer.get('email') + + # Handle case where customer is a list (sometimes Zammad returns a list) + elif isinstance(customer, list) and len(customer) > 0: + first_customer = customer[0] + if isinstance(first_customer, dict): + return first_customer.get('email') + + # Handle case where customer is just the email string + elif isinstance(customer, str) and '@' in customer: + return customer + + return None + + except Exception as e: + logger.warning(f"Could not extract customer email from ticket data: {e}") + return None + + def _safe_parse_datetime(self, datetime_str: str) -> Optional[datetime]: + """Safely parse datetime string from Zammad API to timezone-aware datetime""" + if not datetime_str: + return None + + try: + # Handle different Zammad datetime formats + if datetime_str.endswith('Z'): + # ISO format with Z suffix: "2025-08-20T12:07:28.857000Z" + return datetime.fromisoformat(datetime_str.replace("Z", "+00:00")) + elif '+' in datetime_str or '-' in datetime_str[-6:]: + # Already has timezone info: "2025-08-20T12:07:28.857000+00:00" + return datetime.fromisoformat(datetime_str) + else: + # No timezone info - assume UTC: "2025-08-20T12:07:28.857000" + dt = datetime.fromisoformat(datetime_str) + if dt.tzinfo is None: + # Make it timezone-aware as UTC + dt = dt.replace(tzinfo=timezone.utc) + return dt + + except Exception as e: + logger.warning(f"Could not parse datetime '{datetime_str}': {e}") + return None + + def _to_naive_utc(self, dt: datetime) -> datetime: + """Convert timezone-aware datetime to naive UTC for database storage""" + if dt is None: + return None + if dt.tzinfo is None: + # Already naive, assume it's UTC + return dt + # Convert to UTC and make naive + return dt.astimezone(timezone.utc).replace(tzinfo=None) \ No newline at end of file diff --git a/backend/modules/zammad/models.py b/backend/modules/zammad/models.py new file mode 100644 index 0000000..52f68b7 --- /dev/null +++ b/backend/modules/zammad/models.py @@ -0,0 +1,241 @@ +""" +Database models for Zammad Integration Module +""" + +from datetime import datetime +from typing import Optional, Dict, Any +from enum import Enum +from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, JSON, ForeignKey, Index +from sqlalchemy.orm import relationship +from app.db.database import Base + + +class TicketState(str, Enum): + """Zammad ticket state enumeration""" + NEW = "new" + OPEN = "open" + PENDING_REMINDER = "pending reminder" + PENDING_CLOSE = "pending close" + CLOSED = "closed" + MERGED = "merged" + REMOVED = "removed" + + +class ProcessingStatus(str, Enum): + """Ticket processing status""" + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + SKIPPED = "skipped" + + +class ZammadTicket(Base): + """Model for tracking Zammad tickets and their processing status""" + + __tablename__ = "zammad_tickets" + + # Primary key + id = Column(Integer, primary_key=True, index=True) + + # Zammad ticket information + zammad_ticket_id = Column(Integer, unique=True, index=True, nullable=False) + ticket_number = Column(String, index=True, nullable=False) + title = Column(String, nullable=False) + state = Column(String, nullable=False) # Zammad state + priority = Column(String, nullable=True) + customer_email = Column(String, nullable=True) + + # Processing information + processing_status = Column(String, default=ProcessingStatus.PENDING.value, nullable=False) + processed_at = Column(DateTime, nullable=True) + processed_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True) + chatbot_id = Column(String, nullable=True) + + # Summary and context + summary = Column(Text, nullable=True) + context_data = Column(JSON, nullable=True) # Original ticket data + error_message = Column(Text, nullable=True) + + # Metadata + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + # Zammad specific metadata + zammad_created_at = Column(DateTime, nullable=True) + zammad_updated_at = Column(DateTime, nullable=True) + zammad_article_count = Column(Integer, default=0, nullable=False) + + # Processing configuration snapshot + config_snapshot = Column(JSON, nullable=True) # Config used during processing + + # Relationships + processed_by = relationship("User", foreign_keys=[processed_by_user_id]) + + # Indexes for better query performance + __table_args__ = ( + Index("idx_zammad_tickets_status_created", "processing_status", "created_at"), + Index("idx_zammad_tickets_state_processed", "state", "processed_at"), + Index("idx_zammad_tickets_user_status", "processed_by_user_id", "processing_status"), + ) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for API responses""" + return { + "id": self.id, + "zammad_ticket_id": self.zammad_ticket_id, + "ticket_number": self.ticket_number, + "title": self.title, + "state": self.state, + "priority": self.priority, + "customer_email": self.customer_email, + "processing_status": self.processing_status, + "processed_at": self.processed_at.isoformat() if self.processed_at else None, + "processed_by_user_id": self.processed_by_user_id, + "chatbot_id": self.chatbot_id, + "summary": self.summary, + "error_message": self.error_message, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + "zammad_created_at": self.zammad_created_at.isoformat() if self.zammad_created_at else None, + "zammad_updated_at": self.zammad_updated_at.isoformat() if self.zammad_updated_at else None, + "zammad_article_count": self.zammad_article_count + } + + +class ZammadProcessingLog(Base): + """Model for logging Zammad processing activities""" + + __tablename__ = "zammad_processing_logs" + + # Primary key + id = Column(Integer, primary_key=True, index=True) + + # Processing batch information + batch_id = Column(String, index=True, nullable=False) # UUID for batch processing + started_at = Column(DateTime, default=datetime.utcnow, nullable=False) + completed_at = Column(DateTime, nullable=True) + initiated_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True) + + # Processing configuration + config_used = Column(JSON, nullable=True) + filters_applied = Column(JSON, nullable=True) # State, limit, etc. + + # Results + tickets_found = Column(Integer, default=0, nullable=False) + tickets_processed = Column(Integer, default=0, nullable=False) + tickets_failed = Column(Integer, default=0, nullable=False) + tickets_skipped = Column(Integer, default=0, nullable=False) + + # Performance metrics + processing_time_seconds = Column(Integer, nullable=True) + average_time_per_ticket = Column(Integer, nullable=True) # milliseconds + + # Error tracking + errors_encountered = Column(JSON, nullable=True) # List of error messages + status = Column(String, default="running", nullable=False) # running, completed, failed + + # Relationships + initiated_by = relationship("User", foreign_keys=[initiated_by_user_id]) + + # Indexes + __table_args__ = ( + Index("idx_processing_logs_batch_status", "batch_id", "status"), + Index("idx_processing_logs_user_started", "initiated_by_user_id", "started_at"), + ) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for API responses""" + return { + "id": self.id, + "batch_id": self.batch_id, + "started_at": self.started_at.isoformat(), + "completed_at": self.completed_at.isoformat() if self.completed_at else None, + "initiated_by_user_id": self.initiated_by_user_id, + "config_used": self.config_used, + "filters_applied": self.filters_applied, + "tickets_found": self.tickets_found, + "tickets_processed": self.tickets_processed, + "tickets_failed": self.tickets_failed, + "tickets_skipped": self.tickets_skipped, + "processing_time_seconds": self.processing_time_seconds, + "average_time_per_ticket": self.average_time_per_ticket, + "errors_encountered": self.errors_encountered, + "status": self.status + } + + +class ZammadConfiguration(Base): + """Model for storing Zammad module configurations per user""" + + __tablename__ = "zammad_configurations" + + # Primary key + id = Column(Integer, primary_key=True, index=True) + + # User association + user_id = Column(Integer, ForeignKey("users.id"), nullable=False) + + # Configuration name and description + name = Column(String, nullable=False) + description = Column(Text, nullable=True) + is_default = Column(Boolean, default=False, nullable=False) + is_active = Column(Boolean, default=True, nullable=False) + + # Zammad connection settings + zammad_url = Column(String, nullable=False) + api_token_encrypted = Column(String, nullable=False) # Encrypted API token + + # Processing settings + chatbot_id = Column(String, nullable=False) + process_state = Column(String, default="open", nullable=False) + max_tickets = Column(Integer, default=10, nullable=False) + skip_existing = Column(Boolean, default=True, nullable=False) + auto_process = Column(Boolean, default=False, nullable=False) + process_interval = Column(Integer, default=30, nullable=False) # minutes + + # Customization + summary_template = Column(Text, nullable=True) + custom_settings = Column(JSON, nullable=True) # Additional custom settings + + # Metadata + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + last_used_at = Column(DateTime, nullable=True) + + # Relationships + user = relationship("User", foreign_keys=[user_id]) + + # Indexes + __table_args__ = ( + Index("idx_zammad_config_user_active", "user_id", "is_active"), + Index("idx_zammad_config_user_default", "user_id", "is_default"), + ) + + def to_dict(self, include_sensitive: bool = False) -> Dict[str, Any]: + """Convert to dictionary for API responses""" + result = { + "id": self.id, + "user_id": self.user_id, + "name": self.name, + "description": self.description, + "is_default": self.is_default, + "is_active": self.is_active, + "zammad_url": self.zammad_url, + "chatbot_id": self.chatbot_id, + "process_state": self.process_state, + "max_tickets": self.max_tickets, + "skip_existing": self.skip_existing, + "auto_process": self.auto_process, + "process_interval": self.process_interval, + "summary_template": self.summary_template, + "custom_settings": self.custom_settings, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + "last_used_at": self.last_used_at.isoformat() if self.last_used_at else None + } + + if include_sensitive: + result["api_token_encrypted"] = self.api_token_encrypted + + return result \ No newline at end of file diff --git a/backend/modules/zammad/module.yaml b/backend/modules/zammad/module.yaml new file mode 100644 index 0000000..65b8ddc --- /dev/null +++ b/backend/modules/zammad/module.yaml @@ -0,0 +1,72 @@ +name: zammad +version: 1.0.0 +description: "AI-powered ticket summarization for Zammad ticketing system" +author: "Enclava Team" +category: "integration" + +# Module lifecycle +enabled: true +auto_start: true +dependencies: + - chatbot +optional_dependencies: [] + +# Module capabilities +provides: + - "ticket_summarization" + - "zammad_integration" + - "batch_processing" + +consumes: + - "chatbot_completion" + - "llm_completion" + +# API endpoints +endpoints: + - path: "/zammad/configurations" + method: "GET" + description: "List Zammad configurations" + + - path: "/zammad/configurations" + method: "POST" + description: "Create new Zammad configuration" + + - path: "/zammad/test-connection" + method: "POST" + description: "Test Zammad connection" + + - path: "/zammad/process" + method: "POST" + description: "Process tickets for summarization" + + - path: "/zammad/status" + method: "GET" + description: "Get processing status and statistics" + +# UI Configuration +ui_config: + icon: "ticket" + color: "#3B82F6" + category: "Integration" + +# Permissions +permissions: + - name: "zammad.read" + description: "Read Zammad tickets and configurations" + + - name: "zammad.write" + description: "Create and update Zammad ticket summaries" + + - name: "zammad.configure" + description: "Configure Zammad integration settings" + + - name: "chatbot.use" + description: "Use chatbot for AI summarization" + +# Health checks +health_checks: + - name: "zammad_connectivity" + description: "Check Zammad API connection" + + - name: "chatbot_availability" + description: "Check chatbot module availability" \ No newline at end of file diff --git a/backend/requirements.txt b/backend/requirements.txt index 7adac6a..b227940 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -48,15 +48,18 @@ qdrant-client==1.7.0 # Text Processing tiktoken==0.5.1 -# NLP and Content Processing (required for RAG module with integrated content processing) -nltk==3.8.1 -spacy==3.7.2 +# Basic document processing (lightweight) markitdown==0.0.1a2 python-docx==1.1.0 -sentence-transformers==2.6.1 -# Optional heavy ML dependencies (commented out for lighter deployments) -# transformers==4.35.2 +# Advanced NLP processing (OPTIONAL - only for entity extraction) +# Install requirements-nlp.txt separately if you need these features: +# nltk==3.8.1 +# spacy==3.7.2 + +# Heavy ML dependencies (REMOVED - unused in codebase) +# sentence-transformers==2.6.1 # REMOVED - not used anywhere in codebase +# transformers==4.35.2 # REMOVED - already commented out # Configuration pyyaml==6.0.1 diff --git a/backend/tests/integration_test.py b/backend/tests/integration_test.py index fef7d25..be224e8 100644 --- a/backend/tests/integration_test.py +++ b/backend/tests/integration_test.py @@ -17,7 +17,7 @@ class LiveModuleIntegrationTest: self.client = httpx.AsyncClient(timeout=30.0) async def test_all_modules_loaded(self): - """Test that all 7 modules are loaded and operational""" + """Test that all 5 modules are loaded and operational""" print("๐Ÿงช Testing module loading...") response = await self.client.get(f"{self.base_url}/api/v1/modules/") @@ -27,12 +27,12 @@ class LiveModuleIntegrationTest: print(f"โœ“ API Response: {response.status_code}") print(f"โœ“ Total modules: {data['total']}") - # Verify we have all 7 modules - assert data["total"] >= 7, f"Expected at least 7 modules, got {data['total']}" - assert data["module_count"] >= 7 + # Verify we have all 5 modules (updated after 2025-08-10 cleanup) + assert data["total"] >= 5, f"Expected at least 5 modules, got {data['total']}" + assert data["module_count"] >= 5 assert data["initialized"] is True - expected_modules = ['cache', 'analytics', 'rag', 'content', 'security', 'monitoring', 'config'] + expected_modules = ['cache', 'chatbot', 'rag', 'signal', 'workflow'] loaded_modules = [mod["name"] for mod in data["modules"]] for expected in expected_modules: diff --git a/docker-compose.yml b/docker-compose.yml index a10642e..7c96601 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,22 @@ name: enclava services: + # Database migration service - runs once to apply migrations + enclava-migrate: + build: + context: ./backend + dockerfile: Dockerfile + environment: + - DATABASE_URL=postgresql://enclava_user:enclava_pass@enclava-postgres:5432/enclava_db + depends_on: + - enclava-postgres + command: ["/usr/local/bin/migrate.sh"] + volumes: + - ./backend:/app + networks: + - enclava-net + restart: "no" # Run once and exit + # Main application backend enclava-backend: build: @@ -18,6 +34,7 @@ services: - ADMIN_PASSWORD=${ADMIN_PASSWORD:-admin123} - LOG_LLM_PROMPTS=${LOG_LLM_PROMPTS:-false} depends_on: + - enclava-migrate - enclava-postgres - enclava-redis - enclava-qdrant diff --git a/frontend/src/app/api/v1/zammad/chatbots/route.ts b/frontend/src/app/api/v1/zammad/chatbots/route.ts new file mode 100644 index 0000000..c4ede42 --- /dev/null +++ b/frontend/src/app/api/v1/zammad/chatbots/route.ts @@ -0,0 +1,41 @@ +import { NextRequest, NextResponse } from 'next/server' + +export async function GET(request: NextRequest) { + try { + // Extract authorization header from the incoming request + const authHeader = request.headers.get('authorization') + + if (!authHeader) { + return NextResponse.json( + { error: 'Authorization header required' }, + { status: 401 } + ) + } + + // Make request to backend Zammad chatbots endpoint + const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL + const url = `${baseUrl}/api/v1/zammad/chatbots` + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': authHeader, + 'Content-Type': 'application/json' + } + }) + + const data = await response.json() + + if (!response.ok) { + return NextResponse.json(data, { status: response.status }) + } + + return NextResponse.json(data) + } catch (error) { + console.error('Error fetching Zammad chatbots:', error) + return NextResponse.json( + { error: 'Failed to fetch Zammad chatbots' }, + { status: 500 } + ) + } +} \ No newline at end of file diff --git a/frontend/src/app/api/v1/zammad/configurations/[id]/route.ts b/frontend/src/app/api/v1/zammad/configurations/[id]/route.ts new file mode 100644 index 0000000..4b79928 --- /dev/null +++ b/frontend/src/app/api/v1/zammad/configurations/[id]/route.ts @@ -0,0 +1,87 @@ +import { NextRequest, NextResponse } from 'next/server' + +export async function PUT(request: NextRequest, { params }: { params: { id: string } }) { + try { + // Extract authorization header from the incoming request + const authHeader = request.headers.get('authorization') + + if (!authHeader) { + return NextResponse.json( + { error: 'Authorization header required' }, + { status: 401 } + ) + } + + const body = await request.json() + const configId = params.id + + // Make request to backend Zammad configurations endpoint + const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL + const url = `${baseUrl}/api/v1/zammad/configurations/${configId}` + + const response = await fetch(url, { + method: 'PUT', + headers: { + 'Authorization': authHeader, + 'Content-Type': 'application/json' + }, + body: JSON.stringify(body) + }) + + const data = await response.json() + + if (!response.ok) { + return NextResponse.json(data, { status: response.status }) + } + + return NextResponse.json(data) + } catch (error) { + console.error('Error updating Zammad configuration:', error) + return NextResponse.json( + { error: 'Failed to update Zammad configuration' }, + { status: 500 } + ) + } +} + +export async function DELETE(request: NextRequest, { params }: { params: { id: string } }) { + try { + // Extract authorization header from the incoming request + const authHeader = request.headers.get('authorization') + + if (!authHeader) { + return NextResponse.json( + { error: 'Authorization header required' }, + { status: 401 } + ) + } + + const configId = params.id + + // Make request to backend Zammad configurations endpoint + const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL + const url = `${baseUrl}/api/v1/zammad/configurations/${configId}` + + const response = await fetch(url, { + method: 'DELETE', + headers: { + 'Authorization': authHeader, + 'Content-Type': 'application/json' + } + }) + + const data = await response.json() + + if (!response.ok) { + return NextResponse.json(data, { status: response.status }) + } + + return NextResponse.json(data) + } catch (error) { + console.error('Error deleting Zammad configuration:', error) + return NextResponse.json( + { error: 'Failed to delete Zammad configuration' }, + { status: 500 } + ) + } +} \ No newline at end of file diff --git a/frontend/src/app/api/v1/zammad/configurations/route.ts b/frontend/src/app/api/v1/zammad/configurations/route.ts new file mode 100644 index 0000000..d7caab3 --- /dev/null +++ b/frontend/src/app/api/v1/zammad/configurations/route.ts @@ -0,0 +1,84 @@ +import { NextRequest, NextResponse } from 'next/server' + +export async function GET(request: NextRequest) { + try { + // Extract authorization header from the incoming request + const authHeader = request.headers.get('authorization') + + if (!authHeader) { + return NextResponse.json( + { error: 'Authorization header required' }, + { status: 401 } + ) + } + + // Make request to backend Zammad configurations endpoint + const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL + const url = `${baseUrl}/api/v1/zammad/configurations` + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': authHeader, + 'Content-Type': 'application/json' + } + }) + + const data = await response.json() + + if (!response.ok) { + return NextResponse.json(data, { status: response.status }) + } + + return NextResponse.json(data) + } catch (error) { + console.error('Error fetching Zammad configurations:', error) + return NextResponse.json( + { error: 'Failed to fetch Zammad configurations' }, + { status: 500 } + ) + } +} + +export async function POST(request: NextRequest) { + try { + // Extract authorization header from the incoming request + const authHeader = request.headers.get('authorization') + + if (!authHeader) { + return NextResponse.json( + { error: 'Authorization header required' }, + { status: 401 } + ) + } + + const body = await request.json() + + // Make request to backend Zammad configurations endpoint + const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL + const url = `${baseUrl}/api/v1/zammad/configurations` + + const response = await fetch(url, { + method: 'POST', + headers: { + 'Authorization': authHeader, + 'Content-Type': 'application/json' + }, + body: JSON.stringify(body) + }) + + const data = await response.json() + + if (!response.ok) { + return NextResponse.json(data, { status: response.status }) + } + + return NextResponse.json(data) + } catch (error) { + console.error('Error creating Zammad configuration:', error) + return NextResponse.json( + { error: 'Failed to create Zammad configuration' }, + { status: 500 } + ) + } +} \ No newline at end of file diff --git a/frontend/src/app/api/v1/zammad/process/route.ts b/frontend/src/app/api/v1/zammad/process/route.ts new file mode 100644 index 0000000..460403f --- /dev/null +++ b/frontend/src/app/api/v1/zammad/process/route.ts @@ -0,0 +1,44 @@ +import { NextRequest, NextResponse } from 'next/server' + +export async function POST(request: NextRequest) { + try { + // Extract authorization header from the incoming request + const authHeader = request.headers.get('authorization') + + if (!authHeader) { + return NextResponse.json( + { error: 'Authorization header required' }, + { status: 401 } + ) + } + + const body = await request.json() + + // Make request to backend Zammad process endpoint + const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL + const url = `${baseUrl}/api/v1/zammad/process` + + const response = await fetch(url, { + method: 'POST', + headers: { + 'Authorization': authHeader, + 'Content-Type': 'application/json' + }, + body: JSON.stringify(body) + }) + + const data = await response.json() + + if (!response.ok) { + return NextResponse.json(data, { status: response.status }) + } + + return NextResponse.json(data) + } catch (error) { + console.error('Error processing Zammad tickets:', error) + return NextResponse.json( + { error: 'Failed to process Zammad tickets' }, + { status: 500 } + ) + } +} \ No newline at end of file diff --git a/frontend/src/app/api/v1/zammad/processing-logs/route.ts b/frontend/src/app/api/v1/zammad/processing-logs/route.ts new file mode 100644 index 0000000..d5aa2d0 --- /dev/null +++ b/frontend/src/app/api/v1/zammad/processing-logs/route.ts @@ -0,0 +1,51 @@ +import { NextRequest, NextResponse } from 'next/server' + +export async function GET(request: NextRequest) { + try { + // Extract authorization header from the incoming request + const authHeader = request.headers.get('authorization') + + if (!authHeader) { + return NextResponse.json( + { error: 'Authorization header required' }, + { status: 401 } + ) + } + + // Get query parameters + const { searchParams } = new URL(request.url) + const limit = searchParams.get('limit') + const offset = searchParams.get('offset') + + // Build query string + const queryParams = new URLSearchParams() + if (limit) queryParams.set('limit', limit) + if (offset) queryParams.set('offset', offset) + + // Make request to backend Zammad processing-logs endpoint + const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL + const url = `${baseUrl}/api/v1/zammad/processing-logs?${queryParams.toString()}` + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': authHeader, + 'Content-Type': 'application/json' + } + }) + + const data = await response.json() + + if (!response.ok) { + return NextResponse.json(data, { status: response.status }) + } + + return NextResponse.json(data) + } catch (error) { + console.error('Error fetching Zammad processing logs:', error) + return NextResponse.json( + { error: 'Failed to fetch Zammad processing logs' }, + { status: 500 } + ) + } +} \ No newline at end of file diff --git a/frontend/src/app/api/v1/zammad/status/route.ts b/frontend/src/app/api/v1/zammad/status/route.ts new file mode 100644 index 0000000..318aa5f --- /dev/null +++ b/frontend/src/app/api/v1/zammad/status/route.ts @@ -0,0 +1,41 @@ +import { NextRequest, NextResponse } from 'next/server' + +export async function GET(request: NextRequest) { + try { + // Extract authorization header from the incoming request + const authHeader = request.headers.get('authorization') + + if (!authHeader) { + return NextResponse.json( + { error: 'Authorization header required' }, + { status: 401 } + ) + } + + // Make request to backend Zammad status endpoint + const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL + const url = `${baseUrl}/api/v1/zammad/status` + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': authHeader, + 'Content-Type': 'application/json' + } + }) + + const data = await response.json() + + if (!response.ok) { + return NextResponse.json(data, { status: response.status }) + } + + return NextResponse.json(data) + } catch (error) { + console.error('Error fetching Zammad status:', error) + return NextResponse.json( + { error: 'Failed to fetch Zammad status' }, + { status: 500 } + ) + } +} \ No newline at end of file diff --git a/frontend/src/app/api/v1/zammad/test-connection/route.ts b/frontend/src/app/api/v1/zammad/test-connection/route.ts new file mode 100644 index 0000000..7990c12 --- /dev/null +++ b/frontend/src/app/api/v1/zammad/test-connection/route.ts @@ -0,0 +1,44 @@ +import { NextRequest, NextResponse } from 'next/server' + +export async function POST(request: NextRequest) { + try { + // Extract authorization header from the incoming request + const authHeader = request.headers.get('authorization') + + if (!authHeader) { + return NextResponse.json( + { error: 'Authorization header required' }, + { status: 401 } + ) + } + + const body = await request.json() + + // Make request to backend Zammad test-connection endpoint + const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL + const url = `${baseUrl}/api/v1/zammad/test-connection` + + const response = await fetch(url, { + method: 'POST', + headers: { + 'Authorization': authHeader, + 'Content-Type': 'application/json' + }, + body: JSON.stringify(body) + }) + + const data = await response.json() + + if (!response.ok) { + return NextResponse.json(data, { status: response.status }) + } + + return NextResponse.json(data) + } catch (error) { + console.error('Error testing Zammad connection:', error) + return NextResponse.json( + { error: 'Failed to test Zammad connection' }, + { status: 500 } + ) + } +} \ No newline at end of file diff --git a/frontend/src/app/modules/page.tsx b/frontend/src/app/modules/page.tsx index 9cfc15c..52cc030 100644 --- a/frontend/src/app/modules/page.tsx +++ b/frontend/src/app/modules/page.tsx @@ -317,6 +317,16 @@ function ModulesPageContent() { Configure )} + {module.name === 'zammad' && ( + + )} {module.health && ( diff --git a/frontend/src/app/zammad/page.tsx b/frontend/src/app/zammad/page.tsx new file mode 100644 index 0000000..13e4168 --- /dev/null +++ b/frontend/src/app/zammad/page.tsx @@ -0,0 +1,14 @@ +"use client" + +import { ProtectedRoute } from "@/components/auth/ProtectedRoute" +import { ZammadConfig } from "@/components/modules/ZammadConfig" + +export default function ZammadPage() { + return ( + +
+ +
+
+ ) +} \ No newline at end of file diff --git a/frontend/src/components/modules/ZammadConfig.tsx b/frontend/src/components/modules/ZammadConfig.tsx new file mode 100644 index 0000000..0581a09 --- /dev/null +++ b/frontend/src/components/modules/ZammadConfig.tsx @@ -0,0 +1,795 @@ +"use client" + +import { useState, useEffect } from "react" +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card" +import { Button } from "@/components/ui/button" +import { Input } from "@/components/ui/input" +import { Label } from "@/components/ui/label" +import { Switch } from "@/components/ui/switch" +import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select" +import { Textarea } from "@/components/ui/textarea" +import { Badge } from "@/components/ui/badge" +import { Alert, AlertDescription } from "@/components/ui/alert" +import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs" +import { Dialog, DialogContent, DialogDescription, DialogFooter, DialogHeader, DialogTitle, DialogTrigger } from "@/components/ui/dialog" +import { useToast } from "@/hooks/use-toast" +import { + Settings, + Save, + RefreshCw, + Ticket, + Bot, + Plus, + Edit, + Trash2, + TestTube, + Play, + History, + CheckCircle, + XCircle, + AlertTriangle, + Clock +} from "lucide-react" + +interface ZammadConfiguration { + id?: number + name: string + description?: string + is_default: boolean + zammad_url: string + api_token: string + chatbot_id: string + process_state: string + max_tickets: number + skip_existing: boolean + auto_process: boolean + process_interval: number + summary_template?: string + custom_settings?: Record + created_at?: string + updated_at?: string + last_used_at?: string +} + +interface Chatbot { + id: string + name: string + chatbot_type: string + model: string + description?: string +} + +interface ProcessingLog { + id: number + batch_id: string + started_at: string + completed_at?: string + tickets_found: number + tickets_processed: number + tickets_failed: number + tickets_skipped: number + processing_time_seconds?: number + status: string +} + +interface ModuleStatus { + module_health: { + status: string + message: string + uptime: number + } + statistics: { + total_tickets: number + processed_tickets: number + failed_tickets: number + success_rate: number + } +} + +export function ZammadConfig() { + const { toast } = useToast() + const [configurations, setConfigurations] = useState([]) + const [chatbots, setChatbots] = useState([]) + const [processingLogs, setProcessingLogs] = useState([]) + const [moduleStatus, setModuleStatus] = useState(null) + const [loading, setLoading] = useState(true) + const [saving, setSaving] = useState(false) + const [testingConnection, setTestingConnection] = useState(false) + const [processing, setProcessing] = useState(false) + + // Form states + const [isDialogOpen, setIsDialogOpen] = useState(false) + const [editingConfig, setEditingConfig] = useState(null) + const [newConfig, setNewConfig] = useState>({ + name: "", + description: "", + is_default: false, + zammad_url: "http://localhost:8080", + api_token: "", + chatbot_id: "", + process_state: "open", + max_tickets: 10, + skip_existing: true, + auto_process: false, + process_interval: 30, + summary_template: "Generate a concise summary of this support ticket including key issues, customer concerns, and any actions taken." + }) + + useEffect(() => { + fetchData() + }, []) + + const fetchData = async () => { + try { + setLoading(true) + await Promise.all([ + fetchConfigurations(), + fetchChatbots(), + fetchProcessingLogs(), + fetchModuleStatus() + ]) + } catch (error) { + console.error("Error fetching Zammad data:", error) + toast({ + title: "Error", + description: "Failed to load Zammad configuration", + variant: "destructive" + }) + } finally { + setLoading(false) + } + } + + const fetchConfigurations = async () => { + const response = await fetch("/api/v1/zammad/configurations", { + headers: { + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + } + }) + if (response.ok) { + const data = await response.json() + setConfigurations(data.configurations || []) + } + } + + const fetchChatbots = async () => { + const response = await fetch("/api/v1/zammad/chatbots", { + headers: { + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + } + }) + if (response.ok) { + const data = await response.json() + setChatbots(data.chatbots || []) + } + } + + const fetchProcessingLogs = async () => { + const response = await fetch("/api/v1/zammad/processing-logs?limit=5", { + headers: { + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + } + }) + if (response.ok) { + const data = await response.json() + setProcessingLogs(data.logs || []) + } + } + + const fetchModuleStatus = async () => { + const response = await fetch("/api/v1/zammad/status", { + headers: { + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + } + }) + if (response.ok) { + const data = await response.json() + setModuleStatus(data) + } + } + + const handleSaveConfiguration = async () => { + try { + setSaving(true) + + const url = editingConfig + ? `/api/v1/zammad/configurations/${editingConfig.id}` + : "/api/v1/zammad/configurations" + + const method = editingConfig ? "PUT" : "POST" + + const response = await fetch(url, { + method, + headers: { + "Content-Type": "application/json", + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + }, + body: JSON.stringify(newConfig) + }) + + if (!response.ok) { + const errorData = await response.json() + throw new Error(errorData.detail || "Failed to save configuration") + } + + toast({ + title: "Success", + description: editingConfig + ? "Configuration updated successfully" + : "Configuration created successfully" + }) + + setIsDialogOpen(false) + setEditingConfig(null) + setNewConfig({ + name: "", + description: "", + is_default: false, + zammad_url: "http://localhost:8080", + api_token: "", + chatbot_id: "", + process_state: "open", + max_tickets: 10, + skip_existing: true, + auto_process: false, + process_interval: 30, + summary_template: "Generate a concise summary of this support ticket including key issues, customer concerns, and any actions taken." + }) + + await fetchConfigurations() + } catch (error) { + console.error("Error saving configuration:", error) + toast({ + title: "Error", + description: error instanceof Error ? error.message : "Failed to save configuration", + variant: "destructive" + }) + } finally { + setSaving(false) + } + } + + const handleTestConnection = async () => { + if (!newConfig.zammad_url || !newConfig.api_token) { + toast({ + title: "Error", + description: "Please enter Zammad URL and API token", + variant: "destructive" + }) + return + } + + try { + setTestingConnection(true) + + const response = await fetch("/api/v1/zammad/test-connection", { + method: "POST", + headers: { + "Content-Type": "application/json", + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + }, + body: JSON.stringify({ + zammad_url: newConfig.zammad_url, + api_token: newConfig.api_token + }) + }) + + const data = await response.json() + console.log("Test connection response:", data) + + if (data.status === "success") { + toast({ + title: "โœ… Connection Successful", + description: `Connected to Zammad as ${data.user}`, + duration: 5000 + }) + } else { + toast({ + title: "โŒ Connection Failed", + description: data.message || "Unknown error occurred", + variant: "destructive", + duration: 8000 + }) + } + } catch (error) { + console.error("Error testing connection:", error) + toast({ + title: "โš ๏ธ Connection Test Error", + description: `Failed to test connection: ${error instanceof Error ? error.message : 'Unknown error'}`, + variant: "destructive", + duration: 8000 + }) + } finally { + setTestingConnection(false) + } + } + + const handleProcessTickets = async (configId?: number) => { + try { + setProcessing(true) + + const response = await fetch("/api/v1/zammad/process", { + method: "POST", + headers: { + "Content-Type": "application/json", + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + }, + body: JSON.stringify({ + config_id: configId, + filters: {} + }) + }) + + if (!response.ok) { + const errorData = await response.json() + throw new Error(errorData.detail || "Failed to start processing") + } + + const data = await response.json() + + toast({ + title: "Processing Started", + description: data.message || "Ticket processing has been started" + }) + + // Refresh logs after a short delay + setTimeout(() => { + fetchProcessingLogs() + fetchModuleStatus() + }, 2000) + } catch (error) { + console.error("Error processing tickets:", error) + toast({ + title: "Error", + description: error instanceof Error ? error.message : "Failed to process tickets", + variant: "destructive" + }) + } finally { + setProcessing(false) + } + } + + const handleDeleteConfiguration = async (id: number) => { + try { + const response = await fetch(`/api/v1/zammad/configurations/${id}`, { + method: "DELETE", + headers: { + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + } + }) + + if (!response.ok) { + throw new Error("Failed to delete configuration") + } + + toast({ + title: "Success", + description: "Configuration deleted successfully" + }) + + await fetchConfigurations() + } catch (error) { + console.error("Error deleting configuration:", error) + toast({ + title: "Error", + description: "Failed to delete configuration", + variant: "destructive" + }) + } + } + + const handleEditConfiguration = (config: ZammadConfiguration) => { + setEditingConfig(config) + setNewConfig({ + ...config, + api_token: "" // Don't pre-fill the API token for security + }) + setIsDialogOpen(true) + } + + const getStatusIcon = (status: string) => { + switch (status) { + case "completed": + return + case "failed": + return + case "running": + return + default: + return + } + } + + const getStatusBadge = (status: string) => { + const variants: Record = { + completed: "default", + failed: "destructive", + running: "secondary" + } + return {status} + } + + if (loading) { + return ( +
+
+
+
+
+ ) + } + + return ( +
+ {/* Header */} +
+
+

+ + Zammad Integration +

+

+ AI-powered ticket summarization for Zammad ticketing systems +

+
+
+ + + + + + + + + {editingConfig ? "Edit Configuration" : "Add Zammad Configuration"} + + + Configure connection to your Zammad instance and processing settings + + + +
+ {/* Basic Settings */} +
+
+ + setNewConfig({ ...newConfig, name: e.target.value })} + placeholder="My Zammad Instance" + /> +
+
+ setNewConfig({ ...newConfig, is_default: checked })} + /> + +
+
+ +
+ + setNewConfig({ ...newConfig, description: e.target.value })} + placeholder="Optional description" + /> +
+ + {/* Zammad Connection */} +
+

Zammad Connection

+
+
+ + setNewConfig({ ...newConfig, zammad_url: e.target.value })} + placeholder="http://localhost:8080" + /> +
+
+ + setNewConfig({ ...newConfig, api_token: e.target.value })} + placeholder="Your Zammad API token" + /> +
+
+ +
+ + {/* Chatbot Selection */} +
+ + +
+ + {/* Processing Settings */} +
+

Processing Settings

+
+
+ + +
+
+ + setNewConfig({ ...newConfig, max_tickets: parseInt(e.target.value) })} + /> +
+
+ +
+
+ setNewConfig({ ...newConfig, skip_existing: checked })} + /> + +
+
+ setNewConfig({ ...newConfig, auto_process: checked })} + /> + +
+
+ + {newConfig.auto_process && ( +
+ + setNewConfig({ ...newConfig, process_interval: parseInt(e.target.value) })} + /> +
+ )} +
+ + {/* Summary Template */} +
+ +