Sahil Takiar has posted comments on this change. ( http://gerrit.cloudera.org:8080/13883 )
Change subject: IMPALA-8656: Add RowBatchQueue interface and BufferedPlanRootSink impl
......................................................................


Patch Set 3:

(4 comments)

Just responding to a few of the higher level comments so we can nail down the overall design first. Once we come to an agreement, I will start working on the rest of the comments.

> High level comment about the refactoring of the RowBatchQueue seems
> unnecessary at this stage given the similarity of the two versions. It seems
> sufficient to just add a TryAddBatch() interface to get most of the
> functionality needed now but I could be missing something.

Mentioned this in some of the other comments, but:

(1) As this patch stands, yes, it might not be completely necessary, but it will become more useful when we add a RowBatchQueue backed by a BufferedTupleStream.

(2) There are various places in the code where we buffer RowBatches; adding RowBatchQueue gives us a unified interface to use across the codebase.

(3) The current RowBatchQueue is blocking, which won't work well in BufferedPlanRootSink since it needs additional locks to handle synchronization of its internal state (e.g. SenderState) and resources (e.g. MemTrackers that are closed in DataSink::Close).

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h
File be/src/exec/buffered-plan-root-sink.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@29
PS3, Line 29: The blocking behavior follows
            : /// the same semantics as BlockingPlanRootSink.
> Now that I look at it again, I wonder if "BlockingPlanRootSink" is just a B

Yeah, it probably makes more sense once we change BufferedPlanRootSink to use a BufferedTupleStream. Another nice thing about keeping these two classes separate is that if there are bugs in the result spooling code, users can simply set SPOOL_QUERY_RESULTS = false and the code will fall back to BlockingPlanRootSink, which is known to be stable.

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/exec/buffered-plan-root-sink.h@42
PS3, Line 42: RowBatchQueue* batch_queue)
> Why is this not internally owned by this class ? Why does it need to be pas

Dependency injection. I'm not sure if this is the right way to do this in C++ though.

https://en.wikipedia.org/wiki/Dependency_injection
https://en.wikipedia.org/wiki/Dependency_injection#Constructor_injection


http://gerrit.cloudera.org:8080/#/c/13883/4/be/src/exec/buffered-plan-root-sink.cc
File be/src/exec/buffered-plan-root-sink.cc:

http://gerrit.cloudera.org:8080/#/c/13883/4/be/src/exec/buffered-plan-root-sink.cc@45
PS4, Line 45: is_full_.Wait(l);
> Looking at this code again, the thread will block when the queue is full an

The existing BlockingQueue doesn't expose its lock, which makes the synchronization more difficult. You could have two locks, one owned by the BlockingQueue and one owned by the BufferedPlanRootSink, but that makes the synchronization logic more complex. The BufferedPlanRootSink lock is necessary in case the producer calls Close() while the consumer is calling QueryResultSet::AddRows, and it's needed to protect reads / writes of the SenderState. Plus, it makes the transition to BufferedTupleStream easier because BufferedTupleStream makes no thread-safety guarantees.


http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h
File be/src/runtime/non-blocking-row-batch-queue.h:

http://gerrit.cloudera.org:8080/#/c/13883/3/be/src/runtime/non-blocking-row-batch-queue.h@32
PS3, Line 32: NonBlockingRowBatchQueue
> If I understand this patch correctly, this queue also has a capacity but it

Adding TryAddBatch() makes sense to me.

Yes, as this patch stands now, the re-factoring for RowBatchQueue is probably overkill. The idea is that this will be most useful when adding the SpillableQueue (or whatever we decide to call it). However, I do think this cleans up the code a little bit.

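To make the shape of this concrete, here is a rough, standalone sketch of the idea. It is not the code in the patch: RowBatch is a stand-in struct, and DequeRowBatchQueue, BufferedSinkSketch, Send and GetNext are hypothetical names used only for illustration. It assumes a non-thread-safe queue behind a small interface with TryAddBatch(), injected through the constructor, with all blocking and synchronization done by a single lock owned by the sink.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Stand-in for impala::RowBatch so the sketch compiles on its own.
struct RowBatch { int num_rows = 0; };

// Hypothetical unified interface. Implementations make no thread-safety
// guarantees; the caller is expected to hold its own lock.
class RowBatchQueue {
 public:
  virtual ~RowBatchQueue() = default;
  // Takes ownership and returns true, or returns false and leaves 'batch'
  // untouched if the queue is at capacity. Never blocks.
  virtual bool TryAddBatch(std::unique_ptr<RowBatch>& batch) = 0;
  // Returns the oldest batch, or nullptr if the queue is empty. Never blocks.
  virtual std::unique_ptr<RowBatch> GetBatch() = 0;
  virtual bool IsFull() const = 0;
  virtual bool IsEmpty() const = 0;
};

// Bounded, non-blocking implementation backed by a std::deque.
class DequeRowBatchQueue : public RowBatchQueue {
 public:
  explicit DequeRowBatchQueue(size_t max_batches) : max_batches_(max_batches) {
    assert(max_batches > 0);
  }
  bool TryAddBatch(std::unique_ptr<RowBatch>& batch) override {
    if (IsFull()) return false;
    batches_.push_back(std::move(batch));
    return true;
  }
  std::unique_ptr<RowBatch> GetBatch() override {
    if (batches_.empty()) return nullptr;
    std::unique_ptr<RowBatch> batch = std::move(batches_.front());
    batches_.pop_front();
    return batch;
  }
  bool IsFull() const override { return batches_.size() >= max_batches_; }
  bool IsEmpty() const override { return batches_.empty(); }

 private:
  const size_t max_batches_;
  std::deque<std::unique_ptr<RowBatch>> batches_;
};

// Sink-like class: the queue is injected (not owned), and one lock protects
// both the queue and the sink's own state ('closed_' here).
class BufferedSinkSketch {
 public:
  explicit BufferedSinkSketch(RowBatchQueue* queue) : queue_(queue) {}

  // Producer: blocks while the queue is full. Returns false if closed.
  bool Send(std::unique_ptr<RowBatch> batch) {
    std::unique_lock<std::mutex> l(lock_);
    not_full_.wait(l, [this] { return closed_ || !queue_->IsFull(); });
    if (closed_) return false;
    bool added = queue_->TryAddBatch(batch);
    assert(added);
    (void)added;
    not_empty_.notify_one();
    return true;
  }

  // Consumer: blocks until a batch is available or the sink is closed.
  // Returns nullptr once the sink is closed and the queue is drained.
  std::unique_ptr<RowBatch> GetNext() {
    std::unique_lock<std::mutex> l(lock_);
    not_empty_.wait(l, [this] { return closed_ || !queue_->IsEmpty(); });
    std::unique_ptr<RowBatch> batch = queue_->GetBatch();
    if (batch != nullptr) not_full_.notify_one();
    return batch;
  }

  // Wakes up any blocked producer or consumer.
  void Close() {
    {
      std::lock_guard<std::mutex> l(lock_);
      closed_ = true;
    }
    not_full_.notify_all();
    not_empty_.notify_all();
  }

 private:
  std::mutex lock_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  RowBatchQueue* queue_;  // Not owned; injected by the caller.
  bool closed_ = false;
};
```

With this layout a test could pass in a different RowBatchQueue implementation (for example a spillable one later on) without touching the locking in the sink, which is the main reason for the constructor injection.
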
Currently there are several different ways we buffer RowBatches throughout the code; having a unified interface would make things easier to understand. Doing all the re-factoring to migrate other queues to the new RowBatchQueue is probably out of the scope of this patch, but seems useful to do in the future.

The RowBatchQueue interface can also be useful for buffering in the KRPC sender / receiver. The NonBlockingRowBatchQueue could be used to replace the current RowBatchQueue in KrpcDataStreamRecvr::SenderQueue.

Non-blocking might not be the right name for this class (maybe DequeRowBatchQueue would be better since by default a std::queue uses std::deque).

I'm attempting to follow a similar model to the JDK. Java models this using an interface called Queue with various implementations: ArrayBlockingQueue (blocking), LinkedList (non-blocking), PriorityQueue (non-blocking), etc.

https://docs.oracle.com/javase/8/docs/api/java/util/Queue.html


--
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 3
Gerrit-Owner: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Ho <[email protected]>
Gerrit-Reviewer: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-Comment-Date: Tue, 23 Jul 2019 18:16:41 +0000
Gerrit-HasComments: Yes
