This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3b19fe744e4 Break success process if job is stopping (#28476)
3b19fe744e4 is described below
commit 3b19fe744e41698f4a5fd834bb13ca9ded82ac4e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Sep 20 19:57:19 2023 +0800
Break success process if job is stopping (#28476)
* Break success process if job is stopping
* Add run_count in job props
* Update IncrementalExecuteCallback
* Update unit test
* Remove log
---
.../pipeline/core/importer/sink/PipelineDataSourceSink.java | 7 +------
.../core/job/service/impl/AbstractPipelineJobAPIImpl.java | 10 +++++++++-
.../core/task/runner/InventoryIncrementalTasksRunner.java | 4 ++++
.../shardingsphere/data/pipeline/cdc/core/job/CDCJob.java | 4 ++++
.../consistencycheck/task/ConsistencyCheckTasksRunner.java | 11 +++++++++--
.../consistency/MigrationDataConsistencyCheckerTest.java | 6 +++---
6 files changed, 30 insertions(+), 12 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index b193b5324f5..ca3a5165184 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -43,13 +43,11 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* Pipeline data source sink.
@@ -274,10 +272,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
}
preparedStatement.addBatch();
}
- int[] counts = preparedStatement.executeBatch();
- if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
- log.warn("batchDelete failed, counts={}, sql={},
dataRecords={}", Arrays.toString(counts), deleteSQL, dataRecords);
- }
+ preparedStatement.executeBatch();
} finally {
batchDeleteStatement.set(null);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index bb32906c9d0..4978dbd6192 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -107,6 +107,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
String createTimeFormat =
LocalDateTime.now().format(DATE_TIME_FORMATTER);
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
+ result.getProps().setProperty("run_count", "1");
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
return result;
}
@@ -125,6 +126,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
jobConfigPOJO.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
jobConfigPOJO.getProps().remove("stop_time");
jobConfigPOJO.getProps().remove("stop_time_millis");
+ jobConfigPOJO.getProps().setProperty("run_count",
String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count",
"0")) + 1));
String barrierEnablePath =
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
pipelineDistributedBarrier.register(barrierEnablePath,
jobConfigPOJO.getShardingTotalCount());
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
@@ -154,7 +156,13 @@ public abstract class AbstractPipelineJobAPIImpl
implements PipelineJobAPI {
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
}
- protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String
jobId) {
+ /**
+ * Get ElasticJob configuration POJO.
+ *
+ * @param jobId job id
+ * @return ElasticJob configuration POJO
+ */
+ public final JobConfigurationPOJO getElasticJobConfigPOJO(final String
jobId) {
JobConfigurationPOJO result =
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(result, () -> new
PipelineJobNotFoundException(jobId));
return result;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 755a049cf12..08415b7ecba 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -149,6 +149,10 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
@Override
public void onSuccess() {
+ if (jobItemContext.isStopping()) {
+ log.info("Inventory task onSuccess, stopping true, ignore");
+ return;
+ }
inventorySuccessCallback();
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 697fc3b9787..5f4edfc4bda 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -190,6 +190,10 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
@Override
public void onSuccess() {
+ if (jobItemContext.isStopping()) {
+ log.info("onSuccess, stopping true, ignore");
+ return;
+ }
log.info("onSuccess, all {} tasks finished.", identifier);
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 4a5a3fe622c..a55671d4a18 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -101,8 +101,11 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
parentJobConfig,
jobAPI.buildPipelineProcessContext(parentJobConfig),
jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Map<String, TableDataConsistencyCheckResult> checkResultMap =
checker.check(checkJobConfig.getAlgorithmTypeName(),
checkJobConfig.getAlgorithmProps());
- log.info("job {} with check algorithm '{}' data consistency
checker result: {}", parentJobId, checkJobConfig.getAlgorithmTypeName(),
checkResultMap);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId,
checkJobId, checkResultMap);
+ log.info("job {} with check algorithm '{}' data consistency
checker result: {}, stopping: {}",
+ parentJobId, checkJobConfig.getAlgorithmTypeName(),
checkResultMap, jobItemContext.isStopping());
+ if (!jobItemContext.isStopping()) {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId,
checkJobId, checkResultMap);
+ }
} finally {
jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
}
@@ -121,6 +124,10 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
@Override
public void onSuccess() {
+ if (jobItemContext.isStopping()) {
+ log.info("onSuccess, stopping true, ignore");
+ return;
+ }
log.info("onSuccess, check job id: {}, parent job id: {}",
checkJobId, parentJobId);
jobItemContext.setStatus(JobStatus.FINISHED);
checkJobAPI.persistJobItemProgress(jobItemContext);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 3862d8c061b..ccafd074e0f 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -64,15 +64,15 @@ class MigrationDataConsistencyCheckerTest {
governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(),
0, "");
Map<String, TableDataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null),
-
createConsistencyCheckJobItemProgressContext()).check("FIXTURE", null);
+
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
null);
String checkKey = "t_order";
assertTrue(actual.get(checkKey).getCountCheckResult().isMatched());
assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(),
is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount()));
assertTrue(actual.get(checkKey).getContentCheckResult().isMatched());
}
- private ConsistencyCheckJobItemProgressContext
createConsistencyCheckJobItemProgressContext() {
- return new ConsistencyCheckJobItemProgressContext("", 0, "H2");
+ private ConsistencyCheckJobItemProgressContext
createConsistencyCheckJobItemProgressContext(final String jobId) {
+ return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2");
}
private MigrationJobConfiguration createJobConfiguration() throws
SQLException {