Repository: drill
Updated Branches:
  refs/heads/master 54df129ca -> 49d316a1c


DRILL-2210 Introducing multithreading capability to PartitionerSender


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/49d316a1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/49d316a1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/49d316a1

Branch: refs/heads/master
Commit: 49d316a1cb22f79061e246b5e197547dac730232
Parents: 54df129
Author: Yuliya Feldman <yfeld...@maprtech.com>
Authored: Tue Feb 17 00:09:07 2015 -0800
Committer: vkorukanti <venki.koruka...@gmail.com>
Committed: Wed Mar 18 17:40:37 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/compile/CodeCompiler.java |  13 +-
 .../apache/drill/exec/ops/FragmentContext.java  |   8 +
 .../apache/drill/exec/ops/OperatorStats.java    |  81 ++++
 .../exec/physical/config/IteratorValidator.java |   2 +-
 .../exec/physical/impl/SendingAccountor.java    |   9 +-
 .../PartitionSenderRootExec.java                |  91 ++++-
 .../impl/partitionsender/Partitioner.java       |  11 +-
 .../partitionsender/PartitionerDecorator.java   | 282 +++++++++++++
 .../partitionsender/PartitionerTemplate.java    |  54 ++-
 .../exec/planner/fragment/Materializer.java     |   3 +
 .../exec/planner/physical/PlannerSettings.java  |   3 +
 .../drill/exec/server/DrillbitContext.java      |  10 +-
 .../server/options/SystemOptionManager.java     |   3 +
 .../org/apache/drill/exec/work/WorkManager.java |   8 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   2 +-
 .../partitionsender/TestPartitionSender.java    | 392 +++++++++++++++++++
 16 files changed, 938 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
index 57a6660..f0147ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.compile;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -32,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
 
 public class CodeCompiler {
//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodeCompiler.class);
@@ -53,10 +55,19 @@ public class CodeCompiler {
 
   @SuppressWarnings("unchecked")
  public <T> T getImplementationClass(final CodeGenerator<?> cg) throws ClassTransformationException, IOException {
+    return (T) getImplementationClass(cg, 1).get(0);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> List<T> getImplementationClass(final CodeGenerator<?> cg, int instanceNumber) throws ClassTransformationException, IOException {
     cg.generate();
     try {
       final GeneratedClassEntry ce = cache.get(cg);
-      return (T) ce.clazz.newInstance();
+      List<T> tList = Lists.newArrayList();
+      for ( int i = 0; i < instanceNumber; i++) {
+        tList.add((T) ce.clazz.newInstance());
+      }
+      return tList;
    } catch (ExecutionException | InstantiationException | IllegalAccessException e) {
       throw new ClassTransformationException(e);
     }
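
The new overload compiles the generated code once and then instantiates the cached class N times, so each sending thread can own an independent Partitioner. A minimal caller-side sketch (variable names hypothetical):

    // one compile, N independent instances sharing the cached class
    List<Partitioner> parts = compiler.getImplementationClass(partitionerCg, 4);
    Partitioner onlyOne = compiler.getImplementationClass(partitionerCg); // original single-instance path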

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 9fc9ad1..2a6660e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -222,6 +222,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return context.getCompiler().getImplementationClass(cg);
   }
 
+  public <T> List<T> getImplementationClass(ClassGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+    return getImplementationClass(cg.getCodeGenerator(), instanceCount);
+  }
+
+  public <T> List<T> getImplementationClass(CodeGenerator<T> cg, int instanceCount) throws ClassTransformationException, IOException {
+    return context.getCompiler().getImplementationClass(cg, instanceCount);
+  }
+
   /**
   * Get the user connection associated with this fragment.  This return null unless this is a root fragment.
    * @return The RPC connection to the query submitter.

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 0e9da0e..3f671e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.ops;
 
+import java.util.Iterator;
+
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.MetricValue;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
@@ -24,6 +26,8 @@ import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
 
 import com.carrotsearch.hppc.IntDoubleOpenHashMap;
 import com.carrotsearch.hppc.IntLongOpenHashMap;
+import com.carrotsearch.hppc.cursors.IntDoubleCursor;
+import com.carrotsearch.hppc.cursors.IntLongCursor;
 
 public class OperatorStats {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class);
@@ -53,16 +57,39 @@ public class OperatorStats {
   private long waitMark;
 
   private long schemas;
+  private int inputCount;
 
   public OperatorStats(OpProfileDef def, BufferAllocator allocator){
    this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator);
   }
 
+  /**
+   * Copy constructor to create a copy of an existing stats object shell and use it independently.
+   * This is useful if stats have to be updated in different threads, since it is not really
+   * possible to update stats such as waitNanos, setupNanos and processingNanos across threads.
+   * @param original - OperatorStats object to create a copy from
+   * @param isClean - flag to indicate whether to start with clean state indicators or inherit those from the original object
+   */
+  public OperatorStats(OperatorStats original, boolean isClean) {
+    this(original.operatorId, original.operatorType, original.inputCount, original.allocator);
+
+    if ( !isClean ) {
+      inProcessing = original.inProcessing;
+      inSetup = original.inSetup;
+      inWait = original.inWait;
+
+      processingMark = original.processingMark;
+      setupMark = original.setupMark;
+      waitMark = original.waitMark;
+    }
+  }
+
  private OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) {
     super();
     this.allocator = allocator;
     this.operatorId = operatorId;
     this.operatorType = operatorType;
+    this.inputCount = inputCount;
     this.recordsReceivedByInput = new long[inputCount];
     this.batchesReceivedByInput = new long[inputCount];
     this.schemaCountByInput = new long[inputCount];
@@ -71,6 +98,44 @@ public class OperatorStats {
   private String assertionError(String msg){
    return String.format("Failure while %s for operator id %d. Currently have states of processing:%s, setup:%s, waiting:%s.", msg, operatorId, inProcessing, inSetup, inWait);
   }
+  /**
+   * OperatorStats merger - merges stats from another OperatorStats.
+   * This is needed when some processing is multithreaded and each thread needs
+   * a separate OperatorStats to deal with.
+   * WARN - this will only work for metrics that can be added.
+   * @param from - OperatorStats to merge into "this"
+   * @return OperatorStats - for convenience, so one can merge multiple stats in one go
+   */
+  public OperatorStats mergeMetrics(OperatorStats from) {
+    final IntLongOpenHashMap fromMetrics = from.longMetrics;
+
+    final Iterator<IntLongCursor> iter = fromMetrics.iterator();
+    while (iter.hasNext()) {
+      final IntLongCursor next = iter.next();
+      longMetrics.putOrAdd(next.key, next.value, next.value);
+    }
+
+    final IntDoubleOpenHashMap fromDMetrics = from.doubleMetrics;
+    final Iterator<IntDoubleCursor> iterD = fromDMetrics.iterator();
+
+    while (iterD.hasNext()) {
+      final IntDoubleCursor next = iterD.next();
+      doubleMetrics.putOrAdd(next.key, next.value, next.value);
+    }
+    return this;
+  }
+
+  /**
+   * Clear stats
+   */
+  public void clear() {
+    processingNanos = 0l;
+    setupNanos = 0l;
+    waitNanos = 0l;
+    longMetrics.clear();
+    doubleMetrics.clear();
+  }
+
   public void startSetup() {
     assert !inSetup  : assertionError("starting setup");
     stopProcessing();
@@ -183,4 +248,20 @@ public class OperatorStats {
     doubleMetrics.put(metric.metricId(), value);
   }
 
+  public long getWaitNanos() {
+    return waitNanos;
+  }
+
+  /**
+   * Adjust waitNanos based on client calculations
+   * @param waitNanosOffset - could be negative as well as positive
+   */
+  public void adjustWaitNanos(long waitNanosOffset) {
+    this.waitNanos += waitNanosOffset;
+  }
+
+  public long getProcessingNanos() {
+    return processingNanos;
+  }
+
 }
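
Taken together, the copy constructor, mergeMetrics() and adjustWaitNanos() support a per-thread stats pattern: hand each thread a clean copy, then fold the results back into the parent. A hedged sketch of that pattern (parentStats and the work itself are assumed context):

    // per-thread copy that starts with clean state indicators
    OperatorStats threadStats = new OperatorStats(parentStats, true);
    threadStats.startProcessing();
    // ... this thread's share of the partitioning work ...
    threadStats.stopProcessing();
    parentStats.mergeMetrics(threadStats);  // additive metrics only (see WARN above)
    // afterwards the caller corrects the parent's wait time; the offset may be negative, e.g.
    parentStats.adjustWaitNanos(-threadStats.getProcessingNanos());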

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
index 64cf7c5..b8ecae4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
@@ -26,7 +26,7 @@ public class IteratorValidator extends AbstractSingle{
 
   public IteratorValidator(PhysicalOperator child) {
     super(child);
-
+    setCost(child.getCost());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
index 7af7b65..3920f9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl;
 
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
 * Account for whether all messages sent have been completed. Necessary before finishing a task so we don't think
@@ -28,11 +29,11 @@ import java.util.concurrent.Semaphore;
 public class SendingAccountor {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
 
-  private int batchesSent = 0;
+  private final AtomicInteger batchesSent = new AtomicInteger(0);
   private Semaphore wait = new Semaphore(0);
 
   public void increment() {
-    batchesSent++;
+    batchesSent.incrementAndGet();
   }
 
   public void decrement() {
@@ -41,8 +42,8 @@ public class SendingAccountor {
 
   public synchronized void waitForSendComplete() {
     try {
-      wait.acquire(batchesSent);
-      batchesSent = 0;
+      wait.acquire(batchesSent.get());
+      batchesSent.set(0);
     } catch (InterruptedException e) {
       logger.warn("Failure while waiting for send complete.", e);
     }
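
With several partitioner threads now calling increment() concurrently, the plain int counter could lose updates; AtomicInteger makes the increment safe without adding a lock. The contract, sketched (illustrative only):

    SendingAccountor acct = new SendingAccountor();
    acct.increment();           // a sender thread registers one in-flight batch
    // ... each RPC completion calls acct.decrement(), releasing the semaphore ...
    acct.waitForSendComplete(); // blocks until every registered batch is accounted for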

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index abf9cbc..6a73cdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicIntegerArray;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -34,10 +35,12 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -46,8 +49,10 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.vector.CopyUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JType;
@@ -57,13 +62,15 @@ public class PartitionSenderRootExec extends BaseRootExec {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
   private RecordBatch incoming;
   private HashPartitionSender operator;
-  private Partitioner partitioner;
+  private PartitionerDecorator partitioner;
+
   private FragmentContext context;
   private boolean ok = true;
   private final SendingAccountor sendCount = new SendingAccountor();
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final StatusHandler statusHandler;
+  private final double cost;
 
   private final AtomicIntegerArray remainingReceivers;
   private final AtomicInteger remaingReceiverCount;
@@ -72,6 +79,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   long minReceiverRecordCount = Long.MAX_VALUE;
   long maxReceiverRecordCount = Long.MIN_VALUE;
+  protected final int numberPartitions;
+  protected final int actualPartitions;
 
   public enum Metric implements MetricDef {
     BATCHES_SENT,
@@ -79,7 +88,9 @@ public class PartitionSenderRootExec extends BaseRootExec {
     MIN_RECORDS,
     MAX_RECORDS,
     N_RECEIVERS,
-    BYTES_SENT;
+    BYTES_SENT,
+    SENDING_THREADS_COUNT,
+    COST;
 
     @Override
     public int metricId() {
@@ -99,8 +110,32 @@ public class PartitionSenderRootExec extends BaseRootExec {
     this.statusHandler = new StatusHandler(sendCount, context);
     this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
     this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
-
     stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount);
+    // Algorithm to figure out the number of threads to parallelize output:
+    // numberOfRows / sliceTarget / numReceivers / threadFactor
+    // threadFactor defaults to 1 (planner.partitioner_sender_threads_factor);
+    // one more param puts a limit on the max number of threads: default 8 (planner.partitioner_sender_max_threads)
+    this.cost = operator.getChild().getCost();
+    final OptionManager optMgr = context.getOptions();
+    long sliceTarget = optMgr.getOption(ExecConstants.SLICE_TARGET).num_val;
+    int threadFactor = optMgr.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue();
+    int tmpParts = 1;
+    if ( sliceTarget != 0 && outGoingBatchCount != 0 ) {
+      tmpParts = (int) Math.round((((cost / (sliceTarget*1.0)) / (outGoingBatchCount*1.0)) / (threadFactor*1.0)));
+      if ( tmpParts < 1) {
+        tmpParts = 1;
+      }
+    }
+    final int imposedThreads = optMgr.getOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName()).num_val.intValue();
+    if (imposedThreads > 0 ) {
+      this.numberPartitions = imposedThreads;
+    } else {
+      this.numberPartitions = Math.min(tmpParts, optMgr.getOption(PlannerSettings.PARTITION_SENDER_MAX_THREADS.getOptionName()).num_val.intValue());
+    }
+    logger.info("Preliminary number of sending threads is: " + numberPartitions);
+    this.actualPartitions = outGoingBatchCount > numberPartitions ? numberPartitions : outGoingBatchCount;
+    this.stats.setLongStat(Metric.SENDING_THREADS_COUNT, actualPartitions);
+    this.stats.setDoubleStat(Metric.COST, this.cost);
   }
 
   @Override
@@ -189,8 +224,28 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
   }
 
-  private void createPartitioner() throws SchemaChangeException {
+  @VisibleForTesting
+  protected void createPartitioner() throws SchemaChangeException {
+    final int divisor = Math.max(1, outGoingBatchCount/actualPartitions);
+    final int longTail = outGoingBatchCount % actualPartitions;
+
+    final List<Partitioner> subPartitioners = createClassInstances(actualPartitions);
+    int startIndex = 0;
+    int endIndex = 0;
+    for (int i = 0; i < actualPartitions; i++) {
+      startIndex = endIndex;
+      endIndex = (i < actualPartitions - 1 ) ? startIndex + divisor : outGoingBatchCount;
+      if ( i < longTail ) {
+        endIndex++;
+      }
+      final OperatorStats partitionStats = new OperatorStats(stats, true);
+      subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, sendCount, oContext, statusHandler,
+        startIndex, endIndex);
+    }
+    partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+  }
 
+  private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {
     // set up partitioning function
     final LogicalExpression expr = operator.getExpr();
     final ErrorCollector collector = new ErrorCollectorImpl();
@@ -218,8 +273,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
     try {
       // compile and setup generated code
-      partitioner = context.getImplementationClass(cg);
-      partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext, statusHandler);
+      List<Partitioner> subPartitioners = context.getImplementationClass(cg, actualPartitions);
+      return subPartitioners;
 
     } catch (ClassTransformationException | IOException e) {
      throw new SchemaChangeException("Failure while attempting to load generated class", e);
@@ -228,13 +283,14 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   /**
   * Find min and max record count seen across the outgoing batches and put them in stats.
-   * @param outgoing
    */
-  private void updateAggregateStats(List<? extends PartitionOutgoingBatch> outgoing) {
-    for (PartitionOutgoingBatch o : outgoing) {
-      long totalRecords = o.getTotalRecords();
-      minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
-      maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
+  private void updateAggregateStats() {
+    for (Partitioner part : partitioner.getPartitioners() ) {
+      for (PartitionOutgoingBatch o : part.getOutgoingBatches()) {
+        long totalRecords = o.getTotalRecords();
+        minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
+        maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
+      }
     }
     stats.setLongStat(Metric.MIN_RECORDS, minReceiverRecordCount);
     stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount);
@@ -242,9 +298,9 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   @Override
   public void receivingFragmentFinished(FragmentHandle handle) {
-    int id = handle.getMinorFragmentId();
+    final int id = handle.getMinorFragmentId();
     if (remainingReceivers.compareAndSet(id, 0, 1)) {
-      partitioner.getOutgoingBatches().get(handle.getMinorFragmentId()).terminate();
+      partitioner.getOutgoingBatches(id).terminate();
       int remaining = remaingReceiverCount.decrementAndGet();
       if (remaining == 0) {
         done = true;
@@ -256,7 +312,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
     logger.debug("Partition sender stopping.");
     ok = false;
     if (partitioner != null) {
-      updateAggregateStats(partitioner.getOutgoingBatches());
+      updateAggregateStats();
       partitioner.clear();
     }
     sendCount.waitForSendComplete();
@@ -299,4 +355,9 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
     stats.addLongStat(Metric.BATCHES_SENT, 1);
   }
+
+  @VisibleForTesting
+  protected PartitionerDecorator getPartitioner() {
+    return partitioner;
+  }
 }
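
To make the sizing arithmetic concrete with illustrative numbers: with cost = 100,000 rows, sliceTarget = 10,000, outGoingBatchCount = 5 and threadFactor = 1, tmpParts = round(100000 / 10000 / 5 / 1) = 2; numberPartitions = min(2, max_threads) = 2 and actualPartitions = min(numberPartitions, outGoingBatchCount) = 2, so two sub-partitioners each take roughly half of the five receivers. Setting planner.partitioner_sender_set_threads to a positive value bypasses the computation entirely.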

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 5ed9c39..9d6e98f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -37,13 +37,22 @@ public interface Partitioner {
                           OperatorStats stats,
                           SendingAccountor sendingAccountor,
                           OperatorContext oContext,
-                          StatusHandler statusHandler) throws SchemaChangeException;
+                          StatusHandler statusHandler,
+                          int start, int count) throws SchemaChangeException;
 
   public abstract void partitionBatch(RecordBatch incoming) throws IOException;
  public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;
   public abstract void initialize();
   public abstract void clear();
   public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches();
+  /**
+   * Method to get a PartitionOutgoingBatch, given that there can be more than one Partitioner.
+   * @param minorFragmentIndex
+   * @return PartitionOutgoingBatch that matches the index within this Partitioner. This method can
+   * return null if the index does not fall within the boundary of this Partitioner.
+   */
+  public abstract PartitionOutgoingBatch getOutgoingBatch(int index);
+  public abstract OperatorStats getStats();
 
  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
new file mode 100644
index 0000000..c3261dc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+/**
+ * Decorator class to hide the existence of multiple Partitioners from the caller.
+ * Since this class involves multithreaded processing of incoming batches
+ * as well as flushing, it needs special handling of OperatorStats,
+ * because stats are not suitable for use in a multithreaded environment.
+ * The algorithm to figure out processing versus wait time is based on the following formula:
+ * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
+ */
+public class PartitionerDecorator {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
+
+  private List<Partitioner> partitioners;
+  private final OperatorStats stats;
+  private final String tName;
+  private final String childThreadPrefix;
+  private final ExecutorService executor;
+
+
+  public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
+    this.partitioners = partitioners;
+    this.stats = stats;
+    this.executor = context.getDrillbitContext().getExecutor();
+    this.tName = Thread.currentThread().getName();
+    this.childThreadPrefix = "Partitioner-" + tName + "-";
+  }
+
+  /**
+   * partitionBatch - decorator method to call the real Partitioner(s) to process an incoming batch.
+   * Uses either a threaded or non-threaded approach based on the number of Partitioners.
+   * @param incoming
+   * @throws IOException
+   */
+  public void partitionBatch(final RecordBatch incoming) throws IOException {
+    executeMethodLogic(new PartitionBatchHandlingClass(incoming));
+  }
+
+  /**
+   * flushOutgoingBatches - decorator to call real Partitioner(s) flushOutgoingBatches
+   * @param isLastBatch
+   * @param schemaChanged
+   * @throws IOException
+   */
+  public void flushOutgoingBatches(final boolean isLastBatch, final boolean schemaChanged) throws IOException {
+    executeMethodLogic(new FlushBatchesHandlingClass(isLastBatch, schemaChanged));
+  }
+
+  /**
+   * decorator method to call multiple Partitioners initialize()
+   */
+  public void initialize() {
+    for (Partitioner part : partitioners ) {
+      part.initialize();
+    }
+  }
+
+  /**
+   * decorator method to call multiple Partitioners clear()
+   */
+  public void clear() {
+    for (Partitioner part : partitioners ) {
+      part.clear();
+    }
+  }
+
+  /**
+   * Helper method to get a PartitionOutgoingBatch based on the index,
+   * since we may have more than one Partitioner.
+   * As the number of Partitioners should be very small AND this method is used
+   * very rarely, it is OK to loop in order to find the right partitioner.
+   * @param index - index of PartitionOutgoingBatch
+   * @return PartitionOutgoingBatch
+   */
+  public PartitionOutgoingBatch getOutgoingBatches(int index) {
+    for (Partitioner part : partitioners ) {
+      PartitionOutgoingBatch outBatch = part.getOutgoingBatch(index);
+      if ( outBatch != null ) {
+        return outBatch;
+      }
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  protected List<Partitioner> getPartitioners() {
+    return partitioners;
+  }
+
+  /**
+   * Helper to execute the different methods wrapped into same logic
+   * @param iface
+   * @throws IOException
+   */
+  protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOException {
+    if (partitioners.size() == 1 ) {
+      // no need for threads
+      final OperatorStats localStatsSingle = partitioners.get(0).getStats();
+      localStatsSingle.clear();
+      localStatsSingle.startProcessing();
+      try {
+        iface.execute(partitioners.get(0));
+      } finally {
+        localStatsSingle.stopProcessing();
+        stats.mergeMetrics(localStatsSingle);
+        // since main stats did not have any wait time - adjust based on partitioner stats wait time
+        // main stats processing time started recording in BaseRootExec
+        stats.adjustWaitNanos(localStatsSingle.getWaitNanos());
+      }
+      return;
+    }
+
+    long maxProcessTime = 0l;
+    // start waiting on main stats to adjust by sum(max(processing)) at the end
+    stats.startWait();
+    final CountDownLatch latch = new CountDownLatch(partitioners.size());
+    final List<CustomRunnable> runnables = Lists.newArrayList();
+    try {
+      int i = 0;
+      for (final Partitioner part : partitioners ) {
+        runnables.add(new CustomRunnable(childThreadPrefix, latch, iface, part));
+        executor.submit(runnables.get(i++));
+      }
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      IOException excep = null;
+      for (final CustomRunnable runnable : runnables ) {
+        IOException myException = runnable.getException();
+        if ( myException != null ) {
+          if ( excep == null ) {
+            excep = myException;
+          } else {
+            excep.addSuppressed(myException);
+          }
+        }
+        final OperatorStats localStats = runnable.getPart().getStats();
+        long currentProcessingNanos = localStats.getProcessingNanos();
+        // find out max Partitioner processing time
+        maxProcessTime = (currentProcessingNanos > maxProcessTime) ? currentProcessingNanos : maxProcessTime;
+        stats.mergeMetrics(localStats);
+      }
+      if ( excep != null ) {
+        throw excep;
+      }
+    } finally {
+      stats.stopWait();
+      // scale down main stats wait time based on calculated processing time
+      // since we did not wait for whole duration of above execution
+      stats.adjustWaitNanos(-maxProcessTime);
+    }
+
+  }
+
+  /**
+   * Helper interface to generalize the functionality executed in the thread,
+   * since it is absolutely the same for partitionBatch and flushOutgoingBatches;
+   * protected is for testing purposes
+   */
+  protected interface GeneralExecuteIface {
+    public void execute(Partitioner partitioner) throws IOException;
+  }
+
+  /**
+   * Class to handle running partitionBatch method
+   *
+   */
+  private static class PartitionBatchHandlingClass implements GeneralExecuteIface {
+
+    private final RecordBatch incoming;
+
+    public PartitionBatchHandlingClass(RecordBatch incoming) {
+      this.incoming = incoming;
+    }
+
+    @Override
+    public void execute(Partitioner part) throws IOException {
+      part.partitionBatch(incoming);
+    }
+  }
+
+  /**
+   * Class to handle running flushOutgoingBatches method
+   *
+   */
+  private static class FlushBatchesHandlingClass implements GeneralExecuteIface {
+
+    private final boolean isLastBatch;
+    private final boolean schemaChanged;
+
+    public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
+      this.isLastBatch = isLastBatch;
+      this.schemaChanged = schemaChanged;
+    }
+
+    @Override
+    public void execute(Partitioner part) throws IOException {
+      part.flushOutgoingBatches(isLastBatch, schemaChanged);
+    }
+  }
+
+  /**
+   * Helper class to wrap a Runnable with customized thread naming
+   * and exception handling
+   */
+  private static class CustomRunnable implements Runnable {
+
+    private final String parentThreadName;
+    private final CountDownLatch latch;
+    private final GeneralExecuteIface iface;
+    private final Partitioner part;
+    private volatile IOException exp;
+
+    public CustomRunnable(String parentThreadName, CountDownLatch latch, GeneralExecuteIface iface, Partitioner part) {
+      this.parentThreadName = parentThreadName;
+      this.latch = latch;
+      this.iface = iface;
+      this.part = part;
+    }
+
+    @Override
+    public void run() {
+      final Thread currThread = Thread.currentThread();
+      final String currThreadName = currThread.getName();
+      final OperatorStats localStats = part.getStats();
+      try {
+        final String newThreadName = parentThreadName + currThread.getId();
+        currThread.setName(newThreadName);
+        localStats.clear();
+        localStats.startProcessing();
+        iface.execute(part);
+      } catch (IOException e) {
+        exp = e;
+      } finally {
+        localStats.stopProcessing();
+        currThread.setName(currThreadName);
+        latch.countDown();
+      }
+    }
+
+    public IOException getException() {
+      return this.exp;
+    }
+
+    public Partitioner getPart() {
+      return part;
+    }
+  }
+ }
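
A worked example of the wait accounting above, with illustrative numbers: suppose three child partitioners report processing times of 40, 60 and 50 ms, and the parallel section takes 65 ms of wall-clock time. The decorator records the full 65 ms as wait on the main stats (startWait/stopWait around the latch), then adjustWaitNanos(-60 ms) subtracts the largest per-partitioner processing time, leaving roughly 5 ms of true wait, while mergeMetrics() folds each child's additive metrics into the parent.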

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 93d719c..33d6f95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -61,6 +61,9 @@ public abstract class PartitionerTemplate implements Partitioner {
   private SelectionVector2 sv2;
   private SelectionVector4 sv4;
   private RecordBatch incoming;
+  private OperatorStats stats;
+  private int start;
+  private int end;
   private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
 
   private int outgoingRecordBatchSize = DEFAULT_RECORD_BATCH_SIZE;
@@ -74,15 +77,27 @@ public abstract class PartitionerTemplate implements Partitioner {
   }
 
   @Override
+  public PartitionOutgoingBatch getOutgoingBatch(int index) {
+    if ( index >= start && index < end) {
+      return outgoingBatches.get(index - start);
+    }
+    return null;
+  }
+
+  @Override
   public final void setup(FragmentContext context,
                           RecordBatch incoming,
                           HashPartitionSender popConfig,
                           OperatorStats stats,
                           SendingAccountor sendingAccountor,
                           OperatorContext oContext,
-                          StatusHandler statusHandler) throws SchemaChangeException {
+                          StatusHandler statusHandler,
+                          int start, int end) throws SchemaChangeException {
 
     this.incoming = incoming;
+    this.stats = stats;
+    this.start = start;
+    this.end = end;
     doSetup(context, incoming, null);
 
    // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce the total amount of memory
@@ -92,9 +107,15 @@ public abstract class PartitionerTemplate implements Partitioner {
       outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1)/2 - 1;
     }
 
+    int fieldId = 0;
     for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
-      outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
+      // create outgoingBatches only for subset of Destination Points
+      if ( fieldId >= start && fieldId < end ) {
+        logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId);
+        outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
           context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId(), statusHandler));
+      }
+      fieldId++;
     }
 
     for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) {
@@ -119,6 +140,11 @@ public abstract class PartitionerTemplate implements Partitioner {
     }
   }
 
+  @Override
+  public OperatorStats getStats() {
+    return stats;
+  }
+
   /**
   * Flush each outgoing record batch, and optionally reset the state of each outgoing record
   * batch (on schema change).  Note that the schema is updated based on incoming at the time
@@ -150,24 +176,21 @@ public abstract class PartitionerTemplate implements Partitioner {
     switch(svMode) {
       case NONE:
        for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
-          OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(recordId));
-          outgoingBatch.copy(recordId);
+          doCopy(recordId);
         }
         break;
 
       case TWO_BYTE:
        for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
           int svIndex = sv2.getIndex(recordId);
-          OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
-          outgoingBatch.copy(svIndex);
+          doCopy(svIndex);
         }
         break;
 
       case FOUR_BYTE:
        for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
           int svIndex = sv4.get(recordId);
-          OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
-          outgoingBatch.copy(svIndex);
+          doCopy(svIndex);
         }
         break;
 
@@ -176,6 +199,20 @@ public abstract class PartitionerTemplate implements Partitioner {
     }
   }
 
+  /**
+   * Helper method to copy data based on partition
+   * @param svIndex
+   * @throws IOException
+   */
+  private void doCopy(int svIndex) throws IOException {
+    int index = doEval(svIndex);
+    if ( index >= start && index < end) {
+      OutgoingRecordBatch outgoingBatch = outgoingBatches.get(index - start);
+      outgoingBatch.copy(svIndex);
+    }
+  }
+
   @Override
   public void clear() {
     for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) {
@@ -372,5 +409,6 @@ public abstract class PartitionerTemplate implements Partitioner {
     public void clear(){
       vectorContainer.clear();
     }
+
   }
 }
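
Each generated Partitioner instance now owns a contiguous [start, end) slice of the destinations: setup() only materializes OutgoingRecordBatches for that slice, and doCopy() drops any record whose evaluated partition index falls outside it (another instance's thread copies that record instead). For example, with outGoingBatchCount = 10 and actualPartitions = 3, the ranges produced by createPartitioner() are [0, 4), [4, 7) and [7, 10): divisor = 10/3 = 3 and the longTail remainder of 1 goes to the first instance.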

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 9b0944e..edec7e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -49,6 +49,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
       PhysicalOperator child = exchange.getChild().accept(this, iNode);
      PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child);
       materializedSender.setOperatorId(0);
+      materializedSender.setCost(exchange.getCost());
 //      logger.debug("Visit sending exchange, materialized {} with child {}.", 
materializedSender, child);
       return materializedSender;
 
@@ -57,6 +58,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
      PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId());
      materializedReceiver.setOperatorId(Short.MAX_VALUE & exchange.getOperatorId());
 //      logger.debug("Visit receiving exchange, materialized receiver: {}.", 
materializedReceiver);
+      materializedReceiver.setCost(exchange.getCost());
       return materializedReceiver;
     }
   }
@@ -100,6 +102,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
       children.add(child.accept(this, iNode));
     }
     PhysicalOperator newOp = op.getNewWithChildren(children);
+    newOp.setCost(op.getCost());
     newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId());
     return newOp;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index abbc910..f320157 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -49,6 +49,9 @@ public class PlannerSettings implements Context{
   public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, Double.MAX_VALUE, 1.0d);
   public static final OptionValidator MUX_EXCHANGE = new BooleanValidator("planner.enable_mux_exchange", true);
   public static final OptionValidator DEMUX_EXCHANGE = new BooleanValidator("planner.enable_demux_exchange", false);
+  public static final OptionValidator PARTITION_SENDER_THREADS_FACTOR = new LongValidator("planner.partitioner_sender_threads_factor", 1);
+  public static final OptionValidator PARTITION_SENDER_MAX_THREADS = new LongValidator("planner.partitioner_sender_max_threads", 8);
+  public static final OptionValidator PARTITION_SENDER_SET_THREADS = new LongValidator("planner.partitioner_sender_set_threads", -1);
   public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false);
   public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10);
   public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true);

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index b0b478e..1b35aec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.server;
 import io.netty.channel.EventLoopGroup;
 
 import java.util.Collection;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.compile.CodeCompiler;
@@ -57,8 +58,10 @@ public class DrillbitContext {
   private final SystemOptionManager systemOptions;
   private final PStoreProvider provider;
   private final CodeCompiler compiler;
+  private final ExecutorService executor;
 
-  public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider) {
+  public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider,
+      ExecutorService executor) {
     super();
     Preconditions.checkNotNull(endpoint);
     Preconditions.checkNotNull(context);
@@ -71,6 +74,7 @@ public class DrillbitContext {
     this.connectionsPool = connectionsPool;
     this.endpoint = endpoint;
     this.provider = provider;
+    this.executor = executor;
     this.storagePlugins = new StoragePluginRegistry(this);
     this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storagePlugins);
     this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
@@ -154,4 +158,8 @@ public class DrillbitContext {
   public CodeCompiler getCompiler() {
     return compiler;
   }
+
+  public ExecutorService getExecutor() {
+    return executor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index c5d4656..fa4725d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -59,6 +59,9 @@ public class SystemOptionManager implements OptionManager {
       PlannerSettings.IDENTIFIER_MAX_LENGTH,
       PlannerSettings.HASH_JOIN_SWAP,
       PlannerSettings.HASH_JOIN_SWAP_MARGIN_FACTOR,
+      PlannerSettings.PARTITION_SENDER_THREADS_FACTOR,
+      PlannerSettings.PARTITION_SENDER_MAX_THREADS,
+      PlannerSettings.PARTITION_SENDER_SET_THREADS,
       ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 99c6ab8..125c56d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -101,9 +101,9 @@ public class WorkManager implements Closeable {
 
   public void start(DrillbitEndpoint endpoint, Controller controller,
       DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider) {
-    this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider);
-    // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
     this.executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
+    this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, executor);
+    // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
     this.controller = controller;
     this.eventThread.start();
     this.statusThread.start();
@@ -130,6 +130,10 @@ public class WorkManager implements Closeable {
     }
   }
 
+  public ExecutorService getExecutor() {
+    return executor;
+  }
+
   public WorkEventBus getWorkBus() {
     return workBus;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 4aaaa78..2a0aedc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -104,7 +104,7 @@ public class TestOptiqPlans extends ExecTest {
       }
     };
     RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
-    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, workBus, new LocalPStoreProvider(DrillConfig.create()));
+    DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, workBus, new LocalPStoreProvider(DrillConfig.create()), null);
     QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), QueryId.getDefaultInstance(), bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));

http://git-wip-us.apache.org/repos/asf/drill/blob/49d316a1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
new file mode 100644
index 0000000..bdb020b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.QueryDateTimeInfo;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
+import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator.GeneralExecuteIface;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.UserBitShared.MetricValue;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+
+/**
+ * PartitionSenderRootExec test to cover mostly the part that deals with the multithreaded
+ * ability to copy and flush data
+ */
+public class TestPartitionSender extends PlanTestBase {
+
+  private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(
+      1 /*parallelizationThreshold (slice_count)*/,
+      6 /*maxWidthPerNode*/,
+      1000 /*maxGlobalWidth*/,
+      1.2 /*affinityFactor*/);
+
+  private final static UserSession USER_SESSION = UserSession.Builder.newBuilder()
+      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+      .build();
+
+
+  public static TemporaryFolder testTempFolder = new TemporaryFolder();
+
+  private final static int NUM_DEPTS = 40;
+  private final static int NUM_EMPLOYEES = 1000;
+  private final static int DRILLBITS_COUNT = 3;
+
+  private static String empTableLocation;
+
+  private static String groupByQuery;
+
+  @BeforeClass
+  public static void setupTempFolder() throws IOException {
+    testTempFolder.create();
+  }
+
+  @BeforeClass
+  public static void generateTestDataAndQueries() throws Exception {
+    // Table consists of two columns "emp_id", "emp_name" and "dept_id"
+    empTableLocation = testTempFolder.newFolder().getAbsolutePath();
+
+    // Write 100 records for each new file
+    final int empNumRecsPerFile = 100;
+    for(int fileIndex=0; fileIndex<NUM_EMPLOYEES/empNumRecsPerFile; fileIndex++) {
+      File file = new File(empTableLocation + File.separator + fileIndex + ".json");
+      PrintWriter printWriter = new PrintWriter(file);
+      for (int recordIndex = fileIndex*empNumRecsPerFile; recordIndex < (fileIndex+1)*empNumRecsPerFile; recordIndex++) {
+        String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d }",
+            recordIndex, recordIndex, recordIndex % NUM_DEPTS);
+        printWriter.println(record);
+      }
+      printWriter.close();
+    }
+
+    // Initialize test queries
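+    // with 1000 employees spread across 40 departments, every dept_id groups exactly 25 employees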
+    groupByQuery = String.format("SELECT dept_id, count(*) as numEmployees FROM dfs.`%s` GROUP BY dept_id", empTableLocation);
+  }
+
+  @AfterClass
+  public static void cleanupTempFolder() throws IOException {
+    testTempFolder.delete();
+  }
+
+  /**
+   * Main test to go over different scenarios
+   * @throws Exception
+   */
+  @Test
+  public void testPartitionSenderCostToThreads() throws Exception {
+
+    final VectorContainer container = new VectorContainer();
+    container.buildSchema(SelectionVectorMode.FOUR_BYTE);
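+    // mock a SelectionVector4 that reports 100 records and maps every index to itself,
+    // so the partitioning code below effectively sees records 0..99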
+    final SelectionVector4 sv = Mockito.mock(SelectionVector4.class, "SelectionVector4");
+    Mockito.when(sv.getCount()).thenReturn(100);
+    Mockito.when(sv.getTotalCount()).thenReturn(100);
+    for (int i = 0; i < 100; i++ ) {
+      Mockito.when(sv.get(i)).thenReturn(i);
+    }
+
+    final TopNBatch.SimpleRecordBatch incoming = new TopNBatch.SimpleRecordBatch(container, sv, null);
+
+    setDrillbitCount(DRILLBITS_COUNT);
+
+    test("ALTER SESSION SET `planner.slice_target`=1");
+    String plan = getPlanInString("EXPLAIN PLAN FOR " + groupByQuery, JSON_FORMAT);
+    System.out.println("Plan: " + plan);
+
+    final DrillbitContext drillbitContext = getDrillbitContext();
+    final PhysicalPlanReader planReader = drillbitContext.getPlanReader();
+    final PhysicalPlan physicalPlan = planReader.readPhysicalPlan(plan);
+    final Fragment rootFragment = PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);
+    final PlanningSet planningSet = new PlanningSet();
+    final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(config);
+
+    // Create a planningSet to get the assignment of major fragment ids to fragments.
+    PARALLELIZER.initFragmentWrappers(rootFragment, planningSet);
+
+    final List<PhysicalOperator> operators = physicalPlan.getSortedOperators(false);
+
+    // get HashToRandomExchange physical operator
+    HashToRandomExchange hashToRandomExchange = null;
+    for (PhysicalOperator operator : operators) {
+      if (operator instanceof HashToRandomExchange) {
+        hashToRandomExchange = (HashToRandomExchange) operator;
+        break;
+      }
+    }
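+    assertNotNull("The plan is expected to contain a HashToRandomExchange", hashToRandomExchange);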
+
+    final OptionList options = new OptionList();
+    // try multiple scenarios with different sets of options
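+    // The expected thread counts below follow the cost-based sizing from DRILL-2210:
+    // presumably derived from the exchange cost and planner.slice_target, capped by
+    // planner.partitioner_sender_max_threads and scaled by planner.partitioner_sender_threads_factor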
+    options.add(OptionValue.createLong(OptionType.SESSION, "planner.slice_target", 1));
+    testThreadsHelper(hashToRandomExchange, drillbitContext, options,
+        incoming, registry, planReader, planningSet, rootFragment, 2);
+
+    options.clear();
+    options.add(OptionValue.createLong(OptionType.SESSION, "planner.slice_target", 1));
+    options.add(OptionValue.createLong(OptionType.SESSION, "planner.partitioner_sender_max_threads", 10));
+    hashToRandomExchange.setCost(1000);
+    testThreadsHelper(hashToRandomExchange, drillbitContext, options,
+        incoming, registry, planReader, planningSet, rootFragment, 10);
+
+    options.clear();
+    options.add(OptionValue.createLong(OptionType.SESSION, "planner.slice_target", 1000));
+    options.add(OptionValue.createLong(OptionType.SESSION, "planner.partitioner_sender_threads_factor", 2));
+    hashToRandomExchange.setCost(14000);
+    testThreadsHelper(hashToRandomExchange, drillbitContext, options,
+        incoming, registry, planReader, planningSet, rootFragment, 2);
+  }
+
+  /**
+   * Core of the testing: verifies the calculated thread count, the outgoing batch distribution, stats merging and exception handling
+   * @param hashToRandomExchange
+   * @param drillbitContext
+   * @param options
+   * @param incoming
+   * @param registry
+   * @param planReader
+   * @param planningSet
+   * @param rootFragment
+   * @param expectedThreadsCount
+   * @throws Exception
+   */
+  private void testThreadsHelper(HashToRandomExchange hashToRandomExchange, DrillbitContext drillbitContext, OptionList options,
+      RecordBatch incoming, FunctionImplementationRegistry registry, PhysicalPlanReader planReader, PlanningSet planningSet, Fragment rootFragment,
+      int expectedThreadsCount) throws Exception {
+
+    long queryStartTime = System.currentTimeMillis();
+    int timeZone = DateUtility.getIndex(System.getProperty("user.timezone"));
+    QueryDateTimeInfo queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
+
+    final QueryWorkUnit qwu = PARALLELIZER.getFragments(options, drillbitContext.getEndpoint(),
+        QueryId.getDefaultInstance(),
+        drillbitContext.getBits(), planReader, rootFragment, USER_SESSION, queryDateTimeInfo);
+
+    final List<MinorFragmentEndpoint> mfEndPoints = PhysicalOperatorUtil.getIndexOrderedEndpoints(Lists.newArrayList(drillbitContext.getBits()));
+
+    for (PlanFragment planFragment : qwu.getFragments()) {
+      if (!planFragment.getFragmentJson().contains("hash-partition-sender")) {
+        continue;
+      }
+      MockPartitionSenderRootExec partitionSenderRootExec = null;
+      FragmentContext context = null;
+      try {
+        context = new FragmentContext(drillbitContext, planFragment, null, registry);
+        final int majorFragmentId = planFragment.getHandle().getMajorFragmentId();
+        final HashPartitionSender partSender = new HashPartitionSender(majorFragmentId, hashToRandomExchange,
+            hashToRandomExchange.getExpression(), mfEndPoints);
+        partitionSenderRootExec = new MockPartitionSenderRootExec(context, incoming, partSender);
+        assertEquals("Number of threads calculated", expectedThreadsCount,
+            partitionSenderRootExec.getNumberPartitions());
+
+        partitionSenderRootExec.createPartitioner();
+        final PartitionerDecorator partDecor = partitionSenderRootExec.getPartitioner();
+        assertNotNull(partDecor);
+
+        List<Partitioner> partitioners = partDecor.getPartitioners();
+        assertNotNull(partitioners);
+        final int actualThreads = Math.min(DRILLBITS_COUNT, expectedThreadsCount);
+        assertEquals("Number of partitioners", actualThreads, partitioners.size());
+
+        for (int i = 0; i < mfEndPoints.size(); i++) {
+          assertNotNull("PartitionOutgoingBatch", partDecor.getOutgoingBatches(i));
+        }
+
+        // check distribution of PartitionOutgoingBatch - should be an even distribution
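+        // e.g. 12 outgoing batches over 5 partitioners should yield sizes 3, 3, 2, 2, 2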
+        boolean isFirst = true;
+        int prevBatchCountSize = 0;
+        int batchCountSize = 0;
+        for (Partitioner part : partitioners) {
+          final List<PartitionOutgoingBatch> outBatch = (List<PartitionOutgoingBatch>) part.getOutgoingBatches();
+          batchCountSize = outBatch.size();
+          if (!isFirst) {
+            assertTrue(Math.abs(batchCountSize - prevBatchCountSize) <= 1);
+          } else {
+            isFirst = false;
+          }
+          prevBatchCountSize = batchCountSize;
+        }
+
+        partitionSenderRootExec.getStats().startProcessing();
+        try {
+          partDecor.partitionBatch(incoming);
+        } finally {
+          partitionSenderRootExec.getStats().stopProcessing();
+        }
+        if (actualThreads == 1) {
+          assertEquals("With a single thread parent and child waitNanos should match",
+              partitioners.get(0).getStats().getWaitNanos(), partitionSenderRootExec.getStats().getWaitNanos());
+        }
+
+        // testing values distribution
+        partitioners = partDecor.getPartitioners();
+        isFirst = true;
+        // since we have a fake NullVector the distribution is skewed
+        for (Partitioner part : partitioners) {
+          final List<PartitionOutgoingBatch> outBatches = (List<PartitionOutgoingBatch>) part.getOutgoingBatches();
+          for (PartitionOutgoingBatch partOutBatch : outBatches) {
+            final int recordCount = ((VectorAccessible) partOutBatch).getRecordCount();
+            if ( isFirst ) {
+              assertEquals("RecordCount", 100, recordCount);
+              isFirst = false;
+            } else {
+              assertEquals("RecordCount", 0, recordCount);
+            }
+          }
+        }
+        // test exceptions within threads
+        // test stats merging
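+        // every partitioner thread is expected to fail; one IOException surfaces and the
+        // remaining (actualThreads - 1) arrive attached as suppressed exceptions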
+        partitionSenderRootExec.getStats().startProcessing();
+        try {
+          partDecor.executeMethodLogic(new InjectExceptionTest());
+          fail("Should throw IOException here");
+        } catch (IOException ioe) {
+          final OperatorProfile.Builder oPBuilder = OperatorProfile.newBuilder();
+          partitionSenderRootExec.getStats().addAllMetrics(oPBuilder);
+          final List<MetricValue> metrics = oPBuilder.getMetricList();
+          for (MetricValue metric : metrics) {
+            if (Metric.BYTES_SENT.metricId() == metric.getMetricId()) {
+              assertEquals("Should add metricValue irrespective of the exception", 5 * actualThreads, metric.getLongValue());
+            }
+            if (Metric.SENDING_THREADS_COUNT.metricId() == metric.getMetricId()) {
+              assertEquals(actualThreads, metric.getLongValue());
+            }
+          }
+          assertEquals(actualThreads - 1, ioe.getSuppressed().length);
+        } finally {
+          partitionSenderRootExec.getStats().stopProcessing();
+        }
+      } finally {
+        // cleanup; either may still be null if an earlier step failed
+        if (partitionSenderRootExec != null) {
+          partitionSenderRootExec.close();
+        }
+        if (context != null) {
+          context.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Testing partitioners distribution algorithm
+   * @throws Exception
+   */
+  @Test
+  public void testAlgorithm() throws Exception {
+    int outGoingBatchCount;
+    int numberPartitions;
+    int k = 0;
+    final Random rand = new Random();
+    while (k < 1000) {
+      outGoingBatchCount = rand.nextInt(1000) + 1;
+      numberPartitions = rand.nextInt(32) + 1;
+      final int actualPartitions = Math.min(outGoingBatchCount, numberPartitions);
+      final int divisor = Math.max(1, outGoingBatchCount / actualPartitions);
+
+      final int longTail = outGoingBatchCount % actualPartitions;
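+      // worked example: outGoingBatchCount = 10, actualPartitions = 3 gives
+      // divisor = 3 and longTail = 1, so the partitions receive 4, 3 and 3
+      // batches and endIndex lands exactly on outGoingBatchCount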
+      int startIndex = 0;
+      int endIndex = 0;
+      for (int i = 0; i < actualPartitions; i++) {
+        startIndex = endIndex;
+        endIndex = startIndex + divisor;
+        if (i < longTail) {
+          endIndex++;
+        }
+      }
+      assertTrue("endIndex can not be > outGoingBatchCount", endIndex == 
outGoingBatchCount );
+      k++;
+    }
+  }
+
+  /**
+   * Helper class to expose some functionality of PartitionSenderRootExec
+   *
+   */
+  private static class MockPartitionSenderRootExec extends PartitionSenderRootExec {
+
+    public MockPartitionSenderRootExec(FragmentContext context,
+        RecordBatch incoming, HashPartitionSender operator)
+        throws OutOfMemoryException {
+      super(context, incoming, operator);
+    }
+
+    public void close() {
+      oContext.close();
+    }
+
+    public int getNumberPartitions() {
+      return numberPartitions;
+    }
+
+    public OperatorStats getStats() {
+      return this.stats;
+    }
+  }
+
+  /**
+   * Helper class to inject exceptions in the threads
+   *
+   */
+  private static class InjectExceptionTest implements GeneralExecuteIface {
+
+    @Override
+    public void execute(Partitioner partitioner) throws IOException {
+      // bump BYTES_SENT before failing so the stats-merge-on-exception path can be verified
+      partitioner.getStats().addLongStat(Metric.BYTES_SENT, 5);
+      throw new IOException("Test exception handling");
+    }
+  }
+}
