Sahil Takiar has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and 
BufferedPRS imp
......................................................................


Patch Set 3:

(27 comments)

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG@9
PS3, Line 9: a blocking and
           : non-blocking implementation
> It may help to spell out the exact difference in the two categories.
Updated the commit message now that the changes to add the generic 
RowBatchQueue aren't going in this patch.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h
File be/src/exec/buffered-plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@29
PS3, Line 29: The blocking behavior follows
            : /// the same semantics as BlockingPlanRootSink.
> Yes, I guess at some point soon, we may want to consolidate on the implemen
Done


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@42
PS3, Line 42: RowBatchQueue* batch_queue)
> Thanks for the link. I can see the advantage being that different users of
Agreed. Changed so that BufferedPlanRootSink owns the given batch_queue.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@65
PS3, Line 65: boost::mutex
> std::mutex. IIUC, the general direction is to move away from boost library.
It seems that our ConditionVariables (util/condition-variable.h) are tied to 
boost::mutex-es, so I can't use std::mutex here without some more 
refactoring.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc@38
PS3, Line 38: output_batch->AcquireState(batch);
> This is not making a copy. This is actually transferring the ownership of t
Replaced this with DeepCopyTo per Tim's suggestion. Note this will probably 
change when we move to the BufferedTupleStream implementation as a BTS does its 
own copy of the given batch.

The call to DeepCopyTo is necessary when using a std::deque because the 
FragmentInstanceState owns the given RowBatch.
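
To make the difference concrete, here is a minimal sketch of transfer versus 
deep copy. ToyBatch, TransferFrom and the vector-of-strings layout are 
hypothetical stand-ins, not Impala's RowBatch API; only the ownership idea is 
meant to carry over.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a row batch; this is not Impala's RowBatch.
struct ToyBatch {
  std::vector<std::string> rows;

  // Transfer (in the spirit of AcquireState): takes over src's rows without
  // copying them. Afterwards src is empty.
  void TransferFrom(ToyBatch* src) {
    assert(src != nullptr && src != this);
    rows = std::move(src->rows);
    src->rows.clear();  // make the moved-from state explicit
  }

  // Deep copy (in the spirit of DeepCopyTo): dst gets independent storage and
  // this batch is left unchanged, so its owner can keep reusing it.
  void DeepCopyTo(ToyBatch* dst) const {
    assert(dst != nullptr);
    dst->rows = rows;
  }
};
```

With the copy, the producer can keep resetting and refilling its own batch 
while the queued batch stays intact; with the transfer, the producer's batch 
is emptied as a side effect.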


http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc@38
PS7, Line 38:   output_batch->AcquireState(batch);
> AcquireState isn't safe to use here - it doesn't copy into fresh memory, ju
Thanks for the info, Tim. Replaced this with DeepCopyTo. I think BufferedTupleStream 
is already doing something like DeepCopyTo, correct?

Currently, the copy is necessary because FragmentInstanceState owns the given 
RowBatch. However, once we migrate to the BTS queue, the call to DeepCopyTo 
shouldn't be necessary, because BTS already does a copy, right?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc
File be/src/exec/data-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc@114
PS3, Line 114: new NonBlockingRowBatchQueue(10))
> Who owns this object ? It doesn't look like it's being added to pool. We us
Fixed so that BufferedPlanRootSink owns and manages the lifecycle of the queue; 
the queue is managed by a unique_ptr.
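
A minimal sketch of that ownership model, assuming the sink receives the 
queue through its constructor. ToySink and ToyQueue are hypothetical names 
for illustration, and the destroyed flag exists only to make the lifetime 
observable.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical queue type; the flag lets a caller observe destruction.
struct ToyQueue {
  ToyQueue(int capacity, bool* destroyed)
      : capacity(capacity), destroyed(destroyed) {}
  ~ToyQueue() { *destroyed = true; }
  int capacity;
  bool* destroyed;
};

// Hypothetical sink that takes sole ownership of its queue.
class ToySink {
 public:
  explicit ToySink(std::unique_ptr<ToyQueue> queue) : queue_(std::move(queue)) {
    assert(queue_ != nullptr);
  }
  int queue_capacity() const { return queue_->capacity; }

 private:
  // Released automatically when the sink is destroyed; no pool needed.
  std::unique_ptr<ToyQueue> queue_;
};
```

Because the constructor takes the unique_ptr by value, the call site has to 
hand over ownership explicitly, which answers the "who owns this object" 
question at a glance.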


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h
File be/src/exec/plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h@61
PS3, Line 61:  = 0;
> Is it still valid in this patch ? Given PlanRootSink::Send() is not pure vi
Per Tim's suggestion, I moved the PlanRootSink::Send implementation to a new 
helper method called ValidateRowBatch.


http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc
File be/src/exec/plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc@64
PS7, Line 64: Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
> I'd find it clearer if this was a helper function instead of the default im
Makes sense, moved this to a new method called ValidateRowBatch.
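
Roughly, the shape is now a pure virtual Send plus a shared helper that each 
subclass calls. The sketch below uses hypothetical names (ToyRootSink, 
ToyStatus, ValidateBatch), and the row-limit check is invented purely for 
illustration; it is not a description of what ValidateRowBatch checks.

```cpp
#include <cassert>
#include <string>

// Hypothetical status type for the sketch.
struct ToyStatus {
  bool ok;
  std::string msg;
};

class ToyRootSink {
 public:
  explicit ToyRootSink(long max_rows) : max_rows_(max_rows), rows_seen_(0) {}
  virtual ~ToyRootSink() = default;

  // Pure virtual: every sink must say how it delivers a batch.
  virtual ToyStatus Send(int num_rows) = 0;

  long rows_seen() const { return rows_seen_; }

 protected:
  // Shared helper that subclasses call explicitly at the top of Send().
  // The check itself (a row cap) is an assumption made for this example.
  ToyStatus ValidateBatch(int num_rows) {
    assert(num_rows >= 0);
    if (rows_seen_ + num_rows > max_rows_) {
      return ToyStatus{false, "row limit exceeded"};
    }
    rows_seen_ += num_rows;
    return ToyStatus{true, ""};
  }

 private:
  const long max_rows_;
  long rows_seen_;
};

class ToyBufferedSink : public ToyRootSink {
 public:
  explicit ToyBufferedSink(long max_rows)
      : ToyRootSink(max_rows), batches_buffered_(0) {}

  ToyStatus Send(int num_rows) override {
    ToyStatus status = ValidateBatch(num_rows);
    if (!status.ok) return status;
    ++batches_buffered_;  // stand-in for actually buffering the batch
    return status;
  }

  int batches_buffered() const { return batches_buffered_; }

 private:
  int batches_buffered_;
};
```

Compared with a default implementation of a virtual method, the helper makes 
the call explicit in each subclass, so a reader does not have to wonder 
whether the base-class Send is invoked.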


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc@315
PS3, Line 315: Shutdown();
> Close();
Now that I removed the generic RowBatchQueue interface from this patch, 
BlockingRowBatchQueue has a Shutdown() method.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h
File be/src/runtime/blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@54
PS3, Line 54:  public BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn>,
            :       public RowBatchQueue {
> Not a big fan of multi-inheritance.
Now that I removed the generic RowBatchQueue interface, this class no longer 
inherits from multiple classes. However, I still made the necessary changes so 
that BlockingRowBatchQueue owns a BlockingQueue rather than extends one. I 
think the rule of composition over inheritance works well here, and it allows 
us to add a generic RowBatchQueue interface later.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@67
PS3, Line 67: /// Adds a batch to the queue. This is blocking if the queue is full.
> Please also document the return value.
No longer returns a 'bool' because the usage doesn't require one (it would 
always return true because any batches that can't be added to the BlockingQueue 
are added to the cleanup queue).
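
A minimal single-threaded sketch of why a bool return carries no 
information here. ToyInnerQueue and ToyBatchQueue are hypothetical names; 
the sketch models only shutdown, not blocking, capacity or locking.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

struct ToyBatch {
  int num_rows;
};

// Hypothetical inner queue. A real blocking queue would wait on a condition
// variable when full; this stand-in only tracks whether it was shut down.
class ToyInnerQueue {
 public:
  ToyInnerQueue() : shut_down_(false) {}
  void Shutdown() { shut_down_ = true; }
  bool is_shut_down() const { return shut_down_; }
  void Put(std::unique_ptr<ToyBatch> batch) {
    assert(!shut_down_);
    items_.push_back(std::move(batch));
  }
  std::size_t size() const { return items_.size(); }

 private:
  bool shut_down_;
  std::deque<std::unique_ptr<ToyBatch>> items_;
};

// Owns the inner queue (composition) and never rejects a batch: anything the
// inner queue cannot take is parked on the cleanup list instead.
class ToyBatchQueue {
 public:
  void AddBatch(std::unique_ptr<ToyBatch> batch) {
    if (inner_.is_shut_down()) {
      cleanup_.push_back(std::move(batch));
      return;
    }
    inner_.Put(std::move(batch));
  }
  void Shutdown() { inner_.Shutdown(); }
  std::size_t queued() const { return inner_.size(); }
  std::size_t pending_cleanup() const { return cleanup_.size(); }

 private:
  ToyInnerQueue inner_;
  std::vector<std::unique_ptr<ToyBatch>> cleanup_;
};
```

Every call to AddBatch ends with the batch owned by one of the two 
containers, so a caller has nothing to act on and void is the honest 
signature.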


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@70
PS3, Line 70: Returns NULL if there are no more.
            :   /// This function blocks.
            :   /// Returns NULL after Shutdown()
> Did you mean "This function will block if the queue is empty and returns NU
Didn't write this, but yes, I think that is what the comments mean. Updated them 
to make that clearer.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@75
PS3, Line 75: underlying BlockingQueue
> queue
Done


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@79
PS3, Line 79: full
> typo
Deleted IsEmpty because it's not actually used by any users of 
BlockingRowBatchQueue.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@100
PS3, Line 100: /// Queue of orphaned row batches
> Not your change but it seems clearer to say "queue of orphaned row batches
Done


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h
File be/src/runtime/non-blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@28
PS3, Line 28: A RowBatchQueue that stores batches in a std::queue.
> Instead of mentioning the implementation details, please document the diffe
I updated the docs to make them clearer, but decided to leave in 
the mention of std::deque usage.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@32
PS3, Line 32:  NonBlockingRowBatchQueue
> As discussed offline, we may be able to get away with TryPutBatch() only. I
After playing around with a few things, here is what I decided to do:

* I deleted the generic RowBatchQueue interface. I agree it is probably not 
necessary at the moment and may be more useful once a SpillableQueue is used
* I kept most of the refactoring of the old RowBatchQueue (the one used by the 
scanners) because (1) the cleanup seems generally useful, and (2) it more 
easily allows future commits to introduce a generic RowBatchQueue interface
* I renamed NonBlockingRowBatchQueue to DequeRowBatchQueue because I think the 
name is clearer
* BufferedPlanRootSink uses DequeRowBatchQueue directly; the class is generic 
enough that I think replacing it with SpillableQueue later should be 
straightforward
* I considered using BlockingRowBatchQueue in BufferedPlanRootSink but 
decided not to for the following reasons: (1) BufferedPlanRootSink will use 
SpillableQueue eventually anyway, so it doesn't seem worth investing the effort 
in getting BlockingRowBatchQueue to work with BufferedPlanRootSink if it's just 
going to be throwaway work, (2) getting BlockingRowBatchQueue to work in 
BufferedPlanRootSink requires rewriting a lot of the locking logic in 
BufferedPlanRootSink, (3) BlockingRowBatchQueue contains a lot of 
scanner-specific semantics (the RowBatchBytesFn, the cleanup queue), (4) 
BlockingRowBatchQueue would need a thread-safe Cleanup() method (can't be added 
to BlockingQueue because Cleanup() requires calling Reset on all the 
RowBatch-es, so we would need to add additional locks to guard against calling 
Cleanup() and Add/GetBatch concurrently).
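
For reference, a minimal sketch of the kind of bounded, non-thread-safe 
deque queue described in the list above. The class name, method names and 
the close-drops-everything behavior are assumptions for illustration and may 
not match the DequeRowBatchQueue in the patch; locking is assumed to live in 
the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>

struct ToyBatch {
  int num_rows;
};

// Hypothetical bounded FIFO of batches backed by std::deque. Not thread-safe.
class ToyDequeQueue {
 public:
  explicit ToyDequeQueue(std::size_t max_batches)
      : max_batches_(max_batches), closed_(false) {
    assert(max_batches > 0);
  }

  // Takes ownership of the batch on success. Returns false if the queue is
  // full or closed; in this sketch the rejected batch is then destroyed, so
  // callers are expected to check IsFull() first.
  bool AddBatch(std::unique_ptr<ToyBatch> batch) {
    if (closed_ || IsFull()) return false;
    batches_.push_back(std::move(batch));
    return true;
  }

  // Removes and returns the batch at the head, or nullptr if the queue is
  // empty. Never blocks.
  std::unique_ptr<ToyBatch> GetBatch() {
    if (batches_.empty()) return nullptr;
    std::unique_ptr<ToyBatch> batch = std::move(batches_.front());
    batches_.pop_front();
    return batch;
  }

  bool IsFull() const { return batches_.size() >= max_batches_; }
  bool IsEmpty() const { return batches_.empty(); }
  bool IsOpen() const { return !closed_; }

  // Drops any remaining batches and rejects all further AddBatch() calls.
  void Close() {
    batches_.clear();
    closed_ = true;
  }

 private:
  const std::size_t max_batches_;
  bool closed_;
  std::deque<std::unique_ptr<ToyBatch>> batches_;
};
```

Keeping the queue itself free of locks and blocking is what would make it 
easy to swap for a spillable implementation later: the sink keeps its own 
mutex and condition variables and only relies on these few methods.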


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@40
PS3, Line 40: /// Adds the given RowBatch to the queue.
> Please document return value for most functions unless it's very trivial. I
Done


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@43
PS3, Line 43: /// Returns and removes the RowBatch at the head of the queue.
> Please document the behavior when the queue is empty.
Done


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h
File be/src/runtime/row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@26
PS3, Line 26: This interface makes no
            : /// thread-safety guarantees and different implementations might have different
            : /// thread-safety semantics.
> This assumption seems to make this interface somewhat hard to use if thread
Deleted this class.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@38
PS3, Line 38: /// Prepares the queue so that batches can be added.
> Is Init() a better name ? Also, this must be called before adding any rows
Deleted this class, but BlockingRowBatchQueue has a Prepare method that I 
changed to Init.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@42
PS3, Line 42: returns false if the queue is full
> Please see comments below about the queue being full. Also, this comment se
Deleted this class, but documented the ownership change of the batch in 
DequeRowBatchQueue.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@52
PS3, Line 52:   /// Returns true if the queue is full, false otherwise.
            :   virtual bool IsFull() const = 0;
> Judging from the interface of this class, it seems a bit weird to have a Is
Deleted this class. Both DequeRowBatchQueue and BlockingRowBatchQueue have 
constructors that specify capacity limits.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@55
PS3, Line 55: /// Returns false if the queue has been closed
> It'd be nice to document the expected behavior of various functions (e.g. A
Deleted this class, but clarified this in DequeRowBatchQueue.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h
File be/src/util/blocking-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h@195
PS3, Line 195: false if the queue has been closed, true otherwise.
> Returns true if the queue hasn't been shut down.
Deleted this since it's no longer necessary.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h@196
PS3, Line 196:   bool IsOpen() const {
             :     return !shutdown_;
             :   }
> nit: one line
Done



--
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 3
Gerrit-Owner: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Wed, 24 Jul 2019 19:51:49 +0000
Gerrit-HasComments: Yes
