Repository: hbase Updated Branches: refs/heads/branch-1 6df7554d2 -> 66941910b
http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java index 2c9fc0f..dd15e4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java @@ -22,10 +22,14 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,6 +41,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -183,13 +188,13 @@ public class TestPerTableCFReplication { Map<TableName, List<String>> tabCFsMap = null; // 1. null or empty string, result should be null - tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(null); + tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(null); assertEquals(null, tabCFsMap); - tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(""); + tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(""); assertEquals(null, tabCFsMap); - tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(" "); + tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(" "); assertEquals(null, tabCFsMap); TableName tab1 = TableName.valueOf("tab1"); @@ -197,20 +202,20 @@ public class TestPerTableCFReplication { TableName tab3 = TableName.valueOf("tab3"); // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3" - tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1"); + tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab1"); assertEquals(1, tabCFsMap.size()); // only one table assertTrue(tabCFsMap.containsKey(tab1)); // its table name is "tab1" assertFalse(tabCFsMap.containsKey(tab2)); // not other table assertEquals(null, tabCFsMap.get(tab1)); // null cf-list, - tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab2:cf1"); + tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab2:cf1"); assertEquals(1, tabCFsMap.size()); // only one table assertTrue(tabCFsMap.containsKey(tab2)); // its table name is "tab2" assertFalse(tabCFsMap.containsKey(tab1)); // not other table assertEquals(1, tabCFsMap.get(tab2).size()); // cf-list contains only 1 cf assertEquals("cf1", tabCFsMap.get(tab2).get(0));// the only cf is "cf1" - tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab3 : cf1 , cf3"); + tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab3 : cf1 , cf3"); assertEquals(1, tabCFsMap.size()); // only one table assertTrue(tabCFsMap.containsKey(tab3)); // its table name is "tab2" assertFalse(tabCFsMap.containsKey(tab1)); // not other table @@ -219,7 +224,7 @@ public class TestPerTableCFReplication { assertTrue(tabCFsMap.get(tab3).contains("cf3"));// contains "cf3" // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3" - tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3"); + tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3"); // 3.1 contains 3 tables : "tab1", "tab2" and "tab3" assertEquals(3, tabCFsMap.size()); assertTrue(tabCFsMap.containsKey(tab1)); @@ -237,7 +242,7 @@ public class TestPerTableCFReplication { // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3" - tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig( + tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig( "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;"); // 4.1 contains 3 tables : "tab1", "tab2" and "tab3" assertEquals(3, tabCFsMap.size()); @@ -256,7 +261,7 @@ public class TestPerTableCFReplication { // 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3" // "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally - tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig( + tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig( "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"); // 5.1 no "tab1" and "tab2", only "tab3" assertEquals(1, tabCFsMap.size()); // only one table @@ -267,7 +272,100 @@ public class TestPerTableCFReplication { assertEquals(2, tabCFsMap.get(tab3).size()); assertTrue(tabCFsMap.get(tab3).contains("cf1")); assertTrue(tabCFsMap.get(tab3).contains("cf3")); - } + } + + @Test + public void testTableCFsHelperConverter() { + + ZooKeeperProtos.TableCF[] tableCFs = null; + Map<TableName, List<String>> tabCFsMap = null; + + // 1. null or empty string, result should be null + assertNull(ReplicationSerDeHelper.convert(tabCFsMap)); + + tabCFsMap = new HashMap<TableName, List<String>>(); + tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + assertEquals(0, tableCFs.length); + + TableName tab1 = TableName.valueOf("tab1"); + TableName tab2 = TableName.valueOf("tab2"); + TableName tab3 = TableName.valueOf("tab3"); + + // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3" + tabCFsMap.clear(); + tabCFsMap.put(tab1, null); + tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + assertEquals(1, tableCFs.length); // only one table + assertEquals(tab1.toString(), + tableCFs[0].getTableName().getQualifier().toStringUtf8()); + assertEquals(0, tableCFs[0].getFamiliesCount()); + + tabCFsMap.clear(); + tabCFsMap.put(tab2, new ArrayList<String>()); + tabCFsMap.get(tab2).add("cf1"); + tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + assertEquals(1, tableCFs.length); // only one table + assertEquals(tab2.toString(), + tableCFs[0].getTableName().getQualifier().toStringUtf8()); + assertEquals(1, tableCFs[0].getFamiliesCount()); + assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8()); + + tabCFsMap.clear(); + tabCFsMap.put(tab3, new ArrayList<String>()); + tabCFsMap.get(tab3).add("cf1"); + tabCFsMap.get(tab3).add("cf3"); + tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + assertEquals(1, tableCFs.length); + assertEquals(tab3.toString(), + tableCFs[0].getTableName().getQualifier().toStringUtf8()); + assertEquals(2, tableCFs[0].getFamiliesCount()); + assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8()); + assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8()); + + tabCFsMap.clear(); + tabCFsMap.put(tab1, null); + tabCFsMap.put(tab2, new ArrayList<String>()); + tabCFsMap.get(tab2).add("cf1"); + tabCFsMap.put(tab3, new ArrayList<String>()); + tabCFsMap.get(tab3).add("cf1"); + tabCFsMap.get(tab3).add("cf3"); + + tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + assertEquals(3, tableCFs.length); + assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tab1.toString())); + assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tab2.toString())); + assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tab3.toString())); + + assertEquals(0, + ReplicationSerDeHelper.getTableCF(tableCFs, tab1.toString()).getFamiliesCount()); + + assertEquals(1, + ReplicationSerDeHelper.getTableCF(tableCFs, tab2.toString()).getFamiliesCount()); + assertEquals("cf1", + ReplicationSerDeHelper.getTableCF(tableCFs, tab2.toString()).getFamilies(0).toStringUtf8()); + + assertEquals(2, + ReplicationSerDeHelper.getTableCF(tableCFs, tab3.toString()).getFamiliesCount()); + assertEquals("cf1", + ReplicationSerDeHelper.getTableCF(tableCFs, tab3.toString()).getFamilies(0).toStringUtf8()); + assertEquals("cf3", + ReplicationSerDeHelper.getTableCF(tableCFs, tab3.toString()).getFamilies(1).toStringUtf8()); + + tabCFsMap = ReplicationSerDeHelper.convert2Map(tableCFs); + assertEquals(3, tabCFsMap.size()); + assertTrue(tabCFsMap.containsKey(tab1)); + assertTrue(tabCFsMap.containsKey(tab2)); + assertTrue(tabCFsMap.containsKey(tab3)); + // 3.2 table "tab1" : null cf-list + assertEquals(null, tabCFsMap.get(tab1)); + // 3.3 table "tab2" : cf-list contains a single cf "cf1" + assertEquals(1, tabCFsMap.get(tab2).size()); + assertEquals("cf1", tabCFsMap.get(tab2).get(0)); + // 3.4 table "tab3" : cf-list contains "cf1" and "cf3" + assertEquals(2, tabCFsMap.get(tab3).size()); + assertTrue(tabCFsMap.get(tab3).contains("cf1")); + assertTrue(tabCFsMap.get(tab3).contains("cf3")); + } @Test(timeout=300000) public void testPerTableCFReplication() throws Exception { @@ -304,8 +402,23 @@ public class TestPerTableCFReplication { Table htab3C = connection3.getTable(tabCName); // A. add cluster2/cluster3 as peers to cluster1 - replicationAdmin.addPeer("2", utility2.getClusterKey(), "TC;TB:f1,f3"); - replicationAdmin.addPeer("3", utility3.getClusterKey(), "TA;TB:f1,f2"); + ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); + rpc2.setClusterKey(utility2.getClusterKey()); + Map<TableName, List<String>> tableCFs = new HashMap<>(); + tableCFs.put(tabCName, null); + tableCFs.put(tabBName, new ArrayList<String>()); + tableCFs.get(tabBName).add("f1"); + tableCFs.get(tabBName).add("f3"); + replicationAdmin.addPeer("2", rpc2, tableCFs); + + ReplicationPeerConfig rpc3 = new ReplicationPeerConfig(); + rpc3.setClusterKey(utility3.getClusterKey()); + tableCFs.clear(); + tableCFs.put(tabAName, null); + tableCFs.put(tabBName, new ArrayList<String>()); + tableCFs.get(tabBName).add("f1"); + tableCFs.get(tabBName).add("f2"); + replicationAdmin.addPeer("3", rpc3, tableCFs); // A1. tableA can only replicated to cluster3 putAndWaitWithFamily(row1, f1Name, htab1A, htab3A); @@ -348,8 +461,20 @@ public class TestPerTableCFReplication { deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C); // B. change peers' replicable table-cf config - replicationAdmin.setPeerTableCFs("2", "TA:f1,f2; TC:f2,f3"); - replicationAdmin.setPeerTableCFs("3", "TB; TC:f3"); + tableCFs.clear(); + tableCFs.put(tabAName, new ArrayList<String>()); + tableCFs.get(tabAName).add("f1"); + tableCFs.get(tabAName).add("f2"); + tableCFs.put(tabCName, new ArrayList<String>()); + tableCFs.get(tabCName).add("f2"); + tableCFs.get(tabCName).add("f3"); + replicationAdmin.setPeerTableCFs("2", tableCFs); + + tableCFs.clear(); + tableCFs.put(tabBName, null); + tableCFs.put(tabCName, new ArrayList<String>()); + tableCFs.get(tabCName).add("f3"); + replicationAdmin.setPeerTableCFs("3", tableCFs); // B1. cf 'f1' of tableA can only replicated to cluster2 putAndWaitWithFamily(row2, f1Name, htab1A, htab2A); http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index e52a600..5283433 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -124,7 +124,9 @@ public class TestReplicationBase { utility2.setZkCluster(miniZK); zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true); - admin.addPeer("2", utility2.getClusterKey()); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + admin.addPeer("2", rpc); LOG.info("Setup second Zk"); CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index c293444..ba634dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -369,7 +369,9 @@ public class TestReplicationSmallTests extends TestReplicationBase { } } - admin.addPeer("2", utility2.getClusterKey()); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + admin.addPeer("2", rpc); Thread.sleep(SLEEP_TIME); rowKey = Bytes.toBytes("do rep"); put = new Put(rowKey); http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 7f7ee98..001f147 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -170,7 +170,7 @@ public abstract class TestReplicationStateBasic { try { rp.addPeer(ID_ONE, - new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"), null); + new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase")); fail("Should throw an IllegalArgumentException because " + "zookeeper.znode.parent is missing leading '/'."); } catch (IllegalArgumentException e) { @@ -179,7 +179,7 @@ public abstract class TestReplicationStateBasic { try { rp.addPeer(ID_ONE, - new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"), null); + new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/")); fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing."); } catch (IllegalArgumentException e) { // Expected. @@ -187,7 +187,7 @@ public abstract class TestReplicationStateBasic { try { rp.addPeer(ID_ONE, - new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"), null); + new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase")); fail("Should throw an IllegalArgumentException because " + "hbase.zookeeper.property.clientPort is missing."); } catch (IllegalArgumentException e) { @@ -207,7 +207,7 @@ public abstract class TestReplicationStateBasic { files1.add("file_3"); assertNull(rqc.getReplicableHFiles(ID_ONE)); assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); - rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); + rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); rq1.addPeerToHFileRefs(ID_ONE); rq1.addHFileRefs(ID_ONE, files1); assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); @@ -229,8 +229,8 @@ public abstract class TestReplicationStateBasic { rqc.init(); rp.init(); - rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); - rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null); + rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); List<String> files1 = new ArrayList<String>(3); files1.add("file_1"); @@ -288,9 +288,9 @@ public abstract class TestReplicationStateBasic { assertNumberOfPeers(0); // Add some peers - rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); + rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); assertNumberOfPeers(1); - rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null); + rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); assertNumberOfPeers(2); // Test methods with a peer that is added but not connected @@ -305,7 +305,7 @@ public abstract class TestReplicationStateBasic { assertNumberOfPeers(1); // Add one peer - rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); + rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); rp.peerAdded(ID_ONE); assertNumberOfPeers(2); assertTrue(rp.getStatusOfPeer(ID_ONE)); @@ -365,7 +365,7 @@ public abstract class TestReplicationStateBasic { rq3.addLog("qId" + i, "filename" + j); } //Add peers for the corresponding queues so they are not orphans - rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i), null); + rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java index fd02d1a..a949e92 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java @@ -203,7 +203,9 @@ public class TestReplicationSyncUpTool extends TestReplicationBase { /** * set M-S : Master: utility1 Slave1: utility2 */ - admin1.addPeer("1", utility2.getClusterKey()); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + admin1.addPeer("1", rpc); admin1.close(); admin2.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java index a5df432..7b2e1fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java @@ -146,7 +146,7 @@ public class TestReplicationTrackerZKImpl { @Test(timeout = 30000) public void testPeerRemovedEvent() throws Exception { - rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null); + rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); rt.registerListener(new DummyReplicationListener()); rp.removePeer("5"); // wait for event @@ -159,7 +159,7 @@ public class TestReplicationTrackerZKImpl { @Test(timeout = 30000) public void testPeerListChangedEvent() throws Exception { // add a peer - rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null); + rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true); rt.registerListener(new DummyReplicationListener()); rp.disablePeer("5"); @@ -183,16 +183,16 @@ public class TestReplicationTrackerZKImpl { public void testPeerNameControl() throws Exception { int exists = 0; int hyphen = 0; - rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null); + rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); try{ - rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null); + rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); }catch(IllegalArgumentException e){ exists++; } try{ - rp.addPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null); + rp.addPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); }catch(IllegalArgumentException e){ hyphen++; } http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java index 5010365..a246241 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java @@ -130,7 +130,9 @@ public class TestReplicationWithTags { utility2 = new HBaseTestingUtility(conf2); utility2.setZkCluster(miniZK); - replicationAdmin.addPeer("2", utility2.getClusterKey()); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + replicationAdmin.addPeer("2", rpc); LOG.info("Setup second Zk"); utility1.startMiniCluster(2); http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java new file mode 100644 index 0000000..f53aef3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java @@ -0,0 +1,164 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.master; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@Category({ReplicationTests.class, SmallTests.class}) +public class TestTableCFsUpdater extends TableCFsUpdater { + + private static final Log LOG = LogFactory.getLog(TestTableCFsUpdater.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static ZooKeeperWatcher zkw = null; + private static Abortable abortable = null; + + public TestTableCFsUpdater() { + super(zkw, TEST_UTIL.getConfiguration(), abortable); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + Configuration conf = TEST_UTIL.getConfiguration(); + abortable = new Abortable() { + @Override + public void abort(String why, Throwable e) { + LOG.info(why, e); + } + + @Override + public boolean isAborted() { + return false; + } + }; + zkw = new ZooKeeperWatcher(conf, "TableCFs", abortable, true); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @Test + public void testUpgrade() throws KeeperException, InterruptedException, + DeserializationException { + String peerId = "1"; + TableName tab1 = TableName.valueOf("table1"); + TableName tab2 = TableName.valueOf("table2"); + TableName tab3 = TableName.valueOf("table3"); + + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(zkw.getQuorum()); + String peerNode = getPeerNode(peerId); + ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc)); + + String tableCFs = "table1:cf1,cf2;table2:cf3;table3"; + String tableCFsNode = getTableCFsNode(peerId); + LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); + ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); + + ReplicationPeerConfig actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); + + assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); + assertNull(actualRpc.getTableCFsMap()); + assertEquals(tableCFs, actualTableCfs); + + peerId = "2"; + rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(zkw.getQuorum()); + peerNode = getPeerNode(peerId); + ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc)); + + tableCFs = "table1:cf1,cf3;table2:cf2"; + tableCFsNode = getTableCFsNode(peerId); + LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); + ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); + + actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); + + assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); + assertNull(actualRpc.getTableCFsMap()); + assertEquals(tableCFs, actualTableCfs); + + + update(); + + peerId = "1"; + peerNode = getPeerNode(peerId); + actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); + Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap(); + assertEquals(3, tableNameListMap.size()); + assertTrue(tableNameListMap.containsKey(tab1)); + assertTrue(tableNameListMap.containsKey(tab2)); + assertTrue(tableNameListMap.containsKey(tab3)); + assertEquals(2, tableNameListMap.get(tab1).size()); + assertEquals("cf1", tableNameListMap.get(tab1).get(0)); + assertEquals("cf2", tableNameListMap.get(tab1).get(1)); + assertEquals(1, tableNameListMap.get(tab2).size()); + assertEquals("cf3", tableNameListMap.get(tab2).get(0)); + assertNull(tableNameListMap.get(tab3)); + + + peerId = "2"; + peerNode = getPeerNode(peerId); + actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); + tableNameListMap = actualRpc.getTableCFsMap(); + assertEquals(2, tableNameListMap.size()); + assertTrue(tableNameListMap.containsKey(tab1)); + assertTrue(tableNameListMap.containsKey(tab2)); + assertEquals(2, tableNameListMap.get(tab1).size()); + assertEquals("cf1", tableNameListMap.get(tab1).get(0)); + assertEquals("cf3", tableNameListMap.get(tab1).get(1)); + assertEquals(1, tableNameListMap.get(tab2).size()); + assertEquals("cf2", tableNameListMap.get(tab2).get(0)); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 7614b0f..24c6ef3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -535,7 +535,7 @@ public class TestReplicationSourceManager { FailInitializeDummyReplicationSource.class.getName()); final ReplicationPeers rp = manager.getReplicationPeers(); // Set up the znode and ReplicationPeer for the fake peer - rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"), null); + rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase")); // Wait for the peer to get created and connected Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java index dc4a340..f9ae011 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.visibility.VisibilityController.VisibilityReplication; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -131,7 +132,9 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit TEST_UTIL1 = new HBaseTestingUtility(conf1); TEST_UTIL1.setZkCluster(miniZK); zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true); - replicationAdmin.addPeer("2", TEST_UTIL1.getClusterKey()); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(TEST_UTIL1.getClusterKey()); + replicationAdmin.addPeer("2", rpc); TEST_UTIL.startMiniCluster(1); // Wait for the labels table to become available http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index 419ad91..79cf0ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.visibility.VisibilityController.VisibilityReplication; import org.junit.experimental.categories.Category; @@ -180,7 +181,9 @@ public class TestVisibilityLabelsReplication { TEST_UTIL1 = new HBaseTestingUtility(conf1); TEST_UTIL1.setZkCluster(miniZK); zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true); - replicationAdmin.addPeer("2", TEST_UTIL1.getClusterKey()); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(TEST_UTIL1.getClusterKey()); + replicationAdmin.addPeer("2", rpc); TEST_UTIL.startMiniCluster(1); // Wait for the labels table to become available http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java index d8087f5..2a3e7f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java @@ -108,6 +108,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl; import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; @@ -2260,7 +2261,9 @@ public class TestHBaseFsck { ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf); Assert.assertEquals(0, replicationAdmin.getPeersCount()); String zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); - replicationAdmin.addPeer("1", "127.0.0.1:2181" + zkPort + ":/hbase"); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey("127.0.0.1:2181" + zkPort + ":/hbase"); + replicationAdmin.addPeer("1", rpc); replicationAdmin.getPeersCount(); Assert.assertEquals(1, replicationAdmin.getPeersCount()); http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/hbase/replication_admin.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index d0719d8..2e240e1 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -39,11 +39,7 @@ module Hbase #---------------------------------------------------------------------------------------------- # Add a new peer cluster to replicate to def add_peer(id, args = {}, peer_tableCFs = nil) - # make add_peer backwards compatible to take in string for clusterKey and peer_tableCFs - if args.is_a?(String) - cluster_key = args - @replication_admin.addPeer(id, cluster_key, peer_tableCFs) - elsif args.is_a?(Hash) + if args.is_a?(Hash) unless peer_tableCFs.nil? raise(ArgumentError, "peer_tableCFs should be specified as TABLE_CFS in args") end @@ -87,9 +83,18 @@ module Hbase } end - @replication_admin.add_peer(id, replication_peer_config, table_cfs) + unless table_cfs.nil? + # convert table_cfs to TableName + map = java.util.HashMap.new + table_cfs.each{|key, val| + map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) + } + replication_peer_config.set_table_cfs_map(map) + end + + @replication_admin.add_peer(id, replication_peer_config) else - raise(ArgumentError, "args must be either a String or Hash") + raise(ArgumentError, "args must be a Hash") end end @@ -111,7 +116,7 @@ module Hbase #---------------------------------------------------------------------------------------------- # List all peer clusters def list_peers - @replication_admin.listPeers + @replication_admin.listPeerConfigs end #---------------------------------------------------------------------------------------------- @@ -141,20 +146,42 @@ module Hbase #---------------------------------------------------------------------------------------------- # Set new tableCFs config for the specified peer def set_peer_tableCFs(id, tableCFs) - @replication_admin.setPeerTableCFs(id, tableCFs) + unless tableCFs.nil? + # convert tableCFs to TableName + map = java.util.HashMap.new + tableCFs.each{|key, val| + map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) + } + end + @replication_admin.setPeerTableCFs(id, map) end #---------------------------------------------------------------------------------------------- # Append a tableCFs config for the specified peer def append_peer_tableCFs(id, tableCFs) - @replication_admin.appendPeerTableCFs(id, tableCFs) + unless tableCFs.nil? + # convert tableCFs to TableName + map = java.util.HashMap.new + tableCFs.each{|key, val| + map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) + } + end + @replication_admin.appendPeerTableCFs(id, map) end #---------------------------------------------------------------------------------------------- # Remove some tableCFs from the tableCFs config of the specified peer def remove_peer_tableCFs(id, tableCFs) - @replication_admin.removePeerTableCFs(id, tableCFs) + unless tableCFs.nil? + # convert tableCFs to TableName + map = java.util.HashMap.new + tableCFs.each{|key, val| + map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) + } + end + @replication_admin.removePeerTableCFs(id, map) end + #---------------------------------------------------------------------------------------------- # Enables a table's replication switch def enable_tablerep(table_name) http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/add_peer.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index cf9862a..d209a37 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -33,7 +33,7 @@ Examples: hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase" hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", - TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + TABLE_CFS => { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments are DATA and CONFIG which can be specified to set different either the peer_data or configuration @@ -48,7 +48,7 @@ the key TABLE_CFS. hbase> add_peer '9', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" }, hbase> add_peer '10', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', - TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + TABLE_CFS => { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } hbase> add_peer '11', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" }, TABLE_CFS => { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb index 3919b20..24a9976 100644 --- a/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb +++ b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb @@ -26,7 +26,7 @@ Append a replicable table-cf config for the specified peer Examples: # append a table / table-cf to be replicable for a peer - hbase> append_peer_tableCFs '2', "table4:cfA,cfB" + hbase> append_peer_tableCFs '2', { "ns1:table4" => ["cfA", "cfB"] } EOF end http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/list_peers.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index cc1be04..6444c79 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -33,12 +33,14 @@ EOF now = Time.now peers = replication_admin.list_peers - formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE", "TABLE_CFS"]) + formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME", + "STATE", "TABLE_CFS"]) peers.entrySet().each do |e| state = replication_admin.get_peer_state(e.key) tableCFs = replication_admin.show_peer_tableCFs(e.key) - formatter.row([ e.key, e.value, state, tableCFs ]) + formatter.row([ e.key, e.value.getClusterKey, + e.value.getReplicationEndpointImpl, state, tableCFs ]) end formatter.footer(now) http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb index 5b15b52..af64bda 100644 --- a/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb +++ b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb @@ -26,8 +26,8 @@ Remove a table / table-cf from the table-cfs config for the specified peer Examples: # Remove a table / table-cf from the replicable table-cfs for a peer - hbase> remove_peer_tableCFs '2', "table1" - hbase> remove_peer_tableCFs '2', "table1:cf1" + hbase> remove_peer_tableCFs '2', { "ns1:table1" => [] } + hbase> remove_peer_tableCFs '2', { "ns1:table1" => ["cf1"] } EOF end http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb index 3a88dbb..5599aee 100644 --- a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb @@ -32,7 +32,9 @@ module Shell # set table / table-cf to be replicable for a peer, for a table without # an explicit column-family list, all replicable column-families (with # replication_scope == 1) will be replicated - hbase> set_peer_tableCFs '2', "table1; table2:cf1,cf2; table3:cfA,cfB" + hbase> set_peer_tableCFs '2', { "ns1:table1" => [], + "ns2:table2" => ["cf1", "cf2"], + "ns3:table3" => ["cfA", "cfB"] } EOF end http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java ---------------------------------------------------------------------- diff --git a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java index 3f4af05..04fbc7a 100644 --- a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java +++ b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java @@ -28,7 +28,7 @@ import org.junit.experimental.categories.Category; @Category({ ClientTests.class, LargeTests.class }) public class TestReplicationShell extends AbstractTestShell { - @Ignore ("Disabled because hangs on occasion.. about 10% of the time") @Test + @Test public void testRunShellTests() throws IOException { System.setProperty("shell.test.include", "replication_admin_test.rb"); // Start all ruby tests http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 5b99c37..84bdf56 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -62,108 +62,142 @@ module Hbase assert_raise(ArgumentError) do replication_admin.add_peer(@peer_id, ['test']) end + assert_raise(ArgumentError) do + replication_admin.add_peer(@peer_id, 'test') + end end - define_test "add_peer: single zk cluster key" do + define_test "add_peer: single zk cluster key - peer config" do cluster_key = "server1.cie.com:2181:/hbase" - replication_admin.add_peer(@peer_id, cluster_key) + args = { CLUSTER_KEY => cluster_key } + replication_admin.add_peer(@peer_id, args) assert_equal(1, replication_admin.list_peers.length) assert(replication_admin.list_peers.key?(@peer_id)) - assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key) # cleanup for future tests replication_admin.remove_peer(@peer_id) end - define_test "add_peer: multiple zk cluster key" do + define_test "add_peer: multiple zk cluster key - peer config" do cluster_key = "zk1,zk2,zk3:2182:/hbase-prod" - replication_admin.add_peer(@peer_id, cluster_key) + args = { CLUSTER_KEY => cluster_key } + replication_admin.add_peer(@peer_id, args) assert_equal(1, replication_admin.list_peers.length) assert(replication_admin.list_peers.key?(@peer_id)) - assert_equal(replication_admin.list_peers.fetch(@peer_id), cluster_key) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key) # cleanup for future tests replication_admin.remove_peer(@peer_id) end - define_test "add_peer: multiple zk cluster key and table_cfs" do + define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" - table_cfs_str = "table1;table2:cf1;table3:cf2,cf3" + table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } - replication_admin.add_peer(@peer_id, cluster_key, table_cfs_str) + args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs } + replication_admin.add_peer(@peer_id, args) assert_equal(1, replication_admin.list_peers.length) assert(replication_admin.list_peers.key?(@peer_id)) - assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id)) - assert_equal(table_cfs_str, replication_admin.show_peer_tableCFs(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key) + + table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap() + assert_tablecfs_equal(table_cfs, table_cfs_map) # cleanup for future tests replication_admin.remove_peer(@peer_id) end - define_test "add_peer: single zk cluster key - peer config" do - cluster_key = "server1.cie.com:2181:/hbase" + def assert_tablecfs_equal(table_cfs, table_cfs_map) + assert_equal(table_cfs.length, table_cfs_map.length) + table_cfs_map.each{|key, value| + assert(table_cfs.has_key?(key.getNameAsString)) + if table_cfs.fetch(key.getNameAsString).length == 0 + assert_equal(nil, value) + else + assert_equal(table_cfs.fetch(key.getNameAsString).length, value.length) + value.each{|v| + assert(table_cfs.fetch(key.getNameAsString).include?(v)) + } + end + } + end - args = { CLUSTER_KEY => cluster_key } + define_test "add_peer: should fail when args is a hash and peer_tableCFs provided" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + table_cfs_str = "table1;table2:cf1;table3:cf1,cf2" + + assert_raise(ArgumentError) do + args = { CLUSTER_KEY => cluster_key } + replication_admin.add_peer(@peer_id, args, table_cfs_str) + end + end + + define_test "set_peer_tableCFs: works with table-cfs map" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + args = { CLUSTER_KEY => cluster_key} replication_admin.add_peer(@peer_id, args) assert_equal(1, replication_admin.list_peers.length) assert(replication_admin.list_peers.key?(@peer_id)) - assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key) + + table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } + replication_admin.set_peer_tableCFs(@peer_id, table_cfs) + table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap() + assert_tablecfs_equal(table_cfs, table_cfs_map) # cleanup for future tests replication_admin.remove_peer(@peer_id) end - define_test "add_peer: multiple zk cluster key - peer config" do - cluster_key = "zk1,zk2,zk3:2182:/hbase-prod" - + define_test "append_peer_tableCFs: works with table-cfs map" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" args = { CLUSTER_KEY => cluster_key } replication_admin.add_peer(@peer_id, args) assert_equal(1, replication_admin.list_peers.length) assert(replication_admin.list_peers.key?(@peer_id)) - assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key) + + table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] } + replication_admin.append_peer_tableCFs(@peer_id, table_cfs) + table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap() + assert_tablecfs_equal(table_cfs, table_cfs_map) + + table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } + replication_admin.append_peer_tableCFs(@peer_id, { "ns3:table3" => ["cf1", "cf2"] }) + table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap() + assert_tablecfs_equal(table_cfs, table_cfs_map) # cleanup for future tests replication_admin.remove_peer(@peer_id) end - define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do + define_test "remove_peer_tableCFs: works with table-cfs map" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" - table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } - #table_cfs_str = "default.table1;default.table3:cf1,cf2;default.table2:cf1" - + table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs } replication_admin.add_peer(@peer_id, args) - assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key) + assert_equal(1, replication_admin.list_peers.length) + assert(replication_admin.list_peers.key?(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key) - # Note: below assertion is dependent on the sort order of an unordered - # map and hence flaky depending on JVM - # Commenting out until HBASE-16274 is worked. - # assert_equal(table_cfs_str, command(:show_peer_tableCFs, @peer_id)) + table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] } + replication_admin.remove_peer_tableCFs(@peer_id, { "ns3:table3" => ["cf1", "cf2"] }) + table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap() + assert_tablecfs_equal(table_cfs, table_cfs_map) # cleanup for future tests replication_admin.remove_peer(@peer_id) end - define_test "add_peer: should fail when args is a hash and peer_tableCFs provided" do - cluster_key = "zk4,zk5,zk6:11000:/hbase-test" - table_cfs_str = "table1;table2:cf1;table3:cf1,cf2" - - assert_raise(ArgumentError) do - args = { CLUSTER_KEY => cluster_key } - replication_admin.add_peer(@peer_id, args, table_cfs_str) - end - end - define_test "get_peer_config: works with simple clusterKey peer" do cluster_key = "localhost:2181:/hbase-test" args = { CLUSTER_KEY => cluster_key } @@ -180,8 +214,8 @@ module Hbase config_params = { "config1" => "value1", "config2" => "value2" } args = { CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params } - command(:add_peer, @peer_id, args) - peer_config = command(:get_peer_config, @peer_id) + replication_admin.add_peer(@peer_id, args) + peer_config = replication_admin.get_peer_config(@peer_id) assert_equal(cluster_key, peer_config.get_cluster_key) assert_equal(repl_impl, peer_config.get_replication_endpoint_impl) assert_equal(2, peer_config.get_configuration.size)