This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a9f02144476 Add migration source resource implementation (#20296)
a9f02144476 is described below

commit a9f02144476c97f6f2cf7fd84f980ae5f151002f
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Aug 19 15:20:28 2022 +0800

    Add migration source resource implementation (#20296)
    
    * Add migration source resource implementation
    
    * Fix codestyle
---
 .../update/AddMigrationSourceResourceUpdater.java  | 29 +++++++++-
 .../converter}/ResourceSegmentsConverter.java      |  5 +-
 .../converter}/ResourceSegmentsConverterTest.java  |  2 +-
 .../data/pipeline/api/MigrationJobPublicAPI.java   |  8 +++
 .../pipeline/core/api/GovernanceRepositoryAPI.java | 17 ++++++
 .../pipeline/core/api/PipelineResourceAPI.java     | 45 ++++++++++++++++
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 11 ++++
 .../core/api/impl/PipelineResourceAPIImpl.java     | 61 ++++++++++++++++++++++
 .../AddMigrationSourceResourceException.java       | 34 ++++++++++++
 .../scenario/migration/MigrationJobAPIImpl.java    | 25 +++++++++
 .../core/fixture/MigrationJobAPIFixture.java       |  5 ++
 .../rdl/resource/AddResourceBackendHandler.java    |  1 +
 .../rdl/resource/AlterResourceBackendHandler.java  |  1 +
 .../core/api/impl/MigrationJobAPIImplTest.java     | 18 +++++++
 14 files changed, 257 insertions(+), 5 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/AddMigrationSourceResourceUpdater.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/AddMigrationSourceResourceUpdater.java
index 4a4c1419ada..26028d11fa8 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/AddMigrationSourceResourceUpdater.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/AddMigrationSourceResourceUpdater.java
@@ -20,8 +20,22 @@ package 
org.apache.shardingsphere.migration.distsql.handler.update;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
+import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
+import 
org.apache.shardingsphere.distsql.parser.segment.HostnameAndPortBasedDataSourceSegment;
+import 
org.apache.shardingsphere.distsql.parser.segment.URLBasedDataSourceSegment;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import 
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesValidator;
+import 
org.apache.shardingsphere.infra.distsql.exception.resource.InvalidResourcesException;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.AddMigrationSourceResourceStatement;
+import 
org.apache.shardingsphere.sharding.distsql.handler.converter.ResourceSegmentsConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Add migration source resource updater.
@@ -33,7 +47,20 @@ public final class AddMigrationSourceResourceUpdater 
implements RALUpdater<AddMi
     
     @Override
     public void executeUpdate(final String databaseName, final 
AddMigrationSourceResourceStatement sqlStatement) {
-        // TODO add migration source resource later
+        List<DataSourceSegment> dataSources = new 
ArrayList<>(sqlStatement.getDataSources());
+        if (dataSources.stream().anyMatch(each -> each instanceof 
HostnameAndPortBasedDataSourceSegment)) {
+            throw new UnsupportedOperationException("Not currently support add 
hostname and port, please use url");
+        }
+        URLBasedDataSourceSegment urlBasedDataSourceSegment = 
(URLBasedDataSourceSegment) dataSources.get(0);
+        DatabaseType databaseType = 
DatabaseTypeEngine.getDatabaseType(urlBasedDataSourceSegment.getUrl());
+        Map<String, DataSourceProperties> sourcePropertiesMap = 
ResourceSegmentsConverter.convert(databaseType, dataSources);
+        DataSourcePropertiesValidator validator = new 
DataSourcePropertiesValidator();
+        try {
+            validator.validate(sourcePropertiesMap, databaseType);
+        } catch (final InvalidResourcesException ex) {
+            throw new AddMigrationSourceResourceException(ex);
+        }
+        JOB_API.addMigrationSourceResources(sourcePropertiesMap);
     }
     
     @Override
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverter.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverter.java
similarity index 94%
rename from 
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverter.java
rename to 
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverter.java
index 7b1f5098ecc..2d0c86b3f2d 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverter.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverter.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.resource;
+package org.apache.shardingsphere.sharding.distsql.handler.converter;
 
-import com.zaxxer.hikari.HikariDataSource;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
@@ -46,7 +45,7 @@ public final class ResourceSegmentsConverter {
     public static Map<String, DataSourceProperties> convert(final DatabaseType 
databaseType, final Collection<DataSourceSegment> resources) {
         Map<String, DataSourceProperties> result = new 
LinkedHashMap<>(resources.size(), 1);
         for (DataSourceSegment each : resources) {
-            result.put(each.getName(), new 
DataSourceProperties(HikariDataSource.class.getName(), 
createProperties(databaseType, each)));
+            result.put(each.getName(), new 
DataSourceProperties("com.zaxxer.hikari.HikariDataSource", 
createProperties(databaseType, each)));
         }
         return result;
     }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverterTest.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverterTest.java
similarity index 97%
rename from 
shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverterTest.java
rename to 
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverterTest.java
index b2caee1c152..5601b912e2c 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/ResourceSegmentsConverterTest.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/sharding/distsql/handler/converter/ResourceSegmentsConverterTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.resource;
+package org.apache.shardingsphere.sharding.distsql.handler.converter;
 
 import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
 import 
org.apache.shardingsphere.distsql.parser.segment.HostnameAndPortBasedDataSourceSegment;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index 99851859fe4..7787f4d31b3 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 
@@ -111,4 +112,11 @@ public interface MigrationJobPublicAPI extends 
PipelineJobPublicAPI, RequiredSPI
      * @param jobId job id
      */
     void reset(String jobId);
+    
+    /**
+     * Add migration source resources.
+     *
+     * @param sourcePropertiesMap source properties map
+     */
+    void addMigrationSourceResources(Map<String, DataSourceProperties> 
sourcePropertiesMap);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index fb09b8cc50b..a1181ada319 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api;
 
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 import java.util.List;
@@ -107,4 +108,20 @@ public interface GovernanceRepositoryAPI {
      * @return sharding items
      */
     List<Integer> getShardingItems(String jobId);
+    
+    /**
+     * Get meta data data source.
+     *
+     * @param jobType job type
+     * @return meta data data source
+     */
+    String getMetaDataDataSource(JobType jobType);
+    
+    /**
+     * Persist meta data data source.
+     *
+     * @param jobType job type
+     * @param metaDataDataSource meta data data source
+     */
+    void persistMetaDataDataSource(JobType jobType, String metaDataDataSource);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
new file mode 100644
index 00000000000..81d95798cc1
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+
+import java.util.Map;
+
+/**
+ * Pipeline resource API.
+ */
+public interface PipelineResourceAPI {
+    
+    /**
+     * Get meta data data source.
+     *
+     * @param jobType job type
+     * @return meta data data source
+     */
+    Map<String, DataSourceProperties> getMetaDataDataSource(JobType jobType);
+    
+    /**
+     * Persist meta data data source.
+     *
+     * @param jobType job type
+     * @param dataSource data source
+     */
+    void persistMetaDataDataSource(JobType jobType, Map<String, 
DataSourceProperties> dataSource);
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 62af844a34a..09ed414e51a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -92,4 +93,14 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
         log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
         return 
result.stream().map(Integer::parseInt).collect(Collectors.toList());
     }
+    
+    @Override
+    public String getMetaDataDataSource(final JobType jobType) {
+        return 
repository.get(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
+    }
+    
+    @Override
+    public void persistMetaDataDataSource(final JobType jobType, final String 
metaDataDataSource) {
+        
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType), 
metaDataDataSource);
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
new file mode 100644
index 00000000000..00bc24ecbef
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Pipeline resource API implementation.
+ */
+public final class PipelineResourceAPIImpl implements PipelineResourceAPI {
+    
+    private static final YamlDataSourceConfigurationSwapper 
DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
+    
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<String, DataSourceProperties> getMetaDataDataSource(final 
JobType jobType) {
+        String dataSourcesProperties = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataDataSource(jobType);
+        if (StringUtils.isBlank(dataSourcesProperties)) {
+            return Collections.emptyMap();
+        }
+        Map<String, Map<String, Object>> yamlDataSources = 
YamlEngine.unmarshal(dataSourcesProperties, Map.class);
+        Map<String, DataSourceProperties> result = new 
LinkedHashMap<>(yamlDataSources.size());
+        yamlDataSources.forEach((key, value) -> result.put(key, 
DATA_SOURCE_CONFIG_SWAPPER.swapToDataSourceProperties(value)));
+        return result;
+    }
+    
+    @Override
+    public void persistMetaDataDataSource(final JobType jobType, final 
Map<String, DataSourceProperties> dataSourceConfigs) {
+        Map<String, Map<String, Object>> dataSourceMap = new 
LinkedHashMap<>(dataSourceConfigs.size());
+        for (Entry<String, DataSourceProperties> entry : 
dataSourceConfigs.entrySet()) {
+            dataSourceMap.put(entry.getKey(), 
DATA_SOURCE_CONFIG_SWAPPER.swapToMap(entry.getValue()));
+        }
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataDataSource(jobType,
 YamlEngine.marshal(dataSourceMap));
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/AddMigrationSourceResourceException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/AddMigrationSourceResourceException.java
new file mode 100644
index 00000000000..52240324089
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/AddMigrationSourceResourceException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.exception;
+
+/**
+ * Add migration source resource exception.
+ */
+public final class AddMigrationSourceResourceException extends 
RuntimeException {
+    
+    private static final long serialVersionUID = 1L;
+    
+    public AddMigrationSourceResourceException(final String message) {
+        super(message);
+    }
+    
+    public AddMigrationSourceResourceException(final Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 6e5ff4de841..5a7ca4c97c1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -40,11 +40,14 @@ import 
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
+import 
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineResourceAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
@@ -55,6 +58,7 @@ import 
org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
@@ -63,6 +67,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -81,6 +86,8 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     
     private final PipelineJobItemAPI jobItemAPI = new 
InventoryIncrementalJobItemAPIImpl();
     
+    private final PipelineResourceAPI pipelineResourceAPI = new 
PipelineResourceAPIImpl();
+    
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
         MigrationJobId jobId = (MigrationJobId) pipelineJobId;
@@ -390,6 +397,24 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         verifyJobStopped(jobConfigPOJO);
     }
     
+    @Override
+    public void addMigrationSourceResources(final Map<String, 
DataSourceProperties> dataSourceProperties) {
+        log.info("Add migration source resources {}", 
dataSourceProperties.keySet());
+        Map<String, DataSourceProperties> existDataSources = 
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+        Collection<String> duplicateDataSourceNames = new 
HashSet<>(dataSourceProperties.size(), 1);
+        for (Entry<String, DataSourceProperties> entry : 
dataSourceProperties.entrySet()) {
+            if (existDataSources.containsKey(entry.getKey())) {
+                duplicateDataSourceNames.add(entry.getKey());
+            }
+        }
+        if (!duplicateDataSourceNames.isEmpty()) {
+            throw new 
AddMigrationSourceResourceException(String.format("Duplicate resource names 
%s.", duplicateDataSourceNames));
+        }
+        Map<String, DataSourceProperties> result = new 
LinkedHashMap<>(existDataSources);
+        result.putAll(dataSourceProperties);
+        pipelineResourceAPI.persistMetaDataDataSource(JobType.MIGRATION, 
result);
+    }
+    
     @Override
     public String getType() {
         return JobType.MIGRATION.getTypeName();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index 599721022a6..da7a60324d1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -30,6 +30,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 
 import java.util.Collection;
 import java.util.List;
@@ -143,6 +144,10 @@ public final class MigrationJobAPIFixture implements 
MigrationJobAPI {
     public void reset(final String jobId) {
     }
     
+    @Override
+    public void addMigrationSourceResources(final Map<String, 
DataSourceProperties> sourcePropertiesMap) {
+    }
+    
     @Override
     public MigrationJobConfiguration getJobConfiguration(final String jobId) {
         return null;
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AddResourceBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AddResourceBackendHandler.java
index 69ad19c4d1d..e2642ca415d 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AddResourceBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AddResourceBackendHandler.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.handler.DatabaseRequiredBackendHandler;
+import 
org.apache.shardingsphere.sharding.distsql.handler.converter.ResourceSegmentsConverter;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AlterResourceBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AlterResourceBackendHandler.java
index 24014730d3f..85943f2fbd0 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AlterResourceBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/resource/AlterResourceBackendHandler.java
@@ -38,6 +38,7 @@ import 
org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.handler.DatabaseRequiredBackendHandler;
+import 
org.apache.shardingsphere.sharding.distsql.handler.converter.ResourceSegmentsConverter;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 6466aaba35a..16dad9490b5 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -26,10 +26,12 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfig
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -38,6 +40,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -47,6 +50,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -264,4 +268,18 @@ public final class MigrationJobAPIImplTest {
         assertNotNull(actual);
         assertThat(actual.getStatus(), is(JobStatus.FINISHED));
     }
+    
+    @Test
+    public void assertAddMigrationSourceResources() {
+        Map<String, Object> props = new HashMap<>();
+        props.put("jdbcUrl", "jdbc:mysql://localhost:3306/test");
+        props.put("username", "root");
+        props.put("password", "root");
+        Map<String, DataSourceProperties> expect = new LinkedHashMap<>(1, 1);
+        expect.put("ds_0", new 
DataSourceProperties("com.zaxxer.hikari.HikariDataSource", props));
+        jobAPI.addMigrationSourceResources(expect);
+        PipelineResourceAPI pipelineResourceAPI = new 
PipelineResourceAPIImpl();
+        Map<String, DataSourceProperties> actual = 
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+        assertTrue(actual.containsKey("ds_0"));
+    }
 }

Reply via email to