HBASE-18282 ReplicationLogCleaner can delete WALs not yet replicated in case of a KeeperException
Signed-off-by: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef847f84 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef847f84 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef847f84 Branch: refs/heads/branch-1.2 Commit: ef847f8417b0a300f242fe76769d46d7efb86570 Parents: 0f3bf54 Author: Ben Lau <ben...@oath.com> Authored: Wed Feb 14 11:36:04 2018 -0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Feb 14 17:23:38 2018 -0800 ---------------------------------------------------------------------- .../hbase/replication/ReplicationQueues.java | 3 +- .../ReplicationQueuesClientZKImpl.java | 5 ++ .../replication/ReplicationQueuesZKImpl.java | 10 ++++ .../replication/ReplicationStateZKBase.java | 8 ++- .../cleaner/ReplicationZKLockCleanerChore.java | 4 +- .../master/ReplicationLogCleaner.java | 10 +++- .../hbase/master/cleaner/TestLogsCleaner.java | 54 ++++++++++++++++---- 7 files changed, 79 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 3dbbc33..f1457e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -104,8 +104,9 @@ public interface ReplicationQueues { * Get a list of all region servers that have outstanding replication queues. These servers could * be alive, dead or from a previous run of the cluster. * @return a list of server names + * @throws ReplicationException */ - List<String> getListOfReplicators(); + List<String> getListOfReplicators() throws ReplicationException; /** * Checks if the provided znode is the same as this region server's http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index e1a6a49..93a932f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -47,6 +47,11 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem } @Override + public List<String> getListOfReplicators() throws KeeperException { + return super.getListOfReplicatorsZK(); + } + + @Override public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException { String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); znode = ZKUtil.joinZNode(znode, queueId); http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 35e5087..3085394 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -88,6 +88,16 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override + public List<String> getListOfReplicators() throws ReplicationException { + try { + return super.getListOfReplicatorsZK(); + } catch (KeeperException e) { + LOG.warn("getListOfReplicators() from ZK failed", e); + throw new ReplicationException("getListOfReplicators() from ZK failed", e); + } + } + + @Override public void removeQueue(String queueId) { try { ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId)); http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 4fbac0f..75c13d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -76,12 +76,18 @@ public abstract class ReplicationStateZKBase { this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); } - public List<String> getListOfReplicators() { + /** + * Subclasses that use ZK explicitly can just call this directly while classes + * that are trying to hide internal details of storage can wrap the KeeperException + * into a ReplicationException or something else. + */ + protected List<String> getListOfReplicatorsZK() throws KeeperException { List<String> result = null; try { result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode); } catch (KeeperException e) { this.abortable.abort("Failed to get list of replicators", e); + throw e; } return result; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java index 3fa30bf..7c50719 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.ReplicationTracker; @@ -108,7 +109,8 @@ public class ReplicationZKLockCleanerChore extends ScheduledChore { } } catch (KeeperException e) { LOG.warn("zk operation interrupted", e); + } catch (ReplicationException e2) { + LOG.warn("replication exception", e2); } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 7731240..42d66a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -156,6 +156,14 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { } } + @VisibleForTesting + public void setConf(Configuration conf, ZooKeeperWatcher zk, + ReplicationQueuesClient replicationQueuesClient) { + super.setConf(conf); + this.zkw = zk; + this.replicationQueues = replicationQueuesClient; + } + @Override public void stop(String why) { if (this.stopped) return; @@ -171,7 +179,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { return this.stopped; } - private static class WarnOnlyAbortable implements Abortable { + public static class WarnOnlyAbortable implements Abortable { @Override public void abort(String why, Throwable e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 8efa754..df5916c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doAnswer; import java.io.IOException; import java.lang.reflect.Field; @@ -29,6 +30,7 @@ import java.net.URLEncoder; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; @@ -55,12 +57,13 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @Category(MediumTests.class) public class TestLogsCleaner { @@ -177,13 +180,10 @@ public class TestLogsCleaner { cleaner.getDeletableFiles(new LinkedList<FileStatus>()); } - /** - * ReplicationLogCleaner should be able to ride over ZooKeeper errors without - * aborting. - */ - @Test - public void testZooKeeperAbort() throws Exception { + @Test(timeout=10000) + public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); List<FileStatus> dummyFiles = Lists.newArrayList( @@ -193,19 +193,51 @@ public class TestLogsCleaner { FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); + final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false); + try { faultyZK.init(); - cleaner.setConf(conf, faultyZK); + ReplicationQueuesClient replicationQueuesClient = spy(ReplicationFactory.getReplicationQueuesClient( + faultyZK, conf, new ReplicationLogCleaner.WarnOnlyAbortable())); + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + try { + return invocation.callRealMethod(); + } catch (KeeperException.ConnectionLossException e) { + getListOfReplicatorsFailed.set(true); + throw e; + } + } + }).when(replicationQueuesClient).getListOfReplicators(); + replicationQueuesClient.init(); + + cleaner.setConf(conf, faultyZK, replicationQueuesClient); // should keep all files due to a ConnectionLossException getting the queues znodes Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles); + + assertTrue(getListOfReplicatorsFailed.get()); assertFalse(toDelete.iterator().hasNext()); assertFalse(cleaner.isStopped()); } finally { faultyZK.close(); } + } + + /** + * When zk is working both files should be returned + * @throws Exception + */ + @Test(timeout=10000) + public void testZooKeeperNormal() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - // when zk is working both files should be returned - cleaner = new ReplicationLogCleaner(); + List<FileStatus> dummyFiles = Lists.newArrayList( + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) + ); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null); try { cleaner.setConf(conf, zkw); @@ -291,7 +323,7 @@ public class TestLogsCleaner { public void init() throws Exception { this.zk = spy(super.getRecoverableZooKeeper()); doThrow(new KeeperException.ConnectionLossException()) - .when(zk).getData("/hbase/replication/rs", null, new Stat()); + .when(zk).getChildren("/hbase/replication/rs", null); } public RecoverableZooKeeper getRecoverableZooKeeper() {