This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 22981c8bf34 Compatible with PostgreSQLMigrationGeneralE2EIT failure
(#30434)
22981c8bf34 is described below
commit 22981c8bf34c01cc5270ff1ddc70fe776cc1a944
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Mar 8 17:22:25 2024 +0800
Compatible with PostgreSQLMigrationGeneralE2EIT failure (#30434)
* Revert PipelineJobProgressPersistService.persistNow previous persisting
check
* Update PostgreSQLMigrationGeneralE2EIT
---
.../job/progress/persist/PipelineJobProgressPersistService.java | 5 +----
.../cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java | 7 +++----
2 files changed, 4 insertions(+), 8 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index ec9fdd4a5e0..7ebbf5b57af 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -101,10 +101,7 @@ public final class PipelineJobProgressPersistService {
*/
public static void persistNow(final String jobId, final int shardingItem) {
getPersistContext(jobId, shardingItem).ifPresent(persistContext -> {
- if (null ==
persistContext.getBeforePersistingProgressMillis().get()) {
- log.warn("Force persisting progress is not permitted since not
previous persisting, jobId={}, shardingItem={}", jobId, shardingItem);
- return;
- }
+ // TODO Recover persistContext.getBeforePersistingProgressMillis()
null check after compatible with PostgreSQLMigrationGeneralE2EIT
notifyPersist(persistContext);
PersistJobContextRunnable.persist(jobId, shardingItem,
persistContext);
});
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 0878a10877d..2b610cf89e0 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -81,7 +81,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME,
"t_order");
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
String jobId = getJobIdByTableName(containerComposer, "ds_0.test."
+ SOURCE_TABLE_NAME);
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
String schemaTableName = String.join(".",
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
containerComposer.startIncrementTask(new
E2EIncrementalTask(containerComposer.getSourceDataSource(), schemaTableName,
new SnowflakeKeyGenerateAlgorithm(),
containerComposer.getDatabaseType(), 20));
@@ -89,7 +89,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
schemaTableName, 10000);
- checkOrderMigration(containerComposer, jobId);
+ checkOrderMigration(containerComposer, jdbcDataSource, jobId);
checkOrderItemMigration(containerComposer);
for (String each : listJobId(containerComposer)) {
commitMigrationByJobId(containerComposer, each);
@@ -100,14 +100,13 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
}
}
- private void checkOrderMigration(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
+ private void checkOrderMigration(final PipelineContainerComposer
containerComposer, final DataSource jdbcDataSource, final String jobId) throws
SQLException {
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(containerComposer, jobId);
long recordId = new
SnowflakeKeyGenerateAlgorithm().generateKeys(mock(AlgorithmSQLContext.class),
1).iterator().next();
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s
(order_id,user_id,status) VALUES (%s, %s, '%s')",
String.join(".", PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
startMigrationByJobId(containerComposer, jobId);
- DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME),
recordId);
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}