Sahil Takiar has uploaded this change for review. ( http://gerrit.cloudera.org:8080/13883 )

Change subject: IMPALA-8656: Add RowBatchQueue interface and BufferedPlanRootSink impl
......................................................................

IMPALA-8656: Add RowBatchQueue interface and BufferedPlanRootSink impl

Introduces a generic RowBatchQueue interface with a blocking and a
non-blocking implementation. The blocking implementation is a refactored
version of the current RowBatchQueue. The non-blocking implementation is
a simple wrapper around std::queue. The current RowBatchQueue, which is
used by the scanners, is renamed to BlockingRowBatchQueue and is now a
subclass of the new RowBatchQueue interface. This patch stops short of
completely abstracting all the details of the current RowBatchQueue and
instead includes a few TODOs. NonBlockingRowBatchQueue has a maximum
capacity; once it is reached, calls to AddBatch return false.

Implements BufferedPlanRootSink using the new RowBatchQueue interface.
Currently, the NonBlockingRowBatchQueue is injected into the
BufferedPlanRootSink. The implementation of BufferedPlanRootSink is not
tied to NonBlockingRowBatchQueue, although it does assume that the
RowBatchQueue is not thread safe. This allows a future patch to add a
RowBatchQueue backed by a BufferedTupleStream without refactoring
BufferedPlanRootSink.

BufferedPlanRootSink FlushFinal blocks until the consumer thread has
processed all RowBatches. This ensures that the coordinator fragment
stays alive until all results are fetched, but allows all other
fragments to be shut down immediately.
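
For reviewers who want a concrete picture of the capacity behavior described above, here is a minimal sketch of a bounded, non-thread-safe queue wrapping std::queue. It is not the code in this patch: the RowBatch stand-in, the method names other than AddBatch, and the rvalue-reference signature are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for Impala's RowBatch; only the row count is modeled.
struct RowBatch {
  explicit RowBatch(int num_rows) : num_rows(num_rows) {}
  int num_rows;
};

// Hypothetical interface. The real RowBatchQueue may declare other methods.
class RowBatchQueue {
 public:
  virtual ~RowBatchQueue() = default;
  // Returns false, leaving 'batch' untouched, if the batch cannot be added.
  virtual bool AddBatch(std::unique_ptr<RowBatch>&& batch) = 0;
  // Returns the oldest batch, or nullptr if the queue is empty.
  virtual std::unique_ptr<RowBatch> GetBatch() = 0;
  virtual bool IsFull() const = 0;
  virtual bool IsEmpty() const = 0;
};

// Sketch of a non-blocking queue with a fixed capacity. It performs no
// locking, so the caller is assumed to provide any synchronization.
class NonBlockingRowBatchQueue : public RowBatchQueue {
 public:
  explicit NonBlockingRowBatchQueue(std::size_t max_batches)
      : max_batches_(max_batches) {
    assert(max_batches > 0);
  }

  bool AddBatch(std::unique_ptr<RowBatch>&& batch) override {
    // Reject without taking ownership so the caller can retry later.
    if (IsFull()) return false;
    batches_.push(std::move(batch));
    return true;
  }

  std::unique_ptr<RowBatch> GetBatch() override {
    if (batches_.empty()) return nullptr;
    std::unique_ptr<RowBatch> result = std::move(batches_.front());
    batches_.pop();
    return result;
  }

  bool IsFull() const override { return batches_.size() >= max_batches_; }
  bool IsEmpty() const override { return batches_.empty(); }

 private:
  const std::size_t max_batches_;
  std::queue<std::unique_ptr<RowBatch>> batches_;
};
```

In this sketch a rejected AddBatch leaves ownership with the caller, so a producer such as a sink's Send path could wait for the consumer to drain a batch and then retry with the same pointer. Whether the patch handles a full queue this way is not stated in the commit message.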

Testing:
* Running core tests
* Updated tests/query_test/test_result_spooling.py

Follow up work:
* Add a stress test in test_result_spooling.py to validate the
  synchronization logic in BufferedPlanRootSink
* Handle Send calls where num_results < batch->num_rows()
* Add a direct write path in Send that directly writes a RowBatch to a
  QueryResultSet, if one is available and if the RowBatchQueue is empty
* Implement a RowBatchQueue backed by a BufferedTupleStream
* Re-factor the resource management logic to release all non-coordinator
  fragment resources

Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
---
M be/src/exec/blocking-plan-root-sink.cc
M be/src/exec/blocking-plan-root-sink.h
M be/src/exec/buffered-plan-root-sink.cc
M be/src/exec/buffered-plan-root-sink.h
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-scan-node.cc
M be/src/exec/kudu-scan-node.cc
M be/src/exec/plan-root-sink.cc
M be/src/exec/plan-root-sink.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/exec/scanner-context.cc
M be/src/runtime/CMakeLists.txt
R be/src/runtime/blocking-row-batch-queue.cc
A be/src/runtime/blocking-row-batch-queue.h
A be/src/runtime/non-blocking-row-batch-queue.cc
A be/src/runtime/non-blocking-row-batch-queue.h
M be/src/runtime/row-batch-queue.h
M be/src/util/blocking-queue.h
M tests/query_test/test_result_spooling.py
20 files changed, 464 insertions(+), 97 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/83/13883/1
--
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 1
Gerrit-Owner: Sahil Takiar <[email protected]>
