Matthew Jacobs has posted comments on this change.

Change subject: PREVIEW: IMPALA-3742: partitions INSERTs into Kudu tables
......................................................................

Patch Set 1:

(6 comments)

just some high-level comments for now since this is WIP. would you mind updating some of the planner tests so I can see what that looks like? presumably they should fail and you can grab the updated test output in /tmp/PlannerTest/

http://gerrit.cloudera.org:8080/#/c/6037/1/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS1, Line 457: kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;
:     Status s = CreateKuduClient(table_desc_->kudu_master_addresses(), &client_);
:     kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
:     KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
:         "Unable to open Kudu table");
:     kudu::client::KuduPartitionerBuilder b(table_);
:     kudu::client::KuduPartitioner* partitioner;
we'll need to find a way to avoid doing this for every row batch

PS1, Line 466: unique_ptr<kudu::KuduPartialRow> row(table_->schema().NewRow());
:     for (int i = 0; i < batch->num_rows(); ++i) {
:       TupleRow* current_row = batch->GetRow(i);
:       for (int j = 0; j < partition_expr_ctxs_.size(); ++j) {
:         ExprContext* ctx = partition_expr_ctxs_[j];
:         void* value = ctx->GetValue(current_row);
:
:         PrimitiveType type = ctx->root()->type().type;
:         switch (type) {
:           case TYPE_VARCHAR:
:           case TYPE_STRING: {
:             StringValue* sv = reinterpret_cast<StringValue*>(value);
:             kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
:             KUDU_RETURN_IF_ERROR(row->SetString(j, slice),
:                 "Could not set Kudu row value.");
:             break;
:           }
:           case TYPE_FLOAT:
:             KUDU_RETURN_IF_ERROR(
:                 row->SetFloat(j, *reinterpret_cast<float*>(value)),
:                 "Could not set Kudu row value.");
:             break;
:           case TYPE_DOUBLE:
:             KUDU_RETURN_IF_ERROR(
:                 row->SetDouble(j, *reinterpret_cast<double*>(value)),
:                 "Could not set Kudu row value.");
:             break;
:           case TYPE_BOOLEAN:
:             KUDU_RETURN_IF_ERROR(
:                 row->SetBool(j, *reinterpret_cast<bool*>(value)),
:                 "Could not set Kudu row value.");
:             break;
:           case TYPE_TINYINT:
:             KUDU_RETURN_IF_ERROR(
:                 row->SetInt8(j, *reinterpret_cast<int8_t*>(value)),
:                 "Could not set Kudu row value.");
:             break;
:           case TYPE_SMALLINT:
:             KUDU_RETURN_IF_ERROR(
:                 row->SetInt16(j, *reinterpret_cast<int16_t*>(value)),
:                 "Could not set Kudu row value.");
:             break;
:           case TYPE_INT:
:             KUDU_RETURN_IF_ERROR(
:                 row->SetInt32(j, *reinterpret_cast<int32_t*>(value)),
:                 "Could not set Kudu row value.");
:             break;
:           case TYPE_BIGINT:
:             KUDU_RETURN_IF_ERROR(
:                 row->SetInt64(j, *reinterpret_cast<int64_t*>(value)),
:                 "Could not set Kudu row value.");
:             break;
:           default:
:             return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
:         }
:       }
let's see if we can share some code with kudu-table-sink, at least the switch statement. we could put some stuff in kudu-util.h/cc

http://gerrit.cloudera.org:8080/#/c/6037/1/common/thrift/Partitions.thrift
File common/thrift/Partitions.thrift:

PS1, Line 37:
:   // partitioning determined by Kudu
:   KUDU
I think we'll need to find a way to avoid a new partition type, i.e. to treat this as hash partitioning.

Line 50: }
If it's possible to encapsulate the hash fn as an expr, this might be a good place to put the Expr, e.g.

  4: opt TExpr hash_partition_fn

Otherwise, I wonder if we can at least determine kudu vs regular hash partitioning in some other way than a new TPartitionType, e.g. we know the target table so maybe we can inspect that at runtime.

http://gerrit.cloudera.org:8080/#/c/6037/1/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
File fe/src/main/java/org/apache/impala/analysis/InsertStmt.java:

Line 625: private void prepareExpressions(List<Column> selectExprTargetColumns,
I assume we'll have to do something similar for update/delete. Upsert is handled here too, right?

PS1, Line 634: List<String> kuduPartitionByColumnNames = null;
:     if (isKuduTable) {
:       kuduPartitionByColumnNames = ((KuduTable) table_).getPartitionByColumnNames();
:     }
should this be a Set? Could be duplicates.
Also I'm not sure if we should use partition cols or the primary key cols. Ultimately Kudu probably wants the PK. I think the query will be invalid if selectExprTargetColumns doesn't contain the entire PK.

--
To view, visit http://gerrit.cloudera.org:8080/6037
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic10b3295159354888efcde3df76b0edb24161515
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Thomas Tauber-Marshall <[email protected]>
Gerrit-Reviewer: Matthew Jacobs <[email protected]>
Gerrit-HasComments: Yes
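
To illustrate the comment on data-stream-sender.cc about sharing the switch statement with kudu-table-sink, here is a rough, standalone sketch of what a common helper might look like. Everything in it is hypothetical: `ColType`, `FakeRow`, and `WriteValue` are stand-ins invented for illustration, not Impala or Kudu APIs, and the helper returns a plain bool where real code would presumably return a Status via KUDU_RETURN_IF_ERROR. The only point is that the type dispatch lives in one function that both callers could use.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for Impala's PrimitiveType (subset from the patch).
enum class ColType {
  TYPE_BOOLEAN, TYPE_TINYINT, TYPE_SMALLINT, TYPE_INT, TYPE_BIGINT,
  TYPE_FLOAT, TYPE_DOUBLE, TYPE_STRING, TYPE_VARCHAR, TYPE_TIMESTAMP
};

// Hypothetical stand-in for impala::StringValue.
struct StringValue {
  const char* ptr;
  int len;
};

// Hypothetical stand-in for kudu::KuduPartialRow. It only records what was
// set, as text, so the dispatch logic can be exercised without Kudu.
struct FakeRow {
  std::vector<std::string> cols;
  explicit FakeRow(int n) : cols(n) {}
  void SetBool(int i, bool v) { cols[i] = v ? "true" : "false"; }
  void SetInt8(int i, int8_t v) { cols[i] = std::to_string(v); }
  void SetInt16(int i, int16_t v) { cols[i] = std::to_string(v); }
  void SetInt32(int i, int32_t v) { cols[i] = std::to_string(v); }
  void SetInt64(int i, int64_t v) { cols[i] = std::to_string(v); }
  void SetFloat(int i, float v) { cols[i] = std::to_string(v); }
  void SetDouble(int i, double v) { cols[i] = std::to_string(v); }
  void SetString(int i, const std::string& v) { cols[i] = v; }
};

// Single place for the type dispatch. Returns false for a type that has no
// mapping, where the patch returns IMPALA_KUDU_TYPE_MISSING.
template <typename Row>
bool WriteValue(ColType type, int col, const void* value, Row* row) {
  assert(row != nullptr && value != nullptr);
  switch (type) {
    case ColType::TYPE_VARCHAR:
    case ColType::TYPE_STRING: {
      const StringValue* sv = static_cast<const StringValue*>(value);
      row->SetString(col, std::string(sv->ptr, sv->len));
      return true;
    }
    case ColType::TYPE_FLOAT:
      row->SetFloat(col, *static_cast<const float*>(value));
      return true;
    case ColType::TYPE_DOUBLE:
      row->SetDouble(col, *static_cast<const double*>(value));
      return true;
    case ColType::TYPE_BOOLEAN:
      row->SetBool(col, *static_cast<const bool*>(value));
      return true;
    case ColType::TYPE_TINYINT:
      row->SetInt8(col, *static_cast<const int8_t*>(value));
      return true;
    case ColType::TYPE_SMALLINT:
      row->SetInt16(col, *static_cast<const int16_t*>(value));
      return true;
    case ColType::TYPE_INT:
      row->SetInt32(col, *static_cast<const int32_t*>(value));
      return true;
    case ColType::TYPE_BIGINT:
      row->SetInt64(col, *static_cast<const int64_t*>(value));
      return true;
    default:
      return false;
  }
}
```

With something along these lines, the sender's inner loop and the table sink would each reduce to one call per column, and adding a new type would only touch the helper.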
