This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e4ed51f9bf Update rate limit in inventory dumper streaming query (#24512)
9e4ed51f9bf is described below

commit 9e4ed51f9bf31ed61e4f7ec33b03931068809aec
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Mar 9 12:22:14 2023 +0800

    Update rate limit in inventory dumper streaming query (#24512)
---
 .../data/pipeline/core/ingest/dumper/InventoryDumper.java    | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
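
In effect, this change consults the rate limit algorithm inside the streaming
query loop, once per batchSize fetched rows, instead of once before the whole
table dump, and it defers the completion log until after the loop so the final
rowCount can be reported. As a hedged illustration of the pattern outside
ShardingSphere, the sketch below throttles a streaming JDBC query once per
completed batch of rows. Guava's RateLimiter stands in for the
JobRateLimitAlgorithm SPI, and the H2 in-memory URL, table name, and class name
are hypothetical; it assumes the H2 driver and Guava are on the classpath.

    import com.google.common.util.concurrent.RateLimiter;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public final class BatchRateLimitedDumpSketch {

        public static void main(final String[] args) throws SQLException {
            int batchSize = 100;
            // At most 50 batch permits per second; acquire() blocks when exceeded.
            RateLimiter rateLimiter = RateLimiter.create(50);
            try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:demo")) {
                try (Statement ddl = connection.createStatement()) {
                    ddl.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY)");
                }
                try (PreparedStatement statement = connection.prepareStatement("SELECT order_id FROM t_order")) {
                    // Hint the driver to stream rows instead of buffering the whole table.
                    statement.setFetchSize(batchSize);
                    try (ResultSet resultSet = statement.executeQuery()) {
                        int rowCount = 0;
                        while (resultSet.next()) {
                            // A real dumper would push the record to a channel here.
                            ++rowCount;
                            // Throttle once per completed batch, mirroring
                            // 0 == rowCount % batchSize in the diff below.
                            if (0 == rowCount % batchSize) {
                                rateLimiter.acquire();
                            }
                        }
                        System.out.printf("Inventory dump done, rowCount=%d%n", rowCount);
                    }
                }
            }
        }
    }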

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 277ab2084c2..3d30e7876e4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -43,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestExcep
 import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -101,7 +102,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName());
         try (Connection connection = dataSource.getConnection()) {
             dump(tableMetaData, connection);
-            log.info("Inventory dump done");
         } catch (final SQLException ex) {
             log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
             throw new IngestException("Inventory dump failed on " + dumperConfig.getActualTableName(), ex);
@@ -111,9 +111,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     }
     
     private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
-        if (null != dumperConfig.getRateLimitAlgorithm()) {
-            dumperConfig.getRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
-        }
         int batchSize = dumperConfig.getBatchSize();
         DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
         try (PreparedStatement preparedStatement = JDBCStreamQueryUtil.generateStreamQueryPreparedStatement(databaseType, connection, buildInventoryDumpSQL())) {
@@ -123,15 +120,22 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
             }
             setParameters(preparedStatement);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                int rowCount = 0;
+                JobRateLimitAlgorithm rateLimitAlgorithm = dumperConfig.getRateLimitAlgorithm();
                 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                 while (resultSet.next()) {
                     channel.pushRecord(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
+                    ++rowCount;
                     if (!isRunning()) {
                         log.info("Broke because of inventory dump is not running.");
                         break;
                     }
+                    if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
+                        rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+                    }
                 }
                 dumpStatement = null;
+                log.info("Inventory dump done, rowCount={}", rowCount);
             }
         }
     }
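
The removed pre-loop check throttled each table dump only once; the new in-loop
check calls intercept(JobOperationType.SELECT, 1) once per batchSize rows, so a
configured algorithm can actually pace a long streaming query. Below is a
hypothetical, self-contained sketch of a QPS-style limiter with the same call
shape, assuming Guava on the classpath. It is not ShardingSphere's real
JobRateLimitAlgorithm SPI, whose full contract is not shown in this diff, and
the enum and class names are stand-ins.

    import com.google.common.util.concurrent.RateLimiter;

    public final class QpsLimiterSketch {

        enum JobOperationType { SELECT, INSERT, UPDATE, DELETE }

        private final RateLimiter rateLimiter;

        QpsLimiterSketch(final double permitsPerSecond) {
            rateLimiter = RateLimiter.create(permitsPerSecond);
        }

        // Block the caller until count permits are available; combined with the
        // per-batch call site above, this smooths the dump to roughly
        // permitsPerSecond batches per second.
        void intercept(final JobOperationType type, final int count) {
            if (JobOperationType.SELECT == type) {
                rateLimiter.acquire(count);
            }
        }

        public static void main(final String[] args) {
            QpsLimiterSketch limiter = new QpsLimiterSketch(2);
            long start = System.currentTimeMillis();
            for (int i = 0; i < 6; i++) {
                limiter.intercept(JobOperationType.SELECT, 1);
            }
            // Expect roughly 2.5 seconds: the first permit is immediate,
            // the remaining five are spaced at 0.5 seconds each.
            System.out.println("elapsed ms = " + (System.currentTimeMillis() - start));
        }
    }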
