This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 62cf7a3ccc9 Rename PipelineJobUpdateProgress (#32775)
62cf7a3ccc9 is described below

commit 62cf7a3ccc9c0d27e2891540dcfac60db47f4dcc
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Sep 2 22:07:17 2024 +0800

    Rename PipelineJobUpdateProgress (#32775)
---
 .../ConsistencyCheckJobItemProgressContext.java                |  6 +++---
 .../consistencycheck/table/MatchingTableInventoryChecker.java  |  4 ++--
 .../pipeline/core/importer/SingleChannelConsumerImporter.java  |  6 +++---
 .../data/pipeline/core/importer/sink/PipelineSink.java         |  4 ++--
 .../core/importer/sink/type/PipelineDataSourceSink.java        |  8 ++++----
 .../job/progress/listener/PipelineJobProgressListener.java     |  6 +++---
 ...essUpdatedParameter.java => PipelineJobUpdateProgress.java} |  4 ++--
 .../data/pipeline/cdc/context/CDCJobItemContext.java           |  6 +++---
 .../data/pipeline/cdc/core/importer/CDCImporter.java           |  8 ++++----
 .../pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java | 10 +++++-----
 .../cdc/core/importer/sink/PipelineCDCSocketSinkTest.java      |  4 ++--
 .../check/consistency/MigrationDataConsistencyChecker.java     |  4 ++--
 .../scenario/migration/context/MigrationJobItemContext.java    |  6 +++---
 .../fixture/algorithm/FixtureTransmissionJobItemContext.java   |  4 ++--
 14 files changed, 40 insertions(+), 40 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
index dba540c9f70..0a986c4768a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
@@ -21,7 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 import java.util.Collection;
@@ -61,8 +61,8 @@ public final class ConsistencyCheckJobItemProgressContext 
implements PipelineJob
     private final String sourceDatabaseType;
     
     @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter 
param) {
-        checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+    public void onProgressUpdated(final PipelineJobUpdateProgress 
updateProgress) {
+        
checkedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
         PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index ddf99a8eea1..9a98e67a92f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -113,7 +113,7 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
                 
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().toString(),
 targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
-            param.getProgressContext().onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+            param.getProgressContext().onProgressUpdated(new 
PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
         }
         if (sourceCalculatedResults.hasNext()) {
             checkResult.setMatched(false);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index 09e74e55d80..b2249bc377f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -24,7 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.List;
@@ -52,9 +52,9 @@ public final class SingleChannelConsumerImporter extends 
AbstractPipelineLifecyc
             if (records.isEmpty()) {
                 continue;
             }
-            PipelineJobProgressUpdatedParameter updatedParam = sink.write("", 
records);
+            PipelineJobUpdateProgress updateProgress = sink.write("", records);
             channel.ack(records);
-            jobProgressListener.onProgressUpdated(updatedParam);
+            jobProgressListener.onProgressUpdated(updateProgress);
             if (FinishedRecord.class.equals(records.get(records.size() - 
1).getClass())) {
                 break;
             }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
index 9f356cf35b5..9704abffe9b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.importer.sink;
 
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 
 import java.io.Closeable;
 import java.util.Collection;
@@ -35,5 +35,5 @@ public interface PipelineSink extends Closeable {
      * @param records records
      * @return job progress updated parameter
      */
-    PipelineJobProgressUpdatedParameter write(String ackId, Collection<Record> 
records);
+    PipelineJobUpdateProgress write(String ackId, Collection<Record> records);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index d6e180dfb06..d9f1d81f0ab 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -30,7 +30,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
@@ -73,17 +73,17 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
     }
     
     @Override
-    public PipelineJobProgressUpdatedParameter write(final String ackId, final 
Collection<Record> records) {
+    public PipelineJobUpdateProgress write(final String ackId, final 
Collection<Record> records) {
         List<DataRecord> dataRecords = 
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
         if (dataRecords.isEmpty()) {
-            return new PipelineJobProgressUpdatedParameter(0);
+            return new PipelineJobUpdateProgress(0);
         }
         for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
             batchWrite(each.getDeleteDataRecords());
             batchWrite(each.getInsertDataRecords());
             batchWrite(each.getUpdateDataRecords());
         }
-        return new PipelineJobProgressUpdatedParameter((int) 
dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == 
each.getType()).count());
+        return new PipelineJobUpdateProgress((int) 
dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == 
each.getType()).count());
     }
     
     @SuppressWarnings("BusyWait")
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
index cc41810ab2a..0d4e4348696 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
@@ -23,9 +23,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
 public interface PipelineJobProgressListener {
     
     /**
-     * Emit on progress updated.
+     * Emit on pipeline job progress updated.
      *
-     * @param param process update parameter
+     * @param updateProgress pipeline job update process
      */
-    void onProgressUpdated(PipelineJobProgressUpdatedParameter param);
+    void onProgressUpdated(PipelineJobUpdateProgress updateProgress);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressUpdatedParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobUpdateProgress.java
similarity index 91%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressUpdatedParameter.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobUpdateProgress.java
index 79d36967e74..f1a802a19cd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressUpdatedParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobUpdateProgress.java
@@ -21,11 +21,11 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
 /**
- * Pipeline job process update parameter.
+ * Pipeline job update progress.
  */
 @RequiredArgsConstructor
 @Getter
-public final class PipelineJobProgressUpdatedParameter {
+public final class PipelineJobUpdateProgress {
     
     private final int processedRecordsCount;
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index f1d189aa5af..ecca5660a2e 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -30,7 +30,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
@@ -117,8 +117,8 @@ public final class CDCJobItemContext implements 
TransmissionJobItemContext {
     }
     
     @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter 
param) {
-        processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+    public void onProgressUpdated(final PipelineJobUpdateProgress 
updateProgress) {
+        
processedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
         PipelineJobProgressPersistService.notifyPersist(jobConfig.getJobId(), 
shardingItem);
     }
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 2357fb41384..c8640dd71ab 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -35,7 +35,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 
 import java.util.ArrayList;
@@ -77,7 +77,7 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
     protected void runBlocking() {
         CDCImporterManager.putImporter(this);
         for (CDCChannelProgressPair each : channelProgressPairs) {
-            each.getJobProgressListener().onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));
+            each.getJobProgressListener().onProgressUpdated(new 
PipelineJobUpdateProgress(0));
         }
         while (isRunning()) {
             if (needSorting) {
@@ -223,7 +223,7 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
         Record lastRecord = records.get(records.size() - 1);
         if (records.stream().noneMatch(DataRecord.class::isInstance)) {
             channel.ack(records);
-            channelProgressPair.getJobProgressListener().onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));
+            channelProgressPair.getJobProgressListener().onProgressUpdated(new 
PipelineJobUpdateProgress(0));
             if (lastRecord instanceof FinishedRecord) {
                 channelProgressPairs.remove(channelProgressPair);
             }
@@ -255,7 +255,7 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
             if (lastRecord instanceof FinishedRecord) {
                 channelProgressPairs.remove(each.getKey());
             }
-            each.getLeft().getJobProgressListener().onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
+            each.getLeft().getJobProgressListener().onProgressUpdated(new 
PipelineJobUpdateProgress(ackPosition.getDataRecordCount()));
         }
         ackCache.invalidate(ackId);
     }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
index c175e38812a..700a23fed4b 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertU
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 
@@ -70,20 +70,20 @@ public final class PipelineCDCSocketSink implements 
PipelineSink {
     }
     
     @Override
-    public PipelineJobProgressUpdatedParameter write(final String ackId, final 
Collection<Record> records) {
+    public PipelineJobUpdateProgress write(final String ackId, final 
Collection<Record> records) {
         if (records.isEmpty()) {
-            return new PipelineJobProgressUpdatedParameter(0);
+            return new PipelineJobUpdateProgress(0);
         }
         while (!channel.isWritable() && channel.isActive()) {
             doAwait();
         }
         if (!channel.isActive()) {
-            return new PipelineJobProgressUpdatedParameter(0);
+            return new PipelineJobUpdateProgress(0);
         }
         Collection<DataRecordResult.Record> resultRecords = 
getResultRecords(records);
         DataRecordResult dataRecordResult = 
DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
         channel.writeAndFlush(CDCResponseUtils.succeed("", 
ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
-        return new PipelineJobProgressUpdatedParameter(resultRecords.size());
+        return new PipelineJobUpdateProgress(resultRecords.size());
     }
     
     @SneakyThrows(InterruptedException.class)
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
index b52b3494ea2..414c2adfa8e 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.junit.jupiter.api.Test;
 
@@ -44,7 +44,7 @@ class PipelineCDCSocketSinkTest {
         ShardingSphereDatabase mockDatabase = 
mock(ShardingSphereDatabase.class);
         when(mockDatabase.getName()).thenReturn("test");
         try (PipelineCDCSocketSink sink = new 
PipelineCDCSocketSink(mockChannel, mockDatabase, 
Collections.singletonList("test.t_order"))) {
-            PipelineJobProgressUpdatedParameter actual = sink.write("ack", 
Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
+            PipelineJobUpdateProgress actual = sink.write("ack", 
Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
             assertThat(actual.getProcessedRecordsCount(), is(0));
             actual = sink.write("ack", Collections.singletonList(new 
DataRecord(PipelineSQLOperationType.DELETE, "t_order", new 
IngestPlaceholderPosition(), 1)));
             assertThat(actual.getProcessedRecordsCount(), is(1));
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 1f5d8137572..0b3d3aa86a3 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -33,7 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
@@ -89,7 +89,7 @@ public final class MigrationDataConsistencyChecker implements 
PipelineDataConsis
                 .forEach(dataNode -> 
sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
         progressContext.setRecordsCount(getRecordsCount());
         progressContext.getTableNames().addAll(sourceTableNames);
-        progressContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));
+        progressContext.onProgressUpdated(new PipelineJobUpdateProgress(0));
         Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> 
result = new LinkedHashMap<>();
         try (
                 PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index f2d5c695e72..5cfbc0193eb 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -28,7 +28,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
@@ -140,8 +140,8 @@ public final class MigrationJobItemContext implements 
TransmissionJobItemContext
     }
     
     @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter 
param) {
-        processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+    public void onProgressUpdated(final PipelineJobUpdateProgress 
updateProgress) {
+        
processedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
         PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
     }
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
index bea381310b3..654f45291d9 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemC
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 
@@ -32,7 +32,7 @@ import java.util.Collection;
 public final class FixtureTransmissionJobItemContext implements 
TransmissionJobItemContext {
     
     @Override
-    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter 
param) {
+    public void onProgressUpdated(final PipelineJobUpdateProgress 
updateProgress) {
     }
     
     @Override

Reply via email to