This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new fbefa6490 Change TopStateHandoffReportStage to be an async stage, as it
is the slowest stage in the pipeline. It can run asynchronously because it does
not add anything to the ClusterEvent and only computes and reports metrics; no
subsequent stages depend on it. (#2610)
fbefa6490 is described below

commit fbefa64909781d479ec486a93d22bba416322fb8
Author: Zachary Pinto <zapi...@linkedin.com>
AuthorDate: Thu Aug 31 20:10:13 2023 -0700

    Change TopStateHandoffReportStage to be an async stage, as it is the slowest
stage in the pipeline. It can run asynchronously because it does not add
anything to the ClusterEvent and only computes and reports metrics; no
subsequent stages depend on it. (#2610)
    
    In some cases, TopStateHandoffReportStage takes up a large portion of the
total pipeline execution time. To reduce the total pipeline execution time, this
change makes TopStateHandoffReportStage async, since it only computes and
reports metrics and does not add anything to the ClusterEvent.
---
 .../helix/controller/pipeline/AsyncWorkerType.java |  1 +
 .../stages/TopStateHandoffReportStage.java         | 22 +++++++++++++++-------
 .../helix/controller/stages/BaseStageTest.java     | 11 ++++++++---
 3 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
index fcbf03f6b..a1afb95f2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -27,6 +27,7 @@ package org.apache.helix.controller.pipeline;
  */
 
 public enum AsyncWorkerType {
+  TopStateHandoffReportWorker,
   TargetExternalViewCalcWorker,
   PersistAssignmentWorker,
   ExternalViewComputeWorker,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
index 77a84a448..aec55aae5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
@@ -26,7 +26,8 @@ import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import 
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.LiveInstance;
@@ -42,20 +43,27 @@ import org.slf4j.LoggerFactory;
 /**
  * Observe top state handoff and report latency
  */
-public class TopStateHandoffReportStage extends AbstractBaseStage {
+public class TopStateHandoffReportStage extends AbstractAsyncBaseStage {
   private static final long DEFAULT_HANDOFF_USER_LATENCY = 0L;
   private static Logger LOG = 
LoggerFactory.getLogger(TopStateHandoffReportStage.class);
   public static final long TIMESTAMP_NOT_RECORDED = -1L;
 
   @Override
-  public void process(ClusterEvent event) throws Exception {
+  public AsyncWorkerType getAsyncWorkerType() {
+    return AsyncWorkerType.TopStateHandoffReportWorker;
+  }
+
+  @Override
+  public void execute(final ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
-    final BaseControllerDataProvider cache = 
event.getAttribute(AttributeName.ControllerDataProvider.name());
-    final Long lastPipelineFinishTimestamp = event
-        
.getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(),
+    final BaseControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    final Long lastPipelineFinishTimestamp =
+        
event.getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(),
             TIMESTAMP_NOT_RECORDED);
     final Map<String, Resource> resourceMap = 
event.getAttribute(AttributeName.RESOURCES.name());
-    final CurrentStateOutput currentStateOutput = 
event.getAttribute(AttributeName.CURRENT_STATE.name());
+    final CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
     final ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 2d4aa0988..fe77383e0 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -31,8 +31,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.model.Message;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.mock.MockHelixAdmin;
@@ -44,10 +43,12 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.ITestContext;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -209,7 +210,11 @@ public class BaseStageTest {
     stage.init(context);
     stage.preProcess();
     try {
-      stage.process(event);
+      if (stage instanceof AbstractAsyncBaseStage) {
+        ((AbstractAsyncBaseStage) stage).execute(event);
+      } else {
+        stage.process(event);
+      }
     } catch (Exception e) {
       e.printStackTrace();
     }

Reply via email to