This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d3e77914e7 Pipeline support parallel decoding plugin of openGauss 
(#30498)
5d3e77914e7 is described below

commit 5d3e77914e788061528a5b7ca0156baa505789b9
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Mar 18 19:16:36 2024 +0800

    Pipeline support parallel decoding plugin of openGauss (#30498)
    
    * Support parallel decoding plugin
    
    * Add enmotech/opengauss:3.0.0
    
    * Improve
    
    * Rename method
---
 .github/workflows/e2e-operation.yml                |  4 +-
 .../opengauss/ingest/OpenGaussWALDumper.java       | 66 ++++++++++++++++++----
 .../ingest/wal/OpenGaussLogicalReplication.java    | 16 +++++-
 .../ingest/wal/decode/MppdbDecodingPlugin.java     | 30 ++++++++--
 .../opengauss/ingest/OpenGaussWALDumperTest.java   | 41 ++++++++++++++
 .../ingest/wal/decode/MppdbDecodingPluginTest.java | 59 +++++++++++++------
 .../ingest/wal/decode/TestDecodingPlugin.java      |  2 +-
 .../postgresql/ingest/wal/event/BeginTXEvent.java  |  4 +-
 .../ingest/wal/WALEventConverterTest.java          |  2 +-
 9 files changed, 184 insertions(+), 40 deletions(-)

diff --git a/.github/workflows/e2e-operation.yml 
b/.github/workflows/e2e-operation.yml
index 8b32ceb6b18..372ed5d1d74 100644
--- a/.github/workflows/e2e-operation.yml
+++ b/.github/workflows/e2e-operation.yml
@@ -60,12 +60,12 @@ jobs:
       fail-fast: false
       matrix:
         operation: [ transaction, pipeline, showprocesslist ]
-        image: [ { type: "it.docker.mysql.version", version: "mysql:5.7" }, { 
type: "it.docker.postgresql.version", version: "postgres:12-alpine" }, { type: 
"it.docker.opengauss.version", version: "enmotech/opengauss:2.1.0" } ]
+        image: [ { type: "it.docker.mysql.version", version: "mysql:5.7" }, { 
type: "it.docker.postgresql.version", version: "postgres:12-alpine" }, { type: 
"it.docker.opengauss.version", version: 
"enmotech/opengauss:2.1.0,enmotech/opengauss:3.1.0" } ]
         exclude:
           - operation: showprocesslist
             image: { type: "it.docker.postgresql.version", version: 
"postgres:12-alpine" }
           - operation: showprocesslist
-            image: { type: "it.docker.opengauss.version", version: 
"enmotech/opengauss:2.1.0" }
+            image: { type: "it.docker.opengauss.version", version: 
"enmotech/opengauss:2.1.0,enmotech/opengauss:3.1.0" }
     steps:
       - env:
           changed_operations: ${{ 
needs.detect-changed-files.outputs.changed_operations }}
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index e8437ad7970..3aeac60402c 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -19,15 +19,15 @@ package 
org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import 
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
@@ -46,12 +46,18 @@ import org.opengauss.jdbc.PgConnection;
 import org.opengauss.replication.PGReplicationStream;
 
 import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * WAL dumper of openGauss.
@@ -60,6 +66,10 @@ import java.util.concurrent.atomic.AtomicReference;
 @Slf4j
 public final class OpenGaussWALDumper extends 
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
     
+    private static final Pattern VERSION_PATTERN = 
Pattern.compile("^\\(openGauss (\\d)");
+    
+    private static final int DEFAULT_VERSION = 2;
+    
     private final IncrementalDumperContext dumperContext;
     
     private final AtomicReference<WALPosition> walPosition;
@@ -74,6 +84,8 @@ public final class OpenGaussWALDumper extends 
AbstractPipelineLifecycleRunnable
     
     private List<AbstractRowEvent> rowEvents = new LinkedList<>();
     
+    private final AtomicReference<Long> currentCsn = new AtomicReference<>();
+    
     public OpenGaussWALDumper(final IncrementalDumperContext dumperContext, 
final IngestPosition position,
                               final PipelineChannel channel, final 
PipelineTableMetaDataLoader metaDataLoader) {
         
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
@@ -110,10 +122,11 @@ public final class OpenGaussWALDumper extends 
AbstractPipelineLifecycleRunnable
     @SneakyThrows(InterruptedException.class)
     private void dump() throws SQLException {
         PGReplicationStream stream = null;
+        int majorVersion = getMajorVersion();
         try (PgConnection connection = getReplicationConnectionUnwrap()) {
             stream = logicalReplication.createReplicationStream(connection, 
walPosition.get().getLogSequenceNumber(),
-                    
OpenGaussIngestPositionManager.getUniqueSlotName(connection, 
dumperContext.getJobId()));
-            DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new 
OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX);
+                    
OpenGaussIngestPositionManager.getUniqueSlotName(connection, 
dumperContext.getJobId()), majorVersion);
+            DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new 
OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX, 
majorVersion >= 3);
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
                 if (null == message) {
@@ -122,7 +135,7 @@ public final class OpenGaussWALDumper extends 
AbstractPipelineLifecycleRunnable
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new 
OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
                 if (decodeWithTX) {
-                    processEventWithTX(event);
+                    processEventWithTX(event, majorVersion);
                 } else {
                     processEventIgnoreTX(event);
                 }
@@ -138,28 +151,61 @@ public final class OpenGaussWALDumper extends 
AbstractPipelineLifecycleRunnable
         }
     }
     
+    private int getMajorVersion() throws SQLException {
+        StandardPipelineDataSourceConfiguration dataSourceConfig = 
(StandardPipelineDataSourceConfiguration) 
dumperContext.getCommonContext().getDataSourceConfig();
+        try (
+                Connection connection = 
DriverManager.getConnection(dataSourceConfig.getUrl(), 
dataSourceConfig.getUsername(), dataSourceConfig.getPassword());
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery("SELECT 
version()")) {
+            resultSet.next();
+            String versionText = resultSet.getString(1);
+            return parseMajorVersion(versionText);
+        }
+    }
+    
+    private int parseMajorVersion(final String versionText) {
+        Matcher matcher = VERSION_PATTERN.matcher(versionText);
+        boolean isFind = matcher.find();
+        log.info("openGauss major version={}, `select version()`={}", isFind ? 
matcher.group(1) : DEFAULT_VERSION, versionText);
+        if (isFind) {
+            return Integer.parseInt(matcher.group(1));
+        }
+        return DEFAULT_VERSION;
+    }
+    
     private PgConnection getReplicationConnectionUnwrap() throws SQLException {
         return 
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) 
dumperContext.getCommonContext().getDataSourceConfig()).unwrap(PgConnection.class);
     }
     
-    private void processEventWithTX(final AbstractWALEvent event) {
+    private void processEventWithTX(final AbstractWALEvent event, final int 
majorVersion) {
         if (event instanceof BeginTXEvent) {
+            if (majorVersion < 3) {
+                return;
+            }
+            if (!rowEvents.isEmpty()) {
+                log.warn("Commit event parse have problem, there still has 
uncommitted row events size={}, ", rowEvents.size());
+            }
+            currentCsn.set(((BeginTXEvent) event).getCsn());
             return;
         }
         if (event instanceof AbstractRowEvent) {
-            rowEvents.add((AbstractRowEvent) event);
+            AbstractRowEvent rowEvent = (AbstractRowEvent) event;
+            rowEvent.setCsn(currentCsn.get());
+            rowEvents.add(rowEvent);
             return;
         }
         if (event instanceof CommitTXEvent) {
-            Long csn = ((CommitTXEvent) event).getCsn();
             List<Record> records = new LinkedList<>();
             for (AbstractRowEvent each : rowEvents) {
-                each.setCsn(csn);
+                if (majorVersion < 3) {
+                    each.setCsn(((CommitTXEvent) event).getCsn());
+                }
                 records.add(walEventConverter.convert(each));
             }
             records.add(walEventConverter.convert(event));
             channel.push(records);
             rowEvents = new LinkedList<>();
+            currentCsn.set(null);
         }
     }
     
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
index 082134dc0f2..cc8a37d9389 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
@@ -28,6 +28,7 @@ import org.opengauss.PGProperty;
 import org.opengauss.jdbc.PgConnection;
 import org.opengauss.replication.LogSequenceNumber;
 import org.opengauss.replication.PGReplicationStream;
+import org.opengauss.replication.fluent.logical.ChainedLogicalStreamBuilder;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -86,17 +87,26 @@ public final class OpenGaussLogicalReplication {
      * @param connection connection
      * @param startPosition start position
      * @param slotName slot name
+     * @param majorVersion version
      * @return replication stream
      * @throws SQLException SQL exception
      */
-    public PGReplicationStream createReplicationStream(final PgConnection 
connection, final BaseLogSequenceNumber startPosition, final String slotName) 
throws SQLException {
-        return connection.getReplicationAPI()
+    public PGReplicationStream createReplicationStream(final PgConnection 
connection, final BaseLogSequenceNumber startPosition, final String slotName,
+                                                       final int majorVersion) 
throws SQLException {
+        ChainedLogicalStreamBuilder logicalStreamBuilder = 
connection.getReplicationAPI()
                 .replicationStream()
                 .logical()
                 .withSlotName(slotName)
                 .withSlotOption("include-xids", true)
                 .withSlotOption("skip-empty-xacts", true)
-                .withStartPosition((LogSequenceNumber) startPosition.get())
+                .withStartPosition((LogSequenceNumber) startPosition.get());
+        if (majorVersion < 3) {
+            return logicalStreamBuilder.start();
+        }
+        return logicalStreamBuilder
+                .withSlotOption("parallel-decode-num", 10)
+                .withSlotOption("decode-style", "j")
+                .withSlotOption("sending-batch", 0)
                 .start();
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 8f7396a5182..f15ce626cad 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -57,9 +57,7 @@ public final class MppdbDecodingPlugin implements 
DecodingPlugin {
     
     private final boolean decodeWithTX;
     
-    public MppdbDecodingPlugin(final BaseTimestampUtils timestampUtils) {
-        this(timestampUtils, false);
-    }
+    private final boolean decodeParallelly;
     
     @Override
     public AbstractWALEvent decode(final ByteBuffer data, final 
BaseLogSequenceNumber logSequenceNumber) {
@@ -77,10 +75,18 @@ public final class MppdbDecodingPlugin implements 
DecodingPlugin {
     }
     
     private AbstractWALEvent decodeDataWithTX(final String dataText) {
+        if (decodeParallelly) {
+            return decodeParallelly(dataText);
+        } else {
+            return decodeSerially(dataText);
+        }
+    }
+    
+    private AbstractWALEvent decodeSerially(final String dataText) {
         AbstractWALEvent result = new PlaceholderEvent();
         if (dataText.startsWith("BEGIN")) {
             int beginIndex = dataText.indexOf("BEGIN") + "BEGIN".length() + 1;
-            result = new 
BeginTXEvent(Long.parseLong(dataText.substring(beginIndex)));
+            result = new 
BeginTXEvent(Long.parseLong(dataText.substring(beginIndex)), null);
         } else if (dataText.startsWith("COMMIT")) {
             int commitBeginIndex = dataText.indexOf("COMMIT") + 
"COMMIT".length() + 1;
             int csnBeginIndex = dataText.indexOf("CSN") + "CSN".length() + 1;
@@ -91,6 +97,22 @@ public final class MppdbDecodingPlugin implements 
DecodingPlugin {
         return result;
     }
     
+    private AbstractWALEvent decodeParallelly(final String dataText) {
+        AbstractWALEvent result = new PlaceholderEvent();
+        if (dataText.startsWith("BEGIN")) {
+            int beginIndex = dataText.indexOf("CSN:") + "CSN:".length() + 1;
+            int firstLsnIndex = dataText.indexOf("first_lsn");
+            long csn = firstLsnIndex > 0 ? 
Long.parseLong(dataText.substring(beginIndex, firstLsnIndex - 1)) : 0L;
+            result = new BeginTXEvent(null, csn);
+        } else if (dataText.startsWith("commit") || 
dataText.startsWith("COMMIT")) {
+            int beginIndex = dataText.indexOf("xid:") + "xid:".length() + 1;
+            result = new 
CommitTXEvent(Long.parseLong(dataText.substring(beginIndex)), null);
+        } else if (dataText.startsWith("{")) {
+            result = readTableEvent(dataText);
+        }
+        return result;
+    }
+    
     private AbstractWALEvent decodeDataIgnoreTX(final String dataText) {
         return dataText.startsWith("{") ? readTableEvent(dataText) : new 
PlaceholderEvent();
     }
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumperTest.java
 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumperTest.java
new file mode 100644
index 00000000000..21abeb075b4
--- /dev/null
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumperTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
+
+import org.apache.shardingsphere.infra.util.reflection.ReflectionUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+class OpenGaussWALDumperTest {
+    
+    @Test
+    void assertGetVersion() throws NoSuchMethodException {
+        OpenGaussWALDumper dumper = mock(OpenGaussWALDumper.class);
+        int version = 
ReflectionUtils.invokeMethod(OpenGaussWALDumper.class.getDeclaredMethod("parseMajorVersion",
 String.class), dumper,
+                "(openGauss 3.1.0 build ) compiled at 2023-02-17 16:13:51 
commit 0 last mr   on x86_64-unknown-linux-gnu, compiled by g++ (GCC) 7.3.0, 
64-bit");
+        assertThat(version, is(3));
+        OpenGaussWALDumper mock = mock(OpenGaussWALDumper.class);
+        version = 
ReflectionUtils.invokeMethod(OpenGaussWALDumper.class.getDeclaredMethod("parseMajorVersion",
 String.class), mock, "(openGauss 5.0.1 build )");
+        assertThat(version, is(5));
+        version = 
ReflectionUtils.invokeMethod(OpenGaussWALDumper.class.getDeclaredMethod("parseMajorVersion",
 String.class), mock, "not match");
+        assertThat(version, is(2));
+    }
+}
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
index 221add916f7..3e7726d598a 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
@@ -45,6 +45,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -65,7 +66,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsName(IntStream.range(0, 
insertTypes.length).mapToObj(idx -> "data" + idx).toArray(String[]::new));
         tableData.setColumnsVal(IntStream.range(0, 
insertTypes.length).mapToObj(idx -> "'1 2 3'").toArray(String[]::new));
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         IntStream.range(0, insertTypes.length).forEach(each -> 
assertThat(actual.getAfterRow().get(each), is("1 2 3")));
@@ -80,7 +81,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"character varying"});
         tableData.setColumnsVal(new String[]{"'1 2 3'"});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        UpdateRowEvent actual = (UpdateRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        UpdateRowEvent actual = (UpdateRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         assertThat(actual.getAfterRow().get(0), is("1 2 3"));
@@ -97,7 +98,7 @@ class MppdbDecodingPluginTest {
         tableData.setOldKeysName(IntStream.range(0, 
deleteTypes.length).mapToObj(idx -> "data" + idx).toArray(String[]::new));
         tableData.setOldKeysVal(deleteValues);
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        DeleteRowEvent actual = (DeleteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        DeleteRowEvent actual = (DeleteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         IntStream.range(0, deleteTypes.length).forEach(each -> 
assertThat(actual.getPrimaryKeys().get(each).toString(), 
is(deleteValues[each])));
@@ -112,7 +113,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"money"});
         tableData.setColumnsVal(new String[]{"'$1.08'"});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         Object byteaObj = actual.getAfterRow().get(0);
@@ -128,7 +129,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"boolean"});
         tableData.setColumnsVal(new String[]{Boolean.TRUE.toString()});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         Object byteaObj = actual.getAfterRow().get(0);
@@ -155,7 +156,7 @@ class MppdbDecodingPluginTest {
         when(timestampUtils.toTimestamp(null, 
"2010-12-12")).thenReturn(Timestamp.valueOf("2010-12-12 00:00:00.0"));
         when(timestampUtils.toTimestamp(null, "2013-12-11 
pst")).thenReturn(Timestamp.valueOf("2013-12-11 16:00:00.0"));
         when(timestampUtils.toTimestamp(null, "2003-04-12 
04:05:06")).thenReturn(Timestamp.valueOf("2003-04-12 04:05:00.0"));
-        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(new 
OpenGaussTimestampUtils(timestampUtils)).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(new 
OpenGaussTimestampUtils(timestampUtils), false, false).decode(data, 
logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         IntStream.range(0, insertTypes.length).forEach(each -> 
assertThat(actual.getAfterRow().get(each).toString(), is(compareValues[each])));
@@ -170,7 +171,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"bytea"});
         tableData.setColumnsVal(new String[]{"'\\xff00ab'"});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         Object byteaObj = actual.getAfterRow().get(0);
@@ -187,7 +188,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"raw"});
         tableData.setColumnsVal(new String[]{"'7D'"});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         Object byteaObj = actual.getAfterRow().get(0);
@@ -198,7 +199,7 @@ class MppdbDecodingPluginTest {
     @Test
     void assertDecodeUnknownTableType() {
         ByteBuffer data = ByteBuffer.wrap("unknown".getBytes());
-        assertThat(new MppdbDecodingPlugin(null).decode(data, 
logSequenceNumber), instanceOf(PlaceholderEvent.class));
+        assertThat(new MppdbDecodingPlugin(null, false, false).decode(data, 
logSequenceNumber), instanceOf(PlaceholderEvent.class));
     }
     
     @Test
@@ -210,7 +211,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"character varying"});
         tableData.setColumnsVal(new String[]{"1 2 3"});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        assertThrows(IngestException.class, () -> new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber));
+        assertThrows(IngestException.class, () -> new 
MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber));
     }
     
     @Test
@@ -224,11 +225,11 @@ class MppdbDecodingPluginTest {
         TimestampUtils timestampUtils = mock(TimestampUtils.class);
         when(timestampUtils.toTime(null, "1 2 3")).thenThrow(new 
SQLException(""));
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        assertThrows(DecodingException.class, () -> new 
MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), 
true).decode(data, logSequenceNumber));
+        assertThrows(DecodingException.class, () -> new 
MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), true, 
false).decode(data, logSequenceNumber));
     }
     
     @Test
-    void assertDecodeWithXid() {
+    void assertDecodeWithTx() {
         MppTableData tableData = new MppTableData();
         tableData.setTableName("public.test");
         tableData.setOpType("INSERT");
@@ -237,7 +238,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsVal(new String[]{"'7D'"});
         List<String> dataList = Arrays.asList("BEGIN 1", 
JsonUtils.toJsonString(tableData), JsonUtils.toJsonString(tableData),
                 "COMMIT 1 (at 2022-10-27 04:19:39.476261+00) CSN 3468");
-        MppdbDecodingPlugin mppdbDecodingPlugin = new 
MppdbDecodingPlugin(null, true);
+        MppdbDecodingPlugin mppdbDecodingPlugin = new 
MppdbDecodingPlugin(null, true, false);
         List<AbstractWALEvent> expectedEvent = new LinkedList<>();
         for (String each : dataList) {
             
expectedEvent.add(mppdbDecodingPlugin.decode(ByteBuffer.wrap(each.getBytes()), 
logSequenceNumber));
@@ -245,13 +246,35 @@ class MppdbDecodingPluginTest {
         assertThat(expectedEvent.size(), is(4));
         AbstractWALEvent actualFirstEvent = expectedEvent.get(0);
         assertInstanceOf(BeginTXEvent.class, actualFirstEvent);
-        assertThat(((BeginTXEvent) actualFirstEvent).getXid(), is(1L));
         AbstractWALEvent actualLastEvent = 
expectedEvent.get(expectedEvent.size() - 1);
         assertInstanceOf(CommitTXEvent.class, actualLastEvent);
         assertThat(((CommitTXEvent) actualLastEvent).getCsn(), is(3468L));
         assertThat(((CommitTXEvent) actualLastEvent).getXid(), is(1L));
     }
     
+    @Test
+    void assertParallelDecodeWithTx() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"raw"});
+        tableData.setColumnsVal(new String[]{"'7D'"});
+        List<String> dataList = Arrays.asList("BEGIN CSN: 951909 first_lsn: 
5/59825858", JsonUtils.toJsonString(tableData), 
JsonUtils.toJsonString(tableData), "commit xid: 1006076");
+        MppdbDecodingPlugin mppdbDecodingPlugin = new 
MppdbDecodingPlugin(null, true, true);
+        List<AbstractWALEvent> actual = new LinkedList<>();
+        for (String each : dataList) {
+            
actual.add(mppdbDecodingPlugin.decode(ByteBuffer.wrap(each.getBytes()), 
logSequenceNumber));
+        }
+        assertThat(actual.size(), is(4));
+        assertInstanceOf(BeginTXEvent.class, actual.get(0));
+        assertThat(((BeginTXEvent) actual.get(0)).getCsn(), is(951909L));
+        assertThat(((WriteRowEvent) 
actual.get(1)).getAfterRow().get(0).toString(), is("7D"));
+        assertThat(((WriteRowEvent) 
actual.get(2)).getAfterRow().get(0).toString(), is("7D"));
+        assertThat(((CommitTXEvent) actual.get(3)).getXid(), is(1006076L));
+        assertNull(((CommitTXEvent) actual.get(3)).getCsn());
+    }
+    
     @Test
     void assertDecodeWithTsrange() {
         MppTableData tableData = new MppTableData();
@@ -261,7 +284,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"tsrange"});
         tableData.setColumnsVal(new String[]{"'[\"2020-01-01 
00:00:00\",\"2021-01-01 00:00:00\")'"});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         Object byteaObj = actual.getAfterRow().get(0);
         assertThat(byteaObj, instanceOf(PGobject.class));
         assertThat(byteaObj.toString(), is("[\"2020-01-01 
00:00:00\",\"2021-01-01 00:00:00\")"));
@@ -276,7 +299,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"daterange"});
         tableData.setColumnsVal(new String[]{"'[2020-01-02,2021-01-02)'"});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         Object byteaObj = actual.getAfterRow().get(0);
         assertThat(byteaObj, instanceOf(PGobject.class));
         assertThat(byteaObj.toString(), is("[2020-01-02,2021-01-02)"));
@@ -291,7 +314,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"tsquery"});
         tableData.setColumnsVal(new String[]{"'''fff'' | ''faa'''"});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         Object byteaObj = actual.getAfterRow().get(0);
         assertThat(byteaObj.toString(), is("'fff' | 'faa'"));
     }
@@ -305,7 +328,7 @@ class MppdbDecodingPluginTest {
         tableData.setColumnsType(new String[]{"tinyint"});
         tableData.setColumnsVal(new String[]{"255"});
         ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
-        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, false).decode(data, logSequenceNumber);
         Object byteaObj = actual.getAfterRow().get(0);
         assertThat(byteaObj, is(255));
     }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index e02bd7a5d4f..0f819728867 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -53,7 +53,7 @@ public final class TestDecodingPlugin implements 
DecodingPlugin {
         AbstractWALEvent result;
         String type = readEventType(data);
         if (type.startsWith("BEGIN")) {
-            result = new BeginTXEvent(Long.parseLong(readNextSegment(data)));
+            result = new BeginTXEvent(Long.parseLong(readNextSegment(data)), 
null);
         } else if (type.startsWith("COMMIT")) {
             result = new CommitTXEvent(Long.parseLong(readNextSegment(data)), 
null);
         } else {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
index b608e04e5c5..825b5725d79 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
@@ -27,5 +27,7 @@ import lombok.RequiredArgsConstructor;
 @Getter
 public final class BeginTXEvent extends AbstractWALEvent {
     
-    private final long xid;
+    private final Long xid;
+    
+    private final Long csn;
 }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index dab22d7bd3d..af0c4de1014 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -144,7 +144,7 @@ class WALEventConverterTest {
     
     @Test
     void assertConvertBeginTXEvent() {
-        BeginTXEvent beginTXEvent = new BeginTXEvent(100);
+        BeginTXEvent beginTXEvent = new BeginTXEvent(100L, null);
         beginTXEvent.setLogSequenceNumber(new 
PostgreSQLLogSequenceNumber(logSequenceNumber));
         Record record = walEventConverter.convert(beginTXEvent);
         assertInstanceOf(PlaceholderRecord.class, record);


Reply via email to