This is an automated email from the ASF dual-hosted git repository.

fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ec7926e90 OAK-10423 - Improve logging of metrics in the indexing job 
(#1093)
6ec7926e90 is described below

commit 6ec7926e90b99bdc52eb9463675c309a298ccaa8
Author: Nuno Santos <[email protected]>
AuthorDate: Fri Sep 8 10:26:28 2023 +0200

    OAK-10423 - Improve logging of metrics in the indexing job (#1093)
    
    * Improve logging of metrics in the indexing job.
    
    * Add integrity checks to the metrics formatter
    
    * Clean up log messages
    
    * Add logging of metrics to reindex, merge in node store and total job time.
    
    * Improve logging
    
    * Add missing license header.
    Fix typo in class name.
    
    * Simplify logic.
    
    * Print elapsed time in hours:minutes instead of decimal minutes.
    
    * Do not call .name() on reference that is potentially null.
---
 .../oak/plugins/index/FormattingUtils.java         | 35 ++++++++++++
 .../oak/plugins/index/MetricsFormatter.java        | 64 ++++++++++++++++++++++
 .../oak/plugins/index/importer/IndexImporter.java  | 27 ++++++---
 .../oak/plugins/index/MetricsFormatterTest.java    | 37 +++++++++++++
 .../indexer/document/DocumentStoreIndexerBase.java | 27 ++++++---
 .../flatfile/pipelined/PipelinedMergeSortTask.java | 31 ++++++++---
 .../pipelined/PipelinedMongoDownloadTask.java      | 32 +++++++----
 .../flatfile/pipelined/PipelinedSortBatchTask.java | 22 ++++++--
 .../flatfile/pipelined/PipelinedStrategy.java      | 42 +++++++++-----
 .../flatfile/pipelined/PipelinedTransformTask.java | 37 ++++++++-----
 .../pipelined/TransformStageStatistics.java        | 36 ++++++------
 .../oak/index/ElasticDocumentStoreIndexer.java     |  2 +-
 .../apache/jackrabbit/oak/index/IndexCommand.java  | 11 +++-
 .../indexer/document/DocumentStoreIndexer.java     |  2 +-
 14 files changed, 316 insertions(+), 89 deletions(-)

diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/FormattingUtils.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/FormattingUtils.java
new file mode 100644
index 0000000000..4c1a640b1a
--- /dev/null
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/FormattingUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import org.apache.jackrabbit.guava.common.base.Stopwatch;
+
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.TimeUnit;
+
+public class FormattingUtils {
+    public static String formatToSeconds(Stopwatch stopwatch) {
+        LocalTime seconds = 
LocalTime.ofSecondOfDay(stopwatch.elapsed(TimeUnit.SECONDS));
+        return DateTimeFormatter.ISO_TIME.format(seconds);
+    }
+
+    public static String formatToMillis(Stopwatch stopwatch) {
+        LocalTime nanoSeconds = 
LocalTime.ofNanoOfDay(stopwatch.elapsed(TimeUnit.MILLISECONDS)*1000000);
+        return DateTimeFormatter.ISO_TIME.format(nanoSeconds);
+    }
+}
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java
new file mode 100644
index 0000000000..f32e5f4102
--- /dev/null
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import org.apache.jackrabbit.guava.common.base.Preconditions;
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+
+public class MetricsFormatter {
+    private final JsopBuilder jsopBuilder = new JsopBuilder();
+    private boolean isWritable = true;
+    public static MetricsFormatter newBuilder() {
+        return new MetricsFormatter();
+    }
+
+    private MetricsFormatter() {
+        jsopBuilder.object();
+    }
+
+    public MetricsFormatter add(String key, String value) {
+        Preconditions.checkState(isWritable, "Formatter already built, in 
read-only mode");
+        jsopBuilder.key(key).value(value);
+        return this;
+    }
+
+    public MetricsFormatter add(String key, int value) {
+        Preconditions.checkState(isWritable, "Formatter already built, in 
read-only mode");
+        jsopBuilder.key(key).value(value);
+        return this;
+    }
+
+    public MetricsFormatter add(String key, long value) {
+        Preconditions.checkState(isWritable, "Formatter already built, in 
read-only mode");
+        jsopBuilder.key(key).value(value);
+        return this;
+    }
+
+    public MetricsFormatter add(String key, boolean value) {
+        Preconditions.checkState(isWritable, "Formatter already built, in 
read-only mode");
+        jsopBuilder.key(key).value(value);
+        return this;
+    }
+
+    public String build() {
+        if (isWritable){
+            jsopBuilder.endObject();
+            isWritable = false;
+        }
+        return jsopBuilder.toString();
+    }
+}
diff --git 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
index 1c8d7d368d..d06ab32481 100644
--- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
+++ 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
@@ -26,17 +26,21 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.guava.common.collect.ArrayListMultimap;
 import org.apache.jackrabbit.guava.common.collect.ListMultimap;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdate;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
 import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 import 
org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock.LockToken;
 import org.apache.jackrabbit.oak.plugins.index.upgrade.IndexDisabler;
 import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
@@ -60,8 +64,8 @@ public class IndexImporter {
      */
     static final String ASYNC_LANE_SYNC = "sync";
     /*
-    * System property name for flag for preserve checkpoint. If this is set to 
true, then checkpoint cleanup will be skipped.
-    * Default is set to false.
+     * System property name for flag for preserve checkpoint. If this is set 
to true, then checkpoint cleanup will be skipped.
+     * Default is set to false.
      */
     public static final String OAK_INDEX_IMPORTER_PRESERVE_CHECKPOINT = 
"oak.index.importer.preserveCheckpoint";
 
@@ -230,7 +234,7 @@ public class IndexImporter {
 
     private void bringIndexUpToDate() throws CommitFailedException {
         for (String laneName : asyncLaneToIndexMapping.keySet()) {
-            if (ASYNC_LANE_SYNC.equals(laneName)){
+            if (ASYNC_LANE_SYNC.equals(laneName)) {
                 continue; //TODO Handle sync indexes
             }
             bringAsyncIndexUpToDate(laneName, 
asyncLaneToIndexMapping.get(laneName));
@@ -247,7 +251,7 @@ public class IndexImporter {
             //TODO Support case where checkpoint got lost or complete 
reindexing is done
 
             NodeState after = nodeStore.retrieve(checkpoint);
-            checkNotNull(after, "No state found for checkpoint [%s] for lane 
[%s]",checkpoint, laneName);
+            checkNotNull(after, "No state found for checkpoint [%s] for lane 
[%s]", checkpoint, laneName);
             LOG.info("Proceeding to update imported indexes {} to checkpoint 
[{}] for lane [{}]",
                     indexInfos, checkpoint, laneName);
 
@@ -399,12 +403,11 @@ public class IndexImporter {
      *
      * @param indexPath  path of index. Mostly used in reporting exception
      * @param indexState nodeState for index at given path
-     *
      * @return async lane name or null which would be the case for sync indexes
      */
     static String getAsyncLaneName(String indexPath, NodeState indexState) {
         PropertyState asyncPrevious = 
indexState.getProperty(AsyncLaneSwitcher.ASYNC_PREVIOUS);
-        if (asyncPrevious != null && !AsyncLaneSwitcher.isNone(asyncPrevious)){
+        if (asyncPrevious != null && !AsyncLaneSwitcher.isNone(asyncPrevious)) 
{
             return IndexUtils.getAsyncLaneName(indexState, indexPath, 
asyncPrevious);
         }
         return IndexUtils.getAsyncLaneName(indexState, indexPath);
@@ -426,7 +429,7 @@ public class IndexImporter {
 
     private void incrementReIndexCount(NodeBuilder definition) {
         long count = 0;
-        if(definition.hasProperty(REINDEX_COUNT)){
+        if (definition.hasProperty(REINDEX_COUNT)) {
             count = definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
         }
         definition.setProperty(REINDEX_COUNT, count + 1);
@@ -463,10 +466,18 @@ public class IndexImporter {
 
     void runWithRetry(int maxRetries, IndexImportState indexImportState, 
IndexImporterStepExecutor step) throws CommitFailedException, IOException {
         int count = 1;
+        Stopwatch start = Stopwatch.createStarted();
         while (count <= maxRetries) {
-            LOG.info("IndexImporterStepExecutor:{} ,count:{}", 
indexImportState, count);
+            LOG.info("IndexImporterStepExecutor:{}, count:{}", 
indexImportState, count);
+            LOG.info("[TASK:{}:START]", indexImportState);
             try {
                 step.execute();
+                LOG.info("[TASK:{}:END] Metrics: {}", indexImportState,
+                        MetricsFormatter.newBuilder()
+                                .add("duration", 
FormattingUtils.formatToSeconds(start))
+                                .add("durationSeconds", 
start.elapsed(TimeUnit.SECONDS))
+                                .build()
+                );
                 break;
             } catch (CommitFailedException | IOException e) {
                 LOG.warn("IndexImporterStepExecutor:{} fail count: {}, retries 
left: {}", indexImportState, count, maxRetries - count, e);
diff --git 
a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatterTest.java
 
b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatterTest.java
new file mode 100644
index 0000000000..2ccc4ecd22
--- /dev/null
+++ 
b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatterTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class MetricsFormatterTest {
+
+    @Test
+    public void testAdd() {
+        MetricsFormatter metricsFormatter = MetricsFormatter.newBuilder();
+        String result = metricsFormatter.add("key", "value")
+                .add("key", 1)
+                .add("key", 1L)
+                .add("key", true)
+                .build();
+        assertEquals("{\"key\":\"value\",\"key\":1,\"key\":1,\"key\":true}", 
result);
+        assertThrows(IllegalStateException.class, () -> 
metricsFormatter.add("key", "value"));
+        assertEquals("{\"key\":\"value\",\"key\":1,\"key\":1,\"key\":true}", 
metricsFormatter.build());
+    }
+}
\ No newline at end of file
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 7a31811de8..bb7e7977b2 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -38,8 +38,10 @@ import 
org.apache.jackrabbit.oak.plugins.document.mongo.DocumentStoreSplitter;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.TraversingRange;
 import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
 import 
org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
 import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
@@ -68,6 +70,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
@@ -138,7 +141,7 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
     }
 
     private List<FlatFileStore> buildFlatFileStoreList(NodeState 
checkpointedState, CompositeIndexer indexer, Predicate<String> pathPredicate, 
Set<String> preferredPathElements,
-            boolean splitFlatFile, Set<IndexDefinition> indexDefinitions) 
throws IOException {
+                                                       boolean splitFlatFile, 
Set<IndexDefinition> indexDefinitions) throws IOException {
         List<FlatFileStore> storeList = new ArrayList<>();
 
         Stopwatch flatFileStoreWatch = Stopwatch.createStarted();
@@ -205,7 +208,6 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
     }
 
     /**
-     *
      * @return an Instance of FlatFileStore, whose getFlatFileStorePath() 
method can be used to get the absolute path to this store.
      * @throws IOException
      * @throws CommitFailedException
@@ -220,7 +222,7 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
 
         Predicate<String> predicate = s -> 
indexDefinitions.stream().anyMatch(indexDef -> 
indexDef.getPathFilter().filter(s) != PathFilter.Result.EXCLUDE);
         FlatFileStore flatFileStore = 
buildFlatFileStoreList(checkpointedState, null, predicate,
-            preferredPathElements, 
IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
+                preferredPathElements, 
IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
         log.info("FlatFileStore built at {}. To use this flatFileStore in a 
reindex step, set System Property-{} with value {}",
                 flatFileStore.getFlatFileStorePath(), 
OAK_INDEXER_SORTED_FILE_PATH, flatFileStore.getFlatFileStorePath());
         return flatFileStore;
@@ -244,14 +246,15 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
         closer.register(indexer);
 
         List<FlatFileStore> flatFileStores = 
buildFlatFileStoreList(checkpointedState, indexer,
-            indexer::shouldInclude, null, 
IndexerConfiguration.parallelIndexEnabled(), getIndexDefinitions());
+                indexer::shouldInclude, null, 
IndexerConfiguration.parallelIndexEnabled(), getIndexDefinitions());
 
         progressReporter.reset();
 
         progressReporter.reindexingTraversalStart("/");
 
-        preIndexOpertaions(indexer.getIndexers());
+        preIndexOperations(indexer.getIndexers());
 
+        log.info("[TASK:INDEXING:START] Starting indexing");
         Stopwatch indexerWatch = Stopwatch.createStarted();
 
         if (flatFileStores.size() > 1) {
@@ -267,14 +270,24 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
         progressReporter.reindexingTraversalEnd();
         progressReporter.logReport();
         log.info("Completed the indexing in {}", indexerWatch);
+        log.info("[TASK:INDEXING:END] Metrics: {}", 
MetricsFormatter.newBuilder()
+                .add("duration", FormattingUtils.formatToSeconds(indexerWatch))
+                .add("durationSeconds", indexerWatch.elapsed(TimeUnit.SECONDS))
+                .build());
 
+        log.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
+        Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
         copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        log.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", 
MetricsFormatter.newBuilder()
+                .add("duration", 
FormattingUtils.formatToSeconds(mergeNodeStoreWatch))
+                .add("durationSeconds", 
mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS))
+                .build());
 
         indexerSupport.postIndexWork(copyOnWriteStore);
     }
 
     private void indexParallel(List<FlatFileStore> storeList, CompositeIndexer 
indexer, IndexingProgressReporter progressReporter)
-        throws IOException {
+            throws IOException {
         ExecutorService service = 
Executors.newFixedThreadPool(IndexerConfiguration.indexThreadPoolSize());
         List<Future> futureList = new ArrayList<>();
 
@@ -394,7 +407,7 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
 
     protected abstract List<NodeStateIndexerProvider> createProviders() throws 
IOException;
 
-    protected abstract void preIndexOpertaions(List<NodeStateIndexer> 
indexers);
+    protected abstract void preIndexOperations(List<NodeStateIndexer> 
indexers);
 
     //TODO OAK-7098 - Taken from IndexUpdate. Refactor to abstract out common 
logic like this
     private void removeIndexState(NodeBuilder definition) {
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
index 8f248ac209..a857a4422c 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
@@ -21,6 +21,8 @@ package 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.oak.commons.Compression;
 import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,8 +32,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import static 
org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCountBin;
@@ -70,6 +74,8 @@ class PipelinedMergeSortTask implements 
Callable<PipelinedMergeSortTask.Result>
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedMergeSortTask.class);
 
+    private static final String THREAD_NAME = "mongo-merge-sort-files";
+
     private final File storeDir;
     private final Comparator<NodeStateHolder> comparator;
     private final Compression algorithm;
@@ -89,17 +95,28 @@ class PipelinedMergeSortTask implements 
Callable<PipelinedMergeSortTask.Result>
     @Override
     public Result call() throws Exception {
         String originalName = Thread.currentThread().getName();
-        Thread.currentThread().setName("mongo-merge-sort-files");
+        Thread.currentThread().setName(THREAD_NAME);
         try {
-            LOG.info("Starting merge sort thread");
+            LOG.info("[TASK:{}:START] Starting merge sort task", 
THREAD_NAME.toUpperCase(Locale.ROOT));
             while (true) {
                 LOG.info("Waiting for next intermediate sorted file");
                 File sortedIntermediateFile = sortedFilesQueue.take();
                 if (sortedIntermediateFile == SENTINEL_SORTED_FILES_QUEUE) {
-                    LOG.info("Going to sort {} files, total size {}", 
sortedFiles.size(), humanReadableByteCountBin(sizeOf(sortedFiles)));
+                    long sortedFilesSizeBytes = sizeOf(sortedFiles);
+                    LOG.info("Going to sort {} files, total size {}", 
sortedFiles.size(), humanReadableByteCountBin(sortedFilesSizeBytes));
+                    Stopwatch w = Stopwatch.createStarted();
                     File flatFileStore = sortStoreFile(sortedFiles);
-                    LOG.info("Terminating sort task. Merged {} files to create 
the FFS: {} of size {}",
-                            sortedFiles.size(), 
flatFileStore.getAbsolutePath(), 
humanReadableByteCountBin(flatFileStore.length()));
+                    LOG.info("Final merge completed in {}. Created file: {}", 
FormattingUtils.formatToSeconds(w), flatFileStore.getAbsolutePath());
+                    long ffsSizeBytes = flatFileStore.length();
+                    String metrics = MetricsFormatter.newBuilder()
+                            .add("duration", 
FormattingUtils.formatToSeconds(w))
+                            .add("durationSeconds", 
w.elapsed(TimeUnit.SECONDS))
+                            .add("filesMerged", sortedFiles.size())
+                            .add("ffsSizeBytes", ffsSizeBytes)
+                            .add("ffsSize", 
humanReadableByteCountBin(ffsSizeBytes))
+                            .build();
+
+                    LOG.info("[TASK:{}:END] Metrics: {}", 
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
                     return new Result(flatFileStore, sortedFiles.size());
                 }
                 sortedFiles.add(sortedIntermediateFile);
@@ -111,7 +128,7 @@ class PipelinedMergeSortTask implements 
Callable<PipelinedMergeSortTask.Result>
             LOG.warn("Thread interrupted", t);
             throw t;
         } catch (Throwable t) {
-            LOG.warn("Thread terminating with exception.", t);
+            LOG.warn("Thread terminating with exception", t);
             throw t;
         } finally {
             Thread.currentThread().setName(originalName);
@@ -119,7 +136,6 @@ class PipelinedMergeSortTask implements 
Callable<PipelinedMergeSortTask.Result>
     }
 
     private File sortStoreFile(List<File> sortedFilesBatch) throws IOException 
{
-        Stopwatch w = Stopwatch.createStarted();
         File sortedFile = new File(storeDir, 
getSortedStoreFileName(algorithm));
         try (BufferedWriter writer = createWriter(sortedFile, algorithm)) {
             Function<String, NodeStateHolder> stringToType = (line) -> line == 
null ? null : new NodeStateHolder(line);
@@ -134,7 +150,6 @@ class PipelinedMergeSortTask implements 
Callable<PipelinedMergeSortTask.Result>
                     stringToType
             );
         }
-        LOG.info("Merging of sorted files completed in {}", w);
         return sortedFile;
     }
 }
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index bb53f40a35..3ddcaf9649 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -31,6 +31,8 @@ import org.apache.jackrabbit.guava.common.base.Preconditions;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.bson.BsonDocument;
 import org.bson.conversions.Bson;
@@ -43,6 +45,7 @@ import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -102,6 +105,8 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
     private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES = 
10;
     private final static BsonDocument NATURAL_HINT = BsonDocument.parse("{ 
$natural:1}");
 
+    private static final String THREAD_NAME = "mongo-dump";
+
     private final int batchSize;
     private final BlockingQueue<BasicDBObject[]> mongoDocQueue;
     private final List<PathFilter> pathFilters;
@@ -150,11 +155,9 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
     @Override
     public Result call() throws Exception {
         String originalName = Thread.currentThread().getName();
-        Thread.currentThread().setName("mongo-dump");
+        Thread.currentThread().setName(THREAD_NAME);
+        LOG.info("[TASK:{}:START] Starting to download from MongoDB", 
THREAD_NAME.toUpperCase(Locale.ROOT));
         try {
-            LOG.info("Starting to download from MongoDB.");
-            //TODO This may lead to reads being routed to secondary depending 
on MongoURI
-            //So caller must ensure that its safe to read from secondary
             this.nextLastModified = 0;
             this.lastIdDownloaded = null;
 
@@ -164,10 +167,16 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
             } else {
                 downloadWithNaturalOrdering();
             }
-            LOG.info("Terminating task. Downloaded {} Mongo documents in {}. 
Total enqueuing delay: {} ms ({}%)",
-                    documentsRead, downloadStartWatch,
-                    totalEnqueueWaitTimeMillis,
-                    String.format("%1.2f", (100.0 * 
totalEnqueueWaitTimeMillis) / 
downloadStartWatch.elapsed(TimeUnit.MILLISECONDS)));
+            String enqueueingDelayPercentage = String.format("%1.2f", (100.0 * 
totalEnqueueWaitTimeMillis) / 
downloadStartWatch.elapsed(TimeUnit.MILLISECONDS));
+            String metrics = MetricsFormatter.newBuilder()
+                    .add("duration", 
FormattingUtils.formatToSeconds(downloadStartWatch))
+                    .add("durationSeconds", 
downloadStartWatch.elapsed(TimeUnit.SECONDS))
+                    .add("documentsDownloaded", documentsRead)
+                    .add("enqueueingDelayMs", totalEnqueueWaitTimeMillis)
+                    .add("enqueueingDelayPercentage", 
enqueueingDelayPercentage)
+                    .build();
+
+            LOG.info("[TASK:{}:END] Metrics: {}", 
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
             return new Result(documentsRead);
         } catch (InterruptedException t) {
             LOG.warn("Thread interrupted", t);
@@ -185,12 +194,11 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
             double rate = ((double) this.documentsRead) / 
downloadStartWatch.elapsed(TimeUnit.SECONDS);
             String formattedRate = String.format("%1.2f nodes/s, %1.2f 
nodes/hr", rate, rate * 3600);
             LOG.info("Dumping from NSET Traversed #{} {} [{}] (Elapsed {})",
-                    this.documentsRead, id, formattedRate, downloadStartWatch);
+                    this.documentsRead, id, formattedRate, 
FormattingUtils.formatToSeconds(downloadStartWatch));
         }
         traversalLog.trace(id);
     }
 
-
     private void downloadWithRetryOnConnectionErrors() throws 
InterruptedException, TimeoutException {
         // If regex filtering is enabled, start by downloading the ancestors 
of the path used for filtering.
         // That is, download "/", "/content", "/content/dam" for a base path 
of "/content/dam". These nodes will not be
@@ -425,11 +433,11 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
     }
 
     private void tryEnqueue(BasicDBObject[] block) throws TimeoutException, 
InterruptedException {
-        Stopwatch stopwatch = Stopwatch.createStarted();
+        Stopwatch enqueueDelayStopwatch = Stopwatch.createStarted();
         if (!mongoDocQueue.offer(block, MONGO_QUEUE_OFFER_TIMEOUT.toMillis(), 
TimeUnit.MILLISECONDS)) {
             throw new TimeoutException("Timeout trying to enqueue batch of 
MongoDB documents. Waited " + MONGO_QUEUE_OFFER_TIMEOUT);
         }
-        long enqueueDelay = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+        long enqueueDelay = 
enqueueDelayStopwatch.elapsed(TimeUnit.MILLISECONDS);
         totalEnqueueWaitTimeMillis += enqueueDelay;
         if (enqueueDelay > 1) {
             logWithRateLimit(() ->
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
index b543b261bc..22a7552445 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
@@ -21,6 +21,7 @@ package 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +31,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.Locale;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 
@@ -55,6 +57,8 @@ class PipelinedSortBatchTask implements 
Callable<PipelinedSortBatchTask.Result>
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedSortBatchTask.class);
 
+    private static final String THREAD_NAME = "mongo-sort-batch";
+
     private final Comparator<SortKey> pathComparator;
     private final Compression algorithm;
     private final BlockingQueue<NodeStateEntryBatch> emptyBuffersQueue;
@@ -63,6 +67,7 @@ class PipelinedSortBatchTask implements 
Callable<PipelinedSortBatchTask.Result>
     private final File sortWorkDir;
     private final byte[] copyBuffer = new byte[4096];
     private long entriesProcessed = 0;
+    private long batchesProcessed = 0;
 
     public PipelinedSortBatchTask(File storeDir,
                                   PathElementComparator pathComparator,
@@ -81,14 +86,18 @@ class PipelinedSortBatchTask implements 
Callable<PipelinedSortBatchTask.Result>
     @Override
     public Result call() throws Exception {
         String originalName = Thread.currentThread().getName();
-        Thread.currentThread().setName("mongo-sort-batch");
+        Thread.currentThread().setName(THREAD_NAME);
         try {
-            LOG.info("Starting sort-and-save task");
+            LOG.info("[TASK:{}:START] Starting sort-and-save task", 
THREAD_NAME.toUpperCase(Locale.ROOT));
             while (true) {
                 LOG.info("Waiting for next batch");
                 NodeStateEntryBatch nseBuffer = nonEmptyBuffersQueue.take();
                 if (nseBuffer == SENTINEL_NSE_BUFFER) {
-                    LOG.info("Terminating thread, processed {} entries", 
entriesProcessed);
+                    String metrics = MetricsFormatter.newBuilder()
+                            .add("batchesProcessed", batchesProcessed)
+                            .add("entriesProcessed", entriesProcessed)
+                            .build();
+                    LOG.info("[TASK:{}:END] Metrics: {}", 
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
                     return new Result(entriesProcessed);
                 }
                 sortAndSaveBatch(nseBuffer);
@@ -99,7 +108,7 @@ class PipelinedSortBatchTask implements 
Callable<PipelinedSortBatchTask.Result>
             LOG.warn("Thread interrupted", t);
             throw t;
         } catch (Throwable t) {
-            LOG.warn("Thread terminating with exception.", t);
+            LOG.warn("Thread terminating with exception", t);
             throw t;
         } finally {
             Thread.currentThread().setName(originalName);
@@ -111,15 +120,16 @@ class PipelinedSortBatchTask implements 
Callable<PipelinedSortBatchTask.Result>
         ByteBuffer buffer = nseb.getBuffer();
         LOG.info("Going to sort batch in memory. Entries: {}, Size: {}",
                 sortBuffer.size(), 
humanReadableByteCountBin(buffer.remaining()));
-        if (sortBuffer.size() == 0) {
+        if (sortBuffer.isEmpty()) {
             return;
         }
         Stopwatch sortClock = Stopwatch.createStarted();
         sortBuffer.sort(pathComparator);
-        LOG.info("Sorted batch in {}. Saving to disk.", sortClock);
+        LOG.info("Sorted batch in {}. Saving to disk", sortClock);
         Stopwatch saveClock = Stopwatch.createStarted();
         File newtmpfile = File.createTempFile("sortInBatch", "flatfile", 
sortWorkDir);
         long textSize = 0;
+        batchesProcessed++;
         try (BufferedOutputStream writer = createOutputStream(newtmpfile, 
algorithm)) {
             for (SortKey entry : sortBuffer) {
                 entriesProcessed++;
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 4379010aee..64650010f6 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -25,6 +25,8 @@ import org.apache.jackrabbit.guava.common.base.Preconditions;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
 import org.apache.jackrabbit.oak.index.indexer.document.flatfile.SortStrategy;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
@@ -169,20 +171,27 @@ public class PipelinedStrategy implements SortStrategy {
                                  ArrayBlockingQueue<File> sortedFilesQueue,
                                  TransformStageStatistics 
transformStageStatistics,
                                  boolean printHistogramsAtInfo) {
-        // Summary stats
-        LOG.info("Queue sizes: {mongoDocQueue:{}, emptyBuffersQueue:{}, 
nonEmptyBuffersQueue:{}, sortedFilesQueue:{}}; Transform Stats: {}",
-                mongoDocQueue.size(), emptyBuffersQueue.size(), 
nonEmptyBuffersQueue.size(), sortedFilesQueue.size(), 
transformStageStatistics.formatStats());
+
+        String queueSizeStats = MetricsFormatter.newBuilder()
+                .add("mongoDocQueue", mongoDocQueue.size())
+                .add("emptyBuffersQueue", emptyBuffersQueue.size())
+                .add("nonEmptyBuffersQueue", nonEmptyBuffersQueue.size())
+                .add("sortedFilesQueue", sortedFilesQueue.size())
+                .build();
+
+        LOG.info("Queue sizes: {}", queueSizeStats);
+        LOG.info("Transform stats: {}", 
transformStageStatistics.formatStats());
         prettyPrintTransformStatisticsHistograms(transformStageStatistics, 
printHistogramsAtInfo);
     }
 
     private void 
prettyPrintTransformStatisticsHistograms(TransformStageStatistics 
transformStageStatistics, boolean printHistogramAtInfo) {
         if (printHistogramAtInfo) {
-            LOG.info("Top hidden paths rejected: {}.", 
transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
-            LOG.info("Top paths filtered: {}.", 
transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
+            LOG.info("Top hidden paths rejected: {}", 
transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
+            LOG.info("Top paths filtered: {}", 
transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
             LOG.info("Top empty node state documents: {}", 
transformStageStatistics.getEmptyNodeStateHistogram().prettyPrint());
         } else {
-            LOG.debug("Top hidden paths rejected: {}.", 
transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
-            LOG.debug("Top paths filtered: {}.", 
transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
+            LOG.debug("Top hidden paths rejected: {}", 
transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
+            LOG.debug("Top paths filtered: {}", 
transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
             LOG.debug("Top empty node state documents: {}", 
transformStageStatistics.getEmptyNodeStateHistogram().prettyPrint());
         }
     }
@@ -319,6 +328,7 @@ public class PipelinedStrategy implements SortStrategy {
                 
emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(bufferSizeBytes,
 maxNumberOfEntriesPerBuffer));
             }
 
+            LOG.info("[TASK:PIPELINED-DUMP:START] Starting to build FFS");
             Stopwatch start = Stopwatch.createStarted();
             MongoCollection<BasicDBObject> dbCollection = 
MongoDocumentStoreHelper.getDBCollection(docStore, Collection.NODES);
             PipelinedMongoDownloadTask downloadTask = new 
PipelinedMongoDownloadTask(dbCollection, mongoBatchSize, mongoDocQueue, 
pathFilters);
@@ -353,7 +363,7 @@ public class PipelinedStrategy implements SortStrategy {
 
 
             try {
-                LOG.info("Waiting for tasks to complete.");
+                LOG.info("Waiting for tasks to complete");
                 int tasksFinished = 0;
                 int transformTasksFinished = 0;
                 while (tasksFinished < numberOfThreads) {
@@ -372,10 +382,10 @@ public class PipelinedStrategy implements SortStrategy {
                             PipelinedTransformTask.Result transformResult = 
(PipelinedTransformTask.Result) result;
                             transformTasksFinished++;
                             entryCount += transformResult.getEntryCount();
-                            LOG.info("Transform thread {} finished. Entries 
processed: {}",
+                            LOG.info("Transform task {} finished. Entries 
processed: {}",
                                     transformResult.getThreadId(), 
transformResult.getEntryCount());
                             if (transformTasksFinished == transformThreads) {
-                                LOG.info("All transform tasks finished. Node 
states retrieved: {}", entryCount);
+                                LOG.info("All transform tasks finished. Total 
entries processed: {}", entryCount);
                                 // No need to keep monitoring the queues, the 
download and transform threads are done.
                                 monitorFuture.cancel(false);
                                 // Terminate the sort thread.
@@ -384,14 +394,14 @@ public class PipelinedStrategy implements SortStrategy {
 
                         } else if (result instanceof 
PipelinedSortBatchTask.Result) {
                             PipelinedSortBatchTask.Result sortTaskResult = 
(PipelinedSortBatchTask.Result) result;
-                            LOG.info("Sort task finished. Entries processed: 
{}", sortTaskResult.getTotalEntries());
+                            LOG.info("Sort batch task finished. Entries 
processed: {}", sortTaskResult.getTotalEntries());
                             printStatistics(mongoDocQueue, emptyBatchesQueue, 
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
                             sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
 
                         } else if (result instanceof 
PipelinedMergeSortTask.Result) {
                             PipelinedMergeSortTask.Result mergeSortedFilesTask 
= (PipelinedMergeSortTask.Result) result;
                             File ffs = 
mergeSortedFilesTask.getFlatFileStoreFile();
-                            LOG.info("Sort task finished. FFS: {}, Size: {}", 
ffs, humanReadableByteCountBin(ffs.length()));
+                            LOG.info("Merge-sort task finished. FFS: {}, 
Size: {}", ffs, humanReadableByteCountBin(ffs.length()));
                             flatFileStore = 
mergeSortedFilesTask.getFlatFileStoreFile();
 
                         } else {
@@ -399,7 +409,7 @@ public class PipelinedStrategy implements SortStrategy {
                         }
                         tasksFinished++;
                     } catch (ExecutionException ex) {
-                        LOG.warn("Execution error dumping from MongoDB: " + ex 
+ ". Shutting down all threads.");
+                        LOG.warn("Execution error dumping from MongoDB: " + ex 
+ ". Shutting down all threads");
                         threadPool.shutdownNow();
                         boolean terminated = threadPool.awaitTermination(5, 
TimeUnit.SECONDS);
                         if (!terminated) {
@@ -416,7 +426,11 @@ public class PipelinedStrategy implements SortStrategy {
                         throw new RuntimeException(ex);
                     }
                 }
-                LOG.info("Dumped {} nodestates in json format in {}", 
entryCount, start);
+                LOG.info("[TASK:PIPELINED-DUMP:END] Metrics: {}", 
MetricsFormatter.newBuilder()
+                        .add("duration", 
FormattingUtils.formatToSeconds(start))
+                        .add("durationSeconds", 
start.elapsed(TimeUnit.SECONDS))
+                        .add("entryCount", entryCount)
+                        .build());
                 printStatistics(mongoDocQueue, emptyBatchesQueue, 
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
index 9f6e012b53..0781f41f71 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
@@ -21,6 +21,8 @@ package 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 import com.mongodb.BasicDBObject;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
@@ -41,6 +43,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -71,9 +74,9 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
             return transformThreadId;
         }
     }
-
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedTransformTask.class);
     private static final AtomicInteger threadIdGenerator = new AtomicInteger();
+    private static final String THREAD_NAME_PREFIX = "mongo-transform-";
 
     private final MongoDocumentStore mongoStore;
     private final DocumentNodeStore documentNodeStore;
@@ -89,7 +92,7 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
     private final ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue;
     private final TransformStageStatistics statistics;
     private final int threadId = threadIdGenerator.getAndIncrement();
-    private long totalEnqueueWaitTimeMillis = 0;
+    private long totalEnqueueDelayMillis = 0;
 
     public PipelinedTransformTask(MongoDocumentStore mongoStore,
                                   DocumentNodeStore documentNodeStore,
@@ -116,26 +119,34 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
     @Override
     public Result call() throws Exception {
         String originalName = Thread.currentThread().getName();
-        Thread.currentThread().setName("mongo-transform-" + threadId);
+        String threadName = THREAD_NAME_PREFIX + threadId;
+        Thread.currentThread().setName(threadName);
         try {
-            LOG.info("Starting transform task");
+            LOG.info("[TASK:{}:START] Starting transform task", 
threadName.toUpperCase(Locale.ROOT));
             NodeDocumentCache nodeCache = 
MongoDocumentStoreHelper.getNodeDocumentCache(mongoStore);
             Stopwatch taskStartWatch = Stopwatch.createStarted();
             long totalEntryCount = 0;
             long mongoObjectsProcessed = 0;
-            LOG.info("Waiting for an empty buffer");
+            LOG.debug("Waiting for an empty buffer");
             NodeStateEntryBatch nseBatch = emptyBatchesQueue.take();
 
             // Used to serialize a node state entry before writing it to the 
buffer
             ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
             OutputStreamWriter writer = new OutputStreamWriter(baos, 
PipelinedStrategy.FLATFILESTORE_CHARSET);
-            LOG.info("Obtained an empty buffer. Starting to convert Mongo 
documents to node state entries");
+            LOG.debug("Obtained an empty buffer. Starting to convert Mongo 
documents to node state entries");
             while (true) {
                 BasicDBObject[] dbObjectBatch = mongoDocQueue.take();
                 if (dbObjectBatch == SENTINEL_MONGO_DOCUMENT) {
-                    LOG.info("Task terminating. Dumped {} nodestates in json 
format in {}. Total enqueue delay: {} ms ({}%)",
-                            totalEntryCount, taskStartWatch, 
totalEnqueueWaitTimeMillis,
-                            String.format("%1.2f", (100.0 * 
totalEnqueueWaitTimeMillis) / taskStartWatch.elapsed(TimeUnit.MILLISECONDS)));
+
+                    String totalEnqueueDelayPercentage =  
String.format("%1.2f", (100.0 * totalEnqueueDelayMillis) / 
taskStartWatch.elapsed(TimeUnit.MILLISECONDS));
+                    String metrics = MetricsFormatter.newBuilder()
+                            .add("duration", 
FormattingUtils.formatToSeconds(taskStartWatch))
+                            .add("durationSeconds", 
taskStartWatch.elapsed(TimeUnit.SECONDS))
+                            .add("nodeStateEntriesGenerated", totalEntryCount)
+                            .add("enqueueDelayMillis", totalEnqueueDelayMillis)
+                            .add("enqueueDelayPercentage", 
totalEnqueueDelayPercentage)
+                            .build();
+                    LOG.info("[TASK:{}:END] Metrics: {}", 
threadName.toUpperCase(Locale.ROOT), metrics);
                     //Save the last batch
                     nseBatch.getBuffer().flip();
                     tryEnqueue(nseBatch);
@@ -202,7 +213,7 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
             LOG.warn("Thread interrupted", t);
             throw t;
         } catch (Throwable t) {
-            LOG.warn("Thread terminating with exception.", t);
+            LOG.warn("Thread terminating with exception", t);
             throw t;
         } finally {
             Thread.currentThread().setName(originalName);
@@ -210,10 +221,10 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
     }
 
     private void tryEnqueue(NodeStateEntryBatch nseBatch) throws 
InterruptedException {
-        Stopwatch stopwatch = Stopwatch.createStarted();
+        Stopwatch enqueueDelayStopwatch = Stopwatch.createStarted();
         nonEmptyBatchesQueue.put(nseBatch);
-        long enqueueDelay = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-        totalEnqueueWaitTimeMillis += enqueueDelay;
+        long enqueueDelay = 
enqueueDelayStopwatch.elapsed(TimeUnit.MILLISECONDS);
+        totalEnqueueDelayMillis += enqueueDelay;
         if (enqueueDelay > 1) {
             LOG.info("Enqueuing of node state entries batch was delayed, took 
{} ms. nonEmptyBatchesQueue size {}. ",
                     enqueueDelay, nonEmptyBatchesQueue.size());
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
index a14e854c23..060c5aad8f 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
@@ -19,6 +19,7 @@
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 
 import java.util.concurrent.atomic.LongAdder;
 
@@ -141,23 +142,24 @@ public class TransformStageStatistics {
                 String.format("%2.1f%%", (100.0 * documentsAcceptedTotal) / 
mongoDocumentsTraversedSum);
         String entriesAcceptedPercentage = totalEntries == 0 ? "N/A" :
                 String.format("%1.1f%%", (100.0 * entriesAcceptedSum) / 
totalEntries);
-        String avgEntrySize = entriesAcceptedSum == 0 ? "N/A" :
-                Long.toString(extractedEntriesTotalSizeSum / 
entriesAcceptedSum);
-        return "{documentsTraversed:" + mongoDocumentsTraversedSum +
-                ", documentsAccepted:" + documentsAcceptedTotal +
-                ", documentsRejected:" + documentsRejectedTotal +
-                ", documentsAcceptedPercentage:" + documentsAcceptedPercentage 
+
-                ", documentsRejectedSplit:" + documentsRejectedSplitSum +
-                ", documentsRejectedEmptyNodeState:" + 
documentsRejectedEmptyNodeStateSum +
-                ", entriesTraversed:" + totalEntries +
-                ", entriesAccepted:" + entriesAcceptedSum +
-                ", entriesRejected:" + entriesRejectedSum +
-                ", entriesAcceptedPercentage:" + entriesAcceptedPercentage +
-                ", entriesRejectedHiddenPaths:" + entriesRejectedHiddenPaths +
-                ", entriesRejectedPathFiltered:" + entriesRejectedPathFiltered 
+
-                ", extractedEntriesTotalSize:" + extractedEntriesTotalSizeSum +
-                ", avgEntrySize:" + avgEntrySize +
-                "}";
+        long avgEntrySize = entriesAcceptedSum == 0 ? -1 :
+                extractedEntriesTotalSizeSum / entriesAcceptedSum;
+        return MetricsFormatter.newBuilder()
+                .add("documentsTraversed", mongoDocumentsTraversedSum)
+                .add("documentsAccepted", documentsAcceptedTotal)
+                .add("documentsRejected", documentsRejectedTotal)
+                .add("documentsAcceptedPercentage", 
documentsAcceptedPercentage)
+                .add("documentsRejectedSplit", documentsRejectedSplitSum)
+                .add("documentsRejectedEmptyNodeState", 
documentsRejectedEmptyNodeStateSum)
+                .add("entriesTraversed", totalEntries)
+                .add("entriesAccepted", entriesAcceptedSum)
+                .add("entriesRejected", entriesRejectedSum)
+                .add("entriesAcceptedPercentage", entriesAcceptedPercentage)
+                .add("entriesRejectedHiddenPaths", 
entriesRejectedHiddenPaths.sum())
+                .add("entriesRejectedPathFiltered", 
entriesRejectedPathFiltered.sum())
+                .add("extractedEntriesTotalSize", extractedEntriesTotalSizeSum)
+                .add("avgEntrySize", avgEntrySize)
+                .build();
     }
 
     private static String getPathPrefix(String path, int depth) {
diff --git 
a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
 
b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
index b331b6b9e6..33003fe552 100644
--- 
a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
+++ 
b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
@@ -65,7 +65,7 @@ public class ElasticDocumentStoreIndexer extends 
DocumentStoreIndexerBase {
     Otherwise proper alias naming and mapping will not be applied
      */
     @Override
-    protected void preIndexOpertaions(List<NodeStateIndexer> indexers) {
+    protected void preIndexOperations(List<NodeStateIndexer> indexers) {
         // For all the available indexers check if it's an ElasticIndexer
         // and then provision the index
         for (NodeStateIndexer indexer : indexers) {
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
index 55f9aa08ce..4d0be380d3 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
@@ -31,6 +31,8 @@ import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.index.async.AsyncIndexerLucene;
 import org.apache.jackrabbit.oak.index.indexer.document.DocumentStoreIndexer;
 import org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStore;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
 import org.apache.jackrabbit.oak.run.cli.CommonOptions;
 import org.apache.jackrabbit.oak.run.cli.DocumentBuilderCustomizer;
@@ -235,7 +237,8 @@ public class IndexCommand implements Command {
     private File reindex(IndexOptions idxOpts, ExtendedIndexHelper 
extendedIndexHelper, String checkpoint) throws IOException, 
CommitFailedException {
         checkNotNull(checkpoint, "Checkpoint value is required for reindexing 
done in read only mode");
 
-        Stopwatch w = Stopwatch.createStarted();
+        log.info("[TASK:REINDEX:START] Starting reindexing");
+        Stopwatch reindexWatch = Stopwatch.createStarted();
         IndexerSupport indexerSupport = 
createIndexerSupport(extendedIndexHelper, checkpoint);
         log.info("Proceeding to index {} upto checkpoint {} {}", 
extendedIndexHelper.getIndexPaths(), checkpoint,
                 indexerSupport.getCheckpointInfo());
@@ -259,7 +262,11 @@ public class IndexCommand implements Command {
         indexerSupport.writeMetaInfo(checkpoint);
         File destDir = indexerSupport.copyIndexFilesToOutput();
         log.info("Indexing completed for indexes {} in {} ({} ms) and index 
files are copied to {}",
-                extendedIndexHelper.getIndexPaths(), w, 
w.elapsed(TimeUnit.MILLISECONDS), IndexCommand.getPath(destDir));
+                extendedIndexHelper.getIndexPaths(), reindexWatch, 
reindexWatch.elapsed(TimeUnit.MILLISECONDS), IndexCommand.getPath(destDir));
+        log.info("[TASK:REINDEX:END] Metrics: {}",  
MetricsFormatter.newBuilder()
+                .add("duration", FormattingUtils.formatToSeconds(reindexWatch))
+                .add("durationSeconds", reindexWatch.elapsed(TimeUnit.SECONDS))
+                .build());
         return destDir;
     }
 
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
index 7809fe5de8..fc3596daab 100644
--- 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
+++ 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
@@ -50,7 +50,7 @@ public class DocumentStoreIndexer extends 
DocumentStoreIndexerBase implements Cl
     }
 
     @Override
-    protected void preIndexOpertaions(List<NodeStateIndexer> indexers) {
+    protected void preIndexOperations(List<NodeStateIndexer> indexers) {
         // NOOP
     }
 }

Reply via email to