This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f107c752cd Improve CDC pure incremental mode (#24080)
1f107c752cd is described below

commit 1f107c752cdc6988a4cd72fd9610d29937a61441
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Feb 10 17:29:12 2023 +0800

    Improve CDC pure incremental mode (#24080)
    
    * Improve pure incremental mode
    
    * Fix codestyle
---
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 79 ++++++++++++++++------
 .../cdc/context/job/CDCJobItemContext.java         | 16 ++++-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  | 16 +++--
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  5 +-
 4 files changed, 88 insertions(+), 28 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 5708a485a4d..bee9d5c08f6 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -30,15 +30,20 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJ
 import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
@@ -46,6 +51,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfigurat
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -54,8 +60,11 @@ import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIn
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtil;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
@@ -97,21 +106,21 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     /**
      * Create CDC job config.
      *
-     * @param event create CDC job event
-     * @return job id
+     * @param param create CDC job param
+     * @return true if job is not exist, otherwise false
      */
-    public boolean createJob(final CreateSubscriptionJobParameter event) {
+    public boolean createJob(final CreateSubscriptionJobParameter param) {
         YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
-        yamlJobConfig.setDatabase(event.getDatabase());
-        yamlJobConfig.setTableNames(event.getSubscribeTableNames());
-        yamlJobConfig.setSubscriptionName(event.getSubscriptionName());
-        yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode());
-        yamlJobConfig.setDecodeWithTX(event.isDecodeWithTX());
-        ShardingSphereDatabase database = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase());
+        yamlJobConfig.setDatabase(param.getDatabase());
+        yamlJobConfig.setTableNames(param.getSubscribeTableNames());
+        yamlJobConfig.setSubscriptionName(param.getSubscriptionName());
+        yamlJobConfig.setSubscriptionMode(param.getSubscriptionMode());
+        yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
+        ShardingSphereDatabase database = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabase());
         
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
-        List<JobDataNodeLine> jobDataNodeLines = 
JobDataNodeLineConvertUtil.convertDataNodesToLines(event.getDataNodesMap());
+        List<JobDataNodeLine> jobDataNodeLines = 
JobDataNodeLineConvertUtil.convertDataNodesToLines(param.getDataNodesMap());
         
yamlJobConfig.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        JobDataNodeLine tableFirstDataNodes = new 
JobDataNodeLine(event.getDataNodesMap().entrySet().stream().map(each -> new 
JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1)))
+        JobDataNodeLine tableFirstDataNodes = new 
JobDataNodeLine(param.getDataNodesMap().entrySet().stream().map(each -> new 
JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1)))
                 .collect(Collectors.toList()));
         yamlJobConfig.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
         extendYamlJobConfiguration(yamlJobConfig);
@@ -127,9 +136,38 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         JobConfigurationPOJO jobConfigPOJO = 
convertJobConfiguration(jobConfig);
         jobConfigPOJO.setDisabled(true);
         repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
+        if 
(SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+            initIncrementalPosition(jobConfig);
+        }
         return true;
     }
     
+    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+        String jobId = jobConfig.getJobId();
+        try {
+            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
+                if (getJobItemProgress(jobId, i).isPresent()) {
+                    continue;
+                }
+                TableNameSchemaNameMapping tableNameSchemaNameMapping = 
getTableNameSchemaNameMapping(jobConfig.getTableNames());
+                DumperConfiguration dumperConfig = 
buildDumperConfiguration(jobConfig, i, tableNameSchemaNameMapping);
+                InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
+                
jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+                
jobItemProgress.setDataSourceName(dumperConfig.getDataSourceName());
+                IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress();
+                
incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null,
 dumperConfig, dataSourceManager));
+                jobItemProgress.setIncremental(new 
JobItemIncrementalTasksProgress(incrementalTaskProgress));
+                jobItemProgress.setStatus(JobStatus.PREPARE_SUCCESS);
+                
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 
i, 
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+            }
+        } catch (final SQLException ex) {
+            throw new 
PrepareJobWithGetBinlogPositionException(jobConfig.getJobId(), ex);
+        } finally {
+            dataSourceManager.close();
+        }
+    }
+    
     private ShardingSpherePipelineDataSourceConfiguration 
getDataSourceConfiguration(final ShardingSphereDatabase database) {
         Map<String, Map<String, Object>> dataSourceProps = new HashMap<>();
         for (Entry<String, DataSource> entry : 
database.getResourceMetaData().getDataSources().entrySet()) {
@@ -171,13 +209,8 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) 
pipelineJobConfig;
-        JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
-        Map<ActualTableName, LogicTableName> tableNameMap = new 
LinkedHashMap<>();
-        dataNodeLine.getEntries().forEach(each -> 
each.getDataNodes().forEach(node -> tableNameMap.put(new 
ActualTableName(node.getTableName()), new 
LogicTableName(each.getLogicTableName()))));
         TableNameSchemaNameMapping tableNameSchemaNameMapping = 
getTableNameSchemaNameMapping(jobConfig.getTableNames());
-        String dataSourceName = 
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
-        StandardPipelineDataSourceConfiguration actualDataSourceConfiguration 
= 
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, 
dataSourceName, actualDataSourceConfiguration, tableNameMap, 
tableNameSchemaNameMapping);
+        DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, 
jobShardingItem, tableNameSchemaNameMapping);
         ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
jobConfig.getTableNames(), tableNameSchemaNameMapping);
         CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig, 
importerConfig);
         log.debug("buildTaskConfiguration, result={}", result);
@@ -195,12 +228,16 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         return new TableNameSchemaNameMapping(tableNameSchemaMap);
     }
     
-    private static DumperConfiguration buildDumperConfiguration(final 
CDCJobConfiguration jobConfig, final String dataSourceName, final 
PipelineDataSourceConfiguration sourceDataSourceConfig,
-                                                                final 
Map<ActualTableName, LogicTableName> tableNameMap, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+    private static DumperConfiguration buildDumperConfiguration(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+        Map<ActualTableName, LogicTableName> tableNameMap = new 
LinkedHashMap<>();
+        dataNodeLine.getEntries().forEach(each -> 
each.getDataNodes().forEach(node -> tableNameMap.put(new 
ActualTableName(node.getTableName()), new 
LogicTableName(each.getLogicTableName()))));
+        String dataSourceName = 
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+        StandardPipelineDataSourceConfiguration actualDataSourceConfig = 
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
         DumperConfiguration result = new DumperConfiguration();
         result.setJobId(jobConfig.getJobId());
         result.setDataSourceName(dataSourceName);
-        result.setDataSourceConfig(sourceDataSourceConfig);
+        result.setDataSourceConfig(actualDataSourceConfig);
         result.setTableNameMap(tableNameMap);
         result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
         result.setDecodeWithTX(jobConfig.isDecodeWithTX());
@@ -252,7 +289,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     
     @Override
     public void commit(final String jobId) {
-        // TODO to be implemented
+        throw new UnsupportedOperationException();
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index 8d63276264e..f59fd9eb76b 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.cdc.context.job;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
@@ -46,7 +45,6 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * CDC job item context.
  */
-@RequiredArgsConstructor
 @Getter
 public final class CDCJobItemContext implements 
InventoryIncrementalJobItemContext {
     
@@ -94,6 +92,20 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
         }
     };
     
+    public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int 
shardingItem, final InventoryIncrementalJobItemProgress initProgress, final 
CDCProcessContext jobProcessContext,
+                             final CDCTaskConfiguration taskConfig, final 
PipelineDataSourceManager dataSourceManager, final ImporterConnector 
importerConnector) {
+        this.jobConfig = jobConfig;
+        this.shardingItem = shardingItem;
+        this.initProgress = initProgress;
+        this.jobProcessContext = jobProcessContext;
+        this.taskConfig = taskConfig;
+        this.dataSourceManager = dataSourceManager;
+        this.importerConnector = importerConnector;
+        if (null != initProgress) {
+            status = initProgress.getStatus();
+        }
+    }
+    
     @Override
     public String getJobId() {
         return jobConfig.getJobId();
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 334572c1542..cbcee655918 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
@@ -38,6 +39,7 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
 
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * CDC job preparer.
@@ -53,21 +55,27 @@ public final class CDCJobPreparer {
      * @param jobItemContext job item context
      */
     public void prepare(final CDCJobItemContext jobItemContext) {
-        if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem()).isPresent()) {
+        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+        if (!jobItemProgress.isPresent()) {
             jobAPI.persistJobItemProgress(jobItemContext);
         }
         if (jobItemContext.isStopping()) {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
-        updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
+        boolean needUpdateJobStatus = !jobItemProgress.isPresent() || 
JobStatus.PREPARING.equals(jobItemContext.getStatus()) || 
JobStatus.RUNNING.equals(jobItemContext.getStatus())
+                || 
JobStatus.PREPARING_FAILURE.equals(jobItemContext.getStatus());
+        if (needUpdateJobStatus) {
+            updateJobItemStatus(JobStatus.PREPARING, jobItemContext);
+        }
         initIncrementalTasks(jobItemContext);
         CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
         if 
(SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
             initInventoryTasks(jobItemContext);
         }
-        jobAPI.persistJobItemProgress(jobItemContext);
-        updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);
+        if (needUpdateJobStatus) {
+            updateJobItemStatus(JobStatus.PREPARE_SUCCESS, jobItemContext);
+        }
     }
     
     private void updateJobItemStatus(final JobStatus jobStatus, final 
CDCJobItemContext jobItemContext) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 24c05731da1..c95f79f5357 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
+import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
@@ -43,10 +45,10 @@ import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
@@ -70,6 +72,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
+    @Getter(AccessLevel.PROTECTED)
     private final YamlInventoryIncrementalJobItemProgressSwapper 
jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper();
     
     protected abstract String getTargetDatabaseType(PipelineJobConfiguration 
pipelineJobConfig);

Reply via email to