"""Package builder with parallel execution and proper locking."""
|
|
import asyncio
|
import fcntl
|
import os
|
import shutil
|
import subprocess
|
from dataclasses import dataclass, field
|
from datetime import datetime
|
from enum import Enum
|
from pathlib import Path
|
from concurrent.futures import ProcessPoolExecutor
|
from typing import Any
|
|
from archbuild.aur import AURClient
|
from archbuild.config import Config, PackageOverride
|
from archbuild.logging import get_logger
|
from archbuild.resolver import DependencyResolver
|
|
logger = get_logger("builder")
|
|
|
class BuildStatus(Enum):
|
"""Build status for a package."""
|
|
PENDING = "pending"
|
BUILDING = "building"
|
SUCCESS = "success"
|
FAILED = "failed"
|
SKIPPED = "skipped"
|
|
|
@dataclass
|
class BuildResult:
|
"""Result of a package build."""
|
|
package: str
|
status: BuildStatus
|
version: str | None = None
|
duration: float = 0.0
|
error: str | None = None
|
artifacts: list[Path] = field(default_factory=list)
|
timestamp: datetime = field(default_factory=datetime.now)
|
|
|
class FileLock:
|
"""Context manager for file-based locking using flock."""
|
|
def __init__(self, path: Path):
|
"""Initialize lock on file path.
|
|
Args:
|
path: Path to lock file
|
"""
|
self.path = path
|
self.fd: int | None = None
|
|
def __enter__(self) -> "FileLock":
|
"""Acquire exclusive lock."""
|
self.path.parent.mkdir(parents=True, exist_ok=True)
|
self.fd = os.open(str(self.path), os.O_RDWR | os.O_CREAT)
|
fcntl.flock(self.fd, fcntl.LOCK_EX)
|
logger.debug(f"Acquired lock: {self.path}")
|
return self
|
|
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
"""Release lock."""
|
if self.fd is not None:
|
fcntl.flock(self.fd, fcntl.LOCK_UN)
|
os.close(self.fd)
|
logger.debug(f"Released lock: {self.path}")
|
|
|
def _run_makepkg(
|
package_dir: Path,
|
sign: bool = False,
|
key: str = "",
|
clean: bool = True,
|
skip_checksums: bool = False,
|
extra_args: list[str] | None = None,
|
env_overrides: dict[str, str] | None = None,
|
) -> tuple[bool, str, list[Path]]:
|
"""Run makepkg in a subprocess.
|
|
This runs in a separate process for parallelization.
|
|
Args:
|
package_dir: Directory containing PKGBUILD
|
sign: Whether to sign packages
|
key: GPG key for signing
|
clean: Clean build directory after build
|
skip_checksums: Skip checksum verification
|
extra_args: Additional makepkg arguments
|
env_overrides: Environment variable overrides
|
|
Returns:
|
Tuple of (success, error_message, artifact_paths)
|
"""
|
cmd = ["makepkg", "-s", "--noconfirm"]
|
|
if clean:
|
cmd.append("-c")
|
if sign and key:
|
cmd.extend(["--sign", "--key", key])
|
if skip_checksums:
|
cmd.append("--skipchecksums")
|
if extra_args:
|
cmd.extend(extra_args)
|
|
env = os.environ.copy()
|
if env_overrides:
|
env.update(env_overrides)
|
|
try:
|
result = subprocess.run(
|
cmd,
|
cwd=package_dir,
|
capture_output=True,
|
text=True,
|
env=env,
|
timeout=3600, # 1 hour timeout
|
)
|
|
if result.returncode != 0:
|
return False, result.stderr or result.stdout, []
|
|
# Find built packages
|
artifacts = list(package_dir.glob("*.pkg.tar.*"))
|
artifacts = [a for a in artifacts if not a.name.endswith(".sig")]
|
|
return True, "", artifacts
|
|
except subprocess.TimeoutExpired:
|
return False, "Build timed out after 1 hour", []
|
except Exception as e:
|
return False, str(e), []
|
|
|
class Builder:
|
"""Package builder with parallel execution support."""
|
|
def __init__(
|
self,
|
config: Config,
|
aur_client: AURClient,
|
):
|
"""Initialize builder.
|
|
Args:
|
config: Application configuration
|
aur_client: AUR client for package info
|
"""
|
self.config = config
|
self.aur_client = aur_client
|
self.resolver = DependencyResolver(aur_client)
|
self._lock_dir = config.repository.build_dir / ".locks"
|
self._executor: ProcessPoolExecutor | None = None
|
|
async def __aenter__(self) -> "Builder":
|
"""Async context manager entry."""
|
if self.config.building.parallel:
|
max_workers = self.config.building.max_workers
|
self._executor = ProcessPoolExecutor(max_workers=max_workers)
|
logger.info(f"Builder initialized with {max_workers} workers (parallel)")
|
else:
|
self._executor = None
|
logger.info("Builder initialized (sequential)")
|
return self
|
|
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
"""Async context manager exit."""
|
if self._executor:
|
self._executor.shutdown(wait=True)
|
self._executor = None
|
|
def _get_lock_path(self, package: str) -> Path:
|
"""Get lock file path for package.
|
|
Args:
|
package: Package name
|
|
Returns:
|
Path to lock file
|
"""
|
return self._lock_dir / f"{package}.lock"
|
|
def _get_package_dir(self, package: str) -> Path:
|
"""Get build directory for package.
|
|
Args:
|
package: Package name
|
|
Returns:
|
Path to package build directory
|
"""
|
return self.config.repository.build_dir / package
|
|
def _get_override(self, package: str) -> PackageOverride:
|
"""Get package-specific overrides.
|
|
Args:
|
package: Package name
|
|
Returns:
|
PackageOverride (default if not specified)
|
"""
|
# Check for package-specific override first, then fall back to _default
|
if package in self.config.package_overrides:
|
return self.config.package_overrides[package]
|
return self.config.package_overrides.get("_default", PackageOverride())
|
|
async def _clone_or_update(self, package: str) -> bool:
|
"""Clone or update package from AUR.
|
|
Args:
|
package: Package name
|
|
Returns:
|
True if there were updates (or new clone)
|
"""
|
pkg_dir = self._get_package_dir(package)
|
|
if pkg_dir.exists():
|
# Update existing repo
|
result = subprocess.run(
|
["git", "reset", "--hard"],
|
cwd=pkg_dir,
|
capture_output=True,
|
)
|
result = subprocess.run(
|
["git", "pull"],
|
cwd=pkg_dir,
|
capture_output=True,
|
text=True,
|
)
|
return "Already up to date" not in result.stdout
|
else:
|
# Clone new repo
|
pkg_info = await self.aur_client.get_package(package)
|
if not pkg_info:
|
raise ValueError(f"Package not found in AUR: {package}")
|
|
pkg_dir.parent.mkdir(parents=True, exist_ok=True)
|
subprocess.run(
|
["git", "clone", pkg_info.git_url, str(pkg_dir)],
|
check=True,
|
capture_output=True,
|
)
|
return True
|
|
def _is_vcs_package(self, package_dir: Path) -> bool:
|
"""Check if package is a VCS package (needs rebuild on each run).
|
|
Args:
|
package_dir: Path to package directory
|
|
Returns:
|
True if VCS package
|
"""
|
pkgbuild = package_dir / "PKGBUILD"
|
if not pkgbuild.exists():
|
return False
|
|
content = pkgbuild.read_text()
|
return "pkgver()" in content
|
|
async def build_package(
|
self,
|
package: str,
|
force: bool = False,
|
) -> BuildResult:
|
"""Build a single package.
|
|
Args:
|
package: Package name
|
force: Force rebuild even if up to date
|
|
Returns:
|
BuildResult with status and artifacts
|
"""
|
start_time = datetime.now()
|
pkg_dir = self._get_package_dir(package)
|
override = self._get_override(package)
|
|
logger.info(f"Building package: {package}")
|
|
try:
|
# Clone or update
|
has_updates = await self._clone_or_update(package)
|
is_vcs = self._is_vcs_package(pkg_dir)
|
|
# Skip if no updates and not forced
|
if not has_updates and not is_vcs and not force:
|
logger.info(f"Skipping {package}: already up to date")
|
return BuildResult(
|
package=package,
|
status=BuildStatus.SKIPPED,
|
duration=(datetime.now() - start_time).total_seconds(),
|
)
|
|
# Run build with retries
|
last_error = ""
|
for attempt in range(self.config.building.retry_attempts):
|
if attempt > 0:
|
delay = self.config.building.retry_delay * (2 ** (attempt - 1))
|
logger.warning(
|
f"Retrying {package} (attempt {attempt + 1}/"
|
f"{self.config.building.retry_attempts}) after {delay}s"
|
)
|
await asyncio.sleep(delay)
|
|
# Run makepkg in executor
|
loop = asyncio.get_event_loop()
|
success, error, artifacts = await loop.run_in_executor(
|
self._executor,
|
_run_makepkg,
|
pkg_dir,
|
self.config.signing.enabled,
|
self.config.signing.key,
|
self.config.building.clean,
|
override.skip_checksums,
|
override.extra_args,
|
override.env,
|
)
|
|
if success:
|
duration = (datetime.now() - start_time).total_seconds()
|
logger.info(f"Successfully built {package} in {duration:.1f}s")
|
return BuildResult(
|
package=package,
|
status=BuildStatus.SUCCESS,
|
duration=duration,
|
artifacts=artifacts,
|
)
|
|
last_error = error
|
|
# All retries failed
|
duration = (datetime.now() - start_time).total_seconds()
|
logger.error(f"Failed to build {package}: {last_error}")
|
return BuildResult(
|
package=package,
|
status=BuildStatus.FAILED,
|
duration=duration,
|
error=last_error,
|
)
|
|
except Exception as e:
|
duration = (datetime.now() - start_time).total_seconds()
|
logger.exception(f"Error building {package}")
|
return BuildResult(
|
package=package,
|
status=BuildStatus.FAILED,
|
duration=duration,
|
error=str(e),
|
)
|
|
async def build_all(
|
self,
|
force: bool = False,
|
) -> list[BuildResult]:
|
"""Build all packages in build directory.
|
|
Args:
|
force: Force rebuild all packages
|
|
Returns:
|
List of build results
|
"""
|
# Update system if configured
|
if self.config.building.update_system:
|
logger.info("Updating system...")
|
subprocess.run(
|
["sudo", "pacman", "-Syu", "--noconfirm"],
|
check=False,
|
)
|
|
# Find all packages
|
build_dir = self.config.repository.build_dir
|
packages = [
|
d.name for d in build_dir.iterdir()
|
if d.is_dir() and not d.name.startswith(".")
|
]
|
|
if not packages:
|
logger.warning("No packages found in build directory")
|
return []
|
|
logger.info(f"Building {len(packages)} packages")
|
|
# Build in parallel or sequentially
|
if self.config.building.parallel:
|
tasks = [self.build_package(pkg, force) for pkg in packages]
|
results = await asyncio.gather(*tasks)
|
else:
|
results = []
|
for pkg in packages:
|
result = await self.build_package(pkg, force)
|
results.append(result)
|
|
# Summary
|
success = sum(1 for r in results if r.status == BuildStatus.SUCCESS)
|
failed = sum(1 for r in results if r.status == BuildStatus.FAILED)
|
skipped = sum(1 for r in results if r.status == BuildStatus.SKIPPED)
|
|
logger.info(f"Build complete: {success} succeeded, {failed} failed, {skipped} skipped")
|
|
return list(results)
|
|
async def add_package(self, package: str) -> BuildResult:
|
"""Add and build a new package with dependencies.
|
|
Args:
|
package: Package name
|
|
Returns:
|
BuildResult for the main package
|
"""
|
logger.info(f"Adding package: {package}")
|
|
# Resolve dependencies
|
build_order = await self.resolver.resolve([package])
|
|
if package not in build_order.packages:
|
logger.info(f"Package {package} does not need to be built")
|
return BuildResult(
|
package=package,
|
status=BuildStatus.SKIPPED,
|
)
|
|
# Build dependencies first
|
results: list[BuildResult] = []
|
for dep in build_order:
|
if dep != package:
|
logger.info(f"Building dependency: {dep}")
|
result = await self.build_package(dep, force=True)
|
results.append(result)
|
|
if result.status == BuildStatus.FAILED:
|
logger.error(f"Dependency {dep} failed, aborting")
|
return BuildResult(
|
package=package,
|
status=BuildStatus.FAILED,
|
error=f"Dependency {dep} failed to build",
|
)
|
|
# Build main package
|
return await self.build_package(package, force=True)
|
|
def remove_package(self, package: str) -> bool:
|
"""Remove a package from the build directory.
|
|
Args:
|
package: Package name
|
|
Returns:
|
True if removed successfully
|
"""
|
pkg_dir = self._get_package_dir(package)
|
|
if pkg_dir.exists():
|
shutil.rmtree(pkg_dir)
|
logger.info(f"Removed package: {package}")
|
return True
|
|
logger.warning(f"Package not found: {package}")
|
return False
|