Source code for agentscope_runtime.engine.deployers.utils.service_utils.process_manager

# -*- coding: utf-8 -*-
# pylint:disable=consider-using-with

import asyncio
import os
import subprocess
from typing import Optional

import psutil


[docs] class ProcessManager: """Manager for detached process lifecycle."""
[docs] def __init__(self, shutdown_timeout: int = 30): """Initialize process manager. Args: shutdown_timeout: Timeout in seconds for graceful shutdown """ self.shutdown_timeout = shutdown_timeout self._log_file = None self._log_file_handle = None
[docs] async def start_detached_process( self, script_path: str, host: str = "127.0.0.1", port: int = 8000, env: Optional[dict] = None, ) -> int: """Start a detached process running the given script. Args: script_path: Path to the Python script to run host: Host to bind to port: Port to bind to env: Additional environment variables Returns: Process PID Raises: RuntimeError: If process creation fails """ try: # Prepare environment process_env = os.environ.copy() if env: process_env.update(env) # Create log file path with timestamp and child process PID # We'll update the filename after process starts log_dir = "/tmp/agentscope_runtime_logs" os.makedirs(log_dir, exist_ok=True) # Use a temporary name first, will rename after getting PID import time timestamp = time.strftime("%Y%m%d_%H%M%S") temp_log_file = os.path.join( log_dir, f"process_temp_{timestamp}_{os.getpid()}.log", ) # Open log file (don't use 'with' to keep it open for the # subprocess) log_f = open(temp_log_file, "w", encoding="utf-8") # Start detached process with log file process = subprocess.Popen( [ "python", script_path, "--host", host, "--port", str(port), ], stdout=log_f, stderr=subprocess.STDOUT, # Redirect stderr to stdout stdin=subprocess.DEVNULL, start_new_session=True, # Create new process group env=process_env, cwd=os.path.dirname(script_path), ) # Rename log file with actual process PID log_file = os.path.join(log_dir, f"process_{process.pid}.log") log_f.close() # Close temp file os.rename(temp_log_file, log_file) # Reopen with the correct name log_f = open(log_file, "a", encoding="utf-8") # Store log file path and handle for later retrieval self._log_file = log_file self._log_file_handle = log_f # Verify process started successfully await asyncio.sleep( 0.5, ) # Give process time to start and write logs if process.poll() is not None: # Process failed to start, wait a bit more for logs to be # flushed await asyncio.sleep(0.2) # Read logs and print them logs = self.get_process_logs(max_lines=50) import logging logger = logging.getLogger(__name__) logger.error( f"Process failed to start immediately.\n\n" f"Process logs:\n{logs}", ) raise RuntimeError( "Process failed to start. Check logs above.", ) return process.pid except RuntimeError: # Re-raise RuntimeError with logs already included raise except Exception as e: # For other exceptions, try to include logs if available if self._log_file: logs = self.get_process_logs(max_lines=50) import logging logger = logging.getLogger(__name__) logger.error( f"Failed to start detached process: {e}\n\n" f"Process logs:\n{logs}", ) raise RuntimeError(f"Failed to start detached process: {e}") from e
[docs] async def stop_process_gracefully( self, pid: int, timeout: Optional[int] = None, ) -> bool: """Stop a process gracefully. Args: pid: Process ID to stop timeout: Timeout for graceful shutdown (uses default if None) Returns: True if process was stopped successfully Raises: RuntimeError: If process termination fails """ if timeout is None: timeout = self.shutdown_timeout try: # Check if process exists if not psutil.pid_exists(pid): return True # Already terminated process = psutil.Process(pid) # Send SIGTERM for graceful shutdown process.terminate() # Wait for process to terminate try: process.wait(timeout=timeout) return True except psutil.TimeoutExpired: # Force kill if graceful shutdown failed process.kill() process.wait(timeout=5) # Wait for kill to complete return True except psutil.NoSuchProcess: return True # Process already terminated except psutil.AccessDenied as e: raise RuntimeError( f"Access denied when terminating process {pid}: {e}", ) from e except Exception as e: raise RuntimeError( f"Failed to terminate process {pid}: {e}", ) from e finally: # Close log file handle if open self._close_log_file()
[docs] def is_process_running(self, pid: int) -> bool: """Check if a process is running. Args: pid: Process ID to check Returns: True if process is running """ try: return psutil.pid_exists(pid) except Exception: return False
[docs] def create_pid_file(self, pid: int, file_path: str): """Create a PID file. Args: pid: Process ID file_path: Path to PID file Raises: OSError: If file creation fails """ try: # Ensure directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, "w", encoding="utf-8") as f: f.write(str(pid)) except Exception as e: raise OSError(f"Failed to create PID file {file_path}: {e}") from e
[docs] def read_pid_file(self, file_path: str) -> Optional[int]: """Read PID from file. Args: file_path: Path to PID file Returns: Process ID or None if file doesn't exist or is invalid """ try: if not os.path.exists(file_path): return None with open(file_path, "r", encoding="utf-8") as f: return int(f.read().strip()) except (ValueError, OSError): return None
[docs] def cleanup_pid_file(self, file_path: str): """Remove PID file. Args: file_path: Path to PID file """ try: if os.path.exists(file_path): os.remove(file_path) except OSError: pass # Ignore cleanup errors
[docs] async def find_process_by_port(self, port: int) -> Optional[int]: """Find process listening on a specific port. Args: port: Port number Returns: Process ID or None if not found """ try: for conn in psutil.net_connections(kind="inet"): if conn.laddr.port == port and conn.status == "LISTEN": if conn.pid: return conn.pid return None except Exception: return None
[docs] def get_process_info(self, pid: int) -> Optional[dict]: """Get information about a process. Args: pid: Process ID Returns: Dictionary with process information or None if process not found """ try: if not psutil.pid_exists(pid): return None process = psutil.Process(pid) return { "pid": pid, "name": process.name(), "status": process.status(), "cpu_percent": process.cpu_percent(), "memory_percent": process.memory_percent(), "create_time": process.create_time(), "cmdline": process.cmdline(), } except (psutil.NoSuchProcess, psutil.AccessDenied): return None
[docs] async def wait_for_port( self, host: str, port: int, timeout: int = 30, ) -> bool: """Wait for a service to become available on a port. Args: host: Host to check port: Port to check timeout: Maximum time to wait Returns: True if service becomes available, False if timeout """ import socket # Normalize host for connection check # When service binds to 0.0.0.0, we need to connect to 127.0.0.1 check_host = self._normalize_host_for_check(host) end_time = asyncio.get_event_loop().time() + timeout while asyncio.get_event_loop().time() < end_time: try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(1) result = sock.connect_ex((check_host, port)) if result == 0: return True except Exception: pass await asyncio.sleep(0.5) return False
[docs] def get_process_logs(self, max_lines: int = 50) -> str: """Get the last N lines of process logs. Args: max_lines: Maximum number of lines to return Returns: Log content as string """ if not self._log_file or not os.path.exists(self._log_file): return "No log file available" try: # Flush the log file handle if it's still open if self._log_file_handle and not self._log_file_handle.closed: self._log_file_handle.flush() with open(self._log_file, "r", encoding="utf-8") as f: lines = f.readlines() if not lines: return ( "Log file is empty (process may not have written " "any output yet)" ) # Return last N lines return "".join(lines[-max_lines:]) except Exception as e: return f"Failed to read log file: {e}"
def _close_log_file(self): """Close log file handle if open.""" if self._log_file_handle and not self._log_file_handle.closed: try: self._log_file_handle.close() except Exception: pass # Ignore errors when closing
[docs] def cleanup_log_file(self, keep_file: bool = False): """Clean up log file. Args: keep_file: If True, keep the log file on disk but close the handle. If False, delete the log file. """ self._close_log_file() if not keep_file and self._log_file and os.path.exists(self._log_file): try: os.remove(self._log_file) except Exception: pass # Ignore cleanup errors self._log_file = None self._log_file_handle = None
[docs] @staticmethod def cleanup_old_logs(max_age_hours: int = 24): """Clean up old log files. Args: max_age_hours: Remove log files older than this many hours """ import time log_dir = "/tmp/agentscope_runtime_logs" if not os.path.exists(log_dir): return current_time = time.time() max_age_seconds = max_age_hours * 3600 try: for filename in os.listdir(log_dir): if filename.startswith("process_") and filename.endswith( ".log", ): filepath = os.path.join(log_dir, filename) try: file_age = current_time - os.path.getmtime(filepath) if file_age > max_age_seconds: os.remove(filepath) except Exception: pass # Ignore errors for individual files except Exception: pass # Ignore errors during cleanup
@staticmethod def _normalize_host_for_check(host: str) -> str: """Normalize host for connection check. When a service binds to 0.0.0.0 (all interfaces), it cannot be directly connected to. We need to connect to 127.0.0.1 instead to check if the service is running locally. Args: host: The host the service binds to Returns: The host to use for connection check """ if host in ("0.0.0.0", "::"): return "127.0.0.1" return host