This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 80cc292 Add sourceWritingStopAlgorithm SPI and DEFAULT impl, add
checkoutLockAlgorithm SPI and DEFAULT impl, add sourceWritingStopper and
checkoutLocker in config (#14232)
80cc292 is described below
commit 80cc29217f9d242a34a30f4a037a7028a448d06c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 22 19:17:24 2021 +0800
Add sourceWritingStopAlgorithm SPI and DEFAULT impl, add
checkoutLockAlgorithm SPI and DEFAULT impl, add sourceWritingStopper and
checkoutLocker in config (#14232)
* Add sourceWritingStopper in config
* Add sourceWritingStopAlgorithm SPI and DEFAULT impl
* Add checkoutLocker in config
* Add checkoutLockAlgorithm SPI and DEFAULT impl
* Unit test
---
.../resources/yaml/encrypt-dataConverters.yaml | 4 ++
.../src/test/resources/yaml/sharding-rule.yaml | 8 ++++
.../src/test/resources/yaml/sharding-scaling.yaml | 4 ++
.../OnRuleAlteredActionConfiguration.java | 4 ++
.../YamlOnRuleAlteredActionConfiguration.java | 4 ++
...ingSphereAlgorithmConfigurationYamlSwapper.java | 6 +++
...nRuleAlteredActionConfigurationYamlSwapper.java | 6 ++-
...eAlteredActionConfigurationYamlSwapperTest.java | 4 ++
.../data/pipeline/api/PipelineJobAPI.java | 1 +
.../data/pipeline/core/job/FinishedCheckJob.java | 52 +++++++++++++++++-----
.../DefaultRuleAlteredCheckoutLockAlgorithm.java | 34 +++++++-------
...faultRuleAlteredSourceWritingStopAlgorithm.java | 34 +++++++-------
.../scenario/rulealtered/RuleAlteredContext.java | 18 ++++++++
.../RuleAlteredCheckoutLockAlgorithm.java | 37 +++++++--------
.../RuleAlteredSourceWritingStopAlgorithm.java | 37 +++++++--------
...pi.rulealtered.RuleAlteredCheckoutLockAlgorithm | 19 +-------
...lealtered.RuleAlteredSourceWritingStopAlgorithm | 19 +-------
.../src/main/resources/conf/config-sharding.yaml | 4 ++
18 files changed, 180 insertions(+), 115 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index eb06d0c..63761f4 100644
---
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -29,7 +29,11 @@ dataConverters:
type: IDLE
props:
incremental-task-idle-minute-threshold: 1
+ sourceWritingStopper:
+ type: DEFAULT
dataConsistencyChecker:
type: DATA_MATCH
props:
chunk-size: 10000
+ checkoutLocker:
+ type: DEFAULT
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index 7e8ecc4..c768f47 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -111,14 +111,22 @@ rules:
blockQueueSize: 10000
workerThread: 40
readBatchSize: 1000
+ rateLimiter:
+ type: SOURCE
+ props:
+ qps: 50
completionDetector:
type: IDLE
props:
incremental-task-idle-minute-threshold: 30
+ sourceWritingStopper:
+ type: DEFAULT
dataConsistencyChecker:
type: DATA_MATCH
props:
chunk-size: 1000
+ checkoutLocker:
+ type: DEFAULT
props:
sql-show: true
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index 42d2df1..40fed15 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -29,7 +29,11 @@ scaling:
type: IDLE
props:
incremental-task-idle-minute-threshold: 30
+ sourceWritingStopper:
+ type: DEFAULT
dataConsistencyChecker:
type: DATA_MATCH
props:
chunk-size: 1000
+ checkoutLocker:
+ type: DEFAULT
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
index 71dfd14..29acf05 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -38,5 +38,9 @@ public final class OnRuleAlteredActionConfiguration {
private final ShardingSphereAlgorithmConfiguration completionDetector;
+ private final ShardingSphereAlgorithmConfiguration sourceWritingStopper;
+
private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+
+ private final ShardingSphereAlgorithmConfiguration checkoutLocker;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
index b909fc4..dfc5ab8 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
@@ -39,5 +39,9 @@ public final class YamlOnRuleAlteredActionConfiguration
implements YamlConfigura
private YamlShardingSphereAlgorithmConfiguration completionDetector;
+ private YamlShardingSphereAlgorithmConfiguration sourceWritingStopper;
+
private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+
+ private YamlShardingSphereAlgorithmConfiguration checkoutLocker;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/algorithm/ShardingSphereAlgorithmConfigurationYamlSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/algorithm/ShardingSphereAlgorithmConfigurationYamlSwapper.java
index 6886dbd..9856bc0 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/algorithm/ShardingSphereAlgorithmConfigurationYamlSwapper.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/algorithm/ShardingSphereAlgorithmConfigurationYamlSwapper.java
@@ -28,6 +28,9 @@ public final class
ShardingSphereAlgorithmConfigurationYamlSwapper implements Ya
@Override
public YamlShardingSphereAlgorithmConfiguration
swapToYamlConfiguration(final ShardingSphereAlgorithmConfiguration data) {
+ if (null == data) {
+ return null;
+ }
YamlShardingSphereAlgorithmConfiguration result = new
YamlShardingSphereAlgorithmConfiguration();
result.setType(data.getType());
result.setProps(data.getProps());
@@ -36,6 +39,9 @@ public final class
ShardingSphereAlgorithmConfigurationYamlSwapper implements Ya
@Override
public ShardingSphereAlgorithmConfiguration swapToObject(final
YamlShardingSphereAlgorithmConfiguration yamlConfig) {
+ if (null == yamlConfig) {
+ return null;
+ }
return new ShardingSphereAlgorithmConfiguration(yamlConfig.getType(),
yamlConfig.getProps());
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
index 1e4b76a..4646303 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
@@ -37,7 +37,9 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
result.setReadBatchSize(data.getReadBatchSize());
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
result.setCompletionDetector(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
+
result.setSourceWritingStopper(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getSourceWritingStopper()));
result.setDataConsistencyChecker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyChecker()));
+
result.setCheckoutLocker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCheckoutLocker()));
return result;
}
@@ -46,6 +48,8 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
return new
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(),
yamlConfig.getWorkerThread(), yamlConfig.getReadBatchSize(),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
-
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
+
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getSourceWritingStopper()),
+
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()),
+
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCheckoutLocker()));
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
index 866f12d..eb9f8bf 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
@@ -43,9 +43,13 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapperTest {
Properties completionDetectorProps = new Properties();
completionDetectorProps.setProperty("incremental-task-idle-minute-threshold",
"30");
yamlConfig.setCompletionDetector(new
YamlShardingSphereAlgorithmConfiguration("IDLE", completionDetectorProps));
+ Properties sourceWritingStopperProps = new Properties();
+ yamlConfig.setSourceWritingStopper(new
YamlShardingSphereAlgorithmConfiguration("DEFAULT", sourceWritingStopperProps));
Properties dataConsistencyCheckerProps = new Properties();
dataConsistencyCheckerProps.setProperty("chunk-size", "1000");
yamlConfig.setDataConsistencyChecker(new
YamlShardingSphereAlgorithmConfiguration("DATA_MATCH",
dataConsistencyCheckerProps));
+ Properties checkoutLockerProps = new Properties();
+ yamlConfig.setCheckoutLocker(new
YamlShardingSphereAlgorithmConfiguration("DEFAULT", checkoutLockerProps));
OnRuleAlteredActionConfigurationYamlSwapper yamlSwapper = new
OnRuleAlteredActionConfigurationYamlSwapper();
OnRuleAlteredActionConfiguration actualConfig =
yamlSwapper.swapToObject(yamlConfig);
YamlOnRuleAlteredActionConfiguration actualYamlConfig =
yamlSwapper.swapToYamlConfiguration(actualConfig);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
index 46cacd0..b0c5558 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
@@ -33,6 +33,7 @@ import java.util.Optional;
/**
* Pipeline job API.
*/
+// TODO separate dedicated methods for rule altered job
public interface PipelineJobAPI extends RequiredSPI {
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 49f8fd8..e670159 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -26,9 +26,12 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
+import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import java.util.List;
import java.util.Map;
@@ -38,6 +41,11 @@ public final class FinishedCheckJob implements SimpleJob {
private final PipelineJobAPI pipelineJobAPI =
PipelineJobAPIFactory.getPipelineJobAPI();
+ static {
+
ShardingSphereServiceLoader.register(RuleAlteredSourceWritingStopAlgorithm.class);
+
ShardingSphereServiceLoader.register(RuleAlteredCheckoutLockAlgorithm.class);
+ }
+
@Override
public void execute(final ShardingContext shardingContext) {
List<JobInfo> jobInfos = pipelineJobAPI.list();
@@ -57,17 +65,28 @@ public final class FinishedCheckJob implements SimpleJob {
if (null == ruleAlteredContext.getCompletionDetectAlgorithm())
{
log.info("completionDetector not configured, auto switch
will not be enabled. You could query migration progress and switch manually
with DistSQL.");
}
- // TODO lock proxy
- if (!pipelineJobAPI.isDataConsistencyCheckNeeded(jobId)) {
- log.info("dataConsistencyCheckAlgorithm is not configured,
data consistency check is ignored.");
- pipelineJobAPI.switchClusterConfiguration(jobId);
- continue;
- }
- if (!dataConsistencyCheck(jobId)) {
- log.error("data consistency check failed, job {}", jobId);
- continue;
+ RuleAlteredSourceWritingStopAlgorithm
sourceWritingStopAlgorithm = ruleAlteredContext.getSourceWritingStopAlgorithm();
+ String schemaName =
jobConfig.getWorkflowConfig().getSchemaName();
+ try {
+ if (null != sourceWritingStopAlgorithm) {
+
sourceWritingStopAlgorithm.stopSourceWriting(schemaName, jobId + "");
+ }
+ if (!pipelineJobAPI.isDataConsistencyCheckNeeded(jobId)) {
+ log.info("dataConsistencyCheckAlgorithm is not
configured, data consistency check is ignored.");
+ pipelineJobAPI.switchClusterConfiguration(jobId);
+ continue;
+ }
+ if (!dataConsistencyCheck(jobId)) {
+ log.error("data consistency check failed, job {}",
jobId);
+ continue;
+ }
+ RuleAlteredCheckoutLockAlgorithm checkoutLockAlgorithm =
ruleAlteredContext.getCheckoutLockAlgorithm();
+ switchClusterConfiguration(schemaName, jobId,
checkoutLockAlgorithm);
+ } finally {
+ if (null != sourceWritingStopAlgorithm) {
+
sourceWritingStopAlgorithm.resumeSourceWriting(schemaName, jobId + "");
+ }
}
- pipelineJobAPI.switchClusterConfiguration(jobId);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -80,4 +99,17 @@ public final class FinishedCheckJob implements SimpleJob {
Map<String, DataConsistencyCheckResult> checkResultMap =
pipelineJobAPI.dataConsistencyCheck(jobId);
return pipelineJobAPI.aggregateDataConsistencyCheckResults(jobId,
checkResultMap);
}
+
+ private void switchClusterConfiguration(final String schemaName, final
long jobId, final RuleAlteredCheckoutLockAlgorithm checkoutLockAlgorithm) {
+ try {
+ if (null != checkoutLockAlgorithm) {
+ checkoutLockAlgorithm.lock(schemaName, jobId + "");
+ }
+ pipelineJobAPI.switchClusterConfiguration(jobId);
+ } finally {
+ if (null != checkoutLockAlgorithm) {
+ checkoutLockAlgorithm.releaseLock(schemaName, jobId + "");
+ }
+ }
+ }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredCheckoutLockAlgorithm.java
similarity index 54%
copy from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredCheckoutLockAlgorithm.java
index 71dfd14..e4dbb74 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredCheckoutLockAlgorithm.java
@@ -15,28 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm;
/**
- * On rule altered action configuration.
+ * Default rule altered checkout lock algorithm.
*/
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public final class DefaultRuleAlteredCheckoutLockAlgorithm implements
RuleAlteredCheckoutLockAlgorithm {
- private final int blockQueueSize;
+ @Override
+ public void init() {
+ }
- private final int workerThread;
+ // TODO impl default checkoutLockAlgorithm
+ @Override
+ public void lock(final String schemaName, final String jobId) {
+ }
- private final int readBatchSize;
+ @Override
+ public void releaseLock(final String schemaName, final String jobId) {
+ }
- private final ShardingSphereAlgorithmConfiguration rateLimiter;
-
- private final ShardingSphereAlgorithmConfiguration completionDetector;
-
- private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+ @Override
+ public String getType() {
+ return "DEFAULT";
+ }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredSourceWritingStopAlgorithm.java
similarity index 53%
copy from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredSourceWritingStopAlgorithm.java
index 71dfd14..3e64908 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/DefaultRuleAlteredSourceWritingStopAlgorithm.java
@@ -15,28 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm;
/**
- * On rule altered action configuration.
+ * Default rule altered source writing stop algorithm.
*/
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public final class DefaultRuleAlteredSourceWritingStopAlgorithm implements
RuleAlteredSourceWritingStopAlgorithm {
- private final int blockQueueSize;
+ @Override
+ public void init() {
+ }
- private final int workerThread;
+ // TODO impl default sourceWritingStopAlgorithm
+ @Override
+ public void stopSourceWriting(final String schemaName, final String jobId)
{
+ }
- private final int readBatchSize;
+ @Override
+ public void resumeSourceWriting(final String schemaName, final String
jobId) {
+ }
- private final ShardingSphereAlgorithmConfiguration rateLimiter;
-
- private final ShardingSphereAlgorithmConfiguration completionDetector;
-
- private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+ @Override
+ public String getType() {
+ return "DEFAULT";
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index ea2483f..24ef141 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -22,7 +22,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -51,8 +53,12 @@ public final class RuleAlteredContext {
private final RuleAlteredJobCompletionDetectAlgorithm
completionDetectAlgorithm;
+ private final RuleAlteredSourceWritingStopAlgorithm
sourceWritingStopAlgorithm;
+
private final DataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm;
+ private final RuleAlteredCheckoutLockAlgorithm checkoutLockAlgorithm;
+
private final ExecuteEngine inventoryDumperExecuteEngine;
private final ExecuteEngine incrementalDumperExecuteEngine;
@@ -73,12 +79,24 @@ public final class RuleAlteredContext {
} else {
completionDetectAlgorithm = null;
}
+ ShardingSphereAlgorithmConfiguration sourceWritingStopper =
onRuleAlteredActionConfig.getSourceWritingStopper();
+ if (null != sourceWritingStopper) {
+ sourceWritingStopAlgorithm =
ShardingSphereAlgorithmFactory.createAlgorithm(sourceWritingStopper,
RuleAlteredSourceWritingStopAlgorithm.class);
+ } else {
+ sourceWritingStopAlgorithm = null;
+ }
ShardingSphereAlgorithmConfiguration dataConsistencyChecker =
onRuleAlteredActionConfig.getDataConsistencyChecker();
if (null != dataConsistencyChecker) {
dataConsistencyCheckAlgorithm =
ShardingSphereAlgorithmFactory.createAlgorithm(dataConsistencyChecker,
DataConsistencyCheckAlgorithm.class);
} else {
dataConsistencyCheckAlgorithm = null;
}
+ ShardingSphereAlgorithmConfiguration checkoutLocker =
onRuleAlteredActionConfig.getCheckoutLocker();
+ if (null != checkoutLocker) {
+ checkoutLockAlgorithm =
ShardingSphereAlgorithmFactory.createAlgorithm(checkoutLocker,
RuleAlteredCheckoutLockAlgorithm.class);
+ } else {
+ checkoutLockAlgorithm = null;
+ }
inventoryDumperExecuteEngine =
ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
incrementalDumperExecuteEngine =
ExecuteEngine.newCachedThreadInstance();
importerExecuteEngine =
ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredCheckoutLockAlgorithm.java
similarity index 56%
copy from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredCheckoutLockAlgorithm.java
index 71dfd14..805059d 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredCheckoutLockAlgorithm.java
@@ -15,28 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
/**
- * On rule altered action configuration.
+ * Rule altered checkout lock algorithm, SPI.
*/
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public interface RuleAlteredCheckoutLockAlgorithm extends
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
- private final int blockQueueSize;
+ /**
+ * Lock to protect checkout (switch configuration).
+ *
+ * @param schemaName schema name
+ * @param jobId job id
+ */
+ void lock(String schemaName, String jobId);
- private final int workerThread;
-
- private final int readBatchSize;
-
- private final ShardingSphereAlgorithmConfiguration rateLimiter;
-
- private final ShardingSphereAlgorithmConfiguration completionDetector;
-
- private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+ /**
+ * Release lock.
+ *
+ * @param schemaName schema name
+ * @param jobId job id
+ */
+ void releaseLock(String schemaName, String jobId);
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredSourceWritingStopAlgorithm.java
similarity index 55%
copy from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredSourceWritingStopAlgorithm.java
index 71dfd14..ff753b8 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredSourceWritingStopAlgorithm.java
@@ -15,28 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
/**
- * On rule altered action configuration.
+ * Rule altered source writing stop algorithm, SPI.
*/
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public interface RuleAlteredSourceWritingStopAlgorithm extends
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
- private final int blockQueueSize;
+ /**
+ * Stop source writing.
+ *
+ * @param schemaName schema name
+ * @param jobId job id
+ */
+ void stopSourceWriting(String schemaName, String jobId);
- private final int workerThread;
-
- private final int readBatchSize;
-
- private final ShardingSphereAlgorithmConfiguration rateLimiter;
-
- private final ShardingSphereAlgorithmConfiguration completionDetector;
-
- private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+ /**
+ * Resume source writing.
+ *
+ * @param schemaName schema name
+ * @param jobId job id
+ */
+ void resumeSourceWriting(String schemaName, String jobId);
}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm
similarity index 67%
copy from
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm
index 42d2df1..8b266ce 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredCheckoutLockAlgorithm
@@ -15,21 +15,4 @@
# limitations under the License.
#
-scalingName: default_scaling
-scaling:
- default_scaling:
- blockQueueSize: 10000
- workerThread: 40
- readBatchSize: 1000
- rateLimiter:
- type: SOURCE
- props:
- qps: 50
- completionDetector:
- type: IDLE
- props:
- incremental-task-idle-minute-threshold: 30
- dataConsistencyChecker:
- type: DATA_MATCH
- props:
- chunk-size: 1000
+org.apache.shardingsphere.data.pipeline.scenario.rulealtered.DefaultRuleAlteredCheckoutLockAlgorithm
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm
similarity index 67%
copy from
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm
index 42d2df1..38b3ac8 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredSourceWritingStopAlgorithm
@@ -15,21 +15,4 @@
# limitations under the License.
#
-scalingName: default_scaling
-scaling:
- default_scaling:
- blockQueueSize: 10000
- workerThread: 40
- readBatchSize: 1000
- rateLimiter:
- type: SOURCE
- props:
- qps: 50
- completionDetector:
- type: IDLE
- props:
- incremental-task-idle-minute-threshold: 30
- dataConsistencyChecker:
- type: DATA_MATCH
- props:
- chunk-size: 1000
+org.apache.shardingsphere.data.pipeline.scenario.rulealtered.DefaultRuleAlteredSourceWritingStopAlgorithm
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index c22eb44..08fb878 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -108,8 +108,12 @@
# type: IDLE
# props:
# incremental-task-idle-minute-threshold: 30
+# sourceWritingStopper:
+# type: DEFAULT
# dataConsistencyChecker:
# type: DEFAULT
+# checkoutLocker:
+# type: DEFAULT
######################################################################################################
#