This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b15d531953 Improve DDL genereate method at migration prepare (#20245)
3b15d531953 is described below

commit 3b15d53195316122e703b98c0476a4863326732f
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Aug 18 12:48:04 2022 +0800

    Improve DDL genereate method at migration prepare (#20245)
    
    * Revise #20207
    
    * Improve parameter name
    
    * Fix codestyle
---
 .../metadata/generator/PipelineDDLGenerator.java   | 61 +++++++++++-----------
 .../datasource/AbstractDataSourcePreparer.java     | 33 +++++++-----
 .../datasource/PrepareTargetTablesParameter.java   | 26 ++++++---
 .../scenario/migration/MigrationJobPreparer.java   | 32 +++++++++++-
 .../datasource/MySQLDataSourcePreparer.java        | 31 +----------
 .../datasource/OpenGaussDataSourcePreparer.java    | 26 +--------
 .../datasource/PostgreSQLDataSourcePreparer.java   | 27 +---------
 7 files changed, 106 insertions(+), 130 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index ca07db51c9a..812dc1bbced 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -17,10 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.generator;
 
-import javax.sql.DataSource;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator;
 import 
org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGeneratorFactory;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
 import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
@@ -33,11 +33,11 @@ import 
org.apache.shardingsphere.infra.binder.type.ConstraintAvailable;
 import org.apache.shardingsphere.infra.binder.type.IndexAvailable;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtil;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
+import 
org.apache.shardingsphere.infra.util.spi.exception.ServiceProviderNotFoundException;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.SQLSegment;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.constraint.ConstraintSegment;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.index.IndexSegment;
@@ -45,6 +45,8 @@ import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.Sim
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
+import javax.sql.DataSource;
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
@@ -69,8 +71,8 @@ public final class PipelineDDLGenerator {
     /**
      * Generate logic ddl sql.
      *
-     * @param database database
-     * @param dataSourceName data source name
+     * @param sourceDataSource source data source
+     * @param databaseType database type
      * @param schemaName schema name
      * @param logicTableName table name
      * @param actualTableName actual table name
@@ -78,16 +80,13 @@ public final class PipelineDDLGenerator {
      * @return ddl SQL
      */
     @SneakyThrows
-    public String generateLogicDDLSQL(final ShardingSphereDatabase database, 
final String dataSourceName, final String schemaName,
-                                      final String logicTableName, final 
String actualTableName,
+    public String generateLogicDDLSQL(final DataSource sourceDataSource, final 
DatabaseType databaseType, final String schemaName, final String 
logicTableName, final String actualTableName,
                                       final ShardingSphereSQLParserEngine 
parserEngine) {
-        DatabaseType databaseType = database.getProtocolType();
-        log.info("generateLogicDDLSQL, databaseType={}, databaseName={}, 
schemaName={}, tableName={}, dataSourceNames={}",
-                databaseType.getType(), database.getName(), schemaName, 
logicTableName, database.getResource().getDataSources().keySet());
-        Collection<String> multiSQL = generateActualDDLSQL(databaseType, 
schemaName, actualTableName, 
database.getResource().getDataSources().get(dataSourceName));
+        log.info("generateLogicDDLSQL, databaseType={},  schemaName={}, 
tableName={}", databaseType.getType(), schemaName, logicTableName);
+        Collection<String> multiSQL = generateActualDDLSQL(databaseType, 
schemaName, actualTableName, sourceDataSource);
         StringBuilder result = new StringBuilder();
         for (String each : multiSQL) {
-            Optional<String> logicSQL = decorate(databaseType, 
database.getName(), schemaName, database, each, parserEngine);
+            Optional<String> logicSQL = decorate(databaseType, schemaName, 
sourceDataSource, each, logicTableName, parserEngine);
             logicSQL.ifPresent(ddlSQL -> 
result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
         }
         return result.toString();
@@ -118,12 +117,16 @@ public final class PipelineDDLGenerator {
         return sql;
     }
     
-    private Optional<String> decorate(final DatabaseType databaseType, final 
String databaseName, final String schemaName, final ShardingSphereDatabase 
database, final String sql,
-                                      final ShardingSphereSQLParserEngine 
parserEngine) {
+    private Optional<String> decorate(final DatabaseType databaseType, final 
String schemaName, final DataSource dataSource, final String sql, final String 
logicTableName,
+                                      final ShardingSphereSQLParserEngine 
parserEngine) throws SQLException {
         if (sql.trim().isEmpty()) {
             return Optional.empty();
         }
-        String result = decorateActualSQL(sql.trim(), database, databaseName, 
parserEngine);
+        String databaseName;
+        try (Connection connection = dataSource.getConnection()) {
+            databaseName = connection.getCatalog();
+        }
+        String result = decorateActualSQL(sql.trim(), logicTableName, 
databaseName, parserEngine);
         // TODO remove it after set search_path is supported.
         if ("openGauss".equals(databaseType.getType())) {
             return decorateOpenGauss(databaseName, schemaName, result, 
parserEngine);
@@ -132,39 +135,38 @@ public final class PipelineDDLGenerator {
     }
     
     private Collection<String> generateActualDDLSQL(final DatabaseType 
databaseType, final String schemaName, final String actualTable, final 
DataSource dataSource) throws SQLException {
-        return 
CreateTableSQLGeneratorFactory.findInstance(databaseType).orElseThrow(() -> new 
ShardingSphereException("Failed to get dialect ddl sql generator"))
+        return 
CreateTableSQLGeneratorFactory.findInstance(databaseType).orElseThrow(() -> new 
ServiceProviderNotFoundException(CreateTableSQLGenerator.class, 
databaseType.getType()))
                 .generate(actualTable, schemaName, dataSource);
     }
     
-    private String decorateActualSQL(final String sql, final 
ShardingSphereDatabase database, final String databaseName, final 
ShardingSphereSQLParserEngine parserEngine) {
+    private String decorateActualSQL(final String sql, final String 
logicTableName, final String databaseName, final ShardingSphereSQLParserEngine 
parserEngine) {
         LogicSQL logicSQL = getLogicSQL(sql, databaseName, parserEngine);
         SQLStatementContext<?> sqlStatementContext = 
logicSQL.getSqlStatementContext();
         Map<SQLSegment, String> replaceMap = new 
TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
         if (sqlStatementContext instanceof CreateTableStatementContext) {
-            appendFromIndexAndConstraint(replaceMap, database, 
sqlStatementContext);
-            appendFromTable(replaceMap, database, (TableAvailable) 
sqlStatementContext);
+            appendFromIndexAndConstraint(replaceMap, logicTableName, 
sqlStatementContext);
+            appendFromTable(replaceMap, logicTableName, (TableAvailable) 
sqlStatementContext);
         }
         if (sqlStatementContext instanceof CommentStatementContext) {
-            appendFromTable(replaceMap, database, (TableAvailable) 
sqlStatementContext);
+            appendFromTable(replaceMap, logicTableName, (TableAvailable) 
sqlStatementContext);
         }
         if (sqlStatementContext instanceof CreateIndexStatementContext) {
-            appendFromTable(replaceMap, database, (TableAvailable) 
sqlStatementContext);
-            appendFromIndexAndConstraint(replaceMap, database, 
sqlStatementContext);
+            appendFromTable(replaceMap, logicTableName, (TableAvailable) 
sqlStatementContext);
+            appendFromIndexAndConstraint(replaceMap, logicTableName, 
sqlStatementContext);
         }
         if (sqlStatementContext instanceof AlterTableStatementContext) {
-            appendFromIndexAndConstraint(replaceMap, database, 
sqlStatementContext);
-            appendFromTable(replaceMap, database, (TableAvailable) 
sqlStatementContext);
+            appendFromIndexAndConstraint(replaceMap, logicTableName, 
sqlStatementContext);
+            appendFromTable(replaceMap, logicTableName, (TableAvailable) 
sqlStatementContext);
         }
         return doDecorateActualTable(replaceMap, sql);
     }
     
-    private void appendFromIndexAndConstraint(final Map<SQLSegment, String> 
replaceMap, final ShardingSphereDatabase database, final SQLStatementContext<?> 
sqlStatementContext) {
+    private void appendFromIndexAndConstraint(final Map<SQLSegment, String> 
replaceMap, final String logicTableName, final SQLStatementContext<?> 
sqlStatementContext) {
         if (!(sqlStatementContext instanceof TableAvailable) || 
((TableAvailable) 
sqlStatementContext).getTablesContext().getTables().isEmpty()) {
             return;
         }
         TableNameSegment tableNameSegment = ((TableAvailable) 
sqlStatementContext).getTablesContext().getTables().iterator().next().getTableName();
-        String logicTable = findLogicTable(tableNameSegment, database);
-        if (!tableNameSegment.getIdentifier().getValue().equals(logicTable)) {
+        if 
(!tableNameSegment.getIdentifier().getValue().equals(logicTableName)) {
             if (sqlStatementContext instanceof IndexAvailable) {
                 for (IndexSegment each : ((IndexAvailable) 
sqlStatementContext).getIndexes()) {
                     String logicIndexName = 
IndexMetaDataUtil.getLogicIndexName(each.getIndexName().getIdentifier().getValue(),
 tableNameSegment.getIdentifier().getValue());
@@ -180,11 +182,10 @@ public final class PipelineDDLGenerator {
         }
     }
     
-    private void appendFromTable(final Map<SQLSegment, String> replaceMap, 
final ShardingSphereDatabase database, final TableAvailable 
sqlStatementContext) {
+    private void appendFromTable(final Map<SQLSegment, String> replaceMap, 
final String logicTableName, final TableAvailable sqlStatementContext) {
         for (SimpleTableSegment each : sqlStatementContext.getAllTables()) {
-            String logicTable = findLogicTable(each.getTableName(), database);
-            if 
(!logicTable.equals(each.getTableName().getIdentifier().getValue())) {
-                replaceMap.put(each.getTableName(), logicTable);
+            if 
(!logicTableName.equals(each.getTableName().getIdentifier().getValue())) {
+                replaceMap.put(each.getTableName(), logicTableName);
             }
         }
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 15cf5495073..191f9a8bc3f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,27 +17,30 @@
 
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
-import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.datanode.DataNodes;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 
 /**
  * Abstract data source preparer.
@@ -57,7 +60,7 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
         String defaultSchema = 
DatabaseTypeEngine.getDefaultSchemaName(parameter.getTargetDatabaseType(), 
parameter.getDatabaseName());
         log.info("prepareTargetSchemas, schemaNames={}, defaultSchema={}", 
schemaNames, defaultSchema);
         PipelineSQLBuilder pipelineSQLBuilder = 
PipelineSQLBuilderFactory.getInstance(parameter.getTargetDatabaseType().getType());
-        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getDataSourceConfig(), 
parameter.getDataSourceManager()).getConnection()) {
+        try (Connection targetConnection = 
getCachedDataSource(parameter.getDataSourceConfig(), 
parameter.getDataSourceManager()).getConnection()) {
             for (String each : schemaNames) {
                 if (each.equalsIgnoreCase(defaultSchema)) {
                     continue;
@@ -91,7 +94,7 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
         return 
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
 jobConfig.getSource().getParameter()));
     }
     
-    protected final PipelineDataSourceWrapper getTargetCachedDataSource(final 
PipelineDataSourceConfiguration dataSourceConfig, final 
PipelineDataSourceManager dataSourceManager) {
+    protected final PipelineDataSourceWrapper getCachedDataSource(final 
PipelineDataSourceConfiguration dataSourceConfig, final 
PipelineDataSourceManager dataSourceManager) {
         return dataSourceManager.getDataSource(dataSourceConfig);
     }
     
@@ -116,11 +119,17 @@ public abstract class AbstractDataSourcePreparer 
implements DataSourcePreparer {
         return 
PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT 
EXISTS ");
     }
     
-    protected String getActualTable(final ShardingSphereDatabase database, 
final String tableName) {
-        DataNodes dataNodes = new 
DataNodes(database.getRuleMetaData().getRules());
-        Optional<DataNode> filteredDataNode = 
dataNodes.getDataNodes(tableName).stream()
-                .filter(each -> 
database.getResource().getDataSources().containsKey(each.getDataSourceName().contains(".")
 ? each.getDataSourceName().split("\\.")[0] : each.getDataSourceName()))
-                .findFirst();
-        return filteredDataNode.map(DataNode::getTableName).orElse(tableName);
+    protected List<String> listCreateLogicalTableSQL(final 
PrepareTargetTablesParameter parameter) {
+        PipelineDDLGenerator generator = new PipelineDDLGenerator();
+        List<String> result = new LinkedList<>();
+        for (JobDataNodeEntry each : 
parameter.getTablesFirstDataNodes().getEntries()) {
+            String schemaName = 
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
+            String dataSourceName = 
each.getDataNodes().get(0).getDataSourceName();
+            DataSource dataSource = 
parameter.getSourceDataSourceMap().get(dataSourceName);
+            DatabaseType databaseType = 
DatabaseTypeEngine.getDatabaseType(Collections.singletonList(dataSource));
+            String actualTableName = 
parameter.getTableNameMap().get(each.getLogicTableName());
+            result.add(generator.generateLogicDDLSQL(dataSource, databaseType, 
schemaName, each.getLogicTableName(), actualTableName, 
parameter.getSqlParserEngine()));
+        }
+        return result;
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index 9aaf034ba3a..ffda71fcb52 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -23,6 +23,10 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMap
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+
+import javax.sql.DataSource;
+import java.util.Map;
 
 /**
  * Prepare target tables parameter.
@@ -34,19 +38,29 @@ public final class PrepareTargetTablesParameter {
     
     private final JobDataNodeLine tablesFirstDataNodes;
     
-    private final PipelineDataSourceConfiguration dataSourceConfig;
+    private final PipelineDataSourceConfiguration targetDataSourceConfig;
+    
+    private final Map<String, DataSource> sourceDataSourceMap;
     
     private final PipelineDataSourceManager dataSourceManager;
     
+    private final Map<String, String> tableNameMap;
+    
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
-    public PrepareTargetTablesParameter(@NonNull final String databaseName, 
@NonNull final PipelineDataSourceConfiguration dataSourceConfig,
-                                        @NonNull final 
PipelineDataSourceManager dataSourceManager,
-                                        @NonNull final String 
tablesFirstDataNodes, final TableNameSchemaNameMapping 
tableNameSchemaNameMapping) {
+    private final ShardingSphereSQLParserEngine sqlParserEngine;
+            
+    public PrepareTargetTablesParameter(@NonNull final String databaseName, 
@NonNull final PipelineDataSourceConfiguration targetDataSourceConfig,
+                                        @NonNull final Map<String, DataSource> 
sourceDataSourceMap, @NonNull final PipelineDataSourceManager dataSourceManager,
+                                        @NonNull final JobDataNodeLine 
tablesFirstDataNodes, final Map<String, String> tableNameMap, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping,
+                                        @NonNull final 
ShardingSphereSQLParserEngine sqlParserEngine) {
         this.databaseName = databaseName;
-        this.dataSourceConfig = dataSourceConfig;
-        this.tablesFirstDataNodes = 
JobDataNodeLine.unmarshal(tablesFirstDataNodes);
+        this.targetDataSourceConfig = targetDataSourceConfig;
+        this.sourceDataSourceMap = sourceDataSourceMap;
+        this.tablesFirstDataNodes = tablesFirstDataNodes;
         this.dataSourceManager = dataSourceManager;
+        this.tableNameMap = tableNameMap;
         this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
+        this.sqlParserEngine = sqlParserEngine;
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index e1fbed498b5..384f9fdd1a0 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -21,6 +21,8 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -40,12 +42,21 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.datanode.DataNodes;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * Migration job preparer.
@@ -133,9 +144,18 @@ public final class MigrationJobPreparer {
                     
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), 
jobItemContext.getDataSourceManager(), tableNameSchemaNameMapping);
             PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, 
prepareTargetSchemasParameter);
         }
+        ShardingSphereMetaData metaData = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
+        ShardingSphereDatabase sphereDatabase = 
metaData.getDatabases().get(jobConfig.getDatabaseName());
+        ShardingSphereSQLParserEngine sqlParserEngine = 
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(sphereDatabase.getProtocolType().getType());
+        JobDataNodeLine jobDataNodeLine = 
JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
+        Map<String, String> tableNameMap = new HashMap<>();
+        for (JobDataNodeEntry each : jobDataNodeLine.getEntries()) {
+            String actualTableName = getActualTable(sphereDatabase, 
each.getLogicTableName());
+            tableNameMap.put(each.getLogicTableName(), actualTableName);
+        }
         PrepareTargetTablesParameter prepareTargetTablesParameter = new 
PrepareTargetTablesParameter(jobConfig.getDatabaseName(),
-                
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
-                jobItemContext.getDataSourceManager(), 
jobConfig.getTablesFirstDataNodes(), tableNameSchemaNameMapping);
+                
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), 
sphereDatabase.getResource().getDataSources(), 
jobItemContext.getDataSourceManager(),
+                jobDataNodeLine, tableNameMap, tableNameSchemaNameMapping, 
sqlParserEngine);
         PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, 
prepareTargetTablesParameter);
     }
     
@@ -149,6 +169,14 @@ public final class MigrationJobPreparer {
         return true;
     }
     
+    private String getActualTable(final ShardingSphereDatabase database, final 
String tableName) {
+        DataNodes dataNodes = new 
DataNodes(database.getRuleMetaData().getRules());
+        Optional<DataNode> filteredDataNode = 
dataNodes.getDataNodes(tableName).stream()
+                .filter(each -> 
database.getResource().getDataSources().containsKey(each.getDataSourceName().contains(".")
 ? each.getDataSourceName().split("\\.")[0] : each.getDataSourceName()))
+                .findFirst();
+        return filteredDataNode.map(DataNode::getTableName).orElse(tableName);
+    }
+    
     private void initInventoryTasks(final MigrationJobItemContext 
jobItemContext) {
         InventoryTaskSplitter inventoryTaskSplitter = new 
InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager(),
                 
jobItemContext.getJobProcessContext().getImporterExecuteEngine(), 
jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(), 
jobItemContext.getInitProgress());
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 1f4c38b90a7..ceccccf1742 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,22 +18,13 @@
 package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 /**
  * Data source preparer for MySQL.
@@ -44,33 +35,15 @@ public final class MySQLDataSourcePreparer extends 
AbstractDataSourcePreparer {
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter 
parameter) {
         PipelineDataSourceManager dataSourceManager = 
parameter.getDataSourceManager();
-        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getDataSourceConfig(), 
dataSourceManager).getConnection()) {
-            for (String each : getCreateTableSQL(parameter)) {
+        try (Connection targetConnection = 
getCachedDataSource(parameter.getTargetDataSourceConfig(), 
dataSourceManager).getConnection()) {
+            for (String each : listCreateLogicalTableSQL(parameter)) {
                 executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(each));
-                log.info("create target table '{}' success", each);
             }
         } catch (final SQLException ex) {
             throw new PipelineJobPrepareFailedException("prepare target tables 
failed.", ex);
         }
     }
     
-    private List<String> getCreateTableSQL(final PrepareTargetTablesParameter 
parameter) {
-        PipelineDDLGenerator generator = new PipelineDDLGenerator();
-        ShardingSphereMetaData metaData = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
-        ShardingSphereDatabase sphereDatabase = 
metaData.getDatabases().get(parameter.getDatabaseName());
-        ShardingSphereSQLParserEngine sqlParserEngine =
-                
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
-                        
.getSQLParserEngine(sphereDatabase.getProtocolType().getType());
-        List<String> result = new LinkedList<>();
-        for (JobDataNodeEntry each : 
parameter.getTablesFirstDataNodes().getEntries()) {
-            String schemaName = 
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            String dataSourceName = 
each.getDataNodes().get(0).getDataSourceName();
-            result.add(generator.generateLogicDDLSQL(sphereDatabase, 
dataSourceName, schemaName, each.getLogicTableName(),
-                    getActualTable(sphereDatabase, each.getLogicTableName()), 
sqlParserEngine));
-        }
-        return result;
-    }
-    
     @Override
     public String getType() {
         return "MySQL";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index de5dcbd0a53..f42546f4471 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -20,22 +20,14 @@ package 
org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
 import com.google.common.base.Splitter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 /**
  * Data source preparer for openGauss.
@@ -46,7 +38,7 @@ public final class OpenGaussDataSourcePreparer extends 
AbstractDataSourcePrepare
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter 
parameter) {
         List<String> createLogicalTableSQLs = 
listCreateLogicalTableSQL(parameter);
-        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getDataSourceConfig(), 
parameter.getDataSourceManager()).getConnection()) {
+        try (Connection targetConnection = 
getCachedDataSource(parameter.getTargetDataSourceConfig(), 
parameter.getDataSourceManager()).getConnection()) {
             for (String createLogicalTableSQL : createLogicalTableSQLs) {
                 for (String each : 
Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
 {
                     executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(each));
@@ -57,22 +49,6 @@ public final class OpenGaussDataSourcePreparer extends 
AbstractDataSourcePrepare
         }
     }
     
-    private List<String> listCreateLogicalTableSQL(final 
PrepareTargetTablesParameter parameter) {
-        PipelineDDLGenerator generator = new PipelineDDLGenerator();
-        ShardingSphereMetaData metaData = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
-        ShardingSphereDatabase sphereDatabase = 
metaData.getDatabases().get(parameter.getDatabaseName());
-        ShardingSphereSQLParserEngine sqlParserEngine = 
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
-                
.getSQLParserEngine(sphereDatabase.getProtocolType().getType());
-        List<String> result = new LinkedList<>();
-        for (JobDataNodeEntry each : 
parameter.getTablesFirstDataNodes().getEntries()) {
-            String schemaName = 
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            String dataSourceName = 
each.getDataNodes().get(0).getDataSourceName();
-            result.add(generator.generateLogicDDLSQL(sphereDatabase, 
dataSourceName, schemaName, each.getLogicTableName(),
-                    getActualTable(sphereDatabase, each.getLogicTableName()), 
sqlParserEngine));
-        }
-        return result;
-    }
-    
     @Override
     public String getType() {
         return "openGauss";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSource
 [...]
index 61287d90bbb..589b1860065 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -20,22 +20,14 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
 import com.google.common.base.Splitter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 /**
  * Data source preparer for PostgresSQL.
@@ -46,7 +38,7 @@ public final class PostgreSQLDataSourcePreparer extends 
AbstractDataSourcePrepar
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter 
parameter) {
         List<String> createLogicalTableSQLs = 
listCreateLogicalTableSQL(parameter);
-        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getDataSourceConfig(), 
parameter.getDataSourceManager()).getConnection()) {
+        try (Connection targetConnection = 
getCachedDataSource(parameter.getTargetDataSourceConfig(), 
parameter.getDataSourceManager()).getConnection()) {
             for (String createLogicalTableSQL : createLogicalTableSQLs) {
                 for (String each : 
Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
 {
                     executeTargetTableSQL(targetConnection, each);
@@ -57,23 +49,6 @@ public final class PostgreSQLDataSourcePreparer extends 
AbstractDataSourcePrepar
         }
     }
     
-    private List<String> listCreateLogicalTableSQL(final 
PrepareTargetTablesParameter parameter) {
-        PipelineDDLGenerator generator = new PipelineDDLGenerator();
-        ShardingSphereMetaData metaData = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
-        ShardingSphereDatabase sphereDatabase = 
metaData.getDatabases().get(parameter.getDatabaseName());
-        ShardingSphereSQLParserEngine sqlParserEngine =
-                
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
-                        
.getSQLParserEngine(sphereDatabase.getProtocolType().getType());
-        List<String> result = new LinkedList<>();
-        for (JobDataNodeEntry each : 
parameter.getTablesFirstDataNodes().getEntries()) {
-            String schemaName = 
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            String dataSourceName = 
each.getDataNodes().get(0).getDataSourceName();
-            result.add(generator.generateLogicDDLSQL(sphereDatabase, 
dataSourceName, schemaName, each.getLogicTableName(),
-                    getActualTable(sphereDatabase, each.getLogicTableName()), 
sqlParserEngine));
-        }
-        return result;
-    }
-    
     @Override
     public String getType() {
         return "PostgreSQL";

Reply via email to