diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml
index 1eafe4e..e2e3365 100644
--- a/.github/workflows/pipeline.yml
+++ b/.github/workflows/pipeline.yml
@@ -18,6 +18,7 @@ jobs:
         python-version: ${{ matrix.python-version }}
     - name: Install dependencies
       run: |
+        pip install -r requirements.txt
         pip install -r requirements-dev.txt
     - name: Lint with flake8
       run: |
@@ -27,7 +28,7 @@
         flake8 . --count --exit-zero --statistics
     - name: Test with pytest
      run: |
-        pytest --cov=./gittrail --cov-report xml --cov-report term-missing gittrail/
+        pytest -v --cov=./gittrail --cov-report xml --cov-report term-missing gittrail/
     - name: Upload coverage
       uses: codecov/codecov-action@v2
       if: matrix.python-version == 3.9
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index c01ab0f..0109b5c 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -21,10 +21,11 @@ jobs:
         python-version: 3.9
     - name: Install dependencies
       run: |
+        pip install -r requirements.txt
         pip install -r requirements-dev.txt
     - name: Test with pytest
       run: |
-        pytest --cov=./gittrail --cov-report term-missing gittrail/
+        pytest -v --cov=./gittrail --cov-report term-missing gittrail/
     - name: Build package
       run: |
         python setup.py sdist bdist_wheel
diff --git a/.pylintrc b/.pylintrc
index 3da0320..51b8e96 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -1,2 +1,2 @@
 [MESSAGES CONTROL]
-disable=C0103,R1711,R0902
+disable=C0103,R1711,R0902,E0401
diff --git a/gittrail/__init__.py b/gittrail/__init__.py
index dc16750..0d01df7 100644
--- a/gittrail/__init__.py
+++ b/gittrail/__init__.py
@@ -4,4 +4,4 @@
 from .core import GitTrail
 from .exceptions import *
 
-__version__ = "0.1.0"
+__version__ = "0.1.1"
diff --git a/gittrail/core.py b/gittrail/core.py
index 8ef07da..49b738b 100644
--- a/gittrail/core.py
+++ b/gittrail/core.py
@@ -8,6 +8,8 @@
 from datetime import datetime
 from typing import Dict, Optional, Set, Union
 
+import filelock
+
 from . import exceptions, utils
 
 _log = logging.getLogger(__file__)
@@ -102,6 +104,7 @@ def __init__(self, repo: str, data: str, *, log_level: int = None, store: str =
             raise FileNotFoundError(f"Data path {self.data} does not exist.")
 
         self._dp_trail = self.data / store
+        self._dp_trail_lock = filelock.FileLock(self._dp_trail / "lock")
         self._git_history = None
         self._session_number = None
         self._session_fp = None
@@ -135,67 +138,100 @@ def __enter__(self):
 
         # Check the audittrail
         self._dp_trail.mkdir(exist_ok=True)
-        self._session_number = _get_session_number(self._dp_trail)
-        self._session_fp = self._dp_trail / f"{self._session_number:04d}.json"
-        self._log_file = self._dp_trail / f"{self._session_number:04d}.log"
-        active_sessions = _get_active_sessions(self._dp_trail)
-        self._files = _drop_active(
-            files=utils.hash_all_files(self.data),
-            drop_logs=active_sessions,
-            drop_meta=active_sessions,
-            store=self._dp_trail.name,
-        )
-        self._check_integrity()
-
-        # All checks succceeded. The session can start.
-        self._attach_log_handler()
-        self._session_start_utc = utils.now()
-        _commit_trail(
-            fp=self._session_fp,
-            commit_id=self._git_history[0],
-            start_utc=self._session_start_utc,
-            end_utc=None,
-            files=self._files,
-        )
+        with self._dp_trail_lock.acquire():
+            self._session_start_utc = utils.now()
+            self._session_number = _get_session_number(self._dp_trail)
+            self._session_fp = self._dp_trail / f"{self._session_number:04d}.json"
+            self._log_file = self._dp_trail / f"{self._session_number:04d}.log"
+            active_sessions = _get_active_sessions(self._dp_trail)
+            self._files = _drop_active(
+                files=utils.hash_all_files(self.data, exclude={self._dp_trail_lock.lock_file}),
+                drop_logs=active_sessions,
+                drop_meta=active_sessions,
+                store=self._dp_trail.name,
+            )
+            self._check_integrity()
+
+            # All checks succceeded. The session can start.
+            self._attach_log_handler()
+            _commit_trail(
+                fp=self._session_fp,
+                commit_id=self._git_history[0],
+                start_utc=self._session_start_utc,
+                end_utc=None,
+                files=self._files,
+            )
         return self
 
     def __exit__(self, *args, **kwargs):
-        self._session_end_utc = utils.now()
-        self._detach_log_handler()
-        active_sessions = _get_active_sessions(self._dp_trail)
-        self._files = _drop_active(
-            files=utils.hash_all_files(self.data),
-            drop_logs=active_sessions - {self._session_number},
-            drop_meta=active_sessions,
-            store=self._dp_trail.name,
-        )
-        _commit_trail(
-            fp=self._session_fp,
-            commit_id=self._git_history[0],
-            start_utc=self._session_start_utc,
-            end_utc=self._session_end_utc,
-            files=self._files,
-        )
+        with self._dp_trail_lock.acquire():
+            self._detach_log_handler()
+            self._session_end_utc = utils.now()
+            active_sessions = _get_active_sessions(self._dp_trail)
+            self._files = _drop_active(
+                files=utils.hash_all_files(self.data, exclude={self._dp_trail_lock.lock_file}),
+                drop_logs=active_sessions - {self._session_number},
+                drop_meta=active_sessions,
+                store=self._dp_trail.name,
+            )
+            _commit_trail(
+                fp=self._session_fp,
+                commit_id=self._git_history[0],
+                start_utc=self._session_start_utc,
+                end_utc=self._session_end_utc,
+                files=self._files,
+            )
         return
 
     def _check_integrity(self):
         """Checks that the current contents of the [data] match the audittrail."""
         _log.debug("Checking integrity of %s", self.data)
 
-        # Read all session files in order to validate that their commit_id is known.
-        files_before = {}
-        for s in range(self._session_number):
-            meta = _meta_read(self._dp_trail / f"{s:04d}.json")
+        # First read all existing session metadata
+        metas = [
+            (s, _meta_read(self._dp_trail / f"{s:04d}.json")) for s in range(self._session_number)
+        ]
+
+        # Sort them by the time of the snapshot and collect active vs. completed separately
+        metas = list(sorted(metas, key=lambda sm: sm[1]["end_utc"] or sm[1]["start_utc"]))
+        metas_active = [(s, m) for s, m in metas if m["end_utc"] is None]
+        metas_ended = [(s, m) for s, m in metas if m["end_utc"] is not None]
+
+        # All commit IDs must be in the git history
+        known_files = {}
+        for s, meta in metas:
             cid = meta["commit_id"]
             if not cid in self._git_history:
                 raise exceptions.UnknownCommitError(
                     f"Audit trail session {s} ran with "
                     f"commit {cid} that's not in the git history."
                 )
-            files_before = meta["files"]
+            known_files.update(meta["files"])
 
-        # Any files missing, compared to the previous session?
-        missing = {fpr: h for fpr, h in files_before.items() if not fpr in self._files}
+        # Some files are expected to be missing from the history
+        expected_extra = set()
+        for s, meta in metas_active:
+            # JSON and LOG files of active sessions are subject to change.
+            expected_extra.add(self._dp_trail / f"{s:04d}.json")
+            expected_extra.add(self._dp_trail / f"{s:04d}.log")
+        if metas_ended and metas_ended[-1] == metas[-1]:
+            # No session began after the last session ended.
+            s, _ = metas_ended[-1]
+            expected_extra.add(self._dp_trail / f"{s:04d}.json")
+        expected_extra = {
+            str(fp.relative_to(self.data)).replace(os.sep, "/") for fp in expected_extra
+        }
+        unexpectedly_known = expected_extra.intersection(known_files)
+        if unexpectedly_known:
+            msg = "\n".join(unexpectedly_known)
+            msg += f"\nKnown: {known_files.keys()}"
+            msg += f"\nExpected to be missing: {expected_extra}"
+            raise exceptions.IntegrityError(
+                f"Some files are, but should not be in the history:\n{msg}"
+            )
+
+        # Any files missing, compared to the historically known files?
+        missing = {fpr: h for fpr, h in known_files.items() if not fpr in self._files}
         if missing:
             _log.warning(
                 "Missing %i files compared to the previous session:\n%s",
@@ -203,19 +239,39 @@ def _check_integrity(self):
                 "\n".join(missing),
             )
 
-        # Any new/changed files? (Except the last sessions audittrail file.)
-        expected_extra = {
-            self._dp_trail / f"{self._session_number - 1:04d}.json",
-        }
-        expected_extra = {
-            str(fp.relative_to(self.data)).replace(os.sep, "/") for fp in expected_extra
+        # Any new files? (Not allowed unless other sessions are currently active.)
+        new = set(self._files) - set(known_files) - expected_extra
+        if new:
+            msg = "\n".join(new)
+            if metas_active:
+                _log.warning(
+                    "One of the %i currently active sessions added some files:\n%s",
+                    len(metas_active),
+                    msg,
+                )
+            else:
+                raise exceptions.IntegrityError(
+                    f"Found {len(new)} files that were illegally added:\n{msg}"
+                )
+
+        # Any changed files? (Not allowed unless other sessions are currently active.)
+        changed = {
+            fp: h
+            for fp, h in self._files.items()
+            if fp not in new and fp not in expected_extra and h != known_files.get(fp, h)
         }
-        extra = set(self._files) - set(files_before) - set(expected_extra)
-        if extra:
-            msg = "\n".join(extra)
-            raise exceptions.IntegrityError(
-                f"Found {len(extra)} files that were illegally added/changed:\n{msg}"
-            )
+        if changed:
+            msg = "\n".join(changed)
+            if metas_active:
+                _log.warning(
+                    "One of the %i currently active sessions changed some files:\n%s",
+                    len(metas_active),
+                    msg,
+                )
+            else:
+                raise exceptions.IntegrityError(
+                    f"Found {len(changed)} files that were illegally changed:\n{msg}"
+                )
         return
 
     def _attach_log_handler(self):
diff --git a/gittrail/test_core.py b/gittrail/test_core.py
index 4c6e39e..6d921ac 100644
--- a/gittrail/test_core.py
+++ b/gittrail/test_core.py
@@ -160,66 +160,6 @@ def test_log_capture(self, tmpdir, log_level):
             assert (f"Level {l}." in captured) == (l >= log_level), f"Level {l} is missing"
         pass
 
-    def test_enter_exceptions(self, tmpdir):
-        repo = test_helpers.create_repo(tmpdir)
-        data = tmpdir / "data"
-        data.mkdir()
-        gt = gittrail.GitTrail(repo, data)
-
-        fp_diff = repo / "new_code.py"
-        fp_diff.touch()
-
-        with pytest.raises(gittrail.UncleanGitStatusError, match="new_code.py"):
-            gt.__enter__()
-
-        pass
-
-    def test_check_integrity(self, tmpdir):
-        repo = test_helpers.create_repo(tmpdir)
-        data = tmpdir / "data"
-        data.mkdir()
-        gt = gittrail.GitTrail(repo, data)
-        with gt:
-            logging.info("Session 0")
-
-        # Each historic session must link a commit ID from the git history
-        gittrail.core._commit_trail(
-            fp=gt._dp_trail / "0001.json",
-            commit_id="notarealone",
-            start_utc=utils.now(),
-            end_utc=utils.now(),
-            files=utils.hash_all_files(data),
-        )
-        with pytest.raises(gittrail.UnknownCommitError, match="session 1 ran with"):
-            with gt:
-                pass
-        pass
-
-    def test_multiple_sessions(self, tmpdir, caplog):
-        repo = test_helpers.create_repo(tmpdir)
-        data = tmpdir / "data"
-        data.mkdir()
-        gt = gittrail.GitTrail(repo, data)
-        with gt:
-            logging.info("Session 0")
-        with gt:
-            logging.info("Session 1")
-        # Remove the log file of session 0 to trigger the missing file warning
-        (gt._dp_trail / "0000.log").unlink()
-        with caplog.at_level(logging.WARNING):
-            caplog.clear()
-            with gt:
-                logging.info("Session 2")
-            assert "Missing 1 files" in caplog.records[0].message
-            assert "0000.log" in caplog.records[0].message
-
-        # Write a new log file of session 0 to trigger the file integrity error
-        (gt._dp_trail / "0000.log").write_text("Not the real log")
-        with pytest.raises(gittrail.IntegrityError, match="Found 1 files"):
-            with gt:
-                pass
-        pass
-
     def test_nested_sessions(self, tmpdir):
         repo = test_helpers.create_repo(tmpdir)
         data = tmpdir / "data"
@@ -261,7 +201,8 @@ def test_interleaved_sessions(self, tmpdir):
         assert len(sessB._files) == 3
         pass
 
-    @pytest.mark.xfail(reason="https://github.com/michaelosthege/gittrail/issues/1")
+
+class TestMultiprocessing:
     @pytest.mark.parametrize("outfile", ["none", "start", "end"])
     def test_multiprocessing_burst(self, tmpdir, outfile):
         repo = test_helpers.create_repo(tmpdir)
@@ -281,13 +222,13 @@ def test_multiprocessing_burst(self, tmpdir, outfile):
                 )
                 for num in range(nworkers)
             ]
-            successes = pool.map(test_helpers.session_worker, args)
-            assert all(successes)
+            results = pool.map(test_helpers.session_worker, args)
+            for succ, msg in results:
+                assert succ, msg
         with gittrail.GitTrail(repo, data):
             logging.info("QC passed.")
         pass
 
-    @pytest.mark.xfail(reason="https://github.com/michaelosthege/gittrail/issues/1")
     @pytest.mark.parametrize("outfile", ["none", "start", "end"])
     def test_multiprocessing_interleaved(self, tmpdir, outfile):
         repo = test_helpers.create_repo(tmpdir)
@@ -303,34 +244,181 @@ def test_multiprocessing_interleaved(self, tmpdir, outfile):
             dict(**common, worker_number=2, delay=0.2, duration=1),
             dict(**common, worker_number=3, delay=0, duration=1.3),
         ]
-            successes = pool.map(test_helpers.session_worker, args)
-            assert all(successes)
+            results = pool.map(test_helpers.session_worker, args)
+            for succ, msg in results:
+                assert succ, msg
         with gittrail.GitTrail(repo, data):
             logging.info("QC passed.")
         pass
 
-    def test_interleaved_data_edit_fails(self, tmpdir):
+
+class TestGitIntegrityChecks:
+    def test_check_commit_in_history(self, tmpdir):
+        repo = test_helpers.create_repo(tmpdir)
+        data = tmpdir / "data"
+        data.mkdir()
+        gt = gittrail.GitTrail(repo, data)
+        with gt:
+            logging.info("Session 0")
+
+        # Each historic session must link a commit ID from the git history
+        gittrail.core._commit_trail(
+            fp=gt._dp_trail / "0001.json",
+            commit_id="notarealone",
+            start_utc=utils.now(),
+            end_utc=utils.now(),
+            files=utils.hash_all_files(data),
+        )
+        with pytest.raises(gittrail.UnknownCommitError, match="session 1 ran with"):
+            with gt:
+                pass
+        pass
+
+    def test_checks_git_status_clean(self, tmpdir):
+        repo = test_helpers.create_repo(tmpdir)
+        data = tmpdir / "data"
+        data.mkdir()
+        gt = gittrail.GitTrail(repo, data)
+
+        fp_diff = repo / "new_code.py"
+        fp_diff.touch()
+
+        with pytest.raises(gittrail.UncleanGitStatusError, match="new_code.py"):
+            gt.__enter__()
+
+        pass
+
+
+class TestDataIntegrityChecks:
+    def test_missing_file_warning(self, tmpdir, caplog):
+        repo = test_helpers.create_repo(tmpdir)
+        data = tmpdir / "data"
+        data.mkdir()
+        gt = gittrail.GitTrail(repo, data)
+        with gt:
+            logging.info("Session 0")
+
+        # Remove the log file of session 0 to trigger the missing file warning
+        (gt._dp_trail / "0000.log").unlink()
+        with caplog.at_level(logging.WARNING):
+            caplog.clear()
+            with gt:
+                logging.info("Session 1")
+            assert "Missing 1 files" in caplog.records[0].message
+            assert "0000.log" in caplog.records[0].message
+        pass
+
+    def test_change_out_of_session_error(self, tmpdir):
+        repo = test_helpers.create_repo(tmpdir)
+        data = tmpdir / "data"
+        data.mkdir()
+
+        with gittrail.GitTrail(repo, data) as gt:
+            pass
+
+        gt._log_file.write_text("This is not allowed.")
+
+        with pytest.raises(gittrail.IntegrityError, match="illegally changed"):
+            with gittrail.GitTrail(repo, data):
+                pass
+        pass
+
+    def test_change_in_session_warning(self, tmpdir, caplog):
         """When session A creates a file before session B starts,
         but session A continues to change the file after session B ended,
-        the integrity breaks. This is because session B hashed the file
-        in its original form already.
-        The only exception to this are the log files of active sessions.
-
-        Users should avoid making consecutive changes to a file in the data
-        directory, but instead use a temporary working directory and move
-        the file when it's complete.
+        the session B hashed an intermediate state.
+        This is okay, because A is the session that last _ended_.
         """
         repo = test_helpers.create_repo(tmpdir)
         data = tmpdir / "data"
         data.mkdir()
-        common = dict(repo=repo, data=data)
-        with multiprocessing.Pool(2) as pool:
-            args = [
-                dict(**common, outfile="start+end", delay=0, duration=0.5, worker_number=1),
-                dict(**common, delay=0.25, duration=0.5, worker_number=2),
-            ]
-            first, second = pool.map(test_helpers.session_worker, args)
-            assert first
-            assert not second
+        with gittrail.GitTrail(repo, data) as gtA:
+            fp = gtA.data / "file.txt"
+            fp.touch()
+            with gittrail.GitTrail(repo, data) as gtB:
+                assert "file.txt" in gtB._files
+                # B recorded a hash
+            fp.write_text("But now it's different!")
+
+            # A third session should detect that the history from B no longer applies.
+            with caplog.at_level(logging.WARNING):
+                caplog.clear()
+                with gittrail.GitTrail(repo, data):
+                    # This is fine.
+                    pass
+                assert "1 currently active sessions changed" in caplog.records[0].message
+                assert "file.txt" in caplog.records[0].message
+
+        with caplog.at_level(logging.WARNING):
+            caplog.clear()
+            with gittrail.GitTrail(repo, data):
+                pass
+            assert not caplog.records
+        pass
+
+    def test_addition_in_session_warning(self, tmpdir, caplog):
+        repo = test_helpers.create_repo(tmpdir)
+        data = tmpdir / "data"
+        data.mkdir()
+
+        with gittrail.GitTrail(repo, data) as gtA:
+            with gittrail.GitTrail(repo, data) as gtB:
+                pass
+            assert set(gtB._files) == {"gittrail/0001.log"}
+
+            # B recorded only its own logfile.
+            # A is still active and can still create some.
+            fp = gtA.data / "file.txt"
+            fp.touch()
+
+            # A third session should detect the new file.
+            with caplog.at_level(logging.WARNING):
+                caplog.clear()
+                with gittrail.GitTrail(repo, data):
+                    # This is fine.
+                    pass
+                assert "1 currently active sessions added" in caplog.records[0].message
+                assert "file.txt" in caplog.records[0].message
+
+        with caplog.at_level(logging.WARNING):
+            caplog.clear()
+            with gittrail.GitTrail(repo, data):
+                pass
+            assert not caplog.records
+        pass
+
+    def test_addition_out_of_session_error(self, tmpdir):
+        repo = test_helpers.create_repo(tmpdir)
+        data = tmpdir / "data"
+        data.mkdir()
+
+        with gittrail.GitTrail(repo, data) as gt:
+            pass
+
+        (gt.data / "file.txt").touch()
+
+        with pytest.raises(gittrail.IntegrityError, match="illegally added"):
+            with gittrail.GitTrail(repo, data):
+                pass
+        pass
+
+    def test_unexpectedly_known_exception(self, tmpdir):
+        repo = test_helpers.create_repo(tmpdir)
+        data = tmpdir / "data"
+        data.mkdir()
+
+        with gittrail.GitTrail(repo, data) as gt:
+            pass
+        # Manually mark the session as active, without removing the logfile
+        # from the session JSON.
+        with open(gt._session_fp) as jf:
+            meta = json.load(jf)
+        meta["end_utc"] = None
+        with open(gt._session_fp, "w") as jf:
+            json.dump(meta, jf, indent=4)
+
+        with pytest.raises(gittrail.IntegrityError, match="should not be in the history"):
+            with gt:
+                pass
        pass
 
diff --git a/gittrail/test_helpers.py b/gittrail/test_helpers.py
index 9588b4b..ab11407 100644
--- a/gittrail/test_helpers.py
+++ b/gittrail/test_helpers.py
@@ -2,6 +2,8 @@
 import pathlib
 import subprocess
 import time
+import traceback
+from datetime import datetime
 
 import gittrail
 
@@ -27,22 +29,27 @@ def session_worker(kwargs):
     assert outfile in {"none", "start", "end", "start+end"}
     delay = kwargs.get("delay", 0)
     duration = kwargs.get("duration", 0)
+    success = False
+    message = ""
     try:
         if delay:
             logging.info("Worker number %i delaying.", num)
             time.sleep(delay)
         logging.info("Worker number %i starting.", num)
-        with gittrail.GitTrail(repo, data) as gt:
+        with gittrail.GitTrail(repo, data, log_level=logging.DEBUG) as gt:
             fp_outfile = gt.data / f"worker_{num}.data"
             logging.info("Worker number %i started.", num)
             if "start" in outfile:
-                fp_outfile.open("a").write(f"Made by worker {num} (start).")
+                fp_outfile.open("a").write(f"Made by worker {num} ({datetime.now()}).\n")
             time.sleep(duration)
             if "end" in outfile:
-                fp_outfile.open("a").write(f"Made by worker {num} (end).")
+                fp_outfile.open("a").write(f"Made by worker {num} ({datetime.now()}).\n")
             logging.info("Worker number %i exiting.", num)
         logging.info("Worker number %i exited.", num)
-        return True
+        success = True
     except Exception as ex:
-        logging.error("Worker %i failed.", num, exc_info=ex)
-        return False
+        success = False
+        message = f"Worker {num} failed.\n"
+        logging.error(message, exc_info=ex)
+        message += "".join(traceback.format_exception(None, ex, ex.__traceback__))
+    return success, message
diff --git a/gittrail/utils.py b/gittrail/utils.py
index b606cdd..4809c3e 100644
--- a/gittrail/utils.py
+++ b/gittrail/utils.py
@@ -7,7 +7,7 @@
 import os
 import pathlib
 import subprocess
-from typing import Dict, Tuple, Union
+from typing import Dict, Optional, Set, Tuple, Union
 
 PathLike = Union[str, pathlib.Path]
 
@@ -24,14 +24,28 @@ def hash_file(fp: PathLike) -> str:
     return file_hash.hexdigest()
 
 
-def hash_all_files(dp: str) -> Dict[str, str]:
-    """Hashes all files in [dp], returning a dict keyed by the relative filepaths."""
+def hash_all_files(
+    dp: str, exclude: Optional[Set[Union[str, pathlib.Path]]] = None
+) -> Dict[str, str]:
+    """Hashes all files in [dp], returning a dict keyed by the relative filepaths.
+
+    Parameters
+    ----------
+    dp : path-like
+        Target directory.
+    exclude : set of str
+        File paths to ``dp`` to exclude from the hashing.
+    """
+    exclude = {
+        str(pathlib.Path(fp).relative_to(dp)).replace(os.sep, "/") for fp in exclude or set()
+    }
     files = tuple(pathlib.Path(dp).glob("**/*"))
     hashes = {}
     for fp in files:
         if fp.is_file():
             fpr = str(fp.relative_to(dp)).replace(os.sep, "/")
-            hashes[fpr] = hash_file(fp)
+            if fpr not in exclude:
+                hashes[fpr] = hash_file(fp)
     return hashes
 
 
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..83c2e35
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+filelock
diff --git a/setup.py b/setup.py
index d8df751..cf115e0 100644
--- a/setup.py
+++ b/setup.py
@@ -38,6 +38,6 @@ def get_version():
         "License :: OSI Approved :: GNU Affero General Public License v3",
         "Intended Audience :: Science/Research",
     ],
-    install_requires=[],
+    install_requires=[open(pathlib.Path(ROOT, "requirements.txt")).readlines()],
     python_requires=">=3.6",
 )