"""
Process lock system to prevent multiple server instances.
"""
import contextlib
import fcntl
import os
from pathlib import Path
from typing import Optional
[docs]
class ProcessLock:
"""Exclusive process lock to prevent multiple server instances."""
def __init__(self, name: str = "continuity_server"):
"""Initialize process lock.
Args:
name: Name of the lock (used for lock file name)
"""
self.name = name
self.lock_file = Path.home() / ".continuity" / f"{name}.lock"
self.lock_file.parent.mkdir(exist_ok=True)
self.fd: Optional[int] = None
[docs]
def acquire(self) -> bool:
"""Acquire exclusive lock.
Returns:
True if lock acquired successfully, False if already locked
"""
try:
self.fd = os.open(self.lock_file, os.O_CREAT | os.O_WRONLY | os.O_TRUNC)
fcntl.lockf(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
# Write PID to lock file
pid_bytes = str(os.getpid()).encode("utf-8")
os.write(self.fd, pid_bytes)
os.fsync(self.fd)
return True
except OSError:
if self.fd is not None:
with contextlib.suppress(Exception):
os.close(self.fd)
self.fd = None
return False
[docs]
def release(self) -> None:
"""Release lock and clean up."""
if self.fd is not None:
with contextlib.suppress(Exception):
os.close(self.fd)
self.fd = None
with contextlib.suppress(Exception):
self.lock_file.unlink()
[docs]
def is_locked(self) -> bool:
"""Check if lock is currently held.
Returns:
True if lock file exists and is locked
"""
if not self.lock_file.exists():
return False
try:
# Try to open and lock the file
test_fd = os.open(self.lock_file, os.O_RDONLY)
try:
fcntl.lockf(test_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
# If we can lock it, it's not actually locked
fcntl.lockf(test_fd, fcntl.LOCK_UN)
os.close(test_fd)
return False
except OSError:
# Can't lock, so it's locked by another process
os.close(test_fd)
return True
except Exception:
# Error accessing file, assume not locked
return False
[docs]
def get_lock_pid(self) -> Optional[int]:
"""Get PID of process holding the lock.
Returns:
PID of lock holder, or None if not locked or can't read
"""
if not self.lock_file.exists():
return None
try:
with open(self.lock_file) as f:
pid_str = f.read().strip()
return int(pid_str)
except Exception:
return None
[docs]
def __enter__(self):
"""Context manager entry."""
if not self.acquire():
raise RuntimeError(f"Could not acquire lock: {self.name}")
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.release()