This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 654a31207c4 Extract InputConfiguration and OutputConfiguration from
OnRuleAlteredActionConfiguration (#19880)
654a31207c4 is described below
commit 654a31207c4bb3ef53bc0d601978f08ce5ff8880
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 5 09:46:55 2022 +0800
Extract InputConfiguration and OutputConfiguration from
OnRuleAlteredActionConfiguration (#19880)
---
.../ShardingScalingRuleStatementConverter.java | 16 ++---
.../ShardingScalingRulesQueryResultSetTest.java | 8 +--
.../data/pipeline/PipelineInputConfiguration.java | 40 +++++++++++
.../data/pipeline/PipelineOutputConfiguration.java | 38 ++++++++++
.../OnRuleAlteredActionConfiguration.java | 32 ++-------
.../pipeline/YamlPipelineInputConfiguration.java | 67 +++++++++++++++++
.../pipeline/YamlPipelineOutputConfiguration.java | 60 ++++++++++++++++
.../rule/YamlOnRuleAlteredActionConfiguration.java | 84 ++--------------------
.../YamlPipelineInputConfigurationSwapper.java | 54 ++++++++++++++
.../YamlPipelineOutputConfigurationSwapper.java | 52 ++++++++++++++
...amlOnRuleAlteredActionConfigurationSwapper.java | 56 ++-------------
...nRuleAlteredActionConfigurationSwapperTest.java | 10 +--
.../scenario/rulealtered/RuleAlteredContext.java | 16 ++---
.../rulealtered/prepare/InventoryTaskSplitter.java | 4 +-
14 files changed, 350 insertions(+), 187 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
index a42edab3015..051d739d86f 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
@@ -21,9 +21,9 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
import
org.apache.shardingsphere.scaling.distsql.statement.segment.InputOrOutputSegment;
import
org.apache.shardingsphere.scaling.distsql.statement.segment.ShardingScalingRuleConfigurationSegment;
@@ -40,26 +40,26 @@ public final class ShardingScalingRuleStatementConverter {
* @return on rule altered action configuration
*/
public static OnRuleAlteredActionConfiguration convert(final
ShardingScalingRuleConfigurationSegment segment) {
- InputConfiguration inputConfig =
convertToInputConfiguration(segment.getInputSegment());
- OutputConfiguration outputConfig =
convertToOutputConfiguration(segment.getOutputSegment());
+ PipelineInputConfiguration inputConfig =
convertToInputConfiguration(segment.getInputSegment());
+ PipelineOutputConfiguration outputConfig =
convertToOutputConfiguration(segment.getOutputSegment());
AlgorithmConfiguration streamChannel =
convertToAlgorithm(segment.getStreamChannel());
AlgorithmConfiguration completionDetector =
convertToAlgorithm(segment.getCompletionDetector());
AlgorithmConfiguration dataConsistencyChecker =
convertToAlgorithm(segment.getDataConsistencyCalculator());
return new OnRuleAlteredActionConfiguration(inputConfig, outputConfig,
streamChannel, completionDetector, dataConsistencyChecker);
}
- private static InputConfiguration convertToInputConfiguration(final
InputOrOutputSegment inputSegment) {
+ private static PipelineInputConfiguration
convertToInputConfiguration(final InputOrOutputSegment inputSegment) {
if (null == inputSegment) {
return null;
}
- return new InputConfiguration(inputSegment.getWorkerThread(),
inputSegment.getBatchSize(), inputSegment.getShardingSize(),
convertToAlgorithm(inputSegment.getRateLimiter()));
+ return new PipelineInputConfiguration(inputSegment.getWorkerThread(),
inputSegment.getBatchSize(), inputSegment.getShardingSize(),
convertToAlgorithm(inputSegment.getRateLimiter()));
}
- private static OutputConfiguration convertToOutputConfiguration(final
InputOrOutputSegment outputSegment) {
+ private static PipelineOutputConfiguration
convertToOutputConfiguration(final InputOrOutputSegment outputSegment) {
if (null == outputSegment) {
return null;
}
- return new OutputConfiguration(outputSegment.getWorkerThread(),
outputSegment.getBatchSize(),
convertToAlgorithm(outputSegment.getRateLimiter()));
+ return new
PipelineOutputConfiguration(outputSegment.getWorkerThread(),
outputSegment.getBatchSize(),
convertToAlgorithm(outputSegment.getRateLimiter()));
}
private static AlgorithmConfiguration convertToAlgorithm(final
AlgorithmSegment segment) {
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
index 11965f44055..60f82bbf4d0 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
@@ -19,9 +19,9 @@ package
org.apache.shardingsphere.scaling.distsql.handler.query;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import
org.apache.shardingsphere.sharding.distsql.handler.query.ShardingAlgorithmsQueryResultSet;
@@ -81,8 +81,8 @@ public final class ShardingScalingRulesQueryResultSetTest {
}
private OnRuleAlteredActionConfiguration buildCompleteConfiguration() {
- InputConfiguration inputConfig = new InputConfiguration(10, 100, 10,
new AlgorithmConfiguration("QPS", createProperties("qps", "50")));
- OutputConfiguration outputConfig = new OutputConfiguration(10, 100,
new AlgorithmConfiguration("TPS", createProperties("tps", "2000")));
+ PipelineInputConfiguration inputConfig = new
PipelineInputConfiguration(10, 100, 10, new AlgorithmConfiguration("QPS",
createProperties("qps", "50")));
+ PipelineOutputConfiguration outputConfig = new
PipelineOutputConfiguration(10, 100, new AlgorithmConfiguration("TPS",
createProperties("tps", "2000")));
AlgorithmConfiguration streamChannel = new
AlgorithmConfiguration("MEMORY", createProperties("block-queue-size", "10000"));
AlgorithmConfiguration completionDetector = new
AlgorithmConfiguration("IDLE",
createProperties("incremental-task-idle-seconds-threshold", "1800"));
AlgorithmConfiguration dataConsistencyChecker = new
AlgorithmConfiguration("DATA_MATCH", createProperties("chunk-size", "1000"));
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
new file mode 100644
index 00000000000..d723b011eec
--- /dev/null
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.rule.data.pipeline;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+
+/**
+ * Pipeline input configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class PipelineInputConfiguration {
+
+ private final Integer workerThread;
+
+ private final Integer batchSize;
+
+ private final Integer shardingSize;
+
+ private final AlgorithmConfiguration rateLimiter;
+}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
new file mode 100644
index 00000000000..af617327bc1
--- /dev/null
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.rule.data.pipeline;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+
+/**
+ * Pipeline output configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class PipelineOutputConfiguration {
+
+ private final Integer workerThread;
+
+ private final Integer batchSize;
+
+ private final AlgorithmConfiguration rateLimiter;
+}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
index 4e5a57be02a..bcc6dd0451a 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -21,6 +21,8 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
/**
* On rule altered action configuration.
@@ -30,39 +32,13 @@ import
org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
@ToString
public final class OnRuleAlteredActionConfiguration {
- private final InputConfiguration input;
+ private final PipelineInputConfiguration input;
- private final OutputConfiguration output;
+ private final PipelineOutputConfiguration output;
private final AlgorithmConfiguration streamChannel;
private final AlgorithmConfiguration completionDetector;
private final AlgorithmConfiguration dataConsistencyCalculator;
-
- @RequiredArgsConstructor
- @Getter
- @ToString
- public static final class InputConfiguration {
-
- private final Integer workerThread;
-
- private final Integer batchSize;
-
- private final Integer shardingSize;
-
- private final AlgorithmConfiguration rateLimiter;
- }
-
- @RequiredArgsConstructor
- @Getter
- @ToString
- public static final class OutputConfiguration {
-
- private final Integer workerThread;
-
- private final Integer batchSize;
-
- private final AlgorithmConfiguration rateLimiter;
- }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
new file mode 100644
index 00000000000..3c7ffd0bbc2
--- /dev/null
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline;
+
+import lombok.Data;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+
+/**
+ * YAML pipeline input configuration.
+ */
+@Data
+public final class YamlPipelineInputConfiguration implements YamlConfiguration
{
+
+ private static final Integer DEFAULT_WORKER_THREAD = 40;
+
+ private static final Integer DEFAULT_BATCH_SIZE = 1000;
+
+ private static final Integer DEFAULT_SHARDING_SIZE = 1000_0000;
+
+ private Integer workerThread = DEFAULT_WORKER_THREAD;
+
+ private Integer batchSize = DEFAULT_BATCH_SIZE;
+
+ private Integer shardingSize = DEFAULT_SHARDING_SIZE;
+
+ private YamlAlgorithmConfiguration rateLimiter;
+
+ /**
+ * Build with default value.
+ *
+ * @return input configuration
+ */
+ public static YamlPipelineInputConfiguration buildWithDefaultValue() {
+ return new YamlPipelineInputConfiguration();
+ }
+
+ /**
+ * Fill in null fields with default value.
+ */
+ public void fillInNullFieldsWithDefaultValue() {
+ if (null == workerThread) {
+ workerThread = DEFAULT_WORKER_THREAD;
+ }
+ if (null == batchSize) {
+ batchSize = DEFAULT_BATCH_SIZE;
+ }
+ if (null == shardingSize) {
+ shardingSize = DEFAULT_SHARDING_SIZE;
+ }
+ }
+}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
new file mode 100644
index 00000000000..578357cd1d0
--- /dev/null
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline;
+
+import lombok.Data;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+
+/**
+ * YAML pipeline output configuration.
+ */
+@Data
+public final class YamlPipelineOutputConfiguration implements
YamlConfiguration {
+
+ private static final Integer DEFAULT_WORKER_THREAD = 40;
+
+ private static final Integer DEFAULT_BATCH_SIZE = 1000;
+
+ private Integer workerThread = DEFAULT_WORKER_THREAD;
+
+ private Integer batchSize = DEFAULT_BATCH_SIZE;
+
+ private YamlAlgorithmConfiguration rateLimiter;
+
+ /**
+ * Build with default value.
+ *
+ * @return output configuration
+ */
+ public static YamlPipelineOutputConfiguration buildWithDefaultValue() {
+ return new YamlPipelineOutputConfiguration();
+ }
+
+ /**
+ * Fill in null fields with default value.
+ */
+ public void fillInNullFieldsWithDefaultValue() {
+ if (null == workerThread) {
+ workerThread = DEFAULT_WORKER_THREAD;
+ }
+ if (null == batchSize) {
+ batchSize = DEFAULT_BATCH_SIZE;
+ }
+ }
+}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
index 14401d0ea67..2c742bc0d60 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
@@ -17,12 +17,13 @@
package org.apache.shardingsphere.infra.yaml.config.pojo.rule;
-import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
/**
* YAML on rule altered action configuration.
@@ -32,90 +33,13 @@ import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmC
@ToString
public final class YamlOnRuleAlteredActionConfiguration implements
YamlConfiguration {
- private YamlInputConfiguration input;
+ private YamlPipelineInputConfiguration input;
- private YamlOutputConfiguration output;
+ private YamlPipelineOutputConfiguration output;
private YamlAlgorithmConfiguration streamChannel;
private YamlAlgorithmConfiguration completionDetector;
private YamlAlgorithmConfiguration dataConsistencyChecker;
-
- @Data
- public static final class YamlInputConfiguration implements
YamlConfiguration {
-
- private static final Integer DEFAULT_WORKER_THREAD = 40;
-
- private static final Integer DEFAULT_BATCH_SIZE = 1000;
-
- private static final Integer DEFAULT_SHARDING_SIZE = 1000_0000;
-
- private Integer workerThread = DEFAULT_WORKER_THREAD;
-
- private Integer batchSize = DEFAULT_BATCH_SIZE;
-
- private Integer shardingSize = DEFAULT_SHARDING_SIZE;
-
- private YamlAlgorithmConfiguration rateLimiter;
-
- /**
- * Build with default value.
- *
- * @return input configuration
- */
- public static YamlInputConfiguration buildWithDefaultValue() {
- return new YamlInputConfiguration();
- }
-
- /**
- * Fill in null fields with default value.
- */
- public void fillInNullFieldsWithDefaultValue() {
- if (null == workerThread) {
- workerThread = DEFAULT_WORKER_THREAD;
- }
- if (null == batchSize) {
- batchSize = DEFAULT_BATCH_SIZE;
- }
- if (null == shardingSize) {
- shardingSize = DEFAULT_SHARDING_SIZE;
- }
- }
- }
-
- @Data
- public static final class YamlOutputConfiguration implements
YamlConfiguration {
-
- private static final Integer DEFAULT_WORKER_THREAD = 40;
-
- private static final Integer DEFAULT_BATCH_SIZE = 1000;
-
- private Integer workerThread = DEFAULT_WORKER_THREAD;
-
- private Integer batchSize = DEFAULT_BATCH_SIZE;
-
- private YamlAlgorithmConfiguration rateLimiter;
-
- /**
- * Build with default value.
- *
- * @return output configuration
- */
- public static YamlOutputConfiguration buildWithDefaultValue() {
- return new YamlOutputConfiguration();
- }
-
- /**
- * Fill in null fields with default value.
- */
- public void fillInNullFieldsWithDefaultValue() {
- if (null == workerThread) {
- workerThread = DEFAULT_WORKER_THREAD;
- }
- if (null == batchSize) {
- batchSize = DEFAULT_BATCH_SIZE;
- }
- }
- }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
new file mode 100644
index 00000000000..1152905d4d0
--- /dev/null
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
+
+import lombok.Data;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
+
+/**
+ * YAML pipeline input configuration swapper.
+ */
+@Data
+public final class YamlPipelineInputConfigurationSwapper implements
YamlConfigurationSwapper<YamlPipelineInputConfiguration,
PipelineInputConfiguration> {
+
+ private static final YamlAlgorithmConfigurationSwapper
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
+
+ @Override
+ public YamlPipelineInputConfiguration swapToYamlConfiguration(final
PipelineInputConfiguration data) {
+ if (null == data) {
+ return null;
+ }
+ YamlPipelineInputConfiguration result = new
YamlPipelineInputConfiguration();
+ result.setWorkerThread(data.getWorkerThread());
+ result.setBatchSize(data.getBatchSize());
+ result.setShardingSize(data.getShardingSize());
+
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+ return result;
+ }
+
+ @Override
+ public PipelineInputConfiguration swapToObject(final
YamlPipelineInputConfiguration yamlConfig) {
+ return null == yamlConfig
+ ? null
+ : new PipelineInputConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
+
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+ }
+}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
new file mode 100644
index 00000000000..52f48ca2bc0
--- /dev/null
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
+
+import lombok.Data;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
+
+/**
+ * YAML pipeline output configuration swapper.
+ */
+@Data
+public final class YamlPipelineOutputConfigurationSwapper implements
YamlConfigurationSwapper<YamlPipelineOutputConfiguration,
PipelineOutputConfiguration> {
+
+ private static final YamlAlgorithmConfigurationSwapper
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
+
+ @Override
+ public YamlPipelineOutputConfiguration swapToYamlConfiguration(final
PipelineOutputConfiguration data) {
+ if (null == data) {
+ return null;
+ }
+ YamlPipelineOutputConfiguration result = new
YamlPipelineOutputConfiguration();
+ result.setWorkerThread(data.getWorkerThread());
+ result.setBatchSize(data.getBatchSize());
+
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
+ return result;
+ }
+
+ @Override
+ public PipelineOutputConfiguration swapToObject(final
YamlPipelineOutputConfiguration yamlConfig) {
+ return null == yamlConfig
+ ? null
+ : new
PipelineOutputConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+ }
+}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
index 1702fb75672..5fca55ab5fa 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
@@ -18,13 +18,11 @@
package org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
import
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineOutputConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineInputConfigurationSwapper;
/**
* YAML on rule altered action configuration swapper.
@@ -33,9 +31,9 @@ public final class
YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
private static final YamlAlgorithmConfigurationSwapper
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
- private static final InputConfigurationSwapper INPUT_CONFIG_SWAPPER = new
InputConfigurationSwapper();
+ private static final YamlPipelineInputConfigurationSwapper
INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
- private static final OutputConfigurationSwapper OUTPUT_CONFIG_SWAPPER =
new OutputConfigurationSwapper();
+ private static final YamlPipelineOutputConfigurationSwapper
OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
@Override
public YamlOnRuleAlteredActionConfiguration swapToYamlConfiguration(final
OnRuleAlteredActionConfiguration data) {
@@ -63,50 +61,4 @@ public final class
YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
}
-
- public static class InputConfigurationSwapper implements
YamlConfigurationSwapper<YamlInputConfiguration, InputConfiguration> {
-
- @Override
- public YamlInputConfiguration swapToYamlConfiguration(final
InputConfiguration data) {
- if (null == data) {
- return null;
- }
- YamlInputConfiguration result = new YamlInputConfiguration();
- result.setWorkerThread(data.getWorkerThread());
- result.setBatchSize(data.getBatchSize());
- result.setShardingSize(data.getShardingSize());
-
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
- return result;
- }
-
- @Override
- public InputConfiguration swapToObject(final YamlInputConfiguration
yamlConfig) {
- return null == yamlConfig
- ? null
- : new InputConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
-
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
- }
- }
-
- public static class OutputConfigurationSwapper implements
YamlConfigurationSwapper<YamlOutputConfiguration, OutputConfiguration> {
-
- @Override
- public YamlOutputConfiguration swapToYamlConfiguration(final
OutputConfiguration data) {
- if (null == data) {
- return null;
- }
- YamlOutputConfiguration result = new YamlOutputConfiguration();
- result.setWorkerThread(data.getWorkerThread());
- result.setBatchSize(data.getBatchSize());
-
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
- return result;
- }
-
- @Override
- public OutputConfiguration swapToObject(final YamlOutputConfiguration
yamlConfig) {
- return null == yamlConfig
- ? null
- : new OutputConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
- }
- }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
index 2bba1745967..42c6f5a4e9e 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
@@ -18,11 +18,11 @@
package org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.Test;
import java.util.Properties;
@@ -41,10 +41,10 @@ public final class
YamlOnRuleAlteredActionConfigurationSwapperTest {
Properties rateLimiterProps = new Properties();
rateLimiterProps.setProperty("batch-size", "1000");
rateLimiterProps.setProperty("qps", "50");
- YamlInputConfiguration yamlInputConfig =
YamlInputConfiguration.buildWithDefaultValue();
+ YamlPipelineInputConfiguration yamlInputConfig =
YamlPipelineInputConfiguration.buildWithDefaultValue();
yamlConfig.setInput(yamlInputConfig);
yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT",
rateLimiterProps));
- YamlOutputConfiguration yamlOutputConfig =
YamlOutputConfiguration.buildWithDefaultValue();
+ YamlPipelineOutputConfiguration yamlOutputConfig =
YamlPipelineOutputConfiguration.buildWithDefaultValue();
yamlOutputConfig.setRateLimiter(new
YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
yamlConfig.setOutput(yamlOutputConfig);
Properties streamChannelProps = new Properties();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index 6f2ed908b5f..b6e0c69070d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -31,13 +31,13 @@ import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration.YamlOutputConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered.YamlOnRuleAlteredActionConfigurationSwapper;
import java.util.Properties;
@@ -80,10 +80,10 @@ public final class RuleAlteredContext {
public RuleAlteredContext(final String jobId, final
OnRuleAlteredActionConfiguration actionConfig) {
OnRuleAlteredActionConfiguration onRuleAlteredActionConfig =
convertActionConfig(actionConfig);
this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
- InputConfiguration inputConfig = onRuleAlteredActionConfig.getInput();
+ PipelineInputConfiguration inputConfig =
onRuleAlteredActionConfig.getInput();
AlgorithmConfiguration inputRateLimiter = inputConfig.getRateLimiter();
inputRateLimitAlgorithm = null != inputRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(inputRateLimiter) : null;
- OutputConfiguration outputConfig =
onRuleAlteredActionConfig.getOutput();
+ PipelineOutputConfiguration outputConfig =
onRuleAlteredActionConfig.getOutput();
AlgorithmConfiguration outputRateLimiter =
outputConfig.getRateLimiter();
outputRateLimitAlgorithm = null != outputRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
AlgorithmConfiguration streamChannel =
onRuleAlteredActionConfig.getStreamChannel();
@@ -102,12 +102,12 @@ public final class RuleAlteredContext {
private OnRuleAlteredActionConfiguration convertActionConfig(final
OnRuleAlteredActionConfiguration actionConfig) {
YamlOnRuleAlteredActionConfiguration yamlActionConfig =
SWAPPER.swapToYamlConfiguration(actionConfig);
if (null == yamlActionConfig.getInput()) {
-
yamlActionConfig.setInput(YamlInputConfiguration.buildWithDefaultValue());
+
yamlActionConfig.setInput(YamlPipelineInputConfiguration.buildWithDefaultValue());
} else {
yamlActionConfig.getInput().fillInNullFieldsWithDefaultValue();
}
if (null == yamlActionConfig.getOutput()) {
-
yamlActionConfig.setOutput(YamlOutputConfiguration.buildWithDefaultValue());
+
yamlActionConfig.setOutput(YamlPipelineOutputConfiguration.buildWithDefaultValue());
} else {
yamlActionConfig.getOutput().fillInNullFieldsWithDefaultValue();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 2dc4079d45f..635711c39c8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -45,7 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredC
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
import
org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
import javax.sql.DataSource;
@@ -111,7 +111,7 @@ public final class InventoryTaskSplitter {
final
InventoryDumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
RuleAlteredContext ruleAlteredContext =
jobContext.getRuleAlteredContext();
- InputConfiguration inputConfig =
ruleAlteredContext.getOnRuleAlteredActionConfig().getInput();
+ PipelineInputConfiguration inputConfig =
ruleAlteredContext.getOnRuleAlteredActionConfig().getInput();
int batchSize = inputConfig.getBatchSize();
JobRateLimitAlgorithm rateLimitAlgorithm =
ruleAlteredContext.getInputRateLimitAlgorithm();
Collection<IngestPosition<?>> inventoryPositions =
getInventoryPositions(jobContext, dumperConfig, dataSource, metaDataLoader);