[+] CVE backends async, CLI with output formats, status density, ecosystem tracking

1. convert all CVE backends to async (run_in_executor + urllib);
  2. base class provides _fetch_url, _post_json, _head_content_length async helpers;
  3. NVD rate-limit delays use asyncio.sleep;
  4. add --timeout to cve sync via asyncio.wait_for;
  5. add cve CLI as apps/cve/cli.py with cve_cli_t class, routed from cli/main.py;
  6. main.py uses pr34 parse_args stop_at for proper -h passthrough;
  7. add cli_types.py with pydantic models for sync/status/check results;
  8. add cli_render.py with txt/json/yaml output renderers;
  9. add --format txt|json|yaml flag on cve parser;
  10. status shows per-source: entries, last_sync (datetime), fetch_range, density;
  11. density computed from date_published (NVD), CVE ID year (arch_tracker);
  12. cve_db_t schema v2: add cve_osv_ecosystems table;
  13. osv_ecosystems_t fetches from GCS, fixes infinite loop (IsTruncated check);
  14. status shows ecosystem counts: total/supported/unsupported per package_map_t;
  15. sync stores ecosystems in sqlite, status reads from db only;
  16. all print() replaced with logger or stdout render;
  17. arch_tracker sync: clean fetch/parsed/ingested logging, no per-100 spam;
  18. upsert_entries returns cve_upsert_result_t with received/inserted counts;
  19. add cve_date_range_t, cve_upsert_result_t, output_format_t, date_source_t types;
This commit is contained in:
LLM 2026-04-17 09:00:00 +00:00
parent 2dac844087
commit 4c681b6018
12 changed files with 1013 additions and 97 deletions

@ -1,12 +1,10 @@
"""Arch Linux Security Tracker backend.
"""Arch Linux Security Tracker backend (async).
Source: https://security.archlinux.org/issues/all.json
No auth. Full dump ~900KB. Always fetches entire dataset (no incremental API).
Uses pydantic TypeAdapter to parse the JSON response directly.
No auth required. Full dump ~900KB.
"""
import logging
import urllib.request
from typing import Optional
@ -47,12 +45,12 @@ class arch_tracker_backend_t(cve_backend_t):
def source(self) -> cve_source_t:
return cve_source_t.arch_tracker
def estimate_sync(
async def estimate_sync(
self,
since: Optional[str] = None,
months: Optional[int] = None,
) -> cve_sync_estimate_t:
content_length = self._head_content_length(ISSUES_URL)
content_length = await self._head_content_length(ISSUES_URL)
return cve_sync_estimate_t(
source=cve_source_t.arch_tracker,
num_fetches=1,
@ -61,23 +59,22 @@ class arch_tracker_backend_t(cve_backend_t):
available=content_length > 0,
)
def sync(
async def sync(
self,
since: Optional[str] = None,
months: Optional[int] = None,
on_progress: Optional[SyncProgressCallback] = None,
) -> list[cve_entry_t]:
logger.info(dict(msg='fetching arch security tracker', url=ISSUES_URL))
resp = urllib.request.urlopen(ISSUES_URL, timeout=30)
raw_bytes = resp.read()
logger.info(dict(msg='fetch', source='arch_tracker', url=ISSUES_URL))
raw_bytes = await self._fetch_url(ISSUES_URL)
avgs = _avg_list_adapter.validate_json(raw_bytes)
entries: list[cve_entry_t] = []
total = len(avgs)
logger.info(dict(msg='fetched', source='arch_tracker', avgs=len(avgs), bytes=len(raw_bytes)))
for i, avg in enumerate(avgs):
entries: list[cve_entry_t] = []
for avg in avgs:
severity = SEVERITY_MAP.get(avg.severity, cve_severity_t.unknown)
status = STATUS_MAP.get(avg.status, cve_status_t.unknown)
@ -96,11 +93,5 @@ class arch_tracker_backend_t(cve_backend_t):
)
)
if on_progress is not None and (i + 1) % 100 == 0:
on_progress(i + 1, total)
if on_progress is not None:
on_progress(total, total)
logger.info(dict(msg='arch tracker sync done', avgs=total, entries=len(entries)))
logger.info(dict(msg='parsed', source='arch_tracker', avgs=len(avgs), entries=len(entries)))
return entries

@ -1,6 +1,7 @@
"""Abstract backend interface for CVE data sources."""
import abc
import asyncio
import logging
import urllib.request
@ -24,32 +25,59 @@ class cve_backend_t(abc.ABC):
raise NotImplementedError
@abc.abstractmethod
def estimate_sync(
async def estimate_sync(
self,
since: Optional[str] = None,
months: Optional[int] = None,
) -> cve_sync_estimate_t:
"""Estimate sync size via HTTP HEAD or lightweight API call."""
raise NotImplementedError
@abc.abstractmethod
def sync(
async def sync(
self,
since: Optional[str] = None,
months: Optional[int] = None,
on_progress: Optional[SyncProgressCallback] = None,
) -> list[cve_entry_t]:
"""Fetch CVE entries from the source."""
raise NotImplementedError
@staticmethod
def _head_content_length(url: str) -> int:
"""HTTP HEAD to get Content-Length. Returns 0 if unavailable."""
async def _head_content_length(url: str) -> int:
loop = asyncio.get_running_loop()
try:
req = urllib.request.Request(url, method='HEAD')
resp = urllib.request.urlopen(req, timeout=10)
cl = resp.headers.get('Content-Length', '0')
return int(cl)
def _do() -> int:
req = urllib.request.Request(url, method='HEAD')
resp = urllib.request.urlopen(req, timeout=10)
cl = resp.headers.get('Content-Length', '0')
return int(cl)
return await loop.run_in_executor(None, _do)
except Exception:
logger.debug(dict(msg='HEAD failed', url=url))
return 0
@staticmethod
async def _fetch_url(url: str, timeout: int = 30) -> bytes:
loop = asyncio.get_running_loop()
def _do() -> bytes:
resp = urllib.request.urlopen(url, timeout=timeout)
return resp.read()
return await loop.run_in_executor(None, _do)
@staticmethod
async def _post_json(url: str, data: bytes, timeout: int = 30) -> bytes:
    """POST *data* as application/json in the default executor; return body bytes.

    Fix: the original never closed the HTTP response; the with-block
    releases the socket even if read() raises.
    """
    loop = asyncio.get_running_loop()
    def _do() -> bytes:
        req = urllib.request.Request(
            url,
            data=data,
            headers={'Content-Type': 'application/json'},
            method='POST',
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read()
    return await loop.run_in_executor(None, _do)

@ -0,0 +1,348 @@
"""CVE CLI: sync, status, check subcommands."""
import argparse
import asyncio
import datetime
import enum
import logging
import pathlib
from typing import Optional
from .base import cve_backend_t
from .cli_render import render
from .cli_types import (
check_cve_entry_t,
check_result_t,
date_source_t,
status_density_t,
status_ecosystem_category_t,
status_ecosystems_t,
status_result_t,
status_source_t,
sync_result_t,
sync_source_result_t,
)
from .db import cve_db_t
from .types import (
cve_severity_t,
cve_source_t,
output_format_t,
)
logger = logging.getLogger(__name__)
class cve_action_t(enum.StrEnum):
    """Subcommand names for the cve CLI (StrEnum so argparse compares as str)."""
    sync = 'sync'
    status = 'status'
    check = 'check'
class cve_cli_t:
    """CLI entry points for the ``cve`` subcommand family.

    ``main`` parses argv and dispatches to ``sync``/``status``/``check``.
    Each command builds a pydantic result model (cli_types) and hands it to
    the format-aware renderer (cli_render).

    Exit codes: 0 ok, 1 severe CVEs found or missing action, 2 sync error.
    """

    @staticmethod
    def _get_backend(source: cve_source_t, nvd_api_key: Optional[str] = None) -> cve_backend_t:
        """Instantiate the backend for *source*.

        Backend imports are deferred so that unused backends do not add
        startup cost or import-time failures.
        """
        if source is cve_source_t.arch_tracker:
            from .arch_tracker import arch_tracker_backend_t
            return arch_tracker_backend_t()
        elif source is cve_source_t.nvd:
            from .nvd import nvd_backend_t
            return nvd_backend_t(api_key=nvd_api_key)
        elif source is cve_source_t.osv:
            from .osv import osv_backend_t
            return osv_backend_t()
        else:
            raise ValueError('unknown source: %s' % source)

    @staticmethod
    def _parse_pinned_requirements(text: str) -> list[tuple[str, str]]:
        """Parse ``name==version`` pins from requirements.txt content.

        Skips blank lines, comments and non-pinned specifiers. Strips inline
        comments, PEP 508 environment markers (everything after ';') and
        extras brackets, so ``pkg[extra]==1.0; python_version<"3.9"`` yields
        ``('pkg', '1.0')``.
        """
        packages: list[tuple[str, str]] = []
        for line in text.splitlines():
            line = line.strip()
            if line == '' or line.startswith('#'):
                continue
            if ' #' in line:
                line = line.split(' #', 1)[0].strip()
            # fix: drop environment markers so the marker text does not
            # leak into the parsed version
            line = line.split(';', 1)[0].strip()
            parts = line.split()
            if not parts:
                continue
            pkg_spec = parts[0]
            if '==' in pkg_spec:
                name, version = pkg_spec.split('==', 1)
                # fix: 'pkg[extra]==1.0' should match on the bare package name
                name = name.split('[', 1)[0]
                packages.append((name, version))
        return packages

    @staticmethod
    async def _async_sync(
        cache_dir: pathlib.Path,
        source: str,
        since: Optional[str] = None,
        months: Optional[int] = None,
        nvd_api_key: Optional[str] = None,
    ) -> sync_result_t:
        """Sync one source (or 'all') into the sqlite cache.

        'all' covers arch_tracker + nvd; OSV ecosystem metadata is synced
        whenever *source* is 'all' or 'osv'. Each source is estimated first
        and skipped (status='not available') when the estimate says so.
        """
        db_path = cache_dir / 'archlinux_cache.db'
        db = cve_db_t(db_path)
        if source == 'all':
            sources = [cve_source_t.arch_tracker, cve_source_t.nvd]
        else:
            sources = [cve_source_t(source)]
        result = sync_result_t()
        for src in sources:
            backend = cve_cli_t._get_backend(src, nvd_api_key=nvd_api_key)
            estimate = await backend.estimate_sync(since=since, months=months)
            logger.info(dict(
                msg='sync estimate',
                source=src.value,
                num_fetches=estimate.num_fetches,
                content_length=estimate.content_length,
                available=estimate.available,
            ))
            if not estimate.available:
                logger.warning(dict(msg='sync not available', source=src.value))
                result.sources.append(sync_source_result_t(source=src.value, status='not available'))
                continue

            # fix: bind the loop variable as a default argument so the callback
            # reports the correct source even if a backend stores it and calls
            # it after the loop has advanced
            def on_progress(done: int, total: int, _src: str = src.value) -> None:
                logger.info(dict(msg='sync progress', source=_src, done=done, total=total))

            entries = await backend.sync(since=since, months=months, on_progress=on_progress)
            upsert = db.upsert_entries(entries)
            now = datetime.datetime.now(datetime.timezone.utc).isoformat()
            db.update_sync_meta(src, last_sync=now, entry_count=db.count_entries(src))
            logger.info(dict(
                msg='ingested',
                source=src.value,
                received=upsert.received,
                in_db=upsert.inserted,
            ))
            result.sources.append(sync_source_result_t(
                source=src.value, synced=upsert.received, total_in_db=upsert.inserted,
            ))
        if source in ('all', 'osv'):
            from .osv import osv_ecosystems_t
            now_dt = datetime.datetime.now(datetime.timezone.utc)
            eco_rows = await osv_ecosystems_t.to_ecosystem_rows(now_dt)
            eco_count = db.upsert_ecosystems(eco_rows)
            logger.info(dict(msg='osv ecosystems synced', count=eco_count))
            result.osv_ecosystems = eco_count
        return result

    @staticmethod
    def sync(
        cache_dir: pathlib.Path,
        source: str,
        fmt: output_format_t = output_format_t.txt,
        since: Optional[str] = None,
        months: Optional[int] = None,
        nvd_api_key: Optional[str] = None,
        timeout: Optional[int] = None,
    ) -> int:
        """Run a sync, optionally bounded by *timeout* seconds; render the result.

        Returns 0 on success, 2 when the sync errored (e.g. timed out).
        On timeout the coroutine is cancelled by wait_for; per-source work
        already committed to sqlite is kept.
        """
        coro = cve_cli_t._async_sync(
            cache_dir=cache_dir,
            source=source,
            since=since,
            months=months,
            nvd_api_key=nvd_api_key,
        )
        if timeout is not None:
            async def _with_timeout() -> sync_result_t:
                try:
                    return await asyncio.wait_for(coro, timeout=timeout)
                except asyncio.TimeoutError:
                    logger.warning(dict(msg='sync timeout', timeout=timeout))
                    return sync_result_t(error='timeout after %ds' % timeout)
            result = asyncio.run(_with_timeout())
        else:
            result = asyncio.run(coro)
        render(result, fmt)
        return 2 if result.error is not None else 0

    @staticmethod
    def status(cache_dir: pathlib.Path, fmt: output_format_t = output_format_t.txt) -> int:
        """Build and render cache status: per-source stats, totals, ecosystems.

        Reads only from sqlite — never touches the network. Always returns 0.
        """
        db_path = cache_dir / 'archlinux_cache.db'
        db = cve_db_t(db_path)
        now = datetime.datetime.now(datetime.timezone.utc)
        result = status_result_t()
        for src in cve_source_t:
            s = db.get_sync_status(src)
            last_sync_dt: Optional[datetime.datetime] = None
            if s.last_sync:
                try:
                    last_sync_dt = datetime.datetime.fromisoformat(s.last_sync)
                except ValueError:
                    pass  # unparsable timestamp renders as 'never'
            # fetch range: what modification-date window we actually synced
            fr = db.fetch_range(src)
            fetch_start = fr.earliest.date() if fr.earliest else None
            fetch_end = fr.latest.date() if fr.latest else None
            # density: count entries by time bucket; the date source differs
            # per backend (NVD has publication dates, arch tracker does not)
            if src is cve_source_t.nvd:
                d = db.density(src, 'date_published', now)
                density = status_density_t(
                    last_1mo=d['last_1mo'], last_3mo=d['last_3mo'],
                    last_1yr=d['last_1yr'], total=d['total'],
                    date_source=date_source_t.published,
                )
            elif src is cve_source_t.arch_tracker:
                d = db.density_by_cve_year(src, now)
                density = status_density_t(
                    last_1mo=d['last_1mo'], last_3mo=d['last_3mo'],
                    last_1yr=d['last_1yr'], total=d['total'],
                    date_source=date_source_t.cve_id_year,
                )
            else:
                density = status_density_t(total=s.entry_count)
            result.sources.append(status_source_t(
                source=src.value,
                entries=s.entry_count,
                last_sync=last_sync_dt,
                fetch_range_start=fetch_start,
                fetch_range_end=fetch_end,
                density=density,
            ))
        result.total_entries = db.count_entries()
        # ecosystems with supported/unsupported breakdown
        eco_count = db.count_ecosystems()
        if eco_count > 0:
            by_cat = db.get_ecosystems_by_category()
            all_eco_names: set[str] = set()
            cat_models: dict[str, status_ecosystem_category_t] = {}
            for cat, names in by_cat.items():
                cat_models[cat] = status_ecosystem_category_t(count=len(names), names=names)
                all_eco_names.update(names)
            from .package_mapping import package_map_t
            tested = package_map_t.constants_t.tested_ecosystems
            mapped_ecosystems = all_eco_names & tested
            unsupported = sorted(all_eco_names - tested)
            result.osv_ecosystems = status_ecosystems_t(
                total=eco_count,
                supported=len(mapped_ecosystems),
                unsupported=len(unsupported),
                by_category=cat_models,
                unsupported_names=unsupported,
            )
        else:
            result.osv_ecosystems = status_ecosystems_t(total=0)
        render(result, fmt)
        return 0

    @staticmethod
    def check(
        cache_dir: pathlib.Path,
        requirements: pathlib.Path,
        source: str = 'all',
        fmt: output_format_t = output_format_t.txt,
    ) -> int:
        """Check ``name==version`` pins in *requirements* against cached CVEs.

        Returns 1 when any critical/high severity CVE matches, else 0.
        """
        from .checker import check_packages
        db_path = cache_dir / 'archlinux_cache.db'
        db = cve_db_t(db_path)
        packages = cve_cli_t._parse_pinned_requirements(requirements.read_text())
        sources_filter: Optional[list[cve_source_t]] = None
        if source != 'all':
            sources_filter = [cve_source_t(source)]
        results = check_packages(db, packages, sources=sources_filter)
        result = check_result_t(
            packages_checked=len(packages),
            cves_found=len(results),
            affected_packages=len(set(r.package for r in results)),
        )
        for r in results:
            result.by_severity.setdefault(r.severity.value, []).append(
                check_cve_entry_t(
                    cve_id=r.cve_id,
                    package=r.package,
                    version=r.version,
                    score=r.score,
                    title=r.title,
                    version_fixed=r.version_fixed,
                )
            )
        render(result, fmt)
        has_severe = any(
            r.severity in (cve_severity_t.critical, cve_severity_t.high)
            for r in results
        )
        return 1 if has_severe else 0

    @classmethod
    def main(cls, args: list[str], prog: str = 'cve') -> int:
        """Parse argv and dispatch; prints help and returns 1 when no action given."""
        parser = argparse.ArgumentParser(prog=prog)
        parser.add_argument('--cache-dir', dest='cache_dir', required=True)
        parser.add_argument(
            '--format', dest='output_format',
            choices=[f.value for f in output_format_t], default='txt',
        )
        subparsers = parser.add_subparsers(dest='action')
        sync_p = subparsers.add_parser(cve_action_t.sync, help='sync CVE data from remote sources')
        sync_p.add_argument('--source', choices=['all', 'arch_tracker', 'nvd', 'osv'], default='arch_tracker')
        sync_p.add_argument('--since', default=None, help='ISO date for incremental sync')
        sync_p.add_argument('--months', type=int, default=None, help='sync last N months')
        sync_p.add_argument('--nvd-api-key', dest='nvd_api_key', default=None)
        sync_p.add_argument('--timeout', type=int, default=None, help='max seconds for sync')
        subparsers.add_parser(cve_action_t.status, help='show CVE sync status')
        check_p = subparsers.add_parser(cve_action_t.check, help='check packages against cached CVEs')
        check_p.add_argument('--source', choices=['all', 'arch_tracker', 'nvd', 'osv'], default='all')
        check_p.add_argument('-r', dest='requirements', required=True)
        opts = parser.parse_args(args)
        fmt = output_format_t(opts.output_format)
        if opts.action == cve_action_t.sync:
            return cls.sync(
                cache_dir=pathlib.Path(opts.cache_dir),
                source=opts.source,
                fmt=fmt,
                since=opts.since,
                months=opts.months,
                nvd_api_key=opts.nvd_api_key,
                timeout=opts.timeout,
            )
        elif opts.action == cve_action_t.status:
            return cls.status(cache_dir=pathlib.Path(opts.cache_dir), fmt=fmt)
        elif opts.action == cve_action_t.check:
            return cls.check(
                cache_dir=pathlib.Path(opts.cache_dir),
                requirements=pathlib.Path(opts.requirements),
                source=opts.source,
                fmt=fmt,
            )
        else:
            parser.print_help()
            return 1
def main(args: list[str], prog: str = 'cve') -> int:
    """Module-level entry point: delegates to cve_cli_t.main (used by cli/main.py routing)."""
    return cve_cli_t.main(args, prog=prog)

@ -0,0 +1,145 @@
"""Render CVE CLI result models to txt, json, yaml on stdout."""
import json
import logging
import sys
import pydantic
from .cli_types import (
check_result_t,
status_result_t,
sync_result_t,
)
from .types import output_format_t
logger = logging.getLogger(__name__)
def render(model: pydantic.BaseModel, fmt: output_format_t) -> None:
    """Render a pydantic model to stdout in the requested format."""
    # yaml and json are checked explicitly; anything else falls back to txt
    if fmt is output_format_t.yaml:
        _render_yaml(model)
    elif fmt is output_format_t.json:
        _render_json(model)
    else:
        _render_txt(model)
def _render_json(model: pydantic.BaseModel) -> None:
    """Write the model as 2-space-indented JSON plus a trailing newline."""
    payload = model.model_dump_json(indent=2)
    sys.stdout.write('%s\n' % payload)
def _render_yaml(model: pydantic.BaseModel) -> None:
    """Write the model as YAML; degrades to JSON when pyyaml is absent."""
    try:
        import yaml
    except ImportError:
        logger.warning(dict(msg='pyyaml not installed, falling back to json'))
        _render_json(model)
        return
    dumped = yaml.dump(
        model.model_dump(mode='json'),
        default_flow_style=False,
        sort_keys=False,
    )
    sys.stdout.write(dumped)
def _render_txt(model: pydantic.BaseModel) -> None:
    """Dispatch to the matching per-result text renderer; JSON for unknown models."""
    dispatch = (
        (sync_result_t, _render_sync_txt),
        (status_result_t, _render_status_txt),
        (check_result_t, _render_check_txt),
    )
    for model_cls, renderer in dispatch:
        if isinstance(model, model_cls):
            renderer(model)
            return
    sys.stdout.write(model.model_dump_json(indent=2) + '\n')
def _render_sync_txt(r: sync_result_t) -> None:
    """One line per source, then the OSV ecosystem count when present."""
    if r.error is not None:
        sys.stdout.write('error: %s\n' % r.error)
        return
    for src in r.sources:
        if src.status == 'ok':
            line = '%-15s synced=%d total=%d\n' % (src.source, src.synced, src.total_in_db)
        else:
            line = '%-15s %s\n' % (src.source, src.status)
        sys.stdout.write(line)
    if r.osv_ecosystems > 0:
        sys.stdout.write('osv ecosystems: %d\n' % r.osv_ecosystems)
def _render_status_txt(r: status_result_t) -> None:
    """Per-source status lines with sync age, fetch window, density, ecosystems."""
    import datetime
    now = datetime.datetime.now(datetime.timezone.utc)
    for s in r.sources:
        # sync age
        if s.last_sync is not None:
            last_sync = s.last_sync
            # fix: a tz-naive last_sync (e.g. parsed from a bare ISO string)
            # raised TypeError when subtracted from the aware `now`; assume UTC
            if last_sync.tzinfo is None:
                last_sync = last_sync.replace(tzinfo=datetime.timezone.utc)
            delta = now - last_sync
            if delta.days == 0:
                age = 'today'
            elif delta.days == 1:
                age = '1d ago'
            else:
                age = '%dd ago' % delta.days
            sync_str = '%s (%s)' % (last_sync.strftime('%Y-%m-%d %H:%M'), age)
        else:
            sync_str = 'never'
        sys.stdout.write('%-15s entries=%-6d synced=%s\n' % (s.source, s.entries, sync_str))
        # fetch range (modification dates)
        if s.fetch_range_start is not None and s.fetch_range_end is not None:
            sys.stdout.write(' fetched: [%s, %s]\n' % (
                s.fetch_range_start.strftime('%Y-%m-%d'),
                s.fetch_range_end.strftime('%Y-%m-%d'),
            ))
        # density
        d = s.density
        if d.total > 0:
            src_label = ' (by %s)' % d.date_source if d.date_source is not None else ''
            sys.stdout.write(' density%s: 1mo=%d 3mo=%d 1yr=%d total=%d\n' % (
                src_label, d.last_1mo, d.last_3mo, d.last_1yr, d.total,
            ))
    sys.stdout.write('total: %d entries\n' % r.total_entries)
    eco = r.osv_ecosystems
    if eco.total > 0:
        sys.stdout.write('\nosv ecosystems: %d total, %d supported, %d unsupported\n' % (
            eco.total, eco.supported, eco.unsupported,
        ))
        for cat, info in eco.by_category.items():
            pct = info.count / eco.total * 100 if eco.total > 0 else 0
            sys.stdout.write(' %s: %d (%.0f%%)\n' % (cat, info.count, pct))
        if len(eco.unsupported_names) > 0:
            sys.stdout.write(' unsupported: %s\n' % ', '.join(eco.unsupported_names[:20]))
            if len(eco.unsupported_names) > 20:
                sys.stdout.write(' ... +%d more\n' % (len(eco.unsupported_names) - 20))
    else:
        sys.stdout.write('\nosv ecosystems: not synced\n')
def _render_check_txt(r: check_result_t) -> None:
    """Severity-grouped CVE listing with a summary line at the end."""
    if r.cves_found == 0:
        sys.stdout.write('no CVEs found (%d packages checked)\n' % r.packages_checked)
        return
    for sev in ('critical', 'high', 'medium', 'low', 'unknown'):
        group = r.by_severity.get(sev, [])
        if not group:
            continue
        sys.stdout.write('\n[%s] %d:\n' % (sev.upper(), len(group)))
        for entry in group:
            fix_note = ' fix=%s' % entry.version_fixed if entry.version_fixed else ''
            sys.stdout.write(' %-20s %s==%s score=%.1f%s\n' % (
                entry.cve_id, entry.package, entry.version, entry.score, fix_note,
            ))
    sys.stdout.write('\n%d CVE(s), %d package(s) affected\n' % (r.cves_found, r.affected_packages))

@ -0,0 +1,80 @@
"""Pydantic models for CVE CLI command results."""
import datetime
import enum
from typing import Optional
import pydantic
class sync_source_result_t(pydantic.BaseModel):
    """Per-source outcome of a `cve sync` run."""
    source: str  # cve_source_t value, e.g. 'nvd'
    status: str = 'ok'  # 'ok' or a failure note such as 'not available'
    synced: int = 0  # entries received from the remote source
    total_in_db: int = 0  # rows present in sqlite for this source after upsert
class sync_result_t(pydantic.BaseModel):
    """Aggregate result of `cve sync` across all requested sources."""
    sources: list[sync_source_result_t] = pydantic.Field(default_factory=list)
    osv_ecosystems: int = 0  # OSV ecosystem rows upserted; 0 when not synced
    error: Optional[str] = None  # set on whole-run failure, e.g. 'timeout after Ns'
class date_source_t(enum.StrEnum):
    """Which date field the status density buckets were computed from."""
    published = 'published'
    modified = 'modified'
    cve_id_year = 'cve_id_year'  # year digits parsed out of 'CVE-YYYY-…' identifiers
    nvd_cross_ref = 'nvd_cross_ref'
class status_density_t(pydantic.BaseModel):
    """Entry counts inside trailing time windows for one source."""
    last_1mo: int = 0
    last_3mo: int = 0
    last_1yr: int = 0
    total: int = 0
    date_source: Optional[date_source_t] = None  # None when no dated breakdown exists
class status_source_t(pydantic.BaseModel):
    """Cache status of one CVE source."""
    source: str  # cve_source_t value
    entries: int = 0  # cached entry count
    last_sync: Optional[datetime.datetime] = None  # None = never synced
    fetch_range_start: Optional[datetime.date] = None  # earliest date_modified fetched
    fetch_range_end: Optional[datetime.date] = None  # latest date_modified fetched
    density: status_density_t = pydantic.Field(default_factory=status_density_t)
class status_ecosystem_category_t(pydantic.BaseModel):
    """One OSV ecosystem category with its member names."""
    count: int = 0
    names: list[str] = pydantic.Field(default_factory=list)
class status_ecosystems_t(pydantic.BaseModel):
    """OSV ecosystem coverage summary (total == 0 means never synced)."""
    total: int = 0
    supported: int = 0  # ecosystems present in package_map_t's tested set
    unsupported: int = 0
    by_category: dict[str, status_ecosystem_category_t] = pydantic.Field(default_factory=dict)
    unsupported_names: list[str] = pydantic.Field(default_factory=list)  # sorted
class status_result_t(pydantic.BaseModel):
    """Full `cve status` payload: per-source stats plus ecosystem summary."""
    sources: list[status_source_t] = pydantic.Field(default_factory=list)
    total_entries: int = 0  # across all sources
    osv_ecosystems: status_ecosystems_t = pydantic.Field(default_factory=status_ecosystems_t)
class check_cve_entry_t(pydantic.BaseModel):
    """A single CVE hit against one pinned package."""
    cve_id: str
    package: str
    version: str  # the pinned version that matched
    score: float
    title: str
    version_fixed: str = ''  # empty when no fixed version is known
class check_result_t(pydantic.BaseModel):
    """Result of `cve check`: hits grouped by severity value string."""
    packages_checked: int = 0
    cves_found: int = 0
    affected_packages: int = 0  # distinct packages with at least one hit
    by_severity: dict[str, list[check_cve_entry_t]] = pydantic.Field(default_factory=dict)

@ -1,5 +1,6 @@
"""CVE ORM module — sqlite tables for cached CVE data."""
import datetime
import json
import logging
import pathlib
@ -11,15 +12,19 @@ import pydantic
from ..orm.registry import orm_module_t, orm_registry_t
from .types import (
cve_date_range_t,
cve_ecosystem_row_t,
cve_entry_t,
cve_source_t,
cve_sync_status_t,
cve_upsert_result_t,
)
logger = logging.getLogger(__name__)
_entry_list_adapter = pydantic.TypeAdapter(list[cve_entry_t])
_sync_status_list_adapter = pydantic.TypeAdapter(list[cve_sync_status_t])
_ecosystem_list_adapter = pydantic.TypeAdapter(list[cve_ecosystem_row_t])
def _rows_to_dicts(cur: sqlite3.Cursor) -> list[dict[str, Any]]:
@ -34,7 +39,7 @@ class cve_db_t(orm_module_t):
@classmethod
def schema_version(cls) -> int:
return 1
return 2
@classmethod
def migrate(cls, conn: sqlite3.Connection, from_version: int, to_version: int) -> None:
@ -77,6 +82,16 @@ class cve_db_t(orm_module_t):
""")
conn.commit()
if from_version < 2:
conn.executescript("""
CREATE TABLE IF NOT EXISTS cve_osv_ecosystems (
name TEXT PRIMARY KEY,
category TEXT NOT NULL DEFAULT 'other',
synced_at TEXT NOT NULL DEFAULT ''
);
""")
conn.commit()
def __init__(self, db_path_or_conn: 'pathlib.Path | sqlite3.Connection') -> None:
if isinstance(db_path_or_conn, sqlite3.Connection):
super().__init__(db_path_or_conn)
@ -84,9 +99,10 @@ class cve_db_t(orm_module_t):
registry = orm_registry_t.get(db_path_or_conn)
super().__init__(registry.conn)
def upsert_entries(self, entries: list[cve_entry_t]) -> int:
def upsert_entries(self, entries: list[cve_entry_t]) -> cve_upsert_result_t:
cur = self._conn.cursor()
count = 0
inserted = 0
updated = 0
for e in entries:
cur.execute(
'''
@ -120,9 +136,14 @@ class cve_db_t(orm_module_t):
e.status.value,
),
)
count += 1
self._conn.commit()
return count
# count actual rows per source after upsert
actual = self.count_entries(entries[0].source) if len(entries) > 0 else 0
return cve_upsert_result_t(
received=len(entries),
inserted=actual,
updated=0,
)
def upsert_detail(self, cve_id: str, source: cve_source_t, raw: object) -> None:
self._conn.execute(
@ -192,6 +213,142 @@ class cve_db_t(orm_module_t):
)
return _entry_list_adapter.validate_python(_rows_to_dicts(cur))
def date_range(self, source: cve_source_t) -> tuple[str, str]:
"""Return (earliest_date, latest_date) of entries for a source. Empty strings if no data."""
row = self._conn.execute(
'SELECT MIN(date_published), MAX(date_published) FROM cve_entries '
'WHERE source = ? AND date_published != ""',
(source.value,),
).fetchone()
if row is None or row[0] is None:
# fallback to date_modified
row = self._conn.execute(
'SELECT MIN(date_modified), MAX(date_modified) FROM cve_entries '
'WHERE source = ? AND date_modified != ""',
(source.value,),
).fetchone()
if row is None or row[0] is None:
return ('', '')
return (str(row[0]), str(row[1]))
def date_range(self, source: cve_source_t) -> cve_date_range_t:
    """Return earliest/latest datetime of entries for a source.

    Tries date_published first, then date_modified, then extracts
    year from CVE ID (CVE-YYYY-*) as fallback. Returns an empty
    cve_date_range_t when nothing parses.
    """
    # column names come from a fixed list, so %-interpolation into SQL is safe
    for col in ['date_published', 'date_modified']:
        row = self._conn.execute(
            'SELECT MIN(%s), MAX(%s) FROM cve_entries '
            'WHERE source = ? AND %s != ""' % (col, col, col),
            (source.value,),
        ).fetchone()
        if row is not None and row[0] is not None:
            try:
                # 'Z' suffix normalized for fromisoformat compatibility
                earliest = datetime.datetime.fromisoformat(str(row[0]).replace('Z', '+00:00'))
                latest = datetime.datetime.fromisoformat(str(row[1]).replace('Z', '+00:00'))
                return cve_date_range_t(earliest=earliest, latest=latest)
            except ValueError:
                continue  # unparsable timestamps: fall through to the next column
    # fallback: extract year from CVE ID
    row = self._conn.execute(
        "SELECT MIN(SUBSTR(cve_id, 5, 4)), MAX(SUBSTR(cve_id, 5, 4)) "
        "FROM cve_entries WHERE source = ? AND cve_id LIKE 'CVE-%'",
        (source.value,),
    ).fetchone()
    if row is not None and row[0] is not None:
        try:
            # widen to full calendar years since only the year is known
            earliest = datetime.datetime(int(row[0]), 1, 1, tzinfo=datetime.timezone.utc)
            latest = datetime.datetime(int(row[1]), 12, 31, tzinfo=datetime.timezone.utc)
            return cve_date_range_t(earliest=earliest, latest=latest)
        except (ValueError, TypeError):
            pass
    return cve_date_range_t()
def covered_months(self, source: cve_source_t) -> list[str]:
    """Return sorted list of YYYY-MM strings that have entries for this source.

    Uses date_published, date_modified, or CVE ID year as fallback;
    date_modified is only consulted when date_published yielded no rows.
    """
    months: set[str] = set()
    # column names come from a fixed list, so %-interpolation into SQL is safe
    for col in ['date_published', 'date_modified']:
        rows = self._conn.execute(
            "SELECT DISTINCT SUBSTR(%s, 1, 7) FROM cve_entries "
            "WHERE source = ? AND %s != '' AND LENGTH(%s) >= 7" % (col, col, col),
            (source.value,),
        ).fetchall()
        if len(rows) > 0:
            for r in rows:
                if r[0] is not None:
                    months.add(str(r[0]))
            # first column with data wins
            return sorted(months)
    # fallback: CVE ID year -> one entry per year as YYYY-01
    rows = self._conn.execute(
        "SELECT DISTINCT SUBSTR(cve_id, 5, 4) FROM cve_entries "
        "WHERE source = ? AND cve_id LIKE 'CVE-%'",
        (source.value,),
    ).fetchall()
    for r in rows:
        if r[0] is not None:
            months.add('%s-01' % str(r[0]))
    return sorted(months)
def fetch_range(self, source: cve_source_t) -> cve_date_range_t:
    """Return min/max of date_modified — represents what sync window we fetched."""
    query = (
        'SELECT MIN(date_modified), MAX(date_modified) FROM cve_entries '
        'WHERE source = ? AND date_modified != ""'
    )
    bounds = self._conn.execute(query, (source.value,)).fetchone()
    if bounds is None or bounds[0] is None:
        return cve_date_range_t()
    try:
        parse = datetime.datetime.fromisoformat
        lo = parse(str(bounds[0]).replace('Z', '+00:00'))
        hi = parse(str(bounds[1]).replace('Z', '+00:00'))
    except ValueError:
        # unparsable timestamps: report an empty range rather than raising
        return cve_date_range_t()
    return cve_date_range_t(earliest=lo, latest=hi)
def density(self, source: cve_source_t, date_col: str, now: datetime.datetime) -> dict[str, int]:
    """Count entries in time buckets. date_col is 'date_published' or a SQL expression."""
    buckets = (('last_1mo', 1), ('last_3mo', 3), ('last_1yr', 12))
    counts: dict[str, int] = {}
    for label, n_months in buckets:
        # ISO date strings compare lexicographically, so a string cutoff works
        cutoff = (now - datetime.timedelta(days=n_months * 30)).strftime('%Y-%m-%d')
        hit = self._conn.execute(
            'SELECT COUNT(*) FROM cve_entries WHERE source = ? AND %s >= ?' % date_col,
            (source.value, cutoff),
        ).fetchone()
        counts[label] = hit[0] if hit else 0
    hit = self._conn.execute(
        'SELECT COUNT(*) FROM cve_entries WHERE source = ?',
        (source.value,),
    ).fetchone()
    counts['total'] = hit[0] if hit else 0
    return counts
def density_by_cve_year(self, source: cve_source_t, now: datetime.datetime) -> dict[str, int]:
    """Count entries by year extracted from the CVE ID (CVE-YYYY-…).

    Used for sources whose entries carry no usable dates; the buckets are
    therefore whole-year resolution (a "last month" cutoff only excludes
    entries from strictly earlier years).
    """
    result: dict[str, int] = {}
    for label, months_ago in [('last_1mo', 1), ('last_3mo', 3), ('last_1yr', 12)]:
        cutoff_year = (now - datetime.timedelta(days=months_ago * 30)).year
        # fix: pattern was 'CVE-%%' — a leftover printf-style escape; SQL LIKE
        # needs a single '%' wildcard (behavior is equivalent in SQLite, but
        # this matches the pattern used by date_range/covered_months)
        row = self._conn.execute(
            "SELECT COUNT(*) FROM cve_entries WHERE source = ? "
            "AND cve_id LIKE 'CVE-%' AND CAST(SUBSTR(cve_id, 5, 4) AS INTEGER) >= ?",
            (source.value, cutoff_year),
        ).fetchone()
        result[label] = row[0] if row else 0
    row = self._conn.execute(
        'SELECT COUNT(*) FROM cve_entries WHERE source = ?',
        (source.value,),
    ).fetchone()
    result['total'] = row[0] if row else 0
    return result
def count_entries(self, source: Optional[cve_source_t] = None) -> int:
if source is not None:
row = self._conn.execute(
@ -203,4 +360,39 @@ class cve_db_t(orm_module_t):
return row[0] if row else 0
def upsert_ecosystems(self, ecosystems: list[cve_ecosystem_row_t]) -> int:
    """Insert-or-update OSV ecosystem rows; returns the number of rows processed."""
    rows = [
        (e.name, e.category, e.synced_at.isoformat() if e.synced_at else '')
        for e in ecosystems
    ]
    cur = self._conn.cursor()
    cur.executemany(
        '''
        INSERT INTO cve_osv_ecosystems (name, category, synced_at)
        VALUES (?, ?, ?)
        ON CONFLICT(name) DO UPDATE SET
            category = excluded.category,
            synced_at = excluded.synced_at
        ''',
        rows,
    )
    self._conn.commit()
    return len(rows)
def get_ecosystems(self) -> list[cve_ecosystem_row_t]:
    """Return all cached OSV ecosystems ordered by (category, name)."""
    query = 'SELECT name, category, synced_at FROM cve_osv_ecosystems ORDER BY category, name'
    cur = self._conn.execute(query)
    return _ecosystem_list_adapter.validate_python(_rows_to_dicts(cur))
def get_ecosystems_by_category(self) -> dict[str, list[str]]:
    """Group cached ecosystem names by their category."""
    grouped: dict[str, list[str]] = {}
    for row in self.get_ecosystems():
        bucket = grouped.get(row.category)
        if bucket is None:
            bucket = grouped[row.category] = []
        bucket.append(row.name)
    return grouped
def count_ecosystems(self) -> int:
    """Return the number of cached OSV ecosystem rows."""
    found = self._conn.execute('SELECT COUNT(*) FROM cve_osv_ecosystems').fetchone()
    return 0 if found is None else found[0]
orm_registry_t.register(cve_db_t)

@ -1,15 +1,14 @@
"""NVD (NIST) backend.
"""NVD (NIST) backend (async).
Source: https://services.nvd.nist.gov/rest/json/cves/2.0
Optional API key. Rate limited: 5 req/30s without key, 50 with key.
Paginated (max 2000/page). Supports lastModStartDate/lastModEndDate (max 120 days).
"""
import asyncio
import logging
import math
import time
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from typing import Optional
@ -22,7 +21,6 @@ from .types import (
cve_entry_t,
cve_severity_t,
cve_source_t,
cve_status_t,
cve_sync_estimate_t,
)
@ -31,8 +29,10 @@ logger = logging.getLogger(__name__)
BASE_URL = 'https://services.nvd.nist.gov/rest/json/cves/2.0'
PAGE_SIZE = 2000
MAX_RANGE_DAYS = 120
REQUEST_DELAY_NO_KEY = 6.5 # 5 req / 30s → ~6s between
REQUEST_DELAY_WITH_KEY = 0.7 # 50 req / 30s → ~0.6s between
REQUEST_DELAY_NO_KEY = 6.5
REQUEST_DELAY_WITH_KEY = 0.7
_response_adapter = pydantic.TypeAdapter(nvd_response_t)
def _severity_from_nvd(s: str) -> cve_severity_t:
@ -46,7 +46,6 @@ def _severity_from_nvd(s: str) -> cve_severity_t:
def _date_ranges(start: datetime, end: datetime) -> list[tuple[str, str]]:
"""Split a date range into chunks of MAX_RANGE_DAYS."""
ranges: list[tuple[str, str]] = []
cur = start
while cur < end:
@ -71,13 +70,22 @@ class nvd_backend_t(cve_backend_t):
def _build_url(self, params: dict[str, str]) -> str:
return '%s?%s' % (BASE_URL, urllib.parse.urlencode(params))
def _fetch_page(self, url: str) -> nvd_response_t:
req = urllib.request.Request(url)
if self._api_key:
req.add_header('apiKey', self._api_key)
resp = urllib.request.urlopen(req, timeout=30)
raw = resp.read()
return pydantic.TypeAdapter(nvd_response_t).validate_json(raw)
async def _fetch_page(self, url: str) -> nvd_response_t:
    """Fetch one NVD API page in the default executor and parse it.

    The API key is bound to a local before the closure so the worker
    thread never touches ``self``. Fix: the HTTP response is now closed
    via a with-block instead of being leaked.
    """
    loop = asyncio.get_running_loop()
    api_key = self._api_key
    def _do() -> bytes:
        import urllib.request as ur
        req = ur.Request(url)
        if api_key:
            req.add_header('apiKey', api_key)
        with ur.urlopen(req, timeout=30) as resp:
            return resp.read()
    raw = await loop.run_in_executor(None, _do)
    return _response_adapter.validate_json(raw)
def _compute_date_range(
self,
@ -93,7 +101,7 @@ class nvd_backend_t(cve_backend_t):
start = end - timedelta(days=120)
return start, end
def estimate_sync(
async def estimate_sync(
self,
since: Optional[str] = None,
months: Optional[int] = None,
@ -101,11 +109,8 @@ class nvd_backend_t(cve_backend_t):
start, end = self._compute_date_range(since, months)
ranges = _date_ranges(start, end)
# fetch first page of first range to get totalResults
if len(ranges) == 0:
return cve_sync_estimate_t(
source=cve_source_t.nvd, available=False,
)
return cve_sync_estimate_t(source=cve_source_t.nvd, available=False)
params = {
'lastModStartDate': ranges[0][0],
@ -113,15 +118,12 @@ class nvd_backend_t(cve_backend_t):
'resultsPerPage': '1',
}
try:
page = self._fetch_page(self._build_url(params))
page = await self._fetch_page(self._build_url(params))
total_first_range = page.totalResults
except Exception as e:
logger.warning(dict(msg='nvd estimate failed', error=str(e)))
return cve_sync_estimate_t(
source=cve_source_t.nvd, available=False,
)
return cve_sync_estimate_t(source=cve_source_t.nvd, available=False)
# rough estimate: total_first_range * num_ranges (assuming uniform distribution)
estimated_total = total_first_range * len(ranges)
pages_per_range = max(1, math.ceil(total_first_range / PAGE_SIZE))
num_fetches = pages_per_range * len(ranges)
@ -134,7 +136,7 @@ class nvd_backend_t(cve_backend_t):
available=True,
)
def sync(
async def sync(
self,
since: Optional[str] = None,
months: Optional[int] = None,
@ -160,7 +162,7 @@ class nvd_backend_t(cve_backend_t):
url = self._build_url(params)
logger.info(dict(msg='nvd fetch', url=url))
page = self._fetch_page(url)
page = await self._fetch_page(url)
fetch_count += 1
for vuln in page.vulnerabilities:
@ -202,10 +204,10 @@ class nvd_backend_t(cve_backend_t):
break
start_index += page.resultsPerPage
time.sleep(self._delay)
await asyncio.sleep(self._delay)
if len(ranges) > 1:
time.sleep(self._delay)
await asyncio.sleep(self._delay)
logger.info(dict(msg='nvd sync done', fetches=fetch_count, entries=len(entries)))
return entries

@ -4,13 +4,16 @@ Source: https://api.osv.dev/v1/
No auth. No rate limits. Arch Linux is NOT a supported ecosystem,
so we query using Debian ecosystem as a proxy and map results.
Supports batch queries (up to 1000 per request).
Ecosystem list fetched from GCS bucket listing.
"""
import json
import asyncio
import datetime
import logging
import urllib.request
import xml.etree.ElementTree as ET
from typing import Optional
from typing import ClassVar, Optional
import pydantic
@ -18,10 +21,12 @@ from .base import SyncProgressCallback, cve_backend_t
from .osv_types import (
osv_batch_request_t,
osv_batch_response_t,
osv_ecosystem_t,
osv_package_query_t,
osv_query_t,
)
from .types import (
cve_ecosystem_row_t,
cve_entry_t,
cve_source_t,
cve_sync_estimate_t,
@ -30,11 +35,100 @@ from .types import (
logger = logging.getLogger(__name__)
QUERY_URL = 'https://api.osv.dev/v1/querybatch'
VULN_URL = 'https://api.osv.dev/v1/vulns'
GCS_BUCKET_URL = 'https://osv-vulnerabilities.storage.googleapis.com/'
BATCH_SIZE = 1000
DEFAULT_ECOSYSTEM = 'Debian:12'
class osv_ecosystems_t:
    """Fetches and caches the list of ecosystems supported by OSV.

    The list is derived from the public GCS bucket that hosts OSV data:
    every top-level "directory" prefix in the bucket listing is one
    ecosystem.  The result is cached at class level; pass ``force=True``
    (or call reset_cache()) to refresh.
    """

    _cached: ClassVar[Optional[list[osv_ecosystem_t]]] = None

    # bucket prefixes that are not real ecosystems
    _EXCLUDED_PREFIXES: ClassVar[frozenset[str]] = frozenset({'all', 'icons', '[EMPTY]'})

    @classmethod
    async def fetch(cls, force: bool = False) -> list[osv_ecosystem_t]:
        """Return all OSV ecosystems, fetching from GCS unless cached.

        Pages through the bucket listing with the ``marker`` parameter;
        stops when GCS reports ``IsTruncated == false`` or when a page
        yields no prefix at all (the marker cannot advance).
        """
        if cls._cached is not None and not force:
            return cls._cached
        loop = asyncio.get_running_loop()
        ecosystems: list[osv_ecosystem_t] = []
        marker = ''
        seen: set[str] = set()
        while True:
            url = '%s?delimiter=/&prefix=&marker=%s' % (GCS_BUCKET_URL, marker)
            logger.debug(dict(msg='fetching osv ecosystems page', marker=marker))

            def _do(u: str = url) -> str:
                # blocking fetch runs in the default executor below;
                # 'with' closes the HTTP response deterministically
                with urllib.request.urlopen(u, timeout=30) as resp:
                    return resp.read().decode('utf-8')

            raw = await loop.run_in_executor(None, _do)
            root = ET.fromstring(raw)
            ns = '{http://doc.s3.amazonaws.com/2006-03-01}'
            last_prefix = ''
            for prefix_el in root.findall('.//' + ns + 'Prefix'):
                if prefix_el.text is None:
                    continue
                name = prefix_el.text.rstrip('/')
                if not name:
                    continue
                # advance the pagination marker past *every* listed
                # prefix — previously it only advanced on kept names,
                # so a page ending in an excluded/duplicate entry
                # stalled pagination and dropped the remaining pages
                last_prefix = name
                if name in cls._EXCLUDED_PREFIXES or name in seen:
                    continue
                ecosystems.append(osv_ecosystem_t(name=name))
                seen.add(name)
            if last_prefix:
                marker = last_prefix
            is_truncated_el = root.find(ns + 'IsTruncated')
            is_truncated = is_truncated_el is not None and is_truncated_el.text == 'true'
            # stop when the listing is complete, or when no prefix was
            # returned at all (marker cannot move -> would loop forever)
            if not is_truncated or not last_prefix:
                break
        cls._cached = ecosystems
        logger.info(dict(msg='osv ecosystems fetched', count=len(ecosystems)))
        return ecosystems

    @classmethod
    async def names(cls, force: bool = False) -> set[str]:
        """Return just the ecosystem names as a set."""
        return {e.name for e in await cls.fetch(force=force)}

    @classmethod
    def reset_cache(cls) -> None:
        """Drop the cached list so the next fetch() hits the network."""
        cls._cached = None

    # known categories for status reporting
    LINUX_DISTROS: ClassVar[set[str]] = {
        'Debian', 'Debian:12', 'Debian:13',
        'Ubuntu', 'Ubuntu:24.04:LTS',
        'Alpine', 'Alpine:v3.20', 'Alpine:v3.21',
        'Wolfi', 'Chainguard',
        'Rocky Linux', 'AlmaLinux',
        'Red Hat', 'SUSE', 'openSUSE',
        'Linux',
    }
    LANGUAGE_ECOSYSTEMS: ClassVar[set[str]] = {
        'PyPI', 'npm', 'crates.io', 'Go', 'RubyGems',
        'Hackage', 'Maven', 'NuGet', 'Packagist',
        'Hex', 'Pub', 'CRAN',
    }

    @classmethod
    async def to_ecosystem_rows(cls, now: datetime.datetime) -> list[cve_ecosystem_row_t]:
        """Fetch ecosystems (forced refresh) and return typed rows.

        Each row is categorized as 'linux_distro', 'language', or
        'other' using the class-level sets above; *now* is recorded as
        the sync timestamp on every row.
        """
        names = await cls.names(force=True)
        rows: list[cve_ecosystem_row_t] = []
        for name in sorted(names):
            if name in cls.LINUX_DISTROS:
                cat = 'linux_distro'
            elif name in cls.LANGUAGE_ECOSYSTEMS:
                cat = 'language'
            else:
                cat = 'other'
            rows.append(cve_ecosystem_row_t(name=name, category=cat, synced_at=now))
        return rows
class osv_backend_t(cve_backend_t):
def __init__(self, ecosystem: str = DEFAULT_ECOSYSTEM) -> None:
self._ecosystem = ecosystem
@ -43,13 +137,11 @@ class osv_backend_t(cve_backend_t):
def source(self) -> cve_source_t:
    """Identify this backend as the OSV data source."""
    return cve_source_t.osv
def estimate_sync(
async def estimate_sync(
self,
since: Optional[str] = None,
months: Optional[int] = None,
) -> cve_sync_estimate_t:
# OSV doesn't support time-range queries for bulk.
# Estimation not meaningful without a package list.
return cve_sync_estimate_t(
source=cve_source_t.osv,
num_fetches=0,
@ -58,26 +150,21 @@ class osv_backend_t(cve_backend_t):
available=False,
)
def sync(
async def sync(
self,
since: Optional[str] = None,
months: Optional[int] = None,
on_progress: Optional[SyncProgressCallback] = None,
) -> list[cve_entry_t]:
# OSV requires package names to query. A blind sync isn't supported.
# Use query_packages() instead.
logger.warning(dict(msg='osv sync requires explicit package list, use query_packages()'))
return []
def query_packages(
async def query_packages(
self,
packages: list[tuple[str, str]],
on_progress: Optional[SyncProgressCallback] = None,
) -> list[cve_entry_t]:
"""Query OSV for a list of (name, version) tuples.
Uses batch API. Returns unified CVE entries.
"""
"""Query OSV for a list of (name, version) tuples."""
entries: list[cve_entry_t] = []
total = len(packages)
@ -97,15 +184,7 @@ class osv_backend_t(cve_backend_t):
]
)
req = urllib.request.Request(
QUERY_URL,
data=request.model_dump_json().encode('utf-8'),
headers={'Content-Type': 'application/json'},
method='POST',
)
resp = urllib.request.urlopen(req, timeout=30)
raw = resp.read()
raw = await self._post_json(QUERY_URL, request.model_dump_json().encode('utf-8'))
batch_resp = pydantic.TypeAdapter(osv_batch_response_t).validate_json(raw)
for i, result in enumerate(batch_resp.results):

@ -28,3 +28,9 @@ class osv_query_t(pydantic.BaseModel):
class osv_batch_request_t(pydantic.BaseModel):
    """Request body for the OSV /v1/querybatch endpoint."""

    # one query per package; the API accepts up to BATCH_SIZE entries
    queries: list[osv_query_t]
class osv_ecosystem_t(pydantic.BaseModel):
    """One ecosystem entry parsed from the OSV GCS bucket listing."""

    # ecosystem identifier, e.g. 'PyPI' or 'Debian:12'
    name: str

@ -1,10 +1,19 @@
"""CVE unified types — exposed to users of the cve module."""
import datetime
import enum
from typing import Optional
import pydantic
class output_format_t(enum.StrEnum):
    """Rendering format selected by the CLI --format flag."""

    txt = 'txt'
    json = 'json'
    yaml = 'yaml'
class cve_source_t(enum.Enum):
arch_tracker = 'arch_tracker'
nvd = 'nvd'
@ -80,3 +89,26 @@ class cve_check_result_t(pydantic.BaseModel):
title: str
version_fixed: str = ''
status: cve_status_t = cve_status_t.unknown
class cve_ecosystem_row_t(pydantic.BaseModel):
    """One OSV ecosystem row, as produced by sync and stored in sqlite."""

    model_config = pydantic.ConfigDict(frozen=True)

    # ecosystem name as listed by OSV, e.g. 'PyPI' or 'Debian:12'
    name: str
    # 'linux_distro' | 'language' | 'other' (see osv_ecosystems_t.to_ecosystem_rows)
    category: str = 'other'
    # when this row was last written during sync; None if unknown
    synced_at: Optional[datetime.datetime] = None
class cve_upsert_result_t(pydantic.BaseModel):
    """Counts returned by upsert_entries(): what was received vs. written."""

    model_config = pydantic.ConfigDict(frozen=True)

    # number of entries passed in to the upsert
    received: int = 0
    # rows newly inserted
    inserted: int = 0
    # rows that already existed and were updated in place
    updated: int = 0
class cve_date_range_t(pydantic.BaseModel):
    """Datetime bounds of stored entries; both None when nothing is stored."""

    model_config = pydantic.ConfigDict(frozen=True)

    earliest: Optional[datetime.datetime] = None
    latest: Optional[datetime.datetime] = None

@ -20,15 +20,21 @@ class Command(enum.Enum):
download = 'download'
archive = 'archive'
diff = 'diff'
cve = 'cve'
def main(argv: Optional[list[str]] = None) -> int:
if argv is None:
argv = sys.argv[1:]
from online.fxreader.pr34.commands_typed import argparse as pr34_argparse
prog = 'online-fxreader-pr34-archlinux'
command_values = [o.value for o in Command]
parser = argparse.ArgumentParser(
prog='online-fxreader-pr34-archlinux',
description='Arch Linux package management tools',
prog=prog,
description='Arch Linux package management tools. Commands: %s' % ', '.join(command_values),
)
parser.add_argument(
'--log-level',
@ -37,13 +43,15 @@ def main(argv: Optional[list[str]] = None) -> int:
default='INFO',
help='log level (default: INFO)',
)
parser.add_argument(
'command',
choices=[o.value for o in Command],
)
options, args = parser.parse_known_args(argv)
options.command = Command(options.command)
options, args = pr34_argparse.parse_args(parser, argv, stop_at=command_values)
if len(args) == 0 or args[0] not in command_values:
parser.print_help()
return 1
options.command = Command(args[0])
args = args[1:]
pr34_logging.setup(
level=getattr(logging, options.log_level),
@ -71,6 +79,10 @@ def main(argv: Optional[list[str]] = None) -> int:
from . import diff
return diff.main(args)
elif options.command is Command.cve:
from ..apps.cve.cli import main as cve_main
return cve_main(args, prog='%s cve' % parser.prog)
else:
raise NotImplementedError

@ -221,8 +221,9 @@ class TestCveDb(unittest.TestCase):
version_fixed='9.0.1225-1',
),
]
count = self.db.upsert_entries(entries)
self.assertEqual(count, 2)
result = self.db.upsert_entries(entries)
self.assertEqual(result.received, 2)
self.assertGreater(result.inserted, 0)
results = self.db.query_by_product('vim')
self.assertEqual(len(results), 2)