Thomas Tauber-Marshall has posted comments on this change.

Change subject: PREVIEW: IMPALA-3742: partitions INSERTs into Kudu tables
......................................................................

Patch Set 3:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/6037/1/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS1, Line 457:       } else {
             :         RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(current_row));
             :       }
             :     }
             :   }
             :   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
             :   RETURN_IF_ERROR(state->CheckQueryState());
> we'll need to find a way to avoid doing this for every row batch
Done


PS1, Line 466:
             : Status DataStreamSender::FlushFinal(RuntimeState* state) {
             :   DCHECK(!flushed_);
             :   DCHECK(!closed_);
             :   flushed_ = true;
             :   for (int i = 0; i < channels_.size(); ++i) {
             :     // If we hit an error here, we can return without closing the remaining channels as
             :     // the error is propagated back to the coordinator, which in turn cancels the query,
             :     // which will cause the remaining open channels to be closed.
             :     RETURN_IF_ERROR(channels_[i]->FlushAndSendEos(state));
             :   }
             :   return Status::OK();
             : }
             :
             : void DataStreamSender::Close(RuntimeState* state) {
             :   if (closed_) return;
             :   for (int i = 0; i < channels_.size(); ++i) {
             :     channels_[i]->Teardown(state);
             :   }
             :   if (partitioner_ != NULL) {
             :     partitioner_->Close(state);
             :   }
             :   DataSink::Close(state);
             :   closed_ = true;
             : }
             :
             : Status DataStreamSender::SerializeBatch(RowBatch* src, TRowBatch* dest, int num_receivers) {
             :   VLOG_ROW << "serializing " << src->num_rows() << " rows";
             :   {
             :     SCOPED_TIMER(profile_->total_time_counter());
             :     SCOPED_TIMER(serialize_batch_timer_);
             :     RETURN_IF_ERROR(src->Serialize(dest));
             :     int bytes = RowBatch::GetBatchSize(*dest);
             :     int uncompressed_bytes = bytes - dest->tuple_data.size() + dest->uncompressed_size;
             :     // The size output_batch would be if we didn't compress tuple_data (will be equal to
             :     // actual batch size if tuple_data isn't compressed)
             :
             :     COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
             :     COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
             :   }
             :   return Status::OK();
             : }
             :
             : int64_t DataStreamSender::GetNumDataBytesSent() const {
             :   // TODO: do we need synchronization here or are reads & writes to 8-byte ints
             :   // atomic?
             :   int64_t result = 0;
             :   for (int i = 0; i < channels_.size(); ++i) {
             :     result += channels_[i]->num_data_bytes_sent();
             :   }
             :   return result;
             : }
             :
             : }
             :
             :
> let's see if we can share some code with kudu-table-sink, at least the swit
Done


http://gerrit.cloudera.org:8080/#/c/6037/1/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
File fe/src/main/java/org/apache/impala/analysis/InsertStmt.java:

PS1, Line 634:     boolean isKuduTable = table_ instanceof KuduTable;
             :     Set<String> kuduPartitionByColumnNames = null;
             :     if (isKuduTable) {
             :
> should this be a Set? Could be duplicates. Also I'm not sure if we should u
I believe only the partition columns are needed here, based on testing and on the comment in kudu/client/client.h which says 'May return a bad Status if the provided row does not have all partitions of the partition key set'


--
To view, visit http://gerrit.cloudera.org:8080/6037
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic10b3295159354888efcde3df76b0edb24161515
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Thomas Tauber-Marshall <[email protected]>
Gerrit-Reviewer: Matthew Jacobs <[email protected]>
Gerrit-Reviewer: Thomas Tauber-Marshall <[email protected]>
Gerrit-HasComments: Yes
