Philip Zeyliger has uploaded this change for review. ( http://gerrit.cloudera.org:8080/12683 )


Change subject: Prototype for a remote read byte cache.
......................................................................

Prototype for a remote read byte cache.

This code (which is far from ready for prime time) attempts to get to a
place where a byte cache for remote reads (HDFS, S3, or ADLS) can be
evaluated. This commit message writes down what I've figured out so far...

Hive LLAP found it useful to build such a cache.  (See
hive.git:llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
and neighbors.) In their case, they implemented an LRFU policy. They also cache
the uncompressed ORC byte streams, which gives them fairly consistent
chunk sizes (because the compression buffers are typically all the same size).
Alluxio also found it useful to build a cache, and it plugs in as a Hadoop
filesystem. Apache Ignite and Qubole's Rubix are also of interest. And, of course,
there's memcached, Redis, Ehcache, and so on and on and on...

Hive LLAP found that SSDs work best for these caches, since they're
seek-heavy. I think this implies they'd be configured differently from
scratch dirs: on a machine with both SSDs and spinning disks, you may
want to keep the cache off the spinning disks. (Or you may want a
three-tier policy.)

The code in this commit builds a cache with the simplest policy I could
implement (FIFO) and backs it with a large, memory-mapped file. There is
implicitly a two-level cache here: the OS page cache decides which pages
are in memory, while the cache here handles eviction from the cache
altogether. The cache's metadata is stored in an in-memory map,
though the linked-list management of free space lives directly in the buffers.
(I currently regret that, but I don't think it affects the evaluation goal;
a linked list without pointer arithmetic would have been more pleasant
to deal with.)
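
For concreteness, here is a rough sketch of the shape described above. The
names are made up for illustration and don't match be/src/util/cache.h; the
choice of (filename, mtime, offset) as the key is also just an assumption.

  // Rough sketch, illustrative names only. Cached bytes live in a large
  // mmap()ed file; the metadata lives in an in-memory map; eviction is FIFO.
  #include <cstdint>
  #include <mutex>
  #include <string>
  #include <unordered_map>

  struct CacheKey {
    std::string filename;
    int64_t mtime;
    int64_t offset;
    bool operator==(const CacheKey& o) const {
      return filename == o.filename && mtime == o.mtime && offset == o.offset;
    }
  };

  struct CacheKeyHash {
    size_t operator()(const CacheKey& k) const {
      return std::hash<std::string>()(k.filename) ^
             std::hash<int64_t>()(k.mtime * 31 + k.offset);
    }
  };

  // Where an entry's bytes live inside the mmap()ed backing file.
  struct CacheEntry {
    int64_t buffer_offset;
    int64_t len;
  };

  class FifoByteCache {
   public:
    explicit FifoByteCache(const std::string& cache_dir);
    // Copies up to 'len' cached bytes into 'out'; returns true on a hit.
    bool Lookup(const CacheKey& key, uint8_t* out, int64_t len);
    // Copies 'len' bytes into the backing file, evicting the oldest
    // entries (FIFO) until there is room.
    void Insert(const CacheKey& key, const uint8_t* data, int64_t len);

   private:
    std::mutex lock_;
    std::unordered_map<CacheKey, CacheEntry, CacheKeyHash> metadata_;
    uint8_t* mmap_base_ = nullptr;  // Start of the mmap()ed backing file.
    int64_t capacity_bytes_ = 0;
  };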

I hooked up the code through ReadFromPosInternal(), which was sort of
easy.  Technically, cached files don't need an hdfsFileOpen() at all, but I've
not optimized that.
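
Very roughly, the hook looks something like the following. The member and
helper names (cache_, scan_range_, DoRemoteRead) are placeholders for
illustration; the real signature lives in be/src/runtime/io/hdfs-file-reader.cc.

  // Sketch of the read-path hook: look up the cache first, fall back to the
  // remote read on a miss, and populate the cache with whatever came back.
  Status HdfsFileReader::ReadFromPosInternal(int64_t file_offset,
      uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read) {
    CacheKey key{scan_range_->file(), scan_range_->mtime(), file_offset};
    if (cache_ != nullptr && cache_->Lookup(key, buffer, bytes_to_read)) {
      *bytes_read = bytes_to_read;  // Hit: skip the remote read entirely.
      return Status::OK();
    }
    RETURN_IF_ERROR(DoRemoteRead(file_offset, buffer, bytes_to_read, bytes_read));
    // Miss: store what the remote read returned.
    if (cache_ != nullptr) cache_->Insert(key, buffer, *bytes_read);
    return Status::OK();
  }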

I've not made the cache coalesce overlapping reads. I suspect that
in practice this may not be necessary, because the way we read tables is pretty
consistent, but that needs to be checked.

I've not made the cache do "compaction" because the FIFO policy means
that there's no fragmentation.

Breaking down the function of the cache, a few things have to happen:

* Impala has to read through the cache or store things in the cache.

  I've done this by wrapping ReadFromPosInternal().

* Impala has to specify the "value" of certain data; e.g., Parquet footers
  or index pages are more valuable than data pages of really large
  fact tables.

  I've not dealt with this.

* The cache has to be configured.

  I've exposed a pair of flags for directories and how much of
  the relevant free space to use. If this sort of thing is long-lived,
  you'd actually want to integrate this with the general scratch space
  management code in Impala: a spilling query could use disk space
  that's being used by a cache. (Though even spilling queries should
  have limits; the cache may be more valuable than the currently
  spilling query.)

* The cache has to expose metrics.

  I've exposed hit/miss counters (both counts and bytes) on the query,
  but nothing global.

* The cache needs to have a notion of locality, so that files don't
  end up cached N times for an N-node cluster.

  I've taken advantage of the recent changes that make remote reads pin themselves
  to certain hosts (to make the remote file handle cache work). I don't know
  whether that currently leaves us with three copies of most files or just one,
  but it's enough for evaluation.

* The cache needs to have decent threading, since we read from many threads.

  I've set it up so that you get multiple caches, one per directory, and,
  furthermore, each cache is split into 8 shards (see the sketch after
  this list). This roughly models having a pool of 8 threads reading
  from SSDs. I've not explored this tunable space.
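
As a sketch of that sharding, reusing the made-up FifoByteCache/CacheKey types
from the earlier sketch (the flag plumbing and shard-count tuning are omitted;
this is not the actual layout in cache.cc):

  // One group of caches per configured directory, each split kShardsPerDir
  // ways by key hash, so concurrent readers mostly take different locks.
  #include <memory>
  #include <vector>

  static const int kShardsPerDir = 8;

  class ShardedByteCache {
   public:
    explicit ShardedByteCache(const std::vector<std::string>& cache_dirs) {
      for (const std::string& dir : cache_dirs) {
        for (int i = 0; i < kShardsPerDir; ++i) {
          shards_.push_back(std::unique_ptr<FifoByteCache>(new FifoByteCache(dir)));
        }
      }
    }

    // All operations on 'key' go to the shard its hash selects.
    FifoByteCache* ShardFor(const CacheKey& key) {
      return shards_[CacheKeyHash()(key) % shards_.size()].get();
    }

   private:
    std::vector<std::unique_ptr<FifoByteCache>> shards_;
  };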

I used neither the temporary file manager code nor the buffer pool code.
This is out of expedience/ignorance. I suspect the temporary file
manager code, and especially its ability to do encryption, would be very
useful. The buffer pool code currently provides buffers that are pinned
by default, and I didn't want to deal with pinning and unpinning. A more
complicated implementation would find much more re-use potential here.

I looked into using 2MB Transparent Huge Pages but found that mmap
doesn't support being both file-backed and THP-backed at the same time.
There's some performance exploration to be done here, but I've not
pursued it.

The cache doesn't survive restarts. This seems OK and certainly makes
state management easier, since we don't have to worry about keeping the
metadata consistent in the face of a crash. The underlying file is
deleted at creation time. Users may end up confused because their
filesystem is out of space and yet they can't find the 500GB file,
though.
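
Concretely, "deleted at creation time" is (I believe) the usual POSIX
create-then-unlink trick, roughly:

  // The backing file keeps consuming space while the fd is open, never
  // appears in a directory listing, and is reclaimed automatically when the
  // process exits or crashes. Function name is illustrative.
  #include <fcntl.h>
  #include <unistd.h>

  int OpenBackingFile(const char* path, int64_t capacity_bytes) {
    int fd = open(path, O_CREAT | O_RDWR | O_EXCL, 0600);
    if (fd < 0) return -1;
    unlink(path);  // Drop the name immediately; the inode lives on via 'fd'.
    if (ftruncate(fd, capacity_bytes) != 0) {  // Size the file up front.
      close(fd);
      return -1;
    }
    return fd;
  }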

Todd helpfully pointed me to an madvise() flag that tells the system not
to dump the cache's memory when dumping core. I've not crashed the thing
since, but I can assure you the 250GB core dump was unpleasant for my system.
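
For reference, the flag is MADV_DONTDUMP (Linux 3.4+), which excludes the
mapped range from core dumps. Applied to the backing file's mapping it looks
roughly like this (error handling is minimal, names are illustrative):

  #include <sys/mman.h>

  uint8_t* MapBackingFile(int fd, int64_t capacity_bytes) {
    void* base = mmap(nullptr, static_cast<size_t>(capacity_bytes),
                      PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) return nullptr;
    // Keep a multi-hundred-GB cache out of any core dump.
    madvise(base, static_cast<size_t>(capacity_bytes), MADV_DONTDUMP);
    return static_cast<uint8_t*>(base);
  }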

The testing I've done so far looks like:

  -- Set up a table pointing at an external HDFS:
  create external table philip_test stored as parquet
  location 'hdfs://....:8020/user/hive/warehouse/philip_test'
  as select * from tpcds_parquet.store_returns;

  -- Run a query. This one happens to read the table twice:
  with t as (select
      ndv(sr_returned_date_sk) + ndv(sr_return_time_sk) + ndv(sr_item_sk) +
      ndv(sr_customer_sk) + ndv(sr_cdemo_sk) + ndv(sr_hdemo_sk) +
      ndv(sr_addr_sk) + ndv(sr_store_sk) + ndv(sr_reason_sk) +
      ndv(sr_ticket_number) + ndv(sr_return_quantity) + ndv(sr_return_amt) +
      ndv(sr_return_tax) + ndv(sr_return_amt_inc_tax) + ndv(sr_fee) +
      ndv(sr_return_ship_cost) + ndv(sr_refunded_cash) + ndv(sr_reversed_charge) +
      ndv(sr_store_credit) + ndv(sr_net_loss)
    from philip_test)
  select * from t join t t2 join t t3 join t t4;

First read takes ~3s; second read takes < 1s (with a debug build). I've
not yet done testing on more realistic clusters, but on a 1GB text
table, I've seen ~1GB/s read throughput as reported by Impala when
accessing the cache.

I also tested on a cluster of ~30 nodes pointed at an HDFS cluster of ~30
non-overlapping nodes. TPC-DS Q28 at 10TB sped up, as expected, with the
cache. The query reads the same table over and over again, and we've
seen it saturate our network. With caching, it runs (on its second or
third execution) in the same time as on the co-located cluster. This isn't
altogether surprising, but it's nice validation. Our remote read
scheduling isn't as consistent as our local scheduling, so that
would need to be worked out.

Change-Id: Ic312b0f7ac7875e00a3855ef21dce5b8a9aa67c5
---
M be/src/exec/hdfs-scan-node-base.cc
M be/src/exec/hdfs-scan-node-base.h
M be/src/runtime/io/disk-io-mgr.cc
M be/src/runtime/io/disk-io-mgr.h
M be/src/runtime/io/hdfs-file-reader.cc
M be/src/runtime/io/hdfs-file-reader.h
M be/src/runtime/io/request-context.h
M be/src/scheduling/scheduler.cc
M be/src/util/CMakeLists.txt
A be/src/util/cache-test.cc
A be/src/util/cache.cc
A be/src/util/cache.h
12 files changed, 780 insertions(+), 3 deletions(-)



  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/83/12683/1
--
To view, visit http://gerrit.cloudera.org:8080/12683
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic312b0f7ac7875e00a3855ef21dce5b8a9aa67c5
Gerrit-Change-Number: 12683
Gerrit-PatchSet: 1
Gerrit-Owner: Philip Zeyliger <phi...@cloudera.com>
