HBASE-15763 Isolate Wal related stuff from MasterFileSystem

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a8a2c516
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a8a2c516
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a8a2c516

Branch: refs/heads/hbase-12439
Commit: a8a2c516a0752ff782f8338502c8b2005049eda3
Parents: 0e37063
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Wed May 4 07:59:44 2016 -0700
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Wed May 4 07:59:44 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/master/HMaster.java |  26 +-
 .../hadoop/hbase/master/MasterFileSystem.java   | 321 +----------------
 .../hadoop/hbase/master/MasterServices.java     |   5 +
 .../hadoop/hbase/master/MasterWalManager.java   | 351 +++++++++++++++++++
 .../hadoop/hbase/master/ServerManager.java      |   4 +-
 .../hadoop/hbase/master/SplitLogManager.java    |   8 +-
 .../master/procedure/ServerCrashProcedure.java  |  22 +-
 .../hbase/master/MockNoopMasterServices.java    |   5 +
 .../hadoop/hbase/master/TestCatalogJanitor.java |  11 +-
 .../master/TestDistributedLogSplitting.java     |  17 +-
 .../hbase/master/TestMasterFileSystem.java      |  56 +--
 .../hbase/master/TestMasterWalManager.java      | 104 ++++++
 12 files changed, 533 insertions(+), 397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2f1cd3c..d7f7c18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -270,6 +270,7 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   final MetricsMaster metricsMaster;
   // file system manager for the master FS operations
   private MasterFileSystem fileSystemManager;
+  private MasterWalManager walManager;
 
   // server manager to deal with region server info
   volatile ServerManager serverManager;
@@ -656,7 +657,8 @@ public class HMaster extends HRegionServer implements 
MasterServices {
 
     this.masterActiveTime = System.currentTimeMillis();
     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or 
Spring.
-    this.fileSystemManager = new MasterFileSystem(this, this);
+    this.fileSystemManager = new MasterFileSystem(this);
+    this.walManager = new MasterWalManager(this);
 
     // enable table descriptors cache
     this.tableDescriptors.setCacheOn();
@@ -715,7 +717,7 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     // we recover hbase:meta region servers inside master initialization and
     // handle other failed servers in SSH in order to start up master node ASAP
     Set<ServerName> previouslyFailedServers =
-      this.fileSystemManager.getFailedServersFromLogFolders();
+      this.walManager.getFailedServersFromLogFolders();
 
     // log splitting for hbase:meta server
     ServerName oldMetaServerLocation = 
metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
@@ -946,11 +948,11 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     // TODO: should we prevent from using state manager before meta was 
initialized?
     // tableStateManager.start();
 
-    if ((RecoveryMode.LOG_REPLAY == 
this.getMasterFileSystem().getLogRecoveryMode())
+    if ((RecoveryMode.LOG_REPLAY == 
this.getMasterWalManager().getLogRecoveryMode())
         && (!previouslyFailedMetaRSs.isEmpty())) {
       // replay WAL edits mode need new hbase:meta RS is assigned firstly
       status.setStatus("replaying log for Meta Region");
-      this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
+      this.walManager.splitMetaLog(previouslyFailedMetaRSs);
     }
 
     this.assignmentManager.setEnabledTable(TableName.META_TABLE_NAME);
@@ -985,14 +987,14 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   }
 
   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) 
throws IOException {
-    if (RecoveryMode.LOG_REPLAY == 
this.getMasterFileSystem().getLogRecoveryMode()) {
+    if (RecoveryMode.LOG_REPLAY == 
this.getMasterWalManager().getLogRecoveryMode()) {
       // In log replay mode, we mark hbase:meta region as recovering in ZK
       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
-      this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
+      this.walManager.prepareLogReplay(currentMetaServer, regions);
     } else {
       // In recovered.edits mode: create recovered edits file for hbase:meta 
server
-      this.fileSystemManager.splitMetaLog(currentMetaServer);
+      this.walManager.splitMetaLog(currentMetaServer);
     }
   }
 
@@ -1046,6 +1048,11 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   }
 
   @Override
+  public MasterWalManager getMasterWalManager() {
+    return this.walManager;
+  }
+
+  @Override
   public TableStateManager getTableStateManager() {
     return tableStateManager;
   }
@@ -1082,8 +1089,8 @@ public class HMaster extends HRegionServer implements 
MasterServices {
    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 
1000);
    this.logCleaner =
       new LogCleaner(cleanerInterval,
-         this, conf, getMasterFileSystem().getFileSystem(),
-         getMasterFileSystem().getOldLogDir());
+         this, conf, getMasterWalManager().getFileSystem(),
+         getMasterWalManager().getOldLogDir());
     getChoreService().scheduleChore(logCleaner);
 
    //start the hfile archive cleaner thread
@@ -1132,6 +1139,7 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
     if (this.serverManager != null) this.serverManager.stop();
     if (this.assignmentManager != null) this.assignmentManager.stop();
+    if (this.walManager != null) this.walManager.stop();
     if (this.fileSystemManager != null) this.fileSystemManager.stop();
     if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 43ae2f8..ad6e09d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -19,99 +19,56 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.ipc.RemoteException;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class abstracts a bunch of operations the HMaster needs to interact 
with
- * the underlying file system, including splitting log files, checking file
+ * the underlying file system like creating the initial layout, checking file
  * system status, etc.
  */
 @InterfaceAudience.Private
 public class MasterFileSystem {
-  private static final Log LOG = 
LogFactory.getLog(MasterFileSystem.class.getName());
+  private static final Log LOG = LogFactory.getLog(MasterFileSystem.class);
+
   // HBase configuration
-  Configuration conf;
-  // master status
-  Server master;
-  // metrics for master
-  private final MetricsMasterFileSystem metricsMasterFilesystem = new 
MetricsMasterFileSystem();
+  private final Configuration conf;
   // Persisted unique cluster ID
   private ClusterId clusterId;
   // Keep around for convenience.
   private final FileSystem fs;
-  // Is the fileystem ok?
-  private volatile boolean fsOk = true;
-  // The Path to the old logs dir
-  private final Path oldLogDir;
   // root hbase directory on the FS
   private final Path rootdir;
   // hbase temp directory used for table construction and deletion
   private final Path tempdir;
-  // create the split log lock
-  final Lock splitLogLock = new ReentrantLock();
-  final boolean distributedLogReplay;
-  final SplitLogManager splitLogManager;
-  private final MasterServices services;
-
-  final static PathFilter META_FILTER = new PathFilter() {
-    @Override
-    public boolean accept(Path p) {
-      return DefaultWALProvider.isMetaFile(p);
-    }
-  };
 
-  final static PathFilter NON_META_FILTER = new PathFilter() {
-    @Override
-    public boolean accept(Path p) {
-      return !DefaultWALProvider.isMetaFile(p);
-    }
-  };
+  private final MasterServices services;
 
-  public MasterFileSystem(Server master, MasterServices services)
-  throws IOException {
-    this.conf = master.getConfiguration();
-    this.master = master;
+  public MasterFileSystem(MasterServices services) throws IOException {
+    this.conf = services.getConfiguration();
     this.services = services;
     // Set filesystem to be that of this.rootdir else we get complaints about
     // mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
@@ -126,18 +83,8 @@ public class MasterFileSystem {
     // make sure the fs has the same conf
     fs.setConf(conf);
     // setup the filesystem variable
-    // set up the archived logs path
-    this.oldLogDir = createInitialFileSystemLayout();
+    createInitialFileSystemLayout();
     HFileSystem.addLocationsOrderInterceptor(conf);
-    this.splitLogManager =
-        new SplitLogManager(master, master.getConfiguration(), master, 
services,
-            master.getServerName());
-    this.distributedLogReplay = this.splitLogManager.isLogReplaying();
-  }
-
-  @VisibleForTesting
-  SplitLogManager getSplitLogManager() {
-    return this.splitLogManager;
   }
 
   /**
@@ -146,55 +93,23 @@ public class MasterFileSystem {
    * <li>Check if the meta region exists and is readable, if not create it.
    * Create hbase.version and the hbase:meta directory if not one.
    * </li>
-   * <li>Create a log archive directory for RS to put archived logs</li>
    * </ol>
    * Idempotent.
    */
-  private Path createInitialFileSystemLayout() throws IOException {
+  private void createInitialFileSystemLayout() throws IOException {
     // check if the root directory exists
     checkRootDir(this.rootdir, conf, this.fs);
 
     // check if temp directory exists and clean it
     checkTempDir(this.tempdir, conf, this.fs);
-
-    Path oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
-
-    // Make sure the region servers can archive their old logs
-    if(!this.fs.exists(oldLogDir)) {
-      this.fs.mkdirs(oldLogDir);
-    }
-
-    return oldLogDir;
   }
 
   public FileSystem getFileSystem() {
     return this.fs;
   }
 
-  /**
-   * Get the directory where old logs go
-   * @return the dir
-   */
-  public Path getOldLogDir() {
-    return this.oldLogDir;
-  }
-
-  /**
-   * Checks to see if the file system is still accessible.
-   * If not, sets closed
-   * @return false if file system is not available
-   */
-  public boolean checkFileSystem() {
-    if (this.fsOk) {
-      try {
-        FSUtils.checkFileSystemAvailable(this.fs);
-        FSUtils.checkDfsSafeMode(this.conf);
-      } catch (IOException e) {
-        master.abort("Shutting down HBase cluster: file system not available", 
e);
-        this.fsOk = false;
-      }
-    }
-    return this.fsOk;
+  public Configuration getConfiguration() {
+    return this.conf;
   }
 
   /**
@@ -219,197 +134,6 @@ public class MasterFileSystem {
   }
 
   /**
-   * Inspect the log directory to find dead servers which need recovery work
-   * @return A set of ServerNames which aren't running but still have WAL 
files left in file system
-   */
-  Set<ServerName> getFailedServersFromLogFolders() {
-    boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
-        WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
-
-    Set<ServerName> serverNames = new HashSet<ServerName>();
-    Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
-
-    do {
-      if (master.isStopped()) {
-        LOG.warn("Master stopped while trying to get failed servers.");
-        break;
-      }
-      try {
-        if (!this.fs.exists(logsDirPath)) return serverNames;
-        FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, 
null);
-        // Get online servers after getting log folders to avoid log folder 
deletion of newly
-        // checked in region servers . see HBASE-5916
-        Set<ServerName> onlineServers = ((HMaster) 
master).getServerManager().getOnlineServers()
-            .keySet();
-
-        if (logFolders == null || logFolders.length == 0) {
-          LOG.debug("No log files to split, proceeding...");
-          return serverNames;
-        }
-        for (FileStatus status : logFolders) {
-          FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, 
status.getPath(), null);
-          if (curLogFiles == null || curLogFiles.length == 0) {
-            // Empty log folder. No recovery needed
-            continue;
-          }
-          final ServerName serverName = 
DefaultWALProvider.getServerNameFromWALDirectoryName(
-              status.getPath());
-          if (null == serverName) {
-            LOG.warn("Log folder " + status.getPath() + " doesn't look like 
its name includes a " +
-                "region server name; leaving in place. If you see later errors 
about missing " +
-                "write ahead logs they may be saved in this location.");
-          } else if (!onlineServers.contains(serverName)) {
-            LOG.info("Log folder " + status.getPath() + " doesn't belong "
-                + "to a known region server, splitting");
-            serverNames.add(serverName);
-          } else {
-            LOG.info("Log folder " + status.getPath() + " belongs to an 
existing region server");
-          }
-        }
-        retrySplitting = false;
-      } catch (IOException ioe) {
-        LOG.warn("Failed getting failed servers to be recovered.", ioe);
-        if (!checkFileSystem()) {
-          LOG.warn("Bad Filesystem, exiting");
-          Runtime.getRuntime().halt(1);
-        }
-        try {
-          if (retrySplitting) {
-            
Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
-          }
-        } catch (InterruptedException e) {
-          LOG.warn("Interrupted, aborting since cannot return w/o splitting");
-          Thread.currentThread().interrupt();
-          retrySplitting = false;
-          Runtime.getRuntime().halt(1);
-        }
-      }
-    } while (retrySplitting);
-
-    return serverNames;
-  }
-
-  public void splitLog(final ServerName serverName) throws IOException {
-    Set<ServerName> serverNames = new HashSet<ServerName>();
-    serverNames.add(serverName);
-    splitLog(serverNames);
-  }
-
-  /**
-   * Specialized method to handle the splitting for meta WAL
-   * @param serverName
-   * @throws IOException
-   */
-  public void splitMetaLog(final ServerName serverName) throws IOException {
-    Set<ServerName> serverNames = new HashSet<ServerName>();
-    serverNames.add(serverName);
-    splitMetaLog(serverNames);
-  }
-
-  /**
-   * Specialized method to handle the splitting for meta WAL
-   * @param serverNames
-   * @throws IOException
-   */
-  public void splitMetaLog(final Set<ServerName> serverNames) throws 
IOException {
-    splitLog(serverNames, META_FILTER);
-  }
-
-  
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", 
justification=
-      "We only release this lock when we set it. Updates to code that uses it 
should verify use " +
-      "of the guard boolean.")
-  private List<Path> getLogDirs(final Set<ServerName> serverNames) throws 
IOException {
-    List<Path> logDirs = new ArrayList<Path>();
-    boolean needReleaseLock = false;
-    if (!this.services.isInitialized()) {
-      // during master initialization, we could have multiple places splitting 
a same wal
-      this.splitLogLock.lock();
-      needReleaseLock = true;
-    }
-    try {
-      for (ServerName serverName : serverNames) {
-        Path logDir = new Path(this.rootdir,
-            DefaultWALProvider.getWALDirectoryName(serverName.toString()));
-        Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
-        // Rename the directory so a rogue RS doesn't create more WALs
-        if (fs.exists(logDir)) {
-          if (!this.fs.rename(logDir, splitDir)) {
-            throw new IOException("Failed fs.rename for log split: " + logDir);
-          }
-          logDir = splitDir;
-          LOG.debug("Renamed region directory: " + splitDir);
-        } else if (!fs.exists(splitDir)) {
-          LOG.info("Log dir for server " + serverName + " does not exist");
-          continue;
-        }
-        logDirs.add(splitDir);
-      }
-    } finally {
-      if (needReleaseLock) {
-        this.splitLogLock.unlock();
-      }
-    }
-    return logDirs;
-  }
-
-  /**
-   * Mark regions in recovering state when distributedLogReplay are set true
-   * @param serverName Failed region server whose wals to be replayed
-   * @param regions Set of regions to be recovered
-   * @throws IOException
-   */
-  public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> 
regions) throws IOException {
-    if (!this.distributedLogReplay) {
-      return;
-    }
-    // mark regions in recovering state
-    if (regions == null || regions.isEmpty()) {
-      return;
-    }
-    this.splitLogManager.markRegionsRecovering(serverName, regions);
-  }
-
-  public void splitLog(final Set<ServerName> serverNames) throws IOException {
-    splitLog(serverNames, NON_META_FILTER);
-  }
-
-  /**
-   * Wrapper function on {@link 
SplitLogManager#removeStaleRecoveringRegions(Set)}
-   * @param failedServers
-   * @throws IOException
-   */
-  void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
-      throws IOException, InterruptedIOException {
-    this.splitLogManager.removeStaleRecoveringRegions(failedServers);
-  }
-
-  /**
-   * This method is the base split method that splits WAL files matching a 
filter. Callers should
-   * pass the appropriate filter for meta and non-meta WALs.
-   * @param serverNames logs belonging to these servers will be split; this 
will rename the log
-   *                    directory out from under a soft-failed server
-   * @param filter
-   * @throws IOException
-   */
-  public void splitLog(final Set<ServerName> serverNames, PathFilter filter) 
throws IOException {
-    long splitTime = 0, splitLogSize = 0;
-    List<Path> logDirs = getLogDirs(serverNames);
-
-    splitLogManager.handleDeadWorkers(serverNames);
-    splitTime = EnvironmentEdgeManager.currentTime();
-    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, 
filter);
-    splitTime = EnvironmentEdgeManager.currentTime() - splitTime;
-
-    if (this.metricsMasterFilesystem != null) {
-      if (filter == META_FILTER) {
-        this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
-      } else {
-        this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
-      }
-    }
-  }
-
-  /**
    * Get the rootdir.  Make sure its wholesome and exists before returning.
    * @param rd
    * @param c
@@ -418,9 +142,8 @@ public class MasterFileSystem {
    * needed populating the directory with necessary bootup files).
    * @throws IOException
    */
-  private Path checkRootDir(final Path rd, final Configuration c,
-    final FileSystem fs)
-  throws IOException {
+  private Path checkRootDir(final Path rd, final Configuration c, final 
FileSystem fs)
+      throws IOException {
     // If FS is in safe mode wait till out of it.
     FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 
1000));
 
@@ -595,22 +318,6 @@ public class MasterFileSystem {
   }
 
   public void stop() {
-    if (splitLogManager != null) {
-      this.splitLogManager.stop();
-    }
-  }
-
-  /**
-   * The function is used in SSH to set recovery mode based on configuration 
after all outstanding
-   * log split tasks drained.
-   * @throws IOException
-   */
-  public void setLogRecoveryMode() throws IOException {
-      this.splitLogManager.setRecoveryMode(false);
-  }
-
-  public RecoveryMode getLogRecoveryMode() {
-    return this.splitLogManager.getRecoveryMode();
   }
 
   public void logFileSystemState(Log log) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index d095183..21f14e8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -73,6 +73,11 @@ public interface MasterServices extends Server {
   MasterFileSystem getMasterFileSystem();
 
   /**
+   * @return Master's WALs {@link MasterWalManager} utility class.
+   */
+  MasterWalManager getMasterWalManager();
+
+  /**
    * @return Master's {@link ServerManager} instance.
    */
   ServerManager getServerManager();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
new file mode 100644
index 0000000..d447e2d
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
@@ -0,0 +1,351 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+
+/**
+ * This class abstracts a bunch of operations the HMaster needs
+ * when splitting log files e.g. finding log files, dirs etc.
+ */
+@InterfaceAudience.Private
+public class MasterWalManager {
+  private static final Log LOG = LogFactory.getLog(MasterWalManager.class);
+
+  final static PathFilter META_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      return DefaultWALProvider.isMetaFile(p);
+    }
+  };
+
+  final static PathFilter NON_META_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      return !DefaultWALProvider.isMetaFile(p);
+    }
+  };
+
+  // metrics for master
+  // TODO: Rename it, since those metrics are split-manager related
+  private final MetricsMasterFileSystem metricsMasterFilesystem = new 
MetricsMasterFileSystem();
+
+  // Keep around for convenience.
+  private final MasterServices services;
+  private final Configuration conf;
+  private final FileSystem fs;
+
+  // The Path to the old logs dir
+  private final Path oldLogDir;
+  private final Path rootDir;
+
+  // create the split log lock
+  private final Lock splitLogLock = new ReentrantLock();
+  private final SplitLogManager splitLogManager;
+  private final boolean distributedLogReplay;
+
+  // Is the fileystem ok?
+  private volatile boolean fsOk = true;
+
+  public MasterWalManager(MasterServices services) throws IOException {
+    this(services.getConfiguration(), 
services.getMasterFileSystem().getFileSystem(),
+      services.getMasterFileSystem().getRootDir(), services);
+  }
+
+  public MasterWalManager(Configuration conf, FileSystem fs, Path rootDir, 
MasterServices services)
+      throws IOException {
+    this.fs = fs;
+    this.conf = conf;
+    this.rootDir = rootDir;
+    this.services = services;
+    this.splitLogManager = new SplitLogManager(services, conf,
+        services, services, services.getServerName());
+    this.distributedLogReplay = this.splitLogManager.isLogReplaying();
+
+    this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+
+    // Make sure the region servers can archive their old logs
+    if (!this.fs.exists(oldLogDir)) {
+      this.fs.mkdirs(oldLogDir);
+    }
+  }
+
+  public void stop() {
+    if (splitLogManager != null) {
+      splitLogManager.stop();
+    }
+  }
+
+  @VisibleForTesting
+  SplitLogManager getSplitLogManager() {
+    return this.splitLogManager;
+  }
+
+  /**
+   * Get the directory where old logs go
+   * @return the dir
+   */
+  Path getOldLogDir() {
+    return this.oldLogDir;
+  }
+
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  /**
+   * Checks to see if the file system is still accessible.
+   * If not, sets closed
+   * @return false if file system is not available
+   */
+  private boolean checkFileSystem() {
+    if (this.fsOk) {
+      try {
+        FSUtils.checkFileSystemAvailable(this.fs);
+        FSUtils.checkDfsSafeMode(this.conf);
+      } catch (IOException e) {
+        services.abort("Shutting down HBase cluster: file system not 
available", e);
+        this.fsOk = false;
+      }
+    }
+    return this.fsOk;
+  }
+
+  /**
+   * Inspect the log directory to find dead servers which need recovery work
+   * @return A set of ServerNames which aren't running but still have WAL 
files left in file system
+   */
+  Set<ServerName> getFailedServersFromLogFolders() {
+    boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
+        WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
+
+    Set<ServerName> serverNames = new HashSet<ServerName>();
+    Path logsDirPath = new Path(this.rootDir, HConstants.HREGION_LOGDIR_NAME);
+
+    do {
+      if (services.isStopped()) {
+        LOG.warn("Master stopped while trying to get failed servers.");
+        break;
+      }
+      try {
+        if (!this.fs.exists(logsDirPath)) return serverNames;
+        FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, 
null);
+        // Get online servers after getting log folders to avoid log folder 
deletion of newly
+        // checked in region servers . see HBASE-5916
+        Set<ServerName> onlineServers = 
services.getServerManager().getOnlineServers().keySet();
+
+        if (logFolders == null || logFolders.length == 0) {
+          LOG.debug("No log files to split, proceeding...");
+          return serverNames;
+        }
+        for (FileStatus status : logFolders) {
+          FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, 
status.getPath(), null);
+          if (curLogFiles == null || curLogFiles.length == 0) {
+            // Empty log folder. No recovery needed
+            continue;
+          }
+          final ServerName serverName = 
DefaultWALProvider.getServerNameFromWALDirectoryName(
+              status.getPath());
+          if (null == serverName) {
+            LOG.warn("Log folder " + status.getPath() + " doesn't look like 
its name includes a " +
+                "region server name; leaving in place. If you see later errors 
about missing " +
+                "write ahead logs they may be saved in this location.");
+          } else if (!onlineServers.contains(serverName)) {
+            LOG.info("Log folder " + status.getPath() + " doesn't belong "
+                + "to a known region server, splitting");
+            serverNames.add(serverName);
+          } else {
+            LOG.info("Log folder " + status.getPath() + " belongs to an 
existing region server");
+          }
+        }
+        retrySplitting = false;
+      } catch (IOException ioe) {
+        LOG.warn("Failed getting failed servers to be recovered.", ioe);
+        if (!checkFileSystem()) {
+          LOG.warn("Bad Filesystem, exiting");
+          Runtime.getRuntime().halt(1);
+        }
+        try {
+          if (retrySplitting) {
+            
Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted, aborting since cannot return w/o splitting");
+          Thread.currentThread().interrupt();
+          retrySplitting = false;
+          Runtime.getRuntime().halt(1);
+        }
+      }
+    } while (retrySplitting);
+
+    return serverNames;
+  }
+
+  public void splitLog(final ServerName serverName) throws IOException {
+    Set<ServerName> serverNames = new HashSet<ServerName>();
+    serverNames.add(serverName);
+    splitLog(serverNames);
+  }
+
+  /**
+   * Specialized method to handle the splitting for meta WAL
+   * @param serverName logs belonging to this server will be split
+   */
+  public void splitMetaLog(final ServerName serverName) throws IOException {
+    Set<ServerName> serverNames = new HashSet<ServerName>();
+    serverNames.add(serverName);
+    splitMetaLog(serverNames);
+  }
+
+  /**
+   * Specialized method to handle the splitting for meta WAL
+   * @param serverNames logs belonging to these servers will be split
+   */
+  public void splitMetaLog(final Set<ServerName> serverNames) throws 
IOException {
+    splitLog(serverNames, META_FILTER);
+  }
+
+  
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", 
justification=
+      "We only release this lock when we set it. Updates to code that uses it 
should verify use " +
+      "of the guard boolean.")
+  private List<Path> getLogDirs(final Set<ServerName> serverNames) throws 
IOException {
+    List<Path> logDirs = new ArrayList<Path>();
+    boolean needReleaseLock = false;
+    if (!this.services.isInitialized()) {
+      // during master initialization, we could have multiple places splitting 
a same wal
+      this.splitLogLock.lock();
+      needReleaseLock = true;
+    }
+    try {
+      for (ServerName serverName : serverNames) {
+        Path logDir = new Path(this.rootDir,
+            DefaultWALProvider.getWALDirectoryName(serverName.toString()));
+        Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
+        // Rename the directory so a rogue RS doesn't create more WALs
+        if (fs.exists(logDir)) {
+          if (!this.fs.rename(logDir, splitDir)) {
+            throw new IOException("Failed fs.rename for log split: " + logDir);
+          }
+          logDir = splitDir;
+          LOG.debug("Renamed region directory: " + splitDir);
+        } else if (!fs.exists(splitDir)) {
+          LOG.info("Log dir for server " + serverName + " does not exist");
+          continue;
+        }
+        logDirs.add(splitDir);
+      }
+    } finally {
+      if (needReleaseLock) {
+        this.splitLogLock.unlock();
+      }
+    }
+    return logDirs;
+  }
+
+  /**
+   * Mark regions in recovering state when distributedLogReplay are set true
+   * @param serverName Failed region server whose wals to be replayed
+   * @param regions Set of regions to be recovered
+   */
+  public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> 
regions) throws IOException {
+    if (!this.distributedLogReplay) {
+      return;
+    }
+    // mark regions in recovering state
+    if (regions == null || regions.isEmpty()) {
+      return;
+    }
+    this.splitLogManager.markRegionsRecovering(serverName, regions);
+  }
+
+  public void splitLog(final Set<ServerName> serverNames) throws IOException {
+    splitLog(serverNames, NON_META_FILTER);
+  }
+
+  /**
+   * Wrapper function on {@link 
SplitLogManager#removeStaleRecoveringRegions(Set)}
+   * @param failedServers A set of known failed servers
+   */
+  void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
+      throws IOException, InterruptedIOException {
+    this.splitLogManager.removeStaleRecoveringRegions(failedServers);
+  }
+
+  /**
+   * This method is the base split method that splits WAL files matching a 
filter. Callers should
+   * pass the appropriate filter for meta and non-meta WALs.
+   * @param serverNames logs belonging to these servers will be split; this 
will rename the log
+   *                    directory out from under a soft-failed server
+   */
+  public void splitLog(final Set<ServerName> serverNames, PathFilter filter) 
throws IOException {
+    long splitTime = 0, splitLogSize = 0;
+    List<Path> logDirs = getLogDirs(serverNames);
+
+    splitLogManager.handleDeadWorkers(serverNames);
+    splitTime = EnvironmentEdgeManager.currentTime();
+    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, 
filter);
+    splitTime = EnvironmentEdgeManager.currentTime() - splitTime;
+
+    if (this.metricsMasterFilesystem != null) {
+      if (filter == META_FILTER) {
+        this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
+      } else {
+        this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
+      }
+    }
+  }
+
+  /**
+   * The function is used in SSH to set recovery mode based on configuration 
after all outstanding
+   * log split tasks drained.
+   */
+  public void setLogRecoveryMode() throws IOException {
+    this.splitLogManager.setRecoveryMode(false);
+  }
+
+  public RecoveryMode getLogRecoveryMode() {
+    return this.splitLogManager.getRecoveryMode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index d69c7aa..37659f8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -753,7 +753,7 @@ public class ServerManager {
     }
     OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
       region, favoredNodes,
-      (RecoveryMode.LOG_REPLAY == 
this.services.getMasterFileSystem().getLogRecoveryMode()));
+      (RecoveryMode.LOG_REPLAY == 
this.services.getMasterWalManager().getLogRecoveryMode()));
     try {
       OpenRegionResponse response = admin.openRegion(null, request);
       return ResponseConverter.getRegionOpeningState(response);
@@ -781,7 +781,7 @@ public class ServerManager {
     }
 
     OpenRegionRequest request = 
RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
-      (RecoveryMode.LOG_REPLAY == 
this.services.getMasterFileSystem().getLogRecoveryMode()));
+      (RecoveryMode.LOG_REPLAY == 
this.services.getMasterWalManager().getLogRecoveryMode()));
     try {
       OpenRegionResponse response = admin.openRegion(null, request);
       return ResponseConverter.getRegionOpeningStateList(response);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 4c2e745..5d488dd 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -75,8 +75,8 @@ import com.google.common.annotations.VisibleForTesting;
  * <p>SplitLogManager monitors the tasks that it creates using the
  * timeoutMonitor thread. If a task's progress is slow then
  * {@link SplitLogManagerCoordination#checkTasks} will take away the
- * task from the owner {@link 
org.apache.hadoop.hbase.regionserver.SplitLogWorker} 
- * and the task will be up for grabs again. When the task is done then it is 
+ * task from the owner {@link 
org.apache.hadoop.hbase.regionserver.SplitLogWorker}
+ * and the task will be up for grabs again. When the task is done then it is
  * deleted by SplitLogManager.
  *
  * <p>Clients call {@link #splitLogDistributed(Path)} to split a region 
server's
@@ -273,7 +273,7 @@ public class SplitLogManager {
     }
     waitForSplittingCompletion(batch, status);
     // remove recovering regions
-    if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
+    if (filter == MasterWalManager.META_FILTER /* reference comparison */) {
       // we split meta regions and user regions separately therefore logfiles 
are either all for
       // meta or user regions but won't for both( we could have mixed 
situations in tests)
       isMetaRecovery = true;
@@ -411,7 +411,7 @@ public class SplitLogManager {
     for (ServerName tmpServerName : serverNames) {
       recoveredServerNameSet.add(tmpServerName.getServerName());
     }
-   
+
     this.recoveringRegionLock.lock();
     try {
       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 7de694c..2fbb559 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -37,8 +37,8 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MasterWalManager;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
@@ -320,10 +320,10 @@ implements ServerProcedureInterface {
    * @throws IOException
    */
   private void start(final MasterProcedureEnv env) throws IOException {
-    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
     // Set recovery mode late. This is what the old ServerShutdownHandler used 
do.
-    mfs.setLogRecoveryMode();
-    this.distributedLogReplay = mfs.getLogRecoveryMode() == 
RecoveryMode.LOG_REPLAY;
+    mwm.setLogRecoveryMode();
+    this.distributedLogReplay = mwm.getLogRecoveryMode() == 
RecoveryMode.LOG_REPLAY;
   }
 
   /**
@@ -335,7 +335,7 @@ implements ServerProcedureInterface {
   private boolean processMeta(final MasterProcedureEnv env)
   throws IOException {
     if (LOG.isDebugEnabled()) LOG.debug("Processing hbase:meta that was on " + 
this.serverName);
-    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
     HRegionInfo metaHRI = HRegionInfo.FIRST_META_REGIONINFO;
     if (this.shouldSplitWal) {
@@ -343,7 +343,7 @@ implements ServerProcedureInterface {
         prepareLogReplay(env, META_REGION_SET);
       } else {
         // TODO: Matteo. We BLOCK here but most important thing to be doing at 
this moment.
-        mfs.splitMetaLog(serverName);
+        mwm.splitMetaLog(serverName);
         am.getRegionStates().logSplit(metaHRI);
       }
     }
@@ -360,7 +360,7 @@ implements ServerProcedureInterface {
           processed = false;
         } else {
           // TODO: Matteo. We BLOCK here but most important thing to be doing 
at this moment.
-          mfs.splitMetaLog(serverName);
+          mwm.splitMetaLog(serverName);
         }
       }
     }
@@ -394,9 +394,9 @@ implements ServerProcedureInterface {
       LOG.debug("Mark " + size(this.regionsOnCrashedServer) + " 
regions-in-recovery from " +
         this.serverName);
     }
-    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    mfs.prepareLogReplay(this.serverName, regions);
+    mwm.prepareLogReplay(this.serverName, regions);
     am.getRegionStates().logSplit(this.serverName);
   }
 
@@ -405,10 +405,10 @@ implements ServerProcedureInterface {
       LOG.debug("Splitting logs from " + serverName + "; region count=" +
         size(this.regionsOnCrashedServer));
     }
-    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
     // TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor 
while it is running.
-    mfs.splitLog(this.serverName);
+    mwm.splitLog(this.serverName);
     am.getRegionStates().logSplit(this.serverName);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 0882716..60b62e4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -98,6 +98,11 @@ public class MockNoopMasterServices implements 
MasterServices, Server {
   }
 
   @Override
+  public MasterWalManager getMasterWalManager() {
+    return null;
+  }
+
+  @Override
   public MasterCoprocessorHost getMasterCoprocessorHost() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index e850ed0..cc8e2d8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -221,9 +221,11 @@ public class TestCatalogJanitor {
   class MockMasterServices extends MockNoopMasterServices {
     private final MasterFileSystem mfs;
     private final AssignmentManager asm;
+    private final Server server;
 
     MockMasterServices(final Server server) throws IOException {
-      this.mfs = new MasterFileSystem(server, this);
+      this.server = server;
+      this.mfs = new MasterFileSystem(this);
       this.asm = Mockito.mock(AssignmentManager.class);
     }
 
@@ -239,7 +241,12 @@ public class TestCatalogJanitor {
 
     @Override
     public Configuration getConfiguration() {
-      return mfs.conf;
+      return server.getConfiguration();
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return server.getServerName();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 4f2385f..170a882 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1,6 +1,5 @@
 /**
  *
-
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -213,7 +212,7 @@ public class TestDistributedLogSplitting {
     startCluster(NUM_RS);
 
     final int NUM_LOG_LINES = 1000;
-    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    final SplitLogManager slm = 
master.getMasterWalManager().getSplitLogManager();
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
     master.balanceSwitch(false);
@@ -680,7 +679,7 @@ public class TestDistributedLogSplitting {
     final ZooKeeperWatcher zkw = master.getZooKeeper();
     Table ht = installTable(zkw, "table", "family", 40);
     try {
-      final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+      final SplitLogManager slm = 
master.getMasterWalManager().getSplitLogManager();
 
       Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
       HRegionInfo region = null;
@@ -929,7 +928,7 @@ public class TestDistributedLogSplitting {
     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", 
null);
     Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
     try {
-      final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+      final SplitLogManager slm = 
master.getMasterWalManager().getSplitLogManager();
 
       Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
       HRegionInfo region = null;
@@ -1004,7 +1003,7 @@ public class TestDistributedLogSplitting {
     LOG.info("testWorkerAbort");
     startCluster(3);
     final int NUM_LOG_LINES = 10000;
-    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    final SplitLogManager slm = 
master.getMasterWalManager().getSplitLogManager();
     FileSystem fs = master.getMasterFileSystem().getFileSystem();
 
     final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
@@ -1123,7 +1122,7 @@ public class TestDistributedLogSplitting {
   public void testDelayedDeleteOnFailure() throws Exception {
     LOG.info("testDelayedDeleteOnFailure");
     startCluster(1);
-    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    final SplitLogManager slm = 
master.getMasterWalManager().getSplitLogManager();
     final FileSystem fs = master.getMasterFileSystem().getFileSystem();
     final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
     fs.mkdirs(logDir);
@@ -1204,10 +1203,10 @@ public class TestDistributedLogSplitting {
     LOG.info("#regions = " + regions.size());
     Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
     tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
-    master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), 
tmpRegions);
+    master.getMasterWalManager().prepareLogReplay(hrs.getServerName(), 
tmpRegions);
     Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
     userRegionSet.addAll(regions);
-    master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), 
userRegionSet);
+    master.getMasterWalManager().prepareLogReplay(hrs.getServerName(), 
userRegionSet);
     boolean isMetaRegionInRecovery = false;
     List<String> recoveringRegions =
         zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, 
false);
@@ -1219,7 +1218,7 @@ public class TestDistributedLogSplitting {
     }
     assertTrue(isMetaRegionInRecovery);
 
-    master.getMasterFileSystem().splitMetaLog(hrs.getServerName());
+    master.getMasterWalManager().splitMetaLog(hrs.getServerName());
 
     isMetaRegionInRecovery = false;
     recoveringRegions =

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
index 0534643..bf13e7f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
@@ -21,25 +21,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -50,8 +40,8 @@ import org.junit.experimental.categories.Category;
  */
 @Category({MasterTests.class, MediumTests.class})
 public class TestMasterFileSystem {
-
   private static final Log LOG = LogFactory.getLog(TestMasterFileSystem.class);
+
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
   @BeforeClass
@@ -68,52 +58,12 @@ public class TestMasterFileSystem {
   public void testFsUriSetProperly() throws Exception {
     HMaster master = UTIL.getMiniHBaseCluster().getMaster();
     MasterFileSystem fs = master.getMasterFileSystem();
-    Path masterRoot = FSUtils.getRootDir(fs.conf);
+    Path masterRoot = FSUtils.getRootDir(fs.getConfiguration());
     Path rootDir = FSUtils.getRootDir(fs.getFileSystem().getConf());
     // make sure the fs and the found root dir have the same scheme
     LOG.debug("from fs uri:" + 
FileSystem.getDefaultUri(fs.getFileSystem().getConf()));
-    LOG.debug("from configuration uri:" + FileSystem.getDefaultUri(fs.conf));
+    LOG.debug("from configuration uri:" + 
FileSystem.getDefaultUri(fs.getConfiguration()));
     // make sure the set uri matches by forcing it.
     assertEquals(masterRoot, rootDir);
   }
-
-  @Test
-  public void testRemoveStaleRecoveringRegionsDuringMasterInitialization() 
throws Exception {
-    // this test is for when distributed log replay is enabled
-    if 
(!UTIL.getConfiguration().getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
false)) return;
-    
-    LOG.info("Starting 
testRemoveStaleRecoveringRegionsDuringMasterInitialization");
-    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
-    MasterFileSystem fs = master.getMasterFileSystem();
-
-    String failedRegion = "failedRegoin1";
-    String staleRegion = "staleRegion";
-    ServerName inRecoveryServerName = ServerName.valueOf("mgr,1,1");
-    ServerName previouselyFaildServerName = ServerName.valueOf("previous,1,1");
-    String walPath = "/hbase/data/.logs/" + 
inRecoveryServerName.getServerName()
-        + "-splitting/test";
-    // Create a ZKW to use in the test
-    ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
-    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, 
walPath),
-      new SplitLogTask.Owned(inRecoveryServerName, 
fs.getLogRecoveryMode()).toByteArray(), 
-        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    String staleRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, 
staleRegion);
-    ZKUtil.createWithParents(zkw, staleRegionPath);
-    String inRecoveringRegionPath = 
ZKUtil.joinZNode(zkw.recoveringRegionsZNode, failedRegion);
-    inRecoveringRegionPath = ZKUtil.joinZNode(inRecoveringRegionPath, 
-      inRecoveryServerName.getServerName());
-    ZKUtil.createWithParents(zkw, inRecoveringRegionPath);
-    Set<ServerName> servers = new HashSet<ServerName>();
-    servers.add(previouselyFaildServerName);
-    fs.removeStaleRecoveringRegionsFromZK(servers);
-
-    // verification
-    assertFalse(ZKUtil.checkExists(zkw, staleRegionPath) != -1);
-    assertTrue(ZKUtil.checkExists(zkw, inRecoveringRegionPath) != -1);
-      
-    ZKUtil.deleteChildrenRecursively(zkw, zkw.recoveringRegionsZNode);
-    ZKUtil.deleteChildrenRecursively(zkw, zkw.splitLogZNode);
-    zkw.close();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8a2c516/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java
new file mode 100644
index 0000000..feb97d9
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test the master wal manager in a local cluster
+ */
+@Category({MasterTests.class, MediumTests.class})
+public class TestMasterWalManager {
+  private static final Log LOG = LogFactory.getLog(TestMasterWalManager.class);
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void teardownTest() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testRemoveStaleRecoveringRegionsDuringMasterInitialization() 
throws Exception {
+    // this test is for when distributed log replay is enabled
+    if 
(!UTIL.getConfiguration().getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
false)) return;
+
+    LOG.info("Starting 
testRemoveStaleRecoveringRegionsDuringMasterInitialization");
+    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+    MasterWalManager mwm = master.getMasterWalManager();
+
+    String failedRegion = "failedRegoin1";
+    String staleRegion = "staleRegion";
+    ServerName inRecoveryServerName = ServerName.valueOf("mgr,1,1");
+    ServerName previouselyFaildServerName = ServerName.valueOf("previous,1,1");
+    String walPath = "/hbase/data/.logs/" + 
inRecoveryServerName.getServerName()
+        + "-splitting/test";
+    // Create a ZKW to use in the test
+    ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, 
walPath),
+      new SplitLogTask.Owned(inRecoveryServerName, 
mwm.getLogRecoveryMode()).toByteArray(),
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    String staleRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, 
staleRegion);
+    ZKUtil.createWithParents(zkw, staleRegionPath);
+    String inRecoveringRegionPath = 
ZKUtil.joinZNode(zkw.recoveringRegionsZNode, failedRegion);
+    inRecoveringRegionPath = ZKUtil.joinZNode(inRecoveringRegionPath,
+      inRecoveryServerName.getServerName());
+    ZKUtil.createWithParents(zkw, inRecoveringRegionPath);
+    Set<ServerName> servers = new HashSet<ServerName>();
+    servers.add(previouselyFaildServerName);
+    mwm.removeStaleRecoveringRegionsFromZK(servers);
+
+    // verification
+    assertFalse(ZKUtil.checkExists(zkw, staleRegionPath) != -1);
+    assertTrue(ZKUtil.checkExists(zkw, inRecoveringRegionPath) != -1);
+
+    ZKUtil.deleteChildrenRecursively(zkw, zkw.recoveringRegionsZNode);
+    ZKUtil.deleteChildrenRecursively(zkw, zkw.splitLogZNode);
+    zkw.close();
+  }
+}

Reply via email to