Skip to content

Commit

Permalink
Use file lock to deal with high parallelism
Browse files Browse the repository at this point in the history
Closes #1
  • Loading branch information
michaelosthege committed Dec 29, 2021
1 parent 2f47c0e commit 425c693
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 155 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Lint with flake8
run: |
Expand All @@ -27,7 +28,7 @@ jobs:
flake8 . --count --exit-zero --statistics
- name: Test with pytest
run: |
pytest --cov=./gittrail --cov-report xml --cov-report term-missing gittrail/
pytest -v --cov=./gittrail --cov-report xml --cov-report term-missing gittrail/
- name: Upload coverage
uses: codecov/codecov-action@v2
if: matrix.python-version == 3.9
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ jobs:
python-version: 3.9
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Test with pytest
run: |
pytest --cov=./gittrail --cov-report term-missing gittrail/
pytest -v --cov=./gittrail --cov-report term-missing gittrail/
- name: Build package
run: |
python setup.py sdist bdist_wheel
Expand Down
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[MESSAGES CONTROL]
disable=C0103,R1711,R0902
disable=C0103,R1711,R0902,E0401
2 changes: 1 addition & 1 deletion gittrail/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
from .core import GitTrail
from .exceptions import *

__version__ = "0.1.0"
__version__ = "0.1.1"
170 changes: 113 additions & 57 deletions gittrail/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from datetime import datetime
from typing import Dict, Optional, Set, Union

import filelock

from . import exceptions, utils

_log = logging.getLogger(__file__)
Expand Down Expand Up @@ -102,6 +104,7 @@ def __init__(self, repo: str, data: str, *, log_level: int = None, store: str =
raise FileNotFoundError(f"Data path {self.data} does not exist.")

self._dp_trail = self.data / store
self._dp_trail_lock = filelock.FileLock(self._dp_trail / "lock")
self._git_history = None
self._session_number = None
self._session_fp = None
Expand Down Expand Up @@ -135,87 +138,140 @@ def __enter__(self):

# Check the audittrail
self._dp_trail.mkdir(exist_ok=True)
self._session_number = _get_session_number(self._dp_trail)
self._session_fp = self._dp_trail / f"{self._session_number:04d}.json"
self._log_file = self._dp_trail / f"{self._session_number:04d}.log"
active_sessions = _get_active_sessions(self._dp_trail)
self._files = _drop_active(
files=utils.hash_all_files(self.data),
drop_logs=active_sessions,
drop_meta=active_sessions,
store=self._dp_trail.name,
)
self._check_integrity()

# All checks succceeded. The session can start.
self._attach_log_handler()
self._session_start_utc = utils.now()
_commit_trail(
fp=self._session_fp,
commit_id=self._git_history[0],
start_utc=self._session_start_utc,
end_utc=None,
files=self._files,
)
with self._dp_trail_lock.acquire():
self._session_start_utc = utils.now()
self._session_number = _get_session_number(self._dp_trail)
self._session_fp = self._dp_trail / f"{self._session_number:04d}.json"
self._log_file = self._dp_trail / f"{self._session_number:04d}.log"
active_sessions = _get_active_sessions(self._dp_trail)
self._files = _drop_active(
files=utils.hash_all_files(self.data, exclude={self._dp_trail_lock.lock_file}),
drop_logs=active_sessions,
drop_meta=active_sessions,
store=self._dp_trail.name,
)
self._check_integrity()

# All checks succceeded. The session can start.
self._attach_log_handler()
_commit_trail(
fp=self._session_fp,
commit_id=self._git_history[0],
start_utc=self._session_start_utc,
end_utc=None,
files=self._files,
)
return self

def __exit__(self, *args, **kwargs):
self._session_end_utc = utils.now()
self._detach_log_handler()
active_sessions = _get_active_sessions(self._dp_trail)
self._files = _drop_active(
files=utils.hash_all_files(self.data),
drop_logs=active_sessions - {self._session_number},
drop_meta=active_sessions,
store=self._dp_trail.name,
)
_commit_trail(
fp=self._session_fp,
commit_id=self._git_history[0],
start_utc=self._session_start_utc,
end_utc=self._session_end_utc,
files=self._files,
)
with self._dp_trail_lock.acquire():
self._detach_log_handler()
self._session_end_utc = utils.now()
active_sessions = _get_active_sessions(self._dp_trail)
self._files = _drop_active(
files=utils.hash_all_files(self.data, exclude={self._dp_trail_lock.lock_file}),
drop_logs=active_sessions - {self._session_number},
drop_meta=active_sessions,
store=self._dp_trail.name,
)
_commit_trail(
fp=self._session_fp,
commit_id=self._git_history[0],
start_utc=self._session_start_utc,
end_utc=self._session_end_utc,
files=self._files,
)
return

def _check_integrity(self):
"""Checks that the current contents of the [data] match the audittrail."""
_log.debug("Checking integrity of %s", self.data)

# Read all session files in order to validate that their commit_id is known.
files_before = {}
for s in range(self._session_number):
meta = _meta_read(self._dp_trail / f"{s:04d}.json")
# First read all existing session metadata
metas = [
(s, _meta_read(self._dp_trail / f"{s:04d}.json")) for s in range(self._session_number)
]

# Sort them by the time of the snapshot and collect active vs. completed separately
metas = list(sorted(metas, key=lambda sm: sm[1]["end_utc"] or sm[1]["start_utc"]))
metas_active = [(s, m) for s, m in metas if m["end_utc"] is None]
metas_ended = [(s, m) for s, m in metas if m["end_utc"] is not None]

# All commit IDs must be in the git history
known_files = {}
for s, meta in metas:
cid = meta["commit_id"]
if not cid in self._git_history:
raise exceptions.UnknownCommitError(
f"Audit trail session {s} ran with "
f"commit {cid} that's not in the git history."
)
files_before = meta["files"]
known_files.update(meta["files"])

# Any files missing, compared to the previous session?
missing = {fpr: h for fpr, h in files_before.items() if not fpr in self._files}
# Some files are expected to be missing from the history
expected_extra = set()
for s, meta in metas_active:
# JSON and LOG files of active sessions are subject to change.
expected_extra.add(self._dp_trail / f"{s:04d}.json")
expected_extra.add(self._dp_trail / f"{s:04d}.log")
if metas_ended and metas_ended[-1] == metas[-1]:
# No session began after the last session ended.
s, _ = metas_ended[-1]
expected_extra.add(self._dp_trail / f"{s:04d}.json")
expected_extra = {
str(fp.relative_to(self.data)).replace(os.sep, "/") for fp in expected_extra
}
unexpectedly_known = expected_extra.intersection(known_files)
if unexpectedly_known:
msg = "\n".join(unexpectedly_known)
msg += f"\nKnown: {known_files.keys()}"
msg += f"\nExpected to be missing: {expected_extra}"
raise exceptions.IntegrityError(
f"Some files are, but should not be in the history:\n{msg}"
)

# Any files missing, compared to the historically known files?
missing = {fpr: h for fpr, h in known_files.items() if not fpr in self._files}
if missing:
_log.warning(
"Missing %i files compared to the previous session:\n%s",
len(missing),
"\n".join(missing),
)

# Any new/changed files? (Except the last sessions audittrail file.)
expected_extra = {
self._dp_trail / f"{self._session_number - 1:04d}.json",
}
expected_extra = {
str(fp.relative_to(self.data)).replace(os.sep, "/") for fp in expected_extra
# Any new files? (Not allowed unless other sessions are currently active.)
new = set(self._files) - set(known_files) - expected_extra
if new:
msg = "\n".join(new)
if metas_active:
_log.warning(
"One of the %i currently active sessions added some files:\n%s",
len(metas_active),
msg,
)
else:
raise exceptions.IntegrityError(
f"Found {len(new)} files that were illegally added:\n{msg}"
)

# Any changed files? (Not allowed unless other sessions are currently active.)
changed = {
fp: h
for fp, h in self._files.items()
if fp not in new and fp not in expected_extra and h != known_files.get(fp, h)
}
extra = set(self._files) - set(files_before) - set(expected_extra)
if extra:
msg = "\n".join(extra)
raise exceptions.IntegrityError(
f"Found {len(extra)} files that were illegally added/changed:\n{msg}"
)
if changed:
msg = "\n".join(changed)
if metas_active:
_log.warning(
"One of the %i currently active sessions changed some files:\n%s",
len(metas_active),
msg,
)
else:
raise exceptions.IntegrityError(
f"Found {len(changed)} files that were illegally changed:\n{msg}"
)
return

def _attach_log_handler(self):
Expand Down
Loading

0 comments on commit 425c693

Please sign in to comment.