sandynz commented on code in PR #21513:
URL: https://github.com/apache/shardingsphere/pull/21513#discussion_r992920684
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java:
##########
@@ -115,13 +115,20 @@ private void write() {
}
private PipelineJobProgressUpdatedParameter flush(final DataSource
dataSource, final List<Record> buffer) {
- List<GroupedDataRecord> result =
MERGER.group(buffer.stream().filter(each -> each instanceof
DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
+ List<DataRecord> dataRecords = buffer.stream().filter(each -> each
instanceof DataRecord).map(each -> (DataRecord)
each).collect(Collectors.toList());
int insertRecordNumber = 0;
int deleteRecordNumber = 0;
+ for (DataRecord each : dataRecords) {
+ if (IngestDataChangeType.INSERT.equals(each.getType())) {
+ insertRecordNumber++;
+ }
+ if (IngestDataChangeType.DELETE.equals(each.getType())) {
+ deleteRecordNumber++;
+ }
Review Comment:
Currently, `processedRecordsCount = insertRecordNumber - deleteRecordNumber`.
Looks `deleteRecordNumber` should be ignored, since the deleted records must
be inserted before on migration case, so it's already processed.
##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -298,14 +298,18 @@ public void exceptionCaught(final ChannelHandlerContext
ctx, final Throwable cau
log.error("MySQLBinlogEventHandler protocol resolution error, file
name:{}, position:{}", fileName, position, cause);
reconnect();
}
-
+
private void reconnect() {
if (reconnectTimes.get() > 3) {
log.warn("exceeds the maximum number of retry times, last
binlog event:{}", lastBinlogEvent);
running = false;
return;
}
int retryTimes = reconnectTimes.incrementAndGet();
+ if (null == lastBinlogEvent.getFileName()) {
+ log.warn("can't get file name from binlog event, last binlog
event:{}", lastBinlogEvent);
+ return;
+ }
Review Comment:
How it occur?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]