This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f8e48c8  Add rateLimiter SPI and default implementation, add readBatchSize and integrate with inventory dumper, refactor blockQueueSize integration (#14215)
f8e48c8 is described below

commit f8e48c8be63adac2b7a2a25d9e6723703dd84311
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 22 13:45:54 2021 +0800

    Add rateLimiter SPI and default implementation, add readBatchSize and integrate with inventory dumper, refactor blockQueueSize integration (#14215)
---
 .../resources/yaml/encrypt-dataConverters.yaml     |  5 ++
 ...hardingRuleAlteredJobConfigurationPreparer.java | 14 +++-
 .../src/test/resources/yaml/sharding-rule.yaml     |  1 +
 .../src/test/resources/yaml/sharding-scaling.yaml  |  5 ++
 .../OnRuleAlteredActionConfiguration.java          |  4 +
 .../YamlOnRuleAlteredActionConfiguration.java      |  4 +
 ...nRuleAlteredActionConfigurationYamlSwapper.java |  5 +-
 ...eAlteredActionConfigurationYamlSwapperTest.java |  5 ++
 .../api/config/ingest/DumperConfiguration.java     |  2 +
 .../ingest/InventoryDumperConfiguration.java       |  6 ++
 .../core/ingest/channel/MemoryChannel.java         |  8 +-
 .../channel/distribution/BlockingQueueChannel.java | 11 ++-
 .../channel/distribution/DistributionChannel.java  |  4 +-
 .../ingest/dumper/AbstractInventoryDumper.java     | 94 +++++++++++++++++-----
 .../core/prepare/InventoryTaskSplitter.java        | 13 ++-
 .../ratelimit/SourceJobRateLimitAlgorithm.java     | 61 ++++++++++++++
 .../data/pipeline/core/task/IncrementalTask.java   |  2 +-
 .../data/pipeline/core/task/InventoryTask.java     |  2 +-
 .../scenario/rulealtered/RuleAlteredContext.java   | 10 +++
 .../spi/ratelimit/JobRateLimitAlgorithm.java       | 24 +++---
 ...ta.pipeline.spi.ratelimit.JobRateLimitAlgorithm | 14 +---
 .../src/main/resources/conf/config-sharding.yaml   |  5 ++
 .../distribution/DistributionChannelTest.java      |  2 +-
 .../pipeline/core/job/FinishedCheckJobTest.java    | 24 +++---
 .../data/pipeline/core/task/InventoryTaskTest.java | 13 ++-
 .../config_sharding_sphere_jdbc_source.yaml        | 11 ++-
 .../config_sharding_sphere_jdbc_target.yaml        | 11 ++-
 27 files changed, 277 insertions(+), 83 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
 
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index c884cb9..eb06d0c 100644
--- 
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++ 
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -20,6 +20,11 @@ dataConverters:
   default_convert:
     blockQueueSize: 10000
     workerThread: 40
+    readBatchSize: 1000
+    rateLimiter:
+      type: SOURCE
+      props:
+        qps: 50
     completionDetector:
       type: IDLE
       props:
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
index 46382ec..d463309 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -34,6 +34,7 @@ import 
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.impl.ShardingSphereJDBCDataSourceConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.impl.StandardJDBCDataSourceConfiguration;
+import 
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
@@ -135,7 +136,8 @@ public final class 
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         filterByShardingDataSourceTables(dataSourceTableNameMap, handleConfig);
         Map<String, Set<String>> shardingColumnsMap = 
getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig));
         for (Entry<String, Map<String, String>> entry : 
dataSourceTableNameMap.entrySet()) {
-            DumperConfiguration dumperConfig = 
createDumperConfig(entry.getKey(), 
sourceDataSource.get(entry.getKey()).getProps(), entry.getValue());
+            OnRuleAlteredActionConfiguration ruleAlteredActionConfig = 
getRuleAlteredActionConfig(targetRuleConfig.orElse(sourceRuleConfig)).orElse(null);
+            DumperConfiguration dumperConfig = 
createDumperConfig(entry.getKey(), 
sourceDataSource.get(entry.getKey()).getProps(), entry.getValue(), 
ruleAlteredActionConfig);
             ImporterConfiguration importerConfig = 
createImporterConfig(ruleConfig, handleConfig, shardingColumnsMap);
             TaskConfiguration taskConfig = new TaskConfiguration(handleConfig, 
dumperConfig, importerConfig);
             log.info("toTaskConfigs, dataSourceName={}, taskConfig={}", 
entry.getKey(), taskConfig);
@@ -144,6 +146,10 @@ public final class 
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         return result;
     }
     
+    private Optional<OnRuleAlteredActionConfiguration> 
getRuleAlteredActionConfig(final ShardingRuleConfiguration shardingRuleConfig) {
+        return 
Optional.ofNullable(shardingRuleConfig.getScaling().get(shardingRuleConfig.getScalingName()));
+    }
+    
     private static ShardingSphereJDBCDataSourceConfiguration 
getSourceConfiguration(final RuleConfiguration ruleConfig) {
         JDBCDataSourceConfiguration result = ruleConfig.getSource().unwrap();
         Preconditions.checkArgument(result instanceof 
ShardingSphereJDBCDataSourceConfiguration, "Only support ShardingSphere source 
data source.");
@@ -261,11 +267,15 @@ public final class 
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         return Collections.emptySet();
     }
     
-    private static DumperConfiguration createDumperConfig(final String 
dataSourceName, final Map<String, Object> props, final Map<String, String> 
tableMap) {
+    private static DumperConfiguration createDumperConfig(final String 
dataSourceName, final Map<String, Object> props, final Map<String, String> 
tableMap,
+                                                          final 
OnRuleAlteredActionConfiguration ruleAlteredActionConfig) {
         DumperConfiguration result = new DumperConfiguration();
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(new 
StandardJDBCDataSourceConfiguration(YamlEngine.marshal(props)));
         result.setTableNameMap(tableMap);
+        if (null != ruleAlteredActionConfig) {
+            
result.setBlockQueueSize(ruleAlteredActionConfig.getBlockQueueSize());
+        }
         return result;
     }
     
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index 11dedab..7e8ecc4 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -110,6 +110,7 @@ rules:
     default_scaling:
       blockQueueSize: 10000
       workerThread: 40
+      readBatchSize: 1000
       completionDetector:
         type: IDLE
         props:
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index 59eaa4b..42d2df1 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -20,6 +20,11 @@ scaling:
   default_scaling:
     blockQueueSize: 10000
     workerThread: 40
+    readBatchSize: 1000
+    rateLimiter:
+      type: SOURCE
+      props:
+        qps: 50
     completionDetector:
       type: IDLE
       props:
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
index 20b954c..71dfd14 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -32,6 +32,10 @@ public final class OnRuleAlteredActionConfiguration {
     
     private final int workerThread;
     
+    private final int readBatchSize;
+    
+    private final ShardingSphereAlgorithmConfiguration rateLimiter;
+    
     private final ShardingSphereAlgorithmConfiguration completionDetector;
     
     private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
index 456e4e9..b909fc4 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
@@ -33,6 +33,10 @@ public final class YamlOnRuleAlteredActionConfiguration 
implements YamlConfigura
     
     private int workerThread = 40;
     
+    private int readBatchSize = 1000;
+    
+    private YamlShardingSphereAlgorithmConfiguration rateLimiter;
+    
     private YamlShardingSphereAlgorithmConfiguration completionDetector;
     
     private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
index 985567d..1e4b76a 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
@@ -34,6 +34,8 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
         YamlOnRuleAlteredActionConfiguration result = new 
YamlOnRuleAlteredActionConfiguration();
         result.setBlockQueueSize(data.getBlockQueueSize());
         result.setWorkerThread(data.getWorkerThread());
+        result.setReadBatchSize(data.getReadBatchSize());
+        
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
         
result.setCompletionDetector(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
         
result.setDataConsistencyChecker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyChecker()));
         return result;
@@ -41,7 +43,8 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
     
     @Override
     public OnRuleAlteredActionConfiguration swapToObject(final 
YamlOnRuleAlteredActionConfiguration yamlConfig) {
-        return new 
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(), 
yamlConfig.getWorkerThread(),
+        return new 
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(), 
yamlConfig.getWorkerThread(), yamlConfig.getReadBatchSize(),
+                
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()),
                 
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
                 
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
     }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
index b0d153b..866f12d 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
@@ -35,6 +35,11 @@ public final class 
OnRuleAlteredActionConfigurationYamlSwapperTest {
         YamlOnRuleAlteredActionConfiguration yamlConfig = new 
YamlOnRuleAlteredActionConfiguration();
         yamlConfig.setBlockQueueSize(1000);
         yamlConfig.setWorkerThread(20);
+        yamlConfig.setReadBatchSize(100);
+        Properties rateLimiterProps = new Properties();
+        rateLimiterProps.setProperty("batch-size", "1000");
+        rateLimiterProps.setProperty("qps", "50");
+        yamlConfig.setRateLimiter(new 
YamlShardingSphereAlgorithmConfiguration("SOURCE", rateLimiterProps));
         Properties completionDetectorProps = new Properties();
         
completionDetectorProps.setProperty("incremental-task-idle-minute-threshold", 
"30");
         yamlConfig.setCompletionDetector(new 
YamlShardingSphereAlgorithmConfiguration("IDLE", completionDetectorProps));
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index d7c4ac4..988ccc5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -37,6 +37,8 @@ public class DumperConfiguration {
     
     private JDBCDataSourceConfiguration dataSourceConfig;
     
+    private int blockQueueSize = 10000;
+    
     private IngestPosition<?> position;
     
     /**
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index a97ed8a..5c594af 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.api.config.ingest;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 
 /**
  * Inventory dumper configuration.
@@ -35,9 +36,14 @@ public final class InventoryDumperConfiguration extends 
DumperConfiguration {
     
     private Integer shardingItem;
     
+    private int readBatchSize = 1000;
+    
+    private JobRateLimitAlgorithm rateLimitAlgorithm;
+    
     public InventoryDumperConfiguration(final DumperConfiguration 
dumperConfig) {
         setDataSourceName(dumperConfig.getDataSourceName());
         setDataSourceConfig(dumperConfig.getDataSourceConfig());
         setTableNameMap(dumperConfig.getTableNameMap());
+        setBlockQueueSize(dumperConfig.getBlockQueueSize());
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/MemoryChannel.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/MemoryChannel.java
index b755260..848c495 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/MemoryChannel.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/MemoryChannel.java
@@ -32,14 +32,18 @@ import java.util.concurrent.BlockingQueue;
  */
 public final class MemoryChannel implements Channel {
     
-    // TODO persist blockQueueSize into registry center and transfer it here 
by construction
-    private final BlockingQueue<Record> queue = new 
ArrayBlockingQueue<>(10000);
+    private final BlockingQueue<Record> queue;
     
     private final AckCallback ackCallback;
     
     private final List<Record> toBeAcknowledgeRecords = new LinkedList<>();
     
     public MemoryChannel(final AckCallback ackCallback) {
+        this(10000, ackCallback);
+    }
+    
+    public MemoryChannel(final int blockQueueSize, final AckCallback 
ackCallback) {
+        this.queue = new ArrayBlockingQueue<>(blockQueueSize);
         this.ackCallback = ackCallback;
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
index 408485a..4d0e8f4 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
@@ -30,11 +30,18 @@ import java.util.concurrent.BlockingQueue;
  */
 public final class BlockingQueueChannel extends AbstractBitSetChannel {
     
-    // TODO persist blockQueueSize into registry center and transfer it here 
by construction
-    private final BlockingQueue<Record> queue = new 
ArrayBlockingQueue<>(10000);
+    private final BlockingQueue<Record> queue;
     
     private long fetchedIndex;
     
+    public BlockingQueueChannel() {
+        this(10000);
+    }
+    
+    public BlockingQueueChannel(final int blockQueueSize) {
+        this.queue = new ArrayBlockingQueue<>(blockQueueSize);
+    }
+    
     @Override
     public void pushRecord(final Record dataRecord, final long index) throws 
InterruptedException {
         getManualBitSet().set(index);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannel.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannel.java
index b1208eb..4ad04ee 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannel.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannel.java
@@ -61,12 +61,12 @@ public final class DistributionChannel implements Channel {
     
     private ScheduledExecutorService scheduleAckRecordsExecutor;
     
-    public DistributionChannel(final int channelNumber, final AckCallback 
ackCallback) {
+    public DistributionChannel(final int channelNumber, final int 
blockQueueSize, final AckCallback ackCallback) {
         this.channelNumber = channelNumber;
         this.ackCallback = ackCallback;
         channels = new BitSetChannel[channelNumber];
         for (int i = 0; i < channelNumber; i++) {
-            channels[i] = new BlockingQueueChannel();
+            channels[i] = new BlockingQueueChannel(blockQueueSize);
         }
         scheduleAckRecords();
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 4777242..44b0deb 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -37,6 +37,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.MetaDataManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.jdbc.config.impl.StandardJDBCDataSourceConfiguration;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
@@ -46,6 +47,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.Optional;
 
 /**
  * Abstract JDBC dumper implement.
@@ -56,6 +58,10 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
     @Getter(AccessLevel.PROTECTED)
     private final InventoryDumperConfiguration inventoryDumperConfig;
     
+    private final int readBatchSize;
+    
+    private final JobRateLimitAlgorithm rateLimitAlgorithm;
+    
     private final DataSourceManager dataSourceManager;
     
     private final TableMetaData tableMetaData;
@@ -68,6 +74,8 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
             throw new UnsupportedOperationException("AbstractInventoryDumper 
only support StandardJDBCDataSourceConfiguration");
         }
         this.inventoryDumperConfig = inventoryDumperConfig;
+        this.readBatchSize = inventoryDumperConfig.getReadBatchSize();
+        this.rateLimitAlgorithm = 
inventoryDumperConfig.getRateLimitAlgorithm();
         this.dataSourceManager = dataSourceManager;
         tableMetaData = createTableMetaData();
     }
@@ -85,25 +93,15 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
     }
     
     private void dump() {
+        String sql = getDumpSQL();
+        IngestPosition<?> position = inventoryDumperConfig.getPosition();
+        log.info("inventory dump, sql={}, position={}", sql, position);
         try (Connection conn = 
dataSourceManager.getDataSource(inventoryDumperConfig.getDataSourceConfig()).getConnection())
 {
-            String sql = String.format("SELECT * FROM %s %s", 
inventoryDumperConfig.getTableName(), 
getWhereCondition(inventoryDumperConfig.getPrimaryKey(), 
inventoryDumperConfig.getPosition()));
-            log.info("inventory dump, sql={}", sql);
-            PreparedStatement ps = createPreparedStatement(conn, sql);
-            ResultSet rs = ps.executeQuery();
-            ResultSetMetaData metaData = rs.getMetaData();
-            int rowCount = 0;
-            while (isRunning() && rs.next()) {
-                DataRecord record = new DataRecord(newPosition(rs), 
metaData.getColumnCount());
-                record.setType(IngestDataChangeType.INSERT);
-                
record.setTableName(inventoryDumperConfig.getTableNameMap().get(inventoryDumperConfig.getTableName()));
-                for (int i = 1; i <= metaData.getColumnCount(); i++) {
-                    record.addColumn(new Column(metaData.getColumnName(i), 
readValue(rs, i), true, tableMetaData.isPrimaryKey(i - 1)));
-                }
-                pushRecord(record);
-                rowCount++;
+            Number startPrimaryValue = getPositionBeginValue(position) - 1;
+            Optional<Number> maxPrimaryValue;
+            while ((maxPrimaryValue = dump0(conn, sql, 
startPrimaryValue)).isPresent()) {
+                startPrimaryValue = maxPrimaryValue.get();
             }
-            log.info("dump, rowCount={}", rowCount);
-            pushRecord(new FinishedRecord(new FinishedPosition()));
         } catch (final SQLException ex) {
             stop();
             channel.close();
@@ -113,12 +111,64 @@ public abstract class AbstractInventoryDumper extends 
AbstractLifecycleExecutor
         }
     }
     
-    private String getWhereCondition(final String primaryKey, final 
IngestPosition<?> position) {
-        if (null == primaryKey || null == position) {
-            return "";
+    private String getDumpSQL() {
+        String tableName = inventoryDumperConfig.getTableName();
+        String primaryKey = inventoryDumperConfig.getPrimaryKey();
+        return "SELECT * FROM " + tableName + " WHERE " + primaryKey + " > ? 
AND " + primaryKey + " <= ? ORDER BY " + primaryKey + " ASC LIMIT ?";
+    }
+    
+    private Optional<Number> dump0(final Connection conn, final String sql, 
final Number startPrimaryValue) throws SQLException {
+        if (null != rateLimitAlgorithm) {
+            rateLimitAlgorithm.onQuery();
+        }
+        try (PreparedStatement preparedStatement = 
createPreparedStatement(conn, sql)) {
+            preparedStatement.setObject(1, startPrimaryValue);
+            preparedStatement.setObject(2, 
getPositionEndValue(inventoryDumperConfig.getPosition()));
+            preparedStatement.setInt(3, readBatchSize);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                ResultSetMetaData metaData = resultSet.getMetaData();
+                int rowCount = 0;
+                Number maxPrimaryValue = null;
+                while (isRunning() && resultSet.next()) {
+                    DataRecord record = new DataRecord(newPosition(resultSet), 
metaData.getColumnCount());
+                    record.setType(IngestDataChangeType.INSERT);
+                    
record.setTableName(inventoryDumperConfig.getTableNameMap().get(inventoryDumperConfig.getTableName()));
+                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                        boolean isPrimaryKey = tableMetaData.isPrimaryKey(i - 
1);
+                        Object value = readValue(resultSet, i);
+                        if (isPrimaryKey) {
+                            maxPrimaryValue = (Number) value;
+                        }
+                        record.addColumn(new Column(metaData.getColumnName(i), 
value, true, isPrimaryKey));
+                    }
+                    pushRecord(record);
+                    rowCount++;
+                }
+                log.info("dump, rowCount={}, maxPrimaryValue={}", rowCount, 
maxPrimaryValue);
+                pushRecord(new FinishedRecord(new FinishedPosition()));
+                return Optional.ofNullable(maxPrimaryValue);
+            }
+        }
+    }
+    
+    private long getPositionBeginValue(final IngestPosition<?> position) {
+        if (null == position) {
+            return 0;
+        }
+        if (!(position instanceof PrimaryKeyPosition)) {
+            return 0;
+        }
+        return ((PrimaryKeyPosition) position).getBeginValue();
+    }
+    
+    private long getPositionEndValue(final IngestPosition<?> position) {
+        if (null == position) {
+            return Integer.MAX_VALUE;
+        }
+        if (!(position instanceof PrimaryKeyPosition)) {
+            return Integer.MAX_VALUE;
         }
-        PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) position;
-        return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, 
primaryKeyPosition.getBeginValue(), primaryKeyPosition.getEndValue());
+        return ((PrimaryKeyPosition) position).getEndValue();
     }
     
     private IngestPosition<?> newPosition(final ResultSet rs) throws 
SQLException {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 42a9d70..e9820aa 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepare
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import 
org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
@@ -63,7 +64,7 @@ public final class InventoryTaskSplitter {
      * @param importerExecuteEngine execute engine
      * @return split inventory data task
      */
-    // TODO remove jobContext, use init JobProgress -  sourceDatabaseType - batchSize
+    // TODO remove jobContext, use init JobProgress -  sourceDatabaseType - readBatchSize - rateLimitAlgorithm
     public List<InventoryTask> splitInventoryData(final RuleAlteredJobContext jobContext, final TaskConfiguration taskConfig, final DataSourceManager dataSourceManager,
                                                   final ExecuteEngine importerExecuteEngine) {
         List<InventoryTask> result = new LinkedList<>();
@@ -98,6 +99,8 @@ public final class InventoryTaskSplitter {
     private Collection<InventoryDumperConfiguration> splitByPrimaryKey(
             final RuleAlteredJobContext jobContext, final DataSource dataSource, final MetaDataManager metaDataManager, final InventoryDumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
+        int readBatchSize = jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getReadBatchSize();
+        JobRateLimitAlgorithm rateLimitAlgorithm = jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
         Collection<IngestPosition<?>> inventoryPositions = getInventoryPositions(jobContext, dumperConfig, dataSource, metaDataManager);
         int i = 0;
         for (IngestPosition<?> inventoryPosition : inventoryPositions) {
@@ -106,6 +109,8 @@ public final class InventoryTaskSplitter {
             splitDumperConfig.setShardingItem(i++);
             splitDumperConfig.setTableName(dumperConfig.getTableName());
             splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
+            splitDumperConfig.setReadBatchSize(readBatchSize);
+            splitDumperConfig.setRateLimitAlgorithm(rateLimitAlgorithm);
             result.add(splitDumperConfig);
         }
         return result;
@@ -171,9 +176,13 @@ public final class InventoryTaskSplitter {
                 ps.setLong(1, beginId);
                 ps.setLong(2, jobContext.getJobConfig().getHandleConfig().getShardingSize());
                 try (ResultSet rs = ps.executeQuery()) {
-                    rs.next();
+                    if (!rs.next()) {
+                        log.info("getPositionByPrimaryKeyRange, rs.next false, break");
+                        break;
+                    }
                     long endId = rs.getLong(1);
                     if (endId == 0) {
+                        log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getTableName(), dumperConfig.getPrimaryKey(), beginId);
                         break;
                     }
                     result.add(new PrimaryKeyPosition(beginId, endId));
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/SourceJobRateLimitAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/SourceJobRateLimitAlgorithm.java
new file mode 100644
index 0000000..d59919a
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/SourceJobRateLimitAlgorithm.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ratelimit;
+
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+
+import java.util.Properties;
+
+/**
+ * Source rule altered job rate limit algorithm for SPI.
+ */
+public final class SourceJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
+    
+    private static final String QPS_KEY = "qps";
+    
+    private int qps = 50;
+    
+    private RateLimiter rateLimiter;
+    
+    @Getter
+    @Setter
+    private Properties props = new Properties();
+    
+    @Override
+    public void init() {
+        String qpsValue = props.getProperty(QPS_KEY);
+        if (!Strings.isNullOrEmpty(qpsValue)) {
+            qps = Integer.parseInt(qpsValue);
+        }
+        rateLimiter = RateLimiter.create(qps);
+    }
+    
+    @Override
+    public String getType() {
+        return "SOURCE";
+    }
+    
+    @Override
+    public void onQuery() {
+        rateLimiter.acquire();
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 8715af1..4eb1adf 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -100,7 +100,7 @@ public final class IncrementalTask extends 
AbstractLifecycleExecutor implements
     }
     
     private void instanceChannel(final Collection<Importer> importers) {
-        DistributionChannel channel = new DistributionChannel(importers.size(), records -> {
+        DistributionChannel channel = new DistributionChannel(importers.size(), dumperConfig.getBlockQueueSize(), records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
             if (!(lastHandledRecord.getPosition() instanceof 
PlaceholderPosition)) {
                 progress.setPosition(lastHandledRecord.getPosition());
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 00d4079..04bea95 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -104,7 +104,7 @@ public final class InventoryTask extends 
AbstractLifecycleExecutor implements Pi
     }
     
     private void instanceChannel(final Importer importer) {
-        MemoryChannel channel = new MemoryChannel(records -> {
+        MemoryChannel channel = new MemoryChannel(inventoryDumperConfig.getReadBatchSize(), records -> {
             Optional<Record> record = records.stream().filter(each -> !(each.getPosition() instanceof PlaceholderPosition)).reduce((a, b) -> b);
             record.ifPresent(value -> position = value.getPosition());
         });
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index d96d4ae..ea2483f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
@@ -37,6 +38,7 @@ import 
org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 public final class RuleAlteredContext {
     
     static {
+        ShardingSphereServiceLoader.register(JobRateLimitAlgorithm.class);
         ShardingSphereServiceLoader.register(RuleAlteredJobCompletionDetectAlgorithm.class);
         ShardingSphereServiceLoader.register(DataConsistencyCheckAlgorithm.class);
     }
@@ -45,6 +47,8 @@ public final class RuleAlteredContext {
     
     private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
     
+    private final JobRateLimitAlgorithm rateLimitAlgorithm;
+    
     private final RuleAlteredJobCompletionDetectAlgorithm 
completionDetectAlgorithm;
     
     private final DataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm;
@@ -57,6 +61,12 @@ public final class RuleAlteredContext {
     
     public RuleAlteredContext(final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
         this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
+        ShardingSphereAlgorithmConfiguration rateLimiter = onRuleAlteredActionConfig.getRateLimiter();
+        if (null != rateLimiter) {
+            rateLimitAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(rateLimiter, JobRateLimitAlgorithm.class);
+        } else {
+            rateLimitAlgorithm = null;
+        }
         ShardingSphereAlgorithmConfiguration completionDetector = onRuleAlteredActionConfig.getCompletionDetector();
         if (null != completionDetector) {
             completionDetectAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(completionDetector, RuleAlteredJobCompletionDetectAlgorithm.class);
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
similarity index 62%
copy from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
index 20b954c..4217509 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
@@ -15,24 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.spi.ratelimit;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
 
 /**
- * On rule altered action configuration.
+ * Job rate limit algorithm, SPI.
  */
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public interface JobRateLimitAlgorithm extends ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
     
-    private final int blockQueueSize;
-    
-    private final int workerThread;
-    
-    private final ShardingSphereAlgorithmConfiguration completionDetector;
-    
-    private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+    /**
+     * Action before query.
+     */
+    void onQuery();
 }
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
similarity index 72%
copy from 
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
index 59eaa4b..7c562ad 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
@@ -15,16 +15,4 @@
 # limitations under the License.
 #
 
-scalingName: default_scaling
-scaling:
-  default_scaling:
-    blockQueueSize: 10000
-    workerThread: 40
-    completionDetector:
-      type: IDLE
-      props:
-        incremental-task-idle-minute-threshold: 30
-    dataConsistencyChecker:
-      type: DATA_MATCH
-      props:
-        chunk-size: 1000
+org.apache.shardingsphere.data.pipeline.core.ratelimit.SourceJobRateLimitAlgorithm
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
 
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index 585deeb..c22eb44 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -99,6 +99,11 @@
 #    default_scaling:
 #      blockQueueSize: 10000
 #      workerThread: 40
+#      readBatchSize: 1000
+#      rateLimiter:
+#        type: SOURCE
+#        props:
+#          qps: 50
 #      completionDetector:
 #        type: IDLE
 #        props:
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannelTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannelTest.java
index 0c5fd04..118de67 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannelTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannelTest.java
@@ -64,7 +64,7 @@ public final class DistributionChannelTest {
     private void execute(final AckCallback ackCallback, final int count, final Record... records) {
         CountDownLatch countDownLatch = new CountDownLatch(count);
         AtomicBoolean acknowledged = new AtomicBoolean();
-        DistributionChannel distributionChannel = new DistributionChannel(2, ackRecords -> {
+        DistributionChannel distributionChannel = new DistributionChannel(2, 10000, ackRecords -> {
             ackCallback.onAck(ackRecords);
             acknowledged.set(true);
         });
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
index 4156684..652d658 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
@@ -19,9 +19,11 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
 import 
org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -30,9 +32,10 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -58,23 +61,20 @@ public final class FinishedCheckJobTest {
     
     @Test
     public void assertExecuteAllDisabledJob() {
-        JobInfo jobInfo = new JobInfo(1L);
-        jobInfo.setActive(false);
-        List<JobInfo> jobInfos = Collections.singletonList(jobInfo);
+        Optional<Long> jobId = PipelineJobAPIFactory.getPipelineJobAPI().start(ResourceUtil.mockJobConfig());
+        assertTrue(jobId.isPresent());
+        List<JobInfo> jobInfos = PipelineJobAPIFactory.getPipelineJobAPI().list();
+        jobInfos.forEach(each -> each.setActive(false));
         when(pipelineJobAPI.list()).thenReturn(jobInfos);
         finishedCheckJob.execute(null);
     }
     
     @Test
     public void assertExecuteActiveJob() {
-        JobInfo jobInfo = new JobInfo(1L);
-        jobInfo.setActive(true);
-        jobInfo.setJobParameter("handleConfig:\n"
-                + "  concurrency: 2\n"
-                + "  shardingTables:\n"
-                + "  - ds_0.t_order_$->{0..1}\n"
-                + "ruleConfig:\n");
-        List<JobInfo> jobInfos = Collections.singletonList(jobInfo);
+        Optional<Long> jobId = PipelineJobAPIFactory.getPipelineJobAPI().start(ResourceUtil.mockJobConfig());
+        assertTrue(jobId.isPresent());
+        List<JobInfo> jobInfos = PipelineJobAPIFactory.getPipelineJobAPI().list();
+        jobInfos.forEach(each -> each.setActive(true));
         when(pipelineJobAPI.list()).thenReturn(jobInfos);
         finishedCheckJob.execute(null);
     }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index f98801c..93873df 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -21,6 +21,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
@@ -50,6 +52,11 @@ public final class InventoryTaskTest {
     public void assertStartWithGetEstimatedRowsFailure() {
         InventoryDumperConfiguration inventoryDumperConfig = new 
InventoryDumperConfiguration(taskConfig.getDumperConfig());
         inventoryDumperConfig.setTableName("t_non_exist");
+        IngestPosition<?> position = 
taskConfig.getDumperConfig().getPosition();
+        if (null == position) {
+            position = new PrimaryKeyPosition(0, 1000);
+        }
+        inventoryDumperConfig.setPosition(position);
         try (InventoryTask inventoryTask = new 
InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), 
RuleAlteredContextUtil.getExecuteEngine())) {
             inventoryTask.start();
         }
@@ -60,7 +67,11 @@ public final class InventoryTaskTest {
         initTableData(taskConfig.getDumperConfig());
         InventoryDumperConfiguration inventoryDumperConfig = new 
InventoryDumperConfiguration(taskConfig.getDumperConfig());
         inventoryDumperConfig.setTableName("t_order");
-        
inventoryDumperConfig.setPosition(taskConfig.getDumperConfig().getPosition());
+        IngestPosition<?> position = 
taskConfig.getDumperConfig().getPosition();
+        if (null == position) {
+            position = new PrimaryKeyPosition(0, 1000);
+        }
+        inventoryDumperConfig.setPosition(position);
         try (InventoryTask inventoryTask = new 
InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), 
RuleAlteredContextUtil.getExecuteEngine())) {
             inventoryTask.start();
             assertFalse(inventoryTask.getProgress().getPosition() instanceof 
FinishedPosition);
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
index e4631c9..8861e48 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
@@ -48,12 +48,17 @@ rules:
       type: INLINE
       props:
         algorithm-expression: t_order
+  scalingName: default_scaling
   scaling:
     default_scaling:
-      blockQueueSize: 1000
+      blockQueueSize: 10000
+      workerThread: 40
+      readBatchSize: 1000
+      rateLimiter:
+        type: SOURCE
+        props:
+          qps: 50
       completionDetector:
         type: FIXTURE
       dataConsistencyChecker:
         type: FIXTURE
-      workerThread: 5
-  scalingName: default_scaling
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
index 5ef9d7c..335f438 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
@@ -51,12 +51,17 @@ rules:
       type: INLINE
       props:
         algorithm-expression: t_order_$->{order_id % 2}
+  scalingName: default_scaling
   scaling:
     default_scaling:
-      blockQueueSize: 1000
+      blockQueueSize: 10000
+      workerThread: 40
+      readBatchSize: 1000
+      rateLimiter:
+        type: SOURCE
+        props:
+          qps: 50
       completionDetector:
         type: FIXTURE
       dataConsistencyChecker:
         type: FIXTURE
-      workerThread: 5
-  scalingName: default_scaling

Reply via email to