[9/9] impala git commit: Revert IMPALA-4835 and dependent changes
Revert IMPALA-4835 and dependent changes Revert "IMPALA-6585: increase test_low_mem_limit_q21 limit" This reverts commit 25bcb258dfd712f1514cf188206667a5e6be0e26. Revert "IMPALA-6588: don't add empty list of ranges in text scan" This reverts commit d57fbec6f67b83227b4c6117476da8f7d75fc4f6. Revert "IMPALA-4835: Part 3: switch I/O buffers to buffer pool" This reverts commit 24b4ed0b29a44090350e630d625291c01b753a36. Revert "IMPALA-4835: Part 2: Allocate scan range buffers upfront" This reverts commit 5699b59d0c5cbe37e888a367adb42fa12dfb0916. Revert "IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation" This reverts commit 65680dc42107db4ff2273c635cedf83d20f0ea94. Change-Id: Ie5ca451cd96602886b0a8ecaa846957df0269cbb Reviewed-on: http://gerrit.cloudera.org:8080/9480 Reviewed-by: Dan Hecht Tested-by: Impala Public Jenkins Reviewed-on: http://gerrit.cloudera.org:8080/9485 Reviewed-by: Tim Armstrong Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e5689fb5 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e5689fb5 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e5689fb5 Branch: refs/heads/2.x Commit: e5689fb5c6a011af9e526effa9aa5399b194e977 Parents: 42c604e Author: Tim Armstrong Authored: Fri Mar 2 16:09:25 2018 -0800 Committer: Impala Public Jenkins Committed: Sat Mar 3 22:07:15 2018 + -- be/src/exec/CMakeLists.txt |1 - be/src/exec/base-sequence-scanner.cc|1 - be/src/exec/base-sequence-scanner.h |3 +- be/src/exec/hdfs-lzo-text-scanner.cc|1 - be/src/exec/hdfs-parquet-scanner-test.cc| 96 - be/src/exec/hdfs-parquet-scanner.cc | 234 +-- be/src/exec/hdfs-parquet-scanner.h | 44 +- be/src/exec/hdfs-scan-node-base.cc | 95 +- be/src/exec/hdfs-scan-node-base.h |3 - be/src/exec/hdfs-scan-node-mt.cc| 20 +- be/src/exec/hdfs-scan-node.cc | 172 +- be/src/exec/hdfs-scan-node.h| 66 +- be/src/exec/hdfs-text-scanner.cc|6 +- be/src/exec/parquet-column-readers.cc | 108 +- 
be/src/exec/parquet-column-readers.h| 88 +- be/src/exec/scanner-context.cc | 42 +- be/src/exec/scanner-context.h | 54 +- be/src/runtime/bufferpool/buffer-pool.h |1 - .../bufferpool/reservation-tracker-test.cc |8 +- be/src/runtime/bufferpool/reservation-util.cc |2 +- be/src/runtime/exec-env.cc |7 +- be/src/runtime/io/disk-io-mgr-internal.h| 16 - be/src/runtime/io/disk-io-mgr-stress-test.cc| 43 +- be/src/runtime/io/disk-io-mgr-stress.cc | 89 +- be/src/runtime/io/disk-io-mgr-stress.h | 26 +- be/src/runtime/io/disk-io-mgr-test.cc | 849 be/src/runtime/io/disk-io-mgr.cc| 716 +-- be/src/runtime/io/disk-io-mgr.h | 383 ++-- be/src/runtime/io/request-context.cc| 239 +-- be/src/runtime/io/request-context.h | 318 ++- be/src/runtime/io/request-ranges.h | 196 +- be/src/runtime/io/scan-range.cc | 309 ++- be/src/runtime/mem-tracker.h|1 + be/src/runtime/test-env.cc |2 +- be/src/runtime/tmp-file-mgr-test.cc |3 +- be/src/runtime/tmp-file-mgr.cc | 18 +- be/src/runtime/tmp-file-mgr.h | 17 +- be/src/util/bit-util-test.cc| 11 - be/src/util/bit-util.h |8 +- be/src/util/impalad-metrics.cc | 13 +- be/src/util/impalad-metrics.h |9 + common/thrift/PlanNodes.thrift |3 - .../apache/impala/analysis/SlotDescriptor.java | 19 - .../org/apache/impala/analysis/SlotRef.java | 20 + .../org/apache/impala/planner/HdfsScanNode.java | 167 +- .../java/org/apache/impala/util/BitUtil.java|6 - .../org/apache/impala/util/BitUtilTest.java |6 - .../queries/PlannerTest/constant-folding.test | 42 +- .../queries/PlannerTest/disable-codegen.test| 20 +- .../PlannerTest/fk-pk-join-detection.test | 78 +- .../queries/PlannerTest/max-row-size.test | 80 +- .../PlannerTest/min-max-runtime-filters.test|6 +- .../queries/PlannerTest/mt-dop-validation.test | 40 +- .../queries/PlannerTest/parquet-filtering.test | 42 +- .../queries/PlannerTest/partition-pruning.test |4 +- .../PlannerTest/resource-requirements.test | 1814 -- .../PlannerTest/sort-expr-materialization.test | 32 +-
[7/9] impala git commit: Revert IMPALA-4835 and dependent changes
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr-test.cc -- diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc index 3a0f727..b03ec31 100644 --- a/be/src/runtime/io/disk-io-mgr-test.cc +++ b/be/src/runtime/io/disk-io-mgr-test.cc @@ -22,16 +22,12 @@ #include "codegen/llvm-codegen.h" #include "common/init.h" -#include "runtime/bufferpool/buffer-pool.h" -#include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/io/request-context.h" #include "runtime/io/disk-io-mgr-stress.h" #include "runtime/io/disk-io-mgr.h" -#include "runtime/io/request-context.h" -#include "runtime/test-env.h" +#include "runtime/mem-tracker.h" #include "runtime/thread-resource-mgr.h" -#include "service/fe-support.h" #include "testutil/gtest-util.h" -#include "testutil/rand-util.h" #include "util/condition-variable.h" #include "util/cpu-info.h" #include "util/disk-info.h" @@ -39,20 +35,13 @@ #include "common/names.h" -using std::mt19937; -using std::uniform_int_distribution; -using std::uniform_real_distribution; - -DECLARE_int64(min_buffer_size); DECLARE_int32(num_remote_hdfs_io_threads); DECLARE_int32(num_s3_io_threads); DECLARE_int32(num_adls_io_threads); -const int MIN_BUFFER_SIZE = 128; +const int MIN_BUFFER_SIZE = 512; const int MAX_BUFFER_SIZE = 1024; -const int64_t LARGE_RESERVATION_LIMIT = 4L * 1024L * 1024L * 1024L; -const int64_t LARGE_INITIAL_RESERVATION = 128L * 1024L * 1024L; -const int64_t BUFFER_POOL_CAPACITY = LARGE_RESERVATION_LIMIT; +const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024; namespace impala { namespace io { @@ -60,41 +49,14 @@ namespace io { class DiskIoMgrTest : public testing::Test { public: - virtual void SetUp() { -test_env_.reset(new TestEnv); -// Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it. 
-test_env_->SetBufferPoolArgs(1, BUFFER_POOL_CAPACITY); -ASSERT_OK(test_env_->Init()); -RandTestUtil::SeedRng("DISK_IO_MGR_TEST_SEED", _); - } + virtual void SetUp() {} virtual void TearDown() { -root_reservation_.Close(); pool_.Clear(); -test_env_.reset(); - } - - /// Initialises 'root_reservation_'. The reservation is automatically closed in - /// TearDown(). - void InitRootReservation(int64_t reservation_limit) { -root_reservation_.InitRootTracker( -RuntimeProfile::Create(_, "root"), reservation_limit); } - - /// Initialise 'client' with the given reservation limit. The client reservation is a - /// child of 'root_reservation_'. - void RegisterBufferPoolClient(int64_t reservation_limit, int64_t initial_reservation, - BufferPool::ClientHandle* client) { -ASSERT_OK(buffer_pool()->RegisterClient("", nullptr, _reservation_, nullptr, -reservation_limit, RuntimeProfile::Create(_, ""), client)); -if (initial_reservation > 0) { - ASSERT_TRUE(client->IncreaseReservation(initial_reservation)); -} - } - void WriteValidateCallback(int num_writes, WriteRange** written_range, - DiskIoMgr* io_mgr, RequestContext* reader, BufferPool::ClientHandle* client, - int32_t* data, Status expected_status, const Status& status) { + DiskIoMgr* io_mgr, RequestContext* reader, int32_t* data, + Status expected_status, const Status& status) { if (expected_status.code() == TErrorCode::CANCELLED) { EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail(); } else { @@ -104,8 +66,8 @@ class DiskIoMgrTest : public testing::Test { ScanRange* scan_range = pool_.Add(new ScanRange()); scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(), (*written_range)->offset(), 0, false, BufferOpts::Uncached()); - ValidateSyncRead(io_mgr, reader, client, scan_range, - reinterpret_cast(data), sizeof(int32_t)); + ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast(data), + sizeof(int32_t)); } { @@ -126,13 +88,9 @@ class DiskIoMgrTest : public testing::Test 
{ protected: void CreateTempFile(const char* filename, const char* data) { -CreateTempFile(filename, data, strlen(data)); - } - - void CreateTempFile(const char* filename, const char* data, int64_t data_bytes) { FILE* file = fopen(filename, "w"); EXPECT_TRUE(file != nullptr); -fwrite(data, 1, data_bytes, file); +fwrite(data, 1, strlen(data), file); fclose(file); } @@ -157,22 +115,15 @@ class DiskIoMgrTest : public testing::Test { } static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader, - BufferPool::ClientHandle* client, ScanRange* range, const char* expected, - int expected_len = -1) { + ScanRange* range, const char* expected, int expected_len = -1) { unique_ptr buffer; -bool needs_buffers; -ASSERT_OK(io_mgr->StartScanRange(reader, range,
[6/9] impala git commit: Revert IMPALA-4835 and dependent changes
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr.cc -- diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc index 0d2afe2..8ff6609 100644 --- a/be/src/runtime/io/disk-io-mgr.cc +++ b/be/src/runtime/io/disk-io-mgr.cc @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. -#include "runtime/io/disk-io-mgr.h" - #include "common/global-flags.h" -#include "runtime/exec-env.h" +#include "runtime/io/disk-io-mgr.h" #include "runtime/io/disk-io-mgr-internal.h" #include "runtime/io/handle-cache.inline.h" @@ -54,8 +52,6 @@ DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk"); static const int THREADS_PER_ROTATIONAL_DISK = 1; static const int THREADS_PER_SOLID_STATE_DISK = 8; -const int64_t DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; - // The maximum number of the threads per rotational disk is also the max queue depth per // rotational disk. static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of " @@ -126,6 +122,13 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds, DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used by the " "file handle cache."); +// The IoMgr is able to run with a wide range of memory usage. If a query has memory +// remaining less than this value, the IoMgr will stop all buffering regardless of the +// current queue size. 
+static const int LOW_MEMORY = 64 * 1024 * 1024; + +const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT; + AtomicInt32 DiskIoMgr::next_disk_id_; namespace detail { @@ -152,6 +155,34 @@ string DiskIoMgr::DebugString() { return ss.str(); } +BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, +RequestContext* reader, ScanRange* scan_range, uint8_t* buffer, +int64_t buffer_len, MemTracker* mem_tracker) + : io_mgr_(io_mgr), +reader_(reader), +mem_tracker_(mem_tracker), +scan_range_(scan_range), +buffer_(buffer), +buffer_len_(buffer_len) { + DCHECK(io_mgr != nullptr); + DCHECK(scan_range != nullptr); + DCHECK(buffer != nullptr); + DCHECK_GE(buffer_len, 0); + DCHECK_NE(scan_range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER, + mem_tracker == nullptr); +} + +void BufferDescriptor::TransferOwnership(MemTracker* dst) { + DCHECK(dst != nullptr); + DCHECK(!is_client_buffer()); + // Memory of cached buffers is not tracked against a tracker. + if (is_cached()) return; + DCHECK(mem_tracker_ != nullptr); + dst->Consume(buffer_len_); + mem_tracker_->Release(buffer_len_); + mem_tracker_ = dst; +} + WriteRange::WriteRange( const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback) : RequestRange(RequestType::WRITE), callback_(callback) { @@ -192,8 +223,8 @@ DiskIoMgr::DiskIoMgr() : num_io_threads_per_solid_state_disk_(GetFirstPositiveVal( FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk, THREADS_PER_SOLID_STATE_DISK)), -max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(FLAGS_read_size)), -min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(FLAGS_min_buffer_size)), +max_buffer_size_(FLAGS_read_size), +min_buffer_size_(FLAGS_min_buffer_size), shut_down_(false), total_bytes_read_counter_(TUnit::BYTES), read_timer_(TUnit::TIME_NS), @@ -202,6 +233,8 @@ DiskIoMgr::DiskIoMgr() : FLAGS_num_file_handle_cache_partitions, FLAGS_unused_file_handle_timeout_sec) { DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size); + int64_t 
max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_); + free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1); int num_local_disks = DiskInfo::num_disks(); if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) { LOG(WARNING) << "Number of disks specified should be between 0 and the number of " @@ -215,11 +248,11 @@ DiskIoMgr::DiskIoMgr() : } DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk, -int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size) : +int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size) : num_io_threads_per_rotational_disk_(threads_per_rotational_disk), num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk), -max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(max_buffer_size)), -min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(min_buffer_size)), +max_buffer_size_(max_buffer_size), +min_buffer_size_(min_buffer_size), shut_down_(false), total_bytes_read_counter_(TUnit::BYTES), read_timer_(TUnit::TIME_NS), @@ -227,6 +260,8 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk, FileSystemUtil::MaxNumFileHandles()),
[4/9] impala git commit: Revert IMPALA-4835 and dependent changes
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/fe/src/main/java/org/apache/impala/analysis/SlotRef.java -- diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java index 0a945bd..23f2d88 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java @@ -153,6 +153,26 @@ public class SlotRef extends Expr { return ""; } + /** + * Checks if this slotRef refers to an array "pos" pseudo-column. + * + * Note: checking whether the column is null distinguishes between top-level columns + * and nested types. This check more specifically looks just for a reference to the + * "pos" field of an array type. + */ + public boolean isArrayPosRef() { +TupleDescriptor parent = getDesc().getParent(); +if (parent == null) return false; +Type parentType = parent.getType(); +if (parentType instanceof CollectionStructType) { + if (((CollectionStructType)parentType).isArrayStruct() && + getDesc().getLabel().equals(Path.ARRAY_POS_FIELD_NAME)) { +return true; + } +} +return false; + } + @Override protected void toThrift(TExprNode msg) { msg.node_type = TExprNodeType.SLOT_REF; http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java -- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 87d1806..7735f98 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -59,7 +59,6 @@ import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.InternalException; import org.apache.impala.common.NotImplementedException; -import org.apache.impala.common.Pair; import org.apache.impala.common.PrintUtils; import 
org.apache.impala.common.RuntimeEnv; import org.apache.impala.fb.FbFileBlock; @@ -77,7 +76,6 @@ import org.apache.impala.thrift.TScanRange; import org.apache.impala.thrift.TScanRangeLocation; import org.apache.impala.thrift.TScanRangeLocationList; import org.apache.impala.thrift.TTableStats; -import org.apache.impala.util.BitUtil; import org.apache.impala.util.MembershipSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,7 +143,7 @@ public class HdfsScanNode extends ScanNode { // derived experimentally: running metadata-only Parquet count(*) scans on TPC-H // lineitem and TPC-DS store_sales of different sizes resulted in memory consumption // between 128kb and 1.1mb. - private static final long MIN_MEMORY_ESTIMATE = 1L * 1024L * 1024L; + private final static long MIN_MEMORY_ESTIMATE = 1 * 1024 * 1024; private final HdfsTable tbl_; @@ -168,18 +166,6 @@ public class HdfsScanNode extends ScanNode { private long totalFiles_ = 0; private long totalBytes_ = 0; - // File formats scanned. Set in computeScanRangeLocations(). - private Set fileFormats_; - - // Number of bytes in the largest scan range (i.e. hdfs split). Set in - // computeScanRangeLocations(). - private long maxScanRangeBytes_ = 0; - - // The ideal reservation to process a single scan range (i.e. hdfs split), >= the - // minimum reservation. Generally provides enough memory to overlap CPU and I/O and - // maximize throughput. Set in computeResourceProfile(). - private long idealScanRangeReservation_ = -1; - // Input cardinality based on the partition row counts or extrapolation. -1 if invalid. // Both values can be valid to report them in the explain plan, but only one of them is // used for determining the scan cardinality. 
@@ -343,25 +329,25 @@ public class HdfsScanNode extends ScanNode { computeDictionaryFilterConjuncts(analyzer); // compute scan range locations with optional sampling -computeScanRangeLocations(analyzer); +Set fileFormats = computeScanRangeLocations(analyzer); // Determine backend scan node implementation to use. The optimized MT implementation // is currently only supported for Parquet. if (analyzer.getQueryOptions().isSetMt_dop() && analyzer.getQueryOptions().mt_dop > 0 && -fileFormats_.size() == 1 && -(fileFormats_.contains(HdfsFileFormat.PARQUET) - || fileFormats_.contains(HdfsFileFormat.TEXT))) { +fileFormats.size() == 1 && +(fileFormats.contains(HdfsFileFormat.PARQUET) + || fileFormats.contains(HdfsFileFormat.TEXT))) { useMtScanNode_ = true; } else { useMtScanNode_ = false; } -if (fileFormats_.contains(HdfsFileFormat.PARQUET)) { +if
[8/9] impala git commit: Revert IMPALA-4835 and dependent changes
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/scanner-context.cc -- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index c669e65..abdde07 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -41,15 +41,14 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024; const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT; ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node, -BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc, -const vector& filter_ctxs, -MemPool* expr_results_pool) +HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range, +const vector& filter_ctxs, MemPool* expr_results_pool) : state_(state), scan_node_(scan_node), -bp_client_(bp_client), partition_desc_(partition_desc), filter_ctxs_(filter_ctxs), expr_results_pool_(expr_results_pool) { + AddStream(scan_range); } ScannerContext::~ScannerContext() { @@ -67,20 +66,19 @@ void ScannerContext::ClearStreams() { } ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range, -int64_t reservation, const HdfsFileDesc* file_desc) +const HdfsFileDesc* file_desc) : parent_(parent), scan_range_(scan_range), file_desc_(file_desc), -reservation_(reservation), file_len_(file_desc->file_length), next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES), boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())), boundary_buffer_(new StringBuffer(boundary_pool_.get())) { } -ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) { - streams_.emplace_back(new Stream(this, range, reservation, - scan_node_->GetFileDesc(partition_desc_->id(), range->file(; +ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) { + streams_.emplace_back(new Stream( + this, range, scan_node_->GetFileDesc(partition_desc_->id(), range->file(; return streams_.back().get(); } @@ -103,7 +101,6 @@ void 
ScannerContext::Stream::ReleaseCompletedResources(bool done) { Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { DCHECK_EQ(0, io_buffer_bytes_left_); - DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED; if (io_buffer_ != nullptr) ReturnIoBuffer(); @@ -124,7 +121,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { SCOPED_TIMER(parent_->state_->total_storage_wait_timer()); int64_t read_past_buffer_size = 0; -int64_t max_buffer_size = io_mgr->max_buffer_size(); +int64_t max_buffer_size = parent_->state_->io_mgr()->max_read_buffer_size(); if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset); if (read_past_buffer_size <= 0) { // Either no callback was set or the callback did not return an estimate. Use @@ -136,7 +133,6 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { read_past_buffer_size = ::max(read_past_buffer_size, read_past_size); read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining); read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size); -read_past_buffer_size = ::min(read_past_buffer_size, reservation_); // We're reading past the scan range. Be careful not to read past the end of file. DCHECK_GE(read_past_buffer_size, 0); if (read_past_buffer_size == 0) { @@ -147,23 +143,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { ScanRange* range = parent_->scan_node_->AllocateScanRange( scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id, scan_range_->disk_id(), false, BufferOpts::Uncached()); -bool needs_buffers; -RETURN_IF_ERROR(io_mgr->StartScanRange( -parent_->scan_node_->reader_context(), range, _buffers)); -if (needs_buffers) { - // Allocate fresh buffers. The buffers for 'scan_range_' should be released now - // since we hit EOS. 
- if (reservation_ < io_mgr->min_buffer_size()) { -return Status(Substitute("Could not read past end of scan range in file '$0'. " -"Reservation provided $1 was < the minimum I/O buffer size", -reservation_, io_mgr->min_buffer_size())); - } - RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange( - parent_->scan_node_->reader_context(), parent_->bp_client_, range, - reservation_)); -} -RETURN_IF_ERROR(range->GetNext(_buffer_)); -DCHECK(io_buffer_->eosr()); +RETURN_IF_ERROR(parent_->state_->io_mgr()->Read( +parent_->scan_node_->reader_context(), range, _buffer_)); } DCHECK(io_buffer_ != nullptr); @@ -343,8 +324,7 @@ Status