Marcel Kornacker has posted comments on this change.

Change subject: IMPALA-2905: Handle coordinator fragment lifecycle like all 

Patch Set 6:

File be/src/exec/push-pull-sink.h:

Line 29: /// Sink which queues incoming batches from fragment, allowing consumers to pull from the
QueueSink then? i couldn't quite figure out what PushPullSink was.

Line 30: /// queue at their own pace by calling GetNext().
what's the size of the queue?
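
To make the queue-size question concrete, here is a rough sketch of a bounded queue sink in which the producer blocks once a fixed number of batches are pending. Batch, QueueSinkSketch, Send() and the capacity parameter are all hypothetical stand-ins for illustration; none of this is the code in the patch.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for RowBatch; not Impala's class.
struct Batch {
  int num_rows = 0;
};

// Hypothetical bounded queue sink. The producer blocks in Send() once
// 'capacity' batches are pending; consumers pull with GetNext().
class QueueSinkSketch {
 public:
  explicit QueueSinkSketch(std::size_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Producer side. Blocks while the queue is full.
  // Returns false if the sink was closed before the batch could be queued.
  bool Send(std::unique_ptr<Batch> batch) {
    std::unique_lock<std::mutex> l(lock_);
    not_full_.wait(l, [this] { return closed_ || queue_.size() < capacity_; });
    if (closed_) return false;
    queue_.push_back(std::move(batch));
    not_empty_.notify_one();
    return true;
  }

  // Consumer side. Blocks until a batch is available or the sink is closed.
  // Returns nullptr once the sink is closed and fully drained.
  std::unique_ptr<Batch> GetNext() {
    std::unique_lock<std::mutex> l(lock_);
    not_empty_.wait(l, [this] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) return nullptr;
    std::unique_ptr<Batch> batch = std::move(queue_.front());
    queue_.pop_front();
    not_full_.notify_one();
    return batch;
  }

  // Wakes all waiters; batches already queued can still be drained.
  void Close() {
    std::lock_guard<std::mutex> l(lock_);
    closed_ = true;
    not_full_.notify_all();
    not_empty_.notify_all();
  }

  std::size_t size() {
    std::lock_guard<std::mutex> l(lock_);
    return queue_.size();
  }

 private:
  const std::size_t capacity_;
  std::mutex lock_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<std::unique_ptr<Batch>> queue_;
  bool closed_ = false;
};
```

With an explicit capacity, the bound on buffered batches is visible at the construction site rather than implied by the queue implementation.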

Line 66:   virtual Status GetNext(RuntimeState* state, RowBatch** row_batch);
why virtual?

Line 77:   /// copying overhead; since RowBatch is not movable it's wrapped in a unique_ptr.
re should be movable: i don't particularly like this comment, since it seems to 
imply that 'moving is always okay' (which i disagree with, and i wouldn't want 
RowBatches to be moved in and out of this queue). happy to talk about it 

File be/src/runtime/

Line 484:     // To prepare the expressions we need a local RuntimeState and 

Line 486:     RETURN_IF_ERROR(DescriptorTbl::Create(obj_pool(), desc_tbl_, 
leave todo to get that out of the qs. this is expensive to create.

Line 505:   RETURN_IF_ERROR(StartRemoteFragments(&schedule));
rename to StartFragments

Line 510:     // Coordinator fragment instance has same ID as query.
that's actually not true for the mt case.

Line 1471:   // Make sure that InitExecProfile() has been called.
probably best to fix initialization as part of this change (it should precede 
everything else)

Line 1637:   // May not be called for the coordinator fragment, which has no averaged profile.
that shouldn't be true anymore

File be/src/runtime/coordinator.h:

Line 266:   /// them to the client in GetNext().
why do you need this and not just the queue sink?

Line 277:   MemTracker* output_expr_tracker_ = nullptr;
are we tracking this right now?

Line 381:   /// dependencies.
i don't understand why that's needed, let's talk in person.

Line 431:   /// TODO: Remove when output exprs evaluated by PushPullSink.
i don't understand why a sink would want to deal with exprs, let's talk in 

File be/src/runtime/

Line 286:   if (sink_->GetName() == PushPullSink::NAME) {
use fragment.output_sink.type instead

Line 359:   if (status.ok()) status = DriveSink();
it looks like we should break up Open() and move the row-producing logic into a 
new function (then the semantics of opened_promise_ would be easy to describe)
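
A rough sketch of the split suggested here, in which Open() only opens and fulfils the promise, while a separate function produces rows. PromiseSketch, FragmentExecutorSketch, Exec() and WaitForOpen() are hypothetical names for illustration, and the open step is a placeholder; this is not the code in the patch.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical set-once promise, used only for this illustration.
template <typename T>
class PromiseSketch {
 public:
  // Stores the value and wakes all waiters. May be called only once.
  void Set(const T& value) {
    std::lock_guard<std::mutex> l(lock_);
    assert(!is_set_);
    value_ = value;
    is_set_ = true;
    cv_.notify_all();
  }

  // Blocks until Set() has been called, then returns the stored value.
  T Get() {
    std::unique_lock<std::mutex> l(lock_);
    cv_.wait(l, [this] { return is_set_; });
    return value_;
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  bool is_set_ = false;
  T value_{};
};

// Hypothetical executor with Open() separated from row production.
class FragmentExecutorSketch {
 public:
  explicit FragmentExecutorSketch(int num_batches) : num_batches_(num_batches) {}

  // Opens the fragment and produces no rows. Fulfils opened_promise_ exactly
  // once, with the result of opening. The check below is a placeholder for
  // opening the plan tree and the sink.
  bool Open() {
    bool ok = num_batches_ >= 0;
    opened_promise_.Set(ok);
    return ok;
  }

  // Produces rows and hands them to the sink (here: just counts batches).
  // Must be called after Open(); returns false if Open() failed.
  bool Exec() {
    if (!opened_promise_.Get()) return false;
    for (int i = 0; i < num_batches_; ++i) ++batches_sent_;
    return true;
  }

  // Blocks until Open() has finished and returns whether it succeeded.
  bool WaitForOpen() { return opened_promise_.Get(); }

  int batches_sent() const { return batches_sent_; }

 private:
  const int num_batches_;
  int batches_sent_ = 0;
  PromiseSketch<bool> opened_promise_;
};
```

In this shape the promise has a one-line contract: it is set when Open() returns, before any row has been produced.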

File be/src/runtime/plan-fragment-executor.h:

Line 49: /// PlanFragmentExecutor handles all aspects of the execution of a single plan fragment,
extend class description with new semantics.

Line 166:   bool done_;
i don't think 'done' quite captures that (we use that word too much). opened?

File be/src/service/

Line 60
don't all fragments now have a sink?

Line 46:   lock_guard<SpinLock> l(fragment_exec_state_map_lock_);
why does this need to happen up here?

File be/src/service/impala-internal-service.h:

Line 36:       FragmentMgr* fragment_mgr)
why even have that param if it's a singleton?

File be/src/util/blocking-queue.h:

Line 35: /// Queue entries must be movable.
why is this needed for these changes?

File common/thrift/DataSinks.thrift:

Line 90: struct TPushPullSink {

Line 108:   5: optional TPushPullSink push_pull_sink
remove, we already have 'type'

Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb0064ec2f085fa3a5598ea80894fb489a01e4df
Gerrit-PatchSet: 6
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <>
Gerrit-Reviewer: Marcel Kornacker <>
Gerrit-HasComments: Yes
