Add support for reading result DBs for Debian provider #613

Merged
merged 1 commit on Jun 28, 2024
58 changes: 53 additions & 5 deletions src/vunnel/providers/debian/parser.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import copy
import glob
import logging
import os
import re
@@ -9,6 +10,7 @@

import orjson

from vunnel.result import SQLiteReader
from vunnel.utils import http, vulnerability

DSAFixedInTuple = namedtuple("DSAFixedInTuple", ["dsa", "link", "distro", "pkg", "ver"])
@@ -449,7 +451,52 @@ def _normalize_json(self, ns_cve_dsalist=None):  # noqa: PLR0912,PLR0915,C901

return vuln_records

def _get_legacy_records(self):
def _get_legacy_records(self) -> dict[str, dict[str, Any]]:
legacy_records = self._get_legacy_records_from_results_db()

fs_legacy_records = self._get_legacy_records_from_feed_service_datadrop()
for relno, vuln_dict in fs_legacy_records.items():
if relno not in legacy_records:
legacy_records[relno] = {}
legacy_records[relno].update(vuln_dict)

if legacy_records:
self.logger.info(f"found existing legacy data for the following releases: {list(legacy_records.keys())}")
else:
self.logger.info("no existing legacy data found")

return legacy_records

def _get_legacy_records_from_results_db(self) -> dict[str, dict[str, Any]]:
legacy_records = {}

def process_result(file_path: str) -> None:
self.logger.info(f"found existing legacy dataset: {file_path}")

releases = set()
records = 0
with SQLiteReader(file_path) as db:
envelopes = db.read_all()
for envelope in envelopes:
relno = envelope.item["Vulnerability"]["NamespaceName"].split(":")[-1]
releases.add(relno)
vid = envelope.item["Vulnerability"]["Name"]
if relno not in legacy_records:
legacy_records[relno] = {}

records += 1
legacy_records[relno][vid] = envelope.item

self.logger.debug(f"legacy dataset {file_path} contains {len(releases)} releases with {records} records")

result_files = glob.glob(os.path.join(self.legacy_records_path, "**", "results.db"), recursive=True)

for file_path in result_files:
process_result(file_path)

return legacy_records

def _get_legacy_records_from_feed_service_datadrop(self) -> dict[str, dict[str, Any]]:
legacy_records = {}

def process_file(contents: list[dict[str, Any]]) -> None:
@@ -477,9 +524,7 @@ def process_file(contents: list[dict[str, Any]]) -> None:
process_file(orjson.loads(f.read()))

if legacy_records:
self.logger.info(f"found existing legacy data for the following releases: {list(legacy_records.keys())}")
else:
self.logger.info("no existing legacy data found")
self.logger.info(f"found feed service legacy data for the following releases: {list(legacy_records.keys())}")

return legacy_records

@@ -496,7 +541,10 @@ def get(self):

# fetch records from legacy (if they exist)
legacy_records = self._get_legacy_records()
vuln_records.update(legacy_records)
for relno, vuln_dict in legacy_records.items():
if relno not in vuln_records:
vuln_records[relno] = {}
vuln_records[relno].update(vuln_dict)

if vuln_records:
for relno, vuln_dict in vuln_records.items():
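Note, not part of the diff: `_get_legacy_records` now loads the results-DB records first and then layers the feed-service datadrop records on top, so the datadrop copy wins on any overlapping (release, CVE) pair. A minimal sketch of that precedence, with made-up record values; the `setdefault` call is just shorthand for the explicit `if relno not in ...` check used in the real code:

```python
# Illustrative sketch of the merge precedence in _get_legacy_records;
# the record payloads are placeholders, not real vunnel data.
db_records = {"10": {"CVE-2012-0833": {"source": "results.db"}}}
fs_records = {
    "10": {"CVE-2012-0833": {"source": "feed service"}},
    "7": {"CVE-2004-1653": {"source": "feed service"}},
}

merged = {relno: dict(vulns) for relno, vulns in db_records.items()}
for relno, vuln_dict in fs_records.items():
    merged.setdefault(relno, {}).update(vuln_dict)  # datadrop overrides the DB copy

assert merged["10"]["CVE-2012-0833"]["source"] == "feed service"
assert set(merged) == {"10", "7"}
```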
8 changes: 7 additions & 1 deletion src/vunnel/result.py
@@ -285,9 +285,15 @@ def read(self, identifier: str) -> dict[str, Any] | None:

return orjson.loads(result.record)

def read_all(self) -> list[Envelope]:
conn, table = self.connection()
with conn.begin():
results = conn.execute(table.select()).fetchall()
return [Envelope(**orjson.loads(r.record)) for r in results]

def connection(self) -> tuple[db.engine.Connection, db.Table]:
if not self.conn:
self.engine = db.create_engine(f"sqlite:///{self.db_path}")
self.engine = db.create_engine(f"sqlite:///{self.db_path}?mode=ro")
self.conn = self.engine.connect() # type: ignore[attr-defined]
metadata = db.MetaData(bind=self.engine)
self.table = db.Table(self.table_name, metadata, autoload=True, autoload_with=self.engine)
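A hedged usage sketch for the new `read_all()` helper: it returns every stored record as an `Envelope` (schema, identifier, item), which is what the Debian parser relies on when mining a legacy `results.db`. The path below is an assumption, not something this PR creates:

```python
# Minimal sketch, assuming a results.db from a previous vunnel run exists at
# this made-up path; the field access follows the Envelope shape used in this diff.
from vunnel.result import SQLiteReader

with SQLiteReader("workspace/debian/results/results.db") as reader:
    for envelope in reader.read_all():
        vuln = envelope.item["Vulnerability"]
        relno = vuln["NamespaceName"].split(":")[-1]  # "debian:10" -> "10"
        print(envelope.identifier, relno, vuln["Name"])
```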
@@ -0,0 +1,2 @@
# this is not a DB -- I promise
!results.db
@@ -0,0 +1 @@
# this is a dummy DB!
@@ -0,0 +1 @@
{"schema":"https://raw.githubusercontent.com/anchore/vunnel/main/schema/vulnerability/os/schema-1.0.0.json","identifier":"debian:10/cve-2012-0833","item":{"Vulnerability":{"Severity":"Negligible","NamespaceName":"debian:10","FixedIn":[],"Link":"https://security-tracker.debian.org/tracker/CVE-2012-0833","Description":"The acllas__handle_group_entry function in servers/plugins/acl/acllas.c in 389 Directory Server before 1.2.10 does not properly handled access control instructions (ACIs) that use certificate groups, which allows remote authenticated LDAP users with a certificate group to cause a denial of service (infinite loop and CPU consumption) by binding to the server.","Metadata":{},"Name":"CVE-2012-0833","CVSS":[]}}}
47 changes: 40 additions & 7 deletions tests/unit/providers/debian/test_debian.py
@@ -2,17 +2,40 @@

import os.path
import shutil
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest
from vunnel import result, workspace
from vunnel.providers.debian import Config, Provider, parser


@pytest.fixture()
def mock_legacy_db(mocker):
mock_record = {
"schema": "https://raw.githubusercontent.com/anchore/vunnel/main/schema/vulnerability/os/schema-1.0.0.json",
"identifier": "debian:10/cve-2012-0833",
"item": {
"Vulnerability": {
"Severity": "Negligible",
"NamespaceName": "debian:10",
"FixedIn": [],
"Link": "https://security-tracker.debian.org/tracker/CVE-2012-0833",
"Description": "The acllas__handle_group_entry function in servers/plugins/acl/acllas.c in 389 Directory Server before 1.2.10 does not properly handled access control instructions (ACIs) that use certificate groups, which allows remote authenticated LDAP users with a certificate group to cause a denial of service (infinite loop and CPU consumption) by binding to the server.",
"Metadata": {},
"Name": "CVE-2012-0833",
"CVSS": [],
}
},
}

mock_records = [result.Envelope(**mock_record)]

mocker.patch("vunnel.result.SQLiteReader.read_all", return_value=mock_records)


class TestParser:
_sample_dsa_data_ = "test-fixtures/input/DSA"
_sample_json_data_ = "test-fixtures/input/debian.json"
_sample_legacy_data = "test-fixtures/input/legacy/vulnerabilities-debian:7-0.json"

def test_normalize_dsa_list(self, tmpdir, helpers, disable_get_requests):
subject = parser.Parser(workspace=workspace.Workspace(tmpdir, "test", create=True))
@@ -84,7 +107,7 @@ def test_normalize_json(self, tmpdir, helpers, disable_get_requests):
assert all(x.get("Vulnerability", {}).get("Description") is not None for x in vuln_dict.values())
assert not subject.logger.exception.called, "no exceptions should be logged"

def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests):
def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests, mock_legacy_db):
subject = parser.Parser(workspace=workspace.Workspace(tmpdir, "test", create=True))

mock_data_path = helpers.local_dir("test-fixtures/input")
@@ -94,8 +117,18 @@ def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests):

assert isinstance(legacy_records, dict)
assert len(legacy_records) > 0

# from the feed service data dump
assert "7" in legacy_records.keys()
assert len(legacy_records["7"]) > 0
assert "CVE-2004-1653" in legacy_records["7"].keys()
assert len(legacy_records["7"]["CVE-2004-1653"]) > 0

# from the DB
assert "10" in legacy_records.keys()
assert len(legacy_records["10"]) > 0
assert "CVE-2012-0833" in legacy_records["10"].keys()
assert len(legacy_records["10"]["CVE-2012-0833"]) > 0

for _rel, vuln_dict in legacy_records.items():
assert isinstance(vuln_dict, dict)
@@ -107,7 +140,7 @@ def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests):
assert all(x.get("Vulnerability", {}).get("Description") is not None for x in vuln_dict.values())


def test_provider_schema(helpers, disable_get_requests, monkeypatch):
def test_provider_schema(helpers, disable_get_requests, monkeypatch, mock_legacy_db):
workspace = helpers.provider_workspace_helper(
name=Provider.name(),
input_fixture="test-fixtures/input",
@@ -129,13 +162,13 @@ def mock_download():

p.update(None)

# 17 entries from the legacy records, 21 from the mock json data
expected = 38
# 17 entries from the legacy feed service records, 1 from the legacy results DB, 21 from the mock json data
expected = 39
assert workspace.num_result_entries() == expected
assert workspace.result_schemas_valid(require_entries=True)


def test_provider_via_snapshot(helpers, disable_get_requests, monkeypatch):
def test_provider_via_snapshot(helpers, disable_get_requests, monkeypatch, mock_legacy_db):
workspace = helpers.provider_workspace_helper(
name=Provider.name(),
input_fixture="test-fixtures/input",
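For reviewers who want to poke at the new path outside of pytest, a rough sketch that mirrors the unit tests above; the workspace directory is an assumption, everything else follows calls shown in this diff:

```python
# Hedged sketch: build a Parser against a scratch workspace and ask it for
# legacy records; any results.db found under its legacy records path is read
# through the new SQLiteReader.read_all() path.
from vunnel import workspace
from vunnel.providers.debian import parser

ws = workspace.Workspace("/tmp/vunnel-scratch", "debian", create=True)
subject = parser.Parser(workspace=ws)
legacy = subject._get_legacy_records()
print(sorted(legacy.keys()))  # release numbers that had legacy data, if any
```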