This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a51323f7d95 Refactor usage of InventoryDumperContextSplitter (#32592)
a51323f7d95 is described below
commit a51323f7d959912012c640779b7fcc2a823eb928
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Aug 19 01:29:27 2024 +0800
Refactor usage of InventoryDumperContextSplitter (#32592)
---
.../core/preparer/inventory/splitter/InventoryTaskSplitter.java | 3 ++-
.../data/pipeline/cdc/core/prepare/CDCJobPreparer.java | 5 +++--
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index 05ec09a93da..e3db30855f4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -61,7 +61,8 @@ public final class InventoryTaskSplitter {
List<InventoryTask> result = new LinkedList<>();
long startTimeMillis = System.currentTimeMillis();
TransmissionProcessContext processContext =
jobItemContext.getJobProcessContext();
- for (InventoryDumperContext each : new
InventoryDumperContextSplitter(sourceDataSource,
dumperContext).split(jobItemContext)) {
+ InventoryDumperContextSplitter dumperContextSplitter = new
InventoryDumperContextSplitter(sourceDataSource, dumperContext);
+ for (InventoryDumperContext each :
dumperContextSplitter.split(jobItemContext)) {
AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel =
InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(),
importerConfig.getBatchSize(), position);
Dumper dumper = new InventoryDumper(each, channel,
sourceDataSource, jobItemContext.getSourceMetaDataLoader());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index d36d59bf030..dec0fd192d2 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -117,8 +117,9 @@ public final class CDCJobPreparer {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
TransmissionProcessContext processContext =
jobItemContext.getJobProcessContext();
- for (InventoryDumperContext each : new
InventoryDumperContextSplitter(jobItemContext.getSourceDataSource(), new
InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()))
- .split(jobItemContext)) {
+ InventoryDumperContextSplitter dumperContextSplitter = new
InventoryDumperContextSplitter(
+ jobItemContext.getSourceDataSource(), new
InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()));
+ for (InventoryDumperContext each :
dumperContextSplitter.split(jobItemContext)) {
AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel =
InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(),
importerConfig.getBatchSize(), position);
if (!(position.get() instanceof IngestFinishedPosition)) {