Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Sep 16, 2024
1 parent d4b438a commit 5f2268c
Show file tree
Hide file tree
Showing 7 changed files with 652 additions and 421 deletions.
2 changes: 1 addition & 1 deletion include/aws/crt/mqtt/MqttConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ namespace Aws
bool Disconnect() noexcept;

/// @private
aws_mqtt_client_connection *GetUnderlyingConnection() noexcept;
aws_mqtt_client_connection *GetUnderlyingConnection() const noexcept;

/**
* Subscribes to topicFilter. OnMessageReceivedHandler will be invoked from an event-loop
Expand Down
2 changes: 1 addition & 1 deletion include/aws/crt/mqtt/private/MqttConnectionCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ namespace Aws
bool Disconnect() noexcept;

/// @private
aws_mqtt_client_connection *GetUnderlyingConnection() noexcept;
aws_mqtt_client_connection *GetUnderlyingConnection() const noexcept;

/**
* @internal
Expand Down
320 changes: 160 additions & 160 deletions include/aws/iot/MqttRequestResponseClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,217 +15,217 @@

#include <functional>

namespace Aws {

namespace Crt {
namespace Mqtt {
class MqttConnection;
}

namespace Mqtt5 {
class Mqtt5Client;
}
}

namespace Iot {
namespace RequestResponse
namespace Aws
{

class MqttRequestResponseClientImpl;

/**
* The type of change to the state of a streaming operation subscription
*/
enum class SubscriptionStatusEventType
{

/**
* The streaming operation is successfully subscribed to its topic (filter)
*/
SubscriptionEstablished = ARRSSET_SUBSCRIPTION_ESTABLISHED,

/**
* The streaming operation has temporarily lost its subscription to its topic (filter)
*/
SubscriptionLost = ARRSSET_SUBSCRIPTION_LOST,

/**
* The streaming operation has entered a terminal state where it has given up trying to subscribe
* to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy).
*/
SubscriptionHalted = ARRSSET_SUBSCRIPTION_HALTED,
};

/**
* An event that describes a change in subscription status for a streaming operation.
*/
struct AWS_CRT_CPP_API SubscriptionStatusEvent
namespace Crt
{
SubscriptionStatusEventType type;
int errorCode;
};

using SubscriptionStatusEventHandler = std::function<void(SubscriptionStatusEvent &&)>;

struct AWS_CRT_CPP_API IncomingPublishEvent {
Aws::Crt::ByteCursor payload;
};

using IncomingPublishEventHandler = std::function<void(IncomingPublishEvent &&)>;
namespace Mqtt
{
class MqttConnection;
}

/**
* Encapsulates a response to an AWS IoT Core MQTT-based service request
*/
struct AWS_CRT_CPP_API Response {
namespace Mqtt5
{
class Mqtt5Client;
}
} // namespace Crt

/**
* MQTT Topic that the response was received on. Different topics map to different types within the
* service model, so we need this value in order to know what to deserialize the payload into.
*/
Aws::Crt::ByteCursor topic;
namespace Iot
{
namespace RequestResponse
{

/**
* Payload of the response that correlates to a submitted request.
*/
Aws::Crt::ByteCursor payload;
};
class MqttRequestResponseClientImpl;

template <typename R, typename E> struct Result {
public:
/**
* The type of change to the state of a streaming operation subscription
*/
enum class SubscriptionStatusEventType
{

Result() = delete;
Result(const Result &result) = default;
Result(Result &&result) = default;
/**
* The streaming operation is successfully subscribed to its topic (filter)
*/
SubscriptionEstablished = ARRSSET_SUBSCRIPTION_ESTABLISHED,

Result(const R &response) :
rawResult(response)
{}
/**
* The streaming operation has temporarily lost its subscription to its topic (filter)
*/
SubscriptionLost = ARRSSET_SUBSCRIPTION_LOST,

Result(R &&response) :
rawResult(std::move(response))
{}
/**
* The streaming operation has entered a terminal state where it has given up trying to subscribe
* to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission
* policy).
*/
SubscriptionHalted = ARRSSET_SUBSCRIPTION_HALTED,
};

Result(const E &error) :
rawResult(error)
{}
/**
* An event that describes a change in subscription status for a streaming operation.
*/
struct AWS_CRT_CPP_API SubscriptionStatusEvent
{
SubscriptionStatusEventType type;
int errorCode;
};

Result(E &&error) :
rawResult(std::move(error))
{}
using SubscriptionStatusEventHandler = std::function<void(SubscriptionStatusEvent &&)>;

~Result() = default;
struct AWS_CRT_CPP_API IncomingPublishEvent
{
Aws::Crt::ByteCursor payload;
};

Result &operator=(const Result &result) = default;
Result &operator=(Result &&result) = default;
using IncomingPublishEventHandler = std::function<void(IncomingPublishEvent &&)>;

Result &operator=(const R &response) {
this->rawResult = response;
/**
* Encapsulates a response to an AWS IoT Core MQTT-based service request
*/
struct AWS_CRT_CPP_API UnmodeledResponse
{

return *this;
}
/**
* MQTT Topic that the response was received on. Different topics map to different types within the
* service model, so we need this value in order to know what to deserialize the payload into.
*/
Aws::Crt::ByteCursor topic;

Result &operator=(R &&response) {
this->rawResult = std::move(response);
/**
* Payload of the response that correlates to a submitted request.
*/
Aws::Crt::ByteCursor payload;
};

return *this;
}
template <typename R, typename E> struct Result
{
public:
Result() = delete;
Result(const Result &result) = default;
Result(Result &&result) = default;

Result &operator=(const E &error) {
this->rawResult = error;
}
Result(const R &response) : rawResult(response) {}

Result &operator=(E &&error) {
this->rawResult = std::move(error);
Result(R &&response) : rawResult(std::move(response)) {}

return *this;
}
Result(const E &error) : rawResult(error) {}

bool isSuccess() const {
return rawResult.holds_alternative<R>();
}
Result(E &&error) : rawResult(std::move(error)) {}

const R &getResponse() const {
AWS_FATAL_ASSERT(isSuccess());
~Result() = default;

return rawResult.get<Response>();
}
Result &operator=(const Result &result) = default;
Result &operator=(Result &&result) = default;

const E &getError() const {
AWS_FATAL_ASSERT(!isSuccess());
Result &operator=(const R &response)
{
this->rawResult = response;

return rawResult.get<int>();
}
return *this;
}

private:
Result &operator=(R &&response)
{
this->rawResult = std::move(response);

Aws::Crt::Variant<R, E> rawResult;
};
return *this;
}

using UnmodeledResult = Result<Response, int>;
Result &operator=(const E &error) { this->rawResult = error; }

using UnmodeledResultHandler = std::function<void(UnmodeledResult &&)>;
Result &operator=(E &&error)
{
this->rawResult = std::move(error);

struct AWS_CRT_CPP_API StreamingOperationOptions {
Aws::Crt::ByteCursor subscriptionTopicFilter;
return *this;
}

SubscriptionStatusEventHandler subscriptionStatusEventHandler;
bool isSuccess() const { return rawResult.template holds_alternative<R>(); }

IncomingPublishEventHandler incomingPublishEventHandler;
};
const R &getResponse() const
{
AWS_FATAL_ASSERT(isSuccess());

class AWS_CRT_CPP_API IStreamingOperation {
public:
return rawResult.template get<R>();
}

virtual ~IStreamingOperation() = 0;
const E &getError() const
{
AWS_FATAL_ASSERT(!isSuccess());

};
return rawResult.template get<E>();
}

/**
* MQTT-based request-response client configuration options
*/
struct AWS_CRT_CPP_API RequestResponseClientOptions {
private:
Aws::Crt::Variant<R, E> rawResult;
};

/**
* Maximum number of subscriptions that the client will concurrently use for request-response operations
*/
uint32_t maxRequestResponseSubscriptions;
using UnmodeledResult = Result<UnmodeledResponse, int>;

/**
* Maximum number of subscriptions that the client will concurrently use for streaming operations
*/
uint32_t maxStreamingSubscriptions;
using UnmodeledResultHandler = std::function<void(UnmodeledResult &&)>;

/**
* Duration, in seconds, that a request-response operation will wait for completion before giving up
*/
uint32_t operationTimeoutInSeconds;
};
struct AWS_CRT_CPP_API StreamingOperationOptions
{
Aws::Crt::ByteCursor subscriptionTopicFilter;

SubscriptionStatusEventHandler subscriptionStatusEventHandler;

IncomingPublishEventHandler incomingPublishEventHandler;
};

class AWS_CRT_CPP_API MqttRequestResponseClient {
public:
class AWS_CRT_CPP_API IStreamingOperation
{
public:
virtual ~IStreamingOperation() = 0;

virtual ~MqttRequestResponseClient();
virtual void activate() = 0;
};

static MqttRequestResponseClient *newFrom5(const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, RequestResponseClientOptions &&options, Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator());
/**
* MQTT-based request-response client configuration options
*/
struct AWS_CRT_CPP_API RequestResponseClientOptions
{

static MqttRequestResponseClient *newFrom311(const Aws::Crt::Mqtt::MqttConnection &protocolClient, RequestResponseClientOptions &&options, Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator());
/**
* Maximum number of subscriptions that the client will concurrently use for request-response operations
*/
uint32_t maxRequestResponseSubscriptions;

int submitRequest(const aws_mqtt_request_operation_options &requestOptions, UnmodeledResultHandler &&resultHandler);
/**
* Maximum number of subscriptions that the client will concurrently use for streaming operations
*/
uint32_t maxStreamingSubscriptions;

std::shared_ptr<IStreamingOperation> createStream(StreamingOperationOptions &&options);
/**
* Duration, in seconds, that a request-response operation will wait for completion before giving up
*/
uint32_t operationTimeoutInSeconds;
};

private:
class AWS_CRT_CPP_API IMqttRequestResponseClient
{
public:
virtual ~IMqttRequestResponseClient() = 0;

MqttRequestResponseClient(Aws::Crt::Allocator *allocator, MqttRequestResponseClientImpl *impl);
virtual int submitRequest(
const aws_mqtt_request_operation_options &requestOptions,
UnmodeledResultHandler &&resultHandler) = 0;

Aws::Crt::Allocator *m_allocator;
virtual std::shared_ptr<IStreamingOperation> createStream(StreamingOperationOptions &&options) = 0;

MqttRequestResponseClientImpl *m_impl;
};
static IMqttRequestResponseClient *newFrom5(
const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient,
RequestResponseClientOptions &&options,
Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator());

static IMqttRequestResponseClient *newFrom311(
const Aws::Crt::Mqtt::MqttConnection &protocolClient,
RequestResponseClientOptions &&options,
Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator());
};

} // RequestResponse
} // Iot
} // Aws
} // namespace RequestResponse
} // namespace Iot
} // namespace Aws
Loading

0 comments on commit 5f2268c

Please sign in to comment.