This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d77494beb2a Fix TestDecodingPlugin.decode BEGIN TX and COMMIT TX event (#30440)
d77494beb2a is described below

commit d77494beb2a40ccb452ead905af7dd9b69e220ed
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Mar 9 18:17:37 2024 +0800

    Fix TestDecodingPlugin.decode BEGIN TX and COMMIT TX event (#30440)
    
    * Print exception on job progress persisting failure
    
    * Fix TestDecodingPlugin.decode BEGIN TX and COMMIT TX event
    
    * Recover previous persisting nonnull check on force persisting progress to avoid invalid progress overwriting
---
 .../persist/PipelineJobProgressPersistContext.java   |  2 ++
 .../persist/PipelineJobProgressPersistService.java   | 20 +++++++++++++++++++-
 .../ingest/wal/decode/TestDecodingPlugin.java        | 11 ++++++-----
 .../ingest/wal/decode/TestDecodingPluginTest.java    | 18 ++++++++++++++++++
 4 files changed, 45 insertions(+), 6 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
index 9441b511b4b..d91b04a9102 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
@@ -37,4 +37,6 @@ public final class PipelineJobProgressPersistContext {
     private final AtomicBoolean hasNewEvents = new AtomicBoolean(false);
     
     private final AtomicReference<Long> beforePersistingProgressMillis = new 
AtomicReference<>(null);
+    
+    private final AtomicBoolean firstExceptionLogged = new 
AtomicBoolean(false);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 7ebbf5b57af..f40b1a6710a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -101,7 +101,10 @@ public final class PipelineJobProgressPersistService {
      */
     public static void persistNow(final String jobId, final int shardingItem) {
         getPersistContext(jobId, shardingItem).ifPresent(persistContext -> {
-            // TODO Recover persistContext.getBeforePersistingProgressMillis() 
null check after compatible with PostgreSQLMigrationGeneralE2EIT
+            if (null == 
persistContext.getBeforePersistingProgressMillis().get()) {
+                log.warn("Force persisting progress is not permitted since 
there is no previous persisting, jobId={}, shardingItem={}", jobId, 
shardingItem);
+                return;
+            }
             notifyPersist(persistContext);
             PersistJobContextRunnable.persist(jobId, shardingItem, 
persistContext);
         });
@@ -117,6 +120,21 @@ public final class PipelineJobProgressPersistService {
         }
         
         private static synchronized void persist(final String jobId, final int 
shardingItem, final PipelineJobProgressPersistContext persistContext) {
+            try {
+                persist0(jobId, shardingItem, persistContext);
+                // CHECKSTYLE:OFF
+            } catch (final RuntimeException ex) {
+                // CHECKSTYLE:ON
+                if (!persistContext.getFirstExceptionLogged().get()) {
+                    log.error("Persist job progress failed, jobId={}, 
shardingItem={}", jobId, shardingItem, ex);
+                    persistContext.getFirstExceptionLogged().set(true);
+                } else if (5 == ThreadLocalRandom.current().nextInt(60)) {
+                    log.error("Persist job progress failed, jobId={}, 
shardingItem={}", jobId, shardingItem, ex);
+                }
+            }
+        }
+        
+        private static void persist0(final String jobId, final int 
shardingItem, final PipelineJobProgressPersistContext persistContext) {
             Long beforePersistingProgressMillis = 
persistContext.getBeforePersistingProgressMillis().get();
             if ((null == beforePersistingProgressMillis || 
System.currentTimeMillis() - beforePersistingProgressMillis < 
TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
                     && !persistContext.getHasNewEvents().get()) {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index b5d3a96e7ae..285f8810650 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -48,14 +48,15 @@ public final class TestDecodingPlugin implements 
DecodingPlugin {
     
     @Override
     public AbstractWALEvent decode(final ByteBuffer data, final 
BaseLogSequenceNumber logSequenceNumber) {
+        AbstractWALEvent result;
         String type = readEventType(data);
         if (type.startsWith("BEGIN")) {
-            return new BeginTXEvent(Long.parseLong(readNextSegment(data)));
+            result = new BeginTXEvent(Long.parseLong(readNextSegment(data)));
+        } else if (type.startsWith("COMMIT")) {
+            result = new CommitTXEvent(Long.parseLong(readNextSegment(data)), 
null);
+        } else {
+            result = "table".equals(type) ? readTableEvent(data) : new 
PlaceholderEvent();
         }
-        if (type.startsWith("COMMIT")) {
-            return new CommitTXEvent(Long.parseLong(readNextSegment(data)), 
null);
-        }
-        AbstractWALEvent result = "table".equals(type) ? readTableEvent(data) 
: new PlaceholderEvent();
         result.setLogSequenceNumber(logSequenceNumber);
         return result;
     }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
index 8ed9a27ed28..eed559fd3b4 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
@@ -19,6 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
 
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -45,6 +47,22 @@ class TestDecodingPluginTest {
     
     private final PostgreSQLLogSequenceNumber logSequenceNumber = new 
PostgreSQLLogSequenceNumber(pgSequenceNumber);
     
+    @Test
+    void assertDecodeBeginTxEvent() {
+        ByteBuffer data = ByteBuffer.wrap("BEGIN 
616281".getBytes(StandardCharsets.UTF_8));
+        BeginTXEvent actual = (BeginTXEvent) new 
TestDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+        assertThat(actual.getXid(), is(616281L));
+    }
+    
+    @Test
+    void assertDecodeCommitTxEvent() {
+        ByteBuffer data = ByteBuffer.wrap("COMMIT 
616281".getBytes(StandardCharsets.UTF_8));
+        CommitTXEvent actual = (CommitTXEvent) new 
TestDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+        assertThat(actual.getXid(), is(616281L));
+    }
+    
     @Test
     void assertDecodeWriteRowEvent() {
         ByteBuffer data = ByteBuffer.wrap(("table public.test: INSERT: 
data[character varying]:' 1 2 3'' 😊中' t_json_empty[json]:'{}' 
t_json[json]:'{\"test\":\"中中{中中}' 中\"}'"

Reply via email to