This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpch500
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/tpch500 by this push:
     new 7c2a912ce34 [Fix](parquet-reader) Fix partition column issue for 
#29181 (Opt late materialization of parquet reader). (#29271)
7c2a912ce34 is described below

commit 7c2a912ce343f0d96587dc3805aa245f098b2e07
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Fri Dec 29 16:25:31 2023 +0800

    [Fix](parquet-reader) Fix partition column issue for #29181 (Opt late 
materialization of parquet reader). (#29271)
---
 be/src/vec/columns/column_nullable.cpp             | 13 -------
 .../format/parquet/fix_length_plain_decoder.cpp    |  5 ---
 be/src/vec/exec/format/parquet/parquet_common.cpp  |  9 -----
 .../exec/format/parquet/vparquet_column_reader.cpp | 40 ----------------------
 .../exec/format/parquet/vparquet_group_reader.cpp  | 25 +++-----------
 5 files changed, 4 insertions(+), 88 deletions(-)

diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index b44482781e8..426de2d4f70 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -370,19 +370,6 @@ size_t ColumnNullable::filter(const Filter& filter) {
     return data_result_size;
 }
 
-//size_t ColumnNullable::filter(const Filter& filter) {
-//    const auto data_result_size = get_nested_column().filter(filter);
-//
-//    get_null_map_column().resize(data_result_size);
-//    /*if (!_has_null) {
-//        get_null_map_column().resize(data_result_size);
-//    } else {
-//        const auto map_result_size = get_null_map_column().filter(filter);
-//        CHECK_EQ(data_result_size, map_result_size);
-//    }*/
-//    return data_result_size;
-//}
-
 Status ColumnNullable::filter_by_selector(const uint16_t* sel, size_t 
sel_size, IColumn* col_ptr) {
     const auto* nullable_col_ptr = reinterpret_cast<const 
ColumnNullable*>(col_ptr);
     ColumnPtr nest_col_ptr = nullable_col_ptr->nested_column;
diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp 
b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
index 3160f5f5f1d..ea1f63a3e2d 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
@@ -233,11 +233,6 @@ Status 
FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column,
     while (size_t run_length = 
select_vector.get_next_run<has_filter>(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            /*for (size_t i = 0; i < run_length; ++i) {
-                char* buf_start = _data->data + _offset;
-                column_data[data_index++] = *(PhysicalType*)buf_start;
-                _offset += _type_length;
-            }*/
             memcpy(column_data.data() + data_index, _data->data + _offset,
                    run_length * _type_length);
             data_index += run_length;
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp 
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 241135eef61..095b34eeb4e 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -75,25 +75,16 @@ void ColumnSelectVector::set_run_length_null_map(const 
std::vector<uint16_t>& ru
             if (is_null) {
                 _num_nulls += run_length;
                 for (int i = 0; i < run_length; ++i) {
-                    //_data_map[map_index++] = FILTERED_NULL;
                     _data_map[map_index++] = NULL_DATA;
                 }
             } else {
                 for (int i = 0; i < run_length; ++i) {
-                    //_data_map[map_index++] = FILTERED_CONTENT;
                     _data_map[map_index++] = CONTENT;
                 }
             }
             is_null = !is_null;
         }
         size_t num_read = 0;
-        /*DCHECK_LE(_filter_map_index + num_values, _filter_map_size);
-        for (size_t i = 0; i < num_values; ++i) {
-            if (_filter_map[_filter_map_index++]) {
-                _data_map[i] = _data_map[i] == FILTERED_NULL ? NULL_DATA : 
CONTENT;
-                num_read++;
-            }
-        }*/
         num_read = num_values;
         _num_filtered = num_values - num_read;
         if (null_map != nullptr && num_read > 0) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index f0120f49701..4fe3ae7b597 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -477,7 +477,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
                                             ColumnSelectVector& select_vector, 
size_t batch_size,
                                             size_t* read_rows, bool* eof, bool 
is_dict_filter,
                                             size_t skip_nums, size_t* 
skipped_nums) {
-    //fprintf(stderr, "batch_size: %ld, skip_nums: %ld\n", batch_size, 
skip_nums);
     *skipped_nums = 0;
     if (_chunk_reader->remaining_num_values() == 0) {
         if (!_chunk_reader->has_next_page()) {
@@ -503,35 +502,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
         RETURN_IF_ERROR(_chunk_reader->skip_page());
         *read_rows = 0;
     } else {
-        //bool skip_whole_batch = false;
-        // Determining whether to skip page or batch will increase the 
calculation time.
-        // When the filtering effect is greater than 60%, it is possible to 
skip the page or batch.
-        /*if (select_vector.has_filter() && select_vector.filter_ratio() > 
0.6) {
-            // lazy read
-            size_t remaining_num_values = 0;
-            for (auto& range : read_ranges) {
-                remaining_num_values += range.last_row - range.first_row;
-            }
-            if (batch_size >= remaining_num_values &&
-                select_vector.can_filter_all(remaining_num_values)) {
-                // We can skip the whole page if the remaining values is 
filtered by predicate columns
-                select_vector.skip(remaining_num_values);
-               fprintf(stderr, "select_vector.skip1(%ld)\n", 
remaining_num_values);
-                _current_row_index += _chunk_reader->remaining_num_values();
-                RETURN_IF_ERROR(_chunk_reader->skip_page());
-                *read_rows = remaining_num_values;
-                if (!_chunk_reader->has_next_page()) {
-                    *eof = true;
-                }
-                return Status::OK();
-            }
-            skip_whole_batch =
-                    batch_size <= remaining_num_values && 
select_vector.can_filter_all(batch_size);
-            if (skip_whole_batch) {
-                select_vector.skip(batch_size);
-               fprintf(stderr, "select_vector.skip2(%ld)\n", batch_size);
-            }
-        }*/
         // load page data to decode or skip values
         RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
 
@@ -557,15 +527,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
             // generate the read values
             size_t read_values =
                     std::min((size_t)(range.last_row - _current_row_index), 
batch_size - has_read);
-            /*if (skip_whole_batch) {
-                RETURN_IF_ERROR(_skip_values(read_values));
-               fprintf(stderr, "_skip_values(%ld)\n", read_values);
-            } else {
-                RETURN_IF_ERROR(_read_values(read_values, doris_column, type, 
select_vector,
-                                             is_dict_filter));
-               fprintf(stderr, "_read_values(%ld)\n", read_values);
-            }*/
-            //fprintf(stderr, "read_values: %ld\n", read_values);
             RETURN_IF_ERROR(
                     _read_values(read_values, doris_column, type, 
select_vector, is_dict_filter));
             has_read += read_values;
@@ -581,7 +542,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
     if (_chunk_reader->remaining_num_values() == 0 && 
!_chunk_reader->has_next_page()) {
         *eof = true;
     }
-    //fprintf(stderr, "*read_rows: %ld\n", *read_rows);
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0c05dcf8f1c..5757de40294 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -412,7 +412,6 @@ Status RowGroupReader::_read_column_data(Block* block, 
const std::vector<std::st
             RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
                     column_ptr, column_type, select_vector, batch_size - 
col_read_rows, &loop_rows,
                     &col_eof, is_dict_filter, col_skip_nums, &skipped_nums));
-            //fprintf(stderr, "batch_size: %ld, col_read_rows: %ld, loop_rows: 
%ld\n", batch_size, col_read_rows, loop_rows);
             col_skip_nums -= skipped_nums;
             col_read_rows += loop_rows;
         }
@@ -446,12 +445,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
         pre_read_rows = 0;
         pre_eof = false;
         ColumnSelectVector run_length_vector;
-        //fprintf(stderr, "pre column batch_size: %ld, pre_read_rows: %ld\n", 
batch_size,
-        //        pre_read_rows);
         RETURN_IF_ERROR(_read_column_data(block, 
_lazy_read_ctx.predicate_columns.first, batch_size,
                                           &pre_read_rows, &pre_eof, 
run_length_vector, 0));
-        //fprintf(stderr, "pre column batch_size: %ld, pre_read_rows %ld 
finished\n", batch_size,
-        //        pre_read_rows);
         if (pre_read_rows == 0) {
             DCHECK_EQ(pre_eof, true);
             break;
@@ -526,41 +521,31 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
     }
 
     ColumnSelectVector& select_vector = *select_vector_ptr;
-    /*std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
-    if (_cached_filtered_rows != 0) {
-        _rebuild_select_vector(select_vector, rebuild_filter_map, 
pre_read_rows);
-        pre_read_rows += _cached_filtered_rows;
-        _cached_filtered_rows = 0;
-    }*/
 
     // lazy read columns
     size_t lazy_read_rows;
     bool lazy_eof;
-    //fprintf(stderr, "lazy column pre_read_rows: %ld\n", pre_read_rows);
     RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, 
pre_read_rows,
                                       &lazy_read_rows, &lazy_eof, 
select_vector,
                                       _cached_filtered_rows));
-    //fprintf(stderr, "lazy column pre_read_rows: %ld finished\n", 
pre_read_rows);
     if (pre_read_rows != lazy_read_rows) {
         return Status::Corruption("Can't read the same number of rows when 
doing lazy read");
     }
     if (_cached_filtered_rows != 0) {
         _cached_filtered_rows = 0;
     }
+
+    RETURN_IF_ERROR(
+            _fill_partition_columns(block, lazy_read_rows, 
_lazy_read_ctx.partition_columns));
+    RETURN_IF_ERROR(_fill_missing_columns(block, lazy_read_rows, 
_lazy_read_ctx.missing_columns));
     // pre_eof ^ lazy_eof
     // we set pre_read_rows as batch_size for lazy read columns, so pre_eof != 
lazy_eof
 
     // filter data in predicate columns, and remove filter column
     if (select_vector.has_filter()) {
-        //if (block->columns() == origin_column_num) {
-        // the whole row group has been filtered by 
_lazy_read_ctx.vconjunct_ctx, and batch_eof is
-        // generated from next batch, so the filter column is removed ahead.
-        //    DCHECK_EQ(block->rows(), 0);
-        //} else {
         RETURN_IF_CATCH_EXCEPTION(
                 Block::filter_block_internal(block, columns_to_filter, 
result_filter));
         Block::erase_useless_column(block, origin_column_num);
-        //}
     } else {
         Block::erase_useless_column(block, origin_column_num);
     }
@@ -582,8 +567,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
     *read_rows = column_size;
 
     *batch_eof = pre_eof;
-    RETURN_IF_ERROR(_fill_partition_columns(block, column_size, 
_lazy_read_ctx.partition_columns));
-    RETURN_IF_ERROR(_fill_missing_columns(block, column_size, 
_lazy_read_ctx.missing_columns));
     if (!_not_single_slot_filter_conjuncts.empty()) {
         
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
                 _not_single_slot_filter_conjuncts, nullptr, block, 
columns_to_filter,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to