Skip to content

Commit

Permalink
Update schedule Schemas
Browse files Browse the repository at this point in the history
- Add a validation step to ensure the schedule field has a valid cron
  syntax.
- Update the tests
  • Loading branch information
sasirven committed Oct 28, 2024
1 parent d7e2d09 commit 6898392
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
8 changes: 7 additions & 1 deletion app/objects/c_schedule.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import uuid

import marshmallow as ma
from croniter import croniter

from app.objects.interfaces.i_object import FirstClassObjectInterface
from app.objects.c_operation import OperationSchema
Expand All @@ -13,9 +14,14 @@ class Meta:
unknown = ma.EXCLUDE

id = ma.fields.String()
schedule = ma.fields.String(required=True)
schedule = ma.fields.String(required=True, metadata={"example": "5 4 * * *"})
task = ma.fields.Nested(OperationSchema())

@ma.validates('schedule')
def validate_schedule(self, value):
if not croniter.is_valid(value):
raise ma.ValidationError("Invalid cron syntax for schedule field.")

@ma.post_load
def build_schedule(self, data, **kwargs):
return None if kwargs.get('partial') is True else Schedule(**data)
Expand Down
8 changes: 4 additions & 4 deletions tests/api/v2/handlers/test_schedules_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
@pytest.fixture
def updated_schedule_payload():
payload = {
'schedule': '01:00:00.000000',
'schedule': '0 1 * * *',
'task': {
'autonomous': 1,
'obfuscator': 'base64',
Expand Down Expand Up @@ -40,7 +40,7 @@ def _merge_dictionaries(dict1, dict2):
@pytest.fixture
def replaced_schedule_payload():
payload = {
'schedule': '12:12:00.000000',
'schedule': '0 12 12 * *',
'task': {
'autonomous': 1,
'obfuscator': 'base64',
Expand All @@ -57,7 +57,7 @@ def replaced_schedule_payload():

@pytest.fixture
def new_schedule_payload(test_planner, test_adversary, test_source):
payload = dict(schedule='00:00:00.000000',
payload = dict(schedule='0 0 * * *',
id='456',
task={
'name': 'new_scheduled_operation',
Expand All @@ -80,7 +80,7 @@ def expected_new_schedule_dump(new_schedule_payload):
def test_schedule(test_operation, event_loop):
operation = OperationSchema().load(test_operation)
schedule = ScheduleSchema().load(dict(id='123',
schedule='03:00:00.000000',
schedule='0 3 * * *',
task=operation.schema.dump(operation)))
event_loop.run_until_complete(BaseService.get_service('data_svc').store(schedule))
return schedule
Expand Down

0 comments on commit 6898392

Please sign in to comment.