Author: mduerig
Date: Wed Jul 8 10:31:38 2015
New Revision: 1689834
URL: http://svn.apache.org/r1689834
Log:
OAK-2849: Improve revision gc on SegmentMK
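SegmentCompactionIT improvement: cancel all scheduled writers, readers and
references before shutting down the scheduler, and let running writers and
readers stop early once they have been cancelled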
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java?rev=1689834&r1=1689833&r2=1689834&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentCompactionIT.java
Wed Jul 8 10:31:38 2015
@@ -26,6 +26,7 @@ import static com.google.common.util.con
import static
com.google.common.util.concurrent.Futures.immediateCancelledFuture;
import static
com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.io.File.createTempFile;
+import static java.lang.Integer.MAX_VALUE;
import static java.lang.String.valueOf;
import static java.lang.System.getProperty;
import static java.util.concurrent.TimeUnit.MINUTES;
@@ -117,7 +118,7 @@ public class SegmentCompactionIT {
private final TestGCMonitor gcMonitor = new
TestGCMonitor(fileStoreGCMonitor);
private final Set<ListenableScheduledFuture<?>> writers =
newConcurrentHashSet();
private final Set<ListenableScheduledFuture<?>> readers =
newConcurrentHashSet();
- private final Set<Reference> references = newConcurrentHashSet();
+ private final Set<ListenableScheduledFuture<?>> references =
newConcurrentHashSet();
private final SegmentCompactionITMBean segmentCompactionMBean = new
SegmentCompactionITMBean();
private final CompactionStrategy compactionStrategy = new
CompactionStrategy(
false, false, CLEAN_OLD, 60000, MEMORY_THRESHOLD_DEFAULT) {
@@ -179,15 +180,11 @@ public class SegmentCompactionIT {
}
public void removeReferences(int count) {
- Iterator<Reference> it = references.iterator();
- while (it.hasNext() && count-- > 0) {
- it.next().run();
- it.remove();
- }
+ remove(references, count);
}
- private static void remove(Set<ListenableScheduledFuture<?>> futures, int
count) {
- Iterator<ListenableScheduledFuture<?>> it = futures.iterator();
+ private static void remove(Set<ListenableScheduledFuture<?>> ops, int
count) {
+ Iterator<ListenableScheduledFuture<?>> it = ops.iterator();
while (it.hasNext() && count-- > 0) {
it.next().cancel(false);
}
@@ -243,6 +240,9 @@ public class SegmentCompactionIT {
if (mBeanRegistration != null) {
mBeanRegistration.unregister();
}
+ remove(writers, MAX_VALUE);
+ remove(readers, MAX_VALUE);
+ remove(references, MAX_VALUE);
scheduler.shutdown();
if (fileStore != null) {
fileStore.close();
@@ -297,22 +297,23 @@ public class SegmentCompactionIT {
private void scheduleWriter() {
if (writers.size() < maxWriters) {
- final ListenableScheduledFuture<Void> writer = scheduler.schedule(
- new RandomWriter(rnd, nodeStore, rnd.nextInt(maxWriteOps),
"W" + rnd.nextInt(5)),
- rnd.nextInt(30), SECONDS);
- writers.add(writer);
- addCallback(writer, new FutureCallback<Void>() {
+ final RandomWriter writer = new RandomWriter(rnd, nodeStore,
rnd.nextInt(maxWriteOps), "W" + rnd.nextInt(5));
+ final ListenableScheduledFuture<Void> futureWriter =
scheduler.schedule(
+ writer, rnd.nextInt(30), SECONDS);
+ writers.add(futureWriter);
+ addCallback(futureWriter, new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
- writers.remove(writer);
- if (!writer.isCancelled()) {
+ writers.remove(futureWriter);
+ if (!futureWriter.isCancelled()) {
scheduleWriter();
}
}
@Override
public void onFailure(Throwable t) {
- writers.remove(writer);
+ writer.cancel();
+ writers.remove(futureWriter);
segmentCompactionMBean.error("Writer error", t);
}
});
@@ -321,15 +322,17 @@ public class SegmentCompactionIT {
private void scheduleReader() {
if (readers.size() < maxReaders) {
- final ListenableScheduledFuture<?> reader = rnd.nextBoolean()
- ? scheduler.schedule(new RandomNodeReader(rnd, nodeStore),
rnd.nextInt(30), SECONDS)
- : scheduler.schedule(new RandomPropertyReader(rnd, nodeStore),
rnd.nextInt(30), SECONDS);
- readers.add(reader);
- addCallback(reader, new FutureCallback<Object>() {
+ final RandomReader<?> reader = rnd.nextBoolean()
+ ? new RandomNodeReader(rnd, nodeStore)
+ : new RandomPropertyReader(rnd, nodeStore);
+ final ListenableScheduledFuture<?> futureReader =
scheduler.schedule(
+ reader, rnd.nextInt(30), SECONDS);
+ readers.add(futureReader);
+ addCallback(futureReader, new FutureCallback<Object>() {
@Override
public void onSuccess(Object node) {
- readers.remove(reader);
- if (!reader.isCancelled()) {
+ readers.remove(futureReader);
+ if (!futureReader.isCancelled()) {
if (rnd.nextBoolean()) {
scheduleReference(node);
} else {
@@ -340,7 +343,8 @@ public class SegmentCompactionIT {
@Override
public void onFailure(Throwable t) {
- readers.remove(reader);
+ reader.cancel();
+ readers.remove(futureReader);
segmentCompactionMBean.error("Node reader error", t);
}
});
@@ -350,20 +354,21 @@ public class SegmentCompactionIT {
private void scheduleReference(Object object) {
if (references.size() < maxReferences) {
final Reference reference = new Reference(object);
- final ListenableScheduledFuture<?> ref = scheduler.schedule(
+ final ListenableScheduledFuture<?> futureReference =
scheduler.schedule(
reference, rnd.nextInt(600), SECONDS);
- references.add(reference);
- addCallback(ref, new FutureCallback<Object>() {
+ references.add(futureReference);
+ addCallback(futureReference, new FutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
references.remove(reference);
- if (!ref.isCancelled()) {
+ if (!futureReference.isCancelled()) {
scheduleReader();
}
}
@Override
public void onFailure(Throwable t) {
+ reference.run();
references.remove(reference);
segmentCompactionMBean.error("Reference error", t);
}
@@ -379,6 +384,8 @@ public class SegmentCompactionIT {
private final int opCount;
private final String itemPrefix;
+ private volatile boolean cancelled;
+
RandomWriter(Random rnd, NodeStore nodeStore, int opCount, String
itemPrefix) {
this.rnd = rnd;
this.nodeStore = nodeStore;
@@ -386,13 +393,20 @@ public class SegmentCompactionIT {
this.itemPrefix = itemPrefix;
}
+
+ public void cancel() {
+ cancelled = true;
+ }
+
@Override
public Void call() throws IOException, CommitFailedException {
NodeBuilder root = nodeStore.getRoot().builder();
- for (int k = 0; k < opCount; k++) {
+ for (int k = 0; k < opCount && !cancelled; k++) {
modify(nodeStore, root);
}
- nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ if (!cancelled) {
+ nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
return null;
}
@@ -495,11 +509,17 @@ public class SegmentCompactionIT {
protected final Random rnd;
protected final NodeStore nodeStore;
+ protected volatile boolean cancelled;
+
RandomReader(Random rnd, NodeStore nodeStore) {
this.rnd = rnd;
this.nodeStore = nodeStore;
}
+ public void cancel() {
+ cancelled = true;
+ }
+
private NodeState randomStep(NodeState parent, NodeState node) {
int count = (int) node.getChildNodeCount(Long.MAX_VALUE);
int k = rnd.nextInt(count + 1);
@@ -513,7 +533,7 @@ public class SegmentCompactionIT {
protected final NodeState chooseRandomNode(NodeState parent) {
NodeState child = parent;
- for (int k = 0; k < rnd.nextInt(1000); k++) {
+ for (int k = 0; k < rnd.nextInt(1000) && !cancelled; k++) {
child = randomStep(parent, parent = child);
}
return child;