alirezazamani commented on a change in pull request #1076:
URL: https://github.com/apache/helix/pull/1076#discussion_r448024936



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +29,87 @@ public AsyncWorkerType getAsyncWorkerType() {
   }
 
   @Override
-  public void execute(ClusterEvent event) {
-    WorkflowControllerDataProvider dataProvider =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
+  public void process(ClusterEvent event) throws Exception {
+    // Use main thread to compute what jobs need to be purged, and what workflows need to be gc'ed.
+    // This is to avoid race conditions since the cache will be modified. After this work, then the
+    // async work will happen.
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-
-    if (dataProvider == null || manager == null) {
+    if (manager == null) {
       LOG.warn(
-          "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
           event.getEventId(), event.getEventType(), event.getClusterName());
       return;
     }
 
-    Set<WorkflowConfig> existingWorkflows =
-        new HashSet<>(dataProvider.getWorkflowConfigMap().values());
-    for (WorkflowConfig workflowConfig : existingWorkflows) {
-      // clean up the expired jobs if it is a queue.
+    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
+    Set<String> workflowsToBeDeleted = new HashSet<>();
+    WorkflowControllerDataProvider dataProvider =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
+      WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
-        try {
-          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
-              dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
-              _rebalanceScheduler);
-        } catch (Exception e) {
-          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
-              workflowConfig.getWorkflowId(), e.toString()));
+        WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());

Review comment:
      Can we have workflow config without having context? Do we need to do null check?
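For illustration of the reviewer's question, here is a minimal sketch, assuming simplified stand-ins rather than Helix's real classes, of the two-phase pattern the diff's comment describes: compute the purge plan on the calling thread from the cache, then hand an immutable snapshot to async work, with both the config lookup and the context lookup null-guarded. `GcPlanSketch`, `Config`, `Context`, `planPurge`, and `purgeAsync` are hypothetical names; the "purge" is a placeholder string, not a call into `TaskUtil`, and whether Helix's cache can actually hold a config without a context is not settled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; Config and Context are simplified stand-ins,
// not Helix's WorkflowConfig / WorkflowContext.
public class GcPlanSketch {

  // Stand-in for WorkflowConfig: only the two flags the stage filters on.
  static final class Config {
    final boolean terminable;
    final boolean jobQueue;

    Config(boolean terminable, boolean jobQueue) {
      this.terminable = terminable;
      this.jobQueue = jobQueue;
    }
  }

  // Stand-in for WorkflowContext: names of jobs assumed to be already expired.
  static final class Context {
    final Set<String> expiredJobs;

    Context(Set<String> expiredJobs) {
      this.expiredJobs = expiredJobs;
    }
  }

  // Phase 1, on the calling thread: read the (mutable) cache and build an
  // immutable plan. Either lookup may miss, so both results are null-checked.
  static Map<String, Set<String>> planPurge(Set<String> workflowIds,
      Map<String, Config> configs, Map<String, Context> contexts) {
    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
    for (String id : workflowIds) {
      Config config = configs.get(id);
      // Same filter as the diff: keep non-terminable workflows and job queues.
      if (config == null || (config.terminable && !config.jobQueue)) {
        continue;
      }
      Context context = contexts.get(id);
      if (context == null) {
        continue; // config exists but no context: nothing to purge
      }
      if (!context.expiredJobs.isEmpty()) {
        expiredJobsMap.put(id, Collections.unmodifiableSet(new TreeSet<>(context.expiredJobs)));
      }
    }
    return Collections.unmodifiableMap(expiredJobsMap);
  }

  // Phase 2, on a worker thread: touch only the snapshot, never the live cache.
  static Future<List<String>> purgeAsync(ExecutorService pool, Map<String, Set<String>> plan) {
    return pool.submit(() -> {
      List<String> purged = new ArrayList<>();
      for (Map.Entry<String, Set<String>> e : plan.entrySet()) {
        for (String job : e.getValue()) {
          purged.add(e.getKey() + "/" + job); // placeholder for the real purge call
        }
      }
      return purged;
    });
  }

  public static void main(String[] args) throws Exception {
    Map<String, Config> configs = new HashMap<>();
    configs.put("queueA", new Config(false, true));
    configs.put("queueB", new Config(false, true)); // config but no context
    configs.put("oneShot", new Config(true, false)); // terminable, not a queue
    Map<String, Context> contexts = new HashMap<>();
    contexts.put("queueA", new Context(Set.of("job1")));
    contexts.put("oneShot", new Context(Set.of("job9")));

    Map<String, Set<String>> plan =
        planPurge(Set.of("queueA", "queueB", "oneShot"), configs, contexts);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      System.out.println(purgeAsync(pool, plan).get(5, TimeUnit.SECONDS));
    } finally {
      pool.shutdown();
    }
  }
}
```

The guard costs one branch per workflow, and because the worker only sees the unmodifiable snapshot, a later cache refresh on the main thread cannot change what the async purge acts on.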




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
