Thomas Tauber-Marshall has posted comments on this change.

Change subject: IMPALA-3742: Partitions and sort INSERTs for Kudu tables
......................................................................


Patch Set 2:

(25 comments)

http://gerrit.cloudera.org:8080/#/c/6559/1/be/src/exec/kudu-util.h
File be/src/exec/kudu-util.h:

PS1, Line 62: int col, PrimitiveType type
> Comment on these params.
Done


http://gerrit.cloudera.org:8080/#/c/6559/2/be/src/exprs/kudu-partition-expr.cc
File be/src/exprs/kudu-partition-expr.cc:

PS2, Line 48:   using namespace kudu::client
> we typically don't use inline 'using' statements
Done


PS2, Line 51: CreateKuduClient
> use QueryState::GetKuduClient
I haven't rebased in a while and don't have this locally. Rebasing will
probably be painful, since I'll also have to rebase my patched version of Kudu
for the toolchain, so I'm going to punt on it for today and address it
when I have more time.


Line 61:   RegisterFunctionContext(ctx, state);
> comment the return value is ignored because the idx is stored on the base c
Done


PS2, Line 72:  if (table_schema.Column(col).is_nullable()) {
> this should never be true right now, PK cols are not nullable. per my comme
Done


PS2, Line 80:         return PartitionError(ctx,
            :             ErrorMsg::Init(TErrorCode::KUDU_NULL_CONSTRAINT_VIOLATION,
            :                 table_desc_->table_name()).msg());
> This will change the failure mode of our inserts, which is that NCV do not 
Done


Line 87:     if (!s.ok()) {
> I checked with Alexey on the Kudu team, this would only fail if we: a) wrot
Done


PS2, Line 95:   if (!s.ok()) {
            :     return PartitionError(
            :         ctx, Substitute("Failed to determine Kudu partition for row: $0", s.ToString()));
            :   }
> same thing about errors here, I think if this returns an error it means we 
Done


PS2, Line 110:   ctx->fn_context(fn_context_index_)->AddWarning(msg.c_str());
> I don't know if there are any errors we need to be reporting here.
Done


http://gerrit.cloudera.org:8080/#/c/6559/2/be/src/exprs/kudu-partition-expr.h
File be/src/exprs/kudu-partition-expr.h:

PS2, Line 30: number
> index
Done


PS2, Line 47: //
> ///
Done


http://gerrit.cloudera.org:8080/#/c/6559/1/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS1, Line 1845: f (fragment.__isset.output_sink && fragment.output_sink.__isset.stream_sink
              :       && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK
              :       && fragment.output_sink.stream_sink.output_partition.type == TPartitionType::KUDU)
> Add a brief comment.
Done


http://gerrit.cloudera.org:8080/#/c/6559/1/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

Line 364:       DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
> I don't see what the point of this is. This enumerates possible values.
It doesn't include RANGE_PARTITIONED, but I don't believe we currently use
that type, so I'm not sure what this DCHECK accomplishes. Want me to remove it?


http://gerrit.cloudera.org:8080/#/c/6559/1/common/thrift/Exprs.thrift
File common/thrift/Exprs.thrift:

PS1, Line 134: 4
> what happened to 2 and 3?
Done


PS1, Line 137: 5
> 3? :)
Done


http://gerrit.cloudera.org:8080/#/c/6559/2/common/thrift/Partitions.thrift
File common/thrift/Partitions.thrift:

PS2, Line 50:   // Expr that takes a row and returns its partition number. If specified,
            :   // 'partition_exprs' will be empty. Used for TPartitionType::KUDU.
            :   3: optional Exprs.TExpr partition_expr
> how about for Kudu the expr just goes in partition_exprs[0] (and it's the o
As discussed with Marcel, we're moving partition_expr out of TDataPartition
anyway.


http://gerrit.cloudera.org:8080/#/c/6559/1/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
File fe/src/main/java/org/apache/impala/analysis/InsertStmt.java:

PS1, Line 113: // Set in analyze(). Maps exprs in partitionKeyExprs_ to their column's position in the
             :   // table, eg. partitionKeyExprs_[i] = table_.columns(partitionKeyIdx_[i])
             :   private List<Integer> partitionKeyIdxs_ = Lists.newArrayList();
> Just for my understanding, can you explain why this is needed for this chan
The partition keys are a subset of the primary keys, so they are not guaranteed
to be the first n columns. We only need to send the partition keys to Kudu to
get the partition, so this list tells the backend which column positions to
write the row's partition values into. It gets passed through
TKuduPartitionExpr.referenced_columns.
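
To illustrate the mapping, here is a minimal sketch. It is not the code in
this patch: the class name, the helper, and the example column names are
hypothetical, and it only shows how partition key columns can map to
non-contiguous positions in the table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names come from Impala.
class PartitionKeyIdxSketch {
  // For each partition column, record its position in the table's column list.
  static List<Integer> partitionKeyIdxs(
      List<String> tableColumns, List<String> partitionColumns) {
    List<Integer> idxs = new ArrayList<>();
    for (String col : partitionColumns) {
      int pos = tableColumns.indexOf(col);
      if (pos < 0) {
        throw new IllegalArgumentException("Unknown column: " + col);
      }
      idxs.add(pos);
    }
    return idxs;
  }

  public static void main(String[] args) {
    // Assumed example table: the partition columns are "id" and "ts",
    // which are not the first two columns.
    List<String> tableColumns = Arrays.asList("id", "name", "ts", "payload");
    List<String> partitionColumns = Arrays.asList("id", "ts");
    System.out.println(partitionKeyIdxs(tableColumns, partitionColumns)); // [0, 2]
  }
}
```

With the positions [0, 2] in hand, the receiving side knows which columns of
the partial row to fill before asking for the partition.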


http://gerrit.cloudera.org:8080/#/c/6559/2/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
File fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java:

PS2, Line 49: artitionKeyExprs_ = Expr.cloneList(partitionKeyExprs);
> Is there a reason we keep a separate copy of partitionKeyExprs in addition 
At the very least, I think we have to preserve the types.


PS2, Line 69:         children_.get(i).uncheckedCastTo(partitionKeyExprs_.get(i).getType());
> as we discussed, I feel like we shouldn't have to do this
The problem is that the analysis state gets reset and everything is reanalyzed. 
When that happens, we lose implicit casts on the children.

The specific case that motivated this is a VALUES clause with a NullLiteral, 
which will hit a Precondition check while being converted to thrift if it 
hasn't been implicitly cast to something other than NULL_TYPE. I'm not sure how 
else to handle that.


PS2, Line 75: isConstantImpl
> Should this be constant?
I was primarily thinking about the round-robin values for rows that don't fit 
in any partition, which makes this expr non-deterministic.
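
As a rough sketch of why a round-robin fallback makes the result depend on
call order (the class, its fields, and the use of OptionalInt to model a
failed lookup are all assumptions for illustration, not the patch's code):

```java
import java.util.OptionalInt;

// Hypothetical illustration only; none of these names come from Impala.
class RoundRobinPartitionSketch {
  private final int numPartitions;
  private int next = 0;

  RoundRobinPartitionSketch(int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    this.numPartitions = numPartitions;
  }

  // 'lookup' models the partition found for a row; empty means the row
  // does not fit in any partition.
  int assign(OptionalInt lookup) {
    if (lookup.isPresent()) return lookup.getAsInt();
    // Fallback: hand out partitions in rotation, so the same input can
    // produce a different value on each call.
    int p = next;
    next = (next + 1) % numPartitions;
    return p;
  }

  public static void main(String[] args) {
    RoundRobinPartitionSketch s = new RoundRobinPartitionSketch(3);
    System.out.println(s.assign(OptionalInt.of(2)));  // 2
    System.out.println(s.assign(OptionalInt.empty())); // 0
    System.out.println(s.assign(OptionalInt.empty())); // 1
  }
}
```

Two calls with identical input return different values, which is the sense
in which the expr is non-deterministic.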


http://gerrit.cloudera.org:8080/#/c/6559/1/fe/src/main/java/org/apache/impala/planner/DataPartition.java
File fe/src/main/java/org/apache/impala/planner/DataPartition.java:

PS1, Line 48: // For kudu partition: expr that calls into the kudu client to get the partition number.
            :   private Expr partitionExpr_;
            : 
            :   // Should be called only by the static factory method for Kudu partitioned tables.
            :   private DataPartition(TPartitionType type, Expr partitionExpr) {
            :     Preconditions.checkNotNull(partitionExpr);
            :     Preconditions.checkState(type == TPartitionType.KUDU);
            :     type_ = type;
            :     partitionExpr_ = partitionExpr;
            :     partitionExprs_ = Lists.newArrayList();
            :   }
> I am not sure I get this. Why not passing a single element list of the case
Good idea, but per discussions with Marcel we're moving partitionExpr out of
the DataPartition anyway.


http://gerrit.cloudera.org:8080/#/c/6559/2/fe/src/main/java/org/apache/impala/planner/DataPartition.java
File fe/src/main/java/org/apache/impala/planner/DataPartition.java:

PS2, Line 45:   // For hash partition: exprs used to compute hash value.
            :   private List<Expr> partitionExprs_;
            : 
            :   // For kudu partition: expr that calls into the kudu client to get the partition number.
            :   private Expr partitionExpr_;
> I think this will be cleaned up a bit if we just store the Kudu partition e
Done


http://gerrit.cloudera.org:8080/#/c/6559/2/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
File fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java:

PS2, Line 251: DescriptorTable.TABLE_SINK_ID
> I don't think this is what you want. I think you want the table ID of the t
This value doesn't get incremented - it will always be 0. Also, InsertStmt 
calls descTbl.setTargetTable on its target table, so I'm fairly sure this will 
be the right id.

I agree that it seems hacky, and I would rather put something here like 
insertStmt.getTargetTable().getId(), except that Table doesn't have a getId() 
and tableIds are generated during DescriptorTable.toThrift, so I'm not sure how 
to thread that through.

I also think that the related code in coordinator.cc is hacky, so perhaps 
there's someone who understands how the DescriptorTable is supposed to work who 
I could ask to weigh in?


http://gerrit.cloudera.org:8080/#/c/6559/2/fe/src/main/java/org/apache/impala/planner/Planner.java
File fe/src/main/java/org/apache/impala/planner/Planner.java:

PS2, Line 541:       Expr partitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
             :           insertStmt.getPartitionKeyExprs(), insertStmt.getPartitionKeyIdxs());
             :       partitionExpr.analyze(ctx_.getRootAnalyzer());
> I wonder if we can/should clone the KuduPartitionExpr instead of creating i
Done


http://gerrit.cloudera.org:8080/#/c/6559/2/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
File testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test:

PS2, Line 16: KUDU
> it'd be nice for this to indicate it's an exchange on the Kudu partitioning
Done


-- 
To view, visit http://gerrit.cloudera.org:8080/6559
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I84ce0032a1b10958fdf31faef225372c5c38fdc4
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Thomas Tauber-Marshall <[email protected]>
Gerrit-Reviewer: Dimitris Tsirogiannis <[email protected]>
Gerrit-Reviewer: Matthew Jacobs <[email protected]>
Gerrit-Reviewer: Thomas Tauber-Marshall <[email protected]>
Gerrit-HasComments: Yes
