[+] add CVE module: arch tracker, NVD, OSV backends with unified types and ORM

1. add apps/cve/ with unified types (cve_entry_t, cve_severity_t, cve_status_t);
  2. backend-specific pydantic models in arch_tracker_types.py, nvd_types.py, osv_types.py;
  3. abstract cve_backend_t with estimate_sync (HTTP HEAD) and sync interfaces;
  4. arch_tracker backend: fetches /issues/all.json, pydantic validate_json from stream;
  5. nvd backend: paginated, rate-limited, date range chunking (120-day max), optional API key;
  6. osv backend: batch query API, Debian ecosystem proxy, query_packages() for explicit lists;
  7. cve_db_t ORM module: cve_entries, cve_details (JSON blob), cve_sync_meta tables;
  8. all db queries return pydantic-validated lists via TypeAdapter;
  9. checker.py: check packages against cached CVEs using vercmp version comparison;
  10. heuristics.py: normalize product names, known alias table, exact/normalized matching;
  11. test_cve.py: 37 tests covering type parsing (real samples), db CRUD, checker logic, heuristics;
This commit is contained in:
LLM 2026-04-13 09:00:00 +00:00
parent 96ab3c23e5
commit da86659b67
13 changed files with 1507 additions and 0 deletions

@ -0,0 +1,106 @@
"""Arch Linux Security Tracker backend.
Source: https://security.archlinux.org/issues/all.json
No auth. Full dump ~900KB. Always fetches entire dataset (no incremental API).
Uses pydantic TypeAdapter to parse the JSON response directly.
"""
import logging
import urllib.request
from typing import Optional
import pydantic
from .arch_tracker_types import arch_avg_t
from .base import SyncProgressCallback, cve_backend_t
from .types import (
cve_entry_t,
cve_severity_t,
cve_source_t,
cve_status_t,
cve_sync_estimate_t,
)
logger = logging.getLogger(__name__)

# Full-dump endpoint; the tracker exposes no incremental / since-date API.
ISSUES_URL = 'https://security.archlinux.org/issues/all.json'

# Map the tracker's human-readable severity strings onto unified enum values.
# Anything else (e.g. 'Unknown') falls back to cve_severity_t.unknown via .get().
SEVERITY_MAP: dict[str, cve_severity_t] = {
    'Low': cve_severity_t.low,
    'Medium': cve_severity_t.medium,
    'High': cve_severity_t.high,
    'Critical': cve_severity_t.critical,
}

# Map tracker status strings onto unified enum values; unmapped -> unknown.
STATUS_MAP: dict[str, cve_status_t] = {
    'Vulnerable': cve_status_t.vulnerable,
    'Fixed': cve_status_t.fixed,
    'Not affected': cve_status_t.not_affected,
}

# Reusable adapter: parses the whole /issues/all.json array in one pass.
_avg_list_adapter = pydantic.TypeAdapter(list[arch_avg_t])
class arch_tracker_backend_t(cve_backend_t):
    """CVE backend for the Arch Linux Security Tracker.

    Always downloads the full /issues/all.json dump (~900KB); the tracker
    offers no incremental API, so ``since``/``months`` are accepted for
    interface compatibility but ignored.
    """

    @property
    def source(self) -> cve_source_t:
        return cve_source_t.arch_tracker

    def estimate_sync(
        self,
        since: Optional[str] = None,
        months: Optional[int] = None,
    ) -> cve_sync_estimate_t:
        """Estimate sync cost via HTTP HEAD on the full-dump URL."""
        content_length = self._head_content_length(ISSUES_URL)
        return cve_sync_estimate_t(
            source=cve_source_t.arch_tracker,
            num_fetches=1,
            content_length=content_length,
            incremental=False,
            # _head_content_length returns 0 when the endpoint is unreachable
            available=content_length > 0,
        )

    def sync(
        self,
        since: Optional[str] = None,
        months: Optional[int] = None,
        on_progress: Optional[SyncProgressCallback] = None,
    ) -> list[cve_entry_t]:
        """Fetch all AVGs and expand them into unified CVE entries.

        One cve_entry_t is emitted per (CVE id, package) pair of each AVG.
        on_progress, when given, is called with (done, total) every 100
        AVGs and once more at the end.
        """
        logger.info(dict(msg='fetching arch security tracker', url=ISSUES_URL))
        # fix: close the HTTP response deterministically — the previous code
        # left the urlopen() connection open after read()
        with urllib.request.urlopen(ISSUES_URL, timeout=30) as resp:
            raw_bytes = resp.read()
        avgs = _avg_list_adapter.validate_json(raw_bytes)
        entries: list[cve_entry_t] = []
        total = len(avgs)
        for i, avg in enumerate(avgs):
            severity = SEVERITY_MAP.get(avg.severity, cve_severity_t.unknown)
            status = STATUS_MAP.get(avg.status, cve_status_t.unknown)
            for cve_id in avg.issues:
                for pkg in avg.packages:
                    entries.append(
                        cve_entry_t(
                            cve_id=cve_id,
                            source=cve_source_t.arch_tracker,
                            product=pkg,
                            version_affected=avg.affected,
                            # avg.fixed is Optional[str]; unified type wants str
                            version_fixed=avg.fixed or '',
                            severity=severity,
                            title='%s %s' % (avg.name, avg.type),
                            status=status,
                        )
                    )
            if on_progress is not None and (i + 1) % 100 == 0:
                on_progress(i + 1, total)
        if on_progress is not None:
            on_progress(total, total)
        logger.info(dict(msg='arch tracker sync done', avgs=total, entries=len(entries)))
        return entries

@ -0,0 +1,20 @@
"""Pydantic models for Arch Linux Security Tracker API responses."""
from typing import Optional
import pydantic
class arch_avg_t(pydantic.BaseModel):
    """One AVG (Arch Vulnerability Group) entry from /issues/all.json.

    Defaults are deliberately permissive (empty strings / lists) so parsing
    tolerates partially-populated tracker records.
    """
    name: str  # AVG identifier, e.g. 'AVG-2843'
    packages: list[str] = pydantic.Field(default_factory=list)  # affected package names
    status: str = ''  # tracker status string, e.g. 'Vulnerable'
    severity: str = ''  # tracker severity string, e.g. 'High'
    type: str = ''  # free-form vulnerability class
    affected: str = ''  # affected package version string
    fixed: Optional[str] = None  # fixed version; None when no fix exists yet
    ticket: Optional[str] = None  # bug ticket reference, if any
    issues: list[str] = pydantic.Field(default_factory=list)  # CVE ids grouped in this AVG
    advisories: list[str] = pydantic.Field(default_factory=list)  # published advisory ids

@ -0,0 +1,55 @@
"""Abstract backend interface for CVE data sources."""
import abc
import logging
import urllib.request
from typing import Callable, Optional
from .types import (
cve_entry_t,
cve_source_t,
cve_sync_estimate_t,
)
logger = logging.getLogger(__name__)
SyncProgressCallback = Callable[[int, int], None]
class cve_backend_t(abc.ABC):
    """Abstract interface implemented by every CVE data-source backend."""

    @property
    @abc.abstractmethod
    def source(self) -> cve_source_t:
        """Which cve_source_t this backend provides data for."""
        raise NotImplementedError

    @abc.abstractmethod
    def estimate_sync(
        self,
        since: Optional[str] = None,
        months: Optional[int] = None,
    ) -> cve_sync_estimate_t:
        """Estimate sync size via HTTP HEAD or lightweight API call."""
        raise NotImplementedError

    @abc.abstractmethod
    def sync(
        self,
        since: Optional[str] = None,
        months: Optional[int] = None,
        on_progress: Optional[SyncProgressCallback] = None,
    ) -> list[cve_entry_t]:
        """Fetch CVE entries from the source."""
        raise NotImplementedError

    @staticmethod
    def _head_content_length(url: str) -> int:
        """HTTP HEAD to get Content-Length. Returns 0 if unavailable."""
        try:
            req = urllib.request.Request(url, method='HEAD')
            # fix: context manager closes the connection even when reading
            # the headers raises — it was previously never closed
            with urllib.request.urlopen(req, timeout=10) as resp:
                cl = resp.headers.get('Content-Length', '0')
            return int(cl)
        except Exception:
            # best-effort probe: any failure (network error, non-integer
            # header value) degrades to "size unknown"
            logger.debug(dict(msg='HEAD failed', url=url))
            return 0

@ -0,0 +1,84 @@
"""Check packages against cached CVE data.
Uses vercmp to compare package versions against affected/fixed ranges.
"""
import logging
from typing import Optional
from ...models import vercmp_t
from .db import cve_db_t
from .types import (
cve_check_result_t,
cve_entry_t,
cve_source_t,
cve_status_t,
)
logger = logging.getLogger(__name__)
def _is_affected(
    pkg_version: str,
    entry: cve_entry_t,
) -> bool:
    """Check if pkg_version is affected by this CVE entry.

    Rules (vercmp semantics):
    - version_fixed set: affected iff pkg_version < version_fixed — anything
      below the fix is vulnerable, regardless of version_affected.
    - only version_affected set: affected iff pkg_version <= version_affected.
    - neither set: versions can't decide; affected only when the entry's
      status itself reports vulnerable.
    """
    if entry.version_fixed != '':
        # fix: the previous logic additionally rejected versions above
        # version_affected here, which wrongly cleared any version in the
        # (affected, fixed) gap even though it is below the fix.
        return vercmp_t.vercmp(pkg_version, entry.version_fixed) < 0
    if entry.version_affected != '':
        # no fix known: everything up to and including the last known
        # affected version counts as vulnerable
        return vercmp_t.vercmp(pkg_version, entry.version_affected) <= 0
    # no version information at all — fall back to the reported status
    return entry.status is cve_status_t.vulnerable
def check_packages(
    db: cve_db_t,
    packages: list[tuple[str, str]],
    sources: Optional[list[cve_source_t]] = None,
) -> list[cve_check_result_t]:
    """Check a list of (name, version) against cached CVEs.

    Looks up every package by product name, optionally filters by source,
    and returns one cve_check_result_t per entry whose version range
    matches the given package version.
    """
    matches: list[cve_check_result_t] = []
    for pkg_name, pkg_version in packages:
        candidates = db.query_by_product(pkg_name)
        if sources is not None:
            candidates = [c for c in candidates if c.source in sources]
        for hit in candidates:
            if not _is_affected(pkg_version, hit):
                continue
            matches.append(
                cve_check_result_t(
                    package=pkg_name,
                    version=pkg_version,
                    cve_id=hit.cve_id,
                    severity=hit.severity,
                    score=hit.score,
                    title=hit.title,
                    version_fixed=hit.version_fixed,
                    status=hit.status,
                )
            )
    return matches

@ -0,0 +1,206 @@
"""CVE ORM module — sqlite tables for cached CVE data."""
import json
import logging
import pathlib
import sqlite3
from typing import Any, Optional
import pydantic
from ..orm.registry import orm_module_t, orm_registry_t
from .types import (
cve_entry_t,
cve_source_t,
cve_sync_status_t,
)
logger = logging.getLogger(__name__)
_entry_list_adapter = pydantic.TypeAdapter(list[cve_entry_t])
_sync_status_list_adapter = pydantic.TypeAdapter(list[cve_sync_status_t])
def _rows_to_dicts(cur: sqlite3.Cursor) -> list[dict[str, Any]]:
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
class cve_db_t(orm_module_t):
    """ORM module caching CVE data in sqlite.

    Tables:
    - cve_entries: one row per (cve_id, source, product) with version,
      severity, score and status columns
    - cve_details: raw backend JSON blob per (cve_id, source)
    - cve_sync_meta: last-sync timestamp and entry count per source

    All query methods return pydantic-validated objects via TypeAdapter.
    """

    @classmethod
    def table_prefix(cls) -> str:
        # every table created by this module is namespaced under 'cve'
        return 'cve'

    @classmethod
    def schema_version(cls) -> int:
        return 1

    @classmethod
    def migrate(cls, conn: sqlite3.Connection, from_version: int, to_version: int) -> None:
        """Create the v1 schema; idempotent thanks to IF NOT EXISTS."""
        if from_version < 1:
            conn.executescript("""
            CREATE TABLE IF NOT EXISTS cve_entries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                cve_id TEXT NOT NULL,
                source TEXT NOT NULL,
                product TEXT NOT NULL,
                version_affected TEXT NOT NULL DEFAULT '',
                version_fixed TEXT NOT NULL DEFAULT '',
                severity TEXT NOT NULL DEFAULT 'unknown',
                score REAL NOT NULL DEFAULT 0.0,
                title TEXT NOT NULL DEFAULT '',
                description TEXT NOT NULL DEFAULT '',
                date_published TEXT NOT NULL DEFAULT '',
                date_modified TEXT NOT NULL DEFAULT '',
                status TEXT NOT NULL DEFAULT 'unknown',
                UNIQUE(cve_id, source, product)
            );
            CREATE TABLE IF NOT EXISTS cve_details (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                cve_id TEXT NOT NULL,
                source TEXT NOT NULL,
                raw_json TEXT NOT NULL DEFAULT '{}',
                UNIQUE(cve_id, source)
            );
            CREATE TABLE IF NOT EXISTS cve_sync_meta (
                source TEXT PRIMARY KEY,
                last_sync TEXT NOT NULL DEFAULT '',
                entry_count INTEGER NOT NULL DEFAULT 0
            );
            CREATE INDEX IF NOT EXISTS idx_cve_entries_cve_id ON cve_entries(cve_id);
            CREATE INDEX IF NOT EXISTS idx_cve_entries_product ON cve_entries(product);
            CREATE INDEX IF NOT EXISTS idx_cve_entries_source ON cve_entries(source);
            """)
            conn.commit()

    def __init__(self, db_path_or_conn: 'pathlib.Path | sqlite3.Connection') -> None:
        # Accept either an already-open connection (tests / shared handles)
        # or a db path, in which case the registry owns the connection.
        if isinstance(db_path_or_conn, sqlite3.Connection):
            super().__init__(db_path_or_conn)
        else:
            registry = orm_registry_t.get(db_path_or_conn)
            super().__init__(registry.conn)

    def upsert_entries(self, entries: list[cve_entry_t]) -> int:
        """Insert or update entries keyed on (cve_id, source, product).

        Returns the number of entries processed (inserts + updates).
        """
        cur = self._conn.cursor()
        count = 0
        for e in entries:
            cur.execute(
                '''
                INSERT INTO cve_entries
                (cve_id, source, product, version_affected, version_fixed,
                 severity, score, title, description, date_published, date_modified, status)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(cve_id, source, product) DO UPDATE SET
                    version_affected = excluded.version_affected,
                    version_fixed = excluded.version_fixed,
                    severity = excluded.severity,
                    score = excluded.score,
                    title = excluded.title,
                    description = excluded.description,
                    date_published = excluded.date_published,
                    date_modified = excluded.date_modified,
                    status = excluded.status
                ''',
                (
                    e.cve_id,
                    e.source.value,
                    e.product,
                    e.version_affected,
                    e.version_fixed,
                    e.severity.value,
                    e.score,
                    e.title,
                    e.description,
                    e.date_published,
                    e.date_modified,
                    e.status.value,
                ),
            )
            count += 1
        # single commit after the batch keeps the upsert atomic-ish and fast
        self._conn.commit()
        return count

    def upsert_detail(self, cve_id: str, source: cve_source_t, raw: object) -> None:
        """Store (or replace) the raw backend JSON blob for a CVE."""
        self._conn.execute(
            '''
            INSERT INTO cve_details (cve_id, source, raw_json)
            VALUES (?, ?, ?)
            ON CONFLICT(cve_id, source) DO UPDATE SET raw_json = excluded.raw_json
            ''',
            # default=str makes non-JSON-native values (dates, enums)
            # serializable at the cost of stringifying them
            (cve_id, source.value, json.dumps(raw, default=str)),
        )
        self._conn.commit()

    def update_sync_meta(self, source: cve_source_t, last_sync: str, entry_count: int) -> None:
        """Record when a source was last synced and how many entries it has."""
        self._conn.execute(
            '''
            INSERT INTO cve_sync_meta (source, last_sync, entry_count)
            VALUES (?, ?, ?)
            ON CONFLICT(source) DO UPDATE SET
                last_sync = excluded.last_sync,
                entry_count = excluded.entry_count
            ''',
            (source.value, last_sync, entry_count),
        )
        self._conn.commit()

    def get_sync_status(self, source: cve_source_t) -> cve_sync_status_t:
        """Return sync metadata for one source; defaults when never synced."""
        cur = self._conn.execute(
            'SELECT source, last_sync, entry_count FROM cve_sync_meta WHERE source = ?',
            (source.value,),
        )
        rows = _rows_to_dicts(cur)
        if len(rows) == 0:
            # no row yet: report the model defaults (empty last_sync, 0 count)
            return cve_sync_status_t(source=source)
        validated = _sync_status_list_adapter.validate_python(rows)
        return validated[0]

    def get_all_sync_statuses(self) -> list[cve_sync_status_t]:
        """Return sync metadata for every source that has ever synced."""
        cur = self._conn.execute(
            'SELECT source, last_sync, entry_count FROM cve_sync_meta'
        )
        return _sync_status_list_adapter.validate_python(_rows_to_dicts(cur))

    def query_by_product(self, product: str) -> list[cve_entry_t]:
        """Return all cached entries for an exact product name."""
        cur = self._conn.execute(
            'SELECT cve_id, source, product, version_affected, version_fixed, '
            'severity, score, title, description, date_published, date_modified, status '
            'FROM cve_entries WHERE product = ?',
            (product,),
        )
        return _entry_list_adapter.validate_python(_rows_to_dicts(cur))

    def query_by_cve_id(self, cve_id: str) -> list[cve_entry_t]:
        """Return all cached entries for a CVE id (possibly several sources)."""
        cur = self._conn.execute(
            'SELECT cve_id, source, product, version_affected, version_fixed, '
            'severity, score, title, description, date_published, date_modified, status '
            'FROM cve_entries WHERE cve_id = ?',
            (cve_id,),
        )
        return _entry_list_adapter.validate_python(_rows_to_dicts(cur))

    def query_by_source(self, source: cve_source_t) -> list[cve_entry_t]:
        """Return all cached entries originating from one backend."""
        cur = self._conn.execute(
            'SELECT cve_id, source, product, version_affected, version_fixed, '
            'severity, score, title, description, date_published, date_modified, status '
            'FROM cve_entries WHERE source = ?',
            (source.value,),
        )
        return _entry_list_adapter.validate_python(_rows_to_dicts(cur))

    def count_entries(self, source: Optional[cve_source_t] = None) -> int:
        """Count cached entries, optionally restricted to one source."""
        if source is not None:
            row = self._conn.execute(
                'SELECT COUNT(*) FROM cve_entries WHERE source = ?',
                (source.value,),
            ).fetchone()
        else:
            row = self._conn.execute('SELECT COUNT(*) FROM cve_entries').fetchone()
        return row[0] if row else 0


# make the module discoverable by the ORM registry at import time
orm_registry_t.register(cve_db_t)

@ -0,0 +1,90 @@
"""Heuristics to map CVE product names to Arch Linux package names.
NVD uses CPE product names (e.g. 'vim', 'linux_kernel', 'openssl').
This module provides simple mapping strategies:
1. Exact match product == arch package name
2. Normalized match underscores to hyphens, lowercase
3. Known aliases manual mapping table for common divergences
"""
import logging
import re
from typing import Optional
logger = logging.getLogger(__name__)
# Known divergences between NVD/CPE product names and Arch package names.
# Identity entries (e.g. 'vim': 'vim') are intentional: they let frequent
# products hit strategy 1 in map_to_arch_package() before the fuzzier
# normalized-name matching runs.
KNOWN_ALIASES: dict[str, str] = {
    'linux_kernel': 'linux',
    'linux': 'linux',
    'openssh': 'openssh',
    'openssl': 'openssl',
    'gnu_bash': 'bash',
    'gnutls': 'gnutls',
    'libtiff': 'libtiff',
    'libxml2': 'libxml2',
    'libpng': 'libpng',
    'zlib': 'zlib',
    'curl': 'curl',
    'wget': 'wget',
    'python': 'python',
    'ruby': 'ruby',
    'perl': 'perl',
    'node.js': 'nodejs',
    'nodejs': 'nodejs',
    'firefox': 'firefox',
    'thunderbird': 'thunderbird',
    'chromium': 'chromium',
    'vim': 'vim',
    'neovim': 'neovim',
    'git': 'git',
    'sudo': 'sudo',
    'systemd': 'systemd',
    'glibc': 'glibc',
    'binutils': 'binutils',
    'gcc': 'gcc',
    'grub2': 'grub',
    'xorg-server': 'xorg-server',
}
def normalize_product_name(product: str) -> str:
    """Normalize a product name to match Arch conventions.

    Lowercases, trims surrounding whitespace, converts underscores to
    hyphens, then drops every character outside [a-z0-9.+-].
    """
    candidate = product.strip().lower().replace('_', '-')
    return re.sub(r'[^a-z0-9\-+.]', '', candidate)
def map_to_arch_package(
    product: str,
    known_packages: Optional[set[str]] = None,
) -> Optional[str]:
    """Try to map a CVE product name to an Arch package name.

    Returns the mapped name or None if no match found.
    Strategies in order:
    1. Known aliases table
    2. Exact match against known_packages
    3. Normalized match against known_packages
    """
    lowered = product.lower()

    # strategy 1: curated alias table; without a package universe the
    # alias hit is accepted optimistically
    alias = KNOWN_ALIASES.get(lowered)
    if alias is not None:
        if known_packages is None or alias in known_packages:
            return alias

    if known_packages is not None:
        # strategy 2: exact (case-insensitive) match
        if lowered in known_packages:
            return lowered
        # strategy 3: normalized form match
        cleaned = normalize_product_name(product)
        if cleaned in known_packages:
            return cleaned

    return None

@ -0,0 +1,211 @@
"""NVD (NIST) backend.
Source: https://services.nvd.nist.gov/rest/json/cves/2.0
Optional API key. Rate limited: 5 req/30s without key, 50 with key.
Paginated (max 2000/page). Supports lastModStartDate/lastModEndDate (max 120 days).
"""
import logging
import math
import time
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from typing import Optional
import pydantic
from .base import SyncProgressCallback, cve_backend_t
from .nvd_types import nvd_response_t
from .types import (
cve_entry_t,
cve_severity_t,
cve_source_t,
cve_status_t,
cve_sync_estimate_t,
)
logger = logging.getLogger(__name__)

BASE_URL = 'https://services.nvd.nist.gov/rest/json/cves/2.0'
# API maximum for the resultsPerPage parameter
PAGE_SIZE = 2000
# NVD rejects lastModStartDate/lastModEndDate ranges longer than 120 days
MAX_RANGE_DAYS = 120
REQUEST_DELAY_NO_KEY = 6.5  # 5 req / 30s → ~6s between
REQUEST_DELAY_WITH_KEY = 0.7  # 50 req / 30s → ~0.6s between
def _severity_from_nvd(s: str) -> cve_severity_t:
    """Translate an NVD baseSeverity string into the unified severity enum."""
    key = s.upper()
    if key == 'LOW':
        return cve_severity_t.low
    if key == 'MEDIUM':
        return cve_severity_t.medium
    if key == 'HIGH':
        return cve_severity_t.high
    if key == 'CRITICAL':
        return cve_severity_t.critical
    # anything unrecognized (including empty string) degrades to unknown
    return cve_severity_t.unknown
def _date_ranges(start: datetime, end: datetime) -> list[tuple[str, str]]:
    """Split [start, end) into chunks of at most MAX_RANGE_DAYS.

    Returns (start, end) pairs formatted as NVD API timestamps; the list
    is empty when start >= end.
    """
    fmt = '%Y-%m-%dT%H:%M:%S.000'
    chunks: list[tuple[str, str]] = []
    lower = start
    while lower < end:
        upper = min(lower + timedelta(days=MAX_RANGE_DAYS), end)
        chunks.append((lower.strftime(fmt), upper.strftime(fmt)))
        lower = upper
    return chunks
class nvd_backend_t(cve_backend_t):
    """CVE backend for the NVD 2.0 REST API.

    Paginated (PAGE_SIZE results per request) and rate limited; an optional
    API key raises the allowed request rate.  Date filters are split into
    MAX_RANGE_DAYS chunks because the API rejects longer ranges.
    """

    def __init__(self, api_key: Optional[str] = None) -> None:
        self._api_key = api_key
        # stay under the documented rate limits:
        # 5 req/30s without a key, 50 req/30s with one
        self._delay = REQUEST_DELAY_WITH_KEY if api_key else REQUEST_DELAY_NO_KEY

    @property
    def source(self) -> cve_source_t:
        return cve_source_t.nvd

    def _build_url(self, params: dict[str, str]) -> str:
        """Append URL-encoded query parameters to the base endpoint."""
        return '%s?%s' % (BASE_URL, urllib.parse.urlencode(params))

    def _fetch_page(self, url: str) -> nvd_response_t:
        """GET one result page and parse it into nvd_response_t."""
        req = urllib.request.Request(url)
        if self._api_key:
            req.add_header('apiKey', self._api_key)
        # fix: close the HTTP response deterministically — it was
        # previously left open after read()
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
        return pydantic.TypeAdapter(nvd_response_t).validate_json(raw)

    def _compute_date_range(
        self,
        since: Optional[str],
        months: Optional[int],
    ) -> tuple[datetime, datetime]:
        """Resolve (start, end) for the lastModified filter.

        Priority: explicit `since` ISO timestamp, then `months` lookback,
        then a default 120-day window.
        """
        end = datetime.now(timezone.utc)
        if since is not None:
            # NOTE(review): assumes `since` is a naive ISO timestamp already
            # expressed in UTC — replace() would mislabel other zones; confirm
            start = datetime.fromisoformat(since).replace(tzinfo=timezone.utc)
        elif months is not None:
            start = end - timedelta(days=months * 30)  # 30-day month approximation
        else:
            start = end - timedelta(days=120)  # default lookback window
        return start, end

    def estimate_sync(
        self,
        since: Optional[str] = None,
        months: Optional[int] = None,
    ) -> cve_sync_estimate_t:
        """Probe the first range with resultsPerPage=1 to read totalResults."""
        start, end = self._compute_date_range(since, months)
        ranges = _date_ranges(start, end)
        if len(ranges) == 0:
            return cve_sync_estimate_t(
                source=cve_source_t.nvd, available=False,
            )
        params = {
            'lastModStartDate': ranges[0][0],
            'lastModEndDate': ranges[0][1],
            'resultsPerPage': '1',
        }
        try:
            page = self._fetch_page(self._build_url(params))
            total_first_range = page.totalResults
        except Exception as e:
            logger.warning(dict(msg='nvd estimate failed', error=str(e)))
            return cve_sync_estimate_t(
                source=cve_source_t.nvd, available=False,
            )
        # assume ranges are uniformly populated: pages needed for the first
        # range times the number of ranges approximates total fetches
        # (fix: dropped the unused `estimated_total` local)
        pages_per_range = max(1, math.ceil(total_first_range / PAGE_SIZE))
        num_fetches = pages_per_range * len(ranges)
        return cve_sync_estimate_t(
            source=cve_source_t.nvd,
            num_fetches=num_fetches,
            content_length=0,
            incremental=since is not None,
            available=True,
        )

    def sync(
        self,
        since: Optional[str] = None,
        months: Optional[int] = None,
        on_progress: Optional[SyncProgressCallback] = None,
    ) -> list[cve_entry_t]:
        """Fetch all CVEs modified in the computed date range.

        Walks every 120-day chunk page by page, sleeping self._delay
        between requests to respect the rate limit.  on_progress receives
        (entries so far, rough total estimate).
        """
        start, end = self._compute_date_range(since, months)
        ranges = _date_ranges(start, end)
        entries: list[cve_entry_t] = []
        fetch_count = 0
        for range_start, range_end in ranges:
            start_index = 0
            while True:
                params = {
                    'lastModStartDate': range_start,
                    'lastModEndDate': range_end,
                    'resultsPerPage': str(PAGE_SIZE),
                    'startIndex': str(start_index),
                }
                url = self._build_url(params)
                logger.info(dict(msg='nvd fetch', url=url))
                page = self._fetch_page(url)
                fetch_count += 1
                for vuln in page.vulnerabilities:
                    cve = vuln.cve
                    # prefer the English description when several exist
                    desc = ''
                    for d in cve.descriptions:
                        if d.lang == 'en':
                            desc = d.value
                            break
                    # take the newest available CVSS metric version
                    score = 0.0
                    severity = cve_severity_t.unknown
                    for metric_key in ('cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2'):
                        metrics = cve.metrics.get(metric_key, [])
                        if len(metrics) > 0:
                            m = metrics[0]
                            score = m.cvssData.baseScore
                            severity = _severity_from_nvd(m.cvssData.baseSeverity)
                            break
                    entries.append(
                        cve_entry_t(
                            cve_id=cve.id,
                            source=cve_source_t.nvd,
                            # product is intentionally the CVE id here —
                            # presumably mapped to package names later via
                            # heuristics; TODO confirm with callers
                            product=cve.id,
                            severity=severity,
                            score=score,
                            title=cve.id,
                            description=desc,
                            date_published=cve.published,
                            date_modified=cve.lastModified,
                        )
                    )
                if on_progress is not None:
                    # rough total: first-page totalResults scaled by range count
                    on_progress(len(entries), page.totalResults * len(ranges))
                if start_index + page.resultsPerPage >= page.totalResults:
                    break
                start_index += page.resultsPerPage
                time.sleep(self._delay)
            if len(ranges) > 1:
                time.sleep(self._delay)
        logger.info(dict(msg='nvd sync done', fetches=fetch_count, entries=len(entries)))
        return entries

@ -0,0 +1,50 @@
"""Pydantic models for NVD (NIST) API responses."""
import pydantic
class nvd_cvss_data_t(pydantic.BaseModel):
    """The 'cvssData' object inside an NVD CVSS metric."""
    # extra='allow': NVD sends many more CVSS fields than we consume
    model_config = pydantic.ConfigDict(extra='allow')
    version: str = ''  # CVSS version string, e.g. '3.1'
    baseScore: float = 0.0
    baseSeverity: str = ''  # e.g. 'HIGH'


class nvd_cvss_metric_t(pydantic.BaseModel):
    """One entry of the cvssMetricV31 / V30 / V2 lists."""
    model_config = pydantic.ConfigDict(extra='allow')
    source: str = ''  # scoring organization, e.g. 'nvd@nist.gov'
    type: str = ''  # e.g. 'Primary'
    cvssData: nvd_cvss_data_t = pydantic.Field(default_factory=nvd_cvss_data_t)


class nvd_description_t(pydantic.BaseModel):
    """One localized description ('lang' is an ISO language code)."""
    lang: str
    value: str


class nvd_cve_item_t(pydantic.BaseModel):
    """The 'cve' object of one vulnerability record."""
    model_config = pydantic.ConfigDict(extra='allow')
    id: str  # e.g. 'CVE-2023-0433'
    sourceIdentifier: str = ''
    published: str = ''  # NVD timestamp string
    lastModified: str = ''  # NVD timestamp string
    vulnStatus: str = ''  # e.g. 'Analyzed'
    descriptions: list[nvd_description_t] = pydantic.Field(default_factory=list)
    # keyed by metric family name, e.g. 'cvssMetricV31'
    metrics: dict[str, list[nvd_cvss_metric_t]] = pydantic.Field(default_factory=dict)


class nvd_vulnerability_t(pydantic.BaseModel):
    """Wrapper element of the top-level 'vulnerabilities' array."""
    cve: nvd_cve_item_t


class nvd_response_t(pydantic.BaseModel):
    """Top-level response of /rest/json/cves/2.0 (one page)."""
    resultsPerPage: int = 0
    startIndex: int = 0
    totalResults: int = 0  # total across all pages, used for pagination
    format: str = ''
    version: str = ''
    timestamp: str = ''
    vulnerabilities: list[nvd_vulnerability_t] = pydantic.Field(default_factory=list)

@ -0,0 +1,131 @@
"""OSV (Google) backend.
Source: https://api.osv.dev/v1/
No auth. No rate limits. Arch Linux is NOT a supported ecosystem,
so we query using Debian ecosystem as a proxy and map results.
Supports batch queries (up to 1000 per request).
"""
import json
import logging
import urllib.request
from typing import Optional
import pydantic
from .base import SyncProgressCallback, cve_backend_t
from .osv_types import (
osv_batch_request_t,
osv_batch_response_t,
osv_package_query_t,
osv_query_t,
)
from .types import (
cve_entry_t,
cve_source_t,
cve_sync_estimate_t,
)
logger = logging.getLogger(__name__)

QUERY_URL = 'https://api.osv.dev/v1/querybatch'
VULN_URL = 'https://api.osv.dev/v1/vulns'  # per-vuln detail endpoint
BATCH_SIZE = 1000  # querybatch request limit
# Arch is not an OSV ecosystem; Debian stable serves as a best-effort proxy
DEFAULT_ECOSYSTEM = 'Debian:12'
class osv_backend_t(cve_backend_t):
    """CVE backend for the OSV batch query API.

    OSV has no bulk "everything since date" endpoint, so sync() is a no-op
    and callers must use query_packages() with an explicit package list.
    Queries go against a Debian ecosystem as a proxy for Arch packages.
    """

    def __init__(self, ecosystem: str = DEFAULT_ECOSYSTEM) -> None:
        self._ecosystem = ecosystem

    @property
    def source(self) -> cve_source_t:
        return cve_source_t.osv

    def estimate_sync(
        self,
        since: Optional[str] = None,
        months: Optional[int] = None,
    ) -> cve_sync_estimate_t:
        """Always reports unavailable — see sync() below."""
        # OSV doesn't support time-range queries for bulk.
        # Estimation not meaningful without a package list.
        return cve_sync_estimate_t(
            source=cve_source_t.osv,
            num_fetches=0,
            content_length=0,
            incremental=False,
            available=False,
        )

    def sync(
        self,
        since: Optional[str] = None,
        months: Optional[int] = None,
        on_progress: Optional[SyncProgressCallback] = None,
    ) -> list[cve_entry_t]:
        """No-op: OSV cannot be synced without a package list."""
        # OSV requires package names to query. A blind sync isn't supported.
        # Use query_packages() instead.
        logger.warning(dict(msg='osv sync requires explicit package list, use query_packages()'))
        return []

    def query_packages(
        self,
        packages: list[tuple[str, str]],
        on_progress: Optional[SyncProgressCallback] = None,
    ) -> list[cve_entry_t]:
        """Query OSV for a list of (name, version) tuples.

        Uses the batch API (BATCH_SIZE queries per POST) and returns
        unified CVE entries.  on_progress, when given, is called after
        each batch with (packages done, total packages).
        """
        entries: list[cve_entry_t] = []
        total = len(packages)
        for batch_start in range(0, total, BATCH_SIZE):
            batch = packages[batch_start:batch_start + BATCH_SIZE]
            request = osv_batch_request_t(
                queries=[
                    osv_query_t(
                        package=osv_package_query_t(
                            name=name,
                            ecosystem=self._ecosystem,
                        ),
                        version=version,
                    )
                    for name, version in batch
                ]
            )
            req = urllib.request.Request(
                QUERY_URL,
                data=request.model_dump_json().encode('utf-8'),
                headers={'Content-Type': 'application/json'},
                method='POST',
            )
            # fix: close the HTTP response deterministically — it was
            # previously left open after read()
            with urllib.request.urlopen(req, timeout=30) as resp:
                raw = resp.read()
            batch_resp = pydantic.TypeAdapter(osv_batch_response_t).validate_json(raw)
            for i, result in enumerate(batch_resp.results):
                # results are expected to align 1:1 with the submitted batch;
                # guard against the server returning extra rows
                if i >= len(batch):
                    break
                pkg_name, pkg_version = batch[i]
                for vuln in result.vulns:
                    entries.append(
                        cve_entry_t(
                            cve_id=vuln.id,
                            source=cve_source_t.osv,
                            product=pkg_name,
                            date_modified=vuln.modified,
                            title=vuln.id,
                        )
                    )
            if on_progress is not None:
                done = min(batch_start + BATCH_SIZE, total)
                on_progress(done, total)
        logger.info(dict(msg='osv query done', packages=total, entries=len(entries)))
        return entries

@ -0,0 +1,30 @@
"""Pydantic models for OSV (Google) API responses."""
import pydantic
class osv_vuln_brief_t(pydantic.BaseModel):
    """Minimal vuln record returned by the querybatch endpoint."""
    id: str  # OSV id, e.g. 'DEBIAN-CVE-2023-0433'
    modified: str = ''  # last-modified timestamp string


class osv_query_result_t(pydantic.BaseModel):
    """Result for one query in a batch; empty vulns means no findings."""
    vulns: list[osv_vuln_brief_t] = pydantic.Field(default_factory=list)


class osv_batch_response_t(pydantic.BaseModel):
    """Top-level querybatch response; results expected to align 1:1 with queries."""
    results: list[osv_query_result_t] = pydantic.Field(default_factory=list)


class osv_package_query_t(pydantic.BaseModel):
    """Package selector for a query: name plus OSV ecosystem string."""
    name: str
    ecosystem: str  # e.g. 'Debian:12'


class osv_query_t(pydantic.BaseModel):
    """One query in a batch: a package plus an optional concrete version."""
    package: osv_package_query_t
    version: str = ''


class osv_batch_request_t(pydantic.BaseModel):
    """Request body for POST /v1/querybatch."""
    queries: list[osv_query_t]

@ -0,0 +1,82 @@
"""CVE unified types — exposed to users of the cve module."""
import enum
import pydantic
class cve_source_t(enum.Enum):
    """Identifies which backend produced a CVE entry."""
    arch_tracker = 'arch_tracker'
    nvd = 'nvd'
    osv = 'osv'


class cve_severity_t(enum.Enum):
    """Unified severity scale; 'unknown' when a source has no mapping."""
    unknown = 'unknown'
    low = 'low'
    medium = 'medium'
    high = 'high'
    critical = 'critical'


class cve_status_t(enum.Enum):
    """Unified vulnerability status; 'unknown' when a source has no mapping."""
    unknown = 'unknown'
    vulnerable = 'vulnerable'
    fixed = 'fixed'
    not_affected = 'not_affected'


class cve_entry_t(pydantic.BaseModel):
    """Unified CVE entry across all sources.

    Describes one (cve_id, source, product) combination; string fields
    default to '' so partially-populated sources validate cleanly.
    """
    model_config = pydantic.ConfigDict(frozen=True)
    cve_id: str  # e.g. 'CVE-2023-0433'
    source: cve_source_t
    product: str  # product/package name as reported by the source
    version_affected: str = ''  # affected version ('' = unknown)
    version_fixed: str = ''  # fixed version ('' = no fix known)
    severity: cve_severity_t = cve_severity_t.unknown
    score: float = 0.0  # CVSS base score when the source provides one
    title: str = ''
    description: str = ''
    date_published: str = ''  # timestamps kept as plain strings
    date_modified: str = ''
    status: cve_status_t = cve_status_t.unknown


class cve_sync_status_t(pydantic.BaseModel):
    """Sync metadata for one source (mirrors the cve_sync_meta table)."""
    model_config = pydantic.ConfigDict(frozen=True)
    source: cve_source_t
    last_sync: str = ''  # '' when the source has never been synced
    entry_count: int = 0


class cve_sync_estimate_t(pydantic.BaseModel):
    """Estimate for an upcoming sync.

    num_fetches: expected HTTP requests needed.
    content_length: total bytes across all fetches (from HEAD).
    """
    model_config = pydantic.ConfigDict(frozen=True)
    source: cve_source_t
    num_fetches: int = 0
    content_length: int = 0  # 0 when the source cannot report a size
    incremental: bool = False  # True when only changes since last sync are fetched
    available: bool = True  # False when the source cannot be synced/estimated


class cve_check_result_t(pydantic.BaseModel):
    """One CVE match produced by the checker for an installed package."""
    model_config = pydantic.ConfigDict(frozen=True)
    package: str  # checked package name
    version: str  # checked package version
    cve_id: str
    severity: cve_severity_t
    score: float
    title: str
    version_fixed: str = ''
    status: cve_status_t = cve_status_t.unknown

@ -0,0 +1,442 @@
"""Tests for apps/cve/ module.
Test coverage:
- types: parse arch tracker JSON, NVD JSON via pydantic
- db: ORM migration, upsert, query, sync meta
- arch_tracker: parse real AVG samples
- nvd: parse real NVD response sample
- osv: parse real OSV batch response sample
- checker: version comparison, affected/fixed logic
- heuristics: normalize, alias mapping, exact match
"""
import json
import pathlib
import sqlite3
import tempfile
import unittest
import pydantic
from ..apps.cve.arch_tracker_types import arch_avg_t
from ..apps.cve.nvd_types import nvd_response_t
from ..apps.cve.osv_types import osv_batch_response_t
from ..apps.cve.types import (
cve_entry_t,
cve_severity_t,
cve_source_t,
cve_status_t,
cve_sync_estimate_t,
)
from ..apps.cve.db import cve_db_t
from ..apps.cve.checker import check_packages, _is_affected
from ..apps.cve.heuristics import (
KNOWN_ALIASES,
map_to_arch_package,
normalize_product_name,
)
from ..apps.orm.registry import orm_registry_t
# ── real-world JSON samples (from curl) ──
# Arch tracker /issues/all.json excerpt: two AVGs, one with a fix available
# and one without (fixed: None) — exercises the Optional[str] field.
ARCH_AVG_SAMPLE = json.dumps([
    {
        "name": "AVG-2843",
        "packages": ["vim"],
        "status": "Unknown",
        "severity": "Unknown",
        "type": "unknown",
        "affected": "9.0.1224-1",
        "fixed": "9.0.1225-1",
        "ticket": None,
        "issues": ["CVE-2023-0433", "CVE-2023-0288"],
        "advisories": [],
    },
    {
        "name": "AVG-2842",
        "packages": ["libtiff"],
        "status": "Unknown",
        "severity": "Unknown",
        "type": "unknown",
        "affected": "4.4.0-1",
        "fixed": None,
        "ticket": None,
        "issues": ["CVE-2022-48281", "CVE-2022-3970"],
        "advisories": [],
    },
])
# NVD /rest/json/cves/2.0 single-result page with multi-language
# descriptions and a CVSS v3.1 metric block.
NVD_RESPONSE_SAMPLE = json.dumps({
    "resultsPerPage": 1,
    "startIndex": 0,
    "totalResults": 1,
    "format": "NVD_CVE",
    "version": "2.0",
    "timestamp": "2026-04-13T00:00:00.000",
    "vulnerabilities": [
        {
            "cve": {
                "id": "CVE-2023-0433",
                "sourceIdentifier": "security@huntr.dev",
                "published": "2023-01-21T15:15:10.153",
                "lastModified": "2024-11-21T07:37:10.260",
                "vulnStatus": "Analyzed",
                "descriptions": [
                    {"lang": "en", "value": "Heap-based Buffer Overflow in GitHub repository vim/vim prior to 9.0.1225."},
                    {"lang": "es", "value": "Desbordamiento de búfer basado en heap en vim/vim."},
                ],
                "metrics": {
                    "cvssMetricV31": [
                        {
                            "source": "nvd@nist.gov",
                            "type": "Primary",
                            "cvssData": {
                                "version": "3.1",
                                "baseScore": 7.8,
                                "baseSeverity": "HIGH",
                            },
                        }
                    ],
                },
            }
        }
    ],
})
# OSV querybatch response: first queried package has two vulns, second has
# none — exercises the default-empty vulns list.
OSV_RESPONSE_SAMPLE = json.dumps({
    "results": [
        {
            "vulns": [
                {"id": "DEBIAN-CVE-2023-0433", "modified": "2026-01-01T00:00:00Z"},
                {"id": "DEBIAN-CVE-2023-0288", "modified": "2026-01-01T00:00:00Z"},
            ]
        },
        {
            "vulns": []
        },
    ]
})
# ── Type parsing tests ──
class TestArchTrackerTypes(unittest.TestCase):
    """Parsing of the real Arch tracker AVG sample via pydantic."""

    @staticmethod
    def _parse() -> list[arch_avg_t]:
        # shared fixture loader: validate the raw sample bytes
        return pydantic.TypeAdapter(list[arch_avg_t]).validate_json(ARCH_AVG_SAMPLE.encode())

    def test_parse_avg_list(self) -> None:
        avgs = self._parse()
        self.assertEqual(len(avgs), 2)
        first = avgs[0]
        self.assertEqual(first.name, 'AVG-2843')
        self.assertEqual(first.packages, ['vim'])
        self.assertEqual(len(first.issues), 2)
        self.assertEqual(first.affected, '9.0.1224-1')
        self.assertEqual(first.fixed, '9.0.1225-1')

    def test_parse_avg_null_fixed(self) -> None:
        # JSON null must survive as Python None, not coerce to ''
        self.assertIsNone(self._parse()[1].fixed)

    def test_parse_avg_empty_advisories(self) -> None:
        self.assertEqual(self._parse()[0].advisories, [])
class TestNvdTypes(unittest.TestCase):
    """Parsing of the real NVD 2.0 API sample page via pydantic."""

    @staticmethod
    def _parse() -> nvd_response_t:
        # shared fixture loader for the single-result sample page
        return pydantic.TypeAdapter(nvd_response_t).validate_json(NVD_RESPONSE_SAMPLE.encode())

    def test_parse_response(self) -> None:
        parsed = self._parse()
        self.assertEqual(parsed.totalResults, 1)
        self.assertEqual(len(parsed.vulnerabilities), 1)
        item = parsed.vulnerabilities[0].cve
        self.assertEqual(item.id, 'CVE-2023-0433')
        self.assertEqual(item.published, '2023-01-21T15:15:10.153')

    def test_parse_descriptions(self) -> None:
        descriptions = self._parse().vulnerabilities[0].cve.descriptions
        english = [d for d in descriptions if d.lang == 'en']
        self.assertEqual(len(english), 1)
        self.assertIn('vim', english[0].value)

    def test_parse_cvss(self) -> None:
        metric_map = self._parse().vulnerabilities[0].cve.metrics
        self.assertIn('cvssMetricV31', metric_map)
        primary = metric_map['cvssMetricV31'][0]
        self.assertAlmostEqual(primary.cvssData.baseScore, 7.8)
        self.assertEqual(primary.cvssData.baseSeverity, 'HIGH')
class TestOsvTypes(unittest.TestCase):
    """Parsing of the real OSV querybatch sample via pydantic."""

    def test_parse_batch_response(self) -> None:
        parsed = pydantic.TypeAdapter(osv_batch_response_t).validate_json(
            OSV_RESPONSE_SAMPLE.encode()
        )
        self.assertEqual(len(parsed.results), 2)
        first, second = parsed.results
        self.assertEqual(len(first.vulns), 2)
        self.assertEqual(first.vulns[0].id, 'DEBIAN-CVE-2023-0433')
        self.assertEqual(len(second.vulns), 0)
# ── DB tests ──
class TestCveDb(unittest.TestCase):
    """CRUD and sync-metadata tests for the cve_db_t ORM module.

    Each test gets a fresh on-disk SQLite database inside a private
    temporary directory. Fix: the original used tempfile.mkdtemp() and
    never removed the directory; TemporaryDirectory + cleanup() in
    tearDown reclaims it after every test.
    """

    def setUp(self) -> None:
        # TemporaryDirectory (not mkdtemp) so tearDown can remove the dir.
        self._tmp = tempfile.TemporaryDirectory()
        self.tmpdir = self._tmp.name
        # Reset registry singletons so each test registers from scratch.
        orm_registry_t._registered_classes.clear()
        orm_registry_t._instances.clear()
        orm_registry_t.register(cve_db_t)
        self.db_path = pathlib.Path(self.tmpdir) / 'test.db'
        self.db = cve_db_t(self.db_path)

    def tearDown(self) -> None:
        orm_registry_t.reset()
        self._tmp.cleanup()  # remove the temporary database directory

    def test_migration_creates_tables(self) -> None:
        """Opening the db creates all three expected tables."""
        tables = self.db._conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
        ).fetchall()
        names = {r[0] for r in tables}
        self.assertIn('cve_entries', names)
        self.assertIn('cve_details', names)
        self.assertIn('cve_sync_meta', names)

    def test_upsert_and_query(self) -> None:
        """Inserted entries round-trip through query_by_product intact."""
        entries = [
            cve_entry_t(
                cve_id='CVE-2023-0433',
                source=cve_source_t.arch_tracker,
                product='vim',
                version_affected='9.0.1224-1',
                version_fixed='9.0.1225-1',
                severity=cve_severity_t.high,
                score=7.8,
                title='AVG-2843 unknown',
            ),
            cve_entry_t(
                cve_id='CVE-2023-0288',
                source=cve_source_t.arch_tracker,
                product='vim',
                version_affected='9.0.1224-1',
                version_fixed='9.0.1225-1',
            ),
        ]
        count = self.db.upsert_entries(entries)
        self.assertEqual(count, 2)
        results = self.db.query_by_product('vim')
        self.assertEqual(len(results), 2)
        self.assertEqual(results[0].cve_id, 'CVE-2023-0433')
        self.assertEqual(results[0].severity, cve_severity_t.high)

    def test_upsert_updates_existing(self) -> None:
        """Re-upserting the same (cve_id, source, product) replaces it."""
        e1 = cve_entry_t(
            cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker,
            product='vim', title='old title',
        )
        self.db.upsert_entries([e1])
        e2 = cve_entry_t(
            cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker,
            product='vim', title='new title',
        )
        self.db.upsert_entries([e2])
        results = self.db.query_by_product('vim')
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0].title, 'new title')

    def test_query_by_cve_id(self) -> None:
        """The same CVE id from two sources yields two rows."""
        self.db.upsert_entries([
            cve_entry_t(cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker, product='vim'),
            cve_entry_t(cve_id='CVE-2023-0433', source=cve_source_t.nvd, product='CVE-2023-0433'),
        ])
        results = self.db.query_by_cve_id('CVE-2023-0433')
        self.assertEqual(len(results), 2)

    def test_query_by_source(self) -> None:
        """query_by_source filters to exactly one backend's entries."""
        self.db.upsert_entries([
            cve_entry_t(cve_id='CVE-A', source=cve_source_t.arch_tracker, product='a'),
            cve_entry_t(cve_id='CVE-B', source=cve_source_t.nvd, product='b'),
        ])
        arch_only = self.db.query_by_source(cve_source_t.arch_tracker)
        self.assertEqual(len(arch_only), 1)
        self.assertEqual(arch_only[0].cve_id, 'CVE-A')

    def test_count_entries(self) -> None:
        """count_entries() totals all rows; with a source it filters."""
        self.db.upsert_entries([
            cve_entry_t(cve_id='CVE-A', source=cve_source_t.arch_tracker, product='a'),
            cve_entry_t(cve_id='CVE-B', source=cve_source_t.nvd, product='b'),
        ])
        self.assertEqual(self.db.count_entries(), 2)
        self.assertEqual(self.db.count_entries(cve_source_t.arch_tracker), 1)

    def test_sync_meta(self) -> None:
        """update_sync_meta stores timestamp and entry count per source."""
        self.db.update_sync_meta(cve_source_t.arch_tracker, '2026-04-13', 500)
        status = self.db.get_sync_status(cve_source_t.arch_tracker)
        self.assertEqual(status.last_sync, '2026-04-13')
        self.assertEqual(status.entry_count, 500)

    def test_sync_meta_all(self) -> None:
        """get_all_sync_statuses returns one status per synced source."""
        self.db.update_sync_meta(cve_source_t.arch_tracker, '2026-04-13', 100)
        self.db.update_sync_meta(cve_source_t.nvd, '2026-04-12', 200)
        statuses = self.db.get_all_sync_statuses()
        self.assertEqual(len(statuses), 2)

    def test_sync_meta_missing(self) -> None:
        """A never-synced source reports an empty timestamp and zero count."""
        status = self.db.get_sync_status(cve_source_t.osv)
        self.assertEqual(status.last_sync, '')
        self.assertEqual(status.entry_count, 0)

    def test_upsert_detail(self) -> None:
        """upsert_detail stores the raw payload as a JSON blob."""
        self.db.upsert_detail('CVE-2023-0433', cve_source_t.nvd, {'score': 7.8})
        row = self.db._conn.execute(
            'SELECT raw_json FROM cve_details WHERE cve_id = ?', ('CVE-2023-0433',)
        ).fetchone()
        self.assertIsNotNone(row)
        data = json.loads(row[0])
        self.assertAlmostEqual(data['score'], 7.8)
# ── Checker tests ──
class TestChecker(unittest.TestCase):
    """check_packages()/_is_affected() version-comparison logic.

    Uses a throwaway database in a temporary directory. Fix: the
    original used tempfile.mkdtemp() and never removed the directory;
    TemporaryDirectory + cleanup() in tearDown reclaims it.
    """

    def setUp(self) -> None:
        # TemporaryDirectory (not mkdtemp) so tearDown can remove the dir.
        self._tmp = tempfile.TemporaryDirectory()
        self.tmpdir = self._tmp.name
        # Reset registry singletons so each test registers from scratch.
        orm_registry_t._registered_classes.clear()
        orm_registry_t._instances.clear()
        orm_registry_t.register(cve_db_t)
        self.db = cve_db_t(pathlib.Path(self.tmpdir) / 'test.db')

    def tearDown(self) -> None:
        orm_registry_t.reset()
        self._tmp.cleanup()  # remove the temporary database directory

    def test_affected_below_fix(self) -> None:
        """Version below the fix is affected; at or above it is not."""
        e = cve_entry_t(
            cve_id='CVE-X', source=cve_source_t.arch_tracker, product='vim',
            version_affected='9.0.1224-1', version_fixed='9.0.1225-1',
        )
        self.assertTrue(_is_affected('9.0.1224-1', e))
        self.assertFalse(_is_affected('9.0.1225-1', e))
        self.assertFalse(_is_affected('9.1.0-1', e))

    def test_affected_no_fix(self) -> None:
        """Without a fixed version, only the exact affected version matches."""
        e = cve_entry_t(
            cve_id='CVE-X', source=cve_source_t.arch_tracker, product='libtiff',
            version_affected='4.4.0-1', status=cve_status_t.vulnerable,
        )
        self.assertTrue(_is_affected('4.4.0-1', e))
        self.assertFalse(_is_affected('4.5.0-1', e))

    def test_not_affected_above_fix(self) -> None:
        """A version far above the fix is not affected."""
        e = cve_entry_t(
            cve_id='CVE-X', source=cve_source_t.arch_tracker, product='vim',
            version_affected='9.0.1224-1', version_fixed='9.0.1225-1',
        )
        self.assertFalse(_is_affected('10.0.0-1', e))

    def test_check_packages_finds_vulnerable(self) -> None:
        """A cached CVE matching the installed version is reported."""
        self.db.upsert_entries([
            cve_entry_t(
                cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker,
                product='vim', version_affected='9.0.1224-1', version_fixed='9.0.1225-1',
                severity=cve_severity_t.high, score=7.8,
            ),
        ])
        results = check_packages(self.db, [('vim', '9.0.1224-1')])
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0].cve_id, 'CVE-2023-0433')

    def test_check_packages_skips_fixed(self) -> None:
        """A package already at the fixed version is not reported."""
        self.db.upsert_entries([
            cve_entry_t(
                cve_id='CVE-2023-0433', source=cve_source_t.arch_tracker,
                product='vim', version_affected='9.0.1224-1', version_fixed='9.0.1225-1',
            ),
        ])
        results = check_packages(self.db, [('vim', '9.0.1225-1')])
        self.assertEqual(len(results), 0)

    def test_check_packages_source_filter(self) -> None:
        """The sources= argument limits matches to the given backends."""
        self.db.upsert_entries([
            cve_entry_t(cve_id='CVE-A', source=cve_source_t.arch_tracker, product='vim',
                        version_affected='1.0-1', version_fixed='2.0-1'),
            cve_entry_t(cve_id='CVE-B', source=cve_source_t.nvd, product='vim',
                        version_affected='1.0-1', version_fixed='2.0-1'),
        ])
        results = check_packages(
            self.db, [('vim', '1.0-1')],
            sources=[cve_source_t.arch_tracker],
        )
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0].cve_id, 'CVE-A')

    def test_check_no_cves_for_package(self) -> None:
        """A package with no cached CVEs yields no results."""
        results = check_packages(self.db, [('nonexistent', '1.0-1')])
        self.assertEqual(len(results), 0)
# ── Heuristics tests ──
class TestHeuristics(unittest.TestCase):
    """Product-name normalization and CVE-product -> Arch package mapping."""

    def test_normalize_underscore(self) -> None:
        """Underscores become hyphens."""
        self.assertEqual(normalize_product_name('linux_kernel'), 'linux-kernel')

    def test_normalize_case(self) -> None:
        """Mixed case is lowered."""
        self.assertEqual(normalize_product_name('OpenSSL'), 'openssl')

    def test_normalize_special_chars(self) -> None:
        """Punctuation other than separators is stripped."""
        self.assertEqual(normalize_product_name('lib@foo!bar'), 'libfoobar')

    def test_alias_linux_kernel(self) -> None:
        self.assertEqual(map_to_arch_package('linux_kernel'), 'linux')

    def test_alias_node(self) -> None:
        self.assertEqual(map_to_arch_package('node.js'), 'nodejs')

    def test_alias_grub(self) -> None:
        self.assertEqual(map_to_arch_package('grub2'), 'grub')

    def test_exact_match(self) -> None:
        """An exact package name wins when present in the known set."""
        installed = {'vim', 'bash', 'glibc'}
        self.assertEqual(map_to_arch_package('vim', installed), 'vim')

    def test_normalized_match(self) -> None:
        """Fallback order: alias, then exact, then normalized."""
        installed = {'linux-kernel', 'bash'}
        # 'linux_kernel' normalizes to 'linux-kernel'
        # but known aliases maps it to 'linux' first — which is not in known
        # so alias fails, exact fails, normalized matches
        self.assertEqual(map_to_arch_package('linux_kernel', installed), 'linux-kernel')

    def test_no_match(self) -> None:
        """Unknown products map to None."""
        installed = {'bash', 'vim'}
        self.assertIsNone(map_to_arch_package('totally_unknown_product', installed))

    def test_alias_respects_known_packages(self) -> None:
        # alias maps 'grub2' -> 'grub', but if 'grub' not in known, skip alias
        installed = {'vim'}
        self.assertIsNone(map_to_arch_package('grub2', installed))

    def test_all_known_aliases_are_lowercase(self) -> None:
        """The alias table must be keyed by lowercase names only."""
        for alias in KNOWN_ALIASES:
            self.assertEqual(alias, alias.lower(), 'alias key must be lowercase: %s' % alias)
# ── Sync estimate types ──
class TestSyncEstimate(unittest.TestCase):
    """Construction of cve_sync_estimate_t for reachable and unreachable sources."""

    def test_available(self) -> None:
        """A reachable source carries fetch count and content length."""
        estimate = cve_sync_estimate_t(
            source=cve_source_t.arch_tracker,
            available=True,
            num_fetches=1,
            content_length=903023,
        )
        self.assertTrue(estimate.available)
        self.assertEqual(estimate.num_fetches, 1)

    def test_unavailable(self) -> None:
        """An unreachable source defaults to zero fetches."""
        estimate = cve_sync_estimate_t(source=cve_source_t.osv, available=False)
        self.assertFalse(estimate.available)
        self.assertEqual(estimate.num_fetches, 0)