This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2ecb1fa497f Remove useless PipelineTask.close() (#33392)
2ecb1fa497f is described below
commit 2ecb1fa497f0bf17ad8420cda1b33de877ce732d
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Oct 24 18:07:34 2024 +0800
Remove useless PipelineTask.close() (#33392)
* Remove useless PipelineTask.close()
---
.../data/pipeline/core/task/IncrementalTask.java | 4 ----
.../shardingsphere/data/pipeline/core/task/InventoryTask.java | 4 ----
.../shardingsphere/data/pipeline/core/task/PipelineTask.java | 8 +-------
.../data/pipeline/core/task/TaskExecuteCallback.java | 2 --
.../pipeline/core/task/runner/TransmissionTasksRunner.java | 11 ++---------
.../data/pipeline/cdc/core/task/CDCIncrementalTask.java | 4 ----
.../data/pipeline/cdc/core/task/CDCInventoryTask.java | 4 ----
.../data/pipeline/cdc/core/task/CDCTasksRunner.java | 5 +----
.../data/pipeline/core/task/InventoryTaskTest.java | 1 -
9 files changed, 4 insertions(+), 39 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 1fe7258f6a6..8635391b28b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -66,8 +66,4 @@ public final class IncrementalTask implements PipelineTask {
each.stop();
}
}
-
- @Override
- public void close() {
- }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 613e47b69e4..8f177233abe 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -71,8 +71,4 @@ public final class InventoryTask implements PipelineTask {
public InventoryTaskProgress getTaskProgress() {
return new InventoryTaskProgress(position.get());
}
-
- @Override
- public void close() {
- }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index 8438044489d..1a1d09259db 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,14 +19,13 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.core.task.progress.TaskProgress;
-import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
/**
* Pipeline task interface.
*/
-public interface PipelineTask extends Closeable {
+public interface PipelineTask {
/**
* Start task.
@@ -53,9 +52,4 @@ public interface PipelineTask extends Closeable {
* @return task progress
*/
TaskProgress getTaskProgress();
-
- /**
- * Close.
- */
- void close();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
index 280c7345f10..0de96b8b098 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.io.IOUtils;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
/**
@@ -39,6 +38,5 @@ public final class TaskExecuteCallback implements
ExecuteCallback {
public void onFailure(final Throwable throwable) {
log.error("onFailure, task ID={}", task.getTaskId(), throwable);
task.stop();
- IOUtils.closeQuietly(task);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index 062946f7e60..48ba7523af3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -35,7 +35,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.util.Collection;
import java.util.LinkedList;
@@ -120,14 +119,8 @@ public final class TransmissionTasksRunner implements
PipelineTasksRunner {
@Override
public void stop() {
jobItemContext.setStopping(true);
- for (PipelineTask each : inventoryTasks) {
- each.stop();
- QuietlyCloser.close(each);
- }
- for (PipelineTask each : incrementalTasks) {
- each.stop();
- QuietlyCloser.close(each);
- }
+ inventoryTasks.forEach(PipelineTask::stop);
+ incrementalTasks.forEach(PipelineTask::stop);
}
private final class InventoryTaskExecuteCallback implements
ExecuteCallback {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
index f3700cbb2de..0d18ccf4b60 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
@@ -70,8 +70,4 @@ public final class CDCIncrementalTask implements PipelineTask
{
importer.stop();
}
}
-
- @Override
- public void close() {
- }
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
index 2226ae09044..6e3eb9066ea 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
@@ -82,8 +82,4 @@ public final class CDCInventoryTask implements PipelineTask {
public InventoryTaskProgress getTaskProgress() {
return new InventoryTaskProgress(position.get());
}
-
- @Override
- public void close() {
- }
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
index 0e8f7d52a93..015d3e6666a 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
@@ -17,9 +17,8 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.task;
-import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
+import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -56,11 +55,9 @@ public final class CDCTasksRunner implements
PipelineTasksRunner {
jobItemContext.setStopping(true);
for (PipelineTask each : inventoryTasks) {
each.stop();
- QuietlyCloser.close(each);
}
for (PipelineTask each : incrementalTasks) {
each.stop();
- QuietlyCloser.close(each);
}
}
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index b7bfbf6942f..3dcf4159bcd 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -78,7 +78,6 @@ class InventoryTaskTest {
PipelineContextUtils.getExecuteEngine(),
PipelineContextUtils.getExecuteEngine(), mock(Dumper.class),
mock(Importer.class), position);
CompletableFuture.allOf(inventoryTask.start().toArray(new
CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
assertThat(inventoryTask.getTaskProgress().getPosition(),
instanceOf(IntegerPrimaryKeyIngestPosition.class));
- inventoryTask.close();
}
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {