diff --git a/gradle.properties b/gradle.properties index 66584d93..850edfcf 100644 --- a/gradle.properties +++ b/gradle.properties @@ -13,18 +13,21 @@ commonsCLIVersion=1.4 commonsCSVVersion=1.8 commonsCodecVersion=1.14 commonsMath3Version=3.6.1 +grizzlyVersion=2.25.1 +gsonVersion=2.8.9 includePravegaCredentials=true jacksonVersion=2.9.10.3 junitVersion=4.12 +jakartaBindVersion=2.3.2 +jaxbVersion=2.3.2 +javaxServletApiVersion=3.0.1 +miloVersion=0.6.3 pravegaCredentialsVersion=0.9.0 pravegaVersion=0.9.0 qosLogbackVersion=1.2.3 slf4jApiVersion=1.7.25 sqliteVersion=3.32.3 -grizzlyVersion=2.25.1 -jakartaBindVersion=2.3.2 -jaxbVersion=2.3.2 -javaxServletApiVersion=3.0.1 + # Application version. This will be overridden by APP_VERSION in scripts/env.sh when using scripts/publish.sh. version=unknown diff --git a/pravega-sensor-collector/build.gradle b/pravega-sensor-collector/build.gradle index 91beb918..0513b018 100644 --- a/pravega-sensor-collector/build.gradle +++ b/pravega-sensor-collector/build.gradle @@ -15,8 +15,8 @@ group = "io.pravega" archivesBaseName = "pravega-sensor-collector" description = "pravega-sensor-collector" mainClassName = "io.pravega.sensor.collector.PravegaSensorCollectorApp" -sourceCompatibility = 1.8 -targetCompatibility = 1.8 +sourceCompatibility = 11 +targetCompatibility = 11 dependencies { compile "org.slf4j:slf4j-api:${slf4jApiVersion}" @@ -37,6 +37,8 @@ dependencies { compile "org.apache.commons:commons-csv:${commonsCSVVersion}" compile "commons-codec:commons-codec:${commonsCodecVersion}" compile "com.github.vladimir-bukhtoyarov:bucket4j-core:${bucket4jVersion}" + compile "org.eclipse.milo:sdk-client:${miloVersion}" + compile "com.google.code.gson:gson:${gsonVersion}" testCompile "junit:junit:${junitVersion}" diff --git a/pravega-sensor-collector/src/main/dist/conf/env-sample-opcua.sh b/pravega-sensor-collector/src/main/dist/conf/env-sample-opcua.sh new file mode 100644 index 00000000..6e08d058 --- /dev/null +++ b/pravega-sensor-collector/src/main/dist/conf/env-sample-opcua.sh @@ -0,0 +1,20 @@ +# +# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +export PRAVEGA_SENSOR_COLLECTOR_OPC1_CLASS=io.pravega.sensor.collector.opcua.OpcUaClientDriver +export PRAVEGA_SENSOR_COLLECTOR_OPC1_SCOPE=examples +export PRAVEGA_SENSOR_COLLECTOR_OPC1_STREAM=opc_sensor_stream +export PRAVEGA_SENSOR_COLLECTOR_OPC1_EXACTLY_ONCE=false +export PRAVEGA_SENSOR_COLLECTOR_OPC1_TRANSACTION_TIMEOUT_MINUTES=2.0 +export PRAVEGA_SENSOR_COLLECTOR_OPC1_ROUTING_KEY=$(hostname) +export PRAVEGA_SENSOR_COLLECTOR_OPC1_POLL_PERIODICITY_MS=1000 +export PRAVEGA_SENSOR_COLLECTOR_OPC1_ENDPOINT=opc.tcp://127.0.0.1:49320 +export PRAVEGA_SENSOR_COLLECTOR_OPC1_NAMESPACE_INDEX=2 +export PRAVEGA_SENSOR_COLLECTOR_OPC1_NODE_IDENTIFIER=TestSim.Device1.Random +export PRAVEGA_SENSOR_COLLECTOR_OPC1_NODE_FILTER_REGEX='^(?!_).*' diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaClientDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaClientDriver.java new file mode 100644 index 00000000..d76ecc99 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaClientDriver.java @@ -0,0 +1,202 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.pravega.sensor.collector.opcua; + +import com.google.gson.Gson; +import io.pravega.sensor.collector.DeviceDriverConfig; +import io.pravega.sensor.collector.simple.memoryless.SimpleMemorylessDriver; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; +import org.eclipse.milo.opcua.stack.client.security.ClientCertificateValidator; +import org.eclipse.milo.opcua.stack.core.AttributeId; +import org.eclipse.milo.opcua.stack.core.Identifiers; +import org.eclipse.milo.opcua.stack.core.NamespaceTable; +import org.eclipse.milo.opcua.stack.core.UaException; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; +import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; +import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; +import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; +import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection; +import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask; +import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass; +import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; +import org.eclipse.milo.opcua.stack.core.types.structured.*; +import org.eclipse.milo.opcua.stack.core.util.TypeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.security.KeyPair; +import java.security.cert.X509Certificate; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint; +import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.toList; + +public class OpcUaClientDriver extends SimpleMemorylessDriver { + + private static final Logger log = LoggerFactory.getLogger(OpcUaClientDriver.class); + private static final Gson jsonParser = new Gson(); + + private static String ENDPOINT = "ENDPOINT"; + private static String NS_INDEX = "NAMESPACE_INDEX"; + private static String NODE_ID = "NODE_IDENTIFIER"; + private static String NODE_FILTER = "NODE_FILTER_REGEX"; + private static OpcUaClient opcUaClient; + private static Pattern nodeFilter; + private static List sensorList; + private static List readValueIds; + private static NamespaceTable ns; + + public OpcUaClientDriver(DeviceDriverConfig config) throws UaException, ExecutionException, InterruptedException { + super(config); + log.info("Trying to establish connection with OPC server"); + //TODO :connection monitoring and restore + opcUaClient = buildClient(); + opcUaClient.connect().get(); + log.info("Connection established with OPC server"); + ns = opcUaClient.getNamespaceTable(); + log.info("Creating sensor List"); + nodeFilter = Pattern.compile(getNodeFilter()); + sensorList = new LinkedList<>(); + if (opcUaClient.getAddressSpace().getNode(getNodeID()).getNodeClass().getValue() == NodeClass.Variable.getValue()) { + sensorList.add(getNodeID()); + } + filterNode(opcUaClient, getNodeID()); + readValueIds = sensorList.stream().map(nodeId -> new ReadValueId(nodeId, AttributeId.Value.uid(), null, null)).collect(Collectors.toUnmodifiableList()); + } + + + @Override + public List readRawData() throws ExecutionException, InterruptedException { + List dataList = new LinkedList<>(); + ReadResponse readResp = opcUaClient.read(0, TimestampsToReturn.Source, readValueIds).get(); + DataValue[] aggregatedData = readResp.getResults(); + int i = 0; + for (DataValue data : aggregatedData) { + // As bulk read operation is in-place read , the list ordering of the input nodes will match the data fetched from responses. + Variant rawVariant = data.getValue(); + String dataTypeClass = TypeUtil.getBackingClass(rawVariant.getDataType().get().toNodeId(ns).get()).getName(); + log.trace("Sensor name {} : Raw Data {}", sensorList.get(i).getIdentifier(), rawVariant.getValue()); + dataList.add(new OpcUaRawData(rawVariant.getValue(), data.getSourceTime().getUtcTime(), sensorList.get(i++).getIdentifier().toString(), dataTypeClass)); + } + return dataList; + } + + private void filterNode(OpcUaClient client, NodeId rootNode) { + BrowseDescription browse = new BrowseDescription( + rootNode, + BrowseDirection.Forward, + Identifiers.References, + true, + uint(NodeClass.Object.getValue() | NodeClass.Variable.getValue()), // Get both Objects and Variable types while browsing + uint(BrowseResultMask.All.getValue()) + ); + + try { + BrowseResult browseResult = client.browse(browse).get(); + List references = toList(browseResult.getReferences()); + + for (ReferenceDescription rd : references) { + if (rd.getBrowseName().getName().equalsIgnoreCase("_Hints")) { + //Skip node iteration if the node name is _Hints as it contains hints about variables creation functions supported by server. + continue; + } else if (nodeFilter.matcher(rd.getBrowseName().getName()).find() && rd.getNodeClass().getValue() == NodeClass.Variable.getValue()) { + //Sensor which matches RegEx and node type being a Variable. + log.info("Qualified Sensor: {}", rd.getNodeId().toNodeId(ns).get().getIdentifier()); + sensorList.add(rd.getNodeId().toNodeId(ns).get()); + } + + rd.getNodeId().toNodeId(client.getNamespaceTable()) + .ifPresent(nodeId -> filterNode(client, nodeId)); + } + } catch (InterruptedException | ExecutionException e) { + log.error("Browsing nodeId={} failed: {}", rootNode, e.getMessage(), e); + } + } + + private NodeId getNodeID() { + return new NodeId(Integer.parseInt(getProperty(NS_INDEX, "2")), getProperty(NODE_ID)); + } + + private String getNodeFilter() { + return getProperty(NODE_FILTER, "^(?!_).*"); + } + + + @Override + public byte[] getEvent(OpcUaRawData rawData) { + return jsonParser.toJson(rawData).getBytes(StandardCharsets.UTF_8); + } + + @Override + public long getTimestamp(OpcUaRawData rawData) { + return rawData.timestamp; + } + + private OpcUaClient buildClient() throws UaException { + return OpcUaClient.create( + getEndpointUrl(), + endpoints -> + endpoints.stream() + .filter(endpointFilter()) + .findFirst(), + configBuilder -> + configBuilder + .setApplicationName(LocalizedText.english("Pravega Sensor Collector opc-ua client")) + .setApplicationUri("urn:pravega:sensor:collector:client") + .setRequestTimeout(UInteger.valueOf(5000)) + .build()); //Insecure Client connection creation. + } + + + //TODO : Fetch appropriate security policy : Current -NONE + private Predicate endpointFilter() { + return e -> SecurityPolicy.None.getUri().equals(e.getSecurityPolicyUri()); + } + + //TODO : Keystore + private ClientCertificateValidator getCertificateValidator() { + return null; + } + + //TODO : Keystore + private X509Certificate[] getClientCertificateChain() { + return null; + } + + //TODO : Keystore + private X509Certificate getClientCertificate() { + return null; + } + + //TODO : Keystore + private KeyPair getClientKeyPair() { + return null; + } + + //TODO : Keystore + private IdentityProvider getIdentityProvider() { + return null; + } + + private String getEndpointUrl() { + return getProperty(ENDPOINT, "opc.tcp://127.0.0.1:49320"); + } + +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaRawData.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaRawData.java new file mode 100644 index 00000000..592685c3 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/opcua/OpcUaRawData.java @@ -0,0 +1,26 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.pravega.sensor.collector.opcua; + +public class OpcUaRawData { + public final String nodeIdentifier; + public final long timestamp; + public final String dataType; + public final Object data; + + public OpcUaRawData(Object data, long timestamp, String nodeIdentifier, String dataType) { + this.data = data; + this.timestamp = timestamp; + this.nodeIdentifier = nodeIdentifier; + this.dataType = dataType; + } + +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/DataCollectorService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/DataCollectorService.java new file mode 100644 index 00000000..8235d230 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/DataCollectorService.java @@ -0,0 +1,69 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.pravega.sensor.collector.simple.memoryless; + +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import io.pravega.sensor.collector.util.EventWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class DataCollectorService extends AbstractExecutionThreadService { + + private final String instanceName; + private final SimpleMemorylessDriver driver; + private final EventWriter writer; + private final long readPeriodicityMs; + + private static final Logger log = LoggerFactory.getLogger(DataCollectorService.class); + + public DataCollectorService(String instanceName, SimpleMemorylessDriver driver, EventWriter writer, long readPeriodicityMs) { + this.instanceName = instanceName; + this.driver = driver; + this.writer = writer; + this.readPeriodicityMs = readPeriodicityMs; + } + + @Override + protected String serviceName() { + return super.serviceName() + "-" + instanceName; + } + + @Override + protected void run() throws InterruptedException { + for (; ; ) { + try { + final long t0 = System.nanoTime(); + // Place blocking read request to get sensor data. + final List rawData = driver.readRawData(); + int eventCounter = 0; + long timestamp = 0; + for (R dataItr : rawData) { + byte[] event = driver.getEvent(dataItr); + timestamp = Long.max(timestamp, driver.getTimestamp(dataItr)); + //Write the data onto Pravega stream + writer.writeEvent(driver.getRoutingKey(), event); + eventCounter++; + } + writer.flush(); + writer.commit(timestamp); + final double ms = (System.nanoTime() - t0) * 1e-6; + log.info(String.format("Done writing %d event in %.3f ms to Pravega", eventCounter, ms)); + } catch (Exception e) { + log.error("Error", e); + + } + Thread.sleep(readPeriodicityMs); + } + + } +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java new file mode 100644 index 00000000..5b22e7d8 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java @@ -0,0 +1,141 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.pravega.sensor.collector.simple.memoryless; + +import io.pravega.client.EventStreamClientFactory; +import io.pravega.client.stream.EventWriterConfig; +import io.pravega.client.stream.impl.ByteArraySerializer; +import io.pravega.sensor.collector.DeviceDriver; +import io.pravega.sensor.collector.DeviceDriverConfig; +import io.pravega.sensor.collector.util.EventWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public abstract class SimpleMemorylessDriver extends DeviceDriver { + + private static final Logger log = LoggerFactory.getLogger(SimpleMemorylessDriver.class); + private static final String SCOPE_KEY = "SCOPE"; + private static final String STREAM_KEY = "STREAM"; + private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE"; + private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES"; + private static final String ROUTING_KEY_KEY = "ROUTING_KEY"; + private static final String SENSOR_POLL_PERIODICITY_MS = "POLL_PERIODICITY_MS"; + + private final DataCollectorService dataCollectorService; + private final EventStreamClientFactory clientFactory; + private final EventWriter writer; + private final String writerId; + private final String routingKey; + private final long readPeriodicityMs; + + public SimpleMemorylessDriver(DeviceDriverConfig config) { + super(config); + final String scopeName = getScopeName(); + final String streamName = getStreamName(); + writerId = java.util.UUID.randomUUID().toString(); //TODO: Bind to meaningful ID if necessary + clientFactory = getEventStreamClientFactory(scopeName); + final double transactionTimeoutMinutes = getTransactionTimeoutMinutes(); + final boolean exactlyOnce = getExactlyOnce(); + routingKey = getRoutingKey(""); + readPeriodicityMs = getReadPeriodicityMs(); + createStream(scopeName, streamName); + writer = EventWriter.create( + clientFactory, + writerId, + streamName, + new ByteArraySerializer(), + EventWriterConfig.builder() + .enableConnectionPooling(true) + .retryAttempts(Integer.MAX_VALUE) + .transactionTimeoutTime((long) (transactionTimeoutMinutes * 60.0 * 1000.0)) + .build(), + exactlyOnce); + dataCollectorService = new DataCollectorService<>(config.getInstanceName(), this, writer, readPeriodicityMs); + } + + @Override + protected void doStart() { + dataCollectorService.startAsync(); + dataCollectorService.awaitRunning(); + notifyStarted(); + } + + @Override + protected void doStop() { + dataCollectorService.stopAsync(); + dataCollectorService.awaitTerminated(); + notifyStopped(); + } + + @Override + public void close() throws Exception { + super.close(); + log.info("Close the streams/events created in constructor"); + } + + protected String getScopeName() { + return getProperty(SCOPE_KEY); + } + + String getStreamName() { + return getProperty(STREAM_KEY); + } + + /** + * This time duration must not exceed the controller property controller.transaction.maxLeaseValue (milliseconds). + */ + double getTransactionTimeoutMinutes() { + // TODO: Values 24 hours or greater result in the following error: WARN [2020-11-09 04:18:33.837] [grpc-default-executor-0] + // i.p.c.control.impl.ControllerImpl: PingTransaction 00000000-0000-0000-0000-000000000036 failed: + // java.util.concurrent.CompletionException: io.pravega.client.stream.PingFailedException: + // Ping transaction for StreamImpl(scope=examples, streamName=network) 00000000-0000-0000-0000-000000000036 failed with status MAX_EXECUTION_TIME_EXCEEDED + return Double.parseDouble(getProperty(TRANSACTION_TIMEOUT_MINUTES_KEY, Double.toString(18.0 * 60.0))); + } + + boolean getExactlyOnce() { + return Boolean.parseBoolean(getProperty(EXACTLY_ONCE_KEY, Boolean.toString(true))); + } + + /** + * Reads raw data (byte arrays) from a sensor. + */ + abstract public List readRawData() throws Exception; + + /** + * Create a payload event to be written from raw data. + * + * @param rawData + */ + abstract public byte[] getEvent(R rawData); + + /** + * Get timestamp value sourced from server for the particular Raw data object + * + * @param rawData : Object to fetch timestamp from + * @return timestamp in Unix Epoch format + */ + abstract public long getTimestamp(R rawData); + + private String getRoutingKey(String defaultVal) { + return getProperty(ROUTING_KEY_KEY, defaultVal); + } + + private long getReadPeriodicityMs() { + return Long.parseLong(getProperty(SENSOR_POLL_PERIODICITY_MS, Integer.toString(10))); + } + + public String getRoutingKey() { + return routingKey; + } + +} diff --git a/pravega-sensor-collector/src/test/resources/OpcUaClientTest.properties b/pravega-sensor-collector/src/test/resources/OpcUaClientTest.properties new file mode 100644 index 00000000..c6777a64 --- /dev/null +++ b/pravega-sensor-collector/src/test/resources/OpcUaClientTest.properties @@ -0,0 +1,19 @@ +# +# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +PRAVEGA_SENSOR_COLLECTOR_OPC1_CLASS=io.pravega.sensor.collector.opcua.OpcUaClientDriver +PRAVEGA_SENSOR_COLLECTOR_OPC1_SCOPE=examples +PRAVEGA_SENSOR_COLLECTOR_OPC1_STREAM=opc_sensor_stream +PRAVEGA_SENSOR_COLLECTOR_OPC1_EXACTLY_ONCE=false +PRAVEGA_SENSOR_COLLECTOR_OPC1_TRANSACTION_TIMEOUT_MINUTES=2.0 +PRAVEGA_SENSOR_COLLECTOR_OPC1_ROUTING_KEY=$(hostname) +PRAVEGA_SENSOR_COLLECTOR_OPC1_POLL_PERIODICITY_MS=1000 +PRAVEGA_SENSOR_COLLECTOR_OPC1_ENDPOINT=opc.tcp://127.0.0.1:49320 +PRAVEGA_SENSOR_COLLECTOR_OPC1_NAMESPACE_INDEX=2 +PRAVEGA_SENSOR_COLLECTOR_OPC1_NODE_IDENTIFIER=TestSim.Device1 +PRAVEGA_SENSOR_COLLECTOR_OPC1_NODE_FILTER_REGEX='^(?!_).*'