Source code for continuity.core.mailbox

"""
Core mailbox functionality for Continuity.

This module provides the main business logic for managing asynchronous
messages between humans and AI assistants. It handles message storage,
retrieval, and organization using various storage backends.

The module contains the core classes for message handling:
- :class:`Message`: Represents individual messages with metadata
- :class:`MailboxManager`: High-level mailbox operations

Example:
    Basic usage::

        from continuity.core.mailbox import MailboxManager, Message
        from continuity.config.base import ContinuityConfig

        config = ContinuityConfig.load()
        manager = MailboxManager(config)

        # Create and send a message
        message = Message(
            id="msg-001",
            content="Hello AI assistant!",
            timestamp=datetime.now(),
            sender="human_user",
            recipient="ai_user"
        )

        success = manager.send_message(message)
"""

import hashlib
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from ..config.base import ContinuityConfig
from .constants import CONTINUITY_DIR_NAME


[docs] class Message: """Represents a message in the async mailbox system. A Message encapsulates all the information needed for asynchronous communication between humans and AI assistants. Messages support rich metadata, read/unread status tracking, and serialization. Args: id: Unique identifier for the message content: The main message content (supports Markdown) timestamp: When the message was created sender: Username of the message sender recipient: Username of the message recipient read: Whether the message has been read (defaults to False) title: Optional message title (extracted from content if not provided) metadata: Optional dictionary of additional metadata Attributes: id (str): Unique message identifier content (str): Message content timestamp (datetime): Creation timestamp sender (str): Sender username recipient (str): Recipient username read (bool): Read status title (str): Message title metadata (Dict[str, Any]): Additional metadata Example: Creating a simple message:: message = Message( id="msg-001", content="Hello Ada! How are you?", timestamp=datetime.now(), sender="human", recipient="agent" ) Creating a message with metadata:: message = Message( id="msg-002", content="Please review the API design", timestamp=datetime.now(), sender="human_user", recipient="ai_user", title="API Review Request", metadata={ "priority": "high", "project": "continuity", "tags": ["api", "review"] } ) """ def __init__( self, id: str, content: str, timestamp: datetime, sender: str, recipient: str, read: bool = False, title: str = "", metadata: Optional[Dict[str, Any]] = None, ): self.id = id self.content = content self.timestamp = timestamp self.sender = sender self.recipient = recipient self.read = read self.title = title self.metadata = metadata or {}
[docs] def to_dict(self) -> Dict[str, Any]: """Serialize message to dictionary format. Converts the message object to a dictionary suitable for JSON serialization or storage. The timestamp is converted to ISO format. Returns: Dictionary containing all message fields with JSON-serializable values. Example: >>> message = Message(id="test", content="Hello", ...) >>> data = message.to_dict() >>> print(data['timestamp']) '2023-06-17T12:00:00' """ return { "id": self.id, "content": self.content, "title": self.title, "timestamp": self.timestamp.isoformat(), "sender": self.sender, "recipient": self.recipient, "read": self.read, "metadata": self.metadata, }
[docs] @classmethod def from_file(cls, filepath: Path) -> "Message": """Load message from file with frontmatter parsing. Reads a message from a markdown file with YAML frontmatter. The frontmatter can contain metadata like sender, recipient, title, etc. If no title is provided in frontmatter, it will be extracted from the first heading in the content or derived from the filename. Args: filepath: Path to the message file to load Returns: Message object parsed from the file Raises: FileNotFoundError: If the specified file doesn't exist PermissionError: If the file cannot be read Example: File structure:: --- from: human to: agent priority: high --- # API Review Request Please review the new API design... Loading the message:: >>> message = Message.from_file(Path("api-review.md")) >>> print(message.sender) 'michael' >>> print(message.title) 'API Review Request' """ with open(filepath, encoding="utf-8") as f: content = f.read() # Parse frontmatter if present metadata = {} sender = "unknown" recipient = "unknown" title = "" if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: frontmatter = parts[1].strip() content = parts[2].strip() # Parse YAML-like frontmatter for line in frontmatter.split("\n"): if ":" in line: key, value = line.split(":", 1) key = key.strip() value = value.strip() if key == "from": sender = value elif key == "to": recipient = value elif key == "title": title = value else: metadata[key] = value # Extract title from content if not in frontmatter if not title: lines = content.split("\n") for line in lines: if line.startswith("# "): title = line[2:].strip() break if not title: title = filepath.stem.replace("_", " ").replace("-", " ").title() return cls( id=filepath.stem, content=content, title=title, timestamp=datetime.fromtimestamp(filepath.stat().st_mtime), sender=sender, recipient=recipient, read=(filepath.parent.name == "read"), metadata=metadata, )
[docs] class MailboxManager: """Core mailbox functionality.""" def __init__(self, base_path: Path, config: ContinuityConfig): self.config = config self.base_path = base_path self.base_path.mkdir(parents=True, exist_ok=True) def _get_mailbox_directory(self, user: str) -> str: """Map user names to mailbox directories using agent/human structure.""" # Handle AI agent names consistently if user in ["agent"] or user == self.config.ai_agent_nickname: return "agent" # Handle human user names consistently elif user in ["human"] or user == self.config.human_user: return "human" else: # Default fallback - assume it's the human user return "human" def _get_mailbox_path(self, user: str) -> Path: """Get the mailbox path for a user.""" mailbox_dir = self._get_mailbox_directory(user) return self.base_path / mailbox_dir / "mailbox" def _get_read_path(self, user: str) -> Path: """Get the read mailbox path for a user.""" mailbox_dir = self._get_mailbox_directory(user) return self.base_path / mailbox_dir / "mailbox" / "read" def _get_archive_path(self, user: str) -> Path: """Get the archive path for a user.""" mailbox_dir = self._get_mailbox_directory(user) return self.base_path / mailbox_dir / "mailbox" / "archive"
[docs] def check_mailbox(self, user: Optional[str] = None) -> List[Message]: """Check mailbox for all messages (unread, read, and archived).""" if user is None: user = self.config.ai_user all_messages = [] all_messages.extend(self.get_unread_messages(user)) all_messages.extend(self.get_read_messages(user)) all_messages.extend(self.get_archived_messages(user)) return sorted(all_messages, key=lambda m: m.timestamp, reverse=True)
[docs] def get_unread_messages(self, user: Optional[str] = None) -> List[Message]: """Get only unread messages.""" if user is None: user = self.config.ai_agent_nickname mailbox_path = self._get_mailbox_path(user) if not mailbox_path.exists(): return [] messages = [] for msg_file in mailbox_path.glob("*.md"): if msg_file.is_file(): try: message = Message.from_file(msg_file) messages.append(message) except Exception as e: # Skip problematic files but log the issue print(f"Warning: Could not parse {msg_file}: {e}") return sorted(messages, key=lambda m: m.timestamp, reverse=True)
[docs] def get_read_messages(self, user: Optional[str] = None) -> List[Message]: """Get read messages from the read folder.""" if user is None: user = self.config.ai_agent_nickname read_path = self._get_read_path(user) if not read_path.exists(): return [] messages = [] for msg_file in read_path.glob("*.md"): if msg_file.is_file(): try: message = Message.from_file(msg_file) message.read = True # Ensure read status is set messages.append(message) except Exception as e: print(f"Warning: Could not parse {msg_file}: {e}") return sorted(messages, key=lambda m: m.timestamp, reverse=True)
[docs] def get_message_by_id(self, user: str, message_id: str) -> Optional[Message]: """Get a specific message by ID from unread, read, and archive folders.""" # Check unread first mailbox_path = self._get_mailbox_path(user) msg_files = list(mailbox_path.glob(f"{message_id}*.md")) if msg_files: try: return Message.from_file(msg_files[0]) except Exception as e: print(f"Warning: Could not parse {msg_files[0]}: {e}") # Check read folder read_path = self._get_read_path(user) if read_path.exists(): read_files = list(read_path.glob(f"{message_id}*.md")) if read_files: try: message = Message.from_file(read_files[0]) message.read = True return message except Exception as e: print(f"Warning: Could not parse {read_files[0]}: {e}") # Check archive folder archive_path = self._get_archive_path(user) if archive_path.exists(): archive_files = list(archive_path.glob(f"{message_id}*.md")) if archive_files: try: message = Message.from_file(archive_files[0]) message.read = True # Archived messages are considered read return message except Exception as e: print(f"Warning: Could not parse {archive_files[0]}: {e}") return None
[docs] def send_message( self, content: str, to: Optional[str] = None, from_user: Optional[str] = None, title: str = "", ) -> Message: """Send a message to a mailbox.""" if to is None: to = self.config.human_user if from_user is None: from_user = self.config.ai_agent_nickname mailbox_path = self._get_mailbox_path(to) mailbox_path.mkdir(parents=True, exist_ok=True) # Create message with hash-based unique ID timestamp = datetime.now() # Generate unique hash-based ID from content + timestamp + sender content_hash_input = f"{content}{timestamp.isoformat()}{from_user}{to}" content_hash = hashlib.sha256(content_hash_input.encode("utf-8")).hexdigest()[ :8 ] # Create readable ID: date + time + hash readable_timestamp = timestamp.strftime("%Y-%m-%d-%H%M%S") msg_id = f"{readable_timestamp}-{content_hash}" # Create filename with title if provided if title: # Sanitize title for filename safe_title = re.sub(r"[^\w\s-]", "", title.lower()) safe_title = re.sub(r"[-\s]+", "-", safe_title) filename = f"{msg_id}-{safe_title}.md" else: filename = f"{msg_id}-message.md" filepath = mailbox_path / filename # Write message with metadata with open(filepath, "w", encoding="utf-8") as f: f.write("---\n") f.write(f"from: {from_user}\n") f.write(f"to: {to}\n") f.write(f"timestamp: {timestamp.isoformat()}\n") if title: f.write(f"title: {title}\n") f.write("---\n\n") f.write(content) # Return Message with the actual file stem as ID for consistency return Message(filepath.stem, content, timestamp, from_user, to, title=title)
[docs] def mark_read(self, user: str, message_id: str) -> bool: """Mark a message as read by moving it.""" mailbox_path = self._get_mailbox_path(user) read_path = self._get_read_path(user) read_path.mkdir(parents=True, exist_ok=True) # Find the message file (might have additional suffix) msg_files = list(mailbox_path.glob(f"{message_id}*.md")) if not msg_files: return False msg_file = msg_files[0] # Take first match if msg_file.exists(): msg_file.rename(read_path / msg_file.name) return True return False
[docs] def mark_unread(self, user: str, message_id: str) -> bool: """Mark a message as unread by moving it back from read folder.""" mailbox_path = self._get_mailbox_path(user) read_path = self._get_read_path(user) # Find the message file in read folder msg_files = list(read_path.glob(f"{message_id}*.md")) if not msg_files: return False msg_file = msg_files[0] # Take first match if msg_file.exists(): mailbox_path.mkdir(parents=True, exist_ok=True) msg_file.rename(mailbox_path / msg_file.name) return True return False
[docs] def delete_message(self, user: str, message_id: str) -> bool: """Delete a message permanently.""" mailbox_path = self._get_mailbox_path(user) read_path = self._get_read_path(user) # Try to find the message in both unread and read folders msg_files = list(mailbox_path.glob(f"{message_id}*.md")) if not msg_files and read_path.exists(): msg_files = list(read_path.glob(f"{message_id}*.md")) if not msg_files: return False msg_file = msg_files[0] # Take first match if msg_file.exists(): msg_file.unlink() # Delete the file return True return False
[docs] def archive_message( self, user: str, message_id: str, from_read: bool = False ) -> bool: """Archive a message by moving it to archive folder.""" source_path = ( self._get_read_path(user) if from_read else self._get_mailbox_path(user) ) archive_path = self._get_archive_path(user) archive_path.mkdir(parents=True, exist_ok=True) msg_files = list(source_path.glob(f"{message_id}*.md")) if not msg_files: return False msg_file = msg_files[0] if msg_file.exists(): msg_file.rename(archive_path / msg_file.name) return True return False
[docs] def get_archived_messages(self, user: str) -> List[Message]: """Get all archived messages for a user.""" archive_path = self._get_archive_path(user) if not archive_path.exists(): return [] messages = [] for msg_file in sorted(archive_path.glob("*.md"), reverse=True): if msg_file.is_file(): try: message = Message.from_file(msg_file) messages.append(message) except Exception as e: # Skip problematic files but log the issue print(f"Warning: Could not parse {msg_file}: {e}") return messages
[docs] def unarchive_message( self, user: str, message_id: str, to_unread: bool = False ) -> bool: """Restore a message from archive.""" archive_path = self._get_archive_path(user) target_path = ( self._get_mailbox_path(user) if to_unread else self._get_read_path(user) ) target_path.mkdir(parents=True, exist_ok=True) msg_files = list(archive_path.glob(f"{message_id}*.md")) if not msg_files: return False msg_file = msg_files[0] if msg_file.exists(): msg_file.rename(target_path / msg_file.name) return True return False
[docs] class ContinuityCore: """Core Continuity functionality.""" def __init__(self, base_path: Path, config: ContinuityConfig): self.config = config self.base_path = base_path # Only work with proper .continuity projects project_continuity = base_path / CONTINUITY_DIR_NAME if not project_continuity.exists(): from ..cli.commands import UserError raise UserError( f"No continuity project found at {base_path}", "Run 'continuity init' to initialize a project first", ) self.mailbox = MailboxManager(project_continuity, config=self.config) self.storage_path = self.config.get_storage_path() # Don't auto-create storage path - let individual operations handle it # Hello world functionality self.hello_history = self.storage_path / "hello_history.jsonl" # Focus mode state self.focus_file = self.storage_path / "focus_state.json" self._focus_state = self._load_focus_state()
[docs] def say_hello(self, name: str = "World") -> Dict[str, Any]: """Hello world with persistence.""" timestamp = datetime.now() entry = { "timestamp": timestamp.isoformat(), "name": name, "message": f"Hello, {name}!", } # Ensure storage directory exists self.storage_path.mkdir(parents=True, exist_ok=True) # Append to history with open(self.hello_history, "a") as f: f.write(json.dumps(entry) + "\n") return entry
[docs] def get_hello_history(self) -> List[Dict[str, Any]]: """Get hello history.""" if not self.hello_history.exists(): return [] history = [] with open(self.hello_history) as f: for line in f: if line.strip(): try: history.append(json.loads(line.strip())) except json.JSONDecodeError: continue return history
[docs] def get_status( self, max_messages: int = 5, timeout_seconds: int = 10 ) -> Dict[str, Any]: """Get overall continuity status with safety limits. Args: max_messages: Maximum recent messages to include timeout_seconds: Maximum time to spend gathering status """ import signal def timeout_handler(signum, frame): raise TimeoutError("Status check timed out") # Set timeout signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(timeout_seconds) try: unread_agent = self.mailbox.get_unread_messages( self.config.ai_agent_nickname )[:max_messages] unread_human = self.mailbox.get_unread_messages(self.config.human_user)[ :max_messages ] # Get archived counts archived_agent = self.mailbox.get_archived_messages( self.config.ai_agent_nickname ) archived_human = self.mailbox.get_archived_messages(self.config.human_user) # Only include summary info for messages to prevent memory issues recent_messages = [] for msg in (unread_agent + unread_human)[:max_messages]: try: msg_dict = msg.to_dict() # Truncate content for status display if len(msg_dict.get("content", "")) > 200: msg_dict["content"] = msg_dict["content"][:200] + "..." recent_messages.append(msg_dict) except Exception: # Skip problematic messages continue # Determine effective storage path for display project_continuity = self.base_path / CONTINUITY_DIR_NAME effective_storage = ( project_continuity if project_continuity.exists() else self.base_path ) return { "mailbox": { "ai_unread": len(unread_agent), "human_unread": len(unread_human), "ai_archived": len(archived_agent), "human_archived": len(archived_human), "recent_messages": recent_messages, }, "storage_path": str(self.storage_path), "base_path": str(effective_storage), } except TimeoutError: # Determine effective storage path for display project_continuity = self.base_path / CONTINUITY_DIR_NAME effective_storage = ( project_continuity if project_continuity.exists() else self.base_path ) return { "error": "Status check timed out", "mailbox": { "ai_unread": 0, "human_unread": 0, "ai_archived": 0, "human_archived": 0, "recent_messages": [], }, "storage_path": str(self.storage_path), "base_path": str(effective_storage), } finally: signal.alarm(0) # Clear the alarm
[docs] def prime_context(self, project_path: Optional[Path] = None) -> Dict[str, Any]: """Prime context with project information and communication updates.""" result: Dict[str, Any] = { "files_read": [], "messages_processed": 0, "summary": [], } # Load project documentation if project_path: readme_path = project_path / "README.md" ai_instructions_path = project_path / "CLAUDE.md" if readme_path.exists(): result["files_read"].append(str(readme_path)) result["summary"].append( f"Loaded project README from {project_path.name}" ) if ai_instructions_path.exists(): result["files_read"].append(str(ai_instructions_path)) result["summary"].append( f"Loaded project instructions from {project_path.name}" ) # Check mailbox unread_messages = self.mailbox.get_unread_messages(self.config.ai_user) result["messages_processed"] = len(unread_messages) if unread_messages: result["summary"].append(f"Found {len(unread_messages)} unread messages") return result
def _load_focus_state(self) -> Optional[Dict[str, Any]]: """Load focus state from file.""" if not self.focus_file.exists(): return None try: with open(self.focus_file) as f: return json.load(f) except (OSError, json.JSONDecodeError): return None def _save_focus_state(self) -> None: """Save focus state to file.""" if self._focus_state: # Ensure storage directory exists self.storage_path.mkdir(parents=True, exist_ok=True) with open(self.focus_file, "w") as f: json.dump(self._focus_state, f, indent=2) elif self.focus_file.exists(): self.focus_file.unlink()
[docs] def mark_message_read(self, message_id: str, user: str) -> bool: """Mark a message as read.""" return self.mailbox.mark_read(user, message_id)