This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 39a96e5592b Fix drop streaming failed and improve CDC inventory task thread wait intervals (#29453)
39a96e5592b is described below

commit 39a96e5592bc21e778b1f6cd54f0fcabbaa86372
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Dec 19 21:16:12 2023 +0800

    Fix drop streaming failed and improve CDC inventory task thread wait intervals (#29453)
    
    * Adjust CDC thread wait intervals
    
    * Fix drop streaming failed
    
    * Fix needSort flag that may not match
    
    * Use requestBody
---
 .../core/ingest/dumper/InventoryDumper.java        | 18 +++++------
 .../inventory/InventoryRecordsCountCalculator.java |  2 +-
 .../pipeline/cdc/core/importer/CDCImporter.java    | 37 ++++++++++++----------
 .../cdc/core/importer/sink/CDCSocketSink.java      |  2 +-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  9 +++---
 .../frontend/netty/CDCChannelInboundHandler.java   |  4 +--
 6 files changed, 38 insertions(+), 34 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 3092c75f2f9..ac0b876db8d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -21,10 +21,17 @@ import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.PrimaryKeyPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.PrimaryKeyPositionFactory;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
@@ -33,17 +40,10 @@ import org.apache.shardingsphere.data.pipeline.core.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.PrimaryKeyPosition;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.PrimaryKeyPositionFactory;
 import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
+import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineInventoryDumpSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
-import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -146,7 +146,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
                 dataRecords.add(new FinishedRecord(new FinishedPosition()));
                 channel.pushRecords(dataRecords);
                 dumpStatement.set(null);
-                log.info("Inventory dump done, rowCount={}", rowCount);
+                log.info("Inventory dump done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
             }
         }
     }
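
Note on the log change above: adding dataSource and actualTable makes the completion log attributable when several tables are dumped in parallel. With hypothetical values, the enriched line would read roughly:

    Inventory dump done, rowCount=100000, dataSource=ds_0, actualTable=t_order_0
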
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
index 894d5d55f91..786a5f8ff62 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
@@ -93,7 +93,7 @@ public final class InventoryRecordsCountCalculator {
                 result = resultSet.getLong(1);
             }
         }
-        log.info("getCount cost {} ms, sql: {}", System.currentTimeMillis() - 
startTimeMillis, countSQL);
+        log.info("getCount cost {} ms, sql: {}, count: {}", 
System.currentTimeMillis() - startTimeMillis, countSQL, result);
         return result;
     }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 64d27ace25d..b325b546fc2 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -25,18 +25,18 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
 import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 
 import java.util.ArrayList;
@@ -78,11 +78,14 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     @Override
     protected void runBlocking() {
         CDCImporterManager.putImporter(this);
+        for (CDCChannelProgressPair each : originalChannelProgressPairs) {
+            each.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+        }
         while (isRunning()) {
             if (needSorting) {
-                doWithSorting(originalChannelProgressPairs);
+                doWithSorting();
             } else {
-                doWithoutSorting(originalChannelProgressPairs);
+                doWithoutSorting();
             }
             if (originalChannelProgressPairs.isEmpty()) {
                 break;
@@ -90,36 +93,38 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
         }
     }
     
-    private void doWithoutSorting(final List<CDCChannelProgressPair> channelProgressPairs) {
-        for (final CDCChannelProgressPair channelProgressPair : channelProgressPairs) {
+    private void doWithoutSorting() {
+        for (final CDCChannelProgressPair channelProgressPair : originalChannelProgressPairs) {
             PipelineChannel channel = channelProgressPair.getChannel();
             List<Record> records = channel.fetchRecords(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
             if (records.isEmpty()) {
                 continue;
             }
+            Record lastRecord = records.get(records.size() - 1);
+            if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) {
+                channel.ack(records);
+                channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+                originalChannelProgressPairs.remove(channelProgressPair);
+                continue;
+            }
             if (null != rateLimitAlgorithm) {
                 rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
             }
             String ackId = CDCAckId.build(importerId).marshal();
             ackCache.put(ackId, Collections.singletonList(Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
             sink.write(ackId, records);
-            Record lastRecord = records.get(records.size() - 1);
-            if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) {
-                channel.ack(records);
-                channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
-            }
         }
     }
     
     @SneakyThrows(InterruptedException.class)
-    private void doWithSorting(final List<CDCChannelProgressPair> channelProgressPairs) {
+    private void doWithSorting() {
         if (null != rateLimitAlgorithm) {
             rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
         }
         CSNRecords firstCsnRecords = null;
         List<CSNRecords> csnRecordsList = new LinkedList<>();
-        for (int i = 0, count = channelProgressPairs.size(); i < count; i++) {
-            prepareTransactionRecords(channelProgressPairs);
+        for (int i = 0, count = originalChannelProgressPairs.size(); i < count; i++) {
+            prepareTransactionRecords(originalChannelProgressPairs);
             CSNRecords csnRecords = csnRecordsQueue.peek();
             if (null == csnRecords) {
                 continue;
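
Note on the doWithoutSorting() reordering above: a batch that contains a FinishedRecord and no DataRecord is now acked and its channel removed from originalChannelProgressPairs before anything is written to the sink, so runBlocking() can exit once every channel has drained. A minimal runnable sketch of that loop shape, using hypothetical simplified types rather than the real pipeline classes:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    // Hypothetical stand-ins for PipelineChannel/CDCChannelProgressPair.
    public final class FinishedChannelSketch {
        
        interface Channel {
            
            List<String> fetch();
            
            void ack(List<String> records);
        }
        
        public static void main(final String[] args) {
            List<Channel> channels = new CopyOnWriteArrayList<>();
            channels.add(new Channel() {
                
                @Override
                public List<String> fetch() {
                    return List.of("FINISHED");
                }
                
                @Override
                public void ack(final List<String> records) {
                    System.out.println("acked " + records);
                }
            });
            while (!channels.isEmpty()) {
                for (Channel each : channels) {
                    List<String> records = each.fetch();
                    if (records.stream().allMatch("FINISHED"::equals)) {
                        // finished-only batch: ack and drop the channel up front,
                        // never forwarding it to the sink (mirrors the new order)
                        each.ack(records);
                        channels.remove(each);    // safe: snapshot iterator
                        continue;
                    }
                    // a batch with data records would be written to the sink here
                }
            }
            System.out.println("all channels finished; importer loop exits");
        }
    }
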
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index 45aff5d8222..6a3dff0ce45 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -49,7 +49,7 @@ import java.util.concurrent.locks.ReentrantLock;
 @Slf4j
 public final class CDCSocketSink implements PipelineSink {
     
-    private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
+    private static final long DEFAULT_TIMEOUT_MILLISECONDS = 100L;
     
     private final Lock lock = new ReentrantLock();
     
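The timeout halved above (200 ms to 100 ms) bounds how long a sink write waits on its condition before rechecking state, in line with the commit's wait-interval tuning. A sketch of that bounded-wait pattern, with hypothetical names rather than the actual CDCSocketSink internals:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    // Bounded wait on a condition; names are illustrative only.
    public final class BoundedWaitSketch {
        
        private static final long DEFAULT_TIMEOUT_MILLISECONDS = 100L;
        
        private final Lock lock = new ReentrantLock();
        
        private final Condition condition = lock.newCondition();
        
        public void awaitBriefly() throws InterruptedException {
            lock.lock();
            try {
                // wakes after at most 100 ms even without a signal,
                // so the caller rechecks running/ready state sooner
                condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
            } finally {
                lock.unlock();
            }
        }
    }
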
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 9b991c3e8e1..f22418276c4 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -41,8 +41,8 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -53,7 +53,6 @@ import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabase
 
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -79,7 +78,7 @@ public final class CDCJobPreparer {
         AtomicBoolean inventoryImporterUsed = new AtomicBoolean();
         List<CDCChannelProgressPair> inventoryChannelProgressPairs = new CopyOnWriteArrayList<>();
         AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
-        List<CDCChannelProgressPair> incrementalChannelProgressPairs = new LinkedList<>();
+        List<CDCChannelProgressPair> incrementalChannelProgressPairs = new CopyOnWriteArrayList<>();
         for (CDCJobItemContext each : jobItemContexts) {
             initTasks0(each, inventoryImporterUsed, inventoryChannelProgressPairs, incrementalImporterUsed, incrementalChannelProgressPairs);
         }
@@ -127,7 +126,7 @@ public final class CDCJobPreparer {
             }
             Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
             Importer importer = importerUsed.get() ? null
-                    : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
+                    : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, jobItemContext.getSink(),
                             needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
                             importerConfig.getRateLimitAlgorithm());
             jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
@@ -156,7 +155,7 @@ public final class CDCJobPreparer {
         channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
         Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
-        boolean needSorting = needSorting(ImporterType.INCREMENTAL, hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType()));
+        boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
         Importer importer = importerUsed.get() ? null
                 : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS,
                         jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
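
Two notes on the CDCJobPreparer changes above. First, the incremental channel list moves from LinkedList to CopyOnWriteArrayList because CDCImporter now removes finished pairs while iterating (see doWithoutSorting above); a LinkedList's fail-fast iterator would throw ConcurrentModificationException. Second, incremental needSorting now follows the job's own decodeWithTX flag instead of the global-CSN heuristic, which is the "needSort flag may not match" fix. A small runnable demonstration of the list swap:

    import java.util.ConcurrentModificationException;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    // Removing from a LinkedList inside its own for-each loop trips the
    // fail-fast iterator; CopyOnWriteArrayList iterates a snapshot instead.
    public final class RemoveWhileIteratingSketch {
        
        public static void main(final String[] args) {
            List<String> linked = new LinkedList<>(List.of("a", "b", "c"));
            try {
                for (String each : linked) {
                    linked.remove(each);
                }
            } catch (final ConcurrentModificationException ex) {
                System.out.println("LinkedList failed fast: " + ex);
            }
            List<String> cow = new CopyOnWriteArrayList<>(List.of("a", "b", "c"));
            for (String each : cow) {
                cow.remove(each);
            }
            System.out.println("CopyOnWriteArrayList emptied: " + cow.isEmpty());
        }
    }
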
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 88ded2b79ed..acbd2fdf009 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -210,7 +210,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
         StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
         String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
-        backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id());
+        backendHandler.stopStreaming(requestBody.getStreamingId(), ctx.channel().id());
         connectionContext.setJobId(null);
         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
     }
@@ -218,7 +218,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
     private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
-        backendHandler.dropStreaming(connectionContext.getJobId());
+        backendHandler.dropStreaming(requestBody.getStreamingId());
         connectionContext.setJobId(null);
         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
     }
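
The two handler changes above are the core of the "drop streaming failed" fix: stop and drop now act on the streaming id carried in the request body, not on connectionContext.getJobId(), which can be null or point at a different job on a connection that has handled several streams. A tiny sketch of the idea with hypothetical types:

    // Request-scoped id vs. mutable per-connection state; types are illustrative.
    public final class DropStreamingSketch {
        
        static final class ConnectionContext {
            
            String jobId;    // mutable; set and cleared as requests come and go
        }
        
        static final class DropRequest {
            
            final String streamingId;
            
            DropRequest(final String streamingId) {
                this.streamingId = streamingId;
            }
        }
        
        public static void main(final String[] args) {
            ConnectionContext connectionContext = new ConnectionContext();    // jobId still null
            DropRequest request = new DropRequest("hypothetical_streaming_id");
            // before: dropStreaming(connectionContext.jobId) could pass null and fail
            // after: dropStreaming(request.streamingId) always names the target job
            System.out.println("dropping " + request.streamingId);
        }
    }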
