sandynz commented on code in PR #20537:
URL: https://github.com/apache/shardingsphere/pull/20537#discussion_r955645207
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java:
##########
@@ -56,13 +56,8 @@ public String getType() {
@Override
public void intercept(final JobOperationType type, final Number data) {
- switch (type) {
- case INSERT:
- case DELETE:
- case UPDATE:
- break;
- default:
- return;
+ if (JobOperationType.SELECT == type) {
+ return;
}
Review Comment:
It's better to keep the explicit INSERT/DELETE/UPDATE cases. If JobOperationType
gains a new type later, the SELECT-only check would apply the limit to it as
well, which might cause issues.
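
To illustrate the concern, here is a minimal sketch rather than the PR's code. `OperationType` is a hypothetical stand-in for JobOperationType, and `MERGE` is an invented future value that does not exist in the project. It compares the removed allowlist switch with the added SELECT-only check:

```java
public class OperationTypeCheckSketch {

    // Hypothetical stand-in for JobOperationType. MERGE is an invented
    // "future" value used only for illustration; it is not in the PR.
    public enum OperationType { SELECT, INSERT, UPDATE, DELETE, MERGE }

    // Allowlist form (shape of the removed switch): only listed writes are limited.
    public static boolean isLimitedByAllowlist(final OperationType type) {
        switch (type) {
            case INSERT:
            case DELETE:
            case UPDATE:
                return true;
            default:
                return false;
        }
    }

    // Denylist form (shape of the added check): everything except SELECT is limited.
    public static boolean isLimitedByDenylist(final OperationType type) {
        return OperationType.SELECT != type;
    }

    public static void main(final String[] args) {
        for (OperationType each : OperationType.values()) {
            System.out.println(each + ": allowlist=" + isLimitedByAllowlist(each)
                    + ", denylist=" + isLimitedByDenylist(each));
        }
    }
}
```

The two forms agree on the four existing values and differ only on the invented one, which is the case the comment is worried about.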
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java:
##########
@@ -45,9 +46,11 @@ public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI,
* @param jobConfig job configuration
* @param jobShardingItem job sharding item
* @param pipelineProcessConfig pipeline process configuration
+ * @param writeRateLimitAlgorithm write rate limit algorithm
* @return task configuration
*/
- TaskConfiguration buildTaskConfiguration(MigrationJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+ TaskConfiguration buildTaskConfiguration(MigrationJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig,
+ JobRateLimitAlgorithm writeRateLimitAlgorithm);
Review Comment:
It looks unnecessary to add the new parameter `JobRateLimitAlgorithm
writeRateLimitAlgorithm` to the `buildTaskConfiguration` method.
AbstractPipelineProcessContext already has the constructor
`AbstractPipelineProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig)`, so we could just build a
PipelineProcessContext instance and get writeRateLimitAlgorithm from it (this
needs a getWriteRateLimitAlgorithm method added to the PipelineProcessContext
interface).
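
A rough sketch of that suggestion, under stated assumptions: every type below (`RateLimiter`, `ProcessConfig`, `SimpleProcessContext`, `TaskConfig`) is a simplified, hypothetical stand-in for the real ShardingSphere class, and the `writeTps` field is invented. It only shows the shape of the idea: keep the three-argument signature and derive the algorithm from a context built inside the method.

```java
public class ProcessContextSketch {

    // Hypothetical stand-in for JobRateLimitAlgorithm.
    public interface RateLimiter {
        void intercept(String operationType, Number data);
    }

    // Hypothetical stand-in for PipelineProcessContext, with the proposed getter.
    public interface ProcessContext {
        RateLimiter getWriteRateLimitAlgorithm();
    }

    // Hypothetical stand-in for PipelineProcessConfiguration; writeTps is invented.
    public static final class ProcessConfig {
        private final Integer writeTps;

        public ProcessConfig(final Integer writeTps) {
            this.writeTps = writeTps;
        }

        public Integer getWriteTps() {
            return writeTps;
        }
    }

    // Mirrors the (jobId, originalProcessConfig) constructor shape from the comment.
    public static final class SimpleProcessContext implements ProcessContext {
        private final RateLimiter writeRateLimitAlgorithm;

        public SimpleProcessContext(final String jobId, final ProcessConfig originalProcessConfig) {
            if (null == originalProcessConfig.getWriteTps()) {
                writeRateLimitAlgorithm = null;
            } else {
                // No-op limiter; a real one would throttle here.
                writeRateLimitAlgorithm = (operationType, data) -> { };
            }
        }

        @Override
        public RateLimiter getWriteRateLimitAlgorithm() {
            return writeRateLimitAlgorithm;
        }
    }

    // Hypothetical stand-in for TaskConfiguration.
    public static final class TaskConfig {
        private final int shardingItem;
        private final RateLimiter writeRateLimitAlgorithm;

        public TaskConfig(final int shardingItem, final RateLimiter writeRateLimitAlgorithm) {
            this.shardingItem = shardingItem;
            this.writeRateLimitAlgorithm = writeRateLimitAlgorithm;
        }

        public int getShardingItem() {
            return shardingItem;
        }

        public RateLimiter getWriteRateLimitAlgorithm() {
            return writeRateLimitAlgorithm;
        }
    }

    // Signature keeps three parameters; the limiter comes from the context.
    public static TaskConfig buildTaskConfiguration(final String jobId, final int jobShardingItem, final ProcessConfig processConfig) {
        ProcessContext context = new SimpleProcessContext(jobId, processConfig);
        return new TaskConfig(jobShardingItem, context.getWriteRateLimitAlgorithm());
    }
}
```

Whether building a second context per call is acceptable depends on how expensive the real constructor is, which this sketch does not model.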
##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java:
##########
@@ -144,12 +149,21 @@ private void doFlush(final DataSource dataSource, final List<DataRecord> buffer)
connection.setAutoCommit(false);
switch (buffer.get(0).getType()) {
case IngestDataChangeType.INSERT:
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.INSERT, buffer.size());
+ }
executeBatchInsert(connection, buffer);
break;
case IngestDataChangeType.UPDATE:
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.UPDATE, buffer.size());
+ }
executeUpdate(connection, buffer);
break;
case IngestDataChangeType.DELETE:
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.DELETE, buffer.size());
+ }
Review Comment:
It's better to use `1` instead of `buffer.size()`, since each
INSERT/UPDATE/DELETE batch is executed in one transaction.
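
A small sketch of the difference, assuming the limiter counts transactions. `CountingLimiter` and both `flush...` methods are hypothetical names, not project code; the limiter only records how many permits were requested:

```java
import java.util.Arrays;
import java.util.List;

public class FlushPermitSketch {

    // Hypothetical limiter that only records how many permits were requested.
    public static final class CountingLimiter {
        private long permits;

        public void intercept(final Number data) {
            permits += data.longValue();
        }

        public long getPermits() {
            return permits;
        }
    }

    // Suggested form: one permit per flush, because the batch commits as one transaction.
    public static void flushPerTransaction(final CountingLimiter limiter, final List<String> buffer) {
        if (null != limiter) {
            limiter.intercept(1);
        }
        // The batch write and commit would happen here.
    }

    // Form in the diff: one permit per buffered record.
    public static void flushPerRecord(final CountingLimiter limiter, final List<String> buffer) {
        if (null != limiter) {
            limiter.intercept(buffer.size());
        }
        // The batch write and commit would happen here.
    }

    public static void main(final String[] args) {
        List<String> buffer = Arrays.asList("r1", "r2", "r3");
        CountingLimiter perTransaction = new CountingLimiter();
        CountingLimiter perRecord = new CountingLimiter();
        flushPerTransaction(perTransaction, buffer);
        flushPerRecord(perRecord, buffer);
        System.out.println(perTransaction.getPermits() + " vs " + perRecord.getPermits());
    }
}
```

With a transactions-per-second limit, the per-record form would consume the budget faster by a factor equal to the batch size.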
##########
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java:
##########
@@ -159,8 +159,8 @@ public static MigrationJobItemContext mockMigrationJobItemContext(final Migratio
PipelineProcessConfiguration processConfig = mockPipelineProcessConfiguration();
MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), processConfig);
int jobShardingItem = 0;
- TaskConfiguration taskConfig = new MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
- return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
+ TaskConfiguration taskConfig = new MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig, processContext.getWriteRateLimitAlgorithm());
+ return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
processContext, taskConfig, new DefaultPipelineDataSourceManager());
Review Comment:
The trailing blank character at the end of the line could be removed, and it's
better to merge the last two lines into one line.