sandynz commented on code in PR #20537:
URL: https://github.com/apache/shardingsphere/pull/20537#discussion_r955645207


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ratelimit/TPSJobRateLimitAlgorithm.java:
##########
@@ -56,13 +56,8 @@ public String getType() {
     
     @Override
     public void intercept(final JobOperationType type, final Number data) {
-        switch (type) {
-            case INSERT:
-            case DELETE:
-            case UPDATE:
-                break;
-            default:
-                return;
+        if (JobOperationType.SELECT == type) {
+            return;
         }

Review Comment:
   It's better to keep the explicit INSERT/DELETE/UPDATE cases, since JobOperationType might add new types later, and this simplified check could then cause issues.
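   For example, restoring the removed switch keeps the write operations explicit; a minimal sketch based on the lines removed in this diff (the trailing TPS rate limiting logic is elided):

```java
@Override
public void intercept(final JobOperationType type, final Number data) {
    // List the write operations explicitly, so a newly added JobOperationType
    // falls into default and is not rate limited by accident.
    switch (type) {
        case INSERT:
        case DELETE:
        case UPDATE:
            break;
        default:
            return;
    }
    // ... existing TPS rate limiting logic ...
}
```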



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java:
##########
@@ -45,9 +46,11 @@ public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI,
      * @param jobConfig job configuration
      * @param jobShardingItem job sharding item
      * @param pipelineProcessConfig pipeline process configuration
+     * @param writeRateLimitAlgorithm write rate limit algorithm
      * @return task configuration
      */
-    TaskConfiguration buildTaskConfiguration(MigrationJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+    TaskConfiguration buildTaskConfiguration(MigrationJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig,
+                                             JobRateLimitAlgorithm writeRateLimitAlgorithm);

Review Comment:
   Looks like it's not necessary to add the new parameter `JobRateLimitAlgorithm writeRateLimitAlgorithm` to the `buildTaskConfiguration` method. Since AbstractPipelineProcessContext already has the constructor `AbstractPipelineProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig)`, we could just build a PipelineProcessContext instance and get the writeRateLimitAlgorithm from it (a get writeRateLimitAlgorithm method would need to be added to the PipelineProcessContext interface).
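   For example, a rough sketch inside the existing `buildTaskConfiguration(jobConfig, jobShardingItem, pipelineProcessConfig)` implementation, assuming the proposed `getWriteRateLimitAlgorithm()` accessor is added to `PipelineProcessContext` (the use of `MigrationProcessContext` here is only illustrative):

```java
// Sketch only: derive the write rate limit algorithm from a process context
// built from the existing parameters, instead of adding a new method parameter.
MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
// ... build the TaskConfiguration with writeRateLimitAlgorithm as before ...
```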
   



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java:
##########
@@ -144,12 +149,21 @@ private void doFlush(final DataSource dataSource, final List<DataRecord> buffer)
             connection.setAutoCommit(false);
             switch (buffer.get(0).getType()) {
                 case IngestDataChangeType.INSERT:
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.intercept(JobOperationType.INSERT, buffer.size());
+                    }
                     executeBatchInsert(connection, buffer);
                     break;
                 case IngestDataChangeType.UPDATE:
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.intercept(JobOperationType.UPDATE, buffer.size());
+                    }
                     executeUpdate(connection, buffer);
                     break;
                 case IngestDataChangeType.DELETE:
+                    if (null != rateLimitAlgorithm) {
+                        rateLimitAlgorithm.intercept(JobOperationType.DELETE, buffer.size());
+                    }

Review Comment:
   It's better to use `1` instead of `buffer.size()`, since the buffered INSERT/UPDATE/DELETE statements are executed in a single transaction.
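   For example, the INSERT branch would then become (a sketch of the suggested change; the same applies to the UPDATE and DELETE branches):

```java
case IngestDataChangeType.INSERT:
    if (null != rateLimitAlgorithm) {
        // the whole buffer is flushed in one transaction, so count it as a single operation
        rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
    }
    executeBatchInsert(connection, buffer);
    break;
```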



##########
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java:
##########
@@ -159,8 +159,8 @@ public static MigrationJobItemContext mockMigrationJobItemContext(final Migratio
         PipelineProcessConfiguration processConfig = mockPipelineProcessConfiguration();
         MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), processConfig);
         int jobShardingItem = 0;
-        TaskConfiguration taskConfig = new MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
-        return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
+        TaskConfiguration taskConfig = new MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig, processContext.getWriteRateLimitAlgorithm());
+        return new MigrationJobItemContext(jobConfig, jobShardingItem, null, 
                 processContext, taskConfig, new DefaultPipelineDataSourceManager());

Review Comment:
   The trailing blank character at the end of the line could be removed, and it's better to merge the last two lines into one.
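   For example, merged into a single line with no trailing whitespace:

```java
return new MigrationJobItemContext(jobConfig, jobShardingItem, null, processContext, taskConfig, new DefaultPipelineDataSourceManager());
```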



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
