initial commit

2025-09-11 09:59:16 +02:00
commit ab17a8ac21
19 changed files with 2587 additions and 0 deletions

7
.gitignore vendored Normal file

@@ -0,0 +1,7 @@
*/*/target/*
*/target/*
.history
*.log
*.tmp
*.bak
*.swp

81
Dockerfile Normal file

@@ -0,0 +1,81 @@
# Use the NVIDIA CUDA runtime base image (CUDA and cuDNN pre-installed)
# PyTorch is installed separately below from the CUDA 12.1 wheel index
FROM nvidia/cuda:12.1.0-cudnn8-runtime-ubuntu22.04 as base
# Set timezone to avoid interactive prompts
ENV DEBIAN_FRONTEND=noninteractive
ENV TZ=UTC
# Install Python 3.11 and system dependencies
RUN apt-get update && apt-get install -y \
python3.11 \
python3.11-dev \
python3-pip \
ffmpeg \
libsndfile1 \
git \
tzdata \
&& rm -rf /var/lib/apt/lists/* \
&& update-alternatives --install /usr/bin/python python /usr/bin/python3.11 1 \
&& update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1
# Set Python environment
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
# Set working directory
WORKDIR /app
# Install PyTorch with CUDA support first (the CUDA base image does not ship PyTorch)
# Installing it as its own cacheable layer is much faster than pulling it via requirements.txt
RUN pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# Copy requirements and install remaining dependencies
COPY requirements.txt .
# Remove torch from requirements since we already installed it
RUN grep -v "^torch" requirements.txt > requirements_no_torch.txt && \
pip install -r requirements_no_torch.txt
# Install grpcio-tools for protobuf generation
RUN pip install grpcio-tools==1.60.0
# Copy proto files and generate gRPC code
COPY proto/ ./proto/
RUN mkdir -p ./src && \
python -m grpc_tools.protoc \
-I./proto \
--python_out=./src \
--grpc_python_out=./src \
./proto/transcription.proto
# Note: Don't modify imports - they work as-is when sys.path is set correctly
# Copy application code
COPY src/ ./src/
COPY entrypoint.sh ./
RUN chmod +x ./entrypoint.sh
# Environment variables
ENV MODEL_PATH=large-v3 \
GRPC_PORT=50051 \
WEBSOCKET_PORT=8765 \
ENABLE_WEBSOCKET=true \
CACHE_DIR=/app/models \
TORCH_HOME=/app/models \
HF_HOME=/app/models
# Create model cache directory
RUN mkdir -p /app/models
# Volume for model cache
VOLUME ["/app/models"]
# Expose ports
EXPOSE 50051 8765
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD python -c "import grpc; grpc.channel_ready_future(grpc.insecure_channel('localhost:50051')).result(timeout=10)" || exit 1
ENTRYPOINT ["/app/entrypoint.sh"]

69
Dockerfile.pytorch Normal file

@@ -0,0 +1,69 @@
# Alternative: Use official PyTorch image with CUDA support
# This image has PyTorch, CUDA, and cuDNN pre-installed
FROM pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime
# Set timezone to avoid interactive prompts
ENV DEBIAN_FRONTEND=noninteractive
ENV TZ=UTC
# Install system dependencies
RUN apt-get update && apt-get install -y \
ffmpeg \
libsndfile1 \
git \
tzdata \
&& rm -rf /var/lib/apt/lists/*
# Set Python environment
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
# Set working directory
WORKDIR /app
# Copy requirements and install dependencies (excluding torch)
COPY requirements.txt .
RUN grep -v "^torch" requirements.txt > requirements_no_torch.txt && \
pip install -r requirements_no_torch.txt && \
pip install grpcio-tools==1.60.0
# Copy proto files and generate gRPC code
COPY proto/ ./proto/
RUN mkdir -p ./src && \
python -m grpc_tools.protoc \
-I./proto \
--python_out=./src \
--grpc_python_out=./src \
./proto/transcription.proto
# Note: Don't modify imports - they work as-is when sys.path is set correctly
# Copy application code
COPY src/ ./src/
COPY entrypoint.sh ./
RUN chmod +x ./entrypoint.sh
# Environment variables
ENV MODEL_PATH=large-v3 \
GRPC_PORT=50051 \
WEBSOCKET_PORT=8765 \
ENABLE_WEBSOCKET=true \
CACHE_DIR=/app/models \
TORCH_HOME=/app/models \
HF_HOME=/app/models
# Create model cache directory
RUN mkdir -p /app/models
# Volume for model cache
VOLUME ["/app/models"]
# Expose ports
EXPOSE 50051 8765
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD python -c "import grpc; grpc.channel_ready_future(grpc.insecure_channel('localhost:50051')).result(timeout=10)" || exit 1
ENTRYPOINT ["/app/entrypoint.sh"]

102
Makefile Normal file

@@ -0,0 +1,102 @@
# Makefile for Transcription API Service
.PHONY: help build run stop clean test proto rust-client
# Variables
DOCKER_IMAGE = transcription-api
DOCKER_TAG = latest
GRPC_PORT = 50051
WEBSOCKET_PORT = 8765
MODEL = base
help: ## Show this help message
@echo "Usage: make [target]"
@echo ""
@echo "Available targets:"
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf " %-15s %s\n", $$1, $$2}'
build: ## Build Docker image
docker build -t $(DOCKER_IMAGE):$(DOCKER_TAG) .
run: ## Run service with docker-compose
MODEL_PATH=$(MODEL) docker compose up -d
@echo "Service started!"
@echo "gRPC endpoint: localhost:$(GRPC_PORT)"
@echo "WebSocket endpoint: ws://localhost:$(WEBSOCKET_PORT)"
run-gpu: ## Run service with GPU support
MODEL_PATH=large-v3 CUDA_VISIBLE_DEVICES=0 docker compose up -d
@echo "Service started with GPU support!"
stop: ## Stop the service
docker compose down
logs: ## Show service logs
docker compose logs -f
clean: ## Clean up containers and volumes
docker compose down -v
docker system prune -f
proto: ## Generate protobuf code
python -m grpc_tools.protoc \
-I./proto \
--python_out=./src \
--grpc_python_out=./src \
./proto/transcription.proto
@echo "Generated Python protobuf code in src/"
rust-client: ## Build Rust client examples
cd examples/rust-client && cargo build --release
@echo "Rust clients built in examples/rust-client/target/release/"
test-grpc: ## Test gRPC connection
@command -v grpcurl >/dev/null 2>&1 || { echo "grpcurl not installed. Install from https://github.com/fullstorydev/grpcurl"; exit 1; }
grpcurl -plaintext localhost:$(GRPC_PORT) list
grpcurl -plaintext localhost:$(GRPC_PORT) transcription.TranscriptionService/HealthCheck
test-websocket: ## Test WebSocket connection
@echo "Testing WebSocket connection..."
@python3 -c "import asyncio, websockets, json; \
async def test(): \
async with websockets.connect('ws://localhost:$(WEBSOCKET_PORT)') as ws: \
data = await ws.recv(); \
print('Connected:', json.loads(data)); \
asyncio.run(test())"
install-deps: ## Install Python dependencies
pip install -r requirements.txt
docker-push: ## Push Docker image to registry
docker tag $(DOCKER_IMAGE):$(DOCKER_TAG) $(DOCKER_REGISTRY)/$(DOCKER_IMAGE):$(DOCKER_TAG)
docker push $(DOCKER_REGISTRY)/$(DOCKER_IMAGE):$(DOCKER_TAG)
# Model management
download-models: ## Download Whisper models
@echo "Downloading Whisper models..."
python -c "import whisper; \
for model in ['tiny', 'base', 'small']: \
print(f'Downloading {model}...'); \
whisper.load_model(model)"
# Development
dev-run: ## Run service locally (without Docker)
cd src && python transcription_server.py
dev-install: ## Install development dependencies
pip install -r requirements.txt
pip install black flake8 pytest pytest-asyncio
format: ## Format Python code
black src/
lint: ## Lint Python code
flake8 src/
# Benchmarking
benchmark: ## Run performance benchmark
@echo "Running transcription benchmark..."
time curl -X POST \
-H "Content-Type: application/octet-stream" \
--data-binary @test_audio.wav \
http://localhost:$(GRPC_PORT)/benchmark

412
README.md Normal file

@@ -0,0 +1,412 @@
# Transcription API Service
A high-performance, standalone transcription service with gRPC and WebSocket support, optimized for real-time speech-to-text applications. Perfect for desktop applications, web services, and IoT devices.
## Features
- **Dual Protocol Support**: Both gRPC (recommended) and WebSocket
- **Real-Time Streaming**: Bidirectional audio streaming with immediate transcription
- **Multiple Models**: Support for all Whisper models (tiny to large-v3)
- **Language Support**: 50+ languages with automatic detection
- **Docker Ready**: Simple deployment with Docker Compose
- **Production Ready**: Health checks, monitoring, and graceful shutdown
- **Rust Client Examples**: Ready-to-use Rust client for desktop applications
## Quick Start
### Using Docker Compose (Recommended)
```bash
# Clone the repository
cd transcription-api
# Start the service (uses 'base' model by default)
docker compose up -d
# Check logs
docker compose logs -f
# Stop the service
docker compose down
```
### Configuration
Edit `.env` or `docker-compose.yml` to configure:
```env
MODEL_PATH=base # tiny, base, small, medium, large, large-v3
GRPC_PORT=50051 # gRPC service port
WEBSOCKET_PORT=8765 # WebSocket service port
ENABLE_WEBSOCKET=true # Enable WebSocket support
CUDA_VISIBLE_DEVICES=0 # GPU device ID (if available)
```
## API Protocols
### gRPC (Recommended for Desktop Apps)
**Why gRPC?**
- Strongly typed with Protocol Buffers
- Excellent performance with HTTP/2
- Built-in streaming support
- Auto-generated client code
- Better error handling
**Proto Definition**: See `proto/transcription.proto`
**Service Methods**:
- `StreamTranscribe`: Bidirectional streaming for real-time transcription
- `TranscribeFile`: Single file transcription
- `GetCapabilities`: Query available models and languages
- `HealthCheck`: Service health status
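For a quick smoke test from Python, here is a minimal sketch that assumes the generated `transcription_pb2`/`transcription_pb2_grpc` modules (see `generate_proto.py`) are importable; the stub name follows the standard grpcio naming for `TranscriptionService`:
```python
import grpc
import transcription_pb2
import transcription_pb2_grpc

# Insecure channel is fine for local testing; see the Security section for TLS.
channel = grpc.insecure_channel("localhost:50051")
stub = transcription_pb2_grpc.TranscriptionServiceStub(channel)

health = stub.HealthCheck(transcription_pb2.Empty())
print("healthy:", health.healthy, "model:", health.model_loaded)

caps = stub.GetCapabilities(transcription_pb2.Empty())
print("models:", list(caps.available_models))
```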
### WebSocket (Alternative)
**Protocol**:
```javascript
// Connect
ws://localhost:8765
// Send audio
{
"type": "audio",
"data": "base64_encoded_pcm16_audio"
}
// Receive transcription
{
"type": "transcription",
"text": "Hello world",
"start_time": 0.0,
"end_time": 1.5,
"is_final": true,
"timestamp": 1234567890
}
// Stop
{
"type": "stop"
}
```
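A minimal Python client sketch for this protocol, assuming `audio.pcm` already holds raw 16 kHz mono PCM16 samples (the `websockets` package is listed in `requirements.txt`):
```python
import asyncio, base64, json
import websockets

async def main():
    with open("audio.pcm", "rb") as f:  # assumed raw PCM16, 16 kHz, mono
        pcm = f.read()
    async with websockets.connect("ws://localhost:8765") as ws:
        # Send ~0.5 s per message (8000 samples * 2 bytes = 16000 bytes).
        for i in range(0, len(pcm), 16000):
            data = base64.b64encode(pcm[i:i + 16000]).decode()
            await ws.send(json.dumps({"type": "audio", "data": data}))
        await ws.send(json.dumps({"type": "stop"}))
        async for message in ws:
            msg = json.loads(message)
            if msg.get("type") == "transcription":
                print(msg["text"])

asyncio.run(main())
```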
## Rust Client Usage
### Installation
```toml
# Add to your Cargo.toml
[dependencies]
tonic = "0.10"
tokio = { version = "1.35", features = ["full"] }
# ... see examples/rust-client/Cargo.toml for full list
```
### Live Microphone Transcription
```rust
use transcription_client::TranscriptionClient;
#[tokio::main]
async fn main() -> Result<()> {
// Connect to service
let mut client = TranscriptionClient::connect("http://localhost:50051").await?;
// Start streaming from microphone
let stream = client.stream_from_microphone(
"auto", // language
"transcribe", // task
"base" // model
).await?;
// Process transcriptions
while let Some(transcription) = stream.next().await {
println!("{}", transcription.text);
}
Ok(())
}
```
### Build and Run Examples
```bash
cd examples/rust-client
# Build
cargo build --release
# Run live transcription from microphone
cargo run --bin live-transcribe
# Transcribe a file
cargo run --bin file-transcribe -- audio.wav
# Stream a WAV file
cargo run --bin stream-transcribe -- audio.wav --realtime
```
## Audio Requirements
- **Format**: PCM16 (16-bit signed integer)
- **Sample Rate**: 16kHz
- **Channels**: Mono
- **Chunk Size**: Minimum ~500 bytes (flexible for real-time)
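If your source audio is in a different format, here is a conversion sketch using `librosa` and `numpy` (both already in `requirements.txt`); the input file name is a placeholder:
```python
import librosa
import numpy as np

# Resample/downmix to 16 kHz mono float32, then encode as little-endian PCM16.
audio, _ = librosa.load("input.mp3", sr=16000, mono=True)
pcm16 = (audio * 32767).astype(np.int16).tobytes()

# Split into ~0.5 s chunks for streaming (8000 samples * 2 bytes = 16000 bytes).
chunks = [pcm16[i:i + 16000] for i in range(0, len(pcm16), 16000)]
print(f"{len(chunks)} chunks, {len(pcm16) / 32000:.1f} s of audio")
```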
## Performance Optimization
### For Real-Time Applications
1. **Use gRPC**: Lower latency than WebSocket
2. **Small Chunks**: Send audio in 0.5-1 second chunks
3. **Model Selection**:
- `tiny`: Fastest, lowest accuracy (real-time on CPU)
- `base`: Good balance (near real-time on CPU)
- `small`: Better accuracy (may lag on CPU)
- `large-v3`: Best accuracy (requires GPU for real-time)
### GPU Acceleration
```yaml
# docker-compose.yml
environment:
- CUDA_VISIBLE_DEVICES=0
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
```
## Architecture
```
┌─────────────┐
│  Rust App   │
│  (Desktop)  │
└──────┬──────┘
       │ gRPC/HTTP2
       ▼
┌─────────────┐
│Transcription│
│   Service   │
│  ┌────────┐ │
│  │Whisper │ │
│  │ Model  │ │
│  └────────┘ │
└─────────────┘
```
### Components
1. **gRPC Server**: Handles streaming audio and returns transcriptions
2. **WebSocket Server**: Alternative protocol for web clients
3. **Transcription Engine**: Whisper/SimulStreaming for speech-to-text
4. **Session Manager**: Handles multiple concurrent streams
5. **Model Cache**: Prevents re-downloading models
## Advanced Configuration
### Using SimulStreaming
For even lower latency, mount SimulStreaming:
```yaml
volumes:
- ./SimulStreaming:/app/SimulStreaming
environment:
- SIMULSTREAMING_PATH=/app/SimulStreaming
```
### Custom Models
Mount your own Whisper models:
```yaml
volumes:
- ./models:/app/models
environment:
- MODEL_PATH=/app/models/custom-model.pt
```
### Monitoring
The service exposes metrics on `/metrics` (when enabled):
```bash
curl http://localhost:9090/metrics
```
## API Reference
### gRPC Methods
#### StreamTranscribe
```protobuf
rpc StreamTranscribe(stream AudioChunk) returns (stream TranscriptionResult);
```
Bidirectional streaming for real-time transcription. Send audio chunks, receive transcriptions.
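A minimal Python sketch of the call shape: the first `AudioChunk` carries only the config, later chunks carry raw PCM16 (the `audio.pcm` file is a placeholder for 16 kHz mono PCM16 data):
```python
import grpc
import transcription_pb2
import transcription_pb2_grpc

with open("audio.pcm", "rb") as f:  # assumed raw PCM16, 16 kHz, mono
    data = f.read()
pcm_chunks = [data[i:i + 96000] for i in range(0, len(data), 96000)]  # ~3 s per chunk

def requests():
    # Config-only first chunk, audio afterwards.
    yield transcription_pb2.AudioChunk(
        session_id="readme-sketch",
        config=transcription_pb2.AudioConfig(
            language="auto", task="transcribe", model="base",
            sample_rate=16000, vad_enabled=True,
        ),
    )
    for chunk in pcm_chunks:
        yield transcription_pb2.AudioChunk(audio_data=chunk)

stub = transcription_pb2_grpc.TranscriptionServiceStub(
    grpc.insecure_channel("localhost:50051"))
for result in stub.StreamTranscribe(requests()):
    print(f"[{result.start_time:.2f}s - {result.end_time:.2f}s] {result.text}")
```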
#### TranscribeFile
```protobuf
rpc TranscribeFile(AudioFile) returns (TranscriptionResponse);
```
Transcribe a complete audio file in one request.
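The same call from Python, as a sketch (the file name is a placeholder):
```python
import grpc
import transcription_pb2
import transcription_pb2_grpc

with open("audio.wav", "rb") as f:
    audio_bytes = f.read()

stub = transcription_pb2_grpc.TranscriptionServiceStub(
    grpc.insecure_channel("localhost:50051"))
resp = stub.TranscribeFile(transcription_pb2.AudioFile(
    audio_data=audio_bytes,
    format="wav",
    config=transcription_pb2.AudioConfig(language="auto", task="transcribe", model="base"),
))
print(f"{resp.detected_language}, {resp.duration_seconds:.1f}s")
print(resp.full_text)
```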
#### GetCapabilities
```protobuf
rpc GetCapabilities(Empty) returns (Capabilities);
```
Query available models, languages, and features.
#### HealthCheck
```protobuf
rpc HealthCheck(Empty) returns (HealthStatus);
```
Check service health and status.
## Language Support
Supports 50+ languages including:
- English (en)
- Spanish (es)
- French (fr)
- German (de)
- Italian (it)
- Portuguese (pt)
- Russian (ru)
- Chinese (zh)
- Japanese (ja)
- Korean (ko)
- And many more...
Use `"auto"` for automatic language detection.
## Troubleshooting
### Service won't start
- Check if ports 50051 and 8765 are available
- Ensure Docker has enough memory (minimum 4GB)
- Check logs: `docker compose logs transcription-api`
### Slow transcription
- Use a smaller model (tiny or base)
- Enable GPU if available
- Reduce audio quality to 16kHz mono
- Send smaller chunks more frequently
### Connection refused
- Check firewall settings
- Ensure service is running: `docker compose ps`
- Verify correct ports in client configuration
### High memory usage
- Models are cached in memory for performance
- Use smaller models for limited memory systems
- Set memory limits in docker-compose.yml
## Development
### Building from Source
```bash
# Install dependencies
pip install -r requirements.txt
# Generate gRPC code
python -m grpc_tools.protoc \
-I./proto \
--python_out=./src \
--grpc_python_out=./src \
./proto/transcription.proto
# Run the service
python src/transcription_server.py
```
### Running Tests
```bash
# Test gRPC connection
grpcurl -plaintext localhost:50051 list
# Test health check
grpcurl -plaintext localhost:50051 transcription.TranscriptionService/HealthCheck
# Test with example audio
python test_client.py
```
## Production Deployment
### Docker Swarm
```bash
docker stack deploy -c docker-compose.yml transcription
```
### Kubernetes
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: transcription-api
spec:
replicas: 3
selector:
matchLabels:
app: transcription-api
template:
metadata:
labels:
app: transcription-api
spec:
containers:
- name: transcription-api
image: transcription-api:latest
ports:
- containerPort: 50051
name: grpc
- containerPort: 8765
name: websocket
env:
- name: MODEL_PATH
value: "base"
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
```
### Security
For production:
1. Enable TLS for gRPC (see the sketch after this list)
2. Use WSS for WebSocket
3. Add authentication
4. Rate limiting
5. Input validation
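As a starting point for item 1, a minimal sketch of serving gRPC over TLS; the certificate paths are placeholders and the server would call `add_secure_port` instead of an insecure port:
```python
import grpc

# Placeholder paths - provision real certificates for production.
with open("certs/server.key", "rb") as f:
    private_key = f.read()
with open("certs/server.crt", "rb") as f:
    certificate_chain = f.read()

creds = grpc.ssl_server_credentials([(private_key, certificate_chain)])
server = grpc.aio.server()
server.add_secure_port("[::]:50051", creds)
# ...then register TranscriptionServiceServicer and start the server as usual.
```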
## License
MIT License - See LICENSE file for details
## Contributing
Contributions welcome! Please read CONTRIBUTING.md for guidelines.
## Support
- GitHub Issues: [Report bugs or request features]
- Documentation: [Full API documentation]
- Examples: See `examples/` directory

78
build.sh Executable file

@@ -0,0 +1,78 @@
#!/bin/bash
# Build script with options for different configurations
set -e
# Default values
DOCKERFILE="Dockerfile"
USE_CACHE=true
PLATFORM="linux/amd64"
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
--pytorch)
DOCKERFILE="Dockerfile.pytorch"
echo "Using PyTorch base image (faster build)"
shift
;;
--cuda)
DOCKERFILE="Dockerfile"
echo "Using NVIDIA CUDA base image"
shift
;;
--no-cache)
USE_CACHE=false
echo "Building without cache"
shift
;;
--platform)
PLATFORM="$2"
echo "Building for platform: $PLATFORM"
shift 2
;;
--help)
echo "Usage: ./build.sh [options]"
echo "Options:"
echo " --pytorch Use PyTorch base image (fastest)"
echo " --cuda Use NVIDIA CUDA base image (default)"
echo " --no-cache Build without using cache"
echo " --platform Target platform (default: linux/amd64)"
echo " --help Show this help message"
exit 0
;;
*)
echo "Unknown option: $1"
exit 1
;;
esac
done
# Build command
BUILD_CMD="docker build"
if [ "$USE_CACHE" = false ]; then
BUILD_CMD="$BUILD_CMD --no-cache"
fi
BUILD_CMD="$BUILD_CMD --platform $PLATFORM -f $DOCKERFILE -t transcription-api:latest ."
echo "Building transcription-api..."
echo "Command: $BUILD_CMD"
# Execute build
eval $BUILD_CMD
if [ $? -eq 0 ]; then
echo ""
echo "Build successful!"
echo ""
echo "To run the service:"
echo " docker compose up -d"
echo ""
echo "Or with GPU support:"
echo " docker compose --profile gpu up -d"
else
echo "Build failed!"
exit 1
fi

105
docker-compose.yml Normal file

@@ -0,0 +1,105 @@
services:
# Main service with GPU support (if available)
# For CPU-only: Use --profile cpu or set CUDA_VISIBLE_DEVICES=""
transcription-api:
build:
context: .
dockerfile: ${DOCKERFILE:-Dockerfile} # Can use Dockerfile.pytorch for faster builds
container_name: transcription-api
environment:
# Model configuration
- MODEL_PATH=${MODEL_PATH:-large-v3} # Options: tiny, base, small, medium, large, large-v2, large-v3
# Model cache paths (shared with main project)
- HF_HOME=/app/models
- TORCH_HOME=/app/models
- TRANSFORMERS_CACHE=/app/models
# Server ports
- GRPC_PORT=50051
- WEBSOCKET_PORT=8765
- ENABLE_WEBSOCKET=true
# Performance tuning
- OMP_NUM_THREADS=4
- CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-0} # GPU 0 by default
ports:
- "50051:50051" # gRPC port
- "8765:8765" # WebSocket port
volumes:
# Model cache - prevents re-downloading models
- whisper-models:/app/models
# Optional: Mount SimulStreaming if available
# - ./SimulStreaming:/app/SimulStreaming
restart: unless-stopped
# Resource limits (adjust based on your system)
deploy:
resources:
limits:
cpus: '4'
memory: 8G
reservations:
cpus: '2'
memory: 4G
# GPU support (requires nvidia-docker2 or Docker 19.03+)
devices:
- driver: nvidia
count: all
capabilities: [gpu]
# Health check
healthcheck:
test: ["CMD", "python", "-c", "import grpc; channel = grpc.insecure_channel('localhost:50051'); channel.channel_ready()"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
# CPU-only service (for systems without GPU)
transcription-api-cpu:
profiles: ["cpu"] # Only start with --profile cpu
build:
context: .
dockerfile: ${DOCKERFILE:-Dockerfile.pytorch}
container_name: transcription-api-cpu
environment:
- MODEL_PATH=${MODEL_PATH:-base} # Smaller model for CPU
- HF_HOME=/app/models
- TORCH_HOME=/app/models
- TRANSFORMERS_CACHE=/app/models
- GRPC_PORT=50051
- WEBSOCKET_PORT=8765
- ENABLE_WEBSOCKET=true
- CUDA_VISIBLE_DEVICES= # No GPU
ports:
- "50051:50051"
- "8765:8765"
volumes:
- whisper-models:/app/models
deploy:
resources:
limits:
cpus: '4'
memory: 8G
reservations:
cpus: '2'
memory: 4G
# No GPU devices for CPU profile
healthcheck:
test: ["CMD", "python", "-c", "import grpc; channel = grpc.insecure_channel('localhost:50051'); channel.channel_ready()"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
volumes:
# Share models with the main transcription project
# This references the volume from the parent project
whisper-models:
external: true
name: real-time-transcriptions_whisper-models

59
entrypoint.sh Normal file

@@ -0,0 +1,59 @@
#!/bin/bash
set -e
# Download model if not already cached
# Whisper stores models as .pt files in the root of the cache directory
if [ ! -z "$MODEL_PATH" ]; then
MODEL_FILE="/app/models/$MODEL_PATH.pt"
# Check if model file exists
if [ ! -f "$MODEL_FILE" ]; then
echo "Model $MODEL_PATH not found at $MODEL_FILE, downloading..."
python -c "
import whisper
import os
# Set all cache paths to use the shared volume
os.environ['TORCH_HOME'] = '/app/models'
os.environ['HF_HOME'] = '/app/models'
os.environ['TRANSFORMERS_CACHE'] = '/app/models'
os.environ['XDG_CACHE_HOME'] = '/app/models'
model_name = '$MODEL_PATH'
print(f'Downloading model {model_name}...')
model = whisper.load_model(model_name, download_root='/app/models')
print(f'Model {model_name} downloaded and cached successfully')
"
else
echo "Model $MODEL_PATH already cached at $MODEL_FILE"
# Just verify it loads properly
python -c "
import whisper
import os
os.environ['TORCH_HOME'] = '/app/models'
os.environ['XDG_CACHE_HOME'] = '/app/models'
model = whisper.load_model('$MODEL_PATH', download_root='/app/models')
print(f'Model $MODEL_PATH loaded successfully from cache')
"
fi
fi
# Generate gRPC code if not already generated
if [ ! -f "/app/src/transcription_pb2.py" ]; then
echo "Generating gRPC code from proto files..."
python -m grpc_tools.protoc \
-I/app/proto \
--python_out=/app/src \
--grpc_python_out=/app/src \
/app/proto/transcription.proto
# Fix imports in generated files (keep absolute import)
# No need to modify - the generated import should work as-is
fi
# Start the transcription server
echo "Starting Transcription API Server..."
echo "gRPC Port: $GRPC_PORT"
echo "WebSocket Port: $WEBSOCKET_PORT (Enabled: $ENABLE_WEBSOCKET)"
echo "Model: $MODEL_PATH"
cd /app/src
exec python transcription_server.py

42
examples/rust-client/Cargo.toml Normal file

@@ -0,0 +1,42 @@
[package]
name = "transcription-client"
version = "0.1.0"
edition = "2021"
[dependencies]
# gRPC and Protocol Buffers
tonic = "0.10"
prost = "0.12"
tokio = { version = "1.35", features = ["full"] }
tokio-stream = "0.1"
# Audio handling
hound = "3.5" # WAV file reading/writing
cpal = "0.15" # Cross-platform audio capture
rodio = "0.17" # Audio playback
# Utilities
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
clap = { version = "4.4", features = ["derive"] }
futures-util = "0.3"
[build-dependencies]
tonic-build = "0.10"
[[bin]]
name = "stream-transcribe"
path = "src/stream_transcribe.rs"
[[bin]]
name = "file-transcribe"
path = "src/file_transcribe.rs"
[[bin]]
name = "live-transcribe"
path = "src/live_transcribe.rs"
[[bin]]
name = "realtime-playback"
path = "src/realtime_playback.rs"

126
examples/rust-client/README.md Normal file

@@ -0,0 +1,126 @@
# Rust Transcription Client Examples
This directory contains Rust client examples for the Transcription API service.
## Available Clients
### 1. `file-transcribe` - File Transcription
Transcribe audio files either by sending the entire file or streaming in real-time chunks.
```bash
# Send entire file at once (fast, but no real-time feedback)
cargo run --bin file-transcribe -- audio.wav
# Stream file in chunks for real-time transcription (like YouTube)
cargo run --bin file-transcribe -- audio.wav --stream
# With VAD (Voice Activity Detection) to filter silence
cargo run --bin file-transcribe -- audio.wav --stream --vad
# Specify model and language
cargo run --bin file-transcribe -- audio.wav --stream --model large-v3 --language en
```
### 2. `realtime-playback` - Play Audio with Live Transcription
Plays audio through your speakers while showing real-time transcriptions, similar to YouTube's live captions.
```bash
# Basic usage - plays audio and shows transcriptions
cargo run --bin realtime-playback -- audio.wav
# With timestamps for each transcription
cargo run --bin realtime-playback -- audio.wav --timestamps
# With VAD to reduce noise transcriptions
cargo run --bin realtime-playback -- audio.wav --vad
# Using a specific model
cargo run --bin realtime-playback -- audio.wav --model large-v3
```
### 3. `stream-transcribe` - Stream WAV Files
Streams WAV files chunk by chunk for transcription.
```bash
# Stream without delays (fast processing)
cargo run --bin stream-transcribe -- audio.wav
# Simulate real-time streaming with delays
cargo run --bin stream-transcribe -- audio.wav --realtime
```
### 4. `live-transcribe` - Live Microphone Transcription
Captures audio from your microphone and transcribes in real-time.
```bash
# Use default microphone
cargo run --bin live-transcribe
# Specify server and language
cargo run --bin live-transcribe -- --server http://localhost:50051 --language en
```
## Building
```bash
# Build all binaries
cargo build --release
# Build specific binary
cargo build --release --bin realtime-playback
```
## Common Options
All clients support these common options:
- `--server <URL>` - gRPC server address (default: http://localhost:50051)
- `--language <code>` - Language code: en, es, fr, de, etc., or "auto" (default: auto)
- `--model <name>` - Model to use: tiny, base, small, medium, large-v3 (default: base)
- `--vad` - Enable Voice Activity Detection to filter silence
## Features
### Real-time Streaming
The `--stream` flag in `file-transcribe` and the `realtime-playback` binary both support real-time streaming, which means:
- Audio is sent in small chunks (0.5 second intervals)
- Transcriptions appear as the audio is being processed
- A similar experience to YouTube's live captions
- Lower latency than sending the entire file at once
### Voice Activity Detection (VAD)
When `--vad` is enabled, the service will:
- Filter out silence and background noise
- Reduce false transcriptions (like repeated "Thank you")
- Improve transcription quality for speech-only content
### Audio Playback
The `realtime-playback` binary uses the `rodio` library to:
- Play audio through your system's default audio output
- Synchronize playback with transcription display
- Support multiple audio formats (WAV, MP3, FLAC, etc.)
## Requirements
- Rust 1.70 or later
- The Transcription API server running (usually on localhost:50051)
- For live transcription: A working microphone
- For playback: Audio output device (speakers/headphones)
## Troubleshooting
### "Connection refused" error
Make sure the Transcription API server is running:
```bash
cd ../../
docker compose up
```
### No audio playback
- Check your system's default audio output device
- Ensure the audio file format is supported (WAV, MP3, FLAC)
- Try with a different audio file
### Poor transcription quality
- Use a larger model (e.g., `--model large-v3`)
- Enable VAD to filter noise (`--vad`)
- Ensure audio quality is good (16kHz or higher recommended)

10
examples/rust-client/build.rs Normal file

@@ -0,0 +1,10 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Compile protobuf files
tonic_build::configure()
.build_server(false) // We only need the client
.compile(
&["../../proto/transcription.proto"],
&["../../proto"],
)?;
Ok(())
}

232
examples/rust-client/src/file_transcribe.rs Normal file

@@ -0,0 +1,232 @@
//! File transcription using gRPC
//!
//! This example shows how to transcribe an audio file.
//! Use --stream flag for real-time streaming instead of sending the entire file.
use anyhow::Result;
use clap::Parser;
use std::fs;
use tonic::transport::Channel;
use tracing::{info, debug};
use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use std::time::Duration;
use tokio::time;
// Import generated protobuf types
pub mod transcription {
tonic::include_proto!("transcription");
}
use transcription::{
transcription_service_client::TranscriptionServiceClient, AudioConfig, AudioFile, AudioChunk,
};
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Audio file path
file: String,
/// gRPC server address
#[arg(short, long, default_value = "http://localhost:50051")]
server: String,
/// Language code (e.g., "en", "es", "auto")
#[arg(short, long, default_value = "auto")]
language: String,
/// Task: transcribe or translate
#[arg(short, long, default_value = "transcribe")]
task: String,
/// Model to use
#[arg(short, long, default_value = "base")]
model: String,
/// Stream the file in chunks for real-time transcription
#[arg(long)]
stream: bool,
/// Enable VAD (Voice Activity Detection)
#[arg(short, long)]
vad: bool,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt::init();
let args = Args::parse();
info!("Reading audio file: {}", args.file);
info!("Connecting to transcription service at {}", args.server);
let mut client = TranscriptionServiceClient::connect(args.server.clone()).await?;
if args.stream {
// Stream mode - send file in chunks for real-time transcription
stream_file(&mut client, &args).await?;
} else {
// Normal mode - send entire file at once
transcribe_entire_file(&mut client, &args).await?;
}
Ok(())
}
async fn transcribe_entire_file(
client: &mut TranscriptionServiceClient<Channel>,
args: &Args,
) -> Result<()> {
let audio_data = fs::read(&args.file)?;
// Determine format from extension
let format = match args.file.split('.').last() {
Some("wav") => "wav",
Some("mp3") => "mp3",
Some("webm") => "webm",
_ => "wav", // Default to WAV
};
let config = AudioConfig {
language: args.language.clone(),
task: args.task.clone(),
model: args.model.clone(),
sample_rate: 16000,
vad_enabled: args.vad,
};
let request = AudioFile {
audio_data,
format: format.to_string(),
config: Some(config),
};
info!("Sending entire file for transcription...");
let response = client.transcribe_file(request).await?;
let result = response.into_inner();
println!("\n=== Transcription Results ===");
println!("Language: {}", result.detected_language);
println!("Duration: {:.2} seconds", result.duration_seconds);
println!("\nFull Text:");
println!("{}", result.full_text);
if !result.segments.is_empty() {
println!("\n=== Segments ===");
for (i, segment) in result.segments.iter().enumerate() {
println!(
"[{:03}] {:.2}s - {:.2}s (conf: {:.2}): {}",
i + 1,
segment.start_time,
segment.end_time,
segment.confidence,
segment.text
);
}
}
Ok(())
}
async fn stream_file(
client: &mut TranscriptionServiceClient<Channel>,
args: &Args,
) -> Result<()> {
let audio_data = fs::read(&args.file)?;
info!("Streaming file in real-time chunks...");
// Create channel for audio chunks
let (tx, rx) = mpsc::channel::<AudioChunk>(100);
// Spawn task to send audio chunks
let tx_clone = tx.clone();
let language = args.language.clone();
let task = args.task.clone();
let model = args.model.clone();
let vad = args.vad;
tokio::spawn(async move {
// Send configuration first
let config = AudioConfig {
language,
task,
model,
sample_rate: 16000,
vad_enabled: vad,
};
let config_chunk = AudioChunk {
audio_data: vec![],
session_id: "file-stream".to_string(),
config: Some(config),
};
if tx_clone.send(config_chunk).await.is_err() {
return;
}
// Assuming PCM16 audio at 16kHz
// Send in 3 second chunks for better accuracy (96000 bytes = 48000 samples = 3 seconds)
let chunk_size = 96000;
for (idx, chunk) in audio_data.chunks(chunk_size).enumerate() {
let audio_chunk = AudioChunk {
audio_data: chunk.to_vec(),
session_id: String::new(),
config: None,
};
debug!("Sending chunk {} ({} bytes)", idx, chunk.len());
if tx_clone.send(audio_chunk).await.is_err() {
break;
}
// Simulate real-time streaming (3 seconds per chunk)
time::sleep(Duration::from_secs(3)).await;
}
info!("Finished streaming all chunks");
});
// Create stream and start transcription
let stream = ReceiverStream::new(rx);
let response = client.stream_transcribe(stream).await?;
let mut result_stream = response.into_inner();
println!("\n=== Real-time Transcription ===");
println!("Streaming and transcribing...\n");
let mut full_transcript = String::new();
// Process results
while let Some(result) = result_stream.next().await {
match result {
Ok(transcription) => {
println!("[{:06.2}s - {:06.2}s] {}",
transcription.start_time,
transcription.end_time,
transcription.text);
if transcription.is_final {
full_transcript.push_str(&transcription.text);
full_transcript.push(' ');
}
}
Err(e) => {
eprintln!("Error: {}", e);
break;
}
}
}
println!("\n=== Full Transcript ===");
println!("{}", full_transcript.trim());
Ok(())
}

188
examples/rust-client/src/live_transcribe.rs Normal file

@@ -0,0 +1,188 @@
//! Live microphone transcription using gRPC streaming
//!
//! This example shows how to capture audio from the microphone
//! and stream it to the transcription service in real-time.
use anyhow::Result;
use clap::Parser;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use futures_util::StreamExt;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc as tokio_mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info};
// Import generated protobuf types
pub mod transcription {
tonic::include_proto!("transcription");
}
use transcription::{
transcription_service_client::TranscriptionServiceClient, AudioChunk, AudioConfig,
};
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// gRPC server address
#[arg(short, long, default_value = "http://localhost:50051")]
server: String,
/// Language code (e.g., "en", "es", "auto")
#[arg(short, long, default_value = "auto")]
language: String,
/// Task: transcribe or translate
#[arg(short, long, default_value = "transcribe")]
task: String,
/// Model to use
#[arg(short, long, default_value = "base")]
model: String,
/// Session ID
#[arg(long)]
session_id: Option<String>,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt::init();
let args = Args::parse();
info!("Connecting to transcription service at {}", args.server);
let mut client = TranscriptionServiceClient::connect(args.server).await?;
// Create channel for audio data
let (audio_tx, audio_rx) = tokio_mpsc::channel::<AudioChunk>(100);
// Start audio capture in a separate thread
let audio_tx_clone = audio_tx.clone();
std::thread::spawn(move || {
if let Err(e) = capture_audio(audio_tx_clone) {
error!("Audio capture error: {}", e);
}
});
// Send initial configuration
let session_id = args.session_id.unwrap_or_else(|| {
format!("rust-client-{}", std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs())
});
info!("Starting transcription session: {}", session_id);
// Create the first chunk with configuration
let config = AudioConfig {
language: args.language.clone(),
task: args.task.clone(),
model: args.model.clone(),
sample_rate: 16000,
vad_enabled: false,
};
// Send a configuration chunk first
let config_chunk = AudioChunk {
audio_data: vec![],
session_id: session_id.clone(),
config: Some(config),
};
audio_tx.send(config_chunk).await?;
// Create stream from receiver
let audio_stream = ReceiverStream::new(audio_rx);
// Start bidirectional streaming
let response = client.stream_transcribe(audio_stream).await?;
let mut stream = response.into_inner();
info!("Listening... Press Ctrl+C to stop");
// Process transcription results
while let Some(result) = stream.next().await {
match result {
Ok(transcription) => {
if transcription.is_final {
println!("\n[FINAL] {}", transcription.text);
} else {
print!("\r[PARTIAL] {} ", transcription.text);
use std::io::{self, Write};
io::stdout().flush()?;
}
}
Err(e) => {
error!("Transcription error: {}", e);
break;
}
}
}
Ok(())
}
/// Capture audio from the default microphone
fn capture_audio(tx: tokio_mpsc::Sender<AudioChunk>) -> Result<()> {
let host = cpal::default_host();
let device = host.default_input_device()
.ok_or_else(|| anyhow::anyhow!("No input device available"))?;
info!("Using audio device: {}", device.name()?);
// Configure audio capture for 16kHz mono PCM16
let config = cpal::StreamConfig {
channels: 1,
sample_rate: cpal::SampleRate(16000),
buffer_size: cpal::BufferSize::Default,
};
// Buffer to accumulate audio samples
let buffer = Arc::new(Mutex::new(Vec::new()));
let buffer_clone = buffer.clone();
// Create audio stream
let stream = device.build_input_stream(
&config,
move |data: &[i16], _: &cpal::InputCallbackInfo| {
let mut buf = buffer_clone.lock().unwrap();
buf.extend_from_slice(data);
// Send chunks of ~0.5 seconds (8000 samples at 16kHz)
while buf.len() >= 8000 {
let chunk: Vec<i16> = buf.drain(..8000).collect();
// Convert i16 to bytes
let bytes: Vec<u8> = chunk.iter()
.flat_map(|&sample| sample.to_le_bytes())
.collect();
// Send audio chunk
let audio_chunk = AudioChunk {
audio_data: bytes,
session_id: String::new(), // Already set in config chunk
config: None,
};
// Use blocking send since we're in a non-async context
if let Err(e) = tx.blocking_send(audio_chunk) {
error!("Failed to send audio chunk: {}", e);
}
}
},
move |err| {
error!("Audio stream error: {}", err);
},
None,
)?;
stream.play()?;
// Keep the stream alive
std::thread::park();
Ok(())
}

216
examples/rust-client/src/realtime_playback.rs Normal file

@@ -0,0 +1,216 @@
//! Real-time audio playback with synchronized transcription
//!
//! This example plays an audio file while streaming it for transcription,
//! showing transcriptions in real-time similar to YouTube.
use anyhow::Result;
use clap::Parser;
use futures_util::StreamExt;
use rodio::{Decoder, OutputStream, Source};
use std::fs::File;
use std::io::BufReader;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::time;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{info, debug};
use hound::WavReader;
// Import generated protobuf types
pub mod transcription {
tonic::include_proto!("transcription");
}
use transcription::{
transcription_service_client::TranscriptionServiceClient, AudioChunk, AudioConfig,
};
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Audio file path (WAV, MP3, FLAC, etc.)
file: String,
/// gRPC server address
#[arg(short, long, default_value = "http://localhost:50051")]
server: String,
/// Language code (e.g., "en", "es", "auto")
#[arg(short, long, default_value = "auto")]
language: String,
/// Model to use
#[arg(short, long, default_value = "base")]
model: String,
/// Enable VAD (Voice Activity Detection)
#[arg(short, long)]
vad: bool,
/// Show timestamps
#[arg(short = 't', long)]
timestamps: bool,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt::init();
let args = Args::parse();
let file_path = args.file.clone();
info!("Loading audio file: {}", file_path);
// Start audio playback in a separate thread
let (_stream, stream_handle) = OutputStream::try_default()?;
let file = BufReader::new(File::open(&file_path)?);
let source = Decoder::new(file)?;
let sample_rate = source.sample_rate();
let channels = source.channels();
// Convert to f32 samples for playback
let source = source.convert_samples::<f32>();
info!("Audio format: {} Hz, {} channels", sample_rate, channels);
// Also open the file for streaming to transcription service
// We need to read the raw audio data for transcription
let mut wav_reader = WavReader::open(&file_path)?;
let wav_spec = wav_reader.spec();
// Collect all samples for streaming
let samples: Vec<i16> = wav_reader.samples::<i16>()
.collect::<hound::Result<Vec<_>>>()?;
info!("Connecting to transcription service at {}", args.server);
let mut client = TranscriptionServiceClient::connect(args.server.clone()).await?;
// Create channel for audio chunks
let (tx, rx) = mpsc::channel::<AudioChunk>(100);
// Calculate chunk duration for synchronization
let chunk_samples = 16000 * 3; // 3 second chunks at 16kHz for better accuracy
let chunk_duration = Duration::from_secs(3);
// Start playback
println!("\n🎵 Starting audio playback with real-time transcription...\n");
println!("{}", "".repeat(80));
let start_time = Instant::now();
// Play audio
stream_handle.play_raw(source)?;
// Spawn task to stream audio chunks to transcription service
let tx_clone = tx.clone();
let show_timestamps = args.timestamps;
tokio::spawn(async move {
// Send configuration first
let config = AudioConfig {
language: args.language.clone(),
task: "transcribe".to_string(),
model: args.model.clone(),
sample_rate: 16000,
vad_enabled: args.vad,
};
let config_chunk = AudioChunk {
audio_data: vec![],
session_id: "realtime-playback".to_string(),
config: Some(config),
};
if tx_clone.send(config_chunk).await.is_err() {
return;
}
// Stream audio chunks synchronized with playback
for (chunk_idx, chunk) in samples.chunks(chunk_samples).enumerate() {
let chunk_start = Instant::now();
// Convert samples to bytes
let bytes: Vec<u8> = chunk.iter()
.flat_map(|&s| s.to_le_bytes())
.collect();
let audio_chunk = AudioChunk {
audio_data: bytes,
session_id: String::new(),
config: None,
};
debug!("Sending chunk {} ({} samples)", chunk_idx, chunk.len());
if tx_clone.send(audio_chunk).await.is_err() {
break;
}
// Synchronize with playback timing
let elapsed = chunk_start.elapsed();
if elapsed < chunk_duration {
time::sleep(chunk_duration - elapsed).await;
}
}
info!("Finished streaming audio chunks");
});
// Create stream and start transcription
let stream = ReceiverStream::new(rx);
let response = client.stream_transcribe(stream).await?;
let mut result_stream = response.into_inner();
// Process transcription results
let mut last_text = String::new();
let mut current_line = String::new();
while let Some(result) = result_stream.next().await {
match result {
Ok(transcription) => {
let elapsed = start_time.elapsed().as_secs_f32();
// Clear previous line if text has changed significantly
if !transcription.text.is_empty() && transcription.text != last_text {
if show_timestamps {
// Show with timestamps
println!("[{:06.2}s] {}",
elapsed,
transcription.text);
} else {
// Update current line for continuous display
if transcription.is_final {
// Final transcription for this segment
println!("{}", transcription.text);
current_line.clear();
} else {
// Interim result - update in place
print!("\r{:<80}", transcription.text);
use std::io::{self, Write};
io::stdout().flush()?;
current_line = transcription.text.clone();
}
}
last_text = transcription.text.clone();
}
}
Err(e) => {
eprintln!("\nTranscription error: {}", e);
break;
}
}
}
// Clear any remaining interim text
if !current_line.is_empty() {
println!();
}
println!("\n{}", "".repeat(80));
println!("✅ Playback and transcription complete!");
// Keep the program alive until playback finishes
time::sleep(Duration::from_secs(2)).await;
Ok(())
}

142
examples/rust-client/src/stream_transcribe.rs Normal file

@@ -0,0 +1,142 @@
//! Stream WAV file for real-time transcription
//!
//! This example shows how to stream a WAV file chunk by chunk
//! to simulate real-time transcription.
use anyhow::Result;
use clap::Parser;
use futures_util::StreamExt;
use hound::WavReader;
use std::fs::File;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tracing::info;
// Import generated protobuf types
pub mod transcription {
tonic::include_proto!("transcription");
}
use transcription::{
transcription_service_client::TranscriptionServiceClient, AudioChunk, AudioConfig,
};
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// WAV file path
file: String,
/// gRPC server address
#[arg(short, long, default_value = "http://localhost:50051")]
server: String,
/// Language code (e.g., "en", "es", "auto")
#[arg(short, long, default_value = "auto")]
language: String,
/// Simulate real-time by adding delays
#[arg(short, long)]
realtime: bool,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt::init();
let args = Args::parse();
info!("Reading WAV file: {}", args.file);
let mut reader = WavReader::open(&args.file)?;
let spec = reader.spec();
info!("WAV specs: {} Hz, {} channels, {} bits",
spec.sample_rate, spec.channels, spec.bits_per_sample);
// Collect samples
let samples: Vec<i16> = reader.samples::<i16>()
.collect::<hound::Result<Vec<_>>>()?;
info!("Connecting to transcription service at {}", args.server);
let mut client = TranscriptionServiceClient::connect(args.server).await?;
// Create channel for audio chunks
let (tx, rx) = mpsc::channel::<AudioChunk>(100);
// Spawn task to send audio chunks
let tx_clone = tx.clone();
let realtime = args.realtime;
tokio::spawn(async move {
// Send configuration first
let config = AudioConfig {
language: args.language.clone(),
task: "transcribe".to_string(),
model: "base".to_string(),
sample_rate: 16000,
vad_enabled: false,
};
let config_chunk = AudioChunk {
audio_data: vec![],
session_id: "stream-test".to_string(),
config: Some(config),
};
if tx_clone.send(config_chunk).await.is_err() {
return;
}
// Send audio in chunks of 3 seconds for better accuracy (48000 samples at 16kHz)
let chunk_size = 48000;
for chunk in samples.chunks(chunk_size) {
// Convert samples to bytes
let bytes: Vec<u8> = chunk.iter()
.flat_map(|&s| s.to_le_bytes())
.collect();
let audio_chunk = AudioChunk {
audio_data: bytes,
session_id: String::new(),
config: None,
};
if tx_clone.send(audio_chunk).await.is_err() {
break;
}
// Simulate real-time streaming
if realtime {
time::sleep(Duration::from_secs(3)).await;
}
}
});
// Create stream and start transcription
let stream = ReceiverStream::new(rx);
let response = client.stream_transcribe(stream).await?;
let mut result_stream = response.into_inner();
info!("Streaming audio and receiving transcriptions...");
// Process results
while let Some(result) = result_stream.next().await {
match result {
Ok(transcription) => {
println!("[{:.2}s - {:.2}s] {}",
transcription.start_time,
transcription.end_time,
transcription.text);
}
Err(e) => {
eprintln!("Error: {}", e);
break;
}
}
}
Ok(())
}

84
generate_proto.py Normal file

@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""
Generate Python code from protobuf definitions
Run this before starting the service for the first time
"""
import os
import sys
import subprocess
def generate_proto():
"""Generate Python code from proto files"""
proto_dir = "proto"
output_dir = "src"
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Find all .proto files
proto_files = [f for f in os.listdir(proto_dir) if f.endswith('.proto')]
if not proto_files:
print("No .proto files found in proto/ directory")
return False
for proto_file in proto_files:
proto_path = os.path.join(proto_dir, proto_file)
print(f"Generating code for {proto_file}...")
# Generate Python code
cmd = [
sys.executable, "-m", "grpc_tools.protoc",
f"-I{proto_dir}",
f"--python_out={output_dir}",
f"--grpc_python_out={output_dir}",
proto_path
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
print(f"✓ Generated {proto_file.replace('.proto', '_pb2.py')} and {proto_file.replace('.proto', '_pb2_grpc.py')}")
except subprocess.CalledProcessError as e:
print(f"✗ Failed to generate code for {proto_file}")
print(f" Error: {e.stderr}")
return False
# Fix imports in generated files
print("Fixing imports in generated files...")
grpc_file = os.path.join(output_dir, "transcription_pb2_grpc.py")
if os.path.exists(grpc_file):
with open(grpc_file, 'r') as f:
content = f.read()
# Fix relative import
content = content.replace(
"import transcription_pb2",
"from . import transcription_pb2"
)
with open(grpc_file, 'w') as f:
f.write(content)
print("✓ Fixed imports")
print("\nProtobuf generation complete!")
print(f"Generated files are in {output_dir}/")
return True
if __name__ == "__main__":
# Check if grpcio-tools is installed
try:
import grpc_tools
except ImportError:
print("Error: grpcio-tools not installed")
print("Run: pip install grpcio-tools")
sys.exit(1)
if generate_proto():
print("\nYou can now start the service with:")
print(" python src/transcription_server.py")
print("Or with Docker:")
print(" docker compose up")
else:
sys.exit(1)

91
proto/transcription.proto Normal file

@@ -0,0 +1,91 @@
syntax = "proto3";
package transcription;
// The transcription service provides real-time speech-to-text capabilities
service TranscriptionService {
// Bidirectional streaming: send audio chunks, receive transcriptions
rpc StreamTranscribe(stream AudioChunk) returns (stream TranscriptionResult);
// Unary call for single audio file transcription
rpc TranscribeFile(AudioFile) returns (TranscriptionResponse);
// Get available models and languages
rpc GetCapabilities(Empty) returns (Capabilities);
// Health check
rpc HealthCheck(Empty) returns (HealthStatus);
}
// Audio chunk for streaming
message AudioChunk {
bytes audio_data = 1; // PCM16 audio data (16-bit, 16kHz, mono)
string session_id = 2; // Optional session ID for tracking
AudioConfig config = 3; // Optional config (only needed in first chunk)
}
// Audio configuration
message AudioConfig {
string language = 1; // Language code (e.g., "en", "es", "auto")
string task = 2; // "transcribe" or "translate"
string model = 3; // Model size: "tiny", "base", "small", "medium", "large-v3"
int32 sample_rate = 4; // Sample rate (default: 16000)
bool vad_enabled = 5; // Voice Activity Detection
}
// Transcription result for streaming
message TranscriptionResult {
string text = 1; // Transcribed text
float start_time = 2; // Start time in seconds
float end_time = 3; // End time in seconds
bool is_final = 4; // Is this a final result?
float confidence = 5; // Confidence score (0-1)
string language = 6; // Detected language
string session_id = 7; // Session ID for tracking
int64 timestamp_ms = 8; // Server timestamp in milliseconds
}
// Complete audio file for transcription
message AudioFile {
bytes audio_data = 1; // Complete audio file data
string format = 2; // Format: "wav", "mp3", "webm", "raw_pcm16"
AudioConfig config = 3; // Audio configuration
}
// Response for file transcription
message TranscriptionResponse {
repeated TranscriptionSegment segments = 1;
string full_text = 2; // Complete transcription
string detected_language = 3;
float duration_seconds = 4;
}
// Transcription segment
message TranscriptionSegment {
string text = 1;
float start_time = 2;
float end_time = 3;
float confidence = 4;
}
// Service capabilities
message Capabilities {
repeated string available_models = 1;
repeated string supported_languages = 2;
repeated string supported_formats = 3;
int32 max_audio_length_seconds = 4;
bool streaming_supported = 5;
bool vad_supported = 6;
}
// Health status
message HealthStatus {
bool healthy = 1;
string status = 2;
string model_loaded = 3;
int64 uptime_seconds = 4;
int32 active_sessions = 5;
}
// Empty message for requests without parameters
message Empty {}

25
requirements.txt Normal file

@@ -0,0 +1,25 @@
# Core dependencies
grpcio==1.60.0
grpcio-tools==1.60.0
websockets==12.0
fastapi==0.109.0
uvicorn[standard]==0.27.0
# Audio processing
numpy==1.24.3
soundfile==0.12.1
librosa==0.10.1
# SimulStreaming/Whisper dependencies
torch>=2.0.0
transformers>=4.36.0
openai-whisper>=20231117
# Utilities
python-multipart==0.0.6
aiofiles==23.2.1
pydantic==2.5.3
python-dotenv==1.0.0
# Monitoring
prometheus-client==0.19.0

518
src/transcription_server.py Normal file

@@ -0,0 +1,518 @@
#!/usr/bin/env python3
"""
Standalone Transcription Service with gRPC and WebSocket support
Optimized for real-time streaming transcription
"""
import os
import sys
import asyncio
import logging
import time
import json
import base64
from typing import Optional, AsyncIterator, Dict, List
from dataclasses import dataclass, asdict
from concurrent import futures
import threading
from datetime import datetime
# Add current directory to path for generated protobuf imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import grpc
import numpy as np
import soundfile
import librosa
import torch
# Add SimulStreaming to path if available
simulstreaming_path = os.environ.get('SIMULSTREAMING_PATH', '/app/SimulStreaming')
if os.path.exists(simulstreaming_path):
sys.path.insert(0, simulstreaming_path)
USE_SIMULSTREAMING = True
try:
from simulstreaming_whisper import simulwhisper_args, simul_asr_factory
except ImportError:
USE_SIMULSTREAMING = False
import whisper
else:
USE_SIMULSTREAMING = False
import whisper
# Import generated protobuf classes (will be generated later)
from transcription_pb2 import (
AudioChunk, AudioFile, TranscriptionResult, TranscriptionResponse,
TranscriptionSegment, Capabilities, HealthStatus, Empty, AudioConfig
)
import transcription_pb2_grpc
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
SAMPLE_RATE = 16000
MAX_AUDIO_LENGTH = 30 * 60 # 30 minutes
@dataclass
class TranscriptionSession:
"""Manages a single transcription session"""
session_id: str
config: AudioConfig
audio_buffer: bytearray
start_time: float
last_activity: float
transcriptions: List[dict]
class TranscriptionEngine:
"""Core transcription engine using Whisper or SimulStreaming"""
def __init__(self, model_name: str = "large-v3"):
self.model_name = model_name
self.model = None
self.processor = None
self.online_processor = None
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.load_model()
def load_model(self):
"""Load the transcription model"""
if USE_SIMULSTREAMING:
self._load_simulstreaming()
else:
self._load_whisper()
def _load_simulstreaming(self):
"""Load SimulStreaming for real-time transcription"""
try:
import argparse
parser = argparse.ArgumentParser()
# Add SimulStreaming arguments
simulwhisper_args(parser)
args = parser.parse_args([
'--model_path', self.model_name,
'--lan', 'auto',
'--task', 'transcribe',
'--backend', 'whisper',
'--min-chunk-size', '0.5',
'--beams', '1',
])
# Create processor
self.processor, self.online_processor = simul_asr_factory(args)
logger.info(f"Loaded SimulStreaming with model: {self.model_name}")
except Exception as e:
logger.error(f"Failed to load SimulStreaming: {e}")
logger.info("Falling back to standard Whisper")
global USE_SIMULSTREAMING  # update the module-level flag, not a function-local copy
USE_SIMULSTREAMING = False
self._load_whisper()
def _load_whisper(self):
"""Load standard Whisper model"""
try:
# Use the shared volume for model caching
download_root = os.environ.get('TORCH_HOME', '/app/models')
self.model = whisper.load_model(self.model_name, device=self.device, download_root=download_root)
logger.info(f"Loaded Whisper model: {self.model_name} on {self.device} from {download_root}")
except Exception as e:
logger.error(f"Failed to load Whisper model: {e}")
raise
def is_speech(self, audio: np.ndarray, energy_threshold: float = 0.002, zero_crossing_threshold: int = 50) -> bool:
"""
Simple Voice Activity Detection
Returns True if the audio chunk likely contains speech
"""
# Check if audio is too quiet (likely silence)
energy = np.sqrt(np.mean(audio**2))
if energy < energy_threshold:
return False
# Check zero crossing rate (helps distinguish speech from noise)
zero_crossings = np.sum(np.abs(np.diff(np.sign(audio))) > 0)
# Speech typically has moderate zero crossing rate
# Pure noise tends to have very high zero crossing rate
if zero_crossings > len(audio) * zero_crossing_threshold / SAMPLE_RATE:
return False
return True
def transcribe_chunk(self, audio_data: bytes, language: str = "auto", vad_enabled: bool = True) -> Optional[dict]:
"""Transcribe a single audio chunk"""
try:
# Convert bytes to numpy array
audio = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
# Check if audio contains speech (VAD) - only if enabled
if vad_enabled:
energy = np.sqrt(np.mean(audio**2))
if not self.is_speech(audio):
logger.info(f"No speech detected in audio chunk (energy: {energy:.4f}), skipping transcription")
return None
else:
logger.info(f"Speech detected in chunk (energy: {energy:.4f})")
if USE_SIMULSTREAMING and self.online_processor:
# Use SimulStreaming for real-time processing
self.online_processor.insert_audio_chunk(audio)
result = self.online_processor.process_iter()
if result and result[0] is not None:
return {
'text': result[2],
'start_time': result[0],
'end_time': result[1],
'is_final': True,
'confidence': 0.95 # SimulStreaming doesn't provide confidence
}
else:
# Use standard Whisper
if self.model:
# Pad audio to minimum length if needed
if len(audio) < SAMPLE_RATE:
audio = np.pad(audio, (0, SAMPLE_RATE - len(audio)))
# Use more conservative settings to reduce hallucinations
result = self.model.transcribe(
audio,
language=None if language == "auto" else language,
fp16=self.device == "cuda",
temperature=0.0, # More deterministic, less hallucination
no_speech_threshold=0.6, # Higher threshold for detecting non-speech
logprob_threshold=-1.0, # Filter out low probability results
compression_ratio_threshold=2.4 # Filter out repetitive results
)
if result and result.get('text'):
text = result['text'].strip()
# Filter out common hallucinations
hallucination_phrases = [
"thank you", "thanks", "you", "uh", "um",
"thank you for watching", "please subscribe",
"bye", "bye-bye", ".", "...", ""
]
# Check if the result is just a hallucination
text_lower = text.lower().strip()
if text_lower in hallucination_phrases:
logger.debug(f"Filtered out hallucination: {text}")
return None
# Check for repetitive text (another sign of hallucination)
words = text.lower().split()
if len(words) > 1 and len(set(words)) == 1:
logger.debug(f"Filtered out repetitive text: {text}")
return None
return {
'text': text,
'start_time': 0,
'end_time': len(audio) / SAMPLE_RATE,
'is_final': True,
'confidence': 0.9,
'language': result.get('language', language)
}
except Exception as e:
logger.error(f"Error transcribing chunk: {e}")
return None
def transcribe_file(self, audio_data: bytes, format: str, config: AudioConfig) -> dict:
"""Transcribe a complete audio file"""
try:
# Convert audio to numpy array based on format
if format == "raw_pcm16":
audio = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
else:
# Use librosa for other formats
import io
audio, _ = librosa.load(io.BytesIO(audio_data), sr=SAMPLE_RATE)
# Transcribe with Whisper
if self.model:
result = self.model.transcribe(
audio,
language=None if config.language == "auto" else config.language,
task=config.task or "transcribe",
fp16=self.device == "cuda"
)
segments = []
for seg in result.get('segments', []):
segments.append({
'text': seg['text'].strip(),
'start_time': seg['start'],
'end_time': seg['end'],
                        'confidence': seg.get('avg_logprob', 0) + 1.0  # Rough 0-1 score: avg_logprob is usually in [-1, 0]
})
return {
'segments': segments,
'full_text': result['text'].strip(),
'detected_language': result.get('language', config.language),
'duration_seconds': len(audio) / SAMPLE_RATE
                }
            # If the model never loaded, fail loudly so the except-handler below
            # returns the documented empty-result dict instead of None.
            raise RuntimeError("Whisper model is not loaded")
except Exception as e:
logger.error(f"Error transcribing file: {e}")
return {
'segments': [],
'full_text': '',
'detected_language': 'unknown',
'duration_seconds': 0
}
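# Example (sketch, for illustration only): using the engine directly, without the
# gRPC or WebSocket front ends. Assumes "audio.pcm" holds raw 16 kHz mono PCM16,
# which is the format transcribe_chunk() expects.
#
#   engine = TranscriptionEngine()
#   with open("audio.pcm", "rb") as f:
#       result = engine.transcribe_chunk(f.read(), language="auto", vad_enabled=False)
#   if result:
#       print(result["text"])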
class TranscriptionServicer(transcription_pb2_grpc.TranscriptionServiceServicer):
"""gRPC service implementation"""
def __init__(self):
self.engine = TranscriptionEngine()
self.sessions: Dict[str, TranscriptionSession] = {}
self.start_time = time.time()
async def StreamTranscribe(self, request_iterator: AsyncIterator[AudioChunk],
context: grpc.aio.ServicerContext) -> AsyncIterator[TranscriptionResult]:
"""Bidirectional streaming transcription"""
session_id = None
config = None
audio_buffer = bytearray()
try:
async for chunk in request_iterator:
# Get session ID and config from first chunk
if not session_id:
session_id = chunk.session_id or str(time.time())
                        # Protobuf message fields are always truthy, so check HasField
                        # to detect a client that did not send a config.
                        config = chunk.config if chunk.HasField("config") else AudioConfig(
                            language="auto",
                            task="transcribe"
                        )
# Add audio to buffer
audio_buffer.extend(chunk.audio_data)
# Process when we have enough audio (3 seconds for better accuracy)
min_bytes = int(SAMPLE_RATE * 3.0 * 2) # 3 seconds of PCM16
while len(audio_buffer) >= min_bytes:
# Extract chunk to process
audio_chunk = bytes(audio_buffer[:min_bytes])
audio_buffer = audio_buffer[min_bytes:]
# Transcribe
logger.debug(f"Processing audio chunk of {len(audio_chunk)} bytes")
result = self.engine.transcribe_chunk(
audio_chunk,
language=config.language,
vad_enabled=config.vad_enabled if config else False
)
logger.debug(f"Transcription result: {result}")
if result:
# Send transcription result
yield TranscriptionResult(
text=result['text'],
start_time=result['start_time'],
end_time=result['end_time'],
is_final=result['is_final'],
confidence=result.get('confidence', 0.9),
language=result.get('language', config.language),
session_id=session_id,
timestamp_ms=int(time.time() * 1000)
)
# Process remaining audio
if audio_buffer:
result = self.engine.transcribe_chunk(
bytes(audio_buffer),
language=config.language,
vad_enabled=config.vad_enabled if config else False
)
if result:
yield TranscriptionResult(
text=result['text'],
start_time=result['start_time'],
end_time=result['end_time'],
is_final=True,
confidence=result.get('confidence', 0.9),
language=result.get('language', config.language),
session_id=session_id,
timestamp_ms=int(time.time() * 1000)
)
except Exception as e:
logger.error(f"Error in StreamTranscribe: {e}")
            await context.abort(grpc.StatusCode.INTERNAL, str(e))
async def TranscribeFile(self, request: AudioFile, context: grpc.aio.ServicerContext) -> TranscriptionResponse:
"""Transcribe a complete audio file"""
try:
result = self.engine.transcribe_file(
request.audio_data,
request.format,
request.config
)
segments = [
TranscriptionSegment(
text=seg['text'],
start_time=seg['start_time'],
end_time=seg['end_time'],
confidence=seg['confidence']
)
for seg in result['segments']
]
return TranscriptionResponse(
segments=segments,
full_text=result['full_text'],
detected_language=result['detected_language'],
duration_seconds=result['duration_seconds']
)
except Exception as e:
logger.error(f"Error in TranscribeFile: {e}")
            await context.abort(grpc.StatusCode.INTERNAL, str(e))
async def GetCapabilities(self, request: Empty, context: grpc.aio.ServicerContext) -> Capabilities:
"""Get service capabilities"""
return Capabilities(
available_models=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"],
supported_languages=["auto", "en", "es", "fr", "de", "it", "pt", "ru", "zh", "ja", "ko"],
supported_formats=["wav", "mp3", "webm", "raw_pcm16"],
max_audio_length_seconds=MAX_AUDIO_LENGTH,
streaming_supported=True,
            vad_supported=True  # Simple energy/zero-crossing VAD (see TranscriptionEngine.is_speech)
)
async def HealthCheck(self, request: Empty, context: grpc.aio.ServicerContext) -> HealthStatus:
"""Health check endpoint"""
return HealthStatus(
healthy=True,
status="running",
model_loaded=self.engine.model_name,
uptime_seconds=int(time.time() - self.start_time),
active_sessions=len(self.sessions)
)
async def serve_grpc(port: int = 50051):
"""Start the gRPC server"""
server = grpc.aio.server(
futures.ThreadPoolExecutor(max_workers=10),
options=[
('grpc.max_send_message_length', 100 * 1024 * 1024), # 100MB
('grpc.max_receive_message_length', 100 * 1024 * 1024),
]
)
servicer = TranscriptionServicer()
transcription_pb2_grpc.add_TranscriptionServiceServicer_to_server(servicer, server)
server.add_insecure_port(f'[::]:{port}')
await server.start()
logger.info(f"gRPC server started on port {port}")
await server.wait_for_termination()
# WebSocket support for compatibility
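# The WebSocket protocol mirrors the gRPC stream: after the server's initial
# {"type": "connected", "session_id": ...} frame, the client sends
# {"type": "audio", "data": "<base64-encoded PCM16>"} frames followed by a final
# {"type": "stop"}; the server replies with one {"type": "transcription", ...}
# frame per transcribed segment.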
async def handle_websocket(websocket, path):
"""Handle WebSocket connections for compatibility"""
import websockets
engine = TranscriptionEngine()
session_id = str(time.time())
audio_buffer = bytearray()
try:
# Send connection confirmation
await websocket.send(json.dumps({
'type': 'connected',
'session_id': session_id
}))
async for message in websocket:
data = json.loads(message)
if data['type'] == 'audio':
# Decode base64 audio
audio_data = base64.b64decode(data['data'])
audio_buffer.extend(audio_data)
# Process when we have enough audio
min_bytes = int(SAMPLE_RATE * 0.5 * 2)
while len(audio_buffer) >= min_bytes:
chunk = bytes(audio_buffer[:min_bytes])
audio_buffer = audio_buffer[min_bytes:]
result = engine.transcribe_chunk(chunk)
if result:
await websocket.send(json.dumps({
'type': 'transcription',
'text': result['text'],
'start_time': result['start_time'],
'end_time': result['end_time'],
'is_final': result['is_final'],
'timestamp': int(time.time() * 1000)
}))
elif data['type'] == 'stop':
# Process remaining audio
if audio_buffer:
result = engine.transcribe_chunk(bytes(audio_buffer))
if result:
await websocket.send(json.dumps({
'type': 'transcription',
'text': result['text'],
'is_final': True,
'timestamp': int(time.time() * 1000)
}))
break
except websockets.exceptions.ConnectionClosed:
logger.info(f"WebSocket connection closed: {session_id}")
except Exception as e:
logger.error(f"WebSocket error: {e}")
async def serve_websocket(port: int = 8765):
    """Start the WebSocket server"""
    import websockets
    async with websockets.serve(handle_websocket, "0.0.0.0", port):
        logger.info(f"WebSocket server started on port {port}")
        await asyncio.Future()  # Run forever
async def main():
"""Main entry point"""
grpc_port = int(os.environ.get('GRPC_PORT', '50051'))
ws_port = int(os.environ.get('WEBSOCKET_PORT', '8765'))
enable_websocket = os.environ.get('ENABLE_WEBSOCKET', 'true').lower() == 'true'
tasks = [serve_grpc(grpc_port)]
if enable_websocket:
tasks.append(serve_websocket(ws_port))
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
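# ---------------------------------------------------------------------------
# Example gRPC client (sketch, for illustration only). Assumes the generated
# stubs transcription_pb2 / transcription_pb2_grpc are importable and that
# "audio.pcm" contains raw 16 kHz mono PCM16, which is what the servicer above
# expects for streaming chunks.
#
#   import asyncio
#   import grpc
#   import transcription_pb2
#   import transcription_pb2_grpc
#
#   async def run(path: str):
#       async with grpc.aio.insecure_channel("localhost:50051") as channel:
#           stub = transcription_pb2_grpc.TranscriptionServiceStub(channel)
#
#           async def chunks():
#               config = transcription_pb2.AudioConfig(language="auto", task="transcribe")
#               with open(path, "rb") as f:
#                   while data := f.read(32000):  # ~1 s of 16 kHz PCM16
#                       yield transcription_pb2.AudioChunk(
#                           session_id="example", audio_data=data, config=config)
#
#           async for result in stub.StreamTranscribe(chunks()):
#               print(f"[{result.start_time:.1f}-{result.end_time:.1f}] {result.text}")
#
#   asyncio.run(run("audio.pcm"))
# ---------------------------------------------------------------------------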