Fix archiving of pv2 WAL files

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/53b865aa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/53b865aa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/53b865aa

Branch: refs/heads/HBASE-14614
Commit: 53b865aa9922143e2e95af09a73814a9e7d60047
Parents: 007d0d2
Author: Michael Stack <st...@apache.org>
Authored: Fri May 12 13:02:32 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Tue May 23 08:36:53 2017 -0700

----------------------------------------------------------------------
 .../procedure2/store/wal/ProcedureWALFile.java  | 28 ++++++++++----------
 .../procedure2/store/wal/WALProcedureStore.java | 28 +++++++++++++-------
 .../org/apache/hadoop/hbase/master/HMaster.java |  4 ++-
 .../assignment/TestAssignmentManager.java       |  2 +-
 4 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/53b865aa/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 2221cfc..42abe8f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
@@ -156,22 +155,23 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
     this.logSize += size;
   }
 
-  public void removeFile() throws IOException {
+  public void removeFile(final Path walArchiveDir) throws IOException {
     close();
-    // TODO: FIX THIS. MAKE THIS ARCHIVE FORMAL.
-    Path archiveDir =
-        new Path(logFile.getParent().getParent(), HConstants.HFILE_ARCHIVE_DIRECTORY);
-    try {
-      fs.mkdirs(archiveDir);
-    } catch (IOException ioe) {
-      LOG.warn("Making " + archiveDir, ioe);
+    boolean archived = false;
+    if (walArchiveDir != null) {
+      Path archivedFile = new Path(walArchiveDir, logFile.getName());
+      LOG.info("ARCHIVED (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + walArchiveDir);
+      if (!fs.rename(logFile, archivedFile)) {
+        LOG.warn("Failed archive of " + logFile + ", deleting");
+      } else {
+        archived = true;
+      }
     }
-    Path archivedFile = new Path(archiveDir, logFile.getName());
-    LOG.info("ARCHIVED WAL (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + archivedFile);
-    if (!fs.rename(logFile, archivedFile)) {
-      LOG.warn("Failed archive of " + logFile);
+    if (!archived) {
+      if (!fs.delete(logFile, false)) {
+        LOG.warn("Failed delete of " + logFile);
+      }
     }
-    // fs.delete(logFile, false);
   }
 
   public void setProcIds(long minId, long maxId) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/53b865aa/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 300e023..df818fe 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -124,6 +124,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private final Configuration conf;
   private final FileSystem fs;
   private final Path walDir;
+  private final Path walArchiveDir;
 
   private final AtomicReference<Throwable> syncException = new AtomicReference<>();
   private final AtomicBoolean loading = new AtomicBoolean(true);
@@ -185,9 +186,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
       final LeaseRecovery leaseRecovery) {
+    this(conf, fs, walDir, null, leaseRecovery);
+  }
+
+  public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
+      final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
     this.fs = fs;
     this.conf = conf;
     this.walDir = walDir;
+    this.walArchiveDir = walArchiveDir;
     this.leaseRecovery = leaseRecovery;
   }
 
@@ -343,7 +350,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
           }
-          logs.getLast().removeFile();
+          logs.getLast().removeFile(this.walArchiveDir);
           continue;
         }
 
@@ -955,7 +962,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // but we should check if someone else has created new files
     if (getMaxLogId(getLogFiles()) > flushLogId) {
       LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
-      logs.getLast().removeFile();
+      logs.getLast().removeFile(this.walArchiveDir);
       return false;
     }
 
@@ -1047,7 +1054,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
     // once there is nothing olding the oldest WAL we can remove it.
     while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
-      removeLogFile(logs.getFirst());
+      removeLogFile(logs.getFirst(), walArchiveDir);
       buildHoldingCleanupTracker();
     }
 
@@ -1089,7 +1096,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       if (lastLogId < log.getLogId()) {
         break;
       }
-      removeLogFile(log);
+      removeLogFile(log, walArchiveDir);
       removed = true;
     }
 
@@ -1098,12 +1105,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
-  private boolean removeLogFile(final ProcedureWALFile log) {
+  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
     try {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Removing log=" + log);
       }
-      log.removeFile();
+      log.removeFile(walArchiveDir);
       logs.remove(log);
       if (LOG.isDebugEnabled()) {
         LOG.info("Removed log=" + log + " activeLogs=" + logs);
@@ -1192,7 +1199,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
         }
 
         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
-        ProcedureWALFile log = initOldLog(logFiles[i]);
+        ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
         if (log != null) {
           this.logs.add(log);
         }
@@ -1222,11 +1229,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
   /**
    * Loads given log file and it's tracker.
    */
-  private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
+  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
+  throws IOException {
     final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
     if (logFile.getLen() == 0) {
       LOG.warn("Remove uninitialized log: " + logFile);
-      log.removeFile();
+      log.removeFile(walArchiveDir);
       return null;
     }
     if (LOG.isDebugEnabled()) {
@@ -1236,7 +1244,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       log.open();
     } catch (ProcedureWALFormat.InvalidWALDataException e) {
       LOG.warn("Remove uninitialized log: " + logFile, e);
-      log.removeFile();
+      log.removeFile(walArchiveDir);
       return null;
     } catch (IOException e) {
       String msg = "Unable to read state log: " + logFile;

http://git-wip-us.apache.org/repos/asf/hbase/blob/53b865aa/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 4600fe2..a07c436 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1163,6 +1163,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
     final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
+    final Path archiveWalDir = new Path(new Path(FSUtils.getWALRootDir(this.conf),
+        HConstants.HFILE_ARCHIVE_DIRECTORY), MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
 
     final FileSystem walFs = walDir.getFileSystem(conf);
 
@@ -1176,7 +1178,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
       HConstants.DEFAULT_WAL_STORAGE_POLICY);
 
-    procedureStore = new WALProcedureStore(conf, walFs, walDir,
+    procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, archiveWalDir,
         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();

http://git-wip-us.apache.org/repos/asf/hbase/blob/53b865aa/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 9afb63f..dda41e0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -121,7 +121,7 @@ public class TestAssignmentManager {
     conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
     conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
     conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
-    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 5);
+    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually.
   }
 
   @Before

Reply via email to