This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c17ec12959e Review and improve pipeline code (#28441)
c17ec12959e is described below
commit c17ec12959e2d9ef72ff4d401cfb957d8b101f16
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Sep 15 16:08:04 2023 +0800
Review and improve pipeline code (#28441)
* Rename progress columns of "show migration check status" DistSQL result
* Add incremental_idle_seconds column in "show migration check status"
DistSQL result
* Add sourceDatabaseType for ConsistencyCheckJobItemProgress
* Review
* Impl AutoCloseable for TableDataConsistencyChecker
* Improve PipelineDataSource Wrapper and Manager
* Clean code
* Update unit test
---
.../StandardPipelineDataSourceConfiguration.java | 4 +--
.../DefaultPipelineDataSourceManager.java | 10 ++++--
.../datasource/PipelineDataSourceWrapper.java | 38 ++++++++++++++--------
.../progress/ConsistencyCheckJobItemProgress.java | 2 ++
.../yaml/YamlConsistencyCheckJobItemProgress.java | 2 ++
...YamlConsistencyCheckJobItemProgressSwapper.java | 3 +-
.../common/pojo/ConsistencyCheckJobItemInfo.java | 6 ++--
.../common/util/PipelineLazyInitializer.java | 5 +++
.../ConsistencyCheckJobItemProgressContext.java | 2 ++
.../CRC32MatchTableDataConsistencyChecker.java | 4 +++
.../DataMatchTableDataConsistencyChecker.java | 4 +++
.../table/MatchingTableInventoryChecker.java | 2 +-
.../table/TableDataConsistencyChecker.java | 5 ++-
.../pipeline/core/task/TaskExecuteCallback.java | 2 +-
.../datasource/PipelineDataSourceWrapperTest.java | 12 -------
.../mysql/ingest/MySQLIncrementalDumper.java | 6 ++--
.../query/ShowMigrationCheckStatusExecutor.java | 6 ++--
.../ShowMigrationCheckStatusExecutorTest.java | 7 ++--
.../api/impl/ConsistencyCheckJobAPI.java | 13 ++++----
.../context/ConsistencyCheckJobItemContext.java | 2 +-
.../ConsistencyCheckJobItemContextTest.java | 4 +--
.../MigrationDataConsistencyChecker.java | 5 +--
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 2 +-
.../cases/migration/AbstractMigrationE2EIT.java | 4 +--
.../FixtureTableDataConsistencyChecker.java | 4 +++
.../consistencycheck/ConsistencyCheckJobTest.java | 2 ++
.../migration/api/impl/MigrationJobAPITest.java | 2 +-
.../MigrationDataConsistencyCheckerTest.java | 2 +-
28 files changed, 100 insertions(+), 60 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 90bd9d1f84c..39b30d6bd26 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -64,8 +64,8 @@ public final class StandardPipelineDataSourceConfiguration
implements PipelineDa
this(param, YamlEngine.unmarshal(param, Map.class));
}
- public StandardPipelineDataSourceConfiguration(final Map<String, Object>
yamlDataSourceConfig) {
- this(YamlEngine.marshal(yamlDataSourceConfig), new
HashMap<>(yamlDataSourceConfig));
+ public StandardPipelineDataSourceConfiguration(final Map<String, Object>
poolProps) {
+ this(YamlEngine.marshal(poolProps), new HashMap<>(poolProps));
}
private StandardPipelineDataSourceConfiguration(final String param, final
Map<String, Object> yamlConfig) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/DefaultPipelineDataSourceManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/DefaultPipelineDataSourceManager.java
index fee03ff0457..c637ca71d8f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/DefaultPipelineDataSourceManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/DefaultPipelineDataSourceManager.java
@@ -41,7 +41,11 @@ public final class DefaultPipelineDataSourceManager
implements PipelineDataSourc
synchronized (cachedDataSources) {
result = cachedDataSources.get(dataSourceConfig);
if (null != result) {
- return result;
+ if (!result.isClosed()) {
+ return result;
+ } else {
+ log.warn("{} is already closed, create again", result);
+ }
}
result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
cachedDataSources.put(dataSourceConfig, result);
@@ -49,10 +53,12 @@ public final class DefaultPipelineDataSourceManager
implements PipelineDataSourc
}
}
- // TODO monitor each DataSource close
@Override
public void close() {
for (PipelineDataSourceWrapper each : cachedDataSources.values()) {
+ if (each.isClosed()) {
+ continue;
+ }
try {
each.close();
} catch (final SQLException ex) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapper.java
index c5bbda09c0f..8fdac2e2d35 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapper.java
@@ -21,12 +21,14 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import
org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
/**
@@ -41,6 +43,17 @@ public final class PipelineDataSourceWrapper implements
DataSource, AutoCloseabl
private final DatabaseType databaseType;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /**
+ * Whether underlying data source is closed or not.
+ *
+ * @return true if closed
+ */
+ public boolean isClosed() {
+ return closed.get();
+ }
+
@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
@@ -88,21 +101,20 @@ public final class PipelineDataSourceWrapper implements
DataSource, AutoCloseabl
@Override
public void close() throws SQLException {
- if (null == dataSource) {
+ if (closed.get()) {
+ return;
+ }
+ if (!(dataSource instanceof AutoCloseable)) {
+ log.warn("Data source is not closed, it might cause connection
leak, data source: {}", dataSource);
return;
}
- if (dataSource instanceof AutoCloseable) {
- try {
- ((AutoCloseable) dataSource).close();
- } catch (final SQLException ex) {
- throw ex;
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- throw new SQLException("data source close failed.", ex);
- }
- } else {
- log.warn("dataSource is not closed, it might cause connection
leak, dataSource={}", dataSource);
+ try {
+ new DataSourcePoolDestroyer(dataSource).asyncDestroy();
+ closed.set(true);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ throw new SQLException("Data source close failed.", ex);
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
index 3b36f63e16c..f43db99d70f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/ConsistencyCheckJobItemProgress.java
@@ -52,4 +52,6 @@ public final class ConsistencyCheckJobItemProgress implements
PipelineJobItemPro
private final Map<String, Object> sourceTableCheckPositions;
private final Map<String, Object> targetTableCheckPositions;
+
+ private final String sourceDatabaseType;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index c7984ca0c4e..2ed595408b3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -48,4 +48,6 @@ public final class YamlConsistencyCheckJobItemProgress
implements YamlConfigurat
private Map<String, Object> sourceTableCheckPositions = new
LinkedHashMap<>();
private Map<String, Object> targetTableCheckPositions = new
LinkedHashMap<>();
+
+ private String sourceDatabaseType;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index bece1b9b17c..c399505edf2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -38,6 +38,7 @@ public final class YamlConsistencyCheckJobItemProgressSwapper
implements YamlCon
result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
result.setSourceTableCheckPositions(data.getSourceTableCheckPositions());
result.setTargetTableCheckPositions(data.getTargetTableCheckPositions());
+ result.setSourceDatabaseType(data.getSourceDatabaseType());
return result;
}
@@ -45,7 +46,7 @@ public final class YamlConsistencyCheckJobItemProgressSwapper
implements YamlCon
public ConsistencyCheckJobItemProgress swapToObject(final
YamlConsistencyCheckJobItemProgress yamlConfig) {
ConsistencyCheckJobItemProgress result = new
ConsistencyCheckJobItemProgress(yamlConfig.getTableNames(),
yamlConfig.getIgnoredTableNames(), yamlConfig.getCheckedRecordsCount(),
yamlConfig.getRecordsCount(),
yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(),
- yamlConfig.getSourceTableCheckPositions(),
yamlConfig.getTargetTableCheckPositions());
+ yamlConfig.getSourceTableCheckPositions(),
yamlConfig.getTargetTableCheckPositions(), yamlConfig.getSourceDatabaseType());
result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
index 0fa4f6cb77f..db7b605b71e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
@@ -35,9 +35,11 @@ public final class ConsistencyCheckJobItemInfo {
private String checkFailedTableNames;
- private int finishedPercentage;
+ private int inventoryFinishedPercentage;
- private long remainingSeconds;
+ private long inventoryRemainingSeconds;
+
+ private String incrementalIdleSeconds = "";
private String checkBeginTime;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/PipelineLazyInitializer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/PipelineLazyInitializer.java
index f21d0adfd3e..df9927ecd62 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/PipelineLazyInitializer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/PipelineLazyInitializer.java
@@ -38,6 +38,11 @@ public abstract class PipelineLazyInitializer<T> extends
LazyInitializer<T> {
return result;
}
+ /**
+ * Is initialized.
+ *
+ * @return initialized or not
+ */
public boolean isInitialized() {
return initialized.get();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
index 7822408a81c..ddd11644075 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
@@ -58,6 +58,8 @@ public final class ConsistencyCheckJobItemProgressContext
implements PipelineJob
private final Map<String, Object> targetTableCheckPositions = new
ConcurrentHashMap<>();
+ private final String sourceDatabaseType;
+
@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter
param) {
checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
index 42f7ff0824e..7f07d1fb81e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
@@ -47,6 +47,10 @@ public final class CRC32MatchTableDataConsistencyChecker
implements TableDataCon
return result;
}
+ @Override
+ public void close() {
+ }
+
@Override
public String getType() {
return "CRC32_MATCH";
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 9ae4cc1cfb6..410c64c4af2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -74,6 +74,10 @@ public final class DataMatchTableDataConsistencyChecker
implements TableDataCons
return
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class);
}
+ @Override
+ public void close() {
+ }
+
@Override
public String getType() {
return "DATA_MATCH";
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index d941dbb0d26..3e485e97f52 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -59,7 +59,7 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
@Override
public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
- ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) +
"-check-%d");
+ ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) +
"-matching-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
try {
return checkSingleTableInventoryData(param, executor);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
index 2faa5cc0775..010e504f66d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
@@ -25,7 +25,7 @@ import java.util.Collection;
/**
* Table data consistency checker.
*/
-public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm {
+public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm,
AutoCloseable {
/**
* Build table inventory checker.
@@ -50,4 +50,7 @@ public interface TableDataConsistencyChecker extends
ShardingSphereAlgorithm {
* @return supported database types
*/
Collection<DatabaseType> getSupportedDatabaseTypes();
+
+ @Override
+ void close();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
index c4e88b5a79d..82b5ec4259a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
@@ -37,7 +37,7 @@ public final class TaskExecuteCallback implements
ExecuteCallback {
@Override
public void onFailure(final Throwable throwable) {
- log.error("onFailure, task ID={}", task.getTaskId());
+ log.error("onFailure, task ID={}", task.getTaskId(), throwable);
task.stop();
IOUtils.closeQuietly(task);
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapperTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapperTest.java
index 7a61e3bc6ea..455a2b0dc2d 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapperTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/PipelineDataSourceWrapperTest.java
@@ -110,16 +110,4 @@ class PipelineDataSourceWrapperTest {
doThrow(new
SQLException("")).when(dataSource).setLogWriter(printWriter);
assertThrows(SQLException.class, () -> new
PipelineDataSourceWrapper(dataSource,
TypedSPILoader.getService(DatabaseType.class,
"FIXTURE")).setLogWriter(printWriter));
}
-
- @Test
- void assertCloseExceptionFailure() throws Exception {
- doThrow(new Exception("")).when((AutoCloseable) dataSource).close();
- assertThrows(SQLException.class, () -> new
PipelineDataSourceWrapper(dataSource,
TypedSPILoader.getService(DatabaseType.class, "FIXTURE")).close());
- }
-
- @Test
- void assertCloseSQLExceptionFailure() throws Exception {
- doThrow(new SQLException("")).when((AutoCloseable) dataSource).close();
- assertThrows(SQLException.class, () -> new
PipelineDataSourceWrapper(dataSource,
TypedSPILoader.getService(DatabaseType.class, "FIXTURE")).close());
- }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 0a0eeb75081..6c5cde7993a 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -47,10 +47,10 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.MySQLClient;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.MySQLDataTypeHandler;
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
-import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
+import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.io.Serializable;
@@ -88,10 +88,10 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
this.channel = channel;
this.metaDataLoader = metaDataLoader;
YamlJdbcConfiguration jdbcConfig =
((StandardPipelineDataSourceConfiguration)
dumperConfig.getDataSourceConfig()).getJdbcConfig();
- log.info("incremental dump, jdbcUrl={}", jdbcConfig.getUrl());
ConnectionPropertiesParser parser =
DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class,
TypedSPILoader.getService(DatabaseType.class, "MySQL"));
ConnectionProperties connectionProps =
parser.parse(jdbcConfig.getUrl(), null, null);
ConnectInfo connectInfo = new ConnectInfo(generateServerId(),
connectionProps.getHostname(), connectionProps.getPort(),
jdbcConfig.getUsername(), jdbcConfig.getPassword());
+ log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={},
port={}", jdbcConfig.getUrl(), connectInfo.getServerId(),
connectInfo.getHost(), connectInfo.getPort());
client = new MySQLClient(connectInfo, dumperConfig.isDecodeWithTX());
catalog = connectionProps.getCatalog();
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
index 98202a3c5b6..a1f95255599 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
@@ -49,15 +49,15 @@ public final class ShowMigrationCheckStatusExecutor
implements QueryableRALExecu
private LocalDataQueryResultRow convert(final ConsistencyCheckJobItemInfo
info) {
String checkResult = null == info.getCheckSuccess() ? "" :
info.getCheckSuccess().toString();
return new
LocalDataQueryResultRow(Optional.ofNullable(info.getTableNames()).orElse(""),
checkResult, Optional.ofNullable(info.getCheckFailedTableNames()).orElse(""),
- String.valueOf(info.getFinishedPercentage()),
info.getRemainingSeconds(),
+ String.valueOf(info.getInventoryFinishedPercentage()),
info.getInventoryRemainingSeconds(), info.getIncrementalIdleSeconds(),
Optional.ofNullable(info.getCheckBeginTime()).orElse(""),
Optional.ofNullable(info.getCheckEndTime()).orElse(""),
info.getDurationSeconds(),
info.getAlgorithmType(),
Optional.ofNullable(info.getAlgorithmProps()).orElse(""),
Optional.ofNullable(info.getErrorMessage()).orElse(""));
}
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("tables", "result", "check_failed_tables",
"finished_percentage", "remaining_seconds", "check_begin_time",
"check_end_time", "duration_seconds",
- "algorithm_type", "algorithm_props", "error_message");
+ return Arrays.asList("tables", "result", "check_failed_tables",
"inventory_finished_percentage", "inventory_remaining_seconds",
"incremental_idle_seconds",
+ "check_begin_time", "check_end_time", "duration_seconds",
"algorithm_type", "algorithm_props", "error_message");
}
@Override
diff --git
a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
index fa45d54fb3a..4ffef5d5e08 100644
---
a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
+++
b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
@@ -32,13 +32,14 @@ class ShowMigrationCheckStatusExecutorTest {
@Test
void assertGetColumnNames() {
Collection<String> columns = executor.getColumnNames();
- assertThat(columns.size(), is(11));
+ assertThat(columns.size(), is(12));
Iterator<String> iterator = columns.iterator();
assertThat(iterator.next(), is("tables"));
assertThat(iterator.next(), is("result"));
assertThat(iterator.next(), is("check_failed_tables"));
- assertThat(iterator.next(), is("finished_percentage"));
- assertThat(iterator.next(), is("remaining_seconds"));
+ assertThat(iterator.next(), is("inventory_finished_percentage"));
+ assertThat(iterator.next(), is("inventory_remaining_seconds"));
+ assertThat(iterator.next(), is("incremental_idle_seconds"));
assertThat(iterator.next(), is("check_begin_time"));
assertThat(iterator.next(), is("check_end_time"));
assertThat(iterator.next(), is("duration_seconds"));
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 37acaab9719..4eedb50c8cd 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -122,6 +122,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
yamlConfig.setParentJobId(parentJobId);
yamlConfig.setAlgorithmTypeName(param.getAlgorithmTypeName());
yamlConfig.setAlgorithmProps(param.getAlgorithmProps());
+
yamlConfig.setSourceDatabaseType(param.getSourceDatabaseType().getType());
start(new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig));
return result;
}
@@ -160,7 +161,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
String ignoredTableNames = String.join(",",
progressContext.getIgnoredTableNames());
ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(tableNames, ignoredTableNames,
progressContext.getCheckedRecordsCount().get(),
progressContext.getRecordsCount(),
progressContext.getCheckBeginTimeMillis(),
progressContext.getCheckEndTimeMillis(),
- progressContext.getSourceTableCheckPositions(),
progressContext.getTargetTableCheckPositions());
+ progressContext.getSourceTableCheckPositions(),
progressContext.getTargetTableCheckPositions(),
progressContext.getSourceDatabaseType());
jobItemProgress.setStatus(context.getStatus());
return
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress));
}
@@ -304,7 +305,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
}
ConsistencyCheckJobItemProgress jobItemProgress = progress.get();
if (null == jobItemProgress.getRecordsCount() || null ==
jobItemProgress.getCheckedRecordsCount()) {
- result.setFinishedPercentage(0);
+ result.setInventoryFinishedPercentage(0);
result.setCheckSuccess(null);
return result;
}
@@ -312,14 +313,14 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
long recordsCount = jobItemProgress.getRecordsCount();
long checkedRecordsCount =
Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
if (JobStatus.FINISHED == jobItemProgress.getStatus()) {
- result.setFinishedPercentage(100);
+ result.setInventoryFinishedPercentage(100);
LocalDateTime checkEndTime = new
Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
Duration duration = Duration.between(checkBeginTime, checkEndTime);
result.setDurationSeconds(duration.getSeconds());
result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
- result.setRemainingSeconds(0L);
+ result.setInventoryRemainingSeconds(0L);
} else if (0 != recordsCount && 0 != checkedRecordsCount) {
- result.setFinishedPercentage((int) (checkedRecordsCount * 100 /
recordsCount));
+ result.setInventoryFinishedPercentage((int) (checkedRecordsCount *
100 / recordsCount));
JobConfigurationPOJO jobConfigPOJO =
getElasticJobConfigPOJO(checkJobId);
Long stopTimeMillis = jobConfigPOJO.isDisabled() ?
Long.parseLong(jobConfigPOJO.getProps().getProperty("stop_time_millis")) : null;
long durationMillis = (null != stopTimeMillis ? stopTimeMillis :
System.currentTimeMillis()) - jobItemProgress.getCheckBeginTimeMillis();
@@ -328,7 +329,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
result.setCheckEndTime(DATE_TIME_FORMATTER.format(new
Timestamp(stopTimeMillis).toLocalDateTime()));
}
long remainingMills = Math.max(0, (long) ((recordsCount -
checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis));
- result.setRemainingSeconds(remainingMills / 1000);
+ result.setInventoryRemainingSeconds(remainingMills / 1000);
}
String tableNames = jobItemProgress.getTableNames();
result.setTableNames(Optional.ofNullable(tableNames).orElse(""));
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index 329d9d1e8cb..5dd7865398b 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -56,7 +56,7 @@ public final class ConsistencyCheckJobItemContext implements
PipelineJobItemCont
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
this.status = status;
- progressContext = new ConsistencyCheckJobItemProgressContext(jobId,
shardingItem);
+ progressContext = new ConsistencyCheckJobItemProgressContext(jobId,
shardingItem, jobConfig.getSourceDatabaseType().getType());
if (null != jobItemProgress) {
progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
Optional.ofNullable(jobItemProgress.getSourceTableCheckPositions()).ifPresent(progressContext.getSourceTableCheckPositions()::putAll);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
index 753f5e9105f..b84f69bff9e 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -43,7 +43,7 @@ class ConsistencyCheckJobItemContextTest {
void assertConstructWithoutTableCheckPositions() {
Map<String, Object> sourceTableCheckPositions = Collections.emptyMap();
Map<String, Object> targetTableCheckPositions = Collections.emptyMap();
- ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null,
sourceTableCheckPositions, targetTableCheckPositions);
+ ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null,
sourceTableCheckPositions, targetTableCheckPositions, "H2");
ConsistencyCheckJobItemContext actual = new
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "",
"DATA_MATCH", null, databaseType),
0, JobStatus.RUNNING, jobItemProgress);
verifyProgressContext(actual.getProgressContext(), 0,
sourceTableCheckPositions, targetTableCheckPositions);
@@ -53,7 +53,7 @@ class ConsistencyCheckJobItemContextTest {
void assertConstructWithTableCheckPositions() {
Map<String, Object> sourceTableCheckPositions = ImmutableMap.of(TABLE,
6);
Map<String, Object> targetTableCheckPositions = ImmutableMap.of(TABLE,
5);
- ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null,
sourceTableCheckPositions, targetTableCheckPositions);
+ ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null,
sourceTableCheckPositions, targetTableCheckPositions, "H2");
ConsistencyCheckJobItemContext actual = new
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "",
"DATA_MATCH", null, databaseType),
0, JobStatus.RUNNING, jobItemProgress);
verifyProgressContext(actual.getProgressContext(), 1,
sourceTableCheckPositions, targetTableCheckPositions);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 13dde2fb842..846f9b430e0 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -87,8 +87,9 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
progressContext.getTableNames().addAll(sourceTableNames);
progressContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
Map<SchemaTableName, TableDataConsistencyCheckResult> result = new
LinkedHashMap<>();
- TableDataConsistencyChecker tableChecker =
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
- try (PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager()) {
+ try (
+ PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ TableDataConsistencyChecker tableChecker =
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
checkTableInventoryData(each, tableChecker, result,
dataSourceManager);
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index ecf7e569595..a54c866caea 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -201,7 +201,7 @@ class CDCE2EIT {
StandardPipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(targetDataSource);
PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getOriginal(),
schemaTableName.getTableName().getOriginal());
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
- ConsistencyCheckJobItemProgressContext progressContext = new
ConsistencyCheckJobItemProgressContext("", 0);
+ ConsistencyCheckJobItemProgressContext progressContext = new
ConsistencyCheckJobItemProgressContext("", 0,
sourceDataSource.getDatabaseType().getType());
TableInventoryCheckParameter param = new
TableInventoryCheckParameter("", sourceDataSource, targetDataSource,
schemaTableName, schemaTableName,
tableMetaData.getColumnNames(), uniqueKeys, null,
progressContext);
TableDataConsistencyChecker tableChecker =
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new
Properties());
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index ffd99fd94c6..470f46434a1 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -165,7 +165,7 @@ public abstract class AbstractMigrationE2EIT {
continue;
}
List<String> checkEndTimeList = resultList.stream().map(map ->
map.get("check_end_time").toString()).filter(each ->
!Strings.isNullOrEmpty(each)).collect(Collectors.toList());
- Set<String> finishedPercentages = resultList.stream().map(map ->
map.get("finished_percentage").toString()).collect(Collectors.toSet());
+ Set<String> finishedPercentages = resultList.stream().map(map ->
map.get("inventory_finished_percentage").toString()).collect(Collectors.toSet());
if (checkEndTimeList.size() == resultList.size() && 1 ==
finishedPercentages.size() && finishedPercentages.contains("100")) {
break;
} else {
@@ -176,7 +176,7 @@ public abstract class AbstractMigrationE2EIT {
assertFalse(resultList.isEmpty());
for (Map<String, Object> each : resultList) {
assertTrue(Boolean.parseBoolean(each.get("result").toString()),
String.format("%s check result is false", each.get("tables")));
- assertThat("finished_percentage is not 100",
each.get("finished_percentage").toString(), is("100"));
+ assertThat("inventory_finished_percentage is not 100",
each.get("inventory_finished_percentage").toString(), is("100"));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
index a3dd3e28c4a..fdcc39f5120 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureTableDataConsistencyChecker.java
@@ -39,6 +39,10 @@ public final class FixtureTableDataConsistencyChecker
implements TableDataConsis
return
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class);
}
+ @Override
+ public void close() {
+ }
+
@Override
public String getType() {
return "FIXTURE";
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 613a74c9bbf..a8c3af697f2 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -72,6 +72,8 @@ class ConsistencyCheckJobTest {
YamlConsistencyCheckJobConfiguration result = new
YamlConsistencyCheckJobConfiguration();
result.setJobId(checkJobId);
result.setParentJobId("");
+ result.setAlgorithmTypeName("DATA_MATCH");
+ result.setSourceDatabaseType("H2");
return result;
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index d052d9609ae..96fc9eba5c8 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -177,7 +177,7 @@ class MigrationJobAPITest {
Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
Map<String, TableDataConsistencyCheckResult> checkResultMap =
jobAPI.buildPipelineDataConsistencyChecker(
- jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new
ConsistencyCheckJobItemProgressContext(jobId.get(), 0)).check("FIXTURE", null);
+ jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new
ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE",
null);
assertThat(checkResultMap.size(), is(1));
String checkKey = "t_order";
assertTrue(checkResultMap.get(checkKey).getCountCheckResult().isMatched());
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index f4e849174e1..3862d8c061b 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -72,7 +72,7 @@ class MigrationDataConsistencyCheckerTest {
}
private ConsistencyCheckJobItemProgressContext
createConsistencyCheckJobItemProgressContext() {
- return new ConsistencyCheckJobItemProgressContext("", 0);
+ return new ConsistencyCheckJobItemProgressContext("", 0, "H2");
}
private MigrationJobConfiguration createJobConfiguration() throws
SQLException {