This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit eb079f92a09404df93545c436cc2ff5d71353918 Author: Andrey Skorikov <andrey.skori...@codecentric.de> AuthorDate: Thu Oct 4 15:54:51 2018 +0200 adjusted connectors and examples to the new api --- .../azure/iothub/S7PlcToAzureIoTHubSample.java | 5 +--- .../google/iotcore/S7PlcToGoogleIoTCoreSample.java | 5 +--- .../plc4x/java/examples/helloplc4x/HelloPlc4x.java | 22 +++++++------- .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 17 +++-------- .../apache/plc4x/camel/Plc4XPollingConsumer.java | 14 +++++---- .../java/org/apache/plc4x/camel/Plc4XProducer.java | 4 +-- .../java/org/apache/plc4x/camel/MockDriver.java | 11 ++----- .../org/apache/plc4x/camel/Plc4XProducerTest.java | 3 -- .../apache/plc4x/edgent/PlcConnectionAdapter.java | 14 ++------- .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 9 +++--- .../org/apache/plc4x/kafka/Plc4xSourceTask.java | 23 +++++++-------- .../org/apache/plc4x/nifi/Plc4xSinkProcessor.java | 8 ++--- .../apache/plc4x/nifi/Plc4xSourceProcessor.java | 24 +++++++-------- .../plc4x/java/api/connection/PlcConnection.java | 8 ----- .../plc4x/java/api/connection/PlcReader.java | 2 -- .../plc4x/java/api/connection/PlcSubscriber.java | 4 --- .../plc4x/java/api/connection/PlcWriter.java | 2 -- .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java | 13 ++++----- .../base/connection/AbstractPlcConnection.java | 28 ------------------ .../java/ethernetip/ManualPlc4XEtherNetIpTest.java | 8 ++--- .../plc4x/java/modbus/ManualPlc4XModbusTest.java | 34 +++++++++------------- .../org/apache/plc4x/java/test/TestConnection.java | 16 ---------- 22 files changed, 83 insertions(+), 191 deletions(-) diff --git a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java index a4f80cc..45ac6c6 100644 --- a/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java +++ b/examples/azure/src/main/java/org/apache/plc4x/java/examples/azure/iothub/S7PlcToAzureIoTHubSample.java @@ -23,7 +23,6 @@ import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol; import com.microsoft.azure.sdk.iot.device.Message; import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; import org.slf4j.Logger; @@ -62,15 +61,13 @@ public class S7PlcToAzureIoTHubSample { DeviceClient client = new DeviceClient(iotConnectionString, IotHubClientProtocol.MQTT); client.open(); - // Get a reader instance. - PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalStateException::new); // Prepare a read request. PlcReadRequest request = plcConnection.readRequestBuilder().get().addItem(FIELD_NAME, addressString).build(); while (!Thread.currentThread().isInterrupted()) { // Simulate telemetry. - PlcReadResponse response = plcReader.read(request).get(); + PlcReadResponse response = request.execute().get(); response.getAllLongs(FIELD_NAME) .forEach(longValue -> { String result = Long.toBinaryString(longValue); diff --git a/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java b/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java index fba969c..c6aac9b 100644 --- a/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java +++ b/examples/google/src/main/java/org/apache/plc4x/java/examples/google/iotcore/S7PlcToGoogleIoTCoreSample.java @@ -23,7 +23,6 @@ import io.jsonwebtoken.Jwts; import io.jsonwebtoken.SignatureAlgorithm; import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; import org.eclipse.paho.client.mqttv3.*; @@ -233,13 +232,11 @@ public class S7PlcToGoogleIoTCoreSample { try (PlcConnection plcConnection = new PlcDriverManager().getConnection("s7://10.10.64.20/1/1")) { logger.info("Connected"); - PlcReader plcReader = plcConnection.getReader().orElseThrow(IllegalAccessError::new); - PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("outputs", "OUTPUTS/0").build(); while (!Thread.currentThread().isInterrupted()) { - PlcReadResponse plcReadResponse = plcReader.read(readRequest).get(); + PlcReadResponse plcReadResponse = readRequest.execute().get(); // Refresh the connection credentials before the JWT expires. // [START iot_mqtt_jwt_refresh] diff --git a/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java b/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java index 402c856..05190e9 100644 --- a/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java +++ b/examples/hello-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java @@ -20,14 +20,12 @@ package org.apache.plc4x.java.examples.helloplc4x; import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; import org.apache.plc4x.java.api.types.PlcResponseCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Optional; import java.util.concurrent.CompletableFuture; public class HelloPlc4x { @@ -49,34 +47,36 @@ public class HelloPlc4x { // Establish a connection to the plc using the url provided as first argument try (PlcConnection plcConnection = new PlcDriverManager().getConnection(args[0])) { - Optional<PlcReader> reader = plcConnection.getReader(); - // Check if this connection support reading of data. - if (reader.isPresent()) { - PlcReader plcReader = reader.get(); + if (plcConnection.readRequestBuilder().isPresent()) { // Create a new read request: // - Give the single item requested the alias name "value" - PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get(); + PlcReadRequest.Builder syncBuilder = plcConnection.readRequestBuilder().get(); for (int i = 1; i < args.length; i++) { - builder.addItem("value-" + i, args[i]); + syncBuilder.addItem("value-" + i, args[i]); } - PlcReadRequest plcReadRequest = builder.build(); + PlcReadRequest syncPlcReadRequest = syncBuilder.build(); ////////////////////////////////////////////////////////// // Read synchronously ... // NOTICE: the ".get()" immediately lets this thread pause till // the response is processed and available. System.out.println("\nSynchronous request ..."); - PlcReadResponse syncResponse = plcReader.read(plcReadRequest).get(); + PlcReadResponse syncResponse = syncPlcReadRequest.execute().get(); // Simply iterating over the field names returned in the response. printResponse(syncResponse); ////////////////////////////////////////////////////////// // Read asynchronously ... // Register a callback executed as soon as a response arives. + PlcReadRequest.Builder asyncBuilder = plcConnection.readRequestBuilder().get(); + for (int i = 1; i < args.length; i++) { + asyncBuilder.addItem("value-" + i, args[i]); + } + PlcReadRequest asyncPlcReadRequest = asyncBuilder.build(); System.out.println("\n\nAsynchronous request ..."); - CompletableFuture<PlcReadResponse> asyncResponse = plcReader.read(plcReadRequest); + CompletableFuture<? extends PlcReadResponse> asyncResponse = asyncPlcReadRequest.execute(); asyncResponse.whenComplete((readResponse, throwable) -> { if (readResponse != null) { printResponse(syncResponse); diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java index 9ec281c..e2f983c 100644 --- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java +++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java @@ -24,7 +24,6 @@ import org.apache.camel.support.LoggingExceptionHandler; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcSubscriber; import org.apache.plc4x.java.api.exceptions.PlcException; import org.apache.plc4x.java.api.messages.*; import org.slf4j.Logger; @@ -49,7 +48,6 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util private Class<?> dataType; private PlcSubscriptionResponse subscriptionResponse; - public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException { this.endpoint = endpoint; this.dataType = endpoint.getDataType(); @@ -80,22 +78,19 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util @Override protected void doStart() throws InterruptedException, ExecutionException, PlcException { - PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow( - () -> new PlcException("Connection doesn't support subscriptions.")); // TODO: Is it correct to only support one field? PlcSubscriptionRequest request = plcConnection.subscriptionRequestBuilder().get() .addCyclicField("default", fieldQuery, Duration.of(3, ChronoUnit.SECONDS)).build(); - PlcSubscriptionResponse plcSubscriptionResponse = plcSubscriber.subscribe(request).get(); + subscriptionResponse = request.execute().get(); // TODO: we need to return the plcSubscriptionResponse here too as we need this to unsubscribe... - plcSubscriber.register(this, plcSubscriptionResponse.getSubscriptionHandles()); + // TODO: figure out what to do with this + // plcSubscriber.register(this, plcSubscriptionResponse.getSubscriptionHandles()); } @Override protected void doStop() throws InterruptedException, ExecutionException, TimeoutException, PlcException { - PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow( - () -> new PlcException("Connection doesn't support subscriptions.")); PlcUnsubscriptionRequest request = plcConnection.unsubscriptionRequestBuilder().get().addHandles(subscriptionResponse.getSubscriptionHandles()).build(); - CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture = plcSubscriber.unsubscribe(request); + CompletableFuture<? extends PlcUnsubscriptionResponse> unsubscriptionFuture = request.execute(); PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS); // TODO: Handle the response ... try { @@ -105,10 +100,6 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util } } - private PlcSubscriber getSubscriber() { - return plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("No subscriber available")); - } - @Override public void accept(PlcSubscriptionEvent plcSubscriptionEvent) { LOGGER.debug("Received {}", plcSubscriptionEvent); diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java index 4aee7ad..3e90b44 100644 --- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java +++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XPollingConsumer.java @@ -44,8 +44,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu private Plc4XEndpoint endpoint; private ExceptionHandler exceptionHandler; private PlcConnection plcConnection; - private PlcReader plcReader; - private PlcReadRequest readRequest; + private PlcReadRequest.Builder requestBuilder; private Class dataType; public Plc4XPollingConsumer(Plc4XEndpoint endpoint) throws PlcException { @@ -54,8 +53,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", ""); this.plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI); - this.plcReader = plcConnection.getReader().orElseThrow(() -> new PlcException("This connection doesn't support reading.")); - readRequest = plcConnection.readRequestBuilder().get().addItem("default", endpoint.getAddress()).build(); + this.requestBuilder = plcConnection.readRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support reading.")); } @Override @@ -79,7 +77,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu @Override public Exchange receive() { Exchange exchange = endpoint.createExchange(); - CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest); + CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute(); try { PlcReadResponse plcReadResponse = read.get(); exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default"))); @@ -97,7 +95,7 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu @Override public Exchange receive(long timeout) { Exchange exchange = endpoint.createExchange(); - CompletableFuture<? extends PlcReadResponse> read = plcReader.read(readRequest); + CompletableFuture<? extends PlcReadResponse> read = createReadRequest().execute(); try { PlcReadResponse plcReadResponse = read.get(timeout, TimeUnit.MILLISECONDS); exchange.getIn().setBody(unwrapIfSingle(plcReadResponse.getAllObjects("default"))); @@ -121,6 +119,10 @@ public class Plc4XPollingConsumer extends ServiceSupport implements PollingConsu } } + private PlcReadRequest createReadRequest() { + return requestBuilder.addItem("default", endpoint.getAddress()).build(); + } + private Object unwrapIfSingle(Collection collection) { if (collection.isEmpty()) { return null; diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java index b4adbcc..f43e603 100644 --- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java +++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java @@ -23,7 +23,6 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcWriter; import org.apache.plc4x.java.api.exceptions.PlcException; import org.apache.plc4x.java.api.messages.PlcWriteRequest; import org.apache.plc4x.java.api.messages.PlcWriteResponse; @@ -60,9 +59,8 @@ public class Plc4XProducer extends DefaultAsyncProducer { Object value = in.getBody(Object.class); // builder.addItem(fieldName, fieldQuery, value); } - PlcWriter plcWriter = plcConnection.getWriter().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found")); PlcWriteRequest.Builder builder = plcConnection.writeRequestBuilder().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found")); - CompletableFuture<? extends PlcWriteResponse> completableFuture = plcWriter.write(builder.build()); + CompletableFuture<? extends PlcWriteResponse> completableFuture = builder.build().execute(); int currentlyOpenRequests = openRequests.incrementAndGet(); try { log.debug("Currently open requests including {}:{}", exchange, currentlyOpenRequests); diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java index 9a3ea9e..2be45b2 100644 --- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java +++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java @@ -21,13 +21,8 @@ package org.apache.plc4x.camel; import org.apache.plc4x.java.api.PlcDriver; import org.apache.plc4x.java.api.authentication.PlcAuthentication; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; import org.apache.plc4x.java.api.connection.PlcSubscriber; -import org.apache.plc4x.java.api.connection.PlcWriter; -import org.apache.plc4x.java.api.messages.PlcReadRequest; -import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; -import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse; -import org.apache.plc4x.java.api.messages.PlcWriteRequest; +import org.apache.plc4x.java.api.messages.*; import org.apache.plc4x.java.base.messages.DefaultPlcSubscriptionResponse; import org.apache.plc4x.java.base.messages.InternalPlcSubscriptionRequest; import org.slf4j.Logger; @@ -61,11 +56,10 @@ public class MockDriver implements PlcDriver { public PlcConnection connect(String url) { // Mock a connection. PlcConnection plcConnectionMock = mock(PlcConnection.class, RETURNS_DEEP_STUBS); - when(plcConnectionMock.getWriter()).thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS))); - when(plcConnectionMock.getReader()).thenReturn(Optional.of(mock(PlcReader.class, RETURNS_DEEP_STUBS))); when(plcConnectionMock.readRequestBuilder()).thenReturn(Optional.of(mock(PlcReadRequest.Builder.class, RETURNS_DEEP_STUBS))); when(plcConnectionMock.writeRequestBuilder()).thenReturn(Optional.of(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS))); when(plcConnectionMock.subscriptionRequestBuilder()).thenReturn(Optional.of(mock(PlcSubscriptionRequest.Builder.class, RETURNS_DEEP_STUBS))); + when(plcConnectionMock.unsubscriptionRequestBuilder()).thenReturn(Optional.of(mock(PlcUnsubscriptionRequest.Builder.class, RETURNS_DEEP_STUBS))); // Mock a typical subscriber. PlcSubscriber plcSubscriber = mock(PlcSubscriber.class, RETURNS_DEEP_STUBS); @@ -97,7 +91,6 @@ public class MockDriver implements PlcDriver { responseFuture.complete(response); return responseFuture; }); - when(plcConnectionMock.getSubscriber()).thenReturn(Optional.of(plcSubscriber)); return plcConnectionMock; } diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java index 60706d6..0068a11 100644 --- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java +++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java @@ -48,9 +48,6 @@ public class Plc4XProducerTest { when(endpointMock.getEndpointUri()).thenReturn("plc4x:mock:10.10.10.1/1/1"); PlcDriverManager plcDriverManagerMock = mock(PlcDriverManager.class, RETURNS_DEEP_STUBS); - when(plcDriverManagerMock.getConnection(anyString()).getWriter()) - .thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS))); - when(plcDriverManagerMock.getConnection(anyString()).writeRequestBuilder()) .thenReturn(Optional.of(mock(PlcWriteRequest.Builder.class, RETURNS_DEEP_STUBS))); diff --git a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java index 4e1fbcd..e3e23cc 100644 --- a/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java +++ b/integrations/apache-edgent/src/main/java/org/apache/plc4x/edgent/PlcConnectionAdapter.java @@ -24,8 +24,6 @@ import org.apache.edgent.function.Function; import org.apache.edgent.function.Supplier; import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; -import org.apache.plc4x.java.api.connection.PlcWriter; import org.apache.plc4x.java.api.exceptions.PlcException; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; @@ -117,9 +115,7 @@ public class PlcConnectionAdapter implements AutoCloseable { PlcConnection connection = null; try { connection = getConnection(); - PlcReader reader = connection.getReader() - .orElseThrow(() -> new PlcException("This connection doesn't support reading")); - return reader.read(readRequest).get(); + return readRequest.execute().get(); } catch (Exception e) { logger.error("reading from plc device {} {} failed", connection, readRequest, e); return null; @@ -154,10 +150,8 @@ public class PlcConnectionAdapter implements AutoCloseable { PlcField field = null; try { connection = getConnection(); - PlcReader reader = connection.getReader() - .orElseThrow(() -> new PlcException("This connection doesn't support reading")); PlcReadRequest readRequest = connection.readRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support reading")).addItem(FIELD_NAME, fieldQuery).build(); - PlcReadResponse readResponse = reader.read(readRequest).get(); + PlcReadResponse readResponse = readRequest.execute().get(); Object value = null; switch (clientDatatype) { case BYTE: @@ -219,12 +213,10 @@ public class PlcConnectionAdapter implements AutoCloseable { PlcConnection connection = null; try { connection = getConnection(); - PlcWriter writer = connection.getWriter() - .orElseThrow(() -> new PlcException("This connection doesn't support writing")); PlcWriteRequest.Builder builder = connection.writeRequestBuilder().orElseThrow(() -> new PlcException("This connection doesn't support writing")); PlcWriteRequest writeRequest = builder.build(); addItem(builder, clientDatatype, fieldQuery, fieldValue); - writer.write(writeRequest).get(); + writeRequest.execute().get(); } catch (Exception e) { logger.error("writing to plc device {} {} failed", connection, fieldQuery, e); } diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java index 6829294..7459ac5 100644 --- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java +++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java @@ -24,7 +24,6 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcWriter; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.apache.plc4x.java.api.messages.PlcWriteRequest; import org.apache.plc4x.kafka.util.VersionUtil; @@ -37,7 +36,6 @@ public class Plc4xSinkTask extends SinkTask { private String url; private PlcConnection plcConnection; - private PlcWriter plcWriter; @Override public String version() { @@ -51,8 +49,9 @@ public class Plc4xSinkTask extends SinkTask { openConnection(); - plcWriter = plcConnection.getWriter() - .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection")); + if (!plcConnection.writeRequestBuilder().isPresent()) { + throw new ConnectException("Writing not supported on this connection"); + } } @Override @@ -107,7 +106,7 @@ public class Plc4xSinkTask extends SinkTask { private void doWrite(PlcWriteRequest request) { try { - plcWriter.write(request).get(); + request.execute().get(); } catch (ExecutionException | InterruptedException e) { throw new ConnectException("Caught exception during write", e); } diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java index 08b3ec1..7a2b21b 100644 --- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java +++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java @@ -27,7 +27,6 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; @@ -61,8 +60,6 @@ public class Plc4xSourceTask extends SourceTask { private List<String> queries; private PlcConnection plcConnection; - private PlcReader plcReader; - private PlcReadRequest plcRequest; // TODO: should we use shared (static) thread pool for this? private ScheduledExecutorService scheduler; @@ -82,15 +79,9 @@ public class Plc4xSourceTask extends SourceTask { openConnection(); - plcReader = plcConnection.getReader() - .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection")); - - - PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get(); - for (String query : queries) { - builder.addItem(query, query); + if (!plcConnection.readRequestBuilder().isPresent()) { + throw new ConnectException("Reading not supported on this connection"); } - plcRequest = builder.build(); int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG)); scheduler = Executors.newScheduledThreadPool(1); @@ -154,7 +145,7 @@ public class Plc4xSourceTask extends SourceTask { } private List<SourceRecord> doFetch() throws InterruptedException { - final CompletableFuture<PlcReadResponse> response = plcReader.read(plcRequest); + final CompletableFuture<? extends PlcReadResponse> response = createReadRequest().execute(); try { final PlcReadResponse received = response.get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS); return extractValues(received); @@ -165,6 +156,14 @@ public class Plc4xSourceTask extends SourceTask { } } + private PlcReadRequest createReadRequest() { + PlcReadRequest.Builder builder = plcConnection.readRequestBuilder().get(); + for (String query : queries) { + builder.addItem(query, query); + } + return builder.build(); + } + private List<SourceRecord> extractValues(PlcReadResponse response) { final List<SourceRecord> result = new LinkedList<>(); for (String query : queries) { diff --git a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java index 973855b..a72ea23 100644 --- a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java +++ b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java @@ -29,7 +29,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcWriter; import org.apache.plc4x.java.api.messages.PlcWriteRequest; import org.apache.plc4x.java.api.messages.PlcWriteResponse; @@ -53,8 +52,9 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor { // Get an instance of a component able to write to a PLC. PlcConnection connection = getConnection(); - PlcWriter writer = connection.getWriter().orElseThrow( - () -> new ProcessException("Writing not supported by connection")); + if (!connection.writeRequestBuilder().isPresent()) { + throw new ProcessException("Writing not supported by connection"); + } // Prepare the request. PlcWriteRequest.Builder builder = connection.writeRequestBuilder().get(); @@ -67,7 +67,7 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor { PlcWriteRequest writeRequest = builder.build(); // Send the request to the PLC. - CompletableFuture<PlcWriteResponse> future = writer.write(writeRequest); + CompletableFuture<? extends PlcWriteResponse> future = writeRequest.execute(); future.whenComplete((response, throwable) -> { if (throwable != null) { session.transfer(session.create(), FAILURE); diff --git a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java index 190a00d..7254e90 100644 --- a/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java +++ b/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java @@ -28,7 +28,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; import org.json.simple.JSONObject; @@ -47,23 +46,24 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { // Get an instance of a component able to read from a PLC. PlcConnection connection = getConnection(); - PlcReader reader = connection.getReader().orElseThrow( - () -> new ProcessException("Writing not supported by connection")); // Prepare the request. - PlcReadRequest.Builder builder = connection.readRequestBuilder().get(); - getFields().forEach(field -> { - String address = getAddress(field); - if(address != null) { - builder.addItem(field, address); - } - }); - PlcReadRequest readRequest = builder.build(); + if (!connection.readRequestBuilder().isPresent()) { + throw new ProcessException("Writing not supported by connection"); + } FlowFile flowFile = session.create(); session.append(flowFile, out -> { try { - PlcReadResponse response = reader.read(readRequest).get(); + PlcReadRequest.Builder builder = connection.readRequestBuilder().get(); + getFields().forEach(field -> { + String address = getAddress(field); + if(address != null) { + builder.addItem(field, address); + } + }); + PlcReadRequest readRequest = builder.build(); + PlcReadResponse response = readRequest.execute().get(); JSONObject obj = new JSONObject(); for (String fieldName : response.getFieldNames()) { for(int i = 0; i < response.getNumberOfValues(fieldName); i++) { diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java index 66a3778..a6da03d 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java @@ -65,12 +65,4 @@ public interface PlcConnection extends AutoCloseable { Optional<PlcUnsubscriptionRequest.Builder> unsubscriptionRequestBuilder(); - // the following methods should be moved to the SPI - - Optional<PlcReader> getReader(); - - Optional<PlcWriter> getWriter(); - - Optional<PlcSubscriber> getSubscriber(); - } diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java index 6d50da0..133868d 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcReader.java @@ -37,6 +37,4 @@ public interface PlcReader { */ CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest); - //PlcReadRequest.Builder readRequestBuilder(); - } diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java index d1c97ba..4a3f6cc 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java @@ -62,8 +62,4 @@ public interface PlcSubscriber { */ void unregister(PlcConsumerRegistration registration); - //PlcSubscriptionRequest.Builder subscriptionRequestBuilder(); - - //PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder(); - } diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java index a4e44bf..db52de6 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcWriter.java @@ -37,6 +37,4 @@ public interface PlcWriter { */ CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest); - //PlcWriteRequest.Builder writeRequestBuilder(); - } diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java index 6e99861..283663a 100644 --- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java +++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java @@ -43,22 +43,19 @@ public class ManualPlc4XAdsTest { try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) { System.out.println("PlcConnection " + plcConnection); - PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found")); - PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("station", "Allgemein_S2.Station:BYTE").build(); - CompletableFuture<PlcReadResponse> response = reader.read(readRequest); + CompletableFuture<? extends PlcReadResponse> response = readRequest.execute(); PlcReadResponse readResponse = response.get(); System.out.println("Response " + readResponse); Collection<Integer> stations = readResponse.getAllIntegers("station"); stations.forEach(System.out::println); - PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available")); - PlcSubscriptionRequest subscriptionRequest = plcConnection.subscriptionRequestBuilder().get().addChangeOfStateField("stationChange", "Allgemein_S2.Station:BYTE").build(); - CompletableFuture<PlcSubscriptionResponse> subscribeResponse = plcSubscriber.subscribe(subscriptionRequest); + CompletableFuture<? extends PlcSubscriptionResponse> subscribeResponse = subscriptionRequest.execute(); PlcSubscriptionResponse plcSubscriptionResponse = subscribeResponse.get(); - PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles()); + // TODO: figure out what to do with this + /*PlcConsumerRegistration plcConsumerRegistration = plcSubscriber.register(System.out::println, plcSubscriptionResponse.getSubscriptionHandles()); TimeUnit.SECONDS.sleep(5); @@ -68,7 +65,7 @@ public class ManualPlc4XAdsTest { unsubscriptionResponse .get(5, TimeUnit.SECONDS); - System.out.println(unsubscriptionResponse); + System.out.println(unsubscriptionResponse);*/ } System.exit(0); } diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java index 8dcedd8..28be93f 100644 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java @@ -23,14 +23,10 @@ import io.netty.channel.ChannelHandler; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; -import org.apache.plc4x.java.api.connection.PlcSubscriber; -import org.apache.plc4x.java.api.connection.PlcWriter; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.apache.plc4x.java.api.exceptions.PlcIoException; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -115,30 +111,6 @@ public abstract class AbstractPlcConnection implements PlcConnection { // Implemented in sub-classes, if needed. } - @Override - public Optional<PlcReader> getReader() { - if (this instanceof PlcReader) { - return Optional.of((PlcReader) this); - } - return Optional.empty(); - } - - @Override - public Optional<PlcWriter> getWriter() { - if (this instanceof PlcWriter) { - return Optional.of((PlcWriter) this); - } - return Optional.empty(); - } - - @Override - public Optional<PlcSubscriber> getSubscriber() { - if (this instanceof PlcSubscriber) { - return Optional.of((PlcSubscriber) this); - } - return Optional.empty(); - } - /** * Can be used to check and cast a parameter to its required internal type (can be used for general type checking too). * diff --git a/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java b/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java index 949eaeb..95cc511 100644 --- a/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java +++ b/plc4j/protocols/ethernetip/src/test/java/org/apache/plc4x/java/ethernetip/ManualPlc4XEtherNetIpTest.java @@ -20,7 +20,6 @@ package org.apache.plc4x.java.ethernetip; import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; @@ -34,14 +33,11 @@ public class ManualPlc4XEtherNetIpTest { try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionUrl)) { System.out.println("PlcConnection " + plcConnection); - // Get a reader instance. - PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found")); - - PlcReadRequest readRequest = plcConnection.readRequestBuilder().get() + PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("Reading not supported")) .addItem("field", "#4#105#3").build(); // Execute the read operation. - CompletableFuture<PlcReadResponse> response = reader.read(readRequest); + CompletableFuture<? extends PlcReadResponse> response = readRequest.execute(); PlcReadResponse readResponse = response.get(); // Output the response. diff --git a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java index 3f6c786..910942a 100644 --- a/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java +++ b/plc4j/protocols/modbus/src/test/java/org/apache/plc4x/java/modbus/ManualPlc4XModbusTest.java @@ -21,8 +21,6 @@ package org.apache.plc4x.java.modbus; import org.apache.commons.lang3.ArrayUtils; import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.connection.PlcConnection; -import org.apache.plc4x.java.api.connection.PlcReader; -import org.apache.plc4x.java.api.connection.PlcWriter; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcReadResponse; import org.apache.plc4x.java.api.messages.PlcWriteRequest; @@ -50,10 +48,9 @@ public class ManualPlc4XModbusTest { System.out.println("PlcConnection " + plcConnection); { - PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found")); - - PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("randomRegister", "register:7[3]").build(); - PlcReadResponse readResponse = reader.read(readRequest).get(); + PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found")) + .addItem("randomRegister", "register:7[3]").build(); + PlcReadResponse readResponse = readRequest.execute().get(); System.out.println("Response " + readResponse); readResponse.getAllByteArrays("randomRegister").stream() .map(HexUtil::toHex) @@ -63,11 +60,11 @@ public class ManualPlc4XModbusTest { { // Read an int from 2 registers - PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found")); // Just dump the actual values - PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("randomRegister", "register:3[2]").build(); - PlcReadResponse readResponse = reader.read(readRequest).get(); + PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found")) + .addItem("randomRegister", "register:3[2]").build(); + PlcReadResponse readResponse = readRequest.execute().get(); System.out.println("Response " + readResponse); Collection<Byte[]> randomRegisters = readResponse.getAllByteArrays("randomRegister"); randomRegisters.stream() @@ -87,17 +84,16 @@ public class ManualPlc4XModbusTest { { // Read an int from 2 registers and multiple requests - PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found")); // Just dump the actual values - PlcReadRequest readRequest = plcConnection.readRequestBuilder().get() + PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found")) .addItem("randomRegister1", "register:1[2]") .addItem("randomRegister2", "register:10[3]") .addItem("randomRegister3", "register:20[4]") .addItem("randomRegister4", "register:30[5]") .addItem("randomRegister5", "register:40[6]") .build(); - PlcReadResponse readResponse = reader.read(readRequest).get(); + PlcReadResponse readResponse = readRequest.execute().get(); System.out.println("Response " + readResponse); IntStream.range(1, 6).forEach(i -> { Collection<Byte[]> randomRegisters = readResponse.getAllByteArrays("randomRegister" + i); @@ -118,10 +114,9 @@ public class ManualPlc4XModbusTest { } { - PlcReader reader = plcConnection.getReader().orElseThrow(() -> new RuntimeException("No Reader found")); - - PlcReadRequest readRequest = plcConnection.readRequestBuilder().get().addItem("randomCoil", "coil:1[9]").build(); - PlcReadResponse readResponse = reader.read(readRequest).get(); + PlcReadRequest readRequest = plcConnection.readRequestBuilder().orElseThrow(() -> new RuntimeException("No Reader found")) + .addItem("randomCoil", "coil:1[9]").build(); + PlcReadResponse readResponse = readRequest.execute().get(); System.out.println("Response " + readResponse); readResponse.getAllBooleans("randomCoil").stream() .map(hex -> "Coil Value: " + hex) @@ -129,10 +124,9 @@ public class ManualPlc4XModbusTest { } { - PlcWriter writer = plcConnection.getWriter().orElseThrow(() -> new RuntimeException("No Writer found")); - - PlcWriteRequest writeRequest = plcConnection.writeRequestBuilder().get().addItem("randomCoilField", "coil:1", true).build(); - PlcWriteResponse writeResponse = writer.write(writeRequest).get(); + PlcWriteRequest writeRequest = plcConnection.writeRequestBuilder().orElseThrow(() -> new RuntimeException("No Writer found")) + .addItem("randomCoilField", "coil:1", true).build(); + PlcWriteResponse writeResponse = writeRequest.execute().get(); System.out.println("Response " + writeResponse); } } catch (Exception e) { diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java index 68b1ba3..4c5764b 100644 --- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java +++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestConnection.java @@ -22,7 +22,6 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.plc4x.java.api.connection.PlcConnection; import org.apache.plc4x.java.api.connection.PlcReader; -import org.apache.plc4x.java.api.connection.PlcSubscriber; import org.apache.plc4x.java.api.connection.PlcWriter; import org.apache.plc4x.java.api.messages.*; import org.apache.plc4x.java.api.types.PlcResponseCode; @@ -62,21 +61,6 @@ class TestConnection implements PlcConnection, PlcReader, PlcWriter { } @Override - public Optional<PlcReader> getReader() { - return Optional.of(this); - } - - @Override - public Optional<PlcWriter> getWriter() { - return Optional.of(this); - } - - @Override - public Optional<PlcSubscriber> getSubscriber() { - return Optional.empty(); // TODO: implement this - } - - @Override public Optional<PlcReadRequest.Builder> readRequestBuilder() { return Optional.of(new DefaultPlcReadRequest.Builder(this, new TestFieldHandler())); }