[CXF-6863] WS-RM 3.x fix for retransmission works with attachments upon a 
network error

Change-Id: I92c59f572c31fe94e2fa6413304db9ec4b9f514c


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/96eee75e
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/96eee75e
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/96eee75e

Branch: refs/heads/master-jaxrs-2.1
Commit: 96eee75e4d2ef481fabf2745e9e0ccba9e682d93
Parents: 574b2a9
Author: Kai Rommel <kai.rom...@sap.com>
Authored: Fri Jun 3 19:13:12 2016 +0200
Committer: Akitoshi Yoshida <a...@apache.org>
Committed: Tue Jun 7 14:21:08 2016 +0200

----------------------------------------------------------------------
 .../org/apache/cxf/io/CachedOutputStream.java   | 17 +++++++-----
 .../cxf/ws/rm/RMCaptureOutInterceptor.java      | 10 ++++++++
 .../cxf/ws/rm/soap/RetransmissionQueueImpl.java | 27 ++++++++++----------
 3 files changed, 34 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/96eee75e/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java 
b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
index 905f6f7..eeced83 100644
--- a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
+++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
@@ -258,14 +258,17 @@ public class CachedOutputStream extends OutputStream {
                 }
             } else {
                 // read the file
-                currentStream.close();
-                if (copyOldContent) {
-                    InputStream fin = createInputStream(tempFile);
-                    IOUtils.copyAndCloseInput(fin, out);
+                try {
+                    currentStream.close();
+                    if (copyOldContent) {
+                        InputStream fin = createInputStream(tempFile);
+                        IOUtils.copyAndCloseInput(fin, out);
+                    }
+                } finally {
+                    streamList.remove(currentStream);
+                    deleteTempFile();
+                    inmem = true;
                 }
-                streamList.remove(currentStream);
-                deleteTempFile();
-                inmem = true;
             }
         }
         currentStream = out;

http://git-wip-us.apache.org/repos/asf/cxf/blob/96eee75e/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git 
a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java 
b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index 4514e03..b3c412d 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -21,6 +21,7 @@ package org.apache.cxf.ws.rm;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.cxf.interceptor.AttachmentOutInterceptor;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.LoggingOutInterceptor;
 import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.io.WriteOnCloseOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.FaultMode;
@@ -200,6 +202,14 @@ public class RMCaptureOutInterceptor extends 
AbstractRMInterceptor<Message>  {
         
         // capture message if retransmission possible
         if (isApplicationMessage && !isPartialResponse) {
+            OutputStream os = msg.getContent(OutputStream.class);
+            // We need to ensure that we have an output stream which won't 
start writing the 
+            // message until connection is setup
+            if (!(os instanceof WriteOnCloseOutputStream)) {
+                WriteOnCloseOutputStream cached = new 
WriteOnCloseOutputStream(os);
+                msg.setContent(OutputStream.class, cached);
+                os = cached;
+            }
             getManager().initializeInterceptorChain(msg);
             //doneCaptureMessage(msg);
             captureMessage(msg);

http://git-wip-us.apache.org/repos/asf/cxf/blob/96eee75e/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java 
b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
index 5ae80dd..3181efe 100644
--- 
a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
+++ 
b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
@@ -741,6 +741,7 @@ public class RetransmissionQueueImpl implements 
RetransmissionQueue {
     }
 
     private void doResend(SoapMessage message) {
+        InputStream is = null;
         try {
             
             // initialize copied interceptor chain for message
@@ -779,7 +780,7 @@ public class RetransmissionQueueImpl implements 
RetransmissionQueue {
             // read SOAP headers from saved input stream
             CachedOutputStream cos = 
(CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
             cos.holdTempFile(); // CachedOutputStream is hold until delivering 
was successful
-            InputStream is = cos.getInputStream(); // instance is needed to 
close input stream later on
+            is = cos.getInputStream(); // instance is needed to close input 
stream later on
             XMLStreamReader reader = StaxUtils.createXMLStreamReader(is, 
StandardCharsets.UTF_8.name());
             message.getHeaders().clear();
             if (reader.getEventType() != XMLStreamConstants.START_ELEMENT
@@ -832,7 +833,7 @@ public class RetransmissionQueueImpl implements 
RetransmissionQueue {
                     retransmitChain.remove(incept);
                 }
             }
-            retransmitChain.add(new CopyOutInterceptor(reader, is));
+            retransmitChain.add(new CopyOutInterceptor(reader));
             
             // restore callbacks on output stream
             if (callbacks != null) {
@@ -868,6 +869,15 @@ public class RetransmissionQueueImpl implements 
RetransmissionQueue {
             
         } catch (Exception ex) {
             LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex);
+        } finally {
+            // make sure to always close InputStreams of the 
CachedOutputStream to avoid leaving temp files undeleted
+            if (null != is) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    // Ignore
+                }
+            }
         }
     }
 
@@ -940,12 +950,10 @@ public class RetransmissionQueueImpl implements 
RetransmissionQueue {
     
     public static class CopyOutInterceptor extends 
AbstractOutDatabindingInterceptor {
         private final XMLStreamReader reader;
-        private InputStream is;
         
-        public CopyOutInterceptor(XMLStreamReader rdr, InputStream is) {
+        public CopyOutInterceptor(XMLStreamReader rdr) {
             super(Phase.MARSHAL);
             reader = rdr;
-            this.is = is;
         }
         
         @Override
@@ -953,16 +961,9 @@ public class RetransmissionQueueImpl implements 
RetransmissionQueue {
             try {
                 XMLStreamWriter writer = 
message.getContent(XMLStreamWriter.class);
                 StaxUtils.copy(reader, writer);
-                if (is != null) {
-                    try {
-                        is.close();
-                    } catch (IOException e) {
-                        // ignore
-                    }
-                }
             } catch (XMLStreamException e) {
                 throw new Fault("COULD_NOT_READ_XML_STREAM", LOG, e);
             }
         }
     }
-}
\ No newline at end of file
+}

Reply via email to