This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch 3.6.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 2923b3324db22481b8f2b3868c3c66ddff29b222
Author: Daniel Kulp <d...@kulp.com>
AuthorDate: Tue Sep 5 10:59:52 2023 -0400

    Handle empty http responses better to make sure they are marked as complete
    
    (cherry picked from commit 48b89768082a31802f65ec9c5985ca7b7b5c0988)
---
 .../cxf/transport/http/HttpClientHTTPConduit.java  | 91 +++++++++++++---------
 1 file changed, 55 insertions(+), 36 deletions(-)

diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
index 9012f3a047..4cab8ae253 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
@@ -342,7 +342,6 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
         int rtimeout;
         volatile Throwable exception;
         volatile boolean connectionComplete;
-        PipedInputStream pin;
         PipedOutputStream pout;
         HttpRequest request;
         
@@ -367,7 +366,9 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
         @Override
         protected void handleNoOutput() throws IOException {
             contentLen = 0;
-            pout.close();
+            if (pout != null) {
+                pout.close();
+            }
             if (exception != null) {
                 if (exception instanceof IOException) {
                     throw (IOException)exception;
@@ -449,44 +450,50 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
             String httpRequestMethod =
                 (String)outMessage.get(Message.HTTP_REQUEST_METHOD);
 
-            pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
-                                        ? 4096 : csPolicy.getChunkLength());
-            pout = new PipedOutputStream(pin) {
-                synchronized boolean canWrite() throws IOException {
-                    return isConnectionAttemptCompleted(csPolicy, this);
-                }
-                @Override
-                public void write(int b) throws IOException {
-                    if (connectionComplete || canWrite()) {
-                        super.write(b);
-                    }
-                }
-                @Override
-                public void write(byte[] b, int off, int len) throws IOException {
-                    if (connectionComplete || canWrite()) {
-                        super.write(b, off, len);
-                    }
-                }
-                
-            };
             
             if (KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod)
                 || PropertyUtils.isTrue(outMessage.get(Headers.EMPTY_REQUEST_PROPERTY))) {
                 contentLen = 0;
             }
 
+            final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
+                ? 4096 : csPolicy.getChunkLength());
+            if (contentLen != 0) {
+                pout = new PipedOutputStream(pin) {
+                    synchronized boolean canWrite() throws IOException {
+                        return isConnectionAttemptCompleted(csPolicy, this);
+                    }
+                    @Override
+                    public void write(int b) throws IOException {
+                        if (connectionComplete || canWrite()) {
+                            super.write(b);
+                        }
+                    }
+                    @Override
+                    public void write(byte[] b, int off, int len) throws IOException {
+                        if (connectionComplete || canWrite()) {
+                            super.write(b, off, len);
+                        }
+                    }
+                };
+            }
+            
             BodyPublisher bp = new BodyPublisher() {
                 @Override
                 public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
                     connectionComplete = true;
-                    synchronized (pout) {
-                        pout.notifyAll();
+                    if (pout != null) {
+                        synchronized (pout) {
+                            pout.notifyAll();                       
+                        }
+                        BodyPublishers.ofInputStream(new Supplier<InputStream>() {
+                            public InputStream get() {
+                                return pin;
+                            }                
+                        }).subscribe(subscriber);
+                    } else {
+                        BodyPublishers.noBody().subscribe(subscriber);
                     }
-                    BodyPublishers.ofInputStream(new Supplier<InputStream>() {
-                        public InputStream get() {
-                            return pin;
-                        }                
-                    }).subscribe(subscriber);
                 }
 
                 @Override
@@ -526,8 +533,10 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
 
             future = cl.sendAsync(request, handler);
             future.exceptionally(ex -> {
-                synchronized (pout) {
-                    pout.notifyAll();
+                if (pout != null) {
+                    synchronized (pout) {
+                        pout.notifyAll();
+                    }
                 }
                 return null;
             });
@@ -611,7 +620,9 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
             String method = (String)outMessage.get(Message.HTTP_REQUEST_METHOD);
             int sc = resp.statusCode();
             if ("HEAD".equals(method)) {
-                return null;
+                try (InputStream in = resp.body()) {
+                    return null;
+                }
             }
             if (sc == 204) {
                 //no content
@@ -623,16 +634,22 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
                 if (f.isPresent()) {
                     long l = Long.parseLong(f.get());
                     if (l == 0) {
-                        return null;
+                        try (InputStream in = resp.body()) {
+                            return null;
+                        }
                     }
                 } else if (!fChunk.isPresent() || !"chunked".equals(fChunk.get())) {
                     if (resp.version() == Version.HTTP_2) {
                         InputStream in = resp.body();
                         if (in.available() <= 0) {
-                            return null;
+                            try (in) {
+                                return null;
+                            }
                         }
                     } else {
-                        return null;
+                        try (InputStream in = resp.body()) {
+                            return null;
+                        }
                     }
                 }
             }
@@ -798,7 +815,9 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
         @Override
         protected void retransmitStream() throws IOException {
             cachedStream.writeCacheTo(pout);
-            pout.close();
+            if (pout != null) {
+                pout.close();
+            }
         }
 
         @Override

Reply via email to