This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 4b3339a4aed [opt](scan) read scan ranges in the order of partitions (#33515) (#33657) 4b3339a4aed is described below commit 4b3339a4aed20b6daa5919e532edc7edee6e832f Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Mon Apr 15 16:20:12 2024 +0800 [opt](scan) read scan ranges in the order of partitions (#33515) (#33657) backport: #33515 --- be/src/pipeline/exec/file_scan_operator.cpp | 36 +++++++++++++++------- be/src/vec/exec/scan/new_file_scan_node.cpp | 36 +++++++++++++++------- .../doris/datasource/FederationBackendPolicy.java | 27 ++++++---------- .../doris/planner/FederationBackendPolicyTest.java | 19 ++++++++++-- 4 files changed, 77 insertions(+), 41 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index ac193147dfb..f81781481df 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -73,20 +73,34 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, _scan_ranges = scan_ranges; } else { // There is no need for the number of scanners to exceed the number of threads in thread pool. - _scan_ranges.clear(); - auto range_iter = scan_ranges.begin(); - for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) { - _scan_ranges.push_back(*range_iter); + // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order. + // In the insert statement, reading data in partition order can reduce the memory usage of BE + // and prevent the generation of smaller tables. + _scan_ranges.resize(max_scanners); + int num_ranges = scan_ranges.size() / max_scanners; + int num_add_one = scan_ranges.size() - num_ranges * max_scanners; + int scan_index = 0; + int range_index = 0; + for (int i = 0; i < num_add_one; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } } - for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) { - if (i == max_scanners) { - i = 0; + for (int i = num_add_one; i < max_scanners; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges - 1; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges; - auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - _scan_ranges.shrink_to_fit(); LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); } if (scan_ranges.size() > 0 && diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 2ce80f4463a..eed7cfaaec6 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -71,20 +71,34 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state, _scan_ranges = scan_ranges; } else { // There is no need for the number of scanners to exceed the number of threads in thread pool. - _scan_ranges.clear(); - auto range_iter = scan_ranges.begin(); - for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) { - _scan_ranges.push_back(*range_iter); + // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order. + // In the insert statement, reading data in partition order can reduce the memory usage of BE + // and prevent the generation of smaller tables. + _scan_ranges.resize(max_scanners); + int num_ranges = scan_ranges.size() / max_scanners; + int num_add_one = scan_ranges.size() - num_ranges * max_scanners; + int scan_index = 0; + int range_index = 0; + for (int i = 0; i < num_add_one; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } } - for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) { - if (i == max_scanners) { - i = 0; + for (int i = num_add_one; i < max_scanners; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges - 1; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges; - auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - _scan_ranges.shrink_to_fit(); LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); } if (scan_ranges.size() > 0 && diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 73a49bb24a8..13756c978f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -211,24 +211,18 @@ public class FederationBackendPolicy { this.enableSplitsRedistribution = enableSplitsRedistribution; } + /** + * Assign splits to each backend. Ensure that each backend receives a similar amount of data. + * In order to make sure backends utilize the os page cache as much as possible, and all backends read splits + * in the order of partitions(reading data in partition order can reduce the memory usage of backends), + * splits should be sorted by path. + * Fortunately, the process of obtaining splits ensures that the splits have been sorted according to the path. + * If the splits are unordered, it is strongly recommended to sort them before calling this function. + */ public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException { - // Sorting splits is to ensure that the same query utilizes the os page cache as much as possible. - splits.sort((split1, split2) -> { - int pathComparison = split1.getPathString().compareTo(split2.getPathString()); - if (pathComparison != 0) { - return pathComparison; - } - - int startComparison = Long.compare(split1.getStart(), split2.getStart()); - if (startComparison != 0) { - return startComparison; - } - return Long.compare(split1.getLength(), split2.getLength()); - }); - ListMultimap<Backend, Split> assignment = ArrayListMultimap.create(); - List<Split> remainingSplits = null; + List<Split> remainingSplits; List<Backend> backends = new ArrayList<>(); for (List<Backend> backendList : backendMap.values()) { @@ -242,8 +236,7 @@ public class FederationBackendPolicy { // locality information if (Config.split_assigner_optimized_local_scheduling) { remainingSplits = new ArrayList<>(splits.size()); - for (int i = 0; i < splits.size(); ++i) { - Split split = splits.get(i); + for (Split split : splits) { if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) { List<Backend> candidateNodes = selectExactNodes(backendMap, split.getHosts()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index c0307cbd6d1..82f46862674 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -288,6 +288,21 @@ public class FederationBackendPolicyTest { } + public static void sortSplits(List<Split> splits) { + splits.sort((split1, split2) -> { + int pathComparison = split1.getPathString().compareTo(split2.getPathString()); + if (pathComparison != 0) { + return pathComparison; + } + + int startComparison = Long.compare(split1.getStart(), split2.getStart()); + if (startComparison != 0) { + return startComparison; + } + return Long.compare(split1.getLength(), split2.getLength()); + }); + } + @Test public void testGenerateRandomly() throws UserException { SystemInfoService service = new SystemInfoService(); @@ -367,7 +382,7 @@ public class FederationBackendPolicyTest { List<Split> totalSplits = new ArrayList<>(); totalSplits.addAll(remoteSplits); totalSplits.addAll(localSplits); - Collections.shuffle(totalSplits); + sortSplits(totalSplits); Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits); if (i == 0) { result = ArrayListMultimap.create(assignment); @@ -489,7 +504,7 @@ public class FederationBackendPolicyTest { List<Split> totalSplits = new ArrayList<>(); totalSplits.addAll(remoteSplits); totalSplits.addAll(localSplits); - Collections.shuffle(totalSplits); + sortSplits(totalSplits); Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits); if (i == 0) { result = ArrayListMultimap.create(assignment); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org