Skip to content

Commit

Permalink
improve creating mqtt3 and mqtt5 connections
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred2g committed Dec 6, 2023
1 parent 1006761 commit 38f5376
Showing 1 changed file with 139 additions and 129 deletions.
268 changes: 139 additions & 129 deletions servicetests/tests/JobsExecution/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,140 @@ using namespace Aws::Iotjobs;

void getAvailableJobs(Aws::Crt::String thingName, IotJobsClient &jobsClient);


std::shared_ptr<IotJobsClient> build_mqtt3_client(
Utils::cmdData &cmdData,
std::shared_ptr<Aws::Crt::Mqtt::MqttConnection> &connection,
std::promise<bool> &connectionCompletedPromise,
std::promise<void> &connectionClosedPromise)
{
Aws::Iot::MqttClientConnectionConfigBuilder clientConfigBuilder;
// Create the MQTT builder and populate it with data from cmdData.
clientConfigBuilder =
Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str());
clientConfigBuilder.WithEndpoint(cmdData.input_endpoint);
if (cmdData.input_ca != "")
{
clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str());
}

// Create the MQTT connection from the MQTT builder
auto clientConfig = clientConfigBuilder.Build();
if (!clientConfig)
{
fprintf(
stderr,
"Client Configuration initialization failed with error %s\n",
Aws::Crt::ErrorDebugString(clientConfig.LastError()));
exit(-1);
}

Aws::Iot::MqttClient client3 = Aws::Iot::MqttClient();
connection = client3.NewConnection(clientConfig);
if (!*connection)
{
fprintf(
stderr,
"MQTT Connection Creation failed with error %s\n",
Aws::Crt::ErrorDebugString(connection->LastError()));
exit(-1);
}

// Invoked when a MQTT connect has completed or failed
auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) {
if (errorCode)
{
fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode));
connectionCompletedPromise.set_value(false);
}
else
{
fprintf(stdout, "Connection completed with return code %d\n", returnCode);
connectionCompletedPromise.set_value(true);
}
};

// Invoked when a disconnect has been completed
auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) {
{
fprintf(stdout, "Disconnect completed\n");
connectionClosedPromise.set_value();
}
};

connection->OnConnectionCompleted = std::move(onConnectionCompleted);
connection->OnDisconnect = std::move(onDisconnect);

if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0))
{
fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError()));
exit(-1);
}
return std::make_shared<IotJobsClient>(connection);
}
std::shared_ptr<IotJobsClient> build_mqtt5_client(
Utils::cmdData &cmdData,
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> &client5,
std::promise<bool> &connectionCompletedPromise,
std::promise<void> &connectionClosedPromise)
{
std::shared_ptr<Aws::Iot::Mqtt5ClientBuilder> builder(
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()));

// Check if the builder setup correctly.
if (builder == nullptr)
{
printf(
"Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError()));
return nullptr;
}
// Create the MQTT5 builder and populate it with data from cmdData.
// Setup connection options
std::shared_ptr<Mqtt5::ConnectPacket> connectOptions = std::make_shared<Mqtt5::ConnectPacket>();
connectOptions->WithClientId(cmdData.input_clientId);
builder->WithConnectOptions(connectOptions);
if (cmdData.input_port != 0)
{
builder->WithPort(static_cast<uint16_t>(cmdData.input_port));
}
// Setup lifecycle callbacks
builder->WithClientConnectionSuccessCallback(
[&connectionCompletedPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) {
fprintf(
stdout,
"Mqtt5 Client connection succeed, clientid: %s.\n",
eventData.negotiatedSettings->getClientId().c_str());
connectionCompletedPromise.set_value(true);
});
builder->WithClientConnectionFailureCallback([&connectionCompletedPromise](
const Mqtt5::OnConnectionFailureEventData &eventData) {
fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
connectionCompletedPromise.set_value(false);
});
builder->WithClientStoppedCallback([&connectionClosedPromise](const Mqtt5::OnStoppedEventData &) {
fprintf(stdout, "Mqtt5 Client stopped.\n");
connectionClosedPromise.set_value();
});

client5 = builder->Build();
if (client5 == nullptr)
{
fprintf(
stdout, "Failed to Init Mqtt5Client with error code %d: %s.\n", LastError(), ErrorDebugString(LastError()));
exit(-1);
}

if (!client5->Start())
{
fprintf(stderr, "MQTT5 Connection failed to start");
exit(-1);
}
return std::make_shared<IotJobsClient>(client5);
}



std::vector<Aws::Crt::String> availableJobs;

int main(int argc, char *argv[])
Expand All @@ -64,145 +198,23 @@ int main(int argc, char *argv[])
*/
std::promise<bool> connectionCompletedPromise;
std::promise<void> connectionClosedPromise;
std::shared_ptr<Aws::Crt::Mqtt::MqttConnection> connection;
std::shared_ptr<Aws::Crt::Mqtt::MqttConnection> connection = nullptr;
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client5 = nullptr;
std::shared_ptr<IotJobsClient> jobsClient = nullptr;

Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str());

// Check if the builder setup correctly.
if (builder == nullptr)
{
printf(
"Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError()));
return -1;
}
// Create Mqtt5Client
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client5;
IotJobsClient *jobsClient = nullptr;
if (cmdData.input_mqtt_version == 5UL)
{
// Create the MQTT5 builder and populate it with data from cmdData.
// Setup connection options
std::shared_ptr<Mqtt5::ConnectPacket> connectOptions = std::make_shared<Mqtt5::ConnectPacket>();
connectOptions->WithClientId(cmdData.input_clientId);
builder->WithConnectOptions(connectOptions);
if (cmdData.input_port != 0)
{
builder->WithPort(static_cast<uint16_t>(cmdData.input_port));
}
// Setup lifecycle callbacks
builder->WithClientConnectionSuccessCallback(
[&connectionCompletedPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) {
fprintf(
stdout,
"Mqtt5 Client connection succeed, clientid: %s.\n",
eventData.negotiatedSettings->getClientId().c_str());
connectionCompletedPromise.set_value(true);
});
builder->WithClientConnectionFailureCallback([&connectionCompletedPromise](
const Mqtt5::OnConnectionFailureEventData &eventData) {
fprintf(
stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
connectionCompletedPromise.set_value(false);
});
builder->WithClientStoppedCallback([&connectionClosedPromise](const Mqtt5::OnStoppedEventData &) {
fprintf(stdout, "Mqtt5 Client stopped.\n");
connectionClosedPromise.set_value();
});

client5 = builder->Build();
if (client5 == nullptr)
{
fprintf(
stdout,
"Failed to Init Mqtt5Client with error code %d: %s.\n",
LastError(),
ErrorDebugString(LastError()));
return -1;
}
fprintf(stdout, "Connecting with MQTT5...\n");
if (!client5->Start())
{
fprintf(stderr, "MQTT5 Connection failed to start");
exit(-1);
}
jobsClient = new IotJobsClient(client5);
jobsClient = build_mqtt5_client(cmdData, client5, connectionCompletedPromise, connectionClosedPromise);
}
else if (cmdData.input_mqtt_version == 3UL)
{
Aws::Iot::MqttClientConnectionConfigBuilder clientConfigBuilder;
// Create the MQTT builder and populate it with data from cmdData.
clientConfigBuilder =
Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str());
clientConfigBuilder.WithEndpoint(cmdData.input_endpoint);
if (cmdData.input_ca != "")
{
clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str());
}

// Create the MQTT connection from the MQTT builder
auto clientConfig = clientConfigBuilder.Build();
if (!clientConfig)
{
fprintf(
stderr,
"Client Configuration initialization failed with error %s\n",
Aws::Crt::ErrorDebugString(clientConfig.LastError()));
exit(-1);
}

Aws::Iot::MqttClient client3 = Aws::Iot::MqttClient();
connection = client3.NewConnection(clientConfig);
if (!*connection)
{
fprintf(
stderr,
"MQTT Connection Creation failed with error %s\n",
Aws::Crt::ErrorDebugString(connection->LastError()));
exit(-1);
}

// Invoked when a MQTT connect has completed or failed
auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) {
if (errorCode)
{
fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode));
connectionCompletedPromise.set_value(false);
}
else
{
fprintf(stdout, "Connection completed with return code %d\n", returnCode);
connectionCompletedPromise.set_value(true);
}
};

// Invoked when a disconnect has been completed
auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) {
{
fprintf(stdout, "Disconnect completed\n");
connectionClosedPromise.set_value();
}
};

connection->OnConnectionCompleted = std::move(onConnectionCompleted);
connection->OnDisconnect = std::move(onDisconnect);

fprintf(stdout, "Connecting with MQTT3...\n");
if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0))
{
fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError()));
exit(-1);
}
jobsClient = new IotJobsClient(connection);
jobsClient = build_mqtt3_client(cmdData, connection, connectionCompletedPromise, connectionClosedPromise);
}
else
{
fprintf(stderr, "MQTT Version not supported\n");
exit(-1);
}

delete builder;

/************************ Run the sample ****************************/
if (connectionCompletedPromise.get_future().get())
{
Expand Down Expand Up @@ -453,8 +465,6 @@ int main(int argc, char *argv[])
connectionClosedPromise.get_future().wait();
}
}
delete jobsClient;

return 0;
}

Expand Down

0 comments on commit 38f5376

Please sign in to comment.