This is an automated email from the ASF dual-hosted git repository.

chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c24d70c389 Fix PostgreSQL replication slot not cleaned on job deleted (#30853)
5c24d70c389 is described below

commit 5c24d70c389ede802efac7d113dd1ba276db6d9a
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Apr 12 10:51:01 2024 +0800

    Fix PostgreSQL replication slot not cleaned on job deleted (#30853)
    
    * Fix PostgreSQL replication slot not cleaned on job deleted
    
    * Add replication slots count check for PostgreSQL E2E
    
    * Improve E2E in NATIVE mode
    
    * Ignore ADDED and UPDATED event when job is disabled
---
 .../JobConfigurationChangedProcessEngine.java      |  4 ++-
 .../pipeline/cases/PipelineContainerComposer.java  | 12 +++++---
 .../cases/migration/AbstractMigrationE2EIT.java    |  2 +-
 .../general/PostgreSQLMigrationGeneralE2EIT.java   | 34 +++++++++++++++-------
 4 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
index bfd188a3d11..666efd52db2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
@@ -53,11 +53,13 @@ public final class JobConfigurationChangedProcessEngine {
             Collection<Integer> shardingItems = PipelineJobRegistry.getShardingItems(jobId);
             PipelineJobRegistry.stop(jobId);
             disableJob(jobId, shardingItems);
-            return;
         }
         switch (eventType) {
             case ADDED:
             case UPDATED:
+                if (jobConfig.isDisabled()) {
+                    break;
+                }
                 if (PipelineJobRegistry.isExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
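
For context on the hunk above: before this change, a disabled job triggered an early "return" after being stopped, so a subsequent DELETED event never reached the cleanup path and the job's PostgreSQL replication slot leaked. The fix removes the early return and instead skips only the ADDED/UPDATED cases when the job is disabled, letting DELETED fall through to cleanup. A minimal sketch of the resulting control flow, assuming hypothetical stand-in names (EventType, JobConfig, startJob, deleteJobAndCleanUp are illustrative, not the actual ShardingSphere APIs):

    void process(final EventType eventType, final JobConfig jobConfig) {
        String jobId = jobConfig.getJobId();
        if (jobConfig.isDisabled()) {
            Collection<Integer> shardingItems = PipelineJobRegistry.getShardingItems(jobId);
            PipelineJobRegistry.stop(jobId);
            disableJob(jobId, shardingItems);
            // No early return: a DELETED event must still reach the switch below.
        }
        switch (eventType) {
            case ADDED:
            case UPDATED:
                if (jobConfig.isDisabled()) {
                    break;
                }
                startJob(jobConfig);
                break;
            case DELETED:
                deleteJobAndCleanUp(jobConfig); // drops resources such as PostgreSQL replication slots
                break;
            default:
        }
    }
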
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 878338c5ee4..defd2d5bfef 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -144,6 +144,8 @@ public final class PipelineContainerComposer implements AutoCloseable {
         try (Connection connection = DriverManager.getConnection(jdbcUrl, ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
             cleanUpPipelineJobs(connection, jobType);
             cleanUpProxyDatabase(connection);
+            // Workaround: "drop database if exists sharding_db;" may fail on the first attempt, so clean up twice for now
+            cleanUpProxyDatabase(connection);
             createProxyDatabase(connection);
         }
         cleanUpDataSource();
@@ -243,7 +245,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
                 .replace("${url}", getActualJdbcUrlTemplate(storageUnitName, 
true));
         proxyExecuteWithLog(registerStorageUnitTemplate, 0);
         int timeout = databaseType instanceof OpenGaussDatabaseType ? 60 : 10;
-        Awaitility.await().ignoreExceptions().atMost(timeout, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> showStorageUnitsName().contains(storageUnitName));
+        Awaitility.await().ignoreExceptions().atMost(timeout, TimeUnit.SECONDS).pollInterval(3, TimeUnit.SECONDS).until(() -> showStorageUnitsName().contains(storageUnitName));
     }
     
     /**
@@ -263,7 +265,9 @@ public final class PipelineContainerComposer implements AutoCloseable {
      * @return storage units names
      */
     public List<String> showStorageUnitsName() {
-        return queryForListWithLog(proxyDataSource, "SHOW STORAGE UNITS").stream().map(each -> String.valueOf(each.get("name"))).collect(Collectors.toList());
+        List<String> result = queryForListWithLog(proxyDataSource, "SHOW STORAGE UNITS").stream().map(each -> String.valueOf(each.get("name"))).collect(Collectors.toList());
+        log.info("Show storage units name: {}", result);
+        return result;
+        log.info("Show storage units name: {}", result);
+        return result;
     }
     
     /**
@@ -531,9 +535,9 @@ public final class PipelineContainerComposer implements AutoCloseable {
     public void assertOrderRecordExist(final DataSource dataSource, final String tableName, final Object orderId) {
         String sql;
         if (orderId instanceof String) {
-            sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'", tableName, orderId);
+            sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s' AND user_id>0", tableName, orderId);
         } else {
-            sql = String.format("SELECT 1 FROM %s WHERE order_id = %s", tableName, orderId);
+            sql = String.format("SELECT 1 FROM %s WHERE order_id = %s AND user_id>0", tableName, orderId);
         }
         assertOrderRecordExist(dataSource, sql);
     }
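
The Awaitility changes above relax the poll interval from 1 to 3 seconds, trading detection latency for fewer "SHOW STORAGE UNITS" round trips against the proxy while a storage unit registers. For readers unfamiliar with the library, a minimal usage sketch (the unit name and timeout here are illustrative, not values mandated by this patch):

    Awaitility.await()
            .ignoreExceptions()
            .atMost(60L, TimeUnit.SECONDS)
            .pollInterval(3L, TimeUnit.SECONDS)
            .until(() -> showStorageUnitsName().contains("ds_2"));
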
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 374e4eb13c1..a67d88e71a9 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -76,7 +76,7 @@ public abstract class AbstractMigrationE2EIT {
                 .replace("${ds3}", 
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, 
true))
                 .replace("${ds4}", 
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, 
true));
         containerComposer.proxyExecuteWithLog(addTargetResource, 0);
-        Awaitility.await().ignoreExceptions().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> 3 == containerComposer.showStorageUnitsName().size());
+        Awaitility.await().ignoreExceptions().atMost(15L, TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() -> 3 == containerComposer.showStorageUnitsName().size());
     }
     
     protected void createSourceSchema(final PipelineContainerComposer containerComposer, final String schemaName) throws SQLException {
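
The method above registers the migration targets through DistSQL and then waits for all three storage units to appear; the timeout is widened from 10 to 15 seconds to accommodate the slower 3-second poll. An illustrative registration executed over JDBC against the proxy (URL, credentials, and unit name are placeholders, not values from this patch):

    try (
            Connection connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:3307/sharding_db", "root", "root");
            Statement statement = connection.createStatement()) {
        statement.execute("REGISTER STORAGE UNIT ds_2 (URL='jdbc:postgresql://127.0.0.1:5432/ds_2', USER='postgres', PASSWORD='postgres')");
    }
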
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 159c61ac54b..0b9cf90772a 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
-import org.apache.shardingsphere.infra.algorithm.core.context.AlgorithmSQLContext;
 import org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -41,13 +40,17 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.LocalDateTime;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
 
 @PipelineE2ESettings(database = {
         @PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = "env/scenario/general/postgresql.xml"),
@@ -78,7 +81,8 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             log.info("init data begin: {}", LocalDateTime.now());
             
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), 
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), 
dataPair.getLeft());
             
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), 
containerComposer.getExtraSQLCommand().getFullInsertOrderItem(), 
dataPair.getRight());
-            log.info("init data end: {}", LocalDateTime.now());
+            int replicationSlotsCount = getReplicationSlotsCount(containerComposer);
+            log.info("init data end: {}, replication slots count: {}", LocalDateTime.now(), replicationSlotsCount);
             startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, "t_order");
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
             String jobId = getJobIdByTableName(containerComposer, "ds_0.test." + SOURCE_TABLE_NAME);
@@ -90,7 +94,8 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
             DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, schemaTableName, 10000);
-            checkOrderMigration(containerComposer, jdbcDataSource, jobId);
+            checkOrderMigration(containerComposer, jobId);
+            startMigrationWithSchema(containerComposer, "t_order_item", "t_order_item");
             checkOrderItemMigration(containerComposer);
             for (String each : listJobId(containerComposer)) {
                 commitMigrationByJobId(containerComposer, each);
@@ -98,27 +103,36 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());
             containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, PipelineContainerComposer.SCHEMA_NAME);
+            assertThat("Replication slots count doesn't match; slots might not have been cleaned. Run `SELECT * FROM pg_replication_slots;` in PostgreSQL to verify",
+                    getReplicationSlotsCount(containerComposer), is(replicationSlotsCount));
         }
     }
     
-    private void checkOrderMigration(final PipelineContainerComposer containerComposer, final DataSource jdbcDataSource, final String jobId) throws SQLException {
+    private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException {
         containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(containerComposer, jobId);
-        long recordId = new SnowflakeKeyGenerateAlgorithm().generateKeys(mock(AlgorithmSQLContext.class), 1).iterator().next();
-        containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
-                String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
         startMigrationByJobId(containerComposer, jobId);
-        containerComposer.assertOrderRecordExist(jdbcDataSource, String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
     
     private void checkOrderItemMigration(final PipelineContainerComposer containerComposer) throws SQLException {
-        startMigrationWithSchema(containerComposer, "t_order_item", "t_order_item");
         String jobId = getJobIdByTableName(containerComposer, "ds_0.test.t_order_item");
         containerComposer.waitJobStatusReached(String.format("SHOW MIGRATION STATUS '%s'", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
     
+    private int getReplicationSlotsCount(final PipelineContainerComposer containerComposer) throws SQLException {
+        try (
+                Connection connection = containerComposer.getSourceDataSource().getConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery("SELECT COUNT(1) FROM pg_replication_slots")) {
+            if (!resultSet.next()) {
+                return 0;
+            }
+            return resultSet.getInt(1);
+        }
+    }
+    
     private static boolean isEnabled() {
         return PipelineE2ECondition.isEnabled(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL"), TypedSPILoader.getService(DatabaseType.class, "openGauss"));
     }
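
The new getReplicationSlotsCount check pins down the regression this commit fixes: each PostgreSQL migration job opens a logical replication slot for incremental capture, and a slot that survives job deletion pins WAL on the source database indefinitely. If the E2E assertion fails, a leaked slot can be inspected and removed by hand; a sketch over plain JDBC (connection details and the slot name are placeholders; ShardingSphere derives its own slot names internally):

    // Requires java.sql.{Connection, DriverManager, ResultSet, Statement} and the PostgreSQL JDBC driver.
    try (
            Connection connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:5432/ds_0", "postgres", "postgres");
            Statement statement = connection.createStatement()) {
        try (ResultSet resultSet = statement.executeQuery("SELECT slot_name, active FROM pg_replication_slots")) {
            while (resultSet.next()) {
                System.out.printf("slot=%s active=%s%n", resultSet.getString("slot_name"), resultSet.getBoolean("active"));
            }
        }
        // Dropping an inactive slot releases the WAL it retains.
        statement.execute("SELECT pg_drop_replication_slot('pipeline_slot_example')");
    }
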
