This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c48ca0b501 Review and improve pipeline code (#29773)
3c48ca0b501 is described below

commit 3c48ca0b501ad1166fe0ce2934a3ddfae45475cc
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 19 17:43:19 2024 +0800

    Review and improve pipeline code (#29773)
    
    * Allow stopping a job repeatedly
    
    * Fix log typo
    
    * Ignore exceptions when cancelling a statement
    
    * Remove AbstractSeparablePipelineJob no-argument constructor
    
    * Update MemoryPipelineChannelTest
---
 .../result/RecordSingleTableInventoryCalculatedResult.java     |  2 +-
 .../data/pipeline/core/job/AbstractSeparablePipelineJob.java   |  5 -----
 .../data/pipeline/core/job/service/PipelineJobManager.java     |  3 ---
 .../data/pipeline/core/util/PipelineJdbcUtils.java             | 10 ++++------
 .../core/channel/memory/MemoryPipelineChannelTest.java         |  8 ++++----
 5 files changed, 9 insertions(+), 19 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
index 8bd104982f0..2e3aa64ac71 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
@@ -81,7 +81,7 @@ public final class RecordSingleTableInventoryCalculatedResult implements SingleT
                 return false;
             }
             if (!DataConsistencyCheckUtils.recordsEquals(thisRecord, thatRecord, equalsBuilder)) {
-                log.warn("Records not equals, record1={}, record2={}.", thatRecord, thatRecord);
+                log.warn("Records not equals, record1={}, record2={}.", thisRecord, thatRecord);
                 return false;
             }
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 56ce33a6d83..23e85e456d3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -57,11 +57,6 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
     
     private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
-    // TODO Remove constructor
-    protected AbstractSeparablePipelineJob() {
-        this("");
-    }
-    
     protected AbstractSeparablePipelineJob(final String jobId) {
         jobRunnerManager = new PipelineJobRunnerManager();
         jobProcessContext = isTransmissionProcessContextNeeded() ? createTransmissionProcessContext(jobId) : null;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 8ced0ef5853..7f1995d95ee 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -141,9 +141,6 @@ public final class PipelineJobManager {
         PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
         pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        if (jobConfigPOJO.isDisabled()) {
-            return;
-        }
         jobConfigPOJO.setDisabled(true);
         jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
         String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
index 4ffd6621cf0..94e4167e6d4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
@@ -19,9 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
 
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 
@@ -83,15 +81,15 @@ public final class PipelineJdbcUtils {
      * Cancel statement.
      *
      * @param statement statement
-     * @throws SQLWrapperException if cancelling statement failed
      */
-    public static void cancelStatement(final Statement statement) throws SQLWrapperException {
+    public static void cancelStatement(final Statement statement) {
         try {
             if (!statement.isClosed()) {
                 statement.cancel();
             }
-        } catch (final SQLException ex) {
-            throw new SQLWrapperException(ex);
+            // CHECKSTYLE:OFF
+        } catch (final Exception ignored) {
+            // CHECKSTYLE:ON
         }
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index 651dd9166cb..4d6f941fe34 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -41,13 +41,13 @@ class MemoryPipelineChannelTest {
     void assertZeroQueueSizeWorks() {
         MemoryPipelineChannel channel = new MemoryPipelineChannel(0, new InventoryTaskAckCallback(new AtomicReference<>()));
         List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
-        Semaphore semaphore = new Semaphore(1);
+        CountDownLatch latch = new CountDownLatch(1);
         Thread thread = new Thread(() -> {
-            semaphore.release();
+            latch.countDown();
             channel.push(records);
         });
         thread.start();
-        assertTrue(semaphore.tryAcquire(1L, TimeUnit.SECONDS));
+        assertTrue(latch.await(1L, TimeUnit.SECONDS));
         assertThat(channel.fetch(1, 500L), is(records));
     }
     

Reply via email to