azexcy commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1200003281


##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -305,15 +309,25 @@ private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter
             this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
         }
         
+        @SuppressWarnings("unchecked")
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
             if (!running) {
                 return;
             }
+            reconnectTimes.set(0);
+            if (msg instanceof List) {
+                List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
+                if (records.isEmpty()) {
+                    log.warn("The records is empty");
+                    return;
+                }
+                lastBinlogEvent.set(records.get(records.size() - 1));
+                blockingEventQueue.put(records);

Review Comment:
   It's guaranteed by the previous handler, `MySQLBinlogEventPacketDecoder`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

