This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2059ef63716 Refactor pipeline modules: follow code inspection (#37935)
2059ef63716 is described below

commit 2059ef637160885d6da5f8c773bc070a4cc2d701
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Feb 2 16:51:48 2026 +0800

    Refactor pipeline modules: follow code inspection (#37935)
    
    * Non-distinguishable logging calls
    
    * Unstable API Usage
    
    * Extract long code in method
    
    * Refactor PipelineMetaDataNodeWatcher.getInstance
    
    * Refactor TestDecodingPlugin.getColumnName
    
    * Method parameter always has the same value
    
    * Redundant 'throws' clause
    
    * Remove unused PrepareJobWithCheckPrivilegeFailedException
    
    * Remove unused PrepareJobWithInvalidSourceDataSourceException
    
    * Remove unused PrepareJobWithoutEnoughPrivilegeException
    
    * Remove unused PrepareJobWithoutUserException
    
    * Explicit type can be replaced with '<>'
    
    * @NotNull/@Nullable problems
    
    * Constant values
    
    * Nullability and data flow problems
    
    * Result of method call ignored
    
    * Unused assignment
    
    * AutoCloseable used without 'try'-with-resources
    
    * Style
---
 .../user-manual/error-code/sql-error-code.cn.md    |  4 ---
 .../user-manual/error-code/sql-error-code.en.md    |  4 ---
 ...YamlTableDataConsistencyCheckResultSwapper.java |  4 ++-
 .../core/context/PipelineContextManager.java       |  3 +-
 .../pipeline/core/datanode/JobDataNodeEntry.java   |  2 +-
 ...repareJobWithCheckPrivilegeFailedException.java | 34 ----------------------
 ...areJobWithInvalidSourceDataSourceException.java | 32 --------------------
 .../PrepareJobWithoutEnoughPrivilegeException.java | 34 ----------------------
 .../job/PrepareJobWithoutUserException.java        | 32 --------------------
 .../core/execute/PipelineExecuteEngine.java        |  9 +++---
 .../ingest/dumper/inventory/InventoryDumper.java   |  6 ++--
 .../core/ingest/dumper/inventory/query/Range.java  |  4 +--
 .../AbstractRecordTableInventoryCalculator.java    |  2 +-
 .../AbstractStreamingTableInventoryCalculator.java |  3 +-
 .../PipelineContextManagerLifecycleListener.java   |  2 +-
 .../metadata/model/PipelineColumnMetaData.java     |  2 +-
 .../core/metadata/model/PipelineTableMetaData.java |  4 +--
 .../metadata/node/PipelineMetaDataNodeWatcher.java |  7 ++---
 .../ratelimit/type/QPSJobRateLimitAlgorithm.java   |  1 +
 .../ratelimit/type/TPSJobRateLimitAlgorithm.java   |  1 +
 ...ordTableInventoryCheckCalculatedResultTest.java |  6 ----
 .../DataMatchTableDataConsistencyCheckerTest.java  | 29 ++++++++++--------
 .../CRC32TableInventoryCheckCalculatorTest.java    |  8 ++---
 .../progress/PipelineJobProgressDetectorTest.java  |  3 +-
 .../incremental/client/MySQLBinlogClientTest.java  |  2 +-
 .../netty/MySQLBinlogEventPacketDecoderTest.java   |  7 +++--
 .../dumper/MySQLIncrementalDumperTest.java         | 16 ++++++----
 .../incremental/wal/decode/TestDecodingPlugin.java |  9 ++----
 .../wal/PostgreSQLLogicalReplicationTest.java      |  9 +++++-
 .../wal/decode/TestDecodingPluginTest.java         |  4 +--
 .../PostgreSQLColumnPropertiesAppenderTest.java    |  6 ++--
 .../PostgreSQLPipelineFreemarkerManagerTest.java   | 34 ++++++++++++----------
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |  4 +--
 .../core/importer/CSNRecordsComparatorTest.java    |  7 +++--
 .../util/ConsistencyCheckSequenceTest.java         |  2 +-
 35 files changed, 106 insertions(+), 230 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md 
b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 4c69c618015..d7da4c93448 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -132,10 +132,6 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | 18103       | 42S02     | Can not get meta data for table '%s' when split by 
range.                      |
 | 18104       | HY000     | Can not split by unique key '%s' for table '%s'.   
                            |
 | 18105       | HY000     | Target table '%s' is not empty.                    
                            |
-| 18106       | 01007     | Source data source lacks '%s' privilege(s).        
                            |
-| 18107       | HY000     | Source data source required '%s = %s', now is 
'%s'.                            |
-| 18108       | 42S02     | User '%s' does exist.                              
                            |
-| 18109       | 08000     | Check privileges failed on source data source.     
                            |
 | 18110       | HY000     | Importer job write data failed.                    
                            |
 | 18111       | 08000     | Get binlog position failed by job '%s'.            
                            |
 | 18112       | HY000     | Can not find consistency check job of '%s'.        
                            |
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md 
b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 20f4d2ebe3c..07d42ae7c07 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -132,10 +132,6 @@ SQL error codes provide by standard `SQL State`, `Vendor 
Code` and `Reason`, whi
 | 18103       | 42S02     | Can not get meta data for table '%s' when split by 
range.                      |
 | 18104       | HY000     | Can not split by unique key '%s' for table '%s'.   
                            |
 | 18105       | HY000     | Target table '%s' is not empty.                    
                            |
-| 18106       | 01007     | Source data source lacks '%s' privilege(s).        
                            |
-| 18107       | HY000     | Source data source required '%s = %s', now is 
'%s'.                            |
-| 18108       | 42S02     | User '%s' does exist.                              
                            |
-| 18109       | 08000     | Check privileges failed on source data source.     
                            |
 | 18110       | HY000     | Importer job write data failed.                    
                            |
 | 18111       | 08000     | Get binlog position failed by job '%s'.            
                            |
 | 18112       | HY000     | Can not find consistency check job of '%s'.        
                            |
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
index cd07e8d48c5..d5ded2db0c6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
@@ -23,6 +23,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
+import java.util.Objects;
+
 /**
  * Yaml table data consistency check result swapper.
  */
@@ -32,7 +34,7 @@ public final class YamlTableDataConsistencyCheckResultSwapper 
implements YamlCon
     public YamlTableDataConsistencyCheckResult swapToYamlConfiguration(final 
TableDataConsistencyCheckResult data) {
         YamlTableDataConsistencyCheckResult result = new 
YamlTableDataConsistencyCheckResult();
         if (data.isIgnored()) {
-            result.setIgnoredType(data.getIgnoredType().name());
+            
result.setIgnoredType(Objects.requireNonNull(data.getIgnoredType()).name());
             return result;
         }
         result.setMatched(data.isMatched());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
index 0d63c35fea5..fdb622efe66 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
@@ -63,7 +63,8 @@ public final class PipelineContextManager {
     }
     
     /**
-     * Remove context.
+     * Remove ContextManager.
+     * It's invoked on <code>ContextManager.close</code>, so removed context 
is not necessary to close.
      *
      * @param key key
      */
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeEntry.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeEntry.java
index 36fd9b06db3..2960899e329 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeEntry.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeEntry.java
@@ -39,7 +39,7 @@ public final class JobDataNodeEntry {
     /**
      * Unmarshal from text.
      *
-     * @param text marshalled entry
+     * @param text marshaled entry
      * @return entry
      */
     public static JobDataNodeEntry unmarshal(final String text) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java
deleted file mode 100644
index 1d28a491b05..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
-
-import 
org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
-
-import java.sql.SQLException;
-
-/**
- * Prepare job with check privilege failed exception.
- */
-public final class PrepareJobWithCheckPrivilegeFailedException extends 
PipelineJobException {
-    
-    private static final long serialVersionUID = -8462039913248251254L;
-    
-    public PrepareJobWithCheckPrivilegeFailedException(final SQLException 
cause) {
-        super(XOpenSQLState.CONNECTION_EXCEPTION, 9, "Check privileges failed 
on source data source.", cause);
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
deleted file mode 100644
index 07b4fb8efec..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
-
-import 
org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
-
-/**
- * Prepare job with invalid source data source exception.
- */
-public final class PrepareJobWithInvalidSourceDataSourceException extends 
PipelineJobException {
-    
-    private static final long serialVersionUID = -7710035889344958565L;
-    
-    public PrepareJobWithInvalidSourceDataSourceException(final String 
dataSourceKey, final String toBeCheckedValue, final String actualValue) {
-        super(XOpenSQLState.GENERAL_ERROR, 7, String.format("Source data 
source required '%s = %s', now is '%s'.", dataSourceKey, toBeCheckedValue, 
actualValue));
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
deleted file mode 100644
index dced3195e7e..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
-
-import 
org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
-
-import java.util.Collection;
-
-/**
- * Prepare job without enough privilege exception.
- */
-public final class PrepareJobWithoutEnoughPrivilegeException extends 
PipelineJobException {
-    
-    private static final long serialVersionUID = -8462039913248251254L;
-    
-    public PrepareJobWithoutEnoughPrivilegeException(final Collection<String> 
privileges) {
-        super(XOpenSQLState.PRIVILEGE_NOT_GRANTED, 6, String.format("Source 
data source lacks '%s' privilege(s).", privileges));
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java
deleted file mode 100644
index 74fa81d6143..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
-
-import 
org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
-
-/**
- * Prepare job without user exception.
- */
-public final class PrepareJobWithoutUserException extends PipelineJobException 
{
-    
-    private static final long serialVersionUID = 7250019436391155770L;
-    
-    public PrepareJobWithoutUserException(final String username) {
-        super(XOpenSQLState.NOT_FOUND, 8, String.format("User '%s' does 
exist.", username));
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
index 00d41b94895..4f120adebeb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
@@ -118,12 +118,11 @@ public final class PipelineExecuteEngine {
     public static void trigger(final Collection<CompletableFuture<?>> futures, 
final ExecuteCallback executeCallback) {
         BlockingQueue<CompletableFuture<?>> futureQueue = new 
LinkedBlockingQueue<>();
         for (CompletableFuture<?> each : futures) {
-            each.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
-                
-                @SneakyThrows(InterruptedException.class)
-                @Override
-                public void accept(final Object unused, final Throwable 
throwable) {
+            each.whenCompleteAsync((BiConsumer<Object, Throwable>) (unused, 
throwable) -> {
+                try {
                     futureQueue.put(each);
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
                 }
             }, CALLBACK_EXECUTOR);
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index e102552d6b8..dbb60b1c02f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -125,7 +125,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
                 columnNames, dumperContext.getUniqueKeyColumns(), 
QueryType.RANGE_QUERY, null);
         Range<?> range = Range.closed(((UniqueKeyIngestPosition<?>) 
initialPosition).getLowerBound(), ((UniqueKeyIngestPosition<?>) 
initialPosition).getUpperBound());
         calculateParam.setRange(range);
-        RecordTableInventoryDumpCalculator dumpCalculator = new 
RecordTableInventoryDumpCalculator(dumperContext.getBatchSize(), 
StreamingRangeType.SMALL);
+        RecordTableInventoryDumpCalculator dumpCalculator = new 
RecordTableInventoryDumpCalculator(dumperContext.getBatchSize());
         long rowCount = 0L;
         try {
             JobRateLimitAlgorithm rateLimitAlgorithm = 
dumperContext.getRateLimitAlgorithm();
@@ -227,8 +227,8 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
     
     private class RecordTableInventoryDumpCalculator extends 
AbstractRecordTableInventoryCalculator<List<DataRecord>, DataRecord> {
         
-        RecordTableInventoryDumpCalculator(final int chunkSize, final 
StreamingRangeType streamingRangeType) {
-            super(chunkSize, streamingRangeType);
+        RecordTableInventoryDumpCalculator(final int chunkSize) {
+            super(chunkSize, StreamingRangeType.SMALL);
         }
         
         @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/Range.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/Range.java
index 7ab3d99a0fe..a72ff2bab86 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/Range.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/Range.java
@@ -48,7 +48,7 @@ public final class Range<T> {
      * @return closed range
      */
     public static <T> Range<T> closed(final T lowerBound, final T upperBound) {
-        return new Range<T>(lowerBound, true, upperBound);
+        return new Range<>(lowerBound, true, upperBound);
     }
     
     /**
@@ -60,7 +60,7 @@ public final class Range<T> {
      * @return open-closed range
      */
     public static <T> Range<T> openClosed(final T lowerBound, final T 
upperBound) {
-        return new Range<T>(lowerBound, false, upperBound);
+        return new Range<>(lowerBound, false, upperBound);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
index abe132df149..ad87d43a1e5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
@@ -299,7 +299,7 @@ public abstract class 
AbstractRecordTableInventoryCalculator<S, C> extends Abstr
             for (Object each : uniqueKeysValues) {
                 preparedStatement.setObject(parameterIndex++, each);
             }
-            if (null != param.getShardingColumnsNames() && 
!param.getShardingColumnsNames().isEmpty()) {
+            if (!param.getShardingColumnsNames().isEmpty()) {
                 List<Object> shardingColumnsValues = 
param.getShardingColumnsValues();
                 ShardingSpherePreconditions.checkNotNull(shardingColumnsValues,
                         () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), new 
RuntimeException("Sharding columns values is null when names not empty.")));
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
index 4cefbfb1922..78a8b69e682 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.que
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.jspecify.annotations.NonNull;
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -56,7 +57,7 @@ public abstract class 
AbstractStreamingTableInventoryCalculator<S> extends Abstr
         private final TableInventoryCalculateParameter param;
         
         @Override
-        public Iterator<S> iterator() {
+        public @NonNull Iterator<S> iterator() {
             return new ResultIterator(param);
         }
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index f4196932d26..9d9459d3bd1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -54,7 +54,7 @@ public final class PipelineContextManagerLifecycleListener 
implements ContextMan
         }
         PipelineContextKey contextKey = new 
PipelineContextKey(preSelectedDatabaseName, 
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType());
         PipelineContextManager.putContext(contextKey, contextManager);
-        PipelineMetaDataNodeWatcher.getInstance(contextKey);
+        PipelineMetaDataNodeWatcher.init(contextKey);
         ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
         try {
             dispatchEnablePipelineJobStartEvent(contextKey);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
index 5c854dc3dd3..23fd1128de5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
@@ -19,9 +19,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.metadata.model;
 
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
-import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
+import org.jspecify.annotations.NonNull;
 
 /**
  * Column meta data.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
index fbe6405eab6..2e930186435 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -19,10 +19,10 @@ package 
org.apache.shardingsphere.data.pipeline.core.metadata.model;
 
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
-import lombok.NonNull;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
+import org.jspecify.annotations.NonNull;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -54,7 +54,7 @@ public final class PipelineTableMetaData {
     @Getter
     private final Collection<PipelineIndexMetaData> uniqueIndexes;
     
-    public PipelineTableMetaData(final String name, final 
Map<ShardingSphereIdentifier, PipelineColumnMetaData> columnMetaDataMap, final 
Collection<PipelineIndexMetaData> uniqueIndexes) {
+    public PipelineTableMetaData(final @NonNull String name, final 
Map<ShardingSphereIdentifier, PipelineColumnMetaData> columnMetaDataMap, final 
Collection<PipelineIndexMetaData> uniqueIndexes) {
         this.name = name;
         this.columnMetaDataMap = columnMetaDataMap;
         List<PipelineColumnMetaData> columnMetaDataList = new 
ArrayList<>(columnMetaDataMap.values());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
index 2c4f3e42bb5..88e74893536 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -73,12 +73,11 @@ public final class PipelineMetaDataNodeWatcher {
     }
     
     /**
-     * Get instance.
+     * Initialize for context key.
      *
      * @param contextKey context key
-     * @return instance
      */
-    public static PipelineMetaDataNodeWatcher getInstance(final 
PipelineContextKey contextKey) {
-        return INSTANCE_MAP.computeIfAbsent(contextKey, 
PipelineMetaDataNodeWatcher::new);
+    public static void init(final PipelineContextKey contextKey) {
+        INSTANCE_MAP.computeIfAbsent(contextKey, 
PipelineMetaDataNodeWatcher::new);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/QPSJobRateLimitAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/QPSJobRateLimitAlgorithm.java
index e2c474aa310..f24dffab605 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/QPSJobRateLimitAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/QPSJobRateLimitAlgorithm.java
@@ -29,6 +29,7 @@ import java.util.Properties;
 /**
  * QPS job rate limit algorithm.
  */
+@SuppressWarnings("UnstableApiUsage")
 public final class QPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
     
     private static final String QPS_KEY = "qps";
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/TPSJobRateLimitAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/TPSJobRateLimitAlgorithm.java
index d2f4a377499..19fbe6478d5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/TPSJobRateLimitAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/TPSJobRateLimitAlgorithm.java
@@ -29,6 +29,7 @@ import java.util.Properties;
 /**
  * TPS job rate limit algorithm.
  */
+@SuppressWarnings("UnstableApiUsage")
 public final class TPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
     
     private static final String TPS_KEY = "tps";
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
index 9747ebda106..10d241f37d1 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
@@ -29,15 +29,9 @@ import java.util.Map;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 
 class RecordTableInventoryCheckCalculatedResultTest {
     
-    @Test
-    void assertNotEqualsWithNull() {
-        assertFalse(new RecordTableInventoryCheckCalculatedResult(0, 
Collections.emptyList()).equals(null));
-    }
-    
     @Test
     void assertEqualsWithSameObject() {
         RecordTableInventoryCheckCalculatedResult calculatedResult = new 
RecordTableInventoryCheckCalculatedResult(0, Collections.emptyList());
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyCheckerTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyCheckerTest.java
index 80fa678a5af..79080c9fedb 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyCheckerTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyCheckerTest.java
@@ -36,18 +36,20 @@ class DataMatchTableDataConsistencyCheckerTest {
     @Test
     void assertChunkSizeInitSuccess() {
         for (String each : Arrays.asList("1", "1000")) {
-            DataMatchTableDataConsistencyChecker checker = new 
DataMatchTableDataConsistencyChecker();
-            checker.init(PropertiesBuilder.build(new Property("chunk-size", 
each)));
-            String actual = 
Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("chunkSize"),
 checker).toString();
-            assertThat(actual, is(each));
+            try (DataMatchTableDataConsistencyChecker checker = new 
DataMatchTableDataConsistencyChecker()) {
+                checker.init(PropertiesBuilder.build(new 
Property("chunk-size", each)));
+                String actual = 
Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("chunkSize"),
 checker).toString();
+                assertThat(actual, is(each));
+            }
         }
     }
     
     @Test
     void assertChunkSizeInitFailure() {
-        assertThrows(PipelineInvalidParameterException.class, () -> new 
DataMatchTableDataConsistencyChecker().init(PropertiesBuilder.build(new 
Property("chunk-size", "xyz"))));
-        for (String each : Arrays.asList("0", "-1")) {
-            assertThrows(PipelineInvalidParameterException.class, () -> new 
DataMatchTableDataConsistencyChecker().init(PropertiesBuilder.build(new 
Property("chunk-size", each))));
+        for (String each : Arrays.asList("0", "-1", "xyz")) {
+            try (DataMatchTableDataConsistencyChecker checker = new 
DataMatchTableDataConsistencyChecker()) {
+                assertThrows(PipelineInvalidParameterException.class, () -> 
checker.init(PropertiesBuilder.build(new Property("chunk-size", each))));
+            }
         }
     }
     
@@ -55,15 +57,18 @@ class DataMatchTableDataConsistencyCheckerTest {
     @Test
     void assertStreamingRangeTypeInitSuccess() {
         for (String each : Arrays.asList("small", "large", "SMALL", "LARGE")) {
-            DataMatchTableDataConsistencyChecker checker = new 
DataMatchTableDataConsistencyChecker();
-            checker.init(PropertiesBuilder.build(new 
Property("streaming-range-type", each)));
-            String actual = 
Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("streamingRangeType"),
 checker).toString();
-            assertThat(actual, is(each.toUpperCase()));
+            try (DataMatchTableDataConsistencyChecker checker = new 
DataMatchTableDataConsistencyChecker()) {
+                checker.init(PropertiesBuilder.build(new 
Property("streaming-range-type", each)));
+                String actual = 
Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("streamingRangeType"),
 checker).toString();
+                assertThat(actual, is(each.toUpperCase()));
+            }
         }
     }
     
     @Test
     void assertStreamingRangeTypeInitFailure() {
-        assertThrows(PipelineInvalidParameterException.class, () -> new 
DataMatchTableDataConsistencyChecker().init(PropertiesBuilder.build(new 
Property("streaming-range-type", "xyz"))));
+        try (DataMatchTableDataConsistencyChecker checker = new 
DataMatchTableDataConsistencyChecker()) {
+            assertThrows(PipelineInvalidParameterException.class, () -> 
checker.init(PropertiesBuilder.build(new Property("streaming-range-type", 
"xyz"))));
+        }
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
index 8a8b1a783fa..3bfde4bc9ab 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
@@ -75,21 +75,21 @@ class CRC32TableInventoryCheckCalculatorTest {
     
     @Test
     void assertCalculateSuccess() throws SQLException {
-        PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
+        PreparedStatement preparedStatement0 = mockPreparedStatement(123L);
         when(connection.prepareStatement("SELECT CRC32(foo_col) FROM 
foo_tbl")).thenReturn(preparedStatement0);
-        PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
+        PreparedStatement preparedStatement1 = mockPreparedStatement(456L);
         when(connection.prepareStatement("SELECT CRC32(bar_col) FROM 
foo_tbl")).thenReturn(preparedStatement1);
         Iterator<TableInventoryCheckCalculatedResult> actual = new 
CRC32TableInventoryCheckCalculator().calculate(parameter).iterator();
         assertThat(actual.next().getRecordsCount(), is(10));
         assertFalse(actual.hasNext());
     }
     
-    private PreparedStatement mockPreparedStatement(final long 
expectedCRC32Result, final int expectedRecordsCount) throws SQLException {
+    private PreparedStatement mockPreparedStatement(final long 
expectedCRC32Result) throws SQLException {
         ResultSet resultSet = mock(ResultSet.class);
         PreparedStatement result = mock(PreparedStatement.class, 
RETURNS_DEEP_STUBS);
         when(result.executeQuery()).thenReturn(resultSet);
         when(resultSet.getLong(1)).thenReturn(expectedCRC32Result);
-        when(resultSet.getInt(2)).thenReturn(expectedRecordsCount);
+        when(resultSet.getInt(2)).thenReturn(10);
         return result;
     }
     
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
index 57ee254b7df..e100d6880a9 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
@@ -61,8 +61,7 @@ class PipelineJobProgressDetectorTest {
     
     @Test
     void assertIsInventoryFinishedWhenCollectionElementIsNull() {
-        TransmissionJobItemProgress jobItemProgress = null;
-        assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, 
Collections.singleton(jobItemProgress)));
+        assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, 
Collections.singleton(null)));
     }
     
     @Test
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
index a4752d3acb1..20762731df5 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
@@ -125,7 +125,7 @@ class MySQLBinlogClientTest {
     @AfterEach
     void tearDown() {
         eventLoopGroup.shutdownGracefully();
-        Thread.interrupted();
+        Thread.currentThread().interrupt();
     }
     
     @Test
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 3c4e9314c7c..bcd8efa300e 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -256,10 +256,10 @@ class MySQLBinlogEventPacketDecoderTest {
     @Test
     void assertDecodeFormatDescriptionEventWithZeroChecksumAndExtraBytes() {
         MySQLBinlogEventPacketDecoder decoderWithoutChecksum = new 
MySQLBinlogEventPacketDecoder(0, new ConcurrentHashMap<>(), true);
-        ByteBuf byteBuf = createFormatDescriptionEventByteBuf(0, 4);
+        ByteBuf byteBuf = createFormatDescriptionEventByteBuf(4);
         decoderWithoutChecksum.decode(channelHandlerContext, byteBuf, new 
LinkedList<>());
         assertThat(byteBuf.readableBytes(), is(0));
-        ByteBuf byteBufWithWarning = createFormatDescriptionEventByteBuf(0, 3);
+        ByteBuf byteBufWithWarning = createFormatDescriptionEventByteBuf(3);
         decoderWithoutChecksum.decode(channelHandlerContext, 
byteBufWithWarning, new LinkedList<>());
         assertThat(byteBufWithWarning.readableBytes(), is(0));
     }
@@ -303,9 +303,10 @@ class MySQLBinlogEventPacketDecoderTest {
         return result;
     }
     
-    private ByteBuf createFormatDescriptionEventByteBuf(final int 
checksumLength, final int extraBytesLength) {
+    private ByteBuf createFormatDescriptionEventByteBuf(final int 
extraBytesLength) {
         ByteBuf result = Unpooled.buffer();
         int bodyLength = 2 + 50 + 4 + 1 + 
(MySQLBinlogEventType.FORMAT_DESCRIPTION_EVENT.getValue() - 1) + 1 + 1 + 
extraBytesLength;
+        int checksumLength = 0;
         int eventSize = 
MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH + bodyLength + 
checksumLength;
         result.writeByte(0);
         result.writeIntLE(1);
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index 249a2ad2978..bcab78b4570 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -141,12 +141,7 @@ class MySQLIncrementalDumperTest {
         MySQLBaseRowsBinlogEvent unsupportedEvent = 
mock(MySQLBaseRowsBinlogEvent.class);
         when(unsupportedEvent.getDatabaseName()).thenReturn("test");
         when(unsupportedEvent.getTableName()).thenReturn("t_order");
-        MySQLWriteRowsBinlogEvent filteredEvent = new 
MySQLWriteRowsBinlogEvent("binlog-000001", 13L, 2L, "other_db", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}));
-        MySQLWriteRowsBinlogEvent writeEvent = new 
MySQLWriteRowsBinlogEvent("binlog-000001", 4L, 5L, "test", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}));
-        MySQLUpdateRowsBinlogEvent updateEvent = new 
MySQLUpdateRowsBinlogEvent("binlog-000001", 5L, 6L, "test", "t_order",
-                Collections.singletonList(new Serializable[]{101, 1, "OK"}), 
Collections.singletonList(new Serializable[]{101, 1, "UPDATED"}));
-        MySQLDeleteRowsBinlogEvent deleteEvent = new 
MySQLDeleteRowsBinlogEvent("binlog-000001", 6L, 7L, "test", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}));
-        List<MySQLBaseBinlogEvent> firstPollEvents = Arrays.asList(new 
PlaceholderBinlogEvent("binlog-000001", 3L, 1L), writeEvent, updateEvent, 
deleteEvent, filteredEvent, unsupportedEvent);
+        List<MySQLBaseBinlogEvent> firstPollEvents = 
getMySQLBinlogEvents(unsupportedEvent);
         PipelineChannel channel = mock(PipelineChannel.class);
         MySQLIncrementalDumper dumper = new 
MySQLIncrementalDumper(dumperContext, new MySQLBinlogPosition("binlog-000001", 
4L), channel, metaDataLoader);
         MySQLBinlogClient client = mock(MySQLBinlogClient.class);
@@ -179,4 +174,13 @@ class MySQLIncrementalDumperTest {
         assertThat(pushed.get(4), isA(PlaceholderRecord.class));
         assertFalse(dumperThread.isAlive());
     }
+    
+    private List<MySQLBaseBinlogEvent> getMySQLBinlogEvents(final 
MySQLBaseRowsBinlogEvent unsupportedEvent) {
+        MySQLWriteRowsBinlogEvent filteredEvent = new 
MySQLWriteRowsBinlogEvent("binlog-000001", 13L, 2L, "other_db", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+        MySQLWriteRowsBinlogEvent writeEvent = new 
MySQLWriteRowsBinlogEvent("binlog-000001", 4L, 5L, "test", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+        MySQLUpdateRowsBinlogEvent updateEvent = new 
MySQLUpdateRowsBinlogEvent("binlog-000001", 5L, 6L, "test", "t_order",
+                Collections.singletonList(new Serializable[]{101, 1, "OK"}), 
Collections.singletonList(new Serializable[]{101, 1, "UPDATED"}));
+        MySQLDeleteRowsBinlogEvent deleteEvent = new 
MySQLDeleteRowsBinlogEvent("binlog-000001", 6L, 7L, "test", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+        return Arrays.asList(new PlaceholderBinlogEvent("binlog-000001", 3L, 
1L), writeEvent, updateEvent, deleteEvent, filteredEvent, unsupportedEvent);
+    }
 }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPlugin.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPlugin.java
index 8b4aa61b9e4..0769caaeba0 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPlugin.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPlugin.java
@@ -136,22 +136,19 @@ public final class TestDecodingPlugin implements 
DecodingPlugin {
     }
     
     private Object readColumn(final ByteBuffer data) {
-        readColumnName(data);
+        skipColumnName(data);
         String columnType = readColumnType(data);
         data.get();
         return readColumnData(data, columnType);
     }
     
-    private String readColumnName(final ByteBuffer data) {
-        StringBuilder eventType = new StringBuilder();
+    private void skipColumnName(final ByteBuffer data) {
         while (data.hasRemaining()) {
             char c = (char) data.get();
             if ('[' == c) {
-                return eventType.toString();
+                return;
             }
-            eventType.append(c);
         }
-        return eventType.toString();
     }
     
     private String readColumnType(final ByteBuffer data) {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/PostgreSQLLogicalReplicationTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/PostgreSQLLogicalReplicationTest.java
index ad7f6fec23d..69812580de3 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/PostgreSQLLogicalReplicationTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/PostgreSQLLogicalReplicationTest.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wa
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.decode.BaseLogSequenceNumber;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.decode.PostgreSQLLogSequenceNumber;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -29,6 +30,7 @@ import org.postgresql.PGConnection;
 import org.postgresql.jdbc.PgConnection;
 import org.postgresql.replication.LogSequenceNumber;
 import org.postgresql.replication.PGReplicationConnection;
+import org.postgresql.replication.PGReplicationStream;
 import org.postgresql.replication.fluent.ChainedStreamBuilder;
 import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
 
@@ -88,7 +90,12 @@ class PostgreSQLLogicalReplicationTest {
         when(chainedLogicalStreamBuilder.withSlotOption(anyString(), 
eq(true))).thenReturn(chainedLogicalStreamBuilder, chainedLogicalStreamBuilder);
         BaseLogSequenceNumber basePosition = new 
PostgreSQLLogSequenceNumber(startPosition);
         logicalReplication.createReplicationStream(connection, "", 
basePosition);
-        verify(chainedLogicalStreamBuilder).start();
+        PGReplicationStream stream = null;
+        try {
+            stream = verify(chainedLogicalStreamBuilder).start();
+        } finally {
+            QuietlyCloser.close(stream);
+        }
     }
     
     @Test
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
index 56c072f7745..26ac8ebadd1 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
@@ -169,12 +169,12 @@ class TestDecodingPluginTest {
         BaseTimestampUtils timestampUtils = new BaseTimestampUtils() {
             
             @Override
-            public Time toTime(final Calendar cal, final String input) throws 
SQLException {
+            public Time toTime(final Calendar cal, final String input) {
                 return Time.valueOf(input);
             }
             
             @Override
-            public Timestamp toTimestamp(final Calendar cal, final String 
input) throws SQLException {
+            public Timestamp toTimestamp(final Calendar cal, final String 
input) {
                 return Timestamp.valueOf(input);
             }
         };
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/ddl/column/PostgreSQLColumnPropertiesAppenderTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/ddl/column/PostgreSQLColumnPropertiesAppenderTest.java
index 273a3d83d90..69375652600 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/ddl/column/PostgreSQLColumnPropertiesAppenderTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/ddl/column/PostgreSQLColumnPropertiesAppenderTest.java
@@ -862,7 +862,7 @@ class PostgreSQLColumnPropertiesAppenderTest {
         column.put("cltype", "numeric(5,2)");
         Map<String, Object> unmatchedColumn = createUnmatchedColumn();
         when(templateExecutor.executeByTemplate(anyMap(), 
eq("component/columns/%s/properties.ftl"))).thenReturn(Arrays.asList(column, 
unmatchedColumn));
-        Map<String, Object> editModeTypesEntry = 
createEditModeTypesEntry("alpha");
+        Map<String, Object> editModeTypesEntry = createEditModeTypesEntry();
         when(templateExecutor.executeByTemplate(anyMap(), 
eq("component/columns/%s/edit_mode_types_multi.ftl"))).thenReturn(Collections.singleton(editModeTypesEntry));
         appender.append(context);
         Collection<?> columns = (Collection<?>) context.get("columns");
@@ -916,10 +916,10 @@ class PostgreSQLColumnPropertiesAppenderTest {
         return result;
     }
     
-    private Map<String, Object> createEditModeTypesEntry(final String... 
editTypes) {
+    private Map<String, Object> createEditModeTypesEntry() {
         Map<String, Object> entry = new LinkedHashMap<>();
         entry.put("main_oid", "1");
-        entry.put("edit_types", mockSQLArray(editTypes));
+        entry.put("edit_types", mockSQLArray(new String[]{"alpha"}));
         return entry;
     }
     
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/template/PostgreSQLPipelineFreemarkerManagerTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/template/PostgreSQLPipelineFreemarkerManagerTest.java
index fa82ae69057..40d140d8d01 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/template/PostgreSQLPipelineFreemarkerManagerTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/template/PostgreSQLPipelineFreemarkerManagerTest.java
@@ -25,9 +25,9 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
 
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 class PostgreSQLPipelineFreemarkerManagerTest {
     
@@ -51,23 +51,10 @@ class PostgreSQLPipelineFreemarkerManagerTest {
     
     @Test
     void assertCreateTableTemplateRendersNormalizedSequenceNumbers() {
-        Map<String, Object> column = new LinkedHashMap<>(16, 1F);
-        column.put("name", "id");
-        column.put("cltype", "int4");
-        column.put("displaytypname", "integer");
-        column.put("attnotnull", true);
-        column.put("attidentity", "a");
-        column.put("colconstype", "i");
-        column.put("seqincrement", 1L);
-        column.put("seqstart", 1L);
-        column.put("seqmin", 1L);
-        column.put("seqmax", 2147483647L);
-        column.put("seqcache", 1L);
-        column.put("seqcycle", false);
         Map<String, Object> dataModel = new LinkedHashMap<>(8, 1F);
         dataModel.put("schema", "public");
         dataModel.put("name", "t_order");
-        dataModel.put("columns", Collections.singletonList(column));
+        dataModel.put("columns", Collections.singletonList(getColumn()));
         dataModel.put("primary_key", Collections.emptyList());
         dataModel.put("unique_constraint", Collections.emptyList());
         dataModel.put("foreign_key", Collections.emptyList());
@@ -84,4 +71,21 @@ class PostgreSQLPipelineFreemarkerManagerTest {
         String expectedSql = 
"CREATETABLEIFNOTEXISTSpublic.t_order(idintegerNOTNULLGENERATEDALWAYSASIDENTITY(INCREMENT1START1MINVALUE1MAXVALUE2147483647CACHE1))";
         assertThat(compactSql, is(expectedSql));
     }
+    
+    private static Map<String, Object> getColumn() {
+        Map<String, Object> column = new LinkedHashMap<>(16, 1F);
+        column.put("name", "id");
+        column.put("cltype", "int4");
+        column.put("displaytypname", "integer");
+        column.put("attnotnull", true);
+        column.put("attidentity", "a");
+        column.put("colconstype", "i");
+        column.put("seqincrement", 1L);
+        column.put("seqstart", 1L);
+        column.put("seqmin", 1L);
+        column.put("seqmax", 2147483647L);
+        column.put("seqcache", 1L);
+        column.put("seqcycle", false);
+        return column;
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 12c10ca71d0..d5624f749b8 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -111,7 +111,7 @@ public final class CDCJob implements PipelineJob {
         Collection<CDCJobItemContext> jobItemContexts = new LinkedList<>();
         for (int shardingItem = 0; shardingItem < 
jobConfig.getJobShardingCount(); shardingItem++) {
             if (jobRunnerManager.isStopping()) {
-                log.info("Job is stopping, ignore.");
+                log.info("Execute, job is stopping, ignore.");
                 return;
             }
             TransmissionJobItemProgress jobItemProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), 
shardingItem).orElse(null);
@@ -238,7 +238,7 @@ public final class CDCJob implements PipelineJob {
         @Override
         public void onSuccess() {
             if (jobItemContext.isStopping()) {
-                log.info("Job is stopping, ignore.");
+                log.info("onSuccess, job is stopping, ignore.");
                 return;
             }
             log.info("All {} tasks finished successful.", identifier);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
index 8a6e2bc0374..2401132527b 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.Pipeli
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.Objects;
 import java.util.PriorityQueue;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -39,8 +40,8 @@ class CSNRecordsComparatorTest {
         queue.add(new CSNRecords(1L, channelProgressPair, 
Collections.emptyList()));
         queue.add(new CSNRecords(2L, channelProgressPair, 
Collections.emptyList()));
         assertThat(queue.size(), is(3));
-        assertThat(queue.poll().getCsn(), is(1L));
-        assertThat(queue.poll().getCsn(), is(2L));
-        assertThat(queue.poll().getCsn(), is(3L));
+        assertThat(Objects.requireNonNull(queue.poll()).getCsn(), is(1L));
+        assertThat(Objects.requireNonNull(queue.poll()).getCsn(), is(2L));
+        assertThat(Objects.requireNonNull(queue.poll()).getCsn(), is(3L));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
 
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
index 5cdae219eeb..9f86a2641a8 100644
--- 
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
+++ 
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
@@ -35,7 +35,7 @@ class ConsistencyCheckSequenceTest {
         int currentSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         assertThat(currentSequence = 
ConsistencyCheckSequence.getNextSequence(currentSequence), is(2));
         assertThat(currentSequence = 
ConsistencyCheckSequence.getNextSequence(currentSequence), is(3));
-        assertThat(currentSequence = 
ConsistencyCheckSequence.getNextSequence(currentSequence), is(1));
+        assertThat(ConsistencyCheckSequence.getNextSequence(currentSequence), 
is(1));
     }
     
     @Test

Reply via email to