This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f55ce2a94c3 Fix sonar issues in pipeline (#26487)
f55ce2a94c3 is described below
commit f55ce2a94c34481b3cf8ed30f7e2a09d9bfde482
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jun 21 18:18:24 2023 +0800
Fix sonar issues in pipeline (#26487)
* Move comment
* Reduce Cognitive Complexity of
SingleTableInventoryDataConsistencyChecker.check0
---
...SingleTableInventoryDataConsistencyChecker.java | 23 ++++++++++++++--------
.../CoordinatorRegistryCenterInitializer.java | 2 +-
2 files changed, 16 insertions(+), 9 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 37f6ee9f118..2bb9b58429a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -116,26 +116,33 @@ public final class
SingleTableInventoryDataConsistencyChecker {
long sourceRecordsCount = 0;
long targetRecordsCount = 0;
boolean contentMatched = true;
- while (sourceCalculatedResults.hasNext() ||
targetCalculatedResults.hasNext()) {
+ while (sourceCalculatedResults.hasNext() &&
targetCalculatedResults.hasNext()) {
if (null != readRateLimitAlgorithm) {
readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
- DataConsistencyCalculatedResult sourceCalculatedResult =
sourceCalculatedResults.hasNext() ?
waitFuture(executor.submit(sourceCalculatedResults::next)) : null;
- DataConsistencyCalculatedResult targetCalculatedResult =
targetCalculatedResults.hasNext() ?
waitFuture(executor.submit(targetCalculatedResults::next)) : null;
- sourceRecordsCount += null == sourceCalculatedResult ? 0 :
sourceCalculatedResult.getRecordsCount();
- targetRecordsCount += null == targetCalculatedResult ? 0 :
targetCalculatedResult.getRecordsCount();
+ DataConsistencyCalculatedResult sourceCalculatedResult =
waitFuture(executor.submit(sourceCalculatedResults::next));
+ DataConsistencyCalculatedResult targetCalculatedResult =
waitFuture(executor.submit(targetCalculatedResults::next));
+ sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
+ targetRecordsCount += targetCalculatedResult.getRecordsCount();
contentMatched = Objects.equals(sourceCalculatedResult,
targetCalculatedResult);
if (!contentMatched) {
log.info("content matched false, jobId={}, sourceTable={},
targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
break;
}
- if (null != sourceCalculatedResult &&
sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+ if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
progressContext.getTableCheckPositions().put(sourceTable.getTableName().getOriginal(),
sourceCalculatedResult.getMaxUniqueKeyValue().get());
}
- if (null != targetCalculatedResult &&
targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
+ if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
progressContext.getTableCheckPositions().put(targetTable.getTableName().getOriginal(),
targetCalculatedResult.getMaxUniqueKeyValue().get());
}
- progressContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(null == sourceCalculatedResult ? 0 :
sourceCalculatedResult.getRecordsCount()));
+ progressContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+ }
+ if (sourceCalculatedResults.hasNext()) {
+ // TODO Refactor DataConsistencyCalculatedResult to represent
inaccurate number
+ return new DataConsistencyCheckResult(new
DataConsistencyCountCheckResult(sourceRecordsCount + 1, targetRecordsCount),
new DataConsistencyContentCheckResult(false));
+ }
+ if (targetCalculatedResults.hasNext()) {
+ return new DataConsistencyCheckResult(new
DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount + 1),
new DataConsistencyContentCheckResult(false));
}
return new DataConsistencyCheckResult(new
DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new
DataConsistencyContentCheckResult(contentMatched));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
index 36db76a53f1..27e0068f42e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registry/CoordinatorRegistryCenterInitializer.java
@@ -47,8 +47,8 @@ public final class CoordinatorRegistryCenterInitializer {
return result;
}
- // TODO Merge registry center code in ElasticJob and ShardingSphere mode;
Use spi to load impl;
private ZookeeperConfiguration getZookeeperConfig(final
ClusterPersistRepositoryConfiguration repositoryConfig, final String
namespaceRelativePath) {
+ // TODO Merge registry center code in ElasticJob and ShardingSphere
mode; Use SPI to load impl
Properties props = repositoryConfig.getProps();
ZookeeperProperties zookeeperProps = new ZookeeperProperties(props);
String namespace = repositoryConfig.getNamespace() + (null !=
namespaceRelativePath ? namespaceRelativePath : "");