[9/9] impala git commit: Revert IMPALA-4835 and dependent changes

2018-03-03 Thread lv
Revert IMPALA-4835 and dependent changes

Revert "IMPALA-6585: increase test_low_mem_limit_q21 limit"

This reverts commit 25bcb258dfd712f1514cf188206667a5e6be0e26.

Revert "IMPALA-6588: don't add empty list of ranges in text scan"

This reverts commit d57fbec6f67b83227b4c6117476da8f7d75fc4f6.

Revert "IMPALA-4835: Part 3: switch I/O buffers to buffer pool"

This reverts commit 24b4ed0b29a44090350e630d625291c01b753a36.

Revert "IMPALA-4835: Part 2: Allocate scan range buffers upfront"

This reverts commit 5699b59d0c5cbe37e888a367adb42fa12dfb0916.

Revert "IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation"

This reverts commit 65680dc42107db4ff2273c635cedf83d20f0ea94.

Change-Id: Ie5ca451cd96602886b0a8ecaa846957df0269cbb
Reviewed-on: http://gerrit.cloudera.org:8080/9480
Reviewed-by: Dan Hecht 
Tested-by: Impala Public Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/9485
Reviewed-by: Tim Armstrong 


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e5689fb5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e5689fb5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e5689fb5

Branch: refs/heads/2.x
Commit: e5689fb5c6a011af9e526effa9aa5399b194e977
Parents: 42c604e
Author: Tim Armstrong 
Authored: Fri Mar 2 16:09:25 2018 -0800
Committer: Impala Public Jenkins 
Committed: Sat Mar 3 22:07:15 2018 +

--
 be/src/exec/CMakeLists.txt                      |    1 -
 be/src/exec/base-sequence-scanner.cc            |    1 -
 be/src/exec/base-sequence-scanner.h             |    3 +-
 be/src/exec/hdfs-lzo-text-scanner.cc            |    1 -
 be/src/exec/hdfs-parquet-scanner-test.cc        |   96 -
 be/src/exec/hdfs-parquet-scanner.cc             |  234 +--
 be/src/exec/hdfs-parquet-scanner.h              |   44 +-
 be/src/exec/hdfs-scan-node-base.cc              |   95 +-
 be/src/exec/hdfs-scan-node-base.h               |    3 -
 be/src/exec/hdfs-scan-node-mt.cc                |   20 +-
 be/src/exec/hdfs-scan-node.cc                   |  172 +-
 be/src/exec/hdfs-scan-node.h                    |   66 +-
 be/src/exec/hdfs-text-scanner.cc                |    6 +-
 be/src/exec/parquet-column-readers.cc           |  108 +-
 be/src/exec/parquet-column-readers.h            |   88 +-
 be/src/exec/scanner-context.cc                  |   42 +-
 be/src/exec/scanner-context.h                   |   54 +-
 be/src/runtime/bufferpool/buffer-pool.h         |    1 -
 .../bufferpool/reservation-tracker-test.cc      |    8 +-
 be/src/runtime/bufferpool/reservation-util.cc   |    2 +-
 be/src/runtime/exec-env.cc                      |    7 +-
 be/src/runtime/io/disk-io-mgr-internal.h        |   16 -
 be/src/runtime/io/disk-io-mgr-stress-test.cc    |   43 +-
 be/src/runtime/io/disk-io-mgr-stress.cc         |   89 +-
 be/src/runtime/io/disk-io-mgr-stress.h          |   26 +-
 be/src/runtime/io/disk-io-mgr-test.cc           |  849
 be/src/runtime/io/disk-io-mgr.cc                |  716 +--
 be/src/runtime/io/disk-io-mgr.h                 |  383 ++--
 be/src/runtime/io/request-context.cc            |  239 +--
 be/src/runtime/io/request-context.h             |  318 ++-
 be/src/runtime/io/request-ranges.h              |  196 +-
 be/src/runtime/io/scan-range.cc                 |  309 ++-
 be/src/runtime/mem-tracker.h                    |    1 +
 be/src/runtime/test-env.cc                      |    2 +-
 be/src/runtime/tmp-file-mgr-test.cc             |    3 +-
 be/src/runtime/tmp-file-mgr.cc                  |   18 +-
 be/src/runtime/tmp-file-mgr.h                   |   17 +-
 be/src/util/bit-util-test.cc                    |   11 -
 be/src/util/bit-util.h                          |    8 +-
 be/src/util/impalad-metrics.cc                  |   13 +-
 be/src/util/impalad-metrics.h                   |    9 +
 common/thrift/PlanNodes.thrift                  |    3 -
 .../apache/impala/analysis/SlotDescriptor.java  |   19 -
 .../org/apache/impala/analysis/SlotRef.java     |   20 +
 .../org/apache/impala/planner/HdfsScanNode.java |  167 +-
 .../java/org/apache/impala/util/BitUtil.java    |    6 -
 .../org/apache/impala/util/BitUtilTest.java     |    6 -
 .../queries/PlannerTest/constant-folding.test   |   42 +-
 .../queries/PlannerTest/disable-codegen.test    |   20 +-
 .../PlannerTest/fk-pk-join-detection.test       |   78 +-
 .../queries/PlannerTest/max-row-size.test       |   80 +-
 .../PlannerTest/min-max-runtime-filters.test    |    6 +-
 .../queries/PlannerTest/mt-dop-validation.test  |   40 +-
 .../queries/PlannerTest/parquet-filtering.test  |   42 +-
 .../queries/PlannerTest/partition-pruning.test  |    4 +-
 .../PlannerTest/resource-requirements.test      | 1814 --
 .../PlannerTest/sort-expr-materialization.test  |   32 +-
 

[7/9] impala git commit: Revert IMPALA-4835 and dependent changes

2018-03-03 Thread lv
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr-test.cc
--
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc 
b/be/src/runtime/io/disk-io-mgr-test.cc
index 3a0f727..b03ec31 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -22,16 +22,12 @@
 
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
-#include "runtime/bufferpool/buffer-pool.h"
-#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/io/request-context.h"
 #include "runtime/io/disk-io-mgr-stress.h"
 #include "runtime/io/disk-io-mgr.h"
-#include "runtime/io/request-context.h"
-#include "runtime/test-env.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
-#include "service/fe-support.h"
 #include "testutil/gtest-util.h"
-#include "testutil/rand-util.h"
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"
@@ -39,20 +35,13 @@
 
 #include "common/names.h"
 
-using std::mt19937;
-using std::uniform_int_distribution;
-using std::uniform_real_distribution;
-
-DECLARE_int64(min_buffer_size);
 DECLARE_int32(num_remote_hdfs_io_threads);
 DECLARE_int32(num_s3_io_threads);
 DECLARE_int32(num_adls_io_threads);
 
-const int MIN_BUFFER_SIZE = 128;
+const int MIN_BUFFER_SIZE = 512;
 const int MAX_BUFFER_SIZE = 1024;
-const int64_t LARGE_RESERVATION_LIMIT = 4L * 1024L * 1024L * 1024L;
-const int64_t LARGE_INITIAL_RESERVATION = 128L * 1024L * 1024L;
-const int64_t BUFFER_POOL_CAPACITY = LARGE_RESERVATION_LIMIT;
+const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024;
 
 namespace impala {
 namespace io {
@@ -60,41 +49,14 @@ namespace io {
 class DiskIoMgrTest : public testing::Test {
  public:
 
-  virtual void SetUp() {
-test_env_.reset(new TestEnv);
-// Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool 
allows it.
-test_env_->SetBufferPoolArgs(1, BUFFER_POOL_CAPACITY);
-ASSERT_OK(test_env_->Init());
-RandTestUtil::SeedRng("DISK_IO_MGR_TEST_SEED", _);
-  }
+  virtual void SetUp() {}
 
   virtual void TearDown() {
-root_reservation_.Close();
 pool_.Clear();
-test_env_.reset();
-  }
-
-  /// Initialises 'root_reservation_'. The reservation is automatically closed 
in
-  /// TearDown().
-  void InitRootReservation(int64_t reservation_limit) {
-root_reservation_.InitRootTracker(
-RuntimeProfile::Create(_, "root"), reservation_limit);
   }
-
-  /// Initialise 'client' with the given reservation limit. The client 
reservation is a
-  /// child of 'root_reservation_'.
-  void RegisterBufferPoolClient(int64_t reservation_limit, int64_t 
initial_reservation,
-  BufferPool::ClientHandle* client) {
-ASSERT_OK(buffer_pool()->RegisterClient("", nullptr, _reservation_, 
nullptr,
-reservation_limit, RuntimeProfile::Create(_, ""), client));
-if (initial_reservation > 0) {
-  ASSERT_TRUE(client->IncreaseReservation(initial_reservation));
-}
-  }
-
   void WriteValidateCallback(int num_writes, WriteRange** written_range,
-  DiskIoMgr* io_mgr, RequestContext* reader, BufferPool::ClientHandle* 
client,
-  int32_t* data, Status expected_status, const Status& status) {
+  DiskIoMgr* io_mgr, RequestContext* reader, int32_t* data,
+  Status expected_status, const Status& status) {
 if (expected_status.code() == TErrorCode::CANCELLED) {
   EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << 
status.GetDetail();
 } else {
@@ -104,8 +66,8 @@ class DiskIoMgrTest : public testing::Test {
   ScanRange* scan_range = pool_.Add(new ScanRange());
   scan_range->Reset(nullptr, (*written_range)->file(), 
(*written_range)->len(),
   (*written_range)->offset(), 0, false, BufferOpts::Uncached());
-  ValidateSyncRead(io_mgr, reader, client, scan_range,
-  reinterpret_cast(data), sizeof(int32_t));
+  ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast(data),
+  sizeof(int32_t));
 }
 
 {
@@ -126,13 +88,9 @@ class DiskIoMgrTest : public testing::Test {
 
  protected:
   void CreateTempFile(const char* filename, const char* data) {
-CreateTempFile(filename, data, strlen(data));
-  }
-
-  void CreateTempFile(const char* filename, const char* data, int64_t 
data_bytes) {
 FILE* file = fopen(filename, "w");
 EXPECT_TRUE(file != nullptr);
-fwrite(data, 1, data_bytes, file);
+fwrite(data, 1, strlen(data), file);
 fclose(file);
   }
 
@@ -157,22 +115,15 @@ class DiskIoMgrTest : public testing::Test {
   }
 
   static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader,
-  BufferPool::ClientHandle* client, ScanRange* range, const char* expected,
-  int expected_len = -1) {
+  ScanRange* range, const char* expected, int expected_len = -1) {
 unique_ptr buffer;
-bool needs_buffers;
-ASSERT_OK(io_mgr->StartScanRange(reader, range, 

[6/9] impala git commit: Revert IMPALA-4835 and dependent changes

2018-03-03 Thread lv
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr.cc
--
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 0d2afe2..8ff6609 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -15,10 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/io/disk-io-mgr.h"
-
 #include "common/global-flags.h"
-#include "runtime/exec-env.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/disk-io-mgr-internal.h"
 #include "runtime/io/handle-cache.inline.h"
 
@@ -54,8 +52,6 @@ DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads 
per disk");
 static const int THREADS_PER_ROTATIONAL_DISK = 1;
 static const int THREADS_PER_SOLID_STATE_DISK = 8;
 
-const int64_t DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE;
-
 // The maximum number of the threads per rotational disk is also the max queue 
depth per
 // rotational disk.
 static const string num_io_threads_per_rotational_disk_help_msg = 
Substitute("Number of "
@@ -126,6 +122,13 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 21600, 
"Maximum time, in seconds,
 DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used 
by the "
 "file handle cache.");
 
+// The IoMgr is able to run with a wide range of memory usage. If a query has 
memory
+// remaining less than this value, the IoMgr will stop all buffering 
regardless of the
+// current queue size.
+static const int LOW_MEMORY = 64 * 1024 * 1024;
+
+const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
+
 AtomicInt32 DiskIoMgr::next_disk_id_;
 
 namespace detail {
@@ -152,6 +155,34 @@ string DiskIoMgr::DebugString() {
   return ss.str();
 }
 
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
+RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
+int64_t buffer_len, MemTracker* mem_tracker)
+  : io_mgr_(io_mgr),
+reader_(reader),
+mem_tracker_(mem_tracker),
+scan_range_(scan_range),
+buffer_(buffer),
+buffer_len_(buffer_len) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(buffer != nullptr);
+  DCHECK_GE(buffer_len, 0);
+  DCHECK_NE(scan_range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::NO_BUFFER,
+  mem_tracker == nullptr);
+}
+
+void BufferDescriptor::TransferOwnership(MemTracker* dst) {
+  DCHECK(dst != nullptr);
+  DCHECK(!is_client_buffer());
+  // Memory of cached buffers is not tracked against a tracker.
+  if (is_cached()) return;
+  DCHECK(mem_tracker_ != nullptr);
+  dst->Consume(buffer_len_);
+  mem_tracker_->Release(buffer_len_);
+  mem_tracker_ = dst;
+}
+
 WriteRange::WriteRange(
 const string& file, int64_t file_offset, int disk_id, WriteDoneCallback 
callback)
   : RequestRange(RequestType::WRITE), callback_(callback) {
@@ -192,8 +223,8 @@ DiskIoMgr::DiskIoMgr() :
 num_io_threads_per_solid_state_disk_(GetFirstPositiveVal(
 FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk,
 THREADS_PER_SOLID_STATE_DISK)),
-max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(FLAGS_read_size)),
-min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(FLAGS_min_buffer_size)),
+max_buffer_size_(FLAGS_read_size),
+min_buffer_size_(FLAGS_min_buffer_size),
 shut_down_(false),
 total_bytes_read_counter_(TUnit::BYTES),
 read_timer_(TUnit::TIME_NS),
@@ -202,6 +233,8 @@ DiskIoMgr::DiskIoMgr() :
 FLAGS_num_file_handle_cache_partitions,
 FLAGS_unused_file_handle_timeout_sec) {
   DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size);
+  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, 
min_buffer_size_);
+  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   int num_local_disks = DiskInfo::num_disks();
   if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
 LOG(WARNING) << "Number of disks specified should be between 0 and the 
number of "
@@ -215,11 +248,11 @@ DiskIoMgr::DiskIoMgr() :
 }
 
 DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
-int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t 
max_buffer_size) :
+int threads_per_solid_state_disk, int min_buffer_size, int 
max_buffer_size) :
 num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
 num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
-max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(max_buffer_size)),
-min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(min_buffer_size)),
+max_buffer_size_(max_buffer_size),
+min_buffer_size_(min_buffer_size),
 shut_down_(false),
 total_bytes_read_counter_(TUnit::BYTES),
 read_timer_(TUnit::TIME_NS),
@@ -227,6 +260,8 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int 
threads_per_rotational_disk,
 FileSystemUtil::MaxNumFileHandles()),
 

[4/9] impala git commit: Revert IMPALA-4835 and dependent changes

2018-03-03 Thread lv
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
--
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java 
b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index 0a945bd..23f2d88 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -153,6 +153,26 @@ public class SlotRef extends Expr {
 return "";
   }
 
+  /**
+   * Checks if this slotRef refers to an array "pos" pseudo-column.
+   *
+   * Note: checking whether the column is null distinguishes between top-level 
columns
+   * and nested types. This check more specifically looks just for a reference 
to the
+   * "pos" field of an array type.
+   */
+  public boolean isArrayPosRef() {
+TupleDescriptor parent = getDesc().getParent();
+if (parent == null) return false;
+Type parentType = parent.getType();
+if (parentType instanceof CollectionStructType) {
+  if (((CollectionStructType)parentType).isArrayStruct() &&
+  getDesc().getLabel().equals(Path.ARRAY_POS_FIELD_NAME)) {
+return true;
+  }
+}
+return false;
+  }
+
   @Override
   protected void toThrift(TExprNode msg) {
 msg.node_type = TExprNodeType.SLOT_REF;

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
--
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 87d1806..7735f98 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -59,7 +59,6 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
-import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.fb.FbFileBlock;
@@ -77,7 +76,6 @@ import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TTableStats;
-import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.MembershipSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,7 +143,7 @@ public class HdfsScanNode extends ScanNode {
   // derived experimentally: running metadata-only Parquet count(*) scans on 
TPC-H
   // lineitem and TPC-DS store_sales of different sizes resulted in memory 
consumption
   // between 128kb and 1.1mb.
-  private static final long MIN_MEMORY_ESTIMATE = 1L * 1024L * 1024L;
+  private final static long MIN_MEMORY_ESTIMATE = 1 * 1024 * 1024;
 
   private final HdfsTable tbl_;
 
@@ -168,18 +166,6 @@ public class HdfsScanNode extends ScanNode {
   private long totalFiles_ = 0;
   private long totalBytes_ = 0;
 
-  // File formats scanned. Set in computeScanRangeLocations().
-  private Set fileFormats_;
-
-  // Number of bytes in the largest scan range (i.e. hdfs split). Set in
-  // computeScanRangeLocations().
-  private long maxScanRangeBytes_ = 0;
-
-  // The ideal reservation to process a single scan range (i.e. hdfs split), 
>= the
-  // minimum reservation. Generally provides enough memory to overlap CPU and 
I/O and
-  // maximize throughput. Set in computeResourceProfile().
-  private long idealScanRangeReservation_ = -1;
-
   // Input cardinality based on the partition row counts or extrapolation. -1 
if invalid.
   // Both values can be valid to report them in the explain plan, but only one 
of them is
   // used for determining the scan cardinality.
@@ -343,25 +329,25 @@ public class HdfsScanNode extends ScanNode {
 computeDictionaryFilterConjuncts(analyzer);
 
 // compute scan range locations with optional sampling
-computeScanRangeLocations(analyzer);
+Set fileFormats = computeScanRangeLocations(analyzer);
 
 // Determine backend scan node implementation to use. The optimized MT 
implementation
 // is currently only supported for Parquet.
 if (analyzer.getQueryOptions().isSetMt_dop() &&
 analyzer.getQueryOptions().mt_dop > 0 &&
-fileFormats_.size() == 1 &&
-(fileFormats_.contains(HdfsFileFormat.PARQUET)
-  || fileFormats_.contains(HdfsFileFormat.TEXT))) {
+fileFormats.size() == 1 &&
+(fileFormats.contains(HdfsFileFormat.PARQUET)
+  || fileFormats.contains(HdfsFileFormat.TEXT))) {
   useMtScanNode_ = true;
 } else {
   useMtScanNode_ = false;
 }
 
-if (fileFormats_.contains(HdfsFileFormat.PARQUET)) {
+if 

[8/9] impala git commit: Revert IMPALA-4835 and dependent changes

2018-03-03 Thread lv
http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/scanner-context.cc
--
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index c669e65..abdde07 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -41,15 +41,14 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
 
 ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* 
scan_node,
-BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* 
partition_desc,
-const vector& filter_ctxs,
-MemPool* expr_results_pool)
+HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
+const vector& filter_ctxs, MemPool* expr_results_pool)
   : state_(state),
 scan_node_(scan_node),
-bp_client_(bp_client),
 partition_desc_(partition_desc),
 filter_ctxs_(filter_ctxs),
 expr_results_pool_(expr_results_pool) {
+  AddStream(scan_range);
 }
 
 ScannerContext::~ScannerContext() {
@@ -67,20 +66,19 @@ void ScannerContext::ClearStreams() {
 }
 
 ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range,
-int64_t reservation, const HdfsFileDesc* file_desc)
+const HdfsFileDesc* file_desc)
   : parent_(parent),
 scan_range_(scan_range),
 file_desc_(file_desc),
-reservation_(reservation),
 file_len_(file_desc->file_length),
 next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
 boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
 boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
 }
 
-ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t 
reservation) {
-  streams_.emplace_back(new Stream(this, range, reservation,
-  scan_node_->GetFileDesc(partition_desc_->id(), range->file(;
+ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
+  streams_.emplace_back(new Stream(
+  this, range, scan_node_->GetFileDesc(partition_desc_->id(), 
range->file(;
   return streams_.back().get();
 }
 
@@ -103,7 +101,6 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool 
done) {
 
 Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   DCHECK_EQ(0, io_buffer_bytes_left_);
-  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
   if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
   if (io_buffer_ != nullptr) ReturnIoBuffer();
 
@@ -124,7 +121,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t 
read_past_size) {
 SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
 
 int64_t read_past_buffer_size = 0;
-int64_t max_buffer_size = io_mgr->max_buffer_size();
+int64_t max_buffer_size = 
parent_->state_->io_mgr()->max_read_buffer_size();
 if (!read_past_size_cb_.empty()) read_past_buffer_size = 
read_past_size_cb_(offset);
 if (read_past_buffer_size <= 0) {
   // Either no callback was set or the callback did not return an 
estimate. Use
@@ -136,7 +133,6 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t 
read_past_size) {
 read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
 read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
 read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
-read_past_buffer_size = ::min(read_past_buffer_size, reservation_);
 // We're reading past the scan range. Be careful not to read past the end 
of file.
 DCHECK_GE(read_past_buffer_size, 0);
 if (read_past_buffer_size == 0) {
@@ -147,23 +143,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t 
read_past_size) {
 ScanRange* range = parent_->scan_node_->AllocateScanRange(
 scan_range_->fs(), filename(), read_past_buffer_size, offset, 
partition_id,
 scan_range_->disk_id(), false, BufferOpts::Uncached());
-bool needs_buffers;
-RETURN_IF_ERROR(io_mgr->StartScanRange(
-parent_->scan_node_->reader_context(), range, _buffers));
-if (needs_buffers) {
-  // Allocate fresh buffers. The buffers for 'scan_range_' should be 
released now
-  // since we hit EOS.
-  if (reservation_ < io_mgr->min_buffer_size()) {
-return Status(Substitute("Could not read past end of scan range in 
file '$0'. "
-"Reservation provided $1 was < the minimum I/O buffer size",
-reservation_, io_mgr->min_buffer_size()));
-  }
-  RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
-  parent_->scan_node_->reader_context(), parent_->bp_client_, range,
-  reservation_));
-}
-RETURN_IF_ERROR(range->GetNext(_buffer_));
-DCHECK(io_buffer_->eosr());
+RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
+parent_->scan_node_->reader_context(), range, _buffer_));
   }
 
   DCHECK(io_buffer_ != nullptr);
@@ -343,8 +324,7 @@ Status