dasahcc commented on a change in pull request #1076:
URL: https://github.com/apache/helix/pull/1076#discussion_r444523696



##########
File path: 
helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
##########
@@ -40,5 +40,9 @@
   PipelineType,
   LastRebalanceFinishTimeStamp,
   ControllerDataProvider,
-  STATEFUL_REBALANCER
+  STATEFUL_REBALANCER,
+  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+  WORKFLOWS_TO_BE_DELETED,
+  // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
+  EXPIRED_JOBS_MAP

Review comment:
       You can make it:
   
   WORKFLOW_TO_BE_DELETED with JOB_TO_BE_DELETED, or EXPIRED_WORKFLOWS_MAP with EXPIRED_JOBS_MAP. Logically, they belong to the same group.
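
   A minimal sketch of the second pairing, assuming WORKFLOWS_TO_BE_DELETED were renamed to EXPIRED_WORKFLOWS_MAP (the rename and the trimmed-down enum are illustrative only, not part of this PR):

    // Sketch only: earlier enum members are elided; EXPIRED_WORKFLOWS_MAP is the
    // hypothetical rename of WORKFLOWS_TO_BE_DELETED suggested above.
    public enum AttributeName {
      PipelineType,
      LastRebalanceFinishTimeStamp,
      ControllerDataProvider,
      STATEFUL_REBALANCER,
      // Should only be used in TaskGarbageCollectionStage; misuse could cause race conditions.
      EXPIRED_WORKFLOWS_MAP,
      // Should only be used in TaskGarbageCollectionStage; misuse could cause race conditions.
      EXPIRED_JOBS_MAP
    }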

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -977,72 +1011,69 @@ public static boolean isJobStarted(String job, WorkflowContext workflowContext)
   }
 
   /**
-   * Clean up all jobs that are COMPLETED and passes its expiry time.
-   * @param workflowConfig
-   * @param workflowContext
+   * Clean up all jobs that are marked as expired.
    */
-  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext, HelixManager manager,
-      RebalanceScheduler rebalanceScheduler) {
-    if (workflowContext == null) {
-      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
-      return;
+  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
+      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
+    Set<String> failedJobRemovals = new HashSet<>();
+    for (String job : expiredJobs) {
+      if (!TaskUtil
+          .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
+        failedJobRemovals.add(job);
+        LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
+      }
+      rebalanceScheduler.removeScheduledRebalance(job);
     }
-    long purgeInterval = workflowConfig.getJobPurgeInterval();
-    long currentTime = System.currentTimeMillis();
-    final Set<String> expiredJobs = Sets.newHashSet();
-    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-      expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
-          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
-      if (expiredJobs.isEmpty()) {
-        LOG.info("No job to purge for the queue " + workflow);
-      } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
-          }
-          rebalanceScheduler.removeScheduledRebalance(job);
-        }
 
-        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
+    // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+    // removal will be tried again at next purge
+    expiredJobs.removeAll(failedJobRemovals);
 
-        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
-              + " from the workflow " + workflow);
-        }
+    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+      LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs + " from the workflow "
+          + workflow);
+    }
 
-        if (expiredJobs.size() > 0) {
-          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
-          // concurrent write issue. It is possible that jobs got purged but there is no event to
-          // trigger the pipeline to clean context.
-          HelixDataAccessor accessor = manager.getHelixDataAccessor();
-          List<String> resourceConfigs =
-              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-          if (resourceConfigs.size() > 0) {
-            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
-          } else {
-            LOG.warn(
-                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
-          }
-        }
+    if (expiredJobs.size() > 0) {
+      // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+      // concurrent write issue. It is possible that jobs got purged but there is no event to
+      // trigger the pipeline to clean context.
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      List<String> resourceConfigs =
+          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+      if (resourceConfigs.size() > 0) {
+        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+      } else {
+        LOG.warn("No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
       }
     }
-    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);

Review comment:
       One concern here is that if we move the logic from the function to the stage, we make the clean-up logic dedicated to the pipeline. Say someone just wants to call the purge from REST and do a job purge; they will not be able to do it.
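
   A hypothetical sketch of one way to keep an on-demand entry point (e.g. for a REST-triggered purge) while the stage drives the pipeline path; the wrapper name purgeExpiredJobsOnDemand is illustrative and not part of this PR:

      // Hypothetical helper (not in this PR): recomputes the expired-job set the way the
      // old purgeExpiredJobs did, then delegates to the new set-based purgeExpiredJobs,
      // so a REST handler can still trigger a purge outside the pipeline.
      public static void purgeExpiredJobsOnDemand(String workflow, WorkflowConfig workflowConfig,
          WorkflowContext workflowContext, HelixManager manager,
          RebalanceScheduler rebalanceScheduler) {
        if (workflowContext == null) {
          LOG.warn(String.format("Workflow %s context does not exist!", workflow));
          return;
        }
        Set<String> expiredJobs = new HashSet<>(TaskUtil.getExpiredJobs(
            manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), workflowConfig,
            workflowContext));
        if (expiredJobs.isEmpty()) {
          LOG.info("No job to purge for the queue " + workflow);
          return;
        }
        purgeExpiredJobs(workflow, expiredJobs, manager, rebalanceScheduler);
      }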

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -977,72 +1011,70 @@ public static boolean isJobStarted(String job, WorkflowContext workflowContext)
   }
 
   /**
-   * Clean up all jobs that are COMPLETED and passes its expiry time.
-   * @param workflowConfig
-   * @param workflowContext
+   * Clean up all jobs that are marked as expired.
    */
-  public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext, HelixManager manager,
-      RebalanceScheduler rebalanceScheduler) {
-    if (workflowContext == null) {
-      LOG.warn(String.format("Workflow %s context does not exist!", workflow));
-      return;
+  public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs,
+      HelixManager manager, RebalanceScheduler rebalanceScheduler) {
+    Set<String> failedJobRemovals = new HashSet<>();
+    for (String job : expiredJobs) {
+      if (!TaskUtil
+          .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) {
+        failedJobRemovals.add(job);
+        LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow);
+      }
+      rebalanceScheduler.removeScheduledRebalance(job);
     }
-    long purgeInterval = workflowConfig.getJobPurgeInterval();
-    long currentTime = System.currentTimeMillis();
-    final Set<String> expiredJobs = Sets.newHashSet();
-    if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= currentTime) {
-      expiredJobs.addAll(TaskUtil.getExpiredJobs(manager.getHelixDataAccessor(),
-          manager.getHelixPropertyStore(), workflowConfig, workflowContext));
-      if (expiredJobs.isEmpty()) {
-        LOG.info("No job to purge for the queue " + workflow);
-      } else {
-        LOG.info("Purge jobs " + expiredJobs + " from queue " + workflow);
-        Set<String> failedJobRemovals = new HashSet<>();
-        for (String job : expiredJobs) {
-          if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(),
-              job)) {
-            failedJobRemovals.add(job);
-            LOG.warn("Failed to clean up expired and completed jobs from workflow " + workflow);
-          }
-          rebalanceScheduler.removeScheduledRebalance(job);
-        }
 
-        // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
-        // removal will be tried again at next purge
-        expiredJobs.removeAll(failedJobRemovals);
+    // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the
+    // removal will be tried again at next purge
+    expiredJobs.removeAll(failedJobRemovals);
 
-        if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs,
-            true)) {
-          LOG.warn("Error occurred while trying to remove jobs + " + expiredJobs
-              + " from the workflow " + workflow);
-        }
+    if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) {
+      LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs,
+          workflow);
+    }
 
-        if (expiredJobs.size() > 0) {
-          // Update workflow context will be in main pipeline not here. Otherwise, it will cause
-          // concurrent write issue. It is possible that jobs got purged but there is no event to
-          // trigger the pipeline to clean context.
-          HelixDataAccessor accessor = manager.getHelixDataAccessor();
-          List<String> resourceConfigs =
-              accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
-          if (resourceConfigs.size() > 0) {
-            RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
-          } else {
-            LOG.warn(
-                "No resource config to trigger rebalance for clean up contexts for" + expiredJobs);
-          }
-        }
+    if (expiredJobs.size() > 0) {
+      // Update workflow context will be in main pipeline not here. Otherwise, it will cause
+      // concurrent write issue. It is possible that jobs got purged but there is no event to
+      // trigger the pipeline to clean context.
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      List<String> resourceConfigs =
+          accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+      if (resourceConfigs.size() > 0) {
+        RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L);
+      } else {
+        LOG.warn("No resource config to trigger rebalance for clean up 
contexts for {}!",
+            expiredJobs);
       }
     }
-    setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
   }
 
-  private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
-      RebalanceScheduler rebalanceScheduler, HelixManager manager) {
-    long nextPurgeTime = currentTime + purgeInterval;
-    long currentScheduledTime = rebalanceScheduler.getRebalanceTime(workflow);
-    if (currentScheduledTime == -1 || currentScheduledTime > nextPurgeTime) {
-      rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
+  /**
+   * The function that removes IdealStates and workflow contexts of the workflows that need to be
+   * deleted.
+   * @param toBeDeletedWorkflows
+   * @param manager
+   */
+  public static void workflowGarbageCollection(final Set<String> toBeDeletedWorkflows,
+      final HelixManager manager) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
+
+    for (String workflowName : toBeDeletedWorkflows) {
+      LOG.warn(
+          "WorkflowContext exists for workflow {}. However, Workflow Config is 
missing! Deleting the WorkflowConfig and IdealState!!",
+          workflowName);
+
+      if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
+        LOG.warn("Error occurred while trying to remove workflow 
idealstate/externalview for {}.",
+            workflowName);
+        continue;
+      }

Review comment:
       You can mark a TODO here. We don't need it in the future since TF is not relying on IS/EV anymore.
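
   For example, the suggested TODO could sit right above the cleanup call (the wording is illustrative only):

      // TODO: Remove this IdealState/ExternalView cleanup once the Task Framework
      // no longer relies on IS/EV for workflows.
      if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
        LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.",
            workflowName);
        continue;
      }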




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
