Source code for continuity.core.notifications

"""
Notification system for tracking new activity across threads and mailboxes.

This module implements the pull notification system for Continuity,
providing gentle awareness of new activity without interruption.
"""

import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml

from ..config.base import ContinuityConfig
from .constants import CONTINUITY_DIR_NAME, NOTIFICATION_STATE_FILE


class NotificationState:
    """Manages notification state for tracking new activity.

    The state is a plain dict persisted as YAML at
    ``project_path / CONTINUITY_DIR_NAME / NOTIFICATION_STATE_FILE``. It is
    read once when the instance is created and rewritten after every
    mutation (``mark_checked``, ``mark_thread_seen``).

    Persistence is best-effort: an unreadable or malformed state file falls
    back to the default state, and a failed write is logged instead of
    raised, so notification bookkeeping never interrupts the caller.
    """

    def __init__(
        self, project_path: Path, config: Optional["ContinuityConfig"] = None
    ):
        """Load (or initialise) the notification state for a project.

        Args:
            project_path: Project root. The state file lives in the
                ``CONTINUITY_DIR_NAME`` directory beneath it; that
                directory is created if it does not exist.
            config: Configuration to use. A default ``ContinuityConfig()``
                is built when omitted.
        """
        self.project_path = project_path
        self.config = config or ContinuityConfig()
        self.state_file = (
            project_path / CONTINUITY_DIR_NAME / NOTIFICATION_STATE_FILE
        )
        self.state_file.parent.mkdir(parents=True, exist_ok=True)
        self._state = self._load_state()

    def _load_state(self) -> Dict[str, Any]:
        """Load notification state from file, falling back to defaults.

        Returns:
            The mapping stored in the state file; an empty dict when the
            file exists but holds no data; otherwise a fresh default state
            (file missing, unreadable, malformed, or not a mapping).
        """
        if self.state_file.exists():
            try:
                with open(self.state_file, encoding="utf-8") as f:
                    loaded = yaml.safe_load(f)
            except (OSError, UnicodeError, yaml.YAMLError) as exc:
                logging.getLogger(__name__).warning(
                    "Could not read notification state %s: %s",
                    self.state_file,
                    exc,
                )
            else:
                if not loaded:
                    return {}
                if isinstance(loaded, dict):
                    return loaded
                # A list or scalar document would break every later
                # ``self._state.get(...)``; treat it as corrupt.
                logging.getLogger(__name__).warning(
                    "Ignoring notification state %s: expected a mapping, got %s",
                    self.state_file,
                    type(loaded).__name__,
                )

        # Default state
        return {
            "last_check": datetime.now().isoformat(),
            "seen_posts": {},
            "user_preferences": {
                "notification_style": "gentle",
                "auto_mark_seen": False,
                "show_timestamps": True,
            },
            "project_name": self.project_path.name,
        }

    def _save_state(self) -> None:
        """Save notification state to file (best-effort, never raises I/O errors)."""
        try:
            # Serialise before touching the file so that a dump failure
            # cannot truncate the previously saved state. ``safe_dump``
            # mirrors ``safe_load``: anything it writes can be read back.
            text = yaml.safe_dump(self._state, default_flow_style=False)
            self.state_file.write_text(text, encoding="utf-8")
        except (OSError, yaml.YAMLError) as exc:
            logging.getLogger(__name__).warning(
                "Could not save notification state %s: %s", self.state_file, exc
            )

    def _section(self, key: str) -> Dict[str, Any]:
        """Return the mapping stored under ``key``, or ``{}`` if absent/invalid."""
        section = self._state.get(key)
        return section if isinstance(section, dict) else {}

    def get_last_check(self) -> datetime:
        """Get last check timestamp.

        Returns:
            The stored ``last_check`` value, or the current time when it is
            missing or cannot be interpreted as a timestamp.
        """
        raw = self._state.get("last_check")
        # YAML parses an unquoted ISO timestamp straight into a datetime.
        if isinstance(raw, datetime):
            return raw
        try:
            return datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            # TypeError: value missing or not a string; ValueError: bad format.
            return datetime.now()

    def mark_checked(self) -> None:
        """Mark current time as last check and persist the state."""
        self._state["last_check"] = datetime.now().isoformat()
        self._save_state()

    def get_seen_posts(self, thread_name: str) -> int:
        """Get number of seen posts for a thread (0 if unknown or invalid)."""
        count = self._section("seen_posts").get(thread_name, 0)
        return count if isinstance(count, int) else 0

    def mark_thread_seen(self, thread_name: str, post_count: int) -> None:
        """Mark a thread as seen up to a certain post count and persist it."""
        seen = self._state.get("seen_posts")
        if not isinstance(seen, dict):
            # Missing, null or otherwise unusable section: start afresh.
            seen = self._state["seen_posts"] = {}
        seen[thread_name] = post_count
        self._save_state()

    def get_notification_style(self) -> str:
        """Get user's notification style preference (default ``"gentle"``)."""
        return self._section("user_preferences").get(
            "notification_style", "gentle"
        )
class ActivityDetector:
    """Detects new activity in threads and mailboxes."""

    def __init__(self, notification_state: "NotificationState"):
        """Remember the state object used to look up per-thread seen counts."""
        self.state = notification_state

    def get_thread_activity(self, threads_path: Path) -> List[Dict[str, Any]]:
        """Get threads that have more posts than the recorded seen count.

        Every non-hidden sub-directory of ``threads_path`` is treated as a
        thread; its posts are the regular files named ``p*.md``.

        Args:
            threads_path: Directory containing one sub-directory per thread.

        Returns:
            One dict per thread with new posts, ordered by directory path,
            with keys ``thread_name``, ``new_posts``, ``total_posts`` and
            ``latest_timestamp`` (modification time of the newest post
            file, or ``None`` if the thread has no post files). Empty when
            ``threads_path`` is not an existing directory.
        """
        new_activity: List[Dict[str, Any]] = []

        # is_dir() rather than exists(): a regular file at this path would
        # make iterdir() raise NotADirectoryError.
        if not threads_path.is_dir():
            return new_activity

        # Sorted so the result does not depend on filesystem listing order.
        for thread_dir in sorted(threads_path.iterdir()):
            if thread_dir.name.startswith(".") or not thread_dir.is_dir():
                continue

            # Count posts in thread
            post_files = [
                entry
                for entry in thread_dir.iterdir()
                if entry.is_file()
                and entry.name.startswith("p")
                and entry.suffix == ".md"
            ]
            current_post_count = len(post_files)

            # Check if there are new posts
            seen_count = self.state.get_seen_posts(thread_dir.name)
            new_posts = current_post_count - seen_count
            if new_posts <= 0:
                continue

            # Newest modification time, stat'ing each post file once.
            latest_timestamp = None
            if post_files:
                latest_timestamp = datetime.fromtimestamp(
                    max(entry.stat().st_mtime for entry in post_files)
                )

            new_activity.append(
                {
                    "thread_name": thread_dir.name,
                    "new_posts": new_posts,
                    "total_posts": current_post_count,
                    "latest_timestamp": latest_timestamp,
                }
            )

        return new_activity

    def get_mailbox_activity(self, mailbox_path: Path, user: str) -> int:
        """Get unread message count for a user.

        Every regular ``*.md`` file directly inside
        ``mailbox_path / user / "mailbox"`` counts as one unread message.

        Args:
            mailbox_path: Directory holding one sub-directory per user.
            user: Name of the user's sub-directory.

        Returns:
            Number of message files, or 0 when the mailbox directory does
            not exist (or is not a directory).
        """
        user_mailbox = mailbox_path / user / "mailbox"
        if not user_mailbox.is_dir():
            return 0
        return sum(
            1
            for msg_file in user_mailbox.iterdir()
            if msg_file.is_file() and msg_file.suffix == ".md"
        )