Alex Behm has posted comments on this change.

Change subject: IMPALA-2905: Handle coordinator fragment lifecycle like all 

Patch Set 6:

File be/src/exec/

Line 31: constexpr int32_t QUEUE_DEPTH = 16;
We need to carefully consider the implications on memory management. Some exec 
nodes set a special marker 'need_to_return_' in batches which forces them up 
the exec node tree to free memory (e.g., such that the memory can be used to 
move on to the next spilled hash partition in the next GetNext()). If those 
batches linger in a queue I think we break the contract and we may end up using 
more memory.

Sure, this is only relevant in rare cases in the coord fragment, but I think 
it's a good reason to set this value to 1.

Line 54:   row_batch_queue_.BlockingPut(move(my_batch));
Will this thread be unblocked when the query is cancelled? I imagine a scenario 
where the queue is full, but the client simply does not call fetch anymore.
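
To make the concern concrete, here is a minimal sketch of a bounded queue whose blocked producers are woken by a shutdown call. CancellableQueue and its methods are hypothetical names for illustration, not the queue this patch uses; the point is only that BlockingPut() needs a way to return when cancellation shuts the queue down.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded queue (illustration only, not Impala's class).
template <typename T>
class CancellableQueue {
 public:
  explicit CancellableQueue(std::size_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Blocks while the queue is full. Returns false if the queue was shut
  // down, in which case the item is not enqueued.
  bool BlockingPut(T item) {
    std::unique_lock<std::mutex> l(lock_);
    not_full_.wait(l, [this] { return shutdown_ || items_.size() < capacity_; });
    if (shutdown_) return false;
    items_.push_back(std::move(item));
    not_empty_.notify_one();
    return true;
  }

  // Blocks while the queue is empty. Returns false once the queue is shut
  // down and has no items left.
  bool BlockingGet(T* out) {
    std::unique_lock<std::mutex> l(lock_);
    not_empty_.wait(l, [this] { return shutdown_ || !items_.empty(); });
    if (items_.empty()) return false;
    *out = std::move(items_.front());
    items_.pop_front();
    not_full_.notify_one();
    return true;
  }

  // Called on cancellation: wakes every blocked producer and consumer.
  void Shutdown() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
    }
    not_full_.notify_all();
    not_empty_.notify_all();
  }

 private:
  const std::size_t capacity_;
  std::mutex lock_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<T> items_;
  bool shutdown_ = false;
};
```

With something like this, the cancellation path would call Shutdown() and the sender would treat a false return from BlockingPut() as "query cancelled".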

Line 58: void PushPullSink::Close(RuntimeState* state) {
The row batches still sitting in the queue are Reset() implicitly when this 
sink is destroyed. We should prefer an explicit Reset() in here.

Line 61:   consumer_done_.Get();
I think we generally need to be more careful with these indefinite waits. As 
described above there could be scenarios where the thread is never unblocked 
and not even cancellation will wake the thread up.
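
One possible shape for a bounded wait, as a rough sketch: wait in short slices and re-check a cancellation flag between slices. DonePromise, WaitFor() and WaitUnlessCancelled() are hypothetical names made up for this example; whether polling or an explicit wake-up on cancel fits better here is a separate design question.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical one-shot "done" signal (illustration only).
class DonePromise {
 public:
  void Set() {
    {
      std::lock_guard<std::mutex> l(lock_);
      done_ = true;
    }
    cv_.notify_all();
  }

  // Waits until Set() has been called or the timeout expires.
  // Returns true if the promise is set.
  bool WaitFor(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(lock_);
    return cv_.wait_for(l, timeout, [this] { return done_; });
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  bool done_ = false;
};

// Returns true if the promise was set, false if cancellation was observed
// first. The waiter never blocks for longer than one poll interval without
// looking at the cancellation flag.
bool WaitUnlessCancelled(DonePromise* promise, const std::atomic<bool>& cancelled,
                         std::chrono::milliseconds poll_interval) {
  while (!promise->WaitFor(poll_interval)) {
    if (cancelled.load()) return false;
  }
  return true;
}
```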

File be/src/exec/push-pull-sink.h:

Line 71:   void CloseConsumer();
Do we really need to differentiate between Close() and CloseConsumer()? Seems 
like both threads could see the effect by checking whether the queue is shut 

File be/src/runtime/

Line 356:     executor_(nullptr), // Set in Prepare()
The comment should read "Set in Exec()".

Line 471:       MemTracker::GetQueryMemTracker(query_id_, query_limit, -1, 
pool_tracker, NULL);
Can we get rid of rm_reserved_limit_ in MemTracker?

Line 490:     const TPlanNode root_node = request.fragments[0].plan.nodes[0];
const TPlanNode& ?
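
For illustration, a small sketch of the difference: binding by value runs the copy constructor, binding by const reference does not. CountingNode is a hypothetical stand-in for the Thrift struct, used only to count copies.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for a plan node; counts copy constructions.
struct CountingNode {
  int* copies;  // incremented on every copy construction
  explicit CountingNode(int* counter) : copies(counter) {}
  CountingNode(const CountingNode& other) : copies(other.copies) { ++*copies; }
};

// Binds the first node by value, as in the line under review.
// Returns how many copies that binding made.
int CopiesWhenBoundByValue(const std::vector<CountingNode>& nodes) {
  assert(!nodes.empty());
  const int before = *nodes[0].copies;
  const CountingNode root = nodes[0];
  (void)root;
  return *nodes[0].copies - before;
}

// Binds the first node by const reference, as suggested.
// Returns how many copies that binding made.
int CopiesWhenBoundByReference(const std::vector<CountingNode>& nodes) {
  assert(!nodes.empty());
  const int before = *nodes[0].copies;
  const CountingNode& root = nodes[0];
  (void)root;
  return *nodes[0].copies - before;
}
```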

Line 491:     RowDescriptor my_row_desc(
my_row_desc -> root_row_desc

Line 505:   RETURN_IF_ERROR(StartRemoteFragments(&schedule));
> rename to StartFragments
If we return with an error here, could some threads hang indefinitely, waiting 
for profiles_ready_ to be set?
Some fragments may have started ok, but not others.
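
One way to rule out this class of hang, sketched under assumptions: set the signal from a scope guard so that it fires on every exit path, including early error returns. ScopedCleanup and FakeCoordinator are hypothetical names for this example only; in real code, waiters woken this way would also have to check a status to tell success from failure.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical scope guard: runs the given function when it goes out of scope.
class ScopedCleanup {
 public:
  explicit ScopedCleanup(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~ScopedCleanup() {
    if (fn_) fn_();
  }
  ScopedCleanup(const ScopedCleanup&) = delete;
  ScopedCleanup& operator=(const ScopedCleanup&) = delete;

 private:
  std::function<void()> fn_;
};

// Hypothetical stand-in for the coordinator; profiles_ready models the
// promise that other threads wait on.
struct FakeCoordinator {
  bool profiles_ready = false;

  // Returns false to simulate a failure while starting fragments.
  bool Exec(bool fail_start) {
    // Fires on every return below, so waiters are always released.
    ScopedCleanup set_ready([this] { profiles_ready = true; });
    if (fail_start) return false;  // early error return
    return true;
  }
};
```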

Line 1472:   profiles_ready_.Get();
Is this always set, even in weird error scenarios?

File be/src/runtime/

Line 339:   
Could another thread wait for opened_promise_ indefinitely if this returns an 

File be/src/runtime/plan-fragment-executor.h:

Line 208:   /// Set when Prepare() returns.
Also set in error scenarios and/or cancellation?

File fe/src/main/java/com/cloudera/impala/planner/

Line 29: public class PushPullSink extends DataSink {
Imo it would be very nice to eventually have this be a ResultSetSink. Apart 
from the other BE sanity benefits, we would then have a natural place to show 
the final result exprs.

Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb0064ec2f085fa3a5598ea80894fb489a01e4df
Gerrit-PatchSet: 6
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <>
Gerrit-Reviewer: Alex Behm <>
Gerrit-Reviewer: Marcel Kornacker <>
Gerrit-HasComments: Yes