This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cc456d0a78b Improve consumption timeout in CDC E2E of PostgreSQL (#29911)
cc456d0a78b is described below
commit cc456d0a78bcd322a2d5f4365db406077a1f2f21
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Jan 30 12:15:02 2024 +0800
Improve consumption timeout in CDC E2E of PostgreSQL (#29911)
* Add log for DataSourceRecordConsumer.write
* Refactor CDCImporter
* Improve consumption timeout in CDC E2E of PostgreSQL
---
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 2 +-
.../pipeline/cdc/core/importer/CDCImporter.java | 34 ++++++++---------
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 2 +-
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 1 +
.../cases/cdc/DataSourceRecordConsumer.java | 43 ++++++++++++++--------
5 files changed, 47 insertions(+), 35 deletions(-)
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 090bfd0de53..bdf81364994 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -168,7 +168,7 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
         if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {
             PipelineCDCSocketSink cdcSink = (PipelineCDCSocketSink) jobItemContext.getSink();
-            cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
+            cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", Optional.ofNullable(throwable.getMessage()).orElse("")));
         }
         PipelineJobRegistry.stop(jobId);
         jobAPI.disable(jobId);
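
Note on the CDCJob hunk above: Throwable#getMessage() can return null (a bare NullPointerException, for example), and the failure response is built from that string, so the patch defaults it to an empty string before calling CDCResponseUtils.failed. A minimal standalone sketch of the guard, using only JDK classes; the class name is hypothetical and the sketch is not part of the patch:

    import java.util.Optional;

    public final class NullSafeMessageSketch {

        public static void main(final String[] args) {
            Throwable throwable = new NullPointerException();
            // Without the guard, a null message would be passed downstream to the response builder.
            String errorMessage = Optional.ofNullable(throwable.getMessage()).orElse("");
            System.out.println("error message: [" + errorMessage + "]");
        }
    }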
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 27addbe865f..ff130efc1a6 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -57,7 +57,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     @Getter
     private final String importerId = RandomStringUtils.randomAlphanumeric(8);
-    private final List<CDCChannelProgressPair> originalChannelProgressPairs;
+    private final List<CDCChannelProgressPair> channelProgressPairs;
     private final int batchSize;
@@ -76,7 +76,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     @Override
     protected void runBlocking() {
         CDCImporterManager.putImporter(this);
-        for (CDCChannelProgressPair each : originalChannelProgressPairs) {
+        for (CDCChannelProgressPair each : channelProgressPairs) {
             each.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
         }
         while (isRunning()) {
@@ -85,7 +85,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
             } else {
                 doWithoutSorting();
             }
-            if (originalChannelProgressPairs.isEmpty()) {
+            if (channelProgressPairs.isEmpty()) {
                 break;
             }
         }
@@ -113,8 +113,8 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     private List<CSNRecords> getCsnRecordsList() {
         List<CSNRecords> result = new LinkedList<>();
         CSNRecords firstRecords = null;
-        for (int i = 0, count = originalChannelProgressPairs.size(); i < count; i++) {
-            prepareTransactionRecords(originalChannelProgressPairs);
+        for (int i = 0, count = channelProgressPairs.size(); i < count; i++) {
+            prepareTransactionRecords();
             CSNRecords csnRecords = csnRecordsQueue.peek();
             if (null == csnRecords) {
                 continue;
@@ -133,15 +133,15 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     // TODO openGauss CSN should be incremented for every transaction. Currently, CSN might be duplicated in transactions.
     // TODO Use channels watermark depth to improve performance.
-    private void prepareTransactionRecords(final List<CDCChannelProgressPair> channelProgressPairs) {
+    private void prepareTransactionRecords() {
         if (csnRecordsQueue.isEmpty()) {
-            prepareWhenQueueIsEmpty(channelProgressPairs);
+            prepareWhenQueueIsEmpty();
         } else {
-            prepareWhenQueueIsNotEmpty(channelProgressPairs, csnRecordsQueue.peek().getCsn());
+            prepareWhenQueueIsNotEmpty(csnRecordsQueue.peek().getCsn());
         }
     }
-    private void prepareWhenQueueIsEmpty(final List<CDCChannelProgressPair> channelProgressPairs) {
+    private void prepareWhenQueueIsEmpty() {
         for (CDCChannelProgressPair each : channelProgressPairs) {
             PipelineChannel channel = each.getChannel();
             List<Record> records = channel.poll();
@@ -156,7 +156,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
         }
     }
-    private void prepareWhenQueueIsNotEmpty(final List<CDCChannelProgressPair> channelProgressPairs, final long oldestCSN) {
+    private void prepareWhenQueueIsNotEmpty(final long oldestCSN) {
         for (CDCChannelProgressPair each : channelProgressPairs) {
             PipelineChannel channel = each.getChannel();
             List<Record> records = channel.peek();
@@ -209,13 +209,13 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     }
     private void doWithoutSorting() {
-        for (CDCChannelProgressPair each : originalChannelProgressPairs) {
+        for (CDCChannelProgressPair each : channelProgressPairs) {
             doWithoutSorting(each);
         }
     }
-    private void doWithoutSorting(final CDCChannelProgressPair progressPair) {
-        PipelineChannel channel = progressPair.getChannel();
+    private void doWithoutSorting(final CDCChannelProgressPair channelProgressPair) {
+        PipelineChannel channel = channelProgressPair.getChannel();
         List<Record> records = channel.fetch(batchSize, timeoutMillis);
         if (records.isEmpty()) {
             return;
@@ -223,9 +223,9 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
         Record lastRecord = records.get(records.size() - 1);
         if (records.stream().noneMatch(DataRecord.class::isInstance)) {
             channel.ack(records);
-            progressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+            channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
             if (lastRecord instanceof FinishedRecord) {
-                originalChannelProgressPairs.remove(progressPair);
+                channelProgressPairs.remove(channelProgressPair);
             }
             return;
         }
@@ -233,7 +233,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
             rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
         }
         String ackId = CDCAckId.build(importerId).marshal();
-        ackCache.put(ackId, Collections.singletonList(Pair.of(progressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
+        ackCache.put(ackId, Collections.singletonList(Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
         sink.write(ackId, records);
     }
@@ -253,7 +253,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
             Record lastRecord = ackPosition.getLastRecord();
             each.getLeft().getChannel().ack(Collections.singletonList(lastRecord));
             if (lastRecord instanceof FinishedRecord) {
-                originalChannelProgressPairs.remove(each.getKey());
+                channelProgressPairs.remove(each.getKey());
             }
             each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
         }
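
The CDCImporter changes above are a rename (originalChannelProgressPairs to channelProgressPairs) plus removal of a redundant parameter: the pair list is already a field of the importer, so prepareTransactionRecords and the prepareWhenQueue* methods read it directly. A minimal sketch of that pattern with hypothetical types, not the project's classes:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    final class ImporterSketch {

        // Analogous to the channelProgressPairs field; a concurrent list because entries
        // are removed (on FinishedRecord) while the run loop is still iterating.
        private final List<String> channels = new CopyOnWriteArrayList<>(List.of("channel-1", "channel-2"));

        // Before the refactor this would have taken the list as a parameter.
        private void prepare() {
            for (String each : channels) {
                System.out.println("preparing records from " + each);
            }
        }

        public static void main(final String[] args) {
            new ImporterSketch().prepare();
        }
    }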
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 1ce11748f68..fab205426af 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -146,7 +146,7 @@ public final class CDCJobPreparer {
                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
         boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
         Importer importer = importerUsed.get() ? null
-                : new CDCImporter(channelProgressPairs, 1, 300L, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
+                : new CDCImporter(channelProgressPairs, 1, 100L, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
         PipelineTask incrementalTask = new CDCIncrementalTask(
                 dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
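
The 300L to 100L change above is the consumption-timeout improvement from the commit title: CDCImporter passes this value to channel.fetch(batchSize, timeoutMillis), so with the smaller timeout a partial batch is handed to the sink sooner instead of waiting up to 300 ms for the batch to fill. A rough sketch of such a bounded, time-limited fetch, assuming a plain BlockingQueue rather than the real PipelineChannel implementation:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    final class TimedFetchSketch {

        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Return up to batchSize records, waiting no longer than timeoutMillis overall.
        List<String> fetch(final int batchSize, final long timeoutMillis) throws InterruptedException {
            List<String> result = new ArrayList<>(batchSize);
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (result.size() < batchSize) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0L) {
                    break;
                }
                String record = queue.poll(remaining, TimeUnit.MILLISECONDS);
                if (null == record) {
                    break;
                }
                result.add(record);
            }
            return result;
        }
    }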
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 2fa48fc997e..536b38ce50d 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -125,6 +125,7 @@ class CDCE2EIT {
         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
         String tableName = dialectDatabaseMetaData.isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
         new E2EIncrementalTask(sourceDataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
+        containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
         for (int i = 1; i <= 4; i++) {
             int orderId = 10000 + i;
             containerComposer.proxyExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId, i), 0);
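
The added waitIncrementTaskFinished call makes the E2E test block on SHOW STREAMING STATUS until the incremental task has caught up before issuing the follow-up INSERTs, rather than racing the consumer. An illustrative poll-and-wait sketch with plain JDBC; the column names checked here are assumptions, and the real logic lives in PipelineContainerComposer#waitIncrementTaskFinished:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.concurrent.TimeUnit;
    import javax.sql.DataSource;

    final class WaitIncrementTaskSketch {

        static void waitUntilCaughtUp(final DataSource proxyDataSource, final String jobId, final int maxAttempts) throws Exception {
            String sql = String.format("SHOW STREAMING STATUS '%s'", jobId);
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                try (
                        Connection connection = proxyDataSource.getConnection();
                        Statement statement = connection.createStatement();
                        ResultSet resultSet = statement.executeQuery(sql)) {
                    boolean sawRow = false;
                    boolean allCaughtUp = true;
                    while (resultSet.next()) {
                        sawRow = true;
                        // Assumed column names; adjust to whatever the statement actually returns.
                        boolean active = "EXECUTE_INCREMENTAL_TASK".equals(resultSet.getString("status"));
                        allCaughtUp &= active && resultSet.getInt("incremental_idle_seconds") >= 3;
                    }
                    if (sawRow && allCaughtUp) {
                        return;
                    }
                }
                TimeUnit.SECONDS.sleep(1L);
            }
            throw new IllegalStateException("Incremental task did not catch up for job: " + jobId);
        }
    }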
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index c77ab2b2004..eab65bb2ce8 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -20,8 +20,6 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.cdc;
 import com.google.common.base.Strings;
 import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.ProtobufAnyValueConverter;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
@@ -29,6 +27,8 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordR
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.TableColumn;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.SQLBuilderUtils;
@@ -41,7 +41,6 @@ import java.sql.Types;
 import java.time.LocalDate;
 import java.time.LocalTime;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -91,7 +90,7 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
             return;
         }
         for (Record each : records) {
-            write(each, connection);
+            write(each, connection, records.size() < 5);
         }
     }
@@ -114,39 +113,46 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
         }
     }
-    private void write(final Record ingestedRecord, final Connection connection) throws SQLException {
+    private void write(final Record ingestedRecord, final Connection connection, final boolean printSQL) throws SQLException {
         String sql = buildSQL(ingestedRecord);
         MetaData metaData = ingestedRecord.getMetaData();
         PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
         try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
-            Map<String, TableColumn> afterMap = new LinkedHashMap<>(ingestedRecord.getBeforeList().size(), 1F);
-            ingestedRecord.getAfterList().forEach(each -> afterMap.put(each.getName(), each));
+            int updateCount;
             switch (ingestedRecord.getDataChangeType()) {
                 case INSERT:
                     for (int i = 0; i < ingestedRecord.getAfterCount(); i++) {
                         TableColumn tableColumn = ingestedRecord.getAfterList().get(i);
                         preparedStatement.setObject(i + 1, convertValueFromAny(tableMetaData, tableColumn));
                     }
+                    updateCount = preparedStatement.executeUpdate();
+                    if (1 != updateCount || printSQL) {
+                        log.info("Execute insert, update count: {}, sql: {}, values: {}", updateCount, sql,
+                                ingestedRecord.getAfterList().stream().map(each -> convertValueFromAny(tableMetaData, each)).collect(Collectors.toList()));
+                    }
                     break;
                 case UPDATE:
                     for (int i = 0; i < ingestedRecord.getAfterCount(); i++) {
                         TableColumn tableColumn = ingestedRecord.getAfterList().get(i);
                         preparedStatement.setObject(i + 1, convertValueFromAny(tableMetaData, tableColumn));
                     }
-                    preparedStatement.setObject(ingestedRecord.getAfterCount() + 1, convertValueFromAny(tableMetaData, afterMap.get("order_id")));
+                    preparedStatement.setObject(ingestedRecord.getAfterCount() + 1, convertValueFromAny(tableMetaData, getOrderIdTableColumn(ingestedRecord.getAfterList())));
+                    updateCount = preparedStatement.executeUpdate();
+                    if (1 != updateCount || printSQL) {
+                        log.info("Execute update, update count: {}, sql: {}, values: {}", updateCount, sql,
+                                ingestedRecord.getAfterList().stream().map(each -> convertValueFromAny(tableMetaData, each)).collect(Collectors.toList()));
+                    }
                     break;
                 case DELETE:
-                    TableColumn orderId = ingestedRecord.getBeforeList().stream().filter(each -> "order_id".equals(each.getName())).findFirst()
-                            .orElseThrow(() -> new UnsupportedOperationException("No primary key found in the t_order"));
-                    preparedStatement.setObject(1, convertValueFromAny(tableMetaData, orderId));
-                    preparedStatement.execute();
+                    Object orderId = convertValueFromAny(tableMetaData, getOrderIdTableColumn(ingestedRecord.getBeforeList()));
+                    preparedStatement.setObject(1, orderId);
+                    updateCount = preparedStatement.executeUpdate();
+                    if (1 != updateCount || printSQL) {
+                        log.info("Execute delete, update count: {}, sql: {}, order_id: {}", updateCount, sql, orderId);
+                    }
                     break;
                 default:
             }
-            int updateCount = preparedStatement.executeUpdate();
-            if (1 != updateCount) {
-                log.warn("executeUpdate failed, update count: {}, sql: {}, updated columns: {}", updateCount, sql, afterMap.keySet());
-            }
         }
     }
@@ -180,6 +186,11 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
         }
     }
+    private TableColumn getOrderIdTableColumn(final List<TableColumn> tableColumns) {
+        return tableColumns.stream().filter(each -> "order_id".equals(each.getName())).findFirst()
+                .orElseThrow(() -> new UnsupportedOperationException("No primary key found in the t_order"));
+    }
+
     private Object convertValueFromAny(final PipelineTableMetaData tableMetaData, final TableColumn tableColumn) {
         PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(tableColumn.getName());
         Object result;
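
The DataSourceRecordConsumer changes above move executeUpdate into each branch of the switch and log the SQL with its converted values whenever the update count is not 1 or the batch is small (records.size() < 5), which is what makes a missed row in the PostgreSQL CDC E2E run diagnosable from the logs. A minimal sketch of that log-on-anomaly pattern around a PreparedStatement, with a hypothetical table and SLF4J, not the project's real consumer:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Arrays;
    import javax.sql.DataSource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class LoggedWriteSketch {

        private static final Logger LOG = LoggerFactory.getLogger(LoggedWriteSketch.class);

        private final DataSource dataSource;

        LoggedWriteSketch(final DataSource dataSource) {
            this.dataSource = dataSource;
        }

        void updateStatus(final long orderId, final String status, final boolean printSQL) throws SQLException {
            String sql = "UPDATE t_order SET status = ? WHERE order_id = ?";
            try (
                    Connection connection = dataSource.getConnection();
                    PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                preparedStatement.setString(1, status);
                preparedStatement.setLong(2, orderId);
                int updateCount = preparedStatement.executeUpdate();
                // Log only when something looks off or the caller asked for verbose output.
                if (1 != updateCount || printSQL) {
                    LOG.info("Execute update, update count: {}, sql: {}, values: {}", updateCount, sql, Arrays.asList(status, orderId));
                }
            }
        }
    }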