This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 19322c196f9 Refactor PostgreSQLIngestPositionManager (#32499)
19322c196f9 is described below
commit 19322c196f91b1733f4186de67e04d96ba1c455b
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Aug 14 11:41:38 2024 +0800
Refactor PostgreSQLIngestPositionManager (#32499)
* Refactor OpenGaussIngestPositionManager
* Refactor PostgreSQLIngestPositionManager
* Refactor PostgreSQLIngestPositionManager
* Refactor PostgreSQLIngestPositionManager
---
.../ingest/OpenGaussIngestPositionManager.java | 8 ++--
.../ingest/PostgreSQLIngestPositionManager.java | 54 ++++++++++++----------
.../PostgreSQLIngestPositionManagerTest.java | 2 +-
3 files changed, 34 insertions(+), 30 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
index 8dc1282d05a..1163e8ccc5b 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
@@ -52,13 +52,12 @@ public final class OpenGaussIngestPositionManager
implements DialectIngestPositi
@Override
public WALPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- createSlotIfNotExist(connection, slotNameSuffix);
+ createSlotIfNotExist(connection,
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
return getWALPosition(connection);
}
}
- private void createSlotIfNotExist(final Connection connection, final
String slotNameSuffix) throws SQLException {
- String slotName =
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
+ private void createSlotIfNotExist(final Connection connection, final
String slotName) throws SQLException {
Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection,
slotName);
if (!slotInfo.isPresent()) {
createSlot(connection, slotName);
@@ -95,10 +94,9 @@ public final class OpenGaussIngestPositionManager implements
DialectIngestPositi
private void dropSlotIfExist(final Connection connection, final String
slotName) throws SQLException {
if (!getSlotInfo(connection, slotName).isPresent()) {
- log.info("dropSlotIfExist, slot not exist, ignore, slotName={}",
slotName);
return;
}
- try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT * from pg_drop_replication_slot(?)")) {
+ try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
preparedStatement.setString(1, slotName);
preparedStatement.execute();
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
index e3d23caea2d..82e4c507719 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.postgresql.replication.LogSequenceNumber;
@@ -29,6 +30,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Optional;
/**
* Ingest position manager for PostgreSQL.
@@ -54,10 +56,29 @@ public final class PostgreSQLIngestPositionManager
implements DialectIngestPosit
}
private void createSlotIfNotExist(final Connection connection, final
String slotName) throws SQLException {
- if (isSlotExisting(connection, slotName)) {
- log.info("createSlotIfNotExist, slot exist, slotName={}",
slotName);
+ Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection,
slotName);
+ if (!slotInfo.isPresent()) {
+ createSlot(connection, slotName);
return;
}
+ if (null == slotInfo.get().getDatabaseName()) {
+ dropSlotIfExist(connection, slotName);
+ createSlot(connection, slotName);
+ }
+ }
+
+ private Optional<ReplicationSlotInfo> getSlotInfo(final Connection
connection, final String slotName) throws SQLException {
+ String sql = "SELECT slot_name, database FROM pg_replication_slots
WHERE slot_name=? AND plugin=?";
+ try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ preparedStatement.setString(1, slotName);
+ preparedStatement.setString(2, DECODE_PLUGIN);
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ return resultSet.next() ? Optional.of(new
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) :
Optional.empty();
+ }
+ }
+ }
+
+ private void createSlot(final Connection connection, final String
slotName) throws SQLException {
try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT * FROM
pg_create_logical_replication_slot(?, ?)")) {
preparedStatement.setString(1, slotName);
preparedStatement.setString(2, DECODE_PLUGIN);
@@ -69,14 +90,13 @@ public final class PostgreSQLIngestPositionManager
implements DialectIngestPosit
}
}
- private boolean isSlotExisting(final Connection connection, final String
slotName) throws SQLException {
- String checkSlotSQL = "SELECT slot_name FROM pg_replication_slots
WHERE slot_name=? AND plugin=?";
- try (PreparedStatement preparedStatement =
connection.prepareStatement(checkSlotSQL)) {
+ private void dropSlotIfExist(final Connection connection, final String
slotName) throws SQLException {
+ if (!getSlotInfo(connection, slotName).isPresent()) {
+ return;
+ }
+ try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
preparedStatement.setString(1, slotName);
- preparedStatement.setString(2, DECODE_PLUGIN);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- return resultSet.next();
- }
+ preparedStatement.execute();
}
}
@@ -102,21 +122,7 @@ public final class PostgreSQLIngestPositionManager
implements DialectIngestPosit
@Override
public void destroy(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- dropSlotIfExist(connection, slotNameSuffix);
- }
- }
-
- private void dropSlotIfExist(final Connection connection, final String
slotNameSuffix) throws SQLException {
- String slotName =
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
- if (!isSlotExisting(connection, slotName)) {
- log.info("dropSlotIfExist, slot not exist, slotName={}", slotName);
- return;
- }
- log.info("dropSlotIfExist, slot exist, slotName={}", slotName);
- String dropSlotSQL = "SELECT pg_drop_replication_slot(?)";
- try (PreparedStatement preparedStatement =
connection.prepareStatement(dropSlotSQL)) {
- preparedStatement.setString(1, slotName);
- preparedStatement.execute();
+ dropSlotIfExist(connection,
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
}
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
index 045aa5b3025..2c539c2a99a 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
@@ -119,7 +119,7 @@ class PostgreSQLIngestPositionManagerTest {
@SneakyThrows(SQLException.class)
private void mockSlotExistsOrNot(final boolean exists) {
PreparedStatement preparedStatement = mock(PreparedStatement.class);
- when(connection.prepareStatement("SELECT slot_name FROM
pg_replication_slots WHERE slot_name=? AND
plugin=?")).thenReturn(preparedStatement);
+ when(connection.prepareStatement("SELECT slot_name, database FROM
pg_replication_slots WHERE slot_name=? AND
plugin=?")).thenReturn(preparedStatement);
ResultSet resultSet = mock(ResultSet.class);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
when(resultSet.next()).thenReturn(exists);