From 13b4384297724f051bf6adf4b4b02e34e9f0358f Mon Sep 17 00:00:00 2001
From: Gigi
Date: Tue, 1 Apr 2025 11:03:01 +0100
Subject: [PATCH] Restructure voice memo processing to handle files one at a time with immediate summarization

---
 src/process_voice_memos.sh |  76 ++++++++--------
 src/watch_voice_memos.py   | 172 +++++++++++++++++++++++++------------
 2 files changed, 156 insertions(+), 92 deletions(-)

diff --git a/src/process_voice_memos.sh b/src/process_voice_memos.sh
index 643519d..59dfc4e 100755
--- a/src/process_voice_memos.sh
+++ b/src/process_voice_memos.sh
@@ -1,49 +1,47 @@
 #!/bin/bash
+# Check if a file argument was provided
+if [ $# -ne 1 ]; then
+    echo "Usage: $0 <input_file>"
+    exit 1
+fi
+
+# Get the input file
+input_file="$1"
+
+# Check if the input file exists
+if [ ! -f "$input_file" ]; then
+    echo "Error: File $input_file does not exist"
+    exit 1
+fi
+
 # Set the directory paths
 VOICE_MEMO_DIR="VoiceMemos"
 TRANSCRIPT_DIR="$VOICE_MEMO_DIR/transcripts"
-# Check if the voice memo directory exists
-if [ ! -d "$VOICE_MEMO_DIR" ]; then
-    echo "Error: $VOICE_MEMO_DIR directory does not exist"
-    exit 1
-fi
-
 # Create transcripts directory if it doesn't exist
 mkdir -p "$TRANSCRIPT_DIR"
-# Check if there are any m4a files in the directory
-if ! ls "$VOICE_MEMO_DIR"/*.m4a 1> /dev/null 2>&1; then
-    echo "No m4a files found in $VOICE_MEMO_DIR"
-    exit 0
-fi
+# Get the filename without the path and extension
+filename=$(basename "$input_file" .m4a)
+transcript_file="$TRANSCRIPT_DIR/$filename.txt"
-# Activate the virtual environment
-source vibenv/bin/activate
-
-# Process each m4a file
-for file in "$VOICE_MEMO_DIR"/*.m4a; do
-    if [ -f "$file" ]; then
-        # Get the filename without the path and extension
-        filename=$(basename "$file" .m4a)
-        transcript_file="$TRANSCRIPT_DIR/$filename.txt"
-
-        # Only process if transcript doesn't exist
-        if [ ! -f "$transcript_file" ]; then
-            echo "Processing file: $file"
-            echo "Transcribing audio..."
-
-            # Use whisper to transcribe the audio with tiny model
-            whisper "$file" --model tiny --output_dir "$TRANSCRIPT_DIR" --output_format txt
-
-            echo "Transcription saved to: $transcript_file"
-            echo "----------------------------------------"
-        else
-            echo "Skipping $file - transcript already exists"
-        fi
-    fi
-done
-
-# Deactivate the virtual environment
-deactivate
\ No newline at end of file
+# Only process if transcript doesn't exist
+if [ ! -f "$transcript_file" ]; then
+    echo "Processing file: $input_file"
+    echo "Transcribing audio..."
+
+    # Activate the virtual environment
+    source vibenv/bin/activate
+
+    # Use whisper to transcribe the audio with tiny model
+    whisper "$input_file" --model tiny --output_dir "$TRANSCRIPT_DIR" --output_format txt
+
+    # Deactivate the virtual environment
+    deactivate
+
+    echo "Transcription saved to: $transcript_file"
+    echo "----------------------------------------"
+else
+    echo "Skipping $input_file - transcript already exists"
+fi
\ No newline at end of file
diff --git a/src/watch_voice_memos.py b/src/watch_voice_memos.py
index 8037318..28c3f53 100755
--- a/src/watch_voice_memos.py
+++ b/src/watch_voice_memos.py
@@ -3,18 +3,70 @@
 import sys
 import time
 import subprocess
+import re
+import ollama
 from pathlib import Path
 from watchdog.observers import Observer
 from watchdog.events import FileSystemEventHandler
+def generate_summary(transcript_text: str) -> str:
+    """Generate a summary of the transcript."""
+    prompt_dir = Path(__file__).parent.parent / "prompts"
+    with open(prompt_dir / "summary.md", 'r', encoding='utf-8') as f:
+        prompt_template = f.read()
+
+    prompt = prompt_template.format(transcript=transcript_text)
+    response = ollama.chat(model='llama2', messages=[
+        {
+            'role': 'user',
+            'content': prompt
+        }
+    ])
+    return response['message']['content'].strip()
+
+def determine_content_type(transcript_text: str) -> str:
+    """Determine the type of content in the transcript."""
+    text = transcript_text.lower()
+
+    if re.search(r'\bblog post\b', text) or re.search(r'\bdraft\b', text):
+        return "blog_post"
+    elif re.search(r'\bidea\b', text) and re.search(r'\bapp\b', text):
+        return "idea_app"
+    return "default"
+
+def generate_additional_content(content_type: str, transcript_text: str, summary: str) -> str:
+    """Generate additional content based on the content type."""
+    prompt_dir = Path(__file__).parent.parent / "prompts"
+    with open(prompt_dir / f"{content_type}.md", 'r', encoding='utf-8') as f:
+        prompt_template = f.read()
+
+    prompt = prompt_template.format(transcript=transcript_text, summary=summary)
+    response = ollama.chat(model='llama2', messages=[
+        {
+            'role': 'user',
+            'content': prompt
+        }
+    ])
+    return response['message']['content'].strip()
+
+def count_words(text: str) -> int:
+    """Count the number of words in a text string."""
+    return len(text.split())
+
 class VoiceMemoHandler(FileSystemEventHandler):
     def __init__(self, voice_memo_dir: Path, transcript_dir: Path, summary_dir: Path):
         self.voice_memo_dir = voice_memo_dir.resolve()  # Store resolved path
         self.transcript_dir = transcript_dir.resolve()  # Store resolved path
         self.summary_dir = summary_dir.resolve()  # Store resolved path
+        self.draft_dir = voice_memo_dir.parent / "drafts"  # New directory for blog posts
+        self.prompt_dir = voice_memo_dir.parent / "prompts"  # New directory for app ideas
         self.processing_lock = False
         self.base_dir = Path(__file__).parent.parent
+
+        # Create additional directories if they don't exist
+        self.draft_dir.mkdir(parents=True, exist_ok=True)
+        self.prompt_dir.mkdir(parents=True, exist_ok=True)
+
     def on_created(self, event):
         if self.processing_lock:
             return
@@ -25,12 +77,7 @@ class VoiceMemoHandler(FileSystemEventHandler):
         # Handle new voice memo
         if file_path.parent == self.voice_memo_dir and file_path.suffix.lower() == '.m4a':
             print(f"\nNew voice memo detected: {file_path.name}")
-            self.process_voice_memo()
-
-        # Handle new transcript
-        elif file_path.parent == self.transcript_dir and file_path.suffix.lower() == '.txt':
-            print(f"\nNew transcript detected: {file_path.name}")
-            self.process_transcript()
+            self.process_voice_memo(file_path)
     def on_modified(self, event):
         if self.processing_lock:
@@ -42,12 +89,7 @@ class VoiceMemoHandler(FileSystemEventHandler):
         # Handle modified voice memo
         if file_path.parent == self.voice_memo_dir and file_path.suffix.lower() == '.m4a':
             print(f"\nVoice memo modified: {file_path.name}")
-            self.process_voice_memo()
-
-        # Handle modified transcript
-        elif file_path.parent == self.transcript_dir and file_path.suffix.lower() == '.txt':
-            print(f"\nTranscript modified: {file_path.name}")
-            self.process_transcript()
+            self.process_voice_memo(file_path)
     def on_deleted(self, event):
         if self.processing_lock:
@@ -60,55 +102,83 @@ class VoiceMemoHandler(FileSystemEventHandler):
         if file_path.parent == self.voice_memo_dir and file_path.suffix.lower() == '.m4a':
             print(f"\nVoice memo deleted: {file_path.name}")
             print(" Note: Corresponding transcript and summary files remain unchanged")
-
-        # Handle deleted transcript
-        elif file_path.parent == self.transcript_dir and file_path.suffix.lower() == '.txt':
-            print(f"\nTranscript deleted: {file_path.name}")
-            print(" Note: Corresponding summary file remains unchanged")
-            # Reprocess voice memos to regenerate the transcript
-            print(" Regenerating transcript...")
-            self.process_voice_memo()
-
-        # Handle deleted summary
-        elif file_path.parent == self.summary_dir and file_path.suffix.lower() == '.txt':
-            print(f"\nSummary deleted: {file_path.name}")
-            # Regenerate the summary
-            print(" Regenerating summary...")
-            self.process_transcript()
-    def process_voice_memo(self):
-        """Run the voice memo processing script"""
+    def process_voice_memo(self, voice_memo_path: Path):
+        """Process a single voice memo file"""
         try:
             self.processing_lock = True
-            print("Processing voice memo...")
-            result = subprocess.run([str(self.base_dir / 'src' / 'process_voice_memos.sh')],
+            print(f"Processing voice memo: {voice_memo_path.name}")
+
+            # Generate transcript
+            result = subprocess.run([str(self.base_dir / 'src' / 'process_voice_memos.sh'), str(voice_memo_path)],
                                    capture_output=True,
                                    text=True)
+
             if result.returncode == 0:
-                print("Voice memo processing completed successfully")
+                print("Voice memo transcription completed successfully")
+
+                # Get the transcript file path
+                transcript_file = self.transcript_dir / f"{voice_memo_path.stem}.txt"
+                if not transcript_file.exists():
+                    print(f"Error: Transcript file not found at {transcript_file}")
+                    return
+
+                # Process the transcript
+                self.process_transcript(transcript_file)
             else:
-                print(f"Error processing voice memo: {result.stderr}")
+                print(f"Error transcribing voice memo: {result.stderr}")
+
         except Exception as e:
-            print(f"Error running voice memo script: {str(e)}")
+            print(f"Error processing voice memo: {str(e)}")
         finally:
             self.processing_lock = False
 
-    def process_transcript(self):
-        """Run the transcript summarization script"""
+    def process_transcript(self, transcript_file: Path):
+        """Process a single transcript file"""
         try:
-            self.processing_lock = True
-            print("Processing transcript...")
-            result = subprocess.run(['python', str(self.base_dir / 'src' / 'summarize_transcripts.py')],
-                                   capture_output=True,
-                                   text=True)
-            if result.returncode == 0:
-                print("Transcript processing completed successfully")
-            else:
-                print(f"Error processing transcript: {result.stderr}")
+            print(f"Processing transcript: {transcript_file.name}")
+
+            # Read transcript
+            with open(transcript_file, 'r', encoding='utf-8') as f:
+                transcript_text = f.read()
+
+            word_count = count_words(transcript_text)
+            print(f" Read transcript ({len(transcript_text)} characters, {word_count} words)")
+
+            # Skip if transcript is too short
+            if word_count <= 210:
+                print(" Transcript is too short (≤210 words), skipping processing")
+                return
+
+            # Generate summary
+            print(" Generating summary...")
+            summary = generate_summary(transcript_text)
+
+            # Save summary
+            summary_file = self.summary_dir / f"{transcript_file.stem}_summary.txt"
+            with open(summary_file, 'w', encoding='utf-8') as f:
+                f.write(summary)
+            print(f" Summary saved to {summary_file}")
+
+            # Determine content type and generate additional content if needed
+            content_type = determine_content_type(transcript_text)
+            if content_type != "default":
+                print(f" Generating additional content for type: {content_type}")
+                additional_content = generate_additional_content(content_type, transcript_text, summary)
+
+                # Save to appropriate directory with timestamp
+                timestamp = time.strftime("%Y%m%d_%H%M%S")
+                if content_type == "blog_post":
+                    output_file = self.draft_dir / f"{transcript_file.stem}_{timestamp}.md"
+                else:  # idea_app
+                    output_file = self.prompt_dir / f"{transcript_file.stem}_{timestamp}.md"
+
+                with open(output_file, 'w', encoding='utf-8') as f:
+                    f.write(additional_content)
+                print(f" Additional content saved to {output_file}")
+
         except Exception as e:
-            print(f"Error running summarization script: {str(e)}")
-        finally:
-            self.processing_lock = False
+            print(f"Error processing transcript: {str(e)}")
 def main():
     # Set up directory paths
@@ -139,17 +209,13 @@ def main():
     event_handler = VoiceMemoHandler(voice_memo_dir, transcript_dir, summary_dir)
     observer = Observer()
-    # Watch all directories using resolved paths to handle symlinks
+    # Watch voice memo directory only (we'll handle transcripts immediately after creation)
     observer.schedule(event_handler, str(voice_memo_dir.resolve()), recursive=False)
-    observer.schedule(event_handler, str(transcript_dir.resolve()), recursive=False)
-    observer.schedule(event_handler, str(summary_dir.resolve()), recursive=False)
     # Start the observer
     observer.start()
     print(f"\nWatching for changes in:")
     print(f"- Voice memos: {voice_memo_dir.resolve()}")
-    print(f"- Transcripts: {transcript_dir.resolve()}")
-    print(f"- Summaries: {summary_dir.resolve()}")
     print("\nPress Ctrl+C to stop...")
     try: