commit ab17a8ac21dc8b9c7b626783444c55a277958698 Author: Aljaz Ceru Date: Thu Sep 11 09:59:16 2025 +0200 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dbd8fb5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +*/*/target/* +*/target/* +.history +*.log +*.tmp +*.bak +*.swp diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5582c8e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,81 @@ +# Use NVIDIA CUDA base image with PyTorch pre-installed for faster builds +# This image includes CUDA, cuDNN, and PyTorch - saving significant build time +FROM nvidia/cuda:12.1.0-cudnn8-runtime-ubuntu22.04 as base + +# Set timezone to avoid interactive prompts +ENV DEBIAN_FRONTEND=noninteractive +ENV TZ=UTC + +# Install Python 3.11 and system dependencies +RUN apt-get update && apt-get install -y \ + python3.11 \ + python3.11-dev \ + python3-pip \ + ffmpeg \ + libsndfile1 \ + git \ + tzdata \ + && rm -rf /var/lib/apt/lists/* \ + && update-alternatives --install /usr/bin/python python /usr/bin/python3.11 1 \ + && update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1 + +# Set Python environment +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PIP_NO_CACHE_DIR=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + +# Set working directory +WORKDIR /app + +# Install PyTorch with CUDA support first (if not using pre-built image) +# This is much faster than installing via requirements.txt +RUN pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121 + +# Copy requirements and install remaining dependencies +COPY requirements.txt . +# Remove torch from requirements since we already installed it +RUN grep -v "^torch" requirements.txt > requirements_no_torch.txt && \ + pip install -r requirements_no_torch.txt + +# Install grpcio-tools for protobuf generation +RUN pip install grpcio-tools==1.60.0 + +# Copy proto files and generate gRPC code +COPY proto/ ./proto/ +RUN mkdir -p ./src && \ + python -m grpc_tools.protoc \ + -I./proto \ + --python_out=./src \ + --grpc_python_out=./src \ + ./proto/transcription.proto +# Note: Don't modify imports - they work as-is when sys.path is set correctly + +# Copy application code +COPY src/ ./src/ +COPY entrypoint.sh ./ +RUN chmod +x ./entrypoint.sh + +# Environment variables +ENV MODEL_PATH=large-v3 \ + GRPC_PORT=50051 \ + WEBSOCKET_PORT=8765 \ + ENABLE_WEBSOCKET=true \ + CACHE_DIR=/app/models \ + TORCH_HOME=/app/models \ + HF_HOME=/app/models + +# Create model cache directory +RUN mkdir -p /app/models + +# Volume for model cache +VOLUME ["/app/models"] + +# Expose ports +EXPOSE 50051 8765 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ + CMD python -c "import grpc; channel = grpc.insecure_channel('localhost:50051'); channel.channel_ready()" || exit 1 + +ENTRYPOINT ["/app/entrypoint.sh"] \ No newline at end of file diff --git a/Dockerfile.pytorch b/Dockerfile.pytorch new file mode 100644 index 0000000..6cde66f --- /dev/null +++ b/Dockerfile.pytorch @@ -0,0 +1,69 @@ +# Alternative: Use official PyTorch image with CUDA support +# This image has PyTorch, CUDA, and cuDNN pre-installed +FROM pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime + +# Set timezone to avoid interactive prompts +ENV DEBIAN_FRONTEND=noninteractive +ENV TZ=UTC + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + ffmpeg \ + libsndfile1 \ + git \ + tzdata \ + && rm -rf /var/lib/apt/lists/* + +# Set Python environment +ENV PYTHONUNBUFFERED=1 \ + 
PYTHONDONTWRITEBYTECODE=1 \ + PIP_NO_CACHE_DIR=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + +# Set working directory +WORKDIR /app + +# Copy requirements and install dependencies (excluding torch) +COPY requirements.txt . +RUN grep -v "^torch" requirements.txt > requirements_no_torch.txt && \ + pip install -r requirements_no_torch.txt && \ + pip install grpcio-tools==1.60.0 + +# Copy proto files and generate gRPC code +COPY proto/ ./proto/ +RUN mkdir -p ./src && \ + python -m grpc_tools.protoc \ + -I./proto \ + --python_out=./src \ + --grpc_python_out=./src \ + ./proto/transcription.proto +# Note: Don't modify imports - they work as-is when sys.path is set correctly + +# Copy application code +COPY src/ ./src/ +COPY entrypoint.sh ./ +RUN chmod +x ./entrypoint.sh + +# Environment variables +ENV MODEL_PATH=large-v3 \ + GRPC_PORT=50051 \ + WEBSOCKET_PORT=8765 \ + ENABLE_WEBSOCKET=true \ + CACHE_DIR=/app/models \ + TORCH_HOME=/app/models \ + HF_HOME=/app/models + +# Create model cache directory +RUN mkdir -p /app/models + +# Volume for model cache +VOLUME ["/app/models"] + +# Expose ports +EXPOSE 50051 8765 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ + CMD python -c "import grpc; channel = grpc.insecure_channel('localhost:50051'); channel.channel_ready()" || exit 1 + +ENTRYPOINT ["/app/entrypoint.sh"] \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bda921a --- /dev/null +++ b/Makefile @@ -0,0 +1,102 @@ +# Makefile for Transcription API Service + +.PHONY: help build run stop clean test proto rust-client + +# Variables +DOCKER_IMAGE = transcription-api +DOCKER_TAG = latest +GRPC_PORT = 50051 +WEBSOCKET_PORT = 8765 +MODEL = base + +help: ## Show this help message + @echo "Usage: make [target]" + @echo "" + @echo "Available targets:" + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf " %-15s %s\n", $$1, $$2}' + +build: ## Build Docker image + docker build -t $(DOCKER_IMAGE):$(DOCKER_TAG) . + +run: ## Run service with docker-compose + MODEL_PATH=$(MODEL) docker compose up -d + @echo "Service started!" + @echo "gRPC endpoint: localhost:$(GRPC_PORT)" + @echo "WebSocket endpoint: ws://localhost:$(WEBSOCKET_PORT)" + +run-gpu: ## Run service with GPU support + MODEL_PATH=large-v3 CUDA_VISIBLE_DEVICES=0 docker compose up -d + @echo "Service started with GPU support!" + +stop: ## Stop the service + docker compose down + +logs: ## Show service logs + docker compose logs -f + +clean: ## Clean up containers and volumes + docker compose down -v + docker system prune -f + +proto: ## Generate protobuf code + python -m grpc_tools.protoc \ + -I./proto \ + --python_out=./src \ + --grpc_python_out=./src \ + ./proto/transcription.proto + @echo "Generated Python protobuf code in src/" + +rust-client: ## Build Rust client examples + cd examples/rust-client && cargo build --release + @echo "Rust clients built in examples/rust-client/target/release/" + +test-grpc: ## Test gRPC connection + @command -v grpcurl >/dev/null 2>&1 || { echo "grpcurl not installed. Install from https://github.com/fullstorydev/grpcurl"; exit 1; } + grpcurl -plaintext localhost:$(GRPC_PORT) list + grpcurl -plaintext localhost:$(GRPC_PORT) transcription.TranscriptionService/HealthCheck + +test-websocket: ## Test WebSocket connection + @echo "Testing WebSocket connection..." 
+ @python3 -c "import asyncio, websockets, json; \ + async def test(): \ + async with websockets.connect('ws://localhost:$(WEBSOCKET_PORT)') as ws: \ + data = await ws.recv(); \ + print('Connected:', json.loads(data)); \ + asyncio.run(test())" + +install-deps: ## Install Python dependencies + pip install -r requirements.txt + +docker-push: ## Push Docker image to registry + docker tag $(DOCKER_IMAGE):$(DOCKER_TAG) $(DOCKER_REGISTRY)/$(DOCKER_IMAGE):$(DOCKER_TAG) + docker push $(DOCKER_REGISTRY)/$(DOCKER_IMAGE):$(DOCKER_TAG) + +# Model management +download-models: ## Download Whisper models + @echo "Downloading Whisper models..." + python -c "import whisper; \ + for model in ['tiny', 'base', 'small']: \ + print(f'Downloading {model}...'); \ + whisper.load_model(model)" + +# Development +dev-run: ## Run service locally (without Docker) + cd src && python transcription_server.py + +dev-install: ## Install development dependencies + pip install -r requirements.txt + pip install black flake8 pytest pytest-asyncio + +format: ## Format Python code + black src/ + +lint: ## Lint Python code + flake8 src/ + +# Benchmarking +benchmark: ## Run performance benchmark + @echo "Running transcription benchmark..." + time curl -X POST \ + -H "Content-Type: application/octet-stream" \ + --data-binary @test_audio.wav \ + http://localhost:$(GRPC_PORT)/benchmark \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..2f2612c --- /dev/null +++ b/README.md @@ -0,0 +1,412 @@ +# Transcription API Service + +A high-performance, standalone transcription service with gRPC and WebSocket support, optimized for real-time speech-to-text applications. Perfect for desktop applications, web services, and IoT devices. + +## Features + +- **Dual Protocol Support**: Both gRPC (recommended) and WebSocket +- **Real-Time Streaming**: Bidirectional audio streaming with immediate transcription +- **Multiple Models**: Support for all Whisper models (tiny to large-v3) +- **Language Support**: 50+ languages with automatic detection +- **Docker Ready**: Simple deployment with Docker Compose +- **Production Ready**: Health checks, monitoring, and graceful shutdown +- **Rust Client Examples**: Ready-to-use Rust client for desktop applications + +## Quick Start + +### Using Docker Compose (Recommended) + +```bash +# Clone the repository +cd transcription-api + +# Start the service (uses 'base' model by default) +docker compose up -d + +# Check logs +docker compose logs -f + +# Stop the service +docker compose down +``` + +### Configuration + +Edit `.env` or `docker-compose.yml` to configure: + +```env +MODEL_PATH=base # tiny, base, small, medium, large, large-v3 +GRPC_PORT=50051 # gRPC service port +WEBSOCKET_PORT=8765 # WebSocket service port +ENABLE_WEBSOCKET=true # Enable WebSocket support +CUDA_VISIBLE_DEVICES=0 # GPU device ID (if available) +``` + +## API Protocols + +### gRPC (Recommended for Desktop Apps) + +**Why gRPC?** +- Strongly typed with Protocol Buffers +- Excellent performance with HTTP/2 +- Built-in streaming support +- Auto-generated client code +- Better error handling + +**Proto Definition**: See `proto/transcription.proto` + +**Service Methods**: +- `StreamTranscribe`: Bidirectional streaming for real-time transcription +- `TranscribeFile`: Single file transcription +- `GetCapabilities`: Query available models and languages +- `HealthCheck`: Service health status + +### WebSocket (Alternative) + +**Protocol**: +```javascript +// Connect +ws://localhost:8765 + +// Send audio 
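// "data" must be base64-encoded PCM16 audio (16-bit, 16 kHz, mono) — see Audio Requirements below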
+{ + "type": "audio", + "data": "base64_encoded_pcm16_audio" +} + +// Receive transcription +{ + "type": "transcription", + "text": "Hello world", + "start_time": 0.0, + "end_time": 1.5, + "is_final": true, + "timestamp": 1234567890 +} + +// Stop +{ + "type": "stop" +} +``` + +## Rust Client Usage + +### Installation + +```toml +# Add to your Cargo.toml +[dependencies] +tonic = "0.10" +tokio = { version = "1.35", features = ["full"] } +# ... see examples/rust-client/Cargo.toml for full list +``` + +### Live Microphone Transcription + +```rust +use transcription_client::TranscriptionClient; + +#[tokio::main] +async fn main() -> Result<()> { + // Connect to service + let mut client = TranscriptionClient::connect("http://localhost:50051").await?; + + // Start streaming from microphone + let stream = client.stream_from_microphone( + "auto", // language + "transcribe", // task + "base" // model + ).await?; + + // Process transcriptions + while let Some(transcription) = stream.next().await { + println!("{}", transcription.text); + } + + Ok(()) +} +``` + +### Build and Run Examples + +```bash +cd examples/rust-client + +# Build +cargo build --release + +# Run live transcription from microphone +cargo run --bin live-transcribe + +# Transcribe a file +cargo run --bin file-transcribe -- audio.wav + +# Stream a WAV file +cargo run --bin stream-transcribe -- audio.wav --realtime +``` + +## Audio Requirements + +- **Format**: PCM16 (16-bit signed integer) +- **Sample Rate**: 16kHz +- **Channels**: Mono +- **Chunk Size**: Minimum ~500 bytes (flexible for real-time) + +## Performance Optimization + +### For Real-Time Applications + +1. **Use gRPC**: Lower latency than WebSocket +2. **Small Chunks**: Send audio in 0.5-1 second chunks +3. **Model Selection**: + - `tiny`: Fastest, lowest accuracy (real-time on CPU) + - `base`: Good balance (near real-time on CPU) + - `small`: Better accuracy (may lag on CPU) + - `large-v3`: Best accuracy (requires GPU for real-time) + +### GPU Acceleration + +```yaml +# docker-compose.yml +environment: + - CUDA_VISIBLE_DEVICES=0 +deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [gpu] +``` + +## Architecture + +``` +┌─────────────┐ +│ Rust App │ +│ (Desktop) │ +└──────┬──────┘ + │ gRPC/HTTP2 + ▼ +┌─────────────┐ +│ Transcription│ +│ Service │ +│ ┌────────┐ │ +│ │Whisper │ │ +│ │ Model │ │ +│ └────────┘ │ +└─────────────┘ +``` + +### Components + +1. **gRPC Server**: Handles streaming audio and returns transcriptions +2. **WebSocket Server**: Alternative protocol for web clients +3. **Transcription Engine**: Whisper/SimulStreaming for speech-to-text +4. **Session Manager**: Handles multiple concurrent streams +5. **Model Cache**: Prevents re-downloading models + +## Advanced Configuration + +### Using SimulStreaming + +For even lower latency, mount SimulStreaming: + +```yaml +volumes: + - ./SimulStreaming:/app/SimulStreaming +environment: + - SIMULSTREAMING_PATH=/app/SimulStreaming +``` + +### Custom Models + +Mount your own Whisper models: + +```yaml +volumes: + - ./models:/app/models +environment: + - MODEL_PATH=/app/models/custom-model.pt +``` + +### Monitoring + +The service exposes metrics on `/metrics` (when enabled): + +```bash +curl http://localhost:9090/metrics +``` + +## API Reference + +### gRPC Methods + +#### StreamTranscribe +```protobuf +rpc StreamTranscribe(stream AudioChunk) returns (stream TranscriptionResult); +``` + +Bidirectional streaming for real-time transcription. Send audio chunks, receive transcriptions. 
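
For quick testing from Python, a minimal client sketch is shown below. It assumes the `transcription_pb2` / `transcription_pb2_grpc` modules have been generated from `proto/transcription.proto` with `grpc_tools.protoc`, and that `audio.pcm` is a hypothetical raw 16 kHz mono PCM16 file; the chunk size and config values are illustrative, mirroring the Rust examples (a config-only chunk first, then raw audio chunks).

```python
# Minimal StreamTranscribe client sketch (illustrative, not part of the repo).
# Assumes transcription_pb2 / transcription_pb2_grpc were generated from
# proto/transcription.proto with grpc_tools.protoc.
import grpc

import transcription_pb2
import transcription_pb2_grpc


def chunks(path: str, chunk_bytes: int = 16000):
    """Yield a config-only chunk first, then raw PCM16 audio in ~0.5 s chunks."""
    yield transcription_pb2.AudioChunk(
        session_id="python-example",
        config=transcription_pb2.AudioConfig(
            language="auto",
            task="transcribe",
            model="base",
            sample_rate=16000,
            vad_enabled=True,
        ),
    )
    with open(path, "rb") as f:
        # 16000 bytes = 8000 samples = ~0.5 s of 16 kHz mono PCM16
        while data := f.read(chunk_bytes):
            yield transcription_pb2.AudioChunk(audio_data=data)


def main() -> None:
    channel = grpc.insecure_channel("localhost:50051")
    stub = transcription_pb2_grpc.TranscriptionServiceStub(channel)
    # "audio.pcm" is a hypothetical raw 16 kHz mono PCM16 file
    for result in stub.StreamTranscribe(chunks("audio.pcm")):
        if result.is_final:
            print(f"[{result.start_time:6.2f}s - {result.end_time:6.2f}s] {result.text}")


if __name__ == "__main__":
    main()
```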
+ +#### TranscribeFile +```protobuf +rpc TranscribeFile(AudioFile) returns (TranscriptionResponse); +``` + +Transcribe a complete audio file in one request. + +#### GetCapabilities +```protobuf +rpc GetCapabilities(Empty) returns (Capabilities); +``` + +Query available models, languages, and features. + +#### HealthCheck +```protobuf +rpc HealthCheck(Empty) returns (HealthStatus); +``` + +Check service health and status. + +## Language Support + +Supports 50+ languages including: +- English (en) +- Spanish (es) +- French (fr) +- German (de) +- Italian (it) +- Portuguese (pt) +- Russian (ru) +- Chinese (zh) +- Japanese (ja) +- Korean (ko) +- And many more... + +Use `"auto"` for automatic language detection. + +## Troubleshooting + +### Service won't start +- Check if ports 50051 and 8765 are available +- Ensure Docker has enough memory (minimum 4GB) +- Check logs: `docker compose logs transcription-api` + +### Slow transcription +- Use a smaller model (tiny or base) +- Enable GPU if available +- Reduce audio quality to 16kHz mono +- Send smaller chunks more frequently + +### Connection refused +- Check firewall settings +- Ensure service is running: `docker compose ps` +- Verify correct ports in client configuration + +### High memory usage +- Models are cached in memory for performance +- Use smaller models for limited memory systems +- Set memory limits in docker-compose.yml + +## Development + +### Building from Source + +```bash +# Install dependencies +pip install -r requirements.txt + +# Generate gRPC code +python -m grpc_tools.protoc \ + -I./proto \ + --python_out=./src \ + --grpc_python_out=./src \ + ./proto/transcription.proto + +# Run the service +python src/transcription_server.py +``` + +### Running Tests + +```bash +# Test gRPC connection +grpcurl -plaintext localhost:50051 list + +# Test health check +grpcurl -plaintext localhost:50051 transcription.TranscriptionService/HealthCheck + +# Test with example audio +python test_client.py +``` + +## Production Deployment + +### Docker Swarm + +```bash +docker stack deploy -c docker-compose.yml transcription +``` + +### Kubernetes + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: transcription-api +spec: + replicas: 3 + selector: + matchLabels: + app: transcription-api + template: + metadata: + labels: + app: transcription-api + spec: + containers: + - name: transcription-api + image: transcription-api:latest + ports: + - containerPort: 50051 + name: grpc + - containerPort: 8765 + name: websocket + env: + - name: MODEL_PATH + value: "base" + resources: + requests: + memory: "4Gi" + cpu: "2" + limits: + memory: "8Gi" + cpu: "4" +``` + +### Security + +For production: +1. Enable TLS for gRPC +2. Use WSS for WebSocket +3. Add authentication +4. Rate limiting +5. Input validation + +## License + +MIT License - See LICENSE file for details + +## Contributing + +Contributions welcome! Please read CONTRIBUTING.md for guidelines. 
+ +## Support + +- GitHub Issues: [Report bugs or request features] +- Documentation: [Full API documentation] +- Examples: See `examples/` directory \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..4cae13f --- /dev/null +++ b/build.sh @@ -0,0 +1,78 @@ +#!/bin/bash +# Build script with options for different configurations + +set -e + +# Default values +DOCKERFILE="Dockerfile" +USE_CACHE=true +PLATFORM="linux/amd64" + +# Parse arguments +while [[ $# -gt 0 ]]; do + case $1 in + --pytorch) + DOCKERFILE="Dockerfile.pytorch" + echo "Using PyTorch base image (faster build)" + shift + ;; + --cuda) + DOCKERFILE="Dockerfile" + echo "Using NVIDIA CUDA base image" + shift + ;; + --no-cache) + USE_CACHE=false + echo "Building without cache" + shift + ;; + --platform) + PLATFORM="$2" + echo "Building for platform: $PLATFORM" + shift 2 + ;; + --help) + echo "Usage: ./build.sh [options]" + echo "Options:" + echo " --pytorch Use PyTorch base image (fastest)" + echo " --cuda Use NVIDIA CUDA base image (default)" + echo " --no-cache Build without using cache" + echo " --platform Target platform (default: linux/amd64)" + echo " --help Show this help message" + exit 0 + ;; + *) + echo "Unknown option: $1" + exit 1 + ;; + esac +done + +# Build command +BUILD_CMD="docker build" + +if [ "$USE_CACHE" = false ]; then + BUILD_CMD="$BUILD_CMD --no-cache" +fi + +BUILD_CMD="$BUILD_CMD --platform $PLATFORM -f $DOCKERFILE -t transcription-api:latest ." + +echo "Building transcription-api..." +echo "Command: $BUILD_CMD" + +# Execute build +eval $BUILD_CMD + +if [ $? -eq 0 ]; then + echo "" + echo "Build successful!" + echo "" + echo "To run the service:" + echo " docker compose up -d" + echo "" + echo "Or with GPU support:" + echo " docker compose --profile gpu up -d" +else + echo "Build failed!" + exit 1 +fi \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..e30b2f8 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,105 @@ +services: + # Main service with GPU support (if available) + # For CPU-only: Use --profile cpu or set CUDA_VISIBLE_DEVICES="" + transcription-api: + build: + context: . 
+ dockerfile: ${DOCKERFILE:-Dockerfile} # Can use Dockerfile.pytorch for faster builds + container_name: transcription-api + environment: + # Model configuration + - MODEL_PATH=${MODEL_PATH:-large-v3} # Options: tiny, base, small, medium, large, large-v2, large-v3 + + # Model cache paths (shared with main project) + - HF_HOME=/app/models + - TORCH_HOME=/app/models + - TRANSFORMERS_CACHE=/app/models + + # Server ports + - GRPC_PORT=50051 + - WEBSOCKET_PORT=8765 + - ENABLE_WEBSOCKET=true + + # Performance tuning + - OMP_NUM_THREADS=4 + - CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-0} # GPU 0 by default + + ports: + - "50051:50051" # gRPC port + - "8765:8765" # WebSocket port + + volumes: + # Model cache - prevents re-downloading models + - whisper-models:/app/models + + # Optional: Mount SimulStreaming if available + # - ./SimulStreaming:/app/SimulStreaming + + restart: unless-stopped + + # Resource limits (adjust based on your system) + deploy: + resources: + limits: + cpus: '4' + memory: 8G + reservations: + cpus: '2' + memory: 4G + # GPU support (requires nvidia-docker2 or Docker 19.03+) + devices: + - driver: nvidia + count: all + capabilities: [gpu] + + # Health check + healthcheck: + test: ["CMD", "python", "-c", "import grpc; channel = grpc.insecure_channel('localhost:50051'); channel.channel_ready()"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + + # CPU-only service (for systems without GPU) + transcription-api-cpu: + profiles: ["cpu"] # Only start with --profile cpu + build: + context: . + dockerfile: ${DOCKERFILE:-Dockerfile.pytorch} + container_name: transcription-api-cpu + environment: + - MODEL_PATH=${MODEL_PATH:-base} # Smaller model for CPU + - HF_HOME=/app/models + - TORCH_HOME=/app/models + - TRANSFORMERS_CACHE=/app/models + - GRPC_PORT=50051 + - WEBSOCKET_PORT=8765 + - ENABLE_WEBSOCKET=true + - CUDA_VISIBLE_DEVICES= # No GPU + ports: + - "50051:50051" + - "8765:8765" + volumes: + - whisper-models:/app/models + deploy: + resources: + limits: + cpus: '4' + memory: 8G + reservations: + cpus: '2' + memory: 4G + # No GPU devices for CPU profile + healthcheck: + test: ["CMD", "python", "-c", "import grpc; channel = grpc.insecure_channel('localhost:50051'); channel.channel_ready()"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + +volumes: + # Share models with the main transcription project + # This references the volume from the parent project + whisper-models: + external: true + name: real-time-transcriptions_whisper-models diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 0000000..3ad7b70 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,59 @@ +#!/bin/bash +set -e + +# Download model if not already cached +# Whisper stores models as .pt files in the root of the cache directory +if [ ! -z "$MODEL_PATH" ]; then + MODEL_FILE="/app/models/$MODEL_PATH.pt" + + # Check if model file exists + if [ ! -f "$MODEL_FILE" ]; then + echo "Model $MODEL_PATH not found at $MODEL_FILE, downloading..." 
+ python -c " +import whisper +import os +# Set all cache paths to use the shared volume +os.environ['TORCH_HOME'] = '/app/models' +os.environ['HF_HOME'] = '/app/models' +os.environ['TRANSFORMERS_CACHE'] = '/app/models' +os.environ['XDG_CACHE_HOME'] = '/app/models' +model_name = '$MODEL_PATH' +print(f'Downloading model {model_name}...') +model = whisper.load_model(model_name, download_root='/app/models') +print(f'Model {model_name} downloaded and cached successfully') +" + else + echo "Model $MODEL_PATH already cached at $MODEL_FILE" + # Just verify it loads properly + python -c " +import whisper +import os +os.environ['TORCH_HOME'] = '/app/models' +os.environ['XDG_CACHE_HOME'] = '/app/models' +model = whisper.load_model('$MODEL_PATH', download_root='/app/models') +print(f'Model $MODEL_PATH loaded successfully from cache') +" + fi +fi + +# Generate gRPC code if not already generated +if [ ! -f "/app/src/transcription_pb2.py" ]; then + echo "Generating gRPC code from proto files..." + python -m grpc_tools.protoc \ + -I/app/proto \ + --python_out=/app/src \ + --grpc_python_out=/app/src \ + /app/proto/transcription.proto + + # Fix imports in generated files (keep absolute import) + # No need to modify - the generated import should work as-is +fi + +# Start the transcription server +echo "Starting Transcription API Server..." +echo "gRPC Port: $GRPC_PORT" +echo "WebSocket Port: $WEBSOCKET_PORT (Enabled: $ENABLE_WEBSOCKET)" +echo "Model: $MODEL_PATH" + +cd /app/src +exec python transcription_server.py \ No newline at end of file diff --git a/examples/rust-client/Cargo.toml b/examples/rust-client/Cargo.toml new file mode 100644 index 0000000..008fd98 --- /dev/null +++ b/examples/rust-client/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "transcription-client" +version = "0.1.0" +edition = "2021" + +[dependencies] +# gRPC and Protocol Buffers +tonic = "0.10" +prost = "0.12" +tokio = { version = "1.35", features = ["full"] } +tokio-stream = "0.1" + +# Audio handling +hound = "3.5" # WAV file reading/writing +cpal = "0.15" # Cross-platform audio capture +rodio = "0.17" # Audio playback + +# Utilities +anyhow = "1.0" +tracing = "0.1" +tracing-subscriber = "0.3" +clap = { version = "4.4", features = ["derive"] } +futures-util = "0.3" + +[build-dependencies] +tonic-build = "0.10" + +[[bin]] +name = "stream-transcribe" +path = "src/stream_transcribe.rs" + +[[bin]] +name = "file-transcribe" +path = "src/file_transcribe.rs" + +[[bin]] +name = "live-transcribe" +path = "src/live_transcribe.rs" + +[[bin]] +name = "realtime-playback" +path = "src/realtime_playback.rs" \ No newline at end of file diff --git a/examples/rust-client/README.md b/examples/rust-client/README.md new file mode 100644 index 0000000..0679650 --- /dev/null +++ b/examples/rust-client/README.md @@ -0,0 +1,126 @@ +# Rust Transcription Client Examples + +This directory contains Rust client examples for the Transcription API service. + +## Available Clients + +### 1. `file-transcribe` - File Transcription +Transcribe audio files either by sending the entire file or streaming in real-time chunks. 
+ +```bash +# Send entire file at once (fast, but no real-time feedback) +cargo run --bin file-transcribe -- audio.wav + +# Stream file in chunks for real-time transcription (like YouTube) +cargo run --bin file-transcribe -- audio.wav --stream + +# With VAD (Voice Activity Detection) to filter silence +cargo run --bin file-transcribe -- audio.wav --stream --vad + +# Specify model and language +cargo run --bin file-transcribe -- audio.wav --stream --model large-v3 --language en +``` + +### 2. `realtime-playback` - Play Audio with Live Transcription +Plays audio through your speakers while showing real-time transcriptions, similar to YouTube's live captions. + +```bash +# Basic usage - plays audio and shows transcriptions +cargo run --bin realtime-playback -- audio.wav + +# With timestamps for each transcription +cargo run --bin realtime-playback -- audio.wav --timestamps + +# With VAD to reduce noise transcriptions +cargo run --bin realtime-playback -- audio.wav --vad + +# Using a specific model +cargo run --bin realtime-playback -- audio.wav --model large-v3 +``` + +### 3. `stream-transcribe` - Stream WAV Files +Streams WAV files chunk by chunk for transcription. + +```bash +# Stream without delays (fast processing) +cargo run --bin stream-transcribe -- audio.wav + +# Simulate real-time streaming with delays +cargo run --bin stream-transcribe -- audio.wav --realtime +``` + +### 4. `live-transcribe` - Live Microphone Transcription +Captures audio from your microphone and transcribes in real-time. + +```bash +# Use default microphone +cargo run --bin live-transcribe + +# Specify server and language +cargo run --bin live-transcribe -- --server http://localhost:50051 --language en +``` + +## Building + +```bash +# Build all binaries +cargo build --release + +# Build specific binary +cargo build --release --bin realtime-playback +``` + +## Common Options + +All clients support these common options: +- `--server ` - gRPC server address (default: http://localhost:50051) +- `--language ` - Language code: en, es, fr, de, etc., or "auto" (default: auto) +- `--model ` - Model to use: tiny, base, small, medium, large-v3 (default: base) +- `--vad` - Enable Voice Activity Detection to filter silence + +## Features + +### Real-time Streaming +The `--stream` flag in `file-transcribe` and the `realtime-playback` binary both support real-time streaming, which means: +- Audio is sent in small chunks (0.5 second intervals) +- Transcriptions appear as the audio is being processed +- Similar experience to YouTube's live captions +- Lower latency compared to sending entire file + +### Voice Activity Detection (VAD) +When `--vad` is enabled, the service will: +- Filter out silence and background noise +- Reduce false transcriptions (like repeated "Thank you") +- Improve transcription quality for speech-only content + +### Audio Playback +The `realtime-playback` binary uses the `rodio` library to: +- Play audio through your system's default audio output +- Synchronize playback with transcription display +- Support multiple audio formats (WAV, MP3, FLAC, etc.) 
+ +## Requirements + +- Rust 1.70 or later +- The Transcription API server running (usually on localhost:50051) +- For live transcription: A working microphone +- For playback: Audio output device (speakers/headphones) + +## Troubleshooting + +### "Connection refused" error +Make sure the Transcription API server is running: +```bash +cd ../../ +docker compose up +``` + +### No audio playback +- Check your system's default audio output device +- Ensure the audio file format is supported (WAV, MP3, FLAC) +- Try with a different audio file + +### Poor transcription quality +- Use a larger model (e.g., `--model large-v3`) +- Enable VAD to filter noise (`--vad`) +- Ensure audio quality is good (16kHz or higher recommended) \ No newline at end of file diff --git a/examples/rust-client/build.rs b/examples/rust-client/build.rs new file mode 100644 index 0000000..333d21d --- /dev/null +++ b/examples/rust-client/build.rs @@ -0,0 +1,10 @@ +fn main() -> Result<(), Box> { + // Compile protobuf files + tonic_build::configure() + .build_server(false) // We only need the client + .compile( + &["../../proto/transcription.proto"], + &["../../proto"], + )?; + Ok(()) +} \ No newline at end of file diff --git a/examples/rust-client/src/file_transcribe.rs b/examples/rust-client/src/file_transcribe.rs new file mode 100644 index 0000000..4f83bf4 --- /dev/null +++ b/examples/rust-client/src/file_transcribe.rs @@ -0,0 +1,232 @@ +//! File transcription using gRPC +//! +//! This example shows how to transcribe an audio file. +//! Use --stream flag for real-time streaming instead of sending the entire file. + +use anyhow::Result; +use clap::Parser; +use std::fs; +use tonic::transport::Channel; +use tracing::{info, debug}; +use futures_util::StreamExt; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use std::time::Duration; +use tokio::time; + +// Import generated protobuf types +pub mod transcription { + tonic::include_proto!("transcription"); +} + +use transcription::{ + transcription_service_client::TranscriptionServiceClient, AudioConfig, AudioFile, AudioChunk, +}; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Audio file path + file: String, + + /// gRPC server address + #[arg(short, long, default_value = "http://localhost:50051")] + server: String, + + /// Language code (e.g., "en", "es", "auto") + #[arg(short, long, default_value = "auto")] + language: String, + + /// Task: transcribe or translate + #[arg(short, long, default_value = "transcribe")] + task: String, + + /// Model to use + #[arg(short, long, default_value = "base")] + model: String, + + /// Stream the file in chunks for real-time transcription + #[arg(long)] + stream: bool, + + /// Enable VAD (Voice Activity Detection) + #[arg(short, long)] + vad: bool, +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + + info!("Reading audio file: {}", args.file); + + info!("Connecting to transcription service at {}", args.server); + let mut client = TranscriptionServiceClient::connect(args.server.clone()).await?; + + if args.stream { + // Stream mode - send file in chunks for real-time transcription + stream_file(&mut client, &args).await?; + } else { + // Normal mode - send entire file at once + transcribe_entire_file(&mut client, &args).await?; + } + + Ok(()) +} + +async fn transcribe_entire_file( + client: &mut TranscriptionServiceClient, + args: &Args, +) -> Result<()> { + let audio_data 
= fs::read(&args.file)?; + + // Determine format from extension + let format = match args.file.split('.').last() { + Some("wav") => "wav", + Some("mp3") => "mp3", + Some("webm") => "webm", + _ => "wav", // Default to WAV + }; + + let config = AudioConfig { + language: args.language.clone(), + task: args.task.clone(), + model: args.model.clone(), + sample_rate: 16000, + vad_enabled: args.vad, + }; + + let request = AudioFile { + audio_data, + format: format.to_string(), + config: Some(config), + }; + + info!("Sending entire file for transcription..."); + let response = client.transcribe_file(request).await?; + let result = response.into_inner(); + + println!("\n=== Transcription Results ==="); + println!("Language: {}", result.detected_language); + println!("Duration: {:.2} seconds", result.duration_seconds); + println!("\nFull Text:"); + println!("{}", result.full_text); + + if !result.segments.is_empty() { + println!("\n=== Segments ==="); + for (i, segment) in result.segments.iter().enumerate() { + println!( + "[{:03}] {:.2}s - {:.2}s (conf: {:.2}): {}", + i + 1, + segment.start_time, + segment.end_time, + segment.confidence, + segment.text + ); + } + } + + Ok(()) +} + +async fn stream_file( + client: &mut TranscriptionServiceClient, + args: &Args, +) -> Result<()> { + let audio_data = fs::read(&args.file)?; + + info!("Streaming file in real-time chunks..."); + + // Create channel for audio chunks + let (tx, rx) = mpsc::channel::(100); + + // Spawn task to send audio chunks + let tx_clone = tx.clone(); + let language = args.language.clone(); + let task = args.task.clone(); + let model = args.model.clone(); + let vad = args.vad; + + tokio::spawn(async move { + // Send configuration first + let config = AudioConfig { + language, + task, + model, + sample_rate: 16000, + vad_enabled: vad, + }; + + let config_chunk = AudioChunk { + audio_data: vec![], + session_id: "file-stream".to_string(), + config: Some(config), + }; + + if tx_clone.send(config_chunk).await.is_err() { + return; + } + + // Assuming PCM16 audio at 16kHz + // Send in 3 second chunks for better accuracy (96000 bytes = 48000 samples = 3 seconds) + let chunk_size = 96000; + + for (idx, chunk) in audio_data.chunks(chunk_size).enumerate() { + let audio_chunk = AudioChunk { + audio_data: chunk.to_vec(), + session_id: String::new(), + config: None, + }; + + debug!("Sending chunk {} ({} bytes)", idx, chunk.len()); + + if tx_clone.send(audio_chunk).await.is_err() { + break; + } + + // Simulate real-time streaming (3 seconds per chunk) + time::sleep(Duration::from_secs(3)).await; + } + + info!("Finished streaming all chunks"); + }); + + // Create stream and start transcription + let stream = ReceiverStream::new(rx); + let response = client.stream_transcribe(stream).await?; + let mut result_stream = response.into_inner(); + + println!("\n=== Real-time Transcription ==="); + println!("Streaming and transcribing...\n"); + + let mut full_transcript = String::new(); + + // Process results + while let Some(result) = result_stream.next().await { + match result { + Ok(transcription) => { + println!("[{:06.2}s - {:06.2}s] {}", + transcription.start_time, + transcription.end_time, + transcription.text); + + if transcription.is_final { + full_transcript.push_str(&transcription.text); + full_transcript.push(' '); + } + } + Err(e) => { + eprintln!("Error: {}", e); + break; + } + } + } + + println!("\n=== Full Transcript ==="); + println!("{}", full_transcript.trim()); + + Ok(()) +} \ No newline at end of file diff --git 
a/examples/rust-client/src/live_transcribe.rs b/examples/rust-client/src/live_transcribe.rs new file mode 100644 index 0000000..7883cb7 --- /dev/null +++ b/examples/rust-client/src/live_transcribe.rs @@ -0,0 +1,188 @@ +//! Live microphone transcription using gRPC streaming +//! +//! This example shows how to capture audio from the microphone +//! and stream it to the transcription service in real-time. + +use anyhow::Result; +use clap::Parser; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use futures_util::StreamExt; +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc as tokio_mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{error, info}; + +// Import generated protobuf types +pub mod transcription { + tonic::include_proto!("transcription"); +} + +use transcription::{ + transcription_service_client::TranscriptionServiceClient, AudioChunk, AudioConfig, +}; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// gRPC server address + #[arg(short, long, default_value = "http://localhost:50051")] + server: String, + + /// Language code (e.g., "en", "es", "auto") + #[arg(short, long, default_value = "auto")] + language: String, + + /// Task: transcribe or translate + #[arg(short, long, default_value = "transcribe")] + task: String, + + /// Model to use + #[arg(short, long, default_value = "base")] + model: String, + + /// Session ID + #[arg(long)] + session_id: Option, +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + + info!("Connecting to transcription service at {}", args.server); + let mut client = TranscriptionServiceClient::connect(args.server).await?; + + // Create channel for audio data + let (audio_tx, audio_rx) = tokio_mpsc::channel::(100); + + // Start audio capture in a separate thread + let audio_tx_clone = audio_tx.clone(); + std::thread::spawn(move || { + if let Err(e) = capture_audio(audio_tx_clone) { + error!("Audio capture error: {}", e); + } + }); + + // Send initial configuration + let session_id = args.session_id.unwrap_or_else(|| { + format!("rust-client-{}", std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs()) + }); + + info!("Starting transcription session: {}", session_id); + + // Create the first chunk with configuration + let config = AudioConfig { + language: args.language.clone(), + task: args.task.clone(), + model: args.model.clone(), + sample_rate: 16000, + vad_enabled: false, + }; + + // Send a configuration chunk first + let config_chunk = AudioChunk { + audio_data: vec![], + session_id: session_id.clone(), + config: Some(config), + }; + + audio_tx.send(config_chunk).await?; + + // Create stream from receiver + let audio_stream = ReceiverStream::new(audio_rx); + + // Start bidirectional streaming + let response = client.stream_transcribe(audio_stream).await?; + let mut stream = response.into_inner(); + + info!("Listening... 
Press Ctrl+C to stop"); + + // Process transcription results + while let Some(result) = stream.next().await { + match result { + Ok(transcription) => { + if transcription.is_final { + println!("\n[FINAL] {}", transcription.text); + } else { + print!("\r[PARTIAL] {} ", transcription.text); + use std::io::{self, Write}; + io::stdout().flush()?; + } + } + Err(e) => { + error!("Transcription error: {}", e); + break; + } + } + } + + Ok(()) +} + +/// Capture audio from the default microphone +fn capture_audio(tx: tokio_mpsc::Sender) -> Result<()> { + let host = cpal::default_host(); + let device = host.default_input_device() + .ok_or_else(|| anyhow::anyhow!("No input device available"))?; + + info!("Using audio device: {}", device.name()?); + + // Configure audio capture for 16kHz mono PCM16 + let config = cpal::StreamConfig { + channels: 1, + sample_rate: cpal::SampleRate(16000), + buffer_size: cpal::BufferSize::Default, + }; + + // Buffer to accumulate audio samples + let buffer = Arc::new(Mutex::new(Vec::new())); + let buffer_clone = buffer.clone(); + + // Create audio stream + let stream = device.build_input_stream( + &config, + move |data: &[i16], _: &cpal::InputCallbackInfo| { + let mut buf = buffer_clone.lock().unwrap(); + buf.extend_from_slice(data); + + // Send chunks of ~0.5 seconds (8000 samples at 16kHz) + while buf.len() >= 8000 { + let chunk: Vec = buf.drain(..8000).collect(); + + // Convert i16 to bytes + let bytes: Vec = chunk.iter() + .flat_map(|&sample| sample.to_le_bytes()) + .collect(); + + // Send audio chunk + let audio_chunk = AudioChunk { + audio_data: bytes, + session_id: String::new(), // Already set in config chunk + config: None, + }; + + // Use blocking send since we're in a non-async context + if let Err(e) = tx.blocking_send(audio_chunk) { + error!("Failed to send audio chunk: {}", e); + } + } + }, + move |err| { + error!("Audio stream error: {}", err); + }, + None, + )?; + + stream.play()?; + + // Keep the stream alive + std::thread::park(); + + Ok(()) +} \ No newline at end of file diff --git a/examples/rust-client/src/realtime_playback.rs b/examples/rust-client/src/realtime_playback.rs new file mode 100644 index 0000000..3c64cb5 --- /dev/null +++ b/examples/rust-client/src/realtime_playback.rs @@ -0,0 +1,216 @@ +//! Real-time audio playback with synchronized transcription +//! +//! This example plays an audio file while streaming it for transcription, +//! showing transcriptions in real-time similar to YouTube. + +use anyhow::Result; +use clap::Parser; +use futures_util::StreamExt; +use rodio::{Decoder, OutputStream, Source}; +use std::fs::File; +use std::io::BufReader; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; +use tokio::time; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{info, debug}; +use hound::WavReader; + +// Import generated protobuf types +pub mod transcription { + tonic::include_proto!("transcription"); +} + +use transcription::{ + transcription_service_client::TranscriptionServiceClient, AudioChunk, AudioConfig, +}; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Audio file path (WAV, MP3, FLAC, etc.) 
+ file: String, + + /// gRPC server address + #[arg(short, long, default_value = "http://localhost:50051")] + server: String, + + /// Language code (e.g., "en", "es", "auto") + #[arg(short, long, default_value = "auto")] + language: String, + + /// Model to use + #[arg(short, long, default_value = "base")] + model: String, + + /// Enable VAD (Voice Activity Detection) + #[arg(short, long)] + vad: bool, + + /// Show timestamps + #[arg(short = 't', long)] + timestamps: bool, +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + let file_path = args.file.clone(); + + info!("Loading audio file: {}", file_path); + + // Start audio playback in a separate thread + let (_stream, stream_handle) = OutputStream::try_default()?; + let file = BufReader::new(File::open(&file_path)?); + let source = Decoder::new(file)?; + let sample_rate = source.sample_rate(); + let channels = source.channels(); + // Convert to f32 samples for playback + let source = source.convert_samples::(); + + info!("Audio format: {} Hz, {} channels", sample_rate, channels); + + // Also open the file for streaming to transcription service + // We need to read the raw audio data for transcription + let mut wav_reader = WavReader::open(&file_path)?; + let wav_spec = wav_reader.spec(); + + // Collect all samples for streaming + let samples: Vec = wav_reader.samples::() + .collect::>>()?; + + info!("Connecting to transcription service at {}", args.server); + let mut client = TranscriptionServiceClient::connect(args.server.clone()).await?; + + // Create channel for audio chunks + let (tx, rx) = mpsc::channel::(100); + + // Calculate chunk duration for synchronization + let chunk_samples = 16000 * 3; // 3 second chunks at 16kHz for better accuracy + let chunk_duration = Duration::from_secs(3); + + // Start playback + println!("\n🎵 Starting audio playback with real-time transcription...\n"); + println!("{}", "─".repeat(80)); + + let start_time = Instant::now(); + + // Play audio + stream_handle.play_raw(source)?; + + // Spawn task to stream audio chunks to transcription service + let tx_clone = tx.clone(); + let show_timestamps = args.timestamps; + tokio::spawn(async move { + // Send configuration first + let config = AudioConfig { + language: args.language.clone(), + task: "transcribe".to_string(), + model: args.model.clone(), + sample_rate: 16000, + vad_enabled: args.vad, + }; + + let config_chunk = AudioChunk { + audio_data: vec![], + session_id: "realtime-playback".to_string(), + config: Some(config), + }; + + if tx_clone.send(config_chunk).await.is_err() { + return; + } + + // Stream audio chunks synchronized with playback + for (chunk_idx, chunk) in samples.chunks(chunk_samples).enumerate() { + let chunk_start = Instant::now(); + + // Convert samples to bytes + let bytes: Vec = chunk.iter() + .flat_map(|&s| s.to_le_bytes()) + .collect(); + + let audio_chunk = AudioChunk { + audio_data: bytes, + session_id: String::new(), + config: None, + }; + + debug!("Sending chunk {} ({} samples)", chunk_idx, chunk.len()); + + if tx_clone.send(audio_chunk).await.is_err() { + break; + } + + // Synchronize with playback timing + let elapsed = chunk_start.elapsed(); + if elapsed < chunk_duration { + time::sleep(chunk_duration - elapsed).await; + } + } + + info!("Finished streaming audio chunks"); + }); + + // Create stream and start transcription + let stream = ReceiverStream::new(rx); + let response = client.stream_transcribe(stream).await?; + let mut 
result_stream = response.into_inner(); + + // Process transcription results + let mut last_text = String::new(); + let mut current_line = String::new(); + + while let Some(result) = result_stream.next().await { + match result { + Ok(transcription) => { + let elapsed = start_time.elapsed().as_secs_f32(); + + // Clear previous line if text has changed significantly + if !transcription.text.is_empty() && transcription.text != last_text { + if show_timestamps { + // Show with timestamps + println!("[{:06.2}s] {}", + elapsed, + transcription.text); + } else { + // Update current line for continuous display + if transcription.is_final { + // Final transcription for this segment + println!("{}", transcription.text); + current_line.clear(); + } else { + // Interim result - update in place + print!("\r{:<80}", transcription.text); + use std::io::{self, Write}; + io::stdout().flush()?; + current_line = transcription.text.clone(); + } + } + + last_text = transcription.text.clone(); + } + } + Err(e) => { + eprintln!("\nTranscription error: {}", e); + break; + } + } + } + + // Clear any remaining interim text + if !current_line.is_empty() { + println!(); + } + + println!("\n{}", "─".repeat(80)); + println!("✅ Playback and transcription complete!"); + + // Keep the program alive until playback finishes + time::sleep(Duration::from_secs(2)).await; + + Ok(()) +} \ No newline at end of file diff --git a/examples/rust-client/src/stream_transcribe.rs b/examples/rust-client/src/stream_transcribe.rs new file mode 100644 index 0000000..e00747f --- /dev/null +++ b/examples/rust-client/src/stream_transcribe.rs @@ -0,0 +1,142 @@ +//! Stream WAV file for real-time transcription +//! +//! This example shows how to stream a WAV file chunk by chunk +//! to simulate real-time transcription. 
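//!
//! Note: the service expects 16 kHz mono PCM16 audio; this example forwards the
//! WAV samples as-is, so use a 16 kHz mono WAV (or resample it beforehand).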
+ +use anyhow::Result; +use clap::Parser; +use futures_util::StreamExt; +use hound::WavReader; +use std::fs::File; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::time; +use tokio_stream::wrappers::ReceiverStream; +use tonic::transport::Channel; +use tracing::info; + +// Import generated protobuf types +pub mod transcription { + tonic::include_proto!("transcription"); +} + +use transcription::{ + transcription_service_client::TranscriptionServiceClient, AudioChunk, AudioConfig, +}; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// WAV file path + file: String, + + /// gRPC server address + #[arg(short, long, default_value = "http://localhost:50051")] + server: String, + + /// Language code (e.g., "en", "es", "auto") + #[arg(short, long, default_value = "auto")] + language: String, + + /// Simulate real-time by adding delays + #[arg(short, long)] + realtime: bool, +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + + info!("Reading WAV file: {}", args.file); + let mut reader = WavReader::open(&args.file)?; + let spec = reader.spec(); + + info!("WAV specs: {} Hz, {} channels, {} bits", + spec.sample_rate, spec.channels, spec.bits_per_sample); + + // Collect samples + let samples: Vec = reader.samples::() + .collect::>>()?; + + info!("Connecting to transcription service at {}", args.server); + let mut client = TranscriptionServiceClient::connect(args.server).await?; + + // Create channel for audio chunks + let (tx, rx) = mpsc::channel::(100); + + // Spawn task to send audio chunks + let tx_clone = tx.clone(); + let realtime = args.realtime; + tokio::spawn(async move { + // Send configuration first + let config = AudioConfig { + language: args.language.clone(), + task: "transcribe".to_string(), + model: "base".to_string(), + sample_rate: 16000, + vad_enabled: false, + }; + + let config_chunk = AudioChunk { + audio_data: vec![], + session_id: "stream-test".to_string(), + config: Some(config), + }; + + if tx_clone.send(config_chunk).await.is_err() { + return; + } + + // Send audio in chunks of 3 seconds for better accuracy (48000 samples at 16kHz) + let chunk_size = 48000; + for chunk in samples.chunks(chunk_size) { + // Convert samples to bytes + let bytes: Vec = chunk.iter() + .flat_map(|&s| s.to_le_bytes()) + .collect(); + + let audio_chunk = AudioChunk { + audio_data: bytes, + session_id: String::new(), + config: None, + }; + + if tx_clone.send(audio_chunk).await.is_err() { + break; + } + + // Simulate real-time streaming + if realtime { + time::sleep(Duration::from_secs(3)).await; + } + } + }); + + // Create stream and start transcription + let stream = ReceiverStream::new(rx); + let response = client.stream_transcribe(stream).await?; + let mut result_stream = response.into_inner(); + + info!("Streaming audio and receiving transcriptions..."); + + // Process results + while let Some(result) = result_stream.next().await { + match result { + Ok(transcription) => { + println!("[{:.2}s - {:.2}s] {}", + transcription.start_time, + transcription.end_time, + transcription.text); + } + Err(e) => { + eprintln!("Error: {}", e); + break; + } + } + } + + Ok(()) +} \ No newline at end of file diff --git a/generate_proto.py b/generate_proto.py new file mode 100644 index 0000000..a11bb13 --- /dev/null +++ b/generate_proto.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +""" +Generate Python code from protobuf definitions +Run this before starting the 
service for the first time +""" + +import os +import sys +import subprocess + +def generate_proto(): + """Generate Python code from proto files""" + proto_dir = "proto" + output_dir = "src" + + # Create output directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + # Find all .proto files + proto_files = [f for f in os.listdir(proto_dir) if f.endswith('.proto')] + + if not proto_files: + print("No .proto files found in proto/ directory") + return False + + for proto_file in proto_files: + proto_path = os.path.join(proto_dir, proto_file) + print(f"Generating code for {proto_file}...") + + # Generate Python code + cmd = [ + sys.executable, "-m", "grpc_tools.protoc", + f"-I{proto_dir}", + f"--python_out={output_dir}", + f"--grpc_python_out={output_dir}", + proto_path + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + print(f"✓ Generated {proto_file.replace('.proto', '_pb2.py')} and {proto_file.replace('.proto', '_pb2_grpc.py')}") + except subprocess.CalledProcessError as e: + print(f"✗ Failed to generate code for {proto_file}") + print(f" Error: {e.stderr}") + return False + + # Fix imports in generated files + print("Fixing imports in generated files...") + grpc_file = os.path.join(output_dir, "transcription_pb2_grpc.py") + if os.path.exists(grpc_file): + with open(grpc_file, 'r') as f: + content = f.read() + + # Fix relative import + content = content.replace( + "import transcription_pb2", + "from . import transcription_pb2" + ) + + with open(grpc_file, 'w') as f: + f.write(content) + + print("✓ Fixed imports") + + print("\nProtobuf generation complete!") + print(f"Generated files are in {output_dir}/") + return True + +if __name__ == "__main__": + # Check if grpcio-tools is installed + try: + import grpc_tools + except ImportError: + print("Error: grpcio-tools not installed") + print("Run: pip install grpcio-tools") + sys.exit(1) + + if generate_proto(): + print("\nYou can now start the service with:") + print(" python src/transcription_server.py") + print("Or with Docker:") + print(" docker compose up") + else: + sys.exit(1) \ No newline at end of file diff --git a/proto/transcription.proto b/proto/transcription.proto new file mode 100644 index 0000000..18a9625 --- /dev/null +++ b/proto/transcription.proto @@ -0,0 +1,91 @@ +syntax = "proto3"; + +package transcription; + +// The transcription service provides real-time speech-to-text capabilities +service TranscriptionService { + // Bidirectional streaming: send audio chunks, receive transcriptions + rpc StreamTranscribe(stream AudioChunk) returns (stream TranscriptionResult); + + // Unary call for single audio file transcription + rpc TranscribeFile(AudioFile) returns (TranscriptionResponse); + + // Get available models and languages + rpc GetCapabilities(Empty) returns (Capabilities); + + // Health check + rpc HealthCheck(Empty) returns (HealthStatus); +} + +// Audio chunk for streaming +message AudioChunk { + bytes audio_data = 1; // PCM16 audio data (16-bit, 16kHz, mono) + string session_id = 2; // Optional session ID for tracking + AudioConfig config = 3; // Optional config (only needed in first chunk) +} + +// Audio configuration +message AudioConfig { + string language = 1; // Language code (e.g., "en", "es", "auto") + string task = 2; // "transcribe" or "translate" + string model = 3; // Model size: "tiny", "base", "small", "medium", "large-v3" + int32 sample_rate = 4; // Sample rate (default: 16000) + bool vad_enabled = 5; // Voice Activity Detection +} + +// Transcription 
result for streaming +message TranscriptionResult { + string text = 1; // Transcribed text + float start_time = 2; // Start time in seconds + float end_time = 3; // End time in seconds + bool is_final = 4; // Is this a final result? + float confidence = 5; // Confidence score (0-1) + string language = 6; // Detected language + string session_id = 7; // Session ID for tracking + int64 timestamp_ms = 8; // Server timestamp in milliseconds +} + +// Complete audio file for transcription +message AudioFile { + bytes audio_data = 1; // Complete audio file data + string format = 2; // Format: "wav", "mp3", "webm", "raw_pcm16" + AudioConfig config = 3; // Audio configuration +} + +// Response for file transcription +message TranscriptionResponse { + repeated TranscriptionSegment segments = 1; + string full_text = 2; // Complete transcription + string detected_language = 3; + float duration_seconds = 4; +} + +// Transcription segment +message TranscriptionSegment { + string text = 1; + float start_time = 2; + float end_time = 3; + float confidence = 4; +} + +// Service capabilities +message Capabilities { + repeated string available_models = 1; + repeated string supported_languages = 2; + repeated string supported_formats = 3; + int32 max_audio_length_seconds = 4; + bool streaming_supported = 5; + bool vad_supported = 6; +} + +// Health status +message HealthStatus { + bool healthy = 1; + string status = 2; + string model_loaded = 3; + int64 uptime_seconds = 4; + int32 active_sessions = 5; +} + +// Empty message for requests without parameters +message Empty {} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a9f1919 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,25 @@ +# Core dependencies +grpcio==1.60.0 +grpcio-tools==1.60.0 +websockets==12.0 +fastapi==0.109.0 +uvicorn[standard]==0.27.0 + +# Audio processing +numpy==1.24.3 +soundfile==0.12.1 +librosa==0.10.1 + +# SimulStreaming/Whisper dependencies +torch>=2.0.0 +transformers>=4.36.0 +openai-whisper>=20231117 + +# Utilities +python-multipart==0.0.6 +aiofiles==23.2.1 +pydantic==2.5.3 +python-dotenv==1.0.0 + +# Monitoring +prometheus-client==0.19.0 \ No newline at end of file diff --git a/src/transcription_server.py b/src/transcription_server.py new file mode 100644 index 0000000..c53f9e0 --- /dev/null +++ b/src/transcription_server.py @@ -0,0 +1,518 @@ +#!/usr/bin/env python3 +""" +Standalone Transcription Service with gRPC and WebSocket support +Optimized for real-time streaming transcription +""" + +import os +import sys +import asyncio +import logging +import time +import json +import base64 +from typing import Optional, AsyncIterator, Dict, List +from dataclasses import dataclass, asdict +from concurrent import futures +import threading +from datetime import datetime + +# Add current directory to path for generated protobuf imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import grpc +import numpy as np +import soundfile +import librosa +import torch + +# Add SimulStreaming to path if available +simulstreaming_path = os.environ.get('SIMULSTREAMING_PATH', '/app/SimulStreaming') +if os.path.exists(simulstreaming_path): + sys.path.insert(0, simulstreaming_path) + USE_SIMULSTREAMING = True + try: + from simulstreaming_whisper import simulwhisper_args, simul_asr_factory + except ImportError: + USE_SIMULSTREAMING = False + import whisper +else: + USE_SIMULSTREAMING = False + import whisper + +# Import generated protobuf classes (will be generated later) +from 
transcription_pb2 import ( + AudioChunk, AudioFile, TranscriptionResult, TranscriptionResponse, + TranscriptionSegment, Capabilities, HealthStatus, Empty, AudioConfig +) +import transcription_pb2_grpc + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +SAMPLE_RATE = 16000 +MAX_AUDIO_LENGTH = 30 * 60 # 30 minutes + + +@dataclass +class TranscriptionSession: + """Manages a single transcription session""" + session_id: str + config: AudioConfig + audio_buffer: bytearray + start_time: float + last_activity: float + transcriptions: List[dict] + + +class TranscriptionEngine: + """Core transcription engine using Whisper or SimulStreaming""" + + def __init__(self, model_name: str = "large-v3"): + self.model_name = model_name + self.model = None + self.processor = None + self.online_processor = None + self.device = "cuda" if torch.cuda.is_available() else "cpu" + self.load_model() + + def load_model(self): + """Load the transcription model""" + if USE_SIMULSTREAMING: + self._load_simulstreaming() + else: + self._load_whisper() + + def _load_simulstreaming(self): + """Load SimulStreaming for real-time transcription""" + try: + import argparse + parser = argparse.ArgumentParser() + + # Add SimulStreaming arguments + simulwhisper_args(parser) + + args = parser.parse_args([ + '--model_path', self.model_name, + '--lan', 'auto', + '--task', 'transcribe', + '--backend', 'whisper', + '--min-chunk-size', '0.5', + '--beams', '1', + ]) + + # Create processor + self.processor, self.online_processor = simul_asr_factory(args) + logger.info(f"Loaded SimulStreaming with model: {self.model_name}") + + except Exception as e: + logger.error(f"Failed to load SimulStreaming: {e}") + logger.info("Falling back to standard Whisper") + USE_SIMULSTREAMING = False + self._load_whisper() + + def _load_whisper(self): + """Load standard Whisper model""" + try: + # Use the shared volume for model caching + download_root = os.environ.get('TORCH_HOME', '/app/models') + self.model = whisper.load_model(self.model_name, device=self.device, download_root=download_root) + logger.info(f"Loaded Whisper model: {self.model_name} on {self.device} from {download_root}") + except Exception as e: + logger.error(f"Failed to load Whisper model: {e}") + raise + + def is_speech(self, audio: np.ndarray, energy_threshold: float = 0.002, zero_crossing_threshold: int = 50) -> bool: + """ + Simple Voice Activity Detection + Returns True if the audio chunk likely contains speech + """ + # Check if audio is too quiet (likely silence) + energy = np.sqrt(np.mean(audio**2)) + if energy < energy_threshold: + return False + + # Check zero crossing rate (helps distinguish speech from noise) + zero_crossings = np.sum(np.abs(np.diff(np.sign(audio))) > 0) + + # Speech typically has moderate zero crossing rate + # Pure noise tends to have very high zero crossing rate + if zero_crossings > len(audio) * zero_crossing_threshold / SAMPLE_RATE: + return False + + return True + + def transcribe_chunk(self, audio_data: bytes, language: str = "auto", vad_enabled: bool = True) -> Optional[dict]: + """Transcribe a single audio chunk""" + try: + # Convert bytes to numpy array + audio = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0 + + # Check if audio contains speech (VAD) - only if enabled + if vad_enabled: + energy = np.sqrt(np.mean(audio**2)) + if not self.is_speech(audio): + logger.info(f"No speech detected in audio chunk 
(energy: {energy:.4f}), skipping transcription") + return None + else: + logger.info(f"Speech detected in chunk (energy: {energy:.4f})") + + if USE_SIMULSTREAMING and self.online_processor: + # Use SimulStreaming for real-time processing + self.online_processor.insert_audio_chunk(audio) + result = self.online_processor.process_iter() + + if result and result[0] is not None: + return { + 'text': result[2], + 'start_time': result[0], + 'end_time': result[1], + 'is_final': True, + 'confidence': 0.95 # SimulStreaming doesn't provide confidence + } + else: + # Use standard Whisper + if self.model: + # Pad audio to minimum length if needed + if len(audio) < SAMPLE_RATE: + audio = np.pad(audio, (0, SAMPLE_RATE - len(audio))) + + # Use more conservative settings to reduce hallucinations + result = self.model.transcribe( + audio, + language=None if language == "auto" else language, + fp16=self.device == "cuda", + temperature=0.0, # More deterministic, less hallucination + no_speech_threshold=0.6, # Higher threshold for detecting non-speech + logprob_threshold=-1.0, # Filter out low probability results + compression_ratio_threshold=2.4 # Filter out repetitive results + ) + + if result and result.get('text'): + text = result['text'].strip() + + # Filter out common hallucinations + hallucination_phrases = [ + "thank you", "thanks", "you", "uh", "um", + "thank you for watching", "please subscribe", + "bye", "bye-bye", ".", "...", "" + ] + + # Check if the result is just a hallucination + text_lower = text.lower().strip() + if text_lower in hallucination_phrases: + logger.debug(f"Filtered out hallucination: {text}") + return None + + # Check for repetitive text (another sign of hallucination) + words = text.lower().split() + if len(words) > 1 and len(set(words)) == 1: + logger.debug(f"Filtered out repetitive text: {text}") + return None + + return { + 'text': text, + 'start_time': 0, + 'end_time': len(audio) / SAMPLE_RATE, + 'is_final': True, + 'confidence': 0.9, + 'language': result.get('language', language) + } + + except Exception as e: + logger.error(f"Error transcribing chunk: {e}") + + return None + + def transcribe_file(self, audio_data: bytes, format: str, config: AudioConfig) -> dict: + """Transcribe a complete audio file""" + try: + # Convert audio to numpy array based on format + if format == "raw_pcm16": + audio = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0 + else: + # Use librosa for other formats + import io + audio, _ = librosa.load(io.BytesIO(audio_data), sr=SAMPLE_RATE) + + # Transcribe with Whisper + if self.model: + result = self.model.transcribe( + audio, + language=None if config.language == "auto" else config.language, + task=config.task or "transcribe", + fp16=self.device == "cuda" + ) + + segments = [] + for seg in result.get('segments', []): + segments.append({ + 'text': seg['text'].strip(), + 'start_time': seg['start'], + 'end_time': seg['end'], + 'confidence': seg.get('avg_logprob', 0) + 1.0 # Convert to 0-1 range + }) + + return { + 'segments': segments, + 'full_text': result['text'].strip(), + 'detected_language': result.get('language', config.language), + 'duration_seconds': len(audio) / SAMPLE_RATE + } + + except Exception as e: + logger.error(f"Error transcribing file: {e}") + + return { + 'segments': [], + 'full_text': '', + 'detected_language': 'unknown', + 'duration_seconds': 0 + } + + +class TranscriptionServicer(transcription_pb2_grpc.TranscriptionServiceServicer): + """gRPC service implementation""" + + def __init__(self): + self.engine = 
TranscriptionEngine() + self.sessions: Dict[str, TranscriptionSession] = {} + self.start_time = time.time() + + async def StreamTranscribe(self, request_iterator: AsyncIterator[AudioChunk], + context: grpc.aio.ServicerContext) -> AsyncIterator[TranscriptionResult]: + """Bidirectional streaming transcription""" + session_id = None + config = None + audio_buffer = bytearray() + + try: + async for chunk in request_iterator: + # Get session ID and config from first chunk + if not session_id: + session_id = chunk.session_id or str(time.time()) + config = chunk.config or AudioConfig( + language="auto", + task="transcribe" + ) + + # Add audio to buffer + audio_buffer.extend(chunk.audio_data) + + # Process when we have enough audio (3 seconds for better accuracy) + min_bytes = int(SAMPLE_RATE * 3.0 * 2) # 3 seconds of PCM16 + + while len(audio_buffer) >= min_bytes: + # Extract chunk to process + audio_chunk = bytes(audio_buffer[:min_bytes]) + audio_buffer = audio_buffer[min_bytes:] + + # Transcribe + logger.debug(f"Processing audio chunk of {len(audio_chunk)} bytes") + result = self.engine.transcribe_chunk( + audio_chunk, + language=config.language, + vad_enabled=config.vad_enabled if config else False + ) + logger.debug(f"Transcription result: {result}") + + if result: + # Send transcription result + yield TranscriptionResult( + text=result['text'], + start_time=result['start_time'], + end_time=result['end_time'], + is_final=result['is_final'], + confidence=result.get('confidence', 0.9), + language=result.get('language', config.language), + session_id=session_id, + timestamp_ms=int(time.time() * 1000) + ) + + # Process remaining audio + if audio_buffer: + result = self.engine.transcribe_chunk( + bytes(audio_buffer), + language=config.language, + vad_enabled=config.vad_enabled if config else False + ) + + if result: + yield TranscriptionResult( + text=result['text'], + start_time=result['start_time'], + end_time=result['end_time'], + is_final=True, + confidence=result.get('confidence', 0.9), + language=result.get('language', config.language), + session_id=session_id, + timestamp_ms=int(time.time() * 1000) + ) + + except Exception as e: + logger.error(f"Error in StreamTranscribe: {e}") + context.abort(grpc.StatusCode.INTERNAL, str(e)) + + async def TranscribeFile(self, request: AudioFile, context: grpc.aio.ServicerContext) -> TranscriptionResponse: + """Transcribe a complete audio file""" + try: + result = self.engine.transcribe_file( + request.audio_data, + request.format, + request.config + ) + + segments = [ + TranscriptionSegment( + text=seg['text'], + start_time=seg['start_time'], + end_time=seg['end_time'], + confidence=seg['confidence'] + ) + for seg in result['segments'] + ] + + return TranscriptionResponse( + segments=segments, + full_text=result['full_text'], + detected_language=result['detected_language'], + duration_seconds=result['duration_seconds'] + ) + + except Exception as e: + logger.error(f"Error in TranscribeFile: {e}") + context.abort(grpc.StatusCode.INTERNAL, str(e)) + + async def GetCapabilities(self, request: Empty, context: grpc.aio.ServicerContext) -> Capabilities: + """Get service capabilities""" + return Capabilities( + available_models=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"], + supported_languages=["auto", "en", "es", "fr", "de", "it", "pt", "ru", "zh", "ja", "ko"], + supported_formats=["wav", "mp3", "webm", "raw_pcm16"], + max_audio_length_seconds=MAX_AUDIO_LENGTH, + streaming_supported=True, + vad_supported=False # Can be implemented later 
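+            # NOTE: a lightweight energy / zero-crossing VAD already exists in
+            # TranscriptionEngine.is_speech and is applied per chunk when the
+            # request's config.vad_enabled is set; this flag stays False until
+            # VAD is advertised as a first-class capability.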
+ ) + + async def HealthCheck(self, request: Empty, context: grpc.aio.ServicerContext) -> HealthStatus: + """Health check endpoint""" + return HealthStatus( + healthy=True, + status="running", + model_loaded=self.engine.model_name, + uptime_seconds=int(time.time() - self.start_time), + active_sessions=len(self.sessions) + ) + + +async def serve_grpc(port: int = 50051): + """Start the gRPC server""" + server = grpc.aio.server( + futures.ThreadPoolExecutor(max_workers=10), + options=[ + ('grpc.max_send_message_length', 100 * 1024 * 1024), # 100MB + ('grpc.max_receive_message_length', 100 * 1024 * 1024), + ] + ) + + servicer = TranscriptionServicer() + transcription_pb2_grpc.add_TranscriptionServiceServicer_to_server(servicer, server) + + server.add_insecure_port(f'[::]:{port}') + await server.start() + + logger.info(f"gRPC server started on port {port}") + await server.wait_for_termination() + + +# WebSocket support for compatibility +async def handle_websocket(websocket, path): + """Handle WebSocket connections for compatibility""" + import websockets + + engine = TranscriptionEngine() + session_id = str(time.time()) + audio_buffer = bytearray() + + try: + # Send connection confirmation + await websocket.send(json.dumps({ + 'type': 'connected', + 'session_id': session_id + })) + + async for message in websocket: + data = json.loads(message) + + if data['type'] == 'audio': + # Decode base64 audio + audio_data = base64.b64decode(data['data']) + audio_buffer.extend(audio_data) + + # Process when we have enough audio + min_bytes = int(SAMPLE_RATE * 0.5 * 2) + + while len(audio_buffer) >= min_bytes: + chunk = bytes(audio_buffer[:min_bytes]) + audio_buffer = audio_buffer[min_bytes:] + + result = engine.transcribe_chunk(chunk) + + if result: + await websocket.send(json.dumps({ + 'type': 'transcription', + 'text': result['text'], + 'start_time': result['start_time'], + 'end_time': result['end_time'], + 'is_final': result['is_final'], + 'timestamp': int(time.time() * 1000) + })) + + elif data['type'] == 'stop': + # Process remaining audio + if audio_buffer: + result = engine.transcribe_chunk(bytes(audio_buffer)) + if result: + await websocket.send(json.dumps({ + 'type': 'transcription', + 'text': result['text'], + 'is_final': True, + 'timestamp': int(time.time() * 1000) + })) + break + + except websockets.exceptions.ConnectionClosed: + logger.info(f"WebSocket connection closed: {session_id}") + except Exception as e: + logger.error(f"WebSocket error: {e}") + + +async def serve_websocket(port: int = 8765): + """Start the WebSocket server""" + import websockets + + logger.info(f"WebSocket server started on port {port}") + async with websockets.serve(handle_websocket, "0.0.0.0", port): + await asyncio.Future() # Run forever + + +async def main(): + """Main entry point""" + grpc_port = int(os.environ.get('GRPC_PORT', '50051')) + ws_port = int(os.environ.get('WEBSOCKET_PORT', '8765')) + enable_websocket = os.environ.get('ENABLE_WEBSOCKET', 'true').lower() == 'true' + + tasks = [serve_grpc(grpc_port)] + + if enable_websocket: + tasks.append(serve_websocket(ws_port)) + + await asyncio.gather(*tasks) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file
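
For reference, a minimal streaming-client sketch against the gRPC API defined above. It is illustrative only, not part of the commit: it assumes the service is reachable on localhost:50051, that the generated transcription_pb2 / transcription_pb2_grpc modules are importable, and that the input is raw 16 kHz mono PCM16. The message field names (audio_data, session_id, config, language, task) mirror how the servicer reads them; the file path and "demo" session id are placeholders.

# Illustrative sketch: stream raw PCM16 audio to StreamTranscribe and print
# results as they arrive. Assumes localhost:50051 and the generated stubs on sys.path.
import grpc

import transcription_pb2
import transcription_pb2_grpc


def stream_pcm16(path: str, target: str = "localhost:50051") -> None:
    channel = grpc.insecure_channel(target)
    stub = transcription_pb2_grpc.TranscriptionServiceStub(channel)

    def chunks():
        # Field names follow the servicer's usage of AudioChunk / AudioConfig.
        config = transcription_pb2.AudioConfig(language="auto", task="transcribe")
        with open(path, "rb") as f:
            while True:
                data = f.read(32000)  # ~1 s of 16 kHz mono PCM16
                if not data:
                    break
                yield transcription_pb2.AudioChunk(
                    audio_data=data, session_id="demo", config=config)

    for result in stub.StreamTranscribe(chunks()):
        print(f"[{result.start_time:.1f}-{result.end_time:.1f}s] {result.text}")


if __name__ == "__main__":
    stream_pcm16("sample.pcm")  # placeholder path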
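
One-shot transcription via TranscribeFile follows the same pattern; a short sketch under the same assumptions (sample.wav is a placeholder path):

import grpc

import transcription_pb2
import transcription_pb2_grpc


def transcribe_wav(path: str, target: str = "localhost:50051") -> None:
    channel = grpc.insecure_channel(target)
    stub = transcription_pb2_grpc.TranscriptionServiceStub(channel)

    with open(path, "rb") as f:
        request = transcription_pb2.AudioFile(
            audio_data=f.read(),
            format="wav",
            config=transcription_pb2.AudioConfig(language="auto", task="transcribe"),
        )

    response = stub.TranscribeFile(request)
    print(f"[{response.detected_language}] {response.full_text}")
    for seg in response.segments:
        print(f"  {seg.start_time:.2f}-{seg.end_time:.2f}s  {seg.text}")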
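
The WebSocket path speaks the JSON protocol implemented in handle_websocket: the server first sends a 'connected' event, the client sends base64-encoded 16 kHz mono PCM16 in 'audio' messages, and a final 'stop' message flushes the remaining buffer before the server closes the connection. A minimal client sketch, assuming the server is on localhost:8765 and sample.pcm is a placeholder raw-PCM file:

import asyncio
import base64
import json

import websockets  # same library the server uses (websockets==12.0 in requirements.txt)


async def ws_stream_pcm16(path: str, uri: str = "ws://localhost:8765") -> None:
    with open(path, "rb") as f:
        pcm = f.read()  # raw 16 kHz mono PCM16, no container/header

    async with websockets.connect(uri) as ws:
        print(json.loads(await ws.recv()))  # {'type': 'connected', 'session_id': ...}

        # The handler buffers 0.5 s before transcribing, so send 0.5 s chunks
        # (16000 samples/s * 2 bytes * 0.5 s = 16000 bytes).
        for i in range(0, len(pcm), 16000):
            await ws.send(json.dumps({
                "type": "audio",
                "data": base64.b64encode(pcm[i:i + 16000]).decode("ascii"),
            }))

        await ws.send(json.dumps({"type": "stop"}))

        # Read transcription events until the server closes the connection.
        async for message in ws:
            event = json.loads(message)
            if event.get("type") == "transcription":
                print(event["text"])


if __name__ == "__main__":
    asyncio.run(ws_stream_pcm16("sample.pcm"))  # placeholder path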