This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 54bf757a442 Remove unused job completion detect algorithm (#20534)
54bf757a442 is described below

commit 54bf757a442665d0f803844fcf7c973be3c448bc
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Aug 26 11:31:12 2022 +0800

    Remove unused job completion detect algorithm (#20534)
---
 .../RuleAlteredJobAlmostCompletedParameter.java    |  41 --------
 .../spi/detect/JobCompletionDetectAlgorithm.java   |  36 -------
 .../JobCompletionDetectAlgorithmFactory.java       |  57 -----------
 ...dleRuleAlteredJobCompletionDetectAlgorithm.java | 100 --------------------
 .../JobCompletionDetectAlgorithmFixture.java       |  44 ---------
 ...uleAlteredJobCompletionDetectAlgorithmTest.java | 105 ---------------------
 6 files changed, 383 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
deleted file mode 100644
index 3802e453621..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/detect/RuleAlteredJobAlmostCompletedParameter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.detect;
-
-import lombok.Getter;
-import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-
-import java.util.Collection;
-
-/**
- * Rule altered job almost completed parameter.
- */
-@RequiredArgsConstructor
-@Getter
-@ToString
-// TODO now rename
-public final class RuleAlteredJobAlmostCompletedParameter {
-    
-    private final int jobShardingCount;
-    
-    @NonNull
-    private final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses;
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java
deleted file mode 100644
index 01e604194ba..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.detect;
-
-import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
-
-/**
- * Job completion detect algorithm.
- *
- * @param <T> type of completion detect parameter
- */
-public interface JobCompletionDetectAlgorithm<T> extends ShardingSphereAlgorithm {
-    
-    /**
-     * Whether job is almost completed.
-     *
-     * @param parameter parameter
-     * @return almost completed or not
-     */
-    boolean isAlmostCompleted(T parameter);
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithmFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithmFactory.java
deleted file mode 100644
index f76014261ce..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithmFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.detect;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
-import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
-
-/**
- * Job completion detect algorithm factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class JobCompletionDetectAlgorithmFactory {
-    
-    static {
-        ShardingSphereServiceLoader.register(JobCompletionDetectAlgorithm.class);
-    }
-    
-    /**
-     * Create new instance of job completion detect algorithm.
-     *
-     * @param jobCompletionDetectAlgorithmConfig job completion detect algorithm configuration
-     * @return created instance
-     */
-    @SuppressWarnings("rawtypes")
-    public static JobCompletionDetectAlgorithm newInstance(final AlgorithmConfiguration jobCompletionDetectAlgorithmConfig) {
-        return ShardingSphereAlgorithmFactory.createAlgorithm(jobCompletionDetectAlgorithmConfig, JobCompletionDetectAlgorithm.class);
-    }
-    
-    /**
-     * Judge whether contains job completion detect algorithm.
-     *
-     * @param jobCompletionDetectAlgorithmType job completion detect algorithm type
-     * @return contains job completion detect algorithm or not
-     */
-    public static boolean contains(final String jobCompletionDetectAlgorithmType) {
-        return TypedSPIRegistry.findRegisteredService(JobCompletionDetectAlgorithm.class, jobCompletionDetectAlgorithmType).isPresent();
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
deleted file mode 100644
index 6056e09d35d..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi;
-
-import com.google.common.base.Preconditions;
-import lombok.Getter;
-import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
-
-import java.util.Collection;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/**
- * Idle rule altered job completion detect algorithm.
- */
-public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> {
-    
-    private static final String IDLE_SECOND_THRESHOLD_KEY = "incremental-task-idle-seconds-threshold";
-    
-    private static final long DEFAULT_IDLE_SECONDS_THRESHOLD = 1800L;
-    
-    @Getter
-    private Properties props;
-    
-    private volatile long incrementalTaskIdleSecondsThreshold;
-    
-    @Override
-    public void init(final Properties props) {
-        this.props = props;
-        incrementalTaskIdleSecondsThreshold = getIncrementalTaskIdleSecondsThreshold(props);
-    }
-    
-    private long getIncrementalTaskIdleSecondsThreshold(final Properties props) {
-        long result = Long.parseLong(props.getOrDefault(IDLE_SECOND_THRESHOLD_KEY, DEFAULT_IDLE_SECONDS_THRESHOLD).toString());
-        Preconditions.checkArgument(result > 0, "Incremental task idle threshold seconds must be positive.");
-        return result;
-    }
-    
-    @Override
-    public boolean isAlmostCompleted(final RuleAlteredJobAlmostCompletedParameter parameter) {
-        int jobShardingCount = parameter.getJobShardingCount();
-        Collection<InventoryIncrementalJobItemProgress> jobItemProgresses = parameter.getJobItemProgresses();
-        if (!isAllProgressesFilled(jobShardingCount, jobItemProgresses)) {
-            return false;
-        }
-        if (!isAllInventoryTasksCompleted(jobItemProgresses)) {
-            return false;
-        }
-        Collection<Long> incrementalTasksIdleSeconds = getIncrementalTasksIdleSeconds(jobItemProgresses);
-        return incrementalTasksIdleSeconds.stream().allMatch(each -> each >= incrementalTaskIdleSecondsThreshold);
-    }
-    
-    private static boolean isAllProgressesFilled(final int jobShardingCount, final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
-        return jobShardingCount == jobItemProgresses.size() && jobItemProgresses.stream().allMatch(Objects::nonNull);
-    }
-    
-    private static boolean isAllInventoryTasksCompleted(final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
-        return jobItemProgresses.stream().flatMap(each -> each.getInventory().getInventoryTaskProgressMap().values().stream()).allMatch(each -> each.getPosition() instanceof FinishedPosition);
-    }
-    
-    private static Collection<Long> getIncrementalTasksIdleSeconds(final Collection<InventoryIncrementalJobItemProgress> jobItemProgresses) {
-        long currentTimeMillis = System.currentTimeMillis();
-        return jobItemProgresses.stream().flatMap(each -> each.getIncremental().getIncrementalTaskProgressMap().values().stream())
-                .map(each -> {
-                    long latestActiveTimeMillis = each.getIncrementalTaskDelay().getLatestActiveTimeMillis();
-                    return latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0;
-                })
-                .collect(Collectors.toList());
-    }
-    
-    @Override
-    public String getType() {
-        return "IDLE";
-    }
-    
-    @Override
-    public String toString() {
-        return "IdleRuleAlteredJobCompletionDetectAlgorithm{" + "props=" + props + '}';
-    }
-}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/JobCompletionDetectAlgorithmFixture.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/JobCompletionDetectAlgorithmFixture.java
deleted file mode 100644
index 175491a90bf..00000000000
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/JobCompletionDetectAlgorithmFixture.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.fixture;
-
-import lombok.Getter;
-import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
-
-import java.util.Properties;
-
-@Getter
-public final class JobCompletionDetectAlgorithmFixture implements JobCompletionDetectAlgorithm<Object> {
-    
-    private Properties props;
-    
-    @Override
-    public void init(final Properties props) {
-        this.props = props;
-    }
-    
-    @Override
-    public boolean isAlmostCompleted(final Object parameter) {
-        return true;
-    }
-    
-    @Override
-    public String getType() {
-        return "FIXTURE";
-    }
-}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
deleted file mode 100644
index 53281a6838d..00000000000
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
-import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithmFactory;
-import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
-    
-    @Test(expected = IllegalArgumentException.class)
-    public void assertInitFailInvalidIdleThresholdKey() {
-        Properties props = new Properties();
-        props.setProperty("incremental-task-idle-seconds-threshold", "invalid_value");
-        JobCompletionDetectAlgorithmFactory.newInstance(new AlgorithmConfiguration("IDLE", props));
-    }
-    
-    @Test(expected = IllegalArgumentException.class)
-    public void assertInitFailNegativeIdleThresholdKey() {
-        Properties props = new Properties();
-        props.setProperty("incremental-task-idle-seconds-threshold", "-8");
-        JobCompletionDetectAlgorithmFactory.newInstance(new AlgorithmConfiguration("IDLE", props));
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Test
-    public void assertFalseOnFewJobProgresses() {
-        int jobShardingCount = 2;
-        Collection<InventoryIncrementalJobItemProgress> jobItemProgresses = Collections.singleton(new InventoryIncrementalJobItemProgress());
-        RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobItemProgresses);
-        assertFalse(JobCompletionDetectAlgorithmFactory.newInstance(new AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter));
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Test
-    public void assertFalseOnUnFinishedPosition() {
-        int jobShardingCount = 1;
-        InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
-        jobItemProgress.setInventory(new JobItemInventoryTasksProgress(Collections.singletonMap("foo_ds", new InventoryTaskProgress(new PlaceholderPosition()))));
-        RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobShardingCount, Collections.singleton(jobItemProgress));
-        assertFalse(JobCompletionDetectAlgorithmFactory.newInstance(new AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter));
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Test
-    public void assertTrueWhenIdleMinutesNotReach() {
-        int jobShardingCount = 1;
-        long latestActiveTimeMillis = System.currentTimeMillis() - ThreadLocalRandom.current().nextLong(1, 1800L);
-        InventoryIncrementalJobItemProgress jobItemProgress = createJobProgress(latestActiveTimeMillis);
-        RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobShardingCount, Collections.singleton(jobItemProgress));
-        assertFalse(JobCompletionDetectAlgorithmFactory.newInstance(new AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter));
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Test
-    public void assertTrueWhenJobAlmostCompleted() {
-        int jobShardingCount = 1;
-        long latestActiveTimeMillis = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(1800L + 1800L);
-        InventoryIncrementalJobItemProgress jobItemProgress = createJobProgress(latestActiveTimeMillis);
-        RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobShardingCount, Collections.singleton(jobItemProgress));
-        assertTrue(JobCompletionDetectAlgorithmFactory.newInstance(new AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter));
-    }
-    
-    private InventoryIncrementalJobItemProgress createJobProgress(final long latestActiveTimeMillis) {
-        InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress();
-        result.setInventory(new JobItemInventoryTasksProgress(Collections.singletonMap("foo_ds", new InventoryTaskProgress(new FinishedPosition()))));
-        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
-        incrementalTaskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(latestActiveTimeMillis);
-        JobItemIncrementalTasksProgress incremental = new JobItemIncrementalTasksProgress(Collections.singletonMap("foo_ds", incrementalTaskProgress));
-        result.setIncremental(incremental);
-        return result;
-    }
-}

Reply via email to