class ServiceManager(ABC):
    """Abstract base class for service managers.

    Provides common functionality for service registration and lifecycle
    management.

    The methods below read three instance attributes that are not
    initialised in this part of the file (presumably set up by an
    initialiser elsewhere -- confirm):

    * ``services``: list of ``(service_class, args, kwargs, name)`` tuples
      queued by :meth:`register`.
    * ``service_instances``: dict mapping service name to instance.
    * ``_exit_stack``: object offering ``enter_async_context()`` and
      ``aclose()`` (the original docstrings name ``AsyncExitStack``).
    """

    @abstractmethod
    def _register_default_services(self):
        """Register default services for this manager.

        Override in subclasses.
        """

    def _is_name_taken(self, name: str) -> bool:
        """Return True if *name* is used by a queued class or an instance."""
        if name in self.service_instances:
            return True
        return any(entry[3] == name for entry in self.services)

    def register(
        self,
        service_class: Type,
        *args,
        name: str = None,
        **kwargs,
    ):
        """Queue a service class to be instantiated when the manager starts.

        Args:
            service_class: The class of the service to register.
            *args: Positional arguments for service initialization.
            name: Optional service name. When omitted, the class name with
                every occurrence of ``"Service"`` removed, lowercased
                (e.g. ``SessionService`` -> ``"session"``).
            **kwargs: Keyword arguments for service initialization.

        Returns:
            self: For method chaining.

        Raises:
            ValueError: If a service (class or instance) with this name is
                already registered.
        """
        if name is None:
            name = service_class.__name__.replace("Service", "").lower()

        # Check instances too: a clash with a pre-built instance would
        # otherwise be resolved silently at start-up, with the instance
        # overwritten and never entered.
        if self._is_name_taken(name):
            raise ValueError(
                f"Service with name '{name}' is already registered",
            )

        self.services.append((service_class, args, kwargs, name))
        logger.debug(
            "Registered service: %s (%s)",
            name,
            service_class.__name__,
        )
        return self

    def register_service(self, name: str, service: "Service"):
        """Register an already instantiated service.

        Args:
            name: Service name.
            service: Service instance.

        Returns:
            self: For method chaining.

        Raises:
            ValueError: If a service (class or instance) with this name is
                already registered.
        """
        if self._is_name_taken(name):
            raise ValueError(
                f"Service with name '{name}' is already registered",
            )

        self.service_instances[name] = service
        logger.debug("Registered service instance: %s", name)
        return self

    async def __aenter__(self):
        """Start all registered services through the exit stack.

        Classes queued with :meth:`register` are instantiated and entered
        first, then instances added with :meth:`register_service` are
        entered.

        Returns:
            self

        Raises:
            BaseException: Whatever a service raised while starting.
                Services already entered are closed first, and instances
                created by this call are removed again.
        """
        created = []  # names this call added to service_instances
        try:
            for service_class, args, kwargs, name in self.services:
                logger.debug("Starting service: %s", name)
                instance = service_class(*args, **kwargs)
                await self._exit_stack.enter_async_context(instance)
                self.service_instances[name] = instance
                created.append(name)
                logger.debug("Successfully started service: %s", name)

            # Everything else in service_instances came from
            # register_service(): already built, only needs entering.
            already_started = set(created)
            for name, service in list(self.service_instances.items()):
                if name in already_started:
                    continue
                logger.debug("Starting pre-instantiated service: %s", name)
                await self._exit_stack.enter_async_context(service)
                logger.debug(
                    "Successfully started pre-instantiated service: %s",
                    name,
                )
            return self
        except BaseException as exc:
            # BaseException rather than Exception: __aexit__ is not called
            # when __aenter__ fails, so a cancellation here would otherwise
            # leave already-entered services open.
            logger.error("Failed to start services: %s", exc)
            try:
                await self._exit_stack.aclose()
            finally:
                # Drop the instances created above; they are closed now.
                # Pre-registered instances stay so a retry can start them.
                for name in created:
                    self.service_instances.pop(name, None)
            raise

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close all services through the exit stack.

        Returns:
            False, so an exception from the ``async with`` body propagates.
        """
        logger.debug("Stopping all services")
        try:
            await self._exit_stack.aclose()
        finally:
            # Clear even if a service failed to close, so no stale
            # instance stays reachable through the manager.
            self.service_instances.clear()
        logger.debug("All services stopped")
        return False

    def __getattr__(self, name: str):
        """Enable attribute access for services, e.g. ``manager.env``.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: If no service with this name exists.
        """
        # Bypass __getattr__ for the registry lookup: if the registry is
        # missing, ``self.service_instances`` would re-enter this method
        # and recurse until RecursionError.
        try:
            instances = object.__getattribute__(self, "service_instances")
        except AttributeError:
            instances = {}

        if name in instances:
            return instances[name]
        raise AttributeError(f"Service '{name}' not found")

    def __getitem__(self, name: str):
        """Enable dictionary-style access for services.

        Raises:
            KeyError: If no service with this name exists.
        """
        try:
            return self.service_instances[name]
        except KeyError:
            raise KeyError(f"Service '{name}' not found") from None

    def get(self, name: str, default=None):
        """Return the service instance called *name*, or *default*."""
        return self.service_instances.get(name, default)

    def has_service(self, name: str) -> bool:
        """Return True if a service instance called *name* exists."""
        return name in self.service_instances

    def list_services(self) -> List[str]:
        """Return the names of all current service instances.

        Classes queued with :meth:`register` are not listed until they
        have been instantiated on start.
        """
        return list(self.service_instances)

    @property
    def all_services(self) -> Dict[str, Any]:
        """Return a shallow copy of the name -> instance mapping."""
        return self.service_instances.copy()

    async def health_check(self) -> Dict[str, bool]:
        """Check the health of all service instances.

        A service without a ``health`` attribute is assumed healthy.
        ``health()`` may be synchronous or return an awaitable. If it
        raises, the service is reported as ``False`` and the error is
        logged.

        Returns:
            Mapping of service name to the value returned by ``health()``.
        """
        import inspect

        health_status = {}
        # Snapshot the mapping: awaiting below yields to other tasks, and
        # a concurrent registration would break iteration of the live dict.
        for name, service in list(self.service_instances.items()):
            try:
                if hasattr(service, "health"):
                    outcome = service.health()
                    if inspect.isawaitable(outcome):
                        outcome = await outcome
                    health_status[name] = outcome
                else:
                    # Assume healthy if no health method
                    health_status[name] = True
            except Exception as exc:
                logger.error(
                    "Health check failed for service %s: %s",
                    name,
                    exc,
                )
                health_status[name] = False
        return health_status