This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 80fb761f2cef0dc78c573081aaf83e52b32b46fc
Author: Salim Achouche <sachouc...@gmail.com>
AuthorDate: Fri Jun 29 17:31:40 2018 -0700

    DRILL-6560: Enhanced the batch statistics logging enablement
    
    closes #1355
---
 .../java/org/apache/drill/exec/ExecConstants.java  |   3 +
 .../apache/drill/exec/physical/impl/ScanBatch.java |  11 +-
 .../exec/server/options/SystemOptionManager.java   |   5 +-
 .../parquet/columnreaders/ParquetRecordReader.java |   3 +-
 .../parquet/columnreaders/VarLenBinaryReader.java  |  15 +-
 .../batchsizing/OverflowSerDeUtil.java             |  12 +-
 .../batchsizing/RecordBatchOverflow.java           |  17 +-
 .../batchsizing/RecordBatchSizerManager.java       |  45 +++--
 .../drill/exec/util/record/RecordBatchStats.java   | 181 ++++++++++++++-------
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 10 files changed, 186 insertions(+), 107 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 4c840a4..d0842d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -711,5 +711,8 @@ public final class ExecConstants {
   public static final String STATS_LOGGING_FG_BATCH_SIZE_OPTION = 
"drill.exec.stats.logging.fine_grained.batch_size";
   public static final BooleanValidator STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR = 
new BooleanValidator(STATS_LOGGING_FG_BATCH_SIZE_OPTION);
 
+  /** Controls the list of operators for which batch sizing stats should be 
enabled */
+  public static final String STATS_LOGGING_BATCH_OPERATOR_OPTION = 
"drill.exec.stats.logging.enabled_operators";
+  public static final StringValidator STATS_LOGGING_BATCH_OPERATOR_VALIDATOR = 
new StringValidator(STATS_LOGGING_BATCH_OPERATOR_OPTION);
 
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 09e785e..4a2cd2c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -82,7 +82,7 @@ public class ScanBatch implements CloseableRecordBatch {
   private final BufferAllocator allocator;
   private final List<Map<String, String>> implicitColumnList;
   private String currentReaderClassName;
-  private final RecordBatchStatsContext batchStatsLogging;
+  private final RecordBatchStatsContext batchStatsContext;
 
   /**
    *
@@ -121,7 +121,7 @@ public class ScanBatch implements CloseableRecordBatch {
       this.implicitColumnList = implicitColumnList;
       addImplicitVectors();
       currentReader = null;
-      batchStatsLogging = new RecordBatchStatsContext(context, oContext);
+      batchStatsContext = new RecordBatchStatsContext(context, oContext);
     } finally {
       oContext.getStats().stopProcessing();
     }
@@ -304,12 +304,7 @@ public class ScanBatch implements CloseableRecordBatch {
       return; // NOOP
     }
 
-    RecordBatchStats.logRecordBatchStats(
-      batchStatsLogging.getContextOperatorId(),
-      getFQNForLogging(MAX_FQN_LENGTH),
-      this,
-      batchStatsLogging,
-      logger);
+    RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), 
this, batchStatsContext);
   }
 
   /** Might truncate the FQN if too long */
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a16bb4d..5ee3825 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -46,8 +46,8 @@ import com.google.common.collect.Lists;
 
 /**
  *  <p> {@link OptionManager} that holds options within {@link 
org.apache.drill.exec.server.DrillbitContext}.
- *  Only one instance of this class exists per drillbit. Options set at the 
system level affect the entire system and
- *  persist between restarts.
+ * Only one instance of this class exists per drillbit. Options set at the 
system level affect the entire system and
+ * persist between restarts.
  *  </p>
  *
  *  <p> All the system options are externalized into conf file. While adding a 
new system option
@@ -235,6 +235,7 @@ public class SystemOptionManager extends BaseOptionManager 
implements AutoClosea
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, 
new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, 
true)),
       new 
OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new 
OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_VALIDATOR,new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new 
OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, 
new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, 
new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
     };
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index e1ca73f..e33a505 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
+import 
org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -240,7 +241,7 @@ public class ParquetRecordReader extends 
AbstractRecordReader {
   public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
     this.operatorContext = operatorContext;
     schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, 
footer, isStarQuery() ? null : getColumns());
-    batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), 
schema, numRecordsToRead);
+    batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), 
schema, numRecordsToRead, new RecordBatchStatsContext(fragmentContext, 
operatorContext));
 
     logger.debug("Reading row group({}) with {} records in file {}.", 
rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
         hadoopPath.toUri().getPath());
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 7bdc33e..1fb224d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -34,11 +34,11 @@ import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatch
 import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState;
 import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
 import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.VarLenColumnBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.vector.ValueVector;
 
 /** Class which handles reading a batch of rows from a set of variable columns 
*/
 public class VarLenBinaryReader {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(VarLenBinaryReader.class);
 
   final ParquetRecordReader parentReader;
   final RecordBatchSizerManager batchSizer;
@@ -170,7 +170,8 @@ public class VarLenBinaryReader {
 
         // Lazy initialization
         if (builder == null) {
-          builder = 
RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator());
+          builder = 
RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator(),
+            batchSizer.getBatchStatsContext());
         }
 
         final int numOverflowValues = columnStat.numValuesRead - 
batchNumRecords;
@@ -181,7 +182,7 @@ public class VarLenBinaryReader {
     // Register batch overflow data with the record batch sizer manager (if 
any)
     if (builder != null) {
       Map<String, FieldOverflowStateContainer> overflowContainerMap = 
parentReader.batchSizerMgr.getFieldOverflowMap();
-      Map<String, FieldOverflowDefinition> overflowDefMap           = 
builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
+      Map<String, FieldOverflowDefinition> overflowDefMap = 
builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
 
       for (Map.Entry<String, FieldOverflowDefinition> entry : 
overflowDefMap.entrySet()) {
         FieldOverflowStateContainer overflowStateContainer = new 
FieldOverflowStateContainer(entry.getValue(), null);
@@ -197,9 +198,9 @@ public class VarLenBinaryReader {
     // Finally, re-order the variable length columns since an overflow occurred
     Collections.sort(orderedColumns, comparator);
 
-    if (logger.isDebugEnabled()) {
-      boolean isFirstValue    = true;
-      final StringBuilder msg = new 
StringBuilder(RecordBatchSizerManager.BATCH_STATS_PREFIX);
+    if (batchSizer.getBatchStatsContext().isEnableBatchSzLogging()) {
+      boolean isFirstValue = true;
+      final StringBuilder msg = new StringBuilder();
       msg.append(": Dumping the variable length columns read order: ");
 
       for (VLColumnContainer container : orderedColumns) {
@@ -212,7 +213,7 @@ public class VarLenBinaryReader {
       }
       msg.append('.');
 
-      logger.debug(msg.toString());
+      RecordBatchStats.logRecordBatchStats(msg.toString(), 
batchSizer.getBatchStatsContext());
     }
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
index c542803..4a0e1e8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
@@ -26,6 +26,8 @@ import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatch
 import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry;
 import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer;
 import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import 
org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.UInt1Vector;
 import org.apache.drill.exec.vector.UInt4Vector;
 
@@ -44,7 +46,6 @@ import org.apache.drill.exec.vector.UInt4Vector;
  * </ul>
  */
 final class OverflowSerDeUtil {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class);
 
   /**
    * Serializes a collection of overflow fields into a memory buffer:
@@ -56,10 +57,12 @@ final class OverflowSerDeUtil {
    *
    * @param fieldOverflowEntries input collection of field overflow entries
    * @param allocator buffer allocator
+   * @param batchStatsContext batch statistics context object
    * @return record overflow container; null if the input buffer is empty
    */
   static RecordOverflowContainer serialize(List<FieldOverflowEntry> 
fieldOverflowEntries,
-    BufferAllocator allocator) {
+    BufferAllocator allocator,
+    RecordBatchStatsContext batchStatsContext) {
 
     if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) {
       return null;
@@ -82,8 +85,9 @@ final class OverflowSerDeUtil {
     // Allocate the required memory to serialize the overflow fields
     final DrillBuf buffer = allocator.buffer(bufferLength);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug(String.format("Allocated a buffer of length %d to handle 
overflow", bufferLength));
+    if (batchStatsContext.isEnableBatchSzLogging()) {
+      final String msg = String.format("Allocated a buffer of length [%d] to 
handle overflow", bufferLength);
+      RecordBatchStats.logRecordBatchStats(msg, batchStatsContext);
     }
 
     // Create the result object
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
index 76422ae..462ddf0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
+import 
org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
@@ -39,10 +40,11 @@ public final class RecordBatchOverflow {
 
   /**
    * @param allocator buffer allocator
+   * @param batchStatsContext batch statistics context
    * @return new builder object
    */
-  public static Builder newBuilder(BufferAllocator allocator) {
-    return new Builder(allocator);
+  public static Builder newBuilder(BufferAllocator allocator, 
RecordBatchStatsContext batchStatsContext) {
+    return new Builder(allocator, batchStatsContext);
   }
 
   /**
@@ -75,13 +77,17 @@ public final class RecordBatchOverflow {
     private final List<FieldOverflowEntry> fieldOverflowEntries = new 
ArrayList<FieldOverflowEntry>();
     /** Buffer allocator */
     private final BufferAllocator allocator;
+    /** Batch statistics context */
+    private final RecordBatchStatsContext batchStatsContext;
 
     /**
      * Build class to construct a {@link RecordBatchOverflow} object.
      * @param allocator buffer allocator
+     * @param batchStatsContext batch statistics context
      */
-    private Builder(BufferAllocator allocator) {
+    private Builder(BufferAllocator allocator, RecordBatchStatsContext 
batchStatsContext) {
       this.allocator = allocator;
+      this.batchStatsContext = batchStatsContext;
     }
 
     /**
@@ -101,9 +107,8 @@ public final class RecordBatchOverflow {
     * @return a new built {@link RecordBatchOverflow} object instance
      */
     public RecordBatchOverflow build() {
-      RecordOverflowContainer overflowContainer = 
OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator);
-      RecordBatchOverflow result                =
-        new RecordBatchOverflow(overflowContainer.recordOverflowDef, 
allocator);
+      RecordOverflowContainer overflowContainer = 
OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator, batchStatsContext);
+      RecordBatchOverflow result = new 
RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator);
 
       return result;
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
index 01644f7..5ddcf7e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -30,6 +30,8 @@ import 
org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput;
 import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import 
org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -39,7 +41,7 @@ import org.apache.drill.exec.vector.ValueVector;
  */
 public final class RecordBatchSizerManager {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class);
-  public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
 
   /** Minimum column memory size */
   private static final int MIN_COLUMN_MEMORY_SZ = 
VarLenColumnBulkInput.getMinVLColumnMemorySize();
@@ -78,6 +80,9 @@ public final class RecordBatchSizerManager {
    */
   private Map<String, FieldOverflowStateContainer> fieldOverflowMap = 
CaseInsensitiveMap.newHashMap();
 
+  /** For controlling batch statistics logging */
+  private final RecordBatchStatsContext batchStatsContext;
+
   /**
    * Constructor.
    *
@@ -87,7 +92,8 @@ public final class RecordBatchSizerManager {
    */
   public RecordBatchSizerManager(OptionManager options,
     ParquetSchema schema,
-    long totalRecordsToRead) {
+    long totalRecordsToRead,
+    RecordBatchStatsContext batchStatsContext) {
 
     this.schema = schema;
     this.totalRecordsToRead = totalRecordsToRead;
@@ -97,6 +103,7 @@ public final class RecordBatchSizerManager {
     this.maxRecordsPerBatch = this.configRecordsPerBatch;
     this.recordsPerBatch = this.configRecordsPerBatch;
     this.overflowOptimizer = new BatchOverflowOptimizer(columnMemoryInfoMap);
+    this.batchStatsContext = batchStatsContext;
   }
 
   /**
@@ -131,6 +138,13 @@ public final class RecordBatchSizerManager {
   }
 
   /**
+   * @return batch statistics context
+   */
+  public RecordBatchStatsContext getBatchStatsContext() {
+    return batchStatsContext;
+  }
+
+  /**
    * Allocates value vectors for the current batch.
    *
    * @param vectorMap a collection of value vectors keyed by their field names
@@ -282,10 +296,9 @@ public final class RecordBatchSizerManager {
       normalizedNumRecords = (int) totalRecordsToRead;
     }
 
-    if (logger.isDebugEnabled()) {
-      final String message = String.format("%s: The Parquet reader number of 
record(s) has been set to [%d]",
-        BATCH_STATS_PREFIX, normalizedNumRecords);
-      logger.debug(message);
+    if (batchStatsContext.isEnableBatchSzLogging()) {
+      final String message = String.format("The Parquet reader number of 
record(s) has been set to [%d]", normalizedNumRecords);
+      RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
     }
 
     return normalizedNumRecords;
@@ -319,10 +332,9 @@ public final class RecordBatchSizerManager {
       logger.warn(message);
     }
 
-    if (logger.isDebugEnabled()) {
-      final String message = String.format("%s: The Parquet reader batch 
memory has been set to [%d] byte(s)",
-        BATCH_STATS_PREFIX, normalizedMemorySize);
-      logger.debug(message);
+    if (batchStatsContext.isEnableBatchSzLogging()) {
+      final String message = String.format("The Parquet reader batch memory 
has been set to [%d] byte(s)", normalizedMemorySize);
+      RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
     }
 
     return normalizedMemorySize;
@@ -370,13 +382,12 @@ public final class RecordBatchSizerManager {
     assignFineGrainedMemoryQuota();
 
     // log the new record batch if it changed
-    if (logger.isDebugEnabled()) {
+    if (batchStatsContext.isEnableBatchSzLogging()) {
       assert recordsPerBatch <= maxRecordsPerBatch;
 
       if (originalRecordsPerBatch != recordsPerBatch) {
-        final String message = String.format("%s: The Parquet records per 
batch [%d] has been decreased to [%d]",
-          BATCH_STATS_PREFIX, originalRecordsPerBatch, recordsPerBatch);
-        logger.debug(message);
+        final String message = String.format("The Parquet records per batch 
[%d] has been decreased to [%d]", originalRecordsPerBatch, recordsPerBatch);
+        RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
       }
 
       // Now dump the per column memory quotas
@@ -504,12 +515,12 @@ public final class RecordBatchSizerManager {
   }
 
   private void dumpColumnMemoryQuotas() {
-    StringBuilder msg = new StringBuilder(BATCH_STATS_PREFIX);
+    StringBuilder msg = new StringBuilder();
     msg.append(": Field Quotas:\n\tName\tType\tPrec\tQuota\n");
 
     for (ColumnMemoryInfo columnInfo : columnMemoryInfoMap.values()) {
       msg.append("\t");
-      msg.append(BATCH_STATS_PREFIX);
+      msg.append(RecordBatchStats.BATCH_STATS_PREFIX);
       msg.append("\t");
       msg.append(columnInfo.columnMeta.getField().getName());
       msg.append("\t");
@@ -521,7 +532,7 @@ public final class RecordBatchSizerManager {
       msg.append("\n");
     }
 
-    logger.debug(msg.toString());
+    RecordBatchStats.logRecordBatchStats(msg.toString(), batchStatsContext);
   }
 
   private  static void printType(MaterializedField field, StringBuilder msg) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
index 8b213a8..0b24244 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.util.record;
 
-import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -25,7 +24,6 @@ import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
@@ -34,13 +32,14 @@ import 
org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
  * Utility class to capture key record batch statistics.
  */
 public final class RecordBatchStats {
+  // Logger
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RecordBatchStats.class);
+
   /** A prefix for all batch stats to simplify search */
   public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
 
   /** Helper class which loads contextual record batch logging options */
   public static final class RecordBatchStatsContext {
-    private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RecordBatchStatsContext.class);
-
     /** batch size logging for all readers */
     private final boolean enableBatchSzLogging;
     /** Fine grained batch size logging */
@@ -52,8 +51,17 @@ public final class RecordBatchStats {
      * @param options options manager
      */
     public RecordBatchStatsContext(FragmentContext context, OperatorContext 
oContext) {
-      enableBatchSzLogging = 
context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION);
-      enableFgBatchSzLogging = 
context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+      final boolean operatorEnabledForStatsLogging = 
isBatchStatsEnabledForOperator(context, oContext);
+
+      if (operatorEnabledForStatsLogging) {
+        enableBatchSzLogging = 
context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION);
+        enableFgBatchSzLogging = 
context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+
+      } else {
+        enableBatchSzLogging = false;
+        enableFgBatchSzLogging = false;
+      }
+
       contextOperatorId = new StringBuilder()
         .append(getQueryId(context))
         .append(":")
@@ -100,6 +108,104 @@ public final class RecordBatchStats {
       }
       return "NA";
     }
+
+    private boolean isBatchStatsEnabledForOperator(FragmentContext context, 
OperatorContext oContext) {
+      // The configuration can select what operators should log batch 
statistics
+      final String statsLoggingOperator = 
context.getOptions().getString(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_OPTION).toUpperCase();
+      final String allOperatorsStr = "ALL";
+
+      // All operators are allowed to log batch statistics
+      if (allOperatorsStr.equals(statsLoggingOperator)) {
+        return true;
+      }
+
+      // No, only a select few are allowed; syntax: 
operator-id-1,operator-id-2,..
+      final String[] operators = statsLoggingOperator.split(",");
+      final String operatorId = oContext.getStats().getId().toUpperCase();
+
+      for (int idx = 0; idx < operators.length; idx++) {
+        // We use "contains" because the operator identifier is a composite 
string; e.g., 3:[PARQUET_ROW_GROUP_SCAN]
+        if (operatorId.contains(operators[idx].trim())) {
+          return true;
+        }
+      }
+
+      return false;
+    }
+  }
+
+  /**
+   * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, 
RecordBatchStatsContext)}
+   */
+  public static void logRecordBatchStats(RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    logRecordBatchStats(null, recordBatch, batchStatsContext);
+  }
+
+  /**
+   * Logs record batch statistics for the input record batch (logging happens 
only
+   * when record statistics logging is enabled).
+   *
+   * @param sourceId optional source identifier for scanners
+   * @param recordBatch a set of records
+   * @param batchStatsContext batch stats context object
+   */
+  public static void logRecordBatchStats(String sourceId,
+    RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    final String statsId = batchStatsContext.getContextOperatorId();
+    final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging();
+    final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, 
verbose);
+
+    logBatchStatsMsg(batchStatsContext, msg, false);
+  }
+
+  /**
+   * Logs a generic batch statistics message.
+   *
+   * @param message log message
+   * @param batchStatsContext batch stats context object
+   */
+  public static void logRecordBatchStats(String message,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    logBatchStatsMsg(batchStatsContext, message, true);
+  }
+
+  /**
+   * @param allocator dumps allocator statistics
+   * @return string with allocator statistics
+   */
+  public static String printAllocatorStats(BufferAllocator allocator) {
+    StringBuilder msg = new StringBuilder();
+    msg.append(BATCH_STATS_PREFIX);
+    msg.append(": dumping allocator statistics:\n");
+    msg.append(BATCH_STATS_PREFIX);
+    msg.append(": ");
+    msg.append(allocator.toString());
+
+    return msg.toString();
+  }
+
+// ----------------------------------------------------------------------------
+// Local Implementation
+// ----------------------------------------------------------------------------
+
+  /**
+   * Disabling class object instantiation.
+   */
+  private RecordBatchStats() {
   }
 
   /**
@@ -112,7 +218,7 @@ public final class RecordBatchStats {
    *
    * @return a string containing the record batch statistics
    */
-  public static String printRecordBatchStats(String statsId,
+  private static String printRecordBatchStats(String statsId,
     String sourceId,
     RecordBatch recordBatch,
     boolean verbose) {
@@ -158,68 +264,19 @@ public final class RecordBatchStats {
     return msg.toString();
   }
 
-  /**
-   * Logs record batch statistics for the input record batch (logging happens 
only
-   * when record statistics logging is enabled).
-   *
-   * @param stats instance identifier
-   * @param sourceId optional source identifier for scanners
-   * @param recordBatch a set of records
-   * @param verbose whether to include fine-grained stats
-   * @param logger Logger where to print the record batch statistics
-   */
-  public static void logRecordBatchStats(String statsId,
-    String sourceId,
-    RecordBatch recordBatch,
-    RecordBatchStatsContext batchStatsLogging,
-    org.slf4j.Logger logger) {
+  private static void logBatchStatsMsg(RecordBatchStatsContext 
batchStatsContext,
+    String msg,
+    boolean includePrefix) {
 
-    if (!batchStatsLogging.isEnableBatchSzLogging()) {
-      return; // NOOP
+    if (includePrefix) {
+      msg = BATCH_STATS_PREFIX + '\t' + msg;
     }
 
-    final boolean verbose = batchStatsLogging.isEnableFgBatchSzLogging();
-    final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, 
verbose);
-
-    if (batchStatsLogging.useInfoLevelLogging()) {
+    if (batchStatsContext.useInfoLevelLogging()) {
       logger.info(msg);
     } else {
       logger.debug(msg);
     }
   }
 
-  /**
-   * Prints a materialized field type
-   * @param field materialized field
-   * @param msg string builder where to append the field type
-   */
-  public static void printFieldType(MaterializedField field, StringBuilder 
msg) {
-    final MajorType type = field.getType();
-
-    msg.append(type.getMinorType().name());
-    msg.append(':');
-    msg.append(type.getMode().name());
-  }
-
-  /**
-   * @param allocator dumps allocator statistics
-   * @return string with allocator statistics
-   */
-  public static String printAllocatorStats(BufferAllocator allocator) {
-    StringBuilder msg = new StringBuilder();
-    msg.append(BATCH_STATS_PREFIX);
-    msg.append(": dumping allocator statistics:\n");
-    msg.append(BATCH_STATS_PREFIX);
-    msg.append(": ");
-    msg.append(allocator.toString());
-
-    return msg.toString();
-  }
-
-  /**
-   * Disabling class object instantiation.
-   */
-  private RecordBatchStats() {
-  }
-
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index b0cc209..19e779d 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -488,6 +488,7 @@ drill.exec.options: {
     exec.udf.use_dynamic: true,
     drill.exec.stats.logging.batch_size: false,
     drill.exec.stats.logging.fine_grained.batch_size: false,
+    drill.exec.stats.logging.enabled_operators: all,
     new_view_default_permissions: 700,
     org.apache.drill.exec.compile.ClassTransformer.scalar_replacement: "try",
     planner.add_producer_consumer: false,

Reply via email to