This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ae8bf1a  Refactor RuleAlteredJobCompletionDetectAlgorithm parameter (#14236)
ae8bf1a is described below

commit ae8bf1a8962caef11b4382b3e974724b65037497
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 22 22:49:37 2021 +0800

    Refactor RuleAlteredJobCompletionDetectAlgorithm parameter (#14236)
---
 ...AllIncrementalTasksAlmostFinishedParameter.java | 30 ++++++++++-----------
 .../data/pipeline/core/job/FinishedCheckJob.java   |  1 +
 ...dleRuleAlteredJobCompletionDetectAlgorithm.java |  4 ++-
 .../RuleAlteredJobProgressDetector.java            |  4 ++-
 .../RuleAlteredJobCompletionDetectAlgorithm.java   |  9 +++----
 ...ureRuleAlteredJobCompletionDetectAlgorithm.java |  5 ++--
 ...uleAlteredJobCompletionDetectAlgorithmTest.java | 31 +++++++++++++---------
 7 files changed, 44 insertions(+), 40 deletions(-)

diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/AllIncrementalTasksAlmostFinishedParameter.java
similarity index 59%
copy from 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/AllIncrementalTasksAlmostFinishedParameter.java
index a6f408f..ddff3c3 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/AllIncrementalTasksAlmostFinishedParameter.java
@@ -15,25 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.api.detect;
 
-import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 
 import java.util.Collection;
 
-public final class FixtureRuleAlteredJobCompletionDetectAlgorithm implements 
RuleAlteredJobCompletionDetectAlgorithm {
-    
-    @Override
-    public void init() {
-    }
-    
-    @Override
-    public boolean allIncrementalTasksAlmostFinished(final Collection<Long> 
incrementalTaskIdleMinutes) {
-        return true;
-    }
+/**
+ * All incremental tasks almost finished parameter.
+ */
+@Getter
+@Setter
+@Builder
+@ToString
+public final class AllIncrementalTasksAlmostFinishedParameter {
     
-    @Override
-    public String getType() {
-        return "FIXTURE";
-    }
+    private Collection<Long> incrementalTaskIdleMinutes;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index e670159..252db73 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -87,6 +87,7 @@ public final class FinishedCheckJob implements SimpleJob {
                         
sourceWritingStopAlgorithm.resumeSourceWriting(schemaName, jobId + "");
                     }
                 }
+                log.info("job {} finished", jobId);
                 // CHECKSTYLE:OFF
             } catch (final Exception ex) {
                 // CHECKSTYLE:ON
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
index 42a650f..5676b27 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 import com.google.common.base.Preconditions;
 import lombok.Getter;
 import lombok.Setter;
+import 
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
 
 import java.util.Collection;
@@ -51,7 +52,8 @@ public final class 
IdleRuleAlteredJobCompletionDetectAlgorithm implements RuleAl
     }
     
     @Override
-    public boolean allIncrementalTasksAlmostFinished(final Collection<Long> 
incrementalTaskIdleMinutes) {
+    public boolean allIncrementalTasksAlmostFinished(final 
AllIncrementalTasksAlmostFinishedParameter parameter) {
+        Collection<Long> incrementalTaskIdleMinutes = 
parameter.getIncrementalTaskIdleMinutes();
         if (null == incrementalTaskIdleMinutes || 
incrementalTaskIdleMinutes.isEmpty()) {
             return false;
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
index adda3f7..435596f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -67,7 +68,8 @@ public final class RuleAlteredJobProgressDetector {
                     return latestActiveTimeMillis > 0 ? 
TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - latestActiveTimeMillis) : 0;
                 })
                 .collect(Collectors.toList());
-        return 
completionDetectAlgorithm.allIncrementalTasksAlmostFinished(incrementalTaskIdleMinutes);
+        AllIncrementalTasksAlmostFinishedParameter parameter = 
AllIncrementalTasksAlmostFinishedParameter.builder().incrementalTaskIdleMinutes(incrementalTaskIdleMinutes).build();
+        return 
completionDetectAlgorithm.allIncrementalTasksAlmostFinished(parameter);
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobCompletionDetectAlgorithm.java
index 52b3357..164035c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobCompletionDetectAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobCompletionDetectAlgorithm.java
@@ -17,23 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
 
+import 
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
 import 
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
 
-import java.util.Collection;
-
 /**
  * Rule altered job completion detect algorithm for SPI.
  */
-// TODO extract JobCompletionDetectAlgorithm
 public interface RuleAlteredJobCompletionDetectAlgorithm extends 
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
     
     /**
      * All incremental tasks is almost finished.
      *
-     * @param incrementalTaskIdleMinutes incremental task idle minutes
+     * @param parameter parameter
      * @return Almost finished or not
      */
-    // TODO parameter
-    boolean allIncrementalTasksAlmostFinished(Collection<Long> 
incrementalTaskIdleMinutes);
+    boolean 
allIncrementalTasksAlmostFinished(AllIncrementalTasksAlmostFinishedParameter 
parameter);
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
index a6f408f..33eeb9f 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
@@ -17,10 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
+import 
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
 
-import java.util.Collection;
-
 public final class FixtureRuleAlteredJobCompletionDetectAlgorithm implements 
RuleAlteredJobCompletionDetectAlgorithm {
     
     @Override
@@ -28,7 +27,7 @@ public final class 
FixtureRuleAlteredJobCompletionDetectAlgorithm implements Rul
     }
     
     @Override
-    public boolean allIncrementalTasksAlmostFinished(final Collection<Long> 
incrementalTaskIdleMinutes) {
+    public boolean allIncrementalTasksAlmostFinished(final 
AllIncrementalTasksAlmostFinishedParameter parameter) {
         return true;
     }
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
index 174b323..5e05a1b 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
@@ -17,12 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
+import 
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -34,6 +34,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@@ -41,7 +42,7 @@ public final class 
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
     @Mock
     private Properties propsMock;
     
-    private IdleRuleAlteredJobCompletionDetectAlgorithm detectAlgorithm = new 
IdleRuleAlteredJobCompletionDetectAlgorithm();
+    private final IdleRuleAlteredJobCompletionDetectAlgorithm detectAlgorithm 
= new IdleRuleAlteredJobCompletionDetectAlgorithm();
     
     @Before
     public void setup() throws Exception {
@@ -51,28 +52,28 @@ public final class 
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
     
     @Test(expected = IllegalArgumentException.class)
     public void assertInitFailNoIdleThresholdKey() {
-        
Mockito.when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(false);
+        
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(false);
         detectAlgorithm.init();
     }
     
     @Test(expected = IllegalArgumentException.class)
     public void assertInitFailInvalidIdleThresholdKey() {
-        
Mockito.when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-        
Mockito.when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("@");
+        
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
+        
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("@");
         detectAlgorithm.init();
     }
     
     @Test(expected = IllegalArgumentException.class)
     public void assertInitFailNegativeIdleThresholdKey() {
-        
Mockito.when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-        
Mockito.when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("-8");
+        
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
+        
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("-8");
         detectAlgorithm.init();
     }
     
     @Test
     public void assertInitSuccess() {
-        
Mockito.when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-        
Mockito.when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("4");
+        
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
+        
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("4");
         detectAlgorithm.init();
     }
     
@@ -83,21 +84,25 @@ public final class 
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
     
     @Test
     public void assertFalseOnNullIncrementalTasks() {
-        assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(null));
+        AllIncrementalTasksAlmostFinishedParameter parameter = 
AllIncrementalTasksAlmostFinishedParameter.builder().build();
+        
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(parameter));
     }
     
     @Test
     public void assertFalseOnEmptyIncrementalTasks() {
-        
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(Collections.emptyList()));
+        AllIncrementalTasksAlmostFinishedParameter parameter = 
AllIncrementalTasksAlmostFinishedParameter.builder().incrementalTaskIdleMinutes(Collections.emptyList()).build();
+        
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(parameter));
     }
     
     @Test
     public void assertFalseOnFewPendingIncrementalTasks() {
-        
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(Arrays.asList(10L,
 50L)));
+        AllIncrementalTasksAlmostFinishedParameter parameter = 
AllIncrementalTasksAlmostFinishedParameter.builder().incrementalTaskIdleMinutes(Arrays.asList(10L,
 50L)).build();
+        
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(parameter));
     }
     
     @Test
     public void assertTrueWhenAllIncrementalTasksAlmostFinished() {
-        
assertTrue(detectAlgorithm.allIncrementalTasksAlmostFinished(Arrays.asList(60L, 
50L, 30L)));
+        AllIncrementalTasksAlmostFinishedParameter parameter = 
AllIncrementalTasksAlmostFinishedParameter.builder().incrementalTaskIdleMinutes(Arrays.asList(60L,
 50L, 30L)).build();
+        
assertTrue(detectAlgorithm.allIncrementalTasksAlmostFinished(parameter));
     }
 }

Reply via email to