This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-2.24.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.24.x by this push: new 309084b CAMEL-14162: camel-stream - When using http url then data is not sent over the wire 309084b is described below commit 309084bb96f47bb67c381a5a01b0f56f6e003f67 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Nov 8 16:06:02 2019 +0100 CAMEL-14162: camel-stream - When using http url then data is not sent over the wire --- components/camel-stream/pom.xml | 5 ++ .../camel/component/stream/StreamProducer.java | 25 +++++++--- .../component/stream/StreamToUrlJettyTest.java | 53 ++++++++++++++++++++++ 3 files changed, 77 insertions(+), 6 deletions(-) diff --git a/components/camel-stream/pom.xml b/components/camel-stream/pom.xml index 750e81a..7865371 100644 --- a/components/camel-stream/pom.xml +++ b/components/camel-stream/pom.xml @@ -70,6 +70,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jetty</artifactId> + <scope>test</scope> + </dependency> </dependencies> diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java index 72826c8..5feec2e 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java @@ -20,6 +20,7 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; @@ -50,6 +51,7 @@ public class StreamProducer extends DefaultProducer { private StreamEndpoint endpoint; private String uri; private OutputStream outputStream; + private URLConnection urlConnection; private AtomicInteger count = new AtomicInteger(); public StreamProducer(StreamEndpoint endpoint, String uri) throws Exception { @@ -88,18 +90,18 @@ public class StreamProducer extends DefaultProducer { LOG.debug("About to write to url: {}", u); URL url = new URL(u); - URLConnection c = url.openConnection(); - c.setDoOutput(true); + urlConnection = url.openConnection(); + urlConnection.setDoOutput(true); if (endpoint.getConnectTimeout() > 0) { - c.setConnectTimeout(endpoint.getConnectTimeout()); + urlConnection.setConnectTimeout(endpoint.getConnectTimeout()); } if (endpoint.getReadTimeout() > 0) { - c.setReadTimeout(endpoint.getReadTimeout()); + urlConnection.setReadTimeout(endpoint.getReadTimeout()); } if (endpoint.getHttpHeaders() != null) { - endpoint.getHttpHeaders().forEach((k, v) -> c.addRequestProperty(k, v.toString())); + endpoint.getHttpHeaders().forEach((k, v) -> urlConnection.addRequestProperty(k, v.toString())); } - return c.getOutputStream(); + return urlConnection.getOutputStream(); } private OutputStream resolveStreamFromFile() throws IOException { @@ -205,8 +207,19 @@ public class StreamProducer extends DefaultProducer { // never ever close a system stream if (!systemStream && expiredStream) { + if (urlConnection != null) { + // force a flush as it may first send data over the wire when we are done + try { + InputStream is = urlConnection.getInputStream(); + IOHelper.close(is); + } catch (Throwable e) { + // ignore + } + } + outputStream.close(); outputStream = null; + urlConnection = null; LOG.debug("Closed stream '{}'", endpoint.getEndpointKey()); } } diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamToUrlJettyTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamToUrlJettyTest.java new file mode 100644 index 0000000..586ee35 --- /dev/null +++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamToUrlJettyTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Unit test for producer writing to URL. + */ +public class StreamToUrlJettyTest extends CamelTestSupport { + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start") + // just send one message at a time + .to("stream:url?autoCloseCount=1&url=http://localhost:8080/foo&httpHeaders.content-type=text/plain"); + + from("jetty:http://localhost:8080/foo") + .log("Jetty foo") + .to("mock:foo"); + } + }; + } + + @Test + public void shouldSendToUrlOutputStream() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello" + System.lineSeparator(), "World" + System.lineSeparator()); + + template.sendBody("direct:start", "Hello"); + template.sendBody("direct:start", "World"); + + assertMockEndpointsSatisfied(); + } + +}