This is an automated email from the ASF dual-hosted git repository.
sandynz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e73d04086ff pipeline: refresh migration pool sizes on restart (#38734)
e73d04086ff is described below
commit e73d04086ff81ddc30de02fe9420faa20c055136
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed May 27 21:44:04 2026 +0800
pipeline: refresh migration pool sizes on restart (#38734)
---
.../util/PipelineDataSourceConfigurationUtils.java | 149 ++++++++++++---
.../PipelineDataSourceConfigurationUtilsTest.java | 170 +++++++++++++++++
.../migration/MigrationJobExecutorCallback.java | 25 +++
.../MigrationDataConsistencyChecker.java | 21 +-
.../MigrationJobExecutorCallbackTest.java | 212 +++++++++++++++++++++
5 files changed, 551 insertions(+), 26 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtils.java
index e5e679e9e00..bcb4ccc070a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtils.java
@@ -20,14 +20,16 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
/**
* Utility class for pipeline data source configuration.
@@ -41,35 +43,132 @@ public final class PipelineDataSourceConfigurationUtils {
*
* @param jobId job id
* @param pipelineDataSourceConfig the pipeline data source configuration
to transform
+ * @param storageUnits current storage units
+ * @return transformed pipeline data source configuration
*/
- public static void transformPipelineDataSourceConfiguration(final String
jobId, final ShardingSpherePipelineDataSourceConfiguration
pipelineDataSourceConfig) {
- YamlRootConfiguration rootConfig =
pipelineDataSourceConfig.getRootConfig();
- ShardingSphereDatabase database;
- try {
- database =
PipelineContextManager.getProxyContext().getDatabase(rootConfig.getDatabaseName());
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ignored) {
- // CHECKSTYLE:ON
- return;
+ public static PipelineDataSourceConfiguration
transformPipelineDataSourceConfiguration(final String jobId, final
PipelineDataSourceConfiguration pipelineDataSourceConfig,
+
final Map<String, StorageUnit> storageUnits) {
+ if (null == storageUnits || storageUnits.isEmpty()) {
+ return pipelineDataSourceConfig;
}
- Map<String, StorageUnit> storageUnitMap =
database.getResourceMetaData().getStorageUnits();
- for (Entry<String, Map<String, Object>> entry :
rootConfig.getDataSources().entrySet()) {
- StorageUnit storageUnit = storageUnitMap.get(entry.getKey());
+ if (pipelineDataSourceConfig instanceof
ShardingSpherePipelineDataSourceConfiguration) {
+ return transformPipelineDataSourceConfiguration(jobId,
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig,
storageUnits);
+ }
+ return pipelineDataSourceConfig;
+ }
+
+ /**
+ * Transform the given pipeline data source properties.
+ *
+ * @param jobId job id
+ * @param dataSourceName data source name
+ * @param pipelineDataSourceConfig the pipeline data source configuration
to transform
+ * @param storageUnits current storage units
+ * @return transformed pipeline data source configuration
+ */
+ public static PipelineDataSourceConfiguration
transformPipelineDataSourceConfiguration(final String jobId, final String
dataSourceName,
+
final PipelineDataSourceConfiguration pipelineDataSourceConfig,
+
final Map<String, StorageUnit> storageUnits) {
+ if (null == storageUnits || storageUnits.isEmpty()) {
+ return pipelineDataSourceConfig;
+ }
+ if (pipelineDataSourceConfig instanceof
StandardPipelineDataSourceConfiguration) {
+ StorageUnit storageUnit = storageUnits.get(dataSourceName);
+ return null == storageUnit ? pipelineDataSourceConfig
+ : transformStandardPipelineDataSourceConfiguration(jobId,
dataSourceName, (StandardPipelineDataSourceConfiguration)
pipelineDataSourceConfig, storageUnit);
+ }
+ return transformPipelineDataSourceConfiguration(jobId,
pipelineDataSourceConfig, storageUnits);
+ }
+
+ /**
+ * Transform the given pipeline data source properties.
+ *
+ * @param jobId job id
+ * @param dataSourceName data source name
+ * @param pipelineDataSourceConfig the pipeline data source configuration
to transform
+ * @param dataSourcePoolProps current data source pool properties
+ * @return transformed pipeline data source configuration
+ */
+ public static PipelineDataSourceConfiguration
transformPipelineDataSourceConfiguration(final String jobId, final String
dataSourceName,
+
final PipelineDataSourceConfiguration pipelineDataSourceConfig,
+
final DataSourcePoolProperties dataSourcePoolProps) {
+ return pipelineDataSourceConfig instanceof
StandardPipelineDataSourceConfiguration
+ ? transformStandardPipelineDataSourceConfiguration(jobId,
dataSourceName, (StandardPipelineDataSourceConfiguration)
pipelineDataSourceConfig, dataSourcePoolProps)
+ : pipelineDataSourceConfig;
+ }
+
+ /**
+ * Transform the given ShardingSphere pipeline data source properties.
+ *
+ * @param jobId job id
+ * @param pipelineDataSourceConfig the pipeline data source configuration
to transform
+ * @param storageUnits current storage units
+ * @return transformed pipeline data source configuration
+ */
+ public static ShardingSpherePipelineDataSourceConfiguration
transformPipelineDataSourceConfiguration(final String jobId,
+
final ShardingSpherePipelineDataSourceConfiguration
pipelineDataSourceConfig,
+
final Map<String, StorageUnit> storageUnits) {
+ if (null == storageUnits || storageUnits.isEmpty()) {
+ return pipelineDataSourceConfig;
+ }
+ for (Entry<String, Map<String, Object>> entry :
pipelineDataSourceConfig.getRootConfig().getDataSources().entrySet()) {
+ StorageUnit storageUnit = storageUnits.get(entry.getKey());
if (null == storageUnit) {
continue;
}
Map<String, Object> jobDataSourceProps = entry.getValue();
Map<String, Object> storageUnitStandardProps =
storageUnit.getDataSourcePoolProperties().getPoolPropertySynonyms().getStandardProperties();
- if (storageUnitStandardProps.containsKey("maxPoolSize")) {
- log.info("Transform maxPoolSize from '{}' to '{}' for {} data
source: {}",
- jobDataSourceProps.get("maxPoolSize"),
storageUnitStandardProps.get("maxPoolSize"), jobId, entry.getKey());
- jobDataSourceProps.put("maxPoolSize",
storageUnitStandardProps.get("maxPoolSize"));
- }
- if (storageUnitStandardProps.containsKey("maximumPoolSize")) {
- log.info("Transform maximumPoolSize from '{}' to '{}' for {}
data source: {}",
- jobDataSourceProps.get("maximumPoolSize"),
storageUnitStandardProps.get("maximumPoolSize"), jobId, entry.getKey());
- jobDataSourceProps.put("maximumPoolSize",
storageUnitStandardProps.get("maximumPoolSize"));
- }
+ logTransformPoolSize(jobId, entry.getKey(), jobDataSourceProps,
storageUnitStandardProps);
+ transformPoolSize(jobDataSourceProps, storageUnitStandardProps);
+ }
+ return pipelineDataSourceConfig;
+ }
+
+ private static PipelineDataSourceConfiguration
transformStandardPipelineDataSourceConfiguration(final String jobId, final
String dataSourceName,
+
final StandardPipelineDataSourceConfiguration
pipelineDataSourceConfig,
+
final StorageUnit storageUnit) {
+ return transformStandardPipelineDataSourceConfiguration(jobId,
dataSourceName, pipelineDataSourceConfig,
storageUnit.getDataSourcePoolProperties());
+ }
+
+ private static PipelineDataSourceConfiguration
transformStandardPipelineDataSourceConfiguration(final String jobId, final
String dataSourceName,
+
final StandardPipelineDataSourceConfiguration
pipelineDataSourceConfig,
+
final DataSourcePoolProperties dataSourcePoolProps) {
+ DataSourcePoolProperties jobDataSourcePoolProps =
(DataSourcePoolProperties)
pipelineDataSourceConfig.getDataSourceConfiguration();
+ Map<String, Object> jobStandardProps =
jobDataSourcePoolProps.getPoolPropertySynonyms().getStandardProperties();
+ Map<String, Object> currentStandardProps =
dataSourcePoolProps.getPoolPropertySynonyms().getStandardProperties();
+ if (!isPoolSizeChanged(jobStandardProps, currentStandardProps)) {
+ return pipelineDataSourceConfig;
+ }
+ logTransformPoolSize(jobId, dataSourceName, jobStandardProps,
currentStandardProps);
+ return new StandardPipelineDataSourceConfiguration(new
YamlDataSourceConfigurationSwapper().swapToMap(dataSourcePoolProps));
+ }
+
+ private static boolean isPoolSizeChanged(final Map<String, Object>
jobDataSourceProps, final Map<String, Object> storageUnitStandardProps) {
+ return isPoolSizeChanged("maxPoolSize", jobDataSourceProps,
storageUnitStandardProps)
+ || isPoolSizeChanged("maximumPoolSize", jobDataSourceProps,
storageUnitStandardProps);
+ }
+
+ private static boolean isPoolSizeChanged(final String key, final
Map<String, Object> jobDataSourceProps, final Map<String, Object>
storageUnitStandardProps) {
+ return storageUnitStandardProps.containsKey(key) &&
!Objects.equals(String.valueOf(jobDataSourceProps.get(key)),
String.valueOf(storageUnitStandardProps.get(key)));
+ }
+
+ private static void logTransformPoolSize(final String jobId, final String
dataSourceName, final Map<String, Object> jobDataSourceProps, final Map<String,
Object> storageUnitStandardProps) {
+ if (storageUnitStandardProps.containsKey("maxPoolSize")) {
+ log.info("Transform maxPoolSize from '{}' to '{}' for {} data
source: {}",
+ jobDataSourceProps.get("maxPoolSize"),
storageUnitStandardProps.get("maxPoolSize"), jobId, dataSourceName);
+ }
+ if (storageUnitStandardProps.containsKey("maximumPoolSize")) {
+ log.info("Transform maximumPoolSize from '{}' to '{}' for {} data
source: {}",
+ jobDataSourceProps.get("maximumPoolSize"),
storageUnitStandardProps.get("maximumPoolSize"), jobId, dataSourceName);
+ }
+ }
+
+ private static void transformPoolSize(final Map<String, Object>
jobDataSourceProps, final Map<String, Object> storageUnitStandardProps) {
+ if (storageUnitStandardProps.containsKey("maxPoolSize")) {
+ jobDataSourceProps.put("maxPoolSize",
storageUnitStandardProps.get("maxPoolSize"));
+ }
+ if (storageUnitStandardProps.containsKey("maximumPoolSize")) {
+ jobDataSourceProps.put("maximumPoolSize",
storageUnitStandardProps.get("maximumPoolSize"));
}
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtilsTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtilsTest.java
new file mode 100644
index 00000000000..66341ae4964
--- /dev/null
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDataSourceConfigurationUtilsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.spi.JdbcQueryPropertiesExtension;
+import
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeFactory;
+import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
+import
org.apache.shardingsphere.infra.datasource.pool.props.domain.synonym.PoolPropertySynonyms;
+import
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings({DatabaseTypedSPILoader.class, DatabaseTypeFactory.class})
+class PipelineDataSourceConfigurationUtilsTest {
+
+ private static final String JDBC_URL = "jdbc:mock://127.0.0.1/foo_ds";
+
+ private static final String USERNAME = "root";
+
+ private static final String PASSWORD = "123456";
+
+ private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
+
+ @Test
+ void assertTransformPipelineDataSourceConfiguration() {
+ Map<String, Object> dataSourceProps = new LinkedHashMap<>(2, 1F);
+ dataSourceProps.put("maxPoolSize", 2);
+ dataSourceProps.put("maximumPoolSize", 3);
+ ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig
= mock(ShardingSpherePipelineDataSourceConfiguration.class);
+
when(pipelineDataSourceConfig.getRootConfig()).thenReturn(createYamlRootConfiguration(dataSourceProps));
+
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
pipelineDataSourceConfig, Collections.singletonMap("ds_0", mockStorageUnit()));
+ assertThat(dataSourceProps.get("maxPoolSize"), is(10));
+ assertThat(dataSourceProps.get("maximumPoolSize"), is(20));
+ }
+
+ @Test
+ void
assertTransformPipelineDataSourceConfigurationWithUnnamedStandardDataSource() {
+ mockDatabaseTypeFactory();
+ StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2,
3));
+
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
pipelineDataSourceConfig, Collections.singletonMap("ds_0",
mock(StorageUnit.class))),
+ sameInstance(pipelineDataSourceConfig));
+ }
+
+ @Test
+ void assertTransformPipelineDataSourceConfigurationWithNullStorageUnits() {
+ mockDatabaseTypeFactory();
+ StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2,
3));
+
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
pipelineDataSourceConfig, null), sameInstance(pipelineDataSourceConfig));
+ }
+
+ @Test
+ void assertTransformPipelineDataSourceConfigurationWithEmptyStorageUnits()
{
+ ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig
= mock(ShardingSpherePipelineDataSourceConfiguration.class);
+
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
pipelineDataSourceConfig, Collections.emptyMap()),
sameInstance(pipelineDataSourceConfig));
+ }
+
+ @Test
+ void
assertTransformPipelineDataSourceConfigurationWithStandardDataSource() {
+ mockDatabaseTypeFactory();
+ StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2,
3));
+ StorageUnit storageUnit = mockStorageUnit(10, 20);
+ StandardPipelineDataSourceConfiguration actual =
(StandardPipelineDataSourceConfiguration)
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(
+ "foo_job", "ds_0", pipelineDataSourceConfig,
Collections.singletonMap("ds_0", storageUnit));
+ DataSourcePoolProperties actualProps = (DataSourcePoolProperties)
actual.getDataSourceConfiguration();
+
assertThat(actualProps.getPoolPropertySynonyms().getStandardProperties().get("maxPoolSize"),
is(10));
+ }
+
+ @Test
+ void
assertTransformPipelineDataSourceConfigurationWithCurrentStandardDataSource() {
+ mockDatabaseTypeFactory();
+ StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2,
3));
+ StandardPipelineDataSourceConfiguration actual =
(StandardPipelineDataSourceConfiguration)
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(
+ "foo_job", "ds_0", pipelineDataSourceConfig, new
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource",
createStandardDataSourceProperties(10, 20)));
+ DataSourcePoolProperties actualProps = (DataSourcePoolProperties)
actual.getDataSourceConfiguration();
+
assertThat(actualProps.getPoolPropertySynonyms().getStandardProperties().get("maxPoolSize"),
is(10));
+ }
+
+ @Test
+ void
assertTransformPipelineDataSourceConfigurationWithSameStandardDataSource() {
+ mockDatabaseTypeFactory();
+ StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(10,
20));
+
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(
+ "foo_job", "ds_0", pipelineDataSourceConfig, new
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource",
createStandardDataSourceProperties(10, 20))),
+ sameInstance(pipelineDataSourceConfig));
+ }
+
+ @Test
+ void assertTransformPipelineDataSourceConfigurationWithAbsentStorageUnit()
{
+ mockDatabaseTypeFactory();
+ StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new
StandardPipelineDataSourceConfiguration(createStandardDataSourceProperties(2,
3));
+
assertThat(PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration("foo_job",
"ds_0", pipelineDataSourceConfig, Collections.emptyMap()),
is(pipelineDataSourceConfig));
+ }
+
+ private void mockDatabaseTypeFactory() {
+ when(DatabaseTypeFactory.get(JDBC_URL)).thenReturn(databaseType);
+
when(DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class,
databaseType)).thenReturn(Optional.empty());
+ }
+
+ private Map<String, Object> createStandardDataSourceProperties(final int
maxPoolSize, final int maximumPoolSize) {
+ Map<String, Object> result = new LinkedHashMap<>(5, 1F);
+ result.put("url", JDBC_URL);
+ result.put("username", USERNAME);
+ result.put("password", PASSWORD);
+ result.put("maxPoolSize", maxPoolSize);
+ result.put("maximumPoolSize", maximumPoolSize);
+ return result;
+ }
+
+ private YamlRootConfiguration createYamlRootConfiguration(final
Map<String, Object> dataSourceProps) {
+ YamlRootConfiguration result = new YamlRootConfiguration();
+ result.setDatabaseName("foo_db");
+ result.setDataSources(Collections.singletonMap("ds_0",
dataSourceProps));
+ return result;
+ }
+
+ private StorageUnit mockStorageUnit() {
+ Map<String, Object> standardProps = new LinkedHashMap<>(2, 1F);
+ standardProps.put("maxPoolSize", 10);
+ standardProps.put("maximumPoolSize", 20);
+ StorageUnit result = mock(StorageUnit.class);
+ DataSourcePoolProperties dataSourcePoolProps =
mock(DataSourcePoolProperties.class);
+ PoolPropertySynonyms poolPropertySynonyms =
mock(PoolPropertySynonyms.class);
+
when(result.getDataSourcePoolProperties()).thenReturn(dataSourcePoolProps);
+
when(dataSourcePoolProps.getPoolPropertySynonyms()).thenReturn(poolPropertySynonyms);
+
when(poolPropertySynonyms.getStandardProperties()).thenReturn(standardProps);
+ return result;
+ }
+
+ private StorageUnit mockStorageUnit(final int maxPoolSize, final int
maximumPoolSize) {
+ StorageUnit result = mock(StorageUnit.class);
+ when(result.getDataSourcePoolProperties())
+ .thenReturn(new
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource",
createStandardDataSourceProperties(maxPoolSize, maximumPoolSize)));
+ return result;
+ }
+}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
index a3f1df3a6b3..578edf19537 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -27,12 +28,15 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.PipelineRequiredCol
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.job.executor.DistributedPipelineJobExecutorCallback;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDataSourceConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -40,6 +44,8 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.
import
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
+import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
+import
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
@@ -57,14 +63,33 @@ import java.util.stream.Collectors;
*/
public final class MigrationJobExecutorCallback implements
DistributedPipelineJobExecutorCallback<MigrationJobConfiguration,
MigrationJobItemContext, TransmissionJobItemProgress> {
+ private static final String JOB_TYPE = new MigrationJobType().getType();
+
@Override
public MigrationJobItemContext buildJobItemContext(final
MigrationJobConfiguration jobConfig, final int shardingItem,
final
TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext
jobProcessContext,
final
PipelineDataSourceManager dataSourceManager) {
+ transformDataSourceConfigurations(jobConfig);
MigrationTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getProcessConfiguration());
return new MigrationJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext, taskConfig, dataSourceManager);
}
+ private void transformDataSourceConfigurations(final
MigrationJobConfiguration jobConfig) {
+ Map<String, DataSourcePoolProperties> sourceDataSourcePoolProps = new
PipelineDataSourcePersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
JOB_TYPE);
+ Map<String, StorageUnit> targetStorageUnits =
PipelineContextManager.getProxyContext().getStorageUnits(jobConfig.getTargetDatabaseName());
+ for (Entry<String, PipelineDataSourceConfiguration> entry :
jobConfig.getSources().entrySet()) {
+ entry.setValue(transformSourceDataSourceConfiguration(jobConfig,
entry, sourceDataSourcePoolProps));
+ }
+
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
jobConfig.getTarget(), targetStorageUnits);
+ }
+
+ private PipelineDataSourceConfiguration
transformSourceDataSourceConfiguration(final MigrationJobConfiguration
jobConfig, final Entry<String, PipelineDataSourceConfiguration> entry,
+
final Map<String, DataSourcePoolProperties> sourceDataSourcePoolProps) {
+ return sourceDataSourcePoolProps.containsKey(entry.getKey())
+ ?
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
entry.getKey(), entry.getValue(),
sourceDataSourcePoolProps.get(entry.getKey()))
+ : entry.getValue();
+ }
+
private MigrationTaskConfiguration buildTaskConfiguration(final
MigrationJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
Map<ShardingSphereIdentifier, Collection<String>>
tableAndRequiredColumnsMap = getTableAndRequiredColumnsMap(jobConfig);
IncrementalDumperContext incrementalDumperContext = new
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobDataNodeLine(jobShardingItem));
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 21257453936..3b5cc088160 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
@@ -27,6 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
+import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
@@ -35,9 +37,11 @@ import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
@@ -48,7 +52,9 @@ import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDataSourceConfi
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
+import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import java.util.LinkedHashMap;
@@ -98,7 +104,8 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
try (
PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
TableDataConsistencyChecker tableChecker =
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
-
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
(ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget());
+ Map<String, StorageUnit> storageUnits =
PipelineContextManager.getProxyContext().getStorageUnits(jobConfig.getTargetDatabaseName());
+ transformDataSourceConfigurations(storageUnits);
if (progressContext.getTableCheckRangePositions().isEmpty()) {
progressContext.getTableCheckRangePositions().addAll(splitCrossTables());
}
@@ -124,6 +131,18 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
return
checkResultMap.entrySet().stream().collect(Collectors.toMap(entry ->
entry.getKey().format(), Entry::getValue));
}
+ /**
+ * Transform the source and target data source configurations of this job.
+ *
+ * <p>Entries of {@code jobConfig.getSources()} are replaced in place, but only for keys that have pool
+ * properties persisted for the migration job type; every other entry keeps its current value.</p>
+ *
+ * @param targetStorageUnits storage units handed to the target configuration transformation
+ */
+ private void transformDataSourceConfigurations(final Map<String,
StorageUnit> targetStorageUnits) {
+ // Pool properties persisted under this job's context key for the migration job type; looked up below by source entry key.
+ Map<String, DataSourcePoolProperties> sourceDataSourcePoolProps =
+ new
PipelineDataSourcePersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
new MigrationJobType().getType());
+ for (Entry<String, PipelineDataSourceConfiguration> entry :
jobConfig.getSources().entrySet()) {
+ // No persisted properties for this key: the entry is set back to its own value, i.e. left unchanged.
+
entry.setValue(sourceDataSourcePoolProps.containsKey(entry.getKey())
+ ?
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
entry.getKey(), entry.getValue(),
+ sourceDataSourcePoolProps.get(entry.getKey()))
+ : entry.getValue());
+ }
+ // NOTE(review): the result of this call is not used, so it presumably updates the target configuration in place - confirm.
+
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
(ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget(),
targetStorageUnits);
+ }
+
private long getRecordsCount() {
Map<Integer, TransmissionJobItemProgress> jobProgress = new
TransmissionJobManager(new MigrationJobType()).getJobProgress(jobConfig);
return
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getInventoryRecordsCount).sum();
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallbackTest.java
b/kernel/data-pipeline/scenario/migration/core/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallbackTest.java
new file mode 100644
index 00000000000..1d826b5cdd4
--- /dev/null
+++
b/kernel/data-pipeline/scenario/migration/core/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallbackTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
+
+import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeFactory;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+class MigrationJobExecutorCallbackTest {
+
+ private static final String DATABASE_NAME = "foo_db";
+
+ private static final String SOURCE_DATA_SOURCE_NAME = "source_ds";
+
+ private static final String TARGET_DATA_SOURCE_NAME = "target_ds";
+
+ private static final String SOURCE_JDBC_URL =
"jdbc:mock://127.0.0.1/source_ds";
+
+ private static final String TARGET_JDBC_URL =
"jdbc:mock://127.0.0.1/target_ds";
+
+ private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
+
+ /**
+ * Build a job item context while the governance facade returns source YAML with maxPoolSize 10 and the proxy
+ * context exposes a target storage unit with maxPoolSize 30.
+ *
+ * <p>The job configuration from {@code createJobConfiguration()} starts with maxPoolSize 2 (source) and 4 (target);
+ * the assertions expect "10" and "30" in the dumper, importer and create table configurations instead.</p>
+ */
+ @Test
+ void assertBuildJobItemContextWithTransformedDataSourceConfigurations() {
+ // The static mocks are try-with-resources variables, so they are closed when this block exits.
+ try (
+ MockedStatic<DatabaseTypeFactory> databaseTypeFactory =
mockStatic(DatabaseTypeFactory.class);
+ MockedStatic<PipelineAPIFactory> pipelineAPIFactory =
mockStatic(PipelineAPIFactory.class);
+ MockedStatic<PipelineContextManager> pipelineContextManager =
mockStatic(PipelineContextManager.class)) {
+ mockDatabaseTypeFactory(databaseTypeFactory);
+ mockGovernanceFacade(pipelineAPIFactory,
createSourceDataSourceYaml(10, 20));
+ mockProxyContext(pipelineContextManager);
+ MigrationJobItemContext actual = new
MigrationJobExecutorCallback().buildJobItemContext(createJobConfiguration(), 0,
null, mockProcessContext(),
+ mock(PipelineDataSourceManager.class));
+ MigrationTaskConfiguration actualTaskConfig =
actual.getTaskConfig();
+ // Source side, as seen by the dumper.
+
assertThat(getMaxPoolSize((StandardPipelineDataSourceConfiguration)
actualTaskConfig.getDumperContext().getCommonContext().getDataSourceConfig()),
is("10"));
+ ImporterConfiguration actualImporterConfig =
actualTaskConfig.getImporterConfig();
+ // Target side, as seen by the importer.
+
assertThat(getRootDataSourceMaxPoolSize((ShardingSpherePipelineDataSourceConfiguration)
actualImporterConfig.getDataSourceConfig()), is("30"));
+ // Only the first create table configuration is inspected.
+ CreateTableConfiguration actualCreateTableConfig =
actualTaskConfig.getCreateTableConfigurations().iterator().next();
+
assertThat(getMaxPoolSize((StandardPipelineDataSourceConfiguration)
actualCreateTableConfig.getSourceDataSourceConfig()), is("10"));
+
assertThat(getRootDataSourceMaxPoolSize((ShardingSpherePipelineDataSourceConfiguration)
actualCreateTableConfig.getTargetDataSourceConfig()), is("30"));
+ }
+ }
+
+ /**
+ * Build a job item context while the governance facade returns an empty YAML string for the source data sources.
+ *
+ * <p>The proxy storage units contain an unstubbed mock registered under the source data source name plus a
+ * target unit with maxPoolSize 30. The assertions expect the source to keep "2", the value set by
+ * {@code createJobConfiguration()}, and the target to report "30".</p>
+ */
+ @Test
+ void assertBuildJobItemContextWithoutMigrationSourceStorageUnit() {
+ // The static mocks are try-with-resources variables, so they are closed when this block exits.
+ try (
+ MockedStatic<DatabaseTypeFactory> databaseTypeFactory =
mockStatic(DatabaseTypeFactory.class);
+ MockedStatic<PipelineAPIFactory> pipelineAPIFactory =
mockStatic(PipelineAPIFactory.class);
+ MockedStatic<PipelineContextManager> pipelineContextManager =
mockStatic(PipelineContextManager.class)) {
+ mockDatabaseTypeFactory(databaseTypeFactory);
+ mockGovernanceFacade(pipelineAPIFactory, "");
+ Map<String, StorageUnit> storageUnits = new LinkedHashMap<>(2, 1F);
+ // The entry under the source name is a bare mock with no stubbed pool properties.
+ storageUnits.put(SOURCE_DATA_SOURCE_NAME, mock(StorageUnit.class));
+ storageUnits.put(TARGET_DATA_SOURCE_NAME, mockStorageUnit(30, 40));
+ mockProxyContext(pipelineContextManager, storageUnits);
+ MigrationJobItemContext actual = new
MigrationJobExecutorCallback().buildJobItemContext(createJobConfiguration(), 0,
null, mockProcessContext(),
+ mock(PipelineDataSourceManager.class));
+ MigrationTaskConfiguration actualTaskConfig =
actual.getTaskConfig();
+ // Source side is expected to be unchanged from the job configuration.
+
assertThat(getMaxPoolSize((StandardPipelineDataSourceConfiguration)
actualTaskConfig.getDumperContext().getCommonContext().getDataSourceConfig()),
is("2"));
+ ImporterConfiguration actualImporterConfig =
actualTaskConfig.getImporterConfig();
+
assertThat(getRootDataSourceMaxPoolSize((ShardingSpherePipelineDataSourceConfiguration)
actualImporterConfig.getDataSourceConfig()), is("30"));
+ // Only the first create table configuration is inspected.
+ CreateTableConfiguration actualCreateTableConfig =
actualTaskConfig.getCreateTableConfigurations().iterator().next();
+
assertThat(getMaxPoolSize((StandardPipelineDataSourceConfiguration)
actualCreateTableConfig.getSourceDataSourceConfig()), is("2"));
+
assertThat(getRootDataSourceMaxPoolSize((ShardingSpherePipelineDataSourceConfiguration)
actualCreateTableConfig.getTargetDataSourceConfig()), is("30"));
+ }
+ }
+
+ /**
+ * Stub the static {@code DatabaseTypeFactory.get} so that any string argument yields this test's {@code databaseType}.
+ *
+ * @param databaseTypeFactory active static mock of {@code DatabaseTypeFactory}
+ */
+ private void mockDatabaseTypeFactory(final
MockedStatic<DatabaseTypeFactory> databaseTypeFactory) {
+ databaseTypeFactory.when(() ->
DatabaseTypeFactory.get(anyString())).thenReturn(databaseType);
+ }
+
+ /**
+ * Install a deep-stub governance facade whose data source load for the migration job type returns the given YAML.
+ *
+ * @param pipelineAPIFactory active static mock of {@code PipelineAPIFactory}
+ * @param dataSourcesYaml text returned by the stubbed {@code load} call
+ */
+ private void mockGovernanceFacade(final MockedStatic<PipelineAPIFactory>
pipelineAPIFactory, final String dataSourcesYaml) {
+ // RETURNS_DEEP_STUBS lets the chained getters below be stubbed without mocking each intermediate object.
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class, RETURNS_DEEP_STUBS);
+ when(governanceFacade.getMetaDataFacade().getDataSource().load(new
MigrationJobType().getType())).thenReturn(dataSourcesYaml);
+ // The same facade is returned for every context key.
+ pipelineAPIFactory.when(() ->
PipelineAPIFactory.getPipelineGovernanceFacade(any(PipelineContextKey.class))).thenReturn(governanceFacade);
+ }
+
+ /**
+ * Create YAML text with a single entry keyed by {@code SOURCE_DATA_SOURCE_NAME}.
+ *
+ * <p>The entry carries a Hikari data source class name, {@code SOURCE_JDBC_URL}, root credentials and both
+ * pool size keys.</p>
+ *
+ * @param maxPoolSize value written under the {@code maxPoolSize} key
+ * @param maximumPoolSize value written under the {@code maximumPoolSize} key
+ * @return YAML text
+ */
+ private String createSourceDataSourceYaml(final int maxPoolSize, final int
maximumPoolSize) {
+ return SOURCE_DATA_SOURCE_NAME + ":\n"
+ + " dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n"
+ + " url: " + SOURCE_JDBC_URL + "\n"
+ + " username: root\n"
+ + " password: root\n"
+ + " maxPoolSize: " + maxPoolSize + "\n"
+ + " maximumPoolSize: " + maximumPoolSize + "\n";
+ }
+
+ /**
+ * Mock the proxy context with only the target storage unit, created by {@code mockStorageUnit(30, 40)}.
+ *
+ * @param pipelineContextManager active static mock of {@code PipelineContextManager}
+ */
+ private void mockProxyContext(final MockedStatic<PipelineContextManager>
pipelineContextManager) {
+ mockProxyContext(pipelineContextManager,
Collections.singletonMap(TARGET_DATA_SOURCE_NAME, mockStorageUnit(30, 40)));
+ }
+
+ /**
+ * Mock the proxy context so that {@code getStorageUnits(DATABASE_NAME)} returns the given storage units.
+ *
+ * @param pipelineContextManager active static mock of {@code PipelineContextManager}
+ * @param storageUnits storage units returned for {@code DATABASE_NAME}
+ */
+ private void mockProxyContext(final MockedStatic<PipelineContextManager>
pipelineContextManager, final Map<String, StorageUnit> storageUnits) {
+ ContextManager contextManager = mock(ContextManager.class);
+ // Only DATABASE_NAME is stubbed; no other database name is configured on this mock.
+
when(contextManager.getStorageUnits(DATABASE_NAME)).thenReturn(storageUnits);
+
pipelineContextManager.when(PipelineContextManager::getProxyContext).thenReturn(contextManager);
+ }
+
+ /**
+ * Mock a storage unit whose pool properties describe a Hikari data source.
+ *
+ * <p>The properties always use {@code TARGET_JDBC_URL}, whatever name the unit is registered under.</p>
+ *
+ * @param maxPoolSize value stored under the {@code maxPoolSize} key
+ * @param maximumPoolSize value stored under the {@code maximumPoolSize} key
+ * @return mocked storage unit
+ */
+ private StorageUnit mockStorageUnit(final int maxPoolSize, final int
maximumPoolSize) {
+ StorageUnit result = mock(StorageUnit.class);
+ when(result.getDataSourcePoolProperties())
+ .thenReturn(new
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource",
createDataSourceProperties(TARGET_JDBC_URL, maxPoolSize, maximumPoolSize)));
+ return result;
+ }
+
+ /**
+ * Create the migration job configuration used as the starting point of both tests.
+ *
+ * <p>The single source is a standard configuration with pool sizes (2, 3); the target is a ShardingSphere
+ * configuration whose root data source has pool sizes (4, 5). The only table is {@code t_order}.</p>
+ *
+ * @return migration job configuration
+ */
+ private MigrationJobConfiguration createJobConfiguration() {
+ Map<String, PipelineDataSourceConfiguration> sources = new
LinkedHashMap<>(1, 1F);
+ sources.put(SOURCE_DATA_SOURCE_NAME, new
StandardPipelineDataSourceConfiguration(createDataSourceProperties(SOURCE_JDBC_URL,
2, 3)));
+ // NOTE(review): the meaning of the trailing numeric arguments (1, 3) is not visible here - see the MigrationJobConfiguration constructor.
+ return new MigrationJobConfiguration(createJobId(), DATABASE_NAME,
databaseType, databaseType, sources,
+ new
ShardingSpherePipelineDataSourceConfiguration(createRootConfiguration(4, 5)),
Collections.singletonList("t_order"),
+ Collections.singletonMap("t_order", "foo_schema"),
createJobDataNodeLine(), Collections.singletonList(createJobDataNodeLine()), 1,
3);
+ }
+
+ /**
+ * Create a job ID by marshalling a {@code MigrationJobId} built from a PROXY context key and the marshalled
+ * data node line of this test.
+ *
+ * @return marshalled job ID
+ */
+ private String createJobId() {
+ return PipelineJobIdUtils.marshal(new MigrationJobId(new
PipelineContextKey(InstanceType.PROXY),
Collections.singletonList(createJobDataNodeLine().marshal())));
+ }
+
+ /**
+ * Create a data node line with one entry for {@code t_order}, backed by the single data node
+ * {@code SOURCE_DATA_SOURCE_NAME + ".t_order"}.
+ *
+ * @return job data node line
+ */
+ private JobDataNodeLine createJobDataNodeLine() {
+ return new JobDataNodeLine(Collections.singletonList(new
JobDataNodeEntry("t_order", Collections.singletonList(new
DataNode(SOURCE_DATA_SOURCE_NAME + ".t_order")))));
+ }
+
+ /**
+ * Create a YAML root configuration for {@code DATABASE_NAME} with one data source named
+ * {@code TARGET_DATA_SOURCE_NAME} and no rules.
+ *
+ * @param maxPoolSize value stored under the data source's {@code maxPoolSize} key
+ * @param maximumPoolSize value stored under the data source's {@code maximumPoolSize} key
+ * @return YAML root configuration
+ */
+ private YamlRootConfiguration createRootConfiguration(final int
maxPoolSize, final int maximumPoolSize) {
+ YamlRootConfiguration result = new YamlRootConfiguration();
+ result.setDatabaseName(DATABASE_NAME);
+
result.setDataSources(Collections.singletonMap(TARGET_DATA_SOURCE_NAME,
createDataSourceProperties(TARGET_JDBC_URL, maxPoolSize, maximumPoolSize)));
+ result.setRules(Collections.emptyList());
+ return result;
+ }
+
+ /**
+ * Create an insertion-ordered property map with url, root credentials and both pool size keys.
+ *
+ * @param jdbcUrl value stored under {@code url}
+ * @param maxPoolSize value stored under {@code maxPoolSize}
+ * @param maximumPoolSize value stored under {@code maximumPoolSize}
+ * @return data source properties
+ */
+ private Map<String, Object> createDataSourceProperties(final String
jdbcUrl, final int maxPoolSize, final int maximumPoolSize) {
+ // Capacity 5 with load factor 1 matches the five entries put below.
+ Map<String, Object> result = new LinkedHashMap<>(5, 1F);
+ result.put("url", jdbcUrl);
+ result.put("username", "root");
+ result.put("password", "root");
+ result.put("maxPoolSize", maxPoolSize);
+ result.put("maximumPoolSize", maximumPoolSize);
+ return result;
+ }
+
+ /**
+ * Mock a process context whose {@code getProcessConfiguration()} returns a fixed process configuration.
+ *
+ * @return mocked process context
+ */
+ private TransmissionProcessContext mockProcessContext() {
+ TransmissionProcessContext result =
mock(TransmissionProcessContext.class);
+ // NOTE(review): the positional read/write arguments are not named here - check the configuration constructors for their meaning.
+ when(result.getProcessConfiguration()).thenReturn(new
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 10, 10, null),
+ new PipelineWriteConfiguration(1, 50, null), null));
+ return result;
+ }
+
+ /**
+ * Read the {@code maxPoolSize} standard pool property of a standard pipeline data source configuration.
+ *
+ * @param dataSourceConfig configuration whose {@code getDataSourceConfiguration()} is cast to {@code DataSourcePoolProperties}
+ * @return the property converted with {@code String.valueOf}, so a missing value yields {@code "null"}
+ */
+ private String getMaxPoolSize(final
StandardPipelineDataSourceConfiguration dataSourceConfig) {
+ DataSourcePoolProperties poolProps = (DataSourcePoolProperties)
dataSourceConfig.getDataSourceConfiguration();
+ return
String.valueOf(poolProps.getPoolPropertySynonyms().getStandardProperties().get("maxPoolSize"));
+ }
+
+ /**
+ * Read the {@code maxPoolSize} entry of the first data source in the root configuration.
+ *
+ * @param dataSourceConfig ShardingSphere pipeline data source configuration to inspect
+ * @return the entry converted with {@code String.valueOf}, so a missing value yields {@code "null"}
+ */
+ private String getRootDataSourceMaxPoolSize(final
ShardingSpherePipelineDataSourceConfiguration dataSourceConfig) {
+ Collection<Map<String, Object>> dataSources =
dataSourceConfig.getRootConfig().getDataSources().values();
+ // Only the first data source is read; these tests configure exactly one.
+ return
String.valueOf(dataSources.iterator().next().get("maxPoolSize"));
+ }
+}