sandynz commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1198610893


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java:
##########
@@ -76,39 +79,37 @@ private static void saveAckPosition(final 
Map<SocketSinkImporter, CDCAckPosition
         }
     }
     
-    private static DataRecord findMinimumDataRecordWithComparator(final 
Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
-                                                                  final 
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final 
Comparator<DataRecord> dataRecordComparator) {
-        Map<SocketSinkImporter, DataRecord> waitSortedMap = new HashMap<>();
-        for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry : 
incrementalRecordMap.entrySet()) {
-            Record peek = entry.getValue().peek();
+    private static List<DataRecord> findMinimumDataRecordWithComparator(final 
Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
+                                                                        final 
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final 
Comparator<DataRecord> dataRecordComparator) {
+        Map<SocketSinkImporter, List<DataRecord>> waitSortedMap = new 
HashMap<>();
+        for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry 
: incrementalRecordMap.entrySet()) {
+            List<DataRecord> peek = entry.getValue().peek();
             if (null == peek) {
                 continue;
             }
-            if (peek instanceof DataRecord) {
-                waitSortedMap.put(entry.getKey(), (DataRecord) peek);
-            }
+            waitSortedMap.put(entry.getKey(), peek);
         }
         if (waitSortedMap.isEmpty()) {
-            return null;
+            return Collections.emptyList();
         }
-        DataRecord minRecord = null;
+        List<DataRecord> minRecords = null;
         SocketSinkImporter belongImporter = null;
-        for (Entry<SocketSinkImporter, DataRecord> entry : 
waitSortedMap.entrySet()) {
-            if (null == minRecord) {
-                minRecord = entry.getValue();
+        for (Entry<SocketSinkImporter, List<DataRecord>> entry : 
waitSortedMap.entrySet()) {
+            if (null == minRecords) {
+                minRecords = entry.getValue();
                 belongImporter = entry.getKey();
                 continue;
             }
-            if (dataRecordComparator.compare(minRecord, entry.getValue()) > 0) 
{
-                minRecord = entry.getValue();
+            if (dataRecordComparator.compare(minRecords.get(0), 
entry.getValue().get(0)) > 0) {
+                minRecords = entry.getValue();
                 belongImporter = entry.getKey();
             }
         }
-        if (null == minRecord) {
-            return null;
+        if (null == minRecords) {
+            return Collections.emptyList();
         }
         incrementalRecordMap.get(belongImporter).poll();
-        saveAckPosition(cdcAckPositionMap, belongImporter, minRecord);
-        return minRecord;
+        saveAckPosition(cdcAckPositionMap, belongImporter, 
minRecords.get(minRecords.size() - 1));
+        return minRecords;

Review Comment:
   `minRecords` could be renamed to `result`, so this line becomes `return result`.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -54,13 +54,22 @@ public void pushRecord(final Record dataRecord) {
     public List<Record> fetchRecords(final int batchSize, final int timeout, 
final TimeUnit timeUnit) {
         List<Record> result = new ArrayList<>(batchSize);
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);

Review Comment:
   Can `records` be null?



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java:
##########
@@ -94,24 +93,11 @@ private IncrementalTaskProgress 
createIncrementalTaskProgress(final IngestPositi
         return result;
     }
     
-    private Collection<Importer> createImporters(final int concurrency, final 
ImporterConfiguration importerConfig, final ImporterConnector 
importerConnector, final PipelineChannel channel,
-                                                 final 
PipelineJobProgressListener jobProgressListener) {
-        Collection<Importer> result = new LinkedList<>();
-        for (int i = 0; i < concurrency; i++) {
-            result.add(TypedSPILoader.getService(ImporterCreator.class, 
importerConnector.getType()).createImporter(importerConfig, importerConnector, 
channel, jobProgressListener,
-                    ImporterType.INCREMENTAL));
-        }
-        return result;
-    }
-    
-    private PipelineChannel createChannel(final int concurrency, final 
PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress 
progress) {
-        return pipelineChannelCreator.createPipelineChannel(concurrency, 
records -> {
+    private PipelineChannel createChannel(final PipelineChannelCreator 
pipelineChannelCreator, final IncrementalTaskProgress progress) {
+        return pipelineChannelCreator.createPipelineChannel(1, records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
-            if (!(lastHandledRecord.getPosition() instanceof 
PlaceholderPosition)) {
-                progress.setPosition(lastHandledRecord.getPosition());
-                
progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());
-            }
-            
progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
+            progress.setPosition(lastHandledRecord.getPosition());
+            
progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());

Review Comment:
   Can `records` contain a `PlaceholderPosition`? It looks like `setLastEventTimestamps` 
should not be invoked for a `PlaceholderPosition`.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java:
##########
@@ -122,10 +121,8 @@ public void onFailure(final Throwable throwable) {
     
     private PipelineChannel createChannel(final PipelineChannelCreator 
pipelineChannelCreator) {
         return pipelineChannelCreator.createPipelineChannel(1, records -> {
-            Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
-            if (null != lastNormalRecord) {
-                position.set(lastNormalRecord.getPosition());
-            }
+            Record lastRecord = records.get(records.size() - 1);
+            position.set(lastRecord.getPosition());

Review Comment:
   Could `getLastNormalRecord` method be removed in `RecordUtils`?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java:
##########
@@ -97,46 +101,58 @@ protected void runBlocking() {
         client.connect();
         client.subscribe(binlogPosition.getFilename(), 
binlogPosition.getPosition());
         while (isRunning()) {
-            AbstractBinlogEvent event = client.poll();
-            if (null == event) {
+            List<AbstractBinlogEvent> events = client.poll();
+            if (null == events) {
                 continue;
             }

Review Comment:
   Can `events` be null? Could we make it non-nullable?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -104,13 +146,16 @@ private AbstractBinlogEvent decodeEvent(final 
MySQLPacketPayload payload, final
             case DELETE_ROWS_EVENT_V1:
             case DELETE_ROWS_EVENT_V2:
                 return decodeDeleteRowsEventV2(binlogEventHeader, payload);
+            case QUERY_EVENT:
+                return decodeQueryEvent(binlogEventHeader.getChecksumLength(), 
payload);
+            case XID_EVENT:
+                return decodeXidEvent(binlogEventHeader, payload);
             default:
-                PlaceholderEvent result = 
createPlaceholderEvent(binlogEventHeader);

Review Comment:
   Why not keep `createPlaceholderEvent` so that the saved position keeps advancing? That 
would reduce binlog replay after the job restarts.



##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +98,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new 
PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            walEvents.clear();
+            return;
+        }
+        if (event instanceof AbstractRowEvent) {
+            walEvents.add(event);
+            return;
+        }
+        if (event instanceof CommitTXEvent) {
+            walEvents.add(event);
+            List<Record> records = new LinkedList<>();
+            for (AbstractWALEvent each : walEvents) {
+                records.add(walEventConverter.convert(each));
+            }
+            channel.pushRecords(records);
+        }

Review Comment:
   Why is `event` pushed to `channel` in `OpenGaussWALDumper` but not here?



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java:
##########
@@ -139,9 +146,13 @@ private void dump(final PipelineTableMetaData 
tableMetaData, final Connection co
                         rateLimitAlgorithm.intercept(JobOperationType.SELECT, 
1);
                     }
                 }
+                if (!dataRecords.isEmpty()) {
+                    channel.pushRecords(dataRecords);
+                }
                 dumpStatement.set(null);
                 log.info("Inventory dump done, rowCount={}", rowCount);
             }
+            

Review Comment:
   The empty line could be removed.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -54,13 +54,22 @@ public void pushRecord(final Record dataRecord) {
     public List<Record> fetchRecords(final int batchSize, final int timeout, 
final TimeUnit timeUnit) {
         List<Record> result = new ArrayList<>(batchSize);
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);
+            } else {
+                recordsCount += records.size();
+                result.addAll(records);
+            }
+            if (recordsCount >= batchSize) {
+                return result;
+            }

Review Comment:
   It looks like `if (recordsCount >= batchSize)` is not required.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java:
##########
@@ -65,24 +63,25 @@ public final class IncrementalTask implements PipelineTask, 
AutoCloseable {
     
     private final Dumper dumper;
     
-    private final Collection<Importer> importers;
+    private final Importer importer;
     
     @Getter
     private final IncrementalTaskProgress taskProgress;
     
     // TODO simplify parameters
-    public IncrementalTask(final int concurrency, final DumperConfiguration 
dumperConfig, final ImporterConfiguration importerConfig,
+    public IncrementalTask(final DumperConfiguration dumperConfig, final 
ImporterConfiguration importerConfig,

Review Comment:
   Is `concurrency` not needed any more?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java:
##########
@@ -97,46 +101,58 @@ protected void runBlocking() {
         client.connect();
         client.subscribe(binlogPosition.getFilename(), 
binlogPosition.getPosition());
         while (isRunning()) {
-            AbstractBinlogEvent event = client.poll();
-            if (null == event) {
+            List<AbstractBinlogEvent> events = client.poll();
+            if (null == events) {
                 continue;
             }
-            handleEvent(event);
+            handleEvent(events);
         }
-        channel.pushRecord(new FinishedRecord(new PlaceholderPosition()));
+        channel.pushRecords(Collections.singletonList(new FinishedRecord(new 
FinishedPosition())));
     }
     
-    private void handleEvent(final AbstractBinlogEvent event) {
-        if (event instanceof PlaceholderEvent || !((AbstractRowsEvent) 
event).getDatabaseName().equals(catalog) || 
!dumperConfig.containsTable(((AbstractRowsEvent) event).getTableName())) {
-            createPlaceholderRecord(event);
-            return;
-        }
-        PipelineTableMetaData tableMetaData = 
getPipelineTableMetaData(((AbstractRowsEvent) event).getTableName());
-        if (event instanceof WriteRowsEvent) {
-            handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
-            return;
+    private void handleEvent(final List<AbstractBinlogEvent> events) {

Review Comment:
   Could we add new method `handleEvents` and keep `handleEvent`?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -69,11 +78,44 @@ protected void decode(final ChannelHandlerContext ctx, 
final ByteBuf in, final L
                 in.resetReaderIndex();
                 break;
             }
-            Optional.ofNullable(decodeEvent(payload, 
binlogEventHeader)).ifPresent(out::add);
+            AbstractBinlogEvent binlogEvent = decodeEvent(payload, 
binlogEventHeader);
+            if (null == binlogEvent) {
+                skipChecksum(binlogEventHeader.getEventType(), in);
+                return;
+            }
+            if (decodeWithTX) {
+                processEventWithTX(binlogEvent, out);
+            } else {
+                processEventIgnoreTX(binlogEvent, out);
+            }
             skipChecksum(binlogEventHeader.getEventType(), in);
         }
     }
     
+    private void processEventWithTX(final AbstractBinlogEvent binlogEvent, 
final List<Object> out) {
+        if (binlogEvent instanceof QueryEvent) {
+            QueryEvent queryEvent = (QueryEvent) binlogEvent;
+            if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
+                records.clear();
+            }

Review Comment:
   When the BEGIN SQL arrives, `records` might not be empty. Is it safe to just clear it?



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -115,29 +117,31 @@ private PgConnection getReplicationConnectionUnwrap() 
throws SQLException {
     }
     
     private void processEventWithTX(final AbstractWALEvent event) {
-        if (event instanceof AbstractRowEvent) {
-            rowEvents.add((AbstractRowEvent) event);
+        if (event instanceof BeginTXEvent) {
+            walEvents.clear();
             return;

Review Comment:
   Verify: `walEvents` might not be empty when it is cleared.



##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +98,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new 
PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            walEvents.clear();

Review Comment:
   Verify: `walEvents` might not be empty when it is cleared.



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -305,15 +309,25 @@ private final class MySQLBinlogEventHandler extends 
ChannelInboundHandlerAdapter
             this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
         }
         
+        @SuppressWarnings("unchecked")
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object 
msg) throws Exception {
             if (!running) {
                 return;
             }
+            reconnectTimes.set(0);
+            if (msg instanceof List) {
+                List<AbstractBinlogEvent> records = 
(List<AbstractBinlogEvent>) msg;
+                if (records.isEmpty()) {
+                    log.warn("The records is empty");
+                    return;
+                }
+                lastBinlogEvent.set(records.get(records.size() - 1));
+                blockingEventQueue.put(records);

Review Comment:
   Does `records` include whole transaction's records?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Query event.
+ */
+@Getter
+@Setter
+public final class QueryEvent extends AbstractBinlogEvent {
+    
+    private long threadId;
+    
+    private long executionTime;

Review Comment:
   It would be better to make the `QueryEvent` fields `final`.



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/XidEvent.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Xid event.

Review Comment:
   It would be better to explain what this event means. Does it mark the end of a transaction?



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -64,7 +66,7 @@ public final class OpenGaussWALDumper extends 
AbstractLifecycleExecutor implemen
     
     private final boolean decodeWithTX;
     
-    private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
+    private final List<AbstractRowEvent> walEvents = new LinkedList<>();

Review Comment:
   It would be better to keep the name `rowEvents`, since the element type is `AbstractRowEvent`.



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Query event.

Review Comment:
   It would be better to explain what this event means. Does it mark the beginning of a transaction?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to