Sahil Takiar has posted comments on this change. ( http://gerrit.cloudera.org:8080/13883 )
Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp
......................................................................


Patch Set 3:

(27 comments)

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13883/3//COMMIT_MSG@9
PS3, Line 9: a blocking and
           : non-blocking implementation
> It may help to spell out the exact difference in the two categories.
Updated the commit message, now that the changes to add the generic RowBatchQueue aren't going in this patch.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h
File be/src/exec/buffered-plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@29
PS3, Line 29: The blocking behavior follows
            : /// the same semantics as BlockingPlanRootSink.
> Yes, I guess at some point soon, we may want to consolidate on the implemen
Done

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@42
PS3, Line 42: RowBatchQueue* batch_queue)
> Thanks for the link. I can see the advantage being that different users of
Agreed. Changed so that BufferedPlanRootSink owns the given batch_queue.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@65
PS3, Line 65: boost::mutex
> std::mutex. IIUC, the general direction is to move away from boost library.
It seems that our ConditionVariables (util/condition-variable.h) are tied to boost::mutex-es, so I can't use std::mutex here without some more re-factoring.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.cc@38
PS3, Line 38: output_batch->AcquireState(batch);
> This is not making a copy. This is actually transferring the ownership of t
Replaced this with DeepCopyTo per Tim's suggestion.
Note this will probably change when we move to the BufferedTupleStream implementation, as a BTS does its own copy of the given batch. The call to DeepCopyTo is necessary when using a std::deque because the FragmentInstanceState owns the given RowBatch.


http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/buffered-plan-root-sink.cc@38
PS7, Line 38: output_batch->AcquireState(batch);
> AcquireState isn't safe to use here - it doesn't copy into fresh memory, ju
Thanks for the info, Tim. Replaced this with DeepCopyTo. I think BufferedTupleStream is already doing something like DeepCopyTo, correct? Currently, the copy is necessary because FragmentInstanceState owns the given RowBatch. However, once we migrate to the BTS queue, the call to DeepCopyTo shouldn't be necessary, because BTS already does a copy, right?


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc
File be/src/exec/data-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/data-sink.cc@114
PS3, Line 114: new NonBlockingRowBatchQueue(10))
> Who owns this object ? It doesn't look like it's being added to pool. We us
Fixed so that BufferedPlanRootSink owns and manages the lifecycle of the queue; the queue is managed by a unique_ptr.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h
File be/src/exec/plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/plan-root-sink.h@61
PS3, Line 61: = 0;
> Is it still valid in this patch ? Given PlanRootSink::Send() is not pure vi
Per Tim's suggestion, I moved the PlanRootSink::Send implementation to a new helper method called ValidateRowBatch.
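The ownership, copying and validation changes described in the replies above can be illustrated with a small, self-contained sketch. Every name here (`Batch`, `DeepCopy`, `PlanRootSinkSketch`, `ValidateBatch`, `BufferedSinkSketch`, `TakeBatch`) is hypothetical and only stands in for Impala's `RowBatch`, `DeepCopyTo`, `PlanRootSink`, `ValidateRowBatch` and `BufferedPlanRootSink`; the real classes use `Status` returns and different signatures. It shows three ideas under those assumptions: the sink owns its queue through a `std::unique_ptr`, shared checks live in a protected helper rather than a default `Send()` implementation, and `Send()` enqueues a copy in fresh memory because the caller keeps ownership of its batch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for RowBatch: a batch of rows owned by whoever created it.
struct Batch {
  std::vector<std::string> rows;

  // Rough analogue of a deep copy: the copy gets its own storage, so it stays
  // valid after the source batch is reset or reused by its owner.
  std::unique_ptr<Batch> DeepCopy() const {
    auto copy = std::make_unique<Batch>();
    copy->rows = rows;  // copies every std::string into new memory
    return copy;
  }

  void Reset() { rows.clear(); }
};

// Hypothetical base sink: Send() is pure virtual, and shared checks live in a
// protected helper instead of a default Send() implementation.
class PlanRootSinkSketch {
 public:
  virtual ~PlanRootSinkSketch() = default;
  virtual bool Send(const Batch& batch) = 0;

 protected:
  // Placeholder check standing in for ValidateRowBatch; Impala's real checks differ.
  bool ValidateBatch(const Batch& batch) const {
    return batch.rows.size() <= max_rows_per_batch_;
  }

  std::size_t max_rows_per_batch_ = 1024;
};

// Hypothetical buffered sink that owns its queue for its whole lifetime.
class BufferedSinkSketch : public PlanRootSinkSketch {
 public:
  using Queue = std::deque<std::unique_ptr<Batch>>;

  explicit BufferedSinkSketch(std::unique_ptr<Queue> queue) : queue_(std::move(queue)) {
    assert(queue_ != nullptr);
  }

  bool Send(const Batch& batch) override {
    if (!ValidateBatch(batch)) return false;
    // The caller keeps ownership of 'batch' and may reset it once Send()
    // returns, so enqueue an independent copy instead of sharing its memory.
    queue_->push_back(batch.DeepCopy());
    return true;
  }

  // Removes and returns the oldest buffered batch, or nullptr if none remain.
  std::unique_ptr<Batch> TakeBatch() {
    if (queue_->empty()) return nullptr;
    std::unique_ptr<Batch> front = std::move(queue_->front());
    queue_->pop_front();
    return front;
  }

 private:
  std::unique_ptr<Queue> queue_;  // destroyed together with the sink
};
```

Because `Send()` copies, the queued batch survives the caller's `Reset()`. The PS7 reply above suggests this explicit copy may become unnecessary once a BufferedTupleStream, which makes its own copy, replaces the deque.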
http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc
File be/src/exec/plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/7/be/src/exec/plan-root-sink.cc@64
PS7, Line 64: Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
> I'd find it clearer if this was a helper function instead of the default im
Makes sense, moved this to a new method called ValidateRowBatch.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/scan-node.cc@315
PS3, Line 315: Shutdown();
> Close();
Now that I removed the generic RowBatchQueue interface from this patch, BlockingRowBatchQueue has a Shutdown() method.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h
File be/src/runtime/blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@54
PS3, Line 54: public BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn>,
            : public RowBatchQueue {
> Not a big fan of multi-inheritance.
Now that I removed the generic RowBatchQueue interface, this class no longer inherits from multiple classes. However, I still made the necessary changes so that BlockingRowBatchQueue owns a BlockingQueue rather than extends one. I think the rule of composition over inheritance works well here, and it allows us to add a generic RowBatchQueue interface later.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@67
PS3, Line 67: /// Adds a batch to the queue. This is blocking if the queue is full.
> Please also document the return value.
No longer returns a 'bool' because the usage doesn't require one (it would always return true because any batches that can't be added to the BlockingQueue are added to the cleanup queue).
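A minimal sketch of the composition approach described above, under simplified assumptions. The hypothetical `BlockingBatchQueueSketch` owns a bounded blocking queue instead of inheriting from one; that inner queue (`BoundedBlockingQueue`, a `std::deque` guarded by `std::mutex` and `std::condition_variable`) is an illustrative stand-in, not Impala's `BlockingQueue`, and byte-based sizing via `RowBatchBytesFn` is omitted. In this sketch `AddBatch()` returns nothing: it blocks while the queue is full, and if the queue has been shut down the batch is kept in a cleanup queue instead. `GetBatch()` blocks while the queue is empty and returns `nullptr` once the queue is shut down and drained.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

struct Batch { std::vector<std::string> rows; };  // hypothetical RowBatch stand-in

// Hypothetical bounded blocking queue playing the role of a generic BlockingQueue.
template <typename T>
class BoundedBlockingQueue {
 public:
  explicit BoundedBlockingQueue(std::size_t capacity) : capacity_(capacity) {
    assert(capacity_ > 0);  // zero capacity would block every put until Shutdown()
  }

  // Blocks while full. Returns false, leaving 'item' untouched, after Shutdown().
  bool BlockingPut(T& item) {
    std::unique_lock<std::mutex> lock(mu_);
    not_full_.wait(lock, [this] { return shutdown_ || items_.size() < capacity_; });
    if (shutdown_) return false;
    items_.push_back(std::move(item));
    not_empty_.notify_one();
    return true;
  }

  // Blocks while empty. Returns false only once shut down and fully drained.
  bool BlockingGet(T* out) {
    std::unique_lock<std::mutex> lock(mu_);
    not_empty_.wait(lock, [this] { return shutdown_ || !items_.empty(); });
    if (items_.empty()) return false;
    *out = std::move(items_.front());
    items_.pop_front();
    not_full_.notify_one();
    return true;
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      shutdown_ = true;
    }
    not_empty_.notify_all();
    not_full_.notify_all();
  }

 private:
  const std::size_t capacity_;
  std::mutex mu_;
  std::condition_variable not_empty_, not_full_;
  std::deque<T> items_;
  bool shutdown_ = false;
};

// Hypothetical row batch queue that *has a* blocking queue rather than *is* one.
class BlockingBatchQueueSketch {
 public:
  explicit BlockingBatchQueueSketch(std::size_t capacity) : queue_(capacity) {}

  // No return value: a batch rejected after Shutdown() goes to the cleanup
  // queue so it can be released later instead of being lost.
  void AddBatch(std::unique_ptr<Batch> batch) {
    if (!queue_.BlockingPut(batch)) {
      std::lock_guard<std::mutex> lock(cleanup_mu_);
      cleanup_queue_.push_back(std::move(batch));
    }
  }

  // Blocks while empty; returns nullptr after Shutdown() once drained.
  std::unique_ptr<Batch> GetBatch() {
    std::unique_ptr<Batch> batch;
    if (!queue_.BlockingGet(&batch)) return nullptr;
    return batch;
  }

  void Shutdown() { queue_.Shutdown(); }

  std::size_t CleanupQueueSize() {
    std::lock_guard<std::mutex> lock(cleanup_mu_);
    return cleanup_queue_.size();
  }

 private:
  BoundedBlockingQueue<std::unique_ptr<Batch>> queue_;
  std::mutex cleanup_mu_;
  std::vector<std::unique_ptr<Batch>> cleanup_queue_;
};
```

Since every batch is either enqueued or retained in the cleanup queue, a `bool` return from `AddBatch()` would always be true here, which matches the reasoning in the reply above. Owning the inner queue also keeps its `BlockingPut`/`BlockingGet` API out of the wrapper's public interface.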
http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@70
PS3, Line 70: Returns NULL if there are no more.
            : /// This function blocks.
            : /// Returns NULL after Shutdown()
> Did you mean "This function will block if the queue is empty and returns NU
I didn't write this, but yes, I think that is what the comments mean. Updated them to make it clearer.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@75
PS3, Line 75: underlying BlockingQueue
> queue
Done

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@79
PS3, Line 79: full
> typo
Deleted IsEmpty because it's not actually used by any users of BlockingRowBatchQueue.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/blocking-row-batch-queue.h@100
PS3, Line 100: /// Queue of orphaned row batches
> Not your change but it seems clearer to say "queue of orphaned row batches
Done


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h
File be/src/runtime/non-blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@28
PS3, Line 28: A RowBatchQueue that stores batches in a std::queue.
> Instead of mentioning the implementation details, please document the diffe
I updated the docs to make them a bit clearer, but decided to leave in the mention of std::deque usage.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@32
PS3, Line 32: NonBlockingRowBatchQueue
> As discussed offline, we may be able to get away with TryPutBatch() only. I
After playing around with a few things, here is what I decided to do:
* I deleted the generic RowBatchQueue interface. I agree it is probably not necessary at the moment and may be more useful once a SpillableQueue is used.
* I kept most of the re-factoring of the old RowBatchQueue (the one used by the scanners) because (1) the cleanup seems generally useful, and (2) it more easily allows future commits to introduce a generic RowBatchQueue interface.
* I renamed NonBlockingRowBatchQueue to DequeRowBatchQueue because I think the name is clearer.
* BufferedPlanRootSink uses DequeRowBatchQueue directly; the class is generic enough that I think replacing it with SpillableQueue later should be straightforward.
* I considered using BlockingRowBatchQueue in BufferedPlanRootSink but decided not to, for the following reasons: (1) BufferedPlanRootSink will use SpillableQueue eventually anyway, so it doesn't seem worth investing the effort in getting BlockingRowBatchQueue to work with BufferedPlanRootSink if it's just going to be throwaway work; (2) getting BlockingRowBatchQueue to work in BufferedPlanRootSink requires re-writing a lot of the locking logic in BufferedPlanRootSink; (3) BlockingRowBatchQueue contains a lot of scanner-specific semantics (the RowBatchBytesFn, the cleanup queue); (4) BlockingRowBatchQueue would need a thread-safe Cleanup() method. Cleanup() can't be added to BlockingQueue because it requires calling Reset on all the RowBatch-es, so we would need to add additional locks to guard against calling Cleanup() and Add/GetBatch concurrently.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@40
PS3, Line 40: /// Adds the given RowBatch to the queue.
> Please document return value for most functions unless it's very trivial. I
Done

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@43
PS3, Line 43: /// Returns and removes the RowBatch at the head of the queue.
> Please document the behavior when the queue is empty.
Done


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h
File be/src/runtime/row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@26
PS3, Line 26: This interface makes no
            : /// thread-safety guarantees and different implementations might have different
            : /// thread-safety semantics.
> This assumption seems to make this interface somewhat hard to use if thread
Deleted this class.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@38
PS3, Line 38: /// Prepares the queue so that batches can be added.
> Is Init() a better name ? Also, this must be called before adding any rows
Deleted this class, but BlockingRowBatchQueue has a Prepare method that I renamed to Init.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@42
PS3, Line 42: returns false if the queue is full
> Please see comments below about the queue being full. Also, this comment se
Deleted this class, but documented the ownership change of the batch in DequeRowBatchQueue.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@52
PS3, Line 52: /// Returns true if the queue is full, false otherwise.
            : virtual bool IsFull() const = 0;
> Judging from the interface of this class, it seems a bit weird to have a Is
Deleted this class; both DequeRowBatchQueue and BlockingRowBatchQueue have constructors that specify capacity limits.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/row-batch-queue.h@55
PS3, Line 55: /// Returns false if the queue has been closed
> It'd be nice to document the expected behavior of various functions (e.g. A
Deleted this class, but clarified this in DequeRowBatchQueue.
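For contrast with the blocking queue, here is a minimal sketch of a non-blocking, capacity-bounded batch queue in the spirit of DequeRowBatchQueue, assuming the semantics summarized in the replies above: capacity is fixed in the constructor, adding to a full queue fails immediately instead of blocking, and removing from an empty queue returns `nullptr`. The names `DequeBatchQueueSketch` and `Batch`, the `bool` return of `AddBatch()`, and the rule that the queue takes ownership only on success are all assumptions for illustration. The sketch does no locking of its own, on the assumption that the owning sink serializes access with its own mutex.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Batch { std::vector<std::string> rows; };  // hypothetical RowBatch stand-in

// Hypothetical non-blocking queue. Not thread-safe: callers must synchronize.
class DequeBatchQueueSketch {
 public:
  explicit DequeBatchQueueSketch(std::size_t max_batches) : max_batches_(max_batches) {}

  // Returns true and takes ownership of '*batch' if there is room. Returns false
  // immediately if the queue is full; the caller then still owns '*batch'.
  bool AddBatch(std::unique_ptr<Batch>* batch) {
    assert(batch != nullptr && *batch != nullptr);
    if (IsFull()) return false;
    batches_.push_back(std::move(*batch));
    return true;
  }

  // Removes and returns the batch at the head of the queue, or nullptr if empty.
  std::unique_ptr<Batch> GetBatch() {
    if (batches_.empty()) return nullptr;
    std::unique_ptr<Batch> head = std::move(batches_.front());
    batches_.pop_front();
    return head;
  }

  bool IsFull() const { return batches_.size() >= max_batches_; }

 private:
  const std::size_t max_batches_;
  std::deque<std::unique_ptr<Batch>> batches_;
};
```

Passing a pointer to the `unique_ptr` makes the ownership hand-off visible at the call site: after a successful `AddBatch()` the caller's pointer is null, and after a failed one it still holds the batch.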
http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h
File be/src/util/blocking-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h@195
PS3, Line 195: false if the queue has been closed, true otherwise.
> Returns true if the queue hasn't been shut down.
Deleted this since it's no longer necessary.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/util/blocking-queue.h@196
PS3, Line 196: bool IsOpen() const {
             : return !shutdown_;
             : }
> nit: one line
Done



--
To view, visit http://gerrit.cloudera.org:8080/13883

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 3
Gerrit-Owner: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Wed, 24 Jul 2019 19:51:49 +0000
Gerrit-HasComments: Yes
