From 0f2807e360bc3b5f98b5eaeb9188d142daa0ce1f Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 27 Jun 2024 16:10:40 -0700 Subject: [PATCH] New shadow sample --- lib/mqtt_request_response_utils.ts | 7 +- samples/node/deprecated/shadow/README.md | 138 ++++++ samples/node/deprecated/shadow/index.ts | 399 ++++++++++++++++ samples/node/deprecated/shadow/package.json | 27 ++ samples/node/deprecated/shadow/tsconfig.json | 62 +++ samples/node/shadow/index.ts | 460 +++++-------------- 6 files changed, 741 insertions(+), 352 deletions(-) create mode 100644 samples/node/deprecated/shadow/README.md create mode 100644 samples/node/deprecated/shadow/index.ts create mode 100644 samples/node/deprecated/shadow/package.json create mode 100644 samples/node/deprecated/shadow/tsconfig.json diff --git a/lib/mqtt_request_response_utils.ts b/lib/mqtt_request_response_utils.ts index 150534c8..cfeb5271 100644 --- a/lib/mqtt_request_response_utils.ts +++ b/lib/mqtt_request_response_utils.ts @@ -128,6 +128,7 @@ export async function doRequestResponse(options: RequestResponseOp let response = await options.client.submitRequest(requestOptions); let responseTopic = response.topic; + let wasSuccess = responseTopic.endsWith("accepted"); // May need to eventually model let responsePayload = response.payload; let deserializer = deserializerMap.get(responseTopic); @@ -137,7 +138,11 @@ export async function doRequestResponse(options: RequestResponseOp } let deserializedResponse = deserializer(responsePayload) as ResponseType; - resolve(deserializedResponse); + if (wasSuccess) { + resolve(deserializedResponse); + } else { + reject(createServiceError("Request failed", undefined, deserializedResponse)); + } } catch (err) { if (err instanceof ServiceError) { reject(err); diff --git a/samples/node/deprecated/shadow/README.md b/samples/node/deprecated/shadow/README.md new file mode 100644 index 00000000..ca06d8f1 --- /dev/null +++ b/samples/node/deprecated/shadow/README.md @@ -0,0 +1,138 @@ +# Node: Shadow + +[**Return to main sample list**](../../README.md) + +This sample uses the AWS IoT [Device Shadow](https://docs.aws.amazon.com/iot/latest/developerguide/iot-device-shadows.html) Service to keep a property in sync between device and server. Imagine a light whose color may be changed through an app, or set by a local user. + +Once connected, type a value in the terminal and press Enter to update the property's "reported" value. The sample also responds when the "desired" value changes on the server. To observe this, edit the Shadow document in the AWS Console and set a new "desired" value. + +On startup, the sample requests the shadow document to learn the property's initial state. The sample also subscribes to "delta" events from the server, which are sent when a property's "desired" value differs from its "reported" value. When the sample learns of a new desired value, that value is changed on the device and an update is sent to the server with the new "reported" value. + +Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended. + +
+Sample Policy +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Publish"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Receive"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get/accepted",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get/rejected",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/accepted",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/rejected",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/delta"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Subscribe"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/get/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/get/rejected",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/rejected",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/delta"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Connect",
+      "Resource": "arn:aws:iot:region:account:client/test-*"
+    }
+  ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. +* ``: The name of your AWS IoT Core thing you want the device connection to be associated with + +Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. + +
+ +## How to run + +To run the Shadow sample, go to the `node/shadow` folder and run the following commands: + +``` sh +npm install +node dist/index.js --endpoint --cert --key --thing_name --shadow_property +``` + +You can also pass `--mqtt5` to run the sample with Mqtt5 Client +```sh +npm install +node dist/index.js --endpoint --cert --key --thing_name --shadow_property --mqtt5 +``` + +You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: + +``` sh +npm install +node dist/index.js --endpoint --cert --key --thing_name --shadow_property --ca_file +``` + + +## Service Client Notes +### Differences between MQTT5 and MQTT311 +The service client with Mqtt5 client is almost identical to Mqtt3 one. The only difference is that you would need setup up a Mqtt5 Client and pass it to the service client. +For how to setup a Mqtt5 Client, please refer to [MQTT5 User Guide](https://github.com/awslabs/aws-crt-nodejs/blob/main/MQTT5-UserGuide.md) and [MQTT5 PubSub Sample](../pub_sub_mqtt5/README.md) + + + + + + + + + + +
Create a IoTShadowClient with Mqtt5Create a IoTShadowClient with Mqtt311
+ +```js + // Create a Mqtt5 Client + config_builder = iot.AwsIotMqtt5ClientConfigBuilder.newDirectMqttBuilderWithMtlsFromPath(argv.endpoint, argv.cert, argv.key); + client = new mqtt5.Mqtt5Client(config_builder.build()); + + // Create the shadow client from Mqtt5 Client + shadow = iotshadow.IotShadowClient.newFromMqtt5Client(client5); +``` + + + +```js + // Create a Mqtt311 Connection from the command line data + config_builder = iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder_from_path(argv.cert, argv.key); + config_builder.with_client_id(argv.client_id || "test-" + Math.floor(Math.random() * 100000000)); + config_builder.with_endpoint(argv.endpoint); + client = new mqtt.MqttClient(); + connection = client.new_connection(config); + + // Create the shadow client from Mqtt311 Connection + shadow = new iotshadow.IotShadowClient(connection); +``` + +
+ +### mqtt5.QoS v.s. mqtt.QoS +As the service client interface is unchanged for both Mqtt3 Connection and Mqtt5 Client,the service client will use mqtt.QoS instead of mqtt5.QoS even with a Mqtt5 Client. diff --git a/samples/node/deprecated/shadow/index.ts b/samples/node/deprecated/shadow/index.ts new file mode 100644 index 00000000..d2b316fa --- /dev/null +++ b/samples/node/deprecated/shadow/index.ts @@ -0,0 +1,399 @@ +import { mqtt, iotshadow } from 'aws-iot-device-sdk-v2'; +import { stringify } from 'querystring'; +import readline from 'readline'; +import {once} from "events"; + +const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout +}); +const prompt = (query: string) => new Promise((resolve) => rl.question(query, resolve)); +const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + +type Args = { [index: string]: any }; +const yargs = require('yargs'); + +// The relative path is '../../util/cli_args' from here, but the compiled javascript file gets put one level +// deeper inside the 'dist' folder +const common_args = require('../../../../util/cli_args'); + +var shadow_value: unknown; +var shadow_property: string; + +var shadow_update_complete = false; + +yargs.command('*', false, (yargs: any) => { + common_args.add_direct_connection_establishment_arguments(yargs); + common_args.add_shadow_arguments(yargs); +}, main).parse(); + +async function sub_to_shadow_update(shadow: iotshadow.IotShadowClient, argv: Args) { + return new Promise(async (resolve, reject) => { + try { + function updateAccepted(error?: iotshadow.IotShadowError, response?: iotshadow.model.UpdateShadowResponse) { + if (response) { + + if (response.clientToken !== undefined) { + console.log("Succcessfully updated shadow for clientToken: " + response.clientToken + "."); + } + else { + console.log("Succcessfully updated shadow."); + } + if (response.state?.desired !== undefined) { + console.log("\t desired state: " + stringify(response.state.desired)); + } + if (response.state?.reported !== undefined) { + console.log("\t reported state: " + stringify(response.state.reported)); + } + } + + if (error || !response) { + console.log("Updated shadow is missing the target property."); + } + resolve(true); + } + + function updateRejected(error?: iotshadow.IotShadowError, response?: iotshadow.model.ErrorResponse) { + if (response) { + console.log("Update request was rejected."); + } + + if (error) { + console.log("Error occurred..") + } + reject(error); + } + + console.log("Subscribing to Update events.."); + const updateShadowSubRequest: iotshadow.model.UpdateNamedShadowSubscriptionRequest = { + shadowName: argv.shadow_property, + thingName: argv.thing_name + }; + + await shadow.subscribeToUpdateShadowAccepted( + updateShadowSubRequest, + mqtt.QoS.AtLeastOnce, + (error, response) => updateAccepted(error, response)); + + await shadow.subscribeToUpdateShadowRejected( + updateShadowSubRequest, + mqtt.QoS.AtLeastOnce, + (error, response) => updateRejected(error, response)); + + resolve(true); + } + catch (error) { + reject(error); + } + }); +} + +async function sub_to_shadow_get(shadow: iotshadow.IotShadowClient, argv: Args) { + return new Promise(async (resolve, reject) => { + try { + function getAccepted(error?: iotshadow.IotShadowError, response?: iotshadow.model.GetShadowResponse) { + + if (response?.state) { + if (response?.state.delta) { + const value = response.state.delta; + if (value) { + console.log("Shadow contains delta value '" + stringify(value) + "'."); + change_shadow_value(shadow, argv, value); + } + } + + if (response?.state.reported) { + const value_any: any = response.state.reported; + if (value_any) { + let found_property = false; + for (var prop in value_any) { + if (prop === shadow_property) { + found_property = true; + console.log("Shadow contains '" + prop + "'. Reported value: '" + String(value_any[prop]) + "'."); + break; + } + } + if (found_property === false) { + console.log("Shadow does not contain '" + shadow_property + "' property."); + } + } + } + } + + if (error || !response) { + console.log("Error occurred.."); + } + shadow_update_complete = true; + resolve(true); + } + + function getRejected(error?: iotshadow.IotShadowError, response?: iotshadow.model.ErrorResponse) { + + if (response) { + console.log("In getRejected response."); + } + + if (error) { + console.log("Error occurred.."); + } + + shadow_update_complete = true; + reject(error); + } + + console.log("Subscribing to Get events.."); + const getShadowSubRequest: iotshadow.model.GetShadowSubscriptionRequest = { + thingName: argv.thing_name + }; + + await shadow.subscribeToGetShadowAccepted( + getShadowSubRequest, + mqtt.QoS.AtLeastOnce, + (error, response) => getAccepted(error, response)); + + await shadow.subscribeToGetShadowRejected( + getShadowSubRequest, + mqtt.QoS.AtLeastOnce, + (error, response) => getRejected(error, response)); + + resolve(true); + } + catch (error) { + reject(error); + } + }); +} + +async function sub_to_shadow_delta(shadow: iotshadow.IotShadowClient, argv: Args) { + return new Promise(async (resolve, reject) => { + try { + function deltaEvent(error?: iotshadow.IotShadowError, response?: iotshadow.model.GetShadowResponse) { + console.log("\nReceived shadow delta event."); + + if (response?.clientToken != null) { + console.log(" ClientToken: " + response.clientToken); + } + + if (response?.state !== null) { + let value_any: any = response?.state; + if (value_any === null || value_any === undefined) { + console.log("Delta reports that '" + shadow_property + "' was deleted. Resetting defaults.."); + let data_to_send: any = {}; + data_to_send[shadow_property] = argv.shadow_value; + change_shadow_value(shadow, argv, data_to_send); + } + else { + if (value_any[shadow_property] !== undefined) { + if (value_any[shadow_property] !== shadow_value) { + console.log("Delta reports that desired value is '" + value_any[shadow_property] + "'. Changing local value.."); + let data_to_send: any = {}; + data_to_send[shadow_property] = value_any[shadow_property]; + change_shadow_value(shadow, argv, data_to_send); + } + else { + console.log("Delta did not report a change in '" + shadow_property + "'."); + } + } + else { + console.log("Desired value not found in delta. Skipping.."); + } + } + } + else { + console.log("Delta did not report a change in '" + shadow_property + "'."); + } + + resolve(true); + } + + console.log("Subscribing to Delta events.."); + const deltaShadowSubRequest: iotshadow.model.ShadowDeltaUpdatedSubscriptionRequest = { + thingName: argv.thing_name + }; + + await shadow.subscribeToShadowDeltaUpdatedEvents( + deltaShadowSubRequest, + mqtt.QoS.AtLeastOnce, + (error, response) => deltaEvent(error, response)); + + resolve(true); + } + catch (error) { + reject(error); + } + }); +} + +async function get_current_shadow(shadow: iotshadow.IotShadowClient, argv: Args) { + return new Promise(async (resolve, reject) => { + try { + const getShadow: iotshadow.model.GetShadowRequest = { + thingName: argv.thing_name + } + + shadow_update_complete = false; + console.log("Requesting current shadow state.."); + shadow.publishGetShadow( + getShadow, + mqtt.QoS.AtLeastOnce); + + await get_current_shadow_update_wait(); + resolve(true); + } + catch (error) { + reject(error); + } + }); +} + + +async function get_current_shadow_update_wait() { + // Wait until shadow_update_complete is true, showing the result returned + return await new Promise(resolve => { + const interval = setInterval(() => { + if (shadow_update_complete == true) { + resolve(true); + clearInterval(interval); + }; + }, 200); + }); +} + +function change_shadow_value(shadow: iotshadow.IotShadowClient, argv: Args, new_value?: object) { + return new Promise(async (resolve, reject) => { + try { + if (typeof new_value !== 'undefined') { + let new_value_any: any = new_value; + let skip_send = false; + + if (new_value_any !== null) { + if (new_value_any[shadow_property] == shadow_value) { + skip_send = true; + } + } + if (skip_send == false) { + if (new_value_any === null) { + shadow_value = new_value_any; + } + else { + shadow_value = new_value_any[shadow_property]; + } + + console.log("Changed local shadow value to '" + shadow_value + "'."); + + var updateShadow: iotshadow.model.UpdateShadowRequest = { + state: { + desired: new_value, + reported: new_value + }, + thingName: argv.thing_name + }; + + await shadow.publishUpdateShadow( + updateShadow, + mqtt.QoS.AtLeastOnce) + + console.log("Update request published."); + } + } + } + catch (error) { + console.log("Failed to publish update request.") + reject(error); + } + resolve(true) + }); +} + +async function main(argv: Args) { + common_args.apply_sample_arguments(argv); + + shadow_property = argv.shadow_property; + + var connection; + var client5; + var shadow; + + console.log("Connecting..."); + if (argv.mqtt5) { // Build the mqtt5 client + client5 = common_args.build_mqtt5_client_from_cli_args(argv); + shadow = iotshadow.IotShadowClient.newFromMqtt5Client(client5); + + const connectionSuccess = once(client5, "connectionSuccess"); + client5.start(); + await connectionSuccess; + console.log("Connected with Mqtt5 Client..."); + } else { // Build the mqtt3 based connection + connection = common_args.build_connection_from_cli_args(argv); + shadow = new iotshadow.IotShadowClient(connection); + + await connection.connect(); + console.log("Connected with Mqtt3 Client..."); + } + + try { + await sub_to_shadow_update(shadow, argv); + await sub_to_shadow_get(shadow, argv); + await sub_to_shadow_delta(shadow, argv); + await get_current_shadow(shadow, argv); + + await sleep(500); // wait half a second + + // Take console input when this sample is not running in CI + if (argv.is_ci == false) { + while (true) { + const userInput = await prompt("Enter desired value: "); + if (userInput === "quit") { + break; + } + else { + let data_to_send: any = {}; + + if (userInput == "clear_shadow") { + data_to_send = null; + } + else if (userInput == "null") { + data_to_send[shadow_property] = null; + } + else { + data_to_send[shadow_property] = userInput; + } + + await change_shadow_value(shadow, argv, data_to_send); + await get_current_shadow(shadow, argv); + } + } + } + // If this is running in CI, then automatically update the shadow + else { + var messages_sent = 0; + while (messages_sent < 5) { + let data_to_send: any = {} + data_to_send[shadow_property] = "Shadow_Value_" + messages_sent.toString() + await change_shadow_value(shadow, argv, data_to_send); + await get_current_shadow(shadow, argv); + messages_sent += 1 + } + } + + } catch (error) { + console.log(error); + } + + console.log("Disconnecting.."); + + if (connection) { + await connection.disconnect(); + } else { + let stopped = once(client5, "stopped"); + client5.stop(); + await stopped; + client5.close(); + } + + // force node to wait a second before quitting to finish any promises + await sleep(1000); + console.log("Disconnected"); + // Quit NodeJS + process.exit(0); +} diff --git a/samples/node/deprecated/shadow/package.json b/samples/node/deprecated/shadow/package.json new file mode 100644 index 00000000..085c0446 --- /dev/null +++ b/samples/node/deprecated/shadow/package.json @@ -0,0 +1,27 @@ +{ + "name": "shadow", + "version": "1.0.0", + "description": "NodeJS IoT SDK v2 Shadow Sample", + "homepage": "https://github.com/aws/aws-iot-device-sdk-js-v2", + "repository": { + "type": "git", + "url": "git+https://github.com/aws/aws-iot-device-sdk-js-v2.git" + }, + "contributors": [ + "AWS SDK Common Runtime Team " + ], + "license": "Apache-2.0", + "main": "./dist/index.js", + "scripts": { + "tsc": "tsc", + "prepare": "npm run tsc" + }, + "devDependencies": { + "@types/node": "^10.17.50", + "typescript": "^4.7.4" + }, + "dependencies": { + "aws-iot-device-sdk-v2": "../../../", + "yargs": "^16.2.0" + } +} diff --git a/samples/node/deprecated/shadow/tsconfig.json b/samples/node/deprecated/shadow/tsconfig.json new file mode 100644 index 00000000..92617173 --- /dev/null +++ b/samples/node/deprecated/shadow/tsconfig.json @@ -0,0 +1,62 @@ +{ + "compilerOptions": { + /* Basic Options */ + "target": "es6", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017','ES2018' or 'ESNEXT'. */ + "module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', or 'ESNext'. */ + // "lib": [], /* Specify library files to be included in the compilation. */ + // "allowJs": true, /* Allow javascript files to be compiled. */ + // "checkJs": true, /* Report errors in .js files. */ + // "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', or 'react'. */ + "declaration": true, /* Generates corresponding '.d.ts' file. */ + // "declarationMap": true, /* Generates a sourcemap for each corresponding '.d.ts' file. */ + "sourceMap": true, /* Generates corresponding '.map' file. */ + // "outFile": "./", /* Concatenate and emit output to single file. */ + "outDir": "./dist", /* Redirect output structure to the directory. */ + // "rootDir": "./", /* Specify the root directory of input files. Use to control the output directory structure with --outDir. */ + // "composite": true, /* Enable project compilation */ + // "removeComments": false, /* Do not emit comments to output. */ + // "noEmit": true, /* Do not emit outputs. */ + // "importHelpers": true, /* Import emit helpers from 'tslib'. */ + // "downlevelIteration": true, /* Provide full support for iterables in 'for-of', spread, and destructuring when targeting 'ES5' or 'ES3'. */ + // "isolatedModules": true, /* Transpile each file as a separate module (similar to 'ts.transpileModule'). */ + /* Strict Type-Checking Options */ + "strict": true, /* Enable all strict type-checking options. */ + "noImplicitAny": true, /* Raise error on expressions and declarations with an implied 'any' type. */ + "strictNullChecks": true, /* Enable strict null checks. */ + "strictFunctionTypes": true, /* Enable strict checking of function types. */ + "strictBindCallApply": true, /* Enable strict 'bind', 'call', and 'apply' methods on functions. */ + "strictPropertyInitialization": true, /* Enable strict checking of property initialization in classes. */ + "noImplicitThis": true, /* Raise error on 'this' expressions with an implied 'any' type. */ + "alwaysStrict": true, /* Parse in strict mode and emit "use strict" for each source file. */ + /* Additional Checks */ + "noUnusedLocals": true, /* Report errors on unused locals. */ + // "noUnusedParameters": true, /* Report errors on unused parameters. */ + "noImplicitReturns": true, /* Report error when not all code paths in function return a value. */ + // "noFallthroughCasesInSwitch": true, /* Report errors for fallthrough cases in switch statement. */ + /* Module Resolution Options */ + // "moduleResolution": "node", /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */ + // "baseUrl": "./", /* Base directory to resolve non-absolute module names. */ + // "paths": {}, /* A series of entries which re-map imports to lookup locations relative to the 'baseUrl'. */ + // "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */ + // "typeRoots": [], /* List of folders to include type definitions from. */ + // "types": [], /* Type declaration files to be included in compilation. */ + // "allowSyntheticDefaultImports": true, /* Allow default imports from modules with no default export. This does not affect code emit, just typechecking. */ + "esModuleInterop": true /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */ + // "preserveSymlinks": true, /* Do not resolve the real path of symlinks. */ + /* Source Map Options */ + // "sourceRoot": "", /* Specify the location where debugger should locate TypeScript files instead of source locations. */ + // "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */ + // "inlineSourceMap": true, /* Emit a single file with source maps instead of having a separate file. */ + // "inlineSources": true, /* Emit the source alongside the sourcemaps within a single file; requires '--inlineSourceMap' or '--sourceMap' to be set. */ + /* Experimental Options */ + // "experimentalDecorators": true, /* Enables experimental support for ES7 decorators. */ + // "emitDecoratorMetadata": true, /* Enables experimental support for emitting type metadata for decorators. */ + }, + "include": [ + "*.ts" + ], + "exclude": [ + "node_modules", + "dist" + ] +} \ No newline at end of file diff --git a/samples/node/shadow/index.ts b/samples/node/shadow/index.ts index e5d941b2..99f0562f 100644 --- a/samples/node/shadow/index.ts +++ b/samples/node/shadow/index.ts @@ -1,14 +1,11 @@ -import { mqtt, iotshadow } from 'aws-iot-device-sdk-v2'; -import { stringify } from 'querystring'; +import { iotshadow } from 'aws-iot-device-sdk-v2'; import readline from 'readline'; import {once} from "events"; -const rl = readline.createInterface({ - input: process.stdin, - output: process.stdout -}); -const prompt = (query: string) => new Promise((resolve) => rl.question(query, resolve)); -const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); +interface SampleContext { + thingName: string, + client: iotshadow.IotShadowClientv2 +} type Args = { [index: string]: any }; const yargs = require('yargs'); @@ -17,383 +14,144 @@ const yargs = require('yargs'); // deeper inside the 'dist' folder const common_args = require('../../../util/cli_args'); -var shadow_value: unknown; -var shadow_property: string; - -var shadow_update_complete = false; - yargs.command('*', false, (yargs: any) => { common_args.add_direct_connection_establishment_arguments(yargs); common_args.add_shadow_arguments(yargs); }, main).parse(); -async function sub_to_shadow_update(shadow: iotshadow.IotShadowClient, argv: Args) { - return new Promise(async (resolve, reject) => { - try { - function updateAccepted(error?: iotshadow.IotShadowError, response?: iotshadow.model.UpdateShadowResponse) { - if (response) { - - if (response.clientToken !== undefined) { - console.log("Succcessfully updated shadow for clientToken: " + response.clientToken + "."); - } - else { - console.log("Succcessfully updated shadow."); - } - if (response.state?.desired !== undefined) { - console.log("\t desired state: " + stringify(response.state.desired)); - } - if (response.state?.reported !== undefined) { - console.log("\t reported state: " + stringify(response.state.reported)); - } - } - - if (error || !response) { - console.log("Updated shadow is missing the target property."); - } - resolve(true); - } - - function updateRejected(error?: iotshadow.IotShadowError, response?: iotshadow.model.ErrorResponse) { - if (response) { - console.log("Update request was rejected."); - } - - if (error) { - console.log("Error occurred..") - } - reject(error); - } - - console.log("Subscribing to Update events.."); - const updateShadowSubRequest: iotshadow.model.UpdateNamedShadowSubscriptionRequest = { - shadowName: argv.shadow_property, - thingName: argv.thing_name - }; - - await shadow.subscribeToUpdateShadowAccepted( - updateShadowSubRequest, - mqtt.QoS.AtLeastOnce, - (error, response) => updateAccepted(error, response)); - - await shadow.subscribeToUpdateShadowRejected( - updateShadowSubRequest, - mqtt.QoS.AtLeastOnce, - (error, response) => updateRejected(error, response)); - - resolve(true); - } - catch (error) { - reject(error); - } - }); +function printHelp() { + console.log('Supported commands:'); + console.log(" get - gets the current value of the IoT thing's shadow"); + console.log(" delete - deletes the IoT thing's shadow"); + console.log(" update-desired - updates the desired state of the IoT thing's shadow. If the shadow does not exist, it will be created."); + console.log(" update-reported - updates the reported state of the IoT thing's shadow. If the shadow does not exist, it will be created."); + console.log(" quit - quits the sample application\n"); } -async function sub_to_shadow_get(shadow: iotshadow.IotShadowClient, argv: Args) { - return new Promise(async (resolve, reject) => { - try { - function getAccepted(error?: iotshadow.IotShadowError, response?: iotshadow.model.GetShadowResponse) { - - if (response?.state) { - if (response?.state.delta) { - const value = response.state.delta; - if (value) { - console.log("Shadow contains delta value '" + stringify(value) + "'."); - change_shadow_value(shadow, argv, value); - } +async function handleCommand(context: SampleContext, input: string) : Promise { + try { + let command = input.split(' ')[0]; + let remaining = input.substring(command.length); + console.log(""); + + switch (command) { + case "get": + let getResponse = await context.client.getShadow({ + thingName: context.thingName + }); + console.log(`Get response: ${JSON.stringify(getResponse)}\n`); + break; + + case "delete": + let deleteResponse = await context.client.deleteShadow({ + thingName: context.thingName + }); + console.log(`\nDelete response: ${JSON.stringify(deleteResponse)}\n`); + break; + + case "update-desired": + let updateDesiredResponse = await context.client.updateShadow({ + thingName: context.thingName, + state: { + desired: JSON.parse(remaining) } - - if (response?.state.reported) { - const value_any: any = response.state.reported; - if (value_any) { - let found_property = false; - for (var prop in value_any) { - if (prop === shadow_property) { - found_property = true; - console.log("Shadow contains '" + prop + "'. Reported value: '" + String(value_any[prop]) + "'."); - break; - } - } - if (found_property === false) { - console.log("Shadow does not contain '" + shadow_property + "' property."); - } - } + }); + console.log(`Update Desired response: ${JSON.stringify(updateDesiredResponse)}\n`); + break; + + case "update-reported": + let updateReportedResponse = await context.client.updateShadow({ + thingName: context.thingName, + state: { + reported: JSON.parse(remaining) } - } + }); + console.log(`Update Reported response: ${JSON.stringify(updateReportedResponse)}\n`); + break; - if (error || !response) { - console.log("Error occurred.."); - } - shadow_update_complete = true; - resolve(true); - } + case "quit": + return true; - function getRejected(error?: iotshadow.IotShadowError, response?: iotshadow.model.ErrorResponse) { + case "help": + printHelp(); + break; - if (response) { - console.log("In getRejected response."); - } - - if (error) { - console.log("Error occurred.."); - } - - shadow_update_complete = true; - reject(error); - } - - console.log("Subscribing to Get events.."); - const getShadowSubRequest: iotshadow.model.GetShadowSubscriptionRequest = { - thingName: argv.thing_name - }; - - await shadow.subscribeToGetShadowAccepted( - getShadowSubRequest, - mqtt.QoS.AtLeastOnce, - (error, response) => getAccepted(error, response)); - - await shadow.subscribeToGetShadowRejected( - getShadowSubRequest, - mqtt.QoS.AtLeastOnce, - (error, response) => getRejected(error, response)); - - resolve(true); - } - catch (error) { - reject(error); + default: + console.log(`Unknown command: ${command}\n`); + printHelp(); + break; } - }); -} - -async function sub_to_shadow_delta(shadow: iotshadow.IotShadowClient, argv: Args) { - return new Promise(async (resolve, reject) => { - try { - function deltaEvent(error?: iotshadow.IotShadowError, response?: iotshadow.model.GetShadowResponse) { - console.log("\nReceived shadow delta event."); - - if (response?.clientToken != null) { - console.log(" ClientToken: " + response.clientToken); - } + } catch (error) { + console.log(`Error processing command: ${JSON.stringify(error)}\n`); + } - if (response?.state !== null) { - let value_any: any = response?.state; - if (value_any === null || value_any === undefined) { - console.log("Delta reports that '" + shadow_property + "' was deleted. Resetting defaults.."); - let data_to_send: any = {}; - data_to_send[shadow_property] = argv.shadow_value; - change_shadow_value(shadow, argv, data_to_send); - } - else { - if (value_any[shadow_property] !== undefined) { - if (value_any[shadow_property] !== shadow_value) { - console.log("Delta reports that desired value is '" + value_any[shadow_property] + "'. Changing local value.."); - let data_to_send: any = {}; - data_to_send[shadow_property] = value_any[shadow_property]; - change_shadow_value(shadow, argv, data_to_send); - } - else { - console.log("Delta did not report a change in '" + shadow_property + "'."); - } - } - else { - console.log("Desired value not found in delta. Skipping.."); - } - } - } - else { - console.log("Delta did not report a change in '" + shadow_property + "'."); - } + return false; +} - resolve(true); - } +async function main(argv: Args) { + common_args.apply_sample_arguments(argv); - console.log("Subscribing to Delta events.."); - const deltaShadowSubRequest: iotshadow.model.ShadowDeltaUpdatedSubscriptionRequest = { - thingName: argv.thing_name - }; + console.log("Connecting..."); - await shadow.subscribeToShadowDeltaUpdatedEvents( - deltaShadowSubRequest, - mqtt.QoS.AtLeastOnce, - (error, response) => deltaEvent(error, response)); + let protocolClient = common_args.build_mqtt5_client_from_cli_args(argv); + const connectionSuccess = once(protocolClient, "connectionSuccess"); + protocolClient.start(); + await connectionSuccess; + console.log("Connected!"); - resolve(true); - } - catch (error) { - reject(error); - } + let shadowClient = iotshadow.IotShadowClientv2.newFromMqtt5(protocolClient, { + maxRequestResponseSubscriptions: 5, + maxStreamingSubscriptions: 2, + operationTimeoutInSeconds: 60 }); -} - -async function get_current_shadow(shadow: iotshadow.IotShadowClient, argv: Args) { - return new Promise(async (resolve, reject) => { - try { - const getShadow: iotshadow.model.GetShadowRequest = { - thingName: argv.thing_name - } - shadow_update_complete = false; - console.log("Requesting current shadow state.."); - shadow.publishGetShadow( - getShadow, - mqtt.QoS.AtLeastOnce); + let context: SampleContext = { + thingName: argv.thing_name, + client: shadowClient + }; - await get_current_shadow_update_wait(); - resolve(true); - } - catch (error) { - reject(error); - } + // invoked when the shadow state changes + let shadowUpdatedStream = shadowClient.createShadowUpdatedStream({ + thingName: context.thingName }); -} - - -async function get_current_shadow_update_wait() { - // Wait until shadow_update_complete is true, showing the result returned - return await new Promise(resolve => { - const interval = setInterval(() => { - if (shadow_update_complete == true) { - resolve(true); - clearInterval(interval); - }; - }, 200); + shadowUpdatedStream.on('incomingPublish', (event) => { + console.log(`Received ShadowUpdated event: ${JSON.stringify(event.message)}\n`) + }) + shadowUpdatedStream.open(); + + // invoked when there's a change to the delta between reported and desired + let shadowDeltaUpdatedStream = shadowClient.createShadowDeltaUpdatedStream({ + thingName: context.thingName }); -} - -function change_shadow_value(shadow: iotshadow.IotShadowClient, argv: Args, new_value?: object) { - return new Promise(async (resolve, reject) => { - try { - if (typeof new_value !== 'undefined') { - let new_value_any: any = new_value; - let skip_send = false; - - if (new_value_any !== null) { - if (new_value_any[shadow_property] == shadow_value) { - skip_send = true; - } - } - if (skip_send == false) { - if (new_value_any === null) { - shadow_value = new_value_any; - } - else { - shadow_value = new_value_any[shadow_property]; - } - - console.log("Changed local shadow value to '" + shadow_value + "'."); - - var updateShadow: iotshadow.model.UpdateShadowRequest = { - state: { - desired: new_value, - reported: new_value - }, - thingName: argv.thing_name - }; - - await shadow.publishUpdateShadow( - updateShadow, - mqtt.QoS.AtLeastOnce) - - console.log("Update request published."); - } - } - } - catch (error) { - console.log("Failed to publish update request.") - reject(error); - } - resolve(true) + shadowDeltaUpdatedStream.on('incomingPublish', (event) => { + console.log(`Received ShadowDeltaUpdated event: ${JSON.stringify(event.message)}\n`) + }) + shadowDeltaUpdatedStream.open(); + + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout }); -} - -async function main(argv: Args) { - common_args.apply_sample_arguments(argv); - - shadow_property = argv.shadow_property; - - var connection; - var client5; - var shadow; - - console.log("Connecting..."); - if (argv.mqtt5) { // Build the mqtt5 client - client5 = common_args.build_mqtt5_client_from_cli_args(argv); - shadow = iotshadow.IotShadowClient.newFromMqtt5Client(client5); - - const connectionSuccess = once(client5, "connectionSuccess"); - client5.start(); - await connectionSuccess; - console.log("Connected with Mqtt5 Client..."); - } else { // Build the mqtt3 based connection - connection = common_args.build_connection_from_cli_args(argv); - shadow = new iotshadow.IotShadowClient(connection); - - await connection.connect(); - console.log("Connected with Mqtt3 Client..."); - } try { - await sub_to_shadow_update(shadow, argv); - await sub_to_shadow_get(shadow, argv); - await sub_to_shadow_delta(shadow, argv); - await get_current_shadow(shadow, argv); - - await sleep(500); // wait half a second - - // Take console input when this sample is not running in CI - if (argv.is_ci == false) { - while (true) { - const userInput = await prompt("Enter desired value: "); - if (userInput === "quit") { - break; - } - else { - let data_to_send: any = {}; - - if (userInput == "clear_shadow") { - data_to_send = null; - } - else if (userInput == "null") { - data_to_send[shadow_property] = null; - } - else { - data_to_send[shadow_property] = userInput; - } - - await change_shadow_value(shadow, argv, data_to_send); - await get_current_shadow(shadow, argv); - } - } - } - // If this is running in CI, then automatically update the shadow - else { - var messages_sent = 0; - while (messages_sent < 5) { - let data_to_send: any = {} - data_to_send[shadow_property] = "Shadow_Value_" + messages_sent.toString() - await change_shadow_value(shadow, argv, data_to_send); - await get_current_shadow(shadow, argv); - messages_sent += 1 - } + let done = false; + while (!done) { + const userInput : string = await new Promise((resolve) => rl.question("Enter command:\n", resolve)); + done = await handleCommand(context, userInput.trimStart()); } - } catch (error) { console.log(error); } + shadowClient.close(); console.log("Disconnecting.."); - if (connection) { - await connection.disconnect(); - } else { - let stopped = once(client5, "stopped"); - client5.stop(); - await stopped; - client5.close(); - } + let stopped = once(protocolClient, "stopped"); + protocolClient.stop(); + await stopped; + protocolClient.close(); - // force node to wait a second before quitting to finish any promises - await sleep(1000); - console.log("Disconnected"); + console.log("Stopped"); // Quit NodeJS process.exit(0); }