This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 54d7e9c90b9 Improve Postgres generate slot name at scaling (#19864)
54d7e9c90b9 is described below

commit 54d7e9c90b91fa404c3dc88fbb8c0e7e89c2b62e
Author: Xinze Guo <101622833+aze...@users.noreply.github.com>
AuthorDate: Thu Aug 4 18:24:22 2022 +0800

    Improve Postgres generate slot name at scaling (#19864)
---
 .../ingest/OpenGaussPositionInitializer.java       |  5 +++-
 .../ingest/PostgreSQLPositionInitializer.java      |  5 +++-
 .../postgresql/ingest/PostgreSQLWalDumperTest.java |  1 +
 .../data/pipeline/cases/base/BaseITCase.java       | 27 ++++++++--------------
 4 files changed, 19 insertions(+), 19 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index e30e6d844ce..c05d33c4e3c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
@@ -127,7 +128,9 @@ public final class OpenGaussPositionInitializer implements 
PositionInitializer {
      * @throws SQLException failed when getCatalog
      */
     public static String getUniqueSlotName(final Connection connection) throws 
SQLException {
-        return String.format("%s_%s", SLOT_NAME_PREFIX, 
connection.getCatalog());
+        // Same limit as PostgreSQL, but a slot name whose length is over 64 throws an exception 
directly
+        String slotName = DigestUtils.md5Hex(connection.getCatalog());
+        return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 34275e4aef4..178f4e5c3c4 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
@@ -128,7 +129,9 @@ public final class PostgreSQLPositionInitializer implements 
PositionInitializer
      * @throws SQLException failed when getCatalog
      */
     public static String getUniqueSlotName(final Connection connection) throws 
SQLException {
-        return String.format("%s_%s", SLOT_NAME_PREFIX, 
connection.getCatalog());
+        // PostgreSQL slot name length can't exceed 64; the name is automatically 
truncated when the length exceeds the limit
+        String slotName = DigestUtils.md5Hex(connection.getCatalog());
+        return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index 1c18128a0a6..8c0912d1874 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -115,6 +115,7 @@ public final class PostgreSQLWalDumperTest {
             ReflectionUtil.setFieldValue(walDumper, "logicalReplication", 
logicalReplication);
             
when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
             
when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
+            when(pgConnection.getCatalog()).thenReturn("test_db");
             when(logicalReplication.createReplicationStream(pgConnection, 
PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection), 
position.getLogSequenceNumber()))
                     .thenReturn(pgReplicationStream);
             ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: 
order_id[integer]:1".getBytes());
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index f0cf37a191d..318e2c96829 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -23,6 +23,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -64,7 +65,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -355,24 +356,16 @@ public abstract class BaseITCase {
         }
         log.info("jobId: {}", jobId);
         Set<String> actualStatus = null;
-        for (int i = 0; i < 15; i++) {
-            actualStatus = new HashSet<>();
+        for (int i = 0; i < 20; i++) {
             List<Map<String, Object>> showScalingStatusResult = 
showScalingStatus(jobId);
             log.info("show scaling status result: {}", 
showScalingStatusResult);
-            boolean finished = true;
-            for (Map<String, Object> each : showScalingStatusResult) {
-                String status = each.get("status").toString();
-                assertThat(status, not(JobStatus.PREPARING_FAILURE.name()));
-                assertThat(status, 
not(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()));
-                assertThat(status, 
not(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()));
-                actualStatus.add(status);
-                if (!Objects.equals(status, 
JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
-                    log.info("scaling status before increment, status: {}", 
status);
-                    finished = false;
-                    break;
-                }
-            }
-            if (finished) {
+            actualStatus = showScalingStatusResult.stream().map(each -> 
each.get("status").toString()).collect(Collectors.toSet());
+            assertFalse(CollectionUtils.containsAny(actualStatus, 
Arrays.asList(JobStatus.PREPARING_FAILURE.name(), 
JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
+                    JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+            if (actualStatus.size() == 1 && 
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
+                break;
+            } else if (actualStatus.size() >= 1 && 
actualStatus.containsAll(new HashSet<>(Arrays.asList("", 
JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
+                log.error("one of the shardingItem was not started correctly");
                 break;
             }
             ThreadUtil.sleep(2, TimeUnit.SECONDS);

Reply via email to