DRILL-6284: Add operator metrics for batch sizing for flatten
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/77f5e901
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/77f5e901
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/77f5e901

Branch: refs/heads/master
Commit: 77f5e901370c4c23195ac226d140e8f3eca5f71d
Parents: 4f2182e
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Tue Mar 20 13:44:50 2018 -0700
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Fri Apr 6 12:06:49 2018 +0300

----------------------------------------------------------------------
 .../drill/exec/ops/OperatorMetricRegistry.java |  2 +
 .../drill/exec/physical/config/FlattenPOP.java |  4 +-
 .../impl/flatten/FlattenRecordBatch.java       | 42 ++++++++++++++++
 .../AbstractRecordBatchMemoryManager.java      | 53 ++++++++++++++++++++
 .../apache/drill/exec/proto/UserBitShared.java | 20 ++++++--
 .../exec/proto/beans/CoreOperatorType.java     |  4 +-
 protocol/src/main/protobuf/UserBitShared.proto |  1 +
 7 files changed, 118 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index 0424332..b029154 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.physical.impl.ScreenCreator;
 import org.apache.drill.exec.physical.impl.SingleSenderCreator;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
 import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec;
+import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
 import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
@@ -49,6 +50,7 @@ public class OperatorMetricRegistry {
     register(CoreOperatorType.HASH_JOIN_VALUE, HashJoinBatch.Metric.class);
     register(CoreOperatorType.EXTERNAL_SORT_VALUE, ExternalSortBatch.Metric.class);
     register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class);
+    register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class);
   }
 
   private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) {
http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
index 42e6870..f3499bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared;
 
 import java.util.Iterator;
 import java.util.List;
@@ -67,7 +68,6 @@ public class FlattenPOP extends AbstractSingle {
 
   @Override
   public int getOperatorType() {
-    // TODO - add this operator to the protobuf definition
-    return 0;
+    return UserBitShared.CoreOperatorType.FLATTEN_VALUE;
   }
 }
http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 7509809..a1f783f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -99,6 +100,22 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     }
   }
 
+  public enum Metric implements MetricDef {
+    INPUT_BATCH_COUNT,
+    AVG_INPUT_BATCH_BYTES,
+    AVG_INPUT_ROW_BYTES,
+    TOTAL_INPUT_RECORDS,
+    OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES,
+    AVG_OUTPUT_ROW_BYTES,
+    TOTAL_OUTPUT_RECORDS;
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+
   private class FlattenMemoryManager extends AbstractRecordBatchMemoryManager {
 
     @Override
@@ -129,6 +146,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       // Number of rows in outgoing batch
       setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
 
+      setOutgoingRowWidth(avgOutgoingRowWidth);
+
       // Limit to lower bound of total number of rows possible for this batch
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
@@ -136,6 +155,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," +
         "avgOutgoingRowWidth : {}, outputRowCount : {}", getRecordBatchSizer(), outputBatchSize,
         avgOutgoingRowWidth, getOutputRowCount());
+
+      updateIncomingStats();
     }
   }
 
@@ -232,6 +253,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       container.buildSchema(SelectionVectorMode.NONE);
     }
 
+    flattenMemoryManager.updateOutgoingStats(outputRecords);
     return IterOutcome.OK;
   }
 
@@ -265,6 +287,9 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
+
+    flattenMemoryManager.updateOutgoingStats(projRecords);
+
   }
 
   public void addComplexWriter(ComplexWriter writer) {
@@ -466,4 +491,21 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     }
     return exprs;
   }
+
+  private void updateStats() {
+    stats.setLongStat(Metric.INPUT_BATCH_COUNT, flattenMemoryManager.getNumIncomingBatches());
+    stats.setLongStat(Metric.AVG_INPUT_BATCH_BYTES, flattenMemoryManager.getAvgInputBatchSize());
+    stats.setLongStat(Metric.AVG_INPUT_ROW_BYTES, flattenMemoryManager.getAvgInputRowWidth());
+    stats.setLongStat(Metric.TOTAL_INPUT_RECORDS, flattenMemoryManager.getTotalInputRecords());
+    stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, flattenMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, flattenMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(Metric.TOTAL_OUTPUT_RECORDS, flattenMemoryManager.getTotalOutputRecords());
+  }
+
+  @Override
+  public void close() {
+    updateStats();
+    super.close();
+  }
 }
http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
index 1abd365..67c9cee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
@@ -29,6 +29,48 @@ public abstract class AbstractRecordBatchMemoryManager {
   private int outgoingRowWidth;
   private RecordBatchSizer sizer;
 
+  /**
+   * operator metric stats
+   */
+  private long numIncomingBatches;
+  private long sumInputBatchSizes;
+  private long totalInputRecords;
+  private long numOutgoingBatches;
+  private long sumOutputBatchSizes;
+  private long totalOutputRecords;
+
+  public long getNumIncomingBatches() {
+    return numIncomingBatches;
+  }
+
+  public long getTotalInputRecords() {
+    return totalInputRecords;
+  }
+
+  public long getNumOutgoingBatches() {
+    return numOutgoingBatches;
+  }
+
+  public long getTotalOutputRecords() {
+    return totalOutputRecords;
+  }
+
+  public long getAvgInputBatchSize() {
+    return RecordBatchSizer.safeDivide(sumInputBatchSizes, numIncomingBatches);
+  }
+
+  public long getAvgInputRowWidth() {
+    return RecordBatchSizer.safeDivide(sumInputBatchSizes, totalInputRecords);
+  }
+
+  public long getAvgOutputBatchSize() {
+    return RecordBatchSizer.safeDivide(sumOutputBatchSizes, numOutgoingBatches);
+  }
+
+  public long getAvgOutputRowWidth() {
+    return RecordBatchSizer.safeDivide(sumOutputBatchSizes, totalOutputRecords);
+  }
+
   public void update(int inputIndex) {};
 
   public void update() {};
@@ -78,4 +120,15 @@ public abstract class AbstractRecordBatchMemoryManager {
     return sizer.getColumn(name);
   }
 
+  public void updateIncomingStats() {
+    numIncomingBatches++;
+    sumInputBatchSizes += sizer.netSize();
+    totalInputRecords += sizer.rowCount();
+  }
+
+  public void updateOutgoingStats(int outputRecords) {
+    numOutgoingBatches++;
+    totalOutputRecords += outputRecords;
+    sumOutputBatchSizes += outgoingRowWidth * outputRecords;
+  }
 }
http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 6ae9deb..b2cc57d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -525,6 +525,10 @@ public final class UserBitShared {
      * <code>KUDU_SUB_SCAN = 39;</code>
      */
     KUDU_SUB_SCAN(39, 39),
+    /**
+     * <code>FLATTEN = 40;</code>
+     */
+    FLATTEN(40, 40),
     ;
 
     /**
@@ -687,6 +691,10 @@ public final class UserBitShared {
      * <code>KUDU_SUB_SCAN = 39;</code>
      */
     public static final int KUDU_SUB_SCAN_VALUE = 39;
+    /**
+     * <code>FLATTEN = 40;</code>
+     */
+    public static final int FLATTEN_VALUE = 40;
 
 
     public final int getNumber() { return value; }
@@ -733,6 +741,7 @@ public final class UserBitShared {
         case 37: return PCAP_SUB_SCAN;
         case 38: return KAFKA_SUB_SCAN;
         case 39: return KUDU_SUB_SCAN;
+        case 40: return FLATTEN;
         default: return null;
       }
     }
@@ -24113,7 +24122,7 @@ public final class UserBitShared {
       "agmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALL" +
       "OCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\t" +
       "CANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_" +
-      "REQUESTED\020\006*\227\006\n\020CoreOperatorType\022\021\n\rSING" +
+      "REQUESTED\020\006*\244\006\n\020CoreOperatorType\022\021\n\rSING" +
       "LE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FIL" +
       "TER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004" +
       "\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDE" +
@@ -24133,10 +24142,11 @@ public final class UserBitShared {
       "ASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOO" +
       "P_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_" +
       "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_S" +
-      "CAN\020\'*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n" +
-      "\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014S" +
-      "ASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.ap" +
-      "ache.drill.exec.protoB\rUserBitSharedH\001"
+      "CAN\020\'\022\013\n\007FLATTEN\020(*g\n\nSaslStatus\022\020\n\014SASL" +
+      "_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PR" +
+      "OGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILE" +
+      "D\020\004B.\n\033org.apache.drill.exec.protoB\rUser" +
+      "BitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 71595f7..dc3f158 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -61,7 +61,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     AVRO_SUB_SCAN(36),
     PCAP_SUB_SCAN(37),
     KAFKA_SUB_SCAN(38),
-    KUDU_SUB_SCAN(39);
+    KUDU_SUB_SCAN(39),
+    FLATTEN(40);
 
     public final int number;
 
@@ -119,6 +120,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 37: return PCAP_SUB_SCAN;
             case 38: return KAFKA_SUB_SCAN;
             case 39: return KUDU_SUB_SCAN;
+            case 40: return FLATTEN;
             default: return null;
         }
     }
http://git-wip-us.apache.org/repos/asf/drill/blob/77f5e901/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index b13f059..d4c401d 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -327,6 +327,7 @@ enum CoreOperatorType {
   PCAP_SUB_SCAN = 37;
   KAFKA_SUB_SCAN = 38;
   KUDU_SUB_SCAN = 39;
+  FLATTEN = 40;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of function signatures.