mirror of
https://github.com/aljazceru/enclava.git
synced 2026-01-15 05:34:45 +01:00
454 lines
16 KiB
Python
454 lines
16 KiB
Python
"""
Tool Execution Service with Docker Sandboxing

Secure execution environment for user-defined tools: every tool runs inside a
short-lived, network-disabled Docker container with memory and CPU limits.
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
import time
|
|
from typing import Dict, Any, Optional, List
|
|
from datetime import datetime, timedelta
|
|
import docker
|
|
from docker.errors import DockerException, ContainerError, ImageNotFound
|
|
import psutil
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.future import select
|
|
from fastapi import HTTPException, status
|
|
|
|
from app.models.tool import Tool, ToolExecution, ToolStatus, ToolType
|
|
from app.models.user import User
|
|
from app.core.config import settings
|
|
|
|
# Module-scoped logger; records are emitted under this module's dotted name.
logger: logging.Logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
class ToolExecutionService:
    """Service for secure tool execution with Docker sandboxing.

    Every supported tool type (DOCKER, PYTHON, BASH) is run in a throw-away
    Docker container with networking disabled and with memory / CPU limits
    taken from the ``Tool`` record. The tool receives its parameters as JSON
    in the ``TOOL_PARAMETERS`` environment variable.

    The Docker SDK is synchronous, so every call that talks to the Docker
    daemon is dispatched to the default thread-pool executor through
    ``_run_blocking``. Otherwise a long-running tool would block the event
    loop until it finished.
    """

    # Image used for DOCKER tools that do not name one.
    DEFAULT_DOCKER_IMAGE = "python:3.11-slim"
    # Fixed images for PYTHON and BASH tools.
    PYTHON_IMAGE = "python:3.11-slim"
    BASH_IMAGE = "ubuntu:20.04"
    # CFS scheduler period handed to Docker, in microseconds.
    CPU_PERIOD_US = 100000

    def __init__(self, db: AsyncSession):
        """Create the service bound to ``db``.

        Args:
            db: Async SQLAlchemy session used for every read and write.

        The Docker client is created eagerly. If that fails, ``docker_client``
        stays ``None`` and Docker executions fail with "Docker is not available".
        """
        self.db = db
        self.docker_client = None
        self._init_docker()

    def _init_docker(self):
        """Create a Docker client from the environment and ping the daemon.

        Errors are logged and leave ``self.docker_client`` as ``None`` instead
        of propagating out of the constructor.
        """
        try:
            self.docker_client = docker.from_env()
            # Fail fast here instead of on the first execution.
            self.docker_client.ping()
            logger.info("Docker client initialized successfully")
        except (DockerException, OSError) as e:
            # OSError also covers transport failures from the HTTP layer
            # (requests' exceptions derive from IOError) that the SDK does not
            # wrap in DockerException.
            logger.error(f"Failed to initialize Docker client: {e}")
            self.docker_client = None

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run the synchronous call ``func(*args, **kwargs)`` in the default
        executor and return its result without blocking the event loop."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def execute_tool(
        self,
        tool_id: int,
        user_id: int,
        parameters: Dict[str, Any],
        timeout_override: Optional[int] = None,
    ) -> ToolExecution:
        """Execute a tool with the given parameters.

        Args:
            tool_id: Primary key of the ``Tool`` to run.
            user_id: Primary key of the requesting ``User``.
            parameters: Arguments for the tool; serialised with ``json.dumps``.
            timeout_override: Seconds to wait instead of ``tool.timeout_seconds``.
                A falsy value (``None``/``0``) means "use the tool's timeout".

        Returns:
            The refreshed ``ToolExecution``. Failures while running the tool are
            recorded on it (``status``/``error_message``) rather than raised.

        Raises:
            HTTPException: 404/400/403 from the access check, raised before any
                execution record is created.
        """
        tool = await self._get_tool_and_validate_access(tool_id, user_id)

        # Persist the record first so it has an id (and can be found by
        # cancel_execution / get_execution_logs) before the container starts.
        execution = ToolExecution(
            tool_id=tool_id,
            executed_by_user_id=user_id,
            parameters=parameters,
            status=ToolStatus.PENDING,
        )
        self.db.add(execution)
        await self.db.commit()
        await self.db.refresh(execution)

        try:
            execution.status = ToolStatus.RUNNING
            execution.started_at = datetime.utcnow()
            await self.db.commit()

            # Deliberately an ==-based chain (not a dict lookup) so it keeps
            # matching however tool_type is represented on the model.
            if tool.tool_type == ToolType.DOCKER:
                runner = self._execute_docker_tool
            elif tool.tool_type == ToolType.PYTHON:
                runner = self._execute_python_tool
            elif tool.tool_type == ToolType.BASH:
                runner = self._execute_bash_tool
            else:
                raise ValueError(f"Unsupported tool type: {tool.tool_type}")

            await runner(execution, tool, parameters, timeout_override)

            tool.increment_usage()
            await self.db.commit()

        except Exception as e:
            logger.exception("Tool execution failed: %s", e)
            execution.status = ToolStatus.FAILED
            execution.error_message = str(e)
            execution.completed_at = datetime.utcnow()
            await self.db.commit()

        await self.db.refresh(execution)
        return execution

    async def _get_tool_and_validate_access(self, tool_id: int, user_id: int) -> Tool:
        """Load a tool and check that ``user_id`` may run it.

        Raises:
            HTTPException: 404 if the tool does not exist, 400 if it is not
                active, 403 if the user is missing or ``tool.can_be_used_by``
                rejects them.
        """
        result = await self.db.execute(select(Tool).where(Tool.id == tool_id))
        tool = result.scalar_one_or_none()

        if not tool:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Tool not found"
            )

        if not tool.is_active:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Tool is not active"
            )

        user_result = await self.db.execute(select(User).where(User.id == user_id))
        user = user_result.scalar_one_or_none()

        if not user or not tool.can_be_used_by(user):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied to this tool",
            )

        return tool

    async def _execute_docker_tool(
        self,
        execution: ToolExecution,
        tool: Tool,
        parameters: Dict[str, Any],
        timeout_override: Optional[int] = None,
        *,
        image: Optional[str] = None,
        command: Optional[List[str]] = None,
        code: Optional[str] = None,
        code_filename: str = "tool_code.py",
    ):
        """Run tool code in a sandboxed container and record the outcome.

        The code is written to a temporary directory that is bind-mounted
        read-only at ``/app``. Status, output, stderr, return code, memory usage,
        completion time and duration are written onto ``execution`` and
        committed. Container errors are recorded on ``execution``, not raised.

        Args:
            execution: Record to update.
            tool: Source of the resource limits, timeout and (unless
                overridden) image, command and code.
            parameters: Exposed to the container as ``TOOL_PARAMETERS`` (JSON).
            timeout_override: Seconds to wait instead of ``tool.timeout_seconds``.
            image: Image to use; defaults to ``tool.docker_image`` and then
                ``DEFAULT_DOCKER_IMAGE``.
            command: Command to run; defaults to ``tool.docker_command`` and
                then ``python /app/<code_filename>``.
            code: Code to mount; defaults to ``tool.code``.
            code_filename: File name of the code inside ``/app``.

        Raises:
            RuntimeError: If no Docker client is available.
        """
        if not self.docker_client:
            raise RuntimeError("Docker is not available")

        timeout = timeout_override or tool.timeout_seconds
        image = image or tool.docker_image or self.DEFAULT_DOCKER_IMAGE
        command = command or tool.docker_command or ["python", f"/app/{code_filename}"]
        code = tool.code if code is None else code
        memory_limit = f"{tool.max_memory_mb}m"
        # quota/period is the number of CPUs the container may use at once.
        # Despite the field name, this caps CPU *share*, not total CPU seconds.
        cpu_quota = int(tool.max_cpu_seconds * self.CPU_PERIOD_US)

        env_vars = {"TOOL_PARAMETERS": json.dumps(parameters), "PYTHONUNBUFFERED": "1"}

        with tempfile.TemporaryDirectory() as temp_dir:
            code_path = os.path.join(temp_dir, code_filename)
            with open(code_path, "w", encoding="utf-8") as f:
                f.write(code or "")

            container = None
            try:
                container = await self._run_blocking(
                    self.docker_client.containers.run,
                    image=image,
                    command=command,
                    environment=env_vars,
                    volumes={temp_dir: {"bind": "/app", "mode": "ro"}},
                    mem_limit=memory_limit,
                    cpu_quota=cpu_quota,
                    cpu_period=self.CPU_PERIOD_US,
                    network_disabled=True,  # No network access for security
                    detach=True,
                    remove=False,  # Keep container for log retrieval
                    working_dir="/app",
                )

                execution.container_id = container.id
                await self.db.commit()

                # The deadline is enforced by asyncio.wait_for. Passing a
                # timeout to container.wait() makes the HTTP client raise its own
                # read-timeout, which is not asyncio.TimeoutError and was never
                # caught by the timeout branch.
                try:
                    result = await asyncio.wait_for(
                        self._run_blocking(container.wait), timeout=timeout
                    )
                except asyncio.TimeoutError:
                    try:
                        await self._run_blocking(container.kill)
                    except DockerException as kill_error:
                        # The container may have exited between the deadline
                        # and the kill; it is force-removed in `finally` anyway.
                        logger.warning("Failed to kill timed-out container: %s", kill_error)
                    execution.status = ToolStatus.TIMEOUT
                    execution.error_message = (
                        f"Tool execution timed out after {timeout} seconds"
                    )
                    return  # `finally` still removes the container and records timing

                execution.return_code = result["StatusCode"]

                try:
                    stdout = await self._run_blocking(
                        container.logs, stdout=True, stderr=False
                    )
                    stderr = await self._run_blocking(
                        container.logs, stdout=False, stderr=True
                    )
                    # Tools may print arbitrary bytes; do not fail the run on them.
                    output = stdout.decode("utf-8", errors="replace")
                    error_logs = stderr.decode("utf-8", errors="replace")

                    execution.output = output
                    execution.docker_logs = error_logs

                    if execution.return_code == 0:
                        execution.status = ToolStatus.COMPLETED
                    else:
                        execution.status = ToolStatus.FAILED
                        execution.error_message = error_logs or "Tool execution failed"

                except Exception as e:
                    logger.error(f"Failed to retrieve container output: {e}")
                    execution.error_message = f"Failed to retrieve output: {e}"
                    execution.status = ToolStatus.FAILED

                # Resource usage is best-effort only.
                try:
                    stats = await self._run_blocking(container.stats, stream=False)
                    memory_usage = stats["memory_stats"].get("usage", 0)
                    execution.memory_used_mb = memory_usage / (1024 * 1024)  # bytes -> MB
                except Exception as e:
                    logger.warning(f"Failed to get container stats: {e}")

            except ContainerError as e:
                execution.status = ToolStatus.FAILED
                execution.error_message = f"Container execution failed: {e}"
                execution.return_code = e.exit_status

            except ImageNotFound:
                execution.status = ToolStatus.FAILED
                # Report the image actually requested, not the possibly-None field.
                execution.error_message = f"Docker image not found: {image}"

            except Exception as e:
                execution.status = ToolStatus.FAILED
                execution.error_message = f"Unexpected error: {e}"

            finally:
                if container is not None:
                    try:
                        await self._run_blocking(container.remove, force=True)
                    except Exception as e:
                        logger.warning(f"Failed to remove container: {e}")

                execution.completed_at = datetime.utcnow()
                if execution.started_at:
                    duration = execution.completed_at - execution.started_at
                    execution.execution_time_ms = int(duration.total_seconds() * 1000)

                await self.db.commit()

    async def _execute_python_tool(
        self,
        execution: ToolExecution,
        tool: Tool,
        parameters: Dict[str, Any],
        timeout_override: Optional[int] = None,
    ):
        """Run a PYTHON tool's code with ``python`` in ``PYTHON_IMAGE``.

        This ignores ``tool.docker_image`` and ``tool.docker_command``.
        """
        await self._execute_docker_tool(
            execution,
            tool,
            parameters,
            timeout_override,
            image=self.PYTHON_IMAGE,
            command=["python", "/app/tool_code.py"],
        )

    async def _execute_bash_tool(
        self,
        execution: ToolExecution,
        tool: Tool,
        parameters: Dict[str, Any],
        timeout_override: Optional[int] = None,
    ):
        """Run a BASH tool's code with ``bash`` in ``BASH_IMAGE``.

        The code is prefixed with a shebang and ``set -e``. ``TOOL_PARAMETERS``
        is supplied through the container environment by
        ``_execute_docker_tool``. It is NOT interpolated into the script:
        embedding the JSON in single quotes broke on, and allowed shell
        injection through, any parameter value containing ``'``.
        """
        bash_script = f"#!/bin/bash\nset -e\n\n{tool.code or ''}\n"

        await self._execute_docker_tool(
            execution,
            tool,
            parameters,
            timeout_override,
            image=self.BASH_IMAGE,
            command=["bash", "/app/tool_code.sh"],
            code=bash_script,
            code_filename="tool_code.sh",
        )

    async def _get_execution_for_user(
        self, execution_id: int, user_id: int
    ) -> ToolExecution:
        """Load an execution that ``user_id`` is allowed to manage.

        Access is granted to the user who ran it, or to a user whose
        ``has_permission("manage_tools")`` is true.

        Raises:
            HTTPException: 404 if the execution does not exist, 403 otherwise
                when access is denied.
        """
        result = await self.db.execute(
            select(ToolExecution).where(ToolExecution.id == execution_id)
        )
        execution = result.scalar_one_or_none()

        if not execution:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Execution not found"
            )

        if execution.executed_by_user_id != user_id:
            user_result = await self.db.execute(select(User).where(User.id == user_id))
            user = user_result.scalar_one_or_none()

            if not user or not user.has_permission("manage_tools"):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN, detail="Access denied"
                )

        return execution

    async def cancel_execution(self, execution_id: int, user_id: int) -> ToolExecution:
        """Cancel a running tool execution.

        The execution's container is force-removed, if it is still known, and
        the record is marked CANCELLED.

        Raises:
            HTTPException: 404 or 403 from the access check, or 400 if the
                execution is not running.
        """
        execution = await self._get_execution_for_user(execution_id, user_id)

        if not execution.is_running():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Execution is not running",
            )

        if execution.container_id and self.docker_client:
            try:
                container = await self._run_blocking(
                    self.docker_client.containers.get, execution.container_id
                )
                # force=True kills a running container before removing it. A
                # separate kill() raised on already-exited containers and
                # skipped the removal, leaking the container.
                await self._run_blocking(container.remove, force=True)
                logger.info(f"Killed container {execution.container_id}")
            except Exception as e:
                logger.warning(f"Failed to kill container: {e}")

        execution.status = ToolStatus.CANCELLED
        execution.completed_at = datetime.utcnow()
        execution.error_message = "Execution cancelled by user"

        await self.db.commit()
        await self.db.refresh(execution)

        return execution

    async def get_execution_logs(
        self, execution_id: int, user_id: int
    ) -> Dict[str, Any]:
        """Return the stored output of an execution, plus live logs if running.

        The ``live_logs`` key is present only while the execution is running and
        has a container. It holds the last 100 lines of combined stdout/stderr,
        or "" if they cannot be fetched.

        Raises:
            HTTPException: 404 or 403 from the access check.
        """
        execution = await self._get_execution_for_user(execution_id, user_id)

        logs = {
            "execution_id": execution_id,
            "status": execution.status,
            "output": execution.output or "",
            "error_message": execution.error_message or "",
            "docker_logs": execution.docker_logs or "",
            "is_running": execution.is_running(),
        }

        if execution.container_id and execution.is_running() and self.docker_client:
            try:
                container = await self._run_blocking(
                    self.docker_client.containers.get, execution.container_id
                )
                raw_logs = await self._run_blocking(
                    container.logs, stdout=True, stderr=True, stream=False, tail=100
                )
                logs["live_logs"] = raw_logs.decode("utf-8", errors="replace")
            except Exception as e:
                logger.warning(f"Failed to get live logs: {e}")
                logs["live_logs"] = ""

        return logs

    async def cleanup_old_executions(self, days_old: int = 30):
        """Delete finished executions older than ``days_old`` days.

        "Finished" means COMPLETED, FAILED, CANCELLED or TIMEOUT. Any container
        still associated with a deleted record is force-removed first.

        Returns:
            The number of execution records deleted.
        """
        cutoff_date = datetime.utcnow() - timedelta(days=days_old)

        stmt = select(ToolExecution).where(
            ToolExecution.created_at < cutoff_date,
            ToolExecution.status.in_(
                [
                    ToolStatus.COMPLETED,
                    ToolStatus.FAILED,
                    ToolStatus.CANCELLED,
                    # Timed-out runs are terminal too and were never cleaned up.
                    ToolStatus.TIMEOUT,
                ]
            ),
        )
        result = await self.db.execute(stmt)
        old_executions = result.scalars().all()

        cleaned_count = 0
        for execution in old_executions:
            if execution.container_id and self.docker_client:
                try:
                    container = await self._run_blocking(
                        self.docker_client.containers.get, execution.container_id
                    )
                    await self._run_blocking(container.remove, force=True)
                    logger.debug(f"Removed old container {execution.container_id}")
                except docker.errors.NotFound:
                    pass  # Container already gone - the normal case.
                except Exception as e:
                    logger.warning(
                        "Failed to remove old container %s: %s",
                        execution.container_id,
                        e,
                    )

            await self.db.delete(execution)
            cleaned_count += 1

        await self.db.commit()
        logger.info(f"Cleaned up {cleaned_count} old tool executions")

        return cleaned_count
|