dasahcc commented on a change in pull request #1076:
URL: https://github.com/apache/helix/pull/1076#discussion_r442560045



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
##########
@@ -40,5 +40,9 @@
   PipelineType,
   LastRebalanceFinishTimeStamp,
   ControllerDataProvider,
-  STATEFUL_REBALANCER
+  STATEFUL_REBALANCER,
+  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+  WORKFLOWS_TO_BE_DELETED,
+  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+  EXPIRED_JOBS_MAP

Review comment:
       We can name it consistently.
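The diff comments on `WORKFLOWS_TO_BE_DELETED` and `EXPIRED_JOBS_MAP` say they must only be used in `TaskGarbageCollectionStage`. The stage's main thread computes them and its async worker consumes them. Below is a minimal sketch of that handoff. It assumes a simplified stand-in for `ClusterEvent`: `SketchEvent`, `SketchAttributeName`, `publishGcWork`, and `countPendingDeletes` are hypothetical names, not Helix's API. The main thread publishes unmodifiable copies, so the async side never reads structures that a later pipeline run may still mutate.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class GcHandoffSketch {

  // Hypothetical stand-in for Helix's ClusterEvent: just a thread-safe attribute map.
  static final class SketchEvent {
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();

    void addAttribute(String name, Object value) {
      attributes.put(name, value);
    }

    @SuppressWarnings("unchecked")
    <T> T getAttribute(String name) {
      return (T) attributes.get(name);
    }
  }

  // Mirrors the two constants added to AttributeName in this PR.
  enum SketchAttributeName {
    WORKFLOWS_TO_BE_DELETED,
    EXPIRED_JOBS_MAP
  }

  // Main-thread phase: publish deep, unmodifiable copies of the GC work.
  static void publishGcWork(SketchEvent event, Map<String, Set<String>> expiredJobsMap,
      Set<String> workflowsToBeDeleted) {
    Map<String, Set<String>> jobsSnapshot = new HashMap<>();
    for (Map.Entry<String, Set<String>> entry : expiredJobsMap.entrySet()) {
      jobsSnapshot.put(entry.getKey(),
          Collections.unmodifiableSet(new HashSet<>(entry.getValue())));
    }
    event.addAttribute(SketchAttributeName.EXPIRED_JOBS_MAP.name(),
        Collections.unmodifiableMap(jobsSnapshot));
    event.addAttribute(SketchAttributeName.WORKFLOWS_TO_BE_DELETED.name(),
        Collections.unmodifiableSet(new HashSet<>(workflowsToBeDeleted)));
  }

  // Async phase: reads only the published snapshots, never the shared cache.
  static int countPendingDeletes(SketchEvent event) {
    Map<String, Set<String>> expiredJobs =
        event.getAttribute(SketchAttributeName.EXPIRED_JOBS_MAP.name());
    Set<String> workflows =
        event.getAttribute(SketchAttributeName.WORKFLOWS_TO_BE_DELETED.name());
    int total = (workflows == null) ? 0 : workflows.size();
    if (expiredJobs != null) {
      for (Set<String> jobs : expiredJobs.values()) {
        total += jobs.size();
      }
    }
    return total;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> expired = new HashMap<>();
    expired.put("queueA", new HashSet<>(Arrays.asList("queueA_job1", "queueA_job2")));
    Set<String> toDelete = new HashSet<>(Collections.singleton("workflowB"));

    SketchEvent event = new SketchEvent();
    publishGcWork(event, expired, toDelete);

    // A later pipeline run mutating its own structures does not affect the snapshot.
    expired.clear();
    toDelete.add("workflowC");
    System.out.println(countPendingDeletes(event)); // prints 3
  }
}
```

Copying on publish costs one allocation per workflow. In exchange, the "only use in this stage" rule no longer depends on callers behaving; read-only access is enforced by the collection types.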

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +29,89 @@ public AsyncWorkerType getAsyncWorkerType() {
   }
 
   @Override
-  public void execute(ClusterEvent event) {
-    WorkflowControllerDataProvider dataProvider =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
+  public void process(ClusterEvent event) throws Exception {
+    // Use main thread to compute what jobs need to be purged, and what workflows need to be gc'ed.
+    // This is to avoid race conditions since the cache will be modified. After this work, then the
+    // async work will happen.
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-
-    if (dataProvider == null || manager == null) {
+    if (manager == null) {
       LOG.warn(
-          "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
           event.getEventId(), event.getEventType(), event.getClusterName());
       return;
     }
 
-    Set<WorkflowConfig> existingWorkflows =
-        new HashSet<>(dataProvider.getWorkflowConfigMap().values());
-    for (WorkflowConfig workflowConfig : existingWorkflows) {
-      // clean up the expired jobs if it is a queue.
+    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
+    Set<String> workflowsToBeDeleted = new HashSet<>();
+    WorkflowControllerDataProvider dataProvider =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
+      WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
-        try {
-          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
-              dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
-              _rebalanceScheduler);
-        } catch (Exception e) {
-          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
-              workflowConfig.getWorkflowId(), e.toString()));
+        WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());
+        long purgeInterval = workflowConfig.getJobPurgeInterval();
+        long currentTime = System.currentTimeMillis();
+        if (purgeInterval > 0
+            && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
+          // Find jobs that are ready to be purged
+          Set<String> expiredJobs =
+              TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
+          if (!expiredJobs.isEmpty()) {
+            expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
+          }
+          scheduleNextJobPurge(workflowConfig.getWorkflowId(), currentTime, purgeInterval,

Review comment:
       If you do garbage collection here, this triggering is not necessary.
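In the new `process()`, expired jobs are only collected when two conditions hold: the workflow has a positive `JobPurgeInterval`, and at least that interval has elapsed since `getLastJobPurgeTime()`. Below is a minimal sketch of that gate as a pure function; `JobPurgeGate` and `isJobPurgeDue` are hypothetical names, not part of Helix. One deliberate deviation from the diff: the sketch compares elapsed time (`currentTime - lastJobPurgeTime >= purgeInterval`) rather than `lastJobPurgeTime + purgeInterval <= currentTime`. Both give the same result for realistic timestamps, but the subtraction form avoids `long` overflow when the interval is close to `Long.MAX_VALUE`.

```java
public final class JobPurgeGate {

  private JobPurgeGate() {}

  // True only if purging is enabled (interval > 0) and at least purgeInterval ms
  // have passed since the last purge; the boundary case (exactly equal) counts as due.
  static boolean isJobPurgeDue(long lastJobPurgeTime, long purgeInterval, long currentTime) {
    return purgeInterval > 0 && currentTime - lastJobPurgeTime >= purgeInterval;
  }

  public static void main(String[] args) {
    long now = 10_000L;
    System.out.println(isJobPurgeDue(4_000L, 5_000L, now));        // prints true (6000 ms elapsed)
    System.out.println(isJobPurgeDue(8_000L, 5_000L, now));        // prints false (2000 ms elapsed)
    System.out.println(isJobPurgeDue(0L, -1L, now));               // prints false (purging disabled)
    System.out.println(isJobPurgeDue(1L, Long.MAX_VALUE, now));    // prints false; the additive form overflows to true
  }
}
```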




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
