This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push: new 3afbc02 added initial consumer(subscription support) for apache camel 3afbc02 is described below commit 3afbc0251f840f6eca7dd3dc91d9960de6296bb5 Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Thu May 24 16:43:32 2018 +0200 added initial consumer(subscription support) for apache camel --- .../{PLC4XComponent.java => Plc4XComponent.java} | 4 +- .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 101 +++++++++++++++++++++ .../{PLC4XEndpoint.java => Plc4XEndpoint.java} | 46 ++++++++-- .../{PLC4XProducer.java => Plc4XProducer.java} | 6 +- .../services/org/apache/camel/component/plc4x | 2 +- .../java/org/apache/plc4x/camel/MockDriver.java | 43 ++++++++- ...XComponentTest.java => Plc4XComponentTest.java} | 12 ++- ...C4XEndpointTest.java => Plc4XEndpointTest.java} | 10 +- ...C4XProducerTest.java => Plc4XProducerTest.java} | 10 +- 9 files changed, 206 insertions(+), 28 deletions(-) diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XComponent.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java similarity index 90% rename from integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XComponent.java rename to integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java index acfc67e..29495c9 100644 --- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XComponent.java +++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java @@ -23,11 +23,11 @@ import org.apache.camel.impl.DefaultComponent; import java.util.Map; -public class PLC4XComponent extends DefaultComponent { +public class Plc4XComponent extends DefaultComponent { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - Endpoint endpoint = new PLC4XEndpoint(uri, this); + Endpoint endpoint = new Plc4XEndpoint(uri, this); 
setProperties(endpoint, parameters); return endpoint; } diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java new file mode 100644 index 0000000..f80b7de --- /dev/null +++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java @@ -0,0 +1,101 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ */ +package org.apache.plc4x.camel; + +import org.apache.camel.*; +import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.support.LoggingExceptionHandler; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.AsyncProcessorConverterHelper; +import org.apache.plc4x.java.api.connection.PlcConnection; +import org.apache.plc4x.java.api.connection.PlcSubscriber; +import org.apache.plc4x.java.api.exceptions.PlcException; +import org.apache.plc4x.java.api.messages.PlcNotification; +import org.apache.plc4x.java.api.model.Address; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; + +public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util.function.Consumer<PlcNotification<Object>> { + private static final Logger LOGGER = LoggerFactory.getLogger(Plc4XConsumer.class); + + private Plc4XEndpoint endpoint; + private AsyncProcessor processor; + private ExecutorService executor; + private ExceptionHandler exceptionHandler; + private PlcConnection plcConnection; + private Address address; + private Class dataType; + + + public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException { + this.endpoint = endpoint; + this.dataType = endpoint.getDataType(); + this.processor = AsyncProcessorConverterHelper.convert(processor); + this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); + String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", ""); + this.plcConnection = endpoint.getPlcDriverManager().getConnection(plc4xURI); + this.address = plcConnection.parseAddress(endpoint.getAddress()); + } + + @Override + public String toString() { + return "Plc4XConsumer[" + endpoint + "]"; + } + + @Override + public Endpoint getEndpoint() { + return endpoint; + } + + public ExceptionHandler getExceptionHandler() { + return exceptionHandler; + } + + public void setExceptionHandler(ExceptionHandler 
exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } + + @Override + protected void doStart() { + getSubscriber().subscribe(this, address, dataType); + } + + @Override + protected void doStop() { + getSubscriber().unsubscribe(this, address); + } + + private PlcSubscriber getSubscriber() { + return plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("No subscriber available")); + } + + @Override + public void accept(PlcNotification<Object> plcNotification) { + LOGGER.debug("Received {}", plcNotification); + try { + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody(plcNotification.getValues()); + processor.process(exchange); + } catch (Exception e) { + exceptionHandler.handleException(e); + } + } +} diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java similarity index 68% rename from integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java rename to integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java index 7f6ea5a..c5c4e44 100644 --- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java +++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java @@ -25,11 +25,12 @@ import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.plc4x.java.PlcDriverManager; @UriEndpoint(scheme = "plc4x", title = "PLC4X", syntax = "plc4x:driver:address", label = "plc4x") -public class PLC4XEndpoint extends DefaultEndpoint { +public class Plc4XEndpoint extends DefaultEndpoint { /** * The name 0f the PLC4X driver @@ -42,26 +43,35 @@ public class PLC4XEndpoint extends DefaultEndpoint { /** * The address for the 
PLC4X driver */ - @UriPath - @Metadata(required = "true") + @UriParam + @Metadata(required = "false") @SuppressWarnings("unused") private String address; + /** + * TODO: document me + */ + @UriParam + @Metadata(required = "false") + @SuppressWarnings("unused") + private Class dataType; + private final PlcDriverManager plcDriverManager; - public PLC4XEndpoint(String endpointUri, Component component) { + public Plc4XEndpoint(String endpointUri, Component component) { super(endpointUri, component); + // TODO: why doesnt the annotation work plcDriverManager = new PlcDriverManager(); } @Override public Producer createProducer() throws Exception { - return new PLC4XProducer(this); + return new Plc4XProducer(this); } @Override public Consumer createConsumer(Processor processor) throws Exception { - throw new UnsupportedOperationException("The PLC4X endpoint doesn't support consumers."); + return new Plc4XConsumer(this, processor); } @Override @@ -72,4 +82,28 @@ public class PLC4XEndpoint extends DefaultEndpoint { public PlcDriverManager getPlcDriverManager() { return plcDriverManager; } + + public String getDriver() { + return driver; + } + + public void setDriver(String driver) { + this.driver = driver; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public Class getDataType() { + return dataType; + } + + public void setDataType(Class dataType) { + this.dataType = dataType; + } } diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java similarity index 96% rename from integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java rename to integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java index ab67d46..3a6751f 100644 --- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java +++ 
b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XProducer.java @@ -34,13 +34,13 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; -public class PLC4XProducer extends DefaultAsyncProducer { +public class Plc4XProducer extends DefaultAsyncProducer { @SuppressWarnings("unused") - private PLC4XEndpoint endpoint; + private Plc4XEndpoint endpoint; private PlcConnection plcConnection; private AtomicInteger openRequests; - public PLC4XProducer(PLC4XEndpoint endpoint) throws PlcException { + public Plc4XProducer(Plc4XEndpoint endpoint) throws PlcException { super(endpoint); this.endpoint = endpoint; String plc4xURI = endpoint.getEndpointUri().replaceFirst("plc4x:/?/?", ""); diff --git a/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x b/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x index 26947ee..9f443db 100644 --- a/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x +++ b/integrations/apache-camel/src/main/resources/META-INF/services/org/apache/camel/component/plc4x @@ -16,4 +16,4 @@ # specific language governing permissions and limitations # under the License. 
# -class=org.apache.plc4x.camel.PLC4XComponent \ No newline at end of file +class=org.apache.plc4x.camel.Plc4XComponent \ No newline at end of file diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java index 87c529d..dca996b 100644 --- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java +++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/MockDriver.java @@ -21,15 +21,31 @@ package org.apache.plc4x.camel; import org.apache.plc4x.java.api.PlcDriver; import org.apache.plc4x.java.api.authentication.PlcAuthentication; import org.apache.plc4x.java.api.connection.PlcConnection; +import org.apache.plc4x.java.api.connection.PlcSubscriber; import org.apache.plc4x.java.api.connection.PlcWriter; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; +import org.apache.plc4x.java.api.exceptions.PlcException; +import org.apache.plc4x.java.api.messages.PlcNotification; +import org.apache.plc4x.java.api.model.Address; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.Date; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static org.mockito.Mockito.*; public class MockDriver implements PlcDriver { + public static final Logger LOGGER = LoggerFactory.getLogger(MockDriver.class); + + ExecutorService executorService = Executors.newFixedThreadPool(10); + @Override public String getProtocolCode() { return "mock"; @@ -41,9 +57,32 @@ public class MockDriver implements PlcDriver { } @Override - public PlcConnection connect(String url) throws PlcConnectionException { - PlcConnection plcConnectionMock = mock(PlcConnection.class); + public PlcConnection connect(String url) { + PlcConnection plcConnectionMock = 
mock(PlcConnection.class, RETURNS_DEEP_STUBS); + try { + when(plcConnectionMock.parseAddress(anyString())).thenReturn(mock(Address.class)); + } catch (PlcException e) { + throw new RuntimeException(e); + } when(plcConnectionMock.getWriter()).thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS))); + PlcSubscriber plcSubscriber = mock(PlcSubscriber.class, RETURNS_DEEP_STUBS); + doAnswer(invocation -> { + LOGGER.info("Received {}", invocation); + Consumer consumer = invocation.getArgument(0); + executorService.submit(() -> { + while (!Thread.currentThread().isInterrupted()) { + consumer.accept(new PlcNotification(new Date(), mock(Address.class), Collections.singletonList("HelloWorld"))); + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + }); + return null; + }).when(plcSubscriber).subscribe(any(), any(), any()); + when(plcConnectionMock.getSubscriber()).thenReturn(Optional.of(plcSubscriber)); return plcConnectionMock; } diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java similarity index 86% rename from integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java rename to integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java index 6e14090..47ae8a0 100644 --- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java +++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XComponentTest.java @@ -27,7 +27,7 @@ import org.junit.Test; import java.util.Arrays; import java.util.concurrent.TimeUnit; -public class PLC4XComponentTest extends CamelTestSupport { +public class Plc4XComponentTest extends CamelTestSupport { @Test public void testSimpleRouting() throws Exception { @@ -42,19 +42,23 @@ public class 
PLC4XComponentTest extends CamelTestSupport { } @Override - protected RouteBuilder createRouteBuilder() throws Exception { + protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { from("direct:plc4x") - .setHeader(Constants.ADDRESS_HEADER, constant(new Address() {})) + .setHeader(Constants.ADDRESS_HEADER, constant(new Address() { + })) .setBody(constant((byte) 0x0)) .to("plc4x:mock:10.10.10.1/1/1") .to("mock:result"); from("direct:plc4x2") - .setHeader(Constants.ADDRESS_HEADER, constant(new Address() {})) + .setHeader(Constants.ADDRESS_HEADER, constant(new Address() { + })) .setBody(constant(Arrays.asList((byte) 0x0, (byte) 0x1, (byte) 0x2, (byte) 0x3))) .to("plc4x:mock:10.10.10.1/1/1") .to("mock:result"); + from("plc4x:mock:10.10.10.1/1/1?address=Main.by0&dataType=java.lang.String") + .log("Got ${body}"); } }; } diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XEndpointTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java similarity index 87% rename from integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XEndpointTest.java rename to integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java index f223615..9aa057f 100644 --- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XEndpointTest.java +++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XEndpointTest.java @@ -27,13 +27,13 @@ import static org.hamcrest.core.IsNull.notNullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; -public class PLC4XEndpointTest { +public class Plc4XEndpointTest { - PLC4XEndpoint SUT; + Plc4XEndpoint SUT; @Before - public void setUp() throws Exception { - SUT = new PLC4XEndpoint("plc4x:mock:10.10.10.1/1/1", mock(Component.class)); + public void setUp() { + SUT = new Plc4XEndpoint("plc4x:mock:10.10.10.1/1/1", mock(Component.class)); } @Test @@ -47,7 
+47,7 @@ public class PLC4XEndpointTest { } @Test - public void isSingleton() throws Exception { + public void isSingleton() { assertThat(SUT.isSingleton(), is(true)); } diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XProducerTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java similarity index 94% rename from integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XProducerTest.java rename to integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java index 4e965f4..50726d1 100644 --- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XProducerTest.java +++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/Plc4XProducerTest.java @@ -35,21 +35,21 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.*; -public class PLC4XProducerTest { +public class Plc4XProducerTest { - private PLC4XProducer SUT; + private Plc4XProducer SUT; private Exchange testExchange; @Before public void setUp() throws Exception { - PLC4XEndpoint endpointMock = mock(PLC4XEndpoint.class, RETURNS_DEEP_STUBS); + Plc4XEndpoint endpointMock = mock(Plc4XEndpoint.class, RETURNS_DEEP_STUBS); when(endpointMock.getEndpointUri()).thenReturn("plc4x:mock:10.10.10.1/1/1"); PlcDriverManager plcDriverManagerMock = mock(PlcDriverManager.class, RETURNS_DEEP_STUBS); when(plcDriverManagerMock.getConnection(anyString()).getWriter()) .thenReturn(Optional.of(mock(PlcWriter.class, RETURNS_DEEP_STUBS))); when(endpointMock.getPlcDriverManager()).thenReturn(plcDriverManagerMock); - SUT = new PLC4XProducer(endpointMock); + SUT = new Plc4XProducer(endpointMock); testExchange = mock(Exchange.class, RETURNS_DEEP_STUBS); when(testExchange.getIn().getHeader(eq(Constants.ADDRESS_HEADER), eq(Address.class))) .thenReturn(mock(Address.class)); @@ -70,7 +70,7 @@ public class PLC4XProducerTest { } @Test - public void process_Async() throws Exception { + 
public void process_Async() { SUT.process(testExchange, doneSync -> { }); when(testExchange.getPattern()).thenReturn(ExchangePattern.InOnly); -- To stop receiving notification emails like this one, please contact sru...@apache.org.