Repository: drill
Updated Branches:
  refs/heads/master 54df129ca -> 49d316a1c
DRILL-2210 Introducing multithreading capability to PartitionerSender

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/49d316a1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/49d316a1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/49d316a1

Branch: refs/heads/master
Commit: 49d316a1cb22f79061e246b5e197547dac730232
Parents: 54df129
Author: Yuliya Feldman <yfeld...@maprtech.com>
Authored: Tue Feb 17 00:09:07 2015 -0800
Committer: vkorukanti <venki.koruka...@gmail.com>
Committed: Wed Mar 18 17:40:37 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/compile/CodeCompiler.java |  13 +-
 .../apache/drill/exec/ops/FragmentContext.java  |   8 +
 .../apache/drill/exec/ops/OperatorStats.java    |  81 ++++
 .../exec/physical/config/IteratorValidator.java |   2 +-
 .../exec/physical/impl/SendingAccountor.java    |   9 +-
 .../PartitionSenderRootExec.java                |  91 ++++-
 .../impl/partitionsender/Partitioner.java       |  11 +-
 .../partitionsender/PartitionerDecorator.java   | 282 +++++++++++++
 .../partitionsender/PartitionerTemplate.java    |  54 ++-
 .../exec/planner/fragment/Materializer.java     |   3 +
 .../exec/planner/physical/PlannerSettings.java  |   3 +
 .../drill/exec/server/DrillbitContext.java      |  10 +-
 .../server/options/SystemOptionManager.java     |   3 +
 .../org/apache/drill/exec/work/WorkManager.java |   8 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   2 +-
 .../partitionsender/TestPartitionSender.java    | 392 +++++++++++++++++++
 16 files changed, 938 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
index 57a6660..f0147ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
@@ -18,6 +18,7 @@ package org.apache.drill.exec.compile;
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import org.apache.drill.common.config.DrillConfig;
@@ -32,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
 public class CodeCompiler {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodeCompiler.class);
@@ -53,10 +55,19 @@ public class CodeCompiler {
   @SuppressWarnings("unchecked")
   public <T> T getImplementationClass(final CodeGenerator<?> cg) throws ClassTransformationException, IOException {
+    return (T) getImplementationClass(cg, 1).get(0);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> List<T> getImplementationClass(final CodeGenerator<?> cg, int instanceNumber) throws ClassTransformationException, IOException {
     cg.generate();
     try {
       final GeneratedClassEntry ce = cache.get(cg);
-      return (T) ce.clazz.newInstance();
+      List<T> tList = Lists.newArrayList();
+      for ( int i = 0; i < instanceNumber; i++) {
+        tList.add((T) ce.clazz.newInstance());
+      }
+      return tList;
     } catch (ExecutionException | InstantiationException |
IllegalAccessException e) { throw new ClassTransformationException(e); } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 9fc9ad1..2a6660e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -222,6 +222,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { return context.getCompiler().getImplementationClass(cg); } + public <T> List<T> getImplementationClass(ClassGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException { + return getImplementationClass(cg.getCodeGenerator(), instanceCount); + } + + public <T> List<T> getImplementationClass(CodeGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException { + return context.getCompiler().getImplementationClass(cg, instanceCount); + } + /** * Get the user connection associated with this fragment. This return null unless this is a root fragment. * @return The RPC connection to the query submitter. http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index 0e9da0e..3f671e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.ops; +import java.util.Iterator; + import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.MetricValue; import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; @@ -24,6 +26,8 @@ import org.apache.drill.exec.proto.UserBitShared.StreamProfile; import com.carrotsearch.hppc.IntDoubleOpenHashMap; import com.carrotsearch.hppc.IntLongOpenHashMap; +import com.carrotsearch.hppc.cursors.IntDoubleCursor; +import com.carrotsearch.hppc.cursors.IntLongCursor; public class OperatorStats { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class); @@ -53,16 +57,39 @@ public class OperatorStats { private long waitMark; private long schemas; + private int inputCount; public OperatorStats(OpProfileDef def, BufferAllocator allocator){ this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator); } + /** + * Copy constructor to be able to create a copy of existing stats object shell and use it independently + * this is useful if stats have to be updated in different threads, since it is not really + * possible to update such stats as waitNanos, setupNanos and processingNanos across threads + * @param original - OperatorStats object to create a copy from + * @param isClean - flag to indicate whether to start with clean state indicators or inherit those from original object + */ + public OperatorStats(OperatorStats original, boolean isClean) { + this(original.operatorId, original.operatorType, original.inputCount, original.allocator); + + if ( 
!isClean ) { + inProcessing = original.inProcessing; + inSetup = original.inSetup; + inWait = original.inWait; + + processingMark = original.processingMark; + setupMark = original.setupMark; + waitMark = original.waitMark; + } + } + private OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) { super(); this.allocator = allocator; this.operatorId = operatorId; this.operatorType = operatorType; + this.inputCount = inputCount; this.recordsReceivedByInput = new long[inputCount]; this.batchesReceivedByInput = new long[inputCount]; this.schemaCountByInput = new long[inputCount]; @@ -71,6 +98,44 @@ public class OperatorStats { private String assertionError(String msg){ return String.format("Failure while %s for operator id %d. Currently have states of processing:%s, setup:%s, waiting:%s.", msg, operatorId, inProcessing, inSetup, inWait); } + /** + * OperatorStats merger - to merge stats from other OperatorStats + * this is needed in case some processing is multithreaded that needs to have + * separate OperatorStats to deal with + * WARN - this will only work for metrics that can be added + * @param from - OperatorStats from where to merge to "this" + * @return OperatorStats - for convenience so one can merge multiple stats in one go + */ + public OperatorStats mergeMetrics(OperatorStats from) { + final IntLongOpenHashMap fromMetrics = from.longMetrics; + + final Iterator<IntLongCursor> iter = fromMetrics.iterator(); + while (iter.hasNext()) { + final IntLongCursor next = iter.next(); + longMetrics.putOrAdd(next.key, next.value, next.value); + } + + final IntDoubleOpenHashMap fromDMetrics = from.doubleMetrics; + final Iterator<IntDoubleCursor> iterD = fromDMetrics.iterator(); + + while (iterD.hasNext()) { + final IntDoubleCursor next = iterD.next(); + doubleMetrics.putOrAdd(next.key, next.value, next.value); + } + return this; + } + + /** + * Clear stats + */ + public void clear() { + processingNanos = 0l; + setupNanos = 0l; + waitNanos = 0l; + longMetrics.clear(); + doubleMetrics.clear(); + } + public void startSetup() { assert !inSetup : assertionError("starting setup"); stopProcessing(); @@ -183,4 +248,20 @@ public class OperatorStats { doubleMetrics.put(metric.metricId(), value); } + public long getWaitNanos() { + return waitNanos; + } + + /** + * Adjust waitNanos based on client calculations + * @param waitNanosOffset - could be negative as well as positive + */ + public void adjustWaitNanos(long waitNanosOffset) { + this.waitNanos += waitNanosOffset; + } + + public long getProcessingNanos() { + return processingNanos; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java index 64cf7c5..b8ecae4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java @@ -26,7 +26,7 @@ public class IteratorValidator extends AbstractSingle{ public IteratorValidator(PhysicalOperator child) { super(child); - + setCost(child.getCost()); } @Override 
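
[Editor's illustration] The OperatorStats additions above (the copy constructor, mergeMetrics, and adjustWaitNanos) exist so that each sender thread can record into its own clean stats copy and the parent can fold the copies back in afterward. A minimal standalone sketch of that flow, using an illustrative Stats stand-in rather than Drill's OperatorStats API:

import java.util.HashMap;
import java.util.Map;

public class StatsMergeDemo {
  static class Stats {
    long processingNanos;
    long waitNanos;
    final Map<Integer, Long> longMetrics = new HashMap<>();

    // Analogue of OperatorStats(original, isClean=true): a clean copy that
    // shares the operator identity but starts all counters at zero.
    Stats cleanCopy() { return new Stats(); }

    // Analogue of mergeMetrics(): the WARN in the patch applies here too --
    // merging is only meaningful for metrics that can simply be added.
    Stats merge(Stats from) {
      processingNanos += from.processingNanos;
      for (Map.Entry<Integer, Long> e : from.longMetrics.entrySet()) {
        longMetrics.merge(e.getKey(), e.getValue(), Long::sum);
      }
      return this;
    }

    // Analogue of adjustWaitNanos(offset): the offset may be negative, e.g.
    // when the parent subtracts the max per-partitioner processing time.
    void adjustWait(long offset) { waitNanos += offset; }
  }

  public static void main(String[] args) {
    Stats parent = new Stats();
    Stats worker1 = parent.cleanCopy();
    Stats worker2 = parent.cleanCopy();
    worker1.processingNanos = 700; worker1.longMetrics.put(0, 5L);
    worker2.processingNanos = 400; worker2.longMetrics.put(0, 5L);
    parent.merge(worker1).merge(worker2);
    parent.waitNanos = 900;   // parent measured the wait for both workers
    parent.adjustWait(-700);  // subtract max worker processing time
    System.out.println(parent.longMetrics.get(0) + " " + parent.waitNanos); // prints: 10 200
  }
}

This mirrors the accounting PartitionerDecorator performs further down: merge additive metrics, then correct the parent's wait time by the longest per-partitioner processing time.
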
http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java index 7af7b65..3920f9c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; /** * Account for whether all messages sent have been completed. Necessary before finishing a task so we don't think @@ -28,11 +29,11 @@ import java.util.concurrent.Semaphore; public class SendingAccountor { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class); - private int batchesSent = 0; + private final AtomicInteger batchesSent = new AtomicInteger(0); private Semaphore wait = new Semaphore(0); public void increment() { - batchesSent++; + batchesSent.incrementAndGet(); } public void decrement() { @@ -41,8 +42,8 @@ public class SendingAccountor { public synchronized void waitForSendComplete() { try { - wait.acquire(batchesSent); - batchesSent = 0; + wait.acquire(batchesSent.get()); + batchesSent.set(0); } catch (InterruptedException e) { logger.warn("Failure while waiting for send complete.", e); } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index abf9cbc..6a73cdd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicIntegerArray; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; @@ -34,10 +35,12 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; +import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; import 
org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -46,8 +49,10 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.CopyUtil; +import com.google.common.annotations.VisibleForTesting; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; import com.sun.codemodel.JType; @@ -57,13 +62,15 @@ public class PartitionSenderRootExec extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class); private RecordBatch incoming; private HashPartitionSender operator; - private Partitioner partitioner; + private PartitionerDecorator partitioner; + private FragmentContext context; private boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; + private final double cost; private final AtomicIntegerArray remainingReceivers; private final AtomicInteger remaingReceiverCount; @@ -72,6 +79,8 @@ public class PartitionSenderRootExec extends BaseRootExec { long minReceiverRecordCount = Long.MAX_VALUE; long maxReceiverRecordCount = Long.MIN_VALUE; + protected final int numberPartitions; + protected final int actualPartitions; public enum Metric implements MetricDef { BATCHES_SENT, @@ -79,7 +88,9 @@ public class PartitionSenderRootExec extends BaseRootExec { MIN_RECORDS, MAX_RECORDS, N_RECEIVERS, - BYTES_SENT; + BYTES_SENT, + SENDING_THREADS_COUNT, + COST; @Override public int metricId() { @@ -99,8 +110,32 @@ public class PartitionSenderRootExec extends BaseRootExec { this.statusHandler = new StatusHandler(sendCount, context); this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount); this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount); - stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount); + // Algorithm to figure out number of threads to parallelize output + // numberOfRows/sliceTarget/numReceivers/threadfactor + // threadFactor = 4 by default + // one more param to put a limit on number max number of threads: default 32 + this.cost = operator.getChild().getCost(); + final OptionManager optMgr = context.getOptions(); + long sliceTarget = optMgr.getOption(ExecConstants.SLICE_TARGET).num_val; + int threadFactor = optMgr.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue(); + int tmpParts = 1; + if ( sliceTarget != 0 && outGoingBatchCount != 0 ) { + tmpParts = (int) Math.round((((cost / (sliceTarget*1.0)) / (outGoingBatchCount*1.0)) / (threadFactor*1.0))); + if ( tmpParts < 1) { + tmpParts = 1; + } + } + final int imposedThreads = optMgr.getOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName()).num_val.intValue(); + if (imposedThreads > 0 ) { + this.numberPartitions = imposedThreads; + } else { + this.numberPartitions = Math.min(tmpParts, optMgr.getOption(PlannerSettings.PARTITION_SENDER_MAX_THREADS.getOptionName()).num_val.intValue()); + } + logger.info("Preliminary number of sending threads is: " + numberPartitions); + this.actualPartitions = outGoingBatchCount > numberPartitions ? 
numberPartitions : outGoingBatchCount; + this.stats.setLongStat(Metric.SENDING_THREADS_COUNT, actualPartitions); + this.stats.setDoubleStat(Metric.COST, this.cost); } @Override @@ -189,8 +224,28 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - private void createPartitioner() throws SchemaChangeException { + @VisibleForTesting + protected void createPartitioner() throws SchemaChangeException { + final int divisor = Math.max(1, outGoingBatchCount/actualPartitions); + final int longTail = outGoingBatchCount % actualPartitions; + + final List<Partitioner> subPartitioners = createClassInstances(actualPartitions); + int startIndex = 0; + int endIndex = 0; + for (int i = 0; i < actualPartitions; i++) { + startIndex = endIndex; + endIndex = (i < actualPartitions - 1 ) ? startIndex + divisor : outGoingBatchCount; + if ( i < longTail ) { + endIndex++; + } + final OperatorStats partitionStats = new OperatorStats(stats, true); + subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, sendCount, oContext, statusHandler, + startIndex, endIndex); + } + partitioner = new PartitionerDecorator(subPartitioners, stats, context); + } + private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException { // set up partitioning function final LogicalExpression expr = operator.getExpr(); final ErrorCollector collector = new ErrorCollectorImpl(); @@ -218,8 +273,8 @@ public class PartitionSenderRootExec extends BaseRootExec { try { // compile and setup generated code - partitioner = context.getImplementationClass(cg); - partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext, statusHandler); + List<Partitioner> subPartitioners = context.getImplementationClass(cg, actualPartitions); + return subPartitioners; } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); @@ -228,13 +283,14 @@ public class PartitionSenderRootExec extends BaseRootExec { /** * Find min and max record count seen across the outgoing batches and put them in stats. - * @param outgoing */ - private void updateAggregateStats(List<? 
extends PartitionOutgoingBatch> outgoing) { - for (PartitionOutgoingBatch o : outgoing) { - long totalRecords = o.getTotalRecords(); - minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords); - maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords); + private void updateAggregateStats() { + for (Partitioner part : partitioner.getPartitioners() ) { + for (PartitionOutgoingBatch o : part.getOutgoingBatches()) { + long totalRecords = o.getTotalRecords(); + minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords); + maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords); + } } stats.setLongStat(Metric.MIN_RECORDS, minReceiverRecordCount); stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount); @@ -242,9 +298,9 @@ public class PartitionSenderRootExec extends BaseRootExec { @Override public void receivingFragmentFinished(FragmentHandle handle) { - int id = handle.getMinorFragmentId(); + final int id = handle.getMinorFragmentId(); if (remainingReceivers.compareAndSet(id, 0, 1)) { - partitioner.getOutgoingBatches().get(handle.getMinorFragmentId()).terminate(); + partitioner.getOutgoingBatches(id).terminate(); int remaining = remaingReceiverCount.decrementAndGet(); if (remaining == 0) { done = true; @@ -256,7 +312,7 @@ public class PartitionSenderRootExec extends BaseRootExec { logger.debug("Partition sender stopping."); ok = false; if (partitioner != null) { - updateAggregateStats(partitioner.getOutgoingBatches()); + updateAggregateStats(); partitioner.clear(); } sendCount.waitForSendComplete(); @@ -299,4 +355,9 @@ public class PartitionSenderRootExec extends BaseRootExec { } stats.addLongStat(Metric.BATCHES_SENT, 1); } + + @VisibleForTesting + protected PartitionerDecorator getPartitioner() { + return partitioner; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 5ed9c39..9d6e98f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -37,13 +37,22 @@ public interface Partitioner { OperatorStats stats, SendingAccountor sendingAccountor, OperatorContext oContext, - StatusHandler statusHandler) throws SchemaChangeException; + StatusHandler statusHandler, + int start, int count) throws SchemaChangeException; public abstract void partitionBatch(RecordBatch incoming) throws IOException; public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; public abstract void initialize(); public abstract void clear(); public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches(); + /** + * Method to get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner + * @param minorFragmentIndex + * @return PartitionOutgoingBatch that matches index within Partitioner. 
This method can + * return null if index does not fall within boundary of this Partitioner + */ + public abstract PartitionOutgoingBatch getOutgoingBatch(int index); + public abstract OperatorStats getStats(); public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java new file mode 100644 index 0000000..c3261dc --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.partitionsender; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.record.RecordBatch; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + +/** + * Decorator class to hide multiple Partitioner existence from the caller + * since this class involves multithreaded processing of incoming batches + * as well as flushing it needs special handling of OperatorStats - stats + * since stats are not suitable for use in multithreaded environment + * The algorithm to figure out processing versus wait time is based on following formula: + * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner) + */ +public class PartitionerDecorator { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class); + + private List<Partitioner> partitioners; + private final OperatorStats stats; + private final String tName; + private final String childThreadPrefix; + private final ExecutorService executor; + + + public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) { + this.partitioners = partitioners; + this.stats = stats; + this.executor = context.getDrillbitContext().getExecutor(); + this.tName = Thread.currentThread().getName(); + this.childThreadPrefix = "Partitioner-" + tName + "-"; + } + + /** + * partitionBatch - decorator method to call real Partitioner(s) to process incoming batch + * uses either threading or not threading approach based on number Partitioners + * @param incoming + * @throws IOException + */ + public void partitionBatch(final RecordBatch incoming) throws IOException { + executeMethodLogic(new PartitionBatchHandlingClass(incoming)); + } + + /** + * flushOutgoingBatches - decorator to call real Partitioner(s) flushOutgoingBatches + * @param isLastBatch + * @param schemaChanged + * @throws IOException + */ + public void flushOutgoingBatches(final boolean isLastBatch, final boolean schemaChanged) throws IOException { + executeMethodLogic(new FlushBatchesHandlingClass(isLastBatch, schemaChanged)); + } + + /** + * decorator method to call multiple Partitioners initialize() + */ + public void initialize() { + for (Partitioner part : partitioners ) { + part.initialize(); + } + } + + /** + * decorator method to call multiple Partitioners clear() + */ + public void clear() { + for (Partitioner part : partitioners ) { + part.clear(); + } + } + + /** + * Helper method to get PartitionOutgoingBatch based on the index + * since we may have more then one Partitioner + * As number of Partitioners should be very small AND this method it used very rarely, + * so it is OK to loop in order to find right partitioner + * @param index - index of PartitionOutgoingBatch + * @return PartitionOutgoingBatch + */ + public PartitionOutgoingBatch getOutgoingBatches(int index) { + for (Partitioner part : partitioners ) { + PartitionOutgoingBatch outBatch = part.getOutgoingBatch(index); + if ( outBatch != null ) { + return outBatch; + } + } + return null; + } + + @VisibleForTesting + protected List<Partitioner> getPartitioners() { + return partitioners; + } + + /** + * Helper to execute the different methods wrapped into same logic + * @param iface + * @throws 
IOException + */ + protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOException { + if (partitioners.size() == 1 ) { + // no need for threads + final OperatorStats localStatsSingle = partitioners.get(0).getStats(); + localStatsSingle.clear(); + localStatsSingle.startProcessing(); + try { + iface.execute(partitioners.get(0)); + } finally { + localStatsSingle.stopProcessing(); + stats.mergeMetrics(localStatsSingle); + // since main stats did not have any wait time - adjust based of partitioner stats wait time + // main stats processing time started recording in BaseRootExec + stats.adjustWaitNanos(localStatsSingle.getWaitNanos()); + } + return; + } + + long maxProcessTime = 0l; + // start waiting on main stats to adjust by sum(max(processing)) at the end + stats.startWait(); + final CountDownLatch latch = new CountDownLatch(partitioners.size()); + final List<CustomRunnable> runnables = Lists.newArrayList(); + try { + int i = 0; + for (final Partitioner part : partitioners ) { + runnables.add(new CustomRunnable(childThreadPrefix, latch, iface, part)); + executor.submit(runnables.get(i++)); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + IOException excep = null; + for (final CustomRunnable runnable : runnables ) { + IOException myException = runnable.getException(); + if ( myException != null ) { + if ( excep == null ) { + excep = myException; + } else { + excep.addSuppressed(myException); + } + } + final OperatorStats localStats = runnable.getPart().getStats(); + long currentProcessingNanos = localStats.getProcessingNanos(); + // find out max Partitioner processing time + maxProcessTime = (currentProcessingNanos > maxProcessTime) ? currentProcessingNanos : maxProcessTime; + stats.mergeMetrics(localStats); + } + if ( excep != null ) { + throw excep; + } + } finally { + stats.stopWait(); + // scale down main stats wait time based on calculated processing time + // since we did not wait for whole duration of above execution + stats.adjustWaitNanos(-maxProcessTime); + } + + } + + /** + * Helper interface to generalize functionality executed in the thread + * since it is absolutely the same for partitionBatch and flushOutgoingBatches + * protected is for testing purposes + */ + protected interface GeneralExecuteIface { + public void execute(Partitioner partitioner) throws IOException; + } + + /** + * Class to handle running partitionBatch method + * + */ + private static class PartitionBatchHandlingClass implements GeneralExecuteIface { + + private final RecordBatch incoming; + + public PartitionBatchHandlingClass(RecordBatch incoming) { + this.incoming = incoming; + } + + @Override + public void execute(Partitioner part) throws IOException { + part.partitionBatch(incoming); + } + } + + /** + * Class to handle running flushOutgoingBatches method + * + */ + private static class FlushBatchesHandlingClass implements GeneralExecuteIface { + + private final boolean isLastBatch; + private final boolean schemaChanged; + + public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) { + this.isLastBatch = isLastBatch; + this.schemaChanged = schemaChanged; + } + + @Override + public void execute(Partitioner part) throws IOException { + part.flushOutgoingBatches(isLastBatch, schemaChanged); + } + } + + /** + * Helper class to wrap Runnable with customized naming + * Exception handling + * + */ + private static class CustomRunnable implements Runnable { + + private final String parentThreadName; + private final 
CountDownLatch latch; + private final GeneralExecuteIface iface; + private final Partitioner part; + private volatile IOException exp; + + public CustomRunnable(String parentThreadName, CountDownLatch latch, GeneralExecuteIface iface, Partitioner part) { + this.parentThreadName = parentThreadName; + this.latch = latch; + this.iface = iface; + this.part = part; + } + + @Override + public void run() { + final Thread currThread = Thread.currentThread(); + final String currThreadName = currThread.getName(); + final OperatorStats localStats = part.getStats(); + try { + final String newThreadName = parentThreadName + currThread.getId(); + currThread.setName(newThreadName); + localStats.clear(); + localStats.startProcessing(); + iface.execute(part); + } catch (IOException e) { + exp = e; + } finally { + localStats.stopProcessing(); + currThread.setName(currThreadName); + latch.countDown(); + } + } + + public IOException getException() { + return this.exp; + } + + public Partitioner getPart() { + return part; + } + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 93d719c..33d6f95 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -61,6 +61,9 @@ public abstract class PartitionerTemplate implements Partitioner { private SelectionVector2 sv2; private SelectionVector4 sv4; private RecordBatch incoming; + private OperatorStats stats; + private int start; + private int end; private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList(); private int outgoingRecordBatchSize = DEFAULT_RECORD_BATCH_SIZE; @@ -74,15 +77,27 @@ public abstract class PartitionerTemplate implements Partitioner { } @Override + public PartitionOutgoingBatch getOutgoingBatch(int index) { + if ( index >= start && index < end) { + return outgoingBatches.get(index - start); + } + return null; + } + + @Override public final void setup(FragmentContext context, RecordBatch incoming, HashPartitionSender popConfig, OperatorStats stats, SendingAccountor sendingAccountor, OperatorContext oContext, - StatusHandler statusHandler) throws SchemaChangeException { + StatusHandler statusHandler, + int start, int end) throws SchemaChangeException { this.incoming = incoming; + this.stats = stats; + this.start = start; + this.end = end; doSetup(context, incoming, null); // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce the total amount of memory @@ -92,9 +107,15 @@ public abstract class PartitionerTemplate implements Partitioner { outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1)/2 - 1; } + int fieldId = 0; for (MinorFragmentEndpoint destination : popConfig.getDestinations()) { - outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, + // create outgoingBatches only for subset of Destination Points + if ( fieldId >= start && fieldId < end ) { + logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId); + outgoingBatches.add(new OutgoingRecordBatch(stats, 
sendingAccountor, popConfig, context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId(), statusHandler)); + } + fieldId++; } for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) { @@ -119,6 +140,11 @@ public abstract class PartitionerTemplate implements Partitioner { } } + @Override + public OperatorStats getStats() { + return stats; + } + /** * Flush each outgoing record batch, and optionally reset the state of each outgoing record * batch (on schema change). Note that the schema is updated based on incoming at the time @@ -150,24 +176,21 @@ public abstract class PartitionerTemplate implements Partitioner { switch(svMode) { case NONE: for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { - OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(recordId)); - outgoingBatch.copy(recordId); + doCopy(recordId); } break; case TWO_BYTE: for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { int svIndex = sv2.getIndex(recordId); - OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex)); - outgoingBatch.copy(svIndex); + doCopy(svIndex); } break; case FOUR_BYTE: for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { int svIndex = sv4.get(recordId); - OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex)); - outgoingBatch.copy(svIndex); + doCopy(svIndex); } break; @@ -176,6 +199,20 @@ public abstract class PartitionerTemplate implements Partitioner { } } + /** + * Helper method to copy data based on partition + * @param svIndex + * @param incoming + * @throws IOException + */ + private void doCopy(int svIndex) throws IOException { + int index = doEval(svIndex); + if ( index >= start && index < end) { + OutgoingRecordBatch outgoingBatch = outgoingBatches.get(index - start); + outgoingBatch.copy(svIndex); + } + } + @Override public void clear() { for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) { @@ -372,5 +409,6 @@ public abstract class PartitionerTemplate implements Partitioner { public void clear(){ vectorContainer.clear(); } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index 9b0944e..edec7e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -49,6 +49,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate PhysicalOperator child = exchange.getChild().accept(this, iNode); PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child); materializedSender.setOperatorId(0); + materializedSender.setCost(exchange.getCost()); // logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child); return materializedSender; @@ -57,6 +58,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId()); materializedReceiver.setOperatorId(Short.MAX_VALUE & exchange.getOperatorId()); // logger.debug("Visit receiving exchange, materialized 
receiver: {}.", materializedReceiver); + materializedReceiver.setCost(exchange.getCost()); return materializedReceiver; } } @@ -100,6 +102,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate children.add(child.accept(this, iNode)); } PhysicalOperator newOp = op.getNewWithChildren(children); + newOp.setCost(op.getCost()); newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); return newOp; } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index abbc910..f320157 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -49,6 +49,9 @@ public class PlannerSettings implements Context{ public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, Double.MAX_VALUE, 1.0d); public static final OptionValidator MUX_EXCHANGE = new BooleanValidator("planner.enable_mux_exchange", true); public static final OptionValidator DEMUX_EXCHANGE = new BooleanValidator("planner.enable_demux_exchange", false); + public static final OptionValidator PARTITION_SENDER_THREADS_FACTOR = new LongValidator("planner.partitioner_sender_threads_factor", 1); + public static final OptionValidator PARTITION_SENDER_MAX_THREADS = new LongValidator("planner.partitioner_sender_max_threads", 8); + public static final OptionValidator PARTITION_SENDER_SET_THREADS = new LongValidator("planner.partitioner_sender_set_threads", -1); public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false); public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10); public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true); http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index b0b478e..1b35aec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.server; import io.netty.channel.EventLoopGroup; import java.util.Collection; +import java.util.concurrent.ExecutorService; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.compile.CodeCompiler; @@ -57,8 +58,10 @@ public class DrillbitContext { private final SystemOptionManager systemOptions; private final PStoreProvider provider; private final CodeCompiler compiler; + private final ExecutorService executor; - public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, 
PStoreProvider provider) { + public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider, + ExecutorService executor) { super(); Preconditions.checkNotNull(endpoint); Preconditions.checkNotNull(context); @@ -71,6 +74,7 @@ public class DrillbitContext { this.connectionsPool = connectionsPool; this.endpoint = endpoint; this.provider = provider; + this.executor = executor; this.storagePlugins = new StoragePluginRegistry(this); this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storagePlugins); this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig()); @@ -154,4 +158,8 @@ public class DrillbitContext { public CodeCompiler getCompiler() { return compiler; } + + public ExecutorService getExecutor() { + return executor; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index c5d4656..fa4725d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -59,6 +59,9 @@ public class SystemOptionManager implements OptionManager { PlannerSettings.IDENTIFIER_MAX_LENGTH, PlannerSettings.HASH_JOIN_SWAP, PlannerSettings.HASH_JOIN_SWAP_MARGIN_FACTOR, + PlannerSettings.PARTITION_SENDER_THREADS_FACTOR, + PlannerSettings.PARTITION_SENDER_MAX_THREADS, + PlannerSettings.PARTITION_SENDER_SET_THREADS, ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 99c6ab8..125c56d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -101,9 +101,9 @@ public class WorkManager implements Closeable { public void start(DrillbitEndpoint endpoint, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider) { - this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider); - // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS) this.executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-")); + this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, executor); + // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS) this.controller = controller; this.eventThread.start(); this.statusThread.start(); @@ -130,6 +130,10 @@ public class WorkManager implements Closeable { } } + public ExecutorService getExecutor() { + return 
executor; + } + public WorkEventBus getWorkBus() { return workBus; } http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java index 4aaaa78..2a0aedc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java @@ -104,7 +104,7 @@ public class TestOptiqPlans extends ExecTest { } }; RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet(); - DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, workBus, new LocalPStoreProvider(DrillConfig.create())); + DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, workBus, new LocalPStoreProvider(DrillConfig.create()), null); QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), QueryId.getDefaultInstance(), bitContext); PhysicalPlanReader reader = bitContext.getPlanReader(); LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8)); http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java new file mode 100644 index 0000000..bdb020b --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java @@ -0,0 +1,392 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.partitionsender; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.List; +import java.util.Random; + +import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.expr.fn.impl.DateUtility; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.QueryDateTimeInfo; +import org.apache.drill.exec.physical.MinorFragmentEndpoint; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalOperatorUtil; +import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.config.HashToRandomExchange; +import org.apache.drill.exec.physical.impl.TopN.TopNBatch; +import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric; +import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator.GeneralExecuteIface; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.PlanningSet; +import org.apache.drill.exec.planner.fragment.SimpleParallelizer; +import org.apache.drill.exec.pop.PopUnitTestBase; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.UserBitShared.MetricValue; +import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.rpc.user.UserSession; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionList; +import org.apache.drill.exec.server.options.OptionValue; +import org.apache.drill.exec.server.options.OptionValue.OptionType; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import com.google.common.collect.Lists; + +/** + * PartitionerSenderRootExec test to cover mostly part that deals with multithreaded + * ability to copy and flush data + * + */ +public class TestPartitionSender extends PlanTestBase { + + private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer( + 1 /*parallelizationThreshold (slice_count)*/, + 6 /*maxWidthPerNode*/, + 1000 /*maxGlobalWidth*/, + 1.2 /*affinityFactor*/); + + private final static UserSession USER_SESSION = UserSession.Builder.newBuilder() + .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()) + .build(); + + + public static TemporaryFolder testTempFolder = new TemporaryFolder(); + + private final static int NUM_DEPTS = 40; + private final static int NUM_EMPLOYEES = 1000; + private final static int DRILLBITS_COUNT = 3; + + private static String empTableLocation; + + private static String 
groupByQuery; + + @BeforeClass + public static void setupTempFolder() throws IOException { + testTempFolder.create(); + } + + @BeforeClass + public static void generateTestDataAndQueries() throws Exception { + // Table consists of two columns "emp_id", "emp_name" and "dept_id" + empTableLocation = testTempFolder.newFolder().getAbsolutePath(); + + // Write 100 records for each new file + final int empNumRecsPerFile = 100; + for(int fileIndex=0; fileIndex<NUM_EMPLOYEES/empNumRecsPerFile; fileIndex++) { + File file = new File(empTableLocation + File.separator + fileIndex + ".json"); + PrintWriter printWriter = new PrintWriter(file); + for (int recordIndex = fileIndex*empNumRecsPerFile; recordIndex < (fileIndex+1)*empNumRecsPerFile; recordIndex++) { + String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d }", + recordIndex, recordIndex, recordIndex % NUM_DEPTS); + printWriter.println(record); + } + printWriter.close(); + } + + // Initialize test queries + groupByQuery = String.format("SELECT dept_id, count(*) as numEmployees FROM dfs.`%s` GROUP BY dept_id", empTableLocation); + } + + @AfterClass + public static void cleanupTempFolder() throws IOException { + testTempFolder.delete(); + } + + @Test + /** + * Main test to go over different scenarios + * @throws Exception + */ + public void testPartitionSenderCostToThreads() throws Exception { + + final VectorContainer container = new VectorContainer(); + container.buildSchema(SelectionVectorMode.FOUR_BYTE); + final SelectionVector4 sv = Mockito.mock(SelectionVector4.class, "SelectionVector4"); + Mockito.when(sv.getCount()).thenReturn(100); + Mockito.when(sv.getTotalCount()).thenReturn(100); + for (int i = 0; i < 100; i++ ) { + Mockito.when(sv.get(i)).thenReturn(i); + } + + final TopNBatch.SimpleRecordBatch incoming = new TopNBatch.SimpleRecordBatch(container, sv, null); + + setDrillbitCount(DRILLBITS_COUNT); + + test("ALTER SESSION SET `planner.slice_target`=1"); + String plan = getPlanInString("EXPLAIN PLAN FOR " + groupByQuery, JSON_FORMAT); + System.out.println("Plan: " + plan); + + final DrillbitContext drillbitContext = getDrillbitContext(); + final PhysicalPlanReader planReader = drillbitContext.getPlanReader(); + final PhysicalPlan physicalPlan = planReader.readPhysicalPlan(plan); + final Fragment rootFragment = PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan); + final PlanningSet planningSet = new PlanningSet(); + final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(config); + + // Create a planningSet to get the assignment of major fragment ids to fragments. 
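// Editor's note -- worked example of the thread-count formula used by
// PartitionSenderRootExec (assuming outGoingBatchCount == 3, i.e. one
// receiver per drillbit in this 3-node test, which is consistent with the
// assertions below):
//   tmpParts = round(cost / sliceTarget / outGoingBatchCount / threadFactor)
// capped by planner.partitioner_sender_max_threads. For the last scenario:
//   round(14000 / 1000 / 3 / 2) = round(2.33) = 2 sending threads,
// the asserted expectedThreadsCount. For the max_threads scenario:
//   round(1000 / 1 / 3 / 4) = 83, capped to the configured maximum of 10.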
+ PARALLELIZER.initFragmentWrappers(rootFragment, planningSet); + + final List<PhysicalOperator> operators = physicalPlan.getSortedOperators(false); + + // get HashToRandomExchange physical operator + HashToRandomExchange hashToRandomExchange = null; + for ( PhysicalOperator operator : operators) { + if ( operator instanceof HashToRandomExchange) { + hashToRandomExchange = (HashToRandomExchange) operator; + break; + } + } + + final OptionList options = new OptionList(); + // try multiple scenarios with different set of options + options.add(OptionValue.createLong(OptionType.SESSION, "planner.slice_target", 1)); + testThreadsHelper(hashToRandomExchange, drillbitContext, options, + incoming, registry, planReader, planningSet, rootFragment, 2); + + options.clear(); + options.add(OptionValue.createLong(OptionType.SESSION, "planner.slice_target", 1)); + options.add(OptionValue.createLong(OptionType.SESSION, "planner.partitioner_sender_max_threads", 10)); + hashToRandomExchange.setCost(1000); + testThreadsHelper(hashToRandomExchange, drillbitContext, options, + incoming, registry, planReader, planningSet, rootFragment, 10); + + options.clear(); + options.add(OptionValue.createLong(OptionType.SESSION, "planner.slice_target", 1000)); + options.add(OptionValue.createLong(OptionType.SESSION, "planner.partitioner_sender_threads_factor",2)); + hashToRandomExchange.setCost(14000); + testThreadsHelper(hashToRandomExchange, drillbitContext, options, + incoming, registry, planReader, planningSet, rootFragment, 2); + } + + /** + * Core of the testing + * @param hashToRandomExchange + * @param drillbitContext + * @param options + * @param incoming + * @param registry + * @param planReader + * @param planningSet + * @param rootFragment + * @param expectedThreadsCount + * @throws Exception + */ + private void testThreadsHelper(HashToRandomExchange hashToRandomExchange, DrillbitContext drillbitContext, OptionList options, + RecordBatch incoming, FunctionImplementationRegistry registry, PhysicalPlanReader planReader, PlanningSet planningSet, Fragment rootFragment, + int expectedThreadsCount) throws Exception { + + long queryStartTime = System.currentTimeMillis(); + int timeZone = DateUtility.getIndex(System.getProperty("user.timezone")); + QueryDateTimeInfo queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone); + + final QueryWorkUnit qwu = PARALLELIZER.getFragments(options, drillbitContext.getEndpoint(), + QueryId.getDefaultInstance(), + drillbitContext.getBits(), planReader, rootFragment, USER_SESSION, queryDateTimeInfo); + + final List<MinorFragmentEndpoint> mfEndPoints = PhysicalOperatorUtil.getIndexOrderedEndpoints(Lists.newArrayList(drillbitContext.getBits())); + + for(PlanFragment planFragment : qwu.getFragments()) { + if (!planFragment.getFragmentJson().contains("hash-partition-sender")) { + continue; + } + MockPartitionSenderRootExec partionSenderRootExec = null; + FragmentContext context = null; + try { + context = new FragmentContext(drillbitContext, planFragment, null, registry); + final int majorFragmentId = planFragment.getHandle().getMajorFragmentId(); + final HashPartitionSender partSender = new HashPartitionSender(majorFragmentId, hashToRandomExchange, hashToRandomExchange.getExpression(), mfEndPoints); + partionSenderRootExec = new MockPartitionSenderRootExec(context, incoming, partSender); + assertEquals("Number of threads calculated", expectedThreadsCount, partionSenderRootExec.getNumberPartitions()); + + partionSenderRootExec.createPartitioner(); + final PartitionerDecorator 
partDecor = partionSenderRootExec.getPartitioner(); + assertNotNull(partDecor); + + List<Partitioner> partitioners = partDecor.getPartitioners(); + assertNotNull(partitioners); + final int actualThreads = DRILLBITS_COUNT > expectedThreadsCount ? expectedThreadsCount : DRILLBITS_COUNT; + assertEquals("Number of partitioners", actualThreads, partitioners.size()); + + for ( int i = 0; i < mfEndPoints.size(); i++) { + assertNotNull("PartitionOutgoingBatch", partDecor.getOutgoingBatches(i)); + } + + // check distribution of PartitionOutgoingBatch - should be even distribution + boolean isFirst = true; + int prevBatchCountSize = 0; + int batchCountSize = 0; + for (Partitioner part : partitioners ) { + final List<PartitionOutgoingBatch> outBatch = (List<PartitionOutgoingBatch>) part.getOutgoingBatches(); + batchCountSize = outBatch.size(); + if ( !isFirst ) { + assertTrue(Math.abs(batchCountSize - prevBatchCountSize) <= 1); + } else { + isFirst = false; + } + prevBatchCountSize = batchCountSize; + } + + partionSenderRootExec.getStats().startProcessing(); + try { + partDecor.partitionBatch(incoming); + } finally { + partionSenderRootExec.getStats().stopProcessing(); + } + if ( actualThreads == 1 ) { + assertEquals("With single thread parent and child waitNanos should match", partitioners.get(0).getStats().getWaitNanos(), partionSenderRootExec.getStats().getWaitNanos()); + } + + // testing values distribution + partitioners = partDecor.getPartitioners(); + isFirst = true; + // since we have fake Nullvector distribution is skewed + for (Partitioner part : partitioners ) { + final List<PartitionOutgoingBatch> outBatches = (List<PartitionOutgoingBatch>) part.getOutgoingBatches(); + for (PartitionOutgoingBatch partOutBatch : outBatches ) { + final int recordCount = ((VectorAccessible) partOutBatch).getRecordCount(); + if ( isFirst ) { + assertEquals("RecordCount", 100, recordCount); + isFirst = false; + } else { + assertEquals("RecordCount", 0, recordCount); + } + } + } + // test exceptions within threads + // test stats merging + partionSenderRootExec.getStats().startProcessing(); + try { + partDecor.executeMethodLogic(new InjectExceptionTest()); + fail("Should throw IOException here"); + } catch (IOException ioe) { + final OperatorProfile.Builder oPBuilder = OperatorProfile.newBuilder(); + partionSenderRootExec.getStats().addAllMetrics(oPBuilder); + final List<MetricValue> metrics = oPBuilder.getMetricList(); + for ( MetricValue metric : metrics) { + if ( Metric.BYTES_SENT.metricId() == metric.getMetricId() ) { + assertEquals("Should add metricValue irrespective of exception", 5*actualThreads, metric.getLongValue()); + } + if (Metric.SENDING_THREADS_COUNT.metricId() == metric.getMetricId()) { + assertEquals(actualThreads, metric.getLongValue()); + } + } + assertEquals(actualThreads-1, ioe.getSuppressed().length); + } finally { + partionSenderRootExec.getStats().stopProcessing(); + } + } finally { + // cleanup + partionSenderRootExec.close(); + context.close(); + } + } + } + + @Test + /** + * Testing partitioners distribution algorithm + * @throws Exception + */ + public void testAlgorithm() throws Exception { + int outGoingBatchCount; + int numberPartitions; + int k = 0; + final Random rand = new Random(); + while ( k < 1000 ) { + outGoingBatchCount = rand.nextInt(1000)+1; + numberPartitions = rand.nextInt(32)+1; + final int actualPartitions = outGoingBatchCount > numberPartitions ? 
numberPartitions : outGoingBatchCount; + final int divisor = Math.max(1, outGoingBatchCount/actualPartitions); + + final int longTail = outGoingBatchCount % actualPartitions; + int startIndex = 0; + int endIndex = 0; + for (int i = 0; i < actualPartitions; i++) { + startIndex = endIndex; + endIndex = startIndex + divisor; + if ( i < longTail ) { + endIndex++; + } + } + assertTrue("endIndex can not be > outGoingBatchCount", endIndex == outGoingBatchCount ); + k++; + } + } + + /** + * Helper class to expose some functionality of PartitionSenderRootExec + * + */ + private static class MockPartitionSenderRootExec extends PartitionSenderRootExec { + + public MockPartitionSenderRootExec(FragmentContext context, + RecordBatch incoming, HashPartitionSender operator) + throws OutOfMemoryException { + super(context, incoming, operator); + } + + public void close() { + oContext.close(); + } + + public int getNumberPartitions() { + return numberPartitions; + } + + public OperatorStats getStats() { + return this.stats; + } + } + + /** + * Helper class to inject exceptions in the threads + * + */ + private static class InjectExceptionTest implements GeneralExecuteIface { + + @Override + public void execute(Partitioner partitioner) throws IOException { + // throws IOException + partitioner.getStats().addLongStat(Metric.BYTES_SENT, 5); + throw new IOException("Test exception handling"); + } + } +}
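
[Editor's illustration] As a closing note on the even-distribution property that testAlgorithm() asserts, here is a standalone sketch of the divisor/longTail split used by createPartitioner() above (a plain-Java rewrite for illustration; it does not call Drill classes, and the receiver/thread counts are made-up inputs):

public class RangeSplitDemo {
  public static void main(String[] args) {
    final int outGoingBatchCount = 10;  // receivers (minor fragments)
    final int actualPartitions = 4;     // sending threads
    // Same arithmetic as createPartitioner(): base chunk size plus one extra
    // receiver for each of the first (outGoingBatchCount % actualPartitions)
    // partitioners; the last partitioner absorbs whatever remains.
    final int divisor = Math.max(1, outGoingBatchCount / actualPartitions);
    final int longTail = outGoingBatchCount % actualPartitions;
    int start = 0;
    int end = 0;
    for (int i = 0; i < actualPartitions; i++) {
      start = end;
      end = (i < actualPartitions - 1) ? start + divisor : outGoingBatchCount;
      if (i < longTail) {
        end++;
      }
      System.out.println("partitioner " + i + " -> receivers [" + start + ", " + end + ")");
    }
  }
}

With 10 receivers over 4 threads this prints the ranges [0, 3), [3, 6), [6, 8), [8, 10): every receiver is covered exactly once and the range sizes differ by at most one, which is the invariant the test checks against each Partitioner's outgoing batch count.
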