Repository: hbase
Updated Branches:
  refs/heads/branch-1 6df7554d2 -> 66941910b


http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index 2c9fc0f..dd15e4c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -22,10 +22,14 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -37,6 +41,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -183,13 +188,13 @@ public class TestPerTableCFReplication {
     Map<TableName, List<String>> tabCFsMap = null;
 
     // 1. null or empty string, result should be null
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(null);
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(null);
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("");
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("   ");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("   ");
     assertEquals(null, tabCFsMap);
 
     TableName tab1 = TableName.valueOf("tab1");
@@ -197,20 +202,20 @@ public class TestPerTableCFReplication {
     TableName tab3 = TableName.valueOf("tab3");
 
     // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab1");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey(tab1));   // its table name is "tab1"
     assertFalse(tabCFsMap.containsKey(tab2));  // not other table
     assertEquals(null, tabCFsMap.get(tab1));   // null cf-list,
 
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab2:cf1");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab2:cf1");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey(tab2));   // its table name is "tab2"
     assertFalse(tabCFsMap.containsKey(tab1));  // not other table
     assertEquals(1, tabCFsMap.get(tab2).size());   // cf-list contains only 1 cf
     assertEquals("cf1", tabCFsMap.get(tab2).get(0));// the only cf is "cf1"
 
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab3 : cf1 , cf3");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab3 : cf1 , cf3");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey(tab3));   // its table name is "tab2"
     assertFalse(tabCFsMap.containsKey(tab1));  // not other table
@@ -219,7 +224,7 @@ public class TestPerTableCFReplication {
     assertTrue(tabCFsMap.get(tab3).contains("cf3"));// contains "cf3"
 
     // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
     // 3.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
     assertTrue(tabCFsMap.containsKey(tab1));
@@ -237,7 +242,7 @@ public class TestPerTableCFReplication {
 
     // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
     // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
       "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
     // 4.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
@@ -256,7 +261,7 @@ public class TestPerTableCFReplication {
 
     // 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"
     //    "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
       "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
     // 5.1 no "tab1" and "tab2", only "tab3"
     assertEquals(1, tabCFsMap.size()); // only one table
@@ -267,7 +272,100 @@ public class TestPerTableCFReplication {
     assertEquals(2, tabCFsMap.get(tab3).size());
     assertTrue(tabCFsMap.get(tab3).contains("cf1"));
     assertTrue(tabCFsMap.get(tab3).contains("cf3"));
- }
+  }
+
+  @Test
+  public void testTableCFsHelperConverter() {
+
+    ZooKeeperProtos.TableCF[] tableCFs = null;
+    Map<TableName, List<String>> tabCFsMap = null;
+
+    // 1. null map converts to null; empty map converts to an empty array
+    assertNull(ReplicationSerDeHelper.convert(tabCFsMap));
+
+    tabCFsMap = new HashMap<TableName, List<String>>();
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(0, tableCFs.length);
+
+    TableName tab1 = TableName.valueOf("tab1");
+    TableName tab2 = TableName.valueOf("tab2");
+    TableName tab3 = TableName.valueOf("tab3");
+
+    // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
+    tabCFsMap.clear();
+    tabCFsMap.put(tab1, null);
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(1, tableCFs.length); // only one table
+    assertEquals(tab1.toString(),
+        tableCFs[0].getTableName().getQualifier().toStringUtf8());
+    assertEquals(0, tableCFs[0].getFamiliesCount());
+
+    tabCFsMap.clear();
+    tabCFsMap.put(tab2, new ArrayList<String>());
+    tabCFsMap.get(tab2).add("cf1");
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(1, tableCFs.length); // only one table
+    assertEquals(tab2.toString(),
+        tableCFs[0].getTableName().getQualifier().toStringUtf8());
+    assertEquals(1, tableCFs[0].getFamiliesCount());
+    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
+
+    tabCFsMap.clear();
+    tabCFsMap.put(tab3, new ArrayList<String>());
+    tabCFsMap.get(tab3).add("cf1");
+    tabCFsMap.get(tab3).add("cf3");
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(1, tableCFs.length);
+    assertEquals(tab3.toString(),
+        tableCFs[0].getTableName().getQualifier().toStringUtf8());
+    assertEquals(2, tableCFs[0].getFamiliesCount());
+    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
+    assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());
+
+    tabCFsMap.clear();
+    tabCFsMap.put(tab1, null);
+    tabCFsMap.put(tab2, new ArrayList<String>());
+    tabCFsMap.get(tab2).add("cf1");
+    tabCFsMap.put(tab3, new ArrayList<String>());
+    tabCFsMap.get(tab3).add("cf1");
+    tabCFsMap.get(tab3).add("cf3");
+
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(3, tableCFs.length);
+    assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, 
tab1.toString()));
+    assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, 
tab2.toString()));
+    assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, 
tab3.toString()));
+
+    assertEquals(0,
+        ReplicationSerDeHelper.getTableCF(tableCFs, 
tab1.toString()).getFamiliesCount());
+
+    assertEquals(1,
+        ReplicationSerDeHelper.getTableCF(tableCFs, 
tab2.toString()).getFamiliesCount());
+    assertEquals("cf1",
+        ReplicationSerDeHelper.getTableCF(tableCFs, 
tab2.toString()).getFamilies(0).toStringUtf8());
+
+    assertEquals(2,
+        ReplicationSerDeHelper.getTableCF(tableCFs, 
tab3.toString()).getFamiliesCount());
+    assertEquals("cf1",
+        ReplicationSerDeHelper.getTableCF(tableCFs, 
tab3.toString()).getFamilies(0).toStringUtf8());
+    assertEquals("cf3",
+        ReplicationSerDeHelper.getTableCF(tableCFs, 
tab3.toString()).getFamilies(1).toStringUtf8());
+
+    tabCFsMap = ReplicationSerDeHelper.convert2Map(tableCFs);
+    assertEquals(3, tabCFsMap.size());
+    assertTrue(tabCFsMap.containsKey(tab1));
+    assertTrue(tabCFsMap.containsKey(tab2));
+    assertTrue(tabCFsMap.containsKey(tab3));
+    // 3.2 table "tab1" : null cf-list
+    assertEquals(null, tabCFsMap.get(tab1));
+    // 3.3 table "tab2" : cf-list contains a single cf "cf1"
+    assertEquals(1, tabCFsMap.get(tab2).size());
+    assertEquals("cf1", tabCFsMap.get(tab2).get(0));
+    // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
+    assertEquals(2, tabCFsMap.get(tab3).size());
+    assertTrue(tabCFsMap.get(tab3).contains("cf1"));
+    assertTrue(tabCFsMap.get(tab3).contains("cf3"));
+  }
 
   @Test(timeout=300000)
   public void testPerTableCFReplication() throws Exception {
@@ -304,8 +402,23 @@ public class TestPerTableCFReplication {
       Table htab3C = connection3.getTable(tabCName);
 
       // A. add cluster2/cluster3 as peers to cluster1
-      replicationAdmin.addPeer("2", utility2.getClusterKey(), "TC;TB:f1,f3");
-      replicationAdmin.addPeer("3", utility3.getClusterKey(), "TA;TB:f1,f2");
+      ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
+      rpc2.setClusterKey(utility2.getClusterKey());
+      Map<TableName, List<String>> tableCFs = new HashMap<>();
+      tableCFs.put(tabCName, null);
+      tableCFs.put(tabBName, new ArrayList<String>());
+      tableCFs.get(tabBName).add("f1");
+      tableCFs.get(tabBName).add("f3");
+      replicationAdmin.addPeer("2", rpc2, tableCFs);
+
+      ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
+      rpc3.setClusterKey(utility3.getClusterKey());
+      tableCFs.clear();
+      tableCFs.put(tabAName, null);
+      tableCFs.put(tabBName, new ArrayList<String>());
+      tableCFs.get(tabBName).add("f1");
+      tableCFs.get(tabBName).add("f2");
+      replicationAdmin.addPeer("3", rpc3, tableCFs);
 
       // A1. tableA can only replicated to cluster3
       putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
@@ -348,8 +461,20 @@ public class TestPerTableCFReplication {
       deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
 
       // B. change peers' replicable table-cf config
-      replicationAdmin.setPeerTableCFs("2", "TA:f1,f2; TC:f2,f3");
-      replicationAdmin.setPeerTableCFs("3", "TB; TC:f3");
+      tableCFs.clear();
+      tableCFs.put(tabAName, new ArrayList<String>());
+      tableCFs.get(tabAName).add("f1");
+      tableCFs.get(tabAName).add("f2");
+      tableCFs.put(tabCName, new ArrayList<String>());
+      tableCFs.get(tabCName).add("f2");
+      tableCFs.get(tabCName).add("f3");
+      replicationAdmin.setPeerTableCFs("2", tableCFs);
+
+      tableCFs.clear();
+      tableCFs.put(tabBName, null);
+      tableCFs.put(tabCName, new ArrayList<String>());
+      tableCFs.get(tabCName).add("f3");
+      replicationAdmin.setPeerTableCFs("3", tableCFs);
 
       // B1. cf 'f1' of tableA can only replicated to cluster2
       putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index e52a600..5283433 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -124,7 +124,9 @@ public class TestReplicationBase {
     utility2.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
 
-    admin.addPeer("2", utility2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin.addPeer("2", rpc);
 
     LOG.info("Setup second Zk");
     CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index c293444..ba634dd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -369,7 +369,9 @@ public class TestReplicationSmallTests extends 
TestReplicationBase {
       }
     }
 
-    admin.addPeer("2", utility2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin.addPeer("2", rpc);
     Thread.sleep(SLEEP_TIME);
     rowKey = Bytes.toBytes("do rep");
     put = new Put(rowKey);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 7f7ee98..001f147 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -170,7 +170,7 @@ public abstract class TestReplicationStateBasic {
 
     try {
       rp.addPeer(ID_ONE,
-        new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"), 
null);
+        new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
       fail("Should throw an IllegalArgumentException because "
             + "zookeeper.znode.parent is missing leading '/'.");
     } catch (IllegalArgumentException e) {
@@ -179,7 +179,7 @@ public abstract class TestReplicationStateBasic {
 
     try {
       rp.addPeer(ID_ONE,
-        new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"), null);
+        new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
       fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
     } catch (IllegalArgumentException e) {
       // Expected.
@@ -187,7 +187,7 @@ public abstract class TestReplicationStateBasic {
 
     try {
       rp.addPeer(ID_ONE,
-        new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"), null);
+        new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
       fail("Should throw an IllegalArgumentException because "
           + "hbase.zookeeper.property.clientPort is missing.");
     } catch (IllegalArgumentException e) {
@@ -207,7 +207,7 @@ public abstract class TestReplicationStateBasic {
     files1.add("file_3");
     assertNull(rqc.getReplicableHFiles(ID_ONE));
     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), 
null);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
     rq1.addPeerToHFileRefs(ID_ONE);
     rq1.addHFileRefs(ID_ONE, files1);
     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
@@ -229,8 +229,8 @@ public abstract class TestReplicationStateBasic {
     rqc.init();
 
     rp.init();
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), 
null);
-    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), 
null);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
 
     List<String> files1 = new ArrayList<String>(3);
     files1.add("file_1");
@@ -288,9 +288,9 @@ public abstract class TestReplicationStateBasic {
     assertNumberOfPeers(0);
 
     // Add some peers
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), 
null);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
     assertNumberOfPeers(1);
-    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), 
null);
+    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
     assertNumberOfPeers(2);
 
     // Test methods with a peer that is added but not connected
@@ -305,7 +305,7 @@ public abstract class TestReplicationStateBasic {
     assertNumberOfPeers(1);
 
     // Add one peer
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), 
null);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
     rp.peerAdded(ID_ONE);
     assertNumberOfPeers(2);
     assertTrue(rp.getStatusOfPeer(ID_ONE));
@@ -365,7 +365,7 @@ public abstract class TestReplicationStateBasic {
         rq3.addLog("qId" + i, "filename" + j);
       }
       //Add peers for the corresponding queues so they are not orphans
-      rp.addPeer("qId" + i, new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i), null);
+      rp.addPeer("qId" + i, new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index fd02d1a..a949e92 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -203,7 +203,9 @@ public class TestReplicationSyncUpTool extends 
TestReplicationBase {
     /**
      * set M-S : Master: utility1 Slave1: utility2
      */
-    admin1.addPeer("1", utility2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin1.addPeer("1", rpc);
 
     admin1.close();
     admin2.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index a5df432..7b2e1fd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -146,7 +146,7 @@ public class TestReplicationTrackerZKImpl {
 
   @Test(timeout = 30000)
   public void testPeerRemovedEvent() throws Exception {
-    rp.addPeer("5", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+    rp.addPeer("5", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     rt.registerListener(new DummyReplicationListener());
     rp.removePeer("5");
     // wait for event
@@ -159,7 +159,7 @@ public class TestReplicationTrackerZKImpl {
   @Test(timeout = 30000)
   public void testPeerListChangedEvent() throws Exception {
     // add a peer
-    rp.addPeer("5", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+    rp.addPeer("5", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     
zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5",
 true);
     rt.registerListener(new DummyReplicationListener());
     rp.disablePeer("5");
@@ -183,16 +183,16 @@ public class TestReplicationTrackerZKImpl {
   public void testPeerNameControl() throws Exception {
     int exists = 0;
     int hyphen = 0;
-    rp.addPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+    rp.addPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
 
     try{
-      rp.addPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+      rp.addPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     }catch(IllegalArgumentException e){
       exists++;
     }
 
     try{
-      rp.addPeer("6-ec2", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+      rp.addPeer("6-ec2", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     }catch(IllegalArgumentException e){
       hyphen++;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
index 5010365..a246241 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
@@ -130,7 +130,9 @@ public class TestReplicationWithTags {
     utility2 = new HBaseTestingUtility(conf2);
     utility2.setZkCluster(miniZK);
 
-    replicationAdmin.addPeer("2", utility2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    replicationAdmin.addPeer("2", rpc);
 
     LOG.info("Setup second Zk");
     utility1.startMiniCluster(2);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
new file mode 100644
index 0000000..f53aef3
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
@@ -0,0 +1,164 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@Category({ReplicationTests.class, SmallTests.class})
+public class TestTableCFsUpdater extends TableCFsUpdater {
+
+  private static final Log LOG = LogFactory.getLog(TestTableCFsUpdater.class);
+  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private static ZooKeeperWatcher zkw = null;
+  private static Abortable abortable = null;
+
+  public TestTableCFsUpdater() {
+    super(zkw, TEST_UTIL.getConfiguration(), abortable);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    abortable = new Abortable() {
+      @Override
+      public void abort(String why, Throwable e) {
+        LOG.info(why, e);
+      }
+
+      @Override
+      public boolean isAborted() {
+        return false;
+      }
+    };
+    zkw = new ZooKeeperWatcher(conf, "TableCFs", abortable, true);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @Test
+  public void testUpgrade() throws KeeperException, InterruptedException,
+      DeserializationException {
+    String peerId = "1";
+    TableName tab1 = TableName.valueOf("table1");
+    TableName tab2 = TableName.valueOf("table2");
+    TableName tab3 = TableName.valueOf("table3");
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(zkw.getQuorum());
+    String peerNode = getPeerNode(peerId);
+    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationSerDeHelper.toByteArray(rpc));
+
+    String tableCFs = "table1:cf1,cf2;table2:cf3;table3";
+    String tableCFsNode = getTableCFsNode(peerId);
+    LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
+    ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
+
+    ReplicationPeerConfig actualRpc = 
ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+    String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
+
+    assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
+    assertNull(actualRpc.getTableCFsMap());
+    assertEquals(tableCFs, actualTableCfs);
+
+    peerId = "2";
+    rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(zkw.getQuorum());
+    peerNode = getPeerNode(peerId);
+    ZKUtil.createWithParents(zkw, peerNode, 
ReplicationSerDeHelper.toByteArray(rpc));
+
+    tableCFs = "table1:cf1,cf3;table2:cf2";
+    tableCFsNode = getTableCFsNode(peerId);
+    LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
+    ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
+
+    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
+
+    assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
+    assertNull(actualRpc.getTableCFsMap());
+    assertEquals(tableCFs, actualTableCfs);
+
+
+    update();
+
+    peerId = "1";
+    peerNode = getPeerNode(peerId);
+    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
+    Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
+    assertEquals(3, tableNameListMap.size());
+    assertTrue(tableNameListMap.containsKey(tab1));
+    assertTrue(tableNameListMap.containsKey(tab2));
+    assertTrue(tableNameListMap.containsKey(tab3));
+    assertEquals(2, tableNameListMap.get(tab1).size());
+    assertEquals("cf1", tableNameListMap.get(tab1).get(0));
+    assertEquals("cf2", tableNameListMap.get(tab1).get(1));
+    assertEquals(1, tableNameListMap.get(tab2).size());
+    assertEquals("cf3", tableNameListMap.get(tab2).get(0));
+    assertNull(tableNameListMap.get(tab3));
+
+
+    peerId = "2";
+    peerNode = getPeerNode(peerId);
+    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
+    assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
+    tableNameListMap = actualRpc.getTableCFsMap();
+    assertEquals(2, tableNameListMap.size());
+    assertTrue(tableNameListMap.containsKey(tab1));
+    assertTrue(tableNameListMap.containsKey(tab2));
+    assertEquals(2, tableNameListMap.get(tab1).size());
+    assertEquals("cf1", tableNameListMap.get(tab1).get(0));
+    assertEquals("cf3", tableNameListMap.get(tab1).get(1));
+    assertEquals(1, tableNameListMap.get(tab2).size());
+    assertEquals("cf2", tableNameListMap.get(tab2).get(0));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 7614b0f..24c6ef3 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -535,7 +535,7 @@ public class TestReplicationSourceManager {
           FailInitializeDummyReplicationSource.class.getName());
       final ReplicationPeers rp = manager.getReplicationPeers();
       // Set up the znode and ReplicationPeer for the fake peer
-      rp.addPeer("FakePeer", new 
ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"), null);
+      rp.addPeer("FakePeer", new 
ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
       // Wait for the peer to get created and connected
       Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
         @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
index dc4a340..f9ae011 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController.VisibilityReplication;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -131,7 +132,9 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit
     TEST_UTIL1 = new HBaseTestingUtility(conf1);
     TEST_UTIL1.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true);
-    replicationAdmin.addPeer("2", TEST_UTIL1.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(TEST_UTIL1.getClusterKey());
+    replicationAdmin.addPeer("2", rpc);
 
     TEST_UTIL.startMiniCluster(1);
     // Wait for the labels table to become available

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
index 419ad91..79cf0ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController.VisibilityReplication;
 import org.junit.experimental.categories.Category;
@@ -180,7 +181,9 @@ public class TestVisibilityLabelsReplication {
     TEST_UTIL1 = new HBaseTestingUtility(conf1);
     TEST_UTIL1.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true);
-    replicationAdmin.addPeer("2", TEST_UTIL1.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(TEST_UTIL1.getClusterKey());
+    replicationAdmin.addPeer("2", rpc);
 
     TEST_UTIL.startMiniCluster(1);
     // Wait for the labels table to become available

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
index d8087f5..2a3e7f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
@@ -108,6 +108,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl;
 import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
@@ -2260,7 +2261,9 @@ public class TestHBaseFsck {
     ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
     Assert.assertEquals(0, replicationAdmin.getPeersCount());
     String zkPort =  conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-    replicationAdmin.addPeer("1", "127.0.0.1:2181" + zkPort + ":/hbase");
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey("127.0.0.1:2181" + zkPort + ":/hbase");
+    replicationAdmin.addPeer("1", rpc);
     replicationAdmin.getPeersCount();
     Assert.assertEquals(1, replicationAdmin.getPeersCount());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index d0719d8..2e240e1 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -39,11 +39,7 @@ module Hbase
     #----------------------------------------------------------------------------------------------
     # Add a new peer cluster to replicate to
     def add_peer(id, args = {}, peer_tableCFs = nil)
-      # make add_peer backwards compatible to take in string for clusterKey and peer_tableCFs
-      if args.is_a?(String)
-        cluster_key = args
-        @replication_admin.addPeer(id, cluster_key, peer_tableCFs)
-      elsif args.is_a?(Hash)
+      if args.is_a?(Hash)
         unless peer_tableCFs.nil?
           raise(ArgumentError, "peer_tableCFs should be specified as TABLE_CFS in args")
         end
@@ -87,9 +83,18 @@ module Hbase
           }
         end
 
-        @replication_admin.add_peer(id, replication_peer_config, table_cfs)
+        unless table_cfs.nil?
+          # convert table_cfs to TableName
+          map = java.util.HashMap.new
+          table_cfs.each{|key, val|
+            map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
+          }
+          replication_peer_config.set_table_cfs_map(map)
+        end
+
+        @replication_admin.add_peer(id, replication_peer_config)
       else
-        raise(ArgumentError, "args must be either a String or Hash")
+        raise(ArgumentError, "args must be a Hash")
       end
     end
 
@@ -111,7 +116,7 @@ module Hbase
     #----------------------------------------------------------------------------------------------
     # List all peer clusters
     def list_peers
-      @replication_admin.listPeers
+      @replication_admin.listPeerConfigs
     end
 
     
#----------------------------------------------------------------------------------------------
@@ -141,20 +146,42 @@ module Hbase
     #----------------------------------------------------------------------------------------------
     # Set new tableCFs config for the specified peer
     def set_peer_tableCFs(id, tableCFs)
-      @replication_admin.setPeerTableCFs(id, tableCFs)
+      unless tableCFs.nil?
+        # convert tableCFs to TableName
+        map = java.util.HashMap.new
+        tableCFs.each{|key, val|
+          map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
+        }
+      end
+      @replication_admin.setPeerTableCFs(id, map)
     end
 
     
#----------------------------------------------------------------------------------------------
     # Append a tableCFs config for the specified peer
     def append_peer_tableCFs(id, tableCFs)
-      @replication_admin.appendPeerTableCFs(id, tableCFs)
+      unless tableCFs.nil?
+        # convert tableCFs to TableName
+        map = java.util.HashMap.new
+        tableCFs.each{|key, val|
+          map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
+        }
+      end
+      @replication_admin.appendPeerTableCFs(id, map)
     end
 
     
#----------------------------------------------------------------------------------------------
     # Remove some tableCFs from the tableCFs config of the specified peer
     def remove_peer_tableCFs(id, tableCFs)
-      @replication_admin.removePeerTableCFs(id, tableCFs)
+      unless tableCFs.nil?
+        # convert tableCFs to TableName
+        map = java.util.HashMap.new
+        tableCFs.each{|key, val|
+          map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
+        }
+      end
+      @replication_admin.removePeerTableCFs(id, map)
     end
+
     #----------------------------------------------------------------------------------------------
     # Enables a table's replication switch
     def enable_tablerep(table_name)

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
index cf9862a..d209a37 100644
--- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
@@ -33,7 +33,7 @@ Examples:
 
   hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
   hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
-    TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
+    TABLE_CFS => { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
 
 For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments
 are DATA and CONFIG which can be specified to set different either the peer_data or configuration
@@ -48,7 +48,7 @@ the key TABLE_CFS.
   hbase> add_peer '9', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
     DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" },
   hbase> add_peer '10', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
-    TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
+    TABLE_CFS => { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
   hbase> add_peer '11', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
     DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" },
     TABLE_CFS => { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
index 3919b20..24a9976 100644
--- a/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
@@ -26,7 +26,7 @@ Append a replicable table-cf config for the specified peer
 Examples:
 
   # append a table / table-cf to be replicable for a peer
-  hbase> append_peer_tableCFs '2', "table4:cfA,cfB"
+  hbase> append_peer_tableCFs '2',  { "ns1:table4" => ["cfA", "cfB"] }
 
 EOF
       end

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index cc1be04..6444c79 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -33,12 +33,14 @@ EOF
         now = Time.now
         peers = replication_admin.list_peers
 
-        formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE", "TABLE_CFS"])
+        formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
+          "STATE", "TABLE_CFS"])
 
         peers.entrySet().each do |e|
           state = replication_admin.get_peer_state(e.key)
           tableCFs = replication_admin.show_peer_tableCFs(e.key)
-          formatter.row([ e.key, e.value, state, tableCFs ])
+          formatter.row([ e.key, e.value.getClusterKey,
+            e.value.getReplicationEndpointImpl, state, tableCFs ])
         end
 
         formatter.footer(now)

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
index 5b15b52..af64bda 100644
--- a/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
@@ -26,8 +26,8 @@ Remove a table / table-cf from the table-cfs config for the specified peer
 Examples:
 
   # Remove a table / table-cf from the replicable table-cfs for a peer
-  hbase> remove_peer_tableCFs '2', "table1"
-  hbase> remove_peer_tableCFs '2', "table1:cf1"
+  hbase> remove_peer_tableCFs '2', { "ns1:table1" => [] }
+  hbase> remove_peer_tableCFs '2', { "ns1:table1" => ["cf1"] }
 
 EOF
       end

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
index 3a88dbb..5599aee 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
@@ -32,7 +32,9 @@ module Shell
     # set table / table-cf to be replicable for a peer, for a table without
     # an explicit column-family list, all replicable column-families (with
     # replication_scope == 1) will be replicated
-    hbase> set_peer_tableCFs '2', "table1; table2:cf1,cf2; table3:cfA,cfB"
+    hbase> set_peer_tableCFs '2', { "ns1:table1" => [],
+                                    "ns2:table2" => ["cf1", "cf2"],
+                                    "ns3:table3" => ["cfA", "cfB"] }
 
   EOF
       end

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
index 3f4af05..04fbc7a 100644
--- a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
+++ b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
@@ -28,7 +28,7 @@ import org.junit.experimental.categories.Category;
 
 @Category({ ClientTests.class, LargeTests.class })
 public class TestReplicationShell extends AbstractTestShell {
-  @Ignore ("Disabled because hangs on occasion.. about 10% of the time") @Test
+  @Test
   public void testRunShellTests() throws IOException {
     System.setProperty("shell.test.include", "replication_admin_test.rb");
     // Start all ruby tests

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 5b99c37..84bdf56 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -62,108 +62,142 @@ module Hbase
       assert_raise(ArgumentError) do
         replication_admin.add_peer(@peer_id, ['test'])
       end
+      assert_raise(ArgumentError) do
+        replication_admin.add_peer(@peer_id, 'test')
+      end
     end
 
-    define_test "add_peer: single zk cluster key" do
+    define_test "add_peer: single zk cluster key - peer config" do
       cluster_key = "server1.cie.com:2181:/hbase"
 
-      replication_admin.add_peer(@peer_id, cluster_key)
+      args = { CLUSTER_KEY => cluster_key }
+      replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: multiple zk cluster key" do
+    define_test "add_peer: multiple zk cluster key - peer config" do
       cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
 
-      replication_admin.add_peer(@peer_id, cluster_key)
+      args = { CLUSTER_KEY => cluster_key }
+      replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(replication_admin.list_peers.fetch(@peer_id), cluster_key)
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: multiple zk cluster key and table_cfs" do
+    define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do
       cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
-      table_cfs_str = "table1;table2:cf1;table3:cf2,cf3"
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
 
-      replication_admin.add_peer(@peer_id, cluster_key, table_cfs_str)
+      args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs }
+      replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
-      assert_equal(table_cfs_str, replication_admin.show_peer_tableCFs(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
+
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: single zk cluster key - peer config" do
-      cluster_key = "server1.cie.com:2181:/hbase"
+    def assert_tablecfs_equal(table_cfs, table_cfs_map)
+      assert_equal(table_cfs.length, table_cfs_map.length)
+      table_cfs_map.each{|key, value|
+        assert(table_cfs.has_key?(key.getNameAsString))
+        if table_cfs.fetch(key.getNameAsString).length == 0
+          assert_equal(nil, value)
+        else
+          assert_equal(table_cfs.fetch(key.getNameAsString).length, value.length)
+          value.each{|v|
+            assert(table_cfs.fetch(key.getNameAsString).include?(v))
+          }
+        end
+      }
+    end
 
-      args = { CLUSTER_KEY => cluster_key }
+    define_test "add_peer: should fail when args is a hash and peer_tableCFs provided" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      table_cfs_str = "table1;table2:cf1;table3:cf1,cf2"
+
+      assert_raise(ArgumentError) do
+        args = { CLUSTER_KEY => cluster_key }
+        replication_admin.add_peer(@peer_id, args, table_cfs_str)
+      end
+    end
+
+    define_test "set_peer_tableCFs: works with table-cfs map" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      args = { CLUSTER_KEY => cluster_key}
       replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
+
+      table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
+      replication_admin.set_peer_tableCFs(@peer_id, table_cfs)
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: multiple zk cluster key - peer config" do
-      cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
-
+    define_test "append_peer_tableCFs: works with table-cfs map" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }
       replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
+
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
+      replication_admin.append_peer_tableCFs(@peer_id, table_cfs)
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
+
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
+      replication_admin.append_peer_tableCFs(@peer_id, { "ns3:table3" => ["cf1", "cf2"] })
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do
+    define_test "remove_peer_tableCFs: works with table-cfs map" do
       cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
-      table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
-      #table_cfs_str = "default.table1;default.table3:cf1,cf2;default.table2:cf1"
-
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
       args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs }
       replication_admin.add_peer(@peer_id, args)
 
-      assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(1, replication_admin.list_peers.length)
+      assert(replication_admin.list_peers.key?(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
 
-      # Note: below assertion is dependent on the sort order of an unordered
-      # map and hence flaky depending on JVM
-      # Commenting out until HBASE-16274 is worked.
-      # assert_equal(table_cfs_str, command(:show_peer_tableCFs, @peer_id))
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
+      replication_admin.remove_peer_tableCFs(@peer_id, { "ns3:table3" => ["cf1", "cf2"] })
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: should fail when args is a hash and peer_tableCFs provided" do
-      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
-      table_cfs_str = "table1;table2:cf1;table3:cf1,cf2"
-
-      assert_raise(ArgumentError) do
-        args = { CLUSTER_KEY => cluster_key }
-        replication_admin.add_peer(@peer_id, args, table_cfs_str)
-      end
-    end
-
     define_test "get_peer_config: works with simple clusterKey peer" do
       cluster_key = "localhost:2181:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }
@@ -180,8 +214,8 @@ module Hbase
       config_params = { "config1" => "value1", "config2" => "value2" }
       args = { CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => repl_impl,
                CONFIG => config_params }
-      command(:add_peer, @peer_id, args)
-      peer_config = command(:get_peer_config, @peer_id)
+      replication_admin.add_peer(@peer_id, args)
+      peer_config = replication_admin.get_peer_config(@peer_id)
       assert_equal(cluster_key, peer_config.get_cluster_key)
       assert_equal(repl_impl, peer_config.get_replication_endpoint_impl)
       assert_equal(2, peer_config.get_configuration.size)

Reply via email to