"""Package builder with parallel execution and proper locking.""" import asyncio import fcntl import os import shutil import subprocess from dataclasses import dataclass, field from datetime import datetime from enum import Enum from pathlib import Path from concurrent.futures import ProcessPoolExecutor from typing import Any, TYPE_CHECKING if TYPE_CHECKING: from archrepobuild.repo import RepoManager from archrepobuild.aur import AURClient from archrepobuild.config import Config, PackageOverride from archrepobuild.logging import get_logger from archrepobuild.resolver import DependencyResolver logger = get_logger("builder") class BuildStatus(Enum): """Build status for a package.""" PENDING = "pending" BUILDING = "building" SUCCESS = "success" FAILED = "failed" SKIPPED = "skipped" @dataclass class BuildResult: """Result of a package build.""" package: str status: BuildStatus version: str | None = None duration: float = 0.0 error: str | None = None artifacts: list[Path] = field(default_factory=list) timestamp: datetime = field(default_factory=datetime.now) class FileLock: """Context manager for file-based locking using flock.""" def __init__(self, path: Path): """Initialize lock on file path. Args: path: Path to lock file """ self.path = path self.fd: int | None = None def __enter__(self) -> "FileLock": """Acquire exclusive lock.""" self.path.parent.mkdir(parents=True, exist_ok=True) self.fd = os.open(str(self.path), os.O_RDWR | os.O_CREAT) fcntl.flock(self.fd, fcntl.LOCK_EX) logger.debug(f"Acquired lock: {self.path}") return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Release lock.""" if self.fd is not None: fcntl.flock(self.fd, fcntl.LOCK_UN) os.close(self.fd) logger.debug(f"Released lock: {self.path}") def _run_makepkg( package_dir: Path, sign: bool = False, key: str = "", clean: bool = True, force: bool = False, skip_checksums: bool = False, extra_args: list[str] | None = None, env_overrides: dict[str, str] | None = None, ) -> tuple[bool, str, list[Path]]: """Run makepkg in a subprocess. This runs in a separate process for parallelization. 


def _run_makepkg(
    package_dir: Path,
    sign: bool = False,
    key: str = "",
    clean: bool = True,
    force: bool = False,
    skip_checksums: bool = False,
    extra_args: list[str] | None = None,
    env_overrides: dict[str, str] | None = None,
) -> tuple[bool, str, list[Path]]:
    """Run makepkg in a subprocess.

    This runs in a separate process for parallelization.

    Args:
        package_dir: Directory containing PKGBUILD
        sign: Whether to sign packages
        key: GPG key for signing
        clean: Clean build directory after build
        force: Force rebuild even if the package was already built
        skip_checksums: Skip checksum verification
        extra_args: Additional makepkg arguments
        env_overrides: Environment variable overrides

    Returns:
        Tuple of (success, error_message, artifact_paths)
    """
    cmd = ["makepkg", "-s", "--noconfirm"]
    if clean:
        cmd.append("-c")
    if sign and key:
        cmd.extend(["--sign", "--key", key])
    if force:
        cmd.append("-f")
    if skip_checksums:
        cmd.append("--skipchecksums")
    if extra_args:
        cmd.extend(extra_args)

    env = os.environ.copy()
    if env_overrides:
        env.update(env_overrides)

    try:
        result = subprocess.run(
            cmd,
            cwd=package_dir,
            capture_output=True,
            text=True,
            env=env,
            timeout=3600,  # 1 hour timeout
        )
        if result.returncode != 0:
            error = result.stderr or result.stdout
            if "A package has already been built" in error:
                logger.info("Package already built, treating as success")
                # Find the previously built packages anyway
                artifacts = list(package_dir.glob("*.pkg.tar.*"))
                artifacts = [a for a in artifacts if not a.name.endswith(".sig")]
                return True, "", artifacts
            return False, error, []

        # Find built packages (excluding detached signature files)
        artifacts = list(package_dir.glob("*.pkg.tar.*"))
        artifacts = [a for a in artifacts if not a.name.endswith(".sig")]
        return True, "", artifacts
    except subprocess.TimeoutExpired:
        return False, "Build timed out after 1 hour", []
    except Exception as e:
        return False, str(e), []


class Builder:
    """Package builder with parallel execution support."""

    def __init__(
        self,
        config: Config,
        aur_client: AURClient,
        repo: RepoManager | None = None,
    ):
        """Initialize builder.

        Args:
            config: Application configuration
            aur_client: AUR client for package info
            repo: Optional repository manager for incremental registration
        """
        self.config = config
        self.aur_client = aur_client
        self.repo = repo
        self.resolver = DependencyResolver(aur_client)
        self._lock_dir = config.repository.build_dir / ".locks"
        self._executor: ProcessPoolExecutor | None = None

    async def __aenter__(self) -> "Builder":
        """Async context manager entry."""
        if self.config.building.parallel:
            max_workers = self.config.building.max_workers
            self._executor = ProcessPoolExecutor(max_workers=max_workers)
            logger.info(f"Builder initialized with {max_workers} workers (parallel)")
        else:
            self._executor = None
            logger.info("Builder initialized (sequential)")
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit."""
        if self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None

    def _get_lock_path(self, package: str) -> Path:
        """Get lock file path for package.

        Args:
            package: Package name

        Returns:
            Path to lock file
        """
        return self._lock_dir / f"{package}.lock"

    def _get_package_dir(self, package: str) -> Path:
        """Get build directory for package.

        Args:
            package: Package name

        Returns:
            Path to package build directory
        """
        return self.config.repository.build_dir / package

    def _get_override(self, package: str) -> PackageOverride:
        """Get package-specific overrides.

        Args:
            package: Package name

        Returns:
            PackageOverride (default if not specified)
        """
        # Check for a package-specific override first, then fall back to "_default"
        if package in self.config.package_overrides:
            return self.config.package_overrides[package]
        return self.config.package_overrides.get("_default", PackageOverride())
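
    # Override lookup sketch (illustrative; the keys below are hypothetical).
    # Given package_overrides = {"_default": ..., "linux-git": ...}:
    #   _get_override("linux-git") -> the package-specific entry
    #   _get_override("yay")       -> the "_default" entry
    #   (no "_default" configured) -> a fresh PackageOverride()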

    async def _clone_or_update(self, package: str) -> bool:
        """Clone or update package from AUR.

        Args:
            package: Package name

        Returns:
            True if there were updates (or a fresh clone)
        """
        pkg_dir = self._get_package_dir(package)

        if pkg_dir.exists():
            # Update the existing checkout, discarding local changes first
            subprocess.run(
                ["git", "reset", "--hard"],
                cwd=pkg_dir,
                capture_output=True,
            )
            result = subprocess.run(
                ["git", "pull"],
                cwd=pkg_dir,
                capture_output=True,
                text=True,
            )
            return "Already up to date" not in result.stdout
        else:
            # Clone a new checkout
            pkg_info = await self.aur_client.get_package(package)
            if not pkg_info:
                raise ValueError(f"Package not found in AUR: {package}")

            pkg_dir.parent.mkdir(parents=True, exist_ok=True)
            try:
                subprocess.run(
                    ["git", "clone", pkg_info.git_url, str(pkg_dir)],
                    check=True,
                    capture_output=True,
                    text=True,
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"Failed to clone {package} from {pkg_info.git_url}: {e.stderr}")
                raise ValueError(f"Failed to clone package from AUR: {e.stderr}")
            return True

    def _is_vcs_package(self, package_dir: Path) -> bool:
        """Check if package is a VCS package (needs rebuild on each run).

        Args:
            package_dir: Path to package directory

        Returns:
            True if VCS package
        """
        pkgbuild = package_dir / "PKGBUILD"
        if not pkgbuild.exists():
            return False
        content = pkgbuild.read_text()
        # A pkgver() function is the makepkg convention for VCS packages
        return "pkgver()" in content

    async def build_package(
        self,
        package: str,
        force: bool = False,
    ) -> BuildResult:
        """Build a single package.

        Args:
            package: Package name
            force: Force rebuild even if up to date

        Returns:
            BuildResult with status and artifacts
        """
        start_time = datetime.now()
        pkg_dir = self._get_package_dir(package)
        override = self._get_override(package)

        logger.info(f"Building package: {package}")

        try:
            # Clone or update
            has_updates = await self._clone_or_update(package)
            is_vcs = self._is_vcs_package(pkg_dir)

            # Skip if no updates and not forced
            if not has_updates and not is_vcs and not force:
                logger.info(f"Skipping {package}: already up to date")
                return BuildResult(
                    package=package,
                    status=BuildStatus.SKIPPED,
                    duration=(datetime.now() - start_time).total_seconds(),
                )

            # Run build with retries and exponential backoff
            last_error = ""
            for attempt in range(self.config.building.retry_attempts):
                if attempt > 0:
                    delay = self.config.building.retry_delay * (2 ** (attempt - 1))
                    logger.warning(
                        f"Retrying {package} (attempt {attempt + 1}/"
                        f"{self.config.building.retry_attempts}) after {delay}s"
                    )
                    await asyncio.sleep(delay)

                # Run makepkg in the process pool (falls back to the default
                # thread pool when running sequentially)
                loop = asyncio.get_running_loop()
                success, error, artifacts = await loop.run_in_executor(
                    self._executor,
                    _run_makepkg,
                    pkg_dir,
                    self.config.signing.enabled,
                    self.config.signing.key,
                    self.config.building.clean,
                    force or is_vcs,
                    override.skip_checksums,
                    override.extra_args,
                    override.env,
                )

                if success:
                    duration = (datetime.now() - start_time).total_seconds()
                    logger.info(f"Successfully built {package} in {duration:.1f}s")
                    return BuildResult(
                        package=package,
                        status=BuildStatus.SUCCESS,
                        duration=duration,
                        artifacts=artifacts,
                    )
                last_error = error

            # All retries failed
            duration = (datetime.now() - start_time).total_seconds()
            logger.error(f"Failed to build {package}: {last_error}")
            return BuildResult(
                package=package,
                status=BuildStatus.FAILED,
                duration=duration,
                error=last_error,
            )
        except Exception as e:
            duration = (datetime.now() - start_time).total_seconds()
            logger.exception(f"Error building {package}")
            return BuildResult(
                package=package,
                status=BuildStatus.FAILED,
                duration=duration,
                error=str(e),
            )
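
    # Sketch of the retry schedule used above (illustrative helper, not part
    # of the build path; assumes attempts >= 1). With retry_attempts=3 and
    # retry_delay=5.0, attempts wait 0s, 5s, and 10s respectively.
    @staticmethod
    def _example_retry_delays(attempts: int, base_delay: float) -> list[float]:
        # Attempt 0 runs immediately; attempt n waits base_delay * 2**(n-1)
        return [0.0] + [base_delay * (2 ** (a - 1)) for a in range(1, attempts)]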

    async def build_all(
        self,
        force: bool = False,
    ) -> list[BuildResult]:
        """Build all packages in the build directory.

        Args:
            force: Force rebuild all packages

        Returns:
            List of build results
        """
        # Update the system first if configured
        if self.config.building.update_system:
            logger.info("Updating system...")
            subprocess.run(
                ["sudo", "pacman", "-Syu", "--noconfirm"],
                check=False,
            )

        # Find all packages
        build_dir = self.config.repository.build_dir
        packages = [
            d.name
            for d in build_dir.iterdir()
            if d.is_dir() and not d.name.startswith(".")
        ]

        if not packages:
            logger.warning("No packages found in build directory")
            return []

        logger.info(f"Building {len(packages)} packages")

        # Build in parallel or sequentially
        if self.config.building.parallel:
            tasks = [self.build_package(pkg, force) for pkg in packages]
            results = await asyncio.gather(*tasks)
        else:
            results = []
            for pkg in packages:
                result = await self.build_package(pkg, force)
                results.append(result)

        # Summary
        success = sum(1 for r in results if r.status == BuildStatus.SUCCESS)
        failed = sum(1 for r in results if r.status == BuildStatus.FAILED)
        skipped = sum(1 for r in results if r.status == BuildStatus.SKIPPED)
        logger.info(f"Build complete: {success} succeeded, {failed} failed, {skipped} skipped")

        return list(results)

    async def download_package(self, package: str) -> BuildResult:
        """Download a package from a repository using pacman.

        Args:
            package: Package name

        Returns:
            BuildResult with status and artifact paths
        """
        start_time = datetime.now()
        logger.info(f"Downloading package from repositories: {package}")

        dest_dir = self.config.repository.build_dir / "downloads"
        dest_dir.mkdir(parents=True, exist_ok=True)

        try:
            # pacman -Sw cannot download to an arbitrary directory, so use
            # pacman -Sp to resolve the download URLs and fetch them ourselves
            result = subprocess.run(
                ["pacman", "-Sp", "--noconfirm", package],
                capture_output=True,
                text=True,
                check=True,
            )
            urls = [
                line
                for line in result.stdout.strip().split("\n")
                if line.startswith(("http", "ftp", "file"))
            ]
            if not urls:
                raise ValueError(f"Could not find download URL for package: {package}")

            artifacts: list[Path] = []
            import aiohttp  # local import: only needed for repo downloads

            async with aiohttp.ClientSession() as session:
                for url in urls:
                    filename = url.split("/")[-1]
                    dest_path = dest_dir / filename
                    logger.debug(f"Downloading {url} to {dest_path}")
                    async with session.get(url) as response:
                        response.raise_for_status()
                        with open(dest_path, "wb") as f:
                            # Stream the package to disk in 8 KiB chunks
                            while True:
                                chunk = await response.content.read(8192)
                                if not chunk:
                                    break
                                f.write(chunk)
                    artifacts.append(dest_path)

            duration = (datetime.now() - start_time).total_seconds()
            logger.info(f"Successfully downloaded {package} in {duration:.1f}s")
            return BuildResult(
                package=package,
                status=BuildStatus.SUCCESS,
                duration=duration,
                artifacts=artifacts,
            )
        except Exception as e:
            duration = (datetime.now() - start_time).total_seconds()
            logger.error(f"Failed to download {package}: {e}")
            return BuildResult(
                package=package,
                status=BuildStatus.FAILED,
                duration=duration,
                error=str(e),
            )
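
    # Sample `pacman -Sp foo` output that download_package parses above
    # (illustrative; the mirror URL is hypothetical):
    #   https://mirror.example.org/extra/os/x86_64/foo-1.0-1-x86_64.pkg.tar.zst
    # Only URI lines are kept; warnings and other messages are filtered out.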

    async def add_package(self, package: str) -> BuildResult:
        """Add and build (or download) a new package with dependencies.

        Args:
            package: Package name

        Returns:
            BuildResult for the main package
        """
        logger.info(f"Adding package: {package}")

        # Resolve dependencies into build order
        build_order = await self.resolver.resolve([package])

        # Walk the build order: skip packages already in the managed repo,
        # download packages available elsewhere, and build AUR-only packages
        final_results: list[BuildResult] = []
        for pkg_name in build_order:
            repo = self.resolver.is_in_repos(pkg_name)

            if repo == self.config.repository.name:
                logger.info(f"Package {pkg_name} already in managed repository, skipping")
                if pkg_name == package:
                    return BuildResult(package=package, status=BuildStatus.SKIPPED)
                continue

            if repo:
                logger.info(f"Package {pkg_name} found in {repo}, downloading...")
                result = await self.download_package(pkg_name)
            else:
                logger.info(f"Package {pkg_name} only in AUR, building...")
                result = await self.build_package(pkg_name, force=True)

            final_results.append(result)

            if result.status == BuildStatus.SUCCESS:
                if self.repo:
                    logger.info(f"Adding {pkg_name} to repository")
                    self.repo.add_packages(result)
                    # Refresh resolver cache to recognize the newly added package
                    self.resolver._refresh_pacman_cache(sync=True)
            elif result.status == BuildStatus.FAILED:
                logger.error(f"Failed to process {pkg_name}, aborting")
                if pkg_name == package:
                    return result
                return BuildResult(
                    package=package,
                    status=BuildStatus.FAILED,
                    error=f"Dependency {pkg_name} failed: {result.error}",
                )

        # Return the result for the main package
        for r in final_results:
            if r.package == package:
                return r
        return BuildResult(package=package, status=BuildStatus.SKIPPED)

    def remove_package(self, package: str) -> bool:
        """Remove a package from the build directory.

        Args:
            package: Package name

        Returns:
            True if removed successfully
        """
        pkg_dir = self._get_package_dir(package)
        if pkg_dir.exists():
            shutil.rmtree(pkg_dir)
            logger.info(f"Removed package: {package}")
            return True
        logger.warning(f"Package not found: {package}")
        return False
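

# End-to-end usage sketch (illustrative): wiring a Builder to a RepoManager
# so add_package registers each successful build. Constructing Config,
# AURClient, and RepoManager is assumed to happen elsewhere in the
# application; the package name is hypothetical.
async def _example_add_package(
    config: Config,
    aur_client: AURClient,
    repo: RepoManager,
) -> None:
    async with Builder(config, aur_client, repo=repo) as builder:
        result = await builder.add_package("paru")  # hypothetical package
        logger.info(f"add_package('{result.package}') -> {result.status.value}")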