sandynz commented on code in PR #25787:
URL: https://github.com/apache/shardingsphere/pull/25787#discussion_r1199516978
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java:
##########
@@ -48,23 +49,25 @@ public MultiplexMemoryPipelineChannel(final int
channelNumber, final int blockQu
}
@Override
- public void pushRecord(final Record record) {
- if (FinishedRecord.class.equals(record.getClass())) {
- for (int i = 0; i < channelNumber; i++) {
- pushRecord(record, i);
+ public void pushRecords(final List<Record> records) {
+ for (Record each : records) {
+ if (FinishedRecord.class.equals(each.getClass())) {
+ for (int i = 0; i < channelNumber; i++) {
+ pushRecord(each, i);
+ }
+ } else if (DataRecord.class.equals(each.getClass())) {
+ pushRecord(each, Math.abs(each.hashCode() % channelNumber));
Review Comment:
Could we merge data records into collections?
##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -64,7 +66,7 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
private final boolean decodeWithTX;
- private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
+ private List<AbstractRowEvent> walEvents = new LinkedList<>();
Review Comment:
`walEvents` could be named `rowEvents`
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java:
##########
@@ -44,23 +44,29 @@ public SimpleMemoryPipelineChannel(final int
blockQueueSize, final AckCallback a
@SneakyThrows(InterruptedException.class)
@Override
- public void pushRecord(final Record dataRecord) {
- queue.put(dataRecord);
+ public void pushRecords(final List<Record> records) {
+ queue.put(records);
}
@SneakyThrows(InterruptedException.class)
// TODO thread-safe?
@Override
public List<Record> fetchRecords(final int batchSize, final int timeout,
final TimeUnit timeUnit) {
- List<Record> result = new ArrayList<>(batchSize);
+ List<Record> result = new LinkedList<>();
long start = System.currentTimeMillis();
- while (batchSize > queue.size()) {
+ int recordsCount = 0;
+ while (batchSize > recordsCount) {
+ List<Record> records = queue.poll();
+ if (null == records || records.isEmpty()) {
+ TimeUnit.MILLISECONDS.sleep(100L);
Review Comment:
Could we verify whether `timeout` is less than `100` milliseconds?
##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -184,6 +234,29 @@ private void initRowsEvent(final AbstractRowsEvent
rowsEvent, final MySQLBinlogE
rowsEvent.setServerId(binlogEventHeader.getServerId());
}
+ private QueryEvent decodeQueryEvent(final int checksumLength, final
MySQLPacketPayload payload) {
+ int threadId = payload.readInt4();
+ int executionTime = payload.readInt4();
+ // length of the name of the database
+ payload.skipReserved(1);
+ int errorCode = payload.readInt2();
+ // status variables block
+ payload.skipReserved(payload.readInt2());
+ String databaseName = payload.readStringNul();
+ String sql =
payload.readStringFix(payload.getByteBuf().readableBytes() - checksumLength);
+ return new QueryEvent(threadId, executionTime, errorCode,
databaseName, sql);
+ }
+
+ private XidEvent decodeXidEvent(final MySQLBinlogEventHeader
binlogEventHeader, final MySQLPacketPayload payload) {
+ XidEvent result = new XidEvent(payload.readInt8());
+ result.setFileName(binlogContext.getFileName());
+ result.setPosition(binlogEventHeader.getLogPos());
+ result.setTimestamp(binlogEventHeader.getTimestamp());
+ result.setServerId(binlogEventHeader.getServerId());
+ return result;
+ }
+
+ // TODO May be used again later, keep this method first.
Review Comment:
TODO could be removed
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java:
##########
@@ -122,10 +121,8 @@ public void onFailure(final Throwable throwable) {
private PipelineChannel createChannel(final PipelineChannelCreator
pipelineChannelCreator) {
return pipelineChannelCreator.createPipelineChannel(1, records -> {
- Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
- if (null != lastNormalRecord) {
- position.set(lastNormalRecord.getPosition());
- }
+ Record lastRecord = records.get(records.size() - 1);
+ position.set(lastRecord.getPosition());
Review Comment:
If there's a FinishedRecord, could we set it to position?
It looks like `RecordUtils.getLastNormalRecord` could be kept to handle
PlaceholderRecord, and also be updated to handle FinishedRecord.
##########
kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfiguration.java:
##########
@@ -56,6 +56,7 @@ public final class YamlMigrationJobConfiguration implements
YamlPipelineJobConfi
private List<String> jobShardingDataNodes;
+ // TODO remove later
Review Comment:
TODO could be removed
##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +99,43 @@ protected void runBlocking() {
continue;
}
AbstractWALEvent event = decodingPlugin.decode(message, new
PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
- channel.pushRecord(walEventConverter.convert(event));
+ if (decodeWithTX) {
+ processEventWithTX(event);
+ } else {
+ processEventIgnoreTX(event);
+ }
}
} catch (final SQLException ex) {
throw new IngestException(ex);
}
}
+ private void processEventWithTX(final AbstractWALEvent event) {
+ if (event instanceof BeginTXEvent) {
+ rowEvents = new ArrayList<>();
+ return;
+ }
Review Comment:
Could we set `rowEvents = new ArrayList<>();` on CommitTXEvent, rather than
waiting for the next BeginTXEvent?
##########
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java:
##########
@@ -77,14 +79,16 @@ public final class MySQLClient {
private Promise<Object> responseCallback;
- private final ArrayBlockingQueue<AbstractBinlogEvent> blockingEventQueue =
new ArrayBlockingQueue<>(10000);
+ private final ArrayBlockingQueue<List<AbstractBinlogEvent>>
blockingEventQueue = new ArrayBlockingQueue<>(200);
Review Comment:
From `10000` to `200`, is it enough? Though it's a list now, what's the
average list size?
##########
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java:
##########
@@ -86,13 +99,43 @@ protected void runBlocking() {
continue;
}
AbstractWALEvent event = decodingPlugin.decode(message, new
PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
- channel.pushRecord(walEventConverter.convert(event));
+ if (decodeWithTX) {
+ processEventWithTX(event);
+ } else {
+ processEventIgnoreTX(event);
+ }
}
} catch (final SQLException ex) {
throw new IngestException(ex);
}
}
+ private void processEventWithTX(final AbstractWALEvent event) {
+ if (event instanceof BeginTXEvent) {
+ rowEvents = new ArrayList<>();
+ return;
+ }
+ if (event instanceof AbstractRowEvent) {
+ rowEvents.add((AbstractRowEvent) event);
+ return;
+ }
+ if (event instanceof CommitTXEvent) {
+ List<Record> records = new LinkedList<>();
+ for (AbstractWALEvent each : rowEvents) {
+ records.add(walEventConverter.convert(each));
+ }
+ records.add(walEventConverter.convert(event));
+ channel.pushRecords(records);
+ }
+ }
+
+ private void processEventIgnoreTX(final AbstractWALEvent event) {
+ if (event instanceof BeginTXEvent) {
Review Comment:
Should CommitTXEvent be ignored?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]