This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f55ce2a94c3 Fix sonar issues in pipeline (#26487)
f55ce2a94c3 is described below

commit f55ce2a94c34481b3cf8ed30f7e2a09d9bfde482
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jun 21 18:18:24 2023 +0800

    Fix sonar issues in pipeline (#26487)
    
    * Move comment
    
    * Reduce Cognitive Complexity of 
SingleTableInventoryDataConsistencyChecker.check0
---
 ...SingleTableInventoryDataConsistencyChecker.java | 23 ++++++++++++++--------
 .../CoordinatorRegistryCenterInitializer.java      |  2 +-
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 37f6ee9f118..2bb9b58429a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -116,26 +116,33 @@ public final class 
SingleTableInventoryDataConsistencyChecker {
         long sourceRecordsCount = 0;
         long targetRecordsCount = 0;
         boolean contentMatched = true;
-        while (sourceCalculatedResults.hasNext() || 
targetCalculatedResults.hasNext()) {
+        while (sourceCalculatedResults.hasNext() && 
targetCalculatedResults.hasNext()) {
             if (null != readRateLimitAlgorithm) {
                 readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
             }
-            DataConsistencyCalculatedResult sourceCalculatedResult = 
sourceCalculatedResults.hasNext() ? 
waitFuture(executor.submit(sourceCalculatedResults::next)) : null;
-            DataConsistencyCalculatedResult targetCalculatedResult = 
targetCalculatedResults.hasNext() ? 
waitFuture(executor.submit(targetCalculatedResults::next)) : null;
-            sourceRecordsCount += null == sourceCalculatedResult ? 0 : 
sourceCalculatedResult.getRecordsCount();
-            targetRecordsCount += null == targetCalculatedResult ? 0 : 
targetCalculatedResult.getRecordsCount();
+            DataConsistencyCalculatedResult sourceCalculatedResult = 
waitFuture(executor.submit(sourceCalculatedResults::next));
+            DataConsistencyCalculatedResult targetCalculatedResult = 
waitFuture(executor.submit(targetCalculatedResults::next));
+            sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
+            targetRecordsCount += targetCalculatedResult.getRecordsCount();
             contentMatched = Objects.equals(sourceCalculatedResult, 
targetCalculatedResult);
             if (!contentMatched) {
                 log.info("content matched false, jobId={}, sourceTable={}, 
targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
                 break;
             }
-            if (null != sourceCalculatedResult && 
sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+            if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
                 
progressContext.getTableCheckPositions().put(sourceTable.getTableName().getOriginal(),
 sourceCalculatedResult.getMaxUniqueKeyValue().get());
             }
-            if (null != targetCalculatedResult && 
targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+            if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
                 
progressContext.getTableCheckPositions().put(targetTable.getTableName().getOriginal(),
 targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
-            progressContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(null == sourceCalculatedResult ? 0 : 
sourceCalculatedResult.getRecordsCount()));
+            progressContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+        }
+        if (sourceCalculatedResults.hasNext()) {
+            // TODO Refactor DataConsistencyCalculatedResult to represent 
inaccurate number
+            return new DataConsistencyCheckResult(new 
DataConsistencyCountCheckResult(sourceRecordsCount + 1, targetRecordsCount), 
new DataConsistencyContentCheckResult(false));
+        }
+        if (targetCalculatedResults.hasNext()) {
+            return new DataConsistencyCheckResult(new 
DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount + 1), 
new DataConsistencyContentCheckResult(false));
         }
         return new DataConsistencyCheckResult(new 
DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new 
DataConsistencyContentCheckResult(contentMatched));
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
index 36db76a53f1..27e0068f42e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
@@ -47,8 +47,8 @@ public final class CoordinatorRegistryCenterInitializer {
         return result;
     }
     
-    // TODO Merge registry center code in ElasticJob and ShardingSphere mode; 
Use spi to load impl;
     private ZookeeperConfiguration getZookeeperConfig(final 
ClusterPersistRepositoryConfiguration repositoryConfig, final String 
namespaceRelativePath) {
+        // TODO Merge registry center code in ElasticJob and ShardingSphere 
mode; Use SPI to load impl
         Properties props = repositoryConfig.getProps();
         ZookeeperProperties zookeeperProps = new ZookeeperProperties(props);
         String namespace = repositoryConfig.getNamespace() + (null != 
namespaceRelativePath ? namespaceRelativePath : "");

Reply via email to