[CXF-4209] Server side message redelivery support for WS-RM
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6b8a340c Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6b8a340c Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6b8a340c Branch: refs/heads/master Commit: 6b8a340c7c5d5c6cd24f421d0caeaa368e94891d Parents: 3a1fa0b Author: Kai Rommel <kai.rom...@sap.com> Authored: Tue Jun 7 17:47:58 2016 +0200 Committer: Akitoshi Yoshida <a...@apache.org> Committed: Thu Jun 30 11:16:59 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/cxf/ws/rm/Destination.java | 23 + .../apache/cxf/ws/rm/DestinationSequence.java | 100 ++- .../org/apache/cxf/ws/rm/ManagedRMEndpoint.java | 192 ++--- .../cxf/ws/rm/RMCaptureInInterceptor.java | 227 +++++- .../cxf/ws/rm/RMCaptureOutInterceptor.java | 1 + .../apache/cxf/ws/rm/RMDeliveryInterceptor.java | 12 + .../java/org/apache/cxf/ws/rm/RMEndpoint.java | 3 + .../org/apache/cxf/ws/rm/RMInInterceptor.java | 74 +- .../java/org/apache/cxf/ws/rm/RMManager.java | 76 +- .../java/org/apache/cxf/ws/rm/RMProperties.java | 18 + .../org/apache/cxf/ws/rm/RedeliveryQueue.java | 106 +++ .../main/java/org/apache/cxf/ws/rm/Servant.java | 2 +- .../org/apache/cxf/ws/rm/feature/RMFeature.java | 4 +- .../apache/cxf/ws/rm/persistence/RMMessage.java | 9 + .../cxf/ws/rm/persistence/jdbc/RMTxStore.java | 74 +- .../cxf/ws/rm/soap/RMSoapInInterceptor.java | 5 +- .../cxf/ws/rm/soap/RedeliveryQueueImpl.java | 699 +++++++++++++++++++ .../configuration/wsrm-manager-types.xsd | 15 + .../schemas/configuration/wsrm-policy.xjb | 9 + .../cxf/ws/rm/DestinationSequenceTest.java | 4 +- .../org/apache/cxf/ws/rm/RMEndpointTest.java | 4 + .../apache/cxf/ws/rm/RMInInterceptorTest.java | 25 + .../org/apache/cxf/ws/rm/RMManagerTest.java | 43 +- .../rm/persistence/jdbc/RMTxStoreTestBase.java | 4 + .../cxf/systest/ws/rm/RedeliveryTest.java | 186 +++++ 25 files changed, 1703 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java index 178a63c..3d3489b 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java @@ -29,6 +29,7 @@ import java.util.logging.Logger; import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.endpoint.Endpoint; import org.apache.cxf.helpers.CastUtils; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; @@ -75,6 +76,14 @@ public class Destination extends AbstractEndpoint { processingSequenceCount.incrementAndGet(); } + // this method ensures to keep the sequence until all the messages are delivered + public void terminateSequence(DestinationSequence seq) { + seq.terminate(); + if (seq.allAcknowledgedMessagesDelivered()) { + removeSequence(seq); + } + } + public void removeSequence(DestinationSequence seq) { DestinationSequence o; o = map.remove(seq.getIdentifier().getValue()); @@ -208,6 +217,20 @@ public class Destination extends AbstractEndpoint { long mn = sequenceType.getMessageNumber().longValue(); seq.processingComplete(mn); seq.purgeAcknowledged(mn); + // remove acknowledged undelivered message + seq.removeDeliveringMessageNumber(mn); + if (seq.isTerminated() && seq.allAcknowledgedMessagesDelivered()) { + removeSequence(seq); + } + } + CachedOutputStream saved = (CachedOutputStream)message.remove(RMMessageConstants.SAVED_CONTENT); + if (saved != null) { + saved.releaseTempFileHold(); + try { + saved.close(); + } catch (IOException e) { + // ignore + } } } http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java index 58b7906..3442fc5 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java @@ -19,6 +19,8 @@ package org.apache.cxf.ws.rm; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -33,12 +35,15 @@ import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.continuations.Continuation; import org.apache.cxf.continuations.ContinuationProvider; import org.apache.cxf.continuations.SuspendedInvocationException; +import org.apache.cxf.interceptor.Fault; import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; import org.apache.cxf.ws.addressing.EndpointReferenceType; +import org.apache.cxf.ws.policy.PolicyVerificationInInterceptor; import org.apache.cxf.ws.rm.RMConfiguration.DeliveryAssurance; import org.apache.cxf.ws.rm.manager.AcksPolicyType; +import org.apache.cxf.ws.rm.persistence.PersistenceUtils; import org.apache.cxf.ws.rm.persistence.RMMessage; import org.apache.cxf.ws.rm.persistence.RMStore; import org.apache.cxf.ws.rm.v200702.Identifier; @@ -55,6 +60,7 @@ public class DestinationSequence extends AbstractSequence { private long lastMessageNumber; private SequenceMonitor monitor; private boolean acknowledgeOnNextOccasion; + private boolean terminated; private List<DeferredAcknowledgment> deferredAcknowledgments; private SequenceTermination scheduledTermination; private String correlationID; @@ -62,18 +68,25 @@ public class DestinationSequence extends AbstractSequence { private volatile long highNumberCompleted; private long nextInOrder; private List<Continuation> continuations = new LinkedList<Continuation>(); + // this map is used for robust and redelivery tracking. for redelivery it holds the beingDeliverd messages private Set<Long> deliveringMessageNumbers = new HashSet<Long>(); public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d, ProtocolVariation pv) { - this(i, a, 0, null, pv); + this(i, a, 0, false, null, pv); destination = d; } public DestinationSequence(Identifier i, EndpointReferenceType a, long lmn, SequenceAcknowledgement ac, ProtocolVariation pv) { + this(i, a, lmn, false, ac, pv); + } + + public DestinationSequence(Identifier i, EndpointReferenceType a, + long lmn, boolean t, SequenceAcknowledgement ac, ProtocolVariation pv) { super(i, pv); acksTo = a; lastMessageNumber = lmn; + terminated = t; acknowledgement = ac; if (null == acknowledgement) { acknowledgement = new SequenceAcknowledgement(); @@ -122,6 +135,7 @@ public class DestinationSequence extends AbstractSequence { } monitor.acknowledgeMessage(); + boolean updated = false; synchronized (this) { boolean done = false; @@ -136,11 +150,13 @@ public class DestinationSequence extends AbstractSequence { long diff = r.getLower() - messageNumber; if (diff == 1) { r.setLower(messageNumber); + updated = true; done = true; } else if (diff > 0) { break; } else if (messageNumber - r.getUpper().longValue() == 1) { r.setUpper(messageNumber); + updated = true; done = true; break; } @@ -152,6 +168,7 @@ public class DestinationSequence extends AbstractSequence { AcknowledgementRange range = new AcknowledgementRange(); range.setLower(messageNumber); range.setUpper(messageNumber); + updated = true; acknowledgement.getAcknowledgementRange().add(i, range); if (acknowledgement.getAcknowledgementRange().size() > 1) { @@ -163,18 +180,45 @@ public class DestinationSequence extends AbstractSequence { mergeRanges(); } - RMStore store = destination.getManager().getStore(); - if (null != store) { - RMMessage msg = null; - if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) { - msg = new RMMessage(); - CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT); - msg.setContent(cos); - msg.setContentType((String) message.get(Message.CONTENT_TYPE)); - msg.setMessageNumber(st.getMessageNumber()); + if (updated) { + RMStore store = destination.getManager().getStore(); + if (null != store) { + // only save message, when policy verification is successful + // otherwise msgs will be stored and redelivered which do not pass initial verification + // as interceptor is called in a later phase than the capturing + PolicyVerificationInInterceptor intercep = new PolicyVerificationInInterceptor(); + boolean policiesVerified = false; + try { + intercep.handleMessage(message); + policiesVerified = true; + } catch (Fault e) { + // Ignore + } + RMMessage msg = null; + if (policiesVerified + && !MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) { + try { + msg = new RMMessage(); + CachedOutputStream cos = (CachedOutputStream)message + .get(RMMessageConstants.SAVED_CONTENT); + msg.setMessageNumber(st.getMessageNumber()); + msg.setCreatedTime(rmps.getCreatedTime()); + // in case no attachments are available, cos can be saved directly + if (message.getAttachments() == null) { + msg.setContent(cos); + msg.setContentType((String)message.get(Message.CONTENT_TYPE)); + } else { + InputStream is = cos.getInputStream(); + PersistenceUtils.encodeRMContent(msg, message, is); + } + store.persistIncoming(this, msg); + } catch (IOException e) { + throw new Fault(e); + } + } } - store.persistIncoming(this, msg); } + deliveringMessageNumbers.add(messageNumber); RMEndpoint reliableEndpoint = destination.getReliableEndpoint(); RMConfiguration cfg = reliableEndpoint.getConfiguration(); @@ -277,7 +321,7 @@ public class DestinationSequence extends AbstractSequence { return false; } if (robustDelivering) { - deliveringMessageNumbers.add(mn); + addDeliveringMessageNumber(mn); } if (config.isInOrder()) { return waitInQueue(mn, canSkip, message, cont); @@ -286,9 +330,21 @@ public class DestinationSequence extends AbstractSequence { } void removeDeliveringMessageNumber(long mn) { - deliveringMessageNumbers.remove(mn); + synchronized (deliveringMessageNumbers) { + deliveringMessageNumbers.remove(mn); + } + } + void addDeliveringMessageNumber(long mn) { + synchronized (deliveringMessageNumbers) { + deliveringMessageNumbers.add(mn); + } } + // this method is only used for redelivery + boolean allAcknowledgedMessagesDelivered() { + return deliveringMessageNumbers.isEmpty(); + } + private Continuation getContinuation(Message message) { if (message == null) { return null; @@ -496,6 +552,22 @@ public class DestinationSequence extends AbstractSequence { } } } + + void terminate() { + if (!terminated) { + terminated = true; + RMStore store = destination.getManager().getStore(); + if (null == store) { + return; + } + // only updating the sequence + store.persistIncoming(this, null); + } + } + + public boolean isTerminated() { + return terminated; + } final class SequenceTermination extends TimerTask { @@ -521,7 +593,7 @@ public class DestinationSequence extends AbstractSequence { LogUtils.log(LOG, Level.WARNING, "TERMINATING_INACTIVE_SEQ_MSG", DestinationSequence.this.getIdentifier().getValue()); - DestinationSequence.this.destination.removeSequence(DestinationSequence.this); + DestinationSequence.this.destination.terminateSequence(DestinationSequence.this); } else { // reschedule http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java index b96361d..c14bd84 100755 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java @@ -136,8 +136,7 @@ public class ManagedRMEndpoint implements ManagedComponent { if (outbound) { return endpoint.getManager().getRetransmissionQueue().countUnacknowledged(); } else { -// return endpoint.getManager().getRedeliveryQueue().countUndelivered(); - return 0; + return endpoint.getManager().getRedeliveryQueue().countUndelivered(); } } @@ -155,12 +154,11 @@ public class ManagedRMEndpoint implements ManagedComponent { } return manager.getRetransmissionQueue().countUnacknowledged(ss); } else { -// DestinationSequence ds = getDestinationSeq(sid); -// if (null == ds) { -// throw new IllegalArgumentException("no sequence"); -// } -// return manager.getRedeliveryQueue().countUndelivered(ds); - return 0; + DestinationSequence ds = getDestinationSeq(sid); + if (null == ds) { + throw new IllegalArgumentException("no sequence"); + } + return manager.getRedeliveryQueue().countUndelivered(ds); } } @@ -280,55 +278,55 @@ public class ManagedRMEndpoint implements ManagedComponent { return rsps; } -// @ManagedOperation(description = "Redelivery Status") -// @ManagedOperationParameters({ -// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"), -// @ManagedOperationParameter(name = "messageNumber", description = "The message number") -// }) -// public CompositeData getRedeliveryStatus(String sid, long num) throws JMException { -// DestinationSequence ds = getDestinationSeq(sid); -// if (null == ds) { -// throw new IllegalArgumentException("no sequence"); -// } -// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); -// RetryStatus rs = rq.getRedeliveryStatus(ds, num); -// return getRetryStatusProperties(num, rs); -// } - -// @ManagedOperation(description = "Redelivery Statuses") -// @ManagedOperationParameters({ -// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") -// }) -// public CompositeData[] getRedeliveryStatuses(String sid) throws JMException { -// DestinationSequence ds = getDestinationSeq(sid); -// if (null == ds) { -// throw new IllegalArgumentException("no sequence"); -// } -// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); -// Map<Long, RetryStatus> rsmap = rq.getRedeliveryStatuses(ds); -// -// CompositeData[] rsps = new CompositeData[rsmap.size()]; -// int i = 0; -// for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) { -// rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue()); -// } -// return rsps; -// } - -// @ManagedOperation(description = "List of UnDelivered Message Numbers") -// @ManagedOperationParameters({ -// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") -// }) -// public Long[] getUnDeliveredMessageIdentifiers(String sid) { -// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); -// DestinationSequence ds = getDestinationSeq(sid); -// if (null == ds) { -// throw new IllegalArgumentException("no sequence"); -// } -// -// List<Long> numbers = rq.getUndeliveredMessageNumbers(ds); -// return numbers.toArray(new Long[numbers.size()]); -// } + @ManagedOperation(description = "Redelivery Status") + @ManagedOperationParameters({ + @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"), + @ManagedOperationParameter(name = "messageNumber", description = "The message number") + }) + public CompositeData getRedeliveryStatus(String sid, long num) throws JMException { + DestinationSequence ds = getDestinationSeq(sid); + if (null == ds) { + throw new IllegalArgumentException("no sequence"); + } + RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); + RetryStatus rs = rq.getRedeliveryStatus(ds, num); + return getRetryStatusProperties(num, rs); + } + + @ManagedOperation(description = "Redelivery Statuses") + @ManagedOperationParameters({ + @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") + }) + public CompositeData[] getRedeliveryStatuses(String sid) throws JMException { + DestinationSequence ds = getDestinationSeq(sid); + if (null == ds) { + throw new IllegalArgumentException("no sequence"); + } + RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); + Map<Long, RetryStatus> rsmap = rq.getRedeliveryStatuses(ds); + + CompositeData[] rsps = new CompositeData[rsmap.size()]; + int i = 0; + for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) { + rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue()); + } + return rsps; + } + + @ManagedOperation(description = "List of UnDelivered Message Numbers") + @ManagedOperationParameters({ + @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") + }) + public Long[] getUnDeliveredMessageIdentifiers(String sid) { + RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); + DestinationSequence ds = getDestinationSeq(sid); + if (null == ds) { + throw new IllegalArgumentException("no sequence"); + } + + List<Long> numbers = rq.getUndeliveredMessageNumbers(ds); + return numbers.toArray(new Long[numbers.size()]); + } @ManagedOperation(description = "List of Source Sequence IDs") @ManagedOperationParameters({ @@ -383,31 +381,31 @@ public class ManagedRMEndpoint implements ManagedComponent { rq.resume(ss); } -// @ManagedOperation(description = "Suspend Redelivery Queue") -// @ManagedOperationParameters({ -// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") -// }) -// public void suspendDestinationQueue(String sid) throws JMException { -// DestinationSequence ds = getDestinationSeq(sid); -// if (null == ds) { -// throw new IllegalArgumentException("no sequence"); -// } -// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); -// rq.suspend(ds); -// } - -// @ManagedOperation(description = "Resume Redelivery Queue") -// @ManagedOperationParameters({ -// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") -// }) -// public void resumeDestinationQueue(String sid) throws JMException { -// DestinationSequence ds = getDestinationSeq(sid); -// if (null == ds) { -// throw new JMException("no source sequence"); -// } -// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); -// rq.resume(ds); -// } + @ManagedOperation(description = "Suspend Redelivery Queue") + @ManagedOperationParameters({ + @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") + }) + public void suspendDestinationQueue(String sid) throws JMException { + DestinationSequence ds = getDestinationSeq(sid); + if (null == ds) { + throw new IllegalArgumentException("no sequence"); + } + RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); + rq.suspend(ds); + } + + @ManagedOperation(description = "Resume Redelivery Queue") + @ManagedOperationParameters({ + @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") + }) + public void resumeDestinationQueue(String sid) throws JMException { + DestinationSequence ds = getDestinationSeq(sid); + if (null == ds) { + throw new JMException("no source sequence"); + } + RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); + rq.resume(ds); + } @ManagedOperation(description = "Current Source Sequence Properties") public CompositeData getCurrentSourceSequence() throws JMException { @@ -572,10 +570,13 @@ public class ManagedRMEndpoint implements ManagedComponent { public void removeDestinationSequence(String sid) throws JMException { DestinationSequence ds = getDestinationSeq(sid); if (null == ds) { - throw new JMException("no source sequence"); + throw new JMException("no destination sequence"); } -// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); -// rq.suspend(ds); + RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); + if (rq.countUndelivered(ds) > 0) { + throw new JMException("sequence not empty"); + } + rq.stop(ds); ds.getDestination().removeSequence(ds); } @@ -591,6 +592,19 @@ public class ManagedRMEndpoint implements ManagedComponent { RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue(); rq.purgeAll(ss); } + + @ManagedOperation(description = "Purge UnDelivered Messages") + @ManagedOperationParameters({ + @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") + }) + public void purgeUnDeliverededMessages(String sid) { + DestinationSequence ds = getDestinationSeq(sid); + if (null == ds) { + throw new IllegalArgumentException("no sequence"); + } + RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); + rq.purgeAll(ds); + } private static String getAddressValue(EndpointReferenceType epr) { if (null != epr && null != epr.getAddress()) { @@ -675,10 +689,10 @@ public class ManagedRMEndpoint implements ManagedComponent { // return endpoint.getManager().countCompleted(); // } -// @ManagedAttribute(description = "Number of Inbound Queued Messages", currencyTimeLimit = 10) -// public int getQueuedMessagesInboundCount() { -// return endpoint.getManager().getRedeliveryQueue().countUndelivered(); -// } + @ManagedAttribute(description = "Number of Inbound Queued Messages", currencyTimeLimit = 10) + public int getQueuedMessagesInboundCount() { + return endpoint.getManager().getRedeliveryQueue().countUndelivered(); + } // @ManagedAttribute(description = "Number of Inbound Completed Messages", currencyTimeLimit = 10) // public int getCompletedMessagesInboundCount() { http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java index 9d48cbc..87ae210 100755 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java @@ -19,51 +19,242 @@ package org.apache.cxf.ws.rm; +import java.io.Closeable; +import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.logging.Logger; +import javax.xml.soap.SOAPException; +import javax.xml.soap.SOAPMessage; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; +import javax.xml.transform.stream.StreamSource; + import org.apache.cxf.common.logging.LogUtils; -import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.interceptor.Fault; +import org.apache.cxf.interceptor.StaxInInterceptor; import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; +import org.apache.cxf.phase.AbstractPhaseInterceptor; import org.apache.cxf.phase.Phase; +import org.apache.cxf.staxutils.StaxUtils; +import org.apache.cxf.staxutils.transform.OutTransformWriter; +import org.apache.cxf.ws.addressing.AddressingProperties; /** * */ public class RMCaptureInInterceptor extends AbstractRMInterceptor<Message> { + private static final Logger LOG = LogUtils.getLogger(RMCaptureInInterceptor.class); - + public RMCaptureInInterceptor() { - super(Phase.PRE_STREAM); + super(Phase.POST_STREAM); + addAfter(StaxInInterceptor.class.getName()); } + @Override protected void handle(Message message) throws SequenceFault, RMException { - LOG.entering(getClass().getName(), "handleMessage"); - // This message capturing mechanism will need to be changed at some point. - // Until then, we keep this interceptor here and utilize the robust - // option to avoid the unnecessary message capturing/caching. - if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) { - InputStream is = message.getContent(InputStream.class); - if (is != null) { + + // all messages are initially captured as they cannot be distinguished at this phase + // Non application messages temp files are released (cos.releaseTempFileHold()) in RMInInterceptor + if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY)) + && (getManager().getStore() != null || (getManager().getDestinationPolicy() != null && getManager() + .getDestinationPolicy().getRetryPolicy() != null))) { + + message.getInterceptorChain().add(new RMCaptureInEnd()); + XMLStreamReader reader = message.getContent(XMLStreamReader.class); + + if (null != reader) { CachedOutputStream saved = new CachedOutputStream(); + // REVISIT check factory for READER try { - IOUtils.copy(is, saved); - + StaxUtils.copy(reader, saved); saved.flush(); - is.close(); - saved.lockOutputStream(); - + saved.holdTempFile(); + reader.close(); + LOG.fine("Create new XMLStreamReader"); + InputStream is = saved.getInputStream(); + // keep References to clean-up tmp files in RMDeliveryInterceptor + setCloseable(message, saved, is); + XMLStreamReader newReader = StaxUtils.createXMLStreamReader(is); + StaxUtils.configureReader(reader, message); + message.setContent(XMLStreamReader.class, newReader); LOG.fine("Capturing the original RM message"); - //RewindableInputStream ris = RewindableInputStream.makeRewindable(saved.getInputStream()); - message.setContent(InputStream.class, saved.getInputStream()); message.put(RMMessageConstants.SAVED_CONTENT, saved); - } catch (Exception e) { + } catch (XMLStreamException | IOException e) { throw new Fault(e); } + } else { + org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message( + "No message found for redeliver", LOG, Collections.<String> emptyList()); + RMException ex = new RMException(msg); + throw new Fault(ex); + } + } + } + + private boolean isApplicationMessage(Message message) { + final AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false); + if (null != maps && null != maps.getAction()) { + return !RMContextUtils.isRMProtocolMessage(maps.getAction().getValue()); + } + return false; + } + + private void setCloseable(Message message, CachedOutputStream cos, InputStream is) { + message.put("org.apache.cxf.ws.rm.content.closeable", new Closeable() { + @Override + public void close() throws IOException { + try { + is.close(); + } catch (IOException e) { + // Ignore + } + try { + cos.close(); + } catch (IOException e) { + // Ignore + } + } + }); + } + + /** + * RMCaptureInEnd interceptor is used to switch saved_content, in case WSS is activated. + */ + private class RMCaptureInEnd extends AbstractPhaseInterceptor<Message> { + RMCaptureInEnd() { + super(Phase.PRE_LOGICAL); + addBefore(RMInInterceptor.class.getName()); + } + + @Override + public void handleFault(Message message) { + // in case of a SequenceFault SAVED_CONTENT must be released + Exception ex = message.getContent(Exception.class); + if (ex instanceof SequenceFault) { + Closeable closable = (Closeable)message.get("org.apache.cxf.ws.rm.content.closeable"); + if (null != closable) { + try { + closable.close(); + } catch (IOException e) { + // Ignore + } + } + CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT); + if (saved != null) { + saved.releaseTempFileHold(); + try { + saved.close(); + } catch (IOException e) { + // ignore + } + } + } + } + + public void handleMessage(Message message) { + LOG.entering(getClass().getName(), "handleMessage"); + // Capturing the soap envelope. In case of WSS was activated, decrypted envelope is captured. + if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY)) + && isApplicationMessage(message) + && (getManager().getStore() != null || (getManager().getDestinationPolicy() != null && getManager() + .getDestinationPolicy().getRetryPolicy() != null))) { + + CachedOutputStream saved = new CachedOutputStream(); + SOAPMessage soapMessage = message.getContent(SOAPMessage.class); + + if (soapMessage != null) { + try { + javax.xml.transform.Source envelope = soapMessage.getSOAPPart().getContent(); + StaxUtils.copy(envelope, saved); + saved.flush(); + // create a new source part from cos + InputStream is = saved.getInputStream(); + // close old saved content + closeOldSavedContent(message); + // keep References to clean-up tmp files in RMDeliveryInterceptor + setCloseable(message, saved, is); + StreamSource source = new StreamSource(is); + soapMessage.getSOAPPart().setContent(source); + // when WSS was activated, saved content still contains soap headers to be removed + message.put(RMMessageConstants.SAVED_CONTENT, removeUnnecessarySoapHeaders(saved)); + } catch (SOAPException | IOException | XMLStreamException e) { + throw new Fault(e); + } + } + } + } + + private void closeOldSavedContent(Message message) { + CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT); + if (saved != null) { + saved.releaseTempFileHold(); + try { + saved.close(); + } catch (IOException e) { + // ignore + } + } + Closeable closable = (Closeable)message.get("org.apache.cxf.ws.rm.content.closeable"); + if (null != closable) { + try { + closable.close(); + } catch (IOException e) { + // Ignore + } + } + } + + private CachedOutputStream removeUnnecessarySoapHeaders(CachedOutputStream saved) { + CachedOutputStream newSaved = new CachedOutputStream(); + + InputStream is = null; + try { + is = saved.getInputStream(); + XMLStreamWriter capture = StaxUtils.createXMLStreamWriter(newSaved, + StandardCharsets.UTF_8.name()); + Map<String, String> map = new HashMap<String, String>(); + map.put("{http://schemas.xmlsoap.org/ws/2005/02/rm}Sequence", ""); + map.put("{http://schemas.xmlsoap.org/ws/2005/02/rm}SequenceAcknowledgement", ""); + map.put("{http://docs.oasis-open.org/ws-rx/wsrm/200702}Sequence", ""); + map.put("{http://docs.oasis-open.org/ws-rx/wsrm/200702}SequenceAcknowledgement", ""); + map.put("{http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd}Security", + ""); + // attributes to be removed + Map<String, String> amap = new HashMap<String, String>(); + amap.put("{http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd}Id", + ""); + + capture = new OutTransformWriter(capture, map, Collections.<String, String> emptyMap(), + Collections.<String> emptyList(), amap, false, null); + StaxUtils.copy(new StreamSource(is), capture); + capture.flush(); + capture.close(); + newSaved.flush(); + // hold temp file, otherwise it will be deleted in case msg was written to RMTxStore + // or resend was executed + newSaved.holdTempFile(); + is.close(); + } catch (IOException | XMLStreamException e) { + throw new Fault(e); + } finally { + if (null != is) { + try { + is.close(); + } catch (IOException e) { + // Ignore + } + } } + return newSaved; } } } http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java index b3c412d..af682c3 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java @@ -291,6 +291,7 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> { } // serializes the message content and the attachments into // the RMMessage content + msg.setCreatedTime(rmps.getCreatedTime()); PersistenceUtils.encodeRMContent(msg, message, is); store.persistOutgoing(ss, msg); } http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java index 262e066..ff34c95 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java @@ -19,6 +19,8 @@ package org.apache.cxf.ws.rm; +import java.io.Closeable; +import java.io.IOException; import java.util.logging.Logger; import org.apache.cxf.common.logging.LogUtils; @@ -60,5 +62,15 @@ public class RMDeliveryInterceptor extends AbstractRMInterceptor<Message> { dest.acknowledge(message); } dest.processingComplete(message); + + // close InputStream of RMCaptureInInterceptor, to delete tmp files in filesystem + Closeable closable = (Closeable)message.get("org.apache.cxf.ws.rm.content.closeable"); + if (null != closable) { + try { + closable.close(); + } catch (IOException e) { + // Ignore + } + } } } http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java index e393124..03f6d13 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java @@ -830,6 +830,9 @@ public class RMEndpoint { for (SourceSequence ss : getSource().getAllSequences()) { manager.getRetransmissionQueue().stop(ss); } + for (DestinationSequence ds : getDestination().getAllSequences()) { + manager.getRedeliveryQueue().stop(ds); + } // unregistering of this managed bean from the server is done by the bus itself } http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java index 5b516e4..c82412b 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java @@ -24,6 +24,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; @@ -32,6 +33,7 @@ import org.apache.cxf.ws.addressing.ContextUtils; import org.apache.cxf.ws.addressing.MAPAggregator; import org.apache.cxf.ws.rm.v200702.Identifier; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement; +import org.apache.cxf.ws.rm.v200702.SequenceType; import org.apache.cxf.ws.security.trust.STSUtils; /** @@ -49,17 +51,21 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> { @Override public void handleFault(Message message) { message.put(MAPAggregator.class.getName(), true); - if (RMContextUtils.getProtocolVariation(message) != null - && MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY))) { - // revert the delivering entry from the destination sequence - try { - Destination destination = getManager().getDestination(message); - if (destination != null) { - destination.releaseDeliveringStatus(message); + if (RMContextUtils.getProtocolVariation(message) != null) { + if (MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY))) { + // revert the delivering entry from the destination sequence + try { + Destination destination = getManager().getDestination(message); + if (destination != null) { + destination.releaseDeliveringStatus(message); + } + } catch (RMException e) { + LOG.log(Level.WARNING, "Failed to revert the delivering status"); } - } catch (RMException e) { - LOG.log(Level.WARNING, "Failed to revert the delivering status"); - } + } else if (isRedeliveryEnabled(message) && RMContextUtils.isServerSide(message) + && isApplicationMessage(message) && hasValidSequence(message)) { + getManager().getRedeliveryQueue().addUndelivered(message); + } } // make sure the fault is returned for an ws-rm related fault or an invalid ws-rm message // note that OneWayProcessingInterceptor handles the robust case, hence not handled here. @@ -82,6 +88,42 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> { || message.getContent(Exception.class) instanceof SequenceFault); } + private boolean hasValidSequence(Message message) { + final RMProperties rmps = RMContextUtils.retrieveRMProperties(message, false); + if (rmps != null) { + SequenceType st = rmps.getSequence(); + if (st != null && st.getIdentifier() != null) { + try { + Destination destination = getManager().getDestination(message); + if (destination != null && destination.getSequence(st.getIdentifier()) != null) { + return true; + } + } catch (RMException e) { + // fall through + } + } + } + return false; + } + + private static boolean isApplicationMessage(Message message) { + final AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false); + if (null != maps && null != maps.getAction()) { + return !RMContextUtils.isRMProtocolMessage(maps.getAction().getValue()); + } + return false; + } + + private boolean isRedeliveryEnabled(Message message) { + // deprecated redelivery mode check + if (MessageUtils.isTrue(message.getContextualProperty("org.apache.cxf.ws.rm.destination.redeliver"))) { + LOG.warning("Use RetryPolicy to enable the redelivery mode"); + return true; + } + return getManager().getDestinationPolicy() != null + && getManager().getDestinationPolicy().getRetryPolicy() != null; + } + protected void handle(Message message) throws SequenceFault, RMException { LOG.entering(getClass().getName(), "handleMessage"); @@ -138,10 +180,6 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> { } RMContextUtils.setProtocolVariation(message, protocol); - // Destination destination = getManager().getDestination(message); - // RMEndpoint rme = getManager().getReliableEndpoint(message); - // Servant servant = new Servant(rme); - boolean isApplicationMessage = !RMContextUtils.isRMProtocolMessage(action); LOG.fine("isApplicationMessage: " + isApplicationMessage); @@ -164,6 +202,12 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> { rme.receivedApplicationMessage(); } } else { + // in case message is not an application message, release SAVED_CONTENT + // otherwise tmp files will not be closed + CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT); + if (null != cos) { + cos.releaseTempFileHold(); + } rme.receivedControlMessage(); if (RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action) || RM11Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)) { @@ -215,7 +259,7 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> { final boolean robust = MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY)); if (robust) { - // set this property to change the acknlowledging behavior + // set this property to change the acknowledging behavior message.put(RMMessageConstants.DELIVERING_ROBUST_ONEWAY, Boolean.TRUE); } destination.acknowledge(message); http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java index 9639cfe..6a0839e 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java @@ -53,6 +53,9 @@ import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; import org.apache.cxf.phase.PhaseInterceptorChain; import org.apache.cxf.service.Service; +import org.apache.cxf.service.model.BindingInfo; +import org.apache.cxf.service.model.InterfaceInfo; +import org.apache.cxf.service.model.ServiceInfo; import org.apache.cxf.transport.Conduit; import org.apache.cxf.ws.addressing.AddressingProperties; import org.apache.cxf.ws.addressing.ContextUtils; @@ -70,6 +73,7 @@ import org.apache.cxf.ws.rm.persistence.PersistenceUtils; import org.apache.cxf.ws.rm.persistence.RMMessage; import org.apache.cxf.ws.rm.persistence.RMStore; import org.apache.cxf.ws.rm.policy.RMPolicyUtilities; +import org.apache.cxf.ws.rm.soap.RedeliveryQueueImpl; import org.apache.cxf.ws.rm.soap.RetransmissionQueueImpl; import org.apache.cxf.ws.rm.soap.SoapFaultFactory; import org.apache.cxf.ws.rm.v200702.CloseSequenceType; @@ -112,6 +116,7 @@ public class RMManager { private RMStore store; private SequenceIdentifierGenerator idGenerator; private RetransmissionQueue retransmissionQueue; + private RedeliveryQueue redeliveryQueue; private Map<Endpoint, RMEndpoint> reliableEndpoints = new ConcurrentHashMap<Endpoint, RMEndpoint>(); private AtomicReference<Timer> timer = new AtomicReference<Timer>(); private RMConfiguration configuration; @@ -185,6 +190,14 @@ public class RMManager { retransmissionQueue = rq; } + public RedeliveryQueue getRedeliveryQueue() { + return redeliveryQueue; + } + + public void setRedeliveryQueue(RedeliveryQueue redeliveryQueue) { + this.redeliveryQueue = redeliveryQueue; + } + public SequenceIdentifierGenerator getIdGenerator() { return idGenerator; } @@ -556,10 +569,10 @@ public class RMManager { } for (DestinationSequence ds : dss) { - reconverDestinationSequence(endpoint, conduit, rme.getDestination(), ds); + recoverDestinationSequence(endpoint, conduit, rme.getDestination(), ds); } retransmissionQueue.start(); - + redeliveryQueue.start(); } private void recoverSourceSequence(Endpoint endpoint, Conduit conduit, Source s, @@ -570,7 +583,7 @@ public class RMManager { return; } LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size()); - + // only recover the sequence if there are pending messages s.addSequence(ss, false); // choosing an arbitrary valid source sequence as the current source sequence if (s.getAssociatedSequence(null) == null && !ss.isExpired() && !ss.isLastMessage()) { @@ -596,6 +609,7 @@ public class RMManager { st.setMessageNumber(m.getMessageNumber()); RMProperties rmps = new RMProperties(); rmps.setSequence(st); + rmps.setCreatedTime(m.getCreatedTime()); rmps.exposeAs(ss.getProtocol().getWSRMNamespace()); if (ss.isLastMessage() && ss.getCurrentMessageNr() == m.getMessageNumber()) { CloseSequenceType close = new CloseSequenceType(); @@ -623,10 +637,59 @@ public class RMManager { } } - private void reconverDestinationSequence(Endpoint endpoint, Conduit conduit, Destination d, + private void recoverDestinationSequence(Endpoint endpoint, Conduit conduit, Destination d, DestinationSequence ds) { + // always recover the sequence d.addSequence(ds, false); - //TODO add the redelivery code + + Collection<RMMessage> ms = store.getMessages(ds.getIdentifier(), false); + if (null == ms || 0 == ms.size()) { + return; + } + LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size()); + + for (RMMessage m : ms) { + Message message = new MessageImpl(); + Exchange exchange = new ExchangeImpl(); + message.setExchange(exchange); + if (null != conduit) { + exchange.setConduit(conduit); + } + exchange.put(Endpoint.class, endpoint); + exchange.put(Service.class, endpoint.getService()); + if (endpoint.getEndpointInfo().getService() != null) { + exchange.put(ServiceInfo.class, endpoint.getEndpointInfo().getService()); + exchange.put(InterfaceInfo.class, endpoint.getEndpointInfo().getService().getInterface()); + } + exchange.put(Binding.class, endpoint.getBinding()); + exchange.put(BindingInfo.class, endpoint.getEndpointInfo().getBinding()); + exchange.put(Bus.class, bus); + + SequenceType st = new SequenceType(); + st.setIdentifier(ds.getIdentifier()); + st.setMessageNumber(m.getMessageNumber()); + RMProperties rmps = new RMProperties(); + rmps.setSequence(st); + rmps.setCreatedTime(m.getCreatedTime()); + RMContextUtils.storeRMProperties(message, rmps, false); + try { + // RMMessage is stored in a serialized way, therefore + // RMMessage content must be splitted into soap root message + // and attachments + PersistenceUtils.decodeRMContent(m, message); + redeliveryQueue.addUndelivered(message); + // add acknowledged undelivered message + ds.addDeliveringMessageNumber(m.getMessageNumber()); + } catch (IOException e) { + LOG.log(Level.SEVERE, "Error reading persisted message data", e); + } + } + + // if no messages are recovered and the sequence has been already terminated, remove the sequence + if (ds.isTerminated() && ds.allAcknowledgedMessagesDelivered()) { + d.removeSequence(ds); + store.removeDestinationSequence(ds.getIdentifier()); + } } RMEndpoint createReliableEndpoint(final Endpoint endpoint) { @@ -664,6 +727,9 @@ public class RMManager { if (null == retransmissionQueue) { retransmissionQueue = new RetransmissionQueueImpl(this); } + if (null == redeliveryQueue) { + redeliveryQueue = new RedeliveryQueueImpl(this); + } if (null == idGenerator) { idGenerator = new DefaultSequenceIdentifierGenerator(); } http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java index 448bfc6..f137803 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java @@ -37,6 +37,7 @@ public class RMProperties { private CloseSequenceType closeSequence; private String namespaceURI; private boolean lastMessage; + private long createdTime = System.currentTimeMillis(); public Collection<SequenceAcknowledgement> getAcks() { return acks; @@ -120,4 +121,21 @@ public class RMProperties { public void exposeAs(String uri) { namespaceURI = uri; } + + /** + * Get the initial creation time of this RM properties instance. + * @return Returns the createdTime. + */ + public long getCreatedTime() { + return createdTime; + } + + /** + * Set the initial creation time of this RM properties instance. + * + * @param createdTime The createdTime to set. + */ + public void setCreatedTime(long createdTime) { + this.createdTime = createdTime; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java new file mode 100644 index 0000000..4737859 --- /dev/null +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.ws.rm; + +import java.util.List; +import java.util.Map; + +import org.apache.cxf.message.Message; + +public interface RedeliveryQueue { + + + String DEFAULT_BASE_REDELIVERY_INTERVAL = "3000"; + int DEFAULT_EXPONENTIAL_BACKOFF = 2; + + /** + * @param seq the sequence under consideration + * @return the number of unacknowledged messages for that sequence + */ + int countUndelivered(DestinationSequence seq); + + /** + * @return the total number of undelivered messages in this queue + */ + int countUndelivered(); + + /** + * @return true if there are no unacknowledged messages in the queue + */ + boolean isEmpty(); + + /** + * Accepts a failed message for possible future redelivery. + * @param message the message context. + */ + void addUndelivered(Message message); + + /** + * Purge all candiates for the given sequence. + * + * @param seq the sequence object + */ + void purgeAll(DestinationSequence seq); + + /** + * + * @param seq + * @return + */ + List<Long> getUndeliveredMessageNumbers(DestinationSequence seq); + + /** + * Returns the retransmission status for the specified message. + * @param seq + * @param num + * @return + */ + RetryStatus getRedeliveryStatus(DestinationSequence seq, long num); + + /** + * Return the retransmission status of all the messages assigned to the sequence. + * @param seq + * @return + */ + Map<Long, RetryStatus> getRedeliveryStatuses(DestinationSequence seq); + + /** + * Initiate resends. + */ + void start(); + + /** + * Stops redelivery queue. + * @param seq + */ + void stop(DestinationSequence seq); + + /** + * Suspends the redelivery attempts for the specified sequence + * @param seq + */ + void suspend(DestinationSequence seq); + + /** + * Resumes the redelivery attempts for the specified sequence + * @param seq + */ + void resume(DestinationSequence seq); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java index 7cfeade..80a64bc 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java @@ -245,7 +245,7 @@ public class Servant implements Invoker { return null; } - destination.removeSequence(terminatedSeq); + destination.terminateSequence(terminatedSeq); // the following may be necessary if the last message for this sequence was a oneway // request and hence there was no response to which a last message could have been added http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java index 08cbf2a..90db17b 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java @@ -123,9 +123,7 @@ public class RMFeature extends AbstractFeature { provider.getInInterceptors().add(rmLogicalIn); provider.getInInterceptors().add(rmInCodec); provider.getInInterceptors().add(rmDelivery); - if (null != store) { - provider.getInInterceptors().add(rmCaptureIn); - } + provider.getInInterceptors().add(rmCaptureIn); provider.getOutInterceptors().add(rmLogicalOut); provider.getOutInterceptors().add(rmOutCodec); http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java index 348117c..ad61b04 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java @@ -32,6 +32,7 @@ public class RMMessage { private String contentType; private long messageNumber; private String to; + private long createdTime; /** * Returns the message number of the message within its sequence. @@ -120,4 +121,12 @@ public class RMMessage { this.contentType = contentType; } + public long getCreatedTime() { + return createdTime; + } + + public void setCreatedTime(long createdTime) { + this.createdTime = createdTime; + } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java index 641df46..e3cfa0b 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java @@ -75,6 +75,7 @@ public class RMTxStore implements RMStore { {"LAST_MSG_NO", "DECIMAL(19, 0)"}, {"ENDPOINT_ID", "VARCHAR(1024)"}, {"ACKNOWLEDGED", "BLOB"}, + {"TERMINATED", "CHAR(1)"}, {"PROTOCOL_VERSION", "VARCHAR(256)"}}; private static final String[] DEST_SEQUENCES_TABLE_KEYS = {"SEQ_ID"}; private static final String[][] SRC_SEQUENCES_TABLE_COLS @@ -90,6 +91,7 @@ public class RMTxStore implements RMStore { = {{"SEQ_ID", "VARCHAR(256) NOT NULL"}, {"MSG_NO", "DECIMAL(19, 0) NOT NULL"}, {"SEND_TO", "VARCHAR(256)"}, + {"CREATED_TIME", "DECIMAL(19, 0)"}, {"CONTENT", "BLOB"}, {"CONTENT_TYPE", "VARCHAR(1024)"}}; private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"}; @@ -119,27 +121,27 @@ public class RMTxStore implements RMStore { private static final String DELETE_SRC_SEQUENCE_STMT_STR = "DELETE FROM CXF_RM_SRC_SEQUENCES WHERE SEQ_ID = ?"; private static final String UPDATE_DEST_SEQUENCE_STMT_STR = - "UPDATE CXF_RM_DEST_SEQUENCES SET LAST_MSG_NO = ?, ACKNOWLEDGED = ? WHERE SEQ_ID = ?"; + "UPDATE CXF_RM_DEST_SEQUENCES SET LAST_MSG_NO = ?, TERMINATED = ?, ACKNOWLEDGED = ? WHERE SEQ_ID = ?"; private static final String UPDATE_SRC_SEQUENCE_STMT_STR = "UPDATE CXF_RM_SRC_SEQUENCES SET CUR_MSG_NO = ?, LAST_MSG = ? WHERE SEQ_ID = ?"; private static final String CREATE_MESSAGE_STMT_STR - = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE) VALUES(?, ?, ?, ?, ?)"; + = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CREATED_TIME, CONTENT, CONTENT_TYPE) VALUES(?, ?, ?, ?, ?, ?)"; private static final String DELETE_MESSAGE_STMT_STR = "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?"; private static final String SELECT_DEST_SEQUENCE_STMT_STR = - "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES " + "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, TERMINATED, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES " + "WHERE SEQ_ID = ?"; private static final String SELECT_SRC_SEQUENCE_STMT_STR = "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION FROM CXF_RM_SRC_SEQUENCES " + "WHERE SEQ_ID = ?"; private static final String SELECT_DEST_SEQUENCES_STMT_STR = - "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES " + "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, TERMINATED, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES " + "WHERE ENDPOINT_ID = ?"; private static final String SELECT_SRC_SEQUENCES_STMT_STR = "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION " + "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?"; private static final String SELECT_MESSAGES_STMT_STR = - "SELECT MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE FROM {0} WHERE SEQ_ID = ?"; + "SELECT MSG_NO, SEND_TO, CREATED_TIME, CONTENT, CONTENT_TYPE FROM {0} WHERE SEQ_ID = ?"; private static final String ALTER_TABLE_STMT_STR = "ALTER TABLE {0} ADD {1} {2}"; private static final String CREATE_INBOUND_MESSAGE_STMT_STR = @@ -395,13 +397,14 @@ public class RMTxStore implements RMStore { EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1)); long lm = res.getLong(2); ProtocolVariation pv = decodeProtocolVersion(res.getString(3)); - InputStream is = res.getBinaryStream(4); + boolean t = res.getBoolean(4); + InputStream is = res.getBinaryStream(5); SequenceAcknowledgement ack = null; if (null != is) { ack = PersistenceUtils.getInstance() .deserialiseAcknowledgment(is); } - return new DestinationSequence(sid, acksTo, lm, ack, pv); + return new DestinationSequence(sid, acksTo, lm, t, ack, pv); } } catch (SQLException ex) { conex = ex; @@ -521,13 +524,14 @@ public class RMTxStore implements RMStore { EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2)); long lm = res.getLong(3); ProtocolVariation pv = decodeProtocolVersion(res.getString(4)); - InputStream is = res.getBinaryStream(5); + boolean t = res.getBoolean(5); + InputStream is = res.getBinaryStream(6); SequenceAcknowledgement ack = null; if (null != is) { ack = PersistenceUtils.getInstance() .deserialiseAcknowledgment(is); } - DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, pv); + DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, t, ack, pv); seqs.add(seq); } } catch (SQLException ex) { @@ -596,11 +600,13 @@ public class RMTxStore implements RMStore { while (res.next()) { long mn = res.getLong(1); String to = res.getString(2); - Blob blob = res.getBlob(3); - String contentType = res.getString(4); + long ct = res.getLong(3); + Blob blob = res.getBlob(4); + String contentType = res.getString(5); RMMessage msg = new RMMessage(); msg.setMessageNumber(mn); msg.setTo(to); + msg.setCreatedTime(ct); CachedOutputStream cos = new CachedOutputStream(); IOUtils.copyAndCloseInput(blob.getBinaryStream(), cos); cos.flush(); @@ -752,8 +758,9 @@ public class RMTxStore implements RMStore { stmt.setString(1, id); stmt.setLong(2, nr); stmt.setString(3, to); - stmt.setBinaryStream(4, msgin); - stmt.setString(5, contentType); + stmt.setLong(4, msg.getCreatedTime()); + stmt.setBinaryStream(5, msgin); + stmt.setString(6, contentType); stmt.execute(); if (LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}", @@ -805,10 +812,11 @@ public class RMTxStore implements RMStore { stmt = getStatement(con, UPDATE_DEST_SEQUENCE_STMT_STR); long lastMessageNr = seq.getLastMessageNumber(); - stmt.setLong(1, lastMessageNr); + stmt.setLong(1, lastMessageNr); + stmt.setString(2, seq.isTerminated() ? "1" : "0"); InputStream is = PersistenceUtils.getInstance().serialiseAcknowledgment(seq.getAcknowledgment()); - stmt.setBinaryStream(2, is, is.available()); - stmt.setString(3, seq.getIdentifier().getValue()); + stmt.setBinaryStream(3, is, is.available()); + stmt.setString(4, seq.getIdentifier().getValue()); stmt.execute(); } finally { releaseResources(stmt, null); @@ -884,12 +892,11 @@ public class RMTxStore implements RMStore { } protected void verifyTable(Connection con, String tableName, String[][] tableCols) { - List<String[]> newCols = new ArrayList<String[]>(); - ResultSet rs = null; try { DatabaseMetaData metadata = con.getMetaData(); - rs = metadata.getColumns(null, null, tableName, "%"); + ResultSet rs = metadata.getColumns(null, null, tableName, "%"); Set<String> dbCols = new HashSet<String>(); + List<String[]> newCols = new ArrayList<String[]>(); while (rs.next()) { dbCols.add(rs.getString(4)); } @@ -898,26 +905,12 @@ public class RMTxStore implements RMStore { newCols.add(col); } } - } catch (SQLException ex) { - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("Table " + tableName + " cannot be verified."); - } - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - // ignore + if (newCols.size() > 0) { + // need to add the new columns + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "Table " + tableName + " needs additional columns"); } - } - } - if (newCols.size() > 0) { - // need to add the new columns - if (LOG.isLoggable(Level.FINE)) { - LOG.log(Level.FINE, "Table " + tableName + " needs additional columns"); - } - - try { + for (String[] newCol : newCols) { Statement st = con.createStatement(); try { @@ -931,9 +924,9 @@ public class RMTxStore implements RMStore { st.close(); } } - } catch (SQLException ex) { - LOG.log(Level.WARNING, "Table " + tableName + " cannot be altered.", ex); } + } catch (SQLException ex) { + LOG.log(Level.WARNING, "Table " + tableName + " cannot be altered.", ex); } } @@ -965,7 +958,6 @@ public class RMTxStore implements RMStore { for (int i = 0; i < SET_SCHEMA_STMT_STRS.length; i++) { try { stmt.executeUpdate(MessageFormat.format(SET_SCHEMA_STMT_STRS[i], schemaName)); - ex0 = null; break; } catch (SQLException ex) { ex.setNextException(ex0); http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java index 8e1b25e..9215031 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java @@ -139,7 +139,10 @@ public class RMSoapInInterceptor extends AbstractSoapInterceptor { * @return the RM properties */ public RMProperties unmarshalRMProperties(SoapMessage message) { - RMProperties rmps = new RMProperties(); + RMProperties rmps = (RMProperties)message.get(RMContextUtils.getRMPropertiesKey(false)); + if (rmps == null) { + rmps = new RMProperties(); + } List<Header> headers = message.getHeaders(); if (headers != null) { decodeHeaders(message, headers, rmps);