This is an automated email from the ASF dual-hosted git repository. rohit pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to refs/heads/main by this push: new 63a0797b180 Introduce scheduled executor wrapper with dynamic interval (#8916) 63a0797b180 is described below commit 63a0797b18046397961c620a39931552d90e0588 Author: Vishesh <vishes...@gmail.com> AuthorDate: Wed Apr 17 15:15:37 2024 +0530 Introduce scheduled executor wrapper with dynamic interval (#8916) * Introduce scheduled executor wrapper with dynamic interval * Add validation for configkey --- .../java/com/cloud/user/ResourceLimitService.java | 2 +- .../config/ConfigKeyScheduledExecutionWrapper.java | 114 +++++++++++++ .../ConfigKeyScheduledExecutionWrapperTest.java | 177 +++++++++++++++++++++ .../resourcelimit/ResourceLimitManagerImpl.java | 10 +- 4 files changed, 297 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/com/cloud/user/ResourceLimitService.java b/api/src/main/java/com/cloud/user/ResourceLimitService.java index 0a64cbb7440..04560df428f 100644 --- a/api/src/main/java/com/cloud/user/ResourceLimitService.java +++ b/api/src/main/java/com/cloud/user/ResourceLimitService.java @@ -38,7 +38,7 @@ public interface ResourceLimitService { static final ConfigKey<Long> MaxProjectSecondaryStorage = new ConfigKey<>("Project Defaults", Long.class, "max.project.secondary.storage", "400", "The default maximum secondary storage space (in GiB) that can be used for a project", false); static final ConfigKey<Long> ResourceCountCheckInterval = new ConfigKey<>("Advanced", Long.class, "resourcecount.check.interval", "300", - "Time (in seconds) to wait before running resource recalculation and fixing task. Default is 300 seconds, Setting this to 0 disables execution of the task", false); + "Time (in seconds) to wait before running resource recalculation and fixing task. Default is 300 seconds, Setting this to 0 disables execution of the task", true); static final ConfigKey<String> ResourceLimitHostTags = new ConfigKey<>("Advanced", String.class, "resource.limit.host.tags", "", "A comma-separated list of tags for host resource limits", true); static final ConfigKey<String> ResourceLimitStorageTags = new ConfigKey<>("Advanced", String.class, "resource.limit.storage.tags", "", diff --git a/framework/config/src/main/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapper.java b/framework/config/src/main/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapper.java new file mode 100644 index 00000000000..b8d7e782971 --- /dev/null +++ b/framework/config/src/main/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapper.java @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.config; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Uses a ScheduledExecutorService and config key to execute a runnable, + * dynamically rescheduling based on the long value of the config key. + * Timing is similar to ScheduledExecutorService.scheduleAtFixedRate(), + * but we look up the next runtime dynamically via the config key. + * <p> + * If config key is zero, this disables the execution. We skip execution + * and check once a minute in order to re-start execution if re-enabled. + */ +public class ConfigKeyScheduledExecutionWrapper implements Runnable { + protected Logger logger = LogManager.getLogger(getClass()); + private final ScheduledExecutorService executorService; + private final Runnable command; + private final ConfigKey<?> configKey; + private final TimeUnit unit; + private long enableIntervalSeconds = 60; + + private void validateArgs(ScheduledExecutorService executorService, Runnable command, ConfigKey<?> configKey) { + if (executorService == null) { + throw new IllegalArgumentException("ExecutorService cannot be null"); + } + if (command == null) { + throw new IllegalArgumentException("Command cannot be null"); + } + if (configKey == null) { + throw new IllegalArgumentException("ConfigKey cannot be null"); + } + if (!(configKey.value() instanceof Long || configKey.value() instanceof Integer)) { + throw new IllegalArgumentException("ConfigKey value must be a Long or Integer"); + } + } + + public ConfigKeyScheduledExecutionWrapper(ScheduledExecutorService executorService, Runnable command, + ConfigKey<?> configKey, TimeUnit unit) { + validateArgs(executorService, command, configKey); + this.executorService = executorService; + this.command = command; + this.configKey = configKey; + this.unit = unit; + } + + protected ConfigKeyScheduledExecutionWrapper(ScheduledExecutorService executorService, Runnable command, + ConfigKey<?> configKey, int enableIntervalSeconds, TimeUnit unit) { + validateArgs(executorService, command, configKey); + this.executorService = executorService; + this.command = command; + this.configKey = configKey; + this.unit = unit; + this.enableIntervalSeconds = enableIntervalSeconds; + } + + public ScheduledFuture<?> start() { + long duration = getConfigValue(); + duration = duration < 0 ? 0 : duration; + return this.executorService.schedule(this, duration, this.unit); + } + + long getConfigValue() { + if (this.configKey.value() instanceof Long) { + return (Long) this.configKey.value(); + } else if (this.configKey.value() instanceof Integer) { + return (Integer) this.configKey.value(); + } else { + throw new IllegalArgumentException("ConfigKey value must be a Long or Integer"); + } + } + + @Override + public void run() { + if (getConfigValue() <= 0) { + executorService.schedule(this, enableIntervalSeconds, TimeUnit.SECONDS); + return; + } + + long startTime = System.nanoTime(); + try { + command.run(); + } catch (Throwable t) { + logger.warn(String.format("Last run of %s encountered an error", this.command.getClass()), t); + } finally { + long elapsed = System.nanoTime() - startTime; + long delay = this.unit.toNanos(getConfigValue()) - elapsed; + delay = delay > 0 ? delay : 0; + executorService.schedule(this, delay, NANOSECONDS); + } + } +} diff --git a/framework/config/src/test/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapperTest.java b/framework/config/src/test/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapperTest.java new file mode 100644 index 00000000000..fbb4dc24fca --- /dev/null +++ b/framework/config/src/test/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapperTest.java @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.config; + +import com.cloud.utils.concurrency.NamedThreadFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isOneOf; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ConfigKeyScheduledExecutionWrapperTest { + private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new NamedThreadFactory("TestExecutor")); + + @Mock + ConfigKey<Integer> configKey; + + @Test(expected = IllegalArgumentException.class) + public void nullExecutorTest() { + TestRunnable runnable = new TestRunnable(); + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(null, runnable, configKey, TimeUnit.SECONDS); + } + + @Test(expected = IllegalArgumentException.class) + public void nullCommandTest() { + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, null, configKey, TimeUnit.SECONDS); + } + + @Test(expected = IllegalArgumentException.class) + public void nullConfigKeyTest() { + TestRunnable runnable = new TestRunnable(); + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, null, TimeUnit.SECONDS); + } + + @Test(expected = IllegalArgumentException.class) + public void invalidConfigKeyTest() { + TestRunnable runnable = new TestRunnable(); + ConfigKey<String> configKey = new ConfigKey<>(String.class, "test", "test", "test", "test", true, + ConfigKey.Scope.Global, null, null, null, null, null, ConfigKey.Kind.CSV, null); + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.SECONDS); + } + + @Test + public void scheduleOncePerSecondTest() { + when(configKey.value()).thenReturn(1); + TestRunnable runnable = new TestRunnable(); + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.SECONDS); + runner.start(); + + waitSeconds(3); + assertThat("Runnable ran once per second", runnable.getRunCount(), isOneOf(2, 3)); + } + + private void waitSeconds(int seconds) { + try { + Thread.sleep(seconds * 1000L + 100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Test + public void scheduleTwicePerSecondTest() { + when(configKey.value()).thenReturn(500); + TestRunnable runnable = new TestRunnable(); + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS); + runner.start(); + + waitSeconds(2); + assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4)); + } + + @Test + public void scheduleDynamicTest() { + // start with twice per second, then switch to four times per second + when(configKey.value()).thenReturn(500); + TestRunnable runnable = new TestRunnable(); + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS); + runner.start(); + + waitSeconds(2); + assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4)); + + runnable.resetRunCount(); + when(configKey.value()).thenReturn(250); + waitSeconds(2); + assertThat("Runnable ran four times per second", runnable.getRunCount(), isOneOf(7, 8)); + } + + @Test + public void noOverlappingRunsTest() { + when(configKey.value()).thenReturn(200); + TestRunnable runnable = new TestRunnable(1); + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS); + runner.start(); + + waitSeconds(3); + assertThat("Slow runnable on tight schedule runs without overlap", runnable.getRunCount(), isOneOf(2, 3)); + } + + @Test + public void temporaryDisableRunsTest() { + // start with twice per second, then disable, then start again + when(configKey.value()).thenReturn(500); + TestRunnable runnable = new TestRunnable(); + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, 1, TimeUnit.MILLISECONDS); + runner.start(); + + waitSeconds(2); + assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4)); + + runnable.resetRunCount(); + when(configKey.value()).thenReturn(0); + waitSeconds(2); + assertThat("Runnable ran zero times per second", runnable.getRunCount(), is(0)); + + runnable.resetRunCount(); + when(configKey.value()).thenReturn(500); + waitSeconds(2); + assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4)); + } + + static class TestRunnable implements Runnable { + private Integer runCount = 0; + private int waitSeconds = 0; + + TestRunnable(int waitSeconds) { + this.waitSeconds = waitSeconds; + } + + TestRunnable() { + } + + @Override + public void run() { + runCount++; + if (waitSeconds > 0) { + try { + Thread.sleep(waitSeconds * 1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + public int getRunCount() { + return this.runCount; + } + + public void resetRunCount() { + this.runCount = 0; + } + } +} diff --git a/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java b/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java index 11ebc6da251..7962b380adc 100644 --- a/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java +++ b/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java @@ -43,6 +43,7 @@ import org.apache.cloudstack.api.response.TaggedResourceLimitAndCountResponse; import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine; import org.apache.cloudstack.framework.config.ConfigKey; +import org.apache.cloudstack.framework.config.ConfigKeyScheduledExecutionWrapper; import org.apache.cloudstack.framework.config.Configurable; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; import org.apache.cloudstack.managed.context.ManagedContextRunnable; @@ -197,7 +198,6 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim protected SearchBuilder<ResourceCountVO> ResourceCountSearch; ScheduledExecutorService _rcExecutor; - long _resourceCountCheckInterval = 0; Map<String, Long> accountResourceLimitMap = new HashMap<>(); Map<String, Long> domainResourceLimitMap = new HashMap<>(); Map<String, Long> projectResourceLimitMap = new HashMap<>(); @@ -220,8 +220,9 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim @Override public boolean start() { - if (_resourceCountCheckInterval > 0) { - _rcExecutor.scheduleAtFixedRate(new ResourceCountCheckTask(), _resourceCountCheckInterval, _resourceCountCheckInterval, TimeUnit.SECONDS); + if (ResourceCountCheckInterval.value() >= 0) { + ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(_rcExecutor, new ResourceCountCheckTask(), ResourceCountCheckInterval, TimeUnit.SECONDS); + runner.start(); } return true; } @@ -258,8 +259,7 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim snapshotSizeSearch.join("snapshots", join2, snapshotSizeSearch.entity().getSnapshotId(), join2.entity().getId(), JoinBuilder.JoinType.INNER); snapshotSizeSearch.done(); - _resourceCountCheckInterval = ResourceCountCheckInterval.value(); - if (_resourceCountCheckInterval > 0) { + if (ResourceCountCheckInterval.value() >= 0) { _rcExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ResourceCountChecker")); }