This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b4d9801 Add input and output in scaling config; Refactor
JobRateLimitAlgorithm SPI; Add rateLimiter TPS impl (#14621)
b4d9801 is described below
commit b4d9801fc7526c5d2f259996c9775376963c42de
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Jan 8 13:19:03 2022 +0800
Add input and output in scaling config; Refactor JobRateLimitAlgorithm SPI;
Add rateLimiter TPS impl (#14621)
---
docs/document/content/dev-manual/scaling.cn.md | 10 ----
docs/document/content/dev-manual/scaling.en.md | 10 ----
.../user-manual/shardingsphere-scaling/build.cn.md | 40 ++++++++++----
.../user-manual/shardingsphere-scaling/build.en.md | 40 ++++++++++----
.../resources/yaml/encrypt-dataConverters.yaml | 20 +++++--
.../src/test/resources/yaml/sharding-rule.yaml | 20 +++++--
.../src/test/resources/yaml/sharding-scaling.yaml | 20 +++++--
.../OnRuleAlteredActionConfiguration.java | 32 +++++++++--
.../YamlOnRuleAlteredActionConfiguration.java | 29 ++++++++--
...nRuleAlteredActionConfigurationYamlSwapper.java | 64 ++++++++++++++++++++--
...eAlteredActionConfigurationYamlSwapperTest.java | 15 ++++-
.../consistency/DataConsistencyCheckerImpl.java | 7 ++-
.../ingest/dumper/AbstractInventoryDumper.java | 9 +--
.../core/prepare/InventoryTaskSplitter.java | 13 ++++-
...lgorithm.java => QPSJobRateLimitAlgorithm.java} | 14 +++--
...lgorithm.java => TPSJobRateLimitAlgorithm.java} | 31 +++++++----
.../scenario/rulealtered/RuleAlteredContext.java | 42 ++++++++------
...ta.pipeline.spi.ratelimit.JobRateLimitAlgorithm | 3 +-
.../ingest/InventoryDumperConfiguration.java | 2 +-
.../job/JobOperationType.java} | 18 +++---
.../spi/ratelimit/JobRateLimitAlgorithm.java | 8 ++-
.../src/main/resources/conf/config-sharding.yaml | 40 ++++++++++----
.../test/resources/docker/scaling/conf/server.yaml | 5 --
.../config_sharding_sphere_jdbc_source.yaml | 20 +++++--
.../config_sharding_sphere_jdbc_target.yaml | 20 +++++--
25 files changed, 368 insertions(+), 164 deletions(-)
diff --git a/docs/document/content/dev-manual/scaling.cn.md
b/docs/document/content/dev-manual/scaling.cn.md
index 6d5c1b9..b2377d5 100644
--- a/docs/document/content/dev-manual/scaling.cn.md
+++ b/docs/document/content/dev-manual/scaling.cn.md
@@ -17,16 +17,6 @@ chapter = true
| PostgreSQLScalingEntry | 基于 PostgreSQL 的弹性伸缩入口 |
| OpenGaussScalingEntry | 基于 openGauss 的弹性伸缩入口 |
-## JobRateLimitAlgorithm
-
-| *SPI 名称* | *详细说明*
|
-| ------------------------------------------- |
------------------------------------------- |
-| JobRateLimitAlgorithm | 任务限流算法
|
-
-| *已知实现类* | *详细说明*
|
-| ------------------------------------------- |
------------------------------------------- |
-| SourceJobRateLimitAlgorithm | 源端限流算法
|
-
## JobCompletionDetectAlgorithm
| *SPI 名称* | *详细说明*
|
diff --git a/docs/document/content/dev-manual/scaling.en.md
b/docs/document/content/dev-manual/scaling.en.md
index d3cbe5e..202fb43 100644
--- a/docs/document/content/dev-manual/scaling.en.md
+++ b/docs/document/content/dev-manual/scaling.en.md
@@ -17,16 +17,6 @@ chapter = true
| PostgreSQLScalingEntry | PostgreSQL entry of scaling |
| OpenGaussScalingEntry | openGauss entry of scaling |
-## JobRateLimitAlgorithm
-
-| *SPI Name* | *Description*
|
-| ------------------------------------------- |
------------------------------------------- |
-| JobRateLimitAlgorithm | job rate limit algorithm
|
-
-| *Implementation Class* | *Description*
|
-| ------------------------------------------- |
------------------------------------------- |
-| SourceJobRateLimitAlgorithm | rate limit algorithm for
source side |
-
## JobCompletionDetectAlgorithm
| *SPI Name* | *Description*
|
diff --git
a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
index 4a99571..50547dc 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
@@ -54,12 +54,20 @@ rules:
scaling:
<scaling-action-config-name> (+):
blockQueueSize: # 数据通道阻塞队列大小
- workerThread: # 给全量数据摄取和数据导入使用的工作线程池大小
- readBatchSize: # 一次查询操作返回的最大记录数
- rateLimiter: # 限流算法
- type: # 算法类型。可选项:SOURCE
- props: # 算法属性
- qps: # QPS属性。适用算法类型:SOURCE
+ input:
+ workerThread: # 从源端摄取全量数据的线程池大小
+ batchSize: # 一次查询操作返回的最大记录数
+ rateLimiter: # 限流算法
+ type: # 算法类型。可选项:QPS
+ props: # 算法属性
+ qps: # QPS属性。适用算法类型:QPS
+ output:
+ workerThread: # 数据导入到目标端的线程池大小
+ batchSize: # 一次批量写入操作的最大记录数
+ rateLimiter: # 限流算法
+ type: # 算法类型。可选项:TPS
+ props: # 算法属性
+ tps: # TPS属性。适用算法类型:TPS
completionDetector: # 作业是否接近完成检测算法。如果不配置,那么系统无法自动进行后续步骤,可以通过 DistSQL
手动操作。
type: # 算法类型。可选项:IDLE
props: # 算法属性
@@ -80,12 +88,20 @@ rules:
scaling:
default_scaling:
blockQueueSize: 10000
- workerThread: 40
- readBatchSize: 1000
- rateLimiter:
- type: SOURCE
- props:
- qps: 50
+ input:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: QPS
+ props:
+ qps: 50
+ output:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: TPS
+ props:
+ tps: 2000
completionDetector:
type: IDLE
props:
diff --git
a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
index c41276d..3fe6b41 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
@@ -53,12 +53,20 @@ rules:
scaling:
<scaling-action-config-name> (+):
blockQueueSize: # Data channel blocking queue size
- workerThread: # Worker thread pool size for inventory data ingestion and
data importing
- readBatchSize: # Maximum records count of a query operation returning
- rateLimiter: # Rate limit algorithm
- type: # Algorithm type. Options: SOURCE
- props: # Algorithm properties
- qps: # QPS property. Available for types: SOURCE
+ input:
+ workerThread: # Worker thread pool size for inventory data ingestion
from source
+ batchSize: # Maximum number of records returned by one SELECT query
+ rateLimiter: # Rate limit algorithm
+ type: # Algorithm type. Options: QPS
+ props: # Algorithm properties
+ qps: # QPS property. Available for types: QPS
+ output:
+ workerThread: # Worker thread pool size for data importing to target
+ batchSize: # Maximum number of records written by one batch DML
(insert/update/delete) operation
+ rateLimiter: # Rate limit algorithm
+ type: # Algorithm type. Options: TPS
+ props: # Algorithm properties
+ tps: # TPS property. Available for types: TPS
completionDetector: # Completion detect algorithm. If it's not
configured, then system won't continue to do next steps automatically.
type: # Algorithm type. Options: IDLE
props: # Algorithm properties
@@ -79,12 +87,20 @@ rules:
scaling:
default_scaling:
blockQueueSize: 10000
- workerThread: 40
- readBatchSize: 1000
- rateLimiter:
- type: SOURCE
- props:
- qps: 50
+ input:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: QPS
+ props:
+ qps: 50
+ output:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: TPS
+ props:
+ tps: 2000
completionDetector:
type: IDLE
props:
diff --git
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index 8767f19..39d44da 100644
---
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -19,12 +19,20 @@ dataConverterName: default_convert
dataConverters:
default_convert:
blockQueueSize: 10000
- workerThread: 40
- readBatchSize: 1000
- rateLimiter:
- type: SOURCE
- props:
- qps: 50
+ input:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: QPS
+ props:
+ qps: 50
+ output:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: TPS
+ props:
+ tps: 2000
completionDetector:
type: IDLE
props:
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index 4a55b94..4dd83cf 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -109,12 +109,20 @@ rules:
scaling:
default_scaling:
blockQueueSize: 10000
- workerThread: 40
- readBatchSize: 1000
- rateLimiter:
- type: SOURCE
- props:
- qps: 50
+ input:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: QPS
+ props:
+ qps: 50
+ output:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: TPS
+ props:
+ tps: 2000
completionDetector:
type: IDLE
props:
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index 42d2df1..bb8027d 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -19,12 +19,20 @@ scalingName: default_scaling
scaling:
default_scaling:
blockQueueSize: 10000
- workerThread: 40
- readBatchSize: 1000
- rateLimiter:
- type: SOURCE
- props:
- qps: 50
+ input:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: QPS
+ props:
+ qps: 50
+ output:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: TPS
+ props:
+ tps: 2000
completionDetector:
type: IDLE
props:
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
index 71dfd14..07a82a1 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.infra.config.rulealtered;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.ToString;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
/**
@@ -26,17 +27,40 @@ import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
*/
@RequiredArgsConstructor
@Getter
+@ToString
public final class OnRuleAlteredActionConfiguration {
private final int blockQueueSize;
- private final int workerThread;
+ private final InputConfiguration input;
- private final int readBatchSize;
-
- private final ShardingSphereAlgorithmConfiguration rateLimiter;
+ private final OutputConfiguration output;
private final ShardingSphereAlgorithmConfiguration completionDetector;
private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+
+ @RequiredArgsConstructor
+ @Getter
+ @ToString
+ public static final class InputConfiguration {
+
+ private final int workerThread;
+
+ private final int batchSize;
+
+ private final ShardingSphereAlgorithmConfiguration rateLimiter;
+ }
+
+ @RequiredArgsConstructor
+ @Getter
+ @ToString
+ public static final class OutputConfiguration {
+
+ private final int workerThread;
+
+ private final int batchSize;
+
+ private final ShardingSphereAlgorithmConfiguration rateLimiter;
+ }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
index b909fc4..4934ea8 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
@@ -17,8 +17,10 @@
package org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered;
+import lombok.Data;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlShardingSphereAlgorithmConfiguration;
@@ -27,17 +29,36 @@ import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlShardingSp
*/
@Getter
@Setter
+@ToString
public final class YamlOnRuleAlteredActionConfiguration implements
YamlConfiguration {
private int blockQueueSize = 10000;
- private int workerThread = 40;
+ private YamlInputConfiguration input;
- private int readBatchSize = 1000;
-
- private YamlShardingSphereAlgorithmConfiguration rateLimiter;
+ private YamlOutputConfiguration output;
private YamlShardingSphereAlgorithmConfiguration completionDetector;
private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+
+ @Data
+ public static final class YamlInputConfiguration implements
YamlConfiguration {
+
+ private int workerThread = 40;
+
+ private int batchSize = 1000;
+
+ private YamlShardingSphereAlgorithmConfiguration rateLimiter;
+ }
+
+ @Data
+ public static final class YamlOutputConfiguration implements
YamlConfiguration {
+
+ private int workerThread = 40;
+
+ private int batchSize = 1000;
+
+ private YamlShardingSphereAlgorithmConfiguration rateLimiter;
+ }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
index 8ee36f6..11e959e 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
@@ -18,7 +18,11 @@
package org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered;
import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
+import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
+import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
import
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.ShardingSphereAlgorithmConfigurationYamlSwapper;
@@ -29,6 +33,10 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
private static final ShardingSphereAlgorithmConfigurationYamlSwapper
ALGORITHM_CONFIG_YAML_SWAPPER = new
ShardingSphereAlgorithmConfigurationYamlSwapper();
+ private static final InputConfigurationSwapper INPUT_CONFIG_SWAPPER = new
InputConfigurationSwapper();
+
+ private static final OutputConfigurationSwapper OUTPUT_CONFIG_SWAPPER =
new OutputConfigurationSwapper();
+
@Override
public YamlOnRuleAlteredActionConfiguration swapToYamlConfiguration(final
OnRuleAlteredActionConfiguration data) {
if (null == data) {
@@ -36,9 +44,8 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
}
YamlOnRuleAlteredActionConfiguration result = new
YamlOnRuleAlteredActionConfiguration();
result.setBlockQueueSize(data.getBlockQueueSize());
- result.setWorkerThread(data.getWorkerThread());
- result.setReadBatchSize(data.getReadBatchSize());
-
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+
result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
+
result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
result.setCompletionDetector(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
result.setDataConsistencyChecker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyChecker()));
return result;
@@ -49,9 +56,56 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
if (null == yamlConfig) {
return null;
}
- return new
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(),
yamlConfig.getWorkerThread(), yamlConfig.getReadBatchSize(),
-
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()),
+ return new
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(),
+ INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
+ OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
}
+
+ public static class InputConfigurationSwapper implements
YamlConfigurationSwapper<YamlInputConfiguration, InputConfiguration> {
+
+ @Override
+ public YamlInputConfiguration swapToYamlConfiguration(final
InputConfiguration data) {
+ if (null == data) {
+ return null;
+ }
+ YamlInputConfiguration result = new YamlInputConfiguration();
+ result.setWorkerThread(data.getWorkerThread());
+ result.setBatchSize(data.getBatchSize());
+
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+ return result;
+ }
+
+ @Override
+ public InputConfiguration swapToObject(final YamlInputConfiguration
yamlConfig) {
+ if (null == yamlConfig) {
+ return null;
+ }
+ return new InputConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+ }
+ }
+
+ public static class OutputConfigurationSwapper implements
YamlConfigurationSwapper<YamlOutputConfiguration, OutputConfiguration> {
+
+ @Override
+ public YamlOutputConfiguration swapToYamlConfiguration(final
OutputConfiguration data) {
+ if (null == data) {
+ return null;
+ }
+ YamlOutputConfiguration result = new YamlOutputConfiguration();
+ result.setWorkerThread(data.getWorkerThread());
+ result.setBatchSize(data.getBatchSize());
+
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+ return result;
+ }
+
+ @Override
+ public OutputConfiguration swapToObject(final YamlOutputConfiguration
yamlConfig) {
+ if (null == yamlConfig) {
+ return null;
+ }
+ return new OutputConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+ }
+ }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
index bc0d82d..25b9f8f 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered;
import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlShardingSphereAlgorithmConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.junit.Test;
@@ -37,12 +39,19 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapperTest {
public void assertSwap() {
YamlOnRuleAlteredActionConfiguration yamlConfig = new
YamlOnRuleAlteredActionConfiguration();
yamlConfig.setBlockQueueSize(1000);
- yamlConfig.setWorkerThread(20);
- yamlConfig.setReadBatchSize(100);
Properties rateLimiterProps = new Properties();
rateLimiterProps.setProperty("batch-size", "1000");
rateLimiterProps.setProperty("qps", "50");
- yamlConfig.setRateLimiter(new
YamlShardingSphereAlgorithmConfiguration("SOURCE", rateLimiterProps));
+ YamlInputConfiguration yamlInputConfig = new YamlInputConfiguration();
+ yamlInputConfig.setWorkerThread(40);
+ yamlInputConfig.setBatchSize(1000);
+ yamlInputConfig.setRateLimiter(new
YamlShardingSphereAlgorithmConfiguration("INPUT", rateLimiterProps));
+ yamlConfig.setInput(yamlInputConfig);
+ YamlOutputConfiguration yamlOutputConfig = new
YamlOutputConfiguration();
+ yamlOutputConfig.setWorkerThread(40);
+ yamlOutputConfig.setBatchSize(1000);
+ yamlOutputConfig.setRateLimiter(new
YamlShardingSphereAlgorithmConfiguration("OUTPUT", rateLimiterProps));
+ yamlConfig.setOutput(yamlOutputConfig);
Properties completionDetectorProps = new Properties();
completionDetectorProps.setProperty("incremental-task-idle-minute-threshold",
"30");
yamlConfig.setCompletionDetector(new
YamlShardingSphereAlgorithmConfiguration("IDLE", completionDetectorProps));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 7f0d8cc..565d605 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
@@ -143,7 +144,7 @@ public final class DataConsistencyCheckerImpl implements
DataConsistencyChecker
Map<String, Boolean> result = new HashMap<>();
ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job"
+ getJobIdPrefix(jobContext.getJobId()) + "-dataCheck-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- JobRateLimitAlgorithm rateLimitAlgorithm =
jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
+ JobRateLimitAlgorithm inputRateLimitAlgorithm =
jobContext.getRuleAlteredContext().getInputRateLimitAlgorithm();
try (PipelineDataSourceWrapper sourceDataSource =
dataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource =
dataSourceFactory.newInstance(targetDataSourceConfig)) {
Map<String, TableMetaData> tableMetaDataMap =
getTableMetaDataMap(jobContext.getJobConfig().getWorkflowConfig().getSchemaName());
@@ -165,8 +166,8 @@ public final class DataConsistencyCheckerImpl implements
DataConsistencyChecker
Iterator<Object> targetCalculatedResultIterator =
targetCalculator.calculate(targetCalculateParameter).iterator();
boolean calculateResultsEquals = true;
while (sourceCalculatedResultIterator.hasNext() &&
targetCalculatedResultIterator.hasNext()) {
- if (null != rateLimitAlgorithm) {
- rateLimitAlgorithm.onQuery();
+ if (null != inputRateLimitAlgorithm) {
+
inputRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
Future<Object> sourceFuture =
executor.submit(sourceCalculatedResultIterator::next);
Future<Object> targetFuture =
executor.submit(targetCalculatedResultIterator::next);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 8a7103c..6860835 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -34,6 +34,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineMetaDataManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
@@ -58,7 +59,7 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
@Getter(AccessLevel.PROTECTED)
private final InventoryDumperConfiguration inventoryDumperConfig;
- private final int readBatchSize;
+ private final int batchSize;
private final JobRateLimitAlgorithm rateLimitAlgorithm;
@@ -74,7 +75,7 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
throw new UnsupportedOperationException("AbstractInventoryDumper
only support StandardPipelineDataSourceConfiguration");
}
this.inventoryDumperConfig = inventoryDumperConfig;
- this.readBatchSize = inventoryDumperConfig.getReadBatchSize();
+ this.batchSize = inventoryDumperConfig.getBatchSize();
this.rateLimitAlgorithm =
inventoryDumperConfig.getRateLimitAlgorithm();
this.dataSourceManager = dataSourceManager;
tableMetaData = createTableMetaData();
@@ -121,12 +122,12 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
private Optional<Number> dump0(final Connection conn, final String sql,
final Number startUniqueKeyValue) throws SQLException {
if (null != rateLimitAlgorithm) {
- rateLimitAlgorithm.onQuery();
+ rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
try (PreparedStatement preparedStatement =
createPreparedStatement(conn, sql)) {
preparedStatement.setObject(1, startUniqueKeyValue);
preparedStatement.setObject(2,
getPositionEndValue(inventoryDumperConfig.getPosition()));
- preparedStatement.setInt(3, readBatchSize);
+ preparedStatement.setInt(3, batchSize);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
ResultSetMetaData metaData = resultSet.getMetaData();
int rowCount = 0;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 67ce4cc..b7a1b03 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -33,8 +33,11 @@ import
org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.InputConfigurationSwapper;
import
org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
import javax.sql.DataSource;
@@ -99,8 +102,12 @@ public final class InventoryTaskSplitter {
private Collection<InventoryDumperConfiguration> splitByPrimaryKey(
final RuleAlteredJobContext jobContext, final DataSource
dataSource, final PipelineMetaDataManager metaDataManager, final
InventoryDumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
- int readBatchSize =
jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getReadBatchSize();
- JobRateLimitAlgorithm rateLimitAlgorithm =
jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
+ InputConfiguration inputConfig =
jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getInput();
+ if (null == inputConfig) {
+ inputConfig = new InputConfigurationSwapper().swapToObject(new
YamlInputConfiguration());
+ }
+ int batchSize = inputConfig.getBatchSize();
+ JobRateLimitAlgorithm rateLimitAlgorithm =
jobContext.getRuleAlteredContext().getInputRateLimitAlgorithm();
Collection<IngestPosition<?>> inventoryPositions =
getInventoryPositions(jobContext, dumperConfig, dataSource, metaDataManager);
int i = 0;
for (IngestPosition<?> inventoryPosition : inventoryPositions) {
@@ -109,7 +116,7 @@ public final class InventoryTaskSplitter {
splitDumperConfig.setShardingItem(i++);
splitDumperConfig.setTableName(dumperConfig.getTableName());
splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
- splitDumperConfig.setReadBatchSize(readBatchSize);
+ splitDumperConfig.setBatchSize(batchSize);
splitDumperConfig.setRateLimitAlgorithm(rateLimitAlgorithm);
result.add(splitDumperConfig);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
similarity index 80%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
index bdbf12a..0e1dc3c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
@@ -21,14 +21,15 @@ import com.google.common.base.Strings;
import com.google.common.util.concurrent.RateLimiter;
import lombok.Getter;
import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import java.util.Properties;
/**
- * Source rule altered job rate limit algorithm for SPI.
+ * QPS job rate limit algorithm for SPI.
*/
-public final class SourceJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
+public final class QPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
private static final String QPS_KEY = "qps";
@@ -51,16 +52,19 @@ public final class SourceJobRateLimitAlgorithm implements JobRateLimitAlgorithm
@Override
public String getType() {
- return "SOURCE";
+ return "QPS";
}
@Override
- public void onQuery() {
+ public void intercept(final JobOperationType type, final Number data) {
+ if (type != JobOperationType.SELECT) {
+ return;
+ }
rateLimiter.acquire();
}
@Override
public String toString() {
- return "SourceJobRateLimitAlgorithm{" + "props=" + props + '}';
+ return "QPSJobRateLimitAlgorithm{" + "props=" + props + '}';
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
similarity index 64%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
index bdbf12a..836e5fb 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
@@ -21,18 +21,19 @@ import com.google.common.base.Strings;
import com.google.common.util.concurrent.RateLimiter;
import lombok.Getter;
import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import java.util.Properties;
/**
- * Source rule altered job rate limit algorithm for SPI.
+ * TPS job rate limit algorithm for SPI.
*/
-public final class SourceJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
+public final class TPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
- private static final String QPS_KEY = "qps";
+ private static final String TPS_KEY = "tps";
- private int qps = 50;
+ private int tps = 2000;
private RateLimiter rateLimiter;
@@ -42,25 +43,33 @@ public final class SourceJobRateLimitAlgorithm implements JobRateLimitAlgorithm
@Override
public void init() {
- String qpsValue = props.getProperty(QPS_KEY);
- if (!Strings.isNullOrEmpty(qpsValue)) {
- qps = Integer.parseInt(qpsValue);
+ String tpsValue = props.getProperty(TPS_KEY);
+ if (!Strings.isNullOrEmpty(tpsValue)) {
+ tps = Integer.parseInt(tpsValue);
}
- rateLimiter = RateLimiter.create(qps);
+ rateLimiter = RateLimiter.create(tps);
}
@Override
public String getType() {
- return "SOURCE";
+ return "TPS";
}
@Override
- public void onQuery() {
+ public void intercept(final JobOperationType type, final Number data) {
+ switch (type) {
+ case INSERT:
+ case DELETE:
+ case UPDATE:
+ break;
+ default:
+ return;
+ }
rateLimiter.acquire();
}
@Override
public String toString() {
- return "SourceJobRateLimitAlgorithm{" + "props=" + props + '}';
+ return "TPSJobRateLimitAlgorithm{" + "props=" + props + '}';
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index c939b2b..8bb0bdf 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -30,6 +30,12 @@ import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
+import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
+import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.InputConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.OutputConfigurationSwapper;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
@@ -55,7 +61,9 @@ public final class RuleAlteredContext {
private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
- private final JobRateLimitAlgorithm rateLimitAlgorithm;
+ private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
+
+ private final JobRateLimitAlgorithm outputRateLimitAlgorithm;
private final
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter>
completionDetectAlgorithm;
@@ -73,29 +81,27 @@ public final class RuleAlteredContext {
public RuleAlteredContext(final OnRuleAlteredActionConfiguration
onRuleAlteredActionConfig) {
this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
- ShardingSphereAlgorithmConfiguration rateLimiter =
onRuleAlteredActionConfig.getRateLimiter();
- if (null != rateLimiter) {
- rateLimitAlgorithm =
ShardingSphereAlgorithmFactory.createAlgorithm(rateLimiter,
JobRateLimitAlgorithm.class);
- } else {
- rateLimitAlgorithm = null;
+ InputConfiguration inputConfig = onRuleAlteredActionConfig.getInput();
+ if (null == inputConfig) {
+ inputConfig = new InputConfigurationSwapper().swapToObject(new
YamlInputConfiguration());
}
- ShardingSphereAlgorithmConfiguration completionDetector =
onRuleAlteredActionConfig.getCompletionDetector();
- if (null != completionDetector) {
- completionDetectAlgorithm =
ShardingSphereAlgorithmFactory.createAlgorithm(completionDetector,
JobCompletionDetectAlgorithm.class);
- } else {
- completionDetectAlgorithm = null;
+ ShardingSphereAlgorithmConfiguration inputRateLimiter =
inputConfig.getRateLimiter();
+ inputRateLimitAlgorithm = null != inputRateLimiter ?
ShardingSphereAlgorithmFactory.createAlgorithm(inputRateLimiter,
JobRateLimitAlgorithm.class) : null;
+ OutputConfiguration outputConfig =
onRuleAlteredActionConfig.getOutput();
+ if (null == outputConfig) {
+ outputConfig = new OutputConfigurationSwapper().swapToObject(new
YamlOutputConfiguration());
}
+ ShardingSphereAlgorithmConfiguration outputRateLimiter =
outputConfig.getRateLimiter();
+ outputRateLimitAlgorithm = null != outputRateLimiter ?
ShardingSphereAlgorithmFactory.createAlgorithm(outputRateLimiter,
JobRateLimitAlgorithm.class) : null;
+ ShardingSphereAlgorithmConfiguration completionDetector =
onRuleAlteredActionConfig.getCompletionDetector();
+ completionDetectAlgorithm = null != completionDetector ?
ShardingSphereAlgorithmFactory.createAlgorithm(completionDetector,
JobCompletionDetectAlgorithm.class) : null;
sourceWritingStopAlgorithm = null;
ShardingSphereAlgorithmConfiguration dataConsistencyChecker =
onRuleAlteredActionConfig.getDataConsistencyChecker();
- if (null != dataConsistencyChecker) {
- dataConsistencyCheckAlgorithm =
ShardingSphereAlgorithmFactory.createAlgorithm(dataConsistencyChecker,
DataConsistencyCheckAlgorithm.class);
- } else {
- dataConsistencyCheckAlgorithm = null;
- }
+ dataConsistencyCheckAlgorithm = null != dataConsistencyChecker ?
ShardingSphereAlgorithmFactory.createAlgorithm(dataConsistencyChecker,
DataConsistencyCheckAlgorithm.class) : null;
checkoutLockAlgorithm = null;
- inventoryDumperExecuteEngine =
ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
+ inventoryDumperExecuteEngine =
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread());
incrementalDumperExecuteEngine =
ExecuteEngine.newCachedThreadInstance();
- importerExecuteEngine =
ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
+ importerExecuteEngine =
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread());
}
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
index af22464..ed12596 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
@@ -15,4 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.SourceJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.QPSJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.TPSJobRateLimitAlgorithm
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 5c594af..4bf773c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -36,7 +36,7 @@ public final class InventoryDumperConfiguration extends
DumperConfiguration {
private Integer shardingItem;
- private int readBatchSize = 1000;
+ private int batchSize = 1000;
private JobRateLimitAlgorithm rateLimitAlgorithm;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
similarity index 64%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
index 4217509..ee5ccb2 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.ratelimit;
+package org.apache.shardingsphere.data.pipeline.api.job;
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
/**
- * Job rate limit algorithm, SPI.
+ * Job operation type.
*/
-public interface JobRateLimitAlgorithm extends ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
+@RequiredArgsConstructor
+@Getter
+public enum JobOperationType {
- /**
- * Action before query.
- */
- void onQuery();
+ INSERT, DELETE, UPDATE, SELECT,
+ SYSTEM_LOAD, CPU_USAGE,
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
index 4217509..d0df9a5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.spi.ratelimit;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
@@ -26,7 +27,10 @@ import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmP
public interface JobRateLimitAlgorithm extends ShardingSphereAlgorithm,
ShardingSphereAlgorithmPostProcessor {
/**
- * Action before query.
+ * Intercept.
+ *
+ * @param type job operation type
+ * @param data it's delta that means how much changed if type is INSERT, DELETE, UPDATE, SELECT; it's null if type is SYSTEM_LOAD, CPU_USAGE
*/
- void onQuery();
+ void intercept(JobOperationType type, Number data);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index b1d2cd0..efdaa29 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -98,12 +98,20 @@
# scaling:
# default_scaling:
# blockQueueSize: 10000
-# workerThread: 40
-# readBatchSize: 1000
-# rateLimiter:
-# type: SOURCE
-# props:
-# qps: 50
+# input:
+# workerThread: 40
+# batchSize: 1000
+# rateLimiter:
+# type: QPS
+# props:
+# qps: 50
+# output:
+# workerThread: 40
+# batchSize: 1000
+# rateLimiter:
+# type: TPS
+# props:
+# tps: 2000
# completionDetector:
# type: IDLE
# props:
@@ -195,12 +203,20 @@
# scaling:
# default_scaling:
# blockQueueSize: 10000
-# workerThread: 40
-# readBatchSize: 1000
-# rateLimiter:
-# type: SOURCE
-# props:
-# qps: 50
+# input:
+# workerThread: 40
+# batchSize: 1000
+# rateLimiter:
+# type: QPS
+# props:
+# qps: 50
+# output:
+# workerThread: 40
+# batchSize: 1000
+# rateLimiter:
+# type: TPS
+# props:
+# tps: 2000
# completionDetector:
# type: IDLE
# props:
diff --git
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/resources/docker/scaling/conf/server.yaml
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/resources/docker/scaling/conf/server.yaml
index 1f25db4..3585eb3 100644
---
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/resources/docker/scaling/conf/server.yaml
+++
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/resources/docker/scaling/conf/server.yaml
@@ -15,11 +15,6 @@
# limitations under the License.
#
-scaling:
- port: 8888
- blockQueueSize: 10000
- workerThread: 30
-
mode:
type: Cluster
repository:
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
index 8861e48..e6e492a 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
@@ -52,12 +52,20 @@ rules:
scaling:
default_scaling:
blockQueueSize: 10000
- workerThread: 40
- readBatchSize: 1000
- rateLimiter:
- type: SOURCE
- props:
- qps: 50
+ input:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: QPS
+ props:
+ qps: 50
+ output:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: TPS
+ props:
+ tps: 2000
completionDetector:
type: FIXTURE
dataConsistencyChecker:
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
index 335f438..b569b7d 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
@@ -55,12 +55,20 @@ rules:
scaling:
default_scaling:
blockQueueSize: 10000
- workerThread: 40
- readBatchSize: 1000
- rateLimiter:
- type: SOURCE
- props:
- qps: 50
+ input:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: QPS
+ props:
+ qps: 50
+ output:
+ workerThread: 40
+ batchSize: 1000
+ rateLimiter:
+ type: TPS
+ props:
+ tps: 2000
completionDetector:
type: FIXTURE
dataConsistencyChecker: