This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch 3.6.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/3.6.x-fixes by this push: new 1f11e61729 [CXF-8895] ConnectionExceptions with larger payloads could cause a hang with new HttpClient based conduit 1f11e61729 is described below commit 1f11e61729d9482d23048076703a6431646d6c38 Author: Daniel Kulp <d...@kulp.com> AuthorDate: Wed Jun 28 18:48:55 2023 -0400 [CXF-8895] ConnectionExceptions with larger payloads could cause a hang with new HttpClient based conduit --- .../cxf/transport/http/HttpClientHTTPConduit.java | 70 ++++++++++++++++++++-- .../systest/dispatch/DispatchClientServerTest.java | 28 +++++++++ 2 files changed, 93 insertions(+), 5 deletions(-) diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java index f836903431..0451e9af68 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java @@ -74,6 +74,7 @@ import org.apache.cxf.common.util.PropertyUtils; import org.apache.cxf.configuration.jsse.TLSClientParameters; import org.apache.cxf.helpers.HttpHeaderHelper; import org.apache.cxf.helpers.JavaUtils; +import org.apache.cxf.interceptor.Fault; import org.apache.cxf.io.CacheAndWriteOutputStream; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; @@ -379,19 +380,69 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { } } + private boolean isConnectionAttemptCompleted(HTTPClientPolicy csPolicy, PipedOutputStream out) + throws IOException { + if (!connectionComplete) { + // if we haven't connected yet, we'll see if an exception is the reason + // why we haven't connected. Otherwise, wait for the connection + // to complete. 
+ if (future.isDone()) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw new Fault("Could not send Message.", LOG, (IOException)e.getCause()); + } + } + return false; + } + try { + out.wait(csPolicy.getConnectionTimeout()); + } catch (InterruptedException e) { + //ignore + } + if (future.isDone()) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw new Fault("Could not send Message.", LOG, (IOException)e.getCause()); + } + } + return false; + } + } + return true; + } + @Override protected void setProtocolHeaders() throws IOException { HttpClient cl = outMessage.get(HttpClient.class); Address address = (Address)outMessage.get(KEY_HTTP_CONNECTION_ADDRESS); - HTTPClientPolicy csPolicy = getClient(outMessage); + final HTTPClientPolicy csPolicy = getClient(outMessage); String httpRequestMethod = (String)outMessage.get(Message.HTTP_REQUEST_METHOD); pin = new PipedInputStream(csPolicy.getChunkLength() <= 0 ? 4096 : csPolicy.getChunkLength()); - pout = new PipedOutputStream(pin); - - + pout = new PipedOutputStream(pin) { + synchronized boolean canWrite() throws IOException { + return isConnectionAttemptCompleted(csPolicy, this); + } + @Override + public void write(int b) throws IOException { + if (connectionComplete || canWrite()) { + super.write(b); + } + } + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (connectionComplete || canWrite()) { + super.write(b, off, len); + } + } + + }; if (KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod) || PropertyUtils.isTrue(outMessage.get(Headers.EMPTY_REQUEST_PROPERTY))) { @@ -402,6 +453,9 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { @Override public void subscribe(Subscriber<? 
super ByteBuffer> subscriber) { connectionComplete = true; + synchronized (pout) { + pout.notifyAll(); + } BodyPublishers.ofInputStream(new Supplier<InputStream>() { public InputStream get() { return pin; @@ -443,8 +497,14 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { final BodyHandler<InputStream> handler = BodyHandlers.ofInputStream(); - + future = cl.sendAsync(request, handler); + future.exceptionally(ex -> { + synchronized (pout) { + pout.notifyAll(); + } + return null; + }); } @Override protected void setupWrappedStream() throws IOException { diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java index 0c52a947cb..08c35d240d 100644 --- a/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java +++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/dispatch/DispatchClientServerTest.java @@ -25,6 +25,7 @@ import java.net.URL; import java.net.http.HttpConnectTimeoutException; import java.net.http.HttpTimeoutException; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -35,6 +36,7 @@ import javax.jws.WebService; import javax.xml.bind.JAXBContext; import javax.xml.namespace.QName; import javax.xml.soap.MessageFactory; +import javax.xml.soap.SOAPElement; import javax.xml.soap.SOAPMessage; import javax.xml.transform.Source; import javax.xml.transform.dom.DOMSource; @@ -205,6 +207,32 @@ public class DispatchClientServerTest extends AbstractBusClientServerTestBase { || ex.getCause() instanceof java.net.SocketTimeoutException || ex.getCause() instanceof HttpConnectTimeoutException); } + + try { + //create a really big message to make sure the write gets called + Iterator<javax.xml.soap.Node> nodes = soapReqMsg.getSOAPBody().getChildElements(); + while 
(nodes.hasNext()) { + javax.xml.soap.Node nd = nodes.next(); + if (nd instanceof SOAPElement) { + SOAPElement se = (SOAPElement)nd; + for (int x = 0; x < 100; x++) { + se.addTextNode("TestSoapMessageTestSoapMessageTestSoapMessageTestSoapMessage"); + } + } + } + + disp.invoke(soapReqMsg); + fail("Should have faulted"); + } catch (SOAPFaultException ex) { + fail("should not be a SOAPFaultException"); + } catch (WebServiceException ex) { + //expected + assertTrue(ex.getCause().getClass().getName(), + ex.getCause() instanceof java.net.ConnectException + || ex.getCause() instanceof java.net.SocketTimeoutException + || ex.getCause() instanceof HttpConnectTimeoutException); + } + dispImpl.close(); }