This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 15304598b9 Wire up FSDataInputStream.setDropBehind, set to true in FileCompactor (#3079) 15304598b9 is described below commit 15304598b95497aa3b98240a34803c622d13d25b Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Nov 16 09:58:05 2022 -0500 Wire up FSDataInputStream.setDropBehind, set to true in FileCompactor (#3079) Added a CachableBlockFile.CachableBuilder.fsPath overload that allows the caller to specify whether setDropBehind should be called on the FSDataInputStream. Wired up FileOptions and RFileOperations for this new setting. Only FileCompactor was modified to enable this setting. Any scans running on the same Tablet that is compacting will likely have their blocks cached, so this should not impact scans. I looked at calling FSDataInputStream.setDropBehind on the FileCompactor's input files after the compaction completed, in DatafileManager.bringMajorCompactionOnline, but because of the way this is wired up in Hadoop, you have to read from the file to make this setting active (see org.apache.hadoop.hdfs.server.datanode.BlockSender.close()). I believe the effect of this change is that POSIX_FADV_DONTNEED will be set on the DataNodes that serve a copy of each HDFS file block that is read. This means that DataNodes serving other replicas of the same block may not have POSIX_FADV_DONTNEED set on their file descriptors. The impact of this change is likely inconsequential when looking at a single file or on a system that is not actively compacting. This change will have a larger impact on systems that are constantly compacting. 
Co-authored-by: Keith Turner <ktur...@apache.org> --- .../apache/accumulo/core/file/FileOperations.java | 26 +++++++++++++++++----- .../file/blockfile/impl/CachableBlockFile.java | 23 ++++++++++++++++++- .../accumulo/core/file/rfile/RFileOperations.java | 10 ++++----- .../accumulo/server/compaction/FileCompactor.java | 3 ++- .../org/apache/accumulo/tserver/log/DfsLogger.java | 4 +++- .../org/apache/accumulo/tserver/log/LogSorter.java | 4 +++- 6 files changed, 56 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index b5c5c9e817..2a2b4aeaa5 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -183,12 +183,13 @@ public abstract class FileOperations { public final Range range; public final Set<ByteSequence> columnFamilies; public final boolean inclusive; + public final boolean dropCacheBehind; public FileOptions(AccumuloConfiguration tableConfiguration, String filename, FileSystem fs, Configuration fsConf, RateLimiter rateLimiter, String compression, FSDataOutputStream outputStream, boolean enableAccumuloStart, CacheProvider cacheProvider, Cache<String,Long> fileLenCache, boolean seekToBeginning, CryptoService cryptoService, - Range range, Set<ByteSequence> columnFamilies, boolean inclusive) { + Range range, Set<ByteSequence> columnFamilies, boolean inclusive, boolean dropCacheBehind) { this.tableConfiguration = tableConfiguration; this.filename = filename; this.fs = fs; @@ -204,6 +205,7 @@ public abstract class FileOperations { this.range = range; this.columnFamilies = columnFamilies; this.inclusive = inclusive; + this.dropCacheBehind = dropCacheBehind; } public AccumuloConfiguration getTableConfiguration() { @@ -277,6 +279,7 @@ public abstract class FileOperations { private Configuration fsConf; private RateLimiter rateLimiter; 
private CryptoService cryptoService; + private boolean dropCacheBehind = false; protected FileHelper fs(FileSystem fs) { this.fs = Objects.requireNonNull(fs); @@ -308,28 +311,36 @@ public abstract class FileOperations { return this; } + protected FileHelper dropCacheBehind(boolean drop) { + this.dropCacheBehind = drop; + return this; + } + protected FileOptions toWriterBuilderOptions(String compression, FSDataOutputStream outputStream, boolean startEnabled) { return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, compression, - outputStream, startEnabled, NULL_PROVIDER, null, false, cryptoService, null, null, true); + outputStream, startEnabled, NULL_PROVIDER, null, false, cryptoService, null, null, true, + dropCacheBehind); } protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider, Cache<String,Long> fileLenCache, boolean seekToBeginning) { return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null, false, cacheProvider == null ? 
NULL_PROVIDER : cacheProvider, fileLenCache, - seekToBeginning, cryptoService, null, null, true); + seekToBeginning, cryptoService, null, null, true, dropCacheBehind); } protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) { return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null, - false, NULL_PROVIDER, fileLenCache, false, cryptoService, null, null, true); + false, NULL_PROVIDER, fileLenCache, false, cryptoService, null, null, true, + dropCacheBehind); } protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies, boolean inclusive) { return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null, - false, NULL_PROVIDER, null, false, cryptoService, range, columnFamilies, inclusive); + false, NULL_PROVIDER, null, false, cryptoService, range, columnFamilies, inclusive, + dropCacheBehind); } protected AccumuloConfiguration getTableConfiguration() { @@ -427,6 +438,11 @@ public abstract class FileOperations { return this; } + public ReaderBuilder dropCachesBehind() { + this.dropCacheBehind(true); + return this; + } + /** * Seek the constructed iterator to the beginning of its domain before returning. Equivalent to * {@code seekToBeginning(true)}. 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index b33f5ea821..e0bf0c0bda 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -41,6 +41,7 @@ import org.apache.accumulo.core.spi.cache.CacheEntry; import org.apache.accumulo.core.spi.crypto.CryptoService; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; @@ -83,8 +84,28 @@ public class CachableBlockFile { } public CachableBuilder fsPath(FileSystem fs, Path dataFile) { + return fsPath(fs, dataFile, false); + } + + public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind) { this.cacheId = pathToCacheId(dataFile); - this.inputSupplier = () -> fs.open(dataFile); + this.inputSupplier = () -> { + FSDataInputStream is = fs.open(dataFile); + if (dropCacheBehind) { + // Tell the DataNode that the write ahead log does not need to be cached in the OS page + // cache + try { + is.setDropBehind(Boolean.TRUE); + log.trace("Called setDropBehind(TRUE) for stream reading file {}", dataFile); + } catch (UnsupportedOperationException e) { + log.debug("setDropBehind not enabled for wal file: {}", dataFile); + } catch (IOException e) { + log.debug("IOException setting drop behind for file: {}, msg: {}", dataFile, + e.getMessage()); + } + } + return is; + }; this.lengthSupplier = () -> fs.getFileStatus(dataFile).getLen(); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index 
833adb700d..6a73a914d8 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@ -47,11 +47,11 @@ public class RFileOperations extends FileOperations { private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet(); private static RFile.Reader getReader(FileOptions options) throws IOException { - CachableBuilder cb = - new CachableBuilder().fsPath(options.getFileSystem(), new Path(options.getFilename())) - .conf(options.getConfiguration()).fileLen(options.getFileLenCache()) - .cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter()) - .cryptoService(options.getCryptoService()); + CachableBuilder cb = new CachableBuilder() + .fsPath(options.getFileSystem(), new Path(options.getFilename()), options.dropCacheBehind) + .conf(options.getConfiguration()).fileLen(options.getFileLenCache()) + .cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter()) + .cryptoService(options.getCryptoService()); return new RFile.Reader(cb); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index a15f9a901e..f45811c85a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -313,7 +313,8 @@ public class FileCompactor implements Callable<CompactionStats> { reader = fileFactory.newReaderBuilder() .forFile(mapFile.getPathStr(), fs, fs.getConf(), cryptoService) - .withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter()).build(); + .withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter()) + .dropCachesBehind().build(); readers.add(reader); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index b24387050b..e30bb58cca 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -420,8 +420,10 @@ public class DfsLogger implements Comparable<DfsLogger> { // Tell the DataNode that the write ahead log does not need to be cached in the OS page cache try { logFile.setDropBehind(Boolean.TRUE); - } catch (IOException | UnsupportedOperationException e) { + } catch (UnsupportedOperationException e) { log.debug("setDropBehind writes not enabled for wal file: {}", logFile); + } catch (IOException e) { + log.debug("IOException setting drop behind for file: {}, msg: {}", logFile, e.getMessage()); } // check again that logfile can be sync'd diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 6df7729766..af787f5eff 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -146,8 +146,10 @@ public class LogSorter { // Tell the DataNode that the write ahead log does not need to be cached in the OS page cache try { input.setDropBehind(Boolean.TRUE); - } catch (IOException | UnsupportedOperationException e) { + } catch (UnsupportedOperationException e) { log.debug("setDropBehind reads not enabled for wal file: {}", input); + } catch (IOException e) { + log.debug("IOException setting drop behind for file: {}, msg: {}", input, e.getMessage()); } try {