This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new b74629c48a OAK-11290 - Improve logging of publish indexing job (#1887)
b74629c48a is described below
commit b74629c48ab49da61e504b00d5b213862f753168
Author: Nuno Santos <[email protected]>
AuthorDate: Fri Dec 6 18:29:07 2024 +0100
OAK-11290 - Improve logging of publish indexing job (#1887)
---
.../oak/plugins/index/importer/IndexImporter.java | 38 ++++++-------
.../jackrabbit/oak/index/IndexerMetrics.java | 29 ++++++++++
.../jackrabbit/oak/index/IndexerSupport.java | 61 +++++++++++++++++++--
.../jackrabbit/oak/index/OutOfBandIndexerBase.java | 64 +++++++++++++++++-----
.../indexer/document/DocumentStoreIndexerBase.java | 23 ++------
.../flatfile/FlatFileNodeStoreBuilder.java | 14 -----
.../jackrabbit/oak/index/ElasticIndexCommand.java | 5 ++
.../apache/jackrabbit/oak/index/IndexCommand.java | 6 ++
.../oak/index/DocumentStoreIndexerIT.java | 1 -
9 files changed, 169 insertions(+), 72 deletions(-)
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
index 02f3394c69..69c9b4424a 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
@@ -106,16 +106,16 @@ public class IndexImporter {
                         AsyncIndexerLock indexerLock, StatisticsProvider statisticsProvider, IndexingReporter indexingReporter) throws IOException {
        this.statisticsProvider = statisticsProvider;
        this.indexingReporter = indexingReporter;
-        checkArgument(indexDir.exists() && indexDir.isDirectory(), "Path [%s] does not point " +
-                "to existing directory", indexDir.getAbsolutePath());
+        checkArgument(indexDir.exists() && indexDir.isDirectory(),
+                "Path [%s] does not point to existing directory", indexDir.getAbsolutePath());
        this.nodeStore = nodeStore;
        this.indexDir = indexDir;
        this.indexEditorProvider = indexEditorProvider;
-        indexerInfo = IndexerInfo.fromDirectory(indexDir);
+        this.indexerInfo = IndexerInfo.fromDirectory(indexDir);
        this.indexerLock = indexerLock;
-        indexes = indexerInfo.getIndexes();
-        indexedState = requireNonNull(nodeStore.retrieve(indexerInfo.checkpoint), String.format("Cannot retrieve " +
-                "checkpointed state [%s]", indexerInfo.checkpoint));
+        this.indexes = indexerInfo.getIndexes();
+        this.indexedState = requireNonNull(nodeStore.retrieve(indexerInfo.checkpoint),
+                "Cannot retrieve checkpointed state [" + indexerInfo.checkpoint + "]");
        this.indexDefinitionUpdater = new IndexDefinitionUpdater(new File(indexDir, INDEX_DEFINITIONS_JSON));
        this.asyncLaneToIndexMapping = mapIndexesToLanes(indexes);
        this.indexPathsToUpdate = new HashSet<>();
this.asyncLaneToIndexMapping = mapIndexesToLanes(indexes);
this.indexPathsToUpdate = new HashSet<>();
@@ -127,7 +127,7 @@ public class IndexImporter {
public void importIndex() throws IOException, CommitFailedException {
try {
- if (indexes.keySet().isEmpty()) {
+ if (indexes.isEmpty()) {
LOG.warn("No indexes to import (possibly index definitions
outside of a oak:index node?)");
}
LOG.info("Proceeding to import {} indexes from {}",
indexes.keySet(), indexDir.getAbsolutePath());
@@ -178,10 +178,9 @@ public class IndexImporter {
mergeWithConcurrentCheck(nodeStore, builder);
});
} catch (CommitFailedException commitFailedException) {
- LOG.error("Unable to revert back index lanes for: "
- + indexPathsToUpdate.stream()
- .collect(StringBuilder::new,
StringBuilder::append, (a, b) -> a.append(",").append(b)),
- commitFailedException);
+ LOG.error("Unable to revert back index lanes for: {}",
+ indexPathsToUpdate.stream()
+ .collect(StringBuilder::new,
StringBuilder::append, (a, b) -> a.append(",").append(b)),
commitFailedException);
throw e;
}
}
@@ -265,12 +264,12 @@ public class IndexImporter {
boolean success = false;
try {
String checkpoint = getAsync().getString(laneName);
- requireNonNull(checkpoint, String.format("No current checkpoint
found for lane [%s]", laneName));
+ requireNonNull(checkpoint, "No current checkpoint found for lane
[" + laneName + "]");
//TODO Support case where checkpoint got lost or complete
reindexing is done
NodeState after = nodeStore.retrieve(checkpoint);
- requireNonNull(after, String.format("No state found for checkpoint
[%s] for lane [%s]", checkpoint, laneName));
+ requireNonNull(after, "No state found for checkpoint [" +
checkpoint + "] for lane [" + laneName + "]");
LOG.info("Proceeding to update imported indexes {} to checkpoint
[{}] for lane [{}]",
indexInfos, checkpoint, laneName);
@@ -297,8 +296,8 @@ public class IndexImporter {
            updateIndexImporterState(builder, IndexImportState.IMPORT_INDEX_DATA, IndexImportState.BRING_INDEX_UPTODATE, false);
            mergeWithConcurrentCheck(nodeStore, builder);
            success = true;
-            LOG.info("Imported index is updated to repository state at checkpoint [{}] for " +
-                    "indexing lane [{}]", checkpoint, laneName);
+            LOG.info("Imported index is updated to repository state at checkpoint [{}] for indexing lane [{}]",
+                    checkpoint, laneName);
        } catch (CommitFailedException e) {
            LOG.error("Failed while performing bringIndexUpToDate and updating indexImportState from [{}] to [{}]",
                    IndexImportState.IMPORT_INDEX_DATA, IndexImportState.BRING_INDEX_UPTODATE);
@@ -375,7 +374,7 @@ public class IndexImporter {
private IndexImporterProvider getImporter(String type) {
IndexImporterProvider provider = importers.get(type);
- return requireNonNull(provider, String.format("No
IndexImporterProvider found for type [%s]", type));
+ return requireNonNull(provider, "No IndexImporterProvider found for
type [" + type + "]");
}
private ListMultimap<String, IndexInfo> mapIndexesToLanes(Map<String,
File> indexes) {
@@ -391,7 +390,7 @@ public class IndexImporter {
            boolean newIndex = !NodeStateUtils.getNode(rootState, indexPath).exists();
            String type = indexState.getString(IndexConstants.TYPE_PROPERTY_NAME);
-            requireNonNull(type, String.format("No 'type' property found for index at path [%s]", indexPath));
+            requireNonNull(type, "No 'type' property found for index at path [" + indexPath + "]");
String asyncName = getAsyncLaneName(indexPath, indexState);
if (asyncName == null) {
@@ -448,8 +447,9 @@ public class IndexImporter {
private void incrementReIndexCount(NodeBuilder definition) {
long count = 0;
- if (definition.hasProperty(REINDEX_COUNT)) {
- count = definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
+ PropertyState reindexCountProp = definition.getProperty(REINDEX_COUNT);
+ if (reindexCountProp != null) {
+ count = reindexCountProp.getValue(Type.LONG);
}
definition.setProperty(REINDEX_COUNT, count + 1);
}
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerMetrics.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerMetrics.java
new file mode 100644
index 0000000000..7f0bac00e0
--- /dev/null
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerMetrics.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index;
+
+public interface IndexerMetrics {
+ String INDEXER_METRICS_PREFIX = "oak_indexer_";
+    String METRIC_INDEXING_INDEX_DATA_SIZE = INDEXER_METRICS_PREFIX + "index_data_size";
+
+    String INDEXER_PUBLISH_METRICS_PREFIX = "oak_indexer_publish_";
+    String METRIC_INDEXING_PUBLISH_DURATION_SECONDS = INDEXER_PUBLISH_METRICS_PREFIX + "indexing_duration_seconds";
+    String METRIC_INDEXING_PUBLISH_NODES_TRAVERSED = INDEXER_PUBLISH_METRICS_PREFIX + "nodes_traversed";
+    String METRIC_INDEXING_PUBLISH_NODES_INDEXED = INDEXER_PUBLISH_METRICS_PREFIX + "nodes_indexed";
+}
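A minimal sketch of how these constants are meant to be consumed, mirroring the MetricsUtils calls in the hunks below; the local variables here are hypothetical placeholders:

    // Hypothetical values; in the real job these come from the indexing run.
    long nodesIndexed = 1000;
    long indexDataSizeBytes = 123456789L;
    // statisticsProvider and indexingReporter are assumed to be wired up by the
    // indexing job, as in OutOfBandIndexerBase below.
    MetricsUtils.addMetric(statisticsProvider, indexingReporter,
            IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_INDEXED, nodesIndexed);
    MetricsUtils.addMetricByteSize(statisticsProvider, indexingReporter,
            IndexerMetrics.METRIC_INDEXING_INDEX_DATA_SIZE, indexDataSizeBytes);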
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
index c9ffee0677..e6fec0a9ac 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
@@ -21,6 +21,7 @@ package org.apache.jackrabbit.oak.index;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -29,6 +30,7 @@ import java.util.function.Function;
import org.apache.commons.io.FileUtils;
import org.apache.felix.inventory.Format;
+
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -54,7 +56,7 @@ import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
public class IndexerSupport {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
/**
* Directory name in output directory under which indexes are
* stored
@@ -131,7 +133,7 @@ public class IndexerSupport {
NodeState checkpointedState;
if (HEAD_AS_CHECKPOINT.equals(checkpoint)) {
checkpointedState = indexHelper.getNodeStore().getRoot();
- log.warn("Using head state for indexing. Such an index cannot be
imported back");
+ LOG.warn("Using head state for indexing. Such an index cannot be
imported back");
} else {
checkpointedState =
indexHelper.getNodeStore().retrieve(checkpoint);
requireNonNull(checkpointedState, String.format("Not able to
retrieve revision referred via checkpoint [%s]", checkpoint));
@@ -169,7 +171,7 @@ public class IndexerSupport {
}
copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- log.info("Switched the async lane for indexes at {} to {} and marked
them for reindex", indexHelper.getIndexPaths(), REINDEX_LANE);
+ LOG.info("Switched the async lane for indexes at {} to {} and marked
them for reindex", indexHelper.getIndexPaths(), REINDEX_LANE);
}
public void postIndexWork(NodeStore copyOnWriteStore) throws
CommitFailedException, IOException {
@@ -187,7 +189,7 @@ public class IndexerSupport {
}
copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- log.info("Switched the async lane for indexes at {} back to there
original lanes", indexHelper.getIndexPaths());
+ LOG.info("Switched the async lane for indexes at {} back to there
original lanes", indexHelper.getIndexPaths());
}
public Map<String, String> getCheckpointInfo() {
@@ -236,7 +238,7 @@ public class IndexerSupport {
}
/**
-     * @param indexDefinitions set of IndexDefinition to be used to calculate the Path Predicate
+     * @param indexDefinitions     set of IndexDefinition to be used to calculate the Path Predicate
     * @param typeToRepositoryPath Function to convert type <T> to valid repository path of type <String>
     * @param <T>
     * @return filter predicate based on the include/exclude path rules of the given set of index definitions.
@@ -246,7 +248,7 @@ public class IndexerSupport {
}
/**
-     * @param pattern Pattern for a custom excludes regex based on which paths would be filtered out
+     * @param pattern              Pattern for a custom excludes regex based on which paths would be filtered out
     * @param typeToRepositoryPath Function to convert type <T> to valid repository path of type <String>
     * @param <T>
     * @return Return a predicate that should test true for all paths that do not match the provided regex pattern.
@@ -254,4 +256,51 @@
    public <T> Predicate<T> getFilterPredicateBasedOnCustomRegex(Pattern pattern, Function<T, String> typeToRepositoryPath) {
return t -> !pattern.matcher(typeToRepositoryPath.apply(t)).find();
}
+
+ /**
+     * Computes the total size of the generated index data. This method is intended to be used when creating Lucene
+     * indexes, which are created locally. With Elastic, this will not include the Lucene files since the indexes
+     * are updated remotely.
+     *
+     * @return The total size of the index data generated or -1 if there is some error while computing the size.
+ */
+ public long computeSizeOfGeneratedIndexData() {
+ try {
+ File localIndexDir = getLocalIndexDir();
+ long totalSize = 0;
+ if (localIndexDir == null || !localIndexDir.isDirectory()) {
+ LOG.warn("Local index directory is invalid, this should not
happen: {}", localIndexDir);
+ return -1;
+ } else {
+ // Each index is stored in a separate directory
+ File[] directories =
localIndexDir.listFiles(File::isDirectory);
+ if (directories == null) {
+ LOG.warn("Error listing sub directories in the local index
directory: {}", localIndexDir);
+ return -1;
+ }
+ // Print the indexes in alphabetic order
+ Arrays.sort(directories);
+ StringBuilder sb = new StringBuilder();
+ for (File indexDir : directories) {
+ long size = FileUtils.sizeOfDirectory(indexDir);
+ totalSize += size;
+ File[] files = indexDir.listFiles(File::isFile);
+ if (files == null) {
+ LOG.warn("Error listing files in directory: {}",
indexDir);
+ // continue to the next index
+ } else {
+ long numberOfFiles = files.length;
+ sb.append("\n - " + indexDir.getName() + ": " +
numberOfFiles + " files, " +
+ size + " (" +
FileUtils.byteCountToDisplaySize(size) + ")");
+ }
+ }
+ LOG.info("Total size of index data generated: {} ({}){}",
+ totalSize,
FileUtils.byteCountToDisplaySize(totalSize), sb);
+ return totalSize;
+ }
+ } catch (Throwable t) {
+ LOG.warn("Error while computing size of generated index data.", t);
+ return -1;
+ }
+ }
}
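A rough usage sketch for the new method, following the pattern used in the IndexCommand hunk further down; indexerSupport, statisticsProvider and indexingReporter are assumed to be available in the calling job:

    // -1 signals that the size could not be computed; it is reported as-is.
    long totalSize = indexerSupport.computeSizeOfGeneratedIndexData();
    MetricsUtils.addMetricByteSize(statisticsProvider, indexingReporter,
            METRIC_INDEXING_INDEX_DATA_SIZE, totalSize);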
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
index 31bcb56779..2a9fb02b72 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexerBase.java
@@ -19,11 +19,13 @@
package org.apache.jackrabbit.oak.index;
import com.codahale.metrics.MetricRegistry;
+import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.io.Closer;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.index.*;
import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
import org.apache.jackrabbit.oak.plugins.index.progress.NodeCounterMBeanEstimator;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -38,15 +40,27 @@ import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_INDEX_DATA_SIZE;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_DURATION_SECONDS;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_INDEXED;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_PUBLISH_NODES_TRAVERSED;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.INDEXING_PHASE_LOGGER;
-public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCallback, NodeTraversalCallback{
+public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCallback, NodeTraversalCallback {
protected final Closer closer = Closer.create();
private final IndexHelper indexHelper;
+ private final IndexerSupport indexerSupport;
+ private final IndexingReporter indexingReporter;
+ private final StatisticsProvider statisticsProvider;
private NodeStore copyOnWriteStore;
- private IndexerSupport indexerSupport;
+ private long nodesTraversed = 0;
+ private long nodesIndexed = 0;
/**
* Index lane name which is used for indexing
@@ -64,18 +78,38 @@ public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCall
    public OutOfBandIndexerBase(IndexHelper indexHelper, IndexerSupport indexerSupport) {
        this.indexHelper = requireNonNull(indexHelper);
        this.indexerSupport = requireNonNull(indexerSupport);
+        this.indexingReporter = indexHelper.getIndexReporter();
+        this.statisticsProvider = indexHelper.getStatisticsProvider();
    }
    public void reindex() throws CommitFailedException, IOException {
-        NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
-
-        copyOnWriteStore = new MemoryNodeStore(checkpointedState);
-        NodeState baseState = copyOnWriteStore.getRoot();
-        //TODO Check for indexPaths being empty
-
-        indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
-        preformIndexUpdate(baseState);
-        indexerSupport.postIndexWork(copyOnWriteStore);
+        List<String> indexNames = indexerSupport.getIndexDefinitions().stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
+        indexingReporter.setIndexNames(indexNames);
+        INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:START] Starting indexing job");
+        Stopwatch indexJobWatch = Stopwatch.createStarted();
+        try {
+            NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
+
+            copyOnWriteStore = new MemoryNodeStore(checkpointedState);
+            NodeState baseState = copyOnWriteStore.getRoot();
+
+            indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
+            preformIndexUpdate(baseState);
+            indexerSupport.postIndexWork(copyOnWriteStore);
+
+            long indexingDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
+            long totalSize = indexerSupport.computeSizeOfGeneratedIndexData();
+            INDEXING_PHASE_LOGGER.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(indexingDurationSeconds));
+            MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_DURATION_SECONDS, indexingDurationSeconds);
+            MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_NODES_TRAVERSED, nodesTraversed);
+            MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_PUBLISH_NODES_INDEXED, nodesIndexed);
+            MetricsUtils.addMetricByteSize(statisticsProvider, indexingReporter, METRIC_INDEXING_INDEX_DATA_SIZE, totalSize);
+            indexingReporter.addTiming("Build Lucene Index", FormattingUtils.formatToSeconds(indexingDurationSeconds));
+        } catch (Throwable t) {
+            INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:FAIL] Metrics: {}, Error: {}",
+                    MetricsFormatter.createMetricsWithDurationOnly(indexJobWatch), t.toString());
+ throw t;
+ }
}
protected File getLocalIndexDir() throws IOException {
@@ -90,13 +124,13 @@ public abstract class OutOfBandIndexerBase implements Closeable, IndexUpdateCall
//~---------------------------------------------------< callbacks >
@Override
- public void indexUpdate() throws CommitFailedException {
-
+ public void indexUpdate() {
+ nodesIndexed++;
}
@Override
-    public void traversedNode(NodeTraversalCallback.PathSource pathSource) throws CommitFailedException {
-
+ public void traversedNode(NodeTraversalCallback.PathSource pathSource) {
+ nodesTraversed++;
}
    protected void preformIndexUpdate(NodeState baseState) throws IOException, CommitFailedException {
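The reworked reindex() above wraps the whole job in the same [TASK:...] phase-logging pattern used elsewhere in this change; a stripped-down sketch of that pattern, where runIndexingSteps() is a hypothetical placeholder for the actual work:

    Stopwatch watch = Stopwatch.createStarted();
    INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:START] Starting indexing job");
    try {
        runIndexingSteps(); // placeholder: checkpoint retrieval, index update, post-index work
        INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:END] Metrics: {}",
                MetricsFormatter.createMetricsWithDurationOnly(watch.elapsed(TimeUnit.SECONDS)));
    } catch (Throwable t) {
        // failures are logged with the elapsed time and then rethrown
        INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:FAIL] Metrics: {}, Error: {}",
                MetricsFormatter.createMetricsWithDurationOnly(watch), t.toString());
        throw t;
    }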
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 1f298a806a..536da52c74 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -83,6 +83,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_INDEX_DATA_SIZE;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_SORTED_FILE_PATH;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CUSTOM_EXCLUDED_PATHS;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CUSTOM_EXCLUDE_ENTRIES_REGEX;
@@ -96,7 +97,6 @@ import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.INDEXING_PHASE_
public abstract class DocumentStoreIndexerBase implements Closeable {
    public static final String INDEXER_METRICS_PREFIX = "oak_indexer_";
    public static final String METRIC_INDEXING_DURATION_SECONDS = INDEXER_METRICS_PREFIX + "indexing_duration_seconds";
-    public static final String METRIC_MERGE_NODE_STORE_DURATION_SECONDS = INDEXER_METRICS_PREFIX + "merge_node_store_duration_seconds";
    public static final String METRIC_FULL_INDEX_CREATION_DURATION_SECONDS = INDEXER_METRICS_PREFIX + "full_index_creation_duration_seconds";
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -198,7 +198,6 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
                .withNodeStore(nodeStore)
                .withMongoDocumentStore(getMongoDocumentStore())
                .withMongoClientURI(getMongoClientURI())
-                .withMongoDatabase(getMongoDatabase())
                .withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
                        nodeStore, getMongoDocumentStore(), traversalLog))
.withCheckpoint(indexerSupport.getCheckpoint())
@@ -341,6 +340,8 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
    public void reindex() throws CommitFailedException, IOException {
        INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:START] Starting indexing job");
+        List<String> indexNames = indexerSupport.getIndexDefinitions().stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
+        indexingReporter.setIndexNames(indexNames);
Stopwatch indexJobWatch = Stopwatch.createStarted();
try {
IndexingProgressReporter progressReporter =
@@ -420,25 +421,13 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
                        MetricsFormatter.createMetricsWithDurationOnly(indexerWatch), t.toString());
                throw t;
            }
-
-            INDEXING_PHASE_LOGGER.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
-            Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
-            try {
-                copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-                long mergeNodeStoreDurationSeconds = mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS);
-                INDEXING_PHASE_LOGGER.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreDurationSeconds));
-                MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_MERGE_NODE_STORE_DURATION_SECONDS, mergeNodeStoreDurationSeconds);
-                indexingReporter.addTiming("Merge node store", FormattingUtils.formatToSeconds(mergeNodeStoreDurationSeconds));
-            } catch (Throwable t) {
-                INDEXING_PHASE_LOGGER.info("[TASK:MERGE_NODE_STORE:FAIL] Metrics: {}, Error: {}",
-                        MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreWatch), t.toString());
-                throw t;
-            }
-
+            copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
            indexerSupport.postIndexWork(copyOnWriteStore);
            long fullIndexCreationDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
            INDEXING_PHASE_LOGGER.info("[TASK:FULL_INDEX_CREATION:END] Metrics {}", MetricsFormatter.createMetricsWithDurationOnly(fullIndexCreationDurationSeconds));
+            long totalSize = indexerSupport.computeSizeOfGeneratedIndexData();
+            MetricsUtils.addMetricByteSize(statisticsProvider, indexingReporter, METRIC_INDEXING_INDEX_DATA_SIZE, totalSize);
            MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_FULL_INDEX_CREATION_DURATION_SECONDS, fullIndexCreationDurationSeconds);
            indexingReporter.addTiming("Total time", FormattingUtils.formatToSeconds(fullIndexCreationDurationSeconds));
} catch (Throwable t) {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
index 1cf50aa69d..8875acfd1e 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
@@ -20,7 +20,6 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoDatabase;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.guava.common.collect.Iterables;
import org.apache.jackrabbit.oak.commons.Compression;
@@ -107,7 +106,6 @@ public class FlatFileNodeStoreBuilder {
private RevisionVector rootRevision = null;
private DocumentNodeStore nodeStore = null;
private MongoDocumentStore mongoDocumentStore = null;
- private MongoDatabase mongoDatabase = null;
private Set<IndexDefinition> indexDefinitions = null;
private String checkpoint;
private long minModified;
@@ -209,11 +207,6 @@ public class FlatFileNodeStoreBuilder {
return this;
}
-    public FlatFileNodeStoreBuilder withMongoDatabase(MongoDatabase mongoDatabase) {
-        this.mongoDatabase = mongoDatabase;
-        return this;
-    }
-
    public FlatFileNodeStoreBuilder withStatisticsProvider(StatisticsProvider statisticsProvider) {
this.statisticsProvider = statisticsProvider;
return this;
@@ -230,7 +223,6 @@ public class FlatFileNodeStoreBuilder {
* enable the support here.
*
* @param aotSupportEnabled
- * @return
*/
    public FlatFileNodeStoreBuilder withAheadOfTimeBlobDownloader(boolean aotSupportEnabled) {
this.withAheadOfTimeBlobDownloading = aotSupportEnabled;
@@ -332,8 +324,6 @@ public class FlatFileNodeStoreBuilder {
* initializes the flat file store.
*
* @return pair of "list of flat files" and metadata file
- * @throws IOException
- * @throws CompositeException
*/
    private IndexStoreFiles createdSortedStoreFiles() throws IOException, CompositeException {
// Check system property defined path
@@ -409,8 +399,6 @@ public class FlatFileNodeStoreBuilder {
case PIPELINED: {
log.info("Using PipelinedStrategy");
                List<PathFilter> pathFilters = indexDefinitions.stream().map(IndexDefinition::getPathFilter).collect(Collectors.toList());
-                List<String> indexNames = indexDefinitions.stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
-                indexingReporter.setIndexNames(indexNames);
                return new PipelinedStrategy(mongoClientURI, mongoDocumentStore, nodeStore, rootRevision,
                        preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint,
statisticsProvider, indexingReporter);
@@ -418,8 +406,6 @@ public class FlatFileNodeStoreBuilder {
case PIPELINED_TREE: {
log.info("Using PipelinedTreeStoreStrategy");
                List<PathFilter> pathFilters = indexDefinitions.stream().map(IndexDefinition::getPathFilter).collect(Collectors.toList());
-                List<String> indexNames = indexDefinitions.stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
-                indexingReporter.setIndexNames(indexNames);
                return new PipelinedTreeStoreStrategy(mongoClientURI, mongoDocumentStore, nodeStore, rootRevision,
                        preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint,
minModified, statisticsProvider, indexingReporter);
diff --git a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
index e5cdf06548..0f87865c3a 100644
--- a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
+++ b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
@@ -32,6 +32,7 @@ import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.run.cli.CommonOptions;
import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
@@ -56,6 +57,7 @@ import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
@@ -200,6 +202,9 @@ public class ElasticIndexCommand implements Command {
log.info("Proceeding to index {} upto checkpoint {} {}",
indexHelper.getIndexPaths(), checkpoint,
indexerSupport.getCheckpointInfo());
+ List<String> indexNames =
indexerSupport.getIndexDefinitions().stream().map(IndexDefinition::getIndexName).collect(Collectors.toList());
+ indexHelper.getIndexReporter().setIndexNames(indexNames);
+
if (opts.getCommonOpts().isMongo() && indexOpts.isDocTraversalMode()) {
log.info("Using Document order traversal to perform reindexing");
            try (ElasticDocumentStoreIndexer indexer = new ElasticDocumentStoreIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
index 2d5a3c899f..56ac283c05 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
@@ -29,6 +29,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.index.async.AsyncIndexerLucene;
import org.apache.jackrabbit.oak.index.indexer.document.DocumentStoreIndexer;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
+import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
import org.apache.jackrabbit.oak.run.cli.CommonOptions;
import org.apache.jackrabbit.oak.run.cli.DocumentBuilderCustomizer;
@@ -57,6 +58,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
+import static org.apache.jackrabbit.oak.index.IndexerMetrics.METRIC_INDEXING_INDEX_DATA_SIZE;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_SORTED_FILE_PATH;
public class IndexCommand implements Command {
@@ -263,6 +265,10 @@ public class IndexCommand implements Command {
}
}
+ long totalSize = indexerSupport.computeSizeOfGeneratedIndexData();
+        MetricsUtils.addMetricByteSize(extendedIndexHelper.getStatisticsProvider(), extendedIndexHelper.getIndexReporter(),
+                METRIC_INDEXING_INDEX_DATA_SIZE, totalSize);
+
indexerSupport.writeMetaInfo(checkpoint);
File destDir = indexerSupport.copyIndexFilesToOutput();
log.info("Indexing completed for indexes {} in {} ({} ms) and index
files are copied to {}",
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/DocumentStoreIndexerIT.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/DocumentStoreIndexerIT.java
index 317c5bca73..cafb98502a 100644
--- a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/DocumentStoreIndexerIT.java
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/DocumentStoreIndexerIT.java
@@ -472,7 +472,6 @@ public class DocumentStoreIndexerIT extends LuceneAbstractIndexCommandTest {
            SortedMap<String, Counter> counters = metricsStatisticsProvider.getRegistry().getCounters();
            assertMetric(counters, DocumentStoreIndexerBase.METRIC_INDEXING_DURATION_SECONDS);
-            assertMetric(counters, DocumentStoreIndexerBase.METRIC_MERGE_NODE_STORE_DURATION_SECONDS);
            assertMetric(counters, DocumentStoreIndexerBase.METRIC_FULL_INDEX_CREATION_DURATION_SECONDS);
} finally {
executor.shutdown();