[17.0][UPD] Forward port changes from 14/15/16 #731

Open
wants to merge 14 commits into base: 17.0
35 changes: 34 additions & 1 deletion queue_job/README.rst
@@ -277,6 +277,39 @@ is at the top of the graph. In the example above, if it was called on
``group_a``, then ``group_b`` would never be delayed (but a warning
would be shown).

It is also possible to split a job into several jobs, each one
processing a part of the work. This can be useful to avoid very long
jobs, parallelize some tasks, and get more specific errors. Usage is as
follows:

.. code:: python

def button_split_delayable(self):
(
self # Can be a big recordset, let's say 1000 records
.delayable()
.generate_thumbnail((50, 50))
.set(priority=30)
.set(description=_("generate xxx"))
.split(50) # Split the job into 20 jobs of 50 records each
.delay()
)

The ``split()`` method takes a ``chain`` boolean keyword argument. If
set to True, the jobs will be chained, meaning that the next job will
only start when the previous one is done:

.. code:: python

def button_increment_var(self):
(
self
.delayable()
.increment_counter()
.split(1, chain=True) # Will execute the jobs one after the other
.delay()
)

Enqueueing Job Options
~~~~~~~~~~~~~~~~~~~~~~

@@ -431,7 +464,7 @@ running Odoo**
When you are developing (ie: connector modules) you might want to bypass
the queue job and run your code immediately.

To do so you can set QUEUE_JOB\__NO_DELAY=1 in your enviroment.
To do so you can set QUEUE_JOB\__NO_DELAY=1 in your environment.

**Bypass jobs in tests**

2 changes: 1 addition & 1 deletion queue_job/__manifest__.py
@@ -2,7 +2,7 @@

{
"name": "Job Queue",
"version": "17.0.1.1.1",
"version": "17.0.1.1.2",
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
3 changes: 3 additions & 0 deletions queue_job/controllers/main.py
@@ -34,6 +34,9 @@
_logger.debug("%s started", job)

job.perform()
# Triggers any stored computed fields before calling 'set_done'
# so that their computation time is part of the 'exec_time'
env.flush_all()

job.set_done()
job.store()
env.flush_all()
62 changes: 53 additions & 9 deletions queue_job/delay.py
@@ -232,7 +232,7 @@
elif jobs_count == 1:
if jobs[0].graph_uuid:
raise ValueError(
f"Job {jobs[0]} is a single job, it should not" " have a graph uuid"
f"Job {jobs[0]} is a single job, it should not have a graph uuid"
)
else:
graph_uuids = {job.graph_uuid for job in jobs if job.graph_uuid}
@@ -483,11 +483,10 @@
return [self]

def __repr__(self):
return "Delayable({}.{}({}, {}))".format(
self.recordset,
self._job_method.__name__ if self._job_method else "",
self._job_args,
self._job_kwargs,
return (
f"Delayable({self.recordset}."
f"{self._job_method.__name__ if self._job_method else ''}"
f"({self._job_args}, {self._job_kwargs}))"
)

def __del__(self):
@@ -525,6 +524,51 @@
"""Delay the whole graph"""
self._graph.delay()

def split(self, size, chain=False):
"""Split the Delayable into batches of `size` records.

Returns a `DelayableGroup`, or a `DelayableChain` when `chain`
is True, containing one Delayable per batch.
"""
if not self._job_method:
raise ValueError("No method set on the Delayable")

total_records = len(self.recordset)

delayables = []
for index in range(0, total_records, size):
recordset = self.recordset[index : index + size]
delayable = Delayable(
recordset,
priority=self.priority,
eta=self.eta,
max_retries=self.max_retries,
description=self.description,
channel=self.channel,
identity_key=self.identity_key,
)
# Update the __self__
delayable._job_method = getattr(recordset, self._job_method.__name__)
delayable._job_args = self._job_args
delayable._job_kwargs = self._job_kwargs

delayables.append(delayable)

description = self.description or (
self._job_method.__doc__.splitlines()[0].strip()
if self._job_method.__doc__
else f"{self.recordset._name}.{self._job_method.__name__}"
)
for index, delayable in enumerate(delayables):
delayable.set(
description=f"{description} (split {index + 1}/{len(delayables)})"
)

# Prevent warning on deletion
self._generated_job = True

return (DelayableChain if chain else DelayableGroup)(*delayables)
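The per-batch numbering applied to each description above can be exercised standalone (an illustration with a hypothetical `split_descriptions` helper, not part of the module):

```python
def split_descriptions(description, n):
    """One description per batch, suffixed with its position in the split."""
    return [f"{description} (split {i + 1}/{n})" for i in range(n)]

labels = split_descriptions("generate thumbnails", 3)
# e.g. "generate thumbnails (split 1/3)" ... "generate thumbnails (split 3/3)"
```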

def _build_job(self):
if self._generated_job:
return self._generated_job
@@ -611,9 +655,9 @@
return _delay_delayable

def __str__(self):
return "DelayableRecordset({}{})".format(
self.delayable.recordset._name,
getattr(self.delayable.recordset, "_ids", ""),
return (

f"DelayableRecordset({self.delayable.recordset._name}"
f"{getattr(self.delayable.recordset, '_ids', '')})"
)

__repr__ = __str__
12 changes: 10 additions & 2 deletions queue_job/job.py
@@ -539,8 +539,8 @@ def perform(self):

return self.result

def enqueue_waiting(self):
sql = """
def _get_common_dependent_jobs_query(self):
return """
UPDATE queue_job
SET state = %s
FROM (
@@ -568,9 +568,17 @@ def enqueue_waiting(self):
AND %s = ALL(jobs.parent_states)
AND state = %s;
"""

def enqueue_waiting(self):
sql = self._get_common_dependent_jobs_query()
self.env.cr.execute(sql, (PENDING, self.uuid, DONE, WAIT_DEPENDENCIES))
self.env["queue.job"].invalidate_model(["state"])

def cancel_dependent_jobs(self):
sql = self._get_common_dependent_jobs_query()
self.env.cr.execute(sql, (CANCELLED, self.uuid, CANCELLED, WAIT_DEPENDENCIES))
self.env["queue.job"].invalidate_model(["state"])
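`enqueue_waiting()` and `cancel_dependent_jobs()` reuse the same UPDATE with different parameters. A rough pure-Python analogue of its semantics (all names here are illustrative, not the module's API):

```python
WAIT_DEPENDENCIES, PENDING, DONE, CANCELLED = (
    "wait_dependencies", "pending", "done", "cancelled",
)

def update_dependent_jobs(states, parents, finished_uuid, new_state, parent_state):
    """Jobs waiting on `finished_uuid` move to `new_state` once ALL of
    their parents are in `parent_state` (DONE -> PENDING when a parent
    finishes, CANCELLED -> CANCELLED when parents are cancelled)."""
    for uuid, state in states.items():
        if state != WAIT_DEPENDENCIES:
            continue
        if finished_uuid not in parents.get(uuid, ()):
            continue
        if all(states[p] == parent_state for p in parents[uuid]):
            states[uuid] = new_state

states = {"a": DONE, "b": DONE, "c": WAIT_DEPENDENCIES}
parents = {"c": ["a", "b"]}
update_dependent_jobs(states, parents, "a", PENDING, DONE)
# "c" becomes pending because both of its parents are done
```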

def store(self):
"""Store the Job"""
job_model = self.env["queue.job"]
10 changes: 10 additions & 0 deletions queue_job/migrations/17.0.1.1.2/pre-migration.py
@@ -0,0 +1,10 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

from odoo.tools.sql import table_exists


def migrate(cr, version):
if table_exists(cr, "queue_job"):
# Drop index 'queue_job_identity_key_state_partial_index',
# it will be recreated during the update
cr.execute("DROP INDEX IF EXISTS queue_job_identity_key_state_partial_index;")
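`DROP INDEX IF EXISTS` makes the pre-migration idempotent. A standalone SQLite sketch of that property (illustration only; PostgreSQL, which the module targets, supports the same clause):

```python
import sqlite3

cr = sqlite3.connect(":memory:")
cr.execute("CREATE TABLE queue_job (identity_key TEXT, state TEXT)")
cr.execute(
    "CREATE INDEX queue_job_identity_key_state_partial_index "
    "ON queue_job (identity_key)"
)
# Safe whether or not the index exists, so the migration can be re-run
cr.execute("DROP INDEX IF EXISTS queue_job_identity_key_state_partial_index")
cr.execute("DROP INDEX IF EXISTS queue_job_identity_key_state_partial_index")
```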
6 changes: 4 additions & 2 deletions queue_job/models/queue_job.py
@@ -94,7 +94,7 @@ class QueueJob(models.Model):
state = fields.Selection(STATES, readonly=True, required=True, index=True)
priority = fields.Integer()
exc_name = fields.Char(string="Exception", readonly=True)
exc_message = fields.Char(string="Exception Message", readonly=True)
exc_message = fields.Char(string="Exception Message", readonly=True, tracking=True)
exc_info = fields.Text(string="Exception Info", readonly=True)
result = fields.Text(readonly=True)

@@ -139,7 +139,7 @@ def init(self):
self._cr.execute(
"CREATE INDEX queue_job_identity_key_state_partial_index "
"ON queue_job (identity_key) WHERE state in ('pending', "
"'enqueued') AND identity_key IS NOT NULL;"
"'enqueued', 'wait_dependencies') AND identity_key IS NOT NULL;"
)
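Adding `'wait_dependencies'` to the partial index keeps identity-key deduplication covering jobs that are still waiting on their parents. The shape of such an index and the lookup it serves, in a standalone SQLite sketch (illustration only; the module runs on PostgreSQL):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE queue_job (identity_key TEXT, state TEXT)")
# Partial index: only rows still eligible for deduplication are indexed
con.execute(
    "CREATE INDEX queue_job_identity_key_state_partial_index "
    "ON queue_job (identity_key) WHERE state IN ('pending', "
    "'enqueued', 'wait_dependencies') AND identity_key IS NOT NULL"
)
con.execute("INSERT INTO queue_job VALUES ('k1', 'pending'), ('k1', 'done')")
# Deduplication lookup: is there already a live job with this key?
row = con.execute(
    "SELECT COUNT(*) FROM queue_job WHERE identity_key = 'k1' "
    "AND state IN ('pending', 'enqueued', 'wait_dependencies')"
).fetchone()
# Only the 'pending' row counts; the 'done' row is outside the index's scope
```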

@api.depends("records")
@@ -326,6 +326,8 @@ def _change_job_state(self, state, result=None):
elif state == CANCELLED:
job_.set_cancelled(result=result)
job_.store()
record.env["queue.job"].flush_model()
job_.cancel_dependent_jobs()
else:
raise ValueError("State not supported: %s" % state)

25 changes: 18 additions & 7 deletions queue_job/models/queue_job_function.py
@@ -155,10 +155,12 @@
try:
# as json can't have integers as keys and the field is stored
# as json, convert back to int
retry_pattern = {
int(try_count): postpone_seconds
for try_count, postpone_seconds in self.retry_pattern.items()
}
retry_pattern = {}
for try_count, postpone_value in self.retry_pattern.items():
if isinstance(postpone_value, int):
retry_pattern[int(try_count)] = postpone_value
else:
retry_pattern[int(try_count)] = tuple(postpone_value)
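The branch above exists because JSON storage stringifies dict keys and turns tuples into lists; the restore step in isolation:

```python
import json

# Round-tripping through JSON loses int keys and tuple values:
# {1: 300, 5: (11, 20)} becomes {"1": 300, "5": [11, 20]}
stored = json.loads(json.dumps({1: 300, 5: (11, 20)}))

retry_pattern = {}
for try_count, postpone_value in stored.items():
    if isinstance(postpone_value, int):
        retry_pattern[int(try_count)] = postpone_value
    else:
        retry_pattern[int(try_count)] = tuple(postpone_value)
```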

except ValueError:
_logger.error(
"Invalid retry pattern for job function %s,"
@@ -187,8 +189,9 @@
def _retry_pattern_format_error_message(self):
return _(
"Unexpected format of Retry Pattern for {}.\n"
"Example of valid format:\n"
"{{1: 300, 5: 600, 10: 1200, 15: 3000}}"
"Example of valid formats:\n"
"{{1: 300, 5: 600, 10: 1200, 15: 3000}}\n"
"{{1: (1, 10), 5: (11, 20), 10: (21, 30), 15: (100, 300)}}"
).format(self.name)

@api.constrains("retry_pattern")
@@ -201,12 +204,20 @@
all_values = list(retry_pattern) + list(retry_pattern.values())
for value in all_values:
try:
int(value)
self._retry_value_type_check(value)
except ValueError as ex:
raise exceptions.UserError(
record._retry_pattern_format_error_message()
) from ex

def _retry_value_type_check(self, value):
    if isinstance(value, (tuple, list)):
        if len(value) != 2:
            raise ValueError
        for element in value:
            self._retry_value_type_check(element)
        return
    int(value)
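The constraint's behaviour can be exercised standalone (a simplified, module-free sketch; `is_valid_retry_pattern` is a hypothetical wrapper around the same checks):

```python
def retry_value_type_check(value):
    """Accept int-like scalars and 2-element tuples/lists of them."""
    if isinstance(value, (tuple, list)):
        if len(value) != 2:
            raise ValueError(value)
        for element in value:
            retry_value_type_check(element)
        return
    int(value)  # raises ValueError for anything not int-like

def is_valid_retry_pattern(retry_pattern):
    try:
        for value in list(retry_pattern) + list(retry_pattern.values()):
            retry_value_type_check(value)
    except ValueError:
        return False
    return True
```

Both plain postpone delays and `(min, max)` ranges pass; a 3-element value is rejected.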

def _related_action_format_error_message(self):
return _(
"Unexpected format of Related Action for {}.\n"
34 changes: 33 additions & 1 deletion queue_job/readme/USAGE.md
@@ -108,6 +108,38 @@ is at the top of the graph. In the example above, if it was called on
`group_a`, then `group_b` would never be delayed (but a warning would be
shown).

It is also possible to split a job into several jobs, each one processing
a part of the work. This can be useful to avoid very long jobs, parallelize
some tasks, and get more specific errors. Usage is as follows:

``` python
def button_split_delayable(self):
(
self # Can be a big recordset, let's say 1000 records
.delayable()
.generate_thumbnail((50, 50))
.set(priority=30)
.set(description=_("generate xxx"))
.split(50) # Split the job into 20 jobs of 50 records each
.delay()
)
```

The `split()` method takes a `chain` boolean keyword argument. If set to
True, the jobs will be chained, meaning that the next job will only start
when the previous one is done:

``` python
def button_increment_var(self):
(
self
.delayable()
.increment_counter()
.split(1, chain=True) # Will execute the jobs one after the other
.delay()
)
```
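The batch arithmetic behind `split()` can be sketched in plain Python (a simplified illustration under the PR's stated numbers, not the module's implementation; `chunk` is a hypothetical helper):

```python
import math

def chunk(records, size):
    """Slice a sequence into consecutive batches of at most `size` items."""
    return [records[i : i + size] for i in range(0, len(records), size)]

records = list(range(1000))  # stands in for a 1000-record recordset
batches = chunk(records, 50)
assert len(batches) == math.ceil(len(records) / 50)  # 20 batches
```

When the total is not a multiple of `size`, the final batch simply comes out smaller.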

### Enqueueing Job Options

- priority: default is 10, the closest it is to 0, the faster it will be
@@ -258,7 +290,7 @@ running Odoo**
When you are developing (ie: connector modules) you might want to bypass
the queue job and run your code immediately.

To do so you can set QUEUE_JOB\_\_NO_DELAY=1 in your enviroment.
To do so you can set QUEUE_JOB\_\_NO_DELAY=1 in your environment.

**Bypass jobs in tests**

1 change: 1 addition & 0 deletions queue_job/tests/__init__.py
@@ -1,6 +1,7 @@
from . import test_runner_channels
from . import test_runner_runner
from . import test_delayable
from . import test_delayable_split
from . import test_json_field
from . import test_model_job_channel
from . import test_model_job_function