This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ced99955c59 Enable pipeline TPS rate limiter (#20537)
ced99955c59 is described below
commit ced99955c59c6d72c19d93e43f1388fceeb35820
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Aug 26 15:12:17 2022 +0800
Enable pipeline TPS rate limiter (#20537)
* Enable pipeline TPS rate limiter
* Fix codestyle
---
.../data/pipeline/api/config/ImporterConfiguration.java | 3 +++
.../data/pipeline/api/context/PipelineProcessContext.java | 7 +++++++
.../data/pipeline/core/importer/DefaultImporter.java | 14 ++++++++++++++
.../core/spi/ratelimit/TPSJobRateLimitAlgorithm.java | 3 +--
.../pipeline/scenario/migration/MigrationJobAPIImpl.java | 5 ++++-
.../data/pipeline/core/importer/DefaultImporterTest.java | 2 +-
.../pipeline/core/importer/ImporterCreatorFactoryTest.java | 2 +-
.../data/pipeline/core/util/PipelineContextUtil.java | 3 +--
8 files changed, 32 insertions(+), 7 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
index d451719ecf2..700c0e5348c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
@@ -24,6 +24,7 @@ import lombok.ToString;
import org.apache.commons.lang3.ObjectUtils;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import java.util.Collection;
@@ -51,6 +52,8 @@ public final class ImporterConfiguration {
private final int batchSize;
+ private final JobRateLimitAlgorithm rateLimitAlgorithm;
+
private final int retryTimes;
private final int concurrency;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
index ab015372778..39a166061d0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
@@ -46,4 +46,11 @@ public interface PipelineProcessContext {
* @return job read rate limit algorithm
*/
JobRateLimitAlgorithm getReadRateLimitAlgorithm();
+
+ /**
+ * Get job write rate limit algorithm.
+ *
+ * @return job write rate limit algorithm
+ */
+ JobRateLimitAlgorithm getWriteRateLimitAlgorithm();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 0b172372839..332953b7f6d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -37,6 +38,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import javax.sql.DataSource;
@@ -66,9 +68,12 @@ public final class DefaultImporter extends
AbstractLifecycleExecutor implements
private final PipelineJobProgressListener jobProgressListener;
+ private final JobRateLimitAlgorithm rateLimitAlgorithm;
+
public DefaultImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel, final PipelineJobProgressListener jobProgressListener) {
this.importerConfig = importerConfig;
+ rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
this.dataSourceManager = dataSourceManager;
this.channel = channel;
pipelineSqlBuilder =
PipelineSQLBuilderFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType());
@@ -144,12 +149,21 @@ public final class DefaultImporter extends
AbstractLifecycleExecutor implements
connection.setAutoCommit(false);
switch (buffer.get(0).getType()) {
case IngestDataChangeType.INSERT:
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+ }
executeBatchInsert(connection, buffer);
break;
case IngestDataChangeType.UPDATE:
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
+ }
executeUpdate(connection, buffer);
break;
case IngestDataChangeType.DELETE:
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
+ }
executeBatchDelete(connection, buffer);
break;
default:
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
index 63aff8d493f..8c3c4fc52b4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
@@ -60,11 +60,10 @@ public final class TPSJobRateLimitAlgorithm implements
JobRateLimitAlgorithm {
case INSERT:
case DELETE:
case UPDATE:
+ rateLimiter.acquire(null != data ? data.intValue() : 1);
break;
default:
- return;
}
- rateLimiter.acquire(null != data ? data.intValue() : 1);
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 6b4d8413a0b..c96620cfc6a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigration
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -213,7 +214,9 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
- return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, retryTimes, concurrency);
+ PipelineProcessContext migrationProcessContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+ return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, migrationProcessContext.getWriteRateLimitAlgorithm(),
+ retryTimes, concurrency);
}
private static Map<LogicTableName, Set<String>> unmodifiable(final
Map<LogicTableName, Set<String>> shardingColumnsMap) {
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
index 98b4ceeded8..c81823ee2a8 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
@@ -171,6 +171,6 @@ public final class DefaultImporterTest {
private ImporterConfiguration mockImporterConfiguration() {
Map<LogicTableName, Set<String>> shardingColumnsMap =
Collections.singletonMap(new LogicTableName("test_table"),
Collections.singleton("user"));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, null, 3, 3);
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
index 5dc9c174099..55dc0b0fe56 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
@@ -66,6 +66,6 @@ public final class ImporterCreatorFactoryTest {
private ImporterConfiguration createImporterConfiguration(final String
databaseType) {
Map<LogicTableName, Set<String>> shardingColumnsMap =
Collections.singletonMap(new LogicTableName("t_order"), new
HashSet<>(Arrays.asList("order_id", "user_id", "status")));
PipelineDataSourceConfiguration dataSourceConfig = new
FixturePipelineDataSourceConfiguration(DatabaseTypeFactory.getInstance(databaseType));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, null, 3, 3);
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 98fe2cabb1d..56409e6fbd1 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -160,8 +160,7 @@ public final class PipelineContextUtil {
MigrationProcessContext processContext = new
MigrationProcessContext(jobConfig.getJobId(), processConfig);
int jobShardingItem = 0;
TaskConfiguration taskConfig = new
MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem,
processConfig);
- return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
- processContext, taskConfig, new DefaultPipelineDataSourceManager());
+ return new MigrationJobItemContext(jobConfig, jobShardingItem, null, processContext, taskConfig, new DefaultPipelineDataSourceManager());
}
private static PipelineProcessConfiguration
mockPipelineProcessConfiguration() {