"""
Core mailbox functionality for Continuity.
This module provides the main business logic for managing asynchronous
messages between humans and AI assistants. It handles message storage,
retrieval, and organization using various storage backends.
The module contains the core classes for message handling:
- :class:`Message`: Represents individual messages with metadata
- :class:`MailboxManager`: High-level mailbox operations
Example:
Basic usage::
from continuity.core.mailbox import MailboxManager, Message
from continuity.config.base import ContinuityConfig
config = ContinuityConfig.load()
manager = MailboxManager(config)
# Create and send a message
message = Message(
id="msg-001",
content="Hello AI assistant!",
timestamp=datetime.now(),
sender="human_user",
recipient="ai_user"
)
success = manager.send_message(message)
"""
import hashlib
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from ..config.base import ContinuityConfig
from .constants import CONTINUITY_DIR_NAME
[docs]
class Message:
"""Represents a message in the async mailbox system.
A Message encapsulates all the information needed for asynchronous
communication between humans and AI assistants. Messages support
rich metadata, read/unread status tracking, and serialization.
Args:
id: Unique identifier for the message
content: The main message content (supports Markdown)
timestamp: When the message was created
sender: Username of the message sender
recipient: Username of the message recipient
read: Whether the message has been read (defaults to False)
title: Optional message title (extracted from content if not provided)
metadata: Optional dictionary of additional metadata
Attributes:
id (str): Unique message identifier
content (str): Message content
timestamp (datetime): Creation timestamp
sender (str): Sender username
recipient (str): Recipient username
read (bool): Read status
title (str): Message title
metadata (Dict[str, Any]): Additional metadata
Example:
Creating a simple message::
message = Message(
id="msg-001",
content="Hello Ada! How are you?",
timestamp=datetime.now(),
sender="human",
recipient="agent"
)
Creating a message with metadata::
message = Message(
id="msg-002",
content="Please review the API design",
timestamp=datetime.now(),
sender="human_user",
recipient="ai_user",
title="API Review Request",
metadata={
"priority": "high",
"project": "continuity",
"tags": ["api", "review"]
}
)
"""
def __init__(
self,
id: str,
content: str,
timestamp: datetime,
sender: str,
recipient: str,
read: bool = False,
title: str = "",
metadata: Optional[Dict[str, Any]] = None,
):
self.id = id
self.content = content
self.timestamp = timestamp
self.sender = sender
self.recipient = recipient
self.read = read
self.title = title
self.metadata = metadata or {}
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Serialize message to dictionary format.
Converts the message object to a dictionary suitable for JSON
serialization or storage. The timestamp is converted to ISO format.
Returns:
Dictionary containing all message fields with JSON-serializable values.
Example:
>>> message = Message(id="test", content="Hello", ...)
>>> data = message.to_dict()
>>> print(data['timestamp'])
'2023-06-17T12:00:00'
"""
return {
"id": self.id,
"content": self.content,
"title": self.title,
"timestamp": self.timestamp.isoformat(),
"sender": self.sender,
"recipient": self.recipient,
"read": self.read,
"metadata": self.metadata,
}
[docs]
@classmethod
def from_file(cls, filepath: Path) -> "Message":
"""Load message from file with frontmatter parsing.
Reads a message from a markdown file with YAML frontmatter.
The frontmatter can contain metadata like sender, recipient, title, etc.
If no title is provided in frontmatter, it will be extracted from
the first heading in the content or derived from the filename.
Args:
filepath: Path to the message file to load
Returns:
Message object parsed from the file
Raises:
FileNotFoundError: If the specified file doesn't exist
PermissionError: If the file cannot be read
Example:
File structure::
---
from: human
to: agent
priority: high
---
# API Review Request
Please review the new API design...
Loading the message::
>>> message = Message.from_file(Path("api-review.md"))
>>> print(message.sender)
'michael'
>>> print(message.title)
'API Review Request'
"""
with open(filepath, encoding="utf-8") as f:
content = f.read()
# Parse frontmatter if present
metadata = {}
sender = "unknown"
recipient = "unknown"
title = ""
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
frontmatter = parts[1].strip()
content = parts[2].strip()
# Parse YAML-like frontmatter
for line in frontmatter.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
key = key.strip()
value = value.strip()
if key == "from":
sender = value
elif key == "to":
recipient = value
elif key == "title":
title = value
else:
metadata[key] = value
# Extract title from content if not in frontmatter
if not title:
lines = content.split("\n")
for line in lines:
if line.startswith("# "):
title = line[2:].strip()
break
if not title:
title = filepath.stem.replace("_", " ").replace("-", " ").title()
return cls(
id=filepath.stem,
content=content,
title=title,
timestamp=datetime.fromtimestamp(filepath.stat().st_mtime),
sender=sender,
recipient=recipient,
read=(filepath.parent.name == "read"),
metadata=metadata,
)
[docs]
class MailboxManager:
"""Core mailbox functionality."""
def __init__(self, base_path: Path, config: ContinuityConfig):
self.config = config
self.base_path = base_path
self.base_path.mkdir(parents=True, exist_ok=True)
def _get_mailbox_directory(self, user: str) -> str:
"""Map user names to mailbox directories using agent/human structure."""
# Handle AI agent names consistently
if user in ["agent"] or user == self.config.ai_agent_nickname:
return "agent"
# Handle human user names consistently
elif user in ["human"] or user == self.config.human_user:
return "human"
else:
# Default fallback - assume it's the human user
return "human"
def _get_mailbox_path(self, user: str) -> Path:
"""Get the mailbox path for a user."""
mailbox_dir = self._get_mailbox_directory(user)
return self.base_path / mailbox_dir / "mailbox"
def _get_read_path(self, user: str) -> Path:
"""Get the read mailbox path for a user."""
mailbox_dir = self._get_mailbox_directory(user)
return self.base_path / mailbox_dir / "mailbox" / "read"
def _get_archive_path(self, user: str) -> Path:
"""Get the archive path for a user."""
mailbox_dir = self._get_mailbox_directory(user)
return self.base_path / mailbox_dir / "mailbox" / "archive"
[docs]
def check_mailbox(self, user: Optional[str] = None) -> List[Message]:
"""Check mailbox for all messages (unread, read, and archived)."""
if user is None:
user = self.config.ai_user
all_messages = []
all_messages.extend(self.get_unread_messages(user))
all_messages.extend(self.get_read_messages(user))
all_messages.extend(self.get_archived_messages(user))
return sorted(all_messages, key=lambda m: m.timestamp, reverse=True)
[docs]
def get_unread_messages(self, user: Optional[str] = None) -> List[Message]:
"""Get only unread messages."""
if user is None:
user = self.config.ai_agent_nickname
mailbox_path = self._get_mailbox_path(user)
if not mailbox_path.exists():
return []
messages = []
for msg_file in mailbox_path.glob("*.md"):
if msg_file.is_file():
try:
message = Message.from_file(msg_file)
messages.append(message)
except Exception as e:
# Skip problematic files but log the issue
print(f"Warning: Could not parse {msg_file}: {e}")
return sorted(messages, key=lambda m: m.timestamp, reverse=True)
[docs]
def get_read_messages(self, user: Optional[str] = None) -> List[Message]:
"""Get read messages from the read folder."""
if user is None:
user = self.config.ai_agent_nickname
read_path = self._get_read_path(user)
if not read_path.exists():
return []
messages = []
for msg_file in read_path.glob("*.md"):
if msg_file.is_file():
try:
message = Message.from_file(msg_file)
message.read = True # Ensure read status is set
messages.append(message)
except Exception as e:
print(f"Warning: Could not parse {msg_file}: {e}")
return sorted(messages, key=lambda m: m.timestamp, reverse=True)
[docs]
def get_message_by_id(self, user: str, message_id: str) -> Optional[Message]:
"""Get a specific message by ID from unread, read, and archive folders."""
# Check unread first
mailbox_path = self._get_mailbox_path(user)
msg_files = list(mailbox_path.glob(f"{message_id}*.md"))
if msg_files:
try:
return Message.from_file(msg_files[0])
except Exception as e:
print(f"Warning: Could not parse {msg_files[0]}: {e}")
# Check read folder
read_path = self._get_read_path(user)
if read_path.exists():
read_files = list(read_path.glob(f"{message_id}*.md"))
if read_files:
try:
message = Message.from_file(read_files[0])
message.read = True
return message
except Exception as e:
print(f"Warning: Could not parse {read_files[0]}: {e}")
# Check archive folder
archive_path = self._get_archive_path(user)
if archive_path.exists():
archive_files = list(archive_path.glob(f"{message_id}*.md"))
if archive_files:
try:
message = Message.from_file(archive_files[0])
message.read = True # Archived messages are considered read
return message
except Exception as e:
print(f"Warning: Could not parse {archive_files[0]}: {e}")
return None
[docs]
def send_message(
self,
content: str,
to: Optional[str] = None,
from_user: Optional[str] = None,
title: str = "",
) -> Message:
"""Send a message to a mailbox."""
if to is None:
to = self.config.human_user
if from_user is None:
from_user = self.config.ai_agent_nickname
mailbox_path = self._get_mailbox_path(to)
mailbox_path.mkdir(parents=True, exist_ok=True)
# Create message with hash-based unique ID
timestamp = datetime.now()
# Generate unique hash-based ID from content + timestamp + sender
content_hash_input = f"{content}{timestamp.isoformat()}{from_user}{to}"
content_hash = hashlib.sha256(content_hash_input.encode("utf-8")).hexdigest()[
:8
]
# Create readable ID: date + time + hash
readable_timestamp = timestamp.strftime("%Y-%m-%d-%H%M%S")
msg_id = f"{readable_timestamp}-{content_hash}"
# Create filename with title if provided
if title:
# Sanitize title for filename
safe_title = re.sub(r"[^\w\s-]", "", title.lower())
safe_title = re.sub(r"[-\s]+", "-", safe_title)
filename = f"{msg_id}-{safe_title}.md"
else:
filename = f"{msg_id}-message.md"
filepath = mailbox_path / filename
# Write message with metadata
with open(filepath, "w", encoding="utf-8") as f:
f.write("---\n")
f.write(f"from: {from_user}\n")
f.write(f"to: {to}\n")
f.write(f"timestamp: {timestamp.isoformat()}\n")
if title:
f.write(f"title: {title}\n")
f.write("---\n\n")
f.write(content)
# Return Message with the actual file stem as ID for consistency
return Message(filepath.stem, content, timestamp, from_user, to, title=title)
[docs]
def mark_read(self, user: str, message_id: str) -> bool:
"""Mark a message as read by moving it."""
mailbox_path = self._get_mailbox_path(user)
read_path = self._get_read_path(user)
read_path.mkdir(parents=True, exist_ok=True)
# Find the message file (might have additional suffix)
msg_files = list(mailbox_path.glob(f"{message_id}*.md"))
if not msg_files:
return False
msg_file = msg_files[0] # Take first match
if msg_file.exists():
msg_file.rename(read_path / msg_file.name)
return True
return False
[docs]
def mark_unread(self, user: str, message_id: str) -> bool:
"""Mark a message as unread by moving it back from read folder."""
mailbox_path = self._get_mailbox_path(user)
read_path = self._get_read_path(user)
# Find the message file in read folder
msg_files = list(read_path.glob(f"{message_id}*.md"))
if not msg_files:
return False
msg_file = msg_files[0] # Take first match
if msg_file.exists():
mailbox_path.mkdir(parents=True, exist_ok=True)
msg_file.rename(mailbox_path / msg_file.name)
return True
return False
[docs]
def delete_message(self, user: str, message_id: str) -> bool:
"""Delete a message permanently."""
mailbox_path = self._get_mailbox_path(user)
read_path = self._get_read_path(user)
# Try to find the message in both unread and read folders
msg_files = list(mailbox_path.glob(f"{message_id}*.md"))
if not msg_files and read_path.exists():
msg_files = list(read_path.glob(f"{message_id}*.md"))
if not msg_files:
return False
msg_file = msg_files[0] # Take first match
if msg_file.exists():
msg_file.unlink() # Delete the file
return True
return False
[docs]
def archive_message(
self, user: str, message_id: str, from_read: bool = False
) -> bool:
"""Archive a message by moving it to archive folder."""
source_path = (
self._get_read_path(user) if from_read else self._get_mailbox_path(user)
)
archive_path = self._get_archive_path(user)
archive_path.mkdir(parents=True, exist_ok=True)
msg_files = list(source_path.glob(f"{message_id}*.md"))
if not msg_files:
return False
msg_file = msg_files[0]
if msg_file.exists():
msg_file.rename(archive_path / msg_file.name)
return True
return False
[docs]
def get_archived_messages(self, user: str) -> List[Message]:
"""Get all archived messages for a user."""
archive_path = self._get_archive_path(user)
if not archive_path.exists():
return []
messages = []
for msg_file in sorted(archive_path.glob("*.md"), reverse=True):
if msg_file.is_file():
try:
message = Message.from_file(msg_file)
messages.append(message)
except Exception as e:
# Skip problematic files but log the issue
print(f"Warning: Could not parse {msg_file}: {e}")
return messages
[docs]
def unarchive_message(
self, user: str, message_id: str, to_unread: bool = False
) -> bool:
"""Restore a message from archive."""
archive_path = self._get_archive_path(user)
target_path = (
self._get_mailbox_path(user) if to_unread else self._get_read_path(user)
)
target_path.mkdir(parents=True, exist_ok=True)
msg_files = list(archive_path.glob(f"{message_id}*.md"))
if not msg_files:
return False
msg_file = msg_files[0]
if msg_file.exists():
msg_file.rename(target_path / msg_file.name)
return True
return False
[docs]
class ContinuityCore:
"""Core Continuity functionality."""
def __init__(self, base_path: Path, config: ContinuityConfig):
self.config = config
self.base_path = base_path
# Only work with proper .continuity projects
project_continuity = base_path / CONTINUITY_DIR_NAME
if not project_continuity.exists():
from ..cli.commands import UserError
raise UserError(
f"No continuity project found at {base_path}",
"Run 'continuity init' to initialize a project first",
)
self.mailbox = MailboxManager(project_continuity, config=self.config)
self.storage_path = self.config.get_storage_path()
# Don't auto-create storage path - let individual operations handle it
# Hello world functionality
self.hello_history = self.storage_path / "hello_history.jsonl"
# Focus mode state
self.focus_file = self.storage_path / "focus_state.json"
self._focus_state = self._load_focus_state()
[docs]
def say_hello(self, name: str = "World") -> Dict[str, Any]:
"""Hello world with persistence."""
timestamp = datetime.now()
entry = {
"timestamp": timestamp.isoformat(),
"name": name,
"message": f"Hello, {name}!",
}
# Ensure storage directory exists
self.storage_path.mkdir(parents=True, exist_ok=True)
# Append to history
with open(self.hello_history, "a") as f:
f.write(json.dumps(entry) + "\n")
return entry
[docs]
def get_hello_history(self) -> List[Dict[str, Any]]:
"""Get hello history."""
if not self.hello_history.exists():
return []
history = []
with open(self.hello_history) as f:
for line in f:
if line.strip():
try:
history.append(json.loads(line.strip()))
except json.JSONDecodeError:
continue
return history
[docs]
def get_status(
self, max_messages: int = 5, timeout_seconds: int = 10
) -> Dict[str, Any]:
"""Get overall continuity status with safety limits.
Args:
max_messages: Maximum recent messages to include
timeout_seconds: Maximum time to spend gathering status
"""
import signal
def timeout_handler(signum, frame):
raise TimeoutError("Status check timed out")
# Set timeout
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout_seconds)
try:
unread_agent = self.mailbox.get_unread_messages(
self.config.ai_agent_nickname
)[:max_messages]
unread_human = self.mailbox.get_unread_messages(self.config.human_user)[
:max_messages
]
# Get archived counts
archived_agent = self.mailbox.get_archived_messages(
self.config.ai_agent_nickname
)
archived_human = self.mailbox.get_archived_messages(self.config.human_user)
# Only include summary info for messages to prevent memory issues
recent_messages = []
for msg in (unread_agent + unread_human)[:max_messages]:
try:
msg_dict = msg.to_dict()
# Truncate content for status display
if len(msg_dict.get("content", "")) > 200:
msg_dict["content"] = msg_dict["content"][:200] + "..."
recent_messages.append(msg_dict)
except Exception:
# Skip problematic messages
continue
# Determine effective storage path for display
project_continuity = self.base_path / CONTINUITY_DIR_NAME
effective_storage = (
project_continuity if project_continuity.exists() else self.base_path
)
return {
"mailbox": {
"ai_unread": len(unread_agent),
"human_unread": len(unread_human),
"ai_archived": len(archived_agent),
"human_archived": len(archived_human),
"recent_messages": recent_messages,
},
"storage_path": str(self.storage_path),
"base_path": str(effective_storage),
}
except TimeoutError:
# Determine effective storage path for display
project_continuity = self.base_path / CONTINUITY_DIR_NAME
effective_storage = (
project_continuity if project_continuity.exists() else self.base_path
)
return {
"error": "Status check timed out",
"mailbox": {
"ai_unread": 0,
"human_unread": 0,
"ai_archived": 0,
"human_archived": 0,
"recent_messages": [],
},
"storage_path": str(self.storage_path),
"base_path": str(effective_storage),
}
finally:
signal.alarm(0) # Clear the alarm
[docs]
def prime_context(self, project_path: Optional[Path] = None) -> Dict[str, Any]:
"""Prime context with project information and communication updates."""
result: Dict[str, Any] = {
"files_read": [],
"messages_processed": 0,
"summary": [],
}
# Load project documentation
if project_path:
readme_path = project_path / "README.md"
ai_instructions_path = project_path / "CLAUDE.md"
if readme_path.exists():
result["files_read"].append(str(readme_path))
result["summary"].append(
f"Loaded project README from {project_path.name}"
)
if ai_instructions_path.exists():
result["files_read"].append(str(ai_instructions_path))
result["summary"].append(
f"Loaded project instructions from {project_path.name}"
)
# Check mailbox
unread_messages = self.mailbox.get_unread_messages(self.config.ai_user)
result["messages_processed"] = len(unread_messages)
if unread_messages:
result["summary"].append(f"Found {len(unread_messages)} unread messages")
return result
def _load_focus_state(self) -> Optional[Dict[str, Any]]:
"""Load focus state from file."""
if not self.focus_file.exists():
return None
try:
with open(self.focus_file) as f:
return json.load(f)
except (OSError, json.JSONDecodeError):
return None
def _save_focus_state(self) -> None:
"""Save focus state to file."""
if self._focus_state:
# Ensure storage directory exists
self.storage_path.mkdir(parents=True, exist_ok=True)
with open(self.focus_file, "w") as f:
json.dump(self._focus_state, f, indent=2)
elif self.focus_file.exists():
self.focus_file.unlink()
[docs]
def mark_message_read(self, message_id: str, user: str) -> bool:
"""Mark a message as read."""
return self.mailbox.mark_read(user, message_id)