[+] add sqlite cache and archive sync for archlinux package

1. add cache_db.py: sqlite3 cache layer with pydantic row models and typed row streaming;
2. add archive.py: archive sync logic plus list-dates, list-packages, show-versions cli actions;
3. cache_db stores snapshots, packages, relations, local_packages, signatures;
4. the archive cli subcommand is delegated to archive.main(), keeping cli.py thin;
5. sync downloads the .db files for each date, parses them into sqlite, and skips unchanged snapshots;
6. supports date ranges with a configurable step (see the usage sketch below);
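
Usage sketch (hedged): the flags mirror the argparse definitions in archive.py below; the import path and cache directory are assumptions, not part of this commit.

    import archive  # assumption: adjust to the package's real import path

    # sync one month of snapshots, stepping a week at a time
    rc = archive.main([
        'sync',
        '--cache-dir', '/tmp/archlinux-archive-cache',  # example path
        '--date-range', '2024/01/01', '2024/01/31',
        '--date-step', '7',
    ])
    assert rc == 0

    # then print the recorded versions for a couple of packages
    archive.main([
        'show-versions',
        '--cache-dir', '/tmp/archlinux-archive-cache',
        '--packages', 'linux', 'glibc',
    ])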
LLM 2026-04-03 16:45:34 +00:00
parent 64845d732d
commit b67a40936b
2 changed files with 985 additions and 0 deletions

archive.py
@@ -0,0 +1,294 @@
import argparse
import datetime
import enum
import logging
import pathlib
import re
from typing import (
ClassVar,
Optional,
)
from .cache_db import cache_db_t
from .db import db_parser_t
from .models import mirror_config_t
from .pacman import pacman_t
logger = logging.getLogger(__name__)
class ArchiveAction(enum.Enum):
list_dates = 'list-dates'
list_packages = 'list-packages'
show_versions = 'show-versions'
sync = 'sync'
class archive_t:
class constants_t:
base_url: ClassVar[str] = 'https://archive.archlinux.org/repos/'
href_re: ClassVar[re.Pattern[str]] = re.compile(r'href="(\d{4}/\d{2}/\d{2})/"')
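# e.g. an index entry like href="2024/01/15/" is matched and '2024/01/15' captured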
default_repos: ClassVar[list[str]] = ['core', 'extra', 'multilib']
@staticmethod
def list_remote_dates(
base_url: Optional[str] = None,
) -> list[str]:
"""Scrape available date directories from the archive index page."""
import urllib.request
if base_url is None:
base_url = archive_t.constants_t.base_url
logger.info(dict(msg='fetching archive index', url=base_url))
with urllib.request.urlopen(base_url) as resp:
html = resp.read().decode('utf-8')
dates: list[str] = []
for m in archive_t.constants_t.href_re.finditer(html):
dates.append(m.group(1))
dates.sort(reverse=True)
return dates
@staticmethod
def sync_date(
date: str,
cache_dir: pathlib.Path,
cache_db: cache_db_t,
repos: Optional[list[str]] = None,
arch: str = 'x86_64',
) -> None:
if repos is None:
repos = list(archive_t.constants_t.default_repos)
mirror = mirror_config_t.from_archive_date(
date=date,
repos=repos,
arch=arch,
)
db_dir = cache_dir / date
db_dir.mkdir(parents=True, exist_ok=True)
for repo_cfg in mirror.repos:
db_url = '%s/%s.db' % (repo_cfg.url, repo_cfg.name)
db_path = db_dir / ('%s.db' % repo_cfg.name)
db_rel_path = '%s/%s.db' % (date, repo_cfg.name)
if not db_path.exists():
logger.info(
dict(
msg='downloading db',
url=db_url,
dest=str(db_path),
)
)
pacman_t.download_db(db_url, db_path)
else:
logger.info(
dict(
msg='db already cached on disk',
path=str(db_path),
)
)
db_sha256 = cache_db_t.file_sha256(db_path)
snapshot_id = cache_db.upsert_snapshot(
date=date,
repo=repo_cfg.name,
arch=arch,
db_sha256=db_sha256,
db_rel_path=db_rel_path,
)
if cache_db.snapshot_package_count(snapshot_id) > 0:
snap = cache_db.get_snapshot_by_id(snapshot_id)
if snap is not None and snap.db_sha256 == db_sha256:
logger.info(
dict(
msg='snapshot already in sqlite',
date=date,
repo=repo_cfg.name,
snapshot_id=snapshot_id,
)
)
continue
index = db_parser_t.parse_db_path(db_path, repo_name=repo_cfg.name)
cache_db.store_index(
snapshot_id=snapshot_id,
index=index,
)
logger.info(
dict(
msg='synced',
date=date,
repo=repo_cfg.name,
packages=len(index.packages),
)
)
@staticmethod
def _parse_date(s: str) -> datetime.date:
parts = s.split('/')
if len(parts) == 3:
return datetime.date(int(parts[0]), int(parts[1]), int(parts[2]))
return datetime.date.fromisoformat(s)
@staticmethod
def _format_date(d: datetime.date) -> str:
return '%04d/%02d/%02d' % (d.year, d.month, d.day)
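# e.g. _parse_date('2024/01/15') == _parse_date('2024-01-15') == datetime.date(2024, 1, 15),
# and _format_date(datetime.date(2024, 1, 15)) == '2024/01/15'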
@staticmethod
def sync_date_range(
start_date: str,
end_date: str,
cache_dir: pathlib.Path,
cache_db: cache_db_t,
repos: Optional[list[str]] = None,
arch: str = 'x86_64',
step_days: int = 1,
) -> None:
start = archive_t._parse_date(start_date)
end = archive_t._parse_date(end_date)
step = datetime.timedelta(days=step_days)
current = end
while current >= start:
date_str = archive_t._format_date(current)
try:
archive_t.sync_date(
date=date_str,
cache_dir=cache_dir,
cache_db=cache_db,
repos=repos,
arch=arch,
)
except Exception:
logger.warning(
dict(
msg='failed to sync date, skipping',
date=date_str,
),
exc_info=True,
)
current -= step
def main(args: list[str]) -> int:
archive_parser = argparse.ArgumentParser(
prog='online-fxreader-pr34-archlinux archive',
)
archive_parser.add_argument(
'action',
choices=[o.value for o in ArchiveAction],
)
archive_parser.add_argument(
'--cache-dir',
dest='cache_dir',
required=True,
help='directory for cached .db files and sqlite database',
)
archive_parser.add_argument(
'--repos',
nargs='*',
default=['core', 'extra', 'multilib'],
)
archive_parser.add_argument(
'--arch',
default='x86_64',
)
archive_parser.add_argument(
'--date',
default=None,
help='single date (e.g. 2024/01/15) for sync',
)
archive_parser.add_argument(
'--date-range',
dest='date_range',
nargs=2,
metavar=('START', 'END'),
default=None,
help='date range for sync (e.g. 2024/01/01 2024/06/30)',
)
archive_parser.add_argument(
'--date-step',
dest='date_step',
type=int,
default=1,
help='step in days when iterating date range, default 1',
)
archive_parser.add_argument(
'--packages',
nargs='*',
default=None,
help='package names for show-versions',
)
archive_options = archive_parser.parse_args(args)
archive_options.action = ArchiveAction(archive_options.action)
cache_dir = pathlib.Path(archive_options.cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
db = cache_db_t(cache_dir / 'archlinux_cache.db')
try:
if archive_options.action is ArchiveAction.list_dates:
if db.has_data():
print('=== cached dates ===')
for date_str in db.list_dates():
print(date_str)
print('=== remote dates ===')
for date_str in archive_t.list_remote_dates():
print(date_str)
elif archive_options.action is ArchiveAction.list_packages:
for row in db.package_count_per_date():
print('%s %d' % (row.date, row.count))
elif archive_options.action is ArchiveAction.show_versions:
if archive_options.packages is None or len(archive_options.packages) == 0:
logger.error('--packages required for show-versions')
return 1
for row in db.get_package_versions(archive_options.packages):
print('%s %s %s %s' % (row.date, row.repo, row.name, row.version))
elif archive_options.action is ArchiveAction.sync:
if archive_options.date is not None:
archive_t.sync_date(
date=archive_options.date,
cache_dir=cache_dir,
cache_db=db,
repos=archive_options.repos,
arch=archive_options.arch,
)
elif archive_options.date_range is not None:
archive_t.sync_date_range(
start_date=archive_options.date_range[0],
end_date=archive_options.date_range[1],
cache_dir=cache_dir,
cache_db=db,
repos=archive_options.repos,
arch=archive_options.arch,
step_days=archive_options.date_step,
)
else:
logger.error('sync requires --date or --date-range')
return 1
else:
raise NotImplementedError
finally:
db.close()
return 0

cache_db.py
@@ -0,0 +1,691 @@
import datetime
import hashlib
import io
import logging
import pathlib
import sqlite3
from typing import (
ClassVar,
Generator,
Optional,
TypeVar,
)
import pydantic
from .models import (
package_desc_t,
repo_index_t,
)
logger = logging.getLogger(__name__)
_T = TypeVar('_T', bound=pydantic.BaseModel)
class snapshot_row_t(pydantic.BaseModel):
id: int
date: str
repo: str
arch: str
db_sha256: str
db_rel_path: str
synced_at: str
class package_row_t(pydantic.BaseModel):
id: int
snapshot_id: int
name: str
version: str
base: str = ''
desc: str = ''
filename: str = ''
csize: int = 0
isize: int = 0
md5sum: str = ''
sha256sum: str = ''
url: str = ''
arch: str = ''
builddate: int = 0
packager: str = ''
class package_version_row_t(pydantic.BaseModel):
date: str
repo: str
name: str
version: str
class date_count_row_t(pydantic.BaseModel):
date: str
count: int
class package_hash_row_t(pydantic.BaseModel):
sha256sum: str
class local_package_row_t(pydantic.BaseModel):
id: int
name: str
version: str
filename: str
sha256sum: str
local_path: str
downloaded_at: str
class signature_row_t(pydantic.BaseModel):
id: int
local_package_id: int
sig_path: str
keyring_package_version: Optional[str] = None
gpg_key_id: Optional[str] = None
verified_at: Optional[str] = None
class trusted_entry_t(pydantic.BaseModel, frozen=True):
name: str
version: str
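# _stream_rows / _fetch_one below adapt raw sqlite cursor rows into the pydantic
# row models above; _stream_rows validates lazily, one row at a time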
def _stream_rows(
cur: sqlite3.Cursor,
model: type[_T],
) -> Generator[_T, None, None]:
columns = [desc[0] for desc in cur.description]
for raw in cur:
yield model.model_validate(dict(zip(columns, raw)))
def _fetch_one(
cur: sqlite3.Cursor,
model: type[_T],
) -> Optional[_T]:
columns = [desc[0] for desc in cur.description]
raw = cur.fetchone()
if raw is None:
return None
return model.model_validate(dict(zip(columns, raw)))
class cache_db_t:
class constants_t:
schema_version: ClassVar[int] = 1
list_relation_types: ClassVar[dict[str, str]] = {
'license': 'license',
'depends': 'depends',
'optdepends': 'optdepends',
'makedepends': 'makedepends',
'checkdepends': 'checkdepends',
'provides': 'provides',
'conflicts': 'conflicts',
'replaces': 'replaces',
'groups': 'groups',
}
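# list_relation_types maps the relation_type stored in package_relations to the
# matching list attribute on package_desc_t (keys and values currently coincide)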
def __init__(self, db_path: pathlib.Path) -> None:
self._db_path = db_path
self._conn = sqlite3.connect(str(db_path))
self._conn.execute('PRAGMA journal_mode=WAL')
self._conn.execute('PRAGMA foreign_keys=ON')
self._ensure_schema()
def close(self) -> None:
self._conn.close()
def _ensure_schema(self) -> None:
cur = self._conn.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='schema_meta'")
if cur.fetchone() is None:
self._create_schema(cur)
self._conn.commit()
return
cur.execute('SELECT version FROM schema_meta LIMIT 1')
row = cur.fetchone()
if row is None or row[0] < cache_db_t.constants_t.schema_version:
self._create_schema(cur)
self._conn.commit()
def _create_schema(self, cur: sqlite3.Cursor) -> None:
cur.executescript("""
CREATE TABLE IF NOT EXISTS schema_meta (
version INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
repo TEXT NOT NULL,
arch TEXT NOT NULL DEFAULT 'x86_64',
db_sha256 TEXT NOT NULL,
db_rel_path TEXT NOT NULL DEFAULT '',
synced_at TEXT NOT NULL,
UNIQUE(date, repo, arch)
);
CREATE TABLE IF NOT EXISTS packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_id INTEGER NOT NULL REFERENCES snapshots(id) ON DELETE CASCADE,
name TEXT NOT NULL,
version TEXT NOT NULL,
base TEXT NOT NULL DEFAULT '',
desc TEXT NOT NULL DEFAULT '',
filename TEXT NOT NULL DEFAULT '',
csize INTEGER NOT NULL DEFAULT 0,
isize INTEGER NOT NULL DEFAULT 0,
md5sum TEXT NOT NULL DEFAULT '',
sha256sum TEXT NOT NULL DEFAULT '',
url TEXT NOT NULL DEFAULT '',
arch TEXT NOT NULL DEFAULT '',
builddate INTEGER NOT NULL DEFAULT 0,
packager TEXT NOT NULL DEFAULT '',
UNIQUE(snapshot_id, name)
);
CREATE TABLE IF NOT EXISTS package_relations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
relation_type TEXT NOT NULL,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS local_packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
version TEXT NOT NULL,
filename TEXT NOT NULL,
sha256sum TEXT NOT NULL DEFAULT '',
local_path TEXT NOT NULL,
downloaded_at TEXT NOT NULL,
UNIQUE(name, version, filename)
);
CREATE TABLE IF NOT EXISTS local_signatures (
id INTEGER PRIMARY KEY AUTOINCREMENT,
local_package_id INTEGER NOT NULL REFERENCES local_packages(id) ON DELETE CASCADE,
sig_path TEXT NOT NULL,
keyring_package_version TEXT DEFAULT NULL,
gpg_key_id TEXT DEFAULT NULL,
verified_at TEXT DEFAULT NULL,
UNIQUE(local_package_id)
);
CREATE INDEX IF NOT EXISTS idx_packages_snapshot ON packages(snapshot_id);
CREATE INDEX IF NOT EXISTS idx_packages_name ON packages(name);
CREATE INDEX IF NOT EXISTS idx_packages_name_version ON packages(name, version);
CREATE INDEX IF NOT EXISTS idx_snapshots_date ON snapshots(date);
CREATE INDEX IF NOT EXISTS idx_package_relations_pkg
ON package_relations(package_id, relation_type);
CREATE INDEX IF NOT EXISTS idx_local_packages_name_version
ON local_packages(name, version);
""")
cur.execute('DELETE FROM schema_meta')
cur.execute(
'INSERT INTO schema_meta (version) VALUES (?)',
(cache_db_t.constants_t.schema_version,),
)
# ── helpers ──
@staticmethod
def file_sha256(path: pathlib.Path) -> str:
h = hashlib.sha256()
with io.open(path, 'rb') as f:
while True:
chunk = f.read(65536)
if not chunk:
break
h.update(chunk)
return h.hexdigest()
# ── snapshot CRUD ──
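# upsert_snapshot() is idempotent: an unchanged db_sha256 returns the existing
# snapshot id untouched; a changed db_sha256 drops the snapshot's packages
# (relations follow via ON DELETE CASCADE) and refreshes the snapshot row so
# store_index() can repopulate it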
def upsert_snapshot(
self,
date: str,
repo: str,
arch: str,
db_sha256: str,
db_rel_path: str = '',
) -> int:
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
cur = self._conn.cursor()
cur.execute(
'SELECT id, db_sha256 FROM snapshots WHERE date=? AND repo=? AND arch=?',
(date, repo, arch),
)
row = cur.fetchone()
if row is not None:
snapshot_id: int = row[0]
if row[1] == db_sha256:
return snapshot_id
cur.execute(
'DELETE FROM packages WHERE snapshot_id=?',
(snapshot_id,),
)
cur.execute(
'UPDATE snapshots SET db_sha256=?, db_rel_path=?, synced_at=? WHERE id=?',
(db_sha256, db_rel_path, now, snapshot_id),
)
self._conn.commit()
return snapshot_id
cur.execute(
'INSERT INTO snapshots (date, repo, arch, db_sha256, db_rel_path, synced_at) VALUES (?, ?, ?, ?, ?, ?)',
(date, repo, arch, db_sha256, db_rel_path, now),
)
self._conn.commit()
assert cur.lastrowid is not None
return cur.lastrowid
def get_snapshot(
self,
date: str,
repo: str,
arch: str,
) -> Optional[snapshot_row_t]:
cur = self._conn.cursor()
cur.execute(
'SELECT * FROM snapshots WHERE date=? AND repo=? AND arch=?',
(date, repo, arch),
)
return _fetch_one(cur, snapshot_row_t)
def get_snapshot_by_id(
self,
snapshot_id: int,
) -> Optional[snapshot_row_t]:
cur = self._conn.cursor()
cur.execute(
'SELECT * FROM snapshots WHERE id=?',
(snapshot_id,),
)
return _fetch_one(cur, snapshot_row_t)
def list_snapshots(self) -> Generator[snapshot_row_t, None, None]:
cur = self._conn.cursor()
cur.execute('SELECT * FROM snapshots ORDER BY date DESC, repo')
yield from _stream_rows(cur, snapshot_row_t)
def list_dates(self) -> list[str]:
cur = self._conn.cursor()
cur.execute('SELECT DISTINCT date FROM snapshots ORDER BY date DESC')
return [row[0] for row in cur.fetchall()]
def snapshot_package_count(self, snapshot_id: int) -> int:
cur = self._conn.cursor()
cur.execute(
'SELECT COUNT(*) FROM packages WHERE snapshot_id=?',
(snapshot_id,),
)
row = cur.fetchone()
return row[0] if row is not None else 0
# ── package CRUD ──
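# store_index() bulk-inserts a parsed repo index: package rows first (INSERT OR
# REPLACE keyed on the (snapshot_id, name) unique constraint), then the
# list-valued relations keyed by the freshly selected package ids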
def store_index(
self,
snapshot_id: int,
index: repo_index_t,
) -> None:
cur = self._conn.cursor()
pkg_rows: list[tuple[int, str, str, str, str, str, int, int, str, str, str, str, int, str]] = []
for pkg in index.packages.values():
pkg_rows.append(
(
snapshot_id,
pkg.name,
pkg.version,
pkg.base,
pkg.desc,
pkg.filename,
pkg.csize,
pkg.isize,
pkg.md5sum,
pkg.sha256sum,
pkg.url,
pkg.arch,
pkg.builddate,
pkg.packager,
)
)
cur.executemany(
'INSERT OR REPLACE INTO packages '
'(snapshot_id, name, version, base, desc, filename, csize, isize, '
'md5sum, sha256sum, url, arch, builddate, packager) '
'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
pkg_rows,
)
cur.execute(
'SELECT id, name FROM packages WHERE snapshot_id=?',
(snapshot_id,),
)
pkg_id_map: dict[str, int] = {}
for row_raw in cur.fetchall():
pkg_id_map[row_raw[1]] = row_raw[0]
rel_rows: list[tuple[int, str, str]] = []
for pkg in index.packages.values():
pkg_id = pkg_id_map.get(pkg.name)
if pkg_id is None:
continue
for rel_type, attr_name in cache_db_t.constants_t.list_relation_types.items():
values: list[str] = getattr(pkg, attr_name)
for v in values:
rel_rows.append((pkg_id, rel_type, v))
if len(rel_rows) > 0:
cur.executemany(
'INSERT INTO package_relations (package_id, relation_type, value) VALUES (?, ?, ?)',
rel_rows,
)
self._conn.commit()
logger.info(
dict(
msg='stored index',
snapshot_id=snapshot_id,
packages=len(pkg_rows),
relations=len(rel_rows),
)
)
def package_count_per_date(self) -> Generator[date_count_row_t, None, None]:
cur = self._conn.cursor()
cur.execute(
'SELECT s.date AS date, COUNT(p.id) AS count '
'FROM snapshots s '
'JOIN packages p ON p.snapshot_id = s.id '
'GROUP BY s.date '
'ORDER BY s.date DESC'
)
yield from _stream_rows(cur, date_count_row_t)
def get_package_versions(
self,
names: list[str],
) -> Generator[package_version_row_t, None, None]:
if len(names) == 0:
yield from ()
return
cur = self._conn.cursor()
placeholders = ','.join('?' for _ in names)
cur.execute(
'SELECT s.date AS date, s.repo AS repo, p.name AS name, p.version AS version '
'FROM packages p '
'JOIN snapshots s ON s.id = p.snapshot_id '
'WHERE p.name IN (%s) '
'ORDER BY p.name, s.date DESC' % placeholders,
names,
)
yield from _stream_rows(cur, package_version_row_t)
def find_package_hash(
self,
name: str,
version: str,
) -> Optional[package_hash_row_t]:
cur = self._conn.cursor()
cur.execute(
"SELECT sha256sum FROM packages WHERE name=? AND version=? AND sha256sum != '' ORDER BY snapshot_id DESC LIMIT 1",
(name, version),
)
return _fetch_one(cur, package_hash_row_t)
# ── repo_index_t loading ──
def load_repo_index(
self,
snapshot_id: int,
repo_name: str,
) -> repo_index_t:
cur = self._conn.cursor()
cur.execute(
'SELECT * FROM packages WHERE snapshot_id=?',
(snapshot_id,),
)
index = repo_index_t(name=repo_name)
pkg_ids: list[int] = []
pkg_by_id: dict[int, package_desc_t] = {}
columns = [desc[0] for desc in cur.description]
for raw in cur.fetchall():
row_dict = dict(zip(columns, raw))
pkg = package_desc_t(
name=row_dict['name'],
version=row_dict['version'],
base=row_dict['base'],
desc=row_dict['desc'],
filename=row_dict['filename'],
csize=row_dict['csize'],
isize=row_dict['isize'],
md5sum=row_dict['md5sum'],
sha256sum=row_dict['sha256sum'],
url=row_dict['url'],
arch=row_dict['arch'],
builddate=row_dict['builddate'],
packager=row_dict['packager'],
)
index.packages[pkg.name] = pkg
pkg_ids.append(row_dict['id'])
pkg_by_id[row_dict['id']] = pkg
if len(pkg_ids) > 0:
self._load_relations(cur, pkg_ids, pkg_by_id)
index.build_provides_index()
return index
def load_all_indices(self) -> list[repo_index_t]:
"""Load all snapshots as repo_index_t objects via bulk queries.
Returns one index per (snapshot_id, repo) so the solver sees all
package versions across all synced dates. Uses two bulk queries
instead of per-snapshot loading for performance.
"""
cur = self._conn.cursor()
cur.execute('SELECT * FROM snapshots ORDER BY date ASC')
snap_columns = [desc[0] for desc in cur.description]
snapshots = [dict(zip(snap_columns, raw)) for raw in cur.fetchall()]
cur.execute(
'SELECT id, snapshot_id, name, version, base, desc, filename, '
'csize, isize, md5sum, sha256sum, url, arch, builddate, packager '
'FROM packages'
)
pkg_columns = [desc[0] for desc in cur.description]
pkgs_by_snapshot: dict[int, dict[str, package_desc_t]] = {}
all_pkg_ids: list[int] = []
pkg_by_id: dict[int, package_desc_t] = {}
for raw in cur.fetchall():
rd = dict(zip(pkg_columns, raw))
pkg = package_desc_t(
name=rd['name'],
version=rd['version'],
base=rd['base'],
desc=rd['desc'],
filename=rd['filename'],
csize=rd['csize'],
isize=rd['isize'],
md5sum=rd['md5sum'],
sha256sum=rd['sha256sum'],
url=rd['url'],
arch=rd['arch'],
builddate=rd['builddate'],
packager=rd['packager'],
)
snap_id: int = rd['snapshot_id']
if snap_id not in pkgs_by_snapshot:
pkgs_by_snapshot[snap_id] = {}
pkgs_by_snapshot[snap_id][pkg.name] = pkg
all_pkg_ids.append(rd['id'])
pkg_by_id[rd['id']] = pkg
if len(all_pkg_ids) > 0:
self._load_relations(cur, all_pkg_ids, pkg_by_id)
indices: list[repo_index_t] = []
for snap in snapshots:
pkgs = pkgs_by_snapshot.get(snap['id'])
if pkgs is None or len(pkgs) == 0:
continue
idx = repo_index_t(name=snap['repo'], packages=pkgs)
idx.build_provides_index()
indices.append(idx)
return indices
def _load_relations(
self,
cur: sqlite3.Cursor,
pkg_ids: list[int],
pkg_by_id: dict[int, package_desc_t],
) -> None:
batch_size = 500
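# batching keeps the number of bound parameters per IN (...) query well below
# sqlite's host-parameter limit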
for i in range(0, len(pkg_ids), batch_size):
batch = pkg_ids[i : i + batch_size]
placeholders = ','.join('?' for _ in batch)
cur.execute(
'SELECT package_id, relation_type, value FROM package_relations WHERE package_id IN (%s)' % placeholders,
batch,
)
for row_raw in cur.fetchall():
pkg = pkg_by_id.get(row_raw[0])
if pkg is None:
continue
attr_name = cache_db_t.constants_t.list_relation_types.get(row_raw[1])
if attr_name is None:
continue
target_list: list[str] = getattr(pkg, attr_name)
target_list.append(row_raw[2])
# ── local packages & signatures ──
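# record_local_package() / record_signature() track downloaded package files and
# their signature files; get_trusted_package_set() later filters on the recorded
# keyring_package_version / gpg_key_id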
def record_local_package(
self,
name: str,
version: str,
filename: str,
sha256sum: str,
local_path: str,
) -> int:
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
cur = self._conn.cursor()
cur.execute(
'INSERT OR REPLACE INTO local_packages (name, version, filename, sha256sum, local_path, downloaded_at) VALUES (?, ?, ?, ?, ?, ?)',
(name, version, filename, sha256sum, local_path, now),
)
self._conn.commit()
assert cur.lastrowid is not None
return cur.lastrowid
def record_signature(
self,
local_package_id: int,
sig_path: str,
keyring_package_version: Optional[str] = None,
gpg_key_id: Optional[str] = None,
) -> None:
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
cur = self._conn.cursor()
cur.execute(
'INSERT OR REPLACE INTO local_signatures (local_package_id, sig_path, keyring_package_version, gpg_key_id, verified_at) VALUES (?, ?, ?, ?, ?)',
(local_package_id, sig_path, keyring_package_version, gpg_key_id, now),
)
self._conn.commit()
def get_signature_info(
self,
name: str,
version: str,
) -> Optional[signature_row_t]:
cur = self._conn.cursor()
cur.execute(
'SELECT ls.id, ls.local_package_id, ls.sig_path, '
'ls.keyring_package_version, ls.gpg_key_id, ls.verified_at '
'FROM local_signatures ls '
'JOIN local_packages lp ON lp.id = ls.local_package_id '
'WHERE lp.name=? AND lp.version=?',
(name, version),
)
return _fetch_one(cur, signature_row_t)
def get_trusted_package_set(
self,
trust_keyring_versions: Optional[list[str]] = None,
trust_gpg_keys: Optional[list[str]] = None,
exclude_keyring_versions: Optional[list[str]] = None,
exclude_gpg_keys: Optional[list[str]] = None,
) -> Optional[set[trusted_entry_t]]:
"""Return set of trusted (name, version) entries that pass trust filters.
Returns None if no trust filters are set (meaning all packages pass).
"""
has_filters = (
(trust_keyring_versions is not None and len(trust_keyring_versions) > 0)
or (trust_gpg_keys is not None and len(trust_gpg_keys) > 0)
or (exclude_keyring_versions is not None and len(exclude_keyring_versions) > 0)
or (exclude_gpg_keys is not None and len(exclude_gpg_keys) > 0)
)
if not has_filters:
return None
cur = self._conn.cursor()
cur.execute(
'SELECT lp.name, lp.version, ls.keyring_package_version, ls.gpg_key_id '
'FROM local_packages lp '
'JOIN local_signatures ls ON ls.local_package_id = lp.id'
)
trusted: set[trusted_entry_t] = set()
for row_raw in cur.fetchall():
keyring_ver = row_raw[2]
gpg_key = row_raw[3]
if exclude_keyring_versions and keyring_ver in exclude_keyring_versions:
continue
if exclude_gpg_keys and gpg_key in exclude_gpg_keys:
continue
is_trusted = False
if trust_keyring_versions and keyring_ver in trust_keyring_versions:
is_trusted = True
if trust_gpg_keys and gpg_key in trust_gpg_keys:
is_trusted = True
if not trust_keyring_versions and not trust_gpg_keys:
is_trusted = True
if is_trusted:
trusted.add(trusted_entry_t(name=row_raw[0], version=row_raw[1]))
return trusted
# ── status ──
def has_data(self) -> bool:
cur = self._conn.cursor()
cur.execute('SELECT COUNT(*) FROM snapshots')
row = cur.fetchone()
return row is not None and row[0] > 0
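
A minimal query sketch against the resulting cache (hedged: the import path and cache directory are assumptions; the database filename matches the one created by archive.main()):

    import pathlib
    from cache_db import cache_db_t  # assumption: adjust to the real package path (cache_db.py uses relative imports)

    db = cache_db_t(pathlib.Path('/tmp/archlinux-archive-cache') / 'archlinux_cache.db')
    try:
        for date_str in db.list_dates():
            print(date_str)
        # get_package_versions() streams pydantic rows, so large result sets are
        # not materialised up front
        for row in db.get_package_versions(['linux']):
            print(row.date, row.repo, row.name, row.version)
    finally:
        db.close()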