Ali Alsuliman has submitted this change and it was merged. ( https://asterix-gerrit.ics.uci.edu/3363 )
Change subject: [ASTERIXDB-2552][RT] Refactor runs generator and merger ...................................................................... [ASTERIXDB-2552][RT] Refactor runs generator and merger - user model changes: no - storage format changes: no - interface changes: no Details: Moved the writer and sorter out of the merger so that the micro external sort can reuse the run generator and merger; also includes minor clean-ups. Change-Id: Idda31c92cbcddba5ebef8bbbf7855b9c8293dd51 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3363 Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Dmitry Lychagin <dmitry.lycha...@couchbase.com> --- M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java 14 files changed, 209 insertions(+), 234 deletions(-) Approvals: Jenkins: Verified; ; Verified Anon. E. Moose (1000171): Dmitry Lychagin: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java index 364f1c7..cb2da4c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java @@ -32,7 +32,6 @@ import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory; import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter; import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger; -import org.apache.hyracks.dataflow.std.sort.ISorter; /** * Group-by aggregation is pushed into multi-pass merge of external sort. 
@@ -44,28 +43,23 @@ private final RecordDescriptor inputRecordDesc; private final RecordDescriptor partialAggRecordDesc; private final RecordDescriptor outRecordDesc; - private final int[] groupFields; private final IAggregatorDescriptorFactory mergeAggregatorFactory; private final IAggregatorDescriptorFactory partialAggregatorFactory; private final boolean localSide; - private final int[] mergeSortFields; private final int[] mergeGroupFields; private final IBinaryComparator[] groupByComparators; - public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, ISorter frameSorter, List<GeneratedRunFileReader> runs, - int[] sortFields, RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc, - RecordDescriptor outRecordDesc, int framesLimit, IFrameWriter writer, int[] groupFields, - INormalizedKeyComputer nmk, IBinaryComparator[] comparators, + public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, int[] sortFields, + RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc, RecordDescriptor outRecordDesc, + int framesLimit, int[] groupFields, INormalizedKeyComputer nmk, IBinaryComparator[] comparators, IAggregatorDescriptorFactory partialAggregatorFactory, IAggregatorDescriptorFactory aggregatorFactory, boolean localStage) { - super(ctx, frameSorter, runs, comparators, nmk, partialAggRecordDesc, framesLimit, writer); - + super(ctx, runs, comparators, nmk, partialAggRecordDesc, framesLimit); this.inputRecordDesc = inRecordDesc; this.partialAggRecordDesc = partialAggRecordDesc; this.outRecordDesc = outRecordDesc; - this.groupFields = groupFields; this.mergeAggregatorFactory = aggregatorFactory; this.partialAggregatorFactory = partialAggregatorFactory; @@ -93,11 +87,10 @@ } @Override - protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException { + public IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws 
HyracksDataException { IAggregatorDescriptorFactory aggregatorFactory = localSide ? partialAggregatorFactory : mergeAggregatorFactory; - boolean outputPartial = false; return new PreclusteredGroupWriter(ctx, groupFields, groupByComparators, aggregatorFactory, inputRecordDesc, - outRecordDesc, nextWriter, outputPartial); + outRecordDesc, nextWriter, false); } @Override @@ -110,16 +103,14 @@ protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter) throws HyracksDataException { IAggregatorDescriptorFactory aggregatorFactory = localSide ? mergeAggregatorFactory : partialAggregatorFactory; - boolean outputPartial = true; return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregatorFactory, - partialAggRecordDesc, partialAggRecordDesc, mergeFileWriter, outputPartial); + partialAggRecordDesc, partialAggRecordDesc, mergeFileWriter, true); } @Override - protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException { - boolean outputPartial = false; + public IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException { return new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, mergeAggregatorFactory, - partialAggRecordDesc, outRecordDesc, nextWriter, outputPartial); + partialAggRecordDesc, outRecordDesc, nextWriter, false); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java index 23e47f0..7ca01a5 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java @@ -20,7 +20,6 @@ import java.util.List; -import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; @@ -37,7 +36,6 @@ import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator; import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor; import org.apache.hyracks.dataflow.std.sort.Algorithm; -import org.apache.hyracks.dataflow.std.sort.ISorter; /** * This Operator pushes group-by aggregation into the external sort. @@ -158,13 +156,12 @@ @Override protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter, - List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators, - INormalizedKeyComputer nmkComputer, int necessaryFrames) { - return new ExternalSortGroupByRunMerger(ctx, sorter, runs, sortFields, + IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs, + IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) { + return new ExternalSortGroupByRunMerger(ctx, runs, sortFields, recordDescProvider.getInputRecordDescriptor(new ActivityId(odId, SORT_ACTIVITY_ID), 0), - partialAggRecordDesc, outputRecordDesc, necessaryFrames, writer, groupFields, nmkComputer, - comparators, partialAggregatorFactory, mergeAggregatorFactory, !finalStage); + partialAggRecordDesc, outputRecordDesc, necessaryFrames, groupFields, nmkComputer, comparators, + partialAggregatorFactory, mergeAggregatorFactory, !finalStage); } }; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java index 0bead97..e860288 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java @@ -43,8 +43,6 @@ public abstract class AbstractExternalSortRunMerger { protected final IHyracksTaskContext ctx; - protected final IFrameWriter writer; - private final List<GeneratedRunFileReader> runs; private final BitSet currentGenerationRunAvailable; private final IBinaryComparator[] comparators; @@ -54,136 +52,93 @@ private final int topK; private List<GroupVSizeFrame> inFrames; private VSizeFrame outputFrame; - private ISorter sorter; - private static final Logger LOGGER = LogManager.getLogger(); - public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<GeneratedRunFileReader> runs, + public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc, - int framesLimit, IFrameWriter writer) { - this(ctx, sorter, runs, comparators, nmkComputer, recordDesc, framesLimit, Integer.MAX_VALUE, writer); + int framesLimit) { + this(ctx, runs, comparators, nmkComputer, recordDesc, framesLimit, Integer.MAX_VALUE); } - public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<GeneratedRunFileReader> runs, + AbstractExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc, - int framesLimit, int topK, IFrameWriter writer) { + int framesLimit, int topK) { this.ctx = ctx; - this.sorter = sorter; this.runs = new LinkedList<>(runs); 
this.currentGenerationRunAvailable = new BitSet(runs.size()); this.comparators = comparators; this.nmkComputer = nmkComputer; this.recordDesc = recordDesc; this.framesLimit = framesLimit; - this.writer = writer; this.topK = topK; } - public void process() throws HyracksDataException { - IFrameWriter finalWriter = null; + public void process(IFrameWriter finalWriter) throws HyracksDataException { try { - if (runs.isEmpty()) { - finalWriter = prepareSkipMergingFinalResultWriter(writer); - finalWriter.open(); - if (sorter != null) { - try { - if (sorter.hasRemaining()) { - sorter.flush(finalWriter); - } - } finally { - sorter.close(); - } - } - } else { - /** recycle sort buffer */ - if (sorter != null) { - sorter.close(); - } + int maxMergeWidth = framesLimit - 1; + inFrames = new ArrayList<>(maxMergeWidth); + outputFrame = new VSizeFrame(ctx); + List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth); + int stop = runs.size(); + currentGenerationRunAvailable.set(0, stop); + int numberOfPasses = 1; + while (true) { + int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns, + currentGenerationRunAvailable, stop); + prepareFrames(unUsed, inFrames, partialRuns); - finalWriter = prepareFinalMergeResultWriter(writer); - finalWriter.open(); - - int maxMergeWidth = framesLimit - 1; - - inFrames = new ArrayList<>(maxMergeWidth); - outputFrame = new VSizeFrame(ctx); - List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth); - - int stop = runs.size(); - currentGenerationRunAvailable.set(0, stop); - int numberOfPasses = 1; - while (true) { - - int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns, - currentGenerationRunAvailable, stop); - prepareFrames(unUsed, inFrames, partialRuns); - - if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) { - GeneratedRunFileReader reader; - if (partialRuns.size() == 1) { - if (!currentGenerationRunAvailable.isEmpty()) 
{ - throw new HyracksDataException( - "The record is too big to put into the merging frame, please" - + " allocate more sorting memory"); - } else { - reader = partialRuns.get(0); - } - + if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) { + GeneratedRunFileReader reader; + if (partialRuns.size() == 1) { + if (!currentGenerationRunAvailable.isEmpty()) { + throw new HyracksDataException("The record is too big to put into the merging frame, please" + + " allocate more sorting memory"); } else { - RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile(); - IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter); - - try { - mergeResultWriter.open(); - merge(mergeResultWriter, partialRuns); - } catch (Throwable t) { - mergeResultWriter.fail(); - throw t; - } finally { - mergeResultWriter.close(); - } - reader = mergeFileWriter.createReader(); - } - runs.add(reader); - - if (currentGenerationRunAvailable.isEmpty()) { - numberOfPasses++; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("generated runs:" + stop); - } - runs.subList(0, stop).clear(); - currentGenerationRunAvailable.clear(); - currentGenerationRunAvailable.set(0, runs.size()); - stop = runs.size(); + reader = partialRuns.get(0); } } else { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("final runs: {}", stop); - LOGGER.debug("number of passes: " + numberOfPasses); + RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile(); + IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter); + + try { + mergeResultWriter.open(); + merge(mergeResultWriter, partialRuns); + } catch (Throwable t) { + mergeResultWriter.fail(); + throw t; + } finally { + mergeResultWriter.close(); } - merge(finalWriter, partialRuns); - break; + reader = mergeFileWriter.createReader(); } - } - } - } catch (Exception e) { - if (finalWriter != null) { - finalWriter.fail(); - } - throw HyracksDataException.create(e); - } finally { - try { - if 
(finalWriter != null) { - finalWriter.close(); - } - } finally { - for (RunFileReader reader : runs) { - try { - reader.close(); // close is idempotent. - } catch (Exception e) { - if (LOGGER.isWarnEnabled()) { - LOGGER.log(Level.WARN, e.getMessage(), e); + runs.add(reader); + + if (currentGenerationRunAvailable.isEmpty()) { + numberOfPasses++; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("generated runs:" + stop); } + runs.subList(0, stop).clear(); + currentGenerationRunAvailable.clear(); + currentGenerationRunAvailable.set(0, runs.size()); + stop = runs.size(); + } + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("final runs: {}", stop); + LOGGER.debug("number of passes: " + numberOfPasses); + } + merge(finalWriter, partialRuns); + break; + } + } + } finally { + for (RunFileReader reader : runs) { + try { + reader.close(); // close is idempotent. + } catch (Exception e) { + if (LOGGER.isWarnEnabled()) { + LOGGER.log(Level.WARN, e.getMessage(), e); } } } @@ -237,18 +192,6 @@ } } - protected abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) - throws HyracksDataException; - - protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException; - - protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter) - throws HyracksDataException; - - protected abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException; - - protected abstract int[] getSortFields(); - private void merge(IFrameWriter writer, List<GeneratedRunFileReader> partialRuns) throws HyracksDataException { RunMergingFrameReader merger = new RunMergingFrameReader(ctx, partialRuns, inFrames, getSortFields(), comparators, nmkComputer, recordDesc, topK); @@ -267,4 +210,16 @@ } } + public abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) + throws HyracksDataException; + + protected abstract RunFileWriter 
prepareIntermediateMergeRunFile() throws HyracksDataException; + + protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter) + throws HyracksDataException; + + public abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException; + + protected abstract int[] getSortFields(); + } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java index 980ad9b..74223d8 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java @@ -79,14 +79,6 @@ private final BufferInfo info = new BufferInfo(null, -1, -1); public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, - int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, - IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) - throws HyracksDataException { - this(ctx, bufferManager, maxSortFrames, sortFields, keyNormalizerFactories, comparatorFactories, - recordDescriptor, Integer.MAX_VALUE); - } - - public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, int[] sortFields, INormalizedKeyComputerFactory[] normalizedKeyComputerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException { @@ -286,23 +278,10 @@ return 0; } - protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) { - System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize); - System.arraycopy(pointers2, pos2 * ptrSize, pointers1, 
pos1 * ptrSize, ptrSize); - System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize); - } - - protected void copy(int src[], int srcPos, int dest[], int destPos) { - System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, ptrSize); - } - - protected void copy(int src[], int srcPos, int dest[], int destPos, int n) { - System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * ptrSize); - } - @Override public void close() { tupleCount = 0; + totalMemoryUsed = 0; bufferManager.close(); tPointers = null; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java index 3c11669..3f0b7c7 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java @@ -28,13 +28,18 @@ import org.apache.hyracks.dataflow.common.io.RunFileWriter; public abstract class AbstractSortRunGenerator implements IRunGenerator { - protected final List<GeneratedRunFileReader> generatedRunFileReaders; + + private final List<GeneratedRunFileReader> generatedRunFileReaders; public AbstractSortRunGenerator() { generatedRunFileReaders = new LinkedList<>(); } - abstract public ISorter getSorter() throws HyracksDataException; + /** + * Null could be returned. Caller should check if it not null. + * @return the sorter associated with the run generator or null if there is no sorter. 
+ */ + abstract public ISorter getSorter(); @Override public void open() throws HyracksDataException { @@ -43,9 +48,10 @@ @Override public void close() throws HyracksDataException { - if (getSorter().hasRemaining()) { + ISorter sorter = getSorter(); + if (sorter != null && sorter.hasRemaining()) { if (generatedRunFileReaders.size() <= 0) { - getSorter().sort(); + sorter.sort(); } else { flushFramesToRun(); } @@ -56,13 +62,15 @@ abstract protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException; - protected void flushFramesToRun() throws HyracksDataException { - getSorter().sort(); + // assumption is that there will always be a sorter (i.e. sorter is not null) + void flushFramesToRun() throws HyracksDataException { + ISorter sorter = getSorter(); + sorter.sort(); RunFileWriter runWriter = getRunFileWriter(); IFrameWriter flushWriter = getFlushableFrameWriter(runWriter); flushWriter.open(); try { - getSorter().flush(flushWriter); + sorter.flush(flushWriter); } catch (Exception e) { flushWriter.fail(); throw e; @@ -70,7 +78,7 @@ flushWriter.close(); } generatedRunFileReaders.add(runWriter.createDeleteOnCloseReader()); - getSorter().reset(); + sorter.reset(); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java index 406703e..6abc064 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java @@ -49,12 +49,9 @@ public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorDescriptor { private static final Logger LOGGER = LogManager.getLogger(); - 
private static final long serialVersionUID = 1L; - protected static final int SORT_ACTIVITY_ID = 0; protected static final int MERGE_ACTIVITY_ID = 1; - protected final int[] sortFields; protected final INormalizedKeyComputerFactory[] keyNormalizerFactories; protected final IBinaryComparatorFactory[] comparatorFactories; @@ -90,10 +87,10 @@ } public static class SortTaskState extends AbstractStateObject { - public List<GeneratedRunFileReader> generatedRunFileReaders; - public ISorter sorter; + List<GeneratedRunFileReader> generatedRunFileReaders; + ISorter sorter; - public SortTaskState(JobId jobId, TaskId taskId) { + SortTaskState(JobId jobId, TaskId taskId) { super(jobId, taskId); } } @@ -101,7 +98,7 @@ protected abstract class SortActivity extends AbstractActivityNode { private static final long serialVersionUID = 1L; - public SortActivity(ActivityId id) { + protected SortActivity(ActivityId id) { super(id); } @@ -111,7 +108,7 @@ @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { - IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() { + return new AbstractUnaryInputSinkOperatorNodePushable() { private AbstractSortRunGenerator runGen; @Override @@ -143,26 +140,24 @@ runGen.fail(); } }; - return op; } } protected abstract class MergeActivity extends AbstractActivityNode { private static final long serialVersionUID = 1L; - public MergeActivity(ActivityId id) { + protected MergeActivity(ActivityId id) { super(id); } protected abstract AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter, - List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, - int necessaryFrames); + IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs, + 
IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames); @Override public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { - IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() { + return new AbstractUnaryOutputSourceOperatorNodePushable() { @Override public void initialize() throws HyracksDataException { @@ -176,13 +171,39 @@ } INormalizedKeyComputer nmkComputer = keyNormalizerFactories == null ? null : keyNormalizerFactories[0].createNormalizedKeyComputer(); - AbstractExternalSortRunMerger merger = getSortRunMerger(ctx, recordDescProvider, writer, sorter, - runs, comparators, nmkComputer, framesLimit); - merger.process(); + AbstractExternalSortRunMerger merger = + getSortRunMerger(ctx, recordDescProvider, runs, comparators, nmkComputer, framesLimit); + IFrameWriter wrappingWriter = null; + try { + if (runs.isEmpty()) { + wrappingWriter = merger.prepareSkipMergingFinalResultWriter(writer); + wrappingWriter.open(); + if (sorter.hasRemaining()) { + sorter.flush(wrappingWriter); + } + } else { + // eagerly close the sorter here to release memory rather than in finally + sorter.close(); + sorter = null; + wrappingWriter = merger.prepareFinalMergeResultWriter(writer); + wrappingWriter.open(); + merger.process(wrappingWriter); + } + } catch (Throwable e) { + if (wrappingWriter != null) { + wrappingWriter.fail(); + } + throw HyracksDataException.create(e); + } finally { + if (sorter != null) { + sorter.close(); + } + if (wrappingWriter != null) { + wrappingWriter.close(); + } + } } }; - return op; } } - } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java index 
b58d4c7..8b80a26 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java @@ -20,7 +20,6 @@ import java.util.List; -import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; @@ -91,12 +90,11 @@ private static final long serialVersionUID = 1L; @Override - protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter, - List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators, - INormalizedKeyComputer nmkComputer, int necessaryFrames) { - return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators, nmkComputer, - outRecDescs[0], necessaryFrames, outputLimit, writer); + protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs, + IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) { + return new ExternalSortRunMerger(ctx, runs, sortFields, comparators, nmkComputer, outRecDescs[0], + necessaryFrames, outputLimit); } }; } @@ -113,7 +111,7 @@ RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy, int outputLimit) { super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor); if (framesLimit <= 1) { - throw new IllegalStateException();// minimum of 2 fames (1 in,1 out) + throw new IllegalStateException();// minimum of 2 frames (1 in,1 out) } this.alg = alg; this.policy = policy; diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java index 2b985b9..fb32b0f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java @@ -34,15 +34,15 @@ private final int[] sortFields; - public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<GeneratedRunFileReader> runs, - int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, - RecordDescriptor recordDesc, int framesLimit, int topK, IFrameWriter writer) { - super(ctx, sorter, runs, comparators, nmkComputer, recordDesc, framesLimit, topK, writer); + public ExternalSortRunMerger(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs, int[] sortFields, + IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc, + int framesLimit, int topK) { + super(ctx, runs, comparators, nmkComputer, recordDesc, framesLimit, topK); this.sortFields = sortFields; } @Override - protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException { + public IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException { return nextWriter; } @@ -59,7 +59,7 @@ } @Override - protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException { + public IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException { return nextWriter; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java index 260b665..92b7d7c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java @@ -116,4 +116,12 @@ copy(tPointers, pos2, tPointersTemp, targetPos, rest); } } + + private void copy(int src[], int srcPos, int dest[], int destPos) { + System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, ptrSize); + } + + private void copy(int src[], int srcPos, int dest[], int destPos, int n) { + System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n * ptrSize); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java index 486bc7c..ddef0d5 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java @@ -25,17 +25,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; -public class FrameSorterQuickSort extends AbstractFrameSorter { +class FrameSorterQuickSort extends AbstractFrameSorter { - public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, - int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, - IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) - throws HyracksDataException { - this(ctx, bufferManager, maxSortFrames, 
sortFields, keyNormalizerFactories, comparatorFactories, - recordDescriptor, Integer.MAX_VALUE); - } - - public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, + FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int maxSortFrames, int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException { @@ -48,7 +40,7 @@ sort(0, tupleCount); } - void sort(int offset, int length) throws HyracksDataException { + private void sort(int offset, int length) throws HyracksDataException { int m = offset + (length >> 1); int a = offset; @@ -102,4 +94,9 @@ } } + private void swap(int pointers1[], int pos1, int pointers2[], int pos2) { + System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize); + System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, ptrSize); + System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java index 1578975..d8431f8 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java @@ -45,7 +45,7 @@ protected final IBinaryComparatorFactory[] comparatorFactories; protected final RecordDescriptor recordDescriptor; protected ITupleSorter tupleSorter; - protected IFrameTupleAccessor inAccessor; + protected final IFrameTupleAccessor inAccessor; public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields, 
INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, @@ -71,7 +71,7 @@ } @Override - public ISorter getSorter() throws HyracksDataException { + public ISorter getSorter() { return tupleSorter; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java index 180ecbc..3b07017 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java @@ -49,7 +49,7 @@ } @Override - public ISorter getSorter() throws HyracksDataException { + public ISorter getSorter() { if (tupleSorter != null) { return tupleSorter; } else if (frameSorter != null) { diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java index dea770a..b29057f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java @@ -21,7 +21,6 @@ import java.util.List; -import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; @@ -47,7 +46,7 @@ comparatorFactories, recordDescriptor); } - public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry 
spec, int framesLimit, int topK, int[] sortFields, + private TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) { super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, recordDescriptor); @@ -75,12 +74,11 @@ private static final long serialVersionUID = 1L; @Override - protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx, - IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter, - List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators, - INormalizedKeyComputer nmkComputer, int necessaryFrames) { - return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators, nmkComputer, - outRecDescs[0], necessaryFrames, topK, writer); + protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs, + IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) { + return new ExternalSortRunMerger(ctx, runs, sortFields, comparators, nmkComputer, outRecDescs[0], + necessaryFrames, topK); } }; } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java index bcf661f..8fa570c 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java @@ -57,10 +57,33 @@ } INormalizedKeyComputer nmkComputer = normalizedKeyComputerFactory 
== null ? null : normalizedKeyComputerFactory.createNormalizedKeyComputer(); - AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, sorter, runs, keyFields, - inRecordDesc, outputRec, outputRec, numFrames, writer, keyFields, nmkComputer, comparators, + AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, runs, keyFields, + inRecordDesc, outputRec, outputRec, numFrames, keyFields, nmkComputer, comparators, partialAggrInState, finalAggrInState, true); - merger.process(); + IFrameWriter wrappingWriter = null; + try { + if (runs.isEmpty()) { + wrappingWriter = merger.prepareSkipMergingFinalResultWriter(writer); + wrappingWriter.open(); + if (sorter.hasRemaining()) { + sorter.flush(wrappingWriter); + } + } else { + wrappingWriter = merger.prepareFinalMergeResultWriter(writer); + wrappingWriter.open(); + merger.process(wrappingWriter); + } + } catch (Throwable e) { + if (wrappingWriter != null) { + wrappingWriter.fail(); + } + throw HyracksDataException.create(e); + } finally { + sorter.close(); + if (wrappingWriter != null) { + wrappingWriter.close(); + } + } } }; } -- To view, visit https://asterix-gerrit.ics.uci.edu/3363 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: merged Gerrit-Change-Id: Idda31c92cbcddba5ebef8bbbf7855b9c8293dd51 Gerrit-Change-Number: 3363 Gerrit-PatchSet: 5 Gerrit-Owner: Ali Alsuliman <ali.al.solai...@gmail.com> Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com> Gerrit-Reviewer: Anon. E. Moose (1000171) Gerrit-Reviewer: Dmitry Lychagin <dmitry.lycha...@couchbase.com> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Till Westmann <ti...@apache.org>