Repository: cxf
Updated Branches:
  refs/heads/master 3a1fa0b23 -> 6b8a340c7


http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RedeliveryQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RedeliveryQueueImpl.java 
b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RedeliveryQueueImpl.java
new file mode 100644
index 0000000..e2ff271
--- /dev/null
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RedeliveryQueueImpl.java
@@ -0,0 +1,699 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.soap;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TimerTask;
+import java.util.TreeSet;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.stream.XMLStreamReader;
+
+import org.w3c.dom.Node;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.binding.soap.SoapMessage;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.interceptor.Interceptor;
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.interceptor.InterceptorProvider;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseInterceptorChain;
+import org.apache.cxf.phase.PhaseManager;
+import org.apache.cxf.staxutils.StaxSource;
+import org.apache.cxf.ws.rm.DestinationSequence;
+import org.apache.cxf.ws.rm.RMCaptureInInterceptor;
+import org.apache.cxf.ws.rm.RMContextUtils;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.RedeliveryQueue;
+import org.apache.cxf.ws.rm.RetryStatus;
+import org.apache.cxf.ws.rm.manager.RetryPolicyType;
+import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.v200702.Identifier;
+import org.apache.cxf.ws.rm.v200702.SequenceType;
+
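+/**
+ * In-memory {@link RedeliveryQueue} implementation. Undelivered messages are kept
+ * per sequence as {@code RedeliverCandidate}s, which re-run the inbound interceptor
+ * chain on the manager's timer according to the destination retry policy.
+ */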
+public class RedeliveryQueueImpl implements RedeliveryQueue {
+    // Logger for this class, obtained through LogUtils.
+    private static final Logger LOG = 
LogUtils.getL7dLogger(RedeliveryQueueImpl.class);
+
+    // Redelivery candidates of the sequences that are not suspended, keyed by the
+    // sequence identifier value.
+    private Map<String, List<RedeliverCandidate>> candidates = 
+        new HashMap<String, List<RedeliverCandidate>>();
+    // Candidate lists moved out of "candidates" by suspend(); resume() moves them back.
+    private Map<String, List<RedeliverCandidate>> suspendedCandidates = 
+        new HashMap<String, List<RedeliverCandidate>>();
+    
+    // Supplies the store, the timer and the destination policy used by this queue.
+    private RMManager manager;
+    
+    // Number of queued candidates: incremented by cacheUndelivered(), decremented by
+    // purgeAll() and purgeDelivered().
+    private int undeliveredCount;
+
+    /**
+     * Creates a queue bound to the given manager.
+     *
+     * @param m the manager this queue obtains its store, timer and policy from
+     */
+    public RedeliveryQueueImpl(RMManager m) {
+        this.manager = m;
+    }
+
+    /**
+     * @return the manager currently used by this queue
+     */
+    public RMManager getManager() {
+        return this.manager;
+    }
+
+    /**
+     * @param m the manager to use from now on
+     */
+    public void setManager(RMManager m) {
+        this.manager = m;
+    }
+
+    /**
+     * Queues a message for redelivery; the candidate created (or found) by
+     * {@link #cacheUndelivered(Message)} is not returned to the caller.
+     *
+     * @param message the message that could not be delivered
+     */
+    public void addUndelivered(Message message) {
+        this.cacheUndelivered(message);
+    }
+
+    /**
+     * Counts the candidates queued for one sequence, whether it is suspended or not.
+     *
+     * @param seq the sequence under consideration
+     * @return the number of undelivered messages for that sequence, 0 if none is queued
+     */
+    public synchronized int countUndelivered(DestinationSequence seq) {
+        final List<RedeliverCandidate> queued = getSequenceCandidates(seq);
+        if (queued == null) {
+            return 0;
+        }
+        return queued.size();
+    }
+
+    /**
+     * @return the running total of queued candidates over all sequences
+     */
+    public int countUndelivered() {
+        return this.undeliveredCount;
+    }
+
+    /**
+     * @return true if the map returned by {@link #getUndelivered()} has no entries
+     */
+    public boolean isEmpty() {
+        return getUndelivered().isEmpty();
+    }
+    /**
+     * Discards every candidate queued for the sequence and, when a store is
+     * configured, removes the corresponding messages from it.
+     *
+     * @param seq the sequence whose candidates are purged
+     */
+    public void purgeAll(DestinationSequence seq) {
+        final Collection<Long> removedNumbers = new ArrayList<Long>();
+        synchronized (this) {
+            LOG.fine("Start purging redeliver candidates.");
+            final List<RedeliverCandidate> queued = getSequenceCandidates(seq);
+            if (queued != null) {
+                // walk from the tail so that removing never shifts the remaining elements
+                final ListIterator<RedeliverCandidate> cursor = queued.listIterator(queued.size());
+                while (cursor.hasPrevious()) {
+                    final RedeliverCandidate candidate = cursor.previous();
+                    final long number = candidate.getNumber();
+                    cursor.remove();
+                    candidate.resolved();
+                    undeliveredCount--;
+                    removedNumbers.add(number);
+                }
+                if (queued.isEmpty()) {
+                    candidates.remove(seq.getIdentifier().getValue());
+                }
+            }
+            LOG.fine("Completed purging redeliver candidates.");
+        }
+        if (removedNumbers.isEmpty()) {
+            return;
+        }
+        // the store is updated outside of the mutex
+        final RMStore store = manager.getStore();
+        if (store != null) {
+            store.removeMessages(seq.getIdentifier(), removedNumbers, false);
+        }
+    }
+
+    /**
+     * Lists the message numbers of the candidates queued for the sequence, taken
+     * from the RM properties of each candidate's message.
+     *
+     * @param seq the sequence under consideration
+     * @return the message numbers in queue order; empty if nothing is queued
+     */
+    public List<Long> getUndeliveredMessageNumbers(DestinationSequence seq) {
+        List<Long> undelivered = new ArrayList<Long>();
+        // getSequenceCandidates must be called with the mutex held; holding it also keeps
+        // the list from being modified by cache/purge operations while it is read
+        synchronized (this) {
+            List<RedeliverCandidate> sequenceCandidates = getSequenceCandidates(seq);
+            if (null != sequenceCandidates) {
+                for (RedeliverCandidate candidate : sequenceCandidates) {
+                    RMProperties properties =
+                        RMContextUtils.retrieveRMProperties(candidate.getMessage(), false);
+                    SequenceType st = properties.getSequence();
+                    undelivered.add(st.getMessageNumber());
+                }
+            }
+        }
+        return undelivered;
+    }
+
+    /**
+     * Looks up the candidates of a sequence by its identifier value.
+     *
+     * @param seq the sequence under consideration
+     * @return the list of redelivery candidates for that sequence, or null if there is none
+     * @pre called with mutex held
+     */
+    protected List<RedeliverCandidate> getSequenceCandidates(DestinationSequence seq) {
+        final String key = seq.getIdentifier().getValue();
+        return getSequenceCandidates(key);
+    }
+
+    /**
+     * Looks up the candidates of a sequence, first among the active sequences and
+     * then among the suspended ones.
+     *
+     * @param key the sequence identifier under consideration
+     * @return the list of redelivery candidates for that sequence, or null if there is none
+     * @pre called with mutex held
+     */
+    protected List<RedeliverCandidate> getSequenceCandidates(String key) {
+        final List<RedeliverCandidate> active = candidates.get(key);
+        if (active != null) {
+            return active;
+        }
+        return suspendedCandidates.get(key);
+    }
+
+    /**
+     * Tells whether a candidate list is registered for the key among the suspended sequences.
+     *
+     * @param key the sequence identifier under consideration
+     * @return true if the sequence is currently suspended; false otherwise
+     * @pre called with mutex held
+     */
+    protected boolean isSequenceSuspended(String key) {
+        final boolean suspended = suspendedCandidates.containsKey(key);
+        return suspended;
+    }
+    
+    /**
+     * Finds the retry status of one message of the sequence. The message number is
+     * read from the RM properties of each queued candidate's message.
+     *
+     * @param seq the sequence under consideration
+     * @param num the message number to look for
+     * @return the matching candidate, or null if that message is not queued
+     */
+    public RetryStatus getRedeliveryStatus(DestinationSequence seq, long num) {
+        // getSequenceCandidates must be called with the mutex held; holding it also keeps
+        // the list from being modified by cache/purge operations while it is scanned
+        synchronized (this) {
+            List<RedeliverCandidate> sequenceCandidates = getSequenceCandidates(seq);
+            if (null != sequenceCandidates) {
+                for (RedeliverCandidate candidate : sequenceCandidates) {
+                    RMProperties properties =
+                        RMContextUtils.retrieveRMProperties(candidate.getMessage(), false);
+                    SequenceType st = properties.getSequence();
+                    if (num == st.getMessageNumber()) {
+                        return candidate;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+
+    /**
+     * Collects the retry status of every message queued for the sequence, keyed by
+     * the message number found in the RM properties of the candidate's message.
+     *
+     * @param seq the sequence under consideration
+     * @return a new map from message number to status; empty if nothing is queued
+     */
+    public Map<Long, RetryStatus> getRedeliveryStatuses(DestinationSequence seq) {
+        Map<Long, RetryStatus> cp = new HashMap<Long, RetryStatus>();
+        // getSequenceCandidates must be called with the mutex held; holding it also keeps
+        // the list from being modified by cache/purge operations while it is copied
+        synchronized (this) {
+            List<RedeliverCandidate> sequenceCandidates = getSequenceCandidates(seq);
+            if (null != sequenceCandidates) {
+                for (RedeliverCandidate candidate : sequenceCandidates) {
+                    RMProperties properties =
+                        RMContextUtils.retrieveRMProperties(candidate.getMessage(), false);
+                    SequenceType st = properties.getSequence();
+                    cp.put(st.getMessageNumber(), candidate);
+                }
+            }
+        }
+        return cp;
+    }
+
+    
+    /**
+     * Does nothing in this implementation.
+     */
+    public void start() {
+        // intentionally empty: each candidate schedules itself when it is created
+    }
+
+
+    /**
+     * Cancels the candidates queued for the sequence; they stay in the queue.
+     *
+     * @param seq the sequence whose redeliveries are cancelled
+     */
+    public void stop(DestinationSequence seq) {
+        synchronized (this) {
+            final List<RedeliverCandidate> queued = getSequenceCandidates(seq);
+            if (queued == null) {
+                return;
+            }
+            // last queued candidate first
+            final ListIterator<RedeliverCandidate> cursor = queued.listIterator(queued.size());
+            while (cursor.hasPrevious()) {
+                cursor.previous().cancel();
+            }
+            LOG.log(Level.FINE, "Cancelled redeliveriss for sequence {0}.",
+                    seq.getIdentifier().getValue());
+        }
+    }
+
+
+    /**
+     * Suspends the candidates of an active sequence and parks their list among the
+     * suspended sequences. Nothing happens if the sequence has no active candidate list.
+     *
+     * @param seq the sequence to suspend
+     */
+    public void suspend(DestinationSequence seq) {
+        synchronized (this) {
+            final String key = seq.getIdentifier().getValue();
+            final List<RedeliverCandidate> parked = candidates.remove(key);
+            if (parked == null) {
+                return;
+            }
+            // last queued candidate first
+            final ListIterator<RedeliverCandidate> cursor = parked.listIterator(parked.size());
+            while (cursor.hasPrevious()) {
+                cursor.previous().suspend();
+            }
+            suspendedCandidates.put(key, parked);
+            LOG.log(Level.FINE, "Suspended redeliveris for sequence {0}.", key);
+        }
+    }
+
+
+    /**
+     * Resumes the candidates of a suspended sequence and moves their list back among
+     * the active sequences. Nothing happens if the sequence is not suspended.
+     *
+     * @param seq the sequence to resume
+     */
+    public void resume(DestinationSequence seq) {
+        synchronized (this) {
+            final String key = seq.getIdentifier().getValue();
+            final List<RedeliverCandidate> revived = suspendedCandidates.remove(key);
+            if (revived == null) {
+                return;
+            }
+            for (RedeliverCandidate candidate : revived) {
+                candidate.resume();
+            }
+            candidates.put(key, revived);
+            LOG.log(Level.FINE, "Resumed redeliveries for sequence {0}.", key);
+        }
+    }
+    
+    /**
+     * Accepts a new redelivery candidate. If a candidate with the same message number
+     * is already queued for the sequence, that one is returned and nothing is added.
+     * A candidate added to a suspended sequence is suspended right away.
+     *
+     * @param message the undelivered message; its RM properties identify the sequence
+     * @return the candidate queued for the message
+     */
+    protected RedeliverCandidate cacheUndelivered(Message message) {
+        final RMProperties props = RMContextUtils.retrieveRMProperties(message, false);
+        final SequenceType sequence = props.getSequence();
+        final Identifier identifier = sequence.getIdentifier();
+        final String key = identifier.getValue();
+
+        RedeliverCandidate result;
+        synchronized (this) {
+            List<RedeliverCandidate> queued = getSequenceCandidates(key);
+            if (queued == null) {
+                queued = new ArrayList<RedeliverCandidate>();
+                candidates.put(key, queued);
+            }
+            result = getRedeliverCandidate(sequence, queued);
+            if (result == null) {
+                result = new RedeliverCandidate(message);
+                if (isSequenceSuspended(key)) {
+                    result.suspend();
+                }
+                queued.add(result);
+                undeliveredCount++;
+            }
+        }
+        LOG.fine("Cached undelivered message.");
+        return result;
+    }
+
+    /**
+     * Searches a candidate list for the message number carried by the sequence header.
+     *
+     * @param st the sequence header of the message looked for
+     * @param rcs the candidates to search
+     * @return the first candidate with that message number, or null if there is none
+     */
+    private RedeliverCandidate getRedeliverCandidate(SequenceType st, List<RedeliverCandidate> rcs) {
+        // linear scan: the list is assumed to be small, otherwise the message
+        // numbers should be used as keys
+        final Iterator<RedeliverCandidate> cursor = rcs.iterator();
+        while (cursor.hasNext()) {
+            final RedeliverCandidate queued = cursor.next();
+            if (st.getMessageNumber() == queued.getNumber()) {
+                return queued;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Removes a candidate from the queue once its message has been delivered.
+     * Does nothing when the sequence is no longer queued (for instance after
+     * {@link #purgeAll(DestinationSequence)}) or when the candidate is no longer in its list.
+     *
+     * @param candidate the candidate whose message was delivered
+     */
+    protected void purgeDelivered(RedeliverCandidate candidate) {
+        RMProperties rmps = RMContextUtils.retrieveRMProperties(candidate.getMessage(), false);
+        SequenceType st = rmps.getSequence();
+        Identifier sid = st.getIdentifier();
+        String key = sid.getValue();
+
+        synchronized (this) {
+            List<RedeliverCandidate> sequenceCandidates = getSequenceCandidates(key);
+            // the list is null when the sequence was purged in the meantime; everything
+            // that touches it has to stay inside this check
+            if (null != sequenceCandidates) {
+                // TODO use a constant op instead of this inefficient linear op
+                if (sequenceCandidates.remove(candidate)) {
+                    // only count candidates that were actually removed
+                    undeliveredCount--;
+                }
+                if (sequenceCandidates.isEmpty()) {
+                    candidates.remove(key);
+                }
+            }
+        }
+        LOG.fine("Purged delivered message.");
+    }
+
+    /**
+     * Exposes the internal map of active (not suspended) sequences; it is not copied.
+     *
+     * @return a map relating sequence ID to the list of undelivered messages
+     *         for that sequence
+     */
+    protected Map<String, List<RedeliverCandidate>> getUndelivered() {
+        return this.candidates;
+    }
+    
+    /**
+     * Builds the inbound interceptor chain used for a redelivery. The chain only
+     * contains the in-phases from the given phase onwards and is filled with the
+     * in-interceptors of the endpoint, service, binding, bus and (if it provides
+     * any) data binding, minus those filtered out by addInterceptors.
+     *
+     * @param m the message to redeliver; its exchange supplies endpoint and bus
+     * @param phase the name of the first phase to keep
+     * @return the assembled chain
+     */
+    private static InterceptorChain getRedeliveryInterceptorChain(Message m, 
String phase) {
+        Exchange exchange = m.getExchange();
+        Endpoint ep = exchange.getEndpoint();
+        Bus bus = exchange.getBus();
+                
+        PhaseManager pm = bus.getExtension(PhaseManager.class);
+        // work on a copy so the phases can be trimmed without touching the manager's set
+        SortedSet<Phase> phases = new TreeSet<Phase>(pm.getInPhases());
+        // drop every phase that sorts before the restart phase; if no phase has that
+        // name, all phases are removed
+        for (Iterator<Phase> it = phases.iterator(); it.hasNext();) {
+            Phase p = it.next();
+            if (phase.equals(p.getName())) {
+                break;
+            }
+            it.remove();
+        }
+        PhaseInterceptorChain chain = new PhaseInterceptorChain(phases);
+        List<Interceptor<? extends Message>> il = ep.getInInterceptors();
+        addInterceptors(chain, il);
+        il = ep.getService().getInInterceptors();
+        addInterceptors(chain, il);
+        il = ep.getBinding().getInInterceptors();
+        addInterceptors(chain, il);
+        il = bus.getInInterceptors();
+        addInterceptors(chain, il);
+        // a data binding contributes interceptors only if it is an InterceptorProvider
+        if (ep.getService().getDataBinding() instanceof InterceptorProvider) {
+            il = 
((InterceptorProvider)ep.getService().getDataBinding()).getInInterceptors();
+            addInterceptors(chain, il);
+        }
+        
+        return chain;
+    }
+
+    /**
+     * Adds the given interceptors to the chain, leaving out those whose simple class
+     * name is OneWayProcessorInterceptor, MAPAggregatorImpl or RMInInterceptor.
+     *
+     * @param chain the chain to add to
+     * @param il the interceptors to consider
+     */
+    private static void addInterceptors(PhaseInterceptorChain chain,
+                                        List<Interceptor<? extends Message>> il) {
+        for (Interceptor<? extends Message> interceptor : il) {
+            final String simpleName = interceptor.getClass().getSimpleName();
+            final boolean excluded = "OneWayProcessorInterceptor".equals(simpleName)
+                || "MAPAggregatorImpl".equals(simpleName)
+                || "RMInInterceptor".equals(simpleName);
+            if (!excluded) {
+                chain.add(interceptor);
+            }
+        }
+    }
+
+    //TODO refactor this class to unify its functionality with that of ResendCandidate
+    protected class RedeliverCandidate implements Runnable, RetryStatus {
+        // The message to redeliver; redeliver() replaces it with the message created
+        // by the binding.
+        private Message message;
+        // Message number read from the RM properties when the candidate is created.
+        private long number;
+        // Time of the next scheduled attempt; null once the candidate is resolved.
+        private Date next;
+        // Timer task most recently created by schedule().
+        private TimerTask nextTask;
+        // Incremented by every call to attempted().
+        private int retries;
+        // Taken from the retry policy; 0 when no policy is configured.
+        private int maxRetries;
+        // Delay added to "next" by attempted(), then multiplied by "backoff".
+        private long nextInterval;
+        // Multiplier applied to nextInterval after each attempt.
+        private long backoff;
+        // Set by initiate(), cleared by resolved(), suspend() and attempted().
+        private boolean pending;
+        // Set by suspend() and cleared by resume().
+        private boolean suspended;
+
+        /**
+         * Creates a candidate for the message, derives the retry settings from the
+         * destination retry policy (if any) and schedules the first attempt when the
+         * manager has a timer and the maximum number of retries is not 0.
+         *
+         * @param m the message to redeliver; the headers of a SoapMessage are cleared
+         */
+        protected RedeliverCandidate(Message m) {
+            message = m;
+            if (message instanceof SoapMessage) {
+                // remove old message headers like WSS headers
+                ((SoapMessage)message).getHeaders().clear();
+            }
+            RetryPolicyType rmrp = null != manager.getDestinationPolicy() 
+                ? manager.getDestinationPolicy().getRetryPolicy() : null; 
+            // start from the default interval; a positive policy interval overrides it
+            long baseRedeliveryInterval = 
Long.parseLong(DEFAULT_BASE_REDELIVERY_INTERVAL);
+            if (null != rmrp && rmrp.getInterval() > 0L) {
+                baseRedeliveryInterval = rmrp.getInterval();
+            }
+            backoff = RedeliveryQueue.DEFAULT_EXPONENTIAL_BACKOFF;
+            // first attempt one base interval from now, later ones after growing intervals
+            next = new Date(System.currentTimeMillis() + 
baseRedeliveryInterval);
+            nextInterval = baseRedeliveryInterval * backoff;
+            maxRetries = null != rmrp ? rmrp.getMaxRetries() : 0;
+
+            RMProperties rmprops = 
RMContextUtils.retrieveRMProperties(message, false);
+            if (null != rmprops) {
+                number = rmprops.getSequence().getMessageNumber();
+            }
+
+            // without a timer, or with maxRetries of 0, no attempt is scheduled here
+            if (null != manager.getTimer() && maxRetries != 0) {
+                schedule();
+            }
+
+        }
+
+        /**
+         * Initiates redelivery asynchronously: marks the candidate as pending and hands
+         * it to the endpoint executor or, if the endpoint has none, to the service
+         * executor. A rejected execution is logged and otherwise ignored.
+         */
+        protected void initiate() {
+            pending = true;
+            final Endpoint endpoint = message.getExchange().get(Endpoint.class);
+            Executor executor = endpoint.getExecutor();
+            final String logText;
+            if (executor != null) {
+                logText = "Using endpoint executor {0}";
+            } else {
+                executor = endpoint.getService().getExecutor();
+                logText = "Using service executor {0}";
+            }
+            LOG.log(Level.FINE, logText, executor.getClass().getName());
+
+            try {
+                executor.execute(this);
+            } catch (RejectedExecutionException ex) {
+                LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex);
+            }
+        }
+
+        /**
+         * Performs one redelivery attempt if the candidate is pending. On success the
+         * candidate is purged from the queue and resolved; a failure is logged. In
+         * every case the attempt is recorded through attempted().
+         */
+        public void run() {
+            try {
+                if (!isPending()) {
+                    // nothing to do; the finally block still records the attempt
+                    return;
+                }
+                redeliver();
+                purgeDelivered(this);
+                resolved();
+            } catch (Exception ex) {
+                LOG.log(Level.WARNING, "redelivery failed", ex);
+            } finally {
+                attempted();
+            }
+        }
+        
+        
+        /**
+         * Replays the saved message content through a redelivery interceptor chain
+         * that starts at Phase.POST_STREAM and has RMCaptureInInterceptor removed.
+         *
+         * @throws Exception the exception found in the message content after the chain
+         *                   has run, or any exception raised while preparing the replay
+         */
+        private void redeliver() throws Exception {
+            LOG.log(Level.INFO, "Redelivering ... for " + (1 + retries));
+            String restartingPhase;
+            // a message that carries an exception is first reset: the exception is
+            // dropped and stream resources and the Node content are discarded
+            if (message.getContent(Exception.class) != null) {
+                message.removeContent(Exception.class);
+                message.getExchange().put(Exception.class, null);
+                
+                // clean-up message for redelivery
+                closeStreamResources();
+                message.removeContent(Node.class);
+            }
+
+
+            // feed the content saved under SAVED_CONTENT back in as the input stream
+            // NOTE(review): throws NullPointerException if no SAVED_CONTENT is present
+            InputStream is = null;
+            CachedOutputStream cos = 
(CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            is = cos.getInputStream();
+            message.setContent(InputStream.class, is);
+            // from here on the candidate refers to the message created by the binding
+            message = 
message.getExchange().getEndpoint().getBinding().createMessage(message);
+            restartingPhase = Phase.POST_STREAM;
+            // skip some interceptor chain phases for redelivery
+            InterceptorChain chain = getRedeliveryInterceptorChain(message, 
restartingPhase);
+            // take RMCaptureInInterceptor out of the chain (matched by class name)
+            ListIterator<Interceptor<? extends Message>> iterator = 
chain.getIterator();
+            while (iterator.hasNext()) {
+                Interceptor<? extends Message> incept = iterator.next();
+                if 
(incept.getClass().getName().equals(RMCaptureInInterceptor.class.getName())) {
+                    chain.remove(incept);
+                }
+            }
+            message.getExchange().setInMessage(message);
+            message.setInterceptorChain(chain);
+            chain.doIntercept(message);
+            // an exception left in the message content is rethrown to the caller
+            Exception ex = message.getContent(Exception.class);
+            if (null != ex) {
+                throw ex;
+            }
+        }
+        
+        /**
+         * @return the message number of the message held by this candidate
+         */
+        public long getNumber() {
+            return this.number;
+        }
+
+        /**
+         * @return the time of the next attempt, or null once the candidate is resolved
+         */
+        public Date getNext() {
+            return this.next;
+        }
+
+        /**
+         * Computes the time of the previous attempt from the next scheduled time and
+         * the current interval.
+         *
+         * @return the time of the previous attempt, or null if no attempt has been made
+         *         yet or no further attempt is scheduled (the candidate is resolved)
+         */
+        public Date getPrevious() {
+            // resolved() sets next to null while attempted() still increments retries,
+            // so next must be checked as well; read it once to use a consistent value
+            final Date scheduled = next;
+            if (retries > 0 && null != scheduled) {
+                return new Date(scheduled.getTime() - nextInterval / backoff);
+            }
+            return null;
+        }
+
+        /**
+         * @return the number of attempts recorded so far
+         */
+        public int getRetries() {
+            return this.retries;
+        }
+
+        /**
+         * @return the maximum number of retries taken from the retry policy
+         */
+        public int getMaxRetries() {
+            return this.maxRetries;
+        }
+
+        /**
+         * @return the interval that will be added to the next attempt time
+         */
+        public long getNextInterval() {
+            return this.nextInterval;
+        }
+
+        /**
+         * @return the factor by which the interval grows after each attempt
+         */
+        public long getBackoff() {
+            return this.backoff;
+        }
+
+        /**
+         * @return true while a redelivery has been initiated but not yet completed
+         */
+        public boolean isPending() {
+            return this.pending;
+        }
+
+        /**
+         * @return true if the candidate has been suspended and not resumed since
+         */
+        public boolean isSuspended() {
+            return this.suspended;
+        }
+        
+        /**
+         * The message has been delivered to the application: clears the pending flag
+         * and the next attempt time, and cancels the scheduled timer task, if any.
+         */
+        protected synchronized void resolved() {
+            pending = false;
+            next = null;
+            final TimerTask scheduled = nextTask;
+            if (scheduled != null) {
+                scheduled.cancel();
+            }
+        }
+
+        /**
+         * Cancels further redelivery (although not successfully delivered). Only a
+         * candidate with a timer task is affected: the task is cancelled and the
+         * stream resources and saved content of the message are released.
+         */
+        protected void cancel() {
+            if (nextTask == null) {
+                return;
+            }
+            nextTask.cancel();
+            closeStreamResources();
+            releaseSavedMessage();
+        }
+        
+        /**
+         * Suspends this candidate: it is flagged as suspended and no longer pending,
+         * and its timer task, if any, is cancelled. The message itself is kept.
+         */
+        protected void suspend() {
+            suspended = true;
+            pending = false;
+            //TODO release the message and later reload it upon resume
+            final TimerTask scheduled = nextTask;
+            if (scheduled != null) {
+                scheduled.cancel();
+            }
+        }
+        
+        /**
+         * Resumes this candidate: the next attempt time is reset to the current time
+         * and attempted() is invoked, which records an attempt and schedules the next one.
+         */
+        protected void resume() {
+            suspended = false;
+            final long now = System.currentTimeMillis();
+            next = new Date(now);
+            attempted();
+        }
+        
+        /**
+         * Removes the saved content from the message and closes it, then closes the
+         * attachments closeable, if the message has one. I/O errors are ignored.
+         */
+        private void releaseSavedMessage() {
+            final CachedOutputStream savedContent =
+                (CachedOutputStream)message.remove(RMMessageConstants.SAVED_CONTENT);
+            if (savedContent != null) {
+                savedContent.releaseTempFileHold();
+                closeQuietly(savedContent);
+            }
+            // Any unclosed resources must be closed to release the temp files.
+            final Closeable attachments =
+                (Closeable)message.get(RMMessageConstants.ATTACHMENTS_CLOSEABLE);
+            if (attachments != null) {
+                closeQuietly(attachments);
+            }
+        }
+
+        /**
+         * Closes the resource, ignoring any IOException raised by close().
+         */
+        private void closeQuietly(Closeable resource) {
+            try {
+                resource.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+        
+        /*
+         * Close all stream-like resources stored in the message and remove them from
+         * its contents: the InputStream, the XMLStreamReader and a single-element List
+         * content. Exceptions raised while closing are ignored.
+         */
+        private void closeStreamResources() {
+            InputStream oin = message.getContent(InputStream.class);
+            if (oin != null) {
+                try {
+                    oin.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+                message.removeContent(InputStream.class);
+            }
+            XMLStreamReader oreader = 
message.getContent(XMLStreamReader.class);
+            if (oreader != null) {
+                try {
+                    oreader.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+                message.removeContent(XMLStreamReader.class);
+            }
+            // only a List content with exactly one element is inspected and removed
+            List olist = message.getContent(List.class);
+            if (olist != null && olist.size() == 1) {
+                Object o = olist.get(0);
+                if (o instanceof XMLStreamReader) {
+                    oreader = (XMLStreamReader)o;
+                } else if (o instanceof StaxSource) {
+                    oreader = ((StaxSource)o).getXMLStreamReader();    
+                }
+                
+                // NOTE(review): if the element is neither kind, oreader still refers to
+                // the reader closed above (if any) and is closed a second time here
+                if (oreader != null) {
+                    try {
+                        oreader.close();
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                }
+                message.removeContent(List.class);
+            }
+        }
+
+        /**
+         * @return the message currently associated with this candidate
+         */
+        protected Message getMessage() {
+            return this.message;
+        }
+        
+        /**
+         * A redelivery has been attempted. Records the attempt and, unless the
+         * candidate is resolved or the maximum number of retries is reached,
+         * schedules the next attempt after the current interval.
+         */
+        protected synchronized void attempted() {
+            pending = false;
+            retries++;
+            if (next == null || retries == maxRetries) {
+                return;
+            }
+            next = new Date(next.getTime() + nextInterval);
+            nextInterval *= backoff;
+            schedule();
+        }
+
+        /**
+         * Schedules a timer task for the next attempt time. When it fires, the task
+         * initiates the redelivery unless one is already pending. Nothing is scheduled
+         * if the manager has no timer; a timer that rejects the task is logged.
+         */
+        protected final synchronized void schedule() {
+            if (manager.getTimer() == null) {
+                return;
+            }
+            final RedeliverCandidate self = this;
+            nextTask = new TimerTask() {
+                @Override
+                public void run() {
+                    if (!self.isPending()) {
+                        self.initiate();
+                    }
+                }
+            };
+            try {
+                manager.getTimer().schedule(nextTask, next);
+            } catch (IllegalStateException ex) {
+                LOG.log(Level.WARNING, "SCHEDULE_RESEND_FAILED_MSG", ex);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
----------------------------------------------------------------------
diff --git 
a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd 
b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
index 99b60c4..e5425cd 100644
--- a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
+++ b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
@@ -214,6 +214,21 @@
                 </xs:documentation>
             </xs:annotation>      
         </xs:attribute> 
+        <xs:attribute name="interval" type="xs:unsignedLong" use="optional" 
default="3000">
+            <xs:annotation>
+                <xs:documentation>
+                    The time interval between each retry in milliseconds.
+                </xs:documentation>
+            </xs:annotation>      
+        </xs:attribute> 
+        <xs:attribute name="algorithm" type="xs:string" use="optional" 
default="ExponentialBackoff">
+            <xs:annotation>
+                <xs:documentation>
+                    The name of the interval adjustment algorithm.
+                    The default algorithm is ExponentialBackoff.
+                </xs:documentation>
+            </xs:annotation>      
+        </xs:attribute>
     </xs:complexType>
     
     <xs:complexType name="DeliveryAssuranceType">

http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb 
b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb
index 5bfd418..a049c3f 100644
--- a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb
+++ b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-policy.xjb
@@ -53,4 +53,13 @@
         </jaxb:property>
     </jaxb:bindings>
 
+    <jaxb:bindings schemaLocation="wsrm-manager-types.xsd"
+         
node="//xs:complexType[@name='RetryPolicyType']//xs:attribute[@name='interval']">
+        <jaxb:property>
+            <jaxb:baseType>
+                <jaxb:javaType name="java.lang.Long"/>
+            </jaxb:baseType>
+        </jaxb:property>
+    </jaxb:bindings>
+
 </jaxb:bindings>

http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
----------------------------------------------------------------------
diff --git 
a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java 
b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
index a189932..546f5f3 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
@@ -567,7 +567,7 @@ public class DestinationSequenceTest extends Assert {
         
         DestinationSequence seq = new DestinationSequence(id, ref, destination,
             ProtocolVariation.RM10WSA200408);
-        destination.removeSequence(seq);
+        destination.terminateSequence(seq);
         EasyMock.expectLastCall();
         
         Message message = setUpMessage("1");
@@ -601,7 +601,7 @@ public class DestinationSequenceTest extends Assert {
         long lastAppMessage = System.currentTimeMillis() - 30000L;
         EasyMock.expect(rme.getLastControlMessage()).andReturn(0L);
         
EasyMock.expect(rme.getLastApplicationMessage()).andReturn(lastAppMessage);
-        destination.removeSequence(seq);
+        destination.terminateSequence(seq);
         EasyMock.expectLastCall();
         control.replay();
         st.run();

http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java 
b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
index 7600cd5..acd179c 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
@@ -292,6 +292,10 @@ public class RMEndpointTest extends Assert {
         
EasyMock.expect(manager.getRetransmissionQueue()).andReturn(queue).anyTimes();
         queue.stop(ss);
         EasyMock.expectLastCall().anyTimes();
+        RedeliveryQueue dqueue = control.createMock(RedeliveryQueue.class);
+        
EasyMock.expect(manager.getRedeliveryQueue()).andReturn(dqueue).anyTimes();
+        dqueue.stop(ds);
+        EasyMock.expectLastCall().anyTimes();
         control.replay();
         rme.getDestination().addSequence(ds, false);
         rme.getSource().addSequence(ss, false);

http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
----------------------------------------------------------------------
diff --git 
a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java 
b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
index 2b5c47c..0796a46 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
@@ -40,6 +40,8 @@ import org.apache.cxf.ws.addressing.JAXWSAConstants;
 import org.apache.cxf.ws.addressing.MAPAggregator;
 import org.apache.cxf.ws.addressing.VersionTransformer.Names200408;
 import org.apache.cxf.ws.policy.AssertionInfoMap;
+import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
+import org.apache.cxf.ws.rm.manager.RetryPolicyType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
@@ -367,6 +369,29 @@ public class RMInInterceptorTest extends Assert {
         } catch (Exception e) {
             fail("unexpected exception thrown from handleFault: " + e);
         }
+
+        control.reset();
+        org.apache.cxf.transport.Destination td = control.createMock(org.apache.cxf.transport.Destination.class);
+        EasyMock.expect(exchange.getDestination()).andReturn(td).anyTimes();
+        EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
+        EasyMock.expect(message.get(RMMessageConstants.RM_PROTOCOL_VARIATION))
+            .andReturn(ProtocolVariation.RM10WSA200408).anyTimes();
+        EasyMock.expect(message.getContent(Exception.class)).andReturn(new SequenceFault("no sequence")).anyTimes();
+        DestinationPolicyType dp = new DestinationPolicyType();
+        RetryPolicyType rp = new RetryPolicyType();
+        dp.setRetryPolicy(rp);
+        EasyMock.expect(manager.getDestinationPolicy()).andReturn(dp).anyTimes();
+        RedeliveryQueue rq = control.createMock(RedeliveryQueue.class);
+        EasyMock.expect(manager.getRedeliveryQueue()).andReturn(rq).anyTimes();
+        rq.addUndelivered(message);
+        EasyMock.expectLastCall().andThrow(new RuntimeException("shouldn't be queued")).anyTimes();
+        control.replay();
+
+        try {
+            interceptor.handleFault(message);
+        } catch (Exception e) {
+            fail("unexpected exception thrown from handleFault: " + e);
+        }
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
index 37fb6ac..5231bd4 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
@@ -519,7 +519,7 @@ public class RMManagerTest extends Assert {
         InterfaceInfo ii = control.createMock(InterfaceInfo.class);
         setUpEndpointForRecovery(endpoint, ei, si, bi, ii);          
         Conduit conduit = control.createMock(Conduit.class);        
-        setUpRecoverReliableEndpoint(endpoint, conduit, null, null, null, null);
+        setUpRecoverReliableEndpoint(endpoint, conduit, null, null, null);
         control.replay();
         manager.recoverReliableEndpoint(endpoint, conduit);
         control.verify();
@@ -528,7 +528,7 @@ public class RMManagerTest extends Assert {
         setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
         SourceSequence ss = control.createMock(SourceSequence.class);
         DestinationSequence ds = control.createMock(DestinationSequence.class);
-        setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, null, null);
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, null);
         control.replay();
         manager.recoverReliableEndpoint(endpoint, conduit);
         control.verify();
@@ -536,16 +536,10 @@ public class RMManagerTest extends Assert {
         control.reset();
         setUpEndpointForRecovery(endpoint, ei, si, bi, ii);  
         RMMessage m = control.createMock(RMMessage.class);
-        Capture<Message> mc = Capture.newInstance();
-        setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, m, mc);        
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, m);        
         control.replay();
         manager.recoverReliableEndpoint(endpoint, conduit);
         control.verify();
-        
-        Message msg = mc.getValue();
-        assertNotNull(msg);
-        assertNotNull(msg.getExchange());
-        assertSame(msg, msg.getExchange().getOutMessage());
     }
  
     @Test
@@ -595,9 +589,11 @@ public class RMManagerTest extends Assert {
                                       DestinationSequence ds, RMMessage m,
                                       Capture<Message> mc) throws IOException {
         RMStore store = control.createMock(RMStore.class);
-        RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+        RetransmissionQueue oqueue = control.createMock(RetransmissionQueue.class);
+        RedeliveryQueue iqueue = control.createMock(RedeliveryQueue.class);
         manager.setStore(store);
-        manager.setRetransmissionQueue(queue);
+        manager.setRetransmissionQueue(oqueue);
+        manager.setRedeliveryQueue(iqueue);
         
         Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
         if (null != ss) {
@@ -652,10 +648,11 @@ public class RMManagerTest extends Assert {
             return;
         }
 
-        queue.addUnacknowledged(EasyMock.capture(mc));
-        
+        oqueue.addUnacknowledged(EasyMock.capture(mc));
         EasyMock.expectLastCall();
-        queue.start();
+        oqueue.start();
+        EasyMock.expectLastCall();
+        iqueue.start();
         EasyMock.expectLastCall();
     }
 
@@ -678,12 +675,14 @@ public class RMManagerTest extends Assert {
     void setUpRecoverReliableEndpoint(Endpoint endpoint,
                                       Conduit conduit, 
                                       SourceSequence ss, 
-                                      DestinationSequence ds, RMMessage m, Capture<Message> mc) 
+                                      DestinationSequence ds, RMMessage m) 
                                           throws IOException  {                
         RMStore store = control.createMock(RMStore.class);
-        RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+        RetransmissionQueue oqueue = control.createMock(RetransmissionQueue.class);
+        RedeliveryQueue iqueue = control.createMock(RedeliveryQueue.class);
         manager.setStore(store);
-        manager.setRetransmissionQueue(queue);
+        manager.setRetransmissionQueue(oqueue);
+        manager.setRedeliveryQueue(iqueue);
         
         Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
         if (null != ss) {
@@ -747,13 +746,11 @@ public class RMManagerTest extends Assert {
         is.close();
         EasyMock.expect(m.getContent()).andReturn(cos).anyTimes();
 
-        if (mc != null) {
-            queue.addUnacknowledged(EasyMock.capture(mc));
-        } else {
-            queue.addUnacknowledged(EasyMock.isA(Message.class));
-        }
+        oqueue.addUnacknowledged(EasyMock.isA(Message.class));
+        EasyMock.expectLastCall();
+        oqueue.start();
         EasyMock.expectLastCall();
-        queue.start();
+        iqueue.start();
         EasyMock.expectLastCall();
     }
     

http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
index a5f9723..87d8e6e 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreTestBase.java
@@ -68,6 +68,8 @@ public abstract class RMTxStoreTestBase extends Assert {
     private static SequenceAcknowledgement ack1;
     private static SequenceAcknowledgement ack2;
     
+    private static final long TIME = System.currentTimeMillis();
+    
     protected IMocksControl control;
     
     public static void setUpOnce() {
@@ -867,6 +869,7 @@ public abstract class RMTxStoreTestBase extends Assert {
         EasyMock.expect(msg.getTo()).andReturn(to).anyTimes();
 
         EasyMock.expect(msg.getContentType()).andReturn("text/xml").anyTimes();
+        EasyMock.expect(msg.getCreatedTime()).andReturn(TIME);
         byte[] value = ("Message " + mn.longValue()).getBytes();
         ByteArrayInputStream bais = new ByteArrayInputStream(value);
         CachedOutputStream cos = new CachedOutputStream();
@@ -936,6 +939,7 @@ public abstract class RMTxStoreTestBase extends Assert {
             } else {
                 assertNull(msg.getTo());
             }
+            assertEquals(TIME, msg.getCreatedTime());
             try {
                 InputStream actual = msg.getContent().getInputStream();
                 assertEquals(new String("Message " + mn), IOUtils.readStringFromStream(actual));

http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RedeliveryTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RedeliveryTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RedeliveryTest.java
new file mode 100644
index 0000000..91ada49
--- /dev/null
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/RedeliveryTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Logger;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.ws.rm.RMConfiguration;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.manager.RetryPolicyType;
+import org.apache.cxf.ws.rm.persistence.jdbc.RMTxStore;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the redelivery of the message upon a delivery error.
+ */
+public class RedeliveryTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = allocatePort(Server.class);
+    private static final Logger LOG = LogUtils.getLogger(RedeliveryTest.class);
+
+    private static GreeterRecorderImpl serverGreeter;
+    private static Bus serverBus;
+    private Greeter greeter;
+
+
+    public static class Server extends AbstractBusTestServerBase {
+        String port;
+        String pfx;
+        Endpoint ep;
+
+        public Server(String args[]) {
+            port = args[0];
+            pfx = args[1];
+        }
+
+        protected void run() {
+            SpringBusFactory bf = new SpringBusFactory();
+            // use a at-most-once server with sync ack processing
+            System.setProperty("db.name", pfx + "-server");
+            serverBus = bf.createBus("/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml");
+            System.clearProperty("db.name");
+            BusFactory.setDefaultBus(serverBus);
+            RMManager manager = serverBus.getExtension(RMManager.class);
+            RMConfiguration cfg = manager.getConfiguration();
+            cfg.setAcknowledgementInterval(0L);
+
+            RetryPolicyType rp = new RetryPolicyType();
+            rp.setMaxRetries(-1);
+            serverBus.getExtension(RMManager.class).getDestinationPolicy().setRetryPolicy(rp);
+            serverGreeter = new GreeterRecorderImpl();
+            String address = "http://localhost:" + port + "/SoapContext/GreeterPort";
+
+            // publish this robust oneway endpoint
+            ep = Endpoint.create(serverGreeter);
+            ep.publish(address);
+            LOG.info("Published greeter endpoint.");
+            BusFactory.setDefaultBus(null);
+            BusFactory.setThreadDefaultBus(null);
+        }
+        public void tearDown() {
+            ep.stop();
+            ep = null;
+        }
+
+        public static void main(String[] args) {
+            try {
+                Server s = new Server(args);
+                s.start();
+            } catch (Exception ex) {
+                ex.printStackTrace();
+                System.exit(-1);
+            } finally {
+                System.out.println("done!");
+            }
+        }
+    }
+
+    @BeforeClass
+    public static void startServers() throws Exception {
+        RMTxStore.deleteDatabaseFiles("redlv-server", true);
+        assertTrue("server did not launch correctly",
+                   launchServer(Server.class, null, new String[]{PORT, "redlv"}, true));
+    }
+
+    @AfterClass
+    public static void cleanUpDerby() throws Exception {
+        RMTxStore.deleteDatabaseFiles("redlv-server", true);
+    }
+
+    @Test
+    public void testAutomaticRedeliveryAfterError() throws Exception {
+        LOG.fine("Creating greeter client");
+        SpringBusFactory bf = new SpringBusFactory();
+        bus = bf.createBus("/org/apache/cxf/systest/ws/rm/rminterceptors.xml");
+        // set the client retry interval much shorter than the slow processing delay
+        RMManager manager = bus.getExtension(RMManager.class);
+        RMConfiguration cfg = manager.getConfiguration();
+        cfg.setBaseRetransmissionInterval(3000L);
+
+        BusFactory.setDefaultBus(bus);
+        GreeterService gs = new GreeterService();
+        greeter = gs.getGreeterPort();
+        updateAddressPort(greeter, PORT);
+
+        assertNull("last greeted by none", serverGreeter.getValue());
+
+        LOG.fine("Invoking greeter for one");
+        greeter.greetMeOneWay("one");
+        LOG.fine("Wait for 4 secs ...");
+        Thread.sleep(4000);
+
+        assertEquals("last greeted by one", "one", serverGreeter.getValue());
+        assertTrue("retransmission running", manager.getRetransmissionQueue().isEmpty());
+
+        LOG.fine("Activating the error trigger and invoking greeter for two");
+        serverGreeter.setThrowAlways(true);
+        greeter.greetMeOneWay("two");
+        LOG.fine("Wait for 4 secs ...");
+        Thread.sleep(4000);
+
+        RMManager serverManager = serverBus.getExtension(RMManager.class);
+
+        assertEquals("last greeted by one", "one", serverGreeter.getValue());
+        assertTrue("retransmission running", manager.getRetransmissionQueue().isEmpty());
+        assertFalse("redelivery not running", serverManager.getRedeliveryQueue().isEmpty());
+
+        LOG.fine("Deactivating the error trigger and wait for 9 secs ...");
+        serverGreeter.setThrowAlways(false);
+        Thread.sleep(9000);
+
+        assertEquals("last greeted by two", "two", serverGreeter.getValue());
+        assertTrue("redelivery running", serverManager.getRedeliveryQueue().isEmpty());
+    }
+
+    private static class GreeterRecorderImpl extends GreeterImpl {
+        private String value;
+        private boolean ex;
+
+        public void greetMeOneWay(String arg0) {
+            if (ex) {
+                throw new RuntimeException("intentional exception");
+            }
+            super.greetMeOneWay(arg0);
+            value = arg0;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public void setThrowAlways(boolean b) {
+            super.setThrowAlways(b);
+            ex = b;
+        }
+    }
+}

Reply via email to