Skip to content

Commit

Permalink
[#3585] Add acknowledgement required command feature for Google Pub/S…
Browse files Browse the repository at this point in the history
…ub based commands

Signed-off-by: Matthias Kaemmer <[email protected]>
  • Loading branch information
mattkaem committed Feb 2, 2024
1 parent ddebeae commit aabafce
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -59,6 +59,7 @@
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.AbstractCommandContext;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponse;
Expand Down Expand Up @@ -1734,8 +1735,19 @@ private void afterCommandPublished(
reportPublishedCommand(tenantObject, subscription, commandContext, ProcessingOutcome.FORWARDED);
log.debug("received PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]",
msgId, subscription.getTenant(), subscription.getDeviceId(), endpoint.clientIdentifier());
commandContext.getTracingSpan().log("received PUBACK from device");
commandContext.accept();
final Span span = commandContext.getTracingSpan();
span.log("received PUBACK from device");
final Command command = commandContext.getCommand();
if (command.isAckRequired() && command.isValid()
&& commandContext instanceof AbstractCommandContext<?> abstractCommandContext) {
abstractCommandContext
.sendDeliverySuccessCommandResponseMessage(HttpURLConnection.HTTP_ACCEPTED,
"Command successfully received", span, command.getCorrelationId(),
command.getMessagingType())
.onComplete(v -> commandContext.accept());
} else {
commandContext.accept();
}
};

final Handler<Void> onAckTimeoutHandler = v -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -193,6 +193,11 @@ public boolean isOneWay() {
return replyToId == null;
}

@Override
public boolean isAckRequired() {
return false;
}

@Override
public boolean isValid() {
return !validationError.isPresent();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2021, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -213,6 +213,11 @@ public boolean isOneWay() {
return !responseRequired;
}

@Override
public boolean isAckRequired() {
return false;
}

@Override
public boolean isValid() {
return !validationError.isPresent();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -47,6 +47,7 @@ public final class PubSubBasedCommand implements Command {
private final String contentType;
private final String requestId;
private final boolean responseRequired;
private final boolean ackRequired;

private String gatewayId;

Expand All @@ -58,7 +59,8 @@ private PubSubBasedCommand(
final String correlationId,
final String subject,
final String contentType,
final boolean responseRequired) {
final boolean responseRequired,
final boolean ackRequired) {

this.validationError = validationError;
this.pubsubMessage = pubsubMessage;
Expand All @@ -68,6 +70,7 @@ private PubSubBasedCommand(
this.subject = subject;
this.contentType = contentType;
this.responseRequired = responseRequired;
this.ackRequired = ackRequired;
this.requestId = Commands.encodeRequestIdParameters(correlationId, MessagingType.pubsub);
}

Expand Down Expand Up @@ -134,10 +137,11 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage,

final StringJoiner validationErrorJoiner = new StringJoiner(", ");
final boolean responseRequired = PubSubMessageHelper.isResponseRequired(attributes);
final boolean ackRequired = PubSubMessageHelper.isAckRequired(attributes);
final String correlationId = PubSubMessageHelper.getCorrelationId(attributes)
.filter(id -> !id.isEmpty())
.orElseGet(() -> {
if (responseRequired) {
if (responseRequired || ackRequired) {
validationErrorJoiner.add("correlation-id is not set");
}
return null;
Expand All @@ -157,7 +161,8 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage,
correlationId,
subject,
contentType,
responseRequired);
responseRequired,
ackRequired);
}

/**
Expand All @@ -169,6 +174,11 @@ public PubsubMessage getPubsubMessage() {
return pubsubMessage;
}

@Override
public boolean isAckRequired() {
return !responseRequired && ackRequired;
}

@Override
public boolean isOneWay() {
return !responseRequired;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -14,12 +14,14 @@

import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.MapBasedExecutionContext;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;
Expand All @@ -36,7 +38,8 @@
* in a Pub/Sub and Kafka based command context.
* @param <T> The type of Command.
*/
public abstract class AbstractCommandContext<T extends Command> extends MapBasedExecutionContext implements CommandContext {
public abstract class AbstractCommandContext<T extends Command> extends MapBasedExecutionContext
implements CommandContext {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractCommandContext.class);

Expand Down Expand Up @@ -157,24 +160,73 @@ protected Future<Void> sendDeliveryFailureCommandResponseMessage(
commandResponse.setAdditionalProperties(
Collections.unmodifiableMap(command.getDeliveryFailureNotificationProperties()));

return commandResponseSender.sendCommandResponse(
// try to retrieve tenant configuration from context
Optional.ofNullable(get(KEY_TENANT_CONFIG))
.filter(TenantObject.class::isInstance)
.map(TenantObject.class::cast)
// and fall back to default configuration
.orElseGet(() -> TenantObject.from(command.getTenant())),
new RegistrationAssertion(command.getDeviceId()),
commandResponse,
span.context())
.onFailure(thr -> {
LOG.debug("failed to publish command response [{}]", commandResponse, thr);
TracingHelper.logError(span, "failed to publish command response message", thr);
})
return sendCommandResponse(commandResponse, span)
.onSuccess(v -> {
LOG.debug("published error command response [{}, cause: {}]", commandResponse,
cause != null ? cause.getMessage() : error);
span.log("published error command response");
});
}

/**
* Sends a command response as a command acknowledgement.
*
* @param status The HTTP status code indicating the outcome of processing the command.
* @param successMessage The message for the response message body.
* @param span The active OpenTracing span to use for tracking this operation.
* @param correlationId The correlation ID of the command that this is the response for.
* @param messagingType The type of the messaging system via which the command message was received.
* @return A future indicating the outcome of the operation.
* <p>
* The future will be succeeded if the command response has been sent.
* <p>
* The future will be failed if the command response could not be sent.
*/
public Future<Void> sendDeliverySuccessCommandResponseMessage(
final int status,
final String successMessage,
final Span span,
final String correlationId,
final MessagingType messagingType) {
if (correlationId == null) {
TracingHelper.logError(span, "can't send command response message - no correlation id set");
return Future.failedFuture("missing correlation id");
}
final JsonObject payloadJson = new JsonObject();
payloadJson.put("acknowledgement", successMessage != null ? successMessage : "");

final CommandResponse commandResponse = new CommandResponse(
command.getTenant(),
command.getDeviceId(),
payloadJson.toBuffer(),
CommandConstants.CONTENT_TYPE_DELIVERY_SUCCESS_NOTIFICATION,
status,
correlationId,
"",
messagingType);
commandResponse.setAdditionalProperties(Map.of(MessageHelper.SYS_PROPERTY_SUBJECT, command.getName()));

return sendCommandResponse(commandResponse, span)
.onSuccess(v -> {
LOG.debug("published ack command response [{}]", commandResponse);
span.log("published ack command response");
});
}

private Future<Void> sendCommandResponse(final CommandResponse commandResponse, final Span span) {
return commandResponseSender.sendCommandResponse(
// try to retrieve tenant configuration from context
Optional.ofNullable(get(KEY_TENANT_CONFIG))
.filter(TenantObject.class::isInstance)
.map(TenantObject.class::cast)
// and fall back to default configuration
.orElseGet(() -> TenantObject.from(command.getTenant())),
new RegistrationAssertion(command.getDeviceId()),
commandResponse,
span.context())
.onFailure(thr -> {
LOG.debug("failed to publish command response [{}]", commandResponse, thr);
TracingHelper.logError(span, "failed to publish command response message", thr);
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -35,6 +35,13 @@ public interface Command {
*/
boolean isOneWay();

/**
* Checks if an acknowledgement of this command should be sent to the messaging infrastructure.
*
* @return {@code true} if an acknowledgement is required.
*/
boolean isAckRequired();

/**
* Checks if this command contains all required information.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -43,6 +43,10 @@ public final class PubSubMessageHelper {
* The name of the Pub/Sub message property indicating whether a response to the message is expected/required.
*/
public static final String PUBSUB_PROPERTY_RESPONSE_REQUIRED = "response-required";
/**
* The name of the Pub/Sub message property indicating whether an acknowledgement to the message is expected/required.
*/
public static final String PUBSUB_PROPERTY_ACK_REQUIRED = "ack-required";

/**
* Prefix to use in the Pub/Sub message properties for marking properties of command messages that should be
Expand Down Expand Up @@ -202,6 +206,17 @@ public static boolean isResponseRequired(final Map<String, String> attributesMap
.parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_RESPONSE_REQUIRED).orElse("false"));
}

/**
* Gets the value of the {@value PUBSUB_PROPERTY_ACK_REQUIRED} attribute.
*
* @param attributesMap The attributes map to get the value from.
* @return The attributes value.
*/
public static boolean isAckRequired(final Map<String, String> attributesMap) {
return Boolean
.parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_ACK_REQUIRED).orElse("false"));
}

/**
* Gets the value of the {@value MessageHelper#SYS_PROPERTY_CONTENT_TYPE} attribute.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -76,6 +76,11 @@ public class CommandConstants {
*/
public static final String COMMAND_RESPONSE_RESPONSE_PART_SHORT = "s";

/**
* The content type that is defined for acknowledgement command response messages.
*/
public static final String CONTENT_TYPE_DELIVERY_SUCCESS_NOTIFICATION = "application/vnd.eclipse-hono-delivery-success-notification+json";

/**
* The content type that is defined for error command response messages sent by a protocol adapter or Command Router.
*/
Expand Down
21 changes: 14 additions & 7 deletions site/documentation/content/api/command-and-control-pubsub/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,23 @@ project and `${tenant_id}` is the ID of the tenant that the client wants to send
Metadata MUST be set as Pub/Sub attributes on a message. The following table provides an overview of the attributes the
*Business Application* needs to set on a one-way command message.

| Name | Mandatory | Type | Description |
|:---------------|:---------:|:---------|:--------------------------------------------------------------|
| *device_id* | yes | *string* | The identifier of the device that the command is targeted at. |
| *subject* | yes | *string* | The name of the command to be executed by the device. |
| *content-type* | no | *string* | If present, MUST contain a *Media Type* as defined by [RFC 2046](https://tools.ietf.org/html/rfc2046) which describes the semantics and format of the command's input data contained in the message payload. However, not all protocol adapters will support this property as not all transport protocols provide means to convey this information, e.g. MQTT 3.1.1 has no notion of message headers. |
| Name | Mandatory | Type | Description |
|:-----------------|:---------:|:----------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| *device_id* | yes | *string* | The identifier of the device that the command is targeted at. |
| *subject* | yes | *string* | The name of the command to be executed by the device. |
| *content-type* | no | *string* | If present, MUST contain a *Media Type* as defined by [RFC 2046](https://tools.ietf.org/html/rfc2046) which describes the semantics and format of the command's input data contained in the message payload. However, not all protocol adapters will support this property as not all transport protocols provide means to convey this information, e.g. MQTT 3.1.1 has no notion of message headers. |
| *ack-required* | no | *boolean* | If set to `true` a command acknowledgement message will be sent on the command-response topic once the device acknowledges the command. Currently this only works with MQTT devices which have a QoS 1 subscription on the command topic. |
| *correlation-id* | no | *string* | MUST be set if *ack-required* is set to `true`. The identifier used to correlate a response message to the original request. It is used as the *correlation-id* attribute in the response. |

The command message MAY contain arbitrary payload, set as message value, to be sent to the device. The value of the
message's *subject* attribute may provide a hint to the device regarding the format, encoding and semantics of the
payload
data.
payload data.

{{% notice info %}}
Currently the acknowledgement mechanism only works with devices connected via the MQTT protocol which have a
subscription on the command topic with a QoS level of 1. Getting an acknowledgement indicates that the device has
successfully received the command. However, it does not confirm whether the device has successfully processed the command.
{{% /notice %}}

## Send a (Request/Response) Command

Expand Down

0 comments on commit aabafce

Please sign in to comment.