diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/__init__.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker.py new file mode 100644 index 0000000..21fa91a --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker.py @@ -0,0 +1,106 @@ +"""Arch Linux Security Tracker backend. + +Source: https://security.archlinux.org/issues/all.json +No auth. Full dump ~900KB. Always fetches entire dataset (no incremental API). +Uses pydantic TypeAdapter to parse the JSON response directly. +""" + +import logging +import urllib.request + +from typing import Optional + +import pydantic + +from .arch_tracker_types import arch_avg_t +from .base import SyncProgressCallback, cve_backend_t +from .types import ( + cve_entry_t, + cve_severity_t, + cve_source_t, + cve_status_t, + cve_sync_estimate_t, +) + +logger = logging.getLogger(__name__) + +ISSUES_URL = 'https://security.archlinux.org/issues/all.json' + +SEVERITY_MAP: dict[str, cve_severity_t] = { + 'Low': cve_severity_t.low, + 'Medium': cve_severity_t.medium, + 'High': cve_severity_t.high, + 'Critical': cve_severity_t.critical, +} + +STATUS_MAP: dict[str, cve_status_t] = { + 'Vulnerable': cve_status_t.vulnerable, + 'Fixed': cve_status_t.fixed, + 'Not affected': cve_status_t.not_affected, +} + +_avg_list_adapter = pydantic.TypeAdapter(list[arch_avg_t]) + + +class arch_tracker_backend_t(cve_backend_t): + @property + def source(self) -> cve_source_t: + return cve_source_t.arch_tracker + + def estimate_sync( + self, + since: Optional[str] = None, + months: Optional[int] = None, + ) -> cve_sync_estimate_t: + content_length = self._head_content_length(ISSUES_URL) + return cve_sync_estimate_t( + source=cve_source_t.arch_tracker, + num_fetches=1, + content_length=content_length, + incremental=False, + available=content_length > 0, + ) + + def sync( + self, + since: Optional[str] = None, + months: Optional[int] = None, + on_progress: Optional[SyncProgressCallback] = None, + ) -> list[cve_entry_t]: + logger.info(dict(msg='fetching arch security tracker', url=ISSUES_URL)) + + resp = urllib.request.urlopen(ISSUES_URL, timeout=30) + raw_bytes = resp.read() + + avgs = _avg_list_adapter.validate_json(raw_bytes) + + entries: list[cve_entry_t] = [] + total = len(avgs) + + for i, avg in enumerate(avgs): + severity = SEVERITY_MAP.get(avg.severity, cve_severity_t.unknown) + status = STATUS_MAP.get(avg.status, cve_status_t.unknown) + + for cve_id in avg.issues: + for pkg in avg.packages: + entries.append( + cve_entry_t( + cve_id=cve_id, + source=cve_source_t.arch_tracker, + product=pkg, + version_affected=avg.affected, + version_fixed=avg.fixed or '', + severity=severity, + title='%s %s' % (avg.name, avg.type), + status=status, + ) + ) + + if on_progress is not None and (i + 1) % 100 == 0: + on_progress(i + 1, total) + + if on_progress is not None: + on_progress(total, total) + + logger.info(dict(msg='arch tracker sync done', avgs=total, entries=len(entries))) + return entries diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker_types.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker_types.py new file mode 100644 index 0000000..2e45a2e --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker_types.py @@ -0,0 +1,20 @@ +"""Pydantic models for Arch Linux Security Tracker API responses.""" + +from typing import Optional + +import pydantic + + +class arch_avg_t(pydantic.BaseModel): + """One AVG entry from /issues/all.json.""" + + name: str + packages: list[str] = pydantic.Field(default_factory=list) + status: str = '' + severity: str = '' + type: str = '' + affected: str = '' + fixed: Optional[str] = None + ticket: Optional[str] = None + issues: list[str] = pydantic.Field(default_factory=list) + advisories: list[str] = pydantic.Field(default_factory=list) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/base.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/base.py new file mode 100644 index 0000000..45f19a5 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/base.py @@ -0,0 +1,55 @@ +"""Abstract backend interface for CVE data sources.""" + +import abc +import logging +import urllib.request + +from typing import Callable, Optional + +from .types import ( + cve_entry_t, + cve_source_t, + cve_sync_estimate_t, +) + +logger = logging.getLogger(__name__) + +SyncProgressCallback = Callable[[int, int], None] + + +class cve_backend_t(abc.ABC): + @property + @abc.abstractmethod + def source(self) -> cve_source_t: + raise NotImplementedError + + @abc.abstractmethod + def estimate_sync( + self, + since: Optional[str] = None, + months: Optional[int] = None, + ) -> cve_sync_estimate_t: + """Estimate sync size via HTTP HEAD or lightweight API call.""" + raise NotImplementedError + + @abc.abstractmethod + def sync( + self, + since: Optional[str] = None, + months: Optional[int] = None, + on_progress: Optional[SyncProgressCallback] = None, + ) -> list[cve_entry_t]: + """Fetch CVE entries from the source.""" + raise NotImplementedError + + @staticmethod + def _head_content_length(url: str) -> int: + """HTTP HEAD to get Content-Length. Returns 0 if unavailable.""" + try: + req = urllib.request.Request(url, method='HEAD') + resp = urllib.request.urlopen(req, timeout=10) + cl = resp.headers.get('Content-Length', '0') + return int(cl) + except Exception: + logger.debug(dict(msg='HEAD failed', url=url)) + return 0 diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/checker.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/checker.py new file mode 100644 index 0000000..1c9dfcd --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/checker.py @@ -0,0 +1,84 @@ +"""Check packages against cached CVE data. + +Uses vercmp to compare package versions against affected/fixed ranges. +""" + +import logging + +from typing import Optional + +from ...models import vercmp_t +from .db import cve_db_t +from .types import ( + cve_check_result_t, + cve_entry_t, + cve_source_t, + cve_status_t, +) + +logger = logging.getLogger(__name__) + + +def _is_affected( + pkg_version: str, + entry: cve_entry_t, +) -> bool: + """Check if pkg_version is affected by this CVE entry. + + A package is affected if: + - version_affected is set and pkg_version <= version_affected + - version_fixed is set and pkg_version < version_fixed + - if both set: version_affected <= pkg_version < version_fixed + """ + if entry.version_fixed != '': + cmp = vercmp_t.vercmp(pkg_version, entry.version_fixed) + if cmp >= 0: + return False # at or above fix + + if entry.version_affected != '': + cmp = vercmp_t.vercmp(pkg_version, entry.version_affected) + if cmp > 0: + return False # above affected version + + # if neither is set, we can't determine — assume affected + if entry.version_affected == '' and entry.version_fixed == '': + return entry.status is cve_status_t.vulnerable + + return True + + +def check_packages( + db: cve_db_t, + packages: list[tuple[str, str]], + sources: Optional[list[cve_source_t]] = None, +) -> list[cve_check_result_t]: + """Check a list of (name, version) against cached CVEs. + + Returns list of CVE matches where the package version is affected. + """ + results: list[cve_check_result_t] = [] + + for name, version in packages: + entries = db.query_by_product(name) + + if sources is not None: + entries = [e for e in entries if e.source in sources] + + for entry in entries: + if not _is_affected(version, entry): + continue + + results.append( + cve_check_result_t( + package=name, + version=version, + cve_id=entry.cve_id, + severity=entry.severity, + score=entry.score, + title=entry.title, + version_fixed=entry.version_fixed, + status=entry.status, + ) + ) + + return results diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/db.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/db.py new file mode 100644 index 0000000..0602d7a --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/db.py @@ -0,0 +1,206 @@ +"""CVE ORM module — sqlite tables for cached CVE data.""" + +import json +import logging +import pathlib +import sqlite3 + +from typing import Any, Optional + +import pydantic + +from ..orm.registry import orm_module_t, orm_registry_t +from .types import ( + cve_entry_t, + cve_source_t, + cve_sync_status_t, +) + +logger = logging.getLogger(__name__) + +_entry_list_adapter = pydantic.TypeAdapter(list[cve_entry_t]) +_sync_status_list_adapter = pydantic.TypeAdapter(list[cve_sync_status_t]) + + +def _rows_to_dicts(cur: sqlite3.Cursor) -> list[dict[str, Any]]: + columns = [desc[0] for desc in cur.description] + return [dict(zip(columns, row)) for row in cur.fetchall()] + + +class cve_db_t(orm_module_t): + @classmethod + def table_prefix(cls) -> str: + return 'cve' + + @classmethod + def schema_version(cls) -> int: + return 1 + + @classmethod + def migrate(cls, conn: sqlite3.Connection, from_version: int, to_version: int) -> None: + if from_version < 1: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS cve_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + cve_id TEXT NOT NULL, + source TEXT NOT NULL, + product TEXT NOT NULL, + version_affected TEXT NOT NULL DEFAULT '', + version_fixed TEXT NOT NULL DEFAULT '', + severity TEXT NOT NULL DEFAULT 'unknown', + score REAL NOT NULL DEFAULT 0.0, + title TEXT NOT NULL DEFAULT '', + description TEXT NOT NULL DEFAULT '', + date_published TEXT NOT NULL DEFAULT '', + date_modified TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'unknown', + UNIQUE(cve_id, source, product) + ); + + CREATE TABLE IF NOT EXISTS cve_details ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + cve_id TEXT NOT NULL, + source TEXT NOT NULL, + raw_json TEXT NOT NULL DEFAULT '{}', + UNIQUE(cve_id, source) + ); + + CREATE TABLE IF NOT EXISTS cve_sync_meta ( + source TEXT PRIMARY KEY, + last_sync TEXT NOT NULL DEFAULT '', + entry_count INTEGER NOT NULL DEFAULT 0 + ); + + CREATE INDEX IF NOT EXISTS idx_cve_entries_cve_id ON cve_entries(cve_id); + CREATE INDEX IF NOT EXISTS idx_cve_entries_product ON cve_entries(product); + CREATE INDEX IF NOT EXISTS idx_cve_entries_source ON cve_entries(source); + """) + conn.commit() + + def __init__(self, db_path_or_conn: 'pathlib.Path | sqlite3.Connection') -> None: + if isinstance(db_path_or_conn, sqlite3.Connection): + super().__init__(db_path_or_conn) + else: + registry = orm_registry_t.get(db_path_or_conn) + super().__init__(registry.conn) + + def upsert_entries(self, entries: list[cve_entry_t]) -> int: + cur = self._conn.cursor() + count = 0 + for e in entries: + cur.execute( + ''' + INSERT INTO cve_entries + (cve_id, source, product, version_affected, version_fixed, + severity, score, title, description, date_published, date_modified, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(cve_id, source, product) DO UPDATE SET + version_affected = excluded.version_affected, + version_fixed = excluded.version_fixed, + severity = excluded.severity, + score = excluded.score, + title = excluded.title, + description = excluded.description, + date_published = excluded.date_published, + date_modified = excluded.date_modified, + status = excluded.status + ''', + ( + e.cve_id, + e.source.value, + e.product, + e.version_affected, + e.version_fixed, + e.severity.value, + e.score, + e.title, + e.description, + e.date_published, + e.date_modified, + e.status.value, + ), + ) + count += 1 + self._conn.commit() + return count + + def upsert_detail(self, cve_id: str, source: cve_source_t, raw: object) -> None: + self._conn.execute( + ''' + INSERT INTO cve_details (cve_id, source, raw_json) + VALUES (?, ?, ?) + ON CONFLICT(cve_id, source) DO UPDATE SET raw_json = excluded.raw_json + ''', + (cve_id, source.value, json.dumps(raw, default=str)), + ) + self._conn.commit() + + def update_sync_meta(self, source: cve_source_t, last_sync: str, entry_count: int) -> None: + self._conn.execute( + ''' + INSERT INTO cve_sync_meta (source, last_sync, entry_count) + VALUES (?, ?, ?) + ON CONFLICT(source) DO UPDATE SET + last_sync = excluded.last_sync, + entry_count = excluded.entry_count + ''', + (source.value, last_sync, entry_count), + ) + self._conn.commit() + + def get_sync_status(self, source: cve_source_t) -> cve_sync_status_t: + cur = self._conn.execute( + 'SELECT source, last_sync, entry_count FROM cve_sync_meta WHERE source = ?', + (source.value,), + ) + rows = _rows_to_dicts(cur) + if len(rows) == 0: + return cve_sync_status_t(source=source) + validated = _sync_status_list_adapter.validate_python(rows) + return validated[0] + + def get_all_sync_statuses(self) -> list[cve_sync_status_t]: + cur = self._conn.execute( + 'SELECT source, last_sync, entry_count FROM cve_sync_meta' + ) + return _sync_status_list_adapter.validate_python(_rows_to_dicts(cur)) + + def query_by_product(self, product: str) -> list[cve_entry_t]: + cur = self._conn.execute( + 'SELECT cve_id, source, product, version_affected, version_fixed, ' + 'severity, score, title, description, date_published, date_modified, status ' + 'FROM cve_entries WHERE product = ?', + (product,), + ) + return _entry_list_adapter.validate_python(_rows_to_dicts(cur)) + + def query_by_cve_id(self, cve_id: str) -> list[cve_entry_t]: + cur = self._conn.execute( + 'SELECT cve_id, source, product, version_affected, version_fixed, ' + 'severity, score, title, description, date_published, date_modified, status ' + 'FROM cve_entries WHERE cve_id = ?', + (cve_id,), + ) + return _entry_list_adapter.validate_python(_rows_to_dicts(cur)) + + def query_by_source(self, source: cve_source_t) -> list[cve_entry_t]: + cur = self._conn.execute( + 'SELECT cve_id, source, product, version_affected, version_fixed, ' + 'severity, score, title, description, date_published, date_modified, status ' + 'FROM cve_entries WHERE source = ?', + (source.value,), + ) + return _entry_list_adapter.validate_python(_rows_to_dicts(cur)) + + def count_entries(self, source: Optional[cve_source_t] = None) -> int: + if source is not None: + row = self._conn.execute( + 'SELECT COUNT(*) FROM cve_entries WHERE source = ?', + (source.value,), + ).fetchone() + else: + row = self._conn.execute('SELECT COUNT(*) FROM cve_entries').fetchone() + return row[0] if row else 0 + + +orm_registry_t.register(cve_db_t) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/heuristics.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/heuristics.py new file mode 100644 index 0000000..86c9929 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/heuristics.py @@ -0,0 +1,90 @@ +"""Heuristics to map CVE product names to Arch Linux package names. + +NVD uses CPE product names (e.g. 'vim', 'linux_kernel', 'openssl'). +This module provides simple mapping strategies: + 1. Exact match — product == arch package name + 2. Normalized match — underscores to hyphens, lowercase + 3. Known aliases — manual mapping table for common divergences +""" + +import logging +import re + +from typing import Optional + +logger = logging.getLogger(__name__) + +# known divergences: nvd/cpe product name -> arch package name +KNOWN_ALIASES: dict[str, str] = { + 'linux_kernel': 'linux', + 'linux': 'linux', + 'openssh': 'openssh', + 'openssl': 'openssl', + 'gnu_bash': 'bash', + 'gnutls': 'gnutls', + 'libtiff': 'libtiff', + 'libxml2': 'libxml2', + 'libpng': 'libpng', + 'zlib': 'zlib', + 'curl': 'curl', + 'wget': 'wget', + 'python': 'python', + 'ruby': 'ruby', + 'perl': 'perl', + 'node.js': 'nodejs', + 'nodejs': 'nodejs', + 'firefox': 'firefox', + 'thunderbird': 'thunderbird', + 'chromium': 'chromium', + 'vim': 'vim', + 'neovim': 'neovim', + 'git': 'git', + 'sudo': 'sudo', + 'systemd': 'systemd', + 'glibc': 'glibc', + 'binutils': 'binutils', + 'gcc': 'gcc', + 'grub2': 'grub', + 'xorg-server': 'xorg-server', +} + + +def normalize_product_name(product: str) -> str: + """Normalize a product name to match Arch conventions.""" + name = product.lower().strip() + name = name.replace('_', '-') + name = re.sub(r'[^a-z0-9\-+.]', '', name) + return name + + +def map_to_arch_package( + product: str, + known_packages: Optional[set[str]] = None, +) -> Optional[str]: + """Try to map a CVE product name to an Arch package name. + + Returns the mapped name or None if no match found. + + Strategies in order: + 1. Known aliases table + 2. Exact match against known_packages + 3. Normalized match against known_packages + """ + lower = product.lower() + + # 1. known aliases + if lower in KNOWN_ALIASES: + candidate = KNOWN_ALIASES[lower] + if known_packages is None or candidate in known_packages: + return candidate + + # 2. exact match + if known_packages is not None and lower in known_packages: + return lower + + # 3. normalized + normalized = normalize_product_name(product) + if known_packages is not None and normalized in known_packages: + return normalized + + return None diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd.py new file mode 100644 index 0000000..7704936 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd.py @@ -0,0 +1,211 @@ +"""NVD (NIST) backend. + +Source: https://services.nvd.nist.gov/rest/json/cves/2.0 +Optional API key. Rate limited: 5 req/30s without key, 50 with key. +Paginated (max 2000/page). Supports lastModStartDate/lastModEndDate (max 120 days). +""" + +import logging +import math +import time +import urllib.parse +import urllib.request + +from datetime import datetime, timedelta, timezone +from typing import Optional + +import pydantic + +from .base import SyncProgressCallback, cve_backend_t +from .nvd_types import nvd_response_t +from .types import ( + cve_entry_t, + cve_severity_t, + cve_source_t, + cve_status_t, + cve_sync_estimate_t, +) + +logger = logging.getLogger(__name__) + +BASE_URL = 'https://services.nvd.nist.gov/rest/json/cves/2.0' +PAGE_SIZE = 2000 +MAX_RANGE_DAYS = 120 +REQUEST_DELAY_NO_KEY = 6.5 # 5 req / 30s → ~6s between +REQUEST_DELAY_WITH_KEY = 0.7 # 50 req / 30s → ~0.6s between + + +def _severity_from_nvd(s: str) -> cve_severity_t: + mapping = { + 'LOW': cve_severity_t.low, + 'MEDIUM': cve_severity_t.medium, + 'HIGH': cve_severity_t.high, + 'CRITICAL': cve_severity_t.critical, + } + return mapping.get(s.upper(), cve_severity_t.unknown) + + +def _date_ranges(start: datetime, end: datetime) -> list[tuple[str, str]]: + """Split a date range into chunks of MAX_RANGE_DAYS.""" + ranges: list[tuple[str, str]] = [] + cur = start + while cur < end: + chunk_end = min(cur + timedelta(days=MAX_RANGE_DAYS), end) + ranges.append(( + cur.strftime('%Y-%m-%dT%H:%M:%S.000'), + chunk_end.strftime('%Y-%m-%dT%H:%M:%S.000'), + )) + cur = chunk_end + return ranges + + +class nvd_backend_t(cve_backend_t): + def __init__(self, api_key: Optional[str] = None) -> None: + self._api_key = api_key + self._delay = REQUEST_DELAY_WITH_KEY if api_key else REQUEST_DELAY_NO_KEY + + @property + def source(self) -> cve_source_t: + return cve_source_t.nvd + + def _build_url(self, params: dict[str, str]) -> str: + return '%s?%s' % (BASE_URL, urllib.parse.urlencode(params)) + + def _fetch_page(self, url: str) -> nvd_response_t: + req = urllib.request.Request(url) + if self._api_key: + req.add_header('apiKey', self._api_key) + resp = urllib.request.urlopen(req, timeout=30) + raw = resp.read() + return pydantic.TypeAdapter(nvd_response_t).validate_json(raw) + + def _compute_date_range( + self, + since: Optional[str], + months: Optional[int], + ) -> tuple[datetime, datetime]: + end = datetime.now(timezone.utc) + if since is not None: + start = datetime.fromisoformat(since).replace(tzinfo=timezone.utc) + elif months is not None: + start = end - timedelta(days=months * 30) + else: + start = end - timedelta(days=120) + return start, end + + def estimate_sync( + self, + since: Optional[str] = None, + months: Optional[int] = None, + ) -> cve_sync_estimate_t: + start, end = self._compute_date_range(since, months) + ranges = _date_ranges(start, end) + + # fetch first page of first range to get totalResults + if len(ranges) == 0: + return cve_sync_estimate_t( + source=cve_source_t.nvd, available=False, + ) + + params = { + 'lastModStartDate': ranges[0][0], + 'lastModEndDate': ranges[0][1], + 'resultsPerPage': '1', + } + try: + page = self._fetch_page(self._build_url(params)) + total_first_range = page.totalResults + except Exception as e: + logger.warning(dict(msg='nvd estimate failed', error=str(e))) + return cve_sync_estimate_t( + source=cve_source_t.nvd, available=False, + ) + + # rough estimate: total_first_range * num_ranges (assuming uniform distribution) + estimated_total = total_first_range * len(ranges) + pages_per_range = max(1, math.ceil(total_first_range / PAGE_SIZE)) + num_fetches = pages_per_range * len(ranges) + + return cve_sync_estimate_t( + source=cve_source_t.nvd, + num_fetches=num_fetches, + content_length=0, + incremental=since is not None, + available=True, + ) + + def sync( + self, + since: Optional[str] = None, + months: Optional[int] = None, + on_progress: Optional[SyncProgressCallback] = None, + ) -> list[cve_entry_t]: + start, end = self._compute_date_range(since, months) + ranges = _date_ranges(start, end) + + entries: list[cve_entry_t] = [] + fetch_count = 0 + + for range_start, range_end in ranges: + start_index = 0 + + while True: + params = { + 'lastModStartDate': range_start, + 'lastModEndDate': range_end, + 'resultsPerPage': str(PAGE_SIZE), + 'startIndex': str(start_index), + } + + url = self._build_url(params) + logger.info(dict(msg='nvd fetch', url=url)) + + page = self._fetch_page(url) + fetch_count += 1 + + for vuln in page.vulnerabilities: + cve = vuln.cve + desc = '' + for d in cve.descriptions: + if d.lang == 'en': + desc = d.value + break + + score = 0.0 + severity = cve_severity_t.unknown + for metric_key in ('cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2'): + metrics = cve.metrics.get(metric_key, []) + if len(metrics) > 0: + m = metrics[0] + score = m.cvssData.baseScore + severity = _severity_from_nvd(m.cvssData.baseSeverity) + break + + entries.append( + cve_entry_t( + cve_id=cve.id, + source=cve_source_t.nvd, + product=cve.id, + severity=severity, + score=score, + title=cve.id, + description=desc, + date_published=cve.published, + date_modified=cve.lastModified, + ) + ) + + if on_progress is not None: + on_progress(len(entries), page.totalResults * len(ranges)) + + if start_index + page.resultsPerPage >= page.totalResults: + break + + start_index += page.resultsPerPage + time.sleep(self._delay) + + if len(ranges) > 1: + time.sleep(self._delay) + + logger.info(dict(msg='nvd sync done', fetches=fetch_count, entries=len(entries))) + return entries diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd_types.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd_types.py new file mode 100644 index 0000000..4999091 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd_types.py @@ -0,0 +1,50 @@ +"""Pydantic models for NVD (NIST) API responses.""" + +import pydantic + + +class nvd_cvss_data_t(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + version: str = '' + baseScore: float = 0.0 + baseSeverity: str = '' + + +class nvd_cvss_metric_t(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + source: str = '' + type: str = '' + cvssData: nvd_cvss_data_t = pydantic.Field(default_factory=nvd_cvss_data_t) + + +class nvd_description_t(pydantic.BaseModel): + lang: str + value: str + + +class nvd_cve_item_t(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + id: str + sourceIdentifier: str = '' + published: str = '' + lastModified: str = '' + vulnStatus: str = '' + descriptions: list[nvd_description_t] = pydantic.Field(default_factory=list) + metrics: dict[str, list[nvd_cvss_metric_t]] = pydantic.Field(default_factory=dict) + + +class nvd_vulnerability_t(pydantic.BaseModel): + cve: nvd_cve_item_t + + +class nvd_response_t(pydantic.BaseModel): + resultsPerPage: int = 0 + startIndex: int = 0 + totalResults: int = 0 + format: str = '' + version: str = '' + timestamp: str = '' + vulnerabilities: list[nvd_vulnerability_t] = pydantic.Field(default_factory=list) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv.py new file mode 100644 index 0000000..08b2b2c --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv.py @@ -0,0 +1,131 @@ +"""OSV (Google) backend. + +Source: https://api.osv.dev/v1/ +No auth. No rate limits. Arch Linux is NOT a supported ecosystem, +so we query using Debian ecosystem as a proxy and map results. +Supports batch queries (up to 1000 per request). +""" + +import json +import logging +import urllib.request + +from typing import Optional + +import pydantic + +from .base import SyncProgressCallback, cve_backend_t +from .osv_types import ( + osv_batch_request_t, + osv_batch_response_t, + osv_package_query_t, + osv_query_t, +) +from .types import ( + cve_entry_t, + cve_source_t, + cve_sync_estimate_t, +) + +logger = logging.getLogger(__name__) + +QUERY_URL = 'https://api.osv.dev/v1/querybatch' +VULN_URL = 'https://api.osv.dev/v1/vulns' +BATCH_SIZE = 1000 +DEFAULT_ECOSYSTEM = 'Debian:12' + + +class osv_backend_t(cve_backend_t): + def __init__(self, ecosystem: str = DEFAULT_ECOSYSTEM) -> None: + self._ecosystem = ecosystem + + @property + def source(self) -> cve_source_t: + return cve_source_t.osv + + def estimate_sync( + self, + since: Optional[str] = None, + months: Optional[int] = None, + ) -> cve_sync_estimate_t: + # OSV doesn't support time-range queries for bulk. + # Estimation not meaningful without a package list. + return cve_sync_estimate_t( + source=cve_source_t.osv, + num_fetches=0, + content_length=0, + incremental=False, + available=False, + ) + + def sync( + self, + since: Optional[str] = None, + months: Optional[int] = None, + on_progress: Optional[SyncProgressCallback] = None, + ) -> list[cve_entry_t]: + # OSV requires package names to query. A blind sync isn't supported. + # Use query_packages() instead. + logger.warning(dict(msg='osv sync requires explicit package list, use query_packages()')) + return [] + + def query_packages( + self, + packages: list[tuple[str, str]], + on_progress: Optional[SyncProgressCallback] = None, + ) -> list[cve_entry_t]: + """Query OSV for a list of (name, version) tuples. + + Uses batch API. Returns unified CVE entries. + """ + entries: list[cve_entry_t] = [] + total = len(packages) + + for batch_start in range(0, total, BATCH_SIZE): + batch = packages[batch_start:batch_start + BATCH_SIZE] + + request = osv_batch_request_t( + queries=[ + osv_query_t( + package=osv_package_query_t( + name=name, + ecosystem=self._ecosystem, + ), + version=version, + ) + for name, version in batch + ] + ) + + req = urllib.request.Request( + QUERY_URL, + data=request.model_dump_json().encode('utf-8'), + headers={'Content-Type': 'application/json'}, + method='POST', + ) + + resp = urllib.request.urlopen(req, timeout=30) + raw = resp.read() + batch_resp = pydantic.TypeAdapter(osv_batch_response_t).validate_json(raw) + + for i, result in enumerate(batch_resp.results): + if i >= len(batch): + break + pkg_name, pkg_version = batch[i] + for vuln in result.vulns: + entries.append( + cve_entry_t( + cve_id=vuln.id, + source=cve_source_t.osv, + product=pkg_name, + date_modified=vuln.modified, + title=vuln.id, + ) + ) + + if on_progress is not None: + done = min(batch_start + BATCH_SIZE, total) + on_progress(done, total) + + logger.info(dict(msg='osv query done', packages=total, entries=len(entries))) + return entries diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv_types.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv_types.py new file mode 100644 index 0000000..89d3e45 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv_types.py @@ -0,0 +1,30 @@ +"""Pydantic models for OSV (Google) API responses.""" + +import pydantic + + +class osv_vuln_brief_t(pydantic.BaseModel): + id: str + modified: str = '' + + +class osv_query_result_t(pydantic.BaseModel): + vulns: list[osv_vuln_brief_t] = pydantic.Field(default_factory=list) + + +class osv_batch_response_t(pydantic.BaseModel): + results: list[osv_query_result_t] = pydantic.Field(default_factory=list) + + +class osv_package_query_t(pydantic.BaseModel): + name: str + ecosystem: str + + +class osv_query_t(pydantic.BaseModel): + package: osv_package_query_t + version: str = '' + + +class osv_batch_request_t(pydantic.BaseModel): + queries: list[osv_query_t] diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/types.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/types.py new file mode 100644 index 0000000..cac56e7 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/types.py @@ -0,0 +1,82 @@ +"""CVE unified types — exposed to users of the cve module.""" + +import enum + +import pydantic + + +class cve_source_t(enum.Enum): + arch_tracker = 'arch_tracker' + nvd = 'nvd' + osv = 'osv' + + +class cve_severity_t(enum.Enum): + unknown = 'unknown' + low = 'low' + medium = 'medium' + high = 'high' + critical = 'critical' + + +class cve_status_t(enum.Enum): + unknown = 'unknown' + vulnerable = 'vulnerable' + fixed = 'fixed' + not_affected = 'not_affected' + + +class cve_entry_t(pydantic.BaseModel): + """Unified CVE entry across all sources.""" + + model_config = pydantic.ConfigDict(frozen=True) + + cve_id: str + source: cve_source_t + product: str + version_affected: str = '' + version_fixed: str = '' + severity: cve_severity_t = cve_severity_t.unknown + score: float = 0.0 + title: str = '' + description: str = '' + date_published: str = '' + date_modified: str = '' + status: cve_status_t = cve_status_t.unknown + + +class cve_sync_status_t(pydantic.BaseModel): + model_config = pydantic.ConfigDict(frozen=True) + + source: cve_source_t + last_sync: str = '' + entry_count: int = 0 + + +class cve_sync_estimate_t(pydantic.BaseModel): + """Estimate for an upcoming sync. + + num_fetches: expected HTTP requests needed. + content_length: total bytes across all fetches (from HEAD). + """ + + model_config = pydantic.ConfigDict(frozen=True) + + source: cve_source_t + num_fetches: int = 0 + content_length: int = 0 + incremental: bool = False + available: bool = True + + +class cve_check_result_t(pydantic.BaseModel): + model_config = pydantic.ConfigDict(frozen=True) + + package: str + version: str + cve_id: str + severity: cve_severity_t + score: float + title: str + version_fixed: str = '' + status: cve_status_t = cve_status_t.unknown diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_cve.py b/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_cve.py new file mode 100644 index 0000000..5f50d7a --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_cve.py @@ -0,0 +1,442 @@ +"""Tests for apps/cve/ module. + +Test coverage: + - types: parse arch tracker JSON, NVD JSON via pydantic + - db: ORM migration, upsert, query, sync meta + - arch_tracker: parse real AVG samples + - nvd: parse real NVD response sample + - osv: parse real OSV batch response sample + - checker: version comparison, affected/fixed logic + - heuristics: normalize, alias mapping, exact match +""" + +import json +import pathlib +import sqlite3 +import tempfile +import unittest + +import pydantic + +from ..apps.cve.arch_tracker_types import arch_avg_t +from ..apps.cve.nvd_types import nvd_response_t +from ..apps.cve.osv_types import osv_batch_response_t +from ..apps.cve.types import ( + cve_entry_t, + cve_severity_t, + cve_source_t, + cve_status_t, + cve_sync_estimate_t, +) +from ..apps.cve.db import cve_db_t +from ..apps.cve.checker import check_packages, _is_affected +from ..apps.cve.heuristics import ( + KNOWN_ALIASES, + map_to_arch_package, + normalize_product_name, +) +from ..apps.orm.registry import orm_registry_t + + +# ── real-world JSON samples (from curl) ── + +ARCH_AVG_SAMPLE = json.dumps([ + { + "name": "AVG-2843", + "packages": ["vim"], + "status": "Unknown", + "severity": "Unknown", + "type": "unknown", + "affected": "9.0.1224-1", + "fixed": "9.0.1225-1", + "ticket": None, + "issues": ["CVE-2023-0433", "CVE-2023-0288"], + "advisories": [], + }, + { + "name": "AVG-2842", + "packages": ["libtiff"], + "status": "Unknown", + "severity": "Unknown", + "type": "unknown", + "affected": "4.4.0-1", + "fixed": None, + "ticket": None, + "issues": ["CVE-2022-48281", "CVE-2022-3970"], + "advisories": [], + }, +]) + +NVD_RESPONSE_SAMPLE = json.dumps({ + "resultsPerPage": 1, + "startIndex": 0, + "totalResults": 1, + "format": "NVD_CVE", + "version": "2.0", + "timestamp": "2026-04-13T00:00:00.000", + "vulnerabilities": [ + { + "cve": { + "id": "CVE-2023-0433", + "sourceIdentifier": "security@huntr.dev", + "published": "2023-01-21T15:15:10.153", + "lastModified": "2024-11-21T07:37:10.260", + "vulnStatus": "Analyzed", + "descriptions": [ + {"lang": "en", "value": "Heap-based Buffer Overflow in GitHub repository vim/vim prior to 9.0.1225."}, + {"lang": "es", "value": "Desbordamiento de búfer basado en heap en vim/vim."}, + ], + "metrics": { + "cvssMetricV31": [ + { + "source": "nvd@nist.gov", + "type": "Primary", + "cvssData": { + "version": "3.1", + "baseScore": 7.8, + "baseSeverity": "HIGH", + }, + } + ], + }, + } + } + ], +}) + +OSV_RESPONSE_SAMPLE = json.dumps({ + "results": [ + { + "vulns": [ + {"id": "DEBIAN-CVE-2023-0433", "modified": "2026-01-01T00:00:00Z"}, + {"id": "DEBIAN-CVE-2023-0288", "modified": "2026-01-01T00:00:00Z"}, + ] + }, + { + "vulns": [] + }, + ] +}) + + +# ── Type parsing tests ── + +class TestArchTrackerTypes(unittest.TestCase): + def test_parse_avg_list(self) -> None: + adapter = pydantic.TypeAdapter(list[arch_avg_t]) + avgs = adapter.validate_json(ARCH_AVG_SAMPLE.encode()) + self.assertEqual(len(avgs), 2) + self.assertEqual(avgs[0].name, 'AVG-2843') + self.assertEqual(avgs[0].packages, ['vim']) + self.assertEqual(len(avgs[0].issues), 2) + self.assertEqual(avgs[0].affected, '9.0.1224-1') + self.assertEqual(avgs[0].fixed, '9.0.1225-1') + + def test_parse_avg_null_fixed(self) -> None: + adapter = pydantic.TypeAdapter(list[arch_avg_t]) + avgs = adapter.validate_json(ARCH_AVG_SAMPLE.encode()) + self.assertIsNone(avgs[1].fixed) + + def test_parse_avg_empty_advisories(self) -> None: + adapter = pydantic.TypeAdapter(list[arch_avg_t]) + avgs = adapter.validate_json(ARCH_AVG_SAMPLE.encode()) + self.assertEqual(avgs[0].advisories, []) + + +class TestNvdTypes(unittest.TestCase): + def test_parse_response(self) -> None: + resp = pydantic.TypeAdapter(nvd_response_t).validate_json(NVD_RESPONSE_SAMPLE.encode()) + self.assertEqual(resp.totalResults, 1) + self.assertEqual(len(resp.vulnerabilities), 1) + + cve = resp.vulnerabilities[0].cve + self.assertEqual(cve.id, 'CVE-2023-0433') + self.assertEqual(cve.published, '2023-01-21T15:15:10.153') + + def test_parse_descriptions(self) -> None: + resp = pydantic.TypeAdapter(nvd_response_t).validate_json(NVD_RESPONSE_SAMPLE.encode()) + descs = resp.vulnerabilities[0].cve.descriptions + en = [d for d in descs if d.lang == 'en'] + self.assertEqual(len(en), 1) + self.assertIn('vim', en[0].value) + + def test_parse_cvss(self) -> None: + resp = pydantic.TypeAdapter(nvd_response_t).validate_json(NVD_RESPONSE_SAMPLE.encode()) + metrics = resp.vulnerabilities[0].cve.metrics + self.assertIn('cvssMetricV31', metrics) + m = metrics['cvssMetricV31'][0] + self.assertAlmostEqual(m.cvssData.baseScore, 7.8) + self.assertEqual(m.cvssData.baseSeverity, 'HIGH') + + +class TestOsvTypes(unittest.TestCase): + def test_parse_batch_response(self) -> None: + resp = pydantic.TypeAdapter(osv_batch_response_t).validate_json(OSV_RESPONSE_SAMPLE.encode()) + self.assertEqual(len(resp.results), 2) + self.assertEqual(len(resp.results[0].vulns), 2) + self.assertEqual(resp.results[0].vulns[0].id, 'DEBIAN-CVE-2023-0433') + self.assertEqual(len(resp.results[1].vulns), 0) + + +# ── DB tests ── + +class TestCveDb(unittest.TestCase): + def setUp(self) -> None: + self.tmpdir = tempfile.mkdtemp() + orm_registry_t._registered_classes.clear() + orm_registry_t._instances.clear() + orm_registry_t.register(cve_db_t) + self.db_path = pathlib.Path(self.tmpdir) / 'test.db' + self.db = cve_db_t(self.db_path) + + def tearDown(self) -> None: + orm_registry_t.reset() + + def test_migration_creates_tables(self) -> None: + tables = self.db._conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" + ).fetchall() + names = {r[0] for r in tables} + self.assertIn('cve_entries', names) + self.assertIn('cve_details', names) + self.assertIn('cve_sync_meta', names) + + def test_upsert_and_query(self) -> None: + entries = [ + cve_entry_t( + cve_id='CVE-2023-0433', + source=cve_source_t.arch_tracker, + product='vim', + version_affected='9.0.1224-1', + version_fixed='9.0.1225-1', + severity=cve_severity_t.high, + score=7.8, + title='AVG-2843 unknown', + ), + cve_entry_t( + cve_id='CVE-2023-0288', + source=cve_source_t.arch_tracker, + product='vim', + version_affected='9.0.1224-1', + version_fixed='9.0.1225-1', + ), + ] + count = self.db.upsert_entries(entries) + self.assertEqual(count, 2) + + results = self.db.query_by_product('vim') + self.assertEqual(len(results), 2) + self.assertEqual(results[0].cve_id, 'CVE-2023-0433') + self.assertEqual(results[0].severity, cve_severity_t.high) + + def test_upsert_updates_existing(self) -> None: + e1 = cve_entry_t( + cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker, + product='vim', title='old title', + ) + self.db.upsert_entries([e1]) + + e2 = cve_entry_t( + cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker, + product='vim', title='new title', + ) + self.db.upsert_entries([e2]) + + results = self.db.query_by_product('vim') + self.assertEqual(len(results), 1) + self.assertEqual(results[0].title, 'new title') + + def test_query_by_cve_id(self) -> None: + self.db.upsert_entries([ + cve_entry_t(cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker, product='vim'), + cve_entry_t(cve_id='CVE-2023-0433', source=cve_source_t.nvd, product='CVE-2023-0433'), + ]) + results = self.db.query_by_cve_id('CVE-2023-0433') + self.assertEqual(len(results), 2) + + def test_query_by_source(self) -> None: + self.db.upsert_entries([ + cve_entry_t(cve_id='CVE-A', source=cve_source_t.arch_tracker, product='a'), + cve_entry_t(cve_id='CVE-B', source=cve_source_t.nvd, product='b'), + ]) + arch_only = self.db.query_by_source(cve_source_t.arch_tracker) + self.assertEqual(len(arch_only), 1) + self.assertEqual(arch_only[0].cve_id, 'CVE-A') + + def test_count_entries(self) -> None: + self.db.upsert_entries([ + cve_entry_t(cve_id='CVE-A', source=cve_source_t.arch_tracker, product='a'), + cve_entry_t(cve_id='CVE-B', source=cve_source_t.nvd, product='b'), + ]) + self.assertEqual(self.db.count_entries(), 2) + self.assertEqual(self.db.count_entries(cve_source_t.arch_tracker), 1) + + def test_sync_meta(self) -> None: + self.db.update_sync_meta(cve_source_t.arch_tracker, '2026-04-13', 500) + status = self.db.get_sync_status(cve_source_t.arch_tracker) + self.assertEqual(status.last_sync, '2026-04-13') + self.assertEqual(status.entry_count, 500) + + def test_sync_meta_all(self) -> None: + self.db.update_sync_meta(cve_source_t.arch_tracker, '2026-04-13', 100) + self.db.update_sync_meta(cve_source_t.nvd, '2026-04-12', 200) + statuses = self.db.get_all_sync_statuses() + self.assertEqual(len(statuses), 2) + + def test_sync_meta_missing(self) -> None: + status = self.db.get_sync_status(cve_source_t.osv) + self.assertEqual(status.last_sync, '') + self.assertEqual(status.entry_count, 0) + + def test_upsert_detail(self) -> None: + self.db.upsert_detail('CVE-2023-0433', cve_source_t.nvd, {'score': 7.8}) + row = self.db._conn.execute( + 'SELECT raw_json FROM cve_details WHERE cve_id = ?', ('CVE-2023-0433',) + ).fetchone() + self.assertIsNotNone(row) + data = json.loads(row[0]) + self.assertAlmostEqual(data['score'], 7.8) + + +# ── Checker tests ── + +class TestChecker(unittest.TestCase): + def setUp(self) -> None: + self.tmpdir = tempfile.mkdtemp() + orm_registry_t._registered_classes.clear() + orm_registry_t._instances.clear() + orm_registry_t.register(cve_db_t) + self.db = cve_db_t(pathlib.Path(self.tmpdir) / 'test.db') + + def tearDown(self) -> None: + orm_registry_t.reset() + + def test_affected_below_fix(self) -> None: + e = cve_entry_t( + cve_id='CVE-X', source=cve_source_t.arch_tracker, product='vim', + version_affected='9.0.1224-1', version_fixed='9.0.1225-1', + ) + self.assertTrue(_is_affected('9.0.1224-1', e)) + self.assertFalse(_is_affected('9.0.1225-1', e)) + self.assertFalse(_is_affected('9.1.0-1', e)) + + def test_affected_no_fix(self) -> None: + e = cve_entry_t( + cve_id='CVE-X', source=cve_source_t.arch_tracker, product='libtiff', + version_affected='4.4.0-1', status=cve_status_t.vulnerable, + ) + self.assertTrue(_is_affected('4.4.0-1', e)) + self.assertFalse(_is_affected('4.5.0-1', e)) + + def test_not_affected_above_fix(self) -> None: + e = cve_entry_t( + cve_id='CVE-X', source=cve_source_t.arch_tracker, product='vim', + version_affected='9.0.1224-1', version_fixed='9.0.1225-1', + ) + self.assertFalse(_is_affected('10.0.0-1', e)) + + def test_check_packages_finds_vulnerable(self) -> None: + self.db.upsert_entries([ + cve_entry_t( + cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker, + product='vim', version_affected='9.0.1224-1', version_fixed='9.0.1225-1', + severity=cve_severity_t.high, score=7.8, + ), + ]) + results = check_packages(self.db, [('vim', '9.0.1224-1')]) + self.assertEqual(len(results), 1) + self.assertEqual(results[0].cve_id, 'CVE-2023-0433') + + def test_check_packages_skips_fixed(self) -> None: + self.db.upsert_entries([ + cve_entry_t( + cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker, + product='vim', version_affected='9.0.1224-1', version_fixed='9.0.1225-1', + ), + ]) + results = check_packages(self.db, [('vim', '9.0.1225-1')]) + self.assertEqual(len(results), 0) + + def test_check_packages_source_filter(self) -> None: + self.db.upsert_entries([ + cve_entry_t(cve_id='CVE-A', source=cve_source_t.arch_tracker, product='vim', + version_affected='1.0-1', version_fixed='2.0-1'), + cve_entry_t(cve_id='CVE-B', source=cve_source_t.nvd, product='vim', + version_affected='1.0-1', version_fixed='2.0-1'), + ]) + results = check_packages( + self.db, [('vim', '1.0-1')], + sources=[cve_source_t.arch_tracker], + ) + self.assertEqual(len(results), 1) + self.assertEqual(results[0].cve_id, 'CVE-A') + + def test_check_no_cves_for_package(self) -> None: + results = check_packages(self.db, [('nonexistent', '1.0-1')]) + self.assertEqual(len(results), 0) + + +# ── Heuristics tests ── + +class TestHeuristics(unittest.TestCase): + def test_normalize_underscore(self) -> None: + self.assertEqual(normalize_product_name('linux_kernel'), 'linux-kernel') + + def test_normalize_case(self) -> None: + self.assertEqual(normalize_product_name('OpenSSL'), 'openssl') + + def test_normalize_special_chars(self) -> None: + self.assertEqual(normalize_product_name('lib@foo!bar'), 'libfoobar') + + def test_alias_linux_kernel(self) -> None: + self.assertEqual(map_to_arch_package('linux_kernel'), 'linux') + + def test_alias_node(self) -> None: + self.assertEqual(map_to_arch_package('node.js'), 'nodejs') + + def test_alias_grub(self) -> None: + self.assertEqual(map_to_arch_package('grub2'), 'grub') + + def test_exact_match(self) -> None: + known = {'vim', 'bash', 'glibc'} + self.assertEqual(map_to_arch_package('vim', known), 'vim') + + def test_normalized_match(self) -> None: + known = {'linux-kernel', 'bash'} + # 'linux_kernel' normalizes to 'linux-kernel' + # but known aliases maps it to 'linux' first — which is not in known + # so alias fails, exact fails, normalized matches + result = map_to_arch_package('linux_kernel', known) + self.assertEqual(result, 'linux-kernel') + + def test_no_match(self) -> None: + known = {'bash', 'vim'} + self.assertIsNone(map_to_arch_package('totally_unknown_product', known)) + + def test_alias_respects_known_packages(self) -> None: + # alias maps 'grub2' -> 'grub', but if 'grub' not in known, skip alias + known = {'vim'} + self.assertIsNone(map_to_arch_package('grub2', known)) + + def test_all_known_aliases_are_lowercase(self) -> None: + for key in KNOWN_ALIASES: + self.assertEqual(key, key.lower(), 'alias key must be lowercase: %s' % key) + + +# ── Sync estimate types ── + +class TestSyncEstimate(unittest.TestCase): + def test_available(self) -> None: + e = cve_sync_estimate_t( + source=cve_source_t.arch_tracker, + num_fetches=1, + content_length=903023, + available=True, + ) + self.assertTrue(e.available) + self.assertEqual(e.num_fetches, 1) + + def test_unavailable(self) -> None: + e = cve_sync_estimate_t(source=cve_source_t.osv, available=False) + self.assertFalse(e.available) + self.assertEqual(e.num_fetches, 0)