mirror of
https://github.com/aljazceru/vibeline.git
synced 2026-01-14 20:14:35 +01:00
Simplify watch_voice_memos.py and improve logging
This commit is contained in:
@@ -12,16 +12,19 @@ from watchdog.events import FileSystemEventHandler
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
# Set up logging
|
||||
# Set up logging with more detailed formatting
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
format='%(asctime)s - %(levelname)s - [%(name)s] %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S',
|
||||
handlers=[
|
||||
logging.FileHandler('voice_memo_watcher.log'),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
|
||||
logger = logging.getLogger('voice_memo_watcher')
|
||||
|
||||
def generate_summary(transcript_text: str) -> str:
|
||||
"""Generate a summary of the transcript."""
|
||||
prompt_dir = Path(__file__).parent.parent / "prompts"
|
||||
@@ -70,49 +73,68 @@ def get_file_modification_time(file_path):
|
||||
"""Get the last modification time of a file."""
|
||||
return os.path.getmtime(file_path)
|
||||
|
||||
def process_voice_memo(file_path):
|
||||
def process_voice_memo(file_path: Path) -> None:
|
||||
"""Process a voice memo file using process.sh."""
|
||||
try:
|
||||
logging.info(f"Processing voice memo: {file_path}")
|
||||
subprocess.run(['./process.sh', str(file_path)], check=True)
|
||||
logging.info(f"Successfully processed: {file_path}")
|
||||
logger.info(f"Processing voice memo: {file_path.name}")
|
||||
result = subprocess.run(
|
||||
['./process.sh', str(file_path)],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
logger.debug(f"Process output: {result.stdout}")
|
||||
logger.info(f"Successfully processed: {file_path.name}")
|
||||
except subprocess.CalledProcessError as e:
|
||||
logging.error(f"Error processing {file_path}: {e}")
|
||||
logger.error(f"Error processing {file_path.name}: {e}")
|
||||
logger.debug(f"Error output: {e.stderr}")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error processing {file_path.name}: {e}")
|
||||
|
||||
def watch_voice_memos():
|
||||
def watch_voice_memos() -> None:
|
||||
"""Watch the VoiceMemos directory for new or modified .m4a files."""
|
||||
voice_memos_dir = Path("VoiceMemos")
|
||||
processed_files = {} # Dictionary to store file paths and their last modification times
|
||||
|
||||
logging.info("Starting voice memo watcher...")
|
||||
logging.info(f"Watching directory: {voice_memos_dir}")
|
||||
# Verify the VoiceMemos directory exists
|
||||
if not voice_memos_dir.exists():
|
||||
logger.error(f"VoiceMemos directory does not exist at: {voice_memos_dir.absolute()}")
|
||||
return
|
||||
|
||||
while True:
|
||||
try:
|
||||
logger.info("Starting voice memo watcher")
|
||||
logger.info(f"Watching directory: {voice_memos_dir.absolute()}")
|
||||
|
||||
try:
|
||||
while True:
|
||||
# Recursively find all .m4a files
|
||||
current_files = set()
|
||||
for file_path in voice_memos_dir.rglob("*.m4a"):
|
||||
current_mtime = get_file_modification_time(file_path)
|
||||
|
||||
# Check if file is new or modified
|
||||
if str(file_path) not in processed_files or processed_files[str(file_path)] != current_mtime:
|
||||
logging.info(f"New or modified file detected: {file_path}")
|
||||
process_voice_memo(file_path)
|
||||
processed_files[str(file_path)] = current_mtime
|
||||
current_files.add(str(file_path))
|
||||
try:
|
||||
current_mtime = os.path.getmtime(file_path)
|
||||
|
||||
# Check if file is new or modified
|
||||
if str(file_path) not in processed_files or processed_files[str(file_path)] != current_mtime:
|
||||
process_voice_memo(file_path)
|
||||
processed_files[str(file_path)] = current_mtime
|
||||
except FileNotFoundError:
|
||||
logger.debug(f"File disappeared while processing: {file_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking file {file_path}: {e}")
|
||||
|
||||
# Check for deleted files
|
||||
for file_path in list(processed_files.keys()):
|
||||
if not os.path.exists(file_path):
|
||||
logging.info(f"File deleted: {file_path}")
|
||||
del processed_files[file_path]
|
||||
deleted_files = set(processed_files.keys()) - current_files
|
||||
for file_path in deleted_files:
|
||||
logger.info(f"File removed: {Path(file_path).name}")
|
||||
del processed_files[file_path]
|
||||
|
||||
time.sleep(1) # Wait for 1 second before next check
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Voice memo watcher stopped by user")
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error(f"Error in watch_voice_memos: {e}")
|
||||
time.sleep(5) # Wait longer if there's an error
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Watcher stopped by user")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in watcher: {e}")
|
||||
raise # Re-raise the exception after logging
|
||||
|
||||
def main():
|
||||
# Set up argument parser
|
||||
@@ -170,4 +192,8 @@ def main():
|
||||
observer.join()
|
||||
|
||||
if __name__ == "__main__":
|
||||
watch_voice_memos()
|
||||
try:
|
||||
watch_voice_memos()
|
||||
except Exception as e:
|
||||
logger.critical(f"Fatal error: {e}")
|
||||
raise
|
||||
Reference in New Issue
Block a user