Make status2 return dict (mostly) backwards compatible (#4692)
emaszs authored Apr 27, 2017
1 parent e0c3749 commit 9978ac2
Showing 3 changed files with 80 additions and 34 deletions.
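The key addition is a new `makeStatusReturnDict` helper in `status.py` (full diff below) that rebuilds the mapping the old `crab status` used to return. A rough sketch of the returned shape, with key names taken from the new helper and all values invented for illustration:

```python
# Sketch of the dict returned by status.__call__ after this commit.
# Key names come from makeStatusReturnDict below; the values shown
# here are made up for illustration only.
statusDict = {
    'status': 'SUBMITTED',                 # tm_task_status
    'username': 'jdoe',                    # tm_username (hypothetical user)
    'schedd': 'crab3@some.schedd.example', # tm_schedd (hypothetical host)
    'publicationEnabled': True,            # tm_publication == 'T'
    'jobsPerStatus': {'finished': 10},     # from the status_cache short result
    'jobs': {'1': {'State': 'finished'}},  # per-job states ('Error' added for failed jobs)
    'statusFailureMsg': '',                # set when job info could not be retrieved
}
```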
14 changes: 7 additions & 7 deletions src/python/CRABClient/Commands/report.py
@@ -20,10 +20,10 @@
from CRABClient.Commands.SubCommand import SubCommand
from CRABClient.JobType.BasicJobType import BasicJobType
from CRABClient.UserUtilities import getMutedStatusInfo
- from CRABClient.ClientExceptions import RESTCommunicationException, ConfigurationException, \
+ from CRABClient.ClientExceptions import ConfigurationException, \
UnknownOptionException, ClientException

- from ServerUtilities import FEEDBACKMAIL, getColumn
+ from ServerUtilities import FEEDBACKMAIL

class report(SubCommand):
"""
@@ -289,12 +289,12 @@ def collectReportData(self):
self.logger.debug('Looking up report for task %s' % self.cachedinfo['RequestName'])

# Query server for information from the taskdb, input/output file metadata from metadatadb
- dictresult, status, reason = server.get(self.uri, data = {'workflow': self.cachedinfo['RequestName'], 'subresource': 'report2'})
+ dictresult, status, _ = server.get(self.uri, data = {'workflow': self.cachedinfo['RequestName'], 'subresource': 'report2'})

self.logger.debug("Result: %s" % dictresult)
self.logger.info("Running crab status2 first to fetch necessary information.")
# Get job statuses
- crabDBInfo, shortResult = getMutedStatusInfo(self.logger)
+ statusDict, shortResult = getMutedStatusInfo(self.logger)

if not shortResult:
# No point in continuing if the job list is empty.
@@ -316,12 +316,12 @@ def collectReportData(self):
if jobStatusDict.get(jobId) in ['finished']:
reportData['runsAndLumis'][jobId] = dictresult['result'][0]['runsAndLumis'][jobId]

- reportData['publication'] = True if getColumn(crabDBInfo, 'tm_publication') == "T" else False
- userWebDirURL = getColumn(crabDBInfo, 'tm_user_webdir')
+ reportData['publication'] = statusDict['publicationEnabled']
+ userWebDirURL = statusDict['userWebDirURL']
numJobs = len(shortResult['jobList'])

reportData['lumisToProcess'] = self.getLumisToProcess(userWebDirURL, numJobs, self.cachedinfo['RequestName'])
- reportData['inputDataset'] = getColumn(crabDBInfo, 'tm_input_dataset')
+ reportData['inputDataset'] = statusDict['inputDataset']

inputDatasetInfo = self.getInputDatasetLumis(reportData['inputDataset'], userWebDirURL)['inputDataset']
reportData['inputDatasetLumis'] = inputDatasetInfo['lumis']
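The `report.py` changes above swap direct Tasks DB column lookups for keys of the pre-assembled status dict. Roughly, the two forms are equivalent, as in this sketch (`crabDBInfo` is the raw DB result, `statusDict` the new return value of `getMutedStatusInfo`):

```python
# Old: decode the raw Tasks DB column by hand.
publication = getColumn(crabDBInfo, 'tm_publication') == 'T'

# New: the same information, already decoded by makeStatusReturnDict.
publication = statusDict['publicationEnabled']
```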
8 changes: 4 additions & 4 deletions src/python/CRABClient/Commands/resubmit.py
@@ -8,7 +8,7 @@
from CRABClient import __version__
from CRABClient.Commands.SubCommand import SubCommand
from CRABClient.ClientExceptions import ConfigurationException
- from CRABClient.UserUtilities import getMutedStatusInfo, getColumn
+ from CRABClient.UserUtilities import getMutedStatusInfo
from CRABClient.ClientUtilities import validateJobids, checkStatusLoop, colors

class resubmit(SubCommand):
@@ -32,7 +32,7 @@ def __call__(self):
serverFactory = CRABClient.Emulator.getEmulator('rest')
server = serverFactory(self.serverurl, self.proxyfilename, self.proxyfilename, version = __version__)

- crabDBInfo, jobList = getMutedStatusInfo(self.logger)
+ statusDict, jobList = getMutedStatusInfo(self.logger)

if not jobList:
msg = "%sError%s:" % (colors.RED, colors.NORMAL)
@@ -41,11 +41,11 @@ def __call__(self):
self.logger.info(msg)
return None

- publicationEnabled = getColumn(crabDBInfo, "tm_publication")
+ publicationEnabled = statusDict['publicationEnabled']
jobsPerStatus = jobList['jobsPerStatus']

if self.options.publication:
- if publicationEnabled == "F":
+ if not publicationEnabled:
msg = "Publication was disabled for this task. Therefore, "
msg += "there are no publications to resubmit."
self.logger.info(msg)
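Note the flag semantics in `resubmit.py`: `tm_publication` is stored in the Tasks DB as the string 'T' or 'F', while `statusDict['publicationEnabled']` is already a boolean, so the disabled-publication branch now negates the boolean instead of comparing against 'F'. A minimal illustration of the equivalence:

```python
# tm_publication is a 'T'/'F' string in the Tasks DB; the status dict
# exposes it as a boolean (see makeStatusReturnDict in status.py below).
for tm_publication in ('T', 'F'):
    publicationEnabled = tm_publication == 'T'
    if not publicationEnabled:  # old form: tm_publication == "F"
        print("Publication was disabled for this task.")
```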
92 changes: 69 additions & 23 deletions src/python/CRABClient/Commands/status.py
@@ -6,6 +6,7 @@
import json
import urllib
from ast import literal_eval
+ from datetime import datetime

import CRABClient.Emulator
from CRABClient import __version__
@@ -14,8 +15,7 @@
from CRABClient.Commands.SubCommand import SubCommand
from CRABClient.ClientExceptions import ClientException, ConfigurationException

- from ServerUtilities import TASKDBSTATUSES_TMP, FEEDBACKMAIL
-
+ from ServerUtilities import getEpochFromDBTime, TASKDBSTATUSES_TMP, FEEDBACKMAIL

PUBLICATION_STATES = {
'not_published': 'idle',
@@ -25,9 +25,8 @@
}

class status(SubCommand):
"""
Query the status of your tasks, or detailed information of one or more tasks
identified by the -d/--dir option.
""" Query the status of your tasks, or detailed information of one or more tasks
identified by the -d/--dir option.
"""

shortnames = ['st']
@@ -55,8 +54,9 @@ def __call__(self):
#Print information from the database
self.printTaskInfo(crabDBInfo, user)
if not rootDagId:
self.logger.debug("The task has not been submitted to the Grid scheduler yet. Not printing job information.")
return crabDBInfo, None
failureMsg = "The task has not been submitted to the Grid scheduler yet. Not printing job information."
self.logger.debug(failureMsg)
return self.makeStatusReturnDict(crabDBInfo, statusFailureMsg=failureMsg), None

self.logger.debug("The CRAB server submitted your task to the Grid scheduler (cluster ID: %s)" % rootDagId)

@@ -71,16 +71,17 @@ def __call__(self):
# If we didn't find a webdir in the DB and the DAG is held,
# the task bootstrapping failed before or during the webdir
# upload and the reason should be printed.
msg = "The task failed to bootstrap on the Grid scheduler."
msg += " Please send an e-mail to %s." % (FEEDBACKMAIL)
msg += "\nHold reason: %s" % (res['DagmanHoldReason'])
self.logger.info(msg)
failureMsg = "The task failed to bootstrap on the Grid scheduler."
failureMsg += " Please send an e-mail to %s." % (FEEDBACKMAIL)
failureMsg += "\nHold reason: %s" % (res['DagmanHoldReason'])
self.logger.info(failureMsg)
else:
# if the dag is submitted and the webdir is not there we have to wait that AdjustSites run
# and upload the webdir location to the server
self.logger.info("Waiting for the Grid scheduler to bootstrap your task")
self.logger.debug("Schedd has not reported back the webdir (yet)")
return crabDBInfo, None
failureMsg = "Schedd has not reported back the webdir (yet)"
self.logger.debug(failureMsg)
return self.makeStatusReturnDict(crabDBInfo, statusFailureMsg=failureMsg), None

self.logger.debug("Webdir is located at %s", webdir)
# Download status_cache file
@@ -93,9 +94,10 @@ def __call__(self):
statusCacheFilename = getFileFromURL(url, filename=filename, proxyfilename=self.proxyfilename)
except ClientException as ce:
self.logger.info("Waiting for the Grid scheduler to report back the status of your task")
self.logger.debug("Cannot retrieve the status_cache file. Maybe the task process has not run yet?")
self.logger.debug("Got: %s" % ce)
return crabDBInfo, None
failureMsg = "Cannot retrieve the status_cache file. Maybe the task process has not run yet?"
failureMsg += "\nGot: %s" % ce
self.logger.debug(failureMsg)
return self.makeStatusReturnDict(crabDBInfo, statusFailureMsg=failureMsg), None
else:
with open(statusCacheFilename) as fd:
# Skip first two lines of the file. They contain the checkpoint locations for the job_log / fjr_parse_results
@@ -108,7 +110,7 @@ def __call__(self):

self.printDAGStatus(crabDBInfo, statusCacheInfo)
shortResult = self.printShort(statusCacheInfo)
- self.printPublication(publicationEnabled, shortResult['jobsPerStatus'], asourl, asodb,
+ pubStatus = self.printPublication(publicationEnabled, shortResult['jobsPerStatus'], asourl, asodb,
taskname, user, crabDBInfo)
self.printErrors(statusCacheInfo)

@@ -125,7 +127,50 @@ def __call__(self):
if self.options.json:
self.logger.info(json.dumps(statusCacheInfo))

- return crabDBInfo, shortResult
+ statusDict = self.makeStatusReturnDict(crabDBInfo, '', shortResult, statusCacheInfo, pubStatus)
+ return statusDict, shortResult
+
+ def makeStatusReturnDict(self, crabDBInfo, statusFailureMsg = '',
+ shortResult = {}, statusCacheInfo = {},
+ pubStatus = {}):
+ """ Create a dictionary which is mostly identical to the dictionary
+ that was being returned by the old status (plus a few other keys
+ needed by the other client commands). This is to ensure backward
+ compatibility after the status2 transition for users relying on
+ this dictionary in their scripts.
+ """
+
+ statusDict = {}
+ statusDict['status'] = getColumn(crabDBInfo, 'tm_task_status')
+ statusDict['username'] = getColumn(crabDBInfo, 'tm_username')
+ statusDict['taskFailureMsg'] = getColumn(crabDBInfo, 'tm_task_failure')
+ statusDict['taskWarningMsg'] = getColumn(crabDBInfo, 'tm_task_warnings')
+ statusDict['outdatasets'] = getColumn(crabDBInfo, 'tm_output_dataset')
+ statusDict['schedd'] = getColumn(crabDBInfo, 'tm_schedd')
+ statusDict['collector'] = getColumn(crabDBInfo, 'tm_collector')
+ statusDict['ASOURL'] = getColumn(crabDBInfo, 'tm_asourl')
+ statusDict['command'] = getColumn(crabDBInfo, 'tm_task_command')
+ statusDict['publicationEnabled'] = True if getColumn(crabDBInfo, 'tm_publication') == 'T' else False
+ statusDict['userWebDirURL'] = getColumn(crabDBInfo, 'tm_user_webdir')
+ statusDict['inputDataset'] = getColumn(crabDBInfo, 'tm_input_dataset')
+
+ dbStartTime = getColumn(crabDBInfo, 'tm_start_time')
+ statusDict['submissionTime'] = getEpochFromDBTime(
+ datetime.strptime(dbStartTime, '%Y-%m-%d %H:%M:%S.%f'))
+
+ statusDict['statusFailureMsg'] = statusFailureMsg
+ statusDict['jobsPerStatus'] = shortResult.get('jobsPerStatus', {})
+ statusDict['publication'] = pubStatus.get('status', {})
+ statusDict['publicationFailures'] = pubStatus.get('failure_reasons', {})
+
+ jobs = {}
+ if statusCacheInfo:
+ for jobid, status in statusCacheInfo.items():
+ jobs[jobid] = {'State': status['State']}
+ if status['State'] == 'failed' and 'Error' in status:
+ jobs[jobid]['Error'] = status['Error']
+ statusDict['jobs'] = jobs
+ return statusDict

def _percentageString(self, state, value, total):
state = PUBLICATION_STATES.get(state, state)
@@ -361,7 +406,7 @@ def printShort(self, statusCacheInfo):
status = info['State']
jobsPerStatus.setdefault(status, 0)
jobsPerStatus[status] += 1
- jobList.append((status, job))
+ jobList.append([status, job])
result['jobsPerStatus'] = jobsPerStatus
result['jobList'] = jobList

@@ -712,12 +757,12 @@ def printPublication(self, publicationEnabled, jobsPerStatus, asourl, asodb, tas
pubInfo = pubInfo

if 'publication' not in pubInfo:
- return
+ return pubStatus
## If publication was disabled, print a pertinent message and return.
if 'disabled' in pubInfo['publication']:
msg = "\nNo publication information (publication has been disabled in the CRAB configuration file)"
self.logger.info(msg)
- return
+ return pubStatus
## List of output datasets that are going to be (or are already) published. This
## list is written into the Tasks DB by the post-job when it does the upload of
## the output files metadata. This means that the list will be empty until one
@@ -730,14 +775,14 @@ def printPublication(self, publicationEnabled, jobsPerStatus, asourl, asodb, tas
self.printOutputDatasets(outputDatasets)
msg = "\nNo publication information available yet"
self.logger.info(msg)
- return
+ return pubStatus
## Case in which there was an error in retrieving the publication status.
if 'error' in pubInfo['publication']:
msg = "\nPublication status:\t\t%s" % (pubInfo['publication']['error'])
self.logger.info(msg)
## Print the output datasets with the corresponding DAS URL.
self.printOutputDatasets(outputDatasets, includeDASURL = True)
- return
+ return pubStatus
if pubInfo['publication'] and outputDatasets:
states = pubInfo['publication']
## Don't consider publication states for which 0 files are in this state.
@@ -780,6 +825,7 @@ def printPublication(self, publicationEnabled, jobsPerStatus, asourl, asodb, tas
## Print the output datasets with the corresponding DAS URL.
self.printOutputDatasets(outputDatasets, includeDASURL = True)

+ return pubStatus


def setOptions(self):
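Other client commands consume the new dictionary through `getMutedStatusInfo`, mirroring the `resubmit.py` change above. A minimal usage sketch, assuming an initialized CRAB task directory and an existing `logger` object:

```python
# Sketch only: getMutedStatusInfo runs the status logic without printing
# and returns (statusDict, shortResult), as used in resubmit.py above.
from CRABClient.UserUtilities import getMutedStatusInfo

statusDict, jobList = getMutedStatusInfo(logger)  # `logger` assumed to exist
if jobList:
    publicationEnabled = statusDict['publicationEnabled']
    jobsPerStatus = jobList['jobsPerStatus']
    print(publicationEnabled, jobsPerStatus)
```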
