This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ae8bf1a Refactor RuleAlteredJobCompletionDetectAlgorithm parameter (#14236)
ae8bf1a is described below
commit ae8bf1a8962caef11b4382b3e974724b65037497
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 22 22:49:37 2021 +0800
Refactor RuleAlteredJobCompletionDetectAlgorithm parameter (#14236)
---
...AllIncrementalTasksAlmostFinishedParameter.java | 30 ++++++++++-----------
.../data/pipeline/core/job/FinishedCheckJob.java | 1 +
...dleRuleAlteredJobCompletionDetectAlgorithm.java | 4 ++-
.../RuleAlteredJobProgressDetector.java | 4 ++-
.../RuleAlteredJobCompletionDetectAlgorithm.java | 9 +++----
...ureRuleAlteredJobCompletionDetectAlgorithm.java | 5 ++--
...uleAlteredJobCompletionDetectAlgorithmTest.java | 31 +++++++++++++---------
7 files changed, 44 insertions(+), 40 deletions(-)
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/AllIncrementalTasksAlmostFinishedParameter.java
similarity index 59%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/AllIncrementalTasksAlmostFinishedParameter.java
index a6f408f..ddff3c3 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/AllIncrementalTasksAlmostFinishedParameter.java
@@ -15,25 +15,23 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.api.detect;
-import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
import java.util.Collection;
-public final class FixtureRuleAlteredJobCompletionDetectAlgorithm implements
RuleAlteredJobCompletionDetectAlgorithm {
-
- @Override
- public void init() {
- }
-
- @Override
- public boolean allIncrementalTasksAlmostFinished(final Collection<Long>
incrementalTaskIdleMinutes) {
- return true;
- }
+/**
+ * All incremental tasks almost finished parameter.
+ */
+@Getter
+@Setter
+@Builder
+@ToString
+public final class AllIncrementalTasksAlmostFinishedParameter {
- @Override
- public String getType() {
- return "FIXTURE";
- }
+ private Collection<Long> incrementalTaskIdleMinutes;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index e670159..252db73 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -87,6 +87,7 @@ public final class FinishedCheckJob implements SimpleJob {
sourceWritingStopAlgorithm.resumeSourceWriting(schemaName, jobId + "");
}
}
+ log.info("job {} finished", jobId);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
index 42a650f..5676b27 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithm.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.Setter;
+import
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
import java.util.Collection;
@@ -51,7 +52,8 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithm implements RuleAl
}
@Override
- public boolean allIncrementalTasksAlmostFinished(final Collection<Long>
incrementalTaskIdleMinutes) {
+ public boolean allIncrementalTasksAlmostFinished(final
AllIncrementalTasksAlmostFinishedParameter parameter) {
+ Collection<Long> incrementalTaskIdleMinutes =
parameter.getIncrementalTaskIdleMinutes();
if (null == incrementalTaskIdleMinutes ||
incrementalTaskIdleMinutes.isEmpty()) {
return false;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
index adda3f7..435596f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobProgressDetector.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -67,7 +68,8 @@ public final class RuleAlteredJobProgressDetector {
return latestActiveTimeMillis > 0 ?
TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - latestActiveTimeMillis) : 0;
})
.collect(Collectors.toList());
- return
completionDetectAlgorithm.allIncrementalTasksAlmostFinished(incrementalTaskIdleMinutes);
+ AllIncrementalTasksAlmostFinishedParameter parameter =
AllIncrementalTasksAlmostFinishedParameter.builder().incrementalTaskIdleMinutes(incrementalTaskIdleMinutes).build();
+ return
completionDetectAlgorithm.allIncrementalTasksAlmostFinished(parameter);
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobCompletionDetectAlgorithm.java
index 52b3357..164035c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobCompletionDetectAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobCompletionDetectAlgorithm.java
@@ -17,23 +17,20 @@
package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
+import
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
-import java.util.Collection;
-
/**
* Rule altered job completion detect algorithm for SPI.
*/
-// TODO extract JobCompletionDetectAlgorithm
public interface RuleAlteredJobCompletionDetectAlgorithm extends
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
/**
* All incremental tasks is almost finished.
*
- * @param incrementalTaskIdleMinutes incremental task idle minutes
+ * @param parameter parameter
* @return Almost finished or not
*/
- // TODO parameter
- boolean allIncrementalTasksAlmostFinished(Collection<Long>
incrementalTaskIdleMinutes);
+ boolean
allIncrementalTasksAlmostFinished(AllIncrementalTasksAlmostFinishedParameter
parameter);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
index a6f408f..33eeb9f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureRuleAlteredJobCompletionDetectAlgorithm.java
@@ -17,10 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
+import
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCompletionDetectAlgorithm;
-import java.util.Collection;
-
public final class FixtureRuleAlteredJobCompletionDetectAlgorithm implements
RuleAlteredJobCompletionDetectAlgorithm {
@Override
@@ -28,7 +27,7 @@ public final class
FixtureRuleAlteredJobCompletionDetectAlgorithm implements Rul
}
@Override
- public boolean allIncrementalTasksAlmostFinished(final Collection<Long>
incrementalTaskIdleMinutes) {
+ public boolean allIncrementalTasksAlmostFinished(final
AllIncrementalTasksAlmostFinishedParameter parameter) {
return true;
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
index 174b323..5e05a1b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
@@ -17,12 +17,12 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+import
org.apache.shardingsphere.data.pipeline.api.detect.AllIncrementalTasksAlmostFinishedParameter;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;
@@ -34,6 +34,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@@ -41,7 +42,7 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@Mock
private Properties propsMock;
- private IdleRuleAlteredJobCompletionDetectAlgorithm detectAlgorithm = new
IdleRuleAlteredJobCompletionDetectAlgorithm();
+ private final IdleRuleAlteredJobCompletionDetectAlgorithm detectAlgorithm
= new IdleRuleAlteredJobCompletionDetectAlgorithm();
@Before
public void setup() throws Exception {
@@ -51,28 +52,28 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@Test(expected = IllegalArgumentException.class)
public void assertInitFailNoIdleThresholdKey() {
-
Mockito.when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(false);
+
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(false);
detectAlgorithm.init();
}
@Test(expected = IllegalArgumentException.class)
public void assertInitFailInvalidIdleThresholdKey() {
-
Mockito.when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-
Mockito.when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("@");
+
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
+
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("@");
detectAlgorithm.init();
}
@Test(expected = IllegalArgumentException.class)
public void assertInitFailNegativeIdleThresholdKey() {
-
Mockito.when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-
Mockito.when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("-8");
+
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
+
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("-8");
detectAlgorithm.init();
}
@Test
public void assertInitSuccess() {
-
Mockito.when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-
Mockito.when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("4");
+
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
+
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("4");
detectAlgorithm.init();
}
@@ -83,21 +84,25 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@Test
public void assertFalseOnNullIncrementalTasks() {
- assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(null));
+ AllIncrementalTasksAlmostFinishedParameter parameter =
AllIncrementalTasksAlmostFinishedParameter.builder().build();
+
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(parameter));
}
@Test
public void assertFalseOnEmptyIncrementalTasks() {
-
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(Collections.emptyList()));
+ AllIncrementalTasksAlmostFinishedParameter parameter =
AllIncrementalTasksAlmostFinishedParameter.builder().incrementalTaskIdleMinutes(Collections.emptyList()).build();
+
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(parameter));
}
@Test
public void assertFalseOnFewPendingIncrementalTasks() {
-
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(Arrays.asList(10L,
50L)));
+ AllIncrementalTasksAlmostFinishedParameter parameter =
AllIncrementalTasksAlmostFinishedParameter.builder().incrementalTaskIdleMinutes(Arrays.asList(10L,
50L)).build();
+
assertFalse(detectAlgorithm.allIncrementalTasksAlmostFinished(parameter));
}
@Test
public void assertTrueWhenAllIncrementalTasksAlmostFinished() {
-
assertTrue(detectAlgorithm.allIncrementalTasksAlmostFinished(Arrays.asList(60L,
50L, 30L)));
+ AllIncrementalTasksAlmostFinishedParameter parameter =
AllIncrementalTasksAlmostFinishedParameter.builder().incrementalTaskIdleMinutes(Arrays.asList(60L,
50L, 30L)).build();
+
assertTrue(detectAlgorithm.allIncrementalTasksAlmostFinished(parameter));
}
}