Author: mduerig Date: Mon Sep 7 15:21:30 2015 New Revision: 1701635 URL: http://svn.apache.org/r1701635 Log: OAK-2849: Improve revision gc on SegmentMK Add option to run compaction exclusively wrt. concurrent writers
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java?rev=1701635&r1=1701634&r2=1701635&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java Mon Sep 7 15:21:30 2015 @@ -56,6 +56,8 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.InstanceAlreadyExistsException; import javax.management.MBeanRegistrationException; @@ -139,6 +141,7 @@ public class SegmentCompactionIT { private Registration mBeanRegistration; private volatile ListenableFuture<?> compactor = immediateCancelledFuture(); + private volatile ReadWriteLock compactionLock = null; private volatile int lockWaitTime = 60; private volatile int maxReaders = 10; private volatile int maxWriters = 10; @@ -428,23 +431,42 @@ public class SegmentCompactionIT { cancelled = true; } - @Override - public Void call() throws IOException, CommitFailedException { - NodeBuilder root = nodeStore.getRoot().builder(); - for (int k = 0; k < opCount && !cancelled; k++) { - modify(nodeStore, root); - } - if (!cancelled) { + private <T> T run(Callable<T> thunk) throws 
Exception { + ReadWriteLock lock = compactionLock; + if (lock != null) { + lock.readLock().lock(); try { - CommitHook commitHook = rnd.nextBoolean() - ? new CompositeHook(new ConflictHook(DefaultConflictHandler.OURS)) - : new CompositeHook(new ConflictHook(DefaultConflictHandler.THEIRS)); - nodeStore.merge(root, commitHook, CommitInfo.EMPTY); - } catch (CommitFailedException e) { - LOG.warn("Commit failed: {}", e.getMessage()); + return thunk.call(); + } finally { + lock.readLock().unlock(); } + } else { + return thunk.call(); } - return null; + } + + @Override + public Void call() throws Exception { + return run(new Callable<Void>() { + @Override + public Void call() throws Exception { + NodeBuilder root = nodeStore.getRoot().builder(); + for (int k = 0; k < opCount && !cancelled; k++) { + modify(nodeStore, root); + } + if (!cancelled) { + try { + CommitHook commitHook = rnd.nextBoolean() + ? new CompositeHook(new ConflictHook(DefaultConflictHandler.OURS)) + : new CompositeHook(new ConflictHook(DefaultConflictHandler.THEIRS)); + nodeStore.merge(root, commitHook, CommitInfo.EMPTY); + } catch (CommitFailedException e) { + LOG.warn("Commit failed: {}", e.getMessage()); + } + } + return null; + } + }); } private void modify(NodeStore nodeStore, NodeBuilder nodeBuilder) throws IOException { @@ -621,7 +643,7 @@ public class SegmentCompactionIT { } } - private static class Compactor implements Runnable { + private class Compactor implements Runnable { private final FileStore fileStore; private final TestGCMonitor gcMonitor; @@ -630,12 +652,36 @@ public class SegmentCompactionIT { this.gcMonitor = gcMonitor; } + private <T> T run(Callable<T> thunk) throws Exception { + ReadWriteLock lock = compactionLock; + if (lock != null) { + lock.writeLock().lock(); + try { + return thunk.call(); + } finally { + lock.writeLock().unlock(); + } + } else { + return thunk.call(); + } + } + @Override public void run() { if (gcMonitor.isCleaned()) { LOG.info("Running compaction"); - 
gcMonitor.resetCleaned(); - fileStore.maybeCompact(true); + try { + run(new Callable<Void>() { + @Override + public Void call() throws Exception { + gcMonitor.resetCleaned(); + fileStore.maybeCompact(true); + return null; + } + }); + } catch (Exception e) { + LOG.error("Error while running compaction", e); + } } else { LOG.info("Not running compaction as no cleanup has taken place"); } @@ -742,6 +788,20 @@ public class SegmentCompactionIT { } @Override + public void setUseCompactionLock(boolean value) { + if (value && compactionLock == null) { + compactionLock = new ReentrantReadWriteLock(); + } else { + compactionLock = null; + } + } + + @Override + public boolean getUseCompactionLock() { + return compactionLock != null; + } + + @Override public void setLockWaitTime(int seconds) { lockWaitTime = seconds; } Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java?rev=1701635&r1=1701634&r2=1701635&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionMBean.java Mon Sep 7 15:21:30 2015 @@ -60,6 +60,17 @@ public interface SegmentCompactionMBean String getLastCompaction(); /** + * Determine whether compaction should run exclusively wrt. concurrent writers. + * @param value run compaction exclusively iff {@code true} + */ + void setUseCompactionLock(boolean value); + + /** + * @return Compaction runs exclusively wrt. concurrent writers iff {@code true} + */ + boolean getUseCompactionLock(); + + /** * Time to wait for the commit lock for committing the compacted head.
* @param seconds number of seconds to wait * @see SegmentNodeStore#locked(java.util.concurrent.Callable, long, java.util.concurrent.TimeUnit)