[GitHub] drill pull request #1227: Drill-6236: batch sizing for hash join

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r183181073
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -560,6 +554,40 @@ public void close() {
 super.close();
   }
 
+  @Override
+  protected void updateBatchMemoryManagerStats() {
+    stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
--- End diff --

Should be fine.


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-20 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r183171726
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -560,6 +554,40 @@ public void close() {
 super.close();
   }
 
+  @Override
+  protected void updateBatchMemoryManagerStats() {
+    stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
--- End diff --

@sohami I will create a JIRA and address that in a separate PR. For now, I 
would like to override this method. Is that OK?


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r183169484
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -560,6 +554,40 @@ public void close() {
 super.close();
   }
 
+  @Override
+  protected void updateBatchMemoryManagerStats() {
+    stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
--- End diff --

@ppadma - The main motive for moving the metrics inside 
JoinBatchMemoryManager was to avoid duplicating their definition in all the 
BinaryRecordBatches. I think we should improve OperatorMetricRegistry 
rather than just overriding this method.
Or, if you prefer, you can create a new JIRA to track the metrics-related 
improvement.


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r183145171
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -147,7 +150,19 @@
 NUM_BUCKETS,
 NUM_ENTRIES,
 NUM_RESIZING,
-RESIZING_TIME_MS;
+RESIZING_TIME_MS,
+LEFT_INPUT_BATCH_COUNT,
+LEFT_AVG_INPUT_BATCH_BYTES,
+LEFT_AVG_INPUT_ROW_BYTES,
+LEFT_INPUT_RECORD_COUNT,
+RIGHT_INPUT_BATCH_COUNT,
+RIGHT_AVG_INPUT_BATCH_BYTES,
+RIGHT_AVG_INPUT_ROW_BYTES,
+RIGHT_INPUT_RECORD_COUNT,
+OUTPUT_BATCH_COUNT,
+AVG_OUTPUT_BATCH_BYTES,
+AVG_OUTPUT_ROW_BYTES,
+OUTPUT_RECORD_COUNT;
--- End diff --

Putting these metrics inside the operator's Metric class will not work. For 
joins, these metrics were moved into the `JoinBatchMemoryManager.Metric` 
class because they are memory manager metrics. So when you call 
`updateBatchMemoryManagerStats()`, it updates the operator stats using 
ordinals from the `JoinBatchMemoryManager.Metric` class. The ordinal for 
LEFT_INPUT_BATCH_COUNT will therefore be 0, not 4 (which is what is 
required).
I think we should improve our OperatorMetricRegistry so that multiple 
Metric classes can be registered for an operator.
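
The ordinal mismatch described above can be illustrated with a minimal, 
self-contained sketch. The types below (MetricDef, OperatorMetric, 
ManagerMetric, record) are hypothetical stand-ins written for this 
illustration, not Drill's actual classes; the only assumption taken from 
the diff is that a metric's id is its enum ordinal.

```java
import java.util.Arrays;

class MetricOrdinalSketch {
  // Hypothetical stand-in for a metric definition whose id is its ordinal.
  interface MetricDef { int metricId(); }

  // Stand-in for the operator's metric enum (shortened to five entries).
  enum OperatorMetric implements MetricDef {
    NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, RESIZING_TIME_MS, LEFT_INPUT_BATCH_COUNT;
    @Override public int metricId() { return ordinal(); }
  }

  // Stand-in for the memory manager's own metric enum.
  enum ManagerMetric implements MetricDef {
    LEFT_INPUT_BATCH_COUNT;
    @Override public int metricId() { return ordinal(); }
  }

  // Writes one value into a slot array indexed by metric id and returns it.
  static long[] record(MetricDef metric, long value) {
    long[] slots = new long[OperatorMetric.values().length];
    slots[metric.metricId()] = value;
    return slots;
  }

  public static void main(String[] args) {
    // Same metric name, different enum: the value lands in slot 0.
    System.out.println(Arrays.toString(record(ManagerMetric.LEFT_INPUT_BATCH_COUNT, 7L)));
    // Using the operator's enum, the value lands in slot 4.
    System.out.println(Arrays.toString(record(OperatorMetric.LEFT_INPUT_BATCH_COUNT, 7L)));
  }
}
```

In this sketch the first call overwrites the slot that belongs to 
NUM_BUCKETS, which is the kind of collision the comment warns about.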


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-20 Thread sohami
Github user sohami commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r183145138
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -157,14 +172,20 @@ public int metricId() {
 }
   }
 
+  public class HashJoinMemoryManager extends JoinBatchMemoryManager {
--- End diff --

Not required; you can use `JoinBatchMemoryManager` directly.


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-20 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r183112258
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -147,7 +150,19 @@
 NUM_BUCKETS,
 NUM_ENTRIES,
 NUM_RESIZING,
-RESIZING_TIME_MS;
+RESIZING_TIME_MS,
+LEFT_INPUT_BATCH_COUNT,
+LEFT_AVG_INPUT_BATCH_BYTES,
+LEFT_AVG_INPUT_ROW_BYTES,
+LEFT_INPUT_RECORD_COUNT,
+RIGHT_INPUT_BATCH_COUNT,
+RIGHT_AVG_INPUT_BATCH_BYTES,
+RIGHT_AVG_INPUT_ROW_BYTES,
+RIGHT_INPUT_RECORD_COUNT,
+OUTPUT_BATCH_COUNT,
+AVG_OUTPUT_BATCH_BYTES,
+AVG_OUTPUT_ROW_BYTES,
+OUTPUT_RECORD_COUNT;
--- End diff --

They are relevant in the sense that they provide a high-level picture of the 
amount of data being processed, memory usage, etc. by each operator. This is 
also helpful when debugging and trying to figure out what is going on.


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-20 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r183108078
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -300,13 +322,14 @@ public void setupHashTable() throws IOException, SchemaChangeException, ClassTra
 
   public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
     //Setup the underlying hash table
-
     // skip first batch if count is zero, as it may be an empty schema batch
     if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) {
       for (final VectorWrapper w : right) {
         w.clear();
       }
       rightUpstream = next(right);
+      // For build side, use aggregate i.e. average row width across batches
+      batchMemoryManager.update(RIGHT_INDEX, 0,true);
--- End diff --

There is a call to "next" right above the update. 


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-19 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r182929294
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
 ---
@@ -188,12 +196,18 @@ public int getOutgoingRowWidth() {
   public void setRecordBatchSizer(int index, RecordBatchSizer sizer) {
 Preconditions.checkArgument(index >= 0 && index < numInputs);
 this.sizer[index] = sizer;
-inputBatchStats[index] = new BatchStats();
+if (inputBatchStats[index] == null) {
+  inputBatchStats[index] = new BatchStats();
+}
+updateIncomingStats(index);
   }
 
   public void setRecordBatchSizer(RecordBatchSizer sizer) {
--- End diff --

This can instead just call the method above with DEFAULT_INPUT_INDEX as the 
first parameter.
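
A minimal sketch of that delegation, under stated assumptions: Sizer is a 
placeholder for RecordBatchSizer, DEFAULT_INPUT_INDEX is assumed to be 0, 
and the statsUpdates counter stands in for the work done by 
updateIncomingStats(index). None of this is Drill's actual code.

```java
class SizerDelegationSketch {
  static final int DEFAULT_INPUT_INDEX = 0; // assumed value

  // Placeholder for RecordBatchSizer.
  static class Sizer { }

  static class Manager {
    final Sizer[] sizer;
    final int[] statsUpdates; // counts stats updates per input

    Manager(int numInputs) {
      sizer = new Sizer[numInputs];
      statsUpdates = new int[numInputs];
    }

    // Indexed variant: validates the index, stores the sizer, updates stats.
    void setRecordBatchSizer(int index, Sizer s) {
      if (index < 0 || index >= sizer.length) {
        throw new IllegalArgumentException("bad input index: " + index);
      }
      sizer[index] = s;
      statsUpdates[index]++;
    }

    // Single-input variant: reuses the indexed variant instead of copying it.
    void setRecordBatchSizer(Sizer s) {
      setRecordBatchSizer(DEFAULT_INPUT_INDEX, s);
    }
  }

  public static void main(String[] args) {
    Manager m = new Manager(2);
    m.setRecordBatchSizer(new Sizer());
    System.out.println(m.statsUpdates[DEFAULT_INPUT_INDEX]);
  }
}
```

With this shape, the bounds check and the stats update live in one place, 
so both overloads cannot drift apart.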


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-19 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r182929487
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
 ---
@@ -147,6 +149,12 @@ public int update(int inputIndex, int outputPosition) {
 return getOutputRowCount();
   }
 
+  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
+    // by default just return the outputRowCount
+    return update(inputIndex, outputPosition, false);
--- End diff --

Is this an infinite recursive call?
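
For illustration: a three-argument overload that calls itself with the 
same arity never terminates, whereas delegating to the two-argument 
overload does. The sketch below shows one possible shape of the latter, 
assuming the intent is "ignore the flag by default"; BaseManager and its 
row count of 1024 are hypothetical, not Drill's code.

```java
class UpdateOverloadSketch {
  static class BaseManager {
    private final int outputRowCount = 1024; // hypothetical value

    // Two-argument variant: the base behaviour.
    int update(int inputIndex, int outputPosition) {
      return outputRowCount;
    }

    // Three-argument variant: drops the flag and delegates to the
    // two-argument variant, so the call chain terminates.
    int update(int inputIndex, int outputPosition, boolean useAggregate) {
      return update(inputIndex, outputPosition);
    }
  }

  public static void main(String[] args) {
    System.out.println(new BaseManager().update(0, 0, true));
  }
}
```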



---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-19 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r182910955
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -346,6 +369,7 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio
 }
 // Fall through
   case OK:
+batchMemoryManager.update(LEFT_INDEX);
--- End diff --

Should it be RIGHT_INDEX here?



---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-19 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r182911591
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
 ---
@@ -241,4 +261,41 @@ public int getOutputBatchSize() {
   public int getOffsetVectorWidth() {
 return UInt4Vector.VALUE_WIDTH;
   }
+
+  public void allocateVectors(VectorContainer container) {
--- End diff --

Cleaner implementation: just call the following method, passing 
outputRowCount as the second parameter.
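
A minimal sketch of that refactoring, with simplified stand-ins: columns 
are plain strings and "allocation" just records the requested row count. 
The Manager class and its allocations list are hypothetical and only show 
how the overload without a count can forward to the one with a count.

```java
import java.util.ArrayList;
import java.util.List;

class AllocateDelegationSketch {
  static class Manager {
    private final int outputRowCount;
    final List<String> allocations = new ArrayList<>();

    Manager(int outputRowCount) {
      this.outputRowCount = outputRowCount;
    }

    // No explicit count: forward to the counted variant with outputRowCount.
    void allocateVectors(List<String> columns) {
      allocateVectors(columns, outputRowCount);
    }

    // Counted variant: the only place that holds the allocation loop.
    void allocateVectors(List<String> columns, int recordCount) {
      for (String column : columns) {
        allocations.add(column + ":" + recordCount);
      }
    }
  }

  public static void main(String[] args) {
    Manager m = new Manager(4096);
    m.allocateVectors(List.of("a", "b"));
    System.out.println(m.allocations);
  }
}
```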



---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-19 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r182919074
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
 ---
@@ -241,4 +261,41 @@ public int getOutputBatchSize() {
   public int getOffsetVectorWidth() {
 return UInt4Vector.VALUE_WIDTH;
   }
+
+  public void allocateVectors(VectorContainer container) {
+    // Allocate memory for the vectors.
+    // This will iteratively allocate memory for all nested columns underneath.
+    for (VectorWrapper w : container) {
+      RecordBatchSizer.ColumnSize colSize = getColumnSize(w.getField().getName());
+      colSize.allocateVector(w.getValueVector(), outputRowCount);
+    }
+  }
+
+  public void allocateVectors(VectorContainer container, int recordCount) {
+    // Allocate memory for the vectors.
+    // This will iteratively allocate memory for all nested columns underneath.
+    for (VectorWrapper w : container) {
+      RecordBatchSizer.ColumnSize colSize = getColumnSize(w.getField().getName());
+      colSize.allocateVector(w.getValueVector(), recordCount);
+    }
+  }
+
+  public void allocateVectors(List valueVectors) {
+    // Allocate memory for the vectors.
--- End diff --

Same idea/comment as above; some duplicate code can be avoided by calling 
allocateVectors(valueVectors, outputRowCount).
 



---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-19 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r182910786
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -300,13 +322,14 @@ public void setupHashTable() throws IOException, SchemaChangeException, ClassTra
 
   public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
     //Setup the underlying hash table
-
     // skip first batch if count is zero, as it may be an empty schema batch
     if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) {
       for (final VectorWrapper w : right) {
         w.clear();
       }
       rightUpstream = next(right);
+      // For build side, use aggregate i.e. average row width across batches
+      batchMemoryManager.update(RIGHT_INDEX, 0,true);
--- End diff --

Why is update() being called when the right side has zero rows? Shouldn't 
it be called for every new incoming right batch?
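
To make "aggregate, i.e. average row width across batches" concrete, here 
is a rough sketch of a running average that is updated once per incoming 
batch, including empty ones. RowWidthTracker and its fields are 
hypothetical and do not reflect how Drill's memory manager stores its 
statistics.

```java
class AggregateRowWidthSketch {
  static class RowWidthTracker {
    private long totalBytes;
    private long totalRows;
    private int batches;

    // Call once per incoming batch; an empty batch only bumps the count.
    void update(long batchBytes, int rowCount) {
      batches++;
      totalBytes += batchBytes;
      totalRows += rowCount;
    }

    // Average bytes per row over all batches seen so far (0 if no rows yet).
    long averageRowWidth() {
      return totalRows == 0 ? 0 : totalBytes / totalRows;
    }

    int batchCount() {
      return batches;
    }
  }

  public static void main(String[] args) {
    RowWidthTracker right = new RowWidthTracker();
    right.update(0, 0);       // empty schema batch
    right.update(1000, 10);   // 100 bytes per row
    right.update(3000, 10);   // 300 bytes per row
    System.out.println(right.averageRowWidth());
  }
}
```

If the tracker were updated only for the first batch, later and wider 
batches would not influence the average, which is the concern raised 
above.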


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-19 Thread Ben-Zvi
Github user Ben-Zvi commented on a diff in the pull request:

https://github.com/apache/drill/pull/1227#discussion_r182898395
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ---
@@ -147,7 +150,19 @@
 NUM_BUCKETS,
 NUM_ENTRIES,
 NUM_RESIZING,
-RESIZING_TIME_MS;
+RESIZING_TIME_MS,
+LEFT_INPUT_BATCH_COUNT,
+LEFT_AVG_INPUT_BATCH_BYTES,
+LEFT_AVG_INPUT_ROW_BYTES,
+LEFT_INPUT_RECORD_COUNT,
+RIGHT_INPUT_BATCH_COUNT,
+RIGHT_AVG_INPUT_BATCH_BYTES,
+RIGHT_AVG_INPUT_ROW_BYTES,
+RIGHT_INPUT_RECORD_COUNT,
+OUTPUT_BATCH_COUNT,
+AVG_OUTPUT_BATCH_BYTES,
+AVG_OUTPUT_ROW_BYTES,
+OUTPUT_RECORD_COUNT;
--- End diff --

The metrics are also meant to be used by customers; is this information 
relevant to them? Is it too detailed (e.g., could it be logged instead)?
 


---


[GitHub] drill pull request #1227: Drill 6236: batch sizing for hash join

2018-04-19 Thread ppadma
GitHub user ppadma opened a pull request:

https://github.com/apache/drill/pull/1227

Drill 6236: batch sizing for hash join



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppadma/drill DRILL-6236

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1227


commit f41aadab1cdf61bb1818724597aaa7726000af09
Author: Padma Penumarthy 
Date:   2018-04-07T03:53:26Z

DRILL-6236: batch sizing for hash join

commit e2ddf8761e006a16ed7b6ceea3c2fa849cc6a6e5
Author: Padma Penumarthy 
Date:   2018-04-19T05:23:54Z

DRILL-6343: bit vector copyFromSafe is not doing realloc




---