This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c0cd59c  Add data consistency check for scaling (#8925)
c0cd59c is described below

commit c0cd59c82aea7c019525b49bdec381962edbd60f
Author: Haoran Meng <[email protected]>
AuthorDate: Thu Jan 7 12:07:02 2021 +0800

    Add data consistency check for scaling (#8925)
    
    * Add data consistency check for scaling
    
    * Add data consistency check for scaling
---
 .../core/scaling/ScalingServiceHolder.java         | 29 ++++++++++++++++++++++
 .../scaling/callback/ScalingResultCallback.java    | 16 +++++++++++-
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
index e791ba5..1048c85 100644
--- 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
+++ 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
@@ -18,18 +18,22 @@
 package org.apache.shardingsphere.governance.core.scaling;
 
 import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
 import 
org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
 import 
org.apache.shardingsphere.governance.core.scaling.callback.ScalingResultCallback;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+import 
org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
 
+import java.util.Map;
 import java.util.Optional;
 
 /**
  * Scaling service holder.
  */
+@Slf4j
 public final class ScalingServiceHolder {
     
     private static final ScalingServiceHolder INSTANCE = new 
ScalingServiceHolder();
@@ -72,4 +76,29 @@ public final class ScalingServiceHolder {
             ShardingSphereEventBus.getInstance().post(new 
SwitchRuleConfigurationEvent(event.getSchemaName(), 
event.getRuleConfigurationCacheId()));
         }
     }
+    
+    /**
+     * Check scaling result.
+     * 
+     * @param jobId job Id
+     * @return true if scaling result check successfully, else false
+     */
+    public boolean checkScalingResult(final long jobId) {
+        return checkScalingResult(jobId, scalingJobService.check(jobId));
+    }
+    
+    private boolean checkScalingResult(final long jobId, final Map<String, 
DataConsistencyCheckResult> scalingResult) {
+        if (!scalingResult.isEmpty()) {
+            for (String key : scalingResult.keySet()) {
+                boolean isDataValid = scalingResult.get(key).isDataValid(); 
+                boolean isCountValid = scalingResult.get(key).isCountValid();
+                if (!isDataValid || !isCountValid) {
+                    log.error("Scaling job: {}, table: {} data consistency 
check failed, dataValid: {}, countValid: {}", jobId, key, isDataValid, 
isCountValid);
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
 }
diff --git 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
index a349e93..971d430 100644
--- 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
+++ 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
@@ -18,9 +18,13 @@
 package org.apache.shardingsphere.governance.core.scaling.callback;
 
 import 
org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
+import org.apache.shardingsphere.governance.core.scaling.ScalingServiceHolder;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Scaling result callback.
  */
@@ -37,7 +41,17 @@ public final class ScalingResultCallback implements 
ScalingCallback {
     
     @Override
     public void onSuccess(final long jobId) {
-        ShardingSphereEventBus.getInstance().post(new 
SwitchRuleConfigurationEvent(schemaName, ruleConfigurationCacheId));
+        if (LockContext.getLockStrategy().tryLock(30L, TimeUnit.SECONDS) && 
LockContext.getLockStrategy().checkLock()) {
+            try {
+                Thread.sleep(30000L);
+                if 
(ScalingServiceHolder.getInstance().checkScalingResult(jobId)) {
+                    ShardingSphereEventBus.getInstance().post(new 
SwitchRuleConfigurationEvent(schemaName, ruleConfigurationCacheId));
+                }  
+            } catch (final InterruptedException ignored) {
+            } finally {
+                LockContext.getLockStrategy().releaseLock();
+            }
+        }
     }
     
     @Override

Reply via email to