This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cc456d0a78b Improve consumption timeout in CDC E2E of PostgreSQL (#29911)
cc456d0a78b is described below

commit cc456d0a78bcd322a2d5f4365db406077a1f2f21
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Jan 30 12:15:02 2024 +0800

    Improve consumption timeout in CDC E2E of PostgreSQL (#29911)
    
    * Add log for DataSourceRecordConsumer.write
    
    * Refactor CDCImporter
    
    * Improve consumption timeout in CDC E2E of PostgreSQL
---
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |  2 +-
 .../pipeline/cdc/core/importer/CDCImporter.java    | 34 ++++++++---------
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  2 +-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  1 +
 .../cases/cdc/DataSourceRecordConsumer.java        | 43 ++++++++++++++--------
 5 files changed, 47 insertions(+), 35 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 090bfd0de53..bdf81364994 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -168,7 +168,7 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
             PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
             if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {
                 PipelineCDCSocketSink cdcSink = (PipelineCDCSocketSink) jobItemContext.getSink();
-                cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
+                cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", Optional.ofNullable(throwable.getMessage()).orElse("")));
             }
             PipelineJobRegistry.stop(jobId);
             jobAPI.disable(jobId);
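
The change above guards against Throwable.getMessage() returning null, which it does for exceptions created without a message (for example a bare NullPointerException); a null string field is generally unacceptable to the downstream response encoder. A minimal standalone sketch of the same guard, using a hypothetical helper name:

    import java.util.Optional;
    
    public final class ErrorMessageExample {
        
        // Throwable.getMessage() may be null; fall back to an empty string so the
        // failure response always carries a non-null (possibly empty) message.
        public static String describe(final Throwable throwable) {
            return Optional.ofNullable(throwable.getMessage()).orElse("");
        }
        
        public static void main(final String[] args) {
            System.out.println(describe(new IllegalStateException("boom")));       // boom
            System.out.println(describe(new NullPointerException()).length());     // 0
        }
    }
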
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 27addbe865f..ff130efc1a6 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -57,7 +57,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     @Getter
     private final String importerId = RandomStringUtils.randomAlphanumeric(8);
     
-    private final List<CDCChannelProgressPair> originalChannelProgressPairs;
+    private final List<CDCChannelProgressPair> channelProgressPairs;
     
     private final int batchSize;
     
@@ -76,7 +76,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     @Override
     protected void runBlocking() {
         CDCImporterManager.putImporter(this);
-        for (CDCChannelProgressPair each : originalChannelProgressPairs) {
+        for (CDCChannelProgressPair each : channelProgressPairs) {
             each.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
         }
         while (isRunning()) {
@@ -85,7 +85,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
             } else {
                 doWithoutSorting();
             }
-            if (originalChannelProgressPairs.isEmpty()) {
+            if (channelProgressPairs.isEmpty()) {
                 break;
             }
         }
@@ -113,8 +113,8 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     private List<CSNRecords> getCsnRecordsList() {
         List<CSNRecords> result = new LinkedList<>();
         CSNRecords firstRecords = null;
-        for (int i = 0, count = originalChannelProgressPairs.size(); i < count; i++) {
-            prepareTransactionRecords(originalChannelProgressPairs);
+        for (int i = 0, count = channelProgressPairs.size(); i < count; i++) {
+            prepareTransactionRecords();
             CSNRecords csnRecords = csnRecordsQueue.peek();
             if (null == csnRecords) {
                 continue;
@@ -133,15 +133,15 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     
     // TODO openGauss CSN should be incremented for every transaction. Currently, CSN might be duplicated in transactions.
     // TODO Use channels watermark depth to improve performance.
-    private void prepareTransactionRecords(final List<CDCChannelProgressPair> channelProgressPairs) {
+    private void prepareTransactionRecords() {
         if (csnRecordsQueue.isEmpty()) {
-            prepareWhenQueueIsEmpty(channelProgressPairs);
+            prepareWhenQueueIsEmpty();
         } else {
-            prepareWhenQueueIsNotEmpty(channelProgressPairs, csnRecordsQueue.peek().getCsn());
+            prepareWhenQueueIsNotEmpty(csnRecordsQueue.peek().getCsn());
         }
     }
     
-    private void prepareWhenQueueIsEmpty(final List<CDCChannelProgressPair> channelProgressPairs) {
+    private void prepareWhenQueueIsEmpty() {
         for (CDCChannelProgressPair each : channelProgressPairs) {
             PipelineChannel channel = each.getChannel();
             List<Record> records = channel.poll();
@@ -156,7 +156,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
         }
     }
     
-    private void prepareWhenQueueIsNotEmpty(final List<CDCChannelProgressPair> channelProgressPairs, final long oldestCSN) {
+    private void prepareWhenQueueIsNotEmpty(final long oldestCSN) {
         for (CDCChannelProgressPair each : channelProgressPairs) {
             PipelineChannel channel = each.getChannel();
             List<Record> records = channel.peek();
@@ -209,13 +209,13 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     }
     
     private void doWithoutSorting() {
-        for (CDCChannelProgressPair each : originalChannelProgressPairs) {
+        for (CDCChannelProgressPair each : channelProgressPairs) {
             doWithoutSorting(each);
         }
     }
     
-    private void doWithoutSorting(final CDCChannelProgressPair progressPair) {
-        PipelineChannel channel = progressPair.getChannel();
+    private void doWithoutSorting(final CDCChannelProgressPair channelProgressPair) {
+        PipelineChannel channel = channelProgressPair.getChannel();
         List<Record> records = channel.fetch(batchSize, timeoutMillis);
         if (records.isEmpty()) {
             return;
@@ -223,9 +223,9 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
         Record lastRecord = records.get(records.size() - 1);
         if (records.stream().noneMatch(DataRecord.class::isInstance)) {
             channel.ack(records);
-            progressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+            channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
             if (lastRecord instanceof FinishedRecord) {
-                originalChannelProgressPairs.remove(progressPair);
+                channelProgressPairs.remove(channelProgressPair);
             }
             return;
         }
@@ -233,7 +233,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
             rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
         }
         String ackId = CDCAckId.build(importerId).marshal();
-        ackCache.put(ackId, Collections.singletonList(Pair.of(progressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
+        ackCache.put(ackId, Collections.singletonList(Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
         sink.write(ackId, records);
     }
     
@@ -253,7 +253,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
             Record lastRecord = ackPosition.getLastRecord();
             each.getLeft().getChannel().ack(Collections.singletonList(lastRecord));
             if (lastRecord instanceof FinishedRecord) {
-                originalChannelProgressPairs.remove(each.getKey());
+                channelProgressPairs.remove(each.getKey());
             }
             each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
         }
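
The "Refactor CDCImporter" part of this commit renames originalChannelProgressPairs to channelProgressPairs and stops threading the list through prepareTransactionRecords and its helpers as a parameter, since every method already reads the same field. The run loop's exit condition is unchanged: each channel is removed from the list once it delivers a FinishedRecord, and the loop ends when the list drains to empty. A minimal sketch of that drain-until-empty pattern, using hypothetical simplified types:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    public final class ChannelDrainExample {
        
        interface Channel {
            
            String poll();
        }
        
        public static void main(final String[] args) {
            // CopyOnWriteArrayList tolerates removal while iterating, mirroring how
            // finished channels are dropped from the pair list inside the loop.
            List<Channel> channels = new CopyOnWriteArrayList<>();
            channels.add(() -> "FINISHED");
            channels.add(() -> "FINISHED");
            while (!channels.isEmpty()) {
                for (Channel each : channels) {
                    if ("FINISHED".equals(each.poll())) {
                        channels.remove(each);
                    }
                }
            }
            System.out.println("All channels finished, importer loop exits");
        }
    }
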
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 1ce11748f68..fab205426af 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -146,7 +146,7 @@ public final class CDCJobPreparer {
                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
         boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
         Importer importer = importerUsed.get() ? null
-                : new CDCImporter(channelProgressPairs, 1, 300L, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
+                : new CDCImporter(channelProgressPairs, 1, 100L, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
         PipelineTask incrementalTask = new CDCIncrementalTask(
                 dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
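
This hunk is the change the commit title refers to: the CDCImporter consumption timeout drops from 300 ms to 100 ms (with batch size 1), so the importer polls its channel more frequently and flushes sparse traffic sooner, which matters for the PostgreSQL CDC E2E case where records trickle in slowly. A minimal sketch of fetch-with-timeout semantics (a hypothetical helper, not the project's PipelineChannel API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public final class FetchTimeoutExample {
        
        // Return up to batchSize records, or whatever has arrived once timeoutMillis
        // elapses; a smaller timeout hands partial batches to the sink sooner.
        static List<String> fetch(final BlockingQueue<String> queue, final int batchSize, final long timeoutMillis) throws InterruptedException {
            List<String> result = new ArrayList<>(batchSize);
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (result.size() < batchSize) {
                long remainingMillis = deadline - System.currentTimeMillis();
                if (remainingMillis <= 0) {
                    break;
                }
                String record = queue.poll(remainingMillis, TimeUnit.MILLISECONDS);
                if (null == record) {
                    break;
                }
                result.add(record);
            }
            return result;
        }
        
        public static void main(final String[] args) throws InterruptedException {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            // Empty queue: returns an empty list after roughly 100 ms instead of 300 ms.
            System.out.println(fetch(queue, 1, 100L));
        }
    }
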
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 2fa48fc997e..536b38ce50d 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -125,6 +125,7 @@ class CDCE2EIT {
             DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
             String tableName = dialectDatabaseMetaData.isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
             new E2EIncrementalTask(sourceDataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
+            containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
             for (int i = 1; i <= 4; i++) {
                 int orderId = 10000 + i;
                 containerComposer.proxyExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId, i), 0);
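
The added waitIncrementTaskFinished call makes the test wait until SHOW STREAMING STATUS reports the incremental task has caught up before inserting the verification rows, instead of racing ahead of CDC consumption. A minimal sketch of that poll-until-ready pattern (a hypothetical helper, not the PipelineContainerComposer API):

    import java.time.Duration;
    import java.time.Instant;
    import java.util.function.BooleanSupplier;
    
    public final class WaitUntilExample {
        
        // Poll a condition until it holds or a deadline passes, sleeping between checks.
        static void waitUntil(final BooleanSupplier condition, final Duration timeout, final Duration interval) throws InterruptedException {
            Instant deadline = Instant.now().plus(timeout);
            while (!condition.getAsBoolean()) {
                if (Instant.now().isAfter(deadline)) {
                    throw new IllegalStateException("Condition not met within " + timeout);
                }
                Thread.sleep(interval.toMillis());
            }
        }
        
        public static void main(final String[] args) throws InterruptedException {
            long start = System.currentTimeMillis();
            // Stands in for repeatedly checking incremental task status over SQL.
            waitUntil(() -> System.currentTimeMillis() - start > 200L, Duration.ofSeconds(5L), Duration.ofMillis(50L));
            System.out.println("Incremental task caught up");
        }
    }
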
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index c77ab2b2004..eab65bb2ce8 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -20,8 +20,6 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.cdc;
 import com.google.common.base.Strings;
 import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.ProtobufAnyValueConverter;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
@@ -29,6 +27,8 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordR
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.TableColumn;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.SQLBuilderUtils;
 
@@ -41,7 +41,6 @@ import java.sql.Types;
 import java.time.LocalDate;
 import java.time.LocalTime;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -91,7 +90,7 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
             return;
         }
         for (Record each : records) {
-            write(each, connection);
+            write(each, connection, records.size() < 5);
         }
     }
     
@@ -114,39 +113,46 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
         }
     }
     
-    private void write(final Record ingestedRecord, final Connection connection) throws SQLException {
+    private void write(final Record ingestedRecord, final Connection connection, final boolean printSQL) throws SQLException {
         String sql = buildSQL(ingestedRecord);
         MetaData metaData = ingestedRecord.getMetaData();
         PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
         try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            Map<String, TableColumn> afterMap = new LinkedHashMap<>(ingestedRecord.getBeforeList().size(), 1F);
-            ingestedRecord.getAfterList().forEach(each -> afterMap.put(each.getName(), each));
+            int updateCount;
             switch (ingestedRecord.getDataChangeType()) {
                 case INSERT:
                     for (int i = 0; i < ingestedRecord.getAfterCount(); i++) {
                         TableColumn tableColumn = ingestedRecord.getAfterList().get(i);
                         preparedStatement.setObject(i + 1, convertValueFromAny(tableMetaData, tableColumn));
                     }
+                    updateCount = preparedStatement.executeUpdate();
+                    if (1 != updateCount || printSQL) {
+                        log.info("Execute insert, update count: {}, sql: {}, values: {}", updateCount, sql,
+                                ingestedRecord.getAfterList().stream().map(each -> convertValueFromAny(tableMetaData, each)).collect(Collectors.toList()));
+                    }
                     break;
                 case UPDATE:
                     for (int i = 0; i < ingestedRecord.getAfterCount(); i++) {
                         TableColumn tableColumn = ingestedRecord.getAfterList().get(i);
                         preparedStatement.setObject(i + 1, convertValueFromAny(tableMetaData, tableColumn));
                     }
-                    preparedStatement.setObject(ingestedRecord.getAfterCount() + 1, convertValueFromAny(tableMetaData, afterMap.get("order_id")));
+                    preparedStatement.setObject(ingestedRecord.getAfterCount() + 1, convertValueFromAny(tableMetaData, getOrderIdTableColumn(ingestedRecord.getAfterList())));
+                    updateCount = preparedStatement.executeUpdate();
+                    if (1 != updateCount || printSQL) {
+                        log.info("Execute update, update count: {}, sql: {}, values: {}", updateCount, sql,
+                                ingestedRecord.getAfterList().stream().map(each -> convertValueFromAny(tableMetaData, each)).collect(Collectors.toList()));
+                    }
                     break;
                 case DELETE:
-                    TableColumn orderId = ingestedRecord.getBeforeList().stream().filter(each -> "order_id".equals(each.getName())).findFirst()
-                            .orElseThrow(() -> new UnsupportedOperationException("No primary key found in the t_order"));
-                    preparedStatement.setObject(1, convertValueFromAny(tableMetaData, orderId));
-                    preparedStatement.execute();
+                    Object orderId = convertValueFromAny(tableMetaData, getOrderIdTableColumn(ingestedRecord.getBeforeList()));
+                    preparedStatement.setObject(1, orderId);
+                    updateCount = preparedStatement.executeUpdate();
+                    if (1 != updateCount || printSQL) {
+                        log.info("Execute delete, update count: {}, sql: {}, order_id: {}", updateCount, sql, orderId);
+                    }
                     break;
                 default:
             }
-            int updateCount = preparedStatement.executeUpdate();
-            if (1 != updateCount) {
-                log.warn("executeUpdate failed, update count: {}, sql: {}, updated columns: {}", updateCount, sql, afterMap.keySet());
-            }
         }
     }
     
@@ -180,6 +186,11 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
         }
     }
     
+    private TableColumn getOrderIdTableColumn(final List<TableColumn> tableColumns) {
+        return tableColumns.stream().filter(each -> "order_id".equals(each.getName())).findFirst()
+                .orElseThrow(() -> new UnsupportedOperationException("No primary key found in the t_order"));
+    }
+    
     private Object convertValueFromAny(final PipelineTableMetaData tableMetaData, final TableColumn tableColumn) {
         PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(tableColumn.getName());
         Object result;
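
This last file is the "Add log for DataSourceRecordConsumer.write" bullet: executeUpdate now runs inside each switch branch, and the statement, its parameters, and the affected-row count are logged whenever the count is not 1 or the batch is small (printSQL is records.size() < 5). It also removes the doubled execution in the old DELETE path (execute() in the branch followed by the shared executeUpdate()) and extracts the duplicated order_id lookup into getOrderIdTableColumn. A minimal sketch of the update-count logging pattern, assuming an in-memory H2 database as a stand-in target purely so the example runs:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    
    public final class UpdateCountLoggingExample {
        
        public static void main(final String[] args) throws SQLException {
            boolean printSQL = true;
            try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:demo")) {
                connection.createStatement().execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, status VARCHAR(32))");
                String sql = "DELETE FROM t_order WHERE order_id = ?";
                try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                    preparedStatement.setObject(1, 10001);
                    int updateCount = preparedStatement.executeUpdate();
                    // A delete that misses its row yields update count 0; surfacing the
                    // count and parameters makes E2E consumption failures diagnosable.
                    if (1 != updateCount || printSQL) {
                        System.out.printf("Execute delete, update count: %d, sql: %s, order_id: %d%n", updateCount, sql, 10001);
                    }
                }
            }
        }
    }
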
