"""
Inbox functionality for Continuity.
Provides automatic processing of files dropped into the inbox directory,
adding metadata and moving them to the appropriate mailbox.
"""
import re
import shutil
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
from ..config.base import ContinuityConfig
[docs]
class InboxProcessor:
"""Processes files dropped into the inbox directory."""
def __init__(self, base_path: Path, config: ContinuityConfig):
"""Initialize inbox processor.
Args:
base_path: Base directory containing .continuity
config: Configuration instance
"""
self.config = config
self.base_path = base_path
self.inbox_path = base_path / "inbox"
self.archive_path = self.inbox_path / "archive"
[docs]
def ensure_directories(self) -> None:
"""Create inbox directories if they don't exist."""
self.inbox_path.mkdir(parents=True, exist_ok=True)
self.archive_path.mkdir(parents=True, exist_ok=True)
[docs]
def scan_inbox(self) -> List[Path]:
"""Get list of files in inbox to process.
Returns:
List of file paths to process
"""
if not self.inbox_path.exists():
return []
files = []
for item in self.inbox_path.iterdir():
# Skip directories and hidden files
if (
item.is_file()
and not item.name.startswith(".")
and item.parent != self.archive_path
):
files.append(item)
return sorted(files) # Process in alphabetical order
[docs]
def infer_recipient(self, content: str, metadata: Dict[str, Any]) -> str:
"""Determine recipient from explicit metadata only.
Args:
content: Message content (unused, for compatibility)
metadata: Extracted metadata
Returns:
Recipient username (defaults to ai_user for inbox processing)
"""
# Check metadata first
recipient = None
if "to" in metadata:
recipient = metadata["to"]
elif "recipient" in metadata:
recipient = metadata["recipient"]
# Normalize recipient names to internal usernames
if recipient:
# Map display names to internal usernames
if recipient.lower() == self.config.ai_agent_nickname.lower():
return self.config.ai_user
elif recipient.lower() == self.config.human_user.lower():
return self.config.human_user
# Return as-is if no mapping found
return recipient
# Inbox processing: always route to agent by default
# This is predictable and follows Unix philosophy
return self.config.ai_user
[docs]
def process_file(self, filepath: Path, archive: bool = True) -> Optional[Path]:
"""Process a single file from inbox.
Args:
filepath: Path to file to process
archive: Whether to archive the original file
Returns:
Path to created message file, or None if failed
"""
try:
# Read file content
content = filepath.read_text(encoding="utf-8")
# Extract metadata and content
metadata, clean_content = self.extract_metadata(content)
# Determine recipient
recipient = self.infer_recipient(clean_content, metadata)
# Extract or generate title
title = self.extract_title(clean_content, metadata, filepath.name)
# Get sender (default to opposite of recipient)
sender = metadata.get("from", metadata.get("sender"))
if not sender:
sender = (
self.config.human_user
if recipient == self.config.ai_user
else self.config.ai_user
)
# Generate timestamp and filename
timestamp = datetime.now()
timestamp_str = timestamp.strftime("%Y-%m-%d-%H%M%S")
# Clean title for filename
clean_title = "".join(
c for c in title if c.isalnum() or c in (" ", "-", "_")
).strip()
clean_title = clean_title.replace(" ", "-").lower()[:50] # Limit length
filename = (
f"{timestamp_str}-{clean_title}.md"
if clean_title
else f"{timestamp_str}-message.md"
)
# Build final message content
final_metadata = {
"from": sender,
"to": recipient,
"timestamp": timestamp.isoformat(),
}
# Add title only if it's not already in the content as a heading
if not clean_content.startswith(f"# {title}"):
final_metadata["title"] = title
# Any extra metadata from original
for key, value in metadata.items():
if key not in [
"from",
"to",
"sender",
"recipient",
"title",
"timestamp",
]:
final_metadata[key] = value
# Build final content
yaml_str = yaml.dump(
final_metadata, default_flow_style=False, sort_keys=False
)
# Add title as heading if not present
if not clean_content.startswith("#"):
final_content = f"---\n{yaml_str}---\n\n# {title}\n\n{clean_content}"
else:
final_content = f"---\n{yaml_str}---\n\n{clean_content}"
# Determine mailbox path
mailbox_path = self.base_path / recipient / "mailbox"
mailbox_path.mkdir(parents=True, exist_ok=True)
# Write message file
message_path = mailbox_path / filename
message_path.write_text(final_content, encoding="utf-8")
# Archive original if requested
if archive:
archive_dest = self.archive_path / f"{timestamp_str}-{filepath.name}"
shutil.move(str(filepath), str(archive_dest))
return message_path
except Exception as e:
print(f"Error processing {filepath}: {e}")
return None
[docs]
def process_all(self, archive: bool = True) -> Dict[str, Any]:
"""Process all files in inbox.
Args:
archive: Whether to archive processed files
Returns:
Dict with processing results
"""
self.ensure_directories()
files = self.scan_inbox()
results: Dict[str, Any] = {"processed": [], "failed": [], "total": len(files)}
for filepath in files:
message_path = self.process_file(filepath, archive=archive)
if message_path:
results["processed"].append(
{
"original": filepath.name,
"message": message_path.name,
"recipient": message_path.parent.parent.name,
}
)
else:
results["failed"].append(filepath.name)
return results
[docs]
def watch(self, callback=None, interval: int = 2) -> None:
"""Watch inbox for new files (future enhancement).
Args:
callback: Optional callback for each processed file
interval: Check interval in seconds
"""
# This is a placeholder for future file watching functionality
raise NotImplementedError("File watching not yet implemented")