This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e51d73d0b2c Improve cleaning on pipeline job stopping (#26839)
e51d73d0b2c is described below
commit e51d73d0b2cf5c6fc704f7c782c1d43e1589d891
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Jul 8 18:10:51 2023 +0800
Improve cleaning on pipeline job stopping (#26839)
---
.../data/pipeline/core/job/AbstractPipelineJob.java | 9 ++++-----
.../pipeline/core/job/AbstractSimplePipelineJob.java | 15 ---------------
.../data/pipeline/cdc/core/job/CDCJob.java | 16 +---------------
.../data/pipeline/cases/PipelineContainerComposer.java | 2 +-
4 files changed, 6 insertions(+), 36 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index a74f5aa1bf8..b071052777d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.common.util.CloseUtils;
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -37,7 +38,6 @@ import
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -121,10 +121,6 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
return new ArrayList<>(tasksRunnerMap.keySet());
}
- protected Collection<PipelineTasksRunner> getTasksRunners() {
- return Collections.unmodifiableCollection(tasksRunnerMap.values());
- }
-
protected boolean addTasksRunner(final int shardingItem, final
PipelineTasksRunner tasksRunner) {
if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
log.warn("shardingItem {} tasks runner exists, ignore",
shardingItem);
@@ -178,6 +174,9 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private void innerClean() {
PipelineJobProgressPersistService.remove(jobId);
+ for (PipelineTasksRunner each : tasksRunnerMap.values()) {
+
CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
+ }
}
protected abstract void doClean();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 2aba307d8c3..7480585c916 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.common.util.CloseUtils;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -46,14 +45,6 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
@Override
public void execute(final ShardingContext shardingContext) {
- try {
- execute0(shardingContext);
- } finally {
- clean();
- }
- }
-
- private void execute0(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
@@ -71,10 +62,4 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
tasksRunner.start();
}
-
- private void clean() {
- for (PipelineTasksRunner each : getTasksRunners()) {
-
CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
- }
- }
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 1d7ee243f9a..c62bdfe7032 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -23,8 +23,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
@@ -73,14 +73,6 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
- try {
- execute0(shardingContext);
- } finally {
- clean();
- }
- }
-
- private void execute0(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
log.info("Execute job {}", jobId);
CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
@@ -108,12 +100,6 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
executeIncrementalTasks(jobItemContexts);
}
- private void clean() {
- for (PipelineTasksRunner each : getTasksRunners()) {
-
CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
- }
- }
-
private CDCJobItemContext buildPipelineJobItemContext(final
CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<InventoryIncrementalJobItemProgress> initProgress =
jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingItem);
CDCProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index dcd6bdcfc40..6c50af1aaa7 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -458,7 +458,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
Set<String> actualStatus = new HashSet<>();
Collection<Integer> incrementalIdleSecondsList = new
LinkedList<>();
for (Map<String, Object> each : listJobStatus) {
-
assertTrue(Strings.isNullOrEmpty(each.get("error_message").toString()),
"error_message is not null");
+ assertTrue(Strings.isNullOrEmpty((String)
each.get("error_message")), "error_message: `" + each.get("error_message") +
"`");
actualStatus.add(each.get("status").toString());
String incrementalIdleSeconds =
each.get("incremental_idle_seconds").toString();
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ?
0 : Integer.parseInt(incrementalIdleSeconds));