This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3c48ca0b501 Review and improve pipeline code (#29773)
3c48ca0b501 is described below
commit 3c48ca0b501ad1166fe0ce2934a3ddfae45475cc
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 19 17:43:19 2024 +0800
Review and improve pipeline code (#29773)
* Allow stop job repeatedly
* Fix log typo
* Cancel statement ignore exception
* Remove AbstractSeparablePipelineJob no argument constructor
* Update MemoryPipelineChannelTest
---
.../result/RecordSingleTableInventoryCalculatedResult.java | 2 +-
.../data/pipeline/core/job/AbstractSeparablePipelineJob.java | 5 -----
.../data/pipeline/core/job/service/PipelineJobManager.java | 3 ---
.../data/pipeline/core/util/PipelineJdbcUtils.java | 10 ++++------
.../core/channel/memory/MemoryPipelineChannelTest.java | 8 ++++----
5 files changed, 9 insertions(+), 19 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
index 8bd104982f0..2e3aa64ac71 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
@@ -81,7 +81,7 @@ public final class RecordSingleTableInventoryCalculatedResult
implements SingleT
return false;
}
if (!DataConsistencyCheckUtils.recordsEquals(thisRecord,
thatRecord, equalsBuilder)) {
- log.warn("Records not equals, record1={}, record2={}.",
thatRecord, thatRecord);
+ log.warn("Records not equals, record1={}, record2={}.",
thisRecord, thatRecord);
return false;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 56ce33a6d83..23e85e456d3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -57,11 +57,6 @@ public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfigur
private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
- // TODO Remove constructor
- protected AbstractSeparablePipelineJob() {
- this("");
- }
-
protected AbstractSeparablePipelineJob(final String jobId) {
jobRunnerManager = new PipelineJobRunnerManager();
jobProcessContext = isTransmissionProcessContextNeeded() ?
createTransmissionProcessContext(jobId) : null;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 8ced0ef5853..7f1995d95ee 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -141,9 +141,6 @@ public final class PipelineJobManager {
PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- if (jobConfigPOJO.isDisabled()) {
- return;
- }
jobConfigPOJO.setDisabled(true);
jobConfigPOJO.getProps().setProperty("stop_time",
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
String barrierPath =
PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
index 4ffd6621cf0..94e4167e6d4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
@@ -19,9 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
-import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
@@ -83,15 +81,15 @@ public final class PipelineJdbcUtils {
* Cancel statement.
*
* @param statement statement
- * @throws SQLWrapperException if cancelling statement failed
*/
- public static void cancelStatement(final Statement statement) throws
SQLWrapperException {
+ public static void cancelStatement(final Statement statement) {
try {
if (!statement.isClosed()) {
statement.cancel();
}
- } catch (final SQLException ex) {
- throw new SQLWrapperException(ex);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ignored) {
+ // CHECKSTYLE:ON
}
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index 651dd9166cb..4d6f941fe34 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -41,13 +41,13 @@ class MemoryPipelineChannelTest {
void assertZeroQueueSizeWorks() {
MemoryPipelineChannel channel = new MemoryPipelineChannel(0, new
InventoryTaskAckCallback(new AtomicReference<>()));
List<Record> records = Collections.singletonList(new
PlaceholderRecord(new IngestFinishedPosition()));
- Semaphore semaphore = new Semaphore(1);
+ CountDownLatch latch = new CountDownLatch(1);
Thread thread = new Thread(() -> {
- semaphore.release();
+ latch.countDown();
channel.push(records);
});
thread.start();
- assertTrue(semaphore.tryAcquire(1L, TimeUnit.SECONDS));
+ assertTrue(latch.await(1L, TimeUnit.SECONDS));
assertThat(channel.fetch(1, 500L), is(records));
}