This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 0f6a7f99dcc HBASE-27230 RegionServer should be aborted when WAL.sync 
throws TimeoutIOException (#4641) (#5711)
0f6a7f99dcc is described below

commit 0f6a7f99dcc1ee87c2066d2ed1d5d375e3e0fe24
Author: jhungund <106576553+jhung...@users.noreply.github.com>
AuthorDate: Thu Mar 7 18:41:19 2024 +0530

    HBASE-27230 RegionServer should be aborted when WAL.sync throws 
TimeoutIOException (#4641) (#5711)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Co-authored-by: chenglei <cheng...@apache.org>
    (cherry picked from commit 3c811c8d82cfb756c0e8126d8e2e6b0fecf80469)
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  22 ++-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   3 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   4 +-
 .../wal/WALSyncTimeoutIOException.java             |  48 +++++
 .../hadoop/hbase/regionserver/wal/WALUtil.java     |   6 +
 .../hadoop/hbase/wal/AsyncFSWALProvider.java       |  10 +-
 .../main/java/org/apache/hadoop/hbase/wal/WAL.java |   5 +
 .../hadoop/hbase/master/TestWarmupRegion.java      |   4 -
 .../wal/TestWALSyncTimeoutException.java           | 202 +++++++++++++++++++++
 9 files changed, 291 insertions(+), 13 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a46204ddab7..6b27d124d6c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -158,6 +158,7 @@ import 
org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControl
 import 
org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
@@ -1372,7 +1373,9 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     return this.fs.getRegionInfo();
   }
 
-  /** Returns Instance of {@link RegionServerServices} used by this HRegion. 
Can be null. */
+  /**
+   * Returns Instance of {@link RegionServerServices} used by this HRegion. 
Can be null.
+   */
   RegionServerServices getRegionServerServices() {
     return this.rsServices;
   }
@@ -3661,7 +3664,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
      * @param familyMap Map of Cells by family
      */
     protected void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
-      MemStoreSizing memstoreAccounting) throws IOException {
+      MemStoreSizing memstoreAccounting) {
       for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
         byte[] family = e.getKey();
         List<Cell> cells = e.getValue();
@@ -5083,7 +5086,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @see #applyToMemStore(HStore, Cell, MemStoreSizing)
    */
   private void applyToMemStore(HStore store, List<Cell> cells, boolean delta,
-    MemStoreSizing memstoreAccounting) throws IOException {
+    MemStoreSizing memstoreAccounting) {
     // Any change in how we update Store/MemStore needs to also be done in 
other applyToMemStore!!!!
     boolean upsert = delta && 
store.getColumnFamilyDescriptor().getMaxVersions() == 1;
     if (upsert) {
@@ -7887,6 +7890,19 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       if (walKey != null && walKey.getWriteEntry() != null) {
         mvcc.complete(walKey.getWriteEntry());
       }
+
+      /**
+       * If {@link WAL#sync} get a timeout exception, the only correct way is 
to abort the region
+       * server, as the design of {@link WAL#sync}, is to succeed or die, 
there is no 'failure'. It
+       * is usually not a big deal is because we set a very large default 
value(5 minutes) for
+       * {@link AbstractFSWAL#WAL_SYNC_TIMEOUT_MS}, usually the WAL system 
will abort the region
+       * server if it can not finish the sync within 5 minutes.
+       */
+      if (ioe instanceof WALSyncTimeoutIOException) {
+        if (rsServices != null) {
+          rsServices.abort("WAL sync timeout,forcing server shutdown", ioe);
+        }
+      }
       throw ioe;
     }
     return writeEntry;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 9e9c2d1b19c..5f947993035 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1910,8 +1910,7 @@ public class HStore
    * across all of them.
    * @param readpoint readpoint below which we can safely remove duplicate KVs
    */
-  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing 
memstoreSizing)
-    throws IOException {
+  public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing 
memstoreSizing) {
     this.storeEngine.readLock();
     try {
       this.memstore.upsert(cells, readpoint, memstoreSizing);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index dc244d7abac..d13dfc83a9e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -139,7 +139,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
     "hbase.regionserver.wal.slowsync.roll.interval.ms";
   protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; 
// in ms, 1 minute
 
-  protected static final String WAL_SYNC_TIMEOUT_MS = 
"hbase.regionserver.wal.sync.timeout";
+  public static final String WAL_SYNC_TIMEOUT_MS = 
"hbase.regionserver.wal.sync.timeout";
   protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // 
in ms, 5min
 
   public static final String WAL_ROLL_MULTIPLIER = 
"hbase.regionserver.logroll.multiplier";
@@ -871,7 +871,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
         }
       }
     } catch (TimeoutIOException tioe) {
-      throw tioe;
+      throw new WALSyncTimeoutIOException(tioe);
     } catch (InterruptedException ie) {
       LOG.warn("Interrupted", ie);
       throw convertInterruptedExceptionToIOException(ie);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALSyncTimeoutIOException.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALSyncTimeoutIOException.java
new file mode 100644
index 00000000000..8cef9c9dfd3
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALSyncTimeoutIOException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Thrown when {@link WAL#sync} timeout.
+ */
+@InterfaceAudience.Private
+public class WALSyncTimeoutIOException extends HBaseIOException {
+
+  private static final long serialVersionUID = 5067699288291906985L;
+
+  public WALSyncTimeoutIOException() {
+    super();
+  }
+
+  public WALSyncTimeoutIOException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public WALSyncTimeoutIOException(String message) {
+    super(message);
+  }
+
+  public WALSyncTimeoutIOException(Throwable cause) {
+    super(cause);
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index cf41beb1074..dd8d152da8b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -173,6 +173,12 @@ public class WALUtil {
       if (walKey.getWriteEntry() != null) {
         mvcc.complete(walKey.getWriteEntry());
       }
+      /**
+       * Here we do not abort the RegionServer for {@link 
WALSyncTimeoutIOException} as
+       * {@link HRegion#doWALAppend} does,because WAL Marker just records the 
internal state and
+       * seems it is no need to always abort the RegionServer when {@link 
WAL#sync} timeout,it is
+       * the internal state transition that determines whether RegionServer is 
aborted or not.
+       */
       throw ioe;
     }
     return walKey;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 4c7b0f88893..d4b7229617c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -65,9 +65,15 @@ public class AsyncFSWALProvider extends 
AbstractFSWALProvider<AsyncFSWAL> {
       StreamSlowMonitor monitor) throws IOException, 
CommonFSUtils.StreamLacksCapabilityException;
   }
 
-  private EventLoopGroup eventLoopGroup;
+  /**
+   * Protected visibility for used in tests.
+   */
+  protected EventLoopGroup eventLoopGroup;
 
-  private Class<? extends Channel> channelClass;
+  /**
+   * Protected visibility for used in tests.
+   */
+  protected Class<? extends Channel> channelClass;
 
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 79db2a678a2..1d1f8a51909 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -30,6 +30,7 @@ import 
org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -136,18 +137,21 @@ public interface WAL extends Closeable, 
WALFileLengthProvider {
 
   /**
    * Sync what we have in the WAL.
+   * @throws when timeout, it would throw {@link WALSyncTimeoutIOException}.
    */
   void sync() throws IOException;
 
   /**
    * Sync the WAL if the txId was not already sync'd.
    * @param txid Transaction id to sync to.
+   * @throws when timeout, it would throw {@link WALSyncTimeoutIOException}.
    */
   void sync(long txid) throws IOException;
 
   /**
    * @param forceSync Flag to force sync rather than flushing to the buffer. 
Example - Hadoop hflush
    *                  vs hsync.
+   * @throws when timeout, it would throw {@link WALSyncTimeoutIOException}.
    */
   default void sync(boolean forceSync) throws IOException {
     sync();
@@ -157,6 +161,7 @@ public interface WAL extends Closeable, 
WALFileLengthProvider {
    * @param txid      Transaction id to sync to.
    * @param forceSync Flag to force sync rather than flushing to the buffer. 
Example - Hadoop hflush
    *                  vs hsync.
+   * @throws when timeout, it would throw {@link WALSyncTimeoutIOException}.
    */
   default void sync(long txid, boolean forceSync) throws IOException {
     sync(txid);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestWarmupRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestWarmupRegion.java
index 0f07a458115..21b192caee8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestWarmupRegion.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestWarmupRegion.java
@@ -21,7 +21,6 @@ import static 
org.apache.hadoop.hbase.regionserver.HRegion.warmupHRegion;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -55,7 +54,6 @@ import org.slf4j.LoggerFactory;
  * named for the method and does its stuff against that.
  */
 @Category({ MasterTests.class, LargeTests.class })
-@SuppressWarnings("deprecation")
 public class TestWarmupRegion {
 
   @ClassRule
@@ -67,7 +65,6 @@ public class TestWarmupRegion {
   protected final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
   private static byte[] ROW = Bytes.toBytes("testRow");
   private static byte[] FAMILY = Bytes.toBytes("testFamily");
-  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte[] VALUE = Bytes.toBytes("testValue");
   private static byte[] COLUMN = Bytes.toBytes("column");
   private static int numRows = 10000;
@@ -80,7 +77,6 @@ public class TestWarmupRegion {
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
     TEST_UTIL.startMiniCluster(SLAVES);
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALSyncTimeoutException.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALSyncTimeoutException.java
new file mode 100644
index 00000000000..7bfb5374436
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALSyncTimeoutException.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestWALSyncTimeoutException {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALSyncTimeoutException.class);
+
+  private static final byte[] FAMILY = Bytes.toBytes("family_test");
+
+  private static final byte[] QUAL = Bytes.toBytes("qualifier_test");
+
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+
+  private static TableName tableName = 
TableName.valueOf("TestWALSyncTimeoutException");
+  private static volatile boolean testWALTimout = false;
+  private static final long timeoutMIlliseconds = 3000;
+  private static final String USER_THREAD_NAME = tableName.getNameAsString();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
+    conf.setClass(WALFactory.WAL_PROVIDER, SlowAsyncFSWALProvider.class, 
WALProvider.class);
+    conf.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, timeoutMIlliseconds);
+    
HTU.startMiniCluster(StartMiniClusterOption.builder().numRegionServers(1).build());
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  /**
+   * This test is for HBASE-27230. When {@link WAL#sync} timeout, it would 
throws
+   * {@link WALSyncTimeoutIOException},and when {@link HRegion#doWALAppend} 
catches this exception
+   * it aborts the RegionServer.
+   */
+  @Test
+  public void testWALSyncWriteException() throws Exception {
+    final HRegionForTest region = this.createTable();
+
+    String oldThreadName = Thread.currentThread().getName();
+    Thread.currentThread().setName(USER_THREAD_NAME);
+    try {
+      byte[] rowKey1 = Bytes.toBytes(1);
+      byte[] value1 = Bytes.toBytes(3);
+      Thread.sleep(2000);
+      testWALTimout = true;
+
+      /**
+       * The {@link WAL#sync} would timeout and throws {@link 
WALSyncTimeoutIOException},when
+       * {@link HRegion#doWALAppend} catches this exception it aborts the 
RegionServer.
+       */
+      try {
+        region.put(new Put(rowKey1).addColumn(FAMILY, QUAL, value1));
+        fail();
+      } catch (WALSyncTimeoutIOException e) {
+        assertTrue(e != null);
+      }
+      assertTrue(region.getRSServices().isAborted());
+    } finally {
+      Thread.currentThread().setName(oldThreadName);
+      testWALTimout = false;
+    }
+  }
+
+  private HRegionForTest createTable() throws Exception {
+    TableDescriptor tableDescriptor = 
TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
+    HTU.getAdmin().createTable(tableDescriptor);
+    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
+    return (HRegionForTest) rs.getRegions(tableName).get(0);
+  }
+
+  public static final class HRegionForTest extends HRegion {
+
+    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration 
confParam,
+      TableDescriptor htd, RegionServerServices rsServices) {
+      super(fs, wal, confParam, htd, rsServices);
+    }
+
+    @SuppressWarnings("deprecation")
+    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration 
confParam,
+      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices 
rsServices) {
+      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
+    }
+
+    public RegionServerServices getRSServices() {
+      return this.rsServices;
+    }
+
+  }
+
+  public static class SlowAsyncFSWAL extends AsyncFSWAL {
+
+    public SlowAsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, 
String logDir,
+      String archiveDir, Configuration conf, List<WALActionsListener> 
listeners,
+      boolean failIfWALExists, String prefix, String suffix, EventLoopGroup 
eventLoopGroup,
+      Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
+      throws FailedLogCloseException, IOException {
+      super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, 
failIfWALExists, prefix,
+        suffix, eventLoopGroup, channelClass, monitor);
+
+    }
+
+    public SlowAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String 
archiveDir,
+      Configuration conf, List<WALActionsListener> listeners, boolean 
failIfWALExists,
+      String prefix, String suffix, EventLoopGroup eventLoopGroup,
+      Class<? extends Channel> channelClass) throws FailedLogCloseException, 
IOException {
+      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, 
prefix, suffix,
+        eventLoopGroup, channelClass);
+
+    }
+
+    @Override
+    protected void atHeadOfRingBufferEventHandlerAppend() {
+      if (testWALTimout) {
+        try {
+          Thread.sleep(timeoutMIlliseconds + 1000);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      super.atHeadOfRingBufferEventHandlerAppend();
+    }
+
+  }
+
+  public static class SlowAsyncFSWALProvider extends AsyncFSWALProvider {
+
+    @Override
+    protected AsyncFSWAL createWAL() throws IOException {
+      return new SlowAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), 
this.abortable,
+        CommonFSUtils.getWALRootDir(conf), 
getWALDirectoryName(factory.getFactoryId()),
+        getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, 
listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, 
eventLoopGroup,
+        channelClass, 
factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
+    }
+
+  }
+}

Reply via email to