This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a51323f7d95 Refactor usage of InventoryDumperContextSplitter (#32592)
a51323f7d95 is described below

commit a51323f7d959912012c640779b7fcc2a823eb928
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Aug 19 01:29:27 2024 +0800

    Refactor usage of InventoryDumperContextSplitter (#32592)
---
 .../core/preparer/inventory/splitter/InventoryTaskSplitter.java      | 3 ++-
 .../data/pipeline/cdc/core/prepare/CDCJobPreparer.java               | 5 +++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index 05ec09a93da..e3db30855f4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -61,7 +61,8 @@ public final class InventoryTaskSplitter {
         List<InventoryTask> result = new LinkedList<>();
         long startTimeMillis = System.currentTimeMillis();
         TransmissionProcessContext processContext = 
jobItemContext.getJobProcessContext();
-        for (InventoryDumperContext each : new 
InventoryDumperContextSplitter(sourceDataSource, 
dumperContext).split(jobItemContext)) {
+        InventoryDumperContextSplitter dumperContextSplitter = new 
InventoryDumperContextSplitter(sourceDataSource, dumperContext);
+        for (InventoryDumperContext each : 
dumperContextSplitter.split(jobItemContext)) {
             AtomicReference<IngestPosition> position = new 
AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = 
InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(),
 importerConfig.getBatchSize(), position);
             Dumper dumper = new InventoryDumper(each, channel, 
sourceDataSource, jobItemContext.getSourceMetaDataLoader());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index d36d59bf030..dec0fd192d2 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -117,8 +117,9 @@ public final class CDCJobPreparer {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         TransmissionProcessContext processContext = 
jobItemContext.getJobProcessContext();
-        for (InventoryDumperContext each : new 
InventoryDumperContextSplitter(jobItemContext.getSourceDataSource(), new 
InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()))
-                .split(jobItemContext)) {
+        InventoryDumperContextSplitter dumperContextSplitter = new 
InventoryDumperContextSplitter(
+                jobItemContext.getSourceDataSource(), new 
InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()));
+        for (InventoryDumperContext each : 
dumperContextSplitter.split(jobItemContext)) {
             AtomicReference<IngestPosition> position = new 
AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = 
InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(),
 importerConfig.getBatchSize(), position);
             if (!(position.get() instanceof IngestFinishedPosition)) {

Reply via email to