HBASE-20723 Custom hbase.wal.dir results in data loss because we write 
recovered edits into a different place than where the recovering region server 
looks for them


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ac5bb815
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ac5bb815
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ac5bb815

Branch: refs/heads/HBASE-20331
Commit: ac5bb8155b618194fe9cf1131f0e72c99b7b534c
Parents: 0825462
Author: tedyu <yuzhih...@gmail.com>
Authored: Fri Jun 15 19:40:48 2018 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Jun 15 19:40:48 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/wal/FSHLogProvider.java |  3 +-
 .../apache/hadoop/hbase/wal/WALSplitter.java    | 55 +++++++++++---------
 .../apache/hadoop/hbase/wal/TestWALFactory.java |  4 +-
 .../apache/hadoop/hbase/wal/TestWALSplit.java   |  8 +--
 4 files changed, 39 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ac5bb815/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index efcd377..44f692d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -73,7 +73,8 @@ public class FSHLogProvider extends 
AbstractFSWALProvider<FSHLog> {
     Writer writer = null;
     try {
       writer = logWriterClass.getDeclaredConstructor().newInstance();
-      writer.init(fs, path, conf, overwritable, blocksize);
+      FileSystem rootFs = FileSystem.get(path.toUri(), conf);
+      writer.init(rootFs, path, conf, overwritable, blocksize);
       return writer;
     } catch (Exception e) { 
       if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ac5bb815/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index dee5fb0..f020e7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -115,7 +115,7 @@ public class WALSplitter {
   public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
 
   // Parameters for split process
-  protected final Path rootDir;
+  protected final Path walDir;
   protected final FileSystem fs;
   protected final Configuration conf;
 
@@ -148,14 +148,14 @@ public class WALSplitter {
 
 
   @VisibleForTesting
-  WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
+  WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
       FileSystem fs, LastSequenceId idChecker,
       SplitLogWorkerCoordination splitLogWorkerCoordination) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, 
WALCellCodec.class.getName());
     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
-    this.rootDir = rootDir;
+    this.walDir = walDir;
     this.fs = fs;
     this.sequenceIdChecker = idChecker;
     this.splitLogWorkerCoordination = splitLogWorkerCoordination;
@@ -186,11 +186,11 @@ public class WALSplitter {
    * <p>
    * @return false if it is interrupted by the progress-able.
    */
-  public static boolean splitLogFile(Path rootDir, FileStatus logfile, 
FileSystem fs,
+  public static boolean splitLogFile(Path walDir, FileStatus logfile, 
FileSystem fs,
       Configuration conf, CancelableProgressable reporter, LastSequenceId 
idChecker,
       SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory 
factory)
       throws IOException {
-    WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker,
+    WALSplitter s = new WALSplitter(factory, conf, walDir, fs, idChecker,
         splitLogWorkerCoordination);
     return s.splitLogFile(logfile, reporter);
   }
@@ -322,10 +322,10 @@ public class WALSplitter {
       LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
       if (splitLogWorkerCoordination != null) {
         // Some tests pass in a csm of null.
-        splitLogWorkerCoordination.markCorrupted(rootDir, 
logfile.getPath().getName(), fs);
+        splitLogWorkerCoordination.markCorrupted(walDir, 
logfile.getPath().getName(), fs);
       } else {
         // for tests only
-        ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
+        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), fs);
       }
       isCorrupted = true;
     } catch (IOException e) {
@@ -457,18 +457,19 @@ public class WALSplitter {
    * <code>logEntry</code>: e.g. 
/hbase/some_table/2323432434/recovered.edits/2332.
    * This method also ensures existence of RECOVERED_EDITS_DIR under the region
    * creating it if necessary.
-   * @param fs
    * @param logEntry
-   * @param rootDir HBase root dir.
    * @param fileNameBeingSplit the file being split currently. Used to 
generate tmp file name.
+   * @param conf
    * @return Path to file into which to dump split log edits.
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
   @VisibleForTesting
-  static Path getRegionSplitEditsPath(final FileSystem fs,
-      final Entry logEntry, final Path rootDir, String fileNameBeingSplit)
+  static Path getRegionSplitEditsPath(final Entry logEntry, String 
fileNameBeingSplit,
+      Configuration conf)
   throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path rootDir = FSUtils.getRootDir(conf);
     Path tableDir = FSUtils.getTableDir(rootDir, 
logEntry.getKey().getTableName());
     String encodedRegionName = 
Bytes.toString(logEntry.getKey().getEncodedRegionName());
     Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
@@ -1260,7 +1261,8 @@ public class WALSplitter {
     }
 
     // delete the one with fewer wal entries
-    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws 
IOException {
+    private void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath 
wap, Path dst)
+        throws IOException {
       long dstMinLogSeqNum = -1L;
       try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
         WAL.Entry entry = reader.next();
@@ -1281,8 +1283,8 @@ public class WALSplitter {
         }
       } else {
         LOG.warn("Found existing old edits file and we have less entries. 
Deleting " + wap.p
-            + ", length=" + fs.getFileStatus(wap.p).getLen());
-        if (!fs.delete(wap.p, false)) {
+            + ", length=" + rootFs.getFileStatus(wap.p).getLen());
+        if (!rootFs.delete(wap.p, false)) {
           LOG.warn("Failed deleting of {}", wap.p);
           throw new IOException("Failed deleting of " + wap.p);
         }
@@ -1367,6 +1369,7 @@ public class WALSplitter {
     Path closeWriter(String encodedRegionName, WriterAndPath wap,
         List<IOException> thrown) throws IOException{
       LOG.trace("Closing {}", wap.p);
+      FileSystem rootFs = FileSystem.get(conf);
       try {
         wap.w.close();
       } catch (IOException ioe) {
@@ -1381,7 +1384,7 @@ public class WALSplitter {
       }
       if (wap.editsWritten == 0) {
         // just remove the empty recovered.edits file
-        if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
+        if (rootFs.exists(wap.p) && !rootFs.delete(wap.p, false)) {
           LOG.warn("Failed deleting empty {}", wap.p);
           throw new IOException("Failed deleting empty  " + wap.p);
         }
@@ -1391,14 +1394,14 @@ public class WALSplitter {
       Path dst = getCompletedRecoveredEditsFilePath(wap.p,
           regionMaximumEditLogSeqNum.get(encodedRegionName));
       try {
-        if (!dst.equals(wap.p) && fs.exists(dst)) {
-          deleteOneWithFewerEntries(wap, dst);
+        if (!dst.equals(wap.p) && rootFs.exists(dst)) {
+          deleteOneWithFewerEntries(rootFs, wap, dst);
         }
         // Skip the unit tests which create a splitter that reads and
         // writes the data without touching disk.
         // TestHLogSplit#testThreading is an example.
-        if (fs.exists(wap.p)) {
-          if (!fs.rename(wap.p, dst)) {
+        if (rootFs.exists(wap.p)) {
+          if (!rootFs.rename(wap.p, dst)) {
             throw new IOException("Failed renaming " + wap.p + " to " + dst);
           }
           LOG.info("Rename {} to {}", wap.p, dst);
@@ -1470,7 +1473,7 @@ public class WALSplitter {
       if (blacklistedRegions.contains(region)) {
         return null;
       }
-      ret = createWAP(region, entry, rootDir);
+      ret = createWAP(region, entry);
       if (ret == null) {
         blacklistedRegions.add(region);
         return null;
@@ -1484,16 +1487,18 @@ public class WALSplitter {
     /**
      * @return a path with a write for that path. caller should close.
      */
-    WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws 
IOException {
-      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, 
fileBeingSplit.getPath().getName());
+    WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
+      Path regionedits = getRegionSplitEditsPath(entry,
+          fileBeingSplit.getPath().getName(), conf);
       if (regionedits == null) {
         return null;
       }
-      if (fs.exists(regionedits)) {
+      FileSystem rootFs = FileSystem.get(conf);
+      if (rootFs.exists(regionedits)) {
         LOG.warn("Found old edits file. It could be the "
             + "result of a previous failed split attempt. Deleting " + 
regionedits + ", length="
-            + fs.getFileStatus(regionedits).getLen());
-        if (!fs.delete(regionedits, false)) {
+            + rootFs.getFileStatus(regionedits).getLen());
+        if (!rootFs.delete(regionedits, false)) {
           LOG.warn("Failed delete of old {}", regionedits);
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ac5bb815/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index b1fe67b..fd2b3c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -57,6 +57,7 @@ import 
org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -126,6 +127,7 @@ public class TestWALFactory {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new 
Path("file:///tmp/wal"));
     // Make block sizes small.
     TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
     // needed for testAppendClose()
@@ -176,7 +178,7 @@ public class TestWALFactory {
     final MultiVersionConcurrencyControl mvcc = new 
MultiVersionConcurrencyControl(1);
     final int howmany = 3;
     RegionInfo[] infos = new RegionInfo[3];
-    Path tabledir = FSUtils.getTableDir(hbaseWALDir, tableName);
+    Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
     fs.mkdirs(tabledir);
     for (int i = 0; i < howmany; i++) {
       infos[i] = 
RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))

http://git-wip-us.apache.org/repos/asf/hbase/blob/ac5bb815/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 030c99f..0d5aa0d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -390,8 +390,8 @@ public class TestWALSplit {
         new Entry(new WALKeyImpl(encoded,
             TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
             new WALEdit());
-    Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
-        FILENAME_BEING_SPLIT);
+    Path p = WALSplitter.getRegionSplitEditsPath(entry,
+        FILENAME_BEING_SPLIT, conf);
     String parentOfParent = p.getParent().getParent().getName();
     assertEquals(parentOfParent, 
RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
   }
@@ -416,8 +416,8 @@ public class TestWALSplit {
     assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
     fs.createNewFile(parent); // create a recovered.edits file
 
-    Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
-        FILENAME_BEING_SPLIT);
+    Path p = WALSplitter.getRegionSplitEditsPath(entry,
+        FILENAME_BEING_SPLIT, conf);
     String parentOfParent = p.getParent().getParent().getName();
     assertEquals(parentOfParent, 
RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
     WALFactory.createRecoveredEditsWriter(fs, p, conf).close();

Reply via email to