This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ecb1fa497f Remove useless PipelineTask.close() (#33392)
2ecb1fa497f is described below

commit 2ecb1fa497f0bf17ad8420cda1b33de877ce732d
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Oct 24 18:07:34 2024 +0800

    Remove useless PipelineTask.close() (#33392)
    
    * Remove useless PipelineTask.close()
---
 .../data/pipeline/core/task/IncrementalTask.java              |  4 ----
 .../shardingsphere/data/pipeline/core/task/InventoryTask.java |  4 ----
 .../shardingsphere/data/pipeline/core/task/PipelineTask.java  |  8 +-------
 .../data/pipeline/core/task/TaskExecuteCallback.java          |  2 --
 .../pipeline/core/task/runner/TransmissionTasksRunner.java    | 11 ++---------
 .../data/pipeline/cdc/core/task/CDCIncrementalTask.java       |  4 ----
 .../data/pipeline/cdc/core/task/CDCInventoryTask.java         |  4 ----
 .../data/pipeline/cdc/core/task/CDCTasksRunner.java           |  5 +----
 .../data/pipeline/core/task/InventoryTaskTest.java            |  1 -
 9 files changed, 4 insertions(+), 39 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 1fe7258f6a6..8635391b28b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -66,8 +66,4 @@ public final class IncrementalTask implements PipelineTask {
             each.stop();
         }
     }
-    
-    @Override
-    public void close() {
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 613e47b69e4..8f177233abe 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -71,8 +71,4 @@ public final class InventoryTask implements PipelineTask {
     public InventoryTaskProgress getTaskProgress() {
         return new InventoryTaskProgress(position.get());
     }
-    
-    @Override
-    public void close() {
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index 8438044489d..1a1d09259db 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,14 +19,13 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 
 import org.apache.shardingsphere.data.pipeline.core.task.progress.TaskProgress;
 
-import java.io.Closeable;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Pipeline task interface.
  */
-public interface PipelineTask extends Closeable {
+public interface PipelineTask {
     
     /**
      * Start task.
@@ -53,9 +52,4 @@ public interface PipelineTask extends Closeable {
      * @return task progress
      */
     TaskProgress getTaskProgress();
-    
-    /**
-     * Close.
-     */
-    void close();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
index 280c7345f10..0de96b8b098 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.io.IOUtils;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 
 /**
@@ -39,6 +38,5 @@ public final class TaskExecuteCallback implements 
ExecuteCallback {
     public void onFailure(final Throwable throwable) {
         log.error("onFailure, task ID={}", task.getTaskId(), throwable);
         task.stop();
-        IOUtils.closeQuietly(task);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index 062946f7e60..48ba7523af3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -35,7 +35,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -120,14 +119,8 @@ public final class TransmissionTasksRunner implements 
PipelineTasksRunner {
     @Override
     public void stop() {
         jobItemContext.setStopping(true);
-        for (PipelineTask each : inventoryTasks) {
-            each.stop();
-            QuietlyCloser.close(each);
-        }
-        for (PipelineTask each : incrementalTasks) {
-            each.stop();
-            QuietlyCloser.close(each);
-        }
+        inventoryTasks.forEach(PipelineTask::stop);
+        incrementalTasks.forEach(PipelineTask::stop);
     }
     
     private final class InventoryTaskExecuteCallback implements 
ExecuteCallback {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
index f3700cbb2de..0d18ccf4b60 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
@@ -70,8 +70,4 @@ public final class CDCIncrementalTask implements PipelineTask 
{
             importer.stop();
         }
     }
-    
-    @Override
-    public void close() {
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
index 2226ae09044..6e3eb9066ea 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
@@ -82,8 +82,4 @@ public final class CDCInventoryTask implements PipelineTask {
     public InventoryTaskProgress getTaskProgress() {
         return new InventoryTaskProgress(position.get());
     }
-    
-    @Override
-    public void close() {
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
index 0e8f7d52a93..015d3e6666a 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
@@ -17,9 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.core.task;
 
-import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
+import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 
@@ -56,11 +55,9 @@ public final class CDCTasksRunner implements 
PipelineTasksRunner {
         jobItemContext.setStopping(true);
         for (PipelineTask each : inventoryTasks) {
             each.stop();
-            QuietlyCloser.close(each);
         }
         for (PipelineTask each : incrementalTasks) {
             each.stop();
-            QuietlyCloser.close(each);
         }
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index b7bfbf6942f..3dcf4159bcd 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -78,7 +78,6 @@ class InventoryTaskTest {
                 PipelineContextUtils.getExecuteEngine(), 
PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), 
mock(Importer.class), position);
         CompletableFuture.allOf(inventoryTask.start().toArray(new 
CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
         assertThat(inventoryTask.getTaskProgress().getPosition(), 
instanceOf(IntegerPrimaryKeyIngestPosition.class));
-        inventoryTask.close();
     }
     
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {

Reply via email to