This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a24e683  support openGauss mppdb_decoding plugin (#12749)
a24e683 is described below

commit a24e683d0f93f87df8569115d013af78380522fb
Author: justbk2015 <[email protected]>
AuthorDate: Mon Sep 27 18:32:00 2021 +0800

    support openGauss mppdb_decoding plugin (#12749)
    
    * continue scaling when a parallel create table throws an "already 
exists" exception, and add mppdb decoding.
    
    * move MppTableData to an individual java file and modify javadoc
    
    Co-authored-by: justbk2015 <[email protected]>
---
 .../scaling/core/common/record/Column.java         |  14 +-
 .../sqlbuilder/AbstractScalingSQLBuilder.java      |   7 +-
 .../core/common/sqlbuilder/ScalingSQLBuilder.java  |  14 ++
 .../core/executor/importer/AbstractImporter.java   |   2 +-
 .../job/preparer/AbstractDataSourcePreparer.java   |   9 +-
 .../executor/importer/AbstractImporterTest.java    |   3 +
 .../component/OpenGaussScalingSQLBuilder.java      |  21 +-
 .../opengauss/component/OpenGaussWalDumper.java    |  35 +++-
 .../opengauss/wal/OpenGaussLogicalReplication.java |  24 ++-
 .../opengauss/wal/decode/MppTableData.java}        |  55 +++---
 .../opengauss/wal/decode/MppdbDecodingPlugin.java  | 215 +++++++++++++++++++++
 .../wal/decode/MppdbDecodingPluginTest.java        | 141 ++++++++++++++
 12 files changed, 488 insertions(+), 52 deletions(-)

diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
index f411f18..cc84fd8 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
@@ -18,12 +18,11 @@
 package org.apache.shardingsphere.scaling.core.common.record;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 /**
  * Column.
  */
-@RequiredArgsConstructor
 @Getter
 public final class Column {
     
@@ -32,7 +31,8 @@ public final class Column {
     /**
      * Value are available only when the primary key column is updated.
      */
-    private final Object oldValue;
+    @Setter
+    private Object oldValue;
     
     private final Object value;
     
@@ -44,6 +44,14 @@ public final class Column {
         this(name, null, value, updated, primaryKey);
     }
     
+    public Column(final String name, final Object oldValue, final Object 
value, final boolean updated, final boolean primaryKey) {
+        this.name = name;
+        this.oldValue = oldValue;
+        this.value = value;
+        this.updated = updated;
+        this.primaryKey = primaryKey;
+    }
+    
     @Override
     public String toString() {
         return String.format("%s=%s", name, value);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/AbstractScalingSQLBuilder.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/AbstractScalingSQLBuilder.java
index b163e9c..130d274 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/AbstractScalingSQLBuilder.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/AbstractScalingSQLBuilder.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.common.sqlbuilder;
 
-import com.google.common.collect.Collections2;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -100,7 +99,7 @@ public abstract class AbstractScalingSQLBuilder implements 
ScalingSQLBuilder {
             sqlCacheMap.put(sqlCacheKey, 
buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
         }
         StringBuilder updatedColumnString = new StringBuilder();
-        for (Column each : extractUpdatedColumns(dataRecord.getColumns())) {
+        for (Column each : extractUpdatedColumns(dataRecord.getColumns(), 
dataRecord)) {
             updatedColumnString.append(String.format("%s = ?,", 
quote(each.getName())));
         }
         updatedColumnString.setLength(updatedColumnString.length() - 1);
@@ -111,10 +110,6 @@ public abstract class AbstractScalingSQLBuilder implements 
ScalingSQLBuilder {
         return String.format("UPDATE %s SET %%s WHERE %s", quote(tableName), 
buildWhereSQL(conditionColumns));
     }
     
-    private Collection<Column> extractUpdatedColumns(final Collection<Column> 
columns) {
-        return Collections2.filter(columns, Column::isUpdated);
-    }
-    
     @Override
     public String buildDeleteSQL(final DataRecord dataRecord, final 
Collection<Column> conditionColumns) {
         String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + 
dataRecord.getTableName();
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/ScalingSQLBuilder.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/ScalingSQLBuilder.java
index 88198f7..ad2bda5 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/ScalingSQLBuilder.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/ScalingSQLBuilder.java
@@ -19,8 +19,11 @@ package 
org.apache.shardingsphere.scaling.core.common.sqlbuilder;
 
 import org.apache.shardingsphere.scaling.core.common.record.Column;
 import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.common.record.RecordUtil;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 /**
  * Scaling SQL builder.
@@ -45,6 +48,17 @@ public interface ScalingSQLBuilder {
     String buildUpdateSQL(DataRecord dataRecord, Collection<Column> 
conditionColumns);
     
     /**
     * Extract columns that need to be updated.
+     *
+     * @param columns the input columns
     * @param record the input data record
+     * @return the filtered columns.
+     */
+    default List<Column> extractUpdatedColumns(Collection<Column> columns, 
DataRecord record) {
+        return new ArrayList<>(RecordUtil.extractUpdatedColumns(record));
+    }
+    
+    /**
      * Build delete SQL.
      *
      * @param dataRecord data record
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporter.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporter.java
index cc7627c..da1fe53 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporter.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporter.java
@@ -182,7 +182,7 @@ public abstract class AbstractImporter extends 
AbstractScalingExecutor implement
     
     private void executeUpdate(final Connection connection, final DataRecord 
record) throws SQLException {
         List<Column> conditionColumns = 
RecordUtil.extractConditionColumns(record, 
importerConfig.getShardingColumnsMap().get(record.getTableName()));
-        List<Column> updatedColumns = RecordUtil.extractUpdatedColumns(record);
+        List<Column> updatedColumns = 
scalingSqlBuilder.extractUpdatedColumns(record.getColumns(), record);
         String updateSql = scalingSqlBuilder.buildUpdateSQL(record, 
conditionColumns);
         try (PreparedStatement ps = connection.prepareStatement(updateSql)) {
             for (int i = 0; i < updatedColumns.size(); i++) {
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
index c00e3f0..5c5d8de 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
@@ -60,6 +60,8 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
     
     private static final Pattern PATTERN_ALTER_TABLE = 
Pattern.compile("ALTER\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
     
+    private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple 
primary keys for table", "already exists"};
+    
     private final DataSourceFactory dataSourceFactory = new 
DataSourceFactory();
     
     protected DataSourceWrapper getSourceDataSource(final JobConfiguration 
jobConfig) {
@@ -113,9 +115,12 @@ public abstract class AbstractDataSourcePreparer 
implements DataSourcePreparer {
         try (Statement statement = targetConnection.createStatement()) {
             statement.execute(sql);
         } catch (final SQLException ex) {
-            if (!ex.getMessage().contains("multiple primary keys for table")) {
-                throw ex;
+            for (String ignoreMessage: IGNORE_EXCEPTION_MESSAGE) {
+                if (ex.getMessage().contains(ignoreMessage)) {
+                    return;
+                }
             }
+            throw ex;
         }
     }
     
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
index 9d0d43e..2996e61 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.verify;
@@ -130,6 +131,7 @@ public final class AbstractImporterTest {
         when(scalingSqlBuilder.buildUpdateSQL(updateRecord, 
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
         
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), 
anyInt())).thenReturn(mockRecords(updateRecord));
+        when(scalingSqlBuilder.extractUpdatedColumns(any(), 
any())).thenReturn(RecordUtil.extractUpdatedColumns(updateRecord));
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 10);
         verify(preparedStatement).setObject(2, "UPDATE");
@@ -144,6 +146,7 @@ public final class AbstractImporterTest {
         when(scalingSqlBuilder.buildUpdateSQL(updateRecord, 
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
         
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), 
anyInt())).thenReturn(mockRecords(updateRecord));
+        when(scalingSqlBuilder.extractUpdatedColumns(any(), 
any())).thenReturn(RecordUtil.extractUpdatedColumns(updateRecord));
         jdbcImporter.run();
         InOrder inOrder = inOrder(preparedStatement);
         inOrder.verify(preparedStatement).setObject(1, 2);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
index 23086a2..e0eeda8 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
@@ -17,9 +17,14 @@
 
 package org.apache.shardingsphere.scaling.opengauss.component;
 
+import com.google.common.collect.Collections2;
+import org.apache.shardingsphere.scaling.core.common.record.Column;
 import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
 import 
org.apache.shardingsphere.scaling.core.common.sqlbuilder.AbstractScalingSQLBuilder;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -34,12 +39,12 @@ public final class OpenGaussScalingSQLBuilder extends 
AbstractScalingSQLBuilder
     
     @Override
     public String getLeftIdentifierQuoteString() {
-        return "\"";
+        return "";
     }
     
     @Override
     public String getRightIdentifierQuoteString() {
-        return "\"";
+        return "";
     }
     
     @Override
@@ -47,6 +52,18 @@ public final class OpenGaussScalingSQLBuilder extends 
AbstractScalingSQLBuilder
         return super.buildInsertSQL(dataRecord) + buildConflictSQL();
     }
     
+    @Override
+    public List<Column> extractUpdatedColumns(final Collection<Column> 
columns, final DataRecord record) {
+        return new ArrayList(Collections2.filter(columns, column -> 
!(column.isPrimaryKey()
+                || isShardingColumn(getShardingColumnsMap(), 
record.getTableName(), column.getName()))));
+    }
+    
+    private boolean isShardingColumn(final Map<String, Set<String>> 
shardingColumnsMap,
+                                     final String tableName, final String 
columnName) {
+        return shardingColumnsMap.containsKey(tableName)
+                && shardingColumnsMap.get(tableName).contains(columnName);
+    }
+    
     private String buildConflictSQL() {
         // there need return ON DUPLICATE KEY UPDATE NOTHING after support 
this syntax.
         return "";
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
index 89d77b8..8ad7003 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
@@ -20,7 +20,10 @@ package 
org.apache.shardingsphere.scaling.opengauss.component;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.common.channel.Channel;
+import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
 import 
org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
+import org.apache.shardingsphere.scaling.core.common.record.Column;
+import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.common.record.Record;
 import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
 import 
org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
@@ -29,11 +32,12 @@ import 
org.apache.shardingsphere.scaling.core.executor.dumper.IncrementalDumper;
 import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
 import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
 import 
org.apache.shardingsphere.scaling.opengauss.wal.OpenGaussLogicalReplication;
+import 
org.apache.shardingsphere.scaling.opengauss.wal.decode.MppdbDecodingPlugin;
 import 
org.apache.shardingsphere.scaling.opengauss.wal.decode.OpenGaussTimestampUtils;
 import 
org.apache.shardingsphere.scaling.opengauss.wal.decode.OpenGaussLogSequenceNumber;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
-import 
org.apache.shardingsphere.scaling.postgresql.wal.decode.TestDecodingPlugin;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingPlugin;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
 import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
 import org.opengauss.jdbc.PgConnection;
@@ -57,6 +61,8 @@ public final class OpenGaussWalDumper extends 
AbstractScalingExecutor implements
     private final OpenGaussLogicalReplication logicalReplication = new 
OpenGaussLogicalReplication();
 
     private final WalEventConverter walEventConverter;
+    
+    private String slotName = OpenGaussLogicalReplication.SLOT_NAME_PREFIX;
 
     @Setter
     private Channel channel;
@@ -82,14 +88,15 @@ public final class OpenGaussWalDumper extends 
AbstractScalingExecutor implements
                 .unwrap(PgConnection.class);
     }
 
-    private TestDecodingPlugin initReplication() {
-        TestDecodingPlugin plugin = null;
+    private MppdbDecodingPlugin initReplication() {
+        MppdbDecodingPlugin plugin = null;
         try {
             DataSource dataSource = 
dumperConfig.getDataSourceConfig().toDataSource();
             try (Connection conn = dataSource.getConnection()) {
+                slotName = OpenGaussLogicalReplication.getUniqueSlotName(conn);
                 OpenGaussLogicalReplication.createIfNotExists(conn);
                 OpenGaussTimestampUtils utils = new 
OpenGaussTimestampUtils(conn.unwrap(PgConnection.class).getTimestampUtils());
-                plugin = new TestDecodingPlugin(utils);
+                plugin = new MppdbDecodingPlugin(utils);
             }
         } catch (SQLException sqlExp) {
             log.warn("create replication slot failed!");
@@ -98,9 +105,9 @@ public final class OpenGaussWalDumper extends 
AbstractScalingExecutor implements
     }
 
     private void dump() {
-        TestDecodingPlugin decodingPlugin = initReplication();
+        DecodingPlugin decodingPlugin = initReplication();
         try (PgConnection pgConnection = getReplicationConn()) {
-            PGReplicationStream stream = 
logicalReplication.createReplicationStream(pgConnection, 
walPosition.getLogSequenceNumber());
+            PGReplicationStream stream = 
logicalReplication.createReplicationStream(pgConnection, 
walPosition.getLogSequenceNumber(), slotName);
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
                 if (null == message) {
@@ -113,6 +120,7 @@ public final class OpenGaussWalDumper extends 
AbstractScalingExecutor implements
                 if (!(event instanceof PlaceholderEvent) && 
log.isDebugEnabled()) {
                     log.debug("dump, event={}, record={}", event, record);
                 }
+                updateRecordOldValue(record);
                 pushRecord(record);
             }
         } catch (final SQLException ex) {
@@ -123,6 +131,21 @@ public final class OpenGaussWalDumper extends 
AbstractScalingExecutor implements
         }
     }
 
+    private void updateRecordOldValue(final Record record) {
+        if (!(record instanceof DataRecord)) {
+            return;
+        }
+        DataRecord dataRecord = (DataRecord) record;
+        if (!ScalingConstant.UPDATE.equals(dataRecord.getType())) {
+            return;
+        }
+        for (Column col: dataRecord.getColumns()) {
+            if (col.isPrimaryKey() && col.isUpdated()) {
+                col.setOldValue(col.getValue());
+            }
+        }
+    }
+    
     private void pushRecord(final Record record) {
         try {
             channel.pushRecord(record);
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
index 74e4d55..35a066c 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
@@ -38,7 +38,7 @@ import java.util.Properties;
  */
 public final class OpenGaussLogicalReplication {
 
-    public static final String SLOT_NAME = "sharding_scaling";
+    public static final String SLOT_NAME_PREFIX = "sharding_scaling";
 
     public static final String DECODE_PLUGIN = "mppdb_decoding";
 
@@ -70,14 +70,15 @@ public final class OpenGaussLogicalReplication {
      *
      * @param pgConnection OpenGauss connection
      * @param startPosition start position
+     * @param slotName the slot name to use
      * @return replication stream
      * @throws SQLException sql exception
      */
-    public PGReplicationStream createReplicationStream(final PgConnection 
pgConnection, final BaseLogSequenceNumber startPosition) throws SQLException {
+    public PGReplicationStream createReplicationStream(final PgConnection 
pgConnection, final BaseLogSequenceNumber startPosition, final String slotName) 
throws SQLException {
         return pgConnection.getReplicationAPI()
                 .replicationStream()
                 .logical()
-                .withSlotName(SLOT_NAME)
+                .withSlotName(slotName)
                 .withSlotOption("include-xids", true)
                 .withSlotOption("skip-empty-xacts", true)
                 .withStartPosition((LogSequenceNumber) startPosition.get())
@@ -104,7 +105,7 @@ public final class OpenGaussLogicalReplication {
      * @throws SQLException drop sql with error
      */
     public static void dropSlot(final Connection conn) throws SQLException {
-        String sql = String.format("select * from 
pg_drop_replication_slot('%s')", SLOT_NAME);
+        String sql = String.format("select * from 
pg_drop_replication_slot('%s')", getUniqueSlotName(conn));
         try (CallableStatement cs = conn.prepareCall(sql)) {
             cs.execute();
         }
@@ -113,7 +114,7 @@ public final class OpenGaussLogicalReplication {
     private static boolean isSlotNameExist(final Connection conn) throws 
SQLException {
         String sql = "select * from pg_replication_slots where slot_name=?";
         try (PreparedStatement ps = conn.prepareStatement(sql)) {
-            ps.setString(1, SLOT_NAME);
+            ps.setString(1, getUniqueSlotName(conn));
             try (ResultSet rs = ps.executeQuery()) {
                 return rs.next();
             }
@@ -123,7 +124,7 @@ public final class OpenGaussLogicalReplication {
     private static void createSlotBySql(final Connection connection) throws 
SQLException {
         try (PreparedStatement ps = connection.prepareStatement(
                 String.format("SELECT * FROM 
pg_create_logical_replication_slot('%s', '%s')",
-                        SLOT_NAME,
+                        getUniqueSlotName(connection),
                         DECODE_PLUGIN))) {
             ps.execute();
         } catch (final PSQLException ex) {
@@ -132,4 +133,15 @@ public final class OpenGaussLogicalReplication {
             }
         }
     }
+    
+    /**
+     * Get the unique slot name by connection.
+     *
+     * @param conn the connection
+     * @return the unique name by connection
+     * @throws SQLException failed when getCatalog
+     */
+    public static String getUniqueSlotName(final Connection conn) throws 
SQLException {
+        return String.format("%s_%s", SLOT_NAME_PREFIX, conn.getCatalog());
+    }
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppTableData.java
similarity index 52%
copy from 
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
copy to 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppTableData.java
index f411f18..a01cde5 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppTableData.java
@@ -15,37 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.record;
+package org.apache.shardingsphere.scaling.opengauss.wal.decode;
 
+import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 /**
- * Column.
+ * Mppdb decoding Gson related class.
  */
-@RequiredArgsConstructor
+@Setter
 @Getter
-public final class Column {
+public final class MppTableData {
     
-    private final String name;
-    
-    /**
-     * Value are available only when the primary key column is updated.
-     */
-    private final Object oldValue;
-    
-    private final Object value;
-    
-    private final boolean updated;
-    
-    private final boolean primaryKey;
-    
-    public Column(final String name, final Object value, final boolean 
updated, final boolean primaryKey) {
-        this(name, null, value, updated, primaryKey);
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("%s=%s", name, value);
-    }
+    @SerializedName("table_name")
+    private String tableName;
+
+    @SerializedName("op_type")
+    private String opType;
+
+    @SerializedName("columns_name")
+    private String[] columnsName;
+
+    @SerializedName("columns_type")
+    private String[] columnsType;
+
+    @SerializedName("columns_val")
+    private String[] columnsVal;
+
+    @SerializedName("old_keys_name")
+    private String[] oldKeysName;
+
+    @SerializedName("old_keys_type")
+    private String[] oldKeysType;
+
+    @SerializedName("old_keys_val")
+    private String[] oldKeysVal;
 }
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPlugin.java
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPlugin.java
new file mode 100644
index 0000000..5c6a193
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPlugin.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.wal.decode;
+
+import com.google.gson.Gson;
+import lombok.AllArgsConstructor;
+import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
+import 
org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
+import 
org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseLogSequenceNumber;
+import 
org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseTimestampUtils;
+import 
org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingException;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingPlugin;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Mppdb decoding plugin in openGauss.
+ */
+@AllArgsConstructor
+public final class MppdbDecodingPlugin implements DecodingPlugin {
+    
+    private final BaseTimestampUtils timestampUtils;
+    
+    @Override
+    public AbstractWalEvent decode(final ByteBuffer data, final 
BaseLogSequenceNumber logSequenceNumber) {
+        AbstractWalEvent result;
+        char eventType = readOneChar(data);
+        if ('{' == eventType) {
+            result = readTableEvent(readMppData(data));
+        } else {
+            result = new PlaceholderEvent();
+        }
+        result.setLogSequenceNumber(logSequenceNumber);
+        return result;
+    }
+    
+    private char readOneChar(final ByteBuffer data) {
+        return (char) data.get();
+    }
+    
+    private String readMppData(final ByteBuffer data) {
+        StringBuilder mppData = new StringBuilder();
+        mppData.append('{');
+        int depth = 1;
+        while (depth != 0 && data.hasRemaining()) {
+            char next = (char) data.get();
+            mppData.append(next);
+            int optDepth = '{' == next ? 1 : ('}' == next ? -1 : 0);
+            depth += optDepth;
+        }
+        return mppData.toString();
+    }
+    
+    private AbstractRowEvent readTableEvent(final String mppData) {
+        Gson mppDataGson = new Gson();
+        MppTableData mppTableData = mppDataGson.fromJson(mppData, 
MppTableData.class);
+        AbstractRowEvent result;
+        String rowEventType = mppTableData.getOpType();
+        switch (rowEventType) {
+            case ScalingConstant.INSERT:
+                result = readWriteRowEvent(mppTableData);
+                break;
+            case ScalingConstant.UPDATE:
+                result = readUpdateRowEvent(mppTableData);
+                break;
+            case ScalingConstant.DELETE:
+                result = readDeleteRowEvent(mppTableData);
+                break;
+            default:
+                throw new ScalingTaskExecuteException("");
+        }
+        String[] tableMetaData = mppTableData.getTableName().split("\\.");
+        result.setSchemaName(tableMetaData[0]);
+        result.setTableName(tableMetaData[1]);
+        return result;
+    }
+    
+    private AbstractRowEvent readWriteRowEvent(final MppTableData data) {
+        WriteRowEvent result = new WriteRowEvent();
+        result.setAfterRow(getColumnDataFromMppDataEvent(data));
+        return result;
+    }
+    
+    private AbstractRowEvent readUpdateRowEvent(final MppTableData data) {
+        UpdateRowEvent result = new UpdateRowEvent();
+        result.setAfterRow(getColumnDataFromMppDataEvent(data));
+        return result;
+    }
+    
+    private AbstractRowEvent readDeleteRowEvent(final MppTableData data) {
+        DeleteRowEvent result = new DeleteRowEvent();
+        result.setPrimaryKeys(getDeleteColumnDataFromMppDataEvent(data));
+        return result;
+    }
+
+    private List<Object> getColumnDataFromMppDataEvent(final MppTableData 
data) {
+        List<Object> columns = new LinkedList<>();
+    
+        for (int i = 0; i < data.getColumnsType().length; i++) {
+            columns.add(readColumnData(data.getColumnsVal()[i], 
data.getColumnsType()[i]));
+        }
+        return columns;
+    }
+    
+    private List<Object> getDeleteColumnDataFromMppDataEvent(final 
MppTableData data) {
+        List<Object> columns = new LinkedList<>();
+        
+        for (int i = 0; i < data.getOldKeysType().length; i++) {
+            columns.add(readColumnData(data.getOldKeysVal()[i], 
data.getOldKeysType()[i]));
+        }
+        return columns;
+    }
+    
+    private Object readColumnData(final String data, final String columnType) {
+        if (columnType.startsWith("numeric")) {
+            return new BigDecimal(data);
+        }
+        if (columnType.startsWith("bit") || columnType.startsWith("bit 
varying")) {
+            return data;
+        }
+        switch (columnType) {
+            case "smallint":
+                return Short.parseShort(data);
+            case "integer":
+                return Integer.parseInt(data);
+            case "bigint":
+                return Long.parseLong(data);
+            case "real":
+                return Float.parseFloat(data);
+            case "double precision":
+                return Double.parseDouble(data);
+            case "boolean":
+                return Boolean.parseBoolean(data);
+            case "time without time zone":
+                try {
+                    return timestampUtils.toTime(null, data);
+                } catch (final SQLException ex) {
+                    throw new DecodingException(ex);
+                }
+            case "date":
+                return Date.valueOf(data);
+            case "timestamp without time zone":
+                try {
+                    return timestampUtils.toTimestamp(null, data);
+                } catch (final SQLException ex) {
+                    throw new DecodingException(ex);
+                }
+            case "bytea":
+                return decodeHex(data.substring(2));
+            case "character varying":
+                return decodeString(data);
+            default:
+                return data;
+        }
+    }
+    
+    private static String decodeString(final String data) {
+        if (data.length() > 1) {
+            int begin = data.charAt(0) == '\'' ? 1 : 0;
+            int end = data.length() + (data.charAt(data.length() - 1) == '\'' 
? -1 : 0);
+            return data.substring(begin, end);
+        }
+        return data;
+    }
+    
+    private byte[] decodeHex(final String hexString) {
+        int dataLength = hexString.length();
+        if (0 != (dataLength & 1)) {
+            throw new IllegalArgumentException(String.format("Illegal hex data 
%s", hexString));
+        }
+        if (0 == dataLength) {
+            return new byte[0];
+        }
+        byte[] result = new byte[dataLength >>> 1];
+        for (int i = 0; i < dataLength; i += 2) {
+            result[i >>> 1] = decodeHexByte(hexString, i);
+        }
+        return result;
+    }
+    
+    private byte decodeHexByte(final String hexString, final int index) {
+        int firstHexChar = Character.digit(hexString.charAt(index), 16);
+        int secondHexChar = Character.digit(hexString.charAt(index + 1), 16);
+        if (-1 == firstHexChar || -1 == secondHexChar) {
+            throw new IllegalArgumentException(String.format("Illegal hex byte 
'%s' in index %d", hexString, index));
+        }
+        return (byte) ((firstHexChar << 4) + secondHexChar);
+    }
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPluginTest.java
 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPluginTest.java
new file mode 100644
index 0000000..e74e053
--- /dev/null
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPluginTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.wal.decode;
+
+import com.google.gson.Gson;
+import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
+import 
org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingException;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
+import org.junit.Test;
+import org.opengauss.jdbc.TimestampUtils;
+import org.opengauss.replication.LogSequenceNumber;
+
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class MppdbDecodingPluginTest {
+
+    private final LogSequenceNumber pgSequenceNumber = 
LogSequenceNumber.valueOf("0/14EFDB8");
+
+    private final OpenGaussLogSequenceNumber logSequenceNumber = new 
OpenGaussLogSequenceNumber(pgSequenceNumber);
+    
+    @Test
+    public void assertDecodeWriteRowEvent() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"character varyint"});
+        tableData.setColumnsVal(new String[]{"1 2 3"});
+        ByteBuffer data = ByteBuffer.wrap(new 
Gson().toJson(tableData).getBytes());
+        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+        assertThat(actual.getTableName(), is("test"));
+        assertThat(actual.getAfterRow().get(0), is("1 2 3"));
+    }
+    
+    @Test
+    public void assertDecodeUpdateRowEvent() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("UPDATE");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"character varyint"});
+        tableData.setColumnsVal(new String[]{"1 2 3"});
+        ByteBuffer data = ByteBuffer.wrap(new 
Gson().toJson(tableData).getBytes());
+        UpdateRowEvent actual = (UpdateRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+        assertThat(actual.getTableName(), is("test"));
+        assertThat(actual.getAfterRow().get(0), is("1 2 3"));
+    }
+    
+    @Test
+    public void assertDecodeDeleteRowEvent() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("DELETE");
+        tableData.setOldKeysName(new String[]{"data"});
+        tableData.setOldKeysType(new String[]{"integer"});
+        tableData.setOldKeysVal(new String[]{"1"});
+        ByteBuffer data = ByteBuffer.wrap(new 
Gson().toJson(tableData).getBytes());
+        DeleteRowEvent actual = (DeleteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+        assertThat(actual.getTableName(), is("test"));
+        assertThat(actual.getPrimaryKeys().get(0), is(1));
+    }
+    
+    @Test
+    public void assertDecodeWriteRowEventWithByteA() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"bytea"});
+        tableData.setColumnsVal(new String[]{"\\xff00ab"});
+        ByteBuffer data = ByteBuffer.wrap(new 
Gson().toJson(tableData).getBytes());
+        WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+        assertThat(actual.getTableName(), is("test"));
+        assertThat(actual.getAfterRow().get(0), is(new byte[]{(byte) 0xff, 
(byte) 0, (byte) 0xab}));
+    }
+    
+    @Test
+    public void assertDecodeUnknownTableType() {
+        ByteBuffer data = ByteBuffer.wrap("unknown".getBytes());
+        AbstractWalEvent actual = new MppdbDecodingPlugin(null).decode(data, 
logSequenceNumber);
+        assertTrue(actual instanceof PlaceholderEvent);
+    }
+    
+    @Test(expected = ScalingTaskExecuteException.class)
+    public void assertDecodeUnknownRowEventType() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("UNKNOWN");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"character varying"});
+        tableData.setColumnsVal(new String[]{"1 2 3"});
+        ByteBuffer data = ByteBuffer.wrap(new 
Gson().toJson(tableData).getBytes());
+        new MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+    }
+    
+    @Test(expected = DecodingException.class)
+    @SneakyThrows(SQLException.class)
+    public void assertDecodeTime() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"time without time zone"});
+        tableData.setColumnsVal(new String[]{"1 2 3'"});
+        TimestampUtils timestampUtils = mock(TimestampUtils.class);
+        when(timestampUtils.toTime(null, "1 2 3'")).thenThrow(new 
SQLException(""));
+        ByteBuffer data = ByteBuffer.wrap(new 
Gson().toJson(tableData).getBytes());
+        new MppdbDecodingPlugin(new 
OpenGaussTimestampUtils(timestampUtils)).decode(data, logSequenceNumber);
+    }
+}

Reply via email to