Philip Zeyliger has uploaded this change for review. ( http://gerrit.cloudera.org:8080/12683 )
Change subject: Prototype for a remote read byte cache.
......................................................................

Prototype for a remote read byte cache.

This code (which is far from ready for prime time) attempts to get to a place where a byte cache for remote (HDFS, S3, or ADLS) reads can be evaluated. This commit message tries to write down what I've figured out so far.

Hive LLAP found that it was useful to build such a cache. (See hive.git:llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java and neighbors.) In their case, they implemented an LRFU policy. They are also caching the uncompressed ORC byte streams, which give them fairly consistent chunk sizes (because the compression buffers are typically all the same size).

Alluxio also found it useful to build a cache, and it plugs in as a Hadoop filesystem. Apache Ignite and Qubole Rubix are also of interest. And, of course, there's memcached, Redis, ehcache, and so on and on and on...

Hive LLAP found that SSDs work best for these caches, since they're seek-heavy. I think this implies that they'd be configured differently than scratch dirs, since, on a machine with both SSDs and spinning disks, you may want to keep the cache off the spinning disks. (Or you may want a three-tier policy.)

The code in this commit builds a cache with the simplest policy I could implement (FIFO) and backs the cache with a large, memory-mapped file. There is implicitly a two-level cache here: the OS page cache handles which pages are in memory, and the cache here handles eviction from the cache altogether. The metadata for the cache is stored in an in-memory map, though the linked-list management of free space lives directly in the buffers. (I currently regret that; I don't think it affects the evaluation goal, but a linked list without pointer arithmetic would have been more pleasant to deal with.)

I hooked up the code through ReadFromPosInternal(), which was sort of easy.
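To make the design concrete, here is a much-simplified, illustrative sketch of the idea (not the patch's actual code): a fixed-size memory-mapped backing file with FIFO eviction and metadata in an in-memory map. The names `SimpleFifoCache`, `Insert()`, and `Lookup()` are invented, and a linear interval scan stands in for the patch's in-buffer linked-list free-space management:

```cpp
#include <cstdint>
#include <cstring>
#include <fcntl.h>
#include <map>
#include <string>
#include <sys/mman.h>
#include <unistd.h>
#include <vector>

class SimpleFifoCache {
 public:
  SimpleFifoCache(const std::string& path, size_t capacity)
      : capacity_(capacity) {
    int fd = open(path.c_str(), O_RDWR | O_CREAT, 0600);
    unlink(path.c_str());  // File deleted at creation time, as in the patch.
    (void)ftruncate(fd, capacity_);
    base_ = static_cast<uint8_t*>(
        mmap(nullptr, capacity_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
    close(fd);
  }

  // FIFO insert: append at the write head, wrapping at the end of the file.
  // Any entry whose storage the new entry overwrites is evicted, so eviction
  // order matches allocation order and no compaction is ever needed.
  void Insert(const std::string& key, const uint8_t* data, size_t len) {
    if (len > capacity_) return;
    if (write_pos_ + len > capacity_) write_pos_ = 0;  // Wrap around.
    for (auto it = index_.begin(); it != index_.end();) {
      bool overlaps = it->second.offset < write_pos_ + len &&
                      it->second.offset + it->second.len > write_pos_;
      it = overlaps ? index_.erase(it) : std::next(it);
    }
    memcpy(base_ + write_pos_, data, len);
    index_[key] = {write_pos_, len};
    write_pos_ += len;
  }

  bool Lookup(const std::string& key, std::vector<uint8_t>* out) {
    auto it = index_.find(key);
    if (it == index_.end()) return false;
    out->assign(base_ + it->second.offset,
                base_ + it->second.offset + it->second.len);
    return true;
  }

 private:
  struct Entry { size_t offset; size_t len; };
  size_t capacity_;
  size_t write_pos_ = 0;
  uint8_t* base_ = nullptr;
  std::map<std::string, Entry> index_;  // Cache metadata, kept in memory.
};
```

Because the backing file is mmap'd, the OS page cache decides which of these bytes are actually resident, which is the two-level behavior described above.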
Technically cached files don't need an hdfsOpenFile(), but I've not optimized that.

I've not made the cache coalesce overlapping reads. I suspect that in practice it may not be necessary, because how we read tables is pretty consistent, but that's something that needs to be checked.

I've not made the cache do "compaction", because the FIFO policy means that there's no fragmentation.

Breaking down the function of the cache, a few things have to happen:

* Impala has to read through the cache and store things in the cache. I've done this by wrapping ReadFromPosInternal().
* Impala has to specify the "value" of certain data; e.g., Parquet footers or index pages are more valuable than the data pages of really large fact tables. I've not dealt with this.
* The cache has to be configured. I've exposed a pair of flags for the directories and for how much of the relevant free space to use. If this sort of thing is long-lived, you'd actually want to integrate it with the general scratch space management code in Impala: a spilling query could use disk space that's being used by a cache. (Though even spilling queries should have limits; the cache may be more valuable than the currently spilling query.)
* The cache has to expose metrics. I exposed hit/miss counters (in both count and bytes) on the query, but nothing global.
* The cache needs a notion of locality, so that files don't end up cached N times on an N-node cluster. I've taken advantage of the recent changes that pin remote reads to certain hosts (made to support the remote file handle cache). I don't know whether that currently leaves us with 3 or 1 copies of most files, but it's enough for evaluation.
* The cache needs decent threading, since we read from many threads. I've set it up so that you get multiple caches, one per directory, and, furthermore, each cache is split into 8. This roughly models having a pool of 8 threads reading from SSDs. I've not explored this tunable space.
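Two of the bullets above (the read-through wrapping and the 8-way per-directory split) can be sketched roughly as follows. This is a hypothetical illustration, not the patch's code: `PickShard()`, `CachedRead()`, and `ReadStats` are invented names, and a plain `std::map` stands in for the real cache:

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Mirrors the 8-way split described above: each directory's cache is divided
// into shards so concurrent readers mostly contend on different locks.
static const int kShardsPerDir = 8;

// Choose which shard owns this (file, offset) pair, deterministically.
int PickShard(const std::string& filename, int64_t offset) {
  size_t h = std::hash<std::string>()(filename) * 31 +
             std::hash<int64_t>()(offset);
  return static_cast<int>(h % kShardsPerDir);
}

struct ReadStats { int64_t hits = 0; int64_t misses = 0; };

// Read-through pattern: consult the cache first; on a miss, fall back to the
// remote read, populate the cache, and bump the per-query counters.
std::vector<uint8_t> CachedRead(
    std::map<std::pair<std::string, int64_t>, std::vector<uint8_t>>* cache,
    ReadStats* stats, const std::string& file, int64_t offset,
    const std::function<std::vector<uint8_t>()>& remote_read) {
  auto key = std::make_pair(file, offset);
  auto it = cache->find(key);
  if (it != cache->end()) {
    ++stats->hits;
    return it->second;
  }
  ++stats->misses;
  std::vector<uint8_t> data = remote_read();  // The slow remote (HDFS/S3) read.
  (*cache)[key] = data;
  return data;
}
```

Wrapping ReadFromPosInternal() amounts to routing each read through something shaped like `CachedRead()` before it hits the network.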
I used neither the temporary file manager code nor the buffer pool code. This is out of expedience/ignorance. I suspect the temporary file manager code, and especially its ability to do encryption, would be very useful. The buffer pool code currently provides buffers that are pinned by default, and I didn't want to deal with pinning and unpinning. A more complicated implementation would find much more re-use potential here.

I looked into using 2MB transparent huge pages, but found that mmap doesn't do both "file-backed" and "THP". There's some performance exploration to be done here, but I've not looked into it.

The cache doesn't survive restarts. This seems OK and certainly makes state management easier, since we don't have to worry about keeping the metadata consistent in the face of a crash. The underlying file is deleted at creation time. Users may end up confused, though, because their file system is out of space and yet they can't find the 500GB file.

Todd helpfully pointed me to a madvise() flag that tells the system not to dump the cache's pages while dumping core. I've not crashed the thing since, but I can assure you the 250GB core dump was unpleasant for my system.

The testing I've done so far looks like:

// Set up a table pointing at an external HDFS.
create external table philip_test stored as parquet
location 'hdfs://....:8020/user/hive/warehouse/philip_test'
as select * from tpcds_parquet.store_returns;

// Run a query.
This one happens to read the table twice:

with t as (select
    ndv(sr_returned_date_sk) + ndv(sr_return_time_sk) + ndv(sr_item_sk) +
    ndv(sr_customer_sk) + ndv(sr_cdemo_sk) + ndv(sr_hdemo_sk) +
    ndv(sr_addr_sk) + ndv(sr_store_sk) + ndv(sr_reason_sk) +
    ndv(sr_ticket_number) + ndv(sr_return_quantity) + ndv(sr_return_amt) +
    ndv(sr_return_tax) + ndv(sr_return_amt_inc_tax) + ndv(sr_fee) +
    ndv(sr_return_ship_cost) + ndv(sr_refunded_cash) +
    ndv(sr_reversed_charge) + ndv(sr_store_credit) + ndv(sr_net_loss)
  from philip_test)
select * from t join t t2 join t t3 join t t4;

The first read takes ~3s; the second read takes < 1s (with a debug build). I've not yet done testing on more realistic clusters, but on a 1GB text table, I've seen ~1GB/s read throughput, as reported by Impala, when accessing the cache.

I also tested on a cluster of ~30 nodes pointed at an HDFS cluster of ~30 non-overlapping nodes. TPC-DS Q28 at 10TB sped up, as expected, when using the cache. The query reads the same table over and over again, and we've seen it saturate our network. With caching, it runs in the same time (on the second or third occurrence) as on the co-located cluster. This isn't altogether surprising, but it's nice validation. Our remote read scheduling isn't as consistent as our local scheduling, so that would need to be worked out.
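For reference, the madvise() flag mentioned above is MADV_DONTDUMP (Linux-specific, kernel 3.4+), applied to the big mapping so a crashing process doesn't write the whole cache into its core file. This is an illustrative helper, not code from the patch; `MapCacheRegionNoDump()` is an invented name, and an anonymous mapping stands in for the cache's file-backed one:

```cpp
#include <cstddef>
#include <sys/mman.h>

// Map a region as a stand-in for the cache's backing-file mapping and
// exclude its pages from core dumps. Returns true on success.
bool MapCacheRegionNoDump(size_t len, void** out) {
  void* p = mmap(nullptr, len, PROT_READ | PROT_WRITE,
                 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if (p == MAP_FAILED) return false;
  // MADV_DONTDUMP: leave these pages out of any core file. Without this,
  // a crash dumps the entire cache (hence the 250GB core mentioned above).
  if (madvise(p, len, MADV_DONTDUMP) != 0) {
    munmap(p, len);
    return false;
  }
  *out = p;
  return true;
}
```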
Change-Id: Ic312b0f7ac7875e00a3855ef21dce5b8a9aa67c5
---
M be/src/exec/hdfs-scan-node-base.cc
M be/src/exec/hdfs-scan-node-base.h
M be/src/runtime/io/disk-io-mgr.cc
M be/src/runtime/io/disk-io-mgr.h
M be/src/runtime/io/hdfs-file-reader.cc
M be/src/runtime/io/hdfs-file-reader.h
M be/src/runtime/io/request-context.h
M be/src/scheduling/scheduler.cc
M be/src/util/CMakeLists.txt
A be/src/util/cache-test.cc
A be/src/util/cache.cc
A be/src/util/cache.h
12 files changed, 780 insertions(+), 3 deletions(-)

git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/83/12683/1
--
To view, visit http://gerrit.cloudera.org:8080/12683
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic312b0f7ac7875e00a3855ef21dce5b8a9aa67c5
Gerrit-Change-Number: 12683
Gerrit-PatchSet: 1
Gerrit-Owner: Philip Zeyliger <phi...@cloudera.com>