Sahil Takiar has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8656: Add RowBatchQueue interface and 
BufferedPlanRootSink impl
......................................................................


Patch Set 3:

(4 comments)

Just responding to a few of the higher-level comments so we can nail down the 
overall design first. Once we come to an agreement, I will start working on the 
rest of the comments.

> High level comment about the refactoring of the RowBatchQueue seems 
> unnecessary at this stage given the similarity of the two versions. It seems 
> sufficient to just add a TryAddBatch() interface to get most of the 
> functionality needed now but I could be missing something.

I mentioned this in some of the other comments, but: (1) as this patch stands, 
the refactoring might not be strictly necessary, but it will become more useful 
when we add a RowBatchQueue backed by a BufferedTupleStream; (2) there are 
various places in the code where we buffer RowBatches, and adding RowBatchQueue 
gives us a unified interface to use across the codebase; (3) the current 
RowBatchQueue is blocking, which won't work well in BufferedPlanRootSink since 
the sink needs additional locks to handle synchronization of its internal state 
(e.g. SenderState) and resources (e.g. MemTrackers that are closed in 
DataSink::Close).
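
As a rough sketch of the kind of unified, non-blocking interface described in 
points (1) to (3), something like the following could work. Only the names 
RowBatchQueue and TryAddBatch() come from this review; the RowBatch stand-in, 
TryGetBatch(), IsFull(), IsEmpty(), and DequeRowBatchQueue are assumptions made 
for illustration and are not the patch's actual API.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>

// Hypothetical stand-in for impala::RowBatch; it only carries a row count.
struct RowBatch {
  int num_rows = 0;
};

// Hypothetical unified interface. Method names other than TryAddBatch() are
// assumptions made for this sketch.
class RowBatchQueue {
 public:
  virtual ~RowBatchQueue() = default;
  // Takes ownership of 'batch' and returns true if there is room; otherwise
  // returns false immediately and leaves 'batch' with the caller.
  virtual bool TryAddBatch(std::unique_ptr<RowBatch>&& batch) = 0;
  // Returns the oldest batch, or nullptr if the queue is empty. Never blocks.
  virtual std::unique_ptr<RowBatch> TryGetBatch() = 0;
  virtual bool IsFull() const = 0;
  virtual bool IsEmpty() const = 0;
};

// Deque-backed implementation. Not thread-safe: the caller holds its own lock.
class DequeRowBatchQueue : public RowBatchQueue {
 public:
  explicit DequeRowBatchQueue(std::size_t max_batches) : max_batches_(max_batches) {
    assert(max_batches_ > 0);
  }

  bool TryAddBatch(std::unique_ptr<RowBatch>&& batch) override {
    if (IsFull()) return false;
    batches_.push_back(std::move(batch));
    return true;
  }

  std::unique_ptr<RowBatch> TryGetBatch() override {
    if (batches_.empty()) return nullptr;
    std::unique_ptr<RowBatch> batch = std::move(batches_.front());
    batches_.pop_front();
    return batch;
  }

  bool IsFull() const override { return batches_.size() >= max_batches_; }
  bool IsEmpty() const override { return batches_.empty(); }

 private:
  const std::size_t max_batches_;
  std::deque<std::unique_ptr<RowBatch>> batches_;
};
```

Because neither call blocks, the owner of the queue (here, the sink) is free to 
decide how to wait and which lock to hold while doing so.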

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h
File be/src/exec/buffered-plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@29
PS3, Line 29: The blocking behavior follows
            : /// the same semantics as BlockingPlanRootSink.
> Now that I look at it again, I wonder if "BlockingPlanRootSink" is just a B
Yeah, it probably makes more sense once we change BufferedPlanRootSink to use a 
BufferedTupleStream. Another nice thing about keeping these two classes 
separate is that if there are bugs in the result spooling code, users can 
simply set SPOOL_QUERY_RESULTS = false and the code will fall back to 
BlockingPlanRootSink which is known to be stable.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@42
PS3, Line 42: RowBatchQueue* batch_queue)
> Why is this not internally owned by this class ? Why does it need to be pas
Dependency injection. I'm not sure if this is the right way to do this in C++ 
though.

https://en.wikipedia.org/wiki/Dependency_injection
https://en.wikipedia.org/wiki/Dependency_injection#Constructor_injection
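
For reference, one common way to express constructor injection in C++ is to 
pass the dependency as a std::unique_ptr so that ownership is explicit. This is 
only a sketch: the patch passes a raw RowBatchQueue*, and the simplified 
interface, the BufferedSink class, and the two queue implementations below are 
hypothetical names invented for the example.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical, heavily simplified queue interface (row counts only).
class RowBatchQueue {
 public:
  virtual ~RowBatchQueue() = default;
  virtual bool TryAddBatch(int num_rows) = 0;
};

// Hypothetical test double that behaves like a queue that is always full.
class AlwaysFullQueue : public RowBatchQueue {
 public:
  bool TryAddBatch(int /*num_rows*/) override { return false; }
};

// Hypothetical queue that accepts everything and tracks the total row count.
class UnboundedQueue : public RowBatchQueue {
 public:
  bool TryAddBatch(int num_rows) override {
    total_rows_ += num_rows;
    return true;
  }

 private:
  long total_rows_ = 0;
};

// Sketch of a sink that receives its queue through the constructor instead of
// creating it internally, so callers (and tests) choose the implementation.
class BufferedSink {
 public:
  explicit BufferedSink(std::unique_ptr<RowBatchQueue> queue)
    : queue_(std::move(queue)) {
    assert(queue_ != nullptr);
  }

  // Returns false if the injected queue rejected the batch.
  bool Send(int num_rows) { return queue_->TryAddBatch(num_rows); }

 private:
  std::unique_ptr<RowBatchQueue> queue_;
};
```

The trade-off is that whoever constructs the sink has to know which queue to 
build, in exchange for being able to swap in a different implementation (or a 
test double) without touching the sink.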


http://gerrit.cloudera.org:8080/#/c/13883/4/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/4/be/src/exec/buffered-plan-root-sink.cc@45
PS4, Line 45:  is_full_.Wait(l);
> Looking at this code again, the thread will block when the queue is full an
The existing BlockingQueue doesn't expose its lock, which makes the 
synchronization more difficult. You could have two locks: one owned by the 
BlockingQueue and one owned by the BufferedPlanRootSink, but that makes the 
synchronization logic more complex. The BufferedPlanRootSink lock is necessary 
in case the producer calls Close() while the consumer is calling 
QueryResultSet::AddRows, and it's needed to protect reads / writes of the 
SenderState. Plus, it makes the transition to BufferedTupleStream easier 
because BufferedTupleStream makes no thread-safety guarantees.
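
To make the single-lock argument concrete, here is a minimal sketch using the 
standard library rather than Impala's own lock and ConditionVariable classes. 
The is_full_ name mirrors the line under review; the class name, the 
rows_available_ condition variable, the closed_ flag, and the use of plain row 
counts in place of RowBatches are all assumptions made for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Sketch: one lock owned by the sink protects the (non-thread-safe) queue and
// the sink's own state, so Close() cannot race with a reader or writer.
class BufferedSinkSketch {
 public:
  explicit BufferedSinkSketch(std::size_t max_batches) : max_batches_(max_batches) {
    assert(max_batches_ > 0);
  }

  // Producer side: waits while the queue is full. Returns false once closed.
  bool Send(int num_rows) {
    std::unique_lock<std::mutex> l(lock_);
    is_full_.wait(l, [this] { return closed_ || queue_.size() < max_batches_; });
    if (closed_) return false;
    queue_.push_back(num_rows);
    rows_available_.notify_one();
    return true;
  }

  // Consumer side: waits while the queue is empty. Returns false once the sink
  // is closed and everything buffered has been drained.
  bool GetNext(int* num_rows) {
    std::unique_lock<std::mutex> l(lock_);
    rows_available_.wait(l, [this] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) return false;
    *num_rows = queue_.front();
    queue_.pop_front();
    is_full_.notify_one();
    return true;
  }

  // Marks the sink closed under the same lock and wakes any waiting thread.
  void Close() {
    std::lock_guard<std::mutex> l(lock_);
    closed_ = true;
    is_full_.notify_all();
    rows_available_.notify_all();
  }

 private:
  const std::size_t max_batches_;
  std::mutex lock_;
  std::condition_variable is_full_;         // signalled when space frees up
  std::condition_variable rows_available_;  // signalled when a batch is added
  std::deque<int> queue_;                   // plain container, no internal lock
  bool closed_ = false;
};
```

With a second lock hidden inside a blocking queue, Close() would have no way to 
wake a thread parked inside that queue without extra plumbing, which is the 
added complexity mentioned above.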


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h
File be/src/runtime/non-blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@32
PS3, Line 32:  NonBlockingRowBatchQueue
> If I understand this patch correctly, this queue also has a capacity but it
Adding TryAddBatch() makes sense to me.

Yes, as this patch stands now, the refactoring for RowBatchQueue is probably 
overkill, and yes, the idea is that it will be most useful when adding the 
SpillableQueue (or whatever we decide to call it).

However, I do think this cleans up the code a little bit. Currently there are 
several different ways we buffer RowBatches throughout the code; having a 
unified interface would make things easier to understand. Doing all the 
refactoring to migrate other queues to the new RowBatchQueue is probably out 
of the scope of this patch, but seems useful to do in the future.

The RowBatchQueue interface can also be useful for buffering in the KRPC sender 
/ receiver. The NonBlockingRowBatchQueue could be used to replace the current 
RowBatchQueue in KrpcDataStreamRecvr::SenderQueue.

Non-blocking might not be the right name for this class (maybe 
DequeRowBatchQueue would be better since by default a std::queue uses 
std::deque). I'm attempting to follow a similar model to the JDK. Java models 
this using an interface called Queue with various implementations: 
ArrayBlockingQueue (blocking), LinkedList (non-blocking), PriorityQueue 
(non-blocking), etc.

https://docs.oracle.com/javase/8/docs/api/java/util/Queue.html



--
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 3
Gerrit-Owner: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Tue, 23 Jul 2019 18:16:41 +0000
Gerrit-HasComments: Yes
