Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logs #105

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,10 @@ public void retry(String workflowId, boolean resumeSubworkflowTasks) {

private void updateAndPushParents(Workflow workflow, String operation) {
String workflowIdentifier = "";
LOGGER.info("CCC Rerun workflow: {} ", workflow);
while (workflow.hasParent()) {

LOGGER.info("CCC Rerun workflow: {} ", workflow);
// update parent's sub workflow task
Task subWorkflowTask =
executionDAOFacade.getTaskById(workflow.getParentWorkflowTaskId());
Expand Down Expand Up @@ -687,6 +690,7 @@ private void updateAndPushParents(Workflow workflow, String operation) {

workflow = parentWorkflow;
}
LOGGER.info("CCC Rerun workflow: {} ", workflow);
}

private void retry(Workflow workflow) {
Expand Down Expand Up @@ -1305,6 +1309,7 @@ public boolean decide(String workflowId) {
if (!executionLockService.acquireLock(workflowId)) {
return false;
}
LOGGER.info("decide workflow: {} ", workflowId);

// If it is a new workflow, the tasks will be still empty even though include tasks is true
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true);
Expand All @@ -1328,18 +1333,23 @@ public boolean decide(String workflowId) {
DeciderService.DeciderOutcome outcome = deciderService.decide(workflow);
if (outcome.isComplete) {
endExecution(workflow);
LOGGER.info("decide workflow: {} ", workflowId);
return true;
}

List<Task> tasksToBeScheduled = outcome.tasksToBeScheduled;
LOGGER.info("CCC tasksToBeScheduled {}", tasksToBeScheduled);
setTaskDomains(tasksToBeScheduled, workflow);
List<Task> tasksToBeUpdated = outcome.tasksToBeUpdated;
LOGGER.info("CCC tasksToBeUpdated {}", tasksToBeUpdated);
boolean stateChanged = false;

tasksToBeScheduled = dedupAndAddTasks(workflow, tasksToBeScheduled);
LOGGER.info("CCC tasksToBeScheduled {}", tasksToBeScheduled);

Workflow workflowInstance = deciderService.populateWorkflowAndTaskData(workflow);
for (Task task : outcome.tasksToBeScheduled) {
LOGGER.info("CCC task {}", task);
if (systemTaskRegistry.isSystemTask(task.getTaskType())
&& NON_TERMINAL_TASK.test(task)) {
WorkflowSystemTask workflowSystemTask =
Expand All @@ -1354,11 +1364,14 @@ public boolean decide(String workflowId) {
}
}

LOGGER.info("CCC tasksToBeScheduled {}", tasksToBeScheduled);
LOGGER.info("CCC tasksToBeUpdated {}", tasksToBeUpdated);
if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) {
executionDAOFacade.updateTasks(tasksToBeUpdated);
executionDAOFacade.updateWorkflow(workflow);
}

LOGGER.info("scheduleTask task: {} ", tasksToBeScheduled);
stateChanged = scheduleTask(workflow, tasksToBeScheduled) || stateChanged;

if (stateChanged) {
Expand Down Expand Up @@ -1632,6 +1645,8 @@ public void addTaskToQueue(Task task) {
task.getWorkflowPriority(),
taskQueueName,
task.getCallbackAfterSeconds());
LOGGER.debug("Add task '{}' to publish.", task.getTaskId());
taskStatusListener.onTaskScheduled(task);
}

@VisibleForTesting
Expand Down Expand Up @@ -1811,8 +1826,8 @@ private void addTaskToQueue(final List<Task> tasks) {
for (Task task : tasks) {
addTaskToQueue(task);
// notify task
LOGGER.debug("Add task '{}' to publish.", task.getTaskId());
taskStatusListener.onTaskScheduled(task);
// LOGGER.debug("Add task '{}' to publish.", task.getTaskId());
// taskStatusListener.onTaskScheduled(task);
}
}

Expand Down Expand Up @@ -1846,10 +1861,14 @@ private boolean rerunWF(

// Get the workflow
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true);
LOGGER.info("CCC Rerun workflow: {} from task {}", workflowId, taskId);

updateAndPushParents(workflow, "reran");
LOGGER.info("CCC updateAndPushParents workflow: {} from task {}", workflowId, taskId);

// If the task Id is null it implies that the entire workflow has to be rerun
if (taskId == null) {
LOGGER.info("CCC Rerun workflow: {} from task {}", workflowId, taskId);
// remove all tasks
workflow.getTasks().forEach(task -> executionDAOFacade.removeTask(task.getTaskId()));
// Set workflow as RUNNING
Expand All @@ -1871,25 +1890,32 @@ private boolean rerunWF(
properties.getWorkflowOffsetTimeout().getSeconds());
executionDAOFacade.updateWorkflow(workflow);

LOGGER.info("decide workflow: {} from task {}", workflowId, taskId);
decide(workflowId);
return true;
}

// Now iterate through the tasks and find the "specific" task
Task rerunFromTask = null;
for (Task task : workflow.getTasks()) {
LOGGER.info("CCC Rerun task: {}", task);
if (task.getTaskId().equals(taskId)) {
LOGGER.info("CCC Rerun task: {}", taskId);
rerunFromTask = task;
break;
}
}

// If not found look into sub workflows
if (rerunFromTask == null) {
LOGGER.info("CCC Rerun task: {}", taskId);
for (Task task : workflow.getTasks()) {
LOGGER.info("CCC Rerun task: {}", task);
if (task.getTaskType().equalsIgnoreCase(TASK_TYPE_SUB_WORKFLOW)) {
LOGGER.info("CCC Rerun task: {}", task);
String subWorkflowId = task.getSubWorkflowId();
if (rerunWF(subWorkflowId, taskId, taskInput, null, null)) {
LOGGER.info("CCC Rerun task: {}", task);
rerunFromTask = task;
break;
}
Expand All @@ -1898,6 +1924,7 @@ private boolean rerunWF(
}

if (rerunFromTask != null) {
LOGGER.info("CCC Rerun rerunFromTask: {}", rerunFromTask);
// set workflow as RUNNING
workflow.setStatus(WorkflowStatus.RUNNING);
// Reset failure reason from previous run to default
Expand All @@ -1921,7 +1948,9 @@ private boolean rerunWF(
executionDAOFacade.updateTasks(workflow.getTasks());
// Remove all tasks after the "rerunFromTask"
for (Task task : workflow.getTasks()) {
LOGGER.info("CCC Rerun task: {}", task);
if (task.getSeq() > rerunFromTask.getSeq()) {
LOGGER.info("CCC Rerun task: {}", task);
executionDAOFacade.removeTask(task.getTaskId());
}
}
Expand Down Expand Up @@ -1951,6 +1980,7 @@ private boolean rerunWF(
.start(workflow, rerunFromTask, this);
} else {
// Set the task to rerun as SCHEDULED
LOGGER.info("CCC schedule rerunFromTask: {}", rerunFromTask);
rerunFromTask.setStatus(SCHEDULED);
addTaskToQueue(rerunFromTask);
}
Expand Down