Better tarball checksum for s3 fix 4998 (#5013)
* import calculateChecksum from WMCore/UserFileCache

* pass hashkey to testS3upload, do not recompute

* use calculateChecksum to get tarball hashkey. Fix #4998

* indentation for pylint

* minor pylint fixes
belforte authored Jun 7, 2021
1 parent c99e76b commit 5c34004
Showing 1 changed file with 68 additions and 23 deletions.
91 changes: 68 additions & 23 deletions src/python/CRABClient/JobType/UserTarball.py
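In short, the commit replaces a sha256 over the raw bytes of the .tgz archive with calculateChecksum, which hashes member names and file contents instead. Raw archive bytes change whenever tar or gzip metadata changes (member timestamps, for instance), so byte-level hashes of two sandboxes with identical content can differ, while a content-based hash stays stable; the linked issues spell out the actual failure mode. A minimal sketch of the effect, independent of the commit: content_hash below is a simplified stand-in for calculateChecksum, not the committed implementation.

import hashlib
import io
import tarfile
import time

def archive_bytes(mtime):
    """Build an in-memory tar holding one identical file, varying only the mtime."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode='w') as tar:
        data = b'print("hello")\n'
        info = tarfile.TarInfo(name='script.py')
        info.size = len(data)
        info.mtime = mtime  # metadata only; the content is the same
        tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()

def content_hash(raw):
    """Hash member names plus file contents, ignoring archive metadata."""
    hasher = hashlib.sha256()
    with tarfile.open(fileobj=io.BytesIO(raw)) as tar:
        for member in tar:
            hasher.update(member.name.encode('utf-8'))
            if member.isfile():
                hasher.update(tar.extractfile(member).read())
    return hasher.hexdigest()

a = archive_bytes(0)
b = archive_bytes(int(time.time()))
print(hashlib.sha256(a).hexdigest() == hashlib.sha256(b).hexdigest())  # False
print(content_hash(a) == content_hash(b))                              # True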
@@ -14,6 +14,7 @@
 import socket
 import tarfile
 import tempfile
+import shutil
 import hashlib
 import uuid
 
@@ -27,17 +28,8 @@
 from ServerUtilities import FILE_SIZE_LIMIT
 from ServerUtilities import uploadToS3, tempSetLogLevel
 
-def testS3upload(s3tester, archiveName, logger):
-    hasher = hashlib.sha256()
-    with open(archiveName) as f:
-        BUF_SIZE = 1024 * 1024  # lets read stuff in 1MByte chunks!
-        while True:
-            data = f.read(BUF_SIZE)
-            if not data:
-                break
-            hasher.update(data)
-    s3hash = hasher.hexdigest()
-    cachename = "%s.tgz" % s3hash
+def testS3upload(s3tester, archiveName, hashkey, logger):
+    cachename = "%s.tgz" % hashkey
     try:
         t1 = time.time()
         timestamp = time.strftime('%y%m%d_%H%M%S', time.gmtime())
@@ -66,6 +58,68 @@ def testS3upload(s3tester, archiveName, logger):
     s3report['seconds'] = int(t2-t1)
     return s3report
 
+def calculateChecksum(tarfile_, exclude=None):
+    """
+    Imported here from WMCore/Services/UserFileCache.py
+    Originally written by Marco Mascheroni: refs
+    https://github.com/mmascher/WMCore/commit/01855223030c4936234be62df6a5a0b2a911144e
+    https://github.com/dmwm/WMCore/commit/5e910461eb82e7bfcba473def6511e3f94259672
+    https://github.com/dmwm/CRABServer/issues/4948
+    Calculate the checksum of the input tar file.
+    The tarfile_ parameter can be a string or a file object (anything compatible
+    with the fileobj parameter of tarfile.open).
+    The exclude parameter can be a list of strings, or a callable that takes the
+    output of tarfile.getmembers() as input and returns a list of strings.
+    It is interpreted as the list of files that will not be taken into consideration
+    when calculating the checksum.
+    The output is the checksum of the input tar file.
+    The checksum is calculated over the names of the objects in the tarfile
+    (files, directories, etc.) and the content of each file.
+    Each file is extracted, read, and deleted right after its content has been fed
+    to the hasher object. Files are read in chunks of 4096 bytes to avoid memory
+    issues.
+    """
+    if not exclude:  # [] is a dangerous value for a param
+        exclude = []
+
+    hasher = hashlib.sha256()
+
+    ## "massage" the input parameters
+    if isinstance(tarfile_, (str, bytes)):
+        tar = tarfile.open(tarfile_, mode='r')
+    else:
+        tar = tarfile.open(fileobj=tarfile_, mode='r')
+
+    if exclude and hasattr(exclude, '__call__'):
+        excludeList = exclude(tar.getmembers())
+    else:
+        excludeList = exclude
+
+    tmpDir = tempfile.mkdtemp()
+    try:
+        for tarmember in tar:
+            if tarmember.name in excludeList:
+                continue
+            hasher.update(tarmember.name)
+            if tarmember.isfile() and tarmember.name.split('.')[-1] != 'pkl':
+                tar.extractall(path=tmpDir, members=[tarmember])
+                fn = os.path.join(tmpDir, tarmember.name)
+                with open(fn, 'rb') as fd:
+                    while True:
+                        buf = fd.read(4096)
+                        if not buf:
+                            break
+                        hasher.update(buf)
+                os.remove(fn)
+    finally:
+        # never leave tmpDir around
+        shutil.rmtree(tmpDir)
+    checksum = hasher.hexdigest()
+
+    return checksum
+
+
 class UserTarball(object):
     """
     _UserTarball_
@@ -197,7 +251,6 @@ def printSortedContent(self):
             contentList += ("\n%" + str(ndigits) + "s\t%s") % (size, name)
         return contentList
 
-
    def upload(self, filecacheurl=None):
        """
        Upload the tarball to the File Cache
@@ -218,7 +271,7 @@ def upload(self, filecacheurl=None):
         archiveSize = "%d MB" % (archiveSizeKB//1024)
         if archiveSizeBytes > FILE_SIZE_LIMIT:
             msg = ("%sError%s: input tarball size %s exceeds maximum allowed limit of %d MB" %
-                (colors.RED, colors.NORMAL, archiveSize, FILE_SIZE_LIMIT//1024//1024))
+                   (colors.RED, colors.NORMAL, archiveSize, FILE_SIZE_LIMIT//1024//1024))
             msg += self.printSortedContent()
             raise SandboxTooBigException(msg)
 
@@ -229,15 +282,7 @@ def upload(self, filecacheurl=None):
         if 'S3' in filecacheurl.upper():
             # use S3
             # generate a 32char hash like UserFileCache used to do
-            hasher = hashlib.sha256()
-            with open(archiveName) as f:
-                BUF_SIZE = 1024*1024  # lets read stuff in 1MByte chunks!
-                while True:
-                    data = f.read(BUF_SIZE)
-                    if not data:
-                        break
-                    hasher.update(data)
-            hashkey = hasher.hexdigest()
+            hashkey = calculateChecksum(archiveName, exclude=NEW_USER_SANDBOX_EXCLUSIONS)
             # the ".tar.gz" suffix here is forced by other places in the client which add it when
             # storing tarball name in task table. Not very elegant to need to hardcode in several places.
             cachename = "%s.tar.gz" % hashkey
@@ -256,7 +301,7 @@ def upload(self, filecacheurl=None):
                 raise CachefileNotFoundException
             hashkey = str(result['hashkey'])
             # upload a copy to S3 dev as well, just to stress it a bit, this never raises
-            s3report = testS3upload(self.s3tester, archiveName, self.logger)
+            s3report = testS3upload(self.s3tester, archiveName, hashkey, self.logger)
             # report also how long it took uploading to UFC (which surely worked if we are here)
             s3report['ufcseconds'] = ufcSeconds
             # upload S3 test report to crabcache
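For completeness, a minimal usage sketch of the new helper (not part of the commit). The first call mirrors what upload() now does; the import path for NEW_USER_SANDBOX_EXCLUSIONS is an assumption, and the callable form with its .log filter is purely illustrative.

from ServerUtilities import NEW_USER_SANDBOX_EXCLUSIONS  # assumed import path
from CRABClient.JobType.UserTarball import calculateChecksum

# As in upload(): a path input plus the client's static exclusion list.
hashkey = calculateChecksum('sandbox.tar.gz', exclude=NEW_USER_SANDBOX_EXCLUSIONS)
cachename = "%s.tar.gz" % hashkey

# exclude may also be a callable over tar.getmembers(); here, a hypothetical
# filter that leaves every .log member out of the checksum.
hashkey = calculateChecksum('sandbox.tar.gz',
                            exclude=lambda members: [m.name for m in members
                                                     if m.name.endswith('.log')])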
