This is an automated email from the ASF dual-hosted git repository. siano pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 684ef57 CAMEL-12227 - upgrade to AHC 2.3.0 684ef57 is described below commit 684ef5786838e051cece2f510d9dafabdc310d02 Author: Stephan Siano <stephan.si...@sap.com> AuthorDate: Thu Feb 8 10:24:10 2018 +0100 CAMEL-12227 - upgrade to AHC 2.3.0 --- .../apache/camel/component/ahc/ws/WsEndpoint.java | 12 +++++----- .../apache/camel/component/ahc/ws/WsProducer.java | 18 +++++++------- .../org/apache/camel/component/ahc/AhcBinding.java | 5 ++-- .../apache/camel/component/ahc/AhcEndpoint.java | 11 ++++++--- .../apache/camel/component/ahc/AhcProducer.java | 6 ++--- .../camel/component/ahc/DefaultAhcBinding.java | 28 ++++++++++------------ .../component/atmosphere/websocket/TestClient.java | 17 +++++++------ parent/pom.xml | 2 +- 8 files changed, 51 insertions(+), 48 deletions(-) diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java index 29a593b..10da4ac 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsEndpoint.java @@ -29,8 +29,8 @@ import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClientConfig; -import org.asynchttpclient.ws.DefaultWebSocketListener; import org.asynchttpclient.ws.WebSocket; +import org.asynchttpclient.ws.WebSocketListener; import org.asynchttpclient.ws.WebSocketUpgradeHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,7 +133,7 @@ public class WsEndpoint extends AhcEndpoint { LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString()); } websocket.removeWebSocketListener(listener); - websocket.close(); + websocket.sendCloseFrame(); websocket = null; } super.doStop(); @@ -156,7 +156,7 @@ public class WsEndpoint extends AhcEndpoint { } } - class WsListener extends DefaultWebSocketListener { + class WsListener implements WebSocketListener { @Override public void onOpen(WebSocket websocket) { @@ -164,7 +164,7 @@ public class WsEndpoint extends AhcEndpoint { } @Override - public void onClose(WebSocket websocket) { + public void onClose(WebSocket websocket, int code, String reason) { LOG.debug("websocket closed - reconnecting"); try { reConnect(); @@ -184,7 +184,7 @@ public class WsEndpoint extends AhcEndpoint { } @Override - public void onMessage(byte[] message) { + public void onBinaryFrame(byte[] message, boolean finalFragment, int rsv) { LOG.debug("Received message --> {}", message); for (WsConsumer consumer : consumers) { consumer.sendMessage(message); @@ -192,7 +192,7 @@ public class WsEndpoint extends AhcEndpoint { } @Override - public void onMessage(String message) { + public void onTextFrame(String message, boolean finalFragment, int rsv) { LOG.debug("Received message --> {}", message); for (WsConsumer consumer : consumers) { consumer.sendMessage(message); diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java index 5ba9759..347f8d6 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsProducer.java @@ -55,7 +55,7 @@ public class WsProducer extends DefaultProducer { sendStreamMessage(getWebSocket(), (InputStream)message); } else { //TODO provide other binding option, for now use the converted string - getWebSocket().sendMessage(in.getMandatoryBody(String.class)); + getWebSocket().sendTextFrame(in.getMandatoryBody(String.class)); } } } @@ -65,15 +65,15 @@ public class WsProducer extends DefaultProducer { int p = 0; while (p < msg.length()) { if (msg.length() - p < streamBufferSize) { - webSocket.stream(msg.substring(p), true); + webSocket.sendTextFrame(msg.substring(p), true, 0); p = msg.length(); } else { - webSocket.stream(msg.substring(p, streamBufferSize), false); + webSocket.sendTextFrame(msg.substring(p, streamBufferSize), false, 0); p += streamBufferSize; } } } else { - webSocket.sendMessage(msg); + webSocket.sendTextFrame(msg); } } @@ -89,20 +89,20 @@ public class WsProducer extends DefaultProducer { System.arraycopy(msg, p, writebuf, 0, rest); byte[] tmpbuf = new byte[rest]; System.arraycopy(writebuf, 0, tmpbuf, 0, rest); - webSocket.stream(tmpbuf, 0, rest, true); + webSocket.sendBinaryFrame(tmpbuf, true, 0); // ends p = msg.length; } else { // bug in grizzly? we need to create a byte array with the exact length //webSocket.stream(msg, p, streamBufferSize, false); System.arraycopy(msg, p, writebuf, 0, streamBufferSize); - webSocket.stream(writebuf, 0, streamBufferSize, false); + webSocket.sendBinaryFrame(writebuf, false, 0); // ends p += streamBufferSize; } } } else { - webSocket.sendMessage(msg); + webSocket.sendBinaryFrame(msg); } } @@ -114,7 +114,7 @@ public class WsProducer extends DefaultProducer { try { while ((rn = in.read(readbuf, 0, readbuf.length)) != -1) { if (wn > 0) { - webSocket.stream(writebuf, 0, writebuf.length, false); + webSocket.sendBinaryFrame(writebuf, false, 0); } System.arraycopy(readbuf, 0, writebuf, 0, rn); wn = rn; @@ -125,7 +125,7 @@ public class WsProducer extends DefaultProducer { writebuf = new byte[wn]; System.arraycopy(tmpbuf, 0, writebuf, 0, wn); } // ends - webSocket.stream(writebuf, 0, wn, true); + webSocket.sendBinaryFrame(writebuf, true, 0); } finally { in.close(); } diff --git a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcBinding.java b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcBinding.java index 189d364..b6d940f 100644 --- a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcBinding.java +++ b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcBinding.java @@ -19,10 +19,11 @@ package org.apache.camel.component.ahc; import java.io.ByteArrayOutputStream; import org.apache.camel.Exchange; -import org.asynchttpclient.HttpResponseHeaders; import org.asynchttpclient.HttpResponseStatus; import org.asynchttpclient.Request; +import io.netty.handler.codec.http.HttpHeaders; + /** * Binding from Camel to/from {@link com.ning.http.client.AsyncHttpClient} */ @@ -66,7 +67,7 @@ public interface AhcBinding { * @param headers the HTTP headers * @throws Exception is thrown if error occurred in the callback */ - void onHeadersReceived(AhcEndpoint endpoint, Exchange exchange, HttpResponseHeaders headers) throws Exception; + void onHeadersReceived(AhcEndpoint endpoint, Exchange exchange, HttpHeaders headers) throws Exception; /** * Callback from the {@link com.ning.http.client.AsyncHttpClient} when complete and all the response has been received. diff --git a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java index 54dc8a3..610be59 100644 --- a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java +++ b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java @@ -280,7 +280,7 @@ public class AhcEndpoint extends DefaultEndpoint implements AsyncEndpoint, Heade super.doStart(); if (client == null) { - AsyncHttpClientConfig config = null; + AsyncHttpClientConfig config; if (clientConfig != null) { DefaultAsyncHttpClientConfig.Builder builder = AhcComponent.cloneConfig(clientConfig); @@ -293,13 +293,18 @@ public class AhcEndpoint extends DefaultEndpoint implements AsyncEndpoint, Heade config = builder.build(); } else { + DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder(); + /* + * Not doing this will always create a cookie handler per endpoint, which is incompatible + * to prior versions and interferes with the cookie handling in camel + */ + builder.setCookieStore(null); if (sslContextParameters != null) { - DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder(); SSLContext sslContext = sslContextParameters.createSSLContext(getCamelContext()); JdkSslContext ssl = new JdkSslContext(sslContext, true, ClientAuth.REQUIRE); builder.setSslContext(ssl); - config = builder.build(); } + config = builder.build(); } client = createClient(config); } diff --git a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java index 887a458..dbec1a0 100644 --- a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java +++ b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java @@ -24,10 +24,11 @@ import org.apache.camel.impl.DefaultAsyncProducer; import org.asynchttpclient.AsyncHandler; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.HttpResponseBodyPart; -import org.asynchttpclient.HttpResponseHeaders; import org.asynchttpclient.HttpResponseStatus; import org.asynchttpclient.Request; +import io.netty.handler.codec.http.HttpHeaders; + /** * */ @@ -144,7 +145,7 @@ public class AhcProducer extends DefaultAsyncProducer { } @Override - public State onHeadersReceived(HttpResponseHeaders headers) throws Exception { + public State onHeadersReceived(HttpHeaders headers) throws Exception { if (log.isTraceEnabled()) { log.trace("{} onHeadersReceived {}", exchange.getExchangeId(), headers); } @@ -156,5 +157,4 @@ public class AhcProducer extends DefaultAsyncProducer { return State.CONTINUE; } } - } diff --git a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java index 2e5c3d9..86466fa 100644 --- a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java +++ b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java @@ -26,10 +26,8 @@ import java.io.UnsupportedEncodingException; import java.net.URI; import java.nio.charset.Charset; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.TreeMap; import org.apache.camel.CamelExchangeException; @@ -43,17 +41,17 @@ import org.apache.camel.util.GZIPHelper; import org.apache.camel.util.IOHelper; import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ObjectHelper; -import org.asynchttpclient.HttpResponseHeaders; import org.asynchttpclient.HttpResponseStatus; import org.asynchttpclient.Request; import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.request.body.generator.BodyGenerator; -import org.asynchttpclient.request.body.generator.ByteArrayBodyGenerator; import org.asynchttpclient.request.body.generator.FileBodyGenerator; import org.asynchttpclient.request.body.generator.InputStreamBodyGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.handler.codec.http.HttpHeaders; + public class DefaultAhcBinding implements AhcBinding { protected final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -167,7 +165,7 @@ public class DefaultAhcBinding implements AhcBinding { ByteArrayOutputStream bos = new ByteArrayOutputStream(endpoint.getBufferSize()); AhcHelper.writeObjectToStream(bos, obj); byte[] bytes = bos.toByteArray(); - body = new ByteArrayBodyGenerator(bytes); + body = new InputStreamBodyGenerator(new ByteArrayInputStream(bytes)); IOHelper.close(bos); } else if (data instanceof File || data instanceof GenericFile) { // file based (could potentially also be a FTP file etc) @@ -181,9 +179,9 @@ public class DefaultAhcBinding implements AhcBinding { // do not fallback to use the default charset as it can influence the request // (for example application/x-www-form-urlencoded forms being sent) if (charset != null) { - body = new ByteArrayBodyGenerator(((String) data).getBytes(charset)); + body = new InputStreamBodyGenerator(new ByteArrayInputStream(((String) data).getBytes(charset))); } else { - body = new ByteArrayBodyGenerator(((String) data).getBytes()); + body = new InputStreamBodyGenerator(new ByteArrayInputStream(((String) data).getBytes())); } } // fallback as input stream @@ -232,16 +230,16 @@ public class DefaultAhcBinding implements AhcBinding { } @Override - public void onHeadersReceived(AhcEndpoint endpoint, Exchange exchange, HttpResponseHeaders headers) throws Exception { + public void onHeadersReceived(AhcEndpoint endpoint, Exchange exchange, HttpHeaders headers) throws Exception { Map<String, List<String>> m = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER); - for (Entry<String, String> entry : headers.getHeaders().entries()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (!m.containsKey(key)) { - m.put(key, new LinkedList<String>()); - exchange.getOut().getHeaders().put(key, value); + for (String name:headers.names()) { + List<String> values = headers.getAll(name); + if (values.size() == 1) { + exchange.getOut().getHeaders().put(name, values.get(0)); + } else { + exchange.getOut().getHeaders().put(name, values); } - m.get(key).add(value); + m.put(name, values); } // handle cookies if (endpoint.getCookieHandler() != null) { diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java index e9ee2ce..ec75e7b 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/TestClient.java @@ -28,8 +28,7 @@ import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.ws.WebSocket; -import org.asynchttpclient.ws.WebSocketByteListener; -import org.asynchttpclient.ws.WebSocketTextListener; +import org.asynchttpclient.ws.WebSocketListener; import org.asynchttpclient.ws.WebSocketUpgradeHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,11 +68,11 @@ public class TestClient { } public void sendTextMessage(String message) { - websocket.sendMessage(message); + websocket.sendTextFrame(message); } public void sendBytesMessage(byte[] message) { - websocket.sendMessage(message); + websocket.sendBinaryFrame(message); } public boolean await(int secs) throws InterruptedException { @@ -116,11 +115,11 @@ public class TestClient { } public void close() throws IOException { - websocket.close(); + websocket.sendCloseFrame(); client.close(); } - private class TestWebSocketListener implements WebSocketTextListener, WebSocketByteListener { + private class TestWebSocketListener implements WebSocketListener { @Override public void onOpen(WebSocket websocket) { @@ -128,7 +127,7 @@ public class TestClient { } @Override - public void onClose(WebSocket websocket) { + public void onClose(WebSocket websocket, int code, String reason) { LOG.info("[ws] closed"); } @@ -138,7 +137,7 @@ public class TestClient { } @Override - public void onMessage(byte[] message) { + public void onBinaryFrame(byte[] message, boolean finalFragment, int rsv) { received.add(message); LOG.info("[ws] received bytes --> " + Arrays.toString(message)); latch.countDown(); @@ -146,7 +145,7 @@ public class TestClient { @Override - public void onMessage(String message) { + public void onTextFrame(String message, boolean finalFragment, int rsv) { received.add(message); LOG.info("[ws] received --> " + message); latch.countDown(); diff --git a/parent/pom.xml b/parent/pom.xml index acd5543..32a7790 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -43,7 +43,7 @@ <activemq-version>5.15.3</activemq-version> <activemq-artemis-version>2.4.0</activemq-artemis-version> <aether-version>1.0.2.v20150114</aether-version> - <ahc-version>2.0.38</ahc-version> + <ahc-version>2.3.0</ahc-version> <ant-bundle-version>1.7.0_6</ant-bundle-version> <antlr-bundle-version>3.5.2_1</antlr-bundle-version> <antlr-runtime-bundle-version>3.5.2_1</antlr-runtime-bundle-version> -- To stop receiving notification emails like this one, please contact si...@apache.org.