Skip to content

Commit

Permalink
Merge pull request #185 from riga/feature/readable_job_tmp_dirs
Browse files Browse the repository at this point in the history
Allow job_file_dir_mkdtmp to be path patterns.
  • Loading branch information
riga authored Jul 18, 2024
2 parents be9461c + 36dc0e2 commit 4a05454
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 12 deletions.
11 changes: 7 additions & 4 deletions law.cfg.example
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,13 @@
; Default: "$LAW_JOB_FILE_DIR", "tempfile.gettempdir()"

; job_file_dir_mkdtemp
; Description: A boolean flag that decides if a temporary directory inside job_file_dir should be
; created in which the actual job files are stored. It is recommended to set this option to "True"
; in order to have a clear separation between files created by different job file factories.
; Type: boolean
; Description: A boolean flag or string that decides if a temporary directory inside job_file_dir
; should be created in which the actual job files are stored. It is discouraged to set this option
; to "False" in order to have a clear separation between files created by different job file
; factories. When a string is used it serves as a prefix for name of the temporary directory. More
; than three X's will be replaced with random characters and template variables "{{task_id}}" and
; "{{task_family}}" will be expanded with task values.
; Type: boolean, string
; Default: True

; job_file_dir_cleanup
Expand Down
22 changes: 21 additions & 1 deletion law/contrib/arc/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from abc import abstractmethod
from collections import OrderedDict

import six

from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile, DeprecatedInputFiles
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -211,9 +214,26 @@ def arc_job_file_factory_cls(self):
return ARCJobFileFactory

def arc_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.arc_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.arc_job_file_factory_defaults, kwargs)
return self.arc_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "arc_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def arc_job_config(self, config, job_num, branches):
return config
Expand Down
21 changes: 20 additions & 1 deletion law/contrib/cms/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from abc import abstractmethod
from collections import OrderedDict

import six

import law
from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
Expand Down Expand Up @@ -318,9 +320,26 @@ def crab_create_job_file_factory(self, **kwargs):
"""
Hook to configure how the underlying job file factory is instantiated and configured.
"""
# get the file factory cls
factory_cls = self.crab_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.crab_job_file_factory_defaults, kwargs)
return self.crab_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "crab_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def crab_job_config(self, config, submit_jobs):
"""
Expand Down
22 changes: 21 additions & 1 deletion law/contrib/glite/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
from abc import abstractmethod
from collections import OrderedDict

import six

import law
from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile, DeprecatedInputFiles
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -231,9 +234,26 @@ def glite_job_file_factory_cls(self):
return GLiteJobFileFactory

def glite_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.glite_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.glite_job_file_factory_defaults, kwargs)
return self.glite_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "glite_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def glite_job_config(self, config, job_num, branches):
return config
Expand Down
21 changes: 20 additions & 1 deletion law/contrib/htcondor/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from collections import OrderedDict

import luigi
import six

from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -298,9 +300,26 @@ def htcondor_job_file_factory_cls(self):
return HTCondorJobFileFactory

def htcondor_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.htcondor_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.htcondor_job_file_factory_defaults, kwargs)
return self.htcondor_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "htcondor_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def htcondor_job_config(self, config, job_num, branches):
return config
Expand Down
21 changes: 20 additions & 1 deletion law/contrib/lsf/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from collections import OrderedDict

import luigi
import six

from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile, DeprecatedInputFiles
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -216,9 +218,26 @@ def lsf_job_file_factory_cls(self):
return LSFJobFileFactory

def lsf_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.lsf_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.lsf_job_file_factory_defaults, kwargs)
return self.lsf_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "lsf_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def lsf_job_config(self, config, job_num, branches):
return config
Expand Down
21 changes: 20 additions & 1 deletion law/contrib/slurm/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from collections import OrderedDict

import luigi
import six

from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy
from law.job.base import JobArguments, JobInputFile
from law.task.proxy import ProxyCommand
Expand Down Expand Up @@ -212,9 +214,26 @@ def slurm_job_file_factory_cls(self):
return SlurmJobFileFactory

def slurm_create_job_file_factory(self, **kwargs):
# get the file factory cls
factory_cls = self.slurm_job_file_factory_cls()

# job file fectory config priority: kwargs > class defaults
kwargs = merge_dicts({}, self.slurm_job_file_factory_defaults, kwargs)
return self.slurm_job_file_factory_cls()(**kwargs)

# default mkdtemp value which might require task-level info
if kwargs.get("mkdtemp") is None:
cfg = Config.instance()
mkdtemp = cfg.get_expanded(
"job",
cfg.find_option("job", "slurm_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
)
if isinstance(mkdtemp, six.string_types) and mkdtemp.lower() not in {"true", "false"}:
kwargs["mkdtemp"] = factory_cls._expand_template_path(
mkdtemp,
variables={"task_id": self.live_task_id, "task_family": self.task_family},
)

return factory_cls(**kwargs)

def slurm_job_config(self, config, job_num, branches):
return config
Expand Down
20 changes: 18 additions & 2 deletions law/job/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from law.target.remote.base import RemoteTarget
from law.util import (
colored, make_list, make_tuple, iter_chunks, makedirs, create_hash, empty_context,
create_random_string,
)
from law.logger import get_logger

Expand Down Expand Up @@ -774,7 +775,7 @@ def __init__(self, dir=None, render_variables=None, custom_log_file=None, mkdtem
self.cleanup = cleanup

# when dir ist None, a temporary directory is forced
if not dir:
if not dir and not mkdtemp:
mkdtemp = True

# store the directory, default to the job.job_file_dir config
Expand All @@ -786,7 +787,8 @@ def __init__(self, dir=None, render_variables=None, custom_log_file=None, mkdtem

# check if it should be extended by a temporary dir
if mkdtemp:
self.dir = tempfile.mkdtemp(dir=self.dir)
prefix = mkdtemp if isinstance(mkdtemp, six.string_types) else None
self.dir = tempfile.mkdtemp(dir=self.dir, prefix=prefix)

# store attributes
self.render_variables = render_variables or {}
Expand Down Expand Up @@ -963,6 +965,20 @@ def postfix_fn(m):
with open(dst, "w") as f:
f.write(content)

@classmethod
def _expand_template_path(cls, path, variables=None):
# replace more than three X's with random characters
if "XXX" in path:
repl = lambda m: create_random_string(l=len(m.group(1)))
path = re.sub("(X{3,})", repl, path)

# replace variables
if variables:
for key, value in variables.items():
path = cls.render_string(path, key, value)

return path

def provide_input(self, src, postfix=None, dir=None, render_variables=None,
skip_existing=False):
"""
Expand Down

0 comments on commit 4a05454

Please sign in to comment.