Source code for continuity.utils.process_lock

"""
Process lock system to prevent multiple server instances.
"""

import contextlib
import fcntl
import os
from pathlib import Path
from typing import Optional


[docs] class ProcessLock: """Exclusive process lock to prevent multiple server instances.""" def __init__(self, name: str = "continuity_server"): """Initialize process lock. Args: name: Name of the lock (used for lock file name) """ self.name = name self.lock_file = Path.home() / ".continuity" / f"{name}.lock" self.lock_file.parent.mkdir(exist_ok=True) self.fd: Optional[int] = None
[docs] def acquire(self) -> bool: """Acquire exclusive lock. Returns: True if lock acquired successfully, False if already locked """ try: self.fd = os.open(self.lock_file, os.O_CREAT | os.O_WRONLY | os.O_TRUNC) fcntl.lockf(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # Write PID to lock file pid_bytes = str(os.getpid()).encode("utf-8") os.write(self.fd, pid_bytes) os.fsync(self.fd) return True except OSError: if self.fd is not None: with contextlib.suppress(Exception): os.close(self.fd) self.fd = None return False
[docs] def release(self) -> None: """Release lock and clean up.""" if self.fd is not None: with contextlib.suppress(Exception): os.close(self.fd) self.fd = None with contextlib.suppress(Exception): self.lock_file.unlink()
[docs] def is_locked(self) -> bool: """Check if lock is currently held. Returns: True if lock file exists and is locked """ if not self.lock_file.exists(): return False try: # Try to open and lock the file test_fd = os.open(self.lock_file, os.O_RDONLY) try: fcntl.lockf(test_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # If we can lock it, it's not actually locked fcntl.lockf(test_fd, fcntl.LOCK_UN) os.close(test_fd) return False except OSError: # Can't lock, so it's locked by another process os.close(test_fd) return True except Exception: # Error accessing file, assume not locked return False
[docs] def get_lock_pid(self) -> Optional[int]: """Get PID of process holding the lock. Returns: PID of lock holder, or None if not locked or can't read """ if not self.lock_file.exists(): return None try: with open(self.lock_file) as f: pid_str = f.read().strip() return int(pid_str) except Exception: return None
[docs] def __enter__(self): """Context manager entry.""" if not self.acquire(): raise RuntimeError(f"Could not acquire lock: {self.name}") return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.release()