Michael Ho has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8656: Add RowBatchQueue interface and 
BufferedPlanRootSink impl
......................................................................


Patch Set 3:

(25 comments)

High level comment about the refactoring of the RowBatchQueue seems unnecessary 
at this stage given the similarity of the two versions. It seems sufficient to 
just add a TryAddBatch() interface to get most of the functionality needed now 
but I could be missing something.

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG@9
PS3, Line 9: a blocking and
           : non-blocking implementation
It may help to spell out the exact difference in the two categories.

My interpretation is that a blocking queue will have limited capacity so it 
will block once capacity is reached.

Non-blocking queue seems to lead to two possible interpretations:
- it still has limited capacity but when capacity is reached, it will somehow 
not block but return something
- the queue "does not have a capacity" so it will never block

I guess we eventually want to get to some version of the second interpretation 
but it seems that this patch is an implementation of the first version.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h
File be/src/exec/buffered-plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@29
PS3, Line 29: The blocking behavior follows
            : /// the same semantics as BlockingPlanRootSink.
Now that I look at it again, I wonder if "BlockingPlanRootSink" is just a 
BufferedPlanRootSink with a buffer size of 1 (or 0?). Once the buffering limit 
is reached for this class, it will revert to the behavior of a 
BlockingPlanRootSink so it seems as though BlockingPlanRootSink is just a 
special version of BufferedPlanRootSink.

I guess having two classes makes more sense once we actually back the buffering 
mechanism with buffered tuple stream with spilling so the buffering limit is 
removed. As the code stands now, it makes me wonder if the two separate 
PlanRootSink classes are needed.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@42
PS3, Line 42: RowBatchQueue* batch_queue)
Why is this not internally owned by this class ? Why does it need to be passed 
in via ctor this way ?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@65
PS3, Line 65: boost::mutex
std::mutex. IIUC, the general direction is to move away from boost library.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc@38
PS3, Line 38: output_batch->AcquireState(batch);
This is not making a copy. This is actually transferring the ownership of the 
resource of 'batch' to 'output_batch'.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc
File be/src/exec/data-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc@114
PS3, Line 114: new NonBlockingRowBatchQueue(10))
Who owns this object ? It doesn't look like it's being added to pool. We 
usually make ownership explicit via unique_ptr / scoped_ptr or we add an object 
to an object pool and document that it's owned by certain object pool so the 
scope and the ownership of it is clear.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h
File be/src/exec/plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h@61
PS3, Line 61:  = 0;
Is it still valid in this patch ? Given PlanRootSink::Send() is not pure 
virtual anymore, it seems appropriate to document what it does and its return 
value.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc@315
PS3, Line 315: Shutdown();
Close();

Should stick to the interface of RowBatchQueue if possible and hide the 
implementation details.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h
File be/src/runtime/blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@54
PS3, Line 54:  public BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn>,
            :       public RowBatchQueue {
Not a big fan of multi-inheritance.

Is there any reason why this class cannot just contain an instance of  
BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn> ?  In other words, 
why cannot we address the TODO mentioned above in this patch directly ?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@67
PS3, Line 67: /// Adds a batch to the queue. This is blocking if the queue is 
full.
Please also document the return value.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@70
PS3, Line 70: Returns NULL if there are no more.
            :   /// This function blocks.
            :   /// Returns NULL after Shutdown()
Did you mean "This function will block if the queue is empty and returns NULL 
if the queue is shut down."


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@75
PS3, Line 75: underlying BlockingQueue
queue

No need to mention the internal implementation in the interface. Also, it helps 
to clarify what it means for the queue to be full given the two parameters 
passed to the constructor.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@79
PS3, Line 79: full
typo


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@100
PS3, Line 100: /// Queue of orphaned row batches
Not your change but it seems clearer to say "queue of orphaned row batches 
enqueued after the RowBatchQueue() has been closed.". They need to exist as 
preceding row batches may reference buffer owned by row batches in this queue.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h
File be/src/runtime/non-blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@28
PS3, Line 28: A RowBatchQueue that stores batches in a std::queue.
Instead of mentioning the implementation details, please document the 
differentiation of this class as compared to the other RowBatchQueue 
implementation and why a client would choose this class over the other one.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@32
PS3, Line 32:  NonBlockingRowBatchQueue
If I understand this patch correctly, this queue also has a capacity but it 
will not block (instead return false) when enqueuing into a full queue. Other 
than that, it is pretty similar in other aspect with the blocking version.

It makes me wonder if it is sufficient for now to just  to extend the 
RowBatchQueue class to implement a TryAddBatch() interface which will return 
false if the capacity is reached. Overloading the existing AddBatch() interface 
for the two semantics seems a bit confusing.

In the future, once the queue is actually backed by buffered tuple stream, then 
the refactoring of RowBatchQueue into two classes make more sense and I will 
call this class an SpillableQueue or something with a better name. As this 
patch stands now, a separate class seems like an overkill IMHO.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@40
PS3, Line 40: /// Adds the given RowBatch to the queue.
Please document return value for most functions unless it's very trivial. In 
this case, this can return false if the queue is closed or if the queue is 
full, right ?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@43
PS3, Line 43: /// Returns and removes the RowBatch at the head of the queue.
Please document the behavior when the queue is empty.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h
File be/src/runtime/row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@26
PS3, Line 26: This interface makes no
            : /// thread-safety guarantees and different implementations might 
have different
            : /// thread-safety semantics.
This assumption seems to make this interface somewhat hard to use if thread 
safety is implementation dependent. Would be good to have a common semantics 
for all classes extending this class.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@38
PS3, Line 38: /// Prepares the queue so that batches can be added.
Is Init() a better name ? Also, this must be called before adding any rows to 
the row batch, right ?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@42
PS3, Line 42: returns false if the queue is full
Please see comments below about the queue being full. Also, this comment seems 
invalid for the blocking version of the implementation as it only returns false 
after the queue has been shut down. It seems better to have a consistent 
interface and return value for both implementations.

Please also document the ownership change of the batch.

Please also see comments elsewhere about the suggestion to add a TryAddBatch() 
interface which may forgo the need for two implementation of this class for now.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@52
PS3, Line 52:   /// Returns true if the queue is full, false otherwise.
            :   virtual bool IsFull() const = 0;
Judging from the interface of this class, it seems a bit weird to have a 
IsFull() function without any way to specify the capacity in the constructor.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@55
PS3, Line 55: /// Returns false if the queue has been closed
It'd be nice to document the expected behavior of various functions (e.g. 
AddBatch(), GetBatch()) after the queue has been closed.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h
File be/src/util/blocking-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h@195
PS3, Line 195: false if the queue has been closed, true otherwise.
Returns true if the queue hasn't been shut down.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h@196
PS3, Line 196:   bool IsOpen() const {
             :     return !shutdown_;
             :   }
nit: one line



--
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 3
Gerrit-Owner: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Tue, 23 Jul 2019 06:41:31 +0000
Gerrit-HasComments: Yes

Reply via email to