[ASTERIXDB-2149] Enable multiple normalized keys in sort - user model changes: no - storage format changes: no - interface changes: yes. The interface of sort is changed.
Currently, during the (in-memory) sort, we use an int normalized key to speed up comparisons by avoiding random memory accesses. However, this technique is inefficient if the first 4 bytes of the sorting keys are not distinctive. From a performance point of view, it's better to use longer normalized keys when possible (2-3x improvements). This patch enables that by: - Allowing multiple normalized keys during sort, where the length of each normalized key can be longer (multiple integers). - Enabling memory budgeting of pointer directories as well during sort (but for performance, we still use int[], instead of byte[] from frame). The next patch will enable the AsterixDB layer to use this feature to speed up sort performance. Change-Id: I4354242ff731b4b006b8446b58f65873047dde78 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2127 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ed469381 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ed469381 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ed469381 Branch: refs/heads/master Commit: ed469381235990ce5ecd2f242b679190ef2ca263 Parents: c5a0a19 Author: luochen01 <cl...@uci.edu> Authored: Mon Nov 27 11:14:01 2017 -0800 Committer: Luo Chen <cl...@uci.edu> Committed: Mon Nov 27 13:18:52 2017 -0800 ---------------------------------------------------------------------- .../sort/InMemorySortRuntimeFactory.java | 18 +- .../tests/pushruntime/PushRuntimeTest.java | 7 +- .../dataflow/value/INormalizedKeyComputer.java | 5 + .../value/INormalizedKeyComputerFactory.java | 17 ++ .../IntegerNormalizedKeyComputerFactory.java | 8 +- 
.../VariableDeletableTupleMemoryManager.java | 4 +- .../sort/ExternalSortGroupByRunGenerator.java | 29 ++- .../sort/SortGroupByOperatorDescriptor.java | 65 ++++-- .../sort/AbstractExternalSortRunGenerator.java | 24 +-- .../dataflow/std/sort/AbstractFrameSorter.java | 199 ++++++++++++++----- .../sort/AbstractSorterOperatorDescriptor.java | 10 +- .../sort/ExternalSortOperatorDescriptor.java | 30 ++- .../std/sort/ExternalSortRunGenerator.java | 22 +- .../dataflow/std/sort/FrameSorterMergeSort.java | 46 ++--- .../dataflow/std/sort/FrameSorterQuickSort.java | 38 ++-- .../dataflow/std/sort/HeapSortRunGenerator.java | 19 +- .../std/sort/HybridTopKSortRunGenerator.java | 12 +- .../sort/InMemorySortOperatorDescriptor.java | 13 +- .../std/sort/TopKSorterOperatorDescriptor.java | 13 +- .../dataflow/std/sort/TupleSorterHeapSort.java | 97 ++++++--- .../tests/integration/HeapSortMergeTest.java | 85 ++++---- .../tests/unit/AbstractRunGeneratorTest.java | 99 +++++---- .../unit/ExternalSortRunGeneratorTest.java | 21 +- .../tests/unit/HeapSortRunGeneratorTest.java | 26 ++- .../tests/unit/HybridSortRunGenerator.java | 34 ---- .../tests/unit/HybridSortRunGeneratorTest.java | 48 +++++ .../tests/unit/RunMergingFrameReaderTest.java | 30 +-- .../tests/unit/TopKRunGeneratorTest.java | 35 ++-- .../examples/text/client/WordCountMain.java | 32 +-- .../hyracks/examples/tpch/client/Sort.java | 14 +- 30 files changed, 703 insertions(+), 397 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java index 925ff93..bb8223d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java @@ -39,18 +39,26 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime private static final long serialVersionUID = 1L; private final int[] sortFields; - private INormalizedKeyComputerFactory firstKeyNormalizerFactory; - private IBinaryComparatorFactory[] comparatorFactories; + private final INormalizedKeyComputerFactory[] keyNormalizerFactories; + private final IBinaryComparatorFactory[] comparatorFactories; public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) { + this(sortFields, + firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory } + : null, + comparatorFactories, projectionList); + } + + public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, + IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) { super(projectionList); // Obs: the projection list is currently ignored. 
if (projectionList != null) { throw new NotImplementedException("Cannot push projection into InMemorySortRuntime."); } this.sortFields = sortFields; - this.firstKeyNormalizerFactory = firstKeyNormalizerFactory; + this.keyNormalizerFactories = keyNormalizerFactories; this.comparatorFactories = comparatorFactories; } @@ -67,8 +75,8 @@ public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntime IFrameBufferManager manager = new VariableFrameMemoryManager( new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY), FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT)); - frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory, - comparatorFactories, outputRecordDesc); + frameSorter = new FrameSorterMergeSort(ctx, manager, VariableFramePool.UNLIMITED_MEMORY, sortFields, + keyNormalizerFactories, comparatorFactories, outputRecordDesc); } frameSorter.reset(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java index 34ed142..cc4c1b9 100644 --- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java +++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java @@ -64,6 +64,7 @@ import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import 
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.io.FileReference; @@ -722,7 +723,8 @@ public class PushRuntimeTest { new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID }); // the algebricks op. - InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 1 }, null, + InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 1 }, + (INormalizedKeyComputerFactory) null, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, null); RecordDescriptor sortDesc = scannerDesc; @@ -836,7 +838,8 @@ public class PushRuntimeTest { // the sort (by nation id) RecordDescriptor sortDesc = scannerDesc; - InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 3 }, null, + InMemorySortRuntimeFactory sort = new InMemorySortRuntimeFactory(new int[] { 3 }, + (INormalizedKeyComputerFactory) null, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, null); // the group-by http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java index 2c79a4d..7bf8255 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java +++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java @@ -20,4 +20,9 @@ package org.apache.hyracks.api.dataflow.value; public interface INormalizedKeyComputer { public int normalize(byte[] bytes, int start, int length); + + default void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) { + int key = normalize(bytes, start, length); + normalizedKeys[keyStart] = key; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java index 2b7198b..901702e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java @@ -22,4 +22,21 @@ import java.io.Serializable; public interface INormalizedKeyComputerFactory extends Serializable { public INormalizedKeyComputer createNormalizedKeyComputer(); + + /** + * + * @return The length of the normalized key in terms of integers + */ + default int getNormalizedKeyLength() { + return 1; + } + + /** + * + * @return Whether we can solely rely on this normalized key to complete comparison, + * even when two normalized keys are equal + */ + default boolean isDecisive() { + return false; + } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java index 5cfef28..41c0740 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java @@ -21,7 +21,6 @@ package org.apache.hyracks.dataflow.common.data.normalizers; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.data.std.primitive.IntegerPointable; -import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; public class IntegerNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory { private static final long serialVersionUID = 1L; @@ -32,8 +31,13 @@ public class IntegerNormalizedKeyComputerFactory implements INormalizedKeyComput @Override public int normalize(byte[] bytes, int start, int length) { int value = IntegerPointable.getInteger(bytes, start); - return value ^Integer.MIN_VALUE; + return value ^ Integer.MIN_VALUE; } }; } + + @Override + public boolean isDecisive() { + return true; + } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java index 4ed11e6..5a59b5d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java @@ -170,8 +170,8 @@ public class VariableDeletableTupleMemoryManager implements IDeletableTupleBuffe @Override public ITuplePointerAccessor createTuplePointerAccessor() { return new AbstractTuplePointerAccessor() { - private IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender( - recordDescriptor); + private final IAppendDeletableFrameTupleAccessor bufferAccessor = + new DeletableFrameTupleAppender(recordDescriptor); @Override IFrameTupleAccessor getInnerAccessor() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java index 3f10f50..0b915dc 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java @@ -50,16 +50,25 @@ public class ExternalSortGroupByRunGenerator extends AbstractExternalSortRunGene int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor outRecordDesc, Algorithm alg) throws HyracksDataException { - this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, firstKeyNormalizerFactory, comparatorFactories, + this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, + firstKeyNormalizerFactory != null ? 
new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory } + : null, + comparatorFactories, aggregatorFactory, outRecordDesc, alg, EnumFreeSlotPolicy.LAST_FIT); + } + + public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor inputRecordDesc, + int framesLimit, int[] groupFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, + IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory, + RecordDescriptor outRecordDesc, Algorithm alg) throws HyracksDataException { + this(ctx, sortFields, inputRecordDesc, framesLimit, groupFields, keyNormalizerFactories, comparatorFactories, aggregatorFactory, outRecordDesc, alg, EnumFreeSlotPolicy.LAST_FIT); } public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor inputRecordDesc, - int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory, + int framesLimit, int[] groupFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor outRecordDesc, Algorithm alg, EnumFreeSlotPolicy policy) throws HyracksDataException { - super(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, inputRecordDesc, alg, policy, - framesLimit); + super(ctx, sortFields, keyNormalizerFactories, comparatorFactories, inputRecordDesc, alg, policy, framesLimit); this.groupFields = groupFields; this.comparatorFactories = comparatorFactories; @@ -70,20 +79,20 @@ public class ExternalSortGroupByRunGenerator extends AbstractExternalSortRunGene @Override protected RunFileWriter getRunFileWriter() throws HyracksDataException { - FileReference file = ctx.getJobletContext().createManagedWorkspaceFile( - ExternalSortGroupByRunGenerator.class.getSimpleName()); + FileReference file = ctx.getJobletContext() + 
.createManagedWorkspaceFile(ExternalSortGroupByRunGenerator.class.getSimpleName()); return new RunFileWriter(file, ctx.getIoManager()); } @Override protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException { //create group-by comparators - IBinaryComparator[] comparators = new IBinaryComparator[Math - .min(groupFields.length, comparatorFactories.length)]; + IBinaryComparator[] comparators = + new IBinaryComparator[Math.min(groupFields.length, comparatorFactories.length)]; for (int i = 0; i < comparators.length; i++) { comparators[i] = comparatorFactories[i].createBinaryComparator(); } - return new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, - this.inRecordDesc, this.outRecordDesc, writer, true); + return new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, this.inRecordDesc, + this.outRecordDesc, writer, true); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java index da5b4a8..23e47f0 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java @@ -55,39 +55,74 @@ public class SortGroupByOperatorDescriptor extends AbstractSorterOperatorDescrip private final RecordDescriptor partialAggRecordDesc; private final RecordDescriptor 
outputRecordDesc; private final boolean finalStage; - private Algorithm alg = Algorithm.MERGE_SORT; + private static final Algorithm ALG = Algorithm.MERGE_SORT; /** * @param spec - * , the Hyracks job specification + * the Hyracks job specification * @param framesLimit - * , the frame limit for this operator + * the frame limit for this operator * @param sortFields - * , the fields to sort + * the fields to sort * @param groupFields - * , the fields to group, which can be a prefix subset of sortFields + * the fields to group, which can be a prefix subset of sortFields * @param firstKeyNormalizerFactory - * , the normalized key computer factory of the first key + * the normalized key computer factory of the first key * @param comparatorFactories - * , the comparator factories of sort keys + * the comparator factories of sort keys * @param partialAggregatorFactory - * , for aggregating the input of this operator + * for aggregating the input of this operator * @param mergeAggregatorFactory - * , for aggregating the intermediate data of this operator + * for aggregating the intermediate data of this operator * @param partialAggRecordDesc - * , the record descriptor of intermediate data + * the record descriptor of intermediate data * @param outRecordDesc - * , the record descriptor of output data + * the record descriptor of output data * @param finalStage - * , whether the operator is used for final stage aggregation + * whether the operator is used for final stage aggregation */ public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory, IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc, RecordDescriptor outRecordDesc, boolean finalStage) { + this(spec, framesLimit, sortFields, groupFields, + 
firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory } + : null, + comparatorFactories, partialAggregatorFactory, mergeAggregatorFactory, partialAggRecordDesc, + outRecordDesc, finalStage); + } - super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, outRecordDesc); + /** + * @param spec + * the Hyracks job specification + * @param framesLimit + * the frame limit for this operator + * @param sortFields + * the fields to sort + * @param groupFields + * the fields to group, which can be a prefix subset of sortFields + * @param keyNormalizerFactories + * the normalized key computer factories for the prefix the sortFields + * @param comparatorFactories + * the comparator factories of sort keys + * @param partialAggregatorFactory + * for aggregating the input of this operator + * @param mergeAggregatorFactory + * for aggregating the intermediate data of this operator + * @param partialAggRecordDesc + * the record descriptor of intermediate data + * @param outRecordDesc + * the record descriptor of output data + * @param finalStage + * whether the operator is used for final stage aggregation + */ + public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields, + int[] groupFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, + IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory, + IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc, + RecordDescriptor outRecordDesc, boolean finalStage) { + super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, outRecordDesc); if (framesLimit <= 1) { throw new IllegalStateException();// minimum of 2 fames (1 in,1 out) } @@ -110,8 +145,8 @@ public class SortGroupByOperatorDescriptor extends AbstractSorterOperatorDescrip IRecordDescriptorProvider recordDescriptorProvider) throws 
HyracksDataException { return new ExternalSortGroupByRunGenerator(ctx, sortFields, recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit, - groupFields, firstKeyNormalizerFactory, comparatorFactories, partialAggregatorFactory, - partialAggRecordDesc, alg); + groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory, + partialAggRecordDesc, ALG); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java index 7c7bfec..a8cc93b 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunGenerator.java @@ -39,24 +39,24 @@ public abstract class AbstractExternalSortRunGenerator extends AbstractSortRunGe protected final int maxSortFrames; public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException { - this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, - EnumFreeSlotPolicy.LAST_FIT, framesLimit); + this(ctx, sortFields, keyNormalizerFactories, 
comparatorFactories, recordDesc, alg, EnumFreeSlotPolicy.LAST_FIT, + framesLimit); } public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit) - throws HyracksDataException { - this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit, + throws HyracksDataException { + this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, policy, framesLimit, Integer.MAX_VALUE); } public AbstractExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit, int outputLimit) - throws HyracksDataException { + throws HyracksDataException { super(); this.ctx = ctx; maxSortFrames = framesLimit - 1; @@ -65,11 +65,11 @@ public abstract class AbstractExternalSortRunGenerator extends AbstractSortRunGe IFrameBufferManager bufferManager = new VariableFrameMemoryManager( new VariableFramePool(ctx, maxSortFrames * ctx.getInitialFrameSize()), freeSlotPolicy); if (alg == Algorithm.MERGE_SORT) { - frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, - comparatorFactories, recordDesc, outputLimit); + frameSorter = new FrameSorterMergeSort(ctx, bufferManager, maxSortFrames, sortFields, + keyNormalizerFactories, comparatorFactories, recordDesc, outputLimit); } else { - frameSorter = new FrameSorterQuickSort(ctx, bufferManager, sortFields, 
firstKeyNormalizerFactory, - comparatorFactories, recordDesc, outputLimit); + frameSorter = new FrameSorterQuickSort(ctx, bufferManager, maxSortFrames, sortFields, + keyNormalizerFactories, comparatorFactories, recordDesc, outputLimit); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java index 77d5d49..6c061ae 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java @@ -39,46 +39,90 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo; import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; +import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool; import org.apache.hyracks.util.IntSerDeUtils; public abstract class AbstractFrameSorter implements IFrameSorter { protected Logger LOGGER = Logger.getLogger(AbstractFrameSorter.class.getName()); - static final int PTR_SIZE = 4; - static final int ID_FRAMEID = 0; - static final int ID_TUPLE_START = 1; - static final int ID_TUPLE_END = 2; - static final int ID_NORMAL_KEY = 3; + protected static final int ID_FRAME_ID = 0; + protected static final int ID_TUPLE_START = 1; + protected static final int ID_TUPLE_END = 2; + protected static final int ID_NORMALIZED_KEY = 3; + + // the 
length of each normalized key (in terms of integers) + protected final int[] normalizedKeyLength; + // the total length of the normalized key (in term of integers) + protected final int normalizedKeyTotalLength; + // whether the normalized keys can be used to decide orders, even when normalized keys are the same + protected final boolean normalizedKeysDecisive; + + protected final int ptrSize; protected final int[] sortFields; protected final IBinaryComparator[] comparators; - protected final INormalizedKeyComputer nkc; + protected final INormalizedKeyComputer[] nkcs; protected final IFrameBufferManager bufferManager; protected final FrameTupleAccessor inputTupleAccessor; protected final IFrameTupleAppender outputAppender; protected final IFrame outputFrame; protected final int outputLimit; + protected final long maxSortMemory; + protected long totalMemoryUsed; protected int[] tPointers; + protected final int[] tmpPointer; protected int tupleCount; - private FrameTupleAccessor fta2; - private BufferInfo info = new BufferInfo(null, -1, -1); + private final FrameTupleAccessor fta2; + private final BufferInfo info = new BufferInfo(null, -1, -1); - public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor) throws HyracksDataException { - this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, - Integer.MAX_VALUE); + public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, + int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, + IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) + throws HyracksDataException { + this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories, + recordDescriptor, 
Integer.MAX_VALUE); } - public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor, int outputLimit) + public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, + int[] sortFields, INormalizedKeyComputerFactory[] normalizedKeyComputerFactories, + IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException { this.bufferManager = bufferManager; + if (maxSortFrames == VariableFramePool.UNLIMITED_MEMORY) { + this.maxSortMemory = Long.MAX_VALUE; + } else { + this.maxSortMemory = (long) ctx.getInitialFrameSize() * maxSortFrames; + } this.sortFields = sortFields; - this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer(); + + int runningNormalizedKeyTotalLength = 0; + + if (normalizedKeyComputerFactories != null) { + int decisivePrefixLength = getDecisivePrefixLength(normalizedKeyComputerFactories); + + // we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys + // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid + // computing unncessary normalized keys + int normalizedKeys = decisivePrefixLength < normalizedKeyComputerFactories.length ? 
decisivePrefixLength + 1 + : decisivePrefixLength; + this.nkcs = new INormalizedKeyComputer[normalizedKeys]; + this.normalizedKeyLength = new int[normalizedKeys]; + + for (int i = 0; i < normalizedKeys; i++) { + this.nkcs[i] = normalizedKeyComputerFactories[i].createNormalizedKeyComputer(); + this.normalizedKeyLength[i] = normalizedKeyComputerFactories[i].getNormalizedKeyLength(); + runningNormalizedKeyTotalLength += this.normalizedKeyLength[i]; + } + this.normalizedKeysDecisive = decisivePrefixLength == comparatorFactories.length; + } else { + this.nkcs = null; + this.normalizedKeyLength = null; + this.normalizedKeysDecisive = false; + } + this.normalizedKeyTotalLength = runningNormalizedKeyTotalLength; + this.ptrSize = ID_NORMALIZED_KEY + normalizedKeyTotalLength; this.comparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); @@ -88,17 +132,24 @@ public abstract class AbstractFrameSorter implements IFrameSorter { this.outputFrame = new VSizeFrame(ctx); this.outputLimit = outputLimit; this.fta2 = new FrameTupleAccessor(recordDescriptor); + this.tmpPointer = new int[ptrSize]; } @Override public void reset() throws HyracksDataException { this.tupleCount = 0; + this.totalMemoryUsed = 0; this.bufferManager.reset(); } @Override public boolean insertFrame(ByteBuffer inputBuffer) throws HyracksDataException { - if (bufferManager.insertFrame(inputBuffer) >= 0) { + inputTupleAccessor.reset(inputBuffer); + long requiredMemory = getRequiredMemory(inputTupleAccessor); + if (totalMemoryUsed + requiredMemory <= maxSortMemory && bufferManager.insertFrame(inputBuffer) >= 0) { + // we have enough memory + totalMemoryUsed += requiredMemory; + tupleCount += inputTupleAccessor.getTupleCount(); return true; } if (getFrameCount() == 0) { @@ -108,36 +159,41 @@ public abstract class AbstractFrameSorter implements IFrameSorter { return false; } + protected long 
getRequiredMemory(FrameTupleAccessor frameAccessor) { + return (long) frameAccessor.getBuffer().capacity() + ptrSize * frameAccessor.getTupleCount() * Integer.BYTES; + } + @Override public void sort() throws HyracksDataException { - tupleCount = 0; - for (int i = 0; i < bufferManager.getNumFrames(); ++i) { - bufferManager.getFrame(i, info); - inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength()); - tupleCount += inputTupleAccessor.getTupleCount(); - } - if (tPointers == null || tPointers.length < tupleCount * PTR_SIZE) { - tPointers = new int[tupleCount * PTR_SIZE]; + if (tPointers == null || tPointers.length < tupleCount * ptrSize) { + tPointers = new int[tupleCount * ptrSize]; } int ptr = 0; - int sfIdx = sortFields[0]; for (int i = 0; i < bufferManager.getNumFrames(); ++i) { bufferManager.getFrame(i, info); inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength()); int tCount = inputTupleAccessor.getTupleCount(); byte[] array = inputTupleAccessor.getBuffer().array(); - for (int j = 0; j < tCount; ++j) { + int fieldSlotsLength = inputTupleAccessor.getFieldSlotsLength(); + for (int j = 0; j < tCount; ++j, ++ptr) { int tStart = inputTupleAccessor.getTupleStartOffset(j); int tEnd = inputTupleAccessor.getTupleEndOffset(j); - tPointers[ptr * PTR_SIZE + ID_FRAMEID] = i; - tPointers[ptr * PTR_SIZE + ID_TUPLE_START] = tStart; - tPointers[ptr * PTR_SIZE + ID_TUPLE_END] = tEnd; - int f0StartRel = inputTupleAccessor.getFieldStartOffset(j, sfIdx); - int f0EndRel = inputTupleAccessor.getFieldEndOffset(j, sfIdx); - int f0Start = f0StartRel + tStart + inputTupleAccessor.getFieldSlotsLength(); - tPointers[ptr * PTR_SIZE + ID_NORMAL_KEY] = - nkc == null ? 
0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel); - ++ptr; + tPointers[ptr * ptrSize + ID_FRAME_ID] = i; + tPointers[ptr * ptrSize + ID_TUPLE_START] = tStart; + tPointers[ptr * ptrSize + ID_TUPLE_END] = tEnd; + if (nkcs == null) { + continue; + } + int keyPos = ptr * ptrSize + ID_NORMALIZED_KEY; + for (int k = 0; k < nkcs.length; k++) { + int sortField = sortFields[k]; + int fieldStartOffsetRel = inputTupleAccessor.getFieldStartOffset(j, sortField); + int fieldEndOffsetRel = inputTupleAccessor.getFieldEndOffset(j, sortField); + int fieldStartOffset = fieldStartOffsetRel + tStart + fieldSlotsLength; + nkcs[k].normalize(array, fieldStartOffset, fieldEndOffsetRel - fieldStartOffsetRel, tPointers, + keyPos); + keyPos += normalizedKeyLength[k]; + } } } if (tupleCount > 0) { @@ -164,9 +220,9 @@ public abstract class AbstractFrameSorter implements IFrameSorter { int limit = Math.min(tupleCount, outputLimit); int io = 0; for (int ptr = 0; ptr < limit; ++ptr) { - int i = tPointers[ptr * PTR_SIZE + ID_FRAMEID]; - int tStart = tPointers[ptr * PTR_SIZE + ID_TUPLE_START]; - int tEnd = tPointers[ptr * PTR_SIZE + ID_TUPLE_END]; + int i = tPointers[ptr * ptrSize + ID_FRAME_ID]; + int tStart = tPointers[ptr * ptrSize + ID_TUPLE_START]; + int tEnd = tPointers[ptr * ptrSize + ID_TUPLE_END]; bufferManager.getFrame(i, info); inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength()); int flushed = FrameUtils.appendToWriter(writer, outputAppender, inputTupleAccessor, tStart, tEnd); @@ -185,19 +241,23 @@ public abstract class AbstractFrameSorter implements IFrameSorter { } protected final int compare(int tp1, int tp2) throws HyracksDataException { - int i1 = tPointers[tp1 * 4 + ID_FRAMEID]; - int j1 = tPointers[tp1 * 4 + ID_TUPLE_START]; - int v1 = tPointers[tp1 * 4 + ID_NORMAL_KEY]; - - int tp2i = tPointers[tp2 * 4 + ID_FRAMEID]; - int tp2j = tPointers[tp2 * 4 + ID_TUPLE_START]; - int tp2v = tPointers[tp2 * 4 + ID_NORMAL_KEY]; + return compare(tPointers, 
tp1, tPointers, tp2); + } - if (v1 != tp2v) { - return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1; + protected final int compare(int[] tPointers1, int tp1, int[] tPointers2, int tp2) throws HyracksDataException { + if (nkcs != null) { + int cmpNormalizedKey = compareNormalizeKeys(tPointers1, tp1 * ptrSize + ID_NORMALIZED_KEY, tPointers2, + tp2 * ptrSize + ID_NORMALIZED_KEY, normalizedKeyTotalLength); + if (cmpNormalizedKey != 0 || normalizedKeysDecisive) { + return cmpNormalizedKey; + } } - int i2 = tp2i; - int j2 = tp2j; + + int i1 = tPointers1[tp1 * ptrSize + ID_FRAME_ID]; + int j1 = tPointers1[tp1 * ptrSize + ID_TUPLE_START]; + int i2 = tPointers2[tp2 * ptrSize + ID_FRAME_ID]; + int j2 = tPointers2[tp2 * ptrSize + ID_TUPLE_START]; + bufferManager.getFrame(i1, info); byte[] b1 = info.getBuffer().array(); inputTupleAccessor.reset(info.getBuffer(), info.getStartOffset(), info.getLength()); @@ -223,6 +283,43 @@ public abstract class AbstractFrameSorter implements IFrameSorter { return 0; } + public static int compareNormalizeKeys(int[] keys1, int start1, int[] keys2, int start2, int length) { + for (int i = 0; i < length; i++) { + int key1 = keys1[start1 + i]; + int key2 = keys2[start2 + i]; + if (key1 != key2) { + return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? 
-1 : 1; + } + } + return 0; + } + + public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] keyNormalizerFactories) { + if (keyNormalizerFactories == null) { + return 0; + } + for (int i = 0; i < keyNormalizerFactories.length; i++) { + if (!keyNormalizerFactories[i].isDecisive()) { + return i; + } + } + return keyNormalizerFactories.length; + } + + protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) { + System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize); + System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, ptrSize); + System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize); + } + + protected void copy(int src[], int srcPos, int dest[], int destPos) { + System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, ptrSize); + } + + protected void copy(int src[], int srcPos, int dest[], int destPos, int n) { + System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * ptrSize); + } + @Override public void close() { tupleCount = 0; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java index 1cd5fc3..602157f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java @@ -56,17 +56,17 @@ public abstract class AbstractSorterOperatorDescriptor extends 
AbstractOperatorD protected static final int MERGE_ACTIVITY_ID = 1; protected final int[] sortFields; - protected final INormalizedKeyComputerFactory firstKeyNormalizerFactory; + protected final INormalizedKeyComputerFactory[] keyNormalizerFactories; protected final IBinaryComparatorFactory[] comparatorFactories; protected final int framesLimit; public AbstractSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) { super(spec, 1, 1); this.framesLimit = framesLimit; this.sortFields = sortFields; - this.firstKeyNormalizerFactory = firstKeyNormalizerFactory; + this.keyNormalizerFactories = keyNormalizerFactories; this.comparatorFactories = comparatorFactories; outRecDescs[0] = recordDescriptor; } @@ -174,8 +174,8 @@ public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorD for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); } - INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null - : firstKeyNormalizerFactory.createNormalizedKeyComputer(); + INormalizedKeyComputer nmkComputer = keyNormalizerFactories == null ? 
null + : keyNormalizerFactories[0].createNormalizedKeyComputer(); AbstractExternalSortRunMerger merger = getSortRunMerger(ctx, recordDescProvider, writer, sorter, runs, comparators, nmkComputer, framesLimit); merger.process(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java index 1b66ccf..b58d4c7 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java @@ -43,21 +43,31 @@ public class ExternalSortOperatorDescriptor extends AbstractSorterOperatorDescri private final int outputLimit; public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, Algorithm alg) { - this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg, + this(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor, alg, EnumFreeSlotPolicy.LAST_FIT); } public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, 
RecordDescriptor recordDescriptor) { - this(spec, framesLimit, sortFields, null, comparatorFactories, recordDescriptor); + this(spec, framesLimit, sortFields, (INormalizedKeyComputerFactory[]) null, comparatorFactories, + recordDescriptor); } public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) { - this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, + this(spec, framesLimit, sortFields, + firstKeyNormalizerFactory != null ? new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory } + : null, + comparatorFactories, recordDescriptor, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT); + } + + public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, + RecordDescriptor recordDescriptor) { + this(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT); } @@ -69,7 +79,7 @@ public class ExternalSortOperatorDescriptor extends AbstractSorterOperatorDescri @Override protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider) throws HyracksDataException { - return new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, + return new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories, comparatorFactories, outRecDescs[0], alg, policy, framesLimit, outputLimit); } }; @@ -92,16 +102,16 @@ public class ExternalSortOperatorDescriptor extends AbstractSorterOperatorDescri } public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields, - 
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy) { - this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg, - policy, Integer.MAX_VALUE); + this(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor, alg, policy, + Integer.MAX_VALUE); } public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy, int outputLimit) { - super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor); + super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor); if (framesLimit <= 1) { throw new IllegalStateException();// minimum of 2 fames (1 in,1 out) } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java index b451b1c..785b94e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java @@ -31,32 +31,32 @@ import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy; public class ExternalSortRunGenerator extends AbstractExternalSortRunGenerator { public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException { - this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, - EnumFreeSlotPolicy.LAST_FIT, framesLimit); + this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, EnumFreeSlotPolicy.LAST_FIT, + framesLimit); } public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit) - throws HyracksDataException { - this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit, + throws HyracksDataException { + this(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, policy, framesLimit, Integer.MAX_VALUE); } public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int 
framesLimit, int outputLimit) - throws HyracksDataException { - super(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit, + throws HyracksDataException { + super(ctx, sortFields, keyNormalizerFactories, comparatorFactories, recordDesc, alg, policy, framesLimit, outputLimit); } @Override protected RunFileWriter getRunFileWriter() throws HyracksDataException { - FileReference file = ctx.getJobletContext() - .createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName()); + FileReference file = + ctx.getJobletContext().createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName()); return new RunFileWriter(file, ctx.getIoManager()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java index ed28560..260b665 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java @@ -23,24 +23,27 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; public class FrameSorterMergeSort 
extends AbstractFrameSorter { private int[] tPointersTemp; - public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor) throws HyracksDataException { - this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, - Integer.MAX_VALUE); + public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, + int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, + IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) + throws HyracksDataException { + this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories, + recordDescriptor, Integer.MAX_VALUE); } - public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException { - super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, - outputLimit); + public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, + int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, + IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit) + throws HyracksDataException { + super(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories, + recordDescriptor, outputLimit); } @Override @@ -52,6 +55,11 @@ public class FrameSorterMergeSort extends AbstractFrameSorter { } @Override + protected long getRequiredMemory(FrameTupleAccessor frameAccessor) { + return 
super.getRequiredMemory(frameAccessor) + ptrSize * frameAccessor.getTupleCount() * Integer.BYTES; + } + + @Override public void close() { super.close(); tPointersTemp = null; @@ -68,7 +76,7 @@ public class FrameSorterMergeSort extends AbstractFrameSorter { if (next < end) { merge(i, next, step, Math.min(step, end - next)); } else { - System.arraycopy(tPointers, i * 4, tPointersTemp, i * 4, (end - i) * 4); + copy(tPointers, i, tPointersTemp, i, end - i); } } /** prepare next phase merge */ @@ -91,29 +99,21 @@ public class FrameSorterMergeSort extends AbstractFrameSorter { while (pos1 <= end1 && pos2 <= end2) { int cmp = compare(pos1, pos2); if (cmp <= 0) { - copy(pos1, targetPos); + copy(tPointers, pos1, tPointersTemp, targetPos); pos1++; } else { - copy(pos2, targetPos); + copy(tPointers, pos2, tPointersTemp, targetPos); pos2++; } targetPos++; } if (pos1 <= end1) { int rest = end1 - pos1 + 1; - System.arraycopy(tPointers, pos1 * 4, tPointersTemp, targetPos * 4, rest * 4); + copy(tPointers, pos1, tPointersTemp, targetPos, rest); } if (pos2 <= end2) { int rest = end2 - pos2 + 1; - System.arraycopy(tPointers, pos2 * 4, tPointersTemp, targetPos * 4, rest * 4); + copy(tPointers, pos2, tPointersTemp, targetPos, rest); } } - - private void copy(int src, int dest) { - tPointersTemp[dest * 4 + ID_FRAMEID] = tPointers[src * 4 + ID_FRAMEID]; - tPointersTemp[dest * 4 + ID_TUPLE_START] = tPointers[src * 4 + ID_TUPLE_START]; - tPointersTemp[dest * 4 + ID_TUPLE_END] = tPointers[src * 4 + ID_TUPLE_END]; - tPointersTemp[dest * 4 + ID_NORMAL_KEY] = tPointers[src * 4 + ID_NORMAL_KEY]; - } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java index cf864f6..486bc7c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java @@ -27,18 +27,20 @@ import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; public class FrameSorterQuickSort extends AbstractFrameSorter { - public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor) throws HyracksDataException { - this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, - Integer.MAX_VALUE); + public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, + int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, + IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) + throws HyracksDataException { + this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories, + recordDescriptor, Integer.MAX_VALUE); } - public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, - RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException { - super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, - outputLimit); + public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, + int[] sortFields, 
INormalizedKeyComputerFactory[] keyNormalizerFactories, + IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit) + throws HyracksDataException { + super(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories, + recordDescriptor, outputLimit); } @Override @@ -60,7 +62,7 @@ public class FrameSorterQuickSort extends AbstractFrameSorter { break; } if (cmp == 0) { - swap(tPointers, a++, b); + swap(tPointers, a++, tPointers, b); } ++b; } @@ -70,13 +72,13 @@ public class FrameSorterQuickSort extends AbstractFrameSorter { break; } if (cmp == 0) { - swap(tPointers, c, d--); + swap(tPointers, c, tPointers, d--); } --c; } if (b > c) break; - swap(tPointers, b++, c--); + swap(tPointers, b++, tPointers, c--); } int s; @@ -94,17 +96,9 @@ public class FrameSorterQuickSort extends AbstractFrameSorter { } } - private void swap(int x[], int a, int b) { - for (int i = 0; i < 4; ++i) { - int t = x[a * 4 + i]; - x[a * 4 + i] = x[b * 4 + i]; - x[b * 4 + i] = t; - } - } - private void vecswap(int x[], int a, int b, int n) { for (int i = 0; i < n; i++, a++, b++) { - swap(x, a, b); + swap(x, a, x, b); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java index a058624..1578975 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java @@ -31,31 
+31,31 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.io.RunFileWriter; -import org.apache.hyracks.dataflow.std.buffermanager.IFramePool; import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager; -import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool; +import org.apache.hyracks.dataflow.std.buffermanager.IFramePool; import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager; +import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool; public class HeapSortRunGenerator extends AbstractSortRunGenerator { protected final IHyracksTaskContext ctx; protected final int frameLimit; protected final int topK; protected final int[] sortFields; - protected final INormalizedKeyComputerFactory nmkFactory; + protected final INormalizedKeyComputerFactory[] nmkFactories; protected final IBinaryComparatorFactory[] comparatorFactories; protected final RecordDescriptor recordDescriptor; protected ITupleSorter tupleSorter; protected IFrameTupleAccessor inAccessor; public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) { super(); this.ctx = ctx; this.frameLimit = frameLimit; this.topK = topK; this.sortFields = sortFields; - this.nmkFactory = firstKeyNormalizerFactory; + this.nmkFactories = keyNormalizerFactories; this.comparatorFactories = comparatorFactories; this.inAccessor = new FrameTupleAccessor(recordDescriptor); this.recordDescriptor = recordDescriptor; @@ -64,8 +64,9 @@ public class HeapSortRunGenerator extends AbstractSortRunGenerator 
{ @Override public void open() throws HyracksDataException { IFramePool framePool = new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize()); - IDeletableTupleBufferManager bufferManager = new VariableDeletableTupleMemoryManager(framePool, recordDescriptor); - tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactory, comparatorFactories); + IDeletableTupleBufferManager bufferManager = + new VariableDeletableTupleMemoryManager(framePool, recordDescriptor); + tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactories, comparatorFactories); super.open(); } @@ -76,8 +77,8 @@ public class HeapSortRunGenerator extends AbstractSortRunGenerator { @Override protected RunFileWriter getRunFileWriter() throws HyracksDataException { - FileReference file = ctx.getJobletContext() - .createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName()); + FileReference file = + ctx.getJobletContext().createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName()); return new RunFileWriter(file, ctx.getIoManager()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java index 4311128..80b36ce 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java @@ -43,9 +43,9 @@ public class HybridTopKSortRunGenerator 
extends HeapSortRunGenerator { private int tupleSorterFlushedTimes = 0; public HybridTopKSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) { - super(ctx, frameLimit, topK, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor); + super(ctx, frameLimit, topK, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor); } @Override @@ -60,8 +60,8 @@ public class HybridTopKSortRunGenerator extends HeapSortRunGenerator { @Override protected RunFileWriter getRunFileWriter() throws HyracksDataException { - FileReference file = ctx.getJobletContext() - .createManagedWorkspaceFile(HybridTopKSortRunGenerator.class.getSimpleName()); + FileReference file = + ctx.getJobletContext().createManagedWorkspaceFile(HybridTopKSortRunGenerator.class.getSimpleName()); return new RunFileWriter(file, ctx.getIoManager()); } @@ -101,8 +101,8 @@ public class HybridTopKSortRunGenerator extends HeapSortRunGenerator { new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize()), FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.BIGGEST_FIT, frameLimit - 1)); - frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, nmkFactory, comparatorFactories, - recordDescriptor, topK); + frameSorter = new FrameSorterMergeSort(ctx, bufferManager, frameLimit - 1, sortFields, nmkFactories, + comparatorFactories, recordDescriptor, topK); if (LOG.isLoggable(Level.FINE)) { LOG.fine("create frameSorter"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java index 996101b..adc0d5c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java @@ -53,8 +53,8 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor { private static final int MERGE_ACTIVITY_ID = 1; private final int[] sortFields; - private INormalizedKeyComputerFactory firstKeyNormalizerFactory; - private IBinaryComparatorFactory[] comparatorFactories; + private final INormalizedKeyComputerFactory[] keyNormalizerFactories; + private final IBinaryComparatorFactory[] comparatorFactories; public InMemorySortOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) { @@ -62,11 +62,11 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor { } public InMemorySortOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] sortFields, - INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) { super(spec, 1, 1); this.sortFields = sortFields; - this.firstKeyNormalizerFactory = firstKeyNormalizerFactory; + this.keyNormalizerFactories = keyNormalizerFactories; this.comparatorFactories = comparatorFactories; outRecDescs[0] = recordDescriptor; } @@ -123,8 +123,9 @@ 
public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor { new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY), FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT)); - state.frameSorter = new FrameSorterMergeSort(ctx, frameBufferManager, sortFields, - firstKeyNormalizerFactory, comparatorFactories, outRecDescs[0]); + state.frameSorter = + new FrameSorterMergeSort(ctx, frameBufferManager, VariableFramePool.UNLIMITED_MEMORY, + sortFields, keyNormalizerFactories, comparatorFactories, outRecDescs[0]); state.frameSorter.reset(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java index 988eea3..a90d48f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java @@ -41,7 +41,16 @@ public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescript public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) { - super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor); + this(spec, framesLimit, topK, sortFields, + firstKeyNormalizerFactory != null ? 
new INormalizedKeyComputerFactory[] { firstKeyNormalizerFactory } + : null, + comparatorFactories, recordDescriptor); + } + + public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields, + INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, + RecordDescriptor recordDescriptor) { + super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor); this.topK = topK; } @@ -53,7 +62,7 @@ public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescript @Override protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider) { - return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, firstKeyNormalizerFactory, + return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, keyNormalizerFactories, comparatorFactories, outRecDescs[0]); }