This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cca14db65cf Add unsupported mode type check for migration (#30339)
cca14db65cf is described below
commit cca14db65cfc7d3c17052bd8103796289815e3dd
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Feb 28 20:03:05 2024 +0800
Add unsupported mode type check for migration (#30339)
* Add unsupported mode type check
* Replaced by cluster
* Use instanceContext.isCluster()
---
.../migration/distsql/handler/update/MigrateTableExecutor.java | 5 +++++
.../handler/update/RegisterMigrationSourceStorageUnitExecutor.java | 7 ++++++-
.../data/pipeline/scenario/migration/api/MigrationJobAPI.java | 4 ++--
.../pipeline/scenario/migration/api/impl/MigrationJobAPITest.java | 2 +-
4 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/MigrateTableExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/MigrateTableExecutor.java
index a83160b4dbd..78bc1f23677 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/MigrateTableExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/MigrateTableExecutor.java
@@ -20,12 +20,14 @@ package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
 import org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -41,6 +43,9 @@ public final class MigrateTableExecutor implements DistSQLUpdateExecutor<Migrate
     @Override
     public void executeUpdate(final MigrateTableStatement sqlStatement, final ContextManager contextManager) {
+        InstanceContext instanceContext = contextManager.getInstanceContext();
+        ShardingSpherePreconditions.checkState(instanceContext.isCluster(),
+                () -> new PipelineInvalidParameterException(String.format("Only `Cluster` is supported now, but current mode type is `%s`", instanceContext.getModeConfiguration().getType())));
         checkTargetDatabase(sqlStatement);
         String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? database.getName() : sqlStatement.getTargetDatabaseName();
         MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitExecutor.java
index 8b0431cf032..904b8b658cc 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RegisterMigrationSourceStorageUnitStatement;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
@@ -32,6 +33,7 @@ import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -51,13 +53,16 @@ public final class RegisterMigrationSourceStorageUnitExecutor implements DistSQL
     @Override
     public void executeUpdate(final RegisterMigrationSourceStorageUnitStatement sqlStatement, final ContextManager contextManager) {
+        InstanceContext instanceContext = contextManager.getInstanceContext();
+        ShardingSpherePreconditions.checkState(instanceContext.isCluster(),
+                () -> new PipelineInvalidParameterException(String.format("Only `Cluster` is supported now, but current mode type is `%s`", instanceContext.getModeConfiguration().getType())));
         checkDataSource(sqlStatement);
         List<DataSourceSegment> dataSources = new ArrayList<>(sqlStatement.getDataSources());
         URLBasedDataSourceSegment urlBasedDataSourceSegment = (URLBasedDataSourceSegment) dataSources.get(0);
         DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
         Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(databaseType, dataSources);
         validateHandler.validate(propsMap);
-        jobAPI.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
+        jobAPI.registerMigrationSourceStorageUnits(new PipelineContextKey(InstanceType.PROXY), propsMap);
     }
     private void checkDataSource(final RegisterMigrationSourceStorageUnitStatement sqlStatement) {
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 9f104b7938a..37758f39f8a 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -198,12 +198,12 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
     }
     /**
-     * Add migration source resources.
+     * Register migration source storage units.
      *
      * @param contextKey context key
      * @param propsMap data source pool properties map
      */
-    public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
+    public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
         Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
         Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
         for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 8bcbc33ee9e..362556cac2b 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -126,7 +126,7 @@ class MigrationJobAPITest {
         props.put("jdbcUrl", jdbcUrl);
         props.put("username", "root");
         props.put("password", "root");
-        jobAPI.addMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+        jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
     }
     @AfterAll