This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 80fb761f2cef0dc78c573081aaf83e52b32b46fc Author: Salim Achouche <sachouc...@gmail.com> AuthorDate: Fri Jun 29 17:31:40 2018 -0700 DRILL-6560: Enhanced the batch statistics logging enablement closes #1355 --- .../java/org/apache/drill/exec/ExecConstants.java | 3 + .../apache/drill/exec/physical/impl/ScanBatch.java | 11 +- .../exec/server/options/SystemOptionManager.java | 5 +- .../parquet/columnreaders/ParquetRecordReader.java | 3 +- .../parquet/columnreaders/VarLenBinaryReader.java | 15 +- .../batchsizing/OverflowSerDeUtil.java | 12 +- .../batchsizing/RecordBatchOverflow.java | 17 +- .../batchsizing/RecordBatchSizerManager.java | 45 +++-- .../drill/exec/util/record/RecordBatchStats.java | 181 ++++++++++++++------- .../java-exec/src/main/resources/drill-module.conf | 1 + 10 files changed, 186 insertions(+), 107 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 4c840a4..d0842d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -711,5 +711,8 @@ public final class ExecConstants { public static final String STATS_LOGGING_FG_BATCH_SIZE_OPTION = "drill.exec.stats.logging.fine_grained.batch_size"; public static final BooleanValidator STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_FG_BATCH_SIZE_OPTION); + /** Controls the list of operators for which batch sizing stats should be enabled */ + public static final String STATS_LOGGING_BATCH_OPERATOR_OPTION = "drill.exec.stats.logging.enabled_operators"; + public static final StringValidator STATS_LOGGING_BATCH_OPERATOR_VALIDATOR = new StringValidator(STATS_LOGGING_BATCH_OPERATOR_OPTION); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java 
index 09e785e..4a2cd2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -82,7 +82,7 @@ public class ScanBatch implements CloseableRecordBatch { private final BufferAllocator allocator; private final List<Map<String, String>> implicitColumnList; private String currentReaderClassName; - private final RecordBatchStatsContext batchStatsLogging; + private final RecordBatchStatsContext batchStatsContext; /** * @@ -121,7 +121,7 @@ public class ScanBatch implements CloseableRecordBatch { this.implicitColumnList = implicitColumnList; addImplicitVectors(); currentReader = null; - batchStatsLogging = new RecordBatchStatsContext(context, oContext); + batchStatsContext = new RecordBatchStatsContext(context, oContext); } finally { oContext.getStats().stopProcessing(); } @@ -304,12 +304,7 @@ public class ScanBatch implements CloseableRecordBatch { return; // NOOP } - RecordBatchStats.logRecordBatchStats( - batchStatsLogging.getContextOperatorId(), - getFQNForLogging(MAX_FQN_LENGTH), - this, - batchStatsLogging, - logger); + RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext); } /** Might truncate the FQN if too long */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index a16bb4d..5ee3825 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -46,8 +46,8 @@ import com.google.common.collect.Lists; /** * <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}. - * Only one instance of this class exists per drillbit. 
Options set at the system level affect the entire system and - * persist between restarts. + * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and + * persist between restarts. * </p> * * <p> All the system options are externalized into conf file. While adding a new system option @@ -235,6 +235,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), + new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)), }; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index e1ca73f..e33a505 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.parquet.ParquetReaderStats; import 
org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext; import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -240,7 +241,7 @@ public class ParquetRecordReader extends AbstractRecordReader { public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { this.operatorContext = operatorContext; schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns()); - batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead); + batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead, new RecordBatchStatsContext(fragmentContext, operatorContext)); logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(), hadoopPath.toUri().getPath()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java index 7bdc33e..1fb224d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java @@ -34,11 +34,11 @@ import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatch import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState; import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer; import 
org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.VarLenColumnBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats; import org.apache.drill.exec.vector.ValueVector; /** Class which handles reading a batch of rows from a set of variable columns */ public class VarLenBinaryReader { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLenBinaryReader.class); final ParquetRecordReader parentReader; final RecordBatchSizerManager batchSizer; @@ -170,7 +170,8 @@ public class VarLenBinaryReader { // Lazy initialization if (builder == null) { - builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator()); + builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator(), + batchSizer.getBatchStatsContext()); } final int numOverflowValues = columnStat.numValuesRead - batchNumRecords; @@ -181,7 +182,7 @@ public class VarLenBinaryReader { // Register batch overflow data with the record batch sizer manager (if any) if (builder != null) { Map<String, FieldOverflowStateContainer> overflowContainerMap = parentReader.batchSizerMgr.getFieldOverflowMap(); - Map<String, FieldOverflowDefinition> overflowDefMap = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs(); + Map<String, FieldOverflowDefinition> overflowDefMap = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs(); for (Map.Entry<String, FieldOverflowDefinition> entry : overflowDefMap.entrySet()) { FieldOverflowStateContainer overflowStateContainer = new FieldOverflowStateContainer(entry.getValue(), null); @@ -197,9 +198,9 @@ public class VarLenBinaryReader { // Finally, re-order the variable length columns since an overflow occurred Collections.sort(orderedColumns, comparator); - if (logger.isDebugEnabled()) { - boolean isFirstValue = true; - final StringBuilder msg = new StringBuilder(RecordBatchSizerManager.BATCH_STATS_PREFIX); + if 
(batchSizer.getBatchStatsContext().isEnableBatchSzLogging()) { + boolean isFirstValue = true; + final StringBuilder msg = new StringBuilder(); msg.append(": Dumping the variable length columns read order: "); for (VLColumnContainer container : orderedColumns) { @@ -212,7 +213,7 @@ public class VarLenBinaryReader { } msg.append('.'); - logger.debug(msg.toString()); + RecordBatchStats.logRecordBatchStats(msg.toString(), batchSizer.getBatchStatsContext()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java index c542803..4a0e1e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java @@ -26,6 +26,8 @@ import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatch import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry; import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer; import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext; import org.apache.drill.exec.vector.UInt1Vector; import org.apache.drill.exec.vector.UInt4Vector; @@ -44,7 +46,6 @@ import org.apache.drill.exec.vector.UInt4Vector; * </ul> */ final class OverflowSerDeUtil { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class); /** * Serializes a collection of overflow fields into a memory buffer: @@ -56,10 +57,12 @@ final class OverflowSerDeUtil { * * 
@param fieldOverflowEntries input collection of field overflow entries * @param allocator buffer allocator + * @param batchStatsContext batch statistics context object * @return record overflow container; null if the input buffer is empty */ static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries, - BufferAllocator allocator) { + BufferAllocator allocator, + RecordBatchStatsContext batchStatsContext) { if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) { return null; @@ -82,8 +85,9 @@ final class OverflowSerDeUtil { // Allocate the required memory to serialize the overflow fields final DrillBuf buffer = allocator.buffer(bufferLength); - if (logger.isDebugEnabled()) { - logger.debug(String.format("Allocated a buffer of length %d to handle overflow", bufferLength)); + if (batchStatsContext.isEnableBatchSzLogging()) { + final String msg = String.format("Allocated a buffer of length [%d] to handle overflow", bufferLength); + RecordBatchStats.logRecordBatchStats(msg, batchStatsContext); } // Create the result object diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java index 76422ae..462ddf0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.drill.common.map.CaseInsensitiveMap; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; @@ -39,10 
+40,11 @@ public final class RecordBatchOverflow { /** * @param allocator buffer allocator + * @param batchStatsContext batch statistics context * @return new builder object */ - public static Builder newBuilder(BufferAllocator allocator) { - return new Builder(allocator); + public static Builder newBuilder(BufferAllocator allocator, RecordBatchStatsContext batchStatsContext) { + return new Builder(allocator, batchStatsContext); } /** @@ -75,13 +77,17 @@ public final class RecordBatchOverflow { private final List<FieldOverflowEntry> fieldOverflowEntries = new ArrayList<FieldOverflowEntry>(); /** Buffer allocator */ private final BufferAllocator allocator; + /** Batch statistics context */ + private final RecordBatchStatsContext batchStatsContext; /** * Build class to construct a {@link RecordBatchOverflow} object. * @param allocator buffer allocator + * @param batchStatsContext batch statistics context */ - private Builder(BufferAllocator allocator) { + private Builder(BufferAllocator allocator, RecordBatchStatsContext batchStatsContext) { this.allocator = allocator; + this.batchStatsContext = batchStatsContext; } /** @@ -101,9 +107,8 @@ public final class RecordBatchOverflow { * @return a new built {@link RecordBatchOverflow} object instance */ public RecordBatchOverflow build() { - RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator); - RecordBatchOverflow result = - new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator); + RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator, batchStatsContext); + RecordBatchOverflow result = new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator); return result; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java index 01644f7..5ddcf7e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java @@ -30,6 +30,8 @@ import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata; import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema; import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput; import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; @@ -39,7 +41,7 @@ import org.apache.drill.exec.vector.ValueVector; */ public final class RecordBatchSizerManager { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class); - public static final String BATCH_STATS_PREFIX = "BATCH_STATS"; + /** Minimum column memory size */ private static final int MIN_COLUMN_MEMORY_SZ = VarLenColumnBulkInput.getMinVLColumnMemorySize(); @@ -78,6 +80,9 @@ public final class RecordBatchSizerManager { */ private Map<String, FieldOverflowStateContainer> fieldOverflowMap = CaseInsensitiveMap.newHashMap(); + /** For controlling batch statistics logging */ + private final RecordBatchStatsContext batchStatsContext; + /** * Constructor. 
* @@ -87,7 +92,8 @@ public final class RecordBatchSizerManager { */ public RecordBatchSizerManager(OptionManager options, ParquetSchema schema, - long totalRecordsToRead) { + long totalRecordsToRead, + RecordBatchStatsContext batchStatsContext) { this.schema = schema; this.totalRecordsToRead = totalRecordsToRead; @@ -97,6 +103,7 @@ public final class RecordBatchSizerManager { this.maxRecordsPerBatch = this.configRecordsPerBatch; this.recordsPerBatch = this.configRecordsPerBatch; this.overflowOptimizer = new BatchOverflowOptimizer(columnMemoryInfoMap); + this.batchStatsContext = batchStatsContext; } /** @@ -131,6 +138,13 @@ public final class RecordBatchSizerManager { } /** + * @return batch statistics context + */ + public RecordBatchStatsContext getBatchStatsContext() { + return batchStatsContext; + } + + /** * Allocates value vectors for the current batch. * * @param vectorMap a collection of value vectors keyed by their field names @@ -282,10 +296,9 @@ public final class RecordBatchSizerManager { normalizedNumRecords = (int) totalRecordsToRead; } - if (logger.isDebugEnabled()) { - final String message = String.format("%s: The Parquet reader number of record(s) has been set to [%d]", - BATCH_STATS_PREFIX, normalizedNumRecords); - logger.debug(message); + if (batchStatsContext.isEnableBatchSzLogging()) { + final String message = String.format("The Parquet reader number of record(s) has been set to [%d]", normalizedNumRecords); + RecordBatchStats.logRecordBatchStats(message, batchStatsContext); } return normalizedNumRecords; @@ -319,10 +332,9 @@ public final class RecordBatchSizerManager { logger.warn(message); } - if (logger.isDebugEnabled()) { - final String message = String.format("%s: The Parquet reader batch memory has been set to [%d] byte(s)", - BATCH_STATS_PREFIX, normalizedMemorySize); - logger.debug(message); + if (batchStatsContext.isEnableBatchSzLogging()) { + final String message = String.format("The Parquet reader batch memory has been set to [%d] 
byte(s)", normalizedMemorySize); + RecordBatchStats.logRecordBatchStats(message, batchStatsContext); } return normalizedMemorySize; @@ -370,13 +382,12 @@ public final class RecordBatchSizerManager { assignFineGrainedMemoryQuota(); // log the new record batch if it changed - if (logger.isDebugEnabled()) { + if (batchStatsContext.isEnableBatchSzLogging()) { assert recordsPerBatch <= maxRecordsPerBatch; if (originalRecordsPerBatch != recordsPerBatch) { - final String message = String.format("%s: The Parquet records per batch [%d] has been decreased to [%d]", - BATCH_STATS_PREFIX, originalRecordsPerBatch, recordsPerBatch); - logger.debug(message); + final String message = String.format("The Parquet records per batch [%d] has been decreased to [%d]", originalRecordsPerBatch, recordsPerBatch); + RecordBatchStats.logRecordBatchStats(message, batchStatsContext); } // Now dump the per column memory quotas @@ -504,12 +515,12 @@ public final class RecordBatchSizerManager { } private void dumpColumnMemoryQuotas() { - StringBuilder msg = new StringBuilder(BATCH_STATS_PREFIX); + StringBuilder msg = new StringBuilder(); msg.append(": Field Quotas:\n\tName\tType\tPrec\tQuota\n"); for (ColumnMemoryInfo columnInfo : columnMemoryInfoMap.values()) { msg.append("\t"); - msg.append(BATCH_STATS_PREFIX); + msg.append(RecordBatchStats.BATCH_STATS_PREFIX); msg.append("\t"); msg.append(columnInfo.columnMeta.getField().getName()); msg.append("\t"); @@ -521,7 +532,7 @@ public final class RecordBatchSizerManager { msg.append("\n"); } - logger.debug(msg.toString()); + RecordBatchStats.logRecordBatchStats(msg.toString(), batchStatsContext); } private static void printType(MaterializedField field, StringBuilder msg) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java index 8b213a8..0b24244 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.util.record; -import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -25,7 +24,6 @@ import org.apache.drill.exec.ops.FragmentContextImpl; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize; @@ -34,13 +32,14 @@ import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize; * Utility class to capture key record batch statistics. 
*/ public final class RecordBatchStats { + // Logger + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchStats.class); + /** A prefix for all batch stats to simplify search */ public static final String BATCH_STATS_PREFIX = "BATCH_STATS"; /** Helper class which loads contextual record batch logging options */ public static final class RecordBatchStatsContext { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchStatsContext.class); - /** batch size logging for all readers */ private final boolean enableBatchSzLogging; /** Fine grained batch size logging */ @@ -52,8 +51,17 @@ public final class RecordBatchStats { * @param options options manager */ public RecordBatchStatsContext(FragmentContext context, OperatorContext oContext) { - enableBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION); - enableFgBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION); + final boolean operatorEnabledForStatsLogging = isBatchStatsEnabledForOperator(context, oContext); + + if (operatorEnabledForStatsLogging) { + enableBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION); + enableFgBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION); + + } else { + enableBatchSzLogging = false; + enableFgBatchSzLogging = false; + } + contextOperatorId = new StringBuilder() .append(getQueryId(context)) .append(":") @@ -100,6 +108,104 @@ public final class RecordBatchStats { } return "NA"; } + + private boolean isBatchStatsEnabledForOperator(FragmentContext context, OperatorContext oContext) { + // The configuration can select what operators should log batch statistics + final String statsLoggingOperator = context.getOptions().getString(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_OPTION).toUpperCase(); + final String allOperatorsStr = "ALL"; + + 
// All operators are allowed to log batch statistics + if (allOperatorsStr.equals(statsLoggingOperator)) { + return true; + } + + // No, only a select few are allowed; syntax: operator-id-1,operator-id-2,.. + final String[] operators = statsLoggingOperator.split(","); + final String operatorId = oContext.getStats().getId().toUpperCase(); + + for (int idx = 0; idx < operators.length; idx++) { + // We use "contains" because the operator identifier is a composite string; e.g., 3:[PARQUET_ROW_GROUP_SCAN] + if (operatorId.contains(operators[idx].trim())) { + return true; + } + } + + return false; + } + } + + /** + * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, RecordBatchStatsContext)} + */ + public static void logRecordBatchStats(RecordBatch recordBatch, + RecordBatchStatsContext batchStatsContext) { + + logRecordBatchStats(null, recordBatch, batchStatsContext); + } + + /** + * Logs record batch statistics for the input record batch (logging happens only + * when record statistics logging is enabled). 
+ * + * @param sourceId optional source identifier for scanners + * @param recordBatch a set of records + * @param batchStatsContext batch stats context object + */ + public static void logRecordBatchStats(String sourceId, + RecordBatch recordBatch, + RecordBatchStatsContext batchStatsContext) { + + if (!batchStatsContext.isEnableBatchSzLogging()) { + return; // NOOP + } + + final String statsId = batchStatsContext.getContextOperatorId(); + final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging(); + final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose); + + logBatchStatsMsg(batchStatsContext, msg, false); + } + + /** + * Logs a generic batch statistics message + * + * @param message log message + * @param batchStatsContext batch stats context object + */ + public static void logRecordBatchStats(String message, + RecordBatchStatsContext batchStatsContext) { + + if (!batchStatsContext.isEnableBatchSzLogging()) { + return; // NOOP + } + + logBatchStatsMsg(batchStatsContext, message, true); + } + + /** + * @param allocator dumps allocator statistics + * @return string with allocator statistics + */ + public static String printAllocatorStats(BufferAllocator allocator) { + StringBuilder msg = new StringBuilder(); + msg.append(BATCH_STATS_PREFIX); + msg.append(": dumping allocator statistics:\n"); + msg.append(BATCH_STATS_PREFIX); + msg.append(": "); + msg.append(allocator.toString()); + + return msg.toString(); + } + +// ---------------------------------------------------------------------------- +// Local Implementation +// ---------------------------------------------------------------------------- + + /** + * Disabling class object instantiation.
+ */ + private RecordBatchStats() { } /** @@ -112,7 +218,7 @@ public final class RecordBatchStats { * * @return a string containing the record batch statistics */ - public static String printRecordBatchStats(String statsId, + private static String printRecordBatchStats(String statsId, String sourceId, RecordBatch recordBatch, boolean verbose) { @@ -158,68 +264,19 @@ public final class RecordBatchStats { return msg.toString(); } - /** - * Logs record batch statistics for the input record batch (logging happens only - * when record statistics logging is enabled). - * - * @param stats instance identifier - * @param sourceId optional source identifier for scanners - * @param recordBatch a set of records - * @param verbose whether to include fine-grained stats - * @param logger Logger where to print the record batch statistics - */ - public static void logRecordBatchStats(String statsId, - String sourceId, - RecordBatch recordBatch, - RecordBatchStatsContext batchStatsLogging, - org.slf4j.Logger logger) { + private static void logBatchStatsMsg(RecordBatchStatsContext batchStatsContext, + String msg, + boolean includePrefix) { - if (!batchStatsLogging.isEnableBatchSzLogging()) { - return; // NOOP + if (includePrefix) { + msg = BATCH_STATS_PREFIX + '\t' + msg; } - final boolean verbose = batchStatsLogging.isEnableFgBatchSzLogging(); - final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose); - - if (batchStatsLogging.useInfoLevelLogging()) { + if (batchStatsContext.useInfoLevelLogging()) { logger.info(msg); } else { logger.debug(msg); } } - /** - * Prints a materialized field type - * @param field materialized field - * @param msg string builder where to append the field type - */ - public static void printFieldType(MaterializedField field, StringBuilder msg) { - final MajorType type = field.getType(); - - msg.append(type.getMinorType().name()); - msg.append(':'); - msg.append(type.getMode().name()); - } - - /** - * @param allocator dumps 
allocator statistics - * @return string with allocator statistics - */ - public static String printAllocatorStats(BufferAllocator allocator) { - StringBuilder msg = new StringBuilder(); - msg.append(BATCH_STATS_PREFIX); - msg.append(": dumping allocator statistics:\n"); - msg.append(BATCH_STATS_PREFIX); - msg.append(": "); - msg.append(allocator.toString()); - - return msg.toString(); - } - - /** - * Disabling class object instantiation. - */ - private RecordBatchStats() { - } - } \ No newline at end of file diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index b0cc209..19e779d 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -488,6 +488,7 @@ drill.exec.options: { exec.udf.use_dynamic: true, drill.exec.stats.logging.batch_size: false, drill.exec.stats.logging.fine_grained.batch_size: false, + drill.exec.stats.logging.enabled_operators: all, new_view_default_permissions: 700, org.apache.drill.exec.compile.ClassTransformer.scalar_replacement: "try", planner.add_producer_consumer: false,