"""
Simple project initialization functionality for Continuity.
Phase 3: Simplified to remove complex knowledge base and advanced features.
"""
import getpass
import socket
from pathlib import Path
from typing import Any, Dict, List, Optional
from ..config.base import ContinuityConfig
from .constants import (
ABOUT_FILE,
AGENT_DIR,
AI_AGENT_REFERENCE_FILE,
ARCHIVE_DIR,
CLI_REFERENCE_FILE,
CONFIG_FILE_NAME,
CONTINUITY_DIR_NAME,
HUMAN_DIR,
INBOX_ARCHIVE_DIR,
INBOX_DIR,
MAILBOX_DIR,
READ_DIR,
)
class ProjectInitializer:
    """Create and repair the Continuity directory layout inside a project.

    The initializer owns the ``CONTINUITY_DIR_NAME`` directory below
    ``project_path``: it creates the mailbox/inbox directories, writes the
    generated reference documents and the project config, and can later
    "doctor" an existing project by adding whatever is missing.
    """

    # Usernames too generic to identify a person; the hostname is used instead.
    _GENERIC_USERNAMES = ("user", "admin", "root")

    def __init__(self, project_path: Path, config: "ContinuityConfig"):
        """Initialize project initializer.

        Args:
            project_path: Path to initialize project in
            config: Configuration instance
        """
        self.config = config
        self.project_path = project_path
        self.continuity_path = project_path / CONTINUITY_DIR_NAME
        # Created lazily by _get_generator_factory().
        self._generator_factory = None

    def _get_generator_factory(self):
        """Get document generator factory, creating if necessary.

        Returns:
            DocumentGeneratorFactory instance
        """
        if self._generator_factory is None:
            # Imported lazily, like every other use of .generators in this class.
            from .generators import DocumentGeneratorFactory

            self._generator_factory = DocumentGeneratorFactory(
                self.project_path, self.config
            )
        return self._generator_factory

    # ------------------------------------------------------------------
    # Path helpers
    # ------------------------------------------------------------------

    def _get_agent_mailbox_path(self) -> Path:
        """Get agent mailbox path."""
        return self.continuity_path / AGENT_DIR / MAILBOX_DIR

    def _get_agent_mailbox_read_path(self) -> Path:
        """Get agent mailbox read path."""
        return self.continuity_path / AGENT_DIR / MAILBOX_DIR / READ_DIR

    def _get_agent_mailbox_archive_path(self) -> Path:
        """Get agent mailbox archive path."""
        return self.continuity_path / AGENT_DIR / MAILBOX_DIR / ARCHIVE_DIR

    def _get_human_mailbox_path(self) -> Path:
        """Get human mailbox path."""
        return self.continuity_path / HUMAN_DIR / MAILBOX_DIR

    def _get_human_mailbox_read_path(self) -> Path:
        """Get human mailbox read path."""
        return self.continuity_path / HUMAN_DIR / MAILBOX_DIR / READ_DIR

    def _get_inbox_path(self) -> Path:
        """Get inbox path."""
        return self.continuity_path / INBOX_DIR

    def _get_inbox_archive_path(self) -> Path:
        """Get inbox archive path."""
        return self.continuity_path / INBOX_DIR / INBOX_ARCHIVE_DIR

    def _get_cli_reference_path(self) -> Path:
        """Get CLI reference file path."""
        return self.continuity_path / CLI_REFERENCE_FILE

    def _get_ai_agent_reference_path(self) -> Path:
        """Get AI agent reference file path."""
        return self.continuity_path / AI_AGENT_REFERENCE_FILE

    def _get_config_path(self) -> Path:
        """Get config file path."""
        return self.continuity_path / CONFIG_FILE_NAME

    def _get_gitignore_path(self) -> Path:
        """Get .gitignore file path."""
        return self.project_path / ".gitignore"

    def _get_about_path(self) -> Path:
        """Get ABOUT.md file path."""
        return self.continuity_path / ABOUT_FILE

    def _get_directory_structure(self) -> Dict[str, Path]:
        """Map structure names to the directories a project must contain.

        Shared by ``create_directory_structure`` and ``doctor_project`` so
        the two cannot drift apart.
        """
        return {
            "base": self.continuity_path,
            "agent_mailbox": self._get_agent_mailbox_path(),
            "agent_mailbox_read": self._get_agent_mailbox_read_path(),
            "agent_mailbox_archive": self._get_agent_mailbox_archive_path(),
            "human_mailbox": self._get_human_mailbox_path(),
            "human_mailbox_read": self._get_human_mailbox_read_path(),
            "inbox": self._get_inbox_path(),
            "inbox_archive": self._get_inbox_archive_path(),
        }

    def _relative(self, path: Path) -> str:
        """Return ``path`` relative to the project root, as a string."""
        return str(path.relative_to(self.project_path))

    # ------------------------------------------------------------------
    # Individual initialization steps
    # ------------------------------------------------------------------

    def detect_human_user(self) -> str:
        """Detect a sensible human username.

        Sources are tried in order: the first word of git's ``user.name``
        (lower-cased), the system login name, and - when the name is
        missing or generic - the short hostname. Non-alphanumeric
        characters are stripped from the result.

        Returns:
            Username string; ``"user"`` if nothing usable was found
        """
        import subprocess

        username: Optional[str] = None

        # Try git config first. A missing git binary, an unusable working
        # directory or undecodable output all just mean "no git name".
        try:
            result = subprocess.run(
                ["git", "config", "user.name"],
                capture_output=True,
                text=True,
                cwd=self.project_path,
            )
        except (OSError, ValueError):
            result = None
        if result is not None and result.returncode == 0:
            name_parts = result.stdout.split()
            if name_parts:
                # Convert "First Last" to "first" for username
                username = name_parts[0].lower()

        # Fallback to system username
        if not username:
            try:
                username = getpass.getuser()
            except (KeyError, OSError, ImportError):
                # getuser() raises when no login name can be determined
                # (e.g. a container UID without a passwd entry).
                username = ""

        # Fallback to hostname if username is missing or generic
        if not username or username in self._GENERIC_USERNAMES:
            username = socket.gethostname().split(".")[0].lower()

        # Clean username (alphanumeric only)
        username = "".join(c for c in username if c.isalnum())
        return username or "user"

    def create_directory_structure(self) -> Dict[str, Path]:
        """Create the basic .continuity directory structure.

        Existing directories are left untouched.

        Returns:
            Dict mapping structure names to paths
        """
        structure = self._get_directory_structure()
        for path in structure.values():
            path.mkdir(parents=True, exist_ok=True)
        return structure

    def create_cli_reference(self) -> Path:
        """Create the CLI reference file.

        Returns:
            Path to created CLI reference
        """
        from .generators import CliReferenceGenerator

        generator = CliReferenceGenerator(self.project_path)
        return generator.write()

    def create_ai_agent_reference(self) -> Path:
        """Create or update the AI Agent Reference file.

        The generator is fed the project config loaded from disk (not
        ``self.config``), so the project config should be written before
        this is called.

        Returns:
            Path to created/updated AI Agent Reference
        """
        from .generators import AgentReferenceGenerator

        config = ContinuityConfig.load(self._get_config_path())
        generator = AgentReferenceGenerator(self.project_path, config)
        return generator.write()

    def create_project_config(self, human_user: Optional[str] = None) -> Path:
        """Create project-specific configuration.

        Any existing config file at the target path is overwritten.

        Args:
            human_user: Human username, auto-detected if None

        Returns:
            Path to created config file
        """
        if not human_user:
            human_user = self.detect_human_user()
        config_path = self._get_config_path()
        # Create project-specific config using ContinuityConfig
        config = ContinuityConfig(ai_agent_nickname="Claude", human_user=human_user)
        config.save(config_path)
        return config_path

    def create_about_file(self) -> Path:
        """Create or update the ABOUT.md file.

        Returns:
            Path to created/updated ABOUT.md file
        """
        from .generators import AboutGenerator

        generator = AboutGenerator(self.project_path)
        return generator.write()

    def regenerate_all_documents(self) -> Dict[str, Path]:
        """Regenerate all project documents using the factory pattern.

        Returns:
            Dict mapping document names to their file paths
        """
        return self._get_generator_factory().generate_all()

    def create_gitignore_entry(self) -> bool:
        """Add continuity directory to .gitignore if it exists.

        The file is never created: a project without a ``.gitignore`` is
        left alone. Read/write failures are treated as "not updated".

        Returns:
            True if .gitignore was updated, False otherwise
        """
        gitignore_path = self._get_gitignore_path()
        if not gitignore_path.exists():
            return False

        try:
            content = gitignore_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            return False

        # Already ignored? Accept the bare name, the directory form and
        # their root-anchored variants.
        ignored_forms = {
            CONTINUITY_DIR_NAME,
            f"{CONTINUITY_DIR_NAME}/",
            f"/{CONTINUITY_DIR_NAME}",
            f"/{CONTINUITY_DIR_NAME}/",
        }
        if any(line.strip() in ignored_forms for line in content.splitlines()):
            return False

        # Separate the new entry from existing content by one blank line,
        # terminating an unterminated last line first. An empty file gets
        # the entry without leading blank lines.
        if not content:
            separator = ""
        elif content.endswith("\n"):
            separator = "\n"
        else:
            separator = "\n\n"
        entry = (
            f"{separator}# Continuity async communication\n{CONTINUITY_DIR_NAME}\n"
        )

        try:
            with open(gitignore_path, "a", encoding="utf-8") as f:
                f.write(entry)
        except OSError:
            return False
        return True

    def create_welcome_message(self, human_user: str) -> Path:
        """Create a welcome message for the human user.

        The message is a Markdown file with a front-matter header, written
        into the human mailbox under a timestamped file name.

        Args:
            human_user: Human username

        Returns:
            Path to created message
        """
        from datetime import datetime

        mailbox_path = self._get_human_mailbox_path()
        # Allow standalone use before create_directory_structure() has run.
        mailbox_path.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now()
        timestamp_str = timestamp.strftime("%Y-%m-%d-%H%M%S")
        message_path = mailbox_path / f"{timestamp_str}-welcome-to-continuity.md"

        nickname = self.config.ai_agent_nickname
        # Derived from the constants so the message always names the real file.
        config_ref = f"{CONTINUITY_DIR_NAME}/{CONFIG_FILE_NAME}"

        welcome_content = f"""---
from: {self.config.ai_user}
to: {human_user}
timestamp: {timestamp.isoformat()}
title: Welcome to Continuity!
---

# Welcome to Continuity!

Your project has been initialized with the Continuity async communication system. Here's what you can do:

## Quick Start

- **Check status**: `continuity status`
- **Send message**: `continuity send "Hello {nickname}!"`
- **View messages**: `continuity check`

## Inbox Feature

Drop markdown files into `.continuity/inbox/` and run:

- `continuity inbox process` to process them automatically

## Configuration

- **Project config**: `{config_ref}`
- **Global config**: `~/.config/continuity/config.yaml`
- **Environment**: Use `EDITOR`, `VISUAL`, `PAGER` variables

## Documentation

- See `.continuity/CLI_REFERENCE.md` for complete command reference
- Edit `{config_ref}` to configure additional settings

## Next Steps

1. Try sending your first message: `continuity send "Hi {nickname}, project is set up!"`
2. Edit `{config_ref}` to customize settings
3. Check out the documentation in `.continuity/CLI_REFERENCE.md`

Happy collaborating!

---

*This message was auto-generated during project initialization.*
"""
        message_path.write_text(welcome_content, encoding="utf-8")
        return message_path

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------

    def init_project(self, force: bool = False) -> Dict[str, Any]:
        """Initialize a new continuity project.

        Args:
            force: Proceed even if the .continuity directory already
                exists; generated files are written over existing ones

        Returns:
            Dict with initialization results. ``success`` is True only if
            every step completed; otherwise ``errors`` holds the reason and
            ``created_files`` lists what was written before the failure.
        """
        # Create typed result containers
        added_dirs: List[str] = []
        added_files: List[str] = []
        created_dirs: List[str] = []
        created_files: List[str] = []
        preserved_files: List[str] = []
        errors: List[str] = []
        results: Dict[str, Any] = {
            "success": False,
            "created_dirs": created_dirs,
            "created_files": created_files,
            "added_dirs": added_dirs,
            "added_files": added_files,
            "preserved_files": preserved_files,
            "human_user": None,
            "already_exists": False,
            "errors": errors,
        }

        # Check if already exists
        if self.continuity_path.exists() and not force:
            results["already_exists"] = True
            errors.append(
                ".continuity directory already exists (use --force to overwrite)"
            )
            return results

        try:
            human_user = self.detect_human_user()
            results["human_user"] = human_user

            # created_dirs reports the structure names, not the paths.
            structure = self.create_directory_structure()
            created_dirs.extend(structure.keys())

            # The config goes first: the AI agent reference is generated
            # from the project config as loaded from disk.
            config_file = self.create_project_config(human_user)
            created_files.append(self._relative(config_file))

            cli_ref = self.create_cli_reference()
            created_files.append(self._relative(cli_ref))

            ai_ref = self.create_ai_agent_reference()
            created_files.append(self._relative(ai_ref))

            about_file = self.create_about_file()
            created_files.append(self._relative(about_file))

            if self.create_gitignore_entry():
                created_files.append(".gitignore (updated)")

            welcome_msg = self.create_welcome_message(human_user)
            created_files.append(self._relative(welcome_msg))

            results["success"] = True
        except Exception as e:
            # Top-level boundary: report the failure instead of raising.
            errors.append(str(e))
        return results

    def _migrate_legacy_config(self, human_user: str) -> Optional[str]:
        """Migrate an old ``config.yaml`` to the current config file.

        Args:
            human_user: Fallback username if the old file has none

        Returns:
            The entry to report under ``added_files``, or None when there
            was nothing to migrate (no old file, or a current config
            already exists).
        """
        old_config = self.continuity_path / "config.yaml"
        new_config = self._get_config_path()
        if not old_config.exists() or new_config.exists():
            return None

        try:
            import yaml

            with open(old_config, encoding="utf-8") as f:
                old_data = yaml.safe_load(f) or {}

            # Extract values from old format
            users = old_data.get("users", {})
            editor = old_data.get("editor", {})

            config = ContinuityConfig(
                # Old files stored a user id; title-case it into a nickname.
                ai_agent_nickname=users.get("ai_user", "Claude").title(),
                human_user=users.get("human_user", human_user),
            )
            # Carry over editor preferences only when the old file set them.
            if "default_editor" in editor:
                config.default_editor = editor["default_editor"]
            if "default_viewer" in editor:
                config.default_viewer = editor["default_viewer"]
            config.save(new_config)
        except Exception as e:
            print(f"Warning: Could not migrate config.yaml: {e}")
            # Fall back to creating new config
            return self._relative(self.create_project_config(human_user))

        # The migrated config is already saved; failing to delete the old
        # file must not discard it, so this is handled separately.
        try:
            old_config.unlink()
        except OSError as e:
            print(f"Warning: Could not remove old config.yaml: {e}")
        return "config (migrated from config.yaml)"

    @staticmethod
    def _confirm_replace(filename: str, auto_yes: bool) -> bool:
        """Decide whether an existing generated file should be replaced.

        Args:
            filename: Display name used in the prompt
            auto_yes: Answer yes without prompting

        Returns:
            True to replace; False if declined or input is unavailable
        """
        if auto_yes:
            return True
        from rich.prompt import Confirm

        try:
            return Confirm.ask(
                f"[yellow]Replace existing {filename} with updated version?[/yellow]",
                default=False,
            )
        except (EOFError, KeyboardInterrupt):
            # Handle non-interactive mode or interrupted input
            return False

    def doctor_project(self, auto_yes: bool = False) -> Dict[str, Any]:
        """Non-destructive project doctor - ensure current structure exists.

        Missing directories and files are created. Existing generated
        reference files are replaced only after confirmation; an existing
        config is never replaced. If the project is not initialized at
        all, a full ``init_project`` is run and its result returned.

        Args:
            auto_yes: Replace existing reference files without prompting

        Returns:
            Dict with doctor results
        """
        # Create typed result containers for better error tracking
        added_dirs: List[str] = []
        added_files: List[str] = []
        preserved_files: List[str] = []
        errors: List[str] = []
        results: Dict[str, Any] = {
            "success": False,
            "added_dirs": added_dirs,
            "added_files": added_files,
            "preserved_files": preserved_files,
            "human_user": None,
            "errors": errors,
        }

        try:
            if not self.continuity_path.exists():
                # Project isn't initialized at all - run full init
                return self.init_project(force=False)

            human_user = self.detect_human_user()
            results["human_user"] = human_user

            # Create missing directories (existing ones left untouched)
            for path in self._get_directory_structure().values():
                if not path.exists():
                    path.mkdir(parents=True, exist_ok=True)
                    added_dirs.append(self._relative(path))

            # Each entry: (display name, creator, path getter, ask to replace).
            files_to_check: List[Any] = []

            # The config is settled first, because the AI agent reference
            # is generated from the project config on disk. A config that
            # was just migrated or created is reported once, as added.
            migrated_entry = self._migrate_legacy_config(human_user)
            if migrated_entry is not None:
                added_files.append(migrated_entry)
            else:
                files_to_check.append(
                    (
                        "config",
                        lambda: self.create_project_config(human_user),
                        self._get_config_path,
                        False,  # Don't replace config
                    )
                )
            # Generated documents: safe to regenerate, so offer replacement.
            files_to_check.extend(
                [
                    (
                        "CLI_REFERENCE.md",
                        self.create_cli_reference,
                        self._get_cli_reference_path,
                        True,
                    ),
                    (
                        "AI_AGENT_REFERENCE.md",
                        self.create_ai_agent_reference,
                        self._get_ai_agent_reference_path,
                        True,
                    ),
                    (
                        "ABOUT.md",
                        self.create_about_file,
                        self._get_about_path,
                        True,
                    ),
                ]
            )

            for filename, creator_func, path_func, ask_replace in files_to_check:
                file_path = path_func()
                if not file_path.exists():
                    added_files.append(self._relative(creator_func()))
                elif ask_replace and self._confirm_replace(filename, auto_yes):
                    created_path = creator_func()
                    added_files.append(f"{self._relative(created_path)} (replaced)")
                else:
                    preserved_files.append(self._relative(file_path))

            # Update .gitignore if needed
            if self.create_gitignore_entry():
                added_files.append(".gitignore (updated)")
            elif self._get_gitignore_path().exists():
                # An untouched, existing .gitignore counts as preserved.
                preserved_files.append(".gitignore")

            results["success"] = True
        except Exception as e:
            # Top-level boundary: report the failure instead of raising.
            errors.append(str(e))
        return results