Source code for agentscope_runtime.engine.tracing.base

# -*- coding: utf-8 -*-
import time
import traceback
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Any, Dict, List

from opentelemetry.propagate import inject, extract
from opentelemetry.context.context import Context


# Handler Interface
[docs] class TracerHandler(ABC): """Abstract base class for tracer handlers."""
[docs] @abstractmethod def on_start( self, event_name: str, payload: Dict[str, Any], **kwargs: Any, ) -> None: """Handle the start of a trace event. Args: event_name (str): The name of event being traced. payload (Dict[str, Any]): The payload data for the event. **kwargs (Any): Additional keyword arguments. """ raise NotImplementedError("Subclasses must implement on_start method")
[docs] @abstractmethod def on_end( self, event_name: str, start_payload: Dict[str, Any], end_payload: Dict[str, Any], start_time: float, **kwargs: Any, ) -> None: """Handle the end of a trace event. Args: event_name (str): The name of event being traced. start_payload (Dict[str, Any]): The payload data from event start. end_payload (Dict[str, Any]): The payload data from event end. start_time (float): The timestamp when the event started. **kwargs (Any): Additional keyword arguments. """ raise NotImplementedError("Subclasses must implement on_end method")
[docs] @abstractmethod def on_log(self, message: str, **kwargs: Any) -> None: """Handle a log message during tracing. Args: message (str): The log message. **kwargs (Any): Additional keyword arguments. """ raise NotImplementedError("Subclasses must implement on_log method")
[docs] @abstractmethod def on_error( self, event_name: str, start_payload: Dict[str, Any], error: Exception, start_time: float, traceback_info: str, **kwargs: Any, ) -> None: """Handle an error during tracing. Args: event_name (str): The type of event being traced. start_payload (Dict[str, Any]): The payload data from event start. error (Exception): The exception that occurred. start_time (float): The timestamp when the event started. traceback_info (str): The traceback information. **kwargs (Any): Additional keyword arguments. """ raise NotImplementedError("Subclasses must implement on_error method")
[docs] class BaseLogHandler(TracerHandler): """Basic log handler implementation using Python's logging module.""" import logging logger = logging.getLogger(__name__)
[docs] def on_start( self, event_name: str, payload: Dict[str, Any], **kwargs: Any, ) -> None: """Log the start of a trace event. Args: event_name (str): The name of event being traced. payload (Dict[str, Any]): The payload data for the event. **kwargs (Any): Additional keyword arguments. """ self.logger.info(f"Event {event_name} started with payload: {payload}")
[docs] def on_end( self, event_name: str, start_payload: Dict[str, Any], end_payload: Dict[str, Any], start_time: float, **kwargs: Any, ) -> None: """Log the end of a trace event. Args: event_name (str): The name of event being traced. start_payload (Dict[str, Any]): The payload data from event start. end_payload (Dict[str, Any]): The payload data from event end. start_time (float): The timestamp when the event started. **kwargs (Any): Additional keyword arguments. """ self.logger.info( f"Event {event_name} ended with start payload: {start_payload}, " f"end payload: {end_payload}, duration: " f"{time.time() - start_time} seconds, kwargs: {kwargs}", )
[docs] def on_log(self, message: str, **kwargs: Any) -> None: """Log a message during tracing. Args: message (str): The log message. **kwargs (Any): Additional keyword arguments. """ self.logger.info(f"Log: {message}")
[docs] def on_error( self, event_name: str, start_payload: Dict[str, Any], error: Exception, start_time: float, traceback_info: str, **kwargs: Any, ) -> None: """Log an error during tracing. Args: event_name (str): The name of event being traced. start_payload (Dict[str, Any]): The payload data from event start. error (Exception): The exception that occurred. start_time (float): The timestamp when the event started. traceback_info (str): The traceback information. **kwargs (Any): Additional keyword arguments. """ self.logger.error( f"Error in event {event_name} with payload: {start_payload}, " f"error: {error}, " f"traceback: {traceback_info}, duration: " f"{time.time() - start_time} seconds, kwargs: {kwargs}", )
[docs] class Tracer: """ Tracer class for logging events. Usage example:: with tracer.event(TraceType.LLM, payload) as event: event.log("message") # ...logic here... end_payload = {"xxx": "value"} # optional on_end call for additional payload and kwargs event.on_end(end_payload, if_success=True) """
[docs] def __init__(self, handlers: List[TracerHandler]): """Initialize the tracer with a list of handlers. Args: handlers (List[TracerHandler]): List of handlers to process trace events. """ self.handlers = handlers
[docs] @contextmanager def event( self, span: Any, event_name: str, payload: Dict[str, Any], **kwargs: Any, ) -> Any: """Create a context manager for tracing an event. Args: span(Any): span of event event_name (str): The name of event being traced. payload (Dict[str, Any]): The payload data for the event. **kwargs (Any): Additional keyword arguments. Yields: EventContext: The event context for logging and managing the trace. """ start_time = time.time() for handle in self.handlers: handle.on_start( event_name, payload, **kwargs, ) event_context = EventContext( span, self.handlers, event_name, start_time, payload, ) try: yield event_context except Exception as e: traceback_info = traceback.format_exc() for handle in self.handlers: handle.on_error( event_name, payload, e, start_time, traceback_info=traceback_info, ) raise event_context.finalize(payload)
[docs] def log(self, message: str, **kwargs: Any) -> None: """Log a message using all registered handlers. Args: message (str): The log message. **kwargs (Any): Additional keyword arguments. """ for handle in self.handlers: handle.on_log(message, **kwargs)
[docs] class EventContext: """Context manager for individual trace events."""
[docs] def __init__( self, span: Any, handlers: List[TracerHandler], event_name: str, start_time: float, start_payload: Dict[str, Any], ) -> None: """Initialize the event context. Args: handlers (List[TracerHandler]): List of handlers to process trace events. event_name (str): The name of event being traced. start_time (float): The timestamp when the event started. start_payload (Dict[str, Any]): The payload data from event start. """ self.span = span self.handlers = handlers self.event_name = event_name self.start_time = start_time self.start_payload = start_payload self.end_payload = {} self.kwargs = {}
[docs] def on_end(self, payload: Dict[str, Any], **kwargs: Any) -> None: """Set the end payload and additional kwargs for the event. Args: payload (Dict[str, Any]): The payload data for event end. **kwargs (Any): Additional keyword arguments. """ self.end_payload = payload self.kwargs = kwargs
[docs] def on_log(self, message: str, **kwargs: Any) -> None: """Log a message within this event context. Args: message (str): The log message. **kwargs (Any): Additional keyword arguments. """ kwargs["event_name"] = self.event_name kwargs["start_time"] = self.start_time kwargs["start_payload"] = self.start_payload for handle in self.handlers: handle.on_log(message, **kwargs)
[docs] def finalize(self, start_payload: Dict[str, Any] = None) -> None: """Public method to finalize the event. Args: start_payload (Dict[str, Any], optional): The payload data from event start. """ self._end(start_payload)
def _end(self, start_payload: Dict[str, Any] = None) -> None: """Finalize the event by calling on_end for all handlers. Args: start_payload (Dict[str, Any], optional): The payload data from event start. """ for handle in self.handlers: handle.on_end( self.event_name, start_payload, self.end_payload, self.start_time, **self.kwargs, )
[docs] def set_attribute(self, key: str, value: Any) -> None: """Set attribute for the current span. Args: key (str): key of attribute value(str): value of attribute """ self.span.set_attribute(key, value)
[docs] def get_trace_context(self) -> Context: carrier = {} inject(carrier) context = extract(carrier) return context