This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a533f1e1e77 Refactor PipelineContainerComposer and
PipelineE2EDistSQLFacade to support running several types of jobs in any E2E
(#37758)
a533f1e1e77 is described below
commit a533f1e1e77574e1b96a77d818edbda95abfe9af
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 16 20:39:21 2026 +0800
Refactor PipelineContainerComposer and PipelineE2EDistSQLFacade to support
running several types of jobs in any E2E (#37758)
* Improve PipelineContainerComposer.cleanUpPipelineJobs, clean all types of
job and remove jobType param
* Add jobType param in PipelineE2EDistSQLFacade constructor
* Remove PipelineContainerComposer constructor jobType param
---
.../pipeline/cases/PipelineContainerComposer.java | 23 ++++++++++++++--------
.../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 8 +++-----
.../general/MySQLMigrationGeneralE2EIT.java | 4 ++--
.../general/MySQLTimeTypesMigrationE2EIT.java | 4 ++--
.../general/PostgreSQLMigrationGeneralE2EIT.java | 4 ++--
.../general/PostgreSQLToMySQLMigrationE2EIT.java | 4 ++--
.../migration/general/RollbackMigrationE2EIT.java | 4 ++--
.../migration/general/RulesMigrationE2EIT.java | 6 +++---
.../primarykey/IndexesMigrationE2EIT.java | 10 +++++-----
.../primarykey/MariaDBMigrationE2EIT.java | 4 ++--
.../primarykey/TextPrimaryKeyMigrationE2EIT.java | 4 ++--
.../pipeline/util/PipelineE2EDistSQLFacade.java | 9 +++++----
12 files changed, 45 insertions(+), 39 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 1ebcf787b41..28f0bdfc838 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -35,6 +35,7 @@ import
org.apache.shardingsphere.database.connector.mysql.type.MySQLDatabaseType
import
org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
import
org.apache.shardingsphere.database.connector.postgresql.type.PostgreSQLDatabaseType;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -118,11 +119,9 @@ public final class PipelineContainerComposer implements
AutoCloseable {
private final DataSource proxyDataSource;
- private final PipelineJobType<?> jobType;
-
private Thread increaseTaskThread;
- public PipelineContainerComposer(final PipelineTestParameter testParam,
final PipelineJobType<?> jobType) {
+ public PipelineContainerComposer(final PipelineTestParameter testParam) {
databaseType = testParam.getDatabaseType();
Type type =
E2ETestEnvironment.getInstance().getRunEnvironment().getType();
containerComposer = Type.DOCKER == type
@@ -140,15 +139,14 @@ public final class PipelineContainerComposer implements
AutoCloseable {
sourceDataSource =
StorageContainerUtils.generateDataSource(getActualJdbcUrlTemplate(DS_0, false),
username, password, 2);
proxyDataSource = StorageContainerUtils.generateDataSource(
appendExtraParameter(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)),
ProxyContainerConstants.USER, ProxyContainerConstants.PASSWORD, 2);
- this.jobType = jobType;
- init(jobType);
+ init();
}
@SneakyThrows(SQLException.class)
- private void init(final PipelineJobType<?> jobType) {
+ private void init() {
String jdbcUrl = containerComposer.getProxyJdbcUrl(databaseType
instanceof PostgreSQLDatabaseType || databaseType instanceof
OpenGaussDatabaseType ? "postgres" : "");
try (Connection connection = DriverManager.getConnection(jdbcUrl,
ProxyContainerConstants.USER, ProxyContainerConstants.PASSWORD)) {
- cleanUpPipelineJobs(connection, jobType);
+ cleanUpPipelineJobs(connection);
cleanUpProxyDatabase(connection);
// Compatible with "drop database if exists sharding_db;" failed
for now
cleanUpProxyDatabase(connection);
@@ -157,10 +155,19 @@ public final class PipelineContainerComposer implements
AutoCloseable {
cleanUpDataSource();
}
- private void cleanUpPipelineJobs(final Connection connection, final
PipelineJobType<?> jobType) throws SQLException {
+ private void cleanUpPipelineJobs(final Connection connection) throws
SQLException {
if (Type.NATIVE !=
E2ETestEnvironment.getInstance().getRunEnvironment().getType()) {
return;
}
+ for (PipelineJobType<?> each :
ShardingSphereServiceLoader.getServiceInstances(PipelineJobType.class)) {
+ if (!each.getOption().isTransmissionJob()) {
+ continue;
+ }
+ cleanUpPipelineJobsWithType(connection, each);
+ }
+ }
+
+ private void cleanUpPipelineJobsWithType(final Connection connection,
final PipelineJobType<?> jobType) throws SQLException {
String jobTypeName = jobType.getType();
for (Map<String, Object> each : queryJobs(connection, jobTypeName)) {
String jobId = each.get("id").toString();
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index d5d6c94a85e..99d27883a9f 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -98,15 +98,13 @@ class CDCE2EIT {
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertCDCDataImportSuccess(final PipelineTestParameter testParam)
throws SQLException {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new CDCJobType())) {
- String alterStreamingRule = "ALTER STREAMING RULE
(READ(WORKER_THREAD=20,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER(TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
- + "WRITE(WORKER_THREAD=20,BATCH_SIZE=1000,
RATE_LIMITER(TYPE(NAME='TPS',PROPERTIES('tps'='10000')))),STREAM_CHANNEL(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000'))))";
- containerComposer.proxyExecuteWithLog(alterStreamingRule, 0);
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new CDCJobType());
+ distSQLFacade.alterPipelineRule();
for (String each : Arrays.asList(PipelineContainerComposer.DS_0,
PipelineContainerComposer.DS_1)) {
containerComposer.registerStorageUnit(each);
}
createOrderTableRule(containerComposer);
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
distSQLFacade.createBroadcastRule("t_address");
try (Connection connection =
containerComposer.getProxyDataSource().getConnection()) {
initSchemaAndTable(containerComposer, connection, 3);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 8650306176c..1f3a9cf2429 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -62,8 +62,8 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertMigrationSuccess(final PipelineTestParameter testParam) throws
SQLException, InterruptedException {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
distSQLFacade.alterPipelineRule();
containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
containerComposer.createSourceOrderItemTable();
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index ed5812ff1cd..96da7255b1e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -45,7 +45,7 @@ class MySQLTimeTypesMigrationE2EIT extends
AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertIllegalTimeTypesValueMigrationSuccess(final
PipelineTestParameter testParam) throws Exception {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
String sql = "CREATE TABLE `time_e2e` ( `id` int NOT NULL,
`t_timestamp` timestamp NULL DEFAULT NULL, `t_datetime` datetime DEFAULT NULL,
`t_date` date DEFAULT NULL, "
+ "`t_year` year DEFAULT NULL, PRIMARY KEY (`id`))
ENGINE=InnoDB;";
containerComposer.sourceExecuteWithLog(sql);
@@ -53,7 +53,7 @@ class MySQLTimeTypesMigrationE2EIT extends
AbstractMigrationE2EIT {
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
startMigration(containerComposer, "time_e2e", "time_e2e");
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
insertOneRecordWithZeroValue(containerComposer, 2);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 0fbfdf2d732..b61b4adbc29 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -64,8 +64,8 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertMigrationSuccess(final PipelineTestParameter testParam) throws
SQLException, InterruptedException {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
distSQLFacade.alterPipelineRule();
createSourceSchema(containerComposer,
PipelineContainerComposer.SCHEMA_NAME);
containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index a41d0137748..fdf78950802 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -63,7 +63,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
void assertMigrationSuccess(final PipelineTestParameter testParam) throws
SQLException {
PostgreSQLContainer<?> postgresqlContainer = null;
Type type =
E2ETestEnvironment.getInstance().getRunEnvironment().getType();
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
if (Type.DOCKER == type) {
postgresqlContainer = new PostgreSQLContainer<>("postgres:13");
postgresqlContainer.withNetwork(containerComposer.getContainerComposer().getContainers().getNetwork()).withNetworkAliases("postgresql.host")
@@ -77,7 +77,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
+ "KEY_GENERATE_STRATEGY(COLUMN=order_id,
TYPE(NAME='snowflake')))", 2);
initTargetTable(containerComposer);
containerComposer.proxyExecuteWithLog("MIGRATE TABLE
source_ds.t_order INTO t_order", 2);
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
Awaitility.await().ignoreExceptions().atMost(10L,
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() ->
!distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobStatusReached(String.format("SHOW
MIGRATION STATUS %s", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
index ec1c742467d..d173d3547c2 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
@@ -43,7 +43,7 @@ class RollbackMigrationE2EIT extends AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertIllegalTimeTypesValueMigrationSuccess(final
PipelineTestParameter testParam) throws Exception {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
String sql = "CREATE TABLE t_order (order_id BIGINT PRIMARY KEY,
user_id INT, status VARCHAR(50))";
containerComposer.sourceExecuteWithLog(sql);
try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
@@ -52,7 +52,7 @@ class RollbackMigrationE2EIT extends AbstractMigrationE2EIT {
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
startMigration(containerComposer, "t_order", "t_order");
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
String jobId = distSQLFacade.listJobIds().get(0);
distSQLFacade.rollback(jobId);
assertTrue(distSQLFacade.listJobIds().isEmpty());
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 4486edb8899..952c7493ca6 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -58,7 +58,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertNoRuleMigrationSuccess(final PipelineTestParameter testParam)
throws Exception {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
assertMigrationSuccess(containerComposer, null);
}
}
@@ -67,7 +67,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertOnlyEncryptRuleMigrationSuccess(final PipelineTestParameter
testParam) throws Exception {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
assertMigrationSuccess(containerComposer, () -> {
createTargetOrderTableEncryptRule(containerComposer);
return null;
@@ -86,7 +86,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
addRuleFn.call();
}
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index ef8d02a8d7e..28934494e74 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -78,7 +78,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertNoUniqueKeyMigrationSuccess(final PipelineTestParameter
testParam) throws Exception {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
String sql;
String consistencyCheckAlgorithmType;
if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
@@ -162,7 +162,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertMultiPrimaryKeyMigrationSuccess(final PipelineTestParameter
testParam) throws Exception {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
String sql;
String consistencyCheckAlgorithmType;
if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
@@ -185,7 +185,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertMultiUniqueKeyMigrationSuccess(final PipelineTestParameter
testParam) throws Exception {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
String sql;
String consistencyCheckAlgorithmType;
if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
@@ -208,7 +208,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess(final
PipelineTestParameter testParam) throws Exception {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
String sql;
String consistencyCheckAlgorithmType;
if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
@@ -235,7 +235,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
keyGenerateAlgorithm, SOURCE_TABLE_NAME,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
distSQLFacade.alterPipelineRule();
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index f3cf694686e..df23e276776 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -56,14 +56,14 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertMigrationSuccess(final PipelineTestParameter testParam) throws
SQLException {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT
NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
containerComposer.sourceExecuteWithLog(String.format(sqlPattern,
SOURCE_TABLE_NAME));
try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
KeyGenerateAlgorithm generateAlgorithm = new
UUIDKeyGenerateAlgorithm();
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
generateAlgorithm, SOURCE_TABLE_NAME,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
distSQLFacade.alterPipelineRule();
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 311cf6fa05e..f29f3b64a31 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -51,13 +51,13 @@ class TextPrimaryKeyMigrationE2EIT extends
AbstractMigrationE2EIT {
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertTextPrimaryMigrationSuccess(final PipelineTestParameter
testParam) throws SQLException {
- try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
+ try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
containerComposer.createSourceOrderTable(getSourceTableName(containerComposer));
try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
keyGenerateAlgorithm, getSourceTableName(containerComposer),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
distSQLFacade.alterPipelineRule();
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index d1b1b882342..27081b183fa 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.test.e2e.operation.pipeline.util;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
import org.awaitility.Awaitility;
@@ -37,12 +38,12 @@ public final class PipelineE2EDistSQLFacade {
+ "WRITE(WORKER_THREAD=20, BATCH_SIZE=1000, RATE_LIMITER
(TYPE(NAME='TPS',PROPERTIES('tps'='2000')))),\n"
+ "STREAM_CHANNEL(TYPE(NAME='MEMORY',
PROPERTIES('block-queue-size'=1000))))";
- private final String jobTypeName;
-
private final PipelineContainerComposer containerComposer;
- public PipelineE2EDistSQLFacade(final PipelineContainerComposer
containerComposer) {
- this(containerComposer.getJobType().getType(), containerComposer);
+ private final String jobTypeName;
+
+ public PipelineE2EDistSQLFacade(final PipelineContainerComposer
containerComposer, final PipelineJobType<?> jobType) {
+ this(containerComposer, jobType.getType());
}
/**