Repository: hbase
Updated Branches:
  refs/heads/master d44e7df5d -> e28ec7246


http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
new file mode 100644
index 0000000..d8d576f
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests RegionReplicaReplicationEndpoint class by setting up region replicas 
and verifying
+ * async wal replication replays the edits to the secondary region in various 
scenarios.
+ */
+@Category(MediumTests.class)
+public class TestRegionReplicaReplicationEndpoint {
+
+  private static final Log LOG = 
LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
+
+  static {
+    ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private static final int NB_SERVERS = 2;
+
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+    conf.setInt("replication.source.size.capacity", 10240);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, 
true);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); // less number of 
retries is needed
+    conf.setInt("hbase.client.serverside.retries.multiplier", 1);
+
+    HTU.startMiniCluster(NB_SERVERS);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testRegionReplicaReplicationPeerIsCreated() throws IOException, 
ReplicationException {
+    // create a table with region replicas. Check whether the replication peer 
is created
+    // and replication started.
+    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
+    String peerId = "region_replica_replication";
+
+    if (admin.getPeerConfig(peerId) != null) {
+      admin.removePeer(peerId);
+    }
+
+    HTableDescriptor htd = HTU.createTableDescriptor(
+      "testReplicationPeerIsCreated_no_region_replicas");
+    HTU.getHBaseAdmin().createTable(htd);
+    ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId);
+    assertNull(peerConfig);
+
+    htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
+    htd.setRegionReplication(2);
+    HTU.getHBaseAdmin().createTable(htd);
+
+    // assert peer configuration is correct
+    peerConfig = admin.getPeerConfig(peerId);
+    assertNotNull(peerConfig);
+    assertEquals(peerConfig.getClusterKey(), 
ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration()));
+    assertEquals(peerConfig.getReplicationEndpointImpl(),
+      RegionReplicaReplicationEndpoint.class.getName());
+    admin.close();
+  }
+
+
+  public void testRegionReplicaReplication(int regionReplication) throws 
Exception {
+    // test region replica replication. Create a table with single region, 
write some data
+    // ensure that data is replicated to the secondary region
+    TableName tableName = 
TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+        + regionReplication);
+    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
+    htd.setRegionReplication(regionReplication);
+    HTU.getHBaseAdmin().createTable(htd);
+    TableName tableNameNoReplicas =
+        
TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
+    HTU.deleteTableIfAny(tableNameNoReplicas);
+    HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);
+
+    HConnection connection = 
HConnectionManager.createConnection(HTU.getConfiguration());
+    HTableInterface table = connection.getTable(tableName);
+    HTableInterface tableNoReplicas = connection.getTable(tableNameNoReplicas);
+
+    try {
+      // load some data to the non-replicated table
+      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 
7000);
+
+      // load the data to the table
+      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
+
+      verifyReplication(tableName, regionReplication, 0, 1000);
+
+    } finally {
+      table.close();
+      tableNoReplicas.close();
+      HTU.deleteTableIfAny(tableNameNoReplicas);
+      connection.close();
+    }
+  }
+
+  private void verifyReplication(TableName tableName, int regionReplication,
+      final int startRow, final int endRow) throws Exception {
+    // find the regions
+    final HRegion[] regions = new HRegion[regionReplication];
+
+    for (int i=0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getOnlineRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+
+    for (HRegion region : regions) {
+      assertNotNull(region);
+    }
+
+    for (int i = 1; i < regionReplication; i++) {
+      final HRegion region = regions[i];
+      // wait until all the data is replicated to all secondary regions
+      Waiter.waitFor(HTU.getConfiguration(), 60000, new 
Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("verifying replication for region replica:" + 
region.getRegionInfo());
+          try {
+            HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, 
endRow);
+          } catch(Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet. 
Got:" + ex
+              + " " + ex.getMessage());
+            // still wait
+            return false;
+          }
+          return true;
+        }
+      });
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
+    testRegionReplicaReplication(2);
+  }
+
+  @Test(timeout = 60000)
+  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
+    testRegionReplicaReplication(3);
+  }
+
+  @Test(timeout = 60000)
+  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
+    testRegionReplicaReplication(10);
+  }
+
+  @Test (timeout = 60000)
+  public void testRegionReplicaReplicationForFlushAndCompaction() throws 
Exception {
+    // Tests a table with region replication 3. Writes some data, and causes 
flushes and
+    // compactions. Verifies that the data is readable from the replicas. Note 
that this
+    // does not test whether the replicas actually pick up flushed files and 
apply compaction
+    // to their stores
+    int regionReplication = 3;
+    TableName tableName = 
TableName.valueOf("testRegionReplicaReplicationForFlushAndCompaction");
+    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
+    htd.setRegionReplication(regionReplication);
+    HTU.getHBaseAdmin().createTable(htd);
+
+
+    HConnection connection = 
HConnectionManager.createConnection(HTU.getConfiguration());
+    HTableInterface table = connection.getTable(tableName);
+
+    try {
+      // load the data to the table
+
+      for (int i = 0; i < 6000; i += 1000) {
+        LOG.info("Writing data from " + i + " to " + (i+1000));
+        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i+1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        HTU.compact(tableName, false);
+      }
+
+      verifyReplication(tableName, regionReplication, 0, 6000);
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  @Test (timeout = 60000)
+  public void testRegionReplicaReplicationIgnoresDisabledTables() throws 
Exception {
+    testRegionReplicaReplicationIgnoresDisabledTables(false);
+  }
+
+  @Test (timeout = 60000)
+  public void testRegionReplicaReplicationIgnoresDroppedTables() throws 
Exception {
+    testRegionReplicaReplicationIgnoresDisabledTables(true);
+  }
+
+  public void testRegionReplicaReplicationIgnoresDisabledTables(boolean 
dropTable)
+      throws Exception {
+    // tests having edits from a disabled or dropped table is handled 
correctly by skipping those
+    // entries and further edits after the edits from dropped/disabled table 
can be replicated
+    // without problems.
+    TableName tableName = 
TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables"
+      + dropTable);
+    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
+    int regionReplication = 3;
+    htd.setRegionReplication(regionReplication);
+    HTU.deleteTableIfAny(tableName);
+    HTU.getHBaseAdmin().createTable(htd);
+    TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" 
: "disabledTable");
+    HTU.deleteTableIfAny(toBeDisabledTable);
+    htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
+    htd.setRegionReplication(regionReplication);
+    HTU.getHBaseAdmin().createTable(htd);
+
+    // both tables are created, now pause replication
+    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
+    admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+
+    // now that the replication is disabled, write to the table to be dropped, 
then drop the table.
+
+    HConnection connection = 
HConnectionManager.createConnection(HTU.getConfiguration());
+    HTableInterface table = connection.getTable(tableName);
+    HTableInterface tableToBeDisabled = connection.getTable(toBeDisabledTable);
+
+    HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 
7000);
+
+    AtomicLong skippedEdits = new AtomicLong();
+    RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
+        mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
+    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
+    RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
+        new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
+          (ClusterConnection) connection,
+          Executors.newSingleThreadExecutor(), 1000);
+
+    HRegionLocation hrl = connection.locateRegion(toBeDisabledTable, 
HConstants.EMPTY_BYTE_ARRAY);
+    byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
+
+    HLog.Entry entry = new HLog.Entry(
+      new HLogKey(encodedRegionName, toBeDisabledTable, 1),
+      new WALEdit());
+
+    HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table
+    if (dropTable) {
+      HTU.getHBaseAdmin().deleteTable(toBeDisabledTable);
+    }
+
+    sinkWriter.append(toBeDisabledTable, encodedRegionName,
+      HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
+
+    assertEquals(2, skippedEdits.get());
+
+    try {
+      // load some data to the to-be-dropped table
+
+      // load the data to the table
+      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
+
+      // now enable the replication
+      admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+
+      verifyReplication(tableName, regionReplication, 0, 1000);
+
+    } finally {
+      admin.close();
+      table.close();
+      tableToBeDisabled.close();
+      HTU.deleteTableIfAny(toBeDisabledTable);
+      connection.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e28ec724/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
new file mode 100644
index 0000000..4e879e4
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static 
org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
+import static 
org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import 
org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
+import 
org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests RegionReplicaReplicationEndpoint. Unlike 
TestRegionReplicaReplicationEndpoint this
+ * class contains lower level tests using callables.
+ */
+@Category(MediumTests.class)
+public class TestRegionReplicaReplicationEndpointNoMaster {
+
+  private static final Log LOG = LogFactory.getLog(
+    TestRegionReplicaReplicationEndpointNoMaster.class);
+
+  private static final int NB_SERVERS = 2;
+  private static TableName tableName = TableName.valueOf(
+    TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
+  private static HTable table;
+  private static final byte[] row = "TestRegionReplicaReplicator".getBytes();
+
+  private static HRegionServer rs0;
+  private static HRegionServer rs1;
+
+  private static HRegionInfo hriPrimary;
+  private static HRegionInfo hriSecondary;
+
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+  private static final byte[] f = HConstants.CATALOG_FAMILY;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, 
true);
+
+    // install WALObserver coprocessor for tests
+    String walCoprocs = 
HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
+    if (walCoprocs == null) {
+      walCoprocs = WALEditCopro.class.getName();
+    } else {
+      walCoprocs += "," + WALEditCopro.class.getName();
+    }
+    HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+      walCoprocs);
+    HTU.startMiniCluster(NB_SERVERS);
+
+    // Create table then get the single region for our new table.
+    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
+    table = HTU.createTable(htd, new byte[][]{f}, HTU.getConfiguration());
+
+    hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
+
+    // mock a secondary region info to open
+    hriSecondary = new HRegionInfo(hriPrimary.getTable(), 
hriPrimary.getStartKey(),
+        hriPrimary.getEndKey(), hriPrimary.isSplit(), 
hriPrimary.getRegionId(), 1);
+
+    // No master
+    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
+    rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
+    rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    table.close();
+    HTU.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws Exception{
+    entries.clear();
+  }
+
+  @After
+  public void after() throws Exception {
+  }
+
+  static ConcurrentLinkedQueue<HLog.Entry> entries = new 
ConcurrentLinkedQueue<HLog.Entry>();
+
+  public static class WALEditCopro extends BaseWALObserver {
+    public WALEditCopro() {
+      entries.clear();
+    }
+    @Override
+    public void postWALWrite(ObserverContext<WALCoprocessorEnvironment> ctx, 
HRegionInfo info,
+        HLogKey logKey, WALEdit logEdit) throws IOException {
+      // only keep primary region's edits
+      if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) 
{
+        entries.add(new HLog.Entry(logKey, logEdit));
+      }
+    }
+  }
+
+  @Test
+  public void testReplayCallable() throws Exception {
+    // tests replaying the edits to a secondary region replica using the 
Callable directly
+    openRegion(HTU, rs0, hriSecondary);
+    ClusterConnection connection =
+        (ClusterConnection) 
HConnectionManager.createConnection(HTU.getConfiguration());
+
+    //load some data to primary
+    HTU.loadNumericRows(table, f, 0, 1000);
+
+    Assert.assertEquals(1000, entries.size());
+    // replay the edits to the secondary using replay callable
+    replicateUsingCallable(connection, entries);
+
+    HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
+    HTU.verifyNumericRows(region, f, 0, 1000);
+
+    HTU.deleteNumericRows(table, f, 0, 1000);
+    closeRegion(HTU, rs0, hriSecondary);
+    connection.close();
+  }
+
+  private void replicateUsingCallable(ClusterConnection connection, 
Queue<HLog.Entry> entries)
+      throws IOException, RuntimeException {
+    HLog.Entry entry;
+    while ((entry = entries.poll()) != null) {
+      byte[] row = entry.getEdit().getKeyValues().get(0).getRow();
+      RegionLocations locations = connection.locateRegion(tableName, row, 
true, true);
+      RegionReplicaReplayCallable callable = new 
RegionReplicaReplayCallable(connection,
+        RpcControllerFactory.instantiate(connection.getConfiguration()),
+        table.getName(), locations.getRegionLocation(1),
+        locations.getRegionLocation(1).getRegionInfo(), row, 
Lists.newArrayList(entry),
+        new AtomicLong());
+
+      RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
+        connection.getConfiguration());
+      factory.<ReplicateWALEntryResponse> 
newCaller().callWithRetries(callable, 10000);
+    }
+  }
+
+  @Test
+  public void testReplayCallableWithRegionMove() throws Exception {
+    // tests replaying the edits to a secondary region replica using the 
Callable directly while
+    // the region is moved to another location.It tests handling of RME.
+    openRegion(HTU, rs0, hriSecondary);
+    ClusterConnection connection =
+        (ClusterConnection) 
HConnectionManager.createConnection(HTU.getConfiguration());
+    //load some data to primary
+    HTU.loadNumericRows(table, f, 0, 1000);
+
+    Assert.assertEquals(1000, entries.size());
+    // replay the edits to the secondary using replay callable
+    replicateUsingCallable(connection, entries);
+
+    HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
+    HTU.verifyNumericRows(region, f, 0, 1000);
+
+    HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to 
primary
+
+    // move the secondary region from RS0 to RS1
+    closeRegion(HTU, rs0, hriSecondary);
+    openRegion(HTU, rs1, hriSecondary);
+
+    // replicate the new data
+    replicateUsingCallable(connection, entries);
+
+    region = rs1.getFromOnlineRegions(hriSecondary.getEncodedName());
+    // verify the new data. old data may or may not be there
+    HTU.verifyNumericRows(region, f, 1000, 2000);
+
+    HTU.deleteNumericRows(table, f, 0, 2000);
+    closeRegion(HTU, rs1, hriSecondary);
+    connection.close();
+  }
+
+  @Test
+  public void testRegionReplicaReplicationEndpointReplicate() throws Exception 
{
+    // tests replaying the edits to a secondary region replica using the 
RRRE.replicate()
+    openRegion(HTU, rs0, hriSecondary);
+    ClusterConnection connection =
+        (ClusterConnection) 
HConnectionManager.createConnection(HTU.getConfiguration());
+    RegionReplicaReplicationEndpoint replicator = new 
RegionReplicaReplicationEndpoint();
+
+    ReplicationEndpoint.Context context = 
mock(ReplicationEndpoint.Context.class);
+    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+
+    replicator.init(context);
+    replicator.start();
+
+    //load some data to primary
+    HTU.loadNumericRows(table, f, 0, 1000);
+
+    Assert.assertEquals(1000, entries.size());
+    // replay the edits to the secondary using replay callable
+    replicator.replicate(new 
ReplicateContext().setEntries(Lists.newArrayList(entries)));
+
+    HRegion region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
+    HTU.verifyNumericRows(region, f, 0, 1000);
+
+    HTU.deleteNumericRows(table, f, 0, 1000);
+    closeRegion(HTU, rs0, hriSecondary);
+    connection.close();
+  }
+
+}

Reply via email to