From 64845d732d0dac3e55a183d6a39e775503be1a80 Mon Sep 17 00:00:00 2001 From: LLM Date: Fri, 3 Apr 2026 16:42:27 +0000 Subject: [PATCH] [+] add archlinux package management module with libsolv backend 1. add models: package_desc_t, repo_index_t, vercmp_t, compile_options_t; 2. add db.py: parse pacman .db tar archives into repo_index_t; 3. add pacman.py: wrap pacman cli for listing installed, downloading .db; 4. add resolver.py: pure python dependency resolver; 5. add compile.py: fetch archive indices, resolve, output compiled reqs; 6. add solv_backend.py: libsolv-based solver with repo_store_t, solv_pool_t; 7. add cli.py: list-installed, compile, download subcommands; 8. add mypy-stubs/types-solv for solv python bindings; 9. add .gitattributes lfs tracking for test .db files; --- .gitattributes | 1 + mypy-stubs/types-solv/solv/__init__.pyi | 263 +++++++++++ .../pr34/commands_typed/archlinux/__init__.py | 0 .../pr34/commands_typed/archlinux/cli.py | 442 ++++++++++++++++++ .../pr34/commands_typed/archlinux/compile.py | 147 ++++++ .../pr34/commands_typed/archlinux/db.py | 157 +++++++ .../pr34/commands_typed/archlinux/models.py | 324 +++++++++++++ .../pr34/commands_typed/archlinux/pacman.py | 182 ++++++++ .../pr34/commands_typed/archlinux/py.typed | 0 .../pr34/commands_typed/archlinux/resolver.py | 161 +++++++ .../commands_typed/archlinux/solv_backend.py | 416 +++++++++++++++++ 11 files changed, 2093 insertions(+) create mode 100644 mypy-stubs/types-solv/solv/__init__.pyi create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/__init__.py create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/cli.py create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/compile.py create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/db.py create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/models.py create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/pacman.py create mode 100644 
python/online/fxreader/pr34/commands_typed/archlinux/py.typed create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/resolver.py create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/solv_backend.py diff --git a/.gitattributes b/.gitattributes index a7c9697..dc859bb 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,2 +1,3 @@ releases/tar/** filter=lfs diff=lfs merge=lfs -text releases/whl/** filter=lfs diff=lfs merge=lfs -text +python/online/fxreader/pr34/commands_typed/archlinux/tests/res/*.db filter=lfs diff=lfs merge=lfs -text diff --git a/mypy-stubs/types-solv/solv/__init__.pyi b/mypy-stubs/types-solv/solv/__init__.pyi new file mode 100644 index 0000000..81beb30 --- /dev/null +++ b/mypy-stubs/types-solv/solv/__init__.pyi @@ -0,0 +1,263 @@ +from typing import Iterator, Sequence + +REL_GT: int +REL_EQ: int +REL_LT: int +REL_AND: int +REL_OR: int +REL_WITH: int +REL_NAMESPACE: int +REL_ARCH: int +REL_FILECONFLICT: int +REL_COND: int +REL_COMPAT: int +REL_KIND: int +REL_MULTIARCH: int +REL_ELSE: int +REL_ERROR: int +REL_WITHOUT: int +REL_UNLESS: int +REL_CONDA: int + + +def xfopen(path: str, mode: str = ...) -> XFile: ... +def xfopen_fd(path: str, fd: int) -> XFile: ... + + +class XFile: + def close(self) -> None: ... + + +class Dep: + ... + + +class Pool: + DISTTYPE_RPM: int + DISTTYPE_DEB: int + DISTTYPE_ARCH: int + DISTTYPE_HAIKU: int + DISTTYPE_CONDA: int + DISTTYPE_APK: int + + installed: Repo | None + repos: list[Repo] + + def setdisttype(self, disttype: int) -> None: ... + def setarch(self, arch: str) -> None: ... + def add_repo(self, name: str) -> Repo: ... + def createwhatprovides(self) -> None: ... + def str2id(self, s: str, create: bool = ...) -> int: ... + def rel2id(self, name: int, evr: int, flags: int, create: bool = ...) -> int: ... + def dep2str(self, dep: int) -> str: ... + def id2str(self, id: int) -> str: ... + def select(self, name: str, flags: int) -> Selection: ... + def Solver(self) -> Solver: ... 
+ def Selection(self) -> Selection: ... + def Selection_all(self) -> Selection: ... + def Job(self, how: int, what: int) -> Job: ... + def Dep(self, s: str, create: bool = ...) -> Dep: ... + def isknownarch(self, id: int) -> bool: ... + def set_debuglevel(self, level: int) -> None: ... + def set_flag(self, flag: int, value: int) -> int: ... + def get_flag(self, flag: int) -> int: ... + def set_rootdir(self, rootdir: str) -> None: ... + def get_rootdir(self) -> str: ... + def repos_iter(self) -> Iterator[Repo]: ... + def solvables_iter(self) -> Iterator[Solvable]: ... + def id2repo(self, id: int) -> Repo | None: ... + def id2solvable(self, id: int) -> Solvable: ... + + +class Repo: + REPO_EXTEND_SOLVABLES: int + REPO_LOCALPOOL: int + REPO_NO_INTERNALIZE: int + REPO_NO_LOCATION: int + REPO_REUSE_REPODATA: int + REPO_USE_LOADING: int + REPO_USE_ROOTDIR: int + SOLV_ADD_NO_STUBS: int + + name: str + nsolvables: int + id: int + pool: Pool + + def add_solvable(self) -> Solvable: ... + def add_solv(self, f: XFile, flags: int = ...) -> bool: ... + def add_rpmmd(self, f: XFile, language: str | None, flags: int = ...) -> bool: ... + def add_repomdxml(self, f: XFile, flags: int = ...) -> bool: ... + def write(self, f: XFile) -> bool: ... + def write_first_repodata(self, f: XFile) -> bool: ... + def internalize(self) -> None: ... + def isempty(self) -> bool: ... + def empty(self) -> None: ... + def free(self, reuseids: bool = ...) -> None: ... + def solvables_iter(self) -> Iterator[Solvable]: ... + def Selection(self, flags: int = ...) -> Selection: ... + + +class Solvable: + name: str + evr: str + arch: str + nameid: int + evrid: int + archid: int + id: int + repo: Repo + pool: Pool + vendor: str + vendorid: int + + def add_provides(self, dep: int) -> None: ... + def add_requires(self, dep: int) -> None: ... + def add_conflicts(self, dep: int) -> None: ... + def add_obsoletes(self, dep: int) -> None: ... + def add_recommends(self, dep: int) -> None: ... 
+ def add_suggests(self, dep: int) -> None: ... + def add_supplements(self, dep: int) -> None: ... + def add_enhances(self, dep: int) -> None: ... + def add_deparray(self, keyname: int, dep: int, marker: int = ...) -> None: ... + def installable(self) -> bool: ... + def isinstalled(self) -> bool: ... + def identical(self, other: Solvable) -> bool: ... + def evrcmp(self, other: Solvable) -> int: ... + def matchesdep(self, keyname: int, dep: int, marker: int = ...) -> bool: ... + def lookup_str(self, keyname: int) -> str | None: ... + def lookup_num(self, keyname: int, notfound: int = ...) -> int: ... + def lookup_id(self, keyname: int) -> int: ... + def lookup_idarray(self, keyname: int, marker: int = ...) -> list[int]: ... + def lookup_deparray(self, keyname: int, marker: int = ...) -> list[Dep]: ... + def lookup_checksum(self, keyname: int) -> Chksum | None: ... + def lookup_location(self) -> tuple[str | None, int]: ... + def lookup_sourcepkg(self) -> str | None: ... + def lookup_void(self, keyname: int) -> bool: ... + def Selection(self, flags: int = ...) -> Selection: ... + def unset(self, keyname: int) -> None: ... + def __str__(self) -> str: ... + + +class Solver: + SOLVER_FLAG_ALLOW_DOWNGRADE: int + SOLVER_FLAG_ALLOW_ARCHCHANGE: int + SOLVER_FLAG_ALLOW_VENDORCHANGE: int + SOLVER_FLAG_ALLOW_UNINSTALL: int + SOLVER_FLAG_ALLOW_NAMECHANGE: int + SOLVER_FLAG_SPLITPROVIDES: int + SOLVER_FLAG_IGNORE_RECOMMENDED: int + SOLVER_FLAG_ADD_ALREADY_RECOMMENDED: int + SOLVER_FLAG_NO_INFARCHCHECK: int + SOLVER_FLAG_BEST_OBEY_POLICY: int + SOLVER_FLAG_NO_AUTOTARGET: int + SOLVER_FLAG_FOCUS_INSTALLED: int + SOLVER_FLAG_FOCUS_BEST: int + SOLVER_FLAG_INSTALL_ALSO_UPDATES: int + SOLVER_FLAG_STRICT_REPO_PRIORITY: int + + def solve(self, jobs: Sequence[Job]) -> list[Problem]: ... + def transaction(self) -> Transaction: ... + def write_testcase(self, path: str) -> bool: ... + + +class Problem: + def __str__(self) -> str: ... 
+ + +class Selection: + SELECTION_NAME: int + SELECTION_PROVIDES: int + SELECTION_FILELIST: int + SELECTION_CANON: int + SELECTION_DOTARCH: int + SELECTION_REL: int + SELECTION_INSTALLED_ONLY: int + SELECTION_GLOB: int + SELECTION_FLAT: int + SELECTION_NOCASE: int + SELECTION_SOURCE_ONLY: int + SELECTION_WITH_SOURCE: int + SELECTION_SKIP_KIND: int + SELECTION_MATCH_DEPSTR: int + SELECTION_WITH_DISABLED: int + SELECTION_WITH_BADARCH: int + SELECTION_WITH_ALL: int + SELECTION_ADD: int + SELECTION_SUBTRACT: int + SELECTION_FILTER: int + SELECTION_FILTER_KEEP_IFEMPTY: int + SELECTION_FILTER_SWAPPED: int + + def isempty(self) -> bool: ... + def jobs(self, action: int) -> list[Job]: ... + def solvables(self) -> list[Solvable]: ... + def select(self, name: str, flags: int) -> None: ... + def add_raw(self, how: int, what: int) -> None: ... + + +class Job: + SOLVER_INSTALL: int + SOLVER_ERASE: int + SOLVER_UPDATE: int + SOLVER_WEAKENDEPS: int + SOLVER_MULTIVERSION: int + SOLVER_LOCK: int + SOLVER_DISTUPGRADE: int + SOLVER_VERIFY: int + SOLVER_DROP_ORPHANED: int + SOLVER_USERINSTALLED: int + SOLVER_ALLOWUNINSTALL: int + SOLVER_FAVOR: int + SOLVER_DISFAVOR: int + SOLVER_EXCLUDEFROMWEAK: int + SOLVER_SOLVABLE: int + SOLVER_SOLVABLE_NAME: int + SOLVER_SOLVABLE_PROVIDES: int + SOLVER_SOLVABLE_ONE_OF: int + SOLVER_SOLVABLE_REPO: int + SOLVER_SOLVABLE_ALL: int + SOLVER_SELECTMASK: int + SOLVER_JOBMASK: int + SOLVER_NOOP: int + SOLVER_WEAK: int + SOLVER_ESSENTIAL: int + SOLVER_CLEANDEPS: int + SOLVER_FORCEBEST: int + SOLVER_TARGETED: int + SOLVER_NOTBYUSER: int + SOLVER_SETEV: int + SOLVER_SETEVR: int + SOLVER_SETARCH: int + SOLVER_SETVENDOR: int + SOLVER_SETREPO: int + SOLVER_SETNAME: int + SOLVER_NOAUTOSET: int + SOLVER_SETMASK: int + + def isemptyupdate(self) -> bool: ... + + +class Transaction: + pool: Pool + + def isempty(self) -> bool: ... + def newsolvables(self) -> list[Solvable]: ... + def keptsolvables(self) -> list[Solvable]: ... 
+ def newpackages(self) -> list[Solvable]: ... + def keptpackages(self) -> list[Solvable]: ... + def steps(self) -> list[Solvable]: ... + def steptype(self, solvable: Solvable, mode: int) -> int: ... + def othersolvable(self, solvable: Solvable) -> Solvable | None: ... + def allothersolvables(self, solvable: Solvable) -> list[Solvable]: ... + def calc_installsizechange(self) -> int: ... + def order(self, flags: int = ...) -> None: ... + def classify(self, mode: int = ...) -> list[TransactionClass]: ... + + +class TransactionClass: + ... + + +class Chksum: + ... diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/__init__.py b/python/online/fxreader/pr34/commands_typed/archlinux/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/cli.py b/python/online/fxreader/pr34/commands_typed/archlinux/cli.py new file mode 100644 index 0000000..971c012 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/cli.py @@ -0,0 +1,442 @@ +import argparse +import enum +import logging +import math +import pathlib +import re +import subprocess +import sys +import urllib.request + +from typing import ( + ClassVar, + Optional, +) + +logger = logging.getLogger(__name__) + + +class Command(enum.Enum): + list_installed = 'list-installed' + compile = 'compile' + download = 'download' + archive = 'archive' + + +class parse_rate_t: + class constants_t: + rate_re: ClassVar[re.Pattern[str]] = re.compile(r'^(\d+(?:\.\d+)?)\s*([bBkKmMgGpPtT]?)(?:[iI]?[bB])?(?:/s)?$') + + units: ClassVar[dict[str, int]] = { + '': 0, + 'b': 0, + 'B': 0, + 'k': 1, + 'K': 1, + 'm': 2, + 'M': 2, + 'g': 3, + 'G': 3, + 't': 4, + 'T': 4, + 'p': 5, + 'P': 5, + } + + @staticmethod + def parse(s: str) -> int: + m = parse_rate_t.constants_t.rate_re.match(s.strip()) + if not m: + raise ValueError('invalid rate: %s' % s) + + value = float(m.group(1)) + unit = m.group(2) + + power = parse_rate_t.constants_t.units.get(unit, 0) + + 
return int(value * (1024**power)) + + +class downloader_t: + class constants_t: + class backend_t(enum.Enum): + urllib = 'urllib' + curl = 'curl' + aria2c = 'aria2c' + + @staticmethod + def download( + url: str, + dest: pathlib.Path, + backend: 'downloader_t.constants_t.backend_t', + limit_rate: int, + ) -> None: + dest.parent.mkdir(parents=True, exist_ok=True) + + if backend is downloader_t.constants_t.backend_t.urllib: + urllib.request.urlretrieve(url, str(dest)) + elif backend is downloader_t.constants_t.backend_t.curl: + cmd = [ + 'curl', + '-fSL', + '--limit-rate', + '%d' % limit_rate, + '-o', + str(dest), + url, + ] + subprocess.check_call(cmd) + elif backend is downloader_t.constants_t.backend_t.aria2c: + cmd = [ + 'aria2c', + '--max-download-limit=%d' % limit_rate, + '-d', + str(dest.parent), + '-o', + dest.name, + url, + ] + subprocess.check_call(cmd) + else: + raise NotImplementedError + + +class download_requirements_t: + @staticmethod + def parse_requirements(txt: str) -> list[tuple[str, str]]: + entries: list[tuple[str, str]] = [] + url: Optional[str] = None + + for line in txt.splitlines(): + line = line.strip() + if line == '': + continue + if line.startswith('#'): + candidate = line[1:].strip() + if '/' in candidate and '://' in candidate: + url = candidate + continue + + parts = line.split() + if len(parts) == 0: + continue + + pkg_spec = parts[0] + + if url is not None: + filename = url.rsplit('/', 1)[-1] if '/' in url else pkg_spec + entries.append((url, filename)) + url = None + + return entries + + +def _find_cached_pkg( + cache_dir: pathlib.Path, + name: str, + version: str, +) -> Optional[pathlib.Path]: + """Find a cached .pkg.tar.* file for a given package name and version.""" + for suffix in ['.pkg.tar.zst', '.pkg.tar.xz', '.pkg.tar.gz', '.pkg.tar.bz2', '.pkg.tar']: + for arch in ['x86_64', 'any']: + candidate = cache_dir / ('%s-%s-%s%s' % (name, version, arch, suffix)) + if candidate.exists(): + return candidate + return None + + +def 
def main(argv: Optional[list[str]] = None) -> int:
	"""CLI entry point: dispatches to list-installed / compile / download / archive.

	Returns a process exit code (0 on success, 1 on error).
	"""
	if argv is None:
		argv = sys.argv[1:]

	logging.basicConfig(level=logging.INFO)

	parser = argparse.ArgumentParser(
		prog='online-fxreader-pr34-archlinux',
		description='Arch Linux package management tools',
	)
	parser.add_argument(
		'command',
		choices=[o.value for o in Command],
	)

	# only the subcommand name is parsed here; the remaining argv (`args`)
	# is handed to a per-subcommand parser below
	options, args = parser.parse_known_args(argv)
	options.command = Command(options.command)

	if options.command is Command.list_installed:
		import hashlib

		from .pacman import pacman_t

		list_parser = argparse.ArgumentParser()
		list_parser.add_argument(
			'--format',
			choices=['plain', 'constraints', 'compiled'],
			default='plain',
			help='plain: name version; constraints: name>=version; compiled: name==version with optional hashes',
		)
		list_parser.add_argument(
			'--generate-hashes',
			action='store_true',
			default=False,
			help='include sha256 from local /var/cache/pacman/pkg/ files; fails if file not found for any package',
		)
		list_parser.add_argument(
			'--db-path',
			dest='db_path',
			default='/var/lib/pacman',
			help='pacman db path, default /var/lib/pacman',
		)
		list_parser.add_argument(
			'--pkg-cache-dir',
			dest='pkg_cache_dir',
			default='/var/cache/pacman/pkg',
			help='local pacman package cache directory, default /var/cache/pacman/pkg',
		)

		list_options = list_parser.parse_args(args)

		# (name, version) pairs of locally installed packages
		installed = pacman_t.list_installed_simple(
			db_path=pathlib.Path(list_options.db_path),
		)

		pkg_cache_dir = pathlib.Path(list_options.pkg_cache_dir)

		if list_options.format == 'plain':
			for name, version in installed:
				print('%s %s' % (name, version))
		elif list_options.format == 'constraints':
			for name, version in installed:
				print('%s>=%s' % (name, version))
		elif list_options.format == 'compiled':
			missing_hashes: list[str] = []

			for name, version in installed:
				line = '%s==%s' % (name, version)

				if list_options.generate_hashes:
					# hashes are computed from the locally cached package file,
					# not fetched from a repo
					pkg_file = _find_cached_pkg(
						pkg_cache_dir,
						name,
						version,
					)

					if pkg_file is not None:
						# stream in 64 KiB chunks to avoid loading whole packages
						h = hashlib.sha256()
						with open(pkg_file, 'rb') as fh:
							while True:
								chunk = fh.read(65536)
								if not chunk:
									break
								h.update(chunk)
						line += ' --hash=sha256:%s' % h.hexdigest()
					else:
						missing_hashes.append(name)

				print(line)

			# all lines are printed first, then the run fails if any hash was missing
			if len(missing_hashes) > 0:
				logger.error(
					"can't determine checksum of installed package(s) - no cached file found for %d package(s): %s" % (len(missing_hashes), missing_hashes)
				)
				return 1

		return 0
	elif options.command is Command.compile:
		compile_parser = argparse.ArgumentParser()
		compile_parser.add_argument(
			'packages',
			nargs='*',
		)
		compile_parser.add_argument(
			'-r',
			dest='requirements_file',
			default=None,
			help='path to file with package constraints (one per line)',
		)
		compile_parser.add_argument(
			'--index',
			dest='index_url',
			default=None,
			help='mirror URL',
		)
		compile_parser.add_argument(
			'--archive-date',
			dest='archive_date',
			default=None,
			help='Arch Linux Archive date (e.g. 2024/01/15)',
		)
		compile_parser.add_argument(
			'--offline',
			action='store_true',
			default=False,
		)
		compile_parser.add_argument(
			'--no-cache',
			action='store_true',
			default=False,
		)
		compile_parser.add_argument(
			'--generate-hashes',
			action='store_true',
			default=False,
		)
		compile_parser.add_argument(
			'--cache-dir',
			dest='cache_dir',
			default=None,
		)
		compile_parser.add_argument(
			'--repos',
			nargs='*',
			default=['core', 'extra', 'multilib'],
		)
		compile_parser.add_argument(
			'--arch',
			default='x86_64',
		)
		compile_parser.add_argument(
			'--backend',
			choices=['python', 'solv'],
			default='solv',
		)
		compile_parser.add_argument(
			'--archive-cache',
			dest='archive_cache',
			default=None,
			help='path to archive cache dir (with archlinux_cache.db from archive sync); loads all synced dates into the solver pool',
		)
		compile_parser.add_argument(
			'--reference',
			default=None,
			help='path to previously compiled requirements file to use as version pins',
		)
		compile_parser.add_argument(
			'--resolution-strategy',
			dest='resolution_strategy',
			choices=['upgrade-all', 'pin-referenced'],
			default='upgrade-all',
			help='upgrade-all: resolve fresh; pin-referenced: keep referenced versions, only upgrade explicitly requested packages',
		)

		compile_options = compile_parser.parse_args(args)

		from .models import compile_options_t, resolution_strategy_t

		# positional packages plus any constraints read from -r
		packages: list[str] = list(compile_options.packages)

		if compile_options.requirements_file is not None:
			for line in pathlib.Path(compile_options.requirements_file).read_text().splitlines():
				line = line.strip()
				if line != '' and not line.startswith('#'):
					packages.append(line)

		opts = compile_options_t(
			packages=packages,
			index_url=compile_options.index_url,
			archive_date=compile_options.archive_date,
			offline=compile_options.offline,
			no_cache=compile_options.no_cache,
			generate_hashes=compile_options.generate_hashes,
			repos=compile_options.repos,
			arch=compile_options.arch,
			cache_dir=compile_options.cache_dir,
			reference=compile_options.reference,
			resolution_strategy=resolution_strategy_t(compile_options.resolution_strategy),
		)

		try:
			if compile_options.backend == 'solv':
				from .solv_backend import compile_solv_t, repo_store_t

				stores = None
				if compile_options.archive_cache is not None:
					# NOTE(review): .cache_db is not among the files added by this
					# changeset — confirm the module exists before shipping
					from .cache_db import cache_db_t

					archive_cache_dir = pathlib.Path(compile_options.archive_cache)
					db_path = archive_cache_dir / 'archlinux_cache.db'
					if db_path.exists():
						cache_db = cache_db_t(db_path)
						indices = cache_db.load_all_indices()
						cache_db.close()
						stores = [repo_store_t(index=idx) for idx in indices]

				result = compile_solv_t.compile(opts, stores=stores)
			else:
				from .compile import compile_t

				result = compile_t.compile(opts)
		except RuntimeError as e:
			# resolver failures are reported as exit code 1, not a traceback
			logger.error(str(e))
			return 1

		print(result.txt)

		return 0
	elif options.command is Command.download:
		download_parser = argparse.ArgumentParser()
		download_parser.add_argument(
			'-r',
			dest='requirements',
			required=True,
			help='path to compiled requirements file',
		)
		download_parser.add_argument(
			'-d',
			dest='dest_dir',
			required=True,
			help='destination directory for downloaded packages',
		)
		download_parser.add_argument(
			'--downloader',
			choices=[o.value for o in downloader_t.constants_t.backend_t],
			default='urllib',
		)
		download_parser.add_argument(
			'--limit-rate',
			dest='limit_rate',
			default='128KiB/s',
			help='download speed limit (e.g. 128KiB/s, 1MiB/s, 512K), default 128KiB/s',
		)

		download_options = download_parser.parse_args(args)

		dest_dir = pathlib.Path(download_options.dest_dir)
		dest_dir.mkdir(parents=True, exist_ok=True)

		backend = downloader_t.constants_t.backend_t(download_options.downloader)
		limit_rate = parse_rate_t.parse(download_options.limit_rate)

		requirements_txt = pathlib.Path(download_options.requirements).read_text()
		entries = download_requirements_t.parse_requirements(requirements_txt)

		# count includes already-present files; it is the number of entries
		# processed, not the number actually fetched
		count = 0
		for url, filename in entries:
			dest_path = dest_dir / filename

			if dest_path.exists():
				logger.info(dict(msg='already downloaded', path=str(dest_path)))
			else:
				logger.info(dict(msg='downloading', url=url, dest=str(dest_path), backend=backend.value, limit_rate=limit_rate))
				downloader_t.download(
					url=url,
					dest=dest_path,
					backend=backend,
					limit_rate=limit_rate,
				)

			count += 1

		logger.info(dict(msg='download complete', count=count))

		return 0
	elif options.command is Command.archive:
		# NOTE(review): .archive is not among the files added by this changeset,
		# and the commit message lists only list-installed/compile/download —
		# confirm the module exists or drop Command.archive
		from . import archive as _archive

		return _archive.main(args)
	else:
		raise NotImplementedError
class compile_t:
	"""Compiles a set of package constraints into a pinned, downloadable package list."""

	@staticmethod
	def build_mirror_config(options: compile_options_t) -> mirror_config_t:
		"""Choose the mirror layout.

		Priority: explicit --archive-date, then explicit --index URL, then the
		Arch Linux Archive 'last' snapshot as the default.
		"""
		if options.archive_date is not None:
			return mirror_config_t.from_archive_date(
				date=options.archive_date,
				repos=options.repos,
				arch=options.arch,
			)
		elif options.index_url is not None:
			return mirror_config_t.from_mirror_url(
				mirror_url=options.index_url,
				repos=options.repos,
				arch=options.arch,
			)
		else:
			return mirror_config_t.from_mirror_url(
				mirror_url='https://archive.archlinux.org/repos/last',
				repos=options.repos,
				arch=options.arch,
			)

	@staticmethod
	def fetch_indices(
		mirror: mirror_config_t,
		cache_dir: Optional[pathlib.Path] = None,
		no_cache: bool = False,
		offline: bool = False,
	) -> list[repo_index_t]:
		"""Download (or load cached) .db files for every repo in `mirror` and parse them.

		Raises:
			FileNotFoundError: in offline mode when no cached db is available.
		"""
		indices: list[repo_index_t] = []

		for repo in mirror.repos:
			db_url = '%s/%s.db' % (repo.url, repo.name)

			if cache_dir is not None and not no_cache:
				cached_path = cache_dir / ('%s.db' % repo.name)

				if cached_path.exists():
					logger.info(
						dict(
							repo=repo.name,
							msg='using cached db',
							path=str(cached_path),
						)
					)
					index = db_parser_t.parse_db_path(cached_path, repo_name=repo.name)
					indices.append(index)
					continue

				if offline:
					raise FileNotFoundError('offline mode: cached db not found for %s at %s' % (repo.name, str(cached_path)))

				pacman_t.download_db(db_url, cached_path)
				index = db_parser_t.parse_db_path(cached_path, repo_name=repo.name)
				indices.append(index)
			else:
				if offline:
					raise FileNotFoundError('offline mode requires --cache-dir with pre-fetched db files')

				# no cache dir configured: fetch into a throwaway temp file
				with tempfile.NamedTemporaryFile(suffix='.db') as tmp:
					pacman_t.download_db(db_url, pathlib.Path(tmp.name))
					index = db_parser_t.parse_db_path(pathlib.Path(tmp.name), repo_name=repo.name)
					indices.append(index)

		return indices

	@staticmethod
	def compile(
		options: compile_options_t,
	) -> compile_result_t.res_t:
		"""Resolve `options.packages` against the configured repos into compiled entries."""
		mirror = compile_t.build_mirror_config(options)

		cache_dir: Optional[pathlib.Path] = None
		if options.cache_dir is not None:
			cache_dir = pathlib.Path(options.cache_dir)
			cache_dir.mkdir(parents=True, exist_ok=True)

		indices = compile_t.fetch_indices(
			mirror=mirror,
			cache_dir=cache_dir,
			no_cache=options.no_cache,
			offline=options.offline,
		)

		resolved = resolver_t.resolve(
			packages=options.packages,
			indices=indices,
		)

		# hoist repo lookups out of the per-package loop
		# (was O(packages * repos) linear scans per entry)
		pkg_repo: dict[str, str] = {}
		for idx in indices:
			for name in idx.packages:
				# first index containing the package wins, as before
				pkg_repo.setdefault(name, idx.name)

		repo_urls: dict[str, str] = {repo_cfg.name: repo_cfg.url for repo_cfg in mirror.repos}

		result = compile_result_t.res_t()

		for pkg_name in resolved.resolution_order:
			pkg = resolved.resolved[pkg_name]

			repo_name = pkg_repo.get(pkg_name, '')
			repo_url = repo_urls.get(repo_name, '')

			entry = compile_entry_t(
				name=pkg.name,
				version=pkg.version,
				filename=pkg.filename,
				repo=repo_name,
				url='%s/%s' % (repo_url, pkg.filename) if repo_url and pkg.filename else '',
				sha256=pkg.sha256sum if options.generate_hashes else '',
				depends=pkg.depends,
			)

			result.entries.append(entry)

		result.txt = result.to_txt()

		return result
class db_parser_t:
	"""Parses pacman repository .db archives (tar of <pkg>/desc files) into repo_index_t."""

	class constants_t:
		# '%FIELDNAME%' section headers inside a desc file
		field_re: ClassVar[re.Pattern[str]] = re.compile(r'^%([A-Z0-9]+)%$')

		# fields whose value is the full list of the following lines
		list_fields: ClassVar[set[str]] = {
			'LICENSE',
			'DEPENDS',
			'OPTDEPENDS',
			'MAKEDEPENDS',
			'CHECKDEPENDS',
			'PROVIDES',
			'CONFLICTS',
			'REPLACES',
			'GROUPS',
		}

		# desc section header -> package_desc_t attribute name
		field_map: ClassVar[dict[str, str]] = {
			'FILENAME': 'filename',
			'NAME': 'name',
			'VERSION': 'version',
			'DESC': 'desc',
			'CSIZE': 'csize',
			'ISIZE': 'isize',
			'MD5SUM': 'md5sum',
			'SHA256SUM': 'sha256sum',
			'URL': 'url',
			'ARCH': 'arch',
			'BUILDDATE': 'builddate',
			'PACKAGER': 'packager',
			'LICENSE': 'license',
			'DEPENDS': 'depends',
			'OPTDEPENDS': 'optdepends',
			'MAKEDEPENDS': 'makedepends',
			'CHECKDEPENDS': 'checkdepends',
			'PROVIDES': 'provides',
			'CONFLICTS': 'conflicts',
			'REPLACES': 'replaces',
			'GROUPS': 'groups',
			'BASE': 'base',
		}

		# fields parsed as integers (0 when the section is empty)
		int_fields: ClassVar[set[str]] = {
			'CSIZE',
			'ISIZE',
			'BUILDDATE',
		}

	@staticmethod
	def parse_desc(content: str) -> package_desc_t:
		"""Parse the text of one desc file into a package_desc_t.

		Unknown section headers are skipped.

		Raises:
			ValueError: if the mandatory NAME or VERSION section is missing.
		"""
		fields: dict[str, Any] = {}
		lines = content.split('\n')
		total = len(lines)
		pos = 0

		while pos < total:
			header = lines[pos].strip()
			pos += 1

			if header == '':
				continue

			match = db_parser_t.constants_t.field_re.match(header)
			if match is None:
				continue

			key = match.group(1)

			# a section's values run until the next blank line
			values: list[str] = []
			while pos < total and lines[pos].strip() != '':
				values.append(lines[pos].strip())
				pos += 1

			attr = db_parser_t.constants_t.field_map.get(key)
			if attr is None:
				continue

			if key in db_parser_t.constants_t.list_fields:
				fields[attr] = values
			elif key in db_parser_t.constants_t.int_fields:
				fields[attr] = int(values[0]) if values else 0
			else:
				fields[attr] = values[0] if values else ''

		if 'name' not in fields or 'version' not in fields:
			raise ValueError('desc missing NAME or VERSION')

		return package_desc_t(**fields)

	@staticmethod
	def parse_db(
		f: BinaryIO,
		repo_name: str = '',
	) -> repo_index_t:
		"""Parse an open pacman .db tar stream into a repo_index_t.

		Desc entries that fail to parse are logged and skipped.
		"""
		index = repo_index_t(name=repo_name)

		with tarfile.open(fileobj=f, mode='r:*') as tar:
			desc_members = [member for member in tar.getmembers() if member.isfile() and member.name.endswith('/desc')]

			for member in desc_members:
				extracted = tar.extractfile(member)
				if extracted is None:
					continue

				content = extracted.read().decode('utf-8')
				extracted.close()

				try:
					pkg = db_parser_t.parse_desc(content)
				except ValueError:
					logger.warning(
						dict(
							member=member.name,
							msg='failed to parse desc',
						)
					)
				else:
					index.packages[pkg.name] = pkg

		index.build_provides_index()

		return index

	@staticmethod
	def parse_db_path(
		path: pathlib.Path,
		repo_name: Optional[str] = None,
	) -> repo_index_t:
		"""Parse a .db file from disk; derives the repo name from the filename when not given."""
		if repo_name is None:
			repo_name = path.stem.split('.')[0]

		with path.open('rb') as f:
			return db_parser_t.parse_db(f, repo_name=repo_name)
class vercmp_t:
	"""pacman-style version comparison over 'epoch:pkgver-pkgrel' strings.

	Simplified from libalpm's vercmp: versions are split into alternating
	numeric/alphabetic segments and compared pairwise.
	"""

	class constants_t:
		# leading 'epoch:' prefix
		epoch_re: ClassVar[re.Pattern[str]] = re.compile(r'^(\d+):(.+)$')
		# trailing '-pkgrel' suffix (digits only)
		rel_re: ClassVar[re.Pattern[str]] = re.compile(r'^(.+)-(\d+)$')
		# runs of digits or letters; separators are dropped
		segment_re: ClassVar[re.Pattern[str]] = re.compile(r'(\d+|[a-zA-Z]+)')

	@staticmethod
	def split_evr(version: str) -> tuple[int, str, str]:
		"""Split a full version string into (epoch, version, release).

		Missing epoch defaults to 0, missing release to '0'.
		"""
		epoch = 0
		m = vercmp_t.constants_t.epoch_re.match(version)
		if m:
			epoch = int(m.group(1))
			version = m.group(2)

		rel = '0'
		m = vercmp_t.constants_t.rel_re.match(version)
		if m:
			version = m.group(1)
			rel = m.group(2)

		return (epoch, version, rel)

	@staticmethod
	def compare_segment(a: str, b: str) -> int:
		"""Compare two version components segment-wise; returns -1/0/1.

		Numeric segments compare numerically and always sort newer than
		alphabetic segments, per rpm/pacman rules.
		"""
		a_segments = vercmp_t.constants_t.segment_re.findall(a)
		b_segments = vercmp_t.constants_t.segment_re.findall(b)

		for sa, sb in zip(a_segments, b_segments):
			a_digit = sa.isdigit()
			b_digit = sb.isdigit()

			if a_digit and b_digit:
				ia, ib = int(sa), int(sb)
				if ia != ib:
					return 1 if ia > ib else -1
			elif a_digit:
				return 1
			elif b_digit:
				return -1
			else:
				if sa != sb:
					return 1 if sa > sb else -1

		if len(a_segments) != len(b_segments):
			# rpm/pacman rule for a common prefix: a trailing ALPHABETIC segment
			# sorts OLDER than end-of-string ('1.0a' < '1.0', like 1.0alpha),
			# while a trailing NUMERIC segment sorts newer ('1.0.1' > '1.0').
			# (previously the longer version always won, inverting the alpha case)
			if len(a_segments) > len(b_segments):
				return -1 if a_segments[len(b_segments)].isalpha() else 1
			else:
				return 1 if b_segments[len(a_segments)].isalpha() else -1

		return 0

	@staticmethod
	def vercmp(a: str, b: str) -> int:
		"""Full version comparison: epoch first, then version, then release."""
		if a == b:
			return 0

		a_epoch, a_ver, a_rel = vercmp_t.split_evr(a)
		b_epoch, b_ver, b_rel = vercmp_t.split_evr(b)

		if a_epoch != b_epoch:
			return 1 if a_epoch > b_epoch else -1

		ret = vercmp_t.compare_segment(a_ver, b_ver)
		if ret != 0:
			return ret

		return vercmp_t.compare_segment(a_rel, b_rel)


class constraint_op_t(enum.Enum):
	eq = '='
	ge = '>='
	le = '<='
	gt = '>'
	lt = '<'


class package_constraint_t:
	"""A pacman dependency constraint: a name with an optional operator+version."""

	class constants_t:
		# non-greedy name, then optional operator and version
		constraint_re: ClassVar[re.Pattern[str]] = re.compile(r'^([a-zA-Z0-9@._+\-]+?)(?:(>=|<=|>|<|=)(.+))?$')

	def __init__(
		self,
		name: str,
		op: Optional[constraint_op_t] = None,
		version: Optional[str] = None,
	) -> None:
		self.name = name
		self.op = op
		self.version = version

	def __eq__(self, other: Any) -> bool:
		if not isinstance(other, package_constraint_t):
			return NotImplemented
		return self.name == other.name and self.op == other.op and self.version == other.version

	def __hash__(self) -> int:
		return hash((self.name, self.op, self.version))

	def __repr__(self) -> str:
		if self.op is None or self.version is None:
			return 'package_constraint_t(%r)' % self.name
		return 'package_constraint_t(%r, %r, %r)' % (self.name, self.op.value, self.version)

	@staticmethod
	def parse(s: str) -> 'package_constraint_t':
		"""Parse a constraint string like 'glibc', 'gcc>=12' or 'sh=1.0'.

		Raises:
			ValueError: if the string is not a valid constraint.
		"""
		m = package_constraint_t.constants_t.constraint_re.match(s.strip())
		if not m:
			raise ValueError('invalid constraint: %s' % s)

		name = m.group(1)
		op_str = m.group(2)
		version = m.group(3)

		op: Optional[constraint_op_t] = None
		if op_str:
			op = constraint_op_t(op_str)

		return package_constraint_t(
			name=name,
			op=op,
			version=version,
		)

	def satisfied_by(self, version: str) -> bool:
		"""Return True when `version` satisfies this constraint.

		A bare name (no operator) matches any version.  When the constraint
		carries no pkgrel but the candidate does, the candidate's pkgrel is
		stripped before comparing, mirroring libalpm behavior.
		"""
		if self.op is None or self.version is None:
			return True

		constraint_ver = self.version
		candidate_ver = version

		if '-' not in constraint_ver and '-' in candidate_ver:
			candidate_ver = vercmp_t.constants_t.rel_re.sub(r'\1', candidate_ver)

		cmp = vercmp_t.vercmp(candidate_ver, constraint_ver)

		if self.op is constraint_op_t.eq:
			return cmp == 0
		elif self.op is constraint_op_t.ge:
			return cmp >= 0
		elif self.op is constraint_op_t.le:
			return cmp <= 0
		elif self.op is constraint_op_t.gt:
			return cmp > 0
		elif self.op is constraint_op_t.lt:
			return cmp < 0
		else:
			raise NotImplementedError

	def to_str(self) -> str:
		"""Render back to pacman constraint syntax."""
		if self.op is None or self.version is None:
			return self.name
		return '%s%s%s' % (self.name, self.op.value, self.version)


@dataclasses.dataclass
class package_desc_t:
	"""One package entry from a pacman repo db (the parsed `desc` file)."""

	name: str
	version: str
	desc: str = ''
	filename: str = ''
	csize: int = 0
	isize: int = 0
	md5sum: str = ''
	sha256sum: str = ''
	url: str = ''
	arch: str = ''
	builddate: int = 0
	packager: str = ''
	license: list[str] = dataclasses.field(default_factory=list)
	depends: list[str] = dataclasses.field(default_factory=list)
	optdepends: list[str] = dataclasses.field(default_factory=list)
	makedepends: list[str] = dataclasses.field(default_factory=list)
	checkdepends: list[str] = dataclasses.field(default_factory=list)
	provides: list[str] = dataclasses.field(default_factory=list)
	conflicts: list[str] = dataclasses.field(default_factory=list)
	replaces: list[str] = dataclasses.field(default_factory=list)
	groups: list[str] = dataclasses.field(default_factory=list)
	base: str = ''

	def parsed_depends(self) -> list[package_constraint_t]:
		"""DEPENDS entries parsed into constraints."""
		return [package_constraint_t.parse(d) for d in self.depends]

	def parsed_provides(self) -> list[package_constraint_t]:
		"""PROVIDES entries parsed into constraints."""
		return [package_constraint_t.parse(p) for p in self.provides]

	def parsed_conflicts(self) -> list[package_constraint_t]:
		"""CONFLICTS entries parsed into constraints."""
		return [package_constraint_t.parse(c) for c in self.conflicts]
= dataclasses.field(default_factory=lambda: list[str]()) + optdepends: list[str] = dataclasses.field(default_factory=lambda: list[str]()) + makedepends: list[str] = dataclasses.field(default_factory=lambda: list[str]()) + checkdepends: list[str] = dataclasses.field(default_factory=lambda: list[str]()) + provides: list[str] = dataclasses.field(default_factory=lambda: list[str]()) + conflicts: list[str] = dataclasses.field(default_factory=lambda: list[str]()) + replaces: list[str] = dataclasses.field(default_factory=lambda: list[str]()) + groups: list[str] = dataclasses.field(default_factory=lambda: list[str]()) + base: str = '' + + def parsed_depends(self) -> list[package_constraint_t]: + return [package_constraint_t.parse(d) for d in self.depends] + + def parsed_provides(self) -> list[package_constraint_t]: + return [package_constraint_t.parse(p) for p in self.provides] + + def parsed_conflicts(self) -> list[package_constraint_t]: + return [package_constraint_t.parse(c) for c in self.conflicts] + + +@dataclasses.dataclass +class repo_config_t: + name: str + url: str + + +@dataclasses.dataclass +class mirror_config_t: + repos: list[repo_config_t] = dataclasses.field(default_factory=lambda: list[repo_config_t]()) + arch: str = 'x86_64' + + @staticmethod + def from_archive_date( + date: str, + repos: Optional[list[str]] = None, + arch: str = 'x86_64', + ) -> 'mirror_config_t': + if repos is None: + repos = ['core', 'extra', 'multilib'] + + base_url = 'https://archive.archlinux.org/repos/%s' % date + + return mirror_config_t( + repos=[ + repo_config_t( + name=r, + url='%s/%s/os/%s' % (base_url, r, arch), + ) + for r in repos + ], + arch=arch, + ) + + @staticmethod + def from_mirror_url( + mirror_url: str, + repos: Optional[list[str]] = None, + arch: str = 'x86_64', + ) -> 'mirror_config_t': + if repos is None: + repos = ['core', 'extra', 'multilib'] + + return mirror_config_t( + repos=[ + repo_config_t( + name=r, + url='%s/%s/os/%s' % (mirror_url.rstrip('/'), r, arch), 
+ ) + for r in repos + ], + arch=arch, + ) + + +@dataclasses.dataclass +class repo_index_t: + name: str + packages: dict[str, package_desc_t] = dataclasses.field(default_factory=lambda: dict[str, package_desc_t]()) + provides_index: dict[str, list[str]] = dataclasses.field(default_factory=lambda: dict[str, list[str]]()) + groups_index: dict[str, list[str]] = dataclasses.field(default_factory=lambda: dict[str, list[str]]()) + + def build_provides_index(self) -> None: + self.provides_index = {} + self.groups_index = {} + for pkg_name, pkg in self.packages.items(): + for prov in pkg.provides: + prov_constraint = package_constraint_t.parse(prov) + if prov_constraint.name not in self.provides_index: + self.provides_index[prov_constraint.name] = [] + self.provides_index[prov_constraint.name].append(pkg_name) + for group in pkg.groups: + if group not in self.groups_index: + self.groups_index[group] = [] + self.groups_index[group].append(pkg_name) + + +class resolution_strategy_t(enum.Enum): + upgrade_all = 'upgrade-all' + pin_referenced = 'pin-referenced' + + +@dataclasses.dataclass +class compile_options_t: + packages: list[str] = dataclasses.field(default_factory=lambda: list[str]()) + index_url: Optional[str] = None + archive_date: Optional[str] = None + offline: bool = False + no_cache: bool = False + generate_hashes: bool = False + repos: list[str] = dataclasses.field(default_factory=lambda: ['core', 'extra', 'multilib']) + arch: str = 'x86_64' + cache_dir: Optional[str] = None + reference: Optional[str] = None + resolution_strategy: resolution_strategy_t = resolution_strategy_t.upgrade_all + + +@dataclasses.dataclass +class compile_entry_t: + name: str + version: str + filename: str + repo: str + url: str + sha256: str = '' + depends: list[str] = dataclasses.field(default_factory=lambda: list[str]()) + + +class compile_result_t: + @dataclasses.dataclass + class res_t: + entries: list[compile_entry_t] = dataclasses.field(default_factory=lambda: 
list[compile_entry_t]())
        txt: str = ''

        def to_txt(self) -> str:
            # Render entries pip-requirements style: a '# <url>' (or
            # '# <repo>/<filename>') comment line, then
            # 'name==version [--hash=sha256:...]' per package, sorted by name.
            lines: list[str] = []
            for e in sorted(self.entries, key=lambda x: x.name):
                line = '%s==%s' % (e.name, e.version)
                if e.sha256:
                    line += ' --hash=sha256:%s' % e.sha256
                # % binds tighter than the conditional, so this parses as
                # ('# %s' % e.url) if e.url else ('# %s/%s' % (...)) — intended.
                lines.append('# %s' % e.url if e.url else '# %s/%s' % (e.repo, e.filename))
                lines.append(line)
            return '\n'.join(lines)
diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/pacman.py b/python/online/fxreader/pr34/commands_typed/archlinux/pacman.py
new file mode 100644
index 0000000..3caf8ec
--- /dev/null
+++ b/python/online/fxreader/pr34/commands_typed/archlinux/pacman.py
@@ -0,0 +1,182 @@
import re
import subprocess
import pathlib
import dataclasses
import logging

from typing import (
    ClassVar,
    Optional,
    Any,
)

from .models import (
    package_desc_t,
)

logger = logging.getLogger(__name__)


class pacman_t:
    # Thin wrapper around the pacman CLI: query installed packages and
    # download repo .db files.

    class constants_t:
        default_db_path: ClassVar[pathlib.Path] = pathlib.Path('/var/lib/pacman')
        default_cache_dir: ClassVar[pathlib.Path] = pathlib.Path('/var/cache/pacman/pkg')
        # Matches the 'Key : value' lines of `pacman -Qi` output.
        field_re: ClassVar[re.Pattern[str]] = re.compile(r'^([A-Za-z ]+?)\s*:\s*(.*)$')

    @dataclasses.dataclass
    class query_entry_t:
        # One package as reported by `pacman -Qi`.
        name: str
        version: str
        description: str = ''
        architecture: str = ''
        url: str = ''
        depends_on: list[str] = dataclasses.field(default_factory=lambda: list[str]())
        provides: list[str] = dataclasses.field(default_factory=lambda: list[str]())
        conflicts_with: list[str] = dataclasses.field(default_factory=lambda: list[str]())
        replaces: list[str] = dataclasses.field(default_factory=lambda: list[str]())
        install_size: str = ''
        packager: str = ''
        groups: list[str] = dataclasses.field(default_factory=lambda: list[str]())

    class list_installed_t:
        @dataclasses.dataclass
        class res_t:
            packages: list['pacman_t.query_entry_t'] = dataclasses.field(default_factory=lambda: list[pacman_t.query_entry_t]())

    @staticmethod
    def parse_info_block(block: str) -> 'pacman_t.query_entry_t':
        # Parse one blank-line-delimited `pacman -Qi` block into a
        # query_entry_t. Raises ValueError when Name or Version is missing.
        fields: dict[str, list[str]] = {}
        current_key: Optional[str] = None

        for line in block.split('\n'):
            m = pacman_t.constants_t.field_re.match(line)
            if m:
                current_key = m.group(1).strip()
                value = m.group(2).strip()
                assert isinstance(current_key, str)
                if current_key not in fields:
                    fields[current_key] = []
                # pacman prints the literal 'None' for empty fields.
                if value and value != 'None':
                    fields[current_key].append(value)
            elif current_key and line.startswith(' '):
                # Indented continuation line of a multi-value field.
                value = line.strip()
                if value and value != 'None':
                    fields[current_key].append(value)

        name = fields.get('Name', [''])[0]
        version = fields.get('Version', [''])[0]

        if not name or not version:
            raise ValueError('missing Name or Version in block')

        return pacman_t.query_entry_t(
            name=name,
            version=version,
            description=fields.get('Description', [''])[0] if fields.get('Description') else '',
            architecture=fields.get('Architecture', [''])[0] if fields.get('Architecture') else '',
            url=fields.get('URL', [''])[0] if fields.get('URL') else '',
            depends_on=fields.get('Depends On', []),
            provides=fields.get('Provides', []),
            conflicts_with=fields.get('Conflicts With', []),
            replaces=fields.get('Replaces', []),
            install_size=fields.get('Installed Size', [''])[0] if fields.get('Installed Size') else '',
            packager=fields.get('Packager', [''])[0] if fields.get('Packager') else '',
            groups=fields.get('Groups', []),
        )

    @staticmethod
    def list_installed(
        db_path: Optional[pathlib.Path] = None,
    ) -> 'pacman_t.list_installed_t.res_t':
        # Run `pacman -Qi` (optionally against an alternate --dbpath) and
        # parse every block; unparsable blocks are logged and skipped.
        cmd: list[str] = ['pacman', '-Qi']

        if db_path is not None:
            cmd.extend(['--dbpath', str(db_path)])

        output = subprocess.check_output(
            cmd,
            stderr=subprocess.DEVNULL,
        ).decode('utf-8')

        blocks = output.split('\n\n')
        result = pacman_t.list_installed_t.res_t()

        for block in blocks:
            block = block.strip()
            if not block:
                continue

            try:
                entry = pacman_t.parse_info_block(block)
                result.packages.append(entry)
            except ValueError:
                # Best-effort: skip the malformed block, keep the rest.
                logger.warning(
                    dict(
                        msg='failed to parse pacman info block',
                    )
                )

        return result

    @staticmethod
    def list_installed_simple(
        db_path: Optional[pathlib.Path] = None,
    ) -> list[tuple[str, str]]:
        # Cheap variant: `pacman -Q` -> [(name, version), ...].
        cmd: list[str] = ['pacman', '-Q']

        if db_path is not None:
            cmd.extend(['--dbpath', str(db_path)])

        output = subprocess.check_output(
            cmd,
            stderr=subprocess.DEVNULL,
        ).decode('utf-8')

        result: list[tuple[str, str]] = []

        for line in output.strip().split('\n'):
            parts = line.strip().split(None, 1)
            if len(parts) == 2:
                result.append((parts[0], parts[1]))

        return result

    @staticmethod
    def sync_db(
        mirror_url: str,
        db_path: pathlib.Path,
        repos: Optional[list[str]] = None,
    ) -> None:
        # NOTE(review): `mirror_url` and `repos` are accepted but never used —
        # the command only passes --dbpath, so pacman -Sy syncs whatever
        # repos/mirrors its own configuration defines. Either wire these
        # parameters in (e.g. via a generated pacman.conf) or drop them.
        if repos is None:
            repos = ['core', 'extra', 'multilib']

        cmd: list[str] = [
            'pacman',
            '-Sy',
            '--dbpath',
            str(db_path),
        ]

        subprocess.check_call(cmd)

    @staticmethod
    def download_db(
        url: str,
        output_path: pathlib.Path,
    ) -> None:
        # Fetch a repo .db over HTTP(S) into output_path, creating parents.
        import urllib.request

        logger.info(
            dict(
                url=url,
                output_path=str(output_path),
                msg='downloading db',
            )
        )

        output_path.parent.mkdir(parents=True, exist_ok=True)

        urllib.request.urlretrieve(
            url,
            str(output_path),
        )
diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/py.typed b/python/online/fxreader/pr34/commands_typed/archlinux/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/resolver.py b/python/online/fxreader/pr34/commands_typed/archlinux/resolver.py
new file mode 100644
index 0000000..7849009
--- /dev/null
+++ b/python/online/fxreader/pr34/commands_typed/archlinux/resolver.py
@@ -0,0 +1,161 @@
import dataclasses
import logging

from typing import (
    Optional,
    Any,
)

from .models import (
    package_desc_t,
    package_constraint_t,
    repo_index_t,
    vercmp_t,
)

logger = logging.getLogger(__name__)


class resolver_t:
    # Pure-python dependency resolver over repo_index_t tables.
    class error_t:
        class
not_found_t(Exception):  # (continuation of 'class' cut at the chunk boundary)
            def __init__(self, name: str) -> None:
                self.name = name
                super().__init__('package not found: %s' % name)

        class conflict_t(Exception):
            def __init__(self, pkg_a: str, pkg_b: str, constraint: str) -> None:
                self.pkg_a = pkg_a
                self.pkg_b = pkg_b
                self.constraint = constraint
                super().__init__('conflict: %s conflicts with %s (%s)' % (pkg_a, pkg_b, constraint))

        class unsatisfied_t(Exception):
            def __init__(self, parent: str, dep: str) -> None:
                self.parent = parent
                self.dep = dep
                super().__init__('unsatisfied dependency: %s requires %s' % (parent, dep))

    @dataclasses.dataclass
    class res_t:
        # resolved: name -> chosen package; resolution_order: insertion order.
        resolved: dict[str, package_desc_t] = dataclasses.field(default_factory=lambda: dict[str, package_desc_t]())
        resolution_order: list[str] = dataclasses.field(default_factory=lambda: list[str]())

    @staticmethod
    def _find_provider(
        constraint: package_constraint_t,
        indices: list[repo_index_t],
    ) -> Optional[tuple[package_desc_t, str]]:
        # Prefer a real package matching the name across all indices first;
        # only then fall back to packages that `provide` the (virtual) name.
        for index in indices:
            if constraint.name in index.packages:
                pkg = index.packages[constraint.name]
                if constraint.satisfied_by(pkg.version):
                    return (pkg, index.name)

        for index in indices:
            if constraint.name in index.provides_index:
                for provider_name in index.provides_index[constraint.name]:
                    pkg = index.packages[provider_name]
                    for prov in pkg.parsed_provides():
                        if prov.name == constraint.name:
                            # An unversioned provide or unversioned constraint
                            # matches unconditionally.
                            if constraint.version is None or prov.version is None:
                                return (pkg, index.name)
                            if constraint.satisfied_by(prov.version):
                                return (pkg, index.name)

        return None

    @staticmethod
    def resolve(
        packages: list[str],
        indices: list[repo_index_t],
        skip_installed: Optional[set[str]] = None,
    ) -> 'resolver_t.res_t':
        # Iterative DFS over the dependency graph. Raises
        # error_t.not_found_t / unsatisfied_t / conflict_t on failure.
        # skip_installed names are treated as already satisfied.
        if skip_installed is None:
            skip_installed = set()

        result = resolver_t.res_t()
        visited: set[str] = set()
        stack: list[tuple[package_constraint_t, Optional[str]]] = []

        for pkg_str in packages:
            constraint = package_constraint_t.parse(pkg_str)
            stack.append((constraint, None))

        while len(stack) > 0:
            constraint, parent = stack.pop()

            if constraint.name in visited:
                # Already chosen: only re-check the version constraint.
                if constraint.name in result.resolved:
                    pkg = result.resolved[constraint.name]
                    if not constraint.satisfied_by(pkg.version):
                        raise resolver_t.error_t.unsatisfied_t(
                            parent=parent or '',
                            dep=constraint.to_str(),
                        )
                continue

            if constraint.name in skip_installed:
                visited.add(constraint.name)
                continue

            found = resolver_t._find_provider(constraint, indices)

            if found is None:
                # Name known but no satisfying version -> unsatisfied;
                # completely unknown name -> not_found.
                exists = any(constraint.name in idx.packages or constraint.name in idx.provides_index for idx in indices)
                if exists:
                    raise resolver_t.error_t.unsatisfied_t(
                        parent=parent or '',
                        dep=constraint.to_str(),
                    )
                raise resolver_t.error_t.not_found_t(constraint.name)

            pkg, repo_name = found

            if pkg.name in visited:
                # Provider already resolved under its own name (constraint was
                # a virtual/alias) — verify compatibility, do not re-add.
                if pkg.name in result.resolved and constraint.op is not None:
                    resolved_pkg = result.resolved[pkg.name]
                    if constraint.name == resolved_pkg.name:
                        if not constraint.satisfied_by(resolved_pkg.version):
                            raise resolver_t.error_t.unsatisfied_t(
                                parent=parent or '',
                                dep=constraint.to_str(),
                            )
                    else:
                        matched = False
                        for prov in resolved_pkg.parsed_provides():
                            if prov.name == constraint.name:
                                if prov.version is not None and constraint.satisfied_by(prov.version):
                                    matched = True
                                    break
                                elif prov.version is None:
                                    matched = True
                                    break
                        if not matched:
                            raise resolver_t.error_t.unsatisfied_t(
                                parent=parent or '',
                                dep=constraint.to_str(),
                            )
                continue

            # Mark both the provider and the requested (possibly virtual) name.
            visited.add(pkg.name)
            visited.add(constraint.name)

            result.resolved[pkg.name] = pkg
            result.resolution_order.append(pkg.name)

            for conflict in pkg.parsed_conflicts():
                if conflict.name in result.resolved:
                    resolved_version = result.resolved[conflict.name].version
                    if conflict.satisfied_by(resolved_version):
                        raise resolver_t.error_t.conflict_t(
                            pkg_a=pkg.name,
                            pkg_b=conflict.name,
                            constraint=conflict.to_str(),
                        )

            for dep in pkg.parsed_depends():
                if dep.name not in visited and dep.name not in skip_installed:
                    stack.append((dep, pkg.name))

        return result
diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/solv_backend.py b/python/online/fxreader/pr34/commands_typed/archlinux/solv_backend.py
new file mode 100644
index 0000000..3c89b80
--- /dev/null
+++ b/python/online/fxreader/pr34/commands_typed/archlinux/solv_backend.py
@@ -0,0 +1,416 @@
import hashlib
import io
import logging
import pathlib
import re

from typing import (
    ClassVar,
    Optional,
    Any,
)

from .models import (
    package_desc_t,
    repo_index_t,
    compile_options_t,
    compile_entry_t,
    compile_result_t,
    mirror_config_t,
    resolution_strategy_t,
)

from .db import db_parser_t
from .compile import compile_t as compile_base_t

logger = logging.getLogger(__name__)


class repo_store_t:
    # Holds a parsed repo_index_t together with the sha256 of the source .db,
    # which keys the on-disk .solv cache.
    class constants_t:
        checksum_filename: ClassVar[str] = 'checksum.sha256'

    def __init__(
        self,
        index: repo_index_t,
        db_checksum: str = '',
    ) -> None:
        self.index = index
        self.db_checksum = db_checksum

    @staticmethod
    def _file_checksum(path: pathlib.Path) -> str:
        # Streaming sha256 of a file (64 KiB chunks).
        h = hashlib.sha256()
        with io.open(path, 'rb') as f:
            while True:
                chunk = f.read(65536)
                if not chunk:
                    break
                h.update(chunk)
        return h.hexdigest()

    @staticmethod
    def from_db(
        db_path: pathlib.Path,
        repo_name: Optional[str] = None,
        cache_dir: Optional[pathlib.Path] = None,
    ) -> 'repo_store_t':
        # Parse a pacman .db into a repo_store_t; repo name defaults to the
        # filename stem ('core.db' -> 'core').
        if repo_name is None:
            repo_name = db_path.stem.split('.')[0]

        db_checksum = repo_store_t._file_checksum(db_path)

        if cache_dir is not None:
            solv_cache_path = cache_dir / ('%s.solv' % repo_name)
            checksum_path = cache_dir / ('%s.solv.sha256' % repo_name)
            index_cache_path = cache_dir / ('%s.index.solv' % repo_name)

            if solv_cache_path.exists() and checksum_path.exists():
                stored_checksum = checksum_path.read_text().strip()
                if stored_checksum
== db_checksum:  # (continuation of 'if stored_checksum' cut at the chunk boundary)
                    logger.info(
                        dict(
                            repo=repo_name,
                            msg='using cached solv',
                            path=str(solv_cache_path),
                        )
                    )

                    # NOTE(review): despite the 'using cached solv' log, this
                    # branch re-parses the .db exactly like the cold path
                    # below — the actual .solv file is only consumed later in
                    # solv_pool_t.add_store. index_cache_path above is never
                    # used. Consider deduplicating or caching the parsed index.
                    index = db_parser_t.parse_db_path(db_path, repo_name=repo_name)

                    return repo_store_t(
                        index=index,
                        db_checksum=db_checksum,
                    )

        index = db_parser_t.parse_db_path(db_path, repo_name=repo_name)

        return repo_store_t(
            index=index,
            db_checksum=db_checksum,
        )

    def write_solv_cache(
        self,
        cache_dir: pathlib.Path,
        solv_repo: Any,
    ) -> None:
        # Serialize a populated libsolv repo to <name>.solv and record the
        # source .db checksum next to it for cache validation.
        import solv

        cache_dir.mkdir(parents=True, exist_ok=True)
        solv_cache_path = cache_dir / ('%s.solv' % self.index.name)
        checksum_path = cache_dir / ('%s.solv.sha256' % self.index.name)

        f = solv.xfopen(str(solv_cache_path), 'w')
        solv_repo.write(f)
        f.close()

        checksum_path.write_text(self.db_checksum)

        logger.info(
            dict(
                repo=self.index.name,
                msg='wrote solv cache',
                path=str(solv_cache_path),
                size=solv_cache_path.stat().st_size,
            )
        )


class solv_pool_t:
    # libsolv Pool wrapper: load repo stores, then resolve install jobs.
    class constants_t:
        # name, optional (op, version) — mirrors pacman dependency syntax.
        dep_re: ClassVar[re.Pattern[str]] = re.compile(r'^([a-zA-Z0-9@._+\-]+?)(?:(>=|<=|>|<|=)(.+))?$')

    def __init__(
        self,
        stores: Optional[list[repo_store_t]] = None,
        cache_dir: Optional[pathlib.Path] = None,
    ) -> None:
        import solv

        self._solv = solv
        self._pool = solv.Pool()
        # Arch-style version comparison semantics.
        self._pool.setdisttype(solv.Pool.DISTTYPE_ARCH)
        # NOTE(review): arch is hard-coded here while compile_options_t /
        # mirror_config_t carry a configurable arch — confirm this is intended.
        self._pool.setarch('x86_64')
        self._rel_map = {
            '>=': solv.REL_GT | solv.REL_EQ,
            '<=': solv.REL_LT | solv.REL_EQ,
            '>': solv.REL_GT,
            '<': solv.REL_LT,
            '=': solv.REL_EQ,
        }
        self._stores: list[repo_store_t] = []

        if stores is not None:
            for store in stores:
                self.add_store(store, cache_dir=cache_dir)
            self.finalize()

    def _parse_dep(self, dep_str: str) -> Any:
        # Translate 'name[op version]' into a libsolv dep Id (rel2id for
        # versioned deps, plain str2id otherwise or on regex mismatch).
        m = solv_pool_t.constants_t.dep_re.match(dep_str.strip())
        if not m:
            return self._pool.str2id(dep_str)

        name = m.group(1)
        op = m.group(2)
        ver = m.group(3)

        name_id = self._pool.str2id(name)

        if op and ver:
            ver_id = self._pool.str2id(ver)
            return self._pool.rel2id(name_id, ver_id, self._rel_map[op])

        return name_id

    def add_store(
        self,
        store: repo_store_t,
        cache_dir: Optional[pathlib.Path] = None,
    ) -> None:
        # Load a store into the pool: from the .solv cache when its recorded
        # checksum matches the store's .db checksum, otherwise by building
        # solvables from the parsed index (and writing the cache).
        solv = self._solv

        self._stores.append(store)

        loaded_from_cache = False

        if cache_dir is not None:
            solv_cache_path = cache_dir / ('%s.solv' % store.index.name)
            checksum_path = cache_dir / ('%s.solv.sha256' % store.index.name)

            if solv_cache_path.exists() and checksum_path.exists():
                stored_checksum = checksum_path.read_text().strip()
                if stored_checksum == store.db_checksum:
                    repo = self._pool.add_repo(store.index.name)
                    f = solv.xfopen(str(solv_cache_path))
                    repo.add_solv(f)
                    f.close()
                    loaded_from_cache = True

                    logger.info(
                        dict(
                            repo=store.index.name,
                            msg='loaded solv from cache',
                            solvables=repo.nsolvables,
                        )
                    )

        if not loaded_from_cache:
            repo = self._pool.add_repo(store.index.name)
            for pkg in store.index.packages.values():
                s = repo.add_solvable()
                s.name = pkg.name
                s.evr = pkg.version
                # pacman 'any' maps to libsolv 'noarch'; empty arch falls
                # back to x86_64.
                s.arch = 'noarch' if pkg.arch == 'any' else (pkg.arch or 'x86_64')

                for dep_str in pkg.depends:
                    s.add_requires(self._parse_dep(dep_str))

                for prov_str in pkg.provides:
                    s.add_provides(self._parse_dep(prov_str))

                # Self-provide 'name = evr' so versioned requires on the
                # package's own name resolve.
                s.add_provides(self._pool.rel2id(s.nameid, s.evrid, solv.REL_EQ))

                for conf_str in pkg.conflicts:
                    s.add_conflicts(self._parse_dep(conf_str))

            repo.internalize()

            if cache_dir is not None:
                store.write_solv_cache(cache_dir, repo)

    def finalize(self) -> None:
        # Must be called after all repos are added and before resolve().
        self._pool.createwhatprovides()

    class resolve_t:
        class res_t:
            def __init__(self) -> None:
                # resolved: name -> libsolv solvable; problems: human-readable.
                self.resolved: dict[str, Any] = {}
                self.problems: list[str] = []

    def expand_groups(
        self,
        packages: list[str],
    ) -> list[str]:
        # Replace group names (e.g. 'base-devel') with their member packages;
        # non-group names pass through unchanged.
        expanded: list[str] = []
        for pkg_name in packages:
            found_group = False
            for store in self._stores:
                if pkg_name in store.index.groups_index:
                    expanded.extend(store.index.groups_index[pkg_name])
                    found_group = True
                    break
            if not found_group:
                expanded.append(pkg_name)
        return expanded

    @staticmethod
    def parse_reference(txt: str) -> dict[str, str]:
        # Parse a previously compiled lock file into {name: version} pins;
        # '#' comment lines and non-'==' lines are ignored.
        pinned: dict[str, str] = {}
        for line in txt.splitlines():
            line = line.strip()
            if line == '' or line.startswith('#'):
                continue
            parts = line.split()
            pkg_spec = parts[0]
            if '==' in pkg_spec:
                name, version = pkg_spec.split('==', 1)
                pinned[name] = version
        return pinned

    def resolve(
        self,
        packages: list[str],
        expand_groups: bool = True,
        pinned: Optional[dict[str, str]] = None,
        upgrade_packages: Optional[list[str]] = None,
    ) -> 'solv_pool_t.resolve_t.res_t':
        # Build SOLVER_INSTALL jobs for `packages` (pinning via `pinned`
        # unless the name is in `upgrade_packages`) and run the solver.
        solv = self._solv

        if expand_groups:
            packages = self.expand_groups(packages)

        result = solv_pool_t.resolve_t.res_t()

        solver = self._pool.Solver()
        jobs: list[Any] = []

        upgrade_set: set[str] = set()
        if upgrade_packages is not None:
            if expand_groups:
                upgrade_packages = self.expand_groups(upgrade_packages)
            upgrade_set = set(upgrade_packages)

        for pkg_spec in packages:
            # Strip any version operator to get the bare package name.
            pkg_name = pkg_spec.split('>=')[0].split('<=')[0].split('>')[0].split('<')[0].split('=')[0]

            if pinned is not None and pkg_name in pinned and pkg_name not in upgrade_set:
                pinned_spec = '%s=%s' % (pkg_name, pinned[pkg_name])
                dep = self._parse_dep(pinned_spec)
                jobs.append(self._pool.Job(solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE_PROVIDES, dep))
            else:
                dep = self._parse_dep(pkg_spec)

            sel = self._pool.select(pkg_name, solv.Selection.SELECTION_NAME | solv.Selection.SELECTION_PROVIDES)
            if sel.isempty():
                result.problems.append('package not found: %s' % pkg_spec)
                continue

            # NOTE(review): when the pinned branch above already appended a
            # job, this appends the same pinned dep job a second time (for
            # pkg_name != pkg_spec), or adds an extra UNPINNED selection job
            # (for a bare pinned name). Duplicates are likely harmless to
            # libsolv, but the pinned+unpinned mix can loosen the pin —
            # confirm intent / add a test.
            if pkg_name != pkg_spec:
                jobs.append(self._pool.Job(solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE_PROVIDES, dep))
            else:
                jobs += sel.jobs(solv.Job.SOLVER_INSTALL)

        if len(result.problems) > 0:
            return result

        problems = solver.solve(jobs)

        if problems:
            for p in problems:
                result.problems.append(str(p))
            return result

        trans =
solver.transaction()  # (continuation of 'trans =' cut at the chunk boundary)
        for s in trans.newsolvables():
            result.resolved[s.name] = s

        return result


class compile_solv_t:
    # libsolv-backed implementation of the 'compile' step: resolve the
    # requested package set against repo indices and emit compile entries.
    @staticmethod
    def compile(
        options: compile_options_t,
        stores: Optional[list[repo_store_t]] = None,
    ) -> compile_result_t.res_t:
        # Raises RuntimeError when the solver reports problems.
        mirror = compile_base_t.build_mirror_config(options)

        cache_dir: Optional[pathlib.Path] = None
        if options.cache_dir is not None:
            cache_dir = pathlib.Path(options.cache_dir)
            cache_dir.mkdir(parents=True, exist_ok=True)

        if stores is None:
            # No pre-built stores supplied: fetch/parse the repo indices.
            indices = compile_base_t.fetch_indices(
                mirror=mirror,
                cache_dir=cache_dir,
                no_cache=options.no_cache,
                offline=options.offline,
            )
            stores = [repo_store_t(index=idx) for idx in indices]

        pool = solv_pool_t(stores=stores, cache_dir=cache_dir)

        pinned: Optional[dict[str, str]] = None
        upgrade_packages: Optional[list[str]] = None

        if options.reference is not None:
            ref_txt = pathlib.Path(options.reference).read_text()
            pinned = solv_pool_t.parse_reference(ref_txt)

            if options.resolution_strategy is resolution_strategy_t.pin_referenced:
                # Keep every referenced package pinned, but let the
                # explicitly requested packages float (upgrade).
                upgrade_packages = options.packages
                packages = list(pinned.keys()) + [p for p in options.packages if p not in pinned]
            else:
                packages = options.packages
        else:
            packages = options.packages

        resolved = pool.resolve(
            packages,
            pinned=pinned if options.resolution_strategy is resolution_strategy_t.pin_referenced else None,
            upgrade_packages=upgrade_packages,
        )

        if len(resolved.problems) > 0:
            raise RuntimeError('resolution failed with %d problem(s):\n%s' % (len(resolved.problems), '\n'.join(resolved.problems)))

        result = compile_result_t.res_t()

        for pkg_name, solvable in resolved.resolved.items():
            repo_name = solvable.repo.name if solvable.repo else ''

            # Find the package_desc_t matching name+version, preferring the
            # repo the solvable came from (break only on repo match; the last
            # version match wins otherwise).
            pkg_desc: Optional[package_desc_t] = None
            for store in stores:
                candidate = store.index.packages.get(pkg_name)
                if candidate is not None and candidate.version == solvable.evr:
                    pkg_desc = candidate
                    if store.index.name == repo_name:
                        break

            filename = pkg_desc.filename if pkg_desc else ''
            sha256 = (pkg_desc.sha256sum if pkg_desc else '') if options.generate_hashes else ''

            url = ''
            if filename:
                repo_url = ''
                for repo_cfg in mirror.repos:
                    if repo_cfg.name == repo_name:
                        repo_url = repo_cfg.url
                        break

                if repo_url:
                    url = '%s/%s' % (repo_url, filename)
                else:
                    # Fall back to the archive's by-first-letter package layout.
                    url = 'https://archive.archlinux.org/packages/%s/%s/%s' % (
                        pkg_name[0],
                        pkg_name,
                        filename,
                    )

            entry = compile_entry_t(
                name=pkg_name,
                version=solvable.evr,
                filename=filename,
                repo=repo_name,
                url=url,
                sha256=sha256,
                depends=pkg_desc.depends if pkg_desc else [],
            )

            result.entries.append(entry)

        result.txt = result.to_txt()

        return result