Author: chetanm Date: Thu Dec 21 09:07:31 2017 New Revision: 1818896 URL: http://svn.apache.org/viewvc?rev=1818896&view=rev Log: OAK-7105 - Implement a traverse with sort strategy for DocumentStoreIndexer
Added: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/SortStrategy.java - copied, changed from r1818889, jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryHolder.java jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java (with props) Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryHolder.java jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java?rev=1818896&r1=1818895&r2=1818896&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java (original) +++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java Thu Dec 21 09:07:31 2017 @@ -34,7 +34,10 @@ import static com.google.common.collect. public class FlatFileNodeStoreBuilder { public static final String OAK_INDEXER_USE_ZIP = "oak.indexer.useZip"; + private static final String OAK_INDEXER_TRAVERSE_WITH_SORT = "oak.indexer.traverWithSortStrategy"; private static final String OAK_INDEXER_SORTED_FILE_PATH = "oak.indexer.sortedFilePath"; + static final String OAK_INDEXER_MAX_SORT_MEMORY_IN_GB = "oak.indexer.maxSortMemoryInGB"; + static final int OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT = 2; private final Logger log = LoggerFactory.getLogger(getClass()); private final Iterable<NodeStateEntry> nodeStates; private final File workDir; @@ -45,6 +48,8 @@ public class FlatFileNodeStoreBuilder { private long entryCount = 0; private boolean useZip = Boolean.valueOf(System.getProperty(OAK_INDEXER_USE_ZIP, "true")); + //TODO Switch the default + private boolean useTraverseWithSort = Boolean.valueOf(System.getProperty(OAK_INDEXER_TRAVERSE_WITH_SORT, "false")); public FlatFileNodeStoreBuilder(Iterable<NodeStateEntry> nodeStates, File workDir) { this.nodeStates = nodeStates; @@ -88,16 +93,29 @@ public class FlatFileNodeStoreBuilder { } } else { File flatFileStoreDir = createStoreDir(); - StoreAndSortStrategy strategy = new StoreAndSortStrategy(nodeStates, comparator, - entryWriter, flatFileStoreDir, useZip); + SortStrategy strategy = createSortStrategy(flatFileStoreDir); entryCount = strategy.getEntryCount(); return strategy.createSortedStoreFile(); } } + private SortStrategy createSortStrategy(File dir){ + if (useTraverseWithSort) { + log.info("Using TraverseWithSortStrategy"); + return new TraverseWithSortStrategy(nodeStates, comparator, entryWriter, dir, useZip); + } else { + log.info("Using StoreAndSortStrategy"); + return new StoreAndSortStrategy(nodeStates, comparator, entryWriter, dir, useZip); + } + } + private void logFlags() { log.info("Preferred path elements are {}", Iterables.toString(preferredPathElements)); log.info("Compression enabled while sorting : {} ({})", useZip, OAK_INDEXER_USE_ZIP); + + String strategy = useTraverseWithSort ? + TraverseWithSortStrategy.class.getSimpleName() : StoreAndSortStrategy.class.getSimpleName(); + log.info("Sort strategy : {} ({})", strategy, OAK_INDEXER_TRAVERSE_WITH_SORT); } private File createStoreDir() throws IOException { Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryHolder.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryHolder.java?rev=1818896&r1=1818895&r2=1818896&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryHolder.java (original) +++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryHolder.java Thu Dec 21 09:07:31 2017 @@ -30,7 +30,11 @@ class NodeStateEntryHolder { final List<String> pathElements; public NodeStateEntryHolder(String line) { + this(getPath(line), line); + } + + public NodeStateEntryHolder(String path, String line) { + this.pathElements = copyOf(elements(path)); this.line = line; - this.pathElements = copyOf(elements(getPath(line))); } } Copied: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/SortStrategy.java (from r1818889, jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryHolder.java) URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/SortStrategy.java?p2=jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/SortStrategy.java&p1=jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryHolder.java&r1=1818889&r2=1818896&rev=1818896&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryHolder.java (original) +++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/SortStrategy.java Thu Dec 21 09:07:31 2017 @@ -19,18 +19,12 @@ package org.apache.jackrabbit.oak.index.indexer.document.flatfile; -import java.util.List; +import java.io.File; +import java.io.IOException; -import static com.google.common.collect.ImmutableList.copyOf; -import static org.apache.jackrabbit.oak.commons.PathUtils.elements; -import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter.getPath; +public interface SortStrategy { -class NodeStateEntryHolder { - final String line; - final List<String> pathElements; + File createSortedStoreFile() throws IOException; - public NodeStateEntryHolder(String line) { - this.line = line; - this.pathElements = copyOf(elements(getPath(line))); - } + long getEntryCount(); } Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java?rev=1818896&r1=1818895&r2=1818896&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java (original) +++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java Thu Dec 21 09:07:31 2017 @@ -31,10 +31,12 @@ import org.slf4j.LoggerFactory; import static com.google.common.base.StandardSystemProperty.LINE_SEPARATOR; import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; +import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_MAX_SORT_MEMORY_IN_GB; +import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT; -class StoreAndSortStrategy { +class StoreAndSortStrategy implements SortStrategy { private static final String OAK_INDEXER_DELETE_ORIGINAL = "oak.indexer.deleteOriginal"; - private static final String OAK_INDEXER_MAX_SORT_MEMORY_IN_GB = "oak.indexer.maxSortMemoryInGB"; + private static final int LINE_SEP_LENGTH = LINE_SEPARATOR.value().length(); private final Logger log = LoggerFactory.getLogger(getClass()); private final Iterable<NodeStateEntry> nodeStates; @@ -44,7 +46,7 @@ class StoreAndSortStrategy { private final boolean compressionEnabled; private long entryCount; private boolean deleteOriginal = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_DELETE_ORIGINAL, "true")); - private int maxMemory = Integer.getInteger(OAK_INDEXER_MAX_SORT_MEMORY_IN_GB, 3); + private int maxMemory = Integer.getInteger(OAK_INDEXER_MAX_SORT_MEMORY_IN_GB, OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT); private long textSize; @@ -57,11 +59,13 @@ class StoreAndSortStrategy { this.compressionEnabled = compressionEnabled; } + @Override public File createSortedStoreFile() throws IOException { File storeFile = writeToStore(storeDir, getStoreFileName()); return sortStoreFile(storeFile); } + @Override public long getEntryCount() { return entryCount; } @@ -90,8 +94,9 @@ class StoreAndSortStrategy { try (BufferedWriter w = FlatFileStoreUtils.createWriter(file, compressionEnabled)) { for (NodeStateEntry e : nodeStates) { String line = entryWriter.toString(e); - w.append(line).append(LINE_SEPARATOR.value()); - textSize += line.length() + LINE_SEPARATOR.value().length(); + w.append(line); + w.newLine(); + textSize += line.length() + LINE_SEP_LENGTH; entryCount++; } } Added: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java?rev=1818896&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java (added) +++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java Thu Dec 21 09:07:31 2017 @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.index.indexer.document.flatfile; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.lang.management.MemoryNotificationInfo; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryUsage; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import javax.management.Notification; +import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import javax.management.openmbean.CompositeData; + +import com.google.common.base.Stopwatch; +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.commons.StringUtils; +import org.apache.jackrabbit.oak.commons.sort.ExternalSort; +import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Charsets.UTF_8; +import static java.lang.management.ManagementFactory.getMemoryMXBean; +import static java.lang.management.ManagementFactory.getMemoryPoolMXBeans; +import static java.lang.management.MemoryType.HEAP; +import static org.apache.commons.io.FileUtils.ONE_GB; +import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; +import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_MAX_SORT_MEMORY_IN_GB; +import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT; +import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter; +import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.sizeOf; + +class TraverseWithSortStrategy implements SortStrategy { + private static final String OAK_INDEXER_MIN_MEMORY = "oak.indexer.minMemoryForWork"; + private final Logger log = LoggerFactory.getLogger(getClass()); + private final AtomicBoolean sufficientMemory = new AtomicBoolean(true); + private final Iterable<NodeStateEntry> nodeStates; + private final NodeStateEntryWriter entryWriter; + private final File storeDir; + private final boolean compressionEnabled; + private final Charset charset = UTF_8; + private final Comparator<NodeStateEntryHolder> comparator; + private NotificationEmitter emitter; + private MemoryListener listener; + private final int maxMemory = Integer.getInteger(OAK_INDEXER_MAX_SORT_MEMORY_IN_GB, OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT); + private final long minMemory = Integer.getInteger(OAK_INDEXER_MIN_MEMORY, 2); + private final long maxMemoryBytes = maxMemory * ONE_GB; + private final long minMemoryBytes = minMemory * ONE_GB; + private boolean useMaxMemory; + private long entryCount; + private long memoryUsed; + private File sortWorkDir; + private List<File> sortedFiles = new ArrayList<>(); + private ArrayList<NodeStateEntryHolder> entryBatch = new ArrayList<>(); + + + TraverseWithSortStrategy(Iterable<NodeStateEntry> nodeStates, PathElementComparator pathComparator, + NodeStateEntryWriter entryWriter, File storeDir, boolean compressionEnabled) { + this.nodeStates = nodeStates; + this.entryWriter = entryWriter; + this.storeDir = storeDir; + this.compressionEnabled = compressionEnabled; + this.comparator = (e1, e2) -> pathComparator.compare(e1.pathElements, e2.pathElements); + } + + @Override + public File createSortedStoreFile() throws IOException { + logFlags(); + configureMemoryListener(); + sortWorkDir = createdSortWorkDir(storeDir); + writeToSortedFiles(); + return sortStoreFile(); + } + + @Override + public long getEntryCount() { + return entryCount; + } + + private File sortStoreFile() throws IOException { + Stopwatch w = Stopwatch.createStarted(); + File sortedFile = new File(storeDir, getSortedStoreFileName()); + try(BufferedWriter writer = createWriter(sortedFile, compressionEnabled)) { + Function<String, NodeStateEntryHolder> func1 = (line) -> line == null ? null : new NodeStateEntryHolder(line); + Function<NodeStateEntryHolder, String> func2 = holder -> holder == null ? null : holder.line; + ExternalSort.mergeSortedFiles(sortedFiles, + writer, + comparator, + charset, + true, //distinct + compressionEnabled, //useZip + func2, + func1 + ); + } + log.info("Merging of sorted files completed in {}", w); + return sortedFile; + } + + private String getSortedStoreFileName() { + return compressionEnabled ? "store-sorted.json.zip" : "store-sorted.json"; + } + + private void writeToSortedFiles() throws IOException { + Stopwatch w = Stopwatch.createStarted(); + for (NodeStateEntry e : nodeStates) { + entryCount++; + addEntry(e); + } + + //Save the last batch + sortAndSaveBatch(); + + //Free up the batch + entryBatch.clear(); + entryBatch.trimToSize(); + + log.info("Dumped {} nodestates in json format in {}",entryCount, w); + log.info("Created {} sorted files of size {} to merge", + sortedFiles.size(), humanReadableByteCount(sizeOf(sortedFiles))); + } + + private void addEntry(NodeStateEntry e) throws IOException { + if (isMemoryLow()) { + sortAndSaveBatch(); + reset(); + } + + String jsonText = entryWriter.asJson(e.getNodeState()); + //Here logic differs from NodeStateEntrySorter in sense that + //Holder line consist only of json and not 'path|json' + NodeStateEntryHolder h = new NodeStateEntryHolder(e.getPath(), jsonText); + entryBatch.add(h); + updateMemoryUsed(h); + + } + + private void reset() { + entryBatch.clear(); + memoryUsed = 0; + sufficientMemory.set(true); + } + + private void sortAndSaveBatch() throws IOException { + if (entryBatch.isEmpty()) { + return; + } + entryBatch.sort(comparator); + Stopwatch w = Stopwatch.createStarted(); + File newtmpfile = File.createTempFile("sortInBatch", "flatfile", sortWorkDir); + long textSize = 0; + try (BufferedWriter writer = FlatFileStoreUtils.createWriter(newtmpfile, compressionEnabled)) { + for (NodeStateEntryHolder h : entryBatch) { + //Here holder line only contains nodeState json + String text = entryWriter.toString(h.pathElements, h.line); + writer.write(text); + writer.newLine(); + textSize += text.length() + 1; + } + } + log.info("Sorted and stored batch of size {} (uncompressed {}) with {} entries in {}", + humanReadableByteCount(newtmpfile.length()), humanReadableByteCount(textSize),entryBatch.size(), w); + sortedFiles.add(newtmpfile); + } + + private boolean isMemoryLow() { + if (useMaxMemory){ + return memoryUsed > maxMemoryBytes; + } + return !sufficientMemory.get(); + } + + private void updateMemoryUsed(NodeStateEntryHolder h) { + for (String e : h.pathElements) { + memoryUsed += StringUtils.estimateMemoryUsage(e); + } + memoryUsed += StringUtils.estimateMemoryUsage(h.line); + } + + private static File createdSortWorkDir(File storeDir) throws IOException { + File sortedFileDir = new File(storeDir, "sort-work-dir"); + FileUtils.forceMkdir(sortedFileDir); + return sortedFileDir; + } + + private void logFlags() { + log.info("Min heap memory (GB) to be required : {} ({})", minMemory, OAK_INDEXER_MIN_MEMORY); + log.info("Max heap memory (GB) to be used for merge sort : {} ({})", maxMemory, OAK_INDEXER_MAX_SORT_MEMORY_IN_GB); + } + + //~-------------------------------------< memory management > + + private void configureMemoryListener() { + MemoryPoolMXBean pool = getMemoryPool(); + if (pool == null) { + log.warn("Unable to setup monitoring of available memory. " + + "Would use configured maxMemory limit of {} GB", maxMemory); + useMaxMemory = true; + return; + } + + emitter = (NotificationEmitter) getMemoryMXBean(); + listener = new MemoryListener(); + emitter.addNotificationListener(listener, null, null); + MemoryUsage usage = pool.getCollectionUsage(); + long maxMemory = usage.getMax(); + long warningThreshold = minMemory * ONE_GB; + log.info("Setting up a listener to monitor pool '{}' and trigger batch save " + + "if memory drop below {} GB (max {})", pool.getName(), minMemory, humanReadableByteCount(maxMemory)); + pool.setCollectionUsageThreshold(warningThreshold); + checkMemory(usage); + } + + private void checkMemory(MemoryUsage usage) { + long maxMemory = usage.getMax(); + long usedMemory = usage.getUsed(); + long avail = maxMemory - usedMemory; + if (avail > minMemoryBytes) { + sufficientMemory.set(true); + log.info("Available memory level {} is good. Current batch size {}", humanReadableByteCount(avail), entryBatch.size()); + } else { + sufficientMemory.set(false); + log.info("Available memory level {} (required {}) is low. Enabling flag to trigger batch save", + humanReadableByteCount(avail), minMemory); + } + } + + //Taken from GCMemoryBarrier + private class MemoryListener implements NotificationListener { + @Override + public void handleNotification(Notification notification, + Object handback) { + if (notification + .getType() + .equals(MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) { + if (sufficientMemory.get()) { + CompositeData cd = (CompositeData) notification + .getUserData(); + MemoryNotificationInfo info = MemoryNotificationInfo + .from(cd); + checkMemory(info.getUsage()); + } + } + } + } + + private static MemoryPoolMXBean getMemoryPool() { + long maxSize = 0; + MemoryPoolMXBean maxPool = null; + for (MemoryPoolMXBean pool : getMemoryPoolMXBeans()) { + if (HEAP == pool.getType() + && pool.isCollectionUsageThresholdSupported()) { + // Get usage after a GC, which is more stable, if available + long poolSize = pool.getCollectionUsage().getMax(); + // Keep the pool with biggest size, by default it should be Old Gen Space + if (poolSize > maxSize) { + maxPool = pool; + } + } + } + return maxPool; + } +} Propchange: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java ------------------------------------------------------------------------------ svn:eol-style = native