[+] remove old pre-refactor archlinux modules

1. delete archive.py, cache_db.py, cli.py, compile.py, db.py, pacman.py, resolver.py, solv_backend.py;
2. all functionality moved to apps/, cli/, resolver/ subpackages;
LLM 2026-04-09 09:00:00 +00:00
parent 7a03db3e97
commit 1e1cd6c1c0
8 changed files with 0 additions and 2488 deletions

archive.py
@@ -1,294 +0,0 @@
import argparse
import datetime
import enum
import logging
import pathlib
import re
from typing import (
ClassVar,
Optional,
)
from .cache_db import cache_db_t
from .db import db_parser_t
from .models import mirror_config_t
from .pacman import pacman_t
logger = logging.getLogger(__name__)
class ArchiveAction(enum.Enum):
list_dates = 'list-dates'
list_packages = 'list-packages'
show_versions = 'show-versions'
sync = 'sync'
class archive_t:
class constants_t:
base_url: ClassVar[str] = 'https://archive.archlinux.org/repos/'
href_re: ClassVar[re.Pattern[str]] = re.compile(r'href="(\d{4}/\d{2}/\d{2})/"')
default_repos: ClassVar[list[str]] = ['core', 'extra', 'multilib']
@staticmethod
def list_remote_dates(
base_url: Optional[str] = None,
) -> list[str]:
"""Scrape available date directories from the archive index page."""
import urllib.request
if base_url is None:
base_url = archive_t.constants_t.base_url
logger.info(dict(msg='fetching archive index', url=base_url))
with urllib.request.urlopen(base_url) as resp:
html = resp.read().decode('utf-8')
dates: list[str] = []
for m in archive_t.constants_t.href_re.finditer(html):
dates.append(m.group(1))
dates.sort(reverse=True)
return dates
@staticmethod
def sync_date(
date: str,
cache_dir: pathlib.Path,
cache_db: cache_db_t,
repos: Optional[list[str]] = None,
arch: str = 'x86_64',
) -> None:
if repos is None:
repos = list(archive_t.constants_t.default_repos)
mirror = mirror_config_t.from_archive_date(
date=date,
repos=repos,
arch=arch,
)
db_dir = cache_dir / date
db_dir.mkdir(parents=True, exist_ok=True)
for repo_cfg in mirror.repos:
db_url = '%s/%s.db' % (repo_cfg.url, repo_cfg.name)
db_path = db_dir / ('%s.db' % repo_cfg.name)
db_rel_path = '%s/%s.db' % (date, repo_cfg.name)
if not db_path.exists():
logger.info(
dict(
msg='downloading db',
url=db_url,
dest=str(db_path),
)
)
pacman_t.download_db(db_url, db_path)
else:
logger.info(
dict(
msg='db already cached on disk',
path=str(db_path),
)
)
db_sha256 = cache_db_t.file_sha256(db_path)
snapshot_id = cache_db.upsert_snapshot(
date=date,
repo=repo_cfg.name,
arch=arch,
db_sha256=db_sha256,
db_rel_path=db_rel_path,
)
if cache_db.snapshot_package_count(snapshot_id) > 0:
snap = cache_db.get_snapshot_by_id(snapshot_id)
if snap is not None and snap.db_sha256 == db_sha256:
logger.info(
dict(
msg='snapshot already in sqlite',
date=date,
repo=repo_cfg.name,
snapshot_id=snapshot_id,
)
)
continue
index = db_parser_t.parse_db_path(db_path, repo_name=repo_cfg.name)
cache_db.store_index(
snapshot_id=snapshot_id,
index=index,
)
logger.info(
dict(
msg='synced',
date=date,
repo=repo_cfg.name,
packages=len(index.packages),
)
)
@staticmethod
def _parse_date(s: str) -> datetime.date:
parts = s.split('/')
if len(parts) == 3:
return datetime.date(int(parts[0]), int(parts[1]), int(parts[2]))
return datetime.date.fromisoformat(s)
@staticmethod
def _format_date(d: datetime.date) -> str:
return '%04d/%02d/%02d' % (d.year, d.month, d.day)
@staticmethod
def sync_date_range(
start_date: str,
end_date: str,
cache_dir: pathlib.Path,
cache_db: cache_db_t,
repos: Optional[list[str]] = None,
arch: str = 'x86_64',
step_days: int = 1,
) -> None:
start = archive_t._parse_date(start_date)
end = archive_t._parse_date(end_date)
step = datetime.timedelta(days=step_days)
current = end
while current >= start:
date_str = archive_t._format_date(current)
try:
archive_t.sync_date(
date=date_str,
cache_dir=cache_dir,
cache_db=cache_db,
repos=repos,
arch=arch,
)
except Exception:
logger.warning(
dict(
msg='failed to sync date, skipping',
date=date_str,
),
exc_info=True,
)
current -= step
def main(args: list[str]) -> int:
archive_parser = argparse.ArgumentParser(
prog='online-fxreader-pr34-archlinux archive',
)
archive_parser.add_argument(
'action',
choices=[o.value for o in ArchiveAction],
)
archive_parser.add_argument(
'--cache-dir',
dest='cache_dir',
required=True,
help='directory for cached .db files and sqlite database',
)
archive_parser.add_argument(
'--repos',
nargs='*',
default=['core', 'extra', 'multilib'],
)
archive_parser.add_argument(
'--arch',
default='x86_64',
)
archive_parser.add_argument(
'--date',
default=None,
help='single date (e.g. 2024/01/15) for sync',
)
archive_parser.add_argument(
'--date-range',
dest='date_range',
nargs=2,
metavar=('START', 'END'),
default=None,
help='date range for sync (e.g. 2024/01/01 2024/06/30)',
)
archive_parser.add_argument(
'--date-step',
dest='date_step',
type=int,
default=1,
help='step in days when iterating date range, default 1',
)
archive_parser.add_argument(
'--packages',
nargs='*',
default=None,
help='package names for show-versions',
)
archive_options = archive_parser.parse_args(args)
archive_options.action = ArchiveAction(archive_options.action)
cache_dir = pathlib.Path(archive_options.cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
db = cache_db_t(cache_dir / 'archlinux_cache.db')
try:
if archive_options.action is ArchiveAction.list_dates:
if db.has_data():
print('=== cached dates ===')
for date_str in db.list_dates():
print(date_str)
print('=== remote dates ===')
for date_str in archive_t.list_remote_dates():
print(date_str)
elif archive_options.action is ArchiveAction.list_packages:
for row in db.package_count_per_date():
print('%s %d' % (row.date, row.count))
elif archive_options.action is ArchiveAction.show_versions:
if archive_options.packages is None or len(archive_options.packages) == 0:
logger.error('--packages required for show-versions')
return 1
for row in db.get_package_versions(archive_options.packages):
print('%s %s %s %s' % (row.date, row.repo, row.name, row.version))
elif archive_options.action is ArchiveAction.sync:
if archive_options.date is not None:
archive_t.sync_date(
date=archive_options.date,
cache_dir=cache_dir,
cache_db=db,
repos=archive_options.repos,
arch=archive_options.arch,
)
elif archive_options.date_range is not None:
archive_t.sync_date_range(
start_date=archive_options.date_range[0],
end_date=archive_options.date_range[1],
cache_dir=cache_dir,
cache_db=db,
repos=archive_options.repos,
arch=archive_options.arch,
step_days=archive_options.date_step,
)
else:
logger.error('sync requires --date or --date-range')
return 1
else:
raise NotImplementedError
finally:
db.close()
return 0
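
For reference, a minimal sketch of how this module was driven before the refactor; the import path is assumed from the prog name above, and the equivalent now lives under the apps/ subpackage:

    import pathlib
    from online.fxreader.pr34.archlinux.archive import archive_t
    from online.fxreader.pr34.archlinux.cache_db import cache_db_t

    cache_dir = pathlib.Path('~/.cache/archlinux-archive').expanduser()
    cache_dir.mkdir(parents=True, exist_ok=True)
    db = cache_db_t(cache_dir / 'archlinux_cache.db')
    try:
        # fetches core/extra/multilib .db files for the date and indexes them in sqlite
        archive_t.sync_date(date='2024/01/15', cache_dir=cache_dir, cache_db=db)
        print(db.list_dates())
    finally:
        db.close()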

cache_db.py
@@ -1,689 +0,0 @@
import datetime
import hashlib
import io
import logging
import pathlib
import sqlite3
from typing import (
ClassVar,
Generator,
Optional,
TypeVar,
)
import pydantic
from .models import (
package_desc_t,
repo_index_t,
)
logger = logging.getLogger(__name__)
_T = TypeVar('_T', bound=pydantic.BaseModel)
class snapshot_row_t(pydantic.BaseModel):
id: int
date: str
repo: str
arch: str
db_sha256: str
db_rel_path: str
synced_at: str
class package_row_t(pydantic.BaseModel):
id: int
snapshot_id: int
name: str
version: str
base: str = ''
desc: str = ''
filename: str = ''
csize: int = 0
isize: int = 0
md5sum: str = ''
sha256sum: str = ''
url: str = ''
arch: str = ''
builddate: int = 0
packager: str = ''
class package_version_row_t(pydantic.BaseModel):
date: str
repo: str
name: str
version: str
class date_count_row_t(pydantic.BaseModel):
date: str
count: int
class package_hash_row_t(pydantic.BaseModel):
sha256sum: str
class local_package_row_t(pydantic.BaseModel):
id: int
name: str
version: str
filename: str
sha256sum: str
local_path: str
downloaded_at: str
class signature_row_t(pydantic.BaseModel):
id: int
local_package_id: int
sig_path: str
keyring_package_version: Optional[str] = None
gpg_key_id: Optional[str] = None
verified_at: Optional[str] = None
class trusted_entry_t(pydantic.BaseModel, frozen=True):
name: str
version: str
def _stream_rows(
cur: sqlite3.Cursor,
model: type[_T],
) -> Generator[_T, None, None]:
columns = [desc[0] for desc in cur.description]
for raw in cur:
yield model.model_validate(dict(zip(columns, raw)))
def _fetch_one(
cur: sqlite3.Cursor,
model: type[_T],
) -> Optional[_T]:
columns = [desc[0] for desc in cur.description]
raw = cur.fetchone()
if raw is None:
return None
return model.model_validate(dict(zip(columns, raw)))
class cache_db_t:
class constants_t:
schema_version: ClassVar[int] = 1
list_relation_types: ClassVar[dict[str, str]] = {
'license': 'license',
'depends': 'depends',
'optdepends': 'optdepends',
'makedepends': 'makedepends',
'checkdepends': 'checkdepends',
'provides': 'provides',
'conflicts': 'conflicts',
'replaces': 'replaces',
'groups': 'groups',
}
def __init__(self, db_path: pathlib.Path) -> None:
self._db_path = db_path
self._conn = sqlite3.connect(str(db_path))
self._conn.execute('PRAGMA journal_mode=WAL')
self._conn.execute('PRAGMA foreign_keys=ON')
self._ensure_schema()
def close(self) -> None:
self._conn.close()
def _ensure_schema(self) -> None:
cur = self._conn.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='schema_meta'")
if cur.fetchone() is None:
self._create_schema(cur)
self._conn.commit()
return
cur.execute('SELECT version FROM schema_meta LIMIT 1')
row = cur.fetchone()
if row is None or row[0] < cache_db_t.constants_t.schema_version:
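# NOTE: _create_schema only replays CREATE ... IF NOT EXISTS statements;
# it does not migrate rows or alter columns of existing tables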
self._create_schema(cur)
self._conn.commit()
def _create_schema(self, cur: sqlite3.Cursor) -> None:
cur.executescript("""
CREATE TABLE IF NOT EXISTS schema_meta (
version INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
repo TEXT NOT NULL,
arch TEXT NOT NULL DEFAULT 'x86_64',
db_sha256 TEXT NOT NULL,
db_rel_path TEXT NOT NULL DEFAULT '',
synced_at TEXT NOT NULL,
UNIQUE(date, repo, arch)
);
CREATE TABLE IF NOT EXISTS packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_id INTEGER NOT NULL REFERENCES snapshots(id) ON DELETE CASCADE,
name TEXT NOT NULL,
version TEXT NOT NULL,
base TEXT NOT NULL DEFAULT '',
desc TEXT NOT NULL DEFAULT '',
filename TEXT NOT NULL DEFAULT '',
csize INTEGER NOT NULL DEFAULT 0,
isize INTEGER NOT NULL DEFAULT 0,
md5sum TEXT NOT NULL DEFAULT '',
sha256sum TEXT NOT NULL DEFAULT '',
url TEXT NOT NULL DEFAULT '',
arch TEXT NOT NULL DEFAULT '',
builddate INTEGER NOT NULL DEFAULT 0,
packager TEXT NOT NULL DEFAULT '',
UNIQUE(snapshot_id, name)
);
CREATE TABLE IF NOT EXISTS package_relations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
relation_type TEXT NOT NULL,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS local_packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
version TEXT NOT NULL,
filename TEXT NOT NULL,
sha256sum TEXT NOT NULL DEFAULT '',
local_path TEXT NOT NULL,
downloaded_at TEXT NOT NULL,
UNIQUE(name, version, filename)
);
CREATE TABLE IF NOT EXISTS local_signatures (
id INTEGER PRIMARY KEY AUTOINCREMENT,
local_package_id INTEGER NOT NULL REFERENCES local_packages(id) ON DELETE CASCADE,
sig_path TEXT NOT NULL,
keyring_package_version TEXT DEFAULT NULL,
gpg_key_id TEXT DEFAULT NULL,
verified_at TEXT DEFAULT NULL,
UNIQUE(local_package_id)
);
CREATE INDEX IF NOT EXISTS idx_packages_snapshot ON packages(snapshot_id);
CREATE INDEX IF NOT EXISTS idx_packages_name ON packages(name);
CREATE INDEX IF NOT EXISTS idx_packages_name_version ON packages(name, version);
CREATE INDEX IF NOT EXISTS idx_snapshots_date ON snapshots(date);
CREATE INDEX IF NOT EXISTS idx_package_relations_pkg
ON package_relations(package_id, relation_type);
CREATE INDEX IF NOT EXISTS idx_local_packages_name_version
ON local_packages(name, version);
""")
cur.execute('DELETE FROM schema_meta')
cur.execute(
'INSERT INTO schema_meta (version) VALUES (?)',
(cache_db_t.constants_t.schema_version,),
)
# ── helpers ──
@staticmethod
def file_sha256(path: pathlib.Path) -> str:
h = hashlib.sha256()
with io.open(path, 'rb') as f:
while True:
chunk = f.read(65536)
if not chunk:
break
h.update(chunk)
return h.hexdigest()
# ── snapshot CRUD ──
def upsert_snapshot(
self,
date: str,
repo: str,
arch: str,
db_sha256: str,
db_rel_path: str = '',
) -> int:
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
cur = self._conn.cursor()
cur.execute(
'SELECT id, db_sha256 FROM snapshots WHERE date=? AND repo=? AND arch=?',
(date, repo, arch),
)
row = cur.fetchone()
if row is not None:
snapshot_id: int = row[0]
if row[1] == db_sha256:
return snapshot_id
cur.execute(
'DELETE FROM packages WHERE snapshot_id=?',
(snapshot_id,),
)
cur.execute(
'UPDATE snapshots SET db_sha256=?, db_rel_path=?, synced_at=? WHERE id=?',
(db_sha256, db_rel_path, now, snapshot_id),
)
self._conn.commit()
return snapshot_id
cur.execute(
'INSERT INTO snapshots (date, repo, arch, db_sha256, db_rel_path, synced_at) VALUES (?, ?, ?, ?, ?, ?)',
(date, repo, arch, db_sha256, db_rel_path, now),
)
self._conn.commit()
assert cur.lastrowid is not None
return cur.lastrowid
def get_snapshot(
self,
date: str,
repo: str,
arch: str,
) -> Optional[snapshot_row_t]:
cur = self._conn.cursor()
cur.execute(
'SELECT * FROM snapshots WHERE date=? AND repo=? AND arch=?',
(date, repo, arch),
)
return _fetch_one(cur, snapshot_row_t)
def get_snapshot_by_id(
self,
snapshot_id: int,
) -> Optional[snapshot_row_t]:
cur = self._conn.cursor()
cur.execute(
'SELECT * FROM snapshots WHERE id=?',
(snapshot_id,),
)
return _fetch_one(cur, snapshot_row_t)
def list_snapshots(self) -> Generator[snapshot_row_t, None, None]:
cur = self._conn.cursor()
cur.execute('SELECT * FROM snapshots ORDER BY date DESC, repo')
yield from _stream_rows(cur, snapshot_row_t)
def list_dates(self) -> list[str]:
cur = self._conn.cursor()
cur.execute('SELECT DISTINCT date FROM snapshots ORDER BY date DESC')
return [row[0] for row in cur.fetchall()]
def snapshot_package_count(self, snapshot_id: int) -> int:
cur = self._conn.cursor()
cur.execute(
'SELECT COUNT(*) FROM packages WHERE snapshot_id=?',
(snapshot_id,),
)
row = cur.fetchone()
return row[0] if row is not None else 0
# ── package CRUD ──
def store_index(
self,
snapshot_id: int,
index: repo_index_t,
) -> None:
cur = self._conn.cursor()
pkg_rows: list[tuple[int, str, str, str, str, str, int, int, str, str, str, str, int, str]] = []
for pkg in index.packages.values():
pkg_rows.append(
(
snapshot_id,
pkg.name,
pkg.version,
pkg.base,
pkg.desc,
pkg.filename,
pkg.csize,
pkg.isize,
pkg.md5sum,
pkg.sha256sum,
pkg.url,
pkg.arch,
pkg.builddate,
pkg.packager,
)
)
cur.executemany(
'INSERT OR REPLACE INTO packages '
'(snapshot_id, name, version, base, desc, filename, csize, isize, '
'md5sum, sha256sum, url, arch, builddate, packager) '
'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
pkg_rows,
)
cur.execute(
'SELECT id, name FROM packages WHERE snapshot_id=?',
(snapshot_id,),
)
pkg_id_map: dict[str, int] = {}
for row_raw in cur.fetchall():
pkg_id_map[row_raw[1]] = row_raw[0]
rel_rows: list[tuple[int, str, str]] = []
for pkg in index.packages.values():
pkg_id = pkg_id_map.get(pkg.name)
if pkg_id is None:
continue
for rel_type, attr_name in cache_db_t.constants_t.list_relation_types.items():
values: list[str] = getattr(pkg, attr_name)
for v in values:
rel_rows.append((pkg_id, rel_type, v))
if len(rel_rows) > 0:
cur.executemany(
'INSERT INTO package_relations (package_id, relation_type, value) VALUES (?, ?, ?)',
rel_rows,
)
self._conn.commit()
logger.info(
dict(
msg='stored index',
snapshot_id=snapshot_id,
packages=len(pkg_rows),
relations=len(rel_rows),
)
)
def package_count_per_date(self) -> Generator[date_count_row_t, None, None]:
cur = self._conn.cursor()
cur.execute(
'SELECT s.date AS date, COUNT(p.id) AS count '
'FROM snapshots s '
'JOIN packages p ON p.snapshot_id = s.id '
'GROUP BY s.date ORDER BY s.date DESC'
)
yield from _stream_rows(cur, date_count_row_t)
def get_package_versions(
self,
names: list[str],
) -> Generator[package_version_row_t, None, None]:
if len(names) == 0:
yield from ()
return
cur = self._conn.cursor()
placeholders = ','.join('?' for _ in names)
cur.execute(
'SELECT s.date AS date, s.repo AS repo, p.name AS name, p.version AS version '
'FROM packages p '
'JOIN snapshots s ON s.id = p.snapshot_id '
'WHERE p.name IN (%s) '
'ORDER BY p.name, s.date DESC' % placeholders,
names,
)
yield from _stream_rows(cur, package_version_row_t)
def find_package_hash(
self,
name: str,
version: str,
) -> Optional[package_hash_row_t]:
cur = self._conn.cursor()
cur.execute(
"SELECT sha256sum FROM packages WHERE name=? AND version=? AND sha256sum != '' ORDER BY snapshot_id DESC LIMIT 1",
(name, version),
)
return _fetch_one(cur, package_hash_row_t)
# ── repo_index_t loading ──
def load_repo_index(
self,
snapshot_id: int,
repo_name: str,
) -> repo_index_t:
cur = self._conn.cursor()
cur.execute(
'SELECT * FROM packages WHERE snapshot_id=?',
(snapshot_id,),
)
index = repo_index_t(name=repo_name)
pkg_ids: list[int] = []
pkg_by_id: dict[int, package_desc_t] = {}
columns = [desc[0] for desc in cur.description]
for raw in cur.fetchall():
row_dict = dict(zip(columns, raw))
pkg = package_desc_t(
name=row_dict['name'],
version=row_dict['version'],
base=row_dict['base'],
desc=row_dict['desc'],
filename=row_dict['filename'],
csize=row_dict['csize'],
isize=row_dict['isize'],
md5sum=row_dict['md5sum'],
sha256sum=row_dict['sha256sum'],
url=row_dict['url'],
arch=row_dict['arch'],
builddate=row_dict['builddate'],
packager=row_dict['packager'],
)
index.packages[pkg.name] = pkg
pkg_ids.append(row_dict['id'])
pkg_by_id[row_dict['id']] = pkg
if len(pkg_ids) > 0:
self._load_relations(cur, pkg_ids, pkg_by_id)
index.build_provides_index()
return index
def load_all_indices(self) -> list[repo_index_t]:
"""Load all snapshots as repo_index_t objects via bulk queries.
Returns one index per (snapshot_id, repo) so the solver sees all
package versions across all synced dates. Uses two bulk queries
instead of per-snapshot loading for performance.
"""
cur = self._conn.cursor()
cur.execute('SELECT * FROM snapshots ORDER BY date ASC')
snap_columns = [desc[0] for desc in cur.description]
snapshots = [dict(zip(snap_columns, raw)) for raw in cur.fetchall()]
cur.execute(
'SELECT id, snapshot_id, name, version, base, desc, filename, csize, isize, md5sum, sha256sum, url, arch, builddate, packager FROM packages'
)
pkg_columns = [desc[0] for desc in cur.description]
pkgs_by_snapshot: dict[int, dict[str, package_desc_t]] = {}
all_pkg_ids: list[int] = []
pkg_by_id: dict[int, package_desc_t] = {}
for raw in cur.fetchall():
rd = dict(zip(pkg_columns, raw))
pkg = package_desc_t(
name=rd['name'],
version=rd['version'],
base=rd['base'],
desc=rd['desc'],
filename=rd['filename'],
csize=rd['csize'],
isize=rd['isize'],
md5sum=rd['md5sum'],
sha256sum=rd['sha256sum'],
url=rd['url'],
arch=rd['arch'],
builddate=rd['builddate'],
packager=rd['packager'],
)
snap_id: int = rd['snapshot_id']
if snap_id not in pkgs_by_snapshot:
pkgs_by_snapshot[snap_id] = {}
pkgs_by_snapshot[snap_id][pkg.name] = pkg
all_pkg_ids.append(rd['id'])
pkg_by_id[rd['id']] = pkg
if len(all_pkg_ids) > 0:
self._load_relations(cur, all_pkg_ids, pkg_by_id)
indices: list[repo_index_t] = []
for snap in snapshots:
pkgs = pkgs_by_snapshot.get(snap['id'])
if pkgs is None or len(pkgs) == 0:
continue
idx = repo_index_t(name=snap['repo'], packages=pkgs)
idx.build_provides_index()
indices.append(idx)
return indices
def _load_relations(
self,
cur: sqlite3.Cursor,
pkg_ids: list[int],
pkg_by_id: dict[int, package_desc_t],
) -> None:
batch_size = 500
for i in range(0, len(pkg_ids), batch_size):
batch = pkg_ids[i : i + batch_size]
placeholders = ','.join('?' for _ in batch)
cur.execute(
'SELECT package_id, relation_type, value FROM package_relations WHERE package_id IN (%s)' % placeholders,
batch,
)
for row_raw in cur.fetchall():
pkg = pkg_by_id.get(row_raw[0])
if pkg is None:
continue
attr_name = cache_db_t.constants_t.list_relation_types.get(row_raw[1])
if attr_name is None:
continue
target_list: list[str] = getattr(pkg, attr_name)
target_list.append(row_raw[2])
# ── local packages & signatures ──
def record_local_package(
self,
name: str,
version: str,
filename: str,
sha256sum: str,
local_path: str,
) -> int:
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
cur = self._conn.cursor()
cur.execute(
'INSERT OR REPLACE INTO local_packages (name, version, filename, sha256sum, local_path, downloaded_at) VALUES (?, ?, ?, ?, ?, ?)',
(name, version, filename, sha256sum, local_path, now),
)
self._conn.commit()
assert cur.lastrowid is not None
return cur.lastrowid
def record_signature(
self,
local_package_id: int,
sig_path: str,
keyring_package_version: Optional[str] = None,
gpg_key_id: Optional[str] = None,
) -> None:
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
cur = self._conn.cursor()
cur.execute(
'INSERT OR REPLACE INTO local_signatures (local_package_id, sig_path, keyring_package_version, gpg_key_id, verified_at) VALUES (?, ?, ?, ?, ?)',
(local_package_id, sig_path, keyring_package_version, gpg_key_id, now),
)
self._conn.commit()
def get_signature_info(
self,
name: str,
version: str,
) -> Optional[signature_row_t]:
cur = self._conn.cursor()
cur.execute(
'SELECT ls.id, ls.local_package_id, ls.sig_path, '
'ls.keyring_package_version, ls.gpg_key_id, ls.verified_at '
'FROM local_signatures ls '
'JOIN local_packages lp ON lp.id = ls.local_package_id '
'WHERE lp.name=? AND lp.version=?',
(name, version),
)
return _fetch_one(cur, signature_row_t)
def get_trusted_package_set(
self,
trust_keyring_versions: Optional[list[str]] = None,
trust_gpg_keys: Optional[list[str]] = None,
exclude_keyring_versions: Optional[list[str]] = None,
exclude_gpg_keys: Optional[list[str]] = None,
) -> Optional[set[trusted_entry_t]]:
"""Return set of trusted (name, version) entries that pass trust filters.
Returns None if no trust filters are set (meaning all packages pass).
"""
has_filters = (
(trust_keyring_versions is not None and len(trust_keyring_versions) > 0)
or (trust_gpg_keys is not None and len(trust_gpg_keys) > 0)
or (exclude_keyring_versions is not None and len(exclude_keyring_versions) > 0)
or (exclude_gpg_keys is not None and len(exclude_gpg_keys) > 0)
)
if not has_filters:
return None
cur = self._conn.cursor()
cur.execute(
'SELECT lp.name, lp.version, ls.keyring_package_version, ls.gpg_key_id '
'FROM local_packages lp '
'JOIN local_signatures ls ON ls.local_package_id = lp.id'
)
trusted: set[trusted_entry_t] = set()
for row_raw in cur.fetchall():
keyring_ver = row_raw[2]
gpg_key = row_raw[3]
if exclude_keyring_versions and keyring_ver in exclude_keyring_versions:
continue
if exclude_gpg_keys and gpg_key in exclude_gpg_keys:
continue
is_trusted = False
if trust_keyring_versions and keyring_ver in trust_keyring_versions:
is_trusted = True
if trust_gpg_keys and gpg_key in trust_gpg_keys:
is_trusted = True
if not trust_keyring_versions and not trust_gpg_keys:
is_trusted = True
if is_trusted:
trusted.add(trusted_entry_t(name=row_raw[0], version=row_raw[1]))
return trusted
# ── status ──
def has_data(self) -> bool:
cur = self._conn.cursor()
cur.execute('SELECT COUNT(*) FROM snapshots')
row = cur.fetchone()
return row is not None and row[0] > 0
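
A short sketch of how the deleted cache_db_t API was consumed (import path assumed; the schema above is created on first open):

    import pathlib
    from online.fxreader.pr34.archlinux.cache_db import cache_db_t

    db = cache_db_t(pathlib.Path('archlinux_cache.db'))
    try:
        if db.has_data():
            # package_version_row_t rows: date, repo, name, version
            for row in db.get_package_versions(['linux', 'pacman']):
                print(row.date, row.repo, row.name, row.version)
    finally:
        db.close()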

cli.py
@@ -1,442 +0,0 @@
import argparse
import enum
import logging
import math
import pathlib
import re
import subprocess
import sys
import urllib.request
from typing import (
ClassVar,
Optional,
)
logger = logging.getLogger(__name__)
class Command(enum.Enum):
list_installed = 'list-installed'
compile = 'compile'
download = 'download'
archive = 'archive'
class parse_rate_t:
class constants_t:
rate_re: ClassVar[re.Pattern[str]] = re.compile(r'^(\d+(?:\.\d+)?)\s*([bBkKmMgGpPtT]?)(?:[iI]?[bB])?(?:/s)?$')
units: ClassVar[dict[str, int]] = {
'': 0,
'b': 0,
'B': 0,
'k': 1,
'K': 1,
'm': 2,
'M': 2,
'g': 3,
'G': 3,
't': 4,
'T': 4,
'p': 5,
'P': 5,
}
@staticmethod
def parse(s: str) -> int:
m = parse_rate_t.constants_t.rate_re.match(s.strip())
if not m:
raise ValueError('invalid rate: %s' % s)
value = float(m.group(1))
unit = m.group(2)
power = parse_rate_t.constants_t.units.get(unit, 0)
return int(value * (1024**power))
class downloader_t:
class constants_t:
class backend_t(enum.Enum):
urllib = 'urllib'
curl = 'curl'
aria2c = 'aria2c'
@staticmethod
def download(
url: str,
dest: pathlib.Path,
backend: 'downloader_t.constants_t.backend_t',
limit_rate: int,
) -> None:
dest.parent.mkdir(parents=True, exist_ok=True)
if backend is downloader_t.constants_t.backend_t.urllib:
urllib.request.urlretrieve(url, str(dest))
elif backend is downloader_t.constants_t.backend_t.curl:
cmd = [
'curl',
'-fSL',
'--limit-rate',
'%d' % limit_rate,
'-o',
str(dest),
url,
]
subprocess.check_call(cmd)
elif backend is downloader_t.constants_t.backend_t.aria2c:
cmd = [
'aria2c',
'--max-download-limit=%d' % limit_rate,
'-d',
str(dest.parent),
'-o',
dest.name,
url,
]
subprocess.check_call(cmd)
else:
raise NotImplementedError
class download_requirements_t:
@staticmethod
def parse_requirements(txt: str) -> list[tuple[str, str]]:
entries: list[tuple[str, str]] = []
url: Optional[str] = None
for line in txt.splitlines():
line = line.strip()
if line == '':
continue
if line.startswith('#'):
candidate = line[1:].strip()
if '://' in candidate:
url = candidate
continue
parts = line.split()
if len(parts) == 0:
continue
pkg_spec = parts[0]
if url is not None:
filename = url.rsplit('/', 1)[-1] if '/' in url else pkg_spec
entries.append((url, filename))
url = None
return entries
def _find_cached_pkg(
cache_dir: pathlib.Path,
name: str,
version: str,
) -> Optional[pathlib.Path]:
"""Find a cached .pkg.tar.* file for a given package name and version."""
for suffix in ['.pkg.tar.zst', '.pkg.tar.xz', '.pkg.tar.gz', '.pkg.tar.bz2', '.pkg.tar']:
for arch in ['x86_64', 'any']:
candidate = cache_dir / ('%s-%s-%s%s' % (name, version, arch, suffix))
if candidate.exists():
return candidate
return None
def main(argv: Optional[list[str]] = None) -> int:
if argv is None:
argv = sys.argv[1:]
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(
prog='online-fxreader-pr34-archlinux',
description='Arch Linux package management tools',
)
parser.add_argument(
'command',
choices=[o.value for o in Command],
)
options, args = parser.parse_known_args(argv)
options.command = Command(options.command)
if options.command is Command.list_installed:
import hashlib
from .pacman import pacman_t
list_parser = argparse.ArgumentParser()
list_parser.add_argument(
'--format',
choices=['plain', 'constraints', 'compiled'],
default='plain',
help='plain: name version; constraints: name>=version; compiled: name==version with optional hashes',
)
list_parser.add_argument(
'--generate-hashes',
action='store_true',
default=False,
help='include sha256 from local /var/cache/pacman/pkg/ files; fails if file not found for any package',
)
list_parser.add_argument(
'--db-path',
dest='db_path',
default='/var/lib/pacman',
help='pacman db path, default /var/lib/pacman',
)
list_parser.add_argument(
'--pkg-cache-dir',
dest='pkg_cache_dir',
default='/var/cache/pacman/pkg',
help='local pacman package cache directory, default /var/cache/pacman/pkg',
)
list_options = list_parser.parse_args(args)
installed = pacman_t.list_installed_simple(
db_path=pathlib.Path(list_options.db_path),
)
pkg_cache_dir = pathlib.Path(list_options.pkg_cache_dir)
if list_options.format == 'plain':
for name, version in installed:
print('%s %s' % (name, version))
elif list_options.format == 'constraints':
for name, version in installed:
print('%s>=%s' % (name, version))
elif list_options.format == 'compiled':
missing_hashes: list[str] = []
for name, version in installed:
line = '%s==%s' % (name, version)
if list_options.generate_hashes:
pkg_file = _find_cached_pkg(
pkg_cache_dir,
name,
version,
)
if pkg_file is not None:
h = hashlib.sha256()
with open(pkg_file, 'rb') as fh:
while True:
chunk = fh.read(65536)
if not chunk:
break
h.update(chunk)
line += ' --hash=sha256:%s' % h.hexdigest()
else:
missing_hashes.append(name)
print(line)
if len(missing_hashes) > 0:
logger.error(
"can't determine checksum of installed package(s) - no cached file found for %d package(s): %s" % (len(missing_hashes), missing_hashes)
)
return 1
return 0
elif options.command is Command.compile:
compile_parser = argparse.ArgumentParser()
compile_parser.add_argument(
'packages',
nargs='*',
)
compile_parser.add_argument(
'-r',
dest='requirements_file',
default=None,
help='path to file with package constraints (one per line)',
)
compile_parser.add_argument(
'--index',
dest='index_url',
default=None,
help='mirror URL',
)
compile_parser.add_argument(
'--archive-date',
dest='archive_date',
default=None,
help='Arch Linux Archive date (e.g. 2024/01/15)',
)
compile_parser.add_argument(
'--offline',
action='store_true',
default=False,
)
compile_parser.add_argument(
'--no-cache',
action='store_true',
default=False,
)
compile_parser.add_argument(
'--generate-hashes',
action='store_true',
default=False,
)
compile_parser.add_argument(
'--cache-dir',
dest='cache_dir',
default=None,
)
compile_parser.add_argument(
'--repos',
nargs='*',
default=['core', 'extra', 'multilib'],
)
compile_parser.add_argument(
'--arch',
default='x86_64',
)
compile_parser.add_argument(
'--backend',
choices=['python', 'solv'],
default='solv',
)
compile_parser.add_argument(
'--archive-cache',
dest='archive_cache',
default=None,
help='path to archive cache dir (with archlinux_cache.db from archive sync); loads all synced dates into the solver pool',
)
compile_parser.add_argument(
'--reference',
default=None,
help='path to previously compiled requirements file to use as version pins',
)
compile_parser.add_argument(
'--resolution-strategy',
dest='resolution_strategy',
choices=['upgrade-all', 'pin-referenced'],
default='upgrade-all',
help='upgrade-all: resolve fresh; pin-referenced: keep referenced versions, only upgrade explicitly requested packages',
)
compile_options = compile_parser.parse_args(args)
from .models import compile_options_t, resolution_strategy_t
packages: list[str] = list(compile_options.packages)
if compile_options.requirements_file is not None:
for line in pathlib.Path(compile_options.requirements_file).read_text().splitlines():
line = line.strip()
if line != '' and not line.startswith('#'):
packages.append(line)
opts = compile_options_t(
packages=packages,
index_url=compile_options.index_url,
archive_date=compile_options.archive_date,
offline=compile_options.offline,
no_cache=compile_options.no_cache,
generate_hashes=compile_options.generate_hashes,
repos=compile_options.repos,
arch=compile_options.arch,
cache_dir=compile_options.cache_dir,
reference=compile_options.reference,
resolution_strategy=resolution_strategy_t(compile_options.resolution_strategy),
)
try:
if compile_options.backend == 'solv':
from .solv_backend import compile_solv_t, repo_store_t
stores = None
if compile_options.archive_cache is not None:
from .cache_db import cache_db_t
archive_cache_dir = pathlib.Path(compile_options.archive_cache)
db_path = archive_cache_dir / 'archlinux_cache.db'
if db_path.exists():
cache_db = cache_db_t(db_path)
indices = cache_db.load_all_indices()
cache_db.close()
stores = [repo_store_t(index=idx) for idx in indices]
result = compile_solv_t.compile(opts, stores=stores)
else:
from .compile import compile_t
result = compile_t.compile(opts)
except RuntimeError as e:
logger.error(str(e))
return 1
print(result.txt)
return 0
elif options.command is Command.download:
download_parser = argparse.ArgumentParser()
download_parser.add_argument(
'-r',
dest='requirements',
required=True,
help='path to compiled requirements file',
)
download_parser.add_argument(
'-d',
dest='dest_dir',
required=True,
help='destination directory for downloaded packages',
)
download_parser.add_argument(
'--downloader',
choices=[o.value for o in downloader_t.constants_t.backend_t],
default='urllib',
)
download_parser.add_argument(
'--limit-rate',
dest='limit_rate',
default='128KiB/s',
help='download speed limit (e.g. 128KiB/s, 1MiB/s, 512K), default 128KiB/s',
)
download_options = download_parser.parse_args(args)
dest_dir = pathlib.Path(download_options.dest_dir)
dest_dir.mkdir(parents=True, exist_ok=True)
backend = downloader_t.constants_t.backend_t(download_options.downloader)
limit_rate = parse_rate_t.parse(download_options.limit_rate)
requirements_txt = pathlib.Path(download_options.requirements).read_text()
entries = download_requirements_t.parse_requirements(requirements_txt)
count = 0
for url, filename in entries:
dest_path = dest_dir / filename
if dest_path.exists():
logger.info(dict(msg='already downloaded', path=str(dest_path)))
else:
logger.info(dict(msg='downloading', url=url, dest=str(dest_path), backend=backend.value, limit_rate=limit_rate))
downloader_t.download(
url=url,
dest=dest_path,
backend=backend,
limit_rate=limit_rate,
)
count += 1
logger.info(dict(msg='download complete', count=count))
return 0
elif options.command is Command.archive:
from . import archive as _archive
return _archive.main(args)
else:
raise NotImplementedError
if __name__ == '__main__':
sys.exit(main())
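
The --limit-rate strings were handled by parse_rate_t above; a few worked examples of its behavior (import path assumed):

    from online.fxreader.pr34.archlinux.cli import parse_rate_t

    assert parse_rate_t.parse('128KiB/s') == 128 * 1024  # 131072 bytes/s, the default
    assert parse_rate_t.parse('1MiB/s') == 1024 ** 2
    assert parse_rate_t.parse('512K') == 512 * 1024
    assert parse_rate_t.parse('64') == 64  # a bare number means bytes/s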

compile.py
@@ -1,147 +0,0 @@
import io
import hashlib
import pathlib
import tempfile
import logging
from typing import (
Optional,
Any,
)
from .models import (
compile_options_t,
compile_entry_t,
compile_result_t,
mirror_config_t,
repo_index_t,
)
from .db import db_parser_t
from .pacman import pacman_t
from .resolver import resolver_t
logger = logging.getLogger(__name__)
class compile_t:
@staticmethod
def build_mirror_config(options: compile_options_t) -> mirror_config_t:
if options.archive_date is not None:
return mirror_config_t.from_archive_date(
date=options.archive_date,
repos=options.repos,
arch=options.arch,
)
elif options.index_url is not None:
return mirror_config_t.from_mirror_url(
mirror_url=options.index_url,
repos=options.repos,
arch=options.arch,
)
else:
return mirror_config_t.from_mirror_url(
mirror_url='https://archive.archlinux.org/repos/last',
repos=options.repos,
arch=options.arch,
)
@staticmethod
def fetch_indices(
mirror: mirror_config_t,
cache_dir: Optional[pathlib.Path] = None,
no_cache: bool = False,
offline: bool = False,
) -> list[repo_index_t]:
indices: list[repo_index_t] = []
for repo in mirror.repos:
db_url = '%s/%s.db' % (repo.url, repo.name)
if cache_dir is not None and not no_cache:
cached_path = cache_dir / ('%s.db' % repo.name)
if cached_path.exists():
logger.info(
dict(
repo=repo.name,
msg='using cached db',
path=str(cached_path),
)
)
index = db_parser_t.parse_db_path(cached_path, repo_name=repo.name)
indices.append(index)
continue
if offline:
raise FileNotFoundError('offline mode: cached db not found for %s at %s' % (repo.name, str(cached_path)))
pacman_t.download_db(db_url, cached_path)
index = db_parser_t.parse_db_path(cached_path, repo_name=repo.name)
indices.append(index)
else:
if offline:
raise FileNotFoundError('offline mode requires --cache-dir with pre-fetched db files')
with tempfile.NamedTemporaryFile(suffix='.db') as tmp:
pacman_t.download_db(db_url, pathlib.Path(tmp.name))
index = db_parser_t.parse_db_path(pathlib.Path(tmp.name), repo_name=repo.name)
indices.append(index)
return indices
@staticmethod
def compile(
options: compile_options_t,
) -> compile_result_t.res_t:
mirror = compile_t.build_mirror_config(options)
cache_dir: Optional[pathlib.Path] = None
if options.cache_dir is not None:
cache_dir = pathlib.Path(options.cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
indices = compile_t.fetch_indices(
mirror=mirror,
cache_dir=cache_dir,
no_cache=options.no_cache,
offline=options.offline,
)
resolved = resolver_t.resolve(
packages=options.packages,
indices=indices,
)
result = compile_result_t.res_t()
for pkg_name in resolved.resolution_order:
pkg = resolved.resolved[pkg_name]
repo_name = ''
for idx in indices:
if pkg_name in idx.packages:
repo_name = idx.name
break
repo_url = ''
for repo_cfg in mirror.repos:
if repo_cfg.name == repo_name:
repo_url = repo_cfg.url
break
entry = compile_entry_t(
name=pkg.name,
version=pkg.version,
filename=pkg.filename,
repo=repo_name,
url='%s/%s' % (repo_url, pkg.filename) if repo_url and pkg.filename else '',
sha256=pkg.sha256sum if options.generate_hashes else '',
depends=pkg.depends,
)
result.entries.append(entry)
result.txt = result.to_txt()
return result
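
A minimal sketch of the deleted python-backend flow (import paths assumed, and compile_options_t is assumed to default the fields not shown):

    from online.fxreader.pr34.archlinux.compile import compile_t
    from online.fxreader.pr34.archlinux.models import compile_options_t

    opts = compile_options_t(
        packages=['pacman>=6.0'],
        archive_date='2024/01/15',  # pins the Arch Linux Archive snapshot
        repos=['core', 'extra'],
        arch='x86_64',
    )
    # build_mirror_config -> fetch_indices -> resolver_t.resolve -> compile_result_t.res_t
    result = compile_t.compile(opts)
    print(result.txt)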

db.py
@@ -1,157 +0,0 @@
import io
import re
import tarfile
import logging
import pathlib
from typing import (
ClassVar,
Optional,
Any,
BinaryIO,
)
from .models import (
package_desc_t,
repo_index_t,
)
logger = logging.getLogger(__name__)
class db_parser_t:
class constants_t:
field_re: ClassVar[re.Pattern[str]] = re.compile(r'^%([A-Z0-9]+)%$')
list_fields: ClassVar[set[str]] = {
'LICENSE',
'DEPENDS',
'OPTDEPENDS',
'MAKEDEPENDS',
'CHECKDEPENDS',
'PROVIDES',
'CONFLICTS',
'REPLACES',
'GROUPS',
}
field_map: ClassVar[dict[str, str]] = {
'FILENAME': 'filename',
'NAME': 'name',
'VERSION': 'version',
'DESC': 'desc',
'CSIZE': 'csize',
'ISIZE': 'isize',
'MD5SUM': 'md5sum',
'SHA256SUM': 'sha256sum',
'URL': 'url',
'ARCH': 'arch',
'BUILDDATE': 'builddate',
'PACKAGER': 'packager',
'LICENSE': 'license',
'DEPENDS': 'depends',
'OPTDEPENDS': 'optdepends',
'MAKEDEPENDS': 'makedepends',
'CHECKDEPENDS': 'checkdepends',
'PROVIDES': 'provides',
'CONFLICTS': 'conflicts',
'REPLACES': 'replaces',
'GROUPS': 'groups',
'BASE': 'base',
}
int_fields: ClassVar[set[str]] = {
'CSIZE',
'ISIZE',
'BUILDDATE',
}
@staticmethod
def parse_desc(content: str) -> package_desc_t:
fields: dict[str, Any] = {}
lines = content.split('\n')
i = 0
while i < len(lines):
line = lines[i].strip()
if line == '':
i += 1
continue
m = db_parser_t.constants_t.field_re.match(line)
if not m:
i += 1
continue
field_name = m.group(1)
i += 1
values: list[str] = []
while i < len(lines) and lines[i].strip() != '':
values.append(lines[i].strip())
i += 1
attr_name = db_parser_t.constants_t.field_map.get(field_name)
if attr_name is None:
continue
if field_name in db_parser_t.constants_t.list_fields:
fields[attr_name] = values
elif field_name in db_parser_t.constants_t.int_fields:
fields[attr_name] = int(values[0]) if len(values) > 0 else 0
else:
fields[attr_name] = values[0] if len(values) > 0 else ''
if 'name' not in fields or 'version' not in fields:
raise ValueError('desc missing NAME or VERSION')
return package_desc_t(**fields)
@staticmethod
def parse_db(
f: BinaryIO,
repo_name: str = '',
) -> repo_index_t:
index = repo_index_t(name=repo_name)
with tarfile.open(fileobj=f, mode='r:*') as tar:
desc_members: list[tarfile.TarInfo] = []
for member in tar.getmembers():
if member.name.endswith('/desc') and member.isfile():
desc_members.append(member)
for member in desc_members:
extracted = tar.extractfile(member)
if extracted is None:
continue
content = extracted.read().decode('utf-8')
extracted.close()
try:
pkg = db_parser_t.parse_desc(content)
index.packages[pkg.name] = pkg
except ValueError:
logger.warning(
dict(
member=member.name,
msg='failed to parse desc',
)
)
index.build_provides_index()
return index
@staticmethod
def parse_db_path(
path: pathlib.Path,
repo_name: Optional[str] = None,
) -> repo_index_t:
if repo_name is None:
repo_name = path.stem.split('.')[0]
with io.open(path, 'rb') as f:
return db_parser_t.parse_db(f, repo_name=repo_name)
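
For reference, the %FIELD% block format consumed by parse_desc, as a worked example (import path assumed):

    from online.fxreader.pr34.archlinux.db import db_parser_t

    desc = '%NAME%\npacman\n\n%VERSION%\n6.0.2-7\n\n%DEPENDS%\nbash\nglibc\n'
    pkg = db_parser_t.parse_desc(desc)
    assert pkg.name == 'pacman' and pkg.version == '6.0.2-7'
    assert pkg.depends == ['bash', 'glibc']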

pacman.py
@@ -1,182 +0,0 @@
import re
import subprocess
import pathlib
import dataclasses
import logging
from typing import (
ClassVar,
Optional,
Any,
)
from .models import (
package_desc_t,
)
logger = logging.getLogger(__name__)
class pacman_t:
class constants_t:
default_db_path: ClassVar[pathlib.Path] = pathlib.Path('/var/lib/pacman')
default_cache_dir: ClassVar[pathlib.Path] = pathlib.Path('/var/cache/pacman/pkg')
field_re: ClassVar[re.Pattern[str]] = re.compile(r'^([A-Za-z ]+?)\s*:\s*(.*)$')
@dataclasses.dataclass
class query_entry_t:
name: str
version: str
description: str = ''
architecture: str = ''
url: str = ''
depends_on: list[str] = dataclasses.field(default_factory=lambda: list[str]())
provides: list[str] = dataclasses.field(default_factory=lambda: list[str]())
conflicts_with: list[str] = dataclasses.field(default_factory=lambda: list[str]())
replaces: list[str] = dataclasses.field(default_factory=lambda: list[str]())
install_size: str = ''
packager: str = ''
groups: list[str] = dataclasses.field(default_factory=lambda: list[str]())
class list_installed_t:
@dataclasses.dataclass
class res_t:
packages: list['pacman_t.query_entry_t'] = dataclasses.field(default_factory=lambda: list[pacman_t.query_entry_t]())
@staticmethod
def parse_info_block(block: str) -> 'pacman_t.query_entry_t':
fields: dict[str, list[str]] = {}
current_key: Optional[str] = None
for line in block.split('\n'):
m = pacman_t.constants_t.field_re.match(line)
if m:
current_key = m.group(1).strip()
value = m.group(2).strip()
assert isinstance(current_key, str)
if current_key not in fields:
fields[current_key] = []
if value and value != 'None':
fields[current_key].append(value)
elif current_key and line.startswith(' '):
value = line.strip()
if value and value != 'None':
fields[current_key].append(value)
name = fields.get('Name', [''])[0]
version = fields.get('Version', [''])[0]
if not name or not version:
raise ValueError('missing Name or Version in block')
return pacman_t.query_entry_t(
name=name,
version=version,
description=fields.get('Description', [''])[0] if fields.get('Description') else '',
architecture=fields.get('Architecture', [''])[0] if fields.get('Architecture') else '',
url=fields.get('URL', [''])[0] if fields.get('URL') else '',
depends_on=fields.get('Depends On', []),
provides=fields.get('Provides', []),
conflicts_with=fields.get('Conflicts With', []),
replaces=fields.get('Replaces', []),
install_size=fields.get('Installed Size', [''])[0] if fields.get('Installed Size') else '',
packager=fields.get('Packager', [''])[0] if fields.get('Packager') else '',
groups=fields.get('Groups', []),
)
@staticmethod
def list_installed(
db_path: Optional[pathlib.Path] = None,
) -> 'pacman_t.list_installed_t.res_t':
cmd: list[str] = ['pacman', '-Qi']
if db_path is not None:
cmd.extend(['--dbpath', str(db_path)])
output = subprocess.check_output(
cmd,
stderr=subprocess.DEVNULL,
).decode('utf-8')
blocks = output.split('\n\n')
result = pacman_t.list_installed_t.res_t()
for block in blocks:
block = block.strip()
if not block:
continue
try:
entry = pacman_t.parse_info_block(block)
result.packages.append(entry)
except ValueError:
logger.warning(
dict(
msg='failed to parse pacman info block',
)
)
return result
@staticmethod
def list_installed_simple(
db_path: Optional[pathlib.Path] = None,
) -> list[tuple[str, str]]:
cmd: list[str] = ['pacman', '-Q']
if db_path is not None:
cmd.extend(['--dbpath', str(db_path)])
output = subprocess.check_output(
cmd,
stderr=subprocess.DEVNULL,
).decode('utf-8')
result: list[tuple[str, str]] = []
for line in output.strip().split('\n'):
parts = line.strip().split(None, 1)
if len(parts) == 2:
result.append((parts[0], parts[1]))
return result
@staticmethod
def sync_db(
mirror_url: str,
db_path: pathlib.Path,
repos: Optional[list[str]] = None,
) -> None:
if repos is None:
repos = ['core', 'extra', 'multilib']
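# NOTE: mirror_url and repos are currently unused below;
# pacman -Sy reads mirrors and repos from its own pacman.conf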
cmd: list[str] = [
'pacman',
'-Sy',
'--dbpath',
str(db_path),
]
subprocess.check_call(cmd)
@staticmethod
def download_db(
url: str,
output_path: pathlib.Path,
) -> None:
import urllib.request
logger.info(
dict(
url=url,
output_path=str(output_path),
msg='downloading db',
)
)
output_path.parent.mkdir(parents=True, exist_ok=True)
urllib.request.urlretrieve(
url,
str(output_path),
)
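
A worked example of the pacman -Qi parsing above; note parse_info_block keeps each value line whole, and indented lines continue the previous field (import path assumed):

    from online.fxreader.pr34.archlinux.pacman import pacman_t

    block = 'Name            : pacman\nVersion         : 6.0.2-7\nDepends On      : bash\n                  glibc'
    entry = pacman_t.parse_info_block(block)
    assert (entry.name, entry.version) == ('pacman', '6.0.2-7')
    assert entry.depends_on == ['bash', 'glibc']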

resolver.py
@@ -1,161 +0,0 @@
import dataclasses
import logging
from typing import (
Optional,
Any,
)
from .models import (
package_desc_t,
package_constraint_t,
repo_index_t,
vercmp_t,
)
logger = logging.getLogger(__name__)
class resolver_t:
class error_t:
class not_found_t(Exception):
def __init__(self, name: str) -> None:
self.name = name
super().__init__('package not found: %s' % name)
class conflict_t(Exception):
def __init__(self, pkg_a: str, pkg_b: str, constraint: str) -> None:
self.pkg_a = pkg_a
self.pkg_b = pkg_b
self.constraint = constraint
super().__init__('conflict: %s conflicts with %s (%s)' % (pkg_a, pkg_b, constraint))
class unsatisfied_t(Exception):
def __init__(self, parent: str, dep: str) -> None:
self.parent = parent
self.dep = dep
super().__init__('unsatisfied dependency: %s requires %s' % (parent, dep))
@dataclasses.dataclass
class res_t:
resolved: dict[str, package_desc_t] = dataclasses.field(default_factory=lambda: dict[str, package_desc_t]())
resolution_order: list[str] = dataclasses.field(default_factory=lambda: list[str]())
@staticmethod
def _find_provider(
constraint: package_constraint_t,
indices: list[repo_index_t],
) -> Optional[tuple[package_desc_t, str]]:
for index in indices:
if constraint.name in index.packages:
pkg = index.packages[constraint.name]
if constraint.satisfied_by(pkg.version):
return (pkg, index.name)
for index in indices:
if constraint.name in index.provides_index:
for provider_name in index.provides_index[constraint.name]:
pkg = index.packages[provider_name]
for prov in pkg.parsed_provides():
if prov.name == constraint.name:
if constraint.version is None or prov.version is None:
return (pkg, index.name)
if constraint.satisfied_by(prov.version):
return (pkg, index.name)
return None
@staticmethod
def resolve(
packages: list[str],
indices: list[repo_index_t],
skip_installed: Optional[set[str]] = None,
) -> 'resolver_t.res_t':
if skip_installed is None:
skip_installed = set()
result = resolver_t.res_t()
visited: set[str] = set()
stack: list[tuple[package_constraint_t, Optional[str]]] = []
for pkg_str in packages:
constraint = package_constraint_t.parse(pkg_str)
stack.append((constraint, None))
while len(stack) > 0:
constraint, parent = stack.pop()
if constraint.name in visited:
if constraint.name in result.resolved:
pkg = result.resolved[constraint.name]
if not constraint.satisfied_by(pkg.version):
raise resolver_t.error_t.unsatisfied_t(
parent=parent or '<root>',
dep=constraint.to_str(),
)
continue
if constraint.name in skip_installed:
visited.add(constraint.name)
continue
found = resolver_t._find_provider(constraint, indices)
if found is None:
exists = any(constraint.name in idx.packages or constraint.name in idx.provides_index for idx in indices)
if exists:
raise resolver_t.error_t.unsatisfied_t(
parent=parent or '<root>',
dep=constraint.to_str(),
)
raise resolver_t.error_t.not_found_t(constraint.name)
pkg, repo_name = found
if pkg.name in visited:
if pkg.name in result.resolved and constraint.op is not None:
resolved_pkg = result.resolved[pkg.name]
if constraint.name == resolved_pkg.name:
if not constraint.satisfied_by(resolved_pkg.version):
raise resolver_t.error_t.unsatisfied_t(
parent=parent or '<root>',
dep=constraint.to_str(),
)
else:
matched = False
for prov in resolved_pkg.parsed_provides():
if prov.name == constraint.name:
if prov.version is not None and constraint.satisfied_by(prov.version):
matched = True
break
elif prov.version is None:
matched = True
break
if not matched:
raise resolver_t.error_t.unsatisfied_t(
parent=parent or '<root>',
dep=constraint.to_str(),
)
continue
visited.add(pkg.name)
visited.add(constraint.name)
result.resolved[pkg.name] = pkg
result.resolution_order.append(pkg.name)
for conflict in pkg.parsed_conflicts():
if conflict.name in result.resolved:
resolved_version = result.resolved[conflict.name].version
if conflict.satisfied_by(resolved_version):
raise resolver_t.error_t.conflict_t(
pkg_a=pkg.name,
pkg_b=conflict.name,
constraint=conflict.to_str(),
)
for dep in pkg.parsed_depends():
if dep.name not in visited and dep.name not in skip_installed:
stack.append((dep, pkg.name))
return result
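
A minimal sketch of driving the deleted resolver against a parsed index (import paths assumed; core.db fetched beforehand):

    import pathlib
    from online.fxreader.pr34.archlinux.db import db_parser_t
    from online.fxreader.pr34.archlinux.resolver import resolver_t

    indices = [db_parser_t.parse_db_path(pathlib.Path('core.db'))]
    res = resolver_t.resolve(packages=['pacman'], indices=indices)
    for name in res.resolution_order:  # discovery order: roots first, then dependencies
        print(name, res.resolved[name].version)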

solv_backend.py
@@ -1,416 +0,0 @@
import hashlib
import io
import logging
import pathlib
import re
from typing import (
ClassVar,
Optional,
Any,
)
from .models import (
package_desc_t,
repo_index_t,
compile_options_t,
compile_entry_t,
compile_result_t,
mirror_config_t,
resolution_strategy_t,
)
from .db import db_parser_t
from .compile import compile_t as compile_base_t
logger = logging.getLogger(__name__)
class repo_store_t:
class constants_t:
checksum_filename: ClassVar[str] = 'checksum.sha256'
def __init__(
self,
index: repo_index_t,
db_checksum: str = '',
) -> None:
self.index = index
self.db_checksum = db_checksum
@staticmethod
def _file_checksum(path: pathlib.Path) -> str:
h = hashlib.sha256()
with io.open(path, 'rb') as f:
while True:
chunk = f.read(65536)
if not chunk:
break
h.update(chunk)
return h.hexdigest()
@staticmethod
def from_db(
db_path: pathlib.Path,
repo_name: Optional[str] = None,
cache_dir: Optional[pathlib.Path] = None,
) -> 'repo_store_t':
if repo_name is None:
repo_name = db_path.stem.split('.')[0]
db_checksum = repo_store_t._file_checksum(db_path)
if cache_dir is not None:
solv_cache_path = cache_dir / ('%s.solv' % repo_name)
checksum_path = cache_dir / ('%s.solv.sha256' % repo_name)
index_cache_path = cache_dir / ('%s.index.solv' % repo_name)
if solv_cache_path.exists() and checksum_path.exists():
stored_checksum = checksum_path.read_text().strip()
if stored_checksum == db_checksum:
logger.info(
dict(
repo=repo_name,
msg='using cached solv',
path=str(solv_cache_path),
)
)
index = db_parser_t.parse_db_path(db_path, repo_name=repo_name)
return repo_store_t(
index=index,
db_checksum=db_checksum,
)
index = db_parser_t.parse_db_path(db_path, repo_name=repo_name)
return repo_store_t(
index=index,
db_checksum=db_checksum,
)
def write_solv_cache(
self,
cache_dir: pathlib.Path,
solv_repo: Any,
) -> None:
import solv
cache_dir.mkdir(parents=True, exist_ok=True)
solv_cache_path = cache_dir / ('%s.solv' % self.index.name)
checksum_path = cache_dir / ('%s.solv.sha256' % self.index.name)
f = solv.xfopen(str(solv_cache_path), 'w')
solv_repo.write(f)
f.close()
checksum_path.write_text(self.db_checksum)
logger.info(
dict(
repo=self.index.name,
msg='wrote solv cache',
path=str(solv_cache_path),
size=solv_cache_path.stat().st_size,
)
)
class solv_pool_t:
class constants_t:
dep_re: ClassVar[re.Pattern[str]] = re.compile(r'^([a-zA-Z0-9@._+\-]+?)(?:(>=|<=|>|<|=)(.+))?$')
def __init__(
self,
stores: Optional[list[repo_store_t]] = None,
cache_dir: Optional[pathlib.Path] = None,
) -> None:
import solv
self._solv = solv
self._pool = solv.Pool()
self._pool.setdisttype(solv.Pool.DISTTYPE_ARCH)
self._pool.setarch('x86_64')
self._rel_map = {
'>=': solv.REL_GT | solv.REL_EQ,
'<=': solv.REL_LT | solv.REL_EQ,
'>': solv.REL_GT,
'<': solv.REL_LT,
'=': solv.REL_EQ,
}
self._stores: list[repo_store_t] = []
if stores is not None:
for store in stores:
self.add_store(store, cache_dir=cache_dir)
self.finalize()
def _parse_dep(self, dep_str: str) -> Any:
m = solv_pool_t.constants_t.dep_re.match(dep_str.strip())
if not m:
return self._pool.str2id(dep_str)
name = m.group(1)
op = m.group(2)
ver = m.group(3)
name_id = self._pool.str2id(name)
if op and ver:
ver_id = self._pool.str2id(ver)
return self._pool.rel2id(name_id, ver_id, self._rel_map[op])
return name_id
def add_store(
self,
store: repo_store_t,
cache_dir: Optional[pathlib.Path] = None,
) -> None:
solv = self._solv
self._stores.append(store)
loaded_from_cache = False
if cache_dir is not None:
solv_cache_path = cache_dir / ('%s.solv' % store.index.name)
checksum_path = cache_dir / ('%s.solv.sha256' % store.index.name)
if solv_cache_path.exists() and checksum_path.exists():
stored_checksum = checksum_path.read_text().strip()
if stored_checksum == store.db_checksum:
repo = self._pool.add_repo(store.index.name)
f = solv.xfopen(str(solv_cache_path))
repo.add_solv(f)
f.close()
loaded_from_cache = True
logger.info(
dict(
repo=store.index.name,
msg='loaded solv from cache',
solvables=repo.nsolvables,
)
)
if not loaded_from_cache:
repo = self._pool.add_repo(store.index.name)
for pkg in store.index.packages.values():
s = repo.add_solvable()
s.name = pkg.name
s.evr = pkg.version
s.arch = 'noarch' if pkg.arch == 'any' else (pkg.arch or 'x86_64')
for dep_str in pkg.depends:
s.add_requires(self._parse_dep(dep_str))
for prov_str in pkg.provides:
s.add_provides(self._parse_dep(prov_str))
s.add_provides(self._pool.rel2id(s.nameid, s.evrid, solv.REL_EQ))
for conf_str in pkg.conflicts:
s.add_conflicts(self._parse_dep(conf_str))
repo.internalize()
if cache_dir is not None:
store.write_solv_cache(cache_dir, repo)
def finalize(self) -> None:
self._pool.createwhatprovides()
class resolve_t:
class res_t:
def __init__(self) -> None:
self.resolved: dict[str, Any] = {}
self.problems: list[str] = []
def expand_groups(
self,
packages: list[str],
) -> list[str]:
expanded: list[str] = []
for pkg_name in packages:
found_group = False
for store in self._stores:
if pkg_name in store.index.groups_index:
expanded.extend(store.index.groups_index[pkg_name])
found_group = True
break
if not found_group:
expanded.append(pkg_name)
return expanded
@staticmethod
def parse_reference(txt: str) -> dict[str, str]:
pinned: dict[str, str] = {}
for line in txt.splitlines():
line = line.strip()
if line == '' or line.startswith('#'):
continue
parts = line.split()
pkg_spec = parts[0]
if '==' in pkg_spec:
name, version = pkg_spec.split('==', 1)
pinned[name] = version
return pinned
def resolve(
self,
packages: list[str],
expand_groups: bool = True,
pinned: Optional[dict[str, str]] = None,
upgrade_packages: Optional[list[str]] = None,
) -> 'solv_pool_t.resolve_t.res_t':
solv = self._solv
if expand_groups:
packages = self.expand_groups(packages)
result = solv_pool_t.resolve_t.res_t()
solver = self._pool.Solver()
jobs: list[Any] = []
upgrade_set: set[str] = set()
if upgrade_packages is not None:
if expand_groups:
upgrade_packages = self.expand_groups(upgrade_packages)
upgrade_set = set(upgrade_packages)
for pkg_spec in packages:
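# strip any version operator (>=, <=, >, <, =) to recover the bare package name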
pkg_name = pkg_spec.split('>=')[0].split('<=')[0].split('>')[0].split('<')[0].split('=')[0]
if pinned is not None and pkg_name in pinned and pkg_name not in upgrade_set:
pinned_spec = '%s=%s' % (pkg_name, pinned[pkg_name])
dep = self._parse_dep(pinned_spec)
jobs.append(self._pool.Job(solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE_PROVIDES, dep))
else:
dep = self._parse_dep(pkg_spec)
sel = self._pool.select(pkg_name, solv.Selection.SELECTION_NAME | solv.Selection.SELECTION_PROVIDES)
if sel.isempty():
result.problems.append('package not found: %s' % pkg_spec)
continue
if pkg_name != pkg_spec:
jobs.append(self._pool.Job(solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE_PROVIDES, dep))
else:
jobs += sel.jobs(solv.Job.SOLVER_INSTALL)
if len(result.problems) > 0:
return result
problems = solver.solve(jobs)
if problems:
for p in problems:
result.problems.append(str(p))
return result
trans = solver.transaction()
for s in trans.newsolvables():
result.resolved[s.name] = s
return result
class compile_solv_t:
@staticmethod
def compile(
options: compile_options_t,
stores: Optional[list[repo_store_t]] = None,
) -> compile_result_t.res_t:
mirror = compile_base_t.build_mirror_config(options)
cache_dir: Optional[pathlib.Path] = None
if options.cache_dir is not None:
cache_dir = pathlib.Path(options.cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
if stores is None:
indices = compile_base_t.fetch_indices(
mirror=mirror,
cache_dir=cache_dir,
no_cache=options.no_cache,
offline=options.offline,
)
stores = [repo_store_t(index=idx) for idx in indices]
pool = solv_pool_t(stores=stores, cache_dir=cache_dir)
pinned: Optional[dict[str, str]] = None
upgrade_packages: Optional[list[str]] = None
if options.reference is not None:
ref_txt = pathlib.Path(options.reference).read_text()
pinned = solv_pool_t.parse_reference(ref_txt)
if options.resolution_strategy is resolution_strategy_t.pin_referenced:
upgrade_packages = options.packages
packages = list(pinned.keys()) + [p for p in options.packages if p not in pinned]
else:
packages = options.packages
else:
packages = options.packages
resolved = pool.resolve(
packages,
pinned=pinned if options.resolution_strategy is resolution_strategy_t.pin_referenced else None,
upgrade_packages=upgrade_packages,
)
if len(resolved.problems) > 0:
raise RuntimeError('resolution failed with %d problem(s):\n%s' % (len(resolved.problems), '\n'.join(resolved.problems)))
result = compile_result_t.res_t()
for pkg_name, solvable in resolved.resolved.items():
repo_name = solvable.repo.name if solvable.repo else ''
pkg_desc: Optional[package_desc_t] = None
for store in stores:
candidate = store.index.packages.get(pkg_name)
if candidate is not None and candidate.version == solvable.evr:
pkg_desc = candidate
if store.index.name == repo_name:
break
filename = pkg_desc.filename if pkg_desc else ''
sha256 = (pkg_desc.sha256sum if pkg_desc else '') if options.generate_hashes else ''
url = ''
if filename:
repo_url = ''
for repo_cfg in mirror.repos:
if repo_cfg.name == repo_name:
repo_url = repo_cfg.url
break
if repo_url:
url = '%s/%s' % (repo_url, filename)
else:
url = 'https://archive.archlinux.org/packages/%s/%s/%s' % (
pkg_name[0],
pkg_name,
filename,
)
entry = compile_entry_t(
name=pkg_name,
version=solvable.evr,
filename=filename,
repo=repo_name,
url=url,
sha256=sha256,
depends=pkg_desc.depends if pkg_desc else [],
)
result.entries.append(entry)
result.txt = result.to_txt()
return result
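
And a sketch of the libsolv-backed path selected by --backend solv, assuming the solv python bindings are installed (import path assumed):

    import pathlib
    from online.fxreader.pr34.archlinux.solv_backend import repo_store_t, solv_pool_t

    stores = [repo_store_t.from_db(pathlib.Path('core.db'))]
    pool = solv_pool_t(stores=stores)  # add_store() per repo, then createwhatprovides()
    res = pool.resolve(['base-devel'])  # group names expand via groups_index
    if len(res.problems) > 0:
        raise RuntimeError('\n'.join(res.problems))
    for name, solvable in res.resolved.items():
        print(name, solvable.evr)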