IMPALA-5842: Write page index in Parquet files

This commit builds on the previous work of Pooja Nilangekar:
https://gerrit.cloudera.org/#/c/7464/
The commit implements the write path of PARQUET-922: "Add column indexes to parquet.thrift". As specified in the parquet-format, Impala writes the page indexes just before the footer. This allows much more efficient page filtering than using the same information from the 'statistics' field of DataPageHeader. I updated Pooja's python tests as well. Change-Id: Icbacf7fe3b7672e3ce719261ecef445b16f8dec9 Reviewed-on: http://gerrit.cloudera.org:8080/9693 Reviewed-by: Zoltan Borok-Nagy <borokna...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ccf19f9f Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ccf19f9f Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ccf19f9f Branch: refs/heads/master Commit: ccf19f9f8f2914639b6997849a56c13cfd2399b8 Parents: 05e0db3 Author: Zoltan Borok-Nagy <borokna...@cloudera.com> Authored: Mon Apr 9 16:10:00 2018 +0200 Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Committed: Thu May 17 20:22:02 2018 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-parquet-table-writer.cc | 227 +++++++++--- be/src/exec/hdfs-parquet-table-writer.h | 14 +- be/src/exec/parquet-column-stats.h | 51 ++- be/src/exec/parquet-column-stats.inline.h | 62 +++- be/src/util/CMakeLists.txt | 2 + be/src/util/string-util-test.cc | 84 +++++ be/src/util/string-util.cc | 57 +++ be/src/util/string-util.h | 42 +++ common/thrift/parquet.thrift | 85 +++++ testdata/bin/load-dependent-tables.sql | 35 +- .../queries/QueryTest/stats-extrapolation.test | 14 +- tests/query_test/test_chars.py | 33 -- tests/query_test/test_parquet_page_index.py | 365 +++++++++++++++++++ tests/util/get_parquet_metadata.py | 44 ++- 14 files changed, 1002 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/exec/hdfs-parquet-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc index 6370859..6277a99 100644 --- a/be/src/exec/hdfs-parquet-table-writer.cc +++ b/be/src/exec/hdfs-parquet-table-writer.cc @@ -36,6 +36,7 @@ #include "util/debug-util.h" #include "util/dict-encoding.h" #include "util/hdfs-util.h" +#include "util/string-util.h" #include "util/rle-encoding.h" #include <sstream> @@ -44,7 +45,6 @@ #include "common/names.h" using namespace impala; -using namespace parquet; using namespace apache::thrift; // Managing file sizes: We need to estimate how big the files being buffered @@ -102,7 +102,10 @@ class HdfsParquetTableWriter::BaseColumnWriter { def_levels_(nullptr), values_buffer_len_(DEFAULT_DATA_PAGE_SIZE), page_stats_base_(nullptr), - row_group_stats_base_(nullptr) { + row_group_stats_base_(nullptr), + table_sink_mem_tracker_(parent_->parent_->mem_tracker()) { + static_assert(std::is_same<decltype(parent_->parent_), HdfsTableSink*>::value, + "'table_sink_mem_tracker_' must point to the mem tracker of an HdfsTableSink"); def_levels_ = parent_->state_->obj_pool()->Add( new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE), DEFAULT_DATA_PAGE_SIZE, 1)); @@ -145,7 +148,7 @@ class HdfsParquetTableWriter::BaseColumnWriter { // Encodes the row group statistics into a parquet::Statistics object and attaches it to // 'meta_data'. 
- void EncodeRowGroupStats(ColumnMetaData* meta_data) { + void EncodeRowGroupStats(parquet::ColumnMetaData* meta_data) { DCHECK(row_group_stats_base_ != nullptr); if (row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) { row_group_stats_base_->EncodeToThrift(&meta_data->statistics); @@ -162,13 +165,21 @@ class HdfsParquetTableWriter::BaseColumnWriter { current_page_ = nullptr; num_values_ = 0; total_compressed_byte_size_ = 0; - current_encoding_ = Encoding::PLAIN; - next_page_encoding_ = Encoding::PLAIN; + current_encoding_ = parquet::Encoding::PLAIN; + next_page_encoding_ = parquet::Encoding::PLAIN; column_encodings_.clear(); dict_encoding_stats_.clear(); data_encoding_stats_.clear(); // Repetition/definition level encodings are constant. Incorporate them here. - column_encodings_.insert(Encoding::RLE); + column_encodings_.insert(parquet::Encoding::RLE); + offset_index_.page_locations.clear(); + column_index_.null_pages.clear(); + column_index_.min_values.clear(); + column_index_.max_values.clear(); + table_sink_mem_tracker_->Release(page_index_memory_consumption_); + page_index_memory_consumption_ = 0; + column_index_.null_counts.clear(); + valid_column_index_ = true; } // Close this writer. This is only called after Flush() and no more rows will @@ -176,6 +187,9 @@ class HdfsParquetTableWriter::BaseColumnWriter { void Close() { if (compressor_.get() != nullptr) compressor_->Close(); if (dict_encoder_base_ != nullptr) dict_encoder_base_->Close(); + // We must release the memory consumption of this column writer. + table_sink_mem_tracker_->Release(page_index_memory_consumption_); + page_index_memory_consumption_ = 0; } const ColumnType& type() const { return expr_eval_->root().type(); } @@ -211,7 +225,7 @@ class HdfsParquetTableWriter::BaseColumnWriter { struct DataPage { // Page header. This is a union of all page types. - PageHeader header; + parquet::PageHeader header; // Number of bytes needed to store definition levels. int num_def_bytes; @@ -259,21 +273,21 @@ class HdfsParquetTableWriter::BaseColumnWriter { int64_t total_compressed_byte_size_; int64_t total_uncompressed_byte_size_; // Encoding of the current page. - Encoding::type current_encoding_; + parquet::Encoding::type current_encoding_; // Encoding to use for the next page. By default, the same as 'current_encoding_'. // Used by the column writer to switch encoding while writing a column, e.g. if the // dictionary overflows. - Encoding::type next_page_encoding_; + parquet::Encoding::type next_page_encoding_; // Set of all encodings used in the column chunk - unordered_set<Encoding::type> column_encodings_; + unordered_set<parquet::Encoding::type> column_encodings_; // Map from the encoding to the number of pages in the column chunk with this encoding // These are used to construct the PageEncodingStats, which provide information // about encoding usage for each different page type. Currently, only dictionary // and data pages are used. - unordered_map<Encoding::type, int> dict_encoding_stats_; - unordered_map<Encoding::type, int> data_encoding_stats_; + unordered_map<parquet::Encoding::type, int> dict_encoding_stats_; + unordered_map<parquet::Encoding::type, int> data_encoding_stats_; // Created, owned, and set by the derived class. DictEncoderBase* dict_encoder_base_; @@ -292,6 +306,22 @@ class HdfsParquetTableWriter::BaseColumnWriter { // Pointers to statistics, created, owned, and set by the derived class. 
ColumnStatsBase* page_stats_base_; ColumnStatsBase* row_group_stats_base_; + + // OffsetIndex stores the locations of the pages. + parquet::OffsetIndex offset_index_; + + // ColumnIndex stores the statistics of the pages. + parquet::ColumnIndex column_index_; + + // Pointer to the HdfsTableSink's MemTracker. + MemTracker* table_sink_mem_tracker_; + + // Memory consumption of the min/max values in the page index. + int64_t page_index_memory_consumption_ = 0; + + // Only write ColumnIndex when 'valid_column_index_' is true. We always need to write + // the OffsetIndex though. + bool valid_column_index_ = true; }; // Per type column writer. @@ -312,8 +342,8 @@ class HdfsParquetTableWriter::ColumnWriter : BaseColumnWriter::Reset(); // Default to dictionary encoding. If the cardinality ends up being too high, // it will fall back to plain. - current_encoding_ = Encoding::PLAIN_DICTIONARY; - next_page_encoding_ = Encoding::PLAIN_DICTIONARY; + current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY; + next_page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY; dict_encoder_.reset( new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_, parent_->parent_->mem_tracker())); @@ -328,7 +358,7 @@ class HdfsParquetTableWriter::ColumnWriter : protected: virtual bool ProcessValue(void* value, int64_t* bytes_needed) { - if (current_encoding_ == Encoding::PLAIN_DICTIONARY) { + if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) { if (UNLIKELY(num_values_since_dict_size_check_ >= DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) { num_values_since_dict_size_check_ = 0; @@ -339,11 +369,11 @@ class HdfsParquetTableWriter::ColumnWriter : // If the dictionary contains the maximum number of values, switch to plain // encoding for the next page. The current page is full and must be written out. if (UNLIKELY(*bytes_needed < 0)) { - next_page_encoding_ = Encoding::PLAIN; + next_page_encoding_ = parquet::Encoding::PLAIN; return false; } parent_->file_size_estimate_ += *bytes_needed; - } else if (current_encoding_ == Encoding::PLAIN) { + } else if (current_encoding_ == parquet::Encoding::PLAIN) { T* v = CastValue(value); *bytes_needed = plain_encoded_value_size_ < 0 ? ParquetPlainEncoder::ByteSize<T>(*v) : @@ -386,8 +416,7 @@ class HdfsParquetTableWriter::ColumnWriter : // Temporary string value to hold CHAR(N) StringValue temp_; - // Tracks statistics per page. These are not written out currently but are merged into - // the row group stats. TODO(IMPALA-5841): Write these to the page index. + // Tracks statistics per page. These are written out to the page index. scoped_ptr<ColumnStats<T>> page_stats_; // Tracks statistics per row group. This gets reset when starting a new row group. @@ -424,7 +453,7 @@ class HdfsParquetTableWriter::BoolColumnWriter : new BitWriter(values_buffer_, values_buffer_len_)); // Dictionary encoding doesn't make sense for bools and is not allowed by // the format. - current_encoding_ = Encoding::PLAIN; + current_encoding_ = parquet::Encoding::PLAIN; dict_encoder_base_ = nullptr; page_stats_base_ = &page_stats_; @@ -455,8 +484,7 @@ class HdfsParquetTableWriter::BoolColumnWriter : // Used to encode bools as single bit values. This is reused across pages. BitWriter* bool_values_; - // Tracks statistics per page. These are not written out currently but are merged into - // the row group stats. TODO(IMPALA-5841): Write these to the page index. + // Tracks statistics per page. These are written out to the page index. 
ColumnStats<bool> page_stats_; // Tracks statistics per row group. This gets reset when starting a new file. @@ -559,13 +587,13 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, if (dict_encoder_base_ != nullptr) { *first_dictionary_page = *file_pos; // Write dictionary page header - DictionaryPageHeader dict_header; + parquet::DictionaryPageHeader dict_header; dict_header.num_values = dict_encoder_base_->num_entries(); - dict_header.encoding = Encoding::PLAIN_DICTIONARY; + dict_header.encoding = parquet::Encoding::PLAIN_DICTIONARY; ++dict_encoding_stats_[dict_header.encoding]; - PageHeader header; - header.type = PageType::DICTIONARY_PAGE; + parquet::PageHeader header; + header.type = parquet::PageType::DICTIONARY_PAGE; header.uncompressed_page_size = dict_encoder_base_->dict_encoded_size(); header.__set_dictionary_page_header(dict_header); @@ -608,15 +636,26 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, } *first_data_page = *file_pos; + int64_t current_row_group_index = 0; + offset_index_.page_locations.resize(num_data_pages_); + // Write data pages for (int i = 0; i < num_data_pages_; ++i) { DataPage& page = pages_[i]; + parquet::PageLocation location; if (page.header.data_page_header.num_values == 0) { // Skip empty pages + location.offset = -1; + location.compressed_page_size = 0; + location.first_row_index = -1; + offset_index_.page_locations[i] = location; continue; } + location.offset = *file_pos; + location.first_row_index = current_row_group_index; + // Write data page header uint8_t* buffer = nullptr; uint32_t len = 0; @@ -625,9 +664,17 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, RETURN_IF_ERROR(parent_->Write(buffer, len)); *file_pos += len; + // Note that the namings are confusing here: + // parquet::PageHeader::compressed_page_size is the compressed page size in bytes, as + // its name suggests. On the other hand, parquet::PageLocation::compressed_page_size + // also includes the size of the page header. + location.compressed_page_size = page.header.compressed_page_size + len; + offset_index_.page_locations[i] = location; + // Write the page data RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size)); *file_pos += page.header.compressed_page_size; + current_row_group_index += page.header.data_page_header.num_values; } return Status::OK(); } @@ -639,11 +686,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() { // If the entire page was NULL, encode it as PLAIN since there is no // data anyway. We don't output a useless dictionary page and it works // around a parquet MR bug (see IMPALA-759 for more details). 
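// --- Illustrative aside, not part of the patch: the relationship between the
// two 'compressed_page_size' fields mentioned above. parquet::PageHeader::
// compressed_page_size counts only the compressed page payload, while
// parquet::PageLocation::compressed_page_size must also cover the serialized
// page header, so a reader can fetch header plus payload in a single read.
// A minimal sketch, assuming a hypothetical helper and that 'header_len' is
// the byte length the thrift serializer reported for the page header:
parquet::PageLocation MakePageLocation(int64_t page_start_file_pos,
    uint32_t header_len, const parquet::PageHeader& header,
    int64_t rows_written_before_page) {
  parquet::PageLocation location;
  location.offset = page_start_file_pos;  // Points at the page header.
  // Header bytes plus compressed payload bytes.
  location.compressed_page_size = header.compressed_page_size + header_len;
  // Row-group-relative index of the page's first row.
  location.first_row_index = rows_written_before_page;
  return location;
}
// --- End aside.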
- if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN; + if (current_page_->num_non_null == 0) current_encoding_ = parquet::Encoding::PLAIN; - if (current_encoding_ == Encoding::PLAIN_DICTIONARY) WriteDictDataPage(); + if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) WriteDictDataPage(); - PageHeader& header = current_page_->header; + parquet::PageHeader& header = current_page_->header; header.data_page_header.encoding = current_encoding_; // Accumulate encoding statistics @@ -698,9 +745,41 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() { max_compressed_size - header.compressed_page_size); } + DCHECK(page_stats_base_ != nullptr); + parquet::Statistics page_stats; + page_stats_base_->EncodeToThrift(&page_stats); + { + // If pages_stats contains min_value and max_value, then append them to min_values_ + // and max_values_ and also mark the page as not null. In case min and max values are + // not set, push empty strings to maintain the consistency of the index and mark the + // page as null. Always push the null_count. + string min_val; + string max_val; + if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) { + Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH, + &min_val); + Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH, + &max_val); + if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false; + column_index_.null_pages.push_back(false); + } else { + DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value); + column_index_.null_pages.push_back(true); + DCHECK_EQ(page_stats.null_count, num_values_); + } + int64_t new_memory_allocation = min_val.capacity() + max_val.capacity(); + if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) { + return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_, + "Failed to allocate memory for Parquet page index.", new_memory_allocation); + } + page_index_memory_consumption_ += new_memory_allocation; + column_index_.min_values.emplace_back(std::move(min_val)); + column_index_.max_values.emplace_back(std::move(max_val)); + column_index_.null_counts.push_back(page_stats.null_count); + } + // Update row group statistics from page statistics. DCHECK(row_group_stats_base_ != nullptr); - DCHECK(page_stats_base_ != nullptr); row_group_stats_base_->Merge(*page_stats_base_); // Add the size of the data page header @@ -728,13 +807,13 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() { pages_.push_back(DataPage()); current_page_ = &pages_[num_data_pages_++]; - DataPageHeader header; + parquet::DataPageHeader header; header.num_values = 0; // The code that populates the column chunk metadata's encodings field // relies on these specific values for the definition/repetition level // encodings. 
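// --- Illustrative aside, not part of the patch: the per-page bookkeeping in
// FinalizeCurrentPage() above, in compact form. For every finalized page
// exactly one entry is appended to null_pages, min_values and max_values
// (empty strings for all-null pages), so the three lists stay index-aligned
// with the OffsetIndex page locations. A hedged sketch (hypothetical helper;
// needs gen-cpp/parquet_types.h and util/string-util.h; memory accounting
// omitted):
void AppendPageToColumnIndex(const parquet::Statistics& page_stats,
    int32_t max_len, parquet::ColumnIndex* index, bool* valid_index) {
  std::string min_val;
  std::string max_val;
  if (page_stats.__isset.min_value && page_stats.__isset.max_value) {
    // Truncation keeps 'min' a lower bound and 'max' an upper bound.
    if (!TruncateDown(page_stats.min_value, max_len, &min_val).ok() ||
        !TruncateUp(page_stats.max_value, max_len, &max_val).ok()) {
      *valid_index = false;  // ColumnIndex is skipped; OffsetIndex still written.
    }
    index->null_pages.push_back(false);
  } else {
    // All-null page: keep empty strings so all lists have the same length.
    index->null_pages.push_back(true);
  }
  index->min_values.emplace_back(std::move(min_val));
  index->max_values.emplace_back(std::move(max_val));
  index->null_counts.push_back(page_stats.null_count);
}
// --- End aside.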
- header.definition_level_encoding = Encoding::RLE; - header.repetition_level_encoding = Encoding::RLE; + header.definition_level_encoding = parquet::Encoding::RLE; + header.repetition_level_encoding = parquet::Encoding::RLE; current_page_->header.__set_data_page_header(header); } current_encoding_ = next_page_encoding_; @@ -861,14 +940,14 @@ Status HdfsParquetTableWriter::CreateSchema() { const ColumnType& type = output_expr_evals_[i]->root().type(); node.name = table_desc_->col_descs()[i + num_clustering_cols].name(); node.__set_type(ConvertInternalToParquetType(type.type)); - node.__set_repetition_type(FieldRepetitionType::OPTIONAL); + node.__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL); if (type.type == TYPE_DECIMAL) { // This column is type decimal. Update the file metadata to include the // additional fields: // 1) converted_type: indicate this is really a decimal column. // 2) type_length: the number of bytes used per decimal value in the data // 3) precision/scale - node.__set_converted_type(ConvertedType::DECIMAL); + node.__set_converted_type(parquet::ConvertedType::DECIMAL); node.__set_type_length( ParquetPlainEncoder::DecimalSize(output_expr_evals_[i]->root().type())); node.__set_scale(output_expr_evals_[i]->root().type().scale); @@ -876,15 +955,15 @@ Status HdfsParquetTableWriter::CreateSchema() { } else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR || (type.type == TYPE_STRING && state_->query_options().parquet_annotate_strings_utf8)) { - node.__set_converted_type(ConvertedType::UTF8); + node.__set_converted_type(parquet::ConvertedType::UTF8); } else if (type.type == TYPE_TINYINT) { - node.__set_converted_type(ConvertedType::INT_8); + node.__set_converted_type(parquet::ConvertedType::INT_8); } else if (type.type == TYPE_SMALLINT) { - node.__set_converted_type(ConvertedType::INT_16); + node.__set_converted_type(parquet::ConvertedType::INT_16); } else if (type.type == TYPE_INT) { - node.__set_converted_type(ConvertedType::INT_32); + node.__set_converted_type(parquet::ConvertedType::INT_32); } else if (type.type == TYPE_BIGINT) { - node.__set_converted_type(ConvertedType::INT_64); + node.__set_converted_type(parquet::ConvertedType::INT_64); } } @@ -893,14 +972,14 @@ Status HdfsParquetTableWriter::CreateSchema() { Status HdfsParquetTableWriter::AddRowGroup() { if (current_row_group_ != nullptr) RETURN_IF_ERROR(FlushCurrentRowGroup()); - file_metadata_.row_groups.push_back(RowGroup()); + file_metadata_.row_groups.push_back(parquet::RowGroup()); current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1]; // Initialize new row group metadata. int num_clustering_cols = table_desc_->num_clustering_cols(); current_row_group_->columns.resize(columns_.size()); for (int i = 0; i < columns_.size(); ++i) { - ColumnMetaData metadata; + parquet::ColumnMetaData metadata; metadata.type = ConvertInternalToParquetType(columns_[i]->type().type); metadata.path_in_schema.push_back( table_desc_->col_descs()[i + num_clustering_cols].name()); @@ -1029,12 +1108,13 @@ Status HdfsParquetTableWriter::Finalize() { file_metadata_.num_rows = row_count_; // Set the ordering used to write parquet statistics for columns in the file. 
- ColumnOrder col_order = ColumnOrder(); - col_order.__set_TYPE_ORDER(TypeDefinedOrder()); + parquet::ColumnOrder col_order = parquet::ColumnOrder(); + col_order.__set_TYPE_ORDER(parquet::TypeDefinedOrder()); file_metadata_.column_orders.assign(columns_.size(), col_order); file_metadata_.__isset.column_orders = true; RETURN_IF_ERROR(FlushCurrentRowGroup()); + RETURN_IF_ERROR(WritePageIndex()); RETURN_IF_ERROR(WriteFileFooter()); stats_.__set_parquet_stats(parquet_insert_stats_); COUNTER_ADD(parent_->rows_inserted_counter(), row_count_); @@ -1069,8 +1149,8 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() { RETURN_IF_ERROR(columns_[i]->Flush(&file_pos_, &data_page_offset, &dict_page_offset)); DCHECK_GT(data_page_offset, 0); - ColumnChunk& col_chunk = current_row_group_->columns[i]; - ColumnMetaData& col_metadata = col_chunk.meta_data; + parquet::ColumnChunk& col_chunk = current_row_group_->columns[i]; + parquet::ColumnMetaData& col_metadata = col_chunk.meta_data; col_metadata.data_page_offset = data_page_offset; if (dict_page_offset >= 0) { col_metadata.__set_dictionary_page_offset(dict_page_offset); @@ -1089,23 +1169,23 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() { // Write encodings and encoding stats for this column col_metadata.encodings.clear(); - for (Encoding::type encoding : col_writer->column_encodings_) { + for (parquet::Encoding::type encoding : col_writer->column_encodings_) { col_metadata.encodings.push_back(encoding); } - vector<PageEncodingStats> encoding_stats; + vector<parquet::PageEncodingStats> encoding_stats; // Add dictionary page encoding stats for (const auto& entry: col_writer->dict_encoding_stats_) { - PageEncodingStats dict_enc_stat; - dict_enc_stat.page_type = PageType::DICTIONARY_PAGE; + parquet::PageEncodingStats dict_enc_stat; + dict_enc_stat.page_type = parquet::PageType::DICTIONARY_PAGE; dict_enc_stat.encoding = entry.first; dict_enc_stat.count = entry.second; encoding_stats.push_back(dict_enc_stat); } // Add data page encoding stats for (const auto& entry: col_writer->data_encoding_stats_) { - PageEncodingStats data_enc_stat; - data_enc_stat.page_type = PageType::DATA_PAGE; + parquet::PageEncodingStats data_enc_stat; + data_enc_stat.page_type = parquet::PageType::DATA_PAGE; data_enc_stat.encoding = entry.first; data_enc_stat.count = entry.second; encoding_stats.push_back(data_enc_stat); @@ -1129,8 +1209,6 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() { thrift_serializer_->Serialize(¤t_row_group_->columns[i], &len, &buffer)); RETURN_IF_ERROR(Write(buffer, len)); file_pos_ += len; - - col_writer->Reset(); } // Populate RowGroup::sorting_columns with all columns specified by the Frontend. @@ -1148,6 +1226,47 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() { return Status::OK(); } +Status HdfsParquetTableWriter::WritePageIndex() { + // Currently Impala only write Parquet files with a single row group. The current + // page index logic depends on this behavior as it only keeps one row group's + // statistics in memory. + DCHECK_EQ(file_metadata_.row_groups.size(), 1); + + parquet::RowGroup* row_group = &(file_metadata_.row_groups[0]); + // Write out the column indexes. + for (int i = 0; i < columns_.size(); ++i) { + auto& column = *columns_[i]; + if (!column.valid_column_index_) continue; + column.column_index_.__set_boundary_order( + column.row_group_stats_base_->GetBoundaryOrder()); + // We always set null_counts. 
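// --- Illustrative aside, not part of the patch: the resulting file layout.
// As specified by PARQUET-922, WritePageIndex() places the index structures
// after the row group data and before the footer, and each ColumnChunk
// records where its two index blobs live:
//
//   [row group 0 pages]
//   [ColumnIndex col 0] ... [ColumnIndex col N]
//   [OffsetIndex col 0] ... [OffsetIndex col N]
//   [FileMetaData] [footer length] ["PAR1"]
//
// A reader therefore seeks to ColumnChunk::column_index_offset /
// offset_index_offset and thrift-deserializes column_index_length /
// offset_index_length bytes, filtering pages without touching page data.
// --- End aside.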
+ column.column_index_.__isset.null_counts = true; + uint8_t* buffer = nullptr; + uint32_t len = 0; + RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.column_index_, &len, &buffer)); + RETURN_IF_ERROR(Write(buffer, len)); + // Update the column_index_offset and column_index_length of the ColumnChunk + row_group->columns[i].__set_column_index_offset(file_pos_); + row_group->columns[i].__set_column_index_length(len); + file_pos_ += len; + } + // Write out the offset indexes. + for (int i = 0; i < columns_.size(); ++i) { + auto& column = *columns_[i]; + uint8_t* buffer = nullptr; + uint32_t len = 0; + RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.offset_index_, &len, &buffer)); + RETURN_IF_ERROR(Write(buffer, len)); + // Update the offset_index_offset and offset_index_length of the ColumnChunk + row_group->columns[i].__set_offset_index_offset(file_pos_); + row_group->columns[i].__set_offset_index_length(len); + file_pos_ += len; + } + // Reset column writers. + for (auto& column : columns_) column->Reset(); + return Status::OK(); +} + Status HdfsParquetTableWriter::WriteFileFooter() { // Write file_meta_data uint32_t file_metadata_len = 0; http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/exec/hdfs-parquet-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h index 1334b19..fd77bf7 100644 --- a/be/src/exec/hdfs-parquet-table-writer.h +++ b/be/src/exec/hdfs-parquet-table-writer.h @@ -103,6 +103,12 @@ class HdfsParquetTableWriter : public HdfsTableWriter { /// as 'parquet-mr'. static const int MAX_COLUMN_STATS_SIZE = 4 * 1024; + /// In parquet::ColumnIndex we store the min and max values for each page. + /// However, we don't want to store very long strings, so we truncate them. + /// The value of it must not be too small, since we don't want to truncate + /// non-string values. + static const int PAGE_INDEX_MAX_STRING_LENGTH = 64; + /// Per-column information state. This contains some metadata as well as the /// data buffers. class BaseColumnWriter; @@ -120,10 +126,14 @@ class HdfsParquetTableWriter : public HdfsTableWriter { /// table_desc_ into the format in the file metadata Status CreateSchema(); - /// Write the file header information to the output file. + /// Writes the file header information to the output file. Status WriteFileHeader(); - /// Write the file metadata and footer. + /// Writes the column index and offset index of each page in the file. + /// It also resets the column writers. + Status WritePageIndex(); + + /// Writes the file metadata and footer. Status WriteFileFooter(); /// Flushes the current row group to file. 
This will compute the final http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/exec/parquet-column-stats.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h index 44d4a65..6d2743a 100644 --- a/be/src/exec/parquet-column-stats.h +++ b/be/src/exec/parquet-column-stats.h @@ -26,6 +26,8 @@ #include "runtime/timestamp-value.h" #include "runtime/types.h" +#include "gen-cpp/parquet_types.h" + namespace impala { /// This class, together with its derivatives, is used to update column statistics when @@ -67,6 +69,11 @@ class ColumnStatsBase { struct MinMaxTrait { static decltype(auto) MinValue(const T& a, const T& b) { return std::min(a, b); } static decltype(auto) MaxValue(const T& a, const T& b) { return std::max(a, b); } + static int Compare(const T& a, const T& b) { + if (a < b) return -1; + if (a > b) return 1; + return 0; + } }; /// min and max functions for floating point types @@ -74,6 +81,13 @@ class ColumnStatsBase { struct MinMaxTrait<T, std::enable_if_t<std::is_floating_point<T>::value>> { static decltype(auto) MinValue(const T& a, const T& b) { return std::fmin(a, b); } static decltype(auto) MaxValue(const T& a, const T& b) { return std::fmax(a, b); } + static int Compare(const T& a, const T& b) { + //TODO: Should be aligned with PARQUET-1222, once resolved + if (a == b) return 0; + if (std::isnan(a) && std::isnan(b)) return 0; + if (MaxValue(a, b) == a) return 1; + return -1; + } }; ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {} @@ -94,7 +108,8 @@ class ColumnStatsBase { int64_t* null_count); /// Merges this statistics object with values from 'other'. If other has not been - /// initialized, then this object will not be changed. + /// initialized, then this object will not be changed. It maintains internal state that + /// tracks whether the min/max values are ordered. virtual void Merge(const ColumnStatsBase& other) = 0; /// Copies the contents of this object's statistics values to internal buffers. Some @@ -119,6 +134,18 @@ class ColumnStatsBase { /// value is appended to the column or the statistics are merged. void IncrementNullCount(int64_t count) { null_count_ += count; } + /// Returns the boundary order of the pages. That is, whether the lists of min/max + /// elements inside the ColumnIndex are ordered and if so, in which direction. + /// If both 'ascending_boundary_order_' and 'descending_boundary_order_' is true, + /// it means all elements are equal, we choose ascending order in this case. + /// If only one flag is true, or both of them is false, then we return the identified + /// ordering, or unordered. + parquet::BoundaryOrder::type GetBoundaryOrder() const { + if (ascending_boundary_order_) return parquet::BoundaryOrder::ASCENDING; + if (descending_boundary_order_) return parquet::BoundaryOrder::DESCENDING; + return parquet::BoundaryOrder::UNORDERED; + } + protected: // Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'. // 'buffer' is reset before making the copy. @@ -130,6 +157,16 @@ class ColumnStatsBase { // Number of null values since the last call to Reset(). int64_t null_count_; + // If true, min/max values are ascending. + // We assume the values are ascending, so start with true and only make it false when + // we find a descending value. If not all values are equal, then at least one of + // 'ascending_boundary_order_' and 'descending_boundary_order_' will be false. 
+ bool ascending_boundary_order_ = true; + + // If true, min/max values are descending. + // See description of 'ascending_boundary_order_'. + bool descending_boundary_order_ = true; + private: /// Returns true if we support reading statistics stored in the fields 'min_value' and /// 'max_value' in parquet::Statistics for the type 'col_type' and the column order @@ -174,7 +211,9 @@ class ColumnStats : public ColumnStatsBase { plain_encoded_value_size_(plain_encoded_value_size), mem_pool_(mem_pool), min_buffer_(mem_pool), - max_buffer_(mem_pool) {} + max_buffer_(mem_pool), + prev_page_min_buffer_(mem_pool), + prev_page_max_buffer_(mem_pool) {} /// Updates the statistics based on the values min_value and max_value. If necessary, /// initializes the statistics. It may keep a reference to either value until @@ -216,12 +255,20 @@ class ColumnStats : public ColumnStatsBase { // Maximum value since the last call to Reset(). T max_value_; + // Minimum value of the previous page. Need to store that to calculate boundary order. + T prev_page_min_value_; + + // Maximum value of the previous page. Need to store that to calculate boundary order. + T prev_page_max_value_; + // Memory pool to allocate from when making copies of the statistics data. MemPool* mem_pool_; // Local buffers to copy statistics data into. StringBuffer min_buffer_; StringBuffer max_buffer_; + StringBuffer prev_page_min_buffer_; + StringBuffer prev_page_max_buffer_; }; } // end ns impala http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/exec/parquet-column-stats.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h index 0b618f9..094fadd 100644 --- a/be/src/exec/parquet-column-stats.inline.h +++ b/be/src/exec/parquet-column-stats.inline.h @@ -28,6 +28,8 @@ namespace impala { inline void ColumnStatsBase::Reset() { has_min_max_values_ = false; null_count_ = 0; + ascending_boundary_order_ = true; + descending_boundary_order_ = true; } template <typename T> @@ -46,7 +48,25 @@ template <typename T> inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) { DCHECK(dynamic_cast<const ColumnStats<T>*>(&other)); const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other); - if (cs->has_min_max_values_) Update(cs->min_value_, cs->max_value_); + if (cs->has_min_max_values_) { + if (has_min_max_values_) { + if (ascending_boundary_order_) { + if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) > 0 || + MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) > 0) { + ascending_boundary_order_ = false; + } + } + if (descending_boundary_order_) { + if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) < 0 || + MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) < 0) { + descending_boundary_order_ = false; + } + } + } + Update(cs->min_value_, cs->max_value_); + prev_page_min_value_ = cs->min_value_; + prev_page_max_value_ = cs->max_value_; + } IncrementNullCount(cs->null_count_); } @@ -176,12 +196,52 @@ inline void ColumnStats<StringValue>::Update( } } +template <> +inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) { + DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other)); + const ColumnStats<StringValue>* cs = static_cast< + const ColumnStats<StringValue>*>(&other); + if (cs->has_min_max_values_) { + if (has_min_max_values_) { + // Make sure that we copied the previous page's min/max values to their own buffer. 
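// --- Illustrative aside, not part of the patch: how the two flags decay into
// a parquet::BoundaryOrder. Both start true; each merged page can only clear
// them, so after the last page the result is ASCENDING, DESCENDING, or
// UNORDERED. E.g. for per-page [min, max] pairs (hypothetical values):
//
//   pages [1,3] [2,5] [4,8]  ->  ascending flag survives   ->  ASCENDING
//   pages [1,3] [2,5] [0,4]  ->  both flags cleared        ->  UNORDERED
//   pages [5,5] [5,5]        ->  both flags survive        ->  ASCENDING (ties)
// --- End aside.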
+ DCHECK_NE(static_cast<void*>(prev_page_min_value_.ptr), + static_cast<void*>(cs->min_value_.ptr)); + DCHECK_NE(static_cast<void*>(prev_page_max_value_.ptr), + static_cast<void*>(cs->max_value_.ptr)); + if (ascending_boundary_order_) { + if (prev_page_max_value_ > cs->max_value_ || + prev_page_min_value_ > cs->min_value_) { + ascending_boundary_order_ = false; + } + } + if (descending_boundary_order_) { + if (prev_page_max_value_ < cs->max_value_ || + prev_page_min_value_ < cs->min_value_) { + descending_boundary_order_ = false; + } + } + } + Update(cs->min_value_, cs->max_value_); + prev_page_min_value_ = cs->min_value_; + prev_page_max_value_ = cs->max_value_; + prev_page_min_buffer_.Clear(); + prev_page_max_buffer_.Clear(); + } + IncrementNullCount(cs->null_count_); +} + // StringValues need to be copied at the end of processing a row batch, since the batch // memory will be released. template <> inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() { if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_)); if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_)); + if (prev_page_min_buffer_.IsEmpty()) { + RETURN_IF_ERROR(CopyToBuffer(&prev_page_min_buffer_, &prev_page_min_value_)); + } + if (prev_page_max_buffer_.IsEmpty()) { + RETURN_IF_ERROR(CopyToBuffer(&prev_page_max_buffer_, &prev_page_max_value_)); + } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 546c1c0..d9092ce 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -74,6 +74,7 @@ add_library(Util runtime-profile.cc simple-logger.cc string-parser.cc + string-util.cc symbols-util.cc static-asserts.cc summary-util.cc @@ -135,6 +136,7 @@ ADD_BE_TEST(redactor-unconfigured-test) ADD_BE_TEST(rle-test) ADD_BE_TEST(runtime-profile-test) ADD_BE_TEST(string-parser-test) +ADD_BE_TEST(string-util-test) ADD_BE_TEST(symbols-util-test) ADD_BE_TEST(sys-info-test) ADD_BE_TEST(thread-pool-test) http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/util/string-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/string-util-test.cc b/be/src/util/string-util-test.cc new file mode 100644 index 0000000..979eb9f --- /dev/null +++ b/be/src/util/string-util-test.cc @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "testutil/gtest-util.h" + +#include "util/string-util.h" +#include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" + +#include "common/names.h" + +namespace impala { + +enum Truncation { + DOWN, + UP +}; + +void EvalTruncation(const string& original, const string& expected_result, + int32_t max_length, Truncation boundary) { + string result; + if (boundary == DOWN) { + ASSERT_OK(TruncateDown(original, max_length, &result)); + } else { + ASSERT_OK(TruncateUp(original, max_length, &result)); + } + EXPECT_EQ(expected_result, result); +} + +TEST(TruncateDownTest, Basic) { + EvalTruncation("0123456789", "0123456789", 100, DOWN); + EvalTruncation("0123456789", "0123456789", 10, DOWN); + EvalTruncation("0123456789", "01234", 5, DOWN); + EvalTruncation("0123456789", "", 0, DOWN); + EvalTruncation("", "", 10, DOWN); + EvalTruncation(string("\0\0\0", 3), string("\0\0", 2), 2, DOWN); + EvalTruncation("asdfghjkl", "asdf", 4, DOWN); + char a[] = {'a', CHAR_MAX, CHAR_MIN, 'b', '\0'}; + char b[] = {'a', CHAR_MAX, '\0'}; + EvalTruncation(a, b, 2, DOWN); +} + +TEST(TruncateUpTest, Basic) { + EvalTruncation("0123456789", "0123456789", 100, UP); + EvalTruncation("abcdefghij", "abcdefghij", 10, UP); + EvalTruncation("abcdefghij", "abcdefghj", 9, UP); + EvalTruncation("abcdefghij", "abcdf", 5, UP); + + string max_string(100, 0xFF); + EvalTruncation(max_string, max_string, 100, UP); + + string normal_plus_max = "abcdef" + max_string; + EvalTruncation(normal_plus_max, normal_plus_max, 200, UP); + EvalTruncation(normal_plus_max, "abcdeg", 10, UP); + + string result; + Status s = TruncateUp(max_string, 10, &result); + EXPECT_EQ(s.GetDetail(), "TruncateUp() couldn't increase string.\n"); + + EvalTruncation("", "", 10, UP); + EvalTruncation(string("\0\0\0", 3), string("\0\001", 2), 2, UP); + EvalTruncation("asdfghjkl", "asdg", 4, UP); + char a[] = {0, (char)0x7F, (char)0xFF, 0}; + char b[] = {0, (char)0x80, 0}; + EvalTruncation(a, b, 2, UP); +} + +} + +IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/util/string-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/string-util.cc b/be/src/util/string-util.cc new file mode 100644 index 0000000..b6c8fb7 --- /dev/null +++ b/be/src/util/string-util.cc @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <algorithm> + +#include "gutil/strings/substitute.h" +#include "util/string-util.h" + +#include "common/names.h" + +namespace impala { + +Status TruncateDown(const string& str, int32_t max_length, string* result) { + DCHECK(result != nullptr); + *result = str.substr(0, std::min(static_cast<int32_t>(str.length()), max_length)); + return Status::OK(); +} + +Status TruncateUp(const string& str, int32_t max_length, string* result) { + DCHECK(result != nullptr); + if (str.length() <= max_length) { + *result = str; + return Status::OK(); + } + + *result = str.substr(0, max_length); + int i = max_length - 1; + while (i > 0 && static_cast<int32_t>((*result)[i]) == -1) { + (*result)[i] += 1; + --i; + } + // We convert it to unsigned because signed overflow results in undefined behavior. + unsigned char uch = static_cast<unsigned char>((*result)[i]); + uch += 1; + (*result)[i] = uch; + if (i == 0 && (*result)[i] == 0) { + return Status("TruncateUp() couldn't increase string."); + } + result->resize(i + 1); + return Status::OK(); +} + +} http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/util/string-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/string-util.h b/be/src/util/string-util.h new file mode 100644 index 0000000..7e7ab12 --- /dev/null +++ b/be/src/util/string-util.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_UTIL_STRING_UTIL_H +#define IMPALA_UTIL_STRING_UTIL_H + +#include <string> + +#include "common/status.h" + +namespace impala { + +/// 'str' holds the minimum value of some string set. We need to truncate it +/// if it is longer than 'max_length'. +WARN_UNUSED_RESULT +Status TruncateDown(const std::string& str, int32_t max_length, std::string* result); + +/// 'str' holds the maximum value of some string set. We want to truncate it +/// to only occupy 'max_length' bytes. We also want to guarantee that the truncated +/// value remains greater than all the strings in the original set, so we need +/// to increase it after truncation. E.g.: when 'max_length' == 3: AAAAAAA => AAB +/// Returns error if it cannot increase the string value, ie. all bytes are 0xFF. 
+WARN_UNUSED_RESULT +Status TruncateUp(const std::string& str, int32_t max_length, std::string* result); + +} + +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/common/thrift/parquet.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/parquet.thrift b/common/thrift/parquet.thrift index c4afb77..3666a43 100644 --- a/common/thrift/parquet.thrift +++ b/common/thrift/parquet.thrift @@ -362,6 +362,16 @@ enum PageType { DATA_PAGE_V2 = 3; } +/** + * Enum to annotate whether lists of min/max elements inside ColumnIndex + * are ordered and if so, in which direction. + */ +enum BoundaryOrder { + UNORDERED = 0; + ASCENDING = 1; + DESCENDING = 2; +} + /** Data page header */ struct DataPageHeader { /** Number of values, including NULLs, in this data page. **/ @@ -551,6 +561,18 @@ struct ColumnChunk { * metadata. **/ 3: optional ColumnMetaData meta_data + + /** File offset of ColumnChunk's OffsetIndex **/ + 4: optional i64 offset_index_offset + + /** Size of ColumnChunk's OffsetIndex, in bytes **/ + 5: optional i32 offset_index_length + + /** File offset of ColumnChunk's ColumnIndex **/ + 6: optional i64 column_index_offset + + /** Size of ColumnChunk's ColumnIndex, in bytes **/ + 7: optional i32 column_index_length } struct RowGroup { @@ -588,6 +610,69 @@ union ColumnOrder { 1: TypeDefinedOrder TYPE_ORDER; } +struct PageLocation { + /** Offset of the page in the file **/ + 1: required i64 offset + + /** + * Size of the page, including header. Sum of compressed_page_size and header + * length + */ + 2: required i32 compressed_page_size + + /** + * Index within the RowGroup of the first row of the page; this means pages + * change on record boundaries (r = 0). + */ + 3: required i64 first_row_index +} + +struct OffsetIndex { + /** + * PageLocations, ordered by increasing PageLocation.offset. It is required + * that page_locations[i].first_row_index < page_locations[i+1].first_row_index. + */ + 1: required list<PageLocation> page_locations +} + +/** + * Description for ColumnIndex. + * Each <array-field>[i] refers to the page at OffsetIndex.page_locations[i] + */ +struct ColumnIndex { + /** + * A list of Boolean values to determine the validity of the corresponding + * min and max values. If true, a page contains only null values, and writers + * have to set the corresponding entries in min_values and max_values to + * byte[0], so that all lists have the same length. If false, the + * corresponding entries in min_values and max_values must be valid. + */ + 1: required list<bool> null_pages + + /** + * Two lists containing lower and upper bounds for the values of each page. + * These may be the actual minimum and maximum values found on a page, but + * can also be (more compact) values that do not exist on a page. For + * example, instead of storing ""Blart Versenwald III", a writer may set + * min_values[i]="B", max_values[i]="C". Such more compact values must still + * be valid values within the column's logical type. Readers must make sure + * that list entries are populated before using them by inspecting null_pages. + */ + 2: required list<binary> min_values + 3: required list<binary> max_values + + /** + * Stores whether both min_values and max_values are orderd and if so, in + * which direction. This allows readers to perform binary searches in both + * lists. Readers cannot assume that max_values[i] <= min_values[i+1], even + * if the lists are ordered. 
+ */ + 4: required BoundaryOrder boundary_order + + /** A list containing the number of null values for each page **/ + 5: optional list<i64> null_counts +} + /** * Description for file metadata */ http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/testdata/bin/load-dependent-tables.sql ---------------------------------------------------------------------- diff --git a/testdata/bin/load-dependent-tables.sql b/testdata/bin/load-dependent-tables.sql index 7b25375..9de462f 100644 --- a/testdata/bin/load-dependent-tables.sql +++ b/testdata/bin/load-dependent-tables.sql @@ -66,6 +66,38 @@ ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=2) ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=3) SET FILEFORMAT RCFILE; +DROP TABLE IF EXISTS functional_parquet.chars_formats; +CREATE EXTERNAL TABLE functional_parquet.chars_formats +(cs CHAR(5), cl CHAR(140), vc VARCHAR(32)) +STORED AS PARQUET +LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_parquet'; + +DROP TABLE IF EXISTS functional_orc_def.chars_formats; +CREATE EXTERNAL TABLE functional_orc_def.chars_formats +(cs CHAR(5), cl CHAR(140), vc VARCHAR(32)) +STORED AS ORC +LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_orc_def'; + +DROP TABLE IF EXISTS functional.chars_formats; +CREATE EXTERNAL TABLE functional.chars_formats +(cs CHAR(5), cl CHAR(140), vc VARCHAR(32)) +ROW FORMAT delimited fields terminated by ',' escaped by '\\' +STORED AS TEXTFILE +LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_text'; + +DROP TABLE IF EXISTS functional_avro_snap.chars_formats; +CREATE EXTERNAL TABLE functional_avro_snap.chars_formats +(cs CHAR(5), cl CHAR(140), vc VARCHAR(32)) +STORED AS AVRO +LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_avro_snap' +TBLPROPERTIES ('avro.schema.literal'='{"type":"record", +"name":"CharTypesTest","doc":"Schema generated by Kite", +"fields":[ +{"name":"cs","type":["null","string"], "doc":"Type inferred"}, +{"name":"cl","type":["null","string"], "doc":"Type inferred"}, +{"name":"vc","type":["null","string"], "doc":"Type inferred"} +]}'); + ---- Unsupported Impala table types USE functional; CREATE VIEW IF NOT EXISTS hive_view AS SELECT 1 AS int_col FROM alltypes limit 1; @@ -74,4 +106,5 @@ USE functional; DROP INDEX IF EXISTS hive_index ON alltypes; CREATE INDEX hive_index ON TABLE alltypes (int_col) AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' -WITH DEFERRED REBUILD IN TABLE hive_index_tbl +WITH DEFERRED REBUILD IN TABLE hive_index_tbl; + http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test index ceaf1d0..55ec0a8 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test +++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test @@ -33,17 +33,17 @@ show table stats alltypes YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION ---- RESULTS '2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1' -'2009','2',-1,288,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2' -'2009','3',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3' +'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2' +'2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3' '2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4' -'2009','5',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5' +'2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5' '2009','6',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=6' -'2009','7',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7' -'2009','8',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8' +'2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7' +'2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8' '2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9' -'2009','10',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10' +'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10' '2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11' -'2009','12',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12' +'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12' 'Total','',3650,3650,12,regex:.*B,'0B','','','','' ---- TYPES STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/tests/query_test/test_chars.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_chars.py b/tests/query_test/test_chars.py index 4444410..86ec095 100644 --- a/tests/query_test/test_chars.py +++ b/tests/query_test/test_chars.py @@ -47,39 +47,6 @@ class TestCharFormats(ImpalaTestSuite): def get_workload(cls): return 'functional-query' - def setup_method(self, method): - self.__create_char_tables() - - def __create_char_tables(self): - self.client.execute('''create external table if not exists - functional_parquet.chars_formats - (cs CHAR(5), cl CHAR(140), vc VARCHAR(32)) - STORED AS PARQUET - LOCATION "{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_parquet"))) - self.client.execute('''create external table if not exists - functional_orc_def.chars_formats - (cs 
CHAR(5), cl CHAR(140), vc VARCHAR(32)) - STORED AS ORC - LOCATION "{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_orc_def"))) - self.client.execute('''create external table if not exists - functional.chars_formats - (cs CHAR(5), cl CHAR(140), vc VARCHAR(32)) - ROW FORMAT delimited fields terminated by ',' escaped by '\\\\' - STORED AS TEXTFILE - LOCATION "{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_text"))) - self.client.execute('''create external table if not exists - functional_avro_snap.chars_formats - (cs CHAR(5), cl CHAR(140), vc VARCHAR(32)) - STORED AS AVRO - LOCATION "{0}" - TBLPROPERTIES ('avro.schema.literal'='{{"type":"record", - "name":"CharTypesTest","doc":"Schema generated by Kite", - "fields":[ - {{"name":"cs","type":["null","string"], "doc":"Type inferred"}}, - {{"name":"cl","type":["null","string"], "doc":"Type inferred"}}, - {{"name":"vc","type":["null","string"],"doc":"Type inferred"}}]}}') - '''.format(get_fs_path("/test-warehouse/chars_formats_avro_snap"))) - @classmethod def add_test_dimensions(cls): super(TestCharFormats, cls).add_test_dimensions() http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/tests/query_test/test_parquet_page_index.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_parquet_page_index.py b/tests/query_test/test_parquet_page_index.py new file mode 100644 index 0000000..51632e5 --- /dev/null +++ b/tests/query_test/test_parquet_page_index.py @@ -0,0 +1,365 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Targeted Impala insert tests + +import os + +from collections import namedtuple +from subprocess import check_call +from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.util.filesystem_utils import get_fs_path +from tests.util.get_parquet_metadata import ( + decode_stats_value, + get_parquet_metadata, + read_serialized_object +) + +PAGE_INDEX_MAX_STRING_LENGTH = 64 + + +class TestHdfsParquetTableIndexWriter(ImpalaTestSuite): + """Since PARQUET-922 page statistics can be written before the footer. + The tests in this class checks if Impala writes the page indices correctly. + """ + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint( + lambda v: v.get_value('table_format').file_format == 'parquet') + + def _get_row_group_from_file(self, parquet_file): + """Returns namedtuples that contain the schema, stats, offset_index, column_index, + and page_headers for each column in the first row group in file 'parquet_file'. 
Fails + if the file contains multiple row groups. + """ + ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index', + 'column_index', 'page_headers']) + + file_meta_data = get_parquet_metadata(parquet_file) + assert len(file_meta_data.row_groups) == 1 + # We only support flat schemas, the additional element is the root element. + schemas = file_meta_data.schema[1:] + row_group = file_meta_data.row_groups[0] + assert len(schemas) == len(row_group.columns) + row_group_index = [] + with open(parquet_file) as file_handle: + for column, schema in zip(row_group.columns, schemas): + column_index_offset = column.column_index_offset + column_index_length = column.column_index_length + column_index = None + if column_index_offset and column_index_length: + column_index = read_serialized_object(ColumnIndex, file_handle, + column_index_offset, column_index_length) + column_meta_data = column.meta_data + stats = None + if column_meta_data: + stats = column_meta_data.statistics + + offset_index_offset = column.offset_index_offset + offset_index_length = column.offset_index_length + offset_index = None + page_headers = [] + if offset_index_offset and offset_index_length: + offset_index = read_serialized_object(OffsetIndex, file_handle, + offset_index_offset, offset_index_length) + for page_loc in offset_index.page_locations: + page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset, + page_loc.compressed_page_size) + page_headers.append(page_header) + + column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers) + row_group_index.append(column_info) + return row_group_index + + def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir): + """Returns a list of column infos (containing the schema, stats, offset_index, + column_index, and page_headers) for the first row group in all parquet files in + 'hdfs_path'. 
+ """ + row_group_indexes = [] + check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath]) + for root, subdirs, files in os.walk(tmpdir.strpath): + for f in files: + parquet_file = os.path.join(root, str(f)) + row_group_indexes.append(self._get_row_group_from_file(parquet_file)) + return row_group_indexes + + def _validate_page_locations(self, page_locations): + """Validate that the page locations are in order.""" + for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]): + assert previous_loc.offset < current_loc.offset + assert previous_loc.first_row_index < current_loc.first_row_index + + def _validate_null_stats(self, index_size, column_info): + """Validates the statistics stored in null_pages and null_counts.""" + column_index = column_info.column_index + column_stats = column_info.stats + assert column_index.null_pages is not None + assert len(column_index.null_pages) == index_size + assert column_index.null_counts is not None + assert len(column_index.null_counts) == index_size + + for page_is_null, null_count, page_header in zip(column_index.null_pages, + column_index.null_counts, column_info.page_headers): + assert page_header.type == PageType.DATA_PAGE + num_values = page_header.data_page_header.num_values + assert not page_is_null or null_count == num_values + + if column_stats: + assert column_stats.null_count == sum(column_index.null_counts) + + def _validate_min_max_values(self, index_size, column_info): + """Validate min/max values of the pages in a column chunk.""" + column_index = column_info.column_index + min_values = column_info.column_index.min_values + assert len(min_values) == index_size + max_values = column_info.column_index.max_values + assert len(max_values) == index_size + + if not column_info.stats: + return + + column_min_value_str = column_info.stats.min_value + column_max_value_str = column_info.stats.max_value + if column_min_value_str is None or column_max_value_str is None: + # If either is None, then both need to be None. + assert column_min_value_str is None and column_max_value_str is None + # No min and max value, all pages need to be null + for idx, null_page in enumerate(column_index.null_pages): + assert null_page, "Page {} of column {} is not null, \ + but doesn't have min and max values!".format(idx, column_index.schema.name) + # Everything is None, no further checks needed. + return + + column_min_value = decode_stats_value(column_info.schema, column_min_value_str) + for null_page, page_min_str in zip(column_index.null_pages, min_values): + if not null_page: + page_min_value = decode_stats_value(column_info.schema, page_min_str) + # If type is str, page_min_value might have been truncated. + if isinstance(page_min_value, basestring): + assert page_min_value >= column_min_value[:len(page_min_value)] + else: + assert page_min_value >= column_min_value + + column_max_value = decode_stats_value(column_info.schema, column_max_value_str) + for null_page, page_max_str in zip(column_index.null_pages, max_values): + if not null_page: + page_max_value = decode_stats_value(column_info.schema, page_max_str) + # If type is str, page_max_value might have been truncated and incremented. 
+        if (isinstance(page_max_value, basestring) and
+            len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
+          max_val_prefix = page_max_value.rstrip('\0')
+          assert max_val_prefix[:-1] <= column_max_value
+        else:
+          assert page_max_value <= column_max_value
+
+  def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values):
+    """Check if the ordering of the values reflects the value of 'ordering'."""
+
+    def is_sorted(l, reverse=False):
+      if not reverse:
+        return all(a <= b for a, b in zip(l, l[1:]))
+      else:
+        return all(a >= b for a, b in zip(l, l[1:]))
+
+    # Filter out null pages and decode the actual min/max values.
+    actual_min_values = [decode_stats_value(schema, min_val)
+                         for min_val, is_null in zip(min_values, null_pages)
+                         if not is_null]
+    actual_max_values = [decode_stats_value(schema, max_val)
+                         for max_val, is_null in zip(max_values, null_pages)
+                         if not is_null]
+
+    # For ASCENDING and DESCENDING, both min and max values need to be sorted.
+    if ordering == BoundaryOrder.ASCENDING:
+      assert is_sorted(actual_min_values)
+      assert is_sorted(actual_max_values)
+    elif ordering == BoundaryOrder.DESCENDING:
+      assert is_sorted(actual_min_values, reverse=True)
+      assert is_sorted(actual_max_values, reverse=True)
+    else:
+      assert ordering == BoundaryOrder.UNORDERED
+      # For UNORDERED, min and max values cannot both be sorted.
+      assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values)
+      assert (not is_sorted(actual_min_values, reverse=True) or
+              not is_sorted(actual_max_values, reverse=True))
+
+  def _validate_boundary_order(self, column_info):
+    """Validate that min/max values are really in the order specified by
+    boundary order.
+    """
+    column_index = column_info.column_index
+    self._validate_ordering(column_index.boundary_order, column_info.schema,
+        column_index.null_pages, column_index.min_values, column_index.max_values)
+
+  def _validate_parquet_page_index(self, hdfs_path, tmpdir):
+    """Validates that the rowgroup indexes of all parquet files found under 'hdfs_path'
+    are in a valid format.
+    """
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir)
+    for columns in row_group_indexes:
+      for column_info in columns:
+        try:
+          index_size = len(column_info.offset_index.page_locations)
+          assert index_size > 0
+          self._validate_page_locations(column_info.offset_index.page_locations)
+          self._validate_null_stats(index_size, column_info)
+          self._validate_min_max_values(index_size, column_info)
+          self._validate_boundary_order(column_info)
+        except AssertionError as e:
+          e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
+          raise
+
+  def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
+      tmpdir, sorting_column=None):
+    """Copies 'source_table' into a parquet table and makes sure that the index
+    in the resulting parquet file is valid.
+    """
+    table_name = "test_hdfs_parquet_table_writer"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+        table_name))
+    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
+    # resulting in a single parquet file being written.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    if sorting_column is None:
+      query = ("create table {0} stored as parquet as select * from {1}").format(
+          qualified_table_name, source_table)
+    else:
+      query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
+               ).format(qualified_table_name, sorting_column, source_table)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
+
+  def _create_string_table_with_values(self, vector, unique_database, table_name,
+      values_sql):
+    """Creates a parquet table that has a single string column, then invokes an insert
+    statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
+    It returns the HDFS path for the table.
+    """
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    vector.get_value('exec_option')['num_nodes'] = 1
+    query = ("create table {0} (str string) stored as parquet").format(
+        qualified_table_name)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self.execute_query("insert into {0} values {1}".format(qualified_table_name,
+        values_sql), vector.get_value('exec_option'))
+    return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+        table_name))
+
+  def test_write_index_alltypes(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with the correct
+    values.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
+        tmpdir)
+
+  def test_write_index_decimals(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with the correct
+    values, using decimal types.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
+        tmpdir)
+
+  def test_write_index_chars(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with the correct
+    values, using char types.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
+        tmpdir)
+
+  def test_write_index_null(self, vector, unique_database, tmpdir):
+    """Test that we don't write min/max values in the index for null columns.
+    Ensure null_count is set for columns with null values.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
+        tmpdir)
+
+  def test_write_index_multi_page(self, vector, unique_database, tmpdir):
+    """Test that when a ColumnChunk is written across multiple pages, the index is
+    valid.
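+    The TPC-H tables used below are assumed to be large enough for their column chunks
+    to span multiple data pages.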
+ """ + self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer", + tmpdir) + self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders", + tmpdir) + + def test_write_index_sorting_column(self, vector, unique_database, tmpdir): + """Test that when the schema has a sorting column, the index is valid.""" + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.zipcode_incomes", tmpdir, "id") + + def test_write_index_wide_table(self, vector, unique_database, tmpdir): + """Test table with wide row.""" + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.widerow", tmpdir) + + def test_write_index_many_columns_tables(self, vector, unique_database, tmpdir): + """Test tables with wide rows and many columns.""" + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.widetable_250_cols", tmpdir) + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.widetable_500_cols", tmpdir) + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.widetable_1000_cols", tmpdir) + + def test_max_string_values(self, vector, unique_database, tmpdir): + """Test string values that are all 0xFFs or end with 0xFFs.""" + + # String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH. + short_tbl = "short_tbl" + short_hdfs_path = self._create_string_table_with_values(vector, unique_database, + short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1)) + self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl)) + + # String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH. + fit_tbl = "fit_tbl" + fit_hdfs_path = self._create_string_table_with_values(vector, unique_database, + fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH)) + self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl)) + + # All bytes are 0xFFs and the string is longer then PAGE_INDEX_TRUNCATE_LENGTH, so we + # should not write page statistics. + too_long_tbl = "too_long_tbl" + too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database, + too_long_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1)) + row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path, + tmpdir.join(too_long_tbl)) + column = row_group_indexes[0][0] + assert column.column_index is None + # We always write the offset index + assert column.offset_index is not None + + # Test string with value that starts with 'aaa' following with 0xFFs and its length is + # greater than PAGE_INDEX_TRUNCATE_LENGTH. Max value should be 'aab'. 
+    aaa_tbl = "aaa_tbl"
+    aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
+        tmpdir.join(aaa_tbl))
+    column = row_group_indexes[0][0]
+    assert len(column.column_index.max_values) == 1
+    max_value = column.column_index.max_values[0]
+    assert max_value == 'aab'

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/tests/util/get_parquet_metadata.py
----------------------------------------------------------------------
diff --git a/tests/util/get_parquet_metadata.py b/tests/util/get_parquet_metadata.py
index 8cf0405..f6a0e59 100644
--- a/tests/util/get_parquet_metadata.py
+++ b/tests/util/get_parquet_metadata.py
@@ -20,13 +20,21 @@ import struct
 
 from datetime import date, datetime, time, timedelta
 from decimal import Decimal
-from parquet.ttypes import FileMetaData, Type
+from parquet.ttypes import ColumnIndex, FileMetaData, OffsetIndex, PageHeader, Type
 from thrift.protocol import TCompactProtocol
 from thrift.transport import TTransport
 
 PARQUET_VERSION_NUMBER = 'PAR1'
 
+
+def create_protocol(serialized_object_buffer):
+  """Creates a thrift protocol object from a memory buffer. The buffer should
+  contain a serialized thrift object.
+  """
+  transport = TTransport.TMemoryBuffer(serialized_object_buffer)
+  return TCompactProtocol.TCompactProtocol(transport)
+
+
 def julian_day_to_date(julian_day):
   """Converts a Julian day number into a Gregorian date. The reference date is picked
   arbitrarily and can be validated with an online converter like
@@ -71,7 +79,8 @@ def parse_double(s):
 
 def decode_timestamp(s):
   """Reinterprets the string 's' as a 12-byte timestamp as written by Impala and decodes
-  it into a datetime object."""
+  it into a datetime object.
+  """
   # Impala writes timestamps as 12-byte values. The first 8 bytes store a
   # boost::posix_time::time_duration, which is the time within the current day in
   # nanoseconds stored as int64. The last 4 bytes store a boost::gregorian::date,
@@ -99,7 +108,8 @@ def decode_decimal(schema, value):
 
 def decode_stats_value(schema, value):
   """Decodes 'value' according to 'schema'. It expects 'value' to be plain encoded. For
   BOOLEAN values, only the least significant bit is parsed and returned. Binary arrays are
-  expected to be stored as such, without a preceding length."""
+  expected to be stored as such, without a preceding length.
+  """
   column_type = schema.type
   if column_type == Type.BOOLEAN:
     return parse_boolean(value)
@@ -123,9 +133,22 @@ def decode_stats_value(schema, value):
   return None
 
 
+def read_serialized_object(thrift_class, file, file_pos, length):
+  """Reads an instance of class 'thrift_class' from an already opened file at the
+  given position.
+  """
+  file.seek(file_pos)
+  serialized_thrift_object = file.read(length)
+  protocol = create_protocol(serialized_thrift_object)
+  thrift_object = thrift_class()
+  thrift_object.read(protocol)
+  return thrift_object
+
+
 def get_parquet_metadata(filename):
-  """Returns a FileMetaData as defined in parquet.thrift. 'filename' must be a local
-  file path."""
+  """Returns a FileMetaData as defined in parquet.thrift. 'filename' must be a local
+  file path.
+ """ file_size = os.path.getsize(filename) with open(filename) as f: # Check file starts and ends with magic bytes @@ -140,13 +163,8 @@ def get_parquet_metadata(filename): f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4) metadata_len = parse_int32(f.read(4)) - # Read metadata - f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4 - metadata_len) - serialized_metadata = f.read(metadata_len) + # Calculate metadata position in file + metadata_pos = file_size - len(PARQUET_VERSION_NUMBER) - 4 - metadata_len - # Deserialize metadata - transport = TTransport.TMemoryBuffer(serialized_metadata) - protocol = TCompactProtocol.TCompactProtocol(transport) - metadata = FileMetaData() - metadata.read(protocol) - return metadata + # Return deserialized FileMetaData object + return read_serialized_object(FileMetaData, f, metadata_pos, metadata_len)