Skip to content

Commit

Permalink
Stream creation API tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Sep 18, 2024
1 parent 0e53bfa commit 3d55010
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
9 changes: 8 additions & 1 deletion include/aws/iot/MqttRequestResponseClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ namespace Aws
Aws::Crt::ByteCursor subscriptionTopicFilter;

SubscriptionStatusEventHandler subscriptionStatusEventHandler;
};

struct AWS_CRT_CPP_API StreamingOperationOptionsInternal
{
Aws::Crt::ByteCursor subscriptionTopicFilter;

SubscriptionStatusEventHandler subscriptionStatusEventHandler;

IncomingPublishEventHandler incomingPublishEventHandler;
};
Expand Down Expand Up @@ -213,7 +220,7 @@ namespace Aws
const aws_mqtt_request_operation_options &requestOptions,
UnmodeledResultHandler &&resultHandler) = 0;

virtual std::shared_ptr<IStreamingOperation> createStream(const StreamingOperationOptions &options) = 0;
virtual std::shared_ptr<IStreamingOperation> createStream(const StreamingOperationOptionsInternal &options) = 0;

static IMqttRequestResponseClient *newFrom5(
const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient,
Expand Down
22 changes: 11 additions & 11 deletions source/iot/MqttRequestResponseClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace Aws
public:
StreamingOperationImpl(
struct aws_mqtt_rr_client_operation *stream,
const StreamingOperationOptions &options,
const StreamingOperationOptionsInternal &options,
struct aws_event_loop *protocolLoop);
virtual ~StreamingOperationImpl();

Expand All @@ -93,7 +93,7 @@ namespace Aws
static void onTerminatedCallback(void *user_data);

private:
StreamingOperationOptions m_config;
StreamingOperationOptionsInternal m_config;

struct aws_mqtt_rr_client_operation *m_stream;

Expand All @@ -113,7 +113,7 @@ namespace Aws

StreamingOperationImpl::StreamingOperationImpl(
struct aws_mqtt_rr_client_operation *stream,
const StreamingOperationOptions &options,
const StreamingOperationOptionsInternal &options,
struct aws_event_loop *protocolLoop)
: m_config(options), m_stream(stream), m_protocolLoop(protocolLoop), m_lock(),
m_closed(false)
Expand Down Expand Up @@ -152,11 +152,11 @@ namespace Aws
{
m_closed = true;
toRelease = m_stream;
m_stream = NULL;
m_stream = nullptr;
}
}

if (toRelease)
if (nullptr != toRelease)
{
aws_mqtt_rr_client_operation_release(toRelease);
}
Expand Down Expand Up @@ -217,7 +217,7 @@ namespace Aws
public:
static std::shared_ptr<IStreamingOperation> create(
Aws::Crt::Allocator *allocator,
const StreamingOperationOptions &options,
const StreamingOperationOptionsInternal &options,
struct aws_mqtt_request_response_client *client);

StreamingOperation(const std::shared_ptr<StreamingOperationImpl> &impl);
Expand All @@ -235,7 +235,7 @@ namespace Aws

std::shared_ptr<IStreamingOperation> StreamingOperation::create(
Aws::Crt::Allocator *allocator,
const StreamingOperationOptions &options,
const StreamingOperationOptionsInternal &options,
struct aws_mqtt_request_response_client *client)
{
StreamingOperationImplHandle *implHandle = Aws::Crt::New<StreamingOperationImplHandle>(allocator);
Expand Down Expand Up @@ -334,7 +334,7 @@ namespace Aws
const aws_mqtt_request_operation_options &requestOptions,
UnmodeledResultHandler &&resultHandler) noexcept;

std::shared_ptr<IStreamingOperation> createStream(const StreamingOperationOptions &options);
std::shared_ptr<IStreamingOperation> createStream(const StreamingOperationOptionsInternal &options);

Aws::Crt::Allocator *getAllocator() const { return m_allocator; }

Expand Down Expand Up @@ -387,7 +387,7 @@ namespace Aws
}

std::shared_ptr<IStreamingOperation> MqttRequestResponseClientImpl::createStream(
const StreamingOperationOptions &options)
const StreamingOperationOptionsInternal &options)
{
return StreamingOperation::create(m_allocator, options, m_client);
}
Expand All @@ -411,7 +411,7 @@ namespace Aws
const aws_mqtt_request_operation_options &requestOptions,
UnmodeledResultHandler &&resultHandler);

std::shared_ptr<IStreamingOperation> createStream(const StreamingOperationOptions &options);
std::shared_ptr<IStreamingOperation> createStream(const StreamingOperationOptionsInternal &options);

private:
MqttRequestResponseClientImpl *m_impl;
Expand Down Expand Up @@ -483,7 +483,7 @@ namespace Aws
}

std::shared_ptr<IStreamingOperation> MqttRequestResponseClient::createStream(
const StreamingOperationOptions &options)
const StreamingOperationOptionsInternal &options)
{
return m_impl->createStream(options);
}
Expand Down

0 comments on commit 3d55010

Please sign in to comment.