This is an automated email from the ASF dual-hosted git repository. bbeaudreault pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push: new d64ea33e25d HBASE-28260: Add NO_WRITE_LOCAL flag to WAL file creation (#5762) d64ea33e25d is described below commit d64ea33e25d368a191af0573a0e2416790c217e3 Author: Bryan Beaudreault <bbeaudrea...@apache.org> AuthorDate: Tue Mar 12 08:08:53 2024 -0400 HBASE-28260: Add NO_WRITE_LOCAL flag to WAL file creation (#5762) Contributed-by: Charles Connell <char...@connells.org> Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org> Signed-off-by: Duo Zhang <zhang...@apache.org> Signed-off-by: Wei-Chiu Chuang <weic...@apache.org> --- .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 5 ++-- .../FanOutOneBlockAsyncDFSOutputHelper.java | 16 +++++++---- .../asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 17 +++++------ .../TestFanOutOneBlockAsyncDFSOutputHang.java | 2 +- .../hbase/io/asyncfs/TestLocalAsyncOutput.java | 2 +- .../TestSaslFanOutOneBlockAsyncDFSOutput.java | 2 +- .../apache/hadoop/hbase/util/CommonFSUtils.java | 33 ++++++++++++++++++++-- .../hbase/regionserver/wal/AbstractFSWAL.java | 4 +++ .../wal/AbstractProtobufLogWriter.java | 8 ++++-- .../regionserver/wal/AsyncProtobufLogWriter.java | 4 +-- .../hbase/regionserver/wal/ProtobufLogWriter.java | 6 ++-- 11 files changed, 71 insertions(+), 28 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index a530ca4a2a0..cbb0648f3af 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -48,11 +48,12 @@ public final class AsyncFSOutputHelper { */ public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, - Class<? extends Channel> channelClass, StreamSlowMonitor monitor) + Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, CommonFSUtils.StreamLacksCapabilityException { if (fs instanceof DistributedFileSystem) { return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, - overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor); + overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor, + noLocalWrite); } final FSDataOutputStream out; int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index b7bfd715bd0..7c2ba4ac999 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -503,7 +503,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } } - private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) { + private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite, + boolean noLocalWrite) { List<CreateFlag> flags = new ArrayList<>(); flags.add(CreateFlag.CREATE); if (overwrite) { @@ -512,13 +513,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { if (SHOULD_REPLICATE_FLAG != null) { flags.add(SHOULD_REPLICATE_FLAG); } + if (noLocalWrite) { + flags.add(CreateFlag.NO_LOCAL_WRITE); + } return new EnumSetWritable<>(EnumSet.copyOf(flags)); } private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor) - throws IOException { + EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor, + boolean noLocalWrite) throws IOException { Configuration conf = dfs.getConf(); DFSClient client = dfs.getClient(); String clientName = client.getClientName(); @@ -535,7 +539,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { try { stat = FILE_CREATOR.create(namenode, src, FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, - getCreateFlags(overwrite), createParent, replication, blockSize, + getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize, CryptoProtocolVersion.supported()); } catch (Exception e) { if (e instanceof RemoteException) { @@ -621,14 +625,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, - final StreamSlowMonitor monitor) throws IOException { + final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException { return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() { @Override public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, UnresolvedLinkException { return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, - blockSize, eventLoopGroup, channelClass, monitor); + blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite); } @Override diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 68b8bfa3d9f..f0910684edd 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -141,7 +141,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); writeAndVerify(FS, f, out); } @@ -154,7 +154,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); byte[] b = new byte[10]; Bytes.random(b); out.write(b, 0, b.length); @@ -183,7 +183,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. writeAndVerify(FS, f, out); @@ -198,7 +198,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); fail("should fail with parent does not exist"); } catch (RemoteException e) { LOG.info("expected exception caught", e); @@ -220,8 +220,9 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { DataNodeProperties dnProp = CLUSTER.stopDataNode(0); Path f = new Path("/test"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, - f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) { + try (FanOutOneBlockAsyncDFSOutput output = + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); } finally { @@ -251,7 +252,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { assertEquals(0, excludeDatanodeManager.getExcludeDNs().size()); try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) { + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); assertEquals(1, excludeDatanodeManager.getExcludeDNs().size()); @@ -266,7 +267,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR); + false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true); byte[] b = new byte[50 * 1024 * 1024]; Bytes.random(b); out.write(b); diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java index 77752789dbb..7f6535a93a9 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java @@ -98,7 +98,7 @@ public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase { Path f = new Path("/testHang"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); } @AfterClass diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index 992dde7e9cb..ec87316e5f1 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -65,7 +65,7 @@ public class TestLocalAsyncOutput { Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, - fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR); + fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR, true); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out); } } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 54a49ffebd9..e178d0960ea 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -258,7 +258,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { private void test(Path file) throws IOException, InterruptedException, ExecutionException { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index 5d9c5fa9681..73bb6f38cd2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -733,6 +733,7 @@ public final class CommonFSUtils { private static final class DfsBuilderUtility { private static final Class<?> BUILDER; private static final Method REPLICATE; + private static final Method NO_LOCAL_WRITE; static { String builderName = @@ -748,14 +749,25 @@ public final class CommonFSUtils { if (builderClass != null) { try { replicateMethod = builderClass.getMethod("replicate"); - LOG.debug("Using builder API via reflection for DFS file creation."); + LOG.debug("Using builder API via reflection for DFS file creation replicate flag."); } catch (NoSuchMethodException e) { LOG.debug("Could not find replicate method on builder; will not set replicate when" + " creating output stream", e); } } + Method noLocalWriteMethod = null; + if (builderClass != null) { + try { + noLocalWriteMethod = builderClass.getMethod("noLocalWrite"); + LOG.debug("Using builder API via reflection for DFS file creation noLocalWrite flag."); + } catch (NoSuchMethodException e) { + LOG.debug("Could not find noLocalWrite method on builder; will not set noLocalWrite when" + + " creating output stream", e); + } + } BUILDER = builderClass; REPLICATE = replicateMethod; + NO_LOCAL_WRITE = noLocalWriteMethod; } /** @@ -771,6 +783,19 @@ public final class CommonFSUtils { } } } + + static void noLocalWrite(FSDataOutputStreamBuilder<?, ?> builder) { + if ( + BUILDER != null && NO_LOCAL_WRITE != null && BUILDER.isAssignableFrom(builder.getClass()) + ) { + try { + NO_LOCAL_WRITE.invoke(builder); + } catch (IllegalAccessException | InvocationTargetException e) { + // Should have caught this failure during initialization, so log full trace here + LOG.warn("Couldn't use reflection with builder API", e); + } + } + } } /** @@ -793,13 +818,17 @@ public final class CommonFSUtils { * Will not attempt to enable replication when passed an HFileSystem. */ public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite, - int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException { + int bufferSize, short replication, long blockSize, boolean noLocalWrite, boolean isRecursive) + throws IOException { FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite) .bufferSize(bufferSize).replication(replication).blockSize(blockSize); if (isRecursive) { builder.recursive(); } DfsBuilderUtility.replicate(builder); + if (noLocalWrite) { + DfsBuilderUtility.noLocalWrite(builder); + } return builder.build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 116bd98601c..dbeb66f8ae3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -151,6 +151,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms"; public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000; + public static final String WAL_AVOID_LOCAL_WRITES_KEY = + "hbase.regionserver.wal.avoid-local-writes"; + public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false; + /** * file system instance */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 8437fef3bc2..5a59ac66939 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT; +import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY; import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE; import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE; @@ -172,8 +174,10 @@ public abstract class AbstractProtobufLogWriter { int bufferSize = CommonFSUtils.getDefaultBufferSize(fs); short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", CommonFSUtils.getDefaultReplication(fs, path)); + boolean noLocalWrite = + conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT); - initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor); + initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite); boolean doTagCompress = doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); @@ -265,7 +269,7 @@ public abstract class AbstractProtobufLogWriter { } protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize, StreamSlowMonitor monitor) + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, StreamLacksCapabilityException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 44affaf734a..95b4e2d3a3a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -187,10 +187,10 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize, StreamSlowMonitor monitor) + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, StreamLacksCapabilityException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, - blockSize, eventLoopGroup, channelClass, monitor); + blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite); this.asyncOutputWrapper = new OutputStreamWrapper(output); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index a9f2a0a3636..400187dad07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -98,10 +98,10 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter implements FSHL @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize, StreamSlowMonitor monitor) + short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException, StreamLacksCapabilityException { - this.output = - CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, blockSize, false); + this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, + blockSize, noLocalWrite, false); if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) { if (!output.hasCapability(StreamCapabilities.HFLUSH)) { throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);