This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3f898c14735 DataConsistencyChecker get metadata support multi schema
(#17309)
3f898c14735 is described below
commit 3f898c147351ef28378eea47dbb562f3738d18fc
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed May 4 14:34:42 2022 +0800
DataConsistencyChecker get metadata support multi schema (#17309)
---
.../check/consistency/DataConsistencyChecker.java | 23 +++++++++++-----------
.../rulealtered/RuleAlteredJobPreparer.java | 1 +
2 files changed, 13 insertions(+), 11 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 2aa36f9936f..c44ef6bc6a9 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -40,6 +40,7 @@ import
org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -163,17 +164,13 @@ public final class DataConsistencyChecker {
try (
PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource =
PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
- Map<String, TableMetaData> tableMetaDataMap =
getTableMetaDataMap(jobConfig.getDatabaseName());
- logicTableNames.forEach(each -> {
- // TODO put to preparer
- if (!tableMetaDataMap.containsKey(each)) {
- throw new
PipelineDataConsistencyCheckFailedException(String.format("Could not get
metadata for table '%s', tableMetaDataMap.keySet=%s", each,
tableMetaDataMap.keySet()));
- }
- });
String sourceDatabaseType =
sourceDataSourceConfig.getDatabaseType().getName();
String targetDatabaseType =
targetDataSourceConfig.getDatabaseType().getName();
for (String each : logicTableNames) {
- TableMetaData tableMetaData = tableMetaDataMap.get(each);
+ TableMetaData tableMetaData =
getTableMetaData(jobConfig.getDatabaseName(), each);
+ if (null == tableMetaData) {
+ throw new PipelineDataConsistencyCheckFailedException("Can
not get metadata for table " + each);
+ }
Collection<String> columnNames =
tableMetaData.getColumns().keySet();
String uniqueKey = tableMetaData.getPrimaryKeyColumns().get(0);
DataConsistencyCalculateParameter sourceParameter =
buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames,
sourceDatabaseType, targetDatabaseType, uniqueKey);
@@ -226,15 +223,19 @@ public final class DataConsistencyChecker {
}
}
- private Map<String, TableMetaData> getTableMetaDataMap(final String
databaseName) {
+ private TableMetaData getTableMetaData(final String databaseName, final
String logicTableName) {
ContextManager contextManager = PipelineContext.getContextManager();
Preconditions.checkNotNull(contextManager, "ContextManager null");
ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData(databaseName);
if (null == metaData) {
throw new RuntimeException("Can not get meta data by database name
" + databaseName);
}
- String schema =
metaData.getResource().getDatabaseType().getDefaultSchema(databaseName);
- return metaData.getSchemaByName(schema).getTables();
+ String schemaName =
tableNameSchemaNameMapping.getSchemaName(logicTableName);
+ ShardingSphereSchema schema = metaData.getSchemaByName(schemaName);
+ if (null == schema) {
+ throw new RuntimeException("Can not get schema by schema name " +
schemaName + ", logicTableName=" + logicTableName);
+ }
+ return schema.get(logicTableName);
}
private DataConsistencyCalculateParameter buildParameter(final
PipelineDataSourceWrapper sourceDataSource, final TableNameSchemaNameMapping
tableNameSchemaNameMapping, final String tableName,
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 5b7da97826e..8a0684a417f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -78,6 +78,7 @@ public final class RuleAlteredJobPreparer {
// But InventoryTaskSplitter need to check target tables. It need to
do some refactoring for appendJDBCQueryProperties vocations.
checkSourceDataSource(jobContext);
prepareAndCheckTargetWithLock(jobContext);
+ // TODO check metadata
try {
initIncrementalTasks(jobContext);
initInventoryTasks(jobContext);