[+] add archlinux package management module with libsolv backend

1. add models: package_desc_t, repo_index_t, vercmp_t, compile_options_t;
  2. add db.py: parse pacman .db tar archives into repo_index_t;
  3. add pacman.py: wrap pacman cli for listing installed, downloading .db;
  4. add resolver.py: pure python dependency resolver;
  5. add compile.py: fetch archive indices, resolve, output compiled reqs;
  6. add solv_backend.py: libsolv-based solver with repo_store_t, solv_pool_t;
  7. add cli.py: list-installed, compile, download subcommands;
  8. add mypy-stubs/types-solv for solv python bindings;
  9. add .gitattributes lfs tracking for test .db files;
This commit is contained in:
LLM 2026-04-03 16:42:27 +00:00
parent 98b906ac11
commit 64845d732d
11 changed files with 2093 additions and 0 deletions

1
.gitattributes vendored

@ -1,2 +1,3 @@
releases/tar/** filter=lfs diff=lfs merge=lfs -text
releases/whl/** filter=lfs diff=lfs merge=lfs -text
python/online/fxreader/pr34/commands_typed/archlinux/tests/res/*.db filter=lfs diff=lfs merge=lfs -text

@ -0,0 +1,263 @@
# Type stubs for the `solv` (libsolv) Python bindings.
from typing import Iterator, Sequence

# Relation flags for Pool.rel2id(); combine the comparison bits to express
# versioned dependencies (e.g. REL_GT | REL_EQ for ">=").
REL_GT: int
REL_EQ: int
REL_LT: int
REL_AND: int
REL_OR: int
REL_WITH: int
REL_NAMESPACE: int
REL_ARCH: int
REL_FILECONFLICT: int
REL_COND: int
REL_COMPAT: int
REL_KIND: int
REL_MULTIARCH: int
REL_ELSE: int
REL_ERROR: int
REL_WITHOUT: int
REL_UNLESS: int
REL_CONDA: int

# Open a (possibly compressed) solv file; returns a libsolv file handle.
def xfopen(path: str, mode: str = ...) -> XFile: ...
def xfopen_fd(path: str, fd: int) -> XFile: ...

class XFile:
    """File handle wrapper used by Repo.add_solv()/write()."""

    def close(self) -> None: ...

class Dep:
    """Opaque dependency object returned by Pool.Dep()."""

    ...
class Pool:
    """Central libsolv context: owns all repos, solvables and interned ids."""

    # Distribution flavors accepted by setdisttype().
    DISTTYPE_RPM: int
    DISTTYPE_DEB: int
    DISTTYPE_ARCH: int
    DISTTYPE_HAIKU: int
    DISTTYPE_CONDA: int
    DISTTYPE_APK: int
    # Repo holding the currently-installed packages (solver baseline), if set.
    installed: Repo | None
    repos: list[Repo]
    def setdisttype(self, disttype: int) -> None: ...
    def setarch(self, arch: str) -> None: ...
    def add_repo(self, name: str) -> Repo: ...
    # Must be called after repos change, before solving/selecting.
    def createwhatprovides(self) -> None: ...
    # String/id interning and dependency-relation construction.
    def str2id(self, s: str, create: bool = ...) -> int: ...
    def rel2id(self, name: int, evr: int, flags: int, create: bool = ...) -> int: ...
    def dep2str(self, dep: int) -> str: ...
    def id2str(self, id: int) -> str: ...
    def select(self, name: str, flags: int) -> Selection: ...
    # Factory methods for solver-related objects bound to this pool.
    def Solver(self) -> Solver: ...
    def Selection(self) -> Selection: ...
    def Selection_all(self) -> Selection: ...
    def Job(self, how: int, what: int) -> Job: ...
    def Dep(self, s: str, create: bool = ...) -> Dep: ...
    def isknownarch(self, id: int) -> bool: ...
    def set_debuglevel(self, level: int) -> None: ...
    def set_flag(self, flag: int, value: int) -> int: ...
    def get_flag(self, flag: int) -> int: ...
    def set_rootdir(self, rootdir: str) -> None: ...
    def get_rootdir(self) -> str: ...
    def repos_iter(self) -> Iterator[Repo]: ...
    def solvables_iter(self) -> Iterator[Solvable]: ...
    def id2repo(self, id: int) -> Repo | None: ...
    def id2solvable(self, id: int) -> Solvable: ...
class Repo:
    """A package repository inside a Pool (a named collection of solvables)."""

    # Flags accepted by the add_*/Selection methods below.
    REPO_EXTEND_SOLVABLES: int
    REPO_LOCALPOOL: int
    REPO_NO_INTERNALIZE: int
    REPO_NO_LOCATION: int
    REPO_REUSE_REPODATA: int
    REPO_USE_LOADING: int
    REPO_USE_ROOTDIR: int
    SOLV_ADD_NO_STUBS: int
    name: str
    nsolvables: int
    id: int
    pool: Pool
    def add_solvable(self) -> Solvable: ...
    # Loaders for on-disk repository metadata formats.
    def add_solv(self, f: XFile, flags: int = ...) -> bool: ...
    def add_rpmmd(self, f: XFile, language: str | None, flags: int = ...) -> bool: ...
    def add_repomdxml(self, f: XFile, flags: int = ...) -> bool: ...
    def write(self, f: XFile) -> bool: ...
    def write_first_repodata(self, f: XFile) -> bool: ...
    # Finalize pending attribute data after manual solvable construction.
    def internalize(self) -> None: ...
    def isempty(self) -> bool: ...
    def empty(self) -> None: ...
    def free(self, reuseids: bool = ...) -> None: ...
    def solvables_iter(self) -> Iterator[Solvable]: ...
    def Selection(self, flags: int = ...) -> Selection: ...
class Solvable:
    """A single package (name/evr/arch) stored in a Repo."""

    name: str
    # evr = "[epoch:]version[-release]" string.
    evr: str
    arch: str
    nameid: int
    evrid: int
    archid: int
    id: int
    repo: Repo
    pool: Pool
    vendor: str
    vendorid: int
    # Dependency-array mutators; `dep` is an id from Pool.str2id()/rel2id().
    def add_provides(self, dep: int) -> None: ...
    def add_requires(self, dep: int) -> None: ...
    def add_conflicts(self, dep: int) -> None: ...
    def add_obsoletes(self, dep: int) -> None: ...
    def add_recommends(self, dep: int) -> None: ...
    def add_suggests(self, dep: int) -> None: ...
    def add_supplements(self, dep: int) -> None: ...
    def add_enhances(self, dep: int) -> None: ...
    def add_deparray(self, keyname: int, dep: int, marker: int = ...) -> None: ...
    def installable(self) -> bool: ...
    def isinstalled(self) -> bool: ...
    def identical(self, other: Solvable) -> bool: ...
    def evrcmp(self, other: Solvable) -> int: ...
    def matchesdep(self, keyname: int, dep: int, marker: int = ...) -> bool: ...
    # Attribute lookups keyed by solv knownid constants.
    def lookup_str(self, keyname: int) -> str | None: ...
    def lookup_num(self, keyname: int, notfound: int = ...) -> int: ...
    def lookup_id(self, keyname: int) -> int: ...
    def lookup_idarray(self, keyname: int, marker: int = ...) -> list[int]: ...
    def lookup_deparray(self, keyname: int, marker: int = ...) -> list[Dep]: ...
    def lookup_checksum(self, keyname: int) -> Chksum | None: ...
    def lookup_location(self) -> tuple[str | None, int]: ...
    def lookup_sourcepkg(self) -> str | None: ...
    def lookup_void(self, keyname: int) -> bool: ...
    def Selection(self, flags: int = ...) -> Selection: ...
    def unset(self, keyname: int) -> None: ...
    def __str__(self) -> str: ...
class Solver:
    """Dependency solver created via Pool.Solver()."""

    # Behavior flags for set_flag()-style configuration.
    SOLVER_FLAG_ALLOW_DOWNGRADE: int
    SOLVER_FLAG_ALLOW_ARCHCHANGE: int
    SOLVER_FLAG_ALLOW_VENDORCHANGE: int
    SOLVER_FLAG_ALLOW_UNINSTALL: int
    SOLVER_FLAG_ALLOW_NAMECHANGE: int
    SOLVER_FLAG_SPLITPROVIDES: int
    SOLVER_FLAG_IGNORE_RECOMMENDED: int
    SOLVER_FLAG_ADD_ALREADY_RECOMMENDED: int
    SOLVER_FLAG_NO_INFARCHCHECK: int
    SOLVER_FLAG_BEST_OBEY_POLICY: int
    SOLVER_FLAG_NO_AUTOTARGET: int
    SOLVER_FLAG_FOCUS_INSTALLED: int
    SOLVER_FLAG_FOCUS_BEST: int
    SOLVER_FLAG_INSTALL_ALSO_UPDATES: int
    SOLVER_FLAG_STRICT_REPO_PRIORITY: int
    # Returns the list of unsolved problems; empty list means success.
    def solve(self, jobs: Sequence[Job]) -> list[Problem]: ...
    def transaction(self) -> Transaction: ...
    def write_testcase(self, path: str) -> bool: ...
class Problem:
    """A single unresolved solver problem; str() yields a human-readable reason."""

    def __str__(self) -> str: ...
class Selection:
    """A set of matched solvables, convertible to solver jobs."""

    # Matching-mode and combination flags for Pool.select()/Selection methods.
    SELECTION_NAME: int
    SELECTION_PROVIDES: int
    SELECTION_FILELIST: int
    SELECTION_CANON: int
    SELECTION_DOTARCH: int
    SELECTION_REL: int
    SELECTION_INSTALLED_ONLY: int
    SELECTION_GLOB: int
    SELECTION_FLAT: int
    SELECTION_NOCASE: int
    SELECTION_SOURCE_ONLY: int
    SELECTION_WITH_SOURCE: int
    SELECTION_SKIP_KIND: int
    SELECTION_MATCH_DEPSTR: int
    SELECTION_WITH_DISABLED: int
    SELECTION_WITH_BADARCH: int
    SELECTION_WITH_ALL: int
    SELECTION_ADD: int
    SELECTION_SUBTRACT: int
    SELECTION_FILTER: int
    SELECTION_FILTER_KEEP_IFEMPTY: int
    SELECTION_FILTER_SWAPPED: int
    def isempty(self) -> bool: ...
    # Convert the selection into jobs carrying `action` (e.g. Job.SOLVER_INSTALL).
    def jobs(self, action: int) -> list[Job]: ...
    def solvables(self) -> list[Solvable]: ...
    def select(self, name: str, flags: int) -> None: ...
    def add_raw(self, how: int, what: int) -> None: ...
class Job:
    """A single solver instruction (action bits | selection bits | modifiers)."""

    # Action bits.
    SOLVER_INSTALL: int
    SOLVER_ERASE: int
    SOLVER_UPDATE: int
    SOLVER_WEAKENDEPS: int
    SOLVER_MULTIVERSION: int
    SOLVER_LOCK: int
    SOLVER_DISTUPGRADE: int
    SOLVER_VERIFY: int
    SOLVER_DROP_ORPHANED: int
    SOLVER_USERINSTALLED: int
    SOLVER_ALLOWUNINSTALL: int
    SOLVER_FAVOR: int
    SOLVER_DISFAVOR: int
    SOLVER_EXCLUDEFROMWEAK: int
    # Selection bits describing what `what` refers to.
    SOLVER_SOLVABLE: int
    SOLVER_SOLVABLE_NAME: int
    SOLVER_SOLVABLE_PROVIDES: int
    SOLVER_SOLVABLE_ONE_OF: int
    SOLVER_SOLVABLE_REPO: int
    SOLVER_SOLVABLE_ALL: int
    SOLVER_SELECTMASK: int
    SOLVER_JOBMASK: int
    SOLVER_NOOP: int
    # Modifier bits.
    SOLVER_WEAK: int
    SOLVER_ESSENTIAL: int
    SOLVER_CLEANDEPS: int
    SOLVER_FORCEBEST: int
    SOLVER_TARGETED: int
    SOLVER_NOTBYUSER: int
    SOLVER_SETEV: int
    SOLVER_SETEVR: int
    SOLVER_SETARCH: int
    SOLVER_SETVENDOR: int
    SOLVER_SETREPO: int
    SOLVER_SETNAME: int
    SOLVER_NOAUTOSET: int
    SOLVER_SETMASK: int
    def isemptyupdate(self) -> bool: ...
class Transaction:
    """Result of a successful solve: the set of install/erase/update steps."""

    pool: Pool
    def isempty(self) -> bool: ...
    def newsolvables(self) -> list[Solvable]: ...
    def keptsolvables(self) -> list[Solvable]: ...
    def newpackages(self) -> list[Solvable]: ...
    def keptpackages(self) -> list[Solvable]: ...
    def steps(self) -> list[Solvable]: ...
    def steptype(self, solvable: Solvable, mode: int) -> int: ...
    # For updates: the counterpart package being replaced/replacing.
    def othersolvable(self, solvable: Solvable) -> Solvable | None: ...
    def allothersolvables(self, solvable: Solvable) -> list[Solvable]: ...
    # Net installed-size delta of the whole transaction, in bytes.
    def calc_installsizechange(self) -> int: ...
    def order(self, flags: int = ...) -> None: ...
    def classify(self, mode: int = ...) -> list[TransactionClass]: ...
class TransactionClass:
    """Opaque grouping returned by Transaction.classify()."""

    ...

class Chksum:
    """Opaque checksum object returned by Solvable.lookup_checksum()."""

    ...

@ -0,0 +1,442 @@
import argparse
import enum
import logging
import math
import pathlib
import re
import subprocess
import sys
import urllib.request
from typing import (
ClassVar,
Optional,
)
logger = logging.getLogger(__name__)
class Command(enum.Enum):
    """Top-level CLI subcommands; values are the strings accepted on argv."""

    list_installed = 'list-installed'
    compile = 'compile'
    download = 'download'
    archive = 'archive'
class parse_rate_t:
    """Parse human-readable transfer rates (e.g. ``128KiB/s``) into bytes/sec."""

    class constants_t:
        # Accepts "<number><unit>[iB|b|B][/s]" with optional whitespace between
        # number and unit; unit letter selects a power of 1024.
        rate_re: ClassVar[re.Pattern[str]] = re.compile(r'^(\d+(?:\.\d+)?)\s*([bBkKmMgGpPtT]?)(?:[iI]?[bB])?(?:/s)?$')
        units: ClassVar[dict[str, int]] = {
            '': 0,
            'b': 0,
            'B': 0,
            'k': 1,
            'K': 1,
            'm': 2,
            'M': 2,
            'g': 3,
            'G': 3,
            't': 4,
            'T': 4,
            'p': 5,
            'P': 5,
        }

    @staticmethod
    def parse(s: str) -> int:
        """Return the rate in bytes per second.

        Raises ValueError when the string does not look like a rate.
        """
        match = parse_rate_t.constants_t.rate_re.match(s.strip())
        if match is None:
            raise ValueError('invalid rate: %s' % s)
        magnitude = float(match.group(1))
        exponent = parse_rate_t.constants_t.units.get(match.group(2), 0)
        return int(magnitude * (1024**exponent))
class downloader_t:
    """Download a single URL to a destination path via one of several backends."""

    class constants_t:
        class backend_t(enum.Enum):
            # urllib: in-process, no rate limiting; curl/aria2c: external tools
            # that honor the rate limit.
            urllib = 'urllib'
            curl = 'curl'
            aria2c = 'aria2c'

    @staticmethod
    def download(
        url: str,
        dest: pathlib.Path,
        backend: 'downloader_t.constants_t.backend_t',
        limit_rate: int,
    ) -> None:
        """Fetch `url` into `dest`, creating parent directories as needed.

        `limit_rate` is bytes/sec; it is ignored by the urllib backend.
        Raises subprocess.CalledProcessError if curl/aria2c exit non-zero.
        """
        dest.parent.mkdir(parents=True, exist_ok=True)
        if backend is downloader_t.constants_t.backend_t.urllib:
            urllib.request.urlretrieve(url, str(dest))
        elif backend is downloader_t.constants_t.backend_t.curl:
            # -f: fail on HTTP errors, -S: show errors, -L: follow redirects.
            cmd = [
                'curl',
                '-fSL',
                '--limit-rate',
                '%d' % limit_rate,
                '-o',
                str(dest),
                url,
            ]
            subprocess.check_call(cmd)
        elif backend is downloader_t.constants_t.backend_t.aria2c:
            # aria2c takes directory and filename separately.
            cmd = [
                'aria2c',
                '--max-download-limit=%d' % limit_rate,
                '-d',
                str(dest.parent),
                '-o',
                dest.name,
                url,
            ]
            subprocess.check_call(cmd)
        else:
            raise NotImplementedError
class download_requirements_t:
    """Parse compiled requirements text into downloadable (url, filename) pairs."""

    @staticmethod
    def parse_requirements(txt: str) -> list[tuple[str, str]]:
        """Extract download entries from compiled requirements text.

        The compiled format alternates a ``# <url>`` comment line with the
        package spec line it belongs to; each such pair yields one
        (url, filename) tuple. Spec lines without a preceding URL comment
        are skipped.
        """
        entries: list[tuple[str, str]] = []
        pending_url: Optional[str] = None
        for raw_line in txt.splitlines():
            stripped = raw_line.strip()
            if not stripped:
                continue
            if stripped.startswith('#'):
                # Remember a URL-looking comment for the next spec line.
                candidate = stripped[1:].strip()
                if '/' in candidate and '://' in candidate:
                    pending_url = candidate
                continue
            tokens = stripped.split()
            if not tokens:
                continue
            spec = tokens[0]
            if pending_url is not None:
                filename = pending_url.rsplit('/', 1)[-1] if '/' in pending_url else spec
                entries.append((pending_url, filename))
                pending_url = None
        return entries
def _find_cached_pkg(
cache_dir: pathlib.Path,
name: str,
version: str,
) -> Optional[pathlib.Path]:
"""Find a cached .pkg.tar.* file for a given package name and version."""
for suffix in ['.pkg.tar.zst', '.pkg.tar.xz', '.pkg.tar.gz', '.pkg.tar.bz2', '.pkg.tar']:
for arch in ['x86_64', 'any']:
candidate = cache_dir / ('%s-%s-%s%s' % (name, version, arch, suffix))
if candidate.exists():
return candidate
return None
def main(argv: Optional[list[str]] = None) -> int:
    """CLI entry point: dispatch to list-installed / compile / download / archive.

    Only the leading positional ``command`` is parsed here; the remaining
    arguments are handed to a per-subcommand argparse parser. Returns a
    process exit code (0 on success, 1 on error).
    """
    if argv is None:
        argv = sys.argv[1:]
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(
        prog='online-fxreader-pr34-archlinux',
        description='Arch Linux package management tools',
    )
    parser.add_argument(
        'command',
        choices=[o.value for o in Command],
    )
    # parse_known_args: unknown args are forwarded to the subcommand parser.
    options, args = parser.parse_known_args(argv)
    options.command = Command(options.command)
    if options.command is Command.list_installed:
        # Print installed packages, optionally pinning versions / sha256 hashes.
        import hashlib
        from .pacman import pacman_t
        list_parser = argparse.ArgumentParser()
        list_parser.add_argument(
            '--format',
            choices=['plain', 'constraints', 'compiled'],
            default='plain',
            help='plain: name version; constraints: name>=version; compiled: name==version with optional hashes',
        )
        list_parser.add_argument(
            '--generate-hashes',
            action='store_true',
            default=False,
            help='include sha256 from local /var/cache/pacman/pkg/ files; fails if file not found for any package',
        )
        list_parser.add_argument(
            '--db-path',
            dest='db_path',
            default='/var/lib/pacman',
            help='pacman db path, default /var/lib/pacman',
        )
        list_parser.add_argument(
            '--pkg-cache-dir',
            dest='pkg_cache_dir',
            default='/var/cache/pacman/pkg',
            help='local pacman package cache directory, default /var/cache/pacman/pkg',
        )
        list_options = list_parser.parse_args(args)
        installed = pacman_t.list_installed_simple(
            db_path=pathlib.Path(list_options.db_path),
        )
        pkg_cache_dir = pathlib.Path(list_options.pkg_cache_dir)
        if list_options.format == 'plain':
            for name, version in installed:
                print('%s %s' % (name, version))
        elif list_options.format == 'constraints':
            for name, version in installed:
                print('%s>=%s' % (name, version))
        elif list_options.format == 'compiled':
            # "compiled" pins exact versions; with --generate-hashes the sha256
            # of the locally cached package file is appended pip-style.
            missing_hashes: list[str] = []
            for name, version in installed:
                line = '%s==%s' % (name, version)
                if list_options.generate_hashes:
                    pkg_file = _find_cached_pkg(
                        pkg_cache_dir,
                        name,
                        version,
                    )
                    if pkg_file is not None:
                        # Hash the cached package in 64 KiB chunks.
                        h = hashlib.sha256()
                        with open(pkg_file, 'rb') as fh:
                            while True:
                                chunk = fh.read(65536)
                                if not chunk:
                                    break
                                h.update(chunk)
                        line += ' --hash=sha256:%s' % h.hexdigest()
                    else:
                        missing_hashes.append(name)
                print(line)
            # All-or-nothing: any package without a resolvable hash fails the run.
            if len(missing_hashes) > 0:
                logger.error(
                    "can't determine checksum of installed package(s) - no cached file found for %d package(s): %s" % (len(missing_hashes), missing_hashes)
                )
                return 1
        return 0
    elif options.command is Command.compile:
        # Resolve requested packages against repo indices and print pinned output.
        compile_parser = argparse.ArgumentParser()
        compile_parser.add_argument(
            'packages',
            nargs='*',
        )
        compile_parser.add_argument(
            '-r',
            dest='requirements_file',
            default=None,
            help='path to file with package constraints (one per line)',
        )
        compile_parser.add_argument(
            '--index',
            dest='index_url',
            default=None,
            help='mirror URL',
        )
        compile_parser.add_argument(
            '--archive-date',
            dest='archive_date',
            default=None,
            help='Arch Linux Archive date (e.g. 2024/01/15)',
        )
        compile_parser.add_argument(
            '--offline',
            action='store_true',
            default=False,
        )
        compile_parser.add_argument(
            '--no-cache',
            action='store_true',
            default=False,
        )
        compile_parser.add_argument(
            '--generate-hashes',
            action='store_true',
            default=False,
        )
        compile_parser.add_argument(
            '--cache-dir',
            dest='cache_dir',
            default=None,
        )
        compile_parser.add_argument(
            '--repos',
            nargs='*',
            default=['core', 'extra', 'multilib'],
        )
        compile_parser.add_argument(
            '--arch',
            default='x86_64',
        )
        compile_parser.add_argument(
            '--backend',
            choices=['python', 'solv'],
            default='solv',
        )
        compile_parser.add_argument(
            '--archive-cache',
            dest='archive_cache',
            default=None,
            help='path to archive cache dir (with archlinux_cache.db from archive sync); loads all synced dates into the solver pool',
        )
        compile_parser.add_argument(
            '--reference',
            default=None,
            help='path to previously compiled requirements file to use as version pins',
        )
        compile_parser.add_argument(
            '--resolution-strategy',
            dest='resolution_strategy',
            choices=['upgrade-all', 'pin-referenced'],
            default='upgrade-all',
            help='upgrade-all: resolve fresh; pin-referenced: keep referenced versions, only upgrade explicitly requested packages',
        )
        compile_options = compile_parser.parse_args(args)
        from .models import compile_options_t, resolution_strategy_t
        # Merge positional packages with any -r requirements file contents.
        packages: list[str] = list(compile_options.packages)
        if compile_options.requirements_file is not None:
            for line in pathlib.Path(compile_options.requirements_file).read_text().splitlines():
                line = line.strip()
                if line != '' and not line.startswith('#'):
                    packages.append(line)
        opts = compile_options_t(
            packages=packages,
            index_url=compile_options.index_url,
            archive_date=compile_options.archive_date,
            offline=compile_options.offline,
            no_cache=compile_options.no_cache,
            generate_hashes=compile_options.generate_hashes,
            repos=compile_options.repos,
            arch=compile_options.arch,
            cache_dir=compile_options.cache_dir,
            reference=compile_options.reference,
            resolution_strategy=resolution_strategy_t(compile_options.resolution_strategy),
        )
        try:
            if compile_options.backend == 'solv':
                from .solv_backend import compile_solv_t, repo_store_t
                stores = None
                if compile_options.archive_cache is not None:
                    # Preload all synced archive dates into the solv pool.
                    from .cache_db import cache_db_t
                    archive_cache_dir = pathlib.Path(compile_options.archive_cache)
                    db_path = archive_cache_dir / 'archlinux_cache.db'
                    if db_path.exists():
                        cache_db = cache_db_t(db_path)
                        indices = cache_db.load_all_indices()
                        cache_db.close()
                        stores = [repo_store_t(index=idx) for idx in indices]
                result = compile_solv_t.compile(opts, stores=stores)
            else:
                from .compile import compile_t
                result = compile_t.compile(opts)
        except RuntimeError as e:
            # Resolution failures surface as RuntimeError; report and exit 1.
            logger.error(str(e))
            return 1
        print(result.txt)
        return 0
    elif options.command is Command.download:
        # Fetch every URL listed in a compiled requirements file.
        download_parser = argparse.ArgumentParser()
        download_parser.add_argument(
            '-r',
            dest='requirements',
            required=True,
            help='path to compiled requirements file',
        )
        download_parser.add_argument(
            '-d',
            dest='dest_dir',
            required=True,
            help='destination directory for downloaded packages',
        )
        download_parser.add_argument(
            '--downloader',
            choices=[o.value for o in downloader_t.constants_t.backend_t],
            default='urllib',
        )
        download_parser.add_argument(
            '--limit-rate',
            dest='limit_rate',
            default='128KiB/s',
            help='download speed limit (e.g. 128KiB/s, 1MiB/s, 512K), default 128KiB/s',
        )
        download_options = download_parser.parse_args(args)
        dest_dir = pathlib.Path(download_options.dest_dir)
        dest_dir.mkdir(parents=True, exist_ok=True)
        backend = downloader_t.constants_t.backend_t(download_options.downloader)
        limit_rate = parse_rate_t.parse(download_options.limit_rate)
        requirements_txt = pathlib.Path(download_options.requirements).read_text()
        entries = download_requirements_t.parse_requirements(requirements_txt)
        count = 0
        for url, filename in entries:
            dest_path = dest_dir / filename
            # Idempotent: already-present files are skipped, not re-downloaded.
            if dest_path.exists():
                logger.info(dict(msg='already downloaded', path=str(dest_path)))
            else:
                logger.info(dict(msg='downloading', url=url, dest=str(dest_path), backend=backend.value, limit_rate=limit_rate))
                downloader_t.download(
                    url=url,
                    dest=dest_path,
                    backend=backend,
                    limit_rate=limit_rate,
                )
                count += 1
        logger.info(dict(msg='download complete', count=count))
        return 0
    elif options.command is Command.archive:
        # The archive subcommand owns its own full CLI.
        from . import archive as _archive
        return _archive.main(args)
    else:
        raise NotImplementedError
# Script entry point: propagate main()'s return value as the exit code.
if __name__ == '__main__':
    sys.exit(main())

@ -0,0 +1,147 @@
import io
import hashlib
import pathlib
import tempfile
import logging
from typing import (
Optional,
Any,
)
from .models import (
compile_options_t,
compile_entry_t,
compile_result_t,
mirror_config_t,
repo_index_t,
)
from .db import db_parser_t
from .pacman import pacman_t
from .resolver import resolver_t
logger = logging.getLogger(__name__)
class compile_t:
    """Pure-python compile pipeline: build mirror config, fetch repo indices,
    resolve dependencies, and emit pinned compile entries."""

    @staticmethod
    def build_mirror_config(options: compile_options_t) -> mirror_config_t:
        """Pick the mirror source by priority: archive date > explicit mirror
        URL > the archlinux.org "last" archive snapshot."""
        if options.archive_date is not None:
            return mirror_config_t.from_archive_date(
                date=options.archive_date,
                repos=options.repos,
                arch=options.arch,
            )
        elif options.index_url is not None:
            return mirror_config_t.from_mirror_url(
                mirror_url=options.index_url,
                repos=options.repos,
                arch=options.arch,
            )
        else:
            return mirror_config_t.from_mirror_url(
                mirror_url='https://archive.archlinux.org/repos/last',
                repos=options.repos,
                arch=options.arch,
            )

    @staticmethod
    def fetch_indices(
        mirror: mirror_config_t,
        cache_dir: Optional[pathlib.Path] = None,
        no_cache: bool = False,
        offline: bool = False,
    ) -> list[repo_index_t]:
        """Download (or load from cache) each repo's .db and parse it.

        With a cache_dir, .db files persist between runs; without one, each
        .db is fetched into a temp file and discarded after parsing.
        Raises FileNotFoundError in offline mode when no cached db exists.
        """
        indices: list[repo_index_t] = []
        for repo in mirror.repos:
            db_url = '%s/%s.db' % (repo.url, repo.name)
            if cache_dir is not None and not no_cache:
                cached_path = cache_dir / ('%s.db' % repo.name)
                if cached_path.exists():
                    logger.info(
                        dict(
                            repo=repo.name,
                            msg='using cached db',
                            path=str(cached_path),
                        )
                    )
                    index = db_parser_t.parse_db_path(cached_path, repo_name=repo.name)
                    indices.append(index)
                    continue
                if offline:
                    raise FileNotFoundError('offline mode: cached db not found for %s at %s' % (repo.name, str(cached_path)))
                # Cache miss: download into the cache dir, then parse.
                pacman_t.download_db(db_url, cached_path)
                index = db_parser_t.parse_db_path(cached_path, repo_name=repo.name)
                indices.append(index)
            else:
                if offline:
                    raise FileNotFoundError('offline mode requires --cache-dir with pre-fetched db files')
                # No cache: fetch into a throwaway temp file.
                with tempfile.NamedTemporaryFile(suffix='.db') as tmp:
                    pacman_t.download_db(db_url, pathlib.Path(tmp.name))
                    index = db_parser_t.parse_db_path(pathlib.Path(tmp.name), repo_name=repo.name)
                    indices.append(index)
        return indices

    @staticmethod
    def compile(
        options: compile_options_t,
    ) -> compile_result_t.res_t:
        """Run the full pipeline for `options` and return the compiled result
        (entries plus rendered text)."""
        mirror = compile_t.build_mirror_config(options)
        cache_dir: Optional[pathlib.Path] = None
        if options.cache_dir is not None:
            cache_dir = pathlib.Path(options.cache_dir)
            cache_dir.mkdir(parents=True, exist_ok=True)
        indices = compile_t.fetch_indices(
            mirror=mirror,
            cache_dir=cache_dir,
            no_cache=options.no_cache,
            offline=options.offline,
        )
        resolved = resolver_t.resolve(
            packages=options.packages,
            indices=indices,
        )
        result = compile_result_t.res_t()
        for pkg_name in resolved.resolution_order:
            pkg = resolved.resolved[pkg_name]
            # Locate which repo index the package came from (first match wins,
            # mirroring repo priority order).
            repo_name = ''
            for idx in indices:
                if pkg_name in idx.packages:
                    repo_name = idx.name
                    break
            repo_url = ''
            for repo_cfg in mirror.repos:
                if repo_cfg.name == repo_name:
                    repo_url = repo_cfg.url
                    break
            entry = compile_entry_t(
                name=pkg.name,
                version=pkg.version,
                filename=pkg.filename,
                repo=repo_name,
                url='%s/%s' % (repo_url, pkg.filename) if repo_url and pkg.filename else '',
                sha256=pkg.sha256sum if options.generate_hashes else '',
                depends=pkg.depends,
            )
            result.entries.append(entry)
        result.txt = result.to_txt()
        return result

@ -0,0 +1,157 @@
import io
import re
import tarfile
import logging
import pathlib
from typing import (
ClassVar,
Optional,
Any,
BinaryIO,
)
from .models import (
package_desc_t,
repo_index_t,
)
logger = logging.getLogger(__name__)
class db_parser_t:
    """Parse pacman repo .db files (tar archives of per-package desc files)
    into repo_index_t models."""

    class constants_t:
        # Matches desc section headers like "%NAME%".
        field_re: ClassVar[re.Pattern[str]] = re.compile(r'^%([A-Z0-9]+)%$')
        # Sections whose values are stored as lists (one value per line).
        list_fields: ClassVar[set[str]] = {
            'LICENSE',
            'DEPENDS',
            'OPTDEPENDS',
            'MAKEDEPENDS',
            'CHECKDEPENDS',
            'PROVIDES',
            'CONFLICTS',
            'REPLACES',
            'GROUPS',
        }
        # Maps desc section names to package_desc_t attribute names.
        field_map: ClassVar[dict[str, str]] = {
            'FILENAME': 'filename',
            'NAME': 'name',
            'VERSION': 'version',
            'DESC': 'desc',
            'CSIZE': 'csize',
            'ISIZE': 'isize',
            'MD5SUM': 'md5sum',
            'SHA256SUM': 'sha256sum',
            'URL': 'url',
            'ARCH': 'arch',
            'BUILDDATE': 'builddate',
            'PACKAGER': 'packager',
            'LICENSE': 'license',
            'DEPENDS': 'depends',
            'OPTDEPENDS': 'optdepends',
            'MAKEDEPENDS': 'makedepends',
            'CHECKDEPENDS': 'checkdepends',
            'PROVIDES': 'provides',
            'CONFLICTS': 'conflicts',
            'REPLACES': 'replaces',
            'GROUPS': 'groups',
            'BASE': 'base',
        }
        # Sections converted to int (first value only).
        int_fields: ClassVar[set[str]] = {
            'CSIZE',
            'ISIZE',
            'BUILDDATE',
        }

    @staticmethod
    def parse_desc(content: str) -> package_desc_t:
        """Parse one desc file: "%SECTION%" headers followed by value lines,
        sections separated by blank lines.

        Unknown sections are skipped. Raises ValueError when NAME or VERSION
        is missing.
        """
        fields: dict[str, Any] = {}
        lines = content.split('\n')
        i = 0
        while i < len(lines):
            line = lines[i].strip()
            if line == '':
                i += 1
                continue
            m = db_parser_t.constants_t.field_re.match(line)
            if not m:
                # Stray value outside a recognized header; skip it.
                i += 1
                continue
            field_name = m.group(1)
            i += 1
            # Collect value lines until the next blank line.
            values: list[str] = []
            while i < len(lines) and lines[i].strip() != '':
                values.append(lines[i].strip())
                i += 1
            attr_name = db_parser_t.constants_t.field_map.get(field_name)
            if attr_name is None:
                continue
            if field_name in db_parser_t.constants_t.list_fields:
                fields[attr_name] = values
            elif field_name in db_parser_t.constants_t.int_fields:
                fields[attr_name] = int(values[0]) if len(values) > 0 else 0
            else:
                fields[attr_name] = values[0] if len(values) > 0 else ''
        if 'name' not in fields or 'version' not in fields:
            raise ValueError('desc missing NAME or VERSION')
        return package_desc_t(**fields)

    @staticmethod
    def parse_db(
        f: BinaryIO,
        repo_name: str = '',
    ) -> repo_index_t:
        """Parse a repo .db tar stream: one `<pkg>/desc` member per package.

        Packages whose desc fails to parse are logged and skipped rather than
        aborting the whole index.
        """
        index = repo_index_t(name=repo_name)
        with tarfile.open(fileobj=f, mode='r:*') as tar:
            desc_members: list[tarfile.TarInfo] = []
            for member in tar.getmembers():
                if member.name.endswith('/desc') and member.isfile():
                    desc_members.append(member)
            for member in desc_members:
                extracted = tar.extractfile(member)
                if extracted is None:
                    continue
                content = extracted.read().decode('utf-8')
                extracted.close()
                try:
                    pkg = db_parser_t.parse_desc(content)
                    index.packages[pkg.name] = pkg
                except ValueError:
                    logger.warning(
                        dict(
                            member=member.name,
                            msg='failed to parse desc',
                        )
                    )
        # Build provides/groups lookup tables once all packages are loaded.
        index.build_provides_index()
        return index

    @staticmethod
    def parse_db_path(
        path: pathlib.Path,
        repo_name: Optional[str] = None,
    ) -> repo_index_t:
        """Open `path` and parse it; the repo name defaults to the filename
        without extensions (e.g. "core" for core.db)."""
        if repo_name is None:
            repo_name = path.stem.split('.')[0]
        with io.open(path, 'rb') as f:
            return db_parser_t.parse_db(f, repo_name=repo_name)

@ -0,0 +1,324 @@
import dataclasses
import enum
import re
import logging
from typing import (
ClassVar,
Optional,
Literal,
Any,
)
logger = logging.getLogger(__name__)
class vercmp_t:
    """Pacman-style version comparison over ``[epoch:]version[-release]``."""

    class constants_t:
        epoch_re: ClassVar[re.Pattern[str]] = re.compile(r'^(\d+):(.+)$')
        rel_re: ClassVar[re.Pattern[str]] = re.compile(r'^(.+)-(\d+)$')
        # Alternating runs of digits and letters; separators are dropped.
        segment_re: ClassVar[re.Pattern[str]] = re.compile(r'(\d+|[a-zA-Z]+)')

    @staticmethod
    def split_evr(version: str) -> tuple[int, str, str]:
        """Split a full version string into (epoch, version, release).

        Missing epoch defaults to 0; missing release defaults to '0'.
        """
        epoch = 0
        epoch_match = vercmp_t.constants_t.epoch_re.match(version)
        if epoch_match is not None:
            epoch = int(epoch_match.group(1))
            version = epoch_match.group(2)
        release = '0'
        rel_match = vercmp_t.constants_t.rel_re.match(version)
        if rel_match is not None:
            version = rel_match.group(1)
            release = rel_match.group(2)
        return (epoch, version, release)

    @staticmethod
    def compare_segment(a: str, b: str) -> int:
        """Compare two bare version strings segment-by-segment.

        Numeric segments compare as integers and always beat alphabetic
        segments; the longer segment list wins a tie.
        """
        segments_a = vercmp_t.constants_t.segment_re.findall(a)
        segments_b = vercmp_t.constants_t.segment_re.findall(b)
        for left, right in zip(segments_a, segments_b):
            left_is_num = left.isdigit()
            right_is_num = right.isdigit()
            if left_is_num and right_is_num:
                left_val, right_val = int(left), int(right)
                if left_val != right_val:
                    return 1 if left_val > right_val else -1
            elif left_is_num:
                return 1
            elif right_is_num:
                return -1
            elif left != right:
                return 1 if left > right else -1
        if len(segments_a) != len(segments_b):
            return 1 if len(segments_a) > len(segments_b) else -1
        return 0

    @staticmethod
    def vercmp(a: str, b: str) -> int:
        """Return -1, 0 or 1 as `a` is older than, equal to or newer than `b`.

        Precedence mirrors pacman: epoch first, then version, then release.
        """
        if a == b:
            return 0
        epoch_a, ver_a, rel_a = vercmp_t.split_evr(a)
        epoch_b, ver_b, rel_b = vercmp_t.split_evr(b)
        if epoch_a != epoch_b:
            return 1 if epoch_a > epoch_b else -1
        outcome = vercmp_t.compare_segment(ver_a, ver_b)
        if outcome != 0:
            return outcome
        return vercmp_t.compare_segment(rel_a, rel_b)
class constraint_op_t(enum.Enum):
    """Comparison operators allowed in a package version constraint."""

    eq = '='
    ge = '>='
    le = '<='
    gt = '>'
    lt = '<'
class package_constraint_t:
    """A package name plus an optional version constraint (e.g. ``glibc>=2.38``)."""

    class constants_t:
        # Lazy name match followed by an optional operator+version tail.
        constraint_re: ClassVar[re.Pattern[str]] = re.compile(r'^([a-zA-Z0-9@._+\-]+?)(?:(>=|<=|>|<|=)(.+))?$')

    def __init__(
        self,
        name: str,
        op: Optional[constraint_op_t] = None,
        version: Optional[str] = None,
    ) -> None:
        self.name = name
        self.op = op
        self.version = version

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, package_constraint_t):
            return NotImplemented
        return (self.name, self.op, self.version) == (other.name, other.op, other.version)

    def __hash__(self) -> int:
        return hash((self.name, self.op, self.version))

    def __repr__(self) -> str:
        if self.op is None or self.version is None:
            return 'package_constraint_t(%r)' % self.name
        return 'package_constraint_t(%r, %r, %r)' % (self.name, self.op.value, self.version)

    @staticmethod
    def parse(s: str) -> 'package_constraint_t':
        """Parse "name[<op>version]" into a constraint; raises ValueError on junk."""
        match = package_constraint_t.constants_t.constraint_re.match(s.strip())
        if not match:
            raise ValueError('invalid constraint: %s' % s)
        pkg_name, op_str, ver = match.group(1), match.group(2), match.group(3)
        parsed_op: Optional[constraint_op_t] = constraint_op_t(op_str) if op_str else None
        return package_constraint_t(
            name=pkg_name,
            op=parsed_op,
            version=ver,
        )

    def satisfied_by(self, version: str) -> bool:
        """True when `version` satisfies this constraint (unconstrained names
        match everything)."""
        if self.op is None or self.version is None:
            return True
        candidate = version
        # If the constraint omits a pkgrel, ignore the candidate's pkgrel too.
        if '-' not in self.version and '-' in candidate:
            candidate = vercmp_t.constants_t.rel_re.sub(r'\1', candidate)
        outcome = vercmp_t.vercmp(candidate, self.version)
        if self.op is constraint_op_t.eq:
            return outcome == 0
        if self.op is constraint_op_t.ge:
            return outcome >= 0
        if self.op is constraint_op_t.le:
            return outcome <= 0
        if self.op is constraint_op_t.gt:
            return outcome > 0
        if self.op is constraint_op_t.lt:
            return outcome < 0
        raise NotImplementedError

    def to_str(self) -> str:
        """Render back to the "name[<op>version]" textual form."""
        if self.op is None or self.version is None:
            return self.name
        return '%s%s%s' % (self.name, self.op.value, self.version)
@dataclasses.dataclass
class package_desc_t:
    """One package's metadata as parsed from a repo db desc file.

    Field names mirror the desc sections (FILENAME, NAME, VERSION, ...);
    list-valued sections map to list fields, size/date sections to ints.
    """

    name: str
    version: str
    desc: str = ''
    filename: str = ''
    # csize: compressed package size in bytes; isize: installed size.
    csize: int = 0
    isize: int = 0
    md5sum: str = ''
    sha256sum: str = ''
    url: str = ''
    arch: str = ''
    builddate: int = 0
    packager: str = ''
    license: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    depends: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    optdepends: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    makedepends: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    checkdepends: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    provides: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    conflicts: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    replaces: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    groups: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    base: str = ''

    def parsed_depends(self) -> list[package_constraint_t]:
        """Depends entries parsed into structured constraints."""
        return [package_constraint_t.parse(d) for d in self.depends]

    def parsed_provides(self) -> list[package_constraint_t]:
        """Provides entries parsed into structured constraints."""
        return [package_constraint_t.parse(p) for p in self.provides]

    def parsed_conflicts(self) -> list[package_constraint_t]:
        """Conflicts entries parsed into structured constraints."""
        return [package_constraint_t.parse(c) for c in self.conflicts]
@dataclasses.dataclass
class repo_config_t:
    """A single repository: its name (e.g. "core") and base URL."""

    name: str
    url: str
@dataclasses.dataclass
class mirror_config_t:
    """The set of repositories to fetch indices from, for a given arch."""

    repos: list[repo_config_t] = dataclasses.field(default_factory=lambda: list[repo_config_t]())
    arch: str = 'x86_64'

    @staticmethod
    def from_archive_date(
        date: str,
        repos: Optional[list[str]] = None,
        arch: str = 'x86_64',
    ) -> 'mirror_config_t':
        """Build a config pointing at the Arch Linux Archive snapshot for
        `date` (format "YYYY/MM/DD"); repos default to core/extra/multilib."""
        if repos is None:
            repos = ['core', 'extra', 'multilib']
        base_url = 'https://archive.archlinux.org/repos/%s' % date
        return mirror_config_t(
            repos=[
                repo_config_t(
                    name=r,
                    url='%s/%s/os/%s' % (base_url, r, arch),
                )
                for r in repos
            ],
            arch=arch,
        )

    @staticmethod
    def from_mirror_url(
        mirror_url: str,
        repos: Optional[list[str]] = None,
        arch: str = 'x86_64',
    ) -> 'mirror_config_t':
        """Build a config from a plain mirror base URL using the standard
        "<mirror>/<repo>/os/<arch>" layout; repos default to core/extra/multilib."""
        if repos is None:
            repos = ['core', 'extra', 'multilib']
        return mirror_config_t(
            repos=[
                repo_config_t(
                    name=r,
                    url='%s/%s/os/%s' % (mirror_url.rstrip('/'), r, arch),
                )
                for r in repos
            ],
            arch=arch,
        )
@dataclasses.dataclass
class repo_index_t:
    """All packages from one repository plus derived lookup tables."""

    name: str
    packages: dict[str, package_desc_t] = dataclasses.field(default_factory=lambda: dict[str, package_desc_t]())
    # provides_index: virtual/provided name -> providing package names.
    provides_index: dict[str, list[str]] = dataclasses.field(default_factory=lambda: dict[str, list[str]]())
    # groups_index: group name -> member package names.
    groups_index: dict[str, list[str]] = dataclasses.field(default_factory=lambda: dict[str, list[str]]())

    def build_provides_index(self) -> None:
        """Rebuild provides_index and groups_index from scratch; call after
        packages is fully populated."""
        self.provides_index = {}
        self.groups_index = {}
        for pkg_name, pkg in self.packages.items():
            for prov in pkg.provides:
                # Index by the bare provided name (version part is dropped).
                prov_constraint = package_constraint_t.parse(prov)
                if prov_constraint.name not in self.provides_index:
                    self.provides_index[prov_constraint.name] = []
                self.provides_index[prov_constraint.name].append(pkg_name)
            for group in pkg.groups:
                if group not in self.groups_index:
                    self.groups_index[group] = []
                self.groups_index[group].append(pkg_name)
class resolution_strategy_t(enum.Enum):
    """How compile treats a --reference file: resolve everything fresh, or
    keep referenced pins and only upgrade explicitly requested packages."""

    upgrade_all = 'upgrade-all'
    pin_referenced = 'pin-referenced'
@dataclasses.dataclass
class compile_options_t:
    """Normalized inputs for the compile pipeline (mirrors the CLI flags)."""

    packages: list[str] = dataclasses.field(default_factory=lambda: list[str]())
    index_url: Optional[str] = None
    # Arch Linux Archive snapshot date, e.g. "2024/01/15".
    archive_date: Optional[str] = None
    offline: bool = False
    no_cache: bool = False
    generate_hashes: bool = False
    repos: list[str] = dataclasses.field(default_factory=lambda: ['core', 'extra', 'multilib'])
    arch: str = 'x86_64'
    cache_dir: Optional[str] = None
    # Path to a previously compiled requirements file used as version pins.
    reference: Optional[str] = None
    resolution_strategy: resolution_strategy_t = resolution_strategy_t.upgrade_all
@dataclasses.dataclass
class compile_entry_t:
    """One resolved package in compiled output: pinned version plus origin."""

    name: str
    version: str
    filename: str
    repo: str
    url: str
    # sha256 is empty unless --generate-hashes was requested.
    sha256: str = ''
    depends: list[str] = dataclasses.field(default_factory=lambda: list[str]())
class compile_result_t:
    """Namespace for the compile pipeline's result type."""

    @dataclasses.dataclass
    class res_t:
        """Resolved entries plus their rendered requirements text."""

        entries: list[compile_entry_t] = dataclasses.field(default_factory=lambda: list[compile_entry_t]())
        txt: str = ''

        def to_txt(self) -> str:
            """Render entries sorted by name: a "# <url-or-repo/filename>"
            comment line followed by "name==version[ --hash=sha256:...]"."""
            rendered: list[str] = []
            for entry in sorted(self.entries, key=lambda item: item.name):
                spec = '%s==%s' % (entry.name, entry.version)
                if entry.sha256:
                    spec += ' --hash=sha256:%s' % entry.sha256
                rendered.append('# %s' % entry.url if entry.url else '# %s/%s' % (entry.repo, entry.filename))
                rendered.append(spec)
            return '\n'.join(rendered)

@ -0,0 +1,182 @@
import re
import subprocess
import pathlib
import dataclasses
import logging
from typing import (
ClassVar,
Optional,
Any,
)
from .models import (
package_desc_t,
)
logger = logging.getLogger(__name__)
class pacman_t:
	"""Thin wrapper around the pacman CLI: query installed packages, sync/download repo dbs."""

	class constants_t:
		default_db_path: ClassVar[pathlib.Path] = pathlib.Path('/var/lib/pacman')
		default_cache_dir: ClassVar[pathlib.Path] = pathlib.Path('/var/cache/pacman/pkg')
		# matches one 'Field Name      : value' line of `pacman -Qi` output
		field_re: ClassVar[re.Pattern[str]] = re.compile(r'^([A-Za-z ]+?)\s*:\s*(.*)$')

	@dataclasses.dataclass
	class query_entry_t:
		"""One installed package as reported by `pacman -Qi`."""

		name: str
		version: str
		description: str = ''
		architecture: str = ''
		url: str = ''
		depends_on: list[str] = dataclasses.field(default_factory=lambda: list[str]())
		provides: list[str] = dataclasses.field(default_factory=lambda: list[str]())
		conflicts_with: list[str] = dataclasses.field(default_factory=lambda: list[str]())
		replaces: list[str] = dataclasses.field(default_factory=lambda: list[str]())
		install_size: str = ''
		packager: str = ''
		groups: list[str] = dataclasses.field(default_factory=lambda: list[str]())

	class list_installed_t:
		@dataclasses.dataclass
		class res_t:
			packages: list['pacman_t.query_entry_t'] = dataclasses.field(default_factory=lambda: list[pacman_t.query_entry_t]())

	@staticmethod
	def _first(fields: dict[str, list[str]], key: str) -> str:
		"""Return the first collected value for key, or '' when absent or empty.

		Safe replacement for `fields.get(key, [''])[0]`, which raised IndexError
		when the key was present but all its values were filtered out
		(e.g. a literal 'None' value).
		"""
		values = fields.get(key)
		return values[0] if values else ''

	@staticmethod
	def parse_info_block(block: str) -> 'pacman_t.query_entry_t':
		"""Parse one `pacman -Qi` info block into a query_entry_t.

		Continuation lines (starting with a space) are appended to the current
		field; literal 'None' values are dropped.

		Raises:
			ValueError: if the block has no usable Name or Version field.
		"""
		fields: dict[str, list[str]] = {}
		current_key: Optional[str] = None
		for line in block.split('\n'):
			m = pacman_t.constants_t.field_re.match(line)
			if m:
				current_key = m.group(1).strip()
				value = m.group(2).strip()
				assert isinstance(current_key, str)
				if current_key not in fields:
					fields[current_key] = []
				if value and value != 'None':
					fields[current_key].append(value)
			elif current_key and line.startswith(' '):
				# continuation of the previous field's value
				value = line.strip()
				if value and value != 'None':
					fields[current_key].append(value)
		name = pacman_t._first(fields, 'Name')
		version = pacman_t._first(fields, 'Version')
		if not name or not version:
			# also covers 'Name : None' / empty values, which previously
			# escaped as IndexError instead of the documented ValueError
			raise ValueError('missing Name or Version in block')
		return pacman_t.query_entry_t(
			name=name,
			version=version,
			description=pacman_t._first(fields, 'Description'),
			architecture=pacman_t._first(fields, 'Architecture'),
			url=pacman_t._first(fields, 'URL'),
			depends_on=fields.get('Depends On', []),
			provides=fields.get('Provides', []),
			conflicts_with=fields.get('Conflicts With', []),
			replaces=fields.get('Replaces', []),
			install_size=pacman_t._first(fields, 'Installed Size'),
			packager=pacman_t._first(fields, 'Packager'),
			groups=fields.get('Groups', []),
		)

	@staticmethod
	def list_installed(
		db_path: Optional[pathlib.Path] = None,
	) -> 'pacman_t.list_installed_t.res_t':
		"""Return structured info for all installed packages via `pacman -Qi`.

		Args:
			db_path: alternate pacman db path (--dbpath); system default when None.

		Unparseable blocks are logged (with a snippet) and skipped.
		"""
		cmd: list[str] = ['pacman', '-Qi']
		if db_path is not None:
			cmd.extend(['--dbpath', str(db_path)])
		output = subprocess.check_output(
			cmd,
			stderr=subprocess.DEVNULL,
		).decode('utf-8')
		blocks = output.split('\n\n')
		result = pacman_t.list_installed_t.res_t()
		for block in blocks:
			block = block.strip()
			if not block:
				continue
			try:
				entry = pacman_t.parse_info_block(block)
				result.packages.append(entry)
			except ValueError:
				logger.warning(
					dict(
						msg='failed to parse pacman info block',
						block=block[:256],
					)
				)
		return result

	@staticmethod
	def list_installed_simple(
		db_path: Optional[pathlib.Path] = None,
	) -> list[tuple[str, str]]:
		"""Return (name, version) tuples for all installed packages via `pacman -Q`."""
		cmd: list[str] = ['pacman', '-Q']
		if db_path is not None:
			cmd.extend(['--dbpath', str(db_path)])
		output = subprocess.check_output(
			cmd,
			stderr=subprocess.DEVNULL,
		).decode('utf-8')
		result: list[tuple[str, str]] = []
		for line in output.strip().split('\n'):
			parts = line.strip().split(None, 1)
			if len(parts) == 2:
				result.append((parts[0], parts[1]))
		return result

	@staticmethod
	def sync_db(
		mirror_url: str,
		db_path: pathlib.Path,
		repos: Optional[list[str]] = None,
	) -> None:
		"""Refresh repo databases into db_path via `pacman -Sy --dbpath`.

		NOTE(review): mirror_url and repos are currently unused — `pacman -Sy`
		syncs whatever the system pacman.conf configures. TODO: honour them
		(e.g. via a generated config) or drop them from the signature.
		"""
		if repos is None:
			repos = ['core', 'extra', 'multilib']
		cmd: list[str] = [
			'pacman',
			'-Sy',
			'--dbpath',
			str(db_path),
		]
		subprocess.check_call(cmd)

	@staticmethod
	def download_db(
		url: str,
		output_path: pathlib.Path,
	) -> None:
		"""Download a repo .db file from url to output_path, creating parent dirs."""
		import urllib.request

		logger.info(
			dict(
				url=url,
				output_path=str(output_path),
				msg='downloading db',
			)
		)
		output_path.parent.mkdir(parents=True, exist_ok=True)
		urllib.request.urlretrieve(
			url,
			str(output_path),
		)

@ -0,0 +1,161 @@
import dataclasses
import logging
from typing import (
Optional,
Any,
)
from .models import (
package_desc_t,
package_constraint_t,
repo_index_t,
vercmp_t,
)
logger = logging.getLogger(__name__)
class resolver_t:
	"""Pure-python dependency resolver.

	Resolves a list of package constraint strings against one or more
	repo_index_t instances, walking depends/provides/conflicts edges with an
	explicit stack (no recursion).
	"""

	class error_t:
		class not_found_t(Exception):
			"""Raised when a name matches no package and no provider in any index."""

			def __init__(self, name: str) -> None:
				self.name = name
				super().__init__('package not found: %s' % name)

		class conflict_t(Exception):
			"""Raised when a newly resolved package conflicts with an already resolved one."""

			def __init__(self, pkg_a: str, pkg_b: str, constraint: str) -> None:
				self.pkg_a = pkg_a
				self.pkg_b = pkg_b
				self.constraint = constraint
				super().__init__('conflict: %s conflicts with %s (%s)' % (pkg_a, pkg_b, constraint))

		class unsatisfied_t(Exception):
			"""Raised when a dependency name exists but no version satisfies the constraint."""

			def __init__(self, parent: str, dep: str) -> None:
				self.parent = parent
				self.dep = dep
				super().__init__('unsatisfied dependency: %s requires %s' % (parent, dep))

	@dataclasses.dataclass
	class res_t:
		# resolved: package name -> chosen package description
		resolved: dict[str, package_desc_t] = dataclasses.field(default_factory=lambda: dict[str, package_desc_t]())
		# resolution_order: names in the order they were first resolved
		resolution_order: list[str] = dataclasses.field(default_factory=lambda: list[str]())

	@staticmethod
	def _find_provider(
		constraint: package_constraint_t,
		indices: list[repo_index_t],
	) -> Optional[tuple[package_desc_t, str]]:
		"""Return (package, repo_name) satisfying constraint, or None.

		Direct name matches (searched across all indices first, in index
		order) take priority over virtual providers found via provides_index.
		"""
		for index in indices:
			if constraint.name in index.packages:
				pkg = index.packages[constraint.name]
				if constraint.satisfied_by(pkg.version):
					return (pkg, index.name)
		for index in indices:
			if constraint.name in index.provides_index:
				for provider_name in index.provides_index[constraint.name]:
					pkg = index.packages[provider_name]
					for prov in pkg.parsed_provides():
						if prov.name == constraint.name:
							# an unversioned constraint or an unversioned
							# 'provides' entry counts as a match
							if constraint.version is None or prov.version is None:
								return (pkg, index.name)
							if constraint.satisfied_by(prov.version):
								return (pkg, index.name)
		return None

	@staticmethod
	def resolve(
		packages: list[str],
		indices: list[repo_index_t],
		skip_installed: Optional[set[str]] = None,
	) -> 'resolver_t.res_t':
		"""Resolve constraint strings (e.g. 'foo', 'foo>=1.2') against indices.

		Args:
			packages: root constraint strings to resolve.
			indices: repo indices, searched in order.
			skip_installed: names assumed already present; neither they nor
				their dependencies are resolved.

		Raises:
			resolver_t.error_t.not_found_t: unknown name.
			resolver_t.error_t.unsatisfied_t: name known, no satisfying version.
			resolver_t.error_t.conflict_t: resolved set contains a conflict.
		"""
		if skip_installed is None:
			skip_installed = set()
		result = resolver_t.res_t()
		visited: set[str] = set()
		# stack of (constraint, requesting package name or None for roots)
		stack: list[tuple[package_constraint_t, Optional[str]]] = []
		for pkg_str in packages:
			constraint = package_constraint_t.parse(pkg_str)
			stack.append((constraint, None))
		while len(stack) > 0:
			constraint, parent = stack.pop()
			if constraint.name in visited:
				# already handled; still verify this new constraint against
				# the version that was chosen earlier
				if constraint.name in result.resolved:
					pkg = result.resolved[constraint.name]
					if not constraint.satisfied_by(pkg.version):
						raise resolver_t.error_t.unsatisfied_t(
							parent=parent or '<root>',
							dep=constraint.to_str(),
						)
				continue
			if constraint.name in skip_installed:
				visited.add(constraint.name)
				continue
			found = resolver_t._find_provider(constraint, indices)
			if found is None:
				# distinguish 'known name, no satisfying version' from 'unknown name'
				exists = any(constraint.name in idx.packages or constraint.name in idx.provides_index for idx in indices)
				if exists:
					raise resolver_t.error_t.unsatisfied_t(
						parent=parent or '<root>',
						dep=constraint.to_str(),
					)
				raise resolver_t.error_t.not_found_t(constraint.name)
			pkg, repo_name = found
			if pkg.name in visited:
				# the provider was already resolved under its real name
				# (constraint.name may be a virtual name); re-check versioned
				# constraints against the resolved package or its provides
				if pkg.name in result.resolved and constraint.op is not None:
					resolved_pkg = result.resolved[pkg.name]
					if constraint.name == resolved_pkg.name:
						if not constraint.satisfied_by(resolved_pkg.version):
							raise resolver_t.error_t.unsatisfied_t(
								parent=parent or '<root>',
								dep=constraint.to_str(),
							)
					else:
						matched = False
						for prov in resolved_pkg.parsed_provides():
							if prov.name == constraint.name:
								if prov.version is not None and constraint.satisfied_by(prov.version):
									matched = True
									break
								elif prov.version is None:
									matched = True
									break
						if not matched:
							raise resolver_t.error_t.unsatisfied_t(
								parent=parent or '<root>',
								dep=constraint.to_str(),
							)
				continue
			# mark both the real package name and the requested (possibly
			# virtual) name as handled
			visited.add(pkg.name)
			visited.add(constraint.name)
			result.resolved[pkg.name] = pkg
			result.resolution_order.append(pkg.name)
			for conflict in pkg.parsed_conflicts():
				if conflict.name in result.resolved:
					resolved_version = result.resolved[conflict.name].version
					if conflict.satisfied_by(resolved_version):
						raise resolver_t.error_t.conflict_t(
							pkg_a=pkg.name,
							pkg_b=conflict.name,
							constraint=conflict.to_str(),
						)
			for dep in pkg.parsed_depends():
				if dep.name not in visited and dep.name not in skip_installed:
					stack.append((dep, pkg.name))
		return result

@ -0,0 +1,416 @@
import hashlib
import io
import logging
import pathlib
import re
from typing import (
ClassVar,
Optional,
Any,
)
from .models import (
package_desc_t,
repo_index_t,
compile_options_t,
compile_entry_t,
compile_result_t,
mirror_config_t,
resolution_strategy_t,
)
from .db import db_parser_t
from .compile import compile_t as compile_base_t
logger = logging.getLogger(__name__)
class repo_store_t:
	"""A parsed repo index plus the sha256 checksum of the .db file it came from.

	The checksum validates on-disk .solv caches (see solv_pool_t.add_store and
	write_solv_cache).
	"""

	class constants_t:
		checksum_filename: ClassVar[str] = 'checksum.sha256'

	def __init__(
		self,
		index: repo_index_t,
		db_checksum: str = '',
	) -> None:
		self.index = index
		self.db_checksum = db_checksum

	@staticmethod
	def _file_checksum(path: pathlib.Path) -> str:
		"""Return the hex sha256 digest of path, read in 64KiB chunks."""
		h = hashlib.sha256()
		with io.open(path, 'rb') as f:
			while True:
				chunk = f.read(65536)
				if not chunk:
					break
				h.update(chunk)
		return h.hexdigest()

	@staticmethod
	def from_db(
		db_path: pathlib.Path,
		repo_name: Optional[str] = None,
		cache_dir: Optional[pathlib.Path] = None,
	) -> 'repo_store_t':
		"""Parse a pacman .db archive into a repo_store_t.

		Args:
			db_path: path to the repo .db tar archive.
			repo_name: repo name; defaults to the file stem ('core' for core.db).
			cache_dir: when given and a .solv cache matching the db checksum
				exists, only log that fact — loading the cache is handled by
				solv_pool_t.add_store; the index is parsed from the .db either way.
		"""
		if repo_name is None:
			repo_name = db_path.stem.split('.')[0]
		db_checksum = repo_store_t._file_checksum(db_path)
		if cache_dir is not None:
			solv_cache_path = cache_dir / ('%s.solv' % repo_name)
			checksum_path = cache_dir / ('%s.solv.sha256' % repo_name)
			if solv_cache_path.exists() and checksum_path.exists():
				stored_checksum = checksum_path.read_text().strip()
				if stored_checksum == db_checksum:
					logger.info(
						dict(
							repo=repo_name,
							msg='using cached solv',
							path=str(solv_cache_path),
						)
					)
		# single parse/return path (previously duplicated in the cache branch)
		index = db_parser_t.parse_db_path(db_path, repo_name=repo_name)
		return repo_store_t(
			index=index,
			db_checksum=db_checksum,
		)

	def write_solv_cache(
		self,
		cache_dir: pathlib.Path,
		solv_repo: Any,
	) -> None:
		"""Serialize solv_repo to cache_dir/<name>.solv and record the db checksum."""
		import solv

		cache_dir.mkdir(parents=True, exist_ok=True)
		solv_cache_path = cache_dir / ('%s.solv' % self.index.name)
		checksum_path = cache_dir / ('%s.solv.sha256' % self.index.name)
		f = solv.xfopen(str(solv_cache_path), 'w')
		solv_repo.write(f)
		f.close()
		checksum_path.write_text(self.db_checksum)
		logger.info(
			dict(
				repo=self.index.name,
				msg='wrote solv cache',
				path=str(solv_cache_path),
				size=solv_cache_path.stat().st_size,
			)
		)
class solv_pool_t:
	"""libsolv-backed solver over a set of repo_store_t.

	Builds a solv.Pool (arch disttype, x86_64), adds each store either from an
	on-disk .solv cache (validated against the .db checksum) or
	solvable-by-solvable from the parsed index, then answers install queries.
	"""

	class constants_t:
		# a name, then an optional comparison operator + version, e.g. 'glibc>=2.38'
		dep_re: ClassVar[re.Pattern[str]] = re.compile(r'^([a-zA-Z0-9@._+\-]+?)(?:(>=|<=|>|<|=)(.+))?$')

	def __init__(
		self,
		stores: Optional[list[repo_store_t]] = None,
		cache_dir: Optional[pathlib.Path] = None,
	) -> None:
		# solv is imported lazily so this module imports without libsolv installed
		import solv
		self._solv = solv
		self._pool = solv.Pool()
		self._pool.setdisttype(solv.Pool.DISTTYPE_ARCH)
		self._pool.setarch('x86_64')
		# textual operators -> libsolv REL_* flag combinations
		self._rel_map = {
			'>=': solv.REL_GT | solv.REL_EQ,
			'<=': solv.REL_LT | solv.REL_EQ,
			'>': solv.REL_GT,
			'<': solv.REL_LT,
			'=': solv.REL_EQ,
		}
		self._stores: list[repo_store_t] = []
		if stores is not None:
			for store in stores:
				self.add_store(store, cache_dir=cache_dir)
			self.finalize()

	def _parse_dep(self, dep_str: str) -> Any:
		"""Translate a dependency string into a libsolv dep id (plain or rel id)."""
		m = solv_pool_t.constants_t.dep_re.match(dep_str.strip())
		if not m:
			# unparseable: fall back to a literal string id
			return self._pool.str2id(dep_str)
		name = m.group(1)
		op = m.group(2)
		ver = m.group(3)
		name_id = self._pool.str2id(name)
		if op and ver:
			ver_id = self._pool.str2id(ver)
			return self._pool.rel2id(name_id, ver_id, self._rel_map[op])
		return name_id

	def add_store(
		self,
		store: repo_store_t,
		cache_dir: Optional[pathlib.Path] = None,
	) -> None:
		"""Add one repo to the pool, preferring a checksum-valid .solv cache.

		When no valid cache exists, the repo is built from the parsed index
		and — if cache_dir is given — written back as a .solv cache.
		"""
		solv = self._solv
		self._stores.append(store)
		loaded_from_cache = False
		if cache_dir is not None:
			solv_cache_path = cache_dir / ('%s.solv' % store.index.name)
			checksum_path = cache_dir / ('%s.solv.sha256' % store.index.name)
			if solv_cache_path.exists() and checksum_path.exists():
				stored_checksum = checksum_path.read_text().strip()
				if stored_checksum == store.db_checksum:
					repo = self._pool.add_repo(store.index.name)
					f = solv.xfopen(str(solv_cache_path))
					repo.add_solv(f)
					f.close()
					loaded_from_cache = True
					logger.info(
						dict(
							repo=store.index.name,
							msg='loaded solv from cache',
							solvables=repo.nsolvables,
						)
					)
		if not loaded_from_cache:
			repo = self._pool.add_repo(store.index.name)
			for pkg in store.index.packages.values():
				s = repo.add_solvable()
				s.name = pkg.name
				s.evr = pkg.version
				# pacman arch 'any' maps to libsolv 'noarch'
				s.arch = 'noarch' if pkg.arch == 'any' else (pkg.arch or 'x86_64')
				for dep_str in pkg.depends:
					s.add_requires(self._parse_dep(dep_str))
				for prov_str in pkg.provides:
					s.add_provides(self._parse_dep(prov_str))
				# every package also provides its own name at its exact version
				s.add_provides(self._pool.rel2id(s.nameid, s.evrid, solv.REL_EQ))
				for conf_str in pkg.conflicts:
					s.add_conflicts(self._parse_dep(conf_str))
			repo.internalize()
			if cache_dir is not None:
				store.write_solv_cache(cache_dir, repo)

	def finalize(self) -> None:
		"""Build the whatprovides index; must run after all stores are added."""
		self._pool.createwhatprovides()

	class resolve_t:
		class res_t:
			def __init__(self) -> None:
				# resolved: package name -> chosen solv solvable
				self.resolved: dict[str, Any] = {}
				# problems: human-readable failures; empty on success
				self.problems: list[str] = []

	def expand_groups(
		self,
		packages: list[str],
	) -> list[str]:
		"""Replace any package-group name with that group's member packages.

		The first store whose groups_index contains the name wins; non-group
		names pass through unchanged.
		"""
		expanded: list[str] = []
		for pkg_name in packages:
			found_group = False
			for store in self._stores:
				if pkg_name in store.index.groups_index:
					expanded.extend(store.index.groups_index[pkg_name])
					found_group = True
					break
			if not found_group:
				expanded.append(pkg_name)
		return expanded

	@staticmethod
	def parse_reference(txt: str) -> dict[str, str]:
		"""Parse compiled-requirements text into {name: pinned_version}.

		Blank lines and '#' comments are skipped; only the first
		whitespace-separated token per line is inspected, and only
		'name==version' specs are kept.
		"""
		pinned: dict[str, str] = {}
		for line in txt.splitlines():
			line = line.strip()
			if line == '' or line.startswith('#'):
				continue
			parts = line.split()
			pkg_spec = parts[0]
			if '==' in pkg_spec:
				name, version = pkg_spec.split('==', 1)
				pinned[name] = version
		return pinned

	def resolve(
		self,
		packages: list[str],
		expand_groups: bool = True,
		pinned: Optional[dict[str, str]] = None,
		upgrade_packages: Optional[list[str]] = None,
	) -> 'solv_pool_t.resolve_t.res_t':
		"""Resolve packages to a set of solvables to install.

		Args:
			packages: names or constraint specs (e.g. 'foo>=1.2'); may include groups.
			expand_groups: expand group names in packages/upgrade_packages first.
			pinned: name -> version; pinned names are resolved at exactly that
				version unless listed in upgrade_packages.
			upgrade_packages: names exempt from pinning.

		Returns:
			res_t with resolved solvables; on failure, res_t.problems is non-empty
			and res_t.resolved is left empty.
		"""
		solv = self._solv
		if expand_groups:
			packages = self.expand_groups(packages)
		result = solv_pool_t.resolve_t.res_t()
		solver = self._pool.Solver()
		jobs: list[Any] = []
		upgrade_set: set[str] = set()
		if upgrade_packages is not None:
			if expand_groups:
				upgrade_packages = self.expand_groups(upgrade_packages)
			upgrade_set = set(upgrade_packages)
		for pkg_spec in packages:
			# strip any version constraint to get the bare package name
			pkg_name = pkg_spec.split('>=')[0].split('<=')[0].split('>')[0].split('<')[0].split('=')[0]
			if pinned is not None and pkg_name in pinned and pkg_name not in upgrade_set:
				pinned_spec = '%s=%s' % (pkg_name, pinned[pkg_name])
				dep = self._parse_dep(pinned_spec)
				jobs.append(self._pool.Job(solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE_PROVIDES, dep))
			else:
				dep = self._parse_dep(pkg_spec)
				sel = self._pool.select(pkg_name, solv.Selection.SELECTION_NAME | solv.Selection.SELECTION_PROVIDES)
				if sel.isempty():
					result.problems.append('package not found: %s' % pkg_spec)
					continue
				if pkg_name != pkg_spec:
					# versioned spec: install via a PROVIDES job on the rel dep
					jobs.append(self._pool.Job(solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE_PROVIDES, dep))
				else:
					jobs += sel.jobs(solv.Job.SOLVER_INSTALL)
		if len(result.problems) > 0:
			return result
		problems = solver.solve(jobs)
		if problems:
			for p in problems:
				result.problems.append(str(p))
			return result
		trans = solver.transaction()
		for s in trans.newsolvables():
			result.resolved[s.name] = s
		return result
class compile_solv_t:
	"""compile() implementation backed by the libsolv solver (solv_pool_t)."""

	@staticmethod
	def compile(
		options: compile_options_t,
		stores: Optional[list[repo_store_t]] = None,
	) -> compile_result_t.res_t:
		"""Resolve options.packages with libsolv and build a compile result.

		Args:
			options: compile options (repos/mirror, reference pins, hashing, cache).
			stores: pre-built repo stores; when None, indices are fetched per the
				mirror config (honouring no_cache/offline).

		Raises:
			RuntimeError: when the solver reports resolution problems.
		"""
		mirror = compile_base_t.build_mirror_config(options)
		cache_dir: Optional[pathlib.Path] = None
		if options.cache_dir is not None:
			cache_dir = pathlib.Path(options.cache_dir)
			cache_dir.mkdir(parents=True, exist_ok=True)
		if stores is None:
			indices = compile_base_t.fetch_indices(
				mirror=mirror,
				cache_dir=cache_dir,
				no_cache=options.no_cache,
				offline=options.offline,
			)
			stores = [repo_store_t(index=idx) for idx in indices]
		pool = solv_pool_t(stores=stores, cache_dir=cache_dir)
		pinned: Optional[dict[str, str]] = None
		upgrade_packages: Optional[list[str]] = None
		if options.reference is not None:
			ref_txt = pathlib.Path(options.reference).read_text()
			pinned = solv_pool_t.parse_reference(ref_txt)
			if options.resolution_strategy is resolution_strategy_t.pin_referenced:
				# keep referenced packages pinned; explicitly requested
				# packages are allowed to move (upgrade_packages)
				upgrade_packages = options.packages
				packages = list(pinned.keys()) + [p for p in options.packages if p not in pinned]
			else:
				packages = options.packages
		else:
			packages = options.packages
		resolved = pool.resolve(
			packages,
			pinned=pinned if options.resolution_strategy is resolution_strategy_t.pin_referenced else None,
			upgrade_packages=upgrade_packages,
		)
		if len(resolved.problems) > 0:
			raise RuntimeError('resolution failed with %d problem(s):\n%s' % (len(resolved.problems), '\n'.join(resolved.problems)))
		result = compile_result_t.res_t()
		for pkg_name, solvable in resolved.resolved.items():
			repo_name = solvable.repo.name if solvable.repo else ''
			# map the solvable back to the parsed package description to
			# recover filename/sha256; prefer the store matching the
			# solvable's repo, but keep any version match as a fallback
			pkg_desc: Optional[package_desc_t] = None
			for store in stores:
				candidate = store.index.packages.get(pkg_name)
				if candidate is not None and candidate.version == solvable.evr:
					pkg_desc = candidate
					if store.index.name == repo_name:
						break
			filename = pkg_desc.filename if pkg_desc else ''
			sha256 = (pkg_desc.sha256sum if pkg_desc else '') if options.generate_hashes else ''
			url = ''
			if filename:
				repo_url = ''
				for repo_cfg in mirror.repos:
					if repo_cfg.name == repo_name:
						repo_url = repo_cfg.url
						break
				if repo_url:
					url = '%s/%s' % (repo_url, filename)
				else:
					# fall back to the Arch Linux archive URL layout
					url = 'https://archive.archlinux.org/packages/%s/%s/%s' % (
						pkg_name[0],
						pkg_name,
						filename,
					)
			entry = compile_entry_t(
				name=pkg_name,
				version=solvable.evr,
				filename=filename,
				repo=repo_name,
				url=url,
				sha256=sha256,
				depends=pkg_desc.depends if pkg_desc else [],
			)
			result.entries.append(entry)
		result.txt = result.to_txt()
		return result