This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ecfba5945a3 Add JobRunnerPipelineDataSourceCleaner (#29346)
ecfba5945a3 is described below

commit ecfba5945a393d9139adfd67a06465d48370d13d
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 10 14:15:45 2023 +0800

    Add JobRunnerPipelineDataSourceCleaner (#29346)
---
 .../data/pipeline/core/job/engine/PipelineJobRunnerManager.java   | 1 +
 .../job/engine/cleaner/JobRunnerPipelineDataSourceCleaner.java}   | 7 +++----
 .../core/job/engine/{ => cleaner}/PipelineJobRunnerCleaner.java   | 2 +-
 .../data/pipeline/cdc/engine/CDCJobRunnerCleaner.java             | 8 ++++----
 .../data/pipeline/scenario/migration/MigrationJob.java            | 4 ++--
 5 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
index d3bd6879c90..0865b3e5d7a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.job.engine;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/engine/MigrationJobRunnerCleaner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/cleaner/JobRunnerPipelineDataSourceCleaner.java
similarity index 81%
rename from kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/engine/MigrationJobRunnerCleaner.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/cleaner/JobRunnerPipelineDataSourceCleaner.java
index 48400463c6f..af8a6ed83f2 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/engine/MigrationJobRunnerCleaner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/cleaner/JobRunnerPipelineDataSourceCleaner.java
@@ -15,16 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration.engine;
+package org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner;
 
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerCleaner;
 
 /**
- * Migration job runner cleaner.
+ * Job runner pipeline data source cleaner.
  */
-public final class MigrationJobRunnerCleaner implements PipelineJobRunnerCleaner {
+public final class JobRunnerPipelineDataSourceCleaner implements PipelineJobRunnerCleaner {
     
     private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerCleaner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/cleaner/PipelineJobRunnerCleaner.java
similarity index 92%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerCleaner.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/cleaner/PipelineJobRunnerCleaner.java
index a0a572c2629..0a4220bdc86 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerCleaner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/cleaner/PipelineJobRunnerCleaner.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.engine;
+package org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner;
 
 /**
  * Pipeline job runner cleaner.
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
index e02c3343438..2d2d3f12a73 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
@@ -18,9 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.cdc.engine;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
-import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerCleaner;
+import org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.JobRunnerPipelineDataSourceCleaner;
+import org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 /**
@@ -29,13 +29,13 @@ import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 @RequiredArgsConstructor
 public final class CDCJobRunnerCleaner implements PipelineJobRunnerCleaner {
     
-    private final DefaultPipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+    private final JobRunnerPipelineDataSourceCleaner dataSourceCleaner = new JobRunnerPipelineDataSourceCleaner();
     
     private final PipelineSink sink;
     
     @Override
     public void clean() {
-        dataSourceManager.close();
+        dataSourceCleaner.clean();
         QuietlyCloser.close(sink);
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 0e6d74270fb..31cb0cef48b 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
+import org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.JobRunnerPipelineDataSourceCleaner;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -46,7 +47,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.engine.MigrationJobRunnerCleaner;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -77,7 +77,7 @@ public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJo
     private final MigrationJobPreparer jobPreparer;
     
     public MigrationJob() {
-        super(new PipelineJobRunnerManager(new MigrationJobRunnerCleaner()));
+        super(new PipelineJobRunnerManager(new JobRunnerPipelineDataSourceCleaner()));
         jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
         processConfigPersistService = new PipelineProcessConfigurationPersistService();
         dataSourceManager = new DefaultPipelineDataSourceManager();

Reply via email to