Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8656: Add RowBatchQueue interface and BufferedPlanRootSink impl
......................................................................


Patch Set 3:

(25 comments)

High-level comment: the refactoring of RowBatchQueue seems unnecessary at this stage given the similarity of the two versions. It seems sufficient to just add a TryAddBatch() interface to get most of the functionality needed now, but I could be missing something.

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG@9
PS3, Line 9: a blocking and
           : non-blocking implementation
It may help to spell out the exact difference between the two categories. My interpretation is that a blocking queue has limited capacity, so it will block once that capacity is reached. "Non-blocking queue" seems to allow two possible interpretations:
- it still has limited capacity, but when capacity is reached it will somehow not block and will return something instead
- the queue "does not have a capacity", so it will never block

I guess we eventually want to get to some version of the second interpretation, but this patch seems to implement the first.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h
File be/src/exec/buffered-plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@29
PS3, Line 29: The blocking behavior follows
            : /// the same semantics as BlockingPlanRootSink.
Now that I look at it again, I wonder if "BlockingPlanRootSink" is just a BufferedPlanRootSink with a buffer size of 1 (or 0?). Once the buffering limit is reached for this class, it reverts to the behavior of a BlockingPlanRootSink, so it seems as though BlockingPlanRootSink is just a special version of BufferedPlanRootSink.

I guess having two classes makes more sense once we actually back the buffering mechanism with a buffered tuple stream with spilling, so that the buffering limit is removed. As the code stands now, it makes me wonder whether the two separate PlanRootSink classes are needed.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@42
PS3, Line 42: RowBatchQueue* batch_queue)
Why is this not internally owned by this class? Why does it need to be passed in via the ctor this way?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@65
PS3, Line 65: boost::mutex
std::mutex. IIUC, the general direction is to move away from the boost library.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc@38
PS3, Line 38: output_batch->AcquireState(batch);
This is not making a copy. It actually transfers ownership of the resources of 'batch' to 'output_batch'.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc
File be/src/exec/data-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc@114
PS3, Line 114: new NonBlockingRowBatchQueue(10))
Who owns this object? It doesn't look like it's being added to a pool. We usually make ownership explicit via unique_ptr / scoped_ptr, or we add the object to an object pool and document that it's owned by that pool, so that its scope and ownership are clear.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h
File be/src/exec/plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h@61
PS3, Line 61: = 0;
Is this still valid in this patch? Given that PlanRootSink::Send() is not pure virtual anymore, it seems appropriate to document what it does and its return value.

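To make the ownership comments on buffered-plan-root-sink.h@42 and data-sink.cc@114 more concrete, here is a rough sketch of a sink that creates and owns its queue through a unique_ptr instead of receiving a raw pointer. Batch, BatchQueue and BufferedSink are hypothetical stand-ins, not the actual Impala classes, and locking is left out entirely.

```cpp
#include <cassert>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for RowBatch; it only carries a row count.
struct Batch {
  int num_rows;
};

// Hypothetical, unbounded, single-threaded queue that owns its batches.
class BatchQueue {
 public:
  // Takes ownership of 'batch'.
  void Add(std::unique_ptr<Batch> batch) {
    assert(batch != nullptr);
    batches_.push(std::move(batch));
  }

  // Hands ownership of the oldest batch to the caller. Returns nullptr if
  // the queue is empty.
  std::unique_ptr<Batch> Pop() {
    if (batches_.empty()) return nullptr;
    std::unique_ptr<Batch> head = std::move(batches_.front());
    batches_.pop();
    return head;
  }

 private:
  std::queue<std::unique_ptr<Batch>> batches_;
};

// Hypothetical sink that creates and owns its queue, so no caller has to
// reason about who deletes it.
class BufferedSink {
 public:
  BufferedSink() : batch_queue_(new BatchQueue()) {}

  // Takes ownership of 'batch' and buffers it.
  void Send(std::unique_ptr<Batch> batch) { batch_queue_->Add(std::move(batch)); }

  // Returns the next buffered batch, or nullptr if nothing is buffered.
  std::unique_ptr<Batch> GetNext() { return batch_queue_->Pop(); }

 private:
  // Owned by this sink and destroyed together with it.
  std::unique_ptr<BatchQueue> batch_queue_;
};
```

With this shape the call site in data-sink.cc would not need a bare 'new' at all, and the std::move at each hand-off makes the ownership transfer visible, in the same spirit as the AcquireState() comment above.
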

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc@315
PS3, Line 315: Shutdown();
Close(); Should stick to the interface of RowBatchQueue if possible and hide the implementation details.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h
File be/src/runtime/blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@54
PS3, Line 54: public BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn>,
            :     public RowBatchQueue {
Not a big fan of multiple inheritance. Is there any reason why this class cannot just contain an instance of BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn>? In other words, why can't we address the TODO mentioned above directly in this patch?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@67
PS3, Line 67: /// Adds a batch to the queue. This is blocking if the queue is full.
Please also document the return value.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@70
PS3, Line 70: Returns NULL if there are no more.
            : /// This function blocks.
            : /// Returns NULL after Shutdown()
Did you mean "This function will block if the queue is empty and returns NULL if the queue is shut down"?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@75
PS3, Line 75: underlying BlockingQueue queue
No need to mention the internal implementation in the interface. Also, it would help to clarify what it means for the queue to be full, given the two parameters passed to the constructor.

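For the multiple-inheritance comment on blocking-row-batch-queue.h@54, this is roughly what I have in mind: implement the interface and hold the generic queue as a member. It is only a sketch under assumptions. SimpleQueue is a hypothetical, single-threaded stand-in for BlockingQueue (no real blocking or locking), and Batch, BatchQueue and WrappedBatchQueue are made-up names.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>

// Hypothetical stand-in for RowBatch.
struct Batch {
  int num_rows;
};

// Hypothetical stand-in for the generic queue utility. Real blocking and
// locking are omitted to keep the sketch short.
template <typename T>
class SimpleQueue {
 public:
  explicit SimpleQueue(size_t capacity) : capacity_(capacity) {}

  // Moves from 'item' and returns true if there is room. Returns false and
  // leaves 'item' untouched if the queue is full.
  bool Put(T&& item) {
    if (items_.size() >= capacity_) return false;
    items_.push_back(std::move(item));
    return true;
  }

  // Moves the oldest item into '*out' and returns true, or returns false if
  // the queue is empty.
  bool Get(T* out) {
    assert(out != nullptr);
    if (items_.empty()) return false;
    *out = std::move(items_.front());
    items_.pop_front();
    return true;
  }

 private:
  const size_t capacity_;
  std::deque<T> items_;
};

// Hypothetical interface, analogous in spirit to RowBatchQueue.
class BatchQueue {
 public:
  virtual ~BatchQueue() {}
  // Returns true if 'batch' was enqueued. If false is returned, the batch is
  // destroyed when the call returns, because it was passed by value.
  virtual bool AddBatch(std::unique_ptr<Batch> batch) = 0;
  // Returns the oldest batch, or nullptr if the queue is empty.
  virtual std::unique_ptr<Batch> GetBatch() = 0;
};

// Single inheritance from the interface; the generic queue is a member.
class WrappedBatchQueue : public BatchQueue {
 public:
  explicit WrappedBatchQueue(size_t capacity) : queue_(capacity) {}

  bool AddBatch(std::unique_ptr<Batch> batch) override {
    return queue_.Put(std::move(batch));
  }

  std::unique_ptr<Batch> GetBatch() override {
    std::unique_ptr<Batch> result;
    queue_.Get(&result);  // 'result' stays nullptr if the queue is empty.
    return result;
  }

 private:
  SimpleQueue<std::unique_ptr<Batch>> queue_;
};
```

Callers then only see the BatchQueue interface, and the methods of the underlying queue (Shutdown() vs. Close(), for example) do not leak through the public API.
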

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@79
PS3, Line 79: full
typo


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@100
PS3, Line 100: /// Queue of orphaned row batches
Not your change, but it seems clearer to say "queue of orphaned row batches enqueued after the RowBatchQueue() has been closed". They need to exist because preceding row batches may reference buffers owned by row batches in this queue.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h
File be/src/runtime/non-blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@28
PS3, Line 28: A RowBatchQueue that stores batches in a std::queue.
Instead of mentioning the implementation details, please document how this class differs from the other RowBatchQueue implementation and why a client would choose this class over the other one.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@32
PS3, Line 32: NonBlockingRowBatchQueue
If I understand this patch correctly, this queue also has a capacity, but it will not block (it returns false instead) when enqueuing into a full queue. Other than that, it is pretty similar to the blocking version in other respects. It makes me wonder whether it is sufficient for now to just extend the RowBatchQueue class with a TryAddBatch() interface that returns false if the capacity is reached. Overloading the existing AddBatch() interface with the two semantics seems a bit confusing.

In the future, once the queue is actually backed by a buffered tuple stream, refactoring RowBatchQueue into two classes will make more sense, and I would call this class a SpillableQueue or something with a better name. As this patch stands now, a separate class seems like overkill IMHO.

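A rough sketch of the TryAddBatch() idea, to show that both semantics can live on one bounded queue. This is not the existing RowBatchQueue code: BoundedBatchQueue and Batch are hypothetical, the capacity is counted in batches only, and the signatures (batch passed by reference so the caller keeps it on failure) are my assumption.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for RowBatch.
struct Batch {
  int num_rows;
};

// Hypothetical bounded queue offering a blocking and a non-blocking add.
class BoundedBatchQueue {
 public:
  explicit BoundedBatchQueue(size_t max_batches) : max_batches_(max_batches) {
    assert(max_batches > 0);
  }

  // Blocks while the queue is full. Returns true and takes ownership of
  // 'batch' once it is enqueued. Returns false and leaves 'batch' with the
  // caller if the queue has been shut down.
  bool AddBatch(std::unique_ptr<Batch>& batch) {
    std::unique_lock<std::mutex> l(lock_);
    not_full_.wait(l, [this] { return shutdown_ || batches_.size() < max_batches_; });
    if (shutdown_) return false;
    batches_.push_back(std::move(batch));
    not_empty_.notify_one();
    return true;
  }

  // Never blocks. Returns false and leaves 'batch' with the caller if the
  // queue is full or has been shut down.
  bool TryAddBatch(std::unique_ptr<Batch>& batch) {
    std::lock_guard<std::mutex> l(lock_);
    if (shutdown_ || batches_.size() >= max_batches_) return false;
    batches_.push_back(std::move(batch));
    not_empty_.notify_one();
    return true;
  }

  // Blocks while the queue is empty and still open. Returns nullptr once the
  // queue has been shut down and drained.
  std::unique_ptr<Batch> GetBatch() {
    std::unique_lock<std::mutex> l(lock_);
    not_empty_.wait(l, [this] { return shutdown_ || !batches_.empty(); });
    if (batches_.empty()) return nullptr;
    std::unique_ptr<Batch> head = std::move(batches_.front());
    batches_.pop_front();
    not_full_.notify_one();
    return head;
  }

  // Wakes up all waiters. Batches already enqueued can still be fetched.
  void Shutdown() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
    }
    not_full_.notify_all();
    not_empty_.notify_all();
  }

 private:
  const size_t max_batches_;
  std::mutex lock_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<std::unique_ptr<Batch>> batches_;
  bool shutdown_ = false;
};
```

A buffered sink could call TryAddBatch() and fall back to its own waiting logic when it returns false, while existing callers keep using AddBatch() unchanged. Whether draining after Shutdown() is the right behavior is a separate question; I only picked it to keep the sketch small.
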

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@40
PS3, Line 40: /// Adds the given RowBatch to the queue.
Please document the return value for most functions unless it's very trivial. In this case, this can return false if the queue is closed or if the queue is full, right?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@43
PS3, Line 43: /// Returns and removes the RowBatch at the head of the queue.
Please document the behavior when the queue is empty.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h
File be/src/runtime/row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@26
PS3, Line 26: This interface makes no
            : /// thread-safety guarantees and different implementations might have different
            : /// thread-safety semantics.
This assumption makes the interface somewhat hard to use, since thread safety becomes implementation dependent. It would be good to have common semantics for all classes extending this class.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@38
PS3, Line 38: /// Prepares the queue so that batches can be added.
Is Init() a better name? Also, this must be called before adding any rows to the row batch, right?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@42
PS3, Line 42: returns false if the queue is full
Please see the comments below about the queue being full. Also, this comment seems invalid for the blocking version of the implementation, as that one only returns false after the queue has been shut down. It seems better to have a consistent interface and return value for both implementations. Please also document the ownership change of the batch. Please also see the comments elsewhere suggesting a TryAddBatch() interface, which may remove the need for two implementations of this class for now.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@52
PS3, Line 52: /// Returns true if the queue is full, false otherwise.
            :   virtual bool IsFull() const = 0;
Judging from the interface of this class, it seems a bit weird to have an IsFull() function without any way to specify the capacity in the constructor.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@55
PS3, Line 55: /// Returns false if the queue has been closed
It'd be nice to document the expected behavior of the various functions (e.g. AddBatch(), GetBatch()) after the queue has been closed.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h
File be/src/util/blocking-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h@195
PS3, Line 195: false if the queue has been closed, true otherwise.
Returns true if the queue hasn't been shut down.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h@196
PS3, Line 196: bool IsOpen() const {
             :     return !shutdown_;
             :   }
nit: one line



-- 
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 3
Gerrit-Owner: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Tue, 23 Jul 2019 06:41:31 +0000
Gerrit-HasComments: Yes
