xintongsong commented on a change in pull request #14561:
URL: https://github.com/apache/flink/pull/14561#discussion_r553079447



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
##########
@@ -275,7 +275,10 @@ public void processResourceRequirements(ResourceRequirements resourceRequirement
      */
     @Override
     public boolean registerTaskManager(
-            final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+            final TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {

Review comment:
       JavaDoc should be updated accordingly.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -459,7 +459,10 @@ public boolean unregisterSlotRequest(AllocationID allocationId) {
      */
     @Override
     public boolean registerTaskManager(
-            final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+            final TaskExecutorConnection taskExecutorConnection,
+            SlotReport initialSlotReport,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {

Review comment:
       JavaDoc should be updated accordingly.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -464,7 +464,11 @@ private void stopResourceManagerServices() throws Exception {
                 taskExecutors.get(taskManagerResourceId);
 
         if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
-            if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
+            if (slotManager.registerTaskManager(
+                    workerTypeWorkerRegistration,
+                    slotReport,
+                    workerTypeWorkerRegistration.getTotalResourceProfile(),
+                    workerTypeWorkerRegistration.getDefaultSlotResourceProfile())) {

Review comment:
       It's a bit odd that we have to pass in
`workerTypeWorkerRegistration.getTotalResourceProfile()` and
`workerTypeWorkerRegistration.getDefaultSlotResourceProfile()` when we have
already passed in `workerTypeWorkerRegistration` itself.
   
   Despite the names, I think the boundary between `TaskExecutorConnection` and
`WorkerRegistration` is that the former contains the information needed in
`SlotManager`, while the latter contains additional information needed in
`ResourceManager`. (`TaskExecutorConnection` probably got its name because
`SlotManager` previously needed nothing more than the IDs and the RPC gateway.)
   
   Since the total and default slot resource profiles are only used in
`SlotManager`, we should probably move them into `TaskExecutorConnection`. We
may also rename the two classes as follows to make their scope of usage
explicit:
   * `WorkerRegistration` -> `ResourceManagerWorkerRegistration`
   * `TaskExecutorConnection` -> `SlotManagerWorkerRegistration`
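
A rough sketch of what moving the two profiles into `TaskExecutorConnection` could look like. Everything here is a simplified, hypothetical stand-in rather than the actual Flink classes: `ResourceProfile` is reduced to two fields, the `SlotReport` parameter is omitted, the `dataPort` field and the `getRegisteredMemoryMb()` helper are made up for illustration, and the subclass relationship between the two registration types is an assumption. The point is only that `registerTaskManager` can read the profiles from the connection it already receives.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; every type here is a simplified, hypothetical stand-in. */
public class RegistrationSketch {

    /** Stand-in for ResourceProfile, reduced to two fields for illustration. */
    static final class ResourceProfile {
        final double cpuCores;
        final int memoryMb;

        ResourceProfile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    /** Stand-in for TaskExecutorConnection: holds what the slot manager needs. */
    static class TaskExecutorConnection {
        private final String instanceId;
        private final ResourceProfile totalResourceProfile;
        private final ResourceProfile defaultSlotResourceProfile;

        TaskExecutorConnection(
                String instanceId,
                ResourceProfile totalResourceProfile,
                ResourceProfile defaultSlotResourceProfile) {
            this.instanceId = instanceId;
            this.totalResourceProfile = totalResourceProfile;
            this.defaultSlotResourceProfile = defaultSlotResourceProfile;
        }

        String getInstanceId() { return instanceId; }
        ResourceProfile getTotalResourceProfile() { return totalResourceProfile; }
        ResourceProfile getDefaultSlotResourceProfile() { return defaultSlotResourceProfile; }
    }

    /** Stand-in for WorkerRegistration: adds data only the resource manager needs. */
    static final class WorkerRegistration extends TaskExecutorConnection {
        private final int dataPort; // hypothetical resource-manager-only field

        WorkerRegistration(
                String instanceId, ResourceProfile total, ResourceProfile defaultSlot, int dataPort) {
            super(instanceId, total, defaultSlot);
            this.dataPort = dataPort;
        }

        int getDataPort() { return dataPort; }
    }

    /** Stand-in for the slot manager; the slot report parameter is omitted for brevity. */
    static final class SlotManager {
        private final Map<String, TaskExecutorConnection> registered = new HashMap<>();
        private int registeredMemoryMb = 0;

        /** Returns true if the task manager is newly registered, false if already known. */
        boolean registerTaskManager(TaskExecutorConnection connection) {
            if (registered.putIfAbsent(connection.getInstanceId(), connection) != null) {
                return false;
            }
            // The profiles come from the connection itself, not from extra parameters.
            registeredMemoryMb += connection.getTotalResourceProfile().memoryMb;
            return true;
        }

        int getRegisteredMemoryMb() { return registeredMemoryMb; }
    }

    public static void main(String[] args) {
        WorkerRegistration registration = new WorkerRegistration(
                "tm-1", new ResourceProfile(4.0, 4096), new ResourceProfile(1.0, 1024), 6121);
        SlotManager slotManager = new SlotManager();
        System.out.println("registered: " + slotManager.registerTaskManager(registration));
        System.out.println("memory (MB): " + slotManager.getRegisteredMemoryMb());
    }
}
```

With this shape the call site in `ResourceManager` would keep its original two-argument form, and the slot manager would not depend on anything specific to the resource manager.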




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

