This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ec7926e90 OAK-10423 - Improve logging of metrics in the indexing job
(#1093)
6ec7926e90 is described below
commit 6ec7926e90b99bdc52eb9463675c309a298ccaa8
Author: Nuno Santos <[email protected]>
AuthorDate: Fri Sep 8 10:26:28 2023 +0200
OAK-10423 - Improve logging of metrics in the indexing job (#1093)
* Improve logging of metrics in the indexing job.
* Add integrity checks to the metrics formatter
* Clean up log messages
* Add logging of metrics to reindex, merge in node store and total job time.
* Improve logging
* Add missing license header.
Fix typo in class name.
* Simplify logic.
* Print elapsed time in hours:minutes instead of decimal minutes.
* Do not call .name() on reference that is potentially null.
---
.../oak/plugins/index/FormattingUtils.java | 35 ++++++++++++
.../oak/plugins/index/MetricsFormatter.java | 64 ++++++++++++++++++++++
.../oak/plugins/index/importer/IndexImporter.java | 27 ++++++---
.../oak/plugins/index/MetricsFormatterTest.java | 37 +++++++++++++
.../indexer/document/DocumentStoreIndexerBase.java | 27 ++++++---
.../flatfile/pipelined/PipelinedMergeSortTask.java | 31 ++++++++---
.../pipelined/PipelinedMongoDownloadTask.java | 32 +++++++----
.../flatfile/pipelined/PipelinedSortBatchTask.java | 22 ++++++--
.../flatfile/pipelined/PipelinedStrategy.java | 42 +++++++++-----
.../flatfile/pipelined/PipelinedTransformTask.java | 37 ++++++++-----
.../pipelined/TransformStageStatistics.java | 36 ++++++------
.../oak/index/ElasticDocumentStoreIndexer.java | 2 +-
.../apache/jackrabbit/oak/index/IndexCommand.java | 11 +++-
.../indexer/document/DocumentStoreIndexer.java | 2 +-
14 files changed, 316 insertions(+), 89 deletions(-)
diff --git
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/FormattingUtils.java
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/FormattingUtils.java
new file mode 100644
index 0000000000..4c1a640b1a
--- /dev/null
+++
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/FormattingUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import org.apache.jackrabbit.guava.common.base.Stopwatch;
+
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.TimeUnit;
+
+public class FormattingUtils {
+ public static String formatToSeconds(Stopwatch stopwatch) {
+ LocalTime seconds =
LocalTime.ofSecondOfDay(stopwatch.elapsed(TimeUnit.SECONDS));
+ return DateTimeFormatter.ISO_TIME.format(seconds);
+ }
+
+ public static String formatToMillis(Stopwatch stopwatch) {
+ LocalTime nanoSeconds =
LocalTime.ofNanoOfDay(stopwatch.elapsed(TimeUnit.MILLISECONDS)*1000000);
+ return DateTimeFormatter.ISO_TIME.format(nanoSeconds);
+ }
+}
diff --git
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java
new file mode 100644
index 0000000000..f32e5f4102
--- /dev/null
+++
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import org.apache.jackrabbit.guava.common.base.Preconditions;
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+
+/**
+ * Small fluent builder that collects key/value metrics into a
+ * {@link JsopBuilder} object and renders them as a single string.
+ * <p>
+ * Once {@link #build()} has been called the formatter becomes read-only:
+ * further {@code add} calls fail with an {@link IllegalStateException},
+ * while {@link #build()} may be called again and returns the same string.
+ */
+public class MetricsFormatter {
+    private final JsopBuilder json = new JsopBuilder();
+    private boolean open = true;
+
+    /**
+     * @return a new, empty formatter ready to accept metrics
+     */
+    public static MetricsFormatter newBuilder() {
+        return new MetricsFormatter();
+    }
+
+    private MetricsFormatter() {
+        // Open the enclosing object; it is closed by the first build().
+        json.object();
+    }
+
+    /** Appends a string-valued metric. */
+    public MetricsFormatter add(String key, String value) {
+        ensureWritable();
+        json.key(key).value(value);
+        return this;
+    }
+
+    /** Appends an int-valued metric. */
+    public MetricsFormatter add(String key, int value) {
+        ensureWritable();
+        json.key(key).value(value);
+        return this;
+    }
+
+    /** Appends a long-valued metric. */
+    public MetricsFormatter add(String key, long value) {
+        ensureWritable();
+        json.key(key).value(value);
+        return this;
+    }
+
+    /** Appends a boolean-valued metric. */
+    public MetricsFormatter add(String key, boolean value) {
+        ensureWritable();
+        json.key(key).value(value);
+        return this;
+    }
+
+    /**
+     * Closes the object on the first call and returns the rendered string.
+     * Later calls return the rendered string without modifying it.
+     */
+    public String build() {
+        if (!open) {
+            return json.toString();
+        }
+        json.endObject();
+        open = false;
+        return json.toString();
+    }
+
+    /** Rejects modifications after {@link #build()} has been called. */
+    private void ensureWritable() {
+        Preconditions.checkState(open, "Formatter already built, in read-only mode");
+    }
+}
diff --git
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
index 1c8d7d368d..d06ab32481 100644
---
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
+++
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
@@ -26,17 +26,21 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.collect.ArrayListMultimap;
import org.apache.jackrabbit.guava.common.collect.ListMultimap;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdate;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import
org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock.LockToken;
import org.apache.jackrabbit.oak.plugins.index.upgrade.IndexDisabler;
import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
@@ -60,8 +64,8 @@ public class IndexImporter {
*/
static final String ASYNC_LANE_SYNC = "sync";
/*
- * System property name for flag for preserve checkpoint. If this is set to
true, then checkpoint cleanup will be skipped.
- * Default is set to false.
+ * System property name for flag for preserve checkpoint. If this is set
to true, then checkpoint cleanup will be skipped.
+ * Default is set to false.
*/
public static final String OAK_INDEX_IMPORTER_PRESERVE_CHECKPOINT =
"oak.index.importer.preserveCheckpoint";
@@ -230,7 +234,7 @@ public class IndexImporter {
private void bringIndexUpToDate() throws CommitFailedException {
for (String laneName : asyncLaneToIndexMapping.keySet()) {
- if (ASYNC_LANE_SYNC.equals(laneName)){
+ if (ASYNC_LANE_SYNC.equals(laneName)) {
continue; //TODO Handle sync indexes
}
bringAsyncIndexUpToDate(laneName,
asyncLaneToIndexMapping.get(laneName));
@@ -247,7 +251,7 @@ public class IndexImporter {
//TODO Support case where checkpoint got lost or complete
reindexing is done
NodeState after = nodeStore.retrieve(checkpoint);
- checkNotNull(after, "No state found for checkpoint [%s] for lane
[%s]",checkpoint, laneName);
+ checkNotNull(after, "No state found for checkpoint [%s] for lane
[%s]", checkpoint, laneName);
LOG.info("Proceeding to update imported indexes {} to checkpoint
[{}] for lane [{}]",
indexInfos, checkpoint, laneName);
@@ -399,12 +403,11 @@ public class IndexImporter {
*
* @param indexPath path of index. Mostly used in reporting exception
* @param indexState nodeState for index at given path
- *
* @return async lane name or null which would be the case for sync indexes
*/
static String getAsyncLaneName(String indexPath, NodeState indexState) {
PropertyState asyncPrevious =
indexState.getProperty(AsyncLaneSwitcher.ASYNC_PREVIOUS);
- if (asyncPrevious != null && !AsyncLaneSwitcher.isNone(asyncPrevious)){
+ if (asyncPrevious != null && !AsyncLaneSwitcher.isNone(asyncPrevious))
{
return IndexUtils.getAsyncLaneName(indexState, indexPath,
asyncPrevious);
}
return IndexUtils.getAsyncLaneName(indexState, indexPath);
@@ -426,7 +429,7 @@ public class IndexImporter {
private void incrementReIndexCount(NodeBuilder definition) {
long count = 0;
- if(definition.hasProperty(REINDEX_COUNT)){
+ if (definition.hasProperty(REINDEX_COUNT)) {
count = definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
}
definition.setProperty(REINDEX_COUNT, count + 1);
@@ -463,10 +466,18 @@ public class IndexImporter {
void runWithRetry(int maxRetries, IndexImportState indexImportState,
IndexImporterStepExecutor step) throws CommitFailedException, IOException {
int count = 1;
+ Stopwatch start = Stopwatch.createStarted();
while (count <= maxRetries) {
- LOG.info("IndexImporterStepExecutor:{} ,count:{}",
indexImportState, count);
+ LOG.info("IndexImporterStepExecutor:{}, count:{}",
indexImportState, count);
+ LOG.info("[TASK:{}:START]", indexImportState);
try {
step.execute();
+ LOG.info("[TASK:{}:END] Metrics: {}", indexImportState,
+ MetricsFormatter.newBuilder()
+ .add("duration",
FormattingUtils.formatToSeconds(start))
+ .add("durationSeconds",
start.elapsed(TimeUnit.SECONDS))
+ .build()
+ );
break;
} catch (CommitFailedException | IOException e) {
LOG.warn("IndexImporterStepExecutor:{} fail count: {}, retries
left: {}", indexImportState, count, maxRetries - count, e);
diff --git
a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatterTest.java
b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatterTest.java
new file mode 100644
index 0000000000..2ccc4ecd22
--- /dev/null
+++
b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatterTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class MetricsFormatterTest {
+
+    /**
+     * Adds one metric of each supported value type, then checks the
+     * rendered output, that the formatter rejects further additions
+     * after being built, and that building again yields the same output.
+     */
+    @Test
+    public void testAdd() {
+        final String expected = "{\"key\":\"value\",\"key\":1,\"key\":1,\"key\":true}";
+
+        MetricsFormatter formatter = MetricsFormatter.newBuilder();
+        formatter.add("key", "value");
+        formatter.add("key", 1);
+        formatter.add("key", 1L);
+        formatter.add("key", true);
+        String firstBuild = formatter.build();
+
+        assertEquals(expected, firstBuild);
+        assertThrows(IllegalStateException.class, () -> formatter.add("key", "value"));
+        assertEquals(expected, formatter.build());
+    }
+}
\ No newline at end of file
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 7a31811de8..bb7e7977b2 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -38,8 +38,10 @@ import
org.apache.jackrabbit.oak.plugins.document.mongo.DocumentStoreSplitter;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.TraversingRange;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
import
org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
@@ -68,6 +70,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
@@ -138,7 +141,7 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
}
private List<FlatFileStore> buildFlatFileStoreList(NodeState
checkpointedState, CompositeIndexer indexer, Predicate<String> pathPredicate,
Set<String> preferredPathElements,
- boolean splitFlatFile, Set<IndexDefinition> indexDefinitions)
throws IOException {
+ boolean splitFlatFile,
Set<IndexDefinition> indexDefinitions) throws IOException {
List<FlatFileStore> storeList = new ArrayList<>();
Stopwatch flatFileStoreWatch = Stopwatch.createStarted();
@@ -205,7 +208,6 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
}
/**
- *
* @return an Instance of FlatFileStore, whose getFlatFileStorePath()
method can be used to get the absolute path to this store.
* @throws IOException
* @throws CommitFailedException
@@ -220,7 +222,7 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
Predicate<String> predicate = s ->
indexDefinitions.stream().anyMatch(indexDef ->
indexDef.getPathFilter().filter(s) != PathFilter.Result.EXCLUDE);
FlatFileStore flatFileStore =
buildFlatFileStoreList(checkpointedState, null, predicate,
- preferredPathElements,
IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
+ preferredPathElements,
IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
log.info("FlatFileStore built at {}. To use this flatFileStore in a
reindex step, set System Property-{} with value {}",
flatFileStore.getFlatFileStorePath(),
OAK_INDEXER_SORTED_FILE_PATH, flatFileStore.getFlatFileStorePath());
return flatFileStore;
@@ -244,14 +246,15 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
closer.register(indexer);
List<FlatFileStore> flatFileStores =
buildFlatFileStoreList(checkpointedState, indexer,
- indexer::shouldInclude, null,
IndexerConfiguration.parallelIndexEnabled(), getIndexDefinitions());
+ indexer::shouldInclude, null,
IndexerConfiguration.parallelIndexEnabled(), getIndexDefinitions());
progressReporter.reset();
progressReporter.reindexingTraversalStart("/");
- preIndexOpertaions(indexer.getIndexers());
+ preIndexOperations(indexer.getIndexers());
+ log.info("[TASK:INDEXING:START] Starting indexing");
Stopwatch indexerWatch = Stopwatch.createStarted();
if (flatFileStores.size() > 1) {
@@ -267,14 +270,24 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
progressReporter.reindexingTraversalEnd();
progressReporter.logReport();
log.info("Completed the indexing in {}", indexerWatch);
+ log.info("[TASK:INDEXING:END] Metrics: {}",
MetricsFormatter.newBuilder()
+ .add("duration", FormattingUtils.formatToSeconds(indexerWatch))
+ .add("durationSeconds", indexerWatch.elapsed(TimeUnit.SECONDS))
+ .build());
+ log.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
+ Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ log.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}",
MetricsFormatter.newBuilder()
+ .add("duration",
FormattingUtils.formatToSeconds(mergeNodeStoreWatch))
+ .add("durationSeconds",
mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS))
+ .build());
indexerSupport.postIndexWork(copyOnWriteStore);
}
private void indexParallel(List<FlatFileStore> storeList, CompositeIndexer
indexer, IndexingProgressReporter progressReporter)
- throws IOException {
+ throws IOException {
ExecutorService service =
Executors.newFixedThreadPool(IndexerConfiguration.indexThreadPoolSize());
List<Future> futureList = new ArrayList<>();
@@ -394,7 +407,7 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
protected abstract List<NodeStateIndexerProvider> createProviders() throws
IOException;
- protected abstract void preIndexOpertaions(List<NodeStateIndexer>
indexers);
+ protected abstract void preIndexOperations(List<NodeStateIndexer>
indexers);
//TODO OAK-7098 - Taken from IndexUpdate. Refactor to abstract out common
logic like this
private void removeIndexState(NodeBuilder definition) {
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
index 8f248ac209..a857a4422c 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
@@ -21,6 +21,8 @@ package
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,8 +32,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static
org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCountBin;
@@ -70,6 +74,8 @@ class PipelinedMergeSortTask implements
Callable<PipelinedMergeSortTask.Result>
private static final Logger LOG =
LoggerFactory.getLogger(PipelinedMergeSortTask.class);
+ private static final String THREAD_NAME = "mongo-merge-sort-files";
+
private final File storeDir;
private final Comparator<NodeStateHolder> comparator;
private final Compression algorithm;
@@ -89,17 +95,28 @@ class PipelinedMergeSortTask implements
Callable<PipelinedMergeSortTask.Result>
@Override
public Result call() throws Exception {
String originalName = Thread.currentThread().getName();
- Thread.currentThread().setName("mongo-merge-sort-files");
+ Thread.currentThread().setName(THREAD_NAME);
try {
- LOG.info("Starting merge sort thread");
+ LOG.info("[TASK:{}:START] Starting merge sort task",
THREAD_NAME.toUpperCase(Locale.ROOT));
while (true) {
LOG.info("Waiting for next intermediate sorted file");
File sortedIntermediateFile = sortedFilesQueue.take();
if (sortedIntermediateFile == SENTINEL_SORTED_FILES_QUEUE) {
- LOG.info("Going to sort {} files, total size {}",
sortedFiles.size(), humanReadableByteCountBin(sizeOf(sortedFiles)));
+ long sortedFilesSizeBytes = sizeOf(sortedFiles);
+ LOG.info("Going to sort {} files, total size {}",
sortedFiles.size(), humanReadableByteCountBin(sortedFilesSizeBytes));
+ Stopwatch w = Stopwatch.createStarted();
File flatFileStore = sortStoreFile(sortedFiles);
- LOG.info("Terminating sort task. Merged {} files to create
the FFS: {} of size {}",
- sortedFiles.size(),
flatFileStore.getAbsolutePath(),
humanReadableByteCountBin(flatFileStore.length()));
+ LOG.info("Final merge completed in {}. Created file: {}",
FormattingUtils.formatToSeconds(w), flatFileStore.getAbsolutePath());
+ long ffsSizeBytes = flatFileStore.length();
+ String metrics = MetricsFormatter.newBuilder()
+ .add("duration",
FormattingUtils.formatToSeconds(w))
+ .add("durationSeconds",
w.elapsed(TimeUnit.SECONDS))
+ .add("filesMerged", sortedFiles.size())
+ .add("ffsSizeBytes", ffsSizeBytes)
+ .add("ffsSize",
humanReadableByteCountBin(ffsSizeBytes))
+ .build();
+
+ LOG.info("[TASK:{}:END] Metrics: {}",
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
return new Result(flatFileStore, sortedFiles.size());
}
sortedFiles.add(sortedIntermediateFile);
@@ -111,7 +128,7 @@ class PipelinedMergeSortTask implements
Callable<PipelinedMergeSortTask.Result>
LOG.warn("Thread interrupted", t);
throw t;
} catch (Throwable t) {
- LOG.warn("Thread terminating with exception.", t);
+ LOG.warn("Thread terminating with exception", t);
throw t;
} finally {
Thread.currentThread().setName(originalName);
@@ -119,7 +136,6 @@ class PipelinedMergeSortTask implements
Callable<PipelinedMergeSortTask.Result>
}
private File sortStoreFile(List<File> sortedFilesBatch) throws IOException
{
- Stopwatch w = Stopwatch.createStarted();
File sortedFile = new File(storeDir,
getSortedStoreFileName(algorithm));
try (BufferedWriter writer = createWriter(sortedFile, algorithm)) {
Function<String, NodeStateHolder> stringToType = (line) -> line ==
null ? null : new NodeStateHolder(line);
@@ -134,7 +150,6 @@ class PipelinedMergeSortTask implements
Callable<PipelinedMergeSortTask.Result>
stringToType
);
}
- LOG.info("Merging of sorted files completed in {}", w);
return sortedFile;
}
}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index bb53f40a35..3ddcaf9649 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -31,6 +31,8 @@ import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.spi.filter.PathFilter;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
@@ -43,6 +45,7 @@ import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -102,6 +105,8 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES =
10;
private final static BsonDocument NATURAL_HINT = BsonDocument.parse("{
$natural:1}");
+ private static final String THREAD_NAME = "mongo-dump";
+
private final int batchSize;
private final BlockingQueue<BasicDBObject[]> mongoDocQueue;
private final List<PathFilter> pathFilters;
@@ -150,11 +155,9 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
@Override
public Result call() throws Exception {
String originalName = Thread.currentThread().getName();
- Thread.currentThread().setName("mongo-dump");
+ Thread.currentThread().setName(THREAD_NAME);
+ LOG.info("[TASK:{}:START] Starting to download from MongoDB",
THREAD_NAME.toUpperCase(Locale.ROOT));
try {
- LOG.info("Starting to download from MongoDB.");
- //TODO This may lead to reads being routed to secondary depending
on MongoURI
- //So caller must ensure that its safe to read from secondary
this.nextLastModified = 0;
this.lastIdDownloaded = null;
@@ -164,10 +167,16 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
} else {
downloadWithNaturalOrdering();
}
- LOG.info("Terminating task. Downloaded {} Mongo documents in {}.
Total enqueuing delay: {} ms ({}%)",
- documentsRead, downloadStartWatch,
- totalEnqueueWaitTimeMillis,
- String.format("%1.2f", (100.0 *
totalEnqueueWaitTimeMillis) /
downloadStartWatch.elapsed(TimeUnit.MILLISECONDS)));
+ String enqueueingDelayPercentage = String.format("%1.2f", (100.0 *
totalEnqueueWaitTimeMillis) /
downloadStartWatch.elapsed(TimeUnit.MILLISECONDS));
+ String metrics = MetricsFormatter.newBuilder()
+ .add("duration",
FormattingUtils.formatToSeconds(downloadStartWatch))
+ .add("durationSeconds",
downloadStartWatch.elapsed(TimeUnit.SECONDS))
+ .add("documentsDownloaded", documentsRead)
+ .add("enqueueingDelayMs", totalEnqueueWaitTimeMillis)
+ .add("enqueueingDelayPercentage",
enqueueingDelayPercentage)
+ .build();
+
+ LOG.info("[TASK:{}:END] Metrics: {}",
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
return new Result(documentsRead);
} catch (InterruptedException t) {
LOG.warn("Thread interrupted", t);
@@ -185,12 +194,11 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
double rate = ((double) this.documentsRead) /
downloadStartWatch.elapsed(TimeUnit.SECONDS);
String formattedRate = String.format("%1.2f nodes/s, %1.2f
nodes/hr", rate, rate * 3600);
LOG.info("Dumping from NSET Traversed #{} {} [{}] (Elapsed {})",
- this.documentsRead, id, formattedRate, downloadStartWatch);
+ this.documentsRead, id, formattedRate,
FormattingUtils.formatToSeconds(downloadStartWatch));
}
traversalLog.trace(id);
}
-
private void downloadWithRetryOnConnectionErrors() throws
InterruptedException, TimeoutException {
// If regex filtering is enabled, start by downloading the ancestors
of the path used for filtering.
// That is, download "/", "/content", "/content/dam" for a base path
of "/content/dam". These nodes will not be
@@ -425,11 +433,11 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
}
private void tryEnqueue(BasicDBObject[] block) throws TimeoutException,
InterruptedException {
- Stopwatch stopwatch = Stopwatch.createStarted();
+ Stopwatch enqueueDelayStopwatch = Stopwatch.createStarted();
if (!mongoDocQueue.offer(block, MONGO_QUEUE_OFFER_TIMEOUT.toMillis(),
TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timeout trying to enqueue batch of
MongoDB documents. Waited " + MONGO_QUEUE_OFFER_TIMEOUT);
}
- long enqueueDelay = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ long enqueueDelay =
enqueueDelayStopwatch.elapsed(TimeUnit.MILLISECONDS);
totalEnqueueWaitTimeMillis += enqueueDelay;
if (enqueueDelay > 1) {
logWithRateLimit(() ->
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
index b543b261bc..22a7552445 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
@@ -21,6 +21,7 @@ package
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +31,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -55,6 +57,8 @@ class PipelinedSortBatchTask implements
Callable<PipelinedSortBatchTask.Result>
private static final Logger LOG =
LoggerFactory.getLogger(PipelinedSortBatchTask.class);
+ private static final String THREAD_NAME = "mongo-sort-batch";
+
private final Comparator<SortKey> pathComparator;
private final Compression algorithm;
private final BlockingQueue<NodeStateEntryBatch> emptyBuffersQueue;
@@ -63,6 +67,7 @@ class PipelinedSortBatchTask implements
Callable<PipelinedSortBatchTask.Result>
private final File sortWorkDir;
private final byte[] copyBuffer = new byte[4096];
private long entriesProcessed = 0;
+ private long batchesProcessed = 0;
public PipelinedSortBatchTask(File storeDir,
PathElementComparator pathComparator,
@@ -81,14 +86,18 @@ class PipelinedSortBatchTask implements
Callable<PipelinedSortBatchTask.Result>
@Override
public Result call() throws Exception {
String originalName = Thread.currentThread().getName();
- Thread.currentThread().setName("mongo-sort-batch");
+ Thread.currentThread().setName(THREAD_NAME);
try {
- LOG.info("Starting sort-and-save task");
+ LOG.info("[TASK:{}:START] Starting sort-and-save task",
THREAD_NAME.toUpperCase(Locale.ROOT));
while (true) {
LOG.info("Waiting for next batch");
NodeStateEntryBatch nseBuffer = nonEmptyBuffersQueue.take();
if (nseBuffer == SENTINEL_NSE_BUFFER) {
- LOG.info("Terminating thread, processed {} entries",
entriesProcessed);
+ String metrics = MetricsFormatter.newBuilder()
+ .add("batchesProcessed", batchesProcessed)
+ .add("entriesProcessed", entriesProcessed)
+ .build();
+ LOG.info("[TASK:{}:END] Metrics: {}",
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
return new Result(entriesProcessed);
}
sortAndSaveBatch(nseBuffer);
@@ -99,7 +108,7 @@ class PipelinedSortBatchTask implements
Callable<PipelinedSortBatchTask.Result>
LOG.warn("Thread interrupted", t);
throw t;
} catch (Throwable t) {
- LOG.warn("Thread terminating with exception.", t);
+ LOG.warn("Thread terminating with exception", t);
throw t;
} finally {
Thread.currentThread().setName(originalName);
@@ -111,15 +120,16 @@ class PipelinedSortBatchTask implements
Callable<PipelinedSortBatchTask.Result>
ByteBuffer buffer = nseb.getBuffer();
LOG.info("Going to sort batch in memory. Entries: {}, Size: {}",
sortBuffer.size(),
humanReadableByteCountBin(buffer.remaining()));
- if (sortBuffer.size() == 0) {
+ if (sortBuffer.isEmpty()) {
return;
}
Stopwatch sortClock = Stopwatch.createStarted();
sortBuffer.sort(pathComparator);
- LOG.info("Sorted batch in {}. Saving to disk.", sortClock);
+ LOG.info("Sorted batch in {}. Saving to disk", sortClock);
Stopwatch saveClock = Stopwatch.createStarted();
File newtmpfile = File.createTempFile("sortInBatch", "flatfile",
sortWorkDir);
long textSize = 0;
+ batchesProcessed++;
try (BufferedOutputStream writer = createOutputStream(newtmpfile,
algorithm)) {
for (SortKey entry : sortBuffer) {
entriesProcessed++;
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 4379010aee..64650010f6 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -25,6 +25,8 @@ import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.SortStrategy;
import org.apache.jackrabbit.oak.plugins.document.Collection;
@@ -169,20 +171,27 @@ public class PipelinedStrategy implements SortStrategy {
ArrayBlockingQueue<File> sortedFilesQueue,
TransformStageStatistics
transformStageStatistics,
boolean printHistogramsAtInfo) {
- // Summary stats
- LOG.info("Queue sizes: {mongoDocQueue:{}, emptyBuffersQueue:{},
nonEmptyBuffersQueue:{}, sortedFilesQueue:{}}; Transform Stats: {}",
- mongoDocQueue.size(), emptyBuffersQueue.size(),
nonEmptyBuffersQueue.size(), sortedFilesQueue.size(),
transformStageStatistics.formatStats());
+
+ String queueSizeStats = MetricsFormatter.newBuilder()
+ .add("mongoDocQueue", mongoDocQueue.size())
+ .add("emptyBuffersQueue", emptyBuffersQueue.size())
+ .add("nonEmptyBuffersQueue", nonEmptyBuffersQueue.size())
+ .add("sortedFilesQueue", sortedFilesQueue.size())
+ .build();
+
+ LOG.info("Queue sizes: {}", queueSizeStats);
+ LOG.info("Transform stats: {}",
transformStageStatistics.formatStats());
prettyPrintTransformStatisticsHistograms(transformStageStatistics,
printHistogramsAtInfo);
}
private void
prettyPrintTransformStatisticsHistograms(TransformStageStatistics
transformStageStatistics, boolean printHistogramAtInfo) {
if (printHistogramAtInfo) {
- LOG.info("Top hidden paths rejected: {}.",
transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
- LOG.info("Top paths filtered: {}.",
transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
+ LOG.info("Top hidden paths rejected: {}",
transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
+ LOG.info("Top paths filtered: {}",
transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
LOG.info("Top empty node state documents: {}",
transformStageStatistics.getEmptyNodeStateHistogram().prettyPrint());
} else {
- LOG.debug("Top hidden paths rejected: {}.",
transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
- LOG.debug("Top paths filtered: {}.",
transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
+ LOG.debug("Top hidden paths rejected: {}",
transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
+ LOG.debug("Top paths filtered: {}",
transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
LOG.debug("Top empty node state documents: {}",
transformStageStatistics.getEmptyNodeStateHistogram().prettyPrint());
}
}
@@ -319,6 +328,7 @@ public class PipelinedStrategy implements SortStrategy {
emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(bufferSizeBytes,
maxNumberOfEntriesPerBuffer));
}
+ LOG.info("[TASK:PIPELINED-DUMP:START] Starting to build FFS");
Stopwatch start = Stopwatch.createStarted();
MongoCollection<BasicDBObject> dbCollection =
MongoDocumentStoreHelper.getDBCollection(docStore, Collection.NODES);
PipelinedMongoDownloadTask downloadTask = new
PipelinedMongoDownloadTask(dbCollection, mongoBatchSize, mongoDocQueue,
pathFilters);
@@ -353,7 +363,7 @@ public class PipelinedStrategy implements SortStrategy {
try {
- LOG.info("Waiting for tasks to complete.");
+ LOG.info("Waiting for tasks to complete");
int tasksFinished = 0;
int transformTasksFinished = 0;
while (tasksFinished < numberOfThreads) {
@@ -372,10 +382,10 @@ public class PipelinedStrategy implements SortStrategy {
PipelinedTransformTask.Result transformResult =
(PipelinedTransformTask.Result) result;
transformTasksFinished++;
entryCount += transformResult.getEntryCount();
- LOG.info("Transform thread {} finished. Entries
processed: {}",
+ LOG.info("Transform task {} finished. Entries
processed: {}",
transformResult.getThreadId(),
transformResult.getEntryCount());
if (transformTasksFinished == transformThreads) {
- LOG.info("All transform tasks finished. Node
states retrieved: {}", entryCount);
+ LOG.info("All transform tasks finished. Total
entries processed: {}", entryCount);
// No need to keep monitoring the queues, the
download and transform threads are done.
monitorFuture.cancel(false);
// Terminate the sort thread.
@@ -384,14 +394,14 @@ public class PipelinedStrategy implements SortStrategy {
} else if (result instanceof
PipelinedSortBatchTask.Result) {
PipelinedSortBatchTask.Result sortTaskResult =
(PipelinedSortBatchTask.Result) result;
- LOG.info("Sort task finished. Entries processed:
{}", sortTaskResult.getTotalEntries());
+ LOG.info("Sort batch task finished. Entries
processed: {}", sortTaskResult.getTotalEntries());
printStatistics(mongoDocQueue, emptyBatchesQueue,
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
} else if (result instanceof
PipelinedMergeSortTask.Result) {
PipelinedMergeSortTask.Result mergeSortedFilesTask
= (PipelinedMergeSortTask.Result) result;
File ffs =
mergeSortedFilesTask.getFlatFileStoreFile();
- LOG.info("Sort task finished. FFS: {}, Size: {}",
ffs, humanReadableByteCountBin(ffs.length()));
+ LOG.info("Merge-sort sort task finished. FFS: {},
Size: {}", ffs, humanReadableByteCountBin(ffs.length()));
flatFileStore =
mergeSortedFilesTask.getFlatFileStoreFile();
} else {
@@ -399,7 +409,7 @@ public class PipelinedStrategy implements SortStrategy {
}
tasksFinished++;
} catch (ExecutionException ex) {
- LOG.warn("Execution error dumping from MongoDB: " + ex
+ ". Shutting down all threads.");
+ LOG.warn("Execution error dumping from MongoDB: " + ex
+ ". Shutting down all threads");
threadPool.shutdownNow();
boolean terminated = threadPool.awaitTermination(5,
TimeUnit.SECONDS);
if (!terminated) {
@@ -416,7 +426,11 @@ public class PipelinedStrategy implements SortStrategy {
throw new RuntimeException(ex);
}
}
- LOG.info("Dumped {} nodestates in json format in {}",
entryCount, start);
+ LOG.info("[TASK:PIPELINED-DUMP:END] Metrics: {}",
MetricsFormatter.newBuilder()
+ .add("duration",
FormattingUtils.formatToSeconds(start))
+ .add("durationSeconds",
start.elapsed(TimeUnit.SECONDS))
+ .add("entryCount", entryCount)
+ .build());
printStatistics(mongoDocQueue, emptyBatchesQueue,
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
} catch (InterruptedException e) {
throw new RuntimeException(e);
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
index 9f6e012b53..0781f41f71 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
@@ -21,6 +21,8 @@ package
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import com.mongodb.BasicDBObject;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
import org.apache.jackrabbit.oak.plugins.document.Collection;
@@ -41,6 +43,7 @@ import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
+import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -71,9 +74,9 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
return transformThreadId;
}
}
-
private static final Logger LOG =
LoggerFactory.getLogger(PipelinedTransformTask.class);
private static final AtomicInteger threadIdGenerator = new AtomicInteger();
+ private static final String THREAD_NAME_PREFIX = "mongo-transform-";
private final MongoDocumentStore mongoStore;
private final DocumentNodeStore documentNodeStore;
@@ -89,7 +92,7 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
private final ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue;
private final TransformStageStatistics statistics;
private final int threadId = threadIdGenerator.getAndIncrement();
- private long totalEnqueueWaitTimeMillis = 0;
+ private long totalEnqueueDelayMillis = 0;
public PipelinedTransformTask(MongoDocumentStore mongoStore,
DocumentNodeStore documentNodeStore,
@@ -116,26 +119,34 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
@Override
public Result call() throws Exception {
String originalName = Thread.currentThread().getName();
- Thread.currentThread().setName("mongo-transform-" + threadId);
+ String threadName = THREAD_NAME_PREFIX + threadId;
+ Thread.currentThread().setName(threadName);
try {
- LOG.info("Starting transform task");
+ LOG.info("[TASK:{}:START] Starting transform task",
threadName.toUpperCase(Locale.ROOT));
NodeDocumentCache nodeCache =
MongoDocumentStoreHelper.getNodeDocumentCache(mongoStore);
Stopwatch taskStartWatch = Stopwatch.createStarted();
long totalEntryCount = 0;
long mongoObjectsProcessed = 0;
- LOG.info("Waiting for an empty buffer");
+ LOG.debug("Waiting for an empty buffer");
NodeStateEntryBatch nseBatch = emptyBatchesQueue.take();
// Used to serialize a node state entry before writing it to the
buffer
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
OutputStreamWriter writer = new OutputStreamWriter(baos,
PipelinedStrategy.FLATFILESTORE_CHARSET);
- LOG.info("Obtained an empty buffer. Starting to convert Mongo
documents to node state entries");
+ LOG.debug("Obtained an empty buffer. Starting to convert Mongo
documents to node state entries");
while (true) {
BasicDBObject[] dbObjectBatch = mongoDocQueue.take();
if (dbObjectBatch == SENTINEL_MONGO_DOCUMENT) {
- LOG.info("Task terminating. Dumped {} nodestates in json
format in {}. Total enqueue delay: {} ms ({}%)",
- totalEntryCount, taskStartWatch,
totalEnqueueWaitTimeMillis,
- String.format("%1.2f", (100.0 *
totalEnqueueWaitTimeMillis) / taskStartWatch.elapsed(TimeUnit.MILLISECONDS)));
+
+ String totalEnqueueDelayPercentage =
String.format("%1.2f", (100.0 * totalEnqueueDelayMillis) /
taskStartWatch.elapsed(TimeUnit.MILLISECONDS));
+ String metrics = MetricsFormatter.newBuilder()
+ .add("duration",
FormattingUtils.formatToSeconds(taskStartWatch))
+ .add("durationSeconds",
taskStartWatch.elapsed(TimeUnit.SECONDS))
+ .add("nodeStateEntriesGenerated", totalEntryCount)
+ .add("enqueueDelayMillis", totalEnqueueDelayMillis)
+ .add("enqueueDelayPercentage",
totalEnqueueDelayPercentage)
+ .build();
+ LOG.info("[TASK:{}:END] Metrics: {}",
threadName.toUpperCase(Locale.ROOT), metrics);
//Save the last batch
nseBatch.getBuffer().flip();
tryEnqueue(nseBatch);
@@ -202,7 +213,7 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
LOG.warn("Thread interrupted", t);
throw t;
} catch (Throwable t) {
- LOG.warn("Thread terminating with exception.", t);
+ LOG.warn("Thread terminating with exception", t);
throw t;
} finally {
Thread.currentThread().setName(originalName);
@@ -210,10 +221,10 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
}
private void tryEnqueue(NodeStateEntryBatch nseBatch) throws
InterruptedException {
- Stopwatch stopwatch = Stopwatch.createStarted();
+ Stopwatch enqueueDelayStopwatch = Stopwatch.createStarted();
nonEmptyBatchesQueue.put(nseBatch);
- long enqueueDelay = stopwatch.elapsed(TimeUnit.MILLISECONDS);
- totalEnqueueWaitTimeMillis += enqueueDelay;
+ long enqueueDelay =
enqueueDelayStopwatch.elapsed(TimeUnit.MILLISECONDS);
+ totalEnqueueDelayMillis += enqueueDelay;
if (enqueueDelay > 1) {
LOG.info("Enqueuing of node state entries batch was delayed, took
{} ms. nonEmptyBatchesQueue size {}. ",
enqueueDelay, nonEmptyBatchesQueue.size());
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
index a14e854c23..060c5aad8f 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.apache.commons.lang3.StringUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import java.util.concurrent.atomic.LongAdder;
@@ -141,23 +142,24 @@ public class TransformStageStatistics {
String.format("%2.1f%%", (100.0 * documentsAcceptedTotal) /
mongoDocumentsTraversedSum);
String entriesAcceptedPercentage = totalEntries == 0 ? "N/A" :
String.format("%1.1f%%", (100.0 * entriesAcceptedSum) /
totalEntries);
- String avgEntrySize = entriesAcceptedSum == 0 ? "N/A" :
- Long.toString(extractedEntriesTotalSizeSum /
entriesAcceptedSum);
- return "{documentsTraversed:" + mongoDocumentsTraversedSum +
- ", documentsAccepted:" + documentsAcceptedTotal +
- ", documentsRejected:" + documentsRejectedTotal +
- ", documentsAcceptedPercentage:" + documentsAcceptedPercentage
+
- ", documentsRejectedSplit:" + documentsRejectedSplitSum +
- ", documentsRejectedEmptyNodeState:" +
documentsRejectedEmptyNodeStateSum +
- ", entriesTraversed:" + totalEntries +
- ", entriesAccepted:" + entriesAcceptedSum +
- ", entriesRejected:" + entriesRejectedSum +
- ", entriesAcceptedPercentage:" + entriesAcceptedPercentage +
- ", entriesRejectedHiddenPaths:" + entriesRejectedHiddenPaths +
- ", entriesRejectedPathFiltered:" + entriesRejectedPathFiltered
+
- ", extractedEntriesTotalSize:" + extractedEntriesTotalSizeSum +
- ", avgEntrySize:" + avgEntrySize +
- "}";
+ long avgEntrySize = entriesAcceptedSum == 0 ? -1 :
+ extractedEntriesTotalSizeSum / entriesAcceptedSum;
+ return MetricsFormatter.newBuilder()
+ .add("documentsTraversed", mongoDocumentsTraversedSum)
+ .add("documentsAccepted", documentsAcceptedTotal)
+ .add("documentsRejected", documentsRejectedTotal)
+ .add("documentsAcceptedPercentage",
documentsAcceptedPercentage)
+ .add("documentsRejectedSplit", documentsRejectedSplitSum)
+ .add("documentsRejectedEmptyNodeState",
documentsRejectedEmptyNodeStateSum)
+ .add("entriesTraversed", totalEntries)
+ .add("entriesAccepted", entriesAcceptedSum)
+ .add("entriesRejected", entriesRejectedSum)
+ .add("entriesAcceptedPercentage", entriesAcceptedPercentage)
+ .add("entriesRejectedHiddenPaths",
entriesRejectedHiddenPaths.sum())
+ .add("entriesRejectedPathFiltered",
entriesRejectedPathFiltered.sum())
+ .add("extractedEntriesTotalSize", extractedEntriesTotalSizeSum)
+ .add("avgEntrySize", avgEntrySize)
+ .build();
}
private static String getPathPrefix(String path, int depth) {
diff --git
a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
index b331b6b9e6..33003fe552 100644
---
a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
+++
b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
@@ -65,7 +65,7 @@ public class ElasticDocumentStoreIndexer extends
DocumentStoreIndexerBase {
Otherwise proper alias naming and mapping will not be applied
*/
@Override
- protected void preIndexOpertaions(List<NodeStateIndexer> indexers) {
+ protected void preIndexOperations(List<NodeStateIndexer> indexers) {
// For all the available indexers check if it's an ElasticIndexer
// and then provision the index
for (NodeStateIndexer indexer : indexers) {
diff --git
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
index 55f9aa08ce..4d0be380d3 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
@@ -31,6 +31,8 @@ import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.index.async.AsyncIndexerLucene;
import org.apache.jackrabbit.oak.index.indexer.document.DocumentStoreIndexer;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStore;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
import org.apache.jackrabbit.oak.run.cli.CommonOptions;
import org.apache.jackrabbit.oak.run.cli.DocumentBuilderCustomizer;
@@ -235,7 +237,8 @@ public class IndexCommand implements Command {
private File reindex(IndexOptions idxOpts, ExtendedIndexHelper
extendedIndexHelper, String checkpoint) throws IOException,
CommitFailedException {
checkNotNull(checkpoint, "Checkpoint value is required for reindexing
done in read only mode");
- Stopwatch w = Stopwatch.createStarted();
+ log.info("[TASK:REINDEX:START] Starting reindexing");
+ Stopwatch reindexWatch = Stopwatch.createStarted();
IndexerSupport indexerSupport =
createIndexerSupport(extendedIndexHelper, checkpoint);
log.info("Proceeding to index {} upto checkpoint {} {}",
extendedIndexHelper.getIndexPaths(), checkpoint,
indexerSupport.getCheckpointInfo());
@@ -259,7 +262,11 @@ public class IndexCommand implements Command {
indexerSupport.writeMetaInfo(checkpoint);
File destDir = indexerSupport.copyIndexFilesToOutput();
log.info("Indexing completed for indexes {} in {} ({} ms) and index
files are copied to {}",
- extendedIndexHelper.getIndexPaths(), w,
w.elapsed(TimeUnit.MILLISECONDS), IndexCommand.getPath(destDir));
+ extendedIndexHelper.getIndexPaths(), reindexWatch,
reindexWatch.elapsed(TimeUnit.MILLISECONDS), IndexCommand.getPath(destDir));
+ log.info("[TASK:REINDEX:END] Metrics: {}",
MetricsFormatter.newBuilder()
+ .add("duration", FormattingUtils.formatToSeconds(reindexWatch))
+ .add("durationSeconds", reindexWatch.elapsed(TimeUnit.SECONDS))
+ .build());
return destDir;
}
diff --git
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
index 7809fe5de8..fc3596daab 100644
---
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
+++
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
@@ -50,7 +50,7 @@ public class DocumentStoreIndexer extends
DocumentStoreIndexerBase implements Cl
}
@Override
- protected void preIndexOpertaions(List<NodeStateIndexer> indexers) {
+ protected void preIndexOperations(List<NodeStateIndexer> indexers) {
// NOOP
}
}