This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new af444cc4d53 Extract pipeline meta data node change listener SPI for 
common usage (#20849)
af444cc4d53 is described below

commit af444cc4d537e839db0abb5382bfc7090740e610
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Sep 7 20:34:45 2022 +0800

    Extract pipeline meta data node change listener SPI for common usage 
(#20849)
    
    * Extract pipeline meta data node change listener SPI for common usage
    
    * Fix unmarshal
    
    * Fix pipeline not found
    
    * Fix codestyle
---
 .../pipeline/core/context/PipelineContext.java     |  15 +++
 .../pipeline/core/execute/PipelineJobExecutor.java | 106 ++++++---------------
 .../pipeline/core/execute/PipelineJobWorker.java   |   2 +-
 .../spi/handler/BarrierMetaDataChangedHandler.java |  45 +++++++++
 .../handler/JobConfigurationChangedHandler.java}   |  73 +++++---------
 .../handler/PipelineMetaDataChangedHandler.java}   |  39 ++++----
 .../PipelineMetaDataChangedHandlerFactory.java}    |  34 +++----
 ...core.spi.handler.PipelineMetaDataChangedHandler |  19 ++++
 .../PipelineMetaDataChangedHandlerFactoryTest.java |  45 +++++++++
 9 files changed, 205 insertions(+), 173 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
index 8f190512ef7..5673a99a0e0 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
@@ -17,9 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.core.context;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  * Pipeline context.
  */
@@ -29,6 +33,8 @@ public final class PipelineContext {
     
     private static volatile ContextManager contextManager;
     
+    private static final ExecutorService EVENT_LISTENER_EXECUTOR = 
Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Pipeline-EventListener-%d").build());
+    
     /**
      * Get mode configuration.
      *
@@ -64,4 +70,13 @@ public final class PipelineContext {
     public static void initContextManager(final ContextManager contextManager) 
{
         PipelineContext.contextManager = contextManager;
     }
+    
+    /**
+     * Get event listener executor.
+     *
+     * @return event listener executor
+     */
+    public static ExecutorService getEventListenerExecutor() {
+        return EVENT_LISTENER_EXECUTOR;
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 3427ccc3d1a..b2c85740b44 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,103 +18,51 @@
 package org.apache.shardingsphere.data.pipeline.core.execute;
 
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler;
+import 
org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandlerFactory;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
 
 /**
  * Pipeline job executor.
  */
 @Slf4j
-public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
+public final class PipelineJobExecutor {
     
-    private final ExecutorService executor = Executors.newFixedThreadPool(20);
+    private static final PipelineJobExecutor INSTANCE = new 
PipelineJobExecutor();
     
-    @Override
-    protected void doStart() {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
 event -> {
-            if 
(PipelineMetaDataNode.BARRIER_PATTERN.matcher(event.getKey()).matches() && 
event.getType() == Type.ADDED) {
-                
PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
-            }
-            getJobConfigPOJO(event).ifPresent(optional -> processEvent(event, 
optional));
-        });
-    }
+    private final Map<Pattern, PipelineMetaDataChangedHandler> listenerMap = 
new ConcurrentHashMap<>();
     
-    private Optional<JobConfigurationPOJO> getJobConfigPOJO(final 
DataChangedEvent event) {
-        try {
-            if 
(PipelineMetaDataNode.CONFIG_PATTERN.matcher(event.getKey()).matches()) {
-                log.info("{} job config: {}", event.getType(), event.getKey());
-                return Optional.of(YamlEngine.unmarshal(event.getValue(), 
JobConfigurationPOJO.class, true));
-            }
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            log.error("analyze job config pojo failed.", ex);
+    private PipelineJobExecutor() {
+        Collection<PipelineMetaDataChangedHandler> instances = 
PipelineMetaDataChangedHandlerFactory.findAllInstances();
+        for (PipelineMetaDataChangedHandler each : instances) {
+            listenerMap.put(each.getKeyPattern(), each);
         }
-        return Optional.empty();
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
 this::dispatchEvent);
     }
     
-    private void processEvent(final DataChangedEvent event, final 
JobConfigurationPOJO jobConfigPOJO) {
-        boolean isDeleted = DataChangedEvent.Type.DELETED == event.getType();
-        boolean isDisabled = jobConfigPOJO.isDisabled();
-        if (isDeleted || isDisabled) {
-            String jobId = jobConfigPOJO.getJobName();
-            log.info("jobId={}, deleted={}, disabled={}", jobId, isDeleted, 
isDisabled);
-            // TODO refactor: dispatch to different job types
-            MigrationJobConfiguration jobConfig = 
YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
-            if (isDeleted) {
-                new MigrationJobPreparer().cleanup(jobConfig);
-            } else if 
(PipelineJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), 
MigrationJobAPIFactory.getInstance().getJobProgress(jobConfig).values())) {
-                log.info("isJobSuccessful=true");
-                new MigrationJobPreparer().cleanup(jobConfig);
+    private void dispatchEvent(final DataChangedEvent event) {
+        for (Entry<Pattern, PipelineMetaDataChangedHandler> entry : 
listenerMap.entrySet()) {
+            if (entry.getKey().matcher(event.getKey()).matches()) {
+                entry.getValue().handle(event);
+                return;
             }
-            PipelineJobCenter.stop(jobId);
-            return;
         }
-        switch (event.getType()) {
-            case ADDED:
-            case UPDATED:
-                if 
(PipelineJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
-                    log.info("{} added to executing jobs failed since it 
already exists", jobConfigPOJO.getJobName());
-                } else {
-                    log.info("{} executing jobs", jobConfigPOJO.getJobName());
-                    executor.execute(() -> execute(jobConfigPOJO));
-                }
-                break;
-            default:
-                break;
-        }
-    }
-    
-    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
-        MigrationJob job = new MigrationJob();
-        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
-        OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, 
jobConfigPOJO.toJobConfiguration());
-        oneOffJobBootstrap.execute();
-        job.setOneOffJobBootstrap(oneOffJobBootstrap);
     }
     
-    @Override
-    protected void doStop() {
-        executor.shutdown();
-        executor.shutdownNow();
+    /**
+     * Get pipeline job executor instance.
+     *
+     * @return pipeline job executor
+     */
+    public static PipelineJobExecutor getInstance() {
+        return INSTANCE;
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
index 767ad759881..a9d1f31a6c5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
@@ -41,7 +41,7 @@ public final class PipelineJobWorker {
                 return;
             }
             log.info("start worker initialization");
-            new PipelineJobExecutor().start();
+            PipelineJobExecutor.getInstance();
             WORKER_INITIALIZED.set(true);
             log.info("worker initialization done");
         }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
new file mode 100644
index 00000000000..676baa8efa4
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/BarrierMetaDataChangedHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+
+import java.util.regex.Pattern;
+
+/**
+ * Barrier pipeline meta data handler.
+ */
+@Slf4j
+public final class BarrierMetaDataChangedHandler implements 
PipelineMetaDataChangedHandler {
+    
+    @Override
+    public Pattern getKeyPattern() {
+        return PipelineMetaDataNode.BARRIER_PATTERN;
+    }
+    
+    @Override
+    public void handle(final DataChangedEvent event) {
+        if (event.getType() == Type.ADDED) {
+            
PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
+        }
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
similarity index 51%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
index 3427ccc3d1a..f2efe9c3bb0 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
@@ -15,77 +15,50 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.execute;
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
 
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
 
 /**
- * Pipeline job executor.
+ * Migration pipeline meta data handler.
  */
 @Slf4j
-public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
-    
-    private final ExecutorService executor = Executors.newFixedThreadPool(20);
+public final class JobConfigurationChangedHandler implements 
PipelineMetaDataChangedHandler {
     
     @Override
-    protected void doStart() {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
 event -> {
-            if 
(PipelineMetaDataNode.BARRIER_PATTERN.matcher(event.getKey()).matches() && 
event.getType() == Type.ADDED) {
-                
PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
-            }
-            getJobConfigPOJO(event).ifPresent(optional -> processEvent(event, 
optional));
-        });
+    public Pattern getKeyPattern() {
+        return PipelineMetaDataNode.CONFIG_PATTERN;
     }
     
-    private Optional<JobConfigurationPOJO> getJobConfigPOJO(final 
DataChangedEvent event) {
+    @Override
+    public void handle(final DataChangedEvent event) {
+        log.info("{} job config: {}", event.getType(), event.getKey());
+        JobConfigurationPOJO jobConfigPOJO;
         try {
-            if 
(PipelineMetaDataNode.CONFIG_PATTERN.matcher(event.getKey()).matches()) {
-                log.info("{} job config: {}", event.getType(), event.getKey());
-                return Optional.of(YamlEngine.unmarshal(event.getValue(), 
JobConfigurationPOJO.class, true));
-            }
+            jobConfigPOJO = YamlEngine.unmarshal(event.getValue(), 
JobConfigurationPOJO.class, true);
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
             log.error("analyze job config pojo failed.", ex);
+            return;
         }
-        return Optional.empty();
-    }
-    
-    private void processEvent(final DataChangedEvent event, final 
JobConfigurationPOJO jobConfigPOJO) {
-        boolean isDeleted = DataChangedEvent.Type.DELETED == event.getType();
-        boolean isDisabled = jobConfigPOJO.isDisabled();
-        if (isDeleted || isDisabled) {
-            String jobId = jobConfigPOJO.getJobName();
-            log.info("jobId={}, deleted={}, disabled={}", jobId, isDeleted, 
isDisabled);
-            // TODO refactor: dispatch to different job types
-            MigrationJobConfiguration jobConfig = 
YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
-            if (isDeleted) {
-                new MigrationJobPreparer().cleanup(jobConfig);
-            } else if 
(PipelineJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), 
MigrationJobAPIFactory.getInstance().getJobProgress(jobConfig).values())) {
-                log.info("isJobSuccessful=true");
-                new MigrationJobPreparer().cleanup(jobConfig);
-            }
+        String jobId = jobConfigPOJO.getJobName();
+        if (jobConfigPOJO.isDisabled()) {
             PipelineJobCenter.stop(jobId);
             return;
         }
@@ -96,9 +69,15 @@ public final class PipelineJobExecutor extends 
AbstractLifecycleExecutor {
                     log.info("{} added to executing jobs failed since it 
already exists", jobConfigPOJO.getJobName());
                 } else {
                     log.info("{} executing jobs", jobConfigPOJO.getJobName());
-                    executor.execute(() -> execute(jobConfigPOJO));
+                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), 
PipelineContext.getEventListenerExecutor());
                 }
                 break;
+            case DELETED:
+                log.info("deleted jobId={}", jobId);
+                MigrationJobConfiguration jobConfig = 
YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
+                new MigrationJobPreparer().cleanup(jobConfig);
+                PipelineJobCenter.stop(jobConfigPOJO.getJobName());
+                break;
             default:
                 break;
         }
@@ -111,10 +90,4 @@ public final class PipelineJobExecutor extends 
AbstractLifecycleExecutor {
         oneOffJobBootstrap.execute();
         job.setOneOffJobBootstrap(oneOffJobBootstrap);
     }
-    
-    @Override
-    protected void doStop() {
-        executor.shutdown();
-        executor.shutdownNow();
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
similarity index 52%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
index 767ad759881..9b963e76a82 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandler.java
@@ -15,35 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.execute;
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
 
-import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 
 /**
- * Pipeline job worker.
+ * Pipeline meta data changed handler.
  */
-@Slf4j
-public final class PipelineJobWorker {
+@SingletonSPI
+public interface PipelineMetaDataChangedHandler {
     
-    private static final AtomicBoolean WORKER_INITIALIZED = new 
AtomicBoolean(false);
+    /**
+     * Get key pattern.
+     *
+     * @return key pattern
+     */
+    Pattern getKeyPattern();
     
     /**
-     * Initialize job worker.
+     * Handle data changed event.
+     *
+     * @param event data changed event
      */
-    public static void initialize() {
-        if (WORKER_INITIALIZED.get()) {
-            return;
-        }
-        synchronized (WORKER_INITIALIZED) {
-            if (WORKER_INITIALIZED.get()) {
-                return;
-            }
-            log.info("start worker initialization");
-            new PipelineJobExecutor().start();
-            WORKER_INITIALIZED.set(true);
-            log.info("worker initialization done");
-        }
-    }
+    void handle(DataChangedEvent event);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
similarity index 52%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
index 767ad759881..0d746d5db8c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactory.java
@@ -15,35 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.execute;
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
 
-import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
 
 /**
- * Pipeline job worker.
+ * Pipeline meta data changed handler factory.
  */
-@Slf4j
-public final class PipelineJobWorker {
+public final class PipelineMetaDataChangedHandlerFactory {
     
-    private static final AtomicBoolean WORKER_INITIALIZED = new 
AtomicBoolean(false);
+    static {
+        
ShardingSphereServiceLoader.register(PipelineMetaDataChangedHandler.class);
+    }
     
     /**
-     * Initialize job worker.
+     * Find all pipeline meta data changed handler instances.
+     *
+     * @return pipeline meta data changed handler instances
      */
-    public static void initialize() {
-        if (WORKER_INITIALIZED.get()) {
-            return;
-        }
-        synchronized (WORKER_INITIALIZED) {
-            if (WORKER_INITIALIZED.get()) {
-                return;
-            }
-            log.info("start worker initialization");
-            new PipelineJobExecutor().start();
-            WORKER_INITIALIZED.set(true);
-            log.info("worker initialization done");
-        }
+    public static Collection<PipelineMetaDataChangedHandler> 
findAllInstances() {
+        return 
ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedHandler.class);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
new file mode 100644
index 00000000000..2d7ae078cf4
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.handler.PipelineMetaDataChangedHandler
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.spi.handler.JobConfigurationChangedHandler
+org.apache.shardingsphere.data.pipeline.core.spi.handler.BarrierMetaDataChangedHandler
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
new file mode 100644
index 00000000000..d9a449bdb2b
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/PipelineMetaDataChangedHandlerFactoryTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertTrue;
+
+public final class PipelineMetaDataChangedHandlerFactoryTest {
+    
+    @Test
+    public void assertFindInstance() {
+        Collection<PipelineMetaDataChangedHandler> actual = 
PipelineMetaDataChangedHandlerFactory.findAllInstances();
+        boolean isContainMigration = false;
+        boolean isContainBarrier = false;
+        for (PipelineMetaDataChangedHandler each : actual) {
+            if (each instanceof JobConfigurationChangedHandler) {
+                isContainMigration = true;
+                continue;
+            }
+            if (each instanceof BarrierMetaDataChangedHandler) {
+                isContainBarrier = true;
+            }
+        }
+        assertTrue(isContainMigration);
+        assertTrue(isContainBarrier);
+    }
+}

Reply via email to