This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 80cc292  Add sourceWritingStopAlgorithm SPI and DEFAULT impl, add 
checkoutLockAlgorithm SPI and DEFAULT impl, add sourceWritingStopper and 
checkoutLocker in config (#14232)
80cc292 is described below

commit 80cc29217f9d242a34a30f4a037a7028a448d06c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 22 19:17:24 2021 +0800

    Add sourceWritingStopAlgorithm SPI and DEFAULT impl, add 
checkoutLockAlgorithm SPI and DEFAULT impl, add sourceWritingStopper and 
checkoutLocker in config (#14232)
    
    * Add sourceWritingStopper in config
    
    * Add sourceWritingStopAlgorithm SPI and DEFAULT impl
    
    * Add checkoutLocker in config
    
    * Add checkoutLockAlgorithm SPI and DEFAULT impl
    
    * Unit test
---
 .../resources/yaml/encrypt-dataConverters.yaml     |  4 ++
 .../src/test/resources/yaml/sharding-rule.yaml     |  8 ++++
 .../src/test/resources/yaml/sharding-scaling.yaml  |  4 ++
 .../OnRuleAlteredActionConfiguration.java          |  4 ++
 .../YamlOnRuleAlteredActionConfiguration.java      |  4 ++
 ...ingSphereAlgorithmConfigurationYamlSwapper.java |  6 +++
 ...nRuleAlteredActionConfigurationYamlSwapper.java |  6 ++-
 ...eAlteredActionConfigurationYamlSwapperTest.java |  4 ++
 .../data/pipeline/api/PipelineJobAPI.java          |  1 +
 .../data/pipeline/core/job/FinishedCheckJob.java   | 52 +++++++++++++++++-----
 .../DefaultRuleAlteredCheckoutLockAlgorithm.java   | 34 +++++++-------
 ...faultRuleAlteredSourceWritingStopAlgorithm.java | 34 +++++++-------
 .../scenario/rulealtered/RuleAlteredContext.java   | 18 ++++++++
 .../RuleAlteredCheckoutLockAlgorithm.java          | 37 +++++++--------
 .../RuleAlteredSourceWritingStopAlgorithm.java     | 37 +++++++--------
 ...pi.rulealtered.RuleAlteredCheckoutLockAlgorithm | 19 +-------
 ...lealtered.RuleAlteredSourceWritingStopAlgorithm | 19 +-------
 .../src/main/resources/conf/config-sharding.yaml   |  4 ++
 18 files changed, 180 insertions(+), 115 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
 
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index eb06d0c..63761f4 100644
--- 
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++ 
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -29,7 +29,11 @@ dataConverters:
       type: IDLE
       props:
         incremental-task-idle-minute-threshold: 1
+    sourceWritingStopper:
+      type: DEFAULT
     dataConsistencyChecker:
       type: DATA_MATCH
       props:
         chunk-size: 10000
+    checkoutLocker:
+      type: DEFAULT
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index 7e8ecc4..c768f47 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -111,14 +111,22 @@ rules:
       blockQueueSize: 10000
       workerThread: 40
       readBatchSize: 1000
+      rateLimiter:
+        type: SOURCE
+        props:
+          qps: 50
       completionDetector:
         type: IDLE
         props:
           incremental-task-idle-minute-threshold: 30
+      sourceWritingStopper:
+        type: DEFAULT
       dataConsistencyChecker:
         type: DATA_MATCH
         props:
           chunk-size: 1000
+      checkoutLocker:
+        type: DEFAULT
 
 props:
   sql-show: true
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index 42d2df1..40fed15 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -29,7 +29,11 @@ scaling:
       type: IDLE
       props:
         incremental-task-idle-minute-threshold: 30
+    sourceWritingStopper:
+      type: DEFAULT
     dataConsistencyChecker:
       type: DATA_MATCH
       props:
         chunk-size: 1000
+    checkoutLocker:
+      type: DEFAULT
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
index 71dfd14..29acf05 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -38,5 +38,9 @@ public final class OnRuleAlteredActionConfiguration {
     
     private final ShardingSphereAlgorithmConfiguration completionDetector;
     
+    private final ShardingSphereAlgorithmConfiguration sourceWritingStopper;
+    
     private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+    
+    private final ShardingSphereAlgorithmConfiguration checkoutLocker;
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
index b909fc4..dfc5ab8 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
@@ -39,5 +39,9 @@ public final class YamlOnRuleAlteredActionConfiguration 
implements YamlConfigura
     
     private YamlShardingSphereAlgorithmConfiguration completionDetector;
     
+    private YamlShardingSphereAlgorithmConfiguration sourceWritingStopper;
+    
     private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+    
+    private YamlShardingSphereAlgorithmConfiguration checkoutLocker;
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/algorithm/ShardingSphereAlgorithmConfigurationYamlSwapper.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/algorithm/ShardingSphereAlgorithmConfigurationYamlSwapper.java
index 6886dbd..9856bc0 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/algorithm/ShardingSphereAlgorithmConfigurationYamlSwapper.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/algorithm/ShardingSphereAlgorithmConfigurationYamlSwapper.java
@@ -28,6 +28,9 @@ public final class 
ShardingSphereAlgorithmConfigurationYamlSwapper implements Ya
     
     @Override
     public YamlShardingSphereAlgorithmConfiguration 
swapToYamlConfiguration(final ShardingSphereAlgorithmConfiguration data) {
+        if (null == data) {
+            return null;
+        }
         YamlShardingSphereAlgorithmConfiguration result = new 
YamlShardingSphereAlgorithmConfiguration();
         result.setType(data.getType());
         result.setProps(data.getProps());
@@ -36,6 +39,9 @@ public final class 
ShardingSphereAlgorithmConfigurationYamlSwapper implements Ya
     
     @Override
     public ShardingSphereAlgorithmConfiguration swapToObject(final 
YamlShardingSphereAlgorithmConfiguration yamlConfig) {
+        if (null == yamlConfig) {
+            return null;
+        }
         return new ShardingSphereAlgorithmConfiguration(yamlConfig.getType(), 
yamlConfig.getProps());
     }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
index 1e4b76a..4646303 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
@@ -37,7 +37,9 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
         result.setReadBatchSize(data.getReadBatchSize());
         
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
         
result.setCompletionDetector(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
+        
result.setSourceWritingStopper(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getSourceWritingStopper()));
         
result.setDataConsistencyChecker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyChecker()));
+        
result.setCheckoutLocker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCheckoutLocker()));
         return result;
     }
     
@@ -46,6 +48,8 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
         return new 
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(), 
yamlConfig.getWorkerThread(), yamlConfig.getReadBatchSize(),
                 
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()),
                 
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
-                
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
+                
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getSourceWritingStopper()),
+                
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()),
+                
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCheckoutLocker()));
     }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
index 866f12d..eb9f8bf 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
@@ -43,9 +43,13 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapperTest {
         Properties completionDetectorProps = new Properties();
         
completionDetectorProps.setProperty("incremental-task-idle-minute-threshold", 
"30");
         yamlConfig.setCompletionDetector(new 
YamlShardingSphereAlgorithmConfiguration("IDLE", completionDetectorProps));
+        Properties sourceWritingStopperProps = new Properties();
+        yamlConfig.setSourceWritingStopper(new 
YamlShardingSphereAlgorithmConfiguration("DEFAULT", sourceWritingStopperProps));
         Properties dataConsistencyCheckerProps = new Properties();
         dataConsistencyCheckerProps.setProperty("chunk-size", "1000");
         yamlConfig.setDataConsistencyChecker(new 
YamlShardingSphereAlgorithmConfiguration("DATA_MATCH", 
dataConsistencyCheckerProps));
+        Properties checkoutLockerProps = new Properties();
+        yamlConfig.setCheckoutLocker(new 
YamlShardingSphereAlgorithmConfiguration("DEFAULT", checkoutLockerProps));
         OnRuleAlteredActionConfigurationYamlSwapper yamlSwapper = new 
OnRuleAlteredActionConfigurationYamlSwapper();
         OnRuleAlteredActionConfiguration actualConfig = 
yamlSwapper.swapToObject(yamlConfig);
         YamlOnRuleAlteredActionConfiguration actualYamlConfig = 
yamlSwapper.swapToYamlConfiguration(actualConfig);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
index 46cacd0..b0c5558 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
@@ -33,6 +33,7 @@ import java.util.Optional;
 /**
  * Pipeline job API.
  */
+// TODO separate dedicated methods for rule altered job
 public interface PipelineJobAPI extends RequiredSPI {
     
     /**
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 49f8fd8..e670159 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -26,9 +26,12 @@ import 
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
+import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 
 import java.util.List;
 import java.util.Map;
@@ -38,6 +41,11 @@ public final class FinishedCheckJob implements SimpleJob {
     
     private final PipelineJobAPI pipelineJobAPI = 
PipelineJobAPIFactory.getPipelineJobAPI();
     
+    static {
+        
ShardingSphereServiceLoader.register(RuleAlteredSourceWritingStopAlgorithm.class);
+        
ShardingSphereServiceLoader.register(RuleAlteredCheckoutLockAlgorithm.class);
+    }
+    
     @Override
     public void execute(final ShardingContext shardingContext) {
         List<JobInfo> jobInfos = pipelineJobAPI.list();
@@ -57,17 +65,28 @@ public final class FinishedCheckJob implements SimpleJob {
                 if (null == ruleAlteredContext.getCompletionDetectAlgorithm()) 
{
                     log.info("completionDetector not configured, auto switch 
will not be enabled. You could query migration progress and switch manually 
with DistSQL.");
                 }
-                // TODO lock proxy
-                if (!pipelineJobAPI.isDataConsistencyCheckNeeded(jobId)) {
-                    log.info("dataConsistencyCheckAlgorithm is not configured, 
data consistency check is ignored.");
-                    pipelineJobAPI.switchClusterConfiguration(jobId);
-                    continue;
-                }
-                if (!dataConsistencyCheck(jobId)) {
-                    log.error("data consistency check failed, job {}", jobId);
-                    continue;
+                RuleAlteredSourceWritingStopAlgorithm 
sourceWritingStopAlgorithm = ruleAlteredContext.getSourceWritingStopAlgorithm();
+                String schemaName = 
jobConfig.getWorkflowConfig().getSchemaName();
+                try {
+                    if (null != sourceWritingStopAlgorithm) {
+                        
sourceWritingStopAlgorithm.stopSourceWriting(schemaName, jobId + "");
+                    }
+                    if (!pipelineJobAPI.isDataConsistencyCheckNeeded(jobId)) {
+                        log.info("dataConsistencyCheckAlgorithm is not 
configured, data consistency check is ignored.");
+                        pipelineJobAPI.switchClusterConfiguration(jobId);
+                        continue;
+                    }
+                    if (!dataConsistencyCheck(jobId)) {
+                        log.error("data consistency check failed, job {}", 
jobId);
+                        continue;
+                    }
+                    RuleAlteredCheckoutLockAlgorithm checkoutLockAlgorithm = 
ruleAlteredContext.getCheckoutLockAlgorithm();
+                    switchClusterConfiguration(schemaName, jobId, 
checkoutLockAlgorithm);
+                } finally {
+                    if (null != sourceWritingStopAlgorithm) {
+                        
sourceWritingStopAlgorithm.resumeSourceWriting(schemaName, jobId + "");
+                    }
                 }
-                pipelineJobAPI.switchClusterConfiguration(jobId);
                 // CHECKSTYLE:OFF
             } catch (final Exception ex) {
                 // CHECKSTYLE:ON
@@ -80,4 +99,17 @@ public final class FinishedCheckJob implements SimpleJob {
         Map<String, DataConsistencyCheckResult> checkResultMap = 
pipelineJobAPI.dataConsistencyCheck(jobId);
         return pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId, 
checkResultMap);
     }
+    
+    private void switchClusterConfiguration(final String schemaName, final 
long jobId, final RuleAlteredCheckoutLockAlgorithm checkoutLockAlgorithm) {
+        try {
+            if (null != checkoutLockAlgorithm) {
+                checkoutLockAlgorithm.lock(schemaName, jobId + "");
+            }
+            pipelineJobAPI.switchClusterConfiguration(jobId);
+        } finally {
+            if (null != checkoutLockAlgorithm) {
+                checkoutLockAlgorithm.releaseLock(schemaName, jobId + "");
+            }
+        }
+    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredCheckoutLockAlgorithm.java
similarity index 54%
copy from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredCheckoutLockAlgorithm.java
index 71dfd14..e4dbb74 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredCheckoutLockAlgorithm.java
@@ -15,28 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm;
 
 /**
- * On rule altered action configuration.
+ * Default rule altered checkout lock algorithm.
  */
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public final class DefaultRuleAlteredCheckoutLockAlgorithm implements 
RuleAlteredCheckoutLockAlgorithm {
     
-    private final int blockQueueSize;
+    @Override
+    public void init() {
+    }
     
-    private final int workerThread;
+    // TODO impl default checkoutLockAlgorithm
+    @Override
+    public void lock(final String schemaName, final String jobId) {
+    }
     
-    private final int readBatchSize;
+    @Override
+    public void releaseLock(final String schemaName, final String jobId) {
+    }
     
-    private final ShardingSphereAlgorithmConfiguration rateLimiter;
-    
-    private final ShardingSphereAlgorithmConfiguration completionDetector;
-    
-    private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+    @Override
+    public String getType() {
+        return "DEFAULT";
+    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredSourceWritingStopAlgorithm.java
similarity index 53%
copy from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredSourceWritingStopAlgorithm.java
index 71dfd14..3e64908 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredSourceWritingStopAlgorithm.java
@@ -15,28 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm;
 
 /**
- * On rule altered action configuration.
+ * Default rule altered source writing stop algorithm.
  */
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public final class DefaultRuleAlteredSourceWritingStopAlgorithm implements 
RuleAlteredSourceWritingStopAlgorithm {
     
-    private final int blockQueueSize;
+    @Override
+    public void init() {
+    }
     
-    private final int workerThread;
+    // TODO impl default sourceWritingStopAlgorithm
+    @Override
+    public void stopSourceWriting(final String schemaName, final String jobId) 
{
+    }
     
-    private final int readBatchSize;
+    @Override
+    public void resumeSourceWriting(final String schemaName, final String 
jobId) {
+    }
     
-    private final ShardingSphereAlgorithmConfiguration rateLimiter;
-    
-    private final ShardingSphereAlgorithmConfiguration completionDetector;
-    
-    private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+    @Override
+    public String getType() {
+        return "DEFAULT";
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index ea2483f..24ef141 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -22,7 +22,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -51,8 +53,12 @@ public final class RuleAlteredContext {
     
     private final RuleAlteredJobCompletionDetectAlgorithm 
completionDetectAlgorithm;
     
+    private final RuleAlteredSourceWritingStopAlgorithm 
sourceWritingStopAlgorithm;
+    
     private final DataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm;
     
+    private final RuleAlteredCheckoutLockAlgorithm checkoutLockAlgorithm;
+    
     private final ExecuteEngine inventoryDumperExecuteEngine;
     
     private final ExecuteEngine incrementalDumperExecuteEngine;
@@ -73,12 +79,24 @@ public final class RuleAlteredContext {
         } else {
             completionDetectAlgorithm = null;
         }
+        ShardingSphereAlgorithmConfiguration sourceWritingStopper = 
onRuleAlteredActionConfig.getSourceWritingStopper();
+        if (null != sourceWritingStopper) {
+            sourceWritingStopAlgorithm = 
ShardingSphereAlgorithmFactory.createAlgorithm(sourceWritingStopper, 
RuleAlteredSourceWritingStopAlgorithm.class);
+        } else {
+            sourceWritingStopAlgorithm = null;
+        }
         ShardingSphereAlgorithmConfiguration dataConsistencyChecker = 
onRuleAlteredActionConfig.getDataConsistencyChecker();
         if (null != dataConsistencyChecker) {
             dataConsistencyCheckAlgorithm = 
ShardingSphereAlgorithmFactory.createAlgorithm(dataConsistencyChecker, 
DataConsistencyCheckAlgorithm.class);
         } else {
             dataConsistencyCheckAlgorithm = null;
         }
+        ShardingSphereAlgorithmConfiguration checkoutLocker = 
onRuleAlteredActionConfig.getCheckoutLocker();
+        if (null != checkoutLocker) {
+            checkoutLockAlgorithm = 
ShardingSphereAlgorithmFactory.createAlgorithm(checkoutLocker, 
RuleAlteredCheckoutLockAlgorithm.class);
+        } else {
+            checkoutLockAlgorithm = null;
+        }
         inventoryDumperExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
         incrementalDumperExecuteEngine = 
ExecuteEngine.newCachedThreadInstance();
         importerExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredCheckoutLockAlgorithm.java
similarity index 56%
copy from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredCheckoutLockAlgorithm.java
index 71dfd14..805059d 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredCheckoutLockAlgorithm.java
@@ -15,28 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
 
 /**
- * On rule altered action configuration.
+ * Rule altered checkout lock algorithm, SPI.
  */
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public interface RuleAlteredCheckoutLockAlgorithm extends 
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
     
-    private final int blockQueueSize;
+    /**
+     * Lock to protect checkout (switch configuration).
+     *
+     * @param schemaName schema name
+     * @param jobId job id
+     */
+    void lock(String schemaName, String jobId);
     
-    private final int workerThread;
-    
-    private final int readBatchSize;
-    
-    private final ShardingSphereAlgorithmConfiguration rateLimiter;
-    
-    private final ShardingSphereAlgorithmConfiguration completionDetector;
-    
-    private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+    /**
+     * Release lock.
+     *
+     * @param schemaName schema name
+     * @param jobId job id
+     */
+    void releaseLock(String schemaName, String jobId);
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredSourceWritingStopAlgorithm.java
similarity index 55%
copy from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredSourceWritingStopAlgorithm.java
index 71dfd14..ff753b8 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredSourceWritingStopAlgorithm.java
@@ -15,28 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
 
 /**
- * On rule altered action configuration.
+ * Rule altered source writing stop algorithm, SPI.
  */
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public interface RuleAlteredSourceWritingStopAlgorithm extends 
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
     
-    private final int blockQueueSize;
+    /**
+     * Stop source writing.
+     *
+     * @param schemaName schema name
+     * @param jobId job id
+     */
+    void stopSourceWriting(String schemaName, String jobId);
     
-    private final int workerThread;
-    
-    private final int readBatchSize;
-    
-    private final ShardingSphereAlgorithmConfiguration rateLimiter;
-    
-    private final ShardingSphereAlgorithmConfiguration completionDetector;
-    
-    private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+    /**
+     * Resume source writing.
+     *
+     * @param schemaName schema name
+     * @param jobId job id
+     */
+    void resumeSourceWriting(String schemaName, String jobId);
 }
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm
similarity index 67%
copy from 
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm
index 42d2df1..8b266ce 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm
@@ -15,21 +15,4 @@
 # limitations under the License.
 #
 
-scalingName: default_scaling
-scaling:
-  default_scaling:
-    blockQueueSize: 10000
-    workerThread: 40
-    readBatchSize: 1000
-    rateLimiter:
-      type: SOURCE
-      props:
-        qps: 50
-    completionDetector:
-      type: IDLE
-      props:
-        incremental-task-idle-minute-threshold: 30
-    dataConsistencyChecker:
-      type: DATA_MATCH
-      props:
-        chunk-size: 1000
+org.apache.shardingsphere.data.pipeline.scenario.rulealtered.DefaultRuleAlteredCheckoutLockAlgorithm
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm
similarity index 67%
copy from 
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm
index 42d2df1..38b3ac8 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm
@@ -15,21 +15,4 @@
 # limitations under the License.
 #
 
-scalingName: default_scaling
-scaling:
-  default_scaling:
-    blockQueueSize: 10000
-    workerThread: 40
-    readBatchSize: 1000
-    rateLimiter:
-      type: SOURCE
-      props:
-        qps: 50
-    completionDetector:
-      type: IDLE
-      props:
-        incremental-task-idle-minute-threshold: 30
-    dataConsistencyChecker:
-      type: DATA_MATCH
-      props:
-        chunk-size: 1000
+org.apache.shardingsphere.data.pipeline.scenario.rulealtered.DefaultRuleAlteredSourceWritingStopAlgorithm
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
 
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index c22eb44..08fb878 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -108,8 +108,12 @@
 #        type: IDLE
 #        props:
 #          incremental-task-idle-minute-threshold: 30
+#      sourceWritingStopper:
+#        type: DEFAULT
 #      dataConsistencyChecker:
 #        type: DEFAULT
+#      checkoutLocker:
+#        type: DEFAULT
 
 
######################################################################################################
 #

Reply via email to