diff --git a/README.md b/README.md index 27e253a..beb4dd6 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ By harnessing the capabilities of Executor Engine, users can effortlessly constr + 📚 Support multiple job types * `LocalJob`, `ThreadJob`, `ProcessJob`, `DaskJob` - * Extend job types: `SubprocessJob`, `WebappJob` + * Extend job types: `SubprocessJob`, `WebappJob`, `CronJob` + 🔧 Job management * Job dependency management. * Job status: Pending, Running, Done, Failed, Cancelled. diff --git a/docs/api-reference/condition.md b/docs/api-reference/condition.md index 6f06f8e..3d10c82 100644 --- a/docs/api-reference/condition.md +++ b/docs/api-reference/condition.md @@ -15,22 +15,57 @@ options: show_root_heading: true -::: executor.engine.job.condition.AfterTimepoint +::: executor.engine.job.condition.Combination handler: python options: show_root_heading: true -::: executor.engine.job.condition.Combination +::: executor.engine.job.condition.AllSatisfied handler: python options: show_root_heading: true -::: executor.engine.job.condition.AllSatisfied +::: executor.engine.job.condition.AnySatisfied handler: python options: show_root_heading: true -::: executor.engine.job.condition.AnySatisfied +::: executor.engine.job.condition.TimeCondition + handler: python + options: + show_root_heading: true + +::: executor.engine.job.condition.EveryPeriod + handler: python + options: + show_root_heading: true + +::: executor.engine.job.condition.AfterClock + handler: python + options: + show_root_heading: true + +::: executor.engine.job.condition.BeforeClock + handler: python + options: + show_root_heading: true + +::: executor.engine.job.condition.AfterWeekday + handler: python + options: + show_root_heading: true + +::: executor.engine.job.condition.BeforeWeekday + handler: python + options: + show_root_heading: true + +::: executor.engine.job.condition.AfterTimepoint + handler: python + options: + show_root_heading: true + +::: executor.engine.job.condition.BeforeTimepoint handler: python options: show_root_heading: true diff --git a/docs/api-reference/extend_job.md b/docs/api-reference/extend_job.md index 258c604..69e480a 100644 --- a/docs/api-reference/extend_job.md +++ b/docs/api-reference/extend_job.md @@ -8,4 +8,15 @@ ::: executor.engine.job.extend.WebappJob handler: python options: - show_root_heading: true \ No newline at end of file + show_root_heading: true + +::: executor.engine.job.extend.SentinelJob + handler: python + options: + show_root_heading: true + +::: executor.engine.job.extend.CronJob + handler: python + options: + show_root_heading: true + diff --git a/docs/getting-started.md b/docs/getting-started.md index 8a1c916..23dd649 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -45,8 +45,13 @@ engine = Engine() def add(a, b): return a + b +async def add_async(a, b): + # async function can also be used as the job function + await asyncio.sleep(1.0) + return a + b + async def main(): - job1 = ProcessJob(add, args=(1, 2)) + job1 = ProcessJob(add_async, args=(1, 2)) job2 = ProcessJob(add, args=(job1.future, 4)) await engine.submit_async(job1, job2) await engine.join() @@ -96,73 +101,6 @@ They are executed by different backends and suitable for different scenarios. | `ProcessJob` | `executor.engine.backend.process` | CPU-bound tasks | | `DaskJob` | `executor.engine.backend.dask` | Distributed tasks | -### Extend job types - -There are two extend job types: -[`SubprocessJob`](api-reference/extend_job.md#executor.engine.job.extend.SubprocessJob) -and -[`WebappJob`](api-reference/extend_job.md#executor.engine.job.extend.WebappJob) -. They are used to execute shell commands and launch web applications. -The extend job types can based on the job types above(`LocalJob`, `ThreadJob`, `ProcessJob`, `DaskJob`). - -#### SubprocessJob - -[`SubprocessJob`](api-reference/extend_job.md#executor.engine.job.extend.SubprocessJob) -is a job type for executing shell commands. -`SubprocessJob` accept a shell command as its argument. It will execute the command in a subprocess: - -```python -from executor.engine import Engine -from executor.engine.job.extend import SubprocessJob - -job = SubprocessJob( - "python -c 'print(1 + 2)'", -) - -with Engine() as engine: - engine.submit(job) - engine.wait_job(job) -``` - - -#### WebappJob - -[`WebappJob`](api-reference/extend_job.md#executor.engine.job.extend.WebappJob) -is a job type for launching a web application. -It can accept a function with `ip` and `port` as arguments: - -```python -from executor.engine import Engine -from executor.engine.job.extend import WebappJob -from http.server import HTTPServer, SimpleHTTPRequestHandler - -def run_simple_httpd(ip: str, port: int): - server_addr = (ip, port) - httpd = HTTPServer(server_addr, SimpleHTTPRequestHandler) - httpd.serve_forever() - -with Engine() as engine: - job = WebappJob(run_simple_httpd, ip="127.0.0.1", port=8000) - engine.submit(job) - print("Open your browser and visit http://127.0.0.1:8000") - engine.wait() -``` - -`WebappJob` can also accept a command template as its argument: - -```python -from executor.engine import Engine -from executor.engine.job.extend import WebappJob - -with Engine() as engine: - job = WebappJob( - "python -m http.server -b {ip} {port}", - ip="127.0.0.1", port=8000) - engine.submit(job) - print("Open your browser and visit http://127.0.0.1:8000") - engine.wait() -``` - ### Conditional job execution After another job: @@ -284,6 +222,42 @@ with Engine() as engine: engine.wait() ``` +##### Syntactic sugar for condition combination + +You can use `&` and `|` to combine conditions: + +```python +from executor.engine import Engine, ThreadJob +from executor.engine.job.condition import AfterAnother +import time + +s = set() + +def sleep_and_add_1(t: int): + time.sleep(t) + s.add(t) + +def has_two_elements(): + print(s) + assert len(s) == 2 + +with Engine() as engine: + job1 = ThreadJob(sleep_and_add_1, args=(1,)) + job2 = ThreadJob(sleep_and_add_1, args=(2,)) + job3 = ThreadJob(sleep_and_add_1, args=(3,)) + job4 = ThreadJob( + has_two_elements, + condition=( + ( + AfterAnother(job_id=job1.id) & + AfterAnother(job_id=job2.id) + ) | + AfterAnother(job_id=job3.id) + ) + ) + engine.submit(job4, job3, job2, job1) + engine.wait() +``` #### Custom condition @@ -319,6 +293,123 @@ with Engine() as engine: engine.wait() ``` +### Extend job types + +There are 4 extend job types: +[`SubprocessJob`](api-reference/extend_job.md#executor.engine.job.extend.SubprocessJob), +[`WebappJob`](api-reference/extend_job.md#executor.engine.job.extend.WebappJob) +[`SentinelJob`](api-reference/extend_job.md#executor.engine.job.extend.SentinelJob) +[`CronJob`](api-reference/extend_job.md#executor.engine.job.extend.CronJob) + +They are used to execute shell commands, launch web applications +and schedule jobs based on time. The extend job types can based on the job types above(`LocalJob`, `ThreadJob`, `ProcessJob`, `DaskJob`). + +#### SubprocessJob + +[`SubprocessJob`](api-reference/extend_job.md#executor.engine.job.extend.SubprocessJob) +is a job type for executing shell commands. +`SubprocessJob` accept a shell command as its argument. It will execute the command in a subprocess: + +```python +from executor.engine import Engine +from executor.engine.job.extend import SubprocessJob + +job = SubprocessJob( + "python -c 'print(1 + 2)'", +) + +with Engine() as engine: + engine.submit(job) + engine.wait_job(job) +``` + + +#### WebappJob + +[`WebappJob`](api-reference/extend_job.md#executor.engine.job.extend.WebappJob) +is a job type for launching a web application. +It can accept a function with `ip` and `port` as arguments: + +```python +from executor.engine import Engine +from executor.engine.job.extend import WebappJob +from http.server import HTTPServer, SimpleHTTPRequestHandler + +def run_simple_httpd(ip: str, port: int): + server_addr = (ip, port) + httpd = HTTPServer(server_addr, SimpleHTTPRequestHandler) + httpd.serve_forever() + +with Engine() as engine: + job = WebappJob(run_simple_httpd, ip="127.0.0.1", port=8000) + engine.submit(job) + print("Open your browser and visit http://127.0.0.1:8000") + engine.wait() +``` + +`WebappJob` can also accept a command template as its argument: + +```python +from executor.engine import Engine +from executor.engine.job.extend import WebappJob + +with Engine() as engine: + job = WebappJob( + "python -m http.server -b {ip} {port}", + ip="127.0.0.1", port=8000) + engine.submit(job) + print("Open your browser and visit http://127.0.0.1:8000") + engine.wait() +``` + +#### CronJob + +[`CronJob`](api-reference/extend_job.md#executor.engine.job.extend.CronJob) +is a job type for scheduling jobs periodically. +It should be used with different `TimeCondition` for different scheduling strategies. For example: + +```python +from executor.engine import Engine +from executor.engine.job.extend.cron import ( + CronJob, every, + hourly, daily, weekly, + after_clock, between_clock, + after_weekday, +) + + +def do_something(): + print("hello") + + +with Engine() as engine: + # will run the function every 10 seconds + job1 = CronJob(do_something, every("10s")) + # will run the function every minute + job2 = CronJob(do_something, every("1m")) + # will run the function every hour + job3 = CronJob(do_something, hourly) + # will run the function every day at 12:00 + job4 = CronJob(do_something, daily & after_clock("12:00")) + # will run the function every week on Monday at 12:00 + job5 = CronJob( + do_something, + weekly & after_weekday("Monday") & after_clock("12:00")) + # will run the function every 5min at the night + job6 = CronJob(do_something, every("5m") & between_clock("18:00", "24:00")) + engine.submit(job1, job2, job3, job4, job5, job6) + engine.wait() +``` + +See [CronJob](api-reference/extend_job.md#executor.engine.job.extend.CronJob) for more details. + +!!! info + `CronJob` is a special kind of + [`SentinelJob`](api-reference/extend_job.md#executor.engine.job.extend.SentinelJob), + `SentinelJob` is a more general kind of job, it will be + submit other jobs when the condition is satisfied. + + ### Generator support `executor.engine` supports generator job, which is a special job that returns a generator. diff --git a/executor/engine/__init__.py b/executor/engine/__init__.py index 41de325..713a802 100644 --- a/executor/engine/__init__.py +++ b/executor/engine/__init__.py @@ -1,7 +1,7 @@ from .core import Engine, EngineSetting from .job import LocalJob, ThreadJob, ProcessJob -__version__ = '0.2.9' +__version__ = '0.3.0' __all__ = [ 'Engine', 'EngineSetting', diff --git a/executor/engine/job/condition.py b/executor/engine/job/condition.py index b2c3bed..47f62da 100644 --- a/executor/engine/job/condition.py +++ b/executor/engine/job/condition.py @@ -187,7 +187,7 @@ class EveryPeriod(TimeCondition): """ period_str: str last_submitted_at: T.Optional[datetime] = None - immediate: bool = True + immediate: bool = False def satisfy(self, _) -> bool: period = _parse_period_str(self.period_str) diff --git a/executor/engine/job/extend/__init__.py b/executor/engine/job/extend/__init__.py index 1ef9379..5d2fd17 100644 --- a/executor/engine/job/extend/__init__.py +++ b/executor/engine/job/extend/__init__.py @@ -1,5 +1,7 @@ from .subprocess import SubprocessJob from .webapp import WebappJob +from .sentinel import SentinelJob +from .cron import CronJob -__all__ = ["SubprocessJob", "WebappJob"] +__all__ = ["SubprocessJob", "WebappJob", "SentinelJob", "CronJob"] diff --git a/executor/engine/job/extend/cron.py b/executor/engine/job/extend/cron.py index 7648954..dee4a7b 100644 --- a/executor/engine/job/extend/cron.py +++ b/executor/engine/job/extend/cron.py @@ -13,8 +13,8 @@ every = EveryPeriod daily = EveryPeriod("1d") +weekly = EveryPeriod("7d") hourly = EveryPeriod("1h") -monthly = EveryPeriod("1m") before_clock = BeforeClock after_clock = AfterClock diff --git a/executor/engine/job/extend/sentinel.py b/executor/engine/job/extend/sentinel.py index 8d663a9..0547bec 100644 --- a/executor/engine/job/extend/sentinel.py +++ b/executor/engine/job/extend/sentinel.py @@ -43,7 +43,7 @@ def SentinelJob( else: base_class = job_type - async def submitter(__engine__: "Engine"): + async def sentinel(__engine__: "Engine"): while True: if sentinel_condition.satisfy(__engine__): job = base_class(func, **attrs) @@ -51,7 +51,7 @@ async def submitter(__engine__: "Engine"): await asyncio.sleep(time_delta) sentinel_job = LocalJob( - submitter, + sentinel, **sentinel_attrs ) return sentinel_job