Repository: cxf Updated Branches: refs/heads/3.1.x-fixes 40b19d7f6 -> 907fbb7b8
[CXF-6646] CXF 3.x WSRM Replace RewindableInputStream with CachedOutputStream Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/27a95f05 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/27a95f05 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/27a95f05 Branch: refs/heads/3.1.x-fixes Commit: 27a95f050bfdbeb9d3e63af11fc2e8c4370ae549 Parents: 40b19d7 Author: Kai Rommel <kai.rom...@sap.com> Authored: Wed May 4 02:45:38 2016 +0200 Committer: Akitoshi Yoshida <a...@apache.org> Committed: Tue Jun 7 14:28:51 2016 +0200 ---------------------------------------------------------------------- .../apache/cxf/ws/rm/DestinationSequence.java | 7 +-- .../cxf/ws/rm/RMCaptureInInterceptor.java | 6 +-- .../cxf/ws/rm/RMCaptureOutInterceptor.java | 13 ++++-- .../apache/cxf/ws/rm/RMMessageConstants.java | 6 ++- .../cxf/ws/rm/persistence/PersistenceUtils.java | 48 +++++++++++++++----- .../apache/cxf/ws/rm/persistence/RMMessage.java | 14 +++--- .../cxf/ws/rm/persistence/jdbc/RMTxStore.java | 18 +++++++- .../cxf/ws/rm/soap/RetransmissionQueueImpl.java | 43 ++++++++++++++---- .../org/apache/cxf/ws/rm/RMManagerTest.java | 22 ++++++--- .../ws/rm/persistence/PersistenceUtilsTest.java | 22 ++++++--- .../ws/rm/persistence/RMLargeMessageTest.java | 4 -- .../cxf/ws/rm/persistence/RMMessageTest.java | 16 +------ .../rm/persistence/jdbc/RMTxStoreTestBase.java | 25 +++++++--- 13 files changed, 167 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java index 2e1a54b..58b7906 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java @@ -33,6 +33,7 @@ import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.continuations.Continuation; import org.apache.cxf.continuations.ContinuationProvider; import org.apache.cxf.continuations.SuspendedInvocationException; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; import org.apache.cxf.ws.addressing.EndpointReferenceType; @@ -167,9 +168,9 @@ public class DestinationSequence extends AbstractSequence { RMMessage msg = null; if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) { msg = new RMMessage(); - RewindableInputStream in = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT); - in.rewind(); - msg.setContent(in); + CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT); + msg.setContent(cos); + msg.setContentType((String) message.get(Message.CONTENT_TYPE)); msg.setMessageNumber(st.getMessageNumber()); } store.persistIncoming(this, msg); http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java index 40b4fab..9d48cbc 100755 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java @@ -57,9 +57,9 @@ public class RMCaptureInInterceptor extends AbstractRMInterceptor<Message> { saved.lockOutputStream(); LOG.fine("Capturing the original RM message"); - RewindableInputStream ris = RewindableInputStream.makeRewindable(saved.getInputStream()); - message.setContent(InputStream.class, ris); - message.put(RMMessageConstants.SAVED_CONTENT, ris); + //RewindableInputStream ris = RewindableInputStream.makeRewindable(saved.getInputStream()); + message.setContent(InputStream.class, saved.getInputStream()); + message.put(RMMessageConstants.SAVED_CONTENT, saved); } catch (Exception e) { throw new Fault(e); } http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java index c0ca125..4514e03 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java @@ -19,8 +19,8 @@ package org.apache.cxf.ws.rm; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.List; import java.util.Map; @@ -35,10 +35,12 @@ import org.apache.cxf.binding.Binding; import org.apache.cxf.binding.soap.interceptor.SoapOutInterceptor; import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor; import org.apache.cxf.interceptor.AttachmentOutInterceptor; import org.apache.cxf.interceptor.Fault; import org.apache.cxf.interceptor.LoggingOutInterceptor; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.FaultMode; @@ -255,8 +257,11 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> { } // save message for potential retransmission - ByteArrayInputStream bis = cw.getOutputStream().createInputStream(); - message.put(RMMessageConstants.SAVED_CONTENT, RewindableInputStream.makeRewindable(bis)); + CachedOutputStream cos = new CachedOutputStream(); + IOUtils.copyAndCloseInput(cw.getOutputStream().createInputStream(), cos); + cos.flush(); + InputStream is = cos.getInputStream(); + message.put(RMMessageConstants.SAVED_CONTENT, cos); RMManager manager = getManager(); manager.getRetransmissionQueue().start(); manager.getRetransmissionQueue().addUnacknowledged(message); @@ -276,7 +281,7 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> { } // serializes the message content and the attachments into // the RMMessage content - PersistenceUtils.encodeRMContent(msg, message, bis); + PersistenceUtils.encodeRMContent(msg, message, is); store.persistOutgoing(ss, msg); } http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java index eb6789c..15999e9 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java @@ -36,8 +36,12 @@ public final class RMMessageConstants { public static final String ORIGINAL_REQUESTOR_ROLE = "org.apache.cxf.client.original"; - /** Message content (must be an instance of {@link RewindableInputStream}. */ + /** Message content must be an instance of {@link CachedOutputStream}. */ public static final String SAVED_CONTENT = "org.apache.cxf.ws.rm.content"; + + /** Variable holds reference to source streams of the attachments. + * It must be an instance of {@link Closeable}. */ + public static final String ATTACHMENTS_CLOSEABLE = "org.apache.cxf.ws.rm.attachment.closeable"; /** Retransmission in progress flag (Boolean.TRUE if in progress). */ public static final String RM_RETRANSMISSION = "org.apache.cxf.ws.rm.retransmitting"; http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java index 0981f8e..43f01d9 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java @@ -19,6 +19,7 @@ package org.apache.cxf.ws.rm.persistence; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -38,7 +39,6 @@ import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; import org.apache.cxf.staxutils.StaxUtils; import org.apache.cxf.ws.rm.RMMessageConstants; -import org.apache.cxf.ws.rm.RewindableInputStream; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement; /** @@ -105,13 +105,14 @@ public final class PersistenceUtils { public static void encodeRMContent(RMMessage rmmsg, Message msg, InputStream msgContent) throws IOException { + CachedOutputStream cos = new CachedOutputStream(); if (msg.getAttachments() == null) { rmmsg.setContentType((String)msg.get(Message.CONTENT_TYPE)); - rmmsg.setContent(msgContent); + IOUtils.copyAndCloseInput(msgContent, cos); + cos.flush(); + rmmsg.setContent(cos); } else { MessageImpl msgImpl1 = new MessageImpl(); - // using cached output stream to handle large files - CachedOutputStream cos = new CachedOutputStream(); msgImpl1.setContent(OutputStream.class, cos); msgImpl1.setAttachments(msg.getAttachments()); msgImpl1.put(Message.CONTENT_TYPE, (String) msg.get(Message.CONTENT_TYPE)); @@ -121,26 +122,49 @@ public final class PersistenceUtils { serializer.writeProlog(); // write soap root message into cached output stream IOUtils.copyAndCloseInput(msgContent, cos); + cos.flush(); serializer.writeAttachments(); rmmsg.setContentType((String) msgImpl1.get(Message.CONTENT_TYPE)); - - //TODO will pass the cos instance to rmmessage in the future - rmmsg.setContent(cos.getInputStream()); + rmmsg.setContent(cos); } } public static void decodeRMContent(RMMessage rmmsg, Message msg) throws IOException { String contentType = rmmsg.getContentType(); + final CachedOutputStream cos = rmmsg.getContent(); if ((null != contentType) && contentType.startsWith("multipart/related")) { + final InputStream is = cos.getInputStream(); msg.put(Message.CONTENT_TYPE, contentType); - msg.setContent(InputStream.class, rmmsg.getContent()); + msg.setContent(InputStream.class, is); AttachmentDeserializer ad = new AttachmentDeserializer(msg); ad.initializeAttachments(); + // create new cos with soap envelope only + CachedOutputStream cosSoap = new CachedOutputStream(); + IOUtils.copy(msg.getContent(InputStream.class), cosSoap); + cosSoap.flush(); + msg.put(RMMessageConstants.SAVED_CONTENT, cosSoap); + // REVISIT -- At the moment references must be hold for retransmission + // and the final cleanup of the CachedOutputStream. + msg.put(RMMessageConstants.ATTACHMENTS_CLOSEABLE, new Closeable() { + + @Override + public void close() throws IOException { + try { + is.close(); + } catch (IOException e) { + // Ignore + } + try { + cos.close(); + } catch (IOException e) { + // Ignore + } + } + + }); } else { - msg.setContent(InputStream.class, rmmsg.getContent()); + msg.put(RMMessageConstants.SAVED_CONTENT, cos); } - InputStream is = RewindableInputStream.makeRewindable(msg.getContent(InputStream.class)); - msg.setContent(InputStream.class, is); - msg.put(RMMessageConstants.SAVED_CONTENT, is); } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java index abab221..348117c 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java @@ -22,9 +22,11 @@ import java.io.InputStream; import java.util.Collections; import java.util.List; +import org.apache.cxf.io.CachedOutputStream; + public class RMMessage { - private InputStream content; + private CachedOutputStream content; //TODO remove attachments when we remove the deprecated attachments related methods private List<InputStream> attachments = Collections.emptyList(); private String contentType; @@ -48,11 +50,11 @@ public class RMMessage { } /** - * Sets the message content using the input stream. + * Sets the message content using the CachedOutputStream.class. * @param in */ - public void setContent(InputStream in) { - content = in; + public void setContent(CachedOutputStream cos) { + content = cos; } /** @@ -73,11 +75,11 @@ public class RMMessage { } /** - * Returns the input stream of this message content. + * Returns the CachedOutputStream of this message content. * @return * @throws IOException */ - public InputStream getContent() { + public CachedOutputStream getContent() { return content; } http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java index 7e626a5..641df46 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java @@ -51,6 +51,8 @@ import org.apache.cxf.common.i18n.Message; import org.apache.cxf.common.injection.NoJSR250Annotations; import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.common.util.SystemPropertyAction; +import org.apache.cxf.helpers.IOUtils; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.apache.cxf.ws.rm.DestinationSequence; import org.apache.cxf.ws.rm.ProtocolVariation; @@ -599,7 +601,10 @@ public class RMTxStore implements RMStore { RMMessage msg = new RMMessage(); msg.setMessageNumber(mn); msg.setTo(to); - msg.setContent(blob.getBinaryStream()); + CachedOutputStream cos = new CachedOutputStream(); + IOUtils.copyAndCloseInput(blob.getBinaryStream(), cos); + cos.flush(); + msg.setContent(cos); msg.setContentType(contentType); msgs.add(msg); } @@ -607,6 +612,9 @@ public class RMTxStore implements RMStore { conex = ex; LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG" : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex); + } catch (IOException e) { + abort(con); + throw new RMStoreException(e); } finally { releaseResources(stmt, res); updateConnectionState(con, conex); @@ -735,8 +743,10 @@ public class RMTxStore implements RMStore { new Object[] {outbound ? "outbound" : "inbound", nr, id, to}); } PreparedStatement stmt = null; + CachedOutputStream cos = msg.getContent(); + InputStream msgin = null; try { - InputStream msgin = msg.getContent(); + msgin = cos.getInputStream(); stmt = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR); stmt.setString(1, id); @@ -751,6 +761,10 @@ public class RMTxStore implements RMStore { } } finally { releaseResources(stmt, null); + if (null != msgin) { + msgin.close(); + } + cos.close(); // needed to clean-up tmp file folder } } http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java index 223430e..5ae80dd 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java @@ -19,6 +19,9 @@ package org.apache.cxf.ws.rm.soap; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -57,6 +60,7 @@ import org.apache.cxf.helpers.DOMUtils; import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor; import org.apache.cxf.interceptor.Fault; import org.apache.cxf.interceptor.Interceptor; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.io.CachedOutputStreamCallback; import org.apache.cxf.io.WriteOnCloseOutputStream; import org.apache.cxf.message.Message; @@ -91,7 +95,6 @@ import org.apache.cxf.ws.rm.RMProperties; import org.apache.cxf.ws.rm.RMUtils; import org.apache.cxf.ws.rm.RetransmissionQueue; import org.apache.cxf.ws.rm.RetryStatus; -import org.apache.cxf.ws.rm.RewindableInputStream; import org.apache.cxf.ws.rm.SourceSequence; import org.apache.cxf.ws.rm.manager.RetryPolicyType; import org.apache.cxf.ws.rm.persistence.RMStore; @@ -585,10 +588,24 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { } private void releaseSavedMessage() { - RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT); - if (is != null) { - is.release(); + CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT); + if (cos != null) { + cos.releaseTempFileHold(); + try { + cos.close(); + } catch (IOException e) { + // ignore + } } + // REVISIT -- When reference holder is not needed anymore, code can be removed. + Closeable closeable = (Closeable)message.get(RMMessageConstants.ATTACHMENTS_CLOSEABLE); + if (closeable != null) { + try { + closeable.close(); + } catch (IOException e) { + // ignore + } + } } /** @@ -760,8 +777,9 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { } // read SOAP headers from saved input stream - RewindableInputStream is = (RewindableInputStream)message.get(RMMessageConstants.SAVED_CONTENT); - is.rewind(); + CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT); + cos.holdTempFile(); // CachedOutputStream is hold until delivering was successful + InputStream is = cos.getInputStream(); // instance is needed to close input stream later on XMLStreamReader reader = StaxUtils.createXMLStreamReader(is, StandardCharsets.UTF_8.name()); message.getHeaders().clear(); if (reader.getEventType() != XMLStreamConstants.START_ELEMENT @@ -814,7 +832,7 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { retransmitChain.remove(incept); } } - retransmitChain.add(new CopyOutInterceptor(reader)); + retransmitChain.add(new CopyOutInterceptor(reader, is)); // restore callbacks on output stream if (callbacks != null) { @@ -922,10 +940,12 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { public static class CopyOutInterceptor extends AbstractOutDatabindingInterceptor { private final XMLStreamReader reader; + private InputStream is; - public CopyOutInterceptor(XMLStreamReader rdr) { + public CopyOutInterceptor(XMLStreamReader rdr, InputStream is) { super(Phase.MARSHAL); reader = rdr; + this.is = is; } @Override @@ -933,6 +953,13 @@ public class RetransmissionQueueImpl implements RetransmissionQueue { try { XMLStreamWriter writer = message.getContent(XMLStreamWriter.class); StaxUtils.copy(reader, writer); + if (is != null) { + try { + is.close(); + } catch (IOException e) { + // ignore + } + } } catch (XMLStreamException e) { throw new Fault("COULD_NOT_READ_XML_STREAM", LOG, e); } http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java index 645708a..37fb6ac 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java @@ -41,6 +41,8 @@ import org.apache.cxf.endpoint.Client; import org.apache.cxf.endpoint.Endpoint; import org.apache.cxf.endpoint.Server; import org.apache.cxf.helpers.CastUtils; +import org.apache.cxf.helpers.IOUtils; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; import org.apache.cxf.service.Service; @@ -505,7 +507,7 @@ public class RMManagerTest extends Assert { } @Test - public void testRecoverReliableClientEndpoint() throws NoSuchMethodException { + public void testRecoverReliableClientEndpoint() throws NoSuchMethodException, IOException { Method method = RMManager.class.getDeclaredMethod("createReliableEndpoint", new Class[] {Endpoint.class}); manager = control.createMock(RMManager.class, new Method[] {method}); @@ -563,7 +565,10 @@ public class RMManagerTest extends Assert { DestinationSequence ds = control.createMock(DestinationSequence.class); RMMessage m1 = new RMMessage(); InputStream fis = getClass().getResourceAsStream("persistence/SerializedRMMessage.txt"); - m1.setContent(fis); + CachedOutputStream cos = new CachedOutputStream(); + IOUtils.copyAndCloseInput(fis, cos); + cos.flush(); + m1.setContent(cos); m1.setTo("toAddress"); m1.setMessageNumber(new Long(10)); m1.setContentType(MULTIPART_TYPE); @@ -579,8 +584,8 @@ public class RMManagerTest extends Assert { assertNotNull(msg.getExchange()); assertSame(msg, msg.getExchange().getOutMessage()); - InputStream is = (InputStream) msg.get(RMMessageConstants.SAVED_CONTENT); - assertStartsWith(is, "<soap:Envelope"); + CachedOutputStream cos1 = (CachedOutputStream) msg.get(RMMessageConstants.SAVED_CONTENT); + assertStartsWith(cos1.getInputStream(), "<soap:Envelope"); assertEquals(1, msg.getAttachments().size()); } @@ -673,7 +678,8 @@ public class RMManagerTest extends Assert { void setUpRecoverReliableEndpoint(Endpoint endpoint, Conduit conduit, SourceSequence ss, - DestinationSequence ds, RMMessage m, Capture<Message> mc) { + DestinationSequence ds, RMMessage m, Capture<Message> mc) + throws IOException { RMStore store = control.createMock(RMStore.class); RetransmissionQueue queue = control.createMock(RetransmissionQueue.class); manager.setStore(store); @@ -735,7 +741,11 @@ public class RMManagerTest extends Assert { EasyMock.expect(m.getTo()).andReturn("toAddress"); } InputStream is = new ByteArrayInputStream(new byte[0]); - EasyMock.expect(m.getContent()).andReturn(is).anyTimes(); + CachedOutputStream cos = new CachedOutputStream(); + IOUtils.copy(is, cos); + cos.flush(); + is.close(); + EasyMock.expect(m.getContent()).andReturn(cos).anyTimes(); if (mc != null) { queue.addUnacknowledged(EasyMock.capture(mc)); http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java index c0667fb..9cccd2a 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/PersistenceUtilsTest.java @@ -29,9 +29,12 @@ import javax.activation.DataHandler; import javax.mail.util.ByteArrayDataSource; import org.apache.cxf.attachment.AttachmentImpl; +import org.apache.cxf.helpers.IOUtils; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Attachment; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; +import org.apache.cxf.ws.rm.RMMessageConstants; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement.AcknowledgementRange; @@ -76,7 +79,7 @@ public class PersistenceUtilsTest extends Assert { // update rmmessage PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis); - assertStartsWith(rmmsg.getContent(), "<soap:"); + assertStartsWith(rmmsg.getContent().getInputStream(), "<soap:"); assertNotNull(rmmsg.getContentType()); assertTrue(rmmsg.getContentType().startsWith("text/xml")); } @@ -93,7 +96,7 @@ public class PersistenceUtilsTest extends Assert { // update rmmessage PersistenceUtils.encodeRMContent(rmmsg, messageImpl, bis); - assertStartsWith(rmmsg.getContent(), "--uuid:"); + assertStartsWith(rmmsg.getContent().getInputStream(), "--uuid:"); assertNotNull(rmmsg.getContentType()); assertTrue(rmmsg.getContentType().startsWith("multipart/related")); } @@ -112,21 +115,26 @@ public class PersistenceUtilsTest extends Assert { Message messageImplRestored = new MessageImpl(); PersistenceUtils.decodeRMContent(rmmsg, messageImplRestored); assertEquals(1, messageImplRestored.getAttachments().size()); - - assertStartsWith(messageImplRestored.getContent(InputStream.class), SOAP_PART); + CachedOutputStream cos = (CachedOutputStream)messageImplRestored.get(RMMessageConstants.SAVED_CONTENT); + assertStartsWith(cos.getInputStream(), SOAP_PART); } @Test public void testDecodeRMContentWithAttachment() throws Exception { InputStream is = getClass().getResourceAsStream("SerializedRMMessage.txt"); + CachedOutputStream cos = new CachedOutputStream(); + IOUtils.copyAndCloseInput(is, cos); + cos.flush(); RMMessage msg = new RMMessage(); - msg.setContent(is); + msg.setContent(cos); msg.setContentType(MULTIPART_TYPE); Message messageImpl = new MessageImpl(); PersistenceUtils.decodeRMContent(msg, messageImpl); assertEquals(1, messageImpl.getAttachments().size()); - assertStartsWith(messageImpl.getContent(InputStream.class), "<soap:Envelope"); + CachedOutputStream cos1 = (CachedOutputStream)messageImpl + .get(RMMessageConstants.SAVED_CONTENT); + assertStartsWith(cos1.getInputStream(), "<soap:Envelope"); } private static void addAttachment(Message msg) throws IOException { @@ -137,7 +145,7 @@ public class PersistenceUtilsTest extends Assert { msg.setAttachments(attachments); } - // just read the begining of the input and compare it against the specified string + // just read the beginning of the input and compare it against the specified string private static boolean assertStartsWith(InputStream in, String starting) { assertNotNull(in); byte[] buf = new byte[starting.length()]; http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java index 5badbe6..bf5c246 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMLargeMessageTest.java @@ -46,10 +46,6 @@ public class RMLargeMessageTest extends RMMessageTest { } } - @Test - public void testContentInputStream() throws Exception { - super.testContentInputStream(); - } @Test public void testContentCachedOutputStream() throws Exception { http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java index 41322ff..1247f80 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/RMMessageTest.java @@ -19,8 +19,6 @@ package org.apache.cxf.ws.rm.persistence; -import java.io.ByteArrayInputStream; - import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.io.CachedOutputStream; @@ -49,23 +47,13 @@ public class RMMessageTest extends Assert { } @Test - public void testContentInputStream() throws Exception { - RMMessage msg = new RMMessage(); - msg.setContent(new ByteArrayInputStream(DATA)); - - byte[] msgbytes = IOUtils.readBytesFromStream(msg.getContent()); - - assertArrayEquals(DATA, msgbytes); - } - - @Test public void testContentCachedOutputStream() throws Exception { RMMessage msg = new RMMessage(); CachedOutputStream co = new CachedOutputStream(); co.write(DATA); - msg.setContent(co.getInputStream()); + msg.setContent(co); - byte[] msgbytes = IOUtils.readBytesFromStream(msg.getContent()); + byte[] msgbytes = IOUtils.readBytesFromStream(msg.getContent().getInputStream()); assertArrayEquals(DATA, msgbytes); co.close(); http://git-wip-us.apache.org/repos/asf/cxf/blob/27a95f05/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java index fb62f34..a5f9723 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java @@ -30,6 +30,7 @@ import java.util.Date; import java.util.List; import org.apache.cxf.helpers.IOUtils; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.apache.cxf.ws.addressing.Names; import org.apache.cxf.ws.rm.DestinationSequence; @@ -222,8 +223,13 @@ public abstract class RMTxStoreTestBase extends Assert { EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE).anyTimes(); EasyMock.expect(msg2.getMessageNumber()).andReturn(ONE).anyTimes(); byte[] bytes = new byte[89]; - EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); - EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + CachedOutputStream cos = new CachedOutputStream(); + IOUtils.copy(bais, cos); + cos.flush(); + bais.close(); + EasyMock.expect(msg1.getContent()).andReturn(cos).anyTimes(); + EasyMock.expect(msg2.getContent()).andReturn(cos).anyTimes(); EasyMock.expect(msg1.getContentType()).andReturn("text/xml").times(1); control.replay(); @@ -241,7 +247,7 @@ public abstract class RMTxStoreTestBase extends Assert { control.reset(); EasyMock.expect(msg1.getMessageNumber()).andReturn(ONE); - EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)); + EasyMock.expect(msg1.getContent()).andReturn(cos); control.replay(); con = getConnection(); @@ -260,8 +266,8 @@ public abstract class RMTxStoreTestBase extends Assert { control.reset(); EasyMock.expect(msg1.getMessageNumber()).andReturn(TEN).anyTimes(); EasyMock.expect(msg2.getMessageNumber()).andReturn(TEN).anyTimes(); - EasyMock.expect(msg1.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); - EasyMock.expect(msg2.getContent()).andReturn(new ByteArrayInputStream(bytes)).anyTimes(); + EasyMock.expect(msg1.getContent()).andReturn(cos).anyTimes(); + EasyMock.expect(msg2.getContent()).andReturn(cos).anyTimes(); control.replay(); con = getConnection(); @@ -862,7 +868,12 @@ public abstract class RMTxStoreTestBase extends Assert { EasyMock.expect(msg.getContentType()).andReturn("text/xml").anyTimes(); byte[] value = ("Message " + mn.longValue()).getBytes(); - EasyMock.expect(msg.getContent()).andReturn(new ByteArrayInputStream(value)).anyTimes(); + ByteArrayInputStream bais = new ByteArrayInputStream(value); + CachedOutputStream cos = new CachedOutputStream(); + IOUtils.copy(bais, cos); + cos.flush(); + bais.close(); + EasyMock.expect(msg.getContent()).andReturn(cos).anyTimes(); return msg; } @@ -926,7 +937,7 @@ public abstract class RMTxStoreTestBase extends Assert { assertNull(msg.getTo()); } try { - InputStream actual = msg.getContent(); + InputStream actual = msg.getContent().getInputStream(); assertEquals(new String("Message " + mn), IOUtils.readStringFromStream(actual)); } catch (IOException e) { fail("failed to get the input stream");