This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push: new 20b938e Add basic Netty UDP tests 20b938e is described below commit 20b938eef1aa8b3da9ee8e2d18ce956deb52b78d Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Wed Feb 17 15:07:32 2021 +0000 Add basic Netty UDP tests Fixes #2187 --- integration-tests/netty/pom.xml | 21 +++++- .../camel/quarkus/component/netty/CamelRoute.java | 27 ------- .../quarkus/component/netty/NettyCodecHelper.java | 76 ++++++++++++++++++++ .../quarkus/component/netty/NettyResource.java | 83 ++++++++++++++++++++++ .../camel/quarkus/component/netty/NettyRoutes.java | 52 ++++++++++++++ .../camel/quarkus/component/netty/NettyTest.java | 57 ++++++++++----- .../quarkus/component/netty/NettyTestResource.java | 7 +- 7 files changed, 276 insertions(+), 47 deletions(-) diff --git a/integration-tests/netty/pom.xml b/integration-tests/netty/pom.xml index 22a6638..d553d2b 100644 --- a/integration-tests/netty/pom.xml +++ b/integration-tests/netty/pom.xml @@ -34,11 +34,13 @@ <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-netty</artifactId> </dependency> - - <!-- include in build for integration testing support --> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-seda</artifactId> + </dependency> <dependency> <groupId>io.quarkus</groupId> - <artifactId>quarkus-undertow</artifactId> + <artifactId>quarkus-resteasy</artifactId> </dependency> <!-- test dependencies --> @@ -74,6 +76,19 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-seda-deployment</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> diff --git a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/CamelRoute.java b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/CamelRoute.java deleted file mode 100644 index b19ca3e..0000000 --- a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/CamelRoute.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.quarkus.component.netty; - -import org.apache.camel.builder.RouteBuilder; - -public class CamelRoute extends RouteBuilder { - @Override - public void configure() { - from("netty:tcp://0.0.0.0:{{camel.netty.test-port}}?textline=true&sync=true") - .setBody().constant("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today."); - } -} diff --git a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyCodecHelper.java b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyCodecHelper.java new file mode 100644 index 0000000..ef5cc72 --- /dev/null +++ b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyCodecHelper.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.netty; + +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.MessageToMessageEncoder; +import org.apache.camel.component.netty.ChannelHandlerFactories; +import org.apache.camel.component.netty.ChannelHandlerFactory; + +public final class NettyCodecHelper { + + private NettyCodecHelper() { + // Utility class + } + + public static ChannelHandlerFactory createNullDelimitedHandler(String protocol) { + ByteBuf delimiter = Unpooled.wrappedBuffer(new byte[] { 0 }); + ByteBuf[] delimiters = new ByteBuf[] { delimiter, delimiter }; + return ChannelHandlerFactories.newDelimiterBasedFrameDecoder(4096, delimiters, protocol); + } + + public static BytesDecoder createBytesDecoder() { + return new BytesDecoder(); + } + + public static BytesEncoder createBytesEncoder() { + return new BytesEncoder(); + } + + @ChannelHandler.Sharable + static class BytesDecoder extends MessageToMessageDecoder<ByteBuf> { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { + if (msg.isReadable()) { + byte[] bytes = new byte[msg.readableBytes()]; + int readerIndex = msg.readerIndex(); + msg.getBytes(readerIndex, bytes); + out.add(bytes); + } + } + } + + @ChannelHandler.Sharable + static class BytesEncoder extends MessageToMessageEncoder<byte[]> { + + @Override + protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception { + byte[] bytes = msg; + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(bytes.length); + buf.writeBytes(bytes); + out.add(buf); + } + } +} diff --git a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyResource.java b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyResource.java new file mode 100644 index 0000000..25104c7 --- /dev/null +++ b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyResource.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.netty; + +import java.nio.charset.StandardCharsets; + +import javax.inject.Inject; +import javax.ws.rs.POST; +import javax.ws.rs.Path; + +import org.apache.camel.ConsumerTemplate; +import org.apache.camel.ProducerTemplate; + +@Path("/netty") +public class NettyResource { + + @Inject + ProducerTemplate producerTemplate; + + @Inject + ConsumerTemplate consumerTemplate; + + @Path("/tcp") + @POST + public String sendNettyTcpMessage(String message) { + return producerTemplate.requestBody("netty:tcp://localhost:{{camel.netty.test-tcp-port}}?textline=true&sync=true", + message, String.class); + } + + @Path("/udp") + @POST + public String sendNettyUdpMessage(String message) { + return producerTemplate.requestBody("netty:udp://localhost:{{camel.netty.test-udp-port}}?sync=true", message, + String.class); + } + + @Path("/tcp/codec") + @POST + public Object sendNettyTcpMessageWithCodec(String message) { + producerTemplate.sendBody("netty:tcp://localhost:{{camel.netty.test-codec-tcp-port}}?disconnect=true" + + "&sync=false&allowDefaultCodec=false" + + "&decoders=#tcpNullDelimitedHandler,#bytesDecoder" + + "&encoders=#bytesEncoder", createNullDelimitedMessage(message)); + + return consumerTemplate.receiveBody("seda:custom-tcp-codec", 5000, String.class); + } + + @Path("/udp/codec") + @POST + public Object sendNettyUdpMessageWithCodec(String message) { + producerTemplate.sendBody("netty:udp://localhost:{{camel.netty.test-codec-udp-port}}?sync=false" + + "&udpByteArrayCodec=true", message.getBytes(StandardCharsets.UTF_8)); + + return consumerTemplate.receiveBody("seda:custom-udp-codec", 5000, String.class); + } + + private byte[] createNullDelimitedMessage(String message) { + byte[] messageBytes = message.getBytes(StandardCharsets.UTF_8); + byte[] bytes = new byte[messageBytes.length + 2]; + bytes[message.length() - 1] = 0; + bytes[message.length() - 2] = 0; + + for (int i = 0; i < messageBytes.length; i++) { + bytes[i] = messageBytes[i]; + } + + return bytes; + } +} diff --git a/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyRoutes.java b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyRoutes.java new file mode 100644 index 0000000..117d5f6 --- /dev/null +++ b/integration-tests/netty/src/main/java/org/apache/camel/quarkus/component/netty/NettyRoutes.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.netty; + +import io.netty.channel.ChannelHandler; +import org.apache.camel.BindToRegistry; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.netty.ChannelHandlerFactory; + +public class NettyRoutes extends RouteBuilder { + + @BindToRegistry("tcpNullDelimitedHandler") + private ChannelHandlerFactory tcpNullDelimitedHandler = NettyCodecHelper.createNullDelimitedHandler("tcp"); + + @BindToRegistry("bytesDecoder") + private ChannelHandler bytesDecoder = NettyCodecHelper.createBytesDecoder(); + + @BindToRegistry("bytesEncoder") + private ChannelHandler bytesEncoder = NettyCodecHelper.createBytesEncoder(); + + @Override + public void configure() { + from("netty:tcp://localhost:{{camel.netty.test-tcp-port}}?textline=true&sync=true") + .transform().simple("Hello ${body} TCP"); + + from("netty:udp://localhost:{{camel.netty.test-udp-port}}?sync=true") + .transform().simple("Hello ${body} UDP"); + + from("netty:tcp://localhost:{{camel.netty.test-codec-tcp-port}}?disconnect=true&sync=false&allowDefaultCodec=false&decoders=#tcpNullDelimitedHandler,#bytesDecoder&encoders=#bytesEncoder") + .convertBodyTo(String.class) + .transform().simple("Hello ${body} TCP") + .to("seda:custom-tcp-codec"); + + from("netty:udp://localhost:{{camel.netty.test-codec-udp-port}}?udpByteArrayCodec=true&sync=false") + .transform().simple("Hello ${body} UDP") + .to("seda:custom-udp-codec"); + } +} diff --git a/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTest.java b/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTest.java index 850c98e..d49fdc4 100644 --- a/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTest.java +++ b/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTest.java @@ -16,35 +16,60 @@ */ package org.apache.camel.quarkus.component.netty; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.net.Socket; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; -import org.junit.jupiter.api.Assertions; +import io.restassured.RestAssured; import org.junit.jupiter.api.Test; +import static org.hamcrest.Matchers.is; + @QuarkusTest @QuarkusTestResource(NettyTestResource.class) class NettyTest { - private static final String POEM = "Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds"; - private static final String EXPECTED_RESPONSE = "When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today."; @Test - public void testPoem() throws IOException { + public void testNettyTcpProduceConsume() throws IOException { + RestAssured.given() + .body("Camel Quarkus Netty") + .post("/netty/tcp") + .then() + .statusCode(200) + .body(is("Hello Camel Quarkus Netty TCP")); + + } - try ( - final Socket socket = new Socket("localhost", Integer.getInteger("camel.netty.test-port")); - final PrintWriter outputWriter = new PrintWriter(socket.getOutputStream(), true); - final BufferedReader inputReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));) { - outputWriter.println(POEM); - String response = inputReader.readLine(); - Assertions.assertTrue(response.equalsIgnoreCase(EXPECTED_RESPONSE), "Response did not match expected response"); - } + @Test + public void testNettyTcpProduceConsumeWithCodec() throws IOException { + String message = "Camel Quarkus Netty Custom Codec"; + RestAssured.given() + .body(message) + .post("/netty/tcp/codec") + .then() + .statusCode(200) + .body(is("Hello Camel Quarkus Netty Custom Codec TCP")); + } + @Test + public void testNettyUdpProduceConsumeWithCodec() throws IOException { + String message = "Camel Quarkus Netty Custom Codec"; + RestAssured.given() + .body(message) + .post("/netty/udp/codec") + .then() + .statusCode(200) + .body(is("Hello Camel Quarkus Netty Custom Codec UDP")); + } + + @Test + public void testNettyUdpProduceConsume() throws IOException { + RestAssured.given() + .body("Camel Quarkus Netty") + .post("/netty/udp") + .then() + .statusCode(200) + .body(is("Hello Camel Quarkus Netty UDP")); } } diff --git a/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTestResource.java b/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTestResource.java index 79e8452..7bff80b 100644 --- a/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTestResource.java +++ b/integration-tests/netty/src/test/java/org/apache/camel/quarkus/component/netty/NettyTestResource.java @@ -25,7 +25,12 @@ import org.apache.camel.quarkus.test.AvailablePortFinder; public class NettyTestResource implements QuarkusTestResourceLifecycleManager { @Override public Map<String, String> start() { - return AvailablePortFinder.reserveNetworkPorts(Objects::toString, "camel.netty.test-port"); + return AvailablePortFinder.reserveNetworkPorts( + Objects::toString, + "camel.netty.test-tcp-port", + "camel.netty.test-codec-tcp-port", + "camel.netty.test-udp-port", + "camel.netty.test-codec-udp-port"); } @Override