Skip to content

Commit

Permalink
Execute jobs when running in samples
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred2g committed Nov 22, 2023
1 parent 74c8b68 commit 168e1bf
Showing 1 changed file with 130 additions and 2 deletions.
132 changes: 130 additions & 2 deletions samples/jobs/mqtt5_describe_job_execution/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
#include <aws/iotjobs/DescribeJobExecutionSubscriptionRequest.h>
#include <aws/iotjobs/IotJobsClient.h>
#include <aws/iotjobs/RejectedError.h>
#include <aws/iotjobs/StartNextJobExecutionResponse.h>
#include <aws/iotjobs/StartNextPendingJobExecutionRequest.h>
#include <aws/iotjobs/StartNextPendingJobExecutionSubscriptionRequest.h>
#include <aws/iotjobs/UpdateJobExecutionRequest.h>
#include <aws/iotjobs/UpdateJobExecutionSubscriptionRequest.h>
#include <aws/iotjobs/GetPendingJobExecutionsResponse.h>
#include <aws/iotjobs/GetPendingJobExecutionsSubscriptionRequest.h>
#include <aws/iotjobs/JobExecutionSummary.h>

#include <algorithm>
#include <chrono>
Expand Down Expand Up @@ -86,7 +94,7 @@ int main(int argc, char *argv[])
});

// Create Mqtt5Client
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client = builder->Build();
const std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client = builder->Build();
delete builder;
/************************ Run the sample ****************************/

Expand Down Expand Up @@ -167,6 +175,126 @@ int main(int argc, char *argv[])
jobsClient.PublishDescribeJobExecution(
std::move(describeJobExecutionRequest), AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler);
publishDescribeJobExeCompletedPromise.get_future().wait();


if (cmdData.input_isCI == false)
{
Aws::Crt::String currentJobId;
int64_t currentExecutionNumber;
int32_t currentVersionNumber;

std::promise<void> pendingExecutionPromise;

{
auto OnSubscribeToStartNextPendingJobExecutionAcceptedResponse =
[&](StartNextJobExecutionResponse *response, int ioErr) {
fprintf(stdout, "Start Job %s\n", response->Execution.value().JobId.value().c_str());
currentJobId = response->Execution->JobId.value();
currentExecutionNumber = response->Execution->ExecutionNumber.value();
currentVersionNumber = response->Execution->VersionNumber.value();

pendingExecutionPromise.set_value();
};

StartNextPendingJobExecutionSubscriptionRequest subscriptionRequest;
subscriptionRequest.ThingName = cmdData.input_thingName;
subAckedPromise = std::promise<void>();
jobsClient.SubscribeToStartNextPendingJobExecutionAccepted(
subscriptionRequest,
AWS_MQTT_QOS_AT_LEAST_ONCE,
OnSubscribeToStartNextPendingJobExecutionAcceptedResponse,
subAckHandler);

subAckedPromise.get_future().wait();

subAckedPromise = std::promise<void>();
jobsClient.SubscribeToStartNextPendingJobExecutionRejected(
subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler);

subAckedPromise.get_future().wait();

StartNextPendingJobExecutionRequest publishRequest;
publishRequest.ThingName = cmdData.input_thingName;
publishRequest.StepTimeoutInMinutes = 15L;

publishDescribeJobExeCompletedPromise = std::promise<void>();
jobsClient.PublishStartNextPendingJobExecution(
publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler);

pendingExecutionPromise.get_future().wait();
}

{
pendingExecutionPromise = std::promise<void>();
auto OnSubscribeToUpdateJobExecutionAcceptedResponse = [&](UpdateJobExecutionResponse *response,
int iotErr) {
fprintf(stdout, "Marked Job %s IN_PROGRESS", currentJobId.c_str());
pendingExecutionPromise.set_value();
};
UpdateJobExecutionSubscriptionRequest subscriptionRequest;
subscriptionRequest.ThingName = cmdData.input_thingName;
subscriptionRequest.JobId = currentJobId;

subAckedPromise = std::promise<void>();
jobsClient.SubscribeToUpdateJobExecutionAccepted(
subscriptionRequest,
AWS_MQTT_QOS_AT_LEAST_ONCE,
OnSubscribeToUpdateJobExecutionAcceptedResponse,
subAckHandler);
subAckedPromise.get_future().wait();

subAckedPromise = std::promise<void>();
jobsClient.SubscribeToUpdateJobExecutionRejected(
subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler);
subAckedPromise.get_future().wait();

UpdateJobExecutionRequest publishRequest;
publishRequest.ThingName = cmdData.input_thingName;
publishRequest.JobId = currentJobId;
publishRequest.ExecutionNumber = currentExecutionNumber;
publishRequest.Status = JobStatus::IN_PROGRESS;
publishRequest.ExpectedVersion = currentVersionNumber++;
publishDescribeJobExeCompletedPromise = std::promise<void>();
jobsClient.PublishUpdateJobExecution(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler);

pendingExecutionPromise.get_future().wait();
}

// Pretend doing some work
std::this_thread::sleep_for(std::chrono::milliseconds(1000));

{
pendingExecutionPromise = std::promise<void>();
UpdateJobExecutionSubscriptionRequest subscriptionRequest;
subscriptionRequest.ThingName = cmdData.input_thingName;
subscriptionRequest.JobId = currentJobId;

auto subscribeHandler = [&](UpdateJobExecutionResponse *response, int ioErr) {
fprintf(stdout, "Marked job %s currentJobId SUCCEEDED", currentJobId.c_str());
pendingExecutionPromise.set_value();
};
subAckedPromise = std::promise<void>();
jobsClient.SubscribeToUpdateJobExecutionAccepted(
subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, subscribeHandler, subAckHandler);
subAckedPromise.get_future().wait();

subAckedPromise = std::promise<void>();
jobsClient.SubscribeToUpdateJobExecutionRejected(
subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler);
subAckedPromise.get_future().wait();

UpdateJobExecutionRequest publishRequest;
publishRequest.ThingName = cmdData.input_thingName;
publishRequest.JobId = currentJobId;
publishRequest.ExecutionNumber = currentExecutionNumber;
publishRequest.Status = JobStatus::SUCCEEDED;
publishRequest.ExpectedVersion = currentVersionNumber++;

jobsClient.PublishUpdateJobExecution(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler);

pendingExecutionPromise.get_future().wait();
}
}
}

// Wait just a little bit to let the console print
Expand All @@ -179,4 +307,4 @@ int main(int argc, char *argv[])
}

return 0;
}
}

0 comments on commit 168e1bf

Please sign in to comment.