[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-27 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r251273506
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -825,6 +843,12 @@ protected void closeTaskManagerConnection(final ResourceID resourceID, final Exc
         slotManager.unregisterTaskManager(workerRegistration.getInstanceID());
 
         workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+        failedContainerSoFar.getAndAdd(1);
+        if (failedContainerSoFar.intValue() >= maximumAllowedTaskManagerFailureCount) {
+            rejectAllPendingSlotRequests(new MaximumFailedTaskManagerExceedingException(
+                String.format("Maximum number of failed container %d "
+                    + "is detected in Resource Manager", failedContainerSoFar.intValue())));
+        }
 
 Review comment:
   Agree. Removed.
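For reference, a minimal stand-alone sketch of the threshold check in the snippet above. It assumes a hypothetical class FailedTaskManagerCounter (not Flink code) and differs from the reviewed snippet in one respect: it compares the value returned by incrementAndGet() instead of calling getAndAdd(1) and then reading intValue() separately, so the check uses the count produced by that particular increment.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not part of Flink) that counts TaskManager failures against a limit. */
class FailedTaskManagerCounter {
    private final AtomicInteger failedSoFar = new AtomicInteger(0);
    private final int maxAllowedFailures;

    FailedTaskManagerCounter(int maxAllowedFailures) {
        this.maxAllowedFailures = maxAllowedFailures;
    }

    /**
     * Records one failure and reports whether the limit has been reached.
     * The comparison uses the value returned by incrementAndGet(), not a second read
     * that could already include increments made concurrently by other threads.
     */
    boolean recordFailureAndCheckLimit() {
        return failedSoFar.incrementAndGet() >= maxAllowedFailures;
    }

    int failuresSoFar() {
        return failedSoFar.get();
    }

    public static void main(String[] args) {
        FailedTaskManagerCounter counter = new FailedTaskManagerCounter(2);
        System.out.println(counter.recordFailureAndCheckLimit()); // false: 1 failure, limit is 2
        System.out.println(counter.recordFailureAndCheckLimit()); // true: caller would reject pending slot requests
        System.out.println(counter.failuresSoFar());              // 2
    }
}
```

Keeping the >= comparison preserves the reviewed behaviour that every failure at or beyond the limit triggers the rejection again, so slot requests registered after the limit was first reached are also rejected on the next failure.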


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-27 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r251273420
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -521,4 +526,57 @@ public void testOnContainerCompleted() throws Exception {
             });
         }};
     }
+
+    /**
+     * Tests that YarnResourceManager rejects all pending slot requests when the maximum number of
+     * failed containers is hit.
+     */
+    @Test
+    public void testOnContainersAllocatedWithFailure() throws Exception {
+        new Context() {{
+            runTest(() -> {
+                CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+                    rmServices.slotManager.registerSlotRequest(
+                        new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+                    return null;
+                });
+
+                // wait for the registerSlotRequest completion
+                registerSlotRequestFuture.get();
+
+                // Callback from YARN when container is allocated.
+                Container disconnectedContainer1 = mockContainer("container1", 1234, 1, resourceManager.getContainerResource());
+
+                doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest())))
+                    .when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class));
+
+                resourceManager.onContainersAllocated(ImmutableList.of(disconnectedContainer1));
+                verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+                verify(mockNMClient).startContainer(eq(disconnectedContainer1), any(ContainerLaunchContext.class));
+
+                ResourceID connectedTM = new ResourceID(disconnectedContainer1.getId().toString());
+
+                resourceManager.registerTaskExecutor("container1", connectedTM, 1234,
+                    hardwareDescription, Time.seconds(10L));
+
+                // force to unregister the task manager
+                resourceManager.disconnectTaskManager(connectedTM, new TimeoutException());
+
+                // request second slot
+                registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+                    rmServices.slotManager.registerSlotRequest(
+                        new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+                    return null;
+                });
+
+                // wait for the registerSlotRequest completion
+                registerSlotRequestFuture.get();
+                Container failedContainer = mockContainer("container2", 2345, 2, resourceManager.getContainerResource());
+                when(mockNMClient.startContainer(eq(failedContainer), any())).thenThrow(new YarnException("Failed"));
+                resourceManager.onContainersAllocated(ImmutableList.of(failedContainer));
+                verify(rmServices.slotManager, times(1))
+                    .rejectAllPendingSlotRequests(any(MaximumFailedTaskManagerExceedingException.class));
 
 Review comment:
   Agree. Changed accordingly.




[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-27 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r251273483
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -301,12 +301,25 @@ public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerEx
         }
 
     /**
-     * Cancels and removes a pending slot request with the given allocation id. If there is no such
-     * pending request, then nothing is done.
-     *
-     * @param allocationId identifying the pending slot request
-     * @return True if a pending slot request was found; otherwise false
+     * Rejects all pending slot requests.
+     * @param cause the exception that caused the rejection
      */
+    public void rejectAllPendingSlotRequests(Exception cause) {
+        for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
+            rejectPendingSlotRequest(pendingSlotRequest, cause);
+        }
+
+        pendingSlotRequests.clear();
+    }
 
 Review comment:
   It can be checked by inspecting pendingSlotRequests. I added that check to the test case.
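A minimal sketch of the reject-then-clear pattern used by rejectAllPendingSlotRequests, together with the kind of check mentioned in the reply (the pending map is empty afterwards). It assumes a hypothetical PendingRequestRegistry that models each pending request as a CompletableFuture keyed by an allocation id string; this is a simplification for illustration, not how Flink's SlotManager represents PendingSlotRequest.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical stand-in (not Flink's SlotManager): each pending request is modelled as a
 * future keyed by an allocation id.
 */
class PendingRequestRegistry {
    private final Map<String, CompletableFuture<String>> pendingRequests = new LinkedHashMap<>();

    CompletableFuture<String> register(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(allocationId, future);
        return future;
    }

    /** Fails every pending request with the same cause, then drops all of them. */
    void rejectAll(Exception cause) {
        for (CompletableFuture<String> future : pendingRequests.values()) {
            future.completeExceptionally(cause);
        }
        // Clear once after the loop instead of removing entries while iterating over values().
        pendingRequests.clear();
    }

    int pendingCount() {
        return pendingRequests.size();
    }

    public static void main(String[] args) {
        PendingRequestRegistry registry = new PendingRequestRegistry();
        CompletableFuture<String> first = registry.register("alloc-1");
        CompletableFuture<String> second = registry.register("alloc-2");

        registry.rejectAll(new IllegalStateException("maximum number of failed TaskManagers reached"));

        System.out.println(registry.pendingCount());                                             // 0
        System.out.println(first.isCompletedExceptionally() && second.isCompletedExceptionally()); // true
    }
}
```

Clearing the map once after the loop keeps the iteration over values() free of concurrent modification, which is the same ordering the reviewed method uses.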




[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-10 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246860798
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -512,4 +517,53 @@ public void testOnContainerCompleted() throws Exception {
             });
         }};
     }
+
+    /**
+     * Tests that YarnResourceManager rejects all pending slot requests when the maximum number of
+     * failed containers is hit.
+     */
+    @Test
+    public void testOnContainersAllocatedWithFailure() throws Exception {
+        new Context() {{
+            runTest(() -> {
+                CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+                    rmServices.slotManager.registerSlotRequest(
+                        new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+                    return null;
+                });
+
+                // wait for the registerSlotRequest completion
+                registerSlotRequestFuture.get();
+
+                // Callback from YARN when container is allocated.
+                Container disconnectedContainer1 = mockContainer("container1", 1234, 1);
+                resourceManager.onContainersAllocated(ImmutableList.of(disconnectedContainer1));
+                verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+                verify(mockNMClient).startContainer(eq(disconnectedContainer1), any(ContainerLaunchContext.class));
+
+                ResourceID connectedTM = new ResourceID(disconnectedContainer1.getId().toString());
+
+                resourceManager.registerTaskExecutor("container1", connectedTM, 1234,
+                    hardwareDescription, Time.seconds(10L));
+
+                // force to unregister the task manager
+                resourceManager.disconnectTaskManager(connectedTM, new TimeoutException());
+
+                // request second slot
+                registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+                    rmServices.slotManager.registerSlotRequest(
+                        new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
 
 Review comment:
   Good suggestion. But it is the SlotPool's PendingRequest that will completeExceptionally, not this slot request instance, right?




[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246652721
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/MaximumFailedContainersException.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.exceptions;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+
+/**
+ * Exception used by the {@link ResourceManager} when it detects that the maximum number of failed containers has been reached.
+ */
+public class MaximumFailedContainersException extends ResourceManagerException {
 
 Review comment:
   Good suggestion.




[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246652065
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -825,6 +837,12 @@ protected void closeTaskManagerConnection(final ResourceID resourceID, final Exc
         slotManager.unregisterTaskManager(workerRegistration.getInstanceID());
 
         workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+        failedContainerSoFar.getAndAdd(1);
+        if (failedContainerSoFar.intValue() >= maxFailedContainers) {
+            rejectAllPendingSlotRequests(new MaximumFailedContainersException(
 
 Review comment:
   No need. Other TMs may still be alive, so the restart strategy can decide whether to fail the whole job. For a per-job cluster, if the job fails, the cluster terminates by itself.




[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246646421
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -145,6 +147,12 @@
     /** All registered listeners for status updates of the ResourceManager. */
     private ConcurrentMap infoMessageListeners;
 
+    /** The number of failed containers since the master became active. */
+    protected AtomicInteger failedContainerSoFar = new AtomicInteger(0);
+
+    /** Number of failed TaskManager containers before stopping the application. Default is Integer.MAX_VALUE. */
+    protected int maxFailedContainers = Integer.MAX_VALUE;
 
 Review comment:
   Agree.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-03 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r245161387
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -381,11 +393,20 @@ public void onContainersAllocated(List containers) {
                 } catch (Throwable t) {
                     log.error("Could not start TaskManager in container {}.", container.getId(), t);
 
+                    failedContainerSoFar.getAndAdd(1);
                     // release the failed container
                     workerNodeMap.remove(resourceId);
                     resourceManagerClient.releaseAssignedContainer(container.getId());
-                    // and ask for a new one
-                    requestYarnContainerIfRequired(container.getResource(), container.getPriority());
+
+                    if (failedContainerSoFar.intValue() < maxFailedContainers) {
 
 Review comment:
   Yes, you are right. I will change this as well.
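A hypothetical sketch (the class and method names are illustrative, not Flink's) of the start-failure branch under review: the container that failed to start is always released, and a replacement is requested only while the failure count is still below maxFailedContainers. The release and request actions are passed in as callbacks so the sketch runs without YARN.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical sketch (not Flink code) of the container start-failure branch. */
class ContainerStartFailureHandler {
    private final AtomicInteger failedContainersSoFar = new AtomicInteger(0);
    private final int maxFailedContainers;

    ContainerStartFailureHandler(int maxFailedContainers) {
        this.maxFailedContainers = maxFailedContainers;
    }

    /** Returns true if a replacement container was requested. */
    boolean onStartFailure(String containerId, Consumer<String> releaseContainer, Runnable requestReplacement) {
        int failures = failedContainersSoFar.incrementAndGet();
        // Release the container that could not be started, regardless of the count.
        releaseContainer.accept(containerId);
        if (failures < maxFailedContainers) {
            // Still under the limit: ask for a new container.
            requestReplacement.run();
            return true;
        }
        // Limit reached: stop asking for replacements.
        return false;
    }

    public static void main(String[] args) {
        ContainerStartFailureHandler handler = new ContainerStartFailureHandler(2);
        List<String> released = new ArrayList<>();
        int[] replacementsRequested = {0};
        Runnable requestReplacement = () -> replacementsRequested[0]++;

        System.out.println(handler.onStartFailure("container1", released::add, requestReplacement)); // true
        System.out.println(handler.onStartFailure("container2", released::add, requestReplacement)); // false
        System.out.println(released + " " + replacementsRequested[0]); // [container1, container2] 1
    }
}
```

Releasing unconditionally avoids holding on to a container that never started, while the limit only gates whether more containers are asked for.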


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-03 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r245161384
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -145,6 +146,9 @@
     /** All registered listeners for status updates of the ResourceManager. */
     private ConcurrentMap infoMessageListeners;
 
+    /** The number of failed containers since the master became active. */
+    protected AtomicInteger failedContainerSoFar = new AtomicInteger(0);
 
 Review comment:
   There are three code paths that can produce a failed container. The first is when a container cannot be started in YARN, which is reported through the NMClient callback; the other two are a failed registerTaskManager and disconnectTaskManager (when tasks fail). So the concurrency is between the Akka thread and the main thread?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services