NealSun96 commented on a change in pull request #1076:
URL: https://github.com/apache/helix/pull/1076#discussion_r443070095



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -977,72 +1011,69 @@ public static boolean isJobStarted(String job, 
WorkflowContext workflowContext)
   }
 
   /**
-   * Clean up all jobs that are COMPLETED and passes its expiry time.
-   * @param workflowConfig
-   * @param workflowContext
+   * Clean up all jobs that are marked as expired.
    */
-  public static void purgeExpiredJobs(String workflow, WorkflowConfig 
workflowConfig,
-      WorkflowContext workflowContext, HelixManager manager,
-      RebalanceScheduler rebalanceScheduler) {
-    if (workflowContext == null) {
-      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
-      return;
+  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
+      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
+    Set<String> failedJobRemovals = new HashSet<>();
+    for (String job : expiredJobs) {
+      if (!TaskUtil
+          .removeJob(manager.getHelixDataAccessor(), 
manager.getHelixPropertyStore(), job)) {
+        failedJobRemovals.add(job);
+        LOG.warn("Failed to clean up expired and completed jobs from workflow 
" + workflow);
+      }
+      rebalanceScheduler.removeScheduledRebalance(job);
     }
-    long purgeInterval = workflowConfig.getJobPurgeInterval();
-    long currentTime = System.currentTimeMillis();
-    final Set<String> expiredJobs = Sets.newHashSet();
-    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + 
purgeInterval <= currentTime) {
-      
expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
-          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
-      if (expiredJobs.isEmpty()) {
-        LOG.info("No job to purge for the queue " + workflow);
-      } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), 
manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from 
workflow " + workflow);
-          }
-          rebalanceScheduler.removeScheduledRebalance(job);
-        }
 
-        // If the job removal failed, make sure we do NOT prematurely delete 
it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
+    // If the job removal failed, make sure we do NOT prematurely delete it 
from DAG so that the
+    // removal will be tried again at next purge
+    expiredJobs.removeAll(failedJobRemovals);
 
-        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), 
workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + 
expiredJobs
-              + " from the workflow " + workflow);
-        }
+    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, 
expiredJobs, true)) {
+      LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs + 
" from the workflow "
+          + workflow);
+    }
 
-        if (expiredJobs.size() > 0) {
-          // Update workflow context will be in main pipeline not here. 
Otherwise, it will cause
-          // concurrent write issue. It is possible that jobs got purged but 
there is no event to
-          // trigger the pipeline to clean context.
-          HelixDataAccessor accessor = manager.getHelixDataAccessor();
-          List<String> resourceConfigs =
-              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-          if (resourceConfigs.size() > 0) {
-            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 
0L);
-          } else {
-            LOG.warn(
-                "No resource config to trigger rebalance for clean up contexts 
for" + expiredJobs);
-          }
-        }
+    if (expiredJobs.size() > 0) {
+      // Update workflow context will be in main pipeline not here. Otherwise, 
it will cause
+      // concurrent write issue. It is possible that jobs got purged but there 
is no event to
+      // trigger the pipeline to clean context.
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      List<String> resourceConfigs =
+          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+      if (resourceConfigs.size() > 0) {
+        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+      } else {
+        LOG.warn("No resource config to trigger rebalance for clean up 
contexts for" + expiredJobs);
       }
     }
-    setNextJobPurgeTime(workflow, currentTime, purgeInterval, 
rebalanceScheduler, manager);

Review comment:
       This function was moved from here to `TaskGarbageCollectionStage`'s main 
thread. The reason for the move is that `purgeInterval` no longer needs to be 
passed to the async thread. I believe the trade-off is worth it because the 
purge frequency isn't critical. FYI: @dasahcc 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to