Skip to content

Commit

Permalink
Fix scan profile db event issue by adding an explicit reference field…
Browse files Browse the repository at this point in the history
… (1.8) (#1094)

Co-authored-by: Donny Peeters <46660228+Donnype@users.noreply.github.com>
Co-authored-by: ammar92 <ammar.abdulamir@gmail.com>
Co-authored-by: Patrick <Darwinkel@users.noreply.github.com>
  • Loading branch information
4 people committed Jun 1, 2023
1 parent 677b2c0 commit 42bf33b
Show file tree
Hide file tree
Showing 7 changed files with 373 additions and 132 deletions.
2 changes: 1 addition & 1 deletion octopoes/octopoes/core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def _on_delete_origin_parameter(self, event: OriginParameterDBEvent) -> None:
return

def _run_inferences(self, event: ScanProfileDBEvent) -> None:
inference_origins = self.origin_repository.list_by_source(event.new_data.reference, valid_time=event.valid_time)
inference_origins = self.origin_repository.list_by_source(event.reference, valid_time=event.valid_time)
inference_origins = [o for o in inference_origins if o.origin_type == OriginType.INFERENCE]
for inference_origin in inference_origins:
self._run_inference(inference_origin, event.valid_time)
Expand Down
3 changes: 2 additions & 1 deletion octopoes/octopoes/events/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ def primary_key(self) -> str:

class ScanProfileDBEvent(DBEvent):
entity_type: Literal["scan_profile"] = "scan_profile"
reference: Reference
old_data: Optional[ScanProfile]
new_data: Optional[ScanProfile]

@property
def primary_key(self) -> Reference:
return self.new_data.reference if self.new_data else self.old_data.reference
return self.reference


EVENT_TYPE = Union[OOIDBEvent, OriginDBEvent, OriginParameterDBEvent, ScanProfileDBEvent]
94 changes: 46 additions & 48 deletions octopoes/octopoes/events/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,60 +54,58 @@ def publish(self, event: DBEvent) -> None:
event.client,
)

if isinstance(event, ScanProfileDBEvent):
incremented = (event.operation_type == OperationType.CREATE and event.new_data.level > 0) or (
event.operation_type == OperationType.UPDATE and event.new_data.level > event.old_data.level
)
if incremented:
ooi = json.dumps(
{
"primary_key": event.new_data.reference,
"object_type": event.new_data.reference.class_,
"scan_profile": event.new_data.dict(),
}
)

self.channel.basic_publish(
"",
f"{event.client}__scan_profile_increments",
ooi.encode(),
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent,
),
)

logger.info(
"Published scan_profile_increment [primary_key=%s] [level=%s]",
format_id_short(event.primary_key),
event.new_data.level,
)

# publish mutations
mutation = ScanProfileMutation(
operation=event.operation_type,
primary_key=event.primary_key,
)
if not isinstance(event, ScanProfileDBEvent):
return

if event.operation_type != OperationType.DELETE:
mutation.value = AbstractOOI(
primary_key=event.new_data.reference,
object_type=event.new_data.reference.class_,
scan_profile=event.new_data,
)
incremented = (event.operation_type == OperationType.CREATE and event.new_data.level > 0) or (
event.operation_type == OperationType.UPDATE
and event.old_data
and event.new_data.level > event.old_data.level
)

if incremented:
ooi = json.dumps(
{
"primary_key": event.reference,
"object_type": event.reference.class_,
"scan_profile": event.new_data.dict(),
}
)

self.channel.basic_publish(
"",
f"{event.client}__scan_profile_mutations",
mutation.json().encode(),
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent,
),
f"{event.client}__scan_profile_increments",
ooi.encode(),
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)

level = mutation.value.scan_profile.level if mutation.value != OperationType.DELETE else None
logger.info(
"Published scan profile mutation [operation_type=%s] [primary_key=%s] [level=%s]",
mutation.operation,
"Published scan_profile_increment [primary_key=%s] [level=%s]",
format_id_short(event.primary_key),
level,
event.new_data.level,
)

# publish mutations
mutation = ScanProfileMutation(operation=event.operation_type, primary_key=event.primary_key)

if event.operation_type != OperationType.DELETE:
mutation.value = AbstractOOI(
primary_key=event.new_data.reference,
object_type=event.new_data.reference.class_,
scan_profile=event.new_data,
)

self.channel.basic_publish(
"",
f"{event.client}__scan_profile_mutations",
mutation.json().encode(),
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)

level = mutation.value.scan_profile.level if mutation.value is not None else None
logger.info(
"Published scan profile mutation [operation_type=%s] [primary_key=%s] [level=%s]",
mutation.operation,
format_id_short(event.primary_key),
level,
)
2 changes: 2 additions & 0 deletions octopoes/octopoes/repositories/scan_profile_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def save(
event = ScanProfileDBEvent(
operation_type=OperationType.CREATE if old_scan_profile is None else OperationType.UPDATE,
valid_time=valid_time,
reference=new_scan_profile.reference,
old_data=old_scan_profile,
new_data=new_scan_profile,
)
Expand All @@ -118,6 +119,7 @@ def delete(self, scan_profile: ScanProfileBase, valid_time: datetime) -> None:

event = ScanProfileDBEvent(
operation_type=OperationType.DELETE,
reference=scan_profile.reference,
valid_time=valid_time,
old_data=scan_profile,
)
Expand Down
42 changes: 41 additions & 1 deletion octopoes/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
from unittest.mock import Mock

import pytest
from bits.runner import BitRunner

from octopoes.models import OOI, EmptyScanProfile, Reference, ScanProfileBase
from octopoes.api.api import app
from octopoes.api.router import settings
from octopoes.config.settings import Settings, XTDBType
from octopoes.core.service import OctopoesService
from octopoes.models import OOI, DeclaredScanProfile, EmptyScanProfile, Reference, ScanProfileBase
from octopoes.models.path import Direction, Path
from octopoes.models.types import DNSZone, Hostname, IPAddressV4, Network, ResolvedHostname
from octopoes.repositories.ooi_repository import OOIRepository
Expand Down Expand Up @@ -138,3 +143,38 @@ def resolved_hostname(hostname, ipaddressv4, ooi_repository, scan_profile_reposi
scan_profile_repository,
valid_time,
)


@pytest.fixture
def empty_scan_profile():
return EmptyScanProfile(reference="test_reference")


@pytest.fixture
def declared_scan_profile():
return DeclaredScanProfile(reference="test_reference", level=2)


@pytest.fixture
def xtdbtype_multinode():
def get_settings_override():
return Settings(xtdb_type=XTDBType.XTDB_MULTINODE)

app.dependency_overrides[settings] = get_settings_override
yield
app.dependency_overrides = {}


@pytest.fixture
def app_settings():
return Settings(xtdb_type=XTDBType.XTDB_MULTINODE)


@pytest.fixture
def octopoes_service() -> OctopoesService:
return OctopoesService(Mock(), Mock(), Mock(), Mock())


@pytest.fixture
def bit_runner(mocker) -> BitRunner:
return mocker.patch("octopoes.core.service.BitRunner")
173 changes: 173 additions & 0 deletions octopoes/tests/test_event_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import uuid
from datetime import datetime

import pika

from octopoes.events.events import OOIDBEvent, OperationType, ScanProfileDBEvent
from octopoes.events.manager import EventManager


def test_event_manager_create_ooi(mocker, network):
celery_mock = mocker.Mock()
channel_mock = mocker.Mock()

mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
manager = EventManager("test", celery_mock, "queue", channel_mock)
event = OOIDBEvent(operation_type=OperationType.CREATE, valid_time=datetime(2023, 1, 1), new_data=network)
manager.publish(event)

celery_mock.send_task.assert_called_once_with(
"octopoes.tasks.tasks.handle_event",
(
{
"entity_type": "ooi",
"operation_type": "create",
"valid_time": "2023-01-01T00:00:00",
"client": "test",
"old_data": None,
"new_data": {
"object_type": "Network",
"scan_profile": None,
"primary_key": "Network|internet",
"name": "internet",
},
},
),
queue="queue",
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
)

channel_mock.basic_publish.assert_not_called()


def test_event_manager_create_empty_scan_profile(mocker, empty_scan_profile):
celery_mock = mocker.Mock()
channel_mock = mocker.Mock()

mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
manager = EventManager("test", celery_mock, "queue", channel_mock)
event = ScanProfileDBEvent(
operation_type=OperationType.CREATE,
valid_time=datetime(2023, 1, 1),
new_data=empty_scan_profile,
reference="test_reference",
)
manager.publish(event)

celery_mock.send_task.assert_called_once_with(
"octopoes.tasks.tasks.handle_event",
(
{
"entity_type": "scan_profile",
"operation_type": "create",
"valid_time": "2023-01-01T00:00:00",
"client": "test",
"old_data": None,
"new_data": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0},
"reference": "test_reference",
},
),
queue="queue",
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
)

channel_mock.basic_publish.assert_called_once_with(
"",
"test__scan_profile_mutations",
b'{"operation": "create", "primary_key": "test_reference", '
b'"value": {"primary_key": "test_reference", '
b'"object_type": "test_reference", '
b'"scan_profile": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0}}}',
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)


def test_event_manager_create_declared_scan_profile(mocker, declared_scan_profile):
celery_mock = mocker.Mock()
channel_mock = mocker.Mock()

mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
manager = EventManager("test", celery_mock, "queue", channel_mock)
event = ScanProfileDBEvent(
operation_type=OperationType.CREATE,
valid_time=datetime(2023, 1, 1),
new_data=declared_scan_profile,
reference="test_reference",
)
manager.publish(event)

celery_mock.send_task.assert_called_once_with(
"octopoes.tasks.tasks.handle_event",
(
{
"entity_type": "scan_profile",
"operation_type": "create",
"valid_time": "2023-01-01T00:00:00",
"client": "test",
"old_data": None,
"new_data": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2},
"reference": "test_reference",
},
),
queue="queue",
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
)

assert channel_mock.basic_publish.call_count == 2
channel_mock.basic_publish.asset_has_calls(
mocker.call(
"",
"test__scan_profile_increments",
b'{"primary_key": "test_reference", "object_type": "test_reference",'
b'"scan_profile": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2}}',
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
),
mocker.call(
"",
"test__scan_profile_mutations",
b'{"operation": "create", "primary_key": "test_reference", '
b'"value": {"primary_key": "test_reference", '
b'"object_type": "test_reference", '
b'"scan_profile": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2}}}',
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
),
)


def test_event_manager_delete_empty_scan_profile(mocker, empty_scan_profile):
celery_mock = mocker.Mock()
channel_mock = mocker.Mock()

mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
manager = EventManager("test", celery_mock, "queue", channel_mock)
event = ScanProfileDBEvent(
operation_type=OperationType.DELETE,
valid_time=datetime(2023, 1, 1),
old_data=empty_scan_profile,
reference="test_reference",
)
manager.publish(event)

celery_mock.send_task.assert_called_once_with(
"octopoes.tasks.tasks.handle_event",
(
{
"entity_type": "scan_profile",
"operation_type": "delete",
"valid_time": "2023-01-01T00:00:00",
"client": "test",
"old_data": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0},
"new_data": None,
"reference": "test_reference",
},
),
queue="queue",
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
)

channel_mock.basic_publish.assert_called_once_with(
"",
"test__scan_profile_mutations",
b'{"operation": "delete", "primary_key": "test_reference", "value": null}',
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)
Loading

0 comments on commit 42bf33b

Please sign in to comment.