Michael Ho has posted comments on this change.
Change subject: IMPALA-4026: Implement double-buffering for BlockingQueue
Patch Set 5:
Line 79: row_batches_put_timer_ =
> we usually do this in Open() or Prepare() (see other counters in e.g. HdfsS
PS5, Line 75: NotitfyAll
PS5, Line 90: DCHECK
> nit: DCHECK_NE
PS5, Line 98: This may
: // imply that some writers may be sleeping on a partially
> Maybe "If this race occurs, a writer can stay blocked on a partially empty
PS5, Line 99: Given the major
: // use case is with HDFS scan node which has multiple producers and one consumer, it's
: // expected that some producers can make progress.
> Maybe more simply: "This should only occur when producers are faster than c
Line 102: put_cv_.NotifyOne();
> is it worth explaining this race rather than fixing it? Doesn't pthreads o
I'm not sure which optimization you are referring to here.
The race is that one thread can call put_cv_.NotifyOne() after another thread
has checked the queue's size but before that thread calls put_cv_.Wait(), so
the notification is missed. AFAIK, the only way to avoid this race is to also
grab "put_lock_" in BlockingGet(), which defeats the purpose of the change.
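To make the race window concrete, here is a minimal sketch of a two-list queue written with standard C++ primitives. It is not the code in this patch: the class name TwoListQueue, the helper RunProducersAndConsumer, and the use of std::mutex and std::condition_variable are all assumptions made for illustration. The sketch also adds one notification with put_lock_ held when both lists are empty, so that the demo cannot deadlock; that choice is an assumption of the sketch, not something stated in this review.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch of a double-buffered blocking queue (not the patch itself).
template <typename T>
class TwoListQueue {
 public:
  explicit TwoListQueue(std::size_t max_elements) : max_elements_(max_elements) {}

  void BlockingPut(T val) {
    std::unique_lock<std::mutex> write_lock(put_lock_);
    // The size check and the wait are atomic only with respect to other holders
    // of put_lock_. A reader that pops under get_lock_ alone can notify between
    // the check and the wait, and that notification is then missed.
    while (get_list_size_.load() + put_list_.size() >= max_elements_) {
      put_cv_.wait(write_lock);
    }
    put_list_.push_back(std::move(val));
    write_lock.unlock();
    get_cv_.notify_one();
  }

  T BlockingGet() {
    std::unique_lock<std::mutex> read_lock(get_lock_);
    if (get_list_.empty()) {
      // Only the slow path takes put_lock_, to swap the two lists.
      std::unique_lock<std::mutex> write_lock(put_lock_);
      while (put_list_.empty()) {
        // Assumption of this sketch: both lists are empty here, so wake a writer
        // while holding put_lock_. This particular wakeup cannot be missed.
        put_cv_.notify_one();
        get_cv_.wait(write_lock);
      }
      put_list_.swap(get_list_);
      get_list_size_.store(get_list_.size());
    }
    assert(!get_list_.empty());
    T val = std::move(get_list_.front());
    get_list_.pop_front();
    get_list_size_.store(get_list_.size());
    read_lock.unlock();
    // Racy notification: put_lock_ is not held, so a writer that has checked the
    // size but not yet started waiting stays blocked until a later notification.
    put_cv_.notify_one();
    return val;
  }

 private:
  const std::size_t max_elements_;
  std::mutex get_lock_;                        // guards get_list_
  std::mutex put_lock_;                        // guards put_list_
  std::condition_variable get_cv_;             // readers wait for new elements
  std::condition_variable put_cv_;             // writers wait for free space
  std::deque<T> get_list_;
  std::deque<T> put_list_;
  std::atomic<std::size_t> get_list_size_{0};  // read by writers without get_lock_
};

// Usage sketch: several producers, one consumer. Returns the sum of all values read.
long long RunProducersAndConsumer(int num_producers, int items_per_producer) {
  TwoListQueue<int> queue(8);
  std::vector<std::thread> producers;
  for (int p = 0; p < num_producers; ++p) {
    producers.emplace_back([&queue, items_per_producer] {
      for (int i = 1; i <= items_per_producer; ++i) queue.BlockingPut(i);
    });
  }
  long long sum = 0;
  for (int i = 0; i < num_producers * items_per_producer; ++i) {
    sum += queue.BlockingGet();
  }
  for (auto& t : producers) t.join();
  return sum;
}
```

In this sketch the fast path of BlockingGet() never touches put_lock_, which is the point of the double buffering. The cost is the racy notification at the end of BlockingGet(): a writer may sleep longer than strictly necessary on a partially empty queue.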
PS5, Line 137: GetSize
> this is an unfortunate name. I read it to be the size of the "Get" list. M
Line 171: // the queue's size could have changed once the lock is dropped.
> how do you know the deque::size() method doesn't need the synchronization (
Definitely expensive as writer can now block reader even if "get_list_" is not
On x86, an aligned 64-bit read should be atomic. That said, it's a good point
that we cannot assume the implementation of deque::size(). Added an
AtomicInt64 for the get_list's size to make sure all reads will be 32-bit
Line 197: boost::scoped_ptr<std::deque<T>> put_list_;
> why add this extra indirection? couldn't we just do deque::swap() directly
Good point. Done. Also rearranged the class members a bit.
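For reference, a small sketch of swapping two deques held by value, without the scoped_ptr indirection. The function name SwapListsDemo and the variable names are hypothetical, and the sketch uses plain std::deque rather than the class members in the patch.

```cpp
#include <deque>

// Hypothetical demo: exchange the contents of two deques held by value.
// Returns true if the elements ended up in get_list and put_list is empty.
bool SwapListsDemo() {
  std::deque<int> put_list = {1, 2, 3};
  std::deque<int> get_list;
  // std::deque::swap exchanges the contents of the two containers in constant
  // time; the elements themselves are not copied or moved individually.
  put_list.swap(get_list);
  return put_list.empty() && get_list.size() == 3 && get_list.front() == 1;
}
```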
PS5, Line 29: doesn't have any logic to deal with thread interruption
> what's the implication of that? are signals not handled properly?
The thread interruption feature has nothing to do with signal handling. It's a
boost library feature which we don't use (at least for BlockingQueue). It's
basically a way for one thread to interrupt another thread at well-defined
points in the code.
To view, visit http://gerrit.cloudera.org:8080/4350
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Alex Behm <alex.b...@cloudera.com>
Gerrit-Reviewer: Chen Huang <paulhuan...@utexas.edu>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>