This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
     new 7dd6caf4801 Refactor pipeline channel, read records by transaction at increment task (#25787)
7dd6caf4801 is described below
commit 7dd6caf48018ef390780545b648b13f251464673
Author: Xinze Guo <[email protected]>
AuthorDate: Tue May 23 20:35:18 2023 +0800
    Refactor pipeline channel, read records by transaction at increment task (#25787)
    * Refactor pipeline channel, read records by transaction (see the sketch below)
* Add MySQL decode tx switch
* Refactor importers to importer
    * Simplify code and roll back MultiplexMemoryPipelineChannel
* Improve MultiplexMemoryPipelineChannel and fix codestyle
    * Improve referenced URL in comments
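    An illustrative sketch (not part of the commit) of how the reworked channel API is meant to be
    used, based on the signatures changed in the diff below; variable names are hypothetical:

        // before: records were pushed into the channel one by one
        // channel.pushRecord(dataRecord);

        // after: the incremental dumper collects the records of one transaction
        // and pushes them to the channel as a single batch
        List<Record> transactionRecords = new LinkedList<>();
        transactionRecords.add(dataRecord);
        channel.pushRecords(transactionRecords);

        // importers still fetch in batches; an empty list (never null) means nothing arrived yet
        List<Record> records = channel.fetchRecords(batchSize, 500, TimeUnit.MILLISECONDS);
        if (!records.isEmpty()) {
            // process records, then acknowledge them
            channel.ack(records);
        }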
---
.../api/ingest/channel/PipelineChannel.java | 4 +-
.../connector/SocketSinkImporterConnector.java | 23 +++++--
.../cdc/core/importer/SocketSinkImporter.java | 2 +-
.../data/pipeline/cdc/util/CDCDataRecordUtils.java | 59 ++++++++--------
.../yaml/job/YamlCDCJobConfigurationSwapper.java | 2 +-
.../pipeline/cdc/util/CDCDataRecordUtilsTest.java | 30 ++++----
.../pipeline/core/importer/DataSourceImporter.java | 29 ++++----
.../memory/MemoryPipelineChannelCreator.java | 5 +-
.../memory/MultiplexMemoryPipelineChannel.java | 34 +++++++---
.../memory/SimpleMemoryPipelineChannel.java | 22 +++---
.../core/ingest/dumper/InventoryDumper.java | 14 +++-
.../data/pipeline/core/record/RecordUtils.java | 19 ------
.../data/pipeline/core/task/IncrementalTask.java | 2 +-
.../data/pipeline/core/task/InventoryTask.java | 13 ++--
.../spi/ingest/channel/PipelineChannelCreator.java | 3 +-
.../memory/MemoryPipelineChannelCreatorTest.java | 4 +-
.../memory/MultiplexMemoryPipelineChannelTest.java | 4 +-
.../memory/SimpleMemoryPipelineChannelTest.java | 41 +++++++++++
.../data/pipeline/core/record/RecordUtilsTest.java | 17 -----
.../mysql/ingest/MySQLIncrementalDumper.java | 79 +++++++++++++++-------
.../mysql/ingest/binlog/event/QueryEvent.java} | 35 ++++++----
.../mysql/ingest/binlog/event/XidEvent.java} | 24 +++----
.../pipeline/mysql/ingest/client/MySQLClient.java | 30 ++++++--
.../netty/MySQLBinlogEventPacketDecoder.java | 77 ++++++++++++++++++++-
.../mysql/ingest/MySQLIncrementalDumperTest.java | 45 ++++++------
.../mysql/ingest/client/MySQLClientTest.java | 2 +-
.../netty/MySQLBinlogEventPacketDecoderTest.java | 26 ++++---
.../opengauss/ingest/OpenGaussWALDumper.java | 20 +++---
.../postgresql/ingest/PostgreSQLWALDumper.java | 45 +++++++++++-
.../ingest/wal/decode/TestDecodingPlugin.java | 11 ++-
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 6 +-
.../core/importer/DataSourceImporterTest.java | 3 +-
32 files changed, 476 insertions(+), 254 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
index 5fa2b8ce353..25ce5a44a59 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
@@ -30,9 +30,9 @@ public interface PipelineChannel {
/**
* Push {@code DataRecord} into channel.
*
- * @param dataRecord data
+ * @param dataRecords data records
*/
- void pushRecord(Record dataRecord);
+ void pushRecords(List<Record> dataRecords);
/**
* Fetch {@code Record} list from channel.
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index b326565741e..905adc5fef8 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -34,13 +34,13 @@ import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerato
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataRecordUtils;
import
org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
-import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -78,7 +78,7 @@ public final class SocketSinkImporterConnector implements
ImporterConnector, Aut
private final Map<String, String> tableNameSchemaMap = new HashMap<>();
-    private final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap = new ConcurrentHashMap<>();
+    private final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap = new ConcurrentHashMap<>();
    private final AtomicInteger runningIncrementalTaskCount = new AtomicInteger(0);
@@ -120,7 +120,7 @@ public final class SocketSinkImporterConnector implements
ImporterConnector, Aut
return;
}
            Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
-            importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(RecordUtils.getLastNormalRecord(recordList), dataRecordCount));
+            importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(lastRecord, dataRecordCount));
writeImmediately(recordList, importerDataRecordMap);
} else if (ImporterType.INCREMENTAL == importerType) {
writeIntoQueue(recordList, socketSinkImporter);
@@ -160,12 +160,21 @@ public final class SocketSinkImporterConnector implements
ImporterConnector, Aut
@SneakyThrows(InterruptedException.class)
    private void writeIntoQueue(final List<Record> dataRecords, final SocketSinkImporter socketSinkImporter) {
-        BlockingQueue<Record> blockingQueue = incrementalRecordMap.get(socketSinkImporter);
+        BlockingQueue<List<DataRecord>> blockingQueue = incrementalRecordMap.get(socketSinkImporter);
if (null == blockingQueue) {
log.warn("not find the queue to write");
return;
}
+ Map<Long, List<DataRecord>> recordsMap = new LinkedHashMap<>();
for (Record each : dataRecords) {
+ if (!(each instanceof DataRecord)) {
+ continue;
+ }
+ DataRecord dataRecord = (DataRecord) each;
+ // TODO need improve if support global transaction
+            recordsMap.computeIfAbsent(dataRecord.getCsn(), ignored -> new LinkedList<>()).add(dataRecord);
+ }
+ for (List<DataRecord> each : recordsMap.values()) {
blockingQueue.put(each);
}
}
@@ -223,11 +232,11 @@ public final class SocketSinkImporterConnector implements
ImporterConnector, Aut
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap =
new HashMap<>();
List<DataRecord> dataRecords = new LinkedList<>();
for (int i = 0; i < batchSize; i++) {
-            DataRecord minimumDataRecord = CDCDataRecordUtils.findMinimumDataRecordAndSavePosition(incrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
-            if (null == minimumDataRecord) {
+            List<DataRecord> minimumRecords = CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(incrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
+            if (minimumRecords.isEmpty()) {
break;
}
- dataRecords.add(minimumDataRecord);
+ dataRecords.addAll(minimumRecords);
}
if (dataRecords.isEmpty()) {
Thread.sleep(200L);
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
index 691b5800028..a6eb2942911 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
@@ -80,7 +80,7 @@ public final class SocketSinkImporter extends
AbstractLifecycleExecutor implemen
}
while (isRunning()) {
List<Record> records = channel.fetchRecords(batchSize, 500,
TimeUnit.MILLISECONDS);
- if (null != records && !records.isEmpty()) {
+ if (!records.isEmpty()) {
List<Record> recordList = records.stream().filter(each ->
!(each instanceof PlaceholderRecord)).collect(Collectors.toList());
processDataRecords(recordList);
if (FinishedRecord.class.equals(records.get(records.size() -
1).getClass())) {
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
index 2a65306008a..6189f2ba161 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
@@ -24,8 +24,10 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
@@ -44,8 +46,8 @@ public final class CDCDataRecordUtils {
* @param cdcAckPositionMap CDC ack position map.
* @return minimum data record
*/
-    public static DataRecord findMinimumDataRecordAndSavePosition(final Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap, final Comparator<DataRecord> dataRecordComparator,
-                                                                   final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
+    public static List<DataRecord> findMinimumDataRecordsAndSavePosition(final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
+                                                                          final Comparator<DataRecord> dataRecordComparator, final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
if (null == dataRecordComparator) {
return
findMinimumDataRecordWithoutComparator(incrementalRecordMap, cdcAckPositionMap);
} else {
@@ -53,17 +55,18 @@ public final class CDCDataRecordUtils {
}
}
- private static DataRecord findMinimumDataRecordWithoutComparator(final
Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
- final
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
- for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry :
incrementalRecordMap.entrySet()) {
- Record record = entry.getValue().poll();
- if (!(record instanceof DataRecord)) {
+ private static List<DataRecord>
findMinimumDataRecordWithoutComparator(final Map<SocketSinkImporter,
BlockingQueue<List<DataRecord>>> incrementalRecordMap,
+
final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
+ for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry
: incrementalRecordMap.entrySet()) {
+ List<DataRecord> records = entry.getValue().poll();
+ if (null == records || records.isEmpty()) {
continue;
}
- saveAckPosition(cdcAckPositionMap, entry.getKey(), record);
- return (DataRecord) record;
+ DataRecord lastRecord = records.get(records.size() - 1);
+ saveAckPosition(cdcAckPositionMap, entry.getKey(), lastRecord);
+ return records;
}
- return null;
+ return Collections.emptyList();
}
private static void saveAckPosition(final Map<SocketSinkImporter,
CDCAckPosition> cdcAckPositionMap, final SocketSinkImporter socketSinkImporter,
final Record record) {
@@ -76,39 +79,37 @@ public final class CDCDataRecordUtils {
}
}
- private static DataRecord findMinimumDataRecordWithComparator(final
Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
- final
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final
Comparator<DataRecord> dataRecordComparator) {
- Map<SocketSinkImporter, DataRecord> waitSortedMap = new HashMap<>();
- for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry :
incrementalRecordMap.entrySet()) {
- Record peek = entry.getValue().peek();
+ private static List<DataRecord> findMinimumDataRecordWithComparator(final
Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
+ final
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final
Comparator<DataRecord> dataRecordComparator) {
+ Map<SocketSinkImporter, List<DataRecord>> waitSortedMap = new
HashMap<>();
+ for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry
: incrementalRecordMap.entrySet()) {
+ List<DataRecord> peek = entry.getValue().peek();
if (null == peek) {
continue;
}
- if (peek instanceof DataRecord) {
- waitSortedMap.put(entry.getKey(), (DataRecord) peek);
- }
+ waitSortedMap.put(entry.getKey(), peek);
}
if (waitSortedMap.isEmpty()) {
- return null;
+ return Collections.emptyList();
}
- DataRecord minRecord = null;
+ List<DataRecord> result = null;
SocketSinkImporter belongImporter = null;
- for (Entry<SocketSinkImporter, DataRecord> entry :
waitSortedMap.entrySet()) {
- if (null == minRecord) {
- minRecord = entry.getValue();
+ for (Entry<SocketSinkImporter, List<DataRecord>> entry :
waitSortedMap.entrySet()) {
+ if (null == result) {
+ result = entry.getValue();
belongImporter = entry.getKey();
continue;
}
- if (dataRecordComparator.compare(minRecord, entry.getValue()) > 0)
{
- minRecord = entry.getValue();
+ if (dataRecordComparator.compare(result.get(0),
entry.getValue().get(0)) > 0) {
+ result = entry.getValue();
belongImporter = entry.getKey();
}
}
- if (null == minRecord) {
- return null;
+ if (null == result) {
+ return Collections.emptyList();
}
incrementalRecordMap.get(belongImporter).poll();
- saveAckPosition(cdcAckPositionMap, belongImporter, minRecord);
- return minRecord;
+ saveAckPosition(cdcAckPositionMap, belongImporter,
result.get(result.size() - 1));
+ return result;
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 8041712e616..979a583388d 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -53,7 +53,7 @@ public final class YamlCDCJobConfigurationSwapper implements
YamlConfigurationSw
result.setDecodeWithTX(data.isDecodeWithTX());
result.setSinkConfig(swapToYamlSinkConfiguration(data.getSinkConfig()));
result.setConcurrency(data.getConcurrency());
- result.setRetryTimes(0);
+ result.setRetryTimes(data.getRetryTimes());
return result;
}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
index 575d221de2e..514c948b6fb 100644
---
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
@@ -19,48 +19,50 @@ package org.apache.shardingsphere.data.pipeline.cdc.util;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
import
org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
class CDCDataRecordUtilsTest {
@Test
void assertFindMinimumDataRecordAndSavePosition() throws
InterruptedException {
- final Map<SocketSinkImporter, BlockingQueue<Record>>
actualIncrementalRecordMap = new HashMap<>();
- ArrayBlockingQueue<Record> queueFirst = new ArrayBlockingQueue<>(5);
- queueFirst.put(generateDataRecord(0));
- queueFirst.put(generateDataRecord(2));
- queueFirst.put(generateDataRecord(4));
+ final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>>
actualIncrementalRecordMap = new HashMap<>();
+ ArrayBlockingQueue<List<DataRecord>> queueFirst = new
ArrayBlockingQueue<>(5);
+ queueFirst.put(Collections.singletonList(generateDataRecord(0)));
+ queueFirst.put(Collections.singletonList(generateDataRecord(2)));
+ queueFirst.put(Collections.singletonList(generateDataRecord(4)));
SocketSinkImporter mockSocketSinkImporterFirst =
mock(SocketSinkImporter.class);
actualIncrementalRecordMap.put(mockSocketSinkImporterFirst,
queueFirst);
- ArrayBlockingQueue<Record> queueSecond = new ArrayBlockingQueue<>(5);
- queueSecond.put(generateDataRecord(1));
- queueSecond.put(generateDataRecord(3));
- queueSecond.put(generateDataRecord(5));
+ ArrayBlockingQueue<List<DataRecord>> queueSecond = new
ArrayBlockingQueue<>(5);
+ queueSecond.put(Collections.singletonList(generateDataRecord(1)));
+ queueSecond.put(Collections.singletonList(generateDataRecord(3)));
+ queueSecond.put(Collections.singletonList(generateDataRecord(5)));
SocketSinkImporter mockSocketSinkImporterSecond =
mock(SocketSinkImporter.class);
actualIncrementalRecordMap.put(mockSocketSinkImporterSecond,
queueSecond);
Comparator<DataRecord> dataRecordComparator =
DataRecordComparatorGenerator.generatorIncrementalComparator(new
OpenGaussDatabaseType());
final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new
HashMap<>();
for (long i = 0; i <= 5; i++) {
- DataRecord minimumDataRecord =
CDCDataRecordUtils.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap,
dataRecordComparator, cdcAckPositionMap);
- assertThat(minimumDataRecord.getCsn(), is(i));
+ List<DataRecord> minimumDataRecord =
CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(actualIncrementalRecordMap,
dataRecordComparator, cdcAckPositionMap);
+ assertThat(minimumDataRecord.size(), is(1));
+ assertThat(minimumDataRecord.get(0).getCsn(), is(i));
}
-
assertNull(CDCDataRecordUtils.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap,
dataRecordComparator, cdcAckPositionMap));
+
assertTrue(CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(actualIncrementalRecordMap,
dataRecordComparator, cdcAckPositionMap).isEmpty());
}
private DataRecord generateDataRecord(final long csn) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 1514e76ba6b..2a662553ec0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -99,7 +99,7 @@ public final class DataSourceImporter extends
AbstractLifecycleExecutor implemen
int batchSize = importerConfig.getBatchSize();
while (isRunning()) {
List<Record> records = channel.fetchRecords(batchSize, 3,
TimeUnit.SECONDS);
- if (null != records && !records.isEmpty()) {
+ if (!records.isEmpty()) {
PipelineJobProgressUpdatedParameter updatedParam =
flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()),
records);
channel.ack(records);
jobProgressListener.onProgressUpdated(updatedParam);
@@ -266,12 +266,13 @@ public final class DataSourceImporter extends
AbstractLifecycleExecutor implemen
private void executeBatchDelete(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
DataRecord dataRecord = dataRecords.get(0);
- List<Column> conditionColumns =
RecordUtils.extractConditionColumns(dataRecord,
importerConfig.getShardingColumns(dataRecord.getTableName()));
- String deleteSQL =
pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()),
dataRecord, conditionColumns);
+ String deleteSQL =
pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()),
dataRecord,
+ RecordUtils.extractConditionColumns(dataRecord,
importerConfig.getShardingColumns(dataRecord.getTableName())));
try (PreparedStatement preparedStatement =
connection.prepareStatement(deleteSQL)) {
batchDeleteStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
+ List<Column> conditionColumns =
RecordUtils.extractConditionColumns(each,
importerConfig.getShardingColumns(dataRecord.getTableName()));
for (int i = 0; i < conditionColumns.size(); i++) {
Object oldValue = conditionColumns.get(i).getOldValue();
if (null == oldValue) {
@@ -283,7 +284,7 @@ public final class DataSourceImporter extends
AbstractLifecycleExecutor implemen
}
int[] counts = preparedStatement.executeBatch();
if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
- log.warn("batchDelete failed, counts={}, sql={},
conditionColumns={}", Arrays.toString(counts), deleteSQL, conditionColumns);
+ log.warn("batchDelete failed, counts={}, sql={},
dataRecords={}", Arrays.toString(counts), deleteSQL, dataRecords);
}
} finally {
batchDeleteStatement.set(null);
@@ -295,23 +296,19 @@ public final class DataSourceImporter extends
AbstractLifecycleExecutor implemen
return;
}
try (Connection connection = dataSource.getConnection()) {
- sequentialFlush(connection, buffer);
+ // TODO it's better use transaction, but execute delete maybe not
effect when open transaction of PostgreSQL sometimes
+ for (DataRecord each : buffer) {
+ try {
+ doFlush(connection, each);
+ } catch (final SQLException ex) {
+ throw new
PipelineImporterJobWriteException(String.format("Write failed, record=%s",
each), ex);
+ }
+ }
} catch (final SQLException ex) {
throw new PipelineImporterJobWriteException(ex);
}
}
- private void sequentialFlush(final Connection connection, final
List<DataRecord> buffer) {
- // TODO it's better use transaction, but execute delete maybe not
effect when open transaction of PostgreSQL sometimes
- for (DataRecord each : buffer) {
- try {
- doFlush(connection, each);
- } catch (final SQLException ex) {
- throw new
PipelineImporterJobWriteException(String.format("Write failed, record=%s",
each), ex);
- }
- }
- }
-
@Override
protected void doStop() throws SQLException {
cancelStatement(batchInsertStatement.get());
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
index ede119b8d74..63b16b247e0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
@@ -40,8 +40,9 @@ public final class MemoryPipelineChannelCreator implements
PipelineChannelCreato
}
@Override
-    public PipelineChannel createPipelineChannel(final int outputConcurrency, final AckCallback ackCallback) {
-        return 1 == outputConcurrency ? new SimpleMemoryPipelineChannel(blockQueueSize, ackCallback) : new MultiplexMemoryPipelineChannel(outputConcurrency, blockQueueSize, ackCallback);
+    public PipelineChannel createPipelineChannel(final int outputConcurrency, final int averageElementSize, final AckCallback ackCallback) {
+        return 1 == outputConcurrency ? new SimpleMemoryPipelineChannel((int) Math.ceil((double) blockQueueSize / averageElementSize), ackCallback)
+                : new MultiplexMemoryPipelineChannel(outputConcurrency, blockQueueSize, ackCallback);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index 50bc4df6ad8..2e09b725c51 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -23,7 +23,9 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,25 +50,37 @@ public final class MultiplexMemoryPipelineChannel
implements PipelineChannel {
}
@Override
- public void pushRecord(final Record record) {
- if (FinishedRecord.class.equals(record.getClass())) {
+ public void pushRecords(final List<Record> records) {
+ Record firstRecord = records.get(0);
+ if (1 == records.size()) {
+ pushRecord(firstRecord);
+ return;
+ }
+        long insertDataRecordsCount = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each -> IngestDataChangeType.INSERT.equals(each.getType())).count();
+        if (insertDataRecordsCount == records.size()) {
+            channels.get(Math.abs(firstRecord.hashCode() % channelNumber)).pushRecords(records);
+ return;
+ }
+ for (Record record : records) {
+ pushRecord(record);
+ }
+ }
+
+ private void pushRecord(final Record record) {
+ List<Record> records = Collections.singletonList(record);
+ if (record instanceof FinishedRecord) {
for (int i = 0; i < channelNumber; i++) {
- pushRecord(record, i);
+ channels.get(i).pushRecords(records);
}
} else if (DataRecord.class.equals(record.getClass())) {
- pushRecord(record, Math.abs(record.hashCode() % channelNumber));
+ channels.get(Math.abs(record.hashCode() %
channelNumber)).pushRecords(records);
} else if (PlaceholderRecord.class.equals(record.getClass())) {
- pushRecord(record, 0);
+ channels.get(0).pushRecords(records);
} else {
throw new UnsupportedOperationException("Unsupported record type:
" + record.getClass().getName());
}
}
- private void pushRecord(final Record record, final int channelIndex) {
- PipelineChannel channel = channels.get(channelIndex);
- channel.pushRecord(record);
- }
-
@Override
public List<Record> fetchRecords(final int batchSize, final int timeout,
final TimeUnit timeUnit) {
return findChannel().fetchRecords(batchSize, timeout, timeUnit);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
index 954e79ab9a9..605c3d25849 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
@@ -22,7 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
*/
public final class SimpleMemoryPipelineChannel implements PipelineChannel {
- private final BlockingQueue<Record> queue;
+ private final BlockingQueue<List<Record>> queue;
private final AckCallback ackCallback;
@@ -44,23 +44,29 @@ public final class SimpleMemoryPipelineChannel implements
PipelineChannel {
@SneakyThrows(InterruptedException.class)
@Override
- public void pushRecord(final Record dataRecord) {
- queue.put(dataRecord);
+ public void pushRecords(final List<Record> records) {
+ queue.put(records);
}
@SneakyThrows(InterruptedException.class)
// TODO thread-safe?
@Override
public List<Record> fetchRecords(final int batchSize, final int timeout,
final TimeUnit timeUnit) {
- List<Record> result = new ArrayList<>(batchSize);
+ List<Record> result = new LinkedList<>();
long start = System.currentTimeMillis();
- while (batchSize > queue.size()) {
+ int recordsCount = 0;
+ while (batchSize > recordsCount) {
+ List<Record> records = queue.poll();
+ if (null == records || records.isEmpty()) {
+ TimeUnit.MILLISECONDS.sleep(Math.min(100,
timeUnit.toMillis(timeout)));
+ } else {
+ recordsCount += records.size();
+ result.addAll(records);
+ }
if (timeUnit.toMillis(timeout) <= System.currentTimeMillis() -
start) {
break;
}
- TimeUnit.MILLISECONDS.sleep(100L);
}
- queue.drainTo(result, batchSize);
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 8dfc0048cd4..d1562e68dc4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPos
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -59,6 +60,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
@@ -108,7 +110,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
throw new IngestException("Inventory dump failed on " +
dumperConfig.getActualTableName(), ex);
} finally {
- channel.pushRecord(new FinishedRecord(new FinishedPosition()));
+            channel.pushRecords(Collections.singletonList(new FinishedRecord(new FinishedPosition())));
}
}
@@ -128,9 +130,14 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
int rowCount = 0;
JobRateLimitAlgorithm rateLimitAlgorithm =
dumperConfig.getRateLimitAlgorithm();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ List<Record> dataRecords = new LinkedList<>();
while (resultSet.next()) {
- channel.pushRecord(loadDataRecord(resultSet,
resultSetMetaData, tableMetaData));
+ dataRecords.add(loadDataRecord(resultSet,
resultSetMetaData, tableMetaData));
++rowCount;
+ if (dataRecords.size() >= batchSize) {
+ channel.pushRecords(dataRecords);
+ dataRecords = new LinkedList<>();
+ }
if (!isRunning()) {
log.info("Broke because of inventory dump is not
running.");
break;
@@ -139,6 +146,9 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
rateLimitAlgorithm.intercept(JobOperationType.SELECT,
1);
}
}
+ if (!dataRecords.isEmpty()) {
+ channel.pushRecords(dataRecords);
+ }
dumpStatement.set(null);
log.info("Inventory dump done, rowCount={}", rowCount);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
index 68336abd7c9..ac27b0c7c9b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
@@ -19,10 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.record;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import java.util.ArrayList;
import java.util.List;
@@ -87,21 +85,4 @@ public final class RecordUtils {
}
return result;
}
-
- /**
- * Get last normal record.
- *
- * @param records records
- * @return last normal record.
- */
- public static Record getLastNormalRecord(final List<Record> records) {
- for (int index = records.size() - 1; index >= 0; index--) {
- Record record = records.get(index);
- if (record.getPosition() instanceof PlaceholderPosition) {
- continue;
- }
- return record;
- }
- return null;
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index b081785bd40..75426260d90 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -105,7 +105,7 @@ public final class IncrementalTask implements PipelineTask,
AutoCloseable {
}
private PipelineChannel createChannel(final int concurrency, final
PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress
progress) {
-        return pipelineChannelCreator.createPipelineChannel(concurrency, records -> {
+        return pipelineChannelCreator.createPipelineChannel(concurrency, 5, records -> {
Record lastHandledRecord = records.get(records.size() - 1);
if (!(lastHandledRecord.getPosition() instanceof
PlaceholderPosition)) {
progress.setPosition(lastHandledRecord.getPosition());
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 7aa477675a6..852230ed3c0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -34,7 +34,6 @@ import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskPr
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
-import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
@@ -76,7 +75,7 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
taskId = generateTaskId(inventoryDumperConfig);
this.inventoryDumperExecuteEngine = inventoryDumperExecuteEngine;
this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
- channel = createChannel(pipelineChannelCreator);
+ channel = createChannel(pipelineChannelCreator,
importerConfig.getBatchSize());
dumper = new InventoryDumper(inventoryDumperConfig, channel,
sourceDataSource, sourceMetaDataLoader);
importer = TypedSPILoader.getService(ImporterCreator.class,
importerConnector.getType()).createImporter(importerConfig,
importerConnector, channel, jobProgressListener, ImporterType.INVENTORY);
@@ -120,12 +119,10 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
return result;
}
- private PipelineChannel createChannel(final PipelineChannelCreator
pipelineChannelCreator) {
- return pipelineChannelCreator.createPipelineChannel(1, records -> {
- Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
- if (null != lastNormalRecord) {
- position.set(lastNormalRecord.getPosition());
- }
+ private PipelineChannel createChannel(final PipelineChannelCreator
pipelineChannelCreator, final int batchSize) {
+ return pipelineChannelCreator.createPipelineChannel(1, batchSize,
records -> {
+ Record lastRecord = records.get(records.size() - 1);
+ position.set(lastRecord.getPosition());
});
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
index d9ac1de2854..4a1b6d2dc0e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
@@ -30,8 +30,9 @@ public interface PipelineChannelCreator extends TypedSPI {
* Create pipeline channel.
*
* @param outputConcurrency output concurrency
+     * @param averageElementSize average element size, affect the size of the queue
     * @param ackCallback ack callback
     * @return {@link PipelineChannel}
     */
-    PipelineChannel createPipelineChannel(int outputConcurrency, AckCallback ackCallback);
+    PipelineChannel createPipelineChannel(int outputConcurrency, int averageElementSize, AckCallback ackCallback);
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreatorTest.java
index d3fb83b6283..4e1c839c3fd 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreatorTest.java
@@ -46,11 +46,11 @@ class MemoryPipelineChannelCreatorTest {
@Test
void assertCreateSimpleMemoryPipelineChannel() {
- assertThat(TypedSPILoader.getService(PipelineChannelCreator.class,
"MEMORY").createPipelineChannel(1, mock(AckCallback.class)),
instanceOf(SimpleMemoryPipelineChannel.class));
+ assertThat(TypedSPILoader.getService(PipelineChannelCreator.class,
"MEMORY").createPipelineChannel(1, 1, mock(AckCallback.class)),
instanceOf(SimpleMemoryPipelineChannel.class));
}
@Test
void assertCreateMultiplexMemoryPipelineChannel() {
- assertThat(TypedSPILoader.getService(PipelineChannelCreator.class,
"MEMORY").createPipelineChannel(2, mock(AckCallback.class)),
instanceOf(MultiplexMemoryPipelineChannel.class));
+ assertThat(TypedSPILoader.getService(PipelineChannelCreator.class,
"MEMORY").createPipelineChannel(2, 1, mock(AckCallback.class)),
instanceOf(MultiplexMemoryPipelineChannel.class));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
index 75099e77fb7..2d0b1d258cf 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -70,9 +70,7 @@ class MultiplexMemoryPipelineChannelTest {
CountDownLatch countDownLatch = new CountDownLatch(recordCount);
MultiplexMemoryPipelineChannel memoryChannel = new
MultiplexMemoryPipelineChannel(CHANNEL_NUMBER, 10000, ackCallback);
fetchWithMultiThreads(memoryChannel, countDownLatch);
- for (Record record : records) {
- memoryChannel.pushRecord(record);
- }
+ memoryChannel.pushRecords(Arrays.asList(records));
boolean awaitResult = countDownLatch.await(10, TimeUnit.SECONDS);
assertTrue(awaitResult, "await failed");
memoryChannel.close();
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
new file mode 100644
index 00000000000..0666bc8c258
--- /dev/null
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SimpleMemoryPipelineChannelTest {
+
+ @Test
+ void assertFetchRecordsTimeoutCorrectly() {
+ SimpleMemoryPipelineChannel simpleMemoryPipelineChannel = new
SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
+ long startMills = System.currentTimeMillis();
+ simpleMemoryPipelineChannel.fetchRecords(1, 1, TimeUnit.MILLISECONDS);
+ long endMills = System.currentTimeMillis();
+ assertTrue(endMills - startMills >= 1 && endMills - startMills < 50);
+ startMills = System.currentTimeMillis();
+ simpleMemoryPipelineChannel.fetchRecords(1, 500,
TimeUnit.MILLISECONDS);
+ endMills = System.currentTimeMillis();
+ assertTrue(endMills - startMills >= 500 && endMills - startMills <
600);
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
index 6206d792aec..0f5c00a80e9 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
@@ -17,23 +17,16 @@
package org.apache.shardingsphere.data.pipeline.core.record;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import static org.hamcrest.CoreMatchers.hasItems;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -73,14 +66,4 @@ class RecordUtilsTest {
result.addColumn(new Column("c3", "", true, false));
return result;
}
-
- @Test
- void assertGetLastNormalRecord() {
- List<Record> actual = Arrays.asList(new DataRecord(new
IntegerPrimaryKeyPosition(0, 1), 0), new PlaceholderRecord(new
PlaceholderPosition()));
- Record expected = RecordUtils.getLastNormalRecord(actual);
- assertThat(expected, instanceOf(DataRecord.class));
- actual = Arrays.asList(new DataRecord(new IntegerPrimaryKeyPosition(0,
1), 0), new PlaceholderRecord(new PlaceholderPosition()), new
FinishedRecord(new FinishedPosition()));
- expected = RecordUtils.getLastNormalRecord(actual);
- assertThat(expected, instanceOf(FinishedRecord.class));
- }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 772060603ca..71f7c8f0f55 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -25,12 +25,13 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJd
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -42,7 +43,6 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPositio
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
@@ -55,6 +55,9 @@ import
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.io.Serializable;
import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
@@ -88,7 +91,8 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
YamlJdbcConfiguration jdbcConfig =
((StandardPipelineDataSourceConfiguration)
dumperConfig.getDataSourceConfig()).getJdbcConfig();
log.info("incremental dump, jdbcUrl={}", jdbcConfig.getUrl());
DataSourceMetaData metaData =
TypedSPILoader.getService(DatabaseType.class,
"MySQL").getDataSourceMetaData(jdbcConfig.getUrl(), null);
- client = new MySQLClient(new ConnectInfo(new Random().nextInt(),
metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(),
jdbcConfig.getPassword()));
+ ConnectInfo connectInfo = new ConnectInfo(new Random().nextInt(),
metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(),
jdbcConfig.getPassword());
+ client = new MySQLClient(connectInfo, dumperConfig.isDecodeWithTX());
catalog = metaData.getCatalog();
}
@@ -97,46 +101,64 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
client.connect();
client.subscribe(binlogPosition.getFilename(),
binlogPosition.getPosition());
while (isRunning()) {
- AbstractBinlogEvent event = client.poll();
- if (null == event) {
+ List<AbstractBinlogEvent> events = client.poll();
+ if (events.isEmpty()) {
continue;
}
- handleEvent(event);
+ handleEvents(events);
}
- channel.pushRecord(new FinishedRecord(new PlaceholderPosition()));
+        channel.pushRecords(Collections.singletonList(new FinishedRecord(new FinishedPosition())));
}
- private void handleEvent(final AbstractBinlogEvent event) {
- if (event instanceof PlaceholderEvent || !((AbstractRowsEvent)
event).getDatabaseName().equals(catalog) ||
!dumperConfig.containsTable(((AbstractRowsEvent) event).getTableName())) {
- createPlaceholderRecord(event);
+ private void handleEvents(final List<AbstractBinlogEvent> events) {
+ List<Record> dataRecords = new LinkedList<>();
+ for (AbstractBinlogEvent each : events) {
+ if (!(each instanceof AbstractRowsEvent)) {
+ dataRecords.add(createPlaceholderRecord(each));
+ continue;
+ }
+ dataRecords.addAll(handleEvent(each));
+ }
+ if (dataRecords.isEmpty()) {
return;
}
- PipelineTableMetaData tableMetaData =
getPipelineTableMetaData(((AbstractRowsEvent) event).getTableName());
+ channel.pushRecords(dataRecords);
+ }
+
+ private List<? extends Record> handleEvent(final AbstractBinlogEvent
event) {
+ if (!(event instanceof AbstractRowsEvent)) {
+ return Collections.singletonList(createPlaceholderRecord(event));
+ }
+ AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
+ if (!rowsEvent.getDatabaseName().equals(catalog) ||
!dumperConfig.containsTable(rowsEvent.getTableName())) {
+ return Collections.singletonList(createPlaceholderRecord(event));
+ }
+ PipelineTableMetaData tableMetaData =
getPipelineTableMetaData(rowsEvent.getTableName());
if (event instanceof WriteRowsEvent) {
- handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
- return;
+ return handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
}
if (event instanceof UpdateRowsEvent) {
- handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
- return;
+ return handleUpdateRowsEvent((UpdateRowsEvent) event,
tableMetaData);
}
if (event instanceof DeleteRowsEvent) {
- handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
+ return handleDeleteRowsEvent((DeleteRowsEvent) event,
tableMetaData);
}
+ return Collections.emptyList();
}
- private void createPlaceholderRecord(final AbstractBinlogEvent event) {
- PlaceholderRecord placeholderRecord = new PlaceholderRecord(new
BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
- placeholderRecord.setCommitTime(event.getTimestamp() * 1000L);
- channel.pushRecord(placeholderRecord);
+ private PlaceholderRecord createPlaceholderRecord(final
AbstractBinlogEvent event) {
+ PlaceholderRecord result = new PlaceholderRecord(new
BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
+ result.setCommitTime(event.getTimestamp() * 1000L);
+ return result;
}
private PipelineTableMetaData getPipelineTableMetaData(final String
actualTableName) {
return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new
ActualTableName(actualTableName)), actualTableName);
}
- private void handleWriteRowsEvent(final WriteRowsEvent event, final
PipelineTableMetaData tableMetaData) {
+ private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event,
final PipelineTableMetaData tableMetaData) {
Set<ColumnName> columnNameSet =
dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+ List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getAfterRows()) {
DataRecord dataRecord = createDataRecord(event, each.length);
dataRecord.setType(IngestDataChangeType.INSERT);
@@ -147,16 +169,18 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
}
dataRecord.addColumn(new Column(columnMetaData.getName(),
handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
}
- channel.pushRecord(dataRecord);
+ result.add(dataRecord);
}
+ return result;
}
private boolean isColumnUnneeded(final Set<ColumnName> columnNameSet,
final String columnName) {
return null != columnNameSet && !columnNameSet.contains(new
ColumnName(columnName));
}
- private void handleUpdateRowsEvent(final UpdateRowsEvent event, final
PipelineTableMetaData tableMetaData) {
+ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent
event, final PipelineTableMetaData tableMetaData) {
Set<ColumnName> columnNameSet =
dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+ List<DataRecord> result = new LinkedList<>();
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
Serializable[] afterValues = event.getAfterRows().get(i);
@@ -174,12 +198,14 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
handleValue(columnMetaData, oldValue),
handleValue(columnMetaData, newValue), updated,
columnMetaData.isUniqueKey()));
}
- channel.pushRecord(dataRecord);
+ result.add(dataRecord);
}
+ return result;
}
- private void handleDeleteRowsEvent(final DeleteRowsEvent event, final
PipelineTableMetaData tableMetaData) {
+ private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent
event, final PipelineTableMetaData tableMetaData) {
Set<ColumnName> columnNameSet =
dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+ List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getBeforeRows()) {
DataRecord dataRecord = createDataRecord(event, each.length);
dataRecord.setType(IngestDataChangeType.DELETE);
@@ -190,8 +216,9 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
}
dataRecord.addColumn(new Column(columnMetaData.getName(),
handleValue(columnMetaData, each[i]), null, true,
columnMetaData.isUniqueKey()));
}
- channel.pushRecord(dataRecord);
+ result.add(dataRecord);
}
+ return result;
}
private Serializable handleValue(final PipelineColumnMetaData
columnMetaData, final Serializable value) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java
similarity index 50%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
copy to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java
index d9ac1de2854..e006c61d7d6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java
@@ -15,23 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
/**
- * Pipeline channel creator.
+ * Query event. This event is written into the binary log file for:
+ * 1. STATEMENT based replication (updating statements)
+ * 2. DDLs
+ * 3. COMMIT related to non-transactional engines (MyISAM, BLACKHOLE, etc.).
+ *
+ * @see <a
href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_query">QUERY_EVENT</a>
*/
-public interface PipelineChannelCreator extends TypedSPI {
+@Getter
+@RequiredArgsConstructor
+public final class QueryEvent extends AbstractBinlogEvent {
+
+ private final long threadId;
+
+ private final long executionTime;
+
+ private final int errorCode;
+
+ private final String databaseName;
- /**
- * Create pipeline channel.
- *
- * @param outputConcurrency output concurrency
- * @param ackCallback ack callback
- * @return {@link PipelineChannel}
- */
- PipelineChannel createPipelineChannel(int outputConcurrency, AckCallback
ackCallback);
+ private final String sql;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/XidEvent.java
similarity index 55%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
copy to
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/XidEvent.java
index d9ac1de2854..0e1bc1be469 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/XidEvent.java
@@ -15,23 +15,19 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
/**
- * Pipeline channel creator.
+ * XID event is generated for a COMMIT of a transaction that modifies one or
more tables of an XA-capable storage engine.
+ *
+ * @see <a
href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_xid">XID_EVENT</a>
*/
-public interface PipelineChannelCreator extends TypedSPI {
+@RequiredArgsConstructor
+@Getter
+public final class XidEvent extends AbstractBinlogEvent {
- /**
- * Create pipeline channel.
- *
- * @param outputConcurrency output concurrency
- * @param ackCallback ack callback
- * @return {@link PipelineChannel}
- */
- PipelineChannel createPipelineChannel(int outputConcurrency, AckCallback
ackCallback);
+ private final long xid;
}
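Taken together, QueryEvent and XidEvent are the transaction delimiters this commit relies on: "BEGIN" arrives as a query event and the commit arrives as an XID event. A minimal fragment for orientation (field values invented; the real grouping logic lives in MySQLBinlogEventPacketDecoder below):

    QueryEvent begin = new QueryEvent(12L, 0L, 0, "test", "BEGIN");   // transaction start marker
    XidEvent commit = new XidEvent(735L);                             // transaction commit marker
    // The decoder resets its per-transaction buffer when it sees the BEGIN statement
    // and flushes the buffered events to its output once the XID event is read.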
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index bec2dbd84c0..d2c9dbaf502 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -54,6 +54,8 @@ import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
@@ -77,7 +79,7 @@ public final class MySQLClient {
private Promise<Object> responseCallback;
- private final ArrayBlockingQueue<AbstractBinlogEvent> blockingEventQueue =
new ArrayBlockingQueue<>(10000);
+ private final ArrayBlockingQueue<List<AbstractBinlogEvent>>
blockingEventQueue = new ArrayBlockingQueue<>(2500);
private ServerInfo serverInfo;
@@ -85,6 +87,8 @@ public final class MySQLClient {
private final AtomicInteger reconnectTimes = new AtomicInteger();
+ private final boolean decodeWithTX;
+
/**
* Connect to MySQL.
*/
@@ -206,7 +210,7 @@ public final class MySQLClient {
channel.pipeline().remove(MySQLCommandPacketDecoder.class);
channel.pipeline().remove(MySQLCommandResponseHandler.class);
String tableKey = String.join(":", connectInfo.getHost(),
String.valueOf(connectInfo.getPort()));
- channel.pipeline().addLast(new
MySQLBinlogEventPacketDecoder(checksumLength,
GlobalTableMapEventMapping.getTableMapEventMap(tableKey)));
+ channel.pipeline().addLast(new
MySQLBinlogEventPacketDecoder(checksumLength,
GlobalTableMapEventMapping.getTableMapEventMap(tableKey), decodeWithTX));
channel.pipeline().addLast(new
MySQLBinlogEventHandler(getLastBinlogEvent(binlogFileName, binlogPosition)));
resetSequenceID();
channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int)
binlogPosition, connectInfo.getServerId(), binlogFileName));
@@ -228,13 +232,14 @@ public final class MySQLClient {
*
* @return binlog event
*/
- public synchronized AbstractBinlogEvent poll() {
+ public synchronized List<AbstractBinlogEvent> poll() {
ShardingSpherePreconditions.checkState(running,
BinlogSyncChannelAlreadyClosedException::new);
try {
- return blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS);
+ List<AbstractBinlogEvent> result = blockingEventQueue.poll(100L,
TimeUnit.MILLISECONDS);
+ return null == result ? Collections.emptyList() : result;
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
- return null;
+ return Collections.emptyList();
}
}
@@ -305,15 +310,26 @@ public final class MySQLClient {
this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
}
+ @SuppressWarnings("unchecked")
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object
msg) throws Exception {
if (!running) {
return;
}
+ reconnectTimes.set(0);
+ if (msg instanceof List) {
+ List<AbstractBinlogEvent> records =
(List<AbstractBinlogEvent>) msg;
+ if (records.isEmpty()) {
+ log.warn("The records is empty");
+ return;
+ }
+ lastBinlogEvent.set(records.get(records.size() - 1));
+ blockingEventQueue.put(records);
+ return;
+ }
if (msg instanceof AbstractBinlogEvent) {
lastBinlogEvent.set((AbstractBinlogEvent) msg);
- blockingEventQueue.put(lastBinlogEvent.get());
- reconnectTimes.set(0);
+
blockingEventQueue.put(Collections.singletonList(lastBinlogEvent.get()));
}
}
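Since poll() now hands back a whole batch (one committed transaction when decoding with TX), the consuming side changes shape as well. A sketch of that side, assuming a per-event handler like the one MySQLIncrementalDumper gained earlier in this diff (client, channel and handleEvent are illustrative names, not the literal dumper code):

    List<AbstractBinlogEvent> events = client.poll();   // empty list rather than null on timeout
    if (events.isEmpty()) {
        return;
    }
    List<Record> records = new LinkedList<>();
    for (AbstractBinlogEvent each : events) {
        records.addAll(handleEvent(each));               // hypothetical handler returning data records
    }
    channel.pushRecords(records);                        // one push per transaction batch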
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 98079fc3c97..14088dc3a1a 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -28,8 +28,10 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.Abstrac
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.QueryEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent;
import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
import
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
@@ -39,6 +41,7 @@ import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlog
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -49,22 +52,41 @@ import java.util.Optional;
@Slf4j
public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
+ private static final String TX_BEGIN_SQL = "BEGIN";
+
private final BinlogContext binlogContext;
- public MySQLBinlogEventPacketDecoder(final int checksumLength, final
Map<Long, MySQLBinlogTableMapEventPacket> tableMap) {
+ private final boolean decodeWithTX;
+
+ private List<AbstractBinlogEvent> records = new LinkedList<>();
+
+ public MySQLBinlogEventPacketDecoder(final int checksumLength, final
Map<Long, MySQLBinlogTableMapEventPacket> tableMap, final boolean decodeWithTX)
{
+ this.decodeWithTX = decodeWithTX;
binlogContext = new BinlogContext(checksumLength, tableMap);
}
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in,
final List<Object> out) {
- // readable bytes must greater + statusCode(1b) + header-length(19b) +
while (in.readableBytes() >= 1 +
MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
in.markReaderIndex();
MySQLPacketPayload payload = new MySQLPacketPayload(in,
ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
checkPayload(payload);
MySQLBinlogEventHeader binlogEventHeader = new
MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
if (checkEventIntegrity(in, binlogEventHeader)) {
- decodeEvent(binlogEventHeader, payload).ifPresent(out::add);
+ Optional<AbstractBinlogEvent> binlogEvent =
decodeEvent(binlogEventHeader, payload);
+ if (!binlogEvent.isPresent()) {
+ skipChecksum(binlogEventHeader.getEventType(), in);
+ return;
+ }
+ if (binlogEvent.get() instanceof PlaceholderEvent) {
+ out.add(binlogEvent.get());
+ } else {
+ if (decodeWithTX) {
+ processEventWithTX(binlogEvent.get(), out);
+ } else {
+ processEventIgnoreTX(binlogEvent.get(), out);
+ }
+ }
skipChecksum(binlogEventHeader.getEventType(), in);
} else {
break;
@@ -98,6 +120,30 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
return true;
}
+ private void processEventWithTX(final AbstractBinlogEvent binlogEvent,
final List<Object> out) {
+ if (binlogEvent instanceof QueryEvent) {
+ QueryEvent queryEvent = (QueryEvent) binlogEvent;
+ if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
+ records = new LinkedList<>();
+ }
+ } else if (binlogEvent instanceof XidEvent) {
+ records.add(binlogEvent);
+ out.add(records);
+ } else {
+ records.add(binlogEvent);
+ }
+ }
+
+ private void processEventIgnoreTX(final AbstractBinlogEvent binlogEvent,
final List<Object> out) {
+ if (binlogEvent instanceof QueryEvent) {
+ QueryEvent queryEvent = (QueryEvent) binlogEvent;
+ if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
+ return;
+ }
+ }
+ out.add(binlogEvent);
+ }
+
private Optional<AbstractBinlogEvent> decodeEvent(final
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
switch
(MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType()).orElse(MySQLBinlogEventType.UNKNOWN_EVENT))
{
case ROTATE_EVENT:
@@ -118,6 +164,10 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
case DELETE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V2:
return Optional.of(decodeDeleteRowsEventV2(binlogEventHeader,
payload));
+ case QUERY_EVENT:
+ return
Optional.of(decodeQueryEvent(binlogEventHeader.getChecksumLength(), payload));
+ case XID_EVENT:
+ return Optional.of(decodeXidEvent(binlogEventHeader, payload));
default:
return Optional.of(decodePlaceholderEvent(binlogEventHeader,
payload));
}
@@ -191,6 +241,27 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
return result;
}
+ private QueryEvent decodeQueryEvent(final int checksumLength, final
MySQLPacketPayload payload) {
+ int threadId = payload.readInt4();
+ int executionTime = payload.readInt4();
+ payload.skipReserved(1);
+ int errorCode = payload.readInt2();
+ payload.skipReserved(payload.readInt2());
+ String databaseName = payload.readStringNul();
+ String sql =
payload.readStringFix(payload.getByteBuf().readableBytes() - checksumLength);
+ return new QueryEvent(threadId, executionTime, errorCode,
databaseName, sql);
+ }
+
+ private XidEvent decodeXidEvent(final MySQLBinlogEventHeader
binlogEventHeader, final MySQLPacketPayload payload) {
+ XidEvent result = new XidEvent(payload.readInt8());
+ result.setFileName(binlogContext.getFileName());
+ result.setPosition(binlogEventHeader.getLogPos());
+ result.setTimestamp(binlogEventHeader.getTimestamp());
+ result.setServerId(binlogEventHeader.getServerId());
+ return result;
+ }
+
+ // TODO This method may be used again later; keep it for now.
private PlaceholderEvent createPlaceholderEvent(final
MySQLBinlogEventHeader binlogEventHeader) {
PlaceholderEvent result = new PlaceholderEvent();
result.setFileName(binlogContext.getFileName());
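For reference, decodeQueryEvent above reads the QUERY_EVENT body in the order described by the linked MySQL protocol page; a comment-style summary of the fields is below (a reading aid, not part of the change):

    // QUERY_EVENT body, in read order:
    //   thread_id            4 bytes                    -> threadId
    //   execution_time       4 bytes                    -> executionTime
    //   schema_length        1 byte                     -> skipped (schema is read as a NUL-terminated string below)
    //   error_code           2 bytes                    -> errorCode
    //   status_vars_length   2 bytes + that many bytes  -> skipped
    //   schema               NUL-terminated string      -> databaseName
    //   query                remaining bytes minus the trailing checksum -> sql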
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 52f13528191..b386245eb57 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -23,7 +23,6 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
@@ -34,8 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableM
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
@@ -63,16 +61,19 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
+@SuppressWarnings("unchecked")
class MySQLIncrementalDumperTest {
private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
@@ -81,19 +82,18 @@ class MySQLIncrementalDumperTest {
private MySQLIncrementalDumper incrementalDumper;
- private MultiplexMemoryPipelineChannel channel;
-
private PipelineTableMetaData pipelineTableMetaData;
@BeforeEach
void setUp() {
dumperConfig = mockDumperConfiguration();
initTableData(dumperConfig);
- dumperConfig.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0",
"root", "root"));
- channel = new MultiplexMemoryPipelineChannel(1, 10000, new
EmptyAckCallback());
- PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+ dumperConfig.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("mock:mysql://127.0.0.1:3306/test",
"root", "root"));
+ PipelineTableMetaDataLoader metaDataLoader =
mock(PipelineTableMetaDataLoader.class);
+ SimpleMemoryPipelineChannel channel = new
SimpleMemoryPipelineChannel(10000, new EmptyAckCallback());
incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new
BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader);
pipelineTableMetaData = new PipelineTableMetaData("t_order",
mockOrderColumnsMetaDataMap(), Collections.emptyList());
+ when(metaDataLoader.getTableMetaData(any(),
any())).thenReturn(pipelineTableMetaData);
}
private DumperConfiguration mockDumperConfiguration() {
@@ -101,6 +101,7 @@ class MySQLIncrementalDumperTest {
result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
"root", "root"));
result.setTableNameMap(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order")));
result.setTableNameSchemaNameMapping(new
TableNameSchemaNameMapping(Collections.emptyMap()));
+ result.setTargetTableColumnsMap(Collections.singletonMap(new
LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id"))));
return result;
}
@@ -150,8 +151,7 @@ class MySQLIncrementalDumperTest {
rowsEvent.setTableName("t_order");
rowsEvent.setAfterRows(Collections.singletonList(new
Serializable[]{101, 1, "OK"}));
Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent",
WriteRowsEvent.class, PipelineTableMetaData.class);
- Plugins.getMemberAccessor().invoke(method, incrementalDumper,
rowsEvent, pipelineTableMetaData);
- List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+ List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent,
pipelineTableMetaData);
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(),
is(IngestDataChangeType.INSERT));
@@ -175,13 +175,12 @@ class MySQLIncrementalDumperTest {
private void assertUpdateRowsEvent0(final Map<LogicTableName,
Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws
ReflectiveOperationException {
dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
- rowsEvent.setDatabaseName("");
+ rowsEvent.setDatabaseName("test");
rowsEvent.setTableName("t_order");
rowsEvent.setBeforeRows(Collections.singletonList(new
Serializable[]{101, 1, "OK"}));
rowsEvent.setAfterRows(Collections.singletonList(new
Serializable[]{101, 1, "OK2"}));
Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent",
UpdateRowsEvent.class, PipelineTableMetaData.class);
- Plugins.getMemberAccessor().invoke(method, incrementalDumper,
rowsEvent, pipelineTableMetaData);
- List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+ List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent,
pipelineTableMetaData);
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(),
is(IngestDataChangeType.UPDATE));
@@ -205,8 +204,7 @@ class MySQLIncrementalDumperTest {
rowsEvent.setTableName("t_order");
rowsEvent.setBeforeRows(Collections.singletonList(new
Serializable[]{101, 1, "OK"}));
Method method =
MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent",
DeleteRowsEvent.class, PipelineTableMetaData.class);
- Plugins.getMemberAccessor().invoke(method, incrementalDumper,
rowsEvent, pipelineTableMetaData);
- List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+ List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent,
pipelineTableMetaData);
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(),
is(IngestDataChangeType.DELETE));
@@ -215,19 +213,20 @@ class MySQLIncrementalDumperTest {
@Test
void assertPlaceholderEvent() throws ReflectiveOperationException {
-
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
AbstractBinlogEvent.class), incrementalDumper, new PlaceholderEvent());
- List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+ List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
AbstractBinlogEvent.class),
+ incrementalDumper, new PlaceholderEvent());
assertThat(actual.size(), is(1));
- assertThat(actual.get(0), instanceOf(PlaceholderRecord.class));
}
@Test
void assertRowsEventFiltered() throws ReflectiveOperationException {
WriteRowsEvent rowsEvent = new WriteRowsEvent();
- rowsEvent.setDatabaseName("unknown_database");
-
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
AbstractBinlogEvent.class), incrementalDumper, rowsEvent);
- List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+ rowsEvent.setDatabaseName("test");
+ rowsEvent.setTableName("t_order");
+ rowsEvent.setAfterRows(Collections.singletonList(new
Serializable[]{1}));
+ List<Record> actual = (List<Record>)
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
AbstractBinlogEvent.class),
+ incrementalDumper, rowsEvent);
assertThat(actual.size(), is(1));
- assertThat(actual.get(0), instanceOf(PlaceholderRecord.class));
+ assertThat(actual.get(0), instanceOf(DataRecord.class));
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index dcc3972be7d..7d9ffb84459 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -69,7 +69,7 @@ class MySQLClientTest {
@SuppressWarnings("unchecked")
@BeforeEach
void setUp() throws InterruptedException {
- mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306,
"username", "password"));
+ mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306,
"username", "password"), false);
when(channel.pipeline()).thenReturn(pipeline);
when(channel.isOpen()).thenReturn(true);
when(channel.close()).thenReturn(channelFuture);
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 73bb4d7a18f..e0fb747d164 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent;
import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
import
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
@@ -70,7 +71,7 @@ class MySQLBinlogEventPacketDecoderTest {
@BeforeEach
void setUp() throws NoSuchFieldException, IllegalAccessException {
- binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4, new
ConcurrentHashMap<>());
+ binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4, new
ConcurrentHashMap<>(), true);
binlogContext = (BinlogContext)
Plugins.getMemberAccessor().get(MySQLBinlogEventPacketDecoder.class.getDeclaredField("binlogContext"),
binlogEventPacketDecoder);
when(channelHandlerContext.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()).thenReturn(StandardCharsets.UTF_8);
columnDefs = Lists.newArrayList(new
MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONGLONG), new
MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONG),
@@ -124,13 +125,15 @@ class MySQLBinlogEventPacketDecoderTest {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
// the hex data is from INSERT INTO t_order(order_id, user_id, status,
t_numeric) VALUES (1, 1, 'SUCCESS',null);
byteBuf.writeBytes(StringUtil.decodeHexDump("007a36a9621e0100000038000000bb7c000000007b00000000000100020004ff08010000000000000001000000075355434345535365eff9ff"));
+
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
binlogContext.getTableMap().put(123L, tableMapEventPacket);
when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
assertThat(decodedEvents.size(), is(1));
- assertThat(decodedEvents.get(0), instanceOf(WriteRowsEvent.class));
- WriteRowsEvent actual = (WriteRowsEvent) decodedEvents.get(0);
+ LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
+ assertThat(actualEventList.get(0), instanceOf(WriteRowsEvent.class));
+ WriteRowsEvent actual = (WriteRowsEvent) actualEventList.get(0);
assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1,
new MySQLBinaryString("SUCCESS".getBytes()), null}));
}
@@ -140,13 +143,15 @@ class MySQLBinlogEventPacketDecoderTest {
// the hex data is from update t_order set status = 'updated' where
order_id = 1;
byteBuf.writeBytes(StringUtil.decodeHexDump("00cb38a9621f010000004e0000000c7e000000007b00000000000100020004ffff08010000000000000001000000075355434345535308010000000000000001000000077570"
+ "6461746564e78cee6c"));
+
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
binlogContext.getTableMap().put(123L, tableMapEventPacket);
when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
assertThat(decodedEvents.size(), is(1));
- assertThat(decodedEvents.get(0), instanceOf(UpdateRowsEvent.class));
- UpdateRowsEvent actual = (UpdateRowsEvent) decodedEvents.get(0);
+ LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
+ assertThat(actualEventList.get(0), instanceOf(UpdateRowsEvent.class));
+ UpdateRowsEvent actual = (UpdateRowsEvent) actualEventList.get(0);
assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1,
new MySQLBinaryString("SUCCESS".getBytes()), null}));
assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1,
new MySQLBinaryString("updated".getBytes()), null}));
}
@@ -156,13 +161,16 @@ class MySQLBinlogEventPacketDecoderTest {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
// delete from t_order where order_id = 1;
byteBuf.writeBytes(StringUtil.decodeHexDump("002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5"));
+
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
binlogContext.getTableMap().put(116L, tableMapEventPacket);
when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
assertThat(decodedEvents.size(), is(1));
- assertThat(decodedEvents.get(0), instanceOf(DeleteRowsEvent.class));
- DeleteRowsEvent actual = (DeleteRowsEvent) decodedEvents.get(0);
+ LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
+ assertThat(actualEventList.get(0), instanceOf(DeleteRowsEvent.class));
+ assertThat(actualEventList.get(1), instanceOf(XidEvent.class));
+ DeleteRowsEvent actual = (DeleteRowsEvent) actualEventList.get(0);
assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1,
new MySQLBinaryString("SUCCESS".getBytes()), null}));
}
@@ -171,6 +179,7 @@ class MySQLBinlogEventPacketDecoderTest {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
byte[] completeData =
StringUtil.decodeHexDump("002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5");
byteBuf.writeBytes(completeData);
+
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
// write incomplete event data
byteBuf.writeBytes(StringUtil.decodeHexDump("3400"));
List<Object> decodedEvents = new LinkedList<>();
@@ -178,7 +187,6 @@ class MySQLBinlogEventPacketDecoderTest {
when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
assertThat(decodedEvents.size(), is(1));
- assertThat(byteBuf.readerIndex(), is(completeData.length));
}
@Test
@@ -186,6 +194,7 @@ class MySQLBinlogEventPacketDecoderTest {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
byte[] completeData =
StringUtil.decodeHexDump("002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5");
byteBuf.writeBytes(completeData);
+
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
byte[] notCompleteData =
StringUtil.decodeHexDump("00cb38a962130100000041000000be7d000000007b000000000001000464735f310009745f6f726465725f31000408030f");
byteBuf.writeBytes(notCompleteData);
List<Object> decodedEvents = new LinkedList<>();
@@ -193,6 +202,5 @@ class MySQLBinlogEventPacketDecoderTest {
when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf,
decodedEvents);
assertThat(decodedEvents.size(), is(1));
- assertThat(byteBuf.readerIndex(), is(completeData.length));
}
}
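The hex string appended to each test buffer ("006acb6564...b13f8340") appears to decode to an XID (commit) event, event type 0x10, so with decodeWithTX enabled every buffer now closes a transaction. That is why each test expects a single decoded element that is itself a list, with the XidEvent as its last entry:

    // Decoded output per test buffer (illustrative):
    //   decodedEvents.size() == 1
    //   decodedEvents.get(0)  -> [WriteRowsEvent / UpdateRowsEvent / DeleteRowsEvent, XidEvent]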
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 52f5f86b081..e438f3905c6 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExe
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
@@ -44,6 +45,7 @@ import org.opengauss.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -64,7 +66,7 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
private final boolean decodeWithTX;
- private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
+ private List<AbstractRowEvent> rowEvents = new LinkedList<>();
public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final
IngestPosition position,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
@@ -115,29 +117,31 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
}
private void processEventWithTX(final AbstractWALEvent event) {
- if (event instanceof AbstractRowEvent) {
- rowEvents.add((AbstractRowEvent) event);
+ if (event instanceof BeginTXEvent) {
return;
}
- if (event instanceof BeginTXEvent) {
- rowEvents.clear();
+ if (event instanceof AbstractRowEvent) {
+ rowEvents.add((AbstractRowEvent) event);
return;
}
if (event instanceof CommitTXEvent) {
Long csn = ((CommitTXEvent) event).getCsn();
+ List<Record> records = new LinkedList<>();
for (AbstractRowEvent each : rowEvents) {
each.setCsn(csn);
- channel.pushRecord(walEventConverter.convert(each));
+ records.add(walEventConverter.convert(each));
}
+ records.add(walEventConverter.convert(event));
+ channel.pushRecords(records);
+ rowEvents = new LinkedList<>();
}
- channel.pushRecord(walEventConverter.convert(event));
}
private void processEventIgnoreTX(final AbstractWALEvent event) {
if (event instanceof BeginTXEvent) {
return;
}
- channel.pushRecord(walEventConverter.convert(event));
+
channel.pushRecords(Collections.singletonList(walEventConverter.convert(event)));
}
@Override
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 88d8b70caa5..d5da268daf1 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExe
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
@@ -33,7 +34,10 @@ import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Deco
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.postgresql.jdbc.PgConnection;
@@ -42,6 +46,10 @@ import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
/**
* PostgreSQL WAL dumper.
@@ -58,6 +66,10 @@ public final class PostgreSQLWALDumper extends
AbstractLifecycleExecutor impleme
private final PostgreSQLLogicalReplication logicalReplication;
+ private final boolean decodeWithTX;
+
+ private List<AbstractRowEvent> rowEvents = new LinkedList<>();
+
public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final
IngestPosition position,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
@@ -67,6 +79,7 @@ public final class PostgreSQLWALDumper extends
AbstractLifecycleExecutor impleme
this.channel = channel;
walEventConverter = new WALEventConverter(dumperConfig,
metaDataLoader);
logicalReplication = new PostgreSQLLogicalReplication();
+ this.decodeWithTX = dumperConfig.isDecodeWithTX();
}
@SneakyThrows(InterruptedException.class)
@@ -86,13 +99,43 @@ public final class PostgreSQLWALDumper extends
AbstractLifecycleExecutor impleme
continue;
}
AbstractWALEvent event = decodingPlugin.decode(message, new
PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
- channel.pushRecord(walEventConverter.convert(event));
+ if (decodeWithTX) {
+ processEventWithTX(event);
+ } else {
+ processEventIgnoreTX(event);
+ }
}
} catch (final SQLException ex) {
throw new IngestException(ex);
}
}
+ private void processEventWithTX(final AbstractWALEvent event) {
+ if (event instanceof BeginTXEvent) {
+ rowEvents = new ArrayList<>();
+ return;
+ }
+ if (event instanceof AbstractRowEvent) {
+ rowEvents.add((AbstractRowEvent) event);
+ return;
+ }
+ if (event instanceof CommitTXEvent) {
+ List<Record> records = new LinkedList<>();
+ for (AbstractWALEvent each : rowEvents) {
+ records.add(walEventConverter.convert(each));
+ }
+ records.add(walEventConverter.convert(event));
+ channel.pushRecords(records);
+ }
+ }
+
+ private void processEventIgnoreTX(final AbstractWALEvent event) {
+ if (event instanceof BeginTXEvent) {
+ return;
+ }
+
channel.pushRecords(Collections.singletonList(walEventConverter.convert(event)));
+ }
+
@Override
protected void doStop() {
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 330c60e7a05..40729839477 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -23,6 +23,8 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -45,7 +47,14 @@ public final class TestDecodingPlugin implements
DecodingPlugin {
@Override
public AbstractWALEvent decode(final ByteBuffer data, final
BaseLogSequenceNumber logSequenceNumber) {
- AbstractWALEvent result = "table".equals(readEventType(data)) ?
readTableEvent(data) : new PlaceholderEvent();
+ String type = readEventType(data);
+ if (type.startsWith("BEGIN")) {
+ return new BeginTXEvent(Long.parseLong(readNextSegment(data)));
+ }
+ if (type.startsWith("COMMIT")) {
+ return new CommitTXEvent(Long.parseLong(readNextSegment(data)),
null);
+ }
+ AbstractWALEvent result = "table".equals(type) ? readTableEvent(data)
: new PlaceholderEvent();
result.setLogSequenceNumber(logSequenceNumber);
return result;
}
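A hedged example of the test_decoding messages this now distinguishes and the events decode(...) maps them to (message text is typical test_decoding output, not taken from this repository):

    // "BEGIN 733"                                              -> BeginTXEvent(733L)
    // "table public.t_order: INSERT: order_id[integer]:1 ..."  -> a row event, handled as before
    // "COMMIT 733"                                              -> CommitTXEvent(733L, null)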
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 36689a5d87d..c823fc41844 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
@@ -76,14 +76,14 @@ class PostgreSQLWALDumperTest {
private PostgreSQLWALDumper walDumper;
- private MultiplexMemoryPipelineChannel channel;
+ private SimpleMemoryPipelineChannel channel;
private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
@BeforeEach
void setUp() {
position = new WALPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
- channel = new MultiplexMemoryPipelineChannel(1, 10000, new
EmptyAckCallback());
+ channel = new SimpleMemoryPipelineChannel(10000, new
EmptyAckCallback());
String jdbcUrl =
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL";
String username = "root";
String password = "root";
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
index 1f190e36cbc..a02398e7e49 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -154,7 +155,7 @@ class DataSourceImporterTest {
private List<Record> mockRecords(final DataRecord dataRecord) {
List<Record> result = new LinkedList<>();
result.add(dataRecord);
- result.add(new FinishedRecord(new PlaceholderPosition()));
+ result.add(new FinishedRecord(new FinishedPosition()));
return result;
}