This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 41eaef725c5 Refactor DataSourceCheckEngine (#29462)
41eaef725c5 is described below
commit 41eaef725c5705633c5abfc0d62e8356065188ba
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Dec 20 00:26:42 2023 +0800
Refactor DataSourceCheckEngine (#29462)
* Refactor DataSourceCheckEngine
---
.../core/checker/DataSourceCheckEngine.java | 90 +++++++---------------
.../datasource/DataSourceCheckEngineTest.java | 17 ++--
.../migration/preparer/MigrationJobPreparer.java | 5 +-
3 files changed, 42 insertions(+), 70 deletions(-)
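
In brief: DataSourceCheckEngine's per-DataSource entry points (checkSourceDataSource(DataSource) and checkTargetDataSource(DataSource, ImporterConfiguration)) are replaced by collection-based ones, the standalone checkPrivilege and checkVariable loops are folded into the new checkSourceDataSource(Collection), checkTargetTable becomes a private helper behind checkTargetDataSources, and the hand-rolled if/throw is swapped for ShardingSpherePreconditions.checkState. Callers holding a single data source now wrap it explicitly, as the MigrationJobPreparer hunks below show:

    // Before: one data source per call.
    new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
    // After: collection-based API; a single source is wrapped via Collections.singleton.
    new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));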
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index f9e48d9a224..e7759414322 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -31,7 +32,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Collections;
 
 /**
  * Data source check engine.
@@ -47,37 +47,13 @@ public final class DataSourceCheckEngine {
         sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
     }
 
-    /**
-     * Check source data source.
-     *
-     * @param dataSource to be checked source data source
-     */
-    public void checkSourceDataSource(final DataSource dataSource) {
-        Collection<DataSource> dataSources = Collections.singleton(dataSource);
-        checkConnection(dataSources);
-        checkPrivilege(dataSources);
-        checkVariable(dataSources);
-    }
-
-    /**
-     * Check target data source.
-     *
-     * @param dataSource to be checked target data source
-     * @param importerConfig importer configuration
-     */
-    public void checkTargetDataSource(final DataSource dataSource, final ImporterConfiguration importerConfig) {
-        Collection<DataSource> dataSources = Collections.singleton(dataSource);
-        checkConnection(dataSources);
-        checkTargetTable(dataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
-    }
-
     /**
      * Check data source connections.
      *
      * @param dataSources data sources
      * @throws PrepareJobWithInvalidConnectionException prepare job with invalid connection exception
      */
-    public void checkConnection(final Collection<? extends DataSource> dataSources) {
+    public void checkConnection(final Collection<DataSource> dataSources) {
         try {
             for (DataSource each : dataSources) {
                 each.getConnection().close();
@@ -88,22 +64,38 @@ public final class DataSourceCheckEngine {
     }
 
     /**
-     * Check table is empty.
+     * Check source data source.
+     *
+     * @param dataSources to be checked source data source
+     */
+    public void checkSourceDataSource(final Collection<DataSource> dataSources) {
+        checkConnection(dataSources);
+        if (null == checker) {
+            return;
+        }
+        dataSources.forEach(checker::checkPrivilege);
+        dataSources.forEach(checker::checkVariable);
+    }
+
+    /**
+     * Check target data sources.
      *
-     * @param dataSources data sources
-     * @param tableAndSchemaNameMapper mapping
-     * @param logicTableNames logic table names
-     * @throws PrepareJobWithInvalidConnectionException prepare job with invalid connection exception
+     * @param dataSources to be checked target data sources
+     * @param importerConfig importer configuration
      */
+    public void checkTargetDataSources(final Collection<DataSource> dataSources, final ImporterConfiguration importerConfig) {
+        checkConnection(dataSources);
+        checkTargetTable(dataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
+    }
+
     // TODO rename to common usage name
     // TODO Merge schemaName and tableNames
-    public void checkTargetTable(final Collection<? extends DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
+    private void checkTargetTable(final Collection<DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
         try {
             for (DataSource each : dataSources) {
                 for (String tableName : logicTableNames) {
-                    if (!checkEmpty(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName)) {
-                        throw new PrepareJobWithTargetTableNotEmptyException(tableName);
-                    }
+                    ShardingSpherePreconditions.checkState(checkEmpty(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName),
+                            () -> new PrepareJobWithTargetTableNotEmptyException(tableName));
                 }
             }
         } catch (final SQLException ex) {
@@ -120,32 +112,4 @@ public final class DataSourceCheckEngine {
             return !resultSet.next();
         }
     }
-
-    /**
-     * Check user privileges.
-     *
-     * @param dataSources data sources
-     */
-    public void checkPrivilege(final Collection<? extends DataSource> dataSources) {
-        if (null == checker) {
-            return;
-        }
-        for (DataSource each : dataSources) {
-            checker.checkPrivilege(each);
-        }
-    }
-
-    /**
-     * Check data source variables.
-     *
-     * @param dataSources data sources
-     */
-    public void checkVariable(final Collection<? extends DataSource> dataSources) {
-        if (null == checker) {
-            return;
-        }
-        for (DataSource each : dataSources) {
-            checker.checkVariable(each);
-        }
-    }
 }
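
The if/throw guard removed above now goes through ShardingSpherePreconditions.checkState, which takes the expected condition and an exception supplier, so the exception is only constructed when the check fails. A minimal self-contained sketch of that contract (StateGuard is an illustrative stand-in, not ShardingSphere's actual class):

    import java.util.function.Supplier;

    // Illustrative stand-in for the checkState contract: throw the lazily
    // supplied exception only when the expected state does not hold.
    final class StateGuard {

        static <E extends Exception> void checkState(final boolean state, final Supplier<E> exceptionSupplier) throws E {
            if (!state) {
                throw exceptionSupplier.get();
            }
        }
    }

A passing check costs no exception allocation, which is the point of taking a Supplier rather than a ready-made exception.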
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index f117cf933dd..16bf462303f 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
 import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
+import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
@@ -39,6 +40,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -82,20 +84,25 @@ class DataSourceCheckEngineTest {
     }
 
     @Test
-    void assertCheckTargetTable() throws SQLException {
+    void assertCheckTargetDataSources() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order"));
+        ImporterConfiguration importerConfig = mock(ImporterConfiguration.class);
+        when(importerConfig.getTableAndSchemaNameMapper()).thenReturn(new TableAndSchemaNameMapper(Collections.emptyMap()));
+        when(importerConfig.getLogicTableNames()).thenReturn(Collections.singleton("t_order"));
+        dataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig);
     }
 
     @Test
-    void assertCheckTargetTableFailed() throws SQLException {
+    void assertCheckTargetDataSourcesFailed() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(true);
-        assertThrows(PrepareJobWithTargetTableNotEmptyException.class,
-                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order")));
+        ImporterConfiguration importerConfig = mock(ImporterConfiguration.class);
+        when(importerConfig.getTableAndSchemaNameMapper()).thenReturn(new TableAndSchemaNameMapper(Collections.emptyMap()));
+        when(importerConfig.getLogicTableNames()).thenReturn(Collections.singleton("t_order"));
+        assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> dataSourceCheckEngine.checkTargetDataSources(dataSources, importerConfig));
     }
 }
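
The statement the tests stub, SELECT * FROM t_order LIMIT 1, is the table-emptiness probe behind checkTargetTable: fetch at most one row and treat an exhausted result set as an empty table. A simplified sketch of that JDBC pattern follows; TableEmptinessProbe and the inline table-name concatenation are illustrative only, since the engine builds its SQL through the dialect-aware PipelineCommonSQLBuilder:

    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    // Simplified emptiness probe mirroring the engine's checkEmpty:
    // an empty first page means an empty table.
    final class TableEmptinessProbe {

        static boolean isTableEmpty(final DataSource dataSource, final String qualifiedTableName) throws SQLException {
            String sql = "SELECT * FROM " + qualifiedTableName + " LIMIT 1";
            try (
                    Connection connection = dataSource.getConnection();
                    PreparedStatement preparedStatement = connection.prepareStatement(sql);
                    ResultSet resultSet = preparedStatement.executeQuery()) {
                return !resultSet.next();
            }
        }
    }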
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index db5e6407798..9087424ba82 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -76,6 +76,7 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
@@ -101,7 +102,7 @@ public final class MigrationJobPreparer {
                 jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
         DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
-        new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(jobItemContext.getSourceDataSource());
+        new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
             PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
@@ -163,7 +164,7 @@ public final class MigrationJobPreparer {
         }
         if (null == jobItemContext.getInitProgress()) {
             PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-            new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSource(targetDataSource, jobItemContext.getTaskConfig().getImporterConfig());
+            new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource), jobItemContext.getTaskConfig().getImporterConfig());
         }
     }
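
Taken together, the caller-facing surface after this commit looks as follows; databaseType, the data sources, and importerConfig are assumed to be already in hand, as they are in MigrationJobPreparer:

    DataSourceCheckEngine engine = new DataSourceCheckEngine(databaseType);
    // Source side: connectivity, then the dialect checker's privilege and
    // variable checks (skipped when no checker exists for the dialect).
    engine.checkSourceDataSource(Collections.singleton(sourceDataSource));
    // Target side: connectivity, then the target-table emptiness check.
    engine.checkTargetDataSources(Collections.singleton(targetDataSource), importerConfig);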