Yida Wu has uploaded a new patch set (#17). ( http://gerrit.cloudera.org:8080/16318 )

Change subject: IMPALA-9867: Add Support for Spilling to S3: Milestone 1
......................................................................

IMPALA-9867: Add Support for Spilling to S3: Milestone 1

Major Features
1) Local files as buffers for spilling to S3.
2) Async upload and sync fetch of remote files.
3) Sync deletion of remote files after the query ends.
4) Local buffer file management.
5) Compatibility between spilling to local and remote storage.
6) All errors from HDFS/S3 terminate the query.

Changes on TmpFile:
* TmpFile is separated into two implementations, TmpFileLocal and
  TmpFileRemote.
  TmpFileLocal is used for spilling to the local file system.
  TmpFileRemote is a new type for spilling to remote storage. It
  contains two DiskFiles, one for the local buffer and the other for
  the remote file.
* A DiskFile is an object that contains the information of a physical
  file, passed to the DiskIOMgr to execute IO operations on that
  specific file. The DiskFile also tracks the status of the file:
  DiskFileStatus::INWRITING/PERSISTED/DELETED.
  When a DiskFile is initialized, it is in INWRITING status. Once the
  file is persisted to the file system, it moves to PERSISTED status.
  If the file is deleted, for example when the local buffer is
  evicted, the DiskFile status of the buffer file becomes DELETED.
  After that, if the file is fetched back from the remote, the status
  of the buffer file becomes INWRITING again, and then PERSISTED once
  the fetch finishes successfully.
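The status transitions above can be sketched as a small state machine.
This is an illustrative sketch only (the struct, method names, and
transition checks are hypothetical, not the actual DiskFile code):

```cpp
#include <cassert>

// Status values from the description above.
enum class DiskFileStatus { INWRITING, PERSISTED, DELETED };

// Hypothetical sketch of the transitions described above.
struct DiskFileSketch {
  // A new DiskFile starts in INWRITING status.
  DiskFileStatus status = DiskFileStatus::INWRITING;

  // Write (or fetch) completed successfully: the file is persisted.
  void Persist() {
    assert(status == DiskFileStatus::INWRITING);
    status = DiskFileStatus::PERSISTED;
  }

  // The local buffer file is evicted.
  void Delete() {
    assert(status == DiskFileStatus::PERSISTED);
    status = DiskFileStatus::DELETED;
  }

  // Fetching the file back from the remote restarts the write cycle.
  void StartFetch() {
    assert(status == DiskFileStatus::DELETED);
    status = DiskFileStatus::INWRITING;
  }
};
```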

Implementation Details:
1) A new enum type is added to specify the disk type of a file,
   indicating where the file is physically located.
   The types include DiskFileType::LOCAL/LOCAL_BUFFER/DFS/S3.
   DiskFileType::LOCAL indicates the file is in the local file system.
   DiskFileType::LOCAL_BUFFER indicates the file is in the local file
   system and is the buffer of a remote scratch file.
   DiskFileType::DFS/S3 indicates the file is on HDFS/S3.
   Also, the startup option remote_tmp_file_read_by_file is added to
   specify the implementation of reading pages from the remote.
   If set to true, the entire file is fetched to the local buffer
   during reading (pinning) if the local buffer was evicted.
   If set to false, only a single page is read per read; no file is
   fetched.
2) Two disk queues have been added for file operation jobs.
   Queue names: RemoteS3DiskFileOper/RemoteDfsDiskFileOper
   File operations on the remote disk, like upload and fetch, are done
   in these queues. The purpose of the queues is to isolate file
   operations from normal read/write IO operations. This improves the
   efficiency of file operations, which have relatively long execution
   times and should not be interrupted, and gives more accurate
   control over the number of threads working on file operation jobs.
   RemoteOperRange is the new type that carries file operation jobs.
   Previously, we had the request types READ and WRITE; now
   FILE_FETCH/FILE_UPLOAD are added.
3) The tmp files are deleted when the tmp file group is destructed.
   For remote files, the entire directory is deleted.
4) Local buffer file management controls the total size of local
   buffer files and evicts files if needed.
   A local buffer file can be evicted if the temporary file has been
   uploaded to the remote disk or all of the pages in the file have
   been pinned.
   There are two modes that decide the order in which files are chosen
   for eviction: the default is LIFO, the other is FIFO. This is
   controlled by the startup option remote_tmp_files_avail_pool_lifo.
5) Spilling to local storage has higher priority than spilling to
   remote. If no local scratch space is available, temporary data is
   spilled to the remote.
   If any remote directory is configured, the first available local
   directory is used as the local buffer for spilling to remote.
   If a remote directory is configured without any available local
   scratch space, an error is returned for the initialization failure.
   The purpose of this design is to simplify the implementation in
   milestone 1 with fewer changes to the configuration.
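The read-path choice in item 1 boils down to a small decision on the
remote_tmp_file_read_by_file flag. A minimal sketch, assuming a
hypothetical helper and action names (only the flag name comes from
this change):

```cpp
// Sketch of the read-path decision controlled by the startup option
// remote_tmp_file_read_by_file (helper and enum names are illustrative).
enum class ReadAction {
  READ_LOCAL_BUFFER,  // buffer still present: read from it directly
  FETCH_WHOLE_FILE,   // flag true and buffer evicted: fetch entire file
  READ_ONE_PAGE       // flag false and buffer evicted: read a single page
};

ReadAction DecideRead(bool read_by_file, bool buffer_evicted) {
  if (!buffer_evicted) return ReadAction::READ_LOCAL_BUFFER;
  return read_by_file ? ReadAction::FETCH_WHOLE_FILE
                      : ReadAction::READ_ONE_PAGE;
}
```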
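The queue isolation in item 2 can be sketched as a routing decision on
the request type. The queue names come from the description above; the
enum layout, routing function, and the regular-queue name are
illustrative assumptions, not the actual DiskIoMgr code:

```cpp
#include <string>

// READ/WRITE existed before this change; FILE_FETCH/FILE_UPLOAD are new.
enum class RequestType { READ, WRITE, FILE_FETCH, FILE_UPLOAD };

// File operation jobs (carried by RemoteOperRange) go to the dedicated
// queues so long-running uploads/fetches do not block normal IO.
std::string QueueFor(RequestType type, bool is_s3) {
  switch (type) {
    case RequestType::FILE_FETCH:
    case RequestType::FILE_UPLOAD:
      return is_s3 ? "RemoteS3DiskFileOper" : "RemoteDfsDiskFileOper";
    default:
      return "regular-io-queue";  // hypothetical name for the normal path
  }
}
```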
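The eviction-order choice in item 4 (LIFO by default, FIFO otherwise,
per remote_tmp_files_avail_pool_lifo) can be sketched with a simple
pool over a deque. The struct and method names are illustrative, not
the actual TmpFileMgr implementation:

```cpp
#include <deque>

// Sketch of the available-files pool whose eviction order is selected
// by the startup option remote_tmp_files_avail_pool_lifo.
struct AvailPoolSketch {
  std::deque<int> files;  // evictable file ids, in insertion order
  bool lifo;              // true => LIFO (default), false => FIFO

  explicit AvailPoolSketch(bool lifo_mode) : lifo(lifo_mode) {}

  void Add(int file_id) { files.push_back(file_id); }

  // Pick the next file to evict: last added (LIFO) or first added (FIFO).
  int PopForEviction() {
    int id;
    if (lifo) {
      id = files.back();
      files.pop_back();
    } else {
      id = files.front();
      files.pop_front();
    }
    return id;
  }
};
```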

Limitations:
* Only one remote scratch dir is supported.
* The first local scratch dir is used as the buffer for remote scratch
  space if a remote scratch dir exists.

Existing Issue:
There is a potential deadlock when the writeback function calls
WriteDirtyPagesAsync() to write a new page: a new file is needed for
the write but waits for available space, and the thread calling the
writeback function in the DiskQueue can prevent the queue from calling
GetNextRequestRange(), so no other WriteRanges can proceed.
A temporary solution is to put a limit on the number of queries.
Each query needs two files in the local buffer directory to guarantee
this type of deadlock cannot happen, because then at least one file is
complete and can be uploaded, potentially releasing space to resolve
the deadlock.
This means the maximum number of files the local buffer directory can
contain should be at least twice the number of queries.
For example, if the directory size limit is 64MB and each file is 8MB,
the maximum number of concurrent queries without a deadlock is
64/(8*2) = 4.
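The sizing rule above is a one-line calculation. A sketch with a
hypothetical helper name (the formula and example values come from the
description):

```cpp
// Each query reserves space for two buffer files, so the maximum number
// of concurrent queries without the deadlock described above is
// dir_size_limit / (2 * file_size).
int MaxConcurrentQueries(int dir_limit_mb, int file_size_mb) {
  return dir_limit_mb / (2 * file_size_mb);
}
// With a 64MB directory limit and 8MB files: 64 / (8 * 2) = 4 queries.
```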

Testcases:
* Ran pre-review-test
* Unit tests added to tmp-file-mgr-test/disk-io-mgr-test.
* E2E Tests added to custom_cluster/test_scratch_disk.py.
* Ran Unit Tests:
$IMPALA_HOME/be/build/debug/runtime/buffered-tuple-stream-test
$IMPALA_HOME/be/build/debug/runtime/tmp-file-mgr-test
$IMPALA_HOME/be/build/debug/runtime/bufferpool/buffer-pool-test
$IMPALA_HOME/be/build/debug/runtime/io/disk-io-mgr-test
* Ran E2E Tests:
custom_cluster/test_scratch_disk.py

Change-Id: I419b1d5dbbfe35334d9f964c4b65e553579fdc89
---
M be/src/runtime/hdfs-fs-cache.cc
M be/src/runtime/hdfs-fs-cache.h
M be/src/runtime/io/CMakeLists.txt
A be/src/runtime/io/disk-file.cc
A be/src/runtime/io/disk-file.h
M be/src/runtime/io/disk-io-mgr-test.cc
M be/src/runtime/io/disk-io-mgr.cc
M be/src/runtime/io/disk-io-mgr.h
A be/src/runtime/io/file-writer.h
M be/src/runtime/io/hdfs-file-reader.cc
A be/src/runtime/io/hdfs-file-writer.cc
A be/src/runtime/io/hdfs-file-writer.h
M be/src/runtime/io/local-file-system.cc
M be/src/runtime/io/local-file-system.h
A be/src/runtime/io/local-file-writer.cc
A be/src/runtime/io/local-file-writer.h
M be/src/runtime/io/request-context.cc
M be/src/runtime/io/request-context.h
M be/src/runtime/io/request-ranges.h
M be/src/runtime/io/scan-range.cc
M be/src/runtime/query-state.cc
M be/src/runtime/tmp-file-mgr-internal.h
M be/src/runtime/tmp-file-mgr-test.cc
M be/src/runtime/tmp-file-mgr.cc
M be/src/runtime/tmp-file-mgr.h
M be/src/util/hdfs-util.cc
M be/src/util/hdfs-util.h
M common/thrift/metrics.json
M tests/custom_cluster/test_scratch_disk.py
29 files changed, 3,808 insertions(+), 254 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/18/16318/17
--
To view, visit http://gerrit.cloudera.org:8080/16318
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I419b1d5dbbfe35334d9f964c4b65e553579fdc89
Gerrit-Change-Number: 16318
Gerrit-PatchSet: 17
Gerrit-Owner: Yida Wu <[email protected]>
Gerrit-Reviewer: Abhishek Rawat <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Sahil Takiar <[email protected]>
Gerrit-Reviewer: Yida Wu <[email protected]>

Reply via email to