This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7a2b92042a0 Refactor InventoryDumperContextSplitter (#32596)
7a2b92042a0 is described below
commit 7a2b92042a0b62acdc66ba3ef30ed4547609484f
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Aug 19 21:21:21 2024 +0800
Refactor InventoryDumperContextSplitter (#32596)
* Refactor PipelineTaskUtils.generateInventoryTaskId
* Refactor PipelineTaskUtils.generateInventoryTaskId
* Refactor PipelineTaskUtils.generateInventoryTaskId
* Refactor PipelineTaskUtils.generateInventoryTaskId
* Refactor InventoryTask
* Fix javadoc of InventoryDumperContextSplitter
* Refactor InventoryDumperContextSplitter
* Refactor InventoryDumperContextSplitter
---
.../splitter/InventoryDumperContextSplitter.java | 84 +++++++++++-----------
.../inventory/splitter/InventoryTaskSplitter.java | 2 +-
.../data/pipeline/core/task/InventoryTask.java | 2 +-
.../data/pipeline/core/task/PipelineTaskUtils.java | 11 ++-
.../pipeline/core/task/PipelineTaskUtilsTest.java | 36 ++++++++++
5 files changed, 86 insertions(+), 49 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 4ccdfd82b60..6a68d80d70e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter;
 
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.Range;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -31,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
@@ -39,6 +37,7 @@ import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
+import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -52,10 +51,9 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * Inventory task splitter.
+ * Inventory dumper context splitter.
  */
 @RequiredArgsConstructor
-@Slf4j
 public final class InventoryDumperContextSplitter {
     
     private final PipelineDataSourceWrapper sourceDataSource;
@@ -69,56 +67,46 @@ public final class InventoryDumperContextSplitter {
      * @return inventory dumper contexts
      */
     public Collection<InventoryDumperContext> split(final TransmissionJobItemContext jobItemContext) {
-        return splitByTable(dumperContext).stream().flatMap(each -> splitByPrimaryKey(each, jobItemContext, sourceDataSource).stream()).collect(Collectors.toList());
+        return splitByTable().stream().flatMap(each -> splitByPrimaryKey(each, jobItemContext).stream()).collect(Collectors.toList());
     }
 
-    private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
-        Collection<InventoryDumperContext> result = new LinkedList<>();
-        dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> {
-            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
-            // use original table name, for metadata loader, since some database table name case-sensitive
-            inventoryDumperContext.setActualTableName(key.toString());
-            inventoryDumperContext.setLogicTableName(value.toString());
-            inventoryDumperContext.getCommonContext().setPosition(new IngestPlaceholderPosition());
-            inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
-            inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
-            result.add(inventoryDumperContext);
-        });
+    private Collection<InventoryDumperContext> splitByTable() {
+        return dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().entrySet()
+                .stream().map(entry -> createTableSplitDumperContext(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    }
+    
+    private InventoryDumperContext createTableSplitDumperContext(final CaseInsensitiveIdentifier actualTableName, final CaseInsensitiveIdentifier logicTableName) {
+        InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
+        // Use the original table name for the metadata loader, since some databases' table names are case-sensitive.
+        result.setActualTableName(actualTableName.toString());
+        result.setLogicTableName(logicTableName.toString());
+        result.getCommonContext().setPosition(new IngestPlaceholderPosition());
+        result.setInsertColumnNames(dumperContext.getInsertColumnNames());
+        result.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
         return result;
     }
 
-    private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
-                                                                 final PipelineDataSourceWrapper dataSource) {
+    private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
         if (null == dumperContext.getUniqueKeyColumns()) {
-            String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
-            String actualTableName = dumperContext.getActualTableName();
-            List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
-            dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
+            dumperContext.setUniqueKeyColumns(getTableUniqueKeys(dumperContext, jobItemContext));
         }
         Collection<InventoryDumperContext> result = new LinkedList<>();
         TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
-        PipelineReadConfiguration readConfig = jobProcessContext.getProcessConfiguration().getRead();
-        int batchSize = readConfig.getBatchSize();
+        int batchSize = jobProcessContext.getProcessConfiguration().getRead().getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
-        Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
         int i = 0;
-        for (IngestPosition each : inventoryPositions) {
-            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
-            splitDumperContext.getCommonContext().setPosition(each);
-            splitDumperContext.setShardingItem(i++);
-            splitDumperContext.setActualTableName(dumperContext.getActualTableName());
-            splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
-            splitDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
-            splitDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
-            splitDumperContext.setBatchSize(batchSize);
-            splitDumperContext.setRateLimitAlgorithm(rateLimitAlgorithm);
-            result.add(splitDumperContext);
+        for (IngestPosition each : getInventoryPositions(dumperContext, jobItemContext)) {
+            result.add(createPrimaryKeySplitDumperContext(dumperContext, each, i++, batchSize, rateLimitAlgorithm));
         }
         return result;
     }
 
-    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
-                                                             final PipelineDataSourceWrapper dataSource) {
+    private List<PipelineColumnMetaData> getTableUniqueKeys(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
+        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        return PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, dumperContext.getActualTableName(), jobItemContext.getSourceMetaDataLoader());
+    }
+    
+    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
         TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress();
         if (null != initProgress) {
             // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
@@ -127,7 +115,7 @@ public final class InventoryDumperContextSplitter {
                 return result;
             }
         }
-        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
+        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, sourceDataSource);
         jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
         if (!dumperContext.hasUniqueKey()) {
             return Collections.singleton(new UnsupportedKeyIngestPosition());
@@ -136,7 +124,7 @@ public final class InventoryDumperContextSplitter {
         if (1 == uniqueKeyColumns.size()) {
             int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
             if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
-                return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, dataSource);
+                return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, sourceDataSource);
             }
             if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
                 // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases.
@@ -179,4 +167,18 @@ public final class InventoryDumperContextSplitter {
             throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
         }
     }
+    
+    private InventoryDumperContext createPrimaryKeySplitDumperContext(final InventoryDumperContext dumperContext, final IngestPosition position,
+                                                                      final int shardingItem, final int batchSize, final JobRateLimitAlgorithm rateLimitAlgorithm) {
+        InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
+        result.getCommonContext().setPosition(position);
+        result.setShardingItem(shardingItem);
+        result.setActualTableName(dumperContext.getActualTableName());
+        result.setLogicTableName(dumperContext.getLogicTableName());
+        result.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
+        result.setInsertColumnNames(dumperContext.getInsertColumnNames());
+        result.setBatchSize(batchSize);
+        result.setRateLimitAlgorithm(rateLimitAlgorithm);
+        return result;
+    }
 }
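
Note: for readers outside the pipeline module, the refactored flow above is: split() first produces one dumper context per table (splitByTable), then chunks each table by primary key into batch-sized slices (splitByPrimaryKey). A minimal standalone sketch of that two-stage idea follows; all names and values here are hypothetical illustrations, not ShardingSphere API:

import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two-stage split, not the pipeline's real types.
public final class SplitSketch {
    
    record TableSlice(String table, long minId, long maxId) {
    }
    
    // Stage 1: iterate tables; stage 2: chunk each table's integer PK range by batch size.
    static List<TableSlice> split(final List<String> tables, final long maxId, final long batchSize) {
        List<TableSlice> result = new ArrayList<>();
        for (String each : tables) {
            for (long start = 0L; start <= maxId; start += batchSize) {
                result.add(new TableSlice(each, start, Math.min(start + batchSize - 1L, maxId)));
            }
        }
        return result;
    }
    
    public static void main(final String[] args) {
        // 2 tables x PK range [0, 249] chunked by 100 -> 6 slices in total.
        split(List.of("t_order", "t_order_item"), 249L, 100L).forEach(System.out::println);
    }
}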
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index e3db30855f4..034081e5203 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -70,7 +70,7 @@ public final class InventoryTaskSplitter {
             result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
         }
-        log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
+        log.info("Split inventory tasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
         return result;
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 19d88aea7a9..afe20fc1e3d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * Inventory task.
  */
 @RequiredArgsConstructor
-@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "dumper", "importer"})
+@ToString(of = {"taskId", "position"})
 public final class InventoryTask implements PipelineTask {
     
     @Getter
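
Note: moving from @ToString(exclude = ...) to @ToString(of = ...) turns Lombok's deny-list into an allow-list, so fields added to InventoryTask later stay out of its string form unless listed explicitly. A minimal sketch of that Lombok behavior, with hypothetical class and field values:

import lombok.RequiredArgsConstructor;
import lombok.ToString;

// Hypothetical stand-in for InventoryTask, illustrating the allow-list semantics.
@RequiredArgsConstructor
@ToString(of = {"taskId", "position"})
final class ToStringSketch {
    
    private final String taskId;
    
    private final String position;
    
    // Not listed in "of", so never printed, without needing an explicit exclude.
    private final Object dumper;
    
    public static void main(final String[] args) {
        // Prints: ToStringSketch(taskId=foo_ds.t_order#0, position=ph)
        System.out.println(new ToStringSketch("foo_ds.t_order#0", "ph", new Object()));
    }
}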
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index a29460f6fef..a4ae090d786 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -33,14 +33,13 @@ import java.util.Optional;
 public final class PipelineTaskUtils {
     
     /**
-     * Generate inventory task id.
+     * Generate inventory task ID.
      *
-     * @param inventoryDumperContext inventory dumper context
-     * @return inventory task id
+     * @param dumperContext inventory dumper context
+     * @return generated ID
      */
-    public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) {
-        String result = String.format("%s.%s", inventoryDumperContext.getCommonContext().getDataSourceName(), inventoryDumperContext.getActualTableName());
-        return result + "#" + inventoryDumperContext.getShardingItem();
+    public static String generateInventoryTaskId(final InventoryDumperContext dumperContext) {
+        return String.format("%s.%s#%s", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName(), dumperContext.getShardingItem());
     }
     
     /**
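
Note: the generated ID has the shape <dataSourceName>.<actualTableName>#<shardingItem>, and the refactor is behavior-preserving. A quick standalone check with hypothetical values (the new unit test below pins down the same contract):

public final class TaskIdFormatSketch {
    
    public static void main(final String[] args) {
        String legacy = String.format("%s.%s", "foo_ds", "t_order") + "#" + 7;
        String refactored = String.format("%s.%s#%s", "foo_ds", "t_order", 7);
        // Both produce foo_ds.t_order#7, so the single format call is equivalent; prints true.
        System.out.println(legacy.equals(refactored));
    }
}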
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
new file mode 100644
index 00000000000..903e99ec83a
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.task;
+
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class PipelineTaskUtilsTest {
+
+    @Test
+    void assertGenerateInventoryTaskId() {
+        InventoryDumperContext dumperContext = new InventoryDumperContext(new DumperCommonContext("foo_ds", null, null, null));
+        dumperContext.setActualTableName("foo_actual_tbl");
+        dumperContext.setShardingItem(1);
+        assertThat(PipelineTaskUtils.generateInventoryTaskId(dumperContext), is("foo_ds.foo_actual_tbl#1"));
+    }
+}