Source code for agentscope_runtime.engine.deployers.utils.docker_image_utils.docker_image_builder

# -*- coding: utf-8 -*-
# pylint:disable=too-many-branches

import json
import logging
import os
import subprocess
from typing import Optional, Dict
from pydantic import BaseModel

logger = logging.getLogger(__name__)


[docs] class RegistryConfig(BaseModel): """Container registry configuration""" registry_url: str = "" username: str = None password: str = None namespace: Optional[str] = "agentscope-runtime" image_pull_secret: str = None
[docs] def get_full_url(self) -> str: # Handle different registry URL formats if self.registry_url == "localhost": return self.registry_url return f"{self.registry_url}/{self.namespace}"
[docs] class BuildConfig(BaseModel): """Configuration for Docker image building""" no_cache: bool = False quiet: bool = False build_args: Dict[str, str] = {} platform: Optional[str] = None target: Optional[str] = None source_updated: bool = False
[docs] class DockerImageBuilder: """ Responsible solely for building and managing Docker images. Separated from project packaging for better separation of concerns. """
[docs] def __init__(self): """ Initialize Docker image builder. """ self._ensure_docker_available()
@staticmethod def _ensure_docker_available(): """Ensure Docker is available on the system""" try: result = subprocess.run( ["docker", "--version"], check=True, capture_output=True, text=True, ) logger.debug(f"Docker available: {result.stdout.strip()}") except (subprocess.CalledProcessError, FileNotFoundError) as e: raise RuntimeError( "Docker is not installed or not available in PATH. " "Please install Docker to use this functionality.", ) from e
[docs] @staticmethod def get_full_name( image_name: str, image_tag: str = "latest", ): return f"{image_name}:{image_tag}"
[docs] def build_image( self, build_context: str, image_name: str, image_tag: str = "latest", dockerfile_path: Optional[str] = None, config: Optional[BuildConfig] = None, source_updated: bool = False, ) -> str: """ Build Docker image from build context. Args: build_context: Path to build context directory image_name: Name for the Docker image image_tag: Tag for the Docker image dockerfile_path: Optional path to Dockerfile (defaults to Dockerfile in context) config: Build configuration source_updated: Optional flag to determine if source image should be updated. Returns: str: Full image name with tag Raises: subprocess.CalledProcessError: If docker build fails ValueError: If build context doesn't exist """ if not os.path.exists(build_context): raise ValueError(f"Build context does not exist: {build_context}") config = config or BuildConfig() full_image_name = self.get_full_name(image_name, image_tag) # if not source_updated: # return full_image_name logger.info(f"Source Updated: {source_updated}") # Prepare docker build command build_cmd = ["docker", "build", "-t", full_image_name] # Add dockerfile path if specified if dockerfile_path: if not os.path.isabs(dockerfile_path): dockerfile_path = os.path.join(build_context, dockerfile_path) build_cmd.extend(["-f", dockerfile_path]) # Add build arguments if config.build_args: for key, value in config.build_args.items(): build_cmd.extend(["--build-arg", f"{key}={value}"]) # Add platform if specified if config.platform: build_cmd.extend(["--platform", config.platform]) # Add target if specified if config.target: build_cmd.extend(["--target", config.target]) # Add additional options if config.no_cache: build_cmd.append("--no-cache") if config.quiet: build_cmd.append("--quiet") # Add build context path build_cmd.append(build_context) try: if config.quiet: # Capture output for quiet mode result = subprocess.run( build_cmd, check=True, capture_output=True, text=True, cwd=build_context, ) logger.info(f"Built image: {full_image_name}") if result.stdout: logger.debug(f"Build output: {result.stdout}") else: # Stream output for non-quiet mode logger.info(f"Building image: {full_image_name}") logger.debug(f"Build command: {' '.join(build_cmd)}") with subprocess.Popen( build_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, cwd=build_context, ) as process: # Stream output in real-time while True: output = process.stdout.readline() if output == "" and process.poll() is not None: break if output: print(output.strip()) process.wait() if process.returncode != 0: raise subprocess.CalledProcessError( process.returncode, build_cmd, "Docker build failed", ) logger.info(f"Successfully built image: {full_image_name}") return full_image_name except subprocess.CalledProcessError as e: error_msg = f"Docker build failed for image {full_image_name}" if hasattr(e, "output") and e.output: error_msg += f"\nError output: {e.output}" logger.error(error_msg) raise subprocess.CalledProcessError( e.returncode, e.cmd, error_msg, ) from e
[docs] def tag_image( self, image_name: str, registry_config: Optional[RegistryConfig] = None, ) -> str: """ Tag image with registry info. Args: image_name: Full image name to push registry_config: Optional registry config (uses instance config if None) Returns: registry_image_name: Full image name with url """ config = registry_config if not config: raise ValueError("No registry configuration provided") # Construct full registry image name if config.registry_url and not image_name.startswith( config.registry_url, ): registry_image_name = f"{config.get_full_url()}/{image_name}" # Tag the image with registry prefix subprocess.run( ["docker", "tag", image_name, registry_image_name], check=True, capture_output=True, ) else: registry_image_name = image_name return registry_image_name
[docs] def push_image( self, image_name: str, registry_config: Optional[RegistryConfig] = None, quiet: bool = False, ) -> str: """ Push image to registry. Args: image_name: Full image name to push registry_config: Optional registry config (uses instance config if None) quiet: Whether to suppress output Returns: str: Full image name that was pushed Raises: subprocess.CalledProcessError: If push fails ValueError: If no registry configuration is available """ registry_image_name = self.tag_image(image_name, registry_config) try: push_cmd = ["docker", "push", registry_image_name] if quiet: result = subprocess.run( push_cmd, check=True, capture_output=True, text=True, ) logger.info(f"Pushed image: {registry_image_name}") if result.stdout: logger.debug(f"Push output: {result.stdout}") else: logger.info(f"Pushing image: {registry_image_name}") with subprocess.Popen( push_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, ) as process: # Stream output in real-time while True: output = process.stdout.readline() if output == "" and process.poll() is not None: break if output: print(output.strip()) process.wait() if process.returncode != 0: raise subprocess.CalledProcessError( process.returncode, push_cmd, "Docker push failed", ) logger.info( f"Successfully pushed image: {registry_image_name}", ) return registry_image_name except subprocess.CalledProcessError as e: error_msg = f"Docker push failed for image {registry_image_name}" if hasattr(e, "stderr") and e.stderr: error_msg += f"\nError output: {e.stderr}" logger.error(error_msg) raise subprocess.CalledProcessError( e.returncode, e.cmd, error_msg, ) from e
[docs] def build_and_push( self, build_context: str, image_name: str, image_tag: str = "latest", dockerfile_path: Optional[str] = None, build_config: Optional[BuildConfig] = None, registry_config: Optional[RegistryConfig] = None, source_updated: bool = False, ) -> str: """ Build and push image in one operation. Args: build_context: Path to build context directory image_name: Name for the Docker image image_tag: Tag for the Docker image dockerfile_path: Optional path to Dockerfile build_config: Build configuration registry_config: Registry configuration source_updated: Whether the source image was updated or not Returns: str: Full registry image name """ # Build the image built_image = self.build_image( build_context=build_context, image_name=image_name, image_tag=image_tag, dockerfile_path=dockerfile_path, config=build_config, source_updated=source_updated, ) # Push to registry registry_image = self.push_image( image_name=built_image, registry_config=registry_config, quiet=build_config.quiet if build_config else False, ) # make sure return the built name without registry return registry_image.split("/")[-1]
[docs] def remove_image( self, image_name: str, force: bool = False, quiet: bool = True, ) -> bool: """ Remove Docker image. Args: image_name: Name of image to remove force: Force removal quiet: Suppress output Returns: bool: True if successful """ try: cmd = ["docker", "rmi"] if force: cmd.append("-f") cmd.append(image_name) subprocess.run( cmd, check=True, capture_output=quiet, text=True, ) if not quiet: logger.info(f"Removed image: {image_name}") return True except subprocess.CalledProcessError as e: if not quiet: logger.warning(f"Failed to remove image {image_name}: {e}") return False
[docs] def get_image_info(self, image_name: str) -> Dict: """ Get information about a Docker image. Args: image_name: Name of the Docker image Returns: Dict: Image information from docker inspect Raises: ValueError: If image not found or info invalid """ try: result = subprocess.run( ["docker", "inspect", image_name], check=True, capture_output=True, text=True, ) image_info = json.loads(result.stdout)[0] return image_info except subprocess.CalledProcessError as e: raise ValueError(f"Image not found: {image_name}") from e except (json.JSONDecodeError, IndexError) as e: raise ValueError(f"Invalid image info for: {image_name}") from e
[docs] def image_exists(self, image_name: str) -> bool: """ Check if Docker image exists locally. Args: image_name: Name of image to check Returns: bool: True if image exists """ try: self.get_image_info(image_name) return True except ValueError: return False