This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2059ef63716 Refactor pipeline modules: follow code inspection (#37935)
2059ef63716 is described below
commit 2059ef637160885d6da5f8c773bc070a4cc2d701
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Feb 2 16:51:48 2026 +0800
Refactor pipeline modules: follow code inspection (#37935)
* Non-distinguishable logging calls
* Unstable API Usage
* Extract long code in method
* Refactor PipelineMetaDataNodeWatcher.getInstance
* Refactor TestDecodingPlugin.getColumnName
* Method parameter always has the same value
* Redundant 'throws' clause
* Remove unused PrepareJobWithCheckPrivilegeFailedException
* Remove unused PrepareJobWithInvalidSourceDataSourceException
* Remove unused PrepareJobWithoutEnoughPrivilegeException
* Remove unused PrepareJobWithoutUserException
* Explicit type can be replaced with '<>'
* @NotNull/@Nullable problems
* Constant values
* Nullability and data flow problems
* Result of method call ignored
* Unused assignment
* AutoCloseable used without 'try'-with-resources
* Style
---
.../user-manual/error-code/sql-error-code.cn.md | 4 ---
.../user-manual/error-code/sql-error-code.en.md | 4 ---
...YamlTableDataConsistencyCheckResultSwapper.java | 4 ++-
.../core/context/PipelineContextManager.java | 3 +-
.../pipeline/core/datanode/JobDataNodeEntry.java | 2 +-
...repareJobWithCheckPrivilegeFailedException.java | 34 ----------------------
...areJobWithInvalidSourceDataSourceException.java | 32 --------------------
.../PrepareJobWithoutEnoughPrivilegeException.java | 34 ----------------------
.../job/PrepareJobWithoutUserException.java | 32 --------------------
.../core/execute/PipelineExecuteEngine.java | 9 +++---
.../ingest/dumper/inventory/InventoryDumper.java | 6 ++--
.../core/ingest/dumper/inventory/query/Range.java | 4 +--
.../AbstractRecordTableInventoryCalculator.java | 2 +-
.../AbstractStreamingTableInventoryCalculator.java | 3 +-
.../PipelineContextManagerLifecycleListener.java | 2 +-
.../metadata/model/PipelineColumnMetaData.java | 2 +-
.../core/metadata/model/PipelineTableMetaData.java | 4 +--
.../metadata/node/PipelineMetaDataNodeWatcher.java | 7 ++---
.../ratelimit/type/QPSJobRateLimitAlgorithm.java | 1 +
.../ratelimit/type/TPSJobRateLimitAlgorithm.java | 1 +
...ordTableInventoryCheckCalculatedResultTest.java | 6 ----
.../DataMatchTableDataConsistencyCheckerTest.java | 29 ++++++++++--------
.../CRC32TableInventoryCheckCalculatorTest.java | 8 ++---
.../progress/PipelineJobProgressDetectorTest.java | 3 +-
.../incremental/client/MySQLBinlogClientTest.java | 2 +-
.../netty/MySQLBinlogEventPacketDecoderTest.java | 7 +++--
.../dumper/MySQLIncrementalDumperTest.java | 16 ++++++----
.../incremental/wal/decode/TestDecodingPlugin.java | 9 ++----
.../wal/PostgreSQLLogicalReplicationTest.java | 9 +++++-
.../wal/decode/TestDecodingPluginTest.java | 4 +--
.../PostgreSQLColumnPropertiesAppenderTest.java | 6 ++--
.../PostgreSQLPipelineFreemarkerManagerTest.java | 34 ++++++++++++----------
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 4 +--
.../core/importer/CSNRecordsComparatorTest.java | 7 +++--
.../util/ConsistencyCheckSequenceTest.java | 2 +-
35 files changed, 106 insertions(+), 230 deletions(-)
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 4c69c618015..d7da4c93448 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -132,10 +132,6 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| 18103 | 42S02 | Can not get meta data for table '%s' when split by range. |
| 18104 | HY000 | Can not split by unique key '%s' for table '%s'. |
| 18105 | HY000 | Target table '%s' is not empty. |
-| 18106 | 01007 | Source data source lacks '%s' privilege(s). |
-| 18107 | HY000 | Source data source required '%s = %s', now is '%s'. |
-| 18108 | 42S02 | User '%s' does exist. |
-| 18109 | 08000 | Check privileges failed on source data source. |
| 18110 | HY000 | Importer job write data failed. |
| 18111 | 08000 | Get binlog position failed by job '%s'. |
| 18112 | HY000 | Can not find consistency check job of '%s'. |
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 20f4d2ebe3c..07d42ae7c07 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -132,10 +132,6 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
| 18103 | 42S02 | Can not get meta data for table '%s' when split by range. |
| 18104 | HY000 | Can not split by unique key '%s' for table '%s'. |
| 18105 | HY000 | Target table '%s' is not empty. |
-| 18106 | 01007 | Source data source lacks '%s' privilege(s). |
-| 18107 | HY000 | Source data source required '%s = %s', now is '%s'. |
-| 18108 | 42S02 | User '%s' does exist. |
-| 18109 | 08000 | Check privileges failed on source data source. |
| 18110 | HY000 | Importer job write data failed. |
| 18111 | 08000 | Get binlog position failed by job '%s'. |
| 18112 | HY000 | Can not find consistency check job of '%s'. |
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
index cd07e8d48c5..d5ded2db0c6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
@@ -23,6 +23,8 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import java.util.Objects;
+
/**
* Yaml table data consistency check result swapper.
*/
@@ -32,7 +34,7 @@ public final class YamlTableDataConsistencyCheckResultSwapper implements YamlCon
public YamlTableDataConsistencyCheckResult swapToYamlConfiguration(final TableDataConsistencyCheckResult data) {
YamlTableDataConsistencyCheckResult result = new YamlTableDataConsistencyCheckResult();
if (data.isIgnored()) {
- result.setIgnoredType(data.getIgnoredType().name());
+ result.setIgnoredType(Objects.requireNonNull(data.getIgnoredType()).name());
return result;
}
result.setMatched(data.isMatched());
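Side note on the pattern above: getIgnoredType() is nullable, so the inspection flags the unguarded .name() call. Wrapping it in Objects.requireNonNull fails fast and narrows the type for data-flow analysis. A minimal sketch with stand-in names (not the repository's classes):

    import java.util.Objects;

    final class IgnoredResultSketch {

        private final Enum<?> ignoredType;  // null when the result is not ignored

        IgnoredResultSketch(final Enum<?> ignoredType) {
            this.ignoredType = ignoredType;
        }

        String ignoredTypeName() {
            // Throws immediately at the call site instead of a later, harder-to-trace NPE,
            // and tells the analyzer the value is non-null from here on.
            return Objects.requireNonNull(ignoredType).name();
        }
    }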
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
index 0d63c35fea5..fdb622efe66 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
@@ -63,7 +63,8 @@ public final class PipelineContextManager {
}
/**
- * Remove context.
+ * Remove ContextManager.
+ * It's invoked on <code>ContextManager.close</code>, so removed context is not necessary to close.
*
* @param key key
*/
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeEntry.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeEntry.java
index 36fd9b06db3..2960899e329 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeEntry.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeEntry.java
@@ -39,7 +39,7 @@ public final class JobDataNodeEntry {
/**
* Unmarshal from text.
*
- * @param text marshalled entry
+ * @param text marshaled entry
* @return entry
*/
public static JobDataNodeEntry unmarshal(final String text) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java
deleted file mode 100644
index 1d28a491b05..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
-
-import org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
-
-import java.sql.SQLException;
-
-/**
- * Prepare job with check privilege failed exception.
- */
-public final class PrepareJobWithCheckPrivilegeFailedException extends PipelineJobException {
-
- private static final long serialVersionUID = -8462039913248251254L;
-
- public PrepareJobWithCheckPrivilegeFailedException(final SQLException cause) {
- super(XOpenSQLState.CONNECTION_EXCEPTION, 9, "Check privileges failed on source data source.", cause);
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
deleted file mode 100644
index 07b4fb8efec..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
-
-import org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
-
-/**
- * Prepare job with invalid source data source exception.
- */
-public final class PrepareJobWithInvalidSourceDataSourceException extends PipelineJobException {
-
- private static final long serialVersionUID = -7710035889344958565L;
-
- public PrepareJobWithInvalidSourceDataSourceException(final String dataSourceKey, final String toBeCheckedValue, final String actualValue) {
- super(XOpenSQLState.GENERAL_ERROR, 7, String.format("Source data source required '%s = %s', now is '%s'.", dataSourceKey, toBeCheckedValue, actualValue));
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
deleted file mode 100644
index dced3195e7e..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
-
-import org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
-
-import java.util.Collection;
-
-/**
- * Prepare job without enough privilege exception.
- */
-public final class PrepareJobWithoutEnoughPrivilegeException extends PipelineJobException {
-
- private static final long serialVersionUID = -8462039913248251254L;
-
- public PrepareJobWithoutEnoughPrivilegeException(final Collection<String> privileges) {
- super(XOpenSQLState.PRIVILEGE_NOT_GRANTED, 6, String.format("Source data source lacks '%s' privilege(s).", privileges));
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java
deleted file mode 100644
index 74fa81d6143..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
-
-import org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
-
-/**
- * Prepare job without user exception.
- */
-public final class PrepareJobWithoutUserException extends PipelineJobException {
-
- private static final long serialVersionUID = 7250019436391155770L;
-
- public PrepareJobWithoutUserException(final String username) {
- super(XOpenSQLState.NOT_FOUND, 8, String.format("User '%s' does exist.", username));
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
index 00d41b94895..4f120adebeb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
@@ -118,12 +118,11 @@ public final class PipelineExecuteEngine {
public static void trigger(final Collection<CompletableFuture<?>> futures, final ExecuteCallback executeCallback) {
BlockingQueue<CompletableFuture<?>> futureQueue = new LinkedBlockingQueue<>();
for (CompletableFuture<?> each : futures) {
- each.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
-
- @SneakyThrows(InterruptedException.class)
- @Override
- public void accept(final Object unused, final Throwable throwable) {
+ each.whenCompleteAsync((BiConsumer<Object, Throwable>) (unused, throwable) -> {
+ try {
futureQueue.put(each);
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
}
}, CALLBACK_EXECUTOR);
}
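A standalone sketch of the replacement above: the anonymous BiConsumer with @SneakyThrows becomes a lambda that catches InterruptedException and restores the interrupt flag, so the callback executor's thread keeps its interruption status (names below are local stand-ins, not the engine's fields):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public final class TriggerSketch {

        public static void main(final String[] args) throws InterruptedException {
            ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
            BlockingQueue<CompletableFuture<?>> futureQueue = new LinkedBlockingQueue<>();
            CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
            future.whenCompleteAsync((unused, throwable) -> {
                try {
                    futureQueue.put(future);
                } catch (final InterruptedException ex) {
                    // Restore the flag instead of rethrowing via @SneakyThrows,
                    // so code running later on this executor thread can observe it.
                    Thread.currentThread().interrupt();
                }
            }, callbackExecutor);
            futureQueue.take();
            callbackExecutor.shutdown();
        }
    }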
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index e102552d6b8..dbb60b1c02f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -125,7 +125,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
columnNames, dumperContext.getUniqueKeyColumns(), QueryType.RANGE_QUERY, null);
Range<?> range = Range.closed(((UniqueKeyIngestPosition<?>) initialPosition).getLowerBound(), ((UniqueKeyIngestPosition<?>) initialPosition).getUpperBound());
calculateParam.setRange(range);
- RecordTableInventoryDumpCalculator dumpCalculator = new RecordTableInventoryDumpCalculator(dumperContext.getBatchSize(), StreamingRangeType.SMALL);
+ RecordTableInventoryDumpCalculator dumpCalculator = new RecordTableInventoryDumpCalculator(dumperContext.getBatchSize());
long rowCount = 0L;
try {
JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
@@ -227,8 +227,8 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
private class RecordTableInventoryDumpCalculator extends AbstractRecordTableInventoryCalculator<List<DataRecord>, DataRecord> {
- RecordTableInventoryDumpCalculator(final int chunkSize, final StreamingRangeType streamingRangeType) {
- super(chunkSize, streamingRangeType);
+ RecordTableInventoryDumpCalculator(final int chunkSize) {
+ super(chunkSize, StreamingRangeType.SMALL);
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/Range.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/Range.java
index 7ab3d99a0fe..a72ff2bab86 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/Range.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/Range.java
@@ -48,7 +48,7 @@ public final class Range<T> {
* @return closed range
*/
public static <T> Range<T> closed(final T lowerBound, final T upperBound) {
- return new Range<T>(lowerBound, true, upperBound);
+ return new Range<>(lowerBound, true, upperBound);
}
/**
@@ -60,7 +60,7 @@ public final class Range<T> {
* @return open-closed range
*/
public static <T> Range<T> openClosed(final T lowerBound, final T upperBound) {
- return new Range<T>(lowerBound, false, upperBound);
+ return new Range<>(lowerBound, false, upperBound);
}
@Override
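The Range<T> to Range<> edits are the standard diamond-operator cleanup: since Java 7, javac infers the type argument from the target type, including inside generic static factories such as closed() and openClosed(). A one-file illustration:

    import java.util.ArrayList;
    import java.util.List;

    public final class DiamondSketch {

        public static void main(final String[] args) {
            // Pre-Java 7 style: the type argument is repeated on the right-hand side.
            List<String> verbose = new ArrayList<String>();
            // Diamond operator: <String> is inferred from the declared type.
            List<String> inferred = new ArrayList<>();
            verbose.add("same");
            inferred.add("semantics");
            System.out.println(verbose + " " + inferred);
        }
    }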
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
index abe132df149..ad87d43a1e5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
@@ -299,7 +299,7 @@ public abstract class AbstractRecordTableInventoryCalculator<S, C> extends Abstr
for (Object each : uniqueKeysValues) {
preparedStatement.setObject(parameterIndex++, each);
}
- if (null != param.getShardingColumnsNames() && !param.getShardingColumnsNames().isEmpty()) {
+ if (!param.getShardingColumnsNames().isEmpty()) {
List<Object> shardingColumnsValues = param.getShardingColumnsValues();
ShardingSpherePreconditions.checkNotNull(shardingColumnsValues, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), new RuntimeException("Sharding columns values is null when names not empty.")));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
index 4cefbfb1922..78a8b69e682 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.que
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.jspecify.annotations.NonNull;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -56,7 +57,7 @@ public abstract class AbstractStreamingTableInventoryCalculator<S> extends Abstr
private final TableInventoryCalculateParameter param;
@Override
- public Iterator<S> iterator() {
+ public @NonNull Iterator<S> iterator() {
return new ResultIterator(param);
}
}
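Note on the @NonNull move: lombok's @NonNull generates a runtime null check, while org.jspecify.annotations.NonNull is a type-use annotation consumed only by static analysis, which is why it sits directly on the return type (public @NonNull Iterator<S> iterator()). A minimal sketch, assuming the jspecify dependency this commit migrates to:

    import java.util.Collections;
    import java.util.Iterator;

    import org.jspecify.annotations.NonNull;

    final class SingletonIterable<T> implements Iterable<T> {

        private final T value;

        SingletonIterable(final T value) {
            this.value = value;
        }

        @Override
        public @NonNull Iterator<T> iterator() {
            // Documents "never returns null" for analyzers; no runtime check is added.
            return Collections.singletonList(value).iterator();
        }
    }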
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index f4196932d26..9d9459d3bd1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -54,7 +54,7 @@ public final class PipelineContextManagerLifecycleListener implements ContextMan
}
PipelineContextKey contextKey = new PipelineContextKey(preSelectedDatabaseName, contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType());
PipelineContextManager.putContext(contextKey, contextManager);
- PipelineMetaDataNodeWatcher.getInstance(contextKey);
+ PipelineMetaDataNodeWatcher.init(contextKey);
ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
try {
dispatchEnablePipelineJobStartEvent(contextKey);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
index 5c854dc3dd3..23fd1128de5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
@@ -19,9 +19,9 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.model;
import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
+import org.jspecify.annotations.NonNull;
/**
* Column meta data.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
index fbe6405eab6..2e930186435 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -19,10 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.model;
import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.NonNull;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
+import org.jspecify.annotations.NonNull;
import java.util.ArrayList;
import java.util.Collection;
@@ -54,7 +54,7 @@ public final class PipelineTableMetaData {
@Getter
private final Collection<PipelineIndexMetaData> uniqueIndexes;
- public PipelineTableMetaData(final String name, final Map<ShardingSphereIdentifier, PipelineColumnMetaData> columnMetaDataMap, final Collection<PipelineIndexMetaData> uniqueIndexes) {
+ public PipelineTableMetaData(final @NonNull String name, final Map<ShardingSphereIdentifier, PipelineColumnMetaData> columnMetaDataMap, final Collection<PipelineIndexMetaData> uniqueIndexes) {
this.name = name;
this.columnMetaDataMap = columnMetaDataMap;
List<PipelineColumnMetaData> columnMetaDataList = new ArrayList<>(columnMetaDataMap.values());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
index 2c4f3e42bb5..88e74893536 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -73,12 +73,11 @@ public final class PipelineMetaDataNodeWatcher {
}
/**
- * Get instance.
+ * Initialize for context key.
*
* @param contextKey context key
- * @return instance
*/
- public static PipelineMetaDataNodeWatcher getInstance(final PipelineContextKey contextKey) {
- return INSTANCE_MAP.computeIfAbsent(contextKey, PipelineMetaDataNodeWatcher::new);
+ public static void init(final PipelineContextKey contextKey) {
+ INSTANCE_MAP.computeIfAbsent(contextKey, PipelineMetaDataNodeWatcher::new);
}
}
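The getInstance-to-init rename resolves 'result of method call ignored': the only caller discarded the returned watcher, so the side effect is now the declared contract. A sketch of the pattern with stand-in names:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class WatcherRegistrySketch {

        private static final Map<String, WatcherRegistrySketch> INSTANCE_MAP = new ConcurrentHashMap<>();

        private WatcherRegistrySketch(final String contextKey) {
            // register listeners for the key
        }

        // void return: populating INSTANCE_MAP is the point, so no caller
        // is tempted to ignore a returned instance.
        static void init(final String contextKey) {
            INSTANCE_MAP.computeIfAbsent(contextKey, WatcherRegistrySketch::new);
        }
    }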
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/QPSJobRateLimitAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/QPSJobRateLimitAlgorithm.java
index e2c474aa310..f24dffab605 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/QPSJobRateLimitAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/QPSJobRateLimitAlgorithm.java
@@ -29,6 +29,7 @@ import java.util.Properties;
/**
* QPS job rate limit algorithm.
*/
+@SuppressWarnings("UnstableApiUsage")
public final class QPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
private static final String QPS_KEY = "qps";
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/TPSJobRateLimitAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/TPSJobRateLimitAlgorithm.java
index d2f4a377499..19fbe6478d5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/TPSJobRateLimitAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/type/TPSJobRateLimitAlgorithm.java
@@ -29,6 +29,7 @@ import java.util.Properties;
/**
* TPS job rate limit algorithm.
*/
+@SuppressWarnings("UnstableApiUsage")
public final class TPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
private static final String TPS_KEY = "tps";
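The suppression added to both algorithms targets IDEA's 'Unstable API Usage' inspection; these classes presumably rely on Guava's RateLimiter, which is annotated @Beta. A sketch of why a class-level suppression is the tidy fix (illustrative class, not the repository's):

    import com.google.common.util.concurrent.RateLimiter;

    // RateLimiter is @Beta, so every use is flagged; suppressing once at class
    // level acknowledges the dependency instead of annotating each call site.
    @SuppressWarnings("UnstableApiUsage")
    public final class QpsLimiterSketch {

        private final RateLimiter rateLimiter = RateLimiter.create(100D);

        public void beforeQuery() {
            rateLimiter.acquire();  // blocks until a permit is available
        }
    }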
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
index 9747ebda106..10d241f37d1 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
@@ -29,15 +29,9 @@ import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
class RecordTableInventoryCheckCalculatedResultTest {
- @Test
- void assertNotEqualsWithNull() {
- assertFalse(new RecordTableInventoryCheckCalculatedResult(0, Collections.emptyList()).equals(null));
- }
-
@Test
void assertEqualsWithSameObject() {
RecordTableInventoryCheckCalculatedResult calculatedResult = new RecordTableInventoryCheckCalculatedResult(0, Collections.emptyList());
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyCheckerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyCheckerTest.java
index 80fa678a5af..79080c9fedb 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyCheckerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyCheckerTest.java
@@ -36,18 +36,20 @@ class DataMatchTableDataConsistencyCheckerTest {
@Test
void assertChunkSizeInitSuccess() {
for (String each : Arrays.asList("1", "1000")) {
- DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker();
- checker.init(PropertiesBuilder.build(new Property("chunk-size", each)));
- String actual = Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("chunkSize"), checker).toString();
- assertThat(actual, is(each));
+ try (DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker()) {
+ checker.init(PropertiesBuilder.build(new Property("chunk-size", each)));
+ String actual = Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("chunkSize"), checker).toString();
+ assertThat(actual, is(each));
+ }
}
}
@Test
void assertChunkSizeInitFailure() {
- assertThrows(PipelineInvalidParameterException.class, () -> new DataMatchTableDataConsistencyChecker().init(PropertiesBuilder.build(new Property("chunk-size", "xyz"))));
- for (String each : Arrays.asList("0", "-1")) {
- assertThrows(PipelineInvalidParameterException.class, () -> new DataMatchTableDataConsistencyChecker().init(PropertiesBuilder.build(new Property("chunk-size", each))));
+ for (String each : Arrays.asList("0", "-1", "xyz")) {
+ try (DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker()) {
+ assertThrows(PipelineInvalidParameterException.class, () -> checker.init(PropertiesBuilder.build(new Property("chunk-size", each))));
+ }
}
}
@@ -55,15 +57,18 @@ class DataMatchTableDataConsistencyCheckerTest {
@Test
void assertStreamingRangeTypeInitSuccess() {
for (String each : Arrays.asList("small", "large", "SMALL", "LARGE")) {
- DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker();
- checker.init(PropertiesBuilder.build(new Property("streaming-range-type", each)));
- String actual = Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("streamingRangeType"), checker).toString();
- assertThat(actual, is(each.toUpperCase()));
+ try (DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker()) {
+ checker.init(PropertiesBuilder.build(new Property("streaming-range-type", each)));
+ String actual = Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("streamingRangeType"), checker).toString();
+ assertThat(actual, is(each.toUpperCase()));
+ }
}
}
@Test
void assertStreamingRangeTypeInitFailure() {
- assertThrows(PipelineInvalidParameterException.class, () -> new DataMatchTableDataConsistencyChecker().init(PropertiesBuilder.build(new Property("streaming-range-type", "xyz"))));
+ try (DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker()) {
+ assertThrows(PipelineInvalidParameterException.class, () -> checker.init(PropertiesBuilder.build(new Property("streaming-range-type", "xyz"))));
+ }
}
}
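The reshaped tests above fix 'AutoCloseable used without try-with-resources'. The essential difference is that close() now runs even when init() throws, which the bare new Checker().init(...) form never guaranteed. A minimal sketch (CheckerSketch stands in for DataMatchTableDataConsistencyChecker):

    final class CheckerSketch implements AutoCloseable {

        void init(final String chunkSize) {
            // validation that may throw before the caller ever reaches close()
        }

        @Override
        public void close() {
            // release pooled resources
        }

        public static void main(final String[] args) {
            try (CheckerSketch checker = new CheckerSketch()) {
                checker.init("1000");
            }  // close() is invoked here on both the normal and exceptional path
        }
    }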
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
index 8a8b1a783fa..3bfde4bc9ab 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
@@ -75,21 +75,21 @@ class CRC32TableInventoryCheckCalculatorTest {
@Test
void assertCalculateSuccess() throws SQLException {
- PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
+ PreparedStatement preparedStatement0 = mockPreparedStatement(123L);
when(connection.prepareStatement("SELECT CRC32(foo_col) FROM foo_tbl")).thenReturn(preparedStatement0);
- PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
+ PreparedStatement preparedStatement1 = mockPreparedStatement(456L);
when(connection.prepareStatement("SELECT CRC32(bar_col) FROM foo_tbl")).thenReturn(preparedStatement1);
Iterator<TableInventoryCheckCalculatedResult> actual = new CRC32TableInventoryCheckCalculator().calculate(parameter).iterator();
assertThat(actual.next().getRecordsCount(), is(10));
assertFalse(actual.hasNext());
}
- private PreparedStatement mockPreparedStatement(final long expectedCRC32Result, final int expectedRecordsCount) throws SQLException {
+ private PreparedStatement mockPreparedStatement(final long expectedCRC32Result) throws SQLException {
ResultSet resultSet = mock(ResultSet.class);
PreparedStatement result = mock(PreparedStatement.class, RETURNS_DEEP_STUBS);
when(result.executeQuery()).thenReturn(resultSet);
when(resultSet.getLong(1)).thenReturn(expectedCRC32Result);
- when(resultSet.getInt(2)).thenReturn(expectedRecordsCount);
+ when(resultSet.getInt(2)).thenReturn(10);
return result;
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
index 57ee254b7df..e100d6880a9 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
@@ -61,8 +61,7 @@ class PipelineJobProgressDetectorTest {
@Test
void assertIsInventoryFinishedWhenCollectionElementIsNull() {
- TransmissionJobItemProgress jobItemProgress = null;
- assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(jobItemProgress)));
+ assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(null)));
}
@Test
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
index a4752d3acb1..20762731df5 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
@@ -125,7 +125,7 @@ class MySQLBinlogClientTest {
@AfterEach
void tearDown() {
eventLoopGroup.shutdownGracefully();
- Thread.interrupted();
+ Thread.currentThread().interrupt();
}
@Test
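The one-line tearDown() fix above hinges on a subtle API difference: static Thread.interrupted() tests and clears the current thread's interrupt status, whereas Thread.currentThread().interrupt() sets it. A minimal demonstration:

    public final class InterruptFlagSketch {

        public static void main(final String[] args) {
            Thread.currentThread().interrupt();                           // set the flag
            System.out.println(Thread.interrupted());                     // true, and CLEARS the flag
            System.out.println(Thread.currentThread().isInterrupted());   // false now
            Thread.currentThread().interrupt();                           // re-assert, as the fixed tearDown() does
            System.out.println(Thread.currentThread().isInterrupted());   // true
        }
    }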
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 3c4e9314c7c..bcd8efa300e 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -256,10 +256,10 @@ class MySQLBinlogEventPacketDecoderTest {
@Test
void assertDecodeFormatDescriptionEventWithZeroChecksumAndExtraBytes() {
MySQLBinlogEventPacketDecoder decoderWithoutChecksum = new MySQLBinlogEventPacketDecoder(0, new ConcurrentHashMap<>(), true);
- ByteBuf byteBuf = createFormatDescriptionEventByteBuf(0, 4);
+ ByteBuf byteBuf = createFormatDescriptionEventByteBuf(4);
decoderWithoutChecksum.decode(channelHandlerContext, byteBuf, new LinkedList<>());
assertThat(byteBuf.readableBytes(), is(0));
- ByteBuf byteBufWithWarning = createFormatDescriptionEventByteBuf(0, 3);
+ ByteBuf byteBufWithWarning = createFormatDescriptionEventByteBuf(3);
decoderWithoutChecksum.decode(channelHandlerContext, byteBufWithWarning, new LinkedList<>());
assertThat(byteBufWithWarning.readableBytes(), is(0));
}
@@ -303,9 +303,10 @@ class MySQLBinlogEventPacketDecoderTest {
return result;
}
- private ByteBuf createFormatDescriptionEventByteBuf(final int checksumLength, final int extraBytesLength) {
+ private ByteBuf createFormatDescriptionEventByteBuf(final int extraBytesLength) {
ByteBuf result = Unpooled.buffer();
int bodyLength = 2 + 50 + 4 + 1 + (MySQLBinlogEventType.FORMAT_DESCRIPTION_EVENT.getValue() - 1) + 1 + 1 + extraBytesLength;
+ int checksumLength = 0;
int eventSize = MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH + bodyLength + checksumLength;
result.writeByte(0);
result.writeIntLE(1);
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index 249a2ad2978..bcab78b4570 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -141,12 +141,7 @@ class MySQLIncrementalDumperTest {
MySQLBaseRowsBinlogEvent unsupportedEvent = mock(MySQLBaseRowsBinlogEvent.class);
when(unsupportedEvent.getDatabaseName()).thenReturn("test");
when(unsupportedEvent.getTableName()).thenReturn("t_order");
- MySQLWriteRowsBinlogEvent filteredEvent = new MySQLWriteRowsBinlogEvent("binlog-000001", 13L, 2L, "other_db", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}));
- MySQLWriteRowsBinlogEvent writeEvent = new MySQLWriteRowsBinlogEvent("binlog-000001", 4L, 5L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}));
- MySQLUpdateRowsBinlogEvent updateEvent = new MySQLUpdateRowsBinlogEvent("binlog-000001", 5L, 6L, "test", "t_order",
- Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "UPDATED"}));
- MySQLDeleteRowsBinlogEvent deleteEvent = new MySQLDeleteRowsBinlogEvent("binlog-000001", 6L, 7L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}));
- List<MySQLBaseBinlogEvent> firstPollEvents = Arrays.asList(new PlaceholderBinlogEvent("binlog-000001", 3L, 1L), writeEvent, updateEvent, deleteEvent, filteredEvent, unsupportedEvent);
+ List<MySQLBaseBinlogEvent> firstPollEvents = getMySQLBinlogEvents(unsupportedEvent);
PipelineChannel channel = mock(PipelineChannel.class);
MySQLIncrementalDumper dumper = new MySQLIncrementalDumper(dumperContext, new MySQLBinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
MySQLBinlogClient client = mock(MySQLBinlogClient.class);
@@ -179,4 +174,13 @@ class MySQLIncrementalDumperTest {
assertThat(pushed.get(4), isA(PlaceholderRecord.class));
assertFalse(dumperThread.isAlive());
}
+
+ private List<MySQLBaseBinlogEvent> getMySQLBinlogEvents(final MySQLBaseRowsBinlogEvent unsupportedEvent) {
+ MySQLWriteRowsBinlogEvent filteredEvent = new MySQLWriteRowsBinlogEvent("binlog-000001", 13L, 2L, "other_db", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ MySQLWriteRowsBinlogEvent writeEvent = new MySQLWriteRowsBinlogEvent("binlog-000001", 4L, 5L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ MySQLUpdateRowsBinlogEvent updateEvent = new MySQLUpdateRowsBinlogEvent("binlog-000001", 5L, 6L, "test", "t_order",
+ Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "UPDATED"}));
+ MySQLDeleteRowsBinlogEvent deleteEvent = new MySQLDeleteRowsBinlogEvent("binlog-000001", 6L, 7L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ return Arrays.asList(new PlaceholderBinlogEvent("binlog-000001", 3L, 1L), writeEvent, updateEvent, deleteEvent, filteredEvent, unsupportedEvent);
+ }
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPlugin.java
index 8b4aa61b9e4..0769caaeba0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPlugin.java
@@ -136,22 +136,19 @@ public final class TestDecodingPlugin implements DecodingPlugin {
}
private Object readColumn(final ByteBuffer data) {
- readColumnName(data);
+ skipColumnName(data);
String columnType = readColumnType(data);
data.get();
return readColumnData(data, columnType);
}
- private String readColumnName(final ByteBuffer data) {
- StringBuilder eventType = new StringBuilder();
+ private void skipColumnName(final ByteBuffer data) {
while (data.hasRemaining()) {
char c = (char) data.get();
if ('[' == c) {
- return eventType.toString();
+ return;
}
- eventType.append(c);
}
- return eventType.toString();
}
private String readColumnType(final ByteBuffer data) {
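The rename to skipColumnName settles 'result of method call ignored': the old method built a StringBuilder whose result every caller discarded, so the new version just advances the buffer past the '[' delimiter. A runnable sketch of the same idea (the sample payload is illustrative):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    final class SkipSketch {

        static void skipColumnName(final ByteBuffer data) {
            // Consume bytes up to and including '[' without allocating anything.
            while (data.hasRemaining()) {
                if ('[' == (char) data.get()) {
                    return;
                }
            }
        }

        public static void main(final String[] args) {
            ByteBuffer data = ByteBuffer.wrap("col_name[integer]:1".getBytes(StandardCharsets.US_ASCII));
            skipColumnName(data);
            System.out.println((char) data.get());  // 'i' -- positioned just past '['
        }
    }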
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/PostgreSQLLogicalReplicationTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/PostgreSQLLogicalReplicationTest.java
index ad7f6fec23d..69812580de3 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/PostgreSQLLogicalReplicationTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/PostgreSQLLogicalReplicationTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wa
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.decode.BaseLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.decode.PostgreSQLLogSequenceNumber;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -29,6 +30,7 @@ import org.postgresql.PGConnection;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationConnection;
+import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.ChainedStreamBuilder;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
@@ -88,7 +90,12 @@ class PostgreSQLLogicalReplicationTest {
when(chainedLogicalStreamBuilder.withSlotOption(anyString(), eq(true))).thenReturn(chainedLogicalStreamBuilder, chainedLogicalStreamBuilder);
BaseLogSequenceNumber basePosition = new PostgreSQLLogSequenceNumber(startPosition);
logicalReplication.createReplicationStream(connection, "", basePosition);
- verify(chainedLogicalStreamBuilder).start();
+ PGReplicationStream stream = null;
+ try {
+ stream = verify(chainedLogicalStreamBuilder).start();
+ } finally {
+ QuietlyCloser.close(stream);
+ }
}
@Test
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
index 56c072f7745..26ac8ebadd1 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
@@ -169,12 +169,12 @@ class TestDecodingPluginTest {
BaseTimestampUtils timestampUtils = new BaseTimestampUtils() {
@Override
- public Time toTime(final Calendar cal, final String input) throws SQLException {
+ public Time toTime(final Calendar cal, final String input) {
return Time.valueOf(input);
}
@Override
- public Timestamp toTimestamp(final Calendar cal, final String input) throws SQLException {
+ public Timestamp toTimestamp(final Calendar cal, final String input) {
return Timestamp.valueOf(input);
}
};
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/ddl/column/PostgreSQLColumnPropertiesAppenderTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/ddl/column/PostgreSQLColumnPropertiesAppenderTest.java
index 273a3d83d90..69375652600 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/ddl/column/PostgreSQLColumnPropertiesAppenderTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/ddl/column/PostgreSQLColumnPropertiesAppenderTest.java
@@ -862,7 +862,7 @@ class PostgreSQLColumnPropertiesAppenderTest {
column.put("cltype", "numeric(5,2)");
Map<String, Object> unmatchedColumn = createUnmatchedColumn();
when(templateExecutor.executeByTemplate(anyMap(), eq("component/columns/%s/properties.ftl"))).thenReturn(Arrays.asList(column, unmatchedColumn));
- Map<String, Object> editModeTypesEntry = createEditModeTypesEntry("alpha");
+ Map<String, Object> editModeTypesEntry = createEditModeTypesEntry();
when(templateExecutor.executeByTemplate(anyMap(), eq("component/columns/%s/edit_mode_types_multi.ftl"))).thenReturn(Collections.singleton(editModeTypesEntry));
appender.append(context);
Collection<?> columns = (Collection<?>) context.get("columns");
@@ -916,10 +916,10 @@ class PostgreSQLColumnPropertiesAppenderTest {
return result;
}
- private Map<String, Object> createEditModeTypesEntry(final String... editTypes) {
+ private Map<String, Object> createEditModeTypesEntry() {
Map<String, Object> entry = new LinkedHashMap<>();
entry.put("main_oid", "1");
- entry.put("edit_types", mockSQLArray(editTypes));
+ entry.put("edit_types", mockSQLArray(new String[]{"alpha"}));
return entry;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/template/PostgreSQLPipelineFreemarkerManagerTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/template/PostgreSQLPipelineFreemarkerManagerTest.java
index fa82ae69057..40d140d8d01 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/template/PostgreSQLPipelineFreemarkerManagerTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/template/PostgreSQLPipelineFreemarkerManagerTest.java
@@ -25,9 +25,9 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
class PostgreSQLPipelineFreemarkerManagerTest {
@@ -51,23 +51,10 @@ class PostgreSQLPipelineFreemarkerManagerTest {
@Test
void assertCreateTableTemplateRendersNormalizedSequenceNumbers() {
- Map<String, Object> column = new LinkedHashMap<>(16, 1F);
- column.put("name", "id");
- column.put("cltype", "int4");
- column.put("displaytypname", "integer");
- column.put("attnotnull", true);
- column.put("attidentity", "a");
- column.put("colconstype", "i");
- column.put("seqincrement", 1L);
- column.put("seqstart", 1L);
- column.put("seqmin", 1L);
- column.put("seqmax", 2147483647L);
- column.put("seqcache", 1L);
- column.put("seqcycle", false);
Map<String, Object> dataModel = new LinkedHashMap<>(8, 1F);
dataModel.put("schema", "public");
dataModel.put("name", "t_order");
- dataModel.put("columns", Collections.singletonList(column));
+ dataModel.put("columns", Collections.singletonList(getColumn()));
dataModel.put("primary_key", Collections.emptyList());
dataModel.put("unique_constraint", Collections.emptyList());
dataModel.put("foreign_key", Collections.emptyList());
@@ -84,4 +71,21 @@ class PostgreSQLPipelineFreemarkerManagerTest {
String expectedSql = "CREATETABLEIFNOTEXISTSpublic.t_order(idintegerNOTNULLGENERATEDALWAYSASIDENTITY(INCREMENT1START1MINVALUE1MAXVALUE2147483647CACHE1))";
assertThat(compactSql, is(expectedSql));
}
+
+ private static Map<String, Object> getColumn() {
+ Map<String, Object> column = new LinkedHashMap<>(16, 1F);
+ column.put("name", "id");
+ column.put("cltype", "int4");
+ column.put("displaytypname", "integer");
+ column.put("attnotnull", true);
+ column.put("attidentity", "a");
+ column.put("colconstype", "i");
+ column.put("seqincrement", 1L);
+ column.put("seqstart", 1L);
+ column.put("seqmin", 1L);
+ column.put("seqmax", 2147483647L);
+ column.put("seqcache", 1L);
+ column.put("seqcycle", false);
+ return column;
+ }
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 12c10ca71d0..d5624f749b8 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -111,7 +111,7 @@ public final class CDCJob implements PipelineJob {
Collection<CDCJobItemContext> jobItemContexts = new LinkedList<>();
for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
if (jobRunnerManager.isStopping()) {
- log.info("Job is stopping, ignore.");
+ log.info("Execute, job is stopping, ignore.");
return;
}
TransmissionJobItemProgress jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
@@ -238,7 +238,7 @@ public final class CDCJob implements PipelineJob {
@Override
public void onSuccess() {
if (jobItemContext.isStopping()) {
- log.info("Job is stopping, ignore.");
+ log.info("onSuccess, job is stopping, ignore.");
return;
}
log.info("All {} tasks finished successful.", identifier);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
index 8a6e2bc0374..2401132527b 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.Pipeli
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.Objects;
import java.util.PriorityQueue;
import static org.hamcrest.CoreMatchers.is;
@@ -39,8 +40,8 @@ class CSNRecordsComparatorTest {
queue.add(new CSNRecords(1L, channelProgressPair, Collections.emptyList()));
queue.add(new CSNRecords(2L, channelProgressPair, Collections.emptyList()));
assertThat(queue.size(), is(3));
- assertThat(queue.poll().getCsn(), is(1L));
- assertThat(queue.poll().getCsn(), is(2L));
- assertThat(queue.poll().getCsn(), is(3L));
+ assertThat(Objects.requireNonNull(queue.poll()).getCsn(), is(1L));
+ assertThat(Objects.requireNonNull(queue.poll()).getCsn(), is(2L));
+ assertThat(Objects.requireNonNull(queue.poll()).getCsn(), is(3L));
}
}
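Queue.poll() is declared nullable (it returns null on an empty queue), so chaining .getCsn() directly trips the nullability inspection even though the test just added three elements; Objects.requireNonNull encodes that assumption. A compact illustration:

    import java.util.Objects;
    import java.util.PriorityQueue;

    public final class PollSketch {

        public static void main(final String[] args) {
            PriorityQueue<Long> queue = new PriorityQueue<>();
            queue.add(2L);
            queue.add(1L);
            // poll() is nullable; requireNonNull documents "cannot be empty here"
            // and silences the data-flow warning.
            long first = Objects.requireNonNull(queue.poll());
            System.out.println(first);  // 1
        }
    }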
diff --git a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
index 5cdae219eeb..9f86a2641a8 100644
--- a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
+++ b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
@@ -35,7 +35,7 @@ class ConsistencyCheckSequenceTest {
int currentSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
assertThat(currentSequence = ConsistencyCheckSequence.getNextSequence(currentSequence), is(2));
assertThat(currentSequence = ConsistencyCheckSequence.getNextSequence(currentSequence), is(3));
- assertThat(currentSequence = ConsistencyCheckSequence.getNextSequence(currentSequence), is(1));
+ assertThat(ConsistencyCheckSequence.getNextSequence(currentSequence), is(1));
}
@Test