Skip to content

Commit

Permalink
Merge pull request #4738 from matz-e/better-splitting-dag-status
Browse files Browse the repository at this point in the history
Improve automatic splitting et al.
  • Loading branch information
belforte authored Jan 30, 2018
2 parents ab8b516 + 102aa28 commit c7ee01b
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 34 deletions.
4 changes: 2 additions & 2 deletions src/python/CRABClient/ClientMapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
'useparent' : {'default': None, 'config': ['Data.useParent'], 'type': 'BooleanType', 'required': False},
'secondarydata' : {'default': None, 'config': ['Data.secondaryInputDataset'], 'type': 'StringType', 'required': False},
'ignorelocality' : {'default': False, 'config': ['Data.ignoreLocality'], 'type': 'BooleanType', 'required': False},
'splitalgo' : {'default': None, 'config': ['Data.splitting'], 'type': 'StringType', 'required': True },
'algoargs' : {'default': 8 * 60**2, 'config': ['Data.unitsPerJob'], 'type': 'IntType', 'required': False },
'splitalgo' : {'default': 'Automatic', 'config': ['Data.splitting'], 'type': 'StringType', 'required': True },
'algoargs' : {'default': 480, 'config': ['Data.unitsPerJob'], 'type': 'IntType', 'required': False },
'totalunits' : {'default': 0, 'config': ['Data.totalUnits'], 'type': 'IntType', 'required': False},
'lfn' : {'default': None, 'config': ['Data.outLFNDirBase'], 'type': 'StringType', 'required': False},
'publication' : {'default': True, 'config': ['Data.publication'], 'type': 'BooleanType', 'required': False},
Expand Down
2 changes: 2 additions & 0 deletions src/python/CRABClient/Commands/SubCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def validateConfig(self):
if not isinstance(obj, requiredType):
msg = "Invalid CRAB configuration: Parameter %s requires a value of type %s (while a value of type %s was given)." \
% (paramName, str(requiredType), str(type(obj)))
if paramName == "Data.totalUnits" and isinstance(obj, types.FloatType):
continue
if paramName == "Data.userInputFiles":
msg += "\nIn CRAB v3.3.14 the configuration parameter Data.userInputFiles has been modified to directly take a (python) list of primary input files."
msg += " Previously it was taking the name of a local text file where the primary input files were listed."
Expand Down
14 changes: 11 additions & 3 deletions src/python/CRABClient/Commands/resubmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,25 @@ def processJobIds(self, jobList):
if self.options.publication:
return None

automatic = any('-' in n for _, n in jobList)

def consider(jobId):
if automatic and (jobId.startswith('0-') or '-' not in jobId):
return False
return True

# Build a dictionary from the jobList
jobStatusDict = {}
for jobStatus, jobId in jobList:
jobStatusDict[jobId] = jobStatus
if consider(jobId):
jobStatusDict[jobId] = jobStatus

failedJobStatus = 'failed'
finishedJobStatus = 'finished'

possibleToResubmitJobIds = []
for jobStatus, jobId in jobList:
if (self.options.force and jobStatus == finishedJobStatus) or jobStatus == failedJobStatus:
if ((self.options.force and jobStatus == finishedJobStatus) or jobStatus == failedJobStatus) and consider(jobId):
possibleToResubmitJobIds.append(jobId)

allowedJobStates = [failedJobStatus]
Expand Down Expand Up @@ -291,7 +299,7 @@ def validateOptions(self):

## Check the format of the jobids option.
if self.options.jobids:
useLists = self.cachedinfo['OriginalConfig'].Data.splitting != 'Automatic'
useLists = getattr(self.cachedinfo['OriginalConfig'].Data, 'splitting', 'Automatic') != 'Automatic'
jobidstuple = validateJobids(self.options.jobids, useLists)
self.jobids = [str(jobid) for (_, jobid) in jobidstuple]

Expand Down
135 changes: 108 additions & 27 deletions src/python/CRABClient/Commands/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __call__(self):
# If the task is already on the grid, show the dagman status
combinedStatus = dagStatus = self.printDAGStatus(crabDBInfo, statusCacheInfo)

shortResult = self.printShort(statusCacheInfo)
shortResult = self.printOverview(statusCacheInfo)
pubStatus = self.printPublication(publicationEnabled, shortResult['jobsPerStatus'], shortResult['numProbes'],
shortResult['numUnpublishable'], asourl, asodb, taskname, user, crabDBInfo)
self.printErrors(statusCacheInfo)
Expand All @@ -137,8 +137,13 @@ def __call__(self):
# If user correctly passed some jobid CSVs to use in the status --long, self.jobids
# will be a list of strings already parsed from the input by the validateOptions()
if self.jobids:
## Check the format of the jobids option.
if self.options.jobids:
useLists = getattr(self.cachedinfo['OriginalConfig'].Data, 'splitting', 'Automatic') != 'Automatic'
jobidstuple = validateJobids(self.options.jobids, useLists)
self.jobids = [str(jobid) for (_, jobid) in jobidstuple]
self.checkUserJobids(statusCacheInfo, self.jobids)
sortdict = self.printLong(statusCacheInfo, self.jobids, quiet = (not self.options.long))
sortdict = self.printDetails(statusCacheInfo, self.jobids, quiet = (not self.options.long))
if self.options.sort:
self.printSort(sortdict, self.options.sort)
if self.options.json:
Expand Down Expand Up @@ -227,15 +232,61 @@ def publicationStatus(self, workflow, asourl, asodb, user):
# so return only the inner dict.
return pubInfo['result'][0]

@staticmethod
def translateStatus(status, dbstatus):
"""Translate from DAGMan internal integer status to a string.
Uses parameter `dbstatus` to clarify if the state is due to the
user killing the task.
"""
status = {1:'SUBMITTED', 2:'SUBMITTED', 3:'SUBMITTED', 4:'SUBMITTED', 5:'COMPLETED', 6:'FAILED'}[status]
# Unfortunately DAG code for killed task is 6, just as like for finished DAGs with failed jobs
# Relabeling the status from 'FAILED' to 'FAILED (KILLED)' if a successful kill command was issued
if status == 'FAILED' and dbstatus == 'KILLED':
status = 'FAILED (KILLED)'
return status

@classmethod
def collapseDAGStatus(cls, dagInfo, dbstatus):
"""Collapse the status of one or several DAGs to a single one.
Take into account that subdags can be submitted to the queue on the
schedd, but not yet started.
"""
status_order = ['SUBMITTED', 'FAILED', 'FAILED (KILLED)', 'COMPLETED']

subDagInfos = dagInfo.get('SubDags', {})
subDagStatus = dagInfo.get('SubDagStatus', {})
# Regular splitting, return status of DAG
if len(subDagInfos) == 0 and len(subDagStatus) == 0:
return cls.translateStatus(dagInfo['DagStatus'], dbstatus)

def check_queued(status):
# 99 is the status to expect a submitted DAG. If there are less
# actual DAG status informations than expected DAGs, at least one
# DAG has to be queued.
if dbstatus != 'KILLED' and len(subDagInfos) < len([k for k in subDagStatus if subDagStatus[k] == 99]):
return 'SUBMITTED'
return status

# Tails active: return most active tail status according to
# `status_order`
if len(subDagInfos) > 1:
states = [cls.translateStatus(subDagInfos[k]['DagStatus'], dbstatus) for k in subDagInfos if k > 0]
for status in status_order:
if states.count(status) > 0:
return check_queued(status)
# If no tails are active, return the status of the processing DAG.
if len(subDagInfos) > 0:
return check_queued(cls.translateStatus(subDagInfos[0]['DagStatus'], dbstatus))
return check_queued(cls.translateStatus(dagInfo['DagStatus'], dbstatus))

def printDAGStatus(self, crabDBInfo, statusCacheInfo):
# Get dag status from the node_state/job_log summary
dagman_codes = {1:'SUBMITTED', 2:'SUBMITTED', 3:'SUBMITTED', 4:'SUBMITTED', 5:'COMPLETED', 6:'FAILED'}
dagStatus = dagman_codes.get(statusCacheInfo['DagStatus']['DagStatus'])
#Unfortunately DAG code for killed task is 6, just as like for finished DAGs with failed jobs
#Relabeling the status from 'FAILED' to 'FAILED (KILLED)' if a successful kill command was issued
dbstatus = getColumn(crabDBInfo, 'tm_task_status')
if dagStatus == 'FAILED' and dbstatus == 'KILLED':
dagStatus = 'FAILED (KILLED)'

dagInfo = statusCacheInfo['DagStatus']
dagStatus = self.collapseDAGStatus(dagInfo, dbstatus)

msg = "Status on the scheduler:\t" + dagStatus
self.logger.info(msg)
Expand Down Expand Up @@ -298,7 +349,7 @@ def checkUserJobids(self, statusCacheInfo, userJobids):
if wrongJobIds:
raise ConfigurationException("The following jobids were not found in the task: %s" % wrongJobIds)

def printLong(self, dictresult, jobids = None, quiet = False):
def printDetails(self, dictresult, jobids=None, quiet=False):
""" Print detailed information about a task and each job.
"""
sortdict = {}
Expand All @@ -318,11 +369,23 @@ def printLong(self, dictresult, jobids = None, quiet = False):
cpu_sum = 0
wall_sum = 0

automatic = any('-' in k for k in dictresult)

def translateJobStatus(jobid):
status = dictresult[jobid]['State']
if not automatic:
return status
if jobid.startswith('0-') and status in ('finished', 'failed'):
return 'no output'
elif automatic and '-' not in jobid and status == 'failed':
return 'rescheduled'
return status

# Chose between the jobids passed by the user or all jobids that are in the task
jobidsToUse = jobids if jobids else dictresult.keys()
for jobid in sorted(jobidsToUse, cmp=compareJobids):
info = dictresult[str(jobid)]
state = info['State']
state = translateJobStatus(jobid)
site = ''
if info.get('SiteHistory'):
site = info['SiteHistory'][-1]
Expand Down Expand Up @@ -365,7 +428,7 @@ def printLong(self, dictresult, jobids = None, quiet = False):
ec = 'Unknown'
if 'Error' in info:
ec = str(info['Error'][0]) #exit code of this failed job
elif state in ['finished']:
elif info['State'] in ['finished']:
ec = '0'
sortdict[str(jobid)] = {'state': state, 'site': site, 'runtime': wall_str, 'memory': mem, 'cpu': cpu, \
'retries': info.get('Retries', 0), 'restarts': info.get('Restarts', 0), 'waste': waste, 'exitcode': ec}
Expand Down Expand Up @@ -400,7 +463,7 @@ def printLong(self, dictresult, jobids = None, quiet = False):

return sortdict

def printShort(self, statusCacheInfo):
def printOverview(self, statusCacheInfo):
""" Give a summary of the job statuses, keeping in mind that:
- If there is a job with id 0 then this is the probe job for the estimation
This is the so called automatic splitting
Expand All @@ -423,18 +486,9 @@ def printShort(self, statusCacheInfo):
result['jobsPerStatus'] = jobsPerStatus
result['jobList'] = jobList

# Collect information about jobs
# Create a dictionary like { 'finished' : 1, 'running' : 3}
states = {}
for jobid, statusDict in statusCacheInfo.iteritems():
status = statusDict['State']
if '-' in jobid: #skip splitting and completing jobs
continue
states[status] = states.setdefault(status, 0) + 1


# Collect information about probejobs and subjobs
# Create a dictionary like { 'finished' : 1, 'running' : 3}
states = {}
statesPJ = {}
statesSJ = {}
failedProcessing = 0
Expand All @@ -444,17 +498,36 @@ def printShort(self, statusCacheInfo):
statesPJ[status] = statesPJ.setdefault(status, 0) + 1
elif '-' in jobid:
statesSJ[status] = statesSJ.setdefault(status, 0) + 1
elif status == 'failed':
failedProcessing += 1
else:
states[status] = states.setdefault(status, 0) + 1
if status == 'failed':
failedProcessing += 1
result['numProbes'] = sum(statesPJ.values())
result['numUnpublishable'] = 0
if result['numProbes'] > 0:
result['numUnpublishable'] = failedProcessing

def terminate(states, status, target='no output'):
if status in states:
states[target] = states.setdefault(target, 0) + states.pop(status)

toPrint = [('Jobs', states)]
if self.options.long:
toPrint = [('Probe jobs', statesPJ), ('Jobs', states), ('Tail jobs', statesSJ)]
if sum(statesPJ.values()) > 0:
terminate(statesPJ, 'failed')
terminate(statesPJ, 'finished')
terminate(states, 'failed', target='rescheduled')
elif sum(statesPJ.values()) > 0 and not self.options.long:
if 'failed' in states:
states.pop('failed')
for status in statesSJ:
states[status] = states.setdefault(status, 0) + statesSJ[status]

# And if the dictionary is not empty, print it
for jobtype, currStates in [('Probe jobs', statesPJ), ('Jobs', states), ('Tail jobs', statesSJ)]:
for jobtype, currStates in toPrint:
if currStates:
total = sum( currStates[st] for st in currStates )
total = sum(currStates[st] for st in currStates)
state_list = sorted(currStates)
self.logger.info("\n{0:32}{1} {2}".format(jobtype + ' status:', self._printState(state_list[0], 13), self._percentageString(state_list[0], currStates[state_list[0]], total)))
for status in state_list[1:]:
Expand Down Expand Up @@ -509,8 +582,16 @@ def printErrors(self, dictresult):
ec_numjobs = {}
unknown = 0
are_failed_jobs = False
automatic = any('-' in k for k in dictresult)

def consider(jobid):
if self.options.long:
return True
if automatic and (jobid.startswith('0-') or '-' not in jobid):
return False
return True
for jobid, status in dictresult.iteritems():
if status['State'] == 'failed':
if status['State'] == 'failed' and consider(jobid):
are_failed_jobs = True
if 'Error' in status:
ec = status['Error'][0] #exit code of this failed job
Expand Down
2 changes: 1 addition & 1 deletion src/python/CRABClient/Commands/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def validateConfig(self):
if not int(self.configuration.Data.unitsPerJob) > 0:
msg = "Invalid CRAB configuration: Parameter Data.unitsPerJob must be > 0, not %s." % (self.configuration.Data.unitsPerJob)
return False, msg
elif getattr(self.configuration.Data, 'splitting', 'invalid') != 'Automatic':
elif getattr(self.configuration.Data, 'splitting', 'Automatic') != 'Automatic':
# The default value is only valid for automatic splitting!
msg = "Invalid CRAB configuration: Parameter Data.unitsPerJob is missing."
return False, msg
Expand Down
2 changes: 1 addition & 1 deletion src/python/CRABClient/JobType/Analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def validateBasicConfig(self, config):
required values are there and optional values don't conflict.
"""

self.splitAlgo = getattr(config.Data, 'splitting', None)
self.splitAlgo = getattr(config.Data, 'splitting', 'Automatic')
if not self.splitAlgo:
msg = "Invalid CRAB configuration: Parameter Data.splitting not specified."
return False, msg
Expand Down

0 comments on commit c7ee01b

Please sign in to comment.