This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a24e683 support openGauss mppdb_decoding plugin (#12749)
a24e683 is described below
commit a24e683d0f93f87df8569115d013af78380522fb
Author: justbk2015 <[email protected]>
AuthorDate: Mon Sep 27 18:32:00 2021 +0800
support openGauss mppdb_decoding plugin (#12749)
* avoid parallel create table throw already exists except to continue
scaling and add mppdb decoding.
* move MppTableData to dividual java file and modify javadoc
Co-authored-by: justbk2015 <[email protected]>
---
.../scaling/core/common/record/Column.java | 14 +-
.../sqlbuilder/AbstractScalingSQLBuilder.java | 7 +-
.../core/common/sqlbuilder/ScalingSQLBuilder.java | 14 ++
.../core/executor/importer/AbstractImporter.java | 2 +-
.../job/preparer/AbstractDataSourcePreparer.java | 9 +-
.../executor/importer/AbstractImporterTest.java | 3 +
.../component/OpenGaussScalingSQLBuilder.java | 21 +-
.../opengauss/component/OpenGaussWalDumper.java | 35 +++-
.../opengauss/wal/OpenGaussLogicalReplication.java | 24 ++-
.../opengauss/wal/decode/MppTableData.java} | 55 +++---
.../opengauss/wal/decode/MppdbDecodingPlugin.java | 215 +++++++++++++++++++++
.../wal/decode/MppdbDecodingPluginTest.java | 141 ++++++++++++++
12 files changed, 488 insertions(+), 52 deletions(-)
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
index f411f18..cc84fd8 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
@@ -18,12 +18,11 @@
package org.apache.shardingsphere.scaling.core.common.record;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
/**
* Column.
*/
-@RequiredArgsConstructor
@Getter
public final class Column {
@@ -32,7 +31,8 @@ public final class Column {
/**
* Value are available only when the primary key column is updated.
*/
- private final Object oldValue;
+ @Setter
+ private Object oldValue;
private final Object value;
@@ -44,6 +44,14 @@ public final class Column {
this(name, null, value, updated, primaryKey);
}
+ public Column(final String name, final Object oldValue, final Object
value, final boolean updated, final boolean primaryKey) {
+ this.name = name;
+ this.oldValue = oldValue;
+ this.value = value;
+ this.updated = updated;
+ this.primaryKey = primaryKey;
+ }
+
@Override
public String toString() {
return String.format("%s=%s", name, value);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/AbstractScalingSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/AbstractScalingSQLBuilder.java
index b163e9c..130d274 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/AbstractScalingSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/AbstractScalingSQLBuilder.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.core.common.sqlbuilder;
-import com.google.common.collect.Collections2;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -100,7 +99,7 @@ public abstract class AbstractScalingSQLBuilder implements
ScalingSQLBuilder {
sqlCacheMap.put(sqlCacheKey,
buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
}
StringBuilder updatedColumnString = new StringBuilder();
- for (Column each : extractUpdatedColumns(dataRecord.getColumns())) {
+ for (Column each : extractUpdatedColumns(dataRecord.getColumns(),
dataRecord)) {
updatedColumnString.append(String.format("%s = ?,",
quote(each.getName())));
}
updatedColumnString.setLength(updatedColumnString.length() - 1);
@@ -111,10 +110,6 @@ public abstract class AbstractScalingSQLBuilder implements
ScalingSQLBuilder {
return String.format("UPDATE %s SET %%s WHERE %s", quote(tableName),
buildWhereSQL(conditionColumns));
}
- private Collection<Column> extractUpdatedColumns(final Collection<Column>
columns) {
- return Collections2.filter(columns, Column::isUpdated);
- }
-
@Override
public String buildDeleteSQL(final DataRecord dataRecord, final
Collection<Column> conditionColumns) {
String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/ScalingSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/ScalingSQLBuilder.java
index 88198f7..ad2bda5 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/ScalingSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/sqlbuilder/ScalingSQLBuilder.java
@@ -19,8 +19,11 @@ package
org.apache.shardingsphere.scaling.core.common.sqlbuilder;
import org.apache.shardingsphere.scaling.core.common.record.Column;
import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.common.record.RecordUtil;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
/**
* Scaling SQL builder.
@@ -45,6 +48,17 @@ public interface ScalingSQLBuilder {
String buildUpdateSQL(DataRecord dataRecord, Collection<Column>
conditionColumns);
/**
+ * Extract need updated columns.
+ *
+ * @param columns the input columns
+ * @param record the input datarecord
+ * @return the filtered columns.
+ */
+ default List<Column> extractUpdatedColumns(Collection<Column> columns,
DataRecord record) {
+ return new ArrayList<>(RecordUtil.extractUpdatedColumns(record));
+ }
+
+ /**
* Build delete SQL.
*
* @param dataRecord data record
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporter.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporter.java
index cc7627c..da1fe53 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporter.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporter.java
@@ -182,7 +182,7 @@ public abstract class AbstractImporter extends
AbstractScalingExecutor implement
private void executeUpdate(final Connection connection, final DataRecord
record) throws SQLException {
List<Column> conditionColumns =
RecordUtil.extractConditionColumns(record,
importerConfig.getShardingColumnsMap().get(record.getTableName()));
- List<Column> updatedColumns = RecordUtil.extractUpdatedColumns(record);
+ List<Column> updatedColumns =
scalingSqlBuilder.extractUpdatedColumns(record.getColumns(), record);
String updateSql = scalingSqlBuilder.buildUpdateSQL(record,
conditionColumns);
try (PreparedStatement ps = connection.prepareStatement(updateSql)) {
for (int i = 0; i < updatedColumns.size(); i++) {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
index c00e3f0..5c5d8de 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/AbstractDataSourcePreparer.java
@@ -60,6 +60,8 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
private static final Pattern PATTERN_ALTER_TABLE =
Pattern.compile("ALTER\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
+ private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple
primary keys for table", "already exists"};
+
private final DataSourceFactory dataSourceFactory = new
DataSourceFactory();
protected DataSourceWrapper getSourceDataSource(final JobConfiguration
jobConfig) {
@@ -113,9 +115,12 @@ public abstract class AbstractDataSourcePreparer
implements DataSourcePreparer {
try (Statement statement = targetConnection.createStatement()) {
statement.execute(sql);
} catch (final SQLException ex) {
- if (!ex.getMessage().contains("multiple primary keys for table")) {
- throw ex;
+ for (String ignoreMessage: IGNORE_EXCEPTION_MESSAGE) {
+ if (ex.getMessage().contains(ignoreMessage)) {
+ return;
+ }
}
+ throw ex;
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
index 9d0d43e..2996e61 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/executor/importer/AbstractImporterTest.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
@@ -130,6 +131,7 @@ public final class AbstractImporterTest {
when(scalingSqlBuilder.buildUpdateSQL(updateRecord,
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(),
anyInt())).thenReturn(mockRecords(updateRecord));
+ when(scalingSqlBuilder.extractUpdatedColumns(any(),
any())).thenReturn(RecordUtil.extractUpdatedColumns(updateRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 10);
verify(preparedStatement).setObject(2, "UPDATE");
@@ -144,6 +146,7 @@ public final class AbstractImporterTest {
when(scalingSqlBuilder.buildUpdateSQL(updateRecord,
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(),
anyInt())).thenReturn(mockRecords(updateRecord));
+ when(scalingSqlBuilder.extractUpdatedColumns(any(),
any())).thenReturn(RecordUtil.extractUpdatedColumns(updateRecord));
jdbcImporter.run();
InOrder inOrder = inOrder(preparedStatement);
inOrder.verify(preparedStatement).setObject(1, 2);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
index 23086a2..e0eeda8 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussScalingSQLBuilder.java
@@ -17,9 +17,14 @@
package org.apache.shardingsphere.scaling.opengauss.component;
+import com.google.common.collect.Collections2;
+import org.apache.shardingsphere.scaling.core.common.record.Column;
import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
import
org.apache.shardingsphere.scaling.core.common.sqlbuilder.AbstractScalingSQLBuilder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,12 +39,12 @@ public final class OpenGaussScalingSQLBuilder extends
AbstractScalingSQLBuilder
@Override
public String getLeftIdentifierQuoteString() {
- return "\"";
+ return "";
}
@Override
public String getRightIdentifierQuoteString() {
- return "\"";
+ return "";
}
@Override
@@ -47,6 +52,18 @@ public final class OpenGaussScalingSQLBuilder extends
AbstractScalingSQLBuilder
return super.buildInsertSQL(dataRecord) + buildConflictSQL();
}
+ @Override
+ public List<Column> extractUpdatedColumns(final Collection<Column>
columns, final DataRecord record) {
+ return new ArrayList(Collections2.filter(columns, column ->
!(column.isPrimaryKey()
+ || isShardingColumn(getShardingColumnsMap(),
record.getTableName(), column.getName()))));
+ }
+
+ private boolean isShardingColumn(final Map<String, Set<String>>
shardingColumnsMap,
+ final String tableName, final String
columnName) {
+ return shardingColumnsMap.containsKey(tableName)
+ && shardingColumnsMap.get(tableName).contains(columnName);
+ }
+
private String buildConflictSQL() {
// there need return ON DUPLICATE KEY UPDATE NOTHING after support
this syntax.
return "";
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
index 89d77b8..8ad7003 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.java
@@ -20,7 +20,10 @@ package
org.apache.shardingsphere.scaling.opengauss.component;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.common.channel.Channel;
+import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
import
org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
+import org.apache.shardingsphere.scaling.core.common.record.Column;
+import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
import org.apache.shardingsphere.scaling.core.common.record.Record;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import
org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
@@ -29,11 +32,12 @@ import
org.apache.shardingsphere.scaling.core.executor.dumper.IncrementalDumper;
import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
import
org.apache.shardingsphere.scaling.opengauss.wal.OpenGaussLogicalReplication;
+import
org.apache.shardingsphere.scaling.opengauss.wal.decode.MppdbDecodingPlugin;
import
org.apache.shardingsphere.scaling.opengauss.wal.decode.OpenGaussTimestampUtils;
import
org.apache.shardingsphere.scaling.opengauss.wal.decode.OpenGaussLogSequenceNumber;
import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
-import
org.apache.shardingsphere.scaling.postgresql.wal.decode.TestDecodingPlugin;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingPlugin;
import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
import org.opengauss.jdbc.PgConnection;
@@ -57,6 +61,8 @@ public final class OpenGaussWalDumper extends
AbstractScalingExecutor implements
private final OpenGaussLogicalReplication logicalReplication = new
OpenGaussLogicalReplication();
private final WalEventConverter walEventConverter;
+
+ private String slotName = OpenGaussLogicalReplication.SLOT_NAME_PREFIX;
@Setter
private Channel channel;
@@ -82,14 +88,15 @@ public final class OpenGaussWalDumper extends
AbstractScalingExecutor implements
.unwrap(PgConnection.class);
}
- private TestDecodingPlugin initReplication() {
- TestDecodingPlugin plugin = null;
+ private MppdbDecodingPlugin initReplication() {
+ MppdbDecodingPlugin plugin = null;
try {
DataSource dataSource =
dumperConfig.getDataSourceConfig().toDataSource();
try (Connection conn = dataSource.getConnection()) {
+ slotName = OpenGaussLogicalReplication.getUniqueSlotName(conn);
OpenGaussLogicalReplication.createIfNotExists(conn);
OpenGaussTimestampUtils utils = new
OpenGaussTimestampUtils(conn.unwrap(PgConnection.class).getTimestampUtils());
- plugin = new TestDecodingPlugin(utils);
+ plugin = new MppdbDecodingPlugin(utils);
}
} catch (SQLException sqlExp) {
log.warn("create replication slot failed!");
@@ -98,9 +105,9 @@ public final class OpenGaussWalDumper extends
AbstractScalingExecutor implements
}
private void dump() {
- TestDecodingPlugin decodingPlugin = initReplication();
+ DecodingPlugin decodingPlugin = initReplication();
try (PgConnection pgConnection = getReplicationConn()) {
- PGReplicationStream stream =
logicalReplication.createReplicationStream(pgConnection,
walPosition.getLogSequenceNumber());
+ PGReplicationStream stream =
logicalReplication.createReplicationStream(pgConnection,
walPosition.getLogSequenceNumber(), slotName);
while (isRunning()) {
ByteBuffer message = stream.readPending();
if (null == message) {
@@ -113,6 +120,7 @@ public final class OpenGaussWalDumper extends
AbstractScalingExecutor implements
if (!(event instanceof PlaceholderEvent) &&
log.isDebugEnabled()) {
log.debug("dump, event={}, record={}", event, record);
}
+ updateRecordOldValue(record);
pushRecord(record);
}
} catch (final SQLException ex) {
@@ -123,6 +131,21 @@ public final class OpenGaussWalDumper extends
AbstractScalingExecutor implements
}
}
+ private void updateRecordOldValue(final Record record) {
+ if (!(record instanceof DataRecord)) {
+ return;
+ }
+ DataRecord dataRecord = (DataRecord) record;
+ if (!ScalingConstant.UPDATE.equals(dataRecord.getType())) {
+ return;
+ }
+ for (Column col: dataRecord.getColumns()) {
+ if (col.isPrimaryKey() && col.isUpdated()) {
+ col.setOldValue(col.getValue());
+ }
+ }
+ }
+
private void pushRecord(final Record record) {
try {
channel.pushRecord(record);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
index 74e4d55..35a066c 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
@@ -38,7 +38,7 @@ import java.util.Properties;
*/
public final class OpenGaussLogicalReplication {
- public static final String SLOT_NAME = "sharding_scaling";
+ public static final String SLOT_NAME_PREFIX = "sharding_scaling";
public static final String DECODE_PLUGIN = "mppdb_decoding";
@@ -70,14 +70,15 @@ public final class OpenGaussLogicalReplication {
*
* @param pgConnection OpenGauss connection
* @param startPosition start position
+ * @param slotName the setted slotName
* @return replication stream
* @throws SQLException sql exception
*/
- public PGReplicationStream createReplicationStream(final PgConnection
pgConnection, final BaseLogSequenceNumber startPosition) throws SQLException {
+ public PGReplicationStream createReplicationStream(final PgConnection
pgConnection, final BaseLogSequenceNumber startPosition, final String slotName)
throws SQLException {
return pgConnection.getReplicationAPI()
.replicationStream()
.logical()
- .withSlotName(SLOT_NAME)
+ .withSlotName(slotName)
.withSlotOption("include-xids", true)
.withSlotOption("skip-empty-xacts", true)
.withStartPosition((LogSequenceNumber) startPosition.get())
@@ -104,7 +105,7 @@ public final class OpenGaussLogicalReplication {
* @throws SQLException drop sql with error
*/
public static void dropSlot(final Connection conn) throws SQLException {
- String sql = String.format("select * from
pg_drop_replication_slot('%s')", SLOT_NAME);
+ String sql = String.format("select * from
pg_drop_replication_slot('%s')", getUniqueSlotName(conn));
try (CallableStatement cs = conn.prepareCall(sql)) {
cs.execute();
}
@@ -113,7 +114,7 @@ public final class OpenGaussLogicalReplication {
private static boolean isSlotNameExist(final Connection conn) throws
SQLException {
String sql = "select * from pg_replication_slots where slot_name=?";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
- ps.setString(1, SLOT_NAME);
+ ps.setString(1, getUniqueSlotName(conn));
try (ResultSet rs = ps.executeQuery()) {
return rs.next();
}
@@ -123,7 +124,7 @@ public final class OpenGaussLogicalReplication {
private static void createSlotBySql(final Connection connection) throws
SQLException {
try (PreparedStatement ps = connection.prepareStatement(
String.format("SELECT * FROM
pg_create_logical_replication_slot('%s', '%s')",
- SLOT_NAME,
+ getUniqueSlotName(connection),
DECODE_PLUGIN))) {
ps.execute();
} catch (final PSQLException ex) {
@@ -132,4 +133,15 @@ public final class OpenGaussLogicalReplication {
}
}
}
+
+ /**
+ * Get the unique slot name by connection.
+ *
+ * @param conn the connection
+ * @return the unique name by connection
+ * @throws SQLException failed when getCatalog
+ */
+ public static String getUniqueSlotName(final Connection conn) throws
SQLException {
+ return String.format("%s_%s", SLOT_NAME_PREFIX, conn.getCatalog());
+ }
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppTableData.java
similarity index 52%
copy from
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
copy to
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppTableData.java
index f411f18..a01cde5 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/record/Column.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppTableData.java
@@ -15,37 +15,40 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.record;
+package org.apache.shardingsphere.scaling.opengauss.wal.decode;
+import com.google.gson.annotations.SerializedName;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
/**
- * Column.
+ * Mppdb decoding Gson related class.
*/
-@RequiredArgsConstructor
+@Setter
@Getter
-public final class Column {
+public final class MppTableData {
- private final String name;
-
- /**
- * Value are available only when the primary key column is updated.
- */
- private final Object oldValue;
-
- private final Object value;
-
- private final boolean updated;
-
- private final boolean primaryKey;
-
- public Column(final String name, final Object value, final boolean
updated, final boolean primaryKey) {
- this(name, null, value, updated, primaryKey);
- }
-
- @Override
- public String toString() {
- return String.format("%s=%s", name, value);
- }
+ @SerializedName("table_name")
+ private String tableName;
+
+ @SerializedName("op_type")
+ private String opType;
+
+ @SerializedName("columns_name")
+ private String[] columnsName;
+
+ @SerializedName("columns_type")
+ private String[] columnsType;
+
+ @SerializedName("columns_val")
+ private String[] columnsVal;
+
+ @SerializedName("old_keys_name")
+ private String[] oldKeysName;
+
+ @SerializedName("old_keys_type")
+ private String[] oldKeysType;
+
+ @SerializedName("old_keys_val")
+ private String[] oldKeysVal;
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPlugin.java
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPlugin.java
new file mode 100644
index 0000000..5c6a193
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPlugin.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.wal.decode;
+
+import com.google.gson.Gson;
+import lombok.AllArgsConstructor;
+import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
+import
org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
+import
org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseLogSequenceNumber;
+import
org.apache.shardingsphere.scaling.postgresql.wal.decode.BaseTimestampUtils;
+import
org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingException;
+import org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingPlugin;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Mppdb decoding plugin in openGauss.
+ */
+@AllArgsConstructor
+public final class MppdbDecodingPlugin implements DecodingPlugin {
+
+ private final BaseTimestampUtils timestampUtils;
+
+ @Override
+ public AbstractWalEvent decode(final ByteBuffer data, final
BaseLogSequenceNumber logSequenceNumber) {
+ AbstractWalEvent result;
+ char eventType = readOneChar(data);
+ if ('{' == eventType) {
+ result = readTableEvent(readMppData(data));
+ } else {
+ result = new PlaceholderEvent();
+ }
+ result.setLogSequenceNumber(logSequenceNumber);
+ return result;
+ }
+
+ private char readOneChar(final ByteBuffer data) {
+ return (char) data.get();
+ }
+
+ private String readMppData(final ByteBuffer data) {
+ StringBuilder mppData = new StringBuilder();
+ mppData.append('{');
+ int depth = 1;
+ while (depth != 0 && data.hasRemaining()) {
+ char next = (char) data.get();
+ mppData.append(next);
+ int optDepth = '{' == next ? 1 : ('}' == next ? -1 : 0);
+ depth += optDepth;
+ }
+ return mppData.toString();
+ }
+
+ private AbstractRowEvent readTableEvent(final String mppData) {
+ Gson mppDataGson = new Gson();
+ MppTableData mppTableData = mppDataGson.fromJson(mppData,
MppTableData.class);
+ AbstractRowEvent result;
+ String rowEventType = mppTableData.getOpType();
+ switch (rowEventType) {
+ case ScalingConstant.INSERT:
+ result = readWriteRowEvent(mppTableData);
+ break;
+ case ScalingConstant.UPDATE:
+ result = readUpdateRowEvent(mppTableData);
+ break;
+ case ScalingConstant.DELETE:
+ result = readDeleteRowEvent(mppTableData);
+ break;
+ default:
+ throw new ScalingTaskExecuteException("");
+ }
+ String[] tableMetaData = mppTableData.getTableName().split("\\.");
+ result.setSchemaName(tableMetaData[0]);
+ result.setTableName(tableMetaData[1]);
+ return result;
+ }
+
+ private AbstractRowEvent readWriteRowEvent(final MppTableData data) {
+ WriteRowEvent result = new WriteRowEvent();
+ result.setAfterRow(getColumnDataFromMppDataEvent(data));
+ return result;
+ }
+
+ private AbstractRowEvent readUpdateRowEvent(final MppTableData data) {
+ UpdateRowEvent result = new UpdateRowEvent();
+ result.setAfterRow(getColumnDataFromMppDataEvent(data));
+ return result;
+ }
+
+ private AbstractRowEvent readDeleteRowEvent(final MppTableData data) {
+ DeleteRowEvent result = new DeleteRowEvent();
+ result.setPrimaryKeys(getDeleteColumnDataFromMppDataEvent(data));
+ return result;
+ }
+
+ private List<Object> getColumnDataFromMppDataEvent(final MppTableData
data) {
+ List<Object> columns = new LinkedList<>();
+
+ for (int i = 0; i < data.getColumnsType().length; i++) {
+ columns.add(readColumnData(data.getColumnsVal()[i],
data.getColumnsType()[i]));
+ }
+ return columns;
+ }
+
+ private List<Object> getDeleteColumnDataFromMppDataEvent(final
MppTableData data) {
+ List<Object> columns = new LinkedList<>();
+
+ for (int i = 0; i < data.getOldKeysType().length; i++) {
+ columns.add(readColumnData(data.getOldKeysVal()[i],
data.getOldKeysType()[i]));
+ }
+ return columns;
+ }
+
+ private Object readColumnData(final String data, final String columnType) {
+ if (columnType.startsWith("numeric")) {
+ return new BigDecimal(data);
+ }
+ if (columnType.startsWith("bit") || columnType.startsWith("bit
varying")) {
+ return data;
+ }
+ switch (columnType) {
+ case "smallint":
+ return Short.parseShort(data);
+ case "integer":
+ return Integer.parseInt(data);
+ case "bigint":
+ return Long.parseLong(data);
+ case "real":
+ return Float.parseFloat(data);
+ case "double precision":
+ return Double.parseDouble(data);
+ case "boolean":
+ return Boolean.parseBoolean(data);
+ case "time without time zone":
+ try {
+ return timestampUtils.toTime(null, data);
+ } catch (final SQLException ex) {
+ throw new DecodingException(ex);
+ }
+ case "date":
+ return Date.valueOf(data);
+ case "timestamp without time zone":
+ try {
+ return timestampUtils.toTimestamp(null, data);
+ } catch (final SQLException ex) {
+ throw new DecodingException(ex);
+ }
+ case "bytea":
+ return decodeHex(data.substring(2));
+ case "character varying":
+ return decodeString(data);
+ default:
+ return data;
+ }
+ }
+
+ private static String decodeString(final String data) {
+ if (data.length() > 1) {
+ int begin = data.charAt(0) == '\'' ? 1 : 0;
+ int end = data.length() + (data.charAt(data.length() - 1) == '\''
? -1 : 0);
+ return data.substring(begin, end);
+ }
+ return data;
+ }
+
+ private byte[] decodeHex(final String hexString) {
+ int dataLength = hexString.length();
+ if (0 != (dataLength & 1)) {
+ throw new IllegalArgumentException(String.format("Illegal hex data
%s", hexString));
+ }
+ if (0 == dataLength) {
+ return new byte[0];
+ }
+ byte[] result = new byte[dataLength >>> 1];
+ for (int i = 0; i < dataLength; i += 2) {
+ result[i >>> 1] = decodeHexByte(hexString, i);
+ }
+ return result;
+ }
+
+ private byte decodeHexByte(final String hexString, final int index) {
+ int firstHexChar = Character.digit(hexString.charAt(index), 16);
+ int secondHexChar = Character.digit(hexString.charAt(index + 1), 16);
+ if (-1 == firstHexChar || -1 == secondHexChar) {
+ throw new IllegalArgumentException(String.format("Illegal hex byte
'%s' in index %d", hexString, index));
+ }
+ return (byte) ((firstHexChar << 4) + secondHexChar);
+ }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPluginTest.java
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPluginTest.java
new file mode 100644
index 0000000..e74e053
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/test/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/MppdbDecodingPluginTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.wal.decode;
+
+import com.google.gson.Gson;
+import lombok.SneakyThrows;
+import
org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
+import
org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingException;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
+import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
+import org.junit.Test;
+import org.opengauss.jdbc.TimestampUtils;
+import org.opengauss.replication.LogSequenceNumber;
+
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class MppdbDecodingPluginTest {
+
+ private final LogSequenceNumber pgSequenceNumber =
LogSequenceNumber.valueOf("0/14EFDB8");
+
+ private final OpenGaussLogSequenceNumber logSequenceNumber = new
OpenGaussLogSequenceNumber(pgSequenceNumber);
+
+ @Test
+ public void assertDecodeWriteRowEvent() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"character varyint"});
+ tableData.setColumnsVal(new String[]{"1 2 3"});
+ ByteBuffer data = ByteBuffer.wrap(new
Gson().toJson(tableData).getBytes());
+ WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+ assertThat(actual.getTableName(), is("test"));
+ assertThat(actual.getAfterRow().get(0), is("1 2 3"));
+ }
+
+ @Test
+ public void assertDecodeUpdateRowEvent() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("UPDATE");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"character varyint"});
+ tableData.setColumnsVal(new String[]{"1 2 3"});
+ ByteBuffer data = ByteBuffer.wrap(new
Gson().toJson(tableData).getBytes());
+ UpdateRowEvent actual = (UpdateRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+ assertThat(actual.getTableName(), is("test"));
+ assertThat(actual.getAfterRow().get(0), is("1 2 3"));
+ }
+
+ @Test
+ public void assertDecodeDeleteRowEvent() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("DELETE");
+ tableData.setOldKeysName(new String[]{"data"});
+ tableData.setOldKeysType(new String[]{"integer"});
+ tableData.setOldKeysVal(new String[]{"1"});
+ ByteBuffer data = ByteBuffer.wrap(new
Gson().toJson(tableData).getBytes());
+ DeleteRowEvent actual = (DeleteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+ assertThat(actual.getTableName(), is("test"));
+ assertThat(actual.getPrimaryKeys().get(0), is(1));
+ }
+
+ @Test
+ public void assertDecodeWriteRowEventWithByteA() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"bytea"});
+ tableData.setColumnsVal(new String[]{"\\xff00ab"});
+ ByteBuffer data = ByteBuffer.wrap(new
Gson().toJson(tableData).getBytes());
+ WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
+ assertThat(actual.getTableName(), is("test"));
+ assertThat(actual.getAfterRow().get(0), is(new byte[]{(byte) 0xff,
(byte) 0, (byte) 0xab}));
+ }
+
+ @Test
+ public void assertDecodeUnknownTableType() {
+ ByteBuffer data = ByteBuffer.wrap("unknown".getBytes());
+ AbstractWalEvent actual = new MppdbDecodingPlugin(null).decode(data,
logSequenceNumber);
+ assertTrue(actual instanceof PlaceholderEvent);
+ }
+
+ @Test(expected = ScalingTaskExecuteException.class)
+ public void assertDecodeUnknownRowEventType() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("UNKNOWN");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"character varying"});
+ tableData.setColumnsVal(new String[]{"1 2 3"});
+ ByteBuffer data = ByteBuffer.wrap(new
Gson().toJson(tableData).getBytes());
+ new MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ }
+
+ @Test(expected = DecodingException.class)
+ @SneakyThrows(SQLException.class)
+ public void assertDecodeTime() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"time without time zone"});
+ tableData.setColumnsVal(new String[]{"1 2 3'"});
+ TimestampUtils timestampUtils = mock(TimestampUtils.class);
+ when(timestampUtils.toTime(null, "1 2 3'")).thenThrow(new
SQLException(""));
+ ByteBuffer data = ByteBuffer.wrap(new
Gson().toJson(tableData).getBytes());
+ new MppdbDecodingPlugin(new
OpenGaussTimestampUtils(timestampUtils)).decode(data, logSequenceNumber);
+ }
+}