From ad7fe5a4421b7609bbbc8d5bf9c2b211a4684fbb Mon Sep 17 00:00:00 2001 From: Dmitriy Gertsog Date: Fri, 18 Oct 2024 19:05:21 +0300 Subject: [PATCH] tests: fix flaky integration test_log Closes #975 --- test/integration/log/test_log.py | 383 +++++++++-------------- test/integration/running/test_running.py | 42 +-- test/utils.py | 186 ++++++++++- 3 files changed, 326 insertions(+), 285 deletions(-) diff --git a/test/integration/log/test_log.py b/test/integration/log/test_log.py index de39459b9..90c493921 100644 --- a/test/integration/log/test_log.py +++ b/test/integration/log/test_log.py @@ -1,311 +1,212 @@ -import os import shutil -import subprocess import time +from pathlib import Path import pytest -from utils import config_name, wait_for_lines_in_output +from utils import ProcessTextPipe, config_name, pipe_wait_all + + +def log_lines(file: Path, start: int = 0, stop: int = 0) -> None: + with open(file, "w") as f: + f.write("\n".join((f"line {j}" for j in range(start, stop)))) @pytest.fixture(scope="function") -def mock_env_dir(tmp_path): - with open(os.path.join(tmp_path, config_name), 'w') as f: - f.write('env:\n instances_enabled: ie\n') +def mock_env_dir(tmp_path: Path) -> Path: + assert tmp_path.is_dir(), "Error: pytest does not supply a temp directory" + with open(tmp_path / config_name, "w") as f: + f.write("env:\n instances_enabled: ie\n") for app_n in range(2): - app = os.path.join(tmp_path, 'ie', f'app{app_n}') - os.makedirs(app, 0o755) - with open(os.path.join(app, 'instances.yml'), 'w') as f: + app = tmp_path / f"ie/app{app_n}" + app.mkdir(0o755, parents=True) + with open(app / "instances.yml", "w") as f: for i in range(4): - f.write(f'inst{i}:\n') - os.makedirs(os.path.join(app, 'var', 'log', f'inst{i}'), 0o755) + f.write(f"inst{i}:\n") + (app / f"var/log/inst{i}").mkdir(0o755, parents=True) - with open(os.path.join(app, 'init.lua'), 'w') as f: - f.write('') + with open(app / "init.lua", "w") as f: + f.write("") for i in range(3): # Skip log for instance 4. - with open(os.path.join(app, 'var', 'log', f'inst{i}', 'tt.log'), 'w') as f: - f.writelines([f'line {j}\n' for j in range(20)]) + log_lines(app / f"var/log/inst{i}/tt.log", stop=20) return tmp_path def test_log_output_default_run(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) - - assert process.wait(10) == 0 - output = process.stdout.read() - + expecting_lines = [] for inst_n in range(3): - assert '\n'.join([f'app0:inst{inst_n}: line {i}' for i in range(10, 20)]) in output - assert '\n'.join([f'app1:inst{inst_n}: line {i}' for i in range(10, 20)]) in output + for i in range(10, 20): + expecting_lines.append(f"app0:inst{inst_n}: line {i}") + expecting_lines.append(f"app1:inst{inst_n}: line {i}") + with ProcessTextPipe((tt_cmd, "log"), mock_env_dir) as process: + output = pipe_wait_all(process, expecting_lines) + assert process.wait(2) == 0, "Exit status not a success" - assert 'app0:inst3' not in output - assert 'app1:inst3' not in output + assert "app0:inst3" not in output + assert "app1:inst3" not in output def test_log_limit_lines_count(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', '-n', '3'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) - - assert process.wait(10) == 0 - output = process.stdout.read() - + expecting_lines = [] for inst_n in range(3): - assert '\n'.join([f'app0:inst{inst_n}: line {i}' for i in range(17, 20)]) in output - assert '\n'.join([f'app1:inst{inst_n}: line {i}' for i in range(17, 20)]) in output + for i in range(17, 20): + expecting_lines.append(f"app0:inst{inst_n}: line {i}") + expecting_lines.append(f"app1:inst{inst_n}: line {i}") + with ProcessTextPipe((tt_cmd, "log", "-n", "3"), mock_env_dir) as process: + pipe_wait_all(process, expecting_lines) + assert process.wait(2) == 0, "Exit status not a success" def test_log_more_lines(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', '-n', '300'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) - - assert process.wait(10) == 0 - output = process.stdout.read() - + expecting_lines = [] for inst_n in range(3): - assert '\n'.join([f'app0:inst{inst_n}: line {i}' for i in range(0, 20)]) in output - assert '\n'.join([f'app1:inst{inst_n}: line {i}' for i in range(0, 20)]) in output - + for i in range(0, 20): + expecting_lines.append(f"app0:inst{inst_n}: line {i}") + expecting_lines.append(f"app1:inst{inst_n}: line {i}") -def test_log_want_zero(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', '-n', '0'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) + with ProcessTextPipe((tt_cmd, "log", "-n", "300"), mock_env_dir) as process: + pipe_wait_all(process, expecting_lines) + assert process.wait(2) == 0, "Exit status not a success" - assert process.wait(10) == 0 - output = process.stdout.readlines() - assert len(output) == 0 +def test_log_want_zero(tt_cmd, mock_env_dir): + with ProcessTextPipe((tt_cmd, "log", "-n", "0"), mock_env_dir) as process: + returncode, output = process.Run(timeout=2)[0:2] + assert returncode == 0, "Exit status not a success" + assert len(output) == 0 def test_log_specific_instance(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', 'app0:inst1', '-n', '3'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) - - assert process.wait(10) == 0 - output = process.stdout.read() + with ProcessTextPipe( + (tt_cmd, "log", "app0:inst1", "-n", "3"), mock_env_dir + ) as process: + output = pipe_wait_all( + process, + (f"app0:inst1: line {i}" for i in range(17, 20)), + ) - assert '\n'.join([f'app0:inst1: line {i}' for i in range(17, 20)]) in output - - assert 'app0:inst0' not in output and 'app0:inst2' not in output - assert 'app1' not in output + assert "app0:inst0" not in output + assert "app0:inst2" not in output + assert "app1" not in output + assert process.wait(2) == 0, "Exit status not a success" def test_log_specific_app(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', 'app1'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) - - assert process.wait(10) == 0 - output = process.stdout.read() - + expecting_lines = [] for inst_n in range(3): - assert '\n'.join([f'app1:inst{inst_n}: line {i}' for i in range(10, 20)]) in output + for i in range(10, 20): + expecting_lines.append(f"app1:inst{inst_n}: line {i}") - assert 'app0' not in output + with ProcessTextPipe((tt_cmd, "log", "app1"), mock_env_dir) as process: + output = pipe_wait_all(process, expecting_lines) + assert "app0" not in output + assert process.wait(2) == 0, "Exit status not a success" def test_log_negative_lines_num(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', '-n', '-10'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) - - assert process.wait(10) != 0 - output = process.stdout.read() - - assert 'negative' in output + with ProcessTextPipe((tt_cmd, "log", "-n", "-10"), mock_env_dir) as process: + pipe_wait_all(process, "negative") + assert process.wait(2) != 0, "Exit status should be error code" def test_log_no_app(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', 'no_app'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) - - assert process.wait(10) != 0 - output = process.stdout.read() - - assert 'can\'t collect instance information for no_app' in output + with ProcessTextPipe((tt_cmd, "log", "no_app"), mock_env_dir) as process: + pipe_wait_all(process, "can't collect instance information for no_app") + assert process.returncode != 0 def test_log_no_inst(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', 'app0:inst4'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) - - assert process.wait(10) != 0 - output = process.stdout.read() - - assert 'app0:inst4: instance(s) not found' in output + with ProcessTextPipe((tt_cmd, "log", "app0:inst4"), mock_env_dir) as process: + pipe_wait_all(process, "app0:inst4: instance(s) not found") + assert process.returncode != 0 def test_log_output_default_follow(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', '-f'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True, - ) - - output = wait_for_lines_in_output(process.stdout, - ['app0:inst0: line 19', 'app1:inst2: line 19', - 'app0:inst1: line 19', 'app1:inst1: line 19']) + with ProcessTextPipe((tt_cmd, "log", "-f"), mock_env_dir) as process: + output = pipe_wait_all( + process, + "app0:inst0: line 19", + "app1:inst2: line 19", + "app0:inst1: line 19", + "app1:inst1: line 19", + ) - with open(os.path.join(mock_env_dir, 'ie', 'app0', 'var', 'log', 'inst0', 'tt.log'), 'w') as f: - f.writelines([f'line {i}\n' for i in range(20, 23)]) + log_lines(mock_env_dir / "ie/app0/var/log/inst0/tt.log", 20, 23) + log_lines(mock_env_dir / "ie/app1/var/log/inst2/tt.log", 20, 23) + output += pipe_wait_all(process, "app1:inst2: line 22", "app0:inst0: line 22") - with open(os.path.join(mock_env_dir, 'ie', 'app1', 'var', 'log', 'inst2', 'tt.log'), 'w') as f: - f.writelines([f'line {i}\n' for i in range(20, 23)]) + for i in range(10, 23): + assert f"app0:inst0: line {i}" in output + assert f"app1:inst2: line {i}" in output - output += wait_for_lines_in_output(process.stdout, - ['app1:inst2: line 22', 'app0:inst0: line 22']) - - process.terminate() - for i in range(10, 23): - assert f'app0:inst0: line {i}' in output - assert f'app1:inst2: line {i}' in output - - for i in range(10, 20): - assert f'app0:inst1: line {i}' in output - assert f'app1:inst1: line {i}' in output + for i in range(10, 20): + assert f"app0:inst1: line {i}" in output + assert f"app1:inst1: line {i}" in output def test_log_output_default_follow_want_zero_last(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', '-f', '-n', '0'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True, - universal_newlines=True, - bufsize=1 - ) - - time.sleep(1) - - with open(os.path.join(mock_env_dir, 'ie', 'app0', 'var', 'log', 'inst0', 'tt.log'), 'w') as f: - f.writelines([f'line {i}\n' for i in range(20, 23)]) - - with open(os.path.join(mock_env_dir, 'ie', 'app1', 'var', 'log', 'inst2', 'tt.log'), 'w') as f: - f.writelines([f'line {i}\n' for i in range(20, 23)]) - - output = wait_for_lines_in_output(process.stdout, - ['app1:inst2: line 22', 'app0:inst0: line 22']) + with ProcessTextPipe((tt_cmd, "log", "-f", "-n", "0"), mock_env_dir) as process: + time.sleep(1) - process.terminate() - for i in range(20, 23): - assert f'app0:inst0: line {i}' in output - assert f'app1:inst2: line {i}' in output + log_lines(mock_env_dir / "ie/app0/var/log/inst0/tt.log", 20, 23) + log_lines(mock_env_dir / "ie/app1/var/log/inst2/tt.log", 20, 23) - assert 'app0:inst1' not in output - assert 'app0:inst2' not in output - assert 'app1:inst0' not in output + output = pipe_wait_all( + process, + (f"app0:inst0: line {i}" for i in range(20, 23)), + (f"app1:inst2: line {i}" for i in range(20, 23)), + ) + assert "app0:inst1" not in output + assert "app0:inst2" not in output + assert "app1:inst0" not in output -def test_log_dir_removed_after_follow(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', '-f'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True, - ) - wait_for_lines_in_output(process.stdout, - ['app0:inst0: line 19', 'app1:inst2: line 19', - 'app0:inst1: line 19', 'app1:inst1: line 19']) +def test_log_dir_removed_after_follow(tt_cmd, mock_env_dir: Path): + with ProcessTextPipe((tt_cmd, "log", "-f"), mock_env_dir) as process: + pipe_wait_all( + process, + "app0:inst0: line 19", + "app1:inst2: line 19", + "app0:inst1: line 19", + "app1:inst1: line 19", + ) - var_dir = os.path.join(mock_env_dir, 'ie') - assert os.path.exists(var_dir) - shutil.rmtree(var_dir) + with mock_env_dir / "ie" as dir: + assert dir.exists() + shutil.rmtree(dir) - assert process.wait(2) == 0 - assert "Failed to detect creation of" in process.stdout.read() + pipe_wait_all(process, "Failed to detect creation of", timeout=2) + assert process.wait(2) == 0, "Exit status not a success" # There are two apps in this test: app0 and app1. After removing app0 dirs, # tt log -f is still able to monitor the app1 log files, so there should be no issue. -def test_log_dir_partially_removed_after_follow(tt_cmd, mock_env_dir): - cmd = [tt_cmd, 'log', '-f'] - process = subprocess.Popen( - cmd, - cwd=mock_env_dir, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True, - ) - - wait_for_lines_in_output(process.stdout, - ['app0:inst0: line 19', 'app1:inst2: line 19', - 'app0:inst1: line 19', 'app1:inst1: line 19']) - - # Remove one app log dir. - var_dir = os.path.join(mock_env_dir, 'ie', 'app0', 'var', 'log') - assert os.path.exists(var_dir) - shutil.rmtree(var_dir) - - wait_for_lines_in_output(process.stdout, ['Failed to detect creation of']) - assert process.poll() is None # Still running. - - # Remove app1 log dir. - var_dir = os.path.join(mock_env_dir, 'ie', 'app1') - assert os.path.exists(var_dir) - shutil.rmtree(var_dir) - - assert process.wait(2) == 0 - assert "Failed to detect creation of" in process.stdout.read() +def test_log_dir_partially_removed_after_follow(tt_cmd, mock_env_dir: Path): + with ProcessTextPipe((tt_cmd, "log", "-f"), mock_env_dir) as process: + pipe_wait_all( + process, + "app0:inst0: line 19", + "app1:inst2: line 19", + "app0:inst1: line 19", + "app1:inst1: line 19", + ) + + # Remove one app0 log dir. + with mock_env_dir / "ie/app0/var/log" as dir: + assert dir.exists() + shutil.rmtree(dir) + + pipe_wait_all(process, "Failed to detect creation of") + assert process.is_running # Still running for monitoring app1. + + # Remove app1 log dir. + with mock_env_dir / "ie/app1" as dir: + assert dir.exists() + shutil.rmtree(dir) + + pipe_wait_all(process, "Failed to detect creation of") + assert process.wait(2) == 0, "Exit status not a success" diff --git a/test/integration/running/test_running.py b/test/integration/running/test_running.py index fb067c9e9..2f58d0829 100644 --- a/test/integration/running/test_running.py +++ b/test/integration/running/test_running.py @@ -10,11 +10,12 @@ import yaml from retry import retry -from utils import (config_name, control_socket, extract_status, initial_snap, - initial_xlog, kill_child_process, lib_path, log_file, - log_path, pid_file, run_command_and_get_output, run_path, - wait_file, wait_for_lines_in_output, wait_instance_start, - wait_instance_stop, wait_string_in_file) +from utils import (ProcessTextPipe, config_name, control_socket, + extract_status, initial_snap, initial_xlog, + kill_child_process, lib_path, log_file, log_path, pid_file, + pipe_wait_all, run_command_and_get_output, run_path, + wait_file, wait_instance_start, wait_instance_stop, + wait_string_in_file) def test_running_base_functionality(tt_cmd, tmpdir_with_cfg): @@ -935,35 +936,22 @@ def test_start_interactive(tt_cmd, tmp_path): tmp_path /= "multi_inst_app" shutil.copytree(test_app_path_src, tmp_path) - start_cmd = [tt_cmd, "start", "-i"] - instance_process = subprocess.Popen( - start_cmd, - cwd=tmp_path, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True - ) - try: - wait_for_lines_in_output(instance_process.stdout, [ + with ProcessTextPipe((tt_cmd, "start", "-i"), tmp_path) as instance_process: + pipe_wait_all( + instance_process, "multi_inst_app:router custom init file...", "multi_inst_app:router multi_inst_app:router", "multi_inst_app:master multi_inst_app:master", "multi_inst_app:replica multi_inst_app:replica", "multi_inst_app:stateboard unknown instance", - ]) - + ) instance_process.send_signal(signal.SIGTERM) - - wait_for_lines_in_output(instance_process.stdout, [ + pipe_wait_all( + instance_process, "multi_inst_app:router stopped", "multi_inst_app:master stopped", "multi_inst_app:replica stopped", "multi_inst_app:stateboard stopped", - ]) - - # Make sure no log dir created. - assert not (tmp_path / "var" / "log").exists() - - finally: - run_command_and_get_output([tt_cmd, "stop", "--yes"], cwd=tmp_path) - assert instance_process.wait(5) == 0 + line_timeout=5 + ) + assert instance_process.Stop(tt_cmd, "stop", "--yes", timeout=5) == 0 diff --git a/test/utils.py b/test/utils.py index fd78d3f4e..671affee2 100644 --- a/test/utils.py +++ b/test/utils.py @@ -1,13 +1,19 @@ import ipaddress +import logging import os import re import shutil import socket import subprocess import time +from pathlib import Path +from signal import SIGHUP +from threading import Timer +from types import GeneratorType import netifaces import psutil +import pytest import tarantool import yaml from retry import retry @@ -551,25 +557,171 @@ def wait_string_in_file(file, text): assert found -def wait_for_lines_in_output(stdout, expected_lines: list): - output = '' - retries = 10 - while True: - line = stdout.readline() - if line == '': - if retries == 0: - break - time.sleep(0.2) - retries -= 1 +class ProcessTextPipe(subprocess.Popen): + __cwd: str | Path + + def __init__(self, args: list | tuple, work_dir: Path, **kwargs): + self.__cwd = work_dir + command = tuple(str(a) for a in args) + kwargs["cwd"] = self.__cwd + + # Apply default arguments for Popen + kwargs.setdefault("stderr", subprocess.STDOUT) + kwargs.setdefault("stdout", subprocess.PIPE) + kwargs.setdefault("bufsize", 1) # Number in text lines + + # Force to text mode + kwargs["text"] = True + kwargs["universal_newlines"] = True + + super().__init__(command, **kwargs) + + @property + def is_running(self) -> bool: + return self.poll() is None + + def Run(self, input=None, timeout=None) -> tuple: + """ + Executed until the end of the process. + Returns collected output in the tuple, where: + - [0] is return code of process + - [1] is content of `stdout` + - [2] is content of `stderr` + """ + output = self.communicate(input, timeout) + return (self.returncode, *output) + + def Stop(self, *stop_cmd, timeout=3): + """Ensure process is stopped""" + if stop_cmd: + subprocess.run(_make_string_list(*stop_cmd), cwd=self.__cwd) + code = self.poll() + if code is not None: + return code + self.terminate() + try: + return self.wait(timeout) + except subprocess.TimeoutExpired: + pass + except KeyboardInterrupt: + pass + if self.is_running: + self.kill() + return None + + def __enter__(self): + """Calls at begin of `with`""" + assert self.is_running, "The process was not started" + return super().__enter__() + + def __exit__(self, exc_type, value, traceback): + """Calls at end of `with`""" + self.Stop() + super().__exit__(exc_type, value, traceback) + + def __del__(self): + """Calls at end of variable scope (like destructor)""" + self.Stop() + super().__del__() + + +def _make_string_list(*args) -> list[str]: + result: list[str] = [] + for s in args: + if isinstance(s, str): + result.append(s) + elif isinstance(s, (tuple, list)): + result.extend(_make_string_list(*s)) + elif isinstance(s, (GeneratorType, filter)): + result.extend(_make_string_list(*tuple(s))) else: - retries = 10 - output += line - for expected in expected_lines: - if expected in line: - expected_lines.remove(expected) - break + result.append(str(s)) + return result + + +def pipe_wait_all( + process: subprocess.Popen, + *expected_lines, + pipe: str = "stdout", + timeout=0, + line_timeout=1.0, +) -> str: + """ + Scans `stdout` for matches with expected strings. + Returns the entire collected output. + - expected_lines - search strings + - timeout - allows you to set the total time limit for waiting + - line_timeout - limiting the waiting time for each new line + """ + expected = _make_string_list(*expected_lines) + output = "" + line_counter = 0 + break_timeout = "" + total_timer: Timer | None = None + + process_out = getattr(process, pipe) + + assert process_out is not None, "Wrong process pipe to read stdout" - if len(expected_lines) == 0: + def stop_wait(counter: int | None): + nonlocal break_timeout + if counter is None: + break_timeout = "total timeout expecting all matches" + else: + break_timeout = f"timeout waiting next line #{counter}" + process.send_signal(SIGHUP) + + def set_timer() -> Timer: + nonlocal line_counter + line_counter += 1 + timer = Timer(line_timeout, stop_wait, [line_counter]) + timer.name = "wait next line" + timer.start() + return timer + + def check_expected(line) -> int: + nonlocal expected + for match in expected: + if match in line: + expected.remove(match) break + return len(expected) + + if timeout > 0: + total_timer = Timer(timeout, stop_wait, [None]) + total_timer.name = "total wait stdout" + total_timer.start() + + timer = set_timer() + for line in process_out: + if break_timeout: + break + timer.cancel() + timer = set_timer() + output += line + if check_expected(line) == 0: + break + timer.cancel() + if total_timer: + total_timer.cancel() + + if break_timeout != "": + logging.getLogger(__name__).warning( + f"""{break_timeout} output: +>>>>>>>>>> +{output}<<<<<<<<<< +expected={expected} +""" + ) + pytest.fail(break_timeout) + + if len(expected) > 0: + logging.getLogger(__name__).error( + f"""Not all lines was matched: {expected} in: +>>>>>>>>>> +{output}<<<<<<<<<< +""" + ) + pytest.fail("Not all lines was matched") return output