This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 597c6448fbb Add pipeline job common interfaces (#19758)
597c6448fbb is described below

commit 597c6448fbb9fdebcb7298d09289794f13244bd7
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Aug 2 15:00:12 2022 +0800

    Add pipeline job common interfaces (#19758)
    
    * Add pipeline job common interfaces
    
    * Improve scaling IT waitScalingFinished
---
 .../data/pipeline/api/RuleAlteredJobAPI.java         |  1 +
 .../api/config/job/PipelineJobConfiguration.java     |  7 +++++++
 .../RuleAlteredJobAlmostCompletedParameter.java      |  1 +
 .../PipelineJob.java}                                | 20 +++-----------------
 .../data/pipeline/api/job/progress/JobProgress.java  |  3 ++-
 .../progress/PipelineJobProgress.java}               | 20 +++-----------------
 .../scenario/rulealtered/RuleAlteredJob.java         |  3 ++-
 .../data/pipeline/cases/base/BaseITCase.java         | 10 +++++-----
 8 files changed, 24 insertions(+), 41 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index db201f9a5b0..283de7dc691 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -56,6 +56,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      * @param jobId job id
      * @return each sharding item progress
      */
+    // TODO now update JobProgress
     Map<Integer, JobProgress> getProgress(String jobId);
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
index 58bf6cb2128..f16a785e138 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
@@ -35,4 +35,11 @@ public interface PipelineJobConfiguration {
      * @return database name
      */
     String getDatabaseName();
+    
+    /**
+     * Get job sharding count.
+     *
+     * @return job sharding count
+     */
+    int getJobShardingCount();
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
index 58d32a52eb3..7d349cb01b5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 @RequiredArgsConstructor
 @Getter
 @ToString
+// TODO now rename
 public final class RuleAlteredJobAlmostCompletedParameter {
     
     private final int jobShardingCount;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
similarity index 70%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
index 58bf6cb2128..183c32856ea 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
@@ -15,24 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.config.job;
+package org.apache.shardingsphere.data.pipeline.api.job;
 
 /**
- * Pipeline job configuration.
+ * Pipeline job.
  */
-public interface PipelineJobConfiguration {
-    
-    /**
-     * Get job id.
-     *
-     * @return job id
-     */
-    String getJobId();
-    
-    /**
-     * Get database name.
-     *
-     * @return database name
-     */
-    String getDatabaseName();
+public interface PipelineJob {
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
index df70f90c1d7..2f0be9743b1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
@@ -37,7 +37,8 @@ import java.util.stream.Collectors;
  */
 @Getter
 @Setter
-public final class JobProgress {
+// TODO now rename
+public final class JobProgress implements PipelineJobProgress {
     
     private JobStatus status = JobStatus.RUNNING;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/PipelineJobProgress.java
similarity index 70%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/PipelineJobProgress.java
index 58bf6cb2128..ab7a8e080bb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/PipelineJobProgress.java
@@ -15,24 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.config.job;
+package org.apache.shardingsphere.data.pipeline.api.job.progress;
 
 /**
- * Pipeline job configuration.
+ * Pipeline job progress.
  */
-public interface PipelineJobConfiguration {
-    
-    /**
-     * Get job id.
-     *
-     * @return job id
-     */
-    String getJobId();
-    
-    /**
-     * Get database name.
-     *
-     * @return database name
-     */
-    String getDatabaseName();
+public interface PipelineJobProgress {
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 4ff6a24d464..5c579515b2b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -31,7 +32,7 @@ import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
  * Rule altered job.
  */
 @Slf4j
-public final class RuleAlteredJob implements SimpleJob {
+public final class RuleAlteredJob implements SimpleJob, PipelineJob {
     
     private final GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
     
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 10628d82044..f1bef198a71 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -332,6 +332,7 @@ public abstract class BaseITCase {
     }
     
     protected void applyScaling(final String jobId) {
+        assertBeforeApplyScalingMetadataCorrectly();
         executeWithLog(String.format("APPLY SCALING %s", jobId));
     }
     
@@ -356,11 +357,11 @@ public abstract class BaseITCase {
         Set<String> actualStatus = null;
         for (int i = 0; i < 15; i++) {
             actualStatus = new HashSet<>();
-            List<Map<String, Object>> showScalingStatusResMap = showScalingStatus(jobId);
-            log.info("show scaling status result: {}", showScalingStatusResMap);
+            List<Map<String, Object>> showScalingStatusResult = showScalingStatus(jobId);
+            log.info("show scaling status result: {}", showScalingStatusResult);
             boolean finished = true;
-            for (Map<String, Object> entry : showScalingStatusResMap) {
-                String status = entry.get("status").toString();
+            for (Map<String, Object> each : showScalingStatusResult) {
+                String status = each.get("status").toString();
                 assertThat(status, not(JobStatus.PREPARING_FAILURE.name()));
                 assertThat(status, not(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()));
                 assertThat(status, not(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()));
@@ -373,7 +374,6 @@ public abstract class BaseITCase {
             if (finished) {
                 break;
             }
-            assertBeforeApplyScalingMetadataCorrectly();
             ThreadUtil.sleep(4, TimeUnit.SECONDS);
         }
         assertThat(actualStatus, is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));

Reply via email to