This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ab3b699 Disable rateLimiter config by default in scaling (#15437)
ab3b699 is described below
commit ab3b69972ccb77d7d6febbb5129fb3b59720c274
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Feb 16 16:30:49 2022 +0800
Disable rateLimiter config by default in scaling (#15437)
* Disable rateLimiter config by default
* Unit test
---
.../syntax/rdl/rule-definition/sharding.cn.md | 6 +-
.../syntax/rdl/rule-definition/sharding.en.md | 6 +-
.../distsql/syntax/rql/rule-query/sharding.cn.md | 2 +-
.../distsql/syntax/rql/rule-query/sharding.en.md | 2 +-
.../user-manual/shardingsphere-scaling/build.cn.md | 20 ++----
.../user-manual/shardingsphere-scaling/build.en.md | 20 ++----
.../resources/yaml/encrypt-dataConverters.yaml | 8 ---
.../src/test/resources/yaml/sharding-rule.yaml | 8 ---
.../src/test/resources/yaml/sharding-scaling.yaml | 8 ---
.../FixtureInputJobRateLimitAlgorithm.java | 39 +++++++++++
.../FixtureOutputJobRateLimitAlgorithm.java | 39 +++++++++++
...ateShardingScalingRuleStatementUpdaterTest.java | 18 +++---
...ta.pipeline.spi.ratelimit.JobRateLimitAlgorithm | 4 +-
.../spi/ratelimit/QPSJobRateLimitAlgorithm.java | 70 --------------------
.../spi/ratelimit/TPSJobRateLimitAlgorithm.java | 75 ----------------------
.../src/main/resources/conf/config-sharding.yaml | 16 -----
.../config_sharding_sphere_jdbc_source.yaml | 8 ---
.../config_sharding_sphere_jdbc_target.yaml | 8 ---
18 files changed, 104 insertions(+), 253 deletions(-)
diff --git
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
index 14daa22..7ab94b4 100644
---
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
+++
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
@@ -293,13 +293,11 @@ DROP SHARDING BROADCAST TABLE RULES t_b;
CREATE SHARDING SCALING RULE sharding_scaling(
INPUT(
WORKER_THREAD=40,
- BATCH_SIZE=1000,
- RATE_LIMITER(TYPE(NAME=QPS, PROPERTIES("qps"=50)))
+ BATCH_SIZE=1000
),
OUTPUT(
WORKER_THREAD=40,
- BATCH_SIZE=1000,
- RATE_LIMITER(TYPE(NAME=TPS, PROPERTIES("tps"=2000)))
+ BATCH_SIZE=1000
),
STREAM_CHANNEL(TYPE(NAME=MEMORY, PROPERTIES("block-queue-size"=10000))),
COMPLETION_DETECTOR(TYPE(NAME=IDLE,
PROPERTIES("incremental-task-idle-minute-threshold"=30))),
diff --git
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
index f8390e2..7e20f93 100644
---
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
+++
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
@@ -291,13 +291,11 @@ DROP SHARDING BROADCAST TABLE RULES;
CREATE SHARDING SCALING RULE sharding_scaling(
INPUT(
WORKER_THREAD=40,
- BATCH_SIZE=1000,
- RATE_LIMITER(TYPE(NAME=QPS, PROPERTIES("qps"=50)))
+ BATCH_SIZE=1000
),
OUTPUT(
WORKER_THREAD=40,
- BATCH_SIZE=1000,
- RATE_LIMITER(TYPE(NAME=TPS, PROPERTIES("tps"=2000)))
+ BATCH_SIZE=1000
),
STREAM_CHANNEL(TYPE(NAME=MEMORY, PROPERTIES("block-queue-size"=10000))),
COMPLETION_DETECTOR(TYPE(NAME=IDLE,
PROPERTIES("incremental-task-idle-minute-threshold"=30))),
diff --git
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rql/rule-query/sharding.cn.md
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rql/rule-query/sharding.cn.md
index ae129bb..6dde648 100644
---
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rql/rule-query/sharding.cn.md
+++
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rql/rule-query/sharding.cn.md
@@ -272,7 +272,7 @@ mysql> SHOW SHARDING SCALING RULES;
+------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------------------------+-----------------------------------------------------+
| name | input
| output
| stream_channel
| completion_detector
| data_consistency_checker |
+------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------------------------+-----------------------------------------------------+
-| sharding_scaling | {"workerThread":40,"batchSize":1000,"rateLimiter":{"type":"QPS","props":{"qps":"50"}}} | {"workerThread":40,"batchSize":1000,"rateLimiter":{"type":"TPS","props":{"tps":"2000"}}} | {"type":"MEMORY","props":{"block-queue-size":"10000"}} | {"type":"IDLE","props":{"incremental-task-idle-minute-threshold":"30"}} | {"type":"DATA_MATCH","props":{"chunk-size":"1000"}} |
+| sharding_scaling | {"workerThread":40,"batchSize":1000} | {"workerThread":40,"batchSize":1000} | {"type":"MEMORY","props":{"block-queue-size":"10000"}} | {"type":"IDLE","props":{"incremental-task-idle-minute-threshold":"30"}} | {"type":"DATA_MATCH","props":{"chunk-size":"1000"}} |
+------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------------------------+-----------------------------------------------------+
1 row in set (0.00 sec)
```
\ No newline at end of file
diff --git
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rql/rule-query/sharding.en.md
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rql/rule-query/sharding.en.md
index 798d61b..e7c9dfc 100644
---
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rql/rule-query/sharding.en.md
+++
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rql/rule-query/sharding.en.md
@@ -272,7 +272,7 @@ mysql> SHOW SHARDING SCALING RULES;
+------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------------------------+-----------------------------------------------------+
| name | input
| output
| stream_channel
| completion_detector
| data_consistency_checker |
+------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------------------------+-----------------------------------------------------+
-| sharding_scaling |
{"workerThread":40,"batchSize":1000,"rateLimiter":{"type":"QPS","props":{"qps":"50"}}}
|
{"workerThread":40,"batchSize":1000,"rateLimiter":{"type":"TPS","props":{"tps":"2000"}}}
| {"type":"MEMORY","props":{"block-queue-size":"10000"}} |
{"type":"IDLE","props":{"incremental-task-idle-minute-threshold":"30"}} |
{"type":"DATA_MATCH","props":{"chunk-size":"1000"}} |
+| sharding_scaling | {"workerThread":40,"batchSize":1000} |
{"workerThread":40,"batchSize":1000} |
{"type":"MEMORY","props":{"block-queue-size":"10000"}} |
{"type":"IDLE","props":{"incremental-task-idle-minute-threshold":"30"}} |
{"type":"DATA_MATCH","props":{"chunk-size":"1000"}} |
+------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+--------------------------------------------------------+-------------------------------------------------------------------------+-----------------------------------------------------+
1 row in set (0.00 sec)
```
diff --git
a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
index d420975..6858084 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
@@ -59,16 +59,14 @@ rules:
workerThread: # 从源端摄取全量数据的线程池大小。如果不配置则使用默认值。
batchSize: # 一次查询操作返回的最大记录数。如果不配置则使用默认值。
rateLimiter: # 限流算法。如果不配置则不限流。
- type: # 算法类型。可选项:QPS
+ type: # 算法类型。可选项:
props: # 算法属性
- qps: # qps属性。适用算法类型:QPS
output: # 数据写入配置。如果不配置则部分参数默认生效。
workerThread: # 数据写入到目标端的线程池大小。如果不配置则使用默认值。
batchSize: # 一次批量写入操作的最大记录数。如果不配置则使用默认值。
rateLimiter: # 限流算法。如果不配置则不限流。
- type: # 算法类型。可选项:TPS
+ type: # 算法类型。可选项:
props: # 算法属性
- tps: # tps属性。适用算法类型:TPS
streamChannel: # 数据通道,连接生产者和消费者,用于 input 和 output 环节。如果不配置则默认使用 MEMORY 类型
type: # 算法类型。可选项:MEMORY
props: # 算法属性
@@ -95,17 +93,9 @@ rules:
input:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: QPS
- props:
- qps: 50
output:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: TPS
- props:
- tps: 2000
streamChannel:
type: MEMORY
props:
@@ -129,13 +119,11 @@ rules:
CREATE SHARDING SCALING RULE default_scaling (
INPUT(
WORKER_THREAD=40,
- BATCH_SIZE=1000,
- RATE_LIMITER(TYPE(NAME=QPS, PROPERTIES("qps"=50)))
+ BATCH_SIZE=1000
),
OUTPUT(
WORKER_THREAD=40,
- BATCH_SIZE=1000,
- RATE_LIMITER(TYPE(NAME=TPS, PROPERTIES("tps"=2000)))
+ BATCH_SIZE=1000
),
STREAM_CHANNEL(TYPE(NAME=MEMORY, PROPERTIES("block-queue-size"=10000))),
COMPLETION_DETECTOR(TYPE(NAME=IDLE,
PROPERTIES("incremental-task-idle-minute-threshold"=3))),
diff --git
a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
index fc8f343..2bea501 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
@@ -58,16 +58,14 @@ rules:
workerThread: # Worker thread pool size for inventory data ingestion
from source. If it's not configured, then use system default value.
batchSize: # Maximum records count of a DML select operation. If it's
not configured, then use system default value.
rateLimiter: # Rate limit algorithm. If it's not configured, then
system will skip rate limit.
- type: # Algorithm type. Options: QPS
+ type: # Algorithm type. Options:
props: # Algorithm properties
- qps: # QPS property. Available for types: QPS
output: # Data write configuration. If it's not configured, then part of
its configuration will take effect.
workerThread: # Worker thread pool size for data importing to target.
If it's not configured, then use system default value.
batchSize: # Maximum records count of a DML insert/delete/update
operation. If it's not configured, then use system default value.
rateLimiter: # Rate limit algorithm. If it's not configured, then
system will skip rate limit.
- type: # Algorithm type. Options: TPS
+ type: # Algorithm type. Options:
props: # Algorithm properties
- tps: # TPS property. Available for types: TPS
streamChannel: # Algorithm of channel that connect producer and
consumer, used for input and output. If it's not configured, then system will
use MEMORY type
type: # Algorithm type. Options: MEMORY
props: # Algorithm properties
@@ -94,17 +92,9 @@ rules:
input:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: QPS
- props:
- qps: 50
output:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: TPS
- props:
- tps: 2000
streamChannel:
type: MEMORY
props:
@@ -128,13 +118,11 @@ Create scaling configuration example:
CREATE SHARDING SCALING RULE default_scaling (
INPUT(
WORKER_THREAD=40,
- BATCH_SIZE=1000,
- RATE_LIMITER(TYPE(NAME=QPS, PROPERTIES("qps"=50)))
+ BATCH_SIZE=1000
),
OUTPUT(
WORKER_THREAD=40,
- BATCH_SIZE=1000,
- RATE_LIMITER(TYPE(NAME=TPS, PROPERTIES("tps"=2000)))
+ BATCH_SIZE=1000
),
STREAM_CHANNEL(TYPE(NAME=MEMORY, PROPERTIES("block-queue-size"=10000))),
COMPLETION_DETECTOR(TYPE(NAME=IDLE,
PROPERTIES("incremental-task-idle-minute-threshold"=3))),
diff --git
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index 04a1ae2..c474379 100644
---
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -21,17 +21,9 @@ dataConverters:
input:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: QPS
- props:
- qps: 50
output:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: TPS
- props:
- tps: 2000
streamChannel:
type: MEMORY
props:
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index 58f8850..f99310e 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -111,17 +111,9 @@ rules:
input:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: QPS
- props:
- qps: 50
output:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: TPS
- props:
- tps: 2000
streamChannel:
type: MEMORY
props:
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index 20ee9fe..0af8c08 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -21,17 +21,9 @@ scaling:
input:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: QPS
- props:
- qps: 50
output:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: TPS
- props:
- tps: 2000
streamChannel:
type: MEMORY
props:
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/FixtureInputJobRateLimitAlgorithm.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/FixtureInputJobRateLimitAlgorithm.java
new file mode 100644
index 0000000..e476017
--- /dev/null
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/FixtureInputJobRateLimitAlgorithm.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.ratelimit;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+
+/**
+ * Fixture input job rate limit algorithm for SPI.
+ */
+public final class FixtureInputJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
+
+ @Override
+ public void init() {
+ }
+
+ @Override
+ public String getType() {
+ return "FIXTURE_INPUT";
+ }
+
+ @Override
+ public void intercept(final JobOperationType type, final Number data) {
+ }
+}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/FixtureOutputJobRateLimitAlgorithm.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/FixtureOutputJobRateLimitAlgorithm.java
new file mode 100644
index 0000000..fe95205
--- /dev/null
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/FixtureOutputJobRateLimitAlgorithm.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.ratelimit;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+
+/**
+ * Fixture output job rate limit algorithm for SPI.
+ */
+public final class FixtureOutputJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
+
+ @Override
+ public void init() {
+ }
+
+ @Override
+ public String getType() {
+ return "FIXTURE_OUTPUT";
+ }
+
+ @Override
+ public void intercept(final JobOperationType type, final Number data) {
+ }
+}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdaterTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdaterTest.java
index 2e3582b..aea2d50 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdaterTest.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdaterTest.java
@@ -41,7 +41,9 @@ import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public final class CreateShardingScalingRuleStatementUpdaterTest {
- private static final String QPS_TYPE = "qps";
+ private static final String LIMIT_TYPE_INPUT = "FIXTURE_INPUT";
+
+ private static final String LIMIT_TYPE_OUTPUT = "FIXTURE_OUTPUT";
@Mock
private ShardingSphereMetaData shardingSphereMetaData;
@@ -122,8 +124,8 @@ public final class CreateShardingScalingRuleStatementUpdaterTest {
String key = result.getScaling().keySet().iterator().next();
assertThat(key, is("default_scaling"));
OnRuleAlteredActionConfiguration value = result.getScaling().get(key);
- assertThat(value.getInput().getRateLimiter().getType(), is(QPS_TYPE));
- assertThat(value.getOutput().getRateLimiter().getType(), is("TPS"));
+ assertThat(value.getInput().getRateLimiter().getType(), is(LIMIT_TYPE_INPUT));
+ assertThat(value.getOutput().getRateLimiter().getType(), is(LIMIT_TYPE_OUTPUT));
assertThat(value.getStreamChannel().getType(), is("MEMORY"));
assertThat(value.getCompletionDetector().getType(), is("IDLE"));
assertThat(value.getDataConsistencyChecker().getType(),
is("DATA_MATCH"));
@@ -141,8 +143,8 @@ public final class CreateShardingScalingRuleStatementUpdaterTest {
String key = currentRuleConfig.getScaling().keySet().iterator().next();
assertThat(key, is("default_scaling"));
OnRuleAlteredActionConfiguration value =
currentRuleConfig.getScaling().get(key);
- assertThat(value.getInput().getRateLimiter().getType(), is(QPS_TYPE));
- assertThat(value.getOutput().getRateLimiter().getType(), is("TPS"));
+ assertThat(value.getInput().getRateLimiter().getType(), is(LIMIT_TYPE_INPUT));
+ assertThat(value.getOutput().getRateLimiter().getType(), is(LIMIT_TYPE_OUTPUT));
assertThat(value.getStreamChannel().getType(), is("MEMORY"));
assertThat(value.getCompletionDetector().getType(), is("IDLE"));
assertThat(value.getDataConsistencyChecker().getType(),
is("DATA_MATCH"));
@@ -155,7 +157,7 @@ public final class CreateShardingScalingRuleStatementUpdaterTest {
private ShardingScalingRuleConfigurationSegment
createConfigurationWithInvalidRateLimiter() {
ShardingScalingRuleConfigurationSegment result = new
ShardingScalingRuleConfigurationSegment();
- result.setInputSegment(createInputOrOutputSegment("TPS"));
+ result.setInputSegment(createInputOrOutputSegment(LIMIT_TYPE_OUTPUT));
result.setOutputSegment(createInputOrOutputSegment("INVALID"));
return result;
}
@@ -184,8 +186,8 @@ public final class CreateShardingScalingRuleStatementUpdaterTest {
private ShardingScalingRuleConfigurationSegment
createCompleteConfiguration() {
ShardingScalingRuleConfigurationSegment result = new
ShardingScalingRuleConfigurationSegment();
- result.setInputSegment(createInputOrOutputSegment(QPS_TYPE));
- result.setOutputSegment(createInputOrOutputSegment("TPS"));
+ result.setInputSegment(createInputOrOutputSegment(LIMIT_TYPE_INPUT));
+ result.setOutputSegment(createInputOrOutputSegment(LIMIT_TYPE_OUTPUT));
result.setStreamChannel(createAlgorithmSegment("MEMORY"));
result.setCompletionDetector(createAlgorithmSegment("IDLE"));
AlgorithmSegment dataConsistencyChecker =
createAlgorithmSegment("DATA_MATCH");
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
similarity index 81%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
rename to
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
index ed12596..ad29654 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
@@ -15,5 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.QPSJobRateLimitAlgorithm
-org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.TPSJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.spi.ratelimit.FixtureInputJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.spi.ratelimit.FixtureOutputJobRateLimitAlgorithm
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
deleted file mode 100644
index 0e1dc3c..0000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.spi.ratelimit;
-
-import com.google.common.base.Strings;
-import com.google.common.util.concurrent.RateLimiter;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-
-import java.util.Properties;
-
-/**
- * QPS job rate limit algorithm for SPI.
- */
-public final class QPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
-
- private static final String QPS_KEY = "qps";
-
- private int qps = 50;
-
- private RateLimiter rateLimiter;
-
- @Getter
- @Setter
- private Properties props = new Properties();
-
- @Override
- public void init() {
- String qpsValue = props.getProperty(QPS_KEY);
- if (!Strings.isNullOrEmpty(qpsValue)) {
- qps = Integer.parseInt(qpsValue);
- }
- rateLimiter = RateLimiter.create(qps);
- }
-
- @Override
- public String getType() {
- return "QPS";
- }
-
- @Override
- public void intercept(final JobOperationType type, final Number data) {
- if (type != JobOperationType.SELECT) {
- return;
- }
- rateLimiter.acquire();
- }
-
- @Override
- public String toString() {
- return "QPSJobRateLimitAlgorithm{" + "props=" + props + '}';
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
deleted file mode 100644
index 836e5fb..0000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.spi.ratelimit;
-
-import com.google.common.base.Strings;
-import com.google.common.util.concurrent.RateLimiter;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-
-import java.util.Properties;
-
-/**
- * TPS job rate limit algorithm for SPI.
- */
-public final class TPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
-
- private static final String TPS_KEY = "tps";
-
- private int tps = 2000;
-
- private RateLimiter rateLimiter;
-
- @Getter
- @Setter
- private Properties props = new Properties();
-
- @Override
- public void init() {
- String tpsValue = props.getProperty(TPS_KEY);
- if (!Strings.isNullOrEmpty(tpsValue)) {
- tps = Integer.parseInt(tpsValue);
- }
- rateLimiter = RateLimiter.create(tps);
- }
-
- @Override
- public String getType() {
- return "TPS";
- }
-
- @Override
- public void intercept(final JobOperationType type, final Number data) {
- switch (type) {
- case INSERT:
- case DELETE:
- case UPDATE:
- break;
- default:
- return;
- }
- rateLimiter.acquire();
- }
-
- @Override
- public String toString() {
- return "TPSJobRateLimitAlgorithm{" + "props=" + props + '}';
- }
-}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index 653bf5f..0b6b770 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -100,17 +100,9 @@
# input:
# workerThread: 40
# batchSize: 1000
-# rateLimiter:
-# type: QPS
-# props:
-# qps: 50
# output:
# workerThread: 40
# batchSize: 1000
-# rateLimiter:
-# type: TPS
-# props:
-# tps: 2000
# streamChannel:
# type: MEMORY
# props:
@@ -206,17 +198,9 @@
# input:
# workerThread: 40
# batchSize: 1000
-# rateLimiter:
-# type: QPS
-# props:
-# qps: 50
# output:
# workerThread: 40
# batchSize: 1000
-# rateLimiter:
-# type: TPS
-# props:
-# tps: 2000
# streamChannel:
# type: MEMORY
# props:
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
index 55d7c32..b073165 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
@@ -54,17 +54,9 @@ rules:
input:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: QPS
- props:
- qps: 50
output:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: TPS
- props:
- tps: 2000
streamChannel:
type: MEMORY
props:
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
index 02c7e6c..6e53d42 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
@@ -57,17 +57,9 @@ rules:
input:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: QPS
- props:
- qps: 50
output:
workerThread: 40
batchSize: 1000
- rateLimiter:
- type: TPS
- props:
- tps: 2000
streamChannel:
type: MEMORY
props: