diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py
index 2941f2c..8406302 100644
--- a/backend/app/models/__init__.py
+++ b/backend/app/models/__init__.py
@@ -11,7 +11,6 @@ from .rag_collection import RagCollection
 from .rag_document import RagDocument
 from .chatbot import ChatbotInstance, ChatbotConversation, ChatbotMessage, ChatbotAnalytics
 from .prompt_template import PromptTemplate, ChatbotPromptVariable
-from .workflow import WorkflowDefinition, WorkflowExecution, WorkflowStepLog
 from .plugin import Plugin, PluginConfiguration, PluginInstance, PluginAuditLog, PluginCronJob, PluginAPIGateway
 
 __all__ = [
@@ -28,9 +27,6 @@ __all__ = [
     "ChatbotAnalytics",
     "PromptTemplate",
     "ChatbotPromptVariable",
-    "WorkflowDefinition",
-    "WorkflowExecution",
-    "WorkflowStepLog",
     "Plugin",
     "PluginConfiguration",
     "PluginInstance",
diff --git a/backend/app/models/workflow.py b/backend/app/models/workflow.py
deleted file mode 100644
index f1f3bf8..0000000
--- a/backend/app/models/workflow.py
+++ /dev/null
@@ -1,118 +0,0 @@
-"""
-Database models for workflow module
-"""
-from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, JSON, ForeignKey, Enum as SQLEnum
-from sqlalchemy.orm import relationship
-from sqlalchemy.ext.declarative import declarative_base
-from datetime import datetime
-import uuid
-import enum
-
-from app.db.database import Base
-
-
-class WorkflowStatus(enum.Enum):
-    """Workflow execution statuses"""
-    PENDING = "pending"
-    RUNNING = "running"
-    COMPLETED = "completed"
-    FAILED = "failed"
-    CANCELLED = "cancelled"
-
-
-class WorkflowDefinition(Base):
-    """Workflow definition/template"""
-    __tablename__ = "workflow_definitions"
-
-    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
-    name = Column(String(255), nullable=False)
-    description = Column(Text)
-    version = Column(String(50), default="1.0.0")
-
-    # Workflow definition stored as JSON
-    steps = Column(JSON, nullable=False)
-    variables = Column(JSON, default={})
-    workflow_metadata = Column("metadata", JSON, default={})
-
-    # Configuration
-    timeout = Column(Integer) # Timeout in seconds
-    is_active = Column(Boolean, default=True)
-
-    # Metadata
-    created_by = Column(String, nullable=False) # User ID
-    created_at = Column(DateTime, default=datetime.utcnow)
-    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
-
-    # Relationships
-    executions = relationship("WorkflowExecution", back_populates="workflow", cascade="all, delete-orphan")
-
-    def __repr__(self):
-        return f""
-
-
-class WorkflowExecution(Base):
-    """Workflow execution instance"""
-    __tablename__ = "workflow_executions"
-
-    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
-    workflow_id = Column(String, ForeignKey("workflow_definitions.id"), nullable=False)
-
-    # Execution state
-    status = Column(SQLEnum(WorkflowStatus), default=WorkflowStatus.PENDING)
-    current_step = Column(String) # Current step ID
-
-    # Execution data
-    input_data = Column(JSON, default={})
-    context = Column(JSON, default={})
-    results = Column(JSON, default={})
-    error = Column(Text)
-
-    # Timing
-    started_at = Column(DateTime)
-    completed_at = Column(DateTime)
-
-    # Metadata
-    executed_by = Column(String, nullable=False) # User ID or system
-    created_at = Column(DateTime, default=datetime.utcnow)
-    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
-
-    # Relationships
-    workflow = relationship("WorkflowDefinition", back_populates="executions")
-    step_logs = relationship("WorkflowStepLog", back_populates="execution", cascade="all, delete-orphan")
-
-    def __repr__(self):
-        return f""
-
-
-class WorkflowStepLog(Base):
-    """Individual step execution log"""
-    __tablename__ = "workflow_step_logs"
-
-    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
-    execution_id = Column(String, ForeignKey("workflow_executions.id"), nullable=False)
-
-    # Step information
-    step_id = Column(String, nullable=False)
-    step_name = Column(String(255), nullable=False)
-    step_type = Column(String(50), nullable=False)
-
-    # Execution details
-    status = Column(String(50), nullable=False) # started, completed, failed
-    input_data = Column(JSON, default={})
-    output_data = Column(JSON, default={})
-    error = Column(Text)
-
-    # Timing
-    started_at = Column(DateTime, default=datetime.utcnow)
-    completed_at = Column(DateTime)
-    duration_ms = Column(Integer) # Duration in milliseconds
-
-    # Metadata
-    retry_count = Column(Integer, default=0)
-    created_at = Column(DateTime, default=datetime.utcnow)
-
-    # Relationships
-    execution = relationship("WorkflowExecution", back_populates="step_logs")
-
-    def __repr__(self):
-        return f""
\ No newline at end of file
diff --git a/backend/app/services/module_config_manager.py b/backend/app/services/module_config_manager.py
index b06308a..b4223b2 100644
--- a/backend/app/services/module_config_manager.py
+++ b/backend/app/services/module_config_manager.py
@@ -32,7 +32,6 @@ class ModuleManifest:
     provides: List[str] = None
     consumes: List[str] = None
    endpoints: List[Dict] = None
-    workflow_steps: List[Dict] = None
     permissions: List[Dict] = None
     analytics_events: List[Dict] = None
     health_checks: List[Dict] = None
@@ -50,8 +49,6 @@
         self.consumes = []
         if self.endpoints is None:
             self.endpoints = []
-        if self.workflow_steps is None:
-            self.workflow_steps = []
         if self.permissions is None:
             self.permissions = []
         if self.analytics_events is None:
@@ -266,15 +263,6 @@
 
         return modules
 
-    def get_workflow_steps(self) -> Dict[str, List[Dict]]:
-        """Get all available workflow steps from modules"""
-        workflow_steps = {}
-
-        for name, manifest in self.manifests.items():
-            if manifest.workflow_steps:
-                workflow_steps[name] = manifest.workflow_steps
-
-        return workflow_steps
 
     async def update_module_status(self, module_name: str, enabled: bool) -> bool:
         """Update module enabled status"""
diff --git a/backend/app/services/module_manager.py b/backend/app/services/module_manager.py
index 85e1523..4f6e58b 100644
--- a/backend/app/services/module_manager.py
+++ b/backend/app/services/module_manager.py
@@ -155,8 +155,7 @@ class ModuleManager:
             logger.warning("Falling back to legacy module configuration")
 
             default_modules = [
-                ModuleConfig(name="rag", enabled=True, config={}),
-                ModuleConfig(name="workflow", enabled=True, config={})
+                ModuleConfig(name="rag", enabled=True, config={})
             ]
 
             for config in default_modules:
@@ -591,7 +590,6 @@ class ModuleManager:
             "provides": manifest.provides,
             "consumes": manifest.consumes,
             "endpoints": manifest.endpoints,
-            "workflow_steps": manifest.workflow_steps,
             "permissions": manifest.permissions,
             "ui_config": manifest.ui_config,
             "has_schema": module_config_manager.get_module_schema(module_name) is not None,
@@ -630,9 +628,6 @@ class ModuleManager:
             log_module_event(module_name, "config_update_failed", {"error": str(e)})
             return False
 
-    def get_workflow_steps(self) -> Dict[str, List[Dict]]:
-        """Get all available workflow steps from modules"""
-        return 
module_config_manager.get_workflow_steps() async def get_module_health(self, module_name: str) -> Dict: """Get module health status""" diff --git a/backend/app/services/workflow_execution_service.py b/backend/app/services/workflow_execution_service.py deleted file mode 100644 index c743393..0000000 --- a/backend/app/services/workflow_execution_service.py +++ /dev/null @@ -1,434 +0,0 @@ -""" -Workflow Execution Service -Handles workflow execution tracking with proper user context and audit trails -""" -import asyncio -import uuid -from datetime import datetime -from typing import Dict, List, Any, Optional -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import Session -from sqlalchemy import select, update -import json - -from app.core.logging import get_logger -from app.models.workflow import WorkflowDefinition, WorkflowExecution, WorkflowStepLog, WorkflowStatus -from app.models.user import User -from app.utils.exceptions import APIException - -logger = get_logger(__name__) - - -class WorkflowExecutionService: - """Service for managing workflow executions with user context tracking""" - - def __init__(self, db: AsyncSession): - self.db = db - - async def create_execution_record( - self, - workflow_id: str, - user_context: Dict[str, Any], - execution_params: Optional[Dict] = None - ) -> WorkflowExecution: - """Create a new workflow execution record with user context""" - - # Extract user information from context - user_id = user_context.get("user_id") or user_context.get("id", "system") - username = user_context.get("username", "system") - session_id = user_context.get("session_id") - - # Create execution record - execution_record = WorkflowExecution( - id=str(uuid.uuid4()), - workflow_id=workflow_id, - status=WorkflowStatus.PENDING, - input_data=execution_params or {}, - context={ - "user_id": user_id, - "username": username, - "session_id": session_id, - "started_by": "workflow_execution_service", - "created_at": datetime.utcnow().isoformat() - }, - executed_by=str(user_id), - started_at=datetime.utcnow() - ) - - try: - self.db.add(execution_record) - await self.db.commit() - await self.db.refresh(execution_record) - - logger.info(f"Created workflow execution record {execution_record.id} for workflow {workflow_id} by user {username} ({user_id})") - return execution_record - - except Exception as e: - await self.db.rollback() - logger.error(f"Failed to create execution record: {e}") - raise APIException(f"Failed to create execution record: {e}") - - async def start_execution( - self, - execution_id: str, - workflow_context: Optional[Dict[str, Any]] = None - ) -> bool: - """Mark execution as started and update context""" - - try: - # Update execution record to running status - stmt = update(WorkflowExecution).where( - WorkflowExecution.id == execution_id - ).values( - status=WorkflowStatus.RUNNING, - started_at=datetime.utcnow(), - context=workflow_context or {} - ) - - await self.db.execute(stmt) - await self.db.commit() - - logger.info(f"Started workflow execution {execution_id}") - return True - - except Exception as e: - await self.db.rollback() - logger.error(f"Failed to start execution {execution_id}: {e}") - return False - - async def complete_execution( - self, - execution_id: str, - results: Dict[str, Any], - step_history: Optional[List[Dict[str, Any]]] = None - ) -> bool: - """Mark execution as completed with results""" - - try: - # Update execution record - stmt = update(WorkflowExecution).where( - WorkflowExecution.id == execution_id - ).values( - 
status=WorkflowStatus.COMPLETED, - completed_at=datetime.utcnow(), - results=results - ) - - await self.db.execute(stmt) - - # Log individual steps if provided - if step_history: - await self._log_execution_steps(execution_id, step_history) - - await self.db.commit() - - logger.info(f"Completed workflow execution {execution_id} with {len(results)} results") - return True - - except Exception as e: - await self.db.rollback() - logger.error(f"Failed to complete execution {execution_id}: {e}") - return False - - async def fail_execution( - self, - execution_id: str, - error_message: str, - step_history: Optional[List[Dict[str, Any]]] = None - ) -> bool: - """Mark execution as failed with error details""" - - try: - # Update execution record - stmt = update(WorkflowExecution).where( - WorkflowExecution.id == execution_id - ).values( - status=WorkflowStatus.FAILED, - completed_at=datetime.utcnow(), - error=error_message - ) - - await self.db.execute(stmt) - - # Log individual steps if provided - if step_history: - await self._log_execution_steps(execution_id, step_history) - - await self.db.commit() - - logger.error(f"Failed workflow execution {execution_id}: {error_message}") - return True - - except Exception as e: - await self.db.rollback() - logger.error(f"Failed to record execution failure {execution_id}: {e}") - return False - - async def cancel_execution(self, execution_id: str, reason: str = "User cancelled") -> bool: - """Cancel a workflow execution""" - - try: - stmt = update(WorkflowExecution).where( - WorkflowExecution.id == execution_id - ).values( - status=WorkflowStatus.CANCELLED, - completed_at=datetime.utcnow(), - error=f"Cancelled: {reason}" - ) - - await self.db.execute(stmt) - await self.db.commit() - - logger.info(f"Cancelled workflow execution {execution_id}: {reason}") - return True - - except Exception as e: - await self.db.rollback() - logger.error(f"Failed to cancel execution {execution_id}: {e}") - return False - - async def get_execution_status(self, execution_id: str) -> Optional[WorkflowExecution]: - """Get current execution status and details""" - - try: - stmt = select(WorkflowExecution).where(WorkflowExecution.id == execution_id) - result = await self.db.execute(stmt) - execution = result.scalar_one_or_none() - - if execution: - logger.debug(f"Retrieved execution status for {execution_id}: {execution.status}") - return execution - else: - logger.warning(f"Execution {execution_id} not found") - return None - - except Exception as e: - logger.error(f"Failed to get execution status for {execution_id}: {e}") - return None - - async def get_user_executions( - self, - user_id: str, - limit: int = 50, - status_filter: Optional[WorkflowStatus] = None - ) -> List[WorkflowExecution]: - """Get workflow executions for a specific user""" - - try: - stmt = select(WorkflowExecution).where(WorkflowExecution.executed_by == str(user_id)) - - if status_filter: - stmt = stmt.where(WorkflowExecution.status == status_filter) - - stmt = stmt.order_by(WorkflowExecution.created_at.desc()).limit(limit) - - result = await self.db.execute(stmt) - executions = result.scalars().all() - - logger.debug(f"Retrieved {len(executions)} executions for user {user_id}") - return list(executions) - - except Exception as e: - logger.error(f"Failed to get executions for user {user_id}: {e}") - return [] - - async def get_workflow_executions( - self, - workflow_id: str, - limit: int = 50 - ) -> List[WorkflowExecution]: - """Get all executions for a specific workflow""" - - try: - stmt = 
select(WorkflowExecution).where( - WorkflowExecution.workflow_id == workflow_id - ).order_by(WorkflowExecution.created_at.desc()).limit(limit) - - result = await self.db.execute(stmt) - executions = result.scalars().all() - - logger.debug(f"Retrieved {len(executions)} executions for workflow {workflow_id}") - return list(executions) - - except Exception as e: - logger.error(f"Failed to get executions for workflow {workflow_id}: {e}") - return [] - - async def get_execution_history(self, execution_id: str) -> List[WorkflowStepLog]: - """Get detailed step history for an execution""" - - try: - stmt = select(WorkflowStepLog).where( - WorkflowStepLog.execution_id == execution_id - ).order_by(WorkflowStepLog.started_at.asc()) - - result = await self.db.execute(stmt) - step_logs = result.scalars().all() - - logger.debug(f"Retrieved {len(step_logs)} step logs for execution {execution_id}") - return list(step_logs) - - except Exception as e: - logger.error(f"Failed to get execution history for {execution_id}: {e}") - return [] - - async def _log_execution_steps( - self, - execution_id: str, - step_history: List[Dict[str, Any]] - ): - """Log individual step executions""" - - try: - step_logs = [] - for step_data in step_history: - step_log = WorkflowStepLog( - id=str(uuid.uuid4()), - execution_id=execution_id, - step_id=step_data.get("step_id", "unknown"), - step_name=step_data.get("step_name", "Unknown Step"), - step_type=step_data.get("step_type", "unknown"), - status=step_data.get("status", "completed"), - input_data=step_data.get("input_data", {}), - output_data=step_data.get("output_data", {}), - error=step_data.get("error"), - started_at=datetime.fromisoformat(step_data.get("started_at", datetime.utcnow().isoformat())), - completed_at=datetime.fromisoformat(step_data.get("completed_at", datetime.utcnow().isoformat())) if step_data.get("completed_at") else None, - duration_ms=step_data.get("duration_ms"), - retry_count=step_data.get("retry_count", 0) - ) - step_logs.append(step_log) - - if step_logs: - self.db.add_all(step_logs) - logger.debug(f"Added {len(step_logs)} step logs for execution {execution_id}") - - except Exception as e: - logger.error(f"Failed to log execution steps for {execution_id}: {e}") - - async def get_execution_statistics( - self, - user_id: Optional[str] = None, - workflow_id: Optional[str] = None, - days: int = 30 - ) -> Dict[str, Any]: - """Get execution statistics for analytics""" - - try: - from sqlalchemy import func - from datetime import timedelta - - # Base query - stmt = select(WorkflowExecution) - - # Apply filters - if user_id: - stmt = stmt.where(WorkflowExecution.executed_by == str(user_id)) - if workflow_id: - stmt = stmt.where(WorkflowExecution.workflow_id == workflow_id) - - # Date filter - cutoff_date = datetime.utcnow() - timedelta(days=days) - stmt = stmt.where(WorkflowExecution.created_at >= cutoff_date) - - # Get all matching executions - result = await self.db.execute(stmt) - executions = result.scalars().all() - - # Calculate statistics - total_executions = len(executions) - completed = len([e for e in executions if e.status == WorkflowStatus.COMPLETED]) - failed = len([e for e in executions if e.status == WorkflowStatus.FAILED]) - cancelled = len([e for e in executions if e.status == WorkflowStatus.CANCELLED]) - running = len([e for e in executions if e.status == WorkflowStatus.RUNNING]) - - # Calculate average execution time for completed workflows - completed_executions = [e for e in executions if e.status == WorkflowStatus.COMPLETED and 
e.started_at and e.completed_at] - avg_duration = None - if completed_executions: - total_duration = sum([(e.completed_at - e.started_at).total_seconds() for e in completed_executions]) - avg_duration = total_duration / len(completed_executions) - - statistics = { - "total_executions": total_executions, - "completed": completed, - "failed": failed, - "cancelled": cancelled, - "running": running, - "success_rate": (completed / total_executions * 100) if total_executions > 0 else 0, - "failure_rate": (failed / total_executions * 100) if total_executions > 0 else 0, - "average_duration_seconds": avg_duration, - "period_days": days, - "generated_at": datetime.utcnow().isoformat() - } - - logger.debug(f"Generated execution statistics: {statistics}") - return statistics - - except Exception as e: - logger.error(f"Failed to generate execution statistics: {e}") - return { - "error": str(e), - "generated_at": datetime.utcnow().isoformat() - } - - def create_user_context( - self, - user_id: str, - username: Optional[str] = None, - session_id: Optional[str] = None, - additional_context: Optional[Dict[str, Any]] = None - ) -> Dict[str, Any]: - """Create standardized user context for workflow execution""" - - context = { - "user_id": user_id, - "username": username or f"user_{user_id}", - "session_id": session_id or str(uuid.uuid4()), - "timestamp": datetime.utcnow().isoformat(), - "source": "workflow_execution_service" - } - - if additional_context: - context.update(additional_context) - - return context - - def extract_user_context_from_request(self, request_context: Dict[str, Any]) -> Dict[str, Any]: - """Extract user context from API request context""" - - # Try to get user from different possible sources - user = request_context.get("user") or request_context.get("current_user") - - if user: - if isinstance(user, dict): - return self.create_user_context( - user_id=str(user.get("id", "unknown")), - username=user.get("username") or user.get("email"), - session_id=request_context.get("session_id") - ) - else: - # Assume user is a model instance - return self.create_user_context( - user_id=str(getattr(user, 'id', 'unknown')), - username=getattr(user, 'username', None) or getattr(user, 'email', None), - session_id=request_context.get("session_id") - ) - - # Fallback to API key or system context - api_key_id = request_context.get("api_key_id") - if api_key_id: - return self.create_user_context( - user_id=f"api_key_{api_key_id}", - username=f"API Key {api_key_id}", - session_id=request_context.get("session_id"), - additional_context={"auth_type": "api_key"} - ) - - # Last resort: system context - return self.create_user_context( - user_id="system", - username="System", - session_id=request_context.get("session_id"), - additional_context={"auth_type": "system", "note": "No user context available"} - ) \ No newline at end of file diff --git a/backend/modules/chatbot/module.yaml b/backend/modules/chatbot/module.yaml index 5beeb91..7d9fbd3 100644 --- a/backend/modules/chatbot/module.yaml +++ b/backend/modules/chatbot/module.yaml @@ -9,7 +9,6 @@ enabled: true auto_start: true dependencies: - rag - - workflow optional_dependencies: - analytics @@ -22,12 +21,10 @@ provides: - "chat_completion" - "conversation_management" - "chatbot_configuration" - - "workflow_chat_step" consumes: - "rag_search" - "llm_completion" - - "workflow_execution" # API endpoints endpoints: @@ -43,39 +40,6 @@ endpoints: method: "GET" description: "List user chatbots" -# Workflow integration -workflow_steps: - - name: "chatbot_response" 
- description: "Generate chatbot response with optional RAG context" - inputs: - - name: "message" - type: "string" - required: true - description: "User message to respond to" - - name: "chatbot_id" - type: "string" - required: true - description: "ID of configured chatbot instance" - - name: "use_rag" - type: "boolean" - required: false - default: false - description: "Whether to use RAG for context" - - name: "context" - type: "object" - required: false - description: "Additional context data" - outputs: - - name: "response" - type: "string" - description: "Generated chatbot response" - - name: "conversation_id" - type: "string" - description: "Conversation ID for follow-up" - - name: "sources" - type: "array" - description: "RAG sources used (if any)" - # UI Configuration ui_config: icon: "message-circle" diff --git a/backend/modules/rag/module.yaml b/backend/modules/rag/module.yaml index 8dcba8d..cfe8b53 100644 --- a/backend/modules/rag/module.yaml +++ b/backend/modules/rag/module.yaml @@ -17,7 +17,6 @@ provides: - "semantic_search" - "vector_embeddings" - "document_processing" - - "workflow_rag_step" consumes: - "qdrant_connection" @@ -42,32 +41,6 @@ endpoints: method: "GET" description: "List documents in collection" -# Workflow integration -workflow_steps: - - name: "rag_search" - description: "Search documents for relevant context" - inputs: - - name: "query" - type: "string" - required: true - description: "Search query" - - name: "collection_id" - type: "string" - required: true - description: "Document collection to search" - - name: "top_k" - type: "integer" - required: false - default: 5 - description: "Number of top results to return" - outputs: - - name: "results" - type: "array" - description: "Search results with content and metadata" - - name: "context" - type: "string" - description: "Combined context from top results" - # UI Configuration ui_config: icon: "search" diff --git a/backend/modules/workflow/__init__.py b/backend/modules/workflow/__init__.py deleted file mode 100644 index f1946d7..0000000 --- a/backend/modules/workflow/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -""" -Workflow Module for Confidential Empire - -Provides workflow orchestration capabilities for chaining multiple LLM calls, -conditional logic, and data transformations. 
-""" - -from .main import WorkflowModule - -__all__ = ["WorkflowModule"] \ No newline at end of file diff --git a/backend/modules/workflow/main.py b/backend/modules/workflow/main.py deleted file mode 100644 index ca3cbd2..0000000 --- a/backend/modules/workflow/main.py +++ /dev/null @@ -1,1656 +0,0 @@ -""" -Workflow Module Implementation - -This module provides workflow orchestration capabilities including: -- Chaining multiple LLM calls -- Conditional logic execution -- Data transformations -- Workflow state management -- Parallel and sequential execution -""" - -import asyncio -import json -import uuid -from datetime import datetime -from typing import Dict, List, Any, Optional, Union, Callable -from enum import Enum -from dataclasses import dataclass, asdict -from pydantic import BaseModel, Field - -from fastapi import APIRouter, HTTPException, Depends -from sqlalchemy.orm import Session -from sqlalchemy import select -from app.core.security import get_current_user -from app.core.logging import get_logger -from app.services.llm.service import llm_service -from app.services.llm.models import ChatRequest as LLMChatRequest, ChatMessage as LLMChatMessage -from app.services.llm.exceptions import LLMError, ProviderError, SecurityError -from app.services.base_module import Permission -from app.db.database import SessionLocal -from app.models.workflow import WorkflowDefinition as DBWorkflowDefinition, WorkflowExecution as DBWorkflowExecution -from app.services.workflow_execution_service import WorkflowExecutionService - -# Import protocols for type hints and dependency injection -from ..protocols import ChatbotServiceProtocol -# Note: LiteLLMClientProtocol replaced with direct LLM service usage - -logger = get_logger(__name__) - - -class WorkflowStepType(str, Enum): - """Types of workflow steps""" - LLM_CALL = "llm_call" - CONDITION = "condition" - TRANSFORM = "transform" - PARALLEL = "parallel" - LOOP = "loop" - DELAY = "delay" - CHATBOT = "chatbot" - # Brand-AI inspired step types - AI_GENERATION = "ai_generation" - AGGREGATE = "aggregate" - OUTPUT = "output" - EMAIL = "email" - STATUS_UPDATE = "status_update" - FILTER = "filter" - MAP = "map" - REDUCE = "reduce" - - -class WorkflowStatus(str, Enum): - """Workflow execution statuses""" - PENDING = "pending" - RUNNING = "running" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" - - -@dataclass -class WorkflowContext: - """Context passed through workflow execution""" - workflow_id: str - execution_id: str - variables: Dict[str, Any] - results: Dict[str, Any] - metadata: Dict[str, Any] - step_history: List[Dict[str, Any]] - - -class WorkflowStep(BaseModel): - """Base workflow step definition""" - id: str = Field(default_factory=lambda: str(uuid.uuid4())) - name: str - type: WorkflowStepType - config: Dict[str, Any] = Field(default_factory=dict) - conditions: Optional[List[str]] = None # JavaScript-like expressions - retry_count: int = 0 - timeout: Optional[int] = None - enabled: bool = True - - -class LLMCallStep(WorkflowStep): - """LLM call step configuration""" - type: WorkflowStepType = WorkflowStepType.LLM_CALL - model: str - messages: List[Dict[str, str]] - parameters: Dict[str, Any] = Field(default_factory=dict) - output_variable: str = "result" - - -class ConditionalStep(WorkflowStep): - """Conditional execution step""" - type: WorkflowStepType = WorkflowStepType.CONDITION - condition: str # JavaScript-like expression - true_steps: List[WorkflowStep] = Field(default_factory=list) - false_steps: List[WorkflowStep] = 
Field(default_factory=list) - - -class TransformStep(WorkflowStep): - """Data transformation step""" - type: WorkflowStepType = WorkflowStepType.TRANSFORM - input_variable: str - output_variable: str - transformation: str # Python expression or JSON path - - -class ParallelStep(WorkflowStep): - """Parallel execution step""" - type: WorkflowStepType = WorkflowStepType.PARALLEL - steps: List[WorkflowStep] = Field(default_factory=list) - wait_for_all: bool = True - - -class ChatbotStep(WorkflowStep): - """Chatbot interaction step""" - type: WorkflowStepType = WorkflowStepType.CHATBOT - chatbot_id: str # ID of the chatbot instance to use - message_template: str # Template for user message (supports variable interpolation) - conversation_id: Optional[str] = None # Existing conversation ID (optional) - output_variable: str = "chatbot_response" # Variable name to store response - context_variables: Optional[Dict[str, str]] = None # Map workflow vars to chatbot context - create_new_conversation: bool = False # Whether to create a new conversation each time - save_conversation_id: Optional[str] = None # Variable to save conversation ID to - - -# Brand-AI inspired step classes -class AIGenerationStep(WorkflowStep): - """AI Generation step for various operations""" - type: WorkflowStepType = WorkflowStepType.AI_GENERATION - operation: str # 'market_research', 'brand_names', 'analysis', etc. - model: str = "openrouter/anthropic/claude-3.5-sonnet" - prompt_template: Optional[str] = None - variables: Dict[str, Any] = Field(default_factory=dict) - category: Optional[str] = None # For brand naming categories - temperature: float = 0.7 - max_tokens: int = 1000 - output_key: str = "result" - - -class AggregateStep(WorkflowStep): - """Aggregate multiple inputs into single output""" - type: WorkflowStepType = WorkflowStepType.AGGREGATE - strategy: str = "merge" # 'merge', 'concat', 'sum', 'average' - input_keys: List[str] = Field(default_factory=list) - output_key: str = "aggregated_result" - - -class FilterStep(WorkflowStep): - """Filter data based on conditions""" - type: WorkflowStepType = WorkflowStepType.FILTER - input_key: str - output_key: str - filter_expression: str # Python expression to evaluate - keep_original: bool = False - - -class MapStep(WorkflowStep): - """Transform each item in a collection""" - type: WorkflowStepType = WorkflowStepType.MAP - input_key: str - output_key: str - transform_expression: str # Python expression for transformation - parallel: bool = False - - -class ReduceStep(WorkflowStep): - """Reduce collection to single value""" - type: WorkflowStepType = WorkflowStepType.REDUCE - input_key: str - output_key: str - reduce_expression: str # Python expression for reduction - initial_value: Any = None - - -class OutputStep(WorkflowStep): - """Save data to output destination""" - type: WorkflowStepType = WorkflowStepType.OUTPUT - input_key: str - destination: str = "database" # 'database', 'file', 'api' - format: str = "json" - save_path: Optional[str] = None - - -class EmailStep(WorkflowStep): - """Send email notifications""" - type: WorkflowStepType = WorkflowStepType.EMAIL - recipient: str - subject: str - template: str - variables: Dict[str, Any] = Field(default_factory=dict) - continue_on_failure: bool = True - - -class StatusUpdateStep(WorkflowStep): - """Update workflow or external status""" - type: WorkflowStepType = WorkflowStepType.STATUS_UPDATE - status_key: str - status_value: str - target: str = "workflow" # 'workflow', 'external' - webhook_url: Optional[str] = None - 
- -class WorkflowDefinition(BaseModel): - """Complete workflow definition""" - id: str = Field(default_factory=lambda: str(uuid.uuid4())) - name: str - description: str = "" - version: str = "1.0.0" - steps: List[WorkflowStep] - variables: Dict[str, Any] = Field(default_factory=dict) - metadata: Dict[str, Any] = Field(default_factory=dict) - timeout: Optional[int] = None - - -class WorkflowExecution(BaseModel): - """Workflow execution state""" - id: str = Field(default_factory=lambda: str(uuid.uuid4())) - workflow_id: str - status: WorkflowStatus = WorkflowStatus.PENDING - current_step: Optional[str] = None - context: Dict[str, Any] = Field(default_factory=dict) - started_at: Optional[datetime] = None - completed_at: Optional[datetime] = None - error: Optional[str] = None - results: Dict[str, Any] = Field(default_factory=dict) - - -class WorkflowEngine: - """Core workflow execution engine with user context tracking""" - - def __init__(self, chatbot_service: Optional[ChatbotServiceProtocol] = None, execution_service: Optional[WorkflowExecutionService] = None): - self.chatbot_service = chatbot_service - self.execution_service = execution_service - self.executions: Dict[str, WorkflowExecution] = {} - self.workflows: Dict[str, WorkflowDefinition] = {} - - async def execute_workflow(self, workflow: WorkflowDefinition, - input_data: Dict[str, Any] = None, - user_context: Optional[Dict[str, Any]] = None) -> WorkflowExecution: - """Execute a workflow definition with proper user context tracking""" - - # Create user context if not provided - if not user_context: - user_context = {"user_id": "system", "username": "System", "session_id": str(uuid.uuid4())} - - # Create execution record in database if service is available - db_execution = None - if self.execution_service: - try: - db_execution = await self.execution_service.create_execution_record( - workflow_id=workflow.id, - user_context=user_context, - execution_params=input_data - ) - - # Start the execution - await self.execution_service.start_execution( - db_execution.id, - workflow_context={"workflow_name": workflow.name} - ) - - except Exception as e: - logger.error(f"Failed to create database execution record: {e}") - - # Create in-memory execution for backward compatibility - execution = WorkflowExecution( - workflow_id=workflow.id, - status=WorkflowStatus.RUNNING, - started_at=datetime.utcnow() - ) - - # Use database execution ID if available - if db_execution: - execution.id = db_execution.id - - # Initialize context with user information - context = WorkflowContext( - workflow_id=workflow.id, - execution_id=execution.id, - variables={ - **workflow.variables, - **(input_data or {}), - # Add user context to variables for step access - "_user_id": user_context.get("user_id", "system"), - "_username": user_context.get("username", "System"), - "_session_id": user_context.get("session_id") - }, - results={}, - metadata={ - "user_context": user_context, - "execution_started_by": user_context.get("username", "System") - }, - step_history=[] - ) - - try: - logger.info(f"Starting workflow execution {execution.id} for workflow {workflow.name} by user {user_context.get('username', 'System')}") - - # Execute steps - await self._execute_steps(workflow.steps, context) - - execution.status = WorkflowStatus.COMPLETED - execution.results = context.results - execution.completed_at = datetime.utcnow() - - # Update database execution record if available - if self.execution_service and db_execution: - await self.execution_service.complete_execution( - 
db_execution.id, - context.results, - context.step_history - ) - - logger.info(f"Completed workflow execution {execution.id} successfully") - - except Exception as e: - error_message = str(e) - logger.error(f"Workflow execution {execution.id} failed: {error_message}") - - execution.status = WorkflowStatus.FAILED - execution.error = error_message - execution.completed_at = datetime.utcnow() - - # Update database execution record if available - if self.execution_service and db_execution: - await self.execution_service.fail_execution( - db_execution.id, - error_message, - context.step_history - ) - - self.executions[execution.id] = execution - return execution - - async def _execute_steps(self, steps: List[WorkflowStep], context: WorkflowContext): - """Execute a list of workflow steps""" - for step in steps: - if not step.enabled: - continue - - # Check conditions - if step.conditions and not self._evaluate_conditions(step.conditions, context): - logger.info(f"Skipping step {step.id} due to unmet conditions") - continue - - logger.info(f"Executing step: {step.name} ({step.type})") - context.step_history.append({ - "step_id": step.id, - "step_name": step.name, - "step_type": step.type, - "started_at": datetime.utcnow().isoformat() - }) - - try: - if step.type == WorkflowStepType.LLM_CALL: - await self._execute_llm_step(step, context) - elif step.type == WorkflowStepType.CONDITION: - await self._execute_conditional_step(step, context) - elif step.type == WorkflowStepType.TRANSFORM: - await self._execute_transform_step(step, context) - elif step.type == WorkflowStepType.PARALLEL: - await self._execute_parallel_step(step, context) - elif step.type == WorkflowStepType.DELAY: - await self._execute_delay_step(step, context) - elif step.type == WorkflowStepType.CHATBOT: - await self._execute_chatbot_step(step, context) - # Brand-AI inspired step types - elif step.type == WorkflowStepType.AI_GENERATION: - await self._execute_ai_generation_step(step, context) - elif step.type == WorkflowStepType.AGGREGATE: - await self._execute_aggregate_step(step, context) - elif step.type == WorkflowStepType.FILTER: - await self._execute_filter_step(step, context) - elif step.type == WorkflowStepType.MAP: - await self._execute_map_step(step, context) - elif step.type == WorkflowStepType.REDUCE: - await self._execute_reduce_step(step, context) - elif step.type == WorkflowStepType.OUTPUT: - await self._execute_output_step(step, context) - elif step.type == WorkflowStepType.EMAIL: - await self._execute_email_step(step, context) - elif step.type == WorkflowStepType.STATUS_UPDATE: - await self._execute_status_update_step(step, context) - else: - raise ValueError(f"Unknown step type: {step.type}") - - except Exception as e: - if step.retry_count > 0: - logger.warning(f"Step {step.id} failed, retrying...") - step.retry_count -= 1 - await self._execute_steps([step], context) - else: - raise - - async def _execute_llm_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute an LLM call step with proper user context""" - llm_step = LLMCallStep(**step.dict()) - - # Template message content with context variables - messages = self._template_messages(llm_step.messages, context.variables) - - # Convert messages to LLM service format - llm_messages = [LLMChatMessage(role=msg["role"], content=msg["content"]) for msg in messages] - - # Get user context from workflow metadata - user_context = context.metadata.get("user_context", {}) - user_id = user_context.get("user_id", "system") - - # Create LLM service request with 
proper user context - llm_request = LLMChatRequest( - model=llm_step.model, - messages=llm_messages, - user_id=str(user_id), # Use actual user ID from context - api_key_id=0, # Workflow module uses internal service - **{k: v for k, v in llm_step.parameters.items() if k in ['temperature', 'max_tokens', 'top_p', 'frequency_penalty', 'presence_penalty', 'stop']} - ) - - # Make LLM call - response = await llm_service.create_chat_completion(llm_request) - - # Store result - result = response.choices[0].message.content if response.choices else "" - context.variables[llm_step.output_variable] = result - context.results[step.id] = result - - logger.info(f"LLM step {step.id} completed for user {user_context.get('username', user_id)}") - - async def _execute_conditional_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute a conditional step""" - cond_step = ConditionalStep(**step.dict()) - - condition_result = self._evaluate_expression(cond_step.condition, context.variables) - - if condition_result: - await self._execute_steps(cond_step.true_steps, context) - else: - await self._execute_steps(cond_step.false_steps, context) - - async def _execute_transform_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute a data transformation step""" - transform_step = TransformStep(**step.dict()) - - input_value = context.variables.get(transform_step.input_variable) - - # Simple transformation evaluation (could be extended) - if transform_step.transformation.startswith("json:"): - # JSON path transformation - result = self._apply_json_path(input_value, transform_step.transformation[5:]) - else: - # Python expression evaluation (limited scope for security) - result = self._evaluate_transform(transform_step.transformation, input_value) - - context.variables[transform_step.output_variable] = result - context.results[step.id] = result - - async def _execute_parallel_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute steps in parallel""" - parallel_step = ParallelStep(**step.dict()) - - # Create tasks for parallel execution - tasks = [] - for sub_step in parallel_step.steps: - # Create a copy of context for each parallel branch - parallel_context = WorkflowContext( - workflow_id=context.workflow_id, - execution_id=context.execution_id, - variables=context.variables.copy(), - results=context.results.copy(), - metadata=context.metadata.copy(), - step_history=context.step_history.copy() - ) - - task = asyncio.create_task(self._execute_steps([sub_step], parallel_context)) - tasks.append((task, parallel_context)) - - # Wait for completion - if parallel_step.wait_for_all: - completed_contexts = [] - for task, parallel_context in tasks: - await task - completed_contexts.append(parallel_context) - - # Merge results back to main context - for parallel_context in completed_contexts: - context.variables.update(parallel_context.variables) - context.results.update(parallel_context.results) - else: - # Wait for any to complete - done, pending = await asyncio.wait([task for task, _ in tasks], - return_when=asyncio.FIRST_COMPLETED) - - # Cancel pending tasks - for task in pending: - task.cancel() - - async def _execute_delay_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute a delay step""" - delay_seconds = step.config.get("seconds", 1) - await asyncio.sleep(delay_seconds) - - async def _execute_chatbot_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute a chatbot interaction step""" - chatbot_step = ChatbotStep(**step.dict()) - - # 
Template the message with context variables - message = self._template_string(chatbot_step.message_template, context.variables) - - try: - # Use the injected chatbot service - if not self.chatbot_service: - raise ValueError("Chatbot service not available") - - # Prepare context variables for the chatbot - chatbot_context = {} - if chatbot_step.context_variables: - for chatbot_var, workflow_var in chatbot_step.context_variables.items(): - if workflow_var in context.variables: - chatbot_context[chatbot_var] = context.variables[workflow_var] - - # Prepare conversation ID - conversation_id = chatbot_step.conversation_id - if conversation_id and conversation_id in context.variables: - conversation_id = context.variables[conversation_id] - - # Create a chat request object that matches the protocol - from ..chatbot.main import ChatRequest - chat_request = ChatRequest( - message=message, - chatbot_id=chatbot_step.chatbot_id, - conversation_id=conversation_id if not chatbot_step.create_new_conversation else None, - context=chatbot_context - ) - - # Make the chatbot call using the service protocol with proper user context - # Get user context from workflow metadata - user_context = context.metadata.get("user_context", {}) - user_id = user_context.get("user_id", "system") - - response = await self.chatbot_service.chat_completion( - request=chat_request, - user_id=str(user_id), # Use actual user ID from context - db=None # Database session needed for conversation persistence - ) - - # Extract response data for compatibility - response_data = { - "response": response.response, - "conversation_id": response.conversation_id, - "message_id": response.message_id, - "sources": response.sources, - "metadata": response.metadata if hasattr(response, 'metadata') else {} - } - - # Store the response in context variables - context.variables[chatbot_step.output_variable] = response_data.get("response", "") - - # Save conversation ID if requested - if chatbot_step.save_conversation_id and "conversation_id" in response_data: - context.variables[chatbot_step.save_conversation_id] = response_data["conversation_id"] - - # Store complete result for step tracking - context.results[step.id] = { - "response": response_data.get("response", ""), - "conversation_id": response_data.get("conversation_id"), - "message_id": response_data.get("message_id"), - "sources": response_data.get("sources", []), - "metadata": response_data.get("metadata", {}) - } - - logger.info(f"Chatbot step {step.id} completed successfully") - - except Exception as e: - error_msg = f"Chatbot step failed: {str(e)}" - logger.error(error_msg) - - # Store error response and continue workflow - context.variables[chatbot_step.output_variable] = f"Error: {error_msg}" - context.results[step.id] = { - "error": error_msg, - "response": "", - "conversation_id": None - } - - def _template_messages(self, messages: List[Dict[str, str]], - variables: Dict[str, Any]) -> List[Dict[str, str]]: - """Template message content with variables""" - templated = [] - for message in messages: - templated_message = message.copy() - for key, value in templated_message.items(): - if isinstance(value, str): - templated_message[key] = self._template_string(value, variables) - templated.append(templated_message) - return templated - - def _template_string(self, template: str, variables: Dict[str, Any]) -> str: - """Simple string templating with variables""" - try: - return template.format(**variables) - except KeyError as e: - logger.warning(f"Template variable not found: {e}") - 
return template - - def _evaluate_conditions(self, conditions: List[str], context: WorkflowContext) -> bool: - """Evaluate a list of conditions (all must be true)""" - for condition in conditions: - if not self._evaluate_expression(condition, context.variables): - return False - return True - - def _evaluate_expression(self, expression: str, variables: Dict[str, Any]) -> bool: - """Safely evaluate a boolean expression""" - # Simple expression evaluation (could be enhanced with a proper parser) - try: - # Replace variable references - for var_name, var_value in variables.items(): - if isinstance(var_value, str): - expression = expression.replace(f"${var_name}", f"'{var_value}'") - else: - expression = expression.replace(f"${var_name}", str(var_value)) - - # Evaluate using eval (limited scope for security) - return bool(eval(expression, {"__builtins__": {}}, {})) - except Exception as e: - logger.error(f"Failed to evaluate expression '{expression}': {e}") - return False - - def _evaluate_transform(self, transformation: str, input_value: Any) -> Any: - """Evaluate a transformation expression""" - try: - # Simple transformations - if transformation == "upper": - return str(input_value).upper() - elif transformation == "lower": - return str(input_value).lower() - elif transformation == "length": - return len(input_value) if hasattr(input_value, "__len__") else 0 - elif transformation.startswith("extract:"): - # Extract JSON field - field = transformation[8:] - if isinstance(input_value, dict): - return input_value.get(field) - - return input_value - except Exception as e: - logger.error(f"Transform failed: {e}") - return input_value - - def _apply_json_path(self, data: Any, path: str) -> Any: - """Apply a simple JSON path to extract data""" - try: - parts = path.split(".") - result = data - for part in parts: - if isinstance(result, dict): - result = result.get(part) - elif isinstance(result, list) and part.isdigit(): - result = result[int(part)] - else: - return None - return result - except Exception as e: - logger.error(f"JSON path failed: {e}") - return None - - # Brand-AI inspired step execution methods - async def _execute_ai_generation_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute AI generation step for various operations""" - ai_step = AIGenerationStep(**step.dict()) - - # Prepare variables for templating - variables = {**context.variables, **ai_step.variables} - - # Generate content based on operation type - if ai_step.operation == "market_research": - result = await self._generate_market_research(variables, ai_step) - elif ai_step.operation == "brand_names": - result = await self._generate_brand_names(variables, ai_step) - elif ai_step.operation == "analysis": - result = await self._generate_analysis(variables, ai_step) - elif ai_step.operation == "custom_prompt": - result = await self._generate_custom_prompt(variables, ai_step) - else: - raise ValueError(f"Unknown AI operation: {ai_step.operation}") - - # Store result - context.variables[ai_step.output_key] = result - context.results[step.id] = result - logger.info(f"AI generation step {step.id} completed") - - async def _generate_market_research(self, variables: Dict[str, Any], step: AIGenerationStep) -> str: - """Generate market research content""" - prompt = step.prompt_template or f""" - Conduct market research for the following business: - Industry: {variables.get('industry', 'Not specified')} - Target audience: {variables.get('target_audience', 'Not specified')} - Competitors: 
{variables.get('competitors', 'Not specified')} - - Provide insights on market trends, opportunities, and competitive landscape. - """ - - messages = [{"role": "user", "content": self._template_string(prompt, variables)}] - - # Convert to LLM service format - llm_messages = [LLMChatMessage(role=msg["role"], content=msg["content"]) for msg in messages] - - llm_request = LLMChatRequest( - model=step.model, - messages=llm_messages, - user_id=str(variables.get("_user_id", "system")), - api_key_id=0, - temperature=step.temperature, - max_tokens=step.max_tokens - ) - - response = await llm_service.create_chat_completion(llm_request) - - return response.choices[0].message.content if response.choices else "" - - async def _generate_brand_names(self, variables: Dict[str, Any], step: AIGenerationStep) -> List[Dict[str, str]]: - """Generate brand names for a specific category""" - category = step.category or "general" - prompt = step.prompt_template or f""" - Generate 10 creative brand names for a {variables.get('industry', 'business')} company. - Category: {category} - Description: {variables.get('description', 'Not specified')} - Target audience: {variables.get('target_audience', 'Not specified')} - - Return names in JSON format: {{"name1": "description1", "name2": "description2", ...}} - """ - - messages = [{"role": "user", "content": self._template_string(prompt, variables)}] - - response = await self.litellm_client.create_chat_completion( - model=step.model, - messages=messages, - user_id=str(variables.get("_user_id", "system")), - api_key_id="workflow", - temperature=step.temperature, - max_tokens=step.max_tokens - ) - - content = response.get("choices", [{}])[0].get("message", {}).get("content", "") - - # Parse JSON response - try: - import json - names_dict = json.loads(content) - return [{"name": name, "description": desc} for name, desc in names_dict.items()] - except json.JSONDecodeError: - logger.error(f"Failed to parse brand names JSON: {content}") - return [] - - async def _generate_analysis(self, variables: Dict[str, Any], step: AIGenerationStep) -> str: - """Generate general analysis content""" - prompt = step.prompt_template or f""" - Analyze the following data: - {json.dumps(variables, indent=2)} - - Provide detailed insights and recommendations. 
- """ - - messages = [{"role": "user", "content": self._template_string(prompt, variables)}] - - # Convert to LLM service format - llm_messages = [LLMChatMessage(role=msg["role"], content=msg["content"]) for msg in messages] - - llm_request = LLMChatRequest( - model=step.model, - messages=llm_messages, - user_id=str(variables.get("_user_id", "system")), - api_key_id=0, - temperature=step.temperature, - max_tokens=step.max_tokens - ) - - response = await llm_service.create_chat_completion(llm_request) - - return response.choices[0].message.content if response.choices else "" - - async def _generate_custom_prompt(self, variables: Dict[str, Any], step: AIGenerationStep) -> str: - """Generate content using custom prompt template""" - if not step.prompt_template: - raise ValueError("Custom prompt step requires prompt_template") - - messages = [{"role": "user", "content": self._template_string(step.prompt_template, variables)}] - - # Convert to LLM service format - llm_messages = [LLMChatMessage(role=msg["role"], content=msg["content"]) for msg in messages] - - llm_request = LLMChatRequest( - model=step.model, - messages=llm_messages, - user_id=str(variables.get("_user_id", "system")), - api_key_id=0, - temperature=step.temperature, - max_tokens=step.max_tokens - ) - - response = await llm_service.create_chat_completion(llm_request) - - return response.choices[0].message.content if response.choices else "" - - async def _execute_aggregate_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute aggregate step to combine multiple inputs""" - agg_step = AggregateStep(**step.dict()) - - # Collect input values - inputs = [] - for input_key in agg_step.input_keys: - value = context.variables.get(input_key) - if value is not None: - inputs.append(value) - - # Apply aggregation strategy - if agg_step.strategy == "merge": - result = {} - for inp in inputs: - if isinstance(inp, dict): - result.update(inp) - elif agg_step.strategy == "concat": - result = [] - for inp in inputs: - if isinstance(inp, list): - result.extend(inp) - else: - result.append(inp) - elif agg_step.strategy == "sum": - result = sum(inp for inp in inputs if isinstance(inp, (int, float))) - elif agg_step.strategy == "average": - numeric_inputs = [inp for inp in inputs if isinstance(inp, (int, float))] - result = sum(numeric_inputs) / len(numeric_inputs) if numeric_inputs else 0 - else: - result = inputs - - context.variables[agg_step.output_key] = result - context.results[step.id] = result - - async def _execute_filter_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute filter step to filter data based on conditions""" - filter_step = FilterStep(**step.dict()) - - input_data = context.variables.get(filter_step.input_key, []) - - if not isinstance(input_data, list): - input_data = [input_data] - - filtered_data = [] - for item in input_data: - # Create temporary context for evaluation - temp_vars = {**context.variables, "item": item} - if self._evaluate_expression(filter_step.filter_expression, temp_vars): - filtered_data.append(item) - - result = filtered_data - if filter_step.keep_original: - result = {"original": input_data, "filtered": filtered_data} - - context.variables[filter_step.output_key] = result - context.results[step.id] = result - - async def _execute_map_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute map step to transform each item in a collection""" - map_step = MapStep(**step.dict()) - - input_data = context.variables.get(map_step.input_key, []) - - if not 
isinstance(input_data, list): - input_data = [input_data] - - if map_step.parallel: - # Parallel execution for independent transformations - tasks = [] - for item in input_data: - task = self._transform_item(item, map_step.transform_expression, context.variables) - tasks.append(task) - transformed_data = await asyncio.gather(*tasks) - else: - # Sequential execution - transformed_data = [] - for item in input_data: - transformed_item = await self._transform_item(item, map_step.transform_expression, context.variables) - transformed_data.append(transformed_item) - - context.variables[map_step.output_key] = transformed_data - context.results[step.id] = transformed_data - - async def _transform_item(self, item: Any, expression: str, context_vars: Dict[str, Any]) -> Any: - """Transform a single item using expression""" - # Create temporary context for transformation - temp_vars = {**context_vars, "item": item} - return self._evaluate_transform(expression, item) - - async def _execute_reduce_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute reduce step to reduce collection to single value""" - reduce_step = ReduceStep(**step.dict()) - - input_data = context.variables.get(reduce_step.input_key, []) - - if not isinstance(input_data, list): - input_data = [input_data] - - accumulator = reduce_step.initial_value - - for item in input_data: - temp_vars = {**context.variables, "acc": accumulator, "item": item} - # Simple reduction operations - if reduce_step.reduce_expression == "sum": - accumulator = (accumulator or 0) + (item if isinstance(item, (int, float)) else 0) - elif reduce_step.reduce_expression == "count": - accumulator = (accumulator or 0) + 1 - elif reduce_step.reduce_expression == "max": - accumulator = max(accumulator or float('-inf'), item) if isinstance(item, (int, float)) else accumulator - elif reduce_step.reduce_expression == "min": - accumulator = min(accumulator or float('inf'), item) if isinstance(item, (int, float)) else accumulator - else: - # Custom expression evaluation could be added here - accumulator = item - - context.variables[reduce_step.output_key] = accumulator - context.results[step.id] = accumulator - - async def _execute_output_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute output step to save data""" - output_step = OutputStep(**step.dict()) - - data = context.variables.get(output_step.input_key) - - if output_step.destination == "database": - # Save to database (placeholder - would need actual DB integration) - logger.info(f"Saving data to database: {output_step.save_path}") - context.results[f"{step.id}_saved"] = True - elif output_step.destination == "file": - # Save to file - import json - import os - - os.makedirs(os.path.dirname(output_step.save_path or "/tmp/workflow_output.json"), exist_ok=True) - with open(output_step.save_path or "/tmp/workflow_output.json", "w") as f: - if output_step.format == "json": - json.dump(data, f, indent=2) - else: - f.write(str(data)) - elif output_step.destination == "api": - # Send to API endpoint (placeholder) - logger.info(f"Sending data to API: {output_step.save_path}") - - context.results[step.id] = {"saved": True, "destination": output_step.destination} - - async def _execute_email_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute email step to send notifications""" - email_step = EmailStep(**step.dict()) - - try: - # Template email content - variables = {**context.variables, **email_step.variables} - subject = self._template_string(email_step.subject, 
variables) - body = self._template_string(email_step.template, variables) - - # Email sending would be implemented here - logger.info(f"Sending email to {email_step.recipient}: {subject}") - - context.results[step.id] = {"sent": True, "recipient": email_step.recipient} - except Exception as e: - if not email_step.continue_on_failure: - raise - logger.error(f"Email step failed but continuing: {e}") - context.results[step.id] = {"sent": False, "error": str(e)} - - async def _execute_status_update_step(self, step: WorkflowStep, context: WorkflowContext): - """Execute status update step""" - status_step = StatusUpdateStep(**step.dict()) - - if status_step.target == "workflow": - context.variables[status_step.status_key] = status_step.status_value - elif status_step.target == "external" and status_step.webhook_url: - # Send webhook (placeholder) - logger.info(f"Sending status update to webhook: {status_step.webhook_url}") - - context.results[step.id] = {"updated": True, "status": status_step.status_value} - - -class WorkflowModule: - """Workflow module for Confidential Empire""" - - def __init__(self, chatbot_service: Optional[ChatbotServiceProtocol] = None): - self.config = {} - self.engine = None - self.chatbot_service = chatbot_service - self.router = APIRouter(prefix="/workflow", tags=["workflow"]) - self.initialized = False - - logger.info("Workflow module created") - - async def initialize(self, config: Optional[Dict[str, Any]] = None): - """Initialize the workflow module""" - if config: - self.config = config - - # Initialize the workflow engine with execution service - # Create execution service if database is available - execution_service = None - try: - from app.db.database import async_session_factory - # Create an async session for the execution service - async_db = async_session_factory() - execution_service = WorkflowExecutionService(async_db) - logger.info("Workflow execution service initialized successfully") - except Exception as e: - logger.warning(f"Failed to initialize execution service: {e}") - - self.engine = WorkflowEngine(chatbot_service=self.chatbot_service, execution_service=execution_service) - self.setup_routes() - self.initialized = True - - logger.info("Workflow module initialized") - - def setup_routes(self): - """Setup workflow API routes""" - - @self.router.post("/execute") - async def execute_workflow( - workflow_def: WorkflowDefinition, - input_data: Optional[Dict[str, Any]] = None, - current_user: Dict[str, Any] = Depends(get_current_user) - ): - """Execute a workflow with proper user context""" - if not self.initialized or not self.engine: - raise HTTPException(status_code=503, detail="Workflow module not initialized") - - try: - # Create user context from authenticated user - user_context = { - "user_id": str(current_user.get("id", "system")), - "username": current_user.get("username") or current_user.get("email", "Unknown User"), - "session_id": str(uuid.uuid4()) - } - - # Execute workflow with user context - execution = await self.engine.execute_workflow( - workflow_def, - input_data, - user_context=user_context - ) - - return { - "execution_id": execution.id, - "status": execution.status, - "results": execution.results if execution.status == WorkflowStatus.COMPLETED else None, - "error": execution.error, - "executed_by": user_context.get("username", "Unknown") - } - except Exception as e: - logger.error(f"Workflow execution failed: {e}") - raise HTTPException(status_code=500, detail=str(e)) - - @self.router.get("/execution/{execution_id}") - async def 
get_execution(execution_id: str): - """Get workflow execution status""" - if not self.initialized or not self.engine: - raise HTTPException(status_code=503, detail="Workflow module not initialized") - - execution = self.engine.executions.get(execution_id) - if not execution: - raise HTTPException(status_code=404, detail="Execution not found") - - return { - "execution_id": execution.id, - "workflow_id": execution.workflow_id, - "status": execution.status, - "current_step": execution.current_step, - "started_at": execution.started_at, - "completed_at": execution.completed_at, - "results": execution.results, - "error": execution.error - } - - @self.router.post("/validate") - async def validate_workflow(workflow_def: WorkflowDefinition): - """Validate a workflow definition""" - try: - # Basic validation - errors = [] - - if not workflow_def.steps: - errors.append("Workflow must have at least one step") - - # Validate step references - step_ids = {step.id for step in workflow_def.steps} - for step in workflow_def.steps: - if step.type == WorkflowStepType.CONDITION: - cond_step = ConditionalStep(**step.dict()) - for sub_step in cond_step.true_steps + cond_step.false_steps: - if sub_step.id not in step_ids: - errors.append(f"Invalid step reference: {sub_step.id}") - - return { - "valid": len(errors) == 0, - "errors": errors - } - except Exception as e: - return { - "valid": False, - "errors": [str(e)] - } - - @self.router.get("/templates") - async def get_workflow_templates(): - """Get predefined workflow templates""" - # Load chatbot integration templates from file - chatbot_templates = [] - try: - import os - template_file = os.path.join(os.path.dirname(__file__), "templates", "chatbot_integration_examples.json") - if os.path.exists(template_file): - with open(template_file, 'r') as f: - chatbot_data = json.load(f) - for template in chatbot_data.get("templates", []): - chatbot_templates.append({ - "id": template["id"], - "name": template["name"], - "description": template["description"], - "definition": template, - "category": "chatbot_integration" - }) - except Exception as e: - logger.warning(f"Could not load chatbot templates: {e}") - - # Built-in templates - templates = [ - { - "id": "simple_chat", - "name": "Simple Chat Workflow", - "description": "Basic LLM chat interaction", - "definition": { - "name": "Simple Chat", - "steps": [ - { - "name": "Chat Response", - "type": "llm_call", - "model": "gpt-4", - "messages": [ - {"role": "user", "content": "{user_input}"} - ], - "output_variable": "response" - } - ], - "variables": { - "user_input": "Hello, how are you?" 
- } - } - }, - { - "id": "sentiment_analysis", - "name": "Sentiment Analysis Workflow", - "description": "Analyze text sentiment with follow-up actions", - "definition": { - "name": "Sentiment Analysis", - "steps": [ - { - "name": "Analyze Sentiment", - "type": "llm_call", - "model": "gpt-4", - "messages": [ - { - "role": "system", - "content": "Analyze the sentiment of the following text and respond with only: positive, negative, or neutral" - }, - {"role": "user", "content": "{text_to_analyze}"} - ], - "output_variable": "sentiment" - }, - { - "name": "Conditional Response", - "type": "condition", - "condition": "$sentiment == 'negative'", - "true_steps": [ - { - "name": "Generate Positive Response", - "type": "llm_call", - "model": "gpt-4", - "messages": [ - { - "role": "system", - "content": "Generate a helpful and positive response to address the negative sentiment" - }, - {"role": "user", "content": "{text_to_analyze}"} - ], - "output_variable": "response" - } - ], - "false_steps": [ - { - "name": "Generate Standard Response", - "type": "llm_call", - "model": "gpt-4", - "messages": [ - {"role": "user", "content": "Thank you for your {sentiment} feedback!"} - ], - "output_variable": "response" - } - ] - } - ] - } - } - ] - - # Combine built-in templates with chatbot templates - all_templates = templates + chatbot_templates - return {"templates": all_templates} - - async def intercept_llm_request(self, context: Dict[str, Any]) -> Dict[str, Any]: - """Workflow module request interceptor""" - # Skip interception if module not initialized - if not self.initialized or not self.engine: - return context - - request = context.get("request", {}) - - # Check if this is a workflow execution request - if request.get("workflow_execution"): - workflow_id = request.get("workflow_id") - if workflow_id in self.engine.workflows: - # Execute workflow instead of direct LLM call - workflow = self.engine.workflows[workflow_id] - execution = await self.engine.execute_workflow(workflow, request.get("input_data", {})) - - # Return workflow results - context["workflow_result"] = execution.results - context["skip_llm_call"] = True - - return context - - async def intercept_llm_response(self, context: Dict[str, Any], response: Dict[str, Any]) -> Dict[str, Any]: - """Workflow module response interceptor""" - if context.get("workflow_result"): - # Return workflow results instead of LLM response - return { - "choices": [ - { - "message": { - "role": "assistant", - "content": json.dumps(context["workflow_result"]) - } - } - ], - "usage": {"total_tokens": 0}, - "workflow_execution": True - } - - return response - - def get_interceptor_priority(self) -> int: - """Workflow should run early in the chain""" - return 15 - - async def on_enable(self): - """Called when module is enabled""" - logger.info("Workflow module enabled") - - async def on_disable(self): - """Called when module is disabled""" - logger.info("Workflow module disabled") - - async def get_workflow_templates(self, request_data: Dict[str, Any] = None): - """Get predefined workflow templates - for modules API""" - # Load chatbot integration templates from file - chatbot_templates = [] - try: - import os - template_file = os.path.join(os.path.dirname(__file__), "templates", "chatbot_integration_examples.json") - if os.path.exists(template_file): - with open(template_file, 'r') as f: - chatbot_data = json.load(f) - for template in chatbot_data.get("templates", []): - chatbot_templates.append({ - "id": template["id"], - "name": template["name"], - 
"description": template["description"], - "definition": template, - "category": "chatbot_integration" - }) - except Exception as e: - logger.warning(f"Could not load chatbot templates: {e}") - - # Built-in templates - templates = [ - { - "id": "simple_chat", - "name": "Simple Chat Workflow", - "description": "Basic LLM chat interaction", - "definition": { - "name": "Simple Chat", - "steps": [ - { - "name": "Chat Response", - "type": "llm_call", - "model": "gpt-4", - "messages": [ - {"role": "user", "content": "{user_input}"} - ], - "output_variable": "response" - } - ], - "variables": { - "user_input": "Hello, how are you?" - } - } - }, - { - "id": "sentiment_analysis", - "name": "Sentiment Analysis Workflow", - "description": "Analyze text sentiment with follow-up actions", - "definition": { - "name": "Sentiment Analysis", - "steps": [ - { - "name": "Analyze Sentiment", - "type": "llm_call", - "model": "gpt-4", - "messages": [ - {"role": "system", "content": "Analyze the sentiment of the following text. Respond with only: positive, negative, or neutral."}, - {"role": "user", "content": "{text_input}"} - ], - "output_variable": "sentiment" - } - ], - "variables": { - "text_input": "I love this product!" - } - } - } - ] - - all_templates = chatbot_templates + templates - return {"templates": all_templates} - - async def execute_workflow(self, request_data: Dict[str, Any]): - """Execute a workflow - for modules API""" - if not self.initialized or not self.engine: - raise HTTPException(status_code=500, detail="Workflow engine not initialized") - - workflow_def = WorkflowDefinition(**request_data.get("workflow_def", {})) - input_data = request_data.get("input_data", {}) - - execution = await self.engine.execute_workflow(workflow_def, input_data) - return { - "execution_id": execution.id, - "status": execution.status.value, - "workflow_id": execution.workflow_id - } - - async def validate_workflow(self, request_data: Dict[str, Any]): - """Validate a workflow definition - for modules API""" - try: - # Basic validation - workflow_def = request_data.get("workflow_def", {}) - errors = [] - - if not workflow_def.get("name"): - errors.append("Workflow must have a name") - - if not workflow_def.get("steps"): - errors.append("Workflow must have at least one step") - - # Validate step references - step_ids = {step["id"] for step in workflow_def.get("steps", []) if "id" in step} - for step in workflow_def.get("steps", []): - if step.get("type") == "condition": - cond_step = ConditionalStep(**step) - for sub_step in cond_step.true_steps + cond_step.false_steps: - if sub_step.id not in step_ids: - errors.append(f"Step '{step['name']}' references unknown step '{sub_step.id}'") - - return { - "valid": len(errors) == 0, - "errors": errors - } - except Exception as e: - return { - "valid": False, - "errors": [f"Validation error: {str(e)}"] - } - - async def get_workflows(self, request_data: Dict[str, Any] = None): - """Get all workflows - for modules API""" - try: - # Create database session - db = SessionLocal() - try: - # Fetch workflows from database - db_workflows = db.query(DBWorkflowDefinition).filter( - DBWorkflowDefinition.is_active == True - ).all() - - # Convert to API format - workflows = [] - for workflow in db_workflows: - workflows.append({ - "id": workflow.id, - "name": workflow.name, - "description": workflow.description, - "version": workflow.version, - "steps": workflow.steps, - "variables": workflow.variables, - "metadata": workflow.workflow_metadata, - "timeout": workflow.timeout, - 
"created_at": workflow.created_at.isoformat() + "Z", - "updated_at": workflow.updated_at.isoformat() + "Z", - "is_active": workflow.is_active, - "created_by": workflow.created_by - }) - - logger.info(f"Retrieved {len(workflows)} workflows from database") - return {"workflows": workflows} - - finally: - db.close() - - except Exception as e: - logger.error(f"Error getting workflows: {e}") - return {"error": str(e)} - - async def get_workflow(self, request_data: Dict[str, Any]): - """Get a specific workflow - for modules API""" - try: - workflow_id = request_data.get("workflow_id") - if not workflow_id: - return {"error": "workflow_id is required"} - - # Create database session - db = SessionLocal() - try: - # Fetch workflow from database - db_workflow = db.query(DBWorkflowDefinition).filter( - DBWorkflowDefinition.id == workflow_id, - DBWorkflowDefinition.is_active == True - ).first() - - if not db_workflow: - return {"error": f"Workflow {workflow_id} not found"} - - # Convert to API format - workflow = { - "id": db_workflow.id, - "name": db_workflow.name, - "description": db_workflow.description, - "version": db_workflow.version, - "steps": db_workflow.steps, - "variables": db_workflow.variables, - "metadata": db_workflow.workflow_metadata, - "timeout": db_workflow.timeout, - "created_at": db_workflow.created_at.isoformat() + "Z", - "updated_at": db_workflow.updated_at.isoformat() + "Z", - "is_active": db_workflow.is_active, - "created_by": db_workflow.created_by - } - - return {"workflow": workflow} - - finally: - db.close() - - except Exception as e: - logger.error(f"Error getting workflow: {e}") - return {"error": str(e)} - - async def create_workflow(self, request_data: Dict[str, Any]): - """Create a new workflow - for modules API""" - try: - workflow_def = request_data.get("workflow_def", {}) - - # Generate ID if not provided - if "id" not in workflow_def: - workflow_def["id"] = f"wf-{uuid.uuid4().hex[:8]}" - - # Extract required fields - name = workflow_def.get("name", "Untitled Workflow") - description = workflow_def.get("description", "") - version = workflow_def.get("version", "1.0.0") - steps = workflow_def.get("steps", []) - variables = workflow_def.get("variables", {}) - workflow_metadata = workflow_def.get("metadata", {}) - timeout = workflow_def.get("timeout") - - # Create database session - db = SessionLocal() - try: - # Create database record - db_workflow = DBWorkflowDefinition( - id=workflow_def["id"], - name=name, - description=description, - version=version, - steps=steps, - variables=variables, - workflow_metadata=workflow_metadata, - timeout=timeout, - created_by="system", # Note: This method needs user context parameter to track creator properly - is_active=True - ) - - db.add(db_workflow) - db.commit() - db.refresh(db_workflow) - - logger.info(f"Created workflow in database: {workflow_def['id']}") - - # Return workflow data - return { - "workflow": { - "id": db_workflow.id, - "name": db_workflow.name, - "description": db_workflow.description, - "version": db_workflow.version, - "steps": db_workflow.steps, - "variables": db_workflow.variables, - "metadata": db_workflow.workflow_metadata, - "timeout": db_workflow.timeout, - "created_at": db_workflow.created_at.isoformat() + "Z", - "updated_at": db_workflow.updated_at.isoformat() + "Z", - "is_active": db_workflow.is_active - } - } - finally: - db.close() - - except Exception as e: - logger.error(f"Error creating workflow: {e}") - return {"error": str(e)} - - async def update_workflow(self, request_data: Dict[str, Any]): 
- """Update an existing workflow - for modules API""" - try: - workflow_id = request_data.get("workflow_id") - workflow_def = request_data.get("workflow_def", {}) - - if not workflow_id: - return {"error": "workflow_id is required"} - - # Ensure ID matches - workflow_def["id"] = workflow_id - - # Update timestamp - import datetime - workflow_def["updated_at"] = datetime.datetime.utcnow().isoformat() + "Z" - - # In a real implementation, this would update in the database - logger.info(f"Updated workflow: {workflow_id}") - - return {"workflow": workflow_def} - except Exception as e: - logger.error(f"Error updating workflow: {e}") - return {"error": str(e)} - - async def delete_workflow(self, request_data: Dict[str, Any]): - """Delete a workflow - for modules API""" - try: - workflow_id = request_data.get("workflow_id") - - if not workflow_id: - return {"error": "workflow_id is required"} - - # Create database session - db = SessionLocal() - try: - # Fetch workflow from database - db_workflow = db.query(DBWorkflowDefinition).filter( - DBWorkflowDefinition.id == workflow_id, - DBWorkflowDefinition.is_active == True - ).first() - - if not db_workflow: - return {"error": f"Workflow {workflow_id} not found"} - - # Soft delete - mark as inactive instead of hard delete - # This preserves execution history while making the workflow unavailable - db_workflow.is_active = False - db.commit() - - logger.info(f"Workflow {workflow_id} marked as deleted (soft delete)") - - return {"success": True, "message": f"Workflow {workflow_id} deleted successfully"} - - finally: - db.close() - - except Exception as e: - logger.error(f"Error deleting workflow: {e}") - return {"error": str(e)} - - async def get_executions(self, request_data: Dict[str, Any] = None): - """Get workflow executions - for modules API""" - try: - # For now, return sample execution data - # In a real implementation, this would fetch from a database - executions = [ - { - "id": "exec-1", - "workflow_id": "sample-customer-support", - "status": "completed", - "started_at": "2024-01-01T12:00:00Z", - "completed_at": "2024-01-01T12:05:00Z", - "results": { - "response": "Customer inquiry resolved successfully", - "steps_completed": 3, - "tokens_used": 250 - } - } - ] - - return {"executions": executions} - except Exception as e: - logger.error(f"Error getting executions: {e}") - return {"error": str(e)} - - async def cancel_execution(self, request_data: Dict[str, Any]): - """Cancel a workflow execution - for modules API""" - try: - execution_id = request_data.get("execution_id") - - if not execution_id: - return {"error": "execution_id is required"} - - # In a real implementation, this would cancel the running execution - logger.info(f"Cancelled execution: {execution_id}") - - return {"success": True, "message": f"Execution {execution_id} cancelled successfully"} - except Exception as e: - logger.error(f"Error cancelling execution: {e}") - return {"error": str(e)} - - def get_required_permissions(self) -> List[Permission]: - """Return required permissions for this module""" - return [ - Permission("workflows", "create", "Create workflows"), - Permission("workflows", "execute", "Execute workflows"), - Permission("workflows", "view", "View workflow status"), - Permission("workflows", "manage", "Manage workflows"), - ] - - -# Create module instance (chatbot service will be injected via factory) -workflow_module = WorkflowModule() \ No newline at end of file diff --git a/backend/modules/workflow/module.yaml b/backend/modules/workflow/module.yaml deleted 
file mode 100644 index 87ad6c3..0000000 --- a/backend/modules/workflow/module.yaml +++ /dev/null @@ -1,145 +0,0 @@ -name: workflow -version: 1.0.0 -description: "Multi-step automation processes" -author: "Enclava Team" -category: "automation" - -# Module lifecycle -enabled: true -auto_start: true -dependencies: [] -optional_dependencies: - - rag - - chatbot - - cache - -# Module capabilities -provides: - - "workflow_execution" - - "step_orchestration" - - "automation_triggers" - - "workflow_templates" - -consumes: - - "llm_completion" - - "rag_search" - - "chatbot_response" - - "external_apis" - -# API endpoints -endpoints: - - path: "/workflow/templates" - method: "GET" - description: "List available workflow templates" - - - path: "/workflow/create" - method: "POST" - description: "Create new workflow" - - - path: "/workflow/execute" - method: "POST" - description: "Execute workflow" - - - path: "/workflow/status/{workflow_id}" - method: "GET" - description: "Get workflow execution status" - - - path: "/workflow/history" - method: "GET" - description: "Get workflow execution history" - -# Workflow integration -workflow_steps: - - name: "conditional_branch" - description: "Conditional logic branching" - inputs: - - name: "condition" - type: "string" - required: true - description: "Condition to evaluate" - - name: "true_path" - type: "object" - required: true - description: "Steps to execute if condition is true" - - name: "false_path" - type: "object" - required: false - description: "Steps to execute if condition is false" - outputs: - - name: "result" - type: "object" - description: "Result from executed branch" - - - name: "loop_iteration" - description: "Iterative processing loop" - inputs: - - name: "items" - type: "array" - required: true - description: "Items to iterate over" - - name: "steps" - type: "object" - required: true - description: "Steps to execute for each item" - outputs: - - name: "results" - type: "array" - description: "Results from each iteration" - -# UI Configuration -ui_config: - icon: "workflow" - color: "#06B6D4" - category: "Automation" - - forms: - - name: "workflow_config" - title: "Workflow Settings" - fields: ["name", "description", "trigger_type"] - - - name: "step_config" - title: "Step Configuration" - fields: ["step_type", "parameters", "retry_attempts"] - - - name: "scheduling" - title: "Scheduling & Triggers" - fields: ["schedule", "webhook_triggers", "event_triggers"] - -# Permissions -permissions: - - name: "workflow.create" - description: "Create new workflows" - - - name: "workflow.execute" - description: "Execute workflows" - - - name: "workflow.configure" - description: "Configure workflow settings" - - - name: "workflow.manage" - description: "Manage all workflows (admin)" - -# Analytics events -analytics_events: - - name: "workflow_created" - description: "New workflow template created" - - - name: "workflow_executed" - description: "Workflow execution started" - - - name: "workflow_completed" - description: "Workflow execution completed" - - - name: "workflow_failed" - description: "Workflow execution failed" - -# Health checks -health_checks: - - name: "execution_engine" - description: "Check workflow execution engine" - - - name: "step_dependencies" - description: "Check availability of workflow step dependencies" - - - name: "template_validation" - description: "Validate workflow templates" \ No newline at end of file diff --git a/backend/modules/workflow/templates/chatbot_integration_examples.json 
b/backend/modules/workflow/templates/chatbot_integration_examples.json deleted file mode 100644 index 157fc7b..0000000 --- a/backend/modules/workflow/templates/chatbot_integration_examples.json +++ /dev/null @@ -1,389 +0,0 @@ -{ - "templates": [ - { - "id": "simple_chatbot_interaction", - "name": "Simple Chatbot Interaction", - "description": "Basic workflow that processes user input through a configured chatbot", - "version": "1.0", - "variables": { - "user_message": "Hello, I need help with my account", - "chatbot_id": "customer_support_bot" - }, - "steps": [ - { - "id": "chatbot_response", - "name": "Get Chatbot Response", - "type": "chatbot", - "chatbot_id": "{chatbot_id}", - "message_template": "{user_message}", - "output_variable": "bot_response", - "create_new_conversation": true, - "save_conversation_id": "conversation_id" - } - ], - "outputs": { - "response": "{bot_response}", - "conversation_id": "{conversation_id}" - }, - "metadata": { - "created_by": "system", - "use_case": "customer_support", - "tags": ["chatbot", "simple", "customer_support"] - } - }, - { - "id": "multi_turn_customer_support", - "name": "Multi-Turn Customer Support Flow", - "description": "Advanced customer support workflow with intent classification, knowledge base lookup, and escalation", - "version": "1.0", - "variables": { - "user_message": "My order hasn't arrived yet", - "customer_support_chatbot": "support_assistant", - "escalation_chatbot": "human_handoff_bot", - "rag_collection": "support_knowledge_base" - }, - "steps": [ - { - "id": "classify_intent", - "name": "Classify User Intent", - "type": "llm_call", - "model": "gpt-3.5-turbo", - "messages": [ - { - "role": "system", - "content": "You are an intent classifier. Classify the user's message into one of: order_inquiry, technical_support, billing, general_question, escalation_needed. Respond with only the classification." 
- }, - { - "role": "user", - "content": "{user_message}" - } - ], - "output_variable": "intent" - }, - { - "id": "handle_order_inquiry", - "name": "Handle Order Inquiry", - "type": "chatbot", - "conditions": ["{intent} == 'order_inquiry'"], - "chatbot_id": "{customer_support_chatbot}", - "message_template": "Customer inquiry about order: {user_message}", - "output_variable": "support_response", - "context_variables": { - "intent": "intent", - "inquiry_type": "order_inquiry" - } - }, - { - "id": "handle_technical_support", - "name": "Handle Technical Support", - "type": "chatbot", - "conditions": ["{intent} == 'technical_support'"], - "chatbot_id": "{customer_support_chatbot}", - "message_template": "Technical support request: {user_message}", - "output_variable": "support_response", - "context_variables": { - "intent": "intent", - "inquiry_type": "technical_support" - } - }, - { - "id": "escalate_to_human", - "name": "Escalate to Human Agent", - "type": "chatbot", - "conditions": ["{intent} == 'escalation_needed'"], - "chatbot_id": "{escalation_chatbot}", - "message_template": "Customer needs human assistance: {user_message}", - "output_variable": "escalation_response" - }, - { - "id": "general_response", - "name": "General Support Response", - "type": "chatbot", - "conditions": ["{intent} == 'general_question' or {intent} == 'billing'"], - "chatbot_id": "{customer_support_chatbot}", - "message_template": "{user_message}", - "output_variable": "support_response" - }, - { - "id": "format_final_response", - "name": "Format Final Response", - "type": "transform", - "input_variable": "support_response", - "output_variable": "final_response", - "transformation": "extract:response" - } - ], - "outputs": { - "intent": "{intent}", - "response": "{final_response}", - "escalation_response": "{escalation_response}" - }, - "error_handling": { - "retry_failed_steps": true, - "max_retries": 2, - "fallback_response": "I apologize, but I'm experiencing technical difficulties. Please try again later or contact support directly." 
- }, - "metadata": { - "created_by": "system", - "use_case": "customer_support", - "tags": ["chatbot", "multi_turn", "intent_classification", "escalation"] - } - }, - { - "id": "research_assistant_workflow", - "name": "AI Research Assistant", - "description": "Research workflow that uses specialized chatbots for different research tasks", - "version": "1.0", - "variables": { - "research_topic": "artificial intelligence trends 2024", - "researcher_chatbot": "ai_researcher", - "analyst_chatbot": "data_analyst", - "writer_chatbot": "content_writer" - }, - "steps": [ - { - "id": "initial_research", - "name": "Conduct Initial Research", - "type": "chatbot", - "chatbot_id": "{researcher_chatbot}", - "message_template": "Please research the following topic and provide key findings: {research_topic}", - "output_variable": "research_findings", - "create_new_conversation": true, - "save_conversation_id": "research_conversation" - }, - { - "id": "analyze_findings", - "name": "Analyze Research Findings", - "type": "chatbot", - "chatbot_id": "{analyst_chatbot}", - "message_template": "Please analyze these research findings and identify key trends and insights: {research_findings}", - "output_variable": "analysis_results", - "create_new_conversation": true - }, - { - "id": "create_summary", - "name": "Create Executive Summary", - "type": "chatbot", - "chatbot_id": "{writer_chatbot}", - "message_template": "Create an executive summary based on this research and analysis:\n\nTopic: {research_topic}\nResearch: {research_findings}\nAnalysis: {analysis_results}", - "output_variable": "executive_summary" - }, - { - "id": "follow_up_questions", - "name": "Generate Follow-up Questions", - "type": "chatbot", - "chatbot_id": "{researcher_chatbot}", - "message_template": "Based on this research on {research_topic}, what are 5 important follow-up questions that should be investigated further?", - "output_variable": "follow_up_questions", - "conversation_id": "research_conversation" - } - ], - "outputs": { - "research_findings": "{research_findings}", - "analysis": "{analysis_results}", - "summary": "{executive_summary}", - "next_steps": "{follow_up_questions}", - "conversation_id": "{research_conversation}" - }, - "metadata": { - "created_by": "system", - "use_case": "research_automation", - "tags": ["chatbot", "research", "analysis", "multi_agent"] - } - }, - { - "id": "content_creation_pipeline", - "name": "AI Content Creation Pipeline", - "description": "Multi-stage content creation using different specialized chatbots", - "version": "1.0", - "variables": { - "content_brief": "Write a blog post about sustainable technology innovations", - "target_audience": "tech-savvy professionals", - "content_length": "1500 words", - "research_bot": "researcher_assistant", - "writer_bot": "creative_writer", - "editor_bot": "content_editor" - }, - "steps": [ - { - "id": "research_phase", - "name": "Research Content Topic", - "type": "chatbot", - "chatbot_id": "{research_bot}", - "message_template": "Research this content brief: {content_brief}. Target audience: {target_audience}. 
Provide key points, statistics, and current trends.", - "output_variable": "research_data", - "create_new_conversation": true, - "save_conversation_id": "content_conversation" - }, - { - "id": "create_outline", - "name": "Create Content Outline", - "type": "chatbot", - "chatbot_id": "{writer_bot}", - "message_template": "Create a detailed outline for: {content_brief}\nTarget audience: {target_audience}\nLength: {content_length}\nResearch data: {research_data}", - "output_variable": "content_outline" - }, - { - "id": "write_content", - "name": "Write First Draft", - "type": "chatbot", - "chatbot_id": "{writer_bot}", - "message_template": "Write the full content based on this outline: {content_outline}\nBrief: {content_brief}\nResearch: {research_data}\nTarget length: {content_length}", - "output_variable": "first_draft" - }, - { - "id": "edit_content", - "name": "Edit and Polish Content", - "type": "chatbot", - "chatbot_id": "{editor_bot}", - "message_template": "Please edit and improve this content for clarity, engagement, and professional tone:\n\n{first_draft}", - "output_variable": "final_content" - }, - { - "id": "generate_metadata", - "name": "Generate SEO Metadata", - "type": "parallel", - "steps": [ - { - "id": "create_title_options", - "name": "Generate Title Options", - "type": "chatbot", - "chatbot_id": "{writer_bot}", - "message_template": "Generate 5 compelling SEO-optimized titles for this content: {final_content}", - "output_variable": "title_options" - }, - { - "id": "create_meta_description", - "name": "Create Meta Description", - "type": "chatbot", - "chatbot_id": "{editor_bot}", - "message_template": "Create an SEO-optimized meta description (150-160 characters) for this content: {final_content}", - "output_variable": "meta_description" - } - ] - } - ], - "outputs": { - "research": "{research_data}", - "outline": "{content_outline}", - "draft": "{first_draft}", - "final_content": "{final_content}", - "titles": "{title_options}", - "meta_description": "{meta_description}", - "conversation_id": "{content_conversation}" - }, - "metadata": { - "created_by": "system", - "use_case": "content_creation", - "tags": ["chatbot", "content", "writing", "parallel", "seo"] - } - }, - { - "id": "demo_chatbot_workflow", - "name": "Demo Interactive Chatbot", - "description": "A demonstration workflow showcasing interactive chatbot capabilities with multi-turn conversation, context awareness, and intelligent response handling", - "version": "1.0.0", - "steps": [ - { - "id": "welcome_interaction", - "name": "Welcome User", - "type": "chatbot", - "chatbot_id": "demo_assistant", - "message_template": "Hello! I'm a demo AI assistant. What can I help you with today? Feel free to ask me about anything - technology, general questions, or just have a conversation!", - "output_variable": "welcome_response", - "create_new_conversation": true, - "save_conversation_id": "demo_conversation_id", - "enabled": true - }, - { - "id": "analyze_user_intent", - "name": "Analyze User Intent", - "type": "llm_call", - "model": "gpt-3.5-turbo", - "messages": [ - { - "role": "system", - "content": "Analyze the user's response and classify their intent. Categories: question, casual_chat, technical_help, information_request, creative_task, other. Respond with only the category name." 
- }, - { - "role": "user", - "content": "{user_input}" - } - ], - "output_variable": "user_intent", - "parameters": { - "temperature": 0.3, - "max_tokens": 50 - }, - "enabled": true - }, - { - "id": "personalized_response", - "name": "Generate Personalized Response", - "type": "chatbot", - "chatbot_id": "demo_assistant", - "message_template": "User intent: {user_intent}. User message: {user_input}. Please provide a helpful, engaging response tailored to their specific need.", - "output_variable": "personalized_response", - "conversation_id": "demo_conversation_id", - "context_variables": { - "intent": "user_intent", - "previous_welcome": "welcome_response" - }, - "enabled": true - }, - { - "id": "follow_up_suggestions", - "name": "Generate Follow-up Suggestions", - "type": "llm_call", - "model": "gpt-3.5-turbo", - "messages": [ - { - "role": "system", - "content": "Based on the conversation, suggest 2-3 relevant follow-up questions or topics the user might be interested in. Format as a simple list." - }, - { - "role": "user", - "content": "User intent: {user_intent}, Response given: {personalized_response}" - } - ], - "output_variable": "follow_up_suggestions", - "parameters": { - "temperature": 0.7, - "max_tokens": 150 - }, - "enabled": true - }, - { - "id": "conversation_summary", - "name": "Create Conversation Summary", - "type": "transform", - "input_variable": "personalized_response", - "output_variable": "conversation_summary", - "transformation": "extract:content", - "enabled": true - } - ], - "variables": { - "user_input": "I'm interested in learning about artificial intelligence and how it's changing the world", - "demo_assistant": "assistant" - }, - "metadata": { - "created_by": "demo_system", - "use_case": "demonstration", - "tags": ["demo", "chatbot", "interactive", "multi_turn"], - "demo_instructions": { - "description": "This workflow demonstrates key chatbot capabilities including conversation continuity, intent analysis, personalized responses, and follow-up suggestions.", - "usage": "Execute this workflow with different user inputs to see how the chatbot adapts its responses based on intent analysis and conversation context.", - "features": [ - "Multi-turn conversation with persistent conversation ID", - "Intent classification for tailored responses", - "Context-aware personalized interactions", - "Automatic follow-up suggestions", - "Conversation summarization" - ] - } - }, - "timeout": 300 - } - ] -} \ No newline at end of file diff --git a/backend/tests/e2e/test_chatbot_rag_workflow.py b/backend/tests/e2e/test_chatbot_rag_workflow.py deleted file mode 100644 index a195918..0000000 --- a/backend/tests/e2e/test_chatbot_rag_workflow.py +++ /dev/null @@ -1,471 +0,0 @@ -""" -Complete chatbot workflow tests with RAG integration. -Test the entire pipeline from document upload to chat responses with knowledge retrieval. 
-""" - -import pytest -import asyncio -from typing import Dict, Any, List - -from tests.clients.chatbot_api_client import ChatbotAPITestClient -from tests.fixtures.test_data_manager import TestDataManager - - -class TestChatbotRAGWorkflow: - """Test complete chatbot workflow with RAG integration""" - - BASE_URL = "http://localhost:3001" # Through nginx - - @pytest.fixture - async def api_client(self): - """Chatbot API test client""" - return ChatbotAPITestClient(self.BASE_URL) - - @pytest.fixture - async def authenticated_client(self, api_client): - """Pre-authenticated API client""" - # Register and authenticate test user - email = "ragtest@example.com" - password = "testpass123" - username = "ragtestuser" - - # Register user - register_result = await api_client.register_user(email, password, username) - if register_result["status_code"] not in [201, 409]: # 409 = already exists - pytest.fail(f"Failed to register user: {register_result}") - - # Authenticate - auth_result = await api_client.authenticate(email, password) - if not auth_result["success"]: - pytest.fail(f"Failed to authenticate: {auth_result}") - - return api_client - - @pytest.fixture - def sample_documents(self): - """Sample documents for RAG testing""" - return { - "installation_guide": { - "filename": "installation_guide.md", - "content": """ - # Enclava Platform Installation Guide - - ## System Requirements - - Python 3.8 or higher - - Docker and Docker Compose - - PostgreSQL 13+ - - Redis 6+ - - At least 4GB RAM - - ## Installation Steps - 1. Clone the repository - 2. Copy .env.example to .env - 3. Run docker-compose up --build - 4. Access the application at http://localhost:3000 - - ## Troubleshooting - - If port 3000 is in use, modify docker-compose.yml - - Check Docker daemon is running - - Ensure all required ports are available - """, - "test_questions": [ - { - "question": "What are the system requirements for Enclava?", - "expected_keywords": ["Python 3.8", "Docker", "PostgreSQL", "Redis", "4GB RAM"], - "min_keywords": 3 - }, - { - "question": "How do I install Enclava?", - "expected_keywords": ["clone", "repository", ".env", "docker-compose up", "localhost:3000"], - "min_keywords": 3 - }, - { - "question": "What should I do if port 3000 is in use?", - "expected_keywords": ["modify", "docker-compose.yml", "port"], - "min_keywords": 2 - } - ] - }, - "api_reference": { - "filename": "api_reference.md", - "content": """ - # Enclava API Reference - - ## Authentication - All API requests require authentication using Bearer tokens or API keys. 
- - ## Endpoints - - ### GET /api/v1/models - List available AI models - Response: {"data": [{"id": "model-name", "object": "model", ...}]} - - ### POST /api/v1/chat/completions - Create chat completion - Body: {"model": "model-name", "messages": [...], "temperature": 0.7} - Response: {"choices": [{"message": {"content": "response"}}]} - - ### POST /api/v1/embeddings - Generate text embeddings - Body: {"model": "embedding-model", "input": "text to embed"} - Response: {"data": [{"embedding": [...]}]} - - ## Rate Limits - - Free tier: 60 requests per minute - - Pro tier: 600 requests per minute - """, - "test_questions": [ - { - "question": "How do I authenticate with the Enclava API?", - "expected_keywords": ["Bearer token", "API key", "authentication"], - "min_keywords": 2 - }, - { - "question": "What is the endpoint for chat completions?", - "expected_keywords": ["/api/v1/chat/completions", "POST"], - "min_keywords": 1 - }, - { - "question": "What are the rate limits?", - "expected_keywords": ["60 requests", "600 requests", "per minute", "free tier", "pro tier"], - "min_keywords": 3 - } - ] - } - } - - @pytest.mark.asyncio - async def test_complete_rag_workflow(self, authenticated_client, sample_documents): - """Test complete RAG workflow from document upload to chat response""" - - # Test with installation guide document - doc_info = sample_documents["installation_guide"] - - result = await authenticated_client.test_rag_workflow( - collection_name="Installation Guide Collection", - document_content=doc_info["content"], - chatbot_name="Installation Assistant", - test_question=doc_info["test_questions"][0]["question"] - ) - - assert result["success"], f"RAG workflow failed: {result.get('error')}" - assert result["workflow_complete"], "Workflow did not complete successfully" - assert result["rag_working"], "RAG functionality is not working" - - # Verify all workflow steps succeeded - workflow_results = result["results"] - assert workflow_results["collection_creation"]["success"] - assert workflow_results["document_upload"]["success"] - assert workflow_results["document_processing"]["success"] - assert workflow_results["chatbot_creation"]["success"] - assert workflow_results["api_key_creation"]["success"] - assert workflow_results["chat_test"]["success"] - - # Verify RAG sources were provided - rag_verification = workflow_results["rag_verification"] - assert rag_verification["has_sources"] - assert rag_verification["source_count"] > 0 - - @pytest.mark.asyncio - async def test_rag_knowledge_accuracy(self, authenticated_client, sample_documents): - """Test RAG system accuracy with known documents and questions""" - - for doc_key, doc_info in sample_documents.items(): - # Create RAG workflow for this document - workflow_result = await authenticated_client.test_rag_workflow( - collection_name=f"Test Collection - {doc_key}", - document_content=doc_info["content"], - chatbot_name=f"Test Assistant - {doc_key}", - test_question=doc_info["test_questions"][0]["question"] # Use first question for setup - ) - - if not workflow_result["success"]: - pytest.fail(f"Failed to set up RAG workflow for {doc_key}: {workflow_result.get('error')}") - - # Extract chatbot info for testing - chatbot_id = workflow_result["results"]["chatbot_creation"]["data"]["id"] - api_key = workflow_result["results"]["api_key_creation"]["data"]["key"] - - # Test each question for this document - for question_data in doc_info["test_questions"]: - chat_result = await authenticated_client.chat_with_bot( - chatbot_id=chatbot_id, - 
message=question_data["question"], - api_key=api_key - ) - - assert chat_result["success"], f"Chat failed for question: {question_data['question']}" - - # Analyze response accuracy - response_text = chat_result["data"]["response"].lower() - keywords_found = sum( - 1 for keyword in question_data["expected_keywords"] - if keyword.lower() in response_text - ) - - accuracy = keywords_found / len(question_data["expected_keywords"]) - min_accuracy = question_data["min_keywords"] / len(question_data["expected_keywords"]) - - assert accuracy >= min_accuracy, \ - f"Accuracy {accuracy:.2f} below minimum {min_accuracy:.2f} for question: {question_data['question']} in {doc_key}" - - # Verify sources were provided - assert "sources" in chat_result["data"], f"No sources provided for question in {doc_key}" - assert len(chat_result["data"]["sources"]) > 0, f"Empty sources for question in {doc_key}" - - @pytest.mark.asyncio - async def test_conversation_memory_with_rag(self, authenticated_client, sample_documents): - """Test conversation memory functionality with RAG""" - - # Set up RAG chatbot - doc_info = sample_documents["api_reference"] - workflow_result = await authenticated_client.test_rag_workflow( - collection_name="Memory Test Collection", - document_content=doc_info["content"], - chatbot_name="Memory Test Assistant", - test_question="What is the API reference?" - ) - - assert workflow_result["success"], f"Failed to set up RAG workflow: {workflow_result.get('error')}" - - chatbot_id = workflow_result["results"]["chatbot_creation"]["data"]["id"] - api_key = workflow_result["results"]["api_key_creation"]["data"]["key"] - - # Test conversation memory - memory_result = await authenticated_client.test_conversation_memory(chatbot_id, api_key) - - # Verify conversation was maintained - assert memory_result["conversation_maintained"], "Conversation ID was not maintained across messages" - - # Verify memory is working (may be challenging with RAG, so we're lenient) - conversation_results = memory_result["conversation_results"] - assert len(conversation_results) >= 3, "Not all conversation messages were processed" - - # All messages should have gotten responses - for result in conversation_results: - assert "response" in result or "error" in result, "Message did not get a response" - - @pytest.mark.asyncio - async def test_multi_document_rag(self, authenticated_client, sample_documents): - """Test RAG with multiple documents in one collection""" - - # Create collection - collection_result = await authenticated_client.create_rag_collection( - name="Multi-Document Collection", - description="Collection with multiple documents for testing" - ) - assert collection_result["success"], f"Failed to create collection: {collection_result}" - - collection_id = collection_result["data"]["id"] - - # Upload multiple documents - uploaded_docs = [] - for doc_key, doc_info in sample_documents.items(): - upload_result = await authenticated_client.upload_document( - collection_id=collection_id, - file_content=doc_info["content"], - filename=doc_info["filename"] - ) - - assert upload_result["success"], f"Failed to upload {doc_key}: {upload_result}" - - # Wait for processing - doc_id = upload_result["data"]["id"] - processing_result = await authenticated_client.wait_for_document_processing(doc_id) - assert processing_result["success"], f"Processing failed for {doc_key}: {processing_result}" - - uploaded_docs.append(doc_key) - - # Create chatbot with access to all documents - chatbot_result = await 
authenticated_client.create_chatbot( - name="Multi-Doc Assistant", - use_rag=True, - rag_collection="Multi-Document Collection" - ) - assert chatbot_result["success"], f"Failed to create chatbot: {chatbot_result}" - - chatbot_id = chatbot_result["data"]["id"] - - # Create API key - api_key_result = await authenticated_client.create_api_key_for_chatbot(chatbot_id) - assert api_key_result["success"], f"Failed to create API key: {api_key_result}" - - api_key = api_key_result["data"]["key"] - - # Test questions that should draw from different documents - test_questions = [ - "How do I install Enclava?", # Should use installation guide - "What are the API endpoints?", # Should use API reference - "Tell me about both installation and API usage" # Should use both documents - ] - - for question in test_questions: - chat_result = await authenticated_client.chat_with_bot( - chatbot_id=chatbot_id, - message=question, - api_key=api_key - ) - - assert chat_result["success"], f"Chat failed for multi-doc question: {question}" - assert "sources" in chat_result["data"], f"No sources for multi-doc question: {question}" - assert len(chat_result["data"]["sources"]) > 0, f"Empty sources for multi-doc question: {question}" - - @pytest.mark.asyncio - async def test_rag_collection_isolation(self, authenticated_client, sample_documents): - """Test that RAG collections are properly isolated""" - - # Create two separate collections with different documents - doc1 = sample_documents["installation_guide"] - doc2 = sample_documents["api_reference"] - - # Collection 1 with installation guide - workflow1 = await authenticated_client.test_rag_workflow( - collection_name="Installation Only Collection", - document_content=doc1["content"], - chatbot_name="Installation Only Bot", - test_question="What is installation?" - ) - assert workflow1["success"], "Failed to create first RAG workflow" - - # Collection 2 with API reference - workflow2 = await authenticated_client.test_rag_workflow( - collection_name="API Only Collection", - document_content=doc2["content"], - chatbot_name="API Only Bot", - test_question="What is API?" - ) - assert workflow2["success"], "Failed to create second RAG workflow" - - # Extract chatbot info - bot1_id = workflow1["results"]["chatbot_creation"]["data"]["id"] - bot1_key = workflow1["results"]["api_key_creation"]["data"]["key"] - - bot2_id = workflow2["results"]["chatbot_creation"]["data"]["id"] - bot2_key = workflow2["results"]["api_key_creation"]["data"]["key"] - - # Test cross-contamination - # Bot 1 (installation only) should not know about API details - api_question = "What are the rate limits?" - result1 = await authenticated_client.chat_with_bot(bot1_id, api_question, api_key=bot1_key) - - if result1["success"]: - response1 = result1["data"]["response"].lower() - # Should not have detailed API rate limit info since it only has installation docs - has_rate_info = "60 requests" in response1 or "600 requests" in response1 - # This is a soft assertion since the bot might still give a generic response - - # Bot 2 (API only) should not know about installation details - install_question = "What are the system requirements?" 
- result2 = await authenticated_client.chat_with_bot(bot2_id, install_question, api_key=bot2_key) - - if result2["success"]: - response2 = result2["data"]["response"].lower() - # Should not have detailed system requirements since it only has API docs - has_install_info = "python 3.8" in response2 or "docker" in response2 - # This is a soft assertion since the bot might still give a generic response - - @pytest.mark.asyncio - async def test_rag_error_handling(self, authenticated_client): - """Test RAG error handling scenarios""" - - # Test chatbot with non-existent collection - chatbot_result = await authenticated_client.create_chatbot( - name="Error Test Bot", - use_rag=True, - rag_collection="NonExistentCollection" - ) - - # Should either fail to create or handle gracefully - if chatbot_result["success"]: - # If creation succeeded, test that chat handles missing collection gracefully - chatbot_id = chatbot_result["data"]["id"] - - api_key_result = await authenticated_client.create_api_key_for_chatbot(chatbot_id) - if api_key_result["success"]: - api_key = api_key_result["data"]["key"] - - chat_result = await authenticated_client.chat_with_bot( - chatbot_id=chatbot_id, - message="Tell me about something", - api_key=api_key - ) - - # Should handle gracefully - either succeed with fallback or fail gracefully - # Don't assert success/failure, just ensure it doesn't crash - assert "data" in chat_result or "error" in chat_result - - @pytest.mark.asyncio - async def test_rag_document_types(self, authenticated_client): - """Test RAG with different document types and formats""" - - document_types = { - "markdown": { - "filename": "test.md", - "content": "# Markdown Test\n\nThis is **bold** text and *italic* text.\n\n- List item 1\n- List item 2" - }, - "plain_text": { - "filename": "test.txt", - "content": "This is plain text content for testing document processing and retrieval." 
- }, - "json_like": { - "filename": "config.txt", - "content": '{"setting": "value", "number": 42, "enabled": true}' - } - } - - # Create collection - collection_result = await authenticated_client.create_rag_collection( - name="Document Types Collection", - description="Testing different document formats" - ) - assert collection_result["success"], f"Failed to create collection: {collection_result}" - - collection_id = collection_result["data"]["id"] - - # Upload each document type - for doc_type, doc_info in document_types.items(): - upload_result = await authenticated_client.upload_document( - collection_id=collection_id, - file_content=doc_info["content"], - filename=doc_info["filename"] - ) - - assert upload_result["success"], f"Failed to upload {doc_type}: {upload_result}" - - # Wait for processing - doc_id = upload_result["data"]["id"] - processing_result = await authenticated_client.wait_for_document_processing(doc_id, timeout=30) - assert processing_result["success"], f"Processing failed for {doc_type}: {processing_result}" - - # Create chatbot to test all document types - chatbot_result = await authenticated_client.create_chatbot( - name="Document Types Bot", - use_rag=True, - rag_collection="Document Types Collection" - ) - assert chatbot_result["success"], f"Failed to create chatbot: {chatbot_result}" - - chatbot_id = chatbot_result["data"]["id"] - - api_key_result = await authenticated_client.create_api_key_for_chatbot(chatbot_id) - assert api_key_result["success"], f"Failed to create API key: {api_key_result}" - - api_key = api_key_result["data"]["key"] - - # Test questions for different document types - test_questions = [ - "What is bold text?", # Should find markdown - "What is the plain text content?", # Should find plain text - "What is the setting value?", # Should find JSON-like content - ] - - for question in test_questions: - chat_result = await authenticated_client.chat_with_bot( - chatbot_id=chatbot_id, - message=question, - api_key=api_key - ) - - assert chat_result["success"], f"Chat failed for document type question: {question}" - # Should have sources even if the answer quality varies - assert "sources" in chat_result["data"], f"No sources for question: {question}" \ No newline at end of file diff --git a/backend/tests/integration/test_workflow_system.py b/backend/tests/integration/test_workflow_system.py deleted file mode 100644 index d153d3c..0000000 --- a/backend/tests/integration/test_workflow_system.py +++ /dev/null @@ -1,186 +0,0 @@ -#!/usr/bin/env python3 -""" -Test the enhanced workflow system with brand-ai patterns -""" - -import asyncio -import aiohttp -import json -import time - -async def test_enhanced_workflow_system(): - async with aiohttp.ClientSession() as session: - try: - print("🔧 Testing Enhanced Workflow System") - print("=" * 50) - - # Register and login test user - timestamp = int(time.time()) - user_data = { - "email": f"workflowtest{timestamp}@example.com", - "password": "TestPassword123!", - "username": f"workflowtest{timestamp}" - } - - async with session.post("http://localhost:58000/api/v1/auth/register", json=user_data) as response: - if response.status != 201: - error_data = await response.json() - print(f"❌ Registration failed: {error_data}") - return - print("✅ User registered") - - # Login - login_data = {"email": user_data["email"], "password": user_data["password"]} - async with session.post("http://localhost:58000/api/v1/auth/login", json=login_data) as response: - if response.status != 200: - error_data = await response.json() 
- print(f"❌ Login failed: {error_data}") - return - - login_result = await response.json() - token = login_result['access_token'] - headers = {'Authorization': f'Bearer {token}'} - print("✅ Login successful") - - # Test 1: Check workflow module status - print("\n📊 Test 1: Module Status") - async with session.get("http://localhost:58000/api/v1/modules/", headers=headers) as response: - if response.status == 200: - modules_data = await response.json() - workflow_module = None - for module in modules_data.get('modules', []): - if module.get('name') == 'workflow': - workflow_module = module - break - - if workflow_module: - print(f"✅ Workflow module found: {workflow_module.get('status', 'unknown')}") - print(f" Initialized: {workflow_module.get('initialized', False)}") - print(f" Version: {workflow_module.get('version', 'unknown')}") - else: - print("❌ Workflow module not found") - return - else: - print(f"❌ Failed to get module status: {response.status}") - return - - # Test 2: Create a simple brand naming workflow - print("\n🏭 Test 2: Brand Naming Workflow") - brand_workflow = { - "name": "Brand Name Generation", - "description": "Generate brand names using AI generation step", - "version": "1.0.0", - "steps": [ - { - "id": "ai_gen_step", - "name": "Generate Brand Names", - "type": "ai_generation", - "config": { - "operation": "brand_names", - "category": "semantic", - "model": "openrouter/anthropic/claude-3.5-sonnet", - "temperature": 0.8, - "max_tokens": 500, - "output_key": "brand_names", - "prompt_template": "Generate 3 creative brand names for a {industry} company targeting {target_audience}. Company description: {description}. Return as a JSON list with name and description fields." - }, - "enabled": True - }, - { - "id": "filter_step", - "name": "Filter Quality Names", - "type": "filter", - "config": { - "input_key": "brand_names", - "output_key": "filtered_names", - "filter_expression": "len(item.get('name', '')) > 3", - "keep_original": False - }, - "enabled": True - } - ], - "variables": { - "industry": "technology", - "target_audience": "developers", - "description": "A platform for AI development tools" - } - } - - # Test workflow execution - FastAPI expects both parameters - payload = { - "workflow_def": brand_workflow, - "input_data": { - "industry": "technology", - "target_audience": "developers", - "description": "A platform for AI development tools" - } - } - - async with session.post( - "http://localhost:58000/api/modules/v1/workflow/execute", - json=payload, - headers=headers, - timeout=aiohttp.ClientTimeout(total=30) - ) as response: - print(f"Workflow execution status: {response.status}") - if response.status == 200: - result = await response.json() - execution_id = result.get("execution_id") - print(f"✅ Workflow started: {execution_id}") - print(f" Status: {result.get('status', 'unknown')}") - - # Check for results - if result.get("results"): - print(f" Results available: {len(result.get('results', {}))}") - - # Display brand names if available - brand_names = result.get("results", {}).get("brand_names", []) - if brand_names: - print(f" Generated brand names: {len(brand_names)}") - for i, name_info in enumerate(brand_names[:3]): # Show first 3 - name = name_info.get('name', 'Unknown') - desc = name_info.get('description', 'No description') - print(f" {i+1}. 
{name}: {desc[:50]}...") - - # Display filtered results - filtered_names = result.get("results", {}).get("filtered_names", []) - if filtered_names: - print(f" Filtered to {len(filtered_names)} quality names") - else: - print(" No results yet (workflow may still be running)") - - elif response.status == 503: - print("⚠️ Workflow module not initialized (expected on first run)") - else: - error_data = await response.json() - print(f"❌ Workflow execution failed: {error_data}") - - # Test 3: Check workflow templates - print("\n📈 Test 3: Workflow Templates") - try: - async with session.get("http://localhost:58000/api/modules/v1/workflow/templates", headers=headers) as response: - if response.status == 200: - templates = await response.json() - print(f"✅ Workflow templates available: {len(templates.get('templates', []))}") - for template in templates.get('templates', [])[:2]: # Show first 2 - print(f" - {template.get('name')}: {template.get('description')}") - else: - print(f"ℹ️ Templates not available: {response.status}") - except Exception as e: - print(f"ℹ️ Templates endpoint error: {e}") - - print(f"\n🎯 Enhanced Workflow System Test Complete!") - print("The workflow system now supports:") - print(" ✅ Brand-AI inspired step types (AI Generation, Filter, Map, Reduce, etc.)") - print(" ✅ AI-powered content generation") - print(" ✅ Data transformation and filtering") - print(" ✅ Complex workflow orchestration") - print(" ✅ Variable templating and context management") - - except Exception as e: - print(f"❌ Test error: {e}") - import traceback - traceback.print_exc() - -if __name__ == "__main__": - asyncio.run(test_enhanced_workflow_system()) \ No newline at end of file diff --git a/backend/tests/integration_test.py b/backend/tests/integration_test.py index be224e8..9887f5d 100644 --- a/backend/tests/integration_test.py +++ b/backend/tests/integration_test.py @@ -27,12 +27,12 @@ class LiveModuleIntegrationTest: print(f"✓ API Response: {response.status_code}") print(f"✓ Total modules: {data['total']}") - # Verify we have all 5 modules (updated after 2025-08-10 cleanup) - assert data["total"] >= 5, f"Expected at least 5 modules, got {data['total']}" - assert data["module_count"] >= 5 + # Verify we have all 2 modules (rag and chatbot only) + assert data["total"] >= 2, f"Expected at least 2 modules, got {data['total']}" + assert data["module_count"] >= 2 assert data["initialized"] is True - expected_modules = ['cache', 'chatbot', 'rag', 'signal', 'workflow'] + expected_modules = ['chatbot', 'rag'] loaded_modules = [mod["name"] for mod in data["modules"]] for expected in expected_modules: @@ -64,14 +64,6 @@ class LiveModuleIntegrationTest: modules_by_name = {mod["name"]: mod for mod in modules_data["modules"]} - # Test Cache Module - if "cache" in modules_by_name: - cache_stats = modules_by_name["cache"].get("stats", {}) - expected_cache_fields = ["hits", "misses", "errors", "total_requests"] - for field in expected_cache_fields: - assert field in cache_stats, f"Cache module missing {field} stat" - print("✓ Cache module stats structure verified") - # Test Monitoring Module if "monitoring" in modules_by_name: monitor_stats = modules_by_name["monitoring"].get("stats", {}) diff --git a/backend/tests/test_modules.py b/backend/tests/test_modules.py index 860721e..3fac591 100644 --- a/backend/tests/test_modules.py +++ b/backend/tests/test_modules.py @@ -23,10 +23,8 @@ sys.path.insert(0, str(backend_path)) sys.path.insert(0, str(backend_path / "modules")) try: - from modules.cache.main import CacheModule from 
modules.rag.main import RAGModule from modules.chatbot.main import ChatbotModule - from modules.workflow.main import WorkflowModule from app.services.module_manager import ModuleManager, ModuleConfig except ImportError as e: @@ -50,17 +48,15 @@ except ImportError as e: def get_stats(self): return {"mock": True} - CacheModule = MockModule RAGModule = MockModule ChatbotModule = MockModule - WorkflowModule = MockModule # Mock ModuleManager for testing class MockModuleManager: def __init__(self): self.initialized = False self.modules = {} - self.module_order = ['cache', 'rag', 'chatbot', 'workflow'] + self.module_order = ['rag', 'chatbot'] async def initialize(self): self.initialized = True @@ -94,26 +90,6 @@ except ImportError as e: class TestModuleIndividual: """Test individual module functionality""" - @pytest.mark.asyncio - async def test_cache_module_initialization(self): - """Test cache module initialization and basic operations""" - cache_module = CacheModule() - - # Test initialization - result = await cache_module.initialize() - assert result is True - assert cache_module.initialized is True - - # Test stats retrieval - stats = cache_module.get_stats() - assert isinstance(stats, dict) - assert 'hits' in stats - assert 'misses' in stats - assert 'errors' in stats - - # Test cleanup - await cache_module.cleanup() - @pytest.mark.asyncio async def test_chatbot_module_initialization(self): """Test chatbot module initialization and basic operations""" @@ -148,60 +124,6 @@ class TestModuleIndividual: await rag_module.cleanup() - @pytest.mark.asyncio - async def test_workflow_module_initialization(self): - """Test workflow module initialization and basic operations""" - workflow_module = WorkflowModule() - - # Test initialization - result = await workflow_module.initialize() - assert result is True - assert workflow_module.initialized is True - - # Test stats retrieval - stats = workflow_module.get_stats() - assert isinstance(stats, dict) - - await workflow_module.cleanup() - - @pytest.mark.asyncio - async def test_signal_module_initialization(self): - """Test signal bot module initialization and basic operations""" - try: - from modules.signal.main import SignalBotModule - except ImportError as e: - # Skip test if SignalBot dependencies not available - pytest.skip(f"Signal bot module dependencies not available: {e}") - - signal_module = SignalBotModule() - - # Test initialization (may fail if SignalBot not available, which is OK) - try: - result = await signal_module.initialize() - assert signal_module.module_id == "signal" - - # Test configuration - assert hasattr(signal_module, 'config') - assert hasattr(signal_module, 'stats') - - # Test permissions - permissions = signal_module.get_required_permissions() - assert len(permissions) == 3 - assert any(p.resource == "signal" and p.action == "manage" for p in permissions) - - except ImportError: - # SignalBot library not available, skip functionality tests - pytest.skip("SignalBot library not available") - except Exception as e: - # Other initialization errors are acceptable for testing - logger.info(f"Signal module initialization failed (expected): {e}") - - # Test cleanup - try: - await signal_module.cleanup() - except Exception: - pass # Cleanup errors are acceptable - class TestModuleIntegration: """Test module integration and interactions""" @@ -216,19 +138,19 @@ class TestModuleIntegration: assert module_manager.initialized is True # Check all expected modules are loaded - expected_modules = ['cache', 'chatbot', 'rag', 'workflow', 
'signal'] + expected_modules = ['chatbot', 'rag'] loaded_modules = module_manager.list_modules() for module_name in expected_modules: assert module_name in loaded_modules, f"Module {module_name} not loaded" # Test module retrieval - cache_module = module_manager.get_module('cache') - assert cache_module is not None - rag_module = module_manager.get_module('rag') assert rag_module is not None + chatbot_module = module_manager.get_module('chatbot') + assert chatbot_module is not None + await module_manager.cleanup() @pytest.mark.asyncio @@ -269,18 +191,18 @@ class TestModuleHotReload: await module_manager.initialize() # Get initial module reference - initial_module = module_manager.get_module('cache') + initial_module = module_manager.get_module('rag') assert initial_module is not None # Test reload - await module_manager.reload_module('cache') + await module_manager.reload_module('rag') # Get module after reload - reloaded_module = module_manager.get_module('cache') + reloaded_module = module_manager.get_module('rag') assert reloaded_module is not None # Module should be reloaded (may be same object or different) - assert module_manager.is_module_loaded('cache') + assert module_manager.is_module_loaded('rag') await module_manager.cleanup() @@ -304,26 +226,6 @@ class TestModulePerformance: await module_manager.cleanup() - @pytest.mark.asyncio - async def test_cache_module_performance(self): - """Test cache module performance""" - cache_module = CacheModule() - await cache_module.initialize() - - # Test multiple rapid operations - start_time = time.time() - - for i in range(10): - stats = cache_module.get_stats() - assert isinstance(stats, dict) - - operations_time = time.time() - start_time - - # 10 operations should complete quickly - assert operations_time < 1.0, f"Cache operations took {operations_time:.2f}s" - - await cache_module.cleanup() - class TestModuleErrorHandling: @@ -338,7 +240,7 @@ class TestModuleErrorHandling: await module_manager.initialize() # At least some modules should load successfully - assert len(module_manager.list_modules()) >= 5 + assert len(module_manager.list_modules()) >= 2 await module_manager.cleanup() @@ -426,10 +328,6 @@ if __name__ == "__main__": # Test individual modules test_individual = TestModuleIndividual() - print("Testing cache module...") - await test_individual.test_cache_module_initialization() - print("✓ Cache module test passed") - print("Testing chatbot module...") await test_individual.test_chatbot_module_initialization() print("✓ Chatbot module test passed") @@ -438,14 +336,6 @@ if __name__ == "__main__": await test_individual.test_rag_module_initialization() print("✓ RAG module with content processing test passed") - print("Testing workflow module...") - await test_individual.test_workflow_module_initialization() - print("✓ Workflow module test passed") - - print("Testing signal bot module...") - await test_individual.test_signal_module_initialization() - print("✓ Signal bot module test passed") - # Test integration test_integration = TestModuleIntegration() diff --git a/frontend/src/app/api/workflows/[id]/route.ts b/frontend/src/app/api/workflows/[id]/route.ts deleted file mode 100644 index d32f3e7..0000000 --- a/frontend/src/app/api/workflows/[id]/route.ts +++ /dev/null @@ -1,167 +0,0 @@ -import { NextRequest, NextResponse } from 'next/server' - -const BACKEND_URL = process.env.INTERNAL_API_URL || 'http://enclava-backend:8000' - -export async function GET( - request: NextRequest, - { params }: { params: { id: string } } -) { - try { - 
const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const workflowId = params.id - - // Fetch workflow from the backend workflow module - const response = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'get_workflow', - workflow_id: workflowId - }), - cache: 'no-store' - }) - - if (!response.ok) { - const errorData = await response.text() - return NextResponse.json( - { error: 'Failed to fetch workflow', details: errorData }, - { status: response.status } - ) - } - - const data = await response.json() - return NextResponse.json({ - workflow: data.response?.workflow - }) - } catch (error) { - console.error('Error fetching workflow:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} - -export async function PUT( - request: NextRequest, - { params }: { params: { id: string } } -) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const workflowId = params.id - const workflowData = await request.json() - - // Validate workflow first - const validateResponse = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'validate_workflow', - workflow_def: workflowData - }) - }) - - if (!validateResponse.ok) { - const errorData = await validateResponse.json() - return NextResponse.json( - { error: 'Workflow validation failed', details: errorData }, - { status: 400 } - ) - } - - // Update workflow via backend workflow module - const updateResponse = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'update_workflow', - workflow_id: workflowId, - workflow_def: workflowData - }) - }) - - if (!updateResponse.ok) { - const errorData = await updateResponse.text() - return NextResponse.json( - { error: 'Failed to update workflow', details: errorData }, - { status: updateResponse.status } - ) - } - - const updateData = await updateResponse.json() - return NextResponse.json({ - workflow: updateData.response?.workflow - }) - } catch (error) { - console.error('Error updating workflow:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} - -export async function DELETE( - request: NextRequest, - { params }: { params: { id: string } } -) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const workflowId = params.id - - // Delete workflow via backend workflow module - const response = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 
'application/json', - }, - body: JSON.stringify({ - action: 'delete_workflow', - workflow_id: workflowId - }) - }) - - if (!response.ok) { - const errorData = await response.text() - return NextResponse.json( - { error: 'Failed to delete workflow', details: errorData }, - { status: response.status } - ) - } - - const data = await response.json() - - // Check if the backend returned an error - if (data.response?.error) { - return NextResponse.json( - { error: data.response.error }, - { status: 404 } - ) - } - - return NextResponse.json({ - success: true, - message: `Workflow ${workflowId} deleted successfully`, - data: data.response - }) - } catch (error) { - console.error('Error deleting workflow:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} \ No newline at end of file diff --git a/frontend/src/app/api/workflows/execute/route.ts b/frontend/src/app/api/workflows/execute/route.ts deleted file mode 100644 index 44894a0..0000000 --- a/frontend/src/app/api/workflows/execute/route.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { NextRequest, NextResponse } from 'next/server' - -const BACKEND_URL = process.env.INTERNAL_API_URL || 'http://enclava-backend:8000' - -export async function POST(request: NextRequest) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const { workflow_def, input_data } = await request.json() - - const response = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'execute_workflow', - workflow_def, - input_data: input_data || {} - }) - }) - - if (!response.ok) { - const errorData = await response.text() - return NextResponse.json( - { error: 'Failed to execute workflow', details: errorData }, - { status: response.status } - ) - } - - const data = await response.json() - return NextResponse.json(data) - } catch (error) { - console.error('Error executing workflow:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} \ No newline at end of file diff --git a/frontend/src/app/api/workflows/executions/[id]/cancel/route.ts b/frontend/src/app/api/workflows/executions/[id]/cancel/route.ts deleted file mode 100644 index 060e772..0000000 --- a/frontend/src/app/api/workflows/executions/[id]/cancel/route.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { NextRequest, NextResponse } from 'next/server' - -const BACKEND_URL = process.env.INTERNAL_API_URL || 'http://enclava-backend:8000' - -export async function POST( - request: NextRequest, - { params }: { params: { id: string } } -) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const executionId = params.id - - // Cancel execution via workflow module - const response = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'cancel_execution', - execution_id: executionId - }), - cache: 'no-store' - }) - - if 
(!response.ok) { - const errorData = await response.text() - return NextResponse.json( - { error: 'Failed to cancel execution', details: errorData }, - { status: response.status } - ) - } - - const data = await response.json() - return NextResponse.json({ - success: true, - message: `Execution ${executionId} cancelled successfully`, - data: data.response - }) - } catch (error) { - console.error('Error cancelling execution:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} \ No newline at end of file diff --git a/frontend/src/app/api/workflows/executions/route.ts b/frontend/src/app/api/workflows/executions/route.ts deleted file mode 100644 index ef299b7..0000000 --- a/frontend/src/app/api/workflows/executions/route.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { NextRequest, NextResponse } from 'next/server' - -const BACKEND_URL = process.env.INTERNAL_API_URL || 'http://enclava-backend:8000' - -export async function GET(request: NextRequest) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - // Fetch executions from the backend workflow module - const response = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'get_executions' - }), - cache: 'no-store' - }) - - if (!response.ok) { - const errorData = await response.text() - return NextResponse.json( - { error: 'Failed to fetch workflow executions', details: errorData }, - { status: response.status } - ) - } - - const data = await response.json() - return NextResponse.json({ - executions: data.response?.executions || [] - }) - } catch (error) { - console.error('Error fetching workflow executions:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} \ No newline at end of file diff --git a/frontend/src/app/api/workflows/import/route.ts b/frontend/src/app/api/workflows/import/route.ts deleted file mode 100644 index 14010fc..0000000 --- a/frontend/src/app/api/workflows/import/route.ts +++ /dev/null @@ -1,145 +0,0 @@ -import { NextRequest, NextResponse } from 'next/server' - -const BACKEND_URL = process.env.INTERNAL_API_URL || 'http://enclava-backend:8000' - -export async function POST(request: NextRequest) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const formData = await request.formData() - const file = formData.get('workflow_file') as File - - if (!file) { - return NextResponse.json( - { error: 'No file provided' }, - { status: 400 } - ) - } - - // Check file type - if (!file.name.endsWith('.json')) { - return NextResponse.json( - { error: 'Only JSON files are supported' }, - { status: 400 } - ) - } - - // Check file size (max 10MB) - if (file.size > 10 * 1024 * 1024) { - return NextResponse.json( - { error: 'File too large. 
Maximum size is 10MB' }, - { status: 400 } - ) - } - - // Read file content - const fileContent = await file.text() - - let workflowData - try { - workflowData = JSON.parse(fileContent) - } catch (error) { - return NextResponse.json( - { error: 'Invalid JSON format' }, - { status: 400 } - ) - } - - // Validate required fields - const requiredFields = ['name', 'description', 'steps'] - for (const field of requiredFields) { - if (!workflowData[field]) { - return NextResponse.json( - { error: `Missing required field: ${field}` }, - { status: 400 } - ) - } - } - - // Generate new ID if not provided - if (!workflowData.id) { - workflowData.id = `imported-${Date.now()}-${Math.random().toString(36).substr(2, 9)}` - } - - // Add import metadata - workflowData.metadata = { - ...workflowData.metadata, - imported_at: new Date().toISOString(), - imported_from: file.name, - imported_by: 'user' - } - - // Validate workflow through backend - const validateResponse = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'validate_workflow', - workflow_def: workflowData - }) - }) - - if (!validateResponse.ok) { - const errorData = await validateResponse.json() - return NextResponse.json( - { - error: 'Workflow validation failed', - details: errorData.response?.errors || ['Unknown validation error'], - workflow_data: workflowData - }, - { status: 400 } - ) - } - - const validationResult = await validateResponse.json() - if (!validationResult.response?.valid) { - return NextResponse.json( - { - error: 'Workflow validation failed', - details: validationResult.response?.errors || ['Workflow is not valid'], - workflow_data: workflowData - }, - { status: 400 } - ) - } - - // Create workflow via backend - const createResponse = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'create_workflow', - workflow_def: workflowData - }) - }) - - if (!createResponse.ok) { - const errorData = await createResponse.text() - return NextResponse.json( - { error: 'Failed to create workflow', details: errorData }, - { status: createResponse.status } - ) - } - - const createData = await createResponse.json() - return NextResponse.json({ - success: true, - message: 'Workflow imported successfully', - workflow: createData.response?.workflow || workflowData, - validation_passed: true - }) - - } catch (error) { - console.error('Error importing workflow:', error) - return NextResponse.json( - { error: 'Internal server error', details: error instanceof Error ? 
error.message : 'Unknown error' }, - { status: 500 } - ) - } -} \ No newline at end of file diff --git a/frontend/src/app/api/workflows/route.ts b/frontend/src/app/api/workflows/route.ts deleted file mode 100644 index b6295c3..0000000 --- a/frontend/src/app/api/workflows/route.ts +++ /dev/null @@ -1,102 +0,0 @@ -import { NextRequest, NextResponse } from 'next/server' - -const BACKEND_URL = process.env.INTERNAL_API_URL || 'http://enclava-backend:8000' - -export async function GET(request: NextRequest) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - // Fetch workflows from the backend workflow module - const response = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'get_workflows' - }), - cache: 'no-store' - }) - - if (!response.ok) { - const errorData = await response.text() - return NextResponse.json( - { error: 'Failed to fetch workflows', details: errorData }, - { status: response.status } - ) - } - - const data = await response.json() - return NextResponse.json({ - workflows: data.response?.workflows || [] - }) - } catch (error) { - console.error('Error fetching workflows:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} - -export async function POST(request: NextRequest) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const workflowData = await request.json() - - // Validate workflow first - const validateResponse = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'validate_workflow', - workflow_def: workflowData - }) - }) - - if (!validateResponse.ok) { - const errorData = await validateResponse.json() - return NextResponse.json( - { error: 'Workflow validation failed', details: errorData }, - { status: 400 } - ) - } - - // Create workflow via backend workflow module - const createResponse = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'create_workflow', - workflow_def: workflowData - }) - }) - - if (!createResponse.ok) { - const errorData = await createResponse.text() - return NextResponse.json( - { error: 'Failed to create workflow', details: errorData }, - { status: createResponse.status } - ) - } - - const createData = await createResponse.json() - return NextResponse.json({ - workflow: createData.response?.workflow - }) - } catch (error) { - console.error('Error creating workflow:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} \ No newline at end of file diff --git a/frontend/src/app/api/workflows/templates/[id]/route.ts b/frontend/src/app/api/workflows/templates/[id]/route.ts deleted file mode 100644 index de5b8fc..0000000 --- 
a/frontend/src/app/api/workflows/templates/[id]/route.ts +++ /dev/null @@ -1,55 +0,0 @@ -import { NextRequest, NextResponse } from 'next/server' - -const BACKEND_URL = process.env.INTERNAL_API_URL || 'http://enclava-backend:8000' - -export async function GET( - request: NextRequest, - { params }: { params: { id: string } } -) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const templateId = params.id - - // First get all templates - const response = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'get_workflow_templates' - }) - }) - - if (!response.ok) { - const errorData = await response.text() - return NextResponse.json( - { error: 'Failed to fetch workflow templates', details: errorData }, - { status: response.status } - ) - } - - const data = await response.json() - const templates = data.templates || [] - - // Find the specific template - const template = templates.find((t: any) => t.id === templateId) - - if (!template) { - return NextResponse.json( - { error: `Template with id '${templateId}' not found` }, - { status: 404 } - ) - } - - return NextResponse.json({ template }) - } catch (error) { - console.error('Error fetching workflow template:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} \ No newline at end of file diff --git a/frontend/src/app/api/workflows/templates/route.ts b/frontend/src/app/api/workflows/templates/route.ts deleted file mode 100644 index 48e1dc8..0000000 --- a/frontend/src/app/api/workflows/templates/route.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { NextRequest, NextResponse } from 'next/server' - -const BACKEND_URL = process.env.INTERNAL_API_URL || 'http://enclava-backend:8000' - -export async function GET(request: NextRequest) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const response = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'get_workflow_templates' - }), - cache: 'no-store' - }) - - if (!response.ok) { - const errorData = await response.text() - return NextResponse.json( - { error: 'Failed to fetch workflow templates', details: errorData }, - { status: response.status } - ) - } - - const data = await response.json() - return NextResponse.json(data) - } catch (error) { - console.error('Error fetching workflow templates:', error) - return NextResponse.json( - { error: 'Internal server error' }, - { status: 500 } - ) - } -} \ No newline at end of file diff --git a/frontend/src/app/api/workflows/test/route.ts b/frontend/src/app/api/workflows/test/route.ts deleted file mode 100644 index 0385dd2..0000000 --- a/frontend/src/app/api/workflows/test/route.ts +++ /dev/null @@ -1,77 +0,0 @@ -import { NextRequest, NextResponse } from 'next/server' - -const BACKEND_URL = process.env.INTERNAL_API_URL || 'http://enclava-backend:8000' - -export async function 
POST(request: NextRequest) { - try { - const adminToken = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZW1haWwiOiJhZG1pbkBleGFtcGxlLmNvbSIsImlzX3N1cGVydXNlciI6dHJ1ZSwicm9sZSI6InN1cGVyX2FkbWluIiwiZXhwIjoxNzg0Nzk2NDI2LjA0NDYxOX0.YOTlUY8nowkaLAXy5EKfnZEpbDgGCabru5R0jdq_DOQ' - - const { workflow, test_data } = await request.json() - - // First validate the workflow - const validateResponse = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'validate_workflow', - workflow_def: workflow - }) - }) - - if (!validateResponse.ok) { - const errorData = await validateResponse.json() - return NextResponse.json( - { - status: 'failed', - error: 'Workflow validation failed', - details: errorData - }, - { status: 400 } - ) - } - - // If validation passes, try a test execution - const executeResponse = await fetch(`${BACKEND_URL}/api/modules/workflow/execute`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${adminToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - action: 'execute_workflow', - workflow_def: workflow, - input_data: test_data || {} - }) - }) - - if (!executeResponse.ok) { - const errorData = await executeResponse.text() - return NextResponse.json( - { - status: 'failed', - error: 'Workflow test execution failed', - details: errorData - }, - { status: 500 } - ) - } - - const executionData = await executeResponse.json() - return NextResponse.json({ - status: 'success', - execution: executionData - }) - } catch (error) { - console.error('Error testing workflow:', error) - return NextResponse.json( - { - status: 'failed', - error: 'Internal server error' - }, - { status: 500 } - ) - } -} \ No newline at end of file diff --git a/frontend/src/app/workflows/builder/page.tsx b/frontend/src/app/workflows/builder/page.tsx deleted file mode 100644 index 3c8649e..0000000 --- a/frontend/src/app/workflows/builder/page.tsx +++ /dev/null @@ -1,805 +0,0 @@ -"use client" - -import { useState, useEffect, useCallback } from "react" -import { useRouter, useSearchParams } from "next/navigation" -import { Button } from "@/components/ui/button" -import { Input } from "@/components/ui/input" -import { Label } from "@/components/ui/label" -import { Textarea } from "@/components/ui/textarea" -import { Badge } from "@/components/ui/badge" -import { useToast } from "@/hooks/use-toast" -import { apiClient } from "@/lib/api-client" -import { config } from "@/lib/config" -import { - Save, - Play, - ArrowLeft, - Plus, - Settings, - Trash2, - Move, - Zap, - GitBranch, - MessageSquare, - Database, - Workflow as WorkflowIcon, - GripVertical -} from "lucide-react" -import { Dialog, DialogContent, DialogDescription, DialogHeader, DialogTitle } from "@/components/ui/dialog" -import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select" -import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs" -import { Separator } from "@/components/ui/separator" -import ProtectedRoute from "@/components/ProtectedRoute" -import StepConfigurationPanel from "@/components/workflows/StepConfigurationPanel" - -interface WorkflowStep { - id: string - name: string - type: string - config: Record - position: { x: number; y: number } - connections: string[] -} - -interface WorkflowDefinition { - id?: string - name: string - description: string - version: string - steps: 
WorkflowStep[] - variables: Record - outputs: Record - metadata: Record -} - -const stepTypes = [ - { - type: "llm_call", - name: "LLM Call", - description: "Make a call to a language model", - icon: , - color: "bg-blue-100 text-blue-800 dark:bg-blue-900 dark:text-blue-300" - }, - { - type: "chatbot", - name: "Chatbot", - description: "Use a configured chatbot", - icon: , - color: "bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-300" - }, - { - type: "transform", - name: "Transform", - description: "Transform or process data", - icon: , - color: "bg-purple-100 text-purple-800 dark:bg-purple-900 dark:text-purple-300" - }, - { - type: "conditional", - name: "Conditional", - description: "Conditional logic and branching", - icon: , - color: "bg-orange-100 text-orange-800 dark:bg-orange-900 dark:text-orange-300" - }, - { - type: "parallel", - name: "Parallel", - description: "Execute multiple steps in parallel", - icon: , - color: "bg-indigo-100 text-indigo-800 dark:bg-indigo-900 dark:text-indigo-300" - }, - { - type: "rag_search", - name: "RAG Search", - description: "Search documents in RAG collection", - icon: , - color: "bg-teal-100 text-teal-800 dark:bg-teal-900 dark:text-teal-300" - } -] - -export default function WorkflowBuilderPage() { - const { toast } = useToast() - const router = useRouter() - const searchParams = useSearchParams() - const workflowId = searchParams.get('id') - const templateId = searchParams.get('template') - - const [workflow, setWorkflow] = useState({ - name: "", - description: "", - version: "1.0.0", - steps: [], - variables: {}, - outputs: {}, - metadata: {} - }) - - const [selectedStep, setSelectedStep] = useState(null) - const [showStepPalette, setShowStepPalette] = useState(false) - const [showVariablePanel, setShowVariablePanel] = useState(false) - const [loading, setLoading] = useState(true) - const [saving, setSaving] = useState(false) - const [draggedStepIndex, setDraggedStepIndex] = useState(null) - const [dragOverIndex, setDragOverIndex] = useState(null) - - // Configuration data - const [availableChatbots, setAvailableChatbots] = useState>([]) - const [availableCollections, setAvailableCollections] = useState([]) - - // Helper function to ensure all steps have position properties - const ensureStepPositions = (steps: any[]): WorkflowStep[] => { - return steps.map((step, index) => ({ - ...step, - position: step.position || { x: 100, y: 100 + index * 120 }, - connections: step.connections || [] - })) - } - - const loadWorkflow = useCallback(async () => { - try { - setLoading(true) - - if (workflowId) { - // Load existing workflow - const data = await apiClient.get(`/api-internal/v1/workflows/${workflowId}`) - setWorkflow({ - ...data.workflow, - steps: ensureStepPositions(data.workflow.steps || []) - }) - } else if (templateId) { - // Load from template - const data = await apiClient.get(`/api-internal/v1/workflows/templates/${templateId}`) - setWorkflow({ - ...data.template.definition, - id: undefined, // Remove ID for new workflow - name: `${data.template.definition.name} (Copy)`, - steps: ensureStepPositions(data.template.definition.steps || []) - }) - } - } catch (error) { - toast({ - title: "Error", - description: "Failed to load workflow", - variant: "destructive" - }) - } finally { - setLoading(false) - } - }, [workflowId, templateId, toast]) - - const loadConfigurationData = useCallback(async () => { - try { - // Get auth token (same pattern as playground) - const token = localStorage.getItem('token') - const authHeaders = token ? 
{ 'Authorization': `Bearer ${token}` } : {} - - // Load available chatbots (existing platform component) - try { - const chatbotsData = await apiClient.get('/api-internal/v1/chatbot/list') - - // Backend returns a direct array of chatbot objects - let chatbotOptions: Array<{id: string, name: string}> = [] - - if (Array.isArray(chatbotsData)) { - // Map to both ID and name for display purposes - chatbotOptions = chatbotsData.map((chatbot: any) => ({ - id: chatbot.id || '', - name: chatbot.name || chatbot.id || 'Unnamed Chatbot' - })) - } - - // Store full chatbot objects for better UX (names + IDs) - setAvailableChatbots(chatbotOptions) - } catch (error) { - // Silently handle error - chatbots will be empty array - } - - // Load available RAG collections (existing platform component) - try { - const collectionsData = await apiClient.get('/api-internal/v1/rag/collections') - setAvailableCollections(collectionsData.collections?.map((c: any) => c.name) || []) - } catch (error) { - // Silently handle error - collections will be empty array - } - } catch (error) { - // Silently handle error - configuration will use defaults - } - }, []) - - useEffect(() => { - loadWorkflow() - loadConfigurationData() - }, [loadWorkflow, loadConfigurationData]) - - const saveWorkflow = async () => { - try { - setSaving(true) - - const url = workflowId ? `/api-internal/v1/workflows/${workflowId}` : '/api-internal/v1/workflows' - - if (workflowId) { - await apiClient.put(url, workflow) - } else { - await apiClient.post(url, workflow) - } - - toast({ - title: "Success", - description: workflowId ? "Workflow updated" : "Workflow created" - }) - - // Redirect to workflow list if this was a new workflow - if (!workflowId) { - router.push('/workflows') - } - } catch (error) { - toast({ - title: "Error", - description: "Failed to save workflow", - variant: "destructive" - }) - } finally { - setSaving(false) - } - } - - const testWorkflow = async () => { - try { - const result = await apiClient.post('/api-internal/v1/workflows/test', { - workflow, - test_data: {} - }) - - toast({ - title: "Test Result", - description: `Workflow test completed: ${result.status}` - }) - } catch (error) { - toast({ - title: "Test Failed", - description: "Failed to test workflow", - variant: "destructive" - }) - } - } - - const addStep = (stepType: string) => { - const newStep: WorkflowStep = { - id: `step_${Date.now()}`, - name: `New ${stepTypes.find(t => t.type === stepType)?.name || stepType}`, - type: stepType, - config: {}, - position: { x: 100, y: 100 + workflow.steps.length * 120 }, - connections: [] - } - - setWorkflow(prev => ({ - ...prev, - steps: [...prev.steps, newStep] - })) - - setSelectedStep(newStep) - setShowStepPalette(false) - } - - const updateStep = (stepId: string, updates: Partial) => { - setWorkflow(prev => ({ - ...prev, - steps: prev.steps.map(step => - step.id === stepId ? { ...step, ...updates } : step - ) - })) - - if (selectedStep?.id === stepId) { - setSelectedStep(prev => prev ? 
{ ...prev, ...updates } : null) - } - } - - const deleteStep = (stepId: string) => { - setWorkflow(prev => ({ - ...prev, - steps: prev.steps.filter(step => step.id !== stepId) - })) - - if (selectedStep?.id === stepId) { - setSelectedStep(null) - } - } - - const addVariable = (name: string, value: any) => { - setWorkflow(prev => ({ - ...prev, - variables: { ...prev.variables, [name]: value } - })) - } - - const removeVariable = (name: string) => { - const { [name]: removed, ...rest } = workflow.variables - setWorkflow(prev => ({ ...prev, variables: rest })) - } - - const reorderSteps = (fromIndex: number, toIndex: number) => { - if (fromIndex === toIndex) return - - const newSteps = [...workflow.steps] - const [draggedStep] = newSteps.splice(fromIndex, 1) - newSteps.splice(toIndex, 0, draggedStep) - - // Recalculate canvas positions based on new order (vertical layout) - const updatedSteps = newSteps.map((step, index) => ({ - ...step, - position: { - x: 100, - y: 100 + index * 120 - } - })) - - setWorkflow(prev => ({ - ...prev, - steps: updatedSteps - })) - - // Update selected step if it was the one being dragged - if (selectedStep?.id === draggedStep.id) { - setSelectedStep(draggedStep) - } - } - - const handleDragStart = (e: React.DragEvent, index: number) => { - setDraggedStepIndex(index) - e.dataTransfer.effectAllowed = 'move' - e.dataTransfer.setData('text/html', e.currentTarget.outerHTML) - - // Add visual feedback - if (e.currentTarget instanceof HTMLElement) { - e.currentTarget.style.opacity = '0.5' - } - } - - const handleDragEnd = (e: React.DragEvent) => { - setDraggedStepIndex(null) - setDragOverIndex(null) - - // Reset visual feedback - if (e.currentTarget instanceof HTMLElement) { - e.currentTarget.style.opacity = '1' - } - } - - const handleDragOver = (e: React.DragEvent, overIndex: number) => { - e.preventDefault() - e.dataTransfer.dropEffect = 'move' - - if (draggedStepIndex !== null && draggedStepIndex !== overIndex) { - setDragOverIndex(overIndex) - } - } - - const handleDragLeave = (e: React.DragEvent) => { - // Only clear if we're leaving the entire step element, not just a child - if (!e.currentTarget.contains(e.relatedTarget as Node)) { - setDragOverIndex(null) - } - } - - const handleDrop = (e: React.DragEvent, dropIndex: number) => { - e.preventDefault() - - if (draggedStepIndex !== null && draggedStepIndex !== dropIndex) { - reorderSteps(draggedStepIndex, dropIndex) - } - - setDraggedStepIndex(null) - setDragOverIndex(null) - } - - if (loading) { - return ( - -
- [Deleted builder page render, markup not recoverable in this extract. Recoverable structure: a loading spinner while the workflow loads; a header with a back button, an "Edit Workflow"/"New Workflow" title, the workflow name ("Untitled Workflow" fallback), and Test/Save actions; a left "Add Steps" palette built from stepTypes plus a draggable "Workflow Steps" list ("Drag steps to reorder them", "No steps added yet" empty state); a center canvas that renders each step with its icon, index, name, and type badge, joined by vertical SVG connection lines ("Start Building Your Workflow" / "Add steps from the left panel to create your workflow" empty state); and a right properties panel with "Workflow" and "Step" tabs, beginning with a name input bound to workflow.name ("Enter workflow name").]