| | |
| | | cwd=self.config.repository.path, |
| | | ) |
| | | |
| | | def _vercmp(self, v1: str, v2: str) -> int: |
| | | """Compare two version strings using vercmp. |
| | | |
| | | Returns: |
| | | >0 if v1 > v2, <0 if v1 < v2, 0 if equal |
| | | """ |
| | | try: |
| | | result = subprocess.run( |
| | | ["vercmp", v1, v2], |
| | | capture_output=True, |
| | | text=True, |
| | | check=True, |
| | | ) |
| | | return int(result.stdout.strip()) |
| | | except (subprocess.CalledProcessError, ValueError): |
| | | return 0 |
| | | |
| | | def _parse_pkg_filename(self, filename: str) -> tuple[str, str]: |
| | | """Parse package name and version from a filename. |
| | | |
| | | Args: |
| | | filename: Package filename (e.g. name-version-rel-arch.pkg.tar.zst) |
| | | |
| | | Returns: |
| | | Tuple of (package_name, version-release) |
| | | """ |
| | | # Remove suffixes |
| | | stem = filename |
| | | for suffix in [".pkg.tar.zst", ".pkg.tar.xz", ".pkg.tar.gz", ".pkg.tar.bz2", ".pkg.tar"]: |
| | | if stem.endswith(suffix): |
| | | stem = stem[:-len(suffix)] |
| | | break |
| | | |
| | | # Format: name-version-rel-arch |
| | | parts = stem.rsplit("-", 3) |
| | | if len(parts) == 4: |
| | | name = parts[0] |
| | | version = f"{parts[1]}-{parts[2]}" |
| | | return name, version |
| | | |
| | | return stem, "unknown" |
| | | |
| | | def ensure_repo_exists(self) -> None: |
| | | """Ensure repository directory and database exist.""" |
| | | self.config.repository.path.mkdir(parents=True, exist_ok=True) |
| | |
| | | with self._get_repo_lock(): |
| | | self.ensure_repo_exists() |
| | | |
| | | # Remove old versions of this package |
| | | self._remove_old_packages(build_result.package) |
| | | # Group artifacts by package name and only keep the latest version |
| | | latest_artifacts: dict[str, Path] = {} |
| | | for artifact in build_result.artifacts: |
| | | name, version = self._parse_pkg_filename(artifact.name) |
| | | if name not in latest_artifacts: |
| | | latest_artifacts[name] = artifact |
| | | else: |
| | | _, current_best_ver = self._parse_pkg_filename(latest_artifacts[name].name) |
| | | if self._vercmp(version, current_best_ver) > 0: |
| | | latest_artifacts[name] = artifact |
| | | |
| | | artifacts_to_copy = list(latest_artifacts.values()) |
| | | |
| | | # Copy artifacts to repo directory |
| | | copied_files: list[Path] = [] |
| | | for artifact in build_result.artifacts: |
| | | for artifact in artifacts_to_copy: |
| | | dest = self.config.repository.path / artifact.name |
| | | shutil.copy2(artifact, dest) |
| | | copied_files.append(dest) |
| | |
| | | logger.error(f"Failed to add packages to database: {result.stderr}") |
| | | return False |
| | | |
| | | # Clean up old versions in repo for each package name added |
| | | for name in latest_artifacts.keys(): |
| | | self._remove_old_packages(name) |
| | | |
| | | logger.info(f"Added {len(copied_files)} package(s) to repository") |
| | | return True |
| | | |
| | |
| | | removed = 0 |
| | | |
| | | for f in to_remove: |
| | | f.remove() |
| | | f.unlink() |
| | | # Also remove signature |
| | | sig = f.with_suffix(f.suffix + ".sig") |
| | | if sig.exists(): |
| | | sig.remove() |
| | | sig.unlink() |
| | | removed += 1 |
| | | |
| | | if removed: |
| | |
| | | if f.name.endswith(".sig"): |
| | | continue |
| | | |
| | | # Parse package name and version from filename |
| | | # Format: name-version-rel-arch.pkg.tar.zst |
| | | parts = f.stem.replace(".pkg.tar", "").rsplit("-", 3) |
| | | if len(parts) >= 3: |
| | | name = "-".join(parts[:-2]) |
| | | version = f"{parts[-2]}-{parts[-1]}" if len(parts) > 2 else parts[-1] |
| | | else: |
| | | name = f.stem |
| | | version = "unknown" |
| | | name, version = self._parse_pkg_filename(f.name) |
| | | |
| | | stat = f.stat() |
| | | packages.append(PackageInfo( |