sandynz commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198753186
##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -305,15 +309,25 @@ private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter
this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
}
+ @SuppressWarnings("unchecked")
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (!running) {
return;
}
+ reconnectTimes.set(0);
+ if (msg instanceof List) {
+ List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
+ if (records.isEmpty()) {
+ log.warn("The records is empty");
+ return;
+ }
+ lastBinlogEvent.set(records.get(records.size() - 1));
+ blockingEventQueue.put(records);
Review Comment:
When is `MySQLClient.MySQLBinlogEventHandler.channelRead` invoked? It looks
like it's not guaranteed.
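
A minimal sketch of one possible reading of this concern, assuming the worry is that `reconnectTimes.set(0)` only runs when a message actually reaches `channelRead`. The class `ReconnectResetSketch`, the nested `Handler`, and the methods `reconnect` and `simulate` are hypothetical stand-ins written with plain JDK types; they are not ShardingSphere or Netty code.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ReconnectResetSketch {

    /** Hypothetical stand-in for the handler in the diff; not ShardingSphere or Netty code. */
    static final class Handler {

        private final AtomicInteger reconnectTimes = new AtomicInteger();

        /** Same reset placement as in the diff: runs only when a message arrives. */
        void channelRead(final Object msg) {
            reconnectTimes.set(0);
        }

        /** Counts one reconnect attempt and returns the running total. */
        int reconnect() {
            return reconnectTimes.incrementAndGet();
        }
    }

    /**
     * Performs the given number of reconnects, optionally delivering one
     * message after each, and returns the counter value seen by the last reconnect.
     */
    public static int simulate(final int reconnects, final boolean messageAfterEachReconnect) {
        Handler handler = new Handler();
        int last = 0;
        for (int i = 0; i < reconnects; i++) {
            last = handler.reconnect();
            if (messageAfterEachReconnect) {
                handler.channelRead("event");
            }
        }
        return last;
    }

    public static void main(final String[] args) {
        // No message ever arrives, so the counter is never reset.
        System.out.println(simulate(3, false));
        // A message arrives after each reconnect, so the counter restarts each time.
        System.out.println(simulate(3, true));
    }
}
```

Under this assumption, a connection that reconnects successfully but stays idle keeps accumulating attempts, because nothing triggers the reset.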