[ASTERIXDB-2149] Enable multiple normalized keys in sort

- user model changes: no
- storage format changes: no
- interface changes: yes. The interface of sort is changed.

Currently, during the (in-memory) sort, we use a single int normalized key to
speed up comparisons by avoiding random memory accesses. However, this
technique is inefficient if the first 4 bytes of the sorting keys are
not distinctive. From a performance point of view, it's better to use
longer normalized keys when possible (2-3x improvements).

This patch enables that by:
- Allowing multiple normalized keys during sort, and allowing each
normalized key to be longer (multiple integers).
- Enabling memory budgeting of pointer directories as well during sort
(but for performance, we still use int[], instead of byte[] from frame).

The next patch will enable the AsterixDB layer to use this feature to
speed up sort performance.

Change-Id: I4354242ff731b4b006b8446b58f65873047dde78
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2127
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamou...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ed469381
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ed469381
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ed469381

Branch: refs/heads/master
Commit: ed469381235990ce5ecd2f242b679190ef2ca263
Parents: c5a0a19
Author: luochen01 <cl...@uci.edu>
Authored: Mon Nov 27 11:14:01 2017 -0800
Committer: Luo Chen <cl...@uci.edu>
Committed: Mon Nov 27 13:18:52 2017 -0800

----------------------------------------------------------------------
 .../sort/InMemorySortRuntimeFactory.java        |  18 +-
 .../tests/pushruntime/PushRuntimeTest.java      |   7 +-
 .../dataflow/value/INormalizedKeyComputer.java  |   5 +
 .../value/INormalizedKeyComputerFactory.java    |  17 ++
 .../IntegerNormalizedKeyComputerFactory.java    |   8 +-
 .../VariableDeletableTupleMemoryManager.java    |   4 +-
 .../sort/ExternalSortGroupByRunGenerator.java   |  29 ++-
 .../sort/SortGroupByOperatorDescriptor.java     |  65 ++++--
 .../sort/AbstractExternalSortRunGenerator.java  |  24 +--
 .../dataflow/std/sort/AbstractFrameSorter.java  | 199 ++++++++++++++-----
 .../sort/AbstractSorterOperatorDescriptor.java  |  10 +-
 .../sort/ExternalSortOperatorDescriptor.java    |  30 ++-
 .../std/sort/ExternalSortRunGenerator.java      |  22 +-
 .../dataflow/std/sort/FrameSorterMergeSort.java |  46 ++---
 .../dataflow/std/sort/FrameSorterQuickSort.java |  38 ++--
 .../dataflow/std/sort/HeapSortRunGenerator.java |  19 +-
 .../std/sort/HybridTopKSortRunGenerator.java    |  12 +-
 .../sort/InMemorySortOperatorDescriptor.java    |  13 +-
 .../std/sort/TopKSorterOperatorDescriptor.java  |  13 +-
 .../dataflow/std/sort/TupleSorterHeapSort.java  |  97 ++++++---
 .../tests/integration/HeapSortMergeTest.java    |  85 ++++----
 .../tests/unit/AbstractRunGeneratorTest.java    |  99 +++++----
 .../unit/ExternalSortRunGeneratorTest.java      |  21 +-
 .../tests/unit/HeapSortRunGeneratorTest.java    |  26 ++-
 .../tests/unit/HybridSortRunGenerator.java      |  34 ----
 .../tests/unit/HybridSortRunGeneratorTest.java  |  48 +++++
 .../tests/unit/RunMergingFrameReaderTest.java   |  30 +--
 .../tests/unit/TopKRunGeneratorTest.java        |  35 ++--
 .../examples/text/client/WordCountMain.java     |  32 +--
 .../hyracks/examples/tpch/client/Sort.java      |  14 +-
 30 files changed, 703 insertions(+), 397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 925ff93..bb8223d 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -39,18 +39,26 @@ public class InMemorySortRuntimeFactory extends 
AbstractOneInputOneOutputRuntime
     private static final long serialVersionUID = 1L;
 
     private final int[] sortFields;
-    private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
-    private IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory[] keyNormalizerFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
 
     public InMemorySortRuntimeFactory(int[] sortFields, 
INormalizedKeyComputerFactory firstKeyNormalizerFactory,
             IBinaryComparatorFactory[] comparatorFactories, int[] 
projectionList) {
+        this(sortFields,
+                firstKeyNormalizerFactory != null ? new 
INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, projectionList);
+    }
+
+    public InMemorySortRuntimeFactory(int[] sortFields, 
INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, int[] 
projectionList) {
         super(projectionList);
         // Obs: the projection list is currently ignored.
         if (projectionList != null) {
             throw new NotImplementedException("Cannot push projection into 
InMemorySortRuntime.");
         }
         this.sortFields = sortFields;
-        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.keyNormalizerFactories = keyNormalizerFactories;
         this.comparatorFactories = comparatorFactories;
     }
 
@@ -67,8 +75,8 @@ public class InMemorySortRuntimeFactory extends 
AbstractOneInputOneOutputRuntime
                     IFrameBufferManager manager = new 
VariableFrameMemoryManager(
                             new VariableFramePool(ctx, 
VariableFramePool.UNLIMITED_MEMORY),
                             
FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT));
-                    frameSorter = new FrameSorterMergeSort(ctx, manager, 
sortFields, firstKeyNormalizerFactory,
-                            comparatorFactories, outputRecordDesc);
+                    frameSorter = new FrameSorterMergeSort(ctx, manager, 
VariableFramePool.UNLIMITED_MEMORY, sortFields,
+                            keyNormalizerFactories, comparatorFactories, 
outputRecordDesc);
                 }
                 frameSorter.reset();
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 34ed142..cc4c1b9 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -64,6 +64,7 @@ import 
org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileReference;
@@ -722,7 +723,8 @@ public class PushRuntimeTest {
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
 
         // the algebricks op.
-        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new 
int[] { 1 }, null,
+        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new 
int[] { 1 },
+                (INormalizedKeyComputerFactory) null,
                 new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 null);
         RecordDescriptor sortDesc = scannerDesc;
@@ -836,7 +838,8 @@ public class PushRuntimeTest {
 
         // the sort (by nation id)
         RecordDescriptor sortDesc = scannerDesc;
-        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new 
int[] { 3 }, null,
+        InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new 
int[] { 3 },
+                (INormalizedKeyComputerFactory) null,
                 new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, null);
 
         // the group-by

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
index 2c79a4d..7bf8255 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
@@ -20,4 +20,9 @@ package org.apache.hyracks.api.dataflow.value;
 
 public interface INormalizedKeyComputer {
     public int normalize(byte[] bytes, int start, int length);
+
+    default void normalize(byte[] bytes, int start, int length, int[] 
normalizedKeys, int keyStart) {
+        int key = normalize(bytes, start, length);
+        normalizedKeys[keyStart] = key;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
index 2b7198b..901702e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
@@ -22,4 +22,21 @@ import java.io.Serializable;
 
 public interface INormalizedKeyComputerFactory extends Serializable {
     public INormalizedKeyComputer createNormalizedKeyComputer();
+
+    /**
+     *
+     * @return The length of the normalized key in terms of integers
+     */
+    default int getNormalizedKeyLength() {
+        return 1;
+    }
+
+    /**
+     *
+     * @return Whether we can solely rely on this normalized key to complete 
comparison,
+     *         even when two normalized keys are equal
+     */
+    default boolean isDecisive() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
index 5cfef28..41c0740 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.dataflow.common.data.normalizers;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerNormalizedKeyComputerFactory implements 
INormalizedKeyComputerFactory {
     private static final long serialVersionUID = 1L;
@@ -32,8 +31,13 @@ public class IntegerNormalizedKeyComputerFactory implements 
INormalizedKeyComput
             @Override
             public int normalize(byte[] bytes, int start, int length) {
                 int value = IntegerPointable.getInteger(bytes, start);
-                return value ^Integer.MIN_VALUE;
+                return value ^ Integer.MIN_VALUE;
             }
         };
     }
+
+    @Override
+    public boolean isDecisive() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
index 4ed11e6..5a59b5d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -170,8 +170,8 @@ public class VariableDeletableTupleMemoryManager implements 
IDeletableTupleBuffe
     @Override
     public ITuplePointerAccessor createTuplePointerAccessor() {
         return new AbstractTuplePointerAccessor() {
-            private IAppendDeletableFrameTupleAccessor bufferAccessor = new 
DeletableFrameTupleAppender(
-                    recordDescriptor);
+            private final IAppendDeletableFrameTupleAccessor bufferAccessor =
+                    new DeletableFrameTupleAppender(recordDescriptor);
 
             @Override
             IFrameTupleAccessor getInnerAccessor() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
index 3f10f50..0b915dc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
@@ -50,16 +50,25 @@ public class ExternalSortGroupByRunGenerator extends 
AbstractExternalSortRunGene
             int framesLimit, int[] groupFields, INormalizedKeyComputerFactory 
firstKeyNormalizerFactory,
             IBinaryComparatorFactory[] comparatorFactories, 
IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor outRecordDesc, Algorithm alg) throws 
HyracksDataException {
-        this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, 
firstKeyNormalizerFactory, comparatorFactories,
+        this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields,
+                firstKeyNormalizerFactory != null ? new 
INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, aggregatorFactory, outRecordDesc, alg, 
EnumFreeSlotPolicy.LAST_FIT);
+    }
+
+    public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] 
sortFields, RecordDescriptor inputRecordDesc,
+            int framesLimit, int[] groupFields, 
INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, 
IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor outRecordDesc, Algorithm alg) throws 
HyracksDataException {
+        this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, 
keyNormalizerFactories, comparatorFactories,
                 aggregatorFactory, outRecordDesc, alg, 
EnumFreeSlotPolicy.LAST_FIT);
     }
 
     public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] 
sortFields, RecordDescriptor inputRecordDesc,
-            int framesLimit, int[] groupFields, INormalizedKeyComputerFactory 
firstKeyNormalizerFactory,
+            int framesLimit, int[] groupFields, 
INormalizedKeyComputerFactory[] keyNormalizerFactories,
             IBinaryComparatorFactory[] comparatorFactories, 
IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor outRecordDesc, Algorithm alg, EnumFreeSlotPolicy 
policy) throws HyracksDataException {
-        super(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, 
inputRecordDesc, alg, policy,
-                framesLimit);
+        super(ctx, sortFields, keyNormalizerFactories, comparatorFactories, 
inputRecordDesc, alg, policy, framesLimit);
 
         this.groupFields = groupFields;
         this.comparatorFactories = comparatorFactories;
@@ -70,20 +79,20 @@ public class ExternalSortGroupByRunGenerator extends 
AbstractExternalSortRunGene
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                ExternalSortGroupByRunGenerator.class.getSimpleName());
+        FileReference file = ctx.getJobletContext()
+                
.createManagedWorkspaceFile(ExternalSortGroupByRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIoManager());
     }
 
     @Override
     protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) 
throws HyracksDataException {
         //create group-by comparators
-        IBinaryComparator[] comparators = new IBinaryComparator[Math
-                .min(groupFields.length, comparatorFactories.length)];
+        IBinaryComparator[] comparators =
+                new IBinaryComparator[Math.min(groupFields.length, 
comparatorFactories.length)];
         for (int i = 0; i < comparators.length; i++) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        return new PreclusteredGroupWriter(ctx, groupFields, comparators, 
aggregatorFactory,
-                this.inRecordDesc, this.outRecordDesc, writer, true);
+        return new PreclusteredGroupWriter(ctx, groupFields, comparators, 
aggregatorFactory, this.inRecordDesc,
+                this.outRecordDesc, writer, true);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index da5b4a8..23e47f0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -55,39 +55,74 @@ public class SortGroupByOperatorDescriptor extends 
AbstractSorterOperatorDescrip
     private final RecordDescriptor partialAggRecordDesc;
     private final RecordDescriptor outputRecordDesc;
     private final boolean finalStage;
-    private Algorithm alg = Algorithm.MERGE_SORT;
+    private static final Algorithm ALG = Algorithm.MERGE_SORT;
 
     /**
      * @param spec
-     *            , the Hyracks job specification
+     *            the Hyracks job specification
      * @param framesLimit
-     *            , the frame limit for this operator
+     *            the frame limit for this operator
      * @param sortFields
-     *            , the fields to sort
+     *            the fields to sort
      * @param groupFields
-     *            , the fields to group, which can be a prefix subset of 
sortFields
+     *            the fields to group, which can be a prefix subset of 
sortFields
      * @param firstKeyNormalizerFactory
-     *            , the normalized key computer factory of the first key
+     *            the normalized key computer factory of the first key
      * @param comparatorFactories
-     *            , the comparator factories of sort keys
+     *            the comparator factories of sort keys
      * @param partialAggregatorFactory
-     *            , for aggregating the input of this operator
+     *            for aggregating the input of this operator
      * @param mergeAggregatorFactory
-     *            , for aggregating the intermediate data of this operator
+     *            for aggregating the intermediate data of this operator
      * @param partialAggRecordDesc
-     *            , the record descriptor of intermediate data
+     *            the record descriptor of intermediate data
      * @param outRecordDesc
-     *            , the record descriptor of output data
+     *            the record descriptor of output data
      * @param finalStage
-     *            , whether the operator is used for final stage aggregation
+     *            whether the operator is used for final stage aggregation
      */
     public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int 
framesLimit, int[] sortFields,
             int[] groupFields, INormalizedKeyComputerFactory 
firstKeyNormalizerFactory,
             IBinaryComparatorFactory[] comparatorFactories, 
IAggregatorDescriptorFactory partialAggregatorFactory,
             IAggregatorDescriptorFactory mergeAggregatorFactory, 
RecordDescriptor partialAggRecordDesc,
             RecordDescriptor outRecordDesc, boolean finalStage) {
+        this(spec, framesLimit, sortFields, groupFields,
+                firstKeyNormalizerFactory != null ? new 
INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, partialAggregatorFactory, 
mergeAggregatorFactory, partialAggRecordDesc,
+                outRecordDesc, finalStage);
+    }
 
-        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, 
comparatorFactories, outRecordDesc);
+    /**
+     * @param spec
+     *            the Hyracks job specification
+     * @param framesLimit
+     *            the frame limit for this operator
+     * @param sortFields
+     *            the fields to sort
+     * @param groupFields
+     *            the fields to group, which can be a prefix subset of 
sortFields
+     * @param keyNormalizerFactories
+     *            the normalized key computer factories for the prefix the 
sortFields
+     * @param comparatorFactories
+     *            the comparator factories of sort keys
+     * @param partialAggregatorFactory
+     *            for aggregating the input of this operator
+     * @param mergeAggregatorFactory
+     *            for aggregating the intermediate data of this operator
+     * @param partialAggRecordDesc
+     *            the record descriptor of intermediate data
+     * @param outRecordDesc
+     *            the record descriptor of output data
+     * @param finalStage
+     *            whether the operator is used for final stage aggregation
+     */
+    public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int 
framesLimit, int[] sortFields,
+            int[] groupFields, INormalizedKeyComputerFactory[] 
keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, 
IAggregatorDescriptorFactory partialAggregatorFactory,
+            IAggregatorDescriptorFactory mergeAggregatorFactory, 
RecordDescriptor partialAggRecordDesc,
+            RecordDescriptor outRecordDesc, boolean finalStage) {
+        super(spec, framesLimit, sortFields, keyNormalizerFactories, 
comparatorFactories, outRecordDesc);
         if (framesLimit <= 1) {
             throw new IllegalStateException();// minimum of 2 fames (1 in,1 
out)
         }
@@ -110,8 +145,8 @@ public class SortGroupByOperatorDescriptor extends 
AbstractSorterOperatorDescrip
                     IRecordDescriptorProvider recordDescriptorProvider) throws 
HyracksDataException {
                 return new ExternalSortGroupByRunGenerator(ctx, sortFields,
                         
recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), 
framesLimit,
-                        groupFields, firstKeyNormalizerFactory, 
comparatorFactories, partialAggregatorFactory,
-                        partialAggRecordDesc, alg);
+                        groupFields, keyNormalizerFactories, 
comparatorFactories, partialAggregatorFactory,
+                        partialAggRecordDesc, ALG);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
index 7c7bfec..a8cc93b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java
@@ -39,24 +39,24 @@ public abstract class AbstractExternalSortRunGenerator 
extends AbstractSortRunGe
     protected final int maxSortFrames;
 
     public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] 
sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, int framesLimit) 
throws HyracksDataException {
-        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, 
recordDesc, alg,
-                EnumFreeSlotPolicy.LAST_FIT, framesLimit);
+        this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, 
recordDesc, alg, EnumFreeSlotPolicy.LAST_FIT,
+                framesLimit);
     }
 
     public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] 
sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy 
policy, int framesLimit)
-                    throws HyracksDataException {
-        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, 
recordDesc, alg, policy, framesLimit,
+            throws HyracksDataException {
+        this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, 
recordDesc, alg, policy, framesLimit,
                 Integer.MAX_VALUE);
     }
 
     public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] 
sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy 
policy, int framesLimit, int outputLimit)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         super();
         this.ctx = ctx;
         maxSortFrames = framesLimit - 1;
@@ -65,11 +65,11 @@ public abstract class AbstractExternalSortRunGenerator 
extends AbstractSortRunGe
         IFrameBufferManager bufferManager = new VariableFrameMemoryManager(
                 new VariableFramePool(ctx, maxSortFrames * 
ctx.getInitialFrameSize()), freeSlotPolicy);
         if (alg == Algorithm.MERGE_SORT) {
-            frameSorter = new FrameSorterMergeSort(ctx, bufferManager, 
sortFields, firstKeyNormalizerFactory,
-                    comparatorFactories, recordDesc, outputLimit);
+            frameSorter = new FrameSorterMergeSort(ctx, bufferManager, 
maxSortFrames, sortFields,
+                    keyNormalizerFactories, comparatorFactories, recordDesc, 
outputLimit);
         } else {
-            frameSorter = new FrameSorterQuickSort(ctx, bufferManager, 
sortFields, firstKeyNormalizerFactory,
-                    comparatorFactories, recordDesc, outputLimit);
+            frameSorter = new FrameSorterQuickSort(ctx, bufferManager, 
maxSortFrames, sortFields,
+                    keyNormalizerFactories, comparatorFactories, recordDesc, 
outputLimit);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index 77d5d49..6c061ae 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -39,46 +39,90 @@ import 
org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
 import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 public abstract class AbstractFrameSorter implements IFrameSorter {
 
     protected Logger LOGGER = 
Logger.getLogger(AbstractFrameSorter.class.getName());
-    static final int PTR_SIZE = 4;
-    static final int ID_FRAMEID = 0;
-    static final int ID_TUPLE_START = 1;
-    static final int ID_TUPLE_END = 2;
-    static final int ID_NORMAL_KEY = 3;
+    protected static final int ID_FRAME_ID = 0;
+    protected static final int ID_TUPLE_START = 1;
+    protected static final int ID_TUPLE_END = 2;
+    protected static final int ID_NORMALIZED_KEY = 3;
+
+    // the length of each normalized key (in terms of integers)
+    protected final int[] normalizedKeyLength;
+    // the total length of the normalized key (in terms of integers)
+    protected final int normalizedKeyTotalLength;
+    // whether the normalized keys can be used to decide orders, even when 
normalized keys are the same
+    protected final boolean normalizedKeysDecisive;
+
+    protected final int ptrSize;
 
     protected final int[] sortFields;
     protected final IBinaryComparator[] comparators;
-    protected final INormalizedKeyComputer nkc;
+    protected final INormalizedKeyComputer[] nkcs;
     protected final IFrameBufferManager bufferManager;
     protected final FrameTupleAccessor inputTupleAccessor;
     protected final IFrameTupleAppender outputAppender;
     protected final IFrame outputFrame;
     protected final int outputLimit;
 
+    protected final long maxSortMemory;
+    protected long totalMemoryUsed;
     protected int[] tPointers;
+    protected final int[] tmpPointer;
     protected int tupleCount;
 
-    private FrameTupleAccessor fta2;
-    private BufferInfo info = new BufferInfo(null, -1, -1);
+    private final FrameTupleAccessor fta2;
+    private final BufferInfo info = new BufferInfo(null, -1, -1);
 
-    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager 
bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) throws HyracksDataException {
-        this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, 
comparatorFactories, recordDescriptor,
-                Integer.MAX_VALUE);
+    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager 
bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] 
keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor 
recordDescriptor)
+            throws HyracksDataException {
+        this(ctx, bufferManager, maxSortFrames, sortFields, 
keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, Integer.MAX_VALUE);
     }
 
-    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager 
bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int outputLimit)
+    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager 
bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] 
normalizedKeyComputerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor 
recordDescriptor, int outputLimit)
             throws HyracksDataException {
         this.bufferManager = bufferManager;
+        if (maxSortFrames == VariableFramePool.UNLIMITED_MEMORY) {
+            this.maxSortMemory = Long.MAX_VALUE;
+        } else {
+            this.maxSortMemory = (long) ctx.getInitialFrameSize() * 
maxSortFrames;
+        }
         this.sortFields = sortFields;
-        this.nkc = firstKeyNormalizerFactory == null ? null : 
firstKeyNormalizerFactory.createNormalizedKeyComputer();
+
+        int runningNormalizedKeyTotalLength = 0;
+
+        if (normalizedKeyComputerFactories != null) {
+            int decisivePrefixLength = 
getDecisivePrefixLength(normalizedKeyComputerFactories);
+
+            // we only take a prefix of the decisive normalized keys, plus at 
most one indecisive normalized key
+            // ideally, the caller should prepare normalizers in this way, but 
we just guard here to avoid
+            // computing unnecessary normalized keys
+            int normalizedKeys = decisivePrefixLength < 
normalizedKeyComputerFactories.length ? decisivePrefixLength + 1
+                    : decisivePrefixLength;
+            this.nkcs = new INormalizedKeyComputer[normalizedKeys];
+            this.normalizedKeyLength = new int[normalizedKeys];
+
+            for (int i = 0; i < normalizedKeys; i++) {
+                this.nkcs[i] = 
normalizedKeyComputerFactories[i].createNormalizedKeyComputer();
+                this.normalizedKeyLength[i] = 
normalizedKeyComputerFactories[i].getNormalizedKeyLength();
+                runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
+            }
+            this.normalizedKeysDecisive = decisivePrefixLength == 
comparatorFactories.length;
+        } else {
+            this.nkcs = null;
+            this.normalizedKeyLength = null;
+            this.normalizedKeysDecisive = false;
+        }
+        this.normalizedKeyTotalLength = runningNormalizedKeyTotalLength;
+        this.ptrSize = ID_NORMALIZED_KEY + normalizedKeyTotalLength;
         this.comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -88,17 +132,24 @@ public abstract class AbstractFrameSorter implements 
IFrameSorter {
         this.outputFrame = new VSizeFrame(ctx);
         this.outputLimit = outputLimit;
         this.fta2 = new FrameTupleAccessor(recordDescriptor);
+        this.tmpPointer = new int[ptrSize];
     }
 
     @Override
     public void reset() throws HyracksDataException {
         this.tupleCount = 0;
+        this.totalMemoryUsed = 0;
         this.bufferManager.reset();
     }
 
     @Override
     public boolean insertFrame(ByteBuffer inputBuffer) throws 
HyracksDataException {
-        if (bufferManager.insertFrame(inputBuffer) >= 0) {
+        inputTupleAccessor.reset(inputBuffer);
+        long requiredMemory = getRequiredMemory(inputTupleAccessor);
+        if (totalMemoryUsed + requiredMemory <= maxSortMemory && 
bufferManager.insertFrame(inputBuffer) >= 0) {
+            // we have enough memory
+            totalMemoryUsed += requiredMemory;
+            tupleCount += inputTupleAccessor.getTupleCount();
             return true;
         }
         if (getFrameCount() == 0) {
@@ -108,36 +159,41 @@ public abstract class AbstractFrameSorter implements 
IFrameSorter {
         return false;
     }
 
+    protected long getRequiredMemory(FrameTupleAccessor frameAccessor) {
+        return (long) frameAccessor.getBuffer().capacity() + ptrSize * 
frameAccessor.getTupleCount() * Integer.BYTES;
+    }
+
     @Override
     public void sort() throws HyracksDataException {
-        tupleCount = 0;
-        for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
-            bufferManager.getFrame(i, info);
-            inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), 
info.getLength());
-            tupleCount += inputTupleAccessor.getTupleCount();
-        }
-        if (tPointers == null || tPointers.length < tupleCount * PTR_SIZE) {
-            tPointers = new int[tupleCount * PTR_SIZE];
+        if (tPointers == null || tPointers.length < tupleCount * ptrSize) {
+            tPointers = new int[tupleCount * ptrSize];
         }
         int ptr = 0;
-        int sfIdx = sortFields[0];
         for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
             bufferManager.getFrame(i, info);
             inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), 
info.getLength());
             int tCount = inputTupleAccessor.getTupleCount();
             byte[] array = inputTupleAccessor.getBuffer().array();
-            for (int j = 0; j < tCount; ++j) {
+            int fieldSlotsLength = inputTupleAccessor.getFieldSlotsLength();
+            for (int j = 0; j < tCount; ++j, ++ptr) {
                 int tStart = inputTupleAccessor.getTupleStartOffset(j);
                 int tEnd = inputTupleAccessor.getTupleEndOffset(j);
-                tPointers[ptr * PTR_SIZE + ID_FRAMEID] = i;
-                tPointers[ptr * PTR_SIZE + ID_TUPLE_START] = tStart;
-                tPointers[ptr * PTR_SIZE + ID_TUPLE_END] = tEnd;
-                int f0StartRel = inputTupleAccessor.getFieldStartOffset(j, 
sfIdx);
-                int f0EndRel = inputTupleAccessor.getFieldEndOffset(j, sfIdx);
-                int f0Start = f0StartRel + tStart + 
inputTupleAccessor.getFieldSlotsLength();
-                tPointers[ptr * PTR_SIZE + ID_NORMAL_KEY] =
-                        nkc == null ? 0 : nkc.normalize(array, f0Start, 
f0EndRel - f0StartRel);
-                ++ptr;
+                tPointers[ptr * ptrSize + ID_FRAME_ID] = i;
+                tPointers[ptr * ptrSize + ID_TUPLE_START] = tStart;
+                tPointers[ptr * ptrSize + ID_TUPLE_END] = tEnd;
+                if (nkcs == null) {
+                    continue;
+                }
+                int keyPos = ptr * ptrSize + ID_NORMALIZED_KEY;
+                for (int k = 0; k < nkcs.length; k++) {
+                    int sortField = sortFields[k];
+                    int fieldStartOffsetRel = 
inputTupleAccessor.getFieldStartOffset(j, sortField);
+                    int fieldEndOffsetRel = 
inputTupleAccessor.getFieldEndOffset(j, sortField);
+                    int fieldStartOffset = fieldStartOffsetRel + tStart + 
fieldSlotsLength;
+                    nkcs[k].normalize(array, fieldStartOffset, 
fieldEndOffsetRel - fieldStartOffsetRel, tPointers,
+                            keyPos);
+                    keyPos += normalizedKeyLength[k];
+                }
             }
         }
         if (tupleCount > 0) {
@@ -164,9 +220,9 @@ public abstract class AbstractFrameSorter implements 
IFrameSorter {
         int limit = Math.min(tupleCount, outputLimit);
         int io = 0;
         for (int ptr = 0; ptr < limit; ++ptr) {
-            int i = tPointers[ptr * PTR_SIZE + ID_FRAMEID];
-            int tStart = tPointers[ptr * PTR_SIZE + ID_TUPLE_START];
-            int tEnd = tPointers[ptr * PTR_SIZE + ID_TUPLE_END];
+            int i = tPointers[ptr * ptrSize + ID_FRAME_ID];
+            int tStart = tPointers[ptr * ptrSize + ID_TUPLE_START];
+            int tEnd = tPointers[ptr * ptrSize + ID_TUPLE_END];
             bufferManager.getFrame(i, info);
             inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), 
info.getLength());
             int flushed = FrameUtils.appendToWriter(writer, outputAppender, 
inputTupleAccessor, tStart, tEnd);
@@ -185,19 +241,23 @@ public abstract class AbstractFrameSorter implements 
IFrameSorter {
     }
 
     protected final int compare(int tp1, int tp2) throws HyracksDataException {
-        int i1 = tPointers[tp1 * 4 + ID_FRAMEID];
-        int j1 = tPointers[tp1 * 4 + ID_TUPLE_START];
-        int v1 = tPointers[tp1 * 4 + ID_NORMAL_KEY];
-
-        int tp2i = tPointers[tp2 * 4 + ID_FRAMEID];
-        int tp2j = tPointers[tp2 * 4 + ID_TUPLE_START];
-        int tp2v = tPointers[tp2 * 4 + ID_NORMAL_KEY];
+        return compare(tPointers, tp1, tPointers, tp2);
+    }
 
-        if (v1 != tp2v) {
-            return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 
0xffffffffL)) ? -1 : 1;
+    protected final int compare(int[] tPointers1, int tp1, int[] tPointers2, 
int tp2) throws HyracksDataException {
+        if (nkcs != null) {
+            int cmpNormalizedKey = compareNormalizeKeys(tPointers1, tp1 * 
ptrSize + ID_NORMALIZED_KEY, tPointers2,
+                    tp2 * ptrSize + ID_NORMALIZED_KEY, 
normalizedKeyTotalLength);
+            if (cmpNormalizedKey != 0 || normalizedKeysDecisive) {
+                return cmpNormalizedKey;
+            }
         }
-        int i2 = tp2i;
-        int j2 = tp2j;
+
+        int i1 = tPointers1[tp1 * ptrSize + ID_FRAME_ID];
+        int j1 = tPointers1[tp1 * ptrSize + ID_TUPLE_START];
+        int i2 = tPointers2[tp2 * ptrSize + ID_FRAME_ID];
+        int j2 = tPointers2[tp2 * ptrSize + ID_TUPLE_START];
+
         bufferManager.getFrame(i1, info);
         byte[] b1 = info.getBuffer().array();
         inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), 
info.getLength());
@@ -223,6 +283,43 @@ public abstract class AbstractFrameSorter implements 
IFrameSorter {
         return 0;
     }
 
+    public static int compareNormalizeKeys(int[] keys1, int start1, int[] 
keys2, int start2, int length) {
+        for (int i = 0; i < length; i++) {
+            int key1 = keys1[start1 + i];
+            int key2 = keys2[start2 + i];
+            if (key1 != key2) {
+                return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? -1 
: 1;
+            }
+        }
+        return 0;
+    }
+
+    public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] 
keyNormalizerFactories) {
+        if (keyNormalizerFactories == null) {
+            return 0;
+        }
+        for (int i = 0; i < keyNormalizerFactories.length; i++) {
+            if (!keyNormalizerFactories[i].isDecisive()) {
+                return i;
+            }
+        }
+        return keyNormalizerFactories.length;
+    }
+
+    protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
+        System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
+        System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, 
ptrSize);
+        System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize);
+    }
+
+    protected void copy(int src[], int srcPos, int dest[], int destPos) {
+        System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, 
ptrSize);
+    }
+
+    protected void copy(int src[], int srcPos, int dest[], int destPos, int n) 
{
+        System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * 
ptrSize);
+    }
+
     @Override
     public void close() {
         tupleCount = 0;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index 1cd5fc3..602157f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -56,17 +56,17 @@ public abstract class AbstractSorterOperatorDescriptor 
extends AbstractOperatorD
     protected static final int MERGE_ACTIVITY_ID = 1;
 
     protected final int[] sortFields;
-    protected final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    protected final INormalizedKeyComputerFactory[] keyNormalizerFactories;
     protected final IBinaryComparatorFactory[] comparatorFactories;
     protected final int framesLimit;
 
     public AbstractSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int framesLimit, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
         this.sortFields = sortFields;
-        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.keyNormalizerFactories = keyNormalizerFactories;
         this.comparatorFactories = comparatorFactories;
         outRecDescs[0] = recordDescriptor;
     }
@@ -174,8 +174,8 @@ public abstract class AbstractSorterOperatorDescriptor 
extends AbstractOperatorD
                     for (int i = 0; i < comparatorFactories.length; ++i) {
                         comparators[i] = 
comparatorFactories[i].createBinaryComparator();
                     }
-                    INormalizedKeyComputer nmkComputer = 
firstKeyNormalizerFactory == null ? null
-                            : 
firstKeyNormalizerFactory.createNormalizedKeyComputer();
+                    INormalizedKeyComputer nmkComputer = 
keyNormalizerFactories == null ? null
+                            : 
keyNormalizerFactories[0].createNormalizedKeyComputer();
                     AbstractExternalSortRunMerger merger = 
getSortRunMerger(ctx, recordDescProvider, writer, sorter,
                             runs, comparators, nmkComputer, framesLimit);
                     merger.process();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 1b66ccf..b58d4c7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -43,21 +43,31 @@ public class ExternalSortOperatorDescriptor extends 
AbstractSorterOperatorDescri
     private final int outputLimit;
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int framesLimit, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, Algorithm alg) {
-        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, 
comparatorFactories, recordDescriptor, alg,
+        this(spec, framesLimit, sortFields, keyNormalizerFactories, 
comparatorFactories, recordDescriptor, alg,
                 EnumFreeSlotPolicy.LAST_FIT);
     }
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int framesLimit, int[] sortFields,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor 
recordDescriptor) {
-        this(spec, framesLimit, sortFields, null, comparatorFactories, 
recordDescriptor);
+        this(spec, framesLimit, sortFields, (INormalizedKeyComputerFactory[]) 
null, comparatorFactories,
+                recordDescriptor);
     }
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int framesLimit, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
-        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, 
comparatorFactories, recordDescriptor,
+        this(spec, framesLimit, sortFields,
+                firstKeyNormalizerFactory != null ? new 
INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, recordDescriptor, Algorithm.MERGE_SORT, 
EnumFreeSlotPolicy.LAST_FIT);
+    }
+
+    public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int framesLimit, int[] sortFields,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        this(spec, framesLimit, sortFields, keyNormalizerFactories, 
comparatorFactories, recordDescriptor,
                 Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT);
     }
 
@@ -69,7 +79,7 @@ public class ExternalSortOperatorDescriptor extends 
AbstractSorterOperatorDescri
             @Override
             protected AbstractSortRunGenerator 
getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) throws 
HyracksDataException {
-                return new ExternalSortRunGenerator(ctx, sortFields, 
firstKeyNormalizerFactory, comparatorFactories,
+                return new ExternalSortRunGenerator(ctx, sortFields, 
keyNormalizerFactories, comparatorFactories,
                         outRecDescs[0], alg, policy, framesLimit, outputLimit);
             }
         };
@@ -92,16 +102,16 @@ public class ExternalSortOperatorDescriptor extends 
AbstractSorterOperatorDescri
     }
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int framesLimit, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, Algorithm alg, 
EnumFreeSlotPolicy policy) {
-        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, 
comparatorFactories, recordDescriptor, alg,
-                policy, Integer.MAX_VALUE);
+        this(spec, framesLimit, sortFields, keyNormalizerFactories, 
comparatorFactories, recordDescriptor, alg, policy,
+                Integer.MAX_VALUE);
     }
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int framesLimit, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, Algorithm alg, 
EnumFreeSlotPolicy policy, int outputLimit) {
-        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, 
comparatorFactories, recordDescriptor);
+        super(spec, framesLimit, sortFields, keyNormalizerFactories, 
comparatorFactories, recordDescriptor);
         if (framesLimit <= 1) {
             throw new IllegalStateException();// minimum of 2 fames (1 in,1 
out)
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index b451b1c..785b94e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -31,32 +31,32 @@ import 
org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 public class ExternalSortRunGenerator extends AbstractExternalSortRunGenerator 
{
 
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, int framesLimit) 
throws HyracksDataException {
-        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, 
recordDesc, alg,
-                EnumFreeSlotPolicy.LAST_FIT, framesLimit);
+        this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, 
recordDesc, alg, EnumFreeSlotPolicy.LAST_FIT,
+                framesLimit);
     }
 
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy 
policy, int framesLimit)
-                    throws HyracksDataException {
-        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, 
recordDesc, alg, policy, framesLimit,
+            throws HyracksDataException {
+        this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, 
recordDesc, alg, policy, framesLimit,
                 Integer.MAX_VALUE);
     }
 
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, 
IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy 
policy, int framesLimit, int outputLimit)
-                    throws HyracksDataException {
-        super(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, 
recordDesc, alg, policy, framesLimit,
+            throws HyracksDataException {
+        super(ctx, sortFields, keyNormalizerFactories, comparatorFactories, 
recordDesc, alg, policy, framesLimit,
                 outputLimit);
     }
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext()
-                
.createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
+        FileReference file =
+                
ctx.getJobletContext().createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIoManager());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index ed28560..260b665 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -23,24 +23,27 @@ import 
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
 
 public class FrameSorterMergeSort extends AbstractFrameSorter {
 
     private int[] tPointersTemp;
 
-    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager 
bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) throws HyracksDataException {
-        this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, 
comparatorFactories, recordDescriptor,
-                Integer.MAX_VALUE);
+    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager 
bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] 
keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor 
recordDescriptor)
+            throws HyracksDataException {
+        this(ctx, bufferManager, maxSortFrames, sortFields, 
keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, Integer.MAX_VALUE);
     }
 
-    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager 
bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, 
IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int outputLimit) throws 
HyracksDataException {
-        super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, 
comparatorFactories, recordDescriptor,
-                outputLimit);
+    public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager 
bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] 
keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor 
recordDescriptor, int outputLimit)
+            throws HyracksDataException {
+        super(ctx, bufferManager, maxSortFrames, sortFields, 
keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, outputLimit);
     }
 
     @Override
@@ -52,6 +55,11 @@ public class FrameSorterMergeSort extends 
AbstractFrameSorter {
     }
 
     @Override
+    protected long getRequiredMemory(FrameTupleAccessor frameAccessor) {
+        return super.getRequiredMemory(frameAccessor) + ptrSize * 
frameAccessor.getTupleCount() * Integer.BYTES;
+    }
+
+    @Override
     public void close() {
         super.close();
         tPointersTemp = null;
@@ -68,7 +76,7 @@ public class FrameSorterMergeSort extends AbstractFrameSorter 
{
                 if (next < end) {
                     merge(i, next, step, Math.min(step, end - next));
                 } else {
-                    System.arraycopy(tPointers, i * 4, tPointersTemp, i * 4, 
(end - i) * 4);
+                    copy(tPointers, i, tPointersTemp, i, end - i);
                 }
             }
             /** prepare next phase merge */
@@ -91,29 +99,21 @@ public class FrameSorterMergeSort extends AbstractFrameSorter {
         while (pos1 <= end1 && pos2 <= end2) {
             int cmp = compare(pos1, pos2);
             if (cmp <= 0) {
-                copy(pos1, targetPos);
+                copy(tPointers, pos1, tPointersTemp, targetPos);
                 pos1++;
             } else {
-                copy(pos2, targetPos);
+                copy(tPointers, pos2, tPointersTemp, targetPos);
                 pos2++;
             }
             targetPos++;
         }
         if (pos1 <= end1) {
             int rest = end1 - pos1 + 1;
-            System.arraycopy(tPointers, pos1 * 4, tPointersTemp, targetPos * 4, rest * 4);
+            copy(tPointers, pos1, tPointersTemp, targetPos, rest);
         }
         if (pos2 <= end2) {
             int rest = end2 - pos2 + 1;
-            System.arraycopy(tPointers, pos2 * 4, tPointersTemp, targetPos * 4, rest * 4);
+            copy(tPointers, pos2, tPointersTemp, targetPos, rest);
         }
     }
-
-    private void copy(int src, int dest) {
-        tPointersTemp[dest * 4 + ID_FRAMEID] = tPointers[src * 4 + ID_FRAMEID];
-        tPointersTemp[dest * 4 + ID_TUPLE_START] = tPointers[src * 4 + ID_TUPLE_START];
-        tPointersTemp[dest * 4 + ID_TUPLE_END] = tPointers[src * 4 + ID_TUPLE_END];
-        tPointersTemp[dest * 4 + ID_NORMAL_KEY] = tPointers[src * 4 + ID_NORMAL_KEY];
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index cf864f6..486bc7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -27,18 +27,20 @@ import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
 
 public class FrameSorterQuickSort extends AbstractFrameSorter {
 
-    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) throws HyracksDataException {
-        this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
-                Integer.MAX_VALUE);
+    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
+            throws HyracksDataException {
+        this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, Integer.MAX_VALUE);
     }
 
-    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException {
-        super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
-                outputLimit);
+    public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames,
+            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit)
+            throws HyracksDataException {
+        super(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories,
+                recordDescriptor, outputLimit);
     }
 
     @Override
@@ -60,7 +62,7 @@ public class FrameSorterQuickSort extends AbstractFrameSorter {
                     break;
                 }
                 if (cmp == 0) {
-                    swap(tPointers, a++, b);
+                    swap(tPointers, a++, tPointers, b);
                 }
                 ++b;
             }
@@ -70,13 +72,13 @@ public class FrameSorterQuickSort extends AbstractFrameSorter {
                     break;
                 }
                 if (cmp == 0) {
-                    swap(tPointers, c, d--);
+                    swap(tPointers, c, tPointers, d--);
                 }
                 --c;
             }
             if (b > c)
                 break;
-            swap(tPointers, b++, c--);
+            swap(tPointers, b++, tPointers, c--);
         }
 
         int s;
@@ -94,17 +96,9 @@ public class FrameSorterQuickSort extends AbstractFrameSorter {
         }
     }
 
-    private void swap(int x[], int a, int b) {
-        for (int i = 0; i < 4; ++i) {
-            int t = x[a * 4 + i];
-            x[a * 4 + i] = x[b * 4 + i];
-            x[b * 4 + i] = t;
-        }
-    }
-
     private void vecswap(int x[], int a, int b, int n) {
         for (int i = 0; i < n; i++, a++, b++) {
-            swap(x, a, b);
+            swap(x, a, x, b);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
index a058624..1578975 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -31,31 +31,31 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.buffermanager.IFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
-import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.IFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
 
 public class HeapSortRunGenerator extends AbstractSortRunGenerator {
     protected final IHyracksTaskContext ctx;
     protected final int frameLimit;
     protected final int topK;
     protected final int[] sortFields;
-    protected final INormalizedKeyComputerFactory nmkFactory;
+    protected final INormalizedKeyComputerFactory[] nmkFactories;
     protected final IBinaryComparatorFactory[] comparatorFactories;
     protected final RecordDescriptor recordDescriptor;
     protected ITupleSorter tupleSorter;
     protected IFrameTupleAccessor inAccessor;
 
     public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
         super();
         this.ctx = ctx;
         this.frameLimit = frameLimit;
         this.topK = topK;
         this.sortFields = sortFields;
-        this.nmkFactory = firstKeyNormalizerFactory;
+        this.nmkFactories = keyNormalizerFactories;
         this.comparatorFactories = comparatorFactories;
         this.inAccessor = new FrameTupleAccessor(recordDescriptor);
         this.recordDescriptor = recordDescriptor;
@@ -64,8 +64,9 @@ public class HeapSortRunGenerator extends AbstractSortRunGenerator {
     @Override
     public void open() throws HyracksDataException {
         IFramePool framePool = new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize());
-        IDeletableTupleBufferManager bufferManager = new VariableDeletableTupleMemoryManager(framePool, recordDescriptor);
-        tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactory, comparatorFactories);
+        IDeletableTupleBufferManager bufferManager =
+                new VariableDeletableTupleMemoryManager(framePool, recordDescriptor);
+        tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactories, comparatorFactories);
         super.open();
     }
 
@@ -76,8 +77,8 @@ public class HeapSortRunGenerator extends AbstractSortRunGenerator {
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext()
-                .createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName());
+        FileReference file =
+                ctx.getJobletContext().createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIoManager());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
index 4311128..80b36ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
@@ -43,9 +43,9 @@ public class HybridTopKSortRunGenerator extends HeapSortRunGenerator {
     private int tupleSorterFlushedTimes = 0;
 
     public HybridTopKSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
-        super(ctx, frameLimit, topK, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+        super(ctx, frameLimit, topK, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
     }
 
     @Override
@@ -60,8 +60,8 @@ public class HybridTopKSortRunGenerator extends HeapSortRunGenerator {
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext()
-                .createManagedWorkspaceFile(HybridTopKSortRunGenerator.class.getSimpleName());
+        FileReference file =
+                ctx.getJobletContext().createManagedWorkspaceFile(HybridTopKSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIoManager());
     }
 
@@ -101,8 +101,8 @@ public class HybridTopKSortRunGenerator extends HeapSortRunGenerator {
                         new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize()),
                         FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.BIGGEST_FIT,
                                 frameLimit - 1));
-                frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, nmkFactory, comparatorFactories,
-                        recordDescriptor, topK);
+                frameSorter = new FrameSorterMergeSort(ctx, bufferManager, frameLimit - 1, sortFields, nmkFactories,
+                        comparatorFactories, recordDescriptor, topK);
                 if (LOG.isLoggable(Level.FINE)) {
                     LOG.fine("create frameSorter");
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 996101b..adc0d5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -53,8 +53,8 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final int MERGE_ACTIVITY_ID = 1;
 
     private final int[] sortFields;
-    private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
-    private IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory[] keyNormalizerFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
 
     public InMemorySortOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] sortFields,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
@@ -62,11 +62,11 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
     }
 
     public InMemorySortOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.sortFields = sortFields;
-        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.keyNormalizerFactories = keyNormalizerFactories;
         this.comparatorFactories = comparatorFactories;
         outRecDescs[0] = recordDescriptor;
     }
@@ -123,8 +123,9 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
                             new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
                             FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT));
 
-                    state.frameSorter = new FrameSorterMergeSort(ctx, frameBufferManager, sortFields,
-                            firstKeyNormalizerFactory, comparatorFactories, outRecDescs[0]);
+                    state.frameSorter =
+                            new FrameSorterMergeSort(ctx, frameBufferManager, VariableFramePool.UNLIMITED_MEMORY,
+                                    sortFields, keyNormalizerFactories, comparatorFactories, outRecDescs[0]);
                     state.frameSorter.reset();
                 }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index 988eea3..a90d48f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -41,7 +41,16 @@ public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescript
     public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
-        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+        this(spec, framesLimit, topK, sortFields,
+                firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory }
+                        : null,
+                comparatorFactories, recordDescriptor);
+    }
+
+    public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor);
         this.topK = topK;
     }
 
@@ -53,7 +62,7 @@ public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescript
             @Override
             protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) {
-                return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, firstKeyNormalizerFactory,
+                return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, keyNormalizerFactories,
                         comparatorFactories, outRecDescs[0]);
 
             }

Reply via email to