Skip to content

Commit

Permalink
Fix client connection
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred2g committed Dec 19, 2023
1 parent 589b21f commit 363c2c5
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions tests/Mqtt5ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1959,6 +1959,12 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
int client1_messages = 0;
int client2_messages = 0;

std::vector<int> receivedMessages;
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
receivedMessages.push_back(0);
}

Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
Expand Down Expand Up @@ -1989,7 +1995,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
//++receivedMessages[message_int];
++receivedMessages[message_int];
client1_messages++;
if (client1_messages == 5)
{
Expand All @@ -2009,7 +2015,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
//++receivedMessages[message_int];
++receivedMessages[message_int];
client2_messages++;
if (client2_messages == 5)
{
Expand Down Expand Up @@ -2038,14 +2044,14 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi

/* second subscriber */
s_setupConnectionLifeCycle(builder2, connectionPromise2, stoppedPromise2, "Subscriber 2");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client2 = builder->Build();
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client2 = builder2->Build();

ASSERT_TRUE(mqtt5Client2);
ASSERT_TRUE(mqtt5Client2->Start());

/* publisher */
s_setupConnectionLifeCycle(publish_builder, connectionPromise3, stoppedPromise3, "Publisher");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Publisher = builder->Build();
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Publisher = publish_builder->Build();

ASSERT_TRUE(mqtt5Publisher);
ASSERT_TRUE(mqtt5Publisher->Start());
Expand All @@ -2060,14 +2066,22 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
std::shared_ptr<Mqtt5::SubscribePacket> subscribe = std::make_shared<Mqtt5::SubscribePacket>(allocator);
subscribe->WithSubscription(std::move(subscription));

/* Subscribe to test topic */
Mqtt5::Subscription subscription2(sharedTopicFilter, Mqtt5::QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
std::shared_ptr<Mqtt5::SubscribePacket> subscribe2 = std::make_shared<Mqtt5::SubscribePacket>(allocator);
subscribe2->WithSubscription(std::move(subscription));

std::promise<void> suback;
auto onSubAck = [&](int, std::shared_ptr<SubAckPacket>) { suback.set_value(); };

/*subscribe both clients */
/* subscribe first client */
ASSERT_TRUE(mqtt5Client->Subscribe(subscribe, onSubAck));
suback.get_future().wait();

suback = std::promise<void>();
ASSERT_TRUE(mqtt5Client2->Subscribe(subscribe, onSubAck));

/* subscribe second client */
ASSERT_TRUE(mqtt5Client2->Subscribe(subscribe2, onSubAck));
suback.get_future().wait();

/* Publish message 10 to test topic */
Expand All @@ -2082,9 +2096,16 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
client1_received.get_future().wait();
client2_received.get_future().wait();

/* makes sure messages are distrubuted evenly between the two clients*/
ASSERT_INT_EQUALS(5, client1_messages);
ASSERT_INT_EQUALS(5, client2_messages);

/* make sure all messages are received */
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
ASSERT_TRUE(receivedMessages[i] > 0);
}

/* Stop all clients */
mqtt5Client->Stop();
mqtt5Client2->Stop();
Expand Down

0 comments on commit 363c2c5

Please sign in to comment.