This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 62cf7a3ccc9 Rename PipelineJobUpdateProgress (#32775)
62cf7a3ccc9 is described below
commit 62cf7a3ccc9c0d27e2891540dcfac60db47f4dcc
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Sep 2 22:07:17 2024 +0800
Rename PipelineJobUpdateProgress (#32775)
---
.../ConsistencyCheckJobItemProgressContext.java | 6 +++---
.../consistencycheck/table/MatchingTableInventoryChecker.java | 4 ++--
.../pipeline/core/importer/SingleChannelConsumerImporter.java | 6 +++---
.../data/pipeline/core/importer/sink/PipelineSink.java | 4 ++--
.../core/importer/sink/type/PipelineDataSourceSink.java | 8 ++++----
.../job/progress/listener/PipelineJobProgressListener.java | 6 +++---
...essUpdatedParameter.java => PipelineJobUpdateProgress.java} | 4 ++--
.../data/pipeline/cdc/context/CDCJobItemContext.java | 6 +++---
.../data/pipeline/cdc/core/importer/CDCImporter.java | 8 ++++----
.../pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java | 10 +++++-----
.../cdc/core/importer/sink/PipelineCDCSocketSinkTest.java | 4 ++--
.../check/consistency/MigrationDataConsistencyChecker.java | 4 ++--
.../scenario/migration/context/MigrationJobItemContext.java | 6 +++---
.../fixture/algorithm/FixtureTransmissionJobItemContext.java | 4 ++--
14 files changed, 40 insertions(+), 40 deletions(-)
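For context, the whole change is a class rename plus the matching call-site and Javadoc updates. Below is a minimal sketch, condensed from the diff that follows, of the renamed value class and the listener callback that now accepts it; the two types are collapsed into one file here for brevity, and Lombok generates the constructor and getter as in the source.

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
 * Pipeline job update progress (formerly PipelineJobProgressUpdatedParameter).
 */
@RequiredArgsConstructor
@Getter
final class PipelineJobUpdateProgress {
    
    private final int processedRecordsCount;
}

/**
 * Progress listener whose callback now takes the renamed type.
 */
interface PipelineJobProgressListener {
    
    void onProgressUpdated(PipelineJobUpdateProgress updateProgress);
}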
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
index dba540c9f70..0a986c4768a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
@@ -21,7 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import java.util.Collection;
@@ -61,8 +61,8 @@ public final class ConsistencyCheckJobItemProgressContext implements PipelineJob
private final String sourceDatabaseType;
@Override
- public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
- checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+ public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
+ checkedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index ddf99a8eea1..9a98e67a92f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -113,7 +113,7 @@ public abstract class MatchingTableInventoryChecker implements TableInventoryChe
if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().toString(), targetCalculatedResult.getMaxUniqueKeyValue().get());
}
- param.getProgressContext().onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+ param.getProgressContext().onProgressUpdated(new PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
}
if (sourceCalculatedResults.hasNext()) {
checkResult.setMatched(false);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index 09e74e55d80..b2249bc377f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.util.List;
@@ -52,9 +52,9 @@ public final class SingleChannelConsumerImporter extends AbstractPipelineLifecyc
if (records.isEmpty()) {
continue;
}
- PipelineJobProgressUpdatedParameter updatedParam = sink.write("", records);
+ PipelineJobUpdateProgress updateProgress = sink.write("", records);
channel.ack(records);
- jobProgressListener.onProgressUpdated(updatedParam);
+ jobProgressListener.onProgressUpdated(updateProgress);
if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
break;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
index 9f356cf35b5..9704abffe9b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineSink.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.importer.sink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import java.io.Closeable;
import java.util.Collection;
@@ -35,5 +35,5 @@ public interface PipelineSink extends Closeable {
* @param records records
* @return job progress updated parameter
*/
- PipelineJobProgressUpdatedParameter write(String ackId, Collection<Record> records);
+ PipelineJobUpdateProgress write(String ackId, Collection<Record> records);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index d6e180dfb06..d9f1d81f0ab 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
@@ -73,17 +73,17 @@ public final class PipelineDataSourceSink implements PipelineSink {
}
@Override
- public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
+ public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
if (dataRecords.isEmpty()) {
- return new PipelineJobProgressUpdatedParameter(0);
+ return new PipelineJobUpdateProgress(0);
}
for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
batchWrite(each.getDeleteDataRecords());
batchWrite(each.getInsertDataRecords());
batchWrite(each.getUpdateDataRecords());
}
- return new PipelineJobProgressUpdatedParameter((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
+ return new PipelineJobUpdateProgress((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
}
@SuppressWarnings("BusyWait")
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
index cc41810ab2a..0d4e4348696 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressListener.java
@@ -23,9 +23,9 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
public interface PipelineJobProgressListener {
/**
- * Emit on progress updated.
+ * Emit on pipeline job progress updated.
*
- * @param param process update parameter
+ * @param updateProgress pipeline job update progress
*/
- void onProgressUpdated(PipelineJobProgressUpdatedParameter param);
+ void onProgressUpdated(PipelineJobUpdateProgress updateProgress);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressUpdatedParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobUpdateProgress.java
similarity index 91%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressUpdatedParameter.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobUpdateProgress.java
index 79d36967e74..f1a802a19cd 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobProgressUpdatedParameter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/PipelineJobUpdateProgress.java
@@ -21,11 +21,11 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
- * Pipeline job process update parameter.
+ * Pipeline job update progress.
*/
@RequiredArgsConstructor
@Getter
-public final class PipelineJobProgressUpdatedParameter {
+public final class PipelineJobUpdateProgress {
private final int processedRecordsCount;
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index f1d189aa5af..ecca5660a2e 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
@@ -117,8 +117,8 @@ public final class CDCJobItemContext implements TransmissionJobItemContext {
}
@Override
- public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
- processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+ public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
+ processedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
PipelineJobProgressPersistService.notifyPersist(jobConfig.getJobId(), shardingItem);
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 2357fb41384..c8640dd71ab 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -35,7 +35,7 @@ import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import java.util.ArrayList;
@@ -77,7 +77,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
protected void runBlocking() {
CDCImporterManager.putImporter(this);
for (CDCChannelProgressPair each : channelProgressPairs) {
- each.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+ each.getJobProgressListener().onProgressUpdated(new PipelineJobUpdateProgress(0));
}
while (isRunning()) {
if (needSorting) {
@@ -223,7 +223,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
Record lastRecord = records.get(records.size() - 1);
if (records.stream().noneMatch(DataRecord.class::isInstance)) {
channel.ack(records);
- channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+ channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobUpdateProgress(0));
if (lastRecord instanceof FinishedRecord) {
channelProgressPairs.remove(channelProgressPair);
}
@@ -255,7 +255,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
if (lastRecord instanceof FinishedRecord) {
channelProgressPairs.remove(each.getKey());
}
- each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
+ each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobUpdateProgress(ackPosition.getDataRecordCount()));
}
ackCache.invalidate(ackId);
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
index c175e38812a..700a23fed4b 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertU
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -70,20 +70,20 @@ public final class PipelineCDCSocketSink implements PipelineSink {
}
@Override
- public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
+ public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
if (records.isEmpty()) {
- return new PipelineJobProgressUpdatedParameter(0);
+ return new PipelineJobUpdateProgress(0);
}
while (!channel.isWritable() && channel.isActive()) {
doAwait();
}
if (!channel.isActive()) {
- return new PipelineJobProgressUpdatedParameter(0);
+ return new PipelineJobUpdateProgress(0);
}
Collection<DataRecordResult.Record> resultRecords = getResultRecords(records);
DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
- return new PipelineJobProgressUpdatedParameter(resultRecords.size());
+ return new PipelineJobUpdateProgress(resultRecords.size());
}
@SneakyThrows(InterruptedException.class)
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
index b52b3494ea2..414c2adfa8e 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.junit.jupiter.api.Test;
@@ -44,7 +44,7 @@ class PipelineCDCSocketSinkTest {
ShardingSphereDatabase mockDatabase = mock(ShardingSphereDatabase.class);
when(mockDatabase.getName()).thenReturn("test");
try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(mockChannel, mockDatabase, Collections.singletonList("test.t_order"))) {
- PipelineJobProgressUpdatedParameter actual = sink.write("ack", Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
+ PipelineJobUpdateProgress actual = sink.write("ack", Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
assertThat(actual.getProcessedRecordsCount(), is(0));
actual = sink.write("ack", Collections.singletonList(new DataRecord(PipelineSQLOperationType.DELETE, "t_order", new IngestPlaceholderPosition(), 1)));
assertThat(actual.getProcessedRecordsCount(), is(1));
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 1f5d8137572..0b3d3aa86a3 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -33,7 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
@@ -89,7 +89,7 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
.forEach(dataNode -> sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
progressContext.setRecordsCount(getRecordsCount());
progressContext.getTableNames().addAll(sourceTableNames);
- progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+ progressContext.onProgressUpdated(new PipelineJobUpdateProgress(0));
Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> result = new LinkedHashMap<>();
try (
PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index f2d5c695e72..5cfbc0193eb 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
@@ -140,8 +140,8 @@ public final class MigrationJobItemContext implements TransmissionJobItemContext
}
@Override
- public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
- processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+ public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
+ processedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
index bea381310b3..654f45291d9 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/algorithm/FixtureTransmissionJobItemContext.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemC
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -32,7 +32,7 @@ import java.util.Collection;
public final class FixtureTransmissionJobItemContext implements TransmissionJobItemContext {
@Override
- public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
+ public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
}
@Override