[GitHub] [hbase] Apache9 commented on a change in pull request #1593: HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry …

2020-05-04 Thread GitBox


Apache9 commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419840454



##
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
 return String.format("%s:%d", hostname, port);
   }
 
-  /**
-   * @return Stub needed to make RPC using a hedged channel to the master end points.
-   */
-  private ClientMetaService.Interface getMasterStub() throws IOException {
-return ClientMetaService.newStub(
-rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
+  @FunctionalInterface
+  private interface Callable {
+void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback done);
   }
 
-  /**
-   * Parses the list of master addresses from the provided configuration. Supported format is
-   * comma separated host[:port] values. If no port number if specified, default master port is
-   * assumed.
-   * @param conf Configuration to parse from.
-   */
-  private void parseMasterAddrs(Configuration conf) throws UnknownHostException {
-String configuredMasters = getMasterAddr(conf);
-for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
-  HostAndPort masterHostPort =
-  HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
-  masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
-}
-Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
+  private  CompletableFuture call(ClientMetaService.Interface stub,
+Callable callable) {
+HBaseRpcController controller = rpcControllerFactory.newController();
+CompletableFuture future = new CompletableFuture<>();
+callable.call(controller, stub, resp -> {
+  if (controller.failed()) {
+future.completeExceptionally(controller.getFailed());
+  } else {
+future.complete(resp);
+  }
+});
+return future;
   }
 
-  @VisibleForTesting
-  public Set getParsedMasterServers() {
-return Collections.unmodifiableSet(masterServers);
+  private IOException badResponse(String debug) {
+return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
   }
 
-  /**
-   * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
-   * the rpc finishes and the response is propagated to the passed future.
-   * @param future Result future to which the rpc response is propagated.
-   * @param isValidResp Checks if the rpc response has a valid result.
-   * @param transformResult Transforms the result to a different form as expected by callers.
-   * @param hrc RpcController instance for this rpc.
-   * @param debug Debug message passed along to the caller in case of exceptions.
-   * @param  RPC result type.
-   * @param  Transformed type of the result.
-   * @return A call back that can be embedded in the non-blocking rpc call.
-   */
-  private  RpcCallback getRpcCallBack(CompletableFuture future,
-  Predicate isValidResp, Function transformResult, HBaseRpcController hrc,
-  final String debug) {
-return rpcResult -> {
-  if (rpcResult == null) {
-future.completeExceptionally(
-new MasterRegistryFetchException(masterServers, hrc.getFailed()));
-return;
-  }
-  if (!isValidResp.test(rpcResult)) {
-// Rpc returned ok, but result was malformed.
-future.completeExceptionally(new IOException(
-String.format("Invalid result for request %s. Will be retried", debug)));
-return;
-  }
-  future.complete(transformResult.apply(rpcResult));
-};
+  // send requests concurrently to hedgedReadsFanout masters
+  private  void groupCall(CompletableFuture future, int startIndexInclusive,

Review comment:
   I added more comments for this method.
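
   For readers following the thread, the grouping behaviour of groupCall (quoted in full in the other messages of this thread) can be sketched outside HBase roughly as follows. This is a minimal sketch under stated assumptions, not the code from the pull request: `HedgedGroupCallSketch`, `hedgedCall`, the `Supplier`-based targets, and the exception types are hypothetical stand-ins for the master stubs and the RPC plumbing.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class HedgedGroupCallSketch {

  // Hypothetical entry point: each Supplier stands in for an async RPC to one master.
  public static <T> CompletableFuture<T> hedgedCall(
      List<Supplier<CompletableFuture<T>>> targets, int fanOut, Predicate<T> isValid) {
    CompletableFuture<T> result = new CompletableFuture<>();
    if (targets.isEmpty() || fanOut < 1) {
      result.completeExceptionally(
          new IllegalArgumentException("need at least one target and fanOut >= 1"));
      return result;
    }
    groupCall(result, targets, 0, fanOut, isValid, new ConcurrentLinkedQueue<>());
    return result;
  }

  // Sends requests concurrently to at most fanOut targets, starting at index start.
  private static <T> void groupCall(CompletableFuture<T> result,
      List<Supplier<CompletableFuture<T>>> targets, int start, int fanOut,
      Predicate<T> isValid, Queue<Throwable> errors) {
    int end = Math.min(start + fanOut, targets.size());
    AtomicInteger remaining = new AtomicInteger(end - start);
    for (int i = start; i < end; i++) {
      targets.get(i).get().whenComplete((resp, err) -> {
        // Another call already produced the final outcome; nothing left to do.
        if (result.isDone()) {
          return;
        }
        Throwable failure = err;
        if (failure == null && !isValid.test(resp)) {
          // Treat a malformed response like a failed call (assumed exception type).
          failure = new IllegalStateException("invalid response");
        }
        if (failure == null) {
          result.complete(resp); // first valid response wins
          return;
        }
        // Record the error before decrementing, so that when the counter
        // reaches 0 every failure of this group is already in the queue.
        errors.add(failure);
        if (remaining.decrementAndGet() == 0) {
          if (end == targets.size()) {
            // No targets left: fail with all collected errors attached.
            Exception all = new Exception("all " + errors.size() + " calls failed");
            errors.forEach(all::addSuppressed);
            result.completeExceptionally(all);
          } else {
            // Whole group failed: move on to the next group of targets.
            groupCall(result, targets, end, fanOut, isValid, errors);
          }
        }
      });
    }
  }
}
```

   With a fan out of 2 and three targets, the third target is only contacted once both calls in the first group have failed or returned an invalid response.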





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hbase] Apache9 commented on a change in pull request #1593: HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry …

2020-05-04 Thread GitBox


Apache9 commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419840354



##
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
 return String.format("%s:%d", hostname, port);
   }
 
-  /**
-   * @return Stub needed to make RPC using a hedged channel to the master end points.
-   */
-  private ClientMetaService.Interface getMasterStub() throws IOException {
-return ClientMetaService.newStub(
-rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
+  @FunctionalInterface
+  private interface Callable {
+void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback done);
   }
 
-  /**
-   * Parses the list of master addresses from the provided configuration. Supported format is
-   * comma separated host[:port] values. If no port number if specified, default master port is
-   * assumed.
-   * @param conf Configuration to parse from.
-   */
-  private void parseMasterAddrs(Configuration conf) throws UnknownHostException {
-String configuredMasters = getMasterAddr(conf);
-for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
-  HostAndPort masterHostPort =
-  HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
-  masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
-}
-Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
+  private  CompletableFuture call(ClientMetaService.Interface stub,
+Callable callable) {
+HBaseRpcController controller = rpcControllerFactory.newController();
+CompletableFuture future = new CompletableFuture<>();
+callable.call(controller, stub, resp -> {
+  if (controller.failed()) {
+future.completeExceptionally(controller.getFailed());
+  } else {
+future.complete(resp);
+  }
+});
+return future;
   }
 
-  @VisibleForTesting
-  public Set getParsedMasterServers() {
-return Collections.unmodifiableSet(masterServers);
+  private IOException badResponse(String debug) {
+return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
   }
 
-  /**
-   * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
-   * the rpc finishes and the response is propagated to the passed future.
-   * @param future Result future to which the rpc response is propagated.
-   * @param isValidResp Checks if the rpc response has a valid result.
-   * @param transformResult Transforms the result to a different form as expected by callers.
-   * @param hrc RpcController instance for this rpc.
-   * @param debug Debug message passed along to the caller in case of exceptions.
-   * @param  RPC result type.
-   * @param  Transformed type of the result.
-   * @return A call back that can be embedded in the non-blocking rpc call.
-   */
-  private  RpcCallback getRpcCallBack(CompletableFuture future,
-  Predicate isValidResp, Function transformResult, HBaseRpcController hrc,
-  final String debug) {
-return rpcResult -> {
-  if (rpcResult == null) {
-future.completeExceptionally(
-new MasterRegistryFetchException(masterServers, hrc.getFailed()));
-return;
-  }
-  if (!isValidResp.test(rpcResult)) {
-// Rpc returned ok, but result was malformed.
-future.completeExceptionally(new IOException(
-String.format("Invalid result for request %s. Will be retried", debug)));
-return;
-  }
-  future.complete(transformResult.apply(rpcResult));
-};
+  // send requests concurrently to hedgedReadsFanout masters
+  private  void groupCall(CompletableFuture future, int startIndexInclusive,
+Callable callable, Predicate isValidResp, String debug,
+ConcurrentLinkedQueue errors) {
+int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
+AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
+for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+  addListener(call(masterStubs.get(i), callable), (r, e) -> {
+// a simple check to skip all the later operations earlier
+if (future.isDone()) {
+  return;
+}
+if (e == null && !isValidResp.test(r)) {
+  e = badResponse(debug);
+}
+if (e != null) {
+  // make sure when remaining reaches 0 we have all exceptions in the errors queue
+  errors.add(e);
+  if (remaining.decrementAndGet() == 0) {
+if (endIndexExclusive == masterStubs.size()) {
+  // we are done, complete the 

[GitHub] [hbase] Apache9 commented on a change in pull request #1593: HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry …

2020-05-04 Thread GitBox


Apache9 commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419820280



##
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
 return String.format("%s:%d", hostname, port);
   }
 
-  /**
-   * @return Stub needed to make RPC using a hedged channel to the master end points.
-   */
-  private ClientMetaService.Interface getMasterStub() throws IOException {
-return ClientMetaService.newStub(
-rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
+  @FunctionalInterface
+  private interface Callable {

Review comment:
   We have an RpcCallback in the parameters, so I think that is enough to show that this is asynchronous?
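
   To illustrate the point for readers of the archive: an interface method that returns void and takes a completion callback can only deliver its result asynchronously, and it is straightforward to adapt to a CompletableFuture, as the call helper in the diff does. The following is a minimal sketch under stated assumptions, not the HBase code: `Controller`, `AsyncCall`, and `toFuture` are hypothetical stand-ins for HBaseRpcController, the private Callable interface, and call, and `Consumer` stands in for RpcCallback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackToFutureSketch {

  // Hypothetical stand-in for an RPC controller that records a failure.
  static final class Controller {
    private volatile Throwable failure;

    void setFailed(Throwable t) {
      failure = t;
    }

    boolean failed() {
      return failure != null;
    }

    Throwable getFailed() {
      return failure;
    }
  }

  // The callback parameter is what marks this call shape as asynchronous:
  // there is no return value, the result arrives through done.
  @FunctionalInterface
  interface AsyncCall<T> {
    void call(Controller controller, Consumer<T> done);
  }

  // Adapts the callback style to a CompletableFuture.
  static <T> CompletableFuture<T> toFuture(AsyncCall<T> asyncCall) {
    Controller controller = new Controller();
    CompletableFuture<T> future = new CompletableFuture<>();
    asyncCall.call(controller, resp -> {
      if (controller.failed()) {
        future.completeExceptionally(controller.getFailed());
      } else {
        future.complete(resp);
      }
    });
    return future;
  }
}
```

   A caller then chains on the returned future instead of passing its own callback, which is what lets the fan-out logic treat every master call uniformly.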









[GitHub] [hbase] Apache9 commented on a change in pull request #1593: HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry …

2020-05-04 Thread GitBox


Apache9 commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419819483



##
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
 return String.format("%s:%d", hostname, port);
   }
 
-  /**
-   * @return Stub needed to make RPC using a hedged channel to the master end points.
-   */
-  private ClientMetaService.Interface getMasterStub() throws IOException {
-return ClientMetaService.newStub(
-rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
+  @FunctionalInterface
+  private interface Callable {
+void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback done);
   }
 
-  /**
-   * Parses the list of master addresses from the provided configuration. Supported format is
-   * comma separated host[:port] values. If no port number if specified, default master port is
-   * assumed.
-   * @param conf Configuration to parse from.
-   */
-  private void parseMasterAddrs(Configuration conf) throws UnknownHostException {
-String configuredMasters = getMasterAddr(conf);
-for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
-  HostAndPort masterHostPort =
-  HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
-  masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
-}
-Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
+  private  CompletableFuture call(ClientMetaService.Interface stub,
+Callable callable) {
+HBaseRpcController controller = rpcControllerFactory.newController();
+CompletableFuture future = new CompletableFuture<>();
+callable.call(controller, stub, resp -> {
+  if (controller.failed()) {
+future.completeExceptionally(controller.getFailed());
+  } else {
+future.complete(resp);
+  }
+});
+return future;
   }
 
-  @VisibleForTesting
-  public Set getParsedMasterServers() {
-return Collections.unmodifiableSet(masterServers);
+  private IOException badResponse(String debug) {
+return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
   }
 
-  /**
-   * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
-   * the rpc finishes and the response is propagated to the passed future.
-   * @param future Result future to which the rpc response is propagated.
-   * @param isValidResp Checks if the rpc response has a valid result.
-   * @param transformResult Transforms the result to a different form as expected by callers.
-   * @param hrc RpcController instance for this rpc.
-   * @param debug Debug message passed along to the caller in case of exceptions.
-   * @param  RPC result type.
-   * @param  Transformed type of the result.
-   * @return A call back that can be embedded in the non-blocking rpc call.
-   */
-  private  RpcCallback getRpcCallBack(CompletableFuture future,
-  Predicate isValidResp, Function transformResult, HBaseRpcController hrc,
-  final String debug) {
-return rpcResult -> {
-  if (rpcResult == null) {
-future.completeExceptionally(
-new MasterRegistryFetchException(masterServers, hrc.getFailed()));
-return;
-  }
-  if (!isValidResp.test(rpcResult)) {
-// Rpc returned ok, but result was malformed.
-future.completeExceptionally(new IOException(
-String.format("Invalid result for request %s. Will be retried", debug)));
-return;
-  }
-  future.complete(transformResult.apply(rpcResult));
-};
+  // send requests concurrently to hedgedReadsFanout masters
+  private  void groupCall(CompletableFuture future, int startIndexInclusive,
+Callable callable, Predicate isValidResp, String debug,
+ConcurrentLinkedQueue errors) {
+int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
+AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
+for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+  addListener(call(masterStubs.get(i), callable), (r, e) -> {
+// a simple check to skip all the later operations earlier
+if (future.isDone()) {
+  return;
+}
+if (e == null && !isValidResp.test(r)) {
+  e = badResponse(debug);

Review comment:
   The message is already contained in the exception?






[GitHub] [hbase] Apache9 commented on a change in pull request #1593: HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry …

2020-05-04 Thread GitBox


Apache9 commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419801204



##
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
##
@@ -367,105 +363,6 @@ public void testAsyncEcho() throws IOException {
 }
   }
 
-  /**
-   * Tests the various request fan out values using a simple RPC hedged across a mix of running and
-   * failing servers.
-   */
-  @Test
-  @Ignore
-  public void testHedgedAsyncEcho() throws Exception {
-// Hedging is not supported for blocking connection types.
-Assume.assumeFalse(this instanceof TestBlockingIPC);
-List rpcServers = new ArrayList<>();
-List addresses = new ArrayList<>();
-// Create a mix of running and failing servers.
-final int numRunningServers = 5;
-final int numFailingServers = 3;
-final int numServers = numRunningServers + numFailingServers;
-for (int i = 0; i < numRunningServers; i++) {
-  RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i,
-  Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
-  SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
-  new FifoRpcScheduler(CONF, 1));
-  rpcServer.start();
-  addresses.add(rpcServer.getListenerAddress());
-  rpcServers.add(rpcServer);
-}
-for (int i = 0; i < numFailingServers; i++) {
-  RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i,
-  Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
-  SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
-  new FifoRpcScheduler(CONF, 1));
-  rpcServer.start();
-  addresses.add(rpcServer.getListenerAddress());
-  rpcServers.add(rpcServer);
-}
-Configuration conf = HBaseConfiguration.create();
-try (AbstractRpcClient client = createRpcClient(conf)) {
-  // Try out various fan out values starting from 1 -> numServers.
-  for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
-// Update the client's underlying conf, should be ok for the test.
-LOG.debug("Testing with request fan out: " + reqFanOut);
-conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
-Interface stub = newStub(client, addresses);
-BlockingRpcCallback done = new BlockingRpcCallback<>();
-stub.echo(new HBaseRpcControllerImpl(),
-EchoRequestProto.newBuilder().setMessage("hello").build(), done);
-TestProtos.EchoResponseProto responseProto = done.get();
-assertNotNull(responseProto);
-assertEquals("hello", responseProto.getMessage());
-LOG.debug("Ended test with request fan out: " + reqFanOut);
-  }
-} finally {
-  for (RpcServer rpcServer: rpcServers) {
-rpcServer.stop();
-  }
-}
-  }
-
-  @Test
-  public void testHedgedAsyncTimeouts() throws Exception {
-// Hedging is not supported for blocking connection types.
-Assume.assumeFalse(this instanceof TestBlockingIPC);
-List rpcServers = new ArrayList<>();
-List addresses = new ArrayList<>();
-final int numServers = 3;
-for (int i = 0; i < numServers; i++) {
-  RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i,
-  Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
-  SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
-  new FifoRpcScheduler(CONF, 1));
-  rpcServer.start();
-  addresses.add(rpcServer.getListenerAddress());
-  rpcServers.add(rpcServer);
-}
-Configuration conf = HBaseConfiguration.create();
-int timeout = 100;
-int pauseTime = 1000;
-try (AbstractRpcClient client = createRpcClient(conf)) {
-  // Try out various fan out values starting from 1 -> numServers.
-  for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
-// Update the client's underlying conf, should be ok for the test.
-LOG.debug("Testing with request fan out: " + reqFanOut);
-conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
-Interface stub = newStub(client, addresses);
-HBaseRpcController pcrc = new HBaseRpcControllerImpl();
-pcrc.setCallTimeout(timeout);
-BlockingRpcCallback callback = new BlockingRpcCallback<>();
-stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback);
-assertNull(callback.get());
-// Make sure the controller has the right exception propagated.
-assertTrue(pcrc.getFailed() instanceof CallTimeoutException);
-LOG.debug("Ended test with request fan out: " + reqFanOut);
-  }
-} finally {
-  for (RpcServer rpcServer: rpcServers) {
-rpcServer.stop();
-  }
-}
-  }
-
-

Review comment:
   We do not have hedged read support for RPC, so I just removed them. Let me see if we can directly 

[GitHub] [hbase] Apache9 commented on a change in pull request #1593: HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry …

2020-05-04 Thread GitBox


Apache9 commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419800630



##
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
 return String.format("%s:%d", hostname, port);
   }
 
-  /**
-   * @return Stub needed to make RPC using a hedged channel to the master end points.
-   */
-  private ClientMetaService.Interface getMasterStub() throws IOException {
-return ClientMetaService.newStub(
-rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
+  @FunctionalInterface
+  private interface Callable {
+void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback done);
   }
 
-  /**
-   * Parses the list of master addresses from the provided configuration. Supported format is
-   * comma separated host[:port] values. If no port number if specified, default master port is
-   * assumed.
-   * @param conf Configuration to parse from.
-   */
-  private void parseMasterAddrs(Configuration conf) throws UnknownHostException {
-String configuredMasters = getMasterAddr(conf);
-for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
-  HostAndPort masterHostPort =
-  HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
-  masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
-}
-Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
+  private  CompletableFuture call(ClientMetaService.Interface stub,
+Callable callable) {
+HBaseRpcController controller = rpcControllerFactory.newController();
+CompletableFuture future = new CompletableFuture<>();
+callable.call(controller, stub, resp -> {
+  if (controller.failed()) {
+future.completeExceptionally(controller.getFailed());
+  } else {
+future.complete(resp);
+  }
+});
+return future;
   }
 
-  @VisibleForTesting
-  public Set getParsedMasterServers() {
-return Collections.unmodifiableSet(masterServers);
+  private IOException badResponse(String debug) {
+return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
   }
 
-  /**
-   * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
-   * the rpc finishes and the response is propagated to the passed future.
-   * @param future Result future to which the rpc response is propagated.
-   * @param isValidResp Checks if the rpc response has a valid result.
-   * @param transformResult Transforms the result to a different form as expected by callers.
-   * @param hrc RpcController instance for this rpc.
-   * @param debug Debug message passed along to the caller in case of exceptions.
-   * @param  RPC result type.
-   * @param  Transformed type of the result.
-   * @return A call back that can be embedded in the non-blocking rpc call.
-   */
-  private  RpcCallback getRpcCallBack(CompletableFuture future,
-  Predicate isValidResp, Function transformResult, HBaseRpcController hrc,
-  final String debug) {
-return rpcResult -> {
-  if (rpcResult == null) {
-future.completeExceptionally(
-new MasterRegistryFetchException(masterServers, hrc.getFailed()));
-return;
-  }
-  if (!isValidResp.test(rpcResult)) {
-// Rpc returned ok, but result was malformed.
-future.completeExceptionally(new IOException(
-String.format("Invalid result for request %s. Will be retried", debug)));
-return;
-  }
-  future.complete(transformResult.apply(rpcResult));
-};
+  // send requests concurrently to hedgedReadsFanout masters
+  private  void groupCall(CompletableFuture future, int startIndexInclusive,
+Callable callable, Predicate isValidResp, String debug,
+ConcurrentLinkedQueue errors) {
+int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
+AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
+for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+  addListener(call(masterStubs.get(i), callable), (r, e) -> {
+// a simple check to skip all the later operations earlier
+if (future.isDone()) {
+  return;
+}
+if (e == null && !isValidResp.test(r)) {
+  e = badResponse(debug);
+}
+if (e != null) {
+  // make sure when remaining reaches 0 we have all exceptions in the errors queue
+  errors.add(e);
+  if (remaining.decrementAndGet() == 0) {
+if (endIndexExclusive == masterStubs.size()) {
+  // we are done, complete the 

[GitHub] [hbase] Apache9 commented on a change in pull request #1593: HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry …

2020-05-04 Thread GitBox


Apache9 commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419799798



##
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##
@@ -61,53 +65,79 @@
 /**
 * Master based registry implementation. Makes RPCs to the configured master addresses from config
  * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
- *
+ * 
  * It supports hedged reads, which can be enabled by setting
 * {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan
 * out the requests batch is controlled by
 * {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
- *
+ * 
 * TODO: Handle changes to the configuration dynamically without having to restart the client.
  */
 @InterfaceAudience.Private
 public class MasterRegistry implements ConnectionRegistry {
+
+  /** Configuration key that controls the fan out of requests **/
+  public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
+"hbase.client.master_registry.hedged.fanout";
+
+  /** Default value for the fan out of hedged requests. **/
+  public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
+
   private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
 
+  private final int hedgedReadFanOut;
+
   // Configured list of masters to probe the meta information from.
-  private final Set masterServers;
+  private final Set masterAddrs;
+
+  private final List masterStubs;
 
   // RPC client used to talk to the masters.
   private final RpcClient rpcClient;
   private final RpcControllerFactory rpcControllerFactory;
-  private final int rpcTimeoutMs;
-
-  MasterRegistry(Configuration conf) throws UnknownHostException {
-boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
-MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
-Configuration finalConf;
-if (!hedgedReadsEnabled) {
-  // If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of
-  // the configuration so that other places reusing this reference is not affected.
-  finalConf = new Configuration(conf);
-  finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
-} else {
-  finalConf = conf;
+
+  /**
+   * Parses the list of master addresses from the provided configuration. Supported format is comma
+   * separated host[:port] values. If no port number if specified, default master port is assumed.
+   * @param conf Configuration to parse from.
+   */
+  private static Set parseMasterAddrs(Configuration conf) throws UnknownHostException {
+Set masterAddrs = new HashSet<>();
+String configuredMasters = getMasterAddr(conf);
+for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
+  HostAndPort masterHostPort =
+HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
+  masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
 }
-if (conf.get(MASTER_ADDRS_KEY) != null) {
-  finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY));
+Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed");
+return masterAddrs;
+  }
+
+  MasterRegistry(Configuration conf) throws IOException {
+this.hedgedReadFanOut = conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
+  MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
+int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+  conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch

Review comment:
   That is just because we do not have a test where MasterRegistry is enabled and we use the cluster id to select authentication. Most tests in HBase do not need authentication.
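
   Separately from the cluster id question, the address format that parseMasterAddrs accepts in the hunk above (comma separated host[:port] values, with a default port when none is given) can be sketched with plain JDK string handling. This is a minimal sketch under stated assumptions, not the code from the pull request, which relies on Guava's HostAndPort and on ServerName: `MasterAddrParseSketch`, `parse`, and `DEFAULT_PORT` are hypothetical names, the value 16000 is assumed to match HConstants.DEFAULT_MASTER_PORT, skipping empty entries is a choice made for this sketch, and IPv6 literals are not handled.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class MasterAddrParseSketch {

  // Assumed default master port; in HBase the real value comes from HConstants.
  static final int DEFAULT_PORT = 16000;

  // Parses "host[:port],host[:port],..." into normalized "host:port" strings.
  static Set<String> parse(String configured) {
    Set<String> addrs = new LinkedHashSet<>();
    for (String raw : configured.split(",")) {
      String entry = raw.trim();
      if (entry.isEmpty()) {
        continue; // tolerate stray separators (sketch-only behaviour)
      }
      int colon = entry.lastIndexOf(':');
      String host = colon < 0 ? entry : entry.substring(0, colon);
      int port = colon < 0 ? DEFAULT_PORT : Integer.parseInt(entry.substring(colon + 1));
      addrs.add(host + ":" + port);
    }
    if (addrs.isEmpty()) {
      throw new IllegalArgumentException("At least one master address is needed");
    }
    return addrs;
  }
}
```

   For example, parse("m1:16010, m2") yields the two entries m1:16010 and m2:16000, and an input with no usable entry is rejected in the same spirit as the Preconditions check in the diff.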




