This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ab24a1a799 Refactor PipelineJobRunnerManager (#29347)
7ab24a1a799 is described below

commit 7ab24a1a799871dba8635d8d345f502936bbb90b
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 10 14:45:48 2023 +0800

    Refactor PipelineJobRunnerManager (#29347)
---
 .../core/job/engine/PipelineJobRunnerManager.java  |  7 +++++
 .../JobRunnerPipelineDataSourceCleaner.java        | 34 ----------------------
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |  6 +---
 .../pipeline/cdc/engine/CDCJobRunnerCleaner.java   |  4 ---
 .../pipeline/scenario/migration/MigrationJob.java  | 10 ++-----
 5 files changed, 10 insertions(+), 51 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
index 0865b3e5d7a..6f51ba89525 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
@@ -17,8 +17,11 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.engine;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -54,6 +57,9 @@ public final class PipelineJobRunnerManager {
     
     private final Map<Integer, PipelineTasksRunner> tasksRunners = new ConcurrentHashMap<>();
     
+    @Getter
+    private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+    
     private final PipelineJobRunnerCleaner cleaner;
     
     public PipelineJobRunnerManager() {
@@ -131,6 +137,7 @@ public final class PipelineJobRunnerManager {
         } finally {
             jobId.ifPresent(PipelineJobProgressPersistService::remove);
             tasksRunners.values().stream().map(each -> each.getJobItemContext().getJobProcessContext()).forEach(QuietlyCloser::close);
+            dataSourceManager.close();
             if (null != cleaner) {
                 cleaner.clean();
             }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/cleaner/JobRunnerPipelineDataSourceCleaner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/cleaner/JobRunnerPipelineDataSourceCleaner.java
deleted file mode 100644
index af8a6ed83f2..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/cleaner/JobRunnerPipelineDataSourceCleaner.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner;
-
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-
-/**
- * Job runner pipeline data source cleaner.
- */
-public final class JobRunnerPipelineDataSourceCleaner implements PipelineJobRunnerCleaner {
-    
-    private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
-    
-    @Override
-    public void clean() {
-        dataSourceManager.close();
-    }
-}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index fdaed744821..eb649068faf 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -34,7 +34,6 @@ import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
@@ -83,8 +82,6 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
     
     private final PipelineProcessConfigurationPersistService processConfigPersistService;
     
-    private final DefaultPipelineDataSourceManager dataSourceManager;
-    
     private final CDCJobPreparer jobPreparer;
     
     public CDCJob(final PipelineSink sink) {
@@ -93,7 +90,6 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
         jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
         jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper());
         processConfigPersistService = new PipelineProcessConfigurationPersistService();
-        dataSourceManager = new DefaultPipelineDataSourceManager();
         jobPreparer = new CDCJobPreparer();
     }
     
@@ -104,7 +100,7 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
                 processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
         TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
-        return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
+        return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
     }
     
     private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
index 2d2d3f12a73..0589ae1fe55 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.cdc.engine;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
-import org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.JobRunnerPipelineDataSourceCleaner;
 import org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
@@ -29,13 +28,10 @@ import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 @RequiredArgsConstructor
 public final class CDCJobRunnerCleaner implements PipelineJobRunnerCleaner {
     
-    private final JobRunnerPipelineDataSourceCleaner dataSourceCleaner = new JobRunnerPipelineDataSourceCleaner();
-    
     private final PipelineSink sink;
     
     @Override
     public void clean() {
-        dataSourceCleaner.clean();
         QuietlyCloser.close(sink);
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 31cb0cef48b..2f97a967a12 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -22,14 +22,11 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
-import org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.JobRunnerPipelineDataSourceCleaner;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -71,16 +68,13 @@ public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJo
     
     private final PipelineProcessConfigurationPersistService processConfigPersistService;
     
-    private final PipelineDataSourceManager dataSourceManager;
-    
     // Shared by all sharding items
     private final MigrationJobPreparer jobPreparer;
     
     public MigrationJob() {
-        super(new PipelineJobRunnerManager(new JobRunnerPipelineDataSourceCleaner()));
+        super(new PipelineJobRunnerManager());
         jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
         processConfigPersistService = new PipelineProcessConfigurationPersistService();
-        dataSourceManager = new DefaultPipelineDataSourceManager();
         jobPreparer = new MigrationJobPreparer();
     }
     
@@ -93,7 +87,7 @@ public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJo
                 processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "MIGRATION"));
         TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
-        return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
+        return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager());
     }
     
     private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
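
For readers skimming the diff, the pattern this commit moves to is: the job runner manager owns a single shared data source manager, each sharding item context borrows it through a getter, and the manager is closed once when the runner stops, which is why the separate JobRunnerPipelineDataSourceCleaner could be deleted. Below is a minimal standalone Java sketch of that ownership pattern; it is not ShardingSphere source, and the class names (RefactorUsageSketch, RunnerManagerSketch, DataSourceManagerSketch) are hypothetical stand-ins for PipelineJobRunnerManager and PipelineDataSourceManager/DefaultPipelineDataSourceManager.

// Hypothetical sketch only -- not ShardingSphere source code.
import java.util.ArrayList;
import java.util.List;

public final class RefactorUsageSketch {
    
    // Stand-in for PipelineDataSourceManager / DefaultPipelineDataSourceManager:
    // hands out data sources and releases all of them on close().
    static final class DataSourceManagerSketch implements AutoCloseable {
        
        private final List<AutoCloseable> cachedDataSources = new ArrayList<>();
        
        AutoCloseable getDataSource(final String url) {
            AutoCloseable dataSource = () -> System.out.println("closing data source for " + url);
            cachedDataSources.add(dataSource);
            return dataSource;
        }
        
        @Override
        public void close() {
            for (AutoCloseable each : cachedDataSources) {
                try {
                    each.close();
                } catch (final Exception ignored) {
                }
            }
            cachedDataSources.clear();
        }
    }
    
    // Stand-in for PipelineJobRunnerManager: owns the shared manager and closes it on stop().
    static final class RunnerManagerSketch {
        
        private final DataSourceManagerSketch dataSourceManager = new DataSourceManagerSketch();
        
        DataSourceManagerSketch getDataSourceManager() {
            return dataSourceManager;
        }
        
        void stop() {
            // Mirrors the new dataSourceManager.close() call added to PipelineJobRunnerManager's
            // finally block, which replaces the deleted JobRunnerPipelineDataSourceCleaner.
            dataSourceManager.close();
        }
    }
    
    public static void main(final String[] args) {
        RunnerManagerSketch runnerManager = new RunnerManagerSketch();
        // Job item contexts borrow the shared manager instead of creating their own,
        // in the spirit of the getJobRunnerManager().getDataSourceManager() calls in CDCJob and MigrationJob.
        runnerManager.getDataSourceManager().getDataSource("jdbc:hypothetical://source-db");
        runnerManager.stop();
    }
}

Centralizing the close() call in the runner manager's stop path ties data source lifetime to the job runner itself rather than to each job instance or a dedicated cleaner.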
