Author: reschke
Date: Thu Jan 26 16:37:47 2017
New Revision: 1780424
URL: http://svn.apache.org/viewvc?rev=1780424&view=rev
Log:
OAK-5446: leaseUpdateThread might be blocked by leaseUpdateCheck
- add test
- move cluster state update out of background leaseUpdateThread
- make sure that cluster state update bypasses the leaseCheckWrapper
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentLeaseUpdateRetryTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1780424&r1=1780423&r2=1780424&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	Thu Jan 26 16:37:47 2017
@@ -192,6 +192,11 @@ public final class DocumentNodeStore
private int journalPushThreshold =
Integer.getInteger("oak.journalPushThreshold", 100000);
/**
+ * The document store without the optional lease-checking wrapper.
+ */
+ private final DocumentStore nonLeaseCheckingStore;
+
+ /**
* The document store (might be used by multiple node stores).
*/
protected final DocumentStore store;
@@ -310,6 +315,12 @@ public final class DocumentNodeStore
private Thread leaseUpdateThread;
/**
+ * Background thread performing the cluster state update.
+ */
+ @Nonnull
+ private Thread clusterUpdateThread;
+
+ /**
* Read/Write lock for background operations. Regular commits will acquire
* a shared lock, while a background write acquires an exclusive lock.
*/
@@ -486,6 +497,8 @@ public final class DocumentNodeStore
// this cluster id
cid = clusterNodeInfo.getId();
+ this.nonLeaseCheckingStore = s;
+
if (builder.getLeaseCheck()) {
s = new LeaseCheckDocumentStoreWrapper(s, clusterNodeInfo);
clusterNodeInfo.setLeaseFailureHandler(builder.getLeaseFailureHandler());
@@ -596,7 +609,14 @@ public final class DocumentNodeStore
if (!readOnlyMode) {
leaseUpdateThread.start();
}
-
+
+ clusterUpdateThread = new Thread(new BackgroundClusterUpdate(this,
isDisposed),
+ "DocumentNodeStore cluster update thread " +
threadNamePostfix);
+ clusterUpdateThread.setDaemon(true);
+ if (!readOnlyMode) {
+ clusterUpdateThread.start();
+ }
+
persistentCache = builder.getPersistentCache();
if (!readOnlyMode && persistentCache != null) {
DynamicBroadcastConfig broadcastConfig = new
DocumentBroadcastConfig(this);
@@ -686,6 +706,12 @@ public final class DocumentNodeStore
}
try {
+ clusterUpdateThread.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+ try {
leaseUpdateThread.join();
} catch (InterruptedException e) {
// ignore
@@ -1913,7 +1939,7 @@ public final class DocumentNodeStore
boolean updateClusterState() {
boolean hasChanged = false;
Set<Integer> clusterIds = Sets.newHashSet();
- for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(store))
{
+ for (ClusterNodeInfoDocument doc :
ClusterNodeInfoDocument.all(nonLeaseCheckingStore)) {
int cId = doc.getClusterId();
clusterIds.add(cId);
ClusterNodeInfoDocument old = clusterNodes.get(cId);
@@ -2851,7 +2877,7 @@ public final class DocumentNodeStore
/** OAK-4859 : log if time between two renewClusterIdLease calls is
too long **/
private long lastRenewClusterIdLeaseCall = -1;
-
+
BackgroundLeaseUpdate(DocumentNodeStore nodeStore,
AtomicBoolean isDisposed) {
super(nodeStore, isDisposed, Suppliers.ofInstance(1000));
@@ -2873,11 +2899,19 @@ public final class DocumentNodeStore
}
// first renew the clusterId lease
nodeStore.renewClusterIdLease();
+ }
+ }
+
+ static class BackgroundClusterUpdate extends NodeStoreTask {
- // then, independently if the lease had to be updated or not, check
- // the status:
+ BackgroundClusterUpdate(DocumentNodeStore nodeStore,
+ AtomicBoolean isDisposed) {
+ super(nodeStore, isDisposed, Suppliers.ofInstance(1000));
+ }
+
+ @Override
+ protected void execute(@Nonnull DocumentNodeStore nodeStore) {
if (nodeStore.updateClusterState()) {
- // then inform the discovery lite listener - if it is
registered
nodeStore.signalClusterStateChange();
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentLeaseUpdateRetryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentLeaseUpdateRetryTest.java?rev=1780424&r1=1780423&r2=1780424&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentLeaseUpdateRetryTest.java	(original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentLeaseUpdateRetryTest.java	Thu Jan 26 16:37:47 2017
@@ -17,8 +17,10 @@
package org.apache.jackrabbit.oak.plugins.document;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import java.util.List;
+
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.Clock.Virtual;
import org.junit.After;
@@ -31,21 +33,35 @@ public class DocumentLeaseUpdateRetryTes
private DocumentNodeStore ns;
private Virtual clock;
+ private TestStore ds;
@Before
public void setup() throws Exception {
clock = new Clock.Virtual();
ClusterNodeInfo.setClock(clock);
- ns = new DocumentMK.Builder().clock(clock).getNodeStore();
+ ds = new TestStore();
+ ns = new
DocumentMK.Builder().clock(clock).setDocumentStore(ds).setLeaseCheck(true).getNodeStore();
}
@After
public void tearDown() throws Exception {
ClusterNodeInfo.resetClockToDefault();
+ ns.dispose();
}
@Test
public void testLeaseRetryLoop() throws Exception {
+ internalTestLeaseRetryLoop(false);
+ }
+
+    @Test
+    public void testLeaseRetryLoopWithDelay() throws Exception {
+        // OAK-5446: the lease retry must still succeed while clusterNodes reads are very slow
+        final boolean delayClusterNodesReads = true;
+        internalTestLeaseRetryLoop(delayClusterNodesReads);
+    }
+
+ private void internalTestLeaseRetryLoop(boolean withDelay) throws
Exception {
ClusterNodeInfo clusterInfo = ns.getClusterInfo();
long leaseTime = clusterInfo.getLeaseTime();
long leaseEndTime1 = clusterInfo.getLeaseEndTime();
@@ -73,18 +89,45 @@ public class DocumentLeaseUpdateRetryTes
// again assert that lease is fine -> do some dummy ns call
ns.checkpoint(2);
+ if (withDelay) {
+ // mark the TestStore as delaying from now on
+ ds.setDelaying(true);
+ Thread.sleep(1200);
+ }
+
// now forward the virtual clock by more than the lease time - which
// should cause lease to time out
clock.waitUntil(clock.getTime() + leaseTime + leaseUpdateInterval +
1000);
// so the next call to the lease check wrapper should now run into the
// retry loop, as the lease has timed out
- try {
- ns.checkpoint(3);
- } catch (Exception e) {
- // it should not fail however, since we should be able to do the
- // retry
- fail("call should not have failed: " + e);
+ ns.checkpoint(3); // should not fail
+ }
+
+    private static final class TestStore extends DocumentStoreWrapper {
+        // Once delaying is on, the 4-arg query(...) on CLUSTER_NODES sleeps 10s first (simulates OAK-5446).
+        private volatile boolean delaying = false; // written by the test thread, read by the store's querying threads
+
+        TestStore() {
+            super(new MemoryDocumentStore());
+        }
+
+        void setDelaying(boolean delaying) {
+            this.delaying = delaying;
+        }
+
+        @Override
+        public <T extends Document> List<T> query(Collection<T> collection, String fromKey, String toKey, int limit) {
+            if (delaying && collection == Collection.CLUSTER_NODES) {
+                try {
+                    // make the lookup on the clusterNodes collection *really* slow
+                    Thread.sleep(10000);
+                } catch (InterruptedException e) {
+                    // restore the interrupt flag instead of swallowing it
+                    Thread.currentThread().interrupt();
+                }
+            }
+            return super.query(collection, fromKey, toKey, limit);
         }
     }
}