Remove unnecessary Kudu table sink BE test

Now that we have functional tests for Kudu (IMPALA-3718), we
can remove the BE Kudu table sink test, which duplicates existing
coverage and is expensive to maintain.

Change-Id: Ice1924d525c363ee65418c3495ed56647a352a52
Reviewed-by: Alex Behm <>
Tested-by: Internal Jenkins


Branch: refs/heads/master
Commit: 3e23e40504000dd896fc1862809b659e41d468c1
Parents: 3eb051f
Author: Matthew Jacobs <>
Authored: Tue Oct 11 12:34:41 2016 -0700
Committer: Internal Jenkins <>
Committed: Wed Oct 12 00:06:51 2016 +0000

 be/src/exec/CMakeLists.txt          |   1 -
 be/src/exec/kudu-table-sink-test.cc | 314 -------------------------------
 be/src/exec/kudu-testutil.h         | 262 --------------------------
 3 files changed, 577 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index b2d9663..571198f 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -107,5 +107,4 @@ ADD_BE_TEST(parquet-plain-test)
diff --git a/be/src/exec/kudu-table-sink-test.cc b/be/src/exec/kudu-table-sink-test.cc
deleted file mode 100644
index a1cbb68..0000000
--- a/be/src/exec/kudu-table-sink-test.cc
+++ /dev/null
@@ -1,314 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "exec/kudu-testutil.h"
-#include "common/init.h"
-#include "codegen/llvm-codegen.h"
-#include "exec/kudu-table-sink.h"
-#include "exec/kudu-util.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/PlanNodes_types.h"
-#include "gen-cpp/Types_types.h"
-#include "gutil/strings/split.h"
-#include "gutil/stl_util.h"
-#include "kudu/client/row_result.h"
-#include "runtime/descriptors.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/tuple-row.h"
-#include "service/fe-support.h"
-#include "testutil/desc-tbl-builder.h"
-#include "testutil/test-macros.h"
-#include "util/cpu-info.h"
-#include "util/test-info.h"
-using apache::thrift::ThriftDebugString;
-namespace impala {
-static const char* BASE_TABLE_NAME = "TestInsertNodeTable";
-static const int FIRST_SLOT_ID = 1;
-static const int SECOND_SLOT_ID = 2;
-static const int THIRD_SLOT_ID = 3;
-class KuduTableSinkTest : public testing::Test {
- public:
-  KuduTableSinkTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_) 
-  virtual void SetUp() {
-    // Create a Kudu client and the table (this will abort the test here
-    // if a Kudu cluster is not available).
-    kudu_test_helper_.CreateClient();
-    kudu_test_helper_.CreateTable(BASE_TABLE_NAME);
-    // Initialize the environment/runtime so that we can use a scan node in
-    // isolation.
-    Status s = exec_env_.InitForFeTests();
-    DCHECK(s.ok());
-    runtime_state_.InitMemTrackers(TUniqueId(), NULL, -1);
-    exec_env_.disk_io_mgr()->Init(&mem_tracker_);
-  }
-  void BuildRuntimeState(int num_cols_to_insert,
-      TSinkAction::type sink_action) {
-    TTableSink table_sink;
-    table_sink.__set_target_table_id(0);
-    table_sink.__set_action(sink_action);
-    // For tests ignore not found keys in delete test. Other paths are 
exercised via
-    // end-to-end tests.
-    TKuduTableSink kudu_table_sink;
-    kudu_table_sink.__set_ignore_not_found_or_duplicate(true);
-    table_sink.__set_kudu_table_sink(kudu_table_sink);
-    data_sink_.__set_type(TDataSinkType::TABLE_SINK);
-    data_sink_.__set_table_sink(table_sink);
-    kudu_test_helper_.CreateTableDescriptor(num_cols_to_insert, &desc_tbl_);
-    row_desc_ = obj_pool_.Add(
-        new RowDescriptor(*desc_tbl_,
-                          boost::assign::list_of(0),
-                          boost::assign::list_of(false)));
-    runtime_state_.set_desc_tbl(desc_tbl_);
-  }
-  void CreateTExprNode(int slot_id, TPrimitiveType::type type, TExpr* expr) {
-    TExprNode expr_node;
-    expr_node.node_type = TExprNodeType::SLOT_REF;
-    expr_node.type.types.push_back(TTypeNode());
-    expr_node.type.types.back().__isset.scalar_type = true;
-    expr_node.type.types.back().scalar_type.type = type;
-    expr_node.num_children = 0;
-    TSlotRef slot_ref;
-    slot_ref.slot_id = slot_id;
-    expr_node.__set_slot_ref(slot_ref);
-    expr->nodes.push_back(expr_node);
-  }
-  void CreateTExpr(int num_cols_to_insert, vector<TExpr>* exprs) {
-    DCHECK(num_cols_to_insert > 0 && num_cols_to_insert <= 3);
-    TExpr expr_1;
-    CreateTExprNode(FIRST_SLOT_ID, TPrimitiveType::INT, &expr_1);
-    exprs->push_back(expr_1);
-    if (num_cols_to_insert == 1) return;
-    TExpr expr_2;
-    CreateTExprNode(SECOND_SLOT_ID, TPrimitiveType::INT, &expr_2);
-    exprs->push_back(expr_2);
-    if (num_cols_to_insert == 2) return;
-    TExpr expr_3;
-    CreateTExprNode(THIRD_SLOT_ID, TPrimitiveType::STRING, &expr_3);
-    exprs->push_back(expr_3);
-  }
-  // Create a batch and fill it according to the tuple descriptor.
-  // Parameters:
-  //   - first_row - offset used to calculate the values to be written
-  //   - batch_size - maximum number of rows to generate
-  //   - factor - multiplier used to modify the value to be written, used in 
update tests
-  //   - val - free string value passed to the string column
-  //   - skip_val - skips rows where  (row_pos % skip_val) == 0
-  RowBatch* CreateRowBatch(int first_row, int batch_size, int factor, string 
-      int skip_val) {
-    DCHECK(desc_tbl_->GetTupleDescriptor(0) != NULL);
-    DCHECK_GE(skip_val, 1);
-    TupleDescriptor* tuple_desc = desc_tbl_->GetTupleDescriptor(0);
-    RowBatch* batch = new RowBatch(*row_desc_, batch_size, &mem_tracker_);
-    int tuple_buffer_size = batch->capacity() * tuple_desc->byte_size();
-    void* tuple_buffer_ = 
-    DCHECK(tuple_buffer_ != NULL);
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer_);
-    memset(tuple_buffer_, 0, tuple_buffer_size);
-    for (int i = 0; i < batch_size; ++i) {
-      if (skip_val != 1 && ((i + first_row) % skip_val) == 0) continue;
-      int idx = batch->AddRow();
-      TupleRow* row = batch->GetRow(idx);
-      row->SetTuple(0, tuple);
-      for (int j = 0; j < tuple_desc->slots().size(); j++) {
-        void* slot = tuple->GetSlot(tuple_desc->slots()[j]->tuple_offset());
-        DCHECK(slot != NULL);
-        switch(j) {
-          case 0: {
-            int32_t* int_slot = reinterpret_cast<int32_t*>(slot);
-            *int_slot = first_row + i;
-            break;
-          }
-          case 1: {
-            int32_t* int_slot = reinterpret_cast<int32_t*>(slot);
-            *int_slot = (first_row + i) * factor;
-            break;
-          }
-          case 2: {
-            string value = strings::Substitute("$0_$1", val, first_row + i);
-            char* buffer = reinterpret_cast<char*>(
-                batch->tuple_data_pool()->TryAllocate(value.size()));
-            DCHECK(buffer != NULL);
-            memcpy(buffer,, value.size());
-            reinterpret_cast<StringValue*>(slot)->ptr = buffer;
-            reinterpret_cast<StringValue*>(slot)->len = value.size();
-            break;
-          }
-          default:
-            DCHECK(false) << "Wrong number of slots.";
-        }
-      }
-      batch->CommitLastRow();
-      uint8_t* mem = reinterpret_cast<uint8_t*>(tuple);
-      tuple = reinterpret_cast<Tuple*>(mem + tuple_desc->byte_size());
-    }
-    return batch;
-  }
-  void Verify(int num_columns, int expected_num_rows, int factor, string val,
-      int skip_val) {
-    kudu::client::KuduScanner scanner(kudu_test_helper_.table().get());
-    KUDU_ASSERT_OK(scanner.SetFaultTolerant());
-    KUDU_ASSERT_OK(scanner.Open());
-    int row_idx = 0;
-    while (scanner.HasMoreRows()) {
-      vector<kudu::client::KuduRowResult> rows;
-      KUDU_ASSERT_OK(scanner.NextBatch(&rows));
-      for (const kudu::client::KuduRowResult& row: rows) {
-        switch(num_columns) {
-          case 1:
-            ASSERT_EQ(row.ToString(), strings::Substitute(
-                "(int32 key=$0, int32 int_val=NULL, string string_val=NULL)",
-                row_idx * skip_val));
-            break;
-          case 2:
-            ASSERT_EQ(row.ToString(), strings::Substitute(
-                "(int32 key=$0, int32 int_val=$1, string string_val=NULL)",
-                row_idx * skip_val, row_idx * skip_val * factor));
-            break;
-          case 3:
-            ASSERT_EQ(row.ToString(), strings::Substitute(
-                "(int32 key=$0, int32 int_val=$1, string string_val=$2_$3)",
-                row_idx * skip_val, row_idx * skip_val * factor, val, row_idx 
* skip_val));
-            break;
-        }
-        ++row_idx;
-      }
-    }
-    ASSERT_EQ(row_idx,
-        skip_val == 1 ? expected_num_rows : (expected_num_rows + 1) / 
-  }
-  void WriteAndVerify(int num_columns, TSinkAction::type type, int factor, 
string val,
-      int skip_val) {
-    const int kNumRowsPerBatch = 10;
-    // For deletes only populate the key column, in other cases populate all 
-    int schema_cols = num_columns;
-    if (type == TSinkAction::DELETE) schema_cols = 1;
-    BuildRuntimeState(schema_cols, type);
-    vector<TExpr> exprs;
-    CreateTExpr(schema_cols, &exprs);
-    KuduTableSink sink(*row_desc_, exprs, data_sink_);
-    ASSERT_OK(sink.Prepare(&runtime_state_, &mem_tracker_));
-    ASSERT_OK(sink.Open(&runtime_state_));
-    vector<RowBatch*> row_batches;
-    row_batches.push_back(CreateRowBatch(0, kNumRowsPerBatch, factor, val, 
-    ASSERT_OK(sink.Send(&runtime_state_, row_batches.front()));
-    row_batches.push_back(CreateRowBatch(kNumRowsPerBatch, kNumRowsPerBatch, 
factor, val,
-                                         skip_val));
-    ASSERT_OK(sink.Send(&runtime_state_,row_batches.back()));
-    ASSERT_OK(sink.FlushFinal(&runtime_state_));
-    STLDeleteElements(&row_batches);
-    sink.Close(&runtime_state_);
-    Verify(num_columns, 2 * kNumRowsPerBatch, factor, val, skip_val);
-  }
-  void InsertAndVerify(int num_columns) {
-    WriteAndVerify(num_columns, TSinkAction::INSERT, 2, "hello", 1);
-  }
-  void UpdateAndVerify(int num_columns) {
-    WriteAndVerify(num_columns, TSinkAction::UPDATE, 3, "world", 1);
-  }
-  void DeleteAndVerify(int num_columns, int skip_val) {
-    WriteAndVerify(num_columns, TSinkAction::DELETE, 2, "hello", skip_val);
-  }
-  virtual void TearDown() {
-    kudu_test_helper_.DeleteTable();
-  }
- protected:
-  KuduTestHelper kudu_test_helper_;
-  MemTracker mem_tracker_;
-  ObjectPool obj_pool_;
-  ExecEnv exec_env_;
-  RuntimeState runtime_state_;
-  TDataSink data_sink_;
-  TTableDescriptor t_tbl_desc_;
-  DescriptorTbl* desc_tbl_;
-  RowDescriptor* row_desc_;
-TEST_F(KuduTableSinkTest, TestInsertJustKey) {
-  InsertAndVerify(1);
-TEST_F(KuduTableSinkTest, TestInsertTwoCols) {
-  InsertAndVerify(2);
-TEST_F(KuduTableSinkTest, TestInsertAllCols) {
-  InsertAndVerify(3);
-TEST_F(KuduTableSinkTest, UpdateTwoCols) {
-  InsertAndVerify(2);
-  UpdateAndVerify(2);
-TEST_F(KuduTableSinkTest, UpdateAllCols) {
-  InsertAndVerify(3);
-  UpdateAndVerify(3);
-TEST_F(KuduTableSinkTest, DeleteModThree) {
-  // 3 cols, delete all rows idx % 3 != 0
-  InsertAndVerify(3);
-  DeleteAndVerify(3, 3);
-TEST_F(KuduTableSinkTest, DeleteModThreeTwice) {
-  // 3 cols, delete all rows idx % 3 != 0
-  InsertAndVerify(3);
-  DeleteAndVerify(3, 3);
-  // Deleting the same rows does not have an impact
-  DeleteAndVerify(3, 3);
-} // namespace impala
-int main(int argc, char** argv) {
-  if (!impala::KuduClientIsSupported()) return 0;
-  ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
-  impala::InitFeSupport();
-  impala::InitKuduLogging();
-  impala::LlvmCodeGen::InitializeLlvm();
-  return RUN_ALL_TESTS();
diff --git a/be/src/exec/kudu-testutil.h b/be/src/exec/kudu-testutil.h
deleted file mode 100644
index 1fd2c41..0000000
--- a/be/src/exec/kudu-testutil.h
+++ /dev/null
@@ -1,262 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include <boost/assign/list_of.hpp>
-#include <gtest/gtest.h>
-#include <kudu/client/client.h>
-#include <kudu/util/slice.h>
-#include <kudu/util/status.h>
-#include <string>
-#include <tr1/memory>
-#include <vector>
-#include "common/object-pool.h"
-#include "gutil/gscoped_ptr.h"
-#include "runtime/exec-env.h"
-#include "testutil/desc-tbl-builder.h"
-#include "common/names.h"
-typedef kudu::Status KuduStatus;
-typedef impala::Status ImpalaStatus;
-namespace impala {
-using kudu::client::KuduClient;
-using kudu::client::KuduClientBuilder;
-using kudu::client::KuduColumnSchema;
-using kudu::client::KuduInsert;
-using kudu::client::KuduSchema;
-using kudu::client::KuduSchemaBuilder;
-using kudu::client::KuduSession;
-using kudu::client::KuduTable;
-using kudu::KuduPartialRow;
-using kudu::Slice;
-#define KUDU_ASSERT_OK(status) \
-  do { \
-    KuduStatus _s = status; \
-    if (_s.ok()) { \
-      SUCCEED(); \
-    } else { \
-      FAIL() << "Bad Kudu Status: " << _s.ToString();  \
-    } \
-  } while (0);
-// Helper class to assist in tests agains a Kudu cluster, namely with
-// table creation/deletion with insertion of rows.
-class KuduTestHelper {
- public:
-  void CreateClient() {
-    LOG(INFO) << "Creating Kudu client.";
-    KUDU_ASSERT_OK(KuduClientBuilder()
-                   .add_master_server_addr("")
-                   .Build(&client_));
-    KuduSchemaBuilder builder;
-    builder.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->Nullable();
-    KUDU_ASSERT_OK(builder.Build(&test_schema_));
-  }
-  void CreateTable(const string& table_name_prefix,
-                   vector<const KuduPartialRow*>* split_rows = NULL) {
-    vector<const KuduPartialRow*> splits;
-    if (split_rows != NULL) {
-      splits = *split_rows;
-    } else {
-      splits = DefaultSplitRows();
-    }
-    // Kudu's table delete functionality is in flux, meaning a table may 
-    // after being deleted. To work around this we add the time in 
milliseconds to
-    // the required table name, making it unique. When Kudu's delete table 
-    // is solid we should change this to avoid creating, and possibly leaving, 
-    // similar tables in the local Kudu test cluster. See KUDU-676
-    struct timeval tv;
-    gettimeofday(&tv, NULL);
-    int64_t millis = tv.tv_sec * 1000 + tv.tv_usec / 1000;
-    table_name_ = strings::Substitute("$0-$1", table_name_prefix, millis);
-    while(true) {
-      LOG(INFO) << "Creating Kudu table: " << table_name_;
-      kudu::Status s = client_->NewTableCreator()->table_name(table_name_)
-                             .schema(&test_schema_)
-                             .num_replicas(3)
-                             .split_rows(splits)
-                             .Create();
-      if (s.IsAlreadyPresent()) {
-        LOG(INFO) << "Table existed, deleting. " << table_name_;
-        KUDU_ASSERT_OK(client_->DeleteTable(table_name_));
-        sleep(1);
-        continue;
-      }
-      KUDU_CHECK_OK(s);
-      KUDU_ASSERT_OK(client_->OpenTable(table_name_, &client_table_));
-      break;
-    }
-  }
-  gscoped_ptr<KuduInsert> BuildTestRow(KuduTable* table, int index, int 
num_cols) {
-    DCHECK_GT(num_cols, 0);
-    DCHECK_LE(num_cols, 3);
-    gscoped_ptr<KuduInsert> insert(table->NewInsert());
-    KuduPartialRow* row = insert->mutable_row();
-    KUDU_CHECK_OK(row->SetInt32(0, index));
-    if (num_cols > 1) KUDU_CHECK_OK(row->SetInt32(1, index * 2));
-    if (num_cols > 2) {
-      KUDU_CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello_%d", 
-    }
-    return insert.Pass();
-  }
-  void InsertTestRows(KuduClient* client, KuduTable* table, int num_rows,
-      int first_row = 0, int num_cols = 3) {
-    std::tr1::shared_ptr<KuduSession> session = client->NewSession();
-    KUDU_ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
-    session->SetTimeoutMillis(10000);
-    for (int i = first_row; i < num_rows + first_row; i++) {
-      KUDU_ASSERT_OK(session->Apply(BuildTestRow(table, i, 
-      if (i % 1000 == 0) {
-        KUDU_ASSERT_OK(session->Flush());
-      }
-    }
-    KUDU_ASSERT_OK(session->Flush());
-    ASSERT_FALSE(session->HasPendingOperations());
-  }
-  void OpenTable(const string& table_name) {
-    table_name_ = table_name;
-    LOG(INFO) << "Opening Kudu table: " << table_name_;
-    KUDU_ASSERT_OK(client_->OpenTable(table_name, &client_table_));
-  }
-  void DeleteTable() {
-    LOG(INFO) << "Deleting Kudu table: " << table_name_;
-    KUDU_ASSERT_OK(client_->DeleteTable(table_name_));
-  }
-  vector<const KuduPartialRow*> DefaultSplitRows() {
-    vector<const KuduPartialRow*> keys;
-    KuduPartialRow* key = test_schema_.NewRow();
-    KUDU_CHECK_OK(key->SetInt32(0, 5));
-    keys.push_back(key);
-    return keys;
-  }
-  const string& table_name() const {
-    return table_name_;
-  }
-  const std::tr1::shared_ptr<KuduClient>& client() const {
-    return client_;
-  }
-  const std::tr1::shared_ptr<KuduTable>& table() const {
-    return client_table_;
-  }
-  const KuduSchema& test_schema() {
-    return test_schema_;
-  }
-  // Creates a test descriptor table based on the test schema.
-  // The returned DescriptorTbl will be allocated in this classe's object pool.
-  void CreateTableDescriptor(int num_cols_materialize, DescriptorTbl** 
desc_tbl) {
-    DescriptorTblBuilder desc_builder(&obj_pool_);
-    DCHECK_GE(num_cols_materialize, 0);
-    DCHECK_LE(num_cols_materialize, test_schema_.num_columns());
-    TKuduTable t_kudu_table;
-    t_kudu_table.__set_table_name(table_name());
-    t_kudu_table.__set_master_addresses(vector<string>(1, ""));
-    t_kudu_table.__set_key_columns(boost::assign::list_of("key"));
-    TTableDescriptor t_tbl_desc;
-    t_tbl_desc.__set_id(0);
-    t_tbl_desc.__set_tableType(::impala::TTableType::KUDU_TABLE);
-    t_tbl_desc.__set_kuduTable(t_kudu_table);
-    TScalarType int_scalar_type;
-    int_scalar_type.type = TPrimitiveType::INT;
-    TTypeNode int_type;
-    int_type.type = TTypeNodeType::SCALAR;
-    int_type.__set_scalar_type(int_scalar_type);
-    TColumnType int_col_type;
-    int_col_type.__set_types(vector<TTypeNode>(1, int_type));
-    TScalarType string_scalar_type;
-    string_scalar_type.type = TPrimitiveType::STRING;
-    TTypeNode string_type;
-    string_type.type = TTypeNodeType::SCALAR;
-    string_type.__set_scalar_type(string_scalar_type);
-    TColumnType string_col_type;
-    string_col_type.__set_types(vector<TTypeNode>(1, string_type));
-    vector<TColumnDescriptor> column_descriptors;
-    TupleDescBuilder& builder = desc_builder.DeclareTuple();
-    if (num_cols_materialize > 0) {
-      builder << TYPE_INT;
-      TColumnDescriptor key;
-      key.__set_name("key");
-      key.__set_type(int_col_type);
-      column_descriptors.push_back(key);
-    }
-    if (num_cols_materialize > 1) {
-      builder << TYPE_INT;
-      TColumnDescriptor int_val;
-      int_val.__set_name("int_val");
-      int_val.__set_type(int_col_type);
-      column_descriptors.push_back(int_val);
-    }
-    if (num_cols_materialize > 2) {
-      builder << TYPE_STRING;
-      TColumnDescriptor string_val;
-      string_val.__set_name("string_val");
-      string_val.__set_type(string_col_type);
-      column_descriptors.push_back(string_val);
-    }
-    t_tbl_desc.__set_columnDescriptors(column_descriptors);
-    desc_builder.SetTableDescriptor(t_tbl_desc);
-    *desc_tbl = desc_builder.Build();
-  }
- private:
-  string table_name_;
-  KuduSchema test_schema_;;
-  ObjectPool obj_pool_;
-  std::tr1::shared_ptr<KuduClient> client_;
-  std::tr1::shared_ptr<KuduTable> client_table_;
-} // namespace impala

Reply via email to