ycycse commented on code in PR #9746:
URL: https://github.com/apache/iotdb/pull/9746#discussion_r1186674467
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java:
##########
@@ -67,75 +108,253 @@ public ListenableFuture<?> isBlocked() {
public TsBlock next() throws Exception {
if (!inputOperator.hasNextWithTimer()) {
- if (cachedData.size() > 1) {
- cachedData.sort(comparator);
+ if (diskSpiller.hasSpilledData()) {
+ try {
+ prepareSortReaders();
+ return mergeSort();
+ } catch (Exception e) {
+ clear();
+ throw e;
+ }
+ } else {
+ if (curRow == -1) {
+ cachedData.sort(comparator);
+ curRow = 0;
+ }
+ return buildTsBlockInMemory();
}
- TsBlock result = buildTsBlock();
- cachedData = null;
- return result;
}
TsBlock tsBlock = inputOperator.nextWithTimer();
if (tsBlock == null) {
return null;
}
- // add data of each TsBlock from child into list
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- cachedData.add(new MergeSortKey(tsBlock, i));
+
+ try {
+ cacheTsBlock(tsBlock);
+ } catch (IoTDBException e) {
+ clear();
+ throw e;
}
+
return null;
}
- private TsBlock buildTsBlock() {
+ private void prepareSortReaders() throws IoTDBException {
+ if (sortReaders != null) return;
+
+ try {
+
+ sortReaders = new ArrayList<>();
+ if (cachedBytes != 0) {
+ cachedData.sort(comparator);
+ if (sortBufferManager.allocate(cachedBytes)) {
+ sortReaders.add(new MemoryReader(cachedData));
+ } else {
+ sortBufferManager.allocateOneSortBranch();
+ diskSpiller.spillSortedData(cachedData);
+ cachedData = null;
+ }
+ }
+ sortReaders.addAll(diskSpiller.getReaders(sortBufferManager));
+ // if reader is finished
+ noMoreData = new boolean[sortReaders.size()];
+ // need to read data from reader when isEmpty is true
+ isEmpty = new boolean[sortReaders.size()];
+ Arrays.fill(isEmpty, true);
+ } catch (Exception e) {
+ throw new IoTDBException(e.getMessage(),
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+
+ private void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
+ long bytesSize = tsBlock.getRetainedSizeInBytes();
+ if (bytesSize + cachedBytes < sortBufferManager.SORT_BUFFER_SIZE) {
+ cachedBytes += bytesSize;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ cachedData.add(new MergeSortKey(tsBlock, i));
+ }
+ } else {
+ cachedData.sort(comparator);
+ spill();
+ cachedData.clear();
+ cachedBytes = bytesSize;
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ cachedData.add(new MergeSortKey(tsBlock, i));
+ }
+ }
+ }
+
+ private void spill() throws IoTDBException {
+ try {
+ // if current memory cannot put this tsBlock, an exception will be thrown in spillSortedData()
+ // because there should be at least tsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES for
+ // one branch.
+ sortBufferManager.allocateOneSortBranch();
+ diskSpiller.spillSortedData(cachedData);
+ } catch (IOException e) {
+ throw new IoTDBException(e.getMessage(),
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ }
+
+ private TsBlock buildTsBlockInMemory() {
+ tsBlockBuilder.reset();
TimeColumnBuilder timeColumnBuilder =
tsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
- cachedData.forEach(
- mergeSortKey -> {
- TsBlock tsBlock = mergeSortKey.tsBlock;
- int row = mergeSortKey.rowIndex;
- timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(row));
- for (int i = 0; i < valueColumnBuilders.length; i++) {
- if (tsBlock.getColumn(i).isNull(row)) {
- valueColumnBuilders[i].appendNull();
- continue;
- }
- valueColumnBuilders[i].write(tsBlock.getColumn(i), row);
- }
- tsBlockBuilder.declarePosition();
- });
+ for (int i = curRow; i < cachedData.size(); i++) {
+ MergeSortKey mergeSortKey = cachedData.get(i);
+ TsBlock tsBlock = mergeSortKey.tsBlock;
+
timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(mergeSortKey.rowIndex));
+ for (int j = 0; j < valueColumnBuilders.length; j++) {
+ if (tsBlock.getColumn(j).isNull(mergeSortKey.rowIndex)) {
+ valueColumnBuilders[j].appendNull();
+ continue;
+ }
+ valueColumnBuilders[j].write(tsBlock.getColumn(j),
mergeSortKey.rowIndex);
+ }
+ tsBlockBuilder.declarePosition();
+ curRow++;
+ if (tsBlockBuilder.isFull()) {
+ break;
+ }
+ }
+
+ return tsBlockBuilder.build();
+ }
+
+ private TsBlock mergeSort() throws IoTDBException {
+
+ if (!diskSpiller.allProcessingTaskFinished()) return null;
+
+ if (mergeSortHeap == null) {
+ mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
+ }
+
+ long startTime = System.nanoTime();
+ long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
+ // 1. fill the input from each reader
+ for (int i = 0; i < sortReaders.size(); i++) {
+ if (noMoreData[i] || !isEmpty[i]) continue;
+ SortReader sortReader = sortReaders.get(i);
+ if (sortReader.hasNext()) {
+ MergeSortKey mergeSortKey = sortReader.next();
+ mergeSortKey.tsBlockIndex = i;
+ mergeSortHeap.push(mergeSortKey);
+ isEmpty[i] = false;
+ } else {
+ noMoreData[i] = true;
+ sortBufferManager.releaseOneSortBranch();
+ }
+ }
+
+ // 2. do merge sort until one TsBlock is consumed up
+ tsBlockBuilder.reset();
+ TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
+ while (!mergeSortHeap.isEmpty()) {
+
+ MergeSortKey mergeSortKey = mergeSortHeap.poll();
+ TsBlock targetBlock = mergeSortKey.tsBlock;
+ timeBuilder.writeLong(targetBlock.getTimeByIndex(mergeSortKey.rowIndex));
+ for (int i = 0; i < valueColumnBuilders.length; i++) {
+ if (targetBlock.getColumn(i).isNull(mergeSortKey.rowIndex)) {
+ valueColumnBuilders[i].appendNull();
+ continue;
+ }
+ valueColumnBuilders[i].write(targetBlock.getColumn(i),
mergeSortKey.rowIndex);
+ }
+ tsBlockBuilder.declarePosition();
+
+ int readerIndex = mergeSortKey.tsBlockIndex;
+ mergeSortKey = readNextMergeSortKey(readerIndex);
+ if (mergeSortKey != null) {
+ mergeSortHeap.push(mergeSortKey);
+ } else {
+ noMoreData[readerIndex] = true;
+ sortBufferManager.releaseOneSortBranch();
+ }
+
+ // break if time is out or tsBlockBuilder is full or sortBuffer is not enough
+ if (System.nanoTime() - startTime > maxRuntime ||
tsBlockBuilder.isFull()) {
+ break;
+ }
+ }
return tsBlockBuilder.build();
}
+ private MergeSortKey readNextMergeSortKey(int readerIndex) throws
IoTDBException {
+ SortReader sortReader = sortReaders.get(readerIndex);
+ if (sortReader.hasNext()) {
+ MergeSortKey mergeSortKey = sortReader.next();
+ mergeSortKey.tsBlockIndex = readerIndex;
+ return mergeSortKey;
+ }
+ return null;
+ }
+
+ private boolean hasMoreData() {
+ if (noMoreData == null) return true;
+ for (boolean noMore : noMoreData) {
+ if (!noMore) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void clear() {
+ if (!diskSpiller.hasSpilledData()) return;
+ try {
+ diskSpiller.clear();
+ Path path = Paths.get(filePrePath);
+ Files.deleteIfExists(path);
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]