This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 15304598b9 Wire up FSDataInputStream.setDropBehind, set to true in 
FileCompactor (#3079)
15304598b9 is described below

commit 15304598b95497aa3b98240a34803c622d13d25b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Nov 16 09:58:05 2022 -0500

    Wire up FSDataInputStream.setDropBehind, set to true in FileCompactor (#3079)
    
    Added a CachableBlockFile.CachableBuilder.fsPath overload that allows
    the caller to specify whether or not setDropBehind should be called on
    the FSDataInputStream. Wired up FileOptions and RFileOperations for this
    new setting. Only FileCompactor was modified to enable this setting. Any
    scans that are running on the same Tablet that is compacting will likely
    have their blocks cached, so this should not impact scans.
    
    I looked at calling FSDataInputStream.setDropBehind on the FileCompactor's
    input files after the compaction completed, in
    DatafileManager.bringMajorCompactionOnline, but the way this is wired up
    in Hadoop, the file must be read from for this setting to take effect (see
    org.apache.hadoop.hdfs.server.datanode.BlockSender.close()).
    
    I believe the effect of this change is that POSIX_FADV_DONTNEED will be
    set on the DataNodes that serve up a copy of each HDFS file block that is
    read. This means that DataNodes holding other replicas of a block, which
    were not the ones read from, may not have POSIX_FADV_DONTNEED set on their
    file descriptors.
    
    The impact of this change is likely negligible for a single file or on a
    system that is not actively compacting. This change will have a larger
    impact on systems that are constantly compacting.
    
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../apache/accumulo/core/file/FileOperations.java  | 26 +++++++++++++++++-----
 .../file/blockfile/impl/CachableBlockFile.java     | 23 ++++++++++++++++++-
 .../accumulo/core/file/rfile/RFileOperations.java  | 10 ++++-----
 .../accumulo/server/compaction/FileCompactor.java  |  3 ++-
 .../org/apache/accumulo/tserver/log/DfsLogger.java |  4 +++-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  4 +++-
 6 files changed, 56 insertions(+), 14 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java 
b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index b5c5c9e817..2a2b4aeaa5 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -183,12 +183,13 @@ public abstract class FileOperations {
     public final Range range;
     public final Set<ByteSequence> columnFamilies;
     public final boolean inclusive;
+    public final boolean dropCacheBehind;
 
     public FileOptions(AccumuloConfiguration tableConfiguration, String 
filename, FileSystem fs,
         Configuration fsConf, RateLimiter rateLimiter, String compression,
         FSDataOutputStream outputStream, boolean enableAccumuloStart, 
CacheProvider cacheProvider,
         Cache<String,Long> fileLenCache, boolean seekToBeginning, 
CryptoService cryptoService,
-        Range range, Set<ByteSequence> columnFamilies, boolean inclusive) {
+        Range range, Set<ByteSequence> columnFamilies, boolean inclusive, 
boolean dropCacheBehind) {
       this.tableConfiguration = tableConfiguration;
       this.filename = filename;
       this.fs = fs;
@@ -204,6 +205,7 @@ public abstract class FileOperations {
       this.range = range;
       this.columnFamilies = columnFamilies;
       this.inclusive = inclusive;
+      this.dropCacheBehind = dropCacheBehind;
     }
 
     public AccumuloConfiguration getTableConfiguration() {
@@ -277,6 +279,7 @@ public abstract class FileOperations {
     private Configuration fsConf;
     private RateLimiter rateLimiter;
     private CryptoService cryptoService;
+    private boolean dropCacheBehind = false;
 
     protected FileHelper fs(FileSystem fs) {
       this.fs = Objects.requireNonNull(fs);
@@ -308,28 +311,36 @@ public abstract class FileOperations {
       return this;
     }
 
+    protected FileHelper dropCacheBehind(boolean drop) {
+      this.dropCacheBehind = drop;
+      return this;
+    }
+
     protected FileOptions toWriterBuilderOptions(String compression,
         FSDataOutputStream outputStream, boolean startEnabled) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, 
rateLimiter, compression,
-          outputStream, startEnabled, NULL_PROVIDER, null, false, 
cryptoService, null, null, true);
+          outputStream, startEnabled, NULL_PROVIDER, null, false, 
cryptoService, null, null, true,
+          dropCacheBehind);
     }
 
     protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider,
         Cache<String,Long> fileLenCache, boolean seekToBeginning) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, 
rateLimiter, null, null,
           false, cacheProvider == null ? NULL_PROVIDER : cacheProvider, 
fileLenCache,
-          seekToBeginning, cryptoService, null, null, true);
+          seekToBeginning, cryptoService, null, null, true, dropCacheBehind);
     }
 
     protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> 
fileLenCache) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, 
rateLimiter, null, null,
-          false, NULL_PROVIDER, fileLenCache, false, cryptoService, null, 
null, true);
+          false, NULL_PROVIDER, fileLenCache, false, cryptoService, null, 
null, true,
+          dropCacheBehind);
     }
 
     protected FileOptions toScanReaderBuilderOptions(Range range, 
Set<ByteSequence> columnFamilies,
         boolean inclusive) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, 
rateLimiter, null, null,
-          false, NULL_PROVIDER, null, false, cryptoService, range, 
columnFamilies, inclusive);
+          false, NULL_PROVIDER, null, false, cryptoService, range, 
columnFamilies, inclusive,
+          dropCacheBehind);
     }
 
     protected AccumuloConfiguration getTableConfiguration() {
@@ -427,6 +438,11 @@ public abstract class FileOperations {
       return this;
     }
 
+    public ReaderBuilder dropCachesBehind() {
+      this.dropCacheBehind(true);
+      return this;
+    }
+
     /**
      * Seek the constructed iterator to the beginning of its domain before 
returning. Equivalent to
      * {@code seekToBeginning(true)}.
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index b33f5ea821..e0bf0c0bda 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.spi.cache.CacheEntry;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
@@ -83,8 +84,28 @@ public class CachableBlockFile {
     }
 
     public CachableBuilder fsPath(FileSystem fs, Path dataFile) {
+      return fsPath(fs, dataFile, false);
+    }
+
+    public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean 
dropCacheBehind) {
       this.cacheId = pathToCacheId(dataFile);
-      this.inputSupplier = () -> fs.open(dataFile);
+      this.inputSupplier = () -> {
+        FSDataInputStream is = fs.open(dataFile);
+        if (dropCacheBehind) {
+          // Tell the DataNode that the write ahead log does not need to be 
cached in the OS page
+          // cache
+          try {
+            is.setDropBehind(Boolean.TRUE);
+            log.trace("Called setDropBehind(TRUE) for stream reading file {}", 
dataFile);
+          } catch (UnsupportedOperationException e) {
+            log.debug("setDropBehind not enabled for wal file: {}", dataFile);
+          } catch (IOException e) {
+            log.debug("IOException setting drop behind for file: {}, msg: {}", 
dataFile,
+                e.getMessage());
+          }
+        }
+        return is;
+      };
       this.lengthSupplier = () -> fs.getFileStatus(dataFile).getLen();
       return this;
     }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 833adb700d..6a73a914d8 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -47,11 +47,11 @@ public class RFileOperations extends FileOperations {
   private static final Collection<ByteSequence> EMPTY_CF_SET = 
Collections.emptySet();
 
   private static RFile.Reader getReader(FileOptions options) throws 
IOException {
-    CachableBuilder cb =
-        new CachableBuilder().fsPath(options.getFileSystem(), new 
Path(options.getFilename()))
-            
.conf(options.getConfiguration()).fileLen(options.getFileLenCache())
-            
.cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter())
-            .cryptoService(options.getCryptoService());
+    CachableBuilder cb = new CachableBuilder()
+        .fsPath(options.getFileSystem(), new Path(options.getFilename()), 
options.dropCacheBehind)
+        .conf(options.getConfiguration()).fileLen(options.getFileLenCache())
+        
.cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter())
+        .cryptoService(options.getCryptoService());
     return new RFile.Reader(cb);
   }
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index a15f9a901e..f45811c85a 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -313,7 +313,8 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
         reader = fileFactory.newReaderBuilder()
             .forFile(mapFile.getPathStr(), fs, fs.getConf(), cryptoService)
-            
.withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter()).build();
+            
.withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter())
+            .dropCachesBehind().build();
 
         readers.add(reader);
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index b24387050b..e30bb58cca 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -420,8 +420,10 @@ public class DfsLogger implements Comparable<DfsLogger> {
       // Tell the DataNode that the write ahead log does not need to be cached 
in the OS page cache
       try {
         logFile.setDropBehind(Boolean.TRUE);
-      } catch (IOException | UnsupportedOperationException e) {
+      } catch (UnsupportedOperationException e) {
         log.debug("setDropBehind writes not enabled for wal file: {}", 
logFile);
+      } catch (IOException e) {
+        log.debug("IOException setting drop behind for file: {}, msg: {}", 
logFile, e.getMessage());
       }
 
       // check again that logfile can be sync'd
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 6df7729766..af787f5eff 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -146,8 +146,10 @@ public class LogSorter {
       // Tell the DataNode that the write ahead log does not need to be cached 
in the OS page cache
       try {
         input.setDropBehind(Boolean.TRUE);
-      } catch (IOException | UnsupportedOperationException e) {
+      } catch (UnsupportedOperationException e) {
         log.debug("setDropBehind reads not enabled for wal file: {}", input);
+      } catch (IOException e) {
+        log.debug("IOException setting drop behind for file: {}, msg: {}", 
input, e.getMessage());
       }
 
       try {

Reply via email to