This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 725b0647eaa Refactor CDCJobAPI (#32482)
725b0647eaa is described below
commit 725b0647eaa7a1f7edb66f4eeab224f0e7fd0e69
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 13 05:20:09 2024 +0800
Refactor CDCJobAPI (#32482)
* Refactor CDCJobAPI
* Refactor CDCJobAPI
---
.../data/pipeline/cdc/api/CDCJobAPI.java | 19 +++++++------------
1 file changed, 7 insertions(+), 12 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index c51301bac87..bb56972dcfe 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -201,8 +201,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
JobDataNodeLine dataNodeLine =
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
String dataSourceName =
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
StandardPipelineDataSourceConfiguration actualDataSourceConfig =
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- return new IncrementalDumperContext(
- new DumperCommonContext(dataSourceName,
actualDataSourceConfig,
JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine),
tableAndSchemaNameMapper),
+ return new IncrementalDumperContext(new
DumperCommonContext(dataSourceName, actualDataSourceConfig,
JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine),
tableAndSchemaNameMapper),
jobConfig.getJobId(), jobConfig.isDecodeWithTX());
}
@@ -281,19 +280,15 @@ public final class CDCJobAPI implements
TransmissionJobAPI {
* @param jobId job id
* @return job item infos
*/
- public List<CDCJobItemInfo> getJobItemInfos(final String jobId) {
+ public Collection<CDCJobItemInfo> getJobItemInfos(final String jobId) {
CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
- Collection<TransmissionJobItemInfo> jobItemInfos = new
TransmissionJobManager(jobType).getJobItemInfos(jobId);
- List<CDCJobItemInfo> result = new LinkedList<>();
- for (TransmissionJobItemInfo each : jobItemInfos) {
+ Collection<CDCJobItemInfo> result = new LinkedList<>();
+ for (TransmissionJobItemInfo each : new
TransmissionJobManager(jobType).getJobItemInfos(jobId)) {
TransmissionJobItemProgress jobItemProgress =
each.getJobItemProgress();
- if (null == jobItemProgress) {
- result.add(new CDCJobItemInfo(each, "", ""));
- continue;
- }
- result.add(new CDCJobItemInfo(each,
jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse(""),
- getCurrentPosition(database,
jobItemProgress.getDataSourceName())));
+ String confirmedPosition = null == jobItemProgress ? "" :
jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse("");
+ String currentPosition = null == jobItemProgress ? "" :
getCurrentPosition(database, jobItemProgress.getDataSourceName());
+ result.add(new CDCJobItemInfo(each, confirmedPosition,
currentPosition));
}
return result;
}