Source code for agentscope_runtime.engine.services.sandbox.sandbox_service

# -*- coding: utf-8 -*-
# mypy: disable-error-code="list-item"
from typing import List, Optional

from ....sandbox.enums import SandboxType
from ....sandbox.manager import SandboxManager
from ....sandbox.registry import SandboxRegistry
from ....engine.services.base import ServiceWithLifecycleManager


[docs] class SandboxService(ServiceWithLifecycleManager):
[docs] def __init__(self, base_url=None, bearer_token=None): self.manager_api = None self.base_url = base_url self.bearer_token = bearer_token self._health = False
[docs] async def start(self) -> None: if self.manager_api is None: self.manager_api = SandboxManager( base_url=self.base_url, bearer_token=self.bearer_token, ) self._health = True
[docs] async def stop(self) -> None: # Release all environments if self.manager_api is None: self._health = False return session_keys = self.manager_api.list_session_keys() if session_keys: for session_ctx_id in session_keys: env_ids = self.manager_api.get_session_mapping(session_ctx_id) if env_ids: for env_id in env_ids: self.manager_api.release(env_id) if self.base_url is None: # Embedded mode self.manager_api.cleanup() self.manager_api = None
[docs] async def health(self) -> bool: return self._health
[docs] def connect( self, session_id: str, user_id: Optional[str] = None, sandbox_types=None, ) -> List: # Create a composite key session_ctx_id = self._create_session_ctx_id(session_id, user_id) env_ids = self.manager_api.get_session_mapping(session_ctx_id) # Check if the session_ctx_id already has an environment if env_ids: # Connect to existing environment return self._connect_existing_environment(env_ids) else: # Create a new environment return self._create_new_environment( session_ctx_id, sandbox_types, )
def _create_new_environment( self, session_ctx_id: str, sandbox_types: Optional[List[str]] = None, ): if sandbox_types is None: sandbox_types = [SandboxType.BASE] sandboxes = [] for env_type in sandbox_types: if env_type is None: continue box_type = SandboxType(env_type) if box_type != SandboxType.AGENTBAY: box_id = self.manager_api.create_from_pool( sandbox_type=box_type.value, meta={"session_ctx_id": session_ctx_id}, ) else: box_id = None box_cls = SandboxRegistry.get_classes_by_type(box_type) box = box_cls( sandbox_id=box_id, base_url=self.manager_api.base_url, bearer_token=self.bearer_token, ) # All the operation must be done after replace this action in # embedded mode if self.base_url is None: # Embedded mode box.manager_api = self.manager_api sandboxes.append(box) return sandboxes def _connect_existing_environment(self, env_ids: List[str]): boxes = [] for env_id in env_ids: # Check if this is an AgentBay session ID if self._is_agentbay_session_id(env_id): try: from ....sandbox.box.agentbay.agentbay_sandbox import ( AgentbaySandbox, ) # Connect to existing AgentBay session sandbox = AgentbaySandbox( sandbox_id=env_id, base_url=self.base_url, bearer_token=self.bearer_token, sandbox_type=SandboxType.AGENTBAY, ) boxes.append(sandbox) continue except Exception as e: import logging logger = logging.getLogger(__name__) logger.error( f"Failed to connect to AgentBay session {env_id}: {e}", ) continue # Standard sandbox connection info = self.manager_api.get_info(env_id) version = info.get("version", "") env_type = None for x in SandboxType: if x.value in version: env_type = x.value break if env_type is None: continue wb_type = SandboxType(env_type) box_cls = SandboxRegistry.get_classes_by_type(wb_type) box = box_cls( sandbox_id=env_id, base_url=self.manager_api.base_url, bearer_token=self.bearer_token, ) if self.base_url is None: # Embedded mode box.manager_api = self.manager_api boxes.append(box) return boxes def _is_agentbay_session_id(self, session_id: str) -> bool: """ Check if a session ID belongs to AgentBay. AgentBay session IDs typically start with 'session-' prefix. Args: session_id: Session ID to check Returns: True if this appears to be an AgentBay session ID """ return session_id.startswith("session-")
[docs] def release(self, session_id, user_id=None): session_ctx_id = self._create_session_ctx_id(session_id, user_id) env_ids = self.manager_api.get_session_mapping(session_ctx_id) if env_ids: for env_id in env_ids: # Check if this is an AgentBay session if self._is_agentbay_session_id(env_id): # AgentBay sessions are cleaned up automatically # when the sandbox object is destroyed continue # Standard sandbox release self.manager_api.release(env_id) return True
def _create_session_ctx_id(self, session_id, user_id): # Create a composite key from session_id and user_id if user_id is None: return session_id return f"{session_id}_{user_id}"