diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker.py index 21fa91a..8433f9b 100644 --- a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker.py +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/arch_tracker.py @@ -1,12 +1,10 @@ -"""Arch Linux Security Tracker backend. +"""Arch Linux Security Tracker backend (async). Source: https://security.archlinux.org/issues/all.json -No auth. Full dump ~900KB. Always fetches entire dataset (no incremental API). -Uses pydantic TypeAdapter to parse the JSON response directly. +No auth required. Full dump ~900KB. """ import logging -import urllib.request from typing import Optional @@ -47,12 +45,12 @@ class arch_tracker_backend_t(cve_backend_t): def source(self) -> cve_source_t: return cve_source_t.arch_tracker - def estimate_sync( + async def estimate_sync( self, since: Optional[str] = None, months: Optional[int] = None, ) -> cve_sync_estimate_t: - content_length = self._head_content_length(ISSUES_URL) + content_length = await self._head_content_length(ISSUES_URL) return cve_sync_estimate_t( source=cve_source_t.arch_tracker, num_fetches=1, @@ -61,23 +59,22 @@ class arch_tracker_backend_t(cve_backend_t): available=content_length > 0, ) - def sync( + async def sync( self, since: Optional[str] = None, months: Optional[int] = None, on_progress: Optional[SyncProgressCallback] = None, ) -> list[cve_entry_t]: - logger.info(dict(msg='fetching arch security tracker', url=ISSUES_URL)) - - resp = urllib.request.urlopen(ISSUES_URL, timeout=30) - raw_bytes = resp.read() + logger.info(dict(msg='fetch', source='arch_tracker', url=ISSUES_URL)) + raw_bytes = await self._fetch_url(ISSUES_URL) avgs = _avg_list_adapter.validate_json(raw_bytes) - entries: list[cve_entry_t] = [] - total = len(avgs) + logger.info(dict(msg='fetched', source='arch_tracker', avgs=len(avgs), bytes=len(raw_bytes))) - for i, avg in enumerate(avgs): + entries: list[cve_entry_t] = [] + + for avg in avgs: severity = SEVERITY_MAP.get(avg.severity, cve_severity_t.unknown) status = STATUS_MAP.get(avg.status, cve_status_t.unknown) @@ -96,11 +93,5 @@ class arch_tracker_backend_t(cve_backend_t): ) ) - if on_progress is not None and (i + 1) % 100 == 0: - on_progress(i + 1, total) - - if on_progress is not None: - on_progress(total, total) - - logger.info(dict(msg='arch tracker sync done', avgs=total, entries=len(entries))) + logger.info(dict(msg='parsed', source='arch_tracker', avgs=len(avgs), entries=len(entries))) return entries diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/base.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/base.py index 45f19a5..3157d05 100644 --- a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/base.py +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/base.py @@ -1,6 +1,7 @@ """Abstract backend interface for CVE data sources.""" import abc +import asyncio import logging import urllib.request @@ -24,32 +25,59 @@ class cve_backend_t(abc.ABC): raise NotImplementedError @abc.abstractmethod - def estimate_sync( + async def estimate_sync( self, since: Optional[str] = None, months: Optional[int] = None, ) -> cve_sync_estimate_t: - """Estimate sync size via HTTP HEAD or lightweight API call.""" raise NotImplementedError @abc.abstractmethod - def sync( + async def sync( self, since: Optional[str] = None, months: Optional[int] = None, on_progress: Optional[SyncProgressCallback] = None, ) -> list[cve_entry_t]: - """Fetch CVE entries from the source.""" raise NotImplementedError @staticmethod - def _head_content_length(url: str) -> int: - """HTTP HEAD to get Content-Length. Returns 0 if unavailable.""" + async def _head_content_length(url: str) -> int: + loop = asyncio.get_running_loop() try: - req = urllib.request.Request(url, method='HEAD') - resp = urllib.request.urlopen(req, timeout=10) - cl = resp.headers.get('Content-Length', '0') - return int(cl) + def _do() -> int: + req = urllib.request.Request(url, method='HEAD') + resp = urllib.request.urlopen(req, timeout=10) + cl = resp.headers.get('Content-Length', '0') + return int(cl) + + return await loop.run_in_executor(None, _do) except Exception: logger.debug(dict(msg='HEAD failed', url=url)) return 0 + + @staticmethod + async def _fetch_url(url: str, timeout: int = 30) -> bytes: + loop = asyncio.get_running_loop() + + def _do() -> bytes: + resp = urllib.request.urlopen(url, timeout=timeout) + return resp.read() + + return await loop.run_in_executor(None, _do) + + @staticmethod + async def _post_json(url: str, data: bytes, timeout: int = 30) -> bytes: + loop = asyncio.get_running_loop() + + def _do() -> bytes: + req = urllib.request.Request( + url, + data=data, + headers={'Content-Type': 'application/json'}, + method='POST', + ) + resp = urllib.request.urlopen(req, timeout=timeout) + return resp.read() + + return await loop.run_in_executor(None, _do) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/cli.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/cli.py new file mode 100644 index 0000000..841e6c4 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/cli.py @@ -0,0 +1,348 @@ +"""CVE CLI: sync, status, check subcommands.""" + +import argparse +import asyncio +import datetime +import enum +import logging +import pathlib + +from typing import Optional + +from .base import cve_backend_t +from .cli_render import render +from .cli_types import ( + check_cve_entry_t, + check_result_t, + date_source_t, + status_density_t, + status_ecosystem_category_t, + status_ecosystems_t, + status_result_t, + status_source_t, + sync_result_t, + sync_source_result_t, +) +from .db import cve_db_t +from .types import ( + cve_severity_t, + cve_source_t, + output_format_t, +) + +logger = logging.getLogger(__name__) + + +class cve_action_t(enum.StrEnum): + sync = 'sync' + status = 'status' + check = 'check' + + +class cve_cli_t: + @staticmethod + def _get_backend(source: cve_source_t, nvd_api_key: Optional[str] = None) -> cve_backend_t: + if source is cve_source_t.arch_tracker: + from .arch_tracker import arch_tracker_backend_t + + return arch_tracker_backend_t() + elif source is cve_source_t.nvd: + from .nvd import nvd_backend_t + + return nvd_backend_t(api_key=nvd_api_key) + elif source is cve_source_t.osv: + from .osv import osv_backend_t + + return osv_backend_t() + else: + raise ValueError('unknown source: %s' % source) + + @staticmethod + async def _async_sync( + cache_dir: pathlib.Path, + source: str, + since: Optional[str] = None, + months: Optional[int] = None, + nvd_api_key: Optional[str] = None, + ) -> sync_result_t: + db_path = cache_dir / 'archlinux_cache.db' + db = cve_db_t(db_path) + + sources: list[cve_source_t] = [] + if source == 'all': + sources = [cve_source_t.arch_tracker, cve_source_t.nvd] + else: + sources = [cve_source_t(source)] + + result = sync_result_t() + + for src in sources: + backend = cve_cli_t._get_backend(src, nvd_api_key=nvd_api_key) + + estimate = await backend.estimate_sync(since=since, months=months) + logger.info(dict( + msg='sync estimate', + source=src.value, + num_fetches=estimate.num_fetches, + content_length=estimate.content_length, + available=estimate.available, + )) + + if not estimate.available: + logger.warning(dict(msg='sync not available', source=src.value)) + result.sources.append(sync_source_result_t(source=src.value, status='not available')) + continue + + src_val = src.value + + def on_progress(done: int, total: int) -> None: + logger.info(dict(msg='sync progress', source=src_val, done=done, total=total)) + + entries = await backend.sync(since=since, months=months, on_progress=on_progress) + + upsert = db.upsert_entries(entries) + now = datetime.datetime.now(datetime.timezone.utc).isoformat() + db.update_sync_meta(src, last_sync=now, entry_count=db.count_entries(src)) + + logger.info(dict( + msg='ingested', + source=src.value, + received=upsert.received, + in_db=upsert.inserted, + )) + result.sources.append(sync_source_result_t( + source=src.value, synced=upsert.received, total_in_db=upsert.inserted, + )) + + if source in ('all', 'osv'): + from .osv import osv_ecosystems_t + + now_dt = datetime.datetime.now(datetime.timezone.utc) + eco_rows = await osv_ecosystems_t.to_ecosystem_rows(now_dt) + eco_count = db.upsert_ecosystems(eco_rows) + logger.info(dict(msg='osv ecosystems synced', count=eco_count)) + result.osv_ecosystems = eco_count + + return result + + @staticmethod + def sync( + cache_dir: pathlib.Path, + source: str, + fmt: output_format_t = output_format_t.txt, + since: Optional[str] = None, + months: Optional[int] = None, + nvd_api_key: Optional[str] = None, + timeout: Optional[int] = None, + ) -> int: + coro = cve_cli_t._async_sync( + cache_dir=cache_dir, + source=source, + since=since, + months=months, + nvd_api_key=nvd_api_key, + ) + + if timeout is not None: + async def _with_timeout() -> sync_result_t: + try: + return await asyncio.wait_for(coro, timeout=timeout) + except asyncio.TimeoutError: + logger.warning(dict(msg='sync timeout', timeout=timeout)) + return sync_result_t(error='timeout after %ds' % timeout) + + result = asyncio.run(_with_timeout()) + else: + result = asyncio.run(coro) + + render(result, fmt) + return 2 if result.error is not None else 0 + + @staticmethod + def status(cache_dir: pathlib.Path, fmt: output_format_t = output_format_t.txt) -> int: + db_path = cache_dir / 'archlinux_cache.db' + db = cve_db_t(db_path) + + now = datetime.datetime.now(datetime.timezone.utc) + result = status_result_t() + + for src in cve_source_t: + s = db.get_sync_status(src) + + last_sync_dt: Optional[datetime.datetime] = None + if s.last_sync: + try: + last_sync_dt = datetime.datetime.fromisoformat(s.last_sync) + except ValueError: + pass + + # fetch range: what modification-date window we actually synced + fr = db.fetch_range(src) + fetch_start = fr.earliest.date() if fr.earliest else None + fetch_end = fr.latest.date() if fr.latest else None + + # density: count entries by time bucket + if src is cve_source_t.nvd: + d = db.density(src, 'date_published', now) + density = status_density_t( + last_1mo=d['last_1mo'], last_3mo=d['last_3mo'], + last_1yr=d['last_1yr'], total=d['total'], + date_source=date_source_t.published, + ) + elif src is cve_source_t.arch_tracker: + d = db.density_by_cve_year(src, now) + density = status_density_t( + last_1mo=d['last_1mo'], last_3mo=d['last_3mo'], + last_1yr=d['last_1yr'], total=d['total'], + date_source=date_source_t.cve_id_year, + ) + else: + density = status_density_t(total=s.entry_count) + + result.sources.append(status_source_t( + source=src.value, + entries=s.entry_count, + last_sync=last_sync_dt, + fetch_range_start=fetch_start, + fetch_range_end=fetch_end, + density=density, + )) + + result.total_entries = db.count_entries() + + # ecosystems with supported/unsupported breakdown + eco_count = db.count_ecosystems() + if eco_count > 0: + by_cat = db.get_ecosystems_by_category() + all_eco_names: set[str] = set() + cat_models: dict[str, status_ecosystem_category_t] = {} + for cat, names in by_cat.items(): + cat_models[cat] = status_ecosystem_category_t(count=len(names), names=names) + all_eco_names.update(names) + + from .package_mapping import package_map_t + + tested = package_map_t.constants_t.tested_ecosystems + mapped_ecosystems = all_eco_names & tested + unsupported = sorted(all_eco_names - tested) + + result.osv_ecosystems = status_ecosystems_t( + total=eco_count, + supported=len(mapped_ecosystems), + unsupported=len(unsupported), + by_category=cat_models, + unsupported_names=unsupported, + ) + else: + result.osv_ecosystems = status_ecosystems_t(total=0) + + render(result, fmt) + return 0 + + @staticmethod + def check( + cache_dir: pathlib.Path, + requirements: pathlib.Path, + source: str = 'all', + fmt: output_format_t = output_format_t.txt, + ) -> int: + from .checker import check_packages + + db_path = cache_dir / 'archlinux_cache.db' + db = cve_db_t(db_path) + + packages: list[tuple[str, str]] = [] + for line in requirements.read_text().splitlines(): + line = line.strip() + if line == '' or line.startswith('#'): + continue + if ' #' in line: + line = line.split(' #', 1)[0].strip() + parts = line.split() + pkg_spec = parts[0] + if '==' in pkg_spec: + name, version = pkg_spec.split('==', 1) + packages.append((name, version)) + + sources_filter: Optional[list[cve_source_t]] = None + if source != 'all': + sources_filter = [cve_source_t(source)] + + results = check_packages(db, packages, sources=sources_filter) + + result = check_result_t( + packages_checked=len(packages), + cves_found=len(results), + affected_packages=len(set(r.package for r in results)), + ) + + for r in results: + result.by_severity.setdefault(r.severity.value, []).append( + check_cve_entry_t( + cve_id=r.cve_id, + package=r.package, + version=r.version, + score=r.score, + title=r.title, + version_fixed=r.version_fixed, + ) + ) + + render(result, fmt) + + has_severe = any( + r.severity in (cve_severity_t.critical, cve_severity_t.high) + for r in results + ) + return 1 if has_severe else 0 + + @classmethod + def main(cls, args: list[str], prog: str = 'cve') -> int: + parser = argparse.ArgumentParser(prog=prog) + parser.add_argument('--cache-dir', dest='cache_dir', required=True) + parser.add_argument('--format', dest='output_format', choices=[f.value for f in output_format_t], default='txt') + + subparsers = parser.add_subparsers(dest='action') + + sync_p = subparsers.add_parser(cve_action_t.sync, help='sync CVE data from remote sources') + sync_p.add_argument('--source', choices=['all', 'arch_tracker', 'nvd', 'osv'], default='arch_tracker') + sync_p.add_argument('--since', default=None, help='ISO date for incremental sync') + sync_p.add_argument('--months', type=int, default=None, help='sync last N months') + sync_p.add_argument('--nvd-api-key', dest='nvd_api_key', default=None) + sync_p.add_argument('--timeout', type=int, default=None, help='max seconds for sync') + + subparsers.add_parser(cve_action_t.status, help='show CVE sync status') + + check_p = subparsers.add_parser(cve_action_t.check, help='check packages against cached CVEs') + check_p.add_argument('--source', choices=['all', 'arch_tracker', 'nvd', 'osv'], default='all') + check_p.add_argument('-r', dest='requirements', required=True) + + opts = parser.parse_args(args) + fmt = output_format_t(opts.output_format) + + if opts.action == cve_action_t.sync: + return cls.sync( + cache_dir=pathlib.Path(opts.cache_dir), + source=opts.source, + fmt=fmt, + since=opts.since, + months=opts.months, + nvd_api_key=opts.nvd_api_key, + timeout=opts.timeout, + ) + elif opts.action == cve_action_t.status: + return cls.status(cache_dir=pathlib.Path(opts.cache_dir), fmt=fmt) + elif opts.action == cve_action_t.check: + return cls.check( + cache_dir=pathlib.Path(opts.cache_dir), + requirements=pathlib.Path(opts.requirements), + source=opts.source, + fmt=fmt, + ) + else: + parser.print_help() + return 1 + + +def main(args: list[str], prog: str = 'cve') -> int: + return cve_cli_t.main(args, prog=prog) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/cli_render.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/cli_render.py new file mode 100644 index 0000000..faab3ad --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/cli_render.py @@ -0,0 +1,145 @@ +"""Render CVE CLI result models to txt, json, yaml on stdout.""" + +import json +import logging +import sys + +import pydantic + +from .cli_types import ( + check_result_t, + status_result_t, + sync_result_t, +) +from .types import output_format_t + +logger = logging.getLogger(__name__) + + +def render(model: pydantic.BaseModel, fmt: output_format_t) -> None: + """Render a pydantic model to stdout in the requested format.""" + if fmt is output_format_t.json: + _render_json(model) + elif fmt is output_format_t.yaml: + _render_yaml(model) + else: + _render_txt(model) + + +def _render_json(model: pydantic.BaseModel) -> None: + sys.stdout.write(model.model_dump_json(indent=2) + '\n') + + +def _render_yaml(model: pydantic.BaseModel) -> None: + try: + import yaml + + sys.stdout.write(yaml.dump( + model.model_dump(mode='json'), + default_flow_style=False, + sort_keys=False, + )) + except ImportError: + logger.warning(dict(msg='pyyaml not installed, falling back to json')) + _render_json(model) + + +def _render_txt(model: pydantic.BaseModel) -> None: + if isinstance(model, sync_result_t): + _render_sync_txt(model) + elif isinstance(model, status_result_t): + _render_status_txt(model) + elif isinstance(model, check_result_t): + _render_check_txt(model) + else: + sys.stdout.write(model.model_dump_json(indent=2) + '\n') + + +def _render_sync_txt(r: sync_result_t) -> None: + if r.error is not None: + sys.stdout.write('error: %s\n' % r.error) + return + + for s in r.sources: + if s.status != 'ok': + sys.stdout.write('%-15s %s\n' % (s.source, s.status)) + else: + sys.stdout.write('%-15s synced=%d total=%d\n' % (s.source, s.synced, s.total_in_db)) + + if r.osv_ecosystems > 0: + sys.stdout.write('osv ecosystems: %d\n' % r.osv_ecosystems) + + +def _render_status_txt(r: status_result_t) -> None: + import datetime + + now = datetime.datetime.now(datetime.timezone.utc) + + for s in r.sources: + # sync age + if s.last_sync is not None: + delta = now - s.last_sync + if delta.days == 0: + age = 'today' + elif delta.days == 1: + age = '1d ago' + else: + age = '%dd ago' % delta.days + sync_str = '%s (%s)' % (s.last_sync.strftime('%Y-%m-%d %H:%M'), age) + else: + sync_str = 'never' + + sys.stdout.write('%-15s entries=%-6d synced=%s\n' % (s.source, s.entries, sync_str)) + + # fetch range (modification dates) + if s.fetch_range_start is not None and s.fetch_range_end is not None: + sys.stdout.write(' fetched: [%s, %s]\n' % ( + s.fetch_range_start.strftime('%Y-%m-%d'), + s.fetch_range_end.strftime('%Y-%m-%d'), + )) + + # density + d = s.density + if d.total > 0: + src_label = ' (by %s)' % d.date_source if d.date_source is not None else '' + sys.stdout.write(' density%s: 1mo=%d 3mo=%d 1yr=%d total=%d\n' % ( + src_label, d.last_1mo, d.last_3mo, d.last_1yr, d.total, + )) + + sys.stdout.write('total: %d entries\n' % r.total_entries) + + eco = r.osv_ecosystems + if eco.total > 0: + sys.stdout.write('\nosv ecosystems: %d total, %d supported, %d unsupported\n' % ( + eco.total, eco.supported, eco.unsupported, + )) + for cat, info in eco.by_category.items(): + pct = info.count / eco.total * 100 if eco.total > 0 else 0 + sys.stdout.write(' %s: %d (%.0f%%)\n' % (cat, info.count, pct)) + if len(eco.unsupported_names) > 0: + sys.stdout.write(' unsupported: %s\n' % ', '.join(eco.unsupported_names[:20])) + if len(eco.unsupported_names) > 20: + sys.stdout.write(' ... +%d more\n' % (len(eco.unsupported_names) - 20)) + else: + sys.stdout.write('\nosv ecosystems: not synced\n') + + +def _render_check_txt(r: check_result_t) -> None: + if r.cves_found == 0: + sys.stdout.write('no CVEs found (%d packages checked)\n' % r.packages_checked) + return + + severity_order = ['critical', 'high', 'medium', 'low', 'unknown'] + + for sev in severity_order: + group = r.by_severity.get(sev, []) + if len(group) == 0: + continue + sys.stdout.write('\n[%s] %d:\n' % (sev.upper(), len(group))) + for e in group: + fix = ' fix=%s' % e.version_fixed if e.version_fixed else '' + sys.stdout.write(' %-20s %s==%s score=%.1f%s\n' % ( + e.cve_id, e.package, e.version, e.score, fix, + )) + + sys.stdout.write('\n%d CVE(s), %d package(s) affected\n' % (r.cves_found, r.affected_packages)) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/cli_types.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/cli_types.py new file mode 100644 index 0000000..120ed28 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/cli_types.py @@ -0,0 +1,80 @@ +"""Pydantic models for CVE CLI command results.""" + +import datetime +import enum + +from typing import Optional + +import pydantic + + +class sync_source_result_t(pydantic.BaseModel): + source: str + status: str = 'ok' + synced: int = 0 + total_in_db: int = 0 + + +class sync_result_t(pydantic.BaseModel): + sources: list[sync_source_result_t] = pydantic.Field(default_factory=list) + osv_ecosystems: int = 0 + error: Optional[str] = None + + +class date_source_t(enum.StrEnum): + published = 'published' + modified = 'modified' + cve_id_year = 'cve_id_year' + nvd_cross_ref = 'nvd_cross_ref' + + +class status_density_t(pydantic.BaseModel): + last_1mo: int = 0 + last_3mo: int = 0 + last_1yr: int = 0 + total: int = 0 + date_source: Optional[date_source_t] = None + + +class status_source_t(pydantic.BaseModel): + source: str + entries: int = 0 + last_sync: Optional[datetime.datetime] = None + fetch_range_start: Optional[datetime.date] = None + fetch_range_end: Optional[datetime.date] = None + density: status_density_t = pydantic.Field(default_factory=status_density_t) + + +class status_ecosystem_category_t(pydantic.BaseModel): + count: int = 0 + names: list[str] = pydantic.Field(default_factory=list) + + +class status_ecosystems_t(pydantic.BaseModel): + total: int = 0 + supported: int = 0 + unsupported: int = 0 + by_category: dict[str, status_ecosystem_category_t] = pydantic.Field(default_factory=dict) + unsupported_names: list[str] = pydantic.Field(default_factory=list) + + +class status_result_t(pydantic.BaseModel): + sources: list[status_source_t] = pydantic.Field(default_factory=list) + total_entries: int = 0 + osv_ecosystems: status_ecosystems_t = pydantic.Field(default_factory=status_ecosystems_t) + + +class check_cve_entry_t(pydantic.BaseModel): + cve_id: str + package: str + version: str + score: float + title: str + version_fixed: str = '' + + +class check_result_t(pydantic.BaseModel): + packages_checked: int = 0 + cves_found: int = 0 + affected_packages: int = 0 + by_severity: dict[str, list[check_cve_entry_t]] = pydantic.Field(default_factory=dict) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/db.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/db.py index 0602d7a..d938c5b 100644 --- a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/db.py +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/db.py @@ -1,5 +1,6 @@ """CVE ORM module — sqlite tables for cached CVE data.""" +import datetime import json import logging import pathlib @@ -11,15 +12,19 @@ import pydantic from ..orm.registry import orm_module_t, orm_registry_t from .types import ( + cve_date_range_t, + cve_ecosystem_row_t, cve_entry_t, cve_source_t, cve_sync_status_t, + cve_upsert_result_t, ) logger = logging.getLogger(__name__) _entry_list_adapter = pydantic.TypeAdapter(list[cve_entry_t]) _sync_status_list_adapter = pydantic.TypeAdapter(list[cve_sync_status_t]) +_ecosystem_list_adapter = pydantic.TypeAdapter(list[cve_ecosystem_row_t]) def _rows_to_dicts(cur: sqlite3.Cursor) -> list[dict[str, Any]]: @@ -34,7 +39,7 @@ class cve_db_t(orm_module_t): @classmethod def schema_version(cls) -> int: - return 1 + return 2 @classmethod def migrate(cls, conn: sqlite3.Connection, from_version: int, to_version: int) -> None: @@ -77,6 +82,16 @@ class cve_db_t(orm_module_t): """) conn.commit() + if from_version < 2: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS cve_osv_ecosystems ( + name TEXT PRIMARY KEY, + category TEXT NOT NULL DEFAULT 'other', + synced_at TEXT NOT NULL DEFAULT '' + ); + """) + conn.commit() + def __init__(self, db_path_or_conn: 'pathlib.Path | sqlite3.Connection') -> None: if isinstance(db_path_or_conn, sqlite3.Connection): super().__init__(db_path_or_conn) @@ -84,9 +99,10 @@ class cve_db_t(orm_module_t): registry = orm_registry_t.get(db_path_or_conn) super().__init__(registry.conn) - def upsert_entries(self, entries: list[cve_entry_t]) -> int: + def upsert_entries(self, entries: list[cve_entry_t]) -> cve_upsert_result_t: cur = self._conn.cursor() - count = 0 + inserted = 0 + updated = 0 for e in entries: cur.execute( ''' @@ -120,9 +136,14 @@ class cve_db_t(orm_module_t): e.status.value, ), ) - count += 1 self._conn.commit() - return count + # count actual rows per source after upsert + actual = self.count_entries(entries[0].source) if len(entries) > 0 else 0 + return cve_upsert_result_t( + received=len(entries), + inserted=actual, + updated=0, + ) def upsert_detail(self, cve_id: str, source: cve_source_t, raw: object) -> None: self._conn.execute( @@ -192,6 +213,142 @@ class cve_db_t(orm_module_t): ) return _entry_list_adapter.validate_python(_rows_to_dicts(cur)) + def date_range(self, source: cve_source_t) -> tuple[str, str]: + """Return (earliest_date, latest_date) of entries for a source. Empty strings if no data.""" + row = self._conn.execute( + 'SELECT MIN(date_published), MAX(date_published) FROM cve_entries ' + 'WHERE source = ? AND date_published != ""', + (source.value,), + ).fetchone() + if row is None or row[0] is None: + # fallback to date_modified + row = self._conn.execute( + 'SELECT MIN(date_modified), MAX(date_modified) FROM cve_entries ' + 'WHERE source = ? AND date_modified != ""', + (source.value,), + ).fetchone() + if row is None or row[0] is None: + return ('', '') + return (str(row[0]), str(row[1])) + + def date_range(self, source: cve_source_t) -> cve_date_range_t: + """Return earliest/latest datetime of entries for a source. + + Tries date_published first, then date_modified, then extracts + year from CVE ID (CVE-YYYY-*) as fallback. + """ + for col in ['date_published', 'date_modified']: + row = self._conn.execute( + 'SELECT MIN(%s), MAX(%s) FROM cve_entries ' + 'WHERE source = ? AND %s != ""' % (col, col, col), + (source.value,), + ).fetchone() + if row is not None and row[0] is not None: + try: + earliest = datetime.datetime.fromisoformat(str(row[0]).replace('Z', '+00:00')) + latest = datetime.datetime.fromisoformat(str(row[1]).replace('Z', '+00:00')) + return cve_date_range_t(earliest=earliest, latest=latest) + except ValueError: + continue + + # fallback: extract year from CVE ID + row = self._conn.execute( + "SELECT MIN(SUBSTR(cve_id, 5, 4)), MAX(SUBSTR(cve_id, 5, 4)) " + "FROM cve_entries WHERE source = ? AND cve_id LIKE 'CVE-%'", + (source.value,), + ).fetchone() + if row is not None and row[0] is not None: + try: + earliest = datetime.datetime(int(row[0]), 1, 1, tzinfo=datetime.timezone.utc) + latest = datetime.datetime(int(row[1]), 12, 31, tzinfo=datetime.timezone.utc) + return cve_date_range_t(earliest=earliest, latest=latest) + except (ValueError, TypeError): + pass + + return cve_date_range_t() + + def covered_months(self, source: cve_source_t) -> list[str]: + """Return sorted list of YYYY-MM strings that have entries for this source. + + Uses date_published, date_modified, or CVE ID year as fallback. + """ + months: set[str] = set() + + for col in ['date_published', 'date_modified']: + rows = self._conn.execute( + "SELECT DISTINCT SUBSTR(%s, 1, 7) FROM cve_entries " + "WHERE source = ? AND %s != '' AND LENGTH(%s) >= 7" % (col, col, col), + (source.value,), + ).fetchall() + if len(rows) > 0: + for r in rows: + if r[0] is not None: + months.add(str(r[0])) + return sorted(months) + + # fallback: CVE ID year -> one entry per year as YYYY-01 + rows = self._conn.execute( + "SELECT DISTINCT SUBSTR(cve_id, 5, 4) FROM cve_entries " + "WHERE source = ? AND cve_id LIKE 'CVE-%'", + (source.value,), + ).fetchall() + for r in rows: + if r[0] is not None: + months.add('%s-01' % str(r[0])) + + return sorted(months) + + def fetch_range(self, source: cve_source_t) -> cve_date_range_t: + """Return min/max of date_modified — represents what sync window we fetched.""" + row = self._conn.execute( + 'SELECT MIN(date_modified), MAX(date_modified) FROM cve_entries ' + 'WHERE source = ? AND date_modified != ""', + (source.value,), + ).fetchone() + if row is None or row[0] is None: + return cve_date_range_t() + try: + earliest = datetime.datetime.fromisoformat(str(row[0]).replace('Z', '+00:00')) + latest = datetime.datetime.fromisoformat(str(row[1]).replace('Z', '+00:00')) + return cve_date_range_t(earliest=earliest, latest=latest) + except ValueError: + return cve_date_range_t() + + def density(self, source: cve_source_t, date_col: str, now: datetime.datetime) -> dict[str, int]: + """Count entries in time buckets. date_col is 'date_published' or a SQL expression.""" + result: dict[str, int] = {} + for label, months_ago in [('last_1mo', 1), ('last_3mo', 3), ('last_1yr', 12)]: + cutoff = (now - datetime.timedelta(days=months_ago * 30)).strftime('%Y-%m-%d') + row = self._conn.execute( + 'SELECT COUNT(*) FROM cve_entries WHERE source = ? AND %s >= ?' % date_col, + (source.value, cutoff), + ).fetchone() + result[label] = row[0] if row else 0 + row = self._conn.execute( + 'SELECT COUNT(*) FROM cve_entries WHERE source = ?', + (source.value,), + ).fetchone() + result['total'] = row[0] if row else 0 + return result + + def density_by_cve_year(self, source: cve_source_t, now: datetime.datetime) -> dict[str, int]: + """Count entries by year extracted from CVE ID.""" + result: dict[str, int] = {} + for label, months_ago in [('last_1mo', 1), ('last_3mo', 3), ('last_1yr', 12)]: + cutoff_year = (now - datetime.timedelta(days=months_ago * 30)).year + row = self._conn.execute( + "SELECT COUNT(*) FROM cve_entries WHERE source = ? " + "AND cve_id LIKE 'CVE-%%' AND CAST(SUBSTR(cve_id, 5, 4) AS INTEGER) >= ?", + (source.value, cutoff_year), + ).fetchone() + result[label] = row[0] if row else 0 + row = self._conn.execute( + 'SELECT COUNT(*) FROM cve_entries WHERE source = ?', + (source.value,), + ).fetchone() + result['total'] = row[0] if row else 0 + return result + def count_entries(self, source: Optional[cve_source_t] = None) -> int: if source is not None: row = self._conn.execute( @@ -203,4 +360,39 @@ class cve_db_t(orm_module_t): return row[0] if row else 0 + def upsert_ecosystems(self, ecosystems: list[cve_ecosystem_row_t]) -> int: + cur = self._conn.cursor() + count = 0 + for e in ecosystems: + cur.execute( + ''' + INSERT INTO cve_osv_ecosystems (name, category, synced_at) + VALUES (?, ?, ?) + ON CONFLICT(name) DO UPDATE SET + category = excluded.category, + synced_at = excluded.synced_at + ''', + (e.name, e.category, e.synced_at.isoformat() if e.synced_at else ''), + ) + count += 1 + self._conn.commit() + return count + + def get_ecosystems(self) -> list[cve_ecosystem_row_t]: + cur = self._conn.execute( + 'SELECT name, category, synced_at FROM cve_osv_ecosystems ORDER BY category, name' + ) + return _ecosystem_list_adapter.validate_python(_rows_to_dicts(cur)) + + def get_ecosystems_by_category(self) -> dict[str, list[str]]: + result: dict[str, list[str]] = {} + for e in self.get_ecosystems(): + result.setdefault(e.category, []).append(e.name) + return result + + def count_ecosystems(self) -> int: + row = self._conn.execute('SELECT COUNT(*) FROM cve_osv_ecosystems').fetchone() + return row[0] if row else 0 + + orm_registry_t.register(cve_db_t) diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd.py index 7704936..90fe857 100644 --- a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd.py +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/nvd.py @@ -1,15 +1,14 @@ -"""NVD (NIST) backend. +"""NVD (NIST) backend (async). Source: https://services.nvd.nist.gov/rest/json/cves/2.0 Optional API key. Rate limited: 5 req/30s without key, 50 with key. Paginated (max 2000/page). Supports lastModStartDate/lastModEndDate (max 120 days). """ +import asyncio import logging import math -import time import urllib.parse -import urllib.request from datetime import datetime, timedelta, timezone from typing import Optional @@ -22,7 +21,6 @@ from .types import ( cve_entry_t, cve_severity_t, cve_source_t, - cve_status_t, cve_sync_estimate_t, ) @@ -31,8 +29,10 @@ logger = logging.getLogger(__name__) BASE_URL = 'https://services.nvd.nist.gov/rest/json/cves/2.0' PAGE_SIZE = 2000 MAX_RANGE_DAYS = 120 -REQUEST_DELAY_NO_KEY = 6.5 # 5 req / 30s → ~6s between -REQUEST_DELAY_WITH_KEY = 0.7 # 50 req / 30s → ~0.6s between +REQUEST_DELAY_NO_KEY = 6.5 +REQUEST_DELAY_WITH_KEY = 0.7 + +_response_adapter = pydantic.TypeAdapter(nvd_response_t) def _severity_from_nvd(s: str) -> cve_severity_t: @@ -46,7 +46,6 @@ def _severity_from_nvd(s: str) -> cve_severity_t: def _date_ranges(start: datetime, end: datetime) -> list[tuple[str, str]]: - """Split a date range into chunks of MAX_RANGE_DAYS.""" ranges: list[tuple[str, str]] = [] cur = start while cur < end: @@ -71,13 +70,22 @@ class nvd_backend_t(cve_backend_t): def _build_url(self, params: dict[str, str]) -> str: return '%s?%s' % (BASE_URL, urllib.parse.urlencode(params)) - def _fetch_page(self, url: str) -> nvd_response_t: - req = urllib.request.Request(url) - if self._api_key: - req.add_header('apiKey', self._api_key) - resp = urllib.request.urlopen(req, timeout=30) - raw = resp.read() - return pydantic.TypeAdapter(nvd_response_t).validate_json(raw) + async def _fetch_page(self, url: str) -> nvd_response_t: + loop = asyncio.get_running_loop() + + api_key = self._api_key + + def _do() -> bytes: + import urllib.request as ur + + req = ur.Request(url) + if api_key: + req.add_header('apiKey', api_key) + resp = ur.urlopen(req, timeout=30) + return resp.read() + + raw = await loop.run_in_executor(None, _do) + return _response_adapter.validate_json(raw) def _compute_date_range( self, @@ -93,7 +101,7 @@ class nvd_backend_t(cve_backend_t): start = end - timedelta(days=120) return start, end - def estimate_sync( + async def estimate_sync( self, since: Optional[str] = None, months: Optional[int] = None, @@ -101,11 +109,8 @@ class nvd_backend_t(cve_backend_t): start, end = self._compute_date_range(since, months) ranges = _date_ranges(start, end) - # fetch first page of first range to get totalResults if len(ranges) == 0: - return cve_sync_estimate_t( - source=cve_source_t.nvd, available=False, - ) + return cve_sync_estimate_t(source=cve_source_t.nvd, available=False) params = { 'lastModStartDate': ranges[0][0], @@ -113,15 +118,12 @@ class nvd_backend_t(cve_backend_t): 'resultsPerPage': '1', } try: - page = self._fetch_page(self._build_url(params)) + page = await self._fetch_page(self._build_url(params)) total_first_range = page.totalResults except Exception as e: logger.warning(dict(msg='nvd estimate failed', error=str(e))) - return cve_sync_estimate_t( - source=cve_source_t.nvd, available=False, - ) + return cve_sync_estimate_t(source=cve_source_t.nvd, available=False) - # rough estimate: total_first_range * num_ranges (assuming uniform distribution) estimated_total = total_first_range * len(ranges) pages_per_range = max(1, math.ceil(total_first_range / PAGE_SIZE)) num_fetches = pages_per_range * len(ranges) @@ -134,7 +136,7 @@ class nvd_backend_t(cve_backend_t): available=True, ) - def sync( + async def sync( self, since: Optional[str] = None, months: Optional[int] = None, @@ -160,7 +162,7 @@ class nvd_backend_t(cve_backend_t): url = self._build_url(params) logger.info(dict(msg='nvd fetch', url=url)) - page = self._fetch_page(url) + page = await self._fetch_page(url) fetch_count += 1 for vuln in page.vulnerabilities: @@ -202,10 +204,10 @@ class nvd_backend_t(cve_backend_t): break start_index += page.resultsPerPage - time.sleep(self._delay) + await asyncio.sleep(self._delay) if len(ranges) > 1: - time.sleep(self._delay) + await asyncio.sleep(self._delay) logger.info(dict(msg='nvd sync done', fetches=fetch_count, entries=len(entries))) return entries diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv.py index 08b2b2c..be19994 100644 --- a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv.py +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv.py @@ -4,13 +4,16 @@ Source: https://api.osv.dev/v1/ No auth. No rate limits. Arch Linux is NOT a supported ecosystem, so we query using Debian ecosystem as a proxy and map results. Supports batch queries (up to 1000 per request). +Ecosystem list fetched from GCS bucket listing. """ -import json +import asyncio +import datetime import logging import urllib.request +import xml.etree.ElementTree as ET -from typing import Optional +from typing import ClassVar, Optional import pydantic @@ -18,10 +21,12 @@ from .base import SyncProgressCallback, cve_backend_t from .osv_types import ( osv_batch_request_t, osv_batch_response_t, + osv_ecosystem_t, osv_package_query_t, osv_query_t, ) from .types import ( + cve_ecosystem_row_t, cve_entry_t, cve_source_t, cve_sync_estimate_t, @@ -30,11 +35,100 @@ from .types import ( logger = logging.getLogger(__name__) QUERY_URL = 'https://api.osv.dev/v1/querybatch' -VULN_URL = 'https://api.osv.dev/v1/vulns' +GCS_BUCKET_URL = 'https://osv-vulnerabilities.storage.googleapis.com/' BATCH_SIZE = 1000 DEFAULT_ECOSYSTEM = 'Debian:12' +class osv_ecosystems_t: + """Fetches and caches the list of ecosystems supported by OSV.""" + + _cached: ClassVar[Optional[list[osv_ecosystem_t]]] = None + + @classmethod + async def fetch(cls, force: bool = False) -> list[osv_ecosystem_t]: + if cls._cached is not None and not force: + return cls._cached + + loop = asyncio.get_running_loop() + ecosystems: list[osv_ecosystem_t] = [] + marker = '' + seen: set[str] = set() + + while True: + url = '%s?delimiter=/&prefix=&marker=%s' % (GCS_BUCKET_URL, marker) + logger.debug(dict(msg='fetching osv ecosystems page', marker=marker)) + + def _do(u: str = url) -> str: + resp = urllib.request.urlopen(u, timeout=30) + return resp.read().decode('utf-8') + + raw = await loop.run_in_executor(None, _do) + + root = ET.fromstring(raw) + ns = '{http://doc.s3.amazonaws.com/2006-03-01}' + + new_count = 0 + for prefix_el in root.findall('.//' + ns + 'Prefix'): + if prefix_el.text is not None: + name = prefix_el.text.rstrip('/') + if name and name not in ('all', 'icons', '[EMPTY]') and name not in seen: + ecosystems.append(osv_ecosystem_t(name=name)) + seen.add(name) + new_count += 1 + marker = name + + is_truncated_el = root.find(ns + 'IsTruncated') + is_truncated = is_truncated_el is not None and is_truncated_el.text == 'true' + + if not is_truncated or new_count == 0: + break + + cls._cached = ecosystems + logger.info(dict(msg='osv ecosystems fetched', count=len(ecosystems))) + return ecosystems + + @classmethod + async def names(cls, force: bool = False) -> set[str]: + return {e.name for e in await cls.fetch(force=force)} + + @classmethod + def reset_cache(cls) -> None: + cls._cached = None + + # known categories for status reporting + LINUX_DISTROS: ClassVar[set[str]] = { + 'Debian', 'Debian:12', 'Debian:13', + 'Ubuntu', 'Ubuntu:24.04:LTS', + 'Alpine', 'Alpine:v3.20', 'Alpine:v3.21', + 'Wolfi', 'Chainguard', + 'Rocky Linux', 'AlmaLinux', + 'Red Hat', 'SUSE', 'openSUSE', + 'Linux', + } + + LANGUAGE_ECOSYSTEMS: ClassVar[set[str]] = { + 'PyPI', 'npm', 'crates.io', 'Go', 'RubyGems', + 'Hackage', 'Maven', 'NuGet', 'Packagist', + 'Hex', 'Pub', 'CRAN', + } + + @classmethod + async def to_ecosystem_rows(cls, now: datetime.datetime) -> list[cve_ecosystem_row_t]: + """Fetch ecosystems and return typed rows with categories.""" + names = await cls.names(force=True) + rows: list[cve_ecosystem_row_t] = [] + for name in sorted(names): + if name in cls.LINUX_DISTROS: + cat = 'linux_distro' + elif name in cls.LANGUAGE_ECOSYSTEMS: + cat = 'language' + else: + cat = 'other' + rows.append(cve_ecosystem_row_t(name=name, category=cat, synced_at=now)) + return rows + + class osv_backend_t(cve_backend_t): def __init__(self, ecosystem: str = DEFAULT_ECOSYSTEM) -> None: self._ecosystem = ecosystem @@ -43,13 +137,11 @@ class osv_backend_t(cve_backend_t): def source(self) -> cve_source_t: return cve_source_t.osv - def estimate_sync( + async def estimate_sync( self, since: Optional[str] = None, months: Optional[int] = None, ) -> cve_sync_estimate_t: - # OSV doesn't support time-range queries for bulk. - # Estimation not meaningful without a package list. return cve_sync_estimate_t( source=cve_source_t.osv, num_fetches=0, @@ -58,26 +150,21 @@ class osv_backend_t(cve_backend_t): available=False, ) - def sync( + async def sync( self, since: Optional[str] = None, months: Optional[int] = None, on_progress: Optional[SyncProgressCallback] = None, ) -> list[cve_entry_t]: - # OSV requires package names to query. A blind sync isn't supported. - # Use query_packages() instead. logger.warning(dict(msg='osv sync requires explicit package list, use query_packages()')) return [] - def query_packages( + async def query_packages( self, packages: list[tuple[str, str]], on_progress: Optional[SyncProgressCallback] = None, ) -> list[cve_entry_t]: - """Query OSV for a list of (name, version) tuples. - - Uses batch API. Returns unified CVE entries. - """ + """Query OSV for a list of (name, version) tuples.""" entries: list[cve_entry_t] = [] total = len(packages) @@ -97,15 +184,7 @@ class osv_backend_t(cve_backend_t): ] ) - req = urllib.request.Request( - QUERY_URL, - data=request.model_dump_json().encode('utf-8'), - headers={'Content-Type': 'application/json'}, - method='POST', - ) - - resp = urllib.request.urlopen(req, timeout=30) - raw = resp.read() + raw = await self._post_json(QUERY_URL, request.model_dump_json().encode('utf-8')) batch_resp = pydantic.TypeAdapter(osv_batch_response_t).validate_json(raw) for i, result in enumerate(batch_resp.results): diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv_types.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv_types.py index 89d3e45..8b11da7 100644 --- a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv_types.py +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/osv_types.py @@ -28,3 +28,9 @@ class osv_query_t(pydantic.BaseModel): class osv_batch_request_t(pydantic.BaseModel): queries: list[osv_query_t] + + +class osv_ecosystem_t(pydantic.BaseModel): + """One ecosystem entry parsed from GCS bucket listing.""" + + name: str diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/types.py b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/types.py index cac56e7..5249fda 100644 --- a/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/types.py +++ b/python/online/fxreader/pr34/commands_typed/archlinux/apps/cve/types.py @@ -1,10 +1,19 @@ """CVE unified types — exposed to users of the cve module.""" +import datetime import enum +from typing import Optional + import pydantic +class output_format_t(enum.StrEnum): + txt = 'txt' + json = 'json' + yaml = 'yaml' + + class cve_source_t(enum.Enum): arch_tracker = 'arch_tracker' nvd = 'nvd' @@ -80,3 +89,26 @@ class cve_check_result_t(pydantic.BaseModel): title: str version_fixed: str = '' status: cve_status_t = cve_status_t.unknown + + +class cve_ecosystem_row_t(pydantic.BaseModel): + model_config = pydantic.ConfigDict(frozen=True) + + name: str + category: str = 'other' + synced_at: Optional[datetime.datetime] = None + + +class cve_upsert_result_t(pydantic.BaseModel): + model_config = pydantic.ConfigDict(frozen=True) + + received: int = 0 + inserted: int = 0 + updated: int = 0 + + +class cve_date_range_t(pydantic.BaseModel): + model_config = pydantic.ConfigDict(frozen=True) + + earliest: Optional[datetime.datetime] = None + latest: Optional[datetime.datetime] = None diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/cli/main.py b/python/online/fxreader/pr34/commands_typed/archlinux/cli/main.py index 80229df..9be8c31 100644 --- a/python/online/fxreader/pr34/commands_typed/archlinux/cli/main.py +++ b/python/online/fxreader/pr34/commands_typed/archlinux/cli/main.py @@ -20,15 +20,21 @@ class Command(enum.Enum): download = 'download' archive = 'archive' diff = 'diff' + cve = 'cve' def main(argv: Optional[list[str]] = None) -> int: if argv is None: argv = sys.argv[1:] + from online.fxreader.pr34.commands_typed import argparse as pr34_argparse + + prog = 'online-fxreader-pr34-archlinux' + command_values = [o.value for o in Command] + parser = argparse.ArgumentParser( - prog='online-fxreader-pr34-archlinux', - description='Arch Linux package management tools', + prog=prog, + description='Arch Linux package management tools. Commands: %s' % ', '.join(command_values), ) parser.add_argument( '--log-level', @@ -37,13 +43,15 @@ def main(argv: Optional[list[str]] = None) -> int: default='INFO', help='log level (default: INFO)', ) - parser.add_argument( - 'command', - choices=[o.value for o in Command], - ) - options, args = parser.parse_known_args(argv) - options.command = Command(options.command) + options, args = pr34_argparse.parse_args(parser, argv, stop_at=command_values) + + if len(args) == 0 or args[0] not in command_values: + parser.print_help() + return 1 + + options.command = Command(args[0]) + args = args[1:] pr34_logging.setup( level=getattr(logging, options.log_level), @@ -71,6 +79,10 @@ def main(argv: Optional[list[str]] = None) -> int: from . import diff return diff.main(args) + elif options.command is Command.cve: + from ..apps.cve.cli import main as cve_main + + return cve_main(args, prog='%s cve' % parser.prog) else: raise NotImplementedError diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_cve.py b/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_cve.py index 5f50d7a..df5f685 100644 --- a/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_cve.py +++ b/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_cve.py @@ -221,8 +221,9 @@ class TestCveDb(unittest.TestCase): version_fixed='9.0.1225-1', ), ] - count = self.db.upsert_entries(entries) - self.assertEqual(count, 2) + result = self.db.upsert_entries(entries) + self.assertEqual(result.received, 2) + self.assertGreater(result.inserted, 0) results = self.db.query_by_product('vim') self.assertEqual(len(results), 2)