Source code for agentscope_runtime.engine.services.manager
# -*- coding: utf-8 -*-
import logging
from contextlib import AsyncExitStack
from typing import Dict, Any, Type, List
from abc import ABC, abstractmethod
from agentscope_runtime.engine import Service
logger = logging.getLogger(__name__)
[docs]
class ServiceManager(ABC):
"""
Abstract base class for service managers.
Provides common functionality for service registration and lifecycle
management.
"""
[docs]
def __init__(self):
self.services = []
self.service_instances = {}
self._exit_stack = AsyncExitStack()
# Initialize default services
self._register_default_services()
@abstractmethod
def _register_default_services(self):
"""
Register default services for this manager. Override in
subclasses.
"""
[docs]
def register(self, service_class: Type, *args, name: str = None, **kwargs):
"""
Register a service.
Args:
service_class: The class of the service to register.
*args: Positional arguments for service initialization.
name: Optional service name. Defaults to class name without
'Service' suffix and converted to lowercase.
**kwargs: Keyword arguments for service initialization.
Returns:
self: For method chaining
"""
if name is None:
name = service_class.__name__.replace("Service", "").lower()
# Check if service name already exists
if any(service[3] == name for service in self.services):
raise ValueError(
f"Service with name '{name}' is already registered",
)
self.services.append((service_class, args, kwargs, name))
logger.debug(f"Registered service: {name} ({service_class.__name__})")
return self
[docs]
def register_service(self, name: str, service: Service):
"""Register an already instantiated service.
Args:
name: Service name
service: Service instance
Returns:
self: For method chaining
"""
if name in self.service_instances:
raise ValueError(
f"Service with name '{name}' is already registered",
)
self.service_instances[name] = service
logger.debug(f"Registered service instance: {name}")
return self
[docs]
async def __aenter__(self):
"""Start all registered services using AsyncExitStack."""
try:
# Track services that were registered with register() to avoid
# duplicate processing
registered_names = set()
# Start services that were registered with register()
for service_class, args, kwargs, name in self.services:
logger.debug(f"Starting service: {name}")
instance = service_class(*args, **kwargs)
# Use AsyncExitStack to manage the context
await self._exit_stack.enter_async_context(instance)
self.service_instances[name] = instance
registered_names.add(name) # Track this service as processed
logger.debug(f"Successfully started service: {name}")
# Start services that were registered with register_service()
# These services are already instantiated, just need to enter
# their context
for name, service in list(self.service_instances.items()):
if (
name not in registered_names
): # Only process services not from register() method
logger.debug(f"Starting pre-instantiated service: {name}")
await self._exit_stack.enter_async_context(service)
logger.debug(
f"Successfully started pre-instantiated service:"
f" {name}",
)
return self
except Exception as e:
logger.error(f"Failed to start services: {e}")
# Ensure proper cleanup if initialization fails
await self._exit_stack.aclose()
raise
[docs]
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Close all services using AsyncExitStack."""
logger.debug("Stopping all services")
await self._exit_stack.aclose()
self.service_instances.clear()
logger.debug("All services stopped")
return False
[docs]
def __getattr__(self, name: str):
"""
Enable attribute access for services, e.g., manager.env,
manager.session.
"""
if name in self.service_instances:
return self.service_instances[name]
raise AttributeError(f"Service '{name}' not found")
[docs]
def __getitem__(self, name: str):
"""Enable dictionary-style access for services."""
if name in self.service_instances:
return self.service_instances[name]
raise KeyError(f"Service '{name}' not found")
[docs]
def get(self, name: str, default=None):
"""Explicitly retrieve a service instance with optional default."""
return self.service_instances.get(name, default)
[docs]
def has_service(self, name: str) -> bool:
"""Check if a service exists."""
return name in self.service_instances
[docs]
def list_services(self) -> List[str]:
"""List all registered service names."""
return list(self.service_instances.keys())
@property
def all_services(self) -> Dict[str, Any]:
"""Retrieve all service instances."""
return self.service_instances.copy()
[docs]
async def health_check(self) -> Dict[str, bool]:
"""Check health of all services."""
health_status = {}
for name, service in self.service_instances.items():
try:
if hasattr(service, "health"):
health_status[name] = await service.health()
else:
health_status[
name
] = True # Assume healthy if no health method
except Exception as e:
logger.error(f"Health check failed for service {name}: {e}")
health_status[name] = False
return health_status