This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 54bf757a442 Remove unused job completion detect algorithm (#20534)
54bf757a442 is described below
commit 54bf757a442665d0f803844fcf7c973be3c448bc
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Aug 26 11:31:12 2022 +0800
Remove unused job completion detect algorithm (#20534)
---
.../RuleAlteredJobAlmostCompletedParameter.java | 41 --------
.../spi/detect/JobCompletionDetectAlgorithm.java | 36 -------
.../JobCompletionDetectAlgorithmFactory.java | 57 -----------
...dleRuleAlteredJobCompletionDetectAlgorithm.java | 100 --------------------
.../JobCompletionDetectAlgorithmFixture.java | 44 ---------
...uleAlteredJobCompletionDetectAlgorithmTest.java | 105 ---------------------
6 files changed, 383 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
deleted file mode 100644
index 3802e453621..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.detect;
-
-import lombok.Getter;
-import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-
-import java.util.Collection;
-
-/**
- * Rule altered job almost completed parameter.
- */
-@RequiredArgsConstructor
-@Getter
-@ToString
-// TODO now rename
-public final class RuleAlteredJobAlmostCompletedParameter {
-
- private final int jobShardingCount;
-
- @NonNull
- private final Collection<InventoryIncrementalJobItemProgress>
jobItemProgresses;
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java
deleted file mode 100644
index 01e604194ba..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.detect;
-
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
-
-/**
- * Job completion detect algorithm.
- *
- * @param <T> type of completion detect parameter
- */
-public interface JobCompletionDetectAlgorithm<T> extends
ShardingSphereAlgorithm {
-
- /**
- * Whether job is almost completed.
- *
- * @param parameter parameter
- * @return almost completed or not
- */
- boolean isAlmostCompleted(T parameter);
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithmFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithmFactory.java
deleted file mode 100644
index f76014261ce..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithmFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.detect;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
-import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
-
-/**
- * Job completion detect algorithm factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class JobCompletionDetectAlgorithmFactory {
-
- static {
-
ShardingSphereServiceLoader.register(JobCompletionDetectAlgorithm.class);
- }
-
- /**
- * Create new instance of job completion detect algorithm.
- *
- * @param jobCompletionDetectAlgorithmConfig job completion detect
algorithm configuration
- * @return created instance
- */
- @SuppressWarnings("rawtypes")
- public static JobCompletionDetectAlgorithm newInstance(final
AlgorithmConfiguration jobCompletionDetectAlgorithmConfig) {
- return
ShardingSphereAlgorithmFactory.createAlgorithm(jobCompletionDetectAlgorithmConfig,
JobCompletionDetectAlgorithm.class);
- }
-
- /**
- * Judge whether contains job completion detect algorithm.
- *
- * @param jobCompletionDetectAlgorithmType job completion detect algorithm
type
- * @return contains job completion detect algorithm or not
- */
- public static boolean contains(final String
jobCompletionDetectAlgorithmType) {
- return
TypedSPIRegistry.findRegisteredService(JobCompletionDetectAlgorithm.class,
jobCompletionDetectAlgorithmType).isPresent();
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
deleted file mode 100644
index 6056e09d35d..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi;
-
-import com.google.common.base.Preconditions;
-import lombok.Getter;
-import
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
-
-import java.util.Collection;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/**
- * Idle rule altered job completion detect algorithm.
- */
-public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> {
-
- private static final String IDLE_SECOND_THRESHOLD_KEY =
"incremental-task-idle-seconds-threshold";
-
- private static final long DEFAULT_IDLE_SECONDS_THRESHOLD = 1800L;
-
- @Getter
- private Properties props;
-
- private volatile long incrementalTaskIdleSecondsThreshold;
-
- @Override
- public void init(final Properties props) {
- this.props = props;
- incrementalTaskIdleSecondsThreshold =
getIncrementalTaskIdleSecondsThreshold(props);
- }
-
- private long getIncrementalTaskIdleSecondsThreshold(final Properties
props) {
- long result =
Long.parseLong(props.getOrDefault(IDLE_SECOND_THRESHOLD_KEY,
DEFAULT_IDLE_SECONDS_THRESHOLD).toString());
- Preconditions.checkArgument(result > 0, "Incremental task idle
threshold seconds must be positive.");
- return result;
- }
-
- @Override
- public boolean isAlmostCompleted(final
RuleAlteredJobAlmostCompletedParameter parameter) {
- int jobShardingCount = parameter.getJobShardingCount();
- Collection<InventoryIncrementalJobItemProgress> jobItemProgresses =
parameter.getJobItemProgresses();
- if (!isAllProgressesFilled(jobShardingCount, jobItemProgresses)) {
- return false;
- }
- if (!isAllInventoryTasksCompleted(jobItemProgresses)) {
- return false;
- }
- Collection<Long> incrementalTasksIdleSeconds =
getIncrementalTasksIdleSeconds(jobItemProgresses);
- return incrementalTasksIdleSeconds.stream().allMatch(each -> each >=
incrementalTaskIdleSecondsThreshold);
- }
-
- private static boolean isAllProgressesFilled(final int jobShardingCount,
final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
- return jobShardingCount == jobItemProgresses.size() &&
jobItemProgresses.stream().allMatch(Objects::nonNull);
- }
-
- private static boolean isAllInventoryTasksCompleted(final
Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
- return jobItemProgresses.stream().flatMap(each ->
each.getInventory().getInventoryTaskProgressMap().values().stream()).allMatch(each
-> each.getPosition() instanceof FinishedPosition);
- }
-
- private static Collection<Long> getIncrementalTasksIdleSeconds(final
Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
- long currentTimeMillis = System.currentTimeMillis();
- return jobItemProgresses.stream().flatMap(each ->
each.getIncremental().getIncrementalTaskProgressMap().values().stream())
- .map(each -> {
- long latestActiveTimeMillis =
each.getIncrementalTaskDelay().getLatestActiveTimeMillis();
- return latestActiveTimeMillis > 0 ?
TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0;
- })
- .collect(Collectors.toList());
- }
-
- @Override
- public String getType() {
- return "IDLE";
- }
-
- @Override
- public String toString() {
- return "IdleRuleAlteredJobCompletionDetectAlgorithm{" + "props=" +
props + '}';
- }
-}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/JobCompletionDetectAlgorithmFixture.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/JobCompletionDetectAlgorithmFixture.java
deleted file mode 100644
index 175491a90bf..00000000000
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/JobCompletionDetectAlgorithmFixture.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.fixture;
-
-import lombok.Getter;
-import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
-
-import java.util.Properties;
-
-@Getter
-public final class JobCompletionDetectAlgorithmFixture implements
JobCompletionDetectAlgorithm<Object> {
-
- private Properties props;
-
- @Override
- public void init(final Properties props) {
- this.props = props;
- }
-
- @Override
- public boolean isAlmostCompleted(final Object parameter) {
- return true;
- }
-
- @Override
- public String getType() {
- return "FIXTURE";
- }
-}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
deleted file mode 100644
index 53281a6838d..00000000000
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-import
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
-import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
-import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithmFactory;
-import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
-
- @Test(expected = IllegalArgumentException.class)
- public void assertInitFailInvalidIdleThresholdKey() {
- Properties props = new Properties();
- props.setProperty("incremental-task-idle-seconds-threshold",
"invalid_value");
- JobCompletionDetectAlgorithmFactory.newInstance(new
AlgorithmConfiguration("IDLE", props));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void assertInitFailNegativeIdleThresholdKey() {
- Properties props = new Properties();
- props.setProperty("incremental-task-idle-seconds-threshold", "-8");
- JobCompletionDetectAlgorithmFactory.newInstance(new
AlgorithmConfiguration("IDLE", props));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void assertFalseOnFewJobProgresses() {
- int jobShardingCount = 2;
- Collection<InventoryIncrementalJobItemProgress> jobItemProgresses =
Collections.singleton(new InventoryIncrementalJobItemProgress());
- RuleAlteredJobAlmostCompletedParameter parameter = new
RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobItemProgresses);
- assertFalse(JobCompletionDetectAlgorithmFactory.newInstance(new
AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void assertFalseOnUnFinishedPosition() {
- int jobShardingCount = 1;
- InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
- jobItemProgress.setInventory(new
JobItemInventoryTasksProgress(Collections.singletonMap("foo_ds", new
InventoryTaskProgress(new PlaceholderPosition()))));
- RuleAlteredJobAlmostCompletedParameter parameter = new
RuleAlteredJobAlmostCompletedParameter(jobShardingCount,
Collections.singleton(jobItemProgress));
- assertFalse(JobCompletionDetectAlgorithmFactory.newInstance(new
AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void assertTrueWhenIdleMinutesNotReach() {
- int jobShardingCount = 1;
- long latestActiveTimeMillis = System.currentTimeMillis() -
ThreadLocalRandom.current().nextLong(1, 1800L);
- InventoryIncrementalJobItemProgress jobItemProgress =
createJobProgress(latestActiveTimeMillis);
- RuleAlteredJobAlmostCompletedParameter parameter = new
RuleAlteredJobAlmostCompletedParameter(jobShardingCount,
Collections.singleton(jobItemProgress));
- assertFalse(JobCompletionDetectAlgorithmFactory.newInstance(new
AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void assertTrueWhenJobAlmostCompleted() {
- int jobShardingCount = 1;
- long latestActiveTimeMillis = System.currentTimeMillis() -
TimeUnit.MINUTES.toMillis(1800L + 1800L);
- InventoryIncrementalJobItemProgress jobItemProgress =
createJobProgress(latestActiveTimeMillis);
- RuleAlteredJobAlmostCompletedParameter parameter = new
RuleAlteredJobAlmostCompletedParameter(jobShardingCount,
Collections.singleton(jobItemProgress));
- assertTrue(JobCompletionDetectAlgorithmFactory.newInstance(new
AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter));
- }
-
- private InventoryIncrementalJobItemProgress createJobProgress(final long
latestActiveTimeMillis) {
- InventoryIncrementalJobItemProgress result = new
InventoryIncrementalJobItemProgress();
- result.setInventory(new
JobItemInventoryTasksProgress(Collections.singletonMap("foo_ds", new
InventoryTaskProgress(new FinishedPosition()))));
- IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress();
-
incrementalTaskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(latestActiveTimeMillis);
- JobItemIncrementalTasksProgress incremental = new
JobItemIncrementalTasksProgress(Collections.singletonMap("foo_ds",
incrementalTaskProgress));
- result.setIncremental(incremental);
- return result;
- }
-}