Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbruinsslot committed Sep 19, 2024
1 parent 5c9be2b commit 34e3922
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 24 deletions.
2 changes: 0 additions & 2 deletions mula/scheduler/models/boefje.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ class Boefje(BaseModel):
id: str
name: str | None = Field(default=None)
version: str | None = Field(default=None)
cron: str | None = Field(default=None) # FIXME: placeholder
interval: int | None = Field(default=None) # FIXME: placeholder


class BoefjeMeta(BaseModel):
Expand Down
4 changes: 2 additions & 2 deletions mula/scheduler/models/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ class Plugin(BaseModel):
consumes: str | list[str]
options: list[str] | None = None
produces: list[str]
cron: str | None = None # FIXME: placeholder
interval: int | None = None # FIXME: placeholder
cron: str | None = None
interval: int | None = None
33 changes: 17 additions & 16 deletions mula/scheduler/schedulers/boefje.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,10 +1000,14 @@ def get_boefjes_for_ooi(self, ooi) -> list[Plugin]:
return boefjes

def post_push(self, item: Task) -> Task:
"""Override for the post_push when a boefje specifies we schedule for its execution"""
"""Override Schedule.post_push() when a boefje specifies a schedule for
execution (cron expression) we schedule for its execution"""
# Does a boefje have a schedule defined?
schedule = utils.deep_get(item.data, ["boefje", "cron"]) # FIXME: based on implementation
if schedule is None:
plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id(
utils.deep_get(item.data, ["boefje", "id"]),
self.organisation.id,
)
if plugin.cron is None:
return super().post_push(item)

schedule_db = self.ctx.datastores.schedule_store.get_schedule_by_hash(item.hash)
Expand All @@ -1014,7 +1018,7 @@ def post_push(self, item: Task) -> Task:
hash=item.hash,
deadline_at=self.calculate_deadline(item),
data=item.data,
schedule=schedule,
schedule=plugin.cron,
)

schedule_db = self.ctx.datastores.schedule_store.create_schedule(schedule)
Expand All @@ -1024,23 +1028,20 @@ def post_push(self, item: Task) -> Task:
else:
# We update the schedule if it already exists, this makes sure
# that when a boefje schedule is updated, we also update the schedule.
schedule_db.schedule = schedule
schedule_db.schedule = plugin.schedule
self.ctx.datastores.schedule_store.update_schedule(schedule_db)

return super().post_push(item)

def calculate_deadline(self, task: Task) -> datetime:
"""Calculate the deadline for a task.
Args:
task: The task to calculate the deadline for.
Returns:
The calculated deadline.
"""
"""Override Scheduler.calculate_deadline() to calculate the deadline
for a task and based on the boefje interval."""
# Does the boefje have an interval defined?
interval = utils.deep_get(task.data, ["boefje", "interval"]) # FIXME: based on implementation
if interval is not None:
return datetime.now(timezone.utc) + timedelta(minutes=interval)
plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id(
utils.deep_get(task.data, ["boefje", "id"]),
self.organisation.id,
)
if plugin.interval is not None:
return datetime.now(timezone.utc) + timedelta(minutes=plugin.interval)

return super().calculate_deadline(task)
2 changes: 0 additions & 2 deletions mula/tests/factories/boefje.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ class Meta:
scan_level: int = fuzzy.FuzzyInteger(0, 4)
consumes: list[str] = LazyFunction(lambda: [])
produces: list[str] = LazyFunction(lambda: [])
cron: str | None = None
interval: int | None = None


class BoefjeMetaFactory(Factory):
Expand Down
4 changes: 4 additions & 0 deletions mula/tests/factories/plugin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from factory import Factory, LazyFunction, Sequence, fuzzy

from scheduler.models import Plugin


Expand All @@ -15,3 +16,6 @@ class Meta:
produces: list[str] = LazyFunction(lambda: [])

enabled: bool = True

cron: str | None = None
interval: int | None = None
22 changes: 20 additions & 2 deletions mula/tests/integration/test_boefje_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ def test_post_push_boefje_cron(self):
scan_profile = ScanProfileFactory(level=0)
ooi = OOIFactory(scan_profile=scan_profile)
boefje_task = models.BoefjeTask(
boefje=BoefjeFactory(cron=cron),
boefje=BoefjeFactory(),
input_ooi=ooi.primary_key,
organization=self.organisation.id,
)
Expand All @@ -869,6 +869,12 @@ def test_post_push_boefje_cron(self):
task=task,
)

self.mock_get_plugin.return_value = PluginFactory(
scan_level=0,
consumes=[ooi.object_type],
cron=cron,
)

# Act
self.scheduler.push_item_to_queue(item)

Expand Down Expand Up @@ -959,6 +965,12 @@ def test_post_push_boefje_cron_multiple_tasks(self):
task=task2,
)

self.mock_get_plugin.return_value = PluginFactory(
scan_level=0,
consumes=[ooi1.object_type],
cron=cron,
)

# Act
self.scheduler.push_item_to_queue(item1)
self.scheduler.push_item_to_queue(item2)
Expand Down Expand Up @@ -1015,7 +1027,7 @@ def test_post_push_boefje_interval(self):
scan_profile = ScanProfileFactory(level=0)
ooi = OOIFactory(scan_profile=scan_profile)
boefje_task = models.BoefjeTask(
boefje=BoefjeFactory(interval=1500),
boefje=BoefjeFactory(),
input_ooi=ooi.primary_key,
organization=self.organisation.id,
)
Expand All @@ -1036,6 +1048,12 @@ def test_post_push_boefje_interval(self):
task=task,
)

self.mock_get_plugin.return_value = PluginFactory(
scan_level=0,
consumes=[ooi.object_type],
interval=1500,
)

# Act
self.scheduler.push_item_to_queue(item)

Expand Down

0 comments on commit 34e3922

Please sign in to comment.