Source code for continuity.core.file_processor

"""
Simple file processing for Continuity inbox.
Phase 3: Simplified to handle only basic markdown file processing.
"""

import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

from ..config.base import ContinuityConfig


class SimpleFileProcessor:
    """Simple file processor that handles markdown files only.

    Markdown files dropped into ``<base_path>/inbox`` are scanned, turned
    into message dictionaries (content plus metadata) and can then be moved
    to ``<base_path>/inbox/archive``.
    """

    # A mention only counts when it stands alone: the "@" must not be glued to
    # a preceding word character (e-mail addresses such as "bob@usermail.com")
    # and the handle must end at a word boundary ("@aidan" is not "@ai").
    _AI_MENTION = re.compile(r"(?<!\w)@(?:ai|agent)\b", re.IGNORECASE)
    _HUMAN_MENTION = re.compile(r"(?<!\w)@(?:human|user)\b", re.IGNORECASE)

    def __init__(self, base_path: Path, config: ContinuityConfig):
        """Initialize simple file processor.

        Args:
            base_path: Base directory containing .continuity
            config: Configuration instance; its ``ai_user`` and
                ``human_user`` attributes are read when routing messages.
        """
        self.config = config
        self.base_path = base_path
        self.inbox_path = base_path / "inbox"
        self.archive_path = self.inbox_path / "archive"

    def ensure_directories(self) -> None:
        """Create the inbox and archive directories if they are missing."""
        self.inbox_path.mkdir(parents=True, exist_ok=True)
        self.archive_path.mkdir(parents=True, exist_ok=True)

    def scan_inbox(self) -> List[Path]:
        """Scan inbox for markdown files to process.

        Returns:
            Regular ``*.md`` files directly inside the inbox, oldest
            modification time first. Empty when the inbox does not exist.
        """
        if not self.inbox_path.exists():
            return []

        stamped = []
        for candidate in self.inbox_path.glob("*.md"):
            # The glob also matches directories that happen to end in ".md".
            if not candidate.is_file():
                continue
            try:
                modified = candidate.stat().st_mtime
            except FileNotFoundError:
                # Removed by someone else between the glob and the stat call.
                continue
            stamped.append((modified, candidate))

        # Sort on the timestamp only so ties keep the glob order.
        stamped.sort(key=lambda pair: pair[0])
        return [path for _, path in stamped]

    def detect_recipient(self, content: str) -> str:
        """Detect recipient from content (simple @mention detection).

        ``@ai`` / ``@agent`` route to ``config.ai_user`` and take priority
        over ``@human`` / ``@user``, which route to ``config.human_user``.
        Matching is case-insensitive and requires a stand-alone mention.

        Args:
            content: Message text to inspect.

        Returns:
            The configured user name of the recipient; the AI user when no
            mention is present.
        """
        if self._AI_MENTION.search(content):
            return self.config.ai_user
        if self._HUMAN_MENTION.search(content):
            return self.config.human_user
        return self.config.ai_user  # Default recipient

    def extract_title(self, content: str, filename: str) -> str:
        """Extract title from content or use filename.

        Args:
            content: Markdown text of the message.
            filename: File name without extension, used as a fallback.

        Returns:
            The text of the first non-empty level-one heading (``# ...``)
            found in the first five lines, otherwise ``filename`` with
            dashes/underscores turned into spaces and title-cased.
        """
        for line in content.strip().split("\n")[:5]:  # Check first 5 lines
            if line.startswith("# "):
                heading = line[2:].strip()
                if heading:  # an empty "# " heading is not a usable title
                    return heading

        # Use filename as fallback
        return filename.replace("-", " ").replace("_", " ").title()

    def process_file(self, file_path: Path) -> Dict[str, Any]:
        """Process a single markdown file.

        Args:
            file_path: Path of the file to read (decoded as UTF-8).

        Returns:
            On success, a dict with ``success=True`` plus ``content``
            (whitespace-stripped text), ``metadata``, ``recipient``,
            ``title`` and ``file``. On failure, a dict with
            ``success=False``, an ``error`` message and ``file``. This
            method reports every failure through the return value rather
            than raising.
        """
        try:
            content = file_path.read_text(encoding="utf-8").strip()
            if not content:
                return {"success": False, "error": "Empty file", "file": file_path.name}

            # Extract metadata
            recipient = self.detect_recipient(content)
            title = self.extract_title(content, file_path.stem)

            # Create message metadata
            metadata = {
                "from": "file-processor",
                "to": recipient,
                "title": title,
                # Naive local time, kept as-is for existing consumers.
                "timestamp": datetime.now().isoformat(),
                "source_file": file_path.name,
            }

            return {
                "success": True,
                "content": content,
                "metadata": metadata,
                "recipient": recipient,
                "title": title,
                "file": file_path.name,
            }
        except Exception as e:
            # Deliberate catch-all: callers expect an error dict, not a raise.
            return {"success": False, "error": str(e), "file": file_path.name}

    def archive_file(self, file_path: Path) -> None:
        """Archive processed file.

        The file is moved into the archive directory under the name
        ``<YYYYmmdd-HHMMSS>-<original name>``. If that name is already
        taken (same file name archived within the same second), a numeric
        suffix is added before the extension so nothing is overwritten.

        Args:
            file_path: File to move into the archive.
        """
        self.ensure_directories()

        # Create timestamped archive name
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        destination = self.archive_path / f"{timestamp}-{file_path.name}"

        # The timestamp only has one-second resolution; avoid clobbering an
        # earlier archive entry that received the same name.
        attempt = 1
        while destination.exists():
            destination = self.archive_path / (
                f"{timestamp}-{file_path.stem}-{attempt}{file_path.suffix}"
            )
            attempt += 1

        # Move to archive
        file_path.rename(destination)

    def get_status(self) -> Dict[str, Any]:
        """Get simple inbox status.

        Returns:
            A dict with ``inbox_path`` and ``archive_path`` as strings,
            ``files_waiting`` (count of pending markdown files) and
            ``files`` (their names, in scan order).
        """
        files = self.scan_inbox()
        return {
            "inbox_path": str(self.inbox_path),
            "files_waiting": len(files),
            "files": [f.name for f in files],
            "archive_path": str(self.archive_path),
        }