This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5c91008360e Fix sonar issue of DataSourceImporter (#25643)
5c91008360e is described below
commit 5c91008360e17e37aa1e0b10b1ccb2fe92fd92b2
Author: Liang Zhang <[email protected]>
AuthorDate: Sat May 13 20:31:26 2023 +0800
Fix sonar issue of DataSourceImporter (#25643)
* Fix sonar issue of CalculationContext
* Fix sonar issue of DataSourceImporter
---
.../api/executor/AbstractLifecycleExecutor.java | 4 ++--
.../pipeline/core/importer/DataSourceImporter.java | 25 +++++++++++-----------
2 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
index 89ad3b69224..461034577f3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
@@ -46,7 +46,7 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
private volatile long startTimeMillis;
@Override
- public void start() {
+ public final void start() {
running = true;
startTimeMillis = System.currentTimeMillis();
runBlocking();
@@ -78,7 +78,7 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
protected abstract void doStop() throws Exception;
- protected void cancelStatement(final Statement statement) throws SQLException {
+ protected final void cancelStatement(final Statement statement) throws SQLException {
if (null == statement || statement.isClosed()) {
return;
}
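
The AbstractLifecycleExecutor hunks above only add the final modifier to start() and cancelStatement(), so the lifecycle entry points cannot be overridden and subclasses customize behavior through the protected hooks instead. A minimal standalone sketch of that template-method shape (hypothetical class and hook names, not the ShardingSphere code):

    // Hypothetical illustration: the public lifecycle methods are final and own the
    // state transitions; subclasses only implement the protected hooks.
    public abstract class LifecycleTemplate {
        
        private volatile boolean running;
        
        public final void start() {
            running = true;
            doStart();
        }
        
        public final void stop() {
            if (!running) {
                return;
            }
            running = false;
            doStop();
        }
        
        protected final boolean isRunning() {
            return running;
        }
        
        protected abstract void doStart();
        
        protected abstract void doStop();
    }

Keeping start() final means the running flag and start-timestamp handling in the real class stay in one place, regardless of what subclasses do in their hooks.
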
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 163b4fd39e8..5fc458ea99b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -53,6 +53,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -77,11 +78,11 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
private final JobRateLimitAlgorithm rateLimitAlgorithm;
- private volatile Statement batchInsertStatement;
+ private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();
- private volatile Statement updateStatement;
+ private final AtomicReference<Statement> updateStatement = new AtomicReference<>();
- private volatile Statement batchDeleteStatement;
+ private final AtomicReference<Statement> batchDeleteStatement = new AtomicReference<>();
public DataSourceImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel, final PipelineJobProgressListener jobProgressListener) {
@@ -210,7 +211,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
DataRecord dataRecord = dataRecords.get(0);
String insertSql = pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
- batchInsertStatement = preparedStatement;
+ batchInsertStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
for (int i = 0; i < each.getColumnCount(); i++) {
@@ -220,7 +221,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
}
preparedStatement.executeBatch();
} finally {
- batchInsertStatement = null;
+ batchInsertStatement.set(null);
}
}
@@ -240,7 +241,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record);
String updateSql = pipelineSqlBuilder.buildUpdateSQL(getSchemaName(record.getTableName()), record, conditionColumns);
try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
- updateStatement = preparedStatement;
+ updateStatement.set(preparedStatement);
for (int i = 0; i < updatedColumns.size(); i++) {
preparedStatement.setObject(i + 1, updatedColumns.get(i).getValue());
}
@@ -259,7 +260,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}, conditionColumns={}", updateCount, updateSql, updatedColumns, conditionColumns);
}
} finally {
- updateStatement = null;
+ updateStatement.set(null);
}
}
@@ -268,7 +269,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName()));
String deleteSQL = pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
- batchDeleteStatement = preparedStatement;
+ batchDeleteStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
for (int i = 0; i < conditionColumns.size(); i++) {
@@ -285,7 +286,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
log.warn("batchDelete failed, counts={}, sql={}, conditionColumns={}", Arrays.toString(counts), deleteSQL, conditionColumns);
}
} finally {
- batchDeleteStatement = null;
+ batchDeleteStatement.set(null);
}
}
@@ -309,8 +310,8 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
@Override
protected void doStop() throws SQLException {
- cancelStatement(batchInsertStatement);
- cancelStatement(updateStatement);
- cancelStatement(batchDeleteStatement);
+ cancelStatement(batchInsertStatement.get());
+ cancelStatement(updateStatement.get());
+ cancelStatement(batchDeleteStatement.get());
}
}
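
The DataSourceImporter hunks replace the three volatile Statement fields with final AtomicReference<Statement> holders: each execute path publishes its in-flight PreparedStatement into the holder, clears it in the finally block, and doStop() cancels whatever statement the holder currently contains. A rough standalone sketch of that holder pattern (hypothetical CancellableWriter class, plain JDBC, not the actual importer code):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.concurrent.atomic.AtomicReference;
    
    // Hypothetical illustration: the worker thread publishes the statement it is
    // executing, and stop() called from another thread cancels whichever statement
    // is currently published.
    public final class CancellableWriter {
        
        private final AtomicReference<Statement> currentStatement = new AtomicReference<>();
        
        public void write(final Connection connection, final String sql) throws SQLException {
            try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                currentStatement.set(preparedStatement);
                preparedStatement.setQueryTimeout(30);
                preparedStatement.execute();
            } finally {
                currentStatement.set(null);
            }
        }
        
        public void stop() throws SQLException {
            Statement statement = currentStatement.get();
            if (null != statement && !statement.isClosed()) {
                statement.cancel();
            }
        }
    }

The holder field itself is final, which is presumably what the Sonar finding was steering toward: the field reference never changes, only the value inside the AtomicReference does.
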