This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3393f0ace86 Add generic type of AbstractSeparablePipelineJob (#29341)
3393f0ace86 is described below
commit 3393f0ace8615f909a80ea7ef0316052ef531cb7
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 18:48:09 2023 +0800
Add generic type of AbstractSeparablePipelineJob (#29341)
* Remove SimpleJob of CDCJob
* Refactor AbstractSeparablePipelineJob
* Refactor AbstractSeparablePipelineJob
* Refactor MigrationJob
* Refactor MigrationJob
---
.../core/job/AbstractSeparablePipelineJob.java | 30 +++++-----
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 3 +-
.../consistencycheck/ConsistencyCheckJob.java | 13 ++---
.../pipeline/scenario/migration/MigrationJob.java | 66 ++++++++++------------
.../consistencycheck/ConsistencyCheckJobTest.java | 2 +-
5 files changed, 55 insertions(+), 59 deletions(-)
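For orientation before the diff: the core of this change is that AbstractSeparablePipelineJob now takes a type parameter for its job item context and renames its template methods accordingly. A rough sketch of the resulting contract, condensed from the diff below rather than quoted verbatim:

    @Slf4j
    public abstract class AbstractSeparablePipelineJob<T extends PipelineJobItemContext> extends AbstractPipelineJob {

        protected abstract T buildJobItemContext(ShardingContext shardingContext);

        protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);

        protected abstract void doPrepare(T jobItemContext) throws SQLException;
    }

Subclasses now implement these methods against their own context type instead of receiving a raw PipelineJobItemContext and casting it.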
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index cd668ca127f..ac5e44b1fe0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -31,25 +31,27 @@ import java.sql.SQLException;
/**
* Abstract separable pipeline job.
+ *
+ * @param <T> type of pipeline job item context
*/
@Slf4j
-public abstract class AbstractSeparablePipelineJob extends AbstractPipelineJob {
+public abstract class AbstractSeparablePipelineJob<T extends PipelineJobItemContext> extends AbstractPipelineJob {
protected AbstractSeparablePipelineJob(final String jobId) {
super(jobId);
}
@Override
- public void execute(final ShardingContext shardingContext) {
+ public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
if (isStopping()) {
- log.info("stopping true, ignore");
+ log.info("Stopping true, ignore");
return;
}
try {
- execute(buildPipelineJobItemContext(shardingContext));
+ execute(buildJobItemContext(shardingContext));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -58,20 +60,24 @@ public abstract class AbstractSeparablePipelineJob extends AbstractPipelineJob {
}
}
- private void execute(final PipelineJobItemContext jobItemContext) {
+ private void execute(final T jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
- PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
+ PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
prepare(jobItemContext);
- log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
+ log.info("Start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
tasksRunner.start();
}
- protected final void prepare(final PipelineJobItemContext jobItemContext) {
+ protected abstract T buildJobItemContext(ShardingContext shardingContext);
+
+ protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);
+
+ protected final void prepare(final T jobItemContext) {
try {
doPrepare(jobItemContext);
// CHECKSTYLE:OFF
@@ -81,14 +87,10 @@ public abstract class AbstractSeparablePipelineJob extends AbstractPipelineJob {
}
}
- protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;
-
- protected abstract PipelineJobItemContext buildPipelineJobItemContext(ShardingContext shardingContext);
-
- protected abstract PipelineTasksRunner buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+ protected abstract void doPrepare(T jobItemContext) throws SQLException;
private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) {
- log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
+ log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
try {
jobManager.stop(jobId);
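To illustrate how a scenario job plugs into the typed template above, here is a minimal hypothetical subclass (ExamplePipelineJob, ExampleJobItemContext and ExampleTasksRunner are made-up names, not part of this commit); the real implementations are ConsistencyCheckJob and MigrationJob in the hunks below:

    // Hypothetical sketch only; not part of this commit.
    public final class ExamplePipelineJob extends AbstractSeparablePipelineJob<ExampleJobItemContext> {

        public ExamplePipelineJob(final String jobId) {
            super(jobId);
        }

        @Override
        protected ExampleJobItemContext buildJobItemContext(final ShardingContext shardingContext) {
            // Build the scenario-specific context from the ElasticJob sharding context.
            return new ExampleJobItemContext(shardingContext.getJobName(), shardingContext.getShardingItem());
        }

        @Override
        protected PipelineTasksRunner buildTasksRunner(final ExampleJobItemContext jobItemContext) {
            // The context already has the concrete type, so no cast is needed.
            return new ExampleTasksRunner(jobItemContext);
        }

        @Override
        protected void doPrepare(final ExampleJobItemContext jobItemContext) throws SQLException {
            // Scenario-specific preparation would go here.
        }
    }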
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 009ea5df9f6..a9854bfd848 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -58,7 +58,6 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConf
import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
@@ -73,7 +72,7 @@ import java.util.stream.Collectors;
* CDC job.
*/
@Slf4j
-public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemContext> implements SimpleJob {
+public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemContext> {
@Getter
private final PipelineSink sink;
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 5d13dd54240..61aa3e775fc 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -18,10 +18,9 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -36,14 +35,14 @@ import java.util.Optional;
* Consistency check job.
*/
@Slf4j
-public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob {
+public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {
public ConsistencyCheckJob(final String jobId) {
super(jobId);
}
@Override
- public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
+ public ConsistencyCheckJobItemContext buildJobItemContext(final ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingContext.getShardingItem());
@@ -51,12 +50,12 @@ public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob {
}
@Override
- protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
- return new ConsistencyCheckTasksRunner((ConsistencyCheckJobItemContext) pipelineJobItemContext);
+ protected PipelineTasksRunner buildTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
+ return new ConsistencyCheckTasksRunner(jobItemContext);
}
@Override
- protected void doPrepare(final PipelineJobItemContext jobItemContext) {
+ protected void doPrepare(final ConsistencyCheckJobItemContext jobItemContext) {
}
@Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 5a204212d5b..9ed45a30c28 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -20,37 +20,34 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
-import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
+import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
@@ -66,25 +63,29 @@ import java.util.stream.Collectors;
* Migration job.
*/
@Slf4j
-public final class MigrationJob extends AbstractSeparablePipelineJob {
+public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJobItemContext> {
- private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager;
- private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
+ private final PipelineProcessConfigurationPersistService processConfigPersistService;
- private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+ private final PipelineDataSourceManager dataSourceManager;
// Shared by all sharding items
- private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();
+ private final MigrationJobPreparer jobPreparer;
public MigrationJob(final String jobId) {
super(jobId);
+ jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
+ processConfigPersistService = new PipelineProcessConfigurationPersistService();
+ dataSourceManager = new DefaultPipelineDataSourceManager();
+ jobPreparer = new MigrationJobPreparer();
}
@Override
- protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
- int shardingItem = shardingContext.getShardingItem();
+ protected MigrationJobItemContext buildJobItemContext(final ShardingContext shardingContext) {
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ int shardingItem = shardingContext.getShardingItem();
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "MIGRATION"));
@@ -100,45 +101,40 @@ public final class MigrationJob extends AbstractSeparablePipelineJob {
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
- MigrationTaskConfiguration result = new MigrationTaskConfiguration(
- incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
- log.info("buildTaskConfiguration, result={}", result);
- return result;
+ return new MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
}
- private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper) {
Collection<CreateTableConfiguration> result = new LinkedList<>();
for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
- String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
- DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
- String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
+ String sourceSchemaName = mapper.getSchemaName(each.getLogicTableName());
+ String targetSchemaName = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData().isSchemaAvailable() ? sourceSchemaName : null;
DataNode dataNode = each.getDataNodes().get(0);
PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
CreateTableConfiguration createTableConfig = new CreateTableConfiguration(sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()), jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
result.add(createTableConfig);
}
- log.info("buildCreateTableConfigurations, result={}", result);
return result;
}
private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
- final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper mapper) {
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
- return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
+ return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, mapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
}
@Override
- protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
- return new TransmissionTasksRunner((TransmissionJobItemContext) pipelineJobItemContext);
+ protected PipelineTasksRunner buildTasksRunner(final MigrationJobItemContext jobItemContext) {
+ return new TransmissionTasksRunner(jobItemContext);
}
@Override
- protected void doPrepare(final PipelineJobItemContext jobItemContext) throws SQLException {
- jobPreparer.prepare((MigrationJobItemContext) jobItemContext);
+ protected void doPrepare(final MigrationJobItemContext jobItemContext) throws SQLException {
+ jobPreparer.prepare(jobItemContext);
}
@Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 8fd9e67ad47..3b91f6ffc0e 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -55,7 +55,7 @@ class ConsistencyCheckJobTest {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0, YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob(checkJobId);
- ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildPipelineJobItemContext(
+ ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildJobItemContext(
new ShardingContext(checkJobId, "", 1, YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0, ""));
assertThat(actual.getProgressContext().getSourceTableCheckPositions(), is(expectTableCheckPosition));
assertThat(actual.getProgressContext().getTargetTableCheckPositions(), is(expectTableCheckPosition));