Skip to content

Commit

Permalink
Use token to correlate "requests" and "responses". (#210)
Browse files Browse the repository at this point in the history
**Issue:** MQTT is not a request/response protocol.  Messages for the Shadow service have a "client token" field to help users correlate "request" messages to "response" messages, but this sample script wasn't using the token field. This led to confusion when running Device Advisor tests which send several `shadow/update` "request" messages during setup. The sample was receiving the `shadow/update/accepted` "response" messages unexpectedly leading to crashes.

**Changes:** Set token on all "request" messages. Ignore any "response" message with an unexpected token.
  • Loading branch information
graebm authored Jun 18, 2021
1 parent 8b60066 commit 4fe1db0
Showing 1 changed file with 87 additions and 27 deletions.
114 changes: 87 additions & 27 deletions samples/shadow.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(self):
self.lock = threading.Lock()
self.shadow_value = None
self.disconnect_called = False
self.request_tokens = set()

locked_data = LockedData()

Expand Down Expand Up @@ -95,9 +96,15 @@ def on_disconnected(disconnect_future):
def on_get_shadow_accepted(response):
# type: (iotshadow.GetShadowResponse) -> None
try:
print("Finished getting initial shadow state.")

with locked_data.lock:
# check that this is a response to a request from this session
try:
locked_data.request_tokens.remove(response.client_token)
except KeyError:
print("Ignoring get_shadow_accepted message due to unexpected token.")
return

print("Finished getting initial shadow state.")
if locked_data.shadow_value is not None:
print(" Ignoring initial query because a delta event has already been received.")
return
Expand Down Expand Up @@ -126,12 +133,24 @@ def on_get_shadow_accepted(response):

def on_get_shadow_rejected(error):
# type: (iotshadow.ErrorResponse) -> None
if error.code == 404:
print("Thing has no shadow document. Creating with defaults...")
change_shadow_value(SHADOW_VALUE_DEFAULT)
else:
exit("Get request was rejected. code:{} message:'{}'".format(
error.code, error.message))
try:
# check that this is a response to a request from this session
with locked_data.lock:
try:
locked_data.request_tokens.remove(error.client_token)
except KeyError:
print("Ignoring get_shadow_rejected message due to unexpected token.")
return

if error.code == 404:
print("Thing has no shadow document. Creating with defaults...")
change_shadow_value(SHADOW_VALUE_DEFAULT)
else:
exit("Get request was rejected. code:{} message:'{}'".format(
error.code, error.message))

except Exception as e:
exit(e)

def on_shadow_delta_updated(delta):
# type: (iotshadow.ShadowDeltaUpdatedEvent) -> None
Expand Down Expand Up @@ -164,15 +183,39 @@ def on_publish_update_shadow(future):
def on_update_shadow_accepted(response):
# type: (iotshadow.UpdateShadowResponse) -> None
try:
print("Finished updating reported shadow value to '{}'.".format(response.state.reported[shadow_property])) # type: ignore
print("Enter desired value: ") # remind user they can input new values
except:
exit("Updated shadow is missing the target property.")
# check that this is a response to a request from this session
with locked_data.lock:
try:
locked_data.request_tokens.remove(response.client_token)
except KeyError:
print("Ignoring update_shadow_accepted message due to unexpected token.")
return

try:
print("Finished updating reported shadow value to '{}'.".format(response.state.reported[shadow_property])) # type: ignore
print("Enter desired value: ") # remind user they can input new values
except:
exit("Updated shadow is missing the target property.")

except Exception as e:
exit(e)

def on_update_shadow_rejected(error):
# type: (iotshadow.ErrorResponse) -> None
exit("Update request was rejected. code:{} message:'{}'".format(
error.code, error.message))
try:
# check that this is a response to a request from this session
with locked_data.lock:
try:
locked_data.request_tokens.remove(error.client_token)
except KeyError:
print("Ignoring update_shadow_rejected message due to unexpected token.")
return

exit("Update request was rejected. code:{} message:'{}'".format(
error.code, error.message))

except Exception as e:
exit(e)

def set_local_value_due_to_initial_query(reported_value):
with locked_data.lock:
Expand All @@ -189,16 +232,25 @@ def change_shadow_value(value):
print("Changed local shadow value to '{}'.".format(value))
locked_data.shadow_value = value

print("Updating reported shadow value to '{}'...".format(value))
request = iotshadow.UpdateShadowRequest(
thing_name=thing_name,
state=iotshadow.ShadowState(
reported={ shadow_property: value },
desired={ shadow_property: value },
print("Updating reported shadow value to '{}'...".format(value))

# use a unique token so we can correlate this "request" message to
# any "response" messages received on the /accepted and /rejected topics
token = str(uuid4())

request = iotshadow.UpdateShadowRequest(
thing_name=thing_name,
state=iotshadow.ShadowState(
reported={ shadow_property: value },
desired={ shadow_property: value },
),
client_token=token,
)
)
future = shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)
future.add_done_callback(on_publish_update_shadow)
future = shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)

locked_data.request_tokens.add(token)

future.add_done_callback(on_publish_update_shadow)

def user_input_thread_fn():
while True:
Expand Down Expand Up @@ -318,14 +370,22 @@ def user_input_thread_fn():
# Wait for subscription to succeed
delta_subscribed_future.result()

# The rest of the sample runs asyncronously.
# The rest of the sample runs asynchronously.

# Issue request for shadow's current state.
# The response will be received by the on_get_accepted() callback
print("Requesting current shadow state...")
publish_get_future = shadow_client.publish_get_shadow(
request=iotshadow.GetShadowRequest(thing_name=args.thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE)

with locked_data.lock:
# use a unique token so we can correlate this "request" message to
# any "response" messages received on the /accepted and /rejected topics
token = str(uuid4())

publish_get_future = shadow_client.publish_get_shadow(
request=iotshadow.GetShadowRequest(thing_name=args.thing_name, client_token=token),
qos=mqtt.QoS.AT_LEAST_ONCE)

locked_data.request_tokens.add(token)

# Ensure that publish succeeds
publish_get_future.result()
Expand Down

0 comments on commit 4fe1db0

Please sign in to comment.