Author: thomasm
Date: Wed Apr  8 07:41:19 2020
New Revision: 1876276

URL: http://svn.apache.org/viewvc?rev=1876276&view=rev
Log:
OAK-8997 Index importer: ClusterNodeStoreLock needs a retry logic

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java?rev=1876276&r1=1876275&r2=1876276&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java Wed Apr  8 07:41:19 2020
@@ -19,6 +19,7 @@
 
 package org.apache.jackrabbit.oak.plugins.index.importer;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -41,6 +42,8 @@ public class ClusterNodeStoreLock implem
      * in between the import process which can take some time
      */
     private static final long LOCK_TIMEOUT = TimeUnit.DAYS.toMillis(100);
+    // retry for at most 2 minutes
+    private static final long MAX_RETRY_TIME = 2 * 60 * 1000;
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final NodeStore nodeStore;
     private final Clock clock;
@@ -56,6 +59,10 @@ public class ClusterNodeStoreLock implem
 
     @Override
     public ClusteredLockToken lock(String asyncIndexerLane) throws 
CommitFailedException {
+        return retryIfNeeded(() -> tryLock(asyncIndexerLane));
+    }
+
+    private ClusteredLockToken tryLock(String asyncIndexerLane) throws 
CommitFailedException {
         NodeBuilder builder = nodeStore.getRoot().builder();
         NodeBuilder async = builder.child(":async");
 
@@ -67,7 +74,6 @@ public class ClusterNodeStoreLock implem
                     "commit to fail. Such a failure should be ignored");
         }
 
-        //TODO Attempt few times if merge failure due to current running 
indexer cycle
         async.setProperty(leaseName, leaseEndTime);
         async.setProperty(lockName(asyncIndexerLane), true);
         NodeStoreUtils.mergeWithConcurrentCheck(nodeStore, builder);
@@ -76,9 +82,42 @@ public class ClusterNodeStoreLock implem
 
         return new ClusteredLockToken(asyncIndexerLane, leaseEndTime);
     }
+    
+    private <T> T retryIfNeeded(Callable<T> r) throws CommitFailedException {
+        // Attempt few times if merge failure due to current running indexer 
cycle
+        int backOffMaxMillis = 1;
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                return r.call();
+            } catch (Exception e) {
+                log.info("Commit failed, retrying: " + e);
+                long time = System.currentTimeMillis() - start;
+                if (time > MAX_RETRY_TIME) {
+                    if (e instanceof CommitFailedException) {
+                        throw (CommitFailedException) e;
+                    }
+                    log.error("Unexpected failure retrying", e);
+                    throw new 
CommitFailedException(CommitFailedException.UNSUPPORTED, 2, e.getMessage(), e);
+                }
+                int sleep = (int) (backOffMaxMillis * Math.random());
+                backOffMaxMillis *= 2;
+                log.info("Wait " + sleep + " ms");
+                try {
+                    Thread.sleep(sleep);
+                } catch (InterruptedException e1) {
+                    // ignore
+                }
+            }
+        }
+    }
 
     @Override
     public void unlock(ClusteredLockToken token) throws CommitFailedException {
+        retryIfNeeded(() -> tryUnlock(token));        
+    }
+    
+    private Void tryUnlock(ClusteredLockToken token) throws 
CommitFailedException {
         String leaseName = AsyncIndexUpdate.leasify(token.laneName);
 
         NodeBuilder builder = nodeStore.getRoot().builder();
@@ -87,6 +126,7 @@ public class ClusterNodeStoreLock implem
         async.removeProperty(lockName(token.laneName));
         NodeStoreUtils.mergeWithConcurrentCheck(nodeStore, builder);
         log.info("Remove the lock for async indexer lane [{}]", 
token.laneName);
+        return null;
     }
 
     public boolean isLocked(String asyncIndexerLane) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java?rev=1876276&r1=1876275&r2=1876276&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java Wed Apr  8 07:41:19 2020
@@ -19,10 +19,18 @@
 
 package org.apache.jackrabbit.oak.plugins.index.importer;
 
-import com.google.common.collect.ImmutableSet;
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
-import 
org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock.LockToken;
 import 
org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -33,10 +41,7 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.junit.Before;
 import org.junit.Test;
 
-import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
-import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
-import static 
org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
-import static org.junit.Assert.*;
+import com.google.common.collect.ImmutableSet;
 
 public class ClusterNodeStoreLockTest {
     private NodeStore store = new MemoryNodeStore();
@@ -53,6 +58,36 @@ public class ClusterNodeStoreLockTest {
         builder.child("testRoot").setProperty("foo", "abc");
         store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
     }
+    
+    @Test
+    public void lockConcurrently() throws Exception {
+        final List<Throwable> exceptions = Collections.synchronizedList(new 
ArrayList<>());
+        ArrayList<Thread> threads = new ArrayList<>();
+        for (int j = 0; j < 100; j++) {
+            Thread t = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    for (int i = 0; i < 100; i++) {
+                        try {
+                            ClusterNodeStoreLock lock = new 
ClusterNodeStoreLock(store);
+                            ClusteredLockToken token = lock.lock("async");
+                            lock.unlock(token);
+                        } catch (Throwable e) {
+                            exceptions.add(e);
+                        }
+                    }
+                }
+            });
+            t.start();
+            threads.add(t);
+        }
+        for(Thread t : threads) {
+            t.join();
+        }
+        if (!exceptions.isEmpty()) {
+            throw new RuntimeException(exceptions.get(0));
+        }
+    }
 
     @Test
     public void locking() throws Exception{


Reply via email to