[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r409483913
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
 ##
 @@ -100,8 +100,10 @@ private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
 
-   // we have to use the maximum parallelism as a default here, otherwise streaming pipelines would not run
-   int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, maximumParallelism);
+   int numSlotsPerTaskManager = configuration.getOptional(TaskManagerOptions.NUM_TASK_SLOTS)
+   .orElseGet(() -> maximumParallelism > 0 ?
+   (int) Math.ceil((double) maximumParallelism / numTaskManagers) :
 
 Review comment:
   Can't we use `MathUtils.divideRoundUp` here?
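For context, the `(int) Math.ceil((double) maximumParallelism / numTaskManagers)` expression in the diff is a round-up integer division. A minimal sketch of what such a helper computes with integer arithmetic only, assuming a non-negative dividend and a positive divisor; `CeilDivision.divideRoundUp` is a hypothetical stand-in, not Flink's `MathUtils` implementation:

```java
/** Hypothetical stand-in for a round-up integer division helper. */
final class CeilDivision {

    private CeilDivision() {}

    /**
     * Returns ceil(dividend / divisor) without floating point.
     * Assumes dividend >= 0 and divisor > 0.
     */
    static int divideRoundUp(int dividend, int divisor) {
        if (dividend < 0 || divisor <= 0) {
            throw new IllegalArgumentException("expected dividend >= 0 and divisor > 0");
        }
        // Quotient plus one if there is a remainder; unlike (a + b - 1) / b this cannot overflow.
        return dividend / divisor + (dividend % divisor == 0 ? 0 : 1);
    }

    public static void main(String[] args) {
        // e.g. maximumParallelism = 10 spread over numTaskManagers = 3
        System.out.println(divideRoundUp(10, 3)); // 4
    }
}
```

With a maximum parallelism of 10 and 3 TaskManagers, each TaskManager would get 4 slots (12 in total), so all 10 subtasks fit.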


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-15 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408972482
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 * Allocates a resource using the resource profile.
 *
 * @param resourceProfile The resource description
-* @return Collection of {@link ResourceProfile} describing the launched slots
+* @return whether the resource can be allocated
 */
@VisibleForTesting
-   public abstract Collection startNewWorker(ResourceProfile resourceProfile);
+   public abstract boolean startNewWorker(ResourceProfile resourceProfile);
 
 Review comment:
   Sounds good to me. Thanks for the background information.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-15 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408964659
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -199,6 +200,14 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
}
 
@Override
+   public Map getPendingWorkerNums() {
+   final int pendingWorkerNum = (int) Math.ceil((double) pendingSlots.size() / numSlotsPerWorker);
 
 Review comment:
   On the other hand, it complicates the expression a bit. Maybe we could 
introduce a name for this function, e.g. 
`multipleOfAndLargerOrEqualThan(numSlotsPerWorker, pendingSlots.size())`.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-15 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408964926
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -524,10 +524,9 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
 * Request new container if pending containers cannot satisfy pending slot requests.
 */
private void requestYarnContainerIfRequired() {
-   int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-   int pendingTaskManagerSlots = numPendingContainerRequests * numSlotsPerTaskManager;
+   int requiredTaskManagers = getNumberRequiredTaskManagers();
 
 Review comment:
   ```suggestion
final int requiredTaskManagers = getNumberRequiredTaskManagers();
   ```




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-15 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408962507
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -199,6 +200,14 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
}
 
@Override
+   public Map getPendingWorkerNums() {
+   final int pendingWorkerNum = (int) Math.ceil((double) pendingSlots.size() / numSlotsPerWorker);
 
 Review comment:
   If we want to get rid of the back-and-forth casting and `double` calculations:
   ```suggestion
final int pendingWorkerNum = (pendingSlots.size() - 1) / numSlotsPerWorker + 1;
   ```
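One edge case worth checking with the suggested integer form: Java integer division truncates toward zero, so for `pendingSlots.size() == 0` and `numSlotsPerWorker > 1` the expression `(0 - 1) / numSlotsPerWorker + 1` evaluates to `1`, whereas the `Math.ceil` version yields `0`. A small sketch comparing the variants; the class and method names are illustrative only:

```java
/** Illustrative comparison of round-up formulas for the pending worker count. */
final class PendingWorkerCount {

    private PendingWorkerCount() {}

    // Floating-point version from the diff.
    static int viaCeil(int pendingSlots, int numSlotsPerWorker) {
        return (int) Math.ceil((double) pendingSlots / numSlotsPerWorker);
    }

    // Integer-only version from the review suggestion; returns 1 for pendingSlots == 0
    // whenever numSlotsPerWorker > 1, because -1 / n truncates to 0.
    static int viaSuggestion(int pendingSlots, int numSlotsPerWorker) {
        return (pendingSlots - 1) / numSlotsPerWorker + 1;
    }

    // Integer-only version that also handles pendingSlots == 0.
    static int viaGuardedDivision(int pendingSlots, int numSlotsPerWorker) {
        return pendingSlots == 0 ? 0 : (pendingSlots - 1) / numSlotsPerWorker + 1;
    }

    public static void main(String[] args) {
        for (int slots = 0; slots <= 5; slots++) {
            System.out.printf("slots=%d ceil=%d suggestion=%d guarded=%d%n",
                    slots, viaCeil(slots, 4), viaSuggestion(slots, 4), viaGuardedDivision(slots, 4));
        }
    }
}
```

Since a later revision of `getPendingWorkerNums` only reports workers when `pendingWorkerNum > 0`, the zero case matters for this method.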




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-15 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408965071
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -524,10 +524,9 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
 * Request new container if pending containers cannot satisfy pending slot requests.
 */
private void requestYarnContainerIfRequired() {
-   int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-   int pendingTaskManagerSlots = numPendingContainerRequests * numSlotsPerTaskManager;
+   int requiredTaskManagers = getNumberRequiredTaskManagers();
 
-   if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+   while (requiredTaskManagers-- > numPendingContainerRequests) {
 
 Review comment:
   ```suggestion
while (requiredTaskManagers > numPendingContainerRequests) {
   ```




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-15 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408959570
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -267,10 +267,9 @@ private void requestKubernetesPod() {
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-   final int pendingTaskManagerSlots = numPendingPodRequests * numSlotsPerTaskManager;
+   int requiredTaskManagers = getNumberRequiredTaskManagers();
 
 Review comment:
   ```suggestion
final int requiredTaskManagers = getNumberRequiredTaskManagers();
   ```




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-15 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408959505
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -267,10 +267,9 @@ private void requestKubernetesPod() {
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-   final int pendingTaskManagerSlots = numPendingPodRequests * numSlotsPerTaskManager;
+   int requiredTaskManagers = getNumberRequiredTaskManagers();
 
-   if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+   while (requiredTaskManagers-- > numPendingPodRequests) {
 
 Review comment:
   ```suggestion
while (requiredTaskManagers > numPendingPodRequests) {
   ```
   I think `requestKubernetesPod` will increase `numPendingPodRequests`.
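To spell out why the post-decrement is a problem: if `requestKubernetesPod` increments `numPendingPodRequests` (as the comment assumes), each loop iteration closes the gap between required and pending pods by two instead of one, so only about half of the missing pods get requested. A self-contained simulation, where `requestPod` is a hypothetical stand-in that only bumps the pending counter:

```java
/** Simulates the two loop variants; requestPod() is a hypothetical stand-in. */
final class PodRequestLoop {

    private int numPendingPodRequests;
    private int podsRequested;

    PodRequestLoop(int initiallyPending) {
        this.numPendingPodRequests = initiallyPending;
    }

    // Assumed behavior of requestKubernetesPod(): increments the pending counter.
    private void requestPod() {
        numPendingPodRequests++;
        podsRequested++;
    }

    // Loop as written in the diff: decrements the target AND increments pending.
    int requestWithPostDecrement(int requiredTaskManagers) {
        while (requiredTaskManagers-- > numPendingPodRequests) {
            requestPod();
        }
        return podsRequested;
    }

    // Loop as suggested in the review: only the pending counter moves.
    int requestWithSuggestion(int requiredTaskManagers) {
        while (requiredTaskManagers > numPendingPodRequests) {
            requestPod();
        }
        return podsRequested;
    }

    public static void main(String[] args) {
        System.out.println(new PodRequestLoop(0).requestWithPostDecrement(4)); // 2
        System.out.println(new PodRequestLoop(0).requestWithSuggestion(4)); // 4
    }
}
```

The YARN loop over `numPendingContainerRequests` in this PR has the same shape.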




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-15 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408960934
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -51,7 +53,12 @@
 
int getNumberFreeSlotsOf(InstanceID instanceId);
 
-   int getNumberPendingTaskManagerSlots();
+   /**
+* Get number of workers SlotManager requested from {@link ResourceActions} that are not yet fulfilled.
+* @return a map whose key set is all the unique resource specs of the pending workers,
+* and the corresponding value is number of pending workers of that resource spec.
+*/
+   Map getPendingWorkerNums();
 
 Review comment:
   ```suggestion
Map getRequiredResources();
   ```
   maybe?




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-15 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408960456
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1206,8 +1206,12 @@ public Void retrievePayload(ResourceID resourceID) {
//  Resource Management
// 

 
-   protected int getNumberRequiredTaskManagerSlots() {
-   return slotManager.getNumberPendingTaskManagerSlots();
+   protected int getNumberRequiredTaskManagers() {
+   return getPendingWorkerNums().values().stream().reduce(0, Integer::sum);
+   }
+
+   protected Map getPendingWorkerNums() {
+   return slotManager.getPendingWorkerNums();
 
 Review comment:
   Should we rename this method into `getRequiredResources` or so?




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-07 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404630795
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 * Allocates a resource using the resource profile.
 *
 * @param resourceProfile The resource description
-* @return Collection of {@link ResourceProfile} describing the launched slots
+* @return whether the resource can be allocated
 */
@VisibleForTesting
-   public abstract Collection startNewWorker(ResourceProfile resourceProfile);
+   public abstract boolean startNewWorker(ResourceProfile resourceProfile);
 
 Review comment:
   Hmm, but with this API we will waste resources because the caller thinks it 
has started a worker of the specified size, right? One way to improve the 
situation would be to let the `startNewWorker` method return the size of the 
container it has actually started. This would allow the caller to react to it.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404085816
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1206,8 +1206,12 @@ public Void retrievePayload(ResourceID resourceID) {
//  Resource Management
// 

 
-   protected int getNumberRequiredTaskManagerSlots() {
-   return slotManager.getNumberPendingTaskManagerSlots();
+   protected int getNumberRequiredTaskManagers() {
+   return getPendingWorkerNums().values().stream().reduce(0, Integer::sum);
 
 Review comment:
   If this is the entry point, then we can do the check there. The important 
bit is to maintain the invariant that we do not start containers for 
`WorkerResourceSpecs` which are larger than the container.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404084546
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 ##
 @@ -42,9 +40,9 @@
 * Requests to allocate a resource with the given {@link ResourceProfile}.
 *
 * @param resourceProfile for the to be allocated resource
-* @return Collection of {@link ResourceProfile} describing the allocated slots
+* @return whether the resource can be allocated
 */
-   Collection allocateResource(ResourceProfile resourceProfile);
+   boolean allocateResource(ResourceProfile resourceProfile);
 
 Review comment:
   All right, understood.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404083978
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -807,23 +811,32 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
return false;
}
 
-   private Optional allocateResource(ResourceProfile resourceProfile) {
-   final Collection requestedSlots = resourceActions.allocateResource(resourceProfile);
+   private Optional allocateResource(ResourceProfile requestedSlotResourceProfile) {
+   if (defaultWorkerResourceSpec == null) {
+   // standalone mode, cannot allocate resource
+   return Optional.empty();
+   }
 
-   if (requestedSlots.isEmpty()) {
+   if (!Preconditions.checkNotNull(defaultSlotResourceProfile,
+   "defaultSlotResourceProfile should be null iff taskExecutorProcessSpec is null, which means standalone mode.")
+   .isMatching(requestedSlotResourceProfile)) {
+   // requested resource profile is unfulfillable
return Optional.empty();
-   } else {
-   final Iterator slotIterator = requestedSlots.iterator();
-   final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next());
-   pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
+   }
 
 Review comment:
   Failing in case the requested slot profile is too large sounds good to 
me.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404082884
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 * Allocates a resource using the resource profile.
 *
 * @param resourceProfile The resource description
-* @return Collection of {@link ResourceProfile} describing the launched slots
+* @return whether the resource can be allocated
 */
@VisibleForTesting
-   public abstract Collection startNewWorker(ResourceProfile resourceProfile);
+   public abstract boolean startNewWorker(ResourceProfile resourceProfile);
 
 Review comment:
   OK, if I understand it correctly, then this PR relies on the fact that the 
`RM` and the `SlotManager` both get the same (implicitly or explicitly) 
`WorkerResourceSpec` assigned, right?
   
   I think it can still happen that the RM won't be able to start a container 
of exactly the size `WorkerResourceSpec`, even after later changes. Imagine 
that we are on Yarn where the requested `WorkerResourceSpec` is below the 
minimum. How should Flink behave in this situation? Should we fail? Should we 
leave some of the resources unused? Or should we tell the `SlotManager` about 
it so that it can take the additional resources into account?
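To make the third option concrete, a sketch under a simplifying assumption: the cluster rounds every container request up to a multiple of a minimum allocation (roughly how YARN schedulers normalize requests, though the exact rule depends on the scheduler and its configuration). If the allocation call reports the size it actually got, the caller can see how much memory would otherwise go unused. `ContainerSizing`, `allocate`, and the memory-only model are hypothetical:

```java
/** Hypothetical memory-only model of container normalization. */
final class ContainerSizing {

    private ContainerSizing() {}

    /** Rounds a positive request up to a multiple of minAllocationMb (at least one multiple). */
    static int allocate(int requestedMb, int minAllocationMb) {
        int multiples = Math.max(1, (requestedMb + minAllocationMb - 1) / minAllocationMb);
        return multiples * minAllocationMb;
    }

    /** Memory the caller received but did not ask for. */
    static int unusedMb(int requestedMb, int minAllocationMb) {
        return allocate(requestedMb, minAllocationMb) - requestedMb;
    }

    public static void main(String[] args) {
        // A 700 MB worker on a cluster with a 1024 MB minimum allocation.
        System.out.println(allocate(700, 1024)); // 1024
        System.out.println(unusedMb(700, 1024)); // 324
    }
}
```

With a boolean return value the `SlotManager` only learns that a worker was started; with the actual size it could account for the extra 324 MB.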




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404080033
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
 ##
 @@ -81,4 +83,14 @@ private static Configuration createActiveResourceManagerConfiguration(Configurat
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
+
+   protected WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {
+   final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils
+   .newProcessSpecBuilder(configuration)
+   .withCpuCores(getDefaultCpus(configuration))
+   .build();
+   return WorkerResourceSpec.fromTaskExecutorProcessSpec(taskExecutorProcessSpec);
+   }
+
+   protected abstract CPUResource getDefaultCpus(Configuration configuration);
 
 Review comment:
   My suggestion would be to pass it to the `ActiveResourceManagerFactory` when 
creating it.
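One possible reading of this suggestion, sketched: instead of an abstract `getDefaultCpus` that every subclass overrides, the factory receives the CPU lookup as a constructor argument. All names are hypothetical; the configuration is modeled as a plain `Map<String, String>`, and the worker spec is reduced to its CPU value:

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.ToDoubleFunction;

/** Hypothetical factory that gets its default-CPU lookup injected at construction. */
final class ResourceManagerFactorySketch {

    // How a deployment (e.g. YARN or Kubernetes) derives the default CPUs from its config.
    private final ToDoubleFunction<Map<String, String>> defaultCpus;

    ResourceManagerFactorySketch(ToDoubleFunction<Map<String, String>> defaultCpus) {
        this.defaultCpus = Objects.requireNonNull(defaultCpus);
    }

    /** Stand-in for createDefaultWorkerResourceSpec: only the CPU part is modeled. */
    double createDefaultWorkerCpus(Map<String, String> configuration) {
        return defaultCpus.applyAsDouble(configuration);
    }
}
```

A deployment would then construct it as, e.g., `new ResourceManagerFactorySketch(conf -> Double.parseDouble(conf.getOrDefault("cpu.cores", "1")))`, where `cpu.cores` is a made-up key.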




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403037228
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -153,6 +155,8 @@ public SlotManagerImpl(
this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
this.numSlotsPerWorker = numSlotsPerWorker;
+   this.defaultSlotResourceProfile = defaultWorkerResourceSpec != null ?
+   generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker) : null;
 
 Review comment:
   Instead of setting `defaultSlotResourceProfile` to `null`, can't we set it 
to `ResourceProfile.UNKNOWN` or something like this?
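The idea behind this suggestion is a null-object sentinel: the field always holds a value, and a dedicated "unknown" instance simply matches nothing, so callers need no null checks. A sketch with a hypothetical `SlotProfile` class (not Flink's `ResourceProfile`, whose `UNKNOWN` semantics may differ):

```java
/** Hypothetical slot profile with a sentinel instead of null. */
final class SlotProfile {

    /** Sentinel used when no default worker spec exists (e.g. standalone mode). */
    static final SlotProfile UNKNOWN = new SlotProfile(-1.0, -1);

    final double cpuCores;
    final int memoryMb;

    SlotProfile(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Whether a slot of this profile can serve the requested profile. */
    boolean isMatching(SlotProfile requested) {
        if (this == UNKNOWN) {
            // Nothing is known about this slot, so it cannot promise anything.
            return false;
        }
        return requested.cpuCores <= cpuCores && requested.memoryMb <= memoryMb;
    }
}
```

The constructor would then assign the sentinel instead of `null`, and the check in `allocateResource` could call `defaultSlotResourceProfile.isMatching(...)` directly, without the `Preconditions.checkNotNull`.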




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403037763
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -807,23 +811,32 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
return false;
}
 
-   private Optional allocateResource(ResourceProfile resourceProfile) {
-   final Collection requestedSlots = resourceActions.allocateResource(resourceProfile);
+   private Optional allocateResource(ResourceProfile requestedSlotResourceProfile) {
+   if (defaultWorkerResourceSpec == null) {
+   // standalone mode, cannot allocate resource
+   return Optional.empty();
+   }
 
 Review comment:
   I think here we are leaking details from the enclosing component into the 
`SlotManager`. I think the `SlotManager` should not decide for the 
`ResourceManager` whether it can allocate new resources or not.
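One way to address this is to let the `SlotManager` always ask and let the `ResourceManager` side answer through the boolean that `ResourceActions#allocateResource` returns in this PR. The interface below is a simplified, hypothetical stand-in: it takes no resource profile, the two implementations are invented for illustration, and a `String` slot id replaces `PendingTaskManagerSlot`:

```java
import java.util.Optional;

/** Hypothetical sketch: the RM side decides, the SlotManager side only asks. */
final class AllocationDelegation {

    private AllocationDelegation() {}

    /** Simplified stand-in for ResourceActions. */
    interface ResourceActions {
        boolean allocateResource();
    }

    /** A standalone RM cannot start workers; an active RM (YARN/K8s) can. */
    static final ResourceActions STANDALONE = () -> false;
    static final ResourceActions ACTIVE = () -> true;

    /** SlotManager side: no deployment-mode check, it just delegates. */
    static Optional<String> allocatePendingSlot(ResourceActions actions, String slotId) {
        if (!actions.allocateResource()) {
            return Optional.empty();
        }
        return Optional.of(slotId);
    }
}
```

With this split, `defaultWorkerResourceSpec == null` no longer has to be interpreted as "standalone mode" inside the `SlotManager`.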




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403060510
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1051,13 +1050,13 @@ protected abstract void internalDeregisterApplication(
@Nullable String optionalDiagnostics) throws ResourceManagerException;
 
/**
-* Allocates a resource using the resource profile.
+* Allocates a resource using the worker resource specification.
 *
-* @param resourceProfile The resource description
+* @param workerResourceSpec that describes the to be allocated worker.
 
 Review comment:
   ```suggestion
 * @param workerResourceSpec that describes the to be allocated worker
   ```
   
   `@param`, `@return`, and `@throws` tags are usually not terminated by a period.




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403048662
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
 ##
 @@ -52,6 +53,9 @@
  */
 public class SlotManagerFailUnfulfillableTest extends TestLogger {
 
+   private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = new WorkerResourceSpec(
+   100.0, 1, 1, 1, 1);
 
 Review comment:
   Looking at this line, I would recommend introducing a builder for the 
`WorkerResourceSpec` and making the default constructor private.
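A sketch of the suggested builder. The field names (CPU cores plus task heap, task off-heap, network, and managed memory in MB) are an assumption about what the five arguments in `new WorkerResourceSpec(100.0, 1, 1, 1, 1)` stand for; `WorkerResourceSpecSketch` is a hypothetical simplification, not Flink's type:

```java
/** Hypothetical worker spec whose constructor is private; use the builder. */
final class WorkerResourceSpecSketch {

    final double cpuCores;
    final int taskHeapMb;
    final int taskOffHeapMb;
    final int networkMb;
    final int managedMb;

    // Private: callers must go through the builder and name every value they set.
    private WorkerResourceSpecSketch(Builder builder) {
        this.cpuCores = builder.cpuCores;
        this.taskHeapMb = builder.taskHeapMb;
        this.taskOffHeapMb = builder.taskOffHeapMb;
        this.networkMb = builder.networkMb;
        this.managedMb = builder.managedMb;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Unset values default to 0.
        private double cpuCores;
        private int taskHeapMb;
        private int taskOffHeapMb;
        private int networkMb;
        private int managedMb;

        private Builder() {}

        Builder setCpuCores(double cpuCores) {
            this.cpuCores = cpuCores;
            return this;
        }

        Builder setTaskHeapMb(int taskHeapMb) {
            this.taskHeapMb = taskHeapMb;
            return this;
        }

        Builder setTaskOffHeapMb(int taskOffHeapMb) {
            this.taskOffHeapMb = taskOffHeapMb;
            return this;
        }

        Builder setNetworkMb(int networkMb) {
            this.networkMb = networkMb;
            return this;
        }

        Builder setManagedMb(int managedMb) {
            this.managedMb = managedMb;
            return this;
        }

        WorkerResourceSpecSketch build() {
            return new WorkerResourceSpecSketch(this);
        }
    }
}
```

The test constant would then read `newBuilder().setCpuCores(100.0).setTaskHeapMb(1)...build()`, which makes the meaning of each value explicit at the call site.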




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403053905
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1206,8 +1206,12 @@ public Void retrievePayload(ResourceID resourceID) {
//  Resource Management
// 

 
-   protected int getNumberRequiredTaskManagerSlots() {
-   return slotManager.getNumberPendingTaskManagerSlots();
+   protected int getNumberRequiredTaskManagers() {
+   return getPendingWorkerNums().values().stream().reduce(0, Integer::sum);
 
 Review comment:
   For the time being I would add a `checkState` which ensures that the 
`WorkerResourceSpecs` can all be fulfilled by the container size we are using 
to start new containers/pods.
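A minimal sketch of such a check, assuming a worker spec can be compared to the container spec dimension by dimension (only CPU and memory are modeled). `RequiredWorkers` and its nested `WorkerSpec` are hypothetical simplifications of `WorkerResourceSpec` and the resource-manager code; the plain `IllegalStateException` mirrors what a `checkState` would throw:

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of summing required workers with a fail-fast size check. */
final class RequiredWorkers {

    private RequiredWorkers() {}

    /** Simplified worker spec: CPU cores and total memory in MB. */
    static final class WorkerSpec {
        final double cpuCores;
        final int memoryMb;

        WorkerSpec(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        /** True if this spec fits into the given container in every modeled dimension. */
        boolean fitsInto(WorkerSpec container) {
            return cpuCores <= container.cpuCores && memoryMb <= container.memoryMb;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof WorkerSpec)) {
                return false;
            }
            WorkerSpec other = (WorkerSpec) o;
            return Double.compare(cpuCores, other.cpuCores) == 0 && memoryMb == other.memoryMb;
        }

        @Override
        public int hashCode() {
            return Objects.hash(cpuCores, memoryMb);
        }
    }

    /** Sums required workers, failing if any spec exceeds the container size used to start workers. */
    static int numberRequiredTaskManagers(Map<WorkerSpec, Integer> required, WorkerSpec containerSpec) {
        for (WorkerSpec spec : required.keySet()) {
            if (!spec.fitsInto(containerSpec)) {
                throw new IllegalStateException("Required worker spec exceeds the container size.");
            }
        }
        // Same result as values().stream().reduce(0, Integer::sum).
        return required.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```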




[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403032569
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -133,14 +133,17 @@
@Nullable
private final WorkerResourceSpec defaultWorkerResourceSpec;
 
+   private final int numSlotsPerWorker;
+
public SlotManagerImpl(
SlotMatchingStrategy slotMatchingStrategy,
ScheduledExecutor scheduledExecutor,
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout,
boolean waitResultConsumedBeforeRelease,
-   @Nullable WorkerResourceSpec defaultWorkerResourceSpec) {
+   @Nullable WorkerResourceSpec defaultWorkerResourceSpec,
+   int numSlotsPerWorker) {
 
 Review comment:
   Maybe it would make sense to pass in the `SlotManagerConfiguration`. If this is possible, then one would not have to add another constructor parameter every time we add a new configuration parameter for the `SlotManager`.
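A minimal sketch of the parameter-object idea. `SlotManagerSettings` and `ConfiguredSlotManager` are hypothetical names, not Flink's actual `SlotManagerConfiguration` API; the fields only mirror a few of the constructor parameters in the diff. Adding a setting then changes the settings class rather than every constructor and call site.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical parameter object bundling the slot manager's settings. */
final class SlotManagerSettings {
    private final Duration taskManagerRequestTimeout;
    private final Duration slotRequestTimeout;
    private final boolean waitResultConsumedBeforeRelease;
    private final int numSlotsPerWorker;

    SlotManagerSettings(
            Duration taskManagerRequestTimeout,
            Duration slotRequestTimeout,
            boolean waitResultConsumedBeforeRelease,
            int numSlotsPerWorker) {
        if (numSlotsPerWorker <= 0) {
            throw new IllegalArgumentException("numSlotsPerWorker must be positive");
        }
        this.taskManagerRequestTimeout = Objects.requireNonNull(taskManagerRequestTimeout);
        this.slotRequestTimeout = Objects.requireNonNull(slotRequestTimeout);
        this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
        this.numSlotsPerWorker = numSlotsPerWorker;
    }

    Duration getTaskManagerRequestTimeout() {
        return taskManagerRequestTimeout;
    }

    Duration getSlotRequestTimeout() {
        return slotRequestTimeout;
    }

    boolean isWaitResultConsumedBeforeRelease() {
        return waitResultConsumedBeforeRelease;
    }

    int getNumSlotsPerWorker() {
        return numSlotsPerWorker;
    }
}

/** Hypothetical slot manager whose constructor takes the settings object as a whole. */
final class ConfiguredSlotManager {
    private final SlotManagerSettings settings;

    ConfiguredSlotManager(SlotManagerSettings settings) {
        this.settings = Objects.requireNonNull(settings);
    }

    int numSlotsPerWorker() {
        return settings.getNumSlotsPerWorker();
    }
}
```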


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403054549
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -207,6 +208,18 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
}
 
@Override
+   public Map<WorkerResourceSpec, Integer> getPendingWorkerNums() {
+   final int pendingWorkerNum = (int) Math.ceil((double) pendingSlots.size() / numSlotsPerWorker);
+   if (pendingWorkerNum > 0) {
+   return Collections.singletonMap(
+   Preconditions.checkNotNull(defaultWorkerResourceSpec,
+   "There should never be pending slots/workers in standalone mode."),
 
 Review comment:
   I think this should not be necessary.
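A minimal sketch of the method body without the `checkNotNull`, assuming the null case needs no special handling at this point. It is generic over a hypothetical key type `S` standing in for `WorkerResourceSpec`, and it replaces the `Math.ceil((double) ...)` cast with integer round-up division. Flink's own `MathUtils.divideRoundUp` would be the natural choice in the real code; a local helper is used here only so the sketch is self-contained. It also assumes the method returns an empty map when nothing is pending.

```java
import java.util.Collections;
import java.util.Map;

final class PendingWorkers {
    private PendingWorkers() {}

    /** Integer division rounding up; expects dividend >= 0 and divisor > 0. */
    static int divideRoundUp(int dividend, int divisor) {
        if (dividend < 0 || divisor <= 0) {
            throw new IllegalArgumentException("dividend must be >= 0 and divisor > 0");
        }
        return dividend == 0 ? 0 : (dividend - 1) / divisor + 1;
    }

    /**
     * Maps the default worker spec to the number of workers needed for the pending slots,
     * or returns an empty map if no slots are pending.
     */
    static <S> Map<S, Integer> pendingWorkerNums(S defaultWorkerSpec, int numPendingSlots, int numSlotsPerWorker) {
        final int pendingWorkerNum = divideRoundUp(numPendingSlots, numSlotsPerWorker);
        if (pendingWorkerNum > 0) {
            return Collections.singletonMap(defaultWorkerSpec, pendingWorkerNum);
        }
        return Collections.emptyMap();
    }
}
```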


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403054142
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -51,7 +53,7 @@
 
int getNumberFreeSlotsOf(InstanceID instanceId);
 
-   int getNumberPendingTaskManagerSlots();
+   Map<WorkerResourceSpec, Integer> getPendingWorkerNums();
 
 Review comment:
   `JavaDoc` missing.
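One possible wording for the missing JavaDoc, shown on a hypothetical stand-alone interface (`PendingWorkerSource`, with a type parameter `S` standing in for `WorkerResourceSpec`) so that it compiles on its own. The wording describes the behavior visible in the `SlotManagerImpl` diff, assuming an empty map is returned when nothing is pending; it is a suggestion, not the text that was merged.

```java
import java.util.Map;

/** Hypothetical extract of the SlotManager interface, reduced to the method under review. */
interface PendingWorkerSource<S> {

    /**
     * Returns the workers that still need to be started to fulfill the currently pending slot requests.
     *
     * @return map from the resource spec of the workers to request to the number of such workers;
     *     an empty map if no new workers are needed
     */
    Map<S, Integer> getPendingWorkerNums();
}
```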


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403056960
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -156,8 +156,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul
}
 
@Override
-   public boolean startNewWorker(ResourceProfile resourceProfile) {
-   LOG.info("Starting new worker with resource profile, {}", resourceProfile);
+   public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
+   LOG.info("Starting new worker with worker resource spec, {}", workerResourceSpec);
requestKubernetesPod();
 
 Review comment:
   I guess that we need a check that the configured pod fulfills 
`workerResourceSpec`.


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r401737501
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -125,20 +126,29 @@
 * */
private boolean failUnfulfillableRequest = true;
 
+   /**
+* The default resource spec of workers to request. It could be null iff on standalone/local setups, in which cases
+* the {@link SlotManagerImpl} will never request any new worker.
+*/
+   @Nullable
+   private final WorkerResourceSpec defaultWorkerResourceSpec;
 
 Review comment:
   Instead of making this field nullable, I think it would be better to have an 
`UnspecifiedWorkerResourceSpec` instance which we can pass to the 
`SlotManagerImpl` in case it runs in standalone mode. I think this would also 
better reflect that the `SlotManagerImpl` does not really care whether it's 
running in a Standalone- or YarnResourceManager. At the moment, the 
`SlotManagerImpl` contains code which behaves differently depending on whether 
this field is null or not.
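A minimal sketch of the null-object alternative. `WorkerSpec`, `WorkerAllocator` and `NullObjectSlotManager` are hypothetical, simplified stand-ins for `WorkerResourceSpec`, `ResourceActions` and `SlotManagerImpl`; the constant `UNSPECIFIED` plays the role of the suggested `UnspecifiedWorkerResourceSpec` instance. The slot manager never null-checks the spec, and whether a worker can actually be started is decided by the `WorkerAllocator` that the hosting resource manager provides.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for WorkerResourceSpec. */
final class WorkerSpec {
    /** Null object used in standalone mode instead of passing null around. */
    static final WorkerSpec UNSPECIFIED = new WorkerSpec(0.0, 0L);

    final double cpuCores;
    final long memoryMb;

    WorkerSpec(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

/** Hypothetical narrow view of ResourceActions. */
interface WorkerAllocator {
    /** Returns whether a new worker with the given spec could be requested. */
    boolean allocateWorker(WorkerSpec spec);
}

/** Slot manager sketch with no standalone-specific branches. */
final class NullObjectSlotManager {
    private final WorkerSpec defaultWorkerSpec;
    private final WorkerAllocator allocator;

    NullObjectSlotManager(WorkerSpec defaultWorkerSpec, WorkerAllocator allocator) {
        // Never null: standalone setups pass WorkerSpec.UNSPECIFIED instead.
        this.defaultWorkerSpec = Objects.requireNonNull(defaultWorkerSpec);
        this.allocator = Objects.requireNonNull(allocator);
    }

    boolean tryAllocateWorker() {
        return allocator.allocateWorker(defaultWorkerSpec);
    }
}
```

A standalone resource manager would implement the allocator as `spec -> false`, while an active one would request a container or pod.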


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403030485
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
 ##
 @@ -81,4 +83,14 @@ private static Configuration createActiveResourceManagerConfiguration(Configurat
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
+
+   protected WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {
+   final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils
+   .newProcessSpecBuilder(configuration)
+   .withCpuCores(getDefaultCpus(configuration))
+   .build();
+   return WorkerResourceSpec.fromTaskExecutorProcessSpec(taskExecutorProcessSpec);
+   }
+
+   protected abstract CPUResource getDefaultCpus(Configuration configuration);
 
 Review comment:
   The same applies to this method. I'm not entirely sure whether this class is the right place for it. Ideally, the `ActiveResourceManagerFactory` should not need to know how to derive these values.
   
   I'm not saying that we have to do it right away, but I think the cleaner approach would be to pass a `WorkerResourceSpecFactory` to the `ActiveResourceManagerFactory` or, even better, to simply pass the default `WorkerResourceSpec` to the constructor of the `ActiveResourceManagerFactory`.
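A minimal sketch of the constructor-injection variant, which also keeps the derivation in a plain static method since it touches no factory state. All names are hypothetical (`WorkerSpecValue`, `WorkerSpecs`, `ActiveRmFactorySketch`, `KubernetesRmFactorySketch`), and a `Map<String, String>` with the made-up key `example.worker.memory-mb` replaces Flink's `Configuration`. The deployment-specific CPU value is passed in by the caller instead of being obtained through an abstract `getDefaultCpus`.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified stand-in for WorkerResourceSpec. */
final class WorkerSpecValue {
    final double cpuCores;
    final long memoryMb;

    WorkerSpecValue(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

/** Stateless helper: deriving the default spec needs no factory instance. */
final class WorkerSpecs {
    private WorkerSpecs() {}

    // "example.worker.memory-mb" is a made-up key for this sketch, not a Flink option.
    static WorkerSpecValue createDefault(Map<String, String> config, double cpuCores) {
        long memoryMb = Long.parseLong(config.getOrDefault("example.worker.memory-mb", "1024"));
        return new WorkerSpecValue(cpuCores, memoryMb);
    }
}

/** The factory is handed its default spec and does not know how it was derived. */
abstract class ActiveRmFactorySketch {
    private final WorkerSpecValue defaultWorkerSpec;

    protected ActiveRmFactorySketch(WorkerSpecValue defaultWorkerSpec) {
        this.defaultWorkerSpec = Objects.requireNonNull(defaultWorkerSpec);
    }

    final WorkerSpecValue getDefaultWorkerSpec() {
        return defaultWorkerSpec;
    }
}

final class KubernetesRmFactorySketch extends ActiveRmFactorySketch {
    KubernetesRmFactorySketch(WorkerSpecValue defaultWorkerSpec) {
        super(defaultWorkerSpec);
    }
}
```

In this sketch, the deployment-specific entry point reads its own CPU option and calls `new KubernetesRmFactorySketch(WorkerSpecs.createDefault(config, cpuCores))`.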


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r401726831
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests for {@link WorkerResourceSpec}.
+ */
+public class WorkerResourceSpecTest {
 
 Review comment:
   ```suggestion
   public class WorkerResourceSpecTest extends TestLogger {
   ```


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403062228
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 ##
 @@ -37,12 +37,12 @@
void releaseResource(InstanceID instanceId, Exception cause);
 
/**
-* Requests to allocate a resource with the given {@link ResourceProfile}.
+* Requests to allocate a resource with the given {@link WorkerResourceSpec}.
 *
-* @param resourceProfile for the to be allocated resource
+* @param workerResourceSpec for the to be allocated worker
 
 Review comment:
   Usually `@param` requires a noun phrase: `@param workerResourceSpec 
workerResourceSpec specifies the size of the to be allocated resource`.


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r401731886
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java
 ##
 @@ -80,4 +84,10 @@ public static KubernetesResourceManagerFactory getInstance() {
fatalErrorHandler,
resourceManagerMetricGroup);
}
+
+   @Override
+   protected CPUResource getDefaultCpus(Configuration configuration) {
+   double fallback = configuration.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU);
 
 Review comment:
   Out of scope for this PR but at some point we should unify the different CPU 
options for different deployments.


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403063198
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -288,7 +288,7 @@ protected void internalDeregisterApplication(
}
 
@Override
-   public boolean startNewWorker(ResourceProfile resourceProfile) {
+   public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
 
 Review comment:
   I think we should ensure that `workerResourceSpec` can be fulfilled by the 
actual container size the `YarnResourceManager` starts.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403046204
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 ##
 @@ -42,9 +40,9 @@
 * Requests to allocate a resource with the given {@link ResourceProfile}.
 *
 * @param resourceProfile for the to be allocated resource
-* @return Collection of {@link ResourceProfile} describing the allocated slots
+* @return whether the resource can be allocated
 */
-   Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile);
+   boolean allocateResource(ResourceProfile resourceProfile);
 
 Review comment:
   Wouldn't future developments require that we can tell `allocateResource` how big the worker should be, resource-wise?


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403051769
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -267,10 +267,9 @@ private void requestKubernetesPod() {
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-   final int pendingTaskManagerSlots = numPendingPodRequests * numSlotsPerTaskManager;
+   final int requiredTaskManagers = getNumberRequiredTaskManagers();
 
-   if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+   if (requiredTaskManagers > numPendingPodRequests) {
 
 Review comment:
   Unrelated. Wouldn't a while loop be better here?
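A minimal sketch of the `while` variant, assuming that each pod request increments the pending-request counter, as the comparison `requiredTaskManagers > numPendingPodRequests` in the diff implies. With `if`, one invocation requests at most one pod; with `while`, a single invocation closes the whole gap. `PodRequestTracker` is a hypothetical model, not the real `KubernetesResourceManager`.

```java
/** Hypothetical model of the pending-pod bookkeeping. */
final class PodRequestTracker {
    private int numPendingPodRequests;

    /** Stand-in for requestKubernetesPod(): here it only records the pending request. */
    private void requestPod() {
        numPendingPodRequests++;
    }

    /** Requests pods until the pending requests cover all required TaskManagers. */
    void requestPodsIfRequired(int requiredTaskManagers) {
        while (requiredTaskManagers > numPendingPodRequests) {
            requestPod();
        }
    }

    int getNumPendingPodRequests() {
        return numPendingPodRequests;
    }
}
```

The loop terminates only because `requestPod()` increments the counter; if the real request method did not update the counter synchronously, the loop would never exit.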


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403035761
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 * Allocates a resource using the resource profile.
 *
 * @param resourceProfile The resource description
-* @return Collection of {@link ResourceProfile} describing the launched slots
+* @return whether the resource can be allocated
 */
@VisibleForTesting
-   public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);
+   public abstract boolean startNewWorker(ResourceProfile resourceProfile);
 
 Review comment:
   What happens if the RM can only start a multiple of `resourceProfile`? 
Wouldn't we calculate the number of available slots wrongly in this case?
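A small sketch of the concern, using a hypothetical memory-only model (`SlotAccounting`) in which a worker of `workerMemoryMb` offers `floor(workerMemoryMb / slotMemoryMb)` slots. If the RM starts a larger worker than requested but only reports `true`, the slot manager keeps counting the slots of the requested size.

```java
final class SlotAccounting {
    private SlotAccounting() {}

    /** Number of slots of slotMemoryMb that fit into a worker of workerMemoryMb (memory-only model). */
    static int slotsPerWorker(long workerMemoryMb, long slotMemoryMb) {
        if (workerMemoryMb < 0 || slotMemoryMb <= 0) {
            throw new IllegalArgumentException("invalid memory sizes");
        }
        return (int) (workerMemoryMb / slotMemoryMb);
    }

    /** Slots the slot manager would not know about if it assumed the requested worker size. */
    static int unaccountedSlots(long requestedWorkerMemoryMb, long actualWorkerMemoryMb, long slotMemoryMb) {
        return slotsPerWorker(actualWorkerMemoryMb, slotMemoryMb)
                - slotsPerWorker(requestedWorkerMemoryMb, slotMemoryMb);
    }
}
```

For example, requesting a 1024 MB worker for 1024 MB slots but receiving a 4096 MB worker gives `unaccountedSlots(1024, 4096, 1024) == 3`, i.e. three slots missing from the count of available slots.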


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403055120
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -524,10 +524,9 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
 * Request new container if pending containers cannot satisfy pending slot requests.
 */
private void requestYarnContainerIfRequired() {
-   int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-   int pendingTaskManagerSlots = numPendingContainerRequests * numSlotsPerTaskManager;
+   int requiredTaskManagers = getNumberRequiredTaskManagers();
 
-   if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+   if (requiredTaskManagers > numPendingContainerRequests) {
 
 Review comment:
   Same here with the `while` loop. But, like the other comment, this is unrelated to this PR.


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403028625
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
 ##
 @@ -81,4 +83,14 @@ private static Configuration createActiveResourceManagerConfiguration(Configurat
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
+
+   protected WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {
 
 Review comment:
   I'm wondering whether this should really be a method of the `ActiveResourceManagerFactory`. Looking at the body, it could just as well be a static method; it does not need to be a method of `ActiveResourceManagerFactory` since it does not access any state.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r401738574
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -125,20 +126,29 @@
 * */
private boolean failUnfulfillableRequest = true;
 
+   /**
+* The default resource spec of workers to request. It could be null iff on standalone/local setups, in which cases
+* the {@link SlotManagerImpl} will never request any new worker.
+*/
+   @Nullable
+   private final WorkerResourceSpec defaultWorkerResourceSpec;
 
 Review comment:
   A nice side effect would be that everywhere we now pass `null`, we would instead have a descriptive name such as `WorkerResourceSpec.UNSPECIFIED` or `WorkerResourceSpec.getUnknownSpec()`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403039671
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -807,23 +811,32 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
return false;
}
 
-   private Optional allocateResource(ResourceProfile resourceProfile) {
-   final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile);
+   private Optional allocateResource(ResourceProfile requestedSlotResourceProfile) {
+   if (defaultWorkerResourceSpec == null) {
+   // standalone mode, cannot allocate resource
+   return Optional.empty();
+   }
 
-   if (requestedSlots.isEmpty()) {
+   if (!Preconditions.checkNotNull(defaultSlotResourceProfile,
+   "defaultSlotResourceProfile should be null iff taskExecutorProcessSpec is null, which means standalone mode.")
+   .isMatching(requestedSlotResourceProfile)) {
+   // requested resource profile is unfulfillable
return Optional.empty();
-   } else {
-   final Iterator<ResourceProfile> slotIterator = requestedSlots.iterator();
-   final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next());
-   pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
+   }
 
 Review comment:
   Instead of returning `Optional.empty()`, shouldn't we fail with an `IllegalStateException`, because the current `SlotManagerImpl` does not support custom resource profiles yet?
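A minimal sketch of the suggested behavior. It separates the expected "cannot start workers" case (standalone mode, signaled here by a boolean instead of a `null` spec, empty result) from an unsupported custom profile (fail fast). `SlotProfile`, its memory-only `isMatching`, and `PendingSlotAllocator` are hypothetical simplifications; Flink's `ResourceProfile.isMatching` has its own semantics.

```java
import java.util.Optional;

/** Hypothetical, memory-only stand-in for ResourceProfile. */
final class SlotProfile {
    final long memoryMb;

    SlotProfile(long memoryMb) {
        this.memoryMb = memoryMb;
    }

    /** Simplified matching: the request fits if it needs no more memory than this profile. */
    boolean isMatching(SlotProfile requested) {
        return requested.memoryMb <= memoryMb;
    }
}

final class PendingSlotAllocator {
    private PendingSlotAllocator() {}

    static Optional<SlotProfile> allocatePendingSlot(
            boolean standalone, SlotProfile defaultSlotProfile, SlotProfile requested) {
        if (standalone) {
            // No workers can be started: an expected "no", so an empty result is fine.
            return Optional.empty();
        }
        if (!defaultSlotProfile.isMatching(requested)) {
            // Custom profiles are not supported yet: treat this as a bug, not as "unfulfillable".
            throw new IllegalStateException(
                    "Requested slot (" + requested.memoryMb + " MB) does not match the default slot ("
                            + defaultSlotProfile.memoryMb + " MB); custom resource profiles are not supported yet.");
        }
        return Optional.of(defaultSlotProfile);
    }
}
```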

