This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f898c14735 DataConsistencyChecker get metadata support multi schema (#17309)
3f898c14735 is described below

commit 3f898c147351ef28378eea47dbb562f3738d18fc
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed May 4 14:34:42 2022 +0800

    DataConsistencyChecker get metadata support multi schema (#17309)
---
 .../check/consistency/DataConsistencyChecker.java  | 23 +++++++++++-----------
 .../rulealtered/RuleAlteredJobPreparer.java        |  1 +
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 2aa36f9936f..c44ef6bc6a9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -40,6 +40,7 @@ import 
org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
@@ -163,17 +164,13 @@ public final class DataConsistencyChecker {
         try (
                 PipelineDataSourceWrapper sourceDataSource = 
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
                 PipelineDataSourceWrapper targetDataSource = 
PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
-            Map<String, TableMetaData> tableMetaDataMap = 
getTableMetaDataMap(jobConfig.getDatabaseName());
-            logicTableNames.forEach(each -> {
-                // TODO put to preparer
-                if (!tableMetaDataMap.containsKey(each)) {
-                    throw new 
PipelineDataConsistencyCheckFailedException(String.format("Could not get 
metadata for table '%s', tableMetaDataMap.keySet=%s", each, 
tableMetaDataMap.keySet()));
-                }
-            });
             String sourceDatabaseType = 
sourceDataSourceConfig.getDatabaseType().getName();
             String targetDatabaseType = 
targetDataSourceConfig.getDatabaseType().getName();
             for (String each : logicTableNames) {
-                TableMetaData tableMetaData = tableMetaDataMap.get(each);
+                TableMetaData tableMetaData = 
getTableMetaData(jobConfig.getDatabaseName(), each);
+                if (null == tableMetaData) {
+                    throw new PipelineDataConsistencyCheckFailedException("Can 
not get metadata for table " + each);
+                }
                 Collection<String> columnNames = 
tableMetaData.getColumns().keySet();
                 String uniqueKey = tableMetaData.getPrimaryKeyColumns().get(0);
                 DataConsistencyCalculateParameter sourceParameter = 
buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, 
sourceDatabaseType, targetDatabaseType, uniqueKey);
@@ -226,15 +223,19 @@ public final class DataConsistencyChecker {
         }
     }
     
-    private Map<String, TableMetaData> getTableMetaDataMap(final String 
databaseName) {
+    private TableMetaData getTableMetaData(final String databaseName, final 
String logicTableName) {
         ContextManager contextManager = PipelineContext.getContextManager();
         Preconditions.checkNotNull(contextManager, "ContextManager null");
         ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData(databaseName);
         if (null == metaData) {
             throw new RuntimeException("Can not get meta data by database name 
" + databaseName);
         }
-        String schema = 
metaData.getResource().getDatabaseType().getDefaultSchema(databaseName);
-        return metaData.getSchemaByName(schema).getTables();
+        String schemaName = 
tableNameSchemaNameMapping.getSchemaName(logicTableName);
+        ShardingSphereSchema schema = metaData.getSchemaByName(schemaName);
+        if (null == schema) {
+            throw new RuntimeException("Can not get schema by schema name " + 
schemaName + ", logicTableName=" + logicTableName);
+        }
+        return schema.get(logicTableName);
     }
     
     private DataConsistencyCalculateParameter buildParameter(final 
PipelineDataSourceWrapper sourceDataSource, final TableNameSchemaNameMapping 
tableNameSchemaNameMapping, final String tableName,
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 5b7da97826e..8a0684a417f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -78,6 +78,7 @@ public final class RuleAlteredJobPreparer {
         // But InventoryTaskSplitter need to check target tables. It need to 
do some refactoring for appendJDBCQueryProperties vocations.
         checkSourceDataSource(jobContext);
         prepareAndCheckTargetWithLock(jobContext);
+        // TODO check metadata
         try {
             initIncrementalTasks(jobContext);
             initInventoryTasks(jobContext);

Reply via email to