This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c943558fe58 Refactor RuleAlteredContextManagerLifecycleListener for common usage (#20159)
c943558fe58 is described below

commit c943558fe58ce3b7d5d6c3a41801966f13f53a8e
Author: Da Xiang Huang <[email protected]>
AuthorDate: Sun Aug 14 20:41:00 2022 +0800

    Refactor RuleAlteredContextManagerLifecycleListener for common usage (#20159)
---
 .../PipelineContextManagerLifecycleListener.java}  | 10 ++--
 .../scenario/rulealtered/PipelineJobWorker.java    | 55 ++++++++++++++++++++++
 .../scenario/rulealtered/RuleAlteredJobWorker.java | 33 ++-----------
 ...anager.listener.ContextManagerLifecycleListener |  2 +-
 4 files changed, 64 insertions(+), 36 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/RuleAlteredContextManagerLifecycleListener.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
similarity index 85%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/RuleAlteredContextManagerLifecycleListener.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 46dd4809cee..26bdfe40989 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/RuleAlteredContextManagerLifecycleListener.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi;
+package org.apache.shardingsphere.data.pipeline.core.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.PipelineJobWorker;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
 
 /**
- * Rule altered context manager lifecycle listener.
+ * Pipeline context manager lifecycle listener.
  */
 @Slf4j
-public final class RuleAlteredContextManagerLifecycleListener implements ContextManagerLifecycleListener {
+public final class PipelineContextManagerLifecycleListener implements ContextManagerLifecycleListener {
     
     @Override
     public void onInitialized(final ModeConfiguration modeConfig, final ContextManager contextManager) {
@@ -42,6 +42,6 @@ public final class RuleAlteredContextManagerLifecycleListener implements Context
         PipelineContext.initModeConfig(modeConfig);
         PipelineContext.initContextManager(contextManager);
         // TODO init worker only if necessary, e.g. 1) rule altered action configured, 2) enabled job exists, 3) stopped job restarted
-        RuleAlteredJobWorker.initWorkerIfNecessary();
+        PipelineJobWorker.initWorkerIfNecessary();
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java
new file mode 100644
index 00000000000..b4507e923f1
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
+import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
+
+/**
+ * Pipeline job worker.
+ */
+@Slf4j
+public final class PipelineJobWorker {
+    
+    private static final RuleAlteredJobWorker INSTANCE = new RuleAlteredJobWorker();
+    
+    private static final AtomicBoolean WORKER_INITIALIZED = new AtomicBoolean(false);
+    
+    /**
+     * Initialize job worker if necessary.
+     */
+    public static void initWorkerIfNecessary() {
+        if (WORKER_INITIALIZED.get()) {
+            return;
+        }
+        synchronized (WORKER_INITIALIZED) {
+            if (WORKER_INITIALIZED.get()) {
+                return;
+            }
+            log.info("start worker initialization");
+            PipelineContext.getContextManager().getInstanceContext().getEventBusContext().register(INSTANCE);
+            new FinishedCheckJobExecutor().start();
+            new PipelineJobExecutor().start();
+            WORKER_INITIALIZED.set(true);
+            log.info("worker initialization done");
+        }
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 330e0dd4f8d..1e617a329f0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -35,8 +35,6 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
-import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
-import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationContext;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
@@ -59,7 +57,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -69,13 +66,9 @@ import java.util.stream.Collectors;
 @SuppressWarnings("UnstableApiUsage")
 @Slf4j
 public final class RuleAlteredJobWorker {
-    
-    private static final RuleAlteredJobWorker INSTANCE = new RuleAlteredJobWorker();
-    
+
     private static final YamlRuleConfigurationSwapperEngine SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
-    
-    private static final AtomicBoolean WORKER_INITIALIZED = new AtomicBoolean(false);
-    
+
     /**
      * Is on rule altered action enabled.
      *
@@ -89,27 +82,7 @@ public final class RuleAlteredJobWorker {
         Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(ruleConfig);
         return detector.isPresent() && detector.get().getOnRuleAlteredActionConfig(ruleConfig).isPresent();
     }
-    
-    /**
-     * Initialize job worker if necessary.
-     */
-    public static void initWorkerIfNecessary() {
-        if (WORKER_INITIALIZED.get()) {
-            return;
-        }
-        synchronized (WORKER_INITIALIZED) {
-            if (WORKER_INITIALIZED.get()) {
-                return;
-            }
-            log.info("start worker initialization");
-            PipelineContext.getContextManager().getInstanceContext().getEventBusContext().register(INSTANCE);
-            new FinishedCheckJobExecutor().start();
-            new PipelineJobExecutor().start();
-            WORKER_INITIALIZED.set(true);
-            log.info("worker initialization done");
-        }
-    }
-    
+
     /**
      * Create rule altered context.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
index b6808bfd611..4f351a1a4a0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi.RuleAlteredContextManagerLifecycleListener
+org.apache.shardingsphere.data.pipeline.core.listener.PipelineContextManagerLifecycleListener

Reply via email to