This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new fbefa6490 Change TopStateHandoffReportStage to be an Async Stage as it is the slowest stage in the pipeline. It can be run async since it does not add to ClusterEvent and simply computes and reports metrics. No following stages depend on it. (#2610) fbefa6490 is described below commit fbefa64909781d479ec486a93d22bba416322fb8 Author: Zachary Pinto <zapi...@linkedin.com> AuthorDate: Thu Aug 31 20:10:13 2023 -0700 Change TopStateHandoffReportStage to be an Async Stage as it is the slowest stage in the pipeline. It can be run async since it does not add to ClusterEvent and simply computes and reports metrics. No following stages depend on it. (#2610) In some cases TopStateHandoffReportStage is taking a very long portion of the total pipeline execution time. In order to speed up the total pipeline execution time, we will make TopStateHandoffReportStage async since it is simply computing metrics and reporting without adding to ClusterEvent. --- .../helix/controller/pipeline/AsyncWorkerType.java | 1 + .../stages/TopStateHandoffReportStage.java | 22 +++++++++++++++------- .../helix/controller/stages/BaseStageTest.java | 11 ++++++++--- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java index fcbf03f6b..a1afb95f2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java @@ -27,6 +27,7 @@ package org.apache.helix.controller.pipeline; */ public enum AsyncWorkerType { + TopStateHandoffReportWorker, TargetExternalViewCalcWorker, PersistAssignmentWorker, ExternalViewComputeWorker, diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java index 77a84a448..aec55aae5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java @@ -26,7 +26,8 @@ import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; -import org.apache.helix.controller.pipeline.AbstractBaseStage; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; +import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.CurrentState; import org.apache.helix.model.LiveInstance; @@ -42,20 +43,27 @@ import org.slf4j.LoggerFactory; /** * Observe top state handoff and report latency */ -public class TopStateHandoffReportStage extends AbstractBaseStage { +public class TopStateHandoffReportStage extends AbstractAsyncBaseStage { private static final long DEFAULT_HANDOFF_USER_LATENCY = 0L; private static Logger LOG = LoggerFactory.getLogger(TopStateHandoffReportStage.class); public static final long TIMESTAMP_NOT_RECORDED = -1L; @Override - public void process(ClusterEvent event) throws Exception { + public AsyncWorkerType getAsyncWorkerType() { + return AsyncWorkerType.TopStateHandoffReportWorker; + } + + @Override + public void execute(final ClusterEvent event) throws Exception { _eventId = event.getEventId(); - final BaseControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); - final Long lastPipelineFinishTimestamp = event - .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(), + final BaseControllerDataProvider cache = + event.getAttribute(AttributeName.ControllerDataProvider.name()); + final Long lastPipelineFinishTimestamp = + event.getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(), TIMESTAMP_NOT_RECORDED); final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); - final CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name()); + final CurrentStateOutput currentStateOutput = + event.getAttribute(AttributeName.CURRENT_STATE.name()); final ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java index 2d4aa0988..fe77383e0 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java @@ -31,8 +31,7 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.model.Message; -import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; import org.apache.helix.controller.pipeline.Stage; import org.apache.helix.controller.pipeline.StageContext; import org.apache.helix.mock.MockHelixAdmin; @@ -44,10 +43,12 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.StateModelConfigGenerator; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.testng.ITestContext; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -209,7 +210,11 @@ public class BaseStageTest { stage.init(context); stage.preProcess(); try { - stage.process(event); + if (stage instanceof AbstractAsyncBaseStage) { + ((AbstractAsyncBaseStage) stage).execute(event); + } else { + stage.process(event); + } } catch (Exception e) { e.printStackTrace(); }