This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dd1c56886a7 Remove TransmissionProcessContext.pipelineChannelCreator (#29549)
dd1c56886a7 is described below
commit dd1c56886a776b3029cd9b045a93637b9877e06c
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 26 15:43:17 2023 +0800
Remove TransmissionProcessContext.pipelineChannelCreator (#29549)
---
.../data/pipeline/core/context/PipelineProcessContext.java | 2 +-
.../pipeline/core/context/TransmissionProcessContext.java | 13 +++----------
.../core/preparer/inventory/InventoryTaskSplitter.java | 6 +++---
.../data/pipeline/core/task/PipelineTaskUtils.java | 13 ++++++++-----
.../org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java | 2 +-
.../data/pipeline/cdc/core/prepare/CDCJobPreparer.java | 5 +++--
.../context/ConsistencyCheckProcessContext.java | 2 +-
.../data/pipeline/scenario/migration/MigrationJob.java | 2 +-
.../scenario/migration/preparer/MigrationJobPreparer.java | 5 ++---
9 files changed, 23 insertions(+), 27 deletions(-)
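
This commit is a behavior-preserving refactoring: TransmissionProcessContext no longer resolves and caches a PipelineChannelCreator, and call sites instead pass the stream channel AlgorithmConfiguration to PipelineTaskUtils, which performs the SPI lookup on demand. Alongside that, getPipelineProcessConfig() is renamed to getProcessConfig(). A minimal before/after sketch of the call-site pattern, using only identifiers that appear in the hunks below:

    // Before: the creator was cached on the process context.
    PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(
            processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);

    // After: callers pass the stream channel algorithm configuration;
    // the PipelineChannelCreator SPI lookup happens inside PipelineTaskUtils.
    PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(
            processContext.getProcessConfig().getStreamChannel(), importerConfig.getBatchSize(), position);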
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java
index 1284c64309a..b63c29eab64 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java
@@ -29,5 +29,5 @@ public interface PipelineProcessContext extends AutoCloseable {
      *
      * @return pipeline process config
      */
-    PipelineProcessConfiguration getPipelineProcessConfig();
+    PipelineProcessConfiguration getProcessConfig();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java
index 3c129f4dd7c..cb312d7d8b2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java
@@ -20,12 +20,11 @@ package org.apache.shardingsphere.data.pipeline.core.context;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineLazyInitializer;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
@@ -37,7 +36,7 @@ import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 public final class TransmissionProcessContext implements PipelineProcessContext {
     
     @Getter
-    private final PipelineProcessConfiguration pipelineProcessConfig;
+    private final PipelineProcessConfiguration processConfig;
     
     @Getter
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
@@ -45,9 +44,6 @@ public final class TransmissionProcessContext implements PipelineProcessContext
     @Getter
     private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
     
-    @Getter
-    private final PipelineChannelCreator pipelineChannelCreator;
-    
     private final PipelineLazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer;
     
     private final PipelineLazyInitializer<ExecuteEngine> inventoryImporterExecuteEngineLazyInitializer;
@@ -55,16 +51,13 @@ public final class TransmissionProcessContext implements PipelineProcessContext
     private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;
     
     public TransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
-        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
-        this.pipelineProcessConfig = processConfig;
+        this.processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
         PipelineReadConfiguration readConfig = processConfig.getRead();
         AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
         readRateLimitAlgorithm = null == readRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, readRateLimiter.getType(), readRateLimiter.getProps());
         PipelineWriteConfiguration writeConfig = processConfig.getWrite();
         AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
         writeRateLimitAlgorithm = null == writeRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, writeRateLimiter.getType(), writeRateLimiter.getProps());
-        AlgorithmConfiguration streamChannel = processConfig.getStreamChannel();
-        pipelineChannelCreator = TypedSPILoader.getService(PipelineChannelCreator.class, streamChannel.getType(), streamChannel.getProps());
         inventoryDumperExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
             
             @Override
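
After this change the constructor resolves only the read/write rate limiters eagerly; the stream channel lookup that used to live here moves essentially unchanged into the consumers, i.e. roughly:

    // The relocated lookup, as it reappears in PipelineTaskUtils (identifiers from this diff):
    AlgorithmConfiguration streamChannel = processConfig.getStreamChannel();
    PipelineChannelCreator creator = TypedSPILoader.getService(
            PipelineChannelCreator.class, streamChannel.getType(), streamChannel.getProps());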
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
index 464b0ec048f..f53be4e4c85 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
@@ -83,7 +83,7 @@ public final class InventoryTaskSplitter {
         TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
         for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
             AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
-            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
+            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfig().getStreamChannel(), importerConfig.getBatchSize(), position);
             Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
             Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
             result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
@@ -132,7 +132,7 @@ public final class InventoryTaskSplitter {
         }
         Collection<InventoryDumperContext> result = new LinkedList<>();
         TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
-        PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
+        PipelineReadConfiguration readConfig = jobProcessContext.getProcessConfig().getRead();
         int batchSize = readConfig.getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
         Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
@@ -188,7 +188,7 @@ public final class InventoryTaskSplitter {
         }
         Collection<IngestPosition> result = new LinkedList<>();
         Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext);
-        int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
+        int shardingSize = jobItemContext.getJobProcessContext().getProcessConfig().getRead().getShardingSize();
         long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0);
         long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount;
         IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval);
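
The last hunk here only renames the getter, but the surrounding split arithmetic deserves a worked example (the numbers are illustrative, not from the source):

    // Formulas copied from the hunk above; values invented for illustration.
    long tableRecordsCount = 25_000_000L;
    int shardingSize = 10_000_000;
    long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0);  // 3
    long interval = (30_000_000L - 1L) / splitCount;  // unique key range [1, 30_000_000] -> 9_999_999

So a 25-million-row table with shardingSize 10,000,000 is split into three inventory ranges of roughly ten million key values each.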
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index 74ba13eafc4..2f749f1c349 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -26,6 +26,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Invent
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
@@ -66,24 +68,25 @@ public final class PipelineTaskUtils {
     /**
      * Create pipeline channel for inventory task.
      *
-     * @param channelCreator pipeline channel creator
+     * @param channelConfig pipeline channel configuration
      * @param importerBatchSize importer batch size
      * @param position ingest position
      * @return created pipeline channel
      */
-    public static PipelineChannel createInventoryChannel(final PipelineChannelCreator channelCreator, final int importerBatchSize, final AtomicReference<IngestPosition> position) {
-        return channelCreator.newInstance(importerBatchSize, new InventoryTaskAckCallback(position));
+    public static PipelineChannel createInventoryChannel(final AlgorithmConfiguration channelConfig, final int importerBatchSize, final AtomicReference<IngestPosition> position) {
+        return TypedSPILoader.getService(PipelineChannelCreator.class, channelConfig.getType(), channelConfig.getProps()).newInstance(importerBatchSize, new InventoryTaskAckCallback(position));
     }
     
     /**
      * Create pipeline channel for incremental task.
      *
      * @param concurrency output concurrency
-     * @param channelCreator pipeline channel creator
+     * @param channelConfig pipeline channel configuration
      * @param progress incremental task progress
      * @return created pipeline channel
      */
-    public static PipelineChannel createIncrementalChannel(final int concurrency, final PipelineChannelCreator channelCreator, final IncrementalTaskProgress progress) {
+    public static PipelineChannel createIncrementalChannel(final int concurrency, final AlgorithmConfiguration channelConfig, final IncrementalTaskProgress progress) {
+        PipelineChannelCreator channelCreator = TypedSPILoader.getService(PipelineChannelCreator.class, channelConfig.getType(), channelConfig.getProps());
         return 1 == concurrency
                 ? channelCreator.newInstance(5, new IncrementalTaskAckCallback(progress))
                 : new MultiplexPipelineChannel(concurrency, channelCreator, 5, new IncrementalTaskAckCallback(progress));
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index f39a246d5f1..d234606fe95 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -94,7 +94,7 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
         PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
                 processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
         TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
-        CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+        CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getProcessConfig());
         return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 4b5fd0e8301..a9a55734b89 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -118,7 +118,7 @@ public final class CDCJobPreparer {
         for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()), importerConfig)
                 .splitInventoryDumperContext(jobItemContext)) {
             AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
-            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
+            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfig().getStreamChannel(), importerConfig.getBatchSize(), position);
             if (!(position.get() instanceof IngestFinishedPosition)) {
                 channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
             }
@@ -139,7 +139,8 @@ public final class CDCJobPreparer {
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
-        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress);
+        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(
+                importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(), taskProgress);
         channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
         Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java
index 3201b01d165..2d279047607 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java
@@ -45,7 +45,7 @@ public final class ConsistencyCheckProcessContext implements PipelineProcessCont
     }
     
     @Override
-    public PipelineProcessConfiguration getPipelineProcessConfig() {
+    public PipelineProcessConfiguration getProcessConfig() {
         return PipelineProcessConfigurationUtils.convertWithDefaultValue(null);
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 8b4c7dceb6f..91563b587b2 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -58,7 +58,7 @@ public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJo
     
     @Override
     protected MigrationJobItemContext buildJobItemContext(final MigrationJobConfiguration jobConfig,
                                                           final int shardingItem, final TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
-        MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+        MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager());
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 5c2be177179..4150a87677e 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -199,13 +198,13 @@ public final class MigrationJobPreparer {
     
     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
-        PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
-        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), pipelineChannelCreator, taskProgress);
+        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(
+                importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(), taskProgress);
         Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
         Collection<Importer> importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext);