This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 618548ed8bf Add openGauss/PostgreSQL dumper reconnect at pipeline (#25932)
618548ed8bf is described below

commit 618548ed8bfc07771bc659ff82b68fe6e9a5d5fd
Author: Xinze Guo <[email protected]>
AuthorDate: Mon May 29 21:00:26 2023 +0800

    Add openGauss/PostgreSQL dumper reconnect at pipeline (#25932)
    
    * Add openGauss/PostgreSQL dumper reconnect
    
    * Simplify code
    
    * Improve reconnect
    
    * Improve reconnectTimes init
---
 .../pipeline/mysql/ingest/client/MySQLClient.java  |  7 +++----
 .../opengauss/ingest/OpenGaussWALDumper.java       | 23 +++++++++++++++++++---
 .../postgresql/ingest/PostgreSQLWALDumper.java     | 23 +++++++++++++++++++---
 .../cases/migration/AbstractMigrationE2EIT.java    |  2 +-
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  2 +-
 .../pipeline/cases/task/E2EIncrementalTask.java    |  2 +-
 6 files changed, 46 insertions(+), 13 deletions(-)

diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 9d07b905b6a..5eac27cd25e 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -351,13 +351,12 @@ public final class MySQLClient {
         
         private void reconnect() {
             closeChannel();
-            if (reconnectTimes.get() > 3) {
-                log.warn("exceeds the maximum number of retry times, last 
binlog event:{}", lastBinlogEvent);
+            if (reconnectTimes.incrementAndGet() > 3) {
+                log.warn("Exceeds the maximum number of retry times, last 
binlog event:{}", lastBinlogEvent);
                 return;
             }
-            reconnectTimes.incrementAndGet();
             connect();
-            log.info("reconnect times {}", reconnectTimes.get());
+            log.info("Reconnect times {}", reconnectTimes.get());
             subscribe(lastBinlogEvent.get().getFileName(), 
lastBinlogEvent.get().getPosition());
         }
     }
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index e438f3905c6..bbc5040c61e 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
@@ -48,10 +49,12 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * WAL dumper of openGauss.
  */
+@Slf4j
 public final class OpenGaussWALDumper extends AbstractLifecycleExecutor 
implements IncrementalDumper {
     
     private final DumperConfiguration dumperConfig;
@@ -80,9 +83,25 @@ public final class OpenGaussWALDumper extends 
AbstractLifecycleExecutor implemen
         this.decodeWithTX = dumperConfig.isDecodeWithTX();
     }
     
-    @SneakyThrows(InterruptedException.class)
     @Override
     protected void runBlocking() {
+        AtomicInteger reconnectTimes = new AtomicInteger();
+        while (isRunning()) {
+            try {
+                dump();
+                break;
+            } catch (final SQLException ex) {
+                int times = reconnectTimes.incrementAndGet();
+                log.error("Connect failed, reconnect times={}", times, ex);
+                if (times >= 5) {
+                    throw new IngestException(ex);
+                }
+            }
+        }
+    }
+    
+    @SneakyThrows(InterruptedException.class)
+    private void dump() throws SQLException {
         PGReplicationStream stream = null;
         try (PgConnection connection = getReplicationConnectionUnwrap()) {
             stream = logicalReplication.createReplicationStream(connection, 
walPosition.getLogSequenceNumber(), 
OpenGaussPositionInitializer.getUniqueSlotName(connection, 
dumperConfig.getJobId()));
@@ -100,8 +119,6 @@ public final class OpenGaussWALDumper extends 
AbstractLifecycleExecutor implemen
                     processEventIgnoreTX(event);
                 }
             }
-        } catch (final SQLException ex) {
-            throw new IngestException(ex);
         } finally {
             if (null != stream) {
                 try {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index d5da268daf1..67ba0431e82 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
@@ -50,10 +51,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * PostgreSQL WAL dumper.
  */
+@Slf4j
 public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor 
implements IncrementalDumper {
     
     private final DumperConfiguration dumperConfig;
@@ -82,9 +85,25 @@ public final class PostgreSQLWALDumper extends 
AbstractLifecycleExecutor impleme
         this.decodeWithTX = dumperConfig.isDecodeWithTX();
     }
     
-    @SneakyThrows(InterruptedException.class)
     @Override
     protected void runBlocking() {
+        AtomicInteger reconnectTimes = new AtomicInteger();
+        while (isRunning()) {
+            try {
+                dump();
+                break;
+            } catch (final SQLException ex) {
+                int times = reconnectTimes.incrementAndGet();
+                log.error("Connect failed, reconnect times={}", times, ex);
+                if (times >= 5) {
+                    throw new IngestException(ex);
+                }
+            }
+        }
+    }
+    
+    @SneakyThrows(InterruptedException.class)
+    private void dump() throws SQLException {
         // TODO use unified PgConnection
         try (
                 Connection connection = 
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) 
dumperConfig.getDataSourceConfig());
@@ -105,8 +124,6 @@ public final class PostgreSQLWALDumper extends 
AbstractLifecycleExecutor impleme
                     processEventIgnoreTX(event);
                 }
             }
-        } catch (final SQLException ex) {
-            throw new IngestException(ex);
         }
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index c9741373b44..a6554ee7580 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -125,7 +125,7 @@ public abstract class AbstractMigrationE2EIT {
     }
     
     protected void startMigrationByJobId(final PipelineContainerComposer 
containerComposer, final String jobId) throws SQLException {
-        containerComposer.proxyExecuteWithLog(String.format("START MIGRATION 
'%s'", jobId), 1);
+        containerComposer.proxyExecuteWithLog(String.format("START MIGRATION 
'%s'", jobId), 4);
     }
     
     protected void commitMigrationByJobId(final PipelineContainerComposer 
containerComposer, final String jobId) throws SQLException {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 51fff681a5c..da030243900 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -103,7 +103,7 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
         containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s 
(order_id,user_id,status) VALUES (%s, %s, '%s')",
                 String.join(".", PipelineContainerComposer.SCHEMA_NAME, 
SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
         startMigrationByJobId(containerComposer, jobId);
-        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
+        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
                 String.format("SELECT * FROM %s WHERE order_id = %s", 
String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), 
recordId)).isEmpty());
         containerComposer.assertProxyOrderRecordExist(String.join(".", 
PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 42235de2b07..1550d33f5f2 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -86,7 +86,7 @@ public final class E2EIncrementalTask extends 
BaseIncrementTask {
         }
         for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
             setNullToAllFields(primaryKeys.get(random.nextInt(0, 
primaryKeys.size())));
-            deleteOrderById(primaryKeys.get(random.nextInt(0, 
primaryKeys.size())));
+            deleteOrderById(primaryKeys.remove(random.nextInt(0, 
primaryKeys.size())));
         }
         log.info("increment task runnable execute successfully.");
     }

Reply via email to