Skip to content

Commit

Permalink
Debugged for all combinations of (_LIMIT_WORKERS_RAM, _LIMIT_WORKERS_…
Browse files Browse the repository at this point in the history
…RAM), documentation extended
  • Loading branch information
luav committed Jun 27, 2017
1 parent 732a57d commit 69ab62f
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 77 deletions.
127 changes: 66 additions & 61 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# PyExPool

A Lightweight Multi-Process Execution Pool to schedule Jobs execution with *per-job timeout*, optionally grouping them into Tasks and specifying optional execution parameters considering NUMA architecture:
A Lightweight Multi-Process Execution Pool to schedule Jobs execution with *per-job timeout*, optionally grouping them into Tasks and specifying optional execution parameters considering NUMA architecture peculiarities:

- automatic CPU affinity management and maximization of the dedicated CPU cache for a worker process
- automatic rescheduling and balancing (reduction) of the worker processes and on low memory condition for the in-RAM computations (requires [psutil](https://pypi.python.org/pypi/psutil), can be disabled)
- chained termination of related worker processes and jobs rescheduling to satisfy timeout and memory limit constraints
- automatic rescheduling and *load balancing* (reduction) of the worker processes and on low memory condition for the *in-RAM computations* (requires [psutil](https://pypi.python.org/pypi/psutil), can be disabled)
- *chained termination* of related worker processes and jobs rescheduling to satisfy *timeout* and *memory limit* constraints
- timeout per each Job (it was the main initial motivation to implement this module, because this feature is not provided by any Python implementation out of the box)
- onstart/ondone callbacks, ondone is called only on successful completion (not termination) for both Jobs and Tasks (group of jobs)
- onstart/ondone *callbacks*, ondone is called only on successful completion (not termination) for both Jobs and Tasks (group of jobs)
- stdout/err output, which can be redirected to any custom file or PIPE
- custom parameters for each Job and respective owner Task besides the name/id

Expand All @@ -16,16 +16,19 @@ Implemented as a *single-file module* to be *easily included into your project a
The main purpose of this single-file module is the **asynchronous execution of modules and external executables with cache / parallelization tuning and automatic balancing of the worker processes for the in-RAM computations**.
In case asynchronous execution of the *Python functions* is required and usage of external dependences is not a problem, or automatic jobs scheduling for in-RAM computations is not required, then more handy and straightforward approach is to use [Pebble](https://pypi.python.org/pypi/Pebble) library.

The load balancing is enabled when global variables `_LIMIT_WORKERS_RAM` and `_CHAINED_CONSTRAINTS` are set, jobs categories and relative size (if known) specified. The balancing is performed to use as much RAM and CPU resources as possible performing in-RAM computations and meeting timeout, memory limit and CPU cache (processes affinity) constraints. Large executing jobs are rescheduled for the later execution with less number of worker processes after the completion of smaller jobs. The number of workers is reduced automatically (balanced) on the jobs queue processing. It is recommended to add jobs in the order of the increasing memory/time complexity if possible to reduce the number of worker process terminations for the jobs execution postponing on rescheduling.

\author: (c) Artem Lutov <[email protected]>
\organizations: [eXascale Infolab](http://exascale.info/), [Lumais](http://www.lumais.com/), [ScienceWise](http://sciencewise.info/)
\date: 2016-01
\date: 2017-06

## Content
- [Dependencies](#dependencies)
- [API](#api)
- [Job](#job)
- [Task](#task)
- [ExecPool](#execpool)
- [Accessory Routines](#accessory-routines)
- [Usage](#usage)
- [Usage Example](#usage-example)
- [Failsafe Termination](#failsafe-termination)
Expand All @@ -47,7 +50,7 @@ $ sudo pip install mock

## API

Flexible API provides *automatic CPU affinity management, maximization of the dedicated CPU cache, limitation of the minimal dedicated RAM per worker process, balancing of the worker processes and rescheduling of chains of related jobs on low memory condition for the in-RAM computations*, optional automatic restart of jobs on timeout, access to job's process, parent task, start and stop execution time and more...
Flexible API provides *automatic CPU affinity management, maximization of the dedicated CPU cache, limitation of the minimal dedicated RAM per worker process, balancing of the worker processes and rescheduling of chains of the related jobs on low memory condition for the in-RAM computations*, optional automatic restart of jobs on timeout, access to job's process, parent task, start and stop execution time and more...
`ExecPool` represents a pool of worker processes to execute `Job`s that can be grouped into `Tasks`s for more flexible management.

### Job
Expand Down Expand Up @@ -143,6 +146,63 @@ Task(name, timeout=0, onstart=None, ondone=None, params=None, stdout=sys.stdout,
```
### ExecPool
```python
ExecPool(wksnum=cpu_count(), afnstep=None, vmlimit=0., latency=0.)
"""Multi-process execution pool of jobs
wksnum - number of resident worker processes, >=1. The reasonable value is
<= NUMA nodes * node CPUs, which is typically returned by cpu_count(),
where node CPUs = CPU cores * HW treads per core.
To guarantee minimal average RAM per a process, for example 2.5 Gb:
wksnum = min(cpu_count(), max(ramfracs(2.5), 1))
afnstep - affinity step, integer if applied. Used to bind worker to the
processing units to have warm cache for single thread workers.
Typical values:
None - do not use affinity at all (recommended for multi-threaded workers),
1 - maximize parallelization (the number of worker processes = CPU units),
cpucorethreads() - maximize the dedicated CPU cache (the number of
worker processes = CPU cores = CPU units / hardware treads per CPU core).
NOTE: specification of the afnstep might cause reduction of the workers number.
vmlimit - limit total amount of VM (automatically reduced to the amount of physical
RAM if the larger value is specified) in gigabytes that can be used by worker
processes to provide in-RAM computations.
Dynamically reduce the number of workers to consume total virtual memory
not more than specified. The workers are rescheduled starting from the
most memory-heavy processes. >= 0
NOTE:
- applicable only if _LIMIT_WORKERS_RAM
- 0 means unlimited (some jobs might be [partially] swapped)
- value > 0 is automatically limited with total physical RAM to process
jobs in RAM almost without the swapping
latency - approximate minimal latency of the workers monitoring in sec, float >= 0.
0 means automatically defined value (recommended, typically 2-3 sec).
"""

execute(job, async=True):
"""Schedule the job for the execution
job - the job to be executed, instance of Job
async - async execution or wait until execution completed
NOTE: sync tasks are started at once
return - 0 on successful execution, process return code otherwise
"""

join(timeout=0):
"""Execution cycle
timeout - execution timeout in seconds before the workers termination, >= 0.
0 means absence of the timeout. The time is measured SINCE the first job
was scheduled UNTIL the completion of all scheduled jobs.
return - True on graceful completion, False on termination by the specified timeout
"""

__del__():
"""Force termination of the pool"""

__finalize__():
"""Force termination of the pool"""
```
### Accessory Routines
```
def ramfracs(fracsize):
"""Evaluate the minimal number of RAM fractions of the specified size in GB
Expand Down Expand Up @@ -186,63 +246,8 @@ def afnicpu(iafn, corethreads=1, nodes=1, crossnodes=True):
return CPU index respective to the specified index in the affinity table
"""
ExecPool(workers=cpu_count(), afnstep=None, vmlimit=0., latency=0.)
"""Multi-process execution pool of jobs
workers - number of resident worker processes, >=1. The reasonable value is
<= NUMA nodes * node CPUs, which is typically returned by cpu_count(),
where node CPUs = CPU cores * HW treads per core.
To guarantee minimal average RAM per a process, for example 2.5 GB:
workers = min(cpu_count(), max(ramfracs(2.5), 1))
afnstep - affinity step, integer if applied. Used to bind worker to the
processing units to have warm cache for single thread workers.
Typical values:
None - do not use affinity at all (recommended for multi-threaded workers),
1 - maximize parallelization (the number of worker processes = CPU units),
cpucorethreads() - maximize the dedicated CPU cache (the number of
worker processes = CPU cores = CPU units / hardware treads per CPU core).
NOTE: specification of the afnstep might cause reduction of the workers number.
vmlimit - limit total amount of VM (automatically reduced to the amount of physical
RAM if the larger value is specified) in gigabytes that can be used by worker
processes to provide in-RAM computations.
Dynamically reduce the number of workers to consume total virtual memory
not more than specified. The workers are rescheduled starting from the
most memory-heavy processes. >= 0
NOTE:
- applicable only if _LIMIT_WORKERS_RAM
- 0 means unlimited (some jobs might be [partially] swapped)
- value > 0 is automatically limited with total physical RAM to process
jobs in RAM almost without the swapping
latency - approximate minimal latency of the workers monitoring in sec, float >= 0.
0 means automatically defined value (recommended, typically 2-3 sec).
"""

execute(job, async=True):
"""Schedule the job for the execution
job - the job to be executed, instance of Job
async - async execution or wait until execution completed
NOTE: sync tasks are started at once
return - 0 on successful execution, process return code otherwise
"""

join(timeout=0):
"""Execution cycle
timeout - execution timeout in seconds before the workers termination, >= 0.
0 means absence of the timeout. The time is measured SINCE the first job
was scheduled UNTIL the completion of all scheduled jobs.
return - True on graceful completion, False on termination by the specified timeout
"""

__del__():
"""Force termination of the pool"""

__finalize__():
"""Force termination of the pool"""
```


## Usage

Target version of the Python is 2.7+ including 3.x, also works fine on PyPy.
Expand Down
52 changes: 36 additions & 16 deletions mpepool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,45 @@
# -*- coding: utf-8 -*-

"""
\descr: Multi-Process Execution Pool to schedule Jobs execution with per-Job timeout,
optionally grouping them into Tasks and specifying execution paremeters:
- timeout per each Job (it was the main motivation to implemtent this module)
- onstart/ondone callbacks, ondone is called only on successful completion (not termination)
- stdout/err output, which can be redireted to any custom file or PIPE
- custom parameters for each job and task besides the name/id
\descr: Multi-Process Execution Pool to schedule Jobs execution with per-job timeout,
optionally grouping them into Tasks and specifying optional execution parameters
considering NUMA architecture:
- automatic CPU affinity management and maximization of the dedicated CPU cache
for a worker process
- automatic rescheduling and balancing (reduction) of the worker processes and on
low memory condition for the in-RAM computations (requires psutil, can be disabled)
- chained termination of related worker processes and jobs rescheduling to satisfy
timeout and memory limit constraints
- timeout per each Job (it was the main initial motivation to implement this module,
because this feature is not provided by any Python implementation out of the box)
- onstart/ondone callbacks, ondone is called only on successful completion
(not termination) for both Jobs and Tasks (group of jobs)
- stdout/err output, which can be redirected to any custom file or PIPE
- custom parameters for each Job and respective owner Task besides the name/id
Flexible API provides optional automatic restart of jobs on timeout, access to job's process,
parent task, start and stop execution time and much more...
Global functionality parameters:
Core parameters specified as global variables:
_LIMIT_WORKERS_RAM - limit the amount of virtual memory (<= RAM) used by worker processes,
requires psutil import
_CHAINED_CONSTRAINTS - terminate related jobs on terminating any job by the execution
constraints (timeout or RAM limit)
The load balancing is enabled when global variables _LIMIT_WORKERS_RAM and _CHAINED_CONSTRAINTS
are set, jobs categories and relative size (if known) specified. The balancing is performed
to use as much RAM and CPU resources as possible performing in-RAM computations and meeting
timeout, memory limit and CPU cache (processes affinity) constraints.
Large executing jobs are rescheduled for the later execution with less number of worker
processes after the completion of smaller jobs. The number of workers is reduced automatically
(balanced) on the jobs queue processing. It is recommended to add jobs in the order of the
increasing memory/time complexity if possible to reduce the number of worker process
terminations for the jobs execution postponing on rescheduling.
\author: (c) Artem Lutov <[email protected]>
\organizations: eXascale Infolab <http://exascale.info/>, Lumais <http://www.lumais.com/>, ScienceWise <http://sciencewise.info/>
\date: 2015-07
\date: 2015-07 (v1), 2017-06 (v2)
"""

from __future__ import print_function, division # Required for stderr output, must be the first import
Expand Down Expand Up @@ -279,13 +299,13 @@ def __init__(self, name, workdir=None, args=(), timeout=0, ontimeout=False, task
self._omitafn = omitafn
if _LIMIT_WORKERS_RAM or _CHAINED_CONSTRAINTS:
self.size = size # Size of the processing data
# Consumed VM on execution in gigabytes or the least expected (inherited from the
# related jobs having the same category and non-smaller size)
self.vmem = 0.
if _CHAINED_CONSTRAINTS:
self.category = category # Job name
self.slowdown = slowdown # Execution slowdown ratio, ~ 1 / exec_speed
if _LIMIT_WORKERS_RAM:
# Consumed VM on execution in gigabytes or the least expected (inherited from the
# related jobs having the same category and non-smaller size)
self.vmem = 0.
self.wkslim = None # Worker processes limit (max number) on the job postponing if any


Expand Down Expand Up @@ -516,9 +536,8 @@ def __init__(self, wksnum=cpu_count(), afnstep=None, vmlimit=0., latency=0.):
# Virtual memory tracing attributes
# Dedicate at least 256 Mb for OS using not more than 99% of RAM
self._vmlimit = 0. if not _LIMIT_WORKERS_RAM else max(0, min(vmlimit, _RAM_SIZE * 0.99 - 0.25)) # in Gb
#self.vmtotal = 0. # Virtual memory used by all workers in gigabytes
# Execution rescheduling attributes
self._latency = latency if latency else 2 + (not not self._vmlimit) # Seconds of sleep on pooling
self._latency = latency if latency else 1 + (not not self._vmlimit) # Seconds of sleep on pooling
# Predefined private attributes
self._killCount = 3 # 3 cycles of self._latency, termination wait time
self.__termlock = Lock() # Lock for the __terminate() to avoid simultaneous call by the signal and normal execution flow
Expand Down Expand Up @@ -653,7 +672,7 @@ def __postpone(self, job, reduced, priority=False):
# Update limit of the worker processes of the other larger nonstarted jobs of the same category as this job has,
# and move jobs with the lowest wkslim to the end.
# Note: the update should be made for all nonstarted jobs, not only for the one caused the reduction.
if job.category is not None:
if _CHAINED_CONSTRAINTS and job.category is not None:
jobsnum = len(self._jobs) # The number of jobs
ij = 1 if priority else 0 # Job index
while ij < jobsnum:
Expand Down Expand Up @@ -1379,10 +1398,11 @@ def test_jobMemlimSimple(self):
self.assertGreaterEqual(jmsDvs.tstop - jmsDvs.tstart, worktime) # Smaller size of the ralted chained job to the vioated origin should not cause termination
self.assertGreaterEqual(jms1.tstop - jms1.tstart, worktime) # Independent job should have graceful completion
self.assertFalse(jms1.proc.returncode) # Errcode code is 0 on the gracefull completion
self.assertIsNone(jmsDvl1.tstart) # Postponed job should be terminated before being started by the chaned relation on the memory-violating origin
if _CHAINED_CONSTRAINTS:
self.assertIsNone(jmsDvl1.tstart) # Postponed job should be terminated before being started by the chained relation on the memory-violating origin
self.assertIsNone(jmsDvl2.tstart) # Postponed job should be terminated before being started by the chained relation on the memory-violating origin
#self.assertLess(jmsDvl1.tstop - jmsDvl1.tstart, worktime) # Early termination by the chained retalion to the mem violated origin
self.assertGreaterEqual(jms2.tstop - jms2.tstart, worktime) # Independent job should have graceful completion
self.assertIsNone(jmsDvl2.tstart) # Postponed job should be terminated before being started by the chaned relation on the memory-violating origin


@unittest.skipUnless(_LIMIT_WORKERS_RAM, 'Requires _LIMIT_WORKERS_RAM')
Expand Down

0 comments on commit 69ab62f

Please sign in to comment.