This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f8ae1216ad4 Fix sonar issue of IncrementalTaskProgress (#25637)
f8ae1216ad4 is described below
commit f8ae1216ad4d9ae9a0a4821568003b99878acd33
Author: Liang Zhang <[email protected]>
AuthorDate: Sat May 13 12:21:33 2023 +0800
Fix sonar issue of IncrementalTaskProgress (#25637)
* Fix sonar issue of IncrementalTaskProgress
---
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 3 +-
.../api/task/progress/IncrementalTaskProgress.java | 51 +++++++++++++++++++---
...YamlJobItemIncrementalTasksProgressSwapper.java | 3 +-
.../data/pipeline/core/task/IncrementalTask.java | 3 +-
.../command/query/PostgreSQLCommand.java | 2 +-
5 files changed, 49 insertions(+), 13 deletions(-)
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index b5b87dc36cc..6787dcfbe21 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -169,8 +169,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
jobItemProgress.setDataSourceName(dumperConfig.getDataSourceName());
- IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress();
-
incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null,
dumperConfig, dataSourceManager));
+ IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null,
dumperConfig, dataSourceManager));
jobItemProgress.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
i,
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
index cdc7ecbaab8..167c862835d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
@@ -17,18 +17,57 @@
package org.apache.shardingsphere.data.pipeline.api.task.progress;
-import lombok.Getter;
-import lombok.Setter;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* Incremental task progress.
*/
-@Getter
-@Setter
public final class IncrementalTaskProgress implements TaskProgress {
- private volatile IngestPosition<?> position;
+ private final AtomicReference<IngestPosition<?>> position = new
AtomicReference<>();
+
+ private final AtomicReference<IncrementalTaskDelay> incrementalTaskDelay =
new AtomicReference<>();
+
+ public IncrementalTaskProgress(final IngestPosition<?> position) {
+ this.position.set(position);
+ incrementalTaskDelay.set(new IncrementalTaskDelay());
+ }
+
+ /**
+ * Get position.
+ *
+ * @return position
+ */
+ public IngestPosition<?> getPosition() {
+ return position.get();
+ }
+
+ /**
+ * Set position.
+ *
+ * @param position position
+ */
+ public void setPosition(final IngestPosition<?> position) {
+ this.position.set(position);
+ }
+
+ /**
+ * Get incremental task delay.
+ *
+ * @return incremental task delay
+ */
+ public IncrementalTaskDelay getIncrementalTaskDelay() {
+ return incrementalTaskDelay.get();
+ }
- private IncrementalTaskDelay incrementalTaskDelay = new
IncrementalTaskDelay();
+ /**
+ * Set incremental task delay.
+ *
+ * @param incrementalTaskDelay incremental task delay
+ */
+ public void setIncrementalTaskDelay(final IncrementalTaskDelay
incrementalTaskDelay) {
+ this.incrementalTaskDelay.set(incrementalTaskDelay);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index e2c5b84ebe1..2c5a97f508a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -58,10 +58,9 @@ public final class
YamlJobItemIncrementalTasksProgressSwapper {
if (null == yamlProgress) {
return new JobItemIncrementalTasksProgress(null);
}
- IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
// TODO consider to remove parameter databaseType
PositionInitializer positionInitializer =
PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class,
databaseType);
-
taskProgress.setPosition(positionInitializer.init(yamlProgress.getPosition()));
+ IncrementalTaskProgress taskProgress = new
IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
return new JobItemIncrementalTasksProgress(taskProgress);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 15a7d97a663..530c401da71 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -86,8 +86,7 @@ public final class IncrementalTask implements PipelineTask,
AutoCloseable {
}
private IncrementalTaskProgress createIncrementalTaskProgress(final
IngestPosition<?> position, final InventoryIncrementalJobItemProgress
jobItemProgress) {
- IncrementalTaskProgress result = new IncrementalTaskProgress();
- result.setPosition(position);
+ IncrementalTaskProgress result = new IncrementalTaskProgress(position);
if (null != jobItemProgress && null !=
jobItemProgress.getIncremental()) {
Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
.ifPresent(optional ->
result.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
diff --git
a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
index ab7a5cedaa4..92ec40d9d89 100644
---
a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
+++
b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
@@ -154,7 +154,7 @@ public enum PostgreSQLCommand {
}
/*
- * Refer to <a
href="https://bugs.openjdk.java.net/browse/JDK-8161372">JDK-8161372</a>.
+ * Refer to <a
href="https://bugs.openjdk.java.net/browse/JDK-8161372">JDK-8161372</a>.
*/
@SuppressWarnings("OptionalAssignedToNull")
private static Optional<PostgreSQLCommand> getPostgreSQLCommand(final
Class<? extends SQLStatement> sqlStatementClass) {