[
https://issues.apache.org/jira/browse/YARN-11238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607305#comment-17607305
]
ASF GitHub Bot commented on YARN-11238:
---------------------------------------
goiri commented on code in PR #4904:
URL: https://github.com/apache/hadoop/pull/4904#discussion_r975598672
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -773,24 +761,19 @@ public GetClusterNodesResponse
getClusterNodes(GetClusterNodesRequest request)
RouterServerUtil.logAndThrowException("Missing getClusterNodes
request.", null);
}
long startTime = clock.getTime();
- Map<SubClusterId, SubClusterInfo> subClusters =
- federationFacade.getSubClusters(true);
- Map<SubClusterId, GetClusterNodesResponse> clusterNodes =
Maps.newHashMap();
- for (SubClusterId subClusterId : subClusters.keySet()) {
- ApplicationClientProtocol client;
- try {
- client = getClientRMProxyForSubCluster(subClusterId);
- GetClusterNodesResponse response = client.getClusterNodes(request);
- clusterNodes.put(subClusterId, response);
- } catch (Exception ex) {
- routerMetrics.incrClusterNodesFailedRetrieved();
- RouterServerUtil.logAndThrowException("Unable to get cluster nodes due
to exception.", ex);
- }
+ ClientMethod remoteMethod = new ClientMethod("getClusterNodes",
+ new Class[]{GetClusterNodesRequest.class}, new Object[]{request});
+ Collection<GetClusterNodesResponse> clusterNodes = null;
+ try {
+ clusterNodes = invokeConcurrent(remoteMethod,
GetClusterNodesResponse.class);
Review Comment:
Can we declare `clusterNodes` and return inside the try block?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java:
##########
@@ -728,6 +729,18 @@ public FederationStateStore getStateStore() {
}
/**
+ * Get Active's SubClusterIds{@link SubClusterId}.
+ *
+ * @return SubClusterId Collection.
+ * @throws YarnException if the call to get active subClusterIds is
unsuccessful
+ */
+ public Collection<SubClusterId> getActiveSubClusterIds() throws
YarnException {
+ Map<SubClusterId, SubClusterInfo> activeSubClusters =
+ getSubClusters(true);
Review Comment:
Can we put the `true` in a named variable to make it easier to understand
what the flag represents?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -702,67 +698,59 @@ public GetClusterMetricsResponse getClusterMetrics(
return RouterYarnClientUtils.merge(clusterMetrics);
}
- <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
- ClientMethod request, Class<R> clazz) throws YarnException, IOException {
- List<Callable<Object>> callables = new ArrayList<>();
- List<Future<Object>> futures = new ArrayList<>();
- Map<SubClusterId, IOException> exceptions = new TreeMap<>();
- for (SubClusterId subClusterId : clusterIds) {
- callables.add(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- ApplicationClientProtocol protocol =
- getClientRMProxyForSubCluster(subClusterId);
- Method method = ApplicationClientProtocol.class
- .getMethod(request.getMethodName(), request.getTypes());
- return method.invoke(protocol, request.getParams());
- }
+ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
+ throws YarnException {
+
+ Collection<SubClusterId> subClusterIds =
federationFacade.getActiveSubClusterIds();
+
+ List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+ List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+ Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+ // Generate parallel Callable tasks
+ for (SubClusterId subClusterId : subClusterIds) {
+ callables.add(() -> {
+ ApplicationClientProtocol protocol =
Review Comment:
This statement can fit on a single line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java:
##########
@@ -728,6 +729,18 @@ public FederationStateStore getStateStore() {
}
/**
+ * Get Active's SubClusterIds{@link SubClusterId}.
+ *
+ * @return SubClusterId Collection.
+ * @throws YarnException if the call to get active subClusterIds is
unsuccessful
+ */
+ public Collection<SubClusterId> getActiveSubClusterIds() throws
YarnException {
+ Map<SubClusterId, SubClusterInfo> activeSubClusters =
Review Comment:
This statement can fit on a single line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -702,67 +698,59 @@ public GetClusterMetricsResponse getClusterMetrics(
return RouterYarnClientUtils.merge(clusterMetrics);
}
- <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
- ClientMethod request, Class<R> clazz) throws YarnException, IOException {
- List<Callable<Object>> callables = new ArrayList<>();
- List<Future<Object>> futures = new ArrayList<>();
- Map<SubClusterId, IOException> exceptions = new TreeMap<>();
- for (SubClusterId subClusterId : clusterIds) {
- callables.add(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- ApplicationClientProtocol protocol =
- getClientRMProxyForSubCluster(subClusterId);
- Method method = ApplicationClientProtocol.class
- .getMethod(request.getMethodName(), request.getTypes());
- return method.invoke(protocol, request.getParams());
- }
+ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
+ throws YarnException {
+
+ Collection<SubClusterId> subClusterIds =
federationFacade.getActiveSubClusterIds();
+
+ List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+ List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+ Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+ // Generate parallel Callable tasks
+ for (SubClusterId subClusterId : subClusterIds) {
+ callables.add(() -> {
+ ApplicationClientProtocol protocol =
+ getClientRMProxyForSubCluster(subClusterId);
+ Method method = ApplicationClientProtocol.class
Review Comment:
This statement can fit on a single line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -582,9 +583,9 @@ public void testGetClusterMetricsRequest() throws Exception
{
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class},
new Object[] {GetClusterMetricsRequest.newInstance()});
- Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
- invokeConcurrent(new ArrayList<>(), remoteMethod,
GetClusterMetricsResponse.class);
- Assert.assertTrue(clusterMetrics.isEmpty());
+ Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.
+ invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
+ Assert.assertTrue(!clusterMetrics.isEmpty());
Review Comment:
Now the expectation is the opposite? What has changed?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -582,9 +583,9 @@ public void testGetClusterMetricsRequest() throws Exception
{
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class},
new Object[] {GetClusterMetricsRequest.newInstance()});
- Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
- invokeConcurrent(new ArrayList<>(), remoteMethod,
GetClusterMetricsResponse.class);
- Assert.assertTrue(clusterMetrics.isEmpty());
+ Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.
+ invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
+ Assert.assertTrue(!clusterMetrics.isEmpty());
Review Comment:
Use `Assert.assertFalse(clusterMetrics.isEmpty())` instead of `assertTrue(!...)`.
> Optimizing FederationClientInterceptor Call with Parallelism
> ------------------------------------------------------------
>
> Key: YARN-11238
> URL: https://issues.apache.org/jira/browse/YARN-11238
> Project: Hadoop YARN
> Issue Type: Sub-task
> Components: federation
> Affects Versions: 3.4.0, 3.3.4
> Reporter: fanshilun
> Assignee: fanshilun
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]