This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4335a55 Add second level threshold for IDLE completion detect algorithm (#15480)
4335a55 is described below
commit 4335a55ef5e73dd011960caf3d9f82bf41ae7c78
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Feb 18 11:51:11 2022 +0800
Add second level threshold for IDLE completion detect algorithm (#15480)
* Refactor PipelineSimpleLock
* Increase FinishedCheckJob execution frequency
* Add incremental-task-idle-second-threshold for IDLE completion detector
* Change scaling status column name to incremental_idle_seconds
* Increase job progress persist frequency
---
.../user-manual/shardingsphere-scaling/usage.cn.md | 2 +-
.../user-manual/shardingsphere-scaling/usage.en.md | 2 +-
.../ShowScalingJobStatusQueryResultSet.java | 4 +-
.../core/constant/DataPipelineConstants.java | 9 +++-
.../core/execute/FinishedCheckJobExecutor.java | 3 +-
.../pipeline/core/execute/PipelineJobExecutor.java | 4 +-
...DistributeLock.java => PipelineSimpleLock.java} | 48 +++++++++++++---------
.../rulealtered/RuleAlteredJobSchedulerCenter.java | 3 +-
.../scenario/rulealtered/RuleAlteredJobWorker.java | 4 +-
...dleRuleAlteredJobCompletionDetectAlgorithm.java | 26 ++++++++----
...uleAlteredJobCompletionDetectAlgorithmTest.java | 19 +++++----
11 files changed, 74 insertions(+), 50 deletions(-)
diff --git
a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
index 2643706..e9b9b7b 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
@@ -234,7 +234,7 @@ show scaling status {jobId};
```
mysql> show scaling status 660152090995195904;
+------+-------------+----------+-------------------------------+--------------------------+
-| item | data_source | status | inventory_finished_percentage | incremental_idle_minutes |
+| item | data_source | status | inventory_finished_percentage | incremental_idle_seconds |
+------+-------------+----------+-------------------------------+--------------------------+
| 0 | ds_1 | FINISHED | 100 | 2834 |
| 1 | ds_0 | FINISHED | 100 | 2834 |
diff --git
a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index 5d34402..e48d66a 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -236,7 +236,7 @@ Response:
```
mysql> show scaling status 660152090995195904;
+------+-------------+----------+-------------------------------+--------------------------+
-| item | data_source | status | inventory_finished_percentage | incremental_idle_minutes |
+| item | data_source | status | inventory_finished_percentage | incremental_idle_seconds |
+------+-------------+----------+-------------------------------+--------------------------+
| 0 | ds_1 | FINISHED | 100 | 2834 |
| 1 | ds_0 | FINISHED | 100 | 2834 |
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
index 254e954..9528d01 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
@@ -53,7 +53,7 @@ public final class ShowScalingJobStatusQueryResultSet
implements DistSQLResultSe
list.add(entry.getValue().isActive() ? "true" :
"false");
list.add(entry.getValue().getInventoryFinishedPercentage());
long latestActiveTimeMillis =
entry.getValue().getIncrementalLatestActiveTimeMillis();
- list.add(latestActiveTimeMillis > 0 ?
TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - latestActiveTimeMillis) :
0);
+ list.add(latestActiveTimeMillis > 0 ?
TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) :
0);
} else {
list.add("");
list.add("");
@@ -67,7 +67,7 @@ public final class ShowScalingJobStatusQueryResultSet
implements DistSQLResultSe
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("item", "data_source", "status", "active",
"inventory_finished_percentage", "incremental_idle_minutes");
+ return Arrays.asList("item", "data_source", "status", "active",
"inventory_finished_percentage", "incremental_idle_seconds");
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
index 7933f9a..8a7e550 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
@@ -27,8 +27,13 @@ import lombok.NoArgsConstructor;
public final class DataPipelineConstants {
/**
+ * Data pipeline node name.
+ */
+ // TODO change to pipeline after job configuration structure completed
+ public static final String DATA_PIPELINE_NODE_NAME = "scaling";
+
+ /**
* Data pipeline root path.
*/
- // TODO change to /pipeline after job configuration structure completed
- public static final String DATA_PIPELINE_ROOT = "/scaling";
+ public static final String DATA_PIPELINE_ROOT = "/" +
DATA_PIPELINE_NODE_NAME;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
index 2f287e9..5d5f6c3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
@@ -32,10 +32,11 @@ public final class FinishedCheckJobExecutor extends
AbstractLifecycleExecutor {
private static final String JOB_NAME = "_finished_check";
- private static final String CRON_EXPRESSION = "0 * * * * ?";
+ private static final String CRON_EXPRESSION = "*/10 * * * * ?";
@Override
protected void doStart() {
+ // TODO refactor it and FinishedCheck after ejob support non-cron job
new ScheduleJobBootstrap(PipelineAPIFactory.getRegistryCenter(), new
FinishedCheckJob(), createJobConfig()).schedule();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 8fa0e7d..e1f7b71 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfigu
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import
org.apache.shardingsphere.data.pipeline.core.lock.ScalingSchemaNameDistributeLock;
+import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -72,7 +72,7 @@ public final class PipelineJobExecutor extends
AbstractLifecycleExecutor {
case ADDED:
case UPDATED:
JobConfiguration jobConfig =
YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class,
true);
- if
(ScalingSchemaNameDistributeLock.getInstance().tryLock(jobConfig.getWorkflowConfig().getSchemaName(),
1000)) {
+ if
(PipelineSimpleLock.getInstance().tryLock(jobConfig.getWorkflowConfig().getSchemaName(),
1000)) {
execute(jobConfigPOJO);
}
break;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/ScalingSchemaNameDistributeLock.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
similarity index 60%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/ScalingSchemaNameDistributeLock.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
index 932f62f..ac9422b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/ScalingSchemaNameDistributeLock.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
@@ -18,38 +18,46 @@
package org.apache.shardingsphere.data.pipeline.core.lock;
import com.google.common.collect.Maps;
+import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Map;
+import java.util.Optional;
/**
- * Distributed locks added to the schema name during scaling.
+ * Pipeline simple lock.
*/
-public final class ScalingSchemaNameDistributeLock {
+// TODO extract interface and factory
+public final class PipelineSimpleLock {
- private static volatile ScalingSchemaNameDistributeLock instance;
+ private static volatile PipelineSimpleLock instance;
private final LockRegistryService lockRegistryService;
private final Map<String, Boolean> lockNameLockedMap;
- private ScalingSchemaNameDistributeLock() {
- ClusterPersistRepository repository = (ClusterPersistRepository)
PipelineContext.getContextManager().getMetaDataContexts().getMetaDataPersistService().get().getRepository();
+ private PipelineSimpleLock() {
+ Optional<MetaDataPersistService> persistServiceOptional =
PipelineContext.getContextManager().getMetaDataContexts().getMetaDataPersistService();
+ persistServiceOptional.orElseThrow(() -> new RuntimeException("Could
not get metadata persist service"));
+ // TODO Use PersistRepository later
+ ClusterPersistRepository repository = (ClusterPersistRepository)
persistServiceOptional.get().getRepository();
lockRegistryService = new LockRegistryService(repository);
lockNameLockedMap = Maps.newConcurrentMap();
}
/**
- * get ScalingSchemaNameDistributeLock instance.
- * @return ScalingSchemaNameDistributeLock
+ * Get instance.
+ *
+ * @return instance
*/
- public static ScalingSchemaNameDistributeLock getInstance() {
+ public static PipelineSimpleLock getInstance() {
if (null == instance) {
- synchronized (ScalingSchemaNameDistributeLock.class) {
+ synchronized (PipelineSimpleLock.class) {
if (null == instance) {
- instance = new ScalingSchemaNameDistributeLock();
+ instance = new PipelineSimpleLock();
}
}
}
@@ -57,21 +65,23 @@ public final class ScalingSchemaNameDistributeLock {
}
/**
- * Try to get lock.
+ * Try to lock.
+ *
* @param lockName lock name
- * @param timeoutMilliseconds the maximum time in milliseconds to acquire
lock
- * @return true if get the lock, false if not
+ * @param timeoutMills the maximum time in milliseconds to acquire lock
+ * @return true if lock got, else false
*/
- public boolean tryLock(final String lockName, final long
timeoutMilliseconds) {
- boolean locked =
lockRegistryService.tryLock(decorateLockName(lockName), timeoutMilliseconds);
- if (locked) {
+ public boolean tryLock(final String lockName, final long timeoutMills) {
+ boolean result =
lockRegistryService.tryLock(decorateLockName(lockName), timeoutMills);
+ if (result) {
lockNameLockedMap.put(lockName, true);
}
- return locked;
+ return result;
}
/**
* Release lock.
+ *
* @param lockName lock name
*/
public void releaseLock(final String lockName) {
@@ -81,7 +91,7 @@ public final class ScalingSchemaNameDistributeLock {
}
}
- private String decorateLockName(final String schemaName) {
- return "Scaling-" + schemaName;
+ private String decorateLockName(final String lockName) {
+ return DataPipelineConstants.DATA_PIPELINE_NODE_NAME + "-" + lockName;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 243fe5b..fdaa411 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -46,8 +46,7 @@ public final class RuleAlteredJobSchedulerCenter {
private static final GovernanceRepositoryAPI REGISTRY_REPOSITORY_API =
PipelineAPIFactory.getGovernanceRepositoryAPI();
static {
- // TODO it's too slow to persist job progress
- JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new
PersistJobContextRunnable(), 1, 1, TimeUnit.MINUTES);
+ JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new
PersistJobContextRunnable(), 10, 10, TimeUnit.SECONDS);
}
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index b13fd21..4c83bfe 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -35,7 +35,7 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
-import
org.apache.shardingsphere.data.pipeline.core.lock.ScalingSchemaNameDistributeLock;
+import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
@@ -326,6 +326,6 @@ public final class RuleAlteredJobWorker {
*/
@Subscribe
public void scalingReleaseSchemaNameLock(final
ScalingReleaseSchemaNameLockEvent event) {
-
ScalingSchemaNameDistributeLock.getInstance().releaseLock(event.getSchemaName());
+ PipelineSimpleLock.getInstance().releaseLock(event.getSchemaName());
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
index c6af445..d1edd90 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
@@ -35,12 +35,16 @@ import java.util.stream.Collectors;
*/
public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> {
- public static final String IDLE_THRESHOLD_KEY =
"incremental-task-idle-minute-threshold";
+ public static final String IDLE_MINUTE_THRESHOLD_KEY =
"incremental-task-idle-minute-threshold";
+
+ public static final String IDLE_SECOND_THRESHOLD_KEY =
"incremental-task-idle-second-threshold";
+
+ public static final long DEFAULT_IDLE_SECOND_THRESHOLD =
TimeUnit.MINUTES.toSeconds(30);
private Properties props = new Properties();
@Getter
- private long incrementalTaskIdleMinuteThreshold = 30;
+ private long incrementalTaskIdleSecondThreshold =
DEFAULT_IDLE_SECOND_THRESHOLD;
@Override
public Properties getProps() {
@@ -54,9 +58,13 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
@Override
public void init() {
- Preconditions.checkArgument(props.containsKey(IDLE_THRESHOLD_KEY), "%s
can not be null.", IDLE_THRESHOLD_KEY);
- incrementalTaskIdleMinuteThreshold =
Long.parseLong(props.getProperty(IDLE_THRESHOLD_KEY));
- Preconditions.checkArgument(incrementalTaskIdleMinuteThreshold > 0,
"%s value must be positive.", IDLE_THRESHOLD_KEY);
+
Preconditions.checkArgument(props.containsKey(IDLE_MINUTE_THRESHOLD_KEY) ||
props.containsKey(IDLE_SECOND_THRESHOLD_KEY), "incremental task idle threshold
can not be null.");
+ if (props.containsKey(IDLE_SECOND_THRESHOLD_KEY)) {
+ incrementalTaskIdleSecondThreshold =
Long.parseLong(props.getProperty(IDLE_SECOND_THRESHOLD_KEY));
+ } else {
+ incrementalTaskIdleSecondThreshold =
TimeUnit.MINUTES.toSeconds(Long.parseLong(props.getProperty(IDLE_MINUTE_THRESHOLD_KEY)));
+ }
+ Preconditions.checkArgument(incrementalTaskIdleSecondThreshold > 0,
"incremental task idle threshold must be positive.");
}
@Override
@@ -74,8 +82,8 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
if (!isAllInventoryTasksCompleted(jobProgresses)) {
return false;
}
- Collection<Long> incrementalTasksIdleMinutes =
getIncrementalTasksIdleMinutes(jobProgresses);
- return incrementalTasksIdleMinutes.stream().allMatch(idleMinute ->
idleMinute >= incrementalTaskIdleMinuteThreshold);
+ Collection<Long> incrementalTasksIdleSeconds =
getIncrementalTasksIdleSeconds(jobProgresses);
+ return incrementalTasksIdleSeconds.stream().allMatch(each -> each >=
incrementalTaskIdleSecondThreshold);
}
private static boolean isAllProgressesFilled(final int jobShardingCount,
final Collection<JobProgress> jobProgresses) {
@@ -89,12 +97,12 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
.allMatch(each -> each.getPosition() instanceof
FinishedPosition);
}
- private static Collection<Long> getIncrementalTasksIdleMinutes(final
Collection<JobProgress> jobProgresses) {
+ private static Collection<Long> getIncrementalTasksIdleSeconds(final
Collection<JobProgress> jobProgresses) {
long currentTimeMillis = System.currentTimeMillis();
return jobProgresses.stream().flatMap(each ->
each.getIncrementalTaskProgressMap().values().stream())
.map(each -> {
long latestActiveTimeMillis =
each.getIncrementalTaskDelay().getLatestActiveTimeMillis();
- return latestActiveTimeMillis > 0 ?
TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - latestActiveTimeMillis) : 0;
+ return latestActiveTimeMillis > 0 ?
TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0;
})
.collect(Collectors.toList());
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
index ce34975..62a4e03 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
@@ -62,28 +62,28 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@Test(expected = IllegalArgumentException.class)
public void assertInitFailNoIdleThresholdKey() {
-
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(false);
+
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(false);
detectAlgorithm.init();
}
@Test(expected = IllegalArgumentException.class)
public void assertInitFailInvalidIdleThresholdKey() {
-
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("@");
+
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(true);
+
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn("@");
detectAlgorithm.init();
}
@Test(expected = IllegalArgumentException.class)
public void assertInitFailNegativeIdleThresholdKey() {
-
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("-8");
+
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(true);
+
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn("-8");
detectAlgorithm.init();
}
@Test
public void assertInitSuccess() {
-
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("4");
+
when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(true);
+
when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn("4");
detectAlgorithm.init();
}
@@ -115,7 +115,7 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@Test
public void assertTrueWhenIdleMinutesNotReach() {
int jobShardingCount = 1;
- long latestActiveTimeMillis = System.currentTimeMillis() -
ThreadLocalRandom.current().nextLong(1,
detectAlgorithm.getIncrementalTaskIdleMinuteThreshold());
+ long latestActiveTimeMillis = System.currentTimeMillis() -
ThreadLocalRandom.current().nextLong(1,
detectAlgorithm.getIncrementalTaskIdleSecondThreshold());
JobProgress jobProgress = createJobProgress(latestActiveTimeMillis);
Collection<JobProgress> jobProgresses =
Collections.singleton(jobProgress);
RuleAlteredJobAlmostCompletedParameter parameter = new
RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobProgresses);
@@ -138,7 +138,8 @@ public final class
IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@Test
public void assertTrueWhenJobAlmostCompleted() {
int jobShardingCount = 1;
- long latestActiveTimeMillis = System.currentTimeMillis() -
TimeUnit.MINUTES.toMillis(detectAlgorithm.getIncrementalTaskIdleMinuteThreshold()
+ 5);
+ long latestActiveTimeMillis = System.currentTimeMillis() -
TimeUnit.MINUTES.toMillis(detectAlgorithm.getIncrementalTaskIdleSecondThreshold()
+ +
IdleRuleAlteredJobCompletionDetectAlgorithm.DEFAULT_IDLE_SECOND_THRESHOLD);
JobProgress jobProgress = createJobProgress(latestActiveTimeMillis);
Collection<JobProgress> jobProgresses =
Collections.singleton(jobProgress);
RuleAlteredJobAlmostCompletedParameter parameter = new
RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobProgresses);