-
Notifications
You must be signed in to change notification settings - Fork 211
/
shadow_mqtt5.py
433 lines (354 loc) · 16.3 KB
/
shadow_mqtt5.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.
from time import sleep
from awscrt import mqtt5, http
from awsiot import iotshadow, mqtt5_client_builder
from concurrent.futures import Future
import sys
import threading
import traceback
from uuid import uuid4
from utils.command_line_utils import CommandLineUtils
# - Overview -
# This sample uses the AWS IoT Device Shadow Service to keep a property in
# sync between device and server. Imagine a light whose color may be changed
# through an app, or set by a local user.
#
# - Instructions -
# Once connected, type a value in the terminal and press Enter to update
# the property's "reported" value. The sample also responds when the "desired"
# value changes on the server. To observe this, edit the Shadow document in
# the AWS Console and set a new "desired" value.
#
# - Detail -
# On startup, the sample requests the shadow document to learn the property's
# initial state. The sample also subscribes to "delta" events from the server,
# which are sent when a property's "desired" value differs from its "reported"
# value. When the sample learns of a new desired value, that value is changed
# on the device and an update is sent to the server with the new "reported"
# value.
# cmdData is the arguments/input from the command line placed into a single struct for
# use in this sample. This handles all of the command line parsing, validating, etc.
# See the Utils/CommandLineUtils for more information.
cmdData = CommandLineUtils.parse_sample_input_shadow()
# Using globals to simplify sample code
is_sample_done = threading.Event()
mqtt5_client = None
future_connection_success = Future()
shadow_thing_name = cmdData.input_thing_name
shadow_property = cmdData.input_shadow_property
SHADOW_VALUE_DEFAULT = "off"
class LockedData:
def __init__(self):
self.lock = threading.Lock()
self.shadow_value = None
self.disconnect_called = False
self.request_tokens = set()
locked_data = LockedData()
# Function for gracefully quitting this sample
def exit(msg_or_exception):
if isinstance(msg_or_exception, Exception):
print("Exiting sample due to exception.")
traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2])
else:
print("Exiting sample:", msg_or_exception)
with locked_data.lock:
if not locked_data.disconnect_called:
print("Stop the client...")
locked_data.disconnect_called = True
mqtt5_client.stop()
# Callback for the lifecycle event Connection Success
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
print("Lifecycle Connection Success")
global future_connection_success
future_connection_success.set_result(lifecycle_connect_success_data)
# Callback for the lifecycle event on Client Stopped
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
# type: (Future) -> None
print("Client Stopped.")
# Signal that sample is finished
is_sample_done.set()
def on_get_shadow_accepted(response):
# type: (iotshadow.GetShadowResponse) -> None
try:
with locked_data.lock:
# check that this is a response to a request from this session
try:
locked_data.request_tokens.remove(response.client_token)
except KeyError:
print("Ignoring get_shadow_accepted message due to unexpected token.")
return
print("Finished getting initial shadow state.")
if locked_data.shadow_value is not None:
print(" Ignoring initial query because a delta event has already been received.")
return
if response.state:
if response.state.delta:
value = response.state.delta.get(shadow_property)
if value:
print(" Shadow contains delta value '{}'.".format(value))
change_shadow_value(value)
return
if response.state.reported:
value = response.state.reported.get(shadow_property)
if value:
print(" Shadow contains reported value '{}'.".format(value))
set_local_value_due_to_initial_query(response.state.reported[shadow_property])
return
print(" Shadow document lacks '{}' property. Setting defaults...".format(shadow_property))
change_shadow_value(SHADOW_VALUE_DEFAULT)
return
except Exception as e:
exit(e)
def on_get_shadow_rejected(error):
# type: (iotshadow.ErrorResponse) -> None
try:
# check that this is a response to a request from this session
with locked_data.lock:
try:
locked_data.request_tokens.remove(error.client_token)
except KeyError:
print("Ignoring get_shadow_rejected message due to unexpected token.")
return
if error.code == 404:
print("Thing has no shadow document. Creating with defaults...")
change_shadow_value(SHADOW_VALUE_DEFAULT)
else:
exit("Get request was rejected. code:{} message:'{}'".format(
error.code, error.message))
except Exception as e:
exit(e)
def on_shadow_delta_updated(delta):
# type: (iotshadow.ShadowDeltaUpdatedEvent) -> None
try:
print("Received shadow delta event.")
if delta.state and (shadow_property in delta.state):
value = delta.state[shadow_property]
if value is None:
print(" Delta reports that '{}' was deleted. Resetting defaults...".format(shadow_property))
change_shadow_value(SHADOW_VALUE_DEFAULT)
return
else:
print(" Delta reports that desired value is '{}'. Changing local value...".format(value))
if (delta.client_token is not None):
print(" ClientToken is: " + delta.client_token)
change_shadow_value(value)
else:
print(" Delta did not report a change in '{}'".format(shadow_property))
except Exception as e:
exit(e)
def on_publish_update_shadow(future):
# type: (Future) -> None
try:
future.result()
print("Update request published.")
except Exception as e:
print("Failed to publish update request.")
exit(e)
def on_update_shadow_accepted(response):
# type: (iotshadow.UpdateShadowResponse) -> None
try:
# check that this is a response to a request from this session
with locked_data.lock:
try:
locked_data.request_tokens.remove(response.client_token)
except KeyError:
print("Ignoring update_shadow_accepted message due to unexpected token.")
return
try:
if response.state.reported is not None:
if shadow_property in response.state.reported:
print("Finished updating reported shadow value to '{}'.".format(
response.state.reported[shadow_property])) # type: ignore
else:
print("Could not find shadow property with name: '{}'.".format(shadow_property)) # type: ignore
else:
print("Shadow states cleared.") # when the shadow states are cleared, reported and desired are set to None
print("Enter desired value: ") # remind user they can input new values
except BaseException:
exit("Updated shadow is missing the target property")
except Exception as e:
exit(e)
def on_update_shadow_rejected(error):
# type: (iotshadow.ErrorResponse) -> None
try:
# check that this is a response to a request from this session
with locked_data.lock:
try:
locked_data.request_tokens.remove(error.client_token)
except KeyError:
print("Ignoring update_shadow_rejected message due to unexpected token.")
return
exit("Update request was rejected. code:{} message:'{}'".format(
error.code, error.message))
except Exception as e:
exit(e)
def set_local_value_due_to_initial_query(reported_value):
with locked_data.lock:
locked_data.shadow_value = reported_value
print("Enter desired value: ") # remind user they can input new values
def change_shadow_value(value):
with locked_data.lock:
if locked_data.shadow_value == value:
print("Local value is already '{}'.".format(value))
print("Enter desired value: ") # remind user they can input new values
return
print("Changed local shadow value to '{}'.".format(value))
locked_data.shadow_value = value
print("Updating reported shadow value to '{}'...".format(value))
# use a unique token so we can correlate this "request" message to
# any "response" messages received on the /accepted and /rejected topics
token = str(uuid4())
# if the value is "clear shadow" then send a UpdateShadowRequest with None
# for both reported and desired to clear the shadow document completely.
if value == "clear_shadow":
tmp_state = iotshadow.ShadowState(
reported=None,
desired=None,
reported_is_nullable=True,
desired_is_nullable=True)
request = iotshadow.UpdateShadowRequest(
thing_name=shadow_thing_name,
state=tmp_state,
client_token=token,
)
# Otherwise, send a normal update request
else:
# if the value is "none" then set it to a Python none object to
# clear the individual shadow property
if value == "none":
value = None
request = iotshadow.UpdateShadowRequest(
thing_name=shadow_thing_name,
state=iotshadow.ShadowState(
reported={shadow_property: value},
desired={shadow_property: value},
),
client_token=token,
)
future = shadow_client.publish_update_shadow(request, mqtt5.QoS.AT_LEAST_ONCE)
locked_data.request_tokens.add(token)
future.add_done_callback(on_publish_update_shadow)
def user_input_thread_fn():
# If we are not in CI, then take terminal input
if not cmdData.input_is_ci:
while True:
try:
# Read user input
new_value = input()
# If user wants to quit sample, then quit.
# Otherwise change the shadow value.
if new_value in ['exit', 'quit']:
exit("User has quit")
break
else:
change_shadow_value(new_value)
except Exception as e:
print("Exception on input thread.")
exit(e)
break
# Otherwise, send shadow updates automatically
else:
try:
messages_sent = 0
while messages_sent < 5:
cli_input = "Shadow_Value_" + str(messages_sent)
change_shadow_value(cli_input)
sleep(1)
messages_sent += 1
exit("CI has quit")
except Exception as e:
print("Exception on input thread (CI)")
exit(e)
if __name__ == '__main__':
# Create the proxy options if the data is present in cmdData
proxy_options = None
if cmdData.input_proxy_host is not None and cmdData.input_proxy_port != 0:
proxy_options = http.HttpProxyOptions(
host_name=cmdData.input_proxy_host,
port=cmdData.input_proxy_port)
# Create a mqtt5 connection from the command line data
mqtt5_client = mqtt5_client_builder.mtls_from_path(
endpoint=cmdData.input_endpoint,
port=cmdData.input_port,
cert_filepath=cmdData.input_cert,
pri_key_filepath=cmdData.input_key,
ca_filepath=cmdData.input_ca,
client_id=cmdData.input_clientId,
clean_session=False,
keep_alive_secs=30,
http_proxy_options=proxy_options,
on_lifecycle_connection_success=on_lifecycle_connection_success,
on_lifecycle_stopped=on_lifecycle_stopped)
if not cmdData.input_is_ci:
print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...")
else:
print("Connecting to endpoint with client ID")
mqtt5_client.start()
shadow_client = iotshadow.IotShadowClient(mqtt5_client)
# Wait for connection to be fully established.
# Note that it's not necessary to wait, commands issued to the
# mqtt5_client before its fully connected will simply be queued.
# But this sample waits here so it's obvious when a connection
# fails or succeeds.
future_connection_success.result()
print("Connected!")
try:
# Subscribe to necessary topics.
# Note that is **is** important to wait for "accepted/rejected" subscriptions
# to succeed before publishing the corresponding "request".
print("Subscribing to Update responses...")
update_accepted_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_accepted(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_update_shadow_accepted)
update_rejected_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_rejected(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_update_shadow_rejected)
# Wait for subscriptions to succeed
result = update_accepted_subscribed_future.result()
update_rejected_subscribed_future.result()
print("Subscribing to Get responses...")
get_accepted_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_accepted(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_get_shadow_accepted)
get_rejected_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_rejected(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_get_shadow_rejected)
# Wait for subscriptions to succeed
get_accepted_subscribed_future.result()
get_rejected_subscribed_future.result()
print("Subscribing to Delta events...")
delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events(
request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=shadow_thing_name),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_shadow_delta_updated)
# Wait for subscription to succeed
delta_subscribed_future.result()
# The rest of the sample runs asynchronously.
# Issue request for shadow's current state.
# The response will be received by the on_get_accepted() callback
print("Requesting current shadow state...")
with locked_data.lock:
# use a unique token so we can correlate this "request" message to
# any "response" messages received on the /accepted and /rejected topics
token = str(uuid4())
publish_get_future = shadow_client.publish_get_shadow(
request=iotshadow.GetShadowRequest(thing_name=shadow_thing_name, client_token=token),
qos=mqtt5.QoS.AT_LEAST_ONCE)
locked_data.request_tokens.add(token)
# Ensure that publish succeeds
publish_get_future.result()
# Launch thread to handle user input.
# A "daemon" thread won't prevent the program from shutting down.
print("Launching thread to read user input...")
user_input_thread = threading.Thread(target=user_input_thread_fn, name='user_input_thread')
user_input_thread.daemon = True
user_input_thread.start()
except Exception as e:
exit(e)
# Wait for the sample to finish (user types 'quit', or an error occurs)
is_sample_done.wait()