This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d07f233e33a Recover value of migration incremental importer batch size (#34670)
d07f233e33a is described below
commit d07f233e33aa49a853ed23665bef1ec0a944929f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Feb 14 20:42:22 2025 +0800
Recover value of migration incremental importer batch size (#34670)
---
.../core/importer/sink/type/PipelineDataSourceSink.java | 16 ++++++++++++++++
.../core/ingest/record/group/DataRecordGroupEngine.java | 9 ++++++---
.../ingest/record/group/DataRecordGroupEngineTest.java | 11 ++++++++++-
.../migration/preparer/MigrationJobPreparer.java | 3 ++-
.../migration/primarykey/IndexesMigrationE2EIT.java | 2 +-
5 files changed, 35 insertions(+), 6 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index d9f1d81f0ab..87429222f68 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -41,6 +41,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -78,6 +79,10 @@ public final class PipelineDataSourceSink implements PipelineSink {
if (dataRecords.isEmpty()) {
return new PipelineJobUpdateProgress(0);
}
+ if (dataRecords.iterator().next().getUniqueKeyValue().isEmpty()) {
+ sequentialWrite(dataRecords);
+ return new PipelineJobUpdateProgress(dataRecords.size());
+ }
for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
batchWrite(each.getDeleteDataRecords());
batchWrite(each.getInsertDataRecords());
@@ -86,6 +91,17 @@ public final class PipelineDataSourceSink implements PipelineSink {
return new PipelineJobUpdateProgress((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
}
+ private void sequentialWrite(final List<DataRecord> buffer) {
+ // TODO It's better to use transaction, but delete operation may not take effect on PostgreSQL sometimes
+ try {
+ for (DataRecord each : buffer) {
+ doWrite(Collections.singletonList(each), true);
+ }
+ } catch (final SQLException ex) {
+ throw new PipelineImporterJobWriteException(ex);
+ }
+ }
+
@SuppressWarnings("BusyWait")
@SneakyThrows(InterruptedException.class)
private void batchWrite(final Collection<DataRecord> records) {
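Note on the hunk above: when the incoming records carry no unique key values, grouping and merging are skipped and each record is written individually, in arrival order. The following minimal, self-contained sketch shows that control flow; the Rec type and the write methods are illustrative stand-ins, not the project's DataRecord/doWrite API.

    import java.util.Collections;
    import java.util.List;

    final class SequentialFallbackSketch {

        record Rec(List<Object> uniqueKeyValue, String payload) {
        }

        static void write(final List<Rec> records) {
            if (records.isEmpty()) {
                return;
            }
            // Records from a table without a primary/unique key cannot be merged
            // or grouped safely, so replay them strictly in arrival order.
            if (records.get(0).uniqueKeyValue().isEmpty()) {
                for (Rec each : records) {
                    writeOne(Collections.singletonList(each));
                }
                return;
            }
            writeBatch(records);
        }

        private static void writeOne(final List<Rec> batch) {
            System.out.println("sequential write: " + batch);
        }

        private static void writeBatch(final List<Rec> batch) {
            System.out.println("batch write: " + batch);
        }

        public static void main(final String[] args) {
            write(List.of(new Rec(List.of(), "row-a"), new Rec(List.of(), "row-b")));
        }
    }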
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index aae1e972858..b8f9c6dacd0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -72,13 +72,16 @@ public final class DataRecordGroupEngine {
/**
* Group by table and type.
*
- * @param dataRecords data records
+ * @param dataRecords data records, related table must have primary key or unique key
* @return grouped data records
+ * @throws IllegalArgumentException if related table has no primary key or unique key
*/
public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
+ DataRecord firstDataRecord = dataRecords.get(0);
+ ShardingSpherePreconditions.checkState(!firstDataRecord.getUniqueKeyValue().isEmpty(),
+ () -> new IllegalArgumentException(firstDataRecord.getTableName() + " must have primary key or unique key"));
List<GroupedDataRecord> result = new ArrayList<>(100);
- List<DataRecord> mergedDataRecords = dataRecords.get(0).getUniqueKeyValue().isEmpty() ? dataRecords : merge(dataRecords);
- Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
+ Map<String, List<DataRecord>> tableGroup = merge(dataRecords).stream().collect(Collectors.groupingBy(DataRecord::getTableName));
for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
Map<PipelineSQLOperationType, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
result.add(new GroupedDataRecord(entry.getKey(), typeGroup.getOrDefault(PipelineSQLOperationType.INSERT, Collections.emptyList()),
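The Javadoc and precondition above tighten group()'s contract: input whose first record has no unique key values is now rejected with IllegalArgumentException instead of silently bypassing merge(). A self-contained sketch of that fail-fast grouping, with made-up Row/Op types rather than the project's DataRecord:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    final class GroupContractSketch {

        enum Op { INSERT, UPDATE, DELETE }

        record Row(String table, Op op, List<Object> uniqueKeyValue) {
        }

        // Fail fast, mirroring the new precondition: key-less tables cannot be merged.
        static Map<String, Map<Op, List<Row>>> group(final List<Row> rows) {
            if (rows.get(0).uniqueKeyValue().isEmpty()) {
                throw new IllegalArgumentException(rows.get(0).table() + " must have primary key or unique key");
            }
            return rows.stream().collect(Collectors.groupingBy(Row::table, Collectors.groupingBy(Row::op)));
        }

        public static void main(final String[] args) {
            System.out.println(group(List.of(new Row("t_order", Op.INSERT, List.of(1)))));
        }
    }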
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index fc4bf4bc340..6dddb5bd513 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -216,7 +217,15 @@ class DataRecordGroupEngineTest {
}
@Test
- void assertGroup() {
+ void assertGroupOnTableHasNoUniqueKey() {
+ DataRecord dataRecord = new DataRecord(PipelineSQLOperationType.INSERT, "order", new IngestPlaceholderPosition(), 3);
+ dataRecord.setActualTableName("order_0");
+ List<DataRecord> dataRecords = Collections.singletonList(dataRecord);
+ assertThrows(IllegalArgumentException.class, () -> groupEngine.group(dataRecords));
+ }
+
+ @Test
+ void assertGroupOnTableHasUniqueKey() {
List<DataRecord> dataRecords = mockDataRecords();
List<GroupedDataRecord> groupedDataRecords = groupEngine.group(dataRecords);
assertThat(groupedDataRecords.size(), is(2));
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 1d2314aa29e..029b97261e8 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -189,7 +189,8 @@ public final class MigrationJobPreparer implements PipelineJobPreparer<Migration
CreateIncrementalDumperParameter param = new CreateIncrementalDumperParameter(
dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager());
Dumper dumper = IncrementalDumperCreator.create(param);
- Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), jobItemContext));
+ Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, taskConfig.getImporterConfig().getBatchSize(), 1000L,
+ jobItemContext.getSink(), jobItemContext));
PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
}
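This hunk is the change the subject line describes: the incremental importer had been left hardcoded to a batch size of 1 with a 5 ms poll timeout, and the configured batch size plus a 1000 ms timeout are recovered. A self-contained sketch of why those two parameters matter; the BlockingQueue-based channel here is an assumption for illustration, not the project's channel API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    final class BatchDrainSketch {

        // Wait up to timeoutMillis for the first record, then drain whatever
        // else is already queued, up to batchSize records per round trip.
        static List<String> poll(final BlockingQueue<String> channel, final int batchSize, final long timeoutMillis) throws InterruptedException {
            List<String> batch = new ArrayList<>(batchSize);
            String first = channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (null != first) {
                batch.add(first);
                channel.drainTo(batch, batchSize - 1);
            }
            return batch;
        }

        public static void main(final String[] args) throws InterruptedException {
            BlockingQueue<String> channel = new LinkedBlockingQueue<>(List.of("r1", "r2", "r3"));
            // batchSize=1 (the old hardcoded value) would need three round trips
            // here; a recovered, configured batch size moves all queued records
            // in a single drain.
            System.out.println(poll(channel, 1000, 1000L));
        }
    }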
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index fa9bc36df4b..50afba8399e 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -96,7 +96,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
// TODO PostgreSQL update delete events not support if table without unique keys at increment task.
final Consumer<DataSource> incrementalTaskFn = dataSource -> {
if (containerComposer.getDatabaseType() instanceof MySQLDatabaseType) {
- doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKeys(mock(AlgorithmSQLContext.class), 1).iterator().next());
+ doCreateUpdateDelete(containerComposer, "a1");
}
Object orderId = keyGenerateAlgorithm.generateKeys(mock(AlgorithmSQLContext.class), 1).iterator().next();
insertOneOrder(containerComposer, orderId);