Fix the failing TestReplicationSyncUpToolWithBulkLoadedData test
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35cf5504 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35cf5504 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35cf5504 Branch: refs/heads/HBASE-16264 Commit: 35cf55048fe033373f6ae29dd6d3716ed7b18842 Parents: b4a729e Author: stack <st...@apache.org> Authored: Thu Sep 29 17:27:20 2016 -0700 Committer: stack <st...@apache.org> Committed: Thu Sep 29 17:27:20 2016 -0700 ---------------------------------------------------------------------- .../replication/ReplicationStateZKBase.java | 20 +- hbase-endpoint/pom.xml | 22 ++ ...ReplicationSyncUpToolWithBulkLoadedData.java | 235 +++++++++++++++++++ ...ReplicationSyncUpToolWithBulkLoadedData.java | 235 ------------------- 4 files changed, 274 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/35cf5504/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 496ab50..8fc7c07 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -18,9 +18,13 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.List; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -106,9 +110,19 @@ public abstract class ReplicationStateZKBase { * /hbase/replication/peers/PEER_ID/peer-state. */ protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) { - byte[] bytes = - ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build().toByteArray(); - return ProtobufUtil.prependPBMagic(bytes); + ZooKeeperProtos.ReplicationState msg = + ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build(); + // There is no toByteArray on this pb Message? + // 32 bytes is default which seems fair enough here. + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16); + msg.writeTo(cos); + cos.flush(); + baos.flush(); + return ProtobufUtil.prependPBMagic(baos.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } } protected boolean peerExists(String id) throws KeeperException { http://git-wip-us.apache.org/repos/asf/hbase/blob/35cf5504/hbase-endpoint/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml index 3b0ffd7..de05950 100644 --- a/hbase-endpoint/pom.xml +++ b/hbase-endpoint/pom.xml @@ -117,6 +117,28 @@ <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop-compat</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop-compat</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>${compat.module}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>${compat.module}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-protocol</artifactId> http://git-wip-us.apache.org/repos/asf/hbase/blob/35cf5504/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java new file mode 100644 index 0000000..f54c632 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool { + + private static final Log LOG = LogFactory + .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); + conf1.set("hbase.replication.source.fs.conf.provider", + TestSourceFSConfigurationProvider.class.getCanonicalName()); + String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); + if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) { + classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"; + conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes); + } + + TestReplicationBase.setUpBeforeClass(); + } + + @Override + public void testSyncUpTool() throws Exception { + /** + * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: + * 'cf1' : replicated 'norep': not replicated + */ + setupReplication(); + + /** + * Prepare 16 random hfile ranges required for creating hfiles + */ + Iterator<String> randomHFileRangeListIterator = null; + Set<String> randomHFileRanges = new HashSet<String>(16); + for (int i = 0; i < 16; i++) { + randomHFileRanges.add(UUID.randomUUID().toString()); + } + List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges); + Collections.sort(randomHFileRangeList); + randomHFileRangeListIterator = randomHFileRangeList.iterator(); + + /** + * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows + * into cf1, and 3 rows into norep verify correctly replicated to slave + */ + loadAndReplicateHFiles(true, randomHFileRangeListIterator); + + /** + * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load + * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and + * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave + * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step + * 5: run syncup tool on Master step 6: verify that hfiles show up on Slave and 'norep' does not + * t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to + * Slave + */ + mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator); + + } + + private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator) + throws Exception { + LOG.debug("mimicSyncUpAfterBulkLoad"); + utility2.shutdownMiniHBaseCluster(); + + loadAndReplicateHFiles(false, randomHFileRangeListIterator); + + int rowCount_ht1Source = utility1.countRows(ht1Source); + assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206, + rowCount_ht1Source); + + int rowCount_ht2Source = utility1.countRows(ht2Source); + assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406, + rowCount_ht2Source); + + utility1.shutdownMiniHBaseCluster(); + utility2.restartHBaseCluster(1); + + Thread.sleep(SLEEP_TIME); + + // Before sync up + int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); + int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1); + + // Run sync up tool + syncUp(utility1); + + // After syun up + for (int i = 0; i < NB_RETRIES; i++) { + syncUp(utility1); + rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); + rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); + if (i == NB_RETRIES - 1) { + if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) { + // syncUP still failed. Let's look at the source in case anything wrong there + utility1.restartHBaseCluster(1); + rowCount_ht1Source = utility1.countRows(ht1Source); + LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source); + rowCount_ht2Source = utility1.countRows(ht2Source); + LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source); + } + assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, + rowCount_ht1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, + rowCount_ht2TargetAtPeer1); + } + if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) { + LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i); + break; + } else { + LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" + + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" + + rowCount_ht2TargetAtPeer1); + } + Thread.sleep(SLEEP_TIME); + } + } + + private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave, + Iterator<String> randomHFileRangeListIterator) throws Exception { + LOG.debug("loadAndReplicateHFiles"); + + // Load 100 + 3 hfiles to t1_syncup. + byte[][][] hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges, + 100); + + hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source, + hfileRanges, 3); + + // Load 200 + 3 hfiles to t2_syncup. + hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges, + 200); + + hfileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), + Bytes.toBytes(randomHFileRangeListIterator.next()) } }; + loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source, + hfileRanges, 3); + + if (verifyReplicationOnSlave) { + // ensure replication completed + wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3, + "t1_syncup has 103 rows on source, and 100 on slave1"); + + wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3, + "t2_syncup has 203 rows on source, and 200 on slave1"); + } + } + + private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam, + Table source, byte[][][] hfileRanges, int numOfRows) throws Exception { + Path dir = utility1.getDataTestDirOnTestFS(testName); + FileSystem fs = utility1.getTestFileSystem(); + dir = dir.makeQualified(fs); + Path familyDir = new Path(dir, Bytes.toString(fam)); + + int hfileIdx = 0; + for (byte[][] range : hfileRanges) { + byte[] from = range[0]; + byte[] to = range[1]; + HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_" + + hfileIdx++), fam, row, from, to, numOfRows); + } + + final TableName tableName = source.getName(); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration()); + String[] args = { dir.toString(), tableName.toString() }; + loader.run(args); + } + + private void wait(Table target, int expectedCount, String msg) throws IOException, + InterruptedException { + for (int i = 0; i < NB_RETRIES; i++) { + int rowCount_ht2TargetAtPeer1 = utility2.countRows(target); + if (i == NB_RETRIES - 1) { + assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1); + } + if (expectedCount == rowCount_ht2TargetAtPeer1) { + break; + } + Thread.sleep(SLEEP_TIME); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/35cf5504/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java deleted file mode 100644 index f54c632..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.HFileTestUtil; -import org.junit.BeforeClass; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, LargeTests.class }) -public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool { - - private static final Log LOG = LogFactory - .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); - conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); - conf1.set("hbase.replication.source.fs.conf.provider", - TestSourceFSConfigurationProvider.class.getCanonicalName()); - String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); - if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) { - classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"; - conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes); - } - - TestReplicationBase.setUpBeforeClass(); - } - - @Override - public void testSyncUpTool() throws Exception { - /** - * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: - * 'cf1' : replicated 'norep': not replicated - */ - setupReplication(); - - /** - * Prepare 16 random hfile ranges required for creating hfiles - */ - Iterator<String> randomHFileRangeListIterator = null; - Set<String> randomHFileRanges = new HashSet<String>(16); - for (int i = 0; i < 16; i++) { - randomHFileRanges.add(UUID.randomUUID().toString()); - } - List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges); - Collections.sort(randomHFileRangeList); - randomHFileRangeListIterator = randomHFileRangeList.iterator(); - - /** - * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows - * into cf1, and 3 rows into norep verify correctly replicated to slave - */ - loadAndReplicateHFiles(true, randomHFileRangeListIterator); - - /** - * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load - * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and - * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave - * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step - * 5: run syncup tool on Master step 6: verify that hfiles show up on Slave and 'norep' does not - * t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to - * Slave - */ - mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator); - - } - - private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator) - throws Exception { - LOG.debug("mimicSyncUpAfterBulkLoad"); - utility2.shutdownMiniHBaseCluster(); - - loadAndReplicateHFiles(false, randomHFileRangeListIterator); - - int rowCount_ht1Source = utility1.countRows(ht1Source); - assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206, - rowCount_ht1Source); - - int rowCount_ht2Source = utility1.countRows(ht2Source); - assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406, - rowCount_ht2Source); - - utility1.shutdownMiniHBaseCluster(); - utility2.restartHBaseCluster(1); - - Thread.sleep(SLEEP_TIME); - - // Before sync up - int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); - int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); - assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1); - assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1); - - // Run sync up tool - syncUp(utility1); - - // After syun up - for (int i = 0; i < NB_RETRIES; i++) { - syncUp(utility1); - rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1); - rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1); - if (i == NB_RETRIES - 1) { - if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) { - // syncUP still failed. Let's look at the source in case anything wrong there - utility1.restartHBaseCluster(1); - rowCount_ht1Source = utility1.countRows(ht1Source); - LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source); - rowCount_ht2Source = utility1.countRows(ht2Source); - LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source); - } - assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, - rowCount_ht1TargetAtPeer1); - assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, - rowCount_ht2TargetAtPeer1); - } - if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) { - LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i); - break; - } else { - LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" - + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" - + rowCount_ht2TargetAtPeer1); - } - Thread.sleep(SLEEP_TIME); - } - } - - private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave, - Iterator<String> randomHFileRangeListIterator) throws Exception { - LOG.debug("loadAndReplicateHFiles"); - - // Load 100 + 3 hfiles to t1_syncup. - byte[][][] hfileRanges = - new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), - Bytes.toBytes(randomHFileRangeListIterator.next()) } }; - loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges, - 100); - - hfileRanges = - new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), - Bytes.toBytes(randomHFileRangeListIterator.next()) } }; - loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source, - hfileRanges, 3); - - // Load 200 + 3 hfiles to t2_syncup. - hfileRanges = - new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), - Bytes.toBytes(randomHFileRangeListIterator.next()) } }; - loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges, - 200); - - hfileRanges = - new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()), - Bytes.toBytes(randomHFileRangeListIterator.next()) } }; - loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source, - hfileRanges, 3); - - if (verifyReplicationOnSlave) { - // ensure replication completed - wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3, - "t1_syncup has 103 rows on source, and 100 on slave1"); - - wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3, - "t2_syncup has 203 rows on source, and 200 on slave1"); - } - } - - private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam, - Table source, byte[][][] hfileRanges, int numOfRows) throws Exception { - Path dir = utility1.getDataTestDirOnTestFS(testName); - FileSystem fs = utility1.getTestFileSystem(); - dir = dir.makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(fam)); - - int hfileIdx = 0; - for (byte[][] range : hfileRanges) { - byte[] from = range[0]; - byte[] to = range[1]; - HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_" - + hfileIdx++), fam, row, from, to, numOfRows); - } - - final TableName tableName = source.getName(); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration()); - String[] args = { dir.toString(), tableName.toString() }; - loader.run(args); - } - - private void wait(Table target, int expectedCount, String msg) throws IOException, - InterruptedException { - for (int i = 0; i < NB_RETRIES; i++) { - int rowCount_ht2TargetAtPeer1 = utility2.countRows(target); - if (i == NB_RETRIES - 1) { - assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1); - } - if (expectedCount == rowCount_ht2TargetAtPeer1) { - break; - } - Thread.sleep(SLEEP_TIME); - } - } -}