Source code for agentscope_runtime.engine.services.environment_manager

# -*- coding: utf-8 -*-
from typing import List
from contextlib import asynccontextmanager

from .manager import ServiceManager
from .sandbox_service import SandboxService


[docs] class EnvironmentManager(ServiceManager): """ The EnvironmentManager class for managing environment-related services. """
[docs] def __init__(self, sandbox_service: SandboxService = None): self._sandbox_service = sandbox_service super().__init__()
def _register_default_services(self): """Register default services for environment management.""" self._sandbox_service = self._sandbox_service or SandboxService() self.register_service("sandbox", self._sandbox_service)
[docs] def connect_sandbox( self, session_id, user_id, env_types=None, tools=None, ) -> List: return self._sandbox_service.connect( session_id, user_id, env_types=env_types, tools=tools, )
[docs] def release_sandbox(self, session_id, user_id): return self._sandbox_service.release(session_id, user_id)
[docs] @asynccontextmanager async def create_environment_manager( sandbox_service: SandboxService = None, ): manager = EnvironmentManager( sandbox_service=sandbox_service, ) async with manager: yield manager