This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b4d9801  Add input and output in scaling config; Refactor JobRateLimitAlgorithm SPI; Add rateLimiter TPS impl (#14621)
b4d9801 is described below

commit b4d9801fc7526c5d2f259996c9775376963c42de
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Jan 8 13:19:03 2022 +0800

    Add input and output in scaling config; Refactor JobRateLimitAlgorithm SPI; Add rateLimiter TPS impl (#14621)
---
 docs/document/content/dev-manual/scaling.cn.md     | 10 ----
 docs/document/content/dev-manual/scaling.en.md     | 10 ----
 .../user-manual/shardingsphere-scaling/build.cn.md | 40 ++++++++++----
 .../user-manual/shardingsphere-scaling/build.en.md | 40 ++++++++++----
 .../resources/yaml/encrypt-dataConverters.yaml     | 20 +++++--
 .../src/test/resources/yaml/sharding-rule.yaml     | 20 +++++--
 .../src/test/resources/yaml/sharding-scaling.yaml  | 20 +++++--
 .../OnRuleAlteredActionConfiguration.java          | 32 +++++++++--
 .../YamlOnRuleAlteredActionConfiguration.java      | 29 ++++++++--
 ...nRuleAlteredActionConfigurationYamlSwapper.java | 64 ++++++++++++++++++++--
 ...eAlteredActionConfigurationYamlSwapperTest.java | 15 ++++-
 .../consistency/DataConsistencyCheckerImpl.java    |  7 ++-
 .../ingest/dumper/AbstractInventoryDumper.java     |  9 +--
 .../core/prepare/InventoryTaskSplitter.java        | 13 ++++-
 ...lgorithm.java => QPSJobRateLimitAlgorithm.java} | 14 +++--
 ...lgorithm.java => TPSJobRateLimitAlgorithm.java} | 31 +++++++----
 .../scenario/rulealtered/RuleAlteredContext.java   | 42 ++++++++------
 ...ta.pipeline.spi.ratelimit.JobRateLimitAlgorithm |  3 +-
 .../ingest/InventoryDumperConfiguration.java       |  2 +-
 .../job/JobOperationType.java}                     | 18 +++---
 .../spi/ratelimit/JobRateLimitAlgorithm.java       |  8 ++-
 .../src/main/resources/conf/config-sharding.yaml   | 40 ++++++++++----
 .../test/resources/docker/scaling/conf/server.yaml |  5 --
 .../config_sharding_sphere_jdbc_source.yaml        | 20 +++++--
 .../config_sharding_sphere_jdbc_target.yaml        | 20 +++++--
 25 files changed, 368 insertions(+), 164 deletions(-)

diff --git a/docs/document/content/dev-manual/scaling.cn.md 
b/docs/document/content/dev-manual/scaling.cn.md
index 6d5c1b9..b2377d5 100644
--- a/docs/document/content/dev-manual/scaling.cn.md
+++ b/docs/document/content/dev-manual/scaling.cn.md
@@ -17,16 +17,6 @@ chapter = true
 | PostgreSQLScalingEntry | 基于 PostgreSQL 的弹性伸缩入口 |
 | OpenGaussScalingEntry  | 基于 openGauss 的弹性伸缩入口  |
 
-## JobRateLimitAlgorithm
-
-| *SPI 名称*                                   | *详细说明*                          
         |
-| ------------------------------------------- | 
------------------------------------------- |
-| JobRateLimitAlgorithm                       | 任务限流算法                         
         |
-
-| *已知实现类*                                  | *详细说明*                            
       |
-| ------------------------------------------- | 
------------------------------------------- |
-| SourceJobRateLimitAlgorithm                 | 源端限流算法                         
         |
-
 ## JobCompletionDetectAlgorithm
 
 | *SPI 名称*                                   | *详细说明*                          
         |
diff --git a/docs/document/content/dev-manual/scaling.en.md 
b/docs/document/content/dev-manual/scaling.en.md
index d3cbe5e..202fb43 100644
--- a/docs/document/content/dev-manual/scaling.en.md
+++ b/docs/document/content/dev-manual/scaling.en.md
@@ -17,16 +17,6 @@ chapter = true
 | PostgreSQLScalingEntry | PostgreSQL entry of scaling |
 | OpenGaussScalingEntry  | openGauss entry of scaling |
 
-## JobRateLimitAlgorithm
-
-| *SPI Name*                                   | *Description*                 
             |
-| ------------------------------------------- | 
------------------------------------------- |
-| JobRateLimitAlgorithm                       | job rate limit algorithm       
             |
-
-| *Implementation Class*                      | *Description*                  
             |
-| ------------------------------------------- | 
------------------------------------------- |
-| SourceJobRateLimitAlgorithm                 | rate limit algorithm for 
source side        |
-
 ## JobCompletionDetectAlgorithm
 
 | *SPI Name*                                  | *Description*                  
             |
diff --git 
a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md 
b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
index 4a99571..50547dc 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
@@ -54,12 +54,20 @@ rules:
   scaling:
     <scaling-action-config-name> (+):
       blockQueueSize: # 数据通道阻塞队列大小
-      workerThread: # 给全量数据摄取和数据导入使用的工作线程池大小
-      readBatchSize: # 一次查询操作返回的最大记录数
-      rateLimiter: # 限流算法
-        type: # 算法类型。可选项:SOURCE
-        props: # 算法属性
-          qps: # QPS属性。适用算法类型:SOURCE
+      input:
+        workerThread: # 从源端摄取全量数据的线程池大小
+        batchSize: # 一次查询操作返回的最大记录数
+        rateLimiter: # 限流算法
+          type: # 算法类型。可选项:QPS
+          props: # 算法属性
+            qps: # qps属性。适用算法类型:QPS
+      output:
+        workerThread: # 数据导入到目标端的线程池大小
+        batchSize: # 一次批量写入操作的最大记录数
+        rateLimiter: # 限流算法
+          type: # 算法类型。可选项:TPS
+          props: # 算法属性
+            tps: # tps属性。适用算法类型:TPS
       completionDetector: # 作业是否接近完成检测算法。如果不配置,那么系统无法自动进行后续步骤,可以通过 DistSQL 
手动操作。
         type: # 算法类型。可选项:IDLE
         props: # 算法属性
@@ -80,12 +88,20 @@ rules:
   scaling:
     default_scaling:
       blockQueueSize: 10000
-      workerThread: 40
-      readBatchSize: 1000
-      rateLimiter:
-        type: SOURCE
-        props:
-          qps: 50
+      input:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: QPS
+          props:
+            qps: 50
+      output:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: TPS
+          props:
+            tps: 2000
       completionDetector:
         type: IDLE
         props:
diff --git 
a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md 
b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
index c41276d..3fe6b41 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
@@ -53,12 +53,20 @@ rules:
   scaling:
     <scaling-action-config-name> (+):
       blockQueueSize: # Data channel blocking queue size
-      workerThread: # Worker thread pool size for inventory data ingestion and 
data importing
-      readBatchSize: # Maximum records count of a query operation returning
-      rateLimiter: # Rate limit algorithm
-        type: # Algorithm type. Options: SOURCE
-        props: # Algorithm properties
-          qps: # QPS property. Available for types: SOURCE
+      input:
+        workerThread: # Worker thread pool size for inventory data ingestion 
from source
+        batchSize: # Maximum records count of a DML select operation
+        rateLimiter: # Rate limit algorithm
+          type: # Algorithm type. Options: QPS
+          props: # Algorithm properties
+            qps: # QPS property. Available for types: QPS
+      output:
+        workerThread: # Worker thread pool size for data importing to target
+        batchSize: # Maximum records count of a DML insert/delete/update 
operation
+        rateLimiter: # Rate limit algorithm
+          type: # Algorithm type. Options: TPS
+          props: # Algorithm properties
+            tps: # TPS property. Available for types: TPS
       completionDetector: # Completion detect algorithm. If it's not 
configured, then system won't continue to do next steps automatically.
         type: # Algorithm type. Options: IDLE
         props: # Algorithm properties
@@ -79,12 +87,20 @@ rules:
   scaling:
     default_scaling:
       blockQueueSize: 10000
-      workerThread: 40
-      readBatchSize: 1000
-      rateLimiter:
-        type: SOURCE
-        props:
-          qps: 50
+      input:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: QPS
+          props:
+            qps: 50
+      output:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: TPS
+          props:
+            tps: 2000
       completionDetector:
         type: IDLE
         props:
diff --git 
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
 
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index 8767f19..39d44da 100644
--- 
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++ 
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -19,12 +19,20 @@ dataConverterName: default_convert
 dataConverters:
   default_convert:
     blockQueueSize: 10000
-    workerThread: 40
-    readBatchSize: 1000
-    rateLimiter:
-      type: SOURCE
-      props:
-        qps: 50
+    input:
+      workerThread: 40
+      batchSize: 1000
+      rateLimiter:
+        type: QPS
+        props:
+          qps: 50
+    output:
+      workerThread: 40
+      batchSize: 1000
+      rateLimiter:
+        type: TPS
+        props:
+          tps: 2000
     completionDetector:
       type: IDLE
       props:
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index 4a55b94..4dd83cf 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -109,12 +109,20 @@ rules:
   scaling:
     default_scaling:
       blockQueueSize: 10000
-      workerThread: 40
-      readBatchSize: 1000
-      rateLimiter:
-        type: SOURCE
-        props:
-          qps: 50
+      input:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: QPS
+          props:
+            qps: 50
+      output:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: TPS
+          props:
+            tps: 2000
       completionDetector:
         type: IDLE
         props:
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index 42d2df1..bb8027d 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -19,12 +19,20 @@ scalingName: default_scaling
 scaling:
   default_scaling:
     blockQueueSize: 10000
-    workerThread: 40
-    readBatchSize: 1000
-    rateLimiter:
-      type: SOURCE
-      props:
-        qps: 50
+    input:
+      workerThread: 40
+      batchSize: 1000
+      rateLimiter:
+        type: QPS
+        props:
+          qps: 50
+    output:
+      workerThread: 40
+      batchSize: 1000
+      rateLimiter:
+        type: TPS
+        props:
+          tps: 2000
     completionDetector:
       type: IDLE
       props:
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
index 71dfd14..07a82a1 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.infra.config.rulealtered;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.ToString;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
 
 /**
@@ -26,17 +27,40 @@ import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
  */
 @RequiredArgsConstructor
 @Getter
+@ToString
 public final class OnRuleAlteredActionConfiguration {
     
     private final int blockQueueSize;
     
-    private final int workerThread;
+    private final InputConfiguration input;
     
-    private final int readBatchSize;
-    
-    private final ShardingSphereAlgorithmConfiguration rateLimiter;
+    private final OutputConfiguration output;
     
     private final ShardingSphereAlgorithmConfiguration completionDetector;
     
     private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+    
+    @RequiredArgsConstructor
+    @Getter
+    @ToString
+    public static final class InputConfiguration {
+        
+        private final int workerThread;
+        
+        private final int batchSize;
+        
+        private final ShardingSphereAlgorithmConfiguration rateLimiter;
+    }
+    
+    @RequiredArgsConstructor
+    @Getter
+    @ToString
+    public static final class OutputConfiguration {
+        
+        private final int workerThread;
+        
+        private final int batchSize;
+        
+        private final ShardingSphereAlgorithmConfiguration rateLimiter;
+    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
index b909fc4..4934ea8 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
@@ -17,8 +17,10 @@
 
 package org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered;
 
+import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.ToString;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlShardingSphereAlgorithmConfiguration;
 
@@ -27,17 +29,36 @@ import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlShardingSp
  */
 @Getter
 @Setter
+@ToString
 public final class YamlOnRuleAlteredActionConfiguration implements 
YamlConfiguration {
     
     private int blockQueueSize = 10000;
     
-    private int workerThread = 40;
+    private YamlInputConfiguration input;
     
-    private int readBatchSize = 1000;
-    
-    private YamlShardingSphereAlgorithmConfiguration rateLimiter;
+    private YamlOutputConfiguration output;
     
     private YamlShardingSphereAlgorithmConfiguration completionDetector;
     
     private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+    
+    @Data
+    public static final class YamlInputConfiguration implements 
YamlConfiguration {
+        
+        private int workerThread = 40;
+        
+        private int batchSize = 1000;
+        
+        private YamlShardingSphereAlgorithmConfiguration rateLimiter;
+    }
+    
+    @Data
+    public static final class YamlOutputConfiguration implements 
YamlConfiguration {
+        
+        private int workerThread = 40;
+        
+        private int batchSize = 1000;
+        
+        private YamlShardingSphereAlgorithmConfiguration rateLimiter;
+    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
index 8ee36f6..11e959e 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
@@ -18,7 +18,11 @@
 package org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered;
 
 import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
+import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
+import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.ShardingSphereAlgorithmConfigurationYamlSwapper;
 
@@ -29,6 +33,10 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
     
     private static final ShardingSphereAlgorithmConfigurationYamlSwapper 
ALGORITHM_CONFIG_YAML_SWAPPER = new 
ShardingSphereAlgorithmConfigurationYamlSwapper();
     
+    private static final InputConfigurationSwapper INPUT_CONFIG_SWAPPER = new 
InputConfigurationSwapper();
+    
+    private static final OutputConfigurationSwapper OUTPUT_CONFIG_SWAPPER = 
new OutputConfigurationSwapper();
+    
     @Override
     public YamlOnRuleAlteredActionConfiguration swapToYamlConfiguration(final 
OnRuleAlteredActionConfiguration data) {
         if (null == data) {
@@ -36,9 +44,8 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
         }
         YamlOnRuleAlteredActionConfiguration result = new 
YamlOnRuleAlteredActionConfiguration();
         result.setBlockQueueSize(data.getBlockQueueSize());
-        result.setWorkerThread(data.getWorkerThread());
-        result.setReadBatchSize(data.getReadBatchSize());
-        
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+        
result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
+        
result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
         
result.setCompletionDetector(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
         
result.setDataConsistencyChecker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyChecker()));
         return result;
@@ -49,9 +56,56 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
         if (null == yamlConfig) {
             return null;
         }
-        return new 
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(), 
yamlConfig.getWorkerThread(), yamlConfig.getReadBatchSize(),
-                
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()),
+        return new 
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(),
+                INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
+                OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
                 
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
                 
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
     }
+    
+    public static class InputConfigurationSwapper implements 
YamlConfigurationSwapper<YamlInputConfiguration, InputConfiguration> {
+        
+        @Override
+        public YamlInputConfiguration swapToYamlConfiguration(final 
InputConfiguration data) {
+            if (null == data) {
+                return null;
+            }
+            YamlInputConfiguration result = new YamlInputConfiguration();
+            result.setWorkerThread(data.getWorkerThread());
+            result.setBatchSize(data.getBatchSize());
+            
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+            return result;
+        }
+        
+        @Override
+        public InputConfiguration swapToObject(final YamlInputConfiguration 
yamlConfig) {
+            if (null == yamlConfig) {
+                return null;
+            }
+            return new InputConfiguration(yamlConfig.getWorkerThread(), 
yamlConfig.getBatchSize(), 
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+        }
+    }
+    
+    public static class OutputConfigurationSwapper implements 
YamlConfigurationSwapper<YamlOutputConfiguration, OutputConfiguration> {
+        
+        @Override
+        public YamlOutputConfiguration swapToYamlConfiguration(final 
OutputConfiguration data) {
+            if (null == data) {
+                return null;
+            }
+            YamlOutputConfiguration result = new YamlOutputConfiguration();
+            result.setWorkerThread(data.getWorkerThread());
+            result.setBatchSize(data.getBatchSize());
+            
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+            return result;
+        }
+        
+        @Override
+        public OutputConfiguration swapToObject(final YamlOutputConfiguration 
yamlConfig) {
+            if (null == yamlConfig) {
+                return null;
+            }
+            return new OutputConfiguration(yamlConfig.getWorkerThread(), 
yamlConfig.getBatchSize(), 
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+        }
+    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
index bc0d82d..25b9f8f 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
@@ -20,6 +20,8 @@ package 
org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered;
 import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlShardingSphereAlgorithmConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.junit.Test;
 
@@ -37,12 +39,19 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapperTest {
     public void assertSwap() {
         YamlOnRuleAlteredActionConfiguration yamlConfig = new 
YamlOnRuleAlteredActionConfiguration();
         yamlConfig.setBlockQueueSize(1000);
-        yamlConfig.setWorkerThread(20);
-        yamlConfig.setReadBatchSize(100);
         Properties rateLimiterProps = new Properties();
         rateLimiterProps.setProperty("batch-size", "1000");
         rateLimiterProps.setProperty("qps", "50");
-        yamlConfig.setRateLimiter(new 
YamlShardingSphereAlgorithmConfiguration("SOURCE", rateLimiterProps));
+        YamlInputConfiguration yamlInputConfig = new YamlInputConfiguration();
+        yamlInputConfig.setWorkerThread(40);
+        yamlInputConfig.setBatchSize(1000);
+        yamlInputConfig.setRateLimiter(new 
YamlShardingSphereAlgorithmConfiguration("INPUT", rateLimiterProps));
+        yamlConfig.setInput(yamlInputConfig);
+        YamlOutputConfiguration yamlOutputConfig = new 
YamlOutputConfiguration();
+        yamlOutputConfig.setWorkerThread(40);
+        yamlOutputConfig.setBatchSize(1000);
+        yamlOutputConfig.setRateLimiter(new 
YamlShardingSphereAlgorithmConfiguration("OUTPUT", rateLimiterProps));
+        yamlConfig.setOutput(yamlOutputConfig);
         Properties completionDetectorProps = new Properties();
         
completionDetectorProps.setProperty("incremental-task-idle-minute-threshold", 
"30");
         yamlConfig.setCompletionDetector(new 
YamlShardingSphereAlgorithmConfiguration("IDLE", completionDetectorProps));
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 7f0d8cc..565d605 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
@@ -143,7 +144,7 @@ public final class DataConsistencyCheckerImpl implements 
DataConsistencyChecker
         Map<String, Boolean> result = new HashMap<>();
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" 
+ getJobIdPrefix(jobContext.getJobId()) + "-dataCheck-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
-        JobRateLimitAlgorithm rateLimitAlgorithm = 
jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
+        JobRateLimitAlgorithm inputRateLimitAlgorithm = 
jobContext.getRuleAlteredContext().getInputRateLimitAlgorithm();
         try (PipelineDataSourceWrapper sourceDataSource = 
dataSourceFactory.newInstance(sourceDataSourceConfig);
              PipelineDataSourceWrapper targetDataSource = 
dataSourceFactory.newInstance(targetDataSourceConfig)) {
             Map<String, TableMetaData> tableMetaDataMap = 
getTableMetaDataMap(jobContext.getJobConfig().getWorkflowConfig().getSchemaName());
@@ -165,8 +166,8 @@ public final class DataConsistencyCheckerImpl implements 
DataConsistencyChecker
                 Iterator<Object> targetCalculatedResultIterator = 
targetCalculator.calculate(targetCalculateParameter).iterator();
                 boolean calculateResultsEquals = true;
                 while (sourceCalculatedResultIterator.hasNext() && 
targetCalculatedResultIterator.hasNext()) {
-                    if (null != rateLimitAlgorithm) {
-                        rateLimitAlgorithm.onQuery();
+                    if (null != inputRateLimitAlgorithm) {
+                        
inputRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
                     }
                     Future<Object> sourceFuture = 
executor.submit(sourceCalculatedResultIterator::next);
                     Future<Object> targetFuture = 
executor.submit(targetCalculatedResultIterator::next);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 8a7103c..6860835 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -34,6 +34,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineMetaDataManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
@@ -58,7 +59,7 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
     @Getter(AccessLevel.PROTECTED)
     private final InventoryDumperConfiguration inventoryDumperConfig;
     
-    private final int readBatchSize;
+    private final int batchSize;
     
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
     
@@ -74,7 +75,7 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
             throw new UnsupportedOperationException("AbstractInventoryDumper 
only support StandardPipelineDataSourceConfiguration");
         }
         this.inventoryDumperConfig = inventoryDumperConfig;
-        this.readBatchSize = inventoryDumperConfig.getReadBatchSize();
+        this.batchSize = inventoryDumperConfig.getBatchSize();
         this.rateLimitAlgorithm = 
inventoryDumperConfig.getRateLimitAlgorithm();
         this.dataSourceManager = dataSourceManager;
         tableMetaData = createTableMetaData();
@@ -121,12 +122,12 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
     
     private Optional<Number> dump0(final Connection conn, final String sql, 
final Number startUniqueKeyValue) throws SQLException {
         if (null != rateLimitAlgorithm) {
-            rateLimitAlgorithm.onQuery();
+            rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
         }
         try (PreparedStatement preparedStatement = 
createPreparedStatement(conn, sql)) {
             preparedStatement.setObject(1, startUniqueKeyValue);
             preparedStatement.setObject(2, 
getPositionEndValue(inventoryDumperConfig.getPosition()));
-            preparedStatement.setInt(3, readBatchSize);
+            preparedStatement.setInt(3, batchSize);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 ResultSetMetaData metaData = resultSet.getMetaData();
                 int rowCount = 0;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 67ce4cc..b7a1b03 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -33,8 +33,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.InputConfigurationSwapper;
 import 
org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
 
 import javax.sql.DataSource;
@@ -99,8 +102,12 @@ public final class InventoryTaskSplitter {
     private Collection<InventoryDumperConfiguration> splitByPrimaryKey(
             final RuleAlteredJobContext jobContext, final DataSource 
dataSource, final PipelineMetaDataManager metaDataManager, final 
InventoryDumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
-        int readBatchSize = 
jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getReadBatchSize();
-        JobRateLimitAlgorithm rateLimitAlgorithm = 
jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
+        InputConfiguration inputConfig = 
jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getInput();
+        if (null == inputConfig) {
+            inputConfig = new InputConfigurationSwapper().swapToObject(new 
YamlInputConfiguration());
+        }
+        int batchSize = inputConfig.getBatchSize();
+        JobRateLimitAlgorithm rateLimitAlgorithm = 
jobContext.getRuleAlteredContext().getInputRateLimitAlgorithm();
         Collection<IngestPosition<?>> inventoryPositions = 
getInventoryPositions(jobContext, dumperConfig, dataSource, metaDataManager);
         int i = 0;
         for (IngestPosition<?> inventoryPosition : inventoryPositions) {
@@ -109,7 +116,7 @@ public final class InventoryTaskSplitter {
             splitDumperConfig.setShardingItem(i++);
             splitDumperConfig.setTableName(dumperConfig.getTableName());
             splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
-            splitDumperConfig.setReadBatchSize(readBatchSize);
+            splitDumperConfig.setBatchSize(batchSize);
             splitDumperConfig.setRateLimitAlgorithm(rateLimitAlgorithm);
             result.add(splitDumperConfig);
         }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
similarity index 80%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
index bdbf12a..0e1dc3c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/QPSJobRateLimitAlgorithm.java
@@ -21,14 +21,15 @@ import com.google.common.base.Strings;
 import com.google.common.util.concurrent.RateLimiter;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 
 import java.util.Properties;
 
 /**
- * Source rule altered job rate limit algorithm for SPI.
+ * QPS job rate limit algorithm for SPI.
  */
-public final class SourceJobRateLimitAlgorithm implements 
JobRateLimitAlgorithm {
+public final class QPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
     
     private static final String QPS_KEY = "qps";
     
@@ -51,16 +52,19 @@ public final class SourceJobRateLimitAlgorithm implements 
JobRateLimitAlgorithm
     
     @Override
     public String getType() {
-        return "SOURCE";
+        return "QPS";
     }
     
     @Override
-    public void onQuery() {
+    public void intercept(final JobOperationType type, final Number data) {
+        if (type != JobOperationType.SELECT) {
+            return;
+        }
         rateLimiter.acquire();
     }
     
     @Override
     public String toString() {
-        return "SourceJobRateLimitAlgorithm{" + "props=" + props + '}';
+        return "QPSJobRateLimitAlgorithm{" + "props=" + props + '}';
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
similarity index 64%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
index bdbf12a..836e5fb 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/SourceJobRateLimitAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
@@ -21,18 +21,19 @@ import com.google.common.base.Strings;
 import com.google.common.util.concurrent.RateLimiter;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 
 import java.util.Properties;
 
 /**
- * Source rule altered job rate limit algorithm for SPI.
+ * TPS job rate limit algorithm for SPI.
  */
-public final class SourceJobRateLimitAlgorithm implements 
JobRateLimitAlgorithm {
+public final class TPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
     
-    private static final String QPS_KEY = "qps";
+    private static final String TPS_KEY = "tps";
     
-    private int qps = 50;
+    private int tps = 2000;
     
     private RateLimiter rateLimiter;
     
@@ -42,25 +43,33 @@ public final class SourceJobRateLimitAlgorithm implements 
JobRateLimitAlgorithm
     
     @Override
     public void init() {
-        String qpsValue = props.getProperty(QPS_KEY);
-        if (!Strings.isNullOrEmpty(qpsValue)) {
-            qps = Integer.parseInt(qpsValue);
+        String tpsValue = props.getProperty(TPS_KEY);
+        if (!Strings.isNullOrEmpty(tpsValue)) {
+            tps = Integer.parseInt(tpsValue);
         }
-        rateLimiter = RateLimiter.create(qps);
+        rateLimiter = RateLimiter.create(tps);
     }
     
     @Override
     public String getType() {
-        return "SOURCE";
+        return "TPS";
     }
     
     @Override
-    public void onQuery() {
+    public void intercept(final JobOperationType type, final Number data) {
+        switch (type) {
+            case INSERT:
+            case DELETE:
+            case UPDATE:
+                break;
+            default:
+                return;
+        }
         rateLimiter.acquire();
     }
     
     @Override
     public String toString() {
-        return "SourceJobRateLimitAlgorithm{" + "props=" + props + '}';
+        return "TPSJobRateLimitAlgorithm{" + "props=" + props + '}';
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index c939b2b..8bb0bdf 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -30,6 +30,12 @@ import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
+import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
+import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.InputConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.OutputConfigurationSwapper;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 
@@ -55,7 +61,9 @@ public final class RuleAlteredContext {
     
     private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
     
-    private final JobRateLimitAlgorithm rateLimitAlgorithm;
+    private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
+    
+    private final JobRateLimitAlgorithm outputRateLimitAlgorithm;
     
     private final 
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> 
completionDetectAlgorithm;
     
@@ -73,29 +81,27 @@ public final class RuleAlteredContext {
     
     public RuleAlteredContext(final OnRuleAlteredActionConfiguration 
onRuleAlteredActionConfig) {
         this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
-        ShardingSphereAlgorithmConfiguration rateLimiter = 
onRuleAlteredActionConfig.getRateLimiter();
-        if (null != rateLimiter) {
-            rateLimitAlgorithm = 
ShardingSphereAlgorithmFactory.createAlgorithm(rateLimiter, 
JobRateLimitAlgorithm.class);
-        } else {
-            rateLimitAlgorithm = null;
+        InputConfiguration inputConfig = onRuleAlteredActionConfig.getInput();
+        if (null == inputConfig) {
+            inputConfig = new InputConfigurationSwapper().swapToObject(new 
YamlInputConfiguration());
         }
-        ShardingSphereAlgorithmConfiguration completionDetector = 
onRuleAlteredActionConfig.getCompletionDetector();
-        if (null != completionDetector) {
-            completionDetectAlgorithm = 
ShardingSphereAlgorithmFactory.createAlgorithm(completionDetector, 
JobCompletionDetectAlgorithm.class);
-        } else {
-            completionDetectAlgorithm = null;
+        ShardingSphereAlgorithmConfiguration inputRateLimiter = 
inputConfig.getRateLimiter();
+        inputRateLimitAlgorithm = null != inputRateLimiter ? 
ShardingSphereAlgorithmFactory.createAlgorithm(inputRateLimiter, 
JobRateLimitAlgorithm.class) : null;
+        OutputConfiguration outputConfig = 
onRuleAlteredActionConfig.getOutput();
+        if (null == outputConfig) {
+            outputConfig = new OutputConfigurationSwapper().swapToObject(new 
YamlOutputConfiguration());
         }
+        ShardingSphereAlgorithmConfiguration outputRateLimiter = 
outputConfig.getRateLimiter();
+        outputRateLimitAlgorithm = null != outputRateLimiter ? 
ShardingSphereAlgorithmFactory.createAlgorithm(outputRateLimiter, 
JobRateLimitAlgorithm.class) : null;
+        ShardingSphereAlgorithmConfiguration completionDetector = 
onRuleAlteredActionConfig.getCompletionDetector();
+        completionDetectAlgorithm = null != completionDetector ? 
ShardingSphereAlgorithmFactory.createAlgorithm(completionDetector, 
JobCompletionDetectAlgorithm.class) : null;
         sourceWritingStopAlgorithm = null;
         ShardingSphereAlgorithmConfiguration dataConsistencyChecker = 
onRuleAlteredActionConfig.getDataConsistencyChecker();
-        if (null != dataConsistencyChecker) {
-            dataConsistencyCheckAlgorithm = 
ShardingSphereAlgorithmFactory.createAlgorithm(dataConsistencyChecker, 
DataConsistencyCheckAlgorithm.class);
-        } else {
-            dataConsistencyCheckAlgorithm = null;
-        }
+        dataConsistencyCheckAlgorithm = null != dataConsistencyChecker ? 
ShardingSphereAlgorithmFactory.createAlgorithm(dataConsistencyChecker, 
DataConsistencyCheckAlgorithm.class) : null;
         checkoutLockAlgorithm = null;
-        inventoryDumperExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
+        inventoryDumperExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread());
         incrementalDumperExecuteEngine = 
ExecuteEngine.newCachedThreadInstance();
-        importerExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(onRuleAlteredActionConfig.getWorkerThread());
+        importerExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread());
     }
     
     /**
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
index af22464..ed12596 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
@@ -15,4 +15,5 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.SourceJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.QPSJobRateLimitAlgorithm
+org.apache.shardingsphere.data.pipeline.core.spi.ratelimit.TPSJobRateLimitAlgorithm
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 5c594af..4bf773c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -36,7 +36,7 @@ public final class InventoryDumperConfiguration extends 
DumperConfiguration {
     
     private Integer shardingItem;
     
-    private int readBatchSize = 1000;
+    private int batchSize = 1000;
     
     private JobRateLimitAlgorithm rateLimitAlgorithm;
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
similarity index 64%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
index 4217509..ee5ccb2 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobOperationType.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.ratelimit;
+package org.apache.shardingsphere.data.pipeline.api.job;
 
-import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
-import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 
 /**
- * Job rate limit algorithm, SPI.
+ * Job operation type.
  */
-public interface JobRateLimitAlgorithm extends ShardingSphereAlgorithm, 
ShardingSphereAlgorithmPostProcessor {
+@RequiredArgsConstructor
+@Getter
+public enum JobOperationType {
     
-    /**
-     * Action before query.
-     */
-    void onQuery();
+    INSERT, DELETE, UPDATE, SELECT,
+    SYSTEM_LOAD, CPU_USAGE,
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
index 4217509..d0df9a5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.spi.ratelimit;
 
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
 
@@ -26,7 +27,10 @@ import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmP
 public interface JobRateLimitAlgorithm extends ShardingSphereAlgorithm, 
ShardingSphereAlgorithmPostProcessor {
     
     /**
-     * Action before query.
+     * Intercept.
+     *
+     * @param type job operation type
+     * @param data it's delta that means how much changed if type is INSERT, 
DELETE, UPDATE, SELECT; it's null if type is SYSTEM_LOAD, CPU_USAGE
      */
-    void onQuery();
+    void intercept(JobOperationType type, Number data);
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
 
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index b1d2cd0..efdaa29 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -98,12 +98,20 @@
 #  scaling:
 #    default_scaling:
 #      blockQueueSize: 10000
-#      workerThread: 40
-#      readBatchSize: 1000
-#      rateLimiter:
-#        type: SOURCE
-#        props:
-#          qps: 50
+#      input:
+#        workerThread: 40
+#        batchSize: 1000
+#        rateLimiter:
+#          type: QPS
+#          props:
+#            qps: 50
+#      output:
+#        workerThread: 40
+#        batchSize: 1000
+#        rateLimiter:
+#          type: TPS
+#          props:
+#            tps: 2000
 #      completionDetector:
 #        type: IDLE
 #        props:
@@ -195,12 +203,20 @@
 #  scaling:
 #    default_scaling:
 #      blockQueueSize: 10000
-#      workerThread: 40
-#      readBatchSize: 1000
-#      rateLimiter:
-#        type: SOURCE
-#        props:
-#          qps: 50
+#      input:
+#        workerThread: 40
+#        batchSize: 1000
+#        rateLimiter:
+#          type: QPS
+#          props:
+#            qps: 50
+#      output:
+#        workerThread: 40
+#        batchSize: 1000
+#        rateLimiter:
+#          type: TPS
+#          props:
+#            tps: 2000
 #      completionDetector:
 #        type: IDLE
 #        props:
diff --git 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/resources/docker/scaling/conf/server.yaml
 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/resources/docker/scaling/conf/server.yaml
index 1f25db4..3585eb3 100644
--- 
a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/resources/docker/scaling/conf/server.yaml
+++ 
b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/resources/docker/scaling/conf/server.yaml
@@ -15,11 +15,6 @@
 # limitations under the License.
 #
 
-scaling:
-  port: 8888
-  blockQueueSize: 10000
-  workerThread: 30
-
 mode:
   type: Cluster
   repository:
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
index 8861e48..e6e492a 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
@@ -52,12 +52,20 @@ rules:
   scaling:
     default_scaling:
       blockQueueSize: 10000
-      workerThread: 40
-      readBatchSize: 1000
-      rateLimiter:
-        type: SOURCE
-        props:
-          qps: 50
+      input:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: QPS
+          props:
+            qps: 50
+      output:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: TPS
+          props:
+            tps: 2000
       completionDetector:
         type: FIXTURE
       dataConsistencyChecker:
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
index 335f438..b569b7d 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
@@ -55,12 +55,20 @@ rules:
   scaling:
     default_scaling:
       blockQueueSize: 10000
-      workerThread: 40
-      readBatchSize: 1000
-      rateLimiter:
-        type: SOURCE
-        props:
-          qps: 50
+      input:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: QPS
+          props:
+            qps: 50
+      output:
+        workerThread: 40
+        batchSize: 1000
+        rateLimiter:
+          type: TPS
+          props:
+            tps: 2000
       completionDetector:
         type: FIXTURE
       dataConsistencyChecker:

Reply via email to