Ali Alsuliman has uploaded this change for review.
( https://asterix-gerrit.ics.uci.edu/3363 )
Change subject: [ASTERIXDB-2552][RT] Refactor runs generator and merger
......................................................................
[ASTERIXDB-2552][RT] Refactor runs generator and merger
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Moved the writer and sorter out of the merger so that the
micro external sort can reuse the run generator and merger.
Also includes minor clean-ups.
Change-Id: Idda31c92cbcddba5ebef8bbbf7855b9c8293dd51
---
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
13 files changed, 215 insertions(+), 270 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/63/3363/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
index 364f1c7..cb2da4c 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
@@ -32,7 +32,6 @@
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
-import org.apache.hyracks.dataflow.std.sort.ISorter;
/**
* Group-by aggregation is pushed into multi-pass merge of external sort.
@@ -44,28 +43,23 @@
private final RecordDescriptor inputRecordDesc;
private final RecordDescriptor partialAggRecordDesc;
private final RecordDescriptor outRecordDesc;
-
private final int[] groupFields;
private final IAggregatorDescriptorFactory mergeAggregatorFactory;
private final IAggregatorDescriptorFactory partialAggregatorFactory;
private final boolean localSide;
-
private final int[] mergeSortFields;
private final int[] mergeGroupFields;
private final IBinaryComparator[] groupByComparators;
- public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, ISorter
frameSorter, List<GeneratedRunFileReader> runs,
- int[] sortFields, RecordDescriptor inRecordDesc, RecordDescriptor
partialAggRecordDesc,
- RecordDescriptor outRecordDesc, int framesLimit, IFrameWriter
writer, int[] groupFields,
- INormalizedKeyComputer nmk, IBinaryComparator[] comparators,
+ public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx,
List<GeneratedRunFileReader> runs, int[] sortFields,
+ RecordDescriptor inRecordDesc, RecordDescriptor
partialAggRecordDesc, RecordDescriptor outRecordDesc,
+ int framesLimit, int[] groupFields, INormalizedKeyComputer nmk,
IBinaryComparator[] comparators,
IAggregatorDescriptorFactory partialAggregatorFactory,
IAggregatorDescriptorFactory aggregatorFactory,
boolean localStage) {
- super(ctx, frameSorter, runs, comparators, nmk, partialAggRecordDesc,
framesLimit, writer);
-
+ super(ctx, runs, comparators, nmk, partialAggRecordDesc, framesLimit);
this.inputRecordDesc = inRecordDesc;
this.partialAggRecordDesc = partialAggRecordDesc;
this.outRecordDesc = outRecordDesc;
-
this.groupFields = groupFields;
this.mergeAggregatorFactory = aggregatorFactory;
this.partialAggregatorFactory = partialAggregatorFactory;
@@ -93,11 +87,10 @@
}
@Override
- protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter
nextWriter) throws HyracksDataException {
+ public IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter
nextWriter) throws HyracksDataException {
IAggregatorDescriptorFactory aggregatorFactory = localSide ?
partialAggregatorFactory : mergeAggregatorFactory;
- boolean outputPartial = false;
return new PreclusteredGroupWriter(ctx, groupFields,
groupByComparators, aggregatorFactory, inputRecordDesc,
- outRecordDesc, nextWriter, outputPartial);
+ outRecordDesc, nextWriter, false);
}
@Override
@@ -110,16 +103,14 @@
protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter
mergeFileWriter)
throws HyracksDataException {
IAggregatorDescriptorFactory aggregatorFactory = localSide ?
mergeAggregatorFactory : partialAggregatorFactory;
- boolean outputPartial = true;
return new PreclusteredGroupWriter(ctx, mergeGroupFields,
groupByComparators, aggregatorFactory,
- partialAggRecordDesc, partialAggRecordDesc, mergeFileWriter,
outputPartial);
+ partialAggRecordDesc, partialAggRecordDesc, mergeFileWriter,
true);
}
@Override
- protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter
nextWriter) throws HyracksDataException {
- boolean outputPartial = false;
+ public IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter)
throws HyracksDataException {
return new PreclusteredGroupWriter(ctx, mergeGroupFields,
groupByComparators, mergeAggregatorFactory,
- partialAggRecordDesc, outRecordDesc, nextWriter,
outputPartial);
+ partialAggRecordDesc, outRecordDesc, nextWriter, false);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 23e47f0..7ca01a5 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -20,7 +20,6 @@
import java.util.List;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -37,7 +36,6 @@
import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
-import org.apache.hyracks.dataflow.std.sort.ISorter;
/**
* This Operator pushes group-by aggregation into the external sort.
@@ -158,13 +156,12 @@
@Override
protected AbstractExternalSortRunMerger
getSortRunMerger(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, IFrameWriter
writer, ISorter sorter,
- List<GeneratedRunFileReader> runs, IBinaryComparator[]
comparators,
- INormalizedKeyComputer nmkComputer, int necessaryFrames) {
- return new ExternalSortGroupByRunMerger(ctx, sorter, runs,
sortFields,
+ IRecordDescriptorProvider recordDescProvider,
List<GeneratedRunFileReader> runs,
+ IBinaryComparator[] comparators, INormalizedKeyComputer
nmkComputer, int necessaryFrames) {
+ return new ExternalSortGroupByRunMerger(ctx, runs, sortFields,
recordDescProvider.getInputRecordDescriptor(new
ActivityId(odId, SORT_ACTIVITY_ID), 0),
- partialAggRecordDesc, outputRecordDesc,
necessaryFrames, writer, groupFields, nmkComputer,
- comparators, partialAggregatorFactory,
mergeAggregatorFactory, !finalStage);
+ partialAggRecordDesc, outputRecordDesc,
necessaryFrames, groupFields, nmkComputer, comparators,
+ partialAggregatorFactory, mergeAggregatorFactory,
!finalStage);
}
};
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index 0bead97..e860288 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -43,8 +43,6 @@
public abstract class AbstractExternalSortRunMerger {
protected final IHyracksTaskContext ctx;
- protected final IFrameWriter writer;
-
private final List<GeneratedRunFileReader> runs;
private final BitSet currentGenerationRunAvailable;
private final IBinaryComparator[] comparators;
@@ -54,136 +52,93 @@
private final int topK;
private List<GroupVSizeFrame> inFrames;
private VSizeFrame outputFrame;
- private ISorter sorter;
-
private static final Logger LOGGER = LogManager.getLogger();
- public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, ISorter
sorter, List<GeneratedRunFileReader> runs,
+ public AbstractExternalSortRunMerger(IHyracksTaskContext ctx,
List<GeneratedRunFileReader> runs,
IBinaryComparator[] comparators, INormalizedKeyComputer
nmkComputer, RecordDescriptor recordDesc,
- int framesLimit, IFrameWriter writer) {
- this(ctx, sorter, runs, comparators, nmkComputer, recordDesc,
framesLimit, Integer.MAX_VALUE, writer);
+ int framesLimit) {
+ this(ctx, runs, comparators, nmkComputer, recordDesc, framesLimit,
Integer.MAX_VALUE);
}
- public AbstractExternalSortRunMerger(IHyracksTaskContext ctx, ISorter
sorter, List<GeneratedRunFileReader> runs,
+ AbstractExternalSortRunMerger(IHyracksTaskContext ctx,
List<GeneratedRunFileReader> runs,
IBinaryComparator[] comparators, INormalizedKeyComputer
nmkComputer, RecordDescriptor recordDesc,
- int framesLimit, int topK, IFrameWriter writer) {
+ int framesLimit, int topK) {
this.ctx = ctx;
- this.sorter = sorter;
this.runs = new LinkedList<>(runs);
this.currentGenerationRunAvailable = new BitSet(runs.size());
this.comparators = comparators;
this.nmkComputer = nmkComputer;
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
- this.writer = writer;
this.topK = topK;
}
- public void process() throws HyracksDataException {
- IFrameWriter finalWriter = null;
+ public void process(IFrameWriter finalWriter) throws HyracksDataException {
try {
- if (runs.isEmpty()) {
- finalWriter = prepareSkipMergingFinalResultWriter(writer);
- finalWriter.open();
- if (sorter != null) {
- try {
- if (sorter.hasRemaining()) {
- sorter.flush(finalWriter);
- }
- } finally {
- sorter.close();
- }
- }
- } else {
- /** recycle sort buffer */
- if (sorter != null) {
- sorter.close();
- }
+ int maxMergeWidth = framesLimit - 1;
+ inFrames = new ArrayList<>(maxMergeWidth);
+ outputFrame = new VSizeFrame(ctx);
+ List<GeneratedRunFileReader> partialRuns = new
ArrayList<>(maxMergeWidth);
+ int stop = runs.size();
+ currentGenerationRunAvailable.set(0, stop);
+ int numberOfPasses = 1;
+ while (true) {
+ int unUsed = selectPartialRuns(maxMergeWidth *
ctx.getInitialFrameSize(), runs, partialRuns,
+ currentGenerationRunAvailable, stop);
+ prepareFrames(unUsed, inFrames, partialRuns);
- finalWriter = prepareFinalMergeResultWriter(writer);
- finalWriter.open();
-
- int maxMergeWidth = framesLimit - 1;
-
- inFrames = new ArrayList<>(maxMergeWidth);
- outputFrame = new VSizeFrame(ctx);
- List<GeneratedRunFileReader> partialRuns = new
ArrayList<>(maxMergeWidth);
-
- int stop = runs.size();
- currentGenerationRunAvailable.set(0, stop);
- int numberOfPasses = 1;
- while (true) {
-
- int unUsed = selectPartialRuns(maxMergeWidth *
ctx.getInitialFrameSize(), runs, partialRuns,
- currentGenerationRunAvailable, stop);
- prepareFrames(unUsed, inFrames, partialRuns);
-
- if (!currentGenerationRunAvailable.isEmpty() || stop <
runs.size()) {
- GeneratedRunFileReader reader;
- if (partialRuns.size() == 1) {
- if (!currentGenerationRunAvailable.isEmpty()) {
- throw new HyracksDataException(
- "The record is too big to put into the
merging frame, please"
- + " allocate more sorting
memory");
- } else {
- reader = partialRuns.get(0);
- }
-
+ if (!currentGenerationRunAvailable.isEmpty() || stop <
runs.size()) {
+ GeneratedRunFileReader reader;
+ if (partialRuns.size() == 1) {
+ if (!currentGenerationRunAvailable.isEmpty()) {
+ throw new HyracksDataException("The record is too
big to put into the merging frame, please"
+ + " allocate more sorting memory");
} else {
- RunFileWriter mergeFileWriter =
prepareIntermediateMergeRunFile();
- IFrameWriter mergeResultWriter =
prepareIntermediateMergeResultWriter(mergeFileWriter);
-
- try {
- mergeResultWriter.open();
- merge(mergeResultWriter, partialRuns);
- } catch (Throwable t) {
- mergeResultWriter.fail();
- throw t;
- } finally {
- mergeResultWriter.close();
- }
- reader = mergeFileWriter.createReader();
- }
- runs.add(reader);
-
- if (currentGenerationRunAvailable.isEmpty()) {
- numberOfPasses++;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("generated runs:" + stop);
- }
- runs.subList(0, stop).clear();
- currentGenerationRunAvailable.clear();
- currentGenerationRunAvailable.set(0, runs.size());
- stop = runs.size();
+ reader = partialRuns.get(0);
}
} else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("final runs: {}", stop);
- LOGGER.debug("number of passes: " +
numberOfPasses);
+ RunFileWriter mergeFileWriter =
prepareIntermediateMergeRunFile();
+ IFrameWriter mergeResultWriter =
prepareIntermediateMergeResultWriter(mergeFileWriter);
+
+ try {
+ mergeResultWriter.open();
+ merge(mergeResultWriter, partialRuns);
+ } catch (Throwable t) {
+ mergeResultWriter.fail();
+ throw t;
+ } finally {
+ mergeResultWriter.close();
}
- merge(finalWriter, partialRuns);
- break;
+ reader = mergeFileWriter.createReader();
}
- }
- }
- } catch (Exception e) {
- if (finalWriter != null) {
- finalWriter.fail();
- }
- throw HyracksDataException.create(e);
- } finally {
- try {
- if (finalWriter != null) {
- finalWriter.close();
- }
- } finally {
- for (RunFileReader reader : runs) {
- try {
- reader.close(); // close is idempotent.
- } catch (Exception e) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.log(Level.WARN, e.getMessage(), e);
+ runs.add(reader);
+
+ if (currentGenerationRunAvailable.isEmpty()) {
+ numberOfPasses++;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("generated runs:" + stop);
}
+ runs.subList(0, stop).clear();
+ currentGenerationRunAvailable.clear();
+ currentGenerationRunAvailable.set(0, runs.size());
+ stop = runs.size();
+ }
+ } else {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("final runs: {}", stop);
+ LOGGER.debug("number of passes: " + numberOfPasses);
+ }
+ merge(finalWriter, partialRuns);
+ break;
+ }
+ }
+ } finally {
+ for (RunFileReader reader : runs) {
+ try {
+ reader.close(); // close is idempotent.
+ } catch (Exception e) {
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.log(Level.WARN, e.getMessage(), e);
}
}
}
@@ -237,18 +192,6 @@
}
}
- protected abstract IFrameWriter
prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
- throws HyracksDataException;
-
- protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws
HyracksDataException;
-
- protected abstract IFrameWriter
prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
- throws HyracksDataException;
-
- protected abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter
nextWriter) throws HyracksDataException;
-
- protected abstract int[] getSortFields();
-
private void merge(IFrameWriter writer, List<GeneratedRunFileReader>
partialRuns) throws HyracksDataException {
RunMergingFrameReader merger = new RunMergingFrameReader(ctx,
partialRuns, inFrames, getSortFields(),
comparators, nmkComputer, recordDesc, topK);
@@ -267,4 +210,16 @@
}
}
+ public abstract IFrameWriter
prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
+ throws HyracksDataException;
+
+ protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws
HyracksDataException;
+
+ protected abstract IFrameWriter
prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
+ throws HyracksDataException;
+
+ public abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter
nextWriter) throws HyracksDataException;
+
+ protected abstract int[] getSortFields();
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index 980ad9b..74223d8 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -79,14 +79,6 @@
private final BufferInfo info = new BufferInfo(null, -1, -1);
public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager
bufferManager, int maxSortFrames,
- int[] sortFields, INormalizedKeyComputerFactory[]
keyNormalizerFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor
recordDescriptor)
- throws HyracksDataException {
- this(ctx, bufferManager, maxSortFrames, sortFields,
keyNormalizerFactories, comparatorFactories,
- recordDescriptor, Integer.MAX_VALUE);
- }
-
- public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager
bufferManager, int maxSortFrames,
int[] sortFields, INormalizedKeyComputerFactory[]
normalizedKeyComputerFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor
recordDescriptor, int outputLimit)
throws HyracksDataException {
@@ -286,23 +278,10 @@
return 0;
}
- protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
- System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
- System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize,
ptrSize);
- System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize);
- }
-
- protected void copy(int src[], int srcPos, int dest[], int destPos) {
- System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize,
ptrSize);
- }
-
- protected void copy(int src[], int srcPos, int dest[], int destPos, int n)
{
- System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n *
ptrSize);
- }
-
@Override
public void close() {
tupleCount = 0;
+ totalMemoryUsed = 0;
bufferManager.close();
tPointers = null;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index 406703e..2760857 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -49,12 +49,9 @@
public abstract class AbstractSorterOperatorDescriptor extends
AbstractOperatorDescriptor {
private static final Logger LOGGER = LogManager.getLogger();
-
private static final long serialVersionUID = 1L;
-
protected static final int SORT_ACTIVITY_ID = 0;
protected static final int MERGE_ACTIVITY_ID = 1;
-
protected final int[] sortFields;
protected final INormalizedKeyComputerFactory[] keyNormalizerFactories;
protected final IBinaryComparatorFactory[] comparatorFactories;
@@ -90,10 +87,10 @@
}
public static class SortTaskState extends AbstractStateObject {
- public List<GeneratedRunFileReader> generatedRunFileReaders;
- public ISorter sorter;
+ List<GeneratedRunFileReader> generatedRunFileReaders;
+ ISorter sorter;
- public SortTaskState(JobId jobId, TaskId taskId) {
+ SortTaskState(JobId jobId, TaskId taskId) {
super(jobId, taskId);
}
}
@@ -101,7 +98,7 @@
protected abstract class SortActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public SortActivity(ActivityId id) {
+ protected SortActivity(ActivityId id) {
super(id);
}
@@ -111,7 +108,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final
IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int
partition, int nPartitions) {
- IOperatorNodePushable op = new
AbstractUnaryInputSinkOperatorNodePushable() {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
private AbstractSortRunGenerator runGen;
@Override
@@ -143,26 +140,24 @@
runGen.fail();
}
};
- return op;
}
}
protected abstract class MergeActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public MergeActivity(ActivityId id) {
+ protected MergeActivity(ActivityId id) {
super(id);
}
protected abstract AbstractExternalSortRunMerger
getSortRunMerger(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, IFrameWriter
writer, ISorter sorter,
- List<GeneratedRunFileReader> runs, IBinaryComparator[]
comparators, INormalizedKeyComputer nmkComputer,
- int necessaryFrames);
+ IRecordDescriptorProvider recordDescProvider,
List<GeneratedRunFileReader> runs,
+ IBinaryComparator[] comparators, INormalizedKeyComputer
nmkComputer, int necessaryFrames);
@Override
public IOperatorNodePushable createPushRuntime(final
IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int
partition, int nPartitions) {
- IOperatorNodePushable op = new
AbstractUnaryOutputSourceOperatorNodePushable() {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
@@ -176,13 +171,40 @@
}
INormalizedKeyComputer nmkComputer =
keyNormalizerFactories == null ? null
:
keyNormalizerFactories[0].createNormalizedKeyComputer();
- AbstractExternalSortRunMerger merger =
getSortRunMerger(ctx, recordDescProvider, writer, sorter,
- runs, comparators, nmkComputer, framesLimit);
- merger.process();
+ AbstractExternalSortRunMerger merger =
+ getSortRunMerger(ctx, recordDescProvider, runs,
comparators, nmkComputer, framesLimit);
+ boolean closeSorterInFinally = true;
+ IFrameWriter wrappingWriter = null;
+ try {
+ if (runs.isEmpty()) {
+ wrappingWriter =
merger.prepareSkipMergingFinalResultWriter(writer);
+ wrappingWriter.open();
+ if (sorter.hasRemaining()) {
+ sorter.flush(wrappingWriter);
+ }
+ } else {
+ // eagerly close the sorter here to release memory
rather than in finally
+ closeSorterInFinally = false;
+ sorter.close();
+ wrappingWriter =
merger.prepareFinalMergeResultWriter(writer);
+ wrappingWriter.open();
+ merger.process(wrappingWriter);
+ }
+ } catch (Throwable e) {
+ if (wrappingWriter != null) {
+ wrappingWriter.fail();
+ }
+ throw HyracksDataException.create(e);
+ } finally {
+ if (closeSorterInFinally) {
+ sorter.close();
+ }
+ if (wrappingWriter != null) {
+ wrappingWriter.close();
+ }
+ }
}
};
- return op;
}
}
-
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index b58d4c7..8b80a26 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -20,7 +20,6 @@
import java.util.List;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -91,12 +90,11 @@
private static final long serialVersionUID = 1L;
@Override
- protected ExternalSortRunMerger
getSortRunMerger(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, IFrameWriter
writer, ISorter sorter,
- List<GeneratedRunFileReader> runs, IBinaryComparator[]
comparators,
- INormalizedKeyComputer nmkComputer, int necessaryFrames) {
- return new ExternalSortRunMerger(ctx, sorter, runs,
sortFields, comparators, nmkComputer,
- outRecDescs[0], necessaryFrames, outputLimit, writer);
+ protected AbstractExternalSortRunMerger
getSortRunMerger(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider,
List<GeneratedRunFileReader> runs,
+ IBinaryComparator[] comparators, INormalizedKeyComputer
nmkComputer, int necessaryFrames) {
+ return new ExternalSortRunMerger(ctx, runs, sortFields,
comparators, nmkComputer, outRecDescs[0],
+ necessaryFrames, outputLimit);
}
};
}
@@ -113,7 +111,7 @@
RecordDescriptor recordDescriptor, Algorithm alg,
EnumFreeSlotPolicy policy, int outputLimit) {
super(spec, framesLimit, sortFields, keyNormalizerFactories,
comparatorFactories, recordDescriptor);
if (framesLimit <= 1) {
- throw new IllegalStateException();// minimum of 2 fames (1 in,1
out)
+ throw new IllegalStateException();// minimum of 2 frames (1 in,1
out)
}
this.alg = alg;
this.policy = policy;
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 2b985b9..fb32b0f 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -34,15 +34,15 @@
private final int[] sortFields;
- public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter,
List<GeneratedRunFileReader> runs,
- int[] sortFields, IBinaryComparator[] comparators,
INormalizedKeyComputer nmkComputer,
- RecordDescriptor recordDesc, int framesLimit, int topK,
IFrameWriter writer) {
- super(ctx, sorter, runs, comparators, nmkComputer, recordDesc,
framesLimit, topK, writer);
+ public ExternalSortRunMerger(IHyracksTaskContext ctx,
List<GeneratedRunFileReader> runs, int[] sortFields,
+ IBinaryComparator[] comparators, INormalizedKeyComputer
nmkComputer, RecordDescriptor recordDesc,
+ int framesLimit, int topK) {
+ super(ctx, runs, comparators, nmkComputer, recordDesc, framesLimit,
topK);
this.sortFields = sortFields;
}
@Override
- protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter
nextWriter) throws HyracksDataException {
+ public IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter
nextWriter) throws HyracksDataException {
return nextWriter;
}
@@ -59,7 +59,7 @@
}
@Override
- protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter
nextWriter) throws HyracksDataException {
+ public IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter)
throws HyracksDataException {
return nextWriter;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index 260b665..92b7d7c 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -116,4 +116,12 @@
copy(tPointers, pos2, tPointersTemp, targetPos, rest);
}
}
+
+ private void copy(int src[], int srcPos, int dest[], int destPos) {
+ System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize,
ptrSize);
+ }
+
+ private void copy(int src[], int srcPos, int dest[], int destPos, int n) {
+ System.arraycopy(src, srcPos * ptrSize, dest, destPos * ptrSize, n *
ptrSize);
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index 486bc7c..ddef0d5 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -25,17 +25,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
-public class FrameSorterQuickSort extends AbstractFrameSorter {
+class FrameSorterQuickSort extends AbstractFrameSorter {
- public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager
bufferManager, int maxSortFrames,
- int[] sortFields, INormalizedKeyComputerFactory[]
keyNormalizerFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor
recordDescriptor)
- throws HyracksDataException {
- this(ctx, bufferManager, maxSortFrames, sortFields,
keyNormalizerFactories, comparatorFactories,
- recordDescriptor, Integer.MAX_VALUE);
- }
-
- public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager
bufferManager, int maxSortFrames,
+ FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager
bufferManager, int maxSortFrames,
int[] sortFields, INormalizedKeyComputerFactory[]
keyNormalizerFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor
recordDescriptor, int outputLimit)
throws HyracksDataException {
@@ -48,7 +40,7 @@
sort(0, tupleCount);
}
- void sort(int offset, int length) throws HyracksDataException {
+ private void sort(int offset, int length) throws HyracksDataException {
int m = offset + (length >> 1);
int a = offset;
@@ -102,4 +94,9 @@
}
}
+ private void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
+ System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
+ System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize,
ptrSize);
+ System.arraycopy(tmpPointer, 0, pointers2, pos2 * ptrSize, ptrSize);
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
index 1578975..54c949d 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -44,12 +44,12 @@
protected final INormalizedKeyComputerFactory[] nmkFactories;
protected final IBinaryComparatorFactory[] comparatorFactories;
protected final RecordDescriptor recordDescriptor;
- protected ITupleSorter tupleSorter;
- protected IFrameTupleAccessor inAccessor;
+ protected final ITupleSorter tupleSorter;
+ protected final IFrameTupleAccessor inAccessor;
public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
INormalizedKeyComputerFactory[] keyNormalizerFactories,
IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) {
+ RecordDescriptor recDescriptor) throws HyracksDataException {
super();
this.ctx = ctx;
this.frameLimit = frameLimit;
@@ -57,17 +57,11 @@
this.sortFields = sortFields;
this.nmkFactories = keyNormalizerFactories;
this.comparatorFactories = comparatorFactories;
- this.inAccessor = new FrameTupleAccessor(recordDescriptor);
- this.recordDescriptor = recordDescriptor;
- }
-
- @Override
- public void open() throws HyracksDataException {
+ this.inAccessor = new FrameTupleAccessor(recDescriptor);
+ this.recordDescriptor = recDescriptor;
IFramePool framePool = new VariableFramePool(ctx, (frameLimit - 1) *
ctx.getInitialFrameSize());
- IDeletableTupleBufferManager bufferManager =
- new VariableDeletableTupleMemoryManager(framePool, recordDescriptor);
+ IDeletableTupleBufferManager bufferManager = new VariableDeletableTupleMemoryManager(framePool, recDescriptor);
tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK,
sortFields, nmkFactories, comparatorFactories);
- super.open();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
index 180ecbc..de4f4fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
@@ -32,30 +32,28 @@
import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager;
import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class HybridTopKSortRunGenerator extends HeapSortRunGenerator {
- private static final Logger LOG = LogManager.getLogger();
private static final int SWITCH_TO_FRAME_SORTER_THRESHOLD = 2;
- private IFrameSorter frameSorter = null;
+ private final IFrameSorter frameSorter;
private int tupleSorterFlushedTimes = 0;
+ private boolean isUsingTupleSorter = true;
public HybridTopKSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
INormalizedKeyComputerFactory[] keyNormalizerFactories,
IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) {
+ RecordDescriptor recordDescriptor) throws HyracksDataException {
super(ctx, frameLimit, topK, sortFields, keyNormalizerFactories,
comparatorFactories, recordDescriptor);
+ VariableFrameMemoryManager bufferManager = new VariableFrameMemoryManager(
+ new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize()),
+ FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.BIGGEST_FIT, frameLimit - 1));
+ frameSorter = new FrameSorterMergeSort(ctx, bufferManager, frameLimit - 1, sortFields, nmkFactories,
+ comparatorFactories, recordDescriptor, topK);
}
@Override
- public ISorter getSorter() throws HyracksDataException {
- if (tupleSorter != null) {
- return tupleSorter;
- } else if (frameSorter != null) {
- return frameSorter;
- }
- return null;
+ public ISorter getSorter() {
+ return isUsingTupleSorter ? tupleSorter : frameSorter;
}
@Override
@@ -70,8 +68,8 @@
if (topK <= 0) {
return;
}
- inAccessor.reset(buffer);
- if (tupleSorter != null) {
+ if (isUsingTupleSorter) {
+ inAccessor.reset(buffer);
boolean isBadK = false;
for (int i = 0; i < inAccessor.getTupleCount(); i++) {
if (!tupleSorter.insertTuple(inAccessor, i)) {
@@ -89,29 +87,13 @@
flushFramesToRun();
}
tupleSorter.close();
- tupleSorter = null;
- if (LOG.isDebugEnabled()) {
- LOG.debug("clear tupleSorter");
- }
+ isUsingTupleSorter = false;
}
}
- } else {
- if (frameSorter == null) {
- VariableFrameMemoryManager bufferManager = new VariableFrameMemoryManager(
- new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize()),
- FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.BIGGEST_FIT,
- frameLimit - 1));
- frameSorter = new FrameSorterMergeSort(ctx, bufferManager, frameLimit - 1, sortFields, nmkFactories,
- comparatorFactories, recordDescriptor, topK);
- if (LOG.isDebugEnabled()) {
- LOG.debug("create frameSorter");
- }
- }
+ } else if (!frameSorter.insertFrame(buffer)) {
+ flushFramesToRun();
if (!frameSorter.insertFrame(buffer)) {
- flushFramesToRun();
- if (!frameSorter.insertFrame(buffer)) {
- throw new HyracksDataException("The given frame is too big to insert into the sorting memory.");
- }
+ throw new HyracksDataException("The given frame is too big to insert into the sorting memory.");
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index dea770a..1e90f1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -21,7 +21,6 @@
import java.util.List;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -30,6 +29,7 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
@@ -47,7 +47,7 @@
comparatorFactories, recordDescriptor);
}
- public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
+ private TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
INormalizedKeyComputerFactory[] keyNormalizerFactories,
IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
super(spec, framesLimit, sortFields, keyNormalizerFactories,
comparatorFactories, recordDescriptor);
@@ -61,7 +61,7 @@
@Override
protected AbstractSortRunGenerator
getRunGenerator(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider) {
+ IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
return new HybridTopKSortRunGenerator(ctx, framesLimit, topK,
sortFields, keyNormalizerFactories,
comparatorFactories, outRecDescs[0]);
@@ -75,12 +75,11 @@
private static final long serialVersionUID = 1L;
@Override
- protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
- List<GeneratedRunFileReader> runs, IBinaryComparator[] comparators,
- INormalizedKeyComputer nmkComputer, int necessaryFrames) {
- return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators, nmkComputer,
- outRecDescs[0], necessaryFrames, topK, writer);
+ protected AbstractExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, List<GeneratedRunFileReader> runs,
+ IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+ return new ExternalSortRunMerger(ctx, runs, sortFields, comparators, nmkComputer, outRecDescs[0],
+ necessaryFrames, topK);
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
index bcf661f..8fa570c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/SortGroupbyTest.java
@@ -57,10 +57,33 @@
}
INormalizedKeyComputer nmkComputer =
normalizedKeyComputerFactory == null ? null
:
normalizedKeyComputerFactory.createNormalizedKeyComputer();
- AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, sorter, runs, keyFields,
- inRecordDesc, outputRec, outputRec, numFrames, writer, keyFields, nmkComputer, comparators,
+ AbstractExternalSortRunMerger merger = new ExternalSortGroupByRunMerger(ctx, runs, keyFields,
+ inRecordDesc, outputRec, outputRec, numFrames, keyFields, nmkComputer, comparators,
partialAggrInState, finalAggrInState, true);
- merger.process();
+ IFrameWriter wrappingWriter = null;
+ try {
+ if (runs.isEmpty()) {
+ wrappingWriter = merger.prepareSkipMergingFinalResultWriter(writer);
+ wrappingWriter.open();
+ if (sorter.hasRemaining()) {
+ sorter.flush(wrappingWriter);
+ }
+ } else {
+ wrappingWriter = merger.prepareFinalMergeResultWriter(writer);
+ wrappingWriter.open();
+ merger.process(wrappingWriter);
+ }
+ } catch (Throwable e) {
+ if (wrappingWriter != null) {
+ wrappingWriter.fail();
+ }
+ throw HyracksDataException.create(e);
+ } finally {
+ sorter.close();
+ if (wrappingWriter != null) {
+ wrappingWriter.close();
+ }
+ }
}
};
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/3363
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Idda31c92cbcddba5ebef8bbbf7855b9c8293dd51
Gerrit-Change-Number: 3363
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>