Skip to content

Commit

Permalink
update doc; fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Nanguage committed Jan 3, 2025
1 parent 097eb54 commit 2df7bb1
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 80 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ By harnessing the capabilities of Executor Engine, users can effortlessly constr

+ 📚 Support multiple job types
* `LocalJob`, `ThreadJob`, `ProcessJob`, `DaskJob`
* Extend job types: `SubprocessJob`, `WebappJob`
* Extend job types: `SubprocessJob`, `WebappJob`, `CronJob`
+ 🔧 Job management
* Job dependency management.
* Job status: Pending, Running, Done, Failed, Cancelled.
Expand Down
43 changes: 39 additions & 4 deletions docs/api-reference/condition.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,57 @@
options:
show_root_heading: true

::: executor.engine.job.condition.AfterTimepoint
::: executor.engine.job.condition.Combination
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.Combination
::: executor.engine.job.condition.AllSatisfied
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.AllSatisfied
::: executor.engine.job.condition.AnySatisfied
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.AnySatisfied
::: executor.engine.job.condition.TimeCondition
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.EveryPeriod
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.AfterClock
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.BeforeClock
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.AfterWeekday
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.BeforeWeekday
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.AfterTimepoint
handler: python
options:
show_root_heading: true

::: executor.engine.job.condition.BeforeTimepoint
handler: python
options:
show_root_heading: true
13 changes: 12 additions & 1 deletion docs/api-reference/extend_job.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,15 @@
::: executor.engine.job.extend.WebappJob
handler: python
options:
show_root_heading: true
show_root_heading: true

::: executor.engine.job.extend.SentinelJob
handler: python
options:
show_root_heading: true

::: executor.engine.job.extend.CronJob
handler: python
options:
show_root_heading: true

227 changes: 159 additions & 68 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ engine = Engine()
def add(a, b):
return a + b

async def add_async(a, b):
# async function can also be used as the job function
await asyncio.sleep(1.0)
return a + b

async def main():
job1 = ProcessJob(add, args=(1, 2))
job1 = ProcessJob(add_async, args=(1, 2))
job2 = ProcessJob(add, args=(job1.future, 4))
await engine.submit_async(job1, job2)
await engine.join()
Expand Down Expand Up @@ -96,73 +101,6 @@ They are executed by different backends and suitable for different scenarios.
| `ProcessJob` | `executor.engine.backend.process` | CPU-bound tasks |
| `DaskJob` | `executor.engine.backend.dask` | Distributed tasks |

### Extend job types

There are two extend job types:
[`SubprocessJob`](api-reference/extend_job.md#executor.engine.job.extend.SubprocessJob)
and
[`WebappJob`](api-reference/extend_job.md#executor.engine.job.extend.WebappJob)
. They are used to execute shell commands and launch web applications.
The extend job types can based on the job types above(`LocalJob`, `ThreadJob`, `ProcessJob`, `DaskJob`).

#### SubprocessJob

[`SubprocessJob`](api-reference/extend_job.md#executor.engine.job.extend.SubprocessJob)
is a job type for executing shell commands.
`SubprocessJob` accept a shell command as its argument. It will execute the command in a subprocess:

```python
from executor.engine import Engine
from executor.engine.job.extend import SubprocessJob

job = SubprocessJob(
"python -c 'print(1 + 2)'",
)

with Engine() as engine:
engine.submit(job)
engine.wait_job(job)
```


#### WebappJob

[`WebappJob`](api-reference/extend_job.md#executor.engine.job.extend.WebappJob)
is a job type for launching a web application.
It can accept a function with `ip` and `port` as arguments:

```python
from executor.engine import Engine
from executor.engine.job.extend import WebappJob
from http.server import HTTPServer, SimpleHTTPRequestHandler

def run_simple_httpd(ip: str, port: int):
server_addr = (ip, port)
httpd = HTTPServer(server_addr, SimpleHTTPRequestHandler)
httpd.serve_forever()

with Engine() as engine:
job = WebappJob(run_simple_httpd, ip="127.0.0.1", port=8000)
engine.submit(job)
print("Open your browser and visit http://127.0.0.1:8000")
engine.wait()
```

`WebappJob` can also accept a command template as its argument:

```python
from executor.engine import Engine
from executor.engine.job.extend import WebappJob

with Engine() as engine:
job = WebappJob(
"python -m http.server -b {ip} {port}",
ip="127.0.0.1", port=8000)
engine.submit(job)
print("Open your browser and visit http://127.0.0.1:8000")
engine.wait()
```

### Conditional job execution

After another job:
Expand Down Expand Up @@ -284,6 +222,42 @@ with Engine() as engine:
engine.wait()
```

##### Syntactic sugar for condition combination

You can use `&` and `|` to combine conditions:

```python
from executor.engine import Engine, ThreadJob
from executor.engine.job.condition import AfterAnother
import time

s = set()

def sleep_and_add_1(t: int):
time.sleep(t)
s.add(t)

def has_two_elements():
print(s)
assert len(s) == 2

with Engine() as engine:
job1 = ThreadJob(sleep_and_add_1, args=(1,))
job2 = ThreadJob(sleep_and_add_1, args=(2,))
job3 = ThreadJob(sleep_and_add_1, args=(3,))
job4 = ThreadJob(
has_two_elements,
condition=(
(
AfterAnother(job_id=job1.id) &
AfterAnother(job_id=job2.id)
) |
AfterAnother(job_id=job3.id)
)
)
engine.submit(job4, job3, job2, job1)
engine.wait()
```

#### Custom condition

Expand Down Expand Up @@ -319,6 +293,123 @@ with Engine() as engine:
engine.wait()
```

### Extend job types

There are 4 extend job types:
[`SubprocessJob`](api-reference/extend_job.md#executor.engine.job.extend.SubprocessJob),
[`WebappJob`](api-reference/extend_job.md#executor.engine.job.extend.WebappJob)
[`SentinelJob`](api-reference/extend_job.md#executor.engine.job.extend.SentinelJob)
[`CronJob`](api-reference/extend_job.md#executor.engine.job.extend.CronJob)

They are used to execute shell commands, launch web applications
and schedule jobs based on time. The extend job types can based on the job types above(`LocalJob`, `ThreadJob`, `ProcessJob`, `DaskJob`).

#### SubprocessJob

[`SubprocessJob`](api-reference/extend_job.md#executor.engine.job.extend.SubprocessJob)
is a job type for executing shell commands.
`SubprocessJob` accept a shell command as its argument. It will execute the command in a subprocess:

```python
from executor.engine import Engine
from executor.engine.job.extend import SubprocessJob

job = SubprocessJob(
"python -c 'print(1 + 2)'",
)

with Engine() as engine:
engine.submit(job)
engine.wait_job(job)
```


#### WebappJob

[`WebappJob`](api-reference/extend_job.md#executor.engine.job.extend.WebappJob)
is a job type for launching a web application.
It can accept a function with `ip` and `port` as arguments:

```python
from executor.engine import Engine
from executor.engine.job.extend import WebappJob
from http.server import HTTPServer, SimpleHTTPRequestHandler

def run_simple_httpd(ip: str, port: int):
server_addr = (ip, port)
httpd = HTTPServer(server_addr, SimpleHTTPRequestHandler)
httpd.serve_forever()

with Engine() as engine:
job = WebappJob(run_simple_httpd, ip="127.0.0.1", port=8000)
engine.submit(job)
print("Open your browser and visit http://127.0.0.1:8000")
engine.wait()
```

`WebappJob` can also accept a command template as its argument:

```python
from executor.engine import Engine
from executor.engine.job.extend import WebappJob

with Engine() as engine:
job = WebappJob(
"python -m http.server -b {ip} {port}",
ip="127.0.0.1", port=8000)
engine.submit(job)
print("Open your browser and visit http://127.0.0.1:8000")
engine.wait()
```

#### CronJob

[`CronJob`](api-reference/extend_job.md#executor.engine.job.extend.CronJob)
is a job type for scheduling jobs periodically.
It should be used with different `TimeCondition` for different scheduling strategies. For example:

```python
from executor.engine import Engine
from executor.engine.job.extend.cron import (
CronJob, every,
hourly, daily, weekly,
after_clock, between_clock,
after_weekday,
)


def do_something():
print("hello")


with Engine() as engine:
# will run the function every 10 seconds
job1 = CronJob(do_something, every("10s"))
# will run the function every minute
job2 = CronJob(do_something, every("1m"))
# will run the function every hour
job3 = CronJob(do_something, hourly)
# will run the function every day at 12:00
job4 = CronJob(do_something, daily & after_clock("12:00"))
# will run the function every week on Monday at 12:00
job5 = CronJob(
do_something,
weekly & after_weekday("Monday") & after_clock("12:00"))
# will run the function every 5min at the night
job6 = CronJob(do_something, every("5m") & between_clock("18:00", "24:00"))
engine.submit(job1, job2, job3, job4, job5, job6)
engine.wait()
```

See [CronJob](api-reference/extend_job.md#executor.engine.job.extend.CronJob) for more details.

!!! info
`CronJob` is a special kind of
[`SentinelJob`](api-reference/extend_job.md#executor.engine.job.extend.SentinelJob),
`SentinelJob` is a more general kind of job, it will be
submit other jobs when the condition is satisfied.


### Generator support

`executor.engine` supports generator job, which is a special job that returns a generator.
Expand Down
2 changes: 1 addition & 1 deletion executor/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .core import Engine, EngineSetting
from .job import LocalJob, ThreadJob, ProcessJob

__version__ = '0.2.9'
__version__ = '0.3.0'

__all__ = [
'Engine', 'EngineSetting',
Expand Down
2 changes: 1 addition & 1 deletion executor/engine/job/condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class EveryPeriod(TimeCondition):
"""
period_str: str
last_submitted_at: T.Optional[datetime] = None
immediate: bool = True
immediate: bool = False

def satisfy(self, _) -> bool:
period = _parse_period_str(self.period_str)
Expand Down
4 changes: 3 additions & 1 deletion executor/engine/job/extend/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .subprocess import SubprocessJob
from .webapp import WebappJob
from .sentinel import SentinelJob
from .cron import CronJob


__all__ = ["SubprocessJob", "WebappJob"]
__all__ = ["SubprocessJob", "WebappJob", "SentinelJob", "CronJob"]
2 changes: 1 addition & 1 deletion executor/engine/job/extend/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

every = EveryPeriod
daily = EveryPeriod("1d")
weekly = EveryPeriod("7d")
hourly = EveryPeriod("1h")
monthly = EveryPeriod("1m")

before_clock = BeforeClock
after_clock = AfterClock
Expand Down
4 changes: 2 additions & 2 deletions executor/engine/job/extend/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ def SentinelJob(
else:
base_class = job_type

async def submitter(__engine__: "Engine"):
async def sentinel(__engine__: "Engine"):
while True:
if sentinel_condition.satisfy(__engine__):
job = base_class(func, **attrs)
await __engine__.submit_async(job)
await asyncio.sleep(time_delta)

sentinel_job = LocalJob(
submitter,
sentinel,
**sentinel_attrs
)
return sentinel_job

0 comments on commit 2df7bb1

Please sign in to comment.