This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a53bf7e5e9 Extract InventoryIncrementalProcessContext to simplify 
PipelineProcessContext (#20884)
6a53bf7e5e9 is described below

commit 6a53bf7e5e9546403d0c218bb78ee474e44bed0c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Sep 8 21:42:00 2022 +0800

    Extract InventoryIncrementalProcessContext to simplify 
PipelineProcessContext (#20884)
---
 .../api/context/PipelineProcessContext.java        | 23 ----------------------
 ...bstractInventoryIncrementalProcessContext.java} |  7 +++----
 .../InventoryIncrementalJobItemContext.java        |  3 +++
 .../InventoryIncrementalProcessContext.java}       | 15 ++++----------
 .../core/prepare/InventoryTaskSplitter.java        | 20 +++++++++----------
 .../scenario/migration/MigrationJobAPIImpl.java    |  3 +--
 .../migration/MigrationProcessContext.java         |  4 ++--
 7 files changed, 23 insertions(+), 52 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
index 39a166061d0..0ee57cb7276 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
@@ -17,8 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.api.context;
 
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 
 /**
@@ -32,25 +30,4 @@ public interface PipelineProcessContext {
      * @return pipeline process config
      */
     PipelineProcessConfiguration getPipelineProcessConfig();
-    
-    /**
-     * Get pipeline channel creator.
-     *
-     * @return pipeline channel creator
-     */
-    PipelineChannelCreator getPipelineChannelCreator();
-    
-    /**
-     * Get job read rate limit algorithm.
-     *
-     * @return job read rate limit algorithm
-     */
-    JobRateLimitAlgorithm getReadRateLimitAlgorithm();
-    
-    /**
-     * Get job write rate limit algorithm.
-     *
-     * @return job write rate limit algorithm
-     */
-    JobRateLimitAlgorithm getWriteRateLimitAlgorithm();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
similarity index 94%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
index 7079eeb5c9d..3497e683a6d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
@@ -22,7 +22,6 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
@@ -35,11 +34,11 @@ import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadCon
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
 
 /**
- * Abstract pipeline process context.
+ * Abstract inventory incremental process context.
  */
 @Getter
 @Slf4j
-public abstract class AbstractPipelineProcessContext implements 
PipelineProcessContext {
+public abstract class AbstractInventoryIncrementalProcessContext implements 
InventoryIncrementalProcessContext {
     
     private final PipelineProcessConfiguration pipelineProcessConfig;
     
@@ -55,7 +54,7 @@ public abstract class AbstractPipelineProcessContext 
implements PipelineProcessC
     
     private final LazyInitializer<ExecuteEngine> 
importerExecuteEngineLazyInitializer;
     
-    public AbstractPipelineProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
+    public AbstractInventoryIncrementalProcessContext(final String jobId, 
final PipelineProcessConfiguration originalProcessConfig) {
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
         this.pipelineProcessConfig = processConfig;
         PipelineReadConfiguration readConfig = processConfig.getRead();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 91637df225d..36a153ff08d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -28,6 +28,9 @@ import java.util.Collection;
  */
 public interface InventoryIncrementalJobItemContext extends 
PipelineJobItemContext {
     
+    @Override
+    InventoryIncrementalProcessContext getJobProcessContext();
+    
     /**
      * Get inventory tasks.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalProcessContext.java
similarity index 79%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalProcessContext.java
index 39a166061d0..b3cdbf66e10 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalProcessContext.java
@@ -15,23 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.context;
+package org.apache.shardingsphere.data.pipeline.core.context;
 
+import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 
 /**
- * Pipeline process context.
+ * Inventory incremental process context.
  */
-public interface PipelineProcessContext {
-    
-    /**
-     * Get pipeline process config.
-     *
-     * @return pipeline process config
-     */
-    PipelineProcessConfiguration getPipelineProcessConfig();
+public interface InventoryIncrementalProcessContext extends 
PipelineProcessContext {
     
     /**
      * Get pipeline channel creator.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index e10415451c0..e8e6c315534 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -23,8 +23,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -39,6 +37,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
@@ -86,7 +86,7 @@ public final class InventoryTaskSplitter {
      * @param jobItemContext job item context
      * @return split inventory data task
      */
-    public List<InventoryTask> splitInventoryData(final PipelineJobItemContext 
jobItemContext) {
+    public List<InventoryTask> splitInventoryData(final 
InventoryIncrementalJobItemContext jobItemContext) {
         List<InventoryTask> result = new LinkedList<>();
         PipelineChannelCreator pipelineChannelCreator = 
jobItemContext.getJobProcessContext().getPipelineChannelCreator();
         DefaultPipelineJobProgressListener jobProgressListener = new 
DefaultPipelineJobProgressListener(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
@@ -97,10 +97,10 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<InventoryDumperConfiguration> splitDumperConfig(final 
PipelineJobItemContext jobItemContext, final DumperConfiguration dumperConfig) {
+    private Collection<InventoryDumperConfiguration> splitDumperConfig(final 
InventoryIncrementalJobItemContext jobItemContext, final DumperConfiguration 
dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
-            result.addAll(splitByPrimaryKey(jobItemContext, sourceDataSource, 
metaDataLoader, each));
+            result.addAll(splitByPrimaryKey(each, jobItemContext, 
sourceDataSource, metaDataLoader));
         }
         return result;
     }
@@ -118,10 +118,10 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final 
PipelineJobItemContext jobItemContext, final DataSource dataSource, final 
PipelineTableMetaDataLoader metaDataLoader,
-                                                                       final 
InventoryDumperConfiguration dumperConfig) {
+    private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final 
InventoryDumperConfiguration dumperConfig, final 
InventoryIncrementalJobItemContext jobItemContext,
+                                                                       final 
DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
-        PipelineProcessContext jobProcessContext = 
jobItemContext.getJobProcessContext();
+        InventoryIncrementalProcessContext jobProcessContext = 
jobItemContext.getJobProcessContext();
         PipelineReadConfiguration readConfig = 
jobProcessContext.getPipelineProcessConfig().getRead();
         int batchSize = readConfig.getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = 
jobProcessContext.getReadRateLimitAlgorithm();
@@ -142,7 +142,7 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<IngestPosition<?>> getInventoryPositions(final 
PipelineJobItemContext jobItemContext, final InventoryDumperConfiguration 
dumperConfig,
+    private Collection<IngestPosition<?>> getInventoryPositions(final 
InventoryIncrementalJobItemContext jobItemContext, final 
InventoryDumperConfiguration dumperConfig,
                                                                 final 
DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
         String schemaName = dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName()));
         String actualTableName = dumperConfig.getActualTableName();
@@ -197,7 +197,7 @@ public final class InventoryTaskSplitter {
                 String.format("Can not split range for table %s, reason: table 
contains multiple unique index or unique index contains nullable/multiple 
column(s)", tableName));
     }
     
-    private Collection<IngestPosition<?>> 
getPositionByIntegerPrimaryKeyRange(final PipelineJobItemContext 
jobItemContext, final DataSource dataSource,
+    private Collection<IngestPosition<?>> 
getPositionByIntegerPrimaryKeyRange(final InventoryIncrementalJobItemContext 
jobItemContext, final DataSource dataSource,
                                                                               
final InventoryDumperConfiguration dumperConfig) {
         Collection<IngestPosition<?>> result = new LinkedList<>();
         PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index df60ac463b9..e1f1e62dc49 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -34,7 +34,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigration
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -235,7 +234,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
-        PipelineProcessContext migrationProcessContext = new 
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+        MigrationProcessContext migrationProcessContext = new 
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
         return new ImporterConfiguration(jobConfig.getTarget(), 
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, 
migrationProcessContext.getWriteRateLimitAlgorithm(),
                 retryTimes, concurrency);
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index 058fae8e271..aa2db5d4be6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.context.AbstractPipelineProcessContext;
+import 
org.apache.shardingsphere.data.pipeline.core.context.AbstractInventoryIncrementalProcessContext;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 
 /**
@@ -27,7 +27,7 @@ import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcess
  */
 @Getter
 @Slf4j
-public final class MigrationProcessContext extends 
AbstractPipelineProcessContext {
+public final class MigrationProcessContext extends 
AbstractInventoryIncrementalProcessContext {
     
     public MigrationProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
         super(jobId, originalProcessConfig);

Reply via email to