[GitHub] drill pull request #1227: Drill-6236: batch sizing for hash join
Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r183181073

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -560,6 +554,40 @@ public void close() {
         super.close();
       }

    +  @Override
    +  protected void updateBatchMemoryManagerStats() {
    +    stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
    --- End diff --

    Should be fine.
---
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r183171726

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -560,6 +554,40 @@ public void close() {
         super.close();
       }

    +  @Override
    +  protected void updateBatchMemoryManagerStats() {
    +    stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
    --- End diff --

    @sohami I will create a JIRA and address that in a separate PR. For now, I would like to override this method. Is that OK?
---
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r183169484

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -560,6 +554,40 @@ public void close() {
         super.close();
       }

    +  @Override
    +  protected void updateBatchMemoryManagerStats() {
    +    stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
    --- End diff --

    @ppadma - The main motive for moving the metrics inside `JoinBatchMemoryManager` was to avoid duplicating their definition in every BinaryRecordBatch. I think we should improve `OperatorMetricRegistry` rather than just overriding this method. Or, if you prefer, you can create a new JIRA to track metrics-related improvements.
---
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r183145171

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -147,7 +150,19 @@
         NUM_BUCKETS,
         NUM_ENTRIES,
         NUM_RESIZING,
    -    RESIZING_TIME_MS;
    +    RESIZING_TIME_MS,
    +    LEFT_INPUT_BATCH_COUNT,
    +    LEFT_AVG_INPUT_BATCH_BYTES,
    +    LEFT_AVG_INPUT_ROW_BYTES,
    +    LEFT_INPUT_RECORD_COUNT,
    +    RIGHT_INPUT_BATCH_COUNT,
    +    RIGHT_AVG_INPUT_BATCH_BYTES,
    +    RIGHT_AVG_INPUT_ROW_BYTES,
    +    RIGHT_INPUT_RECORD_COUNT,
    +    OUTPUT_BATCH_COUNT,
    +    AVG_OUTPUT_BATCH_BYTES,
    +    AVG_OUTPUT_ROW_BYTES,
    +    OUTPUT_RECORD_COUNT;
    --- End diff --

    Putting these metrics inside the operator's `Metric` class will not work. For joins, these metrics were moved into the `JoinBatchMemoryManager.Metric` class because they are memory manager metrics. So a call to `updateBatchMemoryManagerStats()` updates the operator stats, but using ordinals from the `JoinBatchMemoryManager.Metric` class. The ordinal used for LEFT_INPUT_BATCH_COUNT will therefore be 0, not 4 (which is what this enum requires). I think we should improve `OperatorMetricRegistry` so that multiple Metric classes can be registered for an operator.
---
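The ordinal clash described in this comment can be reproduced in isolation. The sketch below is not Drill code: `MetricDef`, `OperatorMetric`, `MemoryManagerMetric`, and `record` are hypothetical stand-ins, and it assumes that stats are stored purely by numeric metric id, as the comment implies.

```java
// Hypothetical stand-in for a metric definition; not Drill's actual interface.
interface MetricDef {
  int metricId();
}

class OrdinalClashDemo {
  // Operator-level metrics, extended the way the diff under review does it.
  enum OperatorMetric implements MetricDef {
    NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, RESIZING_TIME_MS,
    LEFT_INPUT_BATCH_COUNT;

    @Override
    public int metricId() { return ordinal(); }
  }

  // Memory-manager metrics, defined separately, so ordinals restart at 0.
  enum MemoryManagerMetric implements MetricDef {
    LEFT_INPUT_BATCH_COUNT, LEFT_AVG_INPUT_BATCH_BYTES;

    @Override
    public int metricId() { return ordinal(); }
  }

  // Assumption: the stats holder only sees the numeric id, never the enum type.
  static long[] record(MetricDef metric, long value, int slots) {
    long[] stats = new long[slots];
    stats[metric.metricId()] = value;
    return stats;
  }

  public static void main(String[] args) {
    // Same constant name, different ids: the value lands in the NUM_BUCKETS slot.
    System.out.println(OperatorMetric.LEFT_INPUT_BATCH_COUNT.metricId());
    System.out.println(MemoryManagerMetric.LEFT_INPUT_BATCH_COUNT.metricId());
    long[] stats = record(MemoryManagerMetric.LEFT_INPUT_BATCH_COUNT, 7L, OperatorMetric.values().length);
    System.out.println(java.util.Arrays.toString(stats));
  }
}
```

Writing through the memory-manager enum fills slot 0, which the operator enum reads as NUM_BUCKETS. Overriding the update method in the operator sidesteps this by always using the operator's own enum; registering several metric classes per operator, as suggested, would address it more generally.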
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r183145138

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -157,14 +172,20 @@ public int metricId() {
         }
       }

    +  public class HashJoinMemoryManager extends JoinBatchMemoryManager {
    --- End diff --

    Not required; you can use `JoinBatchMemoryManager` directly.
---
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r183112258

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -147,7 +150,19 @@
         NUM_BUCKETS,
         NUM_ENTRIES,
         NUM_RESIZING,
    -    RESIZING_TIME_MS;
    +    RESIZING_TIME_MS,
    +    LEFT_INPUT_BATCH_COUNT,
    +    LEFT_AVG_INPUT_BATCH_BYTES,
    +    LEFT_AVG_INPUT_ROW_BYTES,
    +    LEFT_INPUT_RECORD_COUNT,
    +    RIGHT_INPUT_BATCH_COUNT,
    +    RIGHT_AVG_INPUT_BATCH_BYTES,
    +    RIGHT_AVG_INPUT_ROW_BYTES,
    +    RIGHT_INPUT_RECORD_COUNT,
    +    OUTPUT_BATCH_COUNT,
    +    AVG_OUTPUT_BATCH_BYTES,
    +    AVG_OUTPUT_ROW_BYTES,
    +    OUTPUT_RECORD_COUNT;
    --- End diff --

    They are relevant in the sense that they give a high-level picture of the amount of data being processed, the memory usage, etc. of each operator. They are also helpful when debugging and trying to figure out what is going on.
---
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r183108078

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -300,13 +322,14 @@ public void setupHashTable() throws IOException, SchemaChangeException, ClassTra
       public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
         //Setup the underlying hash table
    -
         // skip first batch if count is zero, as it may be an empty schema batch
         if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) {
           for (final VectorWrapper w : right) {
             w.clear();
           }
           rightUpstream = next(right);
    +      // For build side, use aggregate i.e. average row width across batches
    +      batchMemoryManager.update(RIGHT_INDEX, 0,true);
    --- End diff --

    There is a call to "next" right above the update.
---
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r182929294

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java ---
    @@ -188,12 +196,18 @@ public int getOutgoingRowWidth() {
       public void setRecordBatchSizer(int index, RecordBatchSizer sizer) {
         Preconditions.checkArgument(index >= 0 && index < numInputs);
         this.sizer[index] = sizer;
    -    inputBatchStats[index] = new BatchStats();
    +    if (inputBatchStats[index] == null) {
    +      inputBatchStats[index] = new BatchStats();
    +    }
    +    updateIncomingStats(index);
       }

       public void setRecordBatchSizer(RecordBatchSizer sizer) {
    --- End diff --

    This can instead just call the method above with DEFAULT_INPUT_INDEX as the first parameter.
---
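The delegation suggested in this comment could look roughly like the following. This is a self-contained sketch rather than the Drill class: `SizerRegistry` and `statsUpdates` are hypothetical, the sizer is modelled as a plain `Object`, and the value 0 for `DEFAULT_INPUT_INDEX` is an assumption.

```java
// Hypothetical, simplified stand-in for the memory manager's sizer bookkeeping.
class SizerRegistry {
  static final int DEFAULT_INPUT_INDEX = 0; // assumed value, for illustration only

  private final Object[] sizer;
  private final int[] statsUpdates; // counts how often each input's stats were refreshed

  SizerRegistry(int numInputs) {
    this.sizer = new Object[numInputs];
    this.statsUpdates = new int[numInputs];
  }

  // Indexed variant: validates the index, stores the sizer, refreshes the stats.
  void setRecordBatchSizer(int index, Object newSizer) {
    if (index < 0 || index >= sizer.length) {
      throw new IllegalArgumentException("bad input index: " + index);
    }
    sizer[index] = newSizer;
    statsUpdates[index]++;
  }

  // Single-input variant: delegates, so validation and stats logic live in one place.
  void setRecordBatchSizer(Object newSizer) {
    setRecordBatchSizer(DEFAULT_INPUT_INDEX, newSizer);
  }

  Object getSizer(int index) { return sizer[index]; }

  int getStatsUpdates(int index) { return statsUpdates[index]; }
}
```

With this shape, any later change to the indexed method (such as the null check on the stats object added in the diff) applies to the single-input path automatically.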
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r182929487

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java ---
    @@ -147,6 +149,12 @@ public int update(int inputIndex, int outputPosition) {
         return getOutputRowCount();
       }

    +  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
    +    // by default just return the outputRowCount
    +    return update(inputIndex, outputPosition, false);
    --- End diff --

    Is this an infinitely recursive call?
---
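As written in the diff, the three-argument `update` calls the three-argument `update` again, so it never returns. One possible fix, shown here as a sketch on a hypothetical `BaseMemoryManager` class (the thread does not say which fix was actually adopted), is to delegate to the two-argument overload and ignore `useAggregate` in the base class.

```java
// Hypothetical, simplified base class; field and method bodies are illustrative.
class BaseMemoryManager {
  private final int outputRowCount;
  private int lastInputIndex = -1;

  BaseMemoryManager(int outputRowCount) {
    this.outputRowCount = outputRowCount;
  }

  int getOutputRowCount() { return outputRowCount; }

  int getLastInputIndex() { return lastInputIndex; }

  // Two-argument overload: does the real work in this sketch.
  int update(int inputIndex, int outputPosition) {
    lastInputIndex = inputIndex;
    return getOutputRowCount();
  }

  // Three-argument overload: the base class has no aggregate logic, so it
  // delegates to the TWO-argument overload. Passing a third argument here
  // would resolve back to this same method and recurse without end.
  int update(int inputIndex, int outputPosition, boolean useAggregate) {
    return update(inputIndex, outputPosition);
  }
}
```

A subclass that does support aggregation can then override only the three-argument method.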
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r182910955

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -346,6 +369,7 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio
           }
           // Fall through
         case OK:
    +      batchMemoryManager.update(LEFT_INDEX);
    --- End diff --

    Should this be RIGHT_INDEX here?
---
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r182911591

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java ---
    @@ -241,4 +261,41 @@ public int getOutputBatchSize() {
       public int getOffsetVectorWidth() {
         return UInt4Vector.VALUE_WIDTH;
       }
    +
    +  public void allocateVectors(VectorContainer container) {
    --- End diff --

    Cleaner implementation: just call the method that follows this one, with outputRowCount as the second parameter.
---
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r182919074

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java ---
    @@ -241,4 +261,41 @@ public int getOutputBatchSize() {
       public int getOffsetVectorWidth() {
         return UInt4Vector.VALUE_WIDTH;
       }
    +
    +  public void allocateVectors(VectorContainer container) {
    +    // Allocate memory for the vectors.
    +    // This will iteratively allocate memory for all nested columns underneath.
    +    for (VectorWrapper w : container) {
    +      RecordBatchSizer.ColumnSize colSize = getColumnSize(w.getField().getName());
    +      colSize.allocateVector(w.getValueVector(), outputRowCount);
    +    }
    +  }
    +
    +  public void allocateVectors(VectorContainer container, int recordCount) {
    +    // Allocate memory for the vectors.
    +    // This will iteratively allocate memory for all nested columns underneath.
    +    for (VectorWrapper w : container) {
    +      RecordBatchSizer.ColumnSize colSize = getColumnSize(w.getField().getName());
    +      colSize.allocateVector(w.getValueVector(), recordCount);
    +    }
    +  }
    +
    +  public void allocateVectors(List valueVectors) {
    +    // Allocate memory for the vectors.
    --- End diff --

    Same idea/comment as above; some duplicate code can be avoided by calling allocateVectors(valueVectors, outputRowCount).
---
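The de-duplication proposed here amounts to keeping the loop in the overload that takes an explicit count and letting the no-count overload forward to it. The sketch below illustrates the pattern only: `AllocatorSketch` is hypothetical, columns are modelled as names, and "allocation" is recorded in a list instead of touching real value vectors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the allocateVectors overloads in the diff.
class AllocatorSketch {
  private final int outputRowCount;
  private final List<String> allocations = new ArrayList<>();

  AllocatorSketch(int outputRowCount) {
    this.outputRowCount = outputRowCount;
  }

  // The only place that loops over the columns.
  void allocateVectors(List<String> columnNames, int recordCount) {
    for (String name : columnNames) {
      // Real code would look up the column size and allocate the vector here.
      allocations.add(name + ":" + recordCount);
    }
  }

  // Convenience overload: forwards with the manager's current output row count.
  void allocateVectors(List<String> columnNames) {
    allocateVectors(columnNames, outputRowCount);
  }

  List<String> getAllocations() { return allocations; }
}
```

The same forwarding applies to the container-based pair of overloads, leaving one loop per argument type.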
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r182910786

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -300,13 +322,14 @@ public void setupHashTable() throws IOException, SchemaChangeException, ClassTra
       public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
         //Setup the underlying hash table
    -
         // skip first batch if count is zero, as it may be an empty schema batch
         if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) {
           for (final VectorWrapper w : right) {
             w.clear();
           }
           rightUpstream = next(right);
    +      // For build side, use aggregate i.e. average row width across batches
    +      batchMemoryManager.update(RIGHT_INDEX, 0,true);
    --- End diff --

    Why is update() being called when the right side has zero rows? Shouldn't it be called for every new incoming right batch?
---
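To make the question concrete: an "average row width across batches" only stays accurate if it is refreshed for each incoming batch. The following is a minimal sketch of such a running average, assuming a row-weighted mean; `RowWidthAverager` and its methods are hypothetical and do not reflect how Drill's memory manager computes the value.

```java
// Hypothetical running average of row width over all batches seen so far.
class RowWidthAverager {
  private long totalBytes;
  private long totalRows;

  // Call once per incoming batch; empty (schema-only) batches contribute nothing.
  void update(long batchBytes, int rowCount) {
    if (rowCount == 0) {
      return;
    }
    totalBytes += batchBytes;
    totalRows += rowCount;
  }

  // Row-weighted average width in bytes, rounded up; 0 until a non-empty batch arrives.
  int averageRowWidth() {
    if (totalRows == 0) {
      return 0;
    }
    return (int) ((totalBytes + totalRows - 1) / totalRows);
  }
}
```

If `update` ran only once, after the first non-empty batch, the average would reflect that single batch and ignore any later change in row width.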
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1227#discussion_r182898395

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -147,7 +150,19 @@
         NUM_BUCKETS,
         NUM_ENTRIES,
         NUM_RESIZING,
    -    RESIZING_TIME_MS;
    +    RESIZING_TIME_MS,
    +    LEFT_INPUT_BATCH_COUNT,
    +    LEFT_AVG_INPUT_BATCH_BYTES,
    +    LEFT_AVG_INPUT_ROW_BYTES,
    +    LEFT_INPUT_RECORD_COUNT,
    +    RIGHT_INPUT_BATCH_COUNT,
    +    RIGHT_AVG_INPUT_BATCH_BYTES,
    +    RIGHT_AVG_INPUT_ROW_BYTES,
    +    RIGHT_INPUT_RECORD_COUNT,
    +    OUTPUT_BATCH_COUNT,
    +    AVG_OUTPUT_BATCH_BYTES,
    +    AVG_OUTPUT_ROW_BYTES,
    +    OUTPUT_RECORD_COUNT;
    --- End diff --

    The metrics are also meant to be used by customers; is this information relevant for them? Is it too detailed (e.g., could it be logged instead)?
---
[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join
GitHub user ppadma opened a pull request:

    https://github.com/apache/drill/pull/1227

    Drill 6236: batch sizing for hash join

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ppadma/drill DRILL-6236

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1227

commit f41aadab1cdf61bb1818724597aaa7726000af09
Author: Padma Penumarthy
Date:   2018-04-07T03:53:26Z

    DRILL-6236: batch sizing for hash join

commit e2ddf8761e006a16ed7b6ceea3c2fa849cc6a6e5
Author: Padma Penumarthy
Date:   2018-04-19T05:23:54Z

    DRILL-6343: bit vector copyFromSafe is not doing realloc

---