Extend LXCFS integration #1072

Merged 6 commits on Aug 19, 2024
9 changes: 5 additions & 4 deletions benchexec/baseexecutor.py
@@ -80,9 +80,10 @@ def _start_execution(
         @param child_setup_fn a function without parameters that is called in the child process
             before the tool is started
         @param parent_cleanup_fn a function that is called in the parent process
-            immediately after the tool terminated, with three parameters:
+            immediately after the tool terminated, with four parameters:
             the result of parent_setup_fn, the result of the executed process as ProcessExitCode,
-            and the base path for looking up files as parameter values
+            the base path for looking up files as parameter values,
+            and the cgroup of the tool
         @return: a tuple of PID of process and a blocking function, which waits for the process
             and a triple of the exit code and the resource usage of the process
             and the result of parent_cleanup_fn (do not use os.wait)
@@ -125,11 +126,11 @@ def wait_and_get_result():
             exitcode, ru_child = self._wait_for_process(p.pid, args[0])

             parent_cleanup = parent_cleanup_fn(
-                parent_setup, util.ProcessExitCode.from_raw(exitcode), ""
+                parent_setup, util.ProcessExitCode.from_raw(exitcode), "", cgroups
             )
             return exitcode, ru_child, parent_cleanup

-        return p.pid, wait_and_get_result
+        return p.pid, cgroups, wait_and_get_result

     def _wait_for_process(self, pid, name):
         """Wait for the given process to terminate.
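The interface change above means every caller of _start_execution now receives the tool's cgroup and must accept it in its cleanup callback. A minimal sketch of the updated callback shape (illustrative only, not code from this PR):

    import time

    def parent_setup_fn():
        # runs in the parent immediately before the tool starts
        return time.monotonic()

    def parent_cleanup_fn(parent_setup, exit_code, base_path, tool_cgroups):
        # runs in the parent immediately after the tool terminated;
        # tool_cgroups is the new fourth parameter (the cgroup of the tool),
        # usable e.g. for killing leftover processes of the run
        return time.monotonic() - parent_setup

    # _start_execution now returns a triple instead of a pair:
    #   pid, cgroups, wait_fn = self._start_execution(..., parent_cleanup_fn=parent_cleanup_fn)
    #   exitcode, ru_child, cleanup_result = wait_fn()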
4 changes: 4 additions & 0 deletions benchexec/cgroups.py
@@ -170,6 +170,10 @@ def handle_errors(self, critical_cgroups):
     def create_fresh_child_cgroup(self, subsystems):
         pass

+    @abstractmethod
+    def create_fresh_child_cgroup_for_delegation(self):
+        pass
+
     @abstractmethod
     def add_task(self, pid):
         pass
16 changes: 14 additions & 2 deletions benchexec/cgroupsv1.py
@@ -357,7 +357,7 @@ def get_group_name(gid):
         else:
             sys.exit(_ERROR_MSG_OTHER)  # e.g., subsystem not mounted

-    def create_fresh_child_cgroup(self, subsystems):
+    def create_fresh_child_cgroup(self, subsystems, prefix=CGROUP_NAME_PREFIX):
         """
         Create child cgroups of the current cgroup for at least the given subsystems.
         @return: A Cgroup instance representing the new child cgroup(s).
@@ -374,7 +374,7 @@ def create_fresh_child_cgroup(self, subsystems):
                 ]
                 continue

-            cgroup = tempfile.mkdtemp(prefix=CGROUP_NAME_PREFIX, dir=parentCgroup)
+            cgroup = tempfile.mkdtemp(prefix=prefix, dir=parentCgroup)
             createdCgroupsPerSubsystem[subsystem] = cgroup
             createdCgroupsPerParent[parentCgroup] = cgroup
@@ -395,6 +395,18 @@ def copy_parent_to_child(name):

         return CgroupsV1(createdCgroupsPerSubsystem)

+    def create_fresh_child_cgroup_for_delegation(self, prefix="delegate_"):
+        """
+        Create a child cgroup with all controllers.
+        On cgroupsv1 there is no difference to a regular child cgroup.
+        """
+        child_cgroup = self.create_fresh_child_cgroup(self.subsystems.keys(), prefix)
+        assert (
+            self.subsystems.keys() == child_cgroup.subsystems.keys()
+        ), "delegation failed for at least one controller"
+
+        return child_cgroup
+
     def add_task(self, pid):
         """
         Add a process to the cgroups represented by this instance.
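For readers unfamiliar with cgroupsv1: a fresh child cgroup is nothing more than a new directory under the parent's cgroupfs directory, which is exactly what the mkdtemp call above creates. A stdlib-only sketch of the mechanism (not BenchExec's API; actually doing this requires root and a mounted v1 hierarchy):

    import os
    import tempfile

    def create_child_cgroup(parent_cgroup_dir, prefix="delegate_"):
        # the kernel treats any new directory under a cgroupfs mount as a
        # cgroup and populates it with the controller's files
        return tempfile.mkdtemp(prefix=prefix, dir=parent_cgroup_dir)

    def add_task(cgroup_dir, pid):
        # writing a PID to the "tasks" file moves that process into the cgroup
        with open(os.path.join(cgroup_dir, "tasks"), "w") as f:
            f.write(str(pid))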
13 changes: 12 additions & 1 deletion benchexec/container.py
@@ -107,7 +107,9 @@
 DIR_MODES = [DIR_HIDDEN, DIR_READ_ONLY, DIR_OVERLAY, DIR_FULL_ACCESS]
 """modes how a directory can be mounted in the container"""

-LXCFS_PROC_DIR = b"/var/lib/lxcfs/proc"
+LXCFS_BASE_DIR = b"/var/lib/lxcfs"
+LXCFS_PROC_DIR = LXCFS_BASE_DIR + b"/proc"
+SYS_CPU_DIR = b"/sys/devices/system/cpu"

 _CLONE_NESTED_CALLBACK = ctypes.CFUNCTYPE(ctypes.c_int)
 """Type for callback of execute_in_namespace, nested in our primary callback."""
@@ -968,6 +970,15 @@ def setup_container_system_config(basedir, mountdir, dir_modes):
         {"h": CONTAINER_HOME, "p": os.path.dirname(CONTAINER_HOME)},
     )

+    # Virtualize CPU info with LXCFS if directory is not hidden nor full-access
+    if (
+        os.access(LXCFS_BASE_DIR + SYS_CPU_DIR, os.R_OK)
+        and determine_directory_mode(dir_modes, SYS_CPU_DIR) == DIR_READ_ONLY
+    ):
+        make_bind_mount(
+            LXCFS_BASE_DIR + SYS_CPU_DIR, mountdir + SYS_CPU_DIR, private=True
+        )
+

 def is_container_system_config_file(file):
     """Determine whether a given file is one of the files created by
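LXCFS serves FUSE-virtualized variants of these files under /var/lib/lxcfs, tailored to the reading process's cpuset cgroup, so bind-mounting them over /sys/devices/system/cpu makes the container see only its own cores. A quick way to check what LXCFS would report (a sketch assuming a running LXCFS that virtualizes the sys tree, as the code above requires):

    import os

    LXCFS_ONLINE = "/var/lib/lxcfs/sys/devices/system/cpu/online"

    if os.access(LXCFS_ONLINE, os.R_OK):
        with open(LXCFS_ONLINE) as f:
            # the CPU list as seen from the caller's cpuset, e.g. "0-3"
            print(f.read().strip())
    else:
        print("LXCFS sys virtualization not available")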
43 changes: 28 additions & 15 deletions benchexec/containerexecutor.py
@@ -388,8 +388,8 @@ def is_accessible(path):
         # container, but we do not want a warning per run.
         if not is_accessible(container.LXCFS_PROC_DIR):
             logging.info(
-                "LXCFS is not available,"
-                " some host information like the uptime leaks into the container."
+                "LXCFS is not available, some host information like the uptime"
+                " and the total number of CPU cores leaks into the container."
             )

         if not NATIVE_CLONE_CALLBACK_SUPPORTED:
@@ -457,13 +457,14 @@ def execute_run(
             cgroups = self._cgroups.create_fresh_child_cgroup(
                 self._cgroups.subsystems.keys()
             )
-        pid = None
+        tool_pid = None
+        tool_cgroups = None
         returnvalue = 0

         logging.debug("Starting process.")

         try:
-            pid, result_fn = self._start_execution(
+            tool_pid, tool_cgroups, result_fn = self._start_execution(
                 args=args,
                 stdin=None,
                 stdout=None,
@@ -481,7 +482,7 @@
             )

             with self.SUB_PROCESS_PIDS_LOCK:
-                self.SUB_PROCESS_PIDS.add(pid)
+                self.SUB_PROCESS_PIDS.add(tool_pid)

             # wait until process has terminated
             returnvalue, unused_ru_child, unused = result_fn()
@@ -491,7 +492,7 @@
             logging.debug("Process terminated, exit code %s.", returnvalue)

             with self.SUB_PROCESS_PIDS_LOCK:
-                self.SUB_PROCESS_PIDS.discard(pid)
+                self.SUB_PROCESS_PIDS.discard(tool_pid)

         if temp_dir is not None:
             logging.debug("Cleaning up temporary directory.")
@@ -969,15 +970,24 @@ def check_child_exit_code():
                 child_pid,
             )

-            # cgroups is the cgroups where we configure limits.
-            # So for isolation, we need to create a child cgroup that becomes the root
-            # of the cgroup ns, such that the limit settings are not accessible in the
-            # container and cannot be changed.
-            if use_cgroup_ns:
-                cgroups = cgroups.create_fresh_child_cgroup_for_delegation()
+            # cgroups is the cgroup where we configure limits.
+            # We add another layer of cgroups below it, for two reasons:
+            # - We want to move our child process (the init process of the container)
+            #   into a cgroups where the limits apply because LXCFS reports the limits
+            #   of the init process of the container.
+            # - On cgroupsv2 we want to move the grandchild process (the started tool)
+            #   into a cgroup that becomes the root of the cgroup ns,
+            #   such that no other cgroup (in particular the one with the limits)
+            #   is accessible in the container and the limits cannot be changed
+            #   from within the container.
+            child_cgroup = cgroups.create_fresh_child_cgroup(
+                cgroups.subsystems.keys(), prefix="init_"
+            )
+            child_cgroup.add_task(child_pid)
+            grandchild_cgroups = cgroups.create_fresh_child_cgroup_for_delegation()

             # start measurements
-            cgroups.add_task(grandchild_pid)
+            grandchild_cgroups.add_task(grandchild_pid)
             parent_setup = parent_setup_fn()

             # Signal grandchild that setup is finished
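The second reason above relies on a property of cgroup namespaces: at unshare time, the calling process's current cgroup becomes the root of the new namespace, so all ancestor cgroups (including the one holding the limits) disappear from view. A minimal sketch of that mechanism (Linux-only, needs suitable privileges; illustrative, not this PR's implementation):

    import ctypes
    import os

    CLONE_NEWCGROUP = 0x02000000  # from <linux/sched.h>
    libc = ctypes.CDLL(None, use_errno=True)

    def enter_cgroup_and_unshare(cgroup_dir):
        # first move ourselves into the delegated cgroup ...
        with open(os.path.join(cgroup_dir, "cgroup.procs"), "w") as f:
            f.write(str(os.getpid()))
        # ... then make it the root of a fresh cgroup namespace
        if libc.unshare(CLONE_NEWCGROUP) != 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        # /proc/self/cgroup now shows "0::/" on cgroupsv2:
        # the cgroup with the limits is no longer visible from inside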
@@ -1015,7 +1025,10 @@ def wait_for_grandchild():

                 base_path = f"/proc/{child_pid}/root"
                 parent_cleanup = parent_cleanup_fn(
-                    parent_setup, util.ProcessExitCode.from_raw(exitcode), base_path
+                    parent_setup,
+                    util.ProcessExitCode.from_raw(exitcode),
+                    base_path,
+                    grandchild_cgroups,
                 )

                 if result_files_patterns:
@@ -1032,7 +1045,7 @@

             return exitcode, ru_child, parent_cleanup

-        return grandchild_pid, wait_for_grandchild
+        return grandchild_pid, grandchild_cgroups, wait_for_grandchild

     def _setup_container_filesystem(self, temp_dir, output_dir, memlimit, memory_nodes):
         """Setup the filesystem layout in the container.
35 changes: 23 additions & 12 deletions benchexec/runexecutor.py
@@ -831,7 +831,7 @@ def preParent():
             walltime_before = time.monotonic()
             return starttime, walltime_before

-        def postParent(preParent_result, exit_code, base_path):
+        def postParent(preParent_result, exit_code, base_path, tool_cgroups):
             """Cleanup that is executed in the parent process immediately after the actual tool terminated."""
             # finish measurements
             starttime, walltime_before = preParent_result
@@ -846,8 +846,8 @@ def postParent(preParent_result, exit_code, base_path):
             # process existed, and killing via cgroups prevents this.
             # But if we do not have freezer, it is safer to just let all processes run
             # until the container is killed.
-            if cgroups.FREEZE in cgroups:
-                cgroups.kill_all_tasks()
+            if tool_cgroups.FREEZE in tool_cgroups:
+                tool_cgroups.kill_all_tasks()

             # For a similar reason, we cancel all limits. Otherwise a run could have
             # terminationreason=walltime because copying output files took a long time.
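For context, killing via the v1 freezer avoids the PID-reuse race mentioned above because frozen tasks can neither exit nor spawn children while the kill signals are being queued. A rough sketch of the general mechanism (not BenchExec's kill_all_tasks):

    import os
    import signal

    def kill_all_frozen(freezer_cgroup_dir):
        state = os.path.join(freezer_cgroup_dir, "freezer.state")
        with open(state, "w") as f:
            f.write("FROZEN")  # stop all tasks so the PID list stays stable
        # (a robust version would poll freezer.state until it reads FROZEN)
        with open(os.path.join(freezer_cgroup_dir, "tasks")) as f:
            pids = [int(line) for line in f]
        for pid in pids:
            os.kill(pid, signal.SIGKILL)  # delivered once tasks are thawed
        with open(state, "w") as f:
            f.write("THAWED")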
@@ -882,7 +882,8 @@ def preSubprocess():
                 error_filename, args, write_header=write_header
             )

-        pid = None
+        tool_pid = None
+        tool_cgroups = None
         returnvalue = 0
         ru_child = None
         self._termination_reason = None
@@ -894,7 +895,7 @@
         logging.debug("Starting process.")

         try:
-            pid, result_fn = self._start_execution(
+            tool_pid, tool_cgroups, result_fn = self._start_execution(
                 args=args,
                 stdin=stdin,
                 stdout=outputFile,
@@ -912,14 +913,21 @@
             )

             with self.SUB_PROCESS_PIDS_LOCK:
-                self.SUB_PROCESS_PIDS.add(pid)
+                self.SUB_PROCESS_PIDS.add(tool_pid)

             timelimitThread = self._setup_cgroup_time_limit(
-                hardtimelimit, softtimelimit, walltimelimit, cgroups, cores, pid
+                hardtimelimit,
+                softtimelimit,
+                walltimelimit,
+                tool_cgroups,
+                cores,
+                tool_pid,
             )
-            oomThread = self._setup_cgroup_memory_limit_thread(memlimit, cgroups, pid)
+            oomThread = self._setup_cgroup_memory_limit_thread(
+                memlimit, tool_cgroups, tool_pid
+            )
             file_hierarchy_limit_thread = self._setup_file_hierarchy_limit(
-                files_count_limit, files_size_limit, temp_dir, cgroups, pid
+                files_count_limit, files_size_limit, temp_dir, tool_cgroups, tool_pid
             )

             # wait until process has terminated
@@ -932,7 +940,7 @@
             logging.debug("Process terminated, exit code %s.", returnvalue)

             with self.SUB_PROCESS_PIDS_LOCK:
-                self.SUB_PROCESS_PIDS.discard(pid)
+                self.SUB_PROCESS_PIDS.discard(tool_pid)

             if timelimitThread:
                 timelimitThread.cancel()
@@ -945,16 +953,19 @@
             # Make sure to kill all processes if there are still some
             # (needs to come early to avoid accumulating more CPU time)
-            cgroups.kill_all_tasks()
+            if tool_cgroups:
+                tool_cgroups.kill_all_tasks()

             # normally subprocess closes file, we do this again after all tasks terminated
             outputFile.close()
             if errorFile is not outputFile:
                 errorFile.close()

             # measurements are not relevant in case of failure, but need to come before cgroup cleanup
-            self._get_cgroup_measurements(cgroups, ru_child, result)
+            if tool_cgroups:
+                self._get_cgroup_measurements(tool_cgroups, ru_child, result)
             logging.debug("Cleaning up cgroups.")
+            cgroups.kill_all_tasks()  # currently necessary for removing child cgroups
             cgroups.remove()

         self._cleanup_temp_dir(temp_dir)
23 changes: 23 additions & 0 deletions benchexec/test_runexecutor.py
@@ -43,6 +43,7 @@ def setUpClass(cls):
         cls.echo = shutil.which("echo") or "/bin/echo"
         cls.sleep = shutil.which("sleep") or "/bin/sleep"
         cls.cat = shutil.which("cat") or "/bin/cat"
+        cls.grep = shutil.which("grep") or "/bin/grep"

     def setUp(self, *args, **kwargs):
         with self.skip_if_logs(
@@ -1151,6 +1152,28 @@ def test_path_with_space(self):
         finally:
             shutil.rmtree(temp_dir)

+    def test_cpuinfo_with_lxcfs(self):
+        if not os.path.exists("/var/lib/lxcfs/proc"):
+            self.skipTest("missing lxcfs")
+        result, output = self.execute_run(
+            self.grep, "^processor", "/proc/cpuinfo", cores=[0]
+        )
+        self.check_result_keys(result)
+        self.check_exitcode(result, 0, "exit code for reading cpuinfo is not zero")
+        cpus = [int(line.split()[2]) for line in output if line.startswith("processor")]
+        self.assertListEqual(cpus, [0], "Unexpected CPU cores visible in container")
+
+    def test_sys_cpu_with_lxcfs(self):
+        if not os.path.exists("/var/lib/lxcfs/proc"):
+            self.skipTest("missing lxcfs")
+        result, output = self.execute_run(
+            self.cat, "/sys/devices/system/cpu/online", cores=[0]
+        )
+        self.check_result_keys(result)
+        self.check_exitcode(result, 0, "exit code for reading online CPUs is not zero")
+        cpus = util.parse_int_list(output[-1])
+        self.assertListEqual(cpus, [0], "Unexpected CPU cores online in container")
+
     def test_uptime_with_lxcfs(self):
         if not os.path.exists("/var/lib/lxcfs/proc"):
             self.skipTest("missing lxcfs")
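The new tests skip themselves when LXCFS is not installed. With LXCFS running, they can be selected by name, e.g. with a hypothetical invocation like:

    python -m pytest benchexec/test_runexecutor.py -k lxcfs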