From b49750be021ccba4026c74676f227298c18229f6 Mon Sep 17 00:00:00 2001 From: Christop Kraemer Date: Wed, 26 Jul 2023 14:42:07 +0200 Subject: [PATCH] Add: version ranges Now it is possible to set a range for versions within a package is vulnerable --- notus/scanner/loader/json.py | 94 ++++++++++++--- notus/scanner/models/packages/package.py | 66 +++++++++-- notus/scanner/scanner.py | 22 ++-- tests/loader/range.notus | 144 +++++++++++++++++++++++ tests/loader/test_json.py | 52 +++++++- tests/models/packages/test_package.py | 6 +- 6 files changed, 343 insertions(+), 41 deletions(-) create mode 100644 tests/loader/range.notus diff --git a/notus/scanner/loader/json.py b/notus/scanner/loader/json.py index 683b766..75ddfbe 100644 --- a/notus/scanner/loader/json.py +++ b/notus/scanner/loader/json.py @@ -11,7 +11,12 @@ from ..errors import AdvisoriesLoadingError from ..models.packages import package_class_by_type -from ..models.packages.package import PackageAdvisories, PackageType +from ..models.packages.package import ( + Package, + PackageAdvisories, + PackageRange, + PackageType, +) from .gpg_sha_verifier import VerificationResult from .loader import AdvisoriesLoader @@ -75,6 +80,32 @@ def __load_data(self, operating_system: str) -> Optional[Dict]: "while decoding JSON data." ) from None + @staticmethod + def _get_package_from_package_dict( + package_class: Package, + package_dict: Dict[str, str], + operating_system: str, + ) -> Optional[Package]: + full_name = package_dict.get("full_name") + if full_name: + package = package_class.from_full_name(full_name) + else: + package = package_class.from_name_and_full_version( + package_dict.get("name"), + package_dict.get("full_version"), + ) + if not package: + logger.warning( + ( + "could not parse fixed package information from" + " %s in %s: version range must contain exactly" + " 2 entries" + ), + package_dict, + operating_system, + ) + return package + def load_package_advisories( self, operating_system: str ) -> Optional[PackageAdvisories]: @@ -114,25 +145,52 @@ def load_package_advisories( advisory = oid for package_dict in fixed_packages: - full_name = package_dict.get("full_name") - if full_name: - package = package_class.from_full_name(full_name) - else: - package = package_class.from_name_and_full_version( - package_dict.get("name"), - package_dict.get("full_version"), + version_range = package_dict.get("range") + if version_range: + if len(version_range) != 2: + logger.warning( + ( + "could not parse fixed package information from" + " %s in %s: version range must contain exactly" + " 2 entries" + ), + package_dict, + operating_system, + ) + continue + package_dict1 = version_range[0] + package_dict2 = version_range[1] + + package1 = self._get_package_from_package_dict( + package_class, package_dict1, operating_system + ) + if not package1: + continue + package2 = self._get_package_from_package_dict( + package_class, package_dict2, operating_system ) - if not package: - logger.warning( - "Could not parse fixed package information from %s " - "in %s", - package_dict, - operating_system, + if not package2: + continue + package_range = PackageRange( + name=package1.name, + verifier1=package_dict1.get("specifier"), + verifier2=package_dict2.get("specifier"), + package1=package1, + package2=package2, ) - continue + package_advisories.add_range_advisory_for_package( + package_range, advisory + ) + else: + package = self._get_package_from_package_dict( + package_class, package_dict, operating_system + ) + if not package: + logger.warning("No Package") - package_advisories.add_advisory_for_package( - package, advisory, package_dict.get("specifier") - ) + continue + package_advisories.add_advisory_for_package( + package, advisory, package_dict.get("specifier") + ) return package_advisories diff --git a/notus/scanner/models/packages/package.py b/notus/scanner/models/packages/package.py index 1af6d65..870a4c7 100644 --- a/notus/scanner/models/packages/package.py +++ b/notus/scanner/models/packages/package.py @@ -150,6 +150,15 @@ def from_name_and_full_version(name: str, full_version: str): raise NotImplementedError() +@dataclass +class PackageRange: + name: str + verifier1: str + verifier2: str + package1: Package + package2: Package + + @dataclass(frozen=True, unsafe_hash=True) class PackageAdvisory: """Connects a package with an advisory""" @@ -173,9 +182,9 @@ class PackageAdvisories: default_factory=dict ) - is_comparable = ( - lambda a, b: a.compare(b) != PackageComparison.NOT_COMPARABLE - ) + @staticmethod + def is_comparable(package1: Package, package2: Package): + return package1.compare(package2) != PackageComparison.NOT_COMPARABLE comparison_map = { ">=": lambda a, b: a > b @@ -196,9 +205,9 @@ class PackageAdvisories: } def get_package_advisories_for_package( - self, package: Package + self, package_name: str ) -> Dict[str, Set[PackageAdvisory]]: - return self.advisories.get(package.name) or dict() + return self.advisories.get(package_name) or dict() def add_advisory_for_package( self, @@ -208,10 +217,10 @@ def add_advisory_for_package( ) -> None: if verifier not in self.comparison_map: verifier = ">=" - advisories = self.get_package_advisories_for_package(package) - is_vulnerable = lambda other: self.comparison_map[verifier]( - package, other - ) + advisories = self.get_package_advisories_for_package(package.name) + + def is_vulnerable(other: Package) -> Optional[bool]: + return self.comparison_map[verifier](package, other) if not advisory in advisories: advisories[advisory] = set() @@ -221,5 +230,44 @@ def add_advisory_for_package( ) self.advisories[package.name] = advisories + def add_range_advisory_for_package( + self, + package_range: PackageRange, + advisory: str, + ) -> None: + advisories = self.get_package_advisories_for_package(package_range.name) + if package_range.verifier1 in self.comparison_map: + package_range.verifier1 = "<" + if package_range.verifier2 in self.comparison_map: + package_range.verifier2 = ">=" + + def is_vulnerable(other: Package) -> Optional[bool]: + return self.comparison_map[package_range.verifier1]( + package_range.package1, other + ) and self.comparison_map[package_range.verifier2]( + package_range.package2, other + ) + + if not advisory in advisories: + advisories[advisory] = set() + + advisories[advisory].add( + PackageAdvisory( + package_range.package1, + advisory, + package_range.verifier1, + is_vulnerable, + ) + ) + advisories[advisory].add( + PackageAdvisory( + package_range.package2, + advisory, + package_range.verifier2, + is_vulnerable, + ) + ) + self.advisories[package_range.name] = advisories + def __len__(self) -> int: return len(self.advisories) diff --git a/notus/scanner/scanner.py b/notus/scanner/scanner.py index 47522f2..f81608d 100644 --- a/notus/scanner/scanner.py +++ b/notus/scanner/scanner.py @@ -105,10 +105,8 @@ def _check_package( package_advisory.package, ) is_vulnerable = package_advisory.is_vulnerable(package) - if is_vulnerable is None: + if not is_vulnerable: continue - elif not is_vulnerable: - return vul.add(package, package_advisory) @@ -123,7 +121,9 @@ def _start_scan( for package in installed_packages: package_advisory_oids = ( - package_advisories.get_package_advisories_for_package(package) + package_advisories.get_package_advisories_for_package( + package.name + ) ) for oid, package_advisory_list in package_advisory_oids.items(): vul = self._check_package(package, package_advisory_list) @@ -171,9 +171,11 @@ def run_scan( if not package_advisories: # Probably a wrong or not supported OS-release logger.error( - "Unable to start scan for %s: No advisories for OS-release %s" - " found. Check if the OS-release is correct and the" - " corresponding advisories are given.", + ( + "Unable to start scan for %s: No advisories for OS-release" + " %s found. Check if the OS-release is correct and the" + " corresponding advisories are given." + ), message.host_ip, message.os_release, ) @@ -189,8 +191,10 @@ def run_scan( package_class = package_class_by_type(package_type) if not package_class: logger.error( - "Unable to start scan for %s: No package implementation for " - "OS-release %s found. Check if the OS-release is correct.", + ( + "Unable to start scan for %s: No package implementation for" + " OS-release %s found. Check if the OS-release is correct." + ), message.host_ip, message.os_release, ) diff --git a/tests/loader/range.notus b/tests/loader/range.notus new file mode 100644 index 0000000..6c40fb8 --- /dev/null +++ b/tests/loader/range.notus @@ -0,0 +1,144 @@ +{ + "version": "1.0", + "package_type": "deb", + "product_name": "Range Test", + "advisories": [ + { + "oid": "1.3.6.1.4.1.25623.1.1.7.2.2023.10089729899100", + "fixed_packages": [ + { + "range": [ + { + "name": "gitlab-ce", + "full_version": "15.11.0", + "specifier": "<" + }, + { + "name": "gitlab-ce", + "full_version": "15.11.11", + "specifier": ">=" + } + ] + }, + { + "range": [ + { + "name": "gitlab-ce", + "full_version": "16.0.0", + "specifier": "<" + }, + { + "name": "gitlab-ce", + "full_version": "16.0.7", + "specifier": ">=" + } + ] + }, + { + "range": [ + { + "name": "gitlab-ce", + "full_version": "16.1.0", + "specifier": "<" + }, + { + "name": "gitlab-ce", + "full_version": "16.1.2", + "specifier": ">=" + } + ] + } + ] + }, + { + "oid": "1.3.6.1.4.1.25623.1.1.7.2.2023.0988598199100", + "fixed_packages": [ + { + "name": "grafana", + "full_version": "8.5.24", + "specifier": ">=" + }, + { + "range": [ + { + "name": "grafana", + "full_version": "9.0.0", + "specifier": "<" + }, + { + "name": "grafana", + "full_version": "9.2.17", + "specifier": ">=" + } + ] + }, + { + "range": [ + { + "name": "grafana", + "full_version": "9.3.0", + "specifier": "<" + }, + { + "name": "grafana", + "full_version": "9.3.13", + "specifier": ">=" + } + ] + }, + { + "range": [ + { + "name": "grafana", + "full_version": "9.4.0", + "specifier": "<" + }, + { + "name": "grafana", + "full_version": "9.4.9", + "specifier": ">=" + } + ] + }, + { + "name": "grafana8", + "full_version": "8.5.24", + "specifier": ">=" + }, + { + "name": "grafana9", + "full_version": "9.2.17", + "specifier": ">=" + }, + { + "range": [ + { + "name": "grafana9", + "full_version": "9.3.0", + "specifier": "<" + }, + { + "name": "grafana9", + "full_version": "9.3.13", + "specifier": ">=" + } + ] + }, + { + "range": [ + { + "name": "grafana9", + "full_version": "9.4.0", + "specifier": "<" + }, + { + "name": "grafana9", + "full_version": "9.4.9", + "specifier": ">=" + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/tests/loader/test_json.py b/tests/loader/test_json.py index 9e7747a..b4a7257 100644 --- a/tests/loader/test_json.py +++ b/tests/loader/test_json.py @@ -9,6 +9,7 @@ from notus.scanner.errors import AdvisoriesLoadingError from notus.scanner.loader.gpg_sha_verifier import VerificationResult from notus.scanner.loader.json import JSONAdvisoriesLoader +from notus.scanner.models.packages.deb import DEBPackage from notus.scanner.models.packages.rpm import RPMPackage _here = Path(__file__).parent @@ -62,10 +63,10 @@ def test_example(self): if not package2: self.fail("package2 is None") package_advisories1 = advisories.get_package_advisories_for_package( - package1 + package1.name ) package_advisories2 = advisories.get_package_advisories_for_package( - package2 + package2.name ) oid = "1.3.6.1.4.1.25623.1.1.2.2016.1008" @@ -89,6 +90,53 @@ def test_example(self): self.assertEqual(advisory, "1.3.6.1.4.1.25623.1.1.2.2016.1008") + def test_example_range(self): + loader = JSONAdvisoriesLoader( + advisories_directory_path=_here, + verify=lambda _: VerificationResult.SUCCESS, + ) + + advisories = loader.load_package_advisories("range") + if not advisories: + self.fail("Advisories are none") + self.assertIsNotNone(advisories) + self.assertEqual(len(advisories), 4) + + package_lol1 = DEBPackage.from_name_and_full_version( + "gitlab-ce", "15.11.1" + ) + if not package_lol1: + self.fail("package1 is None") + + package_lol2 = DEBPackage.from_name_and_full_version( + "gitlab-ce", "15.10.1" + ) + if not package_lol2: + self.fail("package2 is None") + package_advisories = advisories.get_package_advisories_for_package( + package_lol1.name + ) + + oid = "1.3.6.1.4.1.25623.1.1.7.2.2023.10089729899100" + + self.assertEqual(len(package_advisories), 1) + + self.assertIn(oid, package_advisories.keys()) + + package_advisories = package_advisories[oid] + + # get first PackageAdvisory from the sets + package_advisory_lol = next(iter(package_advisories)) + + advisory = package_advisory_lol.oid + + self.assertEqual( + advisory, "1.3.6.1.4.1.25623.1.1.7.2.2023.10089729899100" + ) + + self.assertTrue(package_advisory_lol.is_vulnerable(package_lol1)) + self.assertFalse(package_advisory_lol.is_vulnerable(package_lol2)) + def test_invalid_package_type(self): loader = JSONAdvisoriesLoader( advisories_directory_path=_here, diff --git a/tests/models/packages/test_package.py b/tests/models/packages/test_package.py index c1ee420..9c6cecf 100644 --- a/tests/models/packages/test_package.py +++ b/tests/models/packages/test_package.py @@ -291,7 +291,7 @@ def test_default_is_vulnerable(self): ) package_advisories.add_advisory_for_package(package, advisory, None) advisories = package_advisories.get_package_advisories_for_package( - other + other.name ) self.assertEqual(1, len(advisories)) for package_advisories in advisories.values(): @@ -347,10 +347,10 @@ def test_get_package_advisories_for_package(self): package_advisories.add_advisory_for_package(package2, advisory2, None) advisories1 = package_advisories.get_package_advisories_for_package( - package1 + package1.name ) advisories2 = package_advisories.get_package_advisories_for_package( - package2 + package2.name ) self.assertEqual(len(advisories1), 2) self.assertEqual(len(advisories2), 1)