alirezazamani commented on a change in pull request #1165:
URL: https://github.com/apache/helix/pull/1165#discussion_r459853722



##########
File path: 
helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
##########
@@ -176,6 +181,178 @@ public void buildAssignableInstances(ClusterConfig 
clusterConfig, TaskDataCache
     computeGlobalThreadBasedCapacity();
   }
 
+  /**
+   * Builds AssignableInstances and restores TaskAssignResults from scratch by 
reading from
+   * CurrentState. It re-computes current quota profile for each 
AssignableInstance.
+   * If a task's current state is INIT or RUNNING, or if there is a pending 
message whose ToState
+   * is RUNNING, the task/partition will be assigned to AssignableInstances of 
the instance.
+   * @param clusterConfig
+   * @param taskDataCache
+   * @param liveInstances
+   * @param instanceConfigs
+   * @param currentStateOutput
+   * @param resourceMap
+   */
+  public void buildAssignableInstancesFromCurrentState(ClusterConfig 
clusterConfig,
+      TaskDataCache taskDataCache, Map<String, LiveInstance> liveInstances,
+      Map<String, InstanceConfig> instanceConfigs, CurrentStateOutput 
currentStateOutput,
+      Map<String, Resource> resourceMap) {
+    _assignableInstanceMap.clear();
+    _taskAssignResultMap.clear();
+
+    // Create all AssignableInstance objects based on what's in liveInstances
+    for (Map.Entry<String, LiveInstance> liveInstanceEntry : 
liveInstances.entrySet()) {
+      // Prepare instance-specific metadata
+      String instanceName = liveInstanceEntry.getKey();
+      LiveInstance liveInstance = liveInstanceEntry.getValue();
+      if (!instanceConfigs.containsKey(instanceName)) {
+        continue; // Ill-formatted input; skip over this instance
+      }
+      InstanceConfig instanceConfig = instanceConfigs.get(instanceName);
+
+      // Create an AssignableInstance
+      AssignableInstance assignableInstance =
+          new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
+      _assignableInstanceMap.put(instanceConfig.getInstanceName(), 
assignableInstance);
+      LOG.debug("AssignableInstance created for instance: {}", instanceName);
+    }
+
+    Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap();
+
+    // Update task profiles by traversing all CurrentStates
+    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
+      String resourceName = resourceEntry.getKey();
+      if 
(resourceEntry.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME))
 {
+        JobConfig jobConfig = jobConfigMap.get(resourceName);
+        JobContext jobContext = taskDataCache.getJobContext(resourceName);
+        String quotaType = getQuotaType(jobConfig);
+        Map<Partition, Map<String, String>> currentStateMap =
+            currentStateOutput.getCurrentStateMap(resourceName);
+        for (Map.Entry<Partition, Map<String, String>> currentStateMapEntry : 
currentStateMap
+            .entrySet()) {
+          Partition partition = currentStateMapEntry.getKey();
+          String taskId = getTaskID(jobConfig, jobContext, partition);
+          for (Map.Entry<String, String> instanceCurrentStateEntry : 
currentStateMapEntry.getValue()
+              .entrySet()) {
+            String assignedInstance = instanceCurrentStateEntry.getKey();
+            String taskState = instanceCurrentStateEntry.getValue();
+            // If a task is in INIT or RUNNING state on the instance, this 
task should occupy one
+            // quota from this instance.
+            if (taskState.equals(TaskPartitionState.INIT.name())
+                || taskState.equals(TaskPartitionState.RUNNING.name())) {
+              assignTaskToInstance(assignedInstance, jobConfig, taskId, 
quotaType);
+            }
+          }
+        }
+        Map<Partition, Map<String, Message>> pendingMessageMap =
+            currentStateOutput.getPendingMessageMap(resourceName);
+        for (Map.Entry<Partition, Map<String, Message>> pendingMessageMapEntry 
: pendingMessageMap
+            .entrySet()) {
+          Partition partition = pendingMessageMapEntry.getKey();
+          String taskId = getTaskID(jobConfig, jobContext, partition);
+          for (Map.Entry<String, Message> instancePendingMessageEntry : 
pendingMessageMapEntry
+              .getValue().entrySet()) {
+            String assignedInstance = instancePendingMessageEntry.getKey();
+            String messageToState = 
instancePendingMessageEntry.getValue().getToState();
+            // If there is a pending message on the instance which has ToState 
of RUNNING, the task
+            // will run on the instance soon. So the task needs to occupy one 
quota on this instance.
+            if (messageToState.equals(TaskPartitionState.RUNNING.name())
+                && !TaskPartitionState.INIT.name().equals(
+                    currentStateOutput.getCurrentState(resourceName, 
partition, assignedInstance))
+                && !TaskPartitionState.RUNNING.name().equals(currentStateOutput
+                    .getCurrentState(resourceName, partition, 
assignedInstance))) {
+              assignTaskToInstance(assignedInstance, jobConfig, taskId, 
quotaType);
+            }
+          }
+        }
+      }
+    }
+    LOG.info(
+        "AssignableInstanceManager built AssignableInstances from scratch 
based on contexts in TaskDataCache due to Controller switch or ClusterConfig 
change.");
+    computeGlobalThreadBasedCapacity();
+  }
+
+  /**
+   * Assign the task to the instance's Assignable Instance
+   * @param instance
+   * @param jobConfig
+   * @param taskId
+   * @param quotaType
+   */
+  private void assignTaskToInstance(String instance, JobConfig jobConfig, 
String taskId,
+      String quotaType) {
+    if (_assignableInstanceMap.containsKey(instance)) {
+      TaskConfig taskConfig = getTaskConfig(jobConfig, taskId);
+      AssignableInstance assignableInstance = 
_assignableInstanceMap.get(instance);
+      TaskAssignResult taskAssignResult =
+          assignableInstance.restoreTaskAssignResult(taskId, taskConfig, 
quotaType);
+      if (taskAssignResult.isSuccessful()) {
+        _taskAssignResultMap.put(taskId, taskAssignResult);
+        LOG.debug("TaskAssignResult restored for taskId: {}, assigned on 
instance: {}", taskId,
+            instance);
+      }
+    } else {
+      LOG.debug(
+          "While building AssignableInstance map, discovered that the instance 
a task is assigned to is no "
+              + "longer a LiveInstance! TaskAssignResult will not be created 
and no resource will be taken "
+              + "up for this task. TaskId: {}, Instance: {}",
+          taskId, instance);
+    }
+  }
+
+  /**
+   * Extract the quota type information of the Job
+   * @param jobConfig
+   * @return
+   */
+  private String getQuotaType(JobConfig jobConfig) {
+    // If jobConfig is null (job has been deleted but participant has not 
dropped the task yet), use
+    // default quota for the task
+    if (jobConfig == null) {
+      return AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }
+    String quotaType = jobConfig.getJobType();
+    if (quotaType == null) {
+      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }

Review comment:
       Done. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to