NealSun96 commented on a change in pull request #1076:
URL: https://github.com/apache/helix/pull/1076#discussion_r446355773



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -977,72 +1011,69 @@ public static boolean isJobStarted(String job, 
WorkflowContext workflowContext)
   }
 
   /**
-   * Clean up all jobs that are COMPLETED and passes its expiry time.
-   * @param workflowConfig
-   * @param workflowContext
+   * Clean up all jobs that are marked as expired.
    */
-  public static void purgeExpiredJobs(String workflow, WorkflowConfig 
workflowConfig,
-      WorkflowContext workflowContext, HelixManager manager,
-      RebalanceScheduler rebalanceScheduler) {
-    if (workflowContext == null) {
-      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
-      return;
+  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
+      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
+    Set<String> failedJobRemovals = new HashSet<>();
+    for (String job : expiredJobs) {
+      if (!TaskUtil
+          .removeJob(manager.getHelixDataAccessor(), 
manager.getHelixPropertyStore(), job)) {
+        failedJobRemovals.add(job);
+        LOG.warn("Failed to clean up expired and completed jobs from workflow 
" + workflow);
+      }
+      rebalanceScheduler.removeScheduledRebalance(job);
     }
-    long purgeInterval = workflowConfig.getJobPurgeInterval();
-    long currentTime = System.currentTimeMillis();
-    final Set<String> expiredJobs = Sets.newHashSet();
-    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + 
purgeInterval <= currentTime) {
-      
expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
-          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
-      if (expiredJobs.isEmpty()) {
-        LOG.info("No job to purge for the queue " + workflow);
-      } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), 
manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from 
workflow " + workflow);
-          }
-          rebalanceScheduler.removeScheduledRebalance(job);
-        }
 
-        // If the job removal failed, make sure we do NOT prematurely delete 
it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
+    // If the job removal failed, make sure we do NOT prematurely delete it 
from DAG so that the
+    // removal will be tried again at next purge
+    expiredJobs.removeAll(failedJobRemovals);
 
-        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), 
workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + 
expiredJobs
-              + " from the workflow " + workflow);
-        }
+    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, 
expiredJobs, true)) {
+      LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs + 
" from the workflow "
+          + workflow);
+    }
 
-        if (expiredJobs.size() > 0) {
-          // Update workflow context will be in main pipeline not here. 
Otherwise, it will cause
-          // concurrent write issue. It is possible that jobs got purged but 
there is no event to
-          // trigger the pipeline to clean context.
-          HelixDataAccessor accessor = manager.getHelixDataAccessor();
-          List<String> resourceConfigs =
-              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-          if (resourceConfigs.size() > 0) {
-            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 
0L);
-          } else {
-            LOG.warn(
-                "No resource config to trigger rebalance for clean up contexts 
for" + expiredJobs);
-          }
-        }
+    if (expiredJobs.size() > 0) {
+      // Update workflow context will be in main pipeline not here. Otherwise, 
it will cause
+      // concurrent write issue. It is possible that jobs got purged but there 
is no event to
+      // trigger the pipeline to clean context.
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      List<String> resourceConfigs =
+          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+      if (resourceConfigs.size() > 0) {
+        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+      } else {
+        LOG.warn("No resource config to trigger rebalance for clean up 
contexts for" + expiredJobs);
       }
     }
-    setNextJobPurgeTime(workflow, currentTime, purgeInterval, 
rebalanceScheduler, manager);

Review comment:
       That's a good point, but I think you may have gotten this backwards? 
`setNextJobPurgeTime` should be a part of the pipeline, so it makes more sense 
to be in the stage instead of the function, right? If there is ever a REST 
endpoint that triggers the function, we don't want to schedule a rebalance. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to