HBASE-16388 Prevent client threads being blocked by only one slow region server
Signed-off-by: stack <st...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8ef6c763 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8ef6c763 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8ef6c763 Branch: refs/heads/hbase-12439 Commit: 8ef6c76344127f2f4d2f9536d87fa6fc7b5c7132 Parents: 8540171 Author: Phil Yang <ud1...@gmail.com> Authored: Wed Sep 14 13:21:01 2016 +0800 Committer: stack <st...@apache.org> Committed: Wed Sep 14 09:08:20 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/ipc/AbstractRpcClient.java | 22 ++++ .../hbase/ipc/ServerTooBusyException.java | 38 ++++++ .../org/apache/hadoop/hbase/HConstants.java | 12 ++ .../src/main/resources/hbase-default.xml | 16 ++- .../org/apache/hadoop/hbase/client/TestHCM.java | 119 ++++++++++++++++++- 5 files changed, 201 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 098ad3c..401a240 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -23,6 +23,9 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.protobuf.BlockingRpcChannel; import 
com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.MethodDescriptor; @@ -137,6 +140,16 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC private final ScheduledFuture<?> cleanupIdleConnectionTask; + private int maxConcurrentCallsPerServer; + + private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache = + CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS). + build(new CacheLoader<InetSocketAddress, AtomicInteger>() { + @Override public AtomicInteger load(InetSocketAddress key) throws Exception { + return new AtomicInteger(0); + } + }); + /** * Construct an IPC client for the cluster <code>clusterId</code> * @param conf configuration @@ -167,6 +180,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); this.metrics = metrics; + this.maxConcurrentCallsPerServer = conf.getInt( + HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, + HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD); this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf)); @@ -382,16 +398,22 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC final RpcCallback<Message> callback) { final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); cs.setStartTime(EnvironmentEdgeManager.currentTime()); + final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() { @Override public void run(Call call) { + counter.decrementAndGet(); onCallFinished(call, hrc, addr, callback); } }, cs); ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); + int count = counter.incrementAndGet(); 
try { + if (count > maxConcurrentCallsPerServer) { + throw new ServerTooBusyException(addr, count); + } T connection = getConnection(remoteId); connection.sendRequest(call, hrc); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java new file mode 100644 index 0000000..c6ba030 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import java.net.InetSocketAddress; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Throw this in rpc call if there are too many pending requests for one region server + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ServerTooBusyException extends DoNotRetryIOException { + + public ServerTooBusyException(InetSocketAddress address, long count) { + super("There are " + count + " concurrent rpc requests for " + address); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 4c499a2..5c53030 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -738,6 +738,18 @@ public final class HConstants { public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1; /** + * The maximum number of concurrent pending RPC requests for one server in process level. + */ + public static final String HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD = + "hbase.client.perserver.requests.threshold"; + + /** + * Default value of {@link #HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD}. + */ + public static final int DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD = Integer.MAX_VALUE; + + + /** * Parameter name for server pause value, used mostly as value to wait before * running a retry of a failed operation. 
*/ http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index a791717..8315829 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -487,24 +487,34 @@ possible configurations would overwhelm and obscure the important. <property> <name>hbase.client.max.total.tasks</name> <value>100</value> - <description>The maximum number of concurrent tasks a single HTable instance will + <description>The maximum number of concurrent mutation tasks a single HTable instance will send to the cluster.</description> </property> <property> <name>hbase.client.max.perserver.tasks</name> <value>5</value> - <description>The maximum number of concurrent tasks a single HTable instance will + <description>The maximum number of concurrent mutation tasks a single HTable instance will send to a single region server.</description> </property> <property> <name>hbase.client.max.perregion.tasks</name> <value>1</value> - <description>The maximum number of concurrent connections the client will + <description>The maximum number of concurrent mutation tasks the client will maintain to a single Region. That is, if there is already hbase.client.max.perregion.tasks writes in progress for this region, new puts won't be sent to this region until some writes finishes.</description> </property> <property> + <name>hbase.client.perserver.requests.threshold</name> + <value>2147483647</value> + <description>The max number of concurrent pending requests for one server in all client threads + (process level). Exceeding requests will be thrown ServerTooBusyException immediately to prevent + user's threads being occupied and blocked by only one slow region server. 
If you use a fixed + number of threads to access HBase in a synchronous way, setting this to a suitable value that is + related to the number of threads will help you. See + https://issues.apache.org/jira/browse/HBASE-16388 for details.</description> + </property> + <property> <name>hbase.client.scanner.caching</name> <value>2147483647</value> <description>Number of rows that we try to fetch when calling next http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 4e61fd3..786f570 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.*; import com.google.common.collect.Lists; import java.io.IOException; @@ -61,10 +60,12 @@ import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.ServerTooBusyException; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -85,8 +86,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import com.google.common.collect.Lists; -import com.google.protobuf.RpcController; +import static org.junit.Assert.fail; /** * This class is for testing HBaseConnectionManager features @@ -150,6 +150,12 @@ public class TestHCM { final Get get, final List<Cell> results) throws IOException { Threads.sleep(SLEEP_TIME); } + + @Override + public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } } public static class SleepWriteCoprocessor extends BaseRegionObserver { @@ -187,6 +193,8 @@ public class TestHCM { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); // simulate queue blocking in testDropTimeoutRequest TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); + // Used in testServerBusyException + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3); TEST_UTIL.startMiniCluster(2); } @@ -1338,4 +1346,109 @@ public class TestHCM { table.close(); connection.close(); } + + private class TestPutThread extends Thread { + Table table; + int getServerBusyException = 0; + + TestPutThread(Table table){ + this.table = table; + } + + @Override + public void run() { + try { + Put p = new Put(ROW); + p.addColumn(FAM_NAM, new byte[]{0}, new byte[]{0}); + table.put(p); + } catch (RetriesExhaustedWithDetailsException e) { + if (e.exceptions.get(0).getCause() instanceof ServerTooBusyException) { + getServerBusyException = 1; + } + } catch (IOException ignore) { + } + } + } + + private class TestGetThread extends Thread { + Table table; + int getServerBusyException = 0; + + TestGetThread(Table table){ + this.table = table; + } + + @Override + public void run() { + try { + Get g = new Get(ROW); + g.addColumn(FAM_NAM, new byte[]{0}); + table.get(g); + } catch 
(RetriesExhaustedException e) { + if (e.getCause().getCause() instanceof ServerTooBusyException) { + getServerBusyException = 1; + } + } catch (IOException ignore) { + } + } + } + + @Test() + public void testServerBusyException() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testServerBusy"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c); + + TestGetThread tg1 = + new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + TestGetThread tg2 = + new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + TestGetThread tg3 = + new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + TestGetThread tg4 = + new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + TestGetThread tg5 = + new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + tg1.start(); + tg2.start(); + tg3.start(); + tg4.start(); + tg5.start(); + tg1.join(); + tg2.join(); + tg3.join(); + tg4.join(); + tg5.join(); + assertEquals(2, + tg1.getServerBusyException + tg2.getServerBusyException + tg3.getServerBusyException + + tg4.getServerBusyException + tg5.getServerBusyException); + + // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at + // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException. 
+ + TestPutThread tp1 = + new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + TestPutThread tp2 = + new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + TestPutThread tp3 = + new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + TestPutThread tp4 = + new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + TestPutThread tp5 = + new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); + tp1.start(); + tp2.start(); + tp3.start(); + tp4.start(); + tp5.start(); + tp1.join(); + tp2.join(); + tp3.join(); + tp4.join(); + tp5.join(); + assertEquals(2, + tp1.getServerBusyException + tp2.getServerBusyException + tp3.getServerBusyException + + tp4.getServerBusyException + tp5.getServerBusyException); + } }