narendly commented on a change in pull request #497: Fix job purge logic for job without config
URL: https://github.com/apache/helix/pull/497#discussion_r331848053
########## File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ########## @@ -989,47 +988,65 @@ public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConf } long purgeInterval = workflowConfig.getJobPurgeInterval(); long currentTime = System.currentTimeMillis(); - final Set<String> expiredJobs = Sets.newHashSet(); + Set<String> expiredJobs = Sets.newHashSet(); + Set<String> jobsWithoutConfig = Sets.newHashSet(); if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) { - expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(), - manager.getHelixPropertyStore(), workflowConfig, workflowContext)); - if (expiredJobs.isEmpty()) { + TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(), + manager.getHelixPropertyStore(), workflowConfig, workflowContext, expiredJobs, jobsWithoutConfig); + + if (expiredJobs.isEmpty() && jobsWithoutConfig.isEmpty()) { LOG.info("No job to purge for the queue " + workflow); } else { - LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow); - Set<String> failedJobRemovals = new HashSet<>(); - for (String job : expiredJobs) { - if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), - job)) { - failedJobRemovals.add(job); - LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow); + if (!expiredJobs.isEmpty()) { + LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow); + Set<String> failedJobRemovals = new HashSet<>(); + for (String job : expiredJobs) { + if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), + job)) { + failedJobRemovals.add(job); + LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow); + } + rebalanceScheduler.removeScheduledRebalance(job); } - rebalanceScheduler.removeScheduledRebalance(job); - } - // If the job removal failed, make sure we do NOT prematurely delete it from DAG so 
that the - // removal will be tried again at next purge - expiredJobs.removeAll(failedJobRemovals); + // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that + // the removal will be tried again at next purge + expiredJobs.removeAll(failedJobRemovals); - if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, - true)) { - LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs - + " from the workflow " + workflow); + if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, + true)) { + LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs + + " from the workflow " + workflow); + } } - - if (expiredJobs.size() > 0) { - // Update workflow context will be in main pipeline not here. Otherwise, it will cause - // concurrent write issue. It is possible that jobs got purged but there is no event to - // trigger the pipeline to clean context. - HelixDataAccessor accessor = manager.getHelixDataAccessor(); - List<String> resourceConfigs = - accessor.getChildNames(accessor.keyBuilder().resourceConfigs()); - if (resourceConfigs.size() > 0) { - RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L); - } else { - LOG.warn( - "No resource config to trigger rebalance for clean up contexts for" + expiredJobs); + if (!jobsWithoutConfig.isEmpty()) { + LOG.info("Purge jobs " + jobsWithoutConfig + " from queue " + workflow); + for (String job : jobsWithoutConfig) { + if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), Review comment: Please review this comment and determine whether it identifies the root cause. If it does, we should file an issue for it and fix that as well. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org