From 1e1cd6c1c0a3ade50bc453babfb0368ca45b9f35 Mon Sep 17 00:00:00 2001 From: LLM Date: Thu, 9 Apr 2026 09:00:00 +0000 Subject: [PATCH] [+] remove old pre-refactor archlinux modules 1. delete archive.py, cache_db.py, cli.py, compile.py, db.py, pacman.py, resolver.py, solv_backend.py; 2. all functionality moved to apps/, cli/, resolver/ subpackages; --- .../pr34/commands_typed/archlinux/archive.py | 294 -------- .../pr34/commands_typed/archlinux/cache_db.py | 689 ------------------ .../pr34/commands_typed/archlinux/cli.py | 442 ----------- .../pr34/commands_typed/archlinux/compile.py | 147 ---- .../pr34/commands_typed/archlinux/db.py | 157 ---- .../pr34/commands_typed/archlinux/pacman.py | 182 ----- .../pr34/commands_typed/archlinux/resolver.py | 161 ---- .../commands_typed/archlinux/solv_backend.py | 416 ----------- 8 files changed, 2488 deletions(-) delete mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/archive.py delete mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/cache_db.py delete mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/cli.py delete mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/compile.py delete mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/db.py delete mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/pacman.py delete mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/resolver.py delete mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/solv_backend.py diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/archive.py b/python/online/fxreader/pr34/commands_typed/archlinux/archive.py deleted file mode 100644 index 6ce5725..0000000 --- a/python/online/fxreader/pr34/commands_typed/archlinux/archive.py +++ /dev/null @@ -1,294 +0,0 @@ -import argparse -import datetime -import enum -import logging -import pathlib -import re - -from typing import ( - ClassVar, - Optional, -) - -from .cache_db import cache_db_t -from .db import db_parser_t -from .models import mirror_config_t -from .pacman import pacman_t - -logger = logging.getLogger(__name__) - - -class ArchiveAction(enum.Enum): - list_dates = 'list-dates' - list_packages = 'list-packages' - show_versions = 'show-versions' - sync = 'sync' - - -class archive_t: - class constants_t: - base_url: ClassVar[str] = 'https://archive.archlinux.org/repos/' - href_re: ClassVar[re.Pattern[str]] = re.compile(r'href="(\d{4}/\d{2}/\d{2})/"') - default_repos: ClassVar[list[str]] = ['core', 'extra', 'multilib'] - - @staticmethod - def list_remote_dates( - base_url: Optional[str] = None, - ) -> list[str]: - """Scrape available date directories from the archive index page.""" - import urllib.request - - if base_url is None: - base_url = archive_t.constants_t.base_url - - logger.info(dict(msg='fetching archive index', url=base_url)) - - with urllib.request.urlopen(base_url) as resp: - html = resp.read().decode('utf-8') - - dates: list[str] = [] - for m in archive_t.constants_t.href_re.finditer(html): - dates.append(m.group(1)) - - dates.sort(reverse=True) - return dates - - @staticmethod - def sync_date( - date: str, - cache_dir: pathlib.Path, - cache_db: cache_db_t, - repos: Optional[list[str]] = None, - arch: str = 'x86_64', - ) -> None: - if repos is None: - repos = list(archive_t.constants_t.default_repos) - - mirror = mirror_config_t.from_archive_date( - date=date, - repos=repos, - arch=arch, - ) - - db_dir = cache_dir / date - db_dir.mkdir(parents=True, exist_ok=True) - - for repo_cfg in mirror.repos: - db_url = '%s/%s.db' % (repo_cfg.url, repo_cfg.name) - db_path = db_dir / ('%s.db' % repo_cfg.name) - db_rel_path = '%s/%s.db' % (date, repo_cfg.name) - - if not db_path.exists(): - logger.info( - dict( - msg='downloading db', - url=db_url, - dest=str(db_path), - ) - ) - pacman_t.download_db(db_url, db_path) - else: - logger.info( - dict( - msg='db already cached on disk', - path=str(db_path), - ) - ) - - db_sha256 = cache_db_t.file_sha256(db_path) - - snapshot_id = cache_db.upsert_snapshot( - date=date, - repo=repo_cfg.name, - arch=arch, - db_sha256=db_sha256, - db_rel_path=db_rel_path, - ) - - if cache_db.snapshot_package_count(snapshot_id) > 0: - snap = cache_db.get_snapshot_by_id(snapshot_id) - if snap is not None and snap.db_sha256 == db_sha256: - logger.info( - dict( - msg='snapshot already in sqlite', - date=date, - repo=repo_cfg.name, - snapshot_id=snapshot_id, - ) - ) - continue - - index = db_parser_t.parse_db_path(db_path, repo_name=repo_cfg.name) - - cache_db.store_index( - snapshot_id=snapshot_id, - index=index, - ) - - logger.info( - dict( - msg='synced', - date=date, - repo=repo_cfg.name, - packages=len(index.packages), - ) - ) - - @staticmethod - def _parse_date(s: str) -> datetime.date: - parts = s.split('/') - if len(parts) == 3: - return datetime.date(int(parts[0]), int(parts[1]), int(parts[2])) - return datetime.date.fromisoformat(s) - - @staticmethod - def _format_date(d: datetime.date) -> str: - return '%04d/%02d/%02d' % (d.year, d.month, d.day) - - @staticmethod - def sync_date_range( - start_date: str, - end_date: str, - cache_dir: pathlib.Path, - cache_db: cache_db_t, - repos: Optional[list[str]] = None, - arch: str = 'x86_64', - step_days: int = 1, - ) -> None: - start = archive_t._parse_date(start_date) - end = archive_t._parse_date(end_date) - step = datetime.timedelta(days=step_days) - - current = end - while current >= start: - date_str = archive_t._format_date(current) - - try: - archive_t.sync_date( - date=date_str, - cache_dir=cache_dir, - cache_db=cache_db, - repos=repos, - arch=arch, - ) - except Exception: - logger.warning( - dict( - msg='failed to sync date, skipping', - date=date_str, - ), - exc_info=True, - ) - - current -= step - - -def main(args: list[str]) -> int: - archive_parser = argparse.ArgumentParser( - prog='online-fxreader-pr34-archlinux archive', - ) - archive_parser.add_argument( - 'action', - choices=[o.value for o in ArchiveAction], - ) - archive_parser.add_argument( - '--cache-dir', - dest='cache_dir', - required=True, - help='directory for cached .db files and sqlite database', - ) - archive_parser.add_argument( - '--repos', - nargs='*', - default=['core', 'extra', 'multilib'], - ) - archive_parser.add_argument( - '--arch', - default='x86_64', - ) - archive_parser.add_argument( - '--date', - default=None, - help='single date (e.g. 2024/01/15) for sync', - ) - archive_parser.add_argument( - '--date-range', - dest='date_range', - nargs=2, - metavar=('START', 'END'), - default=None, - help='date range for sync (e.g. 2024/01/01 2024/06/30)', - ) - archive_parser.add_argument( - '--date-step', - dest='date_step', - type=int, - default=1, - help='step in days when iterating date range, default 1', - ) - archive_parser.add_argument( - '--packages', - nargs='*', - default=None, - help='package names for show-versions', - ) - - archive_options = archive_parser.parse_args(args) - archive_options.action = ArchiveAction(archive_options.action) - - cache_dir = pathlib.Path(archive_options.cache_dir) - cache_dir.mkdir(parents=True, exist_ok=True) - - db = cache_db_t(cache_dir / 'archlinux_cache.db') - - try: - if archive_options.action is ArchiveAction.list_dates: - if db.has_data(): - print('=== cached dates ===') - for date_str in db.list_dates(): - print(date_str) - - print('=== remote dates ===') - for date_str in archive_t.list_remote_dates(): - print(date_str) - - elif archive_options.action is ArchiveAction.list_packages: - for row in db.package_count_per_date(): - print('%s %d' % (row.date, row.count)) - - elif archive_options.action is ArchiveAction.show_versions: - if archive_options.packages is None or len(archive_options.packages) == 0: - logger.error('--packages required for show-versions') - return 1 - - for row in db.get_package_versions(archive_options.packages): - print('%s %s %s %s' % (row.date, row.repo, row.name, row.version)) - - elif archive_options.action is ArchiveAction.sync: - if archive_options.date is not None: - archive_t.sync_date( - date=archive_options.date, - cache_dir=cache_dir, - cache_db=db, - repos=archive_options.repos, - arch=archive_options.arch, - ) - elif archive_options.date_range is not None: - archive_t.sync_date_range( - start_date=archive_options.date_range[0], - end_date=archive_options.date_range[1], - cache_dir=cache_dir, - cache_db=db, - repos=archive_options.repos, - arch=archive_options.arch, - step_days=archive_options.date_step, - ) - else: - logger.error('sync requires --date or --date-range') - return 1 - else: - raise NotImplementedError - finally: - db.close() - - return 0 diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/cache_db.py b/python/online/fxreader/pr34/commands_typed/archlinux/cache_db.py deleted file mode 100644 index f22e019..0000000 --- a/python/online/fxreader/pr34/commands_typed/archlinux/cache_db.py +++ /dev/null @@ -1,689 +0,0 @@ -import datetime -import hashlib -import io -import logging -import pathlib -import sqlite3 - -from typing import ( - ClassVar, - Generator, - Optional, - TypeVar, -) - -import pydantic - -from .models import ( - package_desc_t, - repo_index_t, -) - -logger = logging.getLogger(__name__) - -_T = TypeVar('_T', bound=pydantic.BaseModel) - - -class snapshot_row_t(pydantic.BaseModel): - id: int - date: str - repo: str - arch: str - db_sha256: str - db_rel_path: str - synced_at: str - - -class package_row_t(pydantic.BaseModel): - id: int - snapshot_id: int - name: str - version: str - base: str = '' - desc: str = '' - filename: str = '' - csize: int = 0 - isize: int = 0 - md5sum: str = '' - sha256sum: str = '' - url: str = '' - arch: str = '' - builddate: int = 0 - packager: str = '' - - -class package_version_row_t(pydantic.BaseModel): - date: str - repo: str - name: str - version: str - - -class date_count_row_t(pydantic.BaseModel): - date: str - count: int - - -class package_hash_row_t(pydantic.BaseModel): - sha256sum: str - - -class local_package_row_t(pydantic.BaseModel): - id: int - name: str - version: str - filename: str - sha256sum: str - local_path: str - downloaded_at: str - - -class signature_row_t(pydantic.BaseModel): - id: int - local_package_id: int - sig_path: str - keyring_package_version: Optional[str] = None - gpg_key_id: Optional[str] = None - verified_at: Optional[str] = None - - -class trusted_entry_t(pydantic.BaseModel, frozen=True): - name: str - version: str - - -def _stream_rows( - cur: sqlite3.Cursor, - model: type[_T], -) -> Generator[_T, None, None]: - columns = [desc[0] for desc in cur.description] - for raw in cur: - yield model.model_validate(dict(zip(columns, raw))) - - -def _fetch_one( - cur: sqlite3.Cursor, - model: type[_T], -) -> Optional[_T]: - columns = [desc[0] for desc in cur.description] - raw = cur.fetchone() - if raw is None: - return None - return model.model_validate(dict(zip(columns, raw))) - - -class cache_db_t: - class constants_t: - schema_version: ClassVar[int] = 1 - - list_relation_types: ClassVar[dict[str, str]] = { - 'license': 'license', - 'depends': 'depends', - 'optdepends': 'optdepends', - 'makedepends': 'makedepends', - 'checkdepends': 'checkdepends', - 'provides': 'provides', - 'conflicts': 'conflicts', - 'replaces': 'replaces', - 'groups': 'groups', - } - - def __init__(self, db_path: pathlib.Path) -> None: - self._db_path = db_path - self._conn = sqlite3.connect(str(db_path)) - self._conn.execute('PRAGMA journal_mode=WAL') - self._conn.execute('PRAGMA foreign_keys=ON') - self._ensure_schema() - - def close(self) -> None: - self._conn.close() - - def _ensure_schema(self) -> None: - cur = self._conn.cursor() - - cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='schema_meta'") - if cur.fetchone() is None: - self._create_schema(cur) - self._conn.commit() - return - - cur.execute('SELECT version FROM schema_meta LIMIT 1') - row = cur.fetchone() - if row is None or row[0] < cache_db_t.constants_t.schema_version: - self._create_schema(cur) - self._conn.commit() - - def _create_schema(self, cur: sqlite3.Cursor) -> None: - cur.executescript(""" - CREATE TABLE IF NOT EXISTS schema_meta ( - version INTEGER NOT NULL - ); - - CREATE TABLE IF NOT EXISTS snapshots ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - date TEXT NOT NULL, - repo TEXT NOT NULL, - arch TEXT NOT NULL DEFAULT 'x86_64', - db_sha256 TEXT NOT NULL, - db_rel_path TEXT NOT NULL DEFAULT '', - synced_at TEXT NOT NULL, - UNIQUE(date, repo, arch) - ); - - CREATE TABLE IF NOT EXISTS packages ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - snapshot_id INTEGER NOT NULL REFERENCES snapshots(id) ON DELETE CASCADE, - name TEXT NOT NULL, - version TEXT NOT NULL, - base TEXT NOT NULL DEFAULT '', - desc TEXT NOT NULL DEFAULT '', - filename TEXT NOT NULL DEFAULT '', - csize INTEGER NOT NULL DEFAULT 0, - isize INTEGER NOT NULL DEFAULT 0, - md5sum TEXT NOT NULL DEFAULT '', - sha256sum TEXT NOT NULL DEFAULT '', - url TEXT NOT NULL DEFAULT '', - arch TEXT NOT NULL DEFAULT '', - builddate INTEGER NOT NULL DEFAULT 0, - packager TEXT NOT NULL DEFAULT '', - UNIQUE(snapshot_id, name) - ); - - CREATE TABLE IF NOT EXISTS package_relations ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE, - relation_type TEXT NOT NULL, - value TEXT NOT NULL - ); - - CREATE TABLE IF NOT EXISTS local_packages ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL, - version TEXT NOT NULL, - filename TEXT NOT NULL, - sha256sum TEXT NOT NULL DEFAULT '', - local_path TEXT NOT NULL, - downloaded_at TEXT NOT NULL, - UNIQUE(name, version, filename) - ); - - CREATE TABLE IF NOT EXISTS local_signatures ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - local_package_id INTEGER NOT NULL REFERENCES local_packages(id) ON DELETE CASCADE, - sig_path TEXT NOT NULL, - keyring_package_version TEXT DEFAULT NULL, - gpg_key_id TEXT DEFAULT NULL, - verified_at TEXT DEFAULT NULL, - UNIQUE(local_package_id) - ); - - CREATE INDEX IF NOT EXISTS idx_packages_snapshot ON packages(snapshot_id); - CREATE INDEX IF NOT EXISTS idx_packages_name ON packages(name); - CREATE INDEX IF NOT EXISTS idx_packages_name_version ON packages(name, version); - CREATE INDEX IF NOT EXISTS idx_snapshots_date ON snapshots(date); - CREATE INDEX IF NOT EXISTS idx_package_relations_pkg - ON package_relations(package_id, relation_type); - CREATE INDEX IF NOT EXISTS idx_local_packages_name_version - ON local_packages(name, version); - """) - - cur.execute('DELETE FROM schema_meta') - cur.execute( - 'INSERT INTO schema_meta (version) VALUES (?)', - (cache_db_t.constants_t.schema_version,), - ) - - # ── helpers ── - - @staticmethod - def file_sha256(path: pathlib.Path) -> str: - h = hashlib.sha256() - with io.open(path, 'rb') as f: - while True: - chunk = f.read(65536) - if not chunk: - break - h.update(chunk) - return h.hexdigest() - - # ── snapshot CRUD ── - - def upsert_snapshot( - self, - date: str, - repo: str, - arch: str, - db_sha256: str, - db_rel_path: str = '', - ) -> int: - now = datetime.datetime.now(datetime.timezone.utc).isoformat() - cur = self._conn.cursor() - - cur.execute( - 'SELECT id, db_sha256 FROM snapshots WHERE date=? AND repo=? AND arch=?', - (date, repo, arch), - ) - row = cur.fetchone() - - if row is not None: - snapshot_id: int = row[0] - if row[1] == db_sha256: - return snapshot_id - - cur.execute( - 'DELETE FROM packages WHERE snapshot_id=?', - (snapshot_id,), - ) - cur.execute( - 'UPDATE snapshots SET db_sha256=?, db_rel_path=?, synced_at=? WHERE id=?', - (db_sha256, db_rel_path, now, snapshot_id), - ) - self._conn.commit() - return snapshot_id - - cur.execute( - 'INSERT INTO snapshots (date, repo, arch, db_sha256, db_rel_path, synced_at) VALUES (?, ?, ?, ?, ?, ?)', - (date, repo, arch, db_sha256, db_rel_path, now), - ) - self._conn.commit() - assert cur.lastrowid is not None - return cur.lastrowid - - def get_snapshot( - self, - date: str, - repo: str, - arch: str, - ) -> Optional[snapshot_row_t]: - cur = self._conn.cursor() - cur.execute( - 'SELECT * FROM snapshots WHERE date=? AND repo=? AND arch=?', - (date, repo, arch), - ) - return _fetch_one(cur, snapshot_row_t) - - def get_snapshot_by_id( - self, - snapshot_id: int, - ) -> Optional[snapshot_row_t]: - cur = self._conn.cursor() - cur.execute( - 'SELECT * FROM snapshots WHERE id=?', - (snapshot_id,), - ) - return _fetch_one(cur, snapshot_row_t) - - def list_snapshots(self) -> Generator[snapshot_row_t, None, None]: - cur = self._conn.cursor() - cur.execute('SELECT * FROM snapshots ORDER BY date DESC, repo') - yield from _stream_rows(cur, snapshot_row_t) - - def list_dates(self) -> list[str]: - cur = self._conn.cursor() - cur.execute('SELECT DISTINCT date FROM snapshots ORDER BY date DESC') - return [row[0] for row in cur.fetchall()] - - def snapshot_package_count(self, snapshot_id: int) -> int: - cur = self._conn.cursor() - cur.execute( - 'SELECT COUNT(*) FROM packages WHERE snapshot_id=?', - (snapshot_id,), - ) - row = cur.fetchone() - return row[0] if row is not None else 0 - - # ── package CRUD ── - - def store_index( - self, - snapshot_id: int, - index: repo_index_t, - ) -> None: - cur = self._conn.cursor() - - pkg_rows: list[tuple[int, str, str, str, str, str, int, int, str, str, str, str, int, str]] = [] - for pkg in index.packages.values(): - pkg_rows.append( - ( - snapshot_id, - pkg.name, - pkg.version, - pkg.base, - pkg.desc, - pkg.filename, - pkg.csize, - pkg.isize, - pkg.md5sum, - pkg.sha256sum, - pkg.url, - pkg.arch, - pkg.builddate, - pkg.packager, - ) - ) - - cur.executemany( - 'INSERT OR REPLACE INTO packages ' - '(snapshot_id, name, version, base, desc, filename, csize, isize, ' - 'md5sum, sha256sum, url, arch, builddate, packager) ' - 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', - pkg_rows, - ) - - cur.execute( - 'SELECT id, name FROM packages WHERE snapshot_id=?', - (snapshot_id,), - ) - pkg_id_map: dict[str, int] = {} - for row_raw in cur.fetchall(): - pkg_id_map[row_raw[1]] = row_raw[0] - - rel_rows: list[tuple[int, str, str]] = [] - for pkg in index.packages.values(): - pkg_id = pkg_id_map.get(pkg.name) - if pkg_id is None: - continue - - for rel_type, attr_name in cache_db_t.constants_t.list_relation_types.items(): - values: list[str] = getattr(pkg, attr_name) - for v in values: - rel_rows.append((pkg_id, rel_type, v)) - - if len(rel_rows) > 0: - cur.executemany( - 'INSERT INTO package_relations (package_id, relation_type, value) VALUES (?, ?, ?)', - rel_rows, - ) - - self._conn.commit() - - logger.info( - dict( - msg='stored index', - snapshot_id=snapshot_id, - packages=len(pkg_rows), - relations=len(rel_rows), - ) - ) - - def package_count_per_date(self) -> Generator[date_count_row_t, None, None]: - cur = self._conn.cursor() - cur.execute('SELECT s.date AS date, COUNT(p.id) AS count FROM snapshots s JOIN packages p ON p.snapshot_id = s.id GROUP BY s.date ORDER BY s.date DESC') - yield from _stream_rows(cur, date_count_row_t) - - def get_package_versions( - self, - names: list[str], - ) -> Generator[package_version_row_t, None, None]: - if len(names) == 0: - yield from () - return - - cur = self._conn.cursor() - placeholders = ','.join('?' for _ in names) - cur.execute( - 'SELECT s.date AS date, s.repo AS repo, p.name AS name, p.version AS version ' - 'FROM packages p ' - 'JOIN snapshots s ON s.id = p.snapshot_id ' - 'WHERE p.name IN (%s) ' - 'ORDER BY p.name, s.date DESC' % placeholders, - names, - ) - yield from _stream_rows(cur, package_version_row_t) - - def find_package_hash( - self, - name: str, - version: str, - ) -> Optional[package_hash_row_t]: - cur = self._conn.cursor() - cur.execute( - "SELECT sha256sum FROM packages WHERE name=? AND version=? AND sha256sum != '' ORDER BY snapshot_id DESC LIMIT 1", - (name, version), - ) - return _fetch_one(cur, package_hash_row_t) - - # ── repo_index_t loading ── - - def load_repo_index( - self, - snapshot_id: int, - repo_name: str, - ) -> repo_index_t: - cur = self._conn.cursor() - - cur.execute( - 'SELECT * FROM packages WHERE snapshot_id=?', - (snapshot_id,), - ) - - index = repo_index_t(name=repo_name) - - pkg_ids: list[int] = [] - pkg_by_id: dict[int, package_desc_t] = {} - - columns = [desc[0] for desc in cur.description] - for raw in cur.fetchall(): - row_dict = dict(zip(columns, raw)) - pkg = package_desc_t( - name=row_dict['name'], - version=row_dict['version'], - base=row_dict['base'], - desc=row_dict['desc'], - filename=row_dict['filename'], - csize=row_dict['csize'], - isize=row_dict['isize'], - md5sum=row_dict['md5sum'], - sha256sum=row_dict['sha256sum'], - url=row_dict['url'], - arch=row_dict['arch'], - builddate=row_dict['builddate'], - packager=row_dict['packager'], - ) - index.packages[pkg.name] = pkg - pkg_ids.append(row_dict['id']) - pkg_by_id[row_dict['id']] = pkg - - if len(pkg_ids) > 0: - self._load_relations(cur, pkg_ids, pkg_by_id) - - index.build_provides_index() - return index - - def load_all_indices(self) -> list[repo_index_t]: - """Load all snapshots as repo_index_t objects via bulk queries. - - Returns one index per (snapshot_id, repo) so the solver sees all - package versions across all synced dates. Uses two bulk queries - instead of per-snapshot loading for performance. - """ - cur = self._conn.cursor() - - cur.execute('SELECT * FROM snapshots ORDER BY date ASC') - snap_columns = [desc[0] for desc in cur.description] - snapshots = [dict(zip(snap_columns, raw)) for raw in cur.fetchall()] - - cur.execute( - 'SELECT id, snapshot_id, name, version, base, desc, filename, csize, isize, md5sum, sha256sum, url, arch, builddate, packager FROM packages' - ) - pkg_columns = [desc[0] for desc in cur.description] - - pkgs_by_snapshot: dict[int, dict[str, package_desc_t]] = {} - all_pkg_ids: list[int] = [] - pkg_by_id: dict[int, package_desc_t] = {} - - for raw in cur.fetchall(): - rd = dict(zip(pkg_columns, raw)) - pkg = package_desc_t( - name=rd['name'], - version=rd['version'], - base=rd['base'], - desc=rd['desc'], - filename=rd['filename'], - csize=rd['csize'], - isize=rd['isize'], - md5sum=rd['md5sum'], - sha256sum=rd['sha256sum'], - url=rd['url'], - arch=rd['arch'], - builddate=rd['builddate'], - packager=rd['packager'], - ) - snap_id: int = rd['snapshot_id'] - if snap_id not in pkgs_by_snapshot: - pkgs_by_snapshot[snap_id] = {} - pkgs_by_snapshot[snap_id][pkg.name] = pkg - all_pkg_ids.append(rd['id']) - pkg_by_id[rd['id']] = pkg - - if len(all_pkg_ids) > 0: - self._load_relations(cur, all_pkg_ids, pkg_by_id) - - indices: list[repo_index_t] = [] - for snap in snapshots: - pkgs = pkgs_by_snapshot.get(snap['id']) - if pkgs is None or len(pkgs) == 0: - continue - idx = repo_index_t(name=snap['repo'], packages=pkgs) - idx.build_provides_index() - indices.append(idx) - - return indices - - def _load_relations( - self, - cur: sqlite3.Cursor, - pkg_ids: list[int], - pkg_by_id: dict[int, package_desc_t], - ) -> None: - batch_size = 500 - for i in range(0, len(pkg_ids), batch_size): - batch = pkg_ids[i : i + batch_size] - placeholders = ','.join('?' for _ in batch) - cur.execute( - 'SELECT package_id, relation_type, value FROM package_relations WHERE package_id IN (%s)' % placeholders, - batch, - ) - for row_raw in cur.fetchall(): - pkg = pkg_by_id.get(row_raw[0]) - if pkg is None: - continue - - attr_name = cache_db_t.constants_t.list_relation_types.get(row_raw[1]) - if attr_name is None: - continue - - target_list: list[str] = getattr(pkg, attr_name) - target_list.append(row_raw[2]) - - # ── local packages & signatures ── - - def record_local_package( - self, - name: str, - version: str, - filename: str, - sha256sum: str, - local_path: str, - ) -> int: - now = datetime.datetime.now(datetime.timezone.utc).isoformat() - cur = self._conn.cursor() - cur.execute( - 'INSERT OR REPLACE INTO local_packages (name, version, filename, sha256sum, local_path, downloaded_at) VALUES (?, ?, ?, ?, ?, ?)', - (name, version, filename, sha256sum, local_path, now), - ) - self._conn.commit() - assert cur.lastrowid is not None - return cur.lastrowid - - def record_signature( - self, - local_package_id: int, - sig_path: str, - keyring_package_version: Optional[str] = None, - gpg_key_id: Optional[str] = None, - ) -> None: - now = datetime.datetime.now(datetime.timezone.utc).isoformat() - cur = self._conn.cursor() - cur.execute( - 'INSERT OR REPLACE INTO local_signatures (local_package_id, sig_path, keyring_package_version, gpg_key_id, verified_at) VALUES (?, ?, ?, ?, ?)', - (local_package_id, sig_path, keyring_package_version, gpg_key_id, now), - ) - self._conn.commit() - - def get_signature_info( - self, - name: str, - version: str, - ) -> Optional[signature_row_t]: - cur = self._conn.cursor() - cur.execute( - 'SELECT ls.id, ls.local_package_id, ls.sig_path, ' - 'ls.keyring_package_version, ls.gpg_key_id, ls.verified_at ' - 'FROM local_signatures ls ' - 'JOIN local_packages lp ON lp.id = ls.local_package_id ' - 'WHERE lp.name=? AND lp.version=?', - (name, version), - ) - return _fetch_one(cur, signature_row_t) - - def get_trusted_package_set( - self, - trust_keyring_versions: Optional[list[str]] = None, - trust_gpg_keys: Optional[list[str]] = None, - exclude_keyring_versions: Optional[list[str]] = None, - exclude_gpg_keys: Optional[list[str]] = None, - ) -> Optional[set[trusted_entry_t]]: - """Return set of trusted (name, version) entries that pass trust filters. - - Returns None if no trust filters are set (meaning all packages pass). - """ - has_filters = ( - (trust_keyring_versions is not None and len(trust_keyring_versions) > 0) - or (trust_gpg_keys is not None and len(trust_gpg_keys) > 0) - or (exclude_keyring_versions is not None and len(exclude_keyring_versions) > 0) - or (exclude_gpg_keys is not None and len(exclude_gpg_keys) > 0) - ) - if not has_filters: - return None - - cur = self._conn.cursor() - cur.execute( - 'SELECT lp.name, lp.version, ls.keyring_package_version, ls.gpg_key_id ' - 'FROM local_packages lp ' - 'JOIN local_signatures ls ON ls.local_package_id = lp.id' - ) - - trusted: set[trusted_entry_t] = set() - - for row_raw in cur.fetchall(): - keyring_ver = row_raw[2] - gpg_key = row_raw[3] - - if exclude_keyring_versions and keyring_ver in exclude_keyring_versions: - continue - if exclude_gpg_keys and gpg_key in exclude_gpg_keys: - continue - - is_trusted = False - - if trust_keyring_versions and keyring_ver in trust_keyring_versions: - is_trusted = True - if trust_gpg_keys and gpg_key in trust_gpg_keys: - is_trusted = True - - if not trust_keyring_versions and not trust_gpg_keys: - is_trusted = True - - if is_trusted: - trusted.add(trusted_entry_t(name=row_raw[0], version=row_raw[1])) - - return trusted - - # ── status ── - - def has_data(self) -> bool: - cur = self._conn.cursor() - cur.execute('SELECT COUNT(*) FROM snapshots') - row = cur.fetchone() - return row is not None and row[0] > 0 diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/cli.py b/python/online/fxreader/pr34/commands_typed/archlinux/cli.py deleted file mode 100644 index 971c012..0000000 --- a/python/online/fxreader/pr34/commands_typed/archlinux/cli.py +++ /dev/null @@ -1,442 +0,0 @@ -import argparse -import enum -import logging -import math -import pathlib -import re -import subprocess -import sys -import urllib.request - -from typing import ( - ClassVar, - Optional, -) - -logger = logging.getLogger(__name__) - - -class Command(enum.Enum): - list_installed = 'list-installed' - compile = 'compile' - download = 'download' - archive = 'archive' - - -class parse_rate_t: - class constants_t: - rate_re: ClassVar[re.Pattern[str]] = re.compile(r'^(\d+(?:\.\d+)?)\s*([bBkKmMgGpPtT]?)(?:[iI]?[bB])?(?:/s)?$') - - units: ClassVar[dict[str, int]] = { - '': 0, - 'b': 0, - 'B': 0, - 'k': 1, - 'K': 1, - 'm': 2, - 'M': 2, - 'g': 3, - 'G': 3, - 't': 4, - 'T': 4, - 'p': 5, - 'P': 5, - } - - @staticmethod - def parse(s: str) -> int: - m = parse_rate_t.constants_t.rate_re.match(s.strip()) - if not m: - raise ValueError('invalid rate: %s' % s) - - value = float(m.group(1)) - unit = m.group(2) - - power = parse_rate_t.constants_t.units.get(unit, 0) - - return int(value * (1024**power)) - - -class downloader_t: - class constants_t: - class backend_t(enum.Enum): - urllib = 'urllib' - curl = 'curl' - aria2c = 'aria2c' - - @staticmethod - def download( - url: str, - dest: pathlib.Path, - backend: 'downloader_t.constants_t.backend_t', - limit_rate: int, - ) -> None: - dest.parent.mkdir(parents=True, exist_ok=True) - - if backend is downloader_t.constants_t.backend_t.urllib: - urllib.request.urlretrieve(url, str(dest)) - elif backend is downloader_t.constants_t.backend_t.curl: - cmd = [ - 'curl', - '-fSL', - '--limit-rate', - '%d' % limit_rate, - '-o', - str(dest), - url, - ] - subprocess.check_call(cmd) - elif backend is downloader_t.constants_t.backend_t.aria2c: - cmd = [ - 'aria2c', - '--max-download-limit=%d' % limit_rate, - '-d', - str(dest.parent), - '-o', - dest.name, - url, - ] - subprocess.check_call(cmd) - else: - raise NotImplementedError - - -class download_requirements_t: - @staticmethod - def parse_requirements(txt: str) -> list[tuple[str, str]]: - entries: list[tuple[str, str]] = [] - url: Optional[str] = None - - for line in txt.splitlines(): - line = line.strip() - if line == '': - continue - if line.startswith('#'): - candidate = line[1:].strip() - if '/' in candidate and '://' in candidate: - url = candidate - continue - - parts = line.split() - if len(parts) == 0: - continue - - pkg_spec = parts[0] - - if url is not None: - filename = url.rsplit('/', 1)[-1] if '/' in url else pkg_spec - entries.append((url, filename)) - url = None - - return entries - - -def _find_cached_pkg( - cache_dir: pathlib.Path, - name: str, - version: str, -) -> Optional[pathlib.Path]: - """Find a cached .pkg.tar.* file for a given package name and version.""" - for suffix in ['.pkg.tar.zst', '.pkg.tar.xz', '.pkg.tar.gz', '.pkg.tar.bz2', '.pkg.tar']: - for arch in ['x86_64', 'any']: - candidate = cache_dir / ('%s-%s-%s%s' % (name, version, arch, suffix)) - if candidate.exists(): - return candidate - return None - - -def main(argv: Optional[list[str]] = None) -> int: - if argv is None: - argv = sys.argv[1:] - - logging.basicConfig(level=logging.INFO) - - parser = argparse.ArgumentParser( - prog='online-fxreader-pr34-archlinux', - description='Arch Linux package management tools', - ) - parser.add_argument( - 'command', - choices=[o.value for o in Command], - ) - - options, args = parser.parse_known_args(argv) - options.command = Command(options.command) - - if options.command is Command.list_installed: - import hashlib - - from .pacman import pacman_t - - list_parser = argparse.ArgumentParser() - list_parser.add_argument( - '--format', - choices=['plain', 'constraints', 'compiled'], - default='plain', - help='plain: name version; constraints: name>=version; compiled: name==version with optional hashes', - ) - list_parser.add_argument( - '--generate-hashes', - action='store_true', - default=False, - help='include sha256 from local /var/cache/pacman/pkg/ files; fails if file not found for any package', - ) - list_parser.add_argument( - '--db-path', - dest='db_path', - default='/var/lib/pacman', - help='pacman db path, default /var/lib/pacman', - ) - list_parser.add_argument( - '--pkg-cache-dir', - dest='pkg_cache_dir', - default='/var/cache/pacman/pkg', - help='local pacman package cache directory, default /var/cache/pacman/pkg', - ) - - list_options = list_parser.parse_args(args) - - installed = pacman_t.list_installed_simple( - db_path=pathlib.Path(list_options.db_path), - ) - - pkg_cache_dir = pathlib.Path(list_options.pkg_cache_dir) - - if list_options.format == 'plain': - for name, version in installed: - print('%s %s' % (name, version)) - elif list_options.format == 'constraints': - for name, version in installed: - print('%s>=%s' % (name, version)) - elif list_options.format == 'compiled': - missing_hashes: list[str] = [] - - for name, version in installed: - line = '%s==%s' % (name, version) - - if list_options.generate_hashes: - pkg_file = _find_cached_pkg( - pkg_cache_dir, - name, - version, - ) - - if pkg_file is not None: - h = hashlib.sha256() - with open(pkg_file, 'rb') as fh: - while True: - chunk = fh.read(65536) - if not chunk: - break - h.update(chunk) - line += ' --hash=sha256:%s' % h.hexdigest() - else: - missing_hashes.append(name) - - print(line) - - if len(missing_hashes) > 0: - logger.error( - "can't determine checksum of installed package(s) - no cached file found for %d package(s): %s" % (len(missing_hashes), missing_hashes) - ) - return 1 - - return 0 - elif options.command is Command.compile: - compile_parser = argparse.ArgumentParser() - compile_parser.add_argument( - 'packages', - nargs='*', - ) - compile_parser.add_argument( - '-r', - dest='requirements_file', - default=None, - help='path to file with package constraints (one per line)', - ) - compile_parser.add_argument( - '--index', - dest='index_url', - default=None, - help='mirror URL', - ) - compile_parser.add_argument( - '--archive-date', - dest='archive_date', - default=None, - help='Arch Linux Archive date (e.g. 2024/01/15)', - ) - compile_parser.add_argument( - '--offline', - action='store_true', - default=False, - ) - compile_parser.add_argument( - '--no-cache', - action='store_true', - default=False, - ) - compile_parser.add_argument( - '--generate-hashes', - action='store_true', - default=False, - ) - compile_parser.add_argument( - '--cache-dir', - dest='cache_dir', - default=None, - ) - compile_parser.add_argument( - '--repos', - nargs='*', - default=['core', 'extra', 'multilib'], - ) - compile_parser.add_argument( - '--arch', - default='x86_64', - ) - compile_parser.add_argument( - '--backend', - choices=['python', 'solv'], - default='solv', - ) - compile_parser.add_argument( - '--archive-cache', - dest='archive_cache', - default=None, - help='path to archive cache dir (with archlinux_cache.db from archive sync); loads all synced dates into the solver pool', - ) - compile_parser.add_argument( - '--reference', - default=None, - help='path to previously compiled requirements file to use as version pins', - ) - compile_parser.add_argument( - '--resolution-strategy', - dest='resolution_strategy', - choices=['upgrade-all', 'pin-referenced'], - default='upgrade-all', - help='upgrade-all: resolve fresh; pin-referenced: keep referenced versions, only upgrade explicitly requested packages', - ) - - compile_options = compile_parser.parse_args(args) - - from .models import compile_options_t, resolution_strategy_t - - packages: list[str] = list(compile_options.packages) - - if compile_options.requirements_file is not None: - for line in pathlib.Path(compile_options.requirements_file).read_text().splitlines(): - line = line.strip() - if line != '' and not line.startswith('#'): - packages.append(line) - - opts = compile_options_t( - packages=packages, - index_url=compile_options.index_url, - archive_date=compile_options.archive_date, - offline=compile_options.offline, - no_cache=compile_options.no_cache, - generate_hashes=compile_options.generate_hashes, - repos=compile_options.repos, - arch=compile_options.arch, - cache_dir=compile_options.cache_dir, - reference=compile_options.reference, - resolution_strategy=resolution_strategy_t(compile_options.resolution_strategy), - ) - - try: - if compile_options.backend == 'solv': - from .solv_backend import compile_solv_t, repo_store_t - - stores = None - if compile_options.archive_cache is not None: - from .cache_db import cache_db_t - - archive_cache_dir = pathlib.Path(compile_options.archive_cache) - db_path = archive_cache_dir / 'archlinux_cache.db' - if db_path.exists(): - cache_db = cache_db_t(db_path) - indices = cache_db.load_all_indices() - cache_db.close() - stores = [repo_store_t(index=idx) for idx in indices] - - result = compile_solv_t.compile(opts, stores=stores) - else: - from .compile import compile_t - - result = compile_t.compile(opts) - except RuntimeError as e: - logger.error(str(e)) - return 1 - - print(result.txt) - - return 0 - elif options.command is Command.download: - download_parser = argparse.ArgumentParser() - download_parser.add_argument( - '-r', - dest='requirements', - required=True, - help='path to compiled requirements file', - ) - download_parser.add_argument( - '-d', - dest='dest_dir', - required=True, - help='destination directory for downloaded packages', - ) - download_parser.add_argument( - '--downloader', - choices=[o.value for o in downloader_t.constants_t.backend_t], - default='urllib', - ) - download_parser.add_argument( - '--limit-rate', - dest='limit_rate', - default='128KiB/s', - help='download speed limit (e.g. 128KiB/s, 1MiB/s, 512K), default 128KiB/s', - ) - - download_options = download_parser.parse_args(args) - - dest_dir = pathlib.Path(download_options.dest_dir) - dest_dir.mkdir(parents=True, exist_ok=True) - - backend = downloader_t.constants_t.backend_t(download_options.downloader) - limit_rate = parse_rate_t.parse(download_options.limit_rate) - - requirements_txt = pathlib.Path(download_options.requirements).read_text() - entries = download_requirements_t.parse_requirements(requirements_txt) - - count = 0 - for url, filename in entries: - dest_path = dest_dir / filename - - if dest_path.exists(): - logger.info(dict(msg='already downloaded', path=str(dest_path))) - else: - logger.info(dict(msg='downloading', url=url, dest=str(dest_path), backend=backend.value, limit_rate=limit_rate)) - downloader_t.download( - url=url, - dest=dest_path, - backend=backend, - limit_rate=limit_rate, - ) - - count += 1 - - logger.info(dict(msg='download complete', count=count)) - - return 0 - elif options.command is Command.archive: - from . import archive as _archive - - return _archive.main(args) - else: - raise NotImplementedError - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/compile.py b/python/online/fxreader/pr34/commands_typed/archlinux/compile.py deleted file mode 100644 index b3ea697..0000000 --- a/python/online/fxreader/pr34/commands_typed/archlinux/compile.py +++ /dev/null @@ -1,147 +0,0 @@ -import io -import hashlib -import pathlib -import tempfile -import logging - -from typing import ( - Optional, - Any, -) - -from .models import ( - compile_options_t, - compile_entry_t, - compile_result_t, - mirror_config_t, - repo_index_t, -) - -from .db import db_parser_t -from .pacman import pacman_t -from .resolver import resolver_t - -logger = logging.getLogger(__name__) - - -class compile_t: - @staticmethod - def build_mirror_config(options: compile_options_t) -> mirror_config_t: - if options.archive_date is not None: - return mirror_config_t.from_archive_date( - date=options.archive_date, - repos=options.repos, - arch=options.arch, - ) - elif options.index_url is not None: - return mirror_config_t.from_mirror_url( - mirror_url=options.index_url, - repos=options.repos, - arch=options.arch, - ) - else: - return mirror_config_t.from_mirror_url( - mirror_url='https://archive.archlinux.org/repos/last', - repos=options.repos, - arch=options.arch, - ) - - @staticmethod - def fetch_indices( - mirror: mirror_config_t, - cache_dir: Optional[pathlib.Path] = None, - no_cache: bool = False, - offline: bool = False, - ) -> list[repo_index_t]: - indices: list[repo_index_t] = [] - - for repo in mirror.repos: - db_url = '%s/%s.db' % (repo.url, repo.name) - - if cache_dir is not None and not no_cache: - cached_path = cache_dir / ('%s.db' % repo.name) - - if cached_path.exists(): - logger.info( - dict( - repo=repo.name, - msg='using cached db', - path=str(cached_path), - ) - ) - index = db_parser_t.parse_db_path(cached_path, repo_name=repo.name) - indices.append(index) - continue - - if offline: - raise FileNotFoundError('offline mode: cached db not found for %s at %s' % (repo.name, str(cached_path))) - - pacman_t.download_db(db_url, cached_path) - index = db_parser_t.parse_db_path(cached_path, repo_name=repo.name) - indices.append(index) - else: - if offline: - raise FileNotFoundError('offline mode requires --cache-dir with pre-fetched db files') - - with tempfile.NamedTemporaryFile(suffix='.db') as tmp: - pacman_t.download_db(db_url, pathlib.Path(tmp.name)) - index = db_parser_t.parse_db_path(pathlib.Path(tmp.name), repo_name=repo.name) - indices.append(index) - - return indices - - @staticmethod - def compile( - options: compile_options_t, - ) -> compile_result_t.res_t: - mirror = compile_t.build_mirror_config(options) - - cache_dir: Optional[pathlib.Path] = None - if options.cache_dir is not None: - cache_dir = pathlib.Path(options.cache_dir) - cache_dir.mkdir(parents=True, exist_ok=True) - - indices = compile_t.fetch_indices( - mirror=mirror, - cache_dir=cache_dir, - no_cache=options.no_cache, - offline=options.offline, - ) - - resolved = resolver_t.resolve( - packages=options.packages, - indices=indices, - ) - - result = compile_result_t.res_t() - - for pkg_name in resolved.resolution_order: - pkg = resolved.resolved[pkg_name] - - repo_name = '' - for idx in indices: - if pkg_name in idx.packages: - repo_name = idx.name - break - - repo_url = '' - for repo_cfg in mirror.repos: - if repo_cfg.name == repo_name: - repo_url = repo_cfg.url - break - - entry = compile_entry_t( - name=pkg.name, - version=pkg.version, - filename=pkg.filename, - repo=repo_name, - url='%s/%s' % (repo_url, pkg.filename) if repo_url and pkg.filename else '', - sha256=pkg.sha256sum if options.generate_hashes else '', - depends=pkg.depends, - ) - - result.entries.append(entry) - - result.txt = result.to_txt() - - return result diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/db.py b/python/online/fxreader/pr34/commands_typed/archlinux/db.py deleted file mode 100644 index 715d358..0000000 --- a/python/online/fxreader/pr34/commands_typed/archlinux/db.py +++ /dev/null @@ -1,157 +0,0 @@ -import io -import re -import tarfile -import logging -import pathlib - -from typing import ( - ClassVar, - Optional, - Any, - BinaryIO, -) - -from .models import ( - package_desc_t, - repo_index_t, -) - -logger = logging.getLogger(__name__) - - -class db_parser_t: - class constants_t: - field_re: ClassVar[re.Pattern[str]] = re.compile(r'^%([A-Z0-9]+)%$') - - list_fields: ClassVar[set[str]] = { - 'LICENSE', - 'DEPENDS', - 'OPTDEPENDS', - 'MAKEDEPENDS', - 'CHECKDEPENDS', - 'PROVIDES', - 'CONFLICTS', - 'REPLACES', - 'GROUPS', - } - - field_map: ClassVar[dict[str, str]] = { - 'FILENAME': 'filename', - 'NAME': 'name', - 'VERSION': 'version', - 'DESC': 'desc', - 'CSIZE': 'csize', - 'ISIZE': 'isize', - 'MD5SUM': 'md5sum', - 'SHA256SUM': 'sha256sum', - 'URL': 'url', - 'ARCH': 'arch', - 'BUILDDATE': 'builddate', - 'PACKAGER': 'packager', - 'LICENSE': 'license', - 'DEPENDS': 'depends', - 'OPTDEPENDS': 'optdepends', - 'MAKEDEPENDS': 'makedepends', - 'CHECKDEPENDS': 'checkdepends', - 'PROVIDES': 'provides', - 'CONFLICTS': 'conflicts', - 'REPLACES': 'replaces', - 'GROUPS': 'groups', - 'BASE': 'base', - } - - int_fields: ClassVar[set[str]] = { - 'CSIZE', - 'ISIZE', - 'BUILDDATE', - } - - @staticmethod - def parse_desc(content: str) -> package_desc_t: - fields: dict[str, Any] = {} - lines = content.split('\n') - i = 0 - - while i < len(lines): - line = lines[i].strip() - - if line == '': - i += 1 - continue - - m = db_parser_t.constants_t.field_re.match(line) - if not m: - i += 1 - continue - - field_name = m.group(1) - i += 1 - - values: list[str] = [] - while i < len(lines) and lines[i].strip() != '': - values.append(lines[i].strip()) - i += 1 - - attr_name = db_parser_t.constants_t.field_map.get(field_name) - if attr_name is None: - continue - - if field_name in db_parser_t.constants_t.list_fields: - fields[attr_name] = values - elif field_name in db_parser_t.constants_t.int_fields: - fields[attr_name] = int(values[0]) if len(values) > 0 else 0 - else: - fields[attr_name] = values[0] if len(values) > 0 else '' - - if 'name' not in fields or 'version' not in fields: - raise ValueError('desc missing NAME or VERSION') - - return package_desc_t(**fields) - - @staticmethod - def parse_db( - f: BinaryIO, - repo_name: str = '', - ) -> repo_index_t: - index = repo_index_t(name=repo_name) - - with tarfile.open(fileobj=f, mode='r:*') as tar: - desc_members: list[tarfile.TarInfo] = [] - - for member in tar.getmembers(): - if member.name.endswith('/desc') and member.isfile(): - desc_members.append(member) - - for member in desc_members: - extracted = tar.extractfile(member) - if extracted is None: - continue - - content = extracted.read().decode('utf-8') - extracted.close() - - try: - pkg = db_parser_t.parse_desc(content) - index.packages[pkg.name] = pkg - except ValueError: - logger.warning( - dict( - member=member.name, - msg='failed to parse desc', - ) - ) - - index.build_provides_index() - - return index - - @staticmethod - def parse_db_path( - path: pathlib.Path, - repo_name: Optional[str] = None, - ) -> repo_index_t: - if repo_name is None: - repo_name = path.stem.split('.')[0] - - with io.open(path, 'rb') as f: - return db_parser_t.parse_db(f, repo_name=repo_name) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/pacman.py b/python/online/fxreader/pr34/commands_typed/archlinux/pacman.py deleted file mode 100644 index 3caf8ec..0000000 --- a/python/online/fxreader/pr34/commands_typed/archlinux/pacman.py +++ /dev/null @@ -1,182 +0,0 @@ -import re -import subprocess -import pathlib -import dataclasses -import logging - -from typing import ( - ClassVar, - Optional, - Any, -) - -from .models import ( - package_desc_t, -) - -logger = logging.getLogger(__name__) - - -class pacman_t: - class constants_t: - default_db_path: ClassVar[pathlib.Path] = pathlib.Path('/var/lib/pacman') - default_cache_dir: ClassVar[pathlib.Path] = pathlib.Path('/var/cache/pacman/pkg') - field_re: ClassVar[re.Pattern[str]] = re.compile(r'^([A-Za-z ]+?)\s*:\s*(.*)$') - - @dataclasses.dataclass - class query_entry_t: - name: str - version: str - description: str = '' - architecture: str = '' - url: str = '' - depends_on: list[str] = dataclasses.field(default_factory=lambda: list[str]()) - provides: list[str] = dataclasses.field(default_factory=lambda: list[str]()) - conflicts_with: list[str] = dataclasses.field(default_factory=lambda: list[str]()) - replaces: list[str] = dataclasses.field(default_factory=lambda: list[str]()) - install_size: str = '' - packager: str = '' - groups: list[str] = dataclasses.field(default_factory=lambda: list[str]()) - - class list_installed_t: - @dataclasses.dataclass - class res_t: - packages: list['pacman_t.query_entry_t'] = dataclasses.field(default_factory=lambda: list[pacman_t.query_entry_t]()) - - @staticmethod - def parse_info_block(block: str) -> 'pacman_t.query_entry_t': - fields: dict[str, list[str]] = {} - current_key: Optional[str] = None - - for line in block.split('\n'): - m = pacman_t.constants_t.field_re.match(line) - if m: - current_key = m.group(1).strip() - value = m.group(2).strip() - assert isinstance(current_key, str) - if current_key not in fields: - fields[current_key] = [] - if value and value != 'None': - fields[current_key].append(value) - elif current_key and line.startswith(' '): - value = line.strip() - if value and value != 'None': - fields[current_key].append(value) - - name = fields.get('Name', [''])[0] - version = fields.get('Version', [''])[0] - - if not name or not version: - raise ValueError('missing Name or Version in block') - - return pacman_t.query_entry_t( - name=name, - version=version, - description=fields.get('Description', [''])[0] if fields.get('Description') else '', - architecture=fields.get('Architecture', [''])[0] if fields.get('Architecture') else '', - url=fields.get('URL', [''])[0] if fields.get('URL') else '', - depends_on=fields.get('Depends On', []), - provides=fields.get('Provides', []), - conflicts_with=fields.get('Conflicts With', []), - replaces=fields.get('Replaces', []), - install_size=fields.get('Installed Size', [''])[0] if fields.get('Installed Size') else '', - packager=fields.get('Packager', [''])[0] if fields.get('Packager') else '', - groups=fields.get('Groups', []), - ) - - @staticmethod - def list_installed( - db_path: Optional[pathlib.Path] = None, - ) -> 'pacman_t.list_installed_t.res_t': - cmd: list[str] = ['pacman', '-Qi'] - - if db_path is not None: - cmd.extend(['--dbpath', str(db_path)]) - - output = subprocess.check_output( - cmd, - stderr=subprocess.DEVNULL, - ).decode('utf-8') - - blocks = output.split('\n\n') - result = pacman_t.list_installed_t.res_t() - - for block in blocks: - block = block.strip() - if not block: - continue - - try: - entry = pacman_t.parse_info_block(block) - result.packages.append(entry) - except ValueError: - logger.warning( - dict( - msg='failed to parse pacman info block', - ) - ) - - return result - - @staticmethod - def list_installed_simple( - db_path: Optional[pathlib.Path] = None, - ) -> list[tuple[str, str]]: - cmd: list[str] = ['pacman', '-Q'] - - if db_path is not None: - cmd.extend(['--dbpath', str(db_path)]) - - output = subprocess.check_output( - cmd, - stderr=subprocess.DEVNULL, - ).decode('utf-8') - - result: list[tuple[str, str]] = [] - - for line in output.strip().split('\n'): - parts = line.strip().split(None, 1) - if len(parts) == 2: - result.append((parts[0], parts[1])) - - return result - - @staticmethod - def sync_db( - mirror_url: str, - db_path: pathlib.Path, - repos: Optional[list[str]] = None, - ) -> None: - if repos is None: - repos = ['core', 'extra', 'multilib'] - - cmd: list[str] = [ - 'pacman', - '-Sy', - '--dbpath', - str(db_path), - ] - - subprocess.check_call(cmd) - - @staticmethod - def download_db( - url: str, - output_path: pathlib.Path, - ) -> None: - import urllib.request - - logger.info( - dict( - url=url, - output_path=str(output_path), - msg='downloading db', - ) - ) - - output_path.parent.mkdir(parents=True, exist_ok=True) - - urllib.request.urlretrieve( - url, - str(output_path), - ) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/resolver.py b/python/online/fxreader/pr34/commands_typed/archlinux/resolver.py deleted file mode 100644 index 7849009..0000000 --- a/python/online/fxreader/pr34/commands_typed/archlinux/resolver.py +++ /dev/null @@ -1,161 +0,0 @@ -import dataclasses -import logging - -from typing import ( - Optional, - Any, -) - -from .models import ( - package_desc_t, - package_constraint_t, - repo_index_t, - vercmp_t, -) - -logger = logging.getLogger(__name__) - - -class resolver_t: - class error_t: - class not_found_t(Exception): - def __init__(self, name: str) -> None: - self.name = name - super().__init__('package not found: %s' % name) - - class conflict_t(Exception): - def __init__(self, pkg_a: str, pkg_b: str, constraint: str) -> None: - self.pkg_a = pkg_a - self.pkg_b = pkg_b - self.constraint = constraint - super().__init__('conflict: %s conflicts with %s (%s)' % (pkg_a, pkg_b, constraint)) - - class unsatisfied_t(Exception): - def __init__(self, parent: str, dep: str) -> None: - self.parent = parent - self.dep = dep - super().__init__('unsatisfied dependency: %s requires %s' % (parent, dep)) - - @dataclasses.dataclass - class res_t: - resolved: dict[str, package_desc_t] = dataclasses.field(default_factory=lambda: dict[str, package_desc_t]()) - resolution_order: list[str] = dataclasses.field(default_factory=lambda: list[str]()) - - @staticmethod - def _find_provider( - constraint: package_constraint_t, - indices: list[repo_index_t], - ) -> Optional[tuple[package_desc_t, str]]: - for index in indices: - if constraint.name in index.packages: - pkg = index.packages[constraint.name] - if constraint.satisfied_by(pkg.version): - return (pkg, index.name) - - for index in indices: - if constraint.name in index.provides_index: - for provider_name in index.provides_index[constraint.name]: - pkg = index.packages[provider_name] - for prov in pkg.parsed_provides(): - if prov.name == constraint.name: - if constraint.version is None or prov.version is None: - return (pkg, index.name) - if constraint.satisfied_by(prov.version): - return (pkg, index.name) - - return None - - @staticmethod - def resolve( - packages: list[str], - indices: list[repo_index_t], - skip_installed: Optional[set[str]] = None, - ) -> 'resolver_t.res_t': - if skip_installed is None: - skip_installed = set() - - result = resolver_t.res_t() - visited: set[str] = set() - stack: list[tuple[package_constraint_t, Optional[str]]] = [] - - for pkg_str in packages: - constraint = package_constraint_t.parse(pkg_str) - stack.append((constraint, None)) - - while len(stack) > 0: - constraint, parent = stack.pop() - - if constraint.name in visited: - if constraint.name in result.resolved: - pkg = result.resolved[constraint.name] - if not constraint.satisfied_by(pkg.version): - raise resolver_t.error_t.unsatisfied_t( - parent=parent or '', - dep=constraint.to_str(), - ) - continue - - if constraint.name in skip_installed: - visited.add(constraint.name) - continue - - found = resolver_t._find_provider(constraint, indices) - - if found is None: - exists = any(constraint.name in idx.packages or constraint.name in idx.provides_index for idx in indices) - if exists: - raise resolver_t.error_t.unsatisfied_t( - parent=parent or '', - dep=constraint.to_str(), - ) - raise resolver_t.error_t.not_found_t(constraint.name) - - pkg, repo_name = found - - if pkg.name in visited: - if pkg.name in result.resolved and constraint.op is not None: - resolved_pkg = result.resolved[pkg.name] - if constraint.name == resolved_pkg.name: - if not constraint.satisfied_by(resolved_pkg.version): - raise resolver_t.error_t.unsatisfied_t( - parent=parent or '', - dep=constraint.to_str(), - ) - else: - matched = False - for prov in resolved_pkg.parsed_provides(): - if prov.name == constraint.name: - if prov.version is not None and constraint.satisfied_by(prov.version): - matched = True - break - elif prov.version is None: - matched = True - break - if not matched: - raise resolver_t.error_t.unsatisfied_t( - parent=parent or '', - dep=constraint.to_str(), - ) - continue - - visited.add(pkg.name) - visited.add(constraint.name) - - result.resolved[pkg.name] = pkg - result.resolution_order.append(pkg.name) - - for conflict in pkg.parsed_conflicts(): - if conflict.name in result.resolved: - resolved_version = result.resolved[conflict.name].version - if conflict.satisfied_by(resolved_version): - raise resolver_t.error_t.conflict_t( - pkg_a=pkg.name, - pkg_b=conflict.name, - constraint=conflict.to_str(), - ) - - for dep in pkg.parsed_depends(): - if dep.name not in visited and dep.name not in skip_installed: - stack.append((dep, pkg.name)) - - return result diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/solv_backend.py b/python/online/fxreader/pr34/commands_typed/archlinux/solv_backend.py deleted file mode 100644 index 3c89b80..0000000 --- a/python/online/fxreader/pr34/commands_typed/archlinux/solv_backend.py +++ /dev/null @@ -1,416 +0,0 @@ -import hashlib -import io -import logging -import pathlib -import re - -from typing import ( - ClassVar, - Optional, - Any, -) - -from .models import ( - package_desc_t, - repo_index_t, - compile_options_t, - compile_entry_t, - compile_result_t, - mirror_config_t, - resolution_strategy_t, -) - -from .db import db_parser_t -from .compile import compile_t as compile_base_t - -logger = logging.getLogger(__name__) - - -class repo_store_t: - class constants_t: - checksum_filename: ClassVar[str] = 'checksum.sha256' - - def __init__( - self, - index: repo_index_t, - db_checksum: str = '', - ) -> None: - self.index = index - self.db_checksum = db_checksum - - @staticmethod - def _file_checksum(path: pathlib.Path) -> str: - h = hashlib.sha256() - with io.open(path, 'rb') as f: - while True: - chunk = f.read(65536) - if not chunk: - break - h.update(chunk) - return h.hexdigest() - - @staticmethod - def from_db( - db_path: pathlib.Path, - repo_name: Optional[str] = None, - cache_dir: Optional[pathlib.Path] = None, - ) -> 'repo_store_t': - if repo_name is None: - repo_name = db_path.stem.split('.')[0] - - db_checksum = repo_store_t._file_checksum(db_path) - - if cache_dir is not None: - solv_cache_path = cache_dir / ('%s.solv' % repo_name) - checksum_path = cache_dir / ('%s.solv.sha256' % repo_name) - index_cache_path = cache_dir / ('%s.index.solv' % repo_name) - - if solv_cache_path.exists() and checksum_path.exists(): - stored_checksum = checksum_path.read_text().strip() - if stored_checksum == db_checksum: - logger.info( - dict( - repo=repo_name, - msg='using cached solv', - path=str(solv_cache_path), - ) - ) - - index = db_parser_t.parse_db_path(db_path, repo_name=repo_name) - - return repo_store_t( - index=index, - db_checksum=db_checksum, - ) - - index = db_parser_t.parse_db_path(db_path, repo_name=repo_name) - - return repo_store_t( - index=index, - db_checksum=db_checksum, - ) - - def write_solv_cache( - self, - cache_dir: pathlib.Path, - solv_repo: Any, - ) -> None: - import solv - - cache_dir.mkdir(parents=True, exist_ok=True) - solv_cache_path = cache_dir / ('%s.solv' % self.index.name) - checksum_path = cache_dir / ('%s.solv.sha256' % self.index.name) - - f = solv.xfopen(str(solv_cache_path), 'w') - solv_repo.write(f) - f.close() - - checksum_path.write_text(self.db_checksum) - - logger.info( - dict( - repo=self.index.name, - msg='wrote solv cache', - path=str(solv_cache_path), - size=solv_cache_path.stat().st_size, - ) - ) - - -class solv_pool_t: - class constants_t: - dep_re: ClassVar[re.Pattern[str]] = re.compile(r'^([a-zA-Z0-9@._+\-]+?)(?:(>=|<=|>|<|=)(.+))?$') - - def __init__( - self, - stores: Optional[list[repo_store_t]] = None, - cache_dir: Optional[pathlib.Path] = None, - ) -> None: - import solv - - self._solv = solv - self._pool = solv.Pool() - self._pool.setdisttype(solv.Pool.DISTTYPE_ARCH) - self._pool.setarch('x86_64') - self._rel_map = { - '>=': solv.REL_GT | solv.REL_EQ, - '<=': solv.REL_LT | solv.REL_EQ, - '>': solv.REL_GT, - '<': solv.REL_LT, - '=': solv.REL_EQ, - } - self._stores: list[repo_store_t] = [] - - if stores is not None: - for store in stores: - self.add_store(store, cache_dir=cache_dir) - self.finalize() - - def _parse_dep(self, dep_str: str) -> Any: - m = solv_pool_t.constants_t.dep_re.match(dep_str.strip()) - if not m: - return self._pool.str2id(dep_str) - - name = m.group(1) - op = m.group(2) - ver = m.group(3) - - name_id = self._pool.str2id(name) - - if op and ver: - ver_id = self._pool.str2id(ver) - return self._pool.rel2id(name_id, ver_id, self._rel_map[op]) - - return name_id - - def add_store( - self, - store: repo_store_t, - cache_dir: Optional[pathlib.Path] = None, - ) -> None: - solv = self._solv - - self._stores.append(store) - - loaded_from_cache = False - - if cache_dir is not None: - solv_cache_path = cache_dir / ('%s.solv' % store.index.name) - checksum_path = cache_dir / ('%s.solv.sha256' % store.index.name) - - if solv_cache_path.exists() and checksum_path.exists(): - stored_checksum = checksum_path.read_text().strip() - if stored_checksum == store.db_checksum: - repo = self._pool.add_repo(store.index.name) - f = solv.xfopen(str(solv_cache_path)) - repo.add_solv(f) - f.close() - loaded_from_cache = True - - logger.info( - dict( - repo=store.index.name, - msg='loaded solv from cache', - solvables=repo.nsolvables, - ) - ) - - if not loaded_from_cache: - repo = self._pool.add_repo(store.index.name) - for pkg in store.index.packages.values(): - s = repo.add_solvable() - s.name = pkg.name - s.evr = pkg.version - s.arch = 'noarch' if pkg.arch == 'any' else (pkg.arch or 'x86_64') - - for dep_str in pkg.depends: - s.add_requires(self._parse_dep(dep_str)) - - for prov_str in pkg.provides: - s.add_provides(self._parse_dep(prov_str)) - - s.add_provides(self._pool.rel2id(s.nameid, s.evrid, solv.REL_EQ)) - - for conf_str in pkg.conflicts: - s.add_conflicts(self._parse_dep(conf_str)) - - repo.internalize() - - if cache_dir is not None: - store.write_solv_cache(cache_dir, repo) - - def finalize(self) -> None: - self._pool.createwhatprovides() - - class resolve_t: - class res_t: - def __init__(self) -> None: - self.resolved: dict[str, Any] = {} - self.problems: list[str] = [] - - def expand_groups( - self, - packages: list[str], - ) -> list[str]: - expanded: list[str] = [] - for pkg_name in packages: - found_group = False - for store in self._stores: - if pkg_name in store.index.groups_index: - expanded.extend(store.index.groups_index[pkg_name]) - found_group = True - break - if not found_group: - expanded.append(pkg_name) - return expanded - - @staticmethod - def parse_reference(txt: str) -> dict[str, str]: - pinned: dict[str, str] = {} - for line in txt.splitlines(): - line = line.strip() - if line == '' or line.startswith('#'): - continue - parts = line.split() - pkg_spec = parts[0] - if '==' in pkg_spec: - name, version = pkg_spec.split('==', 1) - pinned[name] = version - return pinned - - def resolve( - self, - packages: list[str], - expand_groups: bool = True, - pinned: Optional[dict[str, str]] = None, - upgrade_packages: Optional[list[str]] = None, - ) -> 'solv_pool_t.resolve_t.res_t': - solv = self._solv - - if expand_groups: - packages = self.expand_groups(packages) - - result = solv_pool_t.resolve_t.res_t() - - solver = self._pool.Solver() - jobs: list[Any] = [] - - upgrade_set: set[str] = set() - if upgrade_packages is not None: - if expand_groups: - upgrade_packages = self.expand_groups(upgrade_packages) - upgrade_set = set(upgrade_packages) - - for pkg_spec in packages: - pkg_name = pkg_spec.split('>=')[0].split('<=')[0].split('>')[0].split('<')[0].split('=')[0] - - if pinned is not None and pkg_name in pinned and pkg_name not in upgrade_set: - pinned_spec = '%s=%s' % (pkg_name, pinned[pkg_name]) - dep = self._parse_dep(pinned_spec) - jobs.append(self._pool.Job(solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE_PROVIDES, dep)) - else: - dep = self._parse_dep(pkg_spec) - - sel = self._pool.select(pkg_name, solv.Selection.SELECTION_NAME | solv.Selection.SELECTION_PROVIDES) - if sel.isempty(): - result.problems.append('package not found: %s' % pkg_spec) - continue - - if pkg_name != pkg_spec: - jobs.append(self._pool.Job(solv.Job.SOLVER_INSTALL | solv.Job.SOLVER_SOLVABLE_PROVIDES, dep)) - else: - jobs += sel.jobs(solv.Job.SOLVER_INSTALL) - - if len(result.problems) > 0: - return result - - problems = solver.solve(jobs) - - if problems: - for p in problems: - result.problems.append(str(p)) - return result - - trans = solver.transaction() - for s in trans.newsolvables(): - result.resolved[s.name] = s - - return result - - -class compile_solv_t: - @staticmethod - def compile( - options: compile_options_t, - stores: Optional[list[repo_store_t]] = None, - ) -> compile_result_t.res_t: - mirror = compile_base_t.build_mirror_config(options) - - cache_dir: Optional[pathlib.Path] = None - if options.cache_dir is not None: - cache_dir = pathlib.Path(options.cache_dir) - cache_dir.mkdir(parents=True, exist_ok=True) - - if stores is None: - indices = compile_base_t.fetch_indices( - mirror=mirror, - cache_dir=cache_dir, - no_cache=options.no_cache, - offline=options.offline, - ) - stores = [repo_store_t(index=idx) for idx in indices] - - pool = solv_pool_t(stores=stores, cache_dir=cache_dir) - - pinned: Optional[dict[str, str]] = None - upgrade_packages: Optional[list[str]] = None - - if options.reference is not None: - ref_txt = pathlib.Path(options.reference).read_text() - pinned = solv_pool_t.parse_reference(ref_txt) - - if options.resolution_strategy is resolution_strategy_t.pin_referenced: - upgrade_packages = options.packages - packages = list(pinned.keys()) + [p for p in options.packages if p not in pinned] - else: - packages = options.packages - else: - packages = options.packages - - resolved = pool.resolve( - packages, - pinned=pinned if options.resolution_strategy is resolution_strategy_t.pin_referenced else None, - upgrade_packages=upgrade_packages, - ) - - if len(resolved.problems) > 0: - raise RuntimeError('resolution failed with %d problem(s):\n%s' % (len(resolved.problems), '\n'.join(resolved.problems))) - - result = compile_result_t.res_t() - - for pkg_name, solvable in resolved.resolved.items(): - repo_name = solvable.repo.name if solvable.repo else '' - - pkg_desc: Optional[package_desc_t] = None - for store in stores: - candidate = store.index.packages.get(pkg_name) - if candidate is not None and candidate.version == solvable.evr: - pkg_desc = candidate - if store.index.name == repo_name: - break - - filename = pkg_desc.filename if pkg_desc else '' - sha256 = (pkg_desc.sha256sum if pkg_desc else '') if options.generate_hashes else '' - - url = '' - if filename: - repo_url = '' - for repo_cfg in mirror.repos: - if repo_cfg.name == repo_name: - repo_url = repo_cfg.url - break - - if repo_url: - url = '%s/%s' % (repo_url, filename) - else: - url = 'https://archive.archlinux.org/packages/%s/%s/%s' % ( - pkg_name[0], - pkg_name, - filename, - ) - - entry = compile_entry_t( - name=pkg_name, - version=solvable.evr, - filename=filename, - repo=repo_name, - url=url, - sha256=sha256, - depends=pkg_desc.depends if pkg_desc else [], - ) - - result.entries.append(entry) - - result.txt = result.to_txt() - - return result