This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c943558fe58 Refactor RuleAlteredContextManagerLifecycleListener for common usage (#20159)
c943558fe58 is described below
commit c943558fe58ce3b7d5d6c3a41801966f13f53a8e
Author: Da Xiang Huang <[email protected]>
AuthorDate: Sun Aug 14 20:41:00 2022 +0800
Refactor RuleAlteredContextManagerLifecycleListener for common usage (#20159)
---
.../PipelineContextManagerLifecycleListener.java} | 10 ++--
.../scenario/rulealtered/PipelineJobWorker.java | 55 ++++++++++++++++++++++
.../scenario/rulealtered/RuleAlteredJobWorker.java | 33 ++-----------
...anager.listener.ContextManagerLifecycleListener | 2 +-
4 files changed, 64 insertions(+), 36 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/RuleAlteredContextManagerLifecycleListener.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
similarity index 85%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/RuleAlteredContextManagerLifecycleListener.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 46dd4809cee..26bdfe40989 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/RuleAlteredContextManagerLifecycleListener.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi;
+package org.apache.shardingsphere.data.pipeline.core.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
+import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.PipelineJobWorker;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
/**
- * Rule altered context manager lifecycle listener.
+ * Pipeline context manager lifecycle listener.
*/
@Slf4j
-public final class RuleAlteredContextManagerLifecycleListener implements
ContextManagerLifecycleListener {
+public final class PipelineContextManagerLifecycleListener implements
ContextManagerLifecycleListener {
@Override
public void onInitialized(final ModeConfiguration modeConfig, final
ContextManager contextManager) {
@@ -42,6 +42,6 @@ public final class RuleAlteredContextManagerLifecycleListener
implements Context
PipelineContext.initModeConfig(modeConfig);
PipelineContext.initContextManager(contextManager);
// TODO init worker only if necessary, e.g. 1) rule altered action
configured, 2) enabled job exists, 3) stopped job restarted
- RuleAlteredJobWorker.initWorkerIfNecessary();
+ PipelineJobWorker.initWorkerIfNecessary();
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java
new file mode 100644
index 00000000000..b4507e923f1
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import
org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
+import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
+
+/**
+ * Pipeline job worker.
+ */
+@Slf4j
+public final class PipelineJobWorker {
+
+ private static final RuleAlteredJobWorker INSTANCE = new
RuleAlteredJobWorker();
+
+ private static final AtomicBoolean WORKER_INITIALIZED = new
AtomicBoolean(false);
+
+ /**
+ * Initialize job worker if necessary.
+ */
+ public static void initWorkerIfNecessary() {
+ if (WORKER_INITIALIZED.get()) {
+ return;
+ }
+ synchronized (WORKER_INITIALIZED) {
+ if (WORKER_INITIALIZED.get()) {
+ return;
+ }
+ log.info("start worker initialization");
+
PipelineContext.getContextManager().getInstanceContext().getEventBusContext().register(INSTANCE);
+ new FinishedCheckJobExecutor().start();
+ new PipelineJobExecutor().start();
+ WORKER_INITIALIZED.set(true);
+ log.info("worker initialization done");
+ }
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 330e0dd4f8d..1e617a329f0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -35,8 +35,6 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
-import
org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
-import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationContext;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
@@ -59,7 +57,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -69,13 +66,9 @@ import java.util.stream.Collectors;
@SuppressWarnings("UnstableApiUsage")
@Slf4j
public final class RuleAlteredJobWorker {
-
- private static final RuleAlteredJobWorker INSTANCE = new
RuleAlteredJobWorker();
-
+
private static final YamlRuleConfigurationSwapperEngine SWAPPER_ENGINE =
new YamlRuleConfigurationSwapperEngine();
-
- private static final AtomicBoolean WORKER_INITIALIZED = new
AtomicBoolean(false);
-
+
/**
* Is on rule altered action enabled.
*
@@ -89,27 +82,7 @@ public final class RuleAlteredJobWorker {
Optional<RuleAlteredDetector> detector =
RuleAlteredDetectorFactory.findInstance(ruleConfig);
return detector.isPresent() &&
detector.get().getOnRuleAlteredActionConfig(ruleConfig).isPresent();
}
-
- /**
- * Initialize job worker if necessary.
- */
- public static void initWorkerIfNecessary() {
- if (WORKER_INITIALIZED.get()) {
- return;
- }
- synchronized (WORKER_INITIALIZED) {
- if (WORKER_INITIALIZED.get()) {
- return;
- }
- log.info("start worker initialization");
-
PipelineContext.getContextManager().getInstanceContext().getEventBusContext().register(INSTANCE);
- new FinishedCheckJobExecutor().start();
- new PipelineJobExecutor().start();
- WORKER_INITIALIZED.set(true);
- log.info("worker initialization done");
- }
- }
-
+
/**
* Create rule altered context.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
index b6808bfd611..4f351a1a4a0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi.RuleAlteredContextManagerLifecycleListener
+org.apache.shardingsphere.data.pipeline.core.listener.PipelineContextManagerLifecycleListener