DRILL-6284: Add operator metrics for batch sizing for flatten

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/77f5e901
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/77f5e901
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/77f5e901

Branch: refs/heads/master
Commit: 77f5e901370c4c23195ac226d140e8f3eca5f71d
Parents: 4f2182e
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Tue Mar 20 13:44:50 2018 -0700
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Fri Apr 6 12:06:49 2018 +0300

----------------------------------------------------------------------
 .../drill/exec/ops/OperatorMetricRegistry.java  |  2 +
 .../drill/exec/physical/config/FlattenPOP.java  |  4 +-
 .../impl/flatten/FlattenRecordBatch.java        | 42 ++++++++++++++++
 .../AbstractRecordBatchMemoryManager.java       | 53 ++++++++++++++++++++
 .../apache/drill/exec/proto/UserBitShared.java  | 20 ++++++--
 .../exec/proto/beans/CoreOperatorType.java      |  4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |  1 +
 7 files changed, 118 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index 0424332..b029154 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.physical.impl.ScreenCreator;
 import org.apache.drill.exec.physical.impl.SingleSenderCreator;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
 import 
org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec;
+import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
 import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import 
org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
@@ -49,6 +50,7 @@ public class OperatorMetricRegistry {
     register(CoreOperatorType.HASH_JOIN_VALUE, HashJoinBatch.Metric.class);
     register(CoreOperatorType.EXTERNAL_SORT_VALUE, 
ExternalSortBatch.Metric.class);
     register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, 
ParquetRecordReader.Metric.class);
+    register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class);
   }
 
   private static void register(final int operatorType, final Class<? extends 
MetricDef> metricDef) {

http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
index 42e6870..f3499bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared;
 
 import java.util.Iterator;
 import java.util.List;
@@ -67,7 +68,6 @@ public class FlattenPOP extends AbstractSingle {
 
   @Override
   public int getOperatorType() {
-    // TODO - add this operator to the protobuf definition
-    return 0;
+    return UserBitShared.CoreOperatorType.FLATTEN_VALUE;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 7509809..a1f783f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -99,6 +100,22 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     }
   }
 
+  public enum Metric implements MetricDef {
+    INPUT_BATCH_COUNT,
+    AVG_INPUT_BATCH_BYTES,
+    AVG_INPUT_ROW_BYTES,
+    TOTAL_INPUT_RECORDS,
+    OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES,
+    AVG_OUTPUT_ROW_BYTES,
+    TOTAL_OUTPUT_RECORDS;
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+
   private class FlattenMemoryManager extends AbstractRecordBatchMemoryManager {
 
     @Override
@@ -129,6 +146,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       // Number of rows in outgoing batch
       setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
 
+      setOutgoingRowWidth(avgOutgoingRowWidth);
+
       // Limit to lower bound of total number of rows possible for this batch
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), 
getOutputRowCount()));
@@ -136,6 +155,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," +
         "avgOutgoingRowWidth : {}, outputRowCount : {}", 
getRecordBatchSizer(), outputBatchSize,
         avgOutgoingRowWidth, getOutputRowCount());
+
+      updateIncomingStats();
     }
   }
 
@@ -232,6 +253,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       container.buildSchema(SelectionVectorMode.NONE);
     }
 
+    flattenMemoryManager.updateOutgoingStats(outputRecords);
     return IterOutcome.OK;
   }
 
@@ -265,6 +287,9 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
+
+    flattenMemoryManager.updateOutgoingStats(projRecords);
+
   }
 
   public void addComplexWriter(ComplexWriter writer) {
@@ -466,4 +491,21 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     }
     return exprs;
   }
+
+  private void updateStats() {
+    stats.setLongStat(Metric.INPUT_BATCH_COUNT, 
flattenMemoryManager.getNumIncomingBatches());
+    stats.setLongStat(Metric.AVG_INPUT_BATCH_BYTES, 
flattenMemoryManager.getAvgInputBatchSize());
+    stats.setLongStat(Metric.AVG_INPUT_ROW_BYTES, 
flattenMemoryManager.getAvgInputRowWidth());
+    stats.setLongStat(Metric.TOTAL_INPUT_RECORDS, 
flattenMemoryManager.getTotalInputRecords());
+    stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, 
flattenMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, 
flattenMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, 
flattenMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(Metric.TOTAL_OUTPUT_RECORDS, 
flattenMemoryManager.getTotalOutputRecords());
+  }
+
+  @Override
+  public void close() {
+    updateStats();
+    super.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
index 1abd365..67c9cee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
@@ -29,6 +29,48 @@ public abstract class AbstractRecordBatchMemoryManager {
   private int outgoingRowWidth;
   private RecordBatchSizer sizer;
 
+  /**
+   * operator metric stats
+   */
+  private long numIncomingBatches;
+  private long sumInputBatchSizes;
+  private long totalInputRecords;
+  private long numOutgoingBatches;
+  private long sumOutputBatchSizes;
+  private long totalOutputRecords;
+
+  public long getNumIncomingBatches() {
+    return numIncomingBatches;
+  }
+
+  public long getTotalInputRecords() {
+    return totalInputRecords;
+  }
+
+  public long getNumOutgoingBatches() {
+    return numOutgoingBatches;
+  }
+
+  public long getTotalOutputRecords() {
+    return totalOutputRecords;
+  }
+
+  public long getAvgInputBatchSize() {
+    return RecordBatchSizer.safeDivide(sumInputBatchSizes, numIncomingBatches);
+  }
+
+  public long getAvgInputRowWidth() {
+    return RecordBatchSizer.safeDivide(sumInputBatchSizes, totalInputRecords);
+  }
+
+  public long getAvgOutputBatchSize() {
+    return RecordBatchSizer.safeDivide(sumOutputBatchSizes, 
numOutgoingBatches);
+  }
+
+  public long getAvgOutputRowWidth() {
+    return RecordBatchSizer.safeDivide(sumOutputBatchSizes, 
totalOutputRecords);
+  }
+
   public void update(int inputIndex) {};
 
   public void update() {};
@@ -78,4 +120,15 @@ public abstract class AbstractRecordBatchMemoryManager {
     return sizer.getColumn(name);
   }
 
+  public void updateIncomingStats() {
+    numIncomingBatches++;
+    sumInputBatchSizes += sizer.netSize();
+    totalInputRecords += sizer.rowCount();
+  }
+
+  public void updateOutgoingStats(int outputRecords) {
+    numOutgoingBatches++;
+    totalOutputRecords += outputRecords;
+    sumOutputBatchSizes += outgoingRowWidth * outputRecords;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 6ae9deb..b2cc57d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -525,6 +525,10 @@ public final class UserBitShared {
      * <code>KUDU_SUB_SCAN = 39;</code>
      */
     KUDU_SUB_SCAN(39, 39),
+    /**
+     * <code>FLATTEN = 40;</code>
+     */
+    FLATTEN(40, 40),
     ;
 
     /**
@@ -687,6 +691,10 @@ public final class UserBitShared {
      * <code>KUDU_SUB_SCAN = 39;</code>
      */
     public static final int KUDU_SUB_SCAN_VALUE = 39;
+    /**
+     * <code>FLATTEN = 40;</code>
+     */
+    public static final int FLATTEN_VALUE = 40;
 
 
     public final int getNumber() { return value; }
@@ -733,6 +741,7 @@ public final class UserBitShared {
         case 37: return PCAP_SUB_SCAN;
         case 38: return KAFKA_SUB_SCAN;
         case 39: return KUDU_SUB_SCAN;
+        case 40: return FLATTEN;
         default: return null;
       }
     }
@@ -24113,7 +24122,7 @@ public final class UserBitShared {
       "agmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALL" +
       
"OCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\t"
 +
       "CANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_" 
+
-      "REQUESTED\020\006*\227\006\n\020CoreOperatorType\022\021\n\rSING" +
+      "REQUESTED\020\006*\244\006\n\020CoreOperatorType\022\021\n\rSING" +
       "LE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FIL" 
+
       
"TER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004" +
       "\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDE" +
@@ -24133,10 +24142,11 @@ public final class UserBitShared {
       "ASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOO" +
       "P_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_" +
       "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_S" +
-      "CAN\020\'*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n" +
-      
"\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014S" +
-      "ASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.ap" +
-      "ache.drill.exec.protoB\rUserBitSharedH\001"
+      "CAN\020\'\022\013\n\007FLATTEN\020(*g\n\nSaslStatus\022\020\n\014SASL" +
+      "_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PR" 
+
+      
"OGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILE" +
+      "D\020\004B.\n\033org.apache.drill.exec.protoB\rUser" +
+      "BitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 71595f7..dc3f158 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -61,7 +61,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     AVRO_SUB_SCAN(36),
     PCAP_SUB_SCAN(37),
     KAFKA_SUB_SCAN(38),
-    KUDU_SUB_SCAN(39);
+    KUDU_SUB_SCAN(39),
+    FLATTEN(40);
     
     public final int number;
     
@@ -119,6 +120,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 37: return PCAP_SUB_SCAN;
             case 38: return KAFKA_SUB_SCAN;
             case 39: return KUDU_SUB_SCAN;
+            case 40: return FLATTEN;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index b13f059..d4c401d 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -327,6 +327,7 @@ enum CoreOperatorType {
   PCAP_SUB_SCAN = 37;
   KAFKA_SUB_SCAN = 38;
   KUDU_SUB_SCAN = 39;
+  FLATTEN = 40;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of 
function signatures.

Reply via email to