ycycse commented on code in PR #9746:
URL: https://github.com/apache/iotdb/pull/9746#discussion_r1186674752
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java:
##########
@@ -67,75 +108,253 @@ public ListenableFuture<?> isBlocked() {
public TsBlock next() throws Exception {
if (!inputOperator.hasNextWithTimer()) {
- if (cachedData.size() > 1) {
- cachedData.sort(comparator);
+ if (diskSpiller.hasSpilledData()) {
+ try {
+ prepareSortReaders();
+ return mergeSort();
+ } catch (Exception e) {
+ clear();
+ throw e;
+ }
+ } else {
+ if (curRow == -1) {
+ cachedData.sort(comparator);
+ curRow = 0;
+ }
+ return buildTsBlockInMemory();
}
- TsBlock result = buildTsBlock();
- cachedData = null;
- return result;
}
TsBlock tsBlock = inputOperator.nextWithTimer();
if (tsBlock == null) {
return null;
}
- // add data of each TsBlock from child into list
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- cachedData.add(new MergeSortKey(tsBlock, i));
+
+ try {
+ cacheTsBlock(tsBlock);
+ } catch (IoTDBException e) {
+ clear();
+ throw e;
}
+
return null;
}
- private TsBlock buildTsBlock() {
+ private void prepareSortReaders() throws IoTDBException {
+ if (sortReaders != null) return;
+
+ try {
+
+ sortReaders = new ArrayList<>();
+ if (cachedBytes != 0) {
+ cachedData.sort(comparator);
+ if (sortBufferManager.allocate(cachedBytes)) {
+ sortReaders.add(new MemoryReader(cachedData));
+ } else {
+ sortBufferManager.allocateOneSortBranch();
+ diskSpiller.spillSortedData(cachedData);
+ cachedData = null;
+ }
+ }
+ sortReaders.addAll(diskSpiller.getReaders(sortBufferManager));
+ // if reader is finished
+ noMoreData = new boolean[sortReaders.size()];
+ // need to read data from reader when isEmpty is true
+ isEmpty = new boolean[sortReaders.size()];
+ Arrays.fill(isEmpty, true);
+ } catch (Exception e) {
+ throw new IoTDBException(e.getMessage(),
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+
+ private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+ long bytesSize = tsBlock.getRetainedSizeInBytes();
+ if (bytesSize + cachedBytes < sortBufferManager.SORT_BUFFER_SIZE) {
+ cachedBytes += bytesSize;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ cachedData.add(new MergeSortKey(tsBlock, i));
+ }
+ } else {
+ cachedData.sort(comparator);
+ spill();
+ cachedData.clear();
+ cachedBytes = bytesSize;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ cachedData.add(new MergeSortKey(tsBlock, i));
+ }
+ }
+ }
+
+ private void spill() throws IoTDBException {
+ try {
+ // if current memory cannot put this tsBlock, an exception will be thrown in spillSortedData()
+ // because there should be at least tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for
+ // one branch.
+ sortBufferManager.allocateOneSortBranch();
+ diskSpiller.spillSortedData(cachedData);
+ } catch (IOException e) {
+ throw new IoTDBException(e.getMessage(),
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
Review Comment:
done
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java:
##########
@@ -67,75 +108,253 @@ public ListenableFuture<?> isBlocked() {
public TsBlock next() throws Exception {
if (!inputOperator.hasNextWithTimer()) {
- if (cachedData.size() > 1) {
- cachedData.sort(comparator);
+ if (diskSpiller.hasSpilledData()) {
+ try {
+ prepareSortReaders();
+ return mergeSort();
+ } catch (Exception e) {
+ clear();
+ throw e;
+ }
+ } else {
+ if (curRow == -1) {
+ cachedData.sort(comparator);
+ curRow = 0;
+ }
+ return buildTsBlockInMemory();
}
- TsBlock result = buildTsBlock();
- cachedData = null;
- return result;
}
TsBlock tsBlock = inputOperator.nextWithTimer();
if (tsBlock == null) {
return null;
}
- // add data of each TsBlock from child into list
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- cachedData.add(new MergeSortKey(tsBlock, i));
+
+ try {
+ cacheTsBlock(tsBlock);
+ } catch (IoTDBException e) {
+ clear();
+ throw e;
}
+
return null;
}
- private TsBlock buildTsBlock() {
+ private void prepareSortReaders() throws IoTDBException {
+ if (sortReaders != null) return;
+
+ try {
+
+ sortReaders = new ArrayList<>();
+ if (cachedBytes != 0) {
+ cachedData.sort(comparator);
+ if (sortBufferManager.allocate(cachedBytes)) {
+ sortReaders.add(new MemoryReader(cachedData));
+ } else {
+ sortBufferManager.allocateOneSortBranch();
+ diskSpiller.spillSortedData(cachedData);
+ cachedData = null;
+ }
+ }
+ sortReaders.addAll(diskSpiller.getReaders(sortBufferManager));
+ // if reader is finished
+ noMoreData = new boolean[sortReaders.size()];
+ // need to read data from reader when isEmpty is true
+ isEmpty = new boolean[sortReaders.size()];
+ Arrays.fill(isEmpty, true);
+ } catch (Exception e) {
+ throw new IoTDBException(e.getMessage(),
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]