This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch tpch500 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpch500 by this push: new 7c2a912ce34 [Fix](parquet-reader) Fix partition column issue for #29181O(Opt late materialization of parquet reader). (#29271) 7c2a912ce34 is described below commit 7c2a912ce343f0d96587dc3805aa245f098b2e07 Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Fri Dec 29 16:25:31 2023 +0800 [Fix](parquet-reader) Fix partition column issue for #29181O(Opt late materialization of parquet reader). (#29271) --- be/src/vec/columns/column_nullable.cpp | 13 ------- .../format/parquet/fix_length_plain_decoder.cpp | 5 --- be/src/vec/exec/format/parquet/parquet_common.cpp | 9 ----- .../exec/format/parquet/vparquet_column_reader.cpp | 40 ---------------------- .../exec/format/parquet/vparquet_group_reader.cpp | 25 +++----------- 5 files changed, 4 insertions(+), 88 deletions(-) diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index b44482781e8..426de2d4f70 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -370,19 +370,6 @@ size_t ColumnNullable::filter(const Filter& filter) { return data_result_size; } -//size_t ColumnNullable::filter(const Filter& filter) { -// const auto data_result_size = get_nested_column().filter(filter); -// -// get_null_map_column().resize(data_result_size); -// /*if (!_has_null) { -// get_null_map_column().resize(data_result_size); -// } else { -// const auto map_result_size = get_null_map_column().filter(filter); -// CHECK_EQ(data_result_size, map_result_size); -// }*/ -// return data_result_size; -//} - Status ColumnNullable::filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) { const auto* nullable_col_ptr = reinterpret_cast<const ColumnNullable*>(col_ptr); ColumnPtr nest_col_ptr = nullable_col_ptr->nested_column; diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp index 
3160f5f5f1d..ea1f63a3e2d 100644 --- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp +++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp @@ -233,11 +233,6 @@ Status FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column, while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) { switch (read_type) { case ColumnSelectVector::CONTENT: { - /*for (size_t i = 0; i < run_length; ++i) { - char* buf_start = _data->data + _offset; - column_data[data_index++] = *(PhysicalType*)buf_start; - _offset += _type_length; - }*/ memcpy(column_data.data() + data_index, _data->data + _offset, run_length * _type_length); data_index += run_length; diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp index 241135eef61..095b34eeb4e 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.cpp +++ b/be/src/vec/exec/format/parquet/parquet_common.cpp @@ -75,25 +75,16 @@ void ColumnSelectVector::set_run_length_null_map(const std::vector<uint16_t>& ru if (is_null) { _num_nulls += run_length; for (int i = 0; i < run_length; ++i) { - //_data_map[map_index++] = FILTERED_NULL; _data_map[map_index++] = NULL_DATA; } } else { for (int i = 0; i < run_length; ++i) { - //_data_map[map_index++] = FILTERED_CONTENT; _data_map[map_index++] = CONTENT; } } is_null = !is_null; } size_t num_read = 0; - /*DCHECK_LE(_filter_map_index + num_values, _filter_map_size); - for (size_t i = 0; i < num_values; ++i) { - if (_filter_map[_filter_map_index++]) { - _data_map[i] = _data_map[i] == FILTERED_NULL ? 
NULL_DATA : CONTENT; - num_read++; - } - }*/ num_read = num_values; _num_filtered = num_values - num_read; if (null_map != nullptr && num_read > 0) { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index f0120f49701..4fe3ae7b597 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -477,7 +477,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter, size_t skip_nums, size_t* skipped_nums) { - //fprintf(stderr, "batch_size: %ld, skip_nums: %ld\n", batch_size, skip_nums); *skipped_nums = 0; if (_chunk_reader->remaining_num_values() == 0) { if (!_chunk_reader->has_next_page()) { @@ -503,35 +502,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr RETURN_IF_ERROR(_chunk_reader->skip_page()); *read_rows = 0; } else { - //bool skip_whole_batch = false; - // Determining whether to skip page or batch will increase the calculation time. - // When the filtering effect is greater than 60%, it is possible to skip the page or batch. 
- /*if (select_vector.has_filter() && select_vector.filter_ratio() > 0.6) { - // lazy read - size_t remaining_num_values = 0; - for (auto& range : read_ranges) { - remaining_num_values += range.last_row - range.first_row; - } - if (batch_size >= remaining_num_values && - select_vector.can_filter_all(remaining_num_values)) { - // We can skip the whole page if the remaining values is filtered by predicate columns - select_vector.skip(remaining_num_values); - fprintf(stderr, "select_vector.skip1(%ld)\n", remaining_num_values); - _current_row_index += _chunk_reader->remaining_num_values(); - RETURN_IF_ERROR(_chunk_reader->skip_page()); - *read_rows = remaining_num_values; - if (!_chunk_reader->has_next_page()) { - *eof = true; - } - return Status::OK(); - } - skip_whole_batch = - batch_size <= remaining_num_values && select_vector.can_filter_all(batch_size); - if (skip_whole_batch) { - select_vector.skip(batch_size); - fprintf(stderr, "select_vector.skip2(%ld)\n", batch_size); - } - }*/ // load page data to decode or skip values RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent()); @@ -557,15 +527,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr // generate the read values size_t read_values = std::min((size_t)(range.last_row - _current_row_index), batch_size - has_read); - /*if (skip_whole_batch) { - RETURN_IF_ERROR(_skip_values(read_values)); - fprintf(stderr, "_skip_values(%ld)\n", read_values); - } else { - RETURN_IF_ERROR(_read_values(read_values, doris_column, type, select_vector, - is_dict_filter)); - fprintf(stderr, "_read_values(%ld)\n", read_values); - }*/ - //fprintf(stderr, "read_values: %ld\n", read_values); RETURN_IF_ERROR( _read_values(read_values, doris_column, type, select_vector, is_dict_filter)); has_read += read_values; @@ -581,7 +542,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) { 
*eof = true; } - //fprintf(stderr, "*read_rows: %ld\n", *read_rows); return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 0c05dcf8f1c..5757de40294 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -412,7 +412,6 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data( column_ptr, column_type, select_vector, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter, col_skip_nums, &skipped_nums)); - //fprintf(stderr, "batch_size: %ld, col_read_rows: %ld, loop_rows: %ld\n", batch_size, col_read_rows, loop_rows); col_skip_nums -= skipped_nums; col_read_rows += loop_rows; } @@ -446,12 +445,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re pre_read_rows = 0; pre_eof = false; ColumnSelectVector run_length_vector; - //fprintf(stderr, "pre column batch_size: %ld, pre_read_rows: %ld\n", batch_size, - // pre_read_rows); RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size, &pre_read_rows, &pre_eof, run_length_vector, 0)); - //fprintf(stderr, "pre column batch_size: %ld, pre_read_rows %ld finished\n", batch_size, - // pre_read_rows); if (pre_read_rows == 0) { DCHECK_EQ(pre_eof, true); break; @@ -526,41 +521,31 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re } ColumnSelectVector& select_vector = *select_vector_ptr; - /*std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr; - if (_cached_filtered_rows != 0) { - _rebuild_select_vector(select_vector, rebuild_filter_map, pre_read_rows); - pre_read_rows += _cached_filtered_rows; - _cached_filtered_rows = 0; - }*/ // lazy read columns size_t lazy_read_rows; bool lazy_eof; - //fprintf(stderr, "lazy column pre_read_rows: %ld\n", 
pre_read_rows); RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows, &lazy_read_rows, &lazy_eof, select_vector, _cached_filtered_rows)); - //fprintf(stderr, "lazy column pre_read_rows: %ld finished\n", pre_read_rows); if (pre_read_rows != lazy_read_rows) { return Status::Corruption("Can't read the same number of rows when doing lazy read"); } if (_cached_filtered_rows != 0) { _cached_filtered_rows = 0; } + + RETURN_IF_ERROR( + _fill_partition_columns(block, lazy_read_rows, _lazy_read_ctx.partition_columns)); + RETURN_IF_ERROR(_fill_missing_columns(block, lazy_read_rows, _lazy_read_ctx.missing_columns)); // pre_eof ^ lazy_eof // we set pre_read_rows as batch_size for lazy read columns, so pre_eof != lazy_eof // filter data in predicate columns, and remove filter column if (select_vector.has_filter()) { - //if (block->columns() == origin_column_num) { - // the whole row group has been filtered by _lazy_read_ctx.vconjunct_ctx, and batch_eof is - // generated from next batch, so the filter column is removed ahead. 
- // DCHECK_EQ(block->rows(), 0); - //} else { RETURN_IF_CATCH_EXCEPTION( Block::filter_block_internal(block, columns_to_filter, result_filter)); Block::erase_useless_column(block, origin_column_num); - //} } else { Block::erase_useless_column(block, origin_column_num); } @@ -582,8 +567,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re *read_rows = column_size; *batch_eof = pre_eof; - RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns)); - RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns)); if (!_not_single_slot_filter_conjuncts.empty()) { RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org