This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 170f3f30cac Add pre-check for migration job to fail-fast (#20500)
170f3f30cac is described below
commit 170f3f30cace779604450d12a84b463537bd27e3
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Aug 25 11:51:56 2022 +0800
Add pre-check for migration job to fail-fast (#20500)
* Fix null actualTableName in AbstractDataSourcePreparer when migrating to
a different table name
* Pre-check whether inventory and incremental tasks are empty
* Pre-check that the target database name is not null
* Pre-check that the source and target table names are the same
* Recover
* Fix check style
---
.../migration/distsql/handler/update/MigrateTableUpdater.java | 3 +++
.../pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java | 4 +++-
.../core/prepare/datasource/PrepareTargetTablesParameter.java | 5 +----
.../data/pipeline/scenario/migration/MigrationJob.java | 1 +
.../data/pipeline/scenario/migration/MigrationJobPreparer.java | 4 +---
5 files changed, 9 insertions(+), 8 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index 06c7d721fc5..c8658614503 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
+import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
@@ -37,6 +38,8 @@ public final class MigrateTableUpdater implements
RALUpdater<MigrateTableStateme
public void executeUpdate(final String databaseName, final
MigrateTableStatement sqlStatement) {
log.info("start migrate job by {}", sqlStatement);
String targetDatabaseName =
ObjectUtils.defaultIfNull(sqlStatement.getTargetDatabaseName(), databaseName);
+ Preconditions.checkNotNull(targetDatabaseName, "Target database name
is null. You could define it in DistSQL or select a database.");
+
Preconditions.checkArgument(sqlStatement.getSourceTableName().equalsIgnoreCase(sqlStatement.getTargetTableName()),
"Source table name and target table name must be the same for now.");
CreateMigrationJobParameter createMigrationJobParameter = new
CreateMigrationJobParameter(sqlStatement.getSourceResourceName(),
sqlStatement.getSourceSchemaName(),
sqlStatement.getSourceTableName(), targetDatabaseName,
sqlStatement.getTargetTableName());
JOB_API.createJobAndStart(createMigrationJobParameter);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index e503a4147cd..c0172f9c51f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
+import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
@@ -128,7 +129,8 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
DataSource dataSource =
parameter.getSourceDataSourceMap().get(dataSourceName);
DatabaseType databaseType =
DatabaseTypeEngine.getDatabaseType(Collections.singletonList(dataSource));
String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- String actualTableName =
parameter.getTableNameMap().get(each.getLogicTableName());
+ String actualTableName = each.getDataNodes().get(0).getTableName();
+ Preconditions.checkNotNull(actualTableName, "Could not get
actualTableName, nodeEntry={}", each);
result.add(generator.generateLogicDDL(databaseType, dataSource,
schemaName, each.getLogicTableName(), actualTableName,
parameter.getSqlParserEngine()));
}
return result;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index 76e70af4979..c9f1e8fc764 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -44,22 +44,19 @@ public final class PrepareTargetTablesParameter {
private final PipelineDataSourceManager dataSourceManager;
- private final Map<String, String> tableNameMap;
-
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
private final ShardingSphereSQLParserEngine sqlParserEngine;
public PrepareTargetTablesParameter(@NonNull final String databaseName,
@NonNull final PipelineDataSourceConfiguration targetDataSourceConfig,
@NonNull final Map<String, DataSource>
sourceDataSourceMap, @NonNull final PipelineDataSourceManager dataSourceManager,
- @NonNull final JobDataNodeLine
tablesFirstDataNodes, final Map<String, String> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping,
+ @NonNull final JobDataNodeLine
tablesFirstDataNodes, final TableNameSchemaNameMapping
tableNameSchemaNameMapping,
@NonNull final
ShardingSphereSQLParserEngine sqlParserEngine) {
this.databaseName = databaseName;
this.targetDataSourceConfig = targetDataSourceConfig;
this.sourceDataSourceMap = sourceDataSourceMap;
this.tablesFirstDataNodes = tablesFirstDataNodes;
this.dataSourceManager = dataSourceManager;
- this.tableNameMap = tableNameMap;
this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
this.sqlParserEngine = sqlParserEngine;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 8b32537e2a9..466b6428e96 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -75,6 +75,7 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
return;
}
log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(),
shardingItem);
+ // TODO inventory and incremental tasks are always empty on
construction
InventoryIncrementalTasksRunner tasksRunner = new
InventoryIncrementalTasksRunner(jobItemContext,
jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks(),
jobItemContext.getJobProcessContext().getInventoryDumperExecuteEngine(),
jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine());
runInBackground(() -> {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index bf5f80f6cab..b0b6e428de7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -151,14 +151,12 @@ public final class MigrationJobPreparer {
ShardingSphereDatabase sphereDatabase =
metaData.getDatabases().get(jobConfig.getTargetDatabaseName());
ShardingSphereSQLParserEngine sqlParserEngine =
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(sphereDatabase.getProtocolType().getType());
JobDataNodeLine jobDataNodeLine =
JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
- Map<String, String> tableNameMap = new HashMap<>();
- tableNameMap.put(jobConfig.getTargetTableName(),
jobConfig.getSourceTableName());
PipelineDataSourceWrapper dataSource =
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
Map<String, DataSource> sourceDataSourceMap = new HashMap<>(1, 1.0F);
sourceDataSourceMap.put(jobConfig.getSourceResourceName(), dataSource);
PrepareTargetTablesParameter prepareTargetTablesParameter = new
PrepareTargetTablesParameter(jobConfig.getTargetDatabaseName(),
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
sourceDataSourceMap, jobItemContext.getDataSourceManager(),
- jobDataNodeLine, tableNameMap, tableNameSchemaNameMapping,
sqlParserEngine);
+ jobDataNodeLine, tableNameSchemaNameMapping, sqlParserEngine);
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType,
prepareTargetTablesParameter);
}