mirror of https://github.com/Chizi123/Arch-autobuild-repo.git

Joel Grunbaum
22 hours ago cbc630823e875a56ef24775c064461d7b8f8c720
src/archrepobuild/repo.py
@@ -67,6 +67,49 @@
            cwd=self.config.repository.path,
        )
    def _vercmp(self, v1: str, v2: str) -> int:
        """Compare two version strings using vercmp.
        Returns:
            >0 if v1 > v2, <0 if v1 < v2, 0 if equal
        """
        try:
            result = subprocess.run(
                ["vercmp", v1, v2],
                capture_output=True,
                text=True,
                check=True,
            )
            return int(result.stdout.strip())
        except (subprocess.CalledProcessError, ValueError):
            return 0
    def _parse_pkg_filename(self, filename: str) -> tuple[str, str, str]:
        """Parse package name, version, and architecture from a filename.
        Args:
            filename: Package filename (e.g. name-version-rel-arch.pkg.tar.zst)
        Returns:
            Tuple of (package_name, version-release, architecture)
        """
        # Remove suffixes
        stem = filename
        for suffix in [".pkg.tar.zst", ".pkg.tar.xz", ".pkg.tar.gz", ".pkg.tar.bz2", ".pkg.tar"]:
            if stem.endswith(suffix):
                stem = stem[:-len(suffix)]
                break
        # Format: name-version-rel-arch
        parts = stem.rsplit("-", 3)
        if len(parts) == 4:
            name = parts[0]
            version = f"{parts[1]}-{parts[2]}"
            arch = parts[3]
            return name, version, arch
        return stem, "unknown", "unknown"
    def ensure_repo_exists(self) -> None:
        """Ensure repository directory and database exist."""
        self.config.repository.path.mkdir(parents=True, exist_ok=True)
@@ -81,32 +124,43 @@
            if result.returncode != 0:
                logger.warning(f"Could not create empty database: {result.stderr}")
    def add_packages(self, build_result: BuildResult) -> bool:
    def add_packages(self, build_result: BuildResult) -> list[str]:
        """Add built packages to the repository.
        Args:
            build_result: Result from package build
        Returns:
            True if packages were added successfully
            List of filenames added successfully
        """
        if build_result.status != BuildStatus.SUCCESS:
            logger.warning(f"Cannot add {build_result.package}: build was not successful")
            return False
            return []
        if not build_result.artifacts:
            logger.warning(f"No artifacts to add for {build_result.package}")
            return False
            return []
        with self._get_repo_lock():
            self.ensure_repo_exists()
            # Remove old versions of this package
            self._remove_old_packages(build_result.package)
            # Group artifacts by (name, arch) and only keep the latest version
            latest_artifacts: dict[tuple[str, str], Path] = {}
            for artifact in build_result.artifacts:
                name, version, arch = self._parse_pkg_filename(artifact.name)
                key = (name, arch)
                if key not in latest_artifacts:
                    latest_artifacts[key] = artifact
                else:
                    _, current_best_ver, _ = self._parse_pkg_filename(latest_artifacts[key].name)
                    if self._vercmp(version, current_best_ver) > 0:
                        latest_artifacts[key] = artifact
            artifacts_to_copy = list(latest_artifacts.values())
            # Copy artifacts to repo directory
            copied_files: list[Path] = []
            for artifact in build_result.artifacts:
            for artifact in artifacts_to_copy:
                dest = self.config.repository.path / artifact.name
                shutil.copy2(artifact, dest)
                copied_files.append(dest)
@@ -124,10 +178,15 @@
            if result.returncode != 0:
                logger.error(f"Failed to add packages to database: {result.stderr}")
                return False
                return []
            logger.info(f"Added {len(copied_files)} package(s) to repository")
            return True
            # Clean up old versions in repo for each package name added
            for (name, arch) in latest_artifacts.keys():
                self._remove_old_packages(name)
            added_names = [f.name for f in copied_files]
            logger.info(f"Added to repository: {', '.join(added_names)}")
            return added_names
    def remove_package(self, package: str) -> bool:
        """Remove a package from the repository.
@@ -209,15 +268,7 @@
            if f.name.endswith(".sig"):
                continue
            # Parse package name and version from filename
            # Format: name-version-rel-arch.pkg.tar.zst
            parts = f.stem.replace(".pkg.tar", "").rsplit("-", 3)
            if len(parts) >= 3:
                name = "-".join(parts[:-2])
                version = f"{parts[-2]}-{parts[-1]}" if len(parts) > 2 else parts[-1]
            else:
                name = f.stem
                version = "unknown"
            name, version, arch = self._parse_pkg_filename(f.name)
            stat = f.stat()
            packages.append(PackageInfo(