Author: chetanm
Date: Fri Dec  8 07:26:15 2017
New Revision: 1817463

URL: http://svn.apache.org/viewvc?rev=1817463&view=rev
Log:
OAK-6353 - Use Document order traversal for reindexing performed on 
DocumentNodeStore setups

Implement sorting support with a basic FlatFileStore impl which just returns
one NodeStateEntry per line, where the NodeState can only provide access to the
actual node and does not support traversal

Added:
    
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
   (with props)
    
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
   (with props)
    
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntrySorter.java
   (with props)
    
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/PathElementComparatorTest.java

Added: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java?rev=1817463&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
 Fri Dec  8 07:26:15 2017
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Collections;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+public class FlatFileNodeStoreBuilder {
+    private final Iterable<NodeStateEntry> nodeStates;
+    private final File workDir;
+    private Iterable<String> preferredPathElements = Collections.emptySet();
+    private BlobStore blobStore;
+
+    public FlatFileNodeStoreBuilder(Iterable<NodeStateEntry> nodeStates, File 
workDir) {
+        this.nodeStates = nodeStates;
+        this.workDir = workDir;
+    }
+
+    public FlatFileNodeStoreBuilder withBlobStore(BlobStore blobStore) {
+        this.blobStore = blobStore;
+        return this;
+    }
+
+    public FlatFileNodeStoreBuilder withPreferredPathElements(Iterable<String> 
preferredPathElements) {
+        this.preferredPathElements = preferredPathElements;
+        return this;
+    }
+
+    /**
+     * Builds the flat file store: writes every entry of {@code nodeStates} to
+     * {@code store.json} inside the {@code flat-file-store} directory under
+     * the work directory, sorts that file with a {@link PathElementComparator}
+     * and returns a store backed by the sorted file.
+     *
+     * @return store reading the sorted file through a {@link NodeStateEntryReader}
+     * @throws IOException if a directory cannot be created or writing or sorting fails
+     */
+    public FlatFileStore build() throws IOException {
+        //TODO Check not null blobStore
+        File flatFileStoreDir = createStoreDir();
+        File storeFile = writeToStore(flatFileStoreDir, "store.json");
+        File sortedFile = sortStoreFile(storeFile);
+        return new FlatFileStore(sortedFile, new 
NodeStateEntryReader(blobStore));
+    }
+
+    /**
+     * Sorts {@code storeFile} using a {@link PathElementComparator} built from
+     * the preferred path elements. A {@code sort-work-dir} directory is
+     * created next to the store file and handed to the sorter as its work
+     * directory.
+     *
+     * @param storeFile unsorted file to sort
+     * @return the file the sorter reports as its sorted output
+     * @throws IOException if the work directory cannot be created or sorting fails
+     */
+    private File sortStoreFile(File storeFile) throws IOException {
+        File sortWorkDir = new File(storeFile.getParent(), "sort-work-dir");
+        FileUtils.forceMkdir(sortWorkDir);
+        NodeStateEntrySorter sorter =
+                new NodeStateEntrySorter(new 
PathElementComparator(preferredPathElements), storeFile, sortWorkDir);
+        //TODO Configure flags zip and deleteOriginal, maxMemory
+        sorter.sort();
+        return sorter.getSortedFile();
+    }
+
+    /**
+     * Writes every entry of {@code nodeStates} to a UTF-8 encoded file using a
+     * {@link NodeStateEntryWriter}.
+     *
+     * @param dir directory in which the file is created
+     * @param fileName name of the file to write
+     * @return the written file
+     * @throws IOException if the file cannot be opened or written
+     */
+    private File writeToStore(File dir, String fileName) throws IOException {
+        File file = new File(dir, fileName);
+        // try-with-resources closes entryWriter first, then the underlying writer
+        try (
+                Writer w = Files.newWriter(file, Charsets.UTF_8);
+                NodeStateEntryWriter entryWriter = new 
NodeStateEntryWriter(blobStore, w)
+        ) {
+            for (NodeStateEntry e : nodeStates) {
+                entryWriter.write(e);
+            }
+        }
+        return file;
+    }
+
+    /**
+     * Ensures the {@code flat-file-store} directory exists under the work
+     * directory and returns it.
+     *
+     * @return the store directory
+     * @throws IOException if the directory cannot be created
+     */
+    private File createStoreDir() throws IOException {
+        File dir = new File(workDir, "flat-file-store");
+        FileUtils.forceMkdir(dir);
+        return dir;
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java?rev=1817463&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
 Fri Dec  8 07:26:15 2017
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.Iterator;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import org.apache.commons.io.LineIterator;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
+
+public class FlatFileStore implements Iterable<NodeStateEntry>, Closeable{
+    private final Closer closer = Closer.create();
+    private final File storeFile;
+    private final NodeStateEntryReader entryReader;
+
+    public FlatFileStore(File storeFile, NodeStateEntryReader entryReader) {
+        this.storeFile = storeFile;
+        this.entryReader = entryReader;
+    }
+
+    /**
+     * Opens the store file and returns an iterator producing one
+     * {@link NodeStateEntry} per line. Every call opens a new reader. The
+     * underlying line iterator is closed once the end of the file is reached
+     * and is also registered with this store's closer, so {@link #close()}
+     * releases it when iteration stops early.
+     *
+     * @throws RuntimeException if the store file cannot be opened
+     */
+    @Override
+    public Iterator<NodeStateEntry> iterator() {
+        LineIterator itr = new LineIterator(createReader());
+        // Registered so that close() can release readers of abandoned iterators
+        closer.register(itr::close);
+        return new AbstractIterator<NodeStateEntry>() {
+            @Override
+            protected NodeStateEntry computeNext() {
+                if (itr.hasNext()) {
+                   return convert(itr.nextLine());
+                }
+
+                //End of iterator then close it
+                LineIterator.closeQuietly(itr);
+                return endOfData();
+            }
+        };
+    }
+
+    private NodeStateEntry convert(String line) {
+        return entryReader.read(line);
+    }
+
+    /**
+     * Closes everything registered with the closer, i.e. the line iterators
+     * created by {@link #iterator()}.
+     *
+     * @throws IOException if closing a registered resource fails
+     */
+    @Override
+    public void close() throws IOException {
+        closer.close();
+    }
+
+    /**
+     * Opens a UTF-8 reader on the store file.
+     *
+     * @return reader positioned at the start of the store file
+     * @throws RuntimeException wrapping the {@link FileNotFoundException} if
+     *         the file cannot be opened; unchecked because the caller,
+     *         {@link #iterator()}, cannot declare checked exceptions
+     */
+    private Reader createReader() {
+        try {
+            return Files.newReader(storeFile, Charsets.UTF_8);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException("Error opening file " + storeFile, e);
+        }
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntrySorter.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntrySorter.java?rev=1817463&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntrySorter.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntrySorter.java
 Fri Dec  8 07:26:15 2017
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import com.google.common.base.Stopwatch;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.collect.ImmutableList.copyOf;
+import static org.apache.commons.io.FileUtils.ONE_GB;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter.getPath;
+
+public class NodeStateEntrySorter {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final int DEFAULTMAXTEMPFILES = 1024;
+    private final File nodeStateFile;
+    private final File workDir;
+    private final Charset charset = UTF_8;
+    private final Comparator<Iterable<String>> pathComparator;
+    private File sortedFile;
+    private boolean useZip;
+    private boolean deleteOriginal;
+    private long maxMemory = ONE_GB * 5;
+
+    /**
+     * Creates a sorter whose output file is derived from {@code nodeStateFile}:
+     * same parent directory and extension, with {@code -sorted} appended to
+     * the base name.
+     *
+     * @param pathComparator comparator applied to the path elements of each line
+     * @param nodeStateFile file to sort
+     * @param workDir directory passed to the external sort for its temporary files
+     */
+    public NodeStateEntrySorter(Comparator<Iterable<String>> pathComparator, 
File nodeStateFile, File workDir) {
+        this(pathComparator, nodeStateFile, workDir, 
getSortedFileName(nodeStateFile));
+    }
+
+    public NodeStateEntrySorter(Comparator<Iterable<String>> pathComparator, 
File nodeStateFile, File workDir, File sortedFile) {
+        this.nodeStateFile = nodeStateFile;
+        this.workDir = workDir;
+        this.sortedFile = sortedFile;
+        this.pathComparator = pathComparator;
+    }
+
+    public void setUseZip(boolean useZip) {
+        this.useZip = useZip;
+    }
+
+    public void setDeleteOriginal(boolean deleteOriginal) {
+        this.deleteOriginal = deleteOriginal;
+    }
+
+    /**
+     * Sets the configured sort memory. {@link #sort()} combines this value
+     * with the estimated available memory to decide how much memory to use.
+     *
+     * @param maxMemoryInGb configured memory in gigabytes
+     */
+    public void setMaxMemoryInGB(long maxMemoryInGb) {
+        // Convert the argument to bytes; multiplying the maxMemory field here
+        // would ignore the caller's value and scale the current setting instead
+        this.maxMemory = maxMemoryInGb * ONE_GB;
+    }
+
+    /**
+     * Sorts the node state file in two phases: the input is first sorted in
+     * batches into intermediate files under the work directory, and those
+     * files are then merged into the sorted output file. If
+     * {@code deleteOriginal} is set, the input file is removed between the
+     * two phases.
+     *
+     * @throws IOException if batch sorting, deleting the original or merging fails
+     */
+    public void sort() throws IOException {
+        long estimatedMemory = estimateAvailableMemory();
+        // NOTE(review): Math.max never yields less than maxMemory, even when the
+        // estimate is lower - confirm this is intended and Math.min was not meant
+        long memory = Math.max(estimatedMemory, maxMemory);
+        log.info("Sorting with memory {} (estimated {})", 
humanReadableByteCount(memory), humanReadableByteCount(estimatedMemory));
+        Stopwatch w = Stopwatch.createStarted();
+
+        // Holders are Comparable, so natural order delegates to pathComparator
+        Comparator<NodeStateEntryHolder> comparator = 
Comparator.naturalOrder();
+        // func1 parses a line into a holder, func2 turns a holder back into
+        // its line; both pass null through unchanged
+        Function<String, NodeStateEntryHolder> func1 = (line) -> line == null 
? null : new NodeStateEntryHolder(line, pathComparator);
+        Function<NodeStateEntryHolder, String> func2 = holder -> holder == 
null ? null : holder.getLine();
+
+        List<File> sortedFiles = ExternalSort.sortInBatch(nodeStateFile,
+                comparator, //Comparator to use
+                DEFAULTMAXTEMPFILES,
+                memory,
+                charset, //charset
+                workDir,  //temp directory where intermediate files are created
+                false,
+                0,
+                useZip,
+                func2,
+                func1
+        );
+
+        log.info("Batch sorting done in {} with {} files of size {} to merge", 
w, sortedFiles.size(),
+                humanReadableByteCount(sizeOf(sortedFiles)));
+
+        // The batch files now hold all the data, so the input is no longer needed
+        if (deleteOriginal) {
+            log.info("Removing the original file {}", 
nodeStateFile.getAbsolutePath());
+            FileUtils.forceDelete(nodeStateFile);
+        }
+
+        Stopwatch w2 = Stopwatch.createStarted();
+
+        ExternalSort.mergeSortedFiles(sortedFiles,
+                sortedFile,
+                comparator,
+                charset,
+                false,
+                false,
+                useZip,
+                func2,
+                func1
+
+        );
+
+        log.info("Merging of sorted files completed in {}", w2);
+        log.info("Sorting completed in {}", w);
+    }
+
+    public File getSortedFile() {
+        return sortedFile;
+    }
+
+    /**
+     * Derives the default output file for {@code file}: same parent directory
+     * and extension, with {@code -sorted} appended to the base name (for
+     * example {@code store.json} becomes {@code store-sorted.json}).
+     *
+     * @param file input file
+     * @return file to which the sorted output is written by default
+     */
+    private static File getSortedFileName(File file) {
+        String extension = FilenameUtils.getExtension(file.getName());
+        String baseName = FilenameUtils.getBaseName(file.getName());
+        return new File(file.getParentFile(), baseName + "-sorted." + 
extension);
+    }
+
+    /**
+     * Returns the sum of the lengths, in bytes, of the given files.
+     */
+    private static long sizeOf(List<File> sortedFiles) {
+        return sortedFiles.stream().mapToLong(File::length).sum();
+    }
+
+    /**
+     * Requests a garbage collection and then estimates the memory still
+     * available to the JVM. Running the GC first avoids problems with
+     * applications where the GC hasn't reclaimed memory yet and little or no
+     * memory would be reported as available.
+     *
+     * @return the JVM's maximum memory minus the memory currently in use
+     *         (total memory minus free memory), in bytes
+     */
+    private static long estimateAvailableMemory() {
+        System.gc();
+        // 
http://stackoverflow.com/questions/12807797/java-get-available-memory
+        Runtime r = Runtime.getRuntime();
+        long allocatedMemory = r.totalMemory() - r.freeMemory();
+        long presFreeMemory = r.maxMemory() - allocatedMemory;
+        return presFreeMemory;
+    }
+
+    /**
+     * Pairs a raw line of the store file with the elements of the path
+     * extracted from that line, so that lines can be ordered by path using
+     * the supplied comparator.
+     */
+    static class NodeStateEntryHolder implements 
Comparable<NodeStateEntryHolder> {
+        final String line;
+        // Extracted once at construction rather than on every comparison
+        final List<String> pathElements;
+        final Comparator<Iterable<String>> comparator;
+
+        /**
+         * @param line raw line from which the path is extracted
+         * @param comparator comparator applied to the path elements
+         */
+        public NodeStateEntryHolder(String line, Comparator<Iterable<String>> 
comparator) {
+            this.line = line;
+            this.comparator = comparator;
+            this.pathElements = copyOf(elements(getPath(line)));
+        }
+
+        /** Returns the raw line this holder was created from. */
+        public String getLine() {
+            return line;
+        }
+
+        /** Orders holders by their path elements; the rest of the line is ignored. */
+        @Override
+        public int compareTo(NodeStateEntryHolder o) {
+            return comparator.compare(this.pathElements, o.pathElements);
+        }
+    }
+
+}

Propchange: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntrySorter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java?rev=1817463&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
 Fri Dec  8 07:26:15 2017
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import java.io.File;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.google.common.collect.Iterables;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("StaticPseudoFunctionalStyleMethod")
+public class FlatFileStoreTest {
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    private Set<String> preferred = singleton("jcr:content");
+
+    /**
+     * Builds a store from unsorted paths and checks that iterating it yields
+     * the paths in the order produced by
+     * {@link PathElementComparatorTest#sortPaths} with {@code jcr:content} as
+     * the preferred element.
+     */
+    @Test
+    public void basicTest() throws Exception {
+        List<String> paths = createTestPaths();
+        FlatFileNodeStoreBuilder builder = new 
FlatFileNodeStoreBuilder(createEntries(paths), folder.getRoot());
+        FlatFileStore flatStore = builder.withBlobStore(new MemoryBlobStore())
+                .withPreferredPathElements(preferred)
+                .build();
+
+        // Read the store back in file order, keeping only the entry paths
+        List<String> entryPaths = 
StreamSupport.stream(flatStore.spliterator(), false)
+                .map(NodeStateEntry::getPath)
+                .collect(Collectors.toList());
+
+        // Expected order comes from sorting the same paths in memory
+        List<String> sortedPaths = PathElementComparatorTest.sortPaths(paths, 
preferred);
+
+        assertEquals(sortedPaths, entryPaths);
+    }
+
+    private List<String> createTestPaths() {
+        return asList("/a", "/b", "/c", "/a/b w", "/a/jcr:content", "/a/b", 
"/", "/b/l");
+    }
+
+    private Iterable<NodeStateEntry> createEntries(List<String> paths) {
+        return Iterables.transform(paths, p -> new 
NodeStateEntry(EmptyNodeState.EMPTY_NODE, p));
+    }
+
+}
\ No newline at end of file

Propchange: 
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/PathElementComparatorTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/PathElementComparatorTest.java?rev=1817463&r1=1817462&r2=1817463&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/PathElementComparatorTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/PathElementComparatorTest.java
 Fri Dec  8 07:26:15 2017
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Joiner;
@@ -31,6 +32,7 @@ import org.apache.jackrabbit.oak.commons
 import org.junit.Test;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 
 public class PathElementComparatorTest {
@@ -53,8 +55,8 @@ public class PathElementComparatorTest {
     }
 
     @Test
-    public void preferredElements() throws Exception{
-        PathElementComparator c = new 
PathElementComparator(asList("jcr:content"));
+    public void preferredElements() {
+        PathElementComparator c = new 
PathElementComparator(singleton("jcr:content"));
         assertEquals(asList("/a", "/a/jcr:content", "/a/b"), 
sortPaths(asList("/a/jcr:content", "/a/b", "/a"), c));
 
         assertSorted(asList("/a", "/a/jcr:content", "/a/b"),c);
@@ -72,11 +74,15 @@ public class PathElementComparatorTest {
         assertEquals(sorted, sortedNew);
     }
 
-    private List<String> sortPaths(List<String> paths){
+    static List<String> sortPaths(List<String> paths){
         return sortPaths(paths, new PathElementComparator());
     }
 
-    private List<String> sortPaths(List<String> paths, 
Comparator<Iterable<String>> comparator) {
+    static List<String> sortPaths(List<String> paths, Set<String> 
preferredElements) {
+        return sortPaths(paths, new PathElementComparator(preferredElements));
+    }
+
+    static List<String> sortPaths(List<String> paths, 
Comparator<Iterable<String>> comparator) {
         List<Iterable<String>> copy = paths.stream().map(p -> 
ImmutableList.copyOf(PathUtils.elements(p)))
                 .sorted(comparator).collect(Collectors.toList());
         Joiner j = Joiner.on('/');


Reply via email to