Author: chetanm
Date: Sun Nov 20 16:29:18 2016
New Revision: 1770569

URL: http://svn.apache.org/viewvc?rev=1770569&view=rev
Log:
OAK-2108 - Killing a cluster node may stop async index update for up to 30 minutes

Applying modified patch from Alex. For non-clusterable NodeStores like
SegmentNodeStore no lease check is performed, so in case of an abrupt
shutdown indexing does not get delayed.

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
    
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1770569&r1=1770568&r2=1770569&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
 Sun Nov 20 16:29:18 2016
@@ -247,8 +247,11 @@ public class AsyncIndexUpdate implements
 
         private List<ValidatorProvider> validatorProviders = 
Collections.emptyList();
 
-        /** Expiration time of the last lease we committed */
-        private long lease;
+        /**
+         * Expiration time of the last lease we committed, null if lease is
+         * disabled
+         */
+        private Long lease = null;
 
         private boolean hasLease = false;
 
@@ -269,19 +272,29 @@ public class AsyncIndexUpdate implements
             if (hasLease) {
                 return;
             }
-            long now = System.currentTimeMillis();
-            this.lease = now + 2 * leaseTimeOut;
-
             NodeState root = store.getRoot();
-            long beforeLease = root.getChildNode(ASYNC).getLong(leaseName);
-            if (beforeLease > now) {
-                throw CONCURRENT_UPDATE;
-            }
+            NodeState async = root.getChildNode(ASYNC);
+            if(isLeaseCheckEnabled(leaseTimeOut)) {
+                long now = System.currentTimeMillis();
+                this.lease = now + 2 * leaseTimeOut;
+                long beforeLease = async.getLong(leaseName);
+                if (beforeLease > now) {
+                    throw CONCURRENT_UPDATE;
+                }
 
-            NodeBuilder builder = root.builder();
-            NodeBuilder async = builder.child(ASYNC);
-            async.setProperty(leaseName, lease);
-            mergeWithConcurrencyCheck(store, validatorProviders, builder, 
checkpoint, beforeLease, name);
+                NodeBuilder builder = root.builder();
+                builder.child(ASYNC).setProperty(leaseName, lease);
+                mergeWithConcurrencyCheck(store, validatorProviders, builder, 
checkpoint, beforeLease, name);
+            } else {
+                lease = null;
+                // remove stale lease info if needed
+                if (async.hasProperty(leaseName)) {
+                    NodeBuilder builder = root.builder();
+                    builder.child(ASYNC).removeProperty(leaseName);
+                    mergeWithConcurrencyCheck(store, validatorProviders,
+                            builder, checkpoint, null, name);
+                }
+            }
             hasLease = true;
         }
 
@@ -330,10 +343,13 @@ public class AsyncIndexUpdate implements
         }
 
         void close() throws CommitFailedException {
-            NodeBuilder builder = store.getRoot().builder();
-            NodeBuilder async = builder.child(ASYNC);
-            async.removeProperty(leaseName);
-            mergeWithConcurrencyCheck(store, validatorProviders, builder, 
async.getString(name), lease, name);
+            if (isLeaseCheckEnabled(leaseTimeOut)) {
+                NodeBuilder builder = store.getRoot().builder();
+                NodeBuilder async = builder.child(ASYNC);
+                async.removeProperty(leaseName);
+                mergeWithConcurrencyCheck(store, validatorProviders, builder,
+                        async.getString(name), lease, name);
+            }
         }
 
         @Override
@@ -343,7 +359,7 @@ public class AsyncIndexUpdate implements
                 throw INTERRUPTED;
             }
 
-            if (indexStats.incUpdates() % 100 == 0) {
+            if (indexStats.incUpdates() % 100 == 0 && 
isLeaseCheckEnabled(leaseTimeOut)) {
                 long now = System.currentTimeMillis();
                 if (now + leaseTimeOut > lease) {
                     long newLease = now + 2 * leaseTimeOut;
@@ -422,17 +438,19 @@ public class AsyncIndexUpdate implements
         log.debug("[{}] Running background index task", name);
 
         NodeState root = store.getRoot();
-
-        // check for concurrent updates
         NodeState async = root.getChildNode(ASYNC);
-        long leaseEndTime = async.getLong(leasify(name));
-        long currentTime = System.currentTimeMillis();
-        if (leaseEndTime > currentTime) {
-            long leaseExpMsg = (leaseEndTime - currentTime) / 1000;
-            String err = String.format("Another copy of the index update is 
already running; skipping this update. " +
-                    "Time left for lease to expire %d s. Indexing can resume 
by %tT", leaseExpMsg, leaseEndTime);
-            indexStats.failed(new Exception(err, CONCURRENT_UPDATE));
-            return;
+
+        if (isLeaseCheckEnabled(leaseTimeOut)) {
+            // check for concurrent updates
+            long leaseEndTime = async.getLong(leasify(name));
+            long currentTime = System.currentTimeMillis();
+            if (leaseEndTime > currentTime) {
+                long leaseExpMsg = (leaseEndTime - currentTime) / 1000;
+                String err = String.format("Another copy of the index update 
is already running; skipping this update. " +
+                        "Time left for lease to expire %d s. Indexing can 
resume by %tT", leaseExpMsg, leaseEndTime);
+                indexStats.failed(new Exception(err, CONCURRENT_UPDATE));
+                return;
+            }
         }
 
         // start collecting runtime statistics
@@ -702,8 +720,12 @@ public class AsyncIndexUpdate implements
         return name + "-temp";
     }
 
+    private static boolean isLeaseCheckEnabled(long leaseTimeOut) {
+        return leaseTimeOut > 0;
+    }
+
     private static void mergeWithConcurrencyCheck(final NodeStore store, 
List<ValidatorProvider> validatorProviders,
-                                                  NodeBuilder builder, final 
String checkpoint, final long lease,
+                                                  NodeBuilder builder, final 
String checkpoint, final Long lease,
                                                   final String name) throws 
CommitFailedException {
         CommitHook concurrentUpdateCheck = new CommitHook() {
             @Override @Nonnull
@@ -712,9 +734,9 @@ public class AsyncIndexUpdate implements
                     throws CommitFailedException {
                 // check for concurrent updates by this async task
                 NodeState async = before.getChildNode(ASYNC);
-                if ((checkpoint == null || Objects.equal(checkpoint,
-                        async.getString(name)))
-                        && lease == async.getLong(leasify(name))) {
+                if ((checkpoint == null || Objects.equal(checkpoint, 
async.getString(name)))
+                    &&
+                    (lease == null      || lease == 
async.getLong(leasify(name)))) {
                     return after;
                 } else {
                     throw CONCURRENT_UPDATE;
@@ -1217,7 +1239,7 @@ public class AsyncIndexUpdate implements
             this.newIndexTaskName = newIndexTaskName;
         }
 
-        void maybeSplit(@CheckForNull String refCheckpoint, long lease)
+        void maybeSplit(@CheckForNull String refCheckpoint, Long lease)
                 throws CommitFailedException {
             if (paths == null) {
                 return;
@@ -1225,7 +1247,7 @@ public class AsyncIndexUpdate implements
             split(refCheckpoint, lease);
         }
 
-        private void split(@CheckForNull String refCheckpoint, long lease) 
throws CommitFailedException {
+        private void split(@CheckForNull String refCheckpoint, Long lease) 
throws CommitFailedException {
             NodeBuilder builder = store.getRoot().builder();
             if (refCheckpoint != null) {
                 String tempCpName = getTempCpName(name);

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java?rev=1770569&r1=1770568&r2=1770569&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
 Sun Nov 20 16:29:18 2016
@@ -36,6 +36,7 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
 import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
+import org.apache.jackrabbit.oak.spi.state.Clusterable;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardIndexEditorProvider;
@@ -97,6 +98,11 @@ public class AsyncIndexerService {
 
         long leaseTimeOutMin = 
PropertiesUtil.toInteger(config.get(PROP_LEASE_TIME_OUT), 
PROP_LEASE_TIMEOUT_DEFAULT);
 
+        if (!(nodeStore instanceof Clusterable)){
+            leaseTimeOutMin = 0;
+            log.info("Detected non clusterable setup. Lease checking would be 
disabled for async indexing");
+        }
+
         for (AsyncConfig c : asyncIndexerConfig) {
             AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore, 
indexEditorProvider,
                     statisticsProvider, false);

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java?rev=1770569&r1=1770568&r2=1770569&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
 Sun Nov 20 16:29:18 2016
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nonnull;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import 
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdateTest.CommitInfoCollector;
@@ -35,12 +37,12 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
+import org.apache.jackrabbit.oak.spi.state.Clusterable;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -53,19 +55,13 @@ public class AsyncIndexerServiceTest {
     @Rule
     public final OsgiContext context = new OsgiContext();
 
-    private MemoryNodeStore nodeStore = new MemoryNodeStore();
+    private MemoryNodeStore nodeStore = new FakeClusterableMemoryNodeStore();
     private AsyncIndexerService service = new AsyncIndexerService();
 
-    @Before
-    public void setUp() {
-        context.registerService(StatisticsProvider.class, 
StatisticsProvider.NOOP);
-        context.registerService(NodeStore.class, nodeStore);
-        context.registerService(ValidatorProvider.class, new 
ChangeCollectorProvider());
-        MockOsgi.injectServices(service, context.bundleContext());
-    }
 
     @Test
     public void asyncReg() throws Exception{
+        injectDefaultServices();
         Map<String,Object> config = ImmutableMap.<String, Object>of(
                 "asyncConfigs", new String[] {"async:5"}
         );
@@ -79,6 +75,7 @@ public class AsyncIndexerServiceTest {
 
     @Test
     public void leaseTimeout() throws Exception{
+        injectDefaultServices();
         Map<String,Object> config = ImmutableMap.<String, Object>of(
                 "asyncConfigs", new String[] {"async:5"},
                 "leaseTimeOutMinutes" , "20"
@@ -90,6 +87,7 @@ public class AsyncIndexerServiceTest {
 
     @Test
     public void changeCollectionEnabled() throws Exception{
+        injectDefaultServices();
         Map<String,Object> config = ImmutableMap.<String, Object>of(
                 "asyncConfigs", new String[] {"async:5"}
         );
@@ -117,8 +115,18 @@ public class AsyncIndexerServiceTest {
         assertNotNull(changeSet);
     }
 
-    private AsyncIndexUpdate getIndexUpdate(String name) {
-        return (AsyncIndexUpdate) context.getServices(Runnable.class, 
"(oak.async="+name+")")[0];
+    @Test
+    public void nonClusterableNodeStoreAndLeaseTimeout() throws Exception{
+        nodeStore = new MemoryNodeStore();
+        injectDefaultServices();
+
+        Map<String,Object> config = ImmutableMap.<String, Object>of(
+                "asyncConfigs", new String[] {"async:5"},
+                "leaseTimeOutMinutes" , "20"
+        );
+        MockOsgi.activate(service, context.bundleContext(), config);
+        AsyncIndexUpdate indexUpdate = getIndexUpdate("async");
+        assertEquals(0, indexUpdate.getLeaseTimeOut());
     }
 
     @Test
@@ -133,4 +141,23 @@ public class AsyncIndexerServiceTest {
         assertEquals("foo", configs.get(1).name);
         assertEquals(23, configs.get(1).timeIntervalInSecs);
     }
+
+    private void injectDefaultServices() {
+        context.registerService(StatisticsProvider.class, 
StatisticsProvider.NOOP);
+        context.registerService(NodeStore.class, nodeStore);
+        context.registerService(ValidatorProvider.class, new 
ChangeCollectorProvider());
+        MockOsgi.injectServices(service, context.bundleContext());
+    }
+
+    private AsyncIndexUpdate getIndexUpdate(String name) {
+        return (AsyncIndexUpdate) context.getServices(Runnable.class, 
"(oak.async="+name+")")[0];
+    }
+
+    private static class FakeClusterableMemoryNodeStore extends 
MemoryNodeStore implements Clusterable {
+        @Nonnull
+        @Override
+        public String getInstanceId() {
+            return "foo";
+        }
+    }
 }
\ No newline at end of file

Modified: 
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java?rev=1770569&r1=1770568&r2=1770569&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
 Sun Nov 20 16:29:18 2016
@@ -22,6 +22,7 @@ import static org.apache.jackrabbit.oak.
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -349,6 +350,52 @@ public class AsyncIndexUpdateLeaseTest e
                 .setLeaseTimeOut(lease));
     }
 
+
+    @Test
+    public void testLeaseDisabled() throws Exception {
+        // take care of initial reindex before
+        AsyncIndexUpdate async = new AsyncIndexUpdate(name, store, 
provider).setLeaseTimeOut(0);
+        async.run();
+
+        testContent(store);
+        assertRunOk(async);
+
+        testContent(store);
+        assertRunOk(async);
+
+        executed.set(true);
+    }
+
+    @Test
+    public void testLeaseExpiredToDisabled() throws Exception {
+        // take care of initial reindex before
+        new AsyncIndexUpdate(name, store, provider).run();
+
+        // add extra indexed content
+        testContent(store);
+
+        // make it look like lease got stuck due to force shutdown
+        NodeBuilder builder = store.getRoot().builder();
+        builder.getChildNode(AsyncIndexUpdate.ASYNC).setProperty(
+                AsyncIndexUpdate.leasify(name),
+                System.currentTimeMillis() + 500000);
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void postIndexUpdate() {
+                executed.set(true);
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1)
+                .setLeaseTimeOut(0));
+
+        assertFalse("Stale lease info must be cleaned",
+                store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+                        .hasProperty(AsyncIndexUpdate.leasify(name)));
+    }
+
     // -------------------------------------------------------------------
 
     private static String getReferenceCp(NodeStore store, String name) {


Reply via email to