This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new d64ea33e25d HBASE-28260: Add NO_WRITE_LOCAL flag to WAL file creation (#5762)
d64ea33e25d is described below

commit d64ea33e25d368a191af0573a0e2416790c217e3
Author: Bryan Beaudreault <bbeaudrea...@apache.org>
AuthorDate: Tue Mar 12 08:08:53 2024 -0400

    HBASE-28260: Add NO_WRITE_LOCAL flag to WAL file creation (#5762)
    
    Contributed-by: Charles Connell <char...@connells.org>
    
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Wei-Chiu Chuang <weic...@apache.org>
---
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java      |  5 ++--
 .../FanOutOneBlockAsyncDFSOutputHelper.java        | 16 +++++++----
 .../asyncfs/TestFanOutOneBlockAsyncDFSOutput.java  | 17 +++++------
 .../TestFanOutOneBlockAsyncDFSOutputHang.java      |  2 +-
 .../hbase/io/asyncfs/TestLocalAsyncOutput.java     |  2 +-
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java      |  2 +-
 .../apache/hadoop/hbase/util/CommonFSUtils.java    | 33 ++++++++++++++++++++--
 .../hbase/regionserver/wal/AbstractFSWAL.java      |  4 +++
 .../wal/AbstractProtobufLogWriter.java             |  8 ++++--
 .../regionserver/wal/AsyncProtobufLogWriter.java   |  4 +--
 .../hbase/regionserver/wal/ProtobufLogWriter.java  |  6 ++--
 11 files changed, 71 insertions(+), 28 deletions(-)

diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index a530ca4a2a0..cbb0648f3af 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -48,11 +48,12 @@ public final class AsyncFSOutputHelper {
    */
  public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
    boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
-    Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
+    Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean noLocalWrite)
    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
    if (fs instanceof DistributedFileSystem) {
      return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
-        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
+        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor,
+        noLocalWrite);
    }
    final FSDataOutputStream out;
    int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index b7bfd715bd0..7c2ba4ac999 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -503,7 +503,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     }
   }
 
-  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
+  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
+    boolean noLocalWrite) {
     List<CreateFlag> flags = new ArrayList<>();
     flags.add(CreateFlag.CREATE);
     if (overwrite) {
@@ -512,13 +513,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     if (SHOULD_REPLICATE_FLAG != null) {
       flags.add(SHOULD_REPLICATE_FLAG);
     }
+    if (noLocalWrite) {
+      flags.add(CreateFlag.NO_LOCAL_WRITE);
+    }
     return new EnumSetWritable<>(EnumSet.copyOf(flags));
   }
 
  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
    boolean overwrite, boolean createParent, short replication, long blockSize,
-    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
-    throws IOException {
+    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
+    boolean noLocalWrite) throws IOException {
     Configuration conf = dfs.getConf();
     DFSClient client = dfs.getClient();
     String clientName = client.getClientName();
@@ -535,7 +539,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       try {
        stat = FILE_CREATOR.create(namenode, src,
          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-          getCreateFlags(overwrite), createParent, replication, blockSize,
+          getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
           CryptoProtocolVersion.supported());
       } catch (Exception e) {
         if (e instanceof RemoteException) {
@@ -621,14 +625,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
     boolean overwrite, boolean createParent, short replication, long blockSize,
     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
-    final StreamSlowMonitor monitor) throws IOException {
+    final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
         throws IOException, UnresolvedLinkException {
        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
-          blockSize, eventLoopGroup, channelClass, monitor);
+          blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
       }
 
       @Override
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 68b8bfa3d9f..f0910684edd 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -141,7 +141,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    writeAndVerify(FS, f, out);
  }
 
@@ -154,7 +154,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    byte[] b = new byte[10];
    Bytes.random(b);
    out.write(b, 0, b.length);
@@ -183,7 +183,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
    Thread.sleep(READ_TIMEOUT_MS * 2);
    // the connection to datanode should still alive.
    writeAndVerify(FS, f, out);
@@ -198,7 +198,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    try {
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
      fail("should fail with parent does not exist");
    } catch (RemoteException e) {
      LOG.info("expected exception caught", e);
@@ -220,8 +220,9 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
    Path f = new Path("/test");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
-      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
+    try (FanOutOneBlockAsyncDFSOutput output =
+      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
      // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
      assertEquals(2, output.getPipeline().length);
    } finally {
@@ -251,7 +252,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
    try (FanOutOneBlockAsyncDFSOutput output =
      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) {
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
      // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
      assertEquals(2, output.getPipeline().length);
      assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
@@ -266,7 +267,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
    Path f = new Path("/" + name.getMethodName());
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
     byte[] b = new byte[50 * 1024 * 1024];
     Bytes.random(b);
     out.write(b);
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
index 77752789dbb..7f6535a93a9 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
@@ -98,7 +98,7 @@ public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase {
    Path f = new Path("/testHang");
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2,
-      FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
   }
 
   @AfterClass
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index 992dde7e9cb..ec87316e5f1 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -65,7 +65,7 @@ public class TestLocalAsyncOutput {
    Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
    FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
    AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
-      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR);
+      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR, true);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
   }
 }
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 54a49ffebd9..e178d0960ea 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -258,7 +258,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
  private void test(Path file) throws IOException, InterruptedException, ExecutionException {
    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
   }
 
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index 5d9c5fa9681..73bb6f38cd2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -733,6 +733,7 @@ public final class CommonFSUtils {
   private static final class DfsBuilderUtility {
     private static final Class<?> BUILDER;
     private static final Method REPLICATE;
+    private static final Method NO_LOCAL_WRITE;
 
     static {
       String builderName =
@@ -748,14 +749,25 @@ public final class CommonFSUtils {
       if (builderClass != null) {
         try {
          replicateMethod = builderClass.getMethod("replicate");
-          LOG.debug("Using builder API via reflection for DFS file creation.");
+          LOG.debug("Using builder API via reflection for DFS file creation replicate flag.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find replicate method on builder; will not set replicate when"
            + " creating output stream", e);
        }
      }
+      Method noLocalWriteMethod = null;
+      if (builderClass != null) {
+        try {
+          noLocalWriteMethod = builderClass.getMethod("noLocalWrite");
+          LOG.debug("Using builder API via reflection for DFS file creation noLocalWrite flag.");
+        } catch (NoSuchMethodException e) {
+          LOG.debug("Could not find noLocalWrite method on builder; will not set noLocalWrite when"
+            + " creating output stream", e);
+        }
+      }
       BUILDER = builderClass;
       REPLICATE = replicateMethod;
+      NO_LOCAL_WRITE = noLocalWriteMethod;
     }
 
     /**
@@ -771,6 +783,19 @@ public final class CommonFSUtils {
         }
       }
     }
+
+    static void noLocalWrite(FSDataOutputStreamBuilder<?, ?> builder) {
+      if (
+        BUILDER != null && NO_LOCAL_WRITE != null && BUILDER.isAssignableFrom(builder.getClass())
+      ) {
+        try {
+          NO_LOCAL_WRITE.invoke(builder);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          // Should have caught this failure during initialization, so log full trace here
+          LOG.warn("Couldn't use reflection with builder API", e);
+        }
+      }
+    }
   }
 
   /**
@@ -793,13 +818,17 @@ public final class CommonFSUtils {
    * Will not attempt to enable replication when passed an HFileSystem.
    */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
-    int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+    int bufferSize, short replication, long blockSize, boolean noLocalWrite, boolean isRecursive)
+    throws IOException {
    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
       .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
     if (isRecursive) {
       builder.recursive();
     }
     DfsBuilderUtility.replicate(builder);
+    if (noLocalWrite) {
+      DfsBuilderUtility.noLocalWrite(builder);
+    }
     return builder.build();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 116bd98601c..dbeb66f8ae3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -151,6 +151,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
  public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms";
   public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;
 
+  public static final String WAL_AVOID_LOCAL_WRITES_KEY =
+    "hbase.regionserver.wal.avoid-local-writes";
+  public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;
+
   /**
    * file system instance
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 8437fef3bc2..5a59ac66939 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
 import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
 import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
 
@@ -172,8 +174,10 @@ public abstract class AbstractProtobufLogWriter {
      int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
      short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
        CommonFSUtils.getDefaultReplication(fs, path));
+      boolean noLocalWrite =
+        conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT);
 
-      initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
+      initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite);
 
      boolean doTagCompress =
        doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
@@ -265,7 +269,7 @@ public abstract class AbstractProtobufLogWriter {
  }
 
  protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-    short replication, long blockSize, StreamSlowMonitor monitor)
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
     throws IOException, StreamLacksCapabilityException;
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 44affaf734a..95b4e2d3a3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -187,10 +187,10 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
  @Override
  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-    short replication, long blockSize, StreamSlowMonitor monitor)
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
    throws IOException, StreamLacksCapabilityException {
    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
-      blockSize, eventLoopGroup, channelClass, monitor);
+      blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index a9f2a0a3636..400187dad07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -98,10 +98,10 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter implements FSHL
 
  @Override
  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-    short replication, long blockSize, StreamSlowMonitor monitor)
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
    throws IOException, StreamLacksCapabilityException {
-    this.output =
-      CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, blockSize, false);
+    this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
+      blockSize, noLocalWrite, false);
    if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
       if (!output.hasCapability(StreamCapabilities.HFLUSH)) {
         throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);

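For operators following this change: the patch is controlled by a single boolean configuration key, hbase.regionserver.wal.avoid-local-writes (default false), introduced in AbstractFSWAL and read in AbstractProtobufLogWriter. Below is a minimal sketch of setting and reading that key with the standard HBase configuration API; the standalone class name is illustrative and not part of this commit, and in practice the property would normally be set in hbase-site.xml on each region server.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class WalAvoidLocalWritesExample {
      public static void main(String[] args) {
        // Load the usual HBase configuration (hbase-default.xml, hbase-site.xml).
        Configuration conf = HBaseConfiguration.create();

        // Key and default introduced by this patch. When true, WAL writers request
        // HDFS's NO_LOCAL_WRITE / noLocalWrite placement, so new WAL blocks are
        // written to remote DataNodes instead of the DataNode local to the writer.
        conf.setBoolean("hbase.regionserver.wal.avoid-local-writes", true);

        boolean avoidLocal = conf.getBoolean("hbase.regionserver.wal.avoid-local-writes", false);
        System.out.println("WAL avoid-local-writes: " + avoidLocal);
      }
    }

Setting the same property in hbase-site.xml applies to new WAL files; since the value is read when a WAL writer is created, it typically takes effect after region servers restart and roll their WALs.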