This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fbf06128641 Improve pipeline job failure status persistence and usage
(#24745)
fbf06128641 is described below
commit fbf06128641ae784bf29fe6934ae04361da08a45
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Mar 22 19:49:50 2023 +0800
Improve pipeline job failure status persistence and usage (#24745)
* In InventoryTaskSplitter, return the inventory positions from init progress only if they are non-empty
* Remove FAILURE job item progress persistence
* Only check the target data source (prepareAndCheckTarget) when initProgress is null in
MigrationJobPreparer
---
.../apache/shardingsphere/data/pipeline/api/job/JobStatus.java | 1 +
.../data/pipeline/core/job/AbstractPipelineJob.java | 3 ---
.../data/pipeline/core/prepare/InventoryTaskSplitter.java | 8 +++++---
.../data/pipeline/core/task/InventoryIncrementalTasksRunner.java | 2 --
.../consistencycheck/task/ConsistencyCheckTasksRunner.java | 2 --
.../pipeline/scenario/migration/prepare/MigrationJobPreparer.java | 4 ++--
.../test/e2e/data/pipeline/cases/PipelineContainerComposer.java | 3 ---
7 files changed, 8 insertions(+), 15 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index d31c2505715..d7f2cd93e3e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -73,5 +73,6 @@ public enum JobStatus {
*/
EXECUTE_INCREMENTAL_TASK_FAILURE(false);
+ // TODO Remove unused field; Remove unused enum
private final boolean running;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 3b146248b76..0e636e73067 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -22,7 +22,6 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
@@ -71,8 +70,6 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
// CHECKSTYLE:ON
String jobId = jobItemContext.getJobId();
log.error("job prepare failed, {}-{}", jobId,
jobItemContext.getShardingItem(), ex);
- jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
- jobAPI.persistJobItemProgress(jobItemContext);
jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), ex);
jobAPI.stop(jobId);
if (ex instanceof RuntimeException) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index ecd85d92b3d..2b750b64ed5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -30,7 +30,6 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.position.NoUniqueKeyPo
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.UnsupportedKeyPosition;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -144,9 +143,12 @@ public final class InventoryTaskSplitter {
private Collection<IngestPosition<?>> getInventoryPositions(final
InventoryIncrementalJobItemContext jobItemContext, final
InventoryDumperConfiguration dumperConfig,
final
DataSource dataSource) {
InventoryIncrementalJobItemProgress initProgress =
jobItemContext.getInitProgress();
- if (null != initProgress && initProgress.getStatus() !=
JobStatus.PREPARING_FAILURE) {
+ if (null != initProgress) {
// Do NOT filter FinishedPosition here, since whole inventory
tasks are required in job progress when persisting to register center.
- return
initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
+ Collection<IngestPosition<?>> result =
initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
+ if (!result.isEmpty()) {
+ return result;
+ }
}
if (!dumperConfig.hasUniqueKey()) {
return getPositionWithoutUniqueKey(jobItemContext, dataSource,
dumperConfig);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index a26d40ab923..0834db969a9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -141,7 +141,6 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
@Override
public void onFailure(final Throwable throwable) {
log.error("onFailure, inventory task execute failed.", throwable);
-
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
String jobId = jobItemContext.getJobId();
jobAPI.persistJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
jobAPI.stop(jobId);
@@ -158,7 +157,6 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
@Override
public void onFailure(final Throwable throwable) {
log.error("onFailure, incremental task execute failed.",
throwable);
-
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
String jobId = jobItemContext.getJobId();
jobAPI.persistJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
jobAPI.stop(jobId);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 91a629c9ec6..0850dbbbb2d 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -141,8 +141,6 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
}
log.info("onFailure, check job id: {}, parent job id: {}",
checkJobId, parentJobId, throwable);
checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
- jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
- checkJobAPI.persistJobItemProgress(jobItemContext);
checkJobAPI.stop(checkJobId);
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 7b22e4b3e61..b4b3b0417bb 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -115,7 +115,7 @@ public final class MigrationJobPreparer {
lockContext.unlock(lockDefinition);
}
} else {
- log.warn("jobId={}, shardingItem={} try lock failed",
jobConfig.getJobId(), jobItemContext.getShardingItem());
+ log.warn("try lock failed, jobId={}, shardingItem={}",
jobConfig.getJobId(), jobItemContext.getShardingItem());
}
}
@@ -124,7 +124,7 @@ public final class MigrationJobPreparer {
prepareTarget(jobItemContext);
}
InventoryIncrementalJobItemProgress initProgress =
jobItemContext.getInitProgress();
- if (null == initProgress || initProgress.getStatus() ==
JobStatus.PREPARING_FAILURE) {
+ if (null == initProgress) {
PipelineDataSourceWrapper targetDataSource =
((PipelineDataSourceManager)
jobItemContext.getImporterConnector().getConnector())
.getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
PipelineJobPreparerUtils.checkTargetDataSource(jobItemContext.getJobConfig().getTargetDatabaseType(),
jobItemContext.getTaskConfig().getImporterConfig(),
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 029ea4e5d65..5a3a9cf812d 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -450,9 +450,6 @@ public final class PipelineContainerComposer implements
AutoCloseable {
String incrementalIdleSeconds =
each.get("incremental_idle_seconds").toString();
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ?
0 : Integer.parseInt(incrementalIdleSeconds));
}
-
assertFalse(actualStatus.contains(JobStatus.PREPARING_FAILURE.name()), "status
is JobStatus.PREPARING_FAILURE");
-
assertFalse(actualStatus.contains(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()),
"status is JobStatus.EXECUTE_INVENTORY_TASK_FAILURE");
-
assertFalse(actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()),
"status is JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE");
if (Collections.min(incrementalIdleSecondsList) <= 5) {
Thread.sleep(3000L);
continue;