Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow job_file_dir_mkdtmp to be path patterns. #185

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions law.cfg.example
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,13 @@
; Default: "$LAW_JOB_FILE_DIR", "tempfile.gettempdir()"

; job_file_dir_mkdtemp
; Description: A boolean flag that decides if a temporary directory inside job_file_dir should be
; created in which the actual job files are stored. It is recommended to set this option to "True"
; in order to have a clear separation between files created by different job file factories.
; Type: boolean
; Description: A boolean flag or string that decides if a temporary directory inside job_file_dir
; should be created in which the actual job files are stored. It is discouraged to set this option
; to "False" in order to have a clear separation between files created by different job file
; factories. When a string is used it serves as a prefix for name of the temporary directory. More
; than three X's will be replaced with random characters and template variables "{{task_id}}" and
; "{{task_family}}" will be expanded with task values.
; Type: boolean, string
; Default: True

; job_file_dir_cleanup
Expand Down
22 changes: 21 additions & 1 deletion law/contrib/arc/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from abc import abstractmethod
from collections import OrderedDict

import six

from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile, DeprecatedInputFiles
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -211,9 +214,26 @@ def arc_job_file_factory_cls(self):
return ARCJobFileFactory

def arc_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.arc_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.arc_job_file_factory_defaults, kwargs)
return self.arc_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "arc_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def arc_job_config(self, config, job_num, branches):
return config
Expand Down
21 changes: 20 additions & 1 deletion law/contrib/cms/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from abc import abstractmethod
from collections import OrderedDict

import six

import law
from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
Expand Down Expand Up @@ -318,9 +320,26 @@ def crab_create_job_file_factory(self, **kwargs):
"""
Hook to configure how the underlying job file factory is instantiated and configured.
"""
# get the file factory cls
factory_cls = self.crab_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.crab_job_file_factory_defaults, kwargs)
return self.crab_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "crab_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def crab_job_config(self, config, submit_jobs):
"""
Expand Down
22 changes: 21 additions & 1 deletion law/contrib/glite/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
from abc import abstractmethod
from collections import OrderedDict

import six

import law
from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile, DeprecatedInputFiles
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -231,9 +234,26 @@ def glite_job_file_factory_cls(self):
return GLiteJobFileFactory

def glite_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.glite_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.glite_job_file_factory_defaults, kwargs)
return self.glite_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "glite_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def glite_job_config(self, config, job_num, branches):
return config
Expand Down
21 changes: 20 additions & 1 deletion law/contrib/htcondor/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from collections import OrderedDict

import luigi
import six

from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -298,9 +300,26 @@ def htcondor_job_file_factory_cls(self):
return HTCondorJobFileFactory

def htcondor_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.htcondor_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.htcondor_job_file_factory_defaults, kwargs)
return self.htcondor_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "htcondor_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def htcondor_job_config(self, config, job_num, branches):
return config
Expand Down
21 changes: 20 additions & 1 deletion law/contrib/lsf/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from collections import OrderedDict

import luigi
import six

from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile, DeprecatedInputFiles
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -216,9 +218,26 @@ def lsf_job_file_factory_cls(self):
return LSFJobFileFactory

def lsf_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.lsf_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.lsf_job_file_factory_defaults, kwargs)
return self.lsf_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "lsf_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def lsf_job_config(self, config, job_num, branches):
return config
Expand Down
21 changes: 20 additions & 1 deletion law/contrib/slurm/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from collections import OrderedDict

import luigi
import six

from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -212,9 +214,26 @@ def slurm_job_file_factory_cls(self):
return SlurmJobFileFactory

def slurm_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.slurm_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.slurm_job_file_factory_defaults, kwargs)
return self.slurm_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "slurm_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def slurm_job_config(self, config, job_num, branches):
return config
Expand Down
20 changes: 18 additions & 2 deletions law/job/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from law.target.remote.base import RemoteTarget
from law.util import (
colored, make_list, make_tuple, iter_chunks, makedirs, create_hash, empty_context,
create_random_string,
)
from law.logger import get_logger

Expand Down Expand Up @@ -774,7 +775,7 @@ def __init__(self, dir=None, render_variables=None, custom_log_file=None, mkdtem
self.cleanup = cleanup

# when dir ist None, a temporary directory is forced
if not dir:
if not dir and not mkdtemp:
mkdtemp = True

# store the directory, default to the job.job_file_dir config
Expand All @@ -786,7 +787,8 @@ def __init__(self, dir=None, render_variables=None, custom_log_file=None, mkdtem

# check if it should be extended by a temporary dir
if mkdtemp:
self.dir = tempfile.mkdtemp(dir=self.dir)
prefix = mkdtemp if isinstance(mkdtemp, six.string_types) else None
self.dir = tempfile.mkdtemp(dir=self.dir, prefix=prefix)

# store attributes
self.render_variables = render_variables or {}
Expand Down Expand Up @@ -963,6 +965,20 @@ def postfix_fn(m):
with open(dst, "w") as f:
f.write(content)

@classmethod
def _expand_template_path(cls, path, variables=None):
# replace more than three X's with random characters
if "XXX" in path:
repl = lambda m: create_random_string(l=len(m.group(1)))
path = re.sub("(X{3,})", repl, path)

# replace variables
if variables:
for key, value in variables.items():
path = cls.render_string(path, key, value)

return path

def provide_input(self, src, postfix=None, dir=None, render_variables=None,
skip_existing=False):
"""
Expand Down