This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5c24d70c389 Fix PostgreSQL replication slot not cleaned on job deleted (#30853)
5c24d70c389 is described below
commit 5c24d70c389ede802efac7d113dd1ba276db6d9a
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Apr 12 10:51:01 2024 +0800
Fix PostgreSQL replication slot not cleaned on job deleted (#30853)
* Fix PostgreSQL replication slot not cleaned on job deleted
* Add replication slots count check for PostgreSQL E2E
* Improve E2E in NATIVE mode
* Ignore ADDED and UPDATED event when job is disabled
---
.../JobConfigurationChangedProcessEngine.java | 4 ++-
.../pipeline/cases/PipelineContainerComposer.java | 12 +++++---
.../cases/migration/AbstractMigrationE2EIT.java | 2 +-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 34 +++++++++++++++-------
4 files changed, 36 insertions(+), 16 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
index bfd188a3d11..666efd52db2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java
@@ -53,11 +53,13 @@ public final class JobConfigurationChangedProcessEngine {
Collection<Integer> shardingItems =
PipelineJobRegistry.getShardingItems(jobId);
PipelineJobRegistry.stop(jobId);
disableJob(jobId, shardingItems);
- return;
}
switch (eventType) {
case ADDED:
case UPDATED:
+ if (jobConfig.isDisabled()) {
+ break;
+ }
if (PipelineJobRegistry.isExisting(jobId)) {
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 878338c5ee4..defd2d5bfef 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -144,6 +144,8 @@ public final class PipelineContainerComposer implements
AutoCloseable {
try (Connection connection = DriverManager.getConnection(jdbcUrl,
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
cleanUpPipelineJobs(connection, jobType);
cleanUpProxyDatabase(connection);
+ // Compatible with "drop database if exists sharding_db;" failed for now
+ cleanUpProxyDatabase(connection);
createProxyDatabase(connection);
}
cleanUpDataSource();
@@ -243,7 +245,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
.replace("${url}", getActualJdbcUrlTemplate(storageUnitName,
true));
proxyExecuteWithLog(registerStorageUnitTemplate, 0);
int timeout = databaseType instanceof OpenGaussDatabaseType ? 60 : 10;
- Awaitility.await().ignoreExceptions().atMost(timeout,
TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() ->
showStorageUnitsName().contains(storageUnitName));
+ Awaitility.await().ignoreExceptions().atMost(timeout,
TimeUnit.SECONDS).pollInterval(3, TimeUnit.SECONDS).until(() ->
showStorageUnitsName().contains(storageUnitName));
}
/**
@@ -263,7 +265,9 @@ public final class PipelineContainerComposer implements
AutoCloseable {
* @return storage units names
*/
public List<String> showStorageUnitsName() {
- return queryForListWithLog(proxyDataSource, "SHOW STORAGE
UNITS").stream().map(each ->
String.valueOf(each.get("name"))).collect(Collectors.toList());
+ List<String> result = queryForListWithLog(proxyDataSource, "SHOW STORAGE UNITS").stream().map(each -> String.valueOf(each.get("name"))).collect(Collectors.toList());
+ log.info("Show storage units name: {}", result);
+ return result;
}
/**
@@ -531,9 +535,9 @@ public final class PipelineContainerComposer implements
AutoCloseable {
public void assertOrderRecordExist(final DataSource dataSource, final
String tableName, final Object orderId) {
String sql;
if (orderId instanceof String) {
- sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'",
tableName, orderId);
+ sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s' AND user_id>0", tableName, orderId);
} else {
- sql = String.format("SELECT 1 FROM %s WHERE order_id = %s",
tableName, orderId);
+ sql = String.format("SELECT 1 FROM %s WHERE order_id = %s AND user_id>0", tableName, orderId);
}
assertOrderRecordExist(dataSource, sql);
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 374e4eb13c1..a67d88e71a9 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -76,7 +76,7 @@ public abstract class AbstractMigrationE2EIT {
.replace("${ds3}",
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3,
true))
.replace("${ds4}",
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
true));
containerComposer.proxyExecuteWithLog(addTargetResource, 0);
- Awaitility.await().ignoreExceptions().atMost(10L,
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> 3 ==
containerComposer.showStorageUnitsName().size());
+ Awaitility.await().ignoreExceptions().atMost(15L,
TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() -> 3 ==
containerComposer.showStorageUnitsName().size());
}
protected void createSourceSchema(final PipelineContainerComposer
containerComposer, final String schemaName) throws SQLException {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 159c61ac54b..0b9cf90772a 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
-import
org.apache.shardingsphere.infra.algorithm.core.context.AlgorithmSQLContext;
import
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -41,13 +40,17 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
@PipelineE2ESettings(database = {
@PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles =
"env/scenario/general/postgresql.xml"),
@@ -78,7 +81,8 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
log.info("init data begin: {}", LocalDateTime.now());
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(),
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
dataPair.getLeft());
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(),
containerComposer.getExtraSQLCommand().getFullInsertOrderItem(),
dataPair.getRight());
- log.info("init data end: {}", LocalDateTime.now());
+ int replicationSlotsCount =
getReplicationSlotsCount(containerComposer);
+ log.info("init data end: {}, replication slots count: {}",
LocalDateTime.now(), replicationSlotsCount);
startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME,
"t_order");
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
String jobId = getJobIdByTableName(containerComposer, "ds_0.test."
+ SOURCE_TABLE_NAME);
@@ -90,7 +94,8 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
schemaTableName, 10000);
- checkOrderMigration(containerComposer, jdbcDataSource, jobId);
+ checkOrderMigration(containerComposer, jobId);
+ startMigrationWithSchema(containerComposer, "t_order_item",
"t_order_item");
checkOrderItemMigration(containerComposer);
for (String each : listJobId(containerComposer)) {
commitMigrationByJobId(containerComposer, each);
@@ -98,27 +103,36 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
List<String> lastJobIds = listJobId(containerComposer);
assertTrue(lastJobIds.isEmpty());
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1,
PipelineContainerComposer.SCHEMA_NAME);
+ assertThat("Replication slots count doesn't match, it might be not cleaned, run `SELECT * FROM pg_replication_slots;` in PostgreSQL to verify",
+ getReplicationSlotsCount(containerComposer),
is(replicationSlotsCount));
}
}
- private void checkOrderMigration(final PipelineContainerComposer
containerComposer, final DataSource jdbcDataSource, final String jobId) throws
SQLException {
+ private void checkOrderMigration(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(containerComposer, jobId);
- long recordId = new
SnowflakeKeyGenerateAlgorithm().generateKeys(mock(AlgorithmSQLContext.class),
1).iterator().next();
- containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s
(order_id,user_id,status) VALUES (%s, %s, '%s')",
- String.join(".", PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
startMigrationByJobId(containerComposer, jobId);
- containerComposer.assertOrderRecordExist(jdbcDataSource,
String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME),
recordId);
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}
private void checkOrderItemMigration(final PipelineContainerComposer
containerComposer) throws SQLException {
- startMigrationWithSchema(containerComposer, "t_order_item",
"t_order_item");
String jobId = getJobIdByTableName(containerComposer,
"ds_0.test.t_order_item");
containerComposer.waitJobStatusReached(String.format("SHOW MIGRATION
STATUS '%s'", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}
+ private int getReplicationSlotsCount(final PipelineContainerComposer
containerComposer) throws SQLException {
+ try (
+ Connection connection =
containerComposer.getSourceDataSource().getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery("SELECT COUNT(1) FROM pg_replication_slots")) {
+ if (!resultSet.next()) {
+ return 0;
+ }
+ return resultSet.getInt(1);
+ }
+ }
+
private static boolean isEnabled() {
return
PipelineE2ECondition.isEnabled(TypedSPILoader.getService(DatabaseType.class,
"PostgreSQL"), TypedSPILoader.getService(DatabaseType.class, "openGauss"));
}