[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r409483913

## File path: flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java ##
@@ -100,8 +100,10 @@ private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
 			ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
 			ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
-		// we have to use the maximum parallelism as a default here, otherwise streaming pipelines would not run
-		int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, maximumParallelism);
+		int numSlotsPerTaskManager = configuration.getOptional(TaskManagerOptions.NUM_TASK_SLOTS)
+			.orElseGet(() -> maximumParallelism > 0 ?
+				(int) Math.ceil((double) maximumParallelism / numTaskManagers) :

Review comment:
Can't we use `MathUtils.divideRoundUp` here?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
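The `(int) Math.ceil((double) a / b)` pattern is an integer ceiling division. A minimal standalone sketch of such a helper follows, assuming a non-negative dividend and a positive divisor. `RoundUpDivision`, `divideRoundUp`, and `slotsPerTaskManager` are hypothetical names, not a copy of Flink's `MathUtils`; the fallback used when `maximumParallelism` is not positive is also an assumption, because the diff is cut off at that point.

```java
/** Hypothetical helper: integer ceiling division without a round trip through double. */
final class RoundUpDivision {

    private RoundUpDivision() {}

    /** Returns ceil(dividend / divisor), assuming dividend >= 0 and divisor > 0. */
    static int divideRoundUp(int dividend, int divisor) {
        if (dividend < 0) {
            throw new IllegalArgumentException("dividend must be non-negative: " + dividend);
        }
        if (divisor <= 0) {
            throw new IllegalArgumentException("divisor must be positive: " + divisor);
        }
        // Zero needs its own branch: (0 - 1) / divisor + 1 evaluates to 1 for any divisor > 1.
        return dividend == 0 ? 0 : (dividend - 1) / divisor + 1;
    }

    /**
     * Hypothetical stand-in for the default in getMiniClusterConfig: spread the maximum
     * parallelism over all task managers, rounding up so every subtask finds a slot.
     */
    static int slotsPerTaskManager(int maximumParallelism, int numTaskManagers, int fallbackSlots) {
        return maximumParallelism > 0
            ? divideRoundUp(maximumParallelism, numTaskManagers)
            : fallbackSlots;
    }
}
```

For example, a maximum parallelism of 10 on 4 task managers yields 3 slots each (12 in total), whereas plain integer division would yield 2 slots each (8 in total), which is not enough.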
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408972482

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 	 * Allocates a resource using the resource profile.
 	 *
 	 * @param resourceProfile The resource description
-	 * @return Collection of {@link ResourceProfile} describing the launched slots
+	 * @return whether the resource can be allocated
 	 */
 	@VisibleForTesting
-	public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);
+	public abstract boolean startNewWorker(ResourceProfile resourceProfile);

Review comment:
Sounds good to me. Thanks for the background information.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408964659

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##
@@ -199,6 +200,14 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
 	}
 	@Override
+	public Map<WorkerResourceSpec, Integer> getPendingWorkerNums() {
+		final int pendingWorkerNum = (int) Math.ceil((double) pendingSlots.size() / numSlotsPerWorker);

Review comment:
On the other hand, it complicates the expression a bit. Maybe introduce a name for this function, e.g. `multipleOfAndLargerOrEqualThan(numSlotsPerWorker, pendingSlots.size())`.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408964926

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ##
@@ -524,10 +524,9 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
 	 * Request new container if pending containers cannot satisfy pending slot requests.
 	 */
 	private void requestYarnContainerIfRequired() {
-		int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-		int pendingTaskManagerSlots = numPendingContainerRequests * numSlotsPerTaskManager;
+		int requiredTaskManagers = getNumberRequiredTaskManagers();

Review comment:
```suggestion
		final int requiredTaskManagers = getNumberRequiredTaskManagers();
```
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408962507

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##
@@ -199,6 +200,14 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
 	}
 	@Override
+	public Map<WorkerResourceSpec, Integer> getPendingWorkerNums() {
+		final int pendingWorkerNum = (int) Math.ceil((double) pendingSlots.size() / numSlotsPerWorker);

Review comment:
If we want to get rid of the back-and-forth casting and the `double` calculations:
```suggestion
		final int pendingWorkerNum = (pendingSlots.size() - 1) / numSlotsPerWorker + 1;
```
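One edge case worth checking before adopting the integer-only form: Java's integer division truncates toward zero, so with an empty `pendingSlots` and `numSlotsPerWorker > 1` the expression `(0 - 1) / numSlotsPerWorker + 1` evaluates to `1`, while the `Math.ceil` version yields `0`. The sketch below compares the variants; `PendingWorkers` and its methods are hypothetical names used only for this comparison.

```java
/** Hypothetical comparison of ways to compute the number of pending workers. */
final class PendingWorkers {

    private PendingWorkers() {}

    /** Floating-point form from the original diff. */
    static int viaCeil(int pendingSlots, int numSlotsPerWorker) {
        return (int) Math.ceil((double) pendingSlots / numSlotsPerWorker);
    }

    /** Integer-only form as suggested, without a guard for zero pending slots. */
    static int viaIntegerUnguarded(int pendingSlots, int numSlotsPerWorker) {
        return (pendingSlots - 1) / numSlotsPerWorker + 1;
    }

    /** Integer-only form with an explicit zero guard; matches viaCeil for pendingSlots >= 0. */
    static int viaIntegerGuarded(int pendingSlots, int numSlotsPerWorker) {
        return pendingSlots == 0 ? 0 : (pendingSlots - 1) / numSlotsPerWorker + 1;
    }
}
```

Since the surrounding code only returns a non-empty map when `pendingWorkerNum > 0`, the unguarded form, if taken as-is, would report one pending worker even when no slots are pending.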
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408965071

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ##
@@ -524,10 +524,9 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
 	 * Request new container if pending containers cannot satisfy pending slot requests.
 	 */
 	private void requestYarnContainerIfRequired() {
-		int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-		int pendingTaskManagerSlots = numPendingContainerRequests * numSlotsPerTaskManager;
+		int requiredTaskManagers = getNumberRequiredTaskManagers();
-		if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+		while (requiredTaskManagers-- > numPendingContainerRequests) {

Review comment:
```suggestion
		while (requiredTaskManagers > numPendingContainerRequests) {
```
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408959570

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ##
@@ -267,10 +267,9 @@ private void requestKubernetesPod() {
 	 * Request new pod if pending pods cannot satisfy pending slot requests.
 	 */
 	private void requestKubernetesPodIfRequired() {
-		final int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-		final int pendingTaskManagerSlots = numPendingPodRequests * numSlotsPerTaskManager;
+		int requiredTaskManagers = getNumberRequiredTaskManagers();

Review comment:
```suggestion
		final int requiredTaskManagers = getNumberRequiredTaskManagers();
```
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408959505

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ##
@@ -267,10 +267,9 @@ private void requestKubernetesPod() {
 	 * Request new pod if pending pods cannot satisfy pending slot requests.
 	 */
 	private void requestKubernetesPodIfRequired() {
-		final int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-		final int pendingTaskManagerSlots = numPendingPodRequests * numSlotsPerTaskManager;
+		int requiredTaskManagers = getNumberRequiredTaskManagers();
-		if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+		while (requiredTaskManagers-- > numPendingPodRequests) {

Review comment:
```suggestion
		while (requiredTaskManagers > numPendingPodRequests) {
```
I think `requestKubernetesPod` will increase `numPendingPodRequests`.
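The remark about `numPendingPodRequests` can be checked with a small model. Assuming, as the comment says, that each pod request increments the pending-request counter, the post-decrement in the loop condition closes the gap from both sides, so only about half of the required workers get requested. `PodRequestLoop` is a hypothetical class that mimics the two counters; it is not Flink code.

```java
/** Hypothetical model of the "request pods until enough are pending" loop. */
final class PodRequestLoop {

    private int numPendingPodRequests;
    private int podsRequested;

    PodRequestLoop(int initiallyPending) {
        this.numPendingPodRequests = initiallyPending;
    }

    /** Stand-in for requestKubernetesPod(): each request bumps the pending counter. */
    private void requestPod() {
        numPendingPodRequests++;
        podsRequested++;
    }

    /** Variant from the diff: the target shrinks while the pending count grows. */
    int requestWithPostDecrement(int requiredTaskManagers) {
        while (requiredTaskManagers-- > numPendingPodRequests) {
            requestPod();
        }
        return podsRequested;
    }

    /** Suggested variant: only the pending counter moves, closing the gap by one per request. */
    int requestWithFixedTarget(final int requiredTaskManagers) {
        while (requiredTaskManagers > numPendingPodRequests) {
            requestPod();
        }
        return podsRequested;
    }
}
```

With no pending requests and four required task managers, the post-decrement variant issues two requests and the suggested variant issues four. The suggested loop terminates only because the request method increments the counter; if a request could fail without doing so, the loop would need another exit condition.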
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408960934

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ##
@@ -51,7 +53,12 @@
 	int getNumberFreeSlotsOf(InstanceID instanceId);
-	int getNumberPendingTaskManagerSlots();
+	/**
+	 * Get number of workers SlotManager requested from {@link ResourceActions} that are not yet fulfilled.
+	 * @return a map whose key set is all the unique resource specs of the pending workers,
+	 * and the corresponding value is number of pending workers of that resource spec.
+	 */
+	Map<WorkerResourceSpec, Integer> getPendingWorkerNums();

Review comment:
```suggestion
	Map<WorkerResourceSpec, Integer> getRequiredResources();
```
maybe?
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r408960456

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -1206,8 +1206,12 @@ public Void retrievePayload(ResourceID resourceID) {
 	// Resource Management
 	//
-	protected int getNumberRequiredTaskManagerSlots() {
-		return slotManager.getNumberPendingTaskManagerSlots();
+	protected int getNumberRequiredTaskManagers() {
+		return getPendingWorkerNums().values().stream().reduce(0, Integer::sum);
+	}
+
+	protected Map<WorkerResourceSpec, Integer> getPendingWorkerNums() {
+		return slotManager.getPendingWorkerNums();

Review comment:
Should we rename this method to `getRequiredResources` or so?
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404630795

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 	 * Allocates a resource using the resource profile.
 	 *
 	 * @param resourceProfile The resource description
-	 * @return Collection of {@link ResourceProfile} describing the launched slots
+	 * @return whether the resource can be allocated
 	 */
 	@VisibleForTesting
-	public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);
+	public abstract boolean startNewWorker(ResourceProfile resourceProfile);

Review comment:
Hmm, but with this API we will waste resources, because the caller thinks it has started a worker of the specified size, right? One way to improve the situation is to let the `startNewWorker` method return the size of the container it has started. This would allow the caller to react to it.
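The idea of letting `startNewWorker` report what it actually started could look roughly like the sketch below. It assumes, purely for illustration, a scheduler that rounds each memory request up to a multiple of a minimum allocation and rejects requests above a maximum. `ActualWorkerSize`, its `startNewWorker(int)`, and `usableSlots` are hypothetical and model memory only; they are not a Flink or Yarn API.

```java
import java.util.Optional;

/** Hypothetical sketch: starting a worker reports the size that was actually granted. */
final class ActualWorkerSize {

    private final int minimumAllocationMb;
    private final int maximumAllocationMb;

    ActualWorkerSize(int minimumAllocationMb, int maximumAllocationMb) {
        this.minimumAllocationMb = minimumAllocationMb;
        this.maximumAllocationMb = maximumAllocationMb;
    }

    /**
     * Returns the memory (in MB) of the started worker, or empty if the request cannot be
     * served. Assumed rule: round up to a multiple of the minimum allocation.
     */
    Optional<Integer> startNewWorker(int requestedMemoryMb) {
        int units = (requestedMemoryMb + minimumAllocationMb - 1) / minimumAllocationMb;
        int grantedMb = Math.max(1, units) * minimumAllocationMb;
        if (grantedMb > maximumAllocationMb) {
            return Optional.empty();
        }
        return Optional.of(grantedMb);
    }

    /** Caller side: how many slots of the given size fit into the granted worker. */
    static int usableSlots(int grantedMemoryMb, int slotMemoryMb) {
        return grantedMemoryMb / slotMemoryMb;
    }
}
```

Under these assumptions, a 1500 MB request comes back as a 2048 MB worker, and the caller can take the additional memory into account (for example, when deciding how many slots the worker offers) instead of assuming it got exactly what it asked for.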
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404085816

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -1206,8 +1206,12 @@ public Void retrievePayload(ResourceID resourceID) {
 	// Resource Management
 	//
-	protected int getNumberRequiredTaskManagerSlots() {
-		return slotManager.getNumberPendingTaskManagerSlots();
+	protected int getNumberRequiredTaskManagers() {
+		return getPendingWorkerNums().values().stream().reduce(0, Integer::sum);

Review comment:
If this is the entry point, then we can do the check there. The important bit is to maintain the invariant that we do not start containers for `WorkerResourceSpecs` which are larger than the container.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404084546

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ##
@@ -42,9 +40,9 @@
 	 * Requests to allocate a resource with the given {@link ResourceProfile}.
 	 *
 	 * @param resourceProfile for the to be allocated resource
-	 * @return Collection of {@link ResourceProfile} describing the allocated slots
+	 * @return whether the resource can be allocated
 	 */
-	Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile);
+	boolean allocateResource(ResourceProfile resourceProfile);

Review comment:
All right, understood.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404083978

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##
@@ -807,23 +811,32 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
 		return false;
 	}
-	private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) {
-		final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile);
+	private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
+		if (defaultWorkerResourceSpec == null) {
+			// standalone mode, cannot allocate resource
+			return Optional.empty();
+		}
-		if (requestedSlots.isEmpty()) {
+		if (!Preconditions.checkNotNull(defaultSlotResourceProfile,
+			"defaultSlotResourceProfile should be null iff taskExecutorProcessSpec is null, which means standalone mode.")
+			.isMatching(requestedSlotResourceProfile)) {
+			// requested resource profile is unfulfillable
 			return Optional.empty();
-		} else {
-			final Iterator<ResourceProfile> slotIterator = requestedSlots.iterator();
-			final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next());
-			pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
+		}

Review comment:
Failing in case the requested slot profile is too large sounds good to me.
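The check discussed here, rejecting a slot request that exceeds the default slot profile, can be sketched with a simplified two-dimensional resource model. Splitting the worker's resources evenly across its slots is an assumption about what the default slot profile represents, and `SimpleSlotResources` with its methods is hypothetical, not Flink's `ResourceProfile`.

```java
import java.util.Optional;

/** Hypothetical simplified slot resources: CPU cores and memory in MB. */
final class SimpleSlotResources {

    final double cpuCores;
    final int memoryMb;

    SimpleSlotResources(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Assumption: a worker's resources are divided evenly among its slots. */
    static SimpleSlotResources defaultSlotOf(double workerCpuCores, int workerMemoryMb, int numSlotsPerWorker) {
        return new SimpleSlotResources(workerCpuCores / numSlotsPerWorker, workerMemoryMb / numSlotsPerWorker);
    }

    /** True if this slot offers at least the requested CPU and memory. */
    boolean canFulfill(SimpleSlotResources requested) {
        return cpuCores >= requested.cpuCores && memoryMb >= requested.memoryMb;
    }

    /** Returns the default slot to wait for, or empty if no new worker could ever host the request. */
    static Optional<SimpleSlotResources> pendingSlotFor(SimpleSlotResources defaultSlot, SimpleSlotResources requested) {
        return defaultSlot.canFulfill(requested) ? Optional.of(defaultSlot) : Optional.empty();
    }
}
```

An empty result corresponds to the `Optional.empty()` branch in the diff, so the caller can fail the unfulfillable request early instead of starting a worker that can never host it.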
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404082884

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 	 * Allocates a resource using the resource profile.
 	 *
 	 * @param resourceProfile The resource description
-	 * @return Collection of {@link ResourceProfile} describing the launched slots
+	 * @return whether the resource can be allocated
 	 */
 	@VisibleForTesting
-	public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);
+	public abstract boolean startNewWorker(ResourceProfile resourceProfile);

Review comment:
Ok, if I understand it correctly, then this PR relies on the fact that the `RM` and the `SlotManager` both get the same (implicitly or explicitly) `WorkerResourceSpec` assigned, right? I think it can still happen that the RM won't be able to start a container of exactly the size `WorkerResourceSpec`, even after later changes. Imagine that we are on Yarn, where the requested `WorkerResourceSpec` is below the minimum. How should Flink behave in this situation? Should we fail? Should we leave some of the resources unused? Or should we tell the `SlotManager` about it so that it can take the additional resources into account?
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404080033

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java ##
@@ -81,4 +83,14 @@ private static Configuration createActiveResourceManagerConfiguration(Configurat
 		ClusterInformation clusterInformation,
 		@Nullable String webInterfaceUrl,
 		ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
+
+	protected WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {
+		final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils
+			.newProcessSpecBuilder(configuration)
+			.withCpuCores(getDefaultCpus(configuration))
+			.build();
+		return WorkerResourceSpec.fromTaskExecutorProcessSpec(taskExecutorProcessSpec);
+	}
+
+	protected abstract CPUResource getDefaultCpus(Configuration configuration);

Review comment:
My suggestion would be to pass it to the `ActiveResourceManagerFactory` when creating it.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403037228

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##
@@ -153,6 +155,8 @@ public SlotManagerImpl(
 		this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
 		this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
 		this.numSlotsPerWorker = numSlotsPerWorker;
+		this.defaultSlotResourceProfile = defaultWorkerResourceSpec != null ?
+			generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker) : null;

Review comment:
Instead of setting `defaultSlotResourceProfile` to `null`, can't we set it to `ResourceProfile.UNKNOWN` or something like this?
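Replacing `null` with a dedicated constant is the null-object (sentinel) pattern. The sketch below assumes that an unknown profile should never satisfy a concrete request; the actual semantics of Flink's `ResourceProfile.UNKNOWN` may differ. `SlotResources`, `canServe`, and `defaultSlotResources` are hypothetical names.

```java
/** Hypothetical resource description that uses a sentinel instead of null for "unknown". */
final class SlotResources {

    /** Sentinel standing in for "no default known", e.g. in standalone mode. */
    static final SlotResources UNKNOWN = new SlotResources(0.0, 0);

    final double cpuCores;
    final int memoryMb;

    SlotResources(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Identity check, so a real zero-sized profile is never mistaken for the sentinel. */
    boolean isUnknown() {
        return this == UNKNOWN;
    }

    /** Assumed semantics: an unknown profile never serves a concrete request. */
    boolean canServe(SlotResources requested) {
        return !isUnknown() && cpuCores >= requested.cpuCores && memoryMb >= requested.memoryMb;
    }

    /** Null-free replacement for "spec != null ? generate(spec, n) : null". */
    static SlotResources defaultSlotResources(SlotResources workerSpec, int numSlotsPerWorker) {
        if (workerSpec.isUnknown()) {
            return UNKNOWN;
        }
        return new SlotResources(workerSpec.cpuCores / numSlotsPerWorker, workerSpec.memoryMb / numSlotsPerWorker);
    }
}
```

Callers could then drop the `Preconditions.checkNotNull` around the default slot profile, because the field is never `null`.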
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403037763

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##
@@ -807,23 +811,32 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
 		return false;
 	}
-	private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) {
-		final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile);
+	private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
+		if (defaultWorkerResourceSpec == null) {
+			// standalone mode, cannot allocate resource
+			return Optional.empty();
+		}

Review comment:
I think here we are leaking details from the enclosing component into the `SlotManager`. I think the `SlotManager` should not decide for the `ResourceManager` whether it can allocate new resources or not.
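One way to keep the deployment-mode decision out of the `SlotManager` is to let the callback it already holds answer the question, which a `boolean allocateResource(...)` on `ResourceActions` makes possible. In the sketch below, `WorkerAllocator` and `DelegatingSlotManager` are hypothetical stand-ins for `ResourceActions` and `SlotManagerImpl`, and the worker spec is reduced to an opaque string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for ResourceActions: the resource manager decides whether it can start a worker. */
interface WorkerAllocator {
    /** Returns false if no new worker can be started, e.g. in a standalone deployment. */
    boolean allocateWorker(String workerSpec);
}

/** Hypothetical slot manager that does not inspect the deployment mode itself. */
final class DelegatingSlotManager {

    private final WorkerAllocator allocator;
    private final String defaultWorkerSpec;
    private final List<String> pendingWorkers = new ArrayList<>();

    DelegatingSlotManager(WorkerAllocator allocator, String defaultWorkerSpec) {
        this.allocator = allocator;
        this.defaultWorkerSpec = defaultWorkerSpec;
    }

    /** Asks the allocator and records a pending worker only if the request was accepted. */
    Optional<String> allocateResource() {
        if (!allocator.allocateWorker(defaultWorkerSpec)) {
            return Optional.empty();
        }
        pendingWorkers.add(defaultWorkerSpec);
        return Optional.of(defaultWorkerSpec);
    }

    int numPendingWorkers() {
        return pendingWorkers.size();
    }
}
```

A standalone resource manager would simply return `false`, so the slot manager would no longer need its own `defaultWorkerResourceSpec == null` branch to detect standalone mode.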
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403060510

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -1051,13 +1050,13 @@ protected abstract void internalDeregisterApplication(
 		@Nullable String optionalDiagnostics) throws ResourceManagerException;
 	/**
-	 * Allocates a resource using the resource profile.
+	 * Allocates a resource using the worker resource specification.
 	 *
-	 * @param resourceProfile The resource description
+	 * @param workerResourceSpec that describes the to be allocated worker.

Review comment:
```suggestion
	 * @param workerResourceSpec that describes the to be allocated worker
```
`@param`, `@return`, or `@throws` tags are usually not terminated by a period.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403048662

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java ##
@@ -52,6 +53,9 @@
  */
 public class SlotManagerFailUnfulfillableTest extends TestLogger {
+	private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = new WorkerResourceSpec(
+		100.0, 1, 1, 1, 1);

Review comment:
Looking at this line, I would recommend introducing a builder for the `WorkerResourceSpec` and making the default constructor private.
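A builder makes call sites such as `new WorkerResourceSpec(100.0, 1, 1, 1, 1)` self-describing. The sketch below assumes the five positional arguments are CPU cores followed by four memory components; `WorkerSpecSketch` and its setter names are hypothetical and not necessarily the fields or API of Flink's `WorkerResourceSpec`.

```java
/** Hypothetical worker spec with a private constructor, created only through its builder. */
final class WorkerSpecSketch {

    final double cpuCores;
    final int taskHeapMb;
    final int taskOffHeapMb;
    final int networkMb;
    final int managedMb;

    private WorkerSpecSketch(Builder builder) {
        this.cpuCores = builder.cpuCores;
        this.taskHeapMb = builder.taskHeapMb;
        this.taskOffHeapMb = builder.taskOffHeapMb;
        this.networkMb = builder.networkMb;
        this.managedMb = builder.managedMb;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    /** Unset components default to zero (an assumption made for this sketch). */
    static final class Builder {
        private double cpuCores;
        private int taskHeapMb;
        private int taskOffHeapMb;
        private int networkMb;
        private int managedMb;

        Builder setCpuCores(double cpuCores) { this.cpuCores = cpuCores; return this; }
        Builder setTaskHeapMb(int mb) { this.taskHeapMb = mb; return this; }
        Builder setTaskOffHeapMb(int mb) { this.taskOffHeapMb = mb; return this; }
        Builder setNetworkMb(int mb) { this.networkMb = mb; return this; }
        Builder setManagedMb(int mb) { this.managedMb = mb; return this; }

        WorkerSpecSketch build() {
            return new WorkerSpecSketch(this);
        }
    }
}
```

The test constant could then read `WorkerSpecSketch.newBuilder().setCpuCores(100.0).setTaskHeapMb(1)...build()`, which names each value at the call site.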
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403053905

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ##
@@ -1206,8 +1206,12 @@ public Void retrievePayload(ResourceID resourceID) {
 	// Resource Management
 	//
-	protected int getNumberRequiredTaskManagerSlots() {
-		return slotManager.getNumberPendingTaskManagerSlots();
+	protected int getNumberRequiredTaskManagers() {
+		return getPendingWorkerNums().values().stream().reduce(0, Integer::sum);

Review comment:
For the time being, I would add a `checkState` which ensures that the `WorkerResourceSpecs` can all be fulfilled by the container size we are using to start new containers/pods.
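The proposed invariant can be sketched as a precondition over the map of required workers. `RequiredWorkers`, its nested `Spec` (CPU cores and memory only), and `checkAllFit` are hypothetical; in Flink code the same check would more likely use `Preconditions.checkState`, which also throws `IllegalStateException`.

```java
import java.util.Map;

/** Hypothetical model of the "required workers must fit the container" invariant. */
final class RequiredWorkers {

    /** Hypothetical two-dimensional worker spec: CPU cores and memory in MB. */
    static final class Spec {
        final double cpuCores;
        final int memoryMb;

        Spec(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        boolean fitsInto(Spec container) {
            return cpuCores <= container.cpuCores && memoryMb <= container.memoryMb;
        }
    }

    private RequiredWorkers() {}

    /** Total number of required workers across all specs. */
    static int totalRequired(Map<Spec, Integer> requiredWorkers) {
        return requiredWorkers.values().stream().mapToInt(Integer::intValue).sum();
    }

    /** Fails fast, like a checkState, if any required spec exceeds the container used to start workers. */
    static void checkAllFit(Map<Spec, Integer> requiredWorkers, Spec containerSpec) {
        for (Spec spec : requiredWorkers.keySet()) {
            if (!spec.fitsInto(containerSpec)) {
                throw new IllegalStateException(
                    "Required worker spec (" + spec.cpuCores + " cores, " + spec.memoryMb
                        + " MB) does not fit into the container used to start new workers.");
            }
        }
    }
}
```

`mapToInt(Integer::intValue).sum()` gives the same total as `reduce(0, Integer::sum)` in the diff but avoids boxing the running total.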
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403032569

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##
@@ -133,14 +133,17 @@
 	@Nullable
 	private final WorkerResourceSpec defaultWorkerResourceSpec;
+	private final int numSlotsPerWorker;
+
 	public SlotManagerImpl(
 			SlotMatchingStrategy slotMatchingStrategy,
 			ScheduledExecutor scheduledExecutor,
 			Time taskManagerRequestTimeout,
 			Time slotRequestTimeout,
 			Time taskManagerTimeout,
 			boolean waitResultConsumedBeforeRelease,
-			@Nullable WorkerResourceSpec defaultWorkerResourceSpec) {
+			@Nullable WorkerResourceSpec defaultWorkerResourceSpec,
+			int numSlotsPerWorker) {

Review comment:
Maybe it would make sense to pass in the `SlotManagerConfiguration`. If this is possible, then one would not have to add another field to the constructor every time we add a new configuration parameter for the `SlotManager`.
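Passing a configuration object is the parameter-object refactoring: a new option only changes the configuration class and the place that builds it, not every constructor signature and call site. Flink already has a `SlotManagerConfiguration` class; the `SlotManagerSettings` and `ConfiguredSlotManager` classes below are hypothetical, simplified stand-ins, with timeouts reduced to plain milliseconds.

```java
/** Hypothetical, simplified bundle of the settings the slot manager constructor receives. */
final class SlotManagerSettings {

    final long taskManagerRequestTimeoutMs;
    final long slotRequestTimeoutMs;
    final long taskManagerTimeoutMs;
    final boolean waitResultConsumedBeforeRelease;
    final int numSlotsPerWorker;

    SlotManagerSettings(
            long taskManagerRequestTimeoutMs,
            long slotRequestTimeoutMs,
            long taskManagerTimeoutMs,
            boolean waitResultConsumedBeforeRelease,
            int numSlotsPerWorker) {
        // Validation lives in one place instead of in every consumer.
        if (numSlotsPerWorker <= 0) {
            throw new IllegalArgumentException("numSlotsPerWorker must be positive: " + numSlotsPerWorker);
        }
        this.taskManagerRequestTimeoutMs = taskManagerRequestTimeoutMs;
        this.slotRequestTimeoutMs = slotRequestTimeoutMs;
        this.taskManagerTimeoutMs = taskManagerTimeoutMs;
        this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
        this.numSlotsPerWorker = numSlotsPerWorker;
    }
}

/** Hypothetical slot manager: one settings argument instead of one parameter per option. */
final class ConfiguredSlotManager {

    private final SlotManagerSettings settings;

    ConfiguredSlotManager(SlotManagerSettings settings) {
        this.settings = settings;
    }

    int getNumSlotsPerWorker() {
        return settings.numSlotsPerWorker;
    }

    long getSlotRequestTimeoutMs() {
        return settings.slotRequestTimeoutMs;
    }
}
```

Adding, say, a new timeout would then touch `SlotManagerSettings` and its factory, while the `ConfiguredSlotManager` constructor signature stays the same.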
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403054549

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ##
@@ -207,6 +208,18 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
 	}
 	@Override
+	public Map<WorkerResourceSpec, Integer> getPendingWorkerNums() {
+		final int pendingWorkerNum = (int) Math.ceil((double) pendingSlots.size() / numSlotsPerWorker);
+		if (pendingWorkerNum > 0) {
+			return Collections.singletonMap(
+				Preconditions.checkNotNull(defaultWorkerResourceSpec,
+					"There should never be pending slots/workers in standalone mode."),

Review comment:
I think this should not be necessary.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403054142

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java

    @@ -51,7 +53,7 @@
         int getNumberFreeSlotsOf(InstanceID instanceId);
    -    int getNumberPendingTaskManagerSlots();
    +    Map getPendingWorkerNums();

Review comment:
`JavaDoc` missing.
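For the missing Javadoc, one possible wording, sketched on a hypothetical `PendingWorkerView` interface with a hypothetical `WorkerSpec` type. The archive stripped the generic type parameters from `Map`, so `Map<WorkerSpec, Integer>` (spec to number of pending workers) is an assumption based on the `Collections.singletonMap` call in `SlotManagerImpl`:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical worker description, standing in for WorkerResourceSpec.
final class WorkerSpec {
    final double cpuCores;
    final long memoryMb;

    WorkerSpec(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

interface PendingWorkerView {
    /**
     * Returns the workers that have been requested from the resource manager but
     * have not registered yet, grouped by their resource spec.
     *
     * @return map from the resource spec of pending workers to the number of pending
     *         workers with that spec; an empty map if no worker is pending
     */
    Map<WorkerSpec, Integer> getPendingWorkerNums();
}

// Trivial implementation with a single default spec.
final class SingleSpecPendingWorkers implements PendingWorkerView {
    private final WorkerSpec defaultSpec;
    private final int pendingWorkerNum;

    SingleSpecPendingWorkers(WorkerSpec defaultSpec, int pendingWorkerNum) {
        this.defaultSpec = defaultSpec;
        this.pendingWorkerNum = pendingWorkerNum;
    }

    @Override
    public Map<WorkerSpec, Integer> getPendingWorkerNums() {
        if (pendingWorkerNum > 0) {
            return Collections.singletonMap(defaultSpec, pendingWorkerNum);
        }
        return Collections.emptyMap();
    }
}
```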
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403056960

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java

    @@ -156,8 +156,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul
         }
         @Override
    -    public boolean startNewWorker(ResourceProfile resourceProfile) {
    -        LOG.info("Starting new worker with resource profile, {}", resourceProfile);
    +    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
    +        LOG.info("Starting new worker with worker resource spec, {}", workerResourceSpec);
             requestKubernetesPod();

Review comment:
I guess that we need a check that the configured pod fulfills `workerResourceSpec`.
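A minimal sketch of such a check, assuming a hypothetical `Resources` type with only CPU and memory, and a hypothetical resource manager that can launch a single configured pod size. `startNewWorker` refuses specs the configured pod cannot cover instead of requesting a pod that would be too small:

```java
// Hypothetical, simplified resource description; not Flink's WorkerResourceSpec.
final class Resources {
    final double cpuCores;
    final long memoryMb;

    Resources(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // True if these (offered) resources are at least as large as `requested` in every dimension.
    boolean fulfills(Resources requested) {
        return cpuCores >= requested.cpuCores && memoryMb >= requested.memoryMb;
    }
}

// Hypothetical resource manager that can only launch pods of one configured size.
final class SketchPodResourceManager {
    private final Resources configuredPod;
    private int numPendingPodRequests;

    SketchPodResourceManager(Resources configuredPod) {
        this.configuredPod = configuredPod;
    }

    boolean startNewWorker(Resources workerResourceSpec) {
        if (!configuredPod.fulfills(workerResourceSpec)) {
            // The only pod shape we can launch is too small for the requested worker.
            return false;
        }
        numPendingPodRequests++; // stand-in for actually requesting a pod
        return true;
    }

    int getNumPendingPodRequests() {
        return numPendingPodRequests;
    }
}
```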
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r401737501

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java

    @@ -125,20 +126,29 @@
          *
          */
         private boolean failUnfulfillableRequest = true;
    +    /**
    +     * The default resource spec of workers to request. It could be null iff on standalone/local setups, in which cases
    +     * the {@link SlotManagerImpl} will never request any new worker.
    +     */
    +    @Nullable
    +    private final WorkerResourceSpec defaultWorkerResourceSpec;

Review comment:
Instead of making this field nullable, I think it would be better to have an `UnspecifiedWorkerResourceSpec` instance which we can pass to the `SlotManagerImpl` in case it runs in standalone mode. I think this would also better reflect that the `SlotManagerImpl` does not really care whether it's running in a Standalone- or YarnResourceManager. At the moment, the `SlotManagerImpl` contains code which behaves differently depending on whether this field is null or not.
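A minimal sketch of the sentinel approach (the null object pattern), with hypothetical `WorkerSpec`, `WorkerAllocator` and `SketchSlotManager` types rather than Flink's classes. The slot manager always forwards a non-null spec, and whether a worker can be started is decided on the resource-manager side:

```java
import java.util.Objects;

// Hypothetical simplified worker spec with a sentinel instead of null.
final class WorkerSpec {
    // Named sentinel for "no worker shape is known", e.g. in standalone mode.
    static final WorkerSpec UNSPECIFIED = new WorkerSpec(0.0, 0L);

    final double cpuCores;
    final long memoryMb;

    WorkerSpec(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

// Hypothetical callback into the resource manager.
interface WorkerAllocator {
    boolean allocateResource(WorkerSpec spec);
}

// Standalone setups cannot start workers, whatever the spec.
final class StandaloneAllocator implements WorkerAllocator {
    @Override
    public boolean allocateResource(WorkerSpec spec) {
        return false;
    }
}

// Active setups start a worker unless the spec is the sentinel.
final class ActiveAllocator implements WorkerAllocator {
    int startedWorkers;

    @Override
    public boolean allocateResource(WorkerSpec spec) {
        if (spec == WorkerSpec.UNSPECIFIED) {
            return false;
        }
        startedWorkers++;
        return true;
    }
}

// The slot manager holds a non-null spec and never branches on the deployment mode.
final class SketchSlotManager {
    private final WorkerSpec defaultWorkerSpec;
    private final WorkerAllocator allocator;

    SketchSlotManager(WorkerSpec defaultWorkerSpec, WorkerAllocator allocator) {
        this.defaultWorkerSpec = Objects.requireNonNull(defaultWorkerSpec, "use WorkerSpec.UNSPECIFIED instead of null");
        this.allocator = Objects.requireNonNull(allocator);
    }

    boolean requestNewWorker() {
        return allocator.allocateResource(defaultWorkerSpec);
    }
}
```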
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403030485

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java

    @@ -81,4 +83,14 @@ private static Configuration createActiveResourceManagerConfiguration(Configurat
                 ClusterInformation clusterInformation,
                 @Nullable String webInterfaceUrl,
                 ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
    +
    +    protected WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {
    +        final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils
    +            .newProcessSpecBuilder(configuration)
    +            .withCpuCores(getDefaultCpus(configuration))
    +            .build();
    +        return WorkerResourceSpec.fromTaskExecutorProcessSpec(taskExecutorProcessSpec);
    +    }
    +
    +    protected abstract CPUResource getDefaultCpus(Configuration configuration);

Review comment:
The same applies to this method. I'm not entirely sure whether this class is the right place for this method. Ideally, the `ActiveResourceManagerFactory` should not need to know how to derive these things. Not saying that we have to do it right away, but I think the cleaner approach would be to pass a `WorkerResourceSpecFactory` to the `ActiveResourceManagerFactory` or, even better, simply pass the default `WorkerResourceSpec` to the constructor of the `ActiveResourceManagerFactory`.
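A sketch of the constructor-injection variant, using hypothetical simplified types (`WorkerSpec`, `SketchResourceManager`, `SketchActiveResourceManagerFactory`) instead of Flink's classes. The caller computes the default spec once and hands it to the factory, so the factory needs no abstract hook such as `getDefaultCpus`:

```java
import java.util.Objects;

// Hypothetical, simplified worker spec.
final class WorkerSpec {
    final double cpuCores;
    final long memoryMb;

    WorkerSpec(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

// Hypothetical product of the factory.
final class SketchResourceManager {
    final WorkerSpec defaultWorkerSpec;

    SketchResourceManager(WorkerSpec defaultWorkerSpec) {
        this.defaultWorkerSpec = defaultWorkerSpec;
    }
}

// The factory receives the already computed default spec; it does not derive it.
abstract class SketchActiveResourceManagerFactory {
    private final WorkerSpec defaultWorkerSpec;

    protected SketchActiveResourceManagerFactory(WorkerSpec defaultWorkerSpec) {
        this.defaultWorkerSpec = Objects.requireNonNull(defaultWorkerSpec);
    }

    final SketchResourceManager createResourceManager() {
        return createActiveResourceManager(defaultWorkerSpec);
    }

    protected abstract SketchResourceManager createActiveResourceManager(WorkerSpec defaultWorkerSpec);
}

// A deployment-specific factory only decides how to build its resource manager.
final class SketchKubernetesFactory extends SketchActiveResourceManagerFactory {
    SketchKubernetesFactory(WorkerSpec defaultWorkerSpec) {
        super(defaultWorkerSpec);
    }

    @Override
    protected SketchResourceManager createActiveResourceManager(WorkerSpec defaultWorkerSpec) {
        return new SketchResourceManager(defaultWorkerSpec);
    }
}
```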
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r401726831

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java

    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.resourcemanager;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.MemorySize;
    +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
    +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotEquals;
    +
    +/**
    + * Tests for {@link WorkerResourceSpec}.
    + */
    +public class WorkerResourceSpecTest {

Review comment:
```suggestion
public class WorkerResourceSpecTest extends TestLogger {
```
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403062228

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java

    @@ -37,12 +37,12 @@
         void releaseResource(InstanceID instanceId, Exception cause);
         /**
    -     * Requests to allocate a resource with the given {@link ResourceProfile}.
    +     * Requests to allocate a resource with the given {@link WorkerResourceSpec}.
          *
    -     * @param resourceProfile for the to be allocated resource
    +     * @param workerResourceSpec for the to be allocated worker

Review comment:
Usually `@param` requires a noun phrase: `@param workerResourceSpec workerResourceSpec specifies the size of the to be allocated resource`.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r401731886

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java

    @@ -80,4 +84,10 @@ public static KubernetesResourceManagerFactory getInstance() {
                 fatalErrorHandler,
                 resourceManagerMetricGroup);
         }
    +
    +    @Override
    +    protected CPUResource getDefaultCpus(Configuration configuration) {
    +        double fallback = configuration.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU);

Review comment:
Out of scope for this PR, but at some point we should unify the different CPU options for different deployments.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403063198

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java

    @@ -288,7 +288,7 @@ protected void internalDeregisterApplication(
         }
         @Override
    -    public boolean startNewWorker(ResourceProfile resourceProfile) {
    +    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {

Review comment:
I think we should ensure that `workerResourceSpec` can be fulfilled by the actual container size the `YarnResourceManager` starts.
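One reason the started container can differ from the request is that YARN schedulers typically normalize container requests, for example by rounding memory up to a multiple of a configured minimum or increment allocation. Assuming that behaviour, a hypothetical `ContainerSizing` helper could estimate the container size that will actually be granted and check it against the spec. The rounding here models the assumed normalization and is not a YARN API:

```java
// Hypothetical helpers; the rounding models an assumed scheduler normalization.
final class ContainerSizing {
    private ContainerSizing() {}

    // Round requestedMb up to the next multiple of incrementMb.
    static long normalizeMemoryMb(long requestedMb, long incrementMb) {
        if (incrementMb <= 0) {
            throw new IllegalArgumentException("incrementMb must be positive");
        }
        if (requestedMb < 0) {
            throw new IllegalArgumentException("requestedMb must not be negative");
        }
        long remainder = requestedMb % incrementMb;
        return remainder == 0 ? requestedMb : requestedMb + (incrementMb - remainder);
    }

    // True if a container of the given size covers the worker spec in memory and vcores.
    static boolean fulfills(long containerMb, int containerVcores, long specMb, int specVcores) {
        return containerMb >= specMb && containerVcores >= specVcores;
    }
}
```

Under these assumptions, a 1500 MB request with a 1024 MB increment becomes a 2048 MB container, which covers a 1500 MB spec, while a fixed 1024 MB container would not.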
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403046204

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java

    @@ -42,9 +40,9 @@
          * Requests to allocate a resource with the given {@link ResourceProfile}.
          *
          * @param resourceProfile for the to be allocated resource
    -     * @return Collection of {@link ResourceProfile} describing the allocated slots
    +     * @return whether the resource can be allocated
          */
    -    Collection allocateResource(ResourceProfile resourceProfile);
    +    boolean allocateResource(ResourceProfile resourceProfile);

Review comment:
Wouldn't it be required for future developments that we can tell `allocateResource` how big resource-wise the worker should be?
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403051769

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java

    @@ -267,10 +267,9 @@ private void requestKubernetesPod() {
          * Request new pod if pending pods cannot satisfy pending slot requests.
          */
         private void requestKubernetesPodIfRequired() {
    -        final int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
    -        final int pendingTaskManagerSlots = numPendingPodRequests * numSlotsPerTaskManager;
    +        final int requiredTaskManagers = getNumberRequiredTaskManagers();
    -        if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
    +        if (requiredTaskManagers > numPendingPodRequests) {

Review comment:
Unrelated to this PR, but wouldn't a `while` loop be better here?
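A sketch of the `while` variant, assuming that each pod request increments the pending-request counter. `PodRequestLoop` is hypothetical and only models that counter, not the real pod creation:

```java
// Hypothetical stand-in for the pod-request bookkeeping; requestPod() models
// requesting one Kubernetes pod.
final class PodRequestLoop {
    private int numPendingPodRequests;

    private void requestPod() {
        // Assumed: each request increments the pending counter.
        numPendingPodRequests++;
    }

    // With `if`, one call requests at most one pod; with `while`, one call
    // requests enough pods to cover the whole shortfall.
    void requestPodsIfRequired(int requiredTaskManagers) {
        while (requiredTaskManagers > numPendingPodRequests) {
            requestPod();
        }
    }

    int getNumPendingPodRequests() {
        return numPendingPodRequests;
    }
}
```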
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403035761

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java

    @@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
          * Allocates a resource using the resource profile.
          *
          * @param resourceProfile The resource description
    -     * @return Collection of {@link ResourceProfile} describing the launched slots
    +     * @return whether the resource can be allocated
          */
         @VisibleForTesting
    -    public abstract Collection startNewWorker(ResourceProfile resourceProfile);
    +    public abstract boolean startNewWorker(ResourceProfile resourceProfile);

Review comment:
What happens if the RM can only start a multiple of `resourceProfile`? Wouldn't we calculate the number of available slots wrongly in this case?
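To make the concern concrete, a small arithmetic sketch with a hypothetical `SlotCount` helper and memory as the only resource dimension: if the RM starts a larger worker than requested, that worker provides more slots than the bookkeeping assumes:

```java
// Hypothetical arithmetic only; memory is the sole resource dimension here.
final class SlotCount {
    private SlotCount() {}

    // Number of slots of slotMemoryMb that fit into a worker of workerMemoryMb.
    static int slotsThatFit(long workerMemoryMb, long slotMemoryMb) {
        if (slotMemoryMb <= 0) {
            throw new IllegalArgumentException("slotMemoryMb must be positive");
        }
        return Math.toIntExact(workerMemoryMb / slotMemoryMb);
    }

    // Slots the bookkeeping misses if it assumes the requested worker size but the
    // RM actually starts a larger worker.
    static int unaccountedSlots(long requestedWorkerMb, long startedWorkerMb, long slotMemoryMb) {
        return slotsThatFit(startedWorkerMb, slotMemoryMb) - slotsThatFit(requestedWorkerMb, slotMemoryMb);
    }
}
```

For example, a 4096 MB request with 1024 MB slots is booked as 4 slots, but a started 8192 MB worker hosts 8, leaving 4 slots unaccounted for.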
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403055120

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java

    @@ -524,10 +524,9 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
          * Request new container if pending containers cannot satisfy pending slot requests.
          */
         private void requestYarnContainerIfRequired() {
    -        int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
    -        int pendingTaskManagerSlots = numPendingContainerRequests * numSlotsPerTaskManager;
    +        int requiredTaskManagers = getNumberRequiredTaskManagers();
    -        if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
    +        if (requiredTaskManagers > numPendingContainerRequests) {

Review comment:
Same here with the `while` loop, but, like the other comment, this is unrelated to this PR.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403028625

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java

    @@ -81,4 +83,14 @@ private static Configuration createActiveResourceManagerConfiguration(Configurat
                 ClusterInformation clusterInformation,
                 @Nullable String webInterfaceUrl,
                 ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
    +
    +    protected WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {

Review comment:
I'm wondering whether this should really be a method belonging to the `ActiveResourceManagerFactory`. Judging by its body, it could just as well be a static method. There is no need for it to be a method of `ActiveResourceManagerFactory`, since it does not access any state.
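A sketch of the helper as a static method, with a hypothetical `Map<String, String>` configuration, a hypothetical key `worker.memory-mb` and a hypothetical default value standing in for Flink's `Configuration` and memory options. Because it only reads its arguments, it can be unit-tested without a factory instance:

```java
import java.util.Map;

// Hypothetical simplified worker spec.
final class WorkerSpec {
    final double cpuCores;
    final long memoryMb;

    WorkerSpec(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

// Holder for a stateless helper: no factory state is read, so it can be static.
final class WorkerSpecUtils {
    // Hypothetical configuration key and default, for illustration only.
    static final String MEMORY_MB_KEY = "worker.memory-mb";
    static final long DEFAULT_MEMORY_MB = 1024L;

    private WorkerSpecUtils() {}

    static WorkerSpec createDefaultWorkerSpec(Map<String, String> configuration, double defaultCpus) {
        String memory = configuration.get(MEMORY_MB_KEY);
        long memoryMb = memory == null ? DEFAULT_MEMORY_MB : Long.parseLong(memory);
        return new WorkerSpec(defaultCpus, memoryMb);
    }
}
```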
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r401738574

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java

    @@ -125,20 +126,29 @@
          *
          */
         private boolean failUnfulfillableRequest = true;
    +    /**
    +     * The default resource spec of workers to request. It could be null iff on standalone/local setups, in which cases
    +     * the {@link SlotManagerImpl} will never request any new worker.
    +     */
    +    @Nullable
    +    private final WorkerResourceSpec defaultWorkerResourceSpec;

Review comment:
A nice side effect would be that everywhere we now pass `null`, we would have a descriptive name such as `WorkerResourceSpec.UNSPECIFIED` or `WorkerResourceSpec.getUnknownSpec()`.
[GitHub] [flink] tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
tillrohrmann commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403039671

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java

    @@ -807,23 +811,32 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
                 return false;
         }
    -    private Optional allocateResource(ResourceProfile resourceProfile) {
    -        final Collection requestedSlots = resourceActions.allocateResource(resourceProfile);
    +    private Optional allocateResource(ResourceProfile requestedSlotResourceProfile) {
    +        if (defaultWorkerResourceSpec == null) {
    +            // standalone mode, cannot allocate resource
    +            return Optional.empty();
    +        }
    -        if (requestedSlots.isEmpty()) {
    +        if (!Preconditions.checkNotNull(defaultSlotResourceProfile,
    +                "defaultSlotResourceProfile should be null iff taskExecutorProcessSpec is null, which means standalone mode.")
    +            .isMatching(requestedSlotResourceProfile)) {
    +            // requested resource profile is unfulfillable
                 return Optional.empty();
    -        } else {
    -            final Iterator slotIterator = requestedSlots.iterator();
    -            final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next());
    -            pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
    +        }

Review comment:
Instead of returning `Optional.empty()`, shouldn't we fail with an invalid state exception, because the current `SlotManagerImpl` does not support custom resource profiles yet?
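A sketch of failing fast instead, with hypothetical `SlotProfile` and `SketchSlotAllocator` types (memory as the only dimension) rather than Flink's `ResourceProfile` and `SlotManagerImpl`. An unsupported profile raises `IllegalStateException` rather than producing an empty `Optional`:

```java
import java.util.Optional;

// Hypothetical simplified slot profile with memory as the only dimension.
final class SlotProfile {
    final long memoryMb;

    SlotProfile(long memoryMb) {
        this.memoryMb = memoryMb;
    }

    // True if a slot of this profile is large enough for `requested`.
    boolean covers(SlotProfile requested) {
        return memoryMb >= requested.memoryMb;
    }

    @Override
    public String toString() {
        return memoryMb + " MB";
    }
}

// Hypothetical allocator that only knows one default slot profile.
final class SketchSlotAllocator {
    private final SlotProfile defaultSlotProfile;
    private int pendingSlots;

    SketchSlotAllocator(SlotProfile defaultSlotProfile) {
        this.defaultSlotProfile = defaultSlotProfile;
    }

    Optional<String> allocateResource(SlotProfile requested) {
        if (!defaultSlotProfile.covers(requested)) {
            // Fail loudly: an empty Optional would look the same as other reasons
            // for not allocating, hiding that custom profiles are unsupported.
            throw new IllegalStateException("Cannot fulfill slot profile " + requested
                    + " with default slot profile " + defaultSlotProfile
                    + "; custom resource profiles are not supported yet.");
        }
        pendingSlots++;
        return Optional.of("pending-slot-" + pendingSlots);
    }
}
```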