zammad working

This commit is contained in:
2025-08-20 20:39:20 +02:00
parent d3440ccb1b
commit be581b28f8
32 changed files with 4201 additions and 714 deletions

View File

@@ -12,6 +12,7 @@ WORKDIR /app
RUN apt-get update && apt-get install -y \
build-essential \
libpq-dev \
postgresql-client \
curl \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
@@ -20,12 +21,17 @@ RUN apt-get update && apt-get install -y \
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Download spaCy English model for NLP processing
RUN python -m spacy download en_core_web_sm
# Optional: Download spaCy English model for NLP processing (commented out for faster builds)
# Uncomment if you install requirements-nlp.txt and need entity extraction
# RUN python -m spacy download en_core_web_sm
# Copy application code
COPY . .
# Copy and make migration script executable
COPY scripts/migrate.sh /usr/local/bin/migrate.sh
RUN chmod +x /usr/local/bin/migrate.sh
# Create logs directory
RUN mkdir -p logs

View File

@@ -0,0 +1,120 @@
"""add_zammad_integration_tables
Revision ID: 9645f764a517
Revises: 010_add_workflow_tables_only
Create Date: 2025-08-19 19:55:18.895986
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '9645f764a517'
down_revision = '010_add_workflow_tables_only'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create zammad_tickets table
op.create_table(
'zammad_tickets',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('zammad_ticket_id', sa.Integer(), nullable=False),
sa.Column('ticket_number', sa.String(), nullable=False),
sa.Column('title', sa.String(), nullable=False),
sa.Column('state', sa.String(), nullable=False),
sa.Column('priority', sa.String(), nullable=True),
sa.Column('customer_email', sa.String(), nullable=True),
sa.Column('processing_status', sa.String(), nullable=False),
sa.Column('processed_at', sa.DateTime(), nullable=True),
sa.Column('processed_by_user_id', sa.Integer(), nullable=True),
sa.Column('chatbot_id', sa.String(), nullable=True),
sa.Column('summary', sa.Text(), nullable=True),
sa.Column('context_data', postgresql.JSON(astext_type=sa.Text()), nullable=True),
sa.Column('error_message', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('zammad_created_at', sa.DateTime(), nullable=True),
sa.Column('zammad_updated_at', sa.DateTime(), nullable=True),
sa.Column('zammad_article_count', sa.Integer(), nullable=False),
sa.Column('config_snapshot', postgresql.JSON(astext_type=sa.Text()), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['processed_by_user_id'], ['users.id'], ),
)
# Create indexes for zammad_tickets
op.create_index('idx_zammad_tickets_status_created', 'zammad_tickets', ['processing_status', 'created_at'])
op.create_index('idx_zammad_tickets_state_processed', 'zammad_tickets', ['state', 'processed_at'])
op.create_index('idx_zammad_tickets_user_status', 'zammad_tickets', ['processed_by_user_id', 'processing_status'])
op.create_index(op.f('ix_zammad_tickets_id'), 'zammad_tickets', ['id'])
op.create_index(op.f('ix_zammad_tickets_ticket_number'), 'zammad_tickets', ['ticket_number'])
op.create_index(op.f('ix_zammad_tickets_zammad_ticket_id'), 'zammad_tickets', ['zammad_ticket_id'], unique=True)
# Create zammad_processing_logs table
op.create_table(
'zammad_processing_logs',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('batch_id', sa.String(), nullable=False),
sa.Column('started_at', sa.DateTime(), nullable=False),
sa.Column('completed_at', sa.DateTime(), nullable=True),
sa.Column('initiated_by_user_id', sa.Integer(), nullable=True),
sa.Column('config_used', postgresql.JSON(astext_type=sa.Text()), nullable=True),
sa.Column('filters_applied', postgresql.JSON(astext_type=sa.Text()), nullable=True),
sa.Column('tickets_found', sa.Integer(), nullable=False),
sa.Column('tickets_processed', sa.Integer(), nullable=False),
sa.Column('tickets_failed', sa.Integer(), nullable=False),
sa.Column('tickets_skipped', sa.Integer(), nullable=False),
sa.Column('processing_time_seconds', sa.Integer(), nullable=True),
sa.Column('average_time_per_ticket', sa.Integer(), nullable=True),
sa.Column('errors_encountered', postgresql.JSON(astext_type=sa.Text()), nullable=True),
sa.Column('status', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['initiated_by_user_id'], ['users.id'], ),
)
# Create indexes for zammad_processing_logs
op.create_index('idx_processing_logs_batch_status', 'zammad_processing_logs', ['batch_id', 'status'])
op.create_index('idx_processing_logs_user_started', 'zammad_processing_logs', ['initiated_by_user_id', 'started_at'])
op.create_index(op.f('ix_zammad_processing_logs_batch_id'), 'zammad_processing_logs', ['batch_id'])
op.create_index(op.f('ix_zammad_processing_logs_id'), 'zammad_processing_logs', ['id'])
# Create zammad_configurations table
op.create_table(
'zammad_configurations',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('is_default', sa.Boolean(), nullable=False),
sa.Column('is_active', sa.Boolean(), nullable=False),
sa.Column('zammad_url', sa.String(), nullable=False),
sa.Column('api_token_encrypted', sa.String(), nullable=False),
sa.Column('chatbot_id', sa.String(), nullable=False),
sa.Column('process_state', sa.String(), nullable=False),
sa.Column('max_tickets', sa.Integer(), nullable=False),
sa.Column('skip_existing', sa.Boolean(), nullable=False),
sa.Column('auto_process', sa.Boolean(), nullable=False),
sa.Column('process_interval', sa.Integer(), nullable=False),
sa.Column('summary_template', sa.Text(), nullable=True),
sa.Column('custom_settings', postgresql.JSON(astext_type=sa.Text()), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('last_used_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
)
# Create indexes for zammad_configurations
op.create_index('idx_zammad_config_user_active', 'zammad_configurations', ['user_id', 'is_active'])
op.create_index('idx_zammad_config_user_default', 'zammad_configurations', ['user_id', 'is_default'])
op.create_index(op.f('ix_zammad_configurations_id'), 'zammad_configurations', ['id'])
def downgrade() -> None:
# Drop tables in reverse order
op.drop_table('zammad_configurations')
op.drop_table('zammad_processing_logs')
op.drop_table('zammad_tickets')

View File

@@ -18,6 +18,7 @@ from .rag import router as rag_router
from .chatbot import router as chatbot_router
from .prompt_templates import router as prompt_templates_router
from .security import router as security_router
from .zammad import router as zammad_router
# Create main API router
api_router = APIRouter()
@@ -66,3 +67,6 @@ api_router.include_router(prompt_templates_router, prefix="/prompt-templates", t
# Include security routes
api_router.include_router(security_router, prefix="/security", tags=["security"])
# Include Zammad integration routes
api_router.include_router(zammad_router, prefix="/zammad", tags=["zammad"])

View File

@@ -20,13 +20,26 @@ async def list_modules():
modules = []
for module_info in all_modules:
# Convert module_info to API format
# Convert module_info to API format with status field
name = module_info["name"]
is_loaded = module_info["loaded"] # Module is actually loaded in memory
is_enabled = module_info["enabled"] # Module is enabled in config
# Determine status based on enabled + loaded state
if is_enabled and is_loaded:
status = "running"
elif is_enabled and not is_loaded:
status = "error" # Enabled but failed to load
else: # not is_enabled (regardless of loaded state)
status = "standby" # Disabled
api_module = {
"name": module_info["name"],
"name": name,
"version": module_info["version"],
"description": module_info["description"],
"initialized": module_info["loaded"],
"enabled": module_info["enabled"]
"initialized": is_loaded,
"enabled": is_enabled,
"status": status # Add status field for frontend compatibility
}
# Get module statistics if available and module is loaded
@@ -58,31 +71,64 @@ async def list_modules():
@router.get("/status")
async def get_modules_status():
"""Get summary status of all modules"""
"""Get comprehensive module status - CONSOLIDATED endpoint"""
log_api_request("get_modules_status", {})
total_modules = len(module_manager.modules)
running_modules = 0
standby_modules = 0
failed_modules = 0
# Get all discovered modules including disabled ones
all_modules = module_manager.list_all_modules()
for name, module in module_manager.modules.items():
config = module_manager.module_configs.get(name)
is_initialized = getattr(module, "initialized", False)
is_enabled = config.enabled if config else True
modules_with_status = []
running_count = 0
standby_count = 0
failed_count = 0
if is_initialized and is_enabled:
running_modules += 1
elif not is_initialized:
failed_modules += 1
else:
standby_modules += 1
for module_info in all_modules:
name = module_info["name"]
is_loaded = module_info["loaded"] # Module is actually loaded in memory
is_enabled = module_info["enabled"] # Module is enabled in config
# Determine status based on enabled + loaded state
if is_enabled and is_loaded:
status = "running"
running_count += 1
elif is_enabled and not is_loaded:
status = "failed" # Enabled but failed to load
failed_count += 1
else: # not is_enabled (regardless of loaded state)
status = "standby" # Disabled
standby_count += 1
# Get module statistics if available and loaded
stats = {}
if is_loaded and name in module_manager.modules:
module_instance = module_manager.modules[name]
if hasattr(module_instance, "get_stats"):
try:
import asyncio
if asyncio.iscoroutinefunction(module_instance.get_stats):
stats_result = await module_instance.get_stats()
else:
stats_result = module_instance.get_stats()
stats = stats_result.__dict__ if hasattr(stats_result, "__dict__") else stats_result
except:
stats = {}
modules_with_status.append({
"name": name,
"version": module_info["version"],
"description": module_info["description"],
"status": status,
"enabled": is_enabled,
"loaded": is_loaded,
"stats": stats
})
return {
"total": total_modules,
"running": running_modules,
"standby": standby_modules,
"failed": failed_modules,
"modules": modules_with_status,
"total": len(modules_with_status),
"running": running_count,
"standby": standby_count,
"failed": failed_count,
"system_initialized": module_manager.initialized
}

View File

@@ -0,0 +1,664 @@
"""
Zammad Integration API endpoints
"""
import asyncio
from typing import Dict, Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel, validator
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, or_, desc
from datetime import datetime
from app.db.database import get_db
from app.core.logging import log_api_request
from app.services.module_manager import module_manager
from app.core.security import get_current_user
from app.models.user import User
from app.services.api_key_auth import get_api_key_auth
from app.models.api_key import APIKey
from app.models.chatbot import ChatbotInstance
# Import Zammad models
from modules.zammad.models import (
ZammadTicket,
ZammadProcessingLog,
ZammadConfiguration,
ProcessingStatus
)
router = APIRouter()
class ZammadConfigurationRequest(BaseModel):
"""Request model for creating/updating Zammad configuration"""
name: str
description: Optional[str] = None
is_default: bool = False
zammad_url: str
api_token: str
chatbot_id: str
process_state: str = "open"
max_tickets: int = 10
skip_existing: bool = True
auto_process: bool = False
process_interval: int = 30
summary_template: Optional[str] = None
custom_settings: Optional[Dict[str, Any]] = {}
@validator('zammad_url')
def validate_zammad_url(cls, v):
if not v.startswith(('http://', 'https://')):
raise ValueError('Zammad URL must start with http:// or https://')
return v.rstrip('/')
@validator('max_tickets')
def validate_max_tickets(cls, v):
if not 1 <= v <= 100:
raise ValueError('max_tickets must be between 1 and 100')
return v
@validator('process_interval')
def validate_process_interval(cls, v):
if not 5 <= v <= 1440:
raise ValueError('process_interval must be between 5 and 1440 minutes')
return v
class ProcessTicketsRequest(BaseModel):
"""Request model for processing tickets"""
config_id: Optional[int] = None
filters: Dict[str, Any] = {}
@validator('filters', pre=True)
def validate_filters(cls, v):
"""Ensure filters is always a dict"""
if v is None:
return {}
if isinstance(v, list):
# If someone passes a list, convert to empty dict
return {}
if not isinstance(v, dict):
# If it's some other type, convert to empty dict
return {}
return v
class ProcessSingleTicketRequest(BaseModel):
"""Request model for processing a single ticket"""
ticket_id: int
config_id: Optional[int] = None
class TestConnectionRequest(BaseModel):
"""Request model for testing Zammad connection"""
zammad_url: str
api_token: str
@router.get("/configurations")
async def get_configurations(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""Get all Zammad configurations for the current user"""
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
try:
# Get configurations from database
stmt = (
select(ZammadConfiguration)
.where(ZammadConfiguration.user_id == user_id)
.where(ZammadConfiguration.is_active == True)
.order_by(ZammadConfiguration.is_default.desc(), ZammadConfiguration.created_at.desc())
)
result = await db.execute(stmt)
configurations = [config.to_dict() for config in result.scalars()]
return {
"configurations": configurations,
"count": len(configurations)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching configurations: {str(e)}")
@router.post("/configurations")
async def create_configuration(
config_request: ZammadConfigurationRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""Create a new Zammad configuration"""
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
try:
# Verify chatbot exists and user has access
chatbot_stmt = select(ChatbotInstance).where(
and_(
ChatbotInstance.id == config_request.chatbot_id,
ChatbotInstance.created_by == str(user_id),
ChatbotInstance.is_active == True
)
)
chatbot = await db.scalar(chatbot_stmt)
if not chatbot:
raise HTTPException(status_code=404, detail="Chatbot not found or access denied")
# Use the module to handle configuration creation
zammad_module = module_manager.get_module("zammad")
if not zammad_module:
raise HTTPException(status_code=503, detail="Zammad module not available")
request_data = {
"action": "save_configuration",
"configuration": config_request.dict()
}
context = {
"user_id": user_id,
"user_permissions": current_user.get("permissions", [])
}
result = await zammad_module.execute_with_interceptors(request_data, context)
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error creating configuration: {str(e)}")
@router.put("/configurations/{config_id}")
async def update_configuration(
config_id: int,
config_request: ZammadConfigurationRequest,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""Update an existing Zammad configuration"""
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
try:
# Check if configuration exists and belongs to user
stmt = select(ZammadConfiguration).where(
and_(
ZammadConfiguration.id == config_id,
ZammadConfiguration.user_id == user_id
)
)
existing_config = await db.scalar(stmt)
if not existing_config:
raise HTTPException(status_code=404, detail="Configuration not found")
# Verify chatbot exists and user has access
chatbot_stmt = select(ChatbotInstance).where(
and_(
ChatbotInstance.id == config_request.chatbot_id,
ChatbotInstance.created_by == str(user_id),
ChatbotInstance.is_active == True
)
)
chatbot = await db.scalar(chatbot_stmt)
if not chatbot:
raise HTTPException(status_code=404, detail="Chatbot not found or access denied")
# Deactivate old configuration and create new one (for audit trail)
existing_config.is_active = False
# Use the module to handle configuration creation
zammad_module = module_manager.get_module("zammad")
if not zammad_module:
raise HTTPException(status_code=503, detail="Zammad module not available")
request_data = {
"action": "save_configuration",
"configuration": config_request.dict()
}
context = {
"user_id": user_id,
"user_permissions": current_user.get("permissions", [])
}
result = await zammad_module.execute_with_interceptors(request_data, context)
await db.commit()
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error updating configuration: {str(e)}")
@router.delete("/configurations/{config_id}")
async def delete_configuration(
config_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""Delete (deactivate) a Zammad configuration"""
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
try:
# Check if configuration exists and belongs to user
stmt = select(ZammadConfiguration).where(
and_(
ZammadConfiguration.id == config_id,
ZammadConfiguration.user_id == user_id
)
)
config = await db.scalar(stmt)
if not config:
raise HTTPException(status_code=404, detail="Configuration not found")
# Deactivate instead of deleting (for audit trail)
config.is_active = False
config.updated_at = datetime.utcnow()
await db.commit()
return {"message": "Configuration deleted successfully"}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error deleting configuration: {str(e)}")
@router.post("/test-connection")
async def test_connection(
test_request: TestConnectionRequest,
current_user: User = Depends(get_current_user)
):
"""Test connection to a Zammad instance"""
try:
zammad_module = module_manager.get_module("zammad")
if not zammad_module:
raise HTTPException(status_code=503, detail="Zammad module not available")
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
request_data = {
"action": "test_connection",
"zammad_url": test_request.zammad_url,
"api_token": test_request.api_token
}
context = {
"user_id": user_id,
"user_permissions": current_user.get("permissions", [])
}
result = await zammad_module.execute_with_interceptors(request_data, context)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error testing connection: {str(e)}")
@router.post("/process")
async def process_tickets(
process_request: ProcessTicketsRequest,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user)
):
"""Process tickets for summarization"""
try:
zammad_module = module_manager.get_module("zammad")
if not zammad_module:
raise HTTPException(status_code=503, detail="Zammad module not available")
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
# Debug logging to identify the issue
import logging
logger = logging.getLogger(__name__)
logger.info(f"Process request filters type: {type(process_request.filters)}")
logger.info(f"Process request filters value: {process_request.filters}")
# Ensure filters is a dict
filters = process_request.filters if process_request.filters is not None else {}
if not isinstance(filters, dict):
logger.error(f"Filters is not a dict: {type(filters)} = {filters}")
filters = {}
request_data = {
"action": "process_tickets",
"config_id": process_request.config_id,
"filters": filters
}
context = {
"user_id": user_id,
"user_permissions": current_user.get("permissions", [])
}
# Execute processing in background for large batches
if filters.get("limit", 10) > 5:
# Start background task
background_tasks.add_task(
_process_tickets_background,
zammad_module,
request_data,
context
)
return {
"message": "Processing started in background",
"status": "started"
}
else:
# Process immediately for small batches
result = await zammad_module.execute_with_interceptors(request_data, context)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error starting ticket processing: {str(e)}")
@router.post("/tickets/{ticket_id}/process")
async def process_single_ticket(
ticket_id: int,
process_request: ProcessSingleTicketRequest,
current_user: User = Depends(get_current_user)
):
"""Process a single ticket for summarization"""
try:
zammad_module = module_manager.get_module("zammad")
if not zammad_module:
raise HTTPException(status_code=503, detail="Zammad module not available")
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
request_data = {
"action": "process_single_ticket",
"ticket_id": ticket_id,
"config_id": process_request.config_id
}
context = {
"user_id": user_id,
"user_permissions": current_user.get("permissions", [])
}
result = await zammad_module.execute_with_interceptors(request_data, context)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing ticket: {str(e)}")
@router.get("/tickets/{ticket_id}/summary")
async def get_ticket_summary(
ticket_id: int,
current_user: User = Depends(get_current_user)
):
"""Get the AI summary for a specific ticket"""
try:
zammad_module = module_manager.get_module("zammad")
if not zammad_module:
raise HTTPException(status_code=503, detail="Zammad module not available")
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
request_data = {
"action": "get_ticket_summary",
"ticket_id": ticket_id
}
context = {
"user_id": user_id,
"user_permissions": current_user.get("permissions", [])
}
result = await zammad_module.execute_with_interceptors(request_data, context)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error getting ticket summary: {str(e)}")
@router.get("/tickets")
async def get_processed_tickets(
status: Optional[str] = None,
limit: int = 20,
offset: int = 0,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""Get list of processed tickets"""
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
try:
# Build query
query = select(ZammadTicket).where(ZammadTicket.processed_by_user_id == user_id)
if status:
query = query.where(ZammadTicket.processing_status == status)
query = query.order_by(desc(ZammadTicket.processed_at))
query = query.offset(offset).limit(limit)
# Execute query
result = await db.execute(query)
tickets = [ticket.to_dict() for ticket in result.scalars()]
# Get total count
count_query = select(ZammadTicket).where(ZammadTicket.processed_by_user_id == user_id)
if status:
count_query = count_query.where(ZammadTicket.processing_status == status)
total_result = await db.execute(count_query)
total_count = len(list(total_result.scalars()))
return {
"tickets": tickets,
"total": total_count,
"limit": limit,
"offset": offset
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching tickets: {str(e)}")
@router.get("/status")
async def get_module_status(
current_user: User = Depends(get_current_user)
):
"""Get Zammad module status and statistics"""
try:
zammad_module = module_manager.get_module("zammad")
if not zammad_module:
raise HTTPException(status_code=503, detail="Zammad module not available")
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
request_data = {
"action": "get_status"
}
context = {
"user_id": user_id,
"user_permissions": current_user.get("permissions", [])
}
result = await zammad_module.execute_with_interceptors(request_data, context)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error getting module status: {str(e)}")
@router.get("/processing-logs")
async def get_processing_logs(
limit: int = 10,
offset: int = 0,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""Get processing logs for the current user"""
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
try:
# Get processing logs
query = (
select(ZammadProcessingLog)
.where(ZammadProcessingLog.initiated_by_user_id == user_id)
.order_by(desc(ZammadProcessingLog.started_at))
.offset(offset)
.limit(limit)
)
result = await db.execute(query)
logs = [log.to_dict() for log in result.scalars()]
# Get total count
count_query = select(ZammadProcessingLog).where(
ZammadProcessingLog.initiated_by_user_id == user_id
)
total_result = await db.execute(count_query)
total_count = len(list(total_result.scalars()))
return {
"logs": logs,
"total": total_count,
"limit": limit,
"offset": offset
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching processing logs: {str(e)}")
@router.get("/chatbots")
async def get_available_chatbots(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db)
):
"""Get list of chatbots available for Zammad integration"""
user_id = current_user.get("id") if isinstance(current_user, dict) else current_user.id
try:
# Get user's active chatbots
stmt = (
select(ChatbotInstance)
.where(ChatbotInstance.created_by == str(user_id))
.where(ChatbotInstance.is_active == True)
.order_by(ChatbotInstance.name)
)
result = await db.execute(stmt)
chatbots = []
for chatbot in result.scalars():
# Extract chatbot_type from config JSON or provide default
config = chatbot.config or {}
chatbot_type = config.get('chatbot_type', 'general')
model = config.get('model', 'Unknown')
chatbots.append({
"id": chatbot.id,
"name": chatbot.name,
"chatbot_type": chatbot_type,
"model": model,
"description": chatbot.description or ''
})
return {
"chatbots": chatbots,
"count": len(chatbots)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching chatbots: {str(e)}")
async def _process_tickets_background(zammad_module, request_data: Dict[str, Any], context: Dict[str, Any]):
"""Background task for processing tickets"""
try:
import logging
logger = logging.getLogger(__name__)
logger.info(f"Starting background ticket processing with request_data: {request_data}")
logger.info(f"Context: {context}")
await zammad_module.execute_with_interceptors(request_data, context)
except Exception as e:
# Log error but don't raise - this is a background task
import logging
import traceback
logger = logging.getLogger(__name__)
logger.error(f"Background ticket processing failed: {e}")
logger.error(f"Full traceback: {traceback.format_exc()}")
# API key authentication endpoints (for programmatic access)
@router.post("/api-key/process", dependencies=[Depends(get_api_key_auth)])
async def api_process_tickets(
process_request: ProcessTicketsRequest,
api_key_context: Dict = Depends(get_api_key_auth)
):
"""Process tickets using API key authentication"""
try:
zammad_module = module_manager.get_module("zammad")
if not zammad_module:
raise HTTPException(status_code=503, detail="Zammad module not available")
user_id = api_key_context["user_id"]
request_data = {
"action": "process_tickets",
"config_id": process_request.config_id,
"filters": process_request.filters
}
context = {
"user_id": user_id,
"api_key_id": api_key_context["api_key_id"],
"user_permissions": ["modules:*"] # API keys get full module access
}
result = await zammad_module.execute_with_interceptors(request_data, context)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing tickets: {str(e)}")
@router.get("/api-key/status", dependencies=[Depends(get_api_key_auth)])
async def api_get_status(
api_key_context: Dict = Depends(get_api_key_auth)
):
"""Get module status using API key authentication"""
try:
zammad_module = module_manager.get_module("zammad")
if not zammad_module:
raise HTTPException(status_code=503, detail="Zammad module not available")
user_id = api_key_context["user_id"]
request_data = {
"action": "get_status"
}
context = {
"user_id": user_id,
"api_key_id": api_key_context["api_key_id"],
"user_permissions": ["modules:*"] # API keys get full module access
}
result = await zammad_module.execute_with_interceptors(request_data, context)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error getting status: {str(e)}")

322
backend/app/core/cache.py Normal file
View File

@@ -0,0 +1,322 @@
"""
Core Cache Service - Redis-based caching infrastructure
Consolidates all caching functionality into core system infrastructure
"""
import asyncio
import json
import logging
from typing import Any, Dict, Optional, Union
from datetime import datetime, timedelta
import redis.asyncio as redis
from redis.asyncio import Redis, ConnectionPool
from contextlib import asynccontextmanager
from app.core.config import settings
logger = logging.getLogger(__name__)
class CoreCacheService:
"""Core Redis-based cache service for system-wide caching"""
def __init__(self):
self.redis_pool: Optional[ConnectionPool] = None
self.redis_client: Optional[Redis] = None
self.enabled = False
self.stats = {
"hits": 0,
"misses": 0,
"errors": 0,
"total_requests": 0
}
async def initialize(self):
"""Initialize the core cache service with connection pool"""
try:
# Create Redis connection pool for better resource management
redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0')
self.redis_pool = ConnectionPool.from_url(
redis_url,
encoding="utf-8",
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
max_connections=20, # Shared pool for all cache operations
health_check_interval=30
)
self.redis_client = Redis(connection_pool=self.redis_pool)
# Test connection
await self.redis_client.ping()
self.enabled = True
logger.info("Core cache service initialized with Redis connection pool")
except Exception as e:
logger.error(f"Failed to initialize core cache service: {e}")
self.enabled = False
raise
async def cleanup(self):
"""Cleanup cache resources"""
if self.redis_client:
await self.redis_client.close()
self.redis_client = None
if self.redis_pool:
await self.redis_pool.disconnect()
self.redis_pool = None
self.enabled = False
logger.info("Core cache service cleaned up")
def _get_cache_key(self, key: str, prefix: str = "core") -> str:
"""Generate cache key with prefix"""
return f"{prefix}:{key}"
async def get(self, key: str, default: Any = None, prefix: str = "core") -> Any:
"""Get value from cache"""
if not self.enabled:
return default
try:
cache_key = self._get_cache_key(key, prefix)
value = await self.redis_client.get(cache_key)
if value is None:
self.stats["misses"] += 1
return default
self.stats["hits"] += 1
self.stats["total_requests"] += 1
# Try to deserialize JSON
try:
return json.loads(value)
except json.JSONDecodeError:
return value
except Exception as e:
logger.error(f"Cache get error for key {key}: {e}")
self.stats["errors"] += 1
return default
async def set(self, key: str, value: Any, ttl: Optional[int] = None, prefix: str = "core") -> bool:
"""Set value in cache"""
if not self.enabled:
return False
try:
cache_key = self._get_cache_key(key, prefix)
ttl = ttl or 3600 # Default 1 hour TTL
# Serialize complex objects as JSON
if isinstance(value, (dict, list, tuple)):
value = json.dumps(value)
await self.redis_client.setex(cache_key, ttl, value)
return True
except Exception as e:
logger.error(f"Cache set error for key {key}: {e}")
self.stats["errors"] += 1
return False
async def delete(self, key: str, prefix: str = "core") -> bool:
"""Delete key from cache"""
if not self.enabled:
return False
try:
cache_key = self._get_cache_key(key, prefix)
result = await self.redis_client.delete(cache_key)
return result > 0
except Exception as e:
logger.error(f"Cache delete error for key {key}: {e}")
self.stats["errors"] += 1
return False
async def exists(self, key: str, prefix: str = "core") -> bool:
"""Check if key exists in cache"""
if not self.enabled:
return False
try:
cache_key = self._get_cache_key(key, prefix)
return await self.redis_client.exists(cache_key) > 0
except Exception as e:
logger.error(f"Cache exists error for key {key}: {e}")
self.stats["errors"] += 1
return False
async def clear_pattern(self, pattern: str, prefix: str = "core") -> int:
"""Clear keys matching pattern"""
if not self.enabled:
return 0
try:
cache_pattern = self._get_cache_key(pattern, prefix)
keys = await self.redis_client.keys(cache_pattern)
if keys:
return await self.redis_client.delete(*keys)
return 0
except Exception as e:
logger.error(f"Cache clear pattern error for pattern {pattern}: {e}")
self.stats["errors"] += 1
return 0
async def increment(self, key: str, amount: int = 1, ttl: Optional[int] = None, prefix: str = "core") -> int:
"""Increment counter with optional TTL"""
if not self.enabled:
return 0
try:
cache_key = self._get_cache_key(key, prefix)
# Use pipeline for atomic increment + expire
async with self.redis_client.pipeline() as pipe:
await pipe.incr(cache_key, amount)
if ttl:
await pipe.expire(cache_key, ttl)
results = await pipe.execute()
return results[0]
except Exception as e:
logger.error(f"Cache increment error for key {key}: {e}")
self.stats["errors"] += 1
return 0
async def get_stats(self) -> Dict[str, Any]:
"""Get comprehensive cache statistics"""
stats = self.stats.copy()
if self.enabled:
try:
info = await self.redis_client.info()
stats.update({
"redis_memory_used": info.get("used_memory_human", "N/A"),
"redis_connected_clients": info.get("connected_clients", 0),
"redis_total_commands": info.get("total_commands_processed", 0),
"redis_keyspace_hits": info.get("keyspace_hits", 0),
"redis_keyspace_misses": info.get("keyspace_misses", 0),
"connection_pool_size": self.redis_pool.connection_pool_size if self.redis_pool else 0,
"hit_rate": round(
(stats["hits"] / stats["total_requests"]) * 100, 2
) if stats["total_requests"] > 0 else 0,
"enabled": True
})
except Exception as e:
logger.error(f"Error getting Redis stats: {e}")
stats["enabled"] = False
else:
stats["enabled"] = False
return stats
@asynccontextmanager
async def pipeline(self):
"""Context manager for Redis pipeline operations"""
if not self.enabled:
yield None
return
async with self.redis_client.pipeline() as pipe:
yield pipe
# Specialized caching methods for common use cases
async def cache_api_key(self, key_prefix: str, api_key_data: Dict[str, Any], ttl: int = 300) -> bool:
"""Cache API key data for authentication"""
return await self.set(key_prefix, api_key_data, ttl, prefix="auth")
async def get_cached_api_key(self, key_prefix: str) -> Optional[Dict[str, Any]]:
"""Get cached API key data"""
return await self.get(key_prefix, prefix="auth")
async def invalidate_api_key(self, key_prefix: str) -> bool:
"""Invalidate cached API key"""
return await self.delete(key_prefix, prefix="auth")
async def cache_verification_result(self, api_key: str, key_prefix: str, key_hash: str, is_valid: bool, ttl: int = 300) -> bool:
"""Cache API key verification result to avoid expensive bcrypt operations"""
verification_data = {
"key_hash": key_hash,
"is_valid": is_valid,
"timestamp": datetime.utcnow().isoformat()
}
return await self.set(f"verify:{key_prefix}", verification_data, ttl, prefix="auth")
async def get_cached_verification(self, key_prefix: str) -> Optional[Dict[str, Any]]:
"""Get cached verification result"""
return await self.get(f"verify:{key_prefix}", prefix="auth")
async def cache_rate_limit(self, identifier: str, window_seconds: int, limit: int, current_count: int = 1) -> Dict[str, Any]:
"""Cache and track rate limit state"""
key = f"rate_limit:{identifier}:{window_seconds}"
try:
# Use atomic increment with expiry
count = await self.increment(key, current_count, window_seconds, prefix="rate")
remaining = max(0, limit - count)
reset_time = int((datetime.utcnow() + timedelta(seconds=window_seconds)).timestamp())
return {
"count": count,
"limit": limit,
"remaining": remaining,
"reset_time": reset_time,
"exceeded": count > limit
}
except Exception as e:
logger.error(f"Rate limit cache error: {e}")
# Return permissive defaults on cache failure
return {
"count": 0,
"limit": limit,
"remaining": limit,
"reset_time": int((datetime.utcnow() + timedelta(seconds=window_seconds)).timestamp()),
"exceeded": False
}
# Global core cache service instance
core_cache = CoreCacheService()
# Convenience functions for backward compatibility and ease of use
async def get(key: str, default: Any = None, prefix: str = "core") -> Any:
"""Get value from core cache"""
return await core_cache.get(key, default, prefix)
async def set(key: str, value: Any, ttl: Optional[int] = None, prefix: str = "core") -> bool:
"""Set value in core cache"""
return await core_cache.set(key, value, ttl, prefix)
async def delete(key: str, prefix: str = "core") -> bool:
"""Delete key from core cache"""
return await core_cache.delete(key, prefix)
async def exists(key: str, prefix: str = "core") -> bool:
"""Check if key exists in core cache"""
return await core_cache.exists(key, prefix)
async def clear_pattern(pattern: str, prefix: str = "core") -> int:
"""Clear keys matching pattern from core cache"""
return await core_cache.clear_pattern(pattern, prefix)
async def get_stats() -> Dict[str, Any]:
"""Get core cache statistics"""
return await core_cache.get_stats()

View File

@@ -103,8 +103,8 @@ async def init_db():
except ImportError:
logger.warning("Module model not available yet")
# Create all tables
await conn.run_sync(Base.metadata.create_all)
# Tables are now created via migration container - no need to create here
# await conn.run_sync(Base.metadata.create_all) # DISABLED - migrations handle this
# Create default admin user if no admin exists
await create_default_admin()

View File

@@ -38,6 +38,14 @@ async def lifespan(app: FastAPI):
"""
logger.info("Starting Enclava platform...")
# Initialize core cache service (before database to provide caching for auth)
from app.core.cache import core_cache
try:
await core_cache.initialize()
logger.info("Core cache service initialized successfully")
except Exception as e:
logger.warning(f"Core cache service initialization failed: {e}")
# Initialize database
await init_db()
@@ -70,6 +78,10 @@ async def lifespan(app: FastAPI):
# Cleanup
logger.info("Shutting down platform...")
# Close core cache service
from app.core.cache import core_cache
await core_cache.cleanup()
# Close Redis connection for cached API key service
from app.services.cached_api_key import cached_api_key_service
await cached_api_key_service.close()

View File

@@ -1,5 +1,5 @@
"""
Cached API Key Service
Cached API Key Service - Refactored to use Core Cache Infrastructure
High-performance Redis-based API key caching to reduce authentication overhead
from ~60ms to ~5ms by avoiding expensive bcrypt operations
"""
@@ -12,417 +12,236 @@ from sqlalchemy import select
from sqlalchemy.orm import joinedload
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.cache import core_cache
from app.core.config import settings
from app.core.security import verify_api_key
from app.models.api_key import APIKey
from app.models.user import User
# Check Redis availability at runtime, not import time
aioredis = None
REDIS_AVAILABLE = False
def _import_aioredis():
"""Import aioredis at runtime"""
global aioredis, REDIS_AVAILABLE
if aioredis is None:
try:
import aioredis as _aioredis
aioredis = _aioredis
REDIS_AVAILABLE = True
return True
except ImportError as e:
REDIS_AVAILABLE = False
return False
except Exception as e:
# Handle the Python 3.11 + aioredis 2.0.1 compatibility issue
REDIS_AVAILABLE = False
return False
return REDIS_AVAILABLE
logger = logging.getLogger(__name__)
class CachedAPIKeyService:
"""Redis-backed API key caching service for performance optimization with fallback to optimized database queries"""
"""Core cache-backed API key caching service for performance optimization"""
def __init__(self):
self.redis = None
self.cache_ttl = 300 # 5 minutes cache TTL
self.verification_cache_ttl = 3600 # 1 hour for verification results
self.redis_enabled = _import_aioredis()
if not self.redis_enabled:
logger.warning("Redis not available, falling back to optimized database queries only")
async def get_redis(self):
"""Get Redis connection, create if doesn't exist"""
if not self.redis_enabled or not REDIS_AVAILABLE:
return None
if not self.redis and aioredis:
try:
self.redis = aioredis.from_url(
settings.REDIS_URL,
encoding="utf-8",
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
health_check_interval=30
)
# Test the connection
await self.redis.ping()
logger.info("Redis connection established for API key caching")
except Exception as e:
logger.warning(f"Redis connection failed, disabling cache: {e}")
self.redis_enabled = False
self.redis = None
return self.redis
logger.info("Cached API key service initialized with core cache backend")
async def close(self):
"""Close Redis connection"""
if self.redis and self.redis_enabled:
try:
await self.redis.close()
except Exception as e:
logger.warning(f"Error closing Redis connection: {e}")
def _get_cache_key(self, key_prefix: str) -> str:
"""Generate cache key for API key data"""
return f"api_key:data:{key_prefix}"
def _get_verification_cache_key(self, key_prefix: str, key_suffix_hash: str) -> str:
"""Generate cache key for API key verification results"""
return f"api_key:verified:{key_prefix}:{key_suffix_hash}"
def _get_last_used_cache_key(self, api_key_id: int) -> str:
"""Generate cache key for last used timestamp"""
return f"api_key:last_used:{api_key_id}"
async def _serialize_api_key_data(self, api_key: APIKey, user: User) -> str:
"""Serialize API key and user data for caching"""
data = {
# API Key data
"api_key_id": api_key.id,
"api_key_name": api_key.name,
"key_hash": api_key.key_hash,
"key_prefix": api_key.key_prefix,
"is_active": api_key.is_active,
"permissions": api_key.permissions,
"scopes": api_key.scopes,
"rate_limit_per_minute": api_key.rate_limit_per_minute,
"rate_limit_per_hour": api_key.rate_limit_per_hour,
"rate_limit_per_day": api_key.rate_limit_per_day,
"allowed_models": api_key.allowed_models,
"allowed_endpoints": api_key.allowed_endpoints,
"allowed_ips": api_key.allowed_ips,
"is_unlimited": api_key.is_unlimited,
"budget_limit_cents": api_key.budget_limit_cents,
"budget_type": api_key.budget_type,
"expires_at": api_key.expires_at.isoformat() if api_key.expires_at else None,
"total_requests": api_key.total_requests,
"total_tokens": api_key.total_tokens,
"total_cost": api_key.total_cost,
# User data
"user_id": user.id,
"user_email": user.email,
"user_role": user.role,
"user_is_active": user.is_active,
# Cache metadata
"cached_at": datetime.utcnow().isoformat()
}
return json.dumps(data, default=str)
async def _deserialize_api_key_data(self, cached_data: str) -> Optional[Dict[str, Any]]:
"""Deserialize cached API key data"""
try:
data = json.loads(cached_data)
# Check if cached data is still valid
if data.get("expires_at"):
expires_at = datetime.fromisoformat(data["expires_at"])
if datetime.utcnow() > expires_at:
return None
# Reconstruct the context object expected by the rest of the system
context = {
"user_id": data["user_id"],
"user_email": data["user_email"],
"user_role": data["user_role"],
"api_key_id": data["api_key_id"],
"api_key_name": data["api_key_name"],
"permissions": data["permissions"],
"scopes": data["scopes"],
"rate_limits": {
"per_minute": data["rate_limit_per_minute"],
"per_hour": data["rate_limit_per_hour"],
"per_day": data["rate_limit_per_day"]
},
# Create minimal API key object with necessary attributes
"api_key": type("APIKey", (), {
"id": data["api_key_id"],
"name": data["api_key_name"],
"key_prefix": data["key_prefix"],
"is_active": data["is_active"],
"permissions": data["permissions"],
"scopes": data["scopes"],
"allowed_models": data["allowed_models"],
"allowed_endpoints": data["allowed_endpoints"],
"allowed_ips": data["allowed_ips"],
"is_unlimited": data["is_unlimited"],
"budget_limit_cents": data["budget_limit_cents"],
"budget_type": data["budget_type"],
"total_requests": data["total_requests"],
"total_tokens": data["total_tokens"],
"total_cost": data["total_cost"],
"expires_at": datetime.fromisoformat(data["expires_at"]) if data.get("expires_at") else None,
"can_access_model": lambda model: not data["allowed_models"] or model in data["allowed_models"],
"can_access_endpoint": lambda endpoint: not data["allowed_endpoints"] or endpoint in data["allowed_endpoints"],
"can_access_from_ip": lambda ip: not data["allowed_ips"] or ip in data["allowed_ips"],
"has_scope": lambda scope: scope in data["scopes"],
"is_valid": lambda: data["is_active"] and (not data.get("expires_at") or datetime.utcnow() <= datetime.fromisoformat(data["expires_at"])),
"update_usage": lambda tokens, cost: None # Handled separately for cache consistency
})(),
# Create minimal user object
"user": type("User", (), {
"id": data["user_id"],
"email": data["user_email"],
"role": data["user_role"],
"is_active": data["user_is_active"]
})()
}
return context
except Exception as e:
logger.warning(f"Failed to deserialize cached API key data: {e}")
return None
"""Close method for compatibility - core cache handles its own lifecycle"""
logger.info("Cached API key service close called - core cache handles lifecycle")
async def get_cached_api_key(self, key_prefix: str, db: AsyncSession) -> Optional[Dict[str, Any]]:
"""Get API key data from cache or database with optimized queries"""
"""
Get API key data from cache or database
Returns: Dictionary with api_key, user, and api_key_id
"""
try:
redis = await self.get_redis()
# Try cache first
cached_data = await core_cache.get_cached_api_key(key_prefix)
if cached_data:
logger.debug(f"API key cache hit for prefix: {key_prefix}")
# If Redis is available, try cache first
if redis:
cache_key = self._get_cache_key(key_prefix)
# Recreate APIKey object from cached data
api_key_data = cached_data.get("api_key_data", {})
user_data = cached_data.get("user_data", {})
# Try to get from cache first
cached_data = await redis.get(cache_key)
if cached_data:
logger.debug(f"API key cache hit for {key_prefix}")
context = await self._deserialize_api_key_data(cached_data)
if context:
return context
else:
# Invalid cached data, remove it
await redis.delete(cache_key)
# Create APIKey instance
api_key = APIKey(**api_key_data)
logger.debug(f"API key cache miss for {key_prefix}, fetching from database")
else:
logger.debug(f"Redis not available, fetching API key {key_prefix} from database with optimized query")
# Create User instance
user = User(**user_data)
# Cache miss or Redis not available - fetch from database with optimized query
context = await self._fetch_from_database(key_prefix, db)
return {
"api_key": api_key,
"user": user,
"api_key_id": api_key_data.get("id")
}
# If Redis is available and we have data, cache it
if context and redis:
try:
api_key = context["api_key"]
user = context["user"]
logger.debug(f"API key cache miss for prefix: {key_prefix}, fetching from database")
# Reconstruct full objects for serialization
full_api_key = await self._get_full_api_key_from_db(key_prefix, db)
if full_api_key:
cached_data = await self._serialize_api_key_data(full_api_key, user)
await redis.setex(cache_key, self.cache_ttl, cached_data)
logger.debug(f"Cached API key data for {key_prefix}")
except Exception as cache_error:
logger.warning(f"Failed to cache API key data: {cache_error}")
# Don't fail the request if caching fails
# Cache miss - fetch from database with optimized query
stmt = (
select(APIKey, User)
.join(User, APIKey.user_id == User.id)
.options(
joinedload(APIKey.user),
joinedload(User.api_keys)
)
.where(APIKey.key_prefix == key_prefix)
.where(APIKey.is_active == True)
)
return context
result = await db.execute(stmt)
api_key_user = result.first()
if not api_key_user:
logger.debug(f"API key not found in database for prefix: {key_prefix}")
return None
api_key, user = api_key_user
# Cache for future requests
await self._cache_api_key_data(key_prefix, api_key, user)
return {
"api_key": api_key,
"user": user,
"api_key_id": api_key.id
}
except Exception as e:
logger.error(f"Error in cached API key lookup for {key_prefix}: {e}")
# Fallback to database
return await self._fetch_from_database(key_prefix, db)
logger.error(f"Error retrieving API key for prefix {key_prefix}: {e}")
return None
async def _get_full_api_key_from_db(self, key_prefix: str, db: AsyncSession) -> Optional[APIKey]:
"""Helper to get full API key object from database"""
stmt = select(APIKey).where(APIKey.key_prefix == key_prefix)
result = await db.execute(stmt)
return result.scalar_one_or_none()
async def _fetch_from_database(self, key_prefix: str, db: AsyncSession) -> Optional[Dict[str, Any]]:
"""Fetch API key and user data from database with optimized query"""
async def _cache_api_key_data(self, key_prefix: str, api_key: APIKey, user: User):
"""Cache API key and user data"""
try:
# Optimized query with joinedload to eliminate N+1 query problem
stmt = select(APIKey).options(
joinedload(APIKey.user)
).where(APIKey.key_prefix == key_prefix)
# Serialize data for caching
cache_data = {
"api_key_data": {
"id": api_key.id,
"name": api_key.name,
"key_hash": api_key.key_hash,
"key_prefix": api_key.key_prefix,
"user_id": api_key.user_id,
"is_active": api_key.is_active,
"permissions": api_key.permissions,
"scopes": api_key.scopes,
"rate_limit_per_minute": api_key.rate_limit_per_minute,
"rate_limit_per_hour": api_key.rate_limit_per_hour,
"rate_limit_per_day": api_key.rate_limit_per_day,
"allowed_models": api_key.allowed_models,
"allowed_endpoints": api_key.allowed_endpoints,
"allowed_ips": api_key.allowed_ips,
"description": api_key.description,
"tags": api_key.tags,
"created_at": api_key.created_at.isoformat() if api_key.created_at else None,
"updated_at": api_key.updated_at.isoformat() if api_key.updated_at else None,
"last_used_at": api_key.last_used_at.isoformat() if api_key.last_used_at else None,
"expires_at": api_key.expires_at.isoformat() if api_key.expires_at else None,
"total_requests": api_key.total_requests,
"total_tokens": api_key.total_tokens,
"total_cost": api_key.total_cost,
"is_unlimited": api_key.is_unlimited,
"budget_limit_cents": api_key.budget_limit_cents,
"budget_type": api_key.budget_type,
"allowed_chatbots": api_key.allowed_chatbots
},
"user_data": {
"id": user.id,
"email": user.email,
"username": user.username,
"is_active": user.is_active,
"is_superuser": user.is_superuser,
"role": user.role,
"created_at": user.created_at.isoformat() if user.created_at else None,
"updated_at": user.updated_at.isoformat() if user.updated_at else None,
"last_login": user.last_login.isoformat() if user.last_login else None
},
"cached_at": datetime.utcnow().isoformat()
}
await core_cache.cache_api_key(key_prefix, cache_data, self.cache_ttl)
logger.debug(f"Cached API key data for prefix: {key_prefix}")
except Exception as e:
logger.error(f"Error caching API key data for prefix {key_prefix}: {e}")
async def verify_api_key_cached(self, api_key: str, key_prefix: str) -> Optional[bool]:
"""
Verify API key using cached hash to avoid expensive bcrypt operations
Returns: True if verified, False if invalid, None if not cached
"""
try:
# Check verification cache
cached_verification = await core_cache.get_cached_verification(key_prefix)
if cached_verification:
# Check if cache is still valid (within TTL)
cached_timestamp = datetime.fromisoformat(cached_verification["timestamp"])
if datetime.utcnow() - cached_timestamp < timedelta(seconds=self.verification_cache_ttl):
logger.debug(f"API key verification cache hit for prefix: {key_prefix}")
return cached_verification.get("is_valid", False)
return None # Not cached or expired
except Exception as e:
logger.error(f"Error checking verification cache for prefix {key_prefix}: {e}")
return None
async def cache_verification_result(self, api_key: str, key_prefix: str, key_hash: str, is_valid: bool):
"""Cache API key verification result to avoid expensive bcrypt operations"""
try:
await core_cache.cache_verification_result(api_key, key_prefix, key_hash, is_valid, self.verification_cache_ttl)
logger.debug(f"Cached verification result for prefix: {key_prefix}")
except Exception as e:
logger.error(f"Error caching verification result for prefix {key_prefix}: {e}")
async def invalidate_api_key_cache(self, key_prefix: str):
"""Invalidate cached API key data"""
try:
await core_cache.invalidate_api_key(key_prefix)
# Also invalidate verification cache
verification_keys = await core_cache.clear_pattern(f"verify:{key_prefix}*", prefix="auth")
logger.debug(f"Invalidated cache for API key prefix: {key_prefix}")
except Exception as e:
logger.error(f"Error invalidating cache for prefix {key_prefix}: {e}")
async def update_last_used(self, api_key_id: int, db: AsyncSession):
"""Update last used timestamp asynchronously for performance"""
try:
# Use core cache to track update requests to avoid database spam
cache_key = f"last_used_update:{api_key_id}"
# Check if we recently updated (within 5 minutes)
last_update = await core_cache.get(cache_key, prefix="perf")
if last_update:
return # Skip update if recent
# Update database
stmt = (
select(APIKey)
.where(APIKey.id == api_key_id)
)
result = await db.execute(stmt)
api_key = result.scalar_one_or_none()
if not api_key:
logger.warning(f"API key not found: {key_prefix}")
return None
if api_key:
api_key.last_used_at = datetime.utcnow()
await db.commit()
user = api_key.user
if not user or not user.is_active:
logger.warning(f"User not found or inactive for API key: {key_prefix}")
return None
# Cache that we updated to prevent spam
await core_cache.set(cache_key, datetime.utcnow().isoformat(), ttl=300, prefix="perf")
# Return the same structure as the original service
logger.debug(f"Updated last_used_at for API key {api_key_id}")
except Exception as e:
logger.error(f"Error updating last_used for API key {api_key_id}: {e}")
async def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache performance statistics"""
try:
core_stats = await core_cache.get_stats()
return {
"user_id": user.id,
"user_email": user.email,
"user_role": user.role,
"api_key_id": api_key.id,
"api_key_name": api_key.name,
"api_key": api_key,
"user": user,
"permissions": api_key.permissions,
"scopes": api_key.scopes,
"rate_limits": {
"per_minute": api_key.rate_limit_per_minute,
"per_hour": api_key.rate_limit_per_hour,
"per_day": api_key.rate_limit_per_day
}
"cache_backend": "core_cache",
"cache_enabled": core_stats.get("enabled", False),
"cache_stats": core_stats
}
except Exception as e:
logger.error(f"Error getting cache stats: {e}")
return {
"cache_backend": "core_cache",
"cache_enabled": False,
"error": str(e)
}
except Exception as e:
logger.error(f"Database error fetching API key {key_prefix}: {e}")
return None
async def verify_api_key_cached(self, api_key: str, key_prefix: str) -> bool:
"""Cache API key verification results to avoid repeated bcrypt operations"""
try:
redis = await self.get_redis()
# If Redis is not available, skip caching
if not redis:
logger.debug(f"Redis not available, skipping verification cache for {key_prefix}")
return False # Caller should handle full verification
# Create a hash of the key suffix for cache key (never store the actual key)
import hashlib
key_suffix = api_key[8:] if len(api_key) > 8 else api_key
key_suffix_hash = hashlib.sha256(key_suffix.encode()).hexdigest()[:16]
verification_cache_key = self._get_verification_cache_key(key_prefix, key_suffix_hash)
# Check verification cache
cached_result = await redis.get(verification_cache_key)
if cached_result:
logger.debug(f"API key verification cache hit for {key_prefix}")
return cached_result == "valid"
# Need to do actual verification - get the hash from database
# This should be called only after we've confirmed the key exists
logger.debug(f"API key verification cache miss for {key_prefix}")
return False # Caller should handle full verification
except Exception as e:
logger.warning(f"Error in verification cache for {key_prefix}: {e}")
return False
async def cache_verification_result(self, api_key: str, key_prefix: str, key_hash: str, is_valid: bool):
"""Cache the verification result to avoid future bcrypt operations"""
try:
# Only cache successful verifications and do actual verification
actual_valid = verify_api_key(api_key, key_hash)
if actual_valid != is_valid:
logger.warning(f"Verification mismatch for {key_prefix}")
return
if actual_valid:
redis = await self.get_redis()
# If Redis is not available, skip caching
if not redis:
logger.debug(f"Redis not available, skipping verification result cache for {key_prefix}")
return
# Create a hash of the key suffix for cache key
import hashlib
key_suffix = api_key[8:] if len(api_key) > 8 else api_key
key_suffix_hash = hashlib.sha256(key_suffix.encode()).hexdigest()[:16]
verification_cache_key = self._get_verification_cache_key(key_prefix, key_suffix_hash)
# Cache successful verification
await redis.setex(verification_cache_key, self.verification_cache_ttl, "valid")
logger.debug(f"Cached verification result for {key_prefix}")
except Exception as e:
logger.warning(f"Error caching verification result for {key_prefix}: {e}")
async def invalidate_api_key_cache(self, key_prefix: str):
"""Invalidate cached data for an API key"""
try:
redis = await self.get_redis()
# If Redis is not available, skip invalidation
if not redis:
logger.debug(f"Redis not available, skipping cache invalidation for {key_prefix}")
return
cache_key = self._get_cache_key(key_prefix)
await redis.delete(cache_key)
# Also invalidate verification cache - get all verification keys for this prefix
pattern = f"api_key:verified:{key_prefix}:*"
keys = await redis.keys(pattern)
if keys:
await redis.delete(*keys)
logger.debug(f"Invalidated cache for API key {key_prefix}")
except Exception as e:
logger.warning(f"Error invalidating cache for {key_prefix}: {e}")
async def update_last_used(self, api_key_id: int, db: AsyncSession):
"""Update last used timestamp with write-through cache"""
try:
redis = await self.get_redis()
current_time = datetime.utcnow()
should_update = True
# If Redis is available, check if we've updated recently (avoid too frequent DB writes)
if redis:
cache_key = self._get_last_used_cache_key(api_key_id)
last_update = await redis.get(cache_key)
if last_update:
last_update_time = datetime.fromisoformat(last_update)
if current_time - last_update_time < timedelta(minutes=1):
# Skip update if last update was less than 1 minute ago
should_update = False
if should_update:
# Update database
stmt = select(APIKey).where(APIKey.id == api_key_id)
result = await db.execute(stmt)
api_key = result.scalar_one_or_none()
if api_key:
api_key.last_used_at = current_time
await db.commit()
# Update cache if Redis is available
if redis:
cache_key = self._get_last_used_cache_key(api_key_id)
await redis.setex(cache_key, 300, current_time.isoformat())
logger.debug(f"Updated last used timestamp for API key {api_key_id}")
except Exception as e:
logger.warning(f"Error updating last used timestamp for API key {api_key_id}: {e}")
# Global cached service instance
# Global instance
cached_api_key_service = CachedAPIKeyService()

View File

@@ -111,8 +111,16 @@ class ModuleManager:
# Load saved configurations
await module_config_manager.load_saved_configs()
# Filter out core infrastructure that shouldn't be pluggable modules
EXCLUDED_MODULES = ["cache"] # Cache is now core infrastructure
# Convert manifests to ModuleConfig objects
for name, manifest in discovered_manifests.items():
# Skip modules that are now core infrastructure
if name in EXCLUDED_MODULES:
logger.info(f"Skipping module '{name}' - now integrated as core infrastructure")
continue
saved_config = module_config_manager.get_module_config(name)
module_config = ModuleConfig(

View File

@@ -1,10 +0,0 @@
{
"interval": 30,
"alert_thresholds": {
"cpu_warning": 80,
"cpu_critical": 95,
"memory_warning": 85,
"memory_critical": 95
},
"retention_hours": 24
}

View File

@@ -1,6 +0,0 @@
"""
Cache module for Confidential Empire platform
"""
from .main import CacheModule
__all__ = ["CacheModule"]

View File

@@ -1,281 +0,0 @@
"""
Cache module implementation with Redis backend
"""
import asyncio
import json
import logging
from typing import Any, Dict, Optional, Union
from datetime import datetime, timedelta
import redis.asyncio as redis
from redis.asyncio import Redis
from contextlib import asynccontextmanager
from app.core.config import settings
from app.core.logging import log_module_event
logger = logging.getLogger(__name__)
class CacheModule:
"""Redis-based cache module for request/response caching"""
def __init__(self):
self.redis_client: Optional[Redis] = None
self.config: Dict[str, Any] = {}
self.enabled = False
self.stats = {
"hits": 0,
"misses": 0,
"errors": 0,
"total_requests": 0
}
async def initialize(self):
"""Initialize the cache module"""
try:
# Initialize Redis connection
redis_url = getattr(settings, 'REDIS_URL', 'redis://localhost:6379/0')
self.redis_client = redis.from_url(
redis_url,
encoding="utf-8",
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True
)
# Test connection
await self.redis_client.ping()
self.enabled = True
log_module_event("cache", "initialized", {
"provider": self.config.get("provider", "redis"),
"ttl": self.config.get("ttl", 3600),
"max_size": self.config.get("max_size", 10000)
})
except Exception as e:
logger.error(f"Failed to initialize cache module: {e}")
log_module_event("cache", "initialization_failed", {"error": str(e)})
self.enabled = False
raise
async def cleanup(self):
"""Cleanup cache resources"""
if self.redis_client:
await self.redis_client.close()
self.redis_client = None
self.enabled = False
log_module_event("cache", "cleanup", {"success": True})
def _get_cache_key(self, key: str, prefix: str = "ce") -> str:
"""Generate cache key with prefix"""
return f"{prefix}:{key}"
async def get(self, key: str, default: Any = None) -> Any:
"""Get value from cache"""
if not self.enabled:
return default
try:
cache_key = self._get_cache_key(key)
value = await self.redis_client.get(cache_key)
if value is None:
self.stats["misses"] += 1
return default
self.stats["hits"] += 1
self.stats["total_requests"] += 1
# Try to deserialize JSON
try:
return json.loads(value)
except json.JSONDecodeError:
return value
except Exception as e:
logger.error(f"Cache get error: {e}")
self.stats["errors"] += 1
return default
async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
"""Set value in cache"""
if not self.enabled:
return False
try:
cache_key = self._get_cache_key(key)
ttl = ttl or self.config.get("ttl", 3600)
# Serialize complex objects as JSON
if isinstance(value, (dict, list, tuple)):
value = json.dumps(value)
await self.redis_client.setex(cache_key, ttl, value)
return True
except Exception as e:
logger.error(f"Cache set error: {e}")
self.stats["errors"] += 1
return False
async def delete(self, key: str) -> bool:
"""Delete key from cache"""
if not self.enabled:
return False
try:
cache_key = self._get_cache_key(key)
result = await self.redis_client.delete(cache_key)
return result > 0
except Exception as e:
logger.error(f"Cache delete error: {e}")
self.stats["errors"] += 1
return False
async def exists(self, key: str) -> bool:
"""Check if key exists in cache"""
if not self.enabled:
return False
try:
cache_key = self._get_cache_key(key)
return await self.redis_client.exists(cache_key) > 0
except Exception as e:
logger.error(f"Cache exists error: {e}")
self.stats["errors"] += 1
return False
async def clear_pattern(self, pattern: str) -> int:
"""Clear keys matching pattern"""
if not self.enabled:
return 0
try:
cache_pattern = self._get_cache_key(pattern)
keys = await self.redis_client.keys(cache_pattern)
if keys:
return await self.redis_client.delete(*keys)
return 0
except Exception as e:
logger.error(f"Cache clear pattern error: {e}")
self.stats["errors"] += 1
return 0
async def clear_all(self) -> bool:
"""Clear all cache entries"""
if not self.enabled:
return False
try:
await self.redis_client.flushdb()
return True
except Exception as e:
logger.error(f"Cache clear all error: {e}")
self.stats["errors"] += 1
return False
async def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
stats = self.stats.copy()
if self.enabled:
try:
info = await self.redis_client.info()
stats.update({
"redis_memory_used": info.get("used_memory_human", "N/A"),
"redis_connected_clients": info.get("connected_clients", 0),
"redis_total_commands": info.get("total_commands_processed", 0),
"hit_rate": round(
(stats["hits"] / stats["total_requests"]) * 100, 2
) if stats["total_requests"] > 0 else 0
})
except Exception as e:
logger.error(f"Error getting Redis stats: {e}")
return stats
async def pre_request_interceptor(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Pre-request interceptor for caching"""
if not self.enabled:
return context
request = context.get("request")
if not request:
return context
# Only cache GET requests
if request.method != "GET":
return context
# Generate cache key from request
cache_key = f"request:{request.method}:{request.url.path}"
if request.query_params:
cache_key += f":{hash(str(request.query_params))}"
# Check if cached response exists
cached_response = await self.get(cache_key)
if cached_response:
log_module_event("cache", "hit", {"cache_key": cache_key})
context["cached_response"] = cached_response
context["cache_hit"] = True
else:
log_module_event("cache", "miss", {"cache_key": cache_key})
context["cache_key"] = cache_key
context["cache_hit"] = False
return context
async def post_response_interceptor(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Post-response interceptor for caching"""
if not self.enabled:
return context
# Skip if this was a cache hit
if context.get("cache_hit"):
return context
cache_key = context.get("cache_key")
response = context.get("response")
if cache_key and response and response.status_code == 200:
# Cache successful responses
cache_data = {
"status_code": response.status_code,
"headers": dict(response.headers),
"body": response.body.decode() if hasattr(response, 'body') else None,
"timestamp": datetime.utcnow().isoformat()
}
await self.set(cache_key, cache_data)
log_module_event("cache", "stored", {"cache_key": cache_key})
return context
# Global cache instance
cache_module = CacheModule()
# Module interface functions
async def initialize():
"""Initialize cache module"""
await cache_module.initialize()
async def cleanup():
"""Cleanup cache module"""
await cache_module.cleanup()
async def pre_request_interceptor(context: Dict[str, Any]) -> Dict[str, Any]:
"""Pre-request interceptor"""
return await cache_module.pre_request_interceptor(context)
async def post_response_interceptor(context: Dict[str, Any]) -> Dict[str, Any]:
"""Post-response interceptor"""
return await cache_module.post_response_interceptor(context)# Force reload
# Trigger reload

View File

@@ -0,0 +1,109 @@
name: rag
version: 1.0.0
description: "Document search, retrieval, and vector storage"
author: "Enclava Team"
category: "ai"
# Module lifecycle
enabled: true
auto_start: true
dependencies: []
optional_dependencies:
- cache
# Module capabilities
provides:
- "document_storage"
- "semantic_search"
- "vector_embeddings"
- "document_processing"
- "workflow_rag_step"
consumes:
- "qdrant_connection"
- "llm_embeddings"
- "document_parsing"
# API endpoints
endpoints:
- path: "/rag/collections"
method: "GET"
description: "List document collections"
- path: "/rag/upload"
method: "POST"
description: "Upload and process documents"
- path: "/rag/search"
method: "POST"
description: "Semantic search in documents"
- path: "/rag/collections/{collection_id}/documents"
method: "GET"
description: "List documents in collection"
# Workflow integration
workflow_steps:
- name: "rag_search"
description: "Search documents for relevant context"
inputs:
- name: "query"
type: "string"
required: true
description: "Search query"
- name: "collection_id"
type: "string"
required: true
description: "Document collection to search"
- name: "top_k"
type: "integer"
required: false
default: 5
description: "Number of top results to return"
outputs:
- name: "results"
type: "array"
description: "Search results with content and metadata"
- name: "context"
type: "string"
description: "Combined context from top results"
# UI Configuration
ui_config:
icon: "search"
color: "#8B5CF6"
category: "AI & ML"
forms:
- name: "collection_config"
title: "Collection Settings"
fields: ["name", "description", "embedding_model"]
- name: "search_config"
title: "Search Configuration"
fields: ["top_k", "similarity_threshold", "rerank_enabled"]
# Permissions
permissions:
- name: "rag.create"
description: "Create document collections"
- name: "rag.upload"
description: "Upload documents to collections"
- name: "rag.search"
description: "Search document collections"
- name: "rag.manage"
description: "Manage all collections (admin)"
# Health checks
health_checks:
- name: "qdrant_connectivity"
description: "Check Qdrant vector database connection"
- name: "embeddings_service"
description: "Check LLM embeddings service"
- name: "document_processing"
description: "Check document parsing capabilities"

View File

@@ -0,0 +1,145 @@
name: workflow
version: 1.0.0
description: "Multi-step automation processes"
author: "Enclava Team"
category: "automation"
# Module lifecycle
enabled: true
auto_start: true
dependencies: []
optional_dependencies:
- rag
- chatbot
- cache
# Module capabilities
provides:
- "workflow_execution"
- "step_orchestration"
- "automation_triggers"
- "workflow_templates"
consumes:
- "llm_completion"
- "rag_search"
- "chatbot_response"
- "external_apis"
# API endpoints
endpoints:
- path: "/workflow/templates"
method: "GET"
description: "List available workflow templates"
- path: "/workflow/create"
method: "POST"
description: "Create new workflow"
- path: "/workflow/execute"
method: "POST"
description: "Execute workflow"
- path: "/workflow/status/{workflow_id}"
method: "GET"
description: "Get workflow execution status"
- path: "/workflow/history"
method: "GET"
description: "Get workflow execution history"
# Workflow integration
workflow_steps:
- name: "conditional_branch"
description: "Conditional logic branching"
inputs:
- name: "condition"
type: "string"
required: true
description: "Condition to evaluate"
- name: "true_path"
type: "object"
required: true
description: "Steps to execute if condition is true"
- name: "false_path"
type: "object"
required: false
description: "Steps to execute if condition is false"
outputs:
- name: "result"
type: "object"
description: "Result from executed branch"
- name: "loop_iteration"
description: "Iterative processing loop"
inputs:
- name: "items"
type: "array"
required: true
description: "Items to iterate over"
- name: "steps"
type: "object"
required: true
description: "Steps to execute for each item"
outputs:
- name: "results"
type: "array"
description: "Results from each iteration"
# UI Configuration
ui_config:
icon: "workflow"
color: "#06B6D4"
category: "Automation"
forms:
- name: "workflow_config"
title: "Workflow Settings"
fields: ["name", "description", "trigger_type"]
- name: "step_config"
title: "Step Configuration"
fields: ["step_type", "parameters", "retry_attempts"]
- name: "scheduling"
title: "Scheduling & Triggers"
fields: ["schedule", "webhook_triggers", "event_triggers"]
# Permissions
permissions:
- name: "workflow.create"
description: "Create new workflows"
- name: "workflow.execute"
description: "Execute workflows"
- name: "workflow.configure"
description: "Configure workflow settings"
- name: "workflow.manage"
description: "Manage all workflows (admin)"
# Analytics events
analytics_events:
- name: "workflow_created"
description: "New workflow template created"
- name: "workflow_executed"
description: "Workflow execution started"
- name: "workflow_completed"
description: "Workflow execution completed"
- name: "workflow_failed"
description: "Workflow execution failed"
# Health checks
health_checks:
- name: "execution_engine"
description: "Check workflow execution engine"
- name: "step_dependencies"
description: "Check availability of workflow step dependencies"
- name: "template_validation"
description: "Validate workflow templates"

View File

@@ -0,0 +1,13 @@
"""
Zammad Integration Module for Enclava Platform
AI-powered ticket summarization for Zammad ticketing system.
Replaces Ollama with Enclava's chatbot system for enhanced security and flexibility.
"""
from .main import ZammadModule
__version__ = "1.0.0"
__author__ = "Enclava Platform"
__all__ = ["ZammadModule"]

View File

@@ -0,0 +1,972 @@
"""
Zammad Integration Module for Enclava Platform
AI-powered ticket summarization using Enclava's chatbot system instead of Ollama.
Provides secure, configurable integration with Zammad ticketing systems.
"""
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional, Tuple
from urllib.parse import urljoin
import aiohttp
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, or_
from sqlalchemy.orm import selectinload
from app.services.base_module import BaseModule, Permission, ModuleHealth
from app.core.config import settings
from app.db.database import async_session_factory
from app.models.user import User
from app.models.chatbot import ChatbotInstance
from app.services.litellm_client import LiteLLMClient
from cryptography.fernet import Fernet
import base64
import os
# Import our module-specific models
from .models import (
ZammadTicket,
ZammadProcessingLog,
ZammadConfiguration,
TicketState,
ProcessingStatus
)
logger = logging.getLogger(__name__)
class ZammadModule(BaseModule):
"""Zammad Integration Module for AI-powered ticket summarization"""
def __init__(self, config: Dict[str, Any] = None):
super().__init__("zammad", config)
self.name = "Zammad Integration"
self.description = "AI-powered ticket summarization for Zammad ticketing system"
self.version = "1.0.0"
# Core services
self.llm_client = None
self.session_pool = None
# Processing state
self.auto_process_task = None
self.processing_lock = asyncio.Lock()
# Initialize encryption for API tokens
self._init_encryption()
async def initialize(self) -> None:
"""Initialize the Zammad module"""
try:
logger.info("Initializing Zammad module...")
# Initialize LLM client for chatbot integration
self.llm_client = LiteLLMClient()
# Create HTTP session pool for Zammad API calls
timeout = aiohttp.ClientTimeout(total=60, connect=10)
self.session_pool = aiohttp.ClientSession(
timeout=timeout,
headers={
"Content-Type": "application/json",
"User-Agent": "Enclava-Zammad-Integration/1.0.0"
}
)
# Verify database tables exist (they should be created by migration)
await self._verify_database_tables()
# Start auto-processing if enabled
await self._start_auto_processing()
self.initialized = True
self.health.status = "healthy"
self.health.message = "Zammad module initialized successfully"
logger.info("Zammad module initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Zammad module: {e}")
self.health.status = "error"
self.health.message = f"Initialization failed: {str(e)}"
raise
async def cleanup(self) -> None:
"""Cleanup module resources"""
try:
logger.info("Cleaning up Zammad module...")
# Stop auto-processing
if self.auto_process_task and not self.auto_process_task.done():
self.auto_process_task.cancel()
try:
await self.auto_process_task
except asyncio.CancelledError:
pass
# Close HTTP session
if self.session_pool and not self.session_pool.closed:
await self.session_pool.close()
self.initialized = False
logger.info("Zammad module cleanup completed")
except Exception as e:
logger.error(f"Error during Zammad module cleanup: {e}")
def get_required_permissions(self) -> List[Permission]:
"""Return list of permissions this module requires"""
return [
Permission("zammad", "read", "Read Zammad tickets and configurations"),
Permission("zammad", "write", "Create and update Zammad ticket summaries"),
Permission("zammad", "configure", "Configure Zammad integration settings"),
Permission("chatbot", "use", "Use chatbot for AI summarization"),
]
async def process_request(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Process a module request"""
action = request.get("action", "unknown")
user_id = context.get("user_id")
logger.info(f"Processing Zammad request: {action} for user {user_id}")
# Route to appropriate handler based on action
if action == "process_tickets":
return await self._handle_process_tickets(request, context)
elif action == "get_ticket_summary":
return await self._handle_get_ticket_summary(request, context)
elif action == "process_single_ticket":
return await self._handle_process_single_ticket(request, context)
elif action == "get_status":
return await self._handle_get_status(request, context)
elif action == "get_configurations":
return await self._handle_get_configurations(request, context)
elif action == "save_configuration":
return await self._handle_save_configuration(request, context)
elif action == "test_connection":
return await self._handle_test_connection(request, context)
else:
raise ValueError(f"Unknown action: {action}")
async def _handle_process_tickets(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle batch ticket processing request"""
async with self.processing_lock:
user_id = context.get("user_id")
config_id = request.get("config_id")
filters = request.get("filters", {})
# Get user configuration
config = await self._get_user_configuration(user_id, config_id)
if not config:
raise ValueError("Configuration not found")
# Create processing batch
batch_id = str(uuid.uuid4())
# Start processing
result = await self._process_tickets_batch(
config=config,
batch_id=batch_id,
user_id=user_id,
filters=filters
)
return {
"batch_id": batch_id,
"status": "completed",
"result": result
}
async def _handle_get_ticket_summary(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get ticket summary request"""
ticket_id = request.get("ticket_id")
if not ticket_id:
raise ValueError("ticket_id is required")
async with async_session_factory() as db:
# Get ticket from database
stmt = select(ZammadTicket).where(ZammadTicket.zammad_ticket_id == ticket_id)
result = await db.execute(stmt)
ticket = result.scalar_one_or_none()
if not ticket:
return {"error": "Ticket not found", "ticket_id": ticket_id}
return {"ticket": ticket.to_dict()}
async def _handle_process_single_ticket(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle single ticket processing request"""
user_id = context.get("user_id")
ticket_id = request.get("ticket_id")
config_id = request.get("config_id")
if not ticket_id:
raise ValueError("ticket_id is required")
# Get user configuration
config = await self._get_user_configuration(user_id, config_id)
if not config:
raise ValueError("Configuration not found")
# Process single ticket
result = await self._process_single_ticket(config, ticket_id, user_id)
return {"ticket_id": ticket_id, "result": result}
async def _handle_get_status(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get module status request"""
user_id = context.get("user_id")
async with async_session_factory() as db:
# Import func for count queries
from sqlalchemy import func
# Get processing statistics - use func.count() to get actual counts
total_tickets_result = await db.scalar(
select(func.count(ZammadTicket.id)).where(ZammadTicket.processed_by_user_id == user_id)
)
total_tickets = total_tickets_result or 0
processed_tickets_result = await db.scalar(
select(func.count(ZammadTicket.id)).where(
and_(
ZammadTicket.processed_by_user_id == user_id,
ZammadTicket.processing_status == ProcessingStatus.COMPLETED.value
)
)
)
processed_tickets = processed_tickets_result or 0
failed_tickets_result = await db.scalar(
select(func.count(ZammadTicket.id)).where(
and_(
ZammadTicket.processed_by_user_id == user_id,
ZammadTicket.processing_status == ProcessingStatus.FAILED.value
)
)
)
failed_tickets = failed_tickets_result or 0
# Get recent processing logs
recent_logs = await db.execute(
select(ZammadProcessingLog)
.where(ZammadProcessingLog.initiated_by_user_id == user_id)
.order_by(ZammadProcessingLog.started_at.desc())
.limit(10)
)
logs = [log.to_dict() for log in recent_logs.scalars()]
return {
"module_health": self.get_health().__dict__,
"module_metrics": self.get_metrics().__dict__,
"statistics": {
"total_tickets": total_tickets,
"processed_tickets": processed_tickets,
"failed_tickets": failed_tickets,
"success_rate": (processed_tickets / max(total_tickets, 1)) * 100 if total_tickets > 0 else 0
},
"recent_processing_logs": logs
}
async def _handle_get_configurations(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get configurations request"""
user_id = context.get("user_id")
async with async_session_factory() as db:
stmt = (
select(ZammadConfiguration)
.where(ZammadConfiguration.user_id == user_id)
.where(ZammadConfiguration.is_active == True)
.order_by(ZammadConfiguration.is_default.desc(), ZammadConfiguration.created_at.desc())
)
result = await db.execute(stmt)
configs = [config.to_dict() for config in result.scalars()]
return {"configurations": configs}
async def _handle_save_configuration(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle save configuration request"""
user_id = context.get("user_id")
config_data = request.get("configuration", {})
# Validate required fields
required_fields = ["name", "zammad_url", "api_token", "chatbot_id"]
for field in required_fields:
if not config_data.get(field):
raise ValueError(f"Required field missing: {field}")
async with async_session_factory() as db:
# Verify chatbot exists and user has access
chatbot_stmt = select(ChatbotInstance).where(
and_(
ChatbotInstance.id == config_data["chatbot_id"],
ChatbotInstance.created_by == str(user_id),
ChatbotInstance.is_active == True
)
)
chatbot = await db.scalar(chatbot_stmt)
if not chatbot:
raise ValueError("Chatbot not found or access denied")
# Encrypt API token
encrypted_token = self._encrypt_data(config_data["api_token"])
# Create new configuration
config = ZammadConfiguration(
user_id=user_id,
name=config_data["name"],
description=config_data.get("description"),
is_default=config_data.get("is_default", False),
zammad_url=config_data["zammad_url"].rstrip("/"),
api_token_encrypted=encrypted_token,
chatbot_id=config_data["chatbot_id"],
process_state=config_data.get("process_state", "open"),
max_tickets=config_data.get("max_tickets", 10),
skip_existing=config_data.get("skip_existing", True),
auto_process=config_data.get("auto_process", False),
process_interval=config_data.get("process_interval", 30),
summary_template=config_data.get("summary_template"),
custom_settings=config_data.get("custom_settings", {})
)
# If this is set as default, unset other defaults
if config.is_default:
await db.execute(
ZammadConfiguration.__table__.update()
.where(ZammadConfiguration.user_id == user_id)
.values(is_default=False)
)
db.add(config)
await db.commit()
await db.refresh(config)
return {"configuration": config.to_dict()}
async def _handle_test_connection(self, request: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Handle test Zammad connection request"""
zammad_url = request.get("zammad_url")
api_token = request.get("api_token")
if not zammad_url or not api_token:
raise ValueError("zammad_url and api_token are required")
result = await self._test_zammad_connection(zammad_url, api_token)
return result
async def _process_tickets_batch(self, config: ZammadConfiguration, batch_id: str, user_id: int, filters: Dict[str, Any]) -> Dict[str, Any]:
"""Process a batch of tickets"""
async with async_session_factory() as db:
# Create processing log
log = ZammadProcessingLog(
batch_id=batch_id,
initiated_by_user_id=user_id,
config_used=config.to_dict(),
filters_applied=filters,
status="running"
)
db.add(log)
await db.commit()
start_time = datetime.now(timezone.utc) # Keep as timezone-aware for calculations
try:
# Get tickets from Zammad
tickets = await self._fetch_zammad_tickets(config, filters)
log.tickets_found = len(tickets)
logger.info(f"Fetched {len(tickets)} tickets from Zammad for processing")
# Process each ticket
processed = 0
failed = 0
skipped = 0
for i, ticket_data in enumerate(tickets, 1):
try:
# Validate ticket data structure
if not isinstance(ticket_data, dict):
logger.error(f"Ticket {i} is not a dictionary: {type(ticket_data)}")
failed += 1
continue
ticket_id = ticket_data.get('id', f'unknown-{i}')
logger.info(f"Processing ticket {i}/{len(tickets)}: ID {ticket_id}")
logger.info(f"Ticket {i} data type: {type(ticket_data)}")
logger.info(f"Ticket {i} content: {str(ticket_data)[:300]}...")
result = await self._process_ticket_data(config, ticket_data, user_id)
if result["status"] == "processed":
processed += 1
elif result["status"] == "skipped":
skipped += 1
else:
failed += 1
except Exception as e:
# Safely get ticket ID for error reporting
ticket_id = ticket_data.get('id', f'unknown-{i}') if isinstance(ticket_data, dict) else f'unknown-{i}'
logger.error(f"Error processing ticket {ticket_id}: {e}")
logger.debug(f"Ticket data type: {type(ticket_data)}, content: {str(ticket_data)[:200]}...")
failed += 1
# Update log
end_time = datetime.now(timezone.utc)
processing_time = (end_time - start_time).total_seconds()
log.completed_at = self._to_naive_utc(end_time)
log.tickets_processed = processed
log.tickets_failed = failed
log.tickets_skipped = skipped
log.processing_time_seconds = int(processing_time)
log.average_time_per_ticket = int((processing_time / max(len(tickets), 1)) * 1000)
log.status = "completed"
await db.commit()
return {
"tickets_found": len(tickets),
"tickets_processed": processed,
"tickets_failed": failed,
"tickets_skipped": skipped,
"processing_time_seconds": int(processing_time)
}
except Exception as e:
# Update log with error
log.status = "failed"
log.errors_encountered = [str(e)]
log.completed_at = self._to_naive_utc(datetime.now(timezone.utc))
await db.commit()
raise
async def _process_single_ticket(self, config: ZammadConfiguration, ticket_id: int, user_id: int) -> Dict[str, Any]:
"""Process a single ticket"""
# Fetch ticket details from Zammad
ticket_data = await self._fetch_single_zammad_ticket(config, ticket_id)
if not ticket_data:
return {"status": "error", "message": "Ticket not found"}
# Process the ticket
result = await self._process_ticket_data(config, ticket_data, user_id)
return result
async def _process_ticket_data(self, config: ZammadConfiguration, ticket_data: Dict[str, Any], user_id: int) -> Dict[str, Any]:
"""Process individual ticket data"""
logger.info(f"Processing ticket data: type={type(ticket_data)}, keys={list(ticket_data.keys()) if isinstance(ticket_data, dict) else 'N/A'}")
# Ensure ticket_data is a dictionary
if not isinstance(ticket_data, dict):
raise ValueError(f"Expected ticket_data to be a dictionary, got {type(ticket_data)}")
ticket_id = ticket_data.get("id")
if ticket_id is None:
raise ValueError("Ticket data missing 'id' field")
# Debug nested field types that might be causing issues
logger.info(f"Ticket {ticket_id} customer field type: {type(ticket_data.get('customer'))}")
logger.info(f"Ticket {ticket_id} customer value: {ticket_data.get('customer')}")
logger.info(f"Ticket {ticket_id} group field type: {type(ticket_data.get('group'))}")
logger.info(f"Ticket {ticket_id} state field type: {type(ticket_data.get('state'))}")
logger.info(f"Ticket {ticket_id} priority field type: {type(ticket_data.get('priority'))}")
async with async_session_factory() as db:
# Check if ticket already exists and should be skipped
existing = await db.scalar(
select(ZammadTicket).where(ZammadTicket.zammad_ticket_id == ticket_id)
)
if existing and config.skip_existing and existing.processing_status == ProcessingStatus.COMPLETED.value:
return {"status": "skipped", "reason": "already_processed"}
try:
# Fetch full ticket details and conversation
logger.info(f"Ticket {ticket_id}: Fetching full ticket details with articles...")
try:
full_ticket = await self._fetch_ticket_with_articles(config, ticket_id)
if not full_ticket:
return {"status": "error", "message": "Could not fetch ticket details"}
logger.info(f"Ticket {ticket_id}: Successfully fetched full ticket details")
except Exception as e:
logger.error(f"Ticket {ticket_id}: Error fetching full ticket details: {e}")
raise
# Generate summary using chatbot
logger.info(f"Ticket {ticket_id}: Generating AI summary...")
try:
summary = await self._generate_ticket_summary(config, full_ticket)
logger.info(f"Ticket {ticket_id}: Successfully generated AI summary")
except Exception as e:
logger.error(f"Ticket {ticket_id}: Error generating summary: {e}")
raise
# Create/update ticket record
if existing:
ticket_record = existing
ticket_record.processing_status = ProcessingStatus.PROCESSING.value
else:
ticket_record = ZammadTicket(
zammad_ticket_id=ticket_id,
ticket_number=ticket_data.get("number"),
title=ticket_data.get("title"),
state=ticket_data.get("state"),
priority=ticket_data.get("priority"),
customer_email=self._safe_get_customer_email(ticket_data),
processing_status=ProcessingStatus.PROCESSING.value,
processed_by_user_id=user_id,
chatbot_id=config.chatbot_id
)
db.add(ticket_record)
# Update with summary and processing info
ticket_record.summary = summary
ticket_record.context_data = full_ticket
ticket_record.processed_at = self._to_naive_utc(datetime.now(timezone.utc))
ticket_record.processing_status = ProcessingStatus.COMPLETED.value
ticket_record.config_snapshot = config.to_dict()
# Safely parse Zammad timestamps and convert to naive UTC for DB
if ticket_data.get("created_at"):
parsed_dt = self._safe_parse_datetime(ticket_data["created_at"])
ticket_record.zammad_created_at = self._to_naive_utc(parsed_dt)
if ticket_data.get("updated_at"):
parsed_dt = self._safe_parse_datetime(ticket_data["updated_at"])
ticket_record.zammad_updated_at = self._to_naive_utc(parsed_dt)
ticket_record.zammad_article_count = len(full_ticket.get("articles", []))
await db.commit()
# Post summary to Zammad as internal note
await self._post_summary_to_zammad(config, ticket_id, summary)
return {"status": "processed", "summary": summary}
except Exception as e:
logger.error(f"Error processing ticket {ticket_id}: {e}")
# Update record with error
if existing:
ticket_record = existing
else:
ticket_record = ZammadTicket(
zammad_ticket_id=ticket_id,
ticket_number=ticket_data.get("number"),
title=ticket_data.get("title"),
state=ticket_data.get("state"),
processing_status=ProcessingStatus.FAILED.value,
processed_by_user_id=user_id,
chatbot_id=config.chatbot_id
)
db.add(ticket_record)
ticket_record.processing_status = ProcessingStatus.FAILED.value
ticket_record.error_message = str(e)
ticket_record.processed_at = self._to_naive_utc(datetime.now(timezone.utc))
await db.commit()
return {"status": "error", "message": str(e)}
async def _generate_ticket_summary(self, config: ZammadConfiguration, ticket_data: Dict[str, Any]) -> str:
"""Generate AI summary for ticket using Enclava chatbot"""
# Build context for the LLM
context = self._build_ticket_context(ticket_data)
# Get summary template
template = config.summary_template or (
"Generate a concise summary of this support ticket including key issues, "
"customer concerns, and any actions taken."
)
# Prepare messages for chatbot
messages = [
{
"role": "system",
"content": f"{template}\n\nPlease provide a professional summary that would be helpful for support agents."
},
{
"role": "user",
"content": context
}
]
# Generate summary using LLM client
response = await self.llm_client.create_chat_completion(
messages=messages,
model=await self._get_chatbot_model(config.chatbot_id),
user_id=str(config.user_id),
api_key_id=0, # Using 0 for module requests
temperature=0.3,
max_tokens=500
)
# Extract content from LiteLLM response
if "choices" in response and len(response["choices"]) > 0:
return response["choices"][0]["message"]["content"].strip()
return "Unable to generate summary."
def _build_ticket_context(self, ticket_data: Dict[str, Any]) -> str:
"""Build formatted context string for AI processing"""
context_parts = []
# Basic ticket information
context_parts.append(f"Ticket #{ticket_data.get('number', 'Unknown')}")
context_parts.append(f"Title: {ticket_data.get('title', 'No title')}")
context_parts.append(f"State: {ticket_data.get('state', 'Unknown')}")
if ticket_data.get('priority'):
context_parts.append(f"Priority: {ticket_data['priority']}")
customer_email = self._safe_get_customer_email(ticket_data)
if customer_email:
context_parts.append(f"Customer: {customer_email}")
# Add conversation history
articles = ticket_data.get('articles', [])
if articles:
context_parts.append("\nConversation History:")
for i, article in enumerate(articles[-10:], 1): # Last 10 articles
try:
# Safely extract article data
if not isinstance(article, dict):
logger.warning(f"Article {i} is not a dictionary: {type(article)}")
continue
sender = article.get('from', 'Unknown')
content = article.get('body', '').strip()
if content:
# Clean up HTML if present
if '<' in content and '>' in content:
import re
content = re.sub(r'<[^>]+>', '', content)
content = content.replace('&nbsp;', ' ')
content = content.replace('&amp;', '&')
content = content.replace('&lt;', '<')
content = content.replace('&gt;', '>')
# Truncate very long messages
if len(content) > 1000:
content = content[:1000] + "... [truncated]"
context_parts.append(f"\n{i}. From: {sender}")
context_parts.append(f" {content}")
except Exception as e:
logger.warning(f"Error processing article {i}: {e}")
continue
return "\n".join(context_parts)
async def _get_chatbot_model(self, chatbot_id: str) -> str:
"""Get the model name for the specified chatbot"""
async with async_session_factory() as db:
chatbot = await db.scalar(
select(ChatbotInstance).where(ChatbotInstance.id == chatbot_id)
)
if not chatbot:
raise ValueError(f"Chatbot {chatbot_id} not found")
# Default to a reasonable model if not specified
return getattr(chatbot, 'model', 'privatemode-llama-3-70b')
async def _fetch_zammad_tickets(self, config: ZammadConfiguration, filters: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Fetch tickets from Zammad API"""
# Decrypt API token
api_token = self._decrypt_data(config.api_token_encrypted)
url = urljoin(config.zammad_url, "/api/v1/tickets")
headers = {
"Authorization": f"Token token={api_token}",
"Content-Type": "application/json"
}
# Build query parameters
params = {
"expand": "true",
"per_page": filters.get("limit", config.max_tickets)
}
# Add state filter
state = filters.get("state", config.process_state)
if state and state != "all":
params["state"] = state
async with self.session_pool.get(url, headers=headers, params=params) as response:
if response.status == 200:
data = await response.json()
# Handle different Zammad API response formats
if isinstance(data, list):
# Zammad returned a list directly
tickets = data
logger.info(f"Zammad API returned list directly with {len(tickets)} tickets")
elif isinstance(data, dict):
# Zammad returned a dictionary with "tickets" key
tickets = data.get("tickets", [])
logger.info(f"Zammad API returned dict with {len(tickets)} tickets")
logger.debug(f"Zammad API response structure: keys={list(data.keys())}")
else:
logger.error(f"Unexpected Zammad API response type: {type(data)}")
raise Exception(f"Zammad API returned unexpected data type: {type(data)}")
# Validate that tickets is actually a list
if not isinstance(tickets, list):
logger.error(f"Expected tickets to be a list, got {type(tickets)}: {str(tickets)[:200]}...")
raise Exception(f"Zammad API returned invalid ticket data structure: expected list, got {type(tickets)}")
return tickets
else:
error_text = await response.text()
raise Exception(f"Zammad API error {response.status}: {error_text}")
async def _fetch_single_zammad_ticket(self, config: ZammadConfiguration, ticket_id: int) -> Optional[Dict[str, Any]]:
"""Fetch single ticket from Zammad API"""
api_token = self._decrypt_data(config.api_token_encrypted)
url = urljoin(config.zammad_url, f"/api/v1/tickets/{ticket_id}")
headers = {
"Authorization": f"Token token={api_token}",
"Content-Type": "application/json"
}
params = {"expand": "true"}
async with self.session_pool.get(url, headers=headers, params=params) as response:
if response.status == 200:
return await response.json()
elif response.status == 404:
return None
else:
error_text = await response.text()
raise Exception(f"Zammad API error {response.status}: {error_text}")
async def _fetch_ticket_with_articles(self, config: ZammadConfiguration, ticket_id: int) -> Optional[Dict[str, Any]]:
"""Fetch ticket with full conversation articles"""
# Get basic ticket info
ticket = await self._fetch_single_zammad_ticket(config, ticket_id)
if not ticket:
return None
# Fetch articles
api_token = self._decrypt_data(config.api_token_encrypted)
articles_url = urljoin(config.zammad_url, f"/api/v1/ticket_articles/by_ticket/{ticket_id}")
headers = {
"Authorization": f"Token token={api_token}",
"Content-Type": "application/json"
}
async with self.session_pool.get(articles_url, headers=headers) as response:
if response.status == 200:
articles_data = await response.json()
# Handle different Zammad articles API response formats
if isinstance(articles_data, list):
# Articles returned as list directly
articles = articles_data
logger.info(f"Articles API returned list directly with {len(articles)} articles for ticket {ticket_id}")
elif isinstance(articles_data, dict):
# Articles returned as dictionary with "articles" key
articles = articles_data.get("articles", [])
logger.info(f"Articles API returned dict with {len(articles)} articles for ticket {ticket_id}")
else:
logger.error(f"Unexpected articles API response type for ticket {ticket_id}: {type(articles_data)}")
articles = []
ticket["articles"] = articles
else:
logger.warning(f"Could not fetch articles for ticket {ticket_id}: {response.status}")
ticket["articles"] = []
return ticket
async def _post_summary_to_zammad(self, config: ZammadConfiguration, ticket_id: int, summary: str) -> bool:
"""Post AI summary as internal note to Zammad ticket"""
try:
api_token = self._decrypt_data(config.api_token_encrypted)
url = urljoin(config.zammad_url, "/api/v1/ticket_articles")
headers = {
"Authorization": f"Token token={api_token}",
"Content-Type": "application/json"
}
# Create internal note payload
article_data = {
"ticket_id": ticket_id,
"type": "note",
"internal": True, # This ensures only agents can see it
"subject": "AI Summary - Enclava",
"body": f"**AI-Generated Summary**\n\n{summary}\n\n---\n*Generated by Enclava AI at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC*"
}
async with self.session_pool.post(url, headers=headers, json=article_data) as response:
if response.status in (200, 201):
logger.info(f"Successfully posted AI summary to ticket {ticket_id}")
return True
else:
error_text = await response.text()
logger.error(f"Failed to post summary to ticket {ticket_id}: {response.status} - {error_text}")
return False
except Exception as e:
logger.error(f"Error posting summary to Zammad ticket {ticket_id}: {e}")
return False
async def _test_zammad_connection(self, zammad_url: str, api_token: str) -> Dict[str, Any]:
"""Test connection to Zammad instance"""
try:
url = urljoin(zammad_url.rstrip("/"), "/api/v1/users/me")
headers = {
"Authorization": f"Token token={api_token}",
"Content-Type": "application/json"
}
async with self.session_pool.get(url, headers=headers) as response:
if response.status == 200:
user_data = await response.json()
return {
"status": "success",
"message": "Connection successful",
"user": user_data.get("email", "Unknown"),
"zammad_version": response.headers.get("X-Zammad-Version", "Unknown")
}
else:
error_text = await response.text()
return {
"status": "error",
"message": f"Connection failed: HTTP {response.status}",
"details": error_text
}
except Exception as e:
return {
"status": "error",
"message": f"Connection error: {str(e)}"
}
async def _get_user_configuration(self, user_id: int, config_id: Optional[int] = None) -> Optional[ZammadConfiguration]:
"""Get user configuration by ID or default"""
async with async_session_factory() as db:
if config_id:
stmt = select(ZammadConfiguration).where(
and_(
ZammadConfiguration.id == config_id,
ZammadConfiguration.user_id == user_id,
ZammadConfiguration.is_active == True
)
)
else:
# Get default configuration
stmt = select(ZammadConfiguration).where(
and_(
ZammadConfiguration.user_id == user_id,
ZammadConfiguration.is_active == True,
ZammadConfiguration.is_default == True
)
).order_by(ZammadConfiguration.created_at.desc())
result = await db.execute(stmt)
return result.scalar_one_or_none()
async def _verify_database_tables(self):
"""Verify that required database tables exist"""
# This would be handled by Alembic migrations in production
# For now, just log that we expect the tables to exist
logger.info("Verifying database tables for Zammad module")
async def _start_auto_processing(self):
"""Start auto-processing task if any configurations have it enabled"""
# This would start a background task to periodically check for auto-process configs
# and process new tickets automatically
logger.info("Auto-processing monitoring not implemented yet")
pass
def _init_encryption(self):
"""Initialize encryption for API tokens"""
# Use a fixed key for demo - in production, this should be from environment
key = os.environ.get('ZAMMAD_ENCRYPTION_KEY', 'demo-key-for-zammad-tokens-12345678901234567890123456789012')
# Ensure key is exactly 32 bytes for Fernet
key = key.encode()[:32].ljust(32, b'0')
self.encryption_key = base64.urlsafe_b64encode(key)
self.cipher = Fernet(self.encryption_key)
def _encrypt_data(self, data: str) -> str:
"""Encrypt sensitive data"""
return self.cipher.encrypt(data.encode()).decode()
def _decrypt_data(self, encrypted_data: str) -> str:
"""Decrypt sensitive data"""
return self.cipher.decrypt(encrypted_data.encode()).decode()
def _safe_get_customer_email(self, ticket_data: Dict[str, Any]) -> Optional[str]:
"""Safely extract customer email from ticket data"""
try:
customer = ticket_data.get('customer')
if not customer:
return None
# Handle case where customer is a dictionary
if isinstance(customer, dict):
return customer.get('email')
# Handle case where customer is a list (sometimes Zammad returns a list)
elif isinstance(customer, list) and len(customer) > 0:
first_customer = customer[0]
if isinstance(first_customer, dict):
return first_customer.get('email')
# Handle case where customer is just the email string
elif isinstance(customer, str) and '@' in customer:
return customer
return None
except Exception as e:
logger.warning(f"Could not extract customer email from ticket data: {e}")
return None
def _safe_parse_datetime(self, datetime_str: str) -> Optional[datetime]:
"""Safely parse datetime string from Zammad API to timezone-aware datetime"""
if not datetime_str:
return None
try:
# Handle different Zammad datetime formats
if datetime_str.endswith('Z'):
# ISO format with Z suffix: "2025-08-20T12:07:28.857000Z"
return datetime.fromisoformat(datetime_str.replace("Z", "+00:00"))
elif '+' in datetime_str or '-' in datetime_str[-6:]:
# Already has timezone info: "2025-08-20T12:07:28.857000+00:00"
return datetime.fromisoformat(datetime_str)
else:
# No timezone info - assume UTC: "2025-08-20T12:07:28.857000"
dt = datetime.fromisoformat(datetime_str)
if dt.tzinfo is None:
# Make it timezone-aware as UTC
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception as e:
logger.warning(f"Could not parse datetime '{datetime_str}': {e}")
return None
def _to_naive_utc(self, dt: datetime) -> datetime:
"""Convert timezone-aware datetime to naive UTC for database storage"""
if dt is None:
return None
if dt.tzinfo is None:
# Already naive, assume it's UTC
return dt
# Convert to UTC and make naive
return dt.astimezone(timezone.utc).replace(tzinfo=None)

View File

@@ -0,0 +1,241 @@
"""
Database models for Zammad Integration Module
"""
from datetime import datetime
from typing import Optional, Dict, Any
from enum import Enum
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, JSON, ForeignKey, Index
from sqlalchemy.orm import relationship
from app.db.database import Base
class TicketState(str, Enum):
"""Zammad ticket state enumeration"""
NEW = "new"
OPEN = "open"
PENDING_REMINDER = "pending reminder"
PENDING_CLOSE = "pending close"
CLOSED = "closed"
MERGED = "merged"
REMOVED = "removed"
class ProcessingStatus(str, Enum):
"""Ticket processing status"""
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
class ZammadTicket(Base):
"""Model for tracking Zammad tickets and their processing status"""
__tablename__ = "zammad_tickets"
# Primary key
id = Column(Integer, primary_key=True, index=True)
# Zammad ticket information
zammad_ticket_id = Column(Integer, unique=True, index=True, nullable=False)
ticket_number = Column(String, index=True, nullable=False)
title = Column(String, nullable=False)
state = Column(String, nullable=False) # Zammad state
priority = Column(String, nullable=True)
customer_email = Column(String, nullable=True)
# Processing information
processing_status = Column(String, default=ProcessingStatus.PENDING.value, nullable=False)
processed_at = Column(DateTime, nullable=True)
processed_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True)
chatbot_id = Column(String, nullable=True)
# Summary and context
summary = Column(Text, nullable=True)
context_data = Column(JSON, nullable=True) # Original ticket data
error_message = Column(Text, nullable=True)
# Metadata
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
# Zammad specific metadata
zammad_created_at = Column(DateTime, nullable=True)
zammad_updated_at = Column(DateTime, nullable=True)
zammad_article_count = Column(Integer, default=0, nullable=False)
# Processing configuration snapshot
config_snapshot = Column(JSON, nullable=True) # Config used during processing
# Relationships
processed_by = relationship("User", foreign_keys=[processed_by_user_id])
# Indexes for better query performance
__table_args__ = (
Index("idx_zammad_tickets_status_created", "processing_status", "created_at"),
Index("idx_zammad_tickets_state_processed", "state", "processed_at"),
Index("idx_zammad_tickets_user_status", "processed_by_user_id", "processing_status"),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for API responses"""
return {
"id": self.id,
"zammad_ticket_id": self.zammad_ticket_id,
"ticket_number": self.ticket_number,
"title": self.title,
"state": self.state,
"priority": self.priority,
"customer_email": self.customer_email,
"processing_status": self.processing_status,
"processed_at": self.processed_at.isoformat() if self.processed_at else None,
"processed_by_user_id": self.processed_by_user_id,
"chatbot_id": self.chatbot_id,
"summary": self.summary,
"error_message": self.error_message,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"zammad_created_at": self.zammad_created_at.isoformat() if self.zammad_created_at else None,
"zammad_updated_at": self.zammad_updated_at.isoformat() if self.zammad_updated_at else None,
"zammad_article_count": self.zammad_article_count
}
class ZammadProcessingLog(Base):
"""Model for logging Zammad processing activities"""
__tablename__ = "zammad_processing_logs"
# Primary key
id = Column(Integer, primary_key=True, index=True)
# Processing batch information
batch_id = Column(String, index=True, nullable=False) # UUID for batch processing
started_at = Column(DateTime, default=datetime.utcnow, nullable=False)
completed_at = Column(DateTime, nullable=True)
initiated_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True)
# Processing configuration
config_used = Column(JSON, nullable=True)
filters_applied = Column(JSON, nullable=True) # State, limit, etc.
# Results
tickets_found = Column(Integer, default=0, nullable=False)
tickets_processed = Column(Integer, default=0, nullable=False)
tickets_failed = Column(Integer, default=0, nullable=False)
tickets_skipped = Column(Integer, default=0, nullable=False)
# Performance metrics
processing_time_seconds = Column(Integer, nullable=True)
average_time_per_ticket = Column(Integer, nullable=True) # milliseconds
# Error tracking
errors_encountered = Column(JSON, nullable=True) # List of error messages
status = Column(String, default="running", nullable=False) # running, completed, failed
# Relationships
initiated_by = relationship("User", foreign_keys=[initiated_by_user_id])
# Indexes
__table_args__ = (
Index("idx_processing_logs_batch_status", "batch_id", "status"),
Index("idx_processing_logs_user_started", "initiated_by_user_id", "started_at"),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for API responses"""
return {
"id": self.id,
"batch_id": self.batch_id,
"started_at": self.started_at.isoformat(),
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
"initiated_by_user_id": self.initiated_by_user_id,
"config_used": self.config_used,
"filters_applied": self.filters_applied,
"tickets_found": self.tickets_found,
"tickets_processed": self.tickets_processed,
"tickets_failed": self.tickets_failed,
"tickets_skipped": self.tickets_skipped,
"processing_time_seconds": self.processing_time_seconds,
"average_time_per_ticket": self.average_time_per_ticket,
"errors_encountered": self.errors_encountered,
"status": self.status
}
class ZammadConfiguration(Base):
"""Model for storing Zammad module configurations per user"""
__tablename__ = "zammad_configurations"
# Primary key
id = Column(Integer, primary_key=True, index=True)
# User association
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
# Configuration name and description
name = Column(String, nullable=False)
description = Column(Text, nullable=True)
is_default = Column(Boolean, default=False, nullable=False)
is_active = Column(Boolean, default=True, nullable=False)
# Zammad connection settings
zammad_url = Column(String, nullable=False)
api_token_encrypted = Column(String, nullable=False) # Encrypted API token
# Processing settings
chatbot_id = Column(String, nullable=False)
process_state = Column(String, default="open", nullable=False)
max_tickets = Column(Integer, default=10, nullable=False)
skip_existing = Column(Boolean, default=True, nullable=False)
auto_process = Column(Boolean, default=False, nullable=False)
process_interval = Column(Integer, default=30, nullable=False) # minutes
# Customization
summary_template = Column(Text, nullable=True)
custom_settings = Column(JSON, nullable=True) # Additional custom settings
# Metadata
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
last_used_at = Column(DateTime, nullable=True)
# Relationships
user = relationship("User", foreign_keys=[user_id])
# Indexes
__table_args__ = (
Index("idx_zammad_config_user_active", "user_id", "is_active"),
Index("idx_zammad_config_user_default", "user_id", "is_default"),
)
def to_dict(self, include_sensitive: bool = False) -> Dict[str, Any]:
"""Convert to dictionary for API responses"""
result = {
"id": self.id,
"user_id": self.user_id,
"name": self.name,
"description": self.description,
"is_default": self.is_default,
"is_active": self.is_active,
"zammad_url": self.zammad_url,
"chatbot_id": self.chatbot_id,
"process_state": self.process_state,
"max_tickets": self.max_tickets,
"skip_existing": self.skip_existing,
"auto_process": self.auto_process,
"process_interval": self.process_interval,
"summary_template": self.summary_template,
"custom_settings": self.custom_settings,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"last_used_at": self.last_used_at.isoformat() if self.last_used_at else None
}
if include_sensitive:
result["api_token_encrypted"] = self.api_token_encrypted
return result

View File

@@ -0,0 +1,72 @@
name: zammad
version: 1.0.0
description: "AI-powered ticket summarization for Zammad ticketing system"
author: "Enclava Team"
category: "integration"
# Module lifecycle
enabled: true
auto_start: true
dependencies:
- chatbot
optional_dependencies: []
# Module capabilities
provides:
- "ticket_summarization"
- "zammad_integration"
- "batch_processing"
consumes:
- "chatbot_completion"
- "llm_completion"
# API endpoints
endpoints:
- path: "/zammad/configurations"
method: "GET"
description: "List Zammad configurations"
- path: "/zammad/configurations"
method: "POST"
description: "Create new Zammad configuration"
- path: "/zammad/test-connection"
method: "POST"
description: "Test Zammad connection"
- path: "/zammad/process"
method: "POST"
description: "Process tickets for summarization"
- path: "/zammad/status"
method: "GET"
description: "Get processing status and statistics"
# UI Configuration
ui_config:
icon: "ticket"
color: "#3B82F6"
category: "Integration"
# Permissions
permissions:
- name: "zammad.read"
description: "Read Zammad tickets and configurations"
- name: "zammad.write"
description: "Create and update Zammad ticket summaries"
- name: "zammad.configure"
description: "Configure Zammad integration settings"
- name: "chatbot.use"
description: "Use chatbot for AI summarization"
# Health checks
health_checks:
- name: "zammad_connectivity"
description: "Check Zammad API connection"
- name: "chatbot_availability"
description: "Check chatbot module availability"

View File

@@ -48,15 +48,18 @@ qdrant-client==1.7.0
# Text Processing
tiktoken==0.5.1
# NLP and Content Processing (required for RAG module with integrated content processing)
nltk==3.8.1
spacy==3.7.2
# Basic document processing (lightweight)
markitdown==0.0.1a2
python-docx==1.1.0
sentence-transformers==2.6.1
# Optional heavy ML dependencies (commented out for lighter deployments)
# transformers==4.35.2
# Advanced NLP processing (OPTIONAL - only for entity extraction)
# Install requirements-nlp.txt separately if you need these features:
# nltk==3.8.1
# spacy==3.7.2
# Heavy ML dependencies (REMOVED - unused in codebase)
# sentence-transformers==2.6.1 # REMOVED - not used anywhere in codebase
# transformers==4.35.2 # REMOVED - already commented out
# Configuration
pyyaml==6.0.1

View File

@@ -17,7 +17,7 @@ class LiveModuleIntegrationTest:
self.client = httpx.AsyncClient(timeout=30.0)
async def test_all_modules_loaded(self):
"""Test that all 7 modules are loaded and operational"""
"""Test that all 5 modules are loaded and operational"""
print("🧪 Testing module loading...")
response = await self.client.get(f"{self.base_url}/api/v1/modules/")
@@ -27,12 +27,12 @@ class LiveModuleIntegrationTest:
print(f"✓ API Response: {response.status_code}")
print(f"✓ Total modules: {data['total']}")
# Verify we have all 7 modules
assert data["total"] >= 7, f"Expected at least 7 modules, got {data['total']}"
assert data["module_count"] >= 7
# Verify we have all 5 modules (updated after 2025-08-10 cleanup)
assert data["total"] >= 5, f"Expected at least 5 modules, got {data['total']}"
assert data["module_count"] >= 5
assert data["initialized"] is True
expected_modules = ['cache', 'analytics', 'rag', 'content', 'security', 'monitoring', 'config']
expected_modules = ['cache', 'chatbot', 'rag', 'signal', 'workflow']
loaded_modules = [mod["name"] for mod in data["modules"]]
for expected in expected_modules:

View File

@@ -1,6 +1,22 @@
name: enclava
services:
# Database migration service - runs once to apply migrations
enclava-migrate:
build:
context: ./backend
dockerfile: Dockerfile
environment:
- DATABASE_URL=postgresql://enclava_user:enclava_pass@enclava-postgres:5432/enclava_db
depends_on:
- enclava-postgres
command: ["/usr/local/bin/migrate.sh"]
volumes:
- ./backend:/app
networks:
- enclava-net
restart: "no" # Run once and exit
# Main application backend
enclava-backend:
build:
@@ -18,6 +34,7 @@ services:
- ADMIN_PASSWORD=${ADMIN_PASSWORD:-admin123}
- LOG_LLM_PROMPTS=${LOG_LLM_PROMPTS:-false}
depends_on:
- enclava-migrate
- enclava-postgres
- enclava-redis
- enclava-qdrant

View File

@@ -0,0 +1,41 @@
import { NextRequest, NextResponse } from 'next/server'
export async function GET(request: NextRequest) {
try {
// Extract authorization header from the incoming request
const authHeader = request.headers.get('authorization')
if (!authHeader) {
return NextResponse.json(
{ error: 'Authorization header required' },
{ status: 401 }
)
}
// Make request to backend Zammad chatbots endpoint
const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL
const url = `${baseUrl}/api/v1/zammad/chatbots`
const response = await fetch(url, {
method: 'GET',
headers: {
'Authorization': authHeader,
'Content-Type': 'application/json'
}
})
const data = await response.json()
if (!response.ok) {
return NextResponse.json(data, { status: response.status })
}
return NextResponse.json(data)
} catch (error) {
console.error('Error fetching Zammad chatbots:', error)
return NextResponse.json(
{ error: 'Failed to fetch Zammad chatbots' },
{ status: 500 }
)
}
}

View File

@@ -0,0 +1,87 @@
import { NextRequest, NextResponse } from 'next/server'
export async function PUT(request: NextRequest, { params }: { params: { id: string } }) {
try {
// Extract authorization header from the incoming request
const authHeader = request.headers.get('authorization')
if (!authHeader) {
return NextResponse.json(
{ error: 'Authorization header required' },
{ status: 401 }
)
}
const body = await request.json()
const configId = params.id
// Make request to backend Zammad configurations endpoint
const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL
const url = `${baseUrl}/api/v1/zammad/configurations/${configId}`
const response = await fetch(url, {
method: 'PUT',
headers: {
'Authorization': authHeader,
'Content-Type': 'application/json'
},
body: JSON.stringify(body)
})
const data = await response.json()
if (!response.ok) {
return NextResponse.json(data, { status: response.status })
}
return NextResponse.json(data)
} catch (error) {
console.error('Error updating Zammad configuration:', error)
return NextResponse.json(
{ error: 'Failed to update Zammad configuration' },
{ status: 500 }
)
}
}
export async function DELETE(request: NextRequest, { params }: { params: { id: string } }) {
try {
// Extract authorization header from the incoming request
const authHeader = request.headers.get('authorization')
if (!authHeader) {
return NextResponse.json(
{ error: 'Authorization header required' },
{ status: 401 }
)
}
const configId = params.id
// Make request to backend Zammad configurations endpoint
const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL
const url = `${baseUrl}/api/v1/zammad/configurations/${configId}`
const response = await fetch(url, {
method: 'DELETE',
headers: {
'Authorization': authHeader,
'Content-Type': 'application/json'
}
})
const data = await response.json()
if (!response.ok) {
return NextResponse.json(data, { status: response.status })
}
return NextResponse.json(data)
} catch (error) {
console.error('Error deleting Zammad configuration:', error)
return NextResponse.json(
{ error: 'Failed to delete Zammad configuration' },
{ status: 500 }
)
}
}

View File

@@ -0,0 +1,84 @@
import { NextRequest, NextResponse } from 'next/server'
export async function GET(request: NextRequest) {
try {
// Extract authorization header from the incoming request
const authHeader = request.headers.get('authorization')
if (!authHeader) {
return NextResponse.json(
{ error: 'Authorization header required' },
{ status: 401 }
)
}
// Make request to backend Zammad configurations endpoint
const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL
const url = `${baseUrl}/api/v1/zammad/configurations`
const response = await fetch(url, {
method: 'GET',
headers: {
'Authorization': authHeader,
'Content-Type': 'application/json'
}
})
const data = await response.json()
if (!response.ok) {
return NextResponse.json(data, { status: response.status })
}
return NextResponse.json(data)
} catch (error) {
console.error('Error fetching Zammad configurations:', error)
return NextResponse.json(
{ error: 'Failed to fetch Zammad configurations' },
{ status: 500 }
)
}
}
export async function POST(request: NextRequest) {
try {
// Extract authorization header from the incoming request
const authHeader = request.headers.get('authorization')
if (!authHeader) {
return NextResponse.json(
{ error: 'Authorization header required' },
{ status: 401 }
)
}
const body = await request.json()
// Make request to backend Zammad configurations endpoint
const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL
const url = `${baseUrl}/api/v1/zammad/configurations`
const response = await fetch(url, {
method: 'POST',
headers: {
'Authorization': authHeader,
'Content-Type': 'application/json'
},
body: JSON.stringify(body)
})
const data = await response.json()
if (!response.ok) {
return NextResponse.json(data, { status: response.status })
}
return NextResponse.json(data)
} catch (error) {
console.error('Error creating Zammad configuration:', error)
return NextResponse.json(
{ error: 'Failed to create Zammad configuration' },
{ status: 500 }
)
}
}

View File

@@ -0,0 +1,44 @@
import { NextRequest, NextResponse } from 'next/server'
export async function POST(request: NextRequest) {
try {
// Extract authorization header from the incoming request
const authHeader = request.headers.get('authorization')
if (!authHeader) {
return NextResponse.json(
{ error: 'Authorization header required' },
{ status: 401 }
)
}
const body = await request.json()
// Make request to backend Zammad process endpoint
const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL
const url = `${baseUrl}/api/v1/zammad/process`
const response = await fetch(url, {
method: 'POST',
headers: {
'Authorization': authHeader,
'Content-Type': 'application/json'
},
body: JSON.stringify(body)
})
const data = await response.json()
if (!response.ok) {
return NextResponse.json(data, { status: response.status })
}
return NextResponse.json(data)
} catch (error) {
console.error('Error processing Zammad tickets:', error)
return NextResponse.json(
{ error: 'Failed to process Zammad tickets' },
{ status: 500 }
)
}
}

View File

@@ -0,0 +1,51 @@
import { NextRequest, NextResponse } from 'next/server'
export async function GET(request: NextRequest) {
try {
// Extract authorization header from the incoming request
const authHeader = request.headers.get('authorization')
if (!authHeader) {
return NextResponse.json(
{ error: 'Authorization header required' },
{ status: 401 }
)
}
// Get query parameters
const { searchParams } = new URL(request.url)
const limit = searchParams.get('limit')
const offset = searchParams.get('offset')
// Build query string
const queryParams = new URLSearchParams()
if (limit) queryParams.set('limit', limit)
if (offset) queryParams.set('offset', offset)
// Make request to backend Zammad processing-logs endpoint
const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL
const url = `${baseUrl}/api/v1/zammad/processing-logs?${queryParams.toString()}`
const response = await fetch(url, {
method: 'GET',
headers: {
'Authorization': authHeader,
'Content-Type': 'application/json'
}
})
const data = await response.json()
if (!response.ok) {
return NextResponse.json(data, { status: response.status })
}
return NextResponse.json(data)
} catch (error) {
console.error('Error fetching Zammad processing logs:', error)
return NextResponse.json(
{ error: 'Failed to fetch Zammad processing logs' },
{ status: 500 }
)
}
}

View File

@@ -0,0 +1,41 @@
import { NextRequest, NextResponse } from 'next/server'
export async function GET(request: NextRequest) {
try {
// Extract authorization header from the incoming request
const authHeader = request.headers.get('authorization')
if (!authHeader) {
return NextResponse.json(
{ error: 'Authorization header required' },
{ status: 401 }
)
}
// Make request to backend Zammad status endpoint
const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL
const url = `${baseUrl}/api/v1/zammad/status`
const response = await fetch(url, {
method: 'GET',
headers: {
'Authorization': authHeader,
'Content-Type': 'application/json'
}
})
const data = await response.json()
if (!response.ok) {
return NextResponse.json(data, { status: response.status })
}
return NextResponse.json(data)
} catch (error) {
console.error('Error fetching Zammad status:', error)
return NextResponse.json(
{ error: 'Failed to fetch Zammad status' },
{ status: 500 }
)
}
}

View File

@@ -0,0 +1,44 @@
import { NextRequest, NextResponse } from 'next/server'
export async function POST(request: NextRequest) {
try {
// Extract authorization header from the incoming request
const authHeader = request.headers.get('authorization')
if (!authHeader) {
return NextResponse.json(
{ error: 'Authorization header required' },
{ status: 401 }
)
}
const body = await request.json()
// Make request to backend Zammad test-connection endpoint
const baseUrl = process.env.INTERNAL_API_URL || process.env.NEXT_PUBLIC_API_URL
const url = `${baseUrl}/api/v1/zammad/test-connection`
const response = await fetch(url, {
method: 'POST',
headers: {
'Authorization': authHeader,
'Content-Type': 'application/json'
},
body: JSON.stringify(body)
})
const data = await response.json()
if (!response.ok) {
return NextResponse.json(data, { status: response.status })
}
return NextResponse.json(data)
} catch (error) {
console.error('Error testing Zammad connection:', error)
return NextResponse.json(
{ error: 'Failed to test Zammad connection' },
{ status: 500 }
)
}
}

View File

@@ -317,6 +317,16 @@ function ModulesPageContent() {
Configure
</Button>
)}
{module.name === 'zammad' && (
<Button
size="sm"
variant="outline"
onClick={() => window.location.href = '/zammad'}
>
<Settings className="mr-2 h-3 w-3" />
Configure
</Button>
)}
</div>
{module.health && (

View File

@@ -0,0 +1,14 @@
"use client"
import { ProtectedRoute } from "@/components/auth/ProtectedRoute"
import { ZammadConfig } from "@/components/modules/ZammadConfig"
export default function ZammadPage() {
return (
<ProtectedRoute>
<div className="container mx-auto px-4 py-8">
<ZammadConfig />
</div>
</ProtectedRoute>
)
}

View File

@@ -0,0 +1,795 @@
"use client"
import { useState, useEffect } from "react"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
import { Button } from "@/components/ui/button"
import { Input } from "@/components/ui/input"
import { Label } from "@/components/ui/label"
import { Switch } from "@/components/ui/switch"
import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"
import { Textarea } from "@/components/ui/textarea"
import { Badge } from "@/components/ui/badge"
import { Alert, AlertDescription } from "@/components/ui/alert"
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"
import { Dialog, DialogContent, DialogDescription, DialogFooter, DialogHeader, DialogTitle, DialogTrigger } from "@/components/ui/dialog"
import { useToast } from "@/hooks/use-toast"
import {
Settings,
Save,
RefreshCw,
Ticket,
Bot,
Plus,
Edit,
Trash2,
TestTube,
Play,
History,
CheckCircle,
XCircle,
AlertTriangle,
Clock
} from "lucide-react"
interface ZammadConfiguration {
id?: number
name: string
description?: string
is_default: boolean
zammad_url: string
api_token: string
chatbot_id: string
process_state: string
max_tickets: number
skip_existing: boolean
auto_process: boolean
process_interval: number
summary_template?: string
custom_settings?: Record<string, any>
created_at?: string
updated_at?: string
last_used_at?: string
}
interface Chatbot {
id: string
name: string
chatbot_type: string
model: string
description?: string
}
interface ProcessingLog {
id: number
batch_id: string
started_at: string
completed_at?: string
tickets_found: number
tickets_processed: number
tickets_failed: number
tickets_skipped: number
processing_time_seconds?: number
status: string
}
interface ModuleStatus {
module_health: {
status: string
message: string
uptime: number
}
statistics: {
total_tickets: number
processed_tickets: number
failed_tickets: number
success_rate: number
}
}
export function ZammadConfig() {
const { toast } = useToast()
const [configurations, setConfigurations] = useState<ZammadConfiguration[]>([])
const [chatbots, setChatbots] = useState<Chatbot[]>([])
const [processingLogs, setProcessingLogs] = useState<ProcessingLog[]>([])
const [moduleStatus, setModuleStatus] = useState<ModuleStatus | null>(null)
const [loading, setLoading] = useState(true)
const [saving, setSaving] = useState(false)
const [testingConnection, setTestingConnection] = useState(false)
const [processing, setProcessing] = useState(false)
// Form states
const [isDialogOpen, setIsDialogOpen] = useState(false)
const [editingConfig, setEditingConfig] = useState<ZammadConfiguration | null>(null)
const [newConfig, setNewConfig] = useState<Partial<ZammadConfiguration>>({
name: "",
description: "",
is_default: false,
zammad_url: "http://localhost:8080",
api_token: "",
chatbot_id: "",
process_state: "open",
max_tickets: 10,
skip_existing: true,
auto_process: false,
process_interval: 30,
summary_template: "Generate a concise summary of this support ticket including key issues, customer concerns, and any actions taken."
})
useEffect(() => {
fetchData()
}, [])
const fetchData = async () => {
try {
setLoading(true)
await Promise.all([
fetchConfigurations(),
fetchChatbots(),
fetchProcessingLogs(),
fetchModuleStatus()
])
} catch (error) {
console.error("Error fetching Zammad data:", error)
toast({
title: "Error",
description: "Failed to load Zammad configuration",
variant: "destructive"
})
} finally {
setLoading(false)
}
}
const fetchConfigurations = async () => {
const response = await fetch("/api/v1/zammad/configurations", {
headers: {
'Authorization': `Bearer ${localStorage.getItem('token')}`,
}
})
if (response.ok) {
const data = await response.json()
setConfigurations(data.configurations || [])
}
}
const fetchChatbots = async () => {
const response = await fetch("/api/v1/zammad/chatbots", {
headers: {
'Authorization': `Bearer ${localStorage.getItem('token')}`,
}
})
if (response.ok) {
const data = await response.json()
setChatbots(data.chatbots || [])
}
}
const fetchProcessingLogs = async () => {
const response = await fetch("/api/v1/zammad/processing-logs?limit=5", {
headers: {
'Authorization': `Bearer ${localStorage.getItem('token')}`,
}
})
if (response.ok) {
const data = await response.json()
setProcessingLogs(data.logs || [])
}
}
const fetchModuleStatus = async () => {
const response = await fetch("/api/v1/zammad/status", {
headers: {
'Authorization': `Bearer ${localStorage.getItem('token')}`,
}
})
if (response.ok) {
const data = await response.json()
setModuleStatus(data)
}
}
const handleSaveConfiguration = async () => {
try {
setSaving(true)
const url = editingConfig
? `/api/v1/zammad/configurations/${editingConfig.id}`
: "/api/v1/zammad/configurations"
const method = editingConfig ? "PUT" : "POST"
const response = await fetch(url, {
method,
headers: {
"Content-Type": "application/json",
'Authorization': `Bearer ${localStorage.getItem('token')}`,
},
body: JSON.stringify(newConfig)
})
if (!response.ok) {
const errorData = await response.json()
throw new Error(errorData.detail || "Failed to save configuration")
}
toast({
title: "Success",
description: editingConfig
? "Configuration updated successfully"
: "Configuration created successfully"
})
setIsDialogOpen(false)
setEditingConfig(null)
setNewConfig({
name: "",
description: "",
is_default: false,
zammad_url: "http://localhost:8080",
api_token: "",
chatbot_id: "",
process_state: "open",
max_tickets: 10,
skip_existing: true,
auto_process: false,
process_interval: 30,
summary_template: "Generate a concise summary of this support ticket including key issues, customer concerns, and any actions taken."
})
await fetchConfigurations()
} catch (error) {
console.error("Error saving configuration:", error)
toast({
title: "Error",
description: error instanceof Error ? error.message : "Failed to save configuration",
variant: "destructive"
})
} finally {
setSaving(false)
}
}
const handleTestConnection = async () => {
if (!newConfig.zammad_url || !newConfig.api_token) {
toast({
title: "Error",
description: "Please enter Zammad URL and API token",
variant: "destructive"
})
return
}
try {
setTestingConnection(true)
const response = await fetch("/api/v1/zammad/test-connection", {
method: "POST",
headers: {
"Content-Type": "application/json",
'Authorization': `Bearer ${localStorage.getItem('token')}`,
},
body: JSON.stringify({
zammad_url: newConfig.zammad_url,
api_token: newConfig.api_token
})
})
const data = await response.json()
console.log("Test connection response:", data)
if (data.status === "success") {
toast({
title: "✅ Connection Successful",
description: `Connected to Zammad as ${data.user}`,
duration: 5000
})
} else {
toast({
title: "❌ Connection Failed",
description: data.message || "Unknown error occurred",
variant: "destructive",
duration: 8000
})
}
} catch (error) {
console.error("Error testing connection:", error)
toast({
title: "⚠️ Connection Test Error",
description: `Failed to test connection: ${error instanceof Error ? error.message : 'Unknown error'}`,
variant: "destructive",
duration: 8000
})
} finally {
setTestingConnection(false)
}
}
const handleProcessTickets = async (configId?: number) => {
try {
setProcessing(true)
const response = await fetch("/api/v1/zammad/process", {
method: "POST",
headers: {
"Content-Type": "application/json",
'Authorization': `Bearer ${localStorage.getItem('token')}`,
},
body: JSON.stringify({
config_id: configId,
filters: {}
})
})
if (!response.ok) {
const errorData = await response.json()
throw new Error(errorData.detail || "Failed to start processing")
}
const data = await response.json()
toast({
title: "Processing Started",
description: data.message || "Ticket processing has been started"
})
// Refresh logs after a short delay
setTimeout(() => {
fetchProcessingLogs()
fetchModuleStatus()
}, 2000)
} catch (error) {
console.error("Error processing tickets:", error)
toast({
title: "Error",
description: error instanceof Error ? error.message : "Failed to process tickets",
variant: "destructive"
})
} finally {
setProcessing(false)
}
}
const handleDeleteConfiguration = async (id: number) => {
try {
const response = await fetch(`/api/v1/zammad/configurations/${id}`, {
method: "DELETE",
headers: {
'Authorization': `Bearer ${localStorage.getItem('token')}`,
}
})
if (!response.ok) {
throw new Error("Failed to delete configuration")
}
toast({
title: "Success",
description: "Configuration deleted successfully"
})
await fetchConfigurations()
} catch (error) {
console.error("Error deleting configuration:", error)
toast({
title: "Error",
description: "Failed to delete configuration",
variant: "destructive"
})
}
}
const handleEditConfiguration = (config: ZammadConfiguration) => {
setEditingConfig(config)
setNewConfig({
...config,
api_token: "" // Don't pre-fill the API token for security
})
setIsDialogOpen(true)
}
const getStatusIcon = (status: string) => {
switch (status) {
case "completed":
return <CheckCircle className="h-4 w-4 text-green-500" />
case "failed":
return <XCircle className="h-4 w-4 text-red-500" />
case "running":
return <Clock className="h-4 w-4 text-blue-500" />
default:
return <AlertTriangle className="h-4 w-4 text-yellow-500" />
}
}
const getStatusBadge = (status: string) => {
const variants: Record<string, "default" | "secondary" | "destructive" | "outline"> = {
completed: "default",
failed: "destructive",
running: "secondary"
}
return <Badge variant={variants[status] || "outline"}>{status}</Badge>
}
if (loading) {
return (
<div className="container mx-auto px-4 py-8">
<div className="flex items-center justify-center min-h-[400px]">
<div className="animate-spin rounded-full h-12 w-12 border-b-2 border-empire-gold"></div>
</div>
</div>
)
}
return (
<div className="space-y-6">
{/* Header */}
<div className="flex items-center justify-between">
<div>
<h1 className="text-3xl font-bold flex items-center">
<Ticket className="mr-3 h-8 w-8" />
Zammad Integration
</h1>
<p className="text-muted-foreground">
AI-powered ticket summarization for Zammad ticketing systems
</p>
</div>
<div className="flex space-x-2">
<Button onClick={fetchData} variant="outline">
<RefreshCw className="mr-2 h-4 w-4" />
Refresh
</Button>
<Dialog open={isDialogOpen} onOpenChange={setIsDialogOpen}>
<DialogTrigger asChild>
<Button>
<Plus className="mr-2 h-4 w-4" />
Add Configuration
</Button>
</DialogTrigger>
<DialogContent className="max-w-2xl max-h-[80vh] overflow-y-auto">
<DialogHeader>
<DialogTitle>
{editingConfig ? "Edit Configuration" : "Add Zammad Configuration"}
</DialogTitle>
<DialogDescription>
Configure connection to your Zammad instance and processing settings
</DialogDescription>
</DialogHeader>
<div className="space-y-4">
{/* Basic Settings */}
<div className="grid grid-cols-2 gap-4">
<div>
<Label htmlFor="name">Configuration Name</Label>
<Input
id="name"
value={newConfig.name || ""}
onChange={(e) => setNewConfig({ ...newConfig, name: e.target.value })}
placeholder="My Zammad Instance"
/>
</div>
<div className="flex items-center space-x-2">
<Switch
checked={newConfig.is_default || false}
onCheckedChange={(checked) => setNewConfig({ ...newConfig, is_default: checked })}
/>
<Label>Default Configuration</Label>
</div>
</div>
<div>
<Label htmlFor="description">Description</Label>
<Input
id="description"
value={newConfig.description || ""}
onChange={(e) => setNewConfig({ ...newConfig, description: e.target.value })}
placeholder="Optional description"
/>
</div>
{/* Zammad Connection */}
<div className="space-y-4">
<h4 className="font-medium">Zammad Connection</h4>
<div className="grid grid-cols-2 gap-4">
<div>
<Label htmlFor="zammad_url">Zammad URL</Label>
<Input
id="zammad_url"
value={newConfig.zammad_url || ""}
onChange={(e) => setNewConfig({ ...newConfig, zammad_url: e.target.value })}
placeholder="http://localhost:8080"
/>
</div>
<div>
<Label htmlFor="api_token">API Token</Label>
<Input
id="api_token"
type="password"
value={newConfig.api_token || ""}
onChange={(e) => setNewConfig({ ...newConfig, api_token: e.target.value })}
placeholder="Your Zammad API token"
/>
</div>
</div>
<Button
onClick={handleTestConnection}
variant="outline"
disabled={testingConnection}
>
<TestTube className="mr-2 h-4 w-4" />
{testingConnection ? "Testing..." : "Test Connection"}
</Button>
</div>
{/* Chatbot Selection */}
<div>
<Label htmlFor="chatbot_id">Chatbot</Label>
<Select
value={newConfig.chatbot_id || ""}
onValueChange={(value) => setNewConfig({ ...newConfig, chatbot_id: value })}
>
<SelectTrigger>
<SelectValue placeholder="Select a chatbot" />
</SelectTrigger>
<SelectContent>
{chatbots.map((chatbot) => (
<SelectItem key={chatbot.id} value={chatbot.id}>
{chatbot.name} ({chatbot.chatbot_type})
</SelectItem>
))}
</SelectContent>
</Select>
</div>
{/* Processing Settings */}
<div className="space-y-4">
<h4 className="font-medium">Processing Settings</h4>
<div className="grid grid-cols-2 gap-4">
<div>
<Label htmlFor="process_state">Process State</Label>
<Select
value={newConfig.process_state || "open"}
onValueChange={(value) => setNewConfig({ ...newConfig, process_state: value })}
>
<SelectTrigger>
<SelectValue />
</SelectTrigger>
<SelectContent>
<SelectItem value="open">Open</SelectItem>
<SelectItem value="pending">Pending</SelectItem>
<SelectItem value="closed">Closed</SelectItem>
<SelectItem value="all">All</SelectItem>
</SelectContent>
</Select>
</div>
<div>
<Label htmlFor="max_tickets">Max Tickets Per Run</Label>
<Input
id="max_tickets"
type="number"
min="1"
max="100"
value={newConfig.max_tickets || 10}
onChange={(e) => setNewConfig({ ...newConfig, max_tickets: parseInt(e.target.value) })}
/>
</div>
</div>
<div className="grid grid-cols-2 gap-4">
<div className="flex items-center space-x-2">
<Switch
checked={newConfig.skip_existing || false}
onCheckedChange={(checked) => setNewConfig({ ...newConfig, skip_existing: checked })}
/>
<Label>Skip Existing Summaries</Label>
</div>
<div className="flex items-center space-x-2">
<Switch
checked={newConfig.auto_process || false}
onCheckedChange={(checked) => setNewConfig({ ...newConfig, auto_process: checked })}
/>
<Label>Auto Process</Label>
</div>
</div>
{newConfig.auto_process && (
<div>
<Label htmlFor="process_interval">Process Interval (minutes)</Label>
<Input
id="process_interval"
type="number"
min="5"
max="1440"
value={newConfig.process_interval || 30}
onChange={(e) => setNewConfig({ ...newConfig, process_interval: parseInt(e.target.value) })}
/>
</div>
)}
</div>
{/* Summary Template */}
<div>
<Label htmlFor="summary_template">Summary Template</Label>
<Textarea
id="summary_template"
value={newConfig.summary_template || ""}
onChange={(e) => setNewConfig({ ...newConfig, summary_template: e.target.value })}
placeholder="Custom template for AI summaries"
rows={3}
/>
</div>
</div>
<DialogFooter>
<Button variant="outline" onClick={() => setIsDialogOpen(false)}>
Cancel
</Button>
<Button onClick={handleSaveConfiguration} disabled={saving}>
<Save className="mr-2 h-4 w-4" />
{saving ? "Saving..." : "Save"}
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
</div>
</div>
{/* Module Status */}
{moduleStatus && (
<Card>
<CardHeader>
<CardTitle className="flex items-center">
<Settings className="mr-2 h-5 w-5" />
Module Status
</CardTitle>
</CardHeader>
<CardContent>
<div className="grid grid-cols-1 md:grid-cols-4 gap-4">
<div className="text-center">
<div className="text-2xl font-bold">{moduleStatus.statistics.total_tickets}</div>
<p className="text-xs text-muted-foreground">Total Tickets</p>
</div>
<div className="text-center">
<div className="text-2xl font-bold text-green-600">{moduleStatus.statistics.processed_tickets}</div>
<p className="text-xs text-muted-foreground">Processed</p>
</div>
<div className="text-center">
<div className="text-2xl font-bold text-red-600">{moduleStatus.statistics.failed_tickets}</div>
<p className="text-xs text-muted-foreground">Failed</p>
</div>
<div className="text-center">
<div className="text-2xl font-bold">{moduleStatus.statistics.success_rate.toFixed(1)}%</div>
<p className="text-xs text-muted-foreground">Success Rate</p>
</div>
</div>
</CardContent>
</Card>
)}
<Tabs defaultValue="configurations" className="space-y-6">
<TabsList>
<TabsTrigger value="configurations">Configurations</TabsTrigger>
<TabsTrigger value="processing">Processing Logs</TabsTrigger>
</TabsList>
<TabsContent value="configurations" className="space-y-4">
{configurations.length === 0 ? (
<Card>
<CardContent className="text-center py-8">
<p className="text-muted-foreground">No configurations found. Create your first configuration to get started.</p>
</CardContent>
</Card>
) : (
configurations.map((config) => (
<Card key={config.id}>
<CardHeader>
<div className="flex items-center justify-between">
<div>
<CardTitle className="flex items-center">
{config.name}
{config.is_default && (
<Badge variant="secondary" className="ml-2">Default</Badge>
)}
</CardTitle>
<CardDescription>
{config.description || config.zammad_url}
</CardDescription>
</div>
<div className="flex items-center space-x-2">
<Button
size="sm"
variant="outline"
onClick={() => handleProcessTickets(config.id)}
disabled={processing}
>
<Play className="mr-2 h-3 w-3" />
Process
</Button>
<Button
size="sm"
variant="outline"
onClick={() => handleEditConfiguration(config)}
>
<Edit className="mr-2 h-3 w-3" />
Edit
</Button>
<Button
size="sm"
variant="outline"
onClick={() => handleDeleteConfiguration(config.id!)}
>
<Trash2 className="mr-2 h-3 w-3" />
Delete
</Button>
</div>
</div>
</CardHeader>
<CardContent>
<div className="grid grid-cols-1 md:grid-cols-3 gap-4 text-sm">
<div>
<span className="font-medium">State:</span> {config.process_state}
</div>
<div>
<span className="font-medium">Max Tickets:</span> {config.max_tickets}
</div>
<div>
<span className="font-medium">Auto Process:</span> {config.auto_process ? "Yes" : "No"}
</div>
</div>
</CardContent>
</Card>
))
)}
</TabsContent>
<TabsContent value="processing" className="space-y-4">
{processingLogs.length === 0 ? (
<Card>
<CardContent className="text-center py-8">
<p className="text-muted-foreground">No processing logs found.</p>
</CardContent>
</Card>
) : (
processingLogs.map((log) => (
<Card key={log.id}>
<CardHeader>
<div className="flex items-center justify-between">
<div className="flex items-center space-x-3">
{getStatusIcon(log.status)}
<div>
<CardTitle className="text-lg">Batch {log.batch_id.slice(0, 8)}</CardTitle>
<CardDescription>
Started: {new Date(log.started_at).toLocaleString()}
</CardDescription>
</div>
</div>
{getStatusBadge(log.status)}
</div>
</CardHeader>
<CardContent>
<div className="grid grid-cols-2 md:grid-cols-4 gap-4 text-sm">
<div>
<span className="font-medium">Found:</span> {log.tickets_found}
</div>
<div>
<span className="font-medium">Processed:</span> {log.tickets_processed}
</div>
<div>
<span className="font-medium">Failed:</span> {log.tickets_failed}
</div>
<div>
<span className="font-medium">Skipped:</span> {log.tickets_skipped}
</div>
</div>
{log.processing_time_seconds && (
<div className="mt-2 text-sm">
<span className="font-medium">Duration:</span> {log.processing_time_seconds}s
</div>
)}
</CardContent>
</Card>
))
)}
</TabsContent>
</Tabs>
</div>
)
}