Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Sep 12, 2024
1 parent e14c814 commit e73f8c1
Show file tree
Hide file tree
Showing 7 changed files with 591 additions and 1 deletion.
2 changes: 1 addition & 1 deletion crt/aws-c-mqtt
Submodule aws-c-mqtt updated 45 files
+12 −3 .github/workflows/ci.yml
+4 −6 .github/workflows/clang-format.yml
+2 −0 CMakeLists.txt
+29 −0 bin/elastishadow/CMakeLists.txt
+1,272 −0 bin/elastishadow/main.c
+47 −0 format-check.py
+0 −24 format-check.sh
+9 −0 include/aws/mqtt/mqtt.h
+54 −0 include/aws/mqtt/private/client_impl.h
+24 −0 include/aws/mqtt/private/client_impl_shared.h
+204 −0 include/aws/mqtt/private/mqtt311_listener.h
+220 −0 include/aws/mqtt/private/request-response/protocol_adapter.h
+23 −0 include/aws/mqtt/private/request-response/request_response_client.h
+264 −0 include/aws/mqtt/private/request-response/subscription_manager.h
+2 −0 include/aws/mqtt/private/shared.h
+274 −0 include/aws/mqtt/request-response/request_response_client.h
+3 −0 include/aws/mqtt/v5/mqtt5_client.h
+176 −58 source/client.c
+6 −0 source/client_channel_handler.c
+9 −0 source/client_impl_shared.c
+25 −0 source/mqtt.c
+329 −0 source/mqtt311_listener.c
+2 −3 source/packets.c
+964 −0 source/request-response/protocol_adapter.c
+2,276 −0 source/request-response/request_response_client.c
+822 −0 source/request-response/subscription_manager.c
+1 −1 source/shared.c
+1 −1 source/v5/mqtt5_client.c
+1 −0 source/v5/mqtt5_listener.c
+14 −0 source/v5/mqtt5_to_mqtt3_adapter.c
+135 −3 tests/CMakeLists.txt
+1,784 −0 tests/request-response/protocol_adapter_tests.c
+3,151 −0 tests/request-response/request_response_client_tests.c
+2,877 −0 tests/request-response/subscription_manager_tests.c
+739 −998 tests/v3/connection_state_test.c
+488 −0 tests/v3/mqtt311_listener_test.c
+582 −0 tests/v3/mqtt311_testing_utils.c
+155 −0 tests/v3/mqtt311_testing_utils.h
+51 −2 tests/v3/mqtt_mock_server_handler.c
+18 −0 tests/v3/mqtt_mock_server_handler.h
+22 −42 tests/v5/mqtt5_client_tests.c
+26 −0 tests/v5/mqtt5_testing_utils.c
+20 −0 tests/v5/mqtt5_testing_utils.h
+0 −4 tests/v5/mqtt5_to_mqtt3_adapter_tests.c
+2 −1 tests/v5/mqtt5_topic_alias_tests.c
2 changes: 2 additions & 0 deletions include/aws/crt/mqtt/Mqtt5Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ namespace Aws

virtual ~Mqtt5Client();

struct aws_mqtt5_client *GetUnderlyingHandle() const noexcept;

private:
Mqtt5Client(const Mqtt5ClientOptions &options, Allocator *allocator = ApiAllocator()) noexcept;

Expand Down
2 changes: 2 additions & 0 deletions include/aws/crt/mqtt/private/Mqtt5ClientCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ namespace Aws

virtual ~Mqtt5ClientCore();

struct aws_mqtt5_client *GetUnderlyingHandle() const noexcept { return m_client; }

private:
Mqtt5ClientCore(const Mqtt5ClientOptions &options, Allocator *allocator = ApiAllocator()) noexcept;

Expand Down
277 changes: 277 additions & 0 deletions include/aws/iot/MqttRequestResponseClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
#pragma once
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/crt/Exports.h>

#include <aws/crt/Allocator.h>
#include <aws/crt/Optional.h>
#include <aws/crt/Types.h>
#include <aws/crt/Variant.h>

#include <aws/mqtt/request-response/request_response_client.h>

#include <functional>

namespace Aws {

namespace Crt {
namespace Mqtt {
class MqttConnection;
}

namespace Mqtt5 {
class Mqtt5Client;
}
}

namespace Iot {
namespace RequestResponse
{

class MqttRequestResponseClientImpl;

/**
* The type of change to the state of a streaming operation subscription
*/
enum class SubscriptionStatusEventType
{

/**
* The streaming operation is successfully subscribed to its topic (filter)
*/
SubscriptionEstablished = ARRSSET_SUBSCRIPTION_ESTABLISHED,

/**
* The streaming operation has temporarily lost its subscription to its topic (filter)
*/
SubscriptionLost = ARRSSET_SUBSCRIPTION_LOST,

/**
* The streaming operation has entered a terminal state where it has given up trying to subscribe
* to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy).
*/
SubscriptionHalted = ARRSSET_SUBSCRIPTION_HALTED,
};

/**
* An event that describes a change in subscription status for a streaming operation.
*/
struct AWS_CRT_CPP_API SubscriptionStatusEvent
{
SubscriptionStatusEventType type;
int errorCode;
};

using SubscriptionStatusEventHandler = std::function<void(SubscriptionStatusEvent &&)>;

struct AWS_CRT_CPP_API IncomingPublishEvent {
Aws::Crt::Vector<uint8_t> payload;
};

using IncomingPublishEventHandler = std::function<void(IncomingPublishEvent &&)>;

/**
* A response path is a pair of values - MQTT topic and a JSON path - that describe how a response to
* an MQTT-based request may arrive. For a given request type, there may be multiple response paths and each
* one is associated with a separate JSON schema for the response body.
*/
struct AWS_CRT_CPP_API ResponsePath {

/**
* MQTT topic that a response may arrive on.
*/
Aws::Crt::String topic;

/**
* JSON path for finding correlation tokens within payloads that arrive on this path's topic.
*/
Aws::Crt::Optional<Aws::Crt::String> correlationTokenJsonPath;
};

/**
* Configuration options for an MQTT-based request-response operation.
*/
struct AWS_CRT_CPP_API RequestResponseOperationOptions {

/**
* Set of topic filters that should be subscribed to in order to cover all possible response paths. Sometimes
* using wildcards can cut down on the subscriptions needed; other times that isn't valid.
*/
Aws::Crt::Vector<Aws::Crt::String> subscriptionTopicFilters;

/**
* Set of all possible response paths associated with this request type.
*/
Aws::Crt::Vector<ResponsePath> responsePaths;

/**
* Topic to publish the request to once response subscriptions have been established.
*/
Aws::Crt::String publishTopic;

/**
* Payload to publish to 'publishTopic' in order to initiate the request
*/
Aws::Crt::Vector<uint8_t> payload;

/**
* Correlation token embedded in the request that must be found in a response message. This can be null
* to support certain services which don't use correlation tokens. In that case, the client
* only allows one token-less request at a time.
*/
Aws::Crt::Optional<Aws::Crt::String> correlationToken;
};

/**
* Configuration options for an MQTT-based streaming operation.
*/
struct AWS_CRT_CPP_API StreamingOperationOptions {

/**
* Topic filter that the streaming operation should listen on
*/
Aws::Crt::String subscriptionTopicFilter;
};

/**
* Encapsulates a response to an AWS IoT Core MQTT-based service request
*/
struct AWS_CRT_CPP_API Response {

/**
* MQTT Topic that the response was received on. Different topics map to different types within the
* service model, so we need this value in order to know what to deserialize the payload into.
*/
struct aws_byte_cursor topic;

/**
* Payload of the response that correlates to a submitted request.
*/
struct aws_byte_cursor payload;
};

template <typename R, typename E> struct Result {
public:

Result() = delete;
Result(const Result &result) = default;
Result(Result &&result) = default;

Result(const R &response) :
rawResult(response)
{}

Result(R &&response) :
rawResult(std::move(response))
{}

Result(const E &error) :
rawResult(error)
{}

Result(E &&error) :
rawResult(std::move(error))
{}

~Result() = default;

Result &operator=(const Result &result) = default;
Result &operator=(Result &&result) = default;

Result &operator=(const R &response) {
this->rawResult = response;

return *this;
}

Result &operator=(R &&response) {
this->rawResult = std::move(response);

return *this;
}

Result &operator=(const E &error) {
this->rawResult = error;
}

Result &operator=(E &&error) {
this->rawResult = std::move(error);

return *this;
}

bool isSuccess() const {
return rawResult.holds_alternative<R>();
}

const R &getResponse() const {
AWS_FATAL_ASSERT(isSuccess());

return rawResult.get<Response>();
}

const E &getError() const {
AWS_FATAL_ASSERT(!isSuccess());

return rawResult.get<int>();
}

private:

Aws::Crt::Variant<R, E> rawResult;
};

using UnmodeledResult = Result<Response, int>;

using UnmodeledResultHandler = std::function<void(UnmodeledResult &&)>;

/**
* MQTT-based request-response client configuration options
*/
struct AWS_CRT_CPP_API RequestResponseClientOptions {

/**
* Maximum number of subscriptions that the client will concurrently use for request-response operations
*/
uint32_t maxRequestResponseSubscriptions;

/**
* Maximum number of subscriptions that the client will concurrently use for streaming operations
*/
uint32_t maxStreamingSubscriptions;

/**
* Duration, in seconds, that a request-response operation will wait for completion before giving up
*/
uint32_t operationTimeoutInSeconds;
};



class AWS_CRT_CPP_API MqttRequestResponseClient {
public:

virtual ~MqttRequestResponseClient();

static MqttRequestResponseClient *newFrom5(const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, RequestResponseClientOptions &&options, Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator());

static MqttRequestResponseClient *newFrom311(const Aws::Crt::Mqtt::MqttConnection &protocolClient, RequestResponseClientOptions &&options, Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator());

int submitRequest(const RequestResponseOperationOptions &requestOptions, UnmodeledResultHandler &&resultHandler);

private:

MqttRequestResponseClient(Aws::Crt::Allocator *allocator, std::shared_ptr<MqttRequestResponseClientImpl> impl);

Aws::Crt::Allocator *m_allocator;

std::shared_ptr<MqttRequestResponseClientImpl> m_impl;
};


} // RequestResponse
} // Iot
} // Aws
60 changes: 60 additions & 0 deletions include/aws/iot/private/MqttRequestResponseClientImpl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#pragma once
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/crt/Exports.h>

#include <aws/common/linked_list.h>
#include <aws/common/rw_lock.h>
#include <aws/iot/MqttRequestResponseClient.h>

struct aws_allocator;
struct aws_mqtt_request_response_client;

namespace Aws
{
namespace Iot
{
namespace RequestResponse
{

struct IncompleteRequest;

class AWS_CRT_CPP_API MqttRequestResponseClientImpl
{
public:
MqttRequestResponseClientImpl(Aws::Crt::Allocator *allocator) noexcept;
~MqttRequestResponseClientImpl();

void seatClient(struct aws_mqtt_request_response_client *client) noexcept;

void close() noexcept;

int submitRequest(
const RequestResponseOperationOptions &requestOptions,
IncompleteRequest *incompleteRequest) noexcept;

void onRequestCompletion(struct IncompleteRequest *incompleteRequest, const struct aws_byte_cursor *response_topic,
const struct aws_byte_cursor *payload,
int error_code);

private:

Aws::Crt::Allocator *m_allocator;

struct aws_event_loop *protocolClientLoop;

struct aws_rw_lock m_lock;

struct aws_mqtt_request_response_client *m_client;

struct aws_linked_list m_incompleteRequests;

bool m_closed;
};

}
}
}
Loading

0 comments on commit e73f8c1

Please sign in to comment.