From dc96e98618fe4739210d10ee546b105e36433d02 Mon Sep 17 00:00:00 2001
From: Joel Grunbaum <joelgrun@gmail.com>
Date: Sat, 07 Feb 2026 21:09:40 +0000
Subject: [PATCH] Intiial python rewrite of repository
---
src/archbuild/cli.py | 381 ++++++
src/archbuild/resolver.py | 317 +++++
.gitignore | 31
src/archbuild/builder.py | 456 +++++++
src/archbuild/logging.py | 62 +
config/config.example.yaml | 87 +
src/archbuild/config.py | 202 +++
src/archbuild/aur.py | 299 +++++
src/archbuild/notifications.py | 346 +++++
tests/integration_test.py | 370 ++++++
tests/test_resolver.py | 131 ++
README.md | 129 ++
src/archbuild/repo.py | 310 +++++
src/archbuild/__init__.py | 3
tests/test_aur.py | 127 ++
pyproject.toml | 62 +
tests/test_config.py | 149 ++
agents.md | 52
tests/__init__.py | 0
19 files changed, 3,513 insertions(+), 1 deletions(-)
diff --git a/.gitignore b/.gitignore
index 92b51e6..55e2f0f 100755
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,32 @@
+# Legacy
chroot
-repo
vars.sh
+
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+.venv/
+env/
+venv/
+ENV/
+
+# Distribution / Build
+dist/
+local_build/
+build/
+*.egg-info/
+
+# Tests / Coverage
+.pytest_cache/
+.coverage
+htmlcov/
+.mypy_cache/
+.ruff_cache/
+
+# Project Specific
+config.yaml
+repo/
+waitlist*
+errorfile
+logs/
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1ddf3f5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,129 @@
+# Archbuild
+
+A modern, sustainable AUR package building and repository management tool for Arch Linux.
+
+## Features
+
+- **Automatic AUR builds** - Clone and build packages from the AUR
+- **Dependency resolution** - Automatically resolve and build AUR dependencies in correct order
+- **Parallel builds** - Build multiple packages concurrently with configurable worker count
+- **Package signing** - Optional GPG signing for packages and database
+- **Retry logic** - Configurable retries with exponential backoff for transient failures
+- **Package retention** - Keep configurable number of old versions, auto-cleanup
+- **Notifications** - Email and webhook (Discord, Slack, ntfy) notifications on failures
+- **VCS package handling** - Automatically rebuild git/svn/hg packages
+- **Modern Python** - Type-safe, async, well-tested codebase
+
+## Installation
+
+```bash
+# From source
+pip install -e .
+
+# Or with development dependencies
+pip install -e ".[dev]"
+```
+
+## Quick Start
+
+1. **Create configuration**:
+ ```bash
+ cp config/config.example.yaml config.yaml
+ # Edit config.yaml with your settings
+ ```
+
+2. **Initialize repository**:
+ ```bash
+ archbuild -c config.yaml init
+ ```
+
+3. **Add packages**:
+ ```bash
+ archbuild add yay paru
+ ```
+
+4. **Build all packages**:
+ ```bash
+ archbuild build-all
+ ```
+
+5. **Build a specific package**:
+ ```bash
+ archbuild build <package>
+ ```
+
+## Commands
+
+| Command | Description |
+|----------------------------|--------------------------------------------|
+| `init` | Initialize repository directories |
+| `add <packages...>` | Add and build new packages |
+| `remove <packages...>` | Remove packages from repo |
+| `build-all [-f]` | Build all packages, `-f` forces rebuild |
+| `build <package>` | Build a specific package |
+| `check` | Check for packages moved to official repos |
+| `list` | List packages in repository |
+| `remake` | Rebuild repository database |
+| `cleanup` | Remove old package versions |
+| `migrate-config <vars.sh>` | Migrate legacy config |
+| `test-notifications` | Test notification setup |
+
+## Configuration
+
+See `config/config.example.yaml` for all options. Key settings:
+
+```yaml
+repository:
+ name: "myrepo"
+ path: "/repo/x86_64"
+ build_dir: "/repo/build"
+
+building:
+ parallel: true
+ max_workers: 4
+ retry_attempts: 3
+
+retention:
+ keep_versions: 3
+
+notifications:
+ email:
+ enabled: true
+ to: "admin@example.com"
+```
+
+## Migration from Bash Version
+
+```bash
+archbuild migrate-config vars.sh -o config.yaml
+```
+
+## Systemd Timer
+
+Create `/etc/systemd/system/archbuild.service`:
+```ini
+[Unit]
+Description=Build AUR packages
+
+[Service]
+Type=oneshot
+ExecStart=/usr/bin/archbuild -c /etc/archbuild/config.yaml build-all
+User=builduser
+```
+
+Create `/etc/systemd/system/archbuild.timer`:
+```ini
+[Unit]
+Description=Run archbuild daily
+
+[Timer]
+OnCalendar=daily
+Persistent=true
+
+[Install]
+WantedBy=timers.target
+```
+
+## License
+
+MIT
diff --git a/agents.md b/agents.md
new file mode 100644
index 0000000..e3e7705
--- /dev/null
+++ b/agents.md
@@ -0,0 +1,52 @@
+# Archbuild Agent Guidance
+
+This document provides guidance for AI agents maintaining or extending the Archbuild codebase.
+
+## Architecture Highlights
+
+Archbuild is a modern Python rewrite of a legacy Bash-based Arch Linux package autobuilder. It prioritizes reliability, maintainability, and extensibility.
+
+- **Asynchronous I/O**: Uses `asyncio` and `aiohttp` for AUR RPC queries and network-bound tasks.
+- **Process Isolation**: Uses `ProcessPoolExecutor` for long-running `makepkg` builds to avoid GIL limitations.
+- **Robust Locking**: Uses `fcntl.flock()` for file-based locking of the repository database and package builds.
+- **Configuration**: Uses Pydantic V2 for YAML configuration loading and validation.
+- **Logging**: Uses `rich` for structured, colorful terminal logging and optional file logging.
+
+## Core Modules
+
+- `cli.py`: Command-line interface using `click`.
+- `config.py`: Configuration models and migration tools.
+- `aur.py`: AUR RPC API client with caching and retry logic.
+- `resolver.py`: Dependency resolution and topological sorting.
+- `builder.py`: Build orchestration and parallelization.
+- `repo.py`: Repository database and package retention management.
+- `notifications.py`: Extensible notification system (Email, Webhooks).
+
+## Development Patterns
+
+### Async Usage
+Always use async context managers for `AURClient` and `Builder` to ensure sessions and executors are properly closed.
+
+```python
+async with AURClient() as aur:
+ async with Builder(config, aur) as builder:
+ await builder.build_all()
+```
+
+### Locking
+Always wrap repository updates in the repository lock found in `RepoManager`.
+
+### Adding New Commands
+Add new commands to `cli.py` using Click. If a command needs AUR access or building, use the `AURClient` and `Builder` inside an async function wrapped by `run_async`.
+
+### Testing
+- Unit tests are in `tests/test_*.py`.
+- Integration tests are in `tests/integration_test.py` and require an Arch Linux environment.
+- Use `pytest` for running unit tests.
+
+## Maintenance Checklist
+
+- [ ] Ensure all Pydantic models in `config.py` are up to date with new features.
+- [ ] Maintain the topological sort logic in `resolver.py` (Kahn's algorithm).
+- [ ] Keep `DLAGENTS` and `VCSCLIENTS` in `integration_test.py` in sync with system defaults.
+- [ ] Update `README.md` when adding new CLI commands or configuration options.
diff --git a/config/config.example.yaml b/config/config.example.yaml
new file mode 100644
index 0000000..29ea299
--- /dev/null
+++ b/config/config.example.yaml
@@ -0,0 +1,87 @@
+# Archbuild Configuration
+# Copy this file to config.yaml and edit as needed
+
+repository:
+ # Repository name (used in pacman.conf)
+ name: "myrepo"
+ # Directory containing the repository packages and database
+ path: "/repo/x86_64"
+ # Directory for cloning and building packages
+ build_dir: "/repo/build"
+ # Compression format for packages (zst, xz, gz)
+ compression: "zst"
+
+building:
+ # Enable parallel builds
+ parallel: true
+ # Maximum number of concurrent build workers
+ max_workers: 4
+ # Clean build directory after successful build
+ clean: true
+ # Update system before building (sudo pacman -Syu)
+ update_system: false
+ # Number of retry attempts on build failure
+ retry_attempts: 3
+ # Base delay between retries in seconds (exponential backoff)
+ retry_delay: 5
+
+signing:
+ # Enable package signing
+ enabled: false
+ # GPG key ID for signing (leave empty to use default key)
+ key: ""
+
+retention:
+ # Number of old package versions to keep
+ keep_versions: 3
+ # Automatically clean old versions after build
+ cleanup_on_build: true
+
+notifications:
+ email:
+ # Enable email notifications on build failures
+ enabled: false
+ # Recipient email address
+ to: ""
+ # Sender email address
+ from: "archbuild@localhost"
+ # SMTP server settings
+ smtp_host: "localhost"
+ smtp_port: 25
+ use_tls: false
+ username: ""
+ password: ""
+
+ # Webhook notifications (Discord, Slack, ntfy, etc.)
+ # Uncomment and configure as needed
+ webhooks: []
+ # Example Discord webhook:
+ # webhooks:
+ # - enabled: true
+ # type: "discord"
+ # url: "https://discord.com/api/webhooks/..."
+ #
+ # Example ntfy webhook:
+ # webhooks:
+ # - enabled: true
+ # type: "ntfy"
+ # url: "https://ntfy.sh/your-topic"
+
+# Per-package build overrides
+# Use this for packages that need special handling
+package_overrides: {}
+ # Example: Microsoft fonts need checksum skipping
+ # ttf-ms-win10:
+ # skip_checksums: true
+ #
+ # Example: Custom environment variables
+ # some-package:
+ # env:
+ # LC_ALL: "C"
+ # extra_args:
+ # - "--nocheck"
+
+# Logging configuration
+log_level: "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL
+# Uncomment to also log to file:
+# log_file: "/var/log/archbuild.log"
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..bd3ddb9
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,62 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "archbuild"
+version = "2.0.0"
+description = "Automatic AUR package building and repository management for Arch Linux"
+readme = "README.md"
+license = "MIT"
+requires-python = ">=3.11"
+authors = [
+ { name = "Joel", email = "joelgrun@gmail.com" }
+]
+keywords = ["arch", "linux", "aur", "pacman", "repository", "automation"]
+classifiers = [
+ "Development Status :: 4 - Beta",
+ "Environment :: Console",
+ "Intended Audience :: System Administrators",
+ "License :: OSI Approved :: MIT License",
+ "Operating System :: POSIX :: Linux",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Topic :: System :: Software Distribution",
+]
+dependencies = [
+ "click>=8.0",
+ "pyyaml>=6.0",
+ "pydantic>=2.0",
+ "aiohttp>=3.8",
+ "rich>=13.0",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=7.0",
+ "pytest-asyncio>=0.21",
+ "pytest-cov>=4.0",
+ "mypy>=1.0",
+ "ruff>=0.1",
+]
+
+[project.scripts]
+archbuild = "archbuild.cli:main"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/archbuild"]
+
+[tool.ruff]
+target-version = "py311"
+line-length = 100
+
+[tool.ruff.lint]
+select = ["E", "F", "I", "N", "W", "UP"]
+
+[tool.mypy]
+python_version = "3.11"
+strict = true
+
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+testpaths = ["tests"]
diff --git a/src/archbuild/__init__.py b/src/archbuild/__init__.py
new file mode 100644
index 0000000..a30eefc
--- /dev/null
+++ b/src/archbuild/__init__.py
@@ -0,0 +1,3 @@
+"""Archbuild - Automatic AUR package building and repository management."""
+
+__version__ = "2.0.0"
diff --git a/src/archbuild/aur.py b/src/archbuild/aur.py
new file mode 100644
index 0000000..5984e00
--- /dev/null
+++ b/src/archbuild/aur.py
@@ -0,0 +1,299 @@
+"""Async AUR RPC API client with caching and retry logic."""
+
+import asyncio
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta
+from enum import Enum
+from typing import Any
+
+import aiohttp
+
+from archbuild.logging import get_logger
+
+logger = get_logger("aur")
+
+AUR_RPC_URL = "https://aur.archlinux.org/rpc"
+AUR_PACKAGE_URL = "https://aur.archlinux.org/packages"
+AUR_GIT_URL = "https://aur.archlinux.org"
+
+
+class PackageType(Enum):
+ """Package type from AUR."""
+
+ NORMAL = "normal"
+ SPLIT = "split"
+
+
+@dataclass
+class Package:
+ """AUR package metadata."""
+
+ name: str
+ version: str
+ description: str
+ url: str | None
+ maintainer: str | None
+ votes: int
+ popularity: float
+ out_of_date: datetime | None
+ first_submitted: datetime
+ last_modified: datetime
+ depends: list[str] = field(default_factory=list)
+ makedepends: list[str] = field(default_factory=list)
+ checkdepends: list[str] = field(default_factory=list)
+ optdepends: list[str] = field(default_factory=list)
+ provides: list[str] = field(default_factory=list)
+ conflicts: list[str] = field(default_factory=list)
+ replaces: list[str] = field(default_factory=list)
+ license: list[str] = field(default_factory=list)
+ keywords: list[str] = field(default_factory=list)
+
+ @property
+ def git_url(self) -> str:
+ """Get the git clone URL for this package."""
+ return f"{AUR_GIT_URL}/{self.name}.git"
+
+ @property
+ def aur_url(self) -> str:
+ """Get the AUR web page URL for this package."""
+ return f"{AUR_PACKAGE_URL}/{self.name}"
+
+ @classmethod
+ def from_rpc(cls, data: dict[str, Any]) -> "Package":
+ """Create Package from AUR RPC response data.
+
+ Args:
+ data: Package data from AUR RPC API
+
+ Returns:
+ Package instance
+ """
+ return cls(
+ name=data["Name"],
+ version=data["Version"],
+ description=data.get("Description", ""),
+ url=data.get("URL"),
+ maintainer=data.get("Maintainer"),
+ votes=data.get("NumVotes", 0),
+ popularity=data.get("Popularity", 0.0),
+ out_of_date=(
+ datetime.fromtimestamp(data["OutOfDate"]) if data.get("OutOfDate") else None
+ ),
+ first_submitted=datetime.fromtimestamp(data["FirstSubmitted"]),
+ last_modified=datetime.fromtimestamp(data["LastModified"]),
+ depends=data.get("Depends", []),
+ makedepends=data.get("MakeDepends", []),
+ checkdepends=data.get("CheckDepends", []),
+ optdepends=data.get("OptDepends", []),
+ provides=data.get("Provides", []),
+ conflicts=data.get("Conflicts", []),
+ replaces=data.get("Replaces", []),
+ license=data.get("License", []),
+ keywords=data.get("Keywords", []),
+ )
+
+
+@dataclass
+class CacheEntry:
+ """Cache entry with TTL."""
+
+ data: Package
+ expires: datetime
+
+
+class AURClient:
+ """Async client for AUR RPC API with caching and retry."""
+
+ def __init__(
+ self,
+ cache_ttl: int = 300,
+ max_retries: int = 3,
+ retry_delay: float = 1.0,
+ batch_size: int = 100,
+ ):
+ """Initialize AUR client.
+
+ Args:
+ cache_ttl: Cache time-to-live in seconds
+ max_retries: Maximum number of retry attempts
+ retry_delay: Base delay between retries (exponential backoff)
+ batch_size: Maximum packages per batch request
+ """
+ self.cache_ttl = cache_ttl
+ self.max_retries = max_retries
+ self.retry_delay = retry_delay
+ self.batch_size = batch_size
+ self._cache: dict[str, CacheEntry] = {}
+ self._session: aiohttp.ClientSession | None = None
+
+ async def __aenter__(self) -> "AURClient":
+ """Async context manager entry."""
+ self._session = aiohttp.ClientSession()
+ return self
+
+ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+ """Async context manager exit."""
+ if self._session:
+ await self._session.close()
+ self._session = None
+
+ def _get_cached(self, name: str) -> Package | None:
+ """Get package from cache if not expired.
+
+ Args:
+ name: Package name
+
+ Returns:
+ Cached package or None if not cached/expired
+ """
+ entry = self._cache.get(name)
+ if entry and entry.expires > datetime.now():
+ return entry.data
+ return None
+
+ def _set_cached(self, package: Package) -> None:
+ """Store package in cache.
+
+ Args:
+ package: Package to cache
+ """
+ self._cache[package.name] = CacheEntry(
+ data=package,
+ expires=datetime.now() + timedelta(seconds=self.cache_ttl),
+ )
+
+ async def _request(
+ self,
+ params: list[tuple[str, Any]] | dict[str, Any],
+ ) -> dict[str, Any]:
+ """Make request to AUR RPC API with retry logic.
+
+ Args:
+ params: Query parameters (as list of tuples for repeated keys, or dict)
+
+ Returns:
+ JSON response data
+
+ Raises:
+ aiohttp.ClientError: If request fails after all retries
+ """
+ if not self._session:
+ raise RuntimeError("AURClient must be used as async context manager")
+
+ last_error: Exception | None = None
+
+ for attempt in range(self.max_retries):
+ try:
+ async with self._session.get(AUR_RPC_URL, params=params) as response:
+ response.raise_for_status()
+ data = await response.json()
+ if data.get("type") == "error":
+ raise ValueError(f"AUR API error: {data.get('error')}")
+ return data
+ except (aiohttp.ClientError, asyncio.TimeoutError) as e:
+ last_error = e
+ if attempt < self.max_retries - 1:
+ delay = self.retry_delay * (2**attempt)
+ logger.warning(
+ f"AUR request failed (attempt {attempt + 1}/{self.max_retries}), "
+ f"retrying in {delay}s: {e}"
+ )
+ await asyncio.sleep(delay)
+
+ raise last_error or RuntimeError("Request failed")
+
+ async def get_package(self, name: str) -> Package | None:
+ """Get a single package by name.
+
+ Args:
+ name: Package name
+
+ Returns:
+ Package if found, None otherwise
+ """
+ # Check cache first
+ cached = self._get_cached(name)
+ if cached:
+ logger.debug(f"Cache hit for package: {name}")
+ return cached
+
+ packages = await self.get_packages([name])
+ return packages[0] if packages else None
+
+ async def get_packages(self, names: list[str]) -> list[Package]:
+ """Get multiple packages by name using batch queries.
+
+ Args:
+ names: List of package names
+
+ Returns:
+ List of found packages (may be fewer than requested)
+ """
+ # Separate cached and uncached packages
+ result: list[Package] = []
+ uncached: list[str] = []
+
+ for name in names:
+ cached = self._get_cached(name)
+ if cached:
+ result.append(cached)
+ else:
+ uncached.append(name)
+
+ if not uncached:
+ return result
+
+ # Batch request uncached packages
+ for i in range(0, len(uncached), self.batch_size):
+ batch = uncached[i : i + self.batch_size]
+
+ # Build params as list of tuples for repeated arg[] keys
+ params: list[tuple[str, Any]] = [("v", 5), ("type", "info")]
+ for name in batch:
+ params.append(("arg[]", name))
+
+ data = await self._request(params)
+
+ for pkg_data in data.get("results", []):
+ package = Package.from_rpc(pkg_data)
+ self._set_cached(package)
+ result.append(package)
+
+ return result
+
+ async def search(self, query: str, by: str = "name-desc") -> list[Package]:
+ """Search AUR packages.
+
+ Args:
+ query: Search query string
+ by: Search field (name, name-desc, maintainer, depends, makedepends, optdepends, checkdepends)
+
+ Returns:
+ List of matching packages
+ """
+ params = {"v": 5, "type": "search", "by": by, "arg": query}
+ data = await self._request(params)
+
+ packages = []
+ for pkg_data in data.get("results", []):
+ package = Package.from_rpc(pkg_data)
+ self._set_cached(package)
+ packages.append(package)
+
+ return packages
+
+ async def is_available(self, name: str) -> bool:
+ """Check if a package exists in the AUR.
+
+ Args:
+ name: Package name
+
+ Returns:
+ True if package exists
+ """
+ package = await self.get_package(name)
+ return package is not None
+
+ def clear_cache(self) -> None:
+ """Clear the package cache."""
+ self._cache.clear()
diff --git a/src/archbuild/builder.py b/src/archbuild/builder.py
new file mode 100644
index 0000000..d55f0b6
--- /dev/null
+++ b/src/archbuild/builder.py
@@ -0,0 +1,456 @@
+"""Package builder with parallel execution and proper locking."""
+
+import asyncio
+import fcntl
+import os
+import shutil
+import subprocess
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+from pathlib import Path
+from concurrent.futures import ProcessPoolExecutor
+from typing import Any
+
+from archbuild.aur import AURClient
+from archbuild.config import Config, PackageOverride
+from archbuild.logging import get_logger
+from archbuild.resolver import DependencyResolver
+
+logger = get_logger("builder")
+
+
+class BuildStatus(Enum):
+ """Build status for a package."""
+
+ PENDING = "pending"
+ BUILDING = "building"
+ SUCCESS = "success"
+ FAILED = "failed"
+ SKIPPED = "skipped"
+
+
+@dataclass
+class BuildResult:
+ """Result of a package build."""
+
+ package: str
+ status: BuildStatus
+ version: str | None = None
+ duration: float = 0.0
+ error: str | None = None
+ artifacts: list[Path] = field(default_factory=list)
+ timestamp: datetime = field(default_factory=datetime.now)
+
+
+class FileLock:
+ """Context manager for file-based locking using flock."""
+
+ def __init__(self, path: Path):
+ """Initialize lock on file path.
+
+ Args:
+ path: Path to lock file
+ """
+ self.path = path
+ self.fd: int | None = None
+
+ def __enter__(self) -> "FileLock":
+ """Acquire exclusive lock."""
+ self.path.parent.mkdir(parents=True, exist_ok=True)
+ self.fd = os.open(str(self.path), os.O_RDWR | os.O_CREAT)
+ fcntl.flock(self.fd, fcntl.LOCK_EX)
+ logger.debug(f"Acquired lock: {self.path}")
+ return self
+
+ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+ """Release lock."""
+ if self.fd is not None:
+ fcntl.flock(self.fd, fcntl.LOCK_UN)
+ os.close(self.fd)
+ logger.debug(f"Released lock: {self.path}")
+
+
+def _run_makepkg(
+ package_dir: Path,
+ sign: bool = False,
+ key: str = "",
+ clean: bool = True,
+ skip_checksums: bool = False,
+ extra_args: list[str] | None = None,
+ env_overrides: dict[str, str] | None = None,
+) -> tuple[bool, str, list[Path]]:
+ """Run makepkg in a subprocess.
+
+ This runs in a separate process for parallelization.
+
+ Args:
+ package_dir: Directory containing PKGBUILD
+ sign: Whether to sign packages
+ key: GPG key for signing
+ clean: Clean build directory after build
+ skip_checksums: Skip checksum verification
+ extra_args: Additional makepkg arguments
+ env_overrides: Environment variable overrides
+
+ Returns:
+ Tuple of (success, error_message, artifact_paths)
+ """
+ cmd = ["makepkg", "-s", "--noconfirm"]
+
+ if clean:
+ cmd.append("-c")
+ if sign and key:
+ cmd.extend(["--sign", "--key", key])
+ if skip_checksums:
+ cmd.append("--skipchecksums")
+ if extra_args:
+ cmd.extend(extra_args)
+
+ env = os.environ.copy()
+ if env_overrides:
+ env.update(env_overrides)
+
+ try:
+ result = subprocess.run(
+ cmd,
+ cwd=package_dir,
+ capture_output=True,
+ text=True,
+ env=env,
+ timeout=3600, # 1 hour timeout
+ )
+
+ if result.returncode != 0:
+ return False, result.stderr or result.stdout, []
+
+ # Find built packages
+ artifacts = list(package_dir.glob("*.pkg.tar.*"))
+ artifacts = [a for a in artifacts if not a.name.endswith(".sig")]
+
+ return True, "", artifacts
+
+ except subprocess.TimeoutExpired:
+ return False, "Build timed out after 1 hour", []
+ except Exception as e:
+ return False, str(e), []
+
+
+class Builder:
+ """Package builder with parallel execution support."""
+
+ def __init__(
+ self,
+ config: Config,
+ aur_client: AURClient,
+ ):
+ """Initialize builder.
+
+ Args:
+ config: Application configuration
+ aur_client: AUR client for package info
+ """
+ self.config = config
+ self.aur_client = aur_client
+ self.resolver = DependencyResolver(aur_client)
+ self._lock_dir = config.repository.build_dir / ".locks"
+ self._executor: ProcessPoolExecutor | None = None
+
+ async def __aenter__(self) -> "Builder":
+ """Async context manager entry."""
+ max_workers = self.config.building.max_workers if self.config.building.parallel else 1
+ self._executor = ProcessPoolExecutor(max_workers=max_workers)
+ logger.info(f"Builder initialized with {max_workers} workers")
+ return self
+
+ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+ """Async context manager exit."""
+ if self._executor:
+ self._executor.shutdown(wait=True)
+ self._executor = None
+
+ def _get_lock_path(self, package: str) -> Path:
+ """Get lock file path for package.
+
+ Args:
+ package: Package name
+
+ Returns:
+ Path to lock file
+ """
+ return self._lock_dir / f"{package}.lock"
+
+ def _get_package_dir(self, package: str) -> Path:
+ """Get build directory for package.
+
+ Args:
+ package: Package name
+
+ Returns:
+ Path to package build directory
+ """
+ return self.config.repository.build_dir / package
+
+ def _get_override(self, package: str) -> PackageOverride:
+ """Get package-specific overrides.
+
+ Args:
+ package: Package name
+
+ Returns:
+ PackageOverride (default if not specified)
+ """
+ # Check for package-specific override first, then fall back to _default
+ if package in self.config.package_overrides:
+ return self.config.package_overrides[package]
+ return self.config.package_overrides.get("_default", PackageOverride())
+
+ async def _clone_or_update(self, package: str) -> bool:
+ """Clone or update package from AUR.
+
+ Args:
+ package: Package name
+
+ Returns:
+ True if there were updates (or new clone)
+ """
+ pkg_dir = self._get_package_dir(package)
+
+ if pkg_dir.exists():
+ # Update existing repo
+ result = subprocess.run(
+ ["git", "reset", "--hard"],
+ cwd=pkg_dir,
+ capture_output=True,
+ )
+ result = subprocess.run(
+ ["git", "pull"],
+ cwd=pkg_dir,
+ capture_output=True,
+ text=True,
+ )
+ return "Already up to date" not in result.stdout
+ else:
+ # Clone new repo
+ pkg_info = await self.aur_client.get_package(package)
+ if not pkg_info:
+ raise ValueError(f"Package not found in AUR: {package}")
+
+ pkg_dir.parent.mkdir(parents=True, exist_ok=True)
+ subprocess.run(
+ ["git", "clone", pkg_info.git_url, str(pkg_dir)],
+ check=True,
+ capture_output=True,
+ )
+ return True
+
+ def _is_vcs_package(self, package_dir: Path) -> bool:
+ """Check if package is a VCS package (needs rebuild on each run).
+
+ Args:
+ package_dir: Path to package directory
+
+ Returns:
+ True if VCS package
+ """
+ pkgbuild = package_dir / "PKGBUILD"
+ if not pkgbuild.exists():
+ return False
+
+ content = pkgbuild.read_text()
+ return "pkgver()" in content
+
+ async def build_package(
+ self,
+ package: str,
+ force: bool = False,
+ ) -> BuildResult:
+ """Build a single package.
+
+ Args:
+ package: Package name
+ force: Force rebuild even if up to date
+
+ Returns:
+ BuildResult with status and artifacts
+ """
+ start_time = datetime.now()
+ pkg_dir = self._get_package_dir(package)
+ override = self._get_override(package)
+
+ logger.info(f"Building package: {package}")
+
+ try:
+ # Clone or update
+ has_updates = await self._clone_or_update(package)
+ is_vcs = self._is_vcs_package(pkg_dir)
+
+ # Skip if no updates and not forced
+ if not has_updates and not is_vcs and not force:
+ logger.info(f"Skipping {package}: already up to date")
+ return BuildResult(
+ package=package,
+ status=BuildStatus.SKIPPED,
+ duration=(datetime.now() - start_time).total_seconds(),
+ )
+
+ # Run build with retries
+ last_error = ""
+ for attempt in range(self.config.building.retry_attempts):
+ if attempt > 0:
+ delay = self.config.building.retry_delay * (2 ** (attempt - 1))
+ logger.warning(
+ f"Retrying {package} (attempt {attempt + 1}/"
+ f"{self.config.building.retry_attempts}) after {delay}s"
+ )
+ await asyncio.sleep(delay)
+
+ # Run makepkg in executor
+ loop = asyncio.get_event_loop()
+ success, error, artifacts = await loop.run_in_executor(
+ self._executor,
+ _run_makepkg,
+ pkg_dir,
+ self.config.signing.enabled,
+ self.config.signing.key,
+ self.config.building.clean,
+ override.skip_checksums,
+ override.extra_args,
+ override.env,
+ )
+
+ if success:
+ duration = (datetime.now() - start_time).total_seconds()
+ logger.info(f"Successfully built {package} in {duration:.1f}s")
+ return BuildResult(
+ package=package,
+ status=BuildStatus.SUCCESS,
+ duration=duration,
+ artifacts=artifacts,
+ )
+
+ last_error = error
+
+ # All retries failed
+ duration = (datetime.now() - start_time).total_seconds()
+ logger.error(f"Failed to build {package}: {last_error}")
+ return BuildResult(
+ package=package,
+ status=BuildStatus.FAILED,
+ duration=duration,
+ error=last_error,
+ )
+
+ except Exception as e:
+ duration = (datetime.now() - start_time).total_seconds()
+ logger.exception(f"Error building {package}")
+ return BuildResult(
+ package=package,
+ status=BuildStatus.FAILED,
+ duration=duration,
+ error=str(e),
+ )
+
+ async def build_all(
+ self,
+ force: bool = False,
+ ) -> list[BuildResult]:
+ """Build all packages in build directory.
+
+ Args:
+ force: Force rebuild all packages
+
+ Returns:
+ List of build results
+ """
+ # Update system if configured
+ if self.config.building.update_system:
+ logger.info("Updating system...")
+ subprocess.run(
+ ["sudo", "pacman", "-Syu", "--noconfirm"],
+ check=False,
+ )
+
+ # Find all packages
+ build_dir = self.config.repository.build_dir
+ packages = [
+ d.name for d in build_dir.iterdir()
+ if d.is_dir() and not d.name.startswith(".")
+ ]
+
+ if not packages:
+ logger.warning("No packages found in build directory")
+ return []
+
+ logger.info(f"Building {len(packages)} packages")
+
+ # Build in parallel or sequentially
+ if self.config.building.parallel:
+ tasks = [self.build_package(pkg, force) for pkg in packages]
+ results = await asyncio.gather(*tasks)
+ else:
+ results = []
+ for pkg in packages:
+ result = await self.build_package(pkg, force)
+ results.append(result)
+
+ # Summary
+ success = sum(1 for r in results if r.status == BuildStatus.SUCCESS)
+ failed = sum(1 for r in results if r.status == BuildStatus.FAILED)
+ skipped = sum(1 for r in results if r.status == BuildStatus.SKIPPED)
+
+ logger.info(f"Build complete: {success} succeeded, {failed} failed, {skipped} skipped")
+
+ return list(results)
+
+ async def add_package(self, package: str) -> BuildResult:
+ """Add and build a new package with dependencies.
+
+ Args:
+ package: Package name
+
+ Returns:
+ BuildResult for the main package
+ """
+ logger.info(f"Adding package: {package}")
+
+ # Resolve dependencies
+ build_order = await self.resolver.resolve([package])
+
+ # Build dependencies first
+ results: list[BuildResult] = []
+ for dep in build_order:
+ if dep != package:
+ logger.info(f"Building dependency: {dep}")
+ result = await self.build_package(dep, force=True)
+ results.append(result)
+
+ if result.status == BuildStatus.FAILED:
+ logger.error(f"Dependency {dep} failed, aborting")
+ return BuildResult(
+ package=package,
+ status=BuildStatus.FAILED,
+ error=f"Dependency {dep} failed to build",
+ )
+
+ # Build main package
+ return await self.build_package(package, force=True)
+
+ def remove_package(self, package: str) -> bool:
+ """Remove a package from the build directory.
+
+ Args:
+ package: Package name
+
+ Returns:
+ True if removed successfully
+ """
+ pkg_dir = self._get_package_dir(package)
+
+ if pkg_dir.exists():
+ shutil.rmtree(pkg_dir)
+ logger.info(f"Removed package: {package}")
+ return True
+
+ logger.warning(f"Package not found: {package}")
+ return False
diff --git a/src/archbuild/cli.py b/src/archbuild/cli.py
new file mode 100644
index 0000000..e016c29
--- /dev/null
+++ b/src/archbuild/cli.py
@@ -0,0 +1,381 @@
+"""Command-line interface using Click."""
+
+import asyncio
+import sys
+from pathlib import Path
+from typing import Any
+
+import click
+from rich.console import Console
+from rich.table import Table
+
+from archbuild import __version__
+from archbuild.aur import AURClient
+from archbuild.builder import Builder, BuildStatus
+from archbuild.config import Config, load_config, migrate_vars_sh, save_config
+from archbuild.logging import console as log_console, setup_logging
+from archbuild.notifications import NotificationManager
+from archbuild.repo import RepoManager
+
+console = Console()
+
+
+def run_async(coro: Any) -> Any:
+ """Run async function in sync context."""
+ return asyncio.get_event_loop().run_until_complete(coro)
+
+
+class Context:
+ """CLI context holding shared state."""
+
+ def __init__(self, config_path: Path):
+ self.config_path = config_path
+ self._config: Config | None = None
+
+ @property
+ def config(self) -> Config:
+ if self._config is None:
+ self._config = load_config(self.config_path)
+ setup_logging(self._config.log_level, self._config.log_file)
+ return self._config
+
+
+pass_context = click.make_pass_decorator(Context)
+
+
+@click.group()
+@click.option(
+ "-c", "--config",
+ type=click.Path(exists=False, path_type=Path),
+ default=Path("config.yaml"),
+ help="Path to configuration file",
+)
+@click.version_option(__version__, prog_name="archbuild")
+@click.pass_context
+def cli(ctx: click.Context, config: Path) -> None:
+ """Archbuild - Automatic AUR package building and repository management.
+
+ A modern, sustainable replacement for legacy Bash-based AUR build systems.
+ """
+ ctx.obj = Context(config)
+
+
+@cli.command()
+@click.option("--force", "-f", is_flag=True, help="Force rebuild all packages")
+@pass_context
+def build_all(ctx: Context, force: bool) -> None:
+ """Build all packages in the build directory."""
+ config = ctx.config
+
+ async def _build_all() -> None:
+ async with AURClient() as aur:
+ async with Builder(config, aur) as builder:
+ results = await builder.build_all(force=force)
+
+ # Add to repository
+ repo = RepoManager(config)
+ for result in results:
+ if result.status == BuildStatus.SUCCESS:
+ repo.add_packages(result)
+
+ # Send notifications
+ notifier = NotificationManager(config)
+ await notifier.notify(results)
+
+ # Print summary
+ _print_results(results)
+
+ run_async(_build_all())
+
+
+@cli.command()
+@click.argument("package")
+@click.option("--force", "-f", is_flag=True, help="Force rebuild package")
+@pass_context
+def build(ctx: Context, package: str, force: bool) -> None:
+ """Build a specific package in the build directory."""
+ config = ctx.config
+
+ async def _build() -> None:
+ async with AURClient() as aur:
+ async with Builder(config, aur) as builder:
+ result = await builder.build_package(package, force=force)
+
+ if result.status == BuildStatus.SUCCESS:
+ repo = RepoManager(config)
+ repo.add_packages(result)
+
+ # Send notifications
+ notifier = NotificationManager(config)
+ await notifier.notify([result])
+
+ # Print summary
+ _print_results([result])
+
+ run_async(_build())
+
+
+@cli.command()
+@click.argument("packages", nargs=-1, required=True)
+@pass_context
+def add(ctx: Context, packages: tuple[str, ...]) -> None:
+ """Add and build new packages from the AUR."""
+ config = ctx.config
+
+ async def _add() -> None:
+ async with AURClient() as aur:
+ async with Builder(config, aur) as builder:
+ repo = RepoManager(config)
+
+ results = []
+ for package in packages:
+ console.print(f"[bold blue]Adding package:[/] {package}")
+ result = await builder.add_package(package)
+ results.append(result)
+
+ if result.status == BuildStatus.SUCCESS:
+ repo.add_packages(result)
+ console.print(f"[green]✓[/] {package} added successfully")
+ else:
+ console.print(f"[red]✗[/] {package} failed: {result.error}")
+
+ _print_results(results)
+
+ run_async(_add())
+
+
+@cli.command()
+@click.argument("packages", nargs=-1, required=True)
+@click.option("--all-official", "-a", is_flag=True, help="Remove packages that moved to official repos")
+@pass_context
+def remove(ctx: Context, packages: tuple[str, ...], all_official: bool) -> None:
+ """Remove packages from the repository and build directory."""
+ config = ctx.config
+
+ async def _remove() -> None:
+ async with AURClient() as aur:
+ async with Builder(config, aur) as builder:
+ repo = RepoManager(config)
+
+ if all_official:
+ # Find packages now in official repos
+ from archbuild.resolver import DependencyResolver
+ resolver = DependencyResolver(aur)
+
+ for pkg in repo.list_packages():
+ if resolver.is_in_official_repos(pkg.name):
+ console.print(f"[yellow]Removing {pkg.name}[/] (now in official repos)")
+ builder.remove_package(pkg.name)
+ repo.remove_package(pkg.name)
+ else:
+ for package in packages:
+ builder.remove_package(package)
+ repo.remove_package(package)
+ console.print(f"[green]✓[/] Removed {package}")
+
+ run_async(_remove())
+
+
+@cli.command()
+@pass_context
+def check(ctx: Context) -> None:
+ """Check for packages moved to official repos or removed from AUR."""
+ config = ctx.config
+
+ async def _check() -> None:
+ async with AURClient() as aur:
+ from archbuild.resolver import DependencyResolver
+ resolver = DependencyResolver(aur)
+ repo = RepoManager(config)
+
+ packages = repo.list_packages()
+ in_official: list[str] = []
+ not_in_aur: list[str] = []
+
+ with console.status("Checking packages..."):
+ for pkg in packages:
+ if resolver.is_in_official_repos(pkg.name):
+ in_official.append(pkg.name)
+ elif not await aur.is_available(pkg.name):
+ not_in_aur.append(pkg.name)
+
+ if in_official:
+ console.print("\n[yellow]Packages now in official repos:[/]")
+ for pkg in in_official:
+ console.print(f" • {pkg}")
+
+ if not_in_aur:
+ console.print("\n[red]Packages not found in AUR:[/]")
+ for pkg in not_in_aur:
+ console.print(f" • {pkg}")
+
+ if not in_official and not not_in_aur:
+ console.print("[green]All packages OK[/]")
+
+ run_async(_check())
+
+
+@cli.command()
+@pass_context
+def remake(ctx: Context) -> None:
+ """Rebuild the repository database from scratch."""
+ config = ctx.config
+ repo = RepoManager(config)
+
+ if repo.rebuild_database():
+ console.print("[green]Repository database rebuilt successfully[/]")
+ else:
+ console.print("[red]Failed to rebuild repository database[/]")
+ sys.exit(1)
+
+
+@cli.command()
+@pass_context
+def cleanup(ctx: Context) -> None:
+ """Clean up old package versions based on retention settings."""
+ config = ctx.config
+ repo = RepoManager(config)
+
+ removed = repo.cleanup()
+ console.print(f"[green]Removed {removed} old package version(s)[/]")
+
+
+@cli.command("list")
+@pass_context
+def list_packages(ctx: Context) -> None:
+ """List all packages in the repository."""
+ config = ctx.config
+ repo = RepoManager(config)
+
+ packages = repo.list_packages()
+
+ if not packages:
+ console.print("[yellow]No packages in repository[/]")
+ return
+
+ table = Table(title=f"Packages in {config.repository.name}")
+ table.add_column("Name", style="cyan")
+ table.add_column("Version", style="green")
+ table.add_column("Size", justify="right")
+ table.add_column("Modified", style="dim")
+
+ for pkg in packages:
+ size_mb = pkg.size / (1024 * 1024)
+ table.add_row(
+ pkg.name,
+ pkg.version,
+ f"{size_mb:.1f} MB",
+ pkg.modified.strftime("%Y-%m-%d %H:%M"),
+ )
+
+ console.print(table)
+
+
+@cli.command()
+@pass_context
+def test_notifications(ctx: Context) -> None:
+ """Test notification configuration by sending test messages."""
+ config = ctx.config
+
+ async def _test() -> None:
+ notifier = NotificationManager(config)
+ results = await notifier.test()
+
+ for backend, success in results.items():
+ if success:
+ console.print(f"[green]✓[/] {backend}: OK")
+ else:
+ console.print(f"[red]✗[/] {backend}: Failed")
+
+ run_async(_test())
+
+
+@cli.command()
+@click.argument("vars_file", type=click.Path(exists=True, path_type=Path))
+@click.option(
+ "-o", "--output",
+ type=click.Path(path_type=Path),
+ default=Path("config.yaml"),
+ help="Output config file path",
+)
+def migrate_config(vars_file: Path, output: Path) -> None:
+ """Migrate legacy vars.sh to new YAML config format."""
+ console.print(f"[blue]Migrating {vars_file} to {output}...[/]")
+
+ try:
+ data = migrate_vars_sh(vars_file)
+ config = Config.model_validate(data)
+ save_config(config, output)
+ console.print(f"[green]✓[/] Configuration saved to {output}")
+ console.print("[yellow]Note:[/] Please review and update the generated config.")
+ except Exception as e:
+ console.print(f"[red]Migration failed:[/] {e}")
+ sys.exit(1)
+
+
+@cli.command()
+@pass_context
+def init(ctx: Context) -> None:
+ """Initialize repository directories and configuration."""
+ config = ctx.config
+
+ # Create directories
+ config.repository.path.mkdir(parents=True, exist_ok=True)
+ config.repository.build_dir.mkdir(parents=True, exist_ok=True)
+
+ repo = RepoManager(config)
+ repo.ensure_repo_exists()
+
+ console.print(f"[green]✓[/] Repository initialized at {config.repository.path}")
+ console.print(f"[green]✓[/] Build directory: {config.repository.build_dir}")
+
+ # Check pacman.conf
+ pacman_conf = Path("/etc/pacman.conf")
+ if pacman_conf.exists():
+ content = pacman_conf.read_text()
+ if config.repository.name not in content:
+ console.print(
+ f"\n[yellow]Note:[/] Add this to /etc/pacman.conf:\n"
+ f"[{config.repository.name}]\n"
+ f"SigLevel = Optional TrustAll\n"
+ f"Server = file://{config.repository.path}"
+ )
+
+
+def _print_results(results: list[Any]) -> None:
+ """Print build results summary table."""
+ if not results:
+ return
+
+ table = Table(title="Build Results")
+ table.add_column("Package", style="cyan")
+ table.add_column("Status")
+ table.add_column("Duration", justify="right")
+ table.add_column("Error", style="dim", max_width=40)
+
+ for result in results:
+ status_style = {
+ BuildStatus.SUCCESS: "[green]✓ Success[/]",
+ BuildStatus.FAILED: "[red]✗ Failed[/]",
+ BuildStatus.SKIPPED: "[yellow]⏭ Skipped[/]",
+ BuildStatus.PENDING: "[blue]⏳ Pending[/]",
+ BuildStatus.BUILDING: "[blue]⚙ Building[/]",
+ }
+
+ table.add_row(
+ result.package,
+ status_style.get(result.status, str(result.status)),
+ f"{result.duration:.1f}s",
+ (result.error or "")[:40] if result.error else "",
+ )
+
+ console.print(table)
+
+
+def main() -> None:
+ """Entry point for the CLI."""
+ cli()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/archbuild/config.py b/src/archbuild/config.py
new file mode 100644
index 0000000..74a09c3
--- /dev/null
+++ b/src/archbuild/config.py
@@ -0,0 +1,202 @@
+"""Configuration loading and validation using Pydantic."""
+
+from pathlib import Path
+from typing import Any
+
+import yaml
+from pydantic import BaseModel, Field, field_validator
+
+
+class RepositoryConfig(BaseModel):
+ """Repository settings."""
+
+ name: str = Field(..., description="Repository name")
+ path: Path = Field(..., description="Path to repository directory")
+ build_dir: Path = Field(..., description="Path to build directory")
+ compression: str = Field(default="zst", description="Compression format")
+
+
+class BuildingConfig(BaseModel):
+ """Build settings."""
+
+ parallel: bool = Field(default=True, description="Enable parallel builds")
+ max_workers: int = Field(default=4, ge=1, le=32, description="Maximum parallel workers")
+ clean: bool = Field(default=True, description="Clean build directory after build")
+ update_system: bool = Field(default=False, description="Update system before building")
+ retry_attempts: int = Field(default=3, ge=1, le=10, description="Retry attempts on failure")
+ retry_delay: int = Field(default=5, ge=1, description="Base delay between retries (seconds)")
+
+
+class SigningConfig(BaseModel):
+ """Package signing settings."""
+
+ enabled: bool = Field(default=False, description="Enable package signing")
+ key: str = Field(default="", description="GPG key ID for signing")
+
+
+class EmailConfig(BaseModel):
+ """Email notification settings."""
+
+ enabled: bool = Field(default=False, description="Enable email notifications")
+ to: str = Field(default="", description="Recipient email address")
+ from_addr: str = Field(default="", alias="from", description="Sender email address")
+ smtp_host: str = Field(default="localhost", description="SMTP server host")
+ smtp_port: int = Field(default=25, description="SMTP server port")
+ use_tls: bool = Field(default=False, description="Use TLS for SMTP")
+ username: str = Field(default="", description="SMTP username")
+ password: str = Field(default="", description="SMTP password")
+
+
+class WebhookConfig(BaseModel):
+ """Webhook notification settings (extensible for future use)."""
+
+ enabled: bool = Field(default=False, description="Enable webhook notifications")
+ url: str = Field(default="", description="Webhook URL")
+ type: str = Field(default="generic", description="Webhook type (generic, discord, slack)")
+
+
+class NotificationsConfig(BaseModel):
+ """Notification settings."""
+
+ email: EmailConfig = Field(default_factory=EmailConfig)
+ webhooks: list[WebhookConfig] = Field(default_factory=list)
+
+
+class PackageRetentionConfig(BaseModel):
+ """Package retention settings."""
+
+ keep_versions: int = Field(default=3, ge=1, le=100, description="Number of old versions to keep")
+ cleanup_on_build: bool = Field(default=True, description="Clean old versions after build")
+
+
+class PackageOverride(BaseModel):
+ """Per-package build overrides."""
+
+ skip_checksums: bool = Field(default=False, description="Skip checksum verification")
+ extra_args: list[str] = Field(default_factory=list, description="Extra makepkg arguments")
+ env: dict[str, str] = Field(default_factory=dict, description="Environment variables")
+
+
+class Config(BaseModel):
+ """Main configuration model."""
+
+ repository: RepositoryConfig
+ building: BuildingConfig = Field(default_factory=BuildingConfig)
+ signing: SigningConfig = Field(default_factory=SigningConfig)
+ notifications: NotificationsConfig = Field(default_factory=NotificationsConfig)
+ retention: PackageRetentionConfig = Field(default_factory=PackageRetentionConfig)
+ package_overrides: dict[str, PackageOverride] = Field(
+ default_factory=dict, description="Per-package overrides"
+ )
+ log_level: str = Field(default="INFO", description="Logging level")
+ log_file: Path | None = Field(default=None, description="Optional log file path")
+
+ @field_validator("log_level")
+ @classmethod
+ def validate_log_level(cls, v: str) -> str:
+ valid_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
+ if v.upper() not in valid_levels:
+ raise ValueError(f"Invalid log level: {v}. Must be one of {valid_levels}")
+ return v.upper()
+
+
+def load_config(path: Path) -> Config:
+ """Load and validate configuration from YAML file.
+
+ Args:
+ path: Path to configuration file
+
+ Returns:
+ Validated Config object
+
+ Raises:
+ FileNotFoundError: If config file doesn't exist
+ ValidationError: If config is invalid
+ """
+ if not path.exists():
+ raise FileNotFoundError(f"Configuration file not found: {path}")
+
+ with open(path) as f:
+ data = yaml.safe_load(f)
+
+ return Config.model_validate(data)
+
+
+def migrate_vars_sh(vars_path: Path) -> dict[str, Any]:
+ """Migrate old vars.sh to new config format.
+
+ Args:
+ vars_path: Path to vars.sh file
+
+ Returns:
+ Dictionary suitable for Config.model_validate()
+ """
+ variables: dict[str, str] = {}
+
+ with open(vars_path) as f:
+ for line in f:
+ line = line.strip()
+ if line and not line.startswith("#") and "=" in line:
+ # Handle export statements
+ if line.startswith("export "):
+ line = line[7:]
+ key, _, value = line.partition("=")
+ # Remove quotes
+ value = value.strip("'\"")
+ variables[key] = value
+
+ # Convert to new format
+ config: dict[str, Any] = {
+ "repository": {
+ "name": variables.get("REPONAME", ""),
+ "path": variables.get("REPODIR", "/repo/x86_64"),
+ "build_dir": variables.get("BUILDDIR", "/repo/build"),
+ "compression": variables.get("COMPRESSION", "zst"),
+ },
+ "building": {
+ "parallel": variables.get("PARALLEL", "N") == "Y",
+ "clean": variables.get("CLEAN", "N") == "Y",
+ "update_system": variables.get("UPDATE", "N") == "Y",
+ },
+ "signing": {
+ "enabled": variables.get("SIGN", "N") == "Y",
+ "key": variables.get("KEY", ""),
+ },
+ "notifications": {
+ "email": {
+ "enabled": bool(variables.get("TO_EMAIL")),
+ "to": variables.get("TO_EMAIL", ""),
+ "from": variables.get("FROM_EMAIL", ""),
+ }
+ },
+ "retention": {
+ "keep_versions": int(variables.get("NUM_OLD", "5")),
+ },
+ }
+
+ return config
+
+
+def save_config(config: Config, path: Path) -> None:
+ """Save configuration to YAML file.
+
+ Args:
+ config: Config object to save
+ path: Path to save configuration file
+ """
+ data = config.model_dump(by_alias=True, exclude_none=True)
+
+ # Convert Path objects to strings for YAML
+ def convert_paths(obj: Any) -> Any:
+ if isinstance(obj, Path):
+ return str(obj)
+ elif isinstance(obj, dict):
+ return {k: convert_paths(v) for k, v in obj.items()}
+ elif isinstance(obj, list):
+ return [convert_paths(v) for v in obj]
+ return obj
+
+ data = convert_paths(data)
+
+ with open(path, "w") as f:
+ yaml.dump(data, f, default_flow_style=False, sort_keys=False)
diff --git a/src/archbuild/logging.py b/src/archbuild/logging.py
new file mode 100644
index 0000000..1b40804
--- /dev/null
+++ b/src/archbuild/logging.py
@@ -0,0 +1,62 @@
+"""Structured logging setup using Rich."""
+
+import logging
+from pathlib import Path
+
+from rich.console import Console
+from rich.logging import RichHandler
+
+console = Console()
+
+
+def setup_logging(
+ level: str = "INFO",
+ log_file: Path | None = None,
+) -> logging.Logger:
+ """Configure logging with Rich handler for pretty console output.
+
+ Args:
+ level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
+ log_file: Optional path to log file
+
+ Returns:
+ Configured logger instance
+ """
+ handlers: list[logging.Handler] = [
+ RichHandler(
+ console=console,
+ rich_tracebacks=True,
+ tracebacks_show_locals=True,
+ show_time=True,
+ show_path=False,
+ )
+ ]
+
+ if log_file:
+ log_file.parent.mkdir(parents=True, exist_ok=True)
+ file_handler = logging.FileHandler(log_file)
+ file_handler.setFormatter(
+ logging.Formatter("%(asctime)s | %(levelname)-8s | %(name)s | %(message)s")
+ )
+ handlers.append(file_handler)
+
+ logging.basicConfig(
+ level=getattr(logging, level.upper()),
+ format="%(message)s",
+ datefmt="[%X]",
+ handlers=handlers,
+ )
+
+ return logging.getLogger("archbuild")
+
+
+def get_logger(name: str) -> logging.Logger:
+ """Get a child logger for a specific module.
+
+ Args:
+ name: Module name for the logger
+
+ Returns:
+ Logger instance
+ """
+ return logging.getLogger(f"archbuild.{name}")
diff --git a/src/archbuild/notifications.py b/src/archbuild/notifications.py
new file mode 100644
index 0000000..be00edc
--- /dev/null
+++ b/src/archbuild/notifications.py
@@ -0,0 +1,346 @@
+"""Notification system with email and extensible webhook support."""
+
+import asyncio
+import smtplib
+import ssl
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from typing import Any
+
+import aiohttp
+
+from archbuild.builder import BuildResult, BuildStatus
+from archbuild.config import Config, EmailConfig, WebhookConfig
+from archbuild.logging import get_logger
+
+logger = get_logger("notifications")
+
+
+@dataclass
+class BuildSummary:
+ """Summary of build results for notifications."""
+
+ total: int
+ success: int
+ failed: int
+ skipped: int
+ failed_packages: list[str]
+ duration: float
+ timestamp: datetime
+
+ @classmethod
+ def from_results(cls, results: list[BuildResult]) -> "BuildSummary":
+ """Create summary from build results."""
+ return cls(
+ total=len(results),
+ success=sum(1 for r in results if r.status == BuildStatus.SUCCESS),
+ failed=sum(1 for r in results if r.status == BuildStatus.FAILED),
+ skipped=sum(1 for r in results if r.status == BuildStatus.SKIPPED),
+ failed_packages=[r.package for r in results if r.status == BuildStatus.FAILED],
+ duration=sum(r.duration for r in results),
+ timestamp=datetime.now(),
+ )
+
+
+class NotificationBackend(ABC):
+ """Abstract base class for notification backends."""
+
+ @abstractmethod
+ async def send(self, summary: BuildSummary, config: Config) -> bool:
+ """Send notification.
+
+ Args:
+ summary: Build summary
+ config: Application config
+
+ Returns:
+ True if sent successfully
+ """
+ pass
+
+
+class EmailBackend(NotificationBackend):
+ """Email notification backend."""
+
+ def __init__(self, email_config: EmailConfig):
+ """Initialize email backend.
+
+ Args:
+ email_config: Email configuration
+ """
+ self.config = email_config
+
+ def _format_message(self, summary: BuildSummary, repo_name: str) -> str:
+ """Format email message body.
+
+ Args:
+ summary: Build summary
+ repo_name: Repository name
+
+ Returns:
+ Formatted message string
+ """
+ lines = [
+ f"Build Report for {repo_name}",
+ f"Time: {summary.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
+ "",
+ f"Total packages: {summary.total}",
+ f" Successful: {summary.success}",
+ f" Failed: {summary.failed}",
+ f" Skipped: {summary.skipped}",
+ f"Total duration: {summary.duration:.1f}s",
+ ]
+
+ if summary.failed_packages:
+ lines.extend([
+ "",
+ "Failed packages:",
+ ])
+ for pkg in summary.failed_packages:
+ lines.append(f" - {pkg}")
+
+ return "\n".join(lines)
+
+ async def send(self, summary: BuildSummary, config: Config) -> bool:
+ """Send email notification."""
+ if not self.config.enabled:
+ return True
+
+ if not self.config.to:
+ logger.warning("Email notification enabled but no recipient configured")
+ return False
+
+ # Only send on failures
+ if summary.failed == 0:
+ logger.debug("No failures, skipping email notification")
+ return True
+
+ try:
+ msg = MIMEMultipart()
+ msg["From"] = self.config.from_addr or f"archbuild@localhost"
+ msg["To"] = self.config.to
+ msg["Subject"] = f"Build Errors - {config.repository.name}"
+
+ body = self._format_message(summary, config.repository.name)
+ msg.attach(MIMEText(body, "plain"))
+
+ # Send email
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(None, self._send_email, msg)
+
+ logger.info(f"Sent email notification to {self.config.to}")
+ return True
+
+ except Exception as e:
+ logger.error(f"Failed to send email: {e}")
+ return False
+
+ def _send_email(self, msg: MIMEMultipart) -> None:
+ """Send email synchronously (called from executor)."""
+ if self.config.use_tls:
+ context = ssl.create_default_context()
+ with smtplib.SMTP_SSL(
+ self.config.smtp_host,
+ self.config.smtp_port,
+ context=context,
+ ) as server:
+ if self.config.username and self.config.password:
+ server.login(self.config.username, self.config.password)
+ server.send_message(msg)
+ else:
+ with smtplib.SMTP(self.config.smtp_host, self.config.smtp_port) as server:
+ if self.config.username and self.config.password:
+ server.login(self.config.username, self.config.password)
+ server.send_message(msg)
+
+
+class WebhookBackend(NotificationBackend):
+ """Generic webhook notification backend.
+
+ Extensible for Discord, Slack, ntfy, Gotify, etc.
+ """
+
+ def __init__(self, webhook_config: WebhookConfig):
+ """Initialize webhook backend.
+
+ Args:
+ webhook_config: Webhook configuration
+ """
+ self.config = webhook_config
+
+ def _format_payload(self, summary: BuildSummary, repo_name: str) -> dict[str, Any]:
+ """Format webhook payload based on webhook type.
+
+ Args:
+ summary: Build summary
+ repo_name: Repository name
+
+ Returns:
+ Payload dictionary
+ """
+ base_content = (
+ f"**Build Report for {repo_name}**\n"
+ f"✅ Success: {summary.success} | "
+ f"❌ Failed: {summary.failed} | "
+ f"⏭️ Skipped: {summary.skipped}"
+ )
+
+ if summary.failed_packages:
+ base_content += f"\n\nFailed: {', '.join(summary.failed_packages)}"
+
+ if self.config.type == "discord":
+ return {
+ "content": base_content,
+ "embeds": [{
+ "title": f"Build Report - {repo_name}",
+ "color": 0xFF0000 if summary.failed else 0x00FF00,
+ "fields": [
+ {"name": "Success", "value": str(summary.success), "inline": True},
+ {"name": "Failed", "value": str(summary.failed), "inline": True},
+ {"name": "Skipped", "value": str(summary.skipped), "inline": True},
+ ],
+ "timestamp": summary.timestamp.isoformat(),
+ }],
+ }
+
+ elif self.config.type == "slack":
+ return {
+ "text": base_content,
+ "blocks": [
+ {
+ "type": "section",
+ "text": {"type": "mrkdwn", "text": base_content},
+ }
+ ],
+ }
+
+ elif self.config.type == "ntfy":
+ return {
+ "topic": repo_name,
+ "title": f"Build Report - {repo_name}",
+ "message": base_content,
+ "priority": 5 if summary.failed else 3,
+ "tags": ["package", "failed"] if summary.failed else ["package"],
+ }
+
+ else: # generic
+ return {
+ "repository": repo_name,
+ "summary": {
+ "total": summary.total,
+ "success": summary.success,
+ "failed": summary.failed,
+ "skipped": summary.skipped,
+ },
+ "failed_packages": summary.failed_packages,
+ "timestamp": summary.timestamp.isoformat(),
+ }
+
+ async def send(self, summary: BuildSummary, config: Config) -> bool:
+ """Send webhook notification."""
+ if not self.config.enabled:
+ return True
+
+ if not self.config.url:
+ logger.warning("Webhook notification enabled but no URL configured")
+ return False
+
+ # Only send on failures (configurable in future)
+ if summary.failed == 0:
+ logger.debug("No failures, skipping webhook notification")
+ return True
+
+ try:
+ payload = self._format_payload(summary, config.repository.name)
+
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ self.config.url,
+ json=payload,
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as response:
+ response.raise_for_status()
+
+ logger.info(f"Sent webhook notification to {self.config.url}")
+ return True
+
+ except Exception as e:
+ logger.error(f"Failed to send webhook: {e}")
+ return False
+
+
+class NotificationManager:
+ """Manage multiple notification backends."""
+
+ def __init__(self, config: Config):
+ """Initialize notification manager.
+
+ Args:
+ config: Application configuration
+ """
+ self.config = config
+ self.backends: list[NotificationBackend] = []
+
+ # Add email backend if configured
+ if config.notifications.email.enabled:
+ self.backends.append(EmailBackend(config.notifications.email))
+
+ # Add webhook backends
+ for webhook in config.notifications.webhooks:
+ if webhook.enabled:
+ self.backends.append(WebhookBackend(webhook))
+
+ async def notify(self, results: list[BuildResult]) -> dict[str, bool]:
+ """Send notifications for build results.
+
+ Args:
+ results: List of build results
+
+ Returns:
+ Dict mapping backend type to success status
+ """
+ summary = BuildSummary.from_results(results)
+ statuses: dict[str, bool] = {}
+
+ for backend in self.backends:
+ name = type(backend).__name__
+ try:
+ success = await backend.send(summary, self.config)
+ statuses[name] = success
+ except Exception as e:
+ logger.error(f"Notification backend {name} failed: {e}")
+ statuses[name] = False
+
+ return statuses
+
+ async def test(self) -> dict[str, bool]:
+ """Send test notification through all backends.
+
+ Returns:
+ Dict mapping backend type to success status
+ """
+ # Create dummy test summary
+ test_summary = BuildSummary(
+ total=3,
+ success=2,
+ failed=1,
+ skipped=0,
+ failed_packages=["test-package"],
+ duration=123.45,
+ timestamp=datetime.now(),
+ )
+
+ statuses: dict[str, bool] = {}
+ for backend in self.backends:
+ name = type(backend).__name__
+ try:
+ success = await backend.send(test_summary, self.config)
+ statuses[name] = success
+ except Exception as e:
+ logger.error(f"Test notification failed for {name}: {e}")
+ statuses[name] = False
+
+ return statuses
diff --git a/src/archbuild/repo.py b/src/archbuild/repo.py
new file mode 100644
index 0000000..d8d0960
--- /dev/null
+++ b/src/archbuild/repo.py
@@ -0,0 +1,310 @@
+"""Repository management with repo-add/repo-remove wrappers."""
+
+import shutil
+import subprocess
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+
+from archbuild.builder import BuildResult, BuildStatus, FileLock
+from archbuild.config import Config
+from archbuild.logging import get_logger
+
+logger = get_logger("repo")
+
+
+@dataclass
+class PackageInfo:
+ """Information about a package in the repository."""
+
+ name: str
+ version: str
+ filename: str
+ size: int
+ modified: datetime
+
+
+class RepoManager:
+ """Manage the pacman repository database."""
+
+ def __init__(self, config: Config):
+ """Initialize repository manager.
+
+ Args:
+ config: Application configuration
+ """
+ self.config = config
+ self._lock_path = config.repository.path / ".repo.lock"
+
+ @property
+ def db_path(self) -> Path:
+ """Get path to repository database file."""
+ compression = self.config.repository.compression or "zst"
+ return self.config.repository.path / f"{self.config.repository.name}.db.tar.{compression}"
+
+ def _get_repo_lock(self) -> FileLock:
+ """Get lock for repository operations."""
+ return FileLock(self._lock_path)
+
+ def _run_repo_command(self, cmd: list[str]) -> subprocess.CompletedProcess[str]:
+ """Run a repo-add or repo-remove command.
+
+ Args:
+ cmd: Command and arguments
+
+ Returns:
+ Completed process result
+ """
+ if self.config.signing.enabled and self.config.signing.key:
+ cmd.extend(["--sign", "--key", self.config.signing.key])
+
+ logger.debug(f"Running: {' '.join(cmd)}")
+ return subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ cwd=self.config.repository.path,
+ )
+
+ def ensure_repo_exists(self) -> None:
+ """Ensure repository directory and database exist."""
+ self.config.repository.path.mkdir(parents=True, exist_ok=True)
+
+ if not self.db_path.exists():
+ logger.info(f"Creating new repository: {self.config.repository.name}")
+ # Create empty database
+ result = self._run_repo_command([
+ "repo-add",
+ str(self.db_path),
+ ])
+ if result.returncode != 0:
+ logger.warning(f"Could not create empty database: {result.stderr}")
+
+ def add_packages(self, build_result: BuildResult) -> bool:
+ """Add built packages to the repository.
+
+ Args:
+ build_result: Result from package build
+
+ Returns:
+ True if packages were added successfully
+ """
+ if build_result.status != BuildStatus.SUCCESS:
+ logger.warning(f"Cannot add {build_result.package}: build was not successful")
+ return False
+
+ if not build_result.artifacts:
+ logger.warning(f"No artifacts to add for {build_result.package}")
+ return False
+
+ with self._get_repo_lock():
+ self.ensure_repo_exists()
+
+ # Remove old versions of this package
+ self._remove_old_packages(build_result.package)
+
+ # Copy artifacts to repo directory
+ copied_files: list[Path] = []
+ for artifact in build_result.artifacts:
+ dest = self.config.repository.path / artifact.name
+ shutil.copy2(artifact, dest)
+ copied_files.append(dest)
+
+ # Also copy signature if exists
+ sig_path = artifact.with_suffix(artifact.suffix + ".sig")
+ if sig_path.exists():
+ shutil.copy2(sig_path, self.config.repository.path / sig_path.name)
+
+ # Add to database
+ result = self._run_repo_command([
+ "repo-add",
+ str(self.db_path),
+ ] + [str(f) for f in copied_files])
+
+ if result.returncode != 0:
+ logger.error(f"Failed to add packages to database: {result.stderr}")
+ return False
+
+ logger.info(f"Added {len(copied_files)} package(s) to repository")
+ return True
+
+ def remove_package(self, package: str) -> bool:
+ """Remove a package from the repository.
+
+ Args:
+ package: Package name to remove
+
+ Returns:
+ True if removed successfully
+ """
+ with self._get_repo_lock():
+ # Remove from database
+ result = self._run_repo_command([
+ "repo-remove",
+ str(self.db_path),
+ package,
+ ])
+
+ if result.returncode != 0:
+ logger.warning(f"Failed to remove {package} from database: {result.stderr}")
+
+ # Remove package files
+ removed = 0
+ for f in self.config.repository.path.glob(f"{package}-*.pkg.tar.*"):
+ f.unlink()
+ removed += 1
+
+ logger.info(f"Removed {package} ({removed} files)")
+ return True
+
+ def _remove_old_packages(self, package: str) -> int:
+ """Remove old versions of a package beyond retention limit.
+
+ Args:
+ package: Package name
+
+ Returns:
+ Number of packages removed
+ """
+ keep_versions = self.config.retention.keep_versions
+ pattern = f"{package}-*.pkg.tar.*"
+
+ # Find all package files
+ files = list(self.config.repository.path.glob(pattern))
+ files = [f for f in files if not f.name.endswith(".sig")]
+
+ if len(files) <= keep_versions:
+ return 0
+
+ # Sort by modification time, oldest first
+ files.sort(key=lambda f: f.stat().st_mtime)
+
+ # Remove oldest files exceeding retention
+ to_remove = files[:-keep_versions] if keep_versions > 0 else files
+ removed = 0
+
+ for f in to_remove:
+ f.unlink()
+ # Also remove signature
+ sig = f.with_suffix(f.suffix + ".sig")
+ if sig.exists():
+ sig.unlink()
+ removed += 1
+
+ if removed:
+ logger.info(f"Cleaned up {removed} old version(s) of {package}")
+
+ return removed
+
+ def list_packages(self) -> list[PackageInfo]:
+ """List all packages in the repository.
+
+ Returns:
+ List of PackageInfo objects
+ """
+ packages: list[PackageInfo] = []
+
+ for f in self.config.repository.path.glob("*.pkg.tar.*"):
+ if f.name.endswith(".sig"):
+ continue
+
+ # Parse package name and version from filename
+ # Format: name-version-rel-arch.pkg.tar.zst
+ parts = f.stem.replace(".pkg.tar", "").rsplit("-", 3)
+ if len(parts) >= 3:
+ name = "-".join(parts[:-2])
+ version = f"{parts[-2]}-{parts[-1]}" if len(parts) > 2 else parts[-1]
+ else:
+ name = f.stem
+ version = "unknown"
+
+ stat = f.stat()
+ packages.append(PackageInfo(
+ name=name,
+ version=version,
+ filename=f.name,
+ size=stat.st_size,
+ modified=datetime.fromtimestamp(stat.st_mtime),
+ ))
+
+ return sorted(packages, key=lambda p: p.name)
+
+ def rebuild_database(self) -> bool:
+ """Rebuild the repository database from scratch.
+
+ Returns:
+ True if successful
+ """
+ with self._get_repo_lock():
+ logger.info("Rebuilding repository database")
+
+ # Remove old database
+ for f in self.config.repository.path.glob(f"{self.config.repository.name}.db*"):
+ f.unlink()
+ for f in self.config.repository.path.glob(f"{self.config.repository.name}.files*"):
+ f.unlink()
+
+ # Find all packages
+ packages = list(self.config.repository.path.glob("*.pkg.tar.*"))
+ packages = [p for p in packages if not p.name.endswith(".sig")]
+
+ if not packages:
+ logger.warning("No packages found to add to database")
+ return True
+
+ # Add all packages
+ result = self._run_repo_command([
+ "repo-add",
+ str(self.db_path),
+ ] + [str(p) for p in packages])
+
+ if result.returncode != 0:
+ logger.error(f"Failed to rebuild database: {result.stderr}")
+ return False
+
+ logger.info(f"Database rebuilt with {len(packages)} packages")
+ return True
+
+ def cleanup(self) -> int:
+ """Run cleanup on all packages, removing old versions.
+
+ Returns:
+ Total number of old packages removed
+ """
+ # Get unique package names
+ packages = self.list_packages()
+ unique_names = set(p.name for p in packages)
+
+ total_removed = 0
+ for name in unique_names:
+ total_removed += self._remove_old_packages(name)
+
+ return total_removed
+
+ def check_integrity(self) -> list[str]:
+ """Check repository integrity.
+
+ Returns:
+ List of issues found
+ """
+ issues: list[str] = []
+
+ # Check database exists
+ if not self.db_path.exists():
+ issues.append(f"Database not found: {self.db_path}")
+ return issues
+
+ # Check for orphaned packages (in dir but not in db)
+ # This would require parsing the db, simplified for now
+
+ # Check for missing signatures
+ if self.config.signing.enabled:
+ for pkg in self.config.repository.path.glob("*.pkg.tar.*"):
+ if pkg.name.endswith(".sig"):
+ continue
+ sig = pkg.with_suffix(pkg.suffix + ".sig")
+ if not sig.exists():
+ issues.append(f"Missing signature: {pkg.name}")
+
+ return issues
diff --git a/src/archbuild/resolver.py b/src/archbuild/resolver.py
new file mode 100644
index 0000000..1b89a98
--- /dev/null
+++ b/src/archbuild/resolver.py
@@ -0,0 +1,317 @@
+"""Dependency resolution with topological sorting."""
+
+import subprocess
+from collections import defaultdict
+from dataclasses import dataclass, field
+from enum import Enum
+
+from archbuild.aur import AURClient, Package
+from archbuild.logging import get_logger
+
+logger = get_logger("resolver")
+
+
+class DependencyType(Enum):
+ """Type of dependency."""
+
+ RUNTIME = "depends"
+ BUILD = "makedepends"
+ CHECK = "checkdepends"
+
+
+@dataclass
+class Dependency:
+ """A package dependency."""
+
+ name: str
+ version_constraint: str | None = None
+ dep_type: DependencyType = DependencyType.RUNTIME
+ is_aur: bool = False
+
+ @classmethod
+ def parse(cls, dep_string: str, dep_type: DependencyType = DependencyType.RUNTIME) -> "Dependency":
+ """Parse dependency string like 'package>=1.0'.
+
+ Args:
+ dep_string: Dependency string
+ dep_type: Type of dependency
+
+ Returns:
+ Dependency instance
+ """
+ # Handle version constraints
+ for op in [">=", "<=", "=", ">", "<"]:
+ if op in dep_string:
+ name, constraint = dep_string.split(op, 1)
+ return cls(name=name, version_constraint=f"{op}{constraint}", dep_type=dep_type)
+ return cls(name=dep_string, dep_type=dep_type)
+
+
+@dataclass
+class BuildOrder:
+ """Ordered list of packages to build."""
+
+ packages: list[str] = field(default_factory=list)
+ aur_dependencies: dict[str, list[Dependency]] = field(default_factory=dict)
+
+ def __iter__(self):
+ return iter(self.packages)
+
+ def __len__(self):
+ return len(self.packages)
+
+
+class DependencyResolver:
+ """Resolve dependencies for AUR packages using topological sort."""
+
+ def __init__(self, aur_client: AURClient):
+ """Initialize resolver with AUR client.
+
+ Args:
+ aur_client: AURClient instance for fetching package info
+ """
+ self.aur_client = aur_client
+ self._pacman_cache: set[str] = set()
+ self._pacman_checked = False
+
+ def _refresh_pacman_cache(self) -> None:
+ """Refresh cache of packages available from official repos."""
+ try:
+ result = subprocess.run(
+ ["pacman", "-Ssq"],
+ capture_output=True,
+ text=True,
+ check=True,
+ )
+ self._pacman_cache = set(result.stdout.strip().split("\n"))
+ self._pacman_checked = True
+ logger.debug(f"Cached {len(self._pacman_cache)} packages from official repos")
+ except subprocess.CalledProcessError as e:
+ logger.warning(f"Failed to get pacman package list: {e}")
+ self._pacman_cache = set()
+
+ def is_in_official_repos(self, name: str) -> bool:
+ """Check if package is available in official repositories.
+
+ Args:
+ name: Package name (without version constraint)
+
+ Returns:
+ True if available in official repos
+ """
+ if not self._pacman_checked:
+ self._refresh_pacman_cache()
+
+ # Strip version constraint
+ base_name = name.split(">=")[0].split("<=")[0].split("=")[0].split(">")[0].split("<")[0]
+ return base_name in self._pacman_cache
+
+ def is_installed(self, name: str) -> bool:
+ """Check if package is already installed.
+
+ Args:
+ name: Package name
+
+ Returns:
+ True if installed
+ """
+ base_name = name.split(">=")[0].split("<=")[0].split("=")[0].split(">")[0].split("<")[0]
+ result = subprocess.run(
+ ["pacman", "-Qi", base_name],
+ capture_output=True,
+ )
+ return result.returncode == 0
+
+ async def _get_all_dependencies(
+ self,
+ package: Package,
+ visited: set[str],
+ graph: dict[str, set[str]],
+ packages: dict[str, Package],
+ ) -> None:
+ """Recursively fetch all AUR dependencies.
+
+ Args:
+ package: Package to get dependencies for
+ visited: Set of already visited package names
+ graph: Dependency graph (package -> dependencies)
+ packages: Map of package names to Package objects
+ """
+ if package.name in visited:
+ return
+
+ visited.add(package.name)
+ packages[package.name] = package
+ graph[package.name] = set()
+
+ # Collect all dependencies
+ all_deps: list[str] = []
+ all_deps.extend(package.depends)
+ all_deps.extend(package.makedepends)
+
+ aur_deps: list[str] = []
+ for dep in all_deps:
+ dep_parsed = Dependency.parse(dep)
+ base_name = dep_parsed.name
+
+ # Skip if in official repos or already installed
+ if self.is_in_official_repos(base_name):
+ continue
+ if self.is_installed(base_name):
+ continue
+
+ aur_deps.append(base_name)
+ graph[package.name].add(base_name)
+
+ # Fetch AUR dependencies
+ if aur_deps:
+ dep_packages = await self.aur_client.get_packages(aur_deps)
+ for dep_pkg in dep_packages:
+ await self._get_all_dependencies(dep_pkg, visited, graph, packages)
+
+ def _topological_sort(self, graph: dict[str, set[str]]) -> list[str]:
+ """Perform topological sort on dependency graph.
+
+ Args:
+ graph: Dependency graph (package -> its dependencies)
+
+ Returns:
+ List of packages in build order (dependencies first)
+
+ Raises:
+ ValueError: If circular dependency detected
+ """
+ # Ensure all nodes are in the graph (including leaf dependencies)
+ all_nodes: set[str] = set(graph.keys())
+ for deps in graph.values():
+ all_nodes.update(deps)
+
+ # Add missing nodes to graph
+ for node in all_nodes:
+ if node not in graph:
+ graph[node] = set()
+
+ # Kahn's algorithm
+ # in_degree[X] = number of packages that X depends on (outgoing edges)
+ # We want to process packages whose dependencies are all processed
+ in_degree: dict[str, int] = {node: len(graph[node]) for node in graph}
+
+ # Start with nodes that have no dependencies (leaves)
+ queue = [node for node in graph if in_degree[node] == 0]
+ result: list[str] = []
+
+ while queue:
+ node = queue.pop(0)
+ result.append(node)
+
+ # For each package that depends on this node, decrement its in-degree
+ for pkg, deps in graph.items():
+ if node in deps:
+ in_degree[pkg] -= 1
+ if in_degree[pkg] == 0:
+ queue.append(pkg)
+
+ if len(result) != len(graph):
+ # Find cycle
+ remaining = set(graph.keys()) - set(result)
+ raise ValueError(f"Circular dependency detected involving: {remaining}")
+
+ # Result is already in correct order (dependencies first)
+ return result
+
+ def detect_cycles(self, graph: dict[str, set[str]]) -> list[list[str]]:
+ """Detect circular dependencies in graph.
+
+ Args:
+ graph: Dependency graph
+
+ Returns:
+ List of cycles (each cycle is a list of package names)
+ """
+ cycles: list[list[str]] = []
+ visited: set[str] = set()
+ rec_stack: set[str] = set()
+ path: list[str] = []
+
+ def dfs(node: str) -> bool:
+ visited.add(node)
+ rec_stack.add(node)
+ path.append(node)
+
+ for neighbor in graph.get(node, set()):
+ if neighbor not in visited:
+ if dfs(neighbor):
+ return True
+ elif neighbor in rec_stack:
+ # Found cycle
+ cycle_start = path.index(neighbor)
+ cycles.append(path[cycle_start:] + [neighbor])
+ return True
+
+ path.pop()
+ rec_stack.remove(node)
+ return False
+
+ for node in graph:
+ if node not in visited:
+ dfs(node)
+
+ return cycles
+
+ async def resolve(self, package_names: list[str]) -> BuildOrder:
+ """Resolve dependencies and determine build order.
+
+ Args:
+ package_names: List of packages to resolve
+
+ Returns:
+ BuildOrder with packages in correct build order
+
+ Raises:
+ ValueError: If package not found or circular dependency
+ """
+ logger.info(f"Resolving dependencies for: {', '.join(package_names)}")
+
+ # Fetch requested packages
+ packages_list = await self.aur_client.get_packages(package_names)
+ if len(packages_list) != len(package_names):
+ found = {p.name for p in packages_list}
+ missing = set(package_names) - found
+ raise ValueError(f"Packages not found in AUR: {missing}")
+
+ # Build dependency graph
+ visited: set[str] = set()
+ graph: dict[str, set[str]] = {}
+ packages: dict[str, Package] = {}
+
+ for package in packages_list:
+ await self._get_all_dependencies(package, visited, graph, packages)
+
+ # Check for cycles
+ cycles = self.detect_cycles(graph)
+ if cycles:
+ raise ValueError(f"Circular dependencies detected: {cycles}")
+
+ # Topological sort
+ build_order = self._topological_sort(graph)
+
+ # Build AUR dependency map
+ aur_deps: dict[str, list[Dependency]] = {}
+ for name in build_order:
+ pkg = packages.get(name)
+ if pkg:
+ deps: list[Dependency] = []
+ for dep in pkg.depends:
+ parsed = Dependency.parse(dep, DependencyType.RUNTIME)
+ if not self.is_in_official_repos(parsed.name):
+ parsed.is_aur = True
+ deps.append(parsed)
+ for dep in pkg.makedepends:
+ parsed = Dependency.parse(dep, DependencyType.BUILD)
+ if not self.is_in_official_repos(parsed.name):
+ parsed.is_aur = True
+ deps.append(parsed)
+ aur_deps[name] = deps
+
+ logger.info(f"Build order: {' -> '.join(build_order)}")
+ return BuildOrder(packages=build_order, aur_dependencies=aur_deps)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/integration_test.py b/tests/integration_test.py
new file mode 100644
index 0000000..6e1cd5c
--- /dev/null
+++ b/tests/integration_test.py
@@ -0,0 +1,370 @@
+#!/usr/bin/env python3
+"""
+Integration test script for archbuild.
+
+This script creates a temporary repository, initializes it with a basic config,
+adds test packages, and verifies they build and are added correctly.
+
+Usage:
+ python tests/integration_test.py [--keep-temp]
+
+Options:
+ --keep-temp Don't delete temporary directory after test (for debugging)
+"""
+
+import asyncio
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+from pathlib import Path
+
+# Add src to path for development
+sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
+
+from archbuild.aur import AURClient
+from archbuild.builder import Builder, BuildStatus
+from archbuild.config import Config, RepositoryConfig, BuildingConfig, SigningConfig, PackageOverride
+from archbuild.logging import setup_logging, console
+from archbuild.repo import RepoManager
+
+
+# Test packages - real packages that exist in the AUR
+# Chosen for small size and fast build times
+TEST_PACKAGES = [
+ "neofetch-git", # Small bash script, very fast build
+ "yay", # Popular AUR helper
+]
+
+# Alternative packages if the above aren't available
+FALLBACK_PACKAGES = [
+ "gtk2", # Legacy GTK library
+ "paru", # Another AUR helper
+]
+
+
+class IntegrationTest:
+ """Integration test runner."""
+
+ def __init__(self, keep_temp: bool = False):
+ self.keep_temp = keep_temp
+ self.temp_dir: Path | None = None
+ self.config: Config | None = None
+ self.passed = 0
+ self.failed = 0
+
+ def setup(self) -> None:
+ """Set up temporary test environment."""
+ console.print("\n[bold blue]═══ Setting up test environment ═══[/]")
+
+ # Create temp directory
+ self.temp_dir = Path(tempfile.mkdtemp(prefix="archbuild_test_"))
+ console.print(f" Created temp directory: {self.temp_dir}")
+
+ # Create subdirectories
+ repo_dir = self.temp_dir / "repo"
+ build_dir = self.temp_dir / "build"
+ repo_dir.mkdir()
+ build_dir.mkdir()
+
+ # Create custom makepkg.conf that disables debug packages
+ # (avoids needing debugedit which may not be installed)
+ makepkg_conf = self.temp_dir / "makepkg.conf"
+ makepkg_conf.write_text("""
+# Minimal makepkg.conf for testing
+CARCH="x86_64"
+CHOST="x86_64-pc-linux-gnu"
+CFLAGS="-O2 -pipe"
+CXXFLAGS="$CFLAGS"
+LDFLAGS=""
+MAKEFLAGS="-j$(nproc)"
+OPTIONS=(!debug !strip !staticlibs)
+PKGEXT='.pkg.tar.zst'
+SRCEXT='.src.tar.gz'
+PACKAGER="Integration Test <test@test.local>"
+
+DLAGENTS=('file::/usr/bin/curl -qgC - -o %o %u'
+ 'ftp::/usr/bin/curl -qgfC - --ftp-pasv --retry 3 --retry-delay 3 -o %o %u'
+ 'http::/usr/bin/curl -qgb "" -fLC - --retry 3 --retry-delay 3 -o %o %u'
+ 'https::/usr/bin/curl -qgb "" -fLC - --retry 3 --retry-delay 3 -o %o %u'
+ 'rsync::/usr/bin/rsync --no-motd -z %u %o'
+ 'scp::/usr/bin/scp -C %u %o')
+
+VCSCLIENTS=('git::git'
+ 'hg::mercurial'
+ 'svn::subversion')
+""")
+
+ self.config = Config(
+ repository=RepositoryConfig(
+ name="testrepo",
+ path=repo_dir,
+ build_dir=build_dir,
+ compression="zst",
+ ),
+ building=BuildingConfig(
+ parallel=False, # Sequential for cleaner test output
+ max_workers=1,
+ clean=True,
+ update_system=False,
+ retry_attempts=2,
+ ),
+ signing=SigningConfig(
+ enabled=False,
+ ),
+ log_level="INFO",
+ # Apply package overrides to use our custom makepkg.conf
+ package_overrides={
+ "_default": PackageOverride(
+ extra_args=["--config", str(makepkg_conf)],
+ ),
+ },
+ )
+
+ setup_logging(self.config.log_level)
+ console.print(" [green]✓[/] Configuration created")
+
+ def teardown(self) -> None:
+ """Clean up test environment."""
+ if self.temp_dir and self.temp_dir.exists():
+ if self.keep_temp:
+ console.print(f"\n[yellow]Keeping temp directory:[/] {self.temp_dir}")
+ else:
+ shutil.rmtree(self.temp_dir)
+ console.print("\n[dim]Cleaned up temp directory[/]")
+
+ def check(self, condition: bool, message: str) -> bool:
+ """Check a condition and report pass/fail."""
+ if condition:
+ console.print(f" [green]✓[/] {message}")
+ self.passed += 1
+ return True
+ else:
+ console.print(f" [red]✗[/] {message}")
+ self.failed += 1
+ return False
+
+ async def test_init(self) -> bool:
+ """Test repository initialization."""
+ console.print("\n[bold blue]═══ Test: Repository Initialization ═══[/]")
+
+ repo = RepoManager(self.config)
+ repo.ensure_repo_exists()
+
+ # Check directories exist
+ self.check(
+ self.config.repository.path.exists(),
+ f"Repository directory exists: {self.config.repository.path}"
+ )
+ self.check(
+ self.config.repository.build_dir.exists(),
+ f"Build directory exists: {self.config.repository.build_dir}"
+ )
+
+ return self.failed == 0
+
+ async def test_aur_client(self) -> list[str]:
+ """Test AUR client and find available test packages."""
+ console.print("\n[bold blue]═══ Test: AUR Client ═══[/]")
+
+ available_packages = []
+
+ async with AURClient() as aur:
+ # Test package lookup
+ for pkg_name in TEST_PACKAGES + FALLBACK_PACKAGES:
+ pkg = await aur.get_package(pkg_name)
+ if pkg:
+ self.check(True, f"Found package: {pkg_name} ({pkg.version})")
+ available_packages.append(pkg_name)
+ if len(available_packages) >= 2:
+ break
+ else:
+ console.print(f" [yellow]⚠[/] Package not found: {pkg_name}")
+
+ self.check(
+ len(available_packages) >= 1,
+ f"Found {len(available_packages)} test package(s)"
+ )
+
+ return available_packages
+
+ async def test_build_packages(self, packages: list[str]) -> dict[str, BuildStatus]:
+ """Test building packages."""
+ console.print("\n[bold blue]═══ Test: Package Building ═══[/]")
+
+ results: dict[str, BuildStatus] = {}
+
+ async with AURClient() as aur:
+ async with Builder(self.config, aur) as builder:
+ for pkg_name in packages:
+ console.print(f"\n Building {pkg_name}...")
+ result = await builder.build_package(pkg_name, force=True)
+ results[pkg_name] = result.status
+
+ if result.status == BuildStatus.SUCCESS:
+ self.check(True, f"Built {pkg_name} successfully ({result.duration:.1f}s)")
+ self.check(
+ len(result.artifacts) > 0,
+ f" Created {len(result.artifacts)} artifact(s)"
+ )
+ for artifact in result.artifacts:
+ console.print(f" → {artifact.name}")
+ else:
+ self.check(False, f"Failed to build {pkg_name}: {result.error}")
+
+ return results
+
+ async def test_repo_add(self, packages: list[str]) -> None:
+ """Test adding packages to repository."""
+ console.print("\n[bold blue]═══ Test: Repository Management ═══[/]")
+
+ repo = RepoManager(self.config)
+
+ async with AURClient() as aur:
+ async with Builder(self.config, aur) as builder:
+ for pkg_name in packages:
+ # Get the build result with artifacts
+ pkg_dir = self.config.repository.build_dir / pkg_name
+ if not pkg_dir.exists():
+ continue
+
+ # Find artifacts
+ artifacts = list(pkg_dir.glob("*.pkg.tar.*"))
+ artifacts = [a for a in artifacts if not a.name.endswith(".sig")]
+
+ if artifacts:
+ # Create a mock build result for add_packages
+ from archbuild.builder import BuildResult
+ mock_result = BuildResult(
+ package=pkg_name,
+ status=BuildStatus.SUCCESS,
+ artifacts=artifacts,
+ )
+ success = repo.add_packages(mock_result)
+ self.check(success, f"Added {pkg_name} to repository")
+
+ # List packages in repo
+ pkg_list = repo.list_packages()
+ self.check(len(pkg_list) > 0, f"Repository contains {len(pkg_list)} package(s)")
+
+ for pkg in pkg_list:
+ console.print(f" → {pkg.name} {pkg.version} ({pkg.size / 1024:.1f} KB)")
+
+ async def test_repo_database(self) -> None:
+ """Test repository database integrity."""
+ console.print("\n[bold blue]═══ Test: Database Integrity ═══[/]")
+
+ db_path = self.config.repository.path / f"{self.config.repository.name}.db.tar.zst"
+
+ self.check(db_path.exists(), f"Database file exists: {db_path.name}")
+
+ if db_path.exists():
+ # Try to list contents with tar
+ result = subprocess.run(
+ ["tar", "-tf", str(db_path)],
+ capture_output=True,
+ text=True,
+ )
+ if result.returncode == 0:
+ entries = [e for e in result.stdout.strip().split("\n") if e]
+ self.check(
+ len(entries) > 0,
+ f"Database contains {len(entries)} entries"
+ )
+
+ # Check for integrity issues
+ repo = RepoManager(self.config)
+ issues = repo.check_integrity()
+ self.check(
+ len(issues) == 0,
+ f"No integrity issues found" if not issues else f"Issues: {issues}"
+ )
+
+ async def test_cleanup(self) -> None:
+ """Test package cleanup functionality."""
+ console.print("\n[bold blue]═══ Test: Cleanup ═══[/]")
+
+ repo = RepoManager(self.config)
+ removed = repo.cleanup()
+
+ self.check(True, f"Cleanup removed {removed} old version(s)")
+
+ async def run(self) -> bool:
+ """Run all integration tests."""
+ console.print("[bold magenta]╔════════════════════════════════════════════╗[/]")
+ console.print("[bold magenta]║ Archbuild Integration Test Suite ║[/]")
+ console.print("[bold magenta]╚════════════════════════════════════════════╝[/]")
+
+ try:
+ self.setup()
+
+ # Run tests
+ await self.test_init()
+
+ packages = await self.test_aur_client()
+ if not packages:
+ console.print("[red]No test packages available, cannot continue[/]")
+ return False
+
+ build_results = await self.test_build_packages(packages) # Build all found packages
+
+ successful = [p for p, s in build_results.items() if s == BuildStatus.SUCCESS]
+ if successful:
+ await self.test_repo_add(successful)
+ await self.test_repo_database()
+ await self.test_cleanup()
+ else:
+ console.print("[yellow]No successful builds to test repository with[/]")
+
+ # Summary
+ console.print("\n[bold blue]═══ Test Summary ═══[/]")
+ total = self.passed + self.failed
+ console.print(f" Total: {total}")
+ console.print(f" [green]Passed: {self.passed}[/]")
+ console.print(f" [red]Failed: {self.failed}[/]")
+
+ if self.failed == 0:
+ console.print("\n[bold green]✓ All tests passed![/]")
+ return True
+ else:
+ console.print(f"\n[bold red]✗ {self.failed} test(s) failed[/]")
+ return False
+
+ except KeyboardInterrupt:
+ console.print("\n[yellow]Test interrupted[/]")
+ return False
+ except Exception as e:
+ console.print(f"\n[bold red]Test error:[/] {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+ finally:
+ self.teardown()
+
+
+async def main() -> int:
+ """Main entry point."""
+ keep_temp = "--keep-temp" in sys.argv
+
+ # Check if running on Arch Linux
+ if not Path("/etc/arch-release").exists():
+ console.print("[yellow]Warning: Not running on Arch Linux[/]")
+ console.print("[yellow]Some tests may fail or be skipped[/]")
+
+ # Check for required tools
+ for tool in ["makepkg", "pacman", "git"]:
+ result = subprocess.run(["which", tool], capture_output=True)
+ if result.returncode != 0:
+ console.print(f"[red]Required tool not found: {tool}[/]")
+ return 1
+
+ test = IntegrationTest(keep_temp=keep_temp)
+ success = await test.run()
+
+ return 0 if success else 1
+
+
+if __name__ == "__main__":
+ exit_code = asyncio.run(main())
+ sys.exit(exit_code)
diff --git a/tests/test_aur.py b/tests/test_aur.py
new file mode 100644
index 0000000..93975ce
--- /dev/null
+++ b/tests/test_aur.py
@@ -0,0 +1,127 @@
+"""Tests for AUR client."""
+
+import pytest
+from unittest.mock import AsyncMock, patch, MagicMock
+from datetime import datetime
+
+from archbuild.aur import AURClient, Package
+
+
+@pytest.fixture
+def sample_package_data():
+ """Sample AUR RPC response data."""
+ return {
+ "Name": "test-package",
+ "Version": "1.0.0-1",
+ "Description": "A test package",
+ "URL": "https://example.com",
+ "Maintainer": "testuser",
+ "NumVotes": 100,
+ "Popularity": 1.5,
+ "OutOfDate": None,
+ "FirstSubmitted": 1609459200,
+ "LastModified": 1640995200,
+ "Depends": ["dep1", "dep2"],
+ "MakeDepends": ["makedep1"],
+ "CheckDepends": [],
+ "OptDepends": [],
+ "Provides": [],
+ "Conflicts": [],
+ "Replaces": [],
+ "License": ["MIT"],
+ "Keywords": ["test"],
+ }
+
+
+class TestPackage:
+ """Tests for Package dataclass."""
+
+ def test_from_rpc(self, sample_package_data):
+ """Test creating Package from RPC data."""
+ pkg = Package.from_rpc(sample_package_data)
+
+ assert pkg.name == "test-package"
+ assert pkg.version == "1.0.0-1"
+ assert pkg.description == "A test package"
+ assert pkg.maintainer == "testuser"
+ assert pkg.votes == 100
+ assert pkg.depends == ["dep1", "dep2"]
+ assert pkg.makedepends == ["makedep1"]
+
+ def test_git_url(self, sample_package_data):
+ """Test git_url property."""
+ pkg = Package.from_rpc(sample_package_data)
+ assert pkg.git_url == "https://aur.archlinux.org/test-package.git"
+
+ def test_aur_url(self, sample_package_data):
+ """Test aur_url property."""
+ pkg = Package.from_rpc(sample_package_data)
+ assert pkg.aur_url == "https://aur.archlinux.org/packages/test-package"
+
+
+class TestAURClient:
+ """Tests for AURClient."""
+
+ @pytest.mark.asyncio
+ async def test_get_package_cached(self, sample_package_data):
+ """Test that cached packages are returned."""
+ async with AURClient(cache_ttl=300) as client:
+ # Manually add to cache
+ pkg = Package.from_rpc(sample_package_data)
+ client._set_cached(pkg)
+
+ # Should return from cache without network request
+ result = await client.get_package("test-package")
+ assert result is not None
+ assert result.name == "test-package"
+
+ @pytest.mark.asyncio
+ async def test_cache_expiry(self, sample_package_data):
+ """Test cache TTL expiry."""
+ async with AURClient(cache_ttl=0) as client:
+ pkg = Package.from_rpc(sample_package_data)
+ client._set_cached(pkg)
+
+ # Cache should be expired immediately
+ assert client._get_cached("test-package") is None
+
+ @pytest.mark.asyncio
+ async def test_batch_request(self, sample_package_data):
+ """Test batch package requests."""
+ async with AURClient() as client:
+ # Mock the request method
+ mock_response = {
+ "type": "multiinfo",
+ "results": [sample_package_data],
+ }
+ client._request = AsyncMock(return_value=mock_response)
+
+ packages = await client.get_packages(["test-package"])
+
+ assert len(packages) == 1
+ assert packages[0].name == "test-package"
+
+
+class TestDependencyParsing:
+ """Tests for dependency string parsing."""
+
+ def test_parse_simple(self):
+ """Test parsing simple dependency."""
+ from archbuild.resolver import Dependency
+ dep = Dependency.parse("package")
+ assert dep.name == "package"
+ assert dep.version_constraint is None
+
+ def test_parse_with_version(self):
+ """Test parsing dependency with version."""
+ from archbuild.resolver import Dependency
+ dep = Dependency.parse("package>=1.0")
+ assert dep.name == "package"
+ assert dep.version_constraint == ">=1.0"
+
+ def test_parse_exact_version(self):
+ """Test parsing dependency with exact version."""
+ from archbuild.resolver import Dependency
+ dep = Dependency.parse("package=2.0")
+ assert dep.name == "package"
+ assert dep.version_constraint == "=2.0"
diff --git a/tests/test_config.py b/tests/test_config.py
new file mode 100644
index 0000000..55194af
--- /dev/null
+++ b/tests/test_config.py
@@ -0,0 +1,149 @@
+"""Tests for configuration loading and validation."""
+
+import pytest
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+
+from archbuild.config import (
+ Config,
+ load_config,
+ migrate_vars_sh,
+ save_config,
+ BuildingConfig,
+ RepositoryConfig,
+)
+
+
+class TestConfig:
+ """Tests for Config model."""
+
+ def test_minimal_config(self):
+ """Test minimal valid configuration."""
+ config = Config(
+ repository=RepositoryConfig(
+ name="test",
+ path=Path("/repo"),
+ build_dir=Path("/build"),
+ )
+ )
+ assert config.repository.name == "test"
+ assert config.building.parallel is True # default
+ assert config.building.max_workers == 4 # default
+
+ def test_building_config_defaults(self):
+ """Test BuildingConfig defaults."""
+ config = BuildingConfig()
+ assert config.parallel is True
+ assert config.max_workers == 4
+ assert config.clean is True
+ assert config.retry_attempts == 3
+
+ def test_max_workers_validation(self):
+ """Test max_workers bounds."""
+ with pytest.raises(ValueError):
+ BuildingConfig(max_workers=0)
+ with pytest.raises(ValueError):
+ BuildingConfig(max_workers=100)
+
+ def test_log_level_validation(self):
+ """Test log level validation."""
+ config = Config(
+ repository=RepositoryConfig(
+ name="test",
+ path=Path("/repo"),
+ build_dir=Path("/build"),
+ ),
+ log_level="debug",
+ )
+ assert config.log_level == "DEBUG"
+
+ with pytest.raises(ValueError):
+ Config(
+ repository=RepositoryConfig(
+ name="test",
+ path=Path("/repo"),
+ build_dir=Path("/build"),
+ ),
+ log_level="invalid",
+ )
+
+
+class TestLoadConfig:
+ """Tests for config file loading."""
+
+ def test_load_yaml(self, tmp_path):
+ """Test loading YAML config file."""
+ config_file = tmp_path / "config.yaml"
+ config_file.write_text("""
+repository:
+ name: myrepo
+ path: /repo/x86_64
+ build_dir: /repo/build
+building:
+ max_workers: 8
+""")
+ config = load_config(config_file)
+ assert config.repository.name == "myrepo"
+ assert config.building.max_workers == 8
+
+ def test_file_not_found(self):
+ """Test error on missing config file."""
+ with pytest.raises(FileNotFoundError):
+ load_config(Path("/nonexistent/config.yaml"))
+
+
+class TestMigrateVarsSh:
+ """Tests for vars.sh migration."""
+
+ def test_migrate_basic(self, tmp_path):
+ """Test basic vars.sh migration."""
+ vars_file = tmp_path / "vars.sh"
+ vars_file.write_text("""
+REPODIR=/repo/x86_64
+BUILDDIR=/repo/build
+REPONAME=myrepo
+PARALLEL=Y
+SIGN=N
+NUM_OLD=5
+""")
+ data = migrate_vars_sh(vars_file)
+
+ assert data["repository"]["name"] == "myrepo"
+ assert data["repository"]["path"] == "/repo/x86_64"
+ assert data["building"]["parallel"] is True
+ assert data["signing"]["enabled"] is False
+ assert data["retention"]["keep_versions"] == 5
+
+ def test_migrate_with_export(self, tmp_path):
+ """Test migration handles export statements."""
+ vars_file = tmp_path / "vars.sh"
+ vars_file.write_text("""
+export REPONAME="testrepo"
+export REPODIR="/test/repo"
+export BUILDDIR="/test/build"
+""")
+ data = migrate_vars_sh(vars_file)
+
+ assert data["repository"]["name"] == "testrepo"
+
+
+class TestSaveConfig:
+ """Tests for config saving."""
+
+ def test_round_trip(self, tmp_path):
+ """Test config save/load round trip."""
+ config = Config(
+ repository=RepositoryConfig(
+ name="test",
+ path=Path("/repo"),
+ build_dir=Path("/build"),
+ ),
+ building=BuildingConfig(max_workers=6),
+ )
+
+ config_file = tmp_path / "config.yaml"
+ save_config(config, config_file)
+
+ loaded = load_config(config_file)
+ assert loaded.repository.name == "test"
+ assert loaded.building.max_workers == 6
diff --git a/tests/test_resolver.py b/tests/test_resolver.py
new file mode 100644
index 0000000..f2e089b
--- /dev/null
+++ b/tests/test_resolver.py
@@ -0,0 +1,131 @@
+"""Tests for dependency resolver."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from archbuild.resolver import DependencyResolver, Dependency, DependencyType, BuildOrder
+
+
+class TestDependency:
+ """Tests for Dependency class."""
+
+ def test_parse_simple(self):
+ """Test parsing simple dependency name."""
+ dep = Dependency.parse("packagename")
+ assert dep.name == "packagename"
+ assert dep.version_constraint is None
+ assert dep.dep_type == DependencyType.RUNTIME
+
+ def test_parse_with_gte(self):
+ """Test parsing with >= constraint."""
+ dep = Dependency.parse("package>=1.0")
+ assert dep.name == "package"
+ assert dep.version_constraint == ">=1.0"
+
+ def test_parse_with_lte(self):
+ """Test parsing with <= constraint."""
+ dep = Dependency.parse("package<=2.0")
+ assert dep.name == "package"
+ assert dep.version_constraint == "<=2.0"
+
+ def test_parse_build_dep(self):
+ """Test parsing as build dependency."""
+ dep = Dependency.parse("makedep", DependencyType.BUILD)
+ assert dep.dep_type == DependencyType.BUILD
+
+
+class TestBuildOrder:
+ """Tests for BuildOrder class."""
+
+ def test_iteration(self):
+ """Test BuildOrder iteration."""
+ order = BuildOrder(packages=["a", "b", "c"])
+ assert list(order) == ["a", "b", "c"]
+
+ def test_length(self):
+ """Test BuildOrder length."""
+ order = BuildOrder(packages=["a", "b"])
+ assert len(order) == 2
+
+
+class TestDependencyResolver:
+ """Tests for DependencyResolver."""
+
+ @pytest.fixture
+ def mock_aur_client(self):
+ """Create mock AUR client."""
+ client = AsyncMock()
+ return client
+
+ def test_topological_sort_simple(self, mock_aur_client):
+ """Test topological sort with simple graph."""
+ resolver = DependencyResolver(mock_aur_client)
+
+ # A depends on B, B depends on C
+ graph = {
+ "A": {"B"},
+ "B": {"C"},
+ "C": set(),
+ }
+
+ order = resolver._topological_sort(graph)
+
+ # C must come before B, B must come before A
+ assert order.index("C") < order.index("B")
+ assert order.index("B") < order.index("A")
+
+ def test_topological_sort_parallel(self, mock_aur_client):
+ """Test topological sort with parallel dependencies."""
+ resolver = DependencyResolver(mock_aur_client)
+
+ # A depends on B and C (parallel)
+ graph = {
+ "A": {"B", "C"},
+ "B": set(),
+ "C": set(),
+ }
+
+ order = resolver._topological_sort(graph)
+
+ # B and C must come before A
+ assert order.index("B") < order.index("A")
+ assert order.index("C") < order.index("A")
+
+ def test_detect_cycles_no_cycle(self, mock_aur_client):
+ """Test cycle detection with no cycles."""
+ resolver = DependencyResolver(mock_aur_client)
+
+ graph = {
+ "A": {"B"},
+ "B": {"C"},
+ "C": set(),
+ }
+
+ cycles = resolver.detect_cycles(graph)
+ assert len(cycles) == 0
+
+ def test_detect_cycles_with_cycle(self, mock_aur_client):
+ """Test cycle detection with cycle."""
+ resolver = DependencyResolver(mock_aur_client)
+
+ # A -> B -> C -> A (cycle)
+ graph = {
+ "A": {"B"},
+ "B": {"C"},
+ "C": {"A"},
+ }
+
+ cycles = resolver.detect_cycles(graph)
+ assert len(cycles) > 0
+
+ @patch("archbuild.resolver.subprocess.run")
+ def test_is_in_official_repos(self, mock_run, mock_aur_client):
+ """Test checking official repos."""
+ mock_run.return_value.returncode = 0
+ mock_run.return_value.stdout = "base\ngit\nvim\n"
+
+ resolver = DependencyResolver(mock_aur_client)
+ resolver._refresh_pacman_cache()
+
+ assert resolver.is_in_official_repos("git")
+ assert not resolver.is_in_official_repos("yay")
--
Gitblit v1.10.0