This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4b3339a4aed [opt](scan) read scan ranges in the order of partitions (#33515) (#33657)
4b3339a4aed is described below

commit 4b3339a4aed20b6daa5919e532edc7edee6e832f
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Apr 15 16:20:12 2024 +0800

    [opt](scan) read scan ranges in the order of partitions (#33515) (#33657)
    
    backport: #33515
---
 be/src/pipeline/exec/file_scan_operator.cpp        | 36 +++++++++++++++-------
 be/src/vec/exec/scan/new_file_scan_node.cpp        | 36 +++++++++++++++-------
 .../doris/datasource/FederationBackendPolicy.java  | 27 ++++++----------
 .../doris/planner/FederationBackendPolicyTest.java | 19 ++++++++++--
 4 files changed, 77 insertions(+), 41 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index ac193147dfb..f81781481df 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -73,20 +73,34 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
         _scan_ranges = scan_ranges;
     } else {
         // There is no need for the number of scanners to exceed the number of 
threads in thread pool.
-        _scan_ranges.clear();
-        auto range_iter = scan_ranges.begin();
-        for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); 
++i, ++range_iter) {
-            _scan_ranges.push_back(*range_iter);
+        // scan_ranges is sorted by path(as well as partition path) in FE, so 
merge scan ranges in order.
+        // In the insert statement, reading data in partition order can reduce 
the memory usage of BE
+        // and prevent the generation of smaller tables.
+        _scan_ranges.resize(max_scanners);
+        int num_ranges = scan_ranges.size() / max_scanners;
+        int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
+        int scan_index = 0;
+        int range_index = 0;
+        for (int i = 0; i < num_add_one; ++i) {
+            _scan_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges; j++) {
+                auto& merged_ranges =
+                        
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
+            }
         }
-        for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
-            if (i == max_scanners) {
-                i = 0;
+        for (int i = num_add_one; i < max_scanners; ++i) {
+            _scan_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges - 1; j++) {
+                auto& merged_ranges =
+                        
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
             }
-            auto& ranges = 
_scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
-            auto& merged_ranges = 
range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
-            ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
         }
-        _scan_ranges.shrink_to_fit();
         LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << 
_scan_ranges.size();
     }
     if (scan_ranges.size() > 0 &&
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 2ce80f4463a..eed7cfaaec6 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -71,20 +71,34 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
         _scan_ranges = scan_ranges;
     } else {
         // There is no need for the number of scanners to exceed the number of 
threads in thread pool.
-        _scan_ranges.clear();
-        auto range_iter = scan_ranges.begin();
-        for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); 
++i, ++range_iter) {
-            _scan_ranges.push_back(*range_iter);
+        // scan_ranges is sorted by path(as well as partition path) in FE, so 
merge scan ranges in order.
+        // In the insert statement, reading data in partition order can reduce 
the memory usage of BE
+        // and prevent the generation of smaller tables.
+        _scan_ranges.resize(max_scanners);
+        int num_ranges = scan_ranges.size() / max_scanners;
+        int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
+        int scan_index = 0;
+        int range_index = 0;
+        for (int i = 0; i < num_add_one; ++i) {
+            _scan_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges; j++) {
+                auto& merged_ranges =
+                        
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
+            }
         }
-        for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
-            if (i == max_scanners) {
-                i = 0;
+        for (int i = num_add_one; i < max_scanners; ++i) {
+            _scan_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges - 1; j++) {
+                auto& merged_ranges =
+                        
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
             }
-            auto& ranges = 
_scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
-            auto& merged_ranges = 
range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
-            ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
         }
-        _scan_ranges.shrink_to_fit();
         LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << 
_scan_ranges.size();
     }
     if (scan_ranges.size() > 0 &&
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 73a49bb24a8..13756c978f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -211,24 +211,18 @@ public class FederationBackendPolicy {
         this.enableSplitsRedistribution = enableSplitsRedistribution;
     }
 
+    /**
+     * Assign splits to each backend. Ensure that each backend receives a similar amount of data.
+     * In order to make sure backends utilize the os page cache as much as possible, and all backends read splits
+     * in the order of partitions(reading data in partition order can reduce the memory usage of backends),
+     * splits should be sorted by path.
+     * Fortunately, the process of obtaining splits ensures that the splits have been sorted according to the path.
+     * If the splits are unordered, it is strongly recommended to sort them before calling this function.
+     */
     public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> 
splits) throws UserException {
-        // Sorting splits is to ensure that the same query utilizes the os 
page cache as much as possible.
-        splits.sort((split1, split2) -> {
-            int pathComparison = 
split1.getPathString().compareTo(split2.getPathString());
-            if (pathComparison != 0) {
-                return pathComparison;
-            }
-
-            int startComparison = Long.compare(split1.getStart(), 
split2.getStart());
-            if (startComparison != 0) {
-                return startComparison;
-            }
-            return Long.compare(split1.getLength(), split2.getLength());
-        });
-
         ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
 
-        List<Split> remainingSplits = null;
+        List<Split> remainingSplits;
 
         List<Backend> backends = new ArrayList<>();
         for (List<Backend> backendList : backendMap.values()) {
@@ -242,8 +236,7 @@ public class FederationBackendPolicy {
         // locality information
         if (Config.split_assigner_optimized_local_scheduling) {
             remainingSplits = new ArrayList<>(splits.size());
-            for (int i = 0; i < splits.size(); ++i) {
-                Split split = splits.get(i);
+            for (Split split : splits) {
                 if (split.isRemotelyAccessible() && (split.getHosts() != null 
&& split.getHosts().length > 0)) {
                     List<Backend> candidateNodes = 
selectExactNodes(backendMap, split.getHosts());
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index c0307cbd6d1..82f46862674 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -288,6 +288,21 @@ public class FederationBackendPolicyTest {
 
     }
 
+    public static void sortSplits(List<Split> splits) {
+        splits.sort((split1, split2) -> {
+            int pathComparison = 
split1.getPathString().compareTo(split2.getPathString());
+            if (pathComparison != 0) {
+                return pathComparison;
+            }
+
+            int startComparison = Long.compare(split1.getStart(), 
split2.getStart());
+            if (startComparison != 0) {
+                return startComparison;
+            }
+            return Long.compare(split1.getLength(), split2.getLength());
+        });
+    }
+
     @Test
     public void testGenerateRandomly() throws UserException {
         SystemInfoService service = new SystemInfoService();
@@ -367,7 +382,7 @@ public class FederationBackendPolicyTest {
             List<Split> totalSplits = new ArrayList<>();
             totalSplits.addAll(remoteSplits);
             totalSplits.addAll(localSplits);
-            Collections.shuffle(totalSplits);
+            sortSplits(totalSplits);
             Multimap<Backend, Split> assignment = 
policy.computeScanRangeAssignment(totalSplits);
             if (i == 0) {
                 result = ArrayListMultimap.create(assignment);
@@ -489,7 +504,7 @@ public class FederationBackendPolicyTest {
             List<Split> totalSplits = new ArrayList<>();
             totalSplits.addAll(remoteSplits);
             totalSplits.addAll(localSplits);
-            Collections.shuffle(totalSplits);
+            sortSplits(totalSplits);
             Multimap<Backend, Split> assignment = 
policy.computeScanRangeAssignment(totalSplits);
             if (i == 0) {
                 result = ArrayListMultimap.create(assignment);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to