| | |
| | | self._pacman_cache: dict[str, set[str]] = {} # repo -> packages |
| | | self._pacman_checked = False |
| | | |
| | | def _refresh_pacman_cache(self) -> None: |
| | | """Refresh cache of packages available from official repos.""" |
| | | def _refresh_pacman_cache(self, sync: bool = False) -> None: |
| | | """Refresh cache of packages available from official repos. |
| | | |
| | | Args: |
| | | sync: Whether to synchronize pacman databases first using sudo pacman -Sy |
| | | """ |
| | | try: |
| | | if sync: |
| | | logger.info("Synchronizing pacman databases...") |
| | | subprocess.run( |
| | | ["sudo", "pacman", "-Sy", "--noconfirm"], |
| | | capture_output=True, |
| | | text=True, |
| | | check=True, |
| | | ) |
| | | |
| | | result = subprocess.run( |
| | | ["pacman", "-Sl"], |
| | | capture_output=True, |
| | |
| | | total_pkgs = sum(len(pkgs) for pkgs in self._pacman_cache.values()) |
| | | logger.debug(f"Cached {total_pkgs} packages from {len(self._pacman_cache)} repos") |
| | | except subprocess.CalledProcessError as e: |
| | | logger.warning(f"Failed to get pacman package list: {e}") |
| | | self._pacman_cache = {} |
| | | logger.warning(f"Failed to refresh pacman cache: {e}") |
| | | if not self._pacman_cache: |
| | | self._pacman_cache = {} |
| | | |
| | | def is_in_repos(self, name: str, include_all: bool = True) -> str | None: |
| | | """Check if package is available in repositories. |
| | |
| | | continue |
| | | if base_name in pkgs: |
| | | return repo |
| | | |
| | | # Fallback: check provides via pacman -Sp |
| | | try: |
| | | # Use pacman -Sp --noconfirm to see if pacman can resolve it |
| | | result = subprocess.run( |
| | | ["pacman", "-Sp", "--noconfirm", base_name], |
| | | capture_output=True, |
| | | text=True, |
| | | ) |
| | | if result.returncode == 0: |
| | | # Successfully resolved. Find what it resolved to. |
| | | # Output looks like: file:///var/cache/pacman/pkg/name-version... |
| | | output = result.stdout.strip().split("\n")[-1] |
| | | if output: |
| | | filename = output.split("/")[-1] |
| | | # The package name is before the version |
| | | import re |
| | | match = re.search(r"^(.*?)-[0-9].*", filename) |
| | | if match: |
| | | resolved_name = match.group(1) |
| | | # Now check which repo this resolved_name belongs to |
| | | # We already have resolved_name in our cache if it's in a repo |
| | | for repo, pkgs in self._pacman_cache.items(): |
| | | if not include_all and repo not in OFFICIAL_REPOS: |
| | | continue |
| | | if resolved_name in pkgs: |
| | | return repo |
| | | except Exception as e: |
| | | logger.debug(f"Failed to resolve provides for {base_name}: {e}") |
| | | |
| | | return None |
| | | |
| | | def is_installed(self, name: str) -> bool: |