This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6a53bf7e5e9 Extract InventoryIncrementalProcessContext to simplify
PipelineProcessContext (#20884)
6a53bf7e5e9 is described below
commit 6a53bf7e5e9546403d0c218bb78ee474e44bed0c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Sep 8 21:42:00 2022 +0800
Extract InventoryIncrementalProcessContext to simplify
PipelineProcessContext (#20884)
---
.../api/context/PipelineProcessContext.java | 23 ----------------------
...bstractInventoryIncrementalProcessContext.java} | 7 +++----
.../InventoryIncrementalJobItemContext.java | 3 +++
.../InventoryIncrementalProcessContext.java} | 15 ++++----------
.../core/prepare/InventoryTaskSplitter.java | 20 +++++++++----------
.../scenario/migration/MigrationJobAPIImpl.java | 3 +--
.../migration/MigrationProcessContext.java | 4 ++--
7 files changed, 23 insertions(+), 52 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
index 39a166061d0..0ee57cb7276 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
@@ -17,8 +17,6 @@
package org.apache.shardingsphere.data.pipeline.api.context;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
/**
@@ -32,25 +30,4 @@ public interface PipelineProcessContext {
* @return pipeline process config
*/
PipelineProcessConfiguration getPipelineProcessConfig();
-
- /**
- * Get pipeline channel creator.
- *
- * @return pipeline channel creator
- */
- PipelineChannelCreator getPipelineChannelCreator();
-
- /**
- * Get job read rate limit algorithm.
- *
- * @return job read rate limit algorithm
- */
- JobRateLimitAlgorithm getReadRateLimitAlgorithm();
-
- /**
- * Get job write rate limit algorithm.
- *
- * @return job write rate limit algorithm
- */
- JobRateLimitAlgorithm getWriteRateLimitAlgorithm();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
similarity index 94%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
index 7079eeb5c9d..3497e683a6d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
@@ -22,7 +22,6 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
@@ -35,11 +34,11 @@ import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadCon
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
/**
- * Abstract pipeline process context.
+ * Abstract inventory incremental process context.
*/
@Getter
@Slf4j
-public abstract class AbstractPipelineProcessContext implements
PipelineProcessContext {
+public abstract class AbstractInventoryIncrementalProcessContext implements
InventoryIncrementalProcessContext {
private final PipelineProcessConfiguration pipelineProcessConfig;
@@ -55,7 +54,7 @@ public abstract class AbstractPipelineProcessContext
implements PipelineProcessC
private final LazyInitializer<ExecuteEngine>
importerExecuteEngineLazyInitializer;
- public AbstractPipelineProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
+ public AbstractInventoryIncrementalProcessContext(final String jobId,
final PipelineProcessConfiguration originalProcessConfig) {
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
this.pipelineProcessConfig = processConfig;
PipelineReadConfiguration readConfig = processConfig.getRead();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 91637df225d..36a153ff08d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -28,6 +28,9 @@ import java.util.Collection;
*/
public interface InventoryIncrementalJobItemContext extends
PipelineJobItemContext {
+ @Override
+ InventoryIncrementalProcessContext getJobProcessContext();
+
/**
* Get inventory tasks.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalProcessContext.java
similarity index 79%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalProcessContext.java
index 39a166061d0..b3cdbf66e10 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalProcessContext.java
@@ -15,23 +15,16 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.context;
+package org.apache.shardingsphere.data.pipeline.core.context;
+import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
/**
- * Pipeline process context.
+ * Inventory incremental process context.
*/
-public interface PipelineProcessContext {
-
- /**
- * Get pipeline process config.
- *
- * @return pipeline process config
- */
- PipelineProcessConfiguration getPipelineProcessConfig();
+public interface InventoryIncrementalProcessContext extends
PipelineProcessContext {
/**
* Get pipeline channel creator.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index e10415451c0..e8e6c315534 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -23,8 +23,6 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -39,6 +37,8 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
@@ -86,7 +86,7 @@ public final class InventoryTaskSplitter {
* @param jobItemContext job item context
* @return split inventory data task
*/
- public List<InventoryTask> splitInventoryData(final PipelineJobItemContext
jobItemContext) {
+ public List<InventoryTask> splitInventoryData(final
InventoryIncrementalJobItemContext jobItemContext) {
List<InventoryTask> result = new LinkedList<>();
PipelineChannelCreator pipelineChannelCreator =
jobItemContext.getJobProcessContext().getPipelineChannelCreator();
DefaultPipelineJobProgressListener jobProgressListener = new
DefaultPipelineJobProgressListener(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
@@ -97,10 +97,10 @@ public final class InventoryTaskSplitter {
return result;
}
- private Collection<InventoryDumperConfiguration> splitDumperConfig(final
PipelineJobItemContext jobItemContext, final DumperConfiguration dumperConfig) {
+ private Collection<InventoryDumperConfiguration> splitDumperConfig(final
InventoryIncrementalJobItemContext jobItemContext, final DumperConfiguration
dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
- result.addAll(splitByPrimaryKey(jobItemContext, sourceDataSource,
metaDataLoader, each));
+ result.addAll(splitByPrimaryKey(each, jobItemContext,
sourceDataSource, metaDataLoader));
}
return result;
}
@@ -118,10 +118,10 @@ public final class InventoryTaskSplitter {
return result;
}
- private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final
PipelineJobItemContext jobItemContext, final DataSource dataSource, final
PipelineTableMetaDataLoader metaDataLoader,
- final
InventoryDumperConfiguration dumperConfig) {
+ private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final
InventoryDumperConfiguration dumperConfig, final
InventoryIncrementalJobItemContext jobItemContext,
+ final
DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
- PipelineProcessContext jobProcessContext =
jobItemContext.getJobProcessContext();
+ InventoryIncrementalProcessContext jobProcessContext =
jobItemContext.getJobProcessContext();
PipelineReadConfiguration readConfig =
jobProcessContext.getPipelineProcessConfig().getRead();
int batchSize = readConfig.getBatchSize();
JobRateLimitAlgorithm rateLimitAlgorithm =
jobProcessContext.getReadRateLimitAlgorithm();
@@ -142,7 +142,7 @@ public final class InventoryTaskSplitter {
return result;
}
- private Collection<IngestPosition<?>> getInventoryPositions(final
PipelineJobItemContext jobItemContext, final InventoryDumperConfiguration
dumperConfig,
+ private Collection<IngestPosition<?>> getInventoryPositions(final
InventoryIncrementalJobItemContext jobItemContext, final
InventoryDumperConfiguration dumperConfig,
final
DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
String schemaName = dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName()));
String actualTableName = dumperConfig.getActualTableName();
@@ -197,7 +197,7 @@ public final class InventoryTaskSplitter {
String.format("Can not split range for table %s, reason: table
contains multiple unique index or unique index contains nullable/multiple
column(s)", tableName));
}
- private Collection<IngestPosition<?>>
getPositionByIntegerPrimaryKeyRange(final PipelineJobItemContext
jobItemContext, final DataSource dataSource,
+ private Collection<IngestPosition<?>>
getPositionByIntegerPrimaryKeyRange(final InventoryIncrementalJobItemContext
jobItemContext, final DataSource dataSource,
final InventoryDumperConfiguration dumperConfig) {
Collection<IngestPosition<?>> result = new LinkedList<>();
PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index df60ac463b9..e1f1e62dc49 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -34,7 +34,6 @@ import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigration
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -235,7 +234,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
- PipelineProcessContext migrationProcessContext = new
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+ MigrationProcessContext migrationProcessContext = new
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
return new ImporterConfiguration(jobConfig.getTarget(),
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize,
migrationProcessContext.getWriteRateLimitAlgorithm(),
retryTimes, concurrency);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index 058fae8e271..aa2db5d4be6 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.context.AbstractPipelineProcessContext;
+import
org.apache.shardingsphere.data.pipeline.core.context.AbstractInventoryIncrementalProcessContext;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
/**
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcess
*/
@Getter
@Slf4j
-public final class MigrationProcessContext extends
AbstractPipelineProcessContext {
+public final class MigrationProcessContext extends
AbstractInventoryIncrementalProcessContext {
public MigrationProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
super(jobId, originalProcessConfig);