This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bc2d9b63428 Revise consistency check job code; Migration job drop
cascade check jobs (#21273)
bc2d9b63428 is described below
commit bc2d9b634288205b7602fbe00d66161c8a585ffe
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Sep 30 11:28:25 2022 +0800
Revise consistency check job code; Migration job drop cascade check jobs
(#21273)
* Add log
* Revise consistency check job code
* Revise consistency check job code
* Migration job drop cascade check jobs
* Update sequence range
* Unit test
---
.../pipeline/api/ConsistencyCheckJobPublicAPI.java | 2 +-
.../job/ConsistencyCheckJobConfiguration.java | 2 +-
.../yaml/YamlConsistencyCheckJobConfiguration.java | 2 +-
...amlConsistencyCheckJobConfigurationSwapper.java | 4 +--
.../job/progress/ConsistencyCheckJobProgress.java | 4 ++-
.../dumper/IncrementalDumperCreatorFactory.java | 14 +++++++-
.../pipeline/core/api/GovernanceRepositoryAPI.java | 16 ++++++---
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 32 ++++++++++-------
...mpletedConsistencyCheckJobExistsException.java} | 8 ++---
.../yaml/YamlConsistencyCheckJobProgress.java | 2 +-
.../metadata/generator/PipelineDDLGenerator.java | 4 ++-
.../core/metadata/node/PipelineMetaDataNode.java | 4 +--
.../core/prepare/InventoryTaskSplitter.java | 2 ++
.../core/prepare/PipelineJobPreparerUtils.java | 24 ++++++++-----
.../data/pipeline/core/task/IncrementalTask.java | 2 +-
...tencyCheckChangedJobConfigurationProcessor.java | 2 +-
.../consistencycheck/ConsistencyCheckJob.java | 3 +-
.../consistencycheck/ConsistencyCheckJobAPI.java | 1 -
.../ConsistencyCheckJobAPIImpl.java | 41 ++++++++++++----------
.../consistencycheck/ConsistencyCheckJobId.java | 20 ++++++-----
.../ConsistencyCheckJobItemContext.java | 2 +-
.../pipeline/scenario/migration/MigrationJob.java | 2 ++
.../scenario/migration/MigrationJobAPIImpl.java | 19 ++++++++++
.../scenario/migration/MigrationJobPreparer.java | 11 +++---
.../api/impl/ConsistencyCheckJobAPIImplTest.java | 15 ++++----
25 files changed, 152 insertions(+), 86 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index 5e226ce1124..008f0002705 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -31,7 +31,7 @@ import java.util.Map;
public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI,
RequiredSPI {
/**
- * Create job migration config and start.
+ * Create consistency check configuration and start job.
*
* @param parameter create consistency check job parameter
* @return job id
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
index 8576a374cae..b80eab65ee1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
@@ -39,7 +39,7 @@ public final class ConsistencyCheckJobConfiguration
implements PipelineJobConfig
private final String algorithmTypeName;
- private final Properties algorithmProperties;
+ private final Properties algorithmProps;
@Override
public String getSourceDatabaseType() {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
index 53ec03e9c42..69ace660ef1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
@@ -39,7 +39,7 @@ public final class YamlConsistencyCheckJobConfiguration
implements YamlPipelineJ
private String algorithmTypeName;
- private Properties algorithmProperties;
+ private Properties algorithmProps;
@Override
public String getTargetDatabaseName() {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
index 64ce12e1029..f945d8d0804 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
@@ -34,13 +34,13 @@ public final class
YamlConsistencyCheckJobConfigurationSwapper implements YamlCo
result.setJobId(data.getJobId());
result.setParentJobId(data.getParentJobId());
result.setAlgorithmTypeName(data.getAlgorithmTypeName());
- result.setAlgorithmProperties(data.getAlgorithmProperties());
+ result.setAlgorithmProps(data.getAlgorithmProps());
return result;
}
@Override
public ConsistencyCheckJobConfiguration swapToObject(final
YamlConsistencyCheckJobConfiguration yamlConfig) {
- return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(),
yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(),
yamlConfig.getAlgorithmProperties());
+ return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(),
yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(),
yamlConfig.getAlgorithmProps());
}
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
index dbb318489b4..22a16efa840 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
@@ -19,13 +19,15 @@ package
org.apache.shardingsphere.data.pipeline.api.job.progress;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
/**
- * Data check job item progress.
+ * Data consistency check job progress.
*/
@Getter
@Setter
+@ToString
public final class ConsistencyCheckJobProgress implements
PipelineJobItemProgress {
private JobStatus status = JobStatus.RUNNING;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
index f81e9fbcd61..1f39e23fd02 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
@@ -22,6 +22,8 @@ import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+import java.util.Optional;
+
/**
* Incremental dumper creator factory.
*/
@@ -33,7 +35,7 @@ public class IncrementalDumperCreatorFactory {
}
/**
- * Incremental dumper creator.
+ * Get incremental dumper creator.
*
* @param databaseType database type
* @return incremental dumper creator
@@ -41,4 +43,14 @@ public class IncrementalDumperCreatorFactory {
public static IncrementalDumperCreator getInstance(final String
databaseType) {
return
TypedSPIRegistry.getRegisteredService(IncrementalDumperCreator.class,
databaseType);
}
+
+ /**
+ * Find incremental dumper creator.
+ *
+ * @param databaseType database type
+ * @return incremental dumper creator optional
+ */
+ public static Optional<IncrementalDumperCreator> findInstance(final String
databaseType) {
+ return
TypedSPIRegistry.findRegisteredService(IncrementalDumperCreator.class,
databaseType);
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index edbc645a141..e7dc53f1050 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -66,7 +66,7 @@ public interface GovernanceRepositoryAPI {
Optional<String> getCheckLatestJobId(String jobId);
/**
- * Persist check latest result.
+ * Persist check latest job id.
*
* @param jobId job id
* @param checkJobId check job id
@@ -83,13 +83,21 @@ public interface GovernanceRepositoryAPI {
Map<String, DataConsistencyCheckResult> getCheckJobResult(String jobId,
String checkJobId);
/**
- * Persist check latest detailed result.
+ * Persist check job result.
*
* @param jobId job id
* @param checkJobId check job id
- * @param dataConsistencyCheckResult check result
+ * @param checkResultMap check result map
*/
- void persistCheckJobResult(String jobId, String checkJobId, Map<String,
DataConsistencyCheckResult> dataConsistencyCheckResult);
+ void persistCheckJobResult(String jobId, String checkJobId, Map<String,
DataConsistencyCheckResult> checkResultMap);
+
+ /**
+ * Delete check job result.
+ *
+ * @param jobId job id
+ * @param checkJobId check job id
+ */
+ void deleteCheckJobResult(String jobId, String checkJobId);
/**
* List check job ids.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 3baf3db9f5f..329915e7485 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -79,30 +79,36 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public Map<String, DataConsistencyCheckResult> getCheckJobResult(final
String jobId, final String checkJobId) {
Map<String, DataConsistencyCheckResult> result = new HashMap<>();
- String checkJobText =
repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
- if (StringUtils.isBlank(checkJobText)) {
+ String yamlCheckResultMapText =
repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
+ if (StringUtils.isBlank(yamlCheckResultMapText)) {
return Collections.emptyMap();
}
- Map<String, String> checkJobConfigMap =
YamlEngine.unmarshal(checkJobText, Map.class, true);
- for (Entry<String, String> entry : checkJobConfigMap.entrySet()) {
+ Map<String, String> yamlCheckResultMap =
YamlEngine.unmarshal(yamlCheckResultMapText, Map.class, true);
+ for (Entry<String, String> entry : yamlCheckResultMap.entrySet()) {
result.put(entry.getKey(),
YamlDataConsistencyCheckResultSwapper.swapToObject(entry.getValue()));
}
return result;
}
@Override
- public void persistCheckJobResult(final String jobId, final String
checkJobId, final Map<String, DataConsistencyCheckResult>
dataConsistencyCheckResult) {
- if (null == dataConsistencyCheckResult) {
- log.warn("data consistency check is null, jobId {}, checkJobId
{}", jobId, checkJobId);
+ public void persistCheckJobResult(final String jobId, final String
checkJobId, final Map<String, DataConsistencyCheckResult> checkResultMap) {
+ if (null == checkResultMap) {
+ log.warn("checkResultMap is null, jobId {}, checkJobId {}", jobId,
checkJobId);
return;
}
- log.info("persist check job result '{}' for job {}", checkJobId,
jobId);
- Map<String, String> checkResultMap = new LinkedHashMap<>();
- for (Entry<String, DataConsistencyCheckResult> entry :
dataConsistencyCheckResult.entrySet()) {
- YamlDataConsistencyCheckResult checkResult = new
YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
- checkResultMap.put(entry.getKey(),
YamlEngine.marshal(checkResult));
+ log.info("persist check job result for job {}", checkJobId);
+ Map<String, String> yamlCheckResultMap = new LinkedHashMap<>();
+ for (Entry<String, DataConsistencyCheckResult> entry :
checkResultMap.entrySet()) {
+ YamlDataConsistencyCheckResult yamlCheckResult = new
YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
+ yamlCheckResultMap.put(entry.getKey(),
YamlEngine.marshal(yamlCheckResult));
}
- repository.persist(PipelineMetaDataNode.getCheckJobResultPath(jobId,
checkJobId), YamlEngine.marshal(checkResultMap));
+ repository.persist(PipelineMetaDataNode.getCheckJobResultPath(jobId,
checkJobId), YamlEngine.marshal(yamlCheckResultMap));
+ }
+
+ @Override
+ public void deleteCheckJobResult(final String jobId, final String
checkJobId) {
+ log.info("deleteCheckJobResult, jobId={}, checkJobId={}", jobId,
checkJobId);
+ repository.delete(PipelineMetaDataNode.getCheckJobResultPath(jobId,
checkJobId));
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
similarity index 77%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
index dff0e6a1905..e5ab9d1c8db 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
@@ -21,13 +21,13 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
import
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * Pipeline job has already existed exception.
+ * Uncompleted consistency check job exists exception.
*/
-public final class PipelineJobHasAlreadyExistedException extends
PipelineSQLException {
+public final class UncompletedConsistencyCheckJobExistsException extends
PipelineSQLException {
private static final long serialVersionUID = 2854259384634892428L;
- public PipelineJobHasAlreadyExistedException(final String jobId) {
- super(XOpenSQLState.GENERAL_ERROR, 86, "Job `%s` has already existed",
jobId);
+ public UncompletedConsistencyCheckJobExistsException(final String jobId) {
+ super(XOpenSQLState.GENERAL_ERROR, 86, "Uncompelted consistency check
job `%s` exists", jobId);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
index 05dd63bb70c..f0baaf647c8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
@@ -21,7 +21,7 @@ import lombok.Data;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Yaml data check job progress.
+ * Yaml data consistency check job progress.
*/
@Data
public final class YamlConsistencyCheckJobProgress implements
YamlConfiguration {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index 1df7d29a532..a2785126509 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -72,12 +72,14 @@ public final class PipelineDDLGenerator {
*/
public String generateLogicDDL(final DatabaseType databaseType, final
DataSource sourceDataSource,
final String schemaName, final String
sourceTableName, final String targetTableName, final
ShardingSphereSQLParserEngine parserEngine) throws SQLException {
- log.info("generateLogicDDLSQL, databaseType={}, schemaName={},
sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName,
sourceTableName, targetTableName);
+ log.info("generateLogicDDL, databaseType={}, schemaName={},
sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName,
sourceTableName, targetTableName);
+ long startTimeMillis = System.currentTimeMillis();
StringBuilder result = new StringBuilder();
for (String each :
CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource,
schemaName, sourceTableName)) {
Optional<String> queryContext = decorate(databaseType,
sourceDataSource, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(ddlSQL ->
result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
}
+ log.info("generateLogicDDL cost {} ms", System.currentTimeMillis() -
startTimeMillis);
return result.toString();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 81a35c9f7e5..190bf24e353 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -118,7 +118,7 @@ public final class PipelineMetaDataNode {
}
/**
- * Get check latest detailed result path.
+ * Get check latest job id path.
*
* @param jobId job id
* @return check latest job id path
@@ -128,7 +128,7 @@ public final class PipelineMetaDataNode {
}
/**
- * Get check latest result path.
+ * Get check job result path.
*
* @param jobId job id
* @param checkJobId check job id
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index fd8354ab791..f14e56d3e71 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -73,11 +73,13 @@ public final class InventoryTaskSplitter {
*/
public List<InventoryTask> splitInventoryData(final
InventoryIncrementalJobItemContext jobItemContext) {
List<InventoryTask> result = new LinkedList<>();
+ long startTimeMillis = System.currentTimeMillis();
PipelineChannelCreator pipelineChannelCreator =
jobItemContext.getJobProcessContext().getPipelineChannelCreator();
for (InventoryDumperConfiguration each :
splitDumperConfig(jobItemContext, dumperConfig)) {
result.add(new InventoryTask(each, importerConfig,
pipelineChannelCreator, jobItemContext.getDataSourceManager(),
sourceDataSource, jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getJobProcessContext().getInventoryDumperExecuteEngine(),
jobItemContext.getJobProcessContext().getInventoryImporterExecuteEngine(),
jobItemContext));
}
+ log.info("splitInventoryData cost {} ms", System.currentTimeMillis() -
startTimeMillis);
return result;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index d1ea79d9981..c58c67eaad7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -34,6 +34,7 @@ import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourc
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
+import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializerFactory;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -47,11 +48,8 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
import javax.sql.DataSource;
import java.sql.SQLException;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Optional;
-import java.util.Set;
/**
* Pipeline job preparer utils.
@@ -59,8 +57,6 @@ import java.util.Set;
@Slf4j
public final class PipelineJobPreparerUtils {
- private static final Set<String> INCREMENTAL_SUPPORTED_DATABASES = new
HashSet<>(Arrays.asList("MySQL", "PostgreSQL", "openGauss"));
-
/**
* Is incremental supported.
*
@@ -68,8 +64,7 @@ public final class PipelineJobPreparerUtils {
* @return true if supported, otherwise false
*/
public static boolean isIncrementalSupported(final String databaseType) {
- // TODO check by IncrementalDumperCreator SPI
- return INCREMENTAL_SUPPORTED_DATABASES.contains(databaseType);
+ return
IncrementalDumperCreatorFactory.findInstance(databaseType).isPresent();
}
/**
@@ -85,7 +80,9 @@ public final class PipelineJobPreparerUtils {
log.info("dataSourcePreparer null, ignore prepare target");
return;
}
+ long startTimeMillis = System.currentTimeMillis();
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+ log.info("prepareTargetSchema cost {} ms", System.currentTimeMillis()
- startTimeMillis);
}
/**
@@ -113,7 +110,9 @@ public final class PipelineJobPreparerUtils {
log.info("dataSourcePreparer null, ignore prepare target");
return;
}
+ long startTimeMillis = System.currentTimeMillis();
dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
+ log.info("prepareTargetTables cost {} ms", System.currentTimeMillis()
- startTimeMillis);
}
/**
@@ -135,7 +134,10 @@ public final class PipelineJobPreparerUtils {
}
String databaseType =
dumperConfig.getDataSourceConfig().getDatabaseType().getType();
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- return
PositionInitializerFactory.getInstance(databaseType).init(dataSource,
dumperConfig.getJobId());
+ long startTimeMillis = System.currentTimeMillis();
+ IngestPosition<?> result =
PositionInitializerFactory.getInstance(databaseType).init(dataSource,
dumperConfig.getJobId());
+ log.info("getIncrementalPosition cost {} ms",
System.currentTimeMillis() - startTimeMillis);
+ return result;
}
/**
@@ -149,10 +151,12 @@ public final class PipelineJobPreparerUtils {
log.info("source data source is empty, skip check");
return;
}
+ final long startTimeMillis = System.currentTimeMillis();
DataSourceChecker dataSourceChecker =
DataSourceCheckerFactory.getInstance(databaseType);
dataSourceChecker.checkConnection(dataSources);
dataSourceChecker.checkPrivilege(dataSources);
dataSourceChecker.checkVariable(dataSources);
+ log.info("checkSourceDataSource cost {} ms",
System.currentTimeMillis() - startTimeMillis);
}
/**
@@ -168,8 +172,10 @@ public final class PipelineJobPreparerUtils {
log.info("target data source is empty, skip check");
return;
}
+ long startTimeMillis = System.currentTimeMillis();
dataSourceChecker.checkConnection(targetDataSources);
dataSourceChecker.checkTargetTable(targetDataSources,
importerConfig.getTableNameSchemaNameMapping(),
importerConfig.getLogicTableNames());
+ log.info("checkTargetDataSource cost {} ms",
System.currentTimeMillis() - startTimeMillis);
}
/**
@@ -182,6 +188,7 @@ public final class PipelineJobPreparerUtils {
public static void destroyPosition(final String jobId, final
PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
PositionInitializer positionInitializer =
PositionInitializerFactory.getInstance(databaseType.getType());
+ final long startTimeMillis = System.currentTimeMillis();
log.info("Cleanup database type:{}, data source type:{}",
databaseType.getType(), pipelineDataSourceConfig.getType());
if (pipelineDataSourceConfig instanceof
ShardingSpherePipelineDataSourceConfiguration) {
ShardingSpherePipelineDataSourceConfiguration dataSourceConfig =
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
@@ -199,5 +206,6 @@ public final class PipelineJobPreparerUtils {
positionInitializer.destroy(dataSource, jobId);
}
}
+ log.info("destroyPosition cost {} ms", System.currentTimeMillis() -
startTimeMillis);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 03889561674..9e7363970a0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -46,7 +46,7 @@ import java.util.concurrent.CompletableFuture;
* Incremental task.
*/
@Slf4j
-@ToString(exclude = {"incrementalDumperExecuteEngine",
"importerExecuteEngine", "channel", "dumper", "importers", "taskProgress"})
+@ToString(exclude = {"incrementalExecuteEngine", "channel", "dumper",
"importers", "taskProgress"})
public final class IncrementalTask implements PipelineTask, AutoCloseable {
@Getter
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index ea606b8e296..8d71eae5a7c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import java.util.concurrent.CompletableFuture;
/**
- * Consistency check job configuration changed processor.
+ * Consistency check changed job configuration processor.
*/
@Slf4j
public final class ConsistencyCheckChangedJobConfigurationProcessor implements
PipelineChangedJobConfigurationProcessor {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 05dc69d7467..e00a4e6359c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -59,14 +59,13 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
jobAPI.persistJobItemProgress(jobItemContext);
String parentJobId = consistencyCheckJobConfig.getParentJobId();
log.info("execute consistency check, job id:{}, referred job id:{}",
checkJobId, parentJobId);
-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(parentJobId,
checkJobId);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobPublicAPI jobPublicAPI =
PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult =
Collections.emptyMap();
try {
dataConsistencyCheckResult =
StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())
? jobPublicAPI.dataConsistencyCheck(parentJobId)
- : jobPublicAPI.dataConsistencyCheck(parentJobId,
consistencyCheckJobConfig.getAlgorithmTypeName(),
consistencyCheckJobConfig.getAlgorithmProperties());
+ : jobPublicAPI.dataConsistencyCheck(parentJobId,
consistencyCheckJobConfig.getAlgorithmTypeName(),
consistencyCheckJobConfig.getAlgorithmProps());
status = JobStatus.FINISHED;
} catch (final SQLWrapperException ex) {
log.error("data consistency check failed", ex);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
index dca95957b5c..d8495582568 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
@@ -25,5 +25,4 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
* Consistency check job API.
*/
public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI,
PipelineJobAPI, PipelineJobItemAPI {
-
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 4ad73f6ee91..c2e88634d30 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -39,9 +39,9 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
-import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyFinishedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -63,28 +63,31 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
ConsistencyCheckJobId jobId = (ConsistencyCheckJobId) pipelineJobId;
- return jobId.getPipelineJobId() + jobId.getSequence();
+ return jobId.getParentJobId() + jobId.getSequence();
}
@Override
public String createJobAndStart(final CreateConsistencyCheckJobParameter
parameter) {
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- Optional<String> optional =
repositoryAPI.getCheckLatestJobId(parameter.getJobId());
- if (optional.isPresent()) {
- PipelineJobItemProgress progress =
getJobItemProgress(optional.get(), 0);
- if (null != progress && JobStatus.FINISHED !=
progress.getStatus()) {
- log.info("check job already existed and status isn't FINISHED,
status {}", progress.getStatus());
- throw new
PipelineJobHasAlreadyExistedException(optional.get());
+ String parentJobId = parameter.getJobId();
+ Optional<String> checkLatestJobId =
repositoryAPI.getCheckLatestJobId(parentJobId);
+ if (checkLatestJobId.isPresent()) {
+ PipelineJobItemProgress progress =
getJobItemProgress(checkLatestJobId.get(), 0);
+ if (null == progress || JobStatus.FINISHED !=
progress.getStatus()) {
+ log.info("check job already exists and status is not FINISHED,
progress={}", progress);
+ throw new
UncompletedConsistencyCheckJobExistsException(checkLatestJobId.get());
}
}
- int consistencyCheckVersionNew = optional.map(s ->
ConsistencyCheckJobId.getSequence(s) + 1).orElse(0);
+ int sequence = checkLatestJobId.map(s ->
ConsistencyCheckJobId.parseSequence(s) +
1).orElse(ConsistencyCheckJobId.MIN_SEQUENCE);
+ String result = marshalJobId(new ConsistencyCheckJobId(parentJobId,
sequence));
+ repositoryAPI.persistCheckLatestJobId(parentJobId, result);
+ repositoryAPI.deleteCheckJobResult(parentJobId, result);
+ dropJob(result);
YamlConsistencyCheckJobConfiguration yamlConfig = new
YamlConsistencyCheckJobConfiguration();
- ConsistencyCheckJobId checkJobId = new
ConsistencyCheckJobId(parameter.getJobId(), consistencyCheckVersionNew);
- String result = marshalJobId(checkJobId);
yamlConfig.setJobId(result);
- yamlConfig.setParentJobId(parameter.getJobId());
+ yamlConfig.setParentJobId(parentJobId);
yamlConfig.setAlgorithmTypeName(parameter.getAlgorithmTypeName());
- yamlConfig.setAlgorithmProperties(parameter.getAlgorithmProps());
+ yamlConfig.setAlgorithmProps(parameter.getAlgorithmProps());
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig);
start(jobConfig);
return result;
@@ -108,7 +111,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
}
@Override
- public PipelineJobItemProgress getJobItemProgress(final String jobId,
final int shardingItem) {
+ public ConsistencyCheckJobProgress getJobItemProgress(final String jobId,
final int shardingItem) {
String progress =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId,
shardingItem);
if (StringUtils.isBlank(progress)) {
return null;
@@ -121,7 +124,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- ConsistencyCheckJobProgress jobProgress =
(ConsistencyCheckJobProgress) getJobItemProgress(jobId, shardingItem);
+ ConsistencyCheckJobProgress jobProgress = getJobItemProgress(jobId,
shardingItem);
if (null == jobProgress) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={},
shardingItem={}", jobId, shardingItem);
return;
@@ -135,6 +138,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
log.info("Start disable check job {}", jobId);
PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
ShardingSpherePreconditions.checkState(null == jobProgress ||
JobStatus.FINISHED != jobProgress.getStatus(), () -> new
PipelineJobHasAlreadyFinishedException(jobId));
+ super.startDisabledJob(jobId);
}
@Override
@@ -170,21 +174,22 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration
yamlJobConfig) {
+ throw new UnsupportedOperationException();
}
@Override
public PipelineTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public PipelineProcessContext buildPipelineProcessContext(final
PipelineJobConfiguration pipelineJobConfig) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
protected PipelineJobInfo getJobInfo(final String jobId) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 9e8d3b2b87b..727d83dedb6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -32,26 +32,28 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
public static final String CURRENT_VERSION = "01";
- private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
+ public static final int MIN_SEQUENCE = 1;
- private final String pipelineJobId;
+ private static final int MAX_SEQUENCE = 3;
+
+ private final String parentJobId;
private final int sequence;
- public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final
int sequence) {
+ public ConsistencyCheckJobId(final @NonNull String parentJobId, final int
sequence) {
super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
- this.pipelineJobId = pipelineJobId;
- this.sequence = sequence > MAX_CONSISTENCY_CHECK_VERSION ? 0 :
sequence;
+ this.parentJobId = parentJobId;
+ this.sequence = sequence > MAX_SEQUENCE ? MIN_SEQUENCE : sequence;
}
/**
- * Get consistency check version.
+ * Parse consistency check sequence.
*
- * @param consistencyCheckJobId consistency check job id.
+ * @param checkJobId consistency check job id
* @return sequence
*/
- public static int getSequence(final @NonNull String consistencyCheckJobId)
{
- String versionString =
consistencyCheckJobId.substring(consistencyCheckJobId.length() - 1);
+ public static int parseSequence(final @NonNull String checkJobId) {
+ String versionString = checkJobId.substring(checkJobId.length() - 1);
return Integer.parseInt(versionString);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index b3471ac39b4..fb5841d9fde 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -54,6 +54,6 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
@Override
public PipelineProcessContext getJobProcessContext() {
- throw new UnsupportedOperationException("");
+ throw new UnsupportedOperationException();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 58ba577aa8d..67fd3c59616 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -88,7 +88,9 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
private void prepare(final MigrationJobItemContext jobItemContext) {
try {
+ long startTimeMillis = System.currentTimeMillis();
jobPreparer.prepare(jobItemContext);
+ log.info("prepare cost {} ms", System.currentTimeMillis() -
startTimeMillis);
// CHECKSTYLE:OFF
} catch (final SQLException | RuntimeException ex) {
// CHECKSTYLE:ON
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 9d3d1f91e2e..e88940af179 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -255,9 +255,25 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
@Override
public void rollback(final String jobId) throws SQLException {
log.info("Rollback job {}", jobId);
+ final long startTimeMillis = System.currentTimeMillis();
+ dropCheckJobs(jobId);
stop(jobId);
cleanTempTableOnRollback(jobId);
dropJob(jobId);
+ log.info("Rollback cost {} ms", System.currentTimeMillis() -
startTimeMillis);
+ }
+
+ private void dropCheckJobs(final String jobId) {
+ Collection<String> checkJobIds =
PipelineAPIFactory.getGovernanceRepositoryAPI().listCheckJobIds(jobId);
+ if (checkJobIds.isEmpty()) {
+ return;
+ }
+ log.info("dropCheckJobs start...");
+ long startTimeMillis = System.currentTimeMillis();
+ for (String each : checkJobIds) {
+ dropJob(each);
+ }
+ log.info("dropCheckJobs cost {} ms", System.currentTimeMillis() -
startTimeMillis);
}
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
@@ -281,8 +297,11 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
public void commit(final String jobId) {
checkModeConfig();
log.info("Commit job {}", jobId);
+ final long startTimeMillis = System.currentTimeMillis();
+ dropCheckJobs(jobId);
stop(jobId);
dropJob(jobId);
+ log.info("Commit cost {} ms", System.currentTimeMillis() -
startTimeMillis);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index bf698ad74e0..d7c397c7f4d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -74,7 +74,6 @@ public final class MigrationJobPreparer {
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
- // TODO check metadata
if
(PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType()))
{
initIncrementalTasks(jobItemContext);
if (jobItemContext.isStopping()) {
@@ -84,6 +83,8 @@ public final class MigrationJobPreparer {
}
}
initInventoryTasks(jobItemContext);
+ log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={},
incrementalTasks={}",
+ jobItemContext.getJobId(), jobItemContext.getShardingItem(),
jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
}
private void prepareAndCheckTargetWithLock(final MigrationJobItemContext
jobItemContext) throws SQLException {
@@ -94,8 +95,9 @@ public final class MigrationJobPreparer {
if (null == JOB_API.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem())) {
JOB_API.persistJobItemProgress(jobItemContext);
}
+ long startTimeMillis = System.currentTimeMillis();
if (lockContext.tryLock(lockDefinition, 180000)) {
- log.info("try lock success, jobId={}, shardingItem={}",
jobConfig.getJobId(), jobItemContext.getShardingItem());
+ log.info("try lock success, jobId={}, shardingItem={}, cost {}
ms", jobConfig.getJobId(), jobItemContext.getShardingItem(),
System.currentTimeMillis() - startTimeMillis);
try {
InventoryIncrementalJobItemProgress jobItemProgress =
JOB_API.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
boolean prepareFlag =
JobStatus.PREPARING.equals(jobItemProgress.getStatus()) ||
JobStatus.RUNNING.equals(jobItemProgress.getStatus())
@@ -111,7 +113,7 @@ public final class MigrationJobPreparer {
}
}
} finally {
- log.info("unlock, jobId={}, shardingItem={}",
jobConfig.getJobId(), jobItemContext.getShardingItem());
+ log.info("unlock, jobId={}, shardingItem={}, cost {} ms",
jobConfig.getJobId(), jobItemContext.getShardingItem(),
System.currentTimeMillis() - startTimeMillis);
lockContext.unlock(lockDefinition);
}
}
@@ -143,15 +145,12 @@ public final class MigrationJobPreparer {
}
private void initInventoryTasks(final MigrationJobItemContext
jobItemContext) {
- log.info("initInventoryTasks, start...");
InventoryDumperConfiguration inventoryDumperConfig = new
InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
PipelineColumnMetaData uniqueKeyColumn =
jobItemContext.getJobConfig().getUniqueKeyColumn();
inventoryDumperConfig.setUniqueKey(uniqueKeyColumn.getName());
inventoryDumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
InventoryTaskSplitter inventoryTaskSplitter = new
InventoryTaskSplitter(jobItemContext.getSourceDataSource(),
inventoryDumperConfig, jobItemContext.getTaskConfig().getImporterConfig());
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
- log.info("initInventoryTasks, jobId={}, shardingItem={},
inventoryTasks={}, incrementalTasks={}",
- jobItemContext.getJobId(), jobItemContext.getShardingItem(),
jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
}
private void initIncrementalTasks(final MigrationJobItemContext
jobItemContext) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
index 5545cc94dfe..0d0a25a722f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
@@ -61,11 +61,12 @@ public final class ConsistencyCheckJobAPIImplTest {
CreateConsistencyCheckJobParameter parameter = new
CreateConsistencyCheckJobParameter(migrationJobId, null, null);
String checkJobId = checkJobAPI.createJobAndStart(parameter);
ConsistencyCheckJobConfiguration jobConfig =
(ConsistencyCheckJobConfiguration) checkJobAPI.getJobConfiguration(checkJobId);
- String expectCheckJobId = "j0201j0101test0";
+ int expectedSequence = ConsistencyCheckJobId.MIN_SEQUENCE;
+ String expectCheckJobId = "j0201" + migrationJobId + expectedSequence;
assertThat(jobConfig.getJobId(), is(expectCheckJobId));
assertNull(jobConfig.getAlgorithmTypeName());
- int consistencyCheckVersion =
ConsistencyCheckJobId.getSequence(expectCheckJobId);
- assertThat(consistencyCheckVersion, is(0));
+ int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
+ assertThat(sequence, is(expectedSequence));
}
@Test
@@ -75,11 +76,11 @@ public final class ConsistencyCheckJobAPIImplTest {
CreateConsistencyCheckJobParameter parameter = new
CreateConsistencyCheckJobParameter(jobId.get(), null, null);
String checkJobId = checkJobAPI.createJobAndStart(parameter);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(jobId.get(),
checkJobId);
- Map<String, DataConsistencyCheckResult> expectResult =
Collections.singletonMap("t_order", new DataConsistencyCheckResult(new
DataConsistencyCountCheckResult(1, 1),
+ Map<String, DataConsistencyCheckResult> expectedCheckResult =
Collections.singletonMap("t_order", new DataConsistencyCheckResult(new
DataConsistencyCountCheckResult(1, 1),
new DataConsistencyContentCheckResult(true)));
-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(jobId.get(),
checkJobId, expectResult);
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(jobId.get(),
checkJobId, expectedCheckResult);
Map<String, DataConsistencyCheckResult> actualCheckResult =
checkJobAPI.getLatestDataConsistencyCheckResult(jobId.get());
- assertThat(actualCheckResult.size(), is(expectResult.size()));
-
assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(),
is(expectResult.get("t_order").getContentCheckResult().isMatched()));
+ assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
+
assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(),
is(expectedCheckResult.get("t_order").getContentCheckResult().isMatched()));
}
}