This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a9f02144476 Add migration source resource implementation (#20296)
a9f02144476 is described below
commit a9f02144476c97f6f2cf7fd84f980ae5f151002f
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Aug 19 15:20:28 2022 +0800
Add migration source resource implementation (#20296)
* Add migration source resource implementation
* Fix codestyle
---
.../update/AddMigrationSourceResourceUpdater.java | 29 +++++++++-
.../converter}/ResourceSegmentsConverter.java | 5 +-
.../converter}/ResourceSegmentsConverterTest.java | 2 +-
.../data/pipeline/api/MigrationJobPublicAPI.java | 8 +++
.../pipeline/core/api/GovernanceRepositoryAPI.java | 17 ++++++
.../pipeline/core/api/PipelineResourceAPI.java | 45 ++++++++++++++++
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 11 ++++
.../core/api/impl/PipelineResourceAPIImpl.java | 61 ++++++++++++++++++++++
.../AddMigrationSourceResourceException.java | 34 ++++++++++++
.../scenario/migration/MigrationJobAPIImpl.java | 25 +++++++++
.../core/fixture/MigrationJobAPIFixture.java | 5 ++
.../rdl/resource/AddResourceBackendHandler.java | 1 +
.../rdl/resource/AlterResourceBackendHandler.java | 1 +
.../core/api/impl/MigrationJobAPIImplTest.java | 18 +++++++
14 files changed, 257 insertions(+), 5 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/AddMigrationSourceResourceUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/AddMigrationSourceResourceUpdater.java
index 4a4c1419ada..26028d11fa8 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/AddMigrationSourceResourceUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/AddMigrationSourceResourceUpdater.java
@@ -20,8 +20,22 @@ package
org.apache.shardingsphere.migration.distsql.handler.update;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
+import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
+import
org.apache.shardingsphere.distsql.parser.segment.HostnameAndPortBasedDataSourceSegment;
+import
org.apache.shardingsphere.distsql.parser.segment.URLBasedDataSourceSegment;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesValidator;
+import
org.apache.shardingsphere.infra.distsql.exception.resource.InvalidResourcesException;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.AddMigrationSourceResourceStatement;
+import
org.apache.shardingsphere.sharding.distsql.handler.converter.ResourceSegmentsConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
/**
* Add migration source resource updater.
@@ -33,7 +47,20 @@ public final class AddMigrationSourceResourceUpdater
implements RALUpdater<AddMi
@Override
public void executeUpdate(final String databaseName, final
AddMigrationSourceResourceStatement sqlStatement) {
- // TODO add migration source resource later
+ List<DataSourceSegment> dataSources = new
ArrayList<>(sqlStatement.getDataSources());
+ if (dataSources.stream().anyMatch(each -> each instanceof
HostnameAndPortBasedDataSourceSegment)) {
+ throw new UnsupportedOperationException("Not currently support add
hostname and port, please use url");
+ }
+ URLBasedDataSourceSegment urlBasedDataSourceSegment =
(URLBasedDataSourceSegment) dataSources.get(0);
+ DatabaseType databaseType =
DatabaseTypeEngine.getDatabaseType(urlBasedDataSourceSegment.getUrl());
+ Map<String, DataSourceProperties> sourcePropertiesMap =
ResourceSegmentsConverter.convert(databaseType, dataSources);
+ DataSourcePropertiesValidator validator = new
DataSourcePropertiesValidator();
+ try {
+ validator.validate(sourcePropertiesMap, databaseType);
+ } catch (final InvalidResourcesException ex) {
+ throw new AddMigrationSourceResourceException(ex);
+ }
+ JOB_API.addMigrationSourceResources(sourcePropertiesMap);
}
@Override
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverter.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverter.java
similarity index 94%
rename from
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverter.java
rename to
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverter.java
index 7b1f5098ecc..2d0c86b3f2d 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverter.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverter.java
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.resource;
+package org.apache.shardingsphere.sharding.distsql.handler.converter;
-import com.zaxxer.hikari.HikariDataSource;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
@@ -46,7 +45,7 @@ public final class ResourceSegmentsConverter {
public static Map<String, DataSourceProperties> convert(final DatabaseType
databaseType, final Collection<DataSourceSegment> resources) {
Map<String, DataSourceProperties> result = new
LinkedHashMap<>(resources.size(), 1);
for (DataSourceSegment each : resources) {
- result.put(each.getName(), new
DataSourceProperties(HikariDataSource.class.getName(),
createProperties(databaseType, each)));
+ result.put(each.getName(), new
DataSourceProperties("com.zaxxer.hikari.HikariDataSource",
createProperties(databaseType, each)));
}
return result;
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverterTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverterTest.java
similarity index 97%
rename from
shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverterTest.java
rename to
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverterTest.java
index b2caee1c152..5601b912e2c 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverterTest.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverterTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.resource;
+package org.apache.shardingsphere.sharding.distsql.handler.converter;
import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
import
org.apache.shardingsphere.distsql.parser.segment.HostnameAndPortBasedDataSourceSegment;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index 99851859fe4..7787f4d31b3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -111,4 +112,11 @@ public interface MigrationJobPublicAPI extends
PipelineJobPublicAPI, RequiredSPI
* @param jobId job id
*/
void reset(String jobId);
+
+ /**
+ * Add migration source resources.
+ *
+ * @param sourcePropertiesMap source properties map
+ */
+ void addMigrationSourceResources(Map<String, DataSourceProperties>
sourcePropertiesMap);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index fb09b8cc50b..a1181ada319 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.api;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import java.util.List;
@@ -107,4 +108,20 @@ public interface GovernanceRepositoryAPI {
* @return sharding items
*/
List<Integer> getShardingItems(String jobId);
+
+ /**
+ * Get meta data data source.
+ *
+ * @param jobType job type
+ * @return meta data data source
+ */
+ String getMetaDataDataSource(JobType jobType);
+
+ /**
+ * Persist meta data data source.
+ *
+ * @param jobType job type
+ * @param metaDataDataSource meta data data source
+ */
+ void persistMetaDataDataSource(JobType jobType, String metaDataDataSource);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
new file mode 100644
index 00000000000..81d95798cc1
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+
+import java.util.Map;
+
+/**
+ * Pipeline resource API.
+ */
+public interface PipelineResourceAPI {
+
+ /**
+ * Get meta data data source.
+ *
+ * @param jobType job type
+ * @return meta data data source
+ */
+ Map<String, DataSourceProperties> getMetaDataDataSource(JobType jobType);
+
+ /**
+ * Persist meta data data source.
+ *
+ * @param jobType job type
+ * @param dataSource data source
+ */
+ void persistMetaDataDataSource(JobType jobType, Map<String,
DataSourceProperties> dataSource);
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 62af844a34a..09ed414e51a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -92,4 +93,14 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
return
result.stream().map(Integer::parseInt).collect(Collectors.toList());
}
+
+ @Override
+ public String getMetaDataDataSource(final JobType jobType) {
+ return
repository.get(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
+ }
+
+ @Override
+ public void persistMetaDataDataSource(final JobType jobType, final String
metaDataDataSource) {
+
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType),
metaDataDataSource);
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
new file mode 100644
index 00000000000..00bc24ecbef
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Pipeline resource API implementation.
+ */
+public final class PipelineResourceAPIImpl implements PipelineResourceAPI {
+
+ private static final YamlDataSourceConfigurationSwapper
DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Map<String, DataSourceProperties> getMetaDataDataSource(final
JobType jobType) {
+ String dataSourcesProperties =
PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataDataSource(jobType);
+ if (StringUtils.isBlank(dataSourcesProperties)) {
+ return Collections.emptyMap();
+ }
+ Map<String, Map<String, Object>> yamlDataSources =
YamlEngine.unmarshal(dataSourcesProperties, Map.class);
+ Map<String, DataSourceProperties> result = new
LinkedHashMap<>(yamlDataSources.size());
+ yamlDataSources.forEach((key, value) -> result.put(key,
DATA_SOURCE_CONFIG_SWAPPER.swapToDataSourceProperties(value)));
+ return result;
+ }
+
+ @Override
+ public void persistMetaDataDataSource(final JobType jobType, final
Map<String, DataSourceProperties> dataSourceConfigs) {
+ Map<String, Map<String, Object>> dataSourceMap = new
LinkedHashMap<>(dataSourceConfigs.size());
+ for (Entry<String, DataSourceProperties> entry :
dataSourceConfigs.entrySet()) {
+ dataSourceMap.put(entry.getKey(),
DATA_SOURCE_CONFIG_SWAPPER.swapToMap(entry.getValue()));
+ }
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataDataSource(jobType,
YamlEngine.marshal(dataSourceMap));
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/AddMigrationSourceResourceException.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/AddMigrationSourceResourceException.java
new file mode 100644
index 00000000000..52240324089
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/AddMigrationSourceResourceException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.exception;
+
+/**
+ * Add migration source resource exception.
+ */
+public final class AddMigrationSourceResourceException extends
RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public AddMigrationSourceResourceException(final String message) {
+ super(message);
+ }
+
+ public AddMigrationSourceResourceException(final Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 6e5ff4de841..5a7ca4c97c1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -40,11 +40,14 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
+import
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineResourceAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
@@ -55,6 +58,7 @@ import
org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
@@ -63,6 +67,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -81,6 +86,8 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
private final PipelineJobItemAPI jobItemAPI = new
InventoryIncrementalJobItemAPIImpl();
+ private final PipelineResourceAPI pipelineResourceAPI = new
PipelineResourceAPIImpl();
+
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
MigrationJobId jobId = (MigrationJobId) pipelineJobId;
@@ -390,6 +397,24 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
verifyJobStopped(jobConfigPOJO);
}
+ @Override
+ public void addMigrationSourceResources(final Map<String,
DataSourceProperties> dataSourceProperties) {
+ log.info("Add migration source resources {}",
dataSourceProperties.keySet());
+ Map<String, DataSourceProperties> existDataSources =
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+ Collection<String> duplicateDataSourceNames = new
HashSet<>(dataSourceProperties.size(), 1);
+ for (Entry<String, DataSourceProperties> entry :
dataSourceProperties.entrySet()) {
+ if (existDataSources.containsKey(entry.getKey())) {
+ duplicateDataSourceNames.add(entry.getKey());
+ }
+ }
+ if (!duplicateDataSourceNames.isEmpty()) {
+ throw new
AddMigrationSourceResourceException(String.format("Duplicate resource names
%s.", duplicateDataSourceNames));
+ }
+ Map<String, DataSourceProperties> result = new
LinkedHashMap<>(existDataSources);
+ result.putAll(dataSourceProperties);
+ pipelineResourceAPI.persistMetaDataDataSource(JobType.MIGRATION,
result);
+ }
+
@Override
public String getType() {
return JobType.MIGRATION.getTypeName();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index 599721022a6..da7a60324d1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import java.util.Collection;
import java.util.List;
@@ -143,6 +144,10 @@ public final class MigrationJobAPIFixture implements
MigrationJobAPI {
public void reset(final String jobId) {
}
+ @Override
+ public void addMigrationSourceResources(final Map<String,
DataSourceProperties> sourcePropertiesMap) {
+ }
+
@Override
public MigrationJobConfiguration getJobConfiguration(final String jobId) {
return null;
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AddResourceBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AddResourceBackendHandler.java
index 69ad19c4d1d..e2642ca415d 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AddResourceBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AddResourceBackendHandler.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.backend.handler.DatabaseRequiredBackendHandler;
+import
org.apache.shardingsphere.sharding.distsql.handler.converter.ResourceSegmentsConverter;
import java.sql.SQLException;
import java.util.ArrayList;
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AlterResourceBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AlterResourceBackendHandler.java
index 24014730d3f..85943f2fbd0 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AlterResourceBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AlterResourceBackendHandler.java
@@ -38,6 +38,7 @@ import
org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.backend.handler.DatabaseRequiredBackendHandler;
+import
org.apache.shardingsphere.sharding.distsql.handler.converter.ResourceSegmentsConverter;
import javax.sql.DataSource;
import java.sql.SQLException;
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 6466aaba35a..16dad9490b5 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -26,10 +26,12 @@ import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfig
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -38,6 +40,7 @@ import
org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -47,6 +50,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -264,4 +268,18 @@ public final class MigrationJobAPIImplTest {
assertNotNull(actual);
assertThat(actual.getStatus(), is(JobStatus.FINISHED));
}
+
+ @Test
+ public void assertAddMigrationSourceResources() {
+ Map<String, Object> props = new HashMap<>();
+ props.put("jdbcUrl", "jdbc:mysql://localhost:3306/test");
+ props.put("username", "root");
+ props.put("password", "root");
+ Map<String, DataSourceProperties> expect = new LinkedHashMap<>(1, 1);
+ expect.put("ds_0", new
DataSourceProperties("com.zaxxer.hikari.HikariDataSource", props));
+ jobAPI.addMigrationSourceResources(expect);
+ PipelineResourceAPI pipelineResourceAPI = new
PipelineResourceAPIImpl();
+ Map<String, DataSourceProperties> actual =
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+ assertTrue(actual.containsKey("ds_0"));
+ }
}