This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 22981c8bf34 Compatible with PostgreSQLMigrationGeneralE2EIT failure 
(#30434)
22981c8bf34 is described below

commit 22981c8bf34c01cc5270ff1ddc70fe776cc1a944
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Mar 8 17:22:25 2024 +0800

    Compatible with PostgreSQLMigrationGeneralE2EIT failure (#30434)
    
    * Revert PipelineJobProgressPersistService.persistNow's previous-persisting 
check
    
    * Update PostgreSQLMigrationGeneralE2EIT
---
 .../job/progress/persist/PipelineJobProgressPersistService.java    | 5 +----
 .../cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java   | 7 +++----
 2 files changed, 4 insertions(+), 8 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index ec9fdd4a5e0..7ebbf5b57af 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -101,10 +101,7 @@ public final class PipelineJobProgressPersistService {
      */
     public static void persistNow(final String jobId, final int shardingItem) {
         getPersistContext(jobId, shardingItem).ifPresent(persistContext -> {
-            if (null == 
persistContext.getBeforePersistingProgressMillis().get()) {
-                log.warn("Force persisting progress is not permitted since not 
previous persisting, jobId={}, shardingItem={}", jobId, shardingItem);
-                return;
-            }
+            // TODO Recover persistContext.getBeforePersistingProgressMillis() 
null check after compatible with PostgreSQLMigrationGeneralE2EIT
             notifyPersist(persistContext);
             PersistJobContextRunnable.persist(jobId, shardingItem, 
persistContext);
         });
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 0878a10877d..2b610cf89e0 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -81,7 +81,7 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, 
"t_order");
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
             String jobId = getJobIdByTableName(containerComposer, "ds_0.test." 
+ SOURCE_TABLE_NAME);
-            containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+            containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             String schemaTableName = String.join(".", 
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
             containerComposer.startIncrementTask(new 
E2EIncrementalTask(containerComposer.getSourceDataSource(), schemaTableName, 
new SnowflakeKeyGenerateAlgorithm(),
                     containerComposer.getDatabaseType(), 20));
@@ -89,7 +89,7 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, 
schemaTableName, 10000);
-            checkOrderMigration(containerComposer, jobId);
+            checkOrderMigration(containerComposer, jdbcDataSource, jobId);
             checkOrderItemMigration(containerComposer);
             for (String each : listJobId(containerComposer)) {
                 commitMigrationByJobId(containerComposer, each);
@@ -100,14 +100,13 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
         }
     }
     
-    private void checkOrderMigration(final PipelineContainerComposer 
containerComposer, final String jobId) throws SQLException {
+    private void checkOrderMigration(final PipelineContainerComposer 
containerComposer, final DataSource jdbcDataSource, final String jobId) throws 
SQLException {
         containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(containerComposer, jobId);
         long recordId = new 
SnowflakeKeyGenerateAlgorithm().generateKeys(mock(AlgorithmSQLContext.class), 
1).iterator().next();
         containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s 
(order_id,user_id,status) VALUES (%s, %s, '%s')",
                 String.join(".", PipelineContainerComposer.SCHEMA_NAME, 
SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
         startMigrationByJobId(containerComposer, jobId);
-        DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
         containerComposer.assertOrderRecordExist(jdbcDataSource, 
String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), 
recordId);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }

Reply via email to