This is an automated email from the ASF dual-hosted git repository.

sandynz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e73d04086ff pipeline: refresh migration pool sizes on restart (#38734)
e73d04086ff is described below

commit e73d04086ff81ddc30de02fe9420faa20c055136
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed May 27 21:44:04 2026 +0800

    pipeline: refresh migration pool sizes on restart (#38734)
---
 .../util/PipelineDataSourceConfigurationUtils.java | 149 ++++++++++++---
 .../PipelineDataSourceConfigurationUtilsTest.java  | 170 +++++++++++++++++
 .../migration/MigrationJobExecutorCallback.java    |  25 +++
 .../MigrationDataConsistencyChecker.java           |  21 +-
 .../MigrationJobExecutorCallbackTest.java          | 212 +++++++++++++++++++++
 5 files changed, 551 insertions(+), 26 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtils.java
index e5e679e9e00..bcb4ccc070a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtils.java
@@ -20,14 +20,16 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 
 /**
  * Utility class for pipeline data source configuration.
@@ -41,35 +43,132 @@ public final class PipelineDataSourceConfigurationUtils {
      *
      * @param jobId job id
      * @param pipelineDataSourceConfig the pipeline data source configuration 
to transform
+     * @param storageUnits current storage units
+     * @return transformed pipeline data source configuration
      */
-    public static void transformPipelineDataSourceConfiguration(final String 
jobId, final ShardingSpherePipelineDataSourceConfiguration 
pipelineDataSourceConfig) {
-        YamlRootConfiguration rootConfig = 
pipelineDataSourceConfig.getRootConfig();
-        ShardingSphereDatabase database;
-        try {
-            database = 
PipelineContextManager.getProxyContext().getDatabase(rootConfig.getDatabaseName());
-            // CHECKSTYLE:OFF
-        } catch (final RuntimeException ignored) {
-            // CHECKSTYLE:ON
-            return;
+    public static PipelineDataSourceConfiguration 
transformPipelineDataSourceConfiguration(final String jobId, final 
PipelineDataSourceConfiguration pipelineDataSourceConfig,
+                                                                               
            final Map<String, StorageUnit> storageUnits) {
+        if (null == storageUnits || storageUnits.isEmpty()) {
+            return pipelineDataSourceConfig;
         }
-        Map<String, StorageUnit> storageUnitMap = 
database.getResourceMetaData().getStorageUnits();
-        for (Entry<String, Map<String, Object>> entry : 
rootConfig.getDataSources().entrySet()) {
-            StorageUnit storageUnit = storageUnitMap.get(entry.getKey());
+        if (pipelineDataSourceConfig instanceof 
ShardingSpherePipelineDataSourceConfiguration) {
+            return transformPipelineDataSourceConfiguration(jobId, 
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, 
storageUnits);
+        }
+        return pipelineDataSourceConfig;
+    }
+    
+    /**
+     * Transform the given pipeline data source properties.
+     *
+     * @param jobId job id
+     * @param dataSourceName data source name
+     * @param pipelineDataSourceConfig the pipeline data source configuration 
to transform
+     * @param storageUnits current storage units
+     * @return transformed pipeline data source configuration
+     */
+    public static PipelineDataSourceConfiguration 
transformPipelineDataSourceConfiguration(final String jobId, final String 
dataSourceName,
+                                                                               
            final PipelineDataSourceConfiguration pipelineDataSourceConfig,
+                                                                               
            final Map<String, StorageUnit> storageUnits) {
+        if (null == storageUnits || storageUnits.isEmpty()) {
+            return pipelineDataSourceConfig;
+        }
+        if (pipelineDataSourceConfig instanceof 
StandardPipelineDataSourceConfiguration) {
+            StorageUnit storageUnit = storageUnits.get(dataSourceName);
+            return null == storageUnit ? pipelineDataSourceConfig
+                    : transformStandardPipelineDataSourceConfiguration(jobId, 
dataSourceName, (StandardPipelineDataSourceConfiguration) 
pipelineDataSourceConfig, storageUnit);
+        }
+        return transformPipelineDataSourceConfiguration(jobId, 
pipelineDataSourceConfig, storageUnits);
+    }
+    
+    /**
+     * Transform the given pipeline data source properties.
+     *
+     * @param jobId job id
+     * @param dataSourceName data source name
+     * @param pipelineDataSourceConfig the pipeline data source configuration 
to transform
+     * @param dataSourcePoolProps current data source pool properties
+     * @return transformed pipeline data source configuration
+     */
+    public static PipelineDataSourceConfiguration 
transformPipelineDataSourceConfiguration(final String jobId, final String 
dataSourceName,
+                                                                               
            final PipelineDataSourceConfiguration pipelineDataSourceConfig,
+                                                                               
            final DataSourcePoolProperties dataSourcePoolProps) {
+        return pipelineDataSourceConfig instanceof 
StandardPipelineDataSourceConfiguration
+                ? transformStandardPipelineDataSourceConfiguration(jobId, 
dataSourceName, (StandardPipelineDataSourceConfiguration) 
pipelineDataSourceConfig, dataSourcePoolProps)
+                : pipelineDataSourceConfig;
+    }
+    
+    /**
+     * Transform the given ShardingSphere pipeline data source properties.
+     *
+     * @param jobId job id
+     * @param pipelineDataSourceConfig the pipeline data source configuration 
to transform
+     * @param storageUnits current storage units
+     * @return transformed pipeline data source configuration
+     */
+    public static ShardingSpherePipelineDataSourceConfiguration 
transformPipelineDataSourceConfiguration(final String jobId,
+                                                                               
                          final ShardingSpherePipelineDataSourceConfiguration 
pipelineDataSourceConfig,
+                                                                               
                          final Map<String, StorageUnit> storageUnits) {
+        if (null == storageUnits || storageUnits.isEmpty()) {
+            return pipelineDataSourceConfig;
+        }
+        for (Entry<String, Map<String, Object>> entry : 
pipelineDataSourceConfig.getRootConfig().getDataSources().entrySet()) {
+            StorageUnit storageUnit = storageUnits.get(entry.getKey());
             if (null == storageUnit) {
                 continue;
             }
             Map<String, Object> jobDataSourceProps = entry.getValue();
             Map<String, Object> storageUnitStandardProps = 
storageUnit.getDataSourcePoolProperties().getPoolPropertySynonyms().getStandardProperties();
-            if (storageUnitStandardProps.containsKey("maxPoolSize")) {
-                log.info("Transform maxPoolSize from '{}' to '{}' for {} data 
source: {}",
-                        jobDataSourceProps.get("maxPoolSize"), 
storageUnitStandardProps.get("maxPoolSize"), jobId, entry.getKey());
-                jobDataSourceProps.put("maxPoolSize", 
storageUnitStandardProps.get("maxPoolSize"));
-            }
-            if (storageUnitStandardProps.containsKey("maximumPoolSize")) {
-                log.info("Transform maximumPoolSize from '{}' to '{}' for {} 
data source: {}",
-                        jobDataSourceProps.get("maximumPoolSize"), 
storageUnitStandardProps.get("maximumPoolSize"), jobId, entry.getKey());
-                jobDataSourceProps.put("maximumPoolSize", 
storageUnitStandardProps.get("maximumPoolSize"));
-            }
+            logTransformPoolSize(jobId, entry.getKey(), jobDataSourceProps, 
storageUnitStandardProps);
+            transformPoolSize(jobDataSourceProps, storageUnitStandardProps);
+        }
+        return pipelineDataSourceConfig;
+    }
+    
+    private static PipelineDataSourceConfiguration 
transformStandardPipelineDataSourceConfiguration(final String jobId, final 
String dataSourceName,
+                                                                               
                     final StandardPipelineDataSourceConfiguration 
pipelineDataSourceConfig,
+                                                                               
                     final StorageUnit storageUnit) {
+        return transformStandardPipelineDataSourceConfiguration(jobId, 
dataSourceName, pipelineDataSourceConfig, 
storageUnit.getDataSourcePoolProperties());
+    }
+    
+    private static PipelineDataSourceConfiguration 
transformStandardPipelineDataSourceConfiguration(final String jobId, final 
String dataSourceName,
+                                                                               
                     final StandardPipelineDataSourceConfiguration 
pipelineDataSourceConfig,
+                                                                               
                     final DataSourcePoolProperties dataSourcePoolProps) {
+        DataSourcePoolProperties jobDataSourcePoolProps = 
(DataSourcePoolProperties) 
pipelineDataSourceConfig.getDataSourceConfiguration();
+        Map<String, Object> jobStandardProps = 
jobDataSourcePoolProps.getPoolPropertySynonyms().getStandardProperties();
+        Map<String, Object> currentStandardProps = 
dataSourcePoolProps.getPoolPropertySynonyms().getStandardProperties();
+        if (!isPoolSizeChanged(jobStandardProps, currentStandardProps)) {
+            return pipelineDataSourceConfig;
+        }
+        logTransformPoolSize(jobId, dataSourceName, jobStandardProps, 
currentStandardProps);
+        return new StandardPipelineDataSourceConfiguration(new 
YamlDataSourceConfigurationSwapper().swapToMap(dataSourcePoolProps));
+    }
+    
+    private static boolean isPoolSizeChanged(final Map<String, Object> 
jobDataSourceProps, final Map<String, Object> storageUnitStandardProps) {
+        return isPoolSizeChanged("maxPoolSize", jobDataSourceProps, 
storageUnitStandardProps)
+                || isPoolSizeChanged("maximumPoolSize", jobDataSourceProps, 
storageUnitStandardProps);
+    }
+    
+    private static boolean isPoolSizeChanged(final String key, final 
Map<String, Object> jobDataSourceProps, final Map<String, Object> 
storageUnitStandardProps) {
+        return storageUnitStandardProps.containsKey(key) && 
!Objects.equals(String.valueOf(jobDataSourceProps.get(key)), 
String.valueOf(storageUnitStandardProps.get(key)));
+    }
+    
+    private static void logTransformPoolSize(final String jobId, final String 
dataSourceName, final Map<String, Object> jobDataSourceProps, final Map<String, 
Object> storageUnitStandardProps) {
+        if (storageUnitStandardProps.containsKey("maxPoolSize")) {
+            log.info("Transform maxPoolSize from '{}' to '{}' for {} data 
source: {}",
+                    jobDataSourceProps.get("maxPoolSize"), 
storageUnitStandardProps.get("maxPoolSize"), jobId, dataSourceName);
+        }
+        if (storageUnitStandardProps.containsKey("maximumPoolSize")) {
+            log.info("Transform maximumPoolSize from '{}' to '{}' for {} data 
source: {}",
+                    jobDataSourceProps.get("maximumPoolSize"), 
storageUnitStandardProps.get("maximumPoolSize"), jobId, dataSourceName);
+        }
+    }
+    
+    private static void transformPoolSize(final Map<String, Object> 
jobDataSourceProps, final Map<String, Object> storageUnitStandardProps) {
+        if (storageUnitStandardProps.containsKey("maxPoolSize")) {
+            jobDataSourceProps.put("maxPoolSize", 
storageUnitStandardProps.get("maxPoolSize"));
+        }
+        if (storageUnitStandardProps.containsKey("maximumPoolSize")) {
+            jobDataSourceProps.put("maximumPoolSize", 
storageUnitStandardProps.get("maximumPoolSize"));
         }
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtilsTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtilsTest.java
new file mode 100644
index 00000000000..66341ae4964
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtilsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.spi.JdbcQueryPropertiesExtension;
+import 
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeFactory;
+import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
+import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.synonym.PoolPropertySynonyms;
+import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings({DatabaseTypedSPILoader.class, DatabaseTypeFactory.class})
+class PipelineDataSourceConfigurationUtilsTest {
+    
+    private static final String JDBC_URL = "jdbc:mock://127.0.0.1/foo_ds";
+    
+    private static final String USERNAME = "root";
+    
+    private static final String PASSWORD = "123456";
+    
+    private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
+    
+    @Test
+    void assertTransformPipelineDataSourceConfiguration() {
+        Map<String, Object> dataSourceProps = new LinkedHashMap<>(2, 1F);
+        dataSourceProps.put("maxPoolSize", 2);
+        dataSourceProps.put("maximumPoolSize", 3);
+        ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig 
= mock(ShardingSpherePipelineDataSourceConfiguration.class);
+        
when(pipelineDataSourceConfig.getRootConfig()).thenReturn(createYamlRootConfiguration(dataSourceProps));
+        
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
 pipelineDataSourceConfig, Collections.singletonMap("ds_0", mockStorageUnit()));
+        assertThat(dataSourceProps.get("maxPoolSize"), is(10));
+        assertThat(dataSourceProps.get("maximumPoolSize"), is(20));
+    }
+    
+    @Test
+    void 
assertTransformPipelineDataSourceConfigurationWithUnnamedStandardDataSource() {
+        mockDatabaseTypeFactory();
+        StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2, 
3));
+        
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
 pipelineDataSourceConfig, Collections.singletonMap("ds_0", 
mock(StorageUnit.class))),
+                sameInstance(pipelineDataSourceConfig));
+    }
+    
+    @Test
+    void assertTransformPipelineDataSourceConfigurationWithNullStorageUnits() {
+        mockDatabaseTypeFactory();
+        StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2, 
3));
+        
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
 pipelineDataSourceConfig, null), sameInstance(pipelineDataSourceConfig));
+    }
+    
+    @Test
+    void assertTransformPipelineDataSourceConfigurationWithEmptyStorageUnits() 
{
+        ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig 
= mock(ShardingSpherePipelineDataSourceConfiguration.class);
+        
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
 pipelineDataSourceConfig, Collections.emptyMap()), 
sameInstance(pipelineDataSourceConfig));
+    }
+    
+    @Test
+    void 
assertTransformPipelineDataSourceConfigurationWithStandardDataSource() {
+        mockDatabaseTypeFactory();
+        StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2, 
3));
+        StorageUnit storageUnit = mockStorageUnit(10, 20);
+        StandardPipelineDataSourceConfiguration actual = 
(StandardPipelineDataSourceConfiguration) 
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(
+                "foo_job", "ds_0", pipelineDataSourceConfig, 
Collections.singletonMap("ds_0", storageUnit));
+        DataSourcePoolProperties actualProps = (DataSourcePoolProperties) 
actual.getDataSourceConfiguration();
+        
assertThat(actualProps.getPoolPropertySynonyms().getStandardProperties().get("maxPoolSize"),
 is(10));
+    }
+    
+    @Test
+    void 
assertTransformPipelineDataSourceConfigurationWithCurrentStandardDataSource() {
+        mockDatabaseTypeFactory();
+        StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2, 
3));
+        StandardPipelineDataSourceConfiguration actual = 
(StandardPipelineDataSourceConfiguration) 
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(
+                "foo_job", "ds_0", pipelineDataSourceConfig, new 
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", 
createStandardDataSourceProperties(10, 20)));
+        DataSourcePoolProperties actualProps = (DataSourcePoolProperties) 
actual.getDataSourceConfiguration();
+        
assertThat(actualProps.getPoolPropertySynonyms().getStandardProperties().get("maxPoolSize"),
 is(10));
+    }
+    
+    @Test
+    void 
assertTransformPipelineDataSourceConfigurationWithSameStandardDataSource() {
+        mockDatabaseTypeFactory();
+        StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(10, 
20));
+        
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(
+                "foo_job", "ds_0", pipelineDataSourceConfig, new 
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", 
createStandardDataSourceProperties(10, 20))),
+                sameInstance(pipelineDataSourceConfig));
+    }
+    
+    @Test
+    void assertTransformPipelineDataSourceConfigurationWithAbsentStorageUnit() 
{
+        mockDatabaseTypeFactory();
+        StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2, 
3));
+        
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
 "ds_0", pipelineDataSourceConfig, Collections.emptyMap()), 
is(pipelineDataSourceConfig));
+    }
+    
+    private void mockDatabaseTypeFactory() {
+        when(DatabaseTypeFactory.get(JDBC_URL)).thenReturn(databaseType);
+        
when(DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, 
databaseType)).thenReturn(Optional.empty());
+    }
+    
+    private Map<String, Object> createStandardDataSourceProperties(final int 
maxPoolSize, final int maximumPoolSize) {
+        Map<String, Object> result = new LinkedHashMap<>(5, 1F);
+        result.put("url", JDBC_URL);
+        result.put("username", USERNAME);
+        result.put("password", PASSWORD);
+        result.put("maxPoolSize", maxPoolSize);
+        result.put("maximumPoolSize", maximumPoolSize);
+        return result;
+    }
+    
+    private YamlRootConfiguration createYamlRootConfiguration(final 
Map<String, Object> dataSourceProps) {
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setDatabaseName("foo_db");
+        result.setDataSources(Collections.singletonMap("ds_0", 
dataSourceProps));
+        return result;
+    }
+    
+    private StorageUnit mockStorageUnit() {
+        Map<String, Object> standardProps = new LinkedHashMap<>(2, 1F);
+        standardProps.put("maxPoolSize", 10);
+        standardProps.put("maximumPoolSize", 20);
+        StorageUnit result = mock(StorageUnit.class);
+        DataSourcePoolProperties dataSourcePoolProps = 
mock(DataSourcePoolProperties.class);
+        PoolPropertySynonyms poolPropertySynonyms = 
mock(PoolPropertySynonyms.class);
+        
when(result.getDataSourcePoolProperties()).thenReturn(dataSourcePoolProps);
+        
when(dataSourcePoolProps.getPoolPropertySynonyms()).thenReturn(poolPropertySynonyms);
+        
when(poolPropertySynonyms.getStandardProperties()).thenReturn(standardProps);
+        return result;
+    }
+    
+    private StorageUnit mockStorageUnit(final int maxPoolSize, final int 
maximumPoolSize) {
+        StorageUnit result = mock(StorageUnit.class);
+        when(result.getDataSourcePoolProperties())
+                .thenReturn(new 
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", 
createStandardDataSourceProperties(maxPoolSize, maximumPoolSize)));
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
index a3f1df3a6b3..578edf19537 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -27,12 +28,15 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.PipelineRequiredCol
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutorCallback;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDataSourceConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -40,6 +44,8 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
 import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
+import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
@@ -57,14 +63,33 @@ import java.util.stream.Collectors;
  */
 public final class MigrationJobExecutorCallback implements 
DistributedPipelineJobExecutorCallback<MigrationJobConfiguration, 
MigrationJobItemContext, TransmissionJobItemProgress> {
     
+    private static final String JOB_TYPE = new MigrationJobType().getType();
+    
     @Override
     public MigrationJobItemContext buildJobItemContext(final 
MigrationJobConfiguration jobConfig, final int shardingItem,
                                                        final 
TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext 
jobProcessContext,
                                                        final 
PipelineDataSourceManager dataSourceManager) {
+        transformDataSourceConfigurations(jobConfig);
         MigrationTaskConfiguration taskConfig = 
buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getProcessConfiguration());
         return new MigrationJobItemContext(jobConfig, shardingItem, 
jobItemProgress, jobProcessContext, taskConfig, dataSourceManager);
     }
     
+    private void transformDataSourceConfigurations(final 
MigrationJobConfiguration jobConfig) {
+        Map<String, DataSourcePoolProperties> sourceDataSourcePoolProps = new 
PipelineDataSourcePersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 JOB_TYPE);
+        Map<String, StorageUnit> targetStorageUnits = 
PipelineContextManager.getProxyContext().getStorageUnits(jobConfig.getTargetDatabaseName());
+        for (Entry<String, PipelineDataSourceConfiguration> entry : 
jobConfig.getSources().entrySet()) {
+            entry.setValue(transformSourceDataSourceConfiguration(jobConfig, 
entry, sourceDataSourcePoolProps));
+        }
+        
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
 jobConfig.getTarget(), targetStorageUnits);
+    }
+    
+    private PipelineDataSourceConfiguration 
transformSourceDataSourceConfiguration(final MigrationJobConfiguration 
jobConfig, final Entry<String, PipelineDataSourceConfiguration> entry,
+                                                                               
    final Map<String, DataSourcePoolProperties> sourceDataSourcePoolProps) {
+        return sourceDataSourcePoolProps.containsKey(entry.getKey())
+                ? 
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
 entry.getKey(), entry.getValue(), 
sourceDataSourcePoolProps.get(entry.getKey()))
+                : entry.getValue();
+    }
+    
     private MigrationTaskConfiguration buildTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
         Map<ShardingSphereIdentifier, Collection<String>> 
tableAndRequiredColumnsMap = getTableAndRequiredColumnsMap(jobConfig);
         IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobDataNodeLine(jobShardingItem));
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 21257453936..3b5cc088160 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
 
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
@@ -27,6 +28,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
@@ -35,9 +37,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -48,7 +52,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDataSourceConfi
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 import java.util.LinkedHashMap;
@@ -98,7 +104,8 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
         try (
                 PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
                 TableDataConsistencyChecker tableChecker = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
-            
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
 (ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget());
+            Map<String, StorageUnit> storageUnits = 
PipelineContextManager.getProxyContext().getStorageUnits(jobConfig.getTargetDatabaseName());
+            transformDataSourceConfigurations(storageUnits);
             if (progressContext.getTableCheckRangePositions().isEmpty()) {
                 
progressContext.getTableCheckRangePositions().addAll(splitCrossTables());
             }
@@ -124,6 +131,18 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
         return 
checkResultMap.entrySet().stream().collect(Collectors.toMap(entry -> 
entry.getKey().format(), Entry::getValue));
     }
     
+    // Transforms the job's data source configurations before the consistency check runs.
+    // Source side: pool properties are loaded through PipelineDataSourcePersistService for the job's context key and the
+    // migration job type; every entry of jobConfig.getSources() is then written back in place, transformed when
+    // properties exist under its name and with its current value otherwise.
+    // Target side: the ShardingSphere target configuration is transformed against the given storage units. The return
+    // value of that call is not used here; presumably it updates the target configuration in place (TODO confirm).
+    private void transformDataSourceConfigurations(final Map<String, StorageUnit> targetStorageUnits) {
+        Map<String, DataSourcePoolProperties> persistedSourcePoolProps =
+                new PipelineDataSourcePersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), new MigrationJobType().getType());
+        for (Entry<String, PipelineDataSourceConfiguration> each : jobConfig.getSources().entrySet()) {
+            String sourceName = each.getKey();
+            PipelineDataSourceConfiguration refreshed = each.getValue();
+            if (persistedSourcePoolProps.containsKey(sourceName)) {
+                refreshed = PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(), sourceName, each.getValue(), persistedSourcePoolProps.get(sourceName));
+            }
+            // The value is always written back, even when nothing changed.
+            each.setValue(refreshed);
+        }
+        ShardingSpherePipelineDataSourceConfiguration targetConfig = (ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget();
+        PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(), targetConfig, targetStorageUnits);
+    }
+    
     private long getRecordsCount() {
         Map<Integer, TransmissionJobItemProgress> jobProgress = new 
TransmissionJobManager(new MigrationJobType()).getJobProgress(jobConfig);
         return 
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getInventoryRecordsCount).sum();
diff --git 
a/kernel/data-pipeline/scenario/migration/core/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallbackTest.java
 
b/kernel/data-pipeline/scenario/migration/core/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallbackTest.java
new file mode 100644
index 00000000000..1d826b5cdd4
--- /dev/null
+++ 
b/kernel/data-pipeline/scenario/migration/core/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallbackTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
+
+import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeFactory;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+class MigrationJobExecutorCallbackTest {
+    
+    private static final String DATABASE_NAME = "foo_db";
+    
+    private static final String SOURCE_DATA_SOURCE_NAME = "source_ds";
+    
+    private static final String TARGET_DATA_SOURCE_NAME = "target_ds";
+    
+    private static final String SOURCE_JDBC_URL = "jdbc:mock://127.0.0.1/source_ds";
+    
+    private static final String TARGET_JDBC_URL = "jdbc:mock://127.0.0.1/target_ds";
+    
+    private final DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
+    
+    // The job configuration stores maxPoolSize 2 (source) and 4 (target); the persisted source YAML carries 10 and the
+    // proxy storage unit carries 30, and the built task configuration is expected to expose 10 and 30.
+    @Test
+    void assertBuildJobItemContextWithTransformedDataSourceConfigurations() {
+        try (
+                MockedStatic<DatabaseTypeFactory> databaseTypeFactory = mockStatic(DatabaseTypeFactory.class);
+                MockedStatic<PipelineAPIFactory> pipelineAPIFactory = mockStatic(PipelineAPIFactory.class);
+                MockedStatic<PipelineContextManager> pipelineContextManager = mockStatic(PipelineContextManager.class)) {
+            mockDatabaseTypeFactory(databaseTypeFactory);
+            mockGovernanceFacade(pipelineAPIFactory, createSourceDataSourceYaml(10, 20));
+            mockProxyContext(pipelineContextManager, Collections.singletonMap(TARGET_DATA_SOURCE_NAME, mockStorageUnit(30, 40)));
+            assertMaxPoolSizes(buildTaskConfiguration(), "10", "30");
+        }
+    }
+    
+    // With an empty persisted source YAML the source is expected to keep the job configuration's maxPoolSize of 2, even
+    // though the proxy storage units contain an entry named like the source; the target is still expected to be 30.
+    @Test
+    void assertBuildJobItemContextWithoutMigrationSourceStorageUnit() {
+        try (
+                MockedStatic<DatabaseTypeFactory> databaseTypeFactory = mockStatic(DatabaseTypeFactory.class);
+                MockedStatic<PipelineAPIFactory> pipelineAPIFactory = mockStatic(PipelineAPIFactory.class);
+                MockedStatic<PipelineContextManager> pipelineContextManager = mockStatic(PipelineContextManager.class)) {
+            mockDatabaseTypeFactory(databaseTypeFactory);
+            mockGovernanceFacade(pipelineAPIFactory, "");
+            Map<String, StorageUnit> storageUnits = new LinkedHashMap<>(2, 1F);
+            storageUnits.put(SOURCE_DATA_SOURCE_NAME, mock(StorageUnit.class));
+            storageUnits.put(TARGET_DATA_SOURCE_NAME, mockStorageUnit(30, 40));
+            mockProxyContext(pipelineContextManager, storageUnits);
+            assertMaxPoolSizes(buildTaskConfiguration(), "2", "30");
+        }
+    }
+    
+    // Builds the job item context for sharding item 0 from the fixture job configuration and returns its task configuration.
+    private MigrationTaskConfiguration buildTaskConfiguration() {
+        MigrationJobExecutorCallback callback = new MigrationJobExecutorCallback();
+        MigrationJobItemContext jobItemContext = callback.buildJobItemContext(createJobConfiguration(), 0, null, mockProcessContext(), mock(PipelineDataSourceManager.class));
+        return jobItemContext.getTaskConfig();
+    }
+    
+    // Checks maxPoolSize on the dumper source, the importer target and both sides of the first create-table configuration.
+    private void assertMaxPoolSizes(final MigrationTaskConfiguration taskConfig, final String expectedSourceMaxPoolSize, final String expectedTargetMaxPoolSize) {
+        assertThat(getMaxPoolSize((StandardPipelineDataSourceConfiguration) taskConfig.getDumperContext().getCommonContext().getDataSourceConfig()), is(expectedSourceMaxPoolSize));
+        ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
+        assertThat(getRootDataSourceMaxPoolSize((ShardingSpherePipelineDataSourceConfiguration) importerConfig.getDataSourceConfig()), is(expectedTargetMaxPoolSize));
+        CreateTableConfiguration createTableConfig = taskConfig.getCreateTableConfigurations().iterator().next();
+        assertThat(getMaxPoolSize((StandardPipelineDataSourceConfiguration) createTableConfig.getSourceDataSourceConfig()), is(expectedSourceMaxPoolSize));
+        assertThat(getRootDataSourceMaxPoolSize((ShardingSpherePipelineDataSourceConfiguration) createTableConfig.getTargetDataSourceConfig()), is(expectedTargetMaxPoolSize));
+    }
+    
+    private void mockDatabaseTypeFactory(final MockedStatic<DatabaseTypeFactory> databaseTypeFactory) {
+        databaseTypeFactory.when(() -> DatabaseTypeFactory.get(anyString())).thenReturn(databaseType);
+    }
+    
+    // Stubs the governance facade so that loading data sources for the migration job type yields the given YAML text.
+    private void mockGovernanceFacade(final MockedStatic<PipelineAPIFactory> pipelineAPIFactory, final String dataSourcesYaml) {
+        PipelineGovernanceFacade facade = mock(PipelineGovernanceFacade.class, RETURNS_DEEP_STUBS);
+        when(facade.getMetaDataFacade().getDataSource().load(new MigrationJobType().getType())).thenReturn(dataSourcesYaml);
+        pipelineAPIFactory.when(() -> PipelineAPIFactory.getPipelineGovernanceFacade(any(PipelineContextKey.class))).thenReturn(facade);
+    }
+    
+    private String createSourceDataSourceYaml(final int maxPoolSize, final int maximumPoolSize) {
+        return SOURCE_DATA_SOURCE_NAME + ":\n"
+                + "  dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n"
+                + "  url: " + SOURCE_JDBC_URL + "\n"
+                + "  username: root\n"
+                + "  password: root\n"
+                + "  maxPoolSize: " + maxPoolSize + "\n"
+                + "  maximumPoolSize: " + maximumPoolSize + "\n";
+    }
+    
+    // Makes the proxy context return the given storage units for the fixture database name.
+    private void mockProxyContext(final MockedStatic<PipelineContextManager> pipelineContextManager, final Map<String, StorageUnit> storageUnits) {
+        ContextManager proxyContext = mock(ContextManager.class);
+        when(proxyContext.getStorageUnits(DATABASE_NAME)).thenReturn(storageUnits);
+        pipelineContextManager.when(PipelineContextManager::getProxyContext).thenReturn(proxyContext);
+    }
+    
+    private StorageUnit mockStorageUnit(final int maxPoolSize, final int maximumPoolSize) {
+        DataSourcePoolProperties poolProps = new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", createDataSourceProperties(TARGET_JDBC_URL, maxPoolSize, maximumPoolSize));
+        StorageUnit result = mock(StorageUnit.class);
+        when(result.getDataSourcePoolProperties()).thenReturn(poolProps);
+        return result;
+    }
+    
+    // Fixture job: one standard source (pool sizes 2/3) migrating t_order into a ShardingSphere target (pool sizes 4/5).
+    private MigrationJobConfiguration createJobConfiguration() {
+        Map<String, PipelineDataSourceConfiguration> sources = new LinkedHashMap<>(1, 1F);
+        sources.put(SOURCE_DATA_SOURCE_NAME, new StandardPipelineDataSourceConfiguration(createDataSourceProperties(SOURCE_JDBC_URL, 2, 3)));
+        String jobId = createJobId();
+        ShardingSpherePipelineDataSourceConfiguration target = new ShardingSpherePipelineDataSourceConfiguration(createRootConfiguration(4, 5));
+        return new MigrationJobConfiguration(jobId, DATABASE_NAME, databaseType, databaseType, sources, target, Collections.singletonList("t_order"),
+                Collections.singletonMap("t_order", "foo_schema"), createJobDataNodeLine(), Collections.singletonList(createJobDataNodeLine()), 1, 3);
+    }
+    
+    private String createJobId() {
+        PipelineContextKey contextKey = new PipelineContextKey(InstanceType.PROXY);
+        return PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, Collections.singletonList(createJobDataNodeLine().marshal())));
+    }
+    
+    private JobDataNodeLine createJobDataNodeLine() {
+        DataNode dataNode = new DataNode(SOURCE_DATA_SOURCE_NAME + ".t_order");
+        JobDataNodeEntry dataNodeEntry = new JobDataNodeEntry("t_order", Collections.singletonList(dataNode));
+        return new JobDataNodeLine(Collections.singletonList(dataNodeEntry));
+    }
+    
+    private YamlRootConfiguration createRootConfiguration(final int maxPoolSize, final int maximumPoolSize) {
+        Map<String, Object> targetProps = createDataSourceProperties(TARGET_JDBC_URL, maxPoolSize, maximumPoolSize);
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setDatabaseName(DATABASE_NAME);
+        result.setDataSources(Collections.singletonMap(TARGET_DATA_SOURCE_NAME, targetProps));
+        result.setRules(Collections.emptyList());
+        return result;
+    }
+    
+    private Map<String, Object> createDataSourceProperties(final String jdbcUrl, final int maxPoolSize, final int maximumPoolSize) {
+        Map<String, Object> result = new LinkedHashMap<>(5, 1F);
+        result.put("url", jdbcUrl);
+        result.put("username", "root");
+        result.put("password", "root");
+        result.put("maxPoolSize", maxPoolSize);
+        result.put("maximumPoolSize", maximumPoolSize);
+        return result;
+    }
+    
+    private TransmissionProcessContext mockProcessContext() {
+        PipelineReadConfiguration readConfig = new PipelineReadConfiguration(1, 10, 10, null);
+        PipelineWriteConfiguration writeConfig = new PipelineWriteConfiguration(1, 50, null);
+        TransmissionProcessContext result = mock(TransmissionProcessContext.class);
+        when(result.getProcessConfiguration()).thenReturn(new PipelineProcessConfiguration(readConfig, writeConfig, null));
+        return result;
+    }
+    
+    // Reads "maxPoolSize" from the standard pool properties of a standard data source configuration, as text.
+    private String getMaxPoolSize(final StandardPipelineDataSourceConfiguration dataSourceConfig) {
+        DataSourcePoolProperties poolProps = (DataSourcePoolProperties) dataSourceConfig.getDataSourceConfiguration();
+        return String.valueOf(poolProps.getPoolPropertySynonyms().getStandardProperties().get("maxPoolSize"));
+    }
+    
+    // Reads "maxPoolSize" from the first data source of the root YAML configuration, as text.
+    private String getRootDataSourceMaxPoolSize(final ShardingSpherePipelineDataSourceConfiguration dataSourceConfig) {
+        Collection<Map<String, Object>> dataSources = dataSourceConfig.getRootConfig().getDataSources().values();
+        Map<String, Object> firstDataSource = dataSources.iterator().next();
+        return String.valueOf(firstDataSource.get("maxPoolSize"));
+    }
+}

Reply via email to