| | |
| | | |
# Module-level logger for dependency-resolution messages.
logger = get_logger("resolver")

# Official Arch Linux repositories (used to distinguish official packages
# from third-party/user repos when include_all=False lookups are requested).
OFFICIAL_REPOS = {"core", "extra", "multilib", "testing", "extra-testing", "multilib-testing", "gnome-unstable", "kde-unstable"}
| | | |
| | | |
| | | |
| | | class DependencyType(Enum): |
| | | """Type of dependency.""" |
| | |
| | | aur_client: AURClient instance for fetching package info |
| | | """ |
| | | self.aur_client = aur_client |
| | | self._pacman_cache: set[str] = set() |
| | | self._pacman_cache: dict[str, set[str]] = {} # repo -> packages |
| | | self._pacman_checked = False |
| | | |
| | | def _refresh_pacman_cache(self) -> None: |
| | | """Refresh cache of packages available from official repos.""" |
| | | def _refresh_pacman_cache(self, sync: bool = False) -> None: |
| | | """Refresh cache of packages available from official repos. |
| | | |
| | | Args: |
| | | sync: Whether to synchronize pacman databases first using sudo pacman -Sy |
| | | """ |
| | | try: |
| | | if sync: |
| | | logger.info("Synchronizing pacman databases...") |
| | | subprocess.run( |
| | | ["sudo", "pacman", "-Sy", "--noconfirm"], |
| | | capture_output=True, |
| | | text=True, |
| | | check=True, |
| | | ) |
| | | |
| | | result = subprocess.run( |
| | | ["pacman", "-Slq"], |
| | | ["pacman", "-Sl"], |
| | | capture_output=True, |
| | | text=True, |
| | | check=True, |
| | | ) |
| | | self._pacman_cache = set(result.stdout.strip().split("\n")) |
| | | self._pacman_cache = {} |
| | | for line in result.stdout.strip().split("\n"): |
| | | if not line: |
| | | continue |
| | | parts = line.split() |
| | | if len(parts) >= 2: |
| | | repo, name = parts[0], parts[1] |
| | | if repo not in self._pacman_cache: |
| | | self._pacman_cache[repo] = set() |
| | | self._pacman_cache[repo].add(name) |
| | | |
| | | self._pacman_checked = True |
| | | logger.debug(f"Cached {len(self._pacman_cache)} packages from official repos") |
| | | total_pkgs = sum(len(pkgs) for pkgs in self._pacman_cache.values()) |
| | | logger.debug(f"Cached {total_pkgs} packages from {len(self._pacman_cache)} repos") |
| | | except subprocess.CalledProcessError as e: |
| | | logger.warning(f"Failed to get pacman package list: {e}") |
| | | self._pacman_cache = set() |
| | | logger.warning(f"Failed to refresh pacman cache: {e}") |
| | | if not self._pacman_cache: |
| | | self._pacman_cache = {} |
| | | |
| | | def is_in_official_repos(self, name: str) -> bool: |
| | | """Check if package is available in official repositories. |
| | | def is_in_repos(self, name: str, include_all: bool = True) -> str | None: |
| | | """Check if package is available in repositories. |
| | | |
| | | Args: |
| | | name: Package name (without version constraint) |
| | | include_all: If True, check all enabled repos. If False, only official ones. |
| | | |
| | | Returns: |
| | | True if available in official repos |
| | | Name of repository where package was found, or None if not found |
| | | """ |
| | | if not self._pacman_checked: |
| | | self._refresh_pacman_cache() |
| | | |
| | | # Strip version constraint |
| | | base_name = name.split(">=")[0].split("<=")[0].split("=")[0].split(">")[0].split("<")[0] |
| | | return base_name in self._pacman_cache |
| | | |
| | | for repo, pkgs in self._pacman_cache.items(): |
| | | if not include_all and repo not in OFFICIAL_REPOS: |
| | | continue |
| | | if base_name in pkgs: |
| | | return repo |
| | | |
| | | # Fallback: check provides via pacman -Sp |
| | | try: |
| | | # Use pacman -Sp --noconfirm to see if pacman can resolve it |
| | | result = subprocess.run( |
| | | ["pacman", "-Sp", "--noconfirm", base_name], |
| | | capture_output=True, |
| | | text=True, |
| | | ) |
| | | if result.returncode == 0: |
| | | # Successfully resolved. Find what it resolved to. |
| | | # Output looks like: file:///var/cache/pacman/pkg/name-version... |
| | | output = result.stdout.strip().split("\n")[-1] |
| | | if output: |
| | | filename = output.split("/")[-1] |
| | | # The package name is before the version |
| | | import re |
| | | match = re.search(r"^(.*?)-[0-9].*", filename) |
| | | if match: |
| | | resolved_name = match.group(1) |
| | | # Now check which repo this resolved_name belongs to |
| | | # We already have resolved_name in our cache if it's in a repo |
| | | for repo, pkgs in self._pacman_cache.items(): |
| | | if not include_all and repo not in OFFICIAL_REPOS: |
| | | continue |
| | | if resolved_name in pkgs: |
| | | return repo |
| | | except Exception as e: |
| | | logger.debug(f"Failed to resolve provides for {base_name}: {e}") |
| | | |
| | | return None |
| | | |
| | | def is_installed(self, name: str) -> bool: |
| | | """Check if package is already installed. |
| | |
| | | all_deps: list[str] = [] |
| | | all_deps.extend(package.depends) |
| | | all_deps.extend(package.makedepends) |
| | | all_deps.extend(package.checkdepends) |
| | | |
| | | aur_deps: list[str] = [] |
| | | for dep in all_deps: |
| | | dep_parsed = Dependency.parse(dep) |
| | | base_name = dep_parsed.name |
| | | |
| | | # Skip if in official repos or already installed |
| | | if self.is_in_official_repos(base_name): |
| | | # Skip if in repos or already installed |
| | | if self.is_in_repos(base_name): |
| | | continue |
| | | if self.is_installed(base_name): |
| | | continue |
| | |
| | | Raises: |
| | | ValueError: If package not found or circular dependency |
| | | """ |
| | | # Filter out packages already in official repos or installed |
| | | # Filter out packages already in repos or installed |
| | | aur_package_names = [] |
| | | for name in package_names: |
| | | if self.is_in_official_repos(name): |
| | | logger.info(f"Package {name} found in official repositories, skipping AUR lookup") |
| | | if self.is_in_repos(name): |
| | | logger.info(f"Package {name} found in repositories, skipping AUR lookup") |
| | | continue |
| | | if self.is_installed(name): |
| | | logger.info(f"Package {name} is already installed, skipping AUR lookup") |
| | |
| | | deps: list[Dependency] = [] |
| | | for dep in pkg.depends: |
| | | parsed = Dependency.parse(dep, DependencyType.RUNTIME) |
| | | if not self.is_in_official_repos(parsed.name): |
| | | if not self.is_in_repos(parsed.name): |
| | | parsed.is_aur = True |
| | | deps.append(parsed) |
| | | for dep in pkg.makedepends: |
| | | parsed = Dependency.parse(dep, DependencyType.BUILD) |
| | | if not self.is_in_official_repos(parsed.name): |
| | | if not self.is_in_repos(parsed.name): |
| | | parsed.is_aur = True |
| | | deps.append(parsed) |
| | | for dep in pkg.checkdepends: |
| | | parsed = Dependency.parse(dep, DependencyType.CHECK) |
| | | if not self.is_in_repos(parsed.name): |
| | | parsed.is_aur = True |
| | | deps.append(parsed) |
| | | aur_deps[name] = deps |