This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c0cd59c Add data consistency check for scaling (#8925)
c0cd59c is described below
commit c0cd59c82aea7c019525b49bdec381962edbd60f
Author: Haoran Meng <[email protected]>
AuthorDate: Thu Jan 7 12:07:02 2021 +0800
Add data consistency check for scaling (#8925)
* Add data consistency check for scaling
* Add data consistency check for scaling
---
.../core/scaling/ScalingServiceHolder.java | 29 ++++++++++++++++++++++
.../scaling/callback/ScalingResultCallback.java | 16 +++++++++++-
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
index e791ba5..1048c85 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
@@ -18,18 +18,22 @@
package org.apache.shardingsphere.governance.core.scaling;
import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
import
org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
import
org.apache.shardingsphere.governance.core.scaling.callback.ScalingResultCallback;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+import
org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
+import java.util.Map;
import java.util.Optional;
/**
* Scaling service holder.
*/
+@Slf4j
public final class ScalingServiceHolder {
private static final ScalingServiceHolder INSTANCE = new
ScalingServiceHolder();
@@ -72,4 +76,29 @@ public final class ScalingServiceHolder {
ShardingSphereEventBus.getInstance().post(new
SwitchRuleConfigurationEvent(event.getSchemaName(),
event.getRuleConfigurationCacheId()));
}
}
+
+ /**
+ * Check scaling result.
+ *
+ * @param jobId job Id
+ * @return true if scaling result check successfully, else false
+ */
+ public boolean checkScalingResult(final long jobId) {
+ return checkScalingResult(jobId, scalingJobService.check(jobId));
+ }
+
+ private boolean checkScalingResult(final long jobId, final Map<String,
DataConsistencyCheckResult> scalingResult) {
+ if (!scalingResult.isEmpty()) {
+ for (String key : scalingResult.keySet()) {
+ boolean isDataValid = scalingResult.get(key).isDataValid();
+ boolean isCountValid = scalingResult.get(key).isCountValid();
+ if (!isDataValid || !isCountValid) {
+ log.error("Scaling job: {}, table: {} data consistency
check failed, dataValid: {}, countValid: {}", jobId, key, isDataValid,
isCountValid);
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
index a349e93..971d430 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
@@ -18,9 +18,13 @@
package org.apache.shardingsphere.governance.core.scaling.callback;
import
org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
+import org.apache.shardingsphere.governance.core.scaling.ScalingServiceHolder;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
+import java.util.concurrent.TimeUnit;
+
/**
* Scaling result callback.
*/
@@ -37,7 +41,17 @@ public final class ScalingResultCallback implements
ScalingCallback {
@Override
public void onSuccess(final long jobId) {
- ShardingSphereEventBus.getInstance().post(new
SwitchRuleConfigurationEvent(schemaName, ruleConfigurationCacheId));
+ if (LockContext.getLockStrategy().tryLock(30L, TimeUnit.SECONDS) &&
LockContext.getLockStrategy().checkLock()) {
+ try {
+ Thread.sleep(30000L);
+ if
(ScalingServiceHolder.getInstance().checkScalingResult(jobId)) {
+ ShardingSphereEventBus.getInstance().post(new
SwitchRuleConfigurationEvent(schemaName, ruleConfigurationCacheId));
+ }
+ } catch (final InterruptedException ignored) {
+ } finally {
+ LockContext.getLockStrategy().releaseLock();
+ }
+ }
}
@Override