Author: thomasm
Date: Wed Apr 8 07:41:19 2020
New Revision: 1876276
URL: http://svn.apache.org/viewvc?rev=1876276&view=rev
Log:
OAK-8997 Index importer: ClusterNodeStoreLock needs a retry logic
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java?rev=1876276&r1=1876275&r2=1876276&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLock.java Wed Apr 8 07:41:19 2020 @@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugins.index.importer; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.jackrabbit.oak.api.CommitFailedException; @@ -41,6 +42,8 @@ public class ClusterNodeStoreLock implem * in between the import process which can take some time */ private static final long LOCK_TIMEOUT = TimeUnit.DAYS.toMillis(100); + // retry for at most 2 minutes + private static final long MAX_RETRY_TIME = 2 * 60 * 1000; private final Logger log = LoggerFactory.getLogger(getClass()); private final NodeStore nodeStore; private final Clock clock; @@ -56,6 +59,10 @@ public class ClusterNodeStoreLock implem @Override public ClusteredLockToken lock(String asyncIndexerLane) throws CommitFailedException { + return retryIfNeeded(() -> tryLock(asyncIndexerLane)); + } + + private ClusteredLockToken tryLock(String asyncIndexerLane) throws CommitFailedException { NodeBuilder builder = nodeStore.getRoot().builder(); NodeBuilder async = builder.child(":async"); @@ -67,7 +74,6 @@ public 
class ClusterNodeStoreLock implem "commit to fail. Such a failure should be ignored"); } - //TODO Attempt few times if merge failure due to current running indexer cycle async.setProperty(leaseName, leaseEndTime); async.setProperty(lockName(asyncIndexerLane), true); NodeStoreUtils.mergeWithConcurrentCheck(nodeStore, builder); @@ -76,9 +82,42 @@ public class ClusterNodeStoreLock implem return new ClusteredLockToken(asyncIndexerLane, leaseEndTime); } + + private <T> T retryIfNeeded(Callable<T> r) throws CommitFailedException { + // Attempt few times if merge failure due to current running indexer cycle + int backOffMaxMillis = 1; + long start = System.currentTimeMillis(); + while (true) { + try { + return r.call(); + } catch (Exception e) { + log.info("Commit failed, retrying: " + e); + long time = System.currentTimeMillis() - start; + if (time > MAX_RETRY_TIME) { + if (e instanceof CommitFailedException) { + throw (CommitFailedException) e; + } + log.error("Unexpected failure retrying", e); + throw new CommitFailedException(CommitFailedException.UNSUPPORTED, 2, e.getMessage(), e); + } + int sleep = (int) (backOffMaxMillis * Math.random()); + backOffMaxMillis *= 2; + log.info("Wait " + sleep + " ms"); + try { + Thread.sleep(sleep); + } catch (InterruptedException e1) { + // ignore + } + } + } + } @Override public void unlock(ClusteredLockToken token) throws CommitFailedException { + retryIfNeeded(() -> tryUnlock(token)); + } + + private Void tryUnlock(ClusteredLockToken token) throws CommitFailedException { String leaseName = AsyncIndexUpdate.leasify(token.laneName); NodeBuilder builder = nodeStore.getRoot().builder(); @@ -87,6 +126,7 @@ public class ClusterNodeStoreLock implem async.removeProperty(lockName(token.laneName)); NodeStoreUtils.mergeWithConcurrentCheck(nodeStore, builder); log.info("Remove the lock for async indexer lane [{}]", token.laneName); + return null; } public boolean isLocked(String asyncIndexerLane) { Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java?rev=1876276&r1=1876275&r2=1876276&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/ClusterNodeStoreLockTest.java Wed Apr 8 07:41:19 2020 @@ -19,10 +19,18 @@ package org.apache.jackrabbit.oak.plugins.index.importer; -import com.google.common.collect.ImmutableSet; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate; import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider; -import org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock.LockToken; import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider; import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; @@ -33,10 +41,7 @@ import org.apache.jackrabbit.oak.spi.sta import org.junit.Before; import org.junit.Test; -import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; -import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; -import 
static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition; -import static org.junit.Assert.*; +import com.google.common.collect.ImmutableSet; public class ClusterNodeStoreLockTest { private NodeStore store = new MemoryNodeStore(); @@ -53,6 +58,36 @@ public class ClusterNodeStoreLockTest { builder.child("testRoot").setProperty("foo", "abc"); store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); } + + @Test + public void lockConcurrently() throws Exception { + final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>()); + ArrayList<Thread> threads = new ArrayList<>(); + for (int j = 0; j < 100; j++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < 100; i++) { + try { + ClusterNodeStoreLock lock = new ClusterNodeStoreLock(store); + ClusteredLockToken token = lock.lock("async"); + lock.unlock(token); + } catch (Throwable e) { + exceptions.add(e); + } + } + } + }); + t.start(); + threads.add(t); + } + for(Thread t : threads) { + t.join(); + } + if (!exceptions.isEmpty()) { + throw new RuntimeException(exceptions.get(0)); + } + } @Test public void locking() throws Exception{