sandynz commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1199516978


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java:
##########
@@ -48,23 +49,25 @@ public MultiplexMemoryPipelineChannel(final int 
channelNumber, final int blockQu
     }
     
     @Override
-    public void pushRecord(final Record record) {
-        if (FinishedRecord.class.equals(record.getClass())) {
-            for (int i = 0; i < channelNumber; i++) {
-                pushRecord(record, i);
+    public void pushRecords(final List<Record> records) {
+        for (Record each : records) {
+            if (FinishedRecord.class.equals(each.getClass())) {
+                for (int i = 0; i < channelNumber; i++) {
+                    pushRecord(each, i);
+                }
+            } else if (DataRecord.class.equals(each.getClass())) {
+                pushRecord(each, Math.abs(each.hashCode() % channelNumber));

Review Comment:
   Could we merge data records into collections?



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -64,7 +66,7 @@ public final class OpenGaussWALDumper extends 
AbstractLifecycleExecutor implemen
     
     private final boolean decodeWithTX;
     
-    private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
+    private List<AbstractRowEvent> walEvents = new LinkedList<>();

Review Comment:
   `walEvents` could be `rowEvents`



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -44,23 +44,29 @@ public SimpleMemoryPipelineChannel(final int 
blockQueueSize, final AckCallback a
     
     @SneakyThrows(InterruptedException.class)
     @Override
-    public void pushRecord(final Record dataRecord) {
-        queue.put(dataRecord);
+    public void pushRecords(final List<Record> records) {
+        queue.put(records);
     }
     
     @SneakyThrows(InterruptedException.class)
     // TODO thread-safe?
     @Override
     public List<Record> fetchRecords(final int batchSize, final int timeout, 
final TimeUnit timeUnit) {
-        List<Record> result = new ArrayList<>(batchSize);
+        List<Record> result = new LinkedList<>();
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(100L);

Review Comment:
   Could we verify whether `timeout` is less than `100` milliseconds?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -184,6 +234,29 @@ private void initRowsEvent(final AbstractRowsEvent 
rowsEvent, final MySQLBinlogE
         rowsEvent.setServerId(binlogEventHeader.getServerId());
     }
     
+    private QueryEvent decodeQueryEvent(final int checksumLength, final 
MySQLPacketPayload payload) {
+        int threadId = payload.readInt4();
+        int executionTime = payload.readInt4();
+        // length of the name of the database
+        payload.skipReserved(1);
+        int errorCode = payload.readInt2();
+        // status variables block
+        payload.skipReserved(payload.readInt2());
+        String databaseName = payload.readStringNul();
+        String sql = 
payload.readStringFix(payload.getByteBuf().readableBytes() - checksumLength);
+        return new QueryEvent(threadId, executionTime, errorCode, 
databaseName, sql);
+    }
+    
+    private XidEvent decodeXidEvent(final MySQLBinlogEventHeader 
binlogEventHeader, final MySQLPacketPayload payload) {
+        XidEvent result = new XidEvent(payload.readInt8());
+        result.setFileName(binlogContext.getFileName());
+        result.setPosition(binlogEventHeader.getLogPos());
+        result.setTimestamp(binlogEventHeader.getTimestamp());
+        result.setServerId(binlogEventHeader.getServerId());
+        return result;
+    }
+    
+    // TODO May be used again later, keep this method first.

Review Comment:
   TODO could be removed



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java:
##########
@@ -122,10 +121,8 @@ public void onFailure(final Throwable throwable) {
     
     private PipelineChannel createChannel(final PipelineChannelCreator 
pipelineChannelCreator) {
         return pipelineChannelCreator.createPipelineChannel(1, records -> {
-            Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
-            if (null != lastNormalRecord) {
-                position.set(lastNormalRecord.getPosition());
-            }
+            Record lastRecord = records.get(records.size() - 1);
+            position.set(lastRecord.getPosition());

Review Comment:
   If there's a FinishedRecord, could we set it to position?
   
   It looks like `RecordUtils.getLastNormalRecord` could be kept to handle 
PlaceholderRecord, and also be updated to handle FinishedRecord?
   



##########
kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfiguration.java:
##########
@@ -56,6 +56,7 @@ public final class YamlMigrationJobConfiguration implements 
YamlPipelineJobConfi
     
     private List<String> jobShardingDataNodes;
     
+    // TODO remove later

Review Comment:
   TODO could be removed



##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +99,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new 
PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            rowEvents = new ArrayList<>();
+            return;
+        }

Review Comment:
   Could we set `rowEvents = new ArrayList<>();` on CommitTXEvent, rather than 
waiting for the next BeginTXEvent?



##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -77,14 +79,16 @@ public final class MySQLClient {
     
     private Promise<Object> responseCallback;
     
-    private final ArrayBlockingQueue<AbstractBinlogEvent> blockingEventQueue = 
new ArrayBlockingQueue<>(10000);
+    private final ArrayBlockingQueue<List<AbstractBinlogEvent>> 
blockingEventQueue = new ArrayBlockingQueue<>(200);

Review Comment:
   Going from `10000` to `200`, is it enough? Although each element is now a 
list, what's the average list size?



##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +99,43 @@ protected void runBlocking() {
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new 
PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            rowEvents = new ArrayList<>();
+            return;
+        }
+        if (event instanceof AbstractRowEvent) {
+            rowEvents.add((AbstractRowEvent) event);
+            return;
+        }
+        if (event instanceof CommitTXEvent) {
+            List<Record> records = new LinkedList<>();
+            for (AbstractWALEvent each : rowEvents) {
+                records.add(walEventConverter.convert(each));
+            }
+            records.add(walEventConverter.convert(event));
+            channel.pushRecords(records);
+        }
+    }
+    
+    private void processEventIgnoreTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {

Review Comment:
   Should CommitTXEvent be ignored?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to