Author: mduerig
Date: Wed Jul  8 10:31:38 2015
New Revision: 1689834

URL: http://svn.apache.org/r1689834
Log:
OAK-2849: Improve revision gc on SegmentMK
SegmentCompactionIT improvement: immediately stop operations

Modified:
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java?rev=1689834&r1=1689833&r2=1689834&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
 Wed Jul  8 10:31:38 2015
@@ -26,6 +26,7 @@ import static com.google.common.util.con
 import static 
com.google.common.util.concurrent.Futures.immediateCancelledFuture;
 import static 
com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
 import static java.io.File.createTempFile;
+import static java.lang.Integer.MAX_VALUE;
 import static java.lang.String.valueOf;
 import static java.lang.System.getProperty;
 import static java.util.concurrent.TimeUnit.MINUTES;
@@ -117,7 +118,7 @@ public class SegmentCompactionIT {
     private final TestGCMonitor gcMonitor = new 
TestGCMonitor(fileStoreGCMonitor);
     private final Set<ListenableScheduledFuture<?>> writers = 
newConcurrentHashSet();
     private final Set<ListenableScheduledFuture<?>> readers = 
newConcurrentHashSet();
-    private final Set<Reference> references = newConcurrentHashSet();
+    private final Set<ListenableScheduledFuture<?>> references = 
newConcurrentHashSet();
     private final SegmentCompactionITMBean segmentCompactionMBean = new 
SegmentCompactionITMBean();
     private final CompactionStrategy compactionStrategy = new 
CompactionStrategy(
             false, false, CLEAN_OLD, 60000, MEMORY_THRESHOLD_DEFAULT) {
@@ -179,15 +180,11 @@ public class SegmentCompactionIT {
     }
 
     public void removeReferences(int count) {
-        Iterator<Reference> it = references.iterator();
-        while (it.hasNext() && count-- > 0) {
-            it.next().run();
-            it.remove();
-        }
+        remove(references, count);
     }
 
-    private static void remove(Set<ListenableScheduledFuture<?>> futures, int 
count) {
-        Iterator<ListenableScheduledFuture<?>> it = futures.iterator();
+    private static void remove(Set<ListenableScheduledFuture<?>> ops, int 
count) {
+        Iterator<ListenableScheduledFuture<?>> it = ops.iterator();
         while (it.hasNext() && count-- > 0) {
             it.next().cancel(false);
         }
@@ -243,6 +240,9 @@ public class SegmentCompactionIT {
             if (mBeanRegistration != null) {
                 mBeanRegistration.unregister();
             }
+            remove(writers, MAX_VALUE);
+            remove(readers, MAX_VALUE);
+            remove(references, MAX_VALUE);
             scheduler.shutdown();
             if (fileStore != null) {
                 fileStore.close();
@@ -297,22 +297,23 @@ public class SegmentCompactionIT {
 
     private void scheduleWriter() {
         if (writers.size() < maxWriters) {
-            final ListenableScheduledFuture<Void> writer = scheduler.schedule(
-                    new RandomWriter(rnd, nodeStore, rnd.nextInt(maxWriteOps), 
"W" + rnd.nextInt(5)),
-                    rnd.nextInt(30), SECONDS);
-            writers.add(writer);
-            addCallback(writer, new FutureCallback<Void>() {
+            final RandomWriter writer = new RandomWriter(rnd, nodeStore, 
rnd.nextInt(maxWriteOps), "W" + rnd.nextInt(5));
+            final ListenableScheduledFuture<Void> futureWriter = 
scheduler.schedule(
+                    writer, rnd.nextInt(30), SECONDS);
+            writers.add(futureWriter);
+            addCallback(futureWriter, new FutureCallback<Void>() {
                 @Override
                 public void onSuccess(Void result) {
-                    writers.remove(writer);
-                    if (!writer.isCancelled()) {
+                    writers.remove(futureWriter);
+                    if (!futureWriter.isCancelled()) {
                         scheduleWriter();
                     }
                 }
 
                 @Override
                 public void onFailure(Throwable t) {
-                    writers.remove(writer);
+                    writer.cancel();
+                    writers.remove(futureWriter);
                     segmentCompactionMBean.error("Writer error", t);
                 }
             });
@@ -321,15 +322,17 @@ public class SegmentCompactionIT {
 
     private void scheduleReader() {
         if (readers.size() < maxReaders) {
-            final ListenableScheduledFuture<?> reader = rnd.nextBoolean()
-                ? scheduler.schedule(new RandomNodeReader(rnd, nodeStore), 
rnd.nextInt(30), SECONDS)
-                : scheduler.schedule(new RandomPropertyReader(rnd, nodeStore), 
rnd.nextInt(30), SECONDS);
-            readers.add(reader);
-            addCallback(reader, new FutureCallback<Object>() {
+            final RandomReader<?> reader = rnd.nextBoolean()
+                ? new RandomNodeReader(rnd, nodeStore)
+                : new RandomPropertyReader(rnd, nodeStore);
+            final ListenableScheduledFuture<?> futureReader = 
scheduler.schedule(
+                    reader, rnd.nextInt(30), SECONDS);
+            readers.add(futureReader);
+            addCallback(futureReader, new FutureCallback<Object>() {
                 @Override
                 public void onSuccess(Object node) {
-                    readers.remove(reader);
-                    if (!reader.isCancelled()) {
+                    readers.remove(futureReader);
+                    if (!futureReader.isCancelled()) {
                         if (rnd.nextBoolean()) {
                             scheduleReference(node);
                         } else {
@@ -340,7 +343,8 @@ public class SegmentCompactionIT {
 
                 @Override
                 public void onFailure(Throwable t) {
-                    readers.remove(reader);
+                    reader.cancel();
+                    readers.remove(futureReader);
                     segmentCompactionMBean.error("Node reader error", t);
                 }
             });
@@ -350,20 +354,21 @@ public class SegmentCompactionIT {
     private void scheduleReference(Object object) {
         if (references.size() < maxReferences) {
             final Reference reference = new Reference(object);
-            final ListenableScheduledFuture<?> ref = scheduler.schedule(
+            final ListenableScheduledFuture<?> futureReference = 
scheduler.schedule(
                     reference, rnd.nextInt(600), SECONDS);
-            references.add(reference);
-            addCallback(ref, new FutureCallback<Object>() {
+            references.add(futureReference);
+            addCallback(futureReference, new FutureCallback<Object>() {
                 @Override
                 public void onSuccess(Object result) {
                     references.remove(reference);
-                    if (!ref.isCancelled()) {
+                    if (!futureReference.isCancelled()) {
                         scheduleReader();
                     }
                 }
 
                 @Override
                 public void onFailure(Throwable t) {
+                    reference.run();
                     references.remove(reference);
                     segmentCompactionMBean.error("Reference error", t);
                 }
@@ -379,6 +384,8 @@ public class SegmentCompactionIT {
         private final int opCount;
         private final String itemPrefix;
 
+        private volatile boolean cancelled;
+
         RandomWriter(Random rnd, NodeStore nodeStore, int opCount, String 
itemPrefix) {
             this.rnd = rnd;
             this.nodeStore = nodeStore;
@@ -386,13 +393,20 @@ public class SegmentCompactionIT {
             this.itemPrefix = itemPrefix;
         }
 
+
+        public void cancel() {
+            cancelled = true;
+        }
+
         @Override
         public Void call() throws IOException, CommitFailedException {
             NodeBuilder root = nodeStore.getRoot().builder();
-            for (int k = 0; k < opCount; k++) {
+            for (int k = 0; k < opCount && !cancelled; k++) {
                 modify(nodeStore, root);
             }
-            nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            if (!cancelled) {
+                nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            }
             return null;
         }
 
@@ -495,11 +509,17 @@ public class SegmentCompactionIT {
         protected final Random rnd;
         protected final NodeStore nodeStore;
 
+        protected volatile boolean cancelled;
+
         RandomReader(Random rnd, NodeStore nodeStore) {
             this.rnd = rnd;
             this.nodeStore = nodeStore;
         }
 
+        public void cancel() {
+            cancelled = true;
+        }
+
         private NodeState randomStep(NodeState parent, NodeState node) {
             int count = (int) node.getChildNodeCount(Long.MAX_VALUE);
             int k = rnd.nextInt(count + 1);
@@ -513,7 +533,7 @@ public class SegmentCompactionIT {
 
         protected final NodeState chooseRandomNode(NodeState parent) {
             NodeState child = parent;
-            for (int k = 0; k < rnd.nextInt(1000); k++) {
+            for (int k = 0; k < rnd.nextInt(1000) && !cancelled; k++) {
                 child = randomStep(parent, parent = child);
             }
             return child;


Reply via email to