[+] unified resolver: constraints_t, resolver_base_t, requested flag

1. add resolver/common.py with constraints_t class holding all constraints
     with filtered property views (install, excluded, ignored, pinned,
     upgrade, requested) and resolver_base_t abstract base class;
  2. refactor resolver/general.py: extend resolver_base_t, resolve() takes
     constraints_t, add resolve_specs() classmethod convenience for tests;
  3. refactor resolver/solv.py: solv_resolver_t extends resolver_base_t,
     fix pkg_spec NameError in error messages, remove duplicate parse_reference;
  4. add requested flag to package_constraint_t for tracking user-specified
     packages vs reference pins;
  5. update test_resolver.py and test_solv_backend.py for new interface;
This commit is contained in:
LLM 2026-04-22 09:00:00 +00:00
parent f0c9d072e0
commit d0215504d0
6 changed files with 367 additions and 123 deletions

@ -0,0 +1,91 @@
"""Common resolver interface and constraint preprocessing."""
import abc
from ..apps.specs.models import (
constraint_op_t,
package_constraint_t,
package_index_t,
resolve_result_t,
)
class constraints_t:
"""Holds all constraints from all sources. Provides filtered views."""
def __init__(self) -> None:
self._all: list[package_constraint_t] = []
def add(self, c: package_constraint_t) -> None:
self._all.append(c)
def add_spec(self, spec: str, requested: bool = False) -> None:
c = package_constraint_t.parse(spec)
if requested:
c.requested = True
self._all.append(c)
def add_pinned(self, name: str, version: str) -> None:
self._all.append(package_constraint_t(
name=name, op=constraint_op_t.eq, version=version, pinned=True,
))
def add_upgrade(self, name: str) -> None:
self._all.append(package_constraint_t(name=name, upgrade=True))
@property
def ignored_names(self) -> set[str]:
return {c.name for c in self._all if c.ignore}
@property
def excluded_names(self) -> set[str]:
return {c.name for c in self._all if c.exclude}
@property
def skip_names(self) -> set[str]:
return self.ignored_names | self.excluded_names
@property
def install(self) -> list[package_constraint_t]:
skip = self.skip_names
return [c for c in self._all if not c.ignore and not c.exclude and c.name not in skip]
@property
def pinned(self) -> dict[str, str]:
skip = self.skip_names
result: dict[str, str] = {}
for c in self._all:
if c.pinned and c.op is constraint_op_t.eq and c.version is not None and c.name not in skip:
result[c.name] = c.version
return result
@property
def requested_names(self) -> set[str]:
return {c.name for c in self._all if c.requested}
@property
def upgrade_names(self) -> set[str]:
return {c.name for c in self._all if c.upgrade}
def filter_pinned(self, pinned: dict[str, str]) -> dict[str, str]:
skip = self.skip_names
return {k: v for k, v in pinned.items() if k not in skip}
@classmethod
def from_specs(cls, specs: list[str], requested: bool = False) -> 'constraints_t':
result = cls()
for spec in specs:
result.add_spec(spec, requested=requested)
return result
class resolver_base_t(abc.ABC):
"""Common interface for all resolver backends."""
@abc.abstractmethod
def resolve(
self,
constraints: constraints_t,
indices: list[package_index_t],
) -> resolve_result_t:
raise NotImplementedError

@ -1,21 +1,20 @@
import dataclasses
import logging
from typing import (
Optional,
)
from typing import Optional
from ..models import (
from ..apps.specs.models import (
package_t,
package_constraint_t,
package_index_t,
resolve_result_t,
)
from .common import constraints_t, resolver_base_t
logger = logging.getLogger(__name__)
class resolver_t:
class resolver_t(resolver_base_t):
class error_t:
class not_found_t(Exception):
def __init__(self, name: str) -> None:
@ -63,26 +62,36 @@ class resolver_t:
return None
@staticmethod
def resolve(
packages: list[str],
@classmethod
def resolve_specs(
cls,
specs: list[str],
indices: list[package_index_t],
skip_installed: Optional[set[str]] = None,
) -> resolve_result_t:
if skip_installed is None:
skip_installed = set()
"""Convenience: parse raw specs and resolve."""
return cls().resolve(constraints_t.from_specs(specs), indices)
def resolve(
self,
constraints: constraints_t,
indices: list[package_index_t],
) -> resolve_result_t:
excluded = constraints.excluded_names
ignored = constraints.ignored_names
result = resolve_result_t()
visited: set[str] = set()
stack: list[tuple[package_constraint_t, Optional[str]]] = []
for pkg_str in packages:
constraint = package_constraint_t.parse(pkg_str)
for constraint in constraints.install:
stack.append((constraint, None))
while len(stack) > 0:
constraint, parent = stack.pop()
if constraint.name in excluded:
continue
if constraint.name in visited:
if constraint.name in result.resolved:
pkg = result.resolved[constraint.name]
@ -93,7 +102,7 @@ class resolver_t:
)
continue
if constraint.name in skip_installed:
if constraint.name in ignored:
visited.add(constraint.name)
continue
@ -152,7 +161,7 @@ class resolver_t:
)
for dep in pkg.depends:
if dep.name not in visited and dep.name not in skip_installed:
if dep.name not in visited and dep.name not in ignored:
stack.append((dep, pkg.name))
return result

@ -15,13 +15,17 @@ from typing import (
Any,
)
from ..models import (
from ..apps.specs.models import (
constraint_op_t,
package_t,
package_index_t,
package_constraint_t,
resolve_result_t,
vercmp_t,
)
from .common import constraints_t, resolver_base_t
from .solv_types import (
solv_package_t,
solv_index_t,
@ -195,20 +199,6 @@ class solv_pool_t:
expanded.append(pkg_name)
return expanded
@staticmethod
def parse_reference(txt: str) -> dict[str, str]:
pinned: dict[str, str] = {}
for line in txt.splitlines():
line = line.strip()
if line == '' or line.startswith('#'):
continue
parts = line.split()
pkg_spec = parts[0]
if '==' in pkg_spec:
name, version = pkg_spec.split('==', 1)
pinned[name] = version
return pinned
def resolve(
self,
packages: list[str],
@ -232,15 +222,11 @@ class solv_pool_t:
upgrade_packages = self.expand_groups(upgrade_packages)
upgrade_set = set(upgrade_packages)
for pkg_spec in packages:
pkg_name = (
pkg_spec.split('>=')[0]
.split('<=')[0]
.split('>')[0]
.split('<')[0]
.split('==')[0]
.split('=')[0]
)
rc = constraints_t.from_specs(packages)
excluded = rc.excluded_names
for constraint in rc.install:
pkg_name = constraint.name
target_evr: Optional[str] = None
if (
@ -249,36 +235,62 @@ class solv_pool_t:
and pkg_name not in upgrade_set
):
target_evr = pinned[pkg_name]
elif pkg_name != pkg_spec and '==' in pkg_spec:
target_evr = pkg_spec.split('==', 1)[1]
elif constraint.op is not None and constraint.op is constraint_op_t.eq:
target_evr = constraint.version
sel = self._pool.select(
pkg_name,
solv.Selection.SELECTION_NAME,
)
if sel.isempty():
result.problems.append('package not found: %s' % pkg_spec)
result.problems.append('package not found: %s' % pkg_name)
continue
if target_evr is not None:
# match by exact name + version, never via provides hijack
# exact version match
matching = [s for s in sel.solvables() if s.evr == target_evr]
if len(matching) == 0:
result.problems.append(
'no name match for %s==%s' % (pkg_name, target_evr)
)
continue
# pick exactly one solvable — multiple snapshots may yield
# duplicates of the same pkg which libsolv considers conflicting
jobs.append(
self._pool.Job(
solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE,
matching[0].id,
)
)
elif constraint.op is not None and constraint.version is not None:
# version constraint (<, <=, >=, >) — filter solvables
matching = [
s for s in sel.solvables()
if constraint.satisfied_by(str(s.evr))
]
if len(matching) == 0:
result.problems.append(
'no version satisfies %s' % constraint.to_str()
)
continue
# pick the best: for < and <= pick highest matching, for > and >= also highest
best = matching[0]
for s in matching[1:]:
if vercmp_t.vercmp(str(s.evr), str(best.evr)) > 0:
best = s
jobs.append(
self._pool.Job(
solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE,
best.id,
)
)
else:
jobs += sel.jobs(solv.Job.SOLVER_INSTALL)
# add exclusion jobs — lock excluded packages as not installable
for excl_name in excluded:
excl_sel = self._pool.select(excl_name, solv.Selection.SELECTION_NAME)
if not excl_sel.isempty():
jobs += excl_sel.jobs(solv.Job.SOLVER_LOCK)
if len(result.problems) > 0:
return result
@ -292,6 +304,10 @@ class solv_pool_t:
trans = solver.transaction()
new_solvables = list(trans.newsolvables())
for s in new_solvables:
if s.name in excluded:
raise RuntimeError(
'libsolv resolved excluded package %s — exclusion constraint violated' % s.name
)
result.resolved[s.name] = s
if logger.isEnabledFor(logging.DEBUG):
@ -365,16 +381,14 @@ class solv_pool_t:
)
def resolve(
indices: list[package_index_t],
packages: list[str],
pinned: Optional[dict[str, str]] = None,
upgrade_packages: Optional[list[str]] = None,
) -> resolve_result_t:
"""Resolve using libsolv. Takes general types, returns general types.
class solv_resolver_t(resolver_base_t):
"""Libsolv-based resolver implementing the common interface."""
Converts package_index_t solv_index_t internally.
"""
def resolve(
self,
constraints: 'constraints_t',
indices: list[package_index_t],
) -> resolve_result_t:
# convert general → solv internal types
stores: list[repo_store_t] = []
for idx in indices:
@ -397,6 +411,15 @@ def resolve(
pool = solv_pool_t(stores=stores)
# build raw package specs for solv_pool_t.resolve
packages: list[str] = [c.to_str() for c in constraints.install]
for name in constraints.excluded_names:
packages.append('!%s' % name)
pinned = constraints.pinned or None
upgrade_names = constraints.upgrade_names
upgrade_packages = list(upgrade_names) if len(upgrade_names) > 0 else None
solv_result = pool.resolve(
packages=packages,
pinned=pinned,
@ -408,7 +431,6 @@ def resolve(
result.problems = list(solv_result.problems)
for pkg_name, solvable in solv_result.resolved.items():
# find the general package_t matching this solvable by name+version
for idx in indices:
version_map = idx.packages.get(pkg_name)
if version_map is None:

@ -1,6 +1,6 @@
"""Re-export general types from models for convenience."""
from ..models import (
from ..apps.specs.models import (
package_t,
package_index_t,
resolve_result_t,

@ -2,7 +2,7 @@ import unittest
from typing import Optional
from ..resolver.general import resolver_t
from ..models import (
from ..apps.specs.models import (
package_t,
package_index_t,
package_constraint_t,
@ -45,7 +45,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['bash'], [idx])
result = resolver_t.resolve_specs(['bash'], [idx])
self.assertIn('bash', result.resolved)
self.assertEqual(result.resolved['bash'].version, '5.2.015-1')
@ -60,7 +60,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['python'], [idx])
result = resolver_t.resolve_specs(['python'], [idx])
self.assertIn('python', result.resolved)
self.assertIn('glibc', result.resolved)
@ -75,7 +75,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['a'], [idx])
result = resolver_t.resolve_specs(['a'], [idx])
self.assertEqual(len(result.resolved), 3)
self.assertIn('a', result.resolved)
@ -91,7 +91,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['python'], [idx])
result = resolver_t.resolve_specs(['python'], [idx])
self.assertIn('glibc', result.resolved)
@ -104,7 +104,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['python'], [idx])
result = resolver_t.resolve_specs(['python'], [idx])
self.assertGreater(len(result.problems), 0)
def test_resolve_not_found(self) -> None:
@ -115,7 +115,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['nonexistent'], [idx])
result = resolver_t.resolve_specs(['nonexistent'], [idx])
self.assertGreater(len(result.problems), 0)
self.assertTrue(any('nonexistent' in p for p in result.problems))
@ -129,7 +129,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['bash', 'python', 'gcc'], [idx])
result = resolver_t.resolve_specs(['bash', 'python', 'gcc'], [idx])
self.assertEqual(len(result.resolved), 3)
@ -143,7 +143,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['python', 'bash'], [idx])
result = resolver_t.resolve_specs(['python', 'bash'], [idx])
self.assertEqual(len(result.resolved), 3)
self.assertEqual(result.resolution_order.count('glibc'), 1)
@ -162,7 +162,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['python'], [extra, core])
result = resolver_t.resolve_specs(['python'], [extra, core])
self.assertIn('python', result.resolved)
self.assertIn('glibc', result.resolved)
@ -181,7 +181,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['bash'], [core, extra])
result = resolver_t.resolve_specs(['bash'], [core, extra])
self.assertEqual(result.resolved['bash'].version, '5.2.015-1')
@ -198,7 +198,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['app'], [idx])
result = resolver_t.resolve_specs(['app'], [idx])
self.assertIn('python', result.resolved)
self.assertIn('app', result.resolved)
@ -216,7 +216,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['app'], [idx])
result = resolver_t.resolve_specs(['app'], [idx])
self.assertIn('python', result.resolved)
@ -230,11 +230,11 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['app'], [idx])
result = resolver_t.resolve_specs(['app'], [idx])
self.assertGreater(len(result.problems), 0)
self.assertTrue(any('conflict' in p for p in result.problems))
def test_resolve_skip_installed(self) -> None:
def test_resolve_skip_via_ignore(self) -> None:
idx = self._make_index(
'core',
[
@ -242,10 +242,9 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(
['python'],
result = resolver_t.resolve_specs(
['python', '-glibc'],
[idx],
skip_installed={'glibc'},
)
self.assertIn('python', result.resolved)
@ -259,7 +258,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve([], [idx])
result = resolver_t.resolve_specs([], [idx])
self.assertEqual(len(result.resolved), 0)
self.assertEqual(result.resolution_order, [])
@ -273,7 +272,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['a'], [idx])
result = resolver_t.resolve_specs(['a'], [idx])
self.assertIn('a', result.resolved)
self.assertIn('b', result.resolved)
@ -289,7 +288,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['app'], [idx])
result = resolver_t.resolve_specs(['app'], [idx])
self.assertEqual(len(result.resolved), 4)
self.assertEqual(result.resolution_order.count('libcommon'), 1)
@ -302,7 +301,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['bash>=5.0'], [idx])
result = resolver_t.resolve_specs(['bash>=5.0'], [idx])
self.assertIn('bash', result.resolved)
@ -314,7 +313,7 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['bash==5.2.015-1'], [idx])
result = resolver_t.resolve_specs(['bash==5.2.015-1'], [idx])
self.assertIn('bash', result.resolved)
@ -326,9 +325,43 @@ class TestResolver(unittest.TestCase):
],
)
result = resolver_t.resolve(['bash==5.1.000-1'], [idx])
result = resolver_t.resolve_specs(['bash==5.1.000-1'], [idx])
self.assertGreater(len(result.problems), 0)
def test_exclude_package(self) -> None:
idx = self._make_index(
'core',
[
self._pkg(name='app', version='1.0', depends=['foo']),
self._pkg(name='foo', version='1.0-1'),
self._pkg(name='bar', version='1.0-1'),
],
)
result = resolver_t.resolve_specs(['app', 'bar', '!bar'], [idx])
self.assertIn('app', result.resolved)
self.assertIn('foo', result.resolved)
self.assertNotIn('bar', result.resolved)
def test_exclude_transitive_dep(self) -> None:
idx = self._make_index(
'extra',
[
self._pkg(name='qpwgraph', version='1.0.0-1', depends=['libgcc']),
self._pkg(name='qpwgraph', version='0.8.1-1', depends=['gcc-libs']),
self._pkg(name='libgcc', version='15.2.1-1'),
self._pkg(name='gcc-libs', version='15.1.1-1'),
],
)
# without exclusion, picks latest qpwgraph which needs libgcc
result_no_excl = resolver_t.resolve_specs(['qpwgraph'], [idx])
self.assertIn('qpwgraph', result_no_excl.resolved)
# with !libgcc, should pick older qpwgraph or fail
result = resolver_t.resolve_specs(['qpwgraph', '!libgcc'], [idx])
self.assertNotIn('libgcc', result.resolved)
def test_error_not_found_message(self) -> None:
err = resolver_t.error_t.not_found_t('missing-pkg')
self.assertIn('missing-pkg', str(err))

@ -100,6 +100,95 @@ class TestSolvPoolUnit(unittest.TestCase):
self.assertIn('glibc', result.resolved)
self.assertEqual(len(result.problems), 0)
def test_resolve_less_than_constraint(self) -> None:
"""qpwgraph<0.9 should NOT resolve to 1.0.0."""
idx = solv_index_t(name='extra')
idx.add(solv_package_t(name='qpwgraph', version='0.8.0-1', arch='x86_64'))
idx.add(solv_package_t(name='qpwgraph', version='1.0.0-1', arch='x86_64'))
idx.build_provides_index()
pool = solv_pool_t(stores=[repo_store_t(index=idx)])
result = pool.resolve(['qpwgraph<0.9'])
self.assertIn('qpwgraph', result.resolved)
self.assertEqual(result.resolved['qpwgraph'].evr, '0.8.0-1')
self.assertEqual(len(result.problems), 0)
def test_resolve_less_equal_constraint(self) -> None:
idx = solv_index_t(name='extra')
idx.add(solv_package_t(name='foo', version='1.0-1', arch='x86_64'))
idx.add(solv_package_t(name='foo', version='2.0-1', arch='x86_64'))
idx.add(solv_package_t(name='foo', version='3.0-1', arch='x86_64'))
idx.build_provides_index()
pool = solv_pool_t(stores=[repo_store_t(index=idx)])
result = pool.resolve(['foo<=2.0-1'])
self.assertIn('foo', result.resolved)
self.assertIn(result.resolved['foo'].evr, ['1.0-1', '2.0-1'])
self.assertNotEqual(result.resolved['foo'].evr, '3.0-1')
def test_resolve_greater_equal_constraint(self) -> None:
idx = solv_index_t(name='extra')
idx.add(solv_package_t(name='bar', version='1.0-1', arch='x86_64'))
idx.add(solv_package_t(name='bar', version='2.0-1', arch='x86_64'))
idx.add(solv_package_t(name='bar', version='3.0-1', arch='x86_64'))
idx.build_provides_index()
pool = solv_pool_t(stores=[repo_store_t(index=idx)])
result = pool.resolve(['bar>=2.0-1'])
self.assertIn('bar', result.resolved)
self.assertIn(result.resolved['bar'].evr, ['2.0-1', '3.0-1'])
self.assertNotEqual(result.resolved['bar'].evr, '1.0-1')
def test_resolve_constraint_no_match(self) -> None:
idx = solv_index_t(name='extra')
idx.add(solv_package_t(name='baz', version='5.0-1', arch='x86_64'))
idx.build_provides_index()
pool = solv_pool_t(stores=[repo_store_t(index=idx)])
result = pool.resolve(['baz<1.0'])
self.assertNotIn('baz', result.resolved)
self.assertGreater(len(result.problems), 0)
def test_exclude_package(self) -> None:
"""!libgcc should prevent libgcc from being resolved."""
idx = solv_index_t(name='extra')
idx.add(solv_package_t(name='qpwgraph', version='1.0.0-1', arch='x86_64', depends=['libgcc']))
idx.add(solv_package_t(name='qpwgraph', version='0.8.1-1', arch='x86_64', depends=['gcc-libs']))
idx.add(solv_package_t(name='libgcc', version='15.2.1-1', arch='x86_64'))
idx.add(solv_package_t(name='gcc-libs', version='15.1.1-1', arch='x86_64'))
idx.build_provides_index()
pool = solv_pool_t(stores=[repo_store_t(index=idx)])
result = pool.resolve(['qpwgraph', '!libgcc'])
self.assertIn('qpwgraph', result.resolved)
self.assertNotIn('libgcc', result.resolved)
# should pick 0.8.1 which depends on gcc-libs, not libgcc
self.assertEqual(result.resolved['qpwgraph'].evr, '0.8.1-1')
self.assertIn('gcc-libs', result.resolved)
def test_exclude_multiple(self) -> None:
idx = solv_index_t(name='test')
idx.add(solv_package_t(name='app', version='1.0-1', arch='x86_64', depends=['foo']))
idx.add(solv_package_t(name='foo', version='1.0-1', arch='x86_64'))
idx.add(solv_package_t(name='bar', version='1.0-1', arch='x86_64'))
idx.build_provides_index()
pool = solv_pool_t(stores=[repo_store_t(index=idx)])
result = pool.resolve(['app', 'bar', '!bar'])
self.assertIn('app', result.resolved)
self.assertNotIn('bar', result.resolved)
def test_exclude_only(self) -> None:
"""Just exclusions, no packages to install."""
idx = solv_index_t(name='test')
idx.add(solv_package_t(name='foo', version='1.0-1', arch='x86_64'))
idx.build_provides_index()
pool = solv_pool_t(stores=[repo_store_t(index=idx)])
result = pool.resolve(['!foo'])
self.assertNotIn('foo', result.resolved)
self.assertEqual(len(result.problems), 0)
def test_resolve_multiple_repos(self) -> None:
core = solv_index_t(name='core')
core.add(solv_package_t(name='glibc', version='2.38-1', arch='x86_64'))