This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new de9ad7e8f07 Rename PipelineProcessConfiguration related input and
output to read and write (#20261)
de9ad7e8f07 is described below
commit de9ad7e8f0757416c150c37b1f3ede31b4183ba9
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Aug 18 19:22:20 2022 +0800
Rename PipelineProcessConfiguration related input and output to read and
write (#20261)
* Rename PipelineProcessConfiguration related input and output to read and
write
* Continue rename
---
...hardingRuleAlteredJobConfigurationPreparer.java | 2 +-
.../ShardingScalingRuleStatementConverter.java | 16 ++++-----
.../ShardingScalingRulesQueryResultSetTest.java | 8 ++---
.../pipeline/PipelineProcessConfiguration.java | 4 +--
...uration.java => PipelineReadConfiguration.java} | 4 +--
...ration.java => PipelineWriteConfiguration.java} | 4 +--
.../OnRuleAlteredActionConfiguration.java | 8 ++---
.../pipeline/YamlPipelineProcessConfiguration.java | 4 +--
...ion.java => YamlPipelineReadConfiguration.java} | 10 +++---
...on.java => YamlPipelineWriteConfiguration.java} | 10 +++---
.../rule/YamlOnRuleAlteredActionConfiguration.java | 8 ++---
.../YamlPipelineProcessConfigurationSwapper.java | 12 +++----
...a => YamlPipelineReadConfigurationSwapper.java} | 16 ++++-----
... => YamlPipelineWriteConfigurationSwapper.java} | 16 ++++-----
...amlOnRuleAlteredActionConfigurationSwapper.java | 16 ++++-----
...amlPipelineProcessConfigurationSwapperTest.java | 12 +++----
...nRuleAlteredActionConfigurationSwapperTest.java | 8 ++---
.../api/context/PipelineProcessContext.java | 6 ++--
.../check/consistency/DataConsistencyChecker.java | 6 ++--
.../context/AbstractPipelineProcessContext.java | 40 +++++++++++-----------
.../core/prepare/InventoryTaskSplitter.java | 10 +++---
21 files changed, 110 insertions(+), 110 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 36d60d5262b..e611ae7c40f 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -239,7 +239,7 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
private static ImporterConfiguration createImporterConfiguration(final
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
final
Map<LogicTableName, Set<String>> shardingColumnsMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
PipelineDataSourceConfiguration dataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
jobConfig.getTarget().getParameter());
- int batchSize = pipelineProcessConfig.getOutput().getBatchSize();
+ int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
return new ImporterConfiguration(dataSourceConfig,
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize,
retryTimes, concurrency);
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
index 09ecd1520f2..4dcf5416bfd 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
@@ -21,8 +21,8 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
import
org.apache.shardingsphere.migration.distsql.statement.segment.InputOrOutputSegment;
import
org.apache.shardingsphere.migration.distsql.statement.segment.ShardingScalingRuleConfigurationSegment;
@@ -40,26 +40,26 @@ public final class ShardingScalingRuleStatementConverter {
* @return on rule altered action configuration
*/
public static OnRuleAlteredActionConfiguration convert(final
ShardingScalingRuleConfigurationSegment segment) {
- PipelineInputConfiguration inputConfig =
convertToInputConfiguration(segment.getInputSegment());
- PipelineOutputConfiguration outputConfig =
convertToOutputConfiguration(segment.getOutputSegment());
+ PipelineReadConfiguration inputConfig =
convertToInputConfiguration(segment.getInputSegment());
+ PipelineWriteConfiguration outputConfig =
convertToOutputConfiguration(segment.getOutputSegment());
AlgorithmConfiguration streamChannel =
convertToAlgorithm(segment.getStreamChannel());
AlgorithmConfiguration completionDetector =
convertToAlgorithm(segment.getCompletionDetector());
AlgorithmConfiguration dataConsistencyChecker =
convertToAlgorithm(segment.getDataConsistencyCalculator());
return new OnRuleAlteredActionConfiguration(inputConfig, outputConfig,
streamChannel, completionDetector, dataConsistencyChecker);
}
- private static PipelineInputConfiguration
convertToInputConfiguration(final InputOrOutputSegment inputSegment) {
+ private static PipelineReadConfiguration convertToInputConfiguration(final
InputOrOutputSegment inputSegment) {
if (null == inputSegment) {
return null;
}
- return new PipelineInputConfiguration(inputSegment.getWorkerThread(),
inputSegment.getBatchSize(), inputSegment.getShardingSize(),
convertToAlgorithm(inputSegment.getRateLimiter()));
+ return new PipelineReadConfiguration(inputSegment.getWorkerThread(),
inputSegment.getBatchSize(), inputSegment.getShardingSize(),
convertToAlgorithm(inputSegment.getRateLimiter()));
}
- private static PipelineOutputConfiguration
convertToOutputConfiguration(final InputOrOutputSegment outputSegment) {
+ private static PipelineWriteConfiguration
convertToOutputConfiguration(final InputOrOutputSegment outputSegment) {
if (null == outputSegment) {
return null;
}
- return new
PipelineOutputConfiguration(outputSegment.getWorkerThread(),
outputSegment.getBatchSize(),
convertToAlgorithm(outputSegment.getRateLimiter()));
+ return new PipelineWriteConfiguration(outputSegment.getWorkerThread(),
outputSegment.getBatchSize(),
convertToAlgorithm(outputSegment.getRateLimiter()));
}
private static AlgorithmConfiguration convertToAlgorithm(final
AlgorithmSegment segment) {
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
index 8b37b741c0f..5ba69104f48 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
@@ -19,8 +19,8 @@ package
org.apache.shardingsphere.scaling.distsql.handler.query;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.migration.distsql.handler.query.ShardingScalingRulesQueryResultSet;
@@ -82,8 +82,8 @@ public final class ShardingScalingRulesQueryResultSetTest {
}
private OnRuleAlteredActionConfiguration buildCompleteConfiguration() {
- PipelineInputConfiguration inputConfig = new
PipelineInputConfiguration(10, 100, 10, new AlgorithmConfiguration("QPS",
createProperties("qps", "50")));
- PipelineOutputConfiguration outputConfig = new
PipelineOutputConfiguration(10, 100, new AlgorithmConfiguration("TPS",
createProperties("tps", "2000")));
+ PipelineReadConfiguration inputConfig = new
PipelineReadConfiguration(10, 100, 10, new AlgorithmConfiguration("QPS",
createProperties("qps", "50")));
+ PipelineWriteConfiguration outputConfig = new
PipelineWriteConfiguration(10, 100, new AlgorithmConfiguration("TPS",
createProperties("tps", "2000")));
AlgorithmConfiguration streamChannel = new
AlgorithmConfiguration("MEMORY", createProperties("block-queue-size", "10000"));
AlgorithmConfiguration completionDetector = new
AlgorithmConfiguration("IDLE",
createProperties("incremental-task-idle-seconds-threshold", "1800"));
AlgorithmConfiguration dataConsistencyChecker = new
AlgorithmConfiguration("DATA_MATCH", createProperties("chunk-size", "1000"));
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
index 928960d3026..a9932adafda 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
@@ -30,9 +30,9 @@ import
org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
@ToString
public final class PipelineProcessConfiguration {
- private final PipelineInputConfiguration input;
+ private final PipelineReadConfiguration read;
- private final PipelineOutputConfiguration output;
+ private final PipelineWriteConfiguration write;
private final AlgorithmConfiguration streamChannel;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineReadConfiguration.java
similarity index 94%
rename from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineReadConfiguration.java
index d723b011eec..207af4c44e6 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineReadConfiguration.java
@@ -23,12 +23,12 @@ import lombok.ToString;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
/**
- * Pipeline input configuration.
+ * Pipeline read configuration.
*/
@RequiredArgsConstructor
@Getter
@ToString
-public final class PipelineInputConfiguration {
+public final class PipelineReadConfiguration {
private final Integer workerThread;
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineWriteConfiguration.java
similarity index 93%
rename from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineWriteConfiguration.java
index af617327bc1..e9ff81a53d0 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineWriteConfiguration.java
@@ -23,12 +23,12 @@ import lombok.ToString;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
/**
- * Pipeline output configuration.
+ * Pipeline write configuration.
*/
@RequiredArgsConstructor
@Getter
@ToString
-public final class PipelineOutputConfiguration {
+public final class PipelineWriteConfiguration {
private final Integer workerThread;
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
index bcc6dd0451a..a7f77e8de44 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -21,8 +21,8 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
/**
* On rule altered action configuration.
@@ -32,9 +32,9 @@ import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputC
@ToString
public final class OnRuleAlteredActionConfiguration {
- private final PipelineInputConfiguration input;
+ private final PipelineReadConfiguration input;
- private final PipelineOutputConfiguration output;
+ private final PipelineWriteConfiguration output;
private final AlgorithmConfiguration streamChannel;
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
index 71bb47943e6..3cd18b1864e 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
@@ -31,9 +31,9 @@ import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmC
@ToString
public final class YamlPipelineProcessConfiguration implements
YamlConfiguration {
- private YamlPipelineInputConfiguration input;
+ private YamlPipelineReadConfiguration read;
- private YamlPipelineOutputConfiguration output;
+ private YamlPipelineWriteConfiguration write;
private YamlAlgorithmConfiguration streamChannel;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
similarity index 87%
rename from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
index 3c7ffd0bbc2..4e13aabca34 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
@@ -22,10 +22,10 @@ import
org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
/**
- * YAML pipeline input configuration.
+ * YAML pipeline read configuration.
*/
@Data
-public final class YamlPipelineInputConfiguration implements YamlConfiguration
{
+public final class YamlPipelineReadConfiguration implements YamlConfiguration {
private static final Integer DEFAULT_WORKER_THREAD = 40;
@@ -44,10 +44,10 @@ public final class YamlPipelineInputConfiguration
implements YamlConfiguration {
/**
* Build with default value.
*
- * @return input configuration
+ * @return read configuration
*/
- public static YamlPipelineInputConfiguration buildWithDefaultValue() {
- return new YamlPipelineInputConfiguration();
+ public static YamlPipelineReadConfiguration buildWithDefaultValue() {
+ return new YamlPipelineReadConfiguration();
}
/**
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
similarity index 85%
rename from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
index 578357cd1d0..fb5460ae6b6 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
@@ -22,10 +22,10 @@ import
org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
/**
- * YAML pipeline output configuration.
+ * YAML pipeline write configuration.
*/
@Data
-public final class YamlPipelineOutputConfiguration implements
YamlConfiguration {
+public final class YamlPipelineWriteConfiguration implements YamlConfiguration
{
private static final Integer DEFAULT_WORKER_THREAD = 40;
@@ -40,10 +40,10 @@ public final class YamlPipelineOutputConfiguration
implements YamlConfiguration
/**
* Build with default value.
*
- * @return output configuration
+ * @return write configuration
*/
- public static YamlPipelineOutputConfiguration buildWithDefaultValue() {
- return new YamlPipelineOutputConfiguration();
+ public static YamlPipelineWriteConfiguration buildWithDefaultValue() {
+ return new YamlPipelineWriteConfiguration();
}
/**
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
index 2c742bc0d60..93cda75fdca 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
@@ -22,8 +22,8 @@ import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
/**
* YAML on rule altered action configuration.
@@ -33,9 +33,9 @@ import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipeli
@ToString
public final class YamlOnRuleAlteredActionConfiguration implements
YamlConfiguration {
- private YamlPipelineInputConfiguration input;
+ private YamlPipelineReadConfiguration input;
- private YamlPipelineOutputConfiguration output;
+ private YamlPipelineWriteConfiguration output;
private YamlAlgorithmConfiguration streamChannel;
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
index 16154b6f2fc..7c1f7cf4950 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
@@ -29,9 +29,9 @@ public final class YamlPipelineProcessConfigurationSwapper
implements YamlConfig
private static final YamlAlgorithmConfigurationSwapper
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
- private static final YamlPipelineInputConfigurationSwapper
INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
+ private static final YamlPipelineReadConfigurationSwapper
READ_CONFIG_SWAPPER = new YamlPipelineReadConfigurationSwapper();
- private static final YamlPipelineOutputConfigurationSwapper
OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
+ private static final YamlPipelineWriteConfigurationSwapper
WRITE_CONFIG_SWAPPER = new YamlPipelineWriteConfigurationSwapper();
@Override
public YamlPipelineProcessConfiguration swapToYamlConfiguration(final
PipelineProcessConfiguration data) {
@@ -39,8 +39,8 @@ public final class YamlPipelineProcessConfigurationSwapper
implements YamlConfig
return null;
}
YamlPipelineProcessConfiguration result = new
YamlPipelineProcessConfiguration();
-
result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
-
result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+
result.setRead(READ_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRead()));
+
result.setWrite(WRITE_CONFIG_SWAPPER.swapToYamlConfiguration(data.getWrite()));
result.setStreamChannel(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
return result;
}
@@ -51,8 +51,8 @@ public final class YamlPipelineProcessConfigurationSwapper
implements YamlConfig
return null;
}
return new PipelineProcessConfiguration(
- INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
- OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+ READ_CONFIG_SWAPPER.swapToObject(yamlConfig.getRead()),
+ WRITE_CONFIG_SWAPPER.swapToObject(yamlConfig.getWrite()),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getStreamChannel()));
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineReadConfigurationSwapper.java
similarity index 73%
rename from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineReadConfigurationSwapper.java
index ba705d67a15..28e60dc3eee 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineReadConfigurationSwapper.java
@@ -18,25 +18,25 @@
package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
import lombok.Data;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
/**
- * YAML pipeline input configuration swapper.
+ * YAML pipeline read configuration swapper.
*/
@Data
-public final class YamlPipelineInputConfigurationSwapper implements
YamlConfigurationSwapper<YamlPipelineInputConfiguration,
PipelineInputConfiguration> {
+public final class YamlPipelineReadConfigurationSwapper implements
YamlConfigurationSwapper<YamlPipelineReadConfiguration,
PipelineReadConfiguration> {
private static final YamlAlgorithmConfigurationSwapper
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
@Override
- public YamlPipelineInputConfiguration swapToYamlConfiguration(final
PipelineInputConfiguration data) {
+ public YamlPipelineReadConfiguration swapToYamlConfiguration(final
PipelineReadConfiguration data) {
if (null == data) {
return null;
}
- YamlPipelineInputConfiguration result = new
YamlPipelineInputConfiguration();
+ YamlPipelineReadConfiguration result = new
YamlPipelineReadConfiguration();
result.setWorkerThread(data.getWorkerThread());
result.setBatchSize(data.getBatchSize());
result.setShardingSize(data.getShardingSize());
@@ -45,10 +45,10 @@ public final class YamlPipelineInputConfigurationSwapper
implements YamlConfigur
}
@Override
- public PipelineInputConfiguration swapToObject(final
YamlPipelineInputConfiguration yamlConfig) {
+ public PipelineReadConfiguration swapToObject(final
YamlPipelineReadConfiguration yamlConfig) {
return null == yamlConfig
? null
- : new PipelineInputConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
+ : new PipelineReadConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineWriteConfigurationSwapper.java
similarity index 70%
rename from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineWriteConfigurationSwapper.java
index de24408ea96..a86c8b4e4f0 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineWriteConfigurationSwapper.java
@@ -18,25 +18,25 @@
package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
import lombok.Data;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
/**
- * YAML pipeline output configuration swapper.
+ * YAML pipeline write configuration swapper.
*/
@Data
-public final class YamlPipelineOutputConfigurationSwapper implements
YamlConfigurationSwapper<YamlPipelineOutputConfiguration,
PipelineOutputConfiguration> {
+public final class YamlPipelineWriteConfigurationSwapper implements
YamlConfigurationSwapper<YamlPipelineWriteConfiguration,
PipelineWriteConfiguration> {
private static final YamlAlgorithmConfigurationSwapper
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
@Override
- public YamlPipelineOutputConfiguration swapToYamlConfiguration(final
PipelineOutputConfiguration data) {
+ public YamlPipelineWriteConfiguration swapToYamlConfiguration(final
PipelineWriteConfiguration data) {
if (null == data) {
return null;
}
- YamlPipelineOutputConfiguration result = new
YamlPipelineOutputConfiguration();
+ YamlPipelineWriteConfiguration result = new
YamlPipelineWriteConfiguration();
result.setWorkerThread(data.getWorkerThread());
result.setBatchSize(data.getBatchSize());
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
@@ -44,9 +44,9 @@ public final class YamlPipelineOutputConfigurationSwapper
implements YamlConfigu
}
@Override
- public PipelineOutputConfiguration swapToObject(final
YamlPipelineOutputConfiguration yamlConfig) {
+ public PipelineWriteConfiguration swapToObject(final
YamlPipelineWriteConfiguration yamlConfig) {
return null == yamlConfig
? null
- : new
PipelineOutputConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+ : new PipelineWriteConfiguration(yamlConfig.getWorkerThread(),
yamlConfig.getBatchSize(),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
index f5deaeaba24..99ccbade6fa 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
@@ -21,8 +21,8 @@ import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActi
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
-import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineOutputConfigurationSwapper;
-import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineInputConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineWriteConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineReadConfigurationSwapper;
/**
* YAML on rule altered action configuration swapper.
@@ -31,9 +31,9 @@ public final class
YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
private static final YamlAlgorithmConfigurationSwapper
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
- private static final YamlPipelineInputConfigurationSwapper
INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
+ private static final YamlPipelineReadConfigurationSwapper
READ_CONFIG_SWAPPER = new YamlPipelineReadConfigurationSwapper();
- private static final YamlPipelineOutputConfigurationSwapper
OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
+ private static final YamlPipelineWriteConfigurationSwapper
WRITE_CONFIG_SWAPPER = new YamlPipelineWriteConfigurationSwapper();
@Override
public YamlOnRuleAlteredActionConfiguration swapToYamlConfiguration(final
OnRuleAlteredActionConfiguration data) {
@@ -41,8 +41,8 @@ public final class
YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
return null;
}
YamlOnRuleAlteredActionConfiguration result = new
YamlOnRuleAlteredActionConfiguration();
-
result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
-
result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+
result.setInput(READ_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
+
result.setOutput(WRITE_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
result.setStreamChannel(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
result.setCompletionDetector(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
result.setDataConsistencyChecker(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyCalculator()));
@@ -55,8 +55,8 @@ public final class
YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
return null;
}
return new OnRuleAlteredActionConfiguration(
- INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
- OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+ READ_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
+ WRITE_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getStreamChannel()),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
index 8558f0197c1..1fc13fa0082 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
@@ -20,8 +20,8 @@ package
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
import org.junit.Test;
@@ -41,12 +41,12 @@ public final class
YamlPipelineProcessConfigurationSwapperTest {
Properties rateLimiterProps = new Properties();
rateLimiterProps.setProperty("batch-size", "1000");
rateLimiterProps.setProperty("qps", "50");
- YamlPipelineInputConfiguration yamlInputConfig =
YamlPipelineInputConfiguration.buildWithDefaultValue();
- yamlConfig.setInput(yamlInputConfig);
+ YamlPipelineReadConfiguration yamlInputConfig =
YamlPipelineReadConfiguration.buildWithDefaultValue();
+ yamlConfig.setRead(yamlInputConfig);
yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT",
rateLimiterProps));
- YamlPipelineOutputConfiguration yamlOutputConfig =
YamlPipelineOutputConfiguration.buildWithDefaultValue();
+ YamlPipelineWriteConfiguration yamlOutputConfig =
YamlPipelineWriteConfiguration.buildWithDefaultValue();
yamlOutputConfig.setRateLimiter(new
YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
- yamlConfig.setOutput(yamlOutputConfig);
+ yamlConfig.setWrite(yamlOutputConfig);
Properties streamChannelProps = new Properties();
streamChannelProps.setProperty("block-queue-size", "10000");
yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY",
streamChannelProps));
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
index 42c6f5a4e9e..2cc32ed1c3a 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
@@ -20,8 +20,8 @@ package
org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
import org.junit.Test;
@@ -41,10 +41,10 @@ public final class
YamlOnRuleAlteredActionConfigurationSwapperTest {
Properties rateLimiterProps = new Properties();
rateLimiterProps.setProperty("batch-size", "1000");
rateLimiterProps.setProperty("qps", "50");
- YamlPipelineInputConfiguration yamlInputConfig =
YamlPipelineInputConfiguration.buildWithDefaultValue();
+ YamlPipelineReadConfiguration yamlInputConfig =
YamlPipelineReadConfiguration.buildWithDefaultValue();
yamlConfig.setInput(yamlInputConfig);
yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT",
rateLimiterProps));
- YamlPipelineOutputConfiguration yamlOutputConfig =
YamlPipelineOutputConfiguration.buildWithDefaultValue();
+ YamlPipelineWriteConfiguration yamlOutputConfig =
YamlPipelineWriteConfiguration.buildWithDefaultValue();
yamlOutputConfig.setRateLimiter(new
YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
yamlConfig.setOutput(yamlOutputConfig);
Properties streamChannelProps = new Properties();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
index a0138e7ba4b..ab015372778 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
@@ -41,9 +41,9 @@ public interface PipelineProcessContext {
PipelineChannelCreator getPipelineChannelCreator();
/**
- * Get job input rate limit algorithm.
+ * Get job read rate limit algorithm.
*
- * @return job input rate limit algorithm
+ * @return job read rate limit algorithm
*/
- JobRateLimitAlgorithm getInputRateLimitAlgorithm();
+ JobRateLimitAlgorithm getReadRateLimitAlgorithm();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 3a18c77820c..08f2e2a0631 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -157,7 +157,7 @@ public final class DataConsistencyChecker {
PipelineDataSourceConfiguration targetDataSourceConfig =
jobConfig.getTarget();
ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" +
getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- JobRateLimitAlgorithm inputRateLimitAlgorithm =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
+ JobRateLimitAlgorithm readRateLimitAlgorithm =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getReadRateLimitAlgorithm();
Map<String, DataConsistencyContentCheckResult> result = new
HashMap<>(logicTableNames.size(), 1);
try (
PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
@@ -177,8 +177,8 @@ public final class DataConsistencyChecker {
Iterator<Object> targetCalculatedResults =
calculator.calculate(targetParameter).iterator();
boolean contentMatched = true;
while (sourceCalculatedResults.hasNext() &&
targetCalculatedResults.hasNext()) {
- if (null != inputRateLimitAlgorithm) {
-
inputRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+ if (null != readRateLimitAlgorithm) {
+
readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
Future<Object> sourceFuture =
executor.submit(sourceCalculatedResults::next);
Future<Object> targetFuture =
executor.submit(targetCalculatedResults::next);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
index 4178c1dbea0..774ca9e6145 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
@@ -30,12 +30,12 @@ import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
@@ -52,9 +52,9 @@ public abstract class AbstractPipelineProcessContext
implements PipelineProcessC
private final PipelineProcessConfiguration pipelineProcessConfig;
- private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
+ private final JobRateLimitAlgorithm readRateLimitAlgorithm;
- private final JobRateLimitAlgorithm outputRateLimitAlgorithm;
+ private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
private final PipelineChannelCreator pipelineChannelCreator;
@@ -67,19 +67,19 @@ public abstract class AbstractPipelineProcessContext
implements PipelineProcessC
public AbstractPipelineProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
PipelineProcessConfiguration processConfig =
convertProcessConfig(originalProcessConfig);
this.pipelineProcessConfig = processConfig;
- PipelineInputConfiguration inputConfig = processConfig.getInput();
- AlgorithmConfiguration inputRateLimiter = inputConfig.getRateLimiter();
- inputRateLimitAlgorithm = null != inputRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(inputRateLimiter) : null;
- PipelineOutputConfiguration outputConfig = processConfig.getOutput();
- AlgorithmConfiguration outputRateLimiter =
outputConfig.getRateLimiter();
- outputRateLimitAlgorithm = null != outputRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
+ PipelineReadConfiguration readConfig = processConfig.getRead();
+ AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
+ readRateLimitAlgorithm = null != readRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(readRateLimiter) : null;
+ PipelineWriteConfiguration writeConfig = processConfig.getWrite();
+ AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
+ writeRateLimitAlgorithm = null != writeRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(writeRateLimiter) : null;
AlgorithmConfiguration streamChannel =
processConfig.getStreamChannel();
pipelineChannelCreator =
PipelineChannelCreatorFactory.newInstance(streamChannel);
inventoryDumperExecuteEngineLazyInitializer = new
LazyInitializer<ExecuteEngine>() {
@Override
protected ExecuteEngine initialize() {
- return
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(),
"Inventory-" + jobId);
+ return
ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-"
+ jobId);
}
};
incrementalDumperExecuteEngineLazyInitializer = new
LazyInitializer<ExecuteEngine>() {
@@ -93,22 +93,22 @@ public abstract class AbstractPipelineProcessContext
implements PipelineProcessC
@Override
protected ExecuteEngine initialize() {
- return
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(),
"Importer-" + jobId);
+ return
ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-"
+ jobId);
}
};
}
private PipelineProcessConfiguration convertProcessConfig(final
PipelineProcessConfiguration originalProcessConfig) {
YamlPipelineProcessConfiguration yamlActionConfig =
SWAPPER.swapToYamlConfiguration(originalProcessConfig);
- if (null == yamlActionConfig.getInput()) {
-
yamlActionConfig.setInput(YamlPipelineInputConfiguration.buildWithDefaultValue());
+ if (null == yamlActionConfig.getRead()) {
+
yamlActionConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
} else {
- yamlActionConfig.getInput().fillInNullFieldsWithDefaultValue();
+ yamlActionConfig.getRead().fillInNullFieldsWithDefaultValue();
}
- if (null == yamlActionConfig.getOutput()) {
-
yamlActionConfig.setOutput(YamlPipelineOutputConfiguration.buildWithDefaultValue());
+ if (null == yamlActionConfig.getWrite()) {
+
yamlActionConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue());
} else {
- yamlActionConfig.getOutput().fillInNullFieldsWithDefaultValue();
+ yamlActionConfig.getWrite().fillInNullFieldsWithDefaultValue();
}
if (null == yamlActionConfig.getStreamChannel()) {
yamlActionConfig.setStreamChannel(new
YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new
Properties()));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 072ea2756bf..7769110cd9d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -48,7 +48,7 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -120,9 +120,9 @@ public final class InventoryTaskSplitter {
final
InventoryDumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
PipelineProcessContext ruleAlteredContext =
jobItemContext.getJobProcessContext();
- PipelineInputConfiguration inputConfig =
ruleAlteredContext.getPipelineProcessConfig().getInput();
- int batchSize = inputConfig.getBatchSize();
- JobRateLimitAlgorithm rateLimitAlgorithm =
ruleAlteredContext.getInputRateLimitAlgorithm();
+ PipelineReadConfiguration readConfig =
ruleAlteredContext.getPipelineProcessConfig().getRead();
+ int batchSize = readConfig.getBatchSize();
+ JobRateLimitAlgorithm rateLimitAlgorithm =
ruleAlteredContext.getReadRateLimitAlgorithm();
Collection<IngestPosition<?>> inventoryPositions =
getInventoryPositions(jobItemContext, dumperConfig, dataSource, metaDataLoader);
int i = 0;
for (IngestPosition<?> inventoryPosition : inventoryPositions) {
@@ -201,7 +201,7 @@ public final class InventoryTaskSplitter {
PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
String sql =
PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType())
.buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName())),
dumperConfig.getActualTableName(), dumperConfig.getUniqueKey());
- int shardingSize =
jobItemContext.getJobProcessContext().getPipelineProcessConfig().getInput().getShardingSize();
+ int shardingSize =
jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
try (
Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {