NealSun96 commented on a change in pull request #1076:
URL: https://github.com/apache/helix/pull/1076#discussion_r443069639
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +29,89 @@ public AsyncWorkerType getAsyncWorkerType() {
}
@Override
- public void execute(ClusterEvent event) {
- WorkflowControllerDataProvider dataProvider =
- event.getAttribute(AttributeName.ControllerDataProvider.name());
+ public void process(ClusterEvent event) throws Exception {
+ // Use main thread to compute what jobs need to be purged, and what
workflows need to be gc'ed.
+ // This is to avoid race conditions since the cache will be modified.
After this work, then the
+ // async work will happen.
HelixManager manager =
event.getAttribute(AttributeName.helixmanager.name());
-
- if (dataProvider == null || manager == null) {
+ if (manager == null) {
LOG.warn(
- "ResourceControllerDataProvider or HelixManager is null for event
{}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ "HelixManager is null for event {}({}) in cluster {}. Skip
TaskGarbageCollectionStage.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}
- Set<WorkflowConfig> existingWorkflows =
- new HashSet<>(dataProvider.getWorkflowConfigMap().values());
- for (WorkflowConfig workflowConfig : existingWorkflows) {
- // clean up the expired jobs if it is a queue.
+ Map<String, Set<String>> expiredJobsMap = new HashMap<>();
+ Set<String> workflowsToBeDeleted = new HashSet<>();
+ WorkflowControllerDataProvider dataProvider =
+ event.getAttribute(AttributeName.ControllerDataProvider.name());
+ for (Map.Entry<String, ZNRecord> entry :
dataProvider.getContexts().entrySet()) {
+ WorkflowConfig workflowConfig =
dataProvider.getWorkflowConfig(entry.getKey());
if (workflowConfig != null && (!workflowConfig.isTerminable() ||
workflowConfig
.isJobQueue())) {
- try {
- TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(),
workflowConfig,
- dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()),
manager,
- _rebalanceScheduler);
- } catch (Exception e) {
- LOG.warn(String.format("Failed to purge job for workflow %s with
reason %s",
- workflowConfig.getWorkflowId(), e.toString()));
+ WorkflowContext workflowContext =
dataProvider.getWorkflowContext(entry.getKey());
+ long purgeInterval = workflowConfig.getJobPurgeInterval();
+ long currentTime = System.currentTimeMillis();
+ if (purgeInterval > 0
+ && workflowContext.getLastJobPurgeTime() + purgeInterval <=
currentTime) {
+ // Find jobs that are ready to be purged
+ Set<String> expiredJobs =
+ TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig,
workflowContext);
+ if (!expiredJobs.isEmpty()) {
+ expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
+ }
+ scheduleNextJobPurge(workflowConfig.getWorkflowId(), currentTime,
purgeInterval,
Review comment:
This line was moved here from the original `purgeExpiredJobs`. The reason is
that by triggering the next purge here, we no longer need to pass
`purgeInterval` to the async thread. I will elaborate on this in another comment.
I think the scheduling call is still necessary, right? I believe its goal is to
trigger a purge even if there is no event.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]