This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f8e48c8 Add rateLimiter SPI and default implementation, add
readBatchSize and integrate with inventory dumper, refactor blockQueueSize
integration (#14215)
f8e48c8 is described below
commit f8e48c8be63adac2b7a2a25d9e6723703dd84311
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 22 13:45:54 2021 +0800
Add rateLimiter SPI and default implementation, add readBatchSize and
integrate with inventory dumper, refactor blockQueueSize integration (#14215)
---
.../resources/yaml/encrypt-dataConverters.yaml | 5 ++
...hardingRuleAlteredJobConfigurationPreparer.java | 14 +++-
.../src/test/resources/yaml/sharding-rule.yaml | 1 +
.../src/test/resources/yaml/sharding-scaling.yaml | 5 ++
.../OnRuleAlteredActionConfiguration.java | 4 +
.../YamlOnRuleAlteredActionConfiguration.java | 4 +
...nRuleAlteredActionConfigurationYamlSwapper.java | 5 +-
...eAlteredActionConfigurationYamlSwapperTest.java | 5 ++
.../api/config/ingest/DumperConfiguration.java | 2 +
.../ingest/InventoryDumperConfiguration.java | 6 ++
.../core/ingest/channel/MemoryChannel.java | 8 +-
.../channel/distribution/BlockingQueueChannel.java | 11 ++-
.../channel/distribution/DistributionChannel.java | 4 +-
.../ingest/dumper/AbstractInventoryDumper.java | 94 +++++++++++++++++-----
.../core/prepare/InventoryTaskSplitter.java | 13 ++-
.../ratelimit/SourceJobRateLimitAlgorithm.java | 61 ++++++++++++++
.../data/pipeline/core/task/IncrementalTask.java | 2 +-
.../data/pipeline/core/task/InventoryTask.java | 2 +-
.../scenario/rulealtered/RuleAlteredContext.java | 10 +++
.../spi/ratelimit/JobRateLimitAlgorithm.java | 24 +++---
...ta.pipeline.spi.ratelimit.JobRateLimitAlgorithm | 14 +---
.../src/main/resources/conf/config-sharding.yaml | 5 ++
.../distribution/DistributionChannelTest.java | 2 +-
.../pipeline/core/job/FinishedCheckJobTest.java | 24 +++---
.../data/pipeline/core/task/InventoryTaskTest.java | 13 ++-
.../config_sharding_sphere_jdbc_source.yaml | 11 ++-
.../config_sharding_sphere_jdbc_target.yaml | 11 ++-
27 files changed, 277 insertions(+), 83 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index c884cb9..eb06d0c 100644
---
a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++
b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -20,6 +20,11 @@ dataConverters:
default_convert:
blockQueueSize: 10000
workerThread: 40
+ readBatchSize: 1000
+ rateLimiter:
+ type: SOURCE
+ props:
+ qps: 50
completionDetector:
type: IDLE
props:
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
index 46382ec..d463309 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -34,6 +34,7 @@ import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.impl.ShardingSphereJDBCDataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.impl.StandardJDBCDataSourceConfiguration;
+import
org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
@@ -135,7 +136,8 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
filterByShardingDataSourceTables(dataSourceTableNameMap, handleConfig);
Map<String, Set<String>> shardingColumnsMap =
getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig));
for (Entry<String, Map<String, String>> entry :
dataSourceTableNameMap.entrySet()) {
- DumperConfiguration dumperConfig =
createDumperConfig(entry.getKey(),
sourceDataSource.get(entry.getKey()).getProps(), entry.getValue());
+ OnRuleAlteredActionConfiguration ruleAlteredActionConfig =
getRuleAlteredActionConfig(targetRuleConfig.orElse(sourceRuleConfig)).orElse(null);
+ DumperConfiguration dumperConfig =
createDumperConfig(entry.getKey(),
sourceDataSource.get(entry.getKey()).getProps(), entry.getValue(),
ruleAlteredActionConfig);
ImporterConfiguration importerConfig =
createImporterConfig(ruleConfig, handleConfig, shardingColumnsMap);
TaskConfiguration taskConfig = new TaskConfiguration(handleConfig,
dumperConfig, importerConfig);
log.info("toTaskConfigs, dataSourceName={}, taskConfig={}",
entry.getKey(), taskConfig);
@@ -144,6 +146,10 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return result;
}
+ private Optional<OnRuleAlteredActionConfiguration>
getRuleAlteredActionConfig(final ShardingRuleConfiguration shardingRuleConfig) {
+ return
Optional.ofNullable(shardingRuleConfig.getScaling().get(shardingRuleConfig.getScalingName()));
+ }
+
private static ShardingSphereJDBCDataSourceConfiguration
getSourceConfiguration(final RuleConfiguration ruleConfig) {
JDBCDataSourceConfiguration result = ruleConfig.getSource().unwrap();
Preconditions.checkArgument(result instanceof
ShardingSphereJDBCDataSourceConfiguration, "Only support ShardingSphere source
data source.");
@@ -261,11 +267,15 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return Collections.emptySet();
}
- private static DumperConfiguration createDumperConfig(final String
dataSourceName, final Map<String, Object> props, final Map<String, String>
tableMap) {
+ private static DumperConfiguration createDumperConfig(final String
dataSourceName, final Map<String, Object> props, final Map<String, String>
tableMap,
+ final
OnRuleAlteredActionConfiguration ruleAlteredActionConfig) {
DumperConfiguration result = new DumperConfiguration();
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(new
StandardJDBCDataSourceConfiguration(YamlEngine.marshal(props)));
result.setTableNameMap(tableMap);
+ if (null != ruleAlteredActionConfig) {
+
result.setBlockQueueSize(ruleAlteredActionConfig.getBlockQueueSize());
+ }
return result;
}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index 11dedab..7e8ecc4 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -110,6 +110,7 @@ rules:
default_scaling:
blockQueueSize: 10000
workerThread: 40
+ readBatchSize: 1000
completionDetector:
type: IDLE
props:
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index 59eaa4b..42d2df1 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -20,6 +20,11 @@ scaling:
default_scaling:
blockQueueSize: 10000
workerThread: 40
+ readBatchSize: 1000
+ rateLimiter:
+ type: SOURCE
+ props:
+ qps: 50
completionDetector:
type: IDLE
props:
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
index 20b954c..71dfd14 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -32,6 +32,10 @@ public final class OnRuleAlteredActionConfiguration {
private final int workerThread;
+ private final int readBatchSize;
+
+ private final ShardingSphereAlgorithmConfiguration rateLimiter;
+
private final ShardingSphereAlgorithmConfiguration completionDetector;
private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
index 456e4e9..b909fc4 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
@@ -33,6 +33,10 @@ public final class YamlOnRuleAlteredActionConfiguration
implements YamlConfigura
private int workerThread = 40;
+ private int readBatchSize = 1000;
+
+ private YamlShardingSphereAlgorithmConfiguration rateLimiter;
+
private YamlShardingSphereAlgorithmConfiguration completionDetector;
private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
index 985567d..1e4b76a 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
@@ -34,6 +34,8 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
YamlOnRuleAlteredActionConfiguration result = new
YamlOnRuleAlteredActionConfiguration();
result.setBlockQueueSize(data.getBlockQueueSize());
result.setWorkerThread(data.getWorkerThread());
+ result.setReadBatchSize(data.getReadBatchSize());
+
result.setRateLimiter(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
result.setCompletionDetector(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
result.setDataConsistencyChecker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyChecker()));
return result;
@@ -41,7 +43,8 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
@Override
public OnRuleAlteredActionConfiguration swapToObject(final
YamlOnRuleAlteredActionConfiguration yamlConfig) {
- return new
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(),
yamlConfig.getWorkerThread(),
+ return new
OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(),
yamlConfig.getWorkerThread(), yamlConfig.getReadBatchSize(),
+
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getRateLimiter()),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
index b0d153b..866f12d 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
@@ -35,6 +35,11 @@ public final class
OnRuleAlteredActionConfigurationYamlSwapperTest {
YamlOnRuleAlteredActionConfiguration yamlConfig = new
YamlOnRuleAlteredActionConfiguration();
yamlConfig.setBlockQueueSize(1000);
yamlConfig.setWorkerThread(20);
+ yamlConfig.setReadBatchSize(100);
+ Properties rateLimiterProps = new Properties();
+ rateLimiterProps.setProperty("batch-size", "1000");
+ rateLimiterProps.setProperty("qps", "50");
+ yamlConfig.setRateLimiter(new
YamlShardingSphereAlgorithmConfiguration("SOURCE", rateLimiterProps));
Properties completionDetectorProps = new Properties();
completionDetectorProps.setProperty("incremental-task-idle-minute-threshold",
"30");
yamlConfig.setCompletionDetector(new
YamlShardingSphereAlgorithmConfiguration("IDLE", completionDetectorProps));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index d7c4ac4..988ccc5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -37,6 +37,8 @@ public class DumperConfiguration {
private JDBCDataSourceConfiguration dataSourceConfig;
+ private int blockQueueSize = 10000;
+
private IngestPosition<?> position;
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index a97ed8a..5c594af 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.api.config.ingest;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
/**
* Inventory dumper configuration.
@@ -35,9 +36,14 @@ public final class InventoryDumperConfiguration extends
DumperConfiguration {
private Integer shardingItem;
+ private int readBatchSize = 1000;
+
+ private JobRateLimitAlgorithm rateLimitAlgorithm;
+
public InventoryDumperConfiguration(final DumperConfiguration
dumperConfig) {
setDataSourceName(dumperConfig.getDataSourceName());
setDataSourceConfig(dumperConfig.getDataSourceConfig());
setTableNameMap(dumperConfig.getTableNameMap());
+ setBlockQueueSize(dumperConfig.getBlockQueueSize());
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/MemoryChannel.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/MemoryChannel.java
index b755260..848c495 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/MemoryChannel.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/MemoryChannel.java
@@ -32,14 +32,18 @@ import java.util.concurrent.BlockingQueue;
*/
public final class MemoryChannel implements Channel {
- // TODO persist blockQueueSize into registry center and transfer it here
by construction
- private final BlockingQueue<Record> queue = new
ArrayBlockingQueue<>(10000);
+ private final BlockingQueue<Record> queue;
private final AckCallback ackCallback;
private final List<Record> toBeAcknowledgeRecords = new LinkedList<>();
public MemoryChannel(final AckCallback ackCallback) {
+ this(10000, ackCallback);
+ }
+
+ public MemoryChannel(final int blockQueueSize, final AckCallback
ackCallback) {
+ this.queue = new ArrayBlockingQueue<>(blockQueueSize);
this.ackCallback = ackCallback;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
index 408485a..4d0e8f4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
@@ -30,11 +30,18 @@ import java.util.concurrent.BlockingQueue;
*/
public final class BlockingQueueChannel extends AbstractBitSetChannel {
- // TODO persist blockQueueSize into registry center and transfer it here
by construction
- private final BlockingQueue<Record> queue = new
ArrayBlockingQueue<>(10000);
+ private final BlockingQueue<Record> queue;
private long fetchedIndex;
+ public BlockingQueueChannel() {
+ this(10000);
+ }
+
+ public BlockingQueueChannel(final int blockQueueSize) {
+ this.queue = new ArrayBlockingQueue<>(blockQueueSize);
+ }
+
@Override
public void pushRecord(final Record dataRecord, final long index) throws
InterruptedException {
getManualBitSet().set(index);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannel.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannel.java
index b1208eb..4ad04ee 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannel.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannel.java
@@ -61,12 +61,12 @@ public final class DistributionChannel implements Channel {
private ScheduledExecutorService scheduleAckRecordsExecutor;
- public DistributionChannel(final int channelNumber, final AckCallback
ackCallback) {
+ public DistributionChannel(final int channelNumber, final int
blockQueueSize, final AckCallback ackCallback) {
this.channelNumber = channelNumber;
this.ackCallback = ackCallback;
channels = new BitSetChannel[channelNumber];
for (int i = 0; i < channelNumber; i++) {
- channels[i] = new BlockingQueueChannel();
+ channels[i] = new BlockingQueueChannel(blockQueueSize);
}
scheduleAckRecords();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 4777242..44b0deb 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -37,6 +37,7 @@ import
org.apache.shardingsphere.data.pipeline.core.datasource.MetaDataManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.JDBCDataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.jdbc.config.impl.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
@@ -46,6 +47,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.util.Optional;
/**
* Abstract JDBC dumper implement.
@@ -56,6 +58,10 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
@Getter(AccessLevel.PROTECTED)
private final InventoryDumperConfiguration inventoryDumperConfig;
+ private final int readBatchSize;
+
+ private final JobRateLimitAlgorithm rateLimitAlgorithm;
+
private final DataSourceManager dataSourceManager;
private final TableMetaData tableMetaData;
@@ -68,6 +74,8 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
throw new UnsupportedOperationException("AbstractInventoryDumper
only support StandardJDBCDataSourceConfiguration");
}
this.inventoryDumperConfig = inventoryDumperConfig;
+ this.readBatchSize = inventoryDumperConfig.getReadBatchSize();
+ this.rateLimitAlgorithm =
inventoryDumperConfig.getRateLimitAlgorithm();
this.dataSourceManager = dataSourceManager;
tableMetaData = createTableMetaData();
}
@@ -85,25 +93,15 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
}
private void dump() {
+ String sql = getDumpSQL();
+ IngestPosition<?> position = inventoryDumperConfig.getPosition();
+ log.info("inventory dump, sql={}, position={}", sql, position);
try (Connection conn =
dataSourceManager.getDataSource(inventoryDumperConfig.getDataSourceConfig()).getConnection())
{
- String sql = String.format("SELECT * FROM %s %s",
inventoryDumperConfig.getTableName(),
getWhereCondition(inventoryDumperConfig.getPrimaryKey(),
inventoryDumperConfig.getPosition()));
- log.info("inventory dump, sql={}", sql);
- PreparedStatement ps = createPreparedStatement(conn, sql);
- ResultSet rs = ps.executeQuery();
- ResultSetMetaData metaData = rs.getMetaData();
- int rowCount = 0;
- while (isRunning() && rs.next()) {
- DataRecord record = new DataRecord(newPosition(rs),
metaData.getColumnCount());
- record.setType(IngestDataChangeType.INSERT);
-
record.setTableName(inventoryDumperConfig.getTableNameMap().get(inventoryDumperConfig.getTableName()));
- for (int i = 1; i <= metaData.getColumnCount(); i++) {
- record.addColumn(new Column(metaData.getColumnName(i),
readValue(rs, i), true, tableMetaData.isPrimaryKey(i - 1)));
- }
- pushRecord(record);
- rowCount++;
+ Number startPrimaryValue = getPositionBeginValue(position) - 1;
+ Optional<Number> maxPrimaryValue;
+ while ((maxPrimaryValue = dump0(conn, sql,
startPrimaryValue)).isPresent()) {
+ startPrimaryValue = maxPrimaryValue.get();
}
- log.info("dump, rowCount={}", rowCount);
- pushRecord(new FinishedRecord(new FinishedPosition()));
} catch (final SQLException ex) {
stop();
channel.close();
@@ -113,12 +111,64 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
}
}
- private String getWhereCondition(final String primaryKey, final
IngestPosition<?> position) {
- if (null == primaryKey || null == position) {
- return "";
+ private String getDumpSQL() {
+ String tableName = inventoryDumperConfig.getTableName();
+ String primaryKey = inventoryDumperConfig.getPrimaryKey();
+ return "SELECT * FROM " + tableName + " WHERE " + primaryKey + " > ?
AND " + primaryKey + " <= ? ORDER BY " + primaryKey + " ASC LIMIT ?";
+ }
+
+ private Optional<Number> dump0(final Connection conn, final String sql,
final Number startPrimaryValue) throws SQLException {
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.onQuery();
+ }
+ try (PreparedStatement preparedStatement =
createPreparedStatement(conn, sql)) {
+ preparedStatement.setObject(1, startPrimaryValue);
+ preparedStatement.setObject(2,
getPositionEndValue(inventoryDumperConfig.getPosition()));
+ preparedStatement.setInt(3, readBatchSize);
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ int rowCount = 0;
+ Number maxPrimaryValue = null;
+ while (isRunning() && resultSet.next()) {
+ DataRecord record = new DataRecord(newPosition(resultSet),
metaData.getColumnCount());
+ record.setType(IngestDataChangeType.INSERT);
+
record.setTableName(inventoryDumperConfig.getTableNameMap().get(inventoryDumperConfig.getTableName()));
+ for (int i = 1; i <= metaData.getColumnCount(); i++) {
+ boolean isPrimaryKey = tableMetaData.isPrimaryKey(i -
1);
+ Object value = readValue(resultSet, i);
+ if (isPrimaryKey) {
+ maxPrimaryValue = (Number) value;
+ }
+ record.addColumn(new Column(metaData.getColumnName(i),
value, true, isPrimaryKey));
+ }
+ pushRecord(record);
+ rowCount++;
+ }
+ log.info("dump, rowCount={}, maxPrimaryValue={}", rowCount,
maxPrimaryValue);
+ pushRecord(new FinishedRecord(new FinishedPosition()));
+ return Optional.ofNullable(maxPrimaryValue);
+ }
+ }
+ }
+
+ private long getPositionBeginValue(final IngestPosition<?> position) {
+ if (null == position) {
+ return 0;
+ }
+ if (!(position instanceof PrimaryKeyPosition)) {
+ return 0;
+ }
+ return ((PrimaryKeyPosition) position).getBeginValue();
+ }
+
+ private long getPositionEndValue(final IngestPosition<?> position) {
+ if (null == position) {
+ return Integer.MAX_VALUE;
+ }
+ if (!(position instanceof PrimaryKeyPosition)) {
+ return Integer.MAX_VALUE;
}
- PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) position;
- return String.format("WHERE %s BETWEEN %d AND %d", primaryKey,
primaryKeyPosition.getBeginValue(), primaryKeyPosition.getEndValue());
+ return ((PrimaryKeyPosition) position).getEndValue();
}
private IngestPosition<?> newPosition(final ResultSet rs) throws
SQLException {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 42a9d70..e9820aa 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepare
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import
org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
@@ -63,7 +64,7 @@ public final class InventoryTaskSplitter {
* @param importerExecuteEngine execute engine
* @return split inventory data task
*/
- // TODO remove jobContext, use init JobProgress - sourceDatabaseType -
batchSize
+ // TODO remove jobContext, use init JobProgress - sourceDatabaseType -
readBatchSize - rateLimitAlgorithm
public List<InventoryTask> splitInventoryData(final RuleAlteredJobContext
jobContext, final TaskConfiguration taskConfig, final DataSourceManager
dataSourceManager,
final ExecuteEngine
importerExecuteEngine) {
List<InventoryTask> result = new LinkedList<>();
@@ -98,6 +99,8 @@ public final class InventoryTaskSplitter {
private Collection<InventoryDumperConfiguration> splitByPrimaryKey(
final RuleAlteredJobContext jobContext, final DataSource
dataSource, final MetaDataManager metaDataManager, final
InventoryDumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
+ int readBatchSize =
jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getReadBatchSize();
+ JobRateLimitAlgorithm rateLimitAlgorithm =
jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
Collection<IngestPosition<?>> inventoryPositions =
getInventoryPositions(jobContext, dumperConfig, dataSource, metaDataManager);
int i = 0;
for (IngestPosition<?> inventoryPosition : inventoryPositions) {
@@ -106,6 +109,8 @@ public final class InventoryTaskSplitter {
splitDumperConfig.setShardingItem(i++);
splitDumperConfig.setTableName(dumperConfig.getTableName());
splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
+ splitDumperConfig.setReadBatchSize(readBatchSize);
+ splitDumperConfig.setRateLimitAlgorithm(rateLimitAlgorithm);
result.add(splitDumperConfig);
}
return result;
@@ -171,9 +176,13 @@ public final class InventoryTaskSplitter {
ps.setLong(1, beginId);
ps.setLong(2,
jobContext.getJobConfig().getHandleConfig().getShardingSize());
try (ResultSet rs = ps.executeQuery()) {
- rs.next();
+ if (!rs.next()) {
+ log.info("getPositionByPrimaryKeyRange, rs.next false,
break");
+ break;
+ }
long endId = rs.getLong(1);
if (endId == 0) {
+ log.info("getPositionByPrimaryKeyRange, endId is 0,
break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getTableName(),
dumperConfig.getPrimaryKey(), beginId);
break;
}
result.add(new PrimaryKeyPosition(beginId, endId));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/SourceJobRateLimitAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/SourceJobRateLimitAlgorithm.java
new file mode 100644
index 0000000..d59919a
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ratelimit/SourceJobRateLimitAlgorithm.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ratelimit;
+
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+
+import java.util.Properties;
+
+/**
+ * Source rule altered job rate limit algorithm for SPI.
+ */
+public final class SourceJobRateLimitAlgorithm implements
JobRateLimitAlgorithm {
+
+ private static final String QPS_KEY = "qps";
+
+ private int qps = 50;
+
+ private RateLimiter rateLimiter;
+
+ @Getter
+ @Setter
+ private Properties props = new Properties();
+
+ @Override
+ public void init() {
+ String qpsValue = props.getProperty(QPS_KEY);
+ if (!Strings.isNullOrEmpty(qpsValue)) {
+ qps = Integer.parseInt(qpsValue);
+ }
+ rateLimiter = RateLimiter.create(qps);
+ }
+
+ @Override
+ public String getType() {
+ return "SOURCE";
+ }
+
+ @Override
+ public void onQuery() {
+ rateLimiter.acquire();
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 8715af1..4eb1adf 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -100,7 +100,7 @@ public final class IncrementalTask extends
AbstractLifecycleExecutor implements
}
private void instanceChannel(final Collection<Importer> importers) {
- DistributionChannel channel = new
DistributionChannel(importers.size(), records -> {
+ DistributionChannel channel = new
DistributionChannel(importers.size(), dumperConfig.getBlockQueueSize(), records
-> {
Record lastHandledRecord = records.get(records.size() - 1);
if (!(lastHandledRecord.getPosition() instanceof
PlaceholderPosition)) {
progress.setPosition(lastHandledRecord.getPosition());
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 00d4079..04bea95 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -104,7 +104,7 @@ public final class InventoryTask extends
AbstractLifecycleExecutor implements Pi
}
private void instanceChannel(final Importer importer) {
- MemoryChannel channel = new MemoryChannel(records -> {
+ MemoryChannel channel = new
MemoryChannel(inventoryDumperConfig.getReadBatchSize(), records -> {
Optional<Record> record = records.stream().filter(each ->
!(each.getPosition() instanceof PlaceholderPosition)).reduce((a, b) -> b);
record.ifPresent(value -> position = value.getPosition());
});
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index d96d4ae..ea2483f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
@@ -37,6 +38,7 @@ import
org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
public final class RuleAlteredContext {
static {
+ ShardingSphereServiceLoader.register(JobRateLimitAlgorithm.class);
ShardingSphereServiceLoader.register(RuleAlteredJobCompletionDetectAlgorithm.class);
ShardingSphereServiceLoader.register(DataConsistencyCheckAlgorithm.class);
}
@@ -45,6 +47,8 @@ public final class RuleAlteredContext {
private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
+ private final JobRateLimitAlgorithm rateLimitAlgorithm;
+
private final RuleAlteredJobCompletionDetectAlgorithm
completionDetectAlgorithm;
private final DataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm;
@@ -57,6 +61,12 @@ public final class RuleAlteredContext {
public RuleAlteredContext(final OnRuleAlteredActionConfiguration
onRuleAlteredActionConfig) {
this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
+ ShardingSphereAlgorithmConfiguration rateLimiter =
onRuleAlteredActionConfig.getRateLimiter();
+ if (null != rateLimiter) {
+ rateLimitAlgorithm =
ShardingSphereAlgorithmFactory.createAlgorithm(rateLimiter,
JobRateLimitAlgorithm.class);
+ } else {
+ rateLimitAlgorithm = null;
+ }
ShardingSphereAlgorithmConfiguration completionDetector =
onRuleAlteredActionConfig.getCompletionDetector();
if (null != completionDetector) {
completionDetectAlgorithm =
ShardingSphereAlgorithmFactory.createAlgorithm(completionDetector,
RuleAlteredJobCompletionDetectAlgorithm.class);
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
similarity index 62%
copy from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
index 20b954c..4217509 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
@@ -15,24 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.config.rulealtered;
+package org.apache.shardingsphere.data.pipeline.spi.ratelimit;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
/**
- * On rule altered action configuration.
+ * Job rate limit algorithm, SPI.
*/
-@RequiredArgsConstructor
-@Getter
-public final class OnRuleAlteredActionConfiguration {
+public interface JobRateLimitAlgorithm extends ShardingSphereAlgorithm,
ShardingSphereAlgorithmPostProcessor {
- private final int blockQueueSize;
-
- private final int workerThread;
-
- private final ShardingSphereAlgorithmConfiguration completionDetector;
-
- private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
+ /**
+ * Action before query.
+ */
+ void onQuery();
}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
similarity index 72%
copy from
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
index 59eaa4b..7c562ad 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm
@@ -15,16 +15,4 @@
# limitations under the License.
#
-scalingName: default_scaling
-scaling:
- default_scaling:
- blockQueueSize: 10000
- workerThread: 40
- completionDetector:
- type: IDLE
- props:
- incremental-task-idle-minute-threshold: 30
- dataConsistencyChecker:
- type: DATA_MATCH
- props:
- chunk-size: 1000
+org.apache.shardingsphere.data.pipeline.core.ratelimit.SourceJobRateLimitAlgorithm
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index 585deeb..c22eb44 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -99,6 +99,11 @@
# default_scaling:
# blockQueueSize: 10000
# workerThread: 40
+# readBatchSize: 1000
+# rateLimiter:
+# type: SOURCE
+# props:
+# qps: 50
# completionDetector:
# type: IDLE
# props:
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannelTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannelTest.java
index 0c5fd04..118de67 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannelTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/DistributionChannelTest.java
@@ -64,7 +64,7 @@ public final class DistributionChannelTest {
private void execute(final AckCallback ackCallback, final int count, final
Record... records) {
CountDownLatch countDownLatch = new CountDownLatch(count);
AtomicBoolean acknowledged = new AtomicBoolean();
- DistributionChannel distributionChannel = new DistributionChannel(2,
ackRecords -> {
+ DistributionChannel distributionChannel = new DistributionChannel(2,
10000, ackRecords -> {
ackCallback.onAck(ackRecords);
acknowledged.set(true);
});
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
index 4156684..652d658 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
@@ -19,9 +19,11 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
import
org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -30,9 +32,10 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -58,23 +61,20 @@ public final class FinishedCheckJobTest {
@Test
public void assertExecuteAllDisabledJob() {
- JobInfo jobInfo = new JobInfo(1L);
- jobInfo.setActive(false);
- List<JobInfo> jobInfos = Collections.singletonList(jobInfo);
+ Optional<Long> jobId =
PipelineJobAPIFactory.getPipelineJobAPI().start(ResourceUtil.mockJobConfig());
+ assertTrue(jobId.isPresent());
+ List<JobInfo> jobInfos =
PipelineJobAPIFactory.getPipelineJobAPI().list();
+ jobInfos.forEach(each -> each.setActive(false));
when(pipelineJobAPI.list()).thenReturn(jobInfos);
finishedCheckJob.execute(null);
}
@Test
public void assertExecuteActiveJob() {
- JobInfo jobInfo = new JobInfo(1L);
- jobInfo.setActive(true);
- jobInfo.setJobParameter("handleConfig:\n"
- + " concurrency: 2\n"
- + " shardingTables:\n"
- + " - ds_0.t_order_$->{0..1}\n"
- + "ruleConfig:\n");
- List<JobInfo> jobInfos = Collections.singletonList(jobInfo);
+ Optional<Long> jobId =
PipelineJobAPIFactory.getPipelineJobAPI().start(ResourceUtil.mockJobConfig());
+ assertTrue(jobId.isPresent());
+ List<JobInfo> jobInfos =
PipelineJobAPIFactory.getPipelineJobAPI().list();
+ jobInfos.forEach(each -> each.setActive(true));
when(pipelineJobAPI.list()).thenReturn(jobInfos);
finishedCheckJob.execute(null);
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index f98801c..93873df 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -21,6 +21,8 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
@@ -50,6 +52,11 @@ public final class InventoryTaskTest {
public void assertStartWithGetEstimatedRowsFailure() {
InventoryDumperConfiguration inventoryDumperConfig = new
InventoryDumperConfiguration(taskConfig.getDumperConfig());
inventoryDumperConfig.setTableName("t_non_exist");
+ IngestPosition<?> position =
taskConfig.getDumperConfig().getPosition();
+ if (null == position) {
+ position = new PrimaryKeyPosition(0, 1000);
+ }
+ inventoryDumperConfig.setPosition(position);
try (InventoryTask inventoryTask = new
InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
RuleAlteredContextUtil.getExecuteEngine())) {
inventoryTask.start();
}
@@ -60,7 +67,11 @@ public final class InventoryTaskTest {
initTableData(taskConfig.getDumperConfig());
InventoryDumperConfiguration inventoryDumperConfig = new
InventoryDumperConfiguration(taskConfig.getDumperConfig());
inventoryDumperConfig.setTableName("t_order");
-
inventoryDumperConfig.setPosition(taskConfig.getDumperConfig().getPosition());
+ IngestPosition<?> position =
taskConfig.getDumperConfig().getPosition();
+ if (null == position) {
+ position = new PrimaryKeyPosition(0, 1000);
+ }
+ inventoryDumperConfig.setPosition(position);
try (InventoryTask inventoryTask = new
InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
RuleAlteredContextUtil.getExecuteEngine())) {
inventoryTask.start();
assertFalse(inventoryTask.getProgress().getPosition() instanceof
FinishedPosition);
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
index e4631c9..8861e48 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
@@ -48,12 +48,17 @@ rules:
type: INLINE
props:
algorithm-expression: t_order
+ scalingName: default_scaling
scaling:
default_scaling:
- blockQueueSize: 1000
+ blockQueueSize: 10000
+ workerThread: 40
+ readBatchSize: 1000
+ rateLimiter:
+ type: SOURCE
+ props:
+ qps: 50
completionDetector:
type: FIXTURE
dataConsistencyChecker:
type: FIXTURE
- workerThread: 5
- scalingName: default_scaling
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
index 5ef9d7c..335f438 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
@@ -51,12 +51,17 @@ rules:
type: INLINE
props:
algorithm-expression: t_order_$->{order_id % 2}
+ scalingName: default_scaling
scaling:
default_scaling:
- blockQueueSize: 1000
+ blockQueueSize: 10000
+ workerThread: 40
+ readBatchSize: 1000
+ rateLimiter:
+ type: SOURCE
+ props:
+ qps: 50
completionDetector:
type: FIXTURE
dataConsistencyChecker:
type: FIXTURE
- workerThread: 5
- scalingName: default_scaling