Author: alexparvulescu Date: Fri Jun 3 08:27:59 2016 New Revision: 1746686
URL: http://svn.apache.org/viewvc?rev=1746686&view=rev Log: OAK-4279 Rework offline compaction - introduced flags to control binary content based de-duplication Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java?rev=1746686&r1=1746685&r2=1746686&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java Fri Jun 3 08:27:59 2016 @@ -36,6 +36,7 @@ import org.apache.jackrabbit.oak.commons import org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState; import org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState; import org.apache.jackrabbit.oak.plugins.memory.PropertyStates; +import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.state.ApplyDiff; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -70,6 +71,17 @@ public class Compactor { private final ProgressTracker progress = new ProgressTracker(); /** + * Enables content based de-duplication of binaries. Involves a fair amount + * of I/O when reading/comparing potentially equal blobs. + */ + private final boolean binaryDedup; + + /** + * Set the upper bound for the content based de-duplication checks. + */ + private final long binaryDedupMaxSize; + + /** * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted blob * record identifiers. Used to de-duplicate copies of the same binary * values. @@ -109,11 +121,13 @@ public class Compactor { cacheSize).get(); public Compactor(SegmentReader reader, SegmentWriter writer, - BlobStore blobStore, Supplier<Boolean> cancel) { + BlobStore blobStore, Supplier<Boolean> cancel, SegmentGCOptions gc) { this.reader = reader; this.writer = writer; this.blobStore = blobStore; this.cancel = cancel; + this.binaryDedup = gc.isBinaryDeduplication(); + this.binaryDedupMaxSize = gc.getBinaryDeduplicationMaxSize(); } private SegmentNodeBuilder process(NodeState before, NodeState after, @@ -311,6 +325,8 @@ public class Compactor { try { // Check if we've already cloned this specific record RecordId id = sb.getRecordId(); + + // TODO verify binary impact on cache RecordId compactedId = cache.get(id); if (compactedId != null) { return new SegmentBlob(blobStore, compactedId); @@ -331,27 +347,37 @@ public class Compactor { return clone; } - // alternatively look if the exact same binary has been cloned - String key = getBlobKey(blob); - List<RecordId> ids = binaries.get(key); - if (ids != null) { - for (RecordId duplicateId : ids) { - if (new SegmentBlob(blobStore, duplicateId).equals(sb)) { - cache.put(id, duplicateId); - return new SegmentBlob(blobStore, duplicateId); + List<RecordId> ids = null; + String key = null; + boolean dedup = binaryDedup + && blob.length() <= binaryDedupMaxSize; + if (dedup) { + // alternatively look if the exact same binary has been + // cloned + key = getBlobKey(blob); + ids = binaries.get(key); + if (ids != null) { + for (RecordId duplicateId : ids) { + if (new SegmentBlob(blobStore, duplicateId) + .equals(sb)) { + cache.put(id, duplicateId); + return new SegmentBlob(blobStore, duplicateId); + } } } } // if not, clone the large blob and keep track of the result sb = writer.writeBlob(blob); - cache.put(id, sb.getRecordId()); - if (ids == null) { - ids = newArrayList(); - binaries.put(key, ids); + + if (dedup) { + if (ids == null) { + ids = newArrayList(); + binaries.put(key, ids); + } + ids.add(sb.getRecordId()); } - ids.add(sb.getRecordId()); return sb; } catch (IOException e) { Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java?rev=1746686&r1=1746685&r2=1746686&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java Fri Jun 3 08:27:59 2016 @@ -84,6 +84,13 @@ public class SegmentGCOptions { private boolean offline = false; + private boolean ocBinDeduplication = Boolean + .getBoolean("oak.segment.compaction.binaryDeduplication"); + + private long ocBinMaxSize = Long.getLong( + "oak.segment.compaction.binaryDeduplicationMaxSize", + 100 * 1024 * 1024); + public SegmentGCOptions(boolean paused, int memoryThreshold, int gainThreshold, int retryCount, boolean forceAfterFail, int lockWaitTime) { this.paused = paused; @@ -240,15 +247,22 @@ public class SegmentGCOptions { @Override public String toString() { - return getClass().getSimpleName() + "{" + - "paused=" + paused + - ", memoryThreshold=" + memoryThreshold + - ", gainThreshold=" + gainThreshold + - ", retryCount=" + retryCount + - ", forceAfterFail=" + forceAfterFail + - ", lockWaitTime=" + lockWaitTime + - ", retainedGenerations=" + retainedGenerations + - ", offline=" + offline + "}"; + if (offline) { + return getClass().getSimpleName() + "{" + + "offline=" + offline + + ", retainedGenerations=" + retainedGenerations + + ", ocBinDeduplication=" + ocBinDeduplication + + ", ocBinMaxSize=" + ocBinMaxSize + "}"; + } else { + return getClass().getSimpleName() + "{" + + "paused=" + paused + + ", memoryThreshold=" + memoryThreshold + + ", gainThreshold=" + gainThreshold + + ", retryCount=" + retryCount + + ", forceAfterFail=" + forceAfterFail + + ", lockWaitTime=" + lockWaitTime + + ", retainedGenerations=" + retainedGenerations + "}"; + } } /** @@ -279,4 +293,35 @@ public class SegmentGCOptions { this.retainedGenerations = 1; return this; } + + /** + * Offline compaction only. Enables content based de-duplication of + * binaries. Involves a fair amount of I/O when reading/comparing + * potentially equal blobs. set via the + * 'oak.segment.compaction.binaryDeduplication' system property + * @return this instance. + */ + public SegmentGCOptions withBinaryDeduplication() { + this.ocBinDeduplication = true; + return this; + } + + public boolean isBinaryDeduplication() { + return this.ocBinDeduplication; + } + + /** + * Offline compaction only. Set the upper bound for the content based + * de-duplication checks. + * @param binMaxSize + * @return this instance + */ + public SegmentGCOptions setBinaryDeduplicationMaxSize(long binMaxSize) { + this.ocBinMaxSize = binMaxSize; + return this; + } + + public long getBinaryDeduplicationMaxSize() { + return this.ocBinMaxSize; + } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1746686&r1=1746685&r2=1746686&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Fri Jun 3 08:27:59 2016 @@ -1141,7 +1141,7 @@ public class FileStore implements Segmen if (gcOptions.isOffline()) { // Capital C to indicate offline compaction SegmentWriter writer = new SegmentWriter(this, segmentReader, blobStore, tracker, bufferWriter); - return new Compactor(segmentReader, writer, blobStore, cancel) + return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions) .compact(EMPTY_NODE, head, EMPTY_NODE); } else { return segmentWriter.writeNode(head, bufferWriter, cancel); Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java?rev=1746686&r1=1746685&r2=1746686&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java Fri Jun 3 08:27:59 2016 @@ -256,6 +256,219 @@ public class CompactionAndCleanupIT { } } + /** + * Create a lot of data nodes (no binaries) and a few checkpoints, verify + * that compacting checkpoints will not cause the size to explode + */ + @Test + public void offlineCompactionCps() throws IOException, + CommitFailedException { + SegmentGCOptions gcOptions = DEFAULT.setOffline(); + FileStore fileStore = FileStore.builder(getFileStoreFolder()) + .withMaxFileSize(1) + .withGCOptions(gcOptions) + .build(); + SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); + try { + // Create ~2MB of data + NodeBuilder extra = nodeStore.getRoot().builder(); + NodeBuilder content = extra.child("content"); + for (int i = 0; i < 10000; i++) { + NodeBuilder c = content.child("c" + i); + for (int j = 0; j < 1000; j++) { + c.setProperty("p" + i, "v" + i); + } + } + nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); + fileStore.flush(); + fileStore.compact(); + fileStore.cleanup(); + // Compacts to 548Kb + long size0 = fileStore.size(); + + int cpNo = 4; + Set<String> cps = new HashSet<String>(); + for (int i = 0; i < cpNo; i++) { + cps.add(nodeStore.checkpoint(60000)); + } + assertEquals(cpNo, cps.size()); + for (String cp : cps) { + assertTrue(nodeStore.retrieve(cp) != null); + } + + long size1 = fileStore.size(); + assertSize("with checkpoints added", size1, size0, size0 * 11 / 10); + fileStore.compact(); + fileStore.cleanup(); + long size2 = fileStore.size(); + assertSize("with checkpoints compacted", size2, size1 * 9/10, size1 * 11 / 10); + } finally { + fileStore.close(); + } + } + + /** + * Create 2 binary nodes with same content but not same reference. Verify + * de-duplication capabilities of compaction. + */ + @Test + public void offlineCompactionBinC1() throws IOException, + CommitFailedException { + SegmentGCOptions gcOptions = DEFAULT.setOffline() + .withBinaryDeduplication(); + FileStore fileStore = FileStore.builder(getFileStoreFolder()) + .withMaxFileSize(1).withGCOptions(gcOptions).build(); + SegmentNodeStore nodeStore = SegmentNodeStoreBuilders + .builder(fileStore).build(); + + try { + NodeBuilder extra = nodeStore.getRoot().builder(); + NodeBuilder content = extra.child("content"); + + int blobSize = 5 * 1024 * 1024; + byte[] data = new byte[blobSize]; + new Random().nextBytes(data); + + NodeBuilder c1 = content.child("c1"); + Blob b1 = nodeStore.createBlob(new ByteArrayInputStream(data)); + c1.setProperty("blob1", b1); + NodeBuilder c2 = content.child("c2"); + Blob b2 = nodeStore.createBlob(new ByteArrayInputStream(data)); + c2.setProperty("blob2", b2); + nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); + fileStore.flush(); + + int cpNo = 4; + Set<String> cps = new HashSet<String>(); + for (int i = 0; i < cpNo; i++) { + cps.add(nodeStore.checkpoint(60000)); + } + assertEquals(cpNo, cps.size()); + for (String cp : cps) { + assertTrue(nodeStore.retrieve(cp) != null); + } + + long size1 = fileStore.size(); + fileStore.compact(); + fileStore.cleanup(); + long size2 = fileStore.size(); + assertSize("with compacted binaries", size2, 0, size1 - blobSize); + } finally { + fileStore.close(); + } + } + + /** + * Create 2 binary nodes with same content but not same reference. Reduce + * the max size if de-duplicated binaries under the binary length. Verify + * de-duplication capabilities of compaction. + */ + @Test + public void offlineCompactionBinC2() throws IOException, + CommitFailedException { + int blobSize = 5 * 1024 * 1024; + + SegmentGCOptions gcOptions = DEFAULT.setOffline() + .withBinaryDeduplication() + .setBinaryDeduplicationMaxSize(blobSize / 2); + FileStore fileStore = FileStore.builder(getFileStoreFolder()) + .withMaxFileSize(1).withGCOptions(gcOptions).build(); + SegmentNodeStore nodeStore = SegmentNodeStoreBuilders + .builder(fileStore).build(); + + try { + NodeBuilder extra = nodeStore.getRoot().builder(); + NodeBuilder content = extra.child("content"); + + byte[] data = new byte[blobSize]; + new Random().nextBytes(data); + + NodeBuilder c1 = content.child("c1"); + Blob b1 = nodeStore.createBlob(new ByteArrayInputStream(data)); + c1.setProperty("blob1", b1); + NodeBuilder c2 = content.child("c2"); + Blob b2 = nodeStore.createBlob(new ByteArrayInputStream(data)); + c2.setProperty("blob2", b2); + nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); + fileStore.flush(); + + int cpNo = 4; + Set<String> cps = new HashSet<String>(); + for (int i = 0; i < cpNo; i++) { + cps.add(nodeStore.checkpoint(60000)); + } + assertEquals(cpNo, cps.size()); + for (String cp : cps) { + assertTrue(nodeStore.retrieve(cp) != null); + } + + long size1 = fileStore.size(); + fileStore.compact(); + fileStore.cleanup(); + long size2 = fileStore.size(); + + // not expected to reduce the size too much, as the binaries are + // above the threshold + assertSize("with compacted binaries", size2, size1 * 9 / 10, + size1 * 11 / 10); + } finally { + fileStore.close(); + } + } + + /** + * Create 2 binary nodes with same content and same reference. Verify + * de-duplication capabilities of compaction + */ + @Test + public void offlineCompactionBinR1() throws IOException, + CommitFailedException { + SegmentGCOptions gcOptions = DEFAULT.setOffline(); + FileStore fileStore = FileStore.builder(getFileStoreFolder()) + .withMaxFileSize(1).withGCOptions(gcOptions).build(); + SegmentNodeStore nodeStore = SegmentNodeStoreBuilders + .builder(fileStore).build(); + + try { + NodeBuilder extra = nodeStore.getRoot().builder(); + NodeBuilder content = extra.child("content"); + + int blobSize = 5 * 1024 * 1024; + byte[] data = new byte[blobSize]; + new Random().nextBytes(data); + Blob b = nodeStore.createBlob(new ByteArrayInputStream(data)); + + NodeBuilder c1 = content.child("c1"); + c1.setProperty("blob1", b); + NodeBuilder c2 = content.child("c2"); + c2.setProperty("blob2", b); + nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); + fileStore.flush(); + + int cpNo = 4; + Set<String> cps = new HashSet<String>(); + for (int i = 0; i < cpNo; i++) { + cps.add(nodeStore.checkpoint(60000)); + } + assertEquals(cpNo, cps.size()); + for (String cp : cps) { + assertTrue(nodeStore.retrieve(cp) != null); + } + + // 5Mb, de-duplication by the SegmentWriter + long size1 = fileStore.size(); + fileStore.compact(); + fileStore.cleanup(); + long size2 = fileStore.size(); + assertSize("with compacted binaries", size2, 0, size1 * 11 / 10); + + System.err.println(size2); + + } finally { + fileStore.close(); + } + } + private static void assertSize(String info, long size, long lower, long upper) { log.debug("File Store {} size {}, expected in interval [{},{}]", info, size, lower, upper); Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java?rev=1746686&r1=1746685&r2=1746686&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java Fri Jun 3 08:27:59 2016 @@ -25,8 +25,10 @@ import static org.junit.Assert.assertFal import java.io.IOException; import com.google.common.base.Suppliers; + import org.apache.jackrabbit.oak.Oak; import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; import org.apache.jackrabbit.oak.segment.memory.MemoryStore; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; @@ -53,7 +55,7 @@ public class CompactorTest { SegmentWriter writer = SegmentWriters.segmentWriter(memoryStore, LATEST_VERSION, "c", 1); Compactor compactor = new Compactor(memoryStore.getReader(), writer, - memoryStore.getBlobStore(), Suppliers.ofInstance(false)); + memoryStore.getBlobStore(), Suppliers.ofInstance(false), SegmentGCOptions.DEFAULT); addTestContent(store, 0); NodeState initial = store.getRoot(); @@ -75,8 +77,9 @@ public class CompactorTest { NodeStore store = SegmentNodeStoreBuilders.builder(memoryStore).build(); SegmentWriter writer = SegmentWriters.segmentWriter(memoryStore, LATEST_VERSION, "c", 1); - Compactor compactor = new Compactor(memoryStore.getReader(), writer, memoryStore.getBlobStore(), - Suppliers.ofInstance(true)); + Compactor compactor = new Compactor(memoryStore.getReader(), writer, + memoryStore.getBlobStore(), Suppliers.ofInstance(true), + SegmentGCOptions.DEFAULT); SegmentNodeState sns = compactor.compact(store.getRoot(), addChild(store.getRoot(), "b"), store.getRoot()); assertFalse(sns.hasChildNode("b"));
