alirezazamani commented on a change in pull request #497: Fix job purge logic 
for job without config
URL: https://github.com/apache/helix/pull/497#discussion_r331720377
 
 

 ##########
 File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
 ##########
 @@ -990,46 +1009,65 @@ public static void purgeExpiredJobs(String workflow, 
WorkflowConfig workflowConf
     long purgeInterval = workflowConfig.getJobPurgeInterval();
     long currentTime = System.currentTimeMillis();
     final Set<String> expiredJobs = Sets.newHashSet();
+    final Set<String> misconfiguredJobs = Sets.newHashSet();
     if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + 
purgeInterval <= currentTime) {
       
expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
           manager.getHelixPropertyStore(), workflowConfig, workflowContext));
-      if (expiredJobs.isEmpty()) {
+      
misconfiguredJobs.addAll(TaskUtil.getMisconfiguredJobs(manager.getHelixDataAccessor(),
+          workflowConfig, workflowContext));
+      if (expiredJobs.isEmpty() && misconfiguredJobs.isEmpty()) {
         LOG.info("No job to purge for the queue " + workflow);
       } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), 
manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from 
workflow " + workflow);
+        if (!expiredJobs.isEmpty()) {
+          LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
+          Set<String> failedJobRemovals = new HashSet<>();
+          for (String job : expiredJobs) {
+            if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), 
manager.getHelixPropertyStore(),
+                job)) {
+              failedJobRemovals.add(job);
+              LOG.warn("Failed to clean up expired and completed jobs from 
workflow " + workflow);
+            }
+            rebalanceScheduler.removeScheduledRebalance(job);
           }
-          rebalanceScheduler.removeScheduledRebalance(job);
-        }
 
-        // If the job removal failed, make sure we do NOT prematurely delete 
it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
+          // If the job removal failed, make sure we do NOT prematurely delete 
it from DAG so that
+          // the removal will be tried again at next purge
+          expiredJobs.removeAll(failedJobRemovals);
 
-        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), 
workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + 
expiredJobs
-              + " from the workflow " + workflow);
+          if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), 
workflow, expiredJobs,
+              true)) {
+            LOG.warn("Error occurred while trying to remove jobs + " + 
expiredJobs
+                + " from the workflow " + workflow);
+          }
         }
-
-        if (expiredJobs.size() > 0) {
-          // Update workflow context will be in main pipeline not here. 
Otherwise, it will cause
-          // concurrent write issue. It is possible that jobs got purged but 
there is no event to
-          // trigger the pipeline to clean context.
-          HelixDataAccessor accessor = manager.getHelixDataAccessor();
-          List<String> resourceConfigs =
-              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-          if (resourceConfigs.size() > 0) {
-            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 
0L);
-          } else {
-            LOG.warn(
-                "No resource config to trigger rebalance for clean up contexts 
for" + expiredJobs);
+        if (!misconfiguredJobs.isEmpty()) {
+          LOG.info("Purge jobs " + misconfiguredJobs + " from queue " + 
workflow);
+          for (String job : misconfiguredJobs) {
+            if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), 
manager.getHelixPropertyStore(),
+                job)) {
+              LOG.warn("Failed to clean up misconfigured jobs from workflow " 
+ workflow);
+            }
+            rebalanceScheduler.removeScheduledRebalance(job);
           }
+          if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), 
workflow,
+              misconfiguredJobs, true)) {
+            LOG.warn("Error occurred while trying to remove jobs + " + 
misconfiguredJobs
+                + " from the workflow " + workflow);
+          }
+        }
+      }
 
 Review comment:
   Your point is partially true; however, we treat the two cases differently. For expired jobs, we try to remove them, and any jobs that fail to be removed are kept in the DAG so that removal is retried at the next purge. For jobs without a config, there is no point in retrying the removal (here we attempt it only once), because they have no config and the removal will fail; such jobs are removed from the DAG regardless.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to