xintongsong commented on a change in pull request #14561: URL: https://github.com/apache/flink/pull/14561#discussion_r553079447
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java ########## @@ -275,7 +275,10 @@ public void processResourceRequirements(ResourceRequirements resourceRequirement */ @Override public boolean registerTaskManager( - final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { + final TaskExecutorConnection taskExecutorConnection, + SlotReport initialSlotReport, + ResourceProfile totalResourceProfile, + ResourceProfile defaultSlotResourceProfile) { Review comment: JavaDoc should be updated accordingly. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java ########## @@ -459,7 +459,10 @@ public boolean unregisterSlotRequest(AllocationID allocationId) { */ @Override public boolean registerTaskManager( - final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { + final TaskExecutorConnection taskExecutorConnection, + SlotReport initialSlotReport, + ResourceProfile totalResourceProfile, + ResourceProfile defaultSlotResourceProfile) { Review comment: JavaDoc should be updated accordingly. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ########## @@ -464,7 +464,11 @@ private void stopResourceManagerServices() throws Exception { taskExecutors.get(taskManagerResourceId); if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) { - if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) { + if (slotManager.registerTaskManager( + workerTypeWorkerRegistration, + slotReport, + workerTypeWorkerRegistration.getTotalResourceProfile(), + workerTypeWorkerRegistration.getDefaultSlotResourceProfile())) { Review comment: It's a bit weird that we have to pass in `workerTypeWorkerRegistration. getTotalResourceProfile()` and `workerTypeWorkerRegistration. getDefaultSlotResourceProfile()` when we have already passed in `workerTypeWorkerRegistration`. Despite the names, I think the boundary between `TaskExecutorConnection` and `WorkerRegistration` is that, the former contains information needed in `SlotManager` while the latter contains additional information needed in `ResourceManager`. (The name `TaskExecutorConnection` is probably because previously we need nothing more than the IDs and the RPC gateway in `SlotManager`.) Since the total and default slot resource profiles are only used in `SlotManager`, we probably should move them into `TaskExecutorConnection`. We may also rename the two classes as follows to explicitly suggest their scope of usage. * `WorkerRegistration` -> `ResourceManagerWorkerRegistration` * `TaskExecutorConnection` -> `SlotManagerWorkerRegistration` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org