Michael Ho has posted comments on this change.

Change subject: IMPALA-4026: Implement double-buffering for BlockingQueue

Patch Set 5:


File be/src/exec/hdfs-scan-node.cc:

Line 79:   row_batches_put_timer_ = runtime_profile()->AddCounter("QueuePutTime", TUnit::TIME_NS);
> we usually do this in Open() or Prepare() (see other counters in e.g. HdfsS

File be/src/util/blocking-queue.h:

PS5, Line 75: NotitfyAll
> NotifyAll().

PS5, Line 90: DCHECK
> nit: DCHECK_NE
DCHECK removed.

PS5, Line 98:  This may
            :     // imply that some writers may be sleeping on a partially empty queue
> Maybe "If this race occurs, a writer can stay blocked on a partially empty 

PS5, Line 99: Given the major
            :     // use case is with HDFS scan node which has multiple producers and one consumer, it's
            :     // expected that some producers can make progress.
> Maybe more simply: "This should only occur when producers are faster than c

Line 102:     put_cv_.NotifyOne();
> is it worth explaining this race rather than fixing it?  Doesn't pthreads o
Not sure I understand the optimization you are referring to here.

The race here is that one thread can call put_cv_.NotifyOne() after another 
thread has checked the queue's size but before that thread calls 
put_cv_.Wait(), so the notification is lost. AFAIK, the only way to avoid this 
race is to also grab "put_lock_" in BlockingGet(), which kind of defeats the 
purpose of the change.
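
For readers following along, here is a minimal sketch of where that window 
sits. It is not the code in this patch: the class name TwoListQueue, the member 
layout, and the use of std::mutex / std::condition_variable in place of our own 
wrappers are all assumptions made for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical two-list queue, for illustration only. Writers use put_lock_,
// readers use get_lock_, and a reader takes put_lock_ only to refill.
template <typename T>
class TwoListQueue {
 public:
  explicit TwoListQueue(std::size_t max_elements) : max_elements_(max_elements) {
    assert(max_elements > 0);
  }

  // Blocks while the queue is full.
  void BlockingPut(const T& val) {
    std::unique_lock<std::mutex> put_guard(put_lock_);
    // (A) The writer reads the total size while holding only put_lock_.
    while (put_list_.size() + get_list_size_.load() >= max_elements_) {
      // (B) A reader that pops an element and calls notify_one() between (A)
      // and this wait finds no waiter, so that wake-up is lost. The writer
      // then sleeps until some later BlockingGet() notifies again.
      put_cv_.wait(put_guard);
    }
    put_list_.push_back(val);
    put_guard.unlock();
    get_cv_.notify_one();
  }

  // Blocks while both lists are empty.
  T BlockingGet() {
    std::unique_lock<std::mutex> get_guard(get_lock_);
    if (get_list_.empty()) {
      // Slow path: take put_lock_ only when the reader side has run dry.
      std::unique_lock<std::mutex> put_guard(put_lock_);
      get_cv_.wait(put_guard, [this] { return !put_list_.empty(); });
      put_list_.swap(get_list_);
      get_list_size_.store(get_list_.size());
    }
    T val = get_list_.front();
    get_list_.pop_front();
    get_list_size_.fetch_sub(1);
    get_guard.unlock();
    // Notifies without holding put_lock_, which is what opens the window at (B).
    put_cv_.notify_one();
    return val;
  }

 private:
  const std::size_t max_elements_;
  std::mutex put_lock_;
  std::condition_variable put_cv_;
  std::deque<T> put_list_;
  std::mutex get_lock_;
  std::condition_variable get_cv_;
  std::deque<T> get_list_;
  std::atomic<std::size_t> get_list_size_{0};
};
```

In this sketch, closing the window would mean taking put_lock_ around the 
decrement and the notify in BlockingGet(), which puts readers and writers back 
on one lock for every element.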

PS5, Line 137: GetSize
> this is an unfortunate name. I read it to be the size of the "Get" list.  M

Line 171:     // the queue's size could have changed once the lock is dropped.
> how do you know the deque::size() method doesn't need the synchronization (
Definitely expensive as writer can now block reader even if "get_list_" is not 

On x86, an aligned 64-bit read should be atomic. That said, it's a good point 
that we cannot assume the implementation of deque::size(). Added an 
AtomicInt64 for the get_list's size to make sure all reads will be 32-bit 

Line 197:   boost::scoped_ptr<std::deque<T>> put_list_;
> why add this extra indirection?  couldn't we just do deque::swap() directly
Good point. Done. Also rearranged the class members a bit.
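
A small sketch of the by-value layout, assuming nothing beyond the standard 
library. The struct TwoLists and its Refill() helper are hypothetical names, 
not the members in the patch. std::deque::swap() exchanges the two containers 
in constant time without copying elements, so the pointer indirection is not 
needed to make the handoff cheap.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical holder with two by-value deques and no scoped_ptr indirection.
struct TwoLists {
  std::deque<int> put_list;
  std::deque<int> get_list;

  // Hands every pending element to the reader side and returns how many
  // elements became readable. Expects the reader side to be drained first.
  std::size_t Refill() {
    assert(get_list.empty());
    put_list.swap(get_list);
    return get_list.size();
  }
};
```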

File be/src/util/condition-variable.h:

PS5, Line 29: doesn't have any logic to deal with thread interruption
> what's the implication of that?  are signals not handled properly?
The thread interruption feature has nothing to do with signal handling. It's a 
boost library feature which we don't use (at least for BlockingQueue). It's 
basically a way for one thread to interrupt another thread at well-defined 
points in the code.


To view, visit http://gerrit.cloudera.org:8080/4350
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ib9f4cf351455efefb0f3bb791cf9bc82d1421d54
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Alex Behm <alex.b...@cloudera.com>
Gerrit-Reviewer: Chen Huang <paulhuan...@utexas.edu>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-HasComments: Yes
