[+] move models.py to apps/specs/, add compiled_entry_t, unified parse_compiled

1. move models.py into apps/specs/models.py, add compiled_entry_t
     pydantic model for parsed requirements entries;
  2. add apps/specs/utils.py with parse_compiled() and parse_reference()
     as single source for requirements file parsing;
  3. refactor cli/diff.py: remove local parse_compiled, use specs/utils,
     compute_diff takes dict[str, compiled_entry_t];
  4. refactor cli/download.py: remove download_requirements_t class,
     use parse_compiled from specs/utils;
  5. refactor cli/compile.py: compile_t class encapsulating all logic,
     -p/--package with action=append, --mode (requirements/needed/full),
     -o/--output, --generate-hashes default true with BooleanOptionalAction,
     build_filter computed once, transitive_deps minus pinned;
  6. update all imports across apps/, cli/, resolver/, tests/;
This commit is contained in:
LLM 2026-04-22 09:00:00 +00:00
parent d0215504d0
commit 857e9d41a2
14 changed files with 659 additions and 331 deletions

@ -1,4 +1,5 @@
import datetime
import enum
import hashlib
import io
import logging
@ -6,12 +7,16 @@ import pathlib
import sqlite3
from typing import (
TYPE_CHECKING,
ClassVar,
Generator,
Optional,
TypeVar,
)
if TYPE_CHECKING:
from ..pacman.manager import archive_entry_t
import pydantic
from ..pacman.types import (
@ -19,7 +24,7 @@ from ..pacman.types import (
repo_index_t,
)
from ...models import (
from ..specs.models import (
package_t,
package_index_t,
)
@ -99,6 +104,20 @@ class trusted_entry_t(pydantic.BaseModel, frozen=True):
version: str
class archive_version_status_t(enum.StrEnum):
pending = 'pending'
synced = 'synced'
class archive_version_row_t(pydantic.BaseModel):
id: int
name: str
version: str
archive_date: datetime.date
filename: str
status: archive_version_status_t
def _stream_rows(
cur: sqlite3.Cursor,
model: type[_T],
@ -141,7 +160,7 @@ class cache_db_t(orm_module_t):
@classmethod
def schema_version(cls) -> int:
return 1
return 2
@classmethod
def migrate(cls, conn: sqlite3.Connection, from_version: int, to_version: int) -> None:
@ -220,6 +239,25 @@ class cache_db_t(orm_module_t):
""")
conn.commit()
if from_version < 2:
conn.executescript("""
CREATE TABLE IF NOT EXISTS archive_versions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
version TEXT NOT NULL,
archive_date TEXT NOT NULL,
filename TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'pending',
UNIQUE(name, version)
);
CREATE INDEX IF NOT EXISTS idx_archive_versions_name
ON archive_versions(name);
CREATE INDEX IF NOT EXISTS idx_archive_versions_status
ON archive_versions(status);
""")
conn.commit()
# ── constructors ──
def __init__(self, db_path_or_conn: 'pathlib.Path | sqlite3.Connection') -> None:
@ -445,6 +483,14 @@ class cache_db_t(orm_module_t):
)
return _fetch_one(cur, package_hash_row_t)
def has_package_version(self, name: str, version: str) -> bool:
cur = self._conn.cursor()
cur.execute(
'SELECT 1 FROM packages WHERE name=? AND version=? LIMIT 1',
(name, version),
)
return cur.fetchone() is not None
# ── repo_index_t loading ──
def load_repo_index(
@ -769,6 +815,70 @@ class cache_db_t(orm_module_t):
return trusted
# ── archive versions ──
def upsert_archive_version(
self,
name: str,
version: str,
archive_date: datetime.date,
filename: str,
) -> None:
cur = self._conn.cursor()
cur.execute(
'INSERT INTO archive_versions (name, version, archive_date, filename, status) '
'VALUES (?, ?, ?, ?, ?) '
'ON CONFLICT(name, version) DO UPDATE SET archive_date=excluded.archive_date, filename=excluded.filename',
(name, version, archive_date.isoformat(), filename, archive_version_status_t.pending.value),
)
self._conn.commit()
def mark_archive_version_synced(self, name: str, version: str) -> None:
cur = self._conn.cursor()
cur.execute(
'UPDATE archive_versions SET status=? WHERE name=? AND version=?',
(archive_version_status_t.synced.value, name, version),
)
self._conn.commit()
def list_archive_versions(self, name: str) -> Generator[archive_version_row_t, None, None]:
cur = self._conn.cursor()
cur.execute(
'SELECT * FROM archive_versions WHERE name=? ORDER BY archive_date',
(name,),
)
yield from _stream_rows(cur, archive_version_row_t)
def list_pending_archive_versions(self) -> Generator[archive_version_row_t, None, None]:
cur = self._conn.cursor()
cur.execute(
'SELECT * FROM archive_versions WHERE status=? ORDER BY archive_date',
(archive_version_status_t.pending.value,),
)
yield from _stream_rows(cur, archive_version_row_t)
def find_archive_version(self, name: str, version: str) -> Optional[archive_version_row_t]:
cur = self._conn.cursor()
cur.execute(
'SELECT * FROM archive_versions WHERE name=? AND version=?',
(name, version),
)
return _fetch_one(cur, archive_version_row_t)
def bulk_upsert_archive_versions(self, entries: 'list[archive_entry_t]') -> None:
cur = self._conn.cursor()
rows = [
(e.name, e.version, e.date.isoformat(), e.filename, archive_version_status_t.pending.value)
for e in entries
]
cur.executemany(
'INSERT INTO archive_versions (name, version, archive_date, filename, status) '
'VALUES (?, ?, ?, ?, ?) '
'ON CONFLICT(name, version) DO UPDATE SET archive_date=excluded.archive_date, filename=excluded.filename',
rows,
)
self._conn.commit()
# ── status ──
def has_data(self) -> bool:

@ -7,7 +7,7 @@ import logging
from typing import Optional
from ...models import vercmp_t
from ..specs.models import vercmp_t
from .db import cve_db_t
from .types import (
cve_check_result_t,

@ -8,7 +8,7 @@ from typing import (
Optional,
)
from ...models import (
from ..specs.models import (
constraint_op_t,
package_constraint_t,
vercmp_t,

@ -1,7 +1,9 @@
import dataclasses
import enum
import re
import logging
import re
import pydantic
from typing import (
ClassVar,
@ -97,11 +99,21 @@ class package_constraint_t:
self,
name: str,
op: Optional[constraint_op_t] = None,
exclude: bool = False,
ignore: bool = False,
pinned: bool = False,
upgrade: bool = False,
requested: bool = False,
version: Optional[str] = None,
) -> None:
self.name = name
self.op = op
self.version = version
self.exclude = exclude
self.ignore = ignore
self.pinned = pinned
self.upgrade = upgrade
self.requested = requested
def __eq__(self, other: Any) -> bool:
if not isinstance(other, package_constraint_t):
@ -118,7 +130,17 @@ class package_constraint_t:
@staticmethod
def parse(s: str) -> 'package_constraint_t':
m = package_constraint_t.constants_t.constraint_re.match(s.strip())
stripped = s.strip()
exclude = False
ignore = False
if stripped.startswith('!'):
exclude = True
stripped = stripped[1:]
elif stripped.startswith('-') and len(stripped) > 1 and stripped[1].isalpha():
ignore = True
stripped = stripped[1:]
m = package_constraint_t.constants_t.constraint_re.match(stripped)
if not m:
raise ValueError('invalid constraint: %s' % s)
@ -134,6 +156,8 @@ class package_constraint_t:
name=name,
op=op,
version=version,
exclude=exclude,
ignore=ignore,
)
def satisfied_by(self, version: str) -> bool:
@ -272,3 +296,15 @@ class compile_result_t:
lines.append(comment)
lines.append(line)
return '\n'.join(lines)
class compiled_entry_t(pydantic.BaseModel, frozen=True):
"""Parsed entry from a compiled requirements file."""
name: str
version: str
url: str = ''
filename: str = ''
sha256: str = ''
csize: int = 0
pinned: bool = False

@ -0,0 +1,77 @@
"""Common utilities for parsing spec/reference files."""
from .models import compiled_entry_t
def parse_compiled(txt: str) -> list[compiled_entry_t]:
"""Parse a compiled requirements file into a list of entries.
Format per entry (two lines):
# <url>
<name>==<version> [--hash=sha256:<hash>] [--size=<bytes>] [# pinned]
"""
entries: list[compiled_entry_t] = []
pending_url = ''
for line in txt.splitlines():
line = line.strip()
if line == '':
continue
if line.startswith('#'):
url = line[1:].strip()
if ' #' in url:
url = url.split(' #', 1)[0].strip()
pending_url = url
continue
is_pinned = False
if ' # pinned' in line:
is_pinned = True
line = line.split(' # pinned', 1)[0].strip()
if ' #' in line:
line = line.split(' #', 1)[0].strip()
parts = line.split()
if len(parts) == 0:
continue
pkg_spec = parts[0]
if '==' not in pkg_spec:
pending_url = ''
continue
name, version = pkg_spec.split('==', 1)
sha256 = ''
csize = 0
for p in parts[1:]:
if p.startswith('--hash=sha256:'):
sha256 = p[len('--hash=sha256:'):]
elif p.startswith('--size='):
try:
csize = int(p[len('--size='):])
except ValueError:
pass
filename = ''
if '/' in pending_url:
filename = pending_url.rsplit('/', 1)[-1]
entries.append(compiled_entry_t(
name=name,
version=version,
url=pending_url,
filename=filename,
sha256=sha256,
csize=csize,
pinned=is_pinned,
))
pending_url = ''
return entries
def parse_reference(txt: str) -> dict[str, str]:
"""Parse a compiled requirements file into a dict of name -> version."""
return {e.name: e.version for e in parse_compiled(txt)}

@ -1,70 +1,93 @@
"""Compile CLI: parse args, resolve packages from cached data, output results.
Uses cache_db for package data and general resolver interface.
No pacman-specific imports in the main flow.
"""
"""Compile CLI: parse args, resolve packages from cached data, output results."""
import argparse
import enum
import logging
import pathlib
import re
from typing import (
Optional,
)
from typing import Optional
from ..apps.cache.db import cache_db_t
from ..models import (
from ..apps.specs.models import (
compile_entry_t,
compile_result_t,
package_index_t,
package_t,
resolve_result_t,
)
logger = logging.getLogger(__name__)
def main(args: list[str]) -> int:
compile_parser = argparse.ArgumentParser(
class compile_t:
class mode_t(enum.StrEnum):
requirements = 'requirements'
needed = 'needed'
full = 'full'
def __init__(self) -> None:
self._options: Optional[argparse.Namespace] = None
self._ref_pinned: dict[str, str] = {}
@property
def options(self) -> argparse.Namespace:
assert self._options is not None
return self._options
def build_parser(self) -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog='online-fxreader-pr34-archlinux compile',
description='Resolve package versions from archive repos and output compiled requirements.',
)
compile_parser.add_argument(
'packages',
nargs='*',
help='package specs to resolve (e.g. bash glibc>=2.39)',
p.add_argument(
'-p', '--package',
dest='packages',
action='append',
default=None,
help='package spec(s), comma separated, repeatable; '
'use = form for specs starting with dash: '
'-p crun -p "!libgcc" -p="-openimageio,-blender"',
)
compile_parser.add_argument(
p.add_argument(
'-r',
dest='requirements_file',
default=None,
help='path to file with package specs, one per line (like pip -r)',
)
compile_parser.add_argument(
'--generate-hashes',
action='store_true',
default=False,
help='include sha256 hashes in compiled output',
p.add_argument(
'-o', '--output',
dest='output',
default=None,
help='write output to file instead of stdout',
)
compile_parser.add_argument(
p.add_argument(
'--generate-hashes',
action=argparse.BooleanOptionalAction,
default=True,
help='include sha256 hashes in compiled output (default: true)',
)
p.add_argument(
'--backend',
choices=['python', 'solv'],
default='solv',
help='resolver backend: solv (libsolv, fast) or python (pure python, slow)',
help='resolver backend: solv (libsolv, default) or python (pure python, slow)',
)
compile_parser.add_argument(
'--cache-dir',
dest='cache_dir',
default=None,
help='directory for cached .db files and sqlite database (created by "archive sync")',
p.add_argument(
'--mode',
choices=[m.value for m in compile_t.mode_t],
default=compile_t.mode_t.requirements.value,
help='output mode: '
'requirements (default) = only requested packages and their deps; '
'needed = only packages not in --reference; '
'full = everything including pinned',
)
compile_parser.add_argument(
p.add_argument(
'--reference',
default=None,
help='path to previously compiled requirements file; '
'versions from this file are used as pins with --resolution-strategy pin-referenced',
)
compile_parser.add_argument(
p.add_argument(
'--resolution-strategy',
dest='resolution_strategy',
choices=['upgrade-all', 'pin-referenced'],
@ -72,64 +95,184 @@ def main(args: list[str]) -> int:
help='upgrade-all: resolve latest versions; '
'pin-referenced: keep --reference versions, only upgrade explicitly requested packages',
)
return p
compile_options = compile_parser.parse_args(args)
def parse_specs(self) -> list[str]:
raw_specs: list[str] = []
if self.options.packages is not None:
for item in self.options.packages:
for part in re.split(r'[,\s]+', item.strip()):
if part != '':
raw_specs.append(part)
packages: list[str] = list(compile_options.packages)
if compile_options.requirements_file is not None:
for line in pathlib.Path(compile_options.requirements_file).read_text().splitlines():
if self.options.requirements_file is not None:
for line in pathlib.Path(self.options.requirements_file).read_text().splitlines():
line = line.strip()
if line != '' and not line.startswith('#'):
packages.append(line)
raw_specs.append(line)
if compile_options.cache_dir is None:
logger.error('--cache-dir is required')
return 1
return raw_specs
cache_dir = pathlib.Path(compile_options.cache_dir)
def load_indices(self) -> list[package_index_t]:
from ..apps.cache.settings import cache_settings_t
cache_dir = cache_settings_t.singleton().dir
db_path = cache_dir / 'archlinux_cache.db'
if not db_path.exists():
logger.error('cache db not found: %s' % db_path)
return 1
raise FileNotFoundError('cache db not found: %s' % db_path)
cache_db = cache_db_t(db_path)
try:
indices = cache_db.load_indices(dedupe=True)
return cache_db.load_indices(dedupe=True)
finally:
cache_db.close()
pinned: Optional[dict[str, str]] = None
upgrade_packages: Optional[list[str]] = None
def apply_reference(self, rc: object) -> None:
from ..resolver.common import constraints_t
from ..apps.specs.utils import parse_reference
if compile_options.reference is not None:
ref_txt = pathlib.Path(compile_options.reference).read_text()
pinned = _parse_reference(ref_txt)
assert isinstance(rc, constraints_t)
if compile_options.resolution_strategy == 'pin-referenced':
upgrade_packages = packages
packages = list(pinned.keys()) + [
p for p in packages if p not in pinned
]
if self.options.reference is None:
return
try:
if compile_options.backend == 'solv':
from ..resolver.solv import resolve as solv_resolve
ref_txt = pathlib.Path(self.options.reference).read_text()
self._ref_pinned = rc.filter_pinned(parse_reference(ref_txt))
resolved = solv_resolve(
indices=indices,
packages=packages,
pinned=pinned,
upgrade_packages=upgrade_packages,
)
if self.options.resolution_strategy == 'pin-referenced':
install_names = {c.name for c in rc.install}
for name in install_names:
rc.add_upgrade(name)
for name, version in self._ref_pinned.items():
if name not in install_names:
rc.add_pinned(name, version)
else:
for name, version in self._ref_pinned.items():
rc.add_pinned(name, version)
def resolve(self, rc: object, indices: list[package_index_t]) -> resolve_result_t:
from ..resolver.common import constraints_t
assert isinstance(rc, constraints_t)
if self.options.backend == 'solv':
from ..resolver.solv import solv_resolver_t
return solv_resolver_t().resolve(constraints=rc, indices=indices)
else:
from ..resolver.general import resolver_t
resolved = resolver_t.resolve(
packages=packages,
indices=indices,
return resolver_t().resolve(constraints=rc, indices=indices)
def transitive_deps(self, resolved: resolve_result_t, roots: set[str]) -> set[str]:
visited: set[str] = set()
stack = list(roots)
while len(stack) > 0:
name = stack.pop()
if name in visited:
continue
visited.add(name)
pkg = resolved.resolved.get(name)
if pkg is None:
continue
for dep in pkg.depends:
if dep.name not in visited and dep.name in resolved.resolved:
stack.append(dep.name)
return visited
def build_filter(self, rc: object, resolved: resolve_result_t) -> set[str]:
from ..resolver.common import constraints_t
assert isinstance(rc, constraints_t)
mode = compile_t.mode_t(self.options.mode)
if mode is compile_t.mode_t.full:
return set(resolved.resolved.keys())
if mode is compile_t.mode_t.needed:
return {
name for name in resolved.resolved
if not (name in self._ref_pinned and name in rc.pinned and rc.pinned[name] == resolved.resolved[name].version)
}
if mode is compile_t.mode_t.requirements:
reachable = self.transitive_deps(resolved, rc.requested_names)
return reachable - set(rc.pinned.keys())
return set(resolved.resolved.keys())
def build_result(self, rc: object, resolved: resolve_result_t) -> compile_result_t.res_t:
from ..resolver.common import constraints_t
assert isinstance(rc, constraints_t)
allowed = self.build_filter(rc, resolved)
result = compile_result_t.res_t()
for pkg_name in resolved.resolution_order:
if pkg_name not in allowed:
continue
pkg = resolved.resolved[pkg_name]
is_pinned = (
len(rc.pinned) > 0
and pkg.name in rc.pinned
and rc.pinned[pkg.name] == pkg.version
)
url = ''
if pkg.filename:
url = 'https://archive.archlinux.org/packages/%s/%s/%s' % (
pkg.name[0],
pkg.name,
pkg.filename,
)
entry = compile_entry_t(
name=pkg.name,
version=pkg.version,
filename=pkg.filename,
repo=pkg.repo,
url=url,
sha256=pkg.sha256sum if self.options.generate_hashes else '',
csize=pkg.csize,
pinned=is_pinned,
depends=pkg.depends,
)
result.entries.append(entry)
result.txt = result.to_txt()
return result
def write_output(self, result: compile_result_t.res_t) -> None:
if self.options.output is not None:
pathlib.Path(self.options.output).write_text(result.txt + '\n')
logger.info(dict(msg='wrote output', path=self.options.output, entries=len(result.entries)))
else:
print(result.txt)
def run(self, args: list[str]) -> int:
parser = self.build_parser()
self._options = parser.parse_args(args)
from ..resolver.common import constraints_t
raw_specs = self.parse_specs()
rc = constraints_t.from_specs(raw_specs, requested=True)
try:
indices = self.load_indices()
except FileNotFoundError as e:
logger.error(str(e))
return 1
self.apply_reference(rc)
try:
resolved = self.resolve(rc, indices)
except RuntimeError as e:
logger.error(str(e))
return 1
@ -141,54 +284,11 @@ def main(args: list[str]) -> int:
)
return 1
result = compile_result_t.res_t()
for pkg_name in resolved.resolution_order:
pkg = resolved.resolved[pkg_name]
url = ''
if pkg.filename:
url = 'https://archive.archlinux.org/packages/%s/%s/%s' % (
pkg.name[0],
pkg.name,
pkg.filename,
)
is_pinned = (
pinned is not None
and pkg.name in pinned
and pinned[pkg.name] == pkg.version
)
entry = compile_entry_t(
name=pkg.name,
version=pkg.version,
filename=pkg.filename,
repo=pkg.repo,
url=url,
sha256=pkg.sha256sum if compile_options.generate_hashes else '',
csize=pkg.csize,
pinned=is_pinned,
depends=pkg.depends,
)
result.entries.append(entry)
result.txt = result.to_txt()
print(result.txt)
result = self.build_result(rc, resolved)
self.write_output(result)
return 0
def _parse_reference(txt: str) -> dict[str, str]:
pinned: dict[str, str] = {}
for line in txt.splitlines():
line = line.strip()
if line == '' or line.startswith('#'):
continue
parts = line.split()
pkg_spec = parts[0]
if '==' in pkg_spec:
name, version = pkg_spec.split('==', 1)
pinned[name] = version
return pinned
def main(args: list[str]) -> int:
return compile_t().run(args)

@ -5,6 +5,9 @@ import dataclasses
import logging
import pathlib
from ..apps.specs.models import compiled_entry_t
from ..apps.specs.utils import parse_compiled
logger = logging.getLogger(__name__)
@ -16,53 +19,13 @@ class diff_entry_t:
new: str = ''
def parse_compiled(txt: str) -> dict[str, dict[str, str]]:
"""Parse compiled requirements text into {name: {version, url, sha256}}."""
result: dict[str, dict[str, str]] = {}
pending_url = ''
for line in txt.splitlines():
line = line.strip()
if line == '':
continue
if line.startswith('#'):
url = line[1:].strip()
# strip trailing annotation like "URL # pinned"
if ' #' in url:
url = url.split(' #', 1)[0].strip()
pending_url = url
continue
# strip trailing inline comment like "pkg==1.0 --hash=... # pinned"
if ' #' in line:
line = line.split(' #', 1)[0].strip()
parts = line.split()
pkg_spec = parts[0]
sha256 = ''
for p in parts[1:]:
if p.startswith('--hash=sha256:'):
sha256 = p[len('--hash=sha256:'):]
if '==' in pkg_spec:
name, version = pkg_spec.split('==', 1)
else:
name = pkg_spec
version = ''
result[name] = {
'version': version,
'url': pending_url,
'sha256': sha256,
}
pending_url = ''
return result
def to_dict(entries: list[compiled_entry_t]) -> dict[str, compiled_entry_t]:
return {e.name: e for e in entries}
def compute_diff(
old: dict[str, dict[str, str]],
new: dict[str, dict[str, str]],
old: dict[str, compiled_entry_t],
new: dict[str, compiled_entry_t],
diff_url: bool = False,
diff_checksum: bool = False,
) -> list[diff_entry_t]:
@ -72,24 +35,24 @@ def compute_diff(
for name in all_names:
if name not in old:
entries.append(diff_entry_t(name=name, kind='added', new=new[name]['version']))
entries.append(diff_entry_t(name=name, kind='added', new=new[name].version))
continue
if name not in new:
entries.append(diff_entry_t(name=name, kind='removed', old=old[name]['version']))
entries.append(diff_entry_t(name=name, kind='removed', old=old[name].version))
continue
o = old[name]
n = new[name]
if o['version'] != n['version']:
entries.append(diff_entry_t(name=name, kind='version', old=o['version'], new=n['version']))
if o.version != n.version:
entries.append(diff_entry_t(name=name, kind='version', old=o.version, new=n.version))
if diff_url and o['url'] != n['url']:
entries.append(diff_entry_t(name=name, kind='url', old=o['url'], new=n['url']))
if diff_url and o.url != n.url:
entries.append(diff_entry_t(name=name, kind='url', old=o.url, new=n.url))
if diff_checksum and o['sha256'] != n['sha256']:
entries.append(diff_entry_t(name=name, kind='sha256', old=o['sha256'], new=n['sha256']))
if diff_checksum and o.sha256 != n.sha256:
entries.append(diff_entry_t(name=name, kind='sha256', old=o.sha256, new=n.sha256))
return entries
@ -138,11 +101,8 @@ def main(args: list[str]) -> int:
options = parser.parse_args(args)
old_txt = pathlib.Path(options.old).read_text()
new_txt = pathlib.Path(options.new).read_text()
old_parsed = parse_compiled(old_txt)
new_parsed = parse_compiled(new_txt)
old_parsed = to_dict(parse_compiled(pathlib.Path(options.old).read_text()))
new_parsed = to_dict(parse_compiled(pathlib.Path(options.new).read_text()))
entries = compute_diff(
old_parsed,

@ -313,50 +313,6 @@ class progress_t:
return line_formatter_t.format(self._build_fields(), 200)
class download_requirements_t:
@staticmethod
def parse_requirements(txt: str) -> list[download_entry_t]:
"""Parse compiled requirements into download entries."""
entries: list[download_entry_t] = []
url: Optional[str] = None
for line in txt.splitlines():
line = line.strip()
if line == '':
continue
if line.startswith('#'):
candidate = line[1:].strip()
if ' #' in candidate:
candidate = candidate.split(' #', 1)[0].strip()
if '/' in candidate and '://' in candidate:
url = candidate
continue
if ' #' in line:
line = line.split(' #', 1)[0].strip()
parts = line.split()
if len(parts) == 0:
continue
sha256 = ''
csize = 0
for p in parts[1:]:
if p.startswith('--hash=sha256:'):
sha256 = p[len('--hash=sha256:'):]
elif p.startswith('--size='):
try:
csize = int(p[len('--size='):])
except ValueError:
pass
if url is not None:
filename = url.rsplit('/', 1)[-1] if '/' in url else parts[0]
entries.append(download_entry_t(url=url, filename=filename, sha256=sha256, csize=csize))
url = None
return entries
class download_t:
def __init__(
@ -624,8 +580,13 @@ def main(args: list[str]) -> int:
progress_mode=progress_t.constants_t.mode_t(opts.progress_mode),
)
entries = download_requirements_t.parse_requirements(
pathlib.Path(opts.requirements).read_text()
)
from ..apps.specs.utils import parse_compiled
compiled = parse_compiled(pathlib.Path(opts.requirements).read_text())
entries = [
download_entry_t(url=e.url, filename=e.filename, sha256=e.sha256, csize=e.csize)
for e in compiled
if e.url != ''
]
return asyncio.run(dl.run(entries, dry_run=opts.dry_run))

@ -10,8 +10,8 @@ from typing import Any, Optional
from ..cli.download import (
parse_rate_t,
downloader_t,
download_requirements_t,
)
from ..apps.specs.utils import parse_compiled, parse_reference
from ..resolver.solv import (
solv_pool_t,
)
@ -83,7 +83,7 @@ class TestParseRate(unittest.TestCase):
class TestDownloadRequirementsParse(unittest.TestCase):
def test_simple(self) -> None:
txt = '# https://example.com/core/bash-5.2-1-x86_64.pkg.tar.zst\nbash==5.2-1 --hash=sha256:abc123\n'
entries = download_requirements_t.parse_requirements(txt)
entries = parse_compiled(txt)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].url, 'https://example.com/core/bash-5.2-1-x86_64.pkg.tar.zst')
self.assertEqual(entries[0].filename, 'bash-5.2-1-x86_64.pkg.tar.zst')
@ -92,28 +92,30 @@ class TestDownloadRequirementsParse(unittest.TestCase):
txt = (
'# https://example.com/core/bash-5.2-1-x86_64.pkg.tar.zst\nbash==5.2-1\n# https://example.com/core/glibc-2.38-1-x86_64.pkg.tar.zst\nglibc==2.38-1\n'
)
entries = download_requirements_t.parse_requirements(txt)
entries = parse_compiled(txt)
self.assertEqual(len(entries), 2)
self.assertEqual(entries[0].filename, 'bash-5.2-1-x86_64.pkg.tar.zst')
self.assertEqual(entries[1].filename, 'glibc-2.38-1-x86_64.pkg.tar.zst')
def test_no_url_skipped(self) -> None:
def test_no_url_has_empty_filename(self) -> None:
txt = 'bash==5.2-1\n'
entries = download_requirements_t.parse_requirements(txt)
self.assertEqual(len(entries), 0)
entries = parse_compiled(txt)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].url, '')
self.assertEqual(entries[0].filename, '')
def test_comment_without_url_ignored(self) -> None:
txt = '# just a comment\n# https://example.com/core/bash-5.2-1-x86_64.pkg.tar.zst\nbash==5.2-1\n'
entries = download_requirements_t.parse_requirements(txt)
entries = parse_compiled(txt)
self.assertEqual(len(entries), 1)
def test_empty_input(self) -> None:
entries = download_requirements_t.parse_requirements('')
entries = parse_compiled('')
self.assertEqual(len(entries), 0)
def test_blank_lines_ignored(self) -> None:
txt = '\n\n# https://example.com/bash.pkg\n\nbash==5.2-1\n\n'
entries = download_requirements_t.parse_requirements(txt)
entries = parse_compiled(txt)
self.assertEqual(len(entries), 1)
@ -325,20 +327,20 @@ class TestParseReference(unittest.TestCase):
'# https://example.com/core/glibc-2.38-1-x86_64.pkg.tar.zst\n'
'glibc==2.38-1\n'
)
pinned = solv_pool_t.parse_reference(txt)
pinned = parse_reference(txt)
self.assertEqual(pinned, {'bash': '5.2-1', 'glibc': '2.38-1'})
def test_empty(self) -> None:
pinned = solv_pool_t.parse_reference('')
pinned = parse_reference('')
self.assertEqual(pinned, {})
def test_comments_only(self) -> None:
pinned = solv_pool_t.parse_reference('# just a comment\n# another\n')
pinned = parse_reference('# just a comment\n# another\n')
self.assertEqual(pinned, {})
def test_no_version_skipped(self) -> None:
txt = 'bash\nglibc==2.38-1\n'
pinned = solv_pool_t.parse_reference(txt)
pinned = parse_reference(txt)
self.assertEqual(pinned, {'glibc': '2.38-1'})

@ -14,7 +14,7 @@ from ..apps.pacman.types import (
repo_config_t,
repo_index_t,
)
from ..models import (
from ..apps.specs.models import (
compile_entry_t,
compile_result_t,
package_index_t,
@ -308,7 +308,7 @@ class TestCompile(unittest.TestCase):
raw = db_parser_t.parse_db_path(db_path, repo_name=name)
indices.append(_pacman_to_general(raw))
resolved = resolver_t.resolve(packages, indices)
resolved = resolver_t.resolve_specs(packages, indices)
return _compile_resolved(resolved, mirror_url, generate_hashes)
def test_compile_single_package(self) -> None:
@ -451,7 +451,7 @@ class TestCompile(unittest.TestCase):
db_path.write_bytes(db_bytes)
raw = db_parser_t.parse_db_path(db_path, repo_name='core')
idx = _pacman_to_general(raw)
result = resolver_t.resolve(['nonexistent'], [idx])
result = resolver_t.resolve_specs(['nonexistent'], [idx])
self.assertGreater(len(result.problems), 0)
self.assertTrue(any('nonexistent' in p for p in result.problems))
@ -465,3 +465,78 @@ class TestCompile(unittest.TestCase):
result = self._fetch_and_resolve({'core': db_bytes}, [])
self.assertEqual(len(result.entries), 0)
class TestIgnorePrefix(unittest.TestCase):
"""Test -package ignore prefix in -r file and CLI args."""
def test_parse_ignore_constraint(self) -> None:
from ..apps.specs.models import package_constraint_t
c = package_constraint_t.parse('-aur-package')
self.assertTrue(c.ignore)
self.assertEqual(c.name, 'aur-package')
self.assertFalse(c.exclude)
def test_ignore_does_not_collide_with_version_constraint(self) -> None:
"""Negative version like foo-1.0 should NOT be parsed as ignore."""
from ..apps.specs.models import package_constraint_t
# foo-bar is a valid package name with hyphen, not an ignore
c = package_constraint_t.parse('foo-bar')
self.assertFalse(c.ignore)
self.assertEqual(c.name, 'foo-bar')
def test_ignore_removes_from_packages(self) -> None:
"""Ignored packages should be stripped from the package list before resolving."""
from ..apps.specs.models import package_constraint_t
specs = ['bash==5.2-1', 'aur-pkg==1.0', '-aur-pkg', 'vim==9.0-1']
ignored: set[str] = set()
install: list[str] = []
for s in specs:
c = package_constraint_t.parse(s)
if c.ignore:
ignored.add(c.name)
else:
install.append(s)
filtered = [s for s in install if package_constraint_t.parse(s).name not in ignored]
names = [package_constraint_t.parse(s).name for s in filtered]
self.assertIn('bash', names)
self.assertIn('vim', names)
self.assertNotIn('aur-pkg', names)
def test_ignore_removes_from_pinned(self) -> None:
"""Ignored packages should be stripped from --reference pinned dict."""
from ..apps.specs.models import package_constraint_t
pinned = {'bash': '5.2-1', 'aur-pkg': '1.0', 'vim': '9.0-1'}
specs = ['-aur-pkg']
ignored: set[str] = set()
for s in specs:
c = package_constraint_t.parse(s)
if c.ignore:
ignored.add(c.name)
filtered_pinned = {k: v for k, v in pinned.items() if k not in ignored}
self.assertIn('bash', filtered_pinned)
self.assertIn('vim', filtered_pinned)
self.assertNotIn('aur-pkg', filtered_pinned)
def test_ignore_in_resolver(self) -> None:
"""General resolver should skip ignored packages."""
from ..resolver.general import resolver_t
idx = _pacman_to_general(repo_index_t(name='core'))
# empty index — if aur-pkg is ignored, resolve should succeed with empty result
result = resolver_t.resolve_specs(['bash==5.2-1', '-aur-pkg'], [idx])
# bash not in index so it'll be a problem, but aur-pkg should NOT appear in problems
self.assertTrue(
all('aur-pkg' not in p for p in result.problems),
'ignored package should not appear in problems',
)

@ -4,35 +4,44 @@ from ..cli.diff import (
compute_diff,
diff_entry_t,
format_diff,
parse_compiled,
to_dict,
)
from ..apps.specs.models import compiled_entry_t
from ..apps.specs.utils import parse_compiled
def _e(name: str, version: str = '', url: str = '', sha256: str = '') -> compiled_entry_t:
return compiled_entry_t(name=name, version=version, url=url, sha256=sha256)
class TestParseCompiled(unittest.TestCase):
def _by_name(self, txt: str) -> dict[str, compiled_entry_t]:
return {e.name: e for e in parse_compiled(txt)}
def test_simple(self) -> None:
txt = (
'# https://example.com/core/bash-5.2-1-x86_64.pkg.tar.zst\n'
'bash==5.2-1\n'
)
parsed = parse_compiled(txt)
self.assertEqual(parsed['bash']['version'], '5.2-1')
self.assertEqual(parsed['bash']['url'], 'https://example.com/core/bash-5.2-1-x86_64.pkg.tar.zst')
parsed = self._by_name(txt)
self.assertEqual(parsed['bash'].version, '5.2-1')
self.assertEqual(parsed['bash'].url, 'https://example.com/core/bash-5.2-1-x86_64.pkg.tar.zst')
def test_with_hash(self) -> None:
txt = (
'# https://example.com/bash.pkg\n'
'bash==5.2-1 --hash=sha256:abc123\n'
)
parsed = parse_compiled(txt)
self.assertEqual(parsed['bash']['sha256'], 'abc123')
parsed = self._by_name(txt)
self.assertEqual(parsed['bash'].sha256, 'abc123')
def test_pinned_annotation_stripped(self) -> None:
txt = (
'# https://example.com/bash.pkg # pinned\n'
'bash==5.2-1\n'
)
parsed = parse_compiled(txt)
self.assertEqual(parsed['bash']['url'], 'https://example.com/bash.pkg')
parsed = self._by_name(txt)
self.assertEqual(parsed['bash'].url, 'https://example.com/bash.pkg')
def test_multiple(self) -> None:
txt = (
@ -41,30 +50,31 @@ class TestParseCompiled(unittest.TestCase):
'# https://example.com/glibc.pkg\n'
'glibc==2.38-1\n'
)
parsed = parse_compiled(txt)
self.assertEqual(len(parsed), 2)
self.assertIn('bash', parsed)
self.assertIn('glibc', parsed)
entries = parse_compiled(txt)
self.assertEqual(len(entries), 2)
names = {e.name for e in entries}
self.assertIn('bash', names)
self.assertIn('glibc', names)
def test_empty(self) -> None:
self.assertEqual(parse_compiled(''), {})
self.assertEqual(parse_compiled(''), [])
def test_no_url(self) -> None:
txt = 'bash==5.2-1\n'
parsed = parse_compiled(txt)
self.assertEqual(parsed['bash']['version'], '5.2-1')
self.assertEqual(parsed['bash']['url'], '')
parsed = self._by_name(txt)
self.assertEqual(parsed['bash'].version, '5.2-1')
self.assertEqual(parsed['bash'].url, '')
class TestComputeDiff(unittest.TestCase):
def test_no_changes(self) -> None:
old = {'bash': {'version': '5.2-1', 'url': 'u', 'sha256': ''}}
new = {'bash': {'version': '5.2-1', 'url': 'u', 'sha256': ''}}
old = {'bash': _e('bash', '5.2-1', url='u')}
new = {'bash': _e('bash', '5.2-1', url='u')}
self.assertEqual(compute_diff(old, new), [])
def test_added(self) -> None:
old: dict[str, dict[str, str]] = {}
new = {'bash': {'version': '5.2-1', 'url': '', 'sha256': ''}}
old: dict[str, compiled_entry_t] = {}
new = {'bash': _e('bash', '5.2-1')}
entries = compute_diff(old, new)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].kind, 'added')
@ -72,16 +82,16 @@ class TestComputeDiff(unittest.TestCase):
self.assertEqual(entries[0].new, '5.2-1')
def test_removed(self) -> None:
old = {'bash': {'version': '5.2-1', 'url': '', 'sha256': ''}}
new: dict[str, dict[str, str]] = {}
old = {'bash': _e('bash', '5.2-1')}
new: dict[str, compiled_entry_t] = {}
entries = compute_diff(old, new)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].kind, 'removed')
self.assertEqual(entries[0].old, '5.2-1')
def test_version_change(self) -> None:
old = {'bash': {'version': '5.2-1', 'url': '', 'sha256': ''}}
new = {'bash': {'version': '5.3-1', 'url': '', 'sha256': ''}}
old = {'bash': _e('bash', '5.2-1')}
new = {'bash': _e('bash', '5.3-1')}
entries = compute_diff(old, new)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].kind, 'version')
@ -89,18 +99,16 @@ class TestComputeDiff(unittest.TestCase):
self.assertEqual(entries[0].new, '5.3-1')
def test_url_change_requires_flag(self) -> None:
old = {'bash': {'version': '5.2-1', 'url': 'http://a/bash.pkg', 'sha256': ''}}
new = {'bash': {'version': '5.2-1', 'url': 'http://b/bash.pkg', 'sha256': ''}}
# off by default
old = {'bash': _e('bash', '5.2-1', url='http://a/bash.pkg')}
new = {'bash': _e('bash', '5.2-1', url='http://b/bash.pkg')}
self.assertEqual(compute_diff(old, new), [])
# enabled
entries = compute_diff(old, new, diff_url=True)
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].kind, 'url')
def test_sha256_change_requires_flag(self) -> None:
old = {'bash': {'version': '5.2-1', 'url': '', 'sha256': 'aaa'}}
new = {'bash': {'version': '5.2-1', 'url': '', 'sha256': 'bbb'}}
old = {'bash': _e('bash', '5.2-1', sha256='aaa')}
new = {'bash': _e('bash', '5.2-1', sha256='bbb')}
self.assertEqual(compute_diff(old, new), [])
entries = compute_diff(old, new, diff_checksum=True)
self.assertEqual(len(entries), 1)
@ -108,14 +116,14 @@ class TestComputeDiff(unittest.TestCase):
def test_multiple_changes(self) -> None:
old = {
'bash': {'version': '5.2-1', 'url': 'u1', 'sha256': 'aaa'},
'glibc': {'version': '2.38-1', 'url': '', 'sha256': ''},
'removed': {'version': '1.0', 'url': '', 'sha256': ''},
'bash': _e('bash', '5.2-1', url='u1', sha256='aaa'),
'glibc': _e('glibc', '2.38-1'),
'removed': _e('removed', '1.0'),
}
new = {
'bash': {'version': '5.3-1', 'url': 'u2', 'sha256': 'bbb'},
'glibc': {'version': '2.38-1', 'url': '', 'sha256': ''},
'added': {'version': '2.0', 'url': '', 'sha256': ''},
'bash': _e('bash', '5.3-1', url='u2', sha256='bbb'),
'glibc': _e('glibc', '2.38-1'),
'added': _e('added', '2.0'),
}
entries = compute_diff(old, new, diff_url=True, diff_checksum=True)
@ -127,9 +135,8 @@ class TestComputeDiff(unittest.TestCase):
self.assertIn(('added', 'added'), kinds)
def test_empty_side_shows_when_flag_enabled(self) -> None:
"""When old has empty url but new has one, flag makes it visible."""
old = {'bash': {'version': '5.2-1', 'url': '', 'sha256': ''}}
new = {'bash': {'version': '5.2-1', 'url': 'http://b/bash', 'sha256': 'x'}}
old = {'bash': _e('bash', '5.2-1')}
new = {'bash': _e('bash', '5.2-1', url='http://b/bash', sha256='x')}
self.assertEqual(compute_diff(old, new), [])
entries = compute_diff(old, new, diff_url=True, diff_checksum=True)
kinds = {e.kind for e in entries}
@ -176,8 +183,8 @@ class TestDiffEndToEnd(unittest.TestCase):
'python==3.12-1\n'
)
old_parsed = parse_compiled(old_txt)
new_parsed = parse_compiled(new_txt)
old_parsed = to_dict(parse_compiled(old_txt))
new_parsed = to_dict(parse_compiled(new_txt))
entries = compute_diff(old_parsed, new_parsed, diff_url=True, diff_checksum=True)
kinds = {(e.name, e.kind) for e in entries}

@ -4,7 +4,7 @@ import unittest
from ..apps.pacman.db import db_parser_t
from ..apps.pacman.types import pacman_constraint_t, repo_index_t
from ..resolver.general import resolver_t
from ..models import (
from ..apps.specs.models import (
package_constraint_t,
package_t,
package_index_t,
@ -156,7 +156,7 @@ class TestResolveAgainstSnapshots(TestIntegrationBase):
found = [name for name, version in installed if name in available]
missing = [name for name, version in installed if name not in available]
result = resolver_t.resolve(found, indices)
result = resolver_t.resolve_specs(found, indices)
return result, found, missing
@ -245,50 +245,50 @@ class TestResolveSinglePackages(TestIntegrationBase):
def test_resolve_glibc_all_snapshots(self) -> None:
for date in self.constants_t.dates:
indices = self._load_indices(date)
result = resolver_t.resolve(['glibc'], indices)
result = resolver_t.resolve_specs(['glibc'], indices)
self.assertIn('glibc', result.resolved)
def test_resolve_bash_all_snapshots(self) -> None:
for date in self.constants_t.dates:
indices = self._load_indices(date)
result = resolver_t.resolve(['bash'], indices)
result = resolver_t.resolve_specs(['bash'], indices)
self.assertIn('bash', result.resolved)
self.assertIn('glibc', result.resolved)
def test_resolve_python_all_snapshots(self) -> None:
for date in self.constants_t.dates:
indices = self._load_indices(date)
result = resolver_t.resolve(['python'], indices)
result = resolver_t.resolve_specs(['python'], indices)
self.assertIn('python', result.resolved)
self.assertGreater(len(result.resolved), 3)
def test_resolve_gcc_all_snapshots(self) -> None:
for date in self.constants_t.dates:
indices = self._load_indices(date)
result = resolver_t.resolve(['gcc'], indices)
result = resolver_t.resolve_specs(['gcc'], indices)
self.assertIn('gcc', result.resolved)
def test_resolve_openssl_all_snapshots(self) -> None:
for date in self.constants_t.dates:
indices = self._load_indices(date)
result = resolver_t.resolve(['openssl'], indices)
result = resolver_t.resolve_specs(['openssl'], indices)
self.assertIn('openssl', result.resolved)
def test_resolve_nonexistent_raises(self) -> None:
for date in self.constants_t.dates:
indices = self._load_indices(date)
with self.assertRaises(resolver_t.error_t.not_found_t):
resolver_t.resolve(['this-package-does-not-exist-xyz'], indices)
resolver_t.resolve_specs(['this-package-does-not-exist-xyz'], indices)
class TestCrossSnapshotComparison(TestIntegrationBase):
def test_glibc_version_non_decreasing(self) -> None:
from ..models import vercmp_t
from ..apps.specs.models import vercmp_t
versions: list[str] = []
for date in self.constants_t.dates:
indices = self._load_indices(date)
result = resolver_t.resolve(['glibc'], indices)
result = resolver_t.resolve_specs(['glibc'], indices)
versions.append(result.resolved['glibc'].version)
for i in range(1, len(versions)):
@ -299,12 +299,12 @@ class TestCrossSnapshotComparison(TestIntegrationBase):
)
def test_python_version_non_decreasing(self) -> None:
from ..models import vercmp_t
from ..apps.specs.models import vercmp_t
versions: list[str] = []
for date in self.constants_t.dates:
indices = self._load_indices(date)
result = resolver_t.resolve(['python'], indices)
result = resolver_t.resolve_specs(['python'], indices)
versions.append(result.resolved['python'].version)
for i in range(1, len(versions)):

@ -1,7 +1,7 @@
import unittest
import dataclasses
from ..models import (
from ..apps.specs.models import (
vercmp_t,
constraint_op_t,
package_constraint_t,