Source code for continuity.core.init

"""
Simple project initialization functionality for Continuity.
Phase 3: Simplified to remove complex knowledge base and advanced features.
"""

import getpass
import socket
from pathlib import Path
from typing import Any, Dict, List, Optional

from ..config.base import ContinuityConfig
from .constants import (
    ABOUT_FILE,
    AGENT_DIR,
    AI_AGENT_REFERENCE_FILE,
    ARCHIVE_DIR,
    CLI_REFERENCE_FILE,
    CONFIG_FILE_NAME,
    CONTINUITY_DIR_NAME,
    HUMAN_DIR,
    INBOX_ARCHIVE_DIR,
    INBOX_DIR,
    MAILBOX_DIR,
    READ_DIR,
)


[docs] class ProjectInitializer: """Handles project initialization and setup.""" def __init__(self, project_path: Path, config: ContinuityConfig): """Initialize project initializer. Args: project_path: Path to initialize project in config: Configuration instance """ self.config = config self.project_path = project_path self.continuity_path = project_path / CONTINUITY_DIR_NAME self._generator_factory = None def _get_generator_factory(self): """Get document generator factory, creating if necessary. Returns: DocumentGeneratorFactory instance """ if self._generator_factory is None: from .generators import DocumentGeneratorFactory self._generator_factory = DocumentGeneratorFactory( self.project_path, self.config ) return self._generator_factory def _get_agent_mailbox_path(self) -> Path: """Get agent mailbox path.""" return self.continuity_path / AGENT_DIR / MAILBOX_DIR def _get_agent_mailbox_read_path(self) -> Path: """Get agent mailbox read path.""" return self.continuity_path / AGENT_DIR / MAILBOX_DIR / READ_DIR def _get_agent_mailbox_archive_path(self) -> Path: """Get agent mailbox archive path.""" return self.continuity_path / AGENT_DIR / MAILBOX_DIR / ARCHIVE_DIR def _get_human_mailbox_path(self) -> Path: """Get human mailbox path.""" return self.continuity_path / HUMAN_DIR / MAILBOX_DIR def _get_human_mailbox_read_path(self) -> Path: """Get human mailbox read path.""" return self.continuity_path / HUMAN_DIR / MAILBOX_DIR / READ_DIR def _get_inbox_path(self) -> Path: """Get inbox path.""" return self.continuity_path / INBOX_DIR def _get_inbox_archive_path(self) -> Path: """Get inbox archive path.""" return self.continuity_path / INBOX_DIR / INBOX_ARCHIVE_DIR def _get_cli_reference_path(self) -> Path: """Get CLI reference file path.""" return self.continuity_path / CLI_REFERENCE_FILE def _get_ai_agent_reference_path(self) -> Path: """Get AI agent reference file path.""" return self.continuity_path / AI_AGENT_REFERENCE_FILE def _get_config_path(self) -> Path: """Get config file path.""" return self.continuity_path / CONFIG_FILE_NAME def _get_gitignore_path(self) -> Path: """Get .gitignore file path.""" return self.project_path / ".gitignore" def _get_about_path(self) -> Path: """Get ABOUT.md file path.""" return self.continuity_path / ABOUT_FILE
[docs] def detect_human_user(self) -> str: """Detect a sensible human username. Returns: Username string """ # Try to get username from various sources username = None # Try git config first try: import subprocess result = subprocess.run( ["git", "config", "user.name"], capture_output=True, text=True, cwd=self.project_path, ) if result.returncode == 0 and result.stdout.strip(): git_name = result.stdout.strip() # Convert "First Last" to "first" for username username = git_name.split()[0].lower() except (subprocess.CalledProcessError, FileNotFoundError): pass # Fallback to system username if not username: username = getpass.getuser() # Fallback to hostname if username is generic if username in ["user", "admin", "root"]: username = socket.gethostname().split(".")[0].lower() # Clean username (alphanumeric only) username = "".join(c for c in username if c.isalnum()) return username or "user"
[docs] def create_directory_structure(self) -> Dict[str, Path]: """Create the basic .continuity directory structure. Returns: Dict mapping structure names to paths """ # Use agent/human structure as defined in schema.py structure = { "base": self.continuity_path, "agent_mailbox": self._get_agent_mailbox_path(), "agent_mailbox_read": self._get_agent_mailbox_read_path(), "agent_mailbox_archive": self._get_agent_mailbox_archive_path(), "human_mailbox": self._get_human_mailbox_path(), "human_mailbox_read": self._get_human_mailbox_read_path(), "inbox": self._get_inbox_path(), "inbox_archive": self._get_inbox_archive_path(), } # Create all directories for path in structure.values(): path.mkdir(parents=True, exist_ok=True) return structure
[docs] def create_cli_reference(self) -> Path: """Create the CLI reference file. Returns: Path to created CLI reference """ from .generators import CliReferenceGenerator generator = CliReferenceGenerator(self.project_path) return generator.write()
[docs] def create_ai_agent_reference(self) -> Path: """Create or update the AI Agent Reference file. Returns: Path to created/updated AI Agent Reference """ from .generators import AgentReferenceGenerator config = ContinuityConfig.load(self._get_config_path()) generator = AgentReferenceGenerator(self.project_path, config) return generator.write()
[docs] def create_project_config(self, human_user: Optional[str] = None) -> Path: """Create project-specific configuration. Args: human_user: Human username, auto-detected if None Returns: Path to created config file """ if not human_user: human_user = self.detect_human_user() config_path = self._get_config_path() # Create project-specific config using ContinuityConfig config = ContinuityConfig(ai_agent_nickname="Claude", human_user=human_user) config.save(config_path) return config_path
[docs] def create_about_file(self) -> Path: """Create or update the ABOUT.md file. Returns: Path to created/updated ABOUT.md file """ from .generators import AboutGenerator generator = AboutGenerator(self.project_path) return generator.write()
[docs] def regenerate_all_documents(self) -> Dict[str, Path]: """Regenerate all project documents using the factory pattern. Returns: Dict mapping document names to their file paths """ factory = self._get_generator_factory() return factory.generate_all()
[docs] def create_gitignore_entry(self) -> bool: """Add continuity directory to .gitignore if it exists. Returns: True if .gitignore was updated, False otherwise """ gitignore_path = self._get_gitignore_path() if not gitignore_path.exists(): return False # Read existing .gitignore try: content = gitignore_path.read_text(encoding="utf-8") except Exception: return False # Check if continuity directory is already ignored lines = content.splitlines() for line in lines: if ( line.strip() == CONTINUITY_DIR_NAME or line.strip() == f"{CONTINUITY_DIR_NAME}/" ): return False # Already exists # Add continuity directory to .gitignore try: with open(gitignore_path, "a", encoding="utf-8") as f: if not content.endswith("\n"): f.write("\n") f.write(f"\n# Continuity async communication\n{CONTINUITY_DIR_NAME}\n") return True except Exception: return False
[docs] def create_welcome_message(self, human_user: str) -> Path: """Create a welcome message for the human user. Args: human_user: Human username Returns: Path to created message """ from datetime import datetime mailbox_path = self._get_human_mailbox_path() timestamp = datetime.now() timestamp_str = timestamp.strftime("%Y-%m-%d-%H%M%S") filename = f"{timestamp_str}-welcome-to-continuity.md" message_path = mailbox_path / filename welcome_content = f"""--- from: {self.config.ai_user} to: {human_user} timestamp: {timestamp.isoformat()} title: Welcome to Continuity! --- # Welcome to Continuity! Your project has been initialized with the Continuity async communication system. Here's what you can do: ## Quick Start - **Check status**: `continuity status` - **Send message**: `continuity send "Hello {self.config.ai_agent_nickname}!"` - **View messages**: `continuity check` ## Inbox Feature Drop markdown files into `.continuity/inbox/` and run: - `continuity inbox process` to process them automatically ## Configuration - **Project config**: `.continuity/config.yaml` - **Global config**: `~/.config/continuity/config.yaml` - **Environment**: Use `EDITOR`, `VISUAL`, `PAGER` variables ## Documentation - See `.continuity/CLI_REFERENCE.md` for complete command reference - Edit `.continuity/config` to configure additional settings ## Next Steps 1. Try sending your first message: `continuity send "Hi {self.config.ai_agent_nickname}, project is set up!"` 2. Edit `.continuity/config` to customize settings 3. Check out the documentation in `.continuity/CLI_REFERENCE.md` Happy collaborating! --- *This message was auto-generated during project initialization.* """ message_path.write_text(welcome_content, encoding="utf-8") return message_path
[docs] def init_project(self, force: bool = False) -> Dict[str, Any]: """Initialize a new continuity project. Args: force: Overwrite existing .continuity directory Returns: Dict with initialization results """ # Create typed result containers added_dirs: List[str] = [] added_files: List[str] = [] created_dirs: List[str] = [] created_files: List[str] = [] preserved_files: List[str] = [] errors: List[str] = [] results: Dict[str, Any] = { "success": False, "created_dirs": created_dirs, "created_files": created_files, "added_dirs": added_dirs, "added_files": added_files, "preserved_files": preserved_files, "human_user": None, "already_exists": False, "errors": errors, } # Check if already exists if self.continuity_path.exists() and not force: results["already_exists"] = True errors.append( ".continuity directory already exists (use --force to overwrite)" ) return results try: # Detect human user human_user = self.detect_human_user() results["human_user"] = human_user # Create directory structure structure = self.create_directory_structure() results["created_dirs"] = list(structure.keys()) # Create CLI reference cli_ref = self.create_cli_reference() results["created_files"].append(str(cli_ref.relative_to(self.project_path))) # Create AI Agent reference ai_ref = self.create_ai_agent_reference() results["created_files"].append(str(ai_ref.relative_to(self.project_path))) # Create project config config_file = self.create_project_config(human_user) results["created_files"].append( str(config_file.relative_to(self.project_path)) ) # Create ABOUT.md about_file = self.create_about_file() results["created_files"].append( str(about_file.relative_to(self.project_path)) ) # Update .gitignore gitignore_updated = self.create_gitignore_entry() if gitignore_updated: results["created_files"].append(".gitignore (updated)") # Create welcome message welcome_msg = self.create_welcome_message(human_user) results["created_files"].append( str(welcome_msg.relative_to(self.project_path)) ) results["success"] = True except Exception as e: results["errors"].append(str(e)) return results
[docs] def doctor_project(self, auto_yes: bool = False) -> Dict[str, Any]: """Non-destructive project doctor - ensure current structure exists without touching existing files. Returns: Dict with doctor results """ # Create typed result containers for better error tracking added_dirs: List[str] = [] added_files: List[str] = [] preserved_files: List[str] = [] errors: List[str] = [] results = { "success": False, "added_dirs": added_dirs, "added_files": added_files, "preserved_files": preserved_files, "human_user": None, "errors": errors, } try: # Check if .continuity exists if not self.continuity_path.exists(): # Project isn't initialized at all - run full init return self.init_project(force=False) # Detect human user human_user = self.detect_human_user() results["human_user"] = human_user # Create current structure (matches create_directory_structure) current_structure = { "base": self.continuity_path, "agent_mailbox": self._get_agent_mailbox_path(), "agent_mailbox_read": self._get_agent_mailbox_read_path(), "agent_mailbox_archive": self._get_agent_mailbox_archive_path(), "human_mailbox": self._get_human_mailbox_path(), "human_mailbox_read": self._get_human_mailbox_read_path(), "inbox": self._get_inbox_path(), "inbox_archive": self._get_inbox_archive_path(), } # Create missing directories (existing ones left untouched) for _name, path in current_structure.items(): if not path.exists(): path.mkdir(parents=True, exist_ok=True) added_dirs.append(str(path.relative_to(self.project_path))) # Check and create missing files files_to_check = [ ( "CLI_REFERENCE.md", self.create_cli_reference, self._get_cli_reference_path, True, ), # Ask to replace ( "AI_AGENT_REFERENCE.md", self.create_ai_agent_reference, self._get_ai_agent_reference_path, True, ), # Ask to replace ( "ABOUT.md", self.create_about_file, self._get_about_path, True, ), # Ask to replace (regenerated from template) ( "config", lambda: self.create_project_config(human_user), self._get_config_path, False, ), # Don't replace config ] # Also check for old config.yaml and migrate if needed old_config = self.continuity_path / "config.yaml" new_config = self._get_config_path() if old_config.exists() and not new_config.exists(): # Migrate from old YAML format to new Unix format try: import yaml with open(old_config, encoding="utf-8") as f: old_data = yaml.safe_load(f) or {} # Extract values from old format users = old_data.get("users", {}) editor = old_data.get("editor", {}) # Create new config with migrated values config = ContinuityConfig( ai_agent_nickname=users.get( "ai_user", "Claude" ).title(), # Convert to nickname human_user=users.get("human_user", human_user), ) # Set editor preferences separately if they differ from defaults if "default_editor" in editor: config.default_editor = editor["default_editor"] if "default_viewer" in editor: config.default_viewer = editor["default_viewer"] config.save(new_config) added_files.append("config (migrated from config.yaml)") # Optionally remove old config old_config.unlink() except Exception as e: print(f"Warning: Could not migrate config.yaml: {e}") # Fall back to creating new config created_path = self.create_project_config(human_user) added_files.append(str(created_path.relative_to(self.project_path))) for filename, creator_func, path_func, ask_replace in files_to_check: file_path = path_func() if not file_path.exists(): created_path = creator_func() added_files.append(str(created_path.relative_to(self.project_path))) elif ask_replace: # Ask if they want to replace reference files with updated versions should_replace = auto_yes if not auto_yes: from rich.prompt import Confirm try: should_replace = Confirm.ask( f"[yellow]Replace existing {filename} with updated version?[/yellow]", default=False, ) except (EOFError, KeyboardInterrupt): # Handle non-interactive mode or interrupted input should_replace = False if should_replace: created_path = creator_func() results["added_files"].append( f"{str(created_path.relative_to(self.project_path))} (replaced)" ) else: preserved_files.append( str(file_path.relative_to(self.project_path)) ) else: results["preserved_files"].append( str(file_path.relative_to(self.project_path)) ) # Update .gitignore if needed gitignore_updated = self.create_gitignore_entry() if gitignore_updated: added_files.append(".gitignore (updated)") else: # Check if .gitignore exists to count as preserved gitignore_path = self._get_gitignore_path() if gitignore_path.exists(): results["preserved_files"].append(".gitignore") results["success"] = True except Exception as e: results["errors"].append(str(e)) return results