This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d77494beb2a Fix TestDecodingPlugin.decode BEGIN TX and COMMIT TX event
(#30440)
d77494beb2a is described below
commit d77494beb2a40ccb452ead905af7dd9b69e220ed
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Mar 9 18:17:37 2024 +0800
Fix TestDecodingPlugin.decode BEGIN TX and COMMIT TX event (#30440)
* Print exception on job progress persisting failure
* Fix TestDecodingPlugin.decode BEGIN TX and COMMIT TX event
* Recover previous persisting nonnull check on force persisting progress to
avoid invalid progress overwriting
---
.../persist/PipelineJobProgressPersistContext.java | 2 ++
.../persist/PipelineJobProgressPersistService.java | 20 +++++++++++++++++++-
.../ingest/wal/decode/TestDecodingPlugin.java | 11 ++++++-----
.../ingest/wal/decode/TestDecodingPluginTest.java | 18 ++++++++++++++++++
4 files changed, 45 insertions(+), 6 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
index 9441b511b4b..d91b04a9102 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
@@ -37,4 +37,6 @@ public final class PipelineJobProgressPersistContext {
private final AtomicBoolean hasNewEvents = new AtomicBoolean(false);
private final AtomicReference<Long> beforePersistingProgressMillis = new
AtomicReference<>(null);
+
+ private final AtomicBoolean firstExceptionLogged = new
AtomicBoolean(false);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 7ebbf5b57af..f40b1a6710a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -101,7 +101,10 @@ public final class PipelineJobProgressPersistService {
*/
public static void persistNow(final String jobId, final int shardingItem) {
getPersistContext(jobId, shardingItem).ifPresent(persistContext -> {
- // TODO Recover persistContext.getBeforePersistingProgressMillis()
null check after compatible with PostgreSQLMigrationGeneralE2EIT
+ if (null ==
persistContext.getBeforePersistingProgressMillis().get()) {
+ log.warn("Force persisting progress is not permitted since
there is no previous persisting, jobId={}, shardingItem={}", jobId,
shardingItem);
+ return;
+ }
notifyPersist(persistContext);
PersistJobContextRunnable.persist(jobId, shardingItem,
persistContext);
});
@@ -117,6 +120,21 @@ public final class PipelineJobProgressPersistService {
}
private static synchronized void persist(final String jobId, final int
shardingItem, final PipelineJobProgressPersistContext persistContext) {
+ try {
+ persist0(jobId, shardingItem, persistContext);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ if (!persistContext.getFirstExceptionLogged().get()) {
+ log.error("Persist job progress failed, jobId={},
shardingItem={}", jobId, shardingItem, ex);
+ persistContext.getFirstExceptionLogged().set(true);
+ } else if (5 == ThreadLocalRandom.current().nextInt(60)) {
+ log.error("Persist job progress failed, jobId={},
shardingItem={}", jobId, shardingItem, ex);
+ }
+ }
+ }
+
+ private static void persist0(final String jobId, final int
shardingItem, final PipelineJobProgressPersistContext persistContext) {
Long beforePersistingProgressMillis =
persistContext.getBeforePersistingProgressMillis().get();
if ((null == beforePersistingProgressMillis ||
System.currentTimeMillis() - beforePersistingProgressMillis <
TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
&& !persistContext.getHasNewEvents().get()) {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index b5d3a96e7ae..285f8810650 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -48,14 +48,15 @@ public final class TestDecodingPlugin implements
DecodingPlugin {
@Override
public AbstractWALEvent decode(final ByteBuffer data, final
BaseLogSequenceNumber logSequenceNumber) {
+ AbstractWALEvent result;
String type = readEventType(data);
if (type.startsWith("BEGIN")) {
- return new BeginTXEvent(Long.parseLong(readNextSegment(data)));
+ result = new BeginTXEvent(Long.parseLong(readNextSegment(data)));
+ } else if (type.startsWith("COMMIT")) {
+ result = new CommitTXEvent(Long.parseLong(readNextSegment(data)),
null);
+ } else {
+ result = "table".equals(type) ? readTableEvent(data) : new
PlaceholderEvent();
}
- if (type.startsWith("COMMIT")) {
- return new CommitTXEvent(Long.parseLong(readNextSegment(data)),
null);
- }
- AbstractWALEvent result = "table".equals(type) ? readTableEvent(data)
: new PlaceholderEvent();
result.setLogSequenceNumber(logSequenceNumber);
return result;
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
index 8ed9a27ed28..eed559fd3b4 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
@@ -19,6 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -45,6 +47,22 @@ class TestDecodingPluginTest {
private final PostgreSQLLogSequenceNumber logSequenceNumber = new
PostgreSQLLogSequenceNumber(pgSequenceNumber);
+ @Test
+ void assertDecodeBeginTxEvent() {
+ ByteBuffer data = ByteBuffer.wrap("BEGIN
616281".getBytes(StandardCharsets.UTF_8));
+ BeginTXEvent actual = (BeginTXEvent) new
TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+ assertThat(actual.getXid(), is(616281L));
+ }
+
+ @Test
+ void assertDecodeCommitTxEvent() {
+ ByteBuffer data = ByteBuffer.wrap("COMMIT
616281".getBytes(StandardCharsets.UTF_8));
+ CommitTXEvent actual = (CommitTXEvent) new
TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+ assertThat(actual.getXid(), is(616281L));
+ }
+
@Test
void assertDecodeWriteRowEvent() {
ByteBuffer data = ByteBuffer.wrap(("table public.test: INSERT:
data[character varying]:' 1 2 3'' 😊ä¸' t_json_empty[json]:'{}'
t_json[json]:'{\"test\":\"ä¸ä¸{ä¸ä¸}' ä¸\"}'"