This is an automated email from the ASF dual-hosted git repository. zhonghongsheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 54d7e9c90b9 Improve Postgres generate slot name at scaling (#19864) 54d7e9c90b9 is described below commit 54d7e9c90b91fa404c3dc88fbb8c0e7e89c2b62e Author: Xinze Guo <101622833+aze...@users.noreply.github.com> AuthorDate: Thu Aug 4 18:24:22 2022 +0800 Improve Postgres generate slot name at scaling (#19864) --- .../ingest/OpenGaussPositionInitializer.java | 5 +++- .../ingest/PostgreSQLPositionInitializer.java | 5 +++- .../postgresql/ingest/PostgreSQLWalDumperTest.java | 1 + .../data/pipeline/cases/base/BaseITCase.java | 27 ++++++++-------------- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java index e30e6d844ce..c05d33c4e3c 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition; import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer; @@ -127,7 +128,9 @@ public final class OpenGaussPositionInitializer implements PositionInitializer { * @throws SQLException failed when getCatalog */ public static String getUniqueSlotName(final Connection connection) throws SQLException { - return String.format("%s_%s", SLOT_NAME_PREFIX, connection.getCatalog()); + // same as PostgreSQL, but length over 64 will throw an exception directly + String slotName = DigestUtils.md5Hex(connection.getCatalog()); + return String.format("%s_%s", SLOT_NAME_PREFIX, slotName); } @Override diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java index 34275e4aef4..178f4e5c3c4 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber; import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer; @@ -128,7 +129,9 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer * @throws SQLException failed when getCatalog */ public static String getUniqueSlotName(final Connection connection) throws SQLException { - return String.format("%s_%s", SLOT_NAME_PREFIX, connection.getCatalog()); + // PostgreSQL slot name maximum length can't exceed 64,automatic truncation when the length exceeds the limit + String slotName = DigestUtils.md5Hex(connection.getCatalog()); + return String.format("%s_%s", SLOT_NAME_PREFIX, slotName); } @Override diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java index 1c18128a0a6..8c0912d1874 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java @@ -115,6 +115,7 @@ public final class PostgreSQLWalDumperTest { ReflectionUtil.setFieldValue(walDumper, "logicalReplication", logicalReplication); when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection); when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection); + when(pgConnection.getCatalog()).thenReturn("test_db"); when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection), position.getLogSequenceNumber())) .thenReturn(pgReplicationStream); ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: order_id[integer]:1".getBytes()); diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java index f0cf37a191d..318e2c96829 100644 --- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java +++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java @@ -23,6 +23,7 @@ import lombok.Getter; import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil; @@ -64,7 +65,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -355,24 +356,16 @@ public abstract class BaseITCase { } log.info("jobId: {}", jobId); Set<String> actualStatus = null; - for (int i = 0; i < 15; i++) { - actualStatus = new HashSet<>(); + for (int i = 0; i < 20; i++) { List<Map<String, Object>> showScalingStatusResult = showScalingStatus(jobId); log.info("show scaling status result: {}", showScalingStatusResult); - boolean finished = true; - for (Map<String, Object> each : showScalingStatusResult) { - String status = each.get("status").toString(); - assertThat(status, not(JobStatus.PREPARING_FAILURE.name())); - assertThat(status, not(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name())); - assertThat(status, not(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())); - actualStatus.add(status); - if (!Objects.equals(status, JobStatus.EXECUTE_INCREMENTAL_TASK.name())) { - log.info("scaling status before increment, status: {}", status); - finished = false; - break; - } - } - if (finished) { + actualStatus = showScalingStatusResult.stream().map(each -> each.get("status").toString()).collect(Collectors.toSet()); + assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(), + JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()))); + if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) { + break; + } else if (actualStatus.size() >= 1 && actualStatus.containsAll(new HashSet<>(Arrays.asList("", JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) { + log.error("one of the shardingItem was not started correctly"); break; } ThreadUtil.sleep(2, TimeUnit.SECONDS);