mirror of https://github.com/Chizi123/Arch-autobuild-repo.git

Joel Grunbaum
2 days ago dc96e98618fe4739210d10ee546b105e36433d02
Intiial python rewrite of repository
1 files modified
18 files added
3514 ■■■■■ changed files
.gitignore 31 ●●●●● patch | view | raw | blame | history
README.md 129 ●●●●● patch | view | raw | blame | history
agents.md 52 ●●●●● patch | view | raw | blame | history
config/config.example.yaml 87 ●●●●● patch | view | raw | blame | history
pyproject.toml 62 ●●●●● patch | view | raw | blame | history
src/archbuild/__init__.py 3 ●●●●● patch | view | raw | blame | history
src/archbuild/aur.py 299 ●●●●● patch | view | raw | blame | history
src/archbuild/builder.py 456 ●●●●● patch | view | raw | blame | history
src/archbuild/cli.py 381 ●●●●● patch | view | raw | blame | history
src/archbuild/config.py 202 ●●●●● patch | view | raw | blame | history
src/archbuild/logging.py 62 ●●●●● patch | view | raw | blame | history
src/archbuild/notifications.py 346 ●●●●● patch | view | raw | blame | history
src/archbuild/repo.py 310 ●●●●● patch | view | raw | blame | history
src/archbuild/resolver.py 317 ●●●●● patch | view | raw | blame | history
tests/__init__.py patch | view | raw | blame | history
tests/integration_test.py 370 ●●●●● patch | view | raw | blame | history
tests/test_aur.py 127 ●●●●● patch | view | raw | blame | history
tests/test_config.py 149 ●●●●● patch | view | raw | blame | history
tests/test_resolver.py 131 ●●●●● patch | view | raw | blame | history
.gitignore
@@ -1,3 +1,32 @@
# Legacy
chroot
repo
vars.sh
# Python
__pycache__/
*.py[cod]
*$py.class
.venv/
env/
venv/
ENV/
# Distribution / Build
dist/
local_build/
build/
*.egg-info/
# Tests / Coverage
.pytest_cache/
.coverage
htmlcov/
.mypy_cache/
.ruff_cache/
# Project Specific
config.yaml
repo/
waitlist*
errorfile
logs/
README.md
New file
@@ -0,0 +1,129 @@
# Archbuild
A modern, sustainable AUR package building and repository management tool for Arch Linux.
## Features
- **Automatic AUR builds** - Clone and build packages from the AUR
- **Dependency resolution** - Automatically resolve and build AUR dependencies in correct order
- **Parallel builds** - Build multiple packages concurrently with configurable worker count
- **Package signing** - Optional GPG signing for packages and database
- **Retry logic** - Configurable retries with exponential backoff for transient failures
- **Package retention** - Keep configurable number of old versions, auto-cleanup
- **Notifications** - Email and webhook (Discord, Slack, ntfy) notifications on failures
- **VCS package handling** - Automatically rebuild git/svn/hg packages
- **Modern Python** - Type-safe, async, well-tested codebase
## Installation
```bash
# From source
pip install -e .
# Or with development dependencies
pip install -e ".[dev]"
```
## Quick Start
1. **Create configuration**:
   ```bash
   cp config/config.example.yaml config.yaml
   # Edit config.yaml with your settings
   ```
2. **Initialize repository**:
   ```bash
   archbuild -c config.yaml init
   ```
3. **Add packages**:
   ```bash
   archbuild add yay paru
   ```
4. **Build all packages**:
   ```bash
   archbuild build-all
   ```
5. **Build a specific package**:
   ```bash
   archbuild build <package>
   ```
## Commands
| Command                    | Description                                |
|----------------------------|--------------------------------------------|
| `init`                     | Initialize repository directories          |
| `add <packages...>`        | Add and build new packages                 |
| `remove <packages...>`     | Remove packages from repo                  |
| `build-all [-f]`           | Build all packages, `-f` forces rebuild    |
| `build <package>`          | Build a specific package                   |
| `check`                    | Check for packages moved to official repos |
| `list`                     | List packages in repository                |
| `remake`                   | Rebuild repository database                |
| `cleanup`                  | Remove old package versions                |
| `migrate-config <vars.sh>` | Migrate legacy config                      |
| `test-notifications`       | Test notification setup                    |
## Configuration
See `config/config.example.yaml` for all options. Key settings:
```yaml
repository:
  name: "myrepo"
  path: "/repo/x86_64"
  build_dir: "/repo/build"
building:
  parallel: true
  max_workers: 4
  retry_attempts: 3
retention:
  keep_versions: 3
notifications:
  email:
    enabled: true
    to: "admin@example.com"
```
## Migration from Bash Version
```bash
archbuild migrate-config vars.sh -o config.yaml
```
## Systemd Timer
Create `/etc/systemd/system/archbuild.service`:
```ini
[Unit]
Description=Build AUR packages
[Service]
Type=oneshot
ExecStart=/usr/bin/archbuild -c /etc/archbuild/config.yaml build-all
User=builduser
```
Create `/etc/systemd/system/archbuild.timer`:
```ini
[Unit]
Description=Run archbuild daily
[Timer]
OnCalendar=daily
Persistent=true
[Install]
WantedBy=timers.target
```
## License
MIT
agents.md
New file
@@ -0,0 +1,52 @@
# Archbuild Agent Guidance
This document provides guidance for AI agents maintaining or extending the Archbuild codebase.
## Architecture Highlights
Archbuild is a modern Python rewrite of a legacy Bash-based Arch Linux package autobuilder. It prioritizes reliability, maintainability, and extensibility.
- **Asynchronous I/O**: Uses `asyncio` and `aiohttp` for AUR RPC queries and network-bound tasks.
- **Process Isolation**: Uses `ProcessPoolExecutor` for long-running `makepkg` builds to avoid GIL limitations.
- **Robust Locking**: Uses `fcntl.flock()` for file-based locking of the repository database and package builds.
- **Configuration**: Uses Pydantic V2 for YAML configuration loading and validation.
- **Logging**: Uses `rich` for structured, colorful terminal logging and optional file logging.
## Core Modules
- `cli.py`: Command-line interface using `click`.
- `config.py`: Configuration models and migration tools.
- `aur.py`: AUR RPC API client with caching and retry logic.
- `resolver.py`: Dependency resolution and topological sorting.
- `builder.py`: Build orchestration and parallelization.
- `repo.py`: Repository database and package retention management.
- `notifications.py`: Extensible notification system (Email, Webhooks).
## Development Patterns
### Async Usage
Always use async context managers for `AURClient` and `Builder` to ensure sessions and executors are properly closed.
```python
async with AURClient() as aur:
    async with Builder(config, aur) as builder:
        await builder.build_all()
```
### Locking
Always wrap repository updates in the repository lock found in `RepoManager`.
### Adding New Commands
Add new commands to `cli.py` using Click. If a command needs AUR access or building, use the `AURClient` and `Builder` inside an async function wrapped by `run_async`.
### Testing
- Unit tests are in `tests/test_*.py`.
- Integration tests are in `tests/integration_test.py` and require an Arch Linux environment.
- Use `pytest` for running unit tests.
## Maintenance Checklist
- [ ] Ensure all Pydantic models in `config.py` are up to date with new features.
- [ ] Maintain the topological sort logic in `resolver.py` (Kahn's algorithm).
- [ ] Keep `DLAGENTS` and `VCSCLIENTS` in `integration_test.py` in sync with system defaults.
- [ ] Update `README.md` when adding new CLI commands or configuration options.
config/config.example.yaml
New file
@@ -0,0 +1,87 @@
# Archbuild Configuration
# Copy this file to config.yaml and edit as needed
repository:
  # Repository name (used in pacman.conf)
  name: "myrepo"
  # Directory containing the repository packages and database
  path: "/repo/x86_64"
  # Directory for cloning and building packages
  build_dir: "/repo/build"
  # Compression format for packages (zst, xz, gz)
  compression: "zst"
building:
  # Enable parallel builds
  parallel: true
  # Maximum number of concurrent build workers
  max_workers: 4
  # Clean build directory after successful build
  clean: true
  # Update system before building (sudo pacman -Syu)
  update_system: false
  # Number of retry attempts on build failure
  retry_attempts: 3
  # Base delay between retries in seconds (exponential backoff)
  retry_delay: 5
signing:
  # Enable package signing
  enabled: false
  # GPG key ID for signing (leave empty to use default key)
  key: ""
retention:
  # Number of old package versions to keep
  keep_versions: 3
  # Automatically clean old versions after build
  cleanup_on_build: true
notifications:
  email:
    # Enable email notifications on build failures
    enabled: false
    # Recipient email address
    to: ""
    # Sender email address
    from: "archbuild@localhost"
    # SMTP server settings
    smtp_host: "localhost"
    smtp_port: 25
    use_tls: false
    username: ""
    password: ""
  # Webhook notifications (Discord, Slack, ntfy, etc.)
  # Uncomment and configure as needed
  webhooks: []
  # Example Discord webhook:
  # webhooks:
  #   - enabled: true
  #     type: "discord"
  #     url: "https://discord.com/api/webhooks/..."
  #
  # Example ntfy webhook:
  # webhooks:
  #   - enabled: true
  #     type: "ntfy"
  #     url: "https://ntfy.sh/your-topic"
# Per-package build overrides
# Use this for packages that need special handling
package_overrides: {}
  # Example: Microsoft fonts need checksum skipping
  # ttf-ms-win10:
  #   skip_checksums: true
  #
  # Example: Custom environment variables
  # some-package:
  #   env:
  #     LC_ALL: "C"
  #   extra_args:
  #     - "--nocheck"
# Logging configuration
log_level: "INFO"  # DEBUG, INFO, WARNING, ERROR, CRITICAL
# Uncomment to also log to file:
# log_file: "/var/log/archbuild.log"
pyproject.toml
New file
@@ -0,0 +1,62 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "archbuild"
version = "2.0.0"
description = "Automatic AUR package building and repository management for Arch Linux"
readme = "README.md"
license = "MIT"
requires-python = ">=3.11"
authors = [
    { name = "Joel", email = "joelgrun@gmail.com" }
]
keywords = ["arch", "linux", "aur", "pacman", "repository", "automation"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Environment :: Console",
    "Intended Audience :: System Administrators",
    "License :: OSI Approved :: MIT License",
    "Operating System :: POSIX :: Linux",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Topic :: System :: Software Distribution",
]
dependencies = [
    "click>=8.0",
    "pyyaml>=6.0",
    "pydantic>=2.0",
    "aiohttp>=3.8",
    "rich>=13.0",
]
[project.optional-dependencies]
dev = [
    "pytest>=7.0",
    "pytest-asyncio>=0.21",
    "pytest-cov>=4.0",
    "mypy>=1.0",
    "ruff>=0.1",
]
[project.scripts]
archbuild = "archbuild.cli:main"
[tool.hatch.build.targets.wheel]
packages = ["src/archbuild"]
[tool.ruff]
target-version = "py311"
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]
[tool.mypy]
python_version = "3.11"
strict = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
src/archbuild/__init__.py
New file
@@ -0,0 +1,3 @@
"""Archbuild - Automatic AUR package building and repository management."""
__version__ = "2.0.0"
src/archbuild/aur.py
New file
@@ -0,0 +1,299 @@
"""Async AUR RPC API client with caching and retry logic."""
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any
import aiohttp
from archbuild.logging import get_logger
logger = get_logger("aur")
AUR_RPC_URL = "https://aur.archlinux.org/rpc"
AUR_PACKAGE_URL = "https://aur.archlinux.org/packages"
AUR_GIT_URL = "https://aur.archlinux.org"
class PackageType(Enum):
    """Package type from AUR."""
    NORMAL = "normal"
    SPLIT = "split"
@dataclass
class Package:
    """AUR package metadata."""
    name: str
    version: str
    description: str
    url: str | None
    maintainer: str | None
    votes: int
    popularity: float
    out_of_date: datetime | None
    first_submitted: datetime
    last_modified: datetime
    depends: list[str] = field(default_factory=list)
    makedepends: list[str] = field(default_factory=list)
    checkdepends: list[str] = field(default_factory=list)
    optdepends: list[str] = field(default_factory=list)
    provides: list[str] = field(default_factory=list)
    conflicts: list[str] = field(default_factory=list)
    replaces: list[str] = field(default_factory=list)
    license: list[str] = field(default_factory=list)
    keywords: list[str] = field(default_factory=list)
    @property
    def git_url(self) -> str:
        """Get the git clone URL for this package."""
        return f"{AUR_GIT_URL}/{self.name}.git"
    @property
    def aur_url(self) -> str:
        """Get the AUR web page URL for this package."""
        return f"{AUR_PACKAGE_URL}/{self.name}"
    @classmethod
    def from_rpc(cls, data: dict[str, Any]) -> "Package":
        """Create Package from AUR RPC response data.
        Args:
            data: Package data from AUR RPC API
        Returns:
            Package instance
        """
        return cls(
            name=data["Name"],
            version=data["Version"],
            description=data.get("Description", ""),
            url=data.get("URL"),
            maintainer=data.get("Maintainer"),
            votes=data.get("NumVotes", 0),
            popularity=data.get("Popularity", 0.0),
            out_of_date=(
                datetime.fromtimestamp(data["OutOfDate"]) if data.get("OutOfDate") else None
            ),
            first_submitted=datetime.fromtimestamp(data["FirstSubmitted"]),
            last_modified=datetime.fromtimestamp(data["LastModified"]),
            depends=data.get("Depends", []),
            makedepends=data.get("MakeDepends", []),
            checkdepends=data.get("CheckDepends", []),
            optdepends=data.get("OptDepends", []),
            provides=data.get("Provides", []),
            conflicts=data.get("Conflicts", []),
            replaces=data.get("Replaces", []),
            license=data.get("License", []),
            keywords=data.get("Keywords", []),
        )
@dataclass
class CacheEntry:
    """Cache entry with TTL."""
    data: Package
    expires: datetime
class AURClient:
    """Async client for AUR RPC API with caching and retry."""
    def __init__(
        self,
        cache_ttl: int = 300,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        batch_size: int = 100,
    ):
        """Initialize AUR client.
        Args:
            cache_ttl: Cache time-to-live in seconds
            max_retries: Maximum number of retry attempts
            retry_delay: Base delay between retries (exponential backoff)
            batch_size: Maximum packages per batch request
        """
        self.cache_ttl = cache_ttl
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.batch_size = batch_size
        self._cache: dict[str, CacheEntry] = {}
        self._session: aiohttp.ClientSession | None = None
    async def __aenter__(self) -> "AURClient":
        """Async context manager entry."""
        self._session = aiohttp.ClientSession()
        return self
    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit."""
        if self._session:
            await self._session.close()
            self._session = None
    def _get_cached(self, name: str) -> Package | None:
        """Get package from cache if not expired.
        Args:
            name: Package name
        Returns:
            Cached package or None if not cached/expired
        """
        entry = self._cache.get(name)
        if entry and entry.expires > datetime.now():
            return entry.data
        return None
    def _set_cached(self, package: Package) -> None:
        """Store package in cache.
        Args:
            package: Package to cache
        """
        self._cache[package.name] = CacheEntry(
            data=package,
            expires=datetime.now() + timedelta(seconds=self.cache_ttl),
        )
    async def _request(
        self,
        params: list[tuple[str, Any]] | dict[str, Any],
    ) -> dict[str, Any]:
        """Make request to AUR RPC API with retry logic.
        Args:
            params: Query parameters (as list of tuples for repeated keys, or dict)
        Returns:
            JSON response data
        Raises:
            aiohttp.ClientError: If request fails after all retries
        """
        if not self._session:
            raise RuntimeError("AURClient must be used as async context manager")
        last_error: Exception | None = None
        for attempt in range(self.max_retries):
            try:
                async with self._session.get(AUR_RPC_URL, params=params) as response:
                    response.raise_for_status()
                    data = await response.json()
                    if data.get("type") == "error":
                        raise ValueError(f"AUR API error: {data.get('error')}")
                    return data
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                last_error = e
                if attempt < self.max_retries - 1:
                    delay = self.retry_delay * (2**attempt)
                    logger.warning(
                        f"AUR request failed (attempt {attempt + 1}/{self.max_retries}), "
                        f"retrying in {delay}s: {e}"
                    )
                    await asyncio.sleep(delay)
        raise last_error or RuntimeError("Request failed")
    async def get_package(self, name: str) -> Package | None:
        """Get a single package by name.
        Args:
            name: Package name
        Returns:
            Package if found, None otherwise
        """
        # Check cache first
        cached = self._get_cached(name)
        if cached:
            logger.debug(f"Cache hit for package: {name}")
            return cached
        packages = await self.get_packages([name])
        return packages[0] if packages else None
    async def get_packages(self, names: list[str]) -> list[Package]:
        """Get multiple packages by name using batch queries.
        Args:
            names: List of package names
        Returns:
            List of found packages (may be fewer than requested)
        """
        # Separate cached and uncached packages
        result: list[Package] = []
        uncached: list[str] = []
        for name in names:
            cached = self._get_cached(name)
            if cached:
                result.append(cached)
            else:
                uncached.append(name)
        if not uncached:
            return result
        # Batch request uncached packages
        for i in range(0, len(uncached), self.batch_size):
            batch = uncached[i : i + self.batch_size]
            # Build params as list of tuples for repeated arg[] keys
            params: list[tuple[str, Any]] = [("v", 5), ("type", "info")]
            for name in batch:
                params.append(("arg[]", name))
            data = await self._request(params)
            for pkg_data in data.get("results", []):
                package = Package.from_rpc(pkg_data)
                self._set_cached(package)
                result.append(package)
        return result
    async def search(self, query: str, by: str = "name-desc") -> list[Package]:
        """Search AUR packages.
        Args:
            query: Search query string
            by: Search field (name, name-desc, maintainer, depends, makedepends, optdepends, checkdepends)
        Returns:
            List of matching packages
        """
        params = {"v": 5, "type": "search", "by": by, "arg": query}
        data = await self._request(params)
        packages = []
        for pkg_data in data.get("results", []):
            package = Package.from_rpc(pkg_data)
            self._set_cached(package)
            packages.append(package)
        return packages
    async def is_available(self, name: str) -> bool:
        """Check if a package exists in the AUR.
        Args:
            name: Package name
        Returns:
            True if package exists
        """
        package = await self.get_package(name)
        return package is not None
    def clear_cache(self) -> None:
        """Clear the package cache."""
        self._cache.clear()
src/archbuild/builder.py
New file
@@ -0,0 +1,456 @@
"""Package builder with parallel execution and proper locking."""
import asyncio
import fcntl
import os
import shutil
import subprocess
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor
from typing import Any
from archbuild.aur import AURClient
from archbuild.config import Config, PackageOverride
from archbuild.logging import get_logger
from archbuild.resolver import DependencyResolver
logger = get_logger("builder")
class BuildStatus(Enum):
    """Build status for a package."""
    PENDING = "pending"
    BUILDING = "building"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
@dataclass
class BuildResult:
    """Result of a package build."""
    package: str
    status: BuildStatus
    version: str | None = None
    duration: float = 0.0
    error: str | None = None
    artifacts: list[Path] = field(default_factory=list)
    timestamp: datetime = field(default_factory=datetime.now)
class FileLock:
    """Context manager for file-based locking using flock."""
    def __init__(self, path: Path):
        """Initialize lock on file path.
        Args:
            path: Path to lock file
        """
        self.path = path
        self.fd: int | None = None
    def __enter__(self) -> "FileLock":
        """Acquire exclusive lock."""
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.fd = os.open(str(self.path), os.O_RDWR | os.O_CREAT)
        fcntl.flock(self.fd, fcntl.LOCK_EX)
        logger.debug(f"Acquired lock: {self.path}")
        return self
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Release lock."""
        if self.fd is not None:
            fcntl.flock(self.fd, fcntl.LOCK_UN)
            os.close(self.fd)
            logger.debug(f"Released lock: {self.path}")
def _run_makepkg(
    package_dir: Path,
    sign: bool = False,
    key: str = "",
    clean: bool = True,
    skip_checksums: bool = False,
    extra_args: list[str] | None = None,
    env_overrides: dict[str, str] | None = None,
) -> tuple[bool, str, list[Path]]:
    """Run makepkg in a subprocess.
    This runs in a separate process for parallelization.
    Args:
        package_dir: Directory containing PKGBUILD
        sign: Whether to sign packages
        key: GPG key for signing
        clean: Clean build directory after build
        skip_checksums: Skip checksum verification
        extra_args: Additional makepkg arguments
        env_overrides: Environment variable overrides
    Returns:
        Tuple of (success, error_message, artifact_paths)
    """
    cmd = ["makepkg", "-s", "--noconfirm"]
    if clean:
        cmd.append("-c")
    if sign and key:
        cmd.extend(["--sign", "--key", key])
    if skip_checksums:
        cmd.append("--skipchecksums")
    if extra_args:
        cmd.extend(extra_args)
    env = os.environ.copy()
    if env_overrides:
        env.update(env_overrides)
    try:
        result = subprocess.run(
            cmd,
            cwd=package_dir,
            capture_output=True,
            text=True,
            env=env,
            timeout=3600,  # 1 hour timeout
        )
        if result.returncode != 0:
            return False, result.stderr or result.stdout, []
        # Find built packages
        artifacts = list(package_dir.glob("*.pkg.tar.*"))
        artifacts = [a for a in artifacts if not a.name.endswith(".sig")]
        return True, "", artifacts
    except subprocess.TimeoutExpired:
        return False, "Build timed out after 1 hour", []
    except Exception as e:
        return False, str(e), []
class Builder:
    """Package builder with parallel execution support."""
    def __init__(
        self,
        config: Config,
        aur_client: AURClient,
    ):
        """Initialize builder.
        Args:
            config: Application configuration
            aur_client: AUR client for package info
        """
        self.config = config
        self.aur_client = aur_client
        self.resolver = DependencyResolver(aur_client)
        self._lock_dir = config.repository.build_dir / ".locks"
        self._executor: ProcessPoolExecutor | None = None
    async def __aenter__(self) -> "Builder":
        """Async context manager entry."""
        max_workers = self.config.building.max_workers if self.config.building.parallel else 1
        self._executor = ProcessPoolExecutor(max_workers=max_workers)
        logger.info(f"Builder initialized with {max_workers} workers")
        return self
    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit."""
        if self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None
    def _get_lock_path(self, package: str) -> Path:
        """Get lock file path for package.
        Args:
            package: Package name
        Returns:
            Path to lock file
        """
        return self._lock_dir / f"{package}.lock"
    def _get_package_dir(self, package: str) -> Path:
        """Get build directory for package.
        Args:
            package: Package name
        Returns:
            Path to package build directory
        """
        return self.config.repository.build_dir / package
    def _get_override(self, package: str) -> PackageOverride:
        """Get package-specific overrides.
        Args:
            package: Package name
        Returns:
            PackageOverride (default if not specified)
        """
        # Check for package-specific override first, then fall back to _default
        if package in self.config.package_overrides:
            return self.config.package_overrides[package]
        return self.config.package_overrides.get("_default", PackageOverride())
    async def _clone_or_update(self, package: str) -> bool:
        """Clone or update package from AUR.
        Args:
            package: Package name
        Returns:
            True if there were updates (or new clone)
        """
        pkg_dir = self._get_package_dir(package)
        if pkg_dir.exists():
            # Update existing repo
            result = subprocess.run(
                ["git", "reset", "--hard"],
                cwd=pkg_dir,
                capture_output=True,
            )
            result = subprocess.run(
                ["git", "pull"],
                cwd=pkg_dir,
                capture_output=True,
                text=True,
            )
            return "Already up to date" not in result.stdout
        else:
            # Clone new repo
            pkg_info = await self.aur_client.get_package(package)
            if not pkg_info:
                raise ValueError(f"Package not found in AUR: {package}")
            pkg_dir.parent.mkdir(parents=True, exist_ok=True)
            subprocess.run(
                ["git", "clone", pkg_info.git_url, str(pkg_dir)],
                check=True,
                capture_output=True,
            )
            return True
    def _is_vcs_package(self, package_dir: Path) -> bool:
        """Check if package is a VCS package (needs rebuild on each run).
        Args:
            package_dir: Path to package directory
        Returns:
            True if VCS package
        """
        pkgbuild = package_dir / "PKGBUILD"
        if not pkgbuild.exists():
            return False
        content = pkgbuild.read_text()
        return "pkgver()" in content
    async def build_package(
        self,
        package: str,
        force: bool = False,
    ) -> BuildResult:
        """Build a single package.
        Args:
            package: Package name
            force: Force rebuild even if up to date
        Returns:
            BuildResult with status and artifacts
        """
        start_time = datetime.now()
        pkg_dir = self._get_package_dir(package)
        override = self._get_override(package)
        logger.info(f"Building package: {package}")
        try:
            # Clone or update
            has_updates = await self._clone_or_update(package)
            is_vcs = self._is_vcs_package(pkg_dir)
            # Skip if no updates and not forced
            if not has_updates and not is_vcs and not force:
                logger.info(f"Skipping {package}: already up to date")
                return BuildResult(
                    package=package,
                    status=BuildStatus.SKIPPED,
                    duration=(datetime.now() - start_time).total_seconds(),
                )
            # Run build with retries
            last_error = ""
            for attempt in range(self.config.building.retry_attempts):
                if attempt > 0:
                    delay = self.config.building.retry_delay * (2 ** (attempt - 1))
                    logger.warning(
                        f"Retrying {package} (attempt {attempt + 1}/"
                        f"{self.config.building.retry_attempts}) after {delay}s"
                    )
                    await asyncio.sleep(delay)
                # Run makepkg in executor
                loop = asyncio.get_event_loop()
                success, error, artifacts = await loop.run_in_executor(
                    self._executor,
                    _run_makepkg,
                    pkg_dir,
                    self.config.signing.enabled,
                    self.config.signing.key,
                    self.config.building.clean,
                    override.skip_checksums,
                    override.extra_args,
                    override.env,
                )
                if success:
                    duration = (datetime.now() - start_time).total_seconds()
                    logger.info(f"Successfully built {package} in {duration:.1f}s")
                    return BuildResult(
                        package=package,
                        status=BuildStatus.SUCCESS,
                        duration=duration,
                        artifacts=artifacts,
                    )
                last_error = error
            # All retries failed
            duration = (datetime.now() - start_time).total_seconds()
            logger.error(f"Failed to build {package}: {last_error}")
            return BuildResult(
                package=package,
                status=BuildStatus.FAILED,
                duration=duration,
                error=last_error,
            )
        except Exception as e:
            duration = (datetime.now() - start_time).total_seconds()
            logger.exception(f"Error building {package}")
            return BuildResult(
                package=package,
                status=BuildStatus.FAILED,
                duration=duration,
                error=str(e),
            )
    async def build_all(
        self,
        force: bool = False,
    ) -> list[BuildResult]:
        """Build all packages in build directory.
        Args:
            force: Force rebuild all packages
        Returns:
            List of build results
        """
        # Update system if configured
        if self.config.building.update_system:
            logger.info("Updating system...")
            subprocess.run(
                ["sudo", "pacman", "-Syu", "--noconfirm"],
                check=False,
            )
        # Find all packages
        build_dir = self.config.repository.build_dir
        packages = [
            d.name for d in build_dir.iterdir()
            if d.is_dir() and not d.name.startswith(".")
        ]
        if not packages:
            logger.warning("No packages found in build directory")
            return []
        logger.info(f"Building {len(packages)} packages")
        # Build in parallel or sequentially
        if self.config.building.parallel:
            tasks = [self.build_package(pkg, force) for pkg in packages]
            results = await asyncio.gather(*tasks)
        else:
            results = []
            for pkg in packages:
                result = await self.build_package(pkg, force)
                results.append(result)
        # Summary
        success = sum(1 for r in results if r.status == BuildStatus.SUCCESS)
        failed = sum(1 for r in results if r.status == BuildStatus.FAILED)
        skipped = sum(1 for r in results if r.status == BuildStatus.SKIPPED)
        logger.info(f"Build complete: {success} succeeded, {failed} failed, {skipped} skipped")
        return list(results)
    async def add_package(self, package: str) -> BuildResult:
        """Add and build a new package with dependencies.
        Args:
            package: Package name
        Returns:
            BuildResult for the main package
        """
        logger.info(f"Adding package: {package}")
        # Resolve dependencies
        build_order = await self.resolver.resolve([package])
        # Build dependencies first
        results: list[BuildResult] = []
        for dep in build_order:
            if dep != package:
                logger.info(f"Building dependency: {dep}")
                result = await self.build_package(dep, force=True)
                results.append(result)
                if result.status == BuildStatus.FAILED:
                    logger.error(f"Dependency {dep} failed, aborting")
                    return BuildResult(
                        package=package,
                        status=BuildStatus.FAILED,
                        error=f"Dependency {dep} failed to build",
                    )
        # Build main package
        return await self.build_package(package, force=True)
    def remove_package(self, package: str) -> bool:
        """Remove a package from the build directory.
        Args:
            package: Package name
        Returns:
            True if removed successfully
        """
        pkg_dir = self._get_package_dir(package)
        if pkg_dir.exists():
            shutil.rmtree(pkg_dir)
            logger.info(f"Removed package: {package}")
            return True
        logger.warning(f"Package not found: {package}")
        return False
src/archbuild/cli.py
New file
@@ -0,0 +1,381 @@
"""Command-line interface using Click."""
import asyncio
import sys
from pathlib import Path
from typing import Any
import click
from rich.console import Console
from rich.table import Table
from archbuild import __version__
from archbuild.aur import AURClient
from archbuild.builder import Builder, BuildStatus
from archbuild.config import Config, load_config, migrate_vars_sh, save_config
from archbuild.logging import console as log_console, setup_logging
from archbuild.notifications import NotificationManager
from archbuild.repo import RepoManager
console = Console()
def run_async(coro: Any) -> Any:
    """Run async function in sync context."""
    return asyncio.get_event_loop().run_until_complete(coro)
class Context:
    """CLI context holding shared state."""
    def __init__(self, config_path: Path):
        self.config_path = config_path
        self._config: Config | None = None
    @property
    def config(self) -> Config:
        if self._config is None:
            self._config = load_config(self.config_path)
            setup_logging(self._config.log_level, self._config.log_file)
        return self._config
pass_context = click.make_pass_decorator(Context)
@click.group()
@click.option(
    "-c", "--config",
    type=click.Path(exists=False, path_type=Path),
    default=Path("config.yaml"),
    help="Path to configuration file",
)
@click.version_option(__version__, prog_name="archbuild")
@click.pass_context
def cli(ctx: click.Context, config: Path) -> None:
    """Archbuild - Automatic AUR package building and repository management.
    A modern, sustainable replacement for legacy Bash-based AUR build systems.
    """
    ctx.obj = Context(config)
@cli.command()
@click.option("--force", "-f", is_flag=True, help="Force rebuild all packages")
@pass_context
def build_all(ctx: Context, force: bool) -> None:
    """Build all packages in the build directory."""
    config = ctx.config
    async def _build_all() -> None:
        async with AURClient() as aur:
            async with Builder(config, aur) as builder:
                results = await builder.build_all(force=force)
                # Add to repository
                repo = RepoManager(config)
                for result in results:
                    if result.status == BuildStatus.SUCCESS:
                        repo.add_packages(result)
                # Send notifications
                notifier = NotificationManager(config)
                await notifier.notify(results)
                # Print summary
                _print_results(results)
    run_async(_build_all())
@cli.command()
@click.argument("package")
@click.option("--force", "-f", is_flag=True, help="Force rebuild package")
@pass_context
def build(ctx: Context, package: str, force: bool) -> None:
    """Build a specific package in the build directory."""
    config = ctx.config
    async def _build() -> None:
        async with AURClient() as aur:
            async with Builder(config, aur) as builder:
                result = await builder.build_package(package, force=force)
                if result.status == BuildStatus.SUCCESS:
                    repo = RepoManager(config)
                    repo.add_packages(result)
                # Send notifications
                notifier = NotificationManager(config)
                await notifier.notify([result])
                # Print summary
                _print_results([result])
    run_async(_build())
@cli.command()
@click.argument("packages", nargs=-1, required=True)
@pass_context
def add(ctx: Context, packages: tuple[str, ...]) -> None:
    """Add and build new packages from the AUR."""
    config = ctx.config
    async def _add() -> None:
        async with AURClient() as aur:
            async with Builder(config, aur) as builder:
                repo = RepoManager(config)
                results = []
                for package in packages:
                    console.print(f"[bold blue]Adding package:[/] {package}")
                    result = await builder.add_package(package)
                    results.append(result)
                    if result.status == BuildStatus.SUCCESS:
                        repo.add_packages(result)
                        console.print(f"[green]✓[/] {package} added successfully")
                    else:
                        console.print(f"[red]✗[/] {package} failed: {result.error}")
                _print_results(results)
    run_async(_add())
@cli.command()
@click.argument("packages", nargs=-1, required=True)
@click.option("--all-official", "-a", is_flag=True, help="Remove packages that moved to official repos")
@pass_context
def remove(ctx: Context, packages: tuple[str, ...], all_official: bool) -> None:
    """Remove packages from the repository and build directory."""
    config = ctx.config
    async def _remove() -> None:
        async with AURClient() as aur:
            async with Builder(config, aur) as builder:
                repo = RepoManager(config)
                if all_official:
                    # Find packages now in official repos
                    from archbuild.resolver import DependencyResolver
                    resolver = DependencyResolver(aur)
                    for pkg in repo.list_packages():
                        if resolver.is_in_official_repos(pkg.name):
                            console.print(f"[yellow]Removing {pkg.name}[/] (now in official repos)")
                            builder.remove_package(pkg.name)
                            repo.remove_package(pkg.name)
                else:
                    for package in packages:
                        builder.remove_package(package)
                        repo.remove_package(package)
                        console.print(f"[green]✓[/] Removed {package}")
    run_async(_remove())
@cli.command()
@pass_context
def check(ctx: Context) -> None:
    """Check for packages moved to official repos or removed from AUR."""
    config = ctx.config
    async def _check() -> None:
        async with AURClient() as aur:
            from archbuild.resolver import DependencyResolver
            resolver = DependencyResolver(aur)
            repo = RepoManager(config)
            packages = repo.list_packages()
            in_official: list[str] = []
            not_in_aur: list[str] = []
            with console.status("Checking packages..."):
                for pkg in packages:
                    if resolver.is_in_official_repos(pkg.name):
                        in_official.append(pkg.name)
                    elif not await aur.is_available(pkg.name):
                        not_in_aur.append(pkg.name)
            if in_official:
                console.print("\n[yellow]Packages now in official repos:[/]")
                for pkg in in_official:
                    console.print(f"  • {pkg}")
            if not_in_aur:
                console.print("\n[red]Packages not found in AUR:[/]")
                for pkg in not_in_aur:
                    console.print(f"  • {pkg}")
            if not in_official and not not_in_aur:
                console.print("[green]All packages OK[/]")
    run_async(_check())
@cli.command()
@pass_context
def remake(ctx: Context) -> None:
    """Rebuild the repository database from scratch."""
    config = ctx.config
    repo = RepoManager(config)
    if repo.rebuild_database():
        console.print("[green]Repository database rebuilt successfully[/]")
    else:
        console.print("[red]Failed to rebuild repository database[/]")
        sys.exit(1)
@cli.command()
@pass_context
def cleanup(ctx: Context) -> None:
    """Clean up old package versions based on retention settings."""
    config = ctx.config
    repo = RepoManager(config)
    removed = repo.cleanup()
    console.print(f"[green]Removed {removed} old package version(s)[/]")
@cli.command("list")
@pass_context
def list_packages(ctx: Context) -> None:
    """List all packages in the repository."""
    config = ctx.config
    repo = RepoManager(config)
    packages = repo.list_packages()
    if not packages:
        console.print("[yellow]No packages in repository[/]")
        return
    table = Table(title=f"Packages in {config.repository.name}")
    table.add_column("Name", style="cyan")
    table.add_column("Version", style="green")
    table.add_column("Size", justify="right")
    table.add_column("Modified", style="dim")
    for pkg in packages:
        size_mb = pkg.size / (1024 * 1024)
        table.add_row(
            pkg.name,
            pkg.version,
            f"{size_mb:.1f} MB",
            pkg.modified.strftime("%Y-%m-%d %H:%M"),
        )
    console.print(table)
@cli.command()
@pass_context
def test_notifications(ctx: Context) -> None:
    """Test notification configuration by sending test messages."""
    config = ctx.config
    async def _test() -> None:
        notifier = NotificationManager(config)
        results = await notifier.test()
        for backend, success in results.items():
            if success:
                console.print(f"[green]✓[/] {backend}: OK")
            else:
                console.print(f"[red]✗[/] {backend}: Failed")
    run_async(_test())
@cli.command()
@click.argument("vars_file", type=click.Path(exists=True, path_type=Path))
@click.option(
    "-o", "--output",
    type=click.Path(path_type=Path),
    default=Path("config.yaml"),
    help="Output config file path",
)
def migrate_config(vars_file: Path, output: Path) -> None:
    """Migrate legacy vars.sh to new YAML config format."""
    console.print(f"[blue]Migrating {vars_file} to {output}...[/]")
    try:
        data = migrate_vars_sh(vars_file)
        config = Config.model_validate(data)
        save_config(config, output)
        console.print(f"[green]✓[/] Configuration saved to {output}")
        console.print("[yellow]Note:[/] Please review and update the generated config.")
    except Exception as e:
        console.print(f"[red]Migration failed:[/] {e}")
        sys.exit(1)
@cli.command()
@pass_context
def init(ctx: Context) -> None:
    """Initialize repository directories and configuration."""
    config = ctx.config
    # Create directories
    config.repository.path.mkdir(parents=True, exist_ok=True)
    config.repository.build_dir.mkdir(parents=True, exist_ok=True)
    repo = RepoManager(config)
    repo.ensure_repo_exists()
    console.print(f"[green]✓[/] Repository initialized at {config.repository.path}")
    console.print(f"[green]✓[/] Build directory: {config.repository.build_dir}")
    # Check pacman.conf
    pacman_conf = Path("/etc/pacman.conf")
    if pacman_conf.exists():
        content = pacman_conf.read_text()
        if config.repository.name not in content:
            console.print(
                f"\n[yellow]Note:[/] Add this to /etc/pacman.conf:\n"
                f"[{config.repository.name}]\n"
                f"SigLevel = Optional TrustAll\n"
                f"Server = file://{config.repository.path}"
            )
def _print_results(results: list[Any]) -> None:
    """Print build results summary table."""
    if not results:
        return
    table = Table(title="Build Results")
    table.add_column("Package", style="cyan")
    table.add_column("Status")
    table.add_column("Duration", justify="right")
    table.add_column("Error", style="dim", max_width=40)
    for result in results:
        status_style = {
            BuildStatus.SUCCESS: "[green]✓ Success[/]",
            BuildStatus.FAILED: "[red]✗ Failed[/]",
            BuildStatus.SKIPPED: "[yellow]⏭ Skipped[/]",
            BuildStatus.PENDING: "[blue]⏳ Pending[/]",
            BuildStatus.BUILDING: "[blue]⚙ Building[/]",
        }
        table.add_row(
            result.package,
            status_style.get(result.status, str(result.status)),
            f"{result.duration:.1f}s",
            (result.error or "")[:40] if result.error else "",
        )
    console.print(table)
def main() -> None:
    """Entry point for the CLI."""
    cli()
if __name__ == "__main__":
    main()
src/archbuild/config.py
New file
@@ -0,0 +1,202 @@
"""Configuration loading and validation using Pydantic."""
from pathlib import Path
from typing import Any
import yaml
from pydantic import BaseModel, Field, field_validator
class RepositoryConfig(BaseModel):
    """Repository settings."""
    name: str = Field(..., description="Repository name")
    path: Path = Field(..., description="Path to repository directory")
    build_dir: Path = Field(..., description="Path to build directory")
    compression: str = Field(default="zst", description="Compression format")
class BuildingConfig(BaseModel):
    """Build settings."""
    parallel: bool = Field(default=True, description="Enable parallel builds")
    max_workers: int = Field(default=4, ge=1, le=32, description="Maximum parallel workers")
    clean: bool = Field(default=True, description="Clean build directory after build")
    update_system: bool = Field(default=False, description="Update system before building")
    retry_attempts: int = Field(default=3, ge=1, le=10, description="Retry attempts on failure")
    retry_delay: int = Field(default=5, ge=1, description="Base delay between retries (seconds)")
class SigningConfig(BaseModel):
    """Package signing settings."""
    enabled: bool = Field(default=False, description="Enable package signing")
    key: str = Field(default="", description="GPG key ID for signing")
class EmailConfig(BaseModel):
    """Email notification settings."""
    enabled: bool = Field(default=False, description="Enable email notifications")
    to: str = Field(default="", description="Recipient email address")
    from_addr: str = Field(default="", alias="from", description="Sender email address")
    smtp_host: str = Field(default="localhost", description="SMTP server host")
    smtp_port: int = Field(default=25, description="SMTP server port")
    use_tls: bool = Field(default=False, description="Use TLS for SMTP")
    username: str = Field(default="", description="SMTP username")
    password: str = Field(default="", description="SMTP password")
class WebhookConfig(BaseModel):
    """Webhook notification settings (extensible for future use)."""
    enabled: bool = Field(default=False, description="Enable webhook notifications")
    url: str = Field(default="", description="Webhook URL")
    type: str = Field(default="generic", description="Webhook type (generic, discord, slack)")
class NotificationsConfig(BaseModel):
    """Notification settings."""
    email: EmailConfig = Field(default_factory=EmailConfig)
    webhooks: list[WebhookConfig] = Field(default_factory=list)
class PackageRetentionConfig(BaseModel):
    """Package retention settings."""
    keep_versions: int = Field(default=3, ge=1, le=100, description="Number of old versions to keep")
    cleanup_on_build: bool = Field(default=True, description="Clean old versions after build")
class PackageOverride(BaseModel):
    """Per-package build overrides."""
    skip_checksums: bool = Field(default=False, description="Skip checksum verification")
    extra_args: list[str] = Field(default_factory=list, description="Extra makepkg arguments")
    env: dict[str, str] = Field(default_factory=dict, description="Environment variables")
class Config(BaseModel):
    """Main configuration model."""
    repository: RepositoryConfig
    building: BuildingConfig = Field(default_factory=BuildingConfig)
    signing: SigningConfig = Field(default_factory=SigningConfig)
    notifications: NotificationsConfig = Field(default_factory=NotificationsConfig)
    retention: PackageRetentionConfig = Field(default_factory=PackageRetentionConfig)
    package_overrides: dict[str, PackageOverride] = Field(
        default_factory=dict, description="Per-package overrides"
    )
    log_level: str = Field(default="INFO", description="Logging level")
    log_file: Path | None = Field(default=None, description="Optional log file path")
    @field_validator("log_level")
    @classmethod
    def validate_log_level(cls, v: str) -> str:
        valid_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
        if v.upper() not in valid_levels:
            raise ValueError(f"Invalid log level: {v}. Must be one of {valid_levels}")
        return v.upper()
def load_config(path: Path) -> Config:
    """Load and validate configuration from YAML file.
    Args:
        path: Path to configuration file
    Returns:
        Validated Config object
    Raises:
        FileNotFoundError: If config file doesn't exist
        ValidationError: If config is invalid
    """
    if not path.exists():
        raise FileNotFoundError(f"Configuration file not found: {path}")
    with open(path) as f:
        data = yaml.safe_load(f)
    return Config.model_validate(data)
def migrate_vars_sh(vars_path: Path) -> dict[str, Any]:
    """Migrate old vars.sh to new config format.
    Args:
        vars_path: Path to vars.sh file
    Returns:
        Dictionary suitable for Config.model_validate()
    """
    variables: dict[str, str] = {}
    with open(vars_path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                # Handle export statements
                if line.startswith("export "):
                    line = line[7:]
                key, _, value = line.partition("=")
                # Remove quotes
                value = value.strip("'\"")
                variables[key] = value
    # Convert to new format
    config: dict[str, Any] = {
        "repository": {
            "name": variables.get("REPONAME", ""),
            "path": variables.get("REPODIR", "/repo/x86_64"),
            "build_dir": variables.get("BUILDDIR", "/repo/build"),
            "compression": variables.get("COMPRESSION", "zst"),
        },
        "building": {
            "parallel": variables.get("PARALLEL", "N") == "Y",
            "clean": variables.get("CLEAN", "N") == "Y",
            "update_system": variables.get("UPDATE", "N") == "Y",
        },
        "signing": {
            "enabled": variables.get("SIGN", "N") == "Y",
            "key": variables.get("KEY", ""),
        },
        "notifications": {
            "email": {
                "enabled": bool(variables.get("TO_EMAIL")),
                "to": variables.get("TO_EMAIL", ""),
                "from": variables.get("FROM_EMAIL", ""),
            }
        },
        "retention": {
            "keep_versions": int(variables.get("NUM_OLD", "5")),
        },
    }
    return config
def save_config(config: Config, path: Path) -> None:
    """Save configuration to YAML file.
    Args:
        config: Config object to save
        path: Path to save configuration file
    """
    data = config.model_dump(by_alias=True, exclude_none=True)
    # Convert Path objects to strings for YAML
    def convert_paths(obj: Any) -> Any:
        if isinstance(obj, Path):
            return str(obj)
        elif isinstance(obj, dict):
            return {k: convert_paths(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [convert_paths(v) for v in obj]
        return obj
    data = convert_paths(data)
    with open(path, "w") as f:
        yaml.dump(data, f, default_flow_style=False, sort_keys=False)
src/archbuild/logging.py
New file
@@ -0,0 +1,62 @@
"""Structured logging setup using Rich."""
import logging
from pathlib import Path
from rich.console import Console
from rich.logging import RichHandler
console = Console()
def setup_logging(
    level: str = "INFO",
    log_file: Path | None = None,
) -> logging.Logger:
    """Configure logging with Rich handler for pretty console output.
    Args:
        level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        log_file: Optional path to log file
    Returns:
        Configured logger instance
    """
    handlers: list[logging.Handler] = [
        RichHandler(
            console=console,
            rich_tracebacks=True,
            tracebacks_show_locals=True,
            show_time=True,
            show_path=False,
        )
    ]
    if log_file:
        log_file.parent.mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(
            logging.Formatter("%(asctime)s | %(levelname)-8s | %(name)s | %(message)s")
        )
        handlers.append(file_handler)
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format="%(message)s",
        datefmt="[%X]",
        handlers=handlers,
    )
    return logging.getLogger("archbuild")
def get_logger(name: str) -> logging.Logger:
    """Get a child logger for a specific module.
    Args:
        name: Module name for the logger
    Returns:
        Logger instance
    """
    return logging.getLogger(f"archbuild.{name}")
src/archbuild/notifications.py
New file
@@ -0,0 +1,346 @@
"""Notification system with email and extensible webhook support."""
import asyncio
import smtplib
import ssl
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any
import aiohttp
from archbuild.builder import BuildResult, BuildStatus
from archbuild.config import Config, EmailConfig, WebhookConfig
from archbuild.logging import get_logger
logger = get_logger("notifications")
@dataclass
class BuildSummary:
    """Summary of build results for notifications."""
    total: int
    success: int
    failed: int
    skipped: int
    failed_packages: list[str]
    duration: float
    timestamp: datetime
    @classmethod
    def from_results(cls, results: list[BuildResult]) -> "BuildSummary":
        """Create summary from build results."""
        return cls(
            total=len(results),
            success=sum(1 for r in results if r.status == BuildStatus.SUCCESS),
            failed=sum(1 for r in results if r.status == BuildStatus.FAILED),
            skipped=sum(1 for r in results if r.status == BuildStatus.SKIPPED),
            failed_packages=[r.package for r in results if r.status == BuildStatus.FAILED],
            duration=sum(r.duration for r in results),
            timestamp=datetime.now(),
        )
class NotificationBackend(ABC):
    """Abstract base class for notification backends."""
    @abstractmethod
    async def send(self, summary: BuildSummary, config: Config) -> bool:
        """Send notification.
        Args:
            summary: Build summary
            config: Application config
        Returns:
            True if sent successfully
        """
        pass
class EmailBackend(NotificationBackend):
    """Email notification backend."""
    def __init__(self, email_config: EmailConfig):
        """Initialize email backend.
        Args:
            email_config: Email configuration
        """
        self.config = email_config
    def _format_message(self, summary: BuildSummary, repo_name: str) -> str:
        """Format email message body.
        Args:
            summary: Build summary
            repo_name: Repository name
        Returns:
            Formatted message string
        """
        lines = [
            f"Build Report for {repo_name}",
            f"Time: {summary.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            f"Total packages: {summary.total}",
            f"  Successful: {summary.success}",
            f"  Failed: {summary.failed}",
            f"  Skipped: {summary.skipped}",
            f"Total duration: {summary.duration:.1f}s",
        ]
        if summary.failed_packages:
            lines.extend([
                "",
                "Failed packages:",
            ])
            for pkg in summary.failed_packages:
                lines.append(f"  - {pkg}")
        return "\n".join(lines)
    async def send(self, summary: BuildSummary, config: Config) -> bool:
        """Send email notification."""
        if not self.config.enabled:
            return True
        if not self.config.to:
            logger.warning("Email notification enabled but no recipient configured")
            return False
        # Only send on failures
        if summary.failed == 0:
            logger.debug("No failures, skipping email notification")
            return True
        try:
            msg = MIMEMultipart()
            msg["From"] = self.config.from_addr or f"archbuild@localhost"
            msg["To"] = self.config.to
            msg["Subject"] = f"Build Errors - {config.repository.name}"
            body = self._format_message(summary, config.repository.name)
            msg.attach(MIMEText(body, "plain"))
            # Send email
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(None, self._send_email, msg)
            logger.info(f"Sent email notification to {self.config.to}")
            return True
        except Exception as e:
            logger.error(f"Failed to send email: {e}")
            return False
    def _send_email(self, msg: MIMEMultipart) -> None:
        """Send email synchronously (called from executor)."""
        if self.config.use_tls:
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL(
                self.config.smtp_host,
                self.config.smtp_port,
                context=context,
            ) as server:
                if self.config.username and self.config.password:
                    server.login(self.config.username, self.config.password)
                server.send_message(msg)
        else:
            with smtplib.SMTP(self.config.smtp_host, self.config.smtp_port) as server:
                if self.config.username and self.config.password:
                    server.login(self.config.username, self.config.password)
                server.send_message(msg)
class WebhookBackend(NotificationBackend):
    """Generic webhook notification backend.
    Extensible for Discord, Slack, ntfy, Gotify, etc.
    """
    def __init__(self, webhook_config: WebhookConfig):
        """Initialize webhook backend.
        Args:
            webhook_config: Webhook configuration
        """
        self.config = webhook_config
    def _format_payload(self, summary: BuildSummary, repo_name: str) -> dict[str, Any]:
        """Format webhook payload based on webhook type.
        Args:
            summary: Build summary
            repo_name: Repository name
        Returns:
            Payload dictionary
        """
        base_content = (
            f"**Build Report for {repo_name}**\n"
            f"✅ Success: {summary.success} | "
            f"❌ Failed: {summary.failed} | "
            f"⏭️ Skipped: {summary.skipped}"
        )
        if summary.failed_packages:
            base_content += f"\n\nFailed: {', '.join(summary.failed_packages)}"
        if self.config.type == "discord":
            return {
                "content": base_content,
                "embeds": [{
                    "title": f"Build Report - {repo_name}",
                    "color": 0xFF0000 if summary.failed else 0x00FF00,
                    "fields": [
                        {"name": "Success", "value": str(summary.success), "inline": True},
                        {"name": "Failed", "value": str(summary.failed), "inline": True},
                        {"name": "Skipped", "value": str(summary.skipped), "inline": True},
                    ],
                    "timestamp": summary.timestamp.isoformat(),
                }],
            }
        elif self.config.type == "slack":
            return {
                "text": base_content,
                "blocks": [
                    {
                        "type": "section",
                        "text": {"type": "mrkdwn", "text": base_content},
                    }
                ],
            }
        elif self.config.type == "ntfy":
            return {
                "topic": repo_name,
                "title": f"Build Report - {repo_name}",
                "message": base_content,
                "priority": 5 if summary.failed else 3,
                "tags": ["package", "failed"] if summary.failed else ["package"],
            }
        else:  # generic
            return {
                "repository": repo_name,
                "summary": {
                    "total": summary.total,
                    "success": summary.success,
                    "failed": summary.failed,
                    "skipped": summary.skipped,
                },
                "failed_packages": summary.failed_packages,
                "timestamp": summary.timestamp.isoformat(),
            }
    async def send(self, summary: BuildSummary, config: Config) -> bool:
        """Send webhook notification."""
        if not self.config.enabled:
            return True
        if not self.config.url:
            logger.warning("Webhook notification enabled but no URL configured")
            return False
        # Only send on failures (configurable in future)
        if summary.failed == 0:
            logger.debug("No failures, skipping webhook notification")
            return True
        try:
            payload = self._format_payload(summary, config.repository.name)
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.config.url,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=30),
                ) as response:
                    response.raise_for_status()
            logger.info(f"Sent webhook notification to {self.config.url}")
            return True
        except Exception as e:
            logger.error(f"Failed to send webhook: {e}")
            return False
class NotificationManager:
    """Manage multiple notification backends."""
    def __init__(self, config: Config):
        """Initialize notification manager.
        Args:
            config: Application configuration
        """
        self.config = config
        self.backends: list[NotificationBackend] = []
        # Add email backend if configured
        if config.notifications.email.enabled:
            self.backends.append(EmailBackend(config.notifications.email))
        # Add webhook backends
        for webhook in config.notifications.webhooks:
            if webhook.enabled:
                self.backends.append(WebhookBackend(webhook))
    async def notify(self, results: list[BuildResult]) -> dict[str, bool]:
        """Send notifications for build results.
        Args:
            results: List of build results
        Returns:
            Dict mapping backend type to success status
        """
        summary = BuildSummary.from_results(results)
        statuses: dict[str, bool] = {}
        for backend in self.backends:
            name = type(backend).__name__
            try:
                success = await backend.send(summary, self.config)
                statuses[name] = success
            except Exception as e:
                logger.error(f"Notification backend {name} failed: {e}")
                statuses[name] = False
        return statuses
    async def test(self) -> dict[str, bool]:
        """Send test notification through all backends.
        Returns:
            Dict mapping backend type to success status
        """
        # Create dummy test summary
        test_summary = BuildSummary(
            total=3,
            success=2,
            failed=1,
            skipped=0,
            failed_packages=["test-package"],
            duration=123.45,
            timestamp=datetime.now(),
        )
        statuses: dict[str, bool] = {}
        for backend in self.backends:
            name = type(backend).__name__
            try:
                success = await backend.send(test_summary, self.config)
                statuses[name] = success
            except Exception as e:
                logger.error(f"Test notification failed for {name}: {e}")
                statuses[name] = False
        return statuses
src/archbuild/repo.py
New file
@@ -0,0 +1,310 @@
"""Repository management with repo-add/repo-remove wrappers."""
import shutil
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
from archbuild.builder import BuildResult, BuildStatus, FileLock
from archbuild.config import Config
from archbuild.logging import get_logger
logger = get_logger("repo")
@dataclass
class PackageInfo:
    """Information about a package in the repository."""
    name: str
    version: str
    filename: str
    size: int
    modified: datetime
class RepoManager:
    """Manage the pacman repository database."""
    def __init__(self, config: Config):
        """Initialize repository manager.
        Args:
            config: Application configuration
        """
        self.config = config
        self._lock_path = config.repository.path / ".repo.lock"
    @property
    def db_path(self) -> Path:
        """Get path to repository database file."""
        compression = self.config.repository.compression or "zst"
        return self.config.repository.path / f"{self.config.repository.name}.db.tar.{compression}"
    def _get_repo_lock(self) -> FileLock:
        """Get lock for repository operations."""
        return FileLock(self._lock_path)
    def _run_repo_command(self, cmd: list[str]) -> subprocess.CompletedProcess[str]:
        """Run a repo-add or repo-remove command.
        Args:
            cmd: Command and arguments
        Returns:
            Completed process result
        """
        if self.config.signing.enabled and self.config.signing.key:
            cmd.extend(["--sign", "--key", self.config.signing.key])
        logger.debug(f"Running: {' '.join(cmd)}")
        return subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=self.config.repository.path,
        )
    def ensure_repo_exists(self) -> None:
        """Ensure repository directory and database exist."""
        self.config.repository.path.mkdir(parents=True, exist_ok=True)
        if not self.db_path.exists():
            logger.info(f"Creating new repository: {self.config.repository.name}")
            # Create empty database
            result = self._run_repo_command([
                "repo-add",
                str(self.db_path),
            ])
            if result.returncode != 0:
                logger.warning(f"Could not create empty database: {result.stderr}")
    def add_packages(self, build_result: BuildResult) -> bool:
        """Add built packages to the repository.
        Args:
            build_result: Result from package build
        Returns:
            True if packages were added successfully
        """
        if build_result.status != BuildStatus.SUCCESS:
            logger.warning(f"Cannot add {build_result.package}: build was not successful")
            return False
        if not build_result.artifacts:
            logger.warning(f"No artifacts to add for {build_result.package}")
            return False
        with self._get_repo_lock():
            self.ensure_repo_exists()
            # Remove old versions of this package
            self._remove_old_packages(build_result.package)
            # Copy artifacts to repo directory
            copied_files: list[Path] = []
            for artifact in build_result.artifacts:
                dest = self.config.repository.path / artifact.name
                shutil.copy2(artifact, dest)
                copied_files.append(dest)
                # Also copy signature if exists
                sig_path = artifact.with_suffix(artifact.suffix + ".sig")
                if sig_path.exists():
                    shutil.copy2(sig_path, self.config.repository.path / sig_path.name)
            # Add to database
            result = self._run_repo_command([
                "repo-add",
                str(self.db_path),
            ] + [str(f) for f in copied_files])
            if result.returncode != 0:
                logger.error(f"Failed to add packages to database: {result.stderr}")
                return False
            logger.info(f"Added {len(copied_files)} package(s) to repository")
            return True
    def remove_package(self, package: str) -> bool:
        """Remove a package from the repository.
        Args:
            package: Package name to remove
        Returns:
            True if removed successfully
        """
        with self._get_repo_lock():
            # Remove from database
            result = self._run_repo_command([
                "repo-remove",
                str(self.db_path),
                package,
            ])
            if result.returncode != 0:
                logger.warning(f"Failed to remove {package} from database: {result.stderr}")
            # Remove package files
            removed = 0
            for f in self.config.repository.path.glob(f"{package}-*.pkg.tar.*"):
                f.unlink()
                removed += 1
            logger.info(f"Removed {package} ({removed} files)")
            return True
    def _remove_old_packages(self, package: str) -> int:
        """Remove old versions of a package beyond retention limit.
        Args:
            package: Package name
        Returns:
            Number of packages removed
        """
        keep_versions = self.config.retention.keep_versions
        pattern = f"{package}-*.pkg.tar.*"
        # Find all package files
        files = list(self.config.repository.path.glob(pattern))
        files = [f for f in files if not f.name.endswith(".sig")]
        if len(files) <= keep_versions:
            return 0
        # Sort by modification time, oldest first
        files.sort(key=lambda f: f.stat().st_mtime)
        # Remove oldest files exceeding retention
        to_remove = files[:-keep_versions] if keep_versions > 0 else files
        removed = 0
        for f in to_remove:
            f.unlink()
            # Also remove signature
            sig = f.with_suffix(f.suffix + ".sig")
            if sig.exists():
                sig.unlink()
            removed += 1
        if removed:
            logger.info(f"Cleaned up {removed} old version(s) of {package}")
        return removed
    def list_packages(self) -> list[PackageInfo]:
        """List all packages in the repository.
        Returns:
            List of PackageInfo objects
        """
        packages: list[PackageInfo] = []
        for f in self.config.repository.path.glob("*.pkg.tar.*"):
            if f.name.endswith(".sig"):
                continue
            # Parse package name and version from filename
            # Format: name-version-rel-arch.pkg.tar.zst
            parts = f.stem.replace(".pkg.tar", "").rsplit("-", 3)
            if len(parts) >= 3:
                name = "-".join(parts[:-2])
                version = f"{parts[-2]}-{parts[-1]}" if len(parts) > 2 else parts[-1]
            else:
                name = f.stem
                version = "unknown"
            stat = f.stat()
            packages.append(PackageInfo(
                name=name,
                version=version,
                filename=f.name,
                size=stat.st_size,
                modified=datetime.fromtimestamp(stat.st_mtime),
            ))
        return sorted(packages, key=lambda p: p.name)
    def rebuild_database(self) -> bool:
        """Rebuild the repository database from scratch.
        Returns:
            True if successful
        """
        with self._get_repo_lock():
            logger.info("Rebuilding repository database")
            # Remove old database
            for f in self.config.repository.path.glob(f"{self.config.repository.name}.db*"):
                f.unlink()
            for f in self.config.repository.path.glob(f"{self.config.repository.name}.files*"):
                f.unlink()
            # Find all packages
            packages = list(self.config.repository.path.glob("*.pkg.tar.*"))
            packages = [p for p in packages if not p.name.endswith(".sig")]
            if not packages:
                logger.warning("No packages found to add to database")
                return True
            # Add all packages
            result = self._run_repo_command([
                "repo-add",
                str(self.db_path),
            ] + [str(p) for p in packages])
            if result.returncode != 0:
                logger.error(f"Failed to rebuild database: {result.stderr}")
                return False
            logger.info(f"Database rebuilt with {len(packages)} packages")
            return True
    def cleanup(self) -> int:
        """Run cleanup on all packages, removing old versions.
        Returns:
            Total number of old packages removed
        """
        # Get unique package names
        packages = self.list_packages()
        unique_names = set(p.name for p in packages)
        total_removed = 0
        for name in unique_names:
            total_removed += self._remove_old_packages(name)
        return total_removed
    def check_integrity(self) -> list[str]:
        """Check repository integrity.
        Returns:
            List of issues found
        """
        issues: list[str] = []
        # Check database exists
        if not self.db_path.exists():
            issues.append(f"Database not found: {self.db_path}")
            return issues
        # Check for orphaned packages (in dir but not in db)
        # This would require parsing the db, simplified for now
        # Check for missing signatures
        if self.config.signing.enabled:
            for pkg in self.config.repository.path.glob("*.pkg.tar.*"):
                if pkg.name.endswith(".sig"):
                    continue
                sig = pkg.with_suffix(pkg.suffix + ".sig")
                if not sig.exists():
                    issues.append(f"Missing signature: {pkg.name}")
        return issues
src/archbuild/resolver.py
New file
@@ -0,0 +1,317 @@
"""Dependency resolution with topological sorting."""
import subprocess
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from archbuild.aur import AURClient, Package
from archbuild.logging import get_logger
logger = get_logger("resolver")
class DependencyType(Enum):
    """Type of dependency."""
    RUNTIME = "depends"
    BUILD = "makedepends"
    CHECK = "checkdepends"
@dataclass
class Dependency:
    """A package dependency."""
    name: str
    version_constraint: str | None = None
    dep_type: DependencyType = DependencyType.RUNTIME
    is_aur: bool = False
    @classmethod
    def parse(cls, dep_string: str, dep_type: DependencyType = DependencyType.RUNTIME) -> "Dependency":
        """Parse dependency string like 'package>=1.0'.
        Args:
            dep_string: Dependency string
            dep_type: Type of dependency
        Returns:
            Dependency instance
        """
        # Handle version constraints
        for op in [">=", "<=", "=", ">", "<"]:
            if op in dep_string:
                name, constraint = dep_string.split(op, 1)
                return cls(name=name, version_constraint=f"{op}{constraint}", dep_type=dep_type)
        return cls(name=dep_string, dep_type=dep_type)
@dataclass
class BuildOrder:
    """Ordered list of packages to build."""
    packages: list[str] = field(default_factory=list)
    aur_dependencies: dict[str, list[Dependency]] = field(default_factory=dict)
    def __iter__(self):
        return iter(self.packages)
    def __len__(self):
        return len(self.packages)
class DependencyResolver:
    """Resolve dependencies for AUR packages using topological sort."""
    def __init__(self, aur_client: AURClient):
        """Initialize resolver with AUR client.
        Args:
            aur_client: AURClient instance for fetching package info
        """
        self.aur_client = aur_client
        self._pacman_cache: set[str] = set()
        self._pacman_checked = False
    def _refresh_pacman_cache(self) -> None:
        """Refresh cache of packages available from official repos."""
        try:
            result = subprocess.run(
                ["pacman", "-Ssq"],
                capture_output=True,
                text=True,
                check=True,
            )
            self._pacman_cache = set(result.stdout.strip().split("\n"))
            self._pacman_checked = True
            logger.debug(f"Cached {len(self._pacman_cache)} packages from official repos")
        except subprocess.CalledProcessError as e:
            logger.warning(f"Failed to get pacman package list: {e}")
            self._pacman_cache = set()
    def is_in_official_repos(self, name: str) -> bool:
        """Check if package is available in official repositories.
        Args:
            name: Package name (without version constraint)
        Returns:
            True if available in official repos
        """
        if not self._pacman_checked:
            self._refresh_pacman_cache()
        # Strip version constraint
        base_name = name.split(">=")[0].split("<=")[0].split("=")[0].split(">")[0].split("<")[0]
        return base_name in self._pacman_cache
    def is_installed(self, name: str) -> bool:
        """Check if package is already installed.
        Args:
            name: Package name
        Returns:
            True if installed
        """
        base_name = name.split(">=")[0].split("<=")[0].split("=")[0].split(">")[0].split("<")[0]
        result = subprocess.run(
            ["pacman", "-Qi", base_name],
            capture_output=True,
        )
        return result.returncode == 0
    async def _get_all_dependencies(
        self,
        package: Package,
        visited: set[str],
        graph: dict[str, set[str]],
        packages: dict[str, Package],
    ) -> None:
        """Recursively fetch all AUR dependencies.
        Args:
            package: Package to get dependencies for
            visited: Set of already visited package names
            graph: Dependency graph (package -> dependencies)
            packages: Map of package names to Package objects
        """
        if package.name in visited:
            return
        visited.add(package.name)
        packages[package.name] = package
        graph[package.name] = set()
        # Collect all dependencies
        all_deps: list[str] = []
        all_deps.extend(package.depends)
        all_deps.extend(package.makedepends)
        aur_deps: list[str] = []
        for dep in all_deps:
            dep_parsed = Dependency.parse(dep)
            base_name = dep_parsed.name
            # Skip if in official repos or already installed
            if self.is_in_official_repos(base_name):
                continue
            if self.is_installed(base_name):
                continue
            aur_deps.append(base_name)
            graph[package.name].add(base_name)
        # Fetch AUR dependencies
        if aur_deps:
            dep_packages = await self.aur_client.get_packages(aur_deps)
            for dep_pkg in dep_packages:
                await self._get_all_dependencies(dep_pkg, visited, graph, packages)
    def _topological_sort(self, graph: dict[str, set[str]]) -> list[str]:
        """Perform topological sort on dependency graph.
        Args:
            graph: Dependency graph (package -> its dependencies)
        Returns:
            List of packages in build order (dependencies first)
        Raises:
            ValueError: If circular dependency detected
        """
        # Ensure all nodes are in the graph (including leaf dependencies)
        all_nodes: set[str] = set(graph.keys())
        for deps in graph.values():
            all_nodes.update(deps)
        # Add missing nodes to graph
        for node in all_nodes:
            if node not in graph:
                graph[node] = set()
        # Kahn's algorithm
        # in_degree[X] = number of packages that X depends on (outgoing edges)
        # We want to process packages whose dependencies are all processed
        in_degree: dict[str, int] = {node: len(graph[node]) for node in graph}
        # Start with nodes that have no dependencies (leaves)
        queue = [node for node in graph if in_degree[node] == 0]
        result: list[str] = []
        while queue:
            node = queue.pop(0)
            result.append(node)
            # For each package that depends on this node, decrement its in-degree
            for pkg, deps in graph.items():
                if node in deps:
                    in_degree[pkg] -= 1
                    if in_degree[pkg] == 0:
                        queue.append(pkg)
        if len(result) != len(graph):
            # Find cycle
            remaining = set(graph.keys()) - set(result)
            raise ValueError(f"Circular dependency detected involving: {remaining}")
        # Result is already in correct order (dependencies first)
        return result
    def detect_cycles(self, graph: dict[str, set[str]]) -> list[list[str]]:
        """Detect circular dependencies in graph.
        Args:
            graph: Dependency graph
        Returns:
            List of cycles (each cycle is a list of package names)
        """
        cycles: list[list[str]] = []
        visited: set[str] = set()
        rec_stack: set[str] = set()
        path: list[str] = []
        def dfs(node: str) -> bool:
            visited.add(node)
            rec_stack.add(node)
            path.append(node)
            for neighbor in graph.get(node, set()):
                if neighbor not in visited:
                    if dfs(neighbor):
                        return True
                elif neighbor in rec_stack:
                    # Found cycle
                    cycle_start = path.index(neighbor)
                    cycles.append(path[cycle_start:] + [neighbor])
                    return True
            path.pop()
            rec_stack.remove(node)
            return False
        for node in graph:
            if node not in visited:
                dfs(node)
        return cycles
    async def resolve(self, package_names: list[str]) -> BuildOrder:
        """Resolve dependencies and determine build order.
        Args:
            package_names: List of packages to resolve
        Returns:
            BuildOrder with packages in correct build order
        Raises:
            ValueError: If package not found or circular dependency
        """
        logger.info(f"Resolving dependencies for: {', '.join(package_names)}")
        # Fetch requested packages
        packages_list = await self.aur_client.get_packages(package_names)
        if len(packages_list) != len(package_names):
            found = {p.name for p in packages_list}
            missing = set(package_names) - found
            raise ValueError(f"Packages not found in AUR: {missing}")
        # Build dependency graph
        visited: set[str] = set()
        graph: dict[str, set[str]] = {}
        packages: dict[str, Package] = {}
        for package in packages_list:
            await self._get_all_dependencies(package, visited, graph, packages)
        # Check for cycles
        cycles = self.detect_cycles(graph)
        if cycles:
            raise ValueError(f"Circular dependencies detected: {cycles}")
        # Topological sort
        build_order = self._topological_sort(graph)
        # Build AUR dependency map
        aur_deps: dict[str, list[Dependency]] = {}
        for name in build_order:
            pkg = packages.get(name)
            if pkg:
                deps: list[Dependency] = []
                for dep in pkg.depends:
                    parsed = Dependency.parse(dep, DependencyType.RUNTIME)
                    if not self.is_in_official_repos(parsed.name):
                        parsed.is_aur = True
                    deps.append(parsed)
                for dep in pkg.makedepends:
                    parsed = Dependency.parse(dep, DependencyType.BUILD)
                    if not self.is_in_official_repos(parsed.name):
                        parsed.is_aur = True
                    deps.append(parsed)
                aur_deps[name] = deps
        logger.info(f"Build order: {' -> '.join(build_order)}")
        return BuildOrder(packages=build_order, aur_dependencies=aur_deps)
tests/__init__.py
tests/integration_test.py
New file
@@ -0,0 +1,370 @@
#!/usr/bin/env python3
"""
Integration test script for archbuild.
This script creates a temporary repository, initializes it with a basic config,
adds test packages, and verifies they build and are added correctly.
Usage:
    python tests/integration_test.py [--keep-temp]
Options:
    --keep-temp    Don't delete temporary directory after test (for debugging)
"""
import asyncio
import os
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
# Add src to path for development
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from archbuild.aur import AURClient
from archbuild.builder import Builder, BuildStatus
from archbuild.config import Config, RepositoryConfig, BuildingConfig, SigningConfig, PackageOverride
from archbuild.logging import setup_logging, console
from archbuild.repo import RepoManager
# Test packages - real packages that exist in the AUR
# Chosen for small size and fast build times
TEST_PACKAGES = [
    "neofetch-git",  # Small bash script, very fast build
    "yay",           # Popular AUR helper
]
# Alternative packages if the above aren't available
FALLBACK_PACKAGES = [
    "gtk2",          # Legacy GTK library
    "paru",          # Another AUR helper
]
class IntegrationTest:
    """Integration test runner."""
    def __init__(self, keep_temp: bool = False):
        self.keep_temp = keep_temp
        self.temp_dir: Path | None = None
        self.config: Config | None = None
        self.passed = 0
        self.failed = 0
    def setup(self) -> None:
        """Set up temporary test environment."""
        console.print("\n[bold blue]═══ Setting up test environment ═══[/]")
        # Create temp directory
        self.temp_dir = Path(tempfile.mkdtemp(prefix="archbuild_test_"))
        console.print(f"  Created temp directory: {self.temp_dir}")
        # Create subdirectories
        repo_dir = self.temp_dir / "repo"
        build_dir = self.temp_dir / "build"
        repo_dir.mkdir()
        build_dir.mkdir()
        # Create custom makepkg.conf that disables debug packages
        # (avoids needing debugedit which may not be installed)
        makepkg_conf = self.temp_dir / "makepkg.conf"
        makepkg_conf.write_text("""
# Minimal makepkg.conf for testing
CARCH="x86_64"
CHOST="x86_64-pc-linux-gnu"
CFLAGS="-O2 -pipe"
CXXFLAGS="$CFLAGS"
LDFLAGS=""
MAKEFLAGS="-j$(nproc)"
OPTIONS=(!debug !strip !staticlibs)
PKGEXT='.pkg.tar.zst'
SRCEXT='.src.tar.gz'
PACKAGER="Integration Test <test@test.local>"
DLAGENTS=('file::/usr/bin/curl -qgC - -o %o %u'
          'ftp::/usr/bin/curl -qgfC - --ftp-pasv --retry 3 --retry-delay 3 -o %o %u'
          'http::/usr/bin/curl -qgb "" -fLC - --retry 3 --retry-delay 3 -o %o %u'
          'https::/usr/bin/curl -qgb "" -fLC - --retry 3 --retry-delay 3 -o %o %u'
          'rsync::/usr/bin/rsync --no-motd -z %u %o'
          'scp::/usr/bin/scp -C %u %o')
VCSCLIENTS=('git::git'
            'hg::mercurial'
            'svn::subversion')
""")
        self.config = Config(
            repository=RepositoryConfig(
                name="testrepo",
                path=repo_dir,
                build_dir=build_dir,
                compression="zst",
            ),
            building=BuildingConfig(
                parallel=False,  # Sequential for cleaner test output
                max_workers=1,
                clean=True,
                update_system=False,
                retry_attempts=2,
            ),
            signing=SigningConfig(
                enabled=False,
            ),
            log_level="INFO",
            # Apply package overrides to use our custom makepkg.conf
            package_overrides={
                "_default": PackageOverride(
                    extra_args=["--config", str(makepkg_conf)],
                ),
            },
        )
        setup_logging(self.config.log_level)
        console.print("  [green]✓[/] Configuration created")
    def teardown(self) -> None:
        """Clean up test environment."""
        if self.temp_dir and self.temp_dir.exists():
            if self.keep_temp:
                console.print(f"\n[yellow]Keeping temp directory:[/] {self.temp_dir}")
            else:
                shutil.rmtree(self.temp_dir)
                console.print("\n[dim]Cleaned up temp directory[/]")
    def check(self, condition: bool, message: str) -> bool:
        """Check a condition and report pass/fail."""
        if condition:
            console.print(f"  [green]✓[/] {message}")
            self.passed += 1
            return True
        else:
            console.print(f"  [red]✗[/] {message}")
            self.failed += 1
            return False
    async def test_init(self) -> bool:
        """Test repository initialization."""
        console.print("\n[bold blue]═══ Test: Repository Initialization ═══[/]")
        repo = RepoManager(self.config)
        repo.ensure_repo_exists()
        # Check directories exist
        self.check(
            self.config.repository.path.exists(),
            f"Repository directory exists: {self.config.repository.path}"
        )
        self.check(
            self.config.repository.build_dir.exists(),
            f"Build directory exists: {self.config.repository.build_dir}"
        )
        return self.failed == 0
    async def test_aur_client(self) -> list[str]:
        """Test AUR client and find available test packages."""
        console.print("\n[bold blue]═══ Test: AUR Client ═══[/]")
        available_packages = []
        async with AURClient() as aur:
            # Test package lookup
            for pkg_name in TEST_PACKAGES + FALLBACK_PACKAGES:
                pkg = await aur.get_package(pkg_name)
                if pkg:
                    self.check(True, f"Found package: {pkg_name} ({pkg.version})")
                    available_packages.append(pkg_name)
                    if len(available_packages) >= 2:
                        break
                else:
                    console.print(f"  [yellow]⚠[/] Package not found: {pkg_name}")
        self.check(
            len(available_packages) >= 1,
            f"Found {len(available_packages)} test package(s)"
        )
        return available_packages
    async def test_build_packages(self, packages: list[str]) -> dict[str, BuildStatus]:
        """Test building packages."""
        console.print("\n[bold blue]═══ Test: Package Building ═══[/]")
        results: dict[str, BuildStatus] = {}
        async with AURClient() as aur:
            async with Builder(self.config, aur) as builder:
                for pkg_name in packages:
                    console.print(f"\n  Building {pkg_name}...")
                    result = await builder.build_package(pkg_name, force=True)
                    results[pkg_name] = result.status
                    if result.status == BuildStatus.SUCCESS:
                        self.check(True, f"Built {pkg_name} successfully ({result.duration:.1f}s)")
                        self.check(
                            len(result.artifacts) > 0,
                            f"  Created {len(result.artifacts)} artifact(s)"
                        )
                        for artifact in result.artifacts:
                            console.print(f"    → {artifact.name}")
                    else:
                        self.check(False, f"Failed to build {pkg_name}: {result.error}")
        return results
    async def test_repo_add(self, packages: list[str]) -> None:
        """Test adding packages to repository."""
        console.print("\n[bold blue]═══ Test: Repository Management ═══[/]")
        repo = RepoManager(self.config)
        async with AURClient() as aur:
            async with Builder(self.config, aur) as builder:
                for pkg_name in packages:
                    # Get the build result with artifacts
                    pkg_dir = self.config.repository.build_dir / pkg_name
                    if not pkg_dir.exists():
                        continue
                    # Find artifacts
                    artifacts = list(pkg_dir.glob("*.pkg.tar.*"))
                    artifacts = [a for a in artifacts if not a.name.endswith(".sig")]
                    if artifacts:
                        # Create a mock build result for add_packages
                        from archbuild.builder import BuildResult
                        mock_result = BuildResult(
                            package=pkg_name,
                            status=BuildStatus.SUCCESS,
                            artifacts=artifacts,
                        )
                        success = repo.add_packages(mock_result)
                        self.check(success, f"Added {pkg_name} to repository")
        # List packages in repo
        pkg_list = repo.list_packages()
        self.check(len(pkg_list) > 0, f"Repository contains {len(pkg_list)} package(s)")
        for pkg in pkg_list:
            console.print(f"    → {pkg.name} {pkg.version} ({pkg.size / 1024:.1f} KB)")
    async def test_repo_database(self) -> None:
        """Test repository database integrity."""
        console.print("\n[bold blue]═══ Test: Database Integrity ═══[/]")
        db_path = self.config.repository.path / f"{self.config.repository.name}.db.tar.zst"
        self.check(db_path.exists(), f"Database file exists: {db_path.name}")
        if db_path.exists():
            # Try to list contents with tar
            result = subprocess.run(
                ["tar", "-tf", str(db_path)],
                capture_output=True,
                text=True,
            )
            if result.returncode == 0:
                entries = [e for e in result.stdout.strip().split("\n") if e]
                self.check(
                    len(entries) > 0,
                    f"Database contains {len(entries)} entries"
                )
        # Check for integrity issues
        repo = RepoManager(self.config)
        issues = repo.check_integrity()
        self.check(
            len(issues) == 0,
            f"No integrity issues found" if not issues else f"Issues: {issues}"
        )
    async def test_cleanup(self) -> None:
        """Test package cleanup functionality."""
        console.print("\n[bold blue]═══ Test: Cleanup ═══[/]")
        repo = RepoManager(self.config)
        removed = repo.cleanup()
        self.check(True, f"Cleanup removed {removed} old version(s)")
    async def run(self) -> bool:
        """Run all integration tests."""
        console.print("[bold magenta]╔════════════════════════════════════════════╗[/]")
        console.print("[bold magenta]║     Archbuild Integration Test Suite       ║[/]")
        console.print("[bold magenta]╚════════════════════════════════════════════╝[/]")
        try:
            self.setup()
            # Run tests
            await self.test_init()
            packages = await self.test_aur_client()
            if not packages:
                console.print("[red]No test packages available, cannot continue[/]")
                return False
            build_results = await self.test_build_packages(packages)  # Build all found packages
            successful = [p for p, s in build_results.items() if s == BuildStatus.SUCCESS]
            if successful:
                await self.test_repo_add(successful)
                await self.test_repo_database()
                await self.test_cleanup()
            else:
                console.print("[yellow]No successful builds to test repository with[/]")
            # Summary
            console.print("\n[bold blue]═══ Test Summary ═══[/]")
            total = self.passed + self.failed
            console.print(f"  Total:  {total}")
            console.print(f"  [green]Passed: {self.passed}[/]")
            console.print(f"  [red]Failed: {self.failed}[/]")
            if self.failed == 0:
                console.print("\n[bold green]✓ All tests passed![/]")
                return True
            else:
                console.print(f"\n[bold red]✗ {self.failed} test(s) failed[/]")
                return False
        except KeyboardInterrupt:
            console.print("\n[yellow]Test interrupted[/]")
            return False
        except Exception as e:
            console.print(f"\n[bold red]Test error:[/] {e}")
            import traceback
            traceback.print_exc()
            return False
        finally:
            self.teardown()
async def main() -> int:
    """Main entry point."""
    keep_temp = "--keep-temp" in sys.argv
    # Check if running on Arch Linux
    if not Path("/etc/arch-release").exists():
        console.print("[yellow]Warning: Not running on Arch Linux[/]")
        console.print("[yellow]Some tests may fail or be skipped[/]")
    # Check for required tools
    for tool in ["makepkg", "pacman", "git"]:
        result = subprocess.run(["which", tool], capture_output=True)
        if result.returncode != 0:
            console.print(f"[red]Required tool not found: {tool}[/]")
            return 1
    test = IntegrationTest(keep_temp=keep_temp)
    success = await test.run()
    return 0 if success else 1
if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
tests/test_aur.py
New file
@@ -0,0 +1,127 @@
"""Tests for AUR client."""
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from datetime import datetime
from archbuild.aur import AURClient, Package
@pytest.fixture
def sample_package_data():
    """Sample AUR RPC response data."""
    return {
        "Name": "test-package",
        "Version": "1.0.0-1",
        "Description": "A test package",
        "URL": "https://example.com",
        "Maintainer": "testuser",
        "NumVotes": 100,
        "Popularity": 1.5,
        "OutOfDate": None,
        "FirstSubmitted": 1609459200,
        "LastModified": 1640995200,
        "Depends": ["dep1", "dep2"],
        "MakeDepends": ["makedep1"],
        "CheckDepends": [],
        "OptDepends": [],
        "Provides": [],
        "Conflicts": [],
        "Replaces": [],
        "License": ["MIT"],
        "Keywords": ["test"],
    }
class TestPackage:
    """Tests for Package dataclass."""
    def test_from_rpc(self, sample_package_data):
        """Test creating Package from RPC data."""
        pkg = Package.from_rpc(sample_package_data)
        assert pkg.name == "test-package"
        assert pkg.version == "1.0.0-1"
        assert pkg.description == "A test package"
        assert pkg.maintainer == "testuser"
        assert pkg.votes == 100
        assert pkg.depends == ["dep1", "dep2"]
        assert pkg.makedepends == ["makedep1"]
    def test_git_url(self, sample_package_data):
        """Test git_url property."""
        pkg = Package.from_rpc(sample_package_data)
        assert pkg.git_url == "https://aur.archlinux.org/test-package.git"
    def test_aur_url(self, sample_package_data):
        """Test aur_url property."""
        pkg = Package.from_rpc(sample_package_data)
        assert pkg.aur_url == "https://aur.archlinux.org/packages/test-package"
class TestAURClient:
    """Tests for AURClient."""
    @pytest.mark.asyncio
    async def test_get_package_cached(self, sample_package_data):
        """Test that cached packages are returned."""
        async with AURClient(cache_ttl=300) as client:
            # Manually add to cache
            pkg = Package.from_rpc(sample_package_data)
            client._set_cached(pkg)
            # Should return from cache without network request
            result = await client.get_package("test-package")
            assert result is not None
            assert result.name == "test-package"
    @pytest.mark.asyncio
    async def test_cache_expiry(self, sample_package_data):
        """Test cache TTL expiry."""
        async with AURClient(cache_ttl=0) as client:
            pkg = Package.from_rpc(sample_package_data)
            client._set_cached(pkg)
            # Cache should be expired immediately
            assert client._get_cached("test-package") is None
    @pytest.mark.asyncio
    async def test_batch_request(self, sample_package_data):
        """Test batch package requests."""
        async with AURClient() as client:
            # Mock the request method
            mock_response = {
                "type": "multiinfo",
                "results": [sample_package_data],
            }
            client._request = AsyncMock(return_value=mock_response)
            packages = await client.get_packages(["test-package"])
            assert len(packages) == 1
            assert packages[0].name == "test-package"
class TestDependencyParsing:
    """Tests for dependency string parsing."""
    def test_parse_simple(self):
        """Test parsing simple dependency."""
        from archbuild.resolver import Dependency
        dep = Dependency.parse("package")
        assert dep.name == "package"
        assert dep.version_constraint is None
    def test_parse_with_version(self):
        """Test parsing dependency with version."""
        from archbuild.resolver import Dependency
        dep = Dependency.parse("package>=1.0")
        assert dep.name == "package"
        assert dep.version_constraint == ">=1.0"
    def test_parse_exact_version(self):
        """Test parsing dependency with exact version."""
        from archbuild.resolver import Dependency
        dep = Dependency.parse("package=2.0")
        assert dep.name == "package"
        assert dep.version_constraint == "=2.0"
tests/test_config.py
New file
@@ -0,0 +1,149 @@
"""Tests for configuration loading and validation."""
import pytest
from pathlib import Path
from tempfile import NamedTemporaryFile
from archbuild.config import (
    Config,
    load_config,
    migrate_vars_sh,
    save_config,
    BuildingConfig,
    RepositoryConfig,
)
class TestConfig:
    """Tests for Config model."""
    def test_minimal_config(self):
        """Test minimal valid configuration."""
        config = Config(
            repository=RepositoryConfig(
                name="test",
                path=Path("/repo"),
                build_dir=Path("/build"),
            )
        )
        assert config.repository.name == "test"
        assert config.building.parallel is True  # default
        assert config.building.max_workers == 4  # default
    def test_building_config_defaults(self):
        """Test BuildingConfig defaults."""
        config = BuildingConfig()
        assert config.parallel is True
        assert config.max_workers == 4
        assert config.clean is True
        assert config.retry_attempts == 3
    def test_max_workers_validation(self):
        """Test max_workers bounds."""
        with pytest.raises(ValueError):
            BuildingConfig(max_workers=0)
        with pytest.raises(ValueError):
            BuildingConfig(max_workers=100)
    def test_log_level_validation(self):
        """Test log level validation."""
        config = Config(
            repository=RepositoryConfig(
                name="test",
                path=Path("/repo"),
                build_dir=Path("/build"),
            ),
            log_level="debug",
        )
        assert config.log_level == "DEBUG"
        with pytest.raises(ValueError):
            Config(
                repository=RepositoryConfig(
                    name="test",
                    path=Path("/repo"),
                    build_dir=Path("/build"),
                ),
                log_level="invalid",
            )
class TestLoadConfig:
    """Tests for config file loading."""
    def test_load_yaml(self, tmp_path):
        """Test loading YAML config file."""
        config_file = tmp_path / "config.yaml"
        config_file.write_text("""
repository:
  name: myrepo
  path: /repo/x86_64
  build_dir: /repo/build
building:
  max_workers: 8
""")
        config = load_config(config_file)
        assert config.repository.name == "myrepo"
        assert config.building.max_workers == 8
    def test_file_not_found(self):
        """Test error on missing config file."""
        with pytest.raises(FileNotFoundError):
            load_config(Path("/nonexistent/config.yaml"))
class TestMigrateVarsSh:
    """Tests for vars.sh migration."""
    def test_migrate_basic(self, tmp_path):
        """Test basic vars.sh migration."""
        vars_file = tmp_path / "vars.sh"
        vars_file.write_text("""
REPODIR=/repo/x86_64
BUILDDIR=/repo/build
REPONAME=myrepo
PARALLEL=Y
SIGN=N
NUM_OLD=5
""")
        data = migrate_vars_sh(vars_file)
        assert data["repository"]["name"] == "myrepo"
        assert data["repository"]["path"] == "/repo/x86_64"
        assert data["building"]["parallel"] is True
        assert data["signing"]["enabled"] is False
        assert data["retention"]["keep_versions"] == 5
    def test_migrate_with_export(self, tmp_path):
        """Test migration handles export statements."""
        vars_file = tmp_path / "vars.sh"
        vars_file.write_text("""
export REPONAME="testrepo"
export REPODIR="/test/repo"
export BUILDDIR="/test/build"
""")
        data = migrate_vars_sh(vars_file)
        assert data["repository"]["name"] == "testrepo"
class TestSaveConfig:
    """Tests for config saving."""
    def test_round_trip(self, tmp_path):
        """Test config save/load round trip."""
        config = Config(
            repository=RepositoryConfig(
                name="test",
                path=Path("/repo"),
                build_dir=Path("/build"),
            ),
            building=BuildingConfig(max_workers=6),
        )
        config_file = tmp_path / "config.yaml"
        save_config(config, config_file)
        loaded = load_config(config_file)
        assert loaded.repository.name == "test"
        assert loaded.building.max_workers == 6
tests/test_resolver.py
New file
@@ -0,0 +1,131 @@
"""Tests for dependency resolver."""
import pytest
from unittest.mock import AsyncMock, patch
from archbuild.resolver import DependencyResolver, Dependency, DependencyType, BuildOrder
class TestDependency:
    """Tests for Dependency class."""
    def test_parse_simple(self):
        """Test parsing simple dependency name."""
        dep = Dependency.parse("packagename")
        assert dep.name == "packagename"
        assert dep.version_constraint is None
        assert dep.dep_type == DependencyType.RUNTIME
    def test_parse_with_gte(self):
        """Test parsing with >= constraint."""
        dep = Dependency.parse("package>=1.0")
        assert dep.name == "package"
        assert dep.version_constraint == ">=1.0"
    def test_parse_with_lte(self):
        """Test parsing with <= constraint."""
        dep = Dependency.parse("package<=2.0")
        assert dep.name == "package"
        assert dep.version_constraint == "<=2.0"
    def test_parse_build_dep(self):
        """Test parsing as build dependency."""
        dep = Dependency.parse("makedep", DependencyType.BUILD)
        assert dep.dep_type == DependencyType.BUILD
class TestBuildOrder:
    """Tests for BuildOrder class."""
    def test_iteration(self):
        """Test BuildOrder iteration."""
        order = BuildOrder(packages=["a", "b", "c"])
        assert list(order) == ["a", "b", "c"]
    def test_length(self):
        """Test BuildOrder length."""
        order = BuildOrder(packages=["a", "b"])
        assert len(order) == 2
class TestDependencyResolver:
    """Tests for DependencyResolver."""
    @pytest.fixture
    def mock_aur_client(self):
        """Create mock AUR client."""
        client = AsyncMock()
        return client
    def test_topological_sort_simple(self, mock_aur_client):
        """Test topological sort with simple graph."""
        resolver = DependencyResolver(mock_aur_client)
        # A depends on B, B depends on C
        graph = {
            "A": {"B"},
            "B": {"C"},
            "C": set(),
        }
        order = resolver._topological_sort(graph)
        # C must come before B, B must come before A
        assert order.index("C") < order.index("B")
        assert order.index("B") < order.index("A")
    def test_topological_sort_parallel(self, mock_aur_client):
        """Test topological sort with parallel dependencies."""
        resolver = DependencyResolver(mock_aur_client)
        # A depends on B and C (parallel)
        graph = {
            "A": {"B", "C"},
            "B": set(),
            "C": set(),
        }
        order = resolver._topological_sort(graph)
        # B and C must come before A
        assert order.index("B") < order.index("A")
        assert order.index("C") < order.index("A")
    def test_detect_cycles_no_cycle(self, mock_aur_client):
        """Test cycle detection with no cycles."""
        resolver = DependencyResolver(mock_aur_client)
        graph = {
            "A": {"B"},
            "B": {"C"},
            "C": set(),
        }
        cycles = resolver.detect_cycles(graph)
        assert len(cycles) == 0
    def test_detect_cycles_with_cycle(self, mock_aur_client):
        """Test cycle detection with cycle."""
        resolver = DependencyResolver(mock_aur_client)
        # A -> B -> C -> A (cycle)
        graph = {
            "A": {"B"},
            "B": {"C"},
            "C": {"A"},
        }
        cycles = resolver.detect_cycles(graph)
        assert len(cycles) > 0
    @patch("archbuild.resolver.subprocess.run")
    def test_is_in_official_repos(self, mock_run, mock_aur_client):
        """Test checking official repos."""
        mock_run.return_value.returncode = 0
        mock_run.return_value.stdout = "base\ngit\nvim\n"
        resolver = DependencyResolver(mock_aur_client)
        resolver._refresh_pacman_cache()
        assert resolver.is_in_official_repos("git")
        assert not resolver.is_in_official_repos("yay")