IMPALA-7477: Batch-oriented query set construction. Rework the row-by-row construction of query result sets in PlanRootSink so that it materialises an output column at a time. Make some minor optimisations, like preallocating output vectors and initialising strings more efficiently.
My intent is both to make this faster and to make the QueryResultSet interface better before IMPALA-4268 does a bunch of surgery on this part of the code. Testing: Ran core tests. Perf: Downloaded tpch_parquet.orders via JDBC driver. Before: 3.01s, After: 2.57s. Downloaded l_orderkey from tpch_parquet.lineitem. Before: 1.21s, After: 1.08s. Change-Id: I764fa302842438902cd5db2551ec6e3cb77b6874 Reviewed-on: http://gerrit.cloudera.org:8080/11879 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0f63b2c9 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0f63b2c9 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0f63b2c9 Branch: refs/heads/branch-3.1.0 Commit: 0f63b2c9f9d62b0d22191f454b672a6047206252 Parents: d8b1e43 Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Sun Aug 19 00:48:47 2018 -0700 Committer: Zoltan Borok-Nagy <borokna...@cloudera.com> Committed: Tue Nov 13 12:51:39 2018 +0100 ---------------------------------------------------------------------- be/src/exec/plan-root-sink.cc | 26 +-- be/src/exec/plan-root-sink.h | 4 - be/src/service/hs2-util.cc | 318 ++++++++++++++++++++++++-------- be/src/service/hs2-util.h | 15 +- be/src/service/query-result-set.cc | 115 +++++++----- be/src/service/query-result-set.h | 14 +- 6 files changed, 331 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/exec/plan-root-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc index a64dbb9..1f5b2e5 100644 --- a/be/src/exec/plan-root-sink.cc +++ b/be/src/exec/plan-root-sink.cc @@ -79,22 +79,11 @@ Status PlanRootSink::Send(RuntimeState* 
state, RowBatch* batch) { // Otherwise the consumer is ready. Fill out the rows. DCHECK(results_ != nullptr); - // List of expr values to hold evaluated rows from the query - vector<void*> result_row; - result_row.resize(output_exprs_.size()); - - // List of scales for floating point values in result_row - vector<int> scales; - scales.resize(result_row.size()); - int num_to_fetch = batch->num_rows() - current_batch_row; if (num_rows_requested_ > 0) num_to_fetch = min(num_to_fetch, num_rows_requested_); - for (int i = 0; i < num_to_fetch; ++i) { - TupleRow* row = batch->GetRow(current_batch_row); - GetRowValue(row, &result_row, &scales); - RETURN_IF_ERROR(results_->AddOneRow(result_row, scales)); - ++current_batch_row; - } + RETURN_IF_ERROR( + results_->AddRows(output_expr_evals_, batch, current_batch_row, num_to_fetch)); + current_batch_row += num_to_fetch; // Prevent expr result allocations from accumulating. expr_results_pool_->Clear(); // Signal the consumer. @@ -146,13 +135,4 @@ Status PlanRootSink::GetNext( *eos = sender_state_ == SenderState::EOS; return state->GetQueryStatus(); } - -void PlanRootSink::GetRowValue( - TupleRow* row, vector<void*>* result, vector<int>* scales) { - DCHECK_GE(result->size(), output_expr_evals_.size()); - for (int i = 0; i < output_expr_evals_.size(); ++i) { - (*result)[i] = output_expr_evals_[i]->GetValue(row); - (*scales)[i] = output_expr_evals_[i]->output_scale(); - } -} } http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/exec/plan-root-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h index 1d64b21..300c993 100644 --- a/be/src/exec/plan-root-sink.h +++ b/be/src/exec/plan-root-sink.h @@ -118,10 +118,6 @@ class PlanRootSink : public DataSink { /// Set by GetNext() to indicate to Send() how many rows it should write to results_. 
int num_rows_requested_ = 0; - - /// Writes a single row into 'result' and 'scales' by evaluating - /// output_expr_evals_ over 'row'. - void GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales); }; } http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/hs2-util.cc ---------------------------------------------------------------------- diff --git a/be/src/service/hs2-util.cc b/be/src/service/hs2-util.cc index b856556..66a76bd 100644 --- a/be/src/service/hs2-util.cc +++ b/be/src/service/hs2-util.cc @@ -18,9 +18,12 @@ #include "service/hs2-util.h" #include "common/logging.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/decimal-value.inline.h" #include "runtime/raw-value.inline.h" +#include "runtime/row-batch.h" #include "runtime/types.h" +#include "util/bit-util.h" #include <gutil/strings/substitute.h> @@ -49,7 +52,9 @@ inline bool GetNullBit(const string& nulls, uint32_t row_idx) { void impala::StitchNulls(uint32_t num_rows_before, uint32_t num_rows_added, uint32_t start_idx, const string& from, string* to) { - to->reserve((num_rows_before + num_rows_added + 7) / 8); + // Round up to power-of-two to avoid accidentally quadratic behaviour from repeated + // small increases in size. + to->reserve(BitUtil::RoundUpToPowerOfTwo((num_rows_before + num_rows_added + 7) / 8)); // TODO: This is very inefficient, since we could conceivably go one byte at a time // (although the operands should stay live in registers in the loop). However doing this @@ -118,106 +123,257 @@ void impala::TColumnValueToHS2TColumn(const TColumnValue& col_val, SetNullBit(row_idx, is_null, nulls); } +// Specialised per-type implementations of ExprValuesToHS2TColumn. + +// Helper to reserve space in hs2Vals->values and hs2Vals->nulls for the values that the +// different implementations of ExprValuesToHS2TColumn will write. 
+template <typename T> +void ReserveSpace(int start_idx, int num_rows, uint32_t output_row_idx, T* hs2Vals) { + int64_t num_output_rows = output_row_idx + num_rows - start_idx; + int64_t num_null_bytes = BitUtil::RoundUpNumBytes(num_output_rows); + // Round up reserve() arguments to power-of-two to avoid accidentally quadratic + // behaviour from repeated small increases in size. + hs2Vals->values.reserve(BitUtil::RoundUpToPowerOfTwo(num_output_rows)); + hs2Vals->nulls.reserve(BitUtil::RoundUpToPowerOfTwo(num_null_bytes)); +} + +// Implementation for BOOL. +static void BoolExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, + int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->boolVal); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + BooleanVal val = expr_eval->GetBooleanVal(it.Get()); + column->boolVal.values.push_back(val.val); + SetNullBit(output_row_idx, val.is_null, &column->boolVal.nulls); + ++output_row_idx; + } +} + +// Implementation for TINYINT. +static void TinyIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, + int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->byteVal); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + TinyIntVal val = expr_eval->GetTinyIntVal(it.Get()); + column->byteVal.values.push_back(val.val); + SetNullBit(output_row_idx, val.is_null, &column->byteVal.nulls); + ++output_row_idx; + } +} + +// Implementation for SMALLINT. 
+static void SmallIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, + RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->i16Val); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + SmallIntVal val = expr_eval->GetSmallIntVal(it.Get()); + column->i16Val.values.push_back(val.val); + SetNullBit(output_row_idx, val.is_null, &column->i16Val.nulls); + ++output_row_idx; + } +} + +// Implementation for INT. +static void IntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, + int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->i32Val); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + IntVal val = expr_eval->GetIntVal(it.Get()); + column->i32Val.values.push_back(val.val); + SetNullBit(output_row_idx, val.is_null, &column->i32Val.nulls); + ++output_row_idx; + } +} + +// Implementation for BIGINT. +static void BigIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, + int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->i64Val); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + BigIntVal val = expr_eval->GetBigIntVal(it.Get()); + column->i64Val.values.push_back(val.val); + SetNullBit(output_row_idx, val.is_null, &column->i64Val.nulls); + ++output_row_idx; + } +} + +// Implementation for FLOAT. 
+static void FloatExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, + int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->doubleVal); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + FloatVal val = expr_eval->GetFloatVal(it.Get()); + column->doubleVal.values.push_back(val.val); + SetNullBit(output_row_idx, val.is_null, &column->doubleVal.nulls); + ++output_row_idx; + } +} + +// Implementation for DOUBLE. +static void DoubleExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, + int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->doubleVal); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + DoubleVal val = expr_eval->GetDoubleVal(it.Get()); + column->doubleVal.values.push_back(val.val); + SetNullBit(output_row_idx, val.is_null, &column->doubleVal.nulls); + ++output_row_idx; + } +} + +// Implementation for TIMESTAMP. +static void TimestampExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, + RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + TimestampVal val = expr_eval->GetTimestampVal(it.Get()); + column->stringVal.values.emplace_back(); + if (!val.is_null) { + TimestampValue value = TimestampValue::FromTimestampVal(val); + RawValue::PrintValue( + &value, TYPE_TIMESTAMP, -1, &(column->stringVal.values.back())); + } + SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls); + ++output_row_idx; + } +} + +// Implementation for STRING and VARCHAR. 
+static void StringExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, + int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + StringVal val = expr_eval->GetStringVal(it.Get()); + if (val.is_null) { + column->stringVal.values.emplace_back(); + } else { + column->stringVal.values.emplace_back(reinterpret_cast<char*>(val.ptr), val.len); + } + SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls); + ++output_row_idx; + } +} + +// Implementation for CHAR. +static void CharExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, + const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, + uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal); + ColumnType char_type = ColumnType::CreateCharType(type.types[0].scalar_type.len); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + StringVal val = expr_eval->GetStringVal(it.Get()); + if (val.is_null) { + column->stringVal.values.emplace_back(); + } else { + column->stringVal.values.emplace_back( + reinterpret_cast<const char*>(val.ptr), char_type.len); + } + SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls); + ++output_row_idx; + } +} + +static void DecimalExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, + const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, + uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { + ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + DecimalVal val = expr_eval->GetDecimalVal(it.Get()); + const ColumnType& decimalType = ColumnType::FromThrift(type); + if (val.is_null) { + column->stringVal.values.emplace_back(); + } else { + 
switch (decimalType.GetByteSize()) { + case 4: + column->stringVal.values.emplace_back( + Decimal4Value(val.val4).ToString(decimalType)); + break; + case 8: + column->stringVal.values.emplace_back( + Decimal8Value(val.val8).ToString(decimalType)); + break; + case 16: + column->stringVal.values.emplace_back( + Decimal16Value(val.val16).ToString(decimalType)); + break; + default: + DCHECK(false) << "bad type: " << decimalType; + } + } + SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls); + ++output_row_idx; + } +} + // For V6 and above -void impala::ExprValueToHS2TColumn(const void* value, const TColumnType& type, - uint32_t row_idx, thrift::TColumn* column) { - string* nulls; +void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, + const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, + uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { + // Dispatch to a templated function for the loop over rows. This avoids branching on + // the type for every row. + // TODO: instead of relying on stamped out implementations, we could codegen this loop + // to inline the expression evaluation into the loop body. switch (type.types[0].scalar_type.type) { case TPrimitiveType::NULL_TYPE: case TPrimitiveType::BOOLEAN: - column->boolVal.values.push_back( - value == NULL ? false : *reinterpret_cast<const bool*>(value)); - nulls = &column->boolVal.nulls; - break; + BoolExprValuesToHS2TColumn( + expr_eval, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::TINYINT: - column->byteVal.values.push_back( - value == NULL ? 0 : *reinterpret_cast<const int8_t*>(value)); - nulls = &column->byteVal.nulls; - break; + TinyIntExprValuesToHS2TColumn( + expr_eval, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::SMALLINT: - column->i16Val.values.push_back( - value == NULL ? 
0 : *reinterpret_cast<const int16_t*>(value)); - nulls = &column->i16Val.nulls; - break; + SmallIntExprValuesToHS2TColumn( + expr_eval, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::INT: - column->i32Val.values.push_back( - value == NULL ? 0 : *reinterpret_cast<const int32_t*>(value)); - nulls = &column->i32Val.nulls; - break; + IntExprValuesToHS2TColumn( + expr_eval, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::BIGINT: - column->i64Val.values.push_back( - value == NULL ? 0 : *reinterpret_cast<const int64_t*>(value)); - nulls = &column->i64Val.nulls; - break; + BigIntExprValuesToHS2TColumn( + expr_eval, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::FLOAT: - column->doubleVal.values.push_back( - value == NULL ? 0.f : *reinterpret_cast<const float*>(value)); - nulls = &column->doubleVal.nulls; - break; + FloatExprValuesToHS2TColumn( + expr_eval, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::DOUBLE: - column->doubleVal.values.push_back( - value == NULL ? 
0.0 : *reinterpret_cast<const double*>(value)); - nulls = &column->doubleVal.nulls; - break; + DoubleExprValuesToHS2TColumn( + expr_eval, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::TIMESTAMP: - column->stringVal.values.push_back(""); - if (value != NULL) { - RawValue::PrintValue(value, TYPE_TIMESTAMP, -1, - &(column->stringVal.values.back())); - } - nulls = &column->stringVal.nulls; - break; + TimestampExprValuesToHS2TColumn( + expr_eval, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::STRING: case TPrimitiveType::VARCHAR: - column->stringVal.values.push_back(""); - if (value != NULL) { - const StringValue* str_val = reinterpret_cast<const StringValue*>(value); - column->stringVal.values.back().assign( - static_cast<char*>(str_val->ptr), str_val->len); - } - nulls = &column->stringVal.nulls; - break; + StringExprValuesToHS2TColumn( + expr_eval, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::CHAR: - column->stringVal.values.push_back(""); - if (value != NULL) { - ColumnType char_type = ColumnType::CreateCharType(type.types[0].scalar_type.len); - column->stringVal.values.back().assign( - reinterpret_cast<const char*>(value), char_type.len); - } - nulls = &column->stringVal.nulls; - break; + CharExprValuesToHS2TColumn( + expr_eval, type, batch, start_idx, num_rows, output_row_idx, column); + return; case TPrimitiveType::DECIMAL: { - // HiveServer2 requires decimal to be presented as string. 
- column->stringVal.values.push_back(""); - const ColumnType& decimalType = ColumnType::FromThrift(type); - if (value != NULL) { - switch (decimalType.GetByteSize()) { - case 4: - column->stringVal.values.back() = - reinterpret_cast<const Decimal4Value*>(value)->ToString(decimalType); - break; - case 8: - column->stringVal.values.back() = - reinterpret_cast<const Decimal8Value*>(value)->ToString(decimalType); - break; - case 16: - column->stringVal.values.back() = - reinterpret_cast<const Decimal16Value*>(value)->ToString(decimalType); - break; - default: - DCHECK(false) << "bad type: " << decimalType; - } - } - nulls = &column->stringVal.nulls; - break; + DecimalExprValuesToHS2TColumn( + expr_eval, type, batch, start_idx, num_rows, output_row_idx, column); + return; } default: DCHECK(false) << "Unhandled type: " << TypeToString(ThriftToType(type.types[0].scalar_type.type)); - return; } - - SetNullBit(row_idx, (value == NULL), nulls); } // For V1 -> V5 http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/hs2-util.h ---------------------------------------------------------------------- diff --git a/be/src/service/hs2-util.h b/be/src/service/hs2-util.h index 44ceba6..4f0f973 100644 --- a/be/src/service/hs2-util.h +++ b/be/src/service/hs2-util.h @@ -20,16 +20,23 @@ namespace impala { -/// Utility methods for converting from Impala (either an Expr result or a TColumnValue) to -/// Hive types (either a thrift::TColumnValue (V1->V5) or a TColumn (V6->). +class RowBatch; +class ScalarExprEvaluator; + +/// Utility methods for converting from Impala (either an Expr result or a TColumnValue) +/// to Hive types (either a thrift::TColumnValue (V1->V5) or a TColumn (V6->). 
/// For V6-> void TColumnValueToHS2TColumn(const TColumnValue& col_val, const TColumnType& type, uint32_t row_idx, apache::hive::service::cli::thrift::TColumn* column); +/// Evaluate 'expr_eval' over the row [start_idx, start_idx + num_rows) from 'batch' into +/// 'column' with 'type' starting at output_row_idx. The caller is responsible for +/// calling RuntimeState::GetQueryStatus() to check for expression evaluation errors. /// For V6-> -void ExprValueToHS2TColumn(const void* value, const TColumnType& type, - uint32_t row_idx, apache::hive::service::cli::thrift::TColumn* column); +void ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, const TColumnType& type, + RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, + apache::hive::service::cli::thrift::TColumn* column); /// For V1->V5 void TColumnValueToHS2TColumnValue(const TColumnValue& col_val, const TColumnType& type, http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/query-result-set.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc index aacd849..f254176 100644 --- a/be/src/service/query-result-set.cc +++ b/be/src/service/query-result-set.cc @@ -20,10 +20,13 @@ #include <sstream> #include <boost/scoped_ptr.hpp> +#include "exprs/scalar-expr-evaluator.h" #include "rpc/thrift-util.h" #include "runtime/raw-value.h" +#include "runtime/row-batch.h" #include "runtime/types.h" #include "service/hs2-util.h" +#include "util/bit-util.h" #include "common/names.h" @@ -51,18 +54,19 @@ class AsciiQueryResultSet : public QueryResultSet { virtual ~AsciiQueryResultSet() {} - /// Convert one row's expr values stored in 'col_values' to ASCII using "\t" as column + /// Evaluate 'expr_evals' over rows in 'batch', convert to ASCII using "\t" as column /// delimiter and store it in this result set. /// TODO: Handle complex types. 
- virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales); + virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch, + int start_idx, int num_rows) override; /// Convert TResultRow to ASCII using "\t" as column delimiter and store it in this /// result set. - virtual Status AddOneRow(const TResultRow& row); + virtual Status AddOneRow(const TResultRow& row) override; - virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows); - virtual int64_t ByteSize(int start_idx, int num_rows); - virtual size_t size() { return result_set_->size(); } + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override; + virtual int64_t ByteSize(int start_idx, int num_rows) override; + virtual size_t size() override { return result_set_->size(); } private: /// Metadata of the result set @@ -80,18 +84,20 @@ class HS2ColumnarResultSet : public QueryResultSet { virtual ~HS2ColumnarResultSet() {} - /// Add a row of expr values - virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales); + /// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 columnar + /// representation. 
+ virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch, + int start_idx, int num_rows) override; /// Add a row from a TResultRow - virtual Status AddOneRow(const TResultRow& row); + virtual Status AddOneRow(const TResultRow& row) override; /// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows' /// from 'other' into this result set - virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows); + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override; - virtual int64_t ByteSize(int start_idx, int num_rows); - virtual size_t size() { return num_rows_; } + virtual int64_t ByteSize(int start_idx, int num_rows) override; + virtual size_t size() override { return num_rows_; } private: /// Metadata of the result set @@ -119,15 +125,17 @@ class HS2RowOrientedResultSet : public QueryResultSet { virtual ~HS2RowOrientedResultSet() {} - /// Convert expr values to HS2 TRow and store it in a TRowSet. - virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales); + /// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 row-oriented + /// representation of TRows stored in a TRowSet. 
+ virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch, + int start_idx, int num_rows) override; /// Convert TResultRow to HS2 TRow and store it in a TRowSet - virtual Status AddOneRow(const TResultRow& row); + virtual Status AddOneRow(const TResultRow& row) override; - virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows); - virtual int64_t ByteSize(int start_idx, int num_rows); - virtual size_t size() { return result_set_->rows.size(); } + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override; + virtual int64_t ByteSize(int start_idx, int num_rows) override; + virtual size_t size() override { return result_set_->rows.size(); } private: /// Metadata of the result set @@ -158,20 +166,34 @@ QueryResultSet* QueryResultSet::CreateHS2ResultSet( ////////////////////////////////////////////////////////////////////////////////////////// -Status AsciiQueryResultSet::AddOneRow( - const vector<void*>& col_values, const vector<int>& scales) { - int num_col = col_values.size(); +Status AsciiQueryResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals, + RowBatch* batch, int start_idx, int num_rows) { + DCHECK_GE(batch->num_rows(), start_idx + num_rows); + int num_col = expr_evals.size(); DCHECK_EQ(num_col, metadata_.columns.size()); + vector<int> scales; + scales.reserve(num_col); + for (ScalarExprEvaluator* expr_eval : expr_evals) { + scales.push_back(expr_eval->output_scale()); + } + // Round up to power-of-two to avoid accidentally quadratic behaviour from repeated + // small increases in size. + result_set_->reserve( + BitUtil::RoundUpToPowerOfTwo(result_set_->size() + num_rows - start_idx)); stringstream out_stream; out_stream.precision(ASCII_PRECISION); - for (int i = 0; i < num_col; ++i) { - // ODBC-187 - ODBC can only take "\t" as the delimiter - out_stream << (i > 0 ? 
"\t" : ""); - DCHECK_EQ(1, metadata_.columns[i].columnType.types.size()); - RawValue::PrintValue(col_values[i], - ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i], &out_stream); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + for (int i = 0; i < num_col; ++i) { + // ODBC-187 - ODBC can only take "\t" as the delimiter + out_stream << (i > 0 ? "\t" : ""); + DCHECK_EQ(1, metadata_.columns[i].columnType.types.size()); + RawValue::PrintValue(expr_evals[i]->GetValue(it.Get()), + ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i], + &out_stream); + } + result_set_->push_back(out_stream.str()); + out_stream.str(""); } - result_set_->push_back(out_stream.str()); return Status::OK(); } @@ -263,16 +285,18 @@ HS2ColumnarResultSet::HS2ColumnarResultSet( InitColumns(); } -// Add a row of expr values -Status HS2ColumnarResultSet::AddOneRow( - const vector<void*>& col_values, const vector<int>& scales) { - int num_col = col_values.size(); +Status HS2ColumnarResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals, + RowBatch* batch, int start_idx, int num_rows) { + DCHECK_GE(batch->num_rows(), start_idx + num_rows); + int num_col = expr_evals.size(); DCHECK_EQ(num_col, metadata_.columns.size()); for (int i = 0; i < num_col; ++i) { - ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_, + const TColumnType& type = metadata_.columns[i].columnType; + ScalarExprEvaluator* expr_eval = expr_evals[i]; + ExprValuesToHS2TColumn(expr_eval, type, batch, start_idx, num_rows, num_rows_, &(result_set_->columns[i])); } - ++num_rows_; + num_rows_ += num_rows; return Status::OK(); } @@ -427,16 +451,21 @@ HS2RowOrientedResultSet::HS2RowOrientedResultSet( } } -Status HS2RowOrientedResultSet::AddOneRow( - const vector<void*>& col_values, const vector<int>& scales) { - int num_col = col_values.size(); +Status HS2RowOrientedResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals, + RowBatch* batch, int start_idx, 
int num_rows) { + DCHECK_GE(batch->num_rows(), start_idx + num_rows); + int num_col = expr_evals.size(); DCHECK_EQ(num_col, metadata_.columns.size()); - result_set_->rows.push_back(TRow()); - TRow& trow = result_set_->rows.back(); - trow.colVals.resize(num_col); - for (int i = 0; i < num_col; ++i) { - ExprValueToHS2TColumnValue( - col_values[i], metadata_.columns[i].columnType, &(trow.colVals[i])); + result_set_->rows.reserve( + BitUtil::RoundUpToPowerOfTwo(result_set_->rows.size() + num_rows - start_idx)); + FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { + result_set_->rows.push_back(TRow()); + TRow& trow = result_set_->rows.back(); + trow.colVals.resize(num_col); + for (int i = 0; i < num_col; ++i) { + ExprValueToHS2TColumnValue(expr_evals[i]->GetValue(it.Get()), + metadata_.columns[i].columnType, &(trow.colVals[i])); + } } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/0f63b2c9/be/src/service/query-result-set.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h index e0c88d7..fa39d73 100644 --- a/be/src/service/query-result-set.h +++ b/be/src/service/query-result-set.h @@ -27,6 +27,9 @@ namespace impala { +class RowBatch; +class ScalarExprEvaluator; + /// Wraps a client-API specific result representation, and implements the logic required /// to translate into that format from Impala's row format. /// @@ -36,12 +39,11 @@ class QueryResultSet { QueryResultSet() {} virtual ~QueryResultSet() {} - /// Add a single row to this result set. The row is a vector of pointers to values, - /// whose memory belongs to the caller. 'scales' contains the scales for decimal values - /// (# of digits after decimal), with -1 indicating no scale specified or the - /// corresponding value is not a decimal. 
- virtual Status AddOneRow( - const std::vector<void*>& row, const std::vector<int>& scales) = 0; + /// Add 'num_rows' rows to the result set, obtained by evaluating 'expr_evals' over + /// the rows in 'batch' starting at start_idx. Batch must contain at least + /// ('start_idx' + 'num_rows') rows. + virtual Status AddRows(const std::vector<ScalarExprEvaluator*>& expr_evals, + RowBatch* batch, int start_idx, int num_rows) = 0; /// Add the TResultRow to this result set. When a row comes from a DDL/metadata /// operation, the row in the form of TResultRow.