Source code for continuity.core.inbox

"""
Inbox functionality for Continuity.

Provides automatic processing of files dropped into the inbox directory,
adding metadata and moving them to the appropriate mailbox.
"""

import re
import shutil
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml

from ..config.base import ContinuityConfig


[docs] class InboxProcessor: """Processes files dropped into the inbox directory.""" def __init__(self, base_path: Path, config: ContinuityConfig): """Initialize inbox processor. Args: base_path: Base directory containing .continuity config: Configuration instance """ self.config = config self.base_path = base_path self.inbox_path = base_path / "inbox" self.archive_path = self.inbox_path / "archive"
[docs] def ensure_directories(self) -> None: """Create inbox directories if they don't exist.""" self.inbox_path.mkdir(parents=True, exist_ok=True) self.archive_path.mkdir(parents=True, exist_ok=True)
[docs] def scan_inbox(self) -> List[Path]: """Get list of files in inbox to process. Returns: List of file paths to process """ if not self.inbox_path.exists(): return [] files = [] for item in self.inbox_path.iterdir(): # Skip directories and hidden files if ( item.is_file() and not item.name.startswith(".") and item.parent != self.archive_path ): files.append(item) return sorted(files) # Process in alphabetical order
[docs] def extract_metadata(self, content: str) -> Tuple[Dict[str, Any], str]: """Extract frontmatter metadata and content from file. Args: content: File content Returns: Tuple of (metadata dict, content without frontmatter) """ if content.startswith("---\n"): # Has frontmatter parts = content.split("---\n", 2) if len(parts) >= 3: try: metadata = yaml.safe_load(parts[1]) or {} return metadata, parts[2].strip() except yaml.YAMLError: pass # No frontmatter or invalid YAML return {}, content.strip()
[docs] def infer_recipient(self, content: str, metadata: Dict[str, Any]) -> str: """Determine recipient from explicit metadata only. Args: content: Message content (unused, for compatibility) metadata: Extracted metadata Returns: Recipient username (defaults to ai_user for inbox processing) """ # Check metadata first recipient = None if "to" in metadata: recipient = metadata["to"] elif "recipient" in metadata: recipient = metadata["recipient"] # Normalize recipient names to internal usernames if recipient: # Map display names to internal usernames if recipient.lower() == self.config.ai_agent_nickname.lower(): return self.config.ai_user elif recipient.lower() == self.config.human_user.lower(): return self.config.human_user # Return as-is if no mapping found return recipient # Inbox processing: always route to agent by default # This is predictable and follows Unix philosophy return self.config.ai_user
[docs] def extract_title( self, content: str, metadata: Dict[str, Any], filename: str ) -> str: """Extract or generate title for message. Args: content: Message content metadata: Extracted metadata filename: Original filename Returns: Title string """ # Check metadata first if "title" in metadata: return metadata["title"] # Look for markdown heading lines = content.split("\n") for line in lines[:5]: # Check first 5 lines if line.startswith("# "): return line[2:].strip() # Use filename without extension base_name = Path(filename).stem # Remove timestamp if present (YYYY-MM-DD-HHMMSS- prefix) if re.match(r"^\d{4}-\d{2}-\d{2}-\d{6}-", base_name): base_name = base_name[20:] # Convert underscores/hyphens to spaces and title case title = base_name.replace("_", " ").replace("-", " ").title() return title if title else "Untitled Message"
[docs] def process_file(self, filepath: Path, archive: bool = True) -> Optional[Path]: """Process a single file from inbox. Args: filepath: Path to file to process archive: Whether to archive the original file Returns: Path to created message file, or None if failed """ try: # Read file content content = filepath.read_text(encoding="utf-8") # Extract metadata and content metadata, clean_content = self.extract_metadata(content) # Determine recipient recipient = self.infer_recipient(clean_content, metadata) # Extract or generate title title = self.extract_title(clean_content, metadata, filepath.name) # Get sender (default to opposite of recipient) sender = metadata.get("from", metadata.get("sender")) if not sender: sender = ( self.config.human_user if recipient == self.config.ai_user else self.config.ai_user ) # Generate timestamp and filename timestamp = datetime.now() timestamp_str = timestamp.strftime("%Y-%m-%d-%H%M%S") # Clean title for filename clean_title = "".join( c for c in title if c.isalnum() or c in (" ", "-", "_") ).strip() clean_title = clean_title.replace(" ", "-").lower()[:50] # Limit length filename = ( f"{timestamp_str}-{clean_title}.md" if clean_title else f"{timestamp_str}-message.md" ) # Build final message content final_metadata = { "from": sender, "to": recipient, "timestamp": timestamp.isoformat(), } # Add title only if it's not already in the content as a heading if not clean_content.startswith(f"# {title}"): final_metadata["title"] = title # Any extra metadata from original for key, value in metadata.items(): if key not in [ "from", "to", "sender", "recipient", "title", "timestamp", ]: final_metadata[key] = value # Build final content yaml_str = yaml.dump( final_metadata, default_flow_style=False, sort_keys=False ) # Add title as heading if not present if not clean_content.startswith("#"): final_content = f"---\n{yaml_str}---\n\n# {title}\n\n{clean_content}" else: final_content = f"---\n{yaml_str}---\n\n{clean_content}" # Determine mailbox path mailbox_path = self.base_path / recipient / "mailbox" mailbox_path.mkdir(parents=True, exist_ok=True) # Write message file message_path = mailbox_path / filename message_path.write_text(final_content, encoding="utf-8") # Archive original if requested if archive: archive_dest = self.archive_path / f"{timestamp_str}-{filepath.name}" shutil.move(str(filepath), str(archive_dest)) return message_path except Exception as e: print(f"Error processing {filepath}: {e}") return None
[docs] def process_all(self, archive: bool = True) -> Dict[str, Any]: """Process all files in inbox. Args: archive: Whether to archive processed files Returns: Dict with processing results """ self.ensure_directories() files = self.scan_inbox() results: Dict[str, Any] = {"processed": [], "failed": [], "total": len(files)} for filepath in files: message_path = self.process_file(filepath, archive=archive) if message_path: results["processed"].append( { "original": filepath.name, "message": message_path.name, "recipient": message_path.parent.parent.name, } ) else: results["failed"].append(filepath.name) return results
[docs] def watch(self, callback=None, interval: int = 2) -> None: """Watch inbox for new files (future enhancement). Args: callback: Optional callback for each processed file interval: Check interval in seconds """ # This is a placeholder for future file watching functionality raise NotImplementedError("File watching not yet implemented")