Intiial python rewrite of repository
1 files modified
18 files added
| | |
| | | # Legacy |
| | | chroot |
| | | repo |
| | | vars.sh |
| | | |
| | | # Python |
| | | __pycache__/ |
| | | *.py[cod] |
| | | *$py.class |
| | | .venv/ |
| | | env/ |
| | | venv/ |
| | | ENV/ |
| | | |
| | | # Distribution / Build |
| | | dist/ |
| | | local_build/ |
| | | build/ |
| | | *.egg-info/ |
| | | |
| | | # Tests / Coverage |
| | | .pytest_cache/ |
| | | .coverage |
| | | htmlcov/ |
| | | .mypy_cache/ |
| | | .ruff_cache/ |
| | | |
| | | # Project Specific |
| | | config.yaml |
| | | repo/ |
| | | waitlist* |
| | | errorfile |
| | | logs/ |
| New file |
| | |
| | | # Archbuild |
| | | |
| | | A modern, sustainable AUR package building and repository management tool for Arch Linux. |
| | | |
| | | ## Features |
| | | |
| | | - **Automatic AUR builds** - Clone and build packages from the AUR |
| | | - **Dependency resolution** - Automatically resolve and build AUR dependencies in correct order |
| | | - **Parallel builds** - Build multiple packages concurrently with configurable worker count |
| | | - **Package signing** - Optional GPG signing for packages and database |
| | | - **Retry logic** - Configurable retries with exponential backoff for transient failures |
| | | - **Package retention** - Keep configurable number of old versions, auto-cleanup |
| | | - **Notifications** - Email and webhook (Discord, Slack, ntfy) notifications on failures |
| | | - **VCS package handling** - Automatically rebuild git/svn/hg packages |
| | | - **Modern Python** - Type-safe, async, well-tested codebase |
| | | |
| | | ## Installation |
| | | |
| | | ```bash |
| | | # From source |
| | | pip install -e . |
| | | |
| | | # Or with development dependencies |
| | | pip install -e ".[dev]" |
| | | ``` |
| | | |
| | | ## Quick Start |
| | | |
| | | 1. **Create configuration**: |
| | | ```bash |
| | | cp config/config.example.yaml config.yaml |
| | | # Edit config.yaml with your settings |
| | | ``` |
| | | |
| | | 2. **Initialize repository**: |
| | | ```bash |
| | | archbuild -c config.yaml init |
| | | ``` |
| | | |
| | | 3. **Add packages**: |
| | | ```bash |
| | | archbuild add yay paru |
| | | ``` |
| | | |
| | | 4. **Build all packages**: |
| | | ```bash |
| | | archbuild build-all |
| | | ``` |
| | | |
| | | 5. **Build a specific package**: |
| | | ```bash |
| | | archbuild build <package> |
| | | ``` |
| | | |
| | | ## Commands |
| | | |
| | | | Command | Description | |
| | | |----------------------------|--------------------------------------------| |
| | | | `init` | Initialize repository directories | |
| | | | `add <packages...>` | Add and build new packages | |
| | | | `remove <packages...>` | Remove packages from repo | |
| | | | `build-all [-f]` | Build all packages, `-f` forces rebuild | |
| | | | `build <package>` | Build a specific package | |
| | | | `check` | Check for packages moved to official repos | |
| | | | `list` | List packages in repository | |
| | | | `remake` | Rebuild repository database | |
| | | | `cleanup` | Remove old package versions | |
| | | | `migrate-config <vars.sh>` | Migrate legacy config | |
| | | | `test-notifications` | Test notification setup | |
| | | |
| | | ## Configuration |
| | | |
| | | See `config/config.example.yaml` for all options. Key settings: |
| | | |
| | | ```yaml |
| | | repository: |
| | | name: "myrepo" |
| | | path: "/repo/x86_64" |
| | | build_dir: "/repo/build" |
| | | |
| | | building: |
| | | parallel: true |
| | | max_workers: 4 |
| | | retry_attempts: 3 |
| | | |
| | | retention: |
| | | keep_versions: 3 |
| | | |
| | | notifications: |
| | | email: |
| | | enabled: true |
| | | to: "admin@example.com" |
| | | ``` |
| | | |
| | | ## Migration from Bash Version |
| | | |
| | | ```bash |
| | | archbuild migrate-config vars.sh -o config.yaml |
| | | ``` |
| | | |
| | | ## Systemd Timer |
| | | |
| | | Create `/etc/systemd/system/archbuild.service`: |
| | | ```ini |
| | | [Unit] |
| | | Description=Build AUR packages |
| | | |
| | | [Service] |
| | | Type=oneshot |
| | | ExecStart=/usr/bin/archbuild -c /etc/archbuild/config.yaml build-all |
| | | User=builduser |
| | | ``` |
| | | |
| | | Create `/etc/systemd/system/archbuild.timer`: |
| | | ```ini |
| | | [Unit] |
| | | Description=Run archbuild daily |
| | | |
| | | [Timer] |
| | | OnCalendar=daily |
| | | Persistent=true |
| | | |
| | | [Install] |
| | | WantedBy=timers.target |
| | | ``` |
| | | |
| | | ## License |
| | | |
| | | MIT |
| New file |
| | |
| | | # Archbuild Agent Guidance |
| | | |
| | | This document provides guidance for AI agents maintaining or extending the Archbuild codebase. |
| | | |
| | | ## Architecture Highlights |
| | | |
| | | Archbuild is a modern Python rewrite of a legacy Bash-based Arch Linux package autobuilder. It prioritizes reliability, maintainability, and extensibility. |
| | | |
| | | - **Asynchronous I/O**: Uses `asyncio` and `aiohttp` for AUR RPC queries and network-bound tasks. |
| | | - **Process Isolation**: Uses `ProcessPoolExecutor` for long-running `makepkg` builds to avoid GIL limitations. |
| | | - **Robust Locking**: Uses `fcntl.flock()` for file-based locking of the repository database and package builds. |
| | | - **Configuration**: Uses Pydantic V2 for YAML configuration loading and validation. |
| | | - **Logging**: Uses `rich` for structured, colorful terminal logging and optional file logging. |
| | | |
| | | ## Core Modules |
| | | |
| | | - `cli.py`: Command-line interface using `click`. |
| | | - `config.py`: Configuration models and migration tools. |
| | | - `aur.py`: AUR RPC API client with caching and retry logic. |
| | | - `resolver.py`: Dependency resolution and topological sorting. |
| | | - `builder.py`: Build orchestration and parallelization. |
| | | - `repo.py`: Repository database and package retention management. |
| | | - `notifications.py`: Extensible notification system (Email, Webhooks). |
| | | |
| | | ## Development Patterns |
| | | |
| | | ### Async Usage |
| | | Always use async context managers for `AURClient` and `Builder` to ensure sessions and executors are properly closed. |
| | | |
| | | ```python |
| | | async with AURClient() as aur: |
| | | async with Builder(config, aur) as builder: |
| | | await builder.build_all() |
| | | ``` |
| | | |
| | | ### Locking |
| | | Always wrap repository updates in the repository lock found in `RepoManager`. |
| | | |
| | | ### Adding New Commands |
| | | Add new commands to `cli.py` using Click. If a command needs AUR access or building, use the `AURClient` and `Builder` inside an async function wrapped by `run_async`. |
| | | |
| | | ### Testing |
| | | - Unit tests are in `tests/test_*.py`. |
| | | - Integration tests are in `tests/integration_test.py` and require an Arch Linux environment. |
| | | - Use `pytest` for running unit tests. |
| | | |
| | | ## Maintenance Checklist |
| | | |
| | | - [ ] Ensure all Pydantic models in `config.py` are up to date with new features. |
| | | - [ ] Maintain the topological sort logic in `resolver.py` (Kahn's algorithm). |
| | | - [ ] Keep `DLAGENTS` and `VCSCLIENTS` in `integration_test.py` in sync with system defaults. |
| | | - [ ] Update `README.md` when adding new CLI commands or configuration options. |
| New file |
| | |
| | | # Archbuild Configuration |
| | | # Copy this file to config.yaml and edit as needed |
| | | |
| | | repository: |
| | | # Repository name (used in pacman.conf) |
| | | name: "myrepo" |
| | | # Directory containing the repository packages and database |
| | | path: "/repo/x86_64" |
| | | # Directory for cloning and building packages |
| | | build_dir: "/repo/build" |
| | | # Compression format for packages (zst, xz, gz) |
| | | compression: "zst" |
| | | |
| | | building: |
| | | # Enable parallel builds |
| | | parallel: true |
| | | # Maximum number of concurrent build workers |
| | | max_workers: 4 |
| | | # Clean build directory after successful build |
| | | clean: true |
| | | # Update system before building (sudo pacman -Syu) |
| | | update_system: false |
| | | # Number of retry attempts on build failure |
| | | retry_attempts: 3 |
| | | # Base delay between retries in seconds (exponential backoff) |
| | | retry_delay: 5 |
| | | |
| | | signing: |
| | | # Enable package signing |
| | | enabled: false |
| | | # GPG key ID for signing (leave empty to use default key) |
| | | key: "" |
| | | |
| | | retention: |
| | | # Number of old package versions to keep |
| | | keep_versions: 3 |
| | | # Automatically clean old versions after build |
| | | cleanup_on_build: true |
| | | |
| | | notifications: |
| | | email: |
| | | # Enable email notifications on build failures |
| | | enabled: false |
| | | # Recipient email address |
| | | to: "" |
| | | # Sender email address |
| | | from: "archbuild@localhost" |
| | | # SMTP server settings |
| | | smtp_host: "localhost" |
| | | smtp_port: 25 |
| | | use_tls: false |
| | | username: "" |
| | | password: "" |
| | | |
| | | # Webhook notifications (Discord, Slack, ntfy, etc.) |
| | | # Uncomment and configure as needed |
| | | webhooks: [] |
| | | # Example Discord webhook: |
| | | # webhooks: |
| | | # - enabled: true |
| | | # type: "discord" |
| | | # url: "https://discord.com/api/webhooks/..." |
| | | # |
| | | # Example ntfy webhook: |
| | | # webhooks: |
| | | # - enabled: true |
| | | # type: "ntfy" |
| | | # url: "https://ntfy.sh/your-topic" |
| | | |
| | | # Per-package build overrides |
| | | # Use this for packages that need special handling |
| | | package_overrides: {} |
| | | # Example: Microsoft fonts need checksum skipping |
| | | # ttf-ms-win10: |
| | | # skip_checksums: true |
| | | # |
| | | # Example: Custom environment variables |
| | | # some-package: |
| | | # env: |
| | | # LC_ALL: "C" |
| | | # extra_args: |
| | | # - "--nocheck" |
| | | |
| | | # Logging configuration |
| | | log_level: "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL |
| | | # Uncomment to also log to file: |
| | | # log_file: "/var/log/archbuild.log" |
| New file |
| | |
| | | [build-system] |
| | | requires = ["hatchling"] |
| | | build-backend = "hatchling.build" |
| | | |
| | | [project] |
| | | name = "archbuild" |
| | | version = "2.0.0" |
| | | description = "Automatic AUR package building and repository management for Arch Linux" |
| | | readme = "README.md" |
| | | license = "MIT" |
| | | requires-python = ">=3.11" |
| | | authors = [ |
| | | { name = "Joel", email = "joelgrun@gmail.com" } |
| | | ] |
| | | keywords = ["arch", "linux", "aur", "pacman", "repository", "automation"] |
| | | classifiers = [ |
| | | "Development Status :: 4 - Beta", |
| | | "Environment :: Console", |
| | | "Intended Audience :: System Administrators", |
| | | "License :: OSI Approved :: MIT License", |
| | | "Operating System :: POSIX :: Linux", |
| | | "Programming Language :: Python :: 3.11", |
| | | "Programming Language :: Python :: 3.12", |
| | | "Topic :: System :: Software Distribution", |
| | | ] |
| | | dependencies = [ |
| | | "click>=8.0", |
| | | "pyyaml>=6.0", |
| | | "pydantic>=2.0", |
| | | "aiohttp>=3.8", |
| | | "rich>=13.0", |
| | | ] |
| | | |
| | | [project.optional-dependencies] |
| | | dev = [ |
| | | "pytest>=7.0", |
| | | "pytest-asyncio>=0.21", |
| | | "pytest-cov>=4.0", |
| | | "mypy>=1.0", |
| | | "ruff>=0.1", |
| | | ] |
| | | |
| | | [project.scripts] |
| | | archbuild = "archbuild.cli:main" |
| | | |
| | | [tool.hatch.build.targets.wheel] |
| | | packages = ["src/archbuild"] |
| | | |
| | | [tool.ruff] |
| | | target-version = "py311" |
| | | line-length = 100 |
| | | |
| | | [tool.ruff.lint] |
| | | select = ["E", "F", "I", "N", "W", "UP"] |
| | | |
| | | [tool.mypy] |
| | | python_version = "3.11" |
| | | strict = true |
| | | |
| | | [tool.pytest.ini_options] |
| | | asyncio_mode = "auto" |
| | | testpaths = ["tests"] |
| New file |
| | |
| | | """Archbuild - Automatic AUR package building and repository management.""" |
| | | |
| | | __version__ = "2.0.0" |
| New file |
| | |
| | | """Async AUR RPC API client with caching and retry logic.""" |
| | | |
| | | import asyncio |
| | | from dataclasses import dataclass, field |
| | | from datetime import datetime, timedelta |
| | | from enum import Enum |
| | | from typing import Any |
| | | |
| | | import aiohttp |
| | | |
| | | from archbuild.logging import get_logger |
| | | |
| | | logger = get_logger("aur") |
| | | |
| | | AUR_RPC_URL = "https://aur.archlinux.org/rpc" |
| | | AUR_PACKAGE_URL = "https://aur.archlinux.org/packages" |
| | | AUR_GIT_URL = "https://aur.archlinux.org" |
| | | |
| | | |
| | | class PackageType(Enum): |
| | | """Package type from AUR.""" |
| | | |
| | | NORMAL = "normal" |
| | | SPLIT = "split" |
| | | |
| | | |
| | | @dataclass |
| | | class Package: |
| | | """AUR package metadata.""" |
| | | |
| | | name: str |
| | | version: str |
| | | description: str |
| | | url: str | None |
| | | maintainer: str | None |
| | | votes: int |
| | | popularity: float |
| | | out_of_date: datetime | None |
| | | first_submitted: datetime |
| | | last_modified: datetime |
| | | depends: list[str] = field(default_factory=list) |
| | | makedepends: list[str] = field(default_factory=list) |
| | | checkdepends: list[str] = field(default_factory=list) |
| | | optdepends: list[str] = field(default_factory=list) |
| | | provides: list[str] = field(default_factory=list) |
| | | conflicts: list[str] = field(default_factory=list) |
| | | replaces: list[str] = field(default_factory=list) |
| | | license: list[str] = field(default_factory=list) |
| | | keywords: list[str] = field(default_factory=list) |
| | | |
| | | @property |
| | | def git_url(self) -> str: |
| | | """Get the git clone URL for this package.""" |
| | | return f"{AUR_GIT_URL}/{self.name}.git" |
| | | |
| | | @property |
| | | def aur_url(self) -> str: |
| | | """Get the AUR web page URL for this package.""" |
| | | return f"{AUR_PACKAGE_URL}/{self.name}" |
| | | |
| | | @classmethod |
| | | def from_rpc(cls, data: dict[str, Any]) -> "Package": |
| | | """Create Package from AUR RPC response data. |
| | | |
| | | Args: |
| | | data: Package data from AUR RPC API |
| | | |
| | | Returns: |
| | | Package instance |
| | | """ |
| | | return cls( |
| | | name=data["Name"], |
| | | version=data["Version"], |
| | | description=data.get("Description", ""), |
| | | url=data.get("URL"), |
| | | maintainer=data.get("Maintainer"), |
| | | votes=data.get("NumVotes", 0), |
| | | popularity=data.get("Popularity", 0.0), |
| | | out_of_date=( |
| | | datetime.fromtimestamp(data["OutOfDate"]) if data.get("OutOfDate") else None |
| | | ), |
| | | first_submitted=datetime.fromtimestamp(data["FirstSubmitted"]), |
| | | last_modified=datetime.fromtimestamp(data["LastModified"]), |
| | | depends=data.get("Depends", []), |
| | | makedepends=data.get("MakeDepends", []), |
| | | checkdepends=data.get("CheckDepends", []), |
| | | optdepends=data.get("OptDepends", []), |
| | | provides=data.get("Provides", []), |
| | | conflicts=data.get("Conflicts", []), |
| | | replaces=data.get("Replaces", []), |
| | | license=data.get("License", []), |
| | | keywords=data.get("Keywords", []), |
| | | ) |
| | | |
| | | |
| | | @dataclass |
| | | class CacheEntry: |
| | | """Cache entry with TTL.""" |
| | | |
| | | data: Package |
| | | expires: datetime |
| | | |
| | | |
| | | class AURClient: |
| | | """Async client for AUR RPC API with caching and retry.""" |
| | | |
| | | def __init__( |
| | | self, |
| | | cache_ttl: int = 300, |
| | | max_retries: int = 3, |
| | | retry_delay: float = 1.0, |
| | | batch_size: int = 100, |
| | | ): |
| | | """Initialize AUR client. |
| | | |
| | | Args: |
| | | cache_ttl: Cache time-to-live in seconds |
| | | max_retries: Maximum number of retry attempts |
| | | retry_delay: Base delay between retries (exponential backoff) |
| | | batch_size: Maximum packages per batch request |
| | | """ |
| | | self.cache_ttl = cache_ttl |
| | | self.max_retries = max_retries |
| | | self.retry_delay = retry_delay |
| | | self.batch_size = batch_size |
| | | self._cache: dict[str, CacheEntry] = {} |
| | | self._session: aiohttp.ClientSession | None = None |
| | | |
| | | async def __aenter__(self) -> "AURClient": |
| | | """Async context manager entry.""" |
| | | self._session = aiohttp.ClientSession() |
| | | return self |
| | | |
| | | async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: |
| | | """Async context manager exit.""" |
| | | if self._session: |
| | | await self._session.close() |
| | | self._session = None |
| | | |
| | | def _get_cached(self, name: str) -> Package | None: |
| | | """Get package from cache if not expired. |
| | | |
| | | Args: |
| | | name: Package name |
| | | |
| | | Returns: |
| | | Cached package or None if not cached/expired |
| | | """ |
| | | entry = self._cache.get(name) |
| | | if entry and entry.expires > datetime.now(): |
| | | return entry.data |
| | | return None |
| | | |
| | | def _set_cached(self, package: Package) -> None: |
| | | """Store package in cache. |
| | | |
| | | Args: |
| | | package: Package to cache |
| | | """ |
| | | self._cache[package.name] = CacheEntry( |
| | | data=package, |
| | | expires=datetime.now() + timedelta(seconds=self.cache_ttl), |
| | | ) |
| | | |
| | | async def _request( |
| | | self, |
| | | params: list[tuple[str, Any]] | dict[str, Any], |
| | | ) -> dict[str, Any]: |
| | | """Make request to AUR RPC API with retry logic. |
| | | |
| | | Args: |
| | | params: Query parameters (as list of tuples for repeated keys, or dict) |
| | | |
| | | Returns: |
| | | JSON response data |
| | | |
| | | Raises: |
| | | aiohttp.ClientError: If request fails after all retries |
| | | """ |
| | | if not self._session: |
| | | raise RuntimeError("AURClient must be used as async context manager") |
| | | |
| | | last_error: Exception | None = None |
| | | |
| | | for attempt in range(self.max_retries): |
| | | try: |
| | | async with self._session.get(AUR_RPC_URL, params=params) as response: |
| | | response.raise_for_status() |
| | | data = await response.json() |
| | | if data.get("type") == "error": |
| | | raise ValueError(f"AUR API error: {data.get('error')}") |
| | | return data |
| | | except (aiohttp.ClientError, asyncio.TimeoutError) as e: |
| | | last_error = e |
| | | if attempt < self.max_retries - 1: |
| | | delay = self.retry_delay * (2**attempt) |
| | | logger.warning( |
| | | f"AUR request failed (attempt {attempt + 1}/{self.max_retries}), " |
| | | f"retrying in {delay}s: {e}" |
| | | ) |
| | | await asyncio.sleep(delay) |
| | | |
| | | raise last_error or RuntimeError("Request failed") |
| | | |
| | | async def get_package(self, name: str) -> Package | None: |
| | | """Get a single package by name. |
| | | |
| | | Args: |
| | | name: Package name |
| | | |
| | | Returns: |
| | | Package if found, None otherwise |
| | | """ |
| | | # Check cache first |
| | | cached = self._get_cached(name) |
| | | if cached: |
| | | logger.debug(f"Cache hit for package: {name}") |
| | | return cached |
| | | |
| | | packages = await self.get_packages([name]) |
| | | return packages[0] if packages else None |
| | | |
| | | async def get_packages(self, names: list[str]) -> list[Package]: |
| | | """Get multiple packages by name using batch queries. |
| | | |
| | | Args: |
| | | names: List of package names |
| | | |
| | | Returns: |
| | | List of found packages (may be fewer than requested) |
| | | """ |
| | | # Separate cached and uncached packages |
| | | result: list[Package] = [] |
| | | uncached: list[str] = [] |
| | | |
| | | for name in names: |
| | | cached = self._get_cached(name) |
| | | if cached: |
| | | result.append(cached) |
| | | else: |
| | | uncached.append(name) |
| | | |
| | | if not uncached: |
| | | return result |
| | | |
| | | # Batch request uncached packages |
| | | for i in range(0, len(uncached), self.batch_size): |
| | | batch = uncached[i : i + self.batch_size] |
| | | |
| | | # Build params as list of tuples for repeated arg[] keys |
| | | params: list[tuple[str, Any]] = [("v", 5), ("type", "info")] |
| | | for name in batch: |
| | | params.append(("arg[]", name)) |
| | | |
| | | data = await self._request(params) |
| | | |
| | | for pkg_data in data.get("results", []): |
| | | package = Package.from_rpc(pkg_data) |
| | | self._set_cached(package) |
| | | result.append(package) |
| | | |
| | | return result |
| | | |
| | | async def search(self, query: str, by: str = "name-desc") -> list[Package]: |
| | | """Search AUR packages. |
| | | |
| | | Args: |
| | | query: Search query string |
| | | by: Search field (name, name-desc, maintainer, depends, makedepends, optdepends, checkdepends) |
| | | |
| | | Returns: |
| | | List of matching packages |
| | | """ |
| | | params = {"v": 5, "type": "search", "by": by, "arg": query} |
| | | data = await self._request(params) |
| | | |
| | | packages = [] |
| | | for pkg_data in data.get("results", []): |
| | | package = Package.from_rpc(pkg_data) |
| | | self._set_cached(package) |
| | | packages.append(package) |
| | | |
| | | return packages |
| | | |
| | | async def is_available(self, name: str) -> bool: |
| | | """Check if a package exists in the AUR. |
| | | |
| | | Args: |
| | | name: Package name |
| | | |
| | | Returns: |
| | | True if package exists |
| | | """ |
| | | package = await self.get_package(name) |
| | | return package is not None |
| | | |
| | | def clear_cache(self) -> None: |
| | | """Clear the package cache.""" |
| | | self._cache.clear() |
| New file |
| | |
| | | """Package builder with parallel execution and proper locking.""" |
| | | |
| | | import asyncio |
| | | import fcntl |
| | | import os |
| | | import shutil |
| | | import subprocess |
| | | from dataclasses import dataclass, field |
| | | from datetime import datetime |
| | | from enum import Enum |
| | | from pathlib import Path |
| | | from concurrent.futures import ProcessPoolExecutor |
| | | from typing import Any |
| | | |
| | | from archbuild.aur import AURClient |
| | | from archbuild.config import Config, PackageOverride |
| | | from archbuild.logging import get_logger |
| | | from archbuild.resolver import DependencyResolver |
| | | |
| | | logger = get_logger("builder") |
| | | |
| | | |
| | | class BuildStatus(Enum): |
| | | """Build status for a package.""" |
| | | |
| | | PENDING = "pending" |
| | | BUILDING = "building" |
| | | SUCCESS = "success" |
| | | FAILED = "failed" |
| | | SKIPPED = "skipped" |
| | | |
| | | |
| | | @dataclass |
| | | class BuildResult: |
| | | """Result of a package build.""" |
| | | |
| | | package: str |
| | | status: BuildStatus |
| | | version: str | None = None |
| | | duration: float = 0.0 |
| | | error: str | None = None |
| | | artifacts: list[Path] = field(default_factory=list) |
| | | timestamp: datetime = field(default_factory=datetime.now) |
| | | |
| | | |
| | | class FileLock: |
| | | """Context manager for file-based locking using flock.""" |
| | | |
| | | def __init__(self, path: Path): |
| | | """Initialize lock on file path. |
| | | |
| | | Args: |
| | | path: Path to lock file |
| | | """ |
| | | self.path = path |
| | | self.fd: int | None = None |
| | | |
| | | def __enter__(self) -> "FileLock": |
| | | """Acquire exclusive lock.""" |
| | | self.path.parent.mkdir(parents=True, exist_ok=True) |
| | | self.fd = os.open(str(self.path), os.O_RDWR | os.O_CREAT) |
| | | fcntl.flock(self.fd, fcntl.LOCK_EX) |
| | | logger.debug(f"Acquired lock: {self.path}") |
| | | return self |
| | | |
| | | def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: |
| | | """Release lock.""" |
| | | if self.fd is not None: |
| | | fcntl.flock(self.fd, fcntl.LOCK_UN) |
| | | os.close(self.fd) |
| | | logger.debug(f"Released lock: {self.path}") |
| | | |
| | | |
| | | def _run_makepkg( |
| | | package_dir: Path, |
| | | sign: bool = False, |
| | | key: str = "", |
| | | clean: bool = True, |
| | | skip_checksums: bool = False, |
| | | extra_args: list[str] | None = None, |
| | | env_overrides: dict[str, str] | None = None, |
| | | ) -> tuple[bool, str, list[Path]]: |
| | | """Run makepkg in a subprocess. |
| | | |
| | | This runs in a separate process for parallelization. |
| | | |
| | | Args: |
| | | package_dir: Directory containing PKGBUILD |
| | | sign: Whether to sign packages |
| | | key: GPG key for signing |
| | | clean: Clean build directory after build |
| | | skip_checksums: Skip checksum verification |
| | | extra_args: Additional makepkg arguments |
| | | env_overrides: Environment variable overrides |
| | | |
| | | Returns: |
| | | Tuple of (success, error_message, artifact_paths) |
| | | """ |
| | | cmd = ["makepkg", "-s", "--noconfirm"] |
| | | |
| | | if clean: |
| | | cmd.append("-c") |
| | | if sign and key: |
| | | cmd.extend(["--sign", "--key", key]) |
| | | if skip_checksums: |
| | | cmd.append("--skipchecksums") |
| | | if extra_args: |
| | | cmd.extend(extra_args) |
| | | |
| | | env = os.environ.copy() |
| | | if env_overrides: |
| | | env.update(env_overrides) |
| | | |
| | | try: |
| | | result = subprocess.run( |
| | | cmd, |
| | | cwd=package_dir, |
| | | capture_output=True, |
| | | text=True, |
| | | env=env, |
| | | timeout=3600, # 1 hour timeout |
| | | ) |
| | | |
| | | if result.returncode != 0: |
| | | return False, result.stderr or result.stdout, [] |
| | | |
| | | # Find built packages |
| | | artifacts = list(package_dir.glob("*.pkg.tar.*")) |
| | | artifacts = [a for a in artifacts if not a.name.endswith(".sig")] |
| | | |
| | | return True, "", artifacts |
| | | |
| | | except subprocess.TimeoutExpired: |
| | | return False, "Build timed out after 1 hour", [] |
| | | except Exception as e: |
| | | return False, str(e), [] |
| | | |
| | | |
| | | class Builder: |
| | | """Package builder with parallel execution support.""" |
| | | |
| | | def __init__( |
| | | self, |
| | | config: Config, |
| | | aur_client: AURClient, |
| | | ): |
| | | """Initialize builder. |
| | | |
| | | Args: |
| | | config: Application configuration |
| | | aur_client: AUR client for package info |
| | | """ |
| | | self.config = config |
| | | self.aur_client = aur_client |
| | | self.resolver = DependencyResolver(aur_client) |
| | | self._lock_dir = config.repository.build_dir / ".locks" |
| | | self._executor: ProcessPoolExecutor | None = None |
| | | |
| | | async def __aenter__(self) -> "Builder": |
| | | """Async context manager entry.""" |
| | | max_workers = self.config.building.max_workers if self.config.building.parallel else 1 |
| | | self._executor = ProcessPoolExecutor(max_workers=max_workers) |
| | | logger.info(f"Builder initialized with {max_workers} workers") |
| | | return self |
| | | |
| | | async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: |
| | | """Async context manager exit.""" |
| | | if self._executor: |
| | | self._executor.shutdown(wait=True) |
| | | self._executor = None |
| | | |
| | | def _get_lock_path(self, package: str) -> Path: |
| | | """Get lock file path for package. |
| | | |
| | | Args: |
| | | package: Package name |
| | | |
| | | Returns: |
| | | Path to lock file |
| | | """ |
| | | return self._lock_dir / f"{package}.lock" |
| | | |
| | | def _get_package_dir(self, package: str) -> Path: |
| | | """Get build directory for package. |
| | | |
| | | Args: |
| | | package: Package name |
| | | |
| | | Returns: |
| | | Path to package build directory |
| | | """ |
| | | return self.config.repository.build_dir / package |
| | | |
| | | def _get_override(self, package: str) -> PackageOverride: |
| | | """Get package-specific overrides. |
| | | |
| | | Args: |
| | | package: Package name |
| | | |
| | | Returns: |
| | | PackageOverride (default if not specified) |
| | | """ |
| | | # Check for package-specific override first, then fall back to _default |
| | | if package in self.config.package_overrides: |
| | | return self.config.package_overrides[package] |
| | | return self.config.package_overrides.get("_default", PackageOverride()) |
| | | |
| | | async def _clone_or_update(self, package: str) -> bool: |
| | | """Clone or update package from AUR. |
| | | |
| | | Args: |
| | | package: Package name |
| | | |
| | | Returns: |
| | | True if there were updates (or new clone) |
| | | """ |
| | | pkg_dir = self._get_package_dir(package) |
| | | |
| | | if pkg_dir.exists(): |
| | | # Update existing repo |
| | | result = subprocess.run( |
| | | ["git", "reset", "--hard"], |
| | | cwd=pkg_dir, |
| | | capture_output=True, |
| | | ) |
| | | result = subprocess.run( |
| | | ["git", "pull"], |
| | | cwd=pkg_dir, |
| | | capture_output=True, |
| | | text=True, |
| | | ) |
| | | return "Already up to date" not in result.stdout |
| | | else: |
| | | # Clone new repo |
| | | pkg_info = await self.aur_client.get_package(package) |
| | | if not pkg_info: |
| | | raise ValueError(f"Package not found in AUR: {package}") |
| | | |
| | | pkg_dir.parent.mkdir(parents=True, exist_ok=True) |
| | | subprocess.run( |
| | | ["git", "clone", pkg_info.git_url, str(pkg_dir)], |
| | | check=True, |
| | | capture_output=True, |
| | | ) |
| | | return True |
| | | |
| | | def _is_vcs_package(self, package_dir: Path) -> bool: |
| | | """Check if package is a VCS package (needs rebuild on each run). |
| | | |
| | | Args: |
| | | package_dir: Path to package directory |
| | | |
| | | Returns: |
| | | True if VCS package |
| | | """ |
| | | pkgbuild = package_dir / "PKGBUILD" |
| | | if not pkgbuild.exists(): |
| | | return False |
| | | |
| | | content = pkgbuild.read_text() |
| | | return "pkgver()" in content |
| | | |
| | | async def build_package( |
| | | self, |
| | | package: str, |
| | | force: bool = False, |
| | | ) -> BuildResult: |
| | | """Build a single package. |
| | | |
| | | Args: |
| | | package: Package name |
| | | force: Force rebuild even if up to date |
| | | |
| | | Returns: |
| | | BuildResult with status and artifacts |
| | | """ |
| | | start_time = datetime.now() |
| | | pkg_dir = self._get_package_dir(package) |
| | | override = self._get_override(package) |
| | | |
| | | logger.info(f"Building package: {package}") |
| | | |
| | | try: |
| | | # Clone or update |
| | | has_updates = await self._clone_or_update(package) |
| | | is_vcs = self._is_vcs_package(pkg_dir) |
| | | |
| | | # Skip if no updates and not forced |
| | | if not has_updates and not is_vcs and not force: |
| | | logger.info(f"Skipping {package}: already up to date") |
| | | return BuildResult( |
| | | package=package, |
| | | status=BuildStatus.SKIPPED, |
| | | duration=(datetime.now() - start_time).total_seconds(), |
| | | ) |
| | | |
| | | # Run build with retries |
| | | last_error = "" |
| | | for attempt in range(self.config.building.retry_attempts): |
| | | if attempt > 0: |
| | | delay = self.config.building.retry_delay * (2 ** (attempt - 1)) |
| | | logger.warning( |
| | | f"Retrying {package} (attempt {attempt + 1}/" |
| | | f"{self.config.building.retry_attempts}) after {delay}s" |
| | | ) |
| | | await asyncio.sleep(delay) |
| | | |
| | | # Run makepkg in executor |
| | | loop = asyncio.get_event_loop() |
| | | success, error, artifacts = await loop.run_in_executor( |
| | | self._executor, |
| | | _run_makepkg, |
| | | pkg_dir, |
| | | self.config.signing.enabled, |
| | | self.config.signing.key, |
| | | self.config.building.clean, |
| | | override.skip_checksums, |
| | | override.extra_args, |
| | | override.env, |
| | | ) |
| | | |
| | | if success: |
| | | duration = (datetime.now() - start_time).total_seconds() |
| | | logger.info(f"Successfully built {package} in {duration:.1f}s") |
| | | return BuildResult( |
| | | package=package, |
| | | status=BuildStatus.SUCCESS, |
| | | duration=duration, |
| | | artifacts=artifacts, |
| | | ) |
| | | |
| | | last_error = error |
| | | |
| | | # All retries failed |
| | | duration = (datetime.now() - start_time).total_seconds() |
| | | logger.error(f"Failed to build {package}: {last_error}") |
| | | return BuildResult( |
| | | package=package, |
| | | status=BuildStatus.FAILED, |
| | | duration=duration, |
| | | error=last_error, |
| | | ) |
| | | |
| | | except Exception as e: |
| | | duration = (datetime.now() - start_time).total_seconds() |
| | | logger.exception(f"Error building {package}") |
| | | return BuildResult( |
| | | package=package, |
| | | status=BuildStatus.FAILED, |
| | | duration=duration, |
| | | error=str(e), |
| | | ) |
| | | |
| | | async def build_all( |
| | | self, |
| | | force: bool = False, |
| | | ) -> list[BuildResult]: |
| | | """Build all packages in build directory. |
| | | |
| | | Args: |
| | | force: Force rebuild all packages |
| | | |
| | | Returns: |
| | | List of build results |
| | | """ |
| | | # Update system if configured |
| | | if self.config.building.update_system: |
| | | logger.info("Updating system...") |
| | | subprocess.run( |
| | | ["sudo", "pacman", "-Syu", "--noconfirm"], |
| | | check=False, |
| | | ) |
| | | |
| | | # Find all packages |
| | | build_dir = self.config.repository.build_dir |
| | | packages = [ |
| | | d.name for d in build_dir.iterdir() |
| | | if d.is_dir() and not d.name.startswith(".") |
| | | ] |
| | | |
| | | if not packages: |
| | | logger.warning("No packages found in build directory") |
| | | return [] |
| | | |
| | | logger.info(f"Building {len(packages)} packages") |
| | | |
| | | # Build in parallel or sequentially |
| | | if self.config.building.parallel: |
| | | tasks = [self.build_package(pkg, force) for pkg in packages] |
| | | results = await asyncio.gather(*tasks) |
| | | else: |
| | | results = [] |
| | | for pkg in packages: |
| | | result = await self.build_package(pkg, force) |
| | | results.append(result) |
| | | |
| | | # Summary |
| | | success = sum(1 for r in results if r.status == BuildStatus.SUCCESS) |
| | | failed = sum(1 for r in results if r.status == BuildStatus.FAILED) |
| | | skipped = sum(1 for r in results if r.status == BuildStatus.SKIPPED) |
| | | |
| | | logger.info(f"Build complete: {success} succeeded, {failed} failed, {skipped} skipped") |
| | | |
| | | return list(results) |
| | | |
| | | async def add_package(self, package: str) -> BuildResult: |
| | | """Add and build a new package with dependencies. |
| | | |
| | | Args: |
| | | package: Package name |
| | | |
| | | Returns: |
| | | BuildResult for the main package |
| | | """ |
| | | logger.info(f"Adding package: {package}") |
| | | |
| | | # Resolve dependencies |
| | | build_order = await self.resolver.resolve([package]) |
| | | |
| | | # Build dependencies first |
| | | results: list[BuildResult] = [] |
| | | for dep in build_order: |
| | | if dep != package: |
| | | logger.info(f"Building dependency: {dep}") |
| | | result = await self.build_package(dep, force=True) |
| | | results.append(result) |
| | | |
| | | if result.status == BuildStatus.FAILED: |
| | | logger.error(f"Dependency {dep} failed, aborting") |
| | | return BuildResult( |
| | | package=package, |
| | | status=BuildStatus.FAILED, |
| | | error=f"Dependency {dep} failed to build", |
| | | ) |
| | | |
| | | # Build main package |
| | | return await self.build_package(package, force=True) |
| | | |
| | | def remove_package(self, package: str) -> bool: |
| | | """Remove a package from the build directory. |
| | | |
| | | Args: |
| | | package: Package name |
| | | |
| | | Returns: |
| | | True if removed successfully |
| | | """ |
| | | pkg_dir = self._get_package_dir(package) |
| | | |
| | | if pkg_dir.exists(): |
| | | shutil.rmtree(pkg_dir) |
| | | logger.info(f"Removed package: {package}") |
| | | return True |
| | | |
| | | logger.warning(f"Package not found: {package}") |
| | | return False |
| New file |
| | |
| | | """Command-line interface using Click.""" |
| | | |
| | | import asyncio |
| | | import sys |
| | | from pathlib import Path |
| | | from typing import Any |
| | | |
| | | import click |
| | | from rich.console import Console |
| | | from rich.table import Table |
| | | |
| | | from archbuild import __version__ |
| | | from archbuild.aur import AURClient |
| | | from archbuild.builder import Builder, BuildStatus |
| | | from archbuild.config import Config, load_config, migrate_vars_sh, save_config |
| | | from archbuild.logging import console as log_console, setup_logging |
| | | from archbuild.notifications import NotificationManager |
| | | from archbuild.repo import RepoManager |
| | | |
| | | console = Console() |
| | | |
| | | |
| | | def run_async(coro: Any) -> Any: |
| | | """Run async function in sync context.""" |
| | | return asyncio.get_event_loop().run_until_complete(coro) |
| | | |
| | | |
| | | class Context: |
| | | """CLI context holding shared state.""" |
| | | |
| | | def __init__(self, config_path: Path): |
| | | self.config_path = config_path |
| | | self._config: Config | None = None |
| | | |
| | | @property |
| | | def config(self) -> Config: |
| | | if self._config is None: |
| | | self._config = load_config(self.config_path) |
| | | setup_logging(self._config.log_level, self._config.log_file) |
| | | return self._config |
| | | |
| | | |
| | | pass_context = click.make_pass_decorator(Context) |
| | | |
| | | |
| | | @click.group() |
| | | @click.option( |
| | | "-c", "--config", |
| | | type=click.Path(exists=False, path_type=Path), |
| | | default=Path("config.yaml"), |
| | | help="Path to configuration file", |
| | | ) |
| | | @click.version_option(__version__, prog_name="archbuild") |
| | | @click.pass_context |
| | | def cli(ctx: click.Context, config: Path) -> None: |
| | | """Archbuild - Automatic AUR package building and repository management. |
| | | |
| | | A modern, sustainable replacement for legacy Bash-based AUR build systems. |
| | | """ |
| | | ctx.obj = Context(config) |
| | | |
| | | |
| | | @cli.command() |
| | | @click.option("--force", "-f", is_flag=True, help="Force rebuild all packages") |
| | | @pass_context |
| | | def build_all(ctx: Context, force: bool) -> None: |
| | | """Build all packages in the build directory.""" |
| | | config = ctx.config |
| | | |
| | | async def _build_all() -> None: |
| | | async with AURClient() as aur: |
| | | async with Builder(config, aur) as builder: |
| | | results = await builder.build_all(force=force) |
| | | |
| | | # Add to repository |
| | | repo = RepoManager(config) |
| | | for result in results: |
| | | if result.status == BuildStatus.SUCCESS: |
| | | repo.add_packages(result) |
| | | |
| | | # Send notifications |
| | | notifier = NotificationManager(config) |
| | | await notifier.notify(results) |
| | | |
| | | # Print summary |
| | | _print_results(results) |
| | | |
| | | run_async(_build_all()) |
| | | |
| | | |
| | | @cli.command() |
| | | @click.argument("package") |
| | | @click.option("--force", "-f", is_flag=True, help="Force rebuild package") |
| | | @pass_context |
| | | def build(ctx: Context, package: str, force: bool) -> None: |
| | | """Build a specific package in the build directory.""" |
| | | config = ctx.config |
| | | |
| | | async def _build() -> None: |
| | | async with AURClient() as aur: |
| | | async with Builder(config, aur) as builder: |
| | | result = await builder.build_package(package, force=force) |
| | | |
| | | if result.status == BuildStatus.SUCCESS: |
| | | repo = RepoManager(config) |
| | | repo.add_packages(result) |
| | | |
| | | # Send notifications |
| | | notifier = NotificationManager(config) |
| | | await notifier.notify([result]) |
| | | |
| | | # Print summary |
| | | _print_results([result]) |
| | | |
| | | run_async(_build()) |
| | | |
| | | |
| | | @cli.command() |
| | | @click.argument("packages", nargs=-1, required=True) |
| | | @pass_context |
| | | def add(ctx: Context, packages: tuple[str, ...]) -> None: |
| | | """Add and build new packages from the AUR.""" |
| | | config = ctx.config |
| | | |
| | | async def _add() -> None: |
| | | async with AURClient() as aur: |
| | | async with Builder(config, aur) as builder: |
| | | repo = RepoManager(config) |
| | | |
| | | results = [] |
| | | for package in packages: |
| | | console.print(f"[bold blue]Adding package:[/] {package}") |
| | | result = await builder.add_package(package) |
| | | results.append(result) |
| | | |
| | | if result.status == BuildStatus.SUCCESS: |
| | | repo.add_packages(result) |
| | | console.print(f"[green]✓[/] {package} added successfully") |
| | | else: |
| | | console.print(f"[red]✗[/] {package} failed: {result.error}") |
| | | |
| | | _print_results(results) |
| | | |
| | | run_async(_add()) |
| | | |
| | | |
| | | @cli.command() |
| | | @click.argument("packages", nargs=-1, required=True) |
| | | @click.option("--all-official", "-a", is_flag=True, help="Remove packages that moved to official repos") |
| | | @pass_context |
| | | def remove(ctx: Context, packages: tuple[str, ...], all_official: bool) -> None: |
| | | """Remove packages from the repository and build directory.""" |
| | | config = ctx.config |
| | | |
| | | async def _remove() -> None: |
| | | async with AURClient() as aur: |
| | | async with Builder(config, aur) as builder: |
| | | repo = RepoManager(config) |
| | | |
| | | if all_official: |
| | | # Find packages now in official repos |
| | | from archbuild.resolver import DependencyResolver |
| | | resolver = DependencyResolver(aur) |
| | | |
| | | for pkg in repo.list_packages(): |
| | | if resolver.is_in_official_repos(pkg.name): |
| | | console.print(f"[yellow]Removing {pkg.name}[/] (now in official repos)") |
| | | builder.remove_package(pkg.name) |
| | | repo.remove_package(pkg.name) |
| | | else: |
| | | for package in packages: |
| | | builder.remove_package(package) |
| | | repo.remove_package(package) |
| | | console.print(f"[green]✓[/] Removed {package}") |
| | | |
| | | run_async(_remove()) |
| | | |
| | | |
| | | @cli.command() |
| | | @pass_context |
| | | def check(ctx: Context) -> None: |
| | | """Check for packages moved to official repos or removed from AUR.""" |
| | | config = ctx.config |
| | | |
| | | async def _check() -> None: |
| | | async with AURClient() as aur: |
| | | from archbuild.resolver import DependencyResolver |
| | | resolver = DependencyResolver(aur) |
| | | repo = RepoManager(config) |
| | | |
| | | packages = repo.list_packages() |
| | | in_official: list[str] = [] |
| | | not_in_aur: list[str] = [] |
| | | |
| | | with console.status("Checking packages..."): |
| | | for pkg in packages: |
| | | if resolver.is_in_official_repos(pkg.name): |
| | | in_official.append(pkg.name) |
| | | elif not await aur.is_available(pkg.name): |
| | | not_in_aur.append(pkg.name) |
| | | |
| | | if in_official: |
| | | console.print("\n[yellow]Packages now in official repos:[/]") |
| | | for pkg in in_official: |
| | | console.print(f" • {pkg}") |
| | | |
| | | if not_in_aur: |
| | | console.print("\n[red]Packages not found in AUR:[/]") |
| | | for pkg in not_in_aur: |
| | | console.print(f" • {pkg}") |
| | | |
| | | if not in_official and not not_in_aur: |
| | | console.print("[green]All packages OK[/]") |
| | | |
| | | run_async(_check()) |
| | | |
| | | |
| | | @cli.command() |
| | | @pass_context |
| | | def remake(ctx: Context) -> None: |
| | | """Rebuild the repository database from scratch.""" |
| | | config = ctx.config |
| | | repo = RepoManager(config) |
| | | |
| | | if repo.rebuild_database(): |
| | | console.print("[green]Repository database rebuilt successfully[/]") |
| | | else: |
| | | console.print("[red]Failed to rebuild repository database[/]") |
| | | sys.exit(1) |
| | | |
| | | |
| | | @cli.command() |
| | | @pass_context |
| | | def cleanup(ctx: Context) -> None: |
| | | """Clean up old package versions based on retention settings.""" |
| | | config = ctx.config |
| | | repo = RepoManager(config) |
| | | |
| | | removed = repo.cleanup() |
| | | console.print(f"[green]Removed {removed} old package version(s)[/]") |
| | | |
| | | |
| | | @cli.command("list") |
| | | @pass_context |
| | | def list_packages(ctx: Context) -> None: |
| | | """List all packages in the repository.""" |
| | | config = ctx.config |
| | | repo = RepoManager(config) |
| | | |
| | | packages = repo.list_packages() |
| | | |
| | | if not packages: |
| | | console.print("[yellow]No packages in repository[/]") |
| | | return |
| | | |
| | | table = Table(title=f"Packages in {config.repository.name}") |
| | | table.add_column("Name", style="cyan") |
| | | table.add_column("Version", style="green") |
| | | table.add_column("Size", justify="right") |
| | | table.add_column("Modified", style="dim") |
| | | |
| | | for pkg in packages: |
| | | size_mb = pkg.size / (1024 * 1024) |
| | | table.add_row( |
| | | pkg.name, |
| | | pkg.version, |
| | | f"{size_mb:.1f} MB", |
| | | pkg.modified.strftime("%Y-%m-%d %H:%M"), |
| | | ) |
| | | |
| | | console.print(table) |
| | | |
| | | |
| | | @cli.command() |
| | | @pass_context |
| | | def test_notifications(ctx: Context) -> None: |
| | | """Test notification configuration by sending test messages.""" |
| | | config = ctx.config |
| | | |
| | | async def _test() -> None: |
| | | notifier = NotificationManager(config) |
| | | results = await notifier.test() |
| | | |
| | | for backend, success in results.items(): |
| | | if success: |
| | | console.print(f"[green]✓[/] {backend}: OK") |
| | | else: |
| | | console.print(f"[red]✗[/] {backend}: Failed") |
| | | |
| | | run_async(_test()) |
| | | |
| | | |
| | | @cli.command() |
| | | @click.argument("vars_file", type=click.Path(exists=True, path_type=Path)) |
| | | @click.option( |
| | | "-o", "--output", |
| | | type=click.Path(path_type=Path), |
| | | default=Path("config.yaml"), |
| | | help="Output config file path", |
| | | ) |
| | | def migrate_config(vars_file: Path, output: Path) -> None: |
| | | """Migrate legacy vars.sh to new YAML config format.""" |
| | | console.print(f"[blue]Migrating {vars_file} to {output}...[/]") |
| | | |
| | | try: |
| | | data = migrate_vars_sh(vars_file) |
| | | config = Config.model_validate(data) |
| | | save_config(config, output) |
| | | console.print(f"[green]✓[/] Configuration saved to {output}") |
| | | console.print("[yellow]Note:[/] Please review and update the generated config.") |
| | | except Exception as e: |
| | | console.print(f"[red]Migration failed:[/] {e}") |
| | | sys.exit(1) |
| | | |
| | | |
| | | @cli.command() |
| | | @pass_context |
| | | def init(ctx: Context) -> None: |
| | | """Initialize repository directories and configuration.""" |
| | | config = ctx.config |
| | | |
| | | # Create directories |
| | | config.repository.path.mkdir(parents=True, exist_ok=True) |
| | | config.repository.build_dir.mkdir(parents=True, exist_ok=True) |
| | | |
| | | repo = RepoManager(config) |
| | | repo.ensure_repo_exists() |
| | | |
| | | console.print(f"[green]✓[/] Repository initialized at {config.repository.path}") |
| | | console.print(f"[green]✓[/] Build directory: {config.repository.build_dir}") |
| | | |
| | | # Check pacman.conf |
| | | pacman_conf = Path("/etc/pacman.conf") |
| | | if pacman_conf.exists(): |
| | | content = pacman_conf.read_text() |
| | | if config.repository.name not in content: |
| | | console.print( |
| | | f"\n[yellow]Note:[/] Add this to /etc/pacman.conf:\n" |
| | | f"[{config.repository.name}]\n" |
| | | f"SigLevel = Optional TrustAll\n" |
| | | f"Server = file://{config.repository.path}" |
| | | ) |
| | | |
| | | |
| | | def _print_results(results: list[Any]) -> None: |
| | | """Print build results summary table.""" |
| | | if not results: |
| | | return |
| | | |
| | | table = Table(title="Build Results") |
| | | table.add_column("Package", style="cyan") |
| | | table.add_column("Status") |
| | | table.add_column("Duration", justify="right") |
| | | table.add_column("Error", style="dim", max_width=40) |
| | | |
| | | for result in results: |
| | | status_style = { |
| | | BuildStatus.SUCCESS: "[green]✓ Success[/]", |
| | | BuildStatus.FAILED: "[red]✗ Failed[/]", |
| | | BuildStatus.SKIPPED: "[yellow]⏭ Skipped[/]", |
| | | BuildStatus.PENDING: "[blue]⏳ Pending[/]", |
| | | BuildStatus.BUILDING: "[blue]⚙ Building[/]", |
| | | } |
| | | |
| | | table.add_row( |
| | | result.package, |
| | | status_style.get(result.status, str(result.status)), |
| | | f"{result.duration:.1f}s", |
| | | (result.error or "")[:40] if result.error else "", |
| | | ) |
| | | |
| | | console.print(table) |
| | | |
| | | |
| | | def main() -> None: |
| | | """Entry point for the CLI.""" |
| | | cli() |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | main() |
| New file |
| | |
| | | """Configuration loading and validation using Pydantic.""" |
| | | |
| | | from pathlib import Path |
| | | from typing import Any |
| | | |
| | | import yaml |
| | | from pydantic import BaseModel, Field, field_validator |
| | | |
| | | |
| | | class RepositoryConfig(BaseModel): |
| | | """Repository settings.""" |
| | | |
| | | name: str = Field(..., description="Repository name") |
| | | path: Path = Field(..., description="Path to repository directory") |
| | | build_dir: Path = Field(..., description="Path to build directory") |
| | | compression: str = Field(default="zst", description="Compression format") |
| | | |
| | | |
| | | class BuildingConfig(BaseModel): |
| | | """Build settings.""" |
| | | |
| | | parallel: bool = Field(default=True, description="Enable parallel builds") |
| | | max_workers: int = Field(default=4, ge=1, le=32, description="Maximum parallel workers") |
| | | clean: bool = Field(default=True, description="Clean build directory after build") |
| | | update_system: bool = Field(default=False, description="Update system before building") |
| | | retry_attempts: int = Field(default=3, ge=1, le=10, description="Retry attempts on failure") |
| | | retry_delay: int = Field(default=5, ge=1, description="Base delay between retries (seconds)") |
| | | |
| | | |
| | | class SigningConfig(BaseModel): |
| | | """Package signing settings.""" |
| | | |
| | | enabled: bool = Field(default=False, description="Enable package signing") |
| | | key: str = Field(default="", description="GPG key ID for signing") |
| | | |
| | | |
| | | class EmailConfig(BaseModel): |
| | | """Email notification settings.""" |
| | | |
| | | enabled: bool = Field(default=False, description="Enable email notifications") |
| | | to: str = Field(default="", description="Recipient email address") |
| | | from_addr: str = Field(default="", alias="from", description="Sender email address") |
| | | smtp_host: str = Field(default="localhost", description="SMTP server host") |
| | | smtp_port: int = Field(default=25, description="SMTP server port") |
| | | use_tls: bool = Field(default=False, description="Use TLS for SMTP") |
| | | username: str = Field(default="", description="SMTP username") |
| | | password: str = Field(default="", description="SMTP password") |
| | | |
| | | |
| | | class WebhookConfig(BaseModel): |
| | | """Webhook notification settings (extensible for future use).""" |
| | | |
| | | enabled: bool = Field(default=False, description="Enable webhook notifications") |
| | | url: str = Field(default="", description="Webhook URL") |
| | | type: str = Field(default="generic", description="Webhook type (generic, discord, slack)") |
| | | |
| | | |
| | | class NotificationsConfig(BaseModel): |
| | | """Notification settings.""" |
| | | |
| | | email: EmailConfig = Field(default_factory=EmailConfig) |
| | | webhooks: list[WebhookConfig] = Field(default_factory=list) |
| | | |
| | | |
| | | class PackageRetentionConfig(BaseModel): |
| | | """Package retention settings.""" |
| | | |
| | | keep_versions: int = Field(default=3, ge=1, le=100, description="Number of old versions to keep") |
| | | cleanup_on_build: bool = Field(default=True, description="Clean old versions after build") |
| | | |
| | | |
| | | class PackageOverride(BaseModel): |
| | | """Per-package build overrides.""" |
| | | |
| | | skip_checksums: bool = Field(default=False, description="Skip checksum verification") |
| | | extra_args: list[str] = Field(default_factory=list, description="Extra makepkg arguments") |
| | | env: dict[str, str] = Field(default_factory=dict, description="Environment variables") |
| | | |
| | | |
| | | class Config(BaseModel): |
| | | """Main configuration model.""" |
| | | |
| | | repository: RepositoryConfig |
| | | building: BuildingConfig = Field(default_factory=BuildingConfig) |
| | | signing: SigningConfig = Field(default_factory=SigningConfig) |
| | | notifications: NotificationsConfig = Field(default_factory=NotificationsConfig) |
| | | retention: PackageRetentionConfig = Field(default_factory=PackageRetentionConfig) |
| | | package_overrides: dict[str, PackageOverride] = Field( |
| | | default_factory=dict, description="Per-package overrides" |
| | | ) |
| | | log_level: str = Field(default="INFO", description="Logging level") |
| | | log_file: Path | None = Field(default=None, description="Optional log file path") |
| | | |
| | | @field_validator("log_level") |
| | | @classmethod |
| | | def validate_log_level(cls, v: str) -> str: |
| | | valid_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"} |
| | | if v.upper() not in valid_levels: |
| | | raise ValueError(f"Invalid log level: {v}. Must be one of {valid_levels}") |
| | | return v.upper() |
| | | |
| | | |
| | | def load_config(path: Path) -> Config: |
| | | """Load and validate configuration from YAML file. |
| | | |
| | | Args: |
| | | path: Path to configuration file |
| | | |
| | | Returns: |
| | | Validated Config object |
| | | |
| | | Raises: |
| | | FileNotFoundError: If config file doesn't exist |
| | | ValidationError: If config is invalid |
| | | """ |
| | | if not path.exists(): |
| | | raise FileNotFoundError(f"Configuration file not found: {path}") |
| | | |
| | | with open(path) as f: |
| | | data = yaml.safe_load(f) |
| | | |
| | | return Config.model_validate(data) |
| | | |
| | | |
| | | def migrate_vars_sh(vars_path: Path) -> dict[str, Any]: |
| | | """Migrate old vars.sh to new config format. |
| | | |
| | | Args: |
| | | vars_path: Path to vars.sh file |
| | | |
| | | Returns: |
| | | Dictionary suitable for Config.model_validate() |
| | | """ |
| | | variables: dict[str, str] = {} |
| | | |
| | | with open(vars_path) as f: |
| | | for line in f: |
| | | line = line.strip() |
| | | if line and not line.startswith("#") and "=" in line: |
| | | # Handle export statements |
| | | if line.startswith("export "): |
| | | line = line[7:] |
| | | key, _, value = line.partition("=") |
| | | # Remove quotes |
| | | value = value.strip("'\"") |
| | | variables[key] = value |
| | | |
| | | # Convert to new format |
| | | config: dict[str, Any] = { |
| | | "repository": { |
| | | "name": variables.get("REPONAME", ""), |
| | | "path": variables.get("REPODIR", "/repo/x86_64"), |
| | | "build_dir": variables.get("BUILDDIR", "/repo/build"), |
| | | "compression": variables.get("COMPRESSION", "zst"), |
| | | }, |
| | | "building": { |
| | | "parallel": variables.get("PARALLEL", "N") == "Y", |
| | | "clean": variables.get("CLEAN", "N") == "Y", |
| | | "update_system": variables.get("UPDATE", "N") == "Y", |
| | | }, |
| | | "signing": { |
| | | "enabled": variables.get("SIGN", "N") == "Y", |
| | | "key": variables.get("KEY", ""), |
| | | }, |
| | | "notifications": { |
| | | "email": { |
| | | "enabled": bool(variables.get("TO_EMAIL")), |
| | | "to": variables.get("TO_EMAIL", ""), |
| | | "from": variables.get("FROM_EMAIL", ""), |
| | | } |
| | | }, |
| | | "retention": { |
| | | "keep_versions": int(variables.get("NUM_OLD", "5")), |
| | | }, |
| | | } |
| | | |
| | | return config |
| | | |
| | | |
| | | def save_config(config: Config, path: Path) -> None: |
| | | """Save configuration to YAML file. |
| | | |
| | | Args: |
| | | config: Config object to save |
| | | path: Path to save configuration file |
| | | """ |
| | | data = config.model_dump(by_alias=True, exclude_none=True) |
| | | |
| | | # Convert Path objects to strings for YAML |
| | | def convert_paths(obj: Any) -> Any: |
| | | if isinstance(obj, Path): |
| | | return str(obj) |
| | | elif isinstance(obj, dict): |
| | | return {k: convert_paths(v) for k, v in obj.items()} |
| | | elif isinstance(obj, list): |
| | | return [convert_paths(v) for v in obj] |
| | | return obj |
| | | |
| | | data = convert_paths(data) |
| | | |
| | | with open(path, "w") as f: |
| | | yaml.dump(data, f, default_flow_style=False, sort_keys=False) |
| New file |
| | |
| | | """Structured logging setup using Rich.""" |
| | | |
| | | import logging |
| | | from pathlib import Path |
| | | |
| | | from rich.console import Console |
| | | from rich.logging import RichHandler |
| | | |
| | | console = Console() |
| | | |
| | | |
| | | def setup_logging( |
| | | level: str = "INFO", |
| | | log_file: Path | None = None, |
| | | ) -> logging.Logger: |
| | | """Configure logging with Rich handler for pretty console output. |
| | | |
| | | Args: |
| | | level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) |
| | | log_file: Optional path to log file |
| | | |
| | | Returns: |
| | | Configured logger instance |
| | | """ |
| | | handlers: list[logging.Handler] = [ |
| | | RichHandler( |
| | | console=console, |
| | | rich_tracebacks=True, |
| | | tracebacks_show_locals=True, |
| | | show_time=True, |
| | | show_path=False, |
| | | ) |
| | | ] |
| | | |
| | | if log_file: |
| | | log_file.parent.mkdir(parents=True, exist_ok=True) |
| | | file_handler = logging.FileHandler(log_file) |
| | | file_handler.setFormatter( |
| | | logging.Formatter("%(asctime)s | %(levelname)-8s | %(name)s | %(message)s") |
| | | ) |
| | | handlers.append(file_handler) |
| | | |
| | | logging.basicConfig( |
| | | level=getattr(logging, level.upper()), |
| | | format="%(message)s", |
| | | datefmt="[%X]", |
| | | handlers=handlers, |
| | | ) |
| | | |
| | | return logging.getLogger("archbuild") |
| | | |
| | | |
| | | def get_logger(name: str) -> logging.Logger: |
| | | """Get a child logger for a specific module. |
| | | |
| | | Args: |
| | | name: Module name for the logger |
| | | |
| | | Returns: |
| | | Logger instance |
| | | """ |
| | | return logging.getLogger(f"archbuild.{name}") |
| New file |
| | |
| | | """Notification system with email and extensible webhook support.""" |
| | | |
| | | import asyncio |
| | | import smtplib |
| | | import ssl |
| | | from abc import ABC, abstractmethod |
| | | from dataclasses import dataclass |
| | | from datetime import datetime |
| | | from email.mime.multipart import MIMEMultipart |
| | | from email.mime.text import MIMEText |
| | | from typing import Any |
| | | |
| | | import aiohttp |
| | | |
| | | from archbuild.builder import BuildResult, BuildStatus |
| | | from archbuild.config import Config, EmailConfig, WebhookConfig |
| | | from archbuild.logging import get_logger |
| | | |
| | | logger = get_logger("notifications") |
| | | |
| | | |
| | | @dataclass |
| | | class BuildSummary: |
| | | """Summary of build results for notifications.""" |
| | | |
| | | total: int |
| | | success: int |
| | | failed: int |
| | | skipped: int |
| | | failed_packages: list[str] |
| | | duration: float |
| | | timestamp: datetime |
| | | |
| | | @classmethod |
| | | def from_results(cls, results: list[BuildResult]) -> "BuildSummary": |
| | | """Create summary from build results.""" |
| | | return cls( |
| | | total=len(results), |
| | | success=sum(1 for r in results if r.status == BuildStatus.SUCCESS), |
| | | failed=sum(1 for r in results if r.status == BuildStatus.FAILED), |
| | | skipped=sum(1 for r in results if r.status == BuildStatus.SKIPPED), |
| | | failed_packages=[r.package for r in results if r.status == BuildStatus.FAILED], |
| | | duration=sum(r.duration for r in results), |
| | | timestamp=datetime.now(), |
| | | ) |
| | | |
| | | |
| | | class NotificationBackend(ABC): |
| | | """Abstract base class for notification backends.""" |
| | | |
| | | @abstractmethod |
| | | async def send(self, summary: BuildSummary, config: Config) -> bool: |
| | | """Send notification. |
| | | |
| | | Args: |
| | | summary: Build summary |
| | | config: Application config |
| | | |
| | | Returns: |
| | | True if sent successfully |
| | | """ |
| | | pass |
| | | |
| | | |
| | | class EmailBackend(NotificationBackend): |
| | | """Email notification backend.""" |
| | | |
| | | def __init__(self, email_config: EmailConfig): |
| | | """Initialize email backend. |
| | | |
| | | Args: |
| | | email_config: Email configuration |
| | | """ |
| | | self.config = email_config |
| | | |
| | | def _format_message(self, summary: BuildSummary, repo_name: str) -> str: |
| | | """Format email message body. |
| | | |
| | | Args: |
| | | summary: Build summary |
| | | repo_name: Repository name |
| | | |
| | | Returns: |
| | | Formatted message string |
| | | """ |
| | | lines = [ |
| | | f"Build Report for {repo_name}", |
| | | f"Time: {summary.timestamp.strftime('%Y-%m-%d %H:%M:%S')}", |
| | | "", |
| | | f"Total packages: {summary.total}", |
| | | f" Successful: {summary.success}", |
| | | f" Failed: {summary.failed}", |
| | | f" Skipped: {summary.skipped}", |
| | | f"Total duration: {summary.duration:.1f}s", |
| | | ] |
| | | |
| | | if summary.failed_packages: |
| | | lines.extend([ |
| | | "", |
| | | "Failed packages:", |
| | | ]) |
| | | for pkg in summary.failed_packages: |
| | | lines.append(f" - {pkg}") |
| | | |
| | | return "\n".join(lines) |
| | | |
| | | async def send(self, summary: BuildSummary, config: Config) -> bool: |
| | | """Send email notification.""" |
| | | if not self.config.enabled: |
| | | return True |
| | | |
| | | if not self.config.to: |
| | | logger.warning("Email notification enabled but no recipient configured") |
| | | return False |
| | | |
| | | # Only send on failures |
| | | if summary.failed == 0: |
| | | logger.debug("No failures, skipping email notification") |
| | | return True |
| | | |
| | | try: |
| | | msg = MIMEMultipart() |
| | | msg["From"] = self.config.from_addr or f"archbuild@localhost" |
| | | msg["To"] = self.config.to |
| | | msg["Subject"] = f"Build Errors - {config.repository.name}" |
| | | |
| | | body = self._format_message(summary, config.repository.name) |
| | | msg.attach(MIMEText(body, "plain")) |
| | | |
| | | # Send email |
| | | loop = asyncio.get_event_loop() |
| | | await loop.run_in_executor(None, self._send_email, msg) |
| | | |
| | | logger.info(f"Sent email notification to {self.config.to}") |
| | | return True |
| | | |
| | | except Exception as e: |
| | | logger.error(f"Failed to send email: {e}") |
| | | return False |
| | | |
| | | def _send_email(self, msg: MIMEMultipart) -> None: |
| | | """Send email synchronously (called from executor).""" |
| | | if self.config.use_tls: |
| | | context = ssl.create_default_context() |
| | | with smtplib.SMTP_SSL( |
| | | self.config.smtp_host, |
| | | self.config.smtp_port, |
| | | context=context, |
| | | ) as server: |
| | | if self.config.username and self.config.password: |
| | | server.login(self.config.username, self.config.password) |
| | | server.send_message(msg) |
| | | else: |
| | | with smtplib.SMTP(self.config.smtp_host, self.config.smtp_port) as server: |
| | | if self.config.username and self.config.password: |
| | | server.login(self.config.username, self.config.password) |
| | | server.send_message(msg) |
| | | |
| | | |
| | | class WebhookBackend(NotificationBackend): |
| | | """Generic webhook notification backend. |
| | | |
| | | Extensible for Discord, Slack, ntfy, Gotify, etc. |
| | | """ |
| | | |
| | | def __init__(self, webhook_config: WebhookConfig): |
| | | """Initialize webhook backend. |
| | | |
| | | Args: |
| | | webhook_config: Webhook configuration |
| | | """ |
| | | self.config = webhook_config |
| | | |
| | | def _format_payload(self, summary: BuildSummary, repo_name: str) -> dict[str, Any]: |
| | | """Format webhook payload based on webhook type. |
| | | |
| | | Args: |
| | | summary: Build summary |
| | | repo_name: Repository name |
| | | |
| | | Returns: |
| | | Payload dictionary |
| | | """ |
| | | base_content = ( |
| | | f"**Build Report for {repo_name}**\n" |
| | | f"✅ Success: {summary.success} | " |
| | | f"❌ Failed: {summary.failed} | " |
| | | f"⏭️ Skipped: {summary.skipped}" |
| | | ) |
| | | |
| | | if summary.failed_packages: |
| | | base_content += f"\n\nFailed: {', '.join(summary.failed_packages)}" |
| | | |
| | | if self.config.type == "discord": |
| | | return { |
| | | "content": base_content, |
| | | "embeds": [{ |
| | | "title": f"Build Report - {repo_name}", |
| | | "color": 0xFF0000 if summary.failed else 0x00FF00, |
| | | "fields": [ |
| | | {"name": "Success", "value": str(summary.success), "inline": True}, |
| | | {"name": "Failed", "value": str(summary.failed), "inline": True}, |
| | | {"name": "Skipped", "value": str(summary.skipped), "inline": True}, |
| | | ], |
| | | "timestamp": summary.timestamp.isoformat(), |
| | | }], |
| | | } |
| | | |
| | | elif self.config.type == "slack": |
| | | return { |
| | | "text": base_content, |
| | | "blocks": [ |
| | | { |
| | | "type": "section", |
| | | "text": {"type": "mrkdwn", "text": base_content}, |
| | | } |
| | | ], |
| | | } |
| | | |
| | | elif self.config.type == "ntfy": |
| | | return { |
| | | "topic": repo_name, |
| | | "title": f"Build Report - {repo_name}", |
| | | "message": base_content, |
| | | "priority": 5 if summary.failed else 3, |
| | | "tags": ["package", "failed"] if summary.failed else ["package"], |
| | | } |
| | | |
| | | else: # generic |
| | | return { |
| | | "repository": repo_name, |
| | | "summary": { |
| | | "total": summary.total, |
| | | "success": summary.success, |
| | | "failed": summary.failed, |
| | | "skipped": summary.skipped, |
| | | }, |
| | | "failed_packages": summary.failed_packages, |
| | | "timestamp": summary.timestamp.isoformat(), |
| | | } |
| | | |
| | | async def send(self, summary: BuildSummary, config: Config) -> bool: |
| | | """Send webhook notification.""" |
| | | if not self.config.enabled: |
| | | return True |
| | | |
| | | if not self.config.url: |
| | | logger.warning("Webhook notification enabled but no URL configured") |
| | | return False |
| | | |
| | | # Only send on failures (configurable in future) |
| | | if summary.failed == 0: |
| | | logger.debug("No failures, skipping webhook notification") |
| | | return True |
| | | |
| | | try: |
| | | payload = self._format_payload(summary, config.repository.name) |
| | | |
| | | async with aiohttp.ClientSession() as session: |
| | | async with session.post( |
| | | self.config.url, |
| | | json=payload, |
| | | timeout=aiohttp.ClientTimeout(total=30), |
| | | ) as response: |
| | | response.raise_for_status() |
| | | |
| | | logger.info(f"Sent webhook notification to {self.config.url}") |
| | | return True |
| | | |
| | | except Exception as e: |
| | | logger.error(f"Failed to send webhook: {e}") |
| | | return False |
| | | |
| | | |
| | | class NotificationManager: |
| | | """Manage multiple notification backends.""" |
| | | |
| | | def __init__(self, config: Config): |
| | | """Initialize notification manager. |
| | | |
| | | Args: |
| | | config: Application configuration |
| | | """ |
| | | self.config = config |
| | | self.backends: list[NotificationBackend] = [] |
| | | |
| | | # Add email backend if configured |
| | | if config.notifications.email.enabled: |
| | | self.backends.append(EmailBackend(config.notifications.email)) |
| | | |
| | | # Add webhook backends |
| | | for webhook in config.notifications.webhooks: |
| | | if webhook.enabled: |
| | | self.backends.append(WebhookBackend(webhook)) |
| | | |
| | | async def notify(self, results: list[BuildResult]) -> dict[str, bool]: |
| | | """Send notifications for build results. |
| | | |
| | | Args: |
| | | results: List of build results |
| | | |
| | | Returns: |
| | | Dict mapping backend type to success status |
| | | """ |
| | | summary = BuildSummary.from_results(results) |
| | | statuses: dict[str, bool] = {} |
| | | |
| | | for backend in self.backends: |
| | | name = type(backend).__name__ |
| | | try: |
| | | success = await backend.send(summary, self.config) |
| | | statuses[name] = success |
| | | except Exception as e: |
| | | logger.error(f"Notification backend {name} failed: {e}") |
| | | statuses[name] = False |
| | | |
| | | return statuses |
| | | |
| | | async def test(self) -> dict[str, bool]: |
| | | """Send test notification through all backends. |
| | | |
| | | Returns: |
| | | Dict mapping backend type to success status |
| | | """ |
| | | # Create dummy test summary |
| | | test_summary = BuildSummary( |
| | | total=3, |
| | | success=2, |
| | | failed=1, |
| | | skipped=0, |
| | | failed_packages=["test-package"], |
| | | duration=123.45, |
| | | timestamp=datetime.now(), |
| | | ) |
| | | |
| | | statuses: dict[str, bool] = {} |
| | | for backend in self.backends: |
| | | name = type(backend).__name__ |
| | | try: |
| | | success = await backend.send(test_summary, self.config) |
| | | statuses[name] = success |
| | | except Exception as e: |
| | | logger.error(f"Test notification failed for {name}: {e}") |
| | | statuses[name] = False |
| | | |
| | | return statuses |
| New file |
| | |
| | | """Repository management with repo-add/repo-remove wrappers.""" |
| | | |
| | | import shutil |
| | | import subprocess |
| | | from dataclasses import dataclass |
| | | from datetime import datetime |
| | | from pathlib import Path |
| | | from typing import Any |
| | | |
| | | from archbuild.builder import BuildResult, BuildStatus, FileLock |
| | | from archbuild.config import Config |
| | | from archbuild.logging import get_logger |
| | | |
| | | logger = get_logger("repo") |
| | | |
| | | |
| | | @dataclass |
| | | class PackageInfo: |
| | | """Information about a package in the repository.""" |
| | | |
| | | name: str |
| | | version: str |
| | | filename: str |
| | | size: int |
| | | modified: datetime |
| | | |
| | | |
| | | class RepoManager: |
| | | """Manage the pacman repository database.""" |
| | | |
| | | def __init__(self, config: Config): |
| | | """Initialize repository manager. |
| | | |
| | | Args: |
| | | config: Application configuration |
| | | """ |
| | | self.config = config |
| | | self._lock_path = config.repository.path / ".repo.lock" |
| | | |
| | | @property |
| | | def db_path(self) -> Path: |
| | | """Get path to repository database file.""" |
| | | compression = self.config.repository.compression or "zst" |
| | | return self.config.repository.path / f"{self.config.repository.name}.db.tar.{compression}" |
| | | |
| | | def _get_repo_lock(self) -> FileLock: |
| | | """Get lock for repository operations.""" |
| | | return FileLock(self._lock_path) |
| | | |
| | | def _run_repo_command(self, cmd: list[str]) -> subprocess.CompletedProcess[str]: |
| | | """Run a repo-add or repo-remove command. |
| | | |
| | | Args: |
| | | cmd: Command and arguments |
| | | |
| | | Returns: |
| | | Completed process result |
| | | """ |
| | | if self.config.signing.enabled and self.config.signing.key: |
| | | cmd.extend(["--sign", "--key", self.config.signing.key]) |
| | | |
| | | logger.debug(f"Running: {' '.join(cmd)}") |
| | | return subprocess.run( |
| | | cmd, |
| | | capture_output=True, |
| | | text=True, |
| | | cwd=self.config.repository.path, |
| | | ) |
| | | |
| | | def ensure_repo_exists(self) -> None: |
| | | """Ensure repository directory and database exist.""" |
| | | self.config.repository.path.mkdir(parents=True, exist_ok=True) |
| | | |
| | | if not self.db_path.exists(): |
| | | logger.info(f"Creating new repository: {self.config.repository.name}") |
| | | # Create empty database |
| | | result = self._run_repo_command([ |
| | | "repo-add", |
| | | str(self.db_path), |
| | | ]) |
| | | if result.returncode != 0: |
| | | logger.warning(f"Could not create empty database: {result.stderr}") |
| | | |
| | | def add_packages(self, build_result: BuildResult) -> bool: |
| | | """Add built packages to the repository. |
| | | |
| | | Args: |
| | | build_result: Result from package build |
| | | |
| | | Returns: |
| | | True if packages were added successfully |
| | | """ |
| | | if build_result.status != BuildStatus.SUCCESS: |
| | | logger.warning(f"Cannot add {build_result.package}: build was not successful") |
| | | return False |
| | | |
| | | if not build_result.artifacts: |
| | | logger.warning(f"No artifacts to add for {build_result.package}") |
| | | return False |
| | | |
| | | with self._get_repo_lock(): |
| | | self.ensure_repo_exists() |
| | | |
| | | # Remove old versions of this package |
| | | self._remove_old_packages(build_result.package) |
| | | |
| | | # Copy artifacts to repo directory |
| | | copied_files: list[Path] = [] |
| | | for artifact in build_result.artifacts: |
| | | dest = self.config.repository.path / artifact.name |
| | | shutil.copy2(artifact, dest) |
| | | copied_files.append(dest) |
| | | |
| | | # Also copy signature if exists |
| | | sig_path = artifact.with_suffix(artifact.suffix + ".sig") |
| | | if sig_path.exists(): |
| | | shutil.copy2(sig_path, self.config.repository.path / sig_path.name) |
| | | |
| | | # Add to database |
| | | result = self._run_repo_command([ |
| | | "repo-add", |
| | | str(self.db_path), |
| | | ] + [str(f) for f in copied_files]) |
| | | |
| | | if result.returncode != 0: |
| | | logger.error(f"Failed to add packages to database: {result.stderr}") |
| | | return False |
| | | |
| | | logger.info(f"Added {len(copied_files)} package(s) to repository") |
| | | return True |
| | | |
| | | def remove_package(self, package: str) -> bool: |
| | | """Remove a package from the repository. |
| | | |
| | | Args: |
| | | package: Package name to remove |
| | | |
| | | Returns: |
| | | True if removed successfully |
| | | """ |
| | | with self._get_repo_lock(): |
| | | # Remove from database |
| | | result = self._run_repo_command([ |
| | | "repo-remove", |
| | | str(self.db_path), |
| | | package, |
| | | ]) |
| | | |
| | | if result.returncode != 0: |
| | | logger.warning(f"Failed to remove {package} from database: {result.stderr}") |
| | | |
| | | # Remove package files |
| | | removed = 0 |
| | | for f in self.config.repository.path.glob(f"{package}-*.pkg.tar.*"): |
| | | f.unlink() |
| | | removed += 1 |
| | | |
| | | logger.info(f"Removed {package} ({removed} files)") |
| | | return True |
| | | |
| | | def _remove_old_packages(self, package: str) -> int: |
| | | """Remove old versions of a package beyond retention limit. |
| | | |
| | | Args: |
| | | package: Package name |
| | | |
| | | Returns: |
| | | Number of packages removed |
| | | """ |
| | | keep_versions = self.config.retention.keep_versions |
| | | pattern = f"{package}-*.pkg.tar.*" |
| | | |
| | | # Find all package files |
| | | files = list(self.config.repository.path.glob(pattern)) |
| | | files = [f for f in files if not f.name.endswith(".sig")] |
| | | |
| | | if len(files) <= keep_versions: |
| | | return 0 |
| | | |
| | | # Sort by modification time, oldest first |
| | | files.sort(key=lambda f: f.stat().st_mtime) |
| | | |
| | | # Remove oldest files exceeding retention |
| | | to_remove = files[:-keep_versions] if keep_versions > 0 else files |
| | | removed = 0 |
| | | |
| | | for f in to_remove: |
| | | f.unlink() |
| | | # Also remove signature |
| | | sig = f.with_suffix(f.suffix + ".sig") |
| | | if sig.exists(): |
| | | sig.unlink() |
| | | removed += 1 |
| | | |
| | | if removed: |
| | | logger.info(f"Cleaned up {removed} old version(s) of {package}") |
| | | |
| | | return removed |
| | | |
| | | def list_packages(self) -> list[PackageInfo]: |
| | | """List all packages in the repository. |
| | | |
| | | Returns: |
| | | List of PackageInfo objects |
| | | """ |
| | | packages: list[PackageInfo] = [] |
| | | |
| | | for f in self.config.repository.path.glob("*.pkg.tar.*"): |
| | | if f.name.endswith(".sig"): |
| | | continue |
| | | |
| | | # Parse package name and version from filename |
| | | # Format: name-version-rel-arch.pkg.tar.zst |
| | | parts = f.stem.replace(".pkg.tar", "").rsplit("-", 3) |
| | | if len(parts) >= 3: |
| | | name = "-".join(parts[:-2]) |
| | | version = f"{parts[-2]}-{parts[-1]}" if len(parts) > 2 else parts[-1] |
| | | else: |
| | | name = f.stem |
| | | version = "unknown" |
| | | |
| | | stat = f.stat() |
| | | packages.append(PackageInfo( |
| | | name=name, |
| | | version=version, |
| | | filename=f.name, |
| | | size=stat.st_size, |
| | | modified=datetime.fromtimestamp(stat.st_mtime), |
| | | )) |
| | | |
| | | return sorted(packages, key=lambda p: p.name) |
| | | |
| | | def rebuild_database(self) -> bool: |
| | | """Rebuild the repository database from scratch. |
| | | |
| | | Returns: |
| | | True if successful |
| | | """ |
| | | with self._get_repo_lock(): |
| | | logger.info("Rebuilding repository database") |
| | | |
| | | # Remove old database |
| | | for f in self.config.repository.path.glob(f"{self.config.repository.name}.db*"): |
| | | f.unlink() |
| | | for f in self.config.repository.path.glob(f"{self.config.repository.name}.files*"): |
| | | f.unlink() |
| | | |
| | | # Find all packages |
| | | packages = list(self.config.repository.path.glob("*.pkg.tar.*")) |
| | | packages = [p for p in packages if not p.name.endswith(".sig")] |
| | | |
| | | if not packages: |
| | | logger.warning("No packages found to add to database") |
| | | return True |
| | | |
| | | # Add all packages |
| | | result = self._run_repo_command([ |
| | | "repo-add", |
| | | str(self.db_path), |
| | | ] + [str(p) for p in packages]) |
| | | |
| | | if result.returncode != 0: |
| | | logger.error(f"Failed to rebuild database: {result.stderr}") |
| | | return False |
| | | |
| | | logger.info(f"Database rebuilt with {len(packages)} packages") |
| | | return True |
| | | |
| | | def cleanup(self) -> int: |
| | | """Run cleanup on all packages, removing old versions. |
| | | |
| | | Returns: |
| | | Total number of old packages removed |
| | | """ |
| | | # Get unique package names |
| | | packages = self.list_packages() |
| | | unique_names = set(p.name for p in packages) |
| | | |
| | | total_removed = 0 |
| | | for name in unique_names: |
| | | total_removed += self._remove_old_packages(name) |
| | | |
| | | return total_removed |
| | | |
| | | def check_integrity(self) -> list[str]: |
| | | """Check repository integrity. |
| | | |
| | | Returns: |
| | | List of issues found |
| | | """ |
| | | issues: list[str] = [] |
| | | |
| | | # Check database exists |
| | | if not self.db_path.exists(): |
| | | issues.append(f"Database not found: {self.db_path}") |
| | | return issues |
| | | |
| | | # Check for orphaned packages (in dir but not in db) |
| | | # This would require parsing the db, simplified for now |
| | | |
| | | # Check for missing signatures |
| | | if self.config.signing.enabled: |
| | | for pkg in self.config.repository.path.glob("*.pkg.tar.*"): |
| | | if pkg.name.endswith(".sig"): |
| | | continue |
| | | sig = pkg.with_suffix(pkg.suffix + ".sig") |
| | | if not sig.exists(): |
| | | issues.append(f"Missing signature: {pkg.name}") |
| | | |
| | | return issues |
| New file |
| | |
| | | """Dependency resolution with topological sorting.""" |
| | | |
| | | import subprocess |
| | | from collections import defaultdict |
| | | from dataclasses import dataclass, field |
| | | from enum import Enum |
| | | |
| | | from archbuild.aur import AURClient, Package |
| | | from archbuild.logging import get_logger |
| | | |
| | | logger = get_logger("resolver") |
| | | |
| | | |
| | | class DependencyType(Enum): |
| | | """Type of dependency.""" |
| | | |
| | | RUNTIME = "depends" |
| | | BUILD = "makedepends" |
| | | CHECK = "checkdepends" |
| | | |
| | | |
| | | @dataclass |
| | | class Dependency: |
| | | """A package dependency.""" |
| | | |
| | | name: str |
| | | version_constraint: str | None = None |
| | | dep_type: DependencyType = DependencyType.RUNTIME |
| | | is_aur: bool = False |
| | | |
| | | @classmethod |
| | | def parse(cls, dep_string: str, dep_type: DependencyType = DependencyType.RUNTIME) -> "Dependency": |
| | | """Parse dependency string like 'package>=1.0'. |
| | | |
| | | Args: |
| | | dep_string: Dependency string |
| | | dep_type: Type of dependency |
| | | |
| | | Returns: |
| | | Dependency instance |
| | | """ |
| | | # Handle version constraints |
| | | for op in [">=", "<=", "=", ">", "<"]: |
| | | if op in dep_string: |
| | | name, constraint = dep_string.split(op, 1) |
| | | return cls(name=name, version_constraint=f"{op}{constraint}", dep_type=dep_type) |
| | | return cls(name=dep_string, dep_type=dep_type) |
| | | |
| | | |
| | | @dataclass |
| | | class BuildOrder: |
| | | """Ordered list of packages to build.""" |
| | | |
| | | packages: list[str] = field(default_factory=list) |
| | | aur_dependencies: dict[str, list[Dependency]] = field(default_factory=dict) |
| | | |
| | | def __iter__(self): |
| | | return iter(self.packages) |
| | | |
| | | def __len__(self): |
| | | return len(self.packages) |
| | | |
| | | |
| | | class DependencyResolver: |
| | | """Resolve dependencies for AUR packages using topological sort.""" |
| | | |
| | | def __init__(self, aur_client: AURClient): |
| | | """Initialize resolver with AUR client. |
| | | |
| | | Args: |
| | | aur_client: AURClient instance for fetching package info |
| | | """ |
| | | self.aur_client = aur_client |
| | | self._pacman_cache: set[str] = set() |
| | | self._pacman_checked = False |
| | | |
| | | def _refresh_pacman_cache(self) -> None: |
| | | """Refresh cache of packages available from official repos.""" |
| | | try: |
| | | result = subprocess.run( |
| | | ["pacman", "-Ssq"], |
| | | capture_output=True, |
| | | text=True, |
| | | check=True, |
| | | ) |
| | | self._pacman_cache = set(result.stdout.strip().split("\n")) |
| | | self._pacman_checked = True |
| | | logger.debug(f"Cached {len(self._pacman_cache)} packages from official repos") |
| | | except subprocess.CalledProcessError as e: |
| | | logger.warning(f"Failed to get pacman package list: {e}") |
| | | self._pacman_cache = set() |
| | | |
| | | def is_in_official_repos(self, name: str) -> bool: |
| | | """Check if package is available in official repositories. |
| | | |
| | | Args: |
| | | name: Package name (without version constraint) |
| | | |
| | | Returns: |
| | | True if available in official repos |
| | | """ |
| | | if not self._pacman_checked: |
| | | self._refresh_pacman_cache() |
| | | |
| | | # Strip version constraint |
| | | base_name = name.split(">=")[0].split("<=")[0].split("=")[0].split(">")[0].split("<")[0] |
| | | return base_name in self._pacman_cache |
| | | |
| | | def is_installed(self, name: str) -> bool: |
| | | """Check if package is already installed. |
| | | |
| | | Args: |
| | | name: Package name |
| | | |
| | | Returns: |
| | | True if installed |
| | | """ |
| | | base_name = name.split(">=")[0].split("<=")[0].split("=")[0].split(">")[0].split("<")[0] |
| | | result = subprocess.run( |
| | | ["pacman", "-Qi", base_name], |
| | | capture_output=True, |
| | | ) |
| | | return result.returncode == 0 |
| | | |
| | | async def _get_all_dependencies( |
| | | self, |
| | | package: Package, |
| | | visited: set[str], |
| | | graph: dict[str, set[str]], |
| | | packages: dict[str, Package], |
| | | ) -> None: |
| | | """Recursively fetch all AUR dependencies. |
| | | |
| | | Args: |
| | | package: Package to get dependencies for |
| | | visited: Set of already visited package names |
| | | graph: Dependency graph (package -> dependencies) |
| | | packages: Map of package names to Package objects |
| | | """ |
| | | if package.name in visited: |
| | | return |
| | | |
| | | visited.add(package.name) |
| | | packages[package.name] = package |
| | | graph[package.name] = set() |
| | | |
| | | # Collect all dependencies |
| | | all_deps: list[str] = [] |
| | | all_deps.extend(package.depends) |
| | | all_deps.extend(package.makedepends) |
| | | |
| | | aur_deps: list[str] = [] |
| | | for dep in all_deps: |
| | | dep_parsed = Dependency.parse(dep) |
| | | base_name = dep_parsed.name |
| | | |
| | | # Skip if in official repos or already installed |
| | | if self.is_in_official_repos(base_name): |
| | | continue |
| | | if self.is_installed(base_name): |
| | | continue |
| | | |
| | | aur_deps.append(base_name) |
| | | graph[package.name].add(base_name) |
| | | |
| | | # Fetch AUR dependencies |
| | | if aur_deps: |
| | | dep_packages = await self.aur_client.get_packages(aur_deps) |
| | | for dep_pkg in dep_packages: |
| | | await self._get_all_dependencies(dep_pkg, visited, graph, packages) |
| | | |
| | | def _topological_sort(self, graph: dict[str, set[str]]) -> list[str]: |
| | | """Perform topological sort on dependency graph. |
| | | |
| | | Args: |
| | | graph: Dependency graph (package -> its dependencies) |
| | | |
| | | Returns: |
| | | List of packages in build order (dependencies first) |
| | | |
| | | Raises: |
| | | ValueError: If circular dependency detected |
| | | """ |
| | | # Ensure all nodes are in the graph (including leaf dependencies) |
| | | all_nodes: set[str] = set(graph.keys()) |
| | | for deps in graph.values(): |
| | | all_nodes.update(deps) |
| | | |
| | | # Add missing nodes to graph |
| | | for node in all_nodes: |
| | | if node not in graph: |
| | | graph[node] = set() |
| | | |
| | | # Kahn's algorithm |
| | | # in_degree[X] = number of packages that X depends on (outgoing edges) |
| | | # We want to process packages whose dependencies are all processed |
| | | in_degree: dict[str, int] = {node: len(graph[node]) for node in graph} |
| | | |
| | | # Start with nodes that have no dependencies (leaves) |
| | | queue = [node for node in graph if in_degree[node] == 0] |
| | | result: list[str] = [] |
| | | |
| | | while queue: |
| | | node = queue.pop(0) |
| | | result.append(node) |
| | | |
| | | # For each package that depends on this node, decrement its in-degree |
| | | for pkg, deps in graph.items(): |
| | | if node in deps: |
| | | in_degree[pkg] -= 1 |
| | | if in_degree[pkg] == 0: |
| | | queue.append(pkg) |
| | | |
| | | if len(result) != len(graph): |
| | | # Find cycle |
| | | remaining = set(graph.keys()) - set(result) |
| | | raise ValueError(f"Circular dependency detected involving: {remaining}") |
| | | |
| | | # Result is already in correct order (dependencies first) |
| | | return result |
| | | |
| | | def detect_cycles(self, graph: dict[str, set[str]]) -> list[list[str]]: |
| | | """Detect circular dependencies in graph. |
| | | |
| | | Args: |
| | | graph: Dependency graph |
| | | |
| | | Returns: |
| | | List of cycles (each cycle is a list of package names) |
| | | """ |
| | | cycles: list[list[str]] = [] |
| | | visited: set[str] = set() |
| | | rec_stack: set[str] = set() |
| | | path: list[str] = [] |
| | | |
| | | def dfs(node: str) -> bool: |
| | | visited.add(node) |
| | | rec_stack.add(node) |
| | | path.append(node) |
| | | |
| | | for neighbor in graph.get(node, set()): |
| | | if neighbor not in visited: |
| | | if dfs(neighbor): |
| | | return True |
| | | elif neighbor in rec_stack: |
| | | # Found cycle |
| | | cycle_start = path.index(neighbor) |
| | | cycles.append(path[cycle_start:] + [neighbor]) |
| | | return True |
| | | |
| | | path.pop() |
| | | rec_stack.remove(node) |
| | | return False |
| | | |
| | | for node in graph: |
| | | if node not in visited: |
| | | dfs(node) |
| | | |
| | | return cycles |
| | | |
| | | async def resolve(self, package_names: list[str]) -> BuildOrder: |
| | | """Resolve dependencies and determine build order. |
| | | |
| | | Args: |
| | | package_names: List of packages to resolve |
| | | |
| | | Returns: |
| | | BuildOrder with packages in correct build order |
| | | |
| | | Raises: |
| | | ValueError: If package not found or circular dependency |
| | | """ |
| | | logger.info(f"Resolving dependencies for: {', '.join(package_names)}") |
| | | |
| | | # Fetch requested packages |
| | | packages_list = await self.aur_client.get_packages(package_names) |
| | | if len(packages_list) != len(package_names): |
| | | found = {p.name for p in packages_list} |
| | | missing = set(package_names) - found |
| | | raise ValueError(f"Packages not found in AUR: {missing}") |
| | | |
| | | # Build dependency graph |
| | | visited: set[str] = set() |
| | | graph: dict[str, set[str]] = {} |
| | | packages: dict[str, Package] = {} |
| | | |
| | | for package in packages_list: |
| | | await self._get_all_dependencies(package, visited, graph, packages) |
| | | |
| | | # Check for cycles |
| | | cycles = self.detect_cycles(graph) |
| | | if cycles: |
| | | raise ValueError(f"Circular dependencies detected: {cycles}") |
| | | |
| | | # Topological sort |
| | | build_order = self._topological_sort(graph) |
| | | |
| | | # Build AUR dependency map |
| | | aur_deps: dict[str, list[Dependency]] = {} |
| | | for name in build_order: |
| | | pkg = packages.get(name) |
| | | if pkg: |
| | | deps: list[Dependency] = [] |
| | | for dep in pkg.depends: |
| | | parsed = Dependency.parse(dep, DependencyType.RUNTIME) |
| | | if not self.is_in_official_repos(parsed.name): |
| | | parsed.is_aur = True |
| | | deps.append(parsed) |
| | | for dep in pkg.makedepends: |
| | | parsed = Dependency.parse(dep, DependencyType.BUILD) |
| | | if not self.is_in_official_repos(parsed.name): |
| | | parsed.is_aur = True |
| | | deps.append(parsed) |
| | | aur_deps[name] = deps |
| | | |
| | | logger.info(f"Build order: {' -> '.join(build_order)}") |
| | | return BuildOrder(packages=build_order, aur_dependencies=aur_deps) |
| New file |
| | |
| | | #!/usr/bin/env python3 |
| | | """ |
| | | Integration test script for archbuild. |
| | | |
| | | This script creates a temporary repository, initializes it with a basic config, |
| | | adds test packages, and verifies they build and are added correctly. |
| | | |
| | | Usage: |
| | | python tests/integration_test.py [--keep-temp] |
| | | |
| | | Options: |
| | | --keep-temp Don't delete temporary directory after test (for debugging) |
| | | """ |
| | | |
| | | import asyncio |
| | | import os |
| | | import shutil |
| | | import subprocess |
| | | import sys |
| | | import tempfile |
| | | from pathlib import Path |
| | | |
| | | # Add src to path for development |
| | | sys.path.insert(0, str(Path(__file__).parent.parent / "src")) |
| | | |
| | | from archbuild.aur import AURClient |
| | | from archbuild.builder import Builder, BuildStatus |
| | | from archbuild.config import Config, RepositoryConfig, BuildingConfig, SigningConfig, PackageOverride |
| | | from archbuild.logging import setup_logging, console |
| | | from archbuild.repo import RepoManager |
| | | |
| | | |
| | | # Test packages - real packages that exist in the AUR |
| | | # Chosen for small size and fast build times |
| | | TEST_PACKAGES = [ |
| | | "neofetch-git", # Small bash script, very fast build |
| | | "yay", # Popular AUR helper |
| | | ] |
| | | |
| | | # Alternative packages if the above aren't available |
| | | FALLBACK_PACKAGES = [ |
| | | "gtk2", # Legacy GTK library |
| | | "paru", # Another AUR helper |
| | | ] |
| | | |
| | | |
| | | class IntegrationTest: |
| | | """Integration test runner.""" |
| | | |
| | | def __init__(self, keep_temp: bool = False): |
| | | self.keep_temp = keep_temp |
| | | self.temp_dir: Path | None = None |
| | | self.config: Config | None = None |
| | | self.passed = 0 |
| | | self.failed = 0 |
| | | |
| | | def setup(self) -> None: |
| | | """Set up temporary test environment.""" |
| | | console.print("\n[bold blue]═══ Setting up test environment ═══[/]") |
| | | |
| | | # Create temp directory |
| | | self.temp_dir = Path(tempfile.mkdtemp(prefix="archbuild_test_")) |
| | | console.print(f" Created temp directory: {self.temp_dir}") |
| | | |
| | | # Create subdirectories |
| | | repo_dir = self.temp_dir / "repo" |
| | | build_dir = self.temp_dir / "build" |
| | | repo_dir.mkdir() |
| | | build_dir.mkdir() |
| | | |
| | | # Create custom makepkg.conf that disables debug packages |
| | | # (avoids needing debugedit which may not be installed) |
| | | makepkg_conf = self.temp_dir / "makepkg.conf" |
| | | makepkg_conf.write_text(""" |
| | | # Minimal makepkg.conf for testing |
| | | CARCH="x86_64" |
| | | CHOST="x86_64-pc-linux-gnu" |
| | | CFLAGS="-O2 -pipe" |
| | | CXXFLAGS="$CFLAGS" |
| | | LDFLAGS="" |
| | | MAKEFLAGS="-j$(nproc)" |
| | | OPTIONS=(!debug !strip !staticlibs) |
| | | PKGEXT='.pkg.tar.zst' |
| | | SRCEXT='.src.tar.gz' |
| | | PACKAGER="Integration Test <test@test.local>" |
| | | |
| | | DLAGENTS=('file::/usr/bin/curl -qgC - -o %o %u' |
| | | 'ftp::/usr/bin/curl -qgfC - --ftp-pasv --retry 3 --retry-delay 3 -o %o %u' |
| | | 'http::/usr/bin/curl -qgb "" -fLC - --retry 3 --retry-delay 3 -o %o %u' |
| | | 'https::/usr/bin/curl -qgb "" -fLC - --retry 3 --retry-delay 3 -o %o %u' |
| | | 'rsync::/usr/bin/rsync --no-motd -z %u %o' |
| | | 'scp::/usr/bin/scp -C %u %o') |
| | | |
| | | VCSCLIENTS=('git::git' |
| | | 'hg::mercurial' |
| | | 'svn::subversion') |
| | | """) |
| | | |
| | | self.config = Config( |
| | | repository=RepositoryConfig( |
| | | name="testrepo", |
| | | path=repo_dir, |
| | | build_dir=build_dir, |
| | | compression="zst", |
| | | ), |
| | | building=BuildingConfig( |
| | | parallel=False, # Sequential for cleaner test output |
| | | max_workers=1, |
| | | clean=True, |
| | | update_system=False, |
| | | retry_attempts=2, |
| | | ), |
| | | signing=SigningConfig( |
| | | enabled=False, |
| | | ), |
| | | log_level="INFO", |
| | | # Apply package overrides to use our custom makepkg.conf |
| | | package_overrides={ |
| | | "_default": PackageOverride( |
| | | extra_args=["--config", str(makepkg_conf)], |
| | | ), |
| | | }, |
| | | ) |
| | | |
| | | setup_logging(self.config.log_level) |
| | | console.print(" [green]✓[/] Configuration created") |
| | | |
| | | def teardown(self) -> None: |
| | | """Clean up test environment.""" |
| | | if self.temp_dir and self.temp_dir.exists(): |
| | | if self.keep_temp: |
| | | console.print(f"\n[yellow]Keeping temp directory:[/] {self.temp_dir}") |
| | | else: |
| | | shutil.rmtree(self.temp_dir) |
| | | console.print("\n[dim]Cleaned up temp directory[/]") |
| | | |
| | | def check(self, condition: bool, message: str) -> bool: |
| | | """Check a condition and report pass/fail.""" |
| | | if condition: |
| | | console.print(f" [green]✓[/] {message}") |
| | | self.passed += 1 |
| | | return True |
| | | else: |
| | | console.print(f" [red]✗[/] {message}") |
| | | self.failed += 1 |
| | | return False |
| | | |
| | | async def test_init(self) -> bool: |
| | | """Test repository initialization.""" |
| | | console.print("\n[bold blue]═══ Test: Repository Initialization ═══[/]") |
| | | |
| | | repo = RepoManager(self.config) |
| | | repo.ensure_repo_exists() |
| | | |
| | | # Check directories exist |
| | | self.check( |
| | | self.config.repository.path.exists(), |
| | | f"Repository directory exists: {self.config.repository.path}" |
| | | ) |
| | | self.check( |
| | | self.config.repository.build_dir.exists(), |
| | | f"Build directory exists: {self.config.repository.build_dir}" |
| | | ) |
| | | |
| | | return self.failed == 0 |
| | | |
| | | async def test_aur_client(self) -> list[str]: |
| | | """Test AUR client and find available test packages.""" |
| | | console.print("\n[bold blue]═══ Test: AUR Client ═══[/]") |
| | | |
| | | available_packages = [] |
| | | |
| | | async with AURClient() as aur: |
| | | # Test package lookup |
| | | for pkg_name in TEST_PACKAGES + FALLBACK_PACKAGES: |
| | | pkg = await aur.get_package(pkg_name) |
| | | if pkg: |
| | | self.check(True, f"Found package: {pkg_name} ({pkg.version})") |
| | | available_packages.append(pkg_name) |
| | | if len(available_packages) >= 2: |
| | | break |
| | | else: |
| | | console.print(f" [yellow]⚠[/] Package not found: {pkg_name}") |
| | | |
| | | self.check( |
| | | len(available_packages) >= 1, |
| | | f"Found {len(available_packages)} test package(s)" |
| | | ) |
| | | |
| | | return available_packages |
| | | |
| | | async def test_build_packages(self, packages: list[str]) -> dict[str, BuildStatus]: |
| | | """Test building packages.""" |
| | | console.print("\n[bold blue]═══ Test: Package Building ═══[/]") |
| | | |
| | | results: dict[str, BuildStatus] = {} |
| | | |
| | | async with AURClient() as aur: |
| | | async with Builder(self.config, aur) as builder: |
| | | for pkg_name in packages: |
| | | console.print(f"\n Building {pkg_name}...") |
| | | result = await builder.build_package(pkg_name, force=True) |
| | | results[pkg_name] = result.status |
| | | |
| | | if result.status == BuildStatus.SUCCESS: |
| | | self.check(True, f"Built {pkg_name} successfully ({result.duration:.1f}s)") |
| | | self.check( |
| | | len(result.artifacts) > 0, |
| | | f" Created {len(result.artifacts)} artifact(s)" |
| | | ) |
| | | for artifact in result.artifacts: |
| | | console.print(f" → {artifact.name}") |
| | | else: |
| | | self.check(False, f"Failed to build {pkg_name}: {result.error}") |
| | | |
| | | return results |
| | | |
| | | async def test_repo_add(self, packages: list[str]) -> None: |
| | | """Test adding packages to repository.""" |
| | | console.print("\n[bold blue]═══ Test: Repository Management ═══[/]") |
| | | |
| | | repo = RepoManager(self.config) |
| | | |
| | | async with AURClient() as aur: |
| | | async with Builder(self.config, aur) as builder: |
| | | for pkg_name in packages: |
| | | # Get the build result with artifacts |
| | | pkg_dir = self.config.repository.build_dir / pkg_name |
| | | if not pkg_dir.exists(): |
| | | continue |
| | | |
| | | # Find artifacts |
| | | artifacts = list(pkg_dir.glob("*.pkg.tar.*")) |
| | | artifacts = [a for a in artifacts if not a.name.endswith(".sig")] |
| | | |
| | | if artifacts: |
| | | # Create a mock build result for add_packages |
| | | from archbuild.builder import BuildResult |
| | | mock_result = BuildResult( |
| | | package=pkg_name, |
| | | status=BuildStatus.SUCCESS, |
| | | artifacts=artifacts, |
| | | ) |
| | | success = repo.add_packages(mock_result) |
| | | self.check(success, f"Added {pkg_name} to repository") |
| | | |
| | | # List packages in repo |
| | | pkg_list = repo.list_packages() |
| | | self.check(len(pkg_list) > 0, f"Repository contains {len(pkg_list)} package(s)") |
| | | |
| | | for pkg in pkg_list: |
| | | console.print(f" → {pkg.name} {pkg.version} ({pkg.size / 1024:.1f} KB)") |
| | | |
| | | async def test_repo_database(self) -> None: |
| | | """Test repository database integrity.""" |
| | | console.print("\n[bold blue]═══ Test: Database Integrity ═══[/]") |
| | | |
| | | db_path = self.config.repository.path / f"{self.config.repository.name}.db.tar.zst" |
| | | |
| | | self.check(db_path.exists(), f"Database file exists: {db_path.name}") |
| | | |
| | | if db_path.exists(): |
| | | # Try to list contents with tar |
| | | result = subprocess.run( |
| | | ["tar", "-tf", str(db_path)], |
| | | capture_output=True, |
| | | text=True, |
| | | ) |
| | | if result.returncode == 0: |
| | | entries = [e for e in result.stdout.strip().split("\n") if e] |
| | | self.check( |
| | | len(entries) > 0, |
| | | f"Database contains {len(entries)} entries" |
| | | ) |
| | | |
| | | # Check for integrity issues |
| | | repo = RepoManager(self.config) |
| | | issues = repo.check_integrity() |
| | | self.check( |
| | | len(issues) == 0, |
| | | f"No integrity issues found" if not issues else f"Issues: {issues}" |
| | | ) |
| | | |
| | | async def test_cleanup(self) -> None: |
| | | """Test package cleanup functionality.""" |
| | | console.print("\n[bold blue]═══ Test: Cleanup ═══[/]") |
| | | |
| | | repo = RepoManager(self.config) |
| | | removed = repo.cleanup() |
| | | |
| | | self.check(True, f"Cleanup removed {removed} old version(s)") |
| | | |
| | | async def run(self) -> bool: |
| | | """Run all integration tests.""" |
| | | console.print("[bold magenta]╔════════════════════════════════════════════╗[/]") |
| | | console.print("[bold magenta]║ Archbuild Integration Test Suite ║[/]") |
| | | console.print("[bold magenta]╚════════════════════════════════════════════╝[/]") |
| | | |
| | | try: |
| | | self.setup() |
| | | |
| | | # Run tests |
| | | await self.test_init() |
| | | |
| | | packages = await self.test_aur_client() |
| | | if not packages: |
| | | console.print("[red]No test packages available, cannot continue[/]") |
| | | return False |
| | | |
| | | build_results = await self.test_build_packages(packages) # Build all found packages |
| | | |
| | | successful = [p for p, s in build_results.items() if s == BuildStatus.SUCCESS] |
| | | if successful: |
| | | await self.test_repo_add(successful) |
| | | await self.test_repo_database() |
| | | await self.test_cleanup() |
| | | else: |
| | | console.print("[yellow]No successful builds to test repository with[/]") |
| | | |
| | | # Summary |
| | | console.print("\n[bold blue]═══ Test Summary ═══[/]") |
| | | total = self.passed + self.failed |
| | | console.print(f" Total: {total}") |
| | | console.print(f" [green]Passed: {self.passed}[/]") |
| | | console.print(f" [red]Failed: {self.failed}[/]") |
| | | |
| | | if self.failed == 0: |
| | | console.print("\n[bold green]✓ All tests passed![/]") |
| | | return True |
| | | else: |
| | | console.print(f"\n[bold red]✗ {self.failed} test(s) failed[/]") |
| | | return False |
| | | |
| | | except KeyboardInterrupt: |
| | | console.print("\n[yellow]Test interrupted[/]") |
| | | return False |
| | | except Exception as e: |
| | | console.print(f"\n[bold red]Test error:[/] {e}") |
| | | import traceback |
| | | traceback.print_exc() |
| | | return False |
| | | finally: |
| | | self.teardown() |
| | | |
| | | |
| | | async def main() -> int: |
| | | """Main entry point.""" |
| | | keep_temp = "--keep-temp" in sys.argv |
| | | |
| | | # Check if running on Arch Linux |
| | | if not Path("/etc/arch-release").exists(): |
| | | console.print("[yellow]Warning: Not running on Arch Linux[/]") |
| | | console.print("[yellow]Some tests may fail or be skipped[/]") |
| | | |
| | | # Check for required tools |
| | | for tool in ["makepkg", "pacman", "git"]: |
| | | result = subprocess.run(["which", tool], capture_output=True) |
| | | if result.returncode != 0: |
| | | console.print(f"[red]Required tool not found: {tool}[/]") |
| | | return 1 |
| | | |
| | | test = IntegrationTest(keep_temp=keep_temp) |
| | | success = await test.run() |
| | | |
| | | return 0 if success else 1 |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | exit_code = asyncio.run(main()) |
| | | sys.exit(exit_code) |
| New file |
| | |
| | | """Tests for AUR client.""" |
| | | |
| | | import pytest |
| | | from unittest.mock import AsyncMock, patch, MagicMock |
| | | from datetime import datetime |
| | | |
| | | from archbuild.aur import AURClient, Package |
| | | |
| | | |
| | | @pytest.fixture |
| | | def sample_package_data(): |
| | | """Sample AUR RPC response data.""" |
| | | return { |
| | | "Name": "test-package", |
| | | "Version": "1.0.0-1", |
| | | "Description": "A test package", |
| | | "URL": "https://example.com", |
| | | "Maintainer": "testuser", |
| | | "NumVotes": 100, |
| | | "Popularity": 1.5, |
| | | "OutOfDate": None, |
| | | "FirstSubmitted": 1609459200, |
| | | "LastModified": 1640995200, |
| | | "Depends": ["dep1", "dep2"], |
| | | "MakeDepends": ["makedep1"], |
| | | "CheckDepends": [], |
| | | "OptDepends": [], |
| | | "Provides": [], |
| | | "Conflicts": [], |
| | | "Replaces": [], |
| | | "License": ["MIT"], |
| | | "Keywords": ["test"], |
| | | } |
| | | |
| | | |
| | | class TestPackage: |
| | | """Tests for Package dataclass.""" |
| | | |
| | | def test_from_rpc(self, sample_package_data): |
| | | """Test creating Package from RPC data.""" |
| | | pkg = Package.from_rpc(sample_package_data) |
| | | |
| | | assert pkg.name == "test-package" |
| | | assert pkg.version == "1.0.0-1" |
| | | assert pkg.description == "A test package" |
| | | assert pkg.maintainer == "testuser" |
| | | assert pkg.votes == 100 |
| | | assert pkg.depends == ["dep1", "dep2"] |
| | | assert pkg.makedepends == ["makedep1"] |
| | | |
| | | def test_git_url(self, sample_package_data): |
| | | """Test git_url property.""" |
| | | pkg = Package.from_rpc(sample_package_data) |
| | | assert pkg.git_url == "https://aur.archlinux.org/test-package.git" |
| | | |
| | | def test_aur_url(self, sample_package_data): |
| | | """Test aur_url property.""" |
| | | pkg = Package.from_rpc(sample_package_data) |
| | | assert pkg.aur_url == "https://aur.archlinux.org/packages/test-package" |
| | | |
| | | |
| | | class TestAURClient: |
| | | """Tests for AURClient.""" |
| | | |
| | | @pytest.mark.asyncio |
| | | async def test_get_package_cached(self, sample_package_data): |
| | | """Test that cached packages are returned.""" |
| | | async with AURClient(cache_ttl=300) as client: |
| | | # Manually add to cache |
| | | pkg = Package.from_rpc(sample_package_data) |
| | | client._set_cached(pkg) |
| | | |
| | | # Should return from cache without network request |
| | | result = await client.get_package("test-package") |
| | | assert result is not None |
| | | assert result.name == "test-package" |
| | | |
| | | @pytest.mark.asyncio |
| | | async def test_cache_expiry(self, sample_package_data): |
| | | """Test cache TTL expiry.""" |
| | | async with AURClient(cache_ttl=0) as client: |
| | | pkg = Package.from_rpc(sample_package_data) |
| | | client._set_cached(pkg) |
| | | |
| | | # Cache should be expired immediately |
| | | assert client._get_cached("test-package") is None |
| | | |
| | | @pytest.mark.asyncio |
| | | async def test_batch_request(self, sample_package_data): |
| | | """Test batch package requests.""" |
| | | async with AURClient() as client: |
| | | # Mock the request method |
| | | mock_response = { |
| | | "type": "multiinfo", |
| | | "results": [sample_package_data], |
| | | } |
| | | client._request = AsyncMock(return_value=mock_response) |
| | | |
| | | packages = await client.get_packages(["test-package"]) |
| | | |
| | | assert len(packages) == 1 |
| | | assert packages[0].name == "test-package" |
| | | |
| | | |
| | | class TestDependencyParsing: |
| | | """Tests for dependency string parsing.""" |
| | | |
| | | def test_parse_simple(self): |
| | | """Test parsing simple dependency.""" |
| | | from archbuild.resolver import Dependency |
| | | dep = Dependency.parse("package") |
| | | assert dep.name == "package" |
| | | assert dep.version_constraint is None |
| | | |
| | | def test_parse_with_version(self): |
| | | """Test parsing dependency with version.""" |
| | | from archbuild.resolver import Dependency |
| | | dep = Dependency.parse("package>=1.0") |
| | | assert dep.name == "package" |
| | | assert dep.version_constraint == ">=1.0" |
| | | |
| | | def test_parse_exact_version(self): |
| | | """Test parsing dependency with exact version.""" |
| | | from archbuild.resolver import Dependency |
| | | dep = Dependency.parse("package=2.0") |
| | | assert dep.name == "package" |
| | | assert dep.version_constraint == "=2.0" |
| New file |
| | |
| | | """Tests for configuration loading and validation.""" |
| | | |
| | | import pytest |
| | | from pathlib import Path |
| | | from tempfile import NamedTemporaryFile |
| | | |
| | | from archbuild.config import ( |
| | | Config, |
| | | load_config, |
| | | migrate_vars_sh, |
| | | save_config, |
| | | BuildingConfig, |
| | | RepositoryConfig, |
| | | ) |
| | | |
| | | |
| | | class TestConfig: |
| | | """Tests for Config model.""" |
| | | |
| | | def test_minimal_config(self): |
| | | """Test minimal valid configuration.""" |
| | | config = Config( |
| | | repository=RepositoryConfig( |
| | | name="test", |
| | | path=Path("/repo"), |
| | | build_dir=Path("/build"), |
| | | ) |
| | | ) |
| | | assert config.repository.name == "test" |
| | | assert config.building.parallel is True # default |
| | | assert config.building.max_workers == 4 # default |
| | | |
| | | def test_building_config_defaults(self): |
| | | """Test BuildingConfig defaults.""" |
| | | config = BuildingConfig() |
| | | assert config.parallel is True |
| | | assert config.max_workers == 4 |
| | | assert config.clean is True |
| | | assert config.retry_attempts == 3 |
| | | |
| | | def test_max_workers_validation(self): |
| | | """Test max_workers bounds.""" |
| | | with pytest.raises(ValueError): |
| | | BuildingConfig(max_workers=0) |
| | | with pytest.raises(ValueError): |
| | | BuildingConfig(max_workers=100) |
| | | |
| | | def test_log_level_validation(self): |
| | | """Test log level validation.""" |
| | | config = Config( |
| | | repository=RepositoryConfig( |
| | | name="test", |
| | | path=Path("/repo"), |
| | | build_dir=Path("/build"), |
| | | ), |
| | | log_level="debug", |
| | | ) |
| | | assert config.log_level == "DEBUG" |
| | | |
| | | with pytest.raises(ValueError): |
| | | Config( |
| | | repository=RepositoryConfig( |
| | | name="test", |
| | | path=Path("/repo"), |
| | | build_dir=Path("/build"), |
| | | ), |
| | | log_level="invalid", |
| | | ) |
| | | |
| | | |
| | | class TestLoadConfig: |
| | | """Tests for config file loading.""" |
| | | |
| | | def test_load_yaml(self, tmp_path): |
| | | """Test loading YAML config file.""" |
| | | config_file = tmp_path / "config.yaml" |
| | | config_file.write_text(""" |
| | | repository: |
| | | name: myrepo |
| | | path: /repo/x86_64 |
| | | build_dir: /repo/build |
| | | building: |
| | | max_workers: 8 |
| | | """) |
| | | config = load_config(config_file) |
| | | assert config.repository.name == "myrepo" |
| | | assert config.building.max_workers == 8 |
| | | |
| | | def test_file_not_found(self): |
| | | """Test error on missing config file.""" |
| | | with pytest.raises(FileNotFoundError): |
| | | load_config(Path("/nonexistent/config.yaml")) |
| | | |
| | | |
| | | class TestMigrateVarsSh: |
| | | """Tests for vars.sh migration.""" |
| | | |
| | | def test_migrate_basic(self, tmp_path): |
| | | """Test basic vars.sh migration.""" |
| | | vars_file = tmp_path / "vars.sh" |
| | | vars_file.write_text(""" |
| | | REPODIR=/repo/x86_64 |
| | | BUILDDIR=/repo/build |
| | | REPONAME=myrepo |
| | | PARALLEL=Y |
| | | SIGN=N |
| | | NUM_OLD=5 |
| | | """) |
| | | data = migrate_vars_sh(vars_file) |
| | | |
| | | assert data["repository"]["name"] == "myrepo" |
| | | assert data["repository"]["path"] == "/repo/x86_64" |
| | | assert data["building"]["parallel"] is True |
| | | assert data["signing"]["enabled"] is False |
| | | assert data["retention"]["keep_versions"] == 5 |
| | | |
| | | def test_migrate_with_export(self, tmp_path): |
| | | """Test migration handles export statements.""" |
| | | vars_file = tmp_path / "vars.sh" |
| | | vars_file.write_text(""" |
| | | export REPONAME="testrepo" |
| | | export REPODIR="/test/repo" |
| | | export BUILDDIR="/test/build" |
| | | """) |
| | | data = migrate_vars_sh(vars_file) |
| | | |
| | | assert data["repository"]["name"] == "testrepo" |
| | | |
| | | |
| | | class TestSaveConfig: |
| | | """Tests for config saving.""" |
| | | |
| | | def test_round_trip(self, tmp_path): |
| | | """Test config save/load round trip.""" |
| | | config = Config( |
| | | repository=RepositoryConfig( |
| | | name="test", |
| | | path=Path("/repo"), |
| | | build_dir=Path("/build"), |
| | | ), |
| | | building=BuildingConfig(max_workers=6), |
| | | ) |
| | | |
| | | config_file = tmp_path / "config.yaml" |
| | | save_config(config, config_file) |
| | | |
| | | loaded = load_config(config_file) |
| | | assert loaded.repository.name == "test" |
| | | assert loaded.building.max_workers == 6 |
| New file |
| | |
| | | """Tests for dependency resolver.""" |
| | | |
| | | import pytest |
| | | from unittest.mock import AsyncMock, patch |
| | | |
| | | from archbuild.resolver import DependencyResolver, Dependency, DependencyType, BuildOrder |
| | | |
| | | |
| | | class TestDependency: |
| | | """Tests for Dependency class.""" |
| | | |
| | | def test_parse_simple(self): |
| | | """Test parsing simple dependency name.""" |
| | | dep = Dependency.parse("packagename") |
| | | assert dep.name == "packagename" |
| | | assert dep.version_constraint is None |
| | | assert dep.dep_type == DependencyType.RUNTIME |
| | | |
| | | def test_parse_with_gte(self): |
| | | """Test parsing with >= constraint.""" |
| | | dep = Dependency.parse("package>=1.0") |
| | | assert dep.name == "package" |
| | | assert dep.version_constraint == ">=1.0" |
| | | |
| | | def test_parse_with_lte(self): |
| | | """Test parsing with <= constraint.""" |
| | | dep = Dependency.parse("package<=2.0") |
| | | assert dep.name == "package" |
| | | assert dep.version_constraint == "<=2.0" |
| | | |
| | | def test_parse_build_dep(self): |
| | | """Test parsing as build dependency.""" |
| | | dep = Dependency.parse("makedep", DependencyType.BUILD) |
| | | assert dep.dep_type == DependencyType.BUILD |
| | | |
| | | |
| | | class TestBuildOrder: |
| | | """Tests for BuildOrder class.""" |
| | | |
| | | def test_iteration(self): |
| | | """Test BuildOrder iteration.""" |
| | | order = BuildOrder(packages=["a", "b", "c"]) |
| | | assert list(order) == ["a", "b", "c"] |
| | | |
| | | def test_length(self): |
| | | """Test BuildOrder length.""" |
| | | order = BuildOrder(packages=["a", "b"]) |
| | | assert len(order) == 2 |
| | | |
| | | |
| | | class TestDependencyResolver: |
| | | """Tests for DependencyResolver.""" |
| | | |
| | | @pytest.fixture |
| | | def mock_aur_client(self): |
| | | """Create mock AUR client.""" |
| | | client = AsyncMock() |
| | | return client |
| | | |
| | | def test_topological_sort_simple(self, mock_aur_client): |
| | | """Test topological sort with simple graph.""" |
| | | resolver = DependencyResolver(mock_aur_client) |
| | | |
| | | # A depends on B, B depends on C |
| | | graph = { |
| | | "A": {"B"}, |
| | | "B": {"C"}, |
| | | "C": set(), |
| | | } |
| | | |
| | | order = resolver._topological_sort(graph) |
| | | |
| | | # C must come before B, B must come before A |
| | | assert order.index("C") < order.index("B") |
| | | assert order.index("B") < order.index("A") |
| | | |
| | | def test_topological_sort_parallel(self, mock_aur_client): |
| | | """Test topological sort with parallel dependencies.""" |
| | | resolver = DependencyResolver(mock_aur_client) |
| | | |
| | | # A depends on B and C (parallel) |
| | | graph = { |
| | | "A": {"B", "C"}, |
| | | "B": set(), |
| | | "C": set(), |
| | | } |
| | | |
| | | order = resolver._topological_sort(graph) |
| | | |
| | | # B and C must come before A |
| | | assert order.index("B") < order.index("A") |
| | | assert order.index("C") < order.index("A") |
| | | |
| | | def test_detect_cycles_no_cycle(self, mock_aur_client): |
| | | """Test cycle detection with no cycles.""" |
| | | resolver = DependencyResolver(mock_aur_client) |
| | | |
| | | graph = { |
| | | "A": {"B"}, |
| | | "B": {"C"}, |
| | | "C": set(), |
| | | } |
| | | |
| | | cycles = resolver.detect_cycles(graph) |
| | | assert len(cycles) == 0 |
| | | |
| | | def test_detect_cycles_with_cycle(self, mock_aur_client): |
| | | """Test cycle detection with cycle.""" |
| | | resolver = DependencyResolver(mock_aur_client) |
| | | |
| | | # A -> B -> C -> A (cycle) |
| | | graph = { |
| | | "A": {"B"}, |
| | | "B": {"C"}, |
| | | "C": {"A"}, |
| | | } |
| | | |
| | | cycles = resolver.detect_cycles(graph) |
| | | assert len(cycles) > 0 |
| | | |
| | | @patch("archbuild.resolver.subprocess.run") |
| | | def test_is_in_official_repos(self, mock_run, mock_aur_client): |
| | | """Test checking official repos.""" |
| | | mock_run.return_value.returncode = 0 |
| | | mock_run.return_value.stdout = "base\ngit\nvim\n" |
| | | |
| | | resolver = DependencyResolver(mock_aur_client) |
| | | resolver._refresh_pacman_cache() |
| | | |
| | | assert resolver.is_in_official_repos("git") |
| | | assert not resolver.is_in_official_repos("yay") |