This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 654a31207c4 Extract InputConfiguration and OutputConfiguration from OnRuleAlteredActionConfiguration (#19880)
654a31207c4 is described below

commit 654a31207c4bb3ef53bc0d601978f08ce5ff8880
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 5 09:46:55 2022 +0800

    Extract InputConfiguration and OutputConfiguration from OnRuleAlteredActionConfiguration (#19880)
---
 .../ShardingScalingRuleStatementConverter.java     | 16 ++---
 .../ShardingScalingRulesQueryResultSetTest.java    |  8 +--
 .../data/pipeline/PipelineInputConfiguration.java  | 40 +++++++++++
 .../data/pipeline/PipelineOutputConfiguration.java | 38 ++++++++++
 .../OnRuleAlteredActionConfiguration.java          | 32 ++-------
 .../pipeline/YamlPipelineInputConfiguration.java   | 67 +++++++++++++++++
 .../pipeline/YamlPipelineOutputConfiguration.java  | 60 ++++++++++++++++
 .../rule/YamlOnRuleAlteredActionConfiguration.java | 84 ++--------------------
 .../YamlPipelineInputConfigurationSwapper.java     | 54 ++++++++++++++
 .../YamlPipelineOutputConfigurationSwapper.java    | 52 ++++++++++++++
 ...amlOnRuleAlteredActionConfigurationSwapper.java | 56 ++-------------
 ...nRuleAlteredActionConfigurationSwapperTest.java | 10 +--
 .../scenario/rulealtered/RuleAlteredContext.java   | 16 ++---
 .../rulealtered/prepare/InventoryTaskSplitter.java |  4 +-
 14 files changed, 350 insertions(+), 187 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/converter/ShardingScalingRuleStatementConverter.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
index a42edab3015..051d739d86f 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
@@ -21,9 +21,9 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
 import 
org.apache.shardingsphere.scaling.distsql.statement.segment.InputOrOutputSegment;
 import 
org.apache.shardingsphere.scaling.distsql.statement.segment.ShardingScalingRuleConfigurationSegment;
 
@@ -40,26 +40,26 @@ public final class ShardingScalingRuleStatementConverter {
      * @return on rule altered action configuration
      */
     public static OnRuleAlteredActionConfiguration convert(final 
ShardingScalingRuleConfigurationSegment segment) {
-        InputConfiguration inputConfig = 
convertToInputConfiguration(segment.getInputSegment());
-        OutputConfiguration outputConfig = 
convertToOutputConfiguration(segment.getOutputSegment());
+        PipelineInputConfiguration inputConfig = 
convertToInputConfiguration(segment.getInputSegment());
+        PipelineOutputConfiguration outputConfig = 
convertToOutputConfiguration(segment.getOutputSegment());
         AlgorithmConfiguration streamChannel = 
convertToAlgorithm(segment.getStreamChannel());
         AlgorithmConfiguration completionDetector = 
convertToAlgorithm(segment.getCompletionDetector());
         AlgorithmConfiguration dataConsistencyChecker = 
convertToAlgorithm(segment.getDataConsistencyCalculator());
         return new OnRuleAlteredActionConfiguration(inputConfig, outputConfig, 
streamChannel, completionDetector, dataConsistencyChecker);
     }
     
-    private static InputConfiguration convertToInputConfiguration(final 
InputOrOutputSegment inputSegment) {
+    private static PipelineInputConfiguration 
convertToInputConfiguration(final InputOrOutputSegment inputSegment) {
         if (null == inputSegment) {
             return null;
         }
-        return new InputConfiguration(inputSegment.getWorkerThread(), 
inputSegment.getBatchSize(), inputSegment.getShardingSize(), 
convertToAlgorithm(inputSegment.getRateLimiter()));
+        return new PipelineInputConfiguration(inputSegment.getWorkerThread(), 
inputSegment.getBatchSize(), inputSegment.getShardingSize(), 
convertToAlgorithm(inputSegment.getRateLimiter()));
     }
     
-    private static OutputConfiguration convertToOutputConfiguration(final 
InputOrOutputSegment outputSegment) {
+    private static PipelineOutputConfiguration 
convertToOutputConfiguration(final InputOrOutputSegment outputSegment) {
         if (null == outputSegment) {
             return null;
         }
-        return new OutputConfiguration(outputSegment.getWorkerThread(), 
outputSegment.getBatchSize(), 
convertToAlgorithm(outputSegment.getRateLimiter()));
+        return new 
PipelineOutputConfiguration(outputSegment.getWorkerThread(), 
outputSegment.getBatchSize(), 
convertToAlgorithm(outputSegment.getRateLimiter()));
     }
     
     private static AlgorithmConfiguration convertToAlgorithm(final 
AlgorithmSegment segment) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
index 11965f44055..60f82bbf4d0 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
@@ -19,9 +19,9 @@ package org.apache.shardingsphere.scaling.distsql.handler.query;
 
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.distsql.handler.query.ShardingAlgorithmsQueryResultSet;
@@ -81,8 +81,8 @@ public final class ShardingScalingRulesQueryResultSetTest {
     }
     
     private OnRuleAlteredActionConfiguration buildCompleteConfiguration() {
-        InputConfiguration inputConfig = new InputConfiguration(10, 100, 10, 
new AlgorithmConfiguration("QPS", createProperties("qps", "50")));
-        OutputConfiguration outputConfig = new OutputConfiguration(10, 100, 
new AlgorithmConfiguration("TPS", createProperties("tps", "2000")));
+        PipelineInputConfiguration inputConfig = new 
PipelineInputConfiguration(10, 100, 10, new AlgorithmConfiguration("QPS", 
createProperties("qps", "50")));
+        PipelineOutputConfiguration outputConfig = new 
PipelineOutputConfiguration(10, 100, new AlgorithmConfiguration("TPS", 
createProperties("tps", "2000")));
         AlgorithmConfiguration streamChannel = new 
AlgorithmConfiguration("MEMORY", createProperties("block-queue-size", "10000"));
         AlgorithmConfiguration completionDetector = new 
AlgorithmConfiguration("IDLE", 
createProperties("incremental-task-idle-seconds-threshold", "1800"));
         AlgorithmConfiguration dataConsistencyChecker = new 
AlgorithmConfiguration("DATA_MATCH", createProperties("chunk-size", "1000"));
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
new file mode 100644
index 00000000000..d723b011eec
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.rule.data.pipeline;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+
+/**
+ * Pipeline input configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class PipelineInputConfiguration {
+    
+    private final Integer workerThread;
+    
+    private final Integer batchSize;
+    
+    private final Integer shardingSize;
+    
+    private final AlgorithmConfiguration rateLimiter;
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
new file mode 100644
index 00000000000..af617327bc1
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.rule.data.pipeline;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+
+/**
+ * Pipeline output configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class PipelineOutputConfiguration {
+    
+    private final Integer workerThread;
+    
+    private final Integer batchSize;
+    
+    private final AlgorithmConfiguration rateLimiter;
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
index 4e5a57be02a..bcc6dd0451a 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -21,6 +21,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
 
 /**
  * On rule altered action configuration.
@@ -30,39 +32,13 @@ import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 @ToString
 public final class OnRuleAlteredActionConfiguration {
     
-    private final InputConfiguration input;
+    private final PipelineInputConfiguration input;
     
-    private final OutputConfiguration output;
+    private final PipelineOutputConfiguration output;
     
     private final AlgorithmConfiguration streamChannel;
     
     private final AlgorithmConfiguration completionDetector;
     
     private final AlgorithmConfiguration dataConsistencyCalculator;
-    
-    @RequiredArgsConstructor
-    @Getter
-    @ToString
-    public static final class InputConfiguration {
-        
-        private final Integer workerThread;
-        
-        private final Integer batchSize;
-        
-        private final Integer shardingSize;
-        
-        private final AlgorithmConfiguration rateLimiter;
-    }
-    
-    @RequiredArgsConstructor
-    @Getter
-    @ToString
-    public static final class OutputConfiguration {
-        
-        private final Integer workerThread;
-        
-        private final Integer batchSize;
-        
-        private final AlgorithmConfiguration rateLimiter;
-    }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
new file mode 100644
index 00000000000..3c7ffd0bbc2
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline;
+
+import lombok.Data;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+
+/**
+ * YAML pipeline input configuration.
+ */
+@Data
+public final class YamlPipelineInputConfiguration implements YamlConfiguration 
{
+    
+    private static final Integer DEFAULT_WORKER_THREAD = 40;
+    
+    private static final Integer DEFAULT_BATCH_SIZE = 1000;
+    
+    private static final Integer DEFAULT_SHARDING_SIZE = 1000_0000;
+    
+    private Integer workerThread = DEFAULT_WORKER_THREAD;
+    
+    private Integer batchSize = DEFAULT_BATCH_SIZE;
+    
+    private Integer shardingSize = DEFAULT_SHARDING_SIZE;
+    
+    private YamlAlgorithmConfiguration rateLimiter;
+    
+    /**
+     * Build with default value.
+     *
+     * @return input configuration
+     */
+    public static YamlPipelineInputConfiguration buildWithDefaultValue() {
+        return new YamlPipelineInputConfiguration();
+    }
+    
+    /**
+     * Fill in null fields with default value.
+     */
+    public void fillInNullFieldsWithDefaultValue() {
+        if (null == workerThread) {
+            workerThread = DEFAULT_WORKER_THREAD;
+        }
+        if (null == batchSize) {
+            batchSize = DEFAULT_BATCH_SIZE;
+        }
+        if (null == shardingSize) {
+            shardingSize = DEFAULT_SHARDING_SIZE;
+        }
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
new file mode 100644
index 00000000000..578357cd1d0
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline;
+
+import lombok.Data;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+
+/**
+ * YAML pipeline output configuration.
+ */
+@Data
+public final class YamlPipelineOutputConfiguration implements 
YamlConfiguration {
+    
+    private static final Integer DEFAULT_WORKER_THREAD = 40;
+    
+    private static final Integer DEFAULT_BATCH_SIZE = 1000;
+    
+    private Integer workerThread = DEFAULT_WORKER_THREAD;
+    
+    private Integer batchSize = DEFAULT_BATCH_SIZE;
+    
+    private YamlAlgorithmConfiguration rateLimiter;
+    
+    /**
+     * Build with default value.
+     *
+     * @return output configuration
+     */
+    public static YamlPipelineOutputConfiguration buildWithDefaultValue() {
+        return new YamlPipelineOutputConfiguration();
+    }
+    
+    /**
+     * Fill in null fields with default value.
+     */
+    public void fillInNullFieldsWithDefaultValue() {
+        if (null == workerThread) {
+            workerThread = DEFAULT_WORKER_THREAD;
+        }
+        if (null == batchSize) {
+            batchSize = DEFAULT_BATCH_SIZE;
+        }
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
index 14401d0ea67..2c742bc0d60 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
@@ -17,12 +17,13 @@
 
 package org.apache.shardingsphere.infra.yaml.config.pojo.rule;
 
-import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
 
 /**
  * YAML on rule altered action configuration.
@@ -32,90 +33,13 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmC
 @ToString
 public final class YamlOnRuleAlteredActionConfiguration implements 
YamlConfiguration {
     
-    private YamlInputConfiguration input;
+    private YamlPipelineInputConfiguration input;
     
-    private YamlOutputConfiguration output;
+    private YamlPipelineOutputConfiguration output;
     
     private YamlAlgorithmConfiguration streamChannel;
     
     private YamlAlgorithmConfiguration completionDetector;
     
     private YamlAlgorithmConfiguration dataConsistencyChecker;
-    
-    @Data
-    public static final class YamlInputConfiguration implements 
YamlConfiguration {
-        
-        private static final Integer DEFAULT_WORKER_THREAD = 40;
-        
-        private static final Integer DEFAULT_BATCH_SIZE = 1000;
-        
-        private static final Integer DEFAULT_SHARDING_SIZE = 1000_0000;
-        
-        private Integer workerThread = DEFAULT_WORKER_THREAD;
-        
-        private Integer batchSize = DEFAULT_BATCH_SIZE;
-        
-        private Integer shardingSize = DEFAULT_SHARDING_SIZE;
-        
-        private YamlAlgorithmConfiguration rateLimiter;
-        
-        /**
-         * Build with default value.
-         *
-         * @return input configuration
-         */
-        public static YamlInputConfiguration buildWithDefaultValue() {
-            return new YamlInputConfiguration();
-        }
-        
-        /**
-         * Fill in null fields with default value.
-         */
-        public void fillInNullFieldsWithDefaultValue() {
-            if (null == workerThread) {
-                workerThread = DEFAULT_WORKER_THREAD;
-            }
-            if (null == batchSize) {
-                batchSize = DEFAULT_BATCH_SIZE;
-            }
-            if (null == shardingSize) {
-                shardingSize = DEFAULT_SHARDING_SIZE;
-            }
-        }
-    }
-    
-    @Data
-    public static final class YamlOutputConfiguration implements 
YamlConfiguration {
-        
-        private static final Integer DEFAULT_WORKER_THREAD = 40;
-        
-        private static final Integer DEFAULT_BATCH_SIZE = 1000;
-        
-        private Integer workerThread = DEFAULT_WORKER_THREAD;
-        
-        private Integer batchSize = DEFAULT_BATCH_SIZE;
-        
-        private YamlAlgorithmConfiguration rateLimiter;
-        
-        /**
-         * Build with default value.
-         *
-         * @return output configuration
-         */
-        public static YamlOutputConfiguration buildWithDefaultValue() {
-            return new YamlOutputConfiguration();
-        }
-        
-        /**
-         * Fill in null fields with default value.
-         */
-        public void fillInNullFieldsWithDefaultValue() {
-            if (null == workerThread) {
-                workerThread = DEFAULT_WORKER_THREAD;
-            }
-            if (null == batchSize) {
-                batchSize = DEFAULT_BATCH_SIZE;
-            }
-        }
-    }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
new file mode 100644
index 00000000000..1152905d4d0
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
+
+import lombok.Data;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
+
+/**
+ * YAML pipeline input configuration swapper.
+ */
+@Data
+public final class YamlPipelineInputConfigurationSwapper implements 
YamlConfigurationSwapper<YamlPipelineInputConfiguration, 
PipelineInputConfiguration> {
+    
+    private static final YamlAlgorithmConfigurationSwapper 
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
+    
+    @Override
+    public YamlPipelineInputConfiguration swapToYamlConfiguration(final 
PipelineInputConfiguration data) {
+        if (null == data) {
+            return null;
+        }
+        YamlPipelineInputConfiguration result = new 
YamlPipelineInputConfiguration();
+        result.setWorkerThread(data.getWorkerThread());
+        result.setBatchSize(data.getBatchSize());
+        result.setShardingSize(data.getShardingSize());
+        
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+        return result;
+    }
+    
+    @Override
+    public PipelineInputConfiguration swapToObject(final 
YamlPipelineInputConfiguration yamlConfig) {
+        return null == yamlConfig
+                ? null
+                : new PipelineInputConfiguration(yamlConfig.getWorkerThread(), 
yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
+                
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
new file mode 100644
index 00000000000..52f48ca2bc0
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
+
+import lombok.Data;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
+
+/**
+ * YAML pipeline output configuration swapper.
+ */
+@Data
+public final class YamlPipelineOutputConfigurationSwapper implements 
YamlConfigurationSwapper<YamlPipelineOutputConfiguration, 
PipelineOutputConfiguration> {
+    
+    private static final YamlAlgorithmConfigurationSwapper 
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
+    
+    @Override
+    public YamlPipelineOutputConfiguration swapToYamlConfiguration(final 
PipelineOutputConfiguration data) {
+        if (null == data) {
+            return null;
+        }
+        YamlPipelineOutputConfiguration result = new 
YamlPipelineOutputConfiguration();
+        result.setWorkerThread(data.getWorkerThread());
+        result.setBatchSize(data.getBatchSize());
+        
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+        return result;
+    }
+    
+    @Override
+    public PipelineOutputConfiguration swapToObject(final 
YamlPipelineOutputConfiguration yamlConfig) {
+        return null == yamlConfig
+                ? null
+                : new 
PipelineOutputConfiguration(yamlConfig.getWorkerThread(), 
yamlConfig.getBatchSize(), 
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
index 1702fb75672..5fca55ab5fa 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
@@ -18,13 +18,11 @@
 package org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered;
 
 import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineOutputConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineInputConfigurationSwapper;
 
 /**
  * YAML on rule altered action configuration swapper.
@@ -33,9 +31,9 @@ public final class 
YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
     
     private static final YamlAlgorithmConfigurationSwapper 
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
     
-    private static final InputConfigurationSwapper INPUT_CONFIG_SWAPPER = new 
InputConfigurationSwapper();
+    private static final YamlPipelineInputConfigurationSwapper 
INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
     
-    private static final OutputConfigurationSwapper OUTPUT_CONFIG_SWAPPER = 
new OutputConfigurationSwapper();
+    private static final YamlPipelineOutputConfigurationSwapper 
OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
     
     @Override
     public YamlOnRuleAlteredActionConfiguration swapToYamlConfiguration(final 
OnRuleAlteredActionConfiguration data) {
@@ -63,50 +61,4 @@ public final class 
YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
                 
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
                 
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
     }
-    
-    public static class InputConfigurationSwapper implements 
YamlConfigurationSwapper<YamlInputConfiguration, InputConfiguration> {
-        
-        @Override
-        public YamlInputConfiguration swapToYamlConfiguration(final 
InputConfiguration data) {
-            if (null == data) {
-                return null;
-            }
-            YamlInputConfiguration result = new YamlInputConfiguration();
-            result.setWorkerThread(data.getWorkerThread());
-            result.setBatchSize(data.getBatchSize());
-            result.setShardingSize(data.getShardingSize());
-            
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
-            return result;
-        }
-        
-        @Override
-        public InputConfiguration swapToObject(final YamlInputConfiguration 
yamlConfig) {
-            return null == yamlConfig
-                    ? null
-                    : new InputConfiguration(yamlConfig.getWorkerThread(), 
yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
-                            
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
-        }
-    }
-    
-    public static class OutputConfigurationSwapper implements 
YamlConfigurationSwapper<YamlOutputConfiguration, OutputConfiguration> {
-        
-        @Override
-        public YamlOutputConfiguration swapToYamlConfiguration(final 
OutputConfiguration data) {
-            if (null == data) {
-                return null;
-            }
-            YamlOutputConfiguration result = new YamlOutputConfiguration();
-            result.setWorkerThread(data.getWorkerThread());
-            result.setBatchSize(data.getBatchSize());
-            
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
-            return result;
-        }
-        
-        @Override
-        public OutputConfiguration swapToObject(final YamlOutputConfiguration 
yamlConfig) {
-            return null == yamlConfig
-                    ? null
-                    : new OutputConfiguration(yamlConfig.getWorkerThread(), 
yamlConfig.getBatchSize(), 
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
-        }
-    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
index 2bba1745967..42c6f5a4e9e 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
@@ -18,11 +18,11 @@
 package org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered;
 
 import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -41,10 +41,10 @@ public final class 
YamlOnRuleAlteredActionConfigurationSwapperTest {
         Properties rateLimiterProps = new Properties();
         rateLimiterProps.setProperty("batch-size", "1000");
         rateLimiterProps.setProperty("qps", "50");
-        YamlInputConfiguration yamlInputConfig = 
YamlInputConfiguration.buildWithDefaultValue();
+        YamlPipelineInputConfiguration yamlInputConfig = 
YamlPipelineInputConfiguration.buildWithDefaultValue();
         yamlConfig.setInput(yamlInputConfig);
         yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT", 
rateLimiterProps));
-        YamlOutputConfiguration yamlOutputConfig = 
YamlOutputConfiguration.buildWithDefaultValue();
+        YamlPipelineOutputConfiguration yamlOutputConfig = 
YamlPipelineOutputConfiguration.buildWithDefaultValue();
         yamlOutputConfig.setRateLimiter(new 
YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
         yamlConfig.setOutput(yamlOutputConfig);
         Properties streamChannelProps = new Properties();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index 6f2ed908b5f..b6e0c69070d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -31,13 +31,13 @@ import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered.YamlOnRuleAlteredActionConfigurationSwapper;
 
 import java.util.Properties;
@@ -80,10 +80,10 @@ public final class RuleAlteredContext {
     public RuleAlteredContext(final String jobId, final 
OnRuleAlteredActionConfiguration actionConfig) {
         OnRuleAlteredActionConfiguration onRuleAlteredActionConfig = 
convertActionConfig(actionConfig);
         this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
-        InputConfiguration inputConfig = onRuleAlteredActionConfig.getInput();
+        PipelineInputConfiguration inputConfig = 
onRuleAlteredActionConfig.getInput();
         AlgorithmConfiguration inputRateLimiter = inputConfig.getRateLimiter();
         inputRateLimitAlgorithm = null != inputRateLimiter ? 
JobRateLimitAlgorithmFactory.newInstance(inputRateLimiter) : null;
-        OutputConfiguration outputConfig = 
onRuleAlteredActionConfig.getOutput();
+        PipelineOutputConfiguration outputConfig = 
onRuleAlteredActionConfig.getOutput();
         AlgorithmConfiguration outputRateLimiter = 
outputConfig.getRateLimiter();
         outputRateLimitAlgorithm = null != outputRateLimiter ? 
JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
         AlgorithmConfiguration streamChannel = 
onRuleAlteredActionConfig.getStreamChannel();
@@ -102,12 +102,12 @@ public final class RuleAlteredContext {
     private OnRuleAlteredActionConfiguration convertActionConfig(final 
OnRuleAlteredActionConfiguration actionConfig) {
         YamlOnRuleAlteredActionConfiguration yamlActionConfig = 
SWAPPER.swapToYamlConfiguration(actionConfig);
         if (null == yamlActionConfig.getInput()) {
-            
yamlActionConfig.setInput(YamlInputConfiguration.buildWithDefaultValue());
+            
yamlActionConfig.setInput(YamlPipelineInputConfiguration.buildWithDefaultValue());
         } else {
             yamlActionConfig.getInput().fillInNullFieldsWithDefaultValue();
         }
         if (null == yamlActionConfig.getOutput()) {
-            
yamlActionConfig.setOutput(YamlOutputConfiguration.buildWithDefaultValue());
+            
yamlActionConfig.setOutput(YamlPipelineOutputConfiguration.buildWithDefaultValue());
         } else {
             yamlActionConfig.getOutput().fillInNullFieldsWithDefaultValue();
         }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 2dc4079d45f..635711c39c8 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -45,7 +45,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredC
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
 import 
org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
 
 import javax.sql.DataSource;
@@ -111,7 +111,7 @@ public final class InventoryTaskSplitter {
                                                                        final 
InventoryDumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         RuleAlteredContext ruleAlteredContext = 
jobContext.getRuleAlteredContext();
-        InputConfiguration inputConfig = 
ruleAlteredContext.getOnRuleAlteredActionConfig().getInput();
+        PipelineInputConfiguration inputConfig = 
ruleAlteredContext.getOnRuleAlteredActionConfig().getInput();
         int batchSize = inputConfig.getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = 
ruleAlteredContext.getInputRateLimitAlgorithm();
         Collection<IngestPosition<?>> inventoryPositions = 
getInventoryPositions(jobContext, dumperConfig, dataSource, metaDataLoader);

Reply via email to