[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198938982

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ##
@@ -90,22 +91,161 @@ public long readFields(long recordsToReadInThisPass) throws IOException {
       recordsReadInCurrentPass = readRecordsInBulk((int) recordsToReadInThisPass);
     }
+    // Publish this information
+    parentReader.readState.setValuesReadInCurrentPass((int) recordsReadInCurrentPass);
+
+    // Update the stats
     parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
     return recordsReadInCurrentPass;
   }
   private int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {
-    int recordsReadInCurrentPass = -1;
+    int batchNumRecords = recordsToReadInThisPass;
+    List<VarLenColumnBatchStats> columnStats = new ArrayList<VarLenColumnBatchStats>(columns.size());
+    int prevReadColumns = -1;
+    boolean overflowCondition = false;
+
+    for (VLColumnContainer columnContainer : orderedColumns) {
+      VarLengthColumn columnReader = columnContainer.column;
+
+      // Read the column data
+      int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
+      assert readColumns <= batchNumRecords : "Reader cannot return more values than requested..";
+
+      if (!overflowCondition) {
+        if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
+          overflowCondition = true;
+        } else {
+          prevReadColumns = readColumns;
+        }
+      }
+
+      // Enqueue this column entry information to handle overflow conditions; we will not know
+      // whether an overflow happened till all variable length columns have been processed
+      columnStats.add(new VarLenColumnBatchStats(columnReader.valueVec, readColumns));
+      // Decrease the number of records to read when a column returns less records (minimize overflow)
+      if (batchNumRecords > readColumns) {
+        batchNumRecords = readColumns;
+        // it seems this column caused an overflow (higher layer will not ask for more values than remaining)
+        ++columnContainer.numCausedOverflows;
+      }
+    }
+
+    // Set the value-count for each column
     for (VarLengthColumn columnReader : columns) {
-      int readColumns = columnReader.readRecordsInBulk(recordsToReadInThisPass);
-      assert (readColumns >= 0 && recordsReadInCurrentPass == readColumns || recordsReadInCurrentPass == -1);
+      columnReader.valuesReadInCurrentPass = batchNumRecords;
+    }
+
+    // Publish this batch statistics
+    publishBatchStats(columnStats, batchNumRecords);
-      recordsReadInCurrentPass = readColumns;
+    // Handle column(s) overflow if any
+    if (overflowCondition) {
+      handleColumnOverflow(columnStats, batchNumRecords);
     }
-    return recordsReadInCurrentPass;
+    return batchNumRecords;
+  }
+
+  private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int batchNumRecords) {
+    // Overflow would happen if a column returned more values than "batchValueCount"; this can happen
+    // when a column Ci is called first, returns num-values-i, and then another column cj is called which
+    // returns less values than num-values-i.
+    RecordBatchOverflow.Builder builder = null;
+
+    // We need to collect all columns which are subject to an overflow (except for the ones which are already
+    // returning values from previous batch overflow)
+    for (VarLenColumnBatchStats columnStat : columnStats) {
+      if (columnStat.numValuesRead > batchNumRecords) {
+        // We need to figure out whether this column was already returning values from a previous batch
+        // overflow; if it is, then this is a NOOP (as the overflow data is still available to be replayed)
+        if (fieldHasAlreadyOverflowData(columnStat.vector.getField().getName())) {
+          continue;
+        }
+
+        // We need to set the value-count as otherwise some size related vector APIs won't work
+        columnStat.vector.getMutator().setValueCount(batchNumRecords);

Review comment:
Correct; for this reason, the design (a) optimizes overflow handling and (b) attempts to minimize how often overflow occurs.
NOTE: the same behavior is observed when a DrillBuf expands its buffer exponentially.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
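The note above compares overflow handling with DrillBuf's exponential growth: whenever a buffer doubles, the bytes already written must be copied into the new allocation. The following is a hypothetical plain-Java sketch of that cost. `DoublingBuffer` is an illustrative name, and the sketch does not use DrillBuf or Drill's allocator. It counts the bytes copied during reallocations, which shows that with doubling the total copy cost stays below the final capacity.

```java
import java.util.Arrays;

/**
 * Hypothetical growable byte buffer that doubles its capacity when full.
 * It records how many bytes reallocations had to copy, the overhead that
 * buffer doubling and overflow handling have in common.
 */
final class DoublingBuffer {
  private byte[] data;
  private int size;
  private long bytesCopied;

  DoublingBuffer(int initialCapacity) {
    if (initialCapacity <= 0) {
      throw new IllegalArgumentException("initialCapacity must be positive");
    }
    data = new byte[initialCapacity];
  }

  void append(byte b) {
    if (size == data.length) {
      // Full: reallocate at twice the size and copy the existing content.
      data = Arrays.copyOf(data, data.length * 2);
      bytesCopied += size;
    }
    data[size++] = b;
  }

  int size() { return size; }
  int capacity() { return data.length; }
  long bytesCopied() { return bytesCopied; }

  public static void main(String[] args) {
    DoublingBuffer buf = new DoublingBuffer(1);
    for (int i = 0; i < 1000; i++) {
      buf.append((byte) i);
    }
    // Capacity grows 1, 2, 4, ..., 1024; copies total 1 + 2 + ... + 512 = 1023 bytes.
    System.out.println(buf.capacity() + " " + buf.bytesCopied()); // 1024 1023
  }
}
```

Because each doubling copies only what has been written so far, the copy cost is amortized to a constant per byte. It is still a real cost, however, which is why minimizing how often overflow (or reallocation) happens matters.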
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198940291

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java ##
@@ -72,14 +79,22 @@
     this.columnPrecInfo = columnPrecInfoInput;
     this.entry = new VarLenColumnBulkEntry(this.columnPrecInfo);
     this.containerCallback = containerCallbackInput;
+    this.fieldOverflowStateContainer = fieldOverflowStateContainer;
     // Initialize the Variable Length Entry Readers
-    fixedReader = new VarLenFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry);
-    nullableFixedReader = new VarLenNullableFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry);
-    variableLengthReader = new VarLenEntryReader(buffer, pageInfo, columnPrecInfo, entry);
-    nullableVLReader = new VarLenNullableEntryReader(buffer, pageInfo, columnPrecInfo, entry);
-    dictionaryReader = new VarLenEntryDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
-    nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
+    fixedReader = new VarLenFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);

Review comment:
Bulk reading of variable-length columns involves only a couple of 4 KB buffers; I'm not sure RS is needed for this.
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r199030833

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ##
@@ -322,6 +309,11 @@ public void close() {
       readState = null;
     }
+    if (batchSizerMgr != null) {

Review comment:
Done.
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198930977

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ##
@@ -315,6 +315,13 @@ private ExecConstants() {
   public static final String PARQUET_FLAT_READER_BULK = "store.parquet.flat.reader.bulk";
   public static final OptionValidator PARQUET_FLAT_READER_BULK_VALIDATOR = new BooleanValidator(PARQUET_FLAT_READER_BULK);
+  // Controls the flat parquet reader batching constraints (number of record and memory limit)
+  public static final String PARQUET_FLAT_BATCH_NUM_RECORDS = "store.parquet.flat.batch.num_records";

Review comment:
- First of all, these constraints are meant for internal use.
- A constraint on the number of rows allows us (a) to cap this number (e.g., below 64K - 1, to avoid overflowing offset or nullable vectors) and (b) to let the performance team tune the best number of rows per batch. For example, the memory constraint could be 32 MB or 16 MB, yet a batch of 8K rows is more than enough for good performance; the higher memory limit is there to handle wide selections.
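The comment above describes two independent batch constraints, a row cap and a memory budget. A minimal way to combine them is to take whichever limit is tighter for a given estimated row width. The sketch below is hypothetical: `BatchRowLimit` and `rowsPerBatch` are illustrative names, not Drill's `RecordBatchSizerManager` API, and a real sizer would estimate row width per column.

```java
/**
 * Hypothetical sketch: derive a per-batch row count from a row cap and a
 * memory budget, given an estimated average row width.
 */
final class BatchRowLimit {
  private BatchRowLimit() {}

  /**
   * @param maxRecords   configured cap on rows per batch (e.g. below 64K - 1)
   * @param memoryBudget configured memory limit per batch, in bytes
   * @param avgRowBytes  estimated average in-memory row width, in bytes
   * @return rows per batch that honor both constraints (never less than 1)
   */
  static int rowsPerBatch(int maxRecords, long memoryBudget, int avgRowBytes) {
    if (maxRecords <= 0 || memoryBudget <= 0 || avgRowBytes <= 0) {
      throw new IllegalArgumentException("all inputs must be positive");
    }
    long rowsThatFit = memoryBudget / avgRowBytes;  // rows allowed by the memory budget
    long rows = Math.min(maxRecords, rowsThatFit);  // the tighter constraint wins
    return (int) Math.max(1, rows);                 // always make progress
  }

  public static void main(String[] args) {
    long sixteenMb = 16L * 1024 * 1024;
    // Narrow rows: the 8K row cap binds long before 16 MB is used.
    System.out.println(rowsPerBatch(8 * 1024, sixteenMb, 100));      // 8192
    // Wide selection (~4 KB per row): the memory budget binds at 4096 rows.
    System.out.println(rowsPerBatch(8 * 1024, sixteenMb, 4 * 1024)); // 4096
  }
}
```

Taking the minimum means that narrow rows are bounded by the row cap and wide selections by memory. This matches the comment's example of 8K rows fitting comfortably inside a 16 MB or 32 MB budget.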
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198935830

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java ##
@@ -132,16 +123,20 @@ public FixedWidthReader(ReadState readState) {
       super(readState);
     }
-    @Override
-    protected long getReadCount(ColumnReader firstColumnStatus) {
-      return Math.min(readState.schema().getRecordsPerBatch(),
-        firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
-    }
-
     @Override
     protected int readRecords(ColumnReader firstColumnStatus, long recordsToRead) throws Exception {
       readAllFixedFields(recordsToRead);
-      return firstColumnStatus.getRecordsReadInCurrentPass();
+
+      if (firstColumnStatus != null) {
+        readState.setValuesReadInCurrentPass(firstColumnStatus.getRecordsReadInCurrentPass());
+      } else {
+        // No rows to return if there are no columns
+        readState.setValuesReadInCurrentPass(0);
+      }
+
+      readState.updateCounts((int) recordsToRead);

Review comment:
I was trying to handle the following case:
- Assume a query attempts to read columns 'a' and 'b'
- The first few Parquet files contain these columns
- But one of the input files has neither 'a' nor 'b'
- The correct behavior is to honor the total number of rows while filling nulls for the missing columns

After your comment, I did some digging and noticed that this situation cannot arise with a fixed- or variable-length reader:
- The Parquet record reader catches this case early and uses the Mock reader instead, which implements what I tried to implement in the fixed reader
- I have removed the added logic (for handling a null firstColumnStatus) and added a check that there is at least one column to process (that is, firstColumnStatus should never be null)
- I am re-running the tests to make sure my understanding is correct
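The behavior described above has two parts. The reader keeps each file's full row count, and it fills any requested column that the file lacks with nulls. The following is a hypothetical sketch of that idea. Plain Java lists stand in for Drill value vectors, and `MissingColumnFiller` and `project` are illustrative names, not the Mock reader's actual code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: project the requested columns over one file, keeping
 * the file's row count and null-filling columns the file does not contain.
 */
final class MissingColumnFiller {
  private MissingColumnFiller() {}

  /**
   * @param requested   columns the query asks for, in output order
   * @param fileColumns columns actually present in the current file
   * @param rowCount    number of rows in the current file
   */
  static Map<String, List<Object>> project(List<String> requested,
                                           Map<String, List<Object>> fileColumns,
                                           int rowCount) {
    Map<String, List<Object>> out = new LinkedHashMap<>();
    for (String name : requested) {
      List<Object> values = fileColumns.get(name);
      if (values == null) {
        // Missing column: emit rowCount nulls so every column has the same row count.
        values = new ArrayList<>(Collections.<Object>nCopies(rowCount, null));
      } else if (values.size() != rowCount) {
        throw new IllegalStateException("column " + name + " has " + values.size() + " rows");
      }
      out.put(name, values);
    }
    return out;
  }
}
```

For example, projecting `["a", "c"]` over a 3-row file that only has `c` yields `a = [null, null, null]` alongside the real `c` values.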
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r199036653

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java ##
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * This class is tasked with managing all aspects of flat Parquet reader record batch sizing logic.
+ * Currently a record batch size is constrained with two parameters: Number of rows and Memory usage.
+ */
+public final class RecordBatchSizerManager {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class);
+  public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
+  /** Minimum column memory size */
+  private static final int MIN_COLUMN_MEMORY_SZ = VarLenColumnBulkInput.getMinVLColumnMemorySize();
+  /** Default memory per batch */
+  private static final int DEFAULT_MEMORY_SZ_PER_BATCH = 16 * 1024 * 1024;
+  /** Default records per batch */
+  private static final int DEFAULT_RECORDS_PER_BATCH = 32 * 1024 -1;

Review comment:
The default values were added before we agreed on a common operator configuration. I have removed them.
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r199036942

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java ##
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.record;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
+
+/**
+ * Utility class to capture key record batch statistics.
+ */
+public final class RecordBatchStats {
+  /** A prefix for all batch stats to simplify search */
+  public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
+  /** Helper class which loads contextual record batch logging options */
+  public static final class RecordBatchStatsContext {
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchStatsContext.class);
+
+    /** batch size logging for all readers */
+    private final boolean enableBatchSzLogging;
+    /** Fine grained batch size logging */
+    private final boolean enableFgBatchSzLogging;
+    /** Unique Operator Identifier */
+    private final String contextOperatorId;
+
+    /**
+     * @param options options manager
+     */
+    public RecordBatchStatsContext(FragmentContext context, OperatorContext oContext) {
+      enableBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION);
+      enableFgBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+      contextOperatorId = new StringBuilder()
+        .append(getQueryId(context))
+        .append(":")
+        .append(oContext.getStats().getId())
+        .toString();
+    }
+
+    /**
+     * @return the enableBatchSzLogging
+     */
+    public boolean isEnableBatchSzLogging() {
+      return enableBatchSzLogging || enableFgBatchSzLogging || logger.isDebugEnabled();
+    }
+
+    /**
+     * @return the enableFgBatchSzLogging
+     */
+    public boolean isEnableFgBatchSzLogging() {
+      return enableFgBatchSzLogging || logger.isDebugEnabled();
+    }
+
+    /**
+     * @return indicates whether stats messages should be logged in info or debug level
+     */
+    public boolean useInfoLevelLogging() {
+      return isEnableBatchSzLogging() && !logger.isDebugEnabled();

Review comment:
Please see the response above.
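The three methods in the snippet above form a small decision table over two option flags and the logger's debug state. The dependency-free restatement below makes those rules easy to check. It is hypothetical throughout: `BatchStatsLoggingFlags` is an illustrative name, the `debugEnabled` flag stands in for `logger.isDebugEnabled()`, and the link between the two flags and the `drill.exec.stats.logging.*` options is an assumption.

```java
/**
 * Hypothetical restatement of the RecordBatchStatsContext logging rules,
 * with plain booleans instead of Drill options and an SLF4J logger.
 */
final class BatchStatsLoggingFlags {
  private final boolean batchSizeOption;   // assumed: drill.exec.stats.logging.batch_size
  private final boolean fineGrainedOption; // assumed: drill.exec.stats.logging.fine_grained.batch_size
  private final boolean debugEnabled;      // stand-in for logger.isDebugEnabled()

  BatchStatsLoggingFlags(boolean batchSizeOption, boolean fineGrainedOption, boolean debugEnabled) {
    this.batchSizeOption = batchSizeOption;
    this.fineGrainedOption = fineGrainedOption;
    this.debugEnabled = debugEnabled;
  }

  /** Batch-level stats: either option turns them on, and so does debug logging. */
  boolean isEnableBatchSzLogging() {
    return batchSizeOption || fineGrainedOption || debugEnabled;
  }

  /** Fine-grained stats: only the fine-grained option or debug logging. */
  boolean isEnableFgBatchSzLogging() {
    return fineGrainedOption || debugEnabled;
  }

  /** INFO level when stats were enabled through options and debug logging is off. */
  boolean useInfoLevelLogging() {
    return isEnableBatchSzLogging() && !debugEnabled;
  }

  public static void main(String[] args) {
    boolean[] tf = {false, true};
    for (boolean batch : tf) {
      for (boolean fg : tf) {
        for (boolean debug : tf) {
          BatchStatsLoggingFlags f = new BatchStatsLoggingFlags(batch, fg, debug);
          System.out.printf("batch=%b fg=%b debug=%b -> enabled=%b fineGrained=%b info=%b%n",
              batch, fg, debug, f.isEnableBatchSzLogging(), f.isEnableFgBatchSzLogging(),
              f.useInfoLevelLogging());
        }
      }
    }
  }
}
```

Under these rules, the fine-grained option implies batch-level logging. Debug logging turns on both kinds of stats but routes them to the debug level rather than INFO.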
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r199031515

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java ##
@@ -160,11 +191,41 @@ public final void done() {
       // Page read position is meaningful only when dictionary mode is off
       if (pageInfo.dictionaryValueReader == null
-        || !pageInfo.dictionaryValueReader.isDefined()) {
+          || !pageInfo.dictionaryValueReader.isDefined()) {
         parentInst.pageReader.readyToReadPosInBytes = oprReadState.pageReadPos;
       }
       parentInst.pageReader.valuesRead = oprReadState.numPageFieldsProcessed;
-      parentInst.totalValuesRead += oprReadState.batchFieldIndex;
+      parentInst.totalValuesRead += oprReadState.batchNumValuesReadFromPages;
+
+      if (logger.isDebugEnabled()) {
+        String message = String.format("requested=%d, returned=%d, total-returned=%d",

Review comment:
I prefer the String.format(...) API because it has superior type checking and formatting capabilities.
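In the snippet above, the `String.format` call sits inside an `isDebugEnabled()` guard, so the formatting cost is only paid when debug output is on. The sketch below shows the same guard pattern. It uses `java.util.logging` (where `FINE` is roughly debug) in place of the SLF4J logger Drill uses. `GuardedFormatLogging` and its methods are illustrative names, and the message format is copied from the diff.

```java
import java.util.IllegalFormatConversionException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical sketch of guarded String.format logging with java.util.logging.
 */
final class GuardedFormatLogging {
  private static final Logger LOGGER = Logger.getLogger(GuardedFormatLogging.class.getName());

  private GuardedFormatLogging() {}

  /** Builds a message in the same format as the snippet above. */
  static String batchMessage(int requested, int returned, long totalReturned) {
    return String.format("requested=%d, returned=%d, total-returned=%d",
        requested, returned, totalReturned);
  }

  static void logBatch(int requested, int returned, long totalReturned) {
    // Guard first: String.format only runs when FINE output is enabled.
    if (LOGGER.isLoggable(Level.FINE)) {
      LOGGER.fine(batchMessage(requested, returned, totalReturned));
    }
  }

  public static void main(String[] args) {
    System.out.println(batchMessage(4096, 1024, 9216));
    logBatch(4096, 1024, 9216); // no output with the default JDK logging configuration (INFO)
    try {
      String.format("returned=%d", "1024"); // %d with a String argument
    } catch (IllegalFormatConversionException e) {
      // Format specifiers are checked against argument types when the call runs.
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Note that the `%d`-versus-argument check happens when the format call runs, not at compile time. So a mismatch inside a debug-only branch only surfaces when debug logging is actually enabled.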
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r199037140

## File path: exec/java-exec/src/main/resources/drill-module.conf ##
@@ -482,6 +482,8 @@ drill.exec.options: {
     exec.storage.enable_new_text_reader: true,
     exec.udf.enable_dynamic_support: true,
     exec.udf.use_dynamic: true,
+    drill.exec.stats.logging.batch_size: false,
+    drill.exec.stats.logging.fine_grained.batch_size: false,

Review comment:
Please refer to the response above.
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198937973

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ##
@@ -90,22 +91,161 @@ public long readFields(long recordsToReadInThisPass) throws IOException {
       recordsReadInCurrentPass = readRecordsInBulk((int) recordsToReadInThisPass);
     }
+    // Publish this information
+    parentReader.readState.setValuesReadInCurrentPass((int) recordsReadInCurrentPass);
+
+    // Update the stats
     parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
     return recordsReadInCurrentPass;
   }
   private int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {
-    int recordsReadInCurrentPass = -1;
+    int batchNumRecords = recordsToReadInThisPass;
+    List<VarLenColumnBatchStats> columnStats = new ArrayList<VarLenColumnBatchStats>(columns.size());
+    int prevReadColumns = -1;
+    boolean overflowCondition = false;
+
+    for (VLColumnContainer columnContainer : orderedColumns) {
+      VarLengthColumn columnReader = columnContainer.column;
+
+      // Read the column data
+      int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
+      assert readColumns <= batchNumRecords : "Reader cannot return more values than requested..";
+
+      if (!overflowCondition) {
+        if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
+          overflowCondition = true;
+        } else {
+          prevReadColumns = readColumns;
+        }
+      }
+
+      // Enqueue this column entry information to handle overflow conditions; we will not know
+      // whether an overflow happened till all variable length columns have been processed
+      columnStats.add(new VarLenColumnBatchStats(columnReader.valueVec, readColumns));
+      // Decrease the number of records to read when a column returns less records (minimize overflow)
+      if (batchNumRecords > readColumns) {
+        batchNumRecords = readColumns;
+        // it seems this column caused an overflow (higher layer will not ask for more values than remaining)
+        ++columnContainer.numCausedOverflows;
+      }
+    }
+
+    // Set the value-count for each column
     for (VarLengthColumn columnReader : columns) {
-      int readColumns = columnReader.readRecordsInBulk(recordsToReadInThisPass);
-      assert (readColumns >= 0 && recordsReadInCurrentPass == readColumns || recordsReadInCurrentPass == -1);
+      columnReader.valuesReadInCurrentPass = batchNumRecords;
+    }
+
+    // Publish this batch statistics
+    publishBatchStats(columnStats, batchNumRecords);
-      recordsReadInCurrentPass = readColumns;
+    // Handle column(s) overflow if any
+    if (overflowCondition) {
+      handleColumnOverflow(columnStats, batchNumRecords);
     }
-    return recordsReadInCurrentPass;
+    return batchNumRecords;
+  }
+
+  private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int batchNumRecords) {
+    // Overflow would happen if a column returned more values than "batchValueCount"; this can happen
+    // when a column Ci is called first, returns num-values-i, and then another column cj is called which
+    // returns less values than num-values-i.
+    RecordBatchOverflow.Builder builder = null;
+
+    // We need to collect all columns which are subject to an overflow (except for the ones which are already
+    // returning values from previous batch overflow)
+    for (VarLenColumnBatchStats columnStat : columnStats) {
+      if (columnStat.numValuesRead > batchNumRecords) {
+        // We need to figure out whether this column was already returning values from a previous batch
+        // overflow; if it is, then this is a NOOP (as the overflow data is still available to be replayed)
+        if (fieldHasAlreadyOverflowData(columnStat.vector.getField().getName())) {

Review comment:
Sure!
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198941031

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java ##
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVarDecimalVector;
+import org.apache.drill.exec.vector.UInt1Vector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VarDecimalVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+
+/** Helper class to assist the Flat Parquet reader build batches which adhere to memory sizing constraints */
+public final class BatchSizingMemoryUtil {
+
+  /** BYTE in-memory width */
+  public static final int BYTE_VALUE_WIDTH = UInt1Vector.VALUE_WIDTH;
+  /** INT in-memory width */
+  public static final int INT_VALUE_WIDTH = UInt4Vector.VALUE_WIDTH;
+  /** Default variable length column average precision;
+   * computed in such a way that 64k values will fit within one MB to minimize internal fragmentation
+   */
+  public static final int DEFAULT_VL_COLUMN_AVG_PRECISION = 16;
+
+  /**
+   * This method will also load detailed information about this column's current memory usage (with regard
+   * to the value vectors).
+   *
+   * @param columnMemoryUsage container which contains column's memory usage information (usage information will
+   *        be automatically updated by this method)
+   * @param newBitsMemory New nullable data which might be inserted when processing a new input chunk
+   * @param newOffsetsMemory New offsets data which might be inserted when processing a new input chunk
+   * @param newDataMemory New data which might be inserted when processing a new input chunk
+   *
+   * @return true if adding the new data will not lead this column's Value Vector go beyond the allowed
+   *         limit; false otherwise
+   */
+  public static boolean canAddNewData(ColumnMemoryUsageInfo columnMemoryUsage,
+    int newBitsMemory,
+    int newOffsetsMemory,
+    int newDataMemory) {
+
+    // First we need to update the vector memory usage
+    final VectorMemoryUsageInfo vectorMemoryUsage = columnMemoryUsage.vectorMemoryUsage;
+    getMemoryUsage(columnMemoryUsage.vector, columnMemoryUsage.currValueCount, vectorMemoryUsage);
+
+    // We need to compute the new ValueVector memory usage if we attempt to add the new payload
+    // usedCapacity, int newPayload, int currentCapacity
+    int totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed,
+      newBitsMemory,
+      vectorMemoryUsage.bitsBytesCapacity);
+
+    int totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed,
+      newOffsetsMemory,
+      vectorMemoryUsage.offsetsByteCapacity);
+
+    int totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed,
+      newDataMemory,
+      vectorMemoryUsage.dataByteCapacity);
+
+    // Alright now we can figure out whether the new payload will take us over the maximum memory threshold
+    int totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory;
+    assert totalMemory >= 0;
+
+    return totalMemory <= columnMemoryUsage.memoryQuota.getMaxMemoryUsage();
+  }
+
+  /**
+   * Load memory usage information for a variable length value vector
+   *
+   * @param vector
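`canAddNewData` above sums the post-growth capacity of three buffers (null bits, offsets, data) and compares the total against the column's memory quota. The body of `computeNewVectorCapacity` is not shown, so the sketch below assumes a doubling rule: the capacity stays unchanged if the new payload fits, otherwise it rounds up to the next power of two. `VectorQuotaCheck` and its flattened parameter list are hypothetical names for illustration only.

```java
/**
 * Hypothetical sketch of a per-column memory quota check. The doubling rule
 * in computeNewVectorCapacity is an assumption, not Drill's actual code.
 */
final class VectorQuotaCheck {
  private VectorQuotaCheck() {}

  /**
   * Capacity a buffer would need after appending newPayload bytes; stays at
   * currentCapacity when the payload fits, else the next power of two
   * (valid for totals up to 2^30).
   */
  static int computeNewVectorCapacity(int usedBytes, int newPayload, int currentCapacity) {
    int required = usedBytes + newPayload;
    if (required <= currentCapacity) {
      return currentCapacity;                         // fits: no reallocation needed
    }
    if (required <= 1) {
      return 1;
    }
    return Integer.highestOneBit(required - 1) << 1;  // next power of two >= required
  }

  /** True if the bits, offsets, and data buffers together stay within maxMemory. */
  static boolean canAddNewData(int bitsUsed, int bitsCapacity, int newBits,
                               int offsetsUsed, int offsetsCapacity, int newOffsets,
                               int dataUsed, int dataCapacity, int newData,
                               int maxMemory) {
    int total = computeNewVectorCapacity(bitsUsed, newBits, bitsCapacity)
        + computeNewVectorCapacity(offsetsUsed, newOffsets, offsetsCapacity)
        + computeNewVectorCapacity(dataUsed, newData, dataCapacity);
    return total <= maxMemory;
  }

  public static void main(String[] args) {
    // Example chunk of 64 values: 64 one-byte null flags, 65 four-byte offsets,
    // 1000 data bytes -> capacities 64 + 512 + 1024 = 1600 bytes.
    System.out.println(canAddNewData(0, 0, 64, 0, 0, 65 * 4, 0, 0, 1000, 2000)); // true
    System.out.println(canAddNewData(0, 0, 64, 0, 0, 65 * 4, 0, 0, 1000, 1500)); // false
    // The DEFAULT_VL_COLUMN_AVG_PRECISION comment: 64K values x 16 bytes = 1 MiB.
    System.out.println((64 * 1024 * 16) == (1 << 20)); // true
  }
}
```

Checking post-growth capacity rather than bytes used matters under doubling. A small payload that crosses a power-of-two boundary can double a buffer's footprint, and that is exactly what the quota must anticipate.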
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198933150

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ##
@@ -149,6 +149,8 @@
       new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR),

Review comment:
Thanks @ilooner for this info; that was my goal, but I didn't think the logistics were in place.
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198937814

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ##
@@ -90,22 +91,161 @@ public long readFields(long recordsToReadInThisPass) throws IOException {
       recordsReadInCurrentPass = readRecordsInBulk((int) recordsToReadInThisPass);
     }
+    // Publish this information
+    parentReader.readState.setValuesReadInCurrentPass((int) recordsReadInCurrentPass);
+
+    // Update the stats
     parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
     return recordsReadInCurrentPass;
   }
   private int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {
-    int recordsReadInCurrentPass = -1;
+    int batchNumRecords = recordsToReadInThisPass;
+    List<VarLenColumnBatchStats> columnStats = new ArrayList<VarLenColumnBatchStats>(columns.size());
+    int prevReadColumns = -1;
+    boolean overflowCondition = false;
+
+    for (VLColumnContainer columnContainer : orderedColumns) {
+      VarLengthColumn columnReader = columnContainer.column;
+
+      // Read the column data
+      int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
+      assert readColumns <= batchNumRecords : "Reader cannot return more values than requested..";
+
+      if (!overflowCondition) {
+        if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
+          overflowCondition = true;
+        } else {
+          prevReadColumns = readColumns;
+        }
+      }
+
+      // Enqueue this column entry information to handle overflow conditions; we will not know
+      // whether an overflow happened till all variable length columns have been processed
+      columnStats.add(new VarLenColumnBatchStats(columnReader.valueVec, readColumns));
+      // Decrease the number of records to read when a column returns less records (minimize overflow)
+      if (batchNumRecords > readColumns) {
+        batchNumRecords = readColumns;
+        // it seems this column caused an overflow (higher layer will not ask for more values than remaining)
+        ++columnContainer.numCausedOverflows;

Review comment:
No, this is not the case:
- For the last entry, we will have the following state: overflowCondition = false, prevReadColumns = 200, readColumns = 100
- Since prevReadColumns >= 0 and prevReadColumns != readColumns, overflowCondition will be changed to true

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
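To make the reply concrete, here is a minimal, self-contained Java model of the detection loop quoted above. It is a sketch and not the Drill implementation. The class `OverflowDetectionSketch`, its `detect` method and the `Result` holder are hypothetical. Each column reader is reduced to an assumed "capacity", meaning the most values it can return before hitting its limit. In the scenario from the reply, two columns return 200 values and the last returns 100. The model flags an overflow for that case and shrinks the batch to 100 records.

```java
/** Hypothetical model of the overflow-detection loop in readRecordsInBulk(). */
final class OverflowDetectionSketch {

  /** Outcome of one simulated pass over the variable-length columns. */
  static final class Result {
    final boolean overflowCondition;
    final int batchNumRecords;

    Result(boolean overflowCondition, int batchNumRecords) {
      this.overflowCondition = overflowCondition;
      this.batchNumRecords = batchNumRecords;
    }
  }

  /**
   * @param recordsToRead  records requested for this pass
   * @param columnCapacity assumed number of values each column can return, in column order
   */
  static Result detect(int recordsToRead, int[] columnCapacity) {
    int batchNumRecords = recordsToRead;
    int prevReadColumns = -1;
    boolean overflowCondition = false;

    for (int capacity : columnCapacity) {
      // A reader never returns more values than it was asked for
      int readColumns = Math.min(capacity, batchNumRecords);

      if (!overflowCondition) {
        if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
          overflowCondition = true; // this column disagrees with the ones before it
        } else {
          prevReadColumns = readColumns;
        }
      }

      // Ask later columns for fewer records to minimize overflow
      if (batchNumRecords > readColumns) {
        batchNumRecords = readColumns;
      }
    }
    return new Result(overflowCondition, batchNumRecords);
  }
}
```

Suppose the first column is the one that comes up short, for example with capacities `{100, 200, 200}`. Later columns are then only asked for 100 values, so every column agrees and no overflow is flagged. This is the purpose of decreasing `batchNumRecords` as the loop advances.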
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198942743

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java ##
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import io.netty.buffer.DrillBuf;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition;
+import org.apache.drill.exec.vector.UInt1Vector;
+import org.apache.drill.exec.vector.UInt4Vector;
+
+/**
+ * Field overflow SERDE utility; note that overflow data is serialized as a way to minimize
+ * memory usage. This information is deserialized back to ValueVectors when it is needed in
+ * the next batch.
+ *
+ * NOTE -We use a specialized implementation for overflow SERDE (instead of reusing
+ * existing ones) because of the following reasons:
+ *
+ * We want to only serialize a subset of the VV data
+ * Other SERDE methods will not copy the data contiguously and instead rely on the
+ * RPC layer to write the drill buffers in the correct order so that they are
+ * de-serialized as a single contiguous drill buffer
+ *
+ */
+final class OverflowSerDeUtil {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class);
+
+  /**
+   * Serializes a collection of overflow fields into a memory buffer:
+   *
+   * Serialization logic can handle a subset of values (should be contiguous)
+   * Serialized data is copied into a single DrillBuf
+   * Currently, only variable length data is supported
+   *
+   *
+   * @param fieldOverflowEntries input collection of field overflow entries
+   * @param allocator buffer allocator
+   * @return record overflow container; null if the input buffer is empty
+   */
+  static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries,
+    BufferAllocator allocator) {
+
+    if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) {
+      return null;
+    }
+
+    // We need to:
+    // - Construct a map of VLVectorSerDe for each overflow field
+    // - Compute the total space required for efficient serialization of all overflow data
+    final Map<String, VLVectorSerializer> fieldSerDeMap = CaseInsensitiveMap.newHashMap();
+    int bufferLength = 0;
+
+    for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) {
+      final VLVectorSerializer fieldVLSerDe = new VLVectorSerializer(fieldOverflowEntry);
+      fieldSerDeMap.put(fieldOverflowEntry.vector.getField().getName(), fieldVLSerDe);
+
+      bufferLength += fieldVLSerDe.getBytesUsed(fieldOverflowEntry.firstValueIdx, fieldOverflowEntry.numValues);
+    }
+    assert bufferLength >= 0;
+
+    // Allocate the required memory to serialize the overflow fields
+    final DrillBuf buffer = allocator.buffer(bufferLength);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(String.format("Allocated a buffer of length %d to handle overflow", bufferLength));
+    }
+
+    // Create the result object
+    final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer();
+    final RecordOverflowDefinition recordOverflowDef = recordOverflowContainer.recordOverflowDef;
+
+    // Now serialize field overflow into the drill buffer
+    int bufferOffset = 0;
+    FieldSerializerContainer fieldSerializerContainer = new FieldSerializerContainer();
+
+    for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) {
+      fieldSerializerContainer.clear();
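The `serialize()` method above works in two passes. It first sums the bytes each overflow field needs, then allocates one buffer and copies every field into it back to back. The sketch below models that idea for a single variable-length column, using plain `java.nio.ByteBuffer` instead of `DrillBuf`. Everything in it is an assumption for illustration. The hypothetical class `OverflowSerDeSketch` represents a column as an offsets array (one more entry than values) plus a data byte array. The serialized layout (rebased offsets followed by the value bytes) is also invented here and is not necessarily what `VLVectorSerializer` produces.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: copy a contiguous subset of a variable-length column into one buffer. */
final class OverflowSerDeSketch {

  /** Bytes needed for values [firstIdx, firstIdx + numValues): rebased offsets plus data. */
  static int getBytesUsed(int[] offsets, int firstIdx, int numValues) {
    int dataBytes = offsets[firstIdx + numValues] - offsets[firstIdx];
    return (numValues + 1) * Integer.BYTES + dataBytes;
  }

  static ByteBuffer serialize(int[] offsets, byte[] data, int firstIdx, int numValues) {
    // Size the buffer up front so the copy is one contiguous write
    ByteBuffer buffer = ByteBuffer.allocate(getBytesUsed(offsets, firstIdx, numValues));
    int base = offsets[firstIdx];
    // Rebase offsets so the first serialized value starts at 0
    for (int i = 0; i <= numValues; i++) {
      buffer.putInt(offsets[firstIdx + i] - base);
    }
    // Copy only the data bytes that belong to the selected values
    buffer.put(data, base, offsets[firstIdx + numValues] - base);
    buffer.flip();
    return buffer;
  }

  /** Reads value i back from a buffer produced by serialize(). */
  static String valueAt(ByteBuffer buffer, int numValues, int i) {
    int dataStart = (numValues + 1) * Integer.BYTES;
    int start = buffer.getInt(i * Integer.BYTES);
    int end = buffer.getInt((i + 1) * Integer.BYTES);
    byte[] bytes = new byte[end - start];
    for (int k = 0; k < bytes.length; k++) {
      bytes[k] = buffer.get(dataStart + start + k);
    }
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```

Rebasing the offsets so that the first copied value starts at 0 is what allows only the overflow subset to be kept, rather than the whole vector.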
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r199029854

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ##
@@ -149,6 +149,8 @@
       new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR),

Review comment:
Done.
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r199036789

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java ##
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * This class is tasked with managing all aspects of flat Parquet reader record batch sizing logic.
+ * Currently a record batch size is constrained with two parameters: Number of rows and Memory usage.
+ */
+public final class RecordBatchSizerManager {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class);
+  public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
+  /** Minimum column memory size */
+  private static final int MIN_COLUMN_MEMORY_SZ = VarLenColumnBulkInput.getMinVLColumnMemorySize();
+  /** Default memory per batch */
+  private static final int DEFAULT_MEMORY_SZ_PER_BATCH = 16 * 1024 * 1024;
+  /** Default records per batch */
+  private static final int DEFAULT_RECORDS_PER_BATCH = 32 * 1024 -1;
+
+  /** Parquet schema object */
+  private final ParquetSchema schema;
+  /** Total records to read */
+  private final long totalRecordsToRead;
+  /** Logic to minimize overflow occurrences */
+  private final BatchOverflowOptimizer overflowOptimizer;
+
+  /** Configured Parquet records per batch */
+  private final int configRecordsPerBatch;
+  /** Configured Parquet memory size per batch */
+  private final int configMemorySizePerBatch;
+  /** An upper bound on the Parquet records per batch based on the configured value and schema */
+  private int maxRecordsPerBatch;
+  /** An upper bound on the Parquet memory size per batch based on the configured value and schema */
+  private int maxMemorySizePerBatch;
+  /** The current number of records per batch as it can be dynamically optimized */
+  private int recordsPerBatch;
+
+  /** List of fixed columns */
+  private final List fixedLengthColumns = new ArrayList();
+  /** List of variable columns */
+  private final List variableLengthColumns = new ArrayList();
+  /** Field to column memory information map */
+  private final Map columnMemoryInfoMap = CaseInsensitiveMap.newHashMap();
+  /** Indicator invoked when column(s) precision change */
+  private boolean columnPrecisionChanged;
+
+  /**
+   * Field overflow map; this information is stored within this class for two reasons:
+   * a) centralization to simplify resource deallocation (overflow data is backed by Direct Memory)
+   * b) overflow is a result of batch constraints enforcement which this class manages the overflow logic
+   */
+  private Map fieldOverflowMap = CaseInsensitiveMap.newHashMap();
+
+  /**
+   * Constructor.
+   *
+   * @param options drill options
+   * @param schema current reader schema
+   * @param totalRecordsToRead total number of rows to read
+   */
+  public RecordBatchSizerManager(OptionManager options,
+    ParquetSchema schema,
+    long totalRecordsToRead) {
+
+    this.schema = schema;
+    this.totalRecordsToRead = totalRecordsToRead;
+    this.configRecordsPerBatch =
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r199031792

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java ##
@@ -311,6 +372,113 @@ private void loadPageIfNeeed() throws IOException {
     }
   }
+  private boolean batchConstraintsReached() {
+    // Let's update this column's memory quota
+    columnMemoryQuota = batchSizerMgr.getCurrentFieldBatchMemory(parentInst.valueVec.getField().getName());
+    assert columnMemoryQuota.getMaxMemoryUsage() > 0;
+
+    // Now try to figure out whether the next chunk will take us beyond the memory quota
+    final int maxNumRecordsInChunk = VarLenBulkPageReader.BUFF_SZ / BatchSizingMemoryUtil.INT_VALUE_WIDTH;

Review comment:
This is related to the Bulk Entry containing a maximum of 1024 values: a 4k buffer (BUFF_SZ) divided by the 4-byte int width (INT_VALUE_WIDTH).
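The reply rests on simple arithmetic: a 4 KB `BUFF_SZ` divided by a 4-byte `INT_VALUE_WIDTH` gives 1024 values per bulk entry. With that bound, the method can ask whether one more full entry would still fit in the column's quota. The sketch below is a hypothetical simplification of that check, not the method's actual body, which is cut off in the quoted hunk. The class `ChunkQuotaSketch` and the method `nextChunkExceedsQuota` are illustrative names. The sketch assumes a known upper bound on each value's data bytes, and it charges every value one 4-byte offset in addition to its data.

```java
/** Hypothetical sketch of a "will the next bulk entry fit in this column's quota?" check. */
final class ChunkQuotaSketch {
  /** Bulk buffer size assumed from the review comment: 4 KB. */
  static final int BUFF_SZ = 4 * 1024;
  /** Width of an int in bytes. */
  static final int INT_VALUE_WIDTH = 4;

  /** At most 4096 / 4 = 1024 values per bulk entry. */
  static int maxNumRecordsInChunk() {
    return BUFF_SZ / INT_VALUE_WIDTH;
  }

  /**
   * @param memoryUsed       bytes this column has already used in the current batch
   * @param maxMemoryUsage   this column's memory quota for the batch
   * @param maxBytesPerValue assumed upper bound on one value's data bytes
   * @return true if one more full bulk entry could push the column past its quota
   */
  static boolean nextChunkExceedsQuota(long memoryUsed, long maxMemoryUsage, int maxBytesPerValue) {
    // Worst case for the next entry: every value at its maximum size plus a 4-byte offset each
    long nextChunkBytes = (long) maxNumRecordsInChunk() * (maxBytesPerValue + INT_VALUE_WIDTH);
    return memoryUsed + nextChunkBytes > maxMemoryUsage;
  }
}
```

Checking a whole entry's worst case before loading it avoids discovering mid-entry that the quota is exhausted. The trade-off is that a batch may stop somewhat short of its quota.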
[GitHub] sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198932595

## File path: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ##
@@ -680,4 +687,13 @@ public static String bootDefaultFor(String name) {
   public static final String ALLOW_LOOPBACK_ADDRESS_BINDING = "drill.exec.allow_loopback_address_binding";
+  /** Enables batch size statistics logging */
+  public static final String STATS_LOGGING_BATCH_SIZE_OPTION = "drill.exec.stats.logging.batch_size";
+  public static final BooleanValidator STATS_LOGGING_BATCH_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_BATCH_SIZE_OPTION);
+
+  /** Enables fine-grained batch size statistics logging */
+  public static final String STATS_LOGGING_FG_BATCH_SIZE_OPTION = "drill.exec.stats.logging.fine_grained.batch_size";
+  public static final BooleanValidator STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_FG_BATCH_SIZE_OPTION);

Review comment:
@ilooner and I had an offline chat where I explained what this commit was trying to achieve:
- Centralize batch statistics logging behavior
- All batch statistics should be logged when the debug level is set
- Users should be able to control batch statistics behavior within a client session: a) which operators' stats to log and b) coarse- vs. fine-grained stats

NOTE: Currently, Drill logging is static; a server restart is required to modify the logging behavior.
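The requirements in this reply can be read as a decision rule, sketched below. The sketch is hypothetical. The enum `Level` and the method `resolve` are illustrative. The two boolean parameters stand in for the session values of the `drill.exec.stats.logging.batch_size` and `drill.exec.stats.logging.fine_grained.batch_size` options from the diff. Treating fine-grained logging as a refinement of enabled batch-size logging is an assumption, and per-operator selection is not modeled.

```java
/** Hypothetical sketch of the batch-statistics logging policy described in the reply. */
final class BatchStatsLoggingSketch {
  enum Level { NONE, COARSE, FINE_GRAINED }

  /**
   * @param debugEnabled     whether the logger is at debug level (server-wide; changing it needs a restart)
   * @param batchSizeStats   session value standing in for drill.exec.stats.logging.batch_size
   * @param fineGrainedStats session value standing in for drill.exec.stats.logging.fine_grained.batch_size
   */
  static Level resolve(boolean debugEnabled, boolean batchSizeStats, boolean fineGrainedStats) {
    if (debugEnabled) {
      return Level.FINE_GRAINED; // debug level: log all batch statistics
    }
    if (!batchSizeStats) {
      return Level.NONE; // the session has not asked for batch statistics
    }
    // Assumption: fine-grained logging only refines already enabled batch-size logging
    return fineGrainedStats ? Level.FINE_GRAINED : Level.COARSE;
  }
}
```

The session options make batch statistics adjustable per client without touching the static, server-wide logger configuration. The debug level remains a catch-all.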