Skip to content

Commit

Permalink
add support for reading result dbs for debian provider (#613)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>
  • Loading branch information
wagoodman committed Jun 28, 2024
1 parent 5a6a6d9 commit acb806c
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 13 deletions.
58 changes: 53 additions & 5 deletions src/vunnel/providers/debian/parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import copy
import glob
import logging
import os
import re
Expand All @@ -9,6 +10,7 @@

import orjson

from vunnel.result import SQLiteReader
from vunnel.utils import http, vulnerability

DSAFixedInTuple = namedtuple("DSAFixedInTuple", ["dsa", "link", "distro", "pkg", "ver"])
Expand Down Expand Up @@ -449,7 +451,52 @@ def _normalize_json(self, ns_cve_dsalist=None): # noqa: PLR0912,PLR0915,C901

return vuln_records

def _get_legacy_records(self):
def _get_legacy_records(self) -> dict[str, dict[str, Any]]:
legacy_records = self._get_legacy_records_from_results_db()

fs_legacy_records = self._get_legacy_records_from_feed_service_datadrop()
for relno, vuln_dict in fs_legacy_records.items():
if relno not in legacy_records:
legacy_records[relno] = {}
legacy_records[relno].update(vuln_dict)

if legacy_records:
self.logger.info(f"found existing legacy data for the following releases: {list(legacy_records.keys())}")
else:
self.logger.info("no existing legacy data found")

return legacy_records

def _get_legacy_records_from_results_db(self) -> dict[str, dict[str, Any]]:
legacy_records = {}

def process_result(file_path: str) -> None:
self.logger.info(f"found existing legacy dataset: {file_path}")

releases = set()
records = 0
with SQLiteReader(file_path) as db:
envelopes = db.read_all()
for envelope in envelopes:
relno = envelope.item["Vulnerability"]["NamespaceName"].split(":")[-1]
releases.add(relno)
vid = envelope.item["Vulnerability"]["Name"]
if relno not in legacy_records:
legacy_records[relno] = {}

records += 1
legacy_records[relno][vid] = envelope.item

self.logger.debug(f"legacy dataset {file_path} contains {len(releases)} releases with {records} records")

result_files = glob.glob(os.path.join(self.legacy_records_path, "**", "results.db"), recursive=True)

for file_path in result_files:
process_result(file_path)

return legacy_records

def _get_legacy_records_from_feed_service_datadrop(self) -> dict[str, dict[str, Any]]:
legacy_records = {}

def process_file(contents: list[dict[str, Any]]) -> None:
Expand Down Expand Up @@ -477,9 +524,7 @@ def process_file(contents: list[dict[str, Any]]) -> None:
process_file(orjson.loads(f.read()))

if legacy_records:
self.logger.info(f"found existing legacy data for the following releases: {list(legacy_records.keys())}")
else:
self.logger.info("no existing legacy data found")
self.logger.info(f"found feed service legacy data for the following releases: {list(legacy_records.keys())}")

return legacy_records

Expand All @@ -496,7 +541,10 @@ def get(self):

# fetch records from legacy (if they exist)
legacy_records = self._get_legacy_records()
vuln_records.update(legacy_records)
for relno, vuln_dict in legacy_records.items():
if relno not in vuln_records:
vuln_records[relno] = {}
vuln_records[relno].update(vuln_dict)

if vuln_records:
for relno, vuln_dict in vuln_records.items():
Expand Down
8 changes: 7 additions & 1 deletion src/vunnel/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,15 @@ def read(self, identifier: str) -> dict[str, Any] | None:

return orjson.loads(result.record)

def read_all(self) -> list[Envelope]:
conn, table = self.connection()
with conn.begin():
results = conn.execute(table.select()).fetchall()
return [Envelope(**orjson.loads(r.record)) for r in results]

def connection(self) -> tuple[db.engine.Connection, db.Table]:
if not self.conn:
self.engine = db.create_engine(f"sqlite:///{self.db_path}")
self.engine = db.create_engine(f"sqlite:///{self.db_path}?mode=ro")
self.conn = self.engine.connect() # type: ignore[attr-defined]
metadata = db.MetaData(bind=self.engine)
self.table = db.Table(self.table_name, metadata, autoload=True, autoload_with=self.engine)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# this is not a DB -- I promise
!results.db
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# this is a dummy DB!
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"schema":"https://raw.githubusercontent.com/anchore/vunnel/main/schema/vulnerability/os/schema-1.0.0.json","identifier":"debian:10/cve-2012-0833","item":{"Vulnerability":{"Severity":"Negligible","NamespaceName":"debian:10","FixedIn":[],"Link":"https://security-tracker.debian.org/tracker/CVE-2012-0833","Description":"The acllas__handle_group_entry function in servers/plugins/acl/acllas.c in 389 Directory Server before 1.2.10 does not properly handled access control instructions (ACIs) that use certificate groups, which allows remote authenticated LDAP users with a certificate group to cause a denial of service (infinite loop and CPU consumption) by binding to the server.","Metadata":{},"Name":"CVE-2012-0833","CVSS":[]}}}
47 changes: 40 additions & 7 deletions tests/unit/providers/debian/test_debian.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,40 @@

import os.path
import shutil
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest
from vunnel import result, workspace
from vunnel.providers.debian import Config, Provider, parser


@pytest.fixture()
def mock_legacy_db(mocker):
mock_record = {
"schema": "https://raw.githubusercontent.com/anchore/vunnel/main/schema/vulnerability/os/schema-1.0.0.json",
"identifier": "debian:10/cve-2012-0833",
"item": {
"Vulnerability": {
"Severity": "Negligible",
"NamespaceName": "debian:10",
"FixedIn": [],
"Link": "https://security-tracker.debian.org/tracker/CVE-2012-0833",
"Description": "The acllas__handle_group_entry function in servers/plugins/acl/acllas.c in 389 Directory Server before 1.2.10 does not properly handled access control instructions (ACIs) that use certificate groups, which allows remote authenticated LDAP users with a certificate group to cause a denial of service (infinite loop and CPU consumption) by binding to the server.",
"Metadata": {},
"Name": "CVE-2012-0833",
"CVSS": [],
}
},
}

mock_records = [result.Envelope(**mock_record)]

mocker.patch("vunnel.result.SQLiteReader.read_all", return_value=mock_records)


class TestParser:
_sample_dsa_data_ = "test-fixtures/input/DSA"
_sample_json_data_ = "test-fixtures/input/debian.json"
_sample_legacy_data = "test-fixtures/input/legacy/vulnerabilities-debian:7-0.json"

def test_normalize_dsa_list(self, tmpdir, helpers, disable_get_requests):
subject = parser.Parser(workspace=workspace.Workspace(tmpdir, "test", create=True))
Expand Down Expand Up @@ -84,7 +107,7 @@ def test_normalize_json(self, tmpdir, helpers, disable_get_requests):
assert all(x.get("Vulnerability", {}).get("Description") is not None for x in vuln_dict.values())
assert not subject.logger.exception.called, "no exceptions should be logged"

def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests):
def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests, mock_legacy_db):
subject = parser.Parser(workspace=workspace.Workspace(tmpdir, "test", create=True))

mock_data_path = helpers.local_dir("test-fixtures/input")
Expand All @@ -94,8 +117,18 @@ def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests):

assert isinstance(legacy_records, dict)
assert len(legacy_records) > 0

# from the feed service data dump
assert "7" in legacy_records.keys()
assert len(legacy_records["7"]) > 0
assert "CVE-2004-1653" in legacy_records["7"].keys()
assert len(legacy_records["7"]["CVE-2004-1653"]) > 0

# from the DB
assert "10" in legacy_records.keys()
assert len(legacy_records["10"]) > 0
assert "CVE-2012-0833" in legacy_records["10"].keys()
assert len(legacy_records["10"]["CVE-2012-0833"]) > 0

for _rel, vuln_dict in legacy_records.items():
assert isinstance(vuln_dict, dict)
Expand All @@ -107,7 +140,7 @@ def test_get_legacy_records(self, tmpdir, helpers, disable_get_requests):
assert all(x.get("Vulnerability", {}).get("Description") is not None for x in vuln_dict.values())


def test_provider_schema(helpers, disable_get_requests, monkeypatch):
def test_provider_schema(helpers, disable_get_requests, monkeypatch, mock_legacy_db):
workspace = helpers.provider_workspace_helper(
name=Provider.name(),
input_fixture="test-fixtures/input",
Expand All @@ -129,13 +162,13 @@ def mock_download():

p.update(None)

# 17 entries from the legacy records, 21 from the mock json data
expected = 38
# 18 entries from the legacy FS records, 1 from legacy DB record, 21 from the mock json data
expected = 39
assert workspace.num_result_entries() == expected
assert workspace.result_schemas_valid(require_entries=True)


def test_provider_via_snapshot(helpers, disable_get_requests, monkeypatch):
def test_provider_via_snapshot(helpers, disable_get_requests, monkeypatch, mock_legacy_db):
workspace = helpers.provider_workspace_helper(
name=Provider.name(),
input_fixture="test-fixtures/input",
Expand Down

0 comments on commit acb806c

Please sign in to comment.