This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 618548ed8bf Add openGauss/PostgreSQL dumper reconnect at pipeline
(#25932)
618548ed8bf is described below
commit 618548ed8bfc07771bc659ff82b68fe6e9a5d5fd
Author: Xinze Guo <[email protected]>
AuthorDate: Mon May 29 21:00:26 2023 +0800
Add openGauss/PostgreSQL dumper reconnect at pipeline (#25932)
* Add openGauss/PostgreSQL dumper reconnect
* Simplify code
* Improve reconnect
* Improve reconnectTimes init
---
.../pipeline/mysql/ingest/client/MySQLClient.java | 7 +++----
.../opengauss/ingest/OpenGaussWALDumper.java | 23 +++++++++++++++++++---
.../postgresql/ingest/PostgreSQLWALDumper.java | 23 +++++++++++++++++++---
.../cases/migration/AbstractMigrationE2EIT.java | 2 +-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 2 +-
.../pipeline/cases/task/E2EIncrementalTask.java | 2 +-
6 files changed, 46 insertions(+), 13 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 9d07b905b6a..5eac27cd25e 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -351,13 +351,12 @@ public final class MySQLClient {
private void reconnect() {
closeChannel();
- if (reconnectTimes.get() > 3) {
- log.warn("exceeds the maximum number of retry times, last
binlog event:{}", lastBinlogEvent);
+ if (reconnectTimes.incrementAndGet() > 3) {
+ log.warn("Exceeds the maximum number of retry times, last
binlog event:{}", lastBinlogEvent);
return;
}
- reconnectTimes.incrementAndGet();
connect();
- log.info("reconnect times {}", reconnectTimes.get());
+ log.info("Reconnect times {}", reconnectTimes.get());
subscribe(lastBinlogEvent.get().getFileName(),
lastBinlogEvent.get().getPosition());
}
}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index e438f3905c6..bbc5040c61e 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
@@ -48,10 +49,12 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* WAL dumper of openGauss.
*/
+@Slf4j
public final class OpenGaussWALDumper extends AbstractLifecycleExecutor
implements IncrementalDumper {
private final DumperConfiguration dumperConfig;
@@ -80,9 +83,25 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
this.decodeWithTX = dumperConfig.isDecodeWithTX();
}
- @SneakyThrows(InterruptedException.class)
@Override
protected void runBlocking() {
+ AtomicInteger reconnectTimes = new AtomicInteger();
+ while (isRunning()) {
+ try {
+ dump();
+ break;
+ } catch (final SQLException ex) {
+ int times = reconnectTimes.incrementAndGet();
+ log.error("Connect failed, reconnect times={}", times, ex);
+ if (times >= 5) {
+ throw new IngestException(ex);
+ }
+ }
+ }
+ }
+
+ @SneakyThrows(InterruptedException.class)
+ private void dump() throws SQLException {
PGReplicationStream stream = null;
try (PgConnection connection = getReplicationConnectionUnwrap()) {
stream = logicalReplication.createReplicationStream(connection,
walPosition.getLogSequenceNumber(),
OpenGaussPositionInitializer.getUniqueSlotName(connection,
dumperConfig.getJobId()));
@@ -100,8 +119,6 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
processEventIgnoreTX(event);
}
}
- } catch (final SQLException ex) {
- throw new IngestException(ex);
} finally {
if (null != stream) {
try {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index d5da268daf1..67ba0431e82 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
@@ -50,10 +51,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* PostgreSQL WAL dumper.
*/
+@Slf4j
public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor
implements IncrementalDumper {
private final DumperConfiguration dumperConfig;
@@ -82,9 +85,25 @@ public final class PostgreSQLWALDumper extends
AbstractLifecycleExecutor impleme
this.decodeWithTX = dumperConfig.isDecodeWithTX();
}
- @SneakyThrows(InterruptedException.class)
@Override
protected void runBlocking() {
+ AtomicInteger reconnectTimes = new AtomicInteger();
+ while (isRunning()) {
+ try {
+ dump();
+ break;
+ } catch (final SQLException ex) {
+ int times = reconnectTimes.incrementAndGet();
+ log.error("Connect failed, reconnect times={}", times, ex);
+ if (times >= 5) {
+ throw new IngestException(ex);
+ }
+ }
+ }
+ }
+
+ @SneakyThrows(InterruptedException.class)
+ private void dump() throws SQLException {
// TODO use unified PgConnection
try (
Connection connection =
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperConfig.getDataSourceConfig());
@@ -105,8 +124,6 @@ public final class PostgreSQLWALDumper extends
AbstractLifecycleExecutor impleme
processEventIgnoreTX(event);
}
}
- } catch (final SQLException ex) {
- throw new IngestException(ex);
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index c9741373b44..a6554ee7580 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -125,7 +125,7 @@ public abstract class AbstractMigrationE2EIT {
}
protected void startMigrationByJobId(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
- containerComposer.proxyExecuteWithLog(String.format("START MIGRATION
'%s'", jobId), 1);
+ containerComposer.proxyExecuteWithLog(String.format("START MIGRATION
'%s'", jobId), 4);
}
protected void commitMigrationByJobId(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 51fff681a5c..da030243900 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -103,7 +103,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s
(order_id,user_id,status) VALUES (%s, %s, '%s')",
String.join(".", PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
startMigrationByJobId(containerComposer, jobId);
- Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
+ Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
String.format("SELECT * FROM %s WHERE order_id = %s",
String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME),
recordId)).isEmpty());
containerComposer.assertProxyOrderRecordExist(String.join(".",
PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 42235de2b07..1550d33f5f2 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -86,7 +86,7 @@ public final class E2EIncrementalTask extends
BaseIncrementTask {
}
for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
setNullToAllFields(primaryKeys.get(random.nextInt(0,
primaryKeys.size())));
- deleteOrderById(primaryKeys.get(random.nextInt(0,
primaryKeys.size())));
+ deleteOrderById(primaryKeys.remove(random.nextInt(0,
primaryKeys.size())));
}
log.info("increment task runnable execute successfully.");
}