This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c6b97c4daa3 [Improvement](segment iterator) remove range in first read 
to save time (#26689)
c6b97c4daa3 is described below

commit c6b97c4daa303d37df48d68840dcfbdcbb209700
Author: airborne12 <airborn...@gmail.com>
AuthorDate: Mon Nov 13 15:51:48 2023 +0800

    [Improvement](segment iterator) remove range in first read to save time 
(#26689)
    
    Currently, rowids may be fragmented significantly after 
`_get_row_ranges_by_column_conditions`, potentially leading to high CPU costs 
when processing these scattered ranges of rowid.
    
    This PR enhances the `SegmentIterator` by eliminating the initial range 
read in the `BitmapRangeIterator` constructor and introducing a 
`read_batch_rowids` method to both `BitmapRangeIterator` and 
`BackwardBitmapRangeIterator` classes. The aim is to boost performance by 
omitting redundant read operations, thereby reducing execution time.
    
    Moreover, to avoid unnecessary reads when the range is relatively complete, 
we employ a simple `is_continuous` check to determine if the block of rows is 
continuous. If so, we call `next_batch` instead of `read_by_rowids`, 
streamlining the processing of consecutive rowids.
    
    
    We selected three SQL statement scenarios to test the effects of the 
optimization, which are:
    
    1. ```select COUNT() from wc_httplogs_inverted_index where request match 
"images" and (size >= 10 and status = 200);```
    2. ```select COUNT() from wc_httplogs_inverted_index where request match 
"HTTP" and (size >= 10 and status = 200);```
    3. ```select COUNT() from wc_httplogs_inverted_index where request match 
"GET" and (size >= 10 and status = 200);```
    
    - The first SQL statement represents the scenario primarily optimized in 
this PR, where the first read matches a large number of rows but is highly 
fragmented.
    - The second SQL statement represents a scenario where the first read fully 
hits, mainly to verify if there is any performance degradation in the PR when 
hitting a complete rowid range.
    - The third SQL statement represents a near-total hit with only occasional 
misses, used to check if the PR degrades when the rowid range contains many 
continuous ranges.
    
    The results are as follows:
    
    1. For the first SQL statement:
        1. Before optimization: Execution time: 0.32 sec, FirstReadTime: 6s628ms
        2. After optimization: Execution time: 0.16 sec, FirstReadTime: 1s604ms
    2. For the second SQL statement:
        1. Before optimization: Execution time: 0.16 sec, FirstReadTime: 
682.816ms
        2. After optimization: Execution time: 0.15 sec, FirstReadTime: 
635.156ms
    3. For the third SQL statement:
        1. Before optimization: Execution time: 0.16 sec, FirstReadTime: 
787.904ms
        2. After optimization: Execution time: 0.16 sec, FirstReadTime: 
798.861ms
---
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 193 ++++++++++++++-------
 be/src/olap/rowset/segment_v2/segment_iterator.h   |   3 +-
 2 files changed, 133 insertions(+), 63 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 94f849e8165..38e24cd7c8a 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -99,11 +99,12 @@ public:
 
     explicit BitmapRangeIterator(const roaring::Roaring& bitmap) {
         roaring_init_iterator(&bitmap.roaring, &_iter);
-        _read_next_batch();
     }
 
     bool has_more_range() const { return !_eof; }
 
+    [[nodiscard]] static uint32_t get_batch_size() { return kBatchSize; }
+
     // read next range into [*from, *to) whose size <= max_range_size.
     // return false when there is no more range.
     virtual bool next_range(const uint32_t max_range_size, uint32_t* from, 
uint32_t* to) {
@@ -142,6 +143,11 @@ public:
         return true;
     }
 
+    // read batch_size of rowids from roaring bitmap into buf array
+    virtual uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) {
+        return roaring::api::roaring_read_uint32_iterator(&_iter, buf, 
batch_size);
+    }
+
 private:
     void _read_next_batch() {
         _buf_pos = 0;
@@ -166,6 +172,8 @@ class SegmentIterator::BackwardBitmapRangeIterator : public 
SegmentIterator::Bit
 public:
     explicit BackwardBitmapRangeIterator(const roaring::Roaring& bitmap) {
         roaring_init_iterator_last(&bitmap.roaring, &_riter);
+        _rowid_count = roaring_bitmap_get_cardinality(&bitmap.roaring);
+        _rowid_left = _rowid_count;
     }
 
     bool has_more_range() const { return !_riter.has_value; }
@@ -189,9 +197,51 @@ public:
 
         return true;
     }
+    /**
+     * Reads a batch of row IDs from a roaring bitmap, starting from the end 
and moving backwards.
+     * This function retrieves the last `batch_size` row IDs from the bitmap 
and stores them in the provided buffer.
+     * It updates the internal state to track how many row IDs are left to 
read in subsequent calls.
+     *
+     * The row IDs are read in reverse order, but stored in the buffer 
maintaining their original order in the bitmap.
+     *
+     * Example:
+     *   input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19]
+     *   If the bitmap has 12 elements and batch_size is set to 5, the 
function will first read [15, 16, 17, 18, 19]
+     *   into the buffer, leaving 7 elements left. In the next call with 
batch_size 5, it will read [4, 5, 6, 7, 10].
+     *
+     */
+    uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) override {
+        if (!_riter.has_value || _rowid_left == 0) {
+            return 0;
+        }
+
+        if (_rowid_count <= batch_size) {
+            roaring_bitmap_to_uint32_array(_riter.parent,
+                                           buf); // Fill 'buf' with 
'_rowid_count' elements.
+            uint32_t num_read = _rowid_left;     // Save the number of row IDs 
read.
+            _rowid_left = 0;                     // No row IDs left after this 
operation.
+            return num_read;                     // Return the number of row 
IDs read.
+        }
+
+        uint32_t read_size = std::min(batch_size, _rowid_left);
+        uint32_t num_read = 0; // Counter for the number of row IDs read.
+
+        // Read row IDs into the buffer in reverse order.
+        while (num_read < read_size && _riter.has_value) {
+            buf[read_size - num_read - 1] = _riter.current_value;
+            num_read++;
+            _rowid_left--; // Decrement the count of remaining row IDs.
+            roaring_previous_uint32_iterator(&_riter);
+        }
+
+        // Return the actual number of row IDs read.
+        return num_read;
+    }
 
 private:
     roaring::api::roaring_uint32_iterator_t _riter;
+    uint32_t _rowid_count;
+    uint32_t _rowid_left;
 };
 
 SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr 
schema)
@@ -1610,56 +1660,86 @@ void 
SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
     }
 }
 
+/**
+ * Reads columns by their index, handling both continuous and discontinuous 
rowid scenarios.
+ *
+ * This function is designed to read a specified number of rows (up to 
nrows_read_limit)
+ * from the segment iterator, dealing with both continuous and discontinuous 
rowid arrays.
+ * It operates as follows:
+ *
+ * 1. Reads a batch of rowids (up to the specified limit), and checks if they 
are continuous.
+ *    Continuous here means that the rowids form an unbroken sequence (e.g., 
1, 2, 3, 4...).
+ *
+ * 2. For each column that needs to be read (identified by 
_first_read_column_ids):
+ *    - If the rowids are continuous, the function uses seek_to_ordinal and 
next_batch
+ *      for efficient reading.
+ *    - If the rowids are not continuous, the function processes them in 
smaller batches
+ *      (each of size up to 256). Each batch is checked for internal 
continuity:
+ *        a. If a batch is continuous, uses seek_to_ordinal and next_batch for 
that batch.
+ *        b. If a batch is not continuous, uses read_by_rowids for individual 
rowids in the batch.
+ *
+ * This approach optimizes reading performance by leveraging batch processing 
for continuous
+ * rowid sequences and handling discontinuities gracefully in smaller chunks.
+ */
 Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, 
uint32_t& nrows_read,
                                                bool set_block_rowid) {
     SCOPED_RAW_TIMER(&_opts.stats->first_read_ns);
 
-    do {
-        uint32_t range_from = 0;
-        uint32_t range_to = 0;
-        bool has_next_range =
-                _range_iter->next_range(nrows_read_limit - nrows_read, 
&range_from, &range_to);
-        if (!has_next_range) {
-            break;
-        }
-
-        size_t rows_to_read = range_to - range_from;
-        _cur_rowid = range_to;
-
-        if (set_block_rowid) {
-            // Here use std::iota is better performance than for-loop, maybe 
for-loop is not vectorized
-            auto start = _block_rowids.data() + nrows_read;
-            auto end = start + rows_to_read;
-            std::iota(start, end, range_from);
-            nrows_read += rows_to_read;
-        } else {
-            nrows_read += rows_to_read;
-        }
-
-        _split_row_ranges.emplace_back(std::pair {range_from, range_to});
-    } while (nrows_read < nrows_read_limit && !_opts.read_orderby_key_reverse);
+    nrows_read = _range_iter->read_batch_rowids(_block_rowids.data(), 
nrows_read_limit);
+    bool is_continuous = (nrows_read > 1) &&
+                         (_block_rowids[nrows_read - 1] - _block_rowids[0] == 
nrows_read - 1);
 
     for (auto cid : _first_read_column_ids) {
         auto& column = _current_return_columns[cid];
         if (_prune_column(cid, column, true, nrows_read)) {
             continue;
         }
-        for (auto& range : _split_row_ranges) {
-            size_t nrows = range.second - range.first;
-            {
-                _opts.stats->block_first_read_seek_num += 1;
-                if (_opts.runtime_state && 
_opts.runtime_state->enable_profile()) {
-                    SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
-                    
RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(range.first));
-                } else {
-                    
RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(range.first));
-                }
+
+        if (is_continuous) {
+            size_t rows_read = nrows_read;
+            _opts.stats->block_first_read_seek_num += 1;
+            if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
+                SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
+                
RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
+            } else {
+                
RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
             }
-            size_t rows_read = nrows;
             RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, 
column));
-            if (rows_read != nrows) {
-                return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != 
rows_read({})", nrows,
-                                                                rows_read);
+            if (rows_read != nrows_read) {
+                return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != 
rows_read({})",
+                                                                nrows_read, 
rows_read);
+            }
+        } else {
+            const uint32_t batch_size = _range_iter->get_batch_size();
+            uint32_t processed = 0;
+            while (processed < nrows_read) {
+                uint32_t current_batch_size = std::min(batch_size, nrows_read 
- processed);
+                bool batch_continuous = (current_batch_size > 1) &&
+                                        (_block_rowids[processed + 
current_batch_size - 1] -
+                                                 _block_rowids[processed] ==
+                                         current_batch_size - 1);
+
+                if (batch_continuous) {
+                    size_t rows_read = current_batch_size;
+                    _opts.stats->block_first_read_seek_num += 1;
+                    if (_opts.runtime_state && 
_opts.runtime_state->enable_profile()) {
+                        
SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns);
+                        RETURN_IF_ERROR(
+                                
_column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
+                    } else {
+                        RETURN_IF_ERROR(
+                                
_column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
+                    }
+                    
RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
+                    if (rows_read != current_batch_size) {
+                        return Status::Error<ErrorCode::INTERNAL_ERROR>(
+                                "batch nrows({}) != rows_read({})", 
current_batch_size, rows_read);
+                    }
+                } else {
+                    RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(
+                            &_block_rowids[processed], current_batch_size, 
column));
+                }
+                processed += current_batch_size;
             }
         }
     }
@@ -1858,8 +1938,6 @@ Status 
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
         nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100);
         _wait_times_estimate_row_size--;
     }
-    _split_row_ranges.clear();
-    _split_row_ranges.reserve(nrows_read_limit / 2);
     RETURN_IF_ERROR(_read_columns_by_index(
             nrows_read_limit, _current_batch_rows_read,
             _lazy_materialization_read || _opts.record_rowids || 
_is_need_expr_eval));
@@ -2127,35 +2205,28 @@ void 
SegmentIterator::_output_index_result_column(uint16_t* sel_rowid_idx, uint1
     }
 }
 
-void SegmentIterator::_build_index_result_column(uint16_t* sel_rowid_idx, 
uint16_t select_size,
-                                                 vectorized::Block* block,
+void SegmentIterator::_build_index_result_column(const uint16_t* sel_rowid_idx,
+                                                 uint16_t select_size, 
vectorized::Block* block,
                                                  const std::string& 
pred_result_sign,
                                                  const roaring::Roaring& 
index_result) {
     auto index_result_column = vectorized::ColumnUInt8::create();
     vectorized::ColumnUInt8::Container& vec_match_pred = 
index_result_column->get_data();
     vec_match_pred.resize(block->rows());
-    size_t idx_in_block = 0;
-    size_t idx_in_row_range = 0;
     size_t idx_in_selected = 0;
-    // _split_row_ranges store multiple ranges which split in function 
_read_columns_by_index(),
-    // index_result is a column predicate apply result in a whole segement,
-    // but a scanner thread one time can read max rows limit by block_row_max,
-    // so split _row_bitmap by one time scan range, in order to match size of 
one scanner thread read rows.
-    for (auto origin_row_range : _split_row_ranges) {
-        for (size_t rowid = origin_row_range.first; rowid < 
origin_row_range.second; ++rowid) {
-            if (sel_rowid_idx == nullptr || (idx_in_selected < select_size &&
-                                             idx_in_row_range == 
sel_rowid_idx[idx_in_selected])) {
-                if (index_result.contains(rowid)) {
-                    vec_match_pred[idx_in_block++] = true;
-                } else {
-                    vec_match_pred[idx_in_block++] = false;
-                }
-                idx_in_selected++;
+
+    for (uint32_t i = 0; i < _current_batch_rows_read; i++) {
+        auto rowid = _block_rowids[i];
+        if (sel_rowid_idx == nullptr ||
+            (idx_in_selected < select_size && i == 
sel_rowid_idx[idx_in_selected])) {
+            if (index_result.contains(rowid)) {
+                vec_match_pred[idx_in_selected] = true;
+            } else {
+                vec_match_pred[idx_in_selected] = false;
             }
-            idx_in_row_range++;
+            idx_in_selected++;
         }
     }
-    assert(block->rows() == vec_match_pred.size());
+    DCHECK(block->rows() == vec_match_pred.size());
     auto index_result_position = block->get_position_by_name(pred_result_sign);
     block->replace_by_position(index_result_position, 
std::move(index_result_column));
 }
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 33d3a3f5f9c..352929678b3 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -259,7 +259,7 @@ private:
     std::string _gen_predicate_result_sign(ColumnPredicate* predicate);
     std::string _gen_predicate_result_sign(ColumnPredicateInfo* 
predicate_info);
 
-    void _build_index_result_column(uint16_t* sel_rowid_idx, uint16_t 
select_size,
+    void _build_index_result_column(const uint16_t* sel_rowid_idx, uint16_t 
select_size,
                                     vectorized::Block* block, const 
std::string& pred_result_sign,
                                     const roaring::Roaring& index_result);
     void _output_index_result_column(uint16_t* sel_rowid_idx, uint16_t 
select_size,
@@ -340,7 +340,6 @@ private:
     roaring::Roaring _row_bitmap;
     // "column_name+operator+value-> <in_compound_query, rowid_result>
     std::unordered_map<std::string, std::pair<bool, roaring::Roaring>> 
_rowid_result_for_index;
-    std::vector<std::pair<uint32_t, uint32_t>> _split_row_ranges;
     // an iterator for `_row_bitmap` that can be used to extract row range to 
scan
     std::unique_ptr<BitmapRangeIterator> _range_iter;
     // the next rowid to read


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to