narendly commented on a change in pull request #1076:
URL: https://github.com/apache/helix/pull/1076#discussion_r437152830
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +27,53 @@ public AsyncWorkerType getAsyncWorkerType() {
}
@Override
- public void execute(ClusterEvent event) {
+ public void process(ClusterEvent event) throws Exception {
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+ event.addAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name(),
+ dataProvider.getWorkflowConfigMap());
+ event.addAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name(),
dataProvider.getContexts());
+
+ super.process(event);
+ }
+
+ @Override
+ public void execute(ClusterEvent event) {
+ Map<String, WorkflowConfig> workflowConfigMap =
+ event.getAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name());
+ Map<String, ZNRecord> resourceContextMap =
+ event.getAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name());
HelixManager manager =
event.getAttribute(AttributeName.helixmanager.name());
- if (dataProvider == null || manager == null) {
+ if (manager == null) {
LOG.warn(
- "ResourceControllerDataProvider or HelixManager is null for event
{}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ " HelixManager is null for event {}({}) in cluster {}. Skip
TaskGarbageCollectionStage.",
Review comment:
Why does the log message start with a leading space?
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +27,53 @@ public AsyncWorkerType getAsyncWorkerType() {
}
@Override
- public void execute(ClusterEvent event) {
+ public void process(ClusterEvent event) throws Exception {
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+ event.addAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name(),
+ dataProvider.getWorkflowConfigMap());
+ event.addAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name(),
dataProvider.getContexts());
Review comment:
How would this affect the memory footprint of each event?
Also, is a shallow copy okay here?
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +27,53 @@ public AsyncWorkerType getAsyncWorkerType() {
}
@Override
- public void execute(ClusterEvent event) {
+ public void process(ClusterEvent event) throws Exception {
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+ event.addAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name(),
+ dataProvider.getWorkflowConfigMap());
+ event.addAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name(),
dataProvider.getContexts());
Review comment:
How would this affect the memory footprint of each event?
Also, is a shallow copy okay here? I guess if a shallow copy is okay, then it
won't be too much overhead, because this would just be two more references.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +27,53 @@ public AsyncWorkerType getAsyncWorkerType() {
}
@Override
- public void execute(ClusterEvent event) {
+ public void process(ClusterEvent event) throws Exception {
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+ event.addAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name(),
+ dataProvider.getWorkflowConfigMap());
+ event.addAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name(),
dataProvider.getContexts());
+
+ super.process(event);
+ }
+
+ @Override
+ public void execute(ClusterEvent event) {
+ Map<String, WorkflowConfig> workflowConfigMap =
+ event.getAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name());
+ Map<String, ZNRecord> resourceContextMap =
+ event.getAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name());
HelixManager manager =
event.getAttribute(AttributeName.helixmanager.name());
- if (dataProvider == null || manager == null) {
+ if (manager == null) {
LOG.warn(
- "ResourceControllerDataProvider or HelixManager is null for event
{}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ " HelixManager is null for event {}({}) in cluster {}. Skip
TaskGarbageCollectionStage.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}
- Set<WorkflowConfig> existingWorkflows =
- new HashSet<>(dataProvider.getWorkflowConfigMap().values());
+ Set<WorkflowConfig> existingWorkflows = new
HashSet<>(workflowConfigMap.values());
Review comment:
If we assumed above that a shallow copy was okay, then why are we creating a
new set here? Also, this is not a deep copy either, is it?
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +27,53 @@ public AsyncWorkerType getAsyncWorkerType() {
}
@Override
- public void execute(ClusterEvent event) {
+ public void process(ClusterEvent event) throws Exception {
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+ event.addAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name(),
+ dataProvider.getWorkflowConfigMap());
+ event.addAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name(),
dataProvider.getContexts());
+
+ super.process(event);
+ }
+
+ @Override
+ public void execute(ClusterEvent event) {
+ Map<String, WorkflowConfig> workflowConfigMap =
+ event.getAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name());
+ Map<String, ZNRecord> resourceContextMap =
+ event.getAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name());
HelixManager manager =
event.getAttribute(AttributeName.helixmanager.name());
- if (dataProvider == null || manager == null) {
+ if (manager == null) {
LOG.warn(
- "ResourceControllerDataProvider or HelixManager is null for event
{}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ " HelixManager is null for event {}({}) in cluster {}. Skip
TaskGarbageCollectionStage.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}
- Set<WorkflowConfig> existingWorkflows =
- new HashSet<>(dataProvider.getWorkflowConfigMap().values());
+ Set<WorkflowConfig> existingWorkflows = new
HashSet<>(workflowConfigMap.values());
for (WorkflowConfig workflowConfig : existingWorkflows) {
// clean up the expired jobs if it is a queue.
if (workflowConfig != null && (!workflowConfig.isTerminable() ||
workflowConfig
.isJobQueue())) {
- try {
- TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(),
workflowConfig,
- dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()),
manager,
- _rebalanceScheduler);
- } catch (Exception e) {
- LOG.warn(String.format("Failed to purge job for workflow %s with
reason %s",
- workflowConfig.getWorkflowId(), e.toString()));
+ String workflowId = workflowConfig.getWorkflowId();
+ if (resourceContextMap.containsKey(workflowId)
+ && resourceContextMap.get(workflowId) != null) {
Review comment:
Aren't these two checks the same check? Why are we doing the same thing
twice?
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +27,53 @@ public AsyncWorkerType getAsyncWorkerType() {
}
@Override
- public void execute(ClusterEvent event) {
+ public void process(ClusterEvent event) throws Exception {
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+ event.addAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name(),
+ dataProvider.getWorkflowConfigMap());
+ event.addAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name(),
dataProvider.getContexts());
+
+ super.process(event);
+ }
+
+ @Override
+ public void execute(ClusterEvent event) {
+ Map<String, WorkflowConfig> workflowConfigMap =
+ event.getAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name());
+ Map<String, ZNRecord> resourceContextMap =
+ event.getAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name());
HelixManager manager =
event.getAttribute(AttributeName.helixmanager.name());
- if (dataProvider == null || manager == null) {
+ if (manager == null) {
LOG.warn(
- "ResourceControllerDataProvider or HelixManager is null for event
{}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ " HelixManager is null for event {}({}) in cluster {}. Skip
TaskGarbageCollectionStage.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}
- Set<WorkflowConfig> existingWorkflows =
- new HashSet<>(dataProvider.getWorkflowConfigMap().values());
+ Set<WorkflowConfig> existingWorkflows = new
HashSet<>(workflowConfigMap.values());
for (WorkflowConfig workflowConfig : existingWorkflows) {
// clean up the expired jobs if it is a queue.
if (workflowConfig != null && (!workflowConfig.isTerminable() ||
workflowConfig
.isJobQueue())) {
- try {
- TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(),
workflowConfig,
- dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()),
manager,
- _rebalanceScheduler);
- } catch (Exception e) {
- LOG.warn(String.format("Failed to purge job for workflow %s with
reason %s",
- workflowConfig.getWorkflowId(), e.toString()));
+ String workflowId = workflowConfig.getWorkflowId();
+ if (resourceContextMap.containsKey(workflowId)
+ && resourceContextMap.get(workflowId) != null) {
+ try {
+ TaskUtil.purgeExpiredJobs(workflowId, workflowConfig,
+ new WorkflowContext(resourceContextMap.get(workflowId)),
manager,
+ _rebalanceScheduler);
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed to purge job for workflow %s with
reason %s", workflowId,
+ e.toString()));
Review comment:
Use parameterized logging.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +27,53 @@ public AsyncWorkerType getAsyncWorkerType() {
}
@Override
- public void execute(ClusterEvent event) {
+ public void process(ClusterEvent event) throws Exception {
WorkflowControllerDataProvider dataProvider =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+ event.addAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name(),
+ dataProvider.getWorkflowConfigMap());
+ event.addAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name(),
dataProvider.getContexts());
+
+ super.process(event);
+ }
+
+ @Override
+ public void execute(ClusterEvent event) {
+ Map<String, WorkflowConfig> workflowConfigMap =
+ event.getAttribute(AttributeName.WORKFLOW_CONFIG_MAP.name());
+ Map<String, ZNRecord> resourceContextMap =
+ event.getAttribute(AttributeName.RESOURCE_CONTEXT_MAP.name());
HelixManager manager =
event.getAttribute(AttributeName.helixmanager.name());
- if (dataProvider == null || manager == null) {
+ if (manager == null) {
LOG.warn(
- "ResourceControllerDataProvider or HelixManager is null for event
{}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ " HelixManager is null for event {}({}) in cluster {}. Skip
TaskGarbageCollectionStage.",
event.getEventId(), event.getEventType(), event.getClusterName());
return;
}
- Set<WorkflowConfig> existingWorkflows =
- new HashSet<>(dataProvider.getWorkflowConfigMap().values());
+ Set<WorkflowConfig> existingWorkflows = new
HashSet<>(workflowConfigMap.values());
for (WorkflowConfig workflowConfig : existingWorkflows) {
// clean up the expired jobs if it is a queue.
if (workflowConfig != null && (!workflowConfig.isTerminable() ||
workflowConfig
.isJobQueue())) {
- try {
- TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(),
workflowConfig,
- dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()),
manager,
- _rebalanceScheduler);
- } catch (Exception e) {
- LOG.warn(String.format("Failed to purge job for workflow %s with
reason %s",
- workflowConfig.getWorkflowId(), e.toString()));
+ String workflowId = workflowConfig.getWorkflowId();
+ if (resourceContextMap.containsKey(workflowId)
+ && resourceContextMap.get(workflowId) != null) {
+ try {
+ TaskUtil.purgeExpiredJobs(workflowId, workflowConfig,
+ new WorkflowContext(resourceContextMap.get(workflowId)),
manager,
+ _rebalanceScheduler);
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed to purge job for workflow %s with
reason %s", workflowId,
+ e.toString()));
+ }
+ } else {
+ LOG.warn(String.format("Workflow %s context does not exist!",
workflowId));
Review comment:
Use parameterized logging.
##########
File path:
helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
##########
@@ -103,6 +103,73 @@ public void testWorkflowContextWithoutConfig() throws
Exception {
Assert.assertTrue(workflowContextNotCreated);
}
+ @Test
+ public void testWorkflowContextGarbageCollection() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
+ _driver.start(builder1.build());
+
+ // Wait until workflow is created and IN_PROGRESS state.
+ _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+ // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed
created for this
+ // workflow
+ Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
+ Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
+ Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME,
workflowName));
+
+ String workflowContextPath =
+ "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName +
"/Context";
+
+ ZNRecord record =
_manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+ null, AccessOption.PERSISTENT);
+ Assert.assertNotNull(record);
+
+ // Wait until workflow is completed.
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+ // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed
after workflow got
+ // expired.
+ boolean workflowExpired = TestHelper.verify(() -> {
+ WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+ WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
+ IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME,
workflowName);
+ return (wCtx == null && wCfg == null && idealState == null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(workflowExpired);
+
+
Review comment:
Redundant blank line — please remove.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]