This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ced99955c59 Enable pipeline TPS rate limiter (#20537)
ced99955c59 is described below

commit ced99955c59c6d72c19d93e43f1388fceeb35820
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Aug 26 15:12:17 2022 +0800

    Enable pipeline TPS rate limiter (#20537)
    
    * Enable pipeline TPS rate limiter
    
    * Fix codestyle
---
 .../data/pipeline/api/config/ImporterConfiguration.java    |  3 +++
 .../data/pipeline/api/context/PipelineProcessContext.java  |  7 +++++++
 .../data/pipeline/core/importer/DefaultImporter.java       | 14 ++++++++++++++
 .../core/spi/ratelimit/TPSJobRateLimitAlgorithm.java       |  3 +--
 .../pipeline/scenario/migration/MigrationJobAPIImpl.java   |  5 ++++-
 .../data/pipeline/core/importer/DefaultImporterTest.java   |  2 +-
 .../pipeline/core/importer/ImporterCreatorFactoryTest.java |  2 +-
 .../data/pipeline/core/util/PipelineContextUtil.java       |  3 +--
 8 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
index d451719ecf2..700c0e5348c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
@@ -24,6 +24,7 @@ import lombok.ToString;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 
 import java.util.Collection;
@@ -51,6 +52,8 @@ public final class ImporterConfiguration {
     
     private final int batchSize;
     
+    private final JobRateLimitAlgorithm rateLimitAlgorithm;
+    
     private final int retryTimes;
     
     private final int concurrency;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
index ab015372778..39a166061d0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
@@ -46,4 +46,11 @@ public interface PipelineProcessContext {
      * @return job read rate limit algorithm
      */
     JobRateLimitAlgorithm getReadRateLimitAlgorithm();
+    
+    /**
+     * Get job write rate limit algorithm.
+     *
+     * @return job write rate limit algorithm
+     */
+    JobRateLimitAlgorithm getWriteRateLimitAlgorithm();
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 0b172372839..332953b7f6d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -37,6 +38,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 
 import javax.sql.DataSource;
@@ -66,9 +68,12 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
     
     private final PipelineJobProgressListener jobProgressListener;
     
+    private final JobRateLimitAlgorithm rateLimitAlgorithm;
+    
     public DefaultImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
                            final PipelineJobProgressListener jobProgressListener) {
         this.importerConfig = importerConfig;
+        rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
         this.dataSourceManager = dataSourceManager;
         this.channel = channel;
         pipelineSqlBuilder = PipelineSQLBuilderFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType());
@@ -144,12 +149,21 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
             connection.setAutoCommit(false);
             switch (buffer.get(0).getType()) {
                 case IngestDataChangeType.INSERT:
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+                    }
                     executeBatchInsert(connection, buffer);
                     break;
                 case IngestDataChangeType.UPDATE:
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
+                    }
                     executeUpdate(connection, buffer);
                     break;
                 case IngestDataChangeType.DELETE:
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
+                    }
                     executeBatchDelete(connection, buffer);
                     break;
                 default:
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
index 63aff8d493f..8c3c4fc52b4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java
@@ -60,11 +60,10 @@ public final class TPSJobRateLimitAlgorithm implements JobRateLimitAlgorithm {
             case INSERT:
             case DELETE:
             case UPDATE:
+                rateLimiter.acquire(null != data ? data.intValue() : 1);
                 break;
             default:
-                return;
         }
-        rateLimiter.acquire(null != data ? data.intValue() : 1);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 6b4d8413a0b..c96620cfc6a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigration
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -213,7 +214,9 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
-        return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, retryTimes, concurrency);
+        PipelineProcessContext migrationProcessContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+        return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, migrationProcessContext.getWriteRateLimitAlgorithm(),
+                retryTimes, concurrency);
     }
     
     private static Map<LogicTableName, Set<String>> unmodifiable(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
index 98b4ceeded8..c81823ee2a8 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
@@ -171,6 +171,6 @@ public final class DefaultImporterTest {
     
     private ImporterConfiguration mockImporterConfiguration() {
         Map<LogicTableName, Set<String>> shardingColumnsMap = Collections.singletonMap(new LogicTableName("test_table"), Collections.singleton("user"));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, null, 3, 3);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
index 5dc9c174099..55dc0b0fe56 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
@@ -66,6 +66,6 @@ public final class ImporterCreatorFactoryTest {
     private ImporterConfiguration createImporterConfiguration(final String databaseType) {
         Map<LogicTableName, Set<String>> shardingColumnsMap = Collections.singletonMap(new LogicTableName("t_order"), new HashSet<>(Arrays.asList("order_id", "user_id", "status")));
         PipelineDataSourceConfiguration dataSourceConfig = new FixturePipelineDataSourceConfiguration(DatabaseTypeFactory.getInstance(databaseType));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, null, 3, 3);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 98fe2cabb1d..56409e6fbd1 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -160,8 +160,7 @@ public final class PipelineContextUtil {
         MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), processConfig);
         int jobShardingItem = 0;
         TaskConfiguration taskConfig = new MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
-        return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
-                processContext, taskConfig, new DefaultPipelineDataSourceManager());
+        return new MigrationJobItemContext(jobConfig, jobShardingItem, null, processContext, taskConfig, new DefaultPipelineDataSourceManager());
     }
     
     private static PipelineProcessConfiguration mockPipelineProcessConfiguration() {

Reply via email to