This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
     new aad259d91dc Add more test fields in incremental stage for pipeline job E2E (#24485)
aad259d91dc is described below
commit aad259d91dce08b57437da37d10feadd57e33b39
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Mar 9 16:38:35 2023 +0800
Add more test fields in incremental stage for pipeline job E2E (#24485)
    * E2E: add more test fields in the incremental stage of pipeline
    * Improve CI
    * Compatible with BINARY column type in MySQL binlog parsing
    * Fix CI
    * Remove quote
    * Add extra_float_digits parameter for openGauss and trigger CI
    * Update openGauss docker image version
    * Revert openGauss version upgrade
    * Trigger CI
    * Use ShardingSphere-JDBC data source
    * Generate float with no more than 2 decimal places
    * Generate double with no more than 6 decimal places
    * Return MySQLBinaryString
    * Fix code style
    * Fix CDC SPI type name mismatch
    * Fix PostgreSQL CI error
    * Improve code
    * Improve set-null method
---
.../string/MySQLStringBinlogProtocolValue.java | 2 +-
.../string/MySQLStringBinlogProtocolValueTest.java | 15 +-
.../query/ShowStreamingJobStatusExecutor.java | 3 +-
.../pipeline/cases/base/BaseIncrementTask.java | 1 +
.../pipeline/cases/base/PipelineBaseE2EIT.java | 20 ++-
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 41 +++---
.../general/MySQLMigrationGeneralE2EIT.java | 4 +-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 17 ++-
.../pipeline/cases/task/E2EIncrementalTask.java | 161 +++++++++++++++++++++
.../pipeline/cases/task/MySQLIncrementTask.java | 2 +-
.../cases/task/PostgreSQLIncrementTask.java | 1 +
.../framework/helper/PipelineCaseHelper.java | 97 ++++++++-----
12 files changed, 289 insertions(+), 75 deletions(-)
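Two of the bullets above cap the precision of generated float and double values. The reason: different databases and drivers may render or round trailing floating-point digits differently, so unbounded random values can make the source/target data consistency check flaky. A minimal sketch of the approach, mirroring the PipelineCaseHelper changes further down (the class name here is illustrative only):

    import java.util.concurrent.ThreadLocalRandom;

    final class BoundedRandomSketch {

        // At most 2 decimal places: scale a random int in [-1000, 1000) by 100.
        static float generateFloat() {
            return ThreadLocalRandom.current().nextInt(-1000, 1000) / 100.0F;
        }

        // At most 6 decimal places: scale a random int in [-10^9, 10^9) by 10^6.
        static double generateDouble() {
            return ThreadLocalRandom.current().nextInt(-1000000000, 1000000000) / 1000000.0D;
        }
    }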
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValue.java
index 732ab5580f7..3097eb81d54 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValue.java
@@ -45,7 +45,7 @@ public final class MySQLStringBinlogProtocolValue implements MySQLBinlogProtocol
             case MYSQL_TYPE_SET:
                 return payload.getByteBuf().readByte();
             case MYSQL_TYPE_STRING:
-                return payload.readStringFix(readActualLength(length, payload));
+                return new MySQLBinaryString(payload.readStringFixByBytes(readActualLength(length, payload)));
             default:
                 throw new UnsupportedSQLOperationException(MySQLBinaryColumnType.valueOf(type).toString());
         }
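The change above stops decoding MYSQL_TYPE_STRING payloads into Java strings: BINARY columns carry arbitrary bytes, and forcing them through a character decoder corrupts non-text data, so the raw bytes are returned wrapped in MySQLBinaryString instead. Judging from the test changes below, the wrapper is essentially a serializable byte[] holder; a rough sketch (the name and exact shape here are inferred, not the real class):

    import java.io.Serializable;

    // Inferred shape of the binary-string wrapper used above.
    public final class BinaryStringSketch implements Serializable {

        private static final long serialVersionUID = 1L;

        private final byte[] bytes;

        public BinaryStringSketch(final byte[] bytes) {
            this.bytes = bytes;
        }

        public byte[] getBytes() {
            return bytes;
        }
    }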
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValueTest.java
index 24002fae717..fd8d289deaa 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValueTest.java
@@ -28,8 +28,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.io.Serializable;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.when;
 
@@ -83,8 +86,10 @@ public final class MySQLStringBinlogProtocolValueTest {
         columnDef.setColumnMeta(MySQLBinaryColumnType.MYSQL_TYPE_STRING.getValue() << 8);
         when(payload.getByteBuf()).thenReturn(byteBuf);
         when(byteBuf.readUnsignedByte()).thenReturn((short) expected.length());
-        when(payload.readStringFix(expected.length())).thenReturn(expected);
-        assertThat(new MySQLStringBinlogProtocolValue().read(columnDef, payload), is(expected));
+        when(payload.readStringFixByBytes(expected.length())).thenReturn(expected.getBytes());
+        Serializable actual = new MySQLStringBinlogProtocolValue().read(columnDef, payload);
+        assertInstanceOf(MySQLBinaryString.class, actual);
+        assertThat(((MySQLBinaryString) actual).getBytes(), is(expected.getBytes()));
     }
     
     @Test
@@ -93,8 +98,10 @@ public final class MySQLStringBinlogProtocolValueTest {
         columnDef.setColumnMeta((MySQLBinaryColumnType.MYSQL_TYPE_STRING.getValue() ^ ((256 & 0x300) >> 4)) << 8);
         when(payload.getByteBuf()).thenReturn(byteBuf);
         when(byteBuf.readUnsignedShortLE()).thenReturn(expected.length());
-        when(payload.readStringFix(expected.length())).thenReturn(expected);
-        assertThat(new MySQLStringBinlogProtocolValue().read(columnDef, payload), is(expected));
+        when(payload.readStringFixByBytes(expected.length())).thenReturn(expected.getBytes());
+        Serializable actual = new MySQLStringBinlogProtocolValue().read(columnDef, payload);
+        assertInstanceOf(MySQLBinaryString.class, actual);
+        assertThat(((MySQLBinaryString) actual).getBytes(), is(expected.getBytes()));
     }
 
     @Test
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index f308304f44a..4eb589ace35 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
 import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -39,7 +40,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
-        InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "CDC");
+        InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getTypeName());
         List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
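This is the "Fix CDC SPI type name mismatch" bullet in action: a hard-coded "CDC" literal can silently drift from the type name the CDC job type actually registers with the SPI loader. Deriving the lookup key from CDCJobType keeps the two in sync; in isolation the pattern is simply:

    // Single source of truth for the SPI key: the job type itself.
    String typeName = new CDCJobType().getTypeName();
    PipelineJobAPI jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, typeName);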
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/BaseIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/BaseIncrementTask.java
index 344f53c23f9..1cff3748270 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/BaseIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/BaseIncrementTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.test.e2e.data.pipeline.cases.base;
 
+// TODO remove later
 public abstract class BaseIncrementTask implements Runnable {
 }
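BaseIncrementTask is now just a Runnable marker: the E2E base class starts a concrete task on a background thread while the pipeline job is running, then joins it (see getIncreaseTaskThread().join(10000) in CDCE2EIT below). A hypothetical sketch of that driving pattern, with names assumed for illustration:

    // Run the increment workload concurrently with the pipeline job.
    Thread increaseTaskThread = new Thread(
            new E2EIncrementalTask(dataSource, "t_order", new SnowflakeKeyGenerateAlgorithm(), databaseType, 20));
    increaseTaskThread.start();
    // ... exercise the migration/CDC job ...
    increaseTaskThread.join(10000L);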
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index cefb222fcfd..5ae6eef95e1 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -45,6 +45,7 @@ import org.apache.shardingsphere.test.e2e.env.runtime.DataSourceEnvironment;
 import org.apache.shardingsphere.test.util.PropertiesBuilder;
 import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
 import org.junit.Rule;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import javax.sql.DataSource;
 import javax.xml.bind.JAXB;
@@ -248,6 +249,16 @@ public abstract class PipelineBaseE2EIT {
         return "t_order";
     }
     
+    protected void createSchema(final Connection connection, final int sleepSeconds) throws SQLException {
+        if (!getDatabaseType().isSchemaAvailable()) {
+            return;
+        }
+        connection.createStatement().execute(String.format("CREATE SCHEMA %s", SCHEMA_NAME));
+        if (sleepSeconds > 0) {
+            ThreadUtil.sleep(sleepSeconds, TimeUnit.SECONDS);
+        }
+    }
+    
     protected void createSourceOrderTable() throws SQLException {
         sourceExecuteWithLog(getExtraSQLCommand().getCreateTableOrder(getSourceTableOrderName()));
     }
@@ -394,8 +405,8 @@ public abstract class PipelineBaseE2EIT {
     
     // TODO proxy support for some fields still needs to be optimized, such as binary of MySQL, after these problems are optimized, Proxy dataSource can be used.
     protected DataSource generateShardingSphereDataSourceFromProxy() throws SQLException {
-        String dataSourceConfigText = queryForListWithLog("EXPORT DATABASE CONFIGURATION").get(0).get("result").toString();
-        YamlRootConfiguration rootConfig = YamlEngine.unmarshal(dataSourceConfigText, YamlRootConfiguration.class);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !getYamlRootConfig().getRules().isEmpty());
+        YamlRootConfiguration rootConfig = getYamlRootConfig();
         if (PipelineEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
             DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
             String sourceUrl = String.join(":", storageContainer.getNetworkAliases().get(0), Integer.toString(storageContainer.getExposedPort()));
@@ -409,4 +420,9 @@ public abstract class PipelineBaseE2EIT {
         }
         return YamlShardingSphereDataSourceFactory.createDataSourceWithoutCache(rootConfig);
     }
+    
+    private YamlRootConfiguration getYamlRootConfig() {
+        String result = queryForListWithLog("EXPORT DATABASE CONFIGURATION").get(0).get("result").toString();
+        return YamlEngine.unmarshal(result, YamlRootConfiguration.class);
+    }
 }
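EXPORT DATABASE CONFIGURATION can briefly return a configuration without rules while the proxy is still applying DistSQL, so generateShardingSphereDataSourceFromProxy now polls until rules appear before building the JDBC data source. The wait-then-read pattern in isolation (Awaitility here is the copy shaded under org.testcontainers in this test module):

    // Poll up to 5 seconds for the exported config to contain rules,
    // then read it one final time for actual use.
    Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
            .until(() -> !getYamlRootConfig().getRules().isEmpty());
    YamlRootConfiguration rootConfig = getYamlRootConfig();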
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 7ee151ac392..2182776d753 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -35,12 +35,12 @@ import org.apache.shardingsphere.data.pipeline.core.check.consistency.Consistenc
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableInventoryDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.MySQLIncrementTask;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.PostgreSQLIncrementTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -53,6 +53,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
@@ -70,8 +71,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -126,37 +125,34 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
         }
         createOrderTableRule();
         try (Connection connection = getProxyDataSource().getConnection()) {
-            initSchemaAndTable(connection);
+            initSchemaAndTable(connection, 2);
         }
+        DataSource jdbcDataSource = generateShardingSphereDataSourceFromProxy();
         Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getDatabaseType(), 20);
         log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(getProxyDataSource(), getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
+        DataSourceExecuteUtil.execute(jdbcDataSource, getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
         log.info("init data end: {}", LocalDateTime.now());
         try (Connection connection = DriverManager.getConnection(getActualJdbcUrlTemplate(DS_4, false), getUsername(), getPassword())) {
-            initSchemaAndTable(connection);
+            initSchemaAndTable(connection, 0);
         }
         startCDCClient();
         Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !queryForListWithLog("SHOW STREAMING LIST").isEmpty());
-        if (getDatabaseType() instanceof MySQLDatabaseType) {
-            startIncrementTask(new MySQLIncrementTask(getProxyDataSource(), getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), 20));
-        } else {
-            startIncrementTask(new PostgreSQLIncrementTask(getProxyDataSource(), PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName(), 20));
-        }
+        String jobId = queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
+        waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
+        startIncrementTask(new E2EIncrementalTask(jdbcDataSource, getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 20));
         getIncreaseTaskThread().join(10000);
         List<Map<String, Object>> actualProxyList;
-        try (Connection connection = getProxyDataSource().getConnection()) {
+        try (Connection connection = jdbcDataSource.getConnection()) {
             ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", getOrderTableNameWithSchema()));
             actualProxyList = transformResultSetToList(resultSet);
         }
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> listOrderRecords(getOrderTableNameWithSchema()).size() == actualProxyList.size());
-        List<Map<String, Object>> actualImportedList = listOrderRecords(getOrderTableNameWithSchema());
-        assertThat(actualProxyList.size(), is(actualImportedList.size()));
+        Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> listOrderRecords(getOrderTableNameWithSchema()).size() == actualProxyList.size());
         SchemaTableName schemaTableName = getDatabaseType().isSchemaAvailable()
                 ? new SchemaTableName(new SchemaName(PipelineBaseE2EIT.SCHEMA_NAME), new TableName(getSourceTableOrderName()))
                 : new SchemaTableName(new SchemaName(null), new TableName(getSourceTableOrderName()));
         PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(StorageContainerUtil.generateDataSource(getActualJdbcUrlTemplate(DS_4, false), getUsername(), getPassword()), getDatabaseType());
-        PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(generateShardingSphereDataSourceFromProxy(), getDatabaseType());
+        PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, getDatabaseType());
         StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(PipelineBaseE2EIT.SCHEMA_NAME, "t_order");
         PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
@@ -171,15 +167,14 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
         proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
     }
     
-    private void initSchemaAndTable(final Connection connection) throws SQLException {
-        if (getDatabaseType().isSchemaAvailable()) {
-            String sql = String.format("CREATE SCHEMA %s", PipelineBaseE2EIT.SCHEMA_NAME);
-            log.info("create schema sql: {}", sql);
-            connection.createStatement().execute(sql);
-        }
+    private void initSchemaAndTable(final Connection connection, final int sleepSeconds) throws SQLException {
+        createSchema(connection, sleepSeconds);
         String sql = getExtraSQLCommand().getCreateTableOrder(getSourceTableOrderName());
         log.info("create table sql: {}", sql);
         connection.createStatement().execute(sql);
+        if (sleepSeconds > 0) {
+            ThreadUtil.sleep(sleepSeconds, TimeUnit.SECONDS);
+        }
     }
     
     private void startCDCClient() {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index ad7cac969f3..a3a685f991c 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.MySQLIncrementTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -97,7 +97,7 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         startMigration("t_order_item", "t_order_item");
         String orderJobId = getJobIdByTableName("ds_0." + getSourceTableOrderName());
         waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
-        startIncrementTask(new MySQLIncrementTask(getSourceDataSource(), getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), 30));
+        startIncrementTask(new E2EIncrementalTask(getSourceDataSource(), getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 30));
         assertMigrationSuccessById(orderJobId, "DATA_MATCH");
         String orderItemJobId = getJobIdByTableName("ds_0.t_order_item");
         assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 3ba16355467..2b2c0ea2fa2 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseT
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.PostgreSQLIncrementTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -34,12 +34,14 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -93,7 +95,13 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
         DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
         DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
         log.info("init data end: {}", LocalDateTime.now());
-        checkOrderMigration();
+        startMigrationWithSchema(getSourceTableOrderName(), "t_order");
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> listJobId().size() > 0);
+        String jobId = getJobIdByTableName("ds_0.test." + getSourceTableOrderName());
+        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        startIncrementTask(new E2EIncrementalTask(getSourceDataSource(), String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName()),
+                new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 20));
+        checkOrderMigration(jobId);
         checkOrderItemMigration();
         for (String each : listJobId()) {
             commitMigrationByJobId(each);
@@ -105,10 +113,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
         log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
     }
     
-    private void checkOrderMigration() throws SQLException, InterruptedException {
-        startMigrationWithSchema(getSourceTableOrderName(), "t_order");
-        startIncrementTask(new PostgreSQLIncrementTask(getSourceDataSource(), PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName(), 20));
-        String jobId = getJobIdByTableName("ds_0.test." + getSourceTableOrderName());
+    private void checkOrderMigration(final String jobId) throws SQLException, InterruptedException {
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(jobId);
         long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
new file mode 100644
index 00000000000..eadb185cdf7
--- /dev/null
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
+
+import javax.sql.DataSource;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Year;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RequiredArgsConstructor
+@Slf4j
+public final class E2EIncrementalTask extends BaseIncrementTask {
+    
+    private static final List<String> MYSQL_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "t_mediumint", "t_smallint", "t_tinyint", "t_unsigned_int", "t_unsigned_mediumint",
+            "t_unsigned_smallint", "t_unsigned_tinyint", "t_float", "t_double", "t_decimal", "t_timestamp", "t_datetime", "t_date", "t_time", "t_year", "t_bit", "t_binary", "t_varbinary", "t_blob",
+            "t_mediumblob", "t_char", "t_text", "t_mediumtext", "t_enum", "t_set", "t_json");
+    
+    private static final List<String> POSTGRESQL_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "t_int2", "t_numeric", "t_bool", "t_bytea", "t_char", "t_varchar", "t_float",
+            "t_double", "t_json", "t_jsonb", "t_text", "t_date", "t_time", "t_timestamp", "t_timestamptz");
+    
+    private final DataSource dataSource;
+    
+    private final String orderTableName;
+    
+    private final KeyGenerateAlgorithm primaryKeyGenerateAlgorithm;
+    
+    private final DatabaseType databaseType;
+    
+    private final int loopCount;
+    
+    @Override
+    public void run() {
+        List<Object[]> orderInsertData = PipelineCaseHelper.generateOrderInsertData(databaseType, primaryKeyGenerateAlgorithm, loopCount);
+        List<Object> primaryKeys = new LinkedList<>();
+        for (Object[] each : orderInsertData) {
+            primaryKeys.add(each[0]);
+            insertOrder(each);
+        }
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
+            // TODO a t_datetime value of 0000-00-00 00:00:00 currently causes the MySQL data consistency check to fail.
+            // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
+            updateOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+        }
+        for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
+            setNullToAllFields(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+            deleteOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+        }
+        log.info("increment task executed successfully.");
+    }
+    
+    private void insertOrder(final Object[] orderInsertData) {
+        String sql;
+        if (databaseType instanceof MySQLDatabaseType) {
+            sql = buildInsertSQL(MYSQL_COLUMN_NAMES);
+        } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            sql = buildInsertSQL(POSTGRESQL_COLUMN_NAMES);
+        } else {
+            throw new UnsupportedOperationException();
+        }
+        DataSourceExecuteUtil.execute(dataSource, sql, orderInsertData);
+    }
+    
+    private String buildInsertSQL(final List<String> columnNames) {
+        StringBuilder sql = new StringBuilder("INSERT INTO %s (");
+        for (String each : columnNames) {
+            sql.append(each).append(",");
+        }
+        sql.setLength(sql.length() - 1);
+        sql.append(") ").append("VALUES").append("(");
+        for (int i = 0; i < columnNames.size(); i++) {
+            sql.append("?,");
+        }
+        sql.setLength(sql.length() - 1);
+        sql.append(")");
+        return String.format(sql.toString(), orderTableName);
+    }
+    
+    private void updateOrderById(final Object orderId) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int randomInt = random.nextInt(-100, 100);
+        if (databaseType instanceof MySQLDatabaseType) {
+            String sql = String.format(buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), "?"), orderTableName);
+            log.info("update sql: {}", sql);
+            int randomUnsignedInt = random.nextInt(10, 100);
+            LocalDateTime now = LocalDateTime.now();
+            DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
+                    randomUnsignedInt, 1.0F, 1.0, new BigDecimal("999"), now, now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue() + 1, new byte[]{}, new byte[]{1, 2, -1, -3},
+                    "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId});
+            return;
+        }
+        if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            String sql = String.format(buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), "?"), orderTableName);
+            log.info("update sql: {}", sql);
+            DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
+                    PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
+                    LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId});
+        }
+    }
+    
+    private String buildUpdateSQL(final List<String> columnNames, final String placeholder) {
+        StringBuilder sql = new StringBuilder("UPDATE %s SET ");
+        for (String each : columnNames) {
+            sql.append(each).append("=").append(placeholder).append(",");
+        }
+        sql.setLength(sql.length() - 1);
+        sql.append(" WHERE order_id=?");
+        return sql.toString();
+    }
+    
+    private List<String> ignoreShardingColumns(final List<String> columnNames) {
+        return new ArrayList<>(columnNames.subList(2, columnNames.size()));
+    }
+    
+    private void deleteOrderById(final Object orderId) {
+        String sql = String.format("DELETE FROM %s WHERE order_id = ?", orderTableName);
+        DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{orderId});
+    }
+    
+    private void setNullToAllFields(final Object orderId) {
+        if (databaseType instanceof MySQLDatabaseType) {
+            String sql = String.format(buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), "null"), orderTableName);
+            DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{orderId});
+        }
+    }
+}
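To make the SQL building above concrete: assuming orderTableName is t_order, buildInsertSQL(Arrays.asList("order_id", "user_id")) would return

    INSERT INTO t_order (order_id,user_id) VALUES(?,?)

and buildUpdateSQL over the non-sharding columns returns a statement of the form

    UPDATE t_order SET status=?,t_mediumint=?,...,t_json=? WHERE order_id=?

ignoreShardingColumns drops the first two columns (order_id, user_id), so updates never rewrite the primary key or the sharding key, which would otherwise move rows between shards mid-test.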
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java
index 2c079ae90ac..23a8c07c588 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 @RequiredArgsConstructor
 @Slf4j
-// TODO merge MySQL,PostgreSQL increment task
+// TODO remove later
 public final class MySQLIncrementTask extends BaseIncrementTask {
     
     private final DataSource dataSource;
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index f413ac43edb..9eddb1870de 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 @RequiredArgsConstructor
 @Slf4j
+// TODO remove later
 public final class PostgreSQLIncrementTask extends BaseIncrementTask {
     
     private static final KeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new SnowflakeKeyGenerateAlgorithm();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
index ac5b31171d7..22f20f386cf 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
@@ -23,7 +23,8 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
@@ -44,17 +45,6 @@ import java.util.concurrent.ThreadLocalRandom;
 @Slf4j
 public final class PipelineCaseHelper {
     
-    private static final SnowflakeKeyGenerateAlgorithm SNOWFLAKE_KEY_GENERATE_ALGORITHM = new SnowflakeKeyGenerateAlgorithm();
-    
-    /**
-     * Generate snowflake key.
-     *
-     * @return snowflake key
-     */
-    public static long generateSnowflakeKey() {
-        return SNOWFLAKE_KEY_GENERATE_ALGORITHM.generateKey();
-    }
-    
     /**
      * Generate insert data, contains full fields.
      *
@@ -68,30 +58,47 @@ public final class PipelineCaseHelper {
         }
         AutoIncrementKeyGenerateAlgorithm orderKeyGenerate = new AutoIncrementKeyGenerateAlgorithm();
         AutoIncrementKeyGenerateAlgorithm orderItemKeyGenerate = new AutoIncrementKeyGenerateAlgorithm();
-        List<Object[]> orderData = new ArrayList<>(insertRows);
-        List<Object[]> orderItemData = new ArrayList<>(insertRows);
-        for (int i = 0; i < insertRows; i++) {
-            int orderId = orderKeyGenerate.generateKey();
-            int userId = generateInt(0, 6);
-            LocalDateTime now = LocalDateTime.now();
-            int randomInt = generateInt(-100, 100);
-            int randomUnsignedInt = generateInt(0, 100);
-            String emojiText = "☠️x☺️x✋x☹️";
-            if (databaseType instanceof MySQLDatabaseType) {
-                Object[] addObjs = {orderId, userId, generateString(6) + "", randomInt, randomInt, randomInt,
-                        randomUnsignedInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt, generateFloat(), generateDouble(-100000000, 100000000),
-                        BigDecimal.valueOf(generateDouble(1, 100)), now, now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue(), "1", "t", "e", "s", "t", generateString(2),
+        List<Object[]> orderData = generateOrderInsertData(databaseType, orderKeyGenerate, insertRows);
+        List<Object[]> orderItemData = generateOrderItemInsertData(orderItemKeyGenerate, insertRows);
+        return Pair.of(orderData, orderItemData);
+    }
+    
+    /**
+     * Generate order insert data.
+     *
+     * @param databaseType database type
+     * @param keyGenerateAlgorithm key generate algorithm
+     * @param insertRows insert rows
+     * @return order insert data
+     */
+    public static List<Object[]> generateOrderInsertData(final DatabaseType databaseType, final KeyGenerateAlgorithm keyGenerateAlgorithm, final int insertRows) {
+        List<Object[]> result = new ArrayList<>(insertRows);
+        String emojiText = "☠️x☺️x✋x☹️";
+        if (databaseType instanceof MySQLDatabaseType) {
+            for (int i = 0; i < insertRows; i++) {
+                int randomInt = generateInt(-100, 100);
+                Object orderId = keyGenerateAlgorithm.generateKey();
+                int randomUnsignedInt = generateInt(0, 100);
+                LocalDateTime now = LocalDateTime.now();
+                Object[] addObjs = {orderId, generateInt(0, 100), generateString(6) + "", randomInt, randomInt, randomInt,
+                        randomUnsignedInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt, generateFloat(), generateDouble(),
+                        BigDecimal.valueOf(generateDouble()), now, now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue(), "1", "t", "e", "s", "t", generateString(2),
                         emojiText, generateString(1), "1", "2", generateJsonString(32, false)};
-                orderData.add(addObjs);
-            } else {
-                orderData.add(new Object[]{orderId, userId, generateString(6), randomInt,
-                        BigDecimal.valueOf(generateDouble(1, 100)), true, "bytea".getBytes(), generateString(2), generateString(2), generateFloat(), generateDouble(0, 1000),
+                result.add(addObjs);
+            }
+            return result;
+        }
+        if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            for (int i = 0; i < insertRows; i++) {
+                Object orderId = keyGenerateAlgorithm.generateKey();
+                result.add(new Object[]{orderId, generateInt(0, 100), generateString(6), generateInt(-128, 127),
+                        BigDecimal.valueOf(generateDouble()), true, "bytea".getBytes(), generateString(2), generateString(2), generateFloat(), generateDouble(),
                         generateJsonString(8, false), generateJsonString(12, true), emojiText, LocalDate.now(),
                         LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now()});
             }
-            orderItemData.add(new Object[]{orderItemKeyGenerate.generateKey(), orderId, userId, "SUCCESS"});
+            return result;
         }
-        return Pair.of(orderData, orderItemData);
+        throw new UnsupportedOperationException(String.format("Generating insert data for %s is not supported", databaseType.getType()));
     }
     
     private static int generateInt(final int min, final int max) {
@@ -120,12 +127,32 @@ public final class PipelineCaseHelper {
         return String.format("{\"test\":\"%s\"}", value);
     }
     
-    private static float generateFloat() {
-        return ThreadLocalRandom.current().nextFloat();
+    /**
+     * Generate float value.
+     *
+     * @return float
+     */
+    public static float generateFloat() {
+        return ThreadLocalRandom.current().nextInt(-1000, 1000) / 100.0F;
+    }
+    
+    /**
+     * Generate double value.
+     *
+     * @return double
+     */
+    public static double generateDouble() {
+        return ThreadLocalRandom.current().nextInt(-1000000000, 1000000000) / 1000000.0D;
     }
     
-    private static double generateDouble(final double min, final double max) {
-        return ThreadLocalRandom.current().nextDouble(min, max);
+    private static List<Object[]> generateOrderItemInsertData(final KeyGenerateAlgorithm keyGenerateAlgorithm, final int insertRows) {
+        List<Object[]> result = new ArrayList<>(insertRows);
+        for (int i = 0; i < insertRows; i++) {
+            Object orderId = keyGenerateAlgorithm.generateKey();
+            int userId = generateInt(0, 100);
+            result.add(new Object[]{keyGenerateAlgorithm.generateKey(), orderId, userId, "SUCCESS"});
+        }
+        return result;
     }
     
     /**