[ 
https://issues.apache.org/jira/browse/YARN-11238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607305#comment-17607305
 ] 

ASF GitHub Bot commented on YARN-11238:
---------------------------------------

goiri commented on code in PR #4904:
URL: https://github.com/apache/hadoop/pull/4904#discussion_r975598672


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -773,24 +761,19 @@ public GetClusterNodesResponse 
getClusterNodes(GetClusterNodesRequest request)
       RouterServerUtil.logAndThrowException("Missing getClusterNodes 
request.", null);
     }
     long startTime = clock.getTime();
-    Map<SubClusterId, SubClusterInfo> subClusters =
-        federationFacade.getSubClusters(true);
-    Map<SubClusterId, GetClusterNodesResponse> clusterNodes = 
Maps.newHashMap();
-    for (SubClusterId subClusterId : subClusters.keySet()) {
-      ApplicationClientProtocol client;
-      try {
-        client = getClientRMProxyForSubCluster(subClusterId);
-        GetClusterNodesResponse response = client.getClusterNodes(request);
-        clusterNodes.put(subClusterId, response);
-      } catch (Exception ex) {
-        routerMetrics.incrClusterNodesFailedRetrieved();
-        RouterServerUtil.logAndThrowException("Unable to get cluster nodes due 
to exception.", ex);
-      }
+    ClientMethod remoteMethod = new ClientMethod("getClusterNodes",
+        new Class[]{GetClusterNodesRequest.class}, new Object[]{request});
+    Collection<GetClusterNodesResponse> clusterNodes = null;
+    try {
+      clusterNodes = invokeConcurrent(remoteMethod, 
GetClusterNodesResponse.class);

Review Comment:
   Can we define and return inside the try?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java:
##########
@@ -728,6 +729,18 @@ public FederationStateStore getStateStore() {
   }
 
   /**
+   * Get Active's SubClusterIds{@link SubClusterId}.
+   *
+   * @return SubClusterId Collection.
+   * @throws YarnException if the call to get active subClusterIds is 
unsuccessful
+   */
+  public Collection<SubClusterId> getActiveSubClusterIds() throws 
YarnException {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters =
+        getSubClusters(true);

Review Comment:
   Can we put the true in a variable to make it easier to understand what the 
true represents?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -702,67 +698,59 @@ public GetClusterMetricsResponse getClusterMetrics(
     return RouterYarnClientUtils.merge(clusterMetrics);
   }
 
-  <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
-      ClientMethod request, Class<R> clazz) throws YarnException, IOException {
-    List<Callable<Object>> callables = new ArrayList<>();
-    List<Future<Object>> futures = new ArrayList<>();
-    Map<SubClusterId, IOException> exceptions = new TreeMap<>();
-    for (SubClusterId subClusterId : clusterIds) {
-      callables.add(new Callable<Object>() {
-        @Override
-        public Object call() throws Exception {
-          ApplicationClientProtocol protocol =
-              getClientRMProxyForSubCluster(subClusterId);
-          Method method = ApplicationClientProtocol.class
-              .getMethod(request.getMethodName(), request.getTypes());
-          return method.invoke(protocol, request.getParams());
-        }
+  <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
+      throws YarnException {
+
+    Collection<SubClusterId> subClusterIds = 
federationFacade.getActiveSubClusterIds();
+
+    List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+    List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+    Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+    // Generate parallel Callable tasks
+    for (SubClusterId subClusterId : subClusterIds) {
+      callables.add(() -> {
+        ApplicationClientProtocol protocol =

Review Comment:
   Single line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java:
##########
@@ -728,6 +729,18 @@ public FederationStateStore getStateStore() {
   }
 
   /**
+   * Get Active's SubClusterIds{@link SubClusterId}.
+   *
+   * @return SubClusterId Collection.
+   * @throws YarnException if the call to get active subClusterIds is 
unsuccessful
+   */
+  public Collection<SubClusterId> getActiveSubClusterIds() throws 
YarnException {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters =

Review Comment:
   Single line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -702,67 +698,59 @@ public GetClusterMetricsResponse getClusterMetrics(
     return RouterYarnClientUtils.merge(clusterMetrics);
   }
 
-  <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
-      ClientMethod request, Class<R> clazz) throws YarnException, IOException {
-    List<Callable<Object>> callables = new ArrayList<>();
-    List<Future<Object>> futures = new ArrayList<>();
-    Map<SubClusterId, IOException> exceptions = new TreeMap<>();
-    for (SubClusterId subClusterId : clusterIds) {
-      callables.add(new Callable<Object>() {
-        @Override
-        public Object call() throws Exception {
-          ApplicationClientProtocol protocol =
-              getClientRMProxyForSubCluster(subClusterId);
-          Method method = ApplicationClientProtocol.class
-              .getMethod(request.getMethodName(), request.getTypes());
-          return method.invoke(protocol, request.getParams());
-        }
+  <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
+      throws YarnException {
+
+    Collection<SubClusterId> subClusterIds = 
federationFacade.getActiveSubClusterIds();
+
+    List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+    List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+    Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+    // Generate parallel Callable tasks
+    for (SubClusterId subClusterId : subClusterIds) {
+      callables.add(() -> {
+        ApplicationClientProtocol protocol =
+            getClientRMProxyForSubCluster(subClusterId);
+        Method method = ApplicationClientProtocol.class

Review Comment:
   Single line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -582,9 +583,9 @@ public void testGetClusterMetricsRequest() throws Exception 
{
     ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
         new Class[] {GetClusterMetricsRequest.class},
         new Object[] {GetClusterMetricsRequest.newInstance()});
-    Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
-        invokeConcurrent(new ArrayList<>(), remoteMethod, 
GetClusterMetricsResponse.class);
-    Assert.assertTrue(clusterMetrics.isEmpty());
+    Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.
+        invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
+    Assert.assertTrue(!clusterMetrics.isEmpty());

Review Comment:
   Now the expectation is the opposite? What has changed?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -582,9 +583,9 @@ public void testGetClusterMetricsRequest() throws Exception 
{
     ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
         new Class[] {GetClusterMetricsRequest.class},
         new Object[] {GetClusterMetricsRequest.newInstance()});
-    Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
-        invokeConcurrent(new ArrayList<>(), remoteMethod, 
GetClusterMetricsResponse.class);
-    Assert.assertTrue(clusterMetrics.isEmpty());
+    Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.
+        invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
+    Assert.assertTrue(!clusterMetrics.isEmpty());

Review Comment:
   assertFalse





> Optimizing FederationClientInterceptor Call with Parallelism
> ------------------------------------------------------------
>
>                 Key: YARN-11238
>                 URL: https://issues.apache.org/jira/browse/YARN-11238
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: federation
>    Affects Versions: 3.4.0, 3.3.4
>            Reporter: fanshilun
>            Assignee: fanshilun
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to