"""Async AUR RPC API client with caching and retry logic.""" import asyncio from dataclasses import dataclass, field from datetime import datetime, timedelta from enum import Enum from typing import Any import aiohttp from archbuild.logging import get_logger logger = get_logger("aur") AUR_RPC_URL = "https://aur.archlinux.org/rpc" AUR_PACKAGE_URL = "https://aur.archlinux.org/packages" AUR_GIT_URL = "https://aur.archlinux.org" class PackageType(Enum): """Package type from AUR.""" NORMAL = "normal" SPLIT = "split" @dataclass class Package: """AUR package metadata.""" name: str version: str description: str url: str | None maintainer: str | None votes: int popularity: float out_of_date: datetime | None first_submitted: datetime last_modified: datetime depends: list[str] = field(default_factory=list) makedepends: list[str] = field(default_factory=list) checkdepends: list[str] = field(default_factory=list) optdepends: list[str] = field(default_factory=list) provides: list[str] = field(default_factory=list) conflicts: list[str] = field(default_factory=list) replaces: list[str] = field(default_factory=list) license: list[str] = field(default_factory=list) keywords: list[str] = field(default_factory=list) @property def git_url(self) -> str: """Get the git clone URL for this package.""" return f"{AUR_GIT_URL}/{self.name}.git" @property def aur_url(self) -> str: """Get the AUR web page URL for this package.""" return f"{AUR_PACKAGE_URL}/{self.name}" @classmethod def from_rpc(cls, data: dict[str, Any]) -> "Package": """Create Package from AUR RPC response data. Args: data: Package data from AUR RPC API Returns: Package instance """ return cls( name=data["Name"], version=data["Version"], description=data.get("Description", ""), url=data.get("URL"), maintainer=data.get("Maintainer"), votes=data.get("NumVotes", 0), popularity=data.get("Popularity", 0.0), out_of_date=( datetime.fromtimestamp(data["OutOfDate"]) if data.get("OutOfDate") else None ), first_submitted=datetime.fromtimestamp(data["FirstSubmitted"]), last_modified=datetime.fromtimestamp(data["LastModified"]), depends=data.get("Depends", []), makedepends=data.get("MakeDepends", []), checkdepends=data.get("CheckDepends", []), optdepends=data.get("OptDepends", []), provides=data.get("Provides", []), conflicts=data.get("Conflicts", []), replaces=data.get("Replaces", []), license=data.get("License", []), keywords=data.get("Keywords", []), ) @dataclass class CacheEntry: """Cache entry with TTL.""" data: Package expires: datetime class AURClient: """Async client for AUR RPC API with caching and retry.""" def __init__( self, cache_ttl: int = 300, max_retries: int = 3, retry_delay: float = 1.0, batch_size: int = 100, ): """Initialize AUR client. Args: cache_ttl: Cache time-to-live in seconds max_retries: Maximum number of retry attempts retry_delay: Base delay between retries (exponential backoff) batch_size: Maximum packages per batch request """ self.cache_ttl = cache_ttl self.max_retries = max_retries self.retry_delay = retry_delay self.batch_size = batch_size self._cache: dict[str, CacheEntry] = {} self._session: aiohttp.ClientSession | None = None async def __aenter__(self) -> "AURClient": """Async context manager entry.""" self._session = aiohttp.ClientSession() return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Async context manager exit.""" if self._session: await self._session.close() self._session = None def _get_cached(self, name: str) -> Package | None: """Get package from cache if not expired. Args: name: Package name Returns: Cached package or None if not cached/expired """ entry = self._cache.get(name) if entry and entry.expires > datetime.now(): return entry.data return None def _set_cached(self, package: Package) -> None: """Store package in cache. Args: package: Package to cache """ self._cache[package.name] = CacheEntry( data=package, expires=datetime.now() + timedelta(seconds=self.cache_ttl), ) async def _request( self, params: list[tuple[str, Any]] | dict[str, Any], ) -> dict[str, Any]: """Make request to AUR RPC API with retry logic. Args: params: Query parameters (as list of tuples for repeated keys, or dict) Returns: JSON response data Raises: aiohttp.ClientError: If request fails after all retries """ if not self._session: raise RuntimeError("AURClient must be used as async context manager") last_error: Exception | None = None for attempt in range(self.max_retries): try: async with self._session.get(AUR_RPC_URL, params=params) as response: response.raise_for_status() data = await response.json() if data.get("type") == "error": raise ValueError(f"AUR API error: {data.get('error')}") return data except (aiohttp.ClientError, asyncio.TimeoutError) as e: last_error = e if attempt < self.max_retries - 1: delay = self.retry_delay * (2**attempt) logger.warning( f"AUR request failed (attempt {attempt + 1}/{self.max_retries}), " f"retrying in {delay}s: {e}" ) await asyncio.sleep(delay) raise last_error or RuntimeError("Request failed") async def get_package(self, name: str) -> Package | None: """Get a single package by name. Args: name: Package name Returns: Package if found, None otherwise """ # Check cache first cached = self._get_cached(name) if cached: logger.debug(f"Cache hit for package: {name}") return cached packages = await self.get_packages([name]) return packages[0] if packages else None async def get_packages(self, names: list[str]) -> list[Package]: """Get multiple packages by name using batch queries. Args: names: List of package names Returns: List of found packages (may be fewer than requested) """ # Separate cached and uncached packages result: list[Package] = [] uncached: list[str] = [] for name in names: cached = self._get_cached(name) if cached: result.append(cached) else: uncached.append(name) if not uncached: return result # Batch request uncached packages for i in range(0, len(uncached), self.batch_size): batch = uncached[i : i + self.batch_size] # Build params as list of tuples for repeated arg[] keys params: list[tuple[str, Any]] = [("v", 5), ("type", "info")] for name in batch: params.append(("arg[]", name)) data = await self._request(params) for pkg_data in data.get("results", []): package = Package.from_rpc(pkg_data) self._set_cached(package) result.append(package) return result async def search(self, query: str, by: str = "name-desc") -> list[Package]: """Search AUR packages. Args: query: Search query string by: Search field (name, name-desc, maintainer, depends, makedepends, optdepends, checkdepends) Returns: List of matching packages """ params = {"v": 5, "type": "search", "by": by, "arg": query} data = await self._request(params) packages = [] for pkg_data in data.get("results", []): package = Package.from_rpc(pkg_data) self._set_cached(package) packages.append(package) return packages async def is_available(self, name: str) -> bool: """Check if a package exists in the AUR. Args: name: Package name Returns: True if package exists """ package = await self.get_package(name) return package is not None def clear_cache(self) -> None: """Clear the package cache.""" self._cache.clear()