diff --git a/src/vunnel/providers/debian/parser.py b/src/vunnel/providers/debian/parser.py index 8c5b6227..57c3a078 100644 --- a/src/vunnel/providers/debian/parser.py +++ b/src/vunnel/providers/debian/parser.py @@ -1,6 +1,7 @@ from __future__ import annotations import copy +import glob import logging import os import re @@ -9,6 +10,7 @@ import orjson +from vunnel.result import SQLiteReader from vunnel.utils import http, vulnerability DSAFixedInTuple = namedtuple("DSAFixedInTuple", ["dsa", "link", "distro", "pkg", "ver"]) @@ -449,7 +451,52 @@ def _normalize_json(self, ns_cve_dsalist=None): # noqa: PLR0912,PLR0915,C901 return vuln_records - def _get_legacy_records(self): + def _get_legacy_records(self) -> dict[str, dict[str, Any]]: + legacy_records = self._get_legacy_records_from_results_db() + + fs_legacy_records = self._get_legacy_records_from_feed_service_datadrop() + for relno, vuln_dict in fs_legacy_records.items(): + if relno not in legacy_records: + legacy_records[relno] = {} + legacy_records[relno].update(vuln_dict) + + if legacy_records: + self.logger.info(f"found existing legacy data for the following releases: {list(legacy_records.keys())}") + else: + self.logger.info("no existing legacy data found") + + return legacy_records + + def _get_legacy_records_from_results_db(self) -> dict[str, dict[str, Any]]: + legacy_records = {} + + def process_result(file_path: str) -> None: + self.logger.info(f"found existing legacy dataset: {file_path}") + + releases = set() + records = 0 + with SQLiteReader(file_path) as db: + envelopes = db.read_all() + for envelope in envelopes: + relno = envelope.item["Vulnerability"]["NamespaceName"].split(":")[-1] + releases.add(relno) + vid = envelope.item["Vulnerability"]["Name"] + if relno not in legacy_records: + legacy_records[relno] = {} + + records += 1 + legacy_records[relno][vid] = envelope.item + + self.logger.debug(f"legacy dataset {file_path} contains {len(releases)} releases with {records} records") + + result_files = 
glob.glob(os.path.join(self.legacy_records_path, "**", "results.db"), recursive=True) + + for file_path in result_files: + process_result(file_path) + + return legacy_records + + def _get_legacy_records_from_feed_service_datadrop(self) -> dict[str, dict[str, Any]]: legacy_records = {} def process_file(contents: list[dict[str, Any]]) -> None: @@ -477,9 +524,7 @@ def process_file(contents: list[dict[str, Any]]) -> None: process_file(orjson.loads(f.read())) if legacy_records: - self.logger.info(f"found existing legacy data for the following releases: {list(legacy_records.keys())}") - else: - self.logger.info("no existing legacy data found") + self.logger.info(f"found feed service legacy data for the following releases: {list(legacy_records.keys())}") return legacy_records @@ -496,7 +541,10 @@ def get(self): # fetch records from legacy (if they exist) legacy_records = self._get_legacy_records() - vuln_records.update(legacy_records) + for relno, vuln_dict in legacy_records.items(): + if relno not in vuln_records: + vuln_records[relno] = {} + vuln_records[relno].update(vuln_dict) if vuln_records: for relno, vuln_dict in vuln_records.items(): diff --git a/src/vunnel/result.py b/src/vunnel/result.py index 73c1fe69..3e0f34cc 100644 --- a/src/vunnel/result.py +++ b/src/vunnel/result.py @@ -285,9 +285,15 @@ def read(self, identifier: str) -> dict[str, Any] | None: return orjson.loads(result.record) + def read_all(self) -> list[Envelope]: + conn, table = self.connection() + with conn.begin(): + results = conn.execute(table.select()).fetchall() + return [Envelope(**orjson.loads(r.record)) for r in results] + def connection(self) -> tuple[db.engine.Connection, db.Table]: if not self.conn: - self.engine = db.create_engine(f"sqlite:///{self.db_path}") + self.engine = db.create_engine(f"sqlite:///{self.db_path}?mode=ro") self.conn = self.engine.connect() # type: ignore[attr-defined] metadata = db.MetaData(bind=self.engine) self.table = db.Table(self.table_name, metadata, 
autoload=True, autoload_with=self.engine) diff --git a/tests/unit/providers/debian/test-fixtures/input/legacy/debian-7/.gitignore b/tests/unit/providers/debian/test-fixtures/input/legacy/debian-7/.gitignore new file mode 100644 index 00000000..7da7d05e --- /dev/null +++ b/tests/unit/providers/debian/test-fixtures/input/legacy/debian-7/.gitignore @@ -0,0 +1,2 @@ +# this is not a DB -- I promise +!results.db diff --git a/tests/unit/providers/debian/test-fixtures/input/legacy/debian-7/results.db b/tests/unit/providers/debian/test-fixtures/input/legacy/debian-7/results.db new file mode 100644 index 00000000..d06529b4 --- /dev/null +++ b/tests/unit/providers/debian/test-fixtures/input/legacy/debian-7/results.db @@ -0,0 +1 @@ +# this is a dummy DB! diff --git a/tests/unit/providers/debian/test-fixtures/snapshots/debian:10/cve-2012-0833.json b/tests/unit/providers/debian/test-fixtures/snapshots/debian:10/cve-2012-0833.json new file mode 100644 index 00000000..218ae087 --- /dev/null +++ b/tests/unit/providers/debian/test-fixtures/snapshots/debian:10/cve-2012-0833.json @@ -0,0 +1 @@ +{"schema":"https://raw.githubusercontent.com/anchore/vunnel/main/schema/vulnerability/os/schema-1.0.0.json","identifier":"debian:10/cve-2012-0833","item":{"Vulnerability":{"Severity":"Negligible","NamespaceName":"debian:10","FixedIn":[],"Link":"https://security-tracker.debian.org/tracker/CVE-2012-0833","Description":"The acllas__handle_group_entry function in servers/plugins/acl/acllas.c in 389 Directory Server before 1.2.10 does not properly handled access control instructions (ACIs) that use certificate groups, which allows remote authenticated LDAP users with a certificate group to cause a denial of service (infinite loop and CPU consumption) by binding to the server.","Metadata":{},"Name":"CVE-2012-0833","CVSS":[]}}} diff --git a/tests/unit/providers/debian/test_debian.py b/tests/unit/providers/debian/test_debian.py index aa48c083..0d09eb41 100644 --- 
a/tests/unit/providers/debian/test_debian.py +++ b/tests/unit/providers/debian/test_debian.py @@ -2,17 +2,40 @@ import os.path import shutil -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from vunnel import result, workspace from vunnel.providers.debian import Config, Provider, parser +@pytest.fixture() +def mock_legacy_db(mocker): + mock_record = { + "schema": "https://raw.githubusercontent.com/anchore/vunnel/main/schema/vulnerability/os/schema-1.0.0.json", + "identifier": "debian:10/cve-2012-0833", + "item": { + "Vulnerability": { + "Severity": "Negligible", + "NamespaceName": "debian:10", + "FixedIn": [], + "Link": "https://security-tracker.debian.org/tracker/CVE-2012-0833", + "Description": "The acllas__handle_group_entry function in servers/plugins/acl/acllas.c in 389 Directory Server before 1.2.10 does not properly handled access control instructions (ACIs) that use certificate groups, which allows remote authenticated LDAP users with a certificate group to cause a denial of service (infinite loop and CPU consumption) by binding to the server.", + "Metadata": {}, + "Name": "CVE-2012-0833", + "CVSS": [], + } + }, + } + + mock_records = [result.Envelope(**mock_record)] + + mocker.patch("vunnel.result.SQLiteReader.read_all", return_value=mock_records) + + class TestParser: _sample_dsa_data_ = "test-fixtures/input/DSA" _sample_json_data_ = "test-fixtures/input/debian.json" - _sample_legacy_data = "test-fixtures/input/legacy/vulnerabilities-debian:7-0.json" def test_normalize_dsa_list(self, tmpdir, helpers, disable_get_requests): subject = parser.Parser(workspace=workspace.Workspace(tmpdir, "test", create=True)) @@ -84,7 +107,7 @@ def test_normalize_json(self, tmpdir, helpers, disable_get_requests): assert all(x.get("Vulnerability", {}).get("Description") is not None for x in vuln_dict.values()) assert not subject.logger.exception.called, "no exceptions should be logged" - def test_get_legacy_records(self, tmpdir, 
helpers, disable_get_requests): + def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests, mock_legacy_db): subject = parser.Parser(workspace=workspace.Workspace(tmpdir, "test", create=True)) mock_data_path = helpers.local_dir("test-fixtures/input") @@ -94,8 +117,18 @@ def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests): assert isinstance(legacy_records, dict) assert len(legacy_records) > 0 + + # from the feed service data dump assert "7" in legacy_records.keys() assert len(legacy_records["7"]) > 0 + assert "CVE-2004-1653" in legacy_records["7"].keys() + assert len(legacy_records["7"]["CVE-2004-1653"]) > 0 + + # from the DB + assert "10" in legacy_records.keys() + assert len(legacy_records["10"]) > 0 + assert "CVE-2012-0833" in legacy_records["10"].keys() + assert len(legacy_records["10"]["CVE-2012-0833"]) > 0 for _rel, vuln_dict in legacy_records.items(): assert isinstance(vuln_dict, dict) @@ -107,7 +140,7 @@ def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests): assert all(x.get("Vulnerability", {}).get("Description") is not None for x in vuln_dict.values()) -def test_provider_schema(helpers, disable_get_requests, monkeypatch): +def test_provider_schema(helpers, disable_get_requests, monkeypatch, mock_legacy_db): workspace = helpers.provider_workspace_helper( name=Provider.name(), input_fixture="test-fixtures/input", @@ -129,13 +162,13 @@ def mock_download(): p.update(None) - # 17 entries from the legacy records, 21 from the mock json data - expected = 38 + # 17 entries from the legacy FS records, 1 from legacy DB record, 21 from the mock json data + expected = 39 assert workspace.num_result_entries() == expected assert workspace.result_schemas_valid(require_entries=True) -def test_provider_via_snapshot(helpers, disable_get_requests, monkeypatch): +def test_provider_via_snapshot(helpers, disable_get_requests, monkeypatch, mock_legacy_db): workspace = helpers.provider_workspace_helper( name=Provider.name(), 
input_fixture="test-fixtures/input",