Author: mreutegg
Date: Mon Apr 18 09:26:06 2016
New Revision: 1739717
URL: http://svn.apache.org/viewvc?rev=1739717&view=rev
Log:
OAK-3488: LastRevRecovery for self async?
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
Mon Apr 18 09:26:06 2016
@@ -477,6 +477,7 @@ public class ClusterNodeInfo {
int clusterNodeId = 0;
int maxId = 0;
ClusterNodeState state = ClusterNodeState.NONE;
+ RecoverLockState lockState = RecoverLockState.NONE;
Long prevLeaseEnd = null;
boolean newEntry = false;
@@ -553,6 +554,7 @@ public class ClusterNodeInfo {
clusterNodeId = id;
state = ClusterNodeState.fromString((String) doc.get(STATE));
prevLeaseEnd = leaseEnd;
+ lockState = RecoverLockState.fromString((String)
doc.get(REV_RECOVERY_LOCK));
}
}
@@ -574,7 +576,7 @@ public class ClusterNodeInfo {
// Do not expire entries and stick on the earlier state, and leaseEnd
so,
// that _lastRev recovery if needed is done.
return new ClusterNodeInfo(clusterNodeId, store, machineId,
instanceId, state,
- RecoverLockState.NONE, prevLeaseEnd, newEntry);
+ lockState, prevLeaseEnd, newEntry);
}
private static boolean waitForLeaseExpiry(DocumentStore store,
ClusterNodeInfoDocument cdoc, long leaseEnd, String machineId,
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
Mon Apr 18 09:26:06 2016
@@ -18,6 +18,8 @@ package org.apache.jackrabbit.oak.plugin
import java.util.List;
+import javax.annotation.CheckForNull;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState;
import static
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState;
@@ -71,10 +73,35 @@ public class ClusterNodeInfoDocument ext
return getState() == ClusterNodeState.ACTIVE;
}
+ /**
+ * @return {@code true} if the recovery lock state is
+ * {@link RecoverLockState#ACQUIRED ACQUIRED}.
+ */
public boolean isBeingRecovered(){
return getRecoveryState() == RecoverLockState.ACQUIRED;
}
+ /**
+ * Returns {@code true} if the cluster node represented by this document
+ * is currently being recovered by the given {@code clusterId}.
+ *
+ * @param clusterId the id of a cluster node.
+ * @return {@code true} if being recovered by the given id; {@code false}
+ * otherwise.
+ */
+ public boolean isBeingRecoveredBy(int clusterId) {
+ return Long.valueOf(clusterId).equals(getRecoveryBy());
+ }
+
+ /**
+ * @return the id of the cluster node performing recovery or {@code null}
if
+ * currently not set.
+ */
+ @CheckForNull
+ public Long getRecoveryBy() {
+ return (Long) get(ClusterNodeInfo.REV_RECOVERY_BY);
+ }
+
public int getClusterId() {
return Integer.parseInt(getId());
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
Mon Apr 18 09:26:06 2016
@@ -980,9 +980,9 @@ public class DocumentMK {
public MissingLastRevSeeker createMissingLastRevSeeker() {
final DocumentStore store = getDocumentStore();
if (store instanceof MongoDocumentStore) {
- return new MongoMissingLastRevSeeker((MongoDocumentStore)
store);
+ return new MongoMissingLastRevSeeker((MongoDocumentStore)
store, getClock());
} else {
- return new MissingLastRevSeeker(store);
+ return new MissingLastRevSeeker(store, getClock());
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
Mon Apr 18 09:26:06 2016
@@ -26,6 +26,7 @@ import static com.google.common.collect.
import static com.google.common.collect.Lists.reverse;
import static java.util.Collections.singletonList;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static
org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF;
@@ -149,6 +150,13 @@ public final class DocumentNodeStore
Boolean.parseBoolean(System.getProperty("oak.fairBackgroundOperationLock",
"true"));
/**
+ * The timeout in milliseconds to wait for the recovery performed by
+ * another cluster node.
+ */
+ private long recoveryWaitTimeoutMS =
+ Long.getLong("oak.recoveryWaitTimeoutMS", 60000);
+
+ /**
* The document store (might be used by multiple node stores).
*/
protected final DocumentStore store;
@@ -559,9 +567,26 @@ public final class DocumentNodeStore
/**
* Recover _lastRev recovery if needed.
+ *
+ * @throws DocumentStoreException if recovery did not finish within
+ * {@link #recoveryWaitTimeoutMS}.
*/
- private void checkLastRevRecovery() {
- lastRevRecoveryAgent.recover(clusterId);
+ private void checkLastRevRecovery() throws DocumentStoreException {
+ long timeout = clock.getTime() + recoveryWaitTimeoutMS;
+ int numRecovered = lastRevRecoveryAgent.recover(clusterId, timeout);
+ if (numRecovered == -1) {
+ ClusterNodeInfoDocument doc = store.find(CLUSTER_NODES,
String.valueOf(clusterId));
+ String otherId = "n/a";
+ if (doc != null) {
+ otherId =
String.valueOf(doc.get(ClusterNodeInfo.REV_RECOVERY_BY));
+ }
+ String msg = "This cluster node (" + clusterId + ") requires " +
+ "_lastRev recovery which is currently performed by " +
+ "another cluster node (" + otherId + "). Recovery is " +
+ "still ongoing after " + recoveryWaitTimeoutMS + " ms. " +
+ "Failing startup of this DocumentNodeStore now!";
+ throw new DocumentStoreException(msg);
+ }
}
public void dispose() {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
Mon Apr 18 09:26:06 2016
@@ -738,7 +738,7 @@ public class DocumentNodeStoreService {
private void registerLastRevRecoveryJob(final DocumentNodeStore nodeStore)
{
long leaseTime =
toLong(context.getProperties().get(PROP_REV_RECOVERY_INTERVAL),
- ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS);
+ ClusterNodeInfo.DEFAULT_LEASE_UPDATE_INTERVAL_MILLIS);
Runnable recoverJob = new Runnable() {
@Override
public void run() {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
Mon Apr 18 09:26:06 2016
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.plugins.document;
+import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Maps.filterKeys;
import static java.util.Collections.singletonList;
import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
@@ -26,20 +27,20 @@ import static org.apache.jackrabbit.oak.
import static
org.apache.jackrabbit.oak.plugins.document.util.Utils.PROPERTY_OR_DELETED;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
+import com.google.common.base.Function;
import com.google.common.base.Predicate;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.document.util.MapFactory;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,16 +62,37 @@ public class LastRevRecoveryAgent {
}
public LastRevRecoveryAgent(DocumentNodeStore nodeStore) {
- this(nodeStore, new
MissingLastRevSeeker(nodeStore.getDocumentStore()));
+ this(nodeStore, new MissingLastRevSeeker(
+ nodeStore.getDocumentStore(), nodeStore.getClock()));
}
/**
- * Recover the correct _lastRev updates for potentially missing candidate
nodes.
+ * Recover the correct _lastRev updates for potentially missing candidate
+ * nodes. If another cluster node is already performing the recovery for
+ * the given {@code clusterId}, this method waits until the given
+ * {@code waitUntil} time (in milliseconds) for the recovery to finish.
+ *
+ * This method will return:
+ * <ul>
+ * <li>{@code -1} when another cluster node is busy performing recovery
+ * for the given {@code clusterId} and the {@code waitUntil} time is
+ * reached.</li>
+ * <li>{@code 0} when no recovery was needed or this thread waited
+ * for another cluster node to finish the recovery within the given
+ * {@code waitUntil} time.</li>
+ * <li>A positive value for the number of recovered documents when
+ * recovery was performed by this thread / cluster node.</li>
+ * </ul>
*
* @param clusterId the cluster id for which the _lastRev are to be
recovered
- * @return the number of restored nodes
+ * @param waitUntil wait until this time in milliseconds for recovery of
+ * the given {@code clusterId} if another cluster node is
+ * already performing the recovery.
+ * @return the number of restored nodes or {@code -1} if a timeout occurred
+ * while waiting for an ongoing recovery by another cluster node.
*/
- public int recover(int clusterId) {
+ public int recover(int clusterId, long waitUntil)
+ throws DocumentStoreException {
ClusterNodeInfoDocument nodeInfo =
missingLastRevUtil.getClusterNodeInfo(clusterId);
//TODO Currently leaseTime remains same per cluster node. If this
@@ -80,15 +102,15 @@ public class LastRevRecoveryAgent {
if (nodeInfo != null) {
// Check if _lastRev recovery needed for this cluster node
- // state is Active && recoveryLock not held by someone
- if (isRecoveryNeeded(nodeInfo)) {
+ // state is Active && current time past leaseEnd
+ if (missingLastRevUtil.isRecoveryNeeded(nodeInfo)) {
long leaseEnd = nodeInfo.getLeaseEndTime();
// retrieve the root document's _lastRev
NodeDocument root = missingLastRevUtil.getRoot();
Revision lastRev = root.getLastRev().get(clusterId);
- // start time is the _lastRev timestamp of this cluster node
+ // start time is the _lastRev timestamp of the cluster node
final long startTime;
final String reason;
//lastRev can be null if other cluster node did not got
@@ -103,10 +125,7 @@ public class LastRevRecoveryAgent {
leaseTime, asyncDelay);
}
- log.info("Recovering candidates modified after: [{}] for
clusterId [{}] [{}]",
- Utils.timestampToString(startTime), clusterId, reason);
-
- return recoverCandidates(clusterId, startTime);
+ return recoverCandidates(nodeInfo, startTime, waitUntil,
reason);
}
}
@@ -115,6 +134,18 @@ public class LastRevRecoveryAgent {
}
/**
+ * Same as {@link #recover(int, long)}, but does not wait for ongoing
+ * recovery.
+ *
+ * @param clusterId the cluster id for which the _lastRev are to be
recovered
+ * @return the number of restored nodes or {@code -1} if there is an
ongoing
+ * recovery by another cluster node.
+ */
+ public int recover(int clusterId) {
+ return recover(clusterId, 0);
+ }
+
+ /**
* Recover the correct _lastRev updates for the given candidate nodes.
*
* @param suspects the potential suspects
@@ -277,40 +308,72 @@ public class LastRevRecoveryAgent {
* Retrieves possible candidates which have been modified after the given
* {@code startTime} and recovers the missing updates.
*
- * @param clusterId the cluster id
+ * @param nodeInfo the info of the cluster node to recover.
* @param startTime the start time
- * @return the number of restored nodes
+ * @param waitUntil wait at most until this time for an ongoing recovery
+ * done by another cluster node.
+ * @param info a string with additional information on how recovery is run.
+ * @return the number of restored nodes or {@code -1} if recovery is still
+ * ongoing by another process even when {@code waitUntil} time was
+ * reached.
*/
- private int recoverCandidates(final int clusterId, final long startTime) {
+ private int recoverCandidates(final ClusterNodeInfoDocument nodeInfo,
+ final long startTime,
+ final long waitUntil,
+ final String info) {
+ ClusterNodeInfoDocument infoDoc = nodeInfo;
+ int clusterId = infoDoc.getClusterId();
+ for (;;) {
+ if (missingLastRevUtil.acquireRecoveryLock(
+ clusterId, nodeStore.getClusterId())) {
+ break;
+ }
- int myClusterId = nodeStore.getClusterId();
- boolean lockAcquired =
missingLastRevUtil.acquireRecoveryLock(clusterId, myClusterId);
+ Clock clock = nodeStore.getClock();
+ long remaining = waitUntil - clock.getTime();
+ if (remaining < 0) {
+ // no need to wait for lock release, waitUntil already reached
+ return -1;
+ }
- //TODO What if recovery is being performed for current clusterNode by
some other node
- //should we halt the startup
- if(!lockAcquired){
- log.info("Last revision recovery already being performed by some
other node. " +
- "Would not attempt recovery");
- return 0;
+ log.info("Last revision recovery already being performed by " +
+ "cluster node {}. Waiting at most until {} for recovery " +
+ "to finish ({} seconds remaining).",
+ infoDoc.getRecoveryBy(),
Utils.timestampToString(waitUntil),
+ remaining / 1000);
+ // check once every five seconds
+ long time = Math.min(waitUntil, clock.getTime() + 5000);
+ try {
+ clock.waitUntil(time);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ String msg = "Interrupted while waiting for _lastRev recovery
to finish.";
+ throw new DocumentStoreException(msg, e);
+ }
+ infoDoc = missingLastRevUtil.getClusterNodeInfo(clusterId);
+ if (!missingLastRevUtil.isRecoveryNeeded(infoDoc)) {
+ // meanwhile another process finished recovery
+ return 0;
+ }
}
- Iterable<NodeDocument> suspects =
missingLastRevUtil.getCandidates(startTime);
-
- log.info("Performing Last Revision Recovery for clusterNodeId {}",
clusterId);
-
+ // if we get here, the recovery lock was acquired successfully
+ boolean success = false;
try {
- return recover(suspects.iterator(), clusterId);
- } finally {
- Utils.closeIfCloseable(suspects);
+ log.info("Recovering candidates modified after: [{}] for clusterId
[{}] [{}]",
+ Utils.timestampToString(startTime), clusterId, info);
- // Relinquish the lock on the recovery for the cluster on the
- // clusterInfo
- // TODO: in case recover throws a RuntimeException (or Error..)
then
- // the recovery might have failed, yet the instance is marked
- // as 'recovered' (by setting the state to NONE).
- // is this really fine here? or should we not retry - or at least
- // log the throwable?
- missingLastRevUtil.releaseRecoveryLock(clusterId);
+ Iterable<NodeDocument> suspects =
missingLastRevUtil.getCandidates(startTime);
+ try {
+ log.info("Performing Last Revision Recovery for clusterNodeId
{}", clusterId);
+ int num = recover(suspects.iterator(), clusterId);
+ success = true;
+ return num;
+ } finally {
+ Utils.closeIfCloseable(suspects);
+ }
+ } finally {
+ missingLastRevUtil.releaseRecoveryLock(clusterId, success);
nodeStore.signalClusterStateChange();
}
@@ -347,60 +410,50 @@ public class LastRevRecoveryAgent {
/**
* Determines if any of the cluster node failed to renew its lease and
- * did not properly shutdown. If any such cluster node is found then are
potential
- * candidates for last rev recovery
+ * did not properly shut down. Any such cluster node is a potential
+ * candidate for last rev recovery. This method also returns
+ * true when there is a cluster node with an ongoing recovery.
*
- * @return true if last rev recovery needs to be performed for any of the
cluster nodes
+ * @return true if last rev recovery needs to be performed for any of the
+ * cluster nodes
*/
public boolean isRecoveryNeeded(){
- return
missingLastRevUtil.isRecoveryNeeded(nodeStore.getClock().getTime());
+ return missingLastRevUtil.isRecoveryNeeded();
}
public void performRecoveryIfNeeded() {
if (isRecoveryNeeded()) {
- List<Integer> clusterIds = getRecoveryCandidateNodes();
- log.info("ClusterNodeId [{}] starting Last Revision Recovery for
clusterNodeId(s) {}", nodeStore.getClusterId(),
- clusterIds);
+ Iterable<Integer> clusterIds = getRecoveryCandidateNodes();
+ log.info("ClusterNodeId [{}] starting Last Revision Recovery for
clusterNodeId(s) {}",
+ nodeStore.getClusterId(), clusterIds);
for (int clusterId : clusterIds) {
- recover(clusterId);
+ if (recover(clusterId) == -1) {
+ log.info("Last Revision Recovery for cluster node {} " +
+ "ongoing by other cluster node.", clusterId);
+ }
}
}
}
/**
- * Gets the _lastRev recovery candidate cluster nodes.
+ * Gets the _lastRev recovery candidate cluster nodes. This also includes
+ * cluster nodes that are currently being recovered.
*
- * @return the recovery candidate nodes
+ * @return the recovery candidate nodes.
*/
- public List<Integer> getRecoveryCandidateNodes() {
-
- Iterable<ClusterNodeInfoDocument> clusters =
missingLastRevUtil.getAllClusters();
- List<Integer> candidateClusterNodes = Lists.newArrayList();
- List<String> beingRecoveredRightNow = Lists.newArrayList();
-
- for (ClusterNodeInfoDocument nodeInfo : clusters) {
- String id = nodeInfo.getId();
- if (nodeInfo.isBeingRecovered()) {
- Long recoveredBy = (Long)
nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_BY);
- beingRecoveredRightNow.add(nodeInfo == null ? id :
String.format("%s (by %d)", id, recoveredBy));
- } else if (isRecoveryNeeded(nodeInfo)) {
- candidateClusterNodes.add(Integer.valueOf(id));
+ public Iterable<Integer> getRecoveryCandidateNodes() {
+ return Iterables.transform(filter(missingLastRevUtil.getAllClusters(),
+ new Predicate<ClusterNodeInfoDocument>() {
+ @Override
+ public boolean apply(ClusterNodeInfoDocument input) {
+ return missingLastRevUtil.isRecoveryNeeded(input);
}
- }
-
- if (!beingRecoveredRightNow.isEmpty()) {
- log.info("Active cluster nodes already in the process of being
recovered: " + beingRecoveredRightNow);
- }
-
- return candidateClusterNodes;
- }
-
- /**
- * Check if _lastRev recovery needed for this cluster node state is Active
- * && currentTime past the leaseEnd time && recoveryLock not held by
someone
- */
- private boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument
nodeInfo) {
- return nodeInfo.isActive() && nodeStore.getClock().getTime() >
nodeInfo.getLeaseEndTime() && !nodeInfo.isBeingRecovered();
+ }), new Function<ClusterNodeInfoDocument, Integer>() {
+ @Override
+ public Integer apply(ClusterNodeInfoDocument input) {
+ return input.getClusterId();
+ }
+ });
}
private static class ClusterPredicate implements Predicate<Revision> {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
Mon Apr 18 09:26:06 2016
@@ -19,14 +19,25 @@
package org.apache.jackrabbit.oak.plugins.document;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
import
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
+import static
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState.ACTIVE;
+import static
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.LEASE_END_KEY;
+import static
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_BY;
+import static
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.REV_RECOVERY_LOCK;
+import static
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState.ACQUIRED;
+import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.STATE;
+import static
org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
import static
org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS;
import static
org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs;
import static
org.apache.jackrabbit.oak.plugins.document.util.Utils.getSelectedDocuments;
@@ -42,8 +53,19 @@ public class MissingLastRevSeeker {
private final DocumentStore store;
- public MissingLastRevSeeker(DocumentStore store) {
+ protected final Clock clock;
+
+ private final Predicate<ClusterNodeInfoDocument> isRecoveryNeeded =
+ new Predicate<ClusterNodeInfoDocument>() {
+ @Override
+ public boolean apply(ClusterNodeInfoDocument nodeInfo) {
+ return isRecoveryNeeded(nodeInfo);
+ }
+ };
+
+ public MissingLastRevSeeker(DocumentStore store, Clock clock) {
this.store = store;
+ this.clock = clock;
}
/**
@@ -51,6 +73,7 @@ public class MissingLastRevSeeker {
*
* @return the clusters
*/
+ @Nonnull
public Iterable<ClusterNodeInfoDocument> getAllClusters() {
return ClusterNodeInfoDocument.all(store);
}
@@ -61,9 +84,10 @@ public class MissingLastRevSeeker {
* @param clusterId the cluster id
* @return the cluster node info
*/
+ @CheckForNull
public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
// Fetch all documents.
- return store.find(Collection.CLUSTER_NODES, String.valueOf(clusterId));
+ return store.find(CLUSTER_NODES, String.valueOf(clusterId));
}
/**
@@ -73,6 +97,7 @@ public class MissingLastRevSeeker {
* @param startTime the start time.
* @return the candidates
*/
+ @Nonnull
public Iterable<NodeDocument> getCandidates(final long startTime) {
// Fetch all documents where lastmod >= startTime
Iterable<NodeDocument> nodes = getSelectedDocuments(store,
@@ -87,40 +112,59 @@ public class MissingLastRevSeeker {
}
/**
- * Acquire a recovery lock for the given cluster node info document
+ * Acquire a recovery lock for the given cluster node info document. This
+ * method may break a lock when it determines the cluster node holding the
+ * recovery lock is no longer active or its lease expired.
*
* @param clusterId
* id of the cluster that is going to be recovered
* @param recoveredBy
- * id of cluster doing the recovery ({@code 0} when unknown)
+ * id of cluster doing the recovery
* @return whether the lock has been acquired
*/
public boolean acquireRecoveryLock(int clusterId, int recoveredBy) {
- try {
- UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
- update.notEquals(ClusterNodeInfo.REV_RECOVERY_LOCK,
RecoverLockState.ACQUIRED.name());
- update.set(ClusterNodeInfo.REV_RECOVERY_LOCK,
RecoverLockState.ACQUIRED.name());
- if (recoveredBy != 0) {
- update.set(ClusterNodeInfo.REV_RECOVERY_BY, recoveredBy);
- }
- ClusterNodeInfoDocument old =
store.findAndUpdate(Collection.CLUSTER_NODES, update);
- return old != null;
- } catch (RuntimeException ex) {
- LOG.error("Failed to acquire the recovery lock for clusterNodeId "
+ clusterId, ex);
- throw (ex);
+ ClusterNodeInfoDocument doc = getClusterNodeInfo(clusterId);
+ if (doc == null) {
+ // this is unexpected...
+ return false;
}
+ if (!isRecoveryNeeded(doc)) {
+ return false;
+ }
+ boolean acquired = tryAcquireRecoveryLock(doc, recoveredBy);
+ if (acquired) {
+ return true;
+ }
+ // either we already own the lock or were able to break the lock
+ return doc.isBeingRecoveredBy(recoveredBy)
+ || tryBreakRecoveryLock(doc, recoveredBy);
}
- public void releaseRecoveryLock(int clusterId) {
+ /**
+ * Releases the recovery lock on the given {@code clusterId}. If
+ * {@code success} is {@code true}, the state of the cluster node entry
+ * is reset, otherwise it is left as is. That is, if recovery of a cluster
+ * node failed and this method is called with {@code success} set to
+ * {@code false}, the state of the cluster node entry remains active.
+ *
+ * @param clusterId the id of the cluster node that was recovered.
+ * @param success whether recovery was successful.
+ */
+ public void releaseRecoveryLock(int clusterId, boolean success) {
try {
UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
- update.set(ClusterNodeInfo.REV_RECOVERY_LOCK,
RecoverLockState.NONE.name());
- update.set(ClusterNodeInfo.REV_RECOVERY_BY, null);
- update.set(ClusterNodeInfo.STATE, null);
- ClusterNodeInfoDocument old =
store.findAndUpdate(Collection.CLUSTER_NODES, update);
+ update.set(REV_RECOVERY_LOCK, RecoverLockState.NONE.name());
+ update.set(REV_RECOVERY_BY, null);
+ if (success) {
+ update.set(STATE, null);
+ }
+ ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES,
update);
if (old == null) {
throw new RuntimeException("ClusterNodeInfo document for " +
clusterId + " missing.");
}
+ LOG.info("Released recovery lock for cluster id {} (recovery
successful: {})",
+ clusterId, success);
} catch (RuntimeException ex) {
LOG.error("Failed to release the recovery lock for clusterNodeId "
+ clusterId, ex);
throw (ex);
@@ -131,16 +175,94 @@ public class MissingLastRevSeeker {
return store.find(Collection.NODES, Utils.getIdFromPath(ROOT_PATH));
}
- public boolean isRecoveryNeeded(long currentTime) {
- for(ClusterNodeInfoDocument nodeInfo : getAllClusters()){
- // Check if _lastRev recovery needed for this cluster node
- // state is Active && currentTime past the leaseEnd time &&
recoveryLock not held by someone
- if (nodeInfo.isActive()
- && currentTime > nodeInfo.getLeaseEndTime()
- && !nodeInfo.isBeingRecovered()) {
- return true;
+ public boolean isRecoveryNeeded() {
+ return Iterables.any(getAllClusters(), isRecoveryNeeded);
+ }
+
+ /**
+ * Checks if _lastRev recovery is needed for the given cluster node, i.e.
+ * its state is ACTIVE and the current time is past its lease end time.
+ */
+ public boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument nodeInfo)
{
+ return nodeInfo.isActive() && clock.getTime() >
nodeInfo.getLeaseEndTime();
+ }
+
+ //-------------------------< internal
>-------------------------------------
+
+ /**
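+ * Acquire a recovery lock for the given cluster node info document. The
+ * lock is only acquired if the cluster node is still ACTIVE, its lease
+ * end time is unchanged since {@code info} was read, and the lock is not
+ * already held.
+ *
+ * @param info
+ * info document of the cluster that is going to be recovered
+ * @param recoveredBy
+ * id of cluster doing the recovery ({@code 0} when unknown)
+ * @return whether the lock has been acquired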
+ */
+ private boolean tryAcquireRecoveryLock(ClusterNodeInfoDocument info,
+ int recoveredBy) {
+ int clusterId = info.getClusterId();
+ try {
+ UpdateOp update = new UpdateOp(Integer.toString(clusterId), false);
+ update.equals(STATE, ACTIVE.name());
+ update.equals(LEASE_END_KEY, info.getLeaseEndTime());
+ update.notEquals(REV_RECOVERY_LOCK, ACQUIRED.name());
+ update.set(REV_RECOVERY_LOCK, ACQUIRED.name());
+ if (recoveredBy != 0) {
+ update.set(REV_RECOVERY_BY, recoveredBy);
+ }
+ ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES,
update);
+ if (old != null) {
+ LOG.info("Acquired recovery lock for cluster id {}",
clusterId);
+ }
+ return old != null;
+ } catch (RuntimeException ex) {
+ LOG.error("Failed to acquire the recovery lock for clusterNodeId "
+ clusterId, ex);
+ throw (ex);
+ }
+ }
+
+ /**
+ * Checks if the cluster node currently holding the recovery lock is
+ * inactive or its lease expired, and then tries to break the recovery lock.
+ *
+ * @param doc the cluster node info document of the cluster node to acquire
+ * the recovery lock for.
+ * @param recoveredBy id of cluster doing the recovery.
+ * @return whether the lock has been acquired.
+ */
+ private boolean tryBreakRecoveryLock(ClusterNodeInfoDocument doc,
+ int recoveredBy) {
+ Long recoveryBy = doc.getRecoveryBy();
+ if (recoveryBy == null) {
+ // cannot determine current lock owner
+ return false;
+ }
+ ClusterNodeInfoDocument recovering =
getClusterNodeInfo(recoveryBy.intValue());
+ if (recovering == null) {
+ // cannot determine current lock owner
+ return false;
+ }
+ if (recovering.isActive() && recovering.getLeaseEndTime() >
clock.getTime()) {
+ // still active, cannot break lock
+ return false;
+ }
+ // try to break the lock
+ try {
+ UpdateOp update = new
UpdateOp(Integer.toString(doc.getClusterId()), false);
+ update.equals(STATE, ACTIVE.name());
+ update.equals(REV_RECOVERY_LOCK, ACQUIRED.name());
+ update.equals(REV_RECOVERY_BY, recoveryBy);
+ update.set(REV_RECOVERY_BY, recoveredBy);
+ ClusterNodeInfoDocument old = store.findAndUpdate(CLUSTER_NODES,
update);
+ if (old != null) {
+ LOG.info("Acquired (broke) recovery lock for cluster id {}. " +
+ "Previous lock owner: {}", doc.getClusterId(),
recoveryBy);
}
+ return old != null;
+ } catch (RuntimeException ex) {
+ LOG.error("Failed to break the recovery lock for clusterNodeId " +
+ doc.getClusterId(), ex);
+ throw (ex);
}
- return false;
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
Mon Apr 18 09:26:06 2016
@@ -19,9 +19,10 @@
package org.apache.jackrabbit.oak.plugins.document.mongo;
+import javax.annotation.Nonnull;
+
import static com.google.common.collect.Iterables.transform;
import static com.mongodb.QueryBuilder.start;
-import static
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.RecoverLockState;
import com.google.common.base.Function;
import com.mongodb.BasicDBObject;
@@ -36,6 +37,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.MissingLastRevSeeker;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
+import org.apache.jackrabbit.oak.stats.Clock;
/**
* Mongo specific version of MissingLastRevSeeker which uses mongo queries
@@ -46,12 +48,13 @@ import org.apache.jackrabbit.oak.plugins
public class MongoMissingLastRevSeeker extends MissingLastRevSeeker {
private final MongoDocumentStore store;
- public MongoMissingLastRevSeeker(MongoDocumentStore store) {
- super(store);
+ public MongoMissingLastRevSeeker(MongoDocumentStore store, Clock clock) {
+ // the clock is handed to the superclass; lease checks in this class
+ // (isRecoveryNeeded) compare against clock.getTime()
+ super(store, clock);
+ // keep a MongoDocumentStore-typed reference in addition to the one
+ // passed to the superclass
this.store = store;
}
@Override
+ @Nonnull
public CloseableIterable<NodeDocument> getCandidates(final long startTime)
{
DBObject query =
start(NodeDocument.MODIFIED_IN_SECS).greaterThanEquals(
@@ -72,11 +75,10 @@ public class MongoMissingLastRevSeeker e
}
@Override
- public boolean isRecoveryNeeded(long currentTime) {
+ public boolean isRecoveryNeeded() {
+ // Recovery is needed if at least one cluster node entry is still in
+ // state ACTIVE although its lease ended before clock.getTime().
+ // Entries whose recovery lock is currently acquired are no longer
+ // filtered out by this query (the REV_RECOVERY_LOCK condition was removed).
QueryBuilder query =
start(ClusterNodeInfo.STATE).is(ClusterNodeInfo.ClusterNodeState.ACTIVE.name())
- .put(ClusterNodeInfo.LEASE_END_KEY).lessThan(currentTime)
-
.put(ClusterNodeInfo.REV_RECOVERY_LOCK).notEquals(RecoverLockState.ACQUIRED.name());
+ .put(ClusterNodeInfo.LEASE_END_KEY).lessThan(clock.getTime());
return getClusterNodeCollection().findOne(query.get()) != null;
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceCrashTest.java
Mon Apr 18 09:26:06 2016
@@ -32,6 +32,7 @@ import junitx.util.PrivateAccessor;
import static org.junit.Assert.assertNotNull;
import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -175,10 +176,12 @@ public class DocumentDiscoveryLiteServic
logger.info("Going to waitBeforeUnlocking");
waitBeforeUnlocking.acquire();
logger.info("Done with waitBeforeUnlocking");
- missingLastRevUtil.releaseRecoveryLock((Integer)
invocation.getArguments()[0]);
+ missingLastRevUtil.releaseRecoveryLock(
+ (Integer) invocation.getArguments()[0],
+ (Boolean) invocation.getArguments()[1]);
return null;
}
-
}).when(mockedLongduringMissingLastRevUtil).releaseRecoveryLock(anyInt());
+
}).when(mockedLongduringMissingLastRevUtil).releaseRecoveryLock(anyInt(),
anyBoolean());
// let go (or tschaedere loh)
waitBeforeLocking.release();
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgentTest.java
Mon Apr 18 09:26:06 2016
@@ -22,6 +22,7 @@ package org.apache.jackrabbit.oak.plugin
import java.io.IOException;
import java.util.List;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -151,11 +152,11 @@ public class LastRevRecoveryAgentTest {
assertTrue(ds1.getLastRevRecoveryAgent().isRecoveryNeeded());
- List<Integer> cids =
ds1.getLastRevRecoveryAgent().getRecoveryCandidateNodes();
- assertEquals(1, cids.size());
- assertEquals(c2Id, cids.get(0).intValue());
+ Iterable<Integer> cids =
ds1.getLastRevRecoveryAgent().getRecoveryCandidateNodes();
+ assertEquals(1, Iterables.size(cids));
+ assertEquals(c2Id, Iterables.get(cids, 0).intValue());
- ds1.getLastRevRecoveryAgent().recover(cids.get(0));
+ ds1.getLastRevRecoveryAgent().recover(Iterables.get(cids, 0));
assertEquals(zlastRev2, getDocument(ds1,
"/x/y").getLastRev().get(c2Id));
assertEquals(zlastRev2, getDocument(ds1, "/x").getLastRev().get(c2Id));
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
Mon Apr 18 09:26:06 2016
@@ -19,9 +19,9 @@
package org.apache.jackrabbit.oak.plugins.document;
-import java.util.List;
import java.util.Map;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -40,8 +40,10 @@ import org.junit.Test;
import static com.google.common.collect.Lists.newArrayList;
import static
org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class LastRevRecoveryTest {
@Rule
@@ -59,6 +61,7 @@ public class LastRevRecoveryTest {
clock = new Clock.Virtual();
clock.waitUntil(System.currentTimeMillis());
Revision.setClock(clock);
+ ClusterNodeInfo.setClock(clock);
// disable lease check because we fiddle with the virtual clock
final boolean leaseCheck = false;
sharedStore = new MemoryDocumentStore();
@@ -85,6 +88,7 @@ public class LastRevRecoveryTest {
public void tearDown() {
ds1.dispose();
ds2.dispose();
+ ClusterNodeInfo.resetClockToDefault();
Revision.resetClockToDefault();
}
@@ -158,12 +162,130 @@ public class LastRevRecoveryTest {
// run recovery on ds2
LastRevRecoveryAgent agent = new LastRevRecoveryAgent(ds2);
- List<Integer> clusterIds = agent.getRecoveryCandidateNodes();
- assertTrue(clusterIds.contains(c1Id));
+ Iterable<Integer> clusterIds = agent.getRecoveryCandidateNodes();
+ assertTrue(Iterables.contains(clusterIds, c1Id));
assertEquals("must not recover any documents",
0, agent.recover(c1Id));
}
+ // OAK-3488
+ // While cluster node 2 holds the recovery lock for crashed node 1,
+ // recover(c1Id, deadline) with an already reached deadline returns -1;
+ // after the lock is released (success=true) it returns 0.
+ @Test
+ public void recoveryWithTimeout() throws Exception {
+ String clusterId = String.valueOf(c1Id);
+ ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES,
clusterId);
+
+ NodeBuilder builder = ds1.getRoot().builder();
+ builder.child("x").child("y").child("z");
+ merge(ds1, builder);
+ ds1.dispose();
+
+ // reset clusterNodes entry to simulate a crash
+ sharedStore.remove(CLUSTER_NODES, clusterId);
+ sharedStore.create(CLUSTER_NODES,
newArrayList(updateOpFromDocument(doc)));
+
+ // 'wait' until lease expires
+ clock.waitUntil(doc.getLeaseEndTime() + 1);
+
+ // simulate ongoing recovery by cluster node 2
+ MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore,
clock);
+ // NOTE(review): result not asserted here, unlike the other OAK-3488 tests
+ seeker.acquireRecoveryLock(c1Id, c2Id);
+
+ // run recovery from ds1
+ LastRevRecoveryAgent a1 = new LastRevRecoveryAgent(ds1);
+ // use current time -> do not wait for recovery of other agent
+ assertEquals(-1, a1.recover(c1Id, clock.getTime()));
+
+ seeker.releaseRecoveryLock(c1Id, true);
+
+ assertEquals(0, a1.recover(c1Id, clock.getTime() + 1000));
+ }
+
+ // OAK-3488
+ // Starting a node store with the cluster id of a crashed node whose
+ // recovery lock is held by a live cluster node (2) must fail with a
+ // DocumentStoreException.
+ @Test
+ public void failStartupOnRecoveryTimeout() throws Exception {
+ String clusterId = String.valueOf(c1Id);
+ ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES,
clusterId);
+
+ NodeBuilder builder = ds1.getRoot().builder();
+ builder.child("x").child("y").child("z");
+ merge(ds1, builder);
+ ds1.dispose();
+
+ // reset clusterNodes entry to simulate a crash
+ sharedStore.remove(CLUSTER_NODES, clusterId);
+ sharedStore.create(CLUSTER_NODES,
newArrayList(updateOpFromDocument(doc)));
+
+ // 'wait' until lease expires
+ clock.waitUntil(doc.getLeaseEndTime() + 1);
+ // make sure ds2 lease is still fine
+ ds2.getClusterInfo().renewLease();
+
+ // simulate ongoing recovery by cluster node 2
+ MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore,
clock);
+ assertTrue(seeker.acquireRecoveryLock(c1Id, c2Id));
+
+ // attempt to restart ds1 while lock is acquired
+ try {
+ ds1 = new DocumentMK.Builder()
+ .clock(clock)
+ .setDocumentStore(sharedStore)
+ .setClusterId(c1Id)
+ .getNodeStore();
+ fail("DocumentStoreException expected");
+ } catch (DocumentStoreException e) {
+ // expected
+ }
+ seeker.releaseRecoveryLock(c1Id, true);
+ }
+
+ // OAK-3488
+ // Cluster node 2 acquires the recovery lock for crashed node 1 and then
+ // crashes itself. Restarting node 1 must break the stale lock, after
+ // which its entry no longer needs recovery and is not being recovered.
+ @Test
+ public void breakRecoveryLockWithExpiredLease() throws Exception {
+ String clusterId = String.valueOf(c1Id);
+ ClusterNodeInfoDocument info1 = sharedStore.find(CLUSTER_NODES,
clusterId);
+
+ NodeBuilder builder = ds1.getRoot().builder();
+ builder.child("x").child("y").child("z");
+ merge(ds1, builder);
+ ds1.dispose();
+
+ // reset clusterNodes entry to simulate a crash of ds1
+ sharedStore.remove(CLUSTER_NODES, clusterId);
+ sharedStore.create(CLUSTER_NODES,
newArrayList(updateOpFromDocument(info1)));
+
+ // 'wait' until lease expires
+ clock.waitUntil(info1.getLeaseEndTime() + 1);
+ // make sure ds2 lease is still fine
+ ds2.getClusterInfo().renewLease();
+
+ // start of recovery by ds2
+ MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore,
clock);
+ assertTrue(seeker.acquireRecoveryLock(c1Id, c2Id));
+ // simulate crash of ds2
+ ClusterNodeInfoDocument info2 = sharedStore.find(CLUSTER_NODES,
String.valueOf(c2Id));
+ ds2.dispose();
+ // reset clusterNodes entry
+ sharedStore.remove(CLUSTER_NODES, String.valueOf(c2Id));
+ sharedStore.create(CLUSTER_NODES,
newArrayList(updateOpFromDocument(info2)));
+ // 'wait' until ds2's lease expires
+ clock.waitUntil(info2.getLeaseEndTime() + 1);
+
+ info1 = sharedStore.find(CLUSTER_NODES, clusterId);
+ assertTrue(seeker.isRecoveryNeeded(info1));
+ assertTrue(info1.isBeingRecovered());
+
+ // restart ds1 with the same cluster id whose entry is checked below
+ ds1 = builderProvider.newBuilder()
+ .clock(clock)
+ .setLeaseCheck(false)
+ .setAsyncDelay(0)
+ .setDocumentStore(sharedStore)
+ .setClusterId(c1Id)
+ .getNodeStore();
+ info1 = sharedStore.find(CLUSTER_NODES, clusterId);
+ assertFalse(seeker.isRecoveryNeeded(info1));
+ assertFalse(info1.isBeingRecovered());
+ }
private NodeDocument getDocument(DocumentNodeStore nodeStore, String path)
{
return nodeStore.getDocumentStore().find(Collection.NODES,
Utils.getIdFromPath(path));
@@ -186,11 +308,13 @@ public class LastRevRecoveryTest {
}
} else {
if (obj instanceof Boolean) {
- op.set(key, ((Boolean) obj).booleanValue());
+ op.set(key, (Boolean) obj);
} else if (obj instanceof Number) {
op.set(key, ((Number) obj).longValue());
- } else {
+ } else if (obj != null) {
op.set(key, obj.toString());
+ } else {
+ op.set(key, null);
}
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
Mon Apr 18 09:26:06 2016
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.stats.Clock;
@@ -172,10 +173,10 @@ public class LastRevSingleNodeRecoveryTe
clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() +
1000);
LastRevRecoveryAgent recoveryAgent =
mk.getNodeStore().getLastRevRecoveryAgent();
- List<Integer> cids = recoveryAgent.getRecoveryCandidateNodes();
+ Iterable<Integer> cids = recoveryAgent.getRecoveryCandidateNodes();
- assertEquals(1, cids.size());
- assertEquals(Integer.valueOf(1), cids.get(0));
+ assertEquals(1, Iterables.size(cids));
+ assertEquals(Integer.valueOf(1), Iterables.get(cids, 0));
}
private void setupScenario() throws InterruptedException {
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java?rev=1739717&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
Mon Apr 18 09:26:06 2016
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static
org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the recovery lock handling of {@link MissingLastRevSeeker} against a
+ * {@link MemoryDocumentStore}, using a virtual {@link Clock} to expire leases.
+ */
+public class MissingLastRevSeekerTest {
+
+ private Clock clock;
+ private DocumentStore store;
+ private MissingLastRevSeeker seeker;
+
+ @Before
+ public void before() throws Exception {
+ clock = new Clock.Virtual();
+ clock.waitUntil(System.currentTimeMillis());
+ // Revision and ClusterNodeInfo use the same virtual clock as the seeker
+ Revision.setClock(clock);
+ ClusterNodeInfo.setClock(clock);
+ store = new MemoryDocumentStore();
+ seeker = new MissingLastRevSeeker(store, clock);
+ }
+
+ @After
+ public void after() throws Exception {
+ ClusterNodeInfo.resetClockToDefault();
+ Revision.resetClockToDefault();
+ }
+
+ // the lease of cluster node 1 is still valid -> lock is refused
+ @Test
+ public void acquireRecoveryLockOnActiveClusterNode() {
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+
+ assertFalse(seeker.acquireRecoveryLock(1, 2));
+ }
+
+ // cluster node 1 was disposed (clean shutdown) -> lock is refused
+ @Test
+ public void acquireRecoveryLockOnInactiveClusterNode() {
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+ nodeInfo1.dispose();
+
+ assertFalse(seeker.acquireRecoveryLock(1, 2));
+ }
+
+ // lease of cluster node 1 expired -> lock is granted
+ @Test
+ public void acquireRecoveryLockOnExpiredLease() throws Exception {
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+ // expire the lease
+ clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+ assertTrue(seeker.acquireRecoveryLock(1, 2));
+ }
+
+ // a lock held by a live cluster node (2) cannot be taken by node 3
+ @Test
+ public void acquireRecoveryLockOnAlreadyLocked() throws Exception {
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+ // expire the lease
+ clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+ ClusterNodeInfo nodeInfo2 = ClusterNodeInfo.getInstance(store, 2);
+ nodeInfo2.renewLease();
+
+ assertTrue(seeker.acquireRecoveryLock(1, 2));
+ assertFalse(seeker.acquireRecoveryLock(1, 3));
+ }
+
+ // the current lock owner may acquire the same lock again
+ @Test
+ public void acquireRecoveryLockAgain() throws Exception {
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+ // expire the lease
+ clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+ assertTrue(seeker.acquireRecoveryLock(1, 2));
+ assertTrue(seeker.acquireRecoveryLock(1, 2));
+ }
+
+ // releasing with success=true also marks the entry as no longer active
+ @Test
+ public void releaseRecoveryLockSuccessTrue() throws Exception {
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+ // expire the lease
+ clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+ assertTrue(seeker.acquireRecoveryLock(1, 2));
+ assertTrue(getClusterNodeInfo(1).isBeingRecovered());
+ assertTrue(getClusterNodeInfo(1).isActive());
+ seeker.releaseRecoveryLock(1, true);
+ assertFalse(getClusterNodeInfo(1).isBeingRecovered());
+ assertFalse(getClusterNodeInfo(1).isActive());
+ // recovery not needed anymore
+ assertFalse(seeker.isRecoveryNeeded(getClusterNodeInfo(1)));
+ assertFalse(seeker.acquireRecoveryLock(1, 2));
+ }
+
+ // releasing with success=false keeps the entry active and recoverable
+ @Test
+ public void releaseRecoveryLockSuccessFalse() throws Exception {
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+ // expire the lease
+ clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+ assertTrue(seeker.acquireRecoveryLock(1, 2));
+ assertTrue(getClusterNodeInfo(1).isBeingRecovered());
+ assertTrue(getClusterNodeInfo(1).isActive());
+ seeker.releaseRecoveryLock(1, false);
+ assertFalse(getClusterNodeInfo(1).isBeingRecovered());
+ assertTrue(getClusterNodeInfo(1).isActive());
+ // recovery still needed
+ assertTrue(seeker.isRecoveryNeeded(getClusterNodeInfo(1)));
+ assertTrue(seeker.acquireRecoveryLock(1, 2));
+ }
+
+ // global and per-entry isRecoveryNeeded before and after a successful recovery
+ @Test
+ public void isRecoveryNeeded() throws Exception {
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+ // expire the lease
+ clock.waitUntil(clock.getTime() + DEFAULT_LEASE_DURATION_MILLIS + 1);
+
+ ClusterNodeInfo nodeInfo2 = ClusterNodeInfo.getInstance(store, 2);
+ nodeInfo2.renewLease();
+
+ assertTrue(seeker.isRecoveryNeeded());
+ assertTrue(seeker.isRecoveryNeeded(getClusterNodeInfo(1)));
+ assertFalse(seeker.isRecoveryNeeded(getClusterNodeInfo(2)));
+
+ assertTrue(seeker.acquireRecoveryLock(1, 2));
+ seeker.releaseRecoveryLock(1, true);
+
+ assertFalse(seeker.isRecoveryNeeded());
+ assertFalse(seeker.isRecoveryNeeded(getClusterNodeInfo(1)));
+ assertFalse(seeker.isRecoveryNeeded(getClusterNodeInfo(2)));
+ }
+
+ @Test
+ public void getAllClusterNodes() {
+ assertEquals(0, Iterables.size(seeker.getAllClusters()));
+
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+
+ assertEquals(1, Iterables.size(seeker.getAllClusters()));
+
+ ClusterNodeInfo nodeInfo2 = ClusterNodeInfo.getInstance(store, 2);
+ nodeInfo2.renewLease();
+
+ assertEquals(2, Iterables.size(seeker.getAllClusters()));
+ }
+
+ @Test
+ public void getClusterNodeInfo() {
+ assertNull(getClusterNodeInfo(1));
+
+ ClusterNodeInfo nodeInfo1 = ClusterNodeInfo.getInstance(store, 1);
+ nodeInfo1.renewLease();
+
+ assertNotNull(getClusterNodeInfo(1));
+ }
+
+ // shorthand for reading the current cluster node entry via the seeker
+ private ClusterNodeInfoDocument getClusterNodeInfo(int clusterId) {
+ return seeker.getClusterNodeInfo(clusterId);
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeekerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/AcquireRecoveryLockTest.java
Mon Apr 18 09:26:06 2016
@@ -18,29 +18,53 @@ package org.apache.jackrabbit.oak.plugin
import java.util.List;
+import com.mongodb.DB;
+
import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo;
import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
import org.junit.Test;
import static com.google.common.collect.Lists.newArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class AcquireRecoveryLockTest extends AbstractMongoConnectionTest {
+ // virtual clock returned by getTestClock(); the test advances it to
+ // expire leases without sleeping
+ private Clock clock = new Clock.Virtual();
+
+ // align the virtual clock with wall-clock time before each test
+ @Before
+ public void before() throws Exception {
+ clock.waitUntil(System.currentTimeMillis());
+ }
+
+ @Override
+ protected DocumentMK.Builder newBuilder(DB db) throws Exception {
+ // disable lease check because test waits until lease times out
+ return super.newBuilder(db).setLeaseCheck(false);
+ }
+
+ // presumably used by AbstractMongoConnectionTest when building the
+ // store under test - NOTE(review): confirm in the base class
+ @Override
+ protected Clock getTestClock() throws InterruptedException {
+ return clock;
+ }
+
// OAK-4131
@Test
public void recoveryBy() throws Exception {
MongoDocumentStore store = new MongoDocumentStore(
mongoConnection.getDB(), new DocumentMK.Builder());
- MongoMissingLastRevSeeker seeker = new
MongoMissingLastRevSeeker(store);
+ MongoMissingLastRevSeeker seeker = new
MongoMissingLastRevSeeker(store, getTestClock());
List<ClusterNodeInfoDocument> infoDocs =
newArrayList(seeker.getAllClusters());
assertEquals(1, infoDocs.size());
int clusterId = infoDocs.get(0).getClusterId();
int otherClusterId = clusterId + 1;
- seeker.acquireRecoveryLock(clusterId, otherClusterId);
+ getTestClock().waitUntil(getTestClock().getTime() +
ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS + 1000);
+ assertTrue(seeker.acquireRecoveryLock(clusterId, otherClusterId));
ClusterNodeInfoDocument doc = seeker.getClusterNodeInfo(clusterId);
Object recoveryBy = doc.get(ClusterNodeInfo.REV_RECOVERY_BY);
assertNotNull(recoveryBy);
Modified:
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/OakMongoNSRepositoryStub.java
Mon Apr 18 09:26:06 2016
@@ -33,6 +33,14 @@ import org.apache.jackrabbit.oak.query.Q
*/
public class OakMongoNSRepositoryStub extends OakRepositoryStub {
+ static {
+ // drop existing collections once when this stub class is initialized,
+ // presumably so the repository starts from a clean MongoDB database.
+ // getConnection() may return null (nothing is done in that case).
+ MongoConnection c = MongoUtils.getConnection();
+ if (c != null) {
+ try {
+ MongoUtils.dropCollections(c.getDB());
+ } finally {
+ // release the connection even if dropping the collections fails
+ c.close();
+ }
+ }
+ }
+
private final MongoConnection connection;
private final Repository repository;
Modified:
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java?rev=1739717&r1=1739716&r2=1739717&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
(original)
+++
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/RecoveryCommand.java
Mon Apr 18 09:26:06 2016
@@ -50,7 +50,8 @@ class RecoveryCommand implements Command
}
MongoDocumentStore docStore = (MongoDocumentStore)
dns.getDocumentStore();
LastRevRecoveryAgent agent = new LastRevRecoveryAgent(dns);
- MongoMissingLastRevSeeker seeker = new
MongoMissingLastRevSeeker(docStore);
+ MongoMissingLastRevSeeker seeker = new MongoMissingLastRevSeeker(
+ docStore, dns.getClock());
CloseableIterable<NodeDocument> docs = seeker.getCandidates(0);
closer.register(docs);
boolean dryRun = Arrays.asList(args).contains("dryRun");