From e2cbd683dc0869fe45f24325b8df50d72fd7bd9a Mon Sep 17 00:00:00 2001
From: Vi-Nk <36918693+Vi-Nk@users.noreply.github.com>
Date: Tue, 11 Jan 2022 00:49:37 +0530
Subject: [PATCH] Add OPC UA client driver (#7)
---
gradle.properties | 11 +-
pravega-sensor-collector/build.gradle | 6 +-
.../src/main/dist/conf/env-sample-opcua.sh | 20 ++
.../collector/opcua/OpcUaClientDriver.java | 202 ++++++++++++++++++
.../sensor/collector/opcua/OpcUaRawData.java | 26 +++
.../memoryless/DataCollectorService.java | 69 ++++++
.../memoryless/SimpleMemorylessDriver.java | 141 ++++++++++++
.../test/resources/OpcUaClientTest.properties | 19 ++
8 files changed, 488 insertions(+), 6 deletions(-)
create mode 100644 pravega-sensor-collector/src/main/dist/conf/env-sample-opcua.sh
create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaClientDriver.java
create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaRawData.java
create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/DataCollectorService.java
create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java
create mode 100644 pravega-sensor-collector/src/test/resources/OpcUaClientTest.properties
diff --git a/gradle.properties b/gradle.properties
index 66584d93..850edfcf 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -13,18 +13,21 @@ commonsCLIVersion=1.4
commonsCSVVersion=1.8
commonsCodecVersion=1.14
commonsMath3Version=3.6.1
+grizzlyVersion=2.25.1
+gsonVersion=2.8.9
includePravegaCredentials=true
jacksonVersion=2.9.10.3
junitVersion=4.12
+jakartaBindVersion=2.3.2
+jaxbVersion=2.3.2
+javaxServletApiVersion=3.0.1
+miloVersion=0.6.3
pravegaCredentialsVersion=0.9.0
pravegaVersion=0.9.0
qosLogbackVersion=1.2.3
slf4jApiVersion=1.7.25
sqliteVersion=3.32.3
-grizzlyVersion=2.25.1
-jakartaBindVersion=2.3.2
-jaxbVersion=2.3.2
-javaxServletApiVersion=3.0.1
+
# Application version. This will be overridden by APP_VERSION in scripts/env.sh when using scripts/publish.sh.
version=unknown
diff --git a/pravega-sensor-collector/build.gradle b/pravega-sensor-collector/build.gradle
index 91beb918..0513b018 100644
--- a/pravega-sensor-collector/build.gradle
+++ b/pravega-sensor-collector/build.gradle
@@ -15,8 +15,8 @@ group = "io.pravega"
archivesBaseName = "pravega-sensor-collector"
description = "pravega-sensor-collector"
mainClassName = "io.pravega.sensor.collector.PravegaSensorCollectorApp"
-sourceCompatibility = 1.8
-targetCompatibility = 1.8
+sourceCompatibility = 11
+targetCompatibility = 11
dependencies {
compile "org.slf4j:slf4j-api:${slf4jApiVersion}"
@@ -37,6 +37,8 @@ dependencies {
compile "org.apache.commons:commons-csv:${commonsCSVVersion}"
compile "commons-codec:commons-codec:${commonsCodecVersion}"
compile "com.github.vladimir-bukhtoyarov:bucket4j-core:${bucket4jVersion}"
+ compile "org.eclipse.milo:sdk-client:${miloVersion}"
+ compile "com.google.code.gson:gson:${gsonVersion}"
testCompile "junit:junit:${junitVersion}"
diff --git a/pravega-sensor-collector/src/main/dist/conf/env-sample-opcua.sh b/pravega-sensor-collector/src/main/dist/conf/env-sample-opcua.sh
new file mode 100644
index 00000000..6e08d058
--- /dev/null
+++ b/pravega-sensor-collector/src/main/dist/conf/env-sample-opcua.sh
@@ -0,0 +1,20 @@
+#
+# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_CLASS=io.pravega.sensor.collector.opcua.OpcUaClientDriver
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_SCOPE=examples
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_STREAM=opc_sensor_stream
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_EXACTLY_ONCE=false
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_TRANSACTION_TIMEOUT_MINUTES=2.0
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_ROUTING_KEY=$(hostname)
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_POLL_PERIODICITY_MS=1000
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_ENDPOINT=opc.tcp://127.0.0.1:49320
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_NAMESPACE_INDEX=2
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_NODE_IDENTIFIER=TestSim.Device1.Random
+export PRAVEGA_SENSOR_COLLECTOR_OPC1_NODE_FILTER_REGEX='^(?!_).*'
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaClientDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaClientDriver.java
new file mode 100644
index 00000000..d76ecc99
--- /dev/null
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaClientDriver.java
@@ -0,0 +1,202 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.pravega.sensor.collector.opcua;
+
+import com.google.gson.Gson;
+import io.pravega.sensor.collector.DeviceDriverConfig;
+import io.pravega.sensor.collector.simple.memoryless.SimpleMemorylessDriver;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.stack.client.security.ClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.NamespaceTable;
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.*;
+import org.eclipse.milo.opcua.stack.core.util.TypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.security.KeyPair;
+import java.security.cert.X509Certificate;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.toList;
+
+public class OpcUaClientDriver extends SimpleMemorylessDriver {
+
+ private static final Logger log = LoggerFactory.getLogger(OpcUaClientDriver.class);
+ private static final Gson jsonParser = new Gson();
+
+ private static String ENDPOINT = "ENDPOINT";
+ private static String NS_INDEX = "NAMESPACE_INDEX";
+ private static String NODE_ID = "NODE_IDENTIFIER";
+ private static String NODE_FILTER = "NODE_FILTER_REGEX";
+ private static OpcUaClient opcUaClient;
+ private static Pattern nodeFilter;
+ private static List sensorList;
+ private static List readValueIds;
+ private static NamespaceTable ns;
+
+ public OpcUaClientDriver(DeviceDriverConfig config) throws UaException, ExecutionException, InterruptedException {
+ super(config);
+ log.info("Trying to establish connection with OPC server");
+ //TODO :connection monitoring and restore
+ opcUaClient = buildClient();
+ opcUaClient.connect().get();
+ log.info("Connection established with OPC server");
+ ns = opcUaClient.getNamespaceTable();
+ log.info("Creating sensor List");
+ nodeFilter = Pattern.compile(getNodeFilter());
+ sensorList = new LinkedList<>();
+ if (opcUaClient.getAddressSpace().getNode(getNodeID()).getNodeClass().getValue() == NodeClass.Variable.getValue()) {
+ sensorList.add(getNodeID());
+ }
+ filterNode(opcUaClient, getNodeID());
+ readValueIds = sensorList.stream().map(nodeId -> new ReadValueId(nodeId, AttributeId.Value.uid(), null, null)).collect(Collectors.toUnmodifiableList());
+ }
+
+
+ @Override
+ public List readRawData() throws ExecutionException, InterruptedException {
+ List dataList = new LinkedList<>();
+ ReadResponse readResp = opcUaClient.read(0, TimestampsToReturn.Source, readValueIds).get();
+ DataValue[] aggregatedData = readResp.getResults();
+ int i = 0;
+ for (DataValue data : aggregatedData) {
+ // As bulk read operation is in-place read , the list ordering of the input nodes will match the data fetched from responses.
+ Variant rawVariant = data.getValue();
+ String dataTypeClass = TypeUtil.getBackingClass(rawVariant.getDataType().get().toNodeId(ns).get()).getName();
+ log.trace("Sensor name {} : Raw Data {}", sensorList.get(i).getIdentifier(), rawVariant.getValue());
+ dataList.add(new OpcUaRawData(rawVariant.getValue(), data.getSourceTime().getUtcTime(), sensorList.get(i++).getIdentifier().toString(), dataTypeClass));
+ }
+ return dataList;
+ }
+
+ private void filterNode(OpcUaClient client, NodeId rootNode) {
+ BrowseDescription browse = new BrowseDescription(
+ rootNode,
+ BrowseDirection.Forward,
+ Identifiers.References,
+ true,
+ uint(NodeClass.Object.getValue() | NodeClass.Variable.getValue()), // Get both Objects and Variable types while browsing
+ uint(BrowseResultMask.All.getValue())
+ );
+
+ try {
+ BrowseResult browseResult = client.browse(browse).get();
+ List references = toList(browseResult.getReferences());
+
+ for (ReferenceDescription rd : references) {
+ if (rd.getBrowseName().getName().equalsIgnoreCase("_Hints")) {
+ //Skip node iteration if the node name is _Hints as it contains hints about variables creation functions supported by server.
+ continue;
+ } else if (nodeFilter.matcher(rd.getBrowseName().getName()).find() && rd.getNodeClass().getValue() == NodeClass.Variable.getValue()) {
+ //Sensor which matches RegEx and node type being a Variable.
+ log.info("Qualified Sensor: {}", rd.getNodeId().toNodeId(ns).get().getIdentifier());
+ sensorList.add(rd.getNodeId().toNodeId(ns).get());
+ }
+
+ rd.getNodeId().toNodeId(client.getNamespaceTable())
+ .ifPresent(nodeId -> filterNode(client, nodeId));
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Browsing nodeId={} failed: {}", rootNode, e.getMessage(), e);
+ }
+ }
+
+ private NodeId getNodeID() {
+ return new NodeId(Integer.parseInt(getProperty(NS_INDEX, "2")), getProperty(NODE_ID));
+ }
+
+ private String getNodeFilter() {
+ return getProperty(NODE_FILTER, "^(?!_).*");
+ }
+
+
+ @Override
+ public byte[] getEvent(OpcUaRawData rawData) {
+ return jsonParser.toJson(rawData).getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public long getTimestamp(OpcUaRawData rawData) {
+ return rawData.timestamp;
+ }
+
+ private OpcUaClient buildClient() throws UaException {
+ return OpcUaClient.create(
+ getEndpointUrl(),
+ endpoints ->
+ endpoints.stream()
+ .filter(endpointFilter())
+ .findFirst(),
+ configBuilder ->
+ configBuilder
+ .setApplicationName(LocalizedText.english("Pravega Sensor Collector opc-ua client"))
+ .setApplicationUri("urn:pravega:sensor:collector:client")
+ .setRequestTimeout(UInteger.valueOf(5000))
+ .build()); //Insecure Client connection creation.
+ }
+
+
+ //TODO : Fetch appropriate security policy : Current -NONE
+ private Predicate endpointFilter() {
+ return e -> SecurityPolicy.None.getUri().equals(e.getSecurityPolicyUri());
+ }
+
+ //TODO : Keystore
+ private ClientCertificateValidator getCertificateValidator() {
+ return null;
+ }
+
+ //TODO : Keystore
+ private X509Certificate[] getClientCertificateChain() {
+ return null;
+ }
+
+ //TODO : Keystore
+ private X509Certificate getClientCertificate() {
+ return null;
+ }
+
+ //TODO : Keystore
+ private KeyPair getClientKeyPair() {
+ return null;
+ }
+
+ //TODO : Keystore
+ private IdentityProvider getIdentityProvider() {
+ return null;
+ }
+
+ private String getEndpointUrl() {
+ return getProperty(ENDPOINT, "opc.tcp://127.0.0.1:49320");
+ }
+
+}
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaRawData.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaRawData.java
new file mode 100644
index 00000000..592685c3
--- /dev/null
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaRawData.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.pravega.sensor.collector.opcua;
+
+public class OpcUaRawData {
+ public final String nodeIdentifier;
+ public final long timestamp;
+ public final String dataType;
+ public final Object data;
+
+ public OpcUaRawData(Object data, long timestamp, String nodeIdentifier, String dataType) {
+ this.data = data;
+ this.timestamp = timestamp;
+ this.nodeIdentifier = nodeIdentifier;
+ this.dataType = dataType;
+ }
+
+}
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/DataCollectorService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/DataCollectorService.java
new file mode 100644
index 00000000..8235d230
--- /dev/null
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/DataCollectorService.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.pravega.sensor.collector.simple.memoryless;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import io.pravega.sensor.collector.util.EventWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class DataCollectorService extends AbstractExecutionThreadService {
+
+ private final String instanceName;
+ private final SimpleMemorylessDriver driver;
+ private final EventWriter writer;
+ private final long readPeriodicityMs;
+
+ private static final Logger log = LoggerFactory.getLogger(DataCollectorService.class);
+
+ public DataCollectorService(String instanceName, SimpleMemorylessDriver driver, EventWriter writer, long readPeriodicityMs) {
+ this.instanceName = instanceName;
+ this.driver = driver;
+ this.writer = writer;
+ this.readPeriodicityMs = readPeriodicityMs;
+ }
+
+ @Override
+ protected String serviceName() {
+ return super.serviceName() + "-" + instanceName;
+ }
+
+ @Override
+ protected void run() throws InterruptedException {
+ for (; ; ) {
+ try {
+ final long t0 = System.nanoTime();
+ // Place blocking read request to get sensor data.
+ final List rawData = driver.readRawData();
+ int eventCounter = 0;
+ long timestamp = 0;
+ for (R dataItr : rawData) {
+ byte[] event = driver.getEvent(dataItr);
+ timestamp = Long.max(timestamp, driver.getTimestamp(dataItr));
+ //Write the data onto Pravega stream
+ writer.writeEvent(driver.getRoutingKey(), event);
+ eventCounter++;
+ }
+ writer.flush();
+ writer.commit(timestamp);
+ final double ms = (System.nanoTime() - t0) * 1e-6;
+ log.info(String.format("Done writing %d event in %.3f ms to Pravega", eventCounter, ms));
+ } catch (Exception e) {
+ log.error("Error", e);
+
+ }
+ Thread.sleep(readPeriodicityMs);
+ }
+
+ }
+}
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java
new file mode 100644
index 00000000..5b22e7d8
--- /dev/null
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.pravega.sensor.collector.simple.memoryless;
+
+import io.pravega.client.EventStreamClientFactory;
+import io.pravega.client.stream.EventWriterConfig;
+import io.pravega.client.stream.impl.ByteArraySerializer;
+import io.pravega.sensor.collector.DeviceDriver;
+import io.pravega.sensor.collector.DeviceDriverConfig;
+import io.pravega.sensor.collector.util.EventWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public abstract class SimpleMemorylessDriver extends DeviceDriver {
+
+ private static final Logger log = LoggerFactory.getLogger(SimpleMemorylessDriver.class);
+ private static final String SCOPE_KEY = "SCOPE";
+ private static final String STREAM_KEY = "STREAM";
+ private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
+ private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
+ private static final String ROUTING_KEY_KEY = "ROUTING_KEY";
+ private static final String SENSOR_POLL_PERIODICITY_MS = "POLL_PERIODICITY_MS";
+
+ private final DataCollectorService dataCollectorService;
+ private final EventStreamClientFactory clientFactory;
+ private final EventWriter writer;
+ private final String writerId;
+ private final String routingKey;
+ private final long readPeriodicityMs;
+
+ public SimpleMemorylessDriver(DeviceDriverConfig config) {
+ super(config);
+ final String scopeName = getScopeName();
+ final String streamName = getStreamName();
+ writerId = java.util.UUID.randomUUID().toString(); //TODO: Bind to meaningful ID if necessary
+ clientFactory = getEventStreamClientFactory(scopeName);
+ final double transactionTimeoutMinutes = getTransactionTimeoutMinutes();
+ final boolean exactlyOnce = getExactlyOnce();
+ routingKey = getRoutingKey("");
+ readPeriodicityMs = getReadPeriodicityMs();
+ createStream(scopeName, streamName);
+ writer = EventWriter.create(
+ clientFactory,
+ writerId,
+ streamName,
+ new ByteArraySerializer(),
+ EventWriterConfig.builder()
+ .enableConnectionPooling(true)
+ .retryAttempts(Integer.MAX_VALUE)
+ .transactionTimeoutTime((long) (transactionTimeoutMinutes * 60.0 * 1000.0))
+ .build(),
+ exactlyOnce);
+ dataCollectorService = new DataCollectorService<>(config.getInstanceName(), this, writer, readPeriodicityMs);
+ }
+
+ @Override
+ protected void doStart() {
+ dataCollectorService.startAsync();
+ dataCollectorService.awaitRunning();
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ dataCollectorService.stopAsync();
+ dataCollectorService.awaitTerminated();
+ notifyStopped();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ log.info("Close the streams/events created in constructor");
+ }
+
+ protected String getScopeName() {
+ return getProperty(SCOPE_KEY);
+ }
+
+ String getStreamName() {
+ return getProperty(STREAM_KEY);
+ }
+
+ /**
+ * This time duration must not exceed the controller property controller.transaction.maxLeaseValue (milliseconds).
+ */
+ double getTransactionTimeoutMinutes() {
+ // TODO: Values 24 hours or greater result in the following error: WARN [2020-11-09 04:18:33.837] [grpc-default-executor-0]
+ // i.p.c.control.impl.ControllerImpl: PingTransaction 00000000-0000-0000-0000-000000000036 failed:
+ // java.util.concurrent.CompletionException: io.pravega.client.stream.PingFailedException:
+ // Ping transaction for StreamImpl(scope=examples, streamName=network) 00000000-0000-0000-0000-000000000036 failed with status MAX_EXECUTION_TIME_EXCEEDED
+ return Double.parseDouble(getProperty(TRANSACTION_TIMEOUT_MINUTES_KEY, Double.toString(18.0 * 60.0)));
+ }
+
+ boolean getExactlyOnce() {
+ return Boolean.parseBoolean(getProperty(EXACTLY_ONCE_KEY, Boolean.toString(true)));
+ }
+
+ /**
+ * Reads raw data (byte arrays) from a sensor.
+ */
+ abstract public List readRawData() throws Exception;
+
+ /**
+ * Create a payload event to be written from raw data.
+ *
+ * @param rawData
+ */
+ abstract public byte[] getEvent(R rawData);
+
+ /**
+ * Get timestamp value sourced from server for the particular Raw data object
+ *
+ * @param rawData : Object to fetch timestamp from
+ * @return timestamp in Unix Epoch format
+ */
+ abstract public long getTimestamp(R rawData);
+
+ private String getRoutingKey(String defaultVal) {
+ return getProperty(ROUTING_KEY_KEY, defaultVal);
+ }
+
+ private long getReadPeriodicityMs() {
+ return Long.parseLong(getProperty(SENSOR_POLL_PERIODICITY_MS, Integer.toString(10)));
+ }
+
+ public String getRoutingKey() {
+ return routingKey;
+ }
+
+}
diff --git a/pravega-sensor-collector/src/test/resources/OpcUaClientTest.properties b/pravega-sensor-collector/src/test/resources/OpcUaClientTest.properties
new file mode 100644
index 00000000..c6777a64
--- /dev/null
+++ b/pravega-sensor-collector/src/test/resources/OpcUaClientTest.properties
@@ -0,0 +1,19 @@
+#
+# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+PRAVEGA_SENSOR_COLLECTOR_OPC1_CLASS=io.pravega.sensor.collector.opcua.OpcUaClientDriver
+PRAVEGA_SENSOR_COLLECTOR_OPC1_SCOPE=examples
+PRAVEGA_SENSOR_COLLECTOR_OPC1_STREAM=opc_sensor_stream
+PRAVEGA_SENSOR_COLLECTOR_OPC1_EXACTLY_ONCE=false
+PRAVEGA_SENSOR_COLLECTOR_OPC1_TRANSACTION_TIMEOUT_MINUTES=2.0
+PRAVEGA_SENSOR_COLLECTOR_OPC1_ROUTING_KEY=$(hostname)
+PRAVEGA_SENSOR_COLLECTOR_OPC1_POLL_PERIODICITY_MS=1000
+PRAVEGA_SENSOR_COLLECTOR_OPC1_ENDPOINT=opc.tcp://127.0.0.1:49320
+PRAVEGA_SENSOR_COLLECTOR_OPC1_NAMESPACE_INDEX=2
+PRAVEGA_SENSOR_COLLECTOR_OPC1_NODE_IDENTIFIER=TestSim.Device1
+PRAVEGA_SENSOR_COLLECTOR_OPC1_NODE_FILTER_REGEX='^(?!_).*'