This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit a667e0ec643a720a3fc010fdb991b2e89a94d6f4
Author: Sebastian Rühl <sebastian.ru...@codecentric.de>
AuthorDate: Thu Dec 21 11:49:27 2017 +0100

    Cleanup camel component:
    - Fixed shutdown
    - Async call in test
    - Added path variable for driver
---
 .../java/org/apache/plc4x/camel/PLC4XEndpoint.java |  9 ++++
 .../java/org/apache/plc4x/camel/PLC4XProducer.java | 57 +++++++++++-----------
 .../org/apache/plc4x/camel/PLC4XComponentTest.java |  6 ++-
 3 files changed, 41 insertions(+), 31 deletions(-)

diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java
index dd800e1..4fd84e4 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java
@@ -32,10 +32,19 @@ import org.apache.plc4x.java.PlcDriverManager;
 public class PLC4XEndpoint extends DefaultEndpoint {
 
     /**
+     * The name of the PLC4X driver
+     */
+    @UriPath
+    @Metadata(required = "true")
+    @SuppressWarnings("unused")
+    String driver;
+
+    /**
      * The address for the PLC4X driver
      */
     @UriPath
     @Metadata(required = "true")
+    @SuppressWarnings("unused")
     String address;
 
     final PlcDriverManager plcDriverManager;
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java
index 29331d2..e11d1e5 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java
@@ -21,22 +21,22 @@ package org.apache.plc4x.camel;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.spi.ShutdownAware;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.Address;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class PLC4XProducer extends DefaultAsyncProducer implements 
ShutdownAware {
-    private static final Logger LOG = 
LoggerFactory.getLogger(PLC4XProducer.class);
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
+public class PLC4XProducer extends DefaultAsyncProducer {
+    @SuppressWarnings("unused")
     private PLC4XEndpoint endpoint;
     private PlcConnection plcConnection;
+    private AtomicInteger openRequests;
 
     public PLC4XProducer(PLC4XEndpoint endpoint) {
         super(endpoint);
@@ -47,6 +47,7 @@ public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware
         } catch (PlcException e) {
             throw new RuntimeException(e);
         }
+        openRequests = new AtomicInteger();
     }
 
     @SuppressWarnings("unchecked")
@@ -57,13 +58,21 @@ public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware
         Object value = in.getBody(Object.class);
         PlcWriteRequest plcSimpleWriteRequest = new PlcWriteRequest(datatype, 
address, value);
         PlcWriter plcWriter = plcConnection.getWriter().orElseThrow(() -> new 
IllegalArgumentException("Writer for driver not found"));
-        Object response = plcWriter.write(plcSimpleWriteRequest).get();
-        if (exchange.getPattern().isOutCapable()) {
-            Message out = exchange.getOut();
-            out.copyFrom(exchange.getIn());
-            out.setBody(response);
-        } else {
-            in.setBody(response);
+        CompletableFuture<PlcWriteResponse> completableFuture = 
plcWriter.write(plcSimpleWriteRequest);
+        int currentlyOpenRequests = openRequests.incrementAndGet();
+        try {
+            log.debug("Currently open requests including {}:{}", exchange, 
currentlyOpenRequests);
+            PlcWriteResponse plcWriteResponse = completableFuture.get();
+            if (exchange.getPattern().isOutCapable()) {
+                Message out = exchange.getOut();
+                out.copyFrom(exchange.getIn());
+                out.setBody(plcWriteResponse);
+            } else {
+                in.setBody(plcWriteResponse);
+            }
+        } finally {
+            int openRequestsAfterFinish = openRequests.decrementAndGet();
+            log.trace("Open Requests after {}:{}", exchange, 
openRequestsAfterFinish);
         }
     }
 
@@ -81,27 +90,17 @@ public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware
     }
 
     @Override
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        switch (shutdownRunningTask) {
-            case CompleteCurrentTaskOnly:
-                break;
-            case CompleteAllTasks:
-                break;
+    protected void doStop() throws Exception {
+        int openRequestsAtStop = openRequests.get();
+        log.debug("Stopping with {} open requests", openRequestsAtStop);
+        if (openRequestsAtStop > 0) {
+            log.warn("There are still {} open requests", openRequestsAtStop);
         }
         try {
             plcConnection.close();
         } catch (Exception ignore) {
         }
-        return false;
+        super.doStop();
     }
 
-    @Override
-    public int getPendingExchangesSize() {
-        return 0;
-    }
-
-    @Override
-    public void prepareShutdown(boolean suspendOnly, boolean forced) {
-
-    }
 }
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java
index cc3ae5f..0e3e8ea 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java
@@ -25,6 +25,8 @@ import org.apache.plc4x.java.s7.model.S7Address;
 import org.apache.plc4x.java.s7.netty.model.types.MemoryArea;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 public class PLC4XComponentTest extends CamelTestSupport {
 
     @Test
@@ -32,9 +34,9 @@ public class PLC4XComponentTest extends CamelTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMinimumMessageCount(1);
 
-        template.sendBody("direct:plc4x", 3);
+        template.asyncSendBody("direct:plc4x", "irrelevant");
 
-        assertMockEndpointsSatisfied();
+        assertMockEndpointsSatisfied(2, TimeUnit.SECONDS);
     }
 
     @Override

-- 
To stop receiving notification emails like this one, please contact
"commits@plc4x.apache.org" <commits@plc4x.apache.org>.

Reply via email to