Author: mreutegg
Date: Mon Apr 18 09:26:06 2016
New Revision: 1739717

URL: http://svn.apache.org/viewvc?rev=1739717&view=rev
Log:
OAK-3488: LastRevRecovery for self async?

Added:
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java
    
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java
    
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
 Mon Apr 18 09:26:06 2016
@@ -477,6 +477,7 @@ public class ClusterNodeInfo {
         int clusterNodeId = 0;
         int maxId = 0;
         ClusterNodeState state = ClusterNodeState.NONE;
+        RecoverLockState lockState = RecoverLockState.NONE;
         Long prevLeaseEnd = null;
         boolean newEntry = false;
 
@@ -553,6 +554,7 @@ public class ClusterNodeInfo {
                 clusterNodeId = id;
                 state = ClusterNodeState.fromString((String) doc.get(STATE));
                 prevLeaseEnd = leaseEnd;
+                lockState = RecoverLockState.fromString((String) 
doc.get(REV_RECOVERY_LOCK));
             }
         }
 
@@ -574,7 +576,7 @@ public class ClusterNodeInfo {
         // Do not expire entries and stick on the earlier state, and leaseEnd 
so,
         // that _lastRev recovery if needed is done.
         return new ClusterNodeInfo(clusterNodeId, store, machineId, 
instanceId, state,
-                RecoverLockState.NONE, prevLeaseEnd, newEntry);
+                lockState, prevLeaseEnd, newEntry);
     }
 
     private static boolean waitForLeaseExpiry(DocumentStore store, 
ClusterNodeInfoDocument cdoc, long leaseEnd, String machineId,

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
 Mon Apr 18 09:26:06 2016
@@ -18,6 +18,8 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.List;
 
+import javax.annotation.CheckForNull;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState;
 import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState;
@@ -71,10 +73,35 @@ public class ClusterNodeInfoDocument ext
         return getState() == ClusterNodeState.ACTIVE;
     }
 
+    /**
+     * @return {@code true} if the recovery lock state is
+     *          {@link RecoverLockState#ACQUIRED ACQUIRED}.
+     */
     public boolean isBeingRecovered(){
         return getRecoveryState() == RecoverLockState.ACQUIRED;
     }
 
+    /**
+     * Returns {@code true} if the cluster node represented by this document
+     * is currently being recovered by the given {@code clusterId}.
+     *
+     * @param clusterId the id of a cluster node.
+     * @return {@code true} if being recovered by the given id; {@code false}
+     *          otherwise.
+     */
+    public boolean isBeingRecoveredBy(int clusterId) {
+        return Long.valueOf(clusterId).equals(getRecoveryBy());
+    }
+
+    /**
+     * @return the id of the cluster node performing recovery or {@code null} 
if
+     *          currently not set.
+     */
+    @CheckForNull
+    public Long getRecoveryBy() {
+        return (Long) get(ClusterNodeInfo.REV_RECOVERY_BY);
+    }
+
     public int getClusterId() {
         return Integer.parseInt(getId());
     }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 Mon Apr 18 09:26:06 2016
@@ -980,9 +980,9 @@ public class DocumentMK {
         public MissingLastRevSeeker createMissingLastRevSeeker() {
             final DocumentStore store = getDocumentStore();
             if (store instanceof MongoDocumentStore) {
-                return new MongoMissingLastRevSeeker((MongoDocumentStore) 
store);
+                return new MongoMissingLastRevSeeker((MongoDocumentStore) 
store, getClock());
             } else {
-                return new MissingLastRevSeeker(store);
+                return new MissingLastRevSeeker(store, getClock());
             }
         }
 

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 Mon Apr 18 09:26:06 2016
@@ -26,6 +26,7 @@ import static com.google.common.collect.
 import static com.google.common.collect.Lists.reverse;
 import static java.util.Collections.singletonList;
 import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static 
org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF;
@@ -149,6 +150,13 @@ public final class DocumentNodeStore
             
Boolean.parseBoolean(System.getProperty("oak.fairBackgroundOperationLock", 
"true"));
 
     /**
+     * The timeout in milliseconds to wait for the recovery performed by
+     * another cluster node.
+     */
+    private long recoveryWaitTimeoutMS =
+            Long.getLong("oak.recoveryWaitTimeoutMS", 60000);
+
+    /**
      * The document store (might be used by multiple node stores).
      */
     protected final DocumentStore store;
@@ -559,9 +567,26 @@ public final class DocumentNodeStore
 
     /**
      * Recover _lastRev recovery if needed.
+     *
+     * @throws DocumentStoreException if recovery did not finish within
+     *          {@link #recoveryWaitTimeoutMS}.
      */
-    private void checkLastRevRecovery() {
-        lastRevRecoveryAgent.recover(clusterId);
+    private void checkLastRevRecovery() throws DocumentStoreException {
+        long timeout = clock.getTime() + recoveryWaitTimeoutMS;
+        int numRecovered = lastRevRecoveryAgent.recover(clusterId, timeout);
+        if (numRecovered == -1) {
+            ClusterNodeInfoDocument doc = store.find(CLUSTER_NODES, 
String.valueOf(clusterId));
+            String otherId = "n/a";
+            if (doc != null) {
+                otherId = 
String.valueOf(doc.get(ClusterNodeInfo.REV_RECOVERY_BY));
+            }
+            String msg = "This cluster node (" + clusterId + ") requires " +
+                    "_lastRev recovery which is currently performed by " +
+                    "another cluster node (" + otherId + "). Recovery is " +
+                    "still ongoing after " + recoveryWaitTimeoutMS + " ms. " +
+                    "Failing startup of this DocumentNodeStore now!";
+            throw new DocumentStoreException(msg);
+        }
     }
 
     public void dispose() {

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 Mon Apr 18 09:26:06 2016
@@ -738,7 +738,7 @@ public class DocumentNodeStoreService {
 
     private void registerLastRevRecoveryJob(final DocumentNodeStore nodeStore) 
{
         long leaseTime = 
toLong(context.getProperties().get(PROP_REV_RECOVERY_INTERVAL),
-                ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS);
+                ClusterNodeInfo.DEFAULT_LEASE_UPDATE_INTERVAL_MILLIS);
         Runnable recoverJob = new Runnable() {
             @Override
             public void run() {

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
 Mon Apr 18 09:26:06 2016
@@ -19,6 +19,7 @@
 
 package org.apache.jackrabbit.oak.plugins.document;
 
+import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Maps.filterKeys;
 import static java.util.Collections.singletonList;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
@@ -26,20 +27,20 @@ import static org.apache.jackrabbit.oak.
 import static 
org.apache.jackrabbit.oak.plugins.document.util.Utils.PROPERTY_OR_DELETED;
 
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.util.MapFactory;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,16 +62,37 @@ public class LastRevRecoveryAgent {
     }
 
     public LastRevRecoveryAgent(DocumentNodeStore nodeStore) {
-        this(nodeStore, new 
MissingLastRevSeeker(nodeStore.getDocumentStore()));
+        this(nodeStore, new MissingLastRevSeeker(
+                nodeStore.getDocumentStore(), nodeStore.getClock()));
     }
 
     /**
-     * Recover the correct _lastRev updates for potentially missing candidate 
nodes.
+     * Recover the correct _lastRev updates for potentially missing candidate
+     * nodes. If another cluster node is already performing the recovery for 
the
+     * given {@code clusterId}, this method will wait until the given {@code waitUntil}
+     * time in milliseconds for the recovery to finish.
+     *
+     * This method will return:
+     * <ul>
+     *     <li>{@code -1} when another cluster node is busy performing recovery
+     *     for the given {@code clusterId} and the {@code waitUntil} time is
+     *     reached.</li>
+     *     <li>{@code 0} when no recovery was needed or this thread waited
+     *     for another cluster node to finish the recovery within the given
+     *     {@code waitUntil} time.</li>
+     *     <li>A positive value for the number of recovered documents when
+     *     recovery was performed by this thread / cluster node.</li>
+     * </ul>
      *
      * @param clusterId the cluster id for which the _lastRev are to be 
recovered
-     * @return the number of restored nodes
+     * @param waitUntil wait until this time in milliseconds for recovery of 
the
+     *                  given {@code clusterId} if another cluster node is
+     *                  already performing the recovery.
+     * @return the number of restored nodes or {@code -1} if a timeout occurred
+     *          while waiting for an ongoing recovery by another cluster node.
      */
-    public int recover(int clusterId) {
+    public int recover(int clusterId, long waitUntil)
+            throws DocumentStoreException {
         ClusterNodeInfoDocument nodeInfo = 
missingLastRevUtil.getClusterNodeInfo(clusterId);
 
         //TODO Currently leaseTime remains same per cluster node. If this
@@ -80,15 +102,15 @@ public class LastRevRecoveryAgent {
 
         if (nodeInfo != null) {
             // Check if _lastRev recovery needed for this cluster node
-            // state is Active && recoveryLock not held by someone
-            if (isRecoveryNeeded(nodeInfo)) {
+            // state is Active && current time past leaseEnd
+            if (missingLastRevUtil.isRecoveryNeeded(nodeInfo)) {
                 long leaseEnd = nodeInfo.getLeaseEndTime();
 
                 // retrieve the root document's _lastRev
                 NodeDocument root = missingLastRevUtil.getRoot();
                 Revision lastRev = root.getLastRev().get(clusterId);
 
-                // start time is the _lastRev timestamp of this cluster node
+                // start time is the _lastRev timestamp of the cluster node
                 final long startTime;
                 final String reason;
                 //lastRev can be null if other cluster node did not got
@@ -103,10 +125,7 @@ public class LastRevRecoveryAgent {
                             leaseTime, asyncDelay);
                 }
 
-                log.info("Recovering candidates modified after: [{}] for 
clusterId [{}] [{}]",
-                        Utils.timestampToString(startTime), clusterId, reason);
-
-                return recoverCandidates(clusterId, startTime);
+                return recoverCandidates(nodeInfo, startTime, waitUntil, 
reason);
             }
         }
 
@@ -115,6 +134,18 @@ public class LastRevRecoveryAgent {
     }
 
     /**
+     * Same as {@link #recover(int, long)}, but does not wait for ongoing
+     * recovery.
+     *
+     * @param clusterId the cluster id for which the _lastRev are to be 
recovered
+     * @return the number of restored nodes or {@code -1} if there is an 
ongoing
+     *          recovery by another cluster node.
+     */
+    public int recover(int clusterId) {
+        return recover(clusterId, 0);
+    }
+
+    /**
      * Recover the correct _lastRev updates for the given candidate nodes.
      *
      * @param suspects the potential suspects
@@ -277,40 +308,72 @@ public class LastRevRecoveryAgent {
      * Retrieves possible candidates which have been modified after the given
      * {@code startTime} and recovers the missing updates.
      *
-     * @param clusterId the cluster id
+     * @param nodeInfo the info of the cluster node to recover.
      * @param startTime the start time
-     * @return the number of restored nodes
+     * @param waitUntil wait at most until this time for an ongoing recovery
+     *                  done by another cluster node.
+     * @param info a string with additional information about how recovery is run.
+     * @return the number of restored nodes or {@code -1} if recovery is still
+     *      ongoing by another process even when {@code waitUntil} time was
+     *      reached.
      */
-    private int recoverCandidates(final int clusterId, final long startTime) {
+    private int recoverCandidates(final ClusterNodeInfoDocument nodeInfo,
+                                  final long startTime,
+                                  final long waitUntil,
+                                  final String info) {
+        ClusterNodeInfoDocument infoDoc = nodeInfo;
+        int clusterId = infoDoc.getClusterId();
+        for (;;) {
+            if (missingLastRevUtil.acquireRecoveryLock(
+                    clusterId, nodeStore.getClusterId())) {
+                break;
+            }
 
-        int myClusterId = nodeStore.getClusterId();
-        boolean lockAcquired = 
missingLastRevUtil.acquireRecoveryLock(clusterId, myClusterId);
+            Clock clock = nodeStore.getClock();
+            long remaining = waitUntil - clock.getTime();
+            if (remaining < 0) {
+                // no need to wait for lock release, waitUntil already reached
+                return -1;
+            }
 
-        //TODO What if recovery is being performed for current clusterNode by 
some other node
-        //should we halt the startup
-        if(!lockAcquired){
-            log.info("Last revision recovery already being performed by some 
other node. " +
-                    "Would not attempt recovery");
-            return 0;
+            log.info("Last revision recovery already being performed by " +
+                    "cluster node {}. Waiting at most until {} for recovery " +
+                    "to finish ({} seconds remaining).",
+                    infoDoc.getRecoveryBy(), 
Utils.timestampToString(waitUntil),
+                    remaining / 1000);
+            // check once every five seconds
+            long time = Math.min(waitUntil, clock.getTime() + 5000);
+            try {
+                clock.waitUntil(time);
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+                String msg = "Interrupted while waiting for _lastRev recovery 
to finish.";
+                throw new DocumentStoreException(msg, e);
+            }
+            infoDoc = missingLastRevUtil.getClusterNodeInfo(clusterId);
+            if (!missingLastRevUtil.isRecoveryNeeded(infoDoc)) {
+                // meanwhile another process finished recovery
+                return 0;
+            }
         }
 
-        Iterable<NodeDocument> suspects = 
missingLastRevUtil.getCandidates(startTime);
-
-        log.info("Performing Last Revision Recovery for clusterNodeId {}", 
clusterId);
-
+        // if we get here, the recovery lock was acquired successfully
+        boolean success = false;
         try {
-            return recover(suspects.iterator(), clusterId);
-        } finally {
-            Utils.closeIfCloseable(suspects);
+            log.info("Recovering candidates modified after: [{}] for clusterId 
[{}] [{}]",
+                    Utils.timestampToString(startTime), clusterId, info);
 
-            // Relinquish the lock on the recovery for the cluster on the
-            // clusterInfo
-            // TODO: in case recover throws a RuntimeException (or Error..) 
then
-            // the recovery might have failed, yet the instance is marked
-            // as 'recovered' (by setting the state to NONE).
-            // is this really fine here? or should we not retry - or at least
-            // log the throwable?
-            missingLastRevUtil.releaseRecoveryLock(clusterId);
+            Iterable<NodeDocument> suspects = 
missingLastRevUtil.getCandidates(startTime);
+            try {
+                log.info("Performing Last Revision Recovery for clusterNodeId 
{}", clusterId);
+                int num = recover(suspects.iterator(), clusterId);
+                success = true;
+                return num;
+            } finally {
+                Utils.closeIfCloseable(suspects);
+            }
+        } finally {
+            missingLastRevUtil.releaseRecoveryLock(clusterId, success);
 
             nodeStore.signalClusterStateChange();
         }
@@ -347,60 +410,50 @@ public class LastRevRecoveryAgent {
 
     /**
      * Determines if any of the cluster node failed to renew its lease and
-     * did not properly shutdown. If any such cluster node is found then are 
potential
-     * candidates for last rev recovery
+     * did not properly shut down. Any such cluster nodes found are
+     * potential candidates for last rev recovery. This method also returns
+     * true when there is a cluster node with an ongoing recovery.
      *
-     * @return true if last rev recovery needs to be performed for any of the 
cluster nodes
+     * @return true if last rev recovery needs to be performed for any of the
+     *          cluster nodes
      */
     public boolean isRecoveryNeeded(){
-        return 
missingLastRevUtil.isRecoveryNeeded(nodeStore.getClock().getTime());
+        return missingLastRevUtil.isRecoveryNeeded();
     }
 
     public void performRecoveryIfNeeded() {
         if (isRecoveryNeeded()) {
-            List<Integer> clusterIds = getRecoveryCandidateNodes();
-            log.info("ClusterNodeId [{}] starting Last Revision Recovery for 
clusterNodeId(s) {}", nodeStore.getClusterId(),
-                    clusterIds);
+            Iterable<Integer> clusterIds = getRecoveryCandidateNodes();
+            log.info("ClusterNodeId [{}] starting Last Revision Recovery for 
clusterNodeId(s) {}",
+                    nodeStore.getClusterId(), clusterIds);
             for (int clusterId : clusterIds) {
-                recover(clusterId);
+                if (recover(clusterId) == -1) {
+                    log.info("Last Revision Recovery for cluster node {} " +
+                            "ongoing by other cluster node.", clusterId);
+                }
             }
         }
     }
 
     /**
-     * Gets the _lastRev recovery candidate cluster nodes.
+     * Gets the _lastRev recovery candidate cluster nodes. This also includes
+     * cluster nodes that are currently being recovered.
      *
-     * @return the recovery candidate nodes
+     * @return the recovery candidate nodes.
      */
-    public List<Integer> getRecoveryCandidateNodes() {
-
-        Iterable<ClusterNodeInfoDocument> clusters = 
missingLastRevUtil.getAllClusters();
-        List<Integer> candidateClusterNodes = Lists.newArrayList();
-        List<String> beingRecoveredRightNow = Lists.newArrayList();
-
-        for (ClusterNodeInfoDocument nodeInfo : clusters) {
-            String id = nodeInfo.getId();
-            if (nodeInfo.isBeingRecovered()) {
-                Long recoveredBy = (Long) 
nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_BY);
-                beingRecoveredRightNow.add(nodeInfo == null ? id : 
String.format("%s (by %d)", id, recoveredBy));
-            } else if (isRecoveryNeeded(nodeInfo)) {
-                candidateClusterNodes.add(Integer.valueOf(id));
+    public Iterable<Integer> getRecoveryCandidateNodes() {
+        return Iterables.transform(filter(missingLastRevUtil.getAllClusters(),
+                new Predicate<ClusterNodeInfoDocument>() {
+            @Override
+            public boolean apply(ClusterNodeInfoDocument input) {
+                return missingLastRevUtil.isRecoveryNeeded(input);
             }
-        }
-
-        if (!beingRecoveredRightNow.isEmpty()) {
-            log.info("Active cluster nodes already in the process of being 
recovered: " + beingRecoveredRightNow);
-        }
-
-        return candidateClusterNodes;
-    }
-
-    /**
-     * Check if _lastRev recovery needed for this cluster node state is Active
-     * && currentTime past the leaseEnd time && recoveryLock not held by 
someone
-     */
-    private boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument 
nodeInfo) {
-        return nodeInfo.isActive() && nodeStore.getClock().getTime() > 
nodeInfo.getLeaseEndTime() && !nodeInfo.isBeingRecovered();
+        }), new Function<ClusterNodeInfoDocument, Integer>() {
+            @Override
+            public Integer apply(ClusterNodeInfoDocument input) {
+                return input.getClusterId();
+            }
+        });
     }
 
     private static class ClusterPredicate implements Predicate<Revision> {

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
 Mon Apr 18 09:26:06 2016
@@ -19,14 +19,25 @@
 
 package org.apache.jackrabbit.oak.plugins.document;
 
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
 import 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
+import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
+import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.LEASE_END_KEY;
+import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_BY;
+import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_LOCK;
+import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState.ACQUIRED;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.STATE;
+import static 
org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
 import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS;
 import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs;
 import static 
org.apache.jackrabbit.oak.plugins.document.util.Utils.getSelectedDocuments;
@@ -42,8 +53,19 @@ public class MissingLastRevSeeker {
 
     private final DocumentStore store;
 
-    public MissingLastRevSeeker(DocumentStore store) {
+    protected final Clock clock;
+
+    private final Predicate<ClusterNodeInfoDocument> isRecoveryNeeded =
+            new Predicate<ClusterNodeInfoDocument>() {
+        @Override
+        public boolean apply(ClusterNodeInfoDocument nodeInfo) {
+            return isRecoveryNeeded(nodeInfo);
+        }
+    };
+
+    public MissingLastRevSeeker(DocumentStore store, Clock clock) {
         this.store = store;
+        this.clock = clock;
     }
 
     /**
@@ -51,6 +73,7 @@ public class MissingLastRevSeeker {
      *
      * @return the clusters
      */
+    @Nonnull
     public Iterable<ClusterNodeInfoDocument> getAllClusters() {
         return ClusterNodeInfoDocument.all(store);
     }
@@ -61,9 +84,10 @@ public class MissingLastRevSeeker {
      * @param clusterId the cluster id
      * @return the cluster node info
      */
+    @CheckForNull
     public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
         // Fetch all documents.
-        return store.find(Collection.CLUSTER_NODES, String.valueOf(clusterId));
+        return store.find(CLUSTER_NODES, String.valueOf(clusterId));
     }
 
     /**
@@ -73,6 +97,7 @@ public class MissingLastRevSeeker {
      * @param startTime the start time.
      * @return the candidates
      */
+    @Nonnull
     public Iterable<NodeDocument> getCandidates(final long startTime) {
         // Fetch all documents where lastmod >= startTime
         Iterable<NodeDocument> nodes = getSelectedDocuments(store,
@@ -87,40 +112,59 @@ public class MissingLastRevSeeker {
     }
 
     /**
-     * Acquire a recovery lock for the given cluster node info document
+     * Acquire a recovery lock for the given cluster node info document. This
+     * method may break a lock when it determines the cluster node holding the
+     * recovery lock is no longer active or its lease expired. It also returns
+     * {@code true} when {@code recoveredBy} already holds the lock. No lock is
+     * acquired when the cluster node entry does not exist or does not need
+     * recovery (see {@link #isRecoveryNeeded(ClusterNodeInfoDocument)}).
      * 
      * @param clusterId
      *            id of the cluster that is going to be recovered
      * @param recoveredBy
-     *            id of cluster doing the recovery ({@code 0} when unknown)
+     *            id of cluster doing the recovery
      * @return whether the lock has been acquired
      */
     public boolean acquireRecoveryLock(int clusterId, int recoveredBy) {
-        try {
-            UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
-            update.notEquals(ClusterNodeInfo.REV_RECOVERY_LOCK, 
RecoverLockState.ACQUIRED.name());
-            update.set(ClusterNodeInfo.REV_RECOVERY_LOCK, 
RecoverLockState.ACQUIRED.name());
-            if (recoveredBy != 0) {
-                update.set(ClusterNodeInfo.REV_RECOVERY_BY, recoveredBy);
-            }
-            ClusterNodeInfoDocument old = 
store.findAndUpdate(Collection.CLUSTER_NODES, update);
-            return old != null;
-        } catch (RuntimeException ex) {
-            LOG.error("Failed to acquire the recovery lock for clusterNodeId " 
+ clusterId, ex);
-            throw (ex);
+        // read the current cluster node entry to decide on the lock
+        ClusterNodeInfoDocument doc = getClusterNodeInfo(clusterId);
+        if (doc == null) {
+            // this is unexpected: there is no entry for clusterId
+            return false;
         }
+        // only an active cluster node with an expired lease is recovered
+        if (!isRecoveryNeeded(doc)) {
+            return false;
+        }
+        // succeeds when nobody holds the recovery lock yet
+        boolean acquired = tryAcquireRecoveryLock(doc, recoveredBy);
+        if (acquired) {
+            return true;
+        }
+        // either we already own the lock or were able to break the lock
+        return doc.isBeingRecoveredBy(recoveredBy)
+                || tryBreakRecoveryLock(doc, recoveredBy);
     }
 
-    public void releaseRecoveryLock(int clusterId) {
+    /**
+     * Releases the recovery lock on the given {@code clusterId}. If
+     * {@code success} is {@code true}, the state of the cluster node entry
+     * is reset, otherwise it is left as is. That is, for a cluster node which
+     * requires recovery and the recovery process failed, the state will still
+     * be active, when this release method is called with {@code success} set
+     * to {@code false}.
+     *
+     * @param clusterId the id of the cluster node that was recovered.
+     * @param success whether recovery was successful.
+     */
+    public void releaseRecoveryLock(int clusterId, boolean success) {
         try {
             UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
-            update.set(ClusterNodeInfo.REV_RECOVERY_LOCK, 
RecoverLockState.NONE.name());
-            update.set(ClusterNodeInfo.REV_RECOVERY_BY, null);
-            update.set(ClusterNodeInfo.STATE, null);
-            ClusterNodeInfoDocument old = 
store.findAndUpdate(Collection.CLUSTER_NODES, update);
+            update.set(REV_RECOVERY_LOCK, RecoverLockState.NONE.name());
+            update.set(REV_RECOVERY_BY, null);
+            if (success) {
+                update.set(STATE, null);
+            }
+            ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, 
update);
             if (old == null) {
                 throw new RuntimeException("ClusterNodeInfo document for " + 
clusterId + " missing.");
             }
+            LOG.info("Released recovery lock for cluster id {} (recovery 
successful: {})",
+                    clusterId, success);
         } catch (RuntimeException ex) {
             LOG.error("Failed to release the recovery lock for clusterNodeId " 
+ clusterId, ex);
             throw (ex);
@@ -131,16 +175,94 @@ public class MissingLastRevSeeker {
         return store.find(Collection.NODES, Utils.getIdFromPath(ROOT_PATH));
     }
 
-    public boolean isRecoveryNeeded(long currentTime) {
-        for(ClusterNodeInfoDocument nodeInfo : getAllClusters()){
-            // Check if _lastRev recovery needed for this cluster node
-            // state is Active && currentTime past the leaseEnd time && 
recoveryLock not held by someone
-            if (nodeInfo.isActive()
-                    && currentTime > nodeInfo.getLeaseEndTime()
-                    && !nodeInfo.isBeingRecovered()) {
-                return true;
+    /**
+     * Checks whether any cluster node requires _lastRev recovery.
+     *
+     * @return {@code true} if {@link #isRecoveryNeeded(ClusterNodeInfoDocument)}
+     *         holds for at least one cluster node entry.
+     */
+    public boolean isRecoveryNeeded() {
+        return Iterables.any(getAllClusters(), isRecoveryNeeded);
+    }
+
+    /**
+     * Checks whether _lastRev recovery is needed for the given cluster node.
+     * This is the case when its state is active and its lease end time lies
+     * before the current time of {@link #clock}. A recovery lock held on the
+     * entry does not change the result;
+     * {@link #acquireRecoveryLock(int, int)} decides whether such a lock can
+     * be taken over.
+     *
+     * @param nodeInfo the cluster node info document to check.
+     * @return whether recovery is needed for the cluster node.
+     */
+    public boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument nodeInfo) 
{
+        return nodeInfo.isActive() && clock.getTime() > 
nodeInfo.getLeaseEndTime();
+    }
+
+    //-------------------------< internal 
>-------------------------------------
+
+    /**
+     * Tries to acquire the recovery lock for the given cluster node info
+     * document. The update is conditional. It only succeeds if, at the time
+     * of the update, the entry is still active, its lease end time equals
+     * the one in {@code info`, and the recovery lock is not acquired.
+     *
+     * @param info
+     *            info document of the cluster that is going to be recovered
+     * @param recoveredBy
+     *            id of cluster doing the recovery ({@code 0} when unknown,
+     *            in which case no lock owner is recorded)
+     * @return whether the lock has been acquired
+     * @throws RuntimeException if the store update fails; the exception is
+     *            logged and re-thrown.
+     */
+    private boolean tryAcquireRecoveryLock(ClusterNodeInfoDocument info,
+                                           int recoveredBy) {
+        int clusterId = info.getClusterId();
+        try {
+            UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
+            // entry must still be active with the lease end time seen in 'info'
+            update.equals(STATE, ACTIVE.name());
+            update.equals(LEASE_END_KEY, info.getLeaseEndTime());
+            // ... and nobody must hold the lock already
+            update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
+            update.set(REV_RECOVERY_LOCK, ACQUIRED.name());
+            if (recoveredBy != 0) {
+                update.set(REV_RECOVERY_BY, recoveredBy);
+            }
+            // a null result means the lock was not acquired
+            ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, 
update);
+            if (old != null) {
+                LOG.info("Acquired recovery lock for cluster id {}", 
clusterId);
+            }
+            return old != null;
+        } catch (RuntimeException ex) {
+            LOG.error("Failed to acquire the recovery lock for clusterNodeId " 
+ clusterId, ex);
+            throw (ex);
+        }
+    }
+
+    /**
+     * Checks if the recovering cluster node is inactive and then tries to
+     * break the recovery lock. The lock owner is read from the recovery-by
+     * field of {@code doc`. The lock is only broken when that owner's entry
+     * is no longer active or its lease end time is not after the current
+     * time of {@link #clock}. Breaking the lock records {@code recoveredBy}
+     * as the new owner. The update is conditional on the entry still being
+     * active and locked by the same previous owner.
+     *
+     * @param doc the cluster node info document of the cluster node to acquire
+     *            the recovery lock for.
+     * @param recoveredBy id of cluster doing the recovery.
+     * @return whether the lock has been acquired.
+     * @throws RuntimeException if the store update fails; the exception is
+     *            logged and re-thrown.
+     */
+    private boolean tryBreakRecoveryLock(ClusterNodeInfoDocument doc,
+                                         int recoveredBy) {
+        Long recoveryBy = doc.getRecoveryBy();
+        if (recoveryBy == null) {
+            // cannot determine current lock owner
+            return false;
+        }
+        ClusterNodeInfoDocument recovering = 
getClusterNodeInfo(recoveryBy.intValue());
+        if (recovering == null) {
+            // cannot determine current lock owner
+            return false;
+        }
+        if (recovering.isActive() && recovering.getLeaseEndTime() > 
clock.getTime()) {
+            // still active, cannot break lock
+            return false;
+        }
+        // try to break the lock
+        try {
+            UpdateOp update = new 
UpdateOp(Integer.toString(doc.getClusterId()), false);
+            // only take over if the lock is still held by the same owner
+            // that was found to be inactive or expired above
+            update.equals(STATE, ACTIVE.name());
+            update.equals(REV_RECOVERY_LOCK, ACQUIRED.name());
+            update.equals(REV_RECOVERY_BY, recoveryBy);
+            update.set(REV_RECOVERY_BY, recoveredBy);
+            ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES, 
update);
+            if (old != null) {
+                LOG.info("Acquired (broke) recovery lock for cluster id {}. " +
+                        "Previous lock owner: {}", doc.getClusterId(), 
recoveryBy);
             }
+            return old != null;
+        } catch (RuntimeException ex) {
+            LOG.error("Failed to break the recovery lock for clusterNodeId " +
+                    doc.getClusterId(), ex);
+            throw (ex);
         }
-        return false;
     }
 }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
 Mon Apr 18 09:26:06 2016
@@ -19,9 +19,10 @@
 
 package org.apache.jackrabbit.oak.plugins.document.mongo;
 
+import javax.annotation.Nonnull;
+
 import static com.google.common.collect.Iterables.transform;
 import static com.mongodb.QueryBuilder.start;
-import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState;
 
 import com.google.common.base.Function;
 import com.mongodb.BasicDBObject;
@@ -36,6 +37,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.MissingLastRevSeeker;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
+import org.apache.jackrabbit.oak.stats.Clock;
 
 /**
  * Mongo specific version of MissingLastRevSeeker which uses mongo queries
@@ -46,12 +48,13 @@ import org.apache.jackrabbit.oak.plugins
 public class MongoMissingLastRevSeeker extends MissingLastRevSeeker {
     private final MongoDocumentStore store;
 
-    public MongoMissingLastRevSeeker(MongoDocumentStore store) {
-        super(store);
+    /**
+     * Creates a new MongoDB specific seeker.
+     *
+     * @param store the MongoDB document store to query.
+     * @param clock the clock used to check for expired leases.
+     */
+    public MongoMissingLastRevSeeker(MongoDocumentStore store, Clock clock) {
+        super(store, clock);
         this.store = store;
     }
 
     @Override
+    @Nonnull
     public CloseableIterable<NodeDocument> getCandidates(final long startTime) 
{
         DBObject query =
                 start(NodeDocument.MODIFIED_IN_SECS).greaterThanEquals(
@@ -72,11 +75,10 @@ public class MongoMissingLastRevSeeker e
     }
 
+    /**
+     * Uses a MongoDB query with the same criteria as
+     * {@code isRecoveryNeeded(ClusterNodeInfoDocument)}: an active cluster
+     * node entry with a lease end time before the current time of the clock.
+     */
     @Override
-    public boolean isRecoveryNeeded(long currentTime) {
+    public boolean isRecoveryNeeded() {
         QueryBuilder query =
                 
start(ClusterNodeInfo.STATE).is(ClusterNodeInfo.ClusterNodeState.ACTIVE.name())
-                .put(ClusterNodeInfo.LEASE_END_KEY).lessThan(currentTime)
-                
.put(ClusterNodeInfo.REV_RECOVERY_LOCK).notEquals(RecoverLockState.ACQUIRED.name());
+                .put(ClusterNodeInfo.LEASE_END_KEY).lessThan(clock.getTime());
 
         return getClusterNodeCollection().findOne(query.get()) != null;
     }

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java
 Mon Apr 18 09:26:06 2016
@@ -32,6 +32,7 @@ import junitx.util.PrivateAccessor;
 
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -175,10 +176,12 @@ public class DocumentDiscoveryLiteServic
                 logger.info("Going to waitBeforeUnlocking");
                 waitBeforeUnlocking.acquire();
                 logger.info("Done with waitBeforeUnlocking");
-                missingLastRevUtil.releaseRecoveryLock((Integer) 
invocation.getArguments()[0]);
+                missingLastRevUtil.releaseRecoveryLock(
+                        (Integer) invocation.getArguments()[0],
+                        (Boolean) invocation.getArguments()[1]);
                 return null;
             }
-        
}).when(mockedLongduringMissingLastRevUtil).releaseRecoveryLock(anyInt());
+        
}).when(mockedLongduringMissingLastRevUtil).releaseRecoveryLock(anyInt(), 
anyBoolean());
 
         // let go (or tschaedere loh)
         waitBeforeLocking.release();

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
 Mon Apr 18 09:26:06 2016
@@ -22,6 +22,7 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -151,11 +152,11 @@ public class LastRevRecoveryAgentTest {
 
         assertTrue(ds1.getLastRevRecoveryAgent().isRecoveryNeeded());
 
-        List<Integer> cids = 
ds1.getLastRevRecoveryAgent().getRecoveryCandidateNodes();
-        assertEquals(1, cids.size());
-        assertEquals(c2Id, cids.get(0).intValue());
+        Iterable<Integer> cids = 
ds1.getLastRevRecoveryAgent().getRecoveryCandidateNodes();
+        assertEquals(1, Iterables.size(cids));
+        assertEquals(c2Id, Iterables.get(cids, 0).intValue());
 
-        ds1.getLastRevRecoveryAgent().recover(cids.get(0));
+        ds1.getLastRevRecoveryAgent().recover(Iterables.get(cids, 0));
 
         assertEquals(zlastRev2, getDocument(ds1, 
"/x/y").getLastRev().get(c2Id));
         assertEquals(zlastRev2, getDocument(ds1, "/x").getLastRev().get(c2Id));

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
 Mon Apr 18 09:26:06 2016
@@ -19,9 +19,9 @@
 
 package org.apache.jackrabbit.oak.plugins.document;
 
-import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -40,8 +40,10 @@ import org.junit.Test;
 import static com.google.common.collect.Lists.newArrayList;
 import static 
org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class LastRevRecoveryTest {
     @Rule
@@ -59,6 +61,7 @@ public class LastRevRecoveryTest {
         clock = new Clock.Virtual();
         clock.waitUntil(System.currentTimeMillis());
         Revision.setClock(clock);
+        ClusterNodeInfo.setClock(clock);
         // disable lease check because we fiddle with the virtual clock
         final boolean leaseCheck = false;
         sharedStore = new MemoryDocumentStore();
@@ -85,6 +88,7 @@ public class LastRevRecoveryTest {
     public void tearDown() {
         ds1.dispose();
         ds2.dispose();
+        // restore the clocks replaced by the virtual clock in setUp()
+        ClusterNodeInfo.resetClockToDefault();
         Revision.resetClockToDefault();
     }
 
@@ -158,12 +162,130 @@ public class LastRevRecoveryTest {
 
         // run recovery on ds2
         LastRevRecoveryAgent agent = new LastRevRecoveryAgent(ds2);
-        List<Integer> clusterIds = agent.getRecoveryCandidateNodes();
-        assertTrue(clusterIds.contains(c1Id));
+        Iterable<Integer> clusterIds = agent.getRecoveryCandidateNodes();
+        assertTrue(Iterables.contains(clusterIds, c1Id));
         assertEquals("must not recover any documents",
                 0, agent.recover(c1Id));
     }
 
+    // OAK-3488
+    /**
+     * Recovery by an agent must give up (return -1) while another, still
+     * alive cluster node holds the recovery lock. It must succeed once the
+     * lock is released.
+     */
+    @Test
+    public void recoveryWithTimeout() throws Exception {
+        String clusterId = String.valueOf(c1Id);
+        ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES, clusterId);
+
+        NodeBuilder builder = ds1.getRoot().builder();
+        builder.child("x").child("y").child("z");
+        merge(ds1, builder);
+        ds1.dispose();
+
+        // reset clusterNodes entry to simulate a crash
+        sharedStore.remove(CLUSTER_NODES, clusterId);
+        sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(doc)));
+
+        // 'wait' until lease expires
+        clock.waitUntil(doc.getLeaseEndTime() + 1);
+        // make sure ds2 lease is still fine, otherwise the lock held by
+        // cluster node 2 could be broken and recovery would not return -1
+        ds2.getClusterInfo().renewLease();
+
+        // simulate ongoing recovery by cluster node 2
+        MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore, clock);
+        assertTrue(seeker.acquireRecoveryLock(c1Id, c2Id));
+
+        // run recovery from ds1
+        LastRevRecoveryAgent a1 = new LastRevRecoveryAgent(ds1);
+        // use current time -> do not wait for recovery of other agent
+        assertEquals(-1, a1.recover(c1Id, clock.getTime()));
+
+        seeker.releaseRecoveryLock(c1Id, true);
+
+        assertEquals(0, a1.recover(c1Id, clock.getTime() + 1000));
+    }
+
+    // OAK-3488
+    /**
+     * Restarting a cluster node must fail with a DocumentStoreException
+     * while another, still alive cluster node holds the recovery lock for it.
+     */
+    @Test
+    public void failStartupOnRecoveryTimeout() throws Exception {
+        String clusterId = String.valueOf(c1Id);
+        ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES, clusterId);
+
+        NodeBuilder builder = ds1.getRoot().builder();
+        builder.child("x").child("y").child("z");
+        merge(ds1, builder);
+        ds1.dispose();
+
+        // reset clusterNodes entry to simulate a crash
+        sharedStore.remove(CLUSTER_NODES, clusterId);
+        sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(doc)));
+
+        // 'wait' until lease expires
+        clock.waitUntil(doc.getLeaseEndTime() + 1);
+        // make sure ds2 lease is still fine
+        ds2.getClusterInfo().renewLease();
+
+        // simulate ongoing recovery by cluster node 2
+        MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore, clock);
+        assertTrue(seeker.acquireRecoveryLock(c1Id, c2Id));
+
+        // attempt to restart ds1 while lock is acquired. Use the builder
+        // provider (disposes stores after the test) and disable the lease
+        // check like the other stores in this test, because the virtual
+        // clock is moved around.
+        try {
+            ds1 = builderProvider.newBuilder()
+                    .clock(clock)
+                    .setLeaseCheck(false)
+                    .setDocumentStore(sharedStore)
+                    .setClusterId(c1Id)
+                    .getNodeStore();
+            fail("DocumentStoreException expected");
+        } catch (DocumentStoreException e) {
+            // expected
+        } finally {
+            // always release, so the shared store is left in a clean state
+            seeker.releaseRecoveryLock(c1Id, true);
+        }
+    }
+
+    // OAK-3488
+    /**
+     * A recovery lock held by a cluster node whose lease expired must not
+     * prevent the crashed cluster node from restarting and being recovered.
+     */
+    @Test
+    public void breakRecoveryLockWithExpiredLease() throws Exception {
+        String clusterId = String.valueOf(c1Id);
+        ClusterNodeInfoDocument info1 = sharedStore.find(CLUSTER_NODES, clusterId);
+
+        NodeBuilder builder = ds1.getRoot().builder();
+        builder.child("x").child("y").child("z");
+        merge(ds1, builder);
+        ds1.dispose();
+
+        // reset clusterNodes entry to simulate a crash of ds1
+        sharedStore.remove(CLUSTER_NODES, clusterId);
+        sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(info1)));
+
+        // 'wait' until lease expires
+        clock.waitUntil(info1.getLeaseEndTime() + 1);
+        // make sure ds2 lease is still fine
+        ds2.getClusterInfo().renewLease();
+
+        // start of recovery by ds2
+        MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore, clock);
+        assertTrue(seeker.acquireRecoveryLock(c1Id, c2Id));
+        // simulate crash of ds2
+        ClusterNodeInfoDocument info2 = sharedStore.find(CLUSTER_NODES, String.valueOf(c2Id));
+        ds2.dispose();
+        // reset clusterNodes entry
+        sharedStore.remove(CLUSTER_NODES, String.valueOf(c2Id));
+        sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(info2)));
+        // 'wait' until ds2's lease expires
+        clock.waitUntil(info2.getLeaseEndTime() + 1);
+
+        info1 = sharedStore.find(CLUSTER_NODES, clusterId);
+        assertTrue(seeker.isRecoveryNeeded(info1));
+        assertTrue(info1.isBeingRecovered());
+
+        // restart ds1 with its own cluster id (not a hard-coded 1). Afterwards
+        // neither recovery nor the stale lock of ds2 must remain.
+        ds1 = builderProvider.newBuilder()
+                .clock(clock)
+                .setLeaseCheck(false)
+                .setAsyncDelay(0)
+                .setDocumentStore(sharedStore)
+                .setClusterId(c1Id)
+                .getNodeStore();
+        info1 = sharedStore.find(CLUSTER_NODES, clusterId);
+        assertFalse(seeker.isRecoveryNeeded(info1));
+        assertFalse(info1.isBeingRecovered());
+    }
 
     private NodeDocument getDocument(DocumentNodeStore nodeStore, String path) 
{
         return nodeStore.getDocumentStore().find(Collection.NODES, 
Utils.getIdFromPath(path));
@@ -186,11 +308,13 @@ public class LastRevRecoveryTest {
                 }
             } else {
                 if (obj instanceof Boolean) {
-                    op.set(key, ((Boolean) obj).booleanValue());
+                    op.set(key, (Boolean) obj);
                 } else if (obj instanceof Number) {
                     op.set(key, ((Number) obj).longValue());
-                } else {
+                } else if (obj != null) {
                     op.set(key, obj.toString());
+                } else {
+                    op.set(key, null);
                 }
             }
         }

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
 Mon Apr 18 09:26:06 2016
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.apache.jackrabbit.oak.stats.Clock;
@@ -172,10 +173,10 @@ public class LastRevSingleNodeRecoveryTe
         clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 
1000);
 
         LastRevRecoveryAgent recoveryAgent = 
mk.getNodeStore().getLastRevRecoveryAgent();
-        List<Integer> cids = recoveryAgent.getRecoveryCandidateNodes();
+        Iterable<Integer> cids = recoveryAgent.getRecoveryCandidateNodes();
 
-        assertEquals(1, cids.size());
-        assertEquals(Integer.valueOf(1), cids.get(0));
+        assertEquals(1, Iterables.size(cids));
+        assertEquals(Integer.valueOf(1), Iterables.get(cids, 0));
     }
     
     private void setupScenario() throws InterruptedException {

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java?rev=1739717&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
 Mon Apr 18 09:26:06 2016
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static 
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link MissingLastRevSeeker} backed by a {@link MemoryDocumentStore}
+ * and a virtual clock, which lets the tests expire leases without waiting.
+ */
+public class MissingLastRevSeekerTest {
+
+    private Clock clock;
+    private DocumentStore store;
+    private MissingLastRevSeeker seeker;
+
+    /** Installs a virtual clock for revisions and cluster node leases. */
+    @Before
+    public void before() throws Exception {
+        clock = new Clock.Virtual();
+        clock.waitUntil(System.currentTimeMillis());
+        Revision.setClock(clock);
+        ClusterNodeInfo.setClock(clock);
+        store = new MemoryDocumentStore();
+        seeker = new MissingLastRevSeeker(store, clock);
+    }
+
+    /** Restores the default clocks replaced in {@link #before()}. */
+    @After
+    public void after() throws Exception {
+        ClusterNodeInfo.resetClockToDefault();
+        Revision.resetClockToDefault();
+    }
+
+    /** The lease is still valid: no recovery needed, so no lock. */
+    @Test
+    public void acquireRecoveryLockOnActiveClusterNode() {
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+
+        assertFalse(seeker.acquireRecoveryLock(1, 2));
+    }
+
+    /** The cluster node was disposed: no recovery needed, so no lock. */
+    @Test
+    public void acquireRecoveryLockOnInactiveClusterNode() {
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+        nodeInfo1.dispose();
+
+        assertFalse(seeker.acquireRecoveryLock(1, 2));
+    }
+
+    /** An active cluster node with an expired lease can be locked. */
+    @Test
+    public void acquireRecoveryLockOnExpiredLease() throws Exception {
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+        // expire the lease
+        clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+        assertTrue(seeker.acquireRecoveryLock(1, 2));
+    }
+
+    /**
+     * A lock held by cluster node 2 with a valid lease cannot be taken over
+     * by cluster node 3.
+     */
+    @Test
+    public void acquireRecoveryLockOnAlreadyLocked() throws Exception {
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+        // expire the lease
+        clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+        ClusterNodeInfo nodeInfo2 = ClusterNodeInfo.getInstance(store, 2);
+        nodeInfo2.renewLease();
+
+        assertTrue(seeker.acquireRecoveryLock(1, 2));
+        assertFalse(seeker.acquireRecoveryLock(1, 3));
+    }
+
+    /** The current lock owner gets {@code true} when acquiring again. */
+    @Test
+    public void acquireRecoveryLockAgain() throws Exception {
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+        // expire the lease
+        clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+        assertTrue(seeker.acquireRecoveryLock(1, 2));
+        assertTrue(seeker.acquireRecoveryLock(1, 2));
+    }
+
+    /** Releasing after a successful recovery makes the entry inactive. */
+    @Test
+    public void releaseRecoveryLockSuccessTrue() throws Exception {
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+        // expire the lease
+        clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+        assertTrue(seeker.acquireRecoveryLock(1, 2));
+        assertTrue(getClusterNodeInfo(1).isBeingRecovered());
+        assertTrue(getClusterNodeInfo(1).isActive());
+        seeker.releaseRecoveryLock(1, true);
+        assertFalse(getClusterNodeInfo(1).isBeingRecovered());
+        assertFalse(getClusterNodeInfo(1).isActive());
+        // recovery not needed anymore
+        assertFalse(seeker.isRecoveryNeeded(getClusterNodeInfo(1)));
+        assertFalse(seeker.acquireRecoveryLock(1, 2));
+    }
+
+    /**
+     * Releasing after a failed recovery keeps the entry active, so recovery
+     * can be attempted again.
+     */
+    @Test
+    public void releaseRecoveryLockSuccessFalse() throws Exception {
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+        // expire the lease
+        clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+        assertTrue(seeker.acquireRecoveryLock(1, 2));
+        assertTrue(getClusterNodeInfo(1).isBeingRecovered());
+        assertTrue(getClusterNodeInfo(1).isActive());
+        seeker.releaseRecoveryLock(1, false);
+        assertFalse(getClusterNodeInfo(1).isBeingRecovered());
+        assertTrue(getClusterNodeInfo(1).isActive());
+        // recovery still needed
+        assertTrue(seeker.isRecoveryNeeded(getClusterNodeInfo(1)));
+        assertTrue(seeker.acquireRecoveryLock(1, 2));
+    }
+
+    /** Only the cluster node with the expired lease needs recovery. */
+    @Test
+    public void isRecoveryNeeded() throws Exception {
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+        // expire the lease
+        clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+        ClusterNodeInfo nodeInfo2 = ClusterNodeInfo.getInstance(store, 2);
+        nodeInfo2.renewLease();
+
+        assertTrue(seeker.isRecoveryNeeded());
+        assertTrue(seeker.isRecoveryNeeded(getClusterNodeInfo(1)));
+        assertFalse(seeker.isRecoveryNeeded(getClusterNodeInfo(2)));
+
+        assertTrue(seeker.acquireRecoveryLock(1, 2));
+        seeker.releaseRecoveryLock(1, true);
+
+        assertFalse(seeker.isRecoveryNeeded());
+        assertFalse(seeker.isRecoveryNeeded(getClusterNodeInfo(1)));
+        assertFalse(seeker.isRecoveryNeeded(getClusterNodeInfo(2)));
+    }
+
+    /** Every cluster node instance shows up in getAllClusters(). */
+    @Test
+    public void getAllClusterNodes() {
+        assertEquals(0, Iterables.size(seeker.getAllClusters()));
+
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+
+        assertEquals(1, Iterables.size(seeker.getAllClusters()));
+
+        ClusterNodeInfo nodeInfo2 = ClusterNodeInfo.getInstance(store, 2);
+        nodeInfo2.renewLease();
+
+        assertEquals(2, Iterables.size(seeker.getAllClusters()));
+    }
+
+    /** getClusterNodeInfo() returns null for an unknown cluster id. */
+    @Test
+    public void getClusterNodeInfo() {
+        assertNull(getClusterNodeInfo(1));
+
+        ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+        nodeInfo1.renewLease();
+
+        assertNotNull(getClusterNodeInfo(1));
+    }
+
+    // shorthand for reading the current entry through the seeker
+    private ClusterNodeInfoDocument getClusterNodeInfo(int clusterId) {
+        return seeker.getClusterNodeInfo(clusterId);
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java
 Mon Apr 18 09:26:06 2016
@@ -18,29 +18,53 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.List;
 
+import com.mongodb.DB;
+
 import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
 import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo;
 import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
 import org.junit.Test;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class AcquireRecoveryLockTest extends AbstractMongoConnectionTest {
 
+    private final Clock clock = new Clock.Virtual(); // virtual time, only advanced explicitly via waitUntil()
+    // NOTE(review): JUnit 4 runs superclass @Before methods before this one; confirm they don't read the clock before it is advanced
+    @Before
+    public void before() throws Exception {
+        clock.waitUntil(System.currentTimeMillis()); // move virtual time up to the current wall-clock time
+    }
+
+    @Override
+    protected DocumentMK.Builder newBuilder(DB db) throws Exception {
+        DocumentMK.Builder builder = super.newBuilder(db);
+        return builder.setLeaseCheck(false); // the test lets the lease time out on purpose, so lease checking is off
+    }
+
+    @Override
+    protected Clock getTestClock() throws InterruptedException { // always the same Virtual clock that before() and the test advance
+        return this.clock;
+    }
+
     // OAK-4131
     @Test
     public void recoveryBy() throws Exception {
         MongoDocumentStore store = new MongoDocumentStore(
                 mongoConnection.getDB(), new DocumentMK.Builder());
-        MongoMissingLastRevSeeker seeker = new 
MongoMissingLastRevSeeker(store);
+        MongoMissingLastRevSeeker seeker = new 
MongoMissingLastRevSeeker(store, getTestClock());
         List<ClusterNodeInfoDocument> infoDocs = 
newArrayList(seeker.getAllClusters());
         assertEquals(1, infoDocs.size());
         int clusterId = infoDocs.get(0).getClusterId();
         int otherClusterId = clusterId + 1;
-        seeker.acquireRecoveryLock(clusterId, otherClusterId);
+        getTestClock().waitUntil(getTestClock().getTime() + 
ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS + 1000);
+        assertTrue(seeker.acquireRecoveryLock(clusterId, otherClusterId));
         ClusterNodeInfoDocument doc = seeker.getClusterNodeInfo(clusterId);
         Object recoveryBy = doc.get(ClusterNodeInfo.REV_RECOVERY_BY);
         assertNotNull(recoveryBy);

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java
 Mon Apr 18 09:26:06 2016
@@ -33,6 +33,14 @@ import org.apache.jackrabbit.oak.query.Q
  */
 public class OakMongoNSRepositoryStub extends OakRepositoryStub {
 
+    static { // once per class load: drop the test DB's collections; skipped when getConnection() returns null
+        MongoConnection c = MongoUtils.getConnection();
+        if (c != null) {
+            try { MongoUtils.dropCollections(c.getDB()); }
+            finally { c.close(); } // release the connection even if dropping the collections fails
+        }
+    }
+
     private final MongoConnection connection;
     private final Repository repository;
 

Modified: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
 Mon Apr 18 09:26:06 2016
@@ -50,7 +50,8 @@ class RecoveryCommand implements Command
             }
             MongoDocumentStore docStore = (MongoDocumentStore) 
dns.getDocumentStore();
             LastRevRecoveryAgent agent = new LastRevRecoveryAgent(dns);
-            MongoMissingLastRevSeeker seeker = new 
MongoMissingLastRevSeeker(docStore);
+            MongoMissingLastRevSeeker seeker = new MongoMissingLastRevSeeker(
+                    docStore, dns.getClock());
             CloseableIterable<NodeDocument> docs = seeker.getCandidates(0);
             closer.register(docs);
             boolean dryRun = Arrays.asList(args).contains("dryRun");


Reply via email to