Restructure voice memo processing to handle files one at a time with immediate summarization

This commit is contained in:
Gigi
2025-04-01 11:03:01 +01:00
parent d3b2348a54
commit 13b4384297
2 changed files with 156 additions and 92 deletions

View File

@@ -1,49 +1,47 @@
#!/bin/bash
# Check if a file argument was provided
if [ $# -ne 1 ]; then
echo "Usage: $0 <voice_memo_file>"
exit 1
fi
# Get the input file
input_file="$1"
# Check if the input file exists
if [ ! -f "$input_file" ]; then
echo "Error: File $input_file does not exist"
exit 1
fi
# Set the directory paths
VOICE_MEMO_DIR="VoiceMemos"
TRANSCRIPT_DIR="$VOICE_MEMO_DIR/transcripts"
# Check if the voice memo directory exists
if [ ! -d "$VOICE_MEMO_DIR" ]; then
echo "Error: $VOICE_MEMO_DIR directory does not exist"
exit 1
fi
# Create transcripts directory if it doesn't exist
mkdir -p "$TRANSCRIPT_DIR"
# Check if there are any m4a files in the directory
if ! ls "$VOICE_MEMO_DIR"/*.m4a 1> /dev/null 2>&1; then
echo "No m4a files found in $VOICE_MEMO_DIR"
exit 0
fi
# Get the filename without the path and extension
filename=$(basename "$input_file" .m4a)
transcript_file="$TRANSCRIPT_DIR/$filename.txt"
# Activate the virtual environment
source vibenv/bin/activate
# Process each m4a file
for file in "$VOICE_MEMO_DIR"/*.m4a; do
if [ -f "$file" ]; then
# Get the filename without the path and extension
filename=$(basename "$file" .m4a)
transcript_file="$TRANSCRIPT_DIR/$filename.txt"
# Only process if transcript doesn't exist
if [ ! -f "$transcript_file" ]; then
echo "Processing file: $file"
echo "Transcribing audio..."
# Use whisper to transcribe the audio with tiny model
whisper "$file" --model tiny --output_dir "$TRANSCRIPT_DIR" --output_format txt
echo "Transcription saved to: $transcript_file"
echo "----------------------------------------"
else
echo "Skipping $file - transcript already exists"
fi
fi
done
# Deactivate the virtual environment
deactivate
# Only process if transcript doesn't exist
if [ ! -f "$transcript_file" ]; then
echo "Processing file: $input_file"
echo "Transcribing audio..."
# Activate the virtual environment
source vibenv/bin/activate
# Use whisper to transcribe the audio with tiny model
whisper "$input_file" --model tiny --output_dir "$TRANSCRIPT_DIR" --output_format txt
# Deactivate the virtual environment
deactivate
echo "Transcription saved to: $transcript_file"
echo "----------------------------------------"
else
echo "Skipping $input_file - transcript already exists"
fi

View File

@@ -3,18 +3,70 @@
import sys
import time
import subprocess
import re
import ollama
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
def generate_summary(transcript_text: str) -> str:
"""Generate a summary of the transcript."""
prompt_dir = Path(__file__).parent.parent / "prompts"
with open(prompt_dir / "summary.md", 'r', encoding='utf-8') as f:
prompt_template = f.read()
prompt = prompt_template.format(transcript=transcript_text)
response = ollama.chat(model='llama2', messages=[
{
'role': 'user',
'content': prompt
}
])
return response['message']['content'].strip()
def determine_content_type(transcript_text: str) -> str:
"""Determine the type of content in the transcript."""
text = transcript_text.lower()
if re.search(r'\bblog post\b', text) or re.search(r'\bdraft\b', text):
return "blog_post"
elif re.search(r'\bidea\b', text) and re.search(r'\bapp\b', text):
return "idea_app"
return "default"
def generate_additional_content(content_type: str, transcript_text: str, summary: str) -> str:
"""Generate additional content based on the content type."""
prompt_dir = Path(__file__).parent.parent / "prompts"
with open(prompt_dir / f"{content_type}.md", 'r', encoding='utf-8') as f:
prompt_template = f.read()
prompt = prompt_template.format(transcript=transcript_text, summary=summary)
response = ollama.chat(model='llama2', messages=[
{
'role': 'user',
'content': prompt
}
])
return response['message']['content'].strip()
def count_words(text: str) -> int:
"""Count the number of words in a text string."""
return len(text.split())
class VoiceMemoHandler(FileSystemEventHandler):
def __init__(self, voice_memo_dir: Path, transcript_dir: Path, summary_dir: Path):
self.voice_memo_dir = voice_memo_dir.resolve() # Store resolved path
self.transcript_dir = transcript_dir.resolve() # Store resolved path
self.summary_dir = summary_dir.resolve() # Store resolved path
self.draft_dir = voice_memo_dir.parent / "drafts" # New directory for blog posts
self.prompt_dir = voice_memo_dir.parent / "prompts" # New directory for app ideas
self.processing_lock = False
self.base_dir = Path(__file__).parent.parent
# Create additional directories if they don't exist
self.draft_dir.mkdir(parents=True, exist_ok=True)
self.prompt_dir.mkdir(parents=True, exist_ok=True)
def on_created(self, event):
if self.processing_lock:
return
@@ -25,12 +77,7 @@ class VoiceMemoHandler(FileSystemEventHandler):
# Handle new voice memo
if file_path.parent == self.voice_memo_dir and file_path.suffix.lower() == '.m4a':
print(f"\nNew voice memo detected: {file_path.name}")
self.process_voice_memo()
# Handle new transcript
elif file_path.parent == self.transcript_dir and file_path.suffix.lower() == '.txt':
print(f"\nNew transcript detected: {file_path.name}")
self.process_transcript()
self.process_voice_memo(file_path)
def on_modified(self, event):
if self.processing_lock:
@@ -42,12 +89,7 @@ class VoiceMemoHandler(FileSystemEventHandler):
# Handle modified voice memo
if file_path.parent == self.voice_memo_dir and file_path.suffix.lower() == '.m4a':
print(f"\nVoice memo modified: {file_path.name}")
self.process_voice_memo()
# Handle modified transcript
elif file_path.parent == self.transcript_dir and file_path.suffix.lower() == '.txt':
print(f"\nTranscript modified: {file_path.name}")
self.process_transcript()
self.process_voice_memo(file_path)
def on_deleted(self, event):
if self.processing_lock:
@@ -60,55 +102,83 @@ class VoiceMemoHandler(FileSystemEventHandler):
if file_path.parent == self.voice_memo_dir and file_path.suffix.lower() == '.m4a':
print(f"\nVoice memo deleted: {file_path.name}")
print(" Note: Corresponding transcript and summary files remain unchanged")
# Handle deleted transcript
elif file_path.parent == self.transcript_dir and file_path.suffix.lower() == '.txt':
print(f"\nTranscript deleted: {file_path.name}")
print(" Note: Corresponding summary file remains unchanged")
# Reprocess voice memos to regenerate the transcript
print(" Regenerating transcript...")
self.process_voice_memo()
# Handle deleted summary
elif file_path.parent == self.summary_dir and file_path.suffix.lower() == '.txt':
print(f"\nSummary deleted: {file_path.name}")
# Regenerate the summary
print(" Regenerating summary...")
self.process_transcript()
def process_voice_memo(self):
"""Run the voice memo processing script"""
def process_voice_memo(self, voice_memo_path: Path):
"""Process a single voice memo file"""
try:
self.processing_lock = True
print("Processing voice memo...")
result = subprocess.run([str(self.base_dir / 'src' / 'process_voice_memos.sh')],
print(f"Processing voice memo: {voice_memo_path.name}")
# Generate transcript
result = subprocess.run([str(self.base_dir / 'src' / 'process_voice_memos.sh'), str(voice_memo_path)],
capture_output=True,
text=True)
if result.returncode == 0:
print("Voice memo processing completed successfully")
print("Voice memo transcription completed successfully")
# Get the transcript file path
transcript_file = self.transcript_dir / f"{voice_memo_path.stem}.txt"
if not transcript_file.exists():
print(f"Error: Transcript file not found at {transcript_file}")
return
# Process the transcript
self.process_transcript(transcript_file)
else:
print(f"Error processing voice memo: {result.stderr}")
print(f"Error transcribing voice memo: {result.stderr}")
except Exception as e:
print(f"Error running voice memo script: {str(e)}")
print(f"Error processing voice memo: {str(e)}")
finally:
self.processing_lock = False
def process_transcript(self):
"""Run the transcript summarization script"""
def process_transcript(self, transcript_file: Path):
"""Process a single transcript file"""
try:
self.processing_lock = True
print("Processing transcript...")
result = subprocess.run(['python', str(self.base_dir / 'src' / 'summarize_transcripts.py')],
capture_output=True,
text=True)
if result.returncode == 0:
print("Transcript processing completed successfully")
else:
print(f"Error processing transcript: {result.stderr}")
print(f"Processing transcript: {transcript_file.name}")
# Read transcript
with open(transcript_file, 'r', encoding='utf-8') as f:
transcript_text = f.read()
word_count = count_words(transcript_text)
print(f" Read transcript ({len(transcript_text)} characters, {word_count} words)")
# Skip if transcript is too short
if word_count <= 210:
print(" Transcript is too short (≤210 words), skipping processing")
return
# Generate summary
print(" Generating summary...")
summary = generate_summary(transcript_text)
# Save summary
summary_file = self.summary_dir / f"{transcript_file.stem}_summary.txt"
with open(summary_file, 'w', encoding='utf-8') as f:
f.write(summary)
print(f" Summary saved to {summary_file}")
# Determine content type and generate additional content if needed
content_type = determine_content_type(transcript_text)
if content_type != "default":
print(f" Generating additional content for type: {content_type}")
additional_content = generate_additional_content(content_type, transcript_text, summary)
# Save to appropriate directory with timestamp
timestamp = time.strftime("%Y%m%d_%H%M%S")
if content_type == "blog_post":
output_file = self.draft_dir / f"{transcript_file.stem}_{timestamp}.md"
else: # idea_app
output_file = self.prompt_dir / f"{transcript_file.stem}_{timestamp}.md"
with open(output_file, 'w', encoding='utf-8') as f:
f.write(additional_content)
print(f" Additional content saved to {output_file}")
except Exception as e:
print(f"Error running summarization script: {str(e)}")
finally:
self.processing_lock = False
print(f"Error processing transcript: {str(e)}")
def main():
# Set up directory paths
@@ -139,17 +209,13 @@ def main():
event_handler = VoiceMemoHandler(voice_memo_dir, transcript_dir, summary_dir)
observer = Observer()
# Watch all directories using resolved paths to handle symlinks
# Watch voice memo directory only (we'll handle transcripts immediately after creation)
observer.schedule(event_handler, str(voice_memo_dir.resolve()), recursive=False)
observer.schedule(event_handler, str(transcript_dir.resolve()), recursive=False)
observer.schedule(event_handler, str(summary_dir.resolve()), recursive=False)
# Start the observer
observer.start()
print(f"\nWatching for changes in:")
print(f"- Voice memos: {voice_memo_dir.resolve()}")
print(f"- Transcripts: {transcript_dir.resolve()}")
print(f"- Summaries: {summary_dir.resolve()}")
print("\nPress Ctrl+C to stop...")
try: