From 34e39223a2b2c0a3042955f9bbd841b2df6c4989 Mon Sep 17 00:00:00 2001 From: JP Bruins Slot Date: Thu, 19 Sep 2024 15:00:21 +0200 Subject: [PATCH] Update --- mula/scheduler/models/boefje.py | 2 -- mula/scheduler/models/plugin.py | 4 +-- mula/scheduler/schedulers/boefje.py | 33 ++++++++++--------- mula/tests/factories/boefje.py | 2 -- mula/tests/factories/plugin.py | 4 +++ .../integration/test_boefje_scheduler.py | 22 +++++++++++-- 6 files changed, 43 insertions(+), 24 deletions(-) diff --git a/mula/scheduler/models/boefje.py b/mula/scheduler/models/boefje.py index d0fdfe5165..0d3b693229 100644 --- a/mula/scheduler/models/boefje.py +++ b/mula/scheduler/models/boefje.py @@ -11,8 +11,6 @@ class Boefje(BaseModel): id: str name: str | None = Field(default=None) version: str | None = Field(default=None) - cron: str | None = Field(default=None) # FIXME: placeholder - interval: int | None = Field(default=None) # FIXME: placeholder class BoefjeMeta(BaseModel): diff --git a/mula/scheduler/models/plugin.py b/mula/scheduler/models/plugin.py index 8d95a94eaf..83d79c5791 100644 --- a/mula/scheduler/models/plugin.py +++ b/mula/scheduler/models/plugin.py @@ -17,5 +17,5 @@ class Plugin(BaseModel): consumes: str | list[str] options: list[str] | None = None produces: list[str] - cron: str | None = None # FIXME: placeholder - interval: int | None = None # FIXME: placeholder + cron: str | None = None + interval: int | None = None diff --git a/mula/scheduler/schedulers/boefje.py b/mula/scheduler/schedulers/boefje.py index 82a16094b8..a2a38b6e14 100644 --- a/mula/scheduler/schedulers/boefje.py +++ b/mula/scheduler/schedulers/boefje.py @@ -1000,10 +1000,14 @@ def get_boefjes_for_ooi(self, ooi) -> list[Plugin]: return boefjes def post_push(self, item: Task) -> Task: - """Override for the post_push when a boefje specifies we schedule for its execution""" + """Override Schedule.post_push() when a boefje specifies a schedule for + execution (cron expression) we schedule for its execution""" # Does a boefje have a schedule defined? - schedule = utils.deep_get(item.data, ["boefje", "cron"]) # FIXME: based on implementation - if schedule is None: + plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id( + utils.deep_get(item.data, ["boefje", "id"]), + self.organisation.id, + ) + if plugin.cron is None: return super().post_push(item) schedule_db = self.ctx.datastores.schedule_store.get_schedule_by_hash(item.hash) @@ -1014,7 +1018,7 @@ def post_push(self, item: Task) -> Task: hash=item.hash, deadline_at=self.calculate_deadline(item), data=item.data, - schedule=schedule, + schedule=plugin.cron, ) schedule_db = self.ctx.datastores.schedule_store.create_schedule(schedule) @@ -1024,23 +1028,20 @@ def post_push(self, item: Task) -> Task: else: # We update the schedule if it already exists, this makes sure # that when a boefje schedule is updated, we also update the schedule. - schedule_db.schedule = schedule + schedule_db.schedule = plugin.schedule self.ctx.datastores.schedule_store.update_schedule(schedule_db) return super().post_push(item) def calculate_deadline(self, task: Task) -> datetime: - """Calculate the deadline for a task. - - Args: - task: The task to calculate the deadline for. - - Returns: - The calculated deadline. - """ + """Override Scheduler.calculate_deadline() to calculate the deadline + for a task and based on the boefje interval.""" # Does the boefje have an interval defined? - interval = utils.deep_get(task.data, ["boefje", "interval"]) # FIXME: based on implementation - if interval is not None: - return datetime.now(timezone.utc) + timedelta(minutes=interval) + plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id( + utils.deep_get(task.data, ["boefje", "id"]), + self.organisation.id, + ) + if plugin.interval is not None: + return datetime.now(timezone.utc) + timedelta(minutes=plugin.interval) return super().calculate_deadline(task) diff --git a/mula/tests/factories/boefje.py b/mula/tests/factories/boefje.py index 8d19f4e342..1fd6af6200 100644 --- a/mula/tests/factories/boefje.py +++ b/mula/tests/factories/boefje.py @@ -16,8 +16,6 @@ class Meta: scan_level: int = fuzzy.FuzzyInteger(0, 4) consumes: list[str] = LazyFunction(lambda: []) produces: list[str] = LazyFunction(lambda: []) - cron: str | None = None - interval: int | None = None class BoefjeMetaFactory(Factory): diff --git a/mula/tests/factories/plugin.py b/mula/tests/factories/plugin.py index 23f07e1778..c05bc97db4 100644 --- a/mula/tests/factories/plugin.py +++ b/mula/tests/factories/plugin.py @@ -1,4 +1,5 @@ from factory import Factory, LazyFunction, Sequence, fuzzy + from scheduler.models import Plugin @@ -15,3 +16,6 @@ class Meta: produces: list[str] = LazyFunction(lambda: []) enabled: bool = True + + cron: str | None = None + interval: int | None = None diff --git a/mula/tests/integration/test_boefje_scheduler.py b/mula/tests/integration/test_boefje_scheduler.py index c7acad4dc4..719623017c 100644 --- a/mula/tests/integration/test_boefje_scheduler.py +++ b/mula/tests/integration/test_boefje_scheduler.py @@ -848,7 +848,7 @@ def test_post_push_boefje_cron(self): scan_profile = ScanProfileFactory(level=0) ooi = OOIFactory(scan_profile=scan_profile) boefje_task = models.BoefjeTask( - boefje=BoefjeFactory(cron=cron), + boefje=BoefjeFactory(), input_ooi=ooi.primary_key, organization=self.organisation.id, ) @@ -869,6 +869,12 @@ def test_post_push_boefje_cron(self): task=task, ) + self.mock_get_plugin.return_value = PluginFactory( + scan_level=0, + consumes=[ooi.object_type], + cron=cron, + ) + # Act self.scheduler.push_item_to_queue(item) @@ -959,6 +965,12 @@ def test_post_push_boefje_cron_multiple_tasks(self): task=task2, ) + self.mock_get_plugin.return_value = PluginFactory( + scan_level=0, + consumes=[ooi1.object_type], + cron=cron, + ) + # Act self.scheduler.push_item_to_queue(item1) self.scheduler.push_item_to_queue(item2) @@ -1015,7 +1027,7 @@ def test_post_push_boefje_interval(self): scan_profile = ScanProfileFactory(level=0) ooi = OOIFactory(scan_profile=scan_profile) boefje_task = models.BoefjeTask( - boefje=BoefjeFactory(interval=1500), + boefje=BoefjeFactory(), input_ooi=ooi.primary_key, organization=self.organisation.id, ) @@ -1036,6 +1048,12 @@ def test_post_push_boefje_interval(self): task=task, ) + self.mock_get_plugin.return_value = PluginFactory( + scan_level=0, + consumes=[ooi.object_type], + interval=1500, + ) + # Act self.scheduler.push_item_to_queue(item)