sandynz commented on code in PR #21513:
URL: https://github.com/apache/shardingsphere/pull/21513#discussion_r992920684


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java:
##########
@@ -115,13 +115,20 @@ private void write() {
     }
     
     private PipelineJobProgressUpdatedParameter flush(final DataSource 
dataSource, final List<Record> buffer) {
-        List<GroupedDataRecord> result = 
MERGER.group(buffer.stream().filter(each -> each instanceof 
DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
+        List<DataRecord> dataRecords = buffer.stream().filter(each -> each 
instanceof DataRecord).map(each -> (DataRecord) 
each).collect(Collectors.toList());
         int insertRecordNumber = 0;
         int deleteRecordNumber = 0;
+        for (DataRecord each : dataRecords) {
+            if (IngestDataChangeType.INSERT.equals(each.getType())) {
+                insertRecordNumber++;
+            }
+            if (IngestDataChangeType.DELETE.equals(each.getType())) {
+                deleteRecordNumber++;
+            }

Review Comment:
   Currently, `processedRecordsCount = insertRecordNumber - deleteRecordNumber`.
   Looks `deleteRecordNumber` should be ignored, since the deleted records must 
be inserted before on migration case, so it's already processed.



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -298,14 +298,18 @@ public void exceptionCaught(final ChannelHandlerContext 
ctx, final Throwable cau
             log.error("MySQLBinlogEventHandler protocol resolution error, file 
name:{}, position:{}", fileName, position, cause);
             reconnect();
         }
-        
+    
         private void reconnect() {
             if (reconnectTimes.get() > 3) {
                 log.warn("exceeds the maximum number of retry times, last 
binlog event:{}", lastBinlogEvent);
                 running = false;
                 return;
             }
             int retryTimes = reconnectTimes.incrementAndGet();
+            if (null == lastBinlogEvent.getFileName()) {
+                log.warn("can't get file name from binlog event, last binlog 
event:{}", lastBinlogEvent);
+                return;
+            }

Review Comment:
   How it occur?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to