mirror of
https://github.com/aljazceru/vibeline.git
synced 2026-01-15 20:44:27 +01:00
105 lines
3.6 KiB
Python
Executable File
105 lines
3.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import sys
|
|
import time
|
|
import subprocess
|
|
from pathlib import Path
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
|
|
class VoiceMemoHandler(FileSystemEventHandler):
    """React to newly created voice memos and transcripts by running scripts.

    A created ``.m4a`` file whose parent equals ``voice_memo_dir`` triggers
    ``./process_voice_memos.sh``; a created ``.txt`` file whose parent equals
    ``transcript_dir`` triggers ``./summarize_transcripts.py``. Both commands
    are given as relative paths, so they are resolved against the current
    working directory of the watcher process.
    """

    def __init__(self, voice_memo_dir: Path, transcript_dir: Path):
        """Store the two watched directories and clear the busy flag.

        Args:
            voice_memo_dir: Directory in which new ``.m4a`` files are expected.
            transcript_dir: Directory in which new ``.txt`` files are expected.
        """
        super().__init__()
        self.voice_memo_dir = voice_memo_dir
        self.transcript_dir = transcript_dir
        # True while one of the scripts is running; on_created ignores
        # every event that arrives while this flag is set.
        self.processing_lock = False

    def on_created(self, event):
        """Dispatch a file-creation event to the matching processing step.

        Directory events, events received while a script is running, and
        files that match neither directory/suffix pair are ignored.
        """
        if self.processing_lock or event.is_directory:
            return

        file_path = Path(event.src_path)
        parent = file_path.parent
        suffix = file_path.suffix.lower()

        # Handle new voice memo
        if parent == self.voice_memo_dir and suffix == '.m4a':
            print(f"\nNew voice memo detected: {file_path.name}")
            self.process_voice_memo()
        # Handle new transcript
        elif parent == self.transcript_dir and suffix == '.txt':
            print(f"\nNew transcript detected: {file_path.name}")
            self.process_transcript()

    def _run_script(self, command, start_message, success_message,
                    failure_prefix, exception_prefix):
        """Run *command* with the busy flag set and report the outcome.

        Args:
            command: Argument list passed to ``subprocess.run``.
            start_message: Printed before the command is started.
            success_message: Printed when the command exits with status 0.
            failure_prefix: Printed before the captured stderr when the
                command exits with a non-zero status.
            exception_prefix: Printed before the exception text when the
                command could not be run at all.
        """
        try:
            self.processing_lock = True
            print(start_message)
            result = subprocess.run(command, capture_output=True, text=True)
            if result.returncode == 0:
                print(success_message)
            else:
                print(f"{failure_prefix}: {result.stderr}")
        except Exception as e:
            # Best effort: a failing script must not take the watcher down.
            print(f"{exception_prefix}: {e}")
        finally:
            # Always release the flag so later events are handled again.
            self.processing_lock = False

    def process_voice_memo(self):
        """Run the voice memo processing script"""
        self._run_script(
            ['./process_voice_memos.sh'],
            "Processing voice memo...",
            "Voice memo processing completed successfully",
            "Error processing voice memo",
            "Error running voice memo script",
        )

    def process_transcript(self):
        """Run the transcript summarization script"""
        self._run_script(
            ['./summarize_transcripts.py'],
            "Processing transcript...",
            "Transcript processing completed successfully",
            "Error processing transcript",
            "Error running summarization script",
        )
|
|
|
|
def main():
    """Watch the voice memo and transcript directories until interrupted.

    Looks for ``VoiceMemos`` and ``VoiceMemos/transcripts`` relative to the
    current working directory. If either is missing or is not a directory,
    prints an error and exits with status 1. Otherwise it schedules one
    ``VoiceMemoHandler`` on both directories (non-recursively) and sleeps in
    a loop until Ctrl+C is pressed.
    """
    # Set up directory paths
    voice_memo_dir = Path("VoiceMemos")
    transcript_dir = voice_memo_dir / "transcripts"

    # Verify directories exist. is_dir() rather than exists(): a regular
    # file with the same name must not pass as a watchable directory.
    for required_dir in (voice_memo_dir, transcript_dir):
        if not required_dir.is_dir():
            print(f"Error: {required_dir} directory does not exist")
            sys.exit(1)

    # Set up event handler and observer
    event_handler = VoiceMemoHandler(voice_memo_dir, transcript_dir)
    observer = Observer()

    # Watch both directories
    for watched_dir in (voice_memo_dir, transcript_dir):
        observer.schedule(event_handler, str(watched_dir), recursive=False)

    # Start the observer
    observer.start()
    print("\nWatching for changes in:")
    print(f"- Voice memos: {voice_memo_dir}")
    print(f"- Transcripts: {transcript_dir}")
    print("\nPress Ctrl+C to stop...")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping file watcher...")
    finally:
        # Stop and wait for the observer on every exit path, not only Ctrl+C.
        observer.stop()
        observer.join()
|
|
|
|
# Start the watcher only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()