This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9e4ed51f9bf Update rate limit in inventory dumper streaming query
(#24512)
9e4ed51f9bf is described below
commit 9e4ed51f9bf31ed61e4f7ec33b03931068809aec
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Mar 9 12:22:14 2023 +0800
Update rate limit in inventory dumper streaming query (#24512)
---
.../data/pipeline/core/ingest/dumper/InventoryDumper.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 277ab2084c2..3d30e7876e4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -43,6 +43,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestExcep
import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtil;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -101,7 +102,6 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName())),
dumperConfig.getActualTableName());
try (Connection connection = dataSource.getConnection()) {
dump(tableMetaData, connection);
- log.info("Inventory dump done");
} catch (final SQLException ex) {
log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
throw new IngestException("Inventory dump failed on " +
dumperConfig.getActualTableName(), ex);
@@ -111,9 +111,6 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
}
private void dump(final PipelineTableMetaData tableMetaData, final
Connection connection) throws SQLException {
- if (null != dumperConfig.getRateLimitAlgorithm()) {
-
dumperConfig.getRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
- }
int batchSize = dumperConfig.getBatchSize();
DatabaseType databaseType =
dumperConfig.getDataSourceConfig().getDatabaseType();
try (PreparedStatement preparedStatement =
JDBCStreamQueryUtil.generateStreamQueryPreparedStatement(databaseType,
connection, buildInventoryDumpSQL())) {
@@ -123,15 +120,22 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
}
setParameters(preparedStatement);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ int rowCount = 0;
+ JobRateLimitAlgorithm rateLimitAlgorithm =
dumperConfig.getRateLimitAlgorithm();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
channel.pushRecord(loadDataRecord(resultSet,
resultSetMetaData, tableMetaData));
+ ++rowCount;
if (!isRunning()) {
log.info("Broke because of inventory dump is not
running.");
break;
}
+ if (null != rateLimitAlgorithm && 0 == rowCount %
batchSize) {
+ rateLimitAlgorithm.intercept(JobOperationType.SELECT,
1);
+ }
}
dumpStatement = null;
+ log.info("Inventory dump done, rowCount={}", rowCount);
}
}
}