HBASE-17904 Get runs into NoSuchElementException when using Read Replica, with hbase. ipc.client.specificThreadForWriting to be true and hbase.rpc.client.impl to be org.apache.hadoop.hbase.ipc.RpcClientImpl (Huaxiang Sun)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7678855f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7678855f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7678855f Branch: refs/heads/HBASE-16961 Commit: 7678855fac011a9c02e5d6a42470c0178482a4ce Parents: 0cd4cec Author: Michael Stack <st...@apache.org> Authored: Sun Apr 16 11:00:57 2017 -0700 Committer: Michael Stack <st...@apache.org> Committed: Sun Apr 16 11:01:06 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/ipc/BlockingRpcConnection.java | 2 +- .../hbase/client/TestReplicaWithCluster.java | 50 ++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7678855f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index 15eb10c..1012ad0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -156,7 +156,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { } public void remove(Call call) { - callsToWrite.remove(); + callsToWrite.remove(call); // By removing the call from the expected call list, we make the list smaller, but // it means as well that we don't know how many calls we cancelled. calls.remove(call.id); http://git-wip-us.apache.org/repos/asf/hbase/blob/7678855f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index becb2eb..2c77541 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Waiter; + import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -515,7 +516,56 @@ public class TestReplicaWithCluster { Assert.assertTrue(r.isStale()); } finally { + HTU.getAdmin().disableTable(hdt.getTableName()); + HTU.deleteTable(hdt.getTableName()); + } + } + + @Test + public void testReplicaGetWithRpcClientImpl() throws IOException { + HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true); + HTU.getConfiguration().set("hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.RpcClientImpl"); + // Create table then get the single region for our new table. + HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithRpcClientImpl"); + hdt.setRegionReplication(NB_SERVERS); + hdt.addCoprocessor(SlowMeCopro.class.getName()); + + try { + Table table = HTU.createTable(hdt, new byte[][] { f }, null); + + Put p = new Put(row); + p.addColumn(f, row, row); + table.put(p); + // Flush so it can be picked by the replica refresher thread + HTU.flush(table.getName()); + + // Sleep for some time until data is picked up by replicas + try { + Thread.sleep(2 * REFRESH_PERIOD); + } catch (InterruptedException e1) { + LOG.error(e1); + } + + try { + // Create the new connection so new config can kick in + Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + Table t = connection.getTable(hdt.getTableName()); + + // But if we ask for stale we will get it + SlowMeCopro.cdl.set(new CountDownLatch(1)); + Get g = new Get(row); + g.setConsistency(Consistency.TIMELINE); + Result r = t.get(g); + Assert.assertTrue(r.isStale()); + SlowMeCopro.cdl.get().countDown(); + } finally { + SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.sleepTime.set(0); + } + } finally { + HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting"); + HTU.getConfiguration().unset("hbase.rpc.client.impl"); HTU.getAdmin().disableTable(hdt.getTableName()); HTU.deleteTable(hdt.getTableName()); }