This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b72b03d30c Improve processed records count calculation at pipeline 
increment task. (#21513)
3b72b03d30c is described below

commit 3b72b03d30cd8919136a3ef3d6f548c3e17baf82
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Oct 12 11:00:55 2022 +0800

    Improve processed records count calculation at pipeline increment task. 
(#21513)
    
    * Calculate processed records count before merged operation
    
    * Fix codestyle
    
    * Add null check
---
 .../data/pipeline/core/importer/DefaultImporter.java      | 15 +++++++++------
 .../data/pipeline/mysql/ingest/client/MySQLClient.java    |  6 +++++-
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 699fc6d1878..718bc121d18 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -31,8 +31,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
@@ -115,17 +115,20 @@ public final class DefaultImporter extends 
AbstractLifecycleExecutor implements
     }
     
     private PipelineJobProgressUpdatedParameter flush(final DataSource 
dataSource, final List<Record> buffer) {
-        List<GroupedDataRecord> result = 
MERGER.group(buffer.stream().filter(each -> each instanceof 
DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
+        List<DataRecord> dataRecords = buffer.stream().filter(each -> each 
instanceof DataRecord).map(each -> (DataRecord) 
each).collect(Collectors.toList());
         int insertRecordNumber = 0;
-        int deleteRecordNumber = 0;
+        for (DataRecord each : dataRecords) {
+            if (IngestDataChangeType.INSERT.equals(each.getType())) {
+                insertRecordNumber++;
+            }
+        }
+        List<GroupedDataRecord> result = MERGER.group(dataRecords);
         for (GroupedDataRecord each : result) {
-            deleteRecordNumber += null != each.getDeleteDataRecords() ? 
each.getDeleteDataRecords().size() : 0;
             flushInternal(dataSource, each.getDeleteDataRecords());
-            insertRecordNumber += null != each.getInsertDataRecords() ? 
each.getInsertDataRecords().size() : 0;
             flushInternal(dataSource, each.getInsertDataRecords());
             flushInternal(dataSource, each.getUpdateDataRecords());
         }
-        return new PipelineJobProgressUpdatedParameter(insertRecordNumber - 
deleteRecordNumber);
+        return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
     }
     
     private void flushInternal(final DataSource dataSource, final 
List<DataRecord> buffer) {
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 39c2144f44b..d532995c150 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -298,7 +298,7 @@ public final class MySQLClient {
             log.error("MySQLBinlogEventHandler protocol resolution error, file 
name:{}, position:{}", fileName, position, cause);
             reconnect();
         }
-        
+    
         private void reconnect() {
             if (reconnectTimes.get() > 3) {
                 log.warn("exceeds the maximum number of retry times, last 
binlog event:{}", lastBinlogEvent);
@@ -306,6 +306,10 @@ public final class MySQLClient {
                 return;
             }
             int retryTimes = reconnectTimes.incrementAndGet();
+            if (null == lastBinlogEvent || null == 
lastBinlogEvent.getFileName()) {
+                log.warn("last binlog event is null or the file name is null, 
last binlog event:{}", lastBinlogEvent);
+                return;
+            }
             log.info("reconnect MySQL client, retry times={}", retryTimes);
             closeChannel();
             connect();

Reply via email to