Author: chetanm
Date: Sun Nov 20 16:29:18 2016
New Revision: 1770569
URL: http://svn.apache.org/viewvc?rev=1770569&view=rev
Log:
OAK-2108 - Killing a cluster node may stop async index update for up to 30 minutes
Applying modified patch from Alex. For non-clusterable NodeStores like
SegmentNodeStore, no lease check is performed, so in case of an abrupt
shutdown indexing does not get delayed.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1770569&r1=1770568&r2=1770569&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
Sun Nov 20 16:29:18 2016
@@ -247,8 +247,11 @@ public class AsyncIndexUpdate implements
private List<ValidatorProvider> validatorProviders =
Collections.emptyList();
- /** Expiration time of the last lease we committed */
- private long lease;
+ /**
+ * Expiration time of the last lease we committed, null if lease is
+ * disabled
+ */
+ private Long lease = null;
private boolean hasLease = false;
@@ -269,19 +272,29 @@ public class AsyncIndexUpdate implements
if (hasLease) {
return;
}
- long now = System.currentTimeMillis();
- this.lease = now + 2 * leaseTimeOut;
-
NodeState root = store.getRoot();
- long beforeLease = root.getChildNode(ASYNC).getLong(leaseName);
- if (beforeLease > now) {
- throw CONCURRENT_UPDATE;
- }
+ NodeState async = root.getChildNode(ASYNC);
+ if(isLeaseCheckEnabled(leaseTimeOut)) {
+ long now = System.currentTimeMillis();
+ this.lease = now + 2 * leaseTimeOut;
+ long beforeLease = async.getLong(leaseName);
+ if (beforeLease > now) {
+ throw CONCURRENT_UPDATE;
+ }
- NodeBuilder builder = root.builder();
- NodeBuilder async = builder.child(ASYNC);
- async.setProperty(leaseName, lease);
- mergeWithConcurrencyCheck(store, validatorProviders, builder,
checkpoint, beforeLease, name);
+ NodeBuilder builder = root.builder();
+ builder.child(ASYNC).setProperty(leaseName, lease);
+ mergeWithConcurrencyCheck(store, validatorProviders, builder,
checkpoint, beforeLease, name);
+ } else {
+ lease = null;
+ // remove stale lease info if needed
+ if (async.hasProperty(leaseName)) {
+ NodeBuilder builder = root.builder();
+ builder.child(ASYNC).removeProperty(leaseName);
+ mergeWithConcurrencyCheck(store, validatorProviders,
+ builder, checkpoint, null, name);
+ }
+ }
hasLease = true;
}
@@ -330,10 +343,13 @@ public class AsyncIndexUpdate implements
}
void close() throws CommitFailedException {
- NodeBuilder builder = store.getRoot().builder();
- NodeBuilder async = builder.child(ASYNC);
- async.removeProperty(leaseName);
- mergeWithConcurrencyCheck(store, validatorProviders, builder,
async.getString(name), lease, name);
+ if (isLeaseCheckEnabled(leaseTimeOut)) {
+ NodeBuilder builder = store.getRoot().builder();
+ NodeBuilder async = builder.child(ASYNC);
+ async.removeProperty(leaseName);
+ mergeWithConcurrencyCheck(store, validatorProviders, builder,
+ async.getString(name), lease, name);
+ }
}
@Override
@@ -343,7 +359,7 @@ public class AsyncIndexUpdate implements
throw INTERRUPTED;
}
- if (indexStats.incUpdates() % 100 == 0) {
+ if (indexStats.incUpdates() % 100 == 0 &&
isLeaseCheckEnabled(leaseTimeOut)) {
long now = System.currentTimeMillis();
if (now + leaseTimeOut > lease) {
long newLease = now + 2 * leaseTimeOut;
@@ -422,17 +438,19 @@ public class AsyncIndexUpdate implements
log.debug("[{}] Running background index task", name);
NodeState root = store.getRoot();
-
- // check for concurrent updates
NodeState async = root.getChildNode(ASYNC);
- long leaseEndTime = async.getLong(leasify(name));
- long currentTime = System.currentTimeMillis();
- if (leaseEndTime > currentTime) {
- long leaseExpMsg = (leaseEndTime - currentTime) / 1000;
- String err = String.format("Another copy of the index update is
already running; skipping this update. " +
- "Time left for lease to expire %d s. Indexing can resume
by %tT", leaseExpMsg, leaseEndTime);
- indexStats.failed(new Exception(err, CONCURRENT_UPDATE));
- return;
+
+ if (isLeaseCheckEnabled(leaseTimeOut)) {
+ // check for concurrent updates
+ long leaseEndTime = async.getLong(leasify(name));
+ long currentTime = System.currentTimeMillis();
+ if (leaseEndTime > currentTime) {
+ long leaseExpMsg = (leaseEndTime - currentTime) / 1000;
+ String err = String.format("Another copy of the index update
is already running; skipping this update. " +
+ "Time left for lease to expire %d s. Indexing can
resume by %tT", leaseExpMsg, leaseEndTime);
+ indexStats.failed(new Exception(err, CONCURRENT_UPDATE));
+ return;
+ }
}
// start collecting runtime statistics
@@ -702,8 +720,12 @@ public class AsyncIndexUpdate implements
return name + "-temp";
}
+ private static boolean isLeaseCheckEnabled(long leaseTimeOut) {
+ return leaseTimeOut > 0;
+ }
+
private static void mergeWithConcurrencyCheck(final NodeStore store,
List<ValidatorProvider> validatorProviders,
- NodeBuilder builder, final
String checkpoint, final long lease,
+ NodeBuilder builder, final
String checkpoint, final Long lease,
final String name) throws
CommitFailedException {
CommitHook concurrentUpdateCheck = new CommitHook() {
@Override @Nonnull
@@ -712,9 +734,9 @@ public class AsyncIndexUpdate implements
throws CommitFailedException {
// check for concurrent updates by this async task
NodeState async = before.getChildNode(ASYNC);
- if ((checkpoint == null || Objects.equal(checkpoint,
- async.getString(name)))
- && lease == async.getLong(leasify(name))) {
+ if ((checkpoint == null || Objects.equal(checkpoint,
async.getString(name)))
+ &&
+ (lease == null || lease ==
async.getLong(leasify(name)))) {
return after;
} else {
throw CONCURRENT_UPDATE;
@@ -1217,7 +1239,7 @@ public class AsyncIndexUpdate implements
this.newIndexTaskName = newIndexTaskName;
}
- void maybeSplit(@CheckForNull String refCheckpoint, long lease)
+ void maybeSplit(@CheckForNull String refCheckpoint, Long lease)
throws CommitFailedException {
if (paths == null) {
return;
@@ -1225,7 +1247,7 @@ public class AsyncIndexUpdate implements
split(refCheckpoint, lease);
}
- private void split(@CheckForNull String refCheckpoint, long lease)
throws CommitFailedException {
+ private void split(@CheckForNull String refCheckpoint, Long lease)
throws CommitFailedException {
NodeBuilder builder = store.getRoot().builder();
if (refCheckpoint != null) {
String tempCpName = getTempCpName(name);
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java?rev=1770569&r1=1770568&r2=1770569&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
Sun Nov 20 16:29:18 2016
@@ -36,6 +36,7 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
+import org.apache.jackrabbit.oak.spi.state.Clusterable;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardIndexEditorProvider;
@@ -97,6 +98,11 @@ public class AsyncIndexerService {
long leaseTimeOutMin =
PropertiesUtil.toInteger(config.get(PROP_LEASE_TIME_OUT),
PROP_LEASE_TIMEOUT_DEFAULT);
+ if (!(nodeStore instanceof Clusterable)){
+ leaseTimeOutMin = 0;
+ log.info("Detected non clusterable setup. Lease checking would be
disabled for async indexing");
+ }
+
for (AsyncConfig c : asyncIndexerConfig) {
AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore,
indexEditorProvider,
statisticsProvider, false);
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java?rev=1770569&r1=1770568&r2=1770569&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
Sun Nov 20 16:29:18 2016
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdateTest.CommitInfoCollector;
@@ -35,12 +37,12 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
+import org.apache.jackrabbit.oak.spi.state.Clusterable;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.sling.testing.mock.osgi.MockOsgi;
import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
-import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -53,19 +55,13 @@ public class AsyncIndexerServiceTest {
@Rule
public final OsgiContext context = new OsgiContext();
- private MemoryNodeStore nodeStore = new MemoryNodeStore();
+ private MemoryNodeStore nodeStore = new FakeClusterableMemoryNodeStore();
private AsyncIndexerService service = new AsyncIndexerService();
- @Before
- public void setUp() {
- context.registerService(StatisticsProvider.class,
StatisticsProvider.NOOP);
- context.registerService(NodeStore.class, nodeStore);
- context.registerService(ValidatorProvider.class, new
ChangeCollectorProvider());
- MockOsgi.injectServices(service, context.bundleContext());
- }
@Test
public void asyncReg() throws Exception{
+ injectDefaultServices();
Map<String,Object> config = ImmutableMap.<String, Object>of(
"asyncConfigs", new String[] {"async:5"}
);
@@ -79,6 +75,7 @@ public class AsyncIndexerServiceTest {
@Test
public void leaseTimeout() throws Exception{
+ injectDefaultServices();
Map<String,Object> config = ImmutableMap.<String, Object>of(
"asyncConfigs", new String[] {"async:5"},
"leaseTimeOutMinutes" , "20"
@@ -90,6 +87,7 @@ public class AsyncIndexerServiceTest {
@Test
public void changeCollectionEnabled() throws Exception{
+ injectDefaultServices();
Map<String,Object> config = ImmutableMap.<String, Object>of(
"asyncConfigs", new String[] {"async:5"}
);
@@ -117,8 +115,18 @@ public class AsyncIndexerServiceTest {
assertNotNull(changeSet);
}
- private AsyncIndexUpdate getIndexUpdate(String name) {
- return (AsyncIndexUpdate) context.getServices(Runnable.class,
"(oak.async="+name+")")[0];
+ @Test
+ public void nonClusterableNodeStoreAndLeaseTimeout() throws Exception{
+ nodeStore = new MemoryNodeStore();
+ injectDefaultServices();
+
+ Map<String,Object> config = ImmutableMap.<String, Object>of(
+ "asyncConfigs", new String[] {"async:5"},
+ "leaseTimeOutMinutes" , "20"
+ );
+ MockOsgi.activate(service, context.bundleContext(), config);
+ AsyncIndexUpdate indexUpdate = getIndexUpdate("async");
+ assertEquals(0, indexUpdate.getLeaseTimeOut());
}
@Test
@@ -133,4 +141,23 @@ public class AsyncIndexerServiceTest {
assertEquals("foo", configs.get(1).name);
assertEquals(23, configs.get(1).timeIntervalInSecs);
}
+
+ private void injectDefaultServices() {
+ context.registerService(StatisticsProvider.class,
StatisticsProvider.NOOP);
+ context.registerService(NodeStore.class, nodeStore);
+ context.registerService(ValidatorProvider.class, new
ChangeCollectorProvider());
+ MockOsgi.injectServices(service, context.bundleContext());
+ }
+
+ private AsyncIndexUpdate getIndexUpdate(String name) {
+ return (AsyncIndexUpdate) context.getServices(Runnable.class,
"(oak.async="+name+")")[0];
+ }
+
+ private static class FakeClusterableMemoryNodeStore extends
MemoryNodeStore implements Clusterable {
+ @Nonnull
+ @Override
+ public String getInstanceId() {
+ return "foo";
+ }
+ }
}
\ No newline at end of file
Modified:
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java?rev=1770569&r1=1770568&r2=1770569&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
(original)
+++
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
Sun Nov 20 16:29:18 2016
@@ -22,6 +22,7 @@ import static org.apache.jackrabbit.oak.
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -349,6 +350,52 @@ public class AsyncIndexUpdateLeaseTest e
.setLeaseTimeOut(lease));
}
+
+ @Test
+ public void testLeaseDisabled() throws Exception {
+ // take care of initial reindex before
+ AsyncIndexUpdate async = new AsyncIndexUpdate(name, store,
provider).setLeaseTimeOut(0);
+ async.run();
+
+ testContent(store);
+ assertRunOk(async);
+
+ testContent(store);
+ assertRunOk(async);
+
+ executed.set(true);
+ }
+
+ @Test
+ public void testLeaseExpiredToDisabled() throws Exception {
+ // take care of initial reindex before
+ new AsyncIndexUpdate(name, store, provider).run();
+
+ // add extra indexed content
+ testContent(store);
+
+ // make it look like lease got stuck due to force shutdown
+ NodeBuilder builder = store.getRoot().builder();
+ builder.getChildNode(AsyncIndexUpdate.ASYNC).setProperty(
+ AsyncIndexUpdate.leasify(name),
+ System.currentTimeMillis() + 500000);
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void postIndexUpdate() {
+ executed.set(true);
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1)
+ .setLeaseTimeOut(0));
+
+ assertFalse("Stale lease info must be cleaned",
+ store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ .hasProperty(AsyncIndexUpdate.leasify(name)));
+ }
+
// -------------------------------------------------------------------
private static String getReferenceCp(NodeStore store, String name) {