HBASE-12769 Replication fails to delete all corresponding ZooKeeper nodes when a peer is removed (Jianwei Cui)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/210c3dd9 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/210c3dd9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/210c3dd9 Branch: refs/heads/hbase-12439 Commit: 210c3dd93748b5de65301f2cca2342f36e169b78 Parents: e24d03b Author: tedyu <yuzhih...@gmail.com> Authored: Tue Oct 27 19:40:40 2015 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Tue Oct 27 19:40:40 2015 -0700 ---------------------------------------------------------------------- .../client/replication/ReplicationAdmin.java | 7 +- .../hbase/replication/ReplicationFactory.java | 7 +- .../replication/ReplicationPeersZKImpl.java | 25 +++- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 39 +++++- .../hbase/util/hbck/ReplicationChecker.java | 134 +++++++++++++++++++ .../replication/TestReplicationAdmin.java | 34 +++++ .../hadoop/hbase/util/BaseTestHBaseFsck.java | 3 +- .../hadoop/hbase/util/TestHBaseFsckOneRS.java | 59 +++++++- .../hadoop/hbase/util/hbck/HbckTestingUtil.java | 11 +- 9 files changed, 303 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index b33e64d..8bd1267 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -125,11 +125,12 @@ public class ReplicationAdmin implements Closeable { try { zkw = createZooKeeperWatcher(); try { - 
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection); - this.replicationPeers.init(); this.replicationQueuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection); this.replicationQueuesClient.init(); + this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, + this.replicationQueuesClient, this.connection); + this.replicationPeers.init(); } catch (Exception exception) { if (zkw != null) { zkw.close(); @@ -187,7 +188,7 @@ public class ReplicationAdmin implements Closeable { this.replicationPeers.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs); } - + /** * Add a new remote slave cluster for replication. * @param id a short name that identifies the cluster http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index f115a39..91e77ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -42,7 +42,12 @@ public class ReplicationFactory { public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { - return new ReplicationPeersZKImpl(zk, conf, abortable); + return getReplicationPeers(zk, conf, null, abortable); + } + + public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, + final ReplicationQueuesClient queuesClient, Abortable abortable) { + return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable); } public static ReplicationTracker 
getReplicationTracker(ZooKeeperWatcher zookeeper, http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index a223531..1884469 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -81,14 +81,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re // Map of peer clusters keyed by their id private Map<String, ReplicationPeerZKImpl> peerClusters; private final String tableCFsNodeName; + private final ReplicationQueuesClient queuesClient; private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class); public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf, - Abortable abortable) { + final ReplicationQueuesClient queuesClient, Abortable abortable) { super(zk, conf, abortable); this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>(); + this.queuesClient = queuesClient; } @Override @@ -116,6 +118,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re throw new IllegalArgumentException("Found invalid peer name:" + id); } + checkQueuesDeleted(id); + ZKUtil.createWithParents(this.zookeeper, this.peersZNode); List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>(); ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id), @@ -561,5 +565,22 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return 
ProtobufUtil.prependPBMagic(bytes); } - + private void checkQueuesDeleted(String peerId) throws ReplicationException { + if (queuesClient == null) return; + try { + List<String> replicators = queuesClient.getListOfReplicators(); + for (String replicator : replicators) { + List<String> queueIds = queuesClient.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (queueInfo.getPeerId().equals(peerId)) { + throw new ReplicationException("undeleted queue for peerId: " + peerId + + ", replicator: " + replicator + ", queueId: " + queueId); + } + } + } + } catch (KeeperException e) { + throw new ReplicationException("Could not check queues deleted with id=" + peerId, e); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index e55b53f..88c5427 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE; import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker; +import org.apache.hadoop.hbase.util.hbck.ReplicationChecker; import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler; import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl; import org.apache.hadoop.hbase.util.hbck.TableLockChecker; @@ -246,6 +247,7 @@ public class HBaseFsck extends Configured implements Closeable { private boolean fixReferenceFiles = false; // fix lingering reference 
store file private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows private boolean fixTableLocks = false; // fix table locks which are expired + private boolean fixReplication = false; // fix undeleted replication queues for removed peer private boolean fixAny = false; // Set to true if any of the fix is required. // limit checking/fixes to listed tables, if empty attempt to check/fix all @@ -702,6 +704,8 @@ public class HBaseFsck extends Configured implements Closeable { checkAndFixTableLocks(); + checkAndFixReplication(); + // Remove the hbck lock unlockHbck(); @@ -3257,12 +3261,29 @@ public class HBaseFsck extends Configured implements Closeable { } private void checkAndFixTableLocks() throws IOException { - TableLockChecker checker = new TableLockChecker(createZooKeeperWatcher(), errors); + ZooKeeperWatcher zkw = createZooKeeperWatcher(); + TableLockChecker checker = new TableLockChecker(zkw, errors); checker.checkTableLocks(); if (this.fixTableLocks) { checker.fixExpiredTableLocks(); } + zkw.close(); + } + + private void checkAndFixReplication() throws IOException { + ZooKeeperWatcher zkw = createZooKeeperWatcher(); + try { + ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors); + checker.checkUnDeletedQueues(); + + if (checker.hasUnDeletedQueues() && this.fixReplication) { + checker.fixUnDeletedQueues(); + setShouldRerun(); + } + } finally { + zkw.close(); + } } /** @@ -3801,7 +3822,7 @@ public class HBaseFsck extends Configured implements Closeable { HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION, ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR, ORPHAN_TABLE_STATE, - NO_TABLE_STATE + NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE } void clear(); void report(String message); @@ -4202,6 +4223,14 @@ public class HBaseFsck extends Configured implements 
Closeable { fixTableLocks = shouldFix; fixAny |= shouldFix; } + + /** + * Set replication fix mode. + */ + public void setFixReplication(boolean shouldFix) { + fixReplication = shouldFix; + fixAny |= shouldFix; + } /** * Check if we should rerun fsck again. This checks if we've tried to @@ -4462,6 +4491,10 @@ public class HBaseFsck extends Configured implements Closeable { out.println(" Table lock options"); out.println(" -fixTableLocks Deletes table locks held for a long time (hbase.table.lock.expire.ms, 10min by default)"); + out.println(""); + out.println(" Replication options"); + out.println(" -fixReplication Deletes replication queues for removed peers"); + out.flush(); errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString()); @@ -4647,6 +4680,8 @@ public class HBaseFsck extends Configured implements Closeable { setRegionBoundariesCheck(); } else if (cmd.equals("-fixTableLocks")) { setFixTableLocks(true); + } else if (cmd.equals("-fixReplication")) { + setFixReplication(true); } else if (cmd.startsWith("-")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd); return printUsageAndExit(); http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java new file mode 100644 index 0000000..bf44a50 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util.hbck; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.util.HBaseFsck; +import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/* + * Check and fix undeleted replication queues for removed peerId. 
+ */ +@InterfaceAudience.Private +public class ReplicationChecker { + private static final Log LOG = LogFactory.getLog(ReplicationChecker.class); + private ErrorReporter errorReporter; + private ReplicationQueuesClient queuesClient; + private ReplicationPeers replicationPeers; + private ReplicationQueueDeletor queueDeletor; + // replicator with its queueIds for removed peers + private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>(); + + public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection, + ErrorReporter errorReporter) throws IOException { + try { + this.errorReporter = errorReporter; + this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection); + this.queuesClient.init(); + this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient, + connection); + this.replicationPeers.init(); + this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection); + } catch (ReplicationException e) { + throw new IOException("failed to construct ReplicationChecker", e); + } + } + + public boolean hasUnDeletedQueues() { + return errorReporter.getErrorList() + .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE); + } + + public void checkUnDeletedQueues() throws IOException { + Set<String> peerIds = new HashSet<String>(this.replicationPeers.getAllPeerIds()); + try { + List<String> replicators = this.queuesClient.getListOfReplicators(); + for (String replicator : replicators) { + List<String> queueIds = this.queuesClient.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (!peerIds.contains(queueInfo.getPeerId())) { + if (!undeletedQueueIds.containsKey(replicator)) { + undeletedQueueIds.put(replicator, new ArrayList<String>()); + } + undeletedQueueIds.get(replicator).add(queueId); + + String msg = "Undeleted replication queue for removed peer 
found: " + + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", + queueInfo.getPeerId(), replicator, queueId); + errorReporter.reportError( + HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg); + } + } + } + } catch (KeeperException ke) { + throw new IOException(ke); + } + } + + private static class ReplicationQueueDeletor extends ReplicationStateZKBase { + public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { + super(zk, conf, abortable); + } + + public void removeQueue(String replicator, String queueId) throws IOException { + String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator), + queueId); + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath); + LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId); + } catch (KeeperException e) { + throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: " + + queueId); + } + } + } + + public void fixUnDeletedQueues() throws IOException { + for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { + String replicator = replicatorAndQueueIds.getKey(); + for (String queueId : replicatorAndQueueIds.getValue()) { + queueDeletor.removeQueue(replicator, queueId); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index e126205..e187b9b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -26,8 +26,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -114,6 +117,37 @@ public class TestReplicationAdmin { admin.removePeer(ID_SECOND); assertEquals(0, admin.getPeersCount()); } + + @Test + public void testAddPeerWithUnDeletedQueues() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null); + ReplicationQueues repQueues = + ReplicationFactory.getReplicationQueues(zkw, conf, null); + repQueues.init("server1"); + + // add queue for ID_ONE + repQueues.addLog(ID_ONE, "file1"); + try { + admin.addPeer(ID_ONE, KEY_ONE); + fail(); + } catch (ReplicationException e) { + // OK! + } + repQueues.removeQueue(ID_ONE); + assertEquals(0, repQueues.getAllQueues().size()); + + // add recovered queue for ID_ONE + repQueues.addLog(ID_ONE + "-server2", "file1"); + try { + admin.addPeer(ID_ONE, KEY_ONE); + fail(); + } catch (ReplicationException e) { + // OK! 
+ } + repQueues.removeAllQueues(); + zkw.close(); + } /** * basic checks that when we add a peer that it is enabled, and that we can disable http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java index 7459a7d..8e8bb41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/BaseTestHBaseFsck.java @@ -498,7 +498,8 @@ public class BaseTestHBaseFsck { // fix hole assertErrors( - doFsck(conf, false, true, false, false, false, false, false, false, false, false, null), + doFsck(conf, false, true, false, false, false, false, false, false, false, false, false, + null), new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.NOT_IN_META_OR_DEPLOYED }); http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java index a44ccd1..df3c69c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import 
org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.io.hfile.TestHFile; import org.apache.hadoop.hbase.master.AssignmentManager; @@ -50,10 +51,13 @@ import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker; import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -862,7 +866,7 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck { // for some time until children references are deleted. HBCK erroneously sees this as // overlapping regions HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false, - false, null); + false, false, null); assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported // assert that the split hbase:meta entry is still there. @@ -941,7 +945,7 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck { // now fix it. 
The fix should not revert the region split, but add daughters to META hbck = doFsck(conf, true, true, false, false, false, false, false, false, false, - false, null); + false, false, null); assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] { HBaseFsck.ErrorReporter.ERROR_CODE.NOT_IN_META_OR_DEPLOYED, HBaseFsck.ErrorReporter.ERROR_CODE.NOT_IN_META_OR_DEPLOYED, @@ -1474,4 +1478,55 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck { writeLock.release(); // release for clean state tableLockManager.tableDeleted(tableName); } + + @Test(timeout=180000) + public void testCheckReplication() throws Exception { + // check no errors + HBaseFsck hbck = doFsck(conf, false); + assertNoErrors(hbck); + + // create peer + ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf); + Assert.assertEquals(0, replicationAdmin.getPeersCount()); + int zkPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, + HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); + replicationAdmin.addPeer("1", "127.0.0.1:" + zkPort + ":/hbase"); + replicationAdmin.getPeersCount(); + Assert.assertEquals(1, replicationAdmin.getPeersCount()); + + // create replicator + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection); + ReplicationQueues repQueues = + ReplicationFactory.getReplicationQueues(zkw, conf, connection); + repQueues.init("server1"); + // queues for current peer, no errors + repQueues.addLog("1", "file1"); + repQueues.addLog("1-server2", "file1"); + Assert.assertEquals(2, repQueues.getAllQueues().size()); + hbck = doFsck(conf, false); + assertNoErrors(hbck); + + // queues for removed peer + repQueues.addLog("2", "file1"); + repQueues.addLog("2-server2", "file1"); + Assert.assertEquals(4, repQueues.getAllQueues().size()); + hbck = doFsck(conf, false); + assertErrors(hbck, new HBaseFsck.ErrorReporter.ERROR_CODE[] { + HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, + HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE }); + + // fix 
the case + hbck = doFsck(conf, true); + hbck = doFsck(conf, false); + assertNoErrors(hbck); + // ensure only "2" is deleted + Assert.assertEquals(2, repQueues.getAllQueues().size()); + Assert.assertNull(repQueues.getLogsInQueue("2")); + Assert.assertNull(repQueues.getLogsInQueue("2-sever2")); + + replicationAdmin.removePeer("1"); + repQueues.removeAllQueues(); + zkw.close(); + replicationAdmin.close(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/210c3dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java index a28378e..d1e774e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java @@ -40,13 +40,13 @@ public class HbckTestingUtil { public static HBaseFsck doFsck( Configuration conf, boolean fix, TableName table) throws Exception { - return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); + return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); } - public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, - boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps, - boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile, - boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, + public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta, + boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans, + boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, + boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, Boolean fixReplication, TableName table) throws Exception { 
HBaseFsck fsck = new HBaseFsck(conf, exec); try { @@ -62,6 +62,7 @@ public class HbckTestingUtil { fsck.setFixReferenceFiles(fixReferenceFiles); fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo); fsck.setFixTableLocks(fixTableLocks); + fsck.setFixReplication(fixReplication); if (table != null) { fsck.includeTable(table); }