Skip to content

Commit

Permalink
Change: Use a UUID for cpe_name_id at NIST CPE API and models
Browse files Browse the repository at this point in the history
cpe_name_id values are actually UUID therefore they should be parsed as
a UUID instance. This allows for better serialization (as bytes) if
necessary.
  • Loading branch information
bjoernricks committed Nov 17, 2023
1 parent 0ca44ec commit 790781d
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 45 deletions.
5 changes: 3 additions & 2 deletions pontos/nvd/cpe/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Type,
Union,
)
from uuid import UUID

from httpx import Timeout

Expand Down Expand Up @@ -88,7 +89,7 @@ def __init__(
rate_limit=rate_limit,
)

async def cpe(self, cpe_name_id: str) -> CPE:
async def cpe(self, cpe_name_id: Union[str, UUID]) -> CPE:
"""
Query for a CPE matching the CPE UUID.
Expand All @@ -114,7 +115,7 @@ async def cpe(self, cpe_name_id: str) -> CPE:
if not cpe_name_id:
raise PontosError("Missing CPE Name ID.")

response = await self._get(params={"cpeNameId": cpe_name_id})
response = await self._get(params={"cpeNameId": str(cpe_name_id)})
response.raise_for_status()
data = response.json(object_hook=convert_camel_case)
products = data["products"]
Expand Down
7 changes: 4 additions & 3 deletions pontos/nvd/models/cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from uuid import UUID

from pontos.models import Model, StrEnum

Expand Down Expand Up @@ -90,7 +91,7 @@ class DeprecatedBy(Model):
"""

cpe_name: Optional[str] = None
cpe_name_id: Optional[str] = None
cpe_name_id: Optional[UUID] = None


@dataclass
Expand All @@ -100,7 +101,7 @@ class CPE(Model):
Attributes:
cpe_name: The name of the CPE
cpe_name_id: ID of the CPE
cpe_name_id: UUID of the CPE
deprecated: True if the CPE is deprecated
last_modified: Last modification date of the CPE
created: Creation date of the CPE
Expand All @@ -111,7 +112,7 @@ class CPE(Model):
"""

cpe_name: str
cpe_name_id: str
cpe_name_id: UUID
deprecated: bool
last_modified: datetime
created: datetime
Expand Down
108 changes: 70 additions & 38 deletions tests/nvd/cpe/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
# ruff: noqa: E501

from datetime import datetime
from typing import Any, Dict, List, Optional
from itertools import repeat
from typing import Any, Optional
from unittest.mock import MagicMock, patch
from uuid import UUID, uuid4

from httpx import AsyncClient, Response

Expand All @@ -31,17 +33,37 @@
from tests.nvd import get_cpe_data


def uuid_replace_str(uuid: UUID, iteration: int, number: int) -> str:
id_str = str(uuid).rsplit("-", 2)
nn = "".join([str(j) for j in repeat(number, 4)])
ii = "".join([str(j) for j in repeat(iteration, 12)])
return f"{id_str[0]}-{nn}-{ii}"


def uuid_replace(uuid: UUID, iteration: int, number: int) -> UUID:
return UUID(uuid_replace_str(uuid, iteration, number))


def create_cpe_response(
cpe_name_id: str,
cpe_name_id: UUID,
*,
update: Optional[Dict[str, Any]] = None,
update: Optional[dict[str, Any]] = None,
results: int = 1,
iteration: int = 1,
) -> MagicMock:
products = [
{
"cpe": get_cpe_data(
{
"cpe_name_id": f"{uuid_replace_str(cpe_name_id, iteration, i)}"
}
)
}
for i in range(1, results + 1)
]

data = {
"products": [
{"cpe": get_cpe_data({"cpe_name_id": f"{cpe_name_id}-{i}"})}
for i in range(1, results + 1)
],
"products": products,
"results_per_page": results,
}
if update:
Expand All @@ -53,13 +75,14 @@ def create_cpe_response(


def create_cpes_responses(
responses: int = 2, results_per_response: int = 1
) -> List[MagicMock]:
cpe_name_id: UUID, responses: int = 2, results_per_response: int = 1
) -> list[MagicMock]:
return [
create_cpe_response(
cpe_name_id=f"CPE-{i}",
cpe_name_id=cpe_name_id,
update={"total_results": responses * results_per_response},
results=results_per_response,
iteration=i,
)
for i in range(1, responses + 1)
]
Expand Down Expand Up @@ -91,21 +114,22 @@ async def test_no_cpe(self):
await self.api.cpe("CPE-1")

async def test_cpe(self):
self.http_client.get.return_value = create_cpe_response("CPE-1")
uuid = uuid4()
self.http_client.get.return_value = create_cpe_response(uuid)

cpe = await self.api.cpe("CPE-1-1")
cpe = await self.api.cpe(uuid)

self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
params={"cpeNameId": "CPE-1-1"},
params={"cpeNameId": str(uuid)},
)

self.assertEqual(
cpe.cpe_name,
"cpe:2.3:o:microsoft:windows_10_22h2:-:*:*:*:*:*:arm64:*",
)
self.assertEqual(cpe.cpe_name_id, "CPE-1-1")
self.assertEqual(cpe.cpe_name_id, uuid_replace(uuid, 1, 1))
self.assertFalse(cpe.deprecated)
self.assertEqual(
cpe.last_modified, datetime(2022, 12, 9, 18, 15, 16, 973000)
Expand All @@ -123,7 +147,8 @@ async def test_rate_limit(
sleep_mock: MagicMock,
monotonic_mock: MagicMock,
):
self.http_client.get.side_effect = create_cpes_responses(8)
uuid = uuid4()
self.http_client.get.side_effect = create_cpes_responses(uuid, 8)
monotonic_mock.side_effect = [10, 11]

it = aiter(self.api.cpes())
Expand All @@ -141,15 +166,16 @@ async def test_rate_limit(

@patch("pontos.nvd.cpe.api.now", spec=now)
async def test_cves_last_modified_start_date(self, now_mock: MagicMock):
uuid = uuid4()
now_mock.return_value = datetime(2022, 12, 31)
self.http_client.get.side_effect = create_cpes_responses()
self.http_client.get.side_effect = create_cpes_responses(uuid)

it = aiter(
self.api.cpes(last_modified_start_date=datetime(2022, 12, 1))
)
cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-1-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -165,7 +191,7 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock):

cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-2-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -181,7 +207,8 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock):
cve = await anext(it)

async def test_cves_last_modified_end_date(self):
self.http_client.get.side_effect = create_cpes_responses()
uuid = uuid4()
self.http_client.get.side_effect = create_cpes_responses(uuid)

it = aiter(
self.api.cpes(
Expand All @@ -191,7 +218,7 @@ async def test_cves_last_modified_end_date(self):
)
cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-1-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -207,7 +234,7 @@ async def test_cves_last_modified_end_date(self):

cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-2-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -223,12 +250,13 @@ async def test_cves_last_modified_end_date(self):
cve = await anext(it)

async def test_cpes_keywords(self):
self.http_client.get.side_effect = create_cpes_responses()
uuid = uuid4()
self.http_client.get.side_effect = create_cpes_responses(uuid)

it = aiter(self.api.cpes(keywords=["Mac OS X", "kernel"]))
cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-1-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -244,7 +272,7 @@ async def test_cpes_keywords(self):

cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-2-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -260,12 +288,13 @@ async def test_cpes_keywords(self):
cve = await anext(it)

async def test_cpes_keyword(self):
self.http_client.get.side_effect = create_cpes_responses()
uuid = uuid4()
self.http_client.get.side_effect = create_cpes_responses(uuid)

it = aiter(self.api.cpes(keywords="macOS"))
cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-1-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -280,7 +309,7 @@ async def test_cpes_keyword(self):

cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-2-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -295,7 +324,8 @@ async def test_cpes_keyword(self):
cve = await anext(it)

async def test_cpes_cpe_match_string(self):
self.http_client.get.side_effect = create_cpes_responses()
uuid = uuid4()
self.http_client.get.side_effect = create_cpes_responses(uuid)

it = aiter(
self.api.cpes(
Expand All @@ -304,7 +334,7 @@ async def test_cpes_cpe_match_string(self):
)
cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-1-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -319,7 +349,7 @@ async def test_cpes_cpe_match_string(self):

cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-2-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -334,7 +364,8 @@ async def test_cpes_cpe_match_string(self):
cve = await anext(it)

async def test_cpes_match_criteria_id(self):
self.http_client.get.side_effect = create_cpes_responses()
uuid = uuid4()
self.http_client.get.side_effect = create_cpes_responses(uuid)

it = aiter(
self.api.cpes(
Expand All @@ -343,7 +374,7 @@ async def test_cpes_match_criteria_id(self):
)
cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-1-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -358,7 +389,7 @@ async def test_cpes_match_criteria_id(self):

cve = await anext(it)

self.assertEqual(cve.cpe_name_id, "CPE-2-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1))
self.http_client.get.assert_awaited_once_with(
"https://services.nvd.nist.gov/rest/json/cpes/2.0",
headers={},
Expand All @@ -373,8 +404,9 @@ async def test_cpes_match_criteria_id(self):
cve = await anext(it)

async def test_cpes_request_results(self):
uuid = uuid4()
self.http_client.get.side_effect = create_cpes_responses(
results_per_response=2
uuid, results_per_response=2
)

it = aiter(self.api.cpes(request_results=10))
Expand All @@ -388,11 +420,11 @@ async def test_cpes_request_results(self):
"resultsPerPage": 10,
},
)
self.assertEqual(cve.cpe_name_id, "CPE-1-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1))

self.http_client.get.reset_mock()
cve = await anext(it)
self.assertEqual(cve.cpe_name_id, "CPE-1-2")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 2))
self.http_client.get.assert_not_called()

self.http_client.get.reset_mock()
Expand All @@ -405,11 +437,11 @@ async def test_cpes_request_results(self):
"resultsPerPage": 2,
},
)
self.assertEqual(cve.cpe_name_id, "CPE-2-1")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1))

self.http_client.get.reset_mock()
cve = await anext(it)
self.assertEqual(cve.cpe_name_id, "CPE-2-2")
self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 2))
self.http_client.get.assert_not_called()

self.http_client.get.reset_mock()
Expand Down
6 changes: 4 additions & 2 deletions tests/nvd/models/test_cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import unittest
from datetime import datetime
from uuid import UUID

from pontos.nvd.models.cpe import CPE, ReferenceType
from tests.nvd import get_cpe_data
Expand All @@ -32,7 +33,7 @@ def test_required_only(self):
"cpe:2.3:o:microsoft:windows_10_22h2:-:*:*:*:*:*:arm64:*",
)
self.assertEqual(
cpe.cpe_name_id, "9BAECDB2-614D-4E9C-9936-190C30246F03"
cpe.cpe_name_id, UUID("9BAECDB2-614D-4E9C-9936-190C30246F03")
)
self.assertFalse(cpe.deprecated)
self.assertEqual(
Expand Down Expand Up @@ -111,5 +112,6 @@ def test_deprecated(self):
"cpe:2.3:o:microsoft:windows_10_22h2:-:*:*:*:*:*:x64:*",
)
self.assertEqual(
deprecated_by.cpe_name_id, "A09335E2-B42F-4820-B487-57A4BF0CEE98"
deprecated_by.cpe_name_id,
UUID("A09335E2-B42F-4820-B487-57A4BF0CEE98"),
)

0 comments on commit 790781d

Please sign in to comment.