diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/archive.py b/python/online/fxreader/pr34/commands_typed/archlinux/archive.py new file mode 100644 index 0000000..6ce5725 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/archive.py @@ -0,0 +1,294 @@ +import argparse +import datetime +import enum +import logging +import pathlib +import re + +from typing import ( + ClassVar, + Optional, +) + +from .cache_db import cache_db_t +from .db import db_parser_t +from .models import mirror_config_t +from .pacman import pacman_t + +logger = logging.getLogger(__name__) + + +class ArchiveAction(enum.Enum): + list_dates = 'list-dates' + list_packages = 'list-packages' + show_versions = 'show-versions' + sync = 'sync' + + +class archive_t: + class constants_t: + base_url: ClassVar[str] = 'https://archive.archlinux.org/repos/' + href_re: ClassVar[re.Pattern[str]] = re.compile(r'href="(\d{4}/\d{2}/\d{2})/"') + default_repos: ClassVar[list[str]] = ['core', 'extra', 'multilib'] + + @staticmethod + def list_remote_dates( + base_url: Optional[str] = None, + ) -> list[str]: + """Scrape available date directories from the archive index page.""" + import urllib.request + + if base_url is None: + base_url = archive_t.constants_t.base_url + + logger.info(dict(msg='fetching archive index', url=base_url)) + + with urllib.request.urlopen(base_url) as resp: + html = resp.read().decode('utf-8') + + dates: list[str] = [] + for m in archive_t.constants_t.href_re.finditer(html): + dates.append(m.group(1)) + + dates.sort(reverse=True) + return dates + + @staticmethod + def sync_date( + date: str, + cache_dir: pathlib.Path, + cache_db: cache_db_t, + repos: Optional[list[str]] = None, + arch: str = 'x86_64', + ) -> None: + if repos is None: + repos = list(archive_t.constants_t.default_repos) + + mirror = mirror_config_t.from_archive_date( + date=date, + repos=repos, + arch=arch, + ) + + db_dir = cache_dir / date + db_dir.mkdir(parents=True, exist_ok=True) + + for repo_cfg in mirror.repos: + db_url = '%s/%s.db' % (repo_cfg.url, repo_cfg.name) + db_path = db_dir / ('%s.db' % repo_cfg.name) + db_rel_path = '%s/%s.db' % (date, repo_cfg.name) + + if not db_path.exists(): + logger.info( + dict( + msg='downloading db', + url=db_url, + dest=str(db_path), + ) + ) + pacman_t.download_db(db_url, db_path) + else: + logger.info( + dict( + msg='db already cached on disk', + path=str(db_path), + ) + ) + + db_sha256 = cache_db_t.file_sha256(db_path) + + snapshot_id = cache_db.upsert_snapshot( + date=date, + repo=repo_cfg.name, + arch=arch, + db_sha256=db_sha256, + db_rel_path=db_rel_path, + ) + + if cache_db.snapshot_package_count(snapshot_id) > 0: + snap = cache_db.get_snapshot_by_id(snapshot_id) + if snap is not None and snap.db_sha256 == db_sha256: + logger.info( + dict( + msg='snapshot already in sqlite', + date=date, + repo=repo_cfg.name, + snapshot_id=snapshot_id, + ) + ) + continue + + index = db_parser_t.parse_db_path(db_path, repo_name=repo_cfg.name) + + cache_db.store_index( + snapshot_id=snapshot_id, + index=index, + ) + + logger.info( + dict( + msg='synced', + date=date, + repo=repo_cfg.name, + packages=len(index.packages), + ) + ) + + @staticmethod + def _parse_date(s: str) -> datetime.date: + parts = s.split('/') + if len(parts) == 3: + return datetime.date(int(parts[0]), int(parts[1]), int(parts[2])) + return datetime.date.fromisoformat(s) + + @staticmethod + def _format_date(d: datetime.date) -> str: + return '%04d/%02d/%02d' % (d.year, d.month, d.day) + + @staticmethod + def 
sync_date_range( + start_date: str, + end_date: str, + cache_dir: pathlib.Path, + cache_db: cache_db_t, + repos: Optional[list[str]] = None, + arch: str = 'x86_64', + step_days: int = 1, + ) -> None: + start = archive_t._parse_date(start_date) + end = archive_t._parse_date(end_date) + step = datetime.timedelta(days=step_days) + + current = end + while current >= start: + date_str = archive_t._format_date(current) + + try: + archive_t.sync_date( + date=date_str, + cache_dir=cache_dir, + cache_db=cache_db, + repos=repos, + arch=arch, + ) + except Exception: + logger.warning( + dict( + msg='failed to sync date, skipping', + date=date_str, + ), + exc_info=True, + ) + + current -= step + + +def main(args: list[str]) -> int: + archive_parser = argparse.ArgumentParser( + prog='online-fxreader-pr34-archlinux archive', + ) + archive_parser.add_argument( + 'action', + choices=[o.value for o in ArchiveAction], + ) + archive_parser.add_argument( + '--cache-dir', + dest='cache_dir', + required=True, + help='directory for cached .db files and sqlite database', + ) + archive_parser.add_argument( + '--repos', + nargs='*', + default=['core', 'extra', 'multilib'], + ) + archive_parser.add_argument( + '--arch', + default='x86_64', + ) + archive_parser.add_argument( + '--date', + default=None, + help='single date (e.g. 2024/01/15) for sync', + ) + archive_parser.add_argument( + '--date-range', + dest='date_range', + nargs=2, + metavar=('START', 'END'), + default=None, + help='date range for sync (e.g. 2024/01/01 2024/06/30)', + ) + archive_parser.add_argument( + '--date-step', + dest='date_step', + type=int, + default=1, + help='step in days when iterating date range, default 1', + ) + archive_parser.add_argument( + '--packages', + nargs='*', + default=None, + help='package names for show-versions', + ) + + archive_options = archive_parser.parse_args(args) + archive_options.action = ArchiveAction(archive_options.action) + + cache_dir = pathlib.Path(archive_options.cache_dir) + cache_dir.mkdir(parents=True, exist_ok=True) + + db = cache_db_t(cache_dir / 'archlinux_cache.db') + + try: + if archive_options.action is ArchiveAction.list_dates: + if db.has_data(): + print('=== cached dates ===') + for date_str in db.list_dates(): + print(date_str) + + print('=== remote dates ===') + for date_str in archive_t.list_remote_dates(): + print(date_str) + + elif archive_options.action is ArchiveAction.list_packages: + for row in db.package_count_per_date(): + print('%s %d' % (row.date, row.count)) + + elif archive_options.action is ArchiveAction.show_versions: + if archive_options.packages is None or len(archive_options.packages) == 0: + logger.error('--packages required for show-versions') + return 1 + + for row in db.get_package_versions(archive_options.packages): + print('%s %s %s %s' % (row.date, row.repo, row.name, row.version)) + + elif archive_options.action is ArchiveAction.sync: + if archive_options.date is not None: + archive_t.sync_date( + date=archive_options.date, + cache_dir=cache_dir, + cache_db=db, + repos=archive_options.repos, + arch=archive_options.arch, + ) + elif archive_options.date_range is not None: + archive_t.sync_date_range( + start_date=archive_options.date_range[0], + end_date=archive_options.date_range[1], + cache_dir=cache_dir, + cache_db=db, + repos=archive_options.repos, + arch=archive_options.arch, + step_days=archive_options.date_step, + ) + else: + logger.error('sync requires --date or --date-range') + return 1 + else: + raise NotImplementedError + finally: + db.close() + + return 0 
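A minimal usage sketch for the archive entry point added above, driving main() programmatically instead of via a console script. This is a sketch only: the import path is an assumption derived from the file location under python/, and the cache directory name is arbitrary.

# Sketch: exercise the new archive CLI from Python.
# Assumption: the package is importable as below (source root = python/).
from online.fxreader.pr34.commands_typed.archlinux import archive

cache = './archlinux-cache'  # arbitrary local directory

# Sync the first week of 2024 for core/extra, newest date first,
# then inspect what landed in the sqlite cache.
archive.main(['sync', '--cache-dir', cache,
              '--date-range', '2024/01/01', '2024/01/07',
              '--repos', 'core', 'extra'])
archive.main(['list-dates', '--cache-dir', cache])
archive.main(['show-versions', '--cache-dir', cache,
              '--packages', 'glibc', 'openssl'])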
diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/cache_db.py b/python/online/fxreader/pr34/commands_typed/archlinux/cache_db.py new file mode 100644 index 0000000..03d60dc --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/cache_db.py @@ -0,0 +1,691 @@ +import datetime +import hashlib +import io +import logging +import pathlib +import sqlite3 + +from typing import ( + ClassVar, + Generator, + Optional, + TypeVar, +) + +import pydantic + +from .models import ( + package_desc_t, + repo_index_t, +) + +logger = logging.getLogger(__name__) + +_T = TypeVar('_T', bound=pydantic.BaseModel) + + +class snapshot_row_t(pydantic.BaseModel): + id: int + date: str + repo: str + arch: str + db_sha256: str + db_rel_path: str + synced_at: str + + +class package_row_t(pydantic.BaseModel): + id: int + snapshot_id: int + name: str + version: str + base: str = '' + desc: str = '' + filename: str = '' + csize: int = 0 + isize: int = 0 + md5sum: str = '' + sha256sum: str = '' + url: str = '' + arch: str = '' + builddate: int = 0 + packager: str = '' + + +class package_version_row_t(pydantic.BaseModel): + date: str + repo: str + name: str + version: str + + +class date_count_row_t(pydantic.BaseModel): + date: str + count: int + + +class package_hash_row_t(pydantic.BaseModel): + sha256sum: str + + +class local_package_row_t(pydantic.BaseModel): + id: int + name: str + version: str + filename: str + sha256sum: str + local_path: str + downloaded_at: str + + +class signature_row_t(pydantic.BaseModel): + id: int + local_package_id: int + sig_path: str + keyring_package_version: Optional[str] = None + gpg_key_id: Optional[str] = None + verified_at: Optional[str] = None + + +class trusted_entry_t(pydantic.BaseModel, frozen=True): + name: str + version: str + + +def _stream_rows( + cur: sqlite3.Cursor, + model: type[_T], +) -> Generator[_T, None, None]: + columns = [desc[0] for desc in cur.description] + for raw in cur: + yield model.model_validate(dict(zip(columns, raw))) + + +def _fetch_one( + cur: sqlite3.Cursor, + model: type[_T], +) -> Optional[_T]: + columns = [desc[0] for desc in cur.description] + raw = cur.fetchone() + if raw is None: + return None + return model.model_validate(dict(zip(columns, raw))) + + +class cache_db_t: + class constants_t: + schema_version: ClassVar[int] = 1 + + list_relation_types: ClassVar[dict[str, str]] = { + 'license': 'license', + 'depends': 'depends', + 'optdepends': 'optdepends', + 'makedepends': 'makedepends', + 'checkdepends': 'checkdepends', + 'provides': 'provides', + 'conflicts': 'conflicts', + 'replaces': 'replaces', + 'groups': 'groups', + } + + def __init__(self, db_path: pathlib.Path) -> None: + self._db_path = db_path + self._conn = sqlite3.connect(str(db_path)) + self._conn.execute('PRAGMA journal_mode=WAL') + self._conn.execute('PRAGMA foreign_keys=ON') + self._ensure_schema() + + def close(self) -> None: + self._conn.close() + + def _ensure_schema(self) -> None: + cur = self._conn.cursor() + + cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='schema_meta'") + if cur.fetchone() is None: + self._create_schema(cur) + self._conn.commit() + return + + cur.execute('SELECT version FROM schema_meta LIMIT 1') + row = cur.fetchone() + if row is None or row[0] < cache_db_t.constants_t.schema_version: + self._create_schema(cur) + self._conn.commit() + + def _create_schema(self, cur: sqlite3.Cursor) -> None: + cur.executescript(""" + CREATE TABLE IF NOT EXISTS schema_meta ( + version INTEGER NOT NULL + ); + + CREATE TABLE 
IF NOT EXISTS snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + date TEXT NOT NULL, + repo TEXT NOT NULL, + arch TEXT NOT NULL DEFAULT 'x86_64', + db_sha256 TEXT NOT NULL, + db_rel_path TEXT NOT NULL DEFAULT '', + synced_at TEXT NOT NULL, + UNIQUE(date, repo, arch) + ); + + CREATE TABLE IF NOT EXISTS packages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + snapshot_id INTEGER NOT NULL REFERENCES snapshots(id) ON DELETE CASCADE, + name TEXT NOT NULL, + version TEXT NOT NULL, + base TEXT NOT NULL DEFAULT '', + desc TEXT NOT NULL DEFAULT '', + filename TEXT NOT NULL DEFAULT '', + csize INTEGER NOT NULL DEFAULT 0, + isize INTEGER NOT NULL DEFAULT 0, + md5sum TEXT NOT NULL DEFAULT '', + sha256sum TEXT NOT NULL DEFAULT '', + url TEXT NOT NULL DEFAULT '', + arch TEXT NOT NULL DEFAULT '', + builddate INTEGER NOT NULL DEFAULT 0, + packager TEXT NOT NULL DEFAULT '', + UNIQUE(snapshot_id, name) + ); + + CREATE TABLE IF NOT EXISTS package_relations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE, + relation_type TEXT NOT NULL, + value TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS local_packages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + version TEXT NOT NULL, + filename TEXT NOT NULL, + sha256sum TEXT NOT NULL DEFAULT '', + local_path TEXT NOT NULL, + downloaded_at TEXT NOT NULL, + UNIQUE(name, version, filename) + ); + + CREATE TABLE IF NOT EXISTS local_signatures ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + local_package_id INTEGER NOT NULL REFERENCES local_packages(id) ON DELETE CASCADE, + sig_path TEXT NOT NULL, + keyring_package_version TEXT DEFAULT NULL, + gpg_key_id TEXT DEFAULT NULL, + verified_at TEXT DEFAULT NULL, + UNIQUE(local_package_id) + ); + + CREATE INDEX IF NOT EXISTS idx_packages_snapshot ON packages(snapshot_id); + CREATE INDEX IF NOT EXISTS idx_packages_name ON packages(name); + CREATE INDEX IF NOT EXISTS idx_packages_name_version ON packages(name, version); + CREATE INDEX IF NOT EXISTS idx_snapshots_date ON snapshots(date); + CREATE INDEX IF NOT EXISTS idx_package_relations_pkg + ON package_relations(package_id, relation_type); + CREATE INDEX IF NOT EXISTS idx_local_packages_name_version + ON local_packages(name, version); + """) + + cur.execute('DELETE FROM schema_meta') + cur.execute( + 'INSERT INTO schema_meta (version) VALUES (?)', + (cache_db_t.constants_t.schema_version,), + ) + + # ── helpers ── + + @staticmethod + def file_sha256(path: pathlib.Path) -> str: + h = hashlib.sha256() + with io.open(path, 'rb') as f: + while True: + chunk = f.read(65536) + if not chunk: + break + h.update(chunk) + return h.hexdigest() + + # ── snapshot CRUD ── + + def upsert_snapshot( + self, + date: str, + repo: str, + arch: str, + db_sha256: str, + db_rel_path: str = '', + ) -> int: + now = datetime.datetime.now(datetime.timezone.utc).isoformat() + cur = self._conn.cursor() + + cur.execute( + 'SELECT id, db_sha256 FROM snapshots WHERE date=? AND repo=? AND arch=?', + (date, repo, arch), + ) + row = cur.fetchone() + + if row is not None: + snapshot_id: int = row[0] + if row[1] == db_sha256: + return snapshot_id + + cur.execute( + 'DELETE FROM packages WHERE snapshot_id=?', + (snapshot_id,), + ) + cur.execute( + 'UPDATE snapshots SET db_sha256=?, db_rel_path=?, synced_at=? 
WHERE id=?', + (db_sha256, db_rel_path, now, snapshot_id), + ) + self._conn.commit() + return snapshot_id + + cur.execute( + 'INSERT INTO snapshots (date, repo, arch, db_sha256, db_rel_path, synced_at) VALUES (?, ?, ?, ?, ?, ?)', + (date, repo, arch, db_sha256, db_rel_path, now), + ) + self._conn.commit() + assert cur.lastrowid is not None + return cur.lastrowid + + def get_snapshot( + self, + date: str, + repo: str, + arch: str, + ) -> Optional[snapshot_row_t]: + cur = self._conn.cursor() + cur.execute( + 'SELECT * FROM snapshots WHERE date=? AND repo=? AND arch=?', + (date, repo, arch), + ) + return _fetch_one(cur, snapshot_row_t) + + def get_snapshot_by_id( + self, + snapshot_id: int, + ) -> Optional[snapshot_row_t]: + cur = self._conn.cursor() + cur.execute( + 'SELECT * FROM snapshots WHERE id=?', + (snapshot_id,), + ) + return _fetch_one(cur, snapshot_row_t) + + def list_snapshots(self) -> Generator[snapshot_row_t, None, None]: + cur = self._conn.cursor() + cur.execute('SELECT * FROM snapshots ORDER BY date DESC, repo') + yield from _stream_rows(cur, snapshot_row_t) + + def list_dates(self) -> list[str]: + cur = self._conn.cursor() + cur.execute('SELECT DISTINCT date FROM snapshots ORDER BY date DESC') + return [row[0] for row in cur.fetchall()] + + def snapshot_package_count(self, snapshot_id: int) -> int: + cur = self._conn.cursor() + cur.execute( + 'SELECT COUNT(*) FROM packages WHERE snapshot_id=?', + (snapshot_id,), + ) + row = cur.fetchone() + return row[0] if row is not None else 0 + + # ── package CRUD ── + + def store_index( + self, + snapshot_id: int, + index: repo_index_t, + ) -> None: + cur = self._conn.cursor() + + pkg_rows: list[tuple[int, str, str, str, str, str, int, int, str, str, str, str, int, str]] = [] + for pkg in index.packages.values(): + pkg_rows.append( + ( + snapshot_id, + pkg.name, + pkg.version, + pkg.base, + pkg.desc, + pkg.filename, + pkg.csize, + pkg.isize, + pkg.md5sum, + pkg.sha256sum, + pkg.url, + pkg.arch, + pkg.builddate, + pkg.packager, + ) + ) + + cur.executemany( + 'INSERT OR REPLACE INTO packages ' + '(snapshot_id, name, version, base, desc, filename, csize, isize, ' + 'md5sum, sha256sum, url, arch, builddate, packager) ' + 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', + pkg_rows, + ) + + cur.execute( + 'SELECT id, name FROM packages WHERE snapshot_id=?', + (snapshot_id,), + ) + pkg_id_map: dict[str, int] = {} + for row_raw in cur.fetchall(): + pkg_id_map[row_raw[1]] = row_raw[0] + + rel_rows: list[tuple[int, str, str]] = [] + for pkg in index.packages.values(): + pkg_id = pkg_id_map.get(pkg.name) + if pkg_id is None: + continue + + for rel_type, attr_name in cache_db_t.constants_t.list_relation_types.items(): + values: list[str] = getattr(pkg, attr_name) + for v in values: + rel_rows.append((pkg_id, rel_type, v)) + + if len(rel_rows) > 0: + cur.executemany( + 'INSERT INTO package_relations (package_id, relation_type, value) VALUES (?, ?, ?)', + rel_rows, + ) + + self._conn.commit() + + logger.info( + dict( + msg='stored index', + snapshot_id=snapshot_id, + packages=len(pkg_rows), + relations=len(rel_rows), + ) + ) + + def package_count_per_date(self) -> Generator[date_count_row_t, None, None]: + cur = self._conn.cursor() + cur.execute('SELECT s.date AS date, COUNT(p.id) AS count FROM snapshots s JOIN packages p ON p.snapshot_id = s.id GROUP BY s.date ORDER BY s.date DESC') + yield from _stream_rows(cur, date_count_row_t) + + def get_package_versions( + self, + names: list[str], + ) -> Generator[package_version_row_t, None, None]: + if 
len(names) == 0: + yield from () + return + + cur = self._conn.cursor() + placeholders = ','.join('?' for _ in names) + cur.execute( + 'SELECT s.date AS date, s.repo AS repo, p.name AS name, p.version AS version ' + 'FROM packages p ' + 'JOIN snapshots s ON s.id = p.snapshot_id ' + 'WHERE p.name IN (%s) ' + 'ORDER BY p.name, s.date DESC' % placeholders, + names, + ) + yield from _stream_rows(cur, package_version_row_t) + + def find_package_hash( + self, + name: str, + version: str, + ) -> Optional[package_hash_row_t]: + cur = self._conn.cursor() + cur.execute( + "SELECT sha256sum FROM packages WHERE name=? AND version=? AND sha256sum != '' ORDER BY snapshot_id DESC LIMIT 1", + (name, version), + ) + return _fetch_one(cur, package_hash_row_t) + + # ── repo_index_t loading ── + + def load_repo_index( + self, + snapshot_id: int, + repo_name: str, + ) -> repo_index_t: + cur = self._conn.cursor() + + cur.execute( + 'SELECT * FROM packages WHERE snapshot_id=?', + (snapshot_id,), + ) + + index = repo_index_t(name=repo_name) + + pkg_ids: list[int] = [] + pkg_by_id: dict[int, package_desc_t] = {} + + columns = [desc[0] for desc in cur.description] + for raw in cur.fetchall(): + row_dict = dict(zip(columns, raw)) + pkg = package_desc_t( + name=row_dict['name'], + version=row_dict['version'], + base=row_dict['base'], + desc=row_dict['desc'], + filename=row_dict['filename'], + csize=row_dict['csize'], + isize=row_dict['isize'], + md5sum=row_dict['md5sum'], + sha256sum=row_dict['sha256sum'], + url=row_dict['url'], + arch=row_dict['arch'], + builddate=row_dict['builddate'], + packager=row_dict['packager'], + ) + index.packages[pkg.name] = pkg + pkg_ids.append(row_dict['id']) + pkg_by_id[row_dict['id']] = pkg + + if len(pkg_ids) > 0: + self._load_relations(cur, pkg_ids, pkg_by_id) + + index.build_provides_index() + return index + + def load_all_indices(self) -> list[repo_index_t]: + """Load all snapshots as repo_index_t objects via bulk queries. + + Returns one index per (snapshot_id, repo) so the solver sees all + package versions across all synced dates. Uses two bulk queries + instead of per-snapshot loading for performance. 
+ """ + cur = self._conn.cursor() + + cur.execute('SELECT * FROM snapshots ORDER BY date ASC') + snap_columns = [desc[0] for desc in cur.description] + snapshots = [dict(zip(snap_columns, raw)) for raw in cur.fetchall()] + + cur.execute( + 'SELECT id, snapshot_id, name, version, base, desc, filename, ' + 'csize, isize, md5sum, sha256sum, url, arch, builddate, packager ' + 'FROM packages' + ) + pkg_columns = [desc[0] for desc in cur.description] + + pkgs_by_snapshot: dict[int, dict[str, package_desc_t]] = {} + all_pkg_ids: list[int] = [] + pkg_by_id: dict[int, package_desc_t] = {} + + for raw in cur.fetchall(): + rd = dict(zip(pkg_columns, raw)) + pkg = package_desc_t( + name=rd['name'], + version=rd['version'], + base=rd['base'], + desc=rd['desc'], + filename=rd['filename'], + csize=rd['csize'], + isize=rd['isize'], + md5sum=rd['md5sum'], + sha256sum=rd['sha256sum'], + url=rd['url'], + arch=rd['arch'], + builddate=rd['builddate'], + packager=rd['packager'], + ) + snap_id: int = rd['snapshot_id'] + if snap_id not in pkgs_by_snapshot: + pkgs_by_snapshot[snap_id] = {} + pkgs_by_snapshot[snap_id][pkg.name] = pkg + all_pkg_ids.append(rd['id']) + pkg_by_id[rd['id']] = pkg + + if len(all_pkg_ids) > 0: + self._load_relations(cur, all_pkg_ids, pkg_by_id) + + indices: list[repo_index_t] = [] + for snap in snapshots: + pkgs = pkgs_by_snapshot.get(snap['id']) + if pkgs is None or len(pkgs) == 0: + continue + idx = repo_index_t(name=snap['repo'], packages=pkgs) + idx.build_provides_index() + indices.append(idx) + + return indices + + def _load_relations( + self, + cur: sqlite3.Cursor, + pkg_ids: list[int], + pkg_by_id: dict[int, package_desc_t], + ) -> None: + batch_size = 500 + for i in range(0, len(pkg_ids), batch_size): + batch = pkg_ids[i : i + batch_size] + placeholders = ','.join('?' 
for _ in batch) + cur.execute( + 'SELECT package_id, relation_type, value FROM package_relations WHERE package_id IN (%s)' % placeholders, + batch, + ) + for row_raw in cur.fetchall(): + pkg = pkg_by_id.get(row_raw[0]) + if pkg is None: + continue + + attr_name = cache_db_t.constants_t.list_relation_types.get(row_raw[1]) + if attr_name is None: + continue + + target_list: list[str] = getattr(pkg, attr_name) + target_list.append(row_raw[2]) + + # ── local packages & signatures ── + + def record_local_package( + self, + name: str, + version: str, + filename: str, + sha256sum: str, + local_path: str, + ) -> int: + now = datetime.datetime.now(datetime.timezone.utc).isoformat() + cur = self._conn.cursor() + cur.execute( + 'INSERT OR REPLACE INTO local_packages (name, version, filename, sha256sum, local_path, downloaded_at) VALUES (?, ?, ?, ?, ?, ?)', + (name, version, filename, sha256sum, local_path, now), + ) + self._conn.commit() + assert cur.lastrowid is not None + return cur.lastrowid + + def record_signature( + self, + local_package_id: int, + sig_path: str, + keyring_package_version: Optional[str] = None, + gpg_key_id: Optional[str] = None, + ) -> None: + now = datetime.datetime.now(datetime.timezone.utc).isoformat() + cur = self._conn.cursor() + cur.execute( + 'INSERT OR REPLACE INTO local_signatures (local_package_id, sig_path, keyring_package_version, gpg_key_id, verified_at) VALUES (?, ?, ?, ?, ?)', + (local_package_id, sig_path, keyring_package_version, gpg_key_id, now), + ) + self._conn.commit() + + def get_signature_info( + self, + name: str, + version: str, + ) -> Optional[signature_row_t]: + cur = self._conn.cursor() + cur.execute( + 'SELECT ls.id, ls.local_package_id, ls.sig_path, ' + 'ls.keyring_package_version, ls.gpg_key_id, ls.verified_at ' + 'FROM local_signatures ls ' + 'JOIN local_packages lp ON lp.id = ls.local_package_id ' + 'WHERE lp.name=? AND lp.version=?', + (name, version), + ) + return _fetch_one(cur, signature_row_t) + + def get_trusted_package_set( + self, + trust_keyring_versions: Optional[list[str]] = None, + trust_gpg_keys: Optional[list[str]] = None, + exclude_keyring_versions: Optional[list[str]] = None, + exclude_gpg_keys: Optional[list[str]] = None, + ) -> Optional[set[trusted_entry_t]]: + """Return set of trusted (name, version) entries that pass trust filters. + + Returns None if no trust filters are set (meaning all packages pass). 
+ """ + has_filters = ( + (trust_keyring_versions is not None and len(trust_keyring_versions) > 0) + or (trust_gpg_keys is not None and len(trust_gpg_keys) > 0) + or (exclude_keyring_versions is not None and len(exclude_keyring_versions) > 0) + or (exclude_gpg_keys is not None and len(exclude_gpg_keys) > 0) + ) + if not has_filters: + return None + + cur = self._conn.cursor() + cur.execute( + 'SELECT lp.name, lp.version, ls.keyring_package_version, ls.gpg_key_id ' + 'FROM local_packages lp ' + 'JOIN local_signatures ls ON ls.local_package_id = lp.id' + ) + + trusted: set[trusted_entry_t] = set() + + for row_raw in cur.fetchall(): + keyring_ver = row_raw[2] + gpg_key = row_raw[3] + + if exclude_keyring_versions and keyring_ver in exclude_keyring_versions: + continue + if exclude_gpg_keys and gpg_key in exclude_gpg_keys: + continue + + is_trusted = False + + if trust_keyring_versions and keyring_ver in trust_keyring_versions: + is_trusted = True + if trust_gpg_keys and gpg_key in trust_gpg_keys: + is_trusted = True + + if not trust_keyring_versions and not trust_gpg_keys: + is_trusted = True + + if is_trusted: + trusted.add(trusted_entry_t(name=row_raw[0], version=row_raw[1])) + + return trusted + + # ── status ── + + def has_data(self) -> bool: + cur = self._conn.cursor() + cur.execute('SELECT COUNT(*) FROM snapshots') + row = cur.fetchone() + return row is not None and row[0] > 0
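For completeness, a short sketch of using cache_db_t directly, mirroring what archive_t.sync_date does for a single repo database. The import paths are assumptions (source root = python/); db_parser_t.parse_db_path and the cache_db_t methods are used exactly as in the diff above.

import pathlib

# Assumed import paths; adjust to the real package layout.
from online.fxreader.pr34.commands_typed.archlinux.cache_db import cache_db_t
from online.fxreader.pr34.commands_typed.archlinux.db import db_parser_t

cache_dir = pathlib.Path('./archlinux-cache')
cache_dir.mkdir(parents=True, exist_ok=True)

db = cache_db_t(cache_dir / 'archlinux_cache.db')
try:
    # Index an already-downloaded core.db for one archive date.
    db_path = cache_dir / '2024' / '01' / '15' / 'core.db'
    snapshot_id = db.upsert_snapshot(
        date='2024/01/15',
        repo='core',
        arch='x86_64',
        db_sha256=cache_db_t.file_sha256(db_path),
        db_rel_path='2024/01/15/core.db',
    )
    # Skip re-parsing if this snapshot's packages are already stored.
    if db.snapshot_package_count(snapshot_id) == 0:
        index = db_parser_t.parse_db_path(db_path, repo_name='core')
        db.store_index(snapshot_id=snapshot_id, index=index)

    # Query: every cached version of glibc, newest dates first per name.
    for row in db.get_package_versions(['glibc']):
        print(row.date, row.repo, row.name, row.version)
finally:
    db.close()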