This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b74629c48a OAK-11290 - Improve logging of publish indexing job (#1887)
b74629c48a is described below

commit b74629c48ab49da61e504b00d5b213862f753168
Author: Nuno Santos <[email protected]>
AuthorDate: Fri Dec 6 18:29:07 2024 +0100

    OAK-11290 - Improve logging of publish indexing job (#1887)
---
 .../oak/plugins/index/importer/IndexImporter.java  | 38 ++++++-------
 .../jackrabbit/oak/index/IndexerMetrics.java       | 29 ++++++++++
 .../jackrabbit/oak/index/IndexerSupport.java       | 61 +++++++++++++++++++--
 .../jackrabbit/oak/index/OutOfBandIndexerBase.java | 64 +++++++++++++++++-----
 .../indexer/document/DocumentStoreIndexerBase.java | 23 ++------
 .../flatfile/FlatFileNodeStoreBuilder.java         | 14 -----
 .../jackrabbit/oak/index/ElasticIndexCommand.java  |  5 ++
 .../apache/jackrabbit/oak/index/IndexCommand.java  |  6 ++
 .../oak/index/DocumentStoreIndexerIT.java          |  1 -
 9 files changed, 169 insertions(+), 72 deletions(-)

diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
index 02f3394c69..69c9b4424a 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
@@ -106,16 +106,16 @@ public class IndexImporter {
                          AsyncIndexerLock indexerLock, StatisticsProvider statisticsProvider, IndexingReporter indexingReporter) throws IOException {
         this.statisticsProvider = statisticsProvider;
         this.indexingReporter = indexingReporter;
-        checkArgument(indexDir.exists() && indexDir.isDirectory(), "Path [%s] does not point " +
-                "to existing directory", indexDir.getAbsolutePath());
+        checkArgument(indexDir.exists() && indexDir.isDirectory(),
+                "Path [%s] does not point to existing directory", indexDir.getAbsolutePath());
         this.nodeStore = nodeStore;
         this.indexDir = indexDir;
         this.indexEditorProvider = indexEditorProvider;
-        indexerInfo = IndexerInfo.fromDirectory(indexDir);
+        this.indexerInfo = IndexerInfo.fromDirectory(indexDir);
         this.indexerLock = indexerLock;
-        indexes = indexerInfo.getIndexes();
-        indexedState = requireNonNull(nodeStore.retrieve(indexerInfo.checkpoint), String.format("Cannot retrieve " +
-                "checkpointed state [%s]", indexerInfo.checkpoint));
+        this.indexes = indexerInfo.getIndexes();
+        this.indexedState = requireNonNull(nodeStore.retrieve(indexerInfo.checkpoint),
+                "Cannot retrieve checkpointed state [" + indexerInfo.checkpoint + "]");
         this.indexDefinitionUpdater = new IndexDefinitionUpdater(new File(indexDir, INDEX_DEFINITIONS_JSON));
         this.asyncLaneToIndexMapping = mapIndexesToLanes(indexes);
         this.indexPathsToUpdate = new HashSet<>();
@@ -127,7 +127,7 @@ public class IndexImporter {
 
     public void importIndex() throws IOException, CommitFailedException {
         try {
-            if (indexes.keySet().isEmpty()) {
+            if (indexes.isEmpty()) {
                 LOG.warn("No indexes to import (possibly index definitions 
outside of a oak:index node?)");
             }
             LOG.info("Proceeding to import {} indexes from {}", 
indexes.keySet(), indexDir.getAbsolutePath());
@@ -178,10 +178,9 @@ public class IndexImporter {
                     mergeWithConcurrentCheck(nodeStore, builder);
                 });
             } catch (CommitFailedException commitFailedException) {
-                LOG.error("Unable to revert back index lanes for: "
-                                + indexPathsToUpdate.stream()
-                                .collect(StringBuilder::new, 
StringBuilder::append, (a, b) -> a.append(",").append(b)),
-                        commitFailedException);
+                LOG.error("Unable to revert back index lanes for: {}",
+                        indexPathsToUpdate.stream()
+                                .collect(StringBuilder::new, 
StringBuilder::append, (a, b) -> a.append(",").append(b)), 
commitFailedException);
                 throw e;
             }
         }
@@ -265,12 +264,12 @@ public class IndexImporter {
         boolean success = false;
         try {
             String checkpoint = getAsync().getString(laneName);
-            requireNonNull(checkpoint, String.format("No current checkpoint found for lane [%s]", laneName));
+            requireNonNull(checkpoint, "No current checkpoint found for lane [" + laneName + "]");
 
             //TODO Support case where checkpoint got lost or complete reindexing is done
 
             NodeState after = nodeStore.retrieve(checkpoint);
-            requireNonNull(after, String.format("No state found for checkpoint [%s] for lane [%s]", checkpoint, laneName));
+            requireNonNull(after, "No state found for checkpoint [" + checkpoint + "] for lane [" + laneName + "]");
             LOG.info("Proceeding to update imported indexes {} to checkpoint [{}] for lane [{}]",
                     indexInfos, checkpoint, laneName);
 
@@ -297,8 +296,8 @@ public class IndexImporter {
             updateIndexImporterState(builder, IndexImportState.IMPORT_INDEX_DATA, IndexImportState.BRING_INDEX_UPTODATE, false);
             mergeWithConcurrentCheck(nodeStore, builder);
             success = true;
-            LOG.info("Imported index is updated to repository state at checkpoint [{}] for " +
-                    "indexing lane [{}]", checkpoint, laneName);
+            LOG.info("Imported index is updated to repository state at checkpoint [{}] for indexing lane [{}]",
+                    checkpoint, laneName);
         } catch (CommitFailedException e) {
             LOG.error("Failed while performing bringIndexUpToDate and updating 
indexImportState from  [{}] to  [{}]",
                     IndexImportState.IMPORT_INDEX_DATA, 
IndexImportState.BRING_INDEX_UPTODATE);
@@ -375,7 +374,7 @@ public class IndexImporter {
 
     private IndexImporterProvider getImporter(String type) {
         IndexImporterProvider provider = importers.get(type);
-        return requireNonNull(provider, String.format("No IndexImporterProvider found for type [%s]", type));
+        return requireNonNull(provider, "No IndexImporterProvider found for type [" + type + "]");
     }
 
     private ListMultimap<String, IndexInfo> mapIndexesToLanes(Map<String, File> indexes) {
@@ -391,7 +390,7 @@ public class IndexImporter {
             boolean newIndex = !NodeStateUtils.getNode(rootState, indexPath).exists();
 
             String type = indexState.getString(IndexConstants.TYPE_PROPERTY_NAME);
-            requireNonNull(type, String.format("No 'type' property found for index at path [%s]", indexPath));
+            requireNonNull(type, "No 'type' property found for index at path [" + indexPath + "]");
 
             String asyncName = getAsyncLaneName(indexPath, indexState);
             if (asyncName == null) {
@@ -448,8 +447,9 @@ public class IndexImporter {
 
     private void incrementReIndexCount(NodeBuilder definition) {
         long count = 0;
-        if (definition.hasProperty(REINDEX_COUNT)) {
-            count = definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
+        PropertyState reindexCountProp = definition.getProperty(REINDEX_COUNT);
+        if (reindexCountProp != null) {
+            count = reindexCountProp.getValue(Type.LONG);
         }
         definition.setProperty(REINDEX_COUNT, count + 1);
     }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerMetrics.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerMetrics.java
new file mode 100644
index 0000000000..7f0bac00e0
--- /dev/null
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerMetrics.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index;
+
+public interface IndexerMetrics {
+    String INDEXER_METRICS_PREFIX = "oak_indexer_";
+    String METRIC_INDEXING_INDEX_DATA_SIZE = INDEXER_METRICS_PREFIX + "index_data_size";
+
+    String INDEXER_PUBLISH_METRICS_PREFIX = "oak_indexer_publish_";
+    String METRIC_INDEXING_PUBLISH_DURATION_SECONDS = INDEXER_PUBLISH_METRICS_PREFIX + "indexing_duration_seconds";
+    String METRIC_INDEXING_PUBLISH_NODES_TRAVERSED = INDEXER_PUBLISH_METRICS_PREFIX + "nodes_traversed";
+    String METRIC_INDEXING_PUBLISH_NODES_INDEXED = INDEXER_PUBLISH_METRICS_PREFIX + "nodes_indexed";
+}
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
index c9ffee0677..e6fec0a9ac 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
@@ -21,6 +21,7 @@ package org.apache.jackrabbit.oak.index;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,6 +30,7 @@ import java.util.function.Function;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.felix.inventory.Format;
+
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
@@ -54,7 +56,7 @@ import org.slf4j.LoggerFactory;
 import static java.util.Objects.requireNonNull;
 
 public class IndexerSupport {
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
     /**
      * Directory name in output directory under which indexes are
      * stored
@@ -131,7 +133,7 @@ public class IndexerSupport {
         NodeState checkpointedState;
         if (HEAD_AS_CHECKPOINT.equals(checkpoint)) {
             checkpointedState = indexHelper.getNodeStore().getRoot();
-            log.warn("Using head state for indexing. Such an index cannot be 
imported back");
+            LOG.warn("Using head state for indexing. Such an index cannot be 
imported back");
         } else {
             checkpointedState = 
indexHelper.getNodeStore().retrieve(checkpoint);
             requireNonNull(checkpointedState, String.format("Not able to 
retrieve revision referred via checkpoint [%s]", checkpoint));
@@ -169,7 +171,7 @@ public class IndexerSupport {
         }
 
         copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-        log.info("Switched the async lane for indexes at {} to {} and marked 
them for reindex", indexHelper.getIndexPaths(), REINDEX_LANE);
+        LOG.info("Switched the async lane for indexes at {} to {} and marked 
them for reindex", indexHelper.getIndexPaths(), REINDEX_LANE);
     }
 
     public void postIndexWork(NodeStore copyOnWriteStore) throws CommitFailedException, IOException {
@@ -187,7 +189,7 @@ public class IndexerSupport {
         }
 
         copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-        log.info("Switched the async lane for indexes at {} back to there 
original lanes", indexHelper.getIndexPaths());
+        LOG.info("Switched the async lane for indexes at {} back to there 
original lanes", indexHelper.getIndexPaths());
     }
 
     public Map<String, String> getCheckpointInfo() {
@@ -236,7 +238,7 @@ public class IndexerSupport {
     }
 
     /**
-     * @param indexDefinitions set of IndexDefinition to be used to calculate the Path Predicate
+     * @param indexDefinitions     set of IndexDefinition to be used to calculate the Path Predicate
      * @param typeToRepositoryPath Function to convert type <T> to valid repository path of type <String>
      * @param <T>
      * @return filter predicate based on the include/exclude path rules of the given set of index definitions.
@@ -246,7 +248,7 @@ public class IndexerSupport {
     }
 
     /**
-     * @param pattern Pattern for a custom excludes regex based on which paths would be filtered out
+     * @param pattern              Pattern for a custom excludes regex based on which paths would be filtered out
      * @param typeToRepositoryPath Function to convert type <T> to valid repository path of type <String>
      * @param <T>
      * @return Return a predicate that should test true for all paths that do not match the provided regex pattern.
@@ -254,4 +256,51 @@ public class IndexerSupport {
     public <T> Predicate<T> getFilterPredicateBasedOnCustomRegex(Pattern pattern, Function<T, String> typeToRepositoryPath) {
         return t -> !pattern.matcher(typeToRepositoryPath.apply(t)).find();
     }
+
+    /**
+     * Computes the total size of the generated index data. This method is intended to be used when creating Lucene
+     * indexes, which are created locally. With Elastic, this will not include the Lucene files since the indexes
+     * are updated remotely.
+     *
+     * @return The total size of the index data generated or -1 if there is some error while computing the size.
+     */
+    public long computeSizeOfGeneratedIndexData() {
+        try {
+            File localIndexDir = getLocalIndexDir();
+            long totalSize = 0;
+            if (localIndexDir == null || !localIndexDir.isDirectory()) {
+                LOG.warn("Local index directory is invalid, this should not 
happen: {}", localIndexDir);
+                return -1;
+            } else {
+                // Each index is stored in a separate directory
+                File[] directories = localIndexDir.listFiles(File::isDirectory);
+                if (directories == null) {
+                    LOG.warn("Error listing sub directories in the local index directory: {}", localIndexDir);
+                    return -1;
+                }
+                // Print the indexes in alphabetic order
+                Arrays.sort(directories);
+                StringBuilder sb = new StringBuilder();
+                for (File indexDir : directories) {
+                    long size = FileUtils.sizeOfDirectory(indexDir);
+                    totalSize += size;
+                    File[] files = indexDir.listFiles(File::isFile);
+                    if (files == null) {
+                        LOG.warn("Error listing files in directory: {}", 
indexDir);
+                        // continue to the next index
+                    } else {
+                        long numberOfFiles = files.length;
+                        sb.append("\n  - " + indexDir.getName() + ": " + numberOfFiles + " files, " +
+                                size + " (" + FileUtils.byteCountToDisplaySize(size) + ")");
+                    }
+                }
+                LOG.info("Total size of index data generated: {} ({}){}",
+                        totalSize, FileUtils.byteCountToDisplaySize(totalSize), sb);
+                return totalSize;
+            }
+        } catch (Throwable t) {
+            LOG.warn("Error while computing size of generated index data.", t);
+            return -1;
+        }
+    }
 }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
index 31bcb56779..2a9fb02b72 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
@@ -19,11 +19,13 @@
 package org.apache.jackrabbit.oak.index;
 
 import com.codahale.metrics.MetricRegistry;
+import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.guava.common.io.Closer;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.plugins.index.*;
 import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
 import org.apache.jackrabbit.oak.plugins.index.progress.NodeCounterMBeanEstimator;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
 import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -38,15 +40,27 @@ import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_INDEX_DATA_SIZE;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_DURATION_SECONDS;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_INDEXED;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_TRAVERSED;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.INDEXING_PHASE_LOGGER;
 
-public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCallback, NodeTraversalCallback{
+public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCallback, NodeTraversalCallback {
 
     protected final Closer closer = Closer.create();
     private final IndexHelper indexHelper;
+    private final IndexerSupport indexerSupport;
+    private final IndexingReporter indexingReporter;
+    private final StatisticsProvider statisticsProvider;
     private NodeStore copyOnWriteStore;
-    private IndexerSupport indexerSupport;
+    private long nodesTraversed = 0;
+    private long nodesIndexed = 0;
 
     /**
      * Index lane name which is used for indexing
@@ -64,18 +78,38 @@ public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCall
     public OutOfBandIndexerBase(IndexHelper indexHelper, IndexerSupport indexerSupport) {
         this.indexHelper = requireNonNull(indexHelper);
         this.indexerSupport = requireNonNull(indexerSupport);
+        this.indexingReporter = indexHelper.getIndexReporter();
+        this.statisticsProvider = indexHelper.getStatisticsProvider();
     }
 
     public void reindex() throws CommitFailedException, IOException {
-        NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
-
-        copyOnWriteStore = new MemoryNodeStore(checkpointedState);
-        NodeState baseState = copyOnWriteStore.getRoot();
-        //TODO Check for indexPaths being empty
-
-        indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
-        preformIndexUpdate(baseState);
-        indexerSupport.postIndexWork(copyOnWriteStore);
+        List<String> indexNames = indexerSupport.getIndexDefinitions().stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
+        indexingReporter.setIndexNames(indexNames);
+        INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:START] Starting indexing job");
+        Stopwatch indexJobWatch = Stopwatch.createStarted();
+        try {
+            NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
+
+            copyOnWriteStore = new MemoryNodeStore(checkpointedState);
+            NodeState baseState = copyOnWriteStore.getRoot();
+
+            indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
+            preformIndexUpdate(baseState);
+            indexerSupport.postIndexWork(copyOnWriteStore);
+
+            long indexingDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
+            long totalSize = indexerSupport.computeSizeOfGeneratedIndexData();
+            INDEXING_PHASE_LOGGER.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(indexingDurationSeconds));
+            MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_DURATION_SECONDS, indexingDurationSeconds);
+            MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_NODES_TRAVERSED, nodesTraversed);
+            MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_NODES_INDEXED, nodesIndexed);
+            MetricsUtils.addMetricByteSize(statisticsProvider, indexingReporter, METRIC_INDEXING_INDEX_DATA_SIZE, totalSize);
+            indexingReporter.addTiming("Build Lucene Index", FormattingUtils.formatToSeconds(indexingDurationSeconds));
+        } catch (Throwable t) {
+            INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:FAIL] 
Metrics: {}, Error: {}",
+                    
MetricsFormatter.createMetricsWithDurationOnly(indexJobWatch), t.toString());
+            throw t;
+        }
     }
 
     protected File getLocalIndexDir() throws IOException {
@@ -90,13 +124,13 @@ public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCall
     //~---------------------------------------------------< callbacks >
 
     @Override
-    public void indexUpdate() throws CommitFailedException {
-
+    public void indexUpdate() {
+        nodesIndexed++;
     }
 
     @Override
-    public void traversedNode(NodeTraversalCallback.PathSource pathSource) throws CommitFailedException {
-
+    public void traversedNode(NodeTraversalCallback.PathSource pathSource) {
+        nodesTraversed++;
     }
 
     protected void preformIndexUpdate(NodeState baseState) throws IOException, CommitFailedException {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 1f298a806a..536da52c74 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -83,6 +83,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_INDEX_DATA_SIZE;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_SORTED_FILE_PATH;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CUSTOM_EXCLUDED_PATHS;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CUSTOM_EXCLUDE_ENTRIES_REGEX;
@@ -96,7 +97,6 @@ import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.INDEXING_PHASE_
 public abstract class DocumentStoreIndexerBase implements Closeable {
     public static final String INDEXER_METRICS_PREFIX = "oak_indexer_";
     public static final String METRIC_INDEXING_DURATION_SECONDS = INDEXER_METRICS_PREFIX + "indexing_duration_seconds";
-    public static final String METRIC_MERGE_NODE_STORE_DURATION_SECONDS = INDEXER_METRICS_PREFIX + "merge_node_store_duration_seconds";
     public static final String METRIC_FULL_INDEX_CREATION_DURATION_SECONDS = INDEXER_METRICS_PREFIX + "full_index_creation_duration_seconds";
 
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -198,7 +198,6 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
                         .withNodeStore(nodeStore)
                         .withMongoDocumentStore(getMongoDocumentStore())
                         .withMongoClientURI(getMongoClientURI())
-                        .withMongoDatabase(getMongoDatabase())
                         .withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
                                 nodeStore, getMongoDocumentStore(), traversalLog))
                         .withCheckpoint(indexerSupport.getCheckpoint())
@@ -341,6 +340,8 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
 
     public void reindex() throws CommitFailedException, IOException {
         INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:START] Starting 
indexing job");
+        List<String> indexNames = 
indexerSupport.getIndexDefinitions().stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
+        indexingReporter.setIndexNames(indexNames);
         Stopwatch indexJobWatch = Stopwatch.createStarted();
         try {
             IndexingProgressReporter progressReporter =
@@ -420,25 +421,13 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
                         MetricsFormatter.createMetricsWithDurationOnly(indexerWatch), t.toString());
                 throw t;
             }
-
-            INDEXING_PHASE_LOGGER.info("[TASK:MERGE_NODE_STORE:START] Starting 
merge node store");
-            Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
-            try {
-                copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, 
CommitInfo.EMPTY);
-                long mergeNodeStoreDurationSeconds = 
mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS);
-                INDEXING_PHASE_LOGGER.info("[TASK:MERGE_NODE_STORE:END] 
Metrics: {}", 
MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreDurationSeconds));
-                MetricsUtils.addMetric(statisticsProvider, indexingReporter, 
METRIC_MERGE_NODE_STORE_DURATION_SECONDS, mergeNodeStoreDurationSeconds);
-                indexingReporter.addTiming("Merge node store", 
FormattingUtils.formatToSeconds(mergeNodeStoreDurationSeconds));
-            } catch (Throwable t) {
-                INDEXING_PHASE_LOGGER.info("[TASK:MERGE_NODE_STORE:FAIL] 
Metrics: {}, Error: {}",
-                        
MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreWatch), 
t.toString());
-                throw t;
-            }
-
+            copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, 
CommitInfo.EMPTY);
             indexerSupport.postIndexWork(copyOnWriteStore);
 
             long fullIndexCreationDurationSeconds = 
indexJobWatch.elapsed(TimeUnit.SECONDS);
             INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:END] Metrics 
{}", 
MetricsFormatter.createMetricsWithDurationOnly(fullIndexCreationDurationSeconds));
+            long totalSize = indexerSupport.computeSizeOfGeneratedIndexData();
+            MetricsUtils.addMetricByteSize(statisticsProvider, 
indexingReporter, METRIC_INDEXING_INDEX_DATA_SIZE, totalSize);
             MetricsUtils.addMetric(statisticsProvider, indexingReporter, 
METRIC_FULL_INDEX_CREATION_DURATION_SECONDS, fullIndexCreationDurationSeconds);
             indexingReporter.addTiming("Total time", 
FormattingUtils.formatToSeconds(fullIndexCreationDurationSeconds));
         } catch (Throwable t) {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
index 1cf50aa69d..8875acfd1e 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
@@ -20,7 +20,6 @@
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
 
 import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoDatabase;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.guava.common.collect.Iterables;
 import org.apache.jackrabbit.oak.commons.Compression;
@@ -107,7 +106,6 @@ public class FlatFileNodeStoreBuilder {
     private RevisionVector rootRevision = null;
     private DocumentNodeStore nodeStore = null;
     private MongoDocumentStore mongoDocumentStore = null;
-    private MongoDatabase mongoDatabase = null;
     private Set<IndexDefinition> indexDefinitions = null;
     private String checkpoint;
     private long minModified;
@@ -209,11 +207,6 @@ public class FlatFileNodeStoreBuilder {
         return this;
     }
 
-    public FlatFileNodeStoreBuilder withMongoDatabase(MongoDatabase mongoDatabase) {
-        this.mongoDatabase = mongoDatabase;
-        return this;
-    }
-
     public FlatFileNodeStoreBuilder withStatisticsProvider(StatisticsProvider statisticsProvider) {
         this.statisticsProvider = statisticsProvider;
         return this;
@@ -230,7 +223,6 @@ public class FlatFileNodeStoreBuilder {
      * enable the support here.
      *
      * @param aotSupportEnabled
-     * @return
      */
     public FlatFileNodeStoreBuilder withAheadOfTimeBlobDownloader(boolean aotSupportEnabled) {
         this.withAheadOfTimeBlobDownloading = aotSupportEnabled;
@@ -332,8 +324,6 @@ public class FlatFileNodeStoreBuilder {
      * initializes the flat file store.
      *
      * @return pair of "list of flat files" and metadata file
-     * @throws IOException
-     * @throws CompositeException
      */
     private IndexStoreFiles createdSortedStoreFiles() throws IOException, CompositeException {
         // Check system property defined path
@@ -409,8 +399,6 @@ public class FlatFileNodeStoreBuilder {
             case PIPELINED: {
                 log.info("Using PipelinedStrategy");
                 List<PathFilter> pathFilters = indexDefinitions.stream().map(IndexDefinition::getPathFilter).collect(Collectors.toList());
-                List<String> indexNames = indexDefinitions.stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
-                indexingReporter.setIndexNames(indexNames);
                 return new PipelinedStrategy(mongoClientURI, mongoDocumentStore, nodeStore, rootRevision,
                         preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint,
                         statisticsProvider, indexingReporter);
@@ -418,8 +406,6 @@ public class FlatFileNodeStoreBuilder {
             case PIPELINED_TREE: {
                 log.info("Using PipelinedTreeStoreStrategy");
                 List<PathFilter> pathFilters = indexDefinitions.stream().map(IndexDefinition::getPathFilter).collect(Collectors.toList());
-                List<String> indexNames = indexDefinitions.stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
-                indexingReporter.setIndexNames(indexNames);
                 return new PipelinedTreeStoreStrategy(mongoClientURI, mongoDocumentStore, nodeStore, rootRevision,
                         preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint,
                         minModified, statisticsProvider, indexingReporter);
diff --git a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
index e5cdf06548..0f87865c3a 100644
--- a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
+++ b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
@@ -32,6 +32,7 @@ import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
 import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
 import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
 import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import org.apache.jackrabbit.oak.run.cli.CommonOptions;
 import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
 import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
@@ -56,6 +57,7 @@ import java.lang.management.ManagementFactory;
 import java.nio.file.Path;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
 
@@ -200,6 +202,9 @@ public class ElasticIndexCommand implements Command {
         log.info("Proceeding to index {} upto checkpoint {} {}", 
indexHelper.getIndexPaths(), checkpoint,
                 indexerSupport.getCheckpointInfo());
 
+        List<String> indexNames = indexerSupport.getIndexDefinitions().stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
+        indexHelper.getIndexReporter().setIndexNames(indexNames);
+
         if (opts.getCommonOpts().isMongo() && indexOpts.isDocTraversalMode()) {
             log.info("Using Document order traversal to perform reindexing");
             try (ElasticDocumentStoreIndexer indexer = new ElasticDocumentStoreIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
index 2d5a3c899f..56ac283c05 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
@@ -29,6 +29,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.index.async.AsyncIndexerLucene;
 import org.apache.jackrabbit.oak.index.indexer.document.DocumentStoreIndexer;
 import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
+import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
 import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
 import org.apache.jackrabbit.oak.run.cli.CommonOptions;
 import org.apache.jackrabbit.oak.run.cli.DocumentBuilderCustomizer;
@@ -57,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument;
 import static java.util.Collections.emptyMap;
 import static java.util.Objects.requireNonNull;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_INDEX_DATA_SIZE;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_SORTED_FILE_PATH;
 
 public class IndexCommand implements Command {
@@ -263,6 +265,10 @@ public class IndexCommand implements Command {
             }
         }
 
+        long totalSize = indexerSupport.computeSizeOfGeneratedIndexData();
+        MetricsUtils.addMetricByteSize(extendedIndexHelper.getStatisticsProvider(), extendedIndexHelper.getIndexReporter(),
+                METRIC_INDEXING_INDEX_DATA_SIZE, totalSize);
+
         indexerSupport.writeMetaInfo(checkpoint);
         File destDir = indexerSupport.copyIndexFilesToOutput();
         log.info("Indexing completed for indexes {} in {} ({} ms) and index 
files are copied to {}",
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/DocumentStoreIndexerIT.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/DocumentStoreIndexerIT.java
index 317c5bca73..cafb98502a 100644
--- a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/DocumentStoreIndexerIT.java
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/DocumentStoreIndexerIT.java
@@ -472,7 +472,6 @@ public class DocumentStoreIndexerIT extends LuceneAbstractIndexCommandTest {
 
             SortedMap<String, Counter> counters = metricsStatisticsProvider.getRegistry().getCounters();
             assertMetric(counters, DocumentStoreIndexerBase.METRIC_INDEXING_DURATION_SECONDS);
-            assertMetric(counters, DocumentStoreIndexerBase.METRIC_MERGE_NODE_STORE_DURATION_SECONDS);
             assertMetric(counters, DocumentStoreIndexerBase.METRIC_FULL_INDEX_CREATION_DURATION_SECONDS);
         } finally {
             executor.shutdown();

