This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new af444cc4d53 Extract pipeline meta data node change listener SPI for
common usage (#20849)
af444cc4d53 is described below
commit af444cc4d537e839db0abb5382bfc7090740e610
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Sep 7 20:34:45 2022 +0800
Extract pipeline meta data node change listener SPI for common usage
(#20849)
* Extract pipeline meta data node change listener SPI for common usage
* Fix unmarshal
* Fix pipeline not find
* Fix codestyle
---
.../pipeline/core/context/PipelineContext.java | 15 +++
.../pipeline/core/execute/PipelineJobExecutor.java | 106 ++++++---------------
.../pipeline/core/execute/PipelineJobWorker.java | 2 +-
.../spi/handler/BarrierMetaDataChangedHandler.java | 45 +++++++++
.../handler/JobConfigurationChangedHandler.java} | 73 +++++---------
.../handler/PipelineMetaDataChangedHandler.java} | 39 ++++----
.../PipelineMetaDataChangedHandlerFactory.java} | 34 +++----
...core.spi.handler.PipelineMetaDataChangedHandler | 19 ++++
.../PipelineMetaDataChangedHandlerFactoryTest.java | 45 +++++++++
9 files changed, 205 insertions(+), 173 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
index 8f190512ef7..5673a99a0e0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
@@ -17,9 +17,13 @@
package org.apache.shardingsphere.data.pipeline.core.context;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
* Pipeline context.
*/
@@ -29,6 +33,8 @@ public final class PipelineContext {
private static volatile ContextManager contextManager;
+ private static final ExecutorService EVENT_LISTENER_EXECUTOR =
Executors.newCachedThreadPool(new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Pipeline-EventListener-%d").build());
+
/**
* Get mode configuration.
*
@@ -64,4 +70,13 @@ public final class PipelineContext {
public static void initContextManager(final ContextManager contextManager)
{
PipelineContext.contextManager = contextManager;
}
+
+ /**
+ * Get pipeline executor.
+ *
+ * @return pipeline executor
+ */
+ public static ExecutorService getEventListenerExecutor() {
+ return EVENT_LISTENER_EXECUTOR;
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 3427ccc3d1a..b2c85740b44 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,103 +18,51 @@
package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler;
+import
org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandlerFactory;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
/**
* Pipeline job executor.
*/
@Slf4j
-public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
+public final class PipelineJobExecutor {
- private final ExecutorService executor = Executors.newFixedThreadPool(20);
+ private static final PipelineJobExecutor INSTANCE = new
PipelineJobExecutor();
- @Override
- protected void doStart() {
-
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
event -> {
- if
(PipelineMetaDataNode.BARRIER_PATTERN.matcher(event.getKey()).matches() &&
event.getType() == Type.ADDED) {
-
PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
- }
- getJobConfigPOJO(event).ifPresent(optional -> processEvent(event,
optional));
- });
- }
+ private final Map<Pattern, PipelineMetaDataChangedHandler> listenerMap =
new ConcurrentHashMap<>();
- private Optional<JobConfigurationPOJO> getJobConfigPOJO(final
DataChangedEvent event) {
- try {
- if
(PipelineMetaDataNode.CONFIG_PATTERN.matcher(event.getKey()).matches()) {
- log.info("{} job config: {}", event.getType(), event.getKey());
- return Optional.of(YamlEngine.unmarshal(event.getValue(),
JobConfigurationPOJO.class, true));
- }
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("analyze job config pojo failed.", ex);
+ private PipelineJobExecutor() {
+ Collection<PipelineMetaDataChangedHandler> instances =
PipelineMetaDataChangedHandlerFactory.findAllInstances();
+ for (PipelineMetaDataChangedHandler each : instances) {
+ listenerMap.put(each.getKeyPattern(), each);
}
- return Optional.empty();
+
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
this::dispatchEvent);
}
- private void processEvent(final DataChangedEvent event, final
JobConfigurationPOJO jobConfigPOJO) {
- boolean isDeleted = DataChangedEvent.Type.DELETED == event.getType();
- boolean isDisabled = jobConfigPOJO.isDisabled();
- if (isDeleted || isDisabled) {
- String jobId = jobConfigPOJO.getJobName();
- log.info("jobId={}, deleted={}, disabled={}", jobId, isDeleted,
isDisabled);
- // TODO refactor: dispatch to different job types
- MigrationJobConfiguration jobConfig =
YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
- if (isDeleted) {
- new MigrationJobPreparer().cleanup(jobConfig);
- } else if
(PipelineJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(),
MigrationJobAPIFactory.getInstance().getJobProgress(jobConfig).values())) {
- log.info("isJobSuccessful=true");
- new MigrationJobPreparer().cleanup(jobConfig);
+ private void dispatchEvent(final DataChangedEvent event) {
+ for (Entry<Pattern, PipelineMetaDataChangedHandler> entry :
listenerMap.entrySet()) {
+ if (entry.getKey().matcher(event.getKey()).matches()) {
+ entry.getValue().handle(event);
+ return;
}
- PipelineJobCenter.stop(jobId);
- return;
}
- switch (event.getType()) {
- case ADDED:
- case UPDATED:
- if
(PipelineJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
- log.info("{} added to executing jobs failed since it
already exists", jobConfigPOJO.getJobName());
- } else {
- log.info("{} executing jobs", jobConfigPOJO.getJobName());
- executor.execute(() -> execute(jobConfigPOJO));
- }
- break;
- default:
- break;
- }
- }
-
- private void execute(final JobConfigurationPOJO jobConfigPOJO) {
- MigrationJob job = new MigrationJob();
- PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
- OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job,
jobConfigPOJO.toJobConfiguration());
- oneOffJobBootstrap.execute();
- job.setOneOffJobBootstrap(oneOffJobBootstrap);
}
- @Override
- protected void doStop() {
- executor.shutdown();
- executor.shutdownNow();
+ /**
+ * Get pipeline job executor instance.
+ *
+ * @return pipeline job executor
+ */
+ public static PipelineJobExecutor getInstance() {
+ return INSTANCE;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
index 767ad759881..a9d1f31a6c5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
@@ -41,7 +41,7 @@ public final class PipelineJobWorker {
return;
}
log.info("start worker initialization");
- new PipelineJobExecutor().start();
+ PipelineJobExecutor.getInstance();
WORKER_INITIALIZED.set(true);
log.info("worker initialization done");
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
new file mode 100644
index 00000000000..676baa8efa4
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+
+import java.util.regex.Pattern;
+
+/**
+ * Barrier pipeline meta data handler, .
+ */
+@Slf4j
+public final class BarrierMetaDataChangedHandler implements
PipelineMetaDataChangedHandler {
+
+ @Override
+ public Pattern getKeyPattern() {
+ return PipelineMetaDataNode.BARRIER_PATTERN;
+ }
+
+ @Override
+ public void handle(final DataChangedEvent event) {
+ if (event.getType() == Type.ADDED) {
+
PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
+ }
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
similarity index 51%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
index 3427ccc3d1a..f2efe9c3bb0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
@@ -15,77 +15,50 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.execute;
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
/**
- * Pipeline job executor.
+ * Migration pipeline meta data handler.
*/
@Slf4j
-public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
-
- private final ExecutorService executor = Executors.newFixedThreadPool(20);
+public final class JobConfigurationChangedHandler implements
PipelineMetaDataChangedHandler {
@Override
- protected void doStart() {
-
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
event -> {
- if
(PipelineMetaDataNode.BARRIER_PATTERN.matcher(event.getKey()).matches() &&
event.getType() == Type.ADDED) {
-
PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
- }
- getJobConfigPOJO(event).ifPresent(optional -> processEvent(event,
optional));
- });
+ public Pattern getKeyPattern() {
+ return PipelineMetaDataNode.CONFIG_PATTERN;
}
- private Optional<JobConfigurationPOJO> getJobConfigPOJO(final
DataChangedEvent event) {
+ @Override
+ public void handle(final DataChangedEvent event) {
+ log.info("{} job config: {}", event.getType(), event.getKey());
+ JobConfigurationPOJO jobConfigPOJO;
try {
- if
(PipelineMetaDataNode.CONFIG_PATTERN.matcher(event.getKey()).matches()) {
- log.info("{} job config: {}", event.getType(), event.getKey());
- return Optional.of(YamlEngine.unmarshal(event.getValue(),
JobConfigurationPOJO.class, true));
- }
+ jobConfigPOJO = YamlEngine.unmarshal(event.getValue(),
JobConfigurationPOJO.class, true);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
log.error("analyze job config pojo failed.", ex);
+ return;
}
- return Optional.empty();
- }
-
- private void processEvent(final DataChangedEvent event, final
JobConfigurationPOJO jobConfigPOJO) {
- boolean isDeleted = DataChangedEvent.Type.DELETED == event.getType();
- boolean isDisabled = jobConfigPOJO.isDisabled();
- if (isDeleted || isDisabled) {
- String jobId = jobConfigPOJO.getJobName();
- log.info("jobId={}, deleted={}, disabled={}", jobId, isDeleted,
isDisabled);
- // TODO refactor: dispatch to different job types
- MigrationJobConfiguration jobConfig =
YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
- if (isDeleted) {
- new MigrationJobPreparer().cleanup(jobConfig);
- } else if
(PipelineJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(),
MigrationJobAPIFactory.getInstance().getJobProgress(jobConfig).values())) {
- log.info("isJobSuccessful=true");
- new MigrationJobPreparer().cleanup(jobConfig);
- }
+ String jobId = jobConfigPOJO.getJobName();
+ if (jobConfigPOJO.isDisabled()) {
PipelineJobCenter.stop(jobId);
return;
}
@@ -96,9 +69,15 @@ public final class PipelineJobExecutor extends
AbstractLifecycleExecutor {
log.info("{} added to executing jobs failed since it
already exists", jobConfigPOJO.getJobName());
} else {
log.info("{} executing jobs", jobConfigPOJO.getJobName());
- executor.execute(() -> execute(jobConfigPOJO));
+ CompletableFuture.runAsync(() -> execute(jobConfigPOJO),
PipelineContext.getEventListenerExecutor());
}
break;
+ case DELETED:
+ log.info("deleted jobId={}", jobId);
+ MigrationJobConfiguration jobConfig =
YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
+ new MigrationJobPreparer().cleanup(jobConfig);
+ PipelineJobCenter.stop(jobConfigPOJO.getJobName());
+ break;
default:
break;
}
@@ -111,10 +90,4 @@ public final class PipelineJobExecutor extends
AbstractLifecycleExecutor {
oneOffJobBootstrap.execute();
job.setOneOffJobBootstrap(oneOffJobBootstrap);
}
-
- @Override
- protected void doStop() {
- executor.shutdown();
- executor.shutdownNow();
- }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
similarity index 52%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
index 767ad759881..9b963e76a82 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
@@ -15,35 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.execute;
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
/**
- * Pipeline job worker.
+ * Pipeline meta data changed handler.
*/
-@Slf4j
-public final class PipelineJobWorker {
+@SingletonSPI
+public interface PipelineMetaDataChangedHandler {
- private static final AtomicBoolean WORKER_INITIALIZED = new
AtomicBoolean(false);
+ /**
+ * Get key pattern.
+ *
+ * @return key pattern
+ */
+ Pattern getKeyPattern();
/**
- * Initialize job worker.
+ * Event changed handler.
+ *
+ * @param event changed event
*/
- public static void initialize() {
- if (WORKER_INITIALIZED.get()) {
- return;
- }
- synchronized (WORKER_INITIALIZED) {
- if (WORKER_INITIALIZED.get()) {
- return;
- }
- log.info("start worker initialization");
- new PipelineJobExecutor().start();
- WORKER_INITIALIZED.set(true);
- log.info("worker initialization done");
- }
- }
+ void handle(DataChangedEvent event);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
similarity index 52%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
index 767ad759881..0d746d5db8c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
@@ -15,35 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.execute;
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
/**
- * Pipeline job worker.
+ * Pipeline meta data listener factory.
*/
-@Slf4j
-public final class PipelineJobWorker {
+public final class PipelineMetaDataChangedHandlerFactory {
- private static final AtomicBoolean WORKER_INITIALIZED = new
AtomicBoolean(false);
+ static {
+
ShardingSphereServiceLoader.register(PipelineMetaDataChangedHandler.class);
+ }
/**
- * Initialize job worker.
+ * Get pipeline meta data listener instance.
+ *
+ * @return pipeline meta data listener
*/
- public static void initialize() {
- if (WORKER_INITIALIZED.get()) {
- return;
- }
- synchronized (WORKER_INITIALIZED) {
- if (WORKER_INITIALIZED.get()) {
- return;
- }
- log.info("start worker initialization");
- new PipelineJobExecutor().start();
- WORKER_INITIALIZED.set(true);
- log.info("worker initialization done");
- }
+ public static Collection<PipelineMetaDataChangedHandler>
findAllInstances() {
+ return
ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedHandler.class);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
new file mode 100644
index 00000000000..2d7ae078cf4
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.spi.handler.JobConfigurationChangedHandler
+org.apache.shardingsphere.data.pipeline.core.spi.handler.BarrierMetaDataChangedHandler
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
new file mode 100644
index 00000000000..d9a449bdb2b
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertTrue;
+
+public final class PipelineMetaDataChangedHandlerFactoryTest {
+
+ @Test
+ public void assertFindInstance() {
+ Collection<PipelineMetaDataChangedHandler> actual =
PipelineMetaDataChangedHandlerFactory.findAllInstances();
+ boolean isContainMigration = false;
+ boolean isContainBarrier = false;
+ for (PipelineMetaDataChangedHandler each : actual) {
+ if (each instanceof JobConfigurationChangedHandler) {
+ isContainMigration = true;
+ continue;
+ }
+ if (each instanceof BarrierMetaDataChangedHandler) {
+ isContainBarrier = true;
+ }
+ }
+ assertTrue(isContainMigration);
+ assertTrue(isContainBarrier);
+ }
+}