[CXF-6863] WS-RM 3.x fix for retransmission works with attachments upon a network error
Change-Id: I92c59f572c31fe94e2fa6413304db9ec4b9f514c Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/96eee75e Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/96eee75e Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/96eee75e Branch: refs/heads/master-jaxrs-2.1 Commit: 96eee75e4d2ef481fabf2745e9e0ccba9e682d93 Parents: 574b2a9 Author: Kai Rommel <kai.rom...@sap.com> Authored: Fri Jun 3 19:13:12 2016 +0200 Committer: Akitoshi Yoshida <a...@apache.org> Committed: Tue Jun 7 14:21:08 2016 +0200 ---------------------------------------------------------------------- .../org/apache/cxf/io/CachedOutputStream.java | 17 +++++++----- .../cxf/ws/rm/RMCaptureOutInterceptor.java | 10 ++++++++ .../cxf/ws/rm/soap/RetransmissionQueueImpl.java | 27 ++++++++++---------- 3 files changed, 34 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/96eee75e/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java index 905f6f7..eeced83 100644 --- a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java +++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java @@ -258,14 +258,17 @@ public class CachedOutputStream extends OutputStream { } } else { // read the file - currentStream.close(); - if (copyOldContent) { - InputStream fin = createInputStream(tempFile); - IOUtils.copyAndCloseInput(fin, out); + try { + currentStream.close(); + if (copyOldContent) { + InputStream fin = createInputStream(tempFile); + IOUtils.copyAndCloseInput(fin, out); + } + } finally { + streamList.remove(currentStream); + deleteTempFile(); + inmem = true; } - streamList.remove(currentStream); - deleteTempFile(); - inmem = true; } } currentStream = out; http://git-wip-us.apache.org/repos/asf/cxf/blob/96eee75e/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java index 4514e03..b3c412d 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java @@ -21,6 +21,7 @@ package org.apache.cxf.ws.rm; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.Collection; import java.util.List; import java.util.Map; @@ -41,6 +42,7 @@ import org.apache.cxf.interceptor.AttachmentOutInterceptor; import org.apache.cxf.interceptor.Fault; import org.apache.cxf.interceptor.LoggingOutInterceptor; import org.apache.cxf.io.CachedOutputStream; +import org.apache.cxf.io.WriteOnCloseOutputStream; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.FaultMode; @@ -200,6 +202,14 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> { // capture message if retransmission possible if (isApplicationMessage && !isPartialResponse) { + OutputStream os = msg.getContent(OutputStream.class); + // We need to ensure that we have an output stream which won't start writing the + // message until connection is setup + if (!(os instanceof WriteOnCloseOutputStream)) { + WriteOnCloseOutputStream cached = new WriteOnCloseOutputStream(os); + msg.setContent(OutputStream.class, cached); + os = cached; + } getManager().initializeInterceptorChain(msg); //doneCaptureMessage(msg); captureMessage(msg); http://git-wip-us.apache.org/repos/asf/cxf/blob/96eee75e/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java index 5ae80dd..3181efe 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java @@ -741,6 +741,7 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { } private void doResend(SoapMessage message) { + InputStream is = null; try { // initialize copied interceptor chain for message @@ -779,7 +780,7 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { // read SOAP headers from saved input stream CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT); cos.holdTempFile(); // CachedOutputStream is hold until delivering was successful - InputStream is = cos.getInputStream(); // instance is needed to close input stream later on + is = cos.getInputStream(); // instance is needed to close input stream later on XMLStreamReader reader = StaxUtils.createXMLStreamReader(is, StandardCharsets.UTF_8.name()); message.getHeaders().clear(); if (reader.getEventType() != XMLStreamConstants.START_ELEMENT @@ -832,7 +833,7 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { retransmitChain.remove(incept); } } - retransmitChain.add(new CopyOutInterceptor(reader, is)); + retransmitChain.add(new CopyOutInterceptor(reader)); // restore callbacks on output stream if (callbacks != null) { @@ -868,6 +869,15 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { } catch (Exception ex) { LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex); + } finally { + // make sure to always close InputStreams of the CachedOutputStream to avoid leaving temp files undeleted + if (null != is) { + try { + is.close(); + } catch (IOException e) { + // Ignore + } + } } } @@ -940,12 +950,10 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { public static class CopyOutInterceptor extends AbstractOutDatabindingInterceptor { private final XMLStreamReader reader; - private InputStream is; - public CopyOutInterceptor(XMLStreamReader rdr, InputStream is) { + public CopyOutInterceptor(XMLStreamReader rdr) { super(Phase.MARSHAL); reader = rdr; - this.is = is; } @Override @@ -953,16 +961,9 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { try { XMLStreamWriter writer = message.getContent(XMLStreamWriter.class); StaxUtils.copy(reader, writer); - if (is != null) { - try { - is.close(); - } catch (IOException e) { - // ignore - } - } } catch (XMLStreamException e) { throw new Fault("COULD_NOT_READ_XML_STREAM", LOG, e); } } } -} \ No newline at end of file +}