HBASE-19073 Cleanup CoordinatedStateManager

- Merged BaseCSM class into CSM interface
- Removed config hbase.coordinated.state.manager.class
- Since the state manager is not pluggable anymore, we don't need 
start/stop/initialize to set up unknown classes. Our internal ZkCSM now requires 
a Server in the constructor itself, which also makes the dependency clearer.
- Removed CSM from the HRegionServer and HMaster constructors. Although it's a 
step back from dependency injection, it's more consistent with our current (not 
good) pattern where we initialize everything in the ctor itself.

Change-Id: Ifca06bb354adec5b11ea1bad4707e014410491fc


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dd70cc30
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dd70cc30
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dd70cc30

Branch: refs/heads/HBASE-18410
Commit: dd70cc308158c435c6d8ec027e2435a29be4326b
Parents: eee3b01
Author: Apekshit Sharma <a...@apache.org>
Authored: Fri Oct 20 17:20:17 2017 -0700
Committer: Apekshit Sharma <a...@apache.org>
Committed: Tue Oct 24 19:56:04 2017 -0700

----------------------------------------------------------------------
 .../master/LogRollMasterProcedureManager.java   |  9 +--
 .../LogRollRegionServerProcedureManager.java    | 12 ++--
 .../org/apache/hadoop/hbase/HConstants.java     |  6 +-
 .../hadoop/hbase/CoordinatedStateManager.java   | 31 ++++----
 .../hbase/CoordinatedStateManagerFactory.java   | 48 -------------
 .../apache/hadoop/hbase/LocalHBaseCluster.java  | 16 +----
 .../BaseCoordinatedStateManager.java            | 76 --------------------
 .../SplitLogManagerCoordination.java            |  4 +-
 .../ZKSplitLogManagerCoordination.java          |  9 +--
 .../coordination/ZkCoordinatedStateManager.java | 21 +++---
 .../ZkSplitLogWorkerCoordination.java           |  9 +--
 .../org/apache/hadoop/hbase/master/HMaster.java | 12 ++--
 .../hadoop/hbase/master/HMasterCommandLine.java | 10 +--
 .../hadoop/hbase/master/SplitLogManager.java    |  4 +-
 .../hbase/regionserver/HRegionServer.java       | 29 +++-----
 .../regionserver/HRegionServerCommandLine.java  |  5 +-
 .../hbase/regionserver/SplitLogWorker.java      | 15 ++--
 .../hadoop/hbase/util/JVMClusterUtil.java       | 23 ++----
 .../apache/hadoop/hbase/wal/WALSplitter.java    | 49 ++++++-------
 .../apache/hadoop/hbase/MiniHBaseCluster.java   |  4 +-
 .../hadoop/hbase/TestLocalHBaseCluster.java     | 10 ++-
 .../hadoop/hbase/TestMovedRegionsCleaner.java   |  5 +-
 .../client/TestClientScannerRPCTimeout.java     |  6 +-
 .../hadoop/hbase/client/TestMetaCache.java      |  4 +-
 .../master/TestDistributedLogSplitting.java     |  5 +-
 .../hbase/master/TestHMasterRPCException.java   |  5 +-
 .../hadoop/hbase/master/TestMasterMetrics.java  |  5 +-
 .../hbase/master/TestMasterNoCluster.java       | 17 ++---
 .../hbase/master/TestMetaShutdownHandler.java   |  5 +-
 .../hbase/master/TestSplitLogManager.java       |  6 +-
 .../hbase/regionserver/OOMERegionServer.java    |  5 +-
 .../hbase/regionserver/TestClusterId.java       | 10 +--
 .../TestCompactionInDeadRegionServer.java       |  4 --
 .../hbase/regionserver/TestPriorityRpc.java     |  5 +-
 .../TestRSKilledWhenInitializing.java           |  4 +-
 .../TestRegionMergeTransactionOnCluster.java    |  6 +-
 .../regionserver/TestRegionServerHostname.java  |  2 +-
 .../TestRegionServerReportForDuty.java          |  5 +-
 .../TestScannerHeartbeatMessages.java           |  5 --
 .../hbase/regionserver/TestSplitLogWorker.java  |  5 +-
 .../TestSplitTransactionOnCluster.java          |  6 +-
 .../regionserver/wal/AbstractTestWALReplay.java |  6 +-
 .../replication/TestReplicationSource.java      |  5 --
 .../hbase/wal/TestWALReaderOnSecureWAL.java     | 10 +--
 .../apache/hadoop/hbase/wal/TestWALSplit.java   | 14 ++--
 src/main/asciidoc/_chapters/hbase-default.adoc  | 11 ---
 46 files changed, 158 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
index 62b2df7..567d5ec 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
@@ -26,12 +26,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.master.MasterServices;
@@ -95,10 +95,7 @@ public class LogRollMasterProcedureManager extends 
MasterProcedureManager {
 
     // setup the default procedure coordinator
     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 
opThreads);
-    BaseCoordinatedStateManager coordManager =
-        (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
-        .getCoordinatedStateManager(master.getConfiguration());
-    coordManager.initialize(master);
+    CoordinatedStateManager coordManager = new 
ZkCoordinatedStateManager(master);
     ProcedureCoordinatorRpcs comms =
         coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), 
name);
     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, 
wakeFrequency);

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
index 5ab7fac..c499ec7 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
@@ -24,12 +24,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
 import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
@@ -159,12 +159,8 @@ public class LogRollRegionServerProcedureManager extends 
RegionServerProcedureMa
           + " setting");
       return;
     }
-    BaseCoordinatedStateManager coordManager =
-        (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.
-          getCoordinatedStateManager(rss.getConfiguration());
-    coordManager.initialize(rss);
-    this.memberRpcs =
-        coordManager
+    CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(rss);
+    this.memberRpcs = coordManager
             
.getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
 
     // read in the backup handler configuration properties

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index a272fc8..45f22a6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1209,10 +1209,6 @@ public final class HConstants {
 
   public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
 
-  /** Config for pluggable consensus provider */
-  public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
-    "hbase.coordinated.state.manager.class";
-
   /** Configuration key for SplitLog manager timeout */
   public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = 
"hbase.splitlog.manager.timeout";
 
@@ -1298,7 +1294,7 @@ public final class HConstants {
 
   public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
       "hbase.canary.write.table.check.period";
-  
+
   public static final String HBASE_CANARY_READ_RAW_SCAN_KEY = 
"hbase.canary.read.raw.enabled";
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
index 1da7587..ab146e7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
@@ -17,7 +17,14 @@
  */
 package org.apache.hadoop.hbase;
 
+import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
 
 /**
  * Implementations of this interface will keep and return to clients
@@ -28,31 +35,27 @@ import org.apache.yetus.audience.InterfaceAudience;
  * For each coarse-grained area of operations there will be a separate
  * interface with implementation, providing API for relevant operations
  * requiring coordination.
- *
- * Property hbase.coordinated.state.manager.class in hbase-site.xml controls
- * which provider to use.
  */
 @InterfaceAudience.Private
 public interface CoordinatedStateManager {
-
   /**
-   * Initialize coordinated state management service.
-   * @param server server instance to run within.
+   * Method to retrieve coordination for split log worker
    */
-  void initialize(Server server);
+  SplitLogWorkerCoordination getSplitLogWorkerCoordination();
 
   /**
-   * Starts service.
+   * Method to retrieve coordination for split log manager
    */
-  void start();
-
+  SplitLogManagerCoordination getSplitLogManagerCoordination();
   /**
-   * Stops service.
+   * Method to retrieve {@link 
org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
    */
-  void stop();
+  ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String 
coordNode)
+      throws IOException;
 
   /**
-   * @return instance of Server coordinated state manager runs within
+   * Method to retrieve {@link 
org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
    */
-  Server getServer();
+  ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws 
KeeperException;
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
deleted file mode 100644
index c66215a..0000000
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * Creates instance of {@link CoordinatedStateManager}
- * based on configuration.
- */
-@InterfaceAudience.Private
-public final class CoordinatedStateManagerFactory {
-
-  /**
-   * Private to keep this class from being accidentally instantiated.
-   */
-  private CoordinatedStateManagerFactory(){}
-
-  /**
-   * Creates consensus provider from the given configuration.
-   * @param conf Configuration
-   * @return Implementation of  {@link CoordinatedStateManager}
-   */
-  public static CoordinatedStateManager 
getCoordinatedStateManager(Configuration conf) {
-    Class<? extends CoordinatedStateManager> coordinatedStateMgrKlass =
-      conf.getClass(HConstants.HBASE_COORDINATED_STATE_MANAGER_CLASS,
-        ZkCoordinatedStateManager.class, CoordinatedStateManager.class);
-    return ReflectionUtils.newInstance(coordinatedStateMgrKlass, conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index 123cc42..e43d33b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -173,14 +173,8 @@ public class LocalHBaseCluster {
     // Create each regionserver with its own Configuration instance so each has
     // its Connection instance rather than share (see HBASE_INSTANCES down in
     // the guts of ConnectionManager).
-
-    // Also, create separate CoordinatedStateManager instance per Server.
-    // This is special case when we have to have more than 1 
CoordinatedStateManager
-    // within 1 process.
-    CoordinatedStateManager cp = 
CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
-
     JVMClusterUtil.RegionServerThread rst =
-        JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends 
HRegionServer>) conf
+        JVMClusterUtil.createRegionServerThread(config, (Class<? extends 
HRegionServer>) conf
             .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), 
index);
 
     this.regionThreads.add(rst);
@@ -208,13 +202,7 @@ public class LocalHBaseCluster {
     // Create each master with its own Configuration instance so each has
     // its Connection instance rather than share (see HBASE_INSTANCES down in
     // the guts of ConnectionManager.
-
-    // Also, create separate CoordinatedStateManager instance per Server.
-    // This is special case when we have to have more than 1 
CoordinatedStateManager
-    // within 1 process.
-    CoordinatedStateManager cp = 
CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
-
-    JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
+    JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
         (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, 
this.masterClass), index);
     this.masterThreads.add(mt);
     return mt;

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
deleted file mode 100644
index 0727375..0000000
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.coordination;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.Server;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
-import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} 
implementations.
- * Defines methods to retrieve coordination objects for relevant areas. 
CoordinatedStateManager
- * reference returned from Server interface has to be casted to this type to
- * access those methods.
- */
-@InterfaceAudience.Private
-public abstract class BaseCoordinatedStateManager implements 
CoordinatedStateManager {
-
-  @Override
-  public void initialize(Server server) {
-  }
-
-  @Override
-  public void start() {
-  }
-
-  @Override
-  public void stop() {
-  }
-
-  @Override
-  public Server getServer() {
-    return null;
-  }
-
-  /**
-   * Method to retrieve coordination for split log worker
-   */
-  public abstract  SplitLogWorkerCoordination getSplitLogWorkerCoordination();
-
-  /**
-   * Method to retrieve coordination for split log manager
-   */
-  public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
-  /**
-   * Method to retrieve {@link 
org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
-   */
-  public abstract ProcedureCoordinatorRpcs
-    getProcedureCoordinatorRpcs(String procType, String coordNode) throws 
IOException;
-
-  /**
-   * Method to retrieve {@link 
org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
-   */
-  public abstract ProcedureMemberRpcs
-    getProcedureMemberRpcs(String procType) throws KeeperException;
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
index df8103b..564d729 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
@@ -58,7 +58,7 @@ public interface SplitLogManagerCoordination {
   /**
    * Detail class that shares data between coordination and split log manager
    */
-  public static class SplitLogManagerDetails {
+  class SplitLogManagerDetails {
     final private ConcurrentMap<String, Task> tasks;
     final private MasterServices master;
     final private Set<String> failedDeletions;
@@ -156,7 +156,7 @@ public interface SplitLogManagerCoordination {
    * @throws InterruptedIOException
    * @throws IOException in case of failure
    */
-  void setRecoveryMode(boolean b) throws InterruptedIOException, IOException;
+  void setRecoveryMode(boolean b) throws IOException;
 
   /**
    * Removes known stale servers

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index 5fd20e8..ef99de0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -49,6 +49,7 @@ import 
org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
+import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALSplitter;
@@ -101,14 +102,14 @@ public class ZKSplitLogManagerCoordination extends 
ZooKeeperListener implements
 
   private boolean isDrainingDone = false;
 
-  public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
-      ZooKeeperWatcher watcher) {
+  public ZKSplitLogManagerCoordination(Configuration conf, ZooKeeperWatcher 
watcher) {
     super(watcher);
+    this.conf = conf;
     taskFinisher = new TaskFinisher() {
       @Override
       public Status finish(ServerName workerName, String logfile) {
         try {
-          WALSplitter.finishSplitLogFile(logfile, 
manager.getServer().getConfiguration());
+          WALSplitter.finishSplitLogFile(logfile, conf);
         } catch (IOException e) {
           LOG.warn("Could not finish splitting of log file " + logfile, e);
           return Status.ERR;
@@ -116,7 +117,6 @@ public class ZKSplitLogManagerCoordination extends 
ZooKeeperListener implements
         return Status.DONE;
       }
     };
-    this.conf = manager.getServer().getConfiguration();
   }
 
   @Override
@@ -1122,6 +1122,7 @@ public class ZKSplitLogManagerCoordination extends 
ZooKeeperListener implements
   /**
    * Temporary function that is used by unit tests only
    */
+  @VisibleForTesting
   public void setIgnoreDeleteForTesting(boolean b) {
     ignoreZKDeleteForTesting = b;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 2fc8e39..10e2642 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.coordination;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
 import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
@@ -33,24 +36,16 @@ import org.apache.zookeeper.KeeperException;
  * ZooKeeper-based implementation of {@link 
org.apache.hadoop.hbase.CoordinatedStateManager}.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
-  protected Server server;
+public class ZkCoordinatedStateManager implements CoordinatedStateManager {
   protected ZooKeeperWatcher watcher;
   protected SplitLogWorkerCoordination splitLogWorkerCoordination;
   protected SplitLogManagerCoordination splitLogManagerCoordination;
 
-  @Override
-  public void initialize(Server server) {
-    this.server = server;
+  public ZkCoordinatedStateManager(Server server) {
     this.watcher = server.getZooKeeper();
-    splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, 
watcher);
-    splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, 
watcher);
-
-  }
-
-  @Override
-  public Server getServer() {
-    return server;
+    splitLogWorkerCoordination = new 
ZkSplitLogWorkerCoordination(server.getServerName(), watcher);
+    splitLogManagerCoordination = new 
ZKSplitLogManagerCoordination(server.getConfiguration(),
+        watcher);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index 230a42f..14e7796 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -90,13 +90,11 @@ public class ZkSplitLogWorkerCoordination extends 
ZooKeeperListener implements
   protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
   private int maxConcurrentTasks = 0;
 
-  private final ZkCoordinatedStateManager manager;
+  private final ServerName serverName;
 
-  public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager 
zkCoordinatedStateManager,
-      ZooKeeperWatcher watcher) {
+  public ZkSplitLogWorkerCoordination(ServerName serverName, ZooKeeperWatcher 
watcher) {
     super(watcher);
-    manager = zkCoordinatedStateManager;
-
+    this.serverName = serverName;
   }
 
   /**
@@ -185,7 +183,6 @@ public class ZkSplitLogWorkerCoordination extends 
ZooKeeperListener implements
         // currentTask can change but that's ok
         String taskpath = currentTask;
         if (taskpath != null && taskpath.equals(path)) {
-          ServerName serverName = manager.getServer().getServerName();
           // have to compare data. cannot compare version because then there
           // will be race with attemptToOwnTask()
           // cannot just check whether the node has been transitioned to

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index bb36520..a990a4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -468,13 +468,12 @@ public class HMaster extends HRegionServer implements 
MasterServices {
    * #finishActiveMasterInitialization(MonitoredTask) after
    * the master becomes the active one.
    */
-  public HMaster(final Configuration conf, CoordinatedStateManager csm)
+  public HMaster(final Configuration conf)
       throws IOException, KeeperException {
-    super(conf, csm);
+    super(conf);
     try {
       this.rsFatals = new MemoryBoundedLogMessageBuffer(
           conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
-
       LOG.info("hbase.rootdir=" + getRootDir() +
           ", hbase.cluster.distributed=" + 
this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
 
@@ -2828,11 +2827,10 @@ public class HMaster extends HRegionServer implements 
MasterServices {
    * @return HMaster instance.
    */
   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
-      final Configuration conf, final CoordinatedStateManager cp)  {
+      final Configuration conf)  {
     try {
-      Constructor<? extends HMaster> c =
-        masterClass.getConstructor(Configuration.class, 
CoordinatedStateManager.class);
-      return c.newInstance(conf, cp);
+      Constructor<? extends HMaster> c = 
masterClass.getConstructor(Configuration.class);
+      return c.newInstance(conf);
     } catch(Exception e) {
       Throwable error = e;
       if (e instanceof InvocationTargetException &&

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
index d1e6b73..f9a441d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
@@ -29,8 +29,6 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
 import org.apache.hadoop.hbase.MasterNotRunningException;
@@ -230,9 +228,7 @@ public class HMasterCommandLine extends ServerCommandLine {
         waitOnMasterThreads(cluster);
       } else {
         logProcessInfo(getConf());
-        CoordinatedStateManager csm =
-          CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
-        HMaster master = HMaster.constructMaster(masterClass, conf, csm);
+        HMaster master = HMaster.constructMaster(masterClass, conf);
         if (master.isStopped()) {
           LOG.info("Won't bring the Master up as a shutdown is requested");
           return 1;
@@ -302,9 +298,9 @@ public class HMasterCommandLine extends ServerCommandLine {
   public static class LocalHMaster extends HMaster {
     private MiniZooKeeperCluster zkcluster = null;
 
-    public LocalHMaster(Configuration conf, CoordinatedStateManager csm)
+    public LocalHMaster(Configuration conf)
     throws IOException, KeeperException, InterruptedException {
-      super(conf, csm);
+      super(conf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 8027b6a..67aab82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
 import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -153,8 +152,7 @@ public class SplitLogManager {
   }
 
   private SplitLogManagerCoordination getSplitLogManagerCoordination() {
-    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
-        .getSplitLogManagerCoordination();
+    return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
   }
 
   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index d792628..f384c1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ClockOutOfSyncException;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -92,8 +91,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.client.locking.LockServiceClient;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -505,7 +504,7 @@ public class HRegionServer extends HasThread implements
 
   protected final RSRpcServices rpcServices;
 
-  protected BaseCoordinatedStateManager csm;
+  protected CoordinatedStateManager csm;
 
   /**
    * Configuration manager is used to register/deregister and notify the configuration observers
@@ -533,21 +532,13 @@ public class HRegionServer extends HasThread implements
   private final boolean masterless;
   static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
 
-  /**
-   * Starts a HRegionServer at the default location.
-   */
-  public HRegionServer(Configuration conf) throws IOException, InterruptedException {
-    this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
-  }
 
   /**
    * Starts a HRegionServer at the default location
-   *
-   * @param csm implementation of CoordinatedStateManager to be used
    */
-  // Don't start any services or managers in here in the Consructor.
+  // Don't start any services or managers in here in the Constructor.
   // Defer till after we register with the Master as much as possible. See #startServices.
-  public HRegionServer(Configuration conf, CoordinatedStateManager csm) throws IOException {
+  public HRegionServer(Configuration conf) throws IOException {
     super("RegionServer");  // thread name
     try {
       this.startcode = System.currentTimeMillis();
@@ -642,9 +633,7 @@ public class HRegionServer extends HasThread implements
 
         // If no master in cluster, skip trying to track one or look for a cluster status.
         if (!this.masterless) {
-          this.csm = (BaseCoordinatedStateManager) csm;
-          this.csm.initialize(this);
-          this.csm.start();
+          this.csm = new ZkCoordinatedStateManager(this);
 
           masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
           masterAddressTracker.start();
@@ -2924,7 +2913,7 @@ public class HRegionServer extends HasThread implements
   }
 
   @Override
-  public BaseCoordinatedStateManager getCoordinatedStateManager() {
+  public CoordinatedStateManager getCoordinatedStateManager() {
     return csm;
   }
 
@@ -3020,11 +3009,11 @@ public class HRegionServer extends HasThread implements
    */
   public static HRegionServer constructRegionServer(
       Class<? extends HRegionServer> regionServerClass,
-      final Configuration conf2, CoordinatedStateManager cp) {
+      final Configuration conf2) {
     try {
       Constructor<? extends HRegionServer> c = regionServerClass
-          .getConstructor(Configuration.class, CoordinatedStateManager.class);
-      return c.newInstance(conf2, cp);
+          .getConstructor(Configuration.class);
+      return c.newInstance(conf2);
     } catch (Exception e) {
       throw new RuntimeException("Failed construction of " + "Regionserver: "
           + regionServerClass.toString(), e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java
index 343ebf6..1212668 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java
@@ -23,10 +23,8 @@ import org.apache.commons.logging.LogFactory;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.util.ServerCommandLine;
 
 /**
@@ -52,7 +50,6 @@ public class HRegionServerCommandLine extends ServerCommandLine {
 
   private int start() throws Exception {
     Configuration conf = getConf();
-    CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
     try {
       // If 'local', don't start a region server here. Defer to
       // LocalHBaseCluster. It manages 'local' clusters.
@@ -61,7 +58,7 @@ public class HRegionServerCommandLine extends ServerCommandLine {
             + HConstants.CLUSTER_DISTRIBUTED + " is false");
       } else {
         logProcessInfo(getConf());
-        HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf, cp);
+        HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf);
         hrs.start();
         hrs.join();
         if (hrs.isAborted()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 96e9cf5..9d00f1a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -45,8 +44,8 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
 
 /**
  * This worker is spawned in every regionserver, including master. The Worker 
waits for log
- * splitting tasks to be put up by the {@link 
org.apache.hadoop.hbase.master.SplitLogManager} 
- * running in the master and races with other workers in other serves to 
acquire those tasks. 
+ * splitting tasks to be put up by the {@link 
org.apache.hadoop.hbase.master.SplitLogManager}
+ * running in the master and races with other workers in other serves to 
acquire those tasks.
  * The coordination is done via coordination engine.
  * <p>
  * If a worker has successfully moved the task from state UNASSIGNED to OWNED 
then it owns the task.
@@ -75,9 +74,7 @@ public class SplitLogWorker implements Runnable {
       TaskExecutor splitTaskExecutor) {
     this.server = server;
     this.conf = conf;
-    this.coordination =
-        ((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager())
-            .getSplitLogWorkerCoordination();
+    this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
     this.server = server;
     coordination.init(server, conf, splitTaskExecutor, this);
   }
@@ -102,7 +99,9 @@ public class SplitLogWorker implements Runnable {
         // encountered a bad non-retry-able persistent error.
         try {
           if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
-            fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
+            fs, conf, p, sequenceIdChecker,
+              server.getCoordinatedStateManager().getSplitLogWorkerCoordination(),
+              server.getConnection(), mode, factory)) {
             return Status.PREEMPTED;
           }
         } catch (InterruptedIOException iioe) {
@@ -186,7 +185,7 @@ public class SplitLogWorker implements Runnable {
    * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
    * guarantee that two workers will not be executing the same task therefore 
it
    * is better to have workers prepare the task and then have the
-   * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in 
+   * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
    * SplitLogManager.TaskFinisher
    */
   public interface TaskExecutor {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
index 135dabb..b88c0e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
@@ -72,23 +72,18 @@ public class JVMClusterUtil {
    * Creates a {@link RegionServerThread}.
    * Call 'start' on the returned thread to make it run.
    * @param c Configuration to use.
-   * @param cp consensus provider to use
    * @param hrsc Class to create.
    * @param index Used distinguishing the object returned.
    * @throws IOException
    * @return Region server added.
    */
-  public static JVMClusterUtil.RegionServerThread createRegionServerThread(
-      final Configuration c, CoordinatedStateManager cp, final Class<? extends HRegionServer> hrsc,
-      final int index)
-  throws IOException {
+  public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
+      final Class<? extends HRegionServer> hrsc, final int index) throws IOException {
     HRegionServer server;
     try {
-
-      Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class,
-      CoordinatedStateManager.class);
+      Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
       ctor.setAccessible(true);
-      server = ctor.newInstance(c, cp);
+      server = ctor.newInstance(c);
     } catch (InvocationTargetException ite) {
       Throwable target = ite.getTargetException();
       throw new RuntimeException("Failed construction of RegionServer: " +
@@ -124,20 +119,16 @@ public class JVMClusterUtil {
    * Creates a {@link MasterThread}.
    * Call 'start' on the returned thread to make it run.
    * @param c Configuration to use.
-   * @param cp consensus provider to use
    * @param hmc Class to create.
    * @param index Used distinguishing the object returned.
    * @throws IOException
    * @return Master added.
    */
-  public static JVMClusterUtil.MasterThread createMasterThread(
-      final Configuration c, CoordinatedStateManager cp, final Class<? extends HMaster> hmc,
-      final int index)
-  throws IOException {
+  public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
+      final Class<? extends HMaster> hmc, final int index) throws IOException {
     HMaster server;
     try {
-      server = hmc.getConstructor(Configuration.class, CoordinatedStateManager.class).
-        newInstance(c, cp);
+      server = hmc.getConstructor(Configuration.class).newInstance(c);
     } catch (InvocationTargetException ite) {
       Throwable target = ite.getTargetException();
       throw new RuntimeException("Failed construction of Master: " +

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index aeacd9d..e39d7ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@@ -82,7 +85,6 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -146,7 +148,8 @@ public class WALSplitter {
 
   private Map<TableName, TableState> tableStatesCache =
       new ConcurrentHashMap<>();
-  private BaseCoordinatedStateManager csm;
+  private SplitLogWorkerCoordination splitLogWorkerCoordination;
+  private Connection connection;
   private final WALFactory walFactory;
 
   private MonitoredTask status;
@@ -177,7 +180,8 @@ public class WALSplitter {
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
       FileSystem fs, LastSequenceId idChecker,
-      CoordinatedStateManager csm, RecoveryMode mode) {
+      SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
+      RecoveryMode mode) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, 
WALCellCodec.class.getName());
@@ -185,7 +189,9 @@ public class WALSplitter {
     this.rootDir = rootDir;
     this.fs = fs;
     this.sequenceIdChecker = idChecker;
-    this.csm = (BaseCoordinatedStateManager)csm;
+    this.splitLogWorkerCoordination = splitLogWorkerCoordination;
+    this.connection = connection;
+
     this.walFactory = factory;
     this.controller = new PipelineController();
 
@@ -199,7 +205,7 @@ public class WALSplitter {
     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
 
     this.numWriterThreads = 
this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
-    if (csm != null && this.distributedLogReplay) {
+    if (this.splitLogWorkerCoordination != null && this.distributedLogReplay) {
       outputSink = new LogReplayOutputSink(controller, entryBuffers, 
numWriterThreads);
     } else {
       if (this.distributedLogReplay) {
@@ -217,20 +223,14 @@ public class WALSplitter {
    * <p>
    * If the log file has N regions then N recovered.edits files will be 
produced.
    * <p>
-   * @param rootDir
-   * @param logfile
-   * @param fs
-   * @param conf
-   * @param reporter
-   * @param idChecker
-   * @param cp coordination state manager
    * @return false if it is interrupted by the progress-able.
-   * @throws IOException
    */
   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
-      CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
-    WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
+      SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
+      RecoveryMode mode, final WALFactory factory) throws IOException {
+    WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker,
+        splitLogWorkerCoordination, connection, mode);
     return s.splitLogFile(logfile, reporter);
   }
 
@@ -246,7 +246,7 @@ public class WALSplitter {
     List<Path> splits = new ArrayList<>();
     if (logfiles != null && logfiles.length > 0) {
       for (FileStatus logfile: logfiles) {
-        WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
+        WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null, null,
             RecoveryMode.LOG_SPLITTING);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
@@ -317,9 +317,8 @@ public class WALSplitter {
         lastFlushedSequenceId = 
lastFlushedSequenceIds.get(encodedRegionNameAsStr);
         if (lastFlushedSequenceId == null) {
           if (this.distributedLogReplay) {
-            RegionStoreSequenceIds ids =
-                csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
-                  encodedRegionNameAsStr);
+            RegionStoreSequenceIds ids = splitLogWorkerCoordination.getRegionFlushedSequenceId(
+                failedServerName, encodedRegionNameAsStr);
             if (ids != null) {
               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
               if (LOG.isDebugEnabled()) {
@@ -377,10 +376,9 @@ public class WALSplitter {
       throw iie;
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not parse, corrupted WAL=" + logPath, e);
-      if (this.csm != null) {
+      if (splitLogWorkerCoordination != null) {
         // Some tests pass in a csm of null.
-        this.csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
-          logfile.getPath().getName(), fs);
+        splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs);
       } else {
         // for tests only
         ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
@@ -1952,7 +1950,7 @@ public class WALSplitter {
         // retrieve last flushed sequence Id from ZK. Because region 
postOpenDeployTasks will
         // update the value for the region
         RegionStoreSequenceIds ids =
-            csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
+            splitLogWorkerCoordination.getRegionFlushedSequenceId(failedServerName,
               loc.getRegionInfo().getEncodedName());
         if (ids != null) {
           lastFlushedSequenceId = ids.getLastFlushedSequenceId();
@@ -2185,15 +2183,14 @@ public class WALSplitter {
     }
 
     private boolean isTableDisabledOrDisabling(TableName tableName) {
-      if (csm == null)
+      if (connection == null)
         return false; // we can't get state without CoordinatedStateManager
       if (tableName.isSystemTable())
         return false; // assume that system tables never can be disabled
       TableState tableState = tableStatesCache.get(tableName);
       if (tableState == null) {
         try {
-          tableState =
-              MetaTableAccessor.getTableState(csm.getServer().getConnection(), tableName);
+          tableState = MetaTableAccessor.getTableState(connection, tableName);
           if (tableState != null)
             tableStatesCache.put(tableName, tableState);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index fe4119a..a46fa9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -109,9 +109,9 @@ public class MiniHBaseCluster extends HBaseCluster {
     private Thread shutdownThread = null;
     private User user = null;
 
-    public MiniHBaseClusterRegionServer(Configuration conf, CoordinatedStateManager cp)
+    public MiniHBaseClusterRegionServer(Configuration conf)
         throws IOException, InterruptedException {
-      super(conf, cp);
+      super(conf);
       this.user = User.getCurrent();
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/TestLocalHBaseCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestLocalHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestLocalHBaseCluster.java
index bbf4f32..4c6b848 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestLocalHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestLocalHBaseCluster.java
@@ -66,10 +66,9 @@ public class TestLocalHBaseCluster {
    * running in local mode.
    */
   public static class MyHMaster extends HMaster {
-    public MyHMaster(Configuration conf, CoordinatedStateManager cp)
-      throws IOException, KeeperException,
+    public MyHMaster(Configuration conf) throws IOException, KeeperException,
         InterruptedException {
-      super(conf, cp);
+      super(conf);
     }
 
     public int echo(int val) {
@@ -82,9 +81,8 @@ public class TestLocalHBaseCluster {
    */
   public static class MyHRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
 
-    public MyHRegionServer(Configuration conf, CoordinatedStateManager cp) throws IOException,
-        InterruptedException {
-      super(conf, cp);
+    public MyHRegionServer(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
     }
 
     public int echo(int val) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMovedRegionsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMovedRegionsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMovedRegionsCleaner.java
index 8d3d2fc..5fb16c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMovedRegionsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMovedRegionsCleaner.java
@@ -44,9 +44,8 @@ import java.io.IOException;
 
   private static class TestMockRegionServer extends 
MiniHBaseCluster.MiniHBaseClusterRegionServer {
 
-    public TestMockRegionServer(Configuration conf, CoordinatedStateManager cp)
-        throws IOException, InterruptedException {
-      super(conf, cp);
+    public TestMockRegionServer(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
     }
 
     protected int movedRegionCleanerPeriod() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
index de21f83..c9686c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
@@ -141,9 +141,9 @@ public class TestClientScannerRPCTimeout {
   }
 
   private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
-    public RegionServerWithScanTimeout(Configuration conf, CoordinatedStateManager cp)
+    public RegionServerWithScanTimeout(Configuration conf)
         throws IOException, InterruptedException {
-      super(conf, cp);
+      super(conf);
     }
 
     protected RSRpcServices createRpcServices() throws IOException {
@@ -168,7 +168,7 @@ public class TestClientScannerRPCTimeout {
         throws ServiceException {
       if (request.hasScannerId()) {
         ScanResponse scanResponse = super.scan(controller, request);
-        if (this.tableScannerId == request.getScannerId() && 
+        if (this.tableScannerId == request.getScannerId() &&
             (sleepAlways || (!slept && seqNoToSleepOn == 
request.getNextCallSeq()))) {
           try {
             LOG.info("SLEEPING " + (rpcTimeout + 500));

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java
index e7aa60f..b1fda41 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java
@@ -198,9 +198,9 @@ public class TestMetaCache {
   public static class RegionServerWithFakeRpcServices extends HRegionServer {
     private FakeRSRpcServices rsRpcServices;
 
-    public RegionServerWithFakeRpcServices(Configuration conf, CoordinatedStateManager cp)
+    public RegionServerWithFakeRpcServices(Configuration conf)
       throws IOException, InterruptedException {
-      super(conf, cp);
+      super(conf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index a65924d..eafc412 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -82,7 +82,6 @@ import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -1151,8 +1150,8 @@ public class TestDistributedLogSplitting {
       out.write(Bytes.toBytes("corrupted bytes"));
       out.close();
       ZKSplitLogManagerCoordination coordination =
-          (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master
-              .getCoordinatedStateManager()).getSplitLogManagerCoordination();
+          (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
+              .getSplitLogManagerCoordination();
       coordination.setIgnoreDeleteForTesting(true);
       executor = Executors.newSingleThreadExecutor();
       Runnable runnable = new Runnable() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
index 8cb3ec8..64d5a02 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
@@ -26,8 +26,6 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -68,10 +66,9 @@ public class TestHMasterRPCException {
     conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
     testUtil.startMiniZKCluster();
 
-    CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
     ZooKeeperWatcher watcher = testUtil.getZooKeeperWatcher();
     ZKUtil.createWithParents(watcher, watcher.znodePaths.masterAddressZNode, Bytes.toBytes("fake:123"));
-    master = new HMaster(conf, cp);
+    master = new HMaster(conf);
     rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetrics.java
index 04cfc65..69baa5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetrics.java
@@ -51,9 +51,8 @@ public class TestMasterMetrics {
   private static HBaseTestingUtility TEST_UTIL;
 
   public static class MyMaster extends HMaster {
-    public MyMaster(Configuration conf, CoordinatedStateManager cp) throws IOException,
-        KeeperException, InterruptedException {
-      super(conf, cp);
+    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
+      super(conf);
     }
 /*
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index 480ba9a..25671fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -33,8 +33,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.CoordinatedStateException;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -47,7 +45,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -133,9 +130,7 @@ public class TestMasterNoCluster {
   @Test
   public void testStopDuringStart()
   throws IOException, KeeperException, InterruptedException {
-    CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
-      TESTUTIL.getConfiguration());
-    HMaster master = new HMaster(TESTUTIL.getConfiguration(), cp);
+    HMaster master = new HMaster(TESTUTIL.getConfiguration());
     master.start();
     // Immediately have it stop.  We used hang in assigning meta.
     master.stopMaster();
@@ -148,7 +143,7 @@ public class TestMasterNoCluster {
    * @throws IOException
    * @throws KeeperException
    * @throws InterruptedException
-   * @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException 
+   * @throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException
    */
   @Ignore @Test // Disabled since HBASE-18511. Reenable when master can carry regions.
   public void testFailover() throws Exception {
@@ -186,15 +181,13 @@ public class TestMasterNoCluster {
     // and get notification on transitions.  We need to fake out any rpcs the
     // master does opening/closing regions.  Also need to fake out the address
     // of the 'remote' mocked up regionservers.
-    CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
-      TESTUTIL.getConfiguration());
     // Insert a mock for the connection, use TESTUTIL.getConfiguration rather than
     // the conf from the master; the conf will already have an ClusterConnection
     // associate so the below mocking of a connection will fail.
     final ClusterConnection mockedConnection = HConnectionTestingUtility.getMockedConnectionAndDecorate(
         TESTUTIL.getConfiguration(), rs0, rs0, rs0.getServerName(),
         HRegionInfo.FIRST_META_REGIONINFO);
-    HMaster master = new HMaster(conf, cp) {
+    HMaster master = new HMaster(conf) {
       InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
       throws UnknownHostException {
         // Return different address dependent on port passed.
@@ -262,9 +255,7 @@ public class TestMasterNoCluster {
     final ServerName deadServer = ServerName.valueOf("test.sample", 1, 100);
     final MockRegionServer rs0 = new MockRegionServer(conf, newServer);
 
-    CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
-      TESTUTIL.getConfiguration());
-    HMaster master = new HMaster(conf, cp) {
+    HMaster master = new HMaster(conf) {
       @Override
       MasterMetaBootstrap createMetaBootstrap(final HMaster master, final MonitoredTask status) {
         return new MasterMetaBootstrap(this, status) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
index 68160df..d50e4a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMetaShutdownHandler.java
@@ -124,10 +124,9 @@ public class TestMetaShutdownHandler {
 
   public static class MyRegionServer extends MiniHBaseClusterRegionServer {
 
-    public MyRegionServer(Configuration conf, CoordinatedStateManager cp)
-      throws IOException, KeeperException,
+    public MyRegionServer(Configuration conf) throws IOException, KeeperException,
         InterruptedException {
-      super(conf, cp);
+      super(conf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index 04fc797..6215790 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -39,7 +39,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.commons.logging.Log;
@@ -48,7 +47,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -57,6 +55,7 @@ import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
@@ -105,8 +104,7 @@ public class TestSplitLogManager {
     public DummyMasterServices(ZooKeeperWatcher zkw, Configuration conf) {
       super(conf);
       this.zkw = zkw;
-      cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
-      cm.initialize(this);
+      cm = new ZkCoordinatedStateManager(this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/OOMERegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/OOMERegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/OOMERegionServer.java
index 07c141c..ce38ef6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/OOMERegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/OOMERegionServer.java
@@ -39,9 +39,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequ
 public class OOMERegionServer extends HRegionServer {
   private List<Put> retainer = new ArrayList<>();
 
-  public OOMERegionServer(HBaseConfiguration conf, CoordinatedStateManager cp)
-      throws IOException, InterruptedException {
-    super(conf, cp);
+  public OOMERegionServer(HBaseConfiguration conf) throws IOException, InterruptedException {
+    super(conf);
   }
 
   public void put(byte [] regionName, Put put)

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java
index 2240226..46fd702 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.master.LoadBalancer;
@@ -75,11 +73,9 @@ public class TestClusterId {
     TEST_UTIL.startMiniDFSCluster(1);
 
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
-    CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
     //start region server, needs to be separate
     //so we get an unset clusterId
-    rst = JVMClusterUtil.createRegionServerThread(conf,cp,
-        HRegionServer.class, 0);
+    rst = JVMClusterUtil.createRegionServerThread(conf, HRegionServer.class, 0);
     rst.start();
     //Make sure RS is in blocking state
     Thread.sleep(10000);
@@ -92,7 +88,7 @@ public class TestClusterId {
     assertNotNull(clusterId);
     assertEquals(clusterId, rst.getRegionServer().getClusterId());
   }
-  
+
   @Test
   public void testRewritingClusterIdToPB() throws Exception {
     TEST_UTIL.startMiniZKCluster();
@@ -115,6 +111,6 @@ public class TestClusterId {
     int expected = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration())? 2: 1;
     assertEquals(expected, master.getServerManager().getOnlineServersList().size());
   }
-  
+
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
index 4b13c30..5e212f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
@@ -75,10 +75,6 @@ public class TestCompactionInDeadRegionServer {
       super(conf);
     }
 
-    public IgnoreYouAreDeadRS(Configuration conf, CoordinatedStateManager csm) throws IOException {
-      super(conf, csm);
-    }
-
     @Override
     protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
         throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
index ded4c48..ef216f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
@@ -31,11 +31,9 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Get;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
@@ -65,8 +63,7 @@ public class TestPriorityRpc {
     conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK
     final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
     TEST_UTIL.getDataTestDir(this.getClass().getName());
-    CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
-    regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf, cp);
+    regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf);
     priority = regionServer.rpcServices.getPriority();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
index edd5673..bf5787f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
@@ -191,9 +191,9 @@ public class TestRSKilledWhenInitializing {
    * notices and so removes the region from its set of online regionservers.
    */
   static class RegisterAndDieRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
-    public RegisterAndDieRegionServer(Configuration conf, CoordinatedStateManager cp)
+    public RegisterAndDieRegionServer(Configuration conf)
     throws IOException, InterruptedException {
-      super(conf, cp);
+      super(conf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 15c6b76..bdcc559 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -518,10 +518,8 @@ public class TestRegionMergeTransactionOnCluster {
 
   // Make it public so that JVMClusterUtil can access it.
   public static class MyMaster extends HMaster {
-    public MyMaster(Configuration conf, CoordinatedStateManager cp)
-      throws IOException, KeeperException,
-        InterruptedException {
-      super(conf, cp);
+    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
+      super(conf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerHostname.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerHostname.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerHostname.java
index 8eee9b6..872fec6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerHostname.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerHostname.java
@@ -71,7 +71,7 @@ public class TestRegionServerHostname {
     TEST_UTIL.getConfiguration().set(HRegionServer.RS_HOSTNAME_KEY, invalidHostname);
     HRegionServer hrs = null;
     try {
-      hrs = new HRegionServer(TEST_UTIL.getConfiguration(), null);
+      hrs = new HRegionServer(TEST_UTIL.getConfiguration());
     } catch (IllegalArgumentException iae) {
       assertTrue(iae.getMessage(),
         iae.getMessage().contains("Failed resolve of " + invalidHostname) ||

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java
index 6f8a23b..f32a87c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java
@@ -164,10 +164,9 @@ public class TestRegionServerReportForDuty {
     private boolean rpcStubCreatedFlag = false;
     private boolean masterChanged = false;
 
-    public MyRegionServer(Configuration conf, CoordinatedStateManager cp)
-      throws IOException, KeeperException,
+    public MyRegionServer(Configuration conf) throws IOException, KeeperException,
         InterruptedException {
-      super(conf, cp);
+      super(conf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index d3771f6..b9e77c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -397,11 +397,6 @@ public class TestScannerHeartbeatMessages {
       super(conf);
     }
 
-    public HeartbeatHRegionServer(Configuration conf, CoordinatedStateManager csm)
-        throws IOException {
-      super(conf, csm);
-    }
-
     @Override
     protected RSRpcServices createRpcServices() throws IOException {
       return new HeartbeatRPCServices(this);

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index a01c75d..40077f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
@@ -88,8 +88,7 @@ public class TestSplitLogWorker {
     public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
       this.zkw = zkw;
       this.conf = conf;
-      cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
-      cm.initialize(this);
+      cm = new ZkCoordinatedStateManager(this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd70cc30/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 1965d5a..8533004 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -951,10 +951,8 @@ public class TestSplitTransactionOnCluster {
 
   // Make it public so that JVMClusterUtil can access it.
   public static class MyMaster extends HMaster {
-    public MyMaster(Configuration conf, CoordinatedStateManager cp)
-      throws IOException, KeeperException,
-        InterruptedException {
-      super(conf, cp);
+    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
+      super(conf);
     }
 
     @Override

Reply via email to