"""
Notification system for tracking new activity across threads and mailboxes.
This module implements the pull notification system for Continuity,
providing gentle awareness of new activity without interruption.
"""
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from ..config.base import ContinuityConfig
from .constants import CONTINUITY_DIR_NAME, NOTIFICATION_STATE_FILE
[docs]
class NotificationState:
"""Manages notification state for tracking new activity."""
def __init__(self, project_path: Path, config: Optional[ContinuityConfig] = None):
self.project_path = project_path
self.config = config or ContinuityConfig()
self.state_file = project_path / CONTINUITY_DIR_NAME / NOTIFICATION_STATE_FILE
self.state_file.parent.mkdir(parents=True, exist_ok=True)
self._state = self._load_state()
def _load_state(self) -> Dict[str, Any]:
"""Load notification state from file."""
if self.state_file.exists():
try:
with open(self.state_file) as f:
return yaml.safe_load(f) or {}
except Exception:
pass
# Default state
return {
"last_check": datetime.now().isoformat(),
"seen_posts": {},
"user_preferences": {
"notification_style": "gentle",
"auto_mark_seen": False,
"show_timestamps": True,
},
"project_name": self.project_path.name,
}
def _save_state(self) -> None:
"""Save notification state to file."""
try:
with open(self.state_file, "w") as f:
yaml.dump(self._state, f, default_flow_style=False)
except Exception:
pass # Fail silently for now
[docs]
def get_last_check(self) -> datetime:
"""Get last check timestamp."""
try:
return datetime.fromisoformat(self._state["last_check"])
except (KeyError, ValueError):
return datetime.now()
[docs]
def mark_checked(self) -> None:
"""Mark current time as last check."""
self._state["last_check"] = datetime.now().isoformat()
self._save_state()
[docs]
def get_seen_posts(self, thread_name: str) -> int:
"""Get number of seen posts for a thread."""
return self._state.get("seen_posts", {}).get(thread_name, 0)
[docs]
def mark_thread_seen(self, thread_name: str, post_count: int) -> None:
"""Mark a thread as seen up to a certain post count."""
if "seen_posts" not in self._state:
self._state["seen_posts"] = {}
self._state["seen_posts"][thread_name] = post_count
self._save_state()
[docs]
def get_notification_style(self) -> str:
"""Get user's notification style preference."""
return self._state.get("user_preferences", {}).get(
"notification_style", "gentle"
)
[docs]
class ActivityDetector:
"""Detects new activity in threads and mailboxes."""
def __init__(self, notification_state: NotificationState):
self.state = notification_state
[docs]
def get_thread_activity(self, threads_path: Path) -> List[Dict[str, Any]]:
"""Get threads with new activity since last check."""
new_activity: List[Dict[str, Any]] = []
if not threads_path.exists():
return new_activity
for thread_dir in threads_path.iterdir():
if not thread_dir.is_dir() or thread_dir.name.startswith("."):
continue
# Count posts in thread
post_files = [
f
for f in thread_dir.iterdir()
if f.is_file() and f.name.startswith("p") and f.suffix == ".md"
]
current_post_count = len(post_files)
# Check if there are new posts
seen_count = self.state.get_seen_posts(thread_dir.name)
new_posts = current_post_count - seen_count
if new_posts > 0:
# Get the latest post timestamp
latest_timestamp = None
if post_files:
latest_file = max(post_files, key=lambda f: f.stat().st_mtime)
latest_timestamp = datetime.fromtimestamp(
latest_file.stat().st_mtime
)
new_activity.append(
{
"thread_name": thread_dir.name,
"new_posts": new_posts,
"total_posts": current_post_count,
"latest_timestamp": latest_timestamp,
}
)
return new_activity
[docs]
def get_mailbox_activity(self, mailbox_path: Path, user: str) -> int:
"""Get unread message count for a user."""
unread_count = 0
user_mailbox = mailbox_path / user / "mailbox"
if user_mailbox.exists():
for msg_file in user_mailbox.iterdir():
if msg_file.is_file() and msg_file.suffix == ".md":
unread_count += 1
return unread_count