This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git


The following commit(s) were added to refs/heads/master by this push:
     new 3afbc02  added initial consumer(subscription support) for apache camel
3afbc02 is described below

commit 3afbc0251f840f6eca7dd3dc91d9960de6296bb5
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Thu May 24 16:43:32 2018 +0200

    added initial consumer(subscription support) for apache camel
---
 .../{PLC4XComponent.java => Plc4XComponent.java}   |   4 +-
 .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 101 +++++++++++++++++++++
 .../{PLC4XEndpoint.java => Plc4XEndpoint.java}     |  46 ++++++++--
 .../{PLC4XProducer.java => Plc4XProducer.java}     |   6 +-
 .../services/org/apache/camel/component/plc4x      |   2 +-
 .../java/org/apache/plc4x/camel/MockDriver.java    |  43 ++++++++-
 ...XComponentTest.java => Plc4XComponentTest.java} |  12 ++-
 ...C4XEndpointTest.java => Plc4XEndpointTest.java} |  10 +-
 ...C4XProducerTest.java => Plc4XProducerTest.java} |  10 +-
 9 files changed, 206 insertions(+), 28 deletions(-)

diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XComponent.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
similarity index 90%
rename from integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XComponent.java
rename to integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
index acfc67e..29495c9 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XComponent.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java
@@ -23,11 +23,11 @@ import org.apache.camel.impl.DefaultComponent;
 
 import java.util.Map;
 
-public class PLC4XComponent extends DefaultComponent {
+public class Plc4XComponent extends DefaultComponent {
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
-        Endpoint endpoint = new PLC4XEndpoint(uri, this);
+        Endpoint endpoint = new Plc4XEndpoint(uri, this);
         setProperties(endpoint, parameters);
         return endpoint;
     }
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
new file mode 100644
index 0000000..f80b7de
--- /dev/null
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java
@@ -0,0 +1,101 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.camel;
+
+import org.apache.camel.*;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.support.LoggingExceptionHandler;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
+import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.connection.PlcSubscriber;
+import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.model.Address;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+
+public class Plc4XConsumer extends ServiceSupport implements Consumer, 
java.util.function.Consumer<PlcNotification<Object>> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Plc4XConsumer.class);
+
+    private Plc4XEndpoint endpoint;
+    private AsyncProcessor processor;
+    private ExecutorService executor;
+    private ExceptionHandler exceptionHandler;
+    private PlcConnection plcConnection;
+    private Address address;
+    private Class dataType;
+
+
+    public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws 
PlcException {
+        this.endpoint = endpoint;
+        this.dataType = endpoint.getDataType();
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
+        this.exceptionHandler = new 
LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
+        String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", 
"");
+        this.plcConnection = 
endpoint.getPlcDriverManager().getConnection(plc4xURI);
+        this.address = plcConnection.parseAddress(endpoint.getAddress());
+    }
+
+    @Override
+    public String toString() {
+        return "Plc4XConsumer[" + endpoint + "]";
+    }
+
+    @Override
+    public Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    public ExceptionHandler getExceptionHandler() {
+        return exceptionHandler;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    @Override
+    protected void doStart() {
+        getSubscriber().subscribe(this, address, dataType);
+    }
+
+    @Override
+    protected void doStop() {
+        getSubscriber().unsubscribe(this, address);
+    }
+
+    private PlcSubscriber getSubscriber() {
+        return plcConnection.getSubscriber().orElseThrow(() -> new 
RuntimeException("No subscriber available"));
+    }
+
+    @Override
+    public void accept(PlcNotification<Object> plcNotification) {
+        LOGGER.debug("Received {}", plcNotification);
+        try {
+            Exchange exchange = endpoint.createExchange();
+            exchange.getIn().setBody(plcNotification.getValues());
+            processor.process(exchange);
+        } catch (Exception e) {
+            exceptionHandler.handleException(e);
+        }
+    }
+}
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
similarity index 68%
rename from integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java
rename to integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
index 7f6ea5a..c5c4e44 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java
@@ -25,11 +25,12 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.plc4x.java.PlcDriverManager;
 
 @UriEndpoint(scheme = "plc4x", title = "PLC4X", syntax = 
"plc4x:driver:address", label = "plc4x")
-public class PLC4XEndpoint extends DefaultEndpoint {
+public class Plc4XEndpoint extends DefaultEndpoint {
 
     /**
      * The name 0f the PLC4X driver
@@ -42,26 +43,35 @@ public class PLC4XEndpoint extends DefaultEndpoint {
     /**
      * The address for the PLC4X driver
      */
-    @UriPath
-    @Metadata(required = "true")
+    @UriParam
+    @Metadata(required = "false")
     @SuppressWarnings("unused")
     private String address;
 
+    /**
+     * TODO: document me
+     */
+    @UriParam
+    @Metadata(required = "false")
+    @SuppressWarnings("unused")
+    private Class dataType;
+
     private final PlcDriverManager plcDriverManager;
 
-    public PLC4XEndpoint(String endpointUri, Component component) {
+    public Plc4XEndpoint(String endpointUri, Component component) {
         super(endpointUri, component);
+        // TODO: why doesnt the annotation work
         plcDriverManager = new PlcDriverManager();
     }
 
     @Override
     public Producer createProducer() throws Exception {
-        return new PLC4XProducer(this);
+        return new Plc4XProducer(this);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        throw new UnsupportedOperationException("The PLC4X endpoint doesn't 
support consumers.");
+        return new Plc4XConsumer(this, processor);
     }
 
     @Override
@@ -72,4 +82,28 @@ public class PLC4XEndpoint extends DefaultEndpoint {
     public PlcDriverManager getPlcDriverManager() {
         return plcDriverManager;
     }
+
+    public String getDriver() {
+        return driver;
+    }
+
+    public void setDriver(String driver) {
+        this.driver = driver;
+    }
+
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public Class getDataType() {
+        return dataType;
+    }
+
+    public void setDataType(Class dataType) {
+        this.dataType = dataType;
+    }
 }
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
similarity index 96%
rename from integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java
rename to integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
index ab67d46..3a6751f 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java
@@ -34,13 +34,13 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class PLC4XProducer extends DefaultAsyncProducer {
+public class Plc4XProducer extends DefaultAsyncProducer {
     @SuppressWarnings("unused")
-    private PLC4XEndpoint endpoint;
+    private Plc4XEndpoint endpoint;
     private PlcConnection plcConnection;
     private AtomicInteger openRequests;
 
-    public PLC4XProducer(PLC4XEndpoint endpoint) throws PlcException {
+    public Plc4XProducer(Plc4XEndpoint endpoint) throws PlcException {
         super(endpoint);
         this.endpoint = endpoint;
         String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", 
"");
diff --git a/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x b/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x
index 26947ee..9f443db 100644
--- a/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x
+++ b/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x
@@ -16,4 +16,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-class=org.apache.plc4x.camel.PLC4XComponent
\ No newline at end of file
+class=org.apache.plc4x.camel.Plc4XComponent
\ No newline at end of file
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
index 87c529d..dca996b 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java
@@ -21,15 +21,31 @@ package org.apache.plc4x.camel;
 import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.connection.PlcWriter;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.model.Address;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.Date;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static org.mockito.Mockito.*;
 
 public class MockDriver implements PlcDriver {
 
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(MockDriver.class);
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+
     @Override
     public String getProtocolCode() {
         return "mock";
@@ -41,9 +57,32 @@ public class MockDriver implements PlcDriver {
     }
 
     @Override
-    public PlcConnection connect(String url) throws PlcConnectionException {
-        PlcConnection plcConnectionMock = mock(PlcConnection.class);
+    public PlcConnection connect(String url) {
+        PlcConnection plcConnectionMock = mock(PlcConnection.class, 
RETURNS_DEEP_STUBS);
+        try {
+            
when(plcConnectionMock.parseAddress(anyString())).thenReturn(mock(Address.class));
+        } catch (PlcException e) {
+            throw new RuntimeException(e);
+        }
         
when(plcConnectionMock.getWriter()).thenReturn(Optional.of(mock(PlcWriter.class,
 RETURNS_DEEP_STUBS)));
+        PlcSubscriber plcSubscriber = mock(PlcSubscriber.class, 
RETURNS_DEEP_STUBS);
+        doAnswer(invocation -> {
+            LOGGER.info("Received {}", invocation);
+            Consumer consumer = invocation.getArgument(0);
+            executorService.submit(() -> {
+                while (!Thread.currentThread().isInterrupted()) {
+                    consumer.accept(new PlcNotification(new Date(), 
mock(Address.class), Collections.singletonList("HelloWorld")));
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+            return null;
+        }).when(plcSubscriber).subscribe(any(), any(), any());
+        
when(plcConnectionMock.getSubscriber()).thenReturn(Optional.of(plcSubscriber));
         return plcConnectionMock;
     }
 
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
similarity index 86%
rename from integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java
rename to integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
index 6e14090..47ae8a0 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
-public class PLC4XComponentTest extends CamelTestSupport {
+public class Plc4XComponentTest extends CamelTestSupport {
 
     @Test
     public void testSimpleRouting() throws Exception {
@@ -42,19 +42,23 @@ public class PLC4XComponentTest extends CamelTestSupport {
     }
 
     @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
+    protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:plc4x")
-                    .setHeader(Constants.ADDRESS_HEADER, constant(new 
Address() {}))
+                    .setHeader(Constants.ADDRESS_HEADER, constant(new 
Address() {
+                    }))
                     .setBody(constant((byte) 0x0))
                     .to("plc4x:mock:10.10.10.1/1/1")
                     .to("mock:result");
                 from("direct:plc4x2")
-                    .setHeader(Constants.ADDRESS_HEADER, constant(new 
Address() {}))
+                    .setHeader(Constants.ADDRESS_HEADER, constant(new 
Address() {
+                    }))
                     .setBody(constant(Arrays.asList((byte) 0x0, (byte) 0x1, 
(byte) 0x2, (byte) 0x3)))
                     .to("plc4x:mock:10.10.10.1/1/1")
                     .to("mock:result");
+                
from("plc4x:mock:10.10.10.1/1/1?address=Main.by0&dataType=java.lang.String")
+                    .log("Got ${body}");
             }
         };
     }
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XEndpointTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java
similarity index 87%
rename from integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XEndpointTest.java
rename to integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java
index f223615..9aa057f 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XEndpointTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java
@@ -27,13 +27,13 @@ import static org.hamcrest.core.IsNull.notNullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 
-public class PLC4XEndpointTest {
+public class Plc4XEndpointTest {
 
-    PLC4XEndpoint SUT;
+    Plc4XEndpoint SUT;
 
     @Before
-    public void setUp() throws Exception {
-        SUT = new PLC4XEndpoint("plc4x:mock:10.10.10.1/1/1", 
mock(Component.class));
+    public void setUp() {
+        SUT = new Plc4XEndpoint("plc4x:mock:10.10.10.1/1/1", 
mock(Component.class));
     }
 
     @Test
@@ -47,7 +47,7 @@ public class PLC4XEndpointTest {
     }
 
     @Test
-    public void isSingleton() throws Exception {
+    public void isSingleton() {
         assertThat(SUT.isSingleton(), is(true));
     }
 
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XProducerTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
similarity index 94%
rename from integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XProducerTest.java
rename to integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
index 4e965f4..50726d1 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XProducerTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java
@@ -35,21 +35,21 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.mockito.Mockito.*;
 
-public class PLC4XProducerTest {
+public class Plc4XProducerTest {
 
-    private PLC4XProducer SUT;
+    private Plc4XProducer SUT;
 
     private Exchange testExchange;
 
     @Before
     public void setUp() throws Exception {
-        PLC4XEndpoint endpointMock = mock(PLC4XEndpoint.class, 
RETURNS_DEEP_STUBS);
+        Plc4XEndpoint endpointMock = mock(Plc4XEndpoint.class, 
RETURNS_DEEP_STUBS);
         
when(endpointMock.getEndpointUri()).thenReturn("plc4x:mock:10.10.10.1/1/1");
         PlcDriverManager plcDriverManagerMock = mock(PlcDriverManager.class, 
RETURNS_DEEP_STUBS);
         when(plcDriverManagerMock.getConnection(anyString()).getWriter())
             .thenReturn(Optional.of(mock(PlcWriter.class, 
RETURNS_DEEP_STUBS)));
         
when(endpointMock.getPlcDriverManager()).thenReturn(plcDriverManagerMock);
-        SUT = new PLC4XProducer(endpointMock);
+        SUT = new Plc4XProducer(endpointMock);
         testExchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
         when(testExchange.getIn().getHeader(eq(Constants.ADDRESS_HEADER), 
eq(Address.class)))
             .thenReturn(mock(Address.class));
@@ -70,7 +70,7 @@ public class PLC4XProducerTest {
     }
 
     @Test
-    public void process_Async() throws Exception {
+    public void process_Async() {
         SUT.process(testExchange, doneSync -> {
         });
         when(testExchange.getPattern()).thenReturn(ExchangePattern.InOnly);

-- 
To stop receiving notification emails like this one, please contact
sru...@apache.org.

Reply via email to