Source code for agentscope_runtime.engine.deployers.utils.service_utils.interrupt.base_backend

# -*- coding: utf-8 -*-
import enum
from abc import ABC, abstractmethod
from typing import AsyncGenerator, Optional


class TaskState(enum.Enum):
    """Lifecycle states for distributed task execution.

    Each member's value is the string form of its own name, so a member can
    be rebuilt from a stored string with ``TaskState("RUNNING")``.
    """

    IDLE = "IDLE"
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    FINISHED = "FINISHED"
    ERROR = "ERROR"
class InterruptSignal(enum.Enum):
    """Control signals for inter-service communication.

    Each member's value is the string form of its own name, so a member can
    be rebuilt from a received string with ``InterruptSignal("STOP")``.
    """

    STOP = "STOP"
    PAUSE = "PAUSE"
    RESUME = "RESUME"
class BaseInterruptBackend(ABC):
    """Abstract interface for task state persistence and signaling.

    Every method is abstract; a concrete backend must implement all of them
    before it can be instantiated.
    """

    @abstractmethod
    async def publish_event(self, channel: str, message: str) -> None:
        """Broadcast a control message to the specified channel.

        Args:
            channel (str): Name of the channel to publish on.
            message (str): The message payload to broadcast.
        """

    @abstractmethod
    async def subscribe_listen(
        self,
        channel: str,
    ) -> AsyncGenerator[str, None]:
        """Subscribe to a signal stream and yield incoming messages.

        Args:
            channel (str): Name of the channel to subscribe to.

        Yields:
            str: Each message received on the channel.
        """
        # The placeholder yield makes this an async generator function, so
        # the abstract declaration matches the annotated return type.
        yield ""

    @abstractmethod
    async def set_task_state(
        self,
        key: str,
        state: TaskState,
        ttl: int = 3600,
    ) -> None:
        """Set the task state with an optional time-to-live (TTL).

        Args:
            key (str): The unique identifier for the task.
            state (TaskState): The state to store.
            ttl (int): Time-to-live for the state record; presumably in
                seconds, as documented for ``compare_and_set_state`` --
                confirm against the concrete backend.
        """

    @abstractmethod
    async def compare_and_set_state(
        self,
        key: str,
        new_state: TaskState,
        expected_state: TaskState,
        negate: bool = False,
        ttl: int = 3600,
    ) -> bool:
        """Perform an atomic Compare-And-Set (CAS) operation on the task state.

        This method updates the state to `new_state` only if the current
        state matches the `expected_state` condition.

        Args:
            key (str): The unique identifier for the task.
            new_state (TaskState): The state to be set if the condition is
                met.
            expected_state (TaskState): The state to compare against the
                current persisted state.
            negate (bool): If True, the update occurs only if the current
                state is NOT equal to `expected_state`. If False (default),
                the update occurs only if the current state IS equal to
                `expected_state`.
            ttl (int): Time-to-live for the state record in seconds.

        Returns:
            bool: True if the state was successfully updated, False
            otherwise.
        """

    @abstractmethod
    async def get_task_state(self, key: str) -> Optional[TaskState]:
        """Retrieve the current state of a task.

        Args:
            key (str): The unique identifier for the task.

        Returns:
            Optional[TaskState]: The stored state. ``None`` presumably means
            no state is recorded for ``key`` -- confirm against the concrete
            backend.
        """

    @abstractmethod
    async def delete_task_state(self, key: str) -> None:
        """Permanently remove the state record of a task.

        Args:
            key (str): The unique identifier for the task.
        """

    @abstractmethod
    async def aclose(self) -> None:
        """Asynchronously release backend connection resources."""