This is an automated email from the ASF dual-hosted git repository.

rohit pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/main by this push:
     new 63a0797b180 Introduce scheduled executor wrapper with dynamic interval (#8916)
63a0797b180 is described below

commit 63a0797b18046397961c620a39931552d90e0588
Author: Vishesh <vishes...@gmail.com>
AuthorDate: Wed Apr 17 15:15:37 2024 +0530

    Introduce scheduled executor wrapper with dynamic interval (#8916)
    
    * Introduce scheduled executor wrapper with dynamic interval
    
    * Add validation for configkey
---
 .../java/com/cloud/user/ResourceLimitService.java  |   2 +-
 .../config/ConfigKeyScheduledExecutionWrapper.java | 114 +++++++++++++
 .../ConfigKeyScheduledExecutionWrapperTest.java    | 177 +++++++++++++++++++++
 .../resourcelimit/ResourceLimitManagerImpl.java    |  10 +-
 4 files changed, 297 insertions(+), 6 deletions(-)

diff --git a/api/src/main/java/com/cloud/user/ResourceLimitService.java b/api/src/main/java/com/cloud/user/ResourceLimitService.java
index 0a64cbb7440..04560df428f 100644
--- a/api/src/main/java/com/cloud/user/ResourceLimitService.java
+++ b/api/src/main/java/com/cloud/user/ResourceLimitService.java
@@ -38,7 +38,7 @@ public interface ResourceLimitService {
     static final ConfigKey<Long> MaxProjectSecondaryStorage = new 
ConfigKey<>("Project Defaults", Long.class, "max.project.secondary.storage", 
"400",
             "The default maximum secondary storage space (in GiB) that can be 
used for a project", false);
     static final ConfigKey<Long> ResourceCountCheckInterval = new 
ConfigKey<>("Advanced", Long.class, "resourcecount.check.interval", "300",
-            "Time (in seconds) to wait before running resource recalculation 
and fixing task. Default is 300 seconds, Setting this to 0 disables execution 
of the task", false);
+            "Time (in seconds) to wait before running resource recalculation 
and fixing task. Default is 300 seconds, Setting this to 0 disables execution 
of the task", true);
     static final ConfigKey<String> ResourceLimitHostTags = new 
ConfigKey<>("Advanced", String.class, "resource.limit.host.tags", "",
             "A comma-separated list of tags for host resource limits", true);
     static final ConfigKey<String> ResourceLimitStorageTags = new 
ConfigKey<>("Advanced", String.class, "resource.limit.storage.tags", "",
diff --git a/framework/config/src/main/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapper.java b/framework/config/src/main/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapper.java
new file mode 100644
index 00000000000..b8d7e782971
--- /dev/null
+++ b/framework/config/src/main/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapper.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.config;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Runs a command repeatedly on a {@link ScheduledExecutorService}, taking the interval
+ * between runs from a config key every time it reschedules.
+ * <p>
+ * Timing is similar to {@code ScheduledExecutorService.scheduleAtFixedRate()}: the time the
+ * command took is subtracted from the interval, and the next run is scheduled without delay
+ * when the command took longer than the interval. The next run is only scheduled after the
+ * current one has finished.
+ * <p>
+ * A config value of zero or less disables execution. The command is then skipped and the
+ * config key is checked again every {@code enableIntervalSeconds} (60 seconds by default),
+ * so that execution resumes once the key holds a positive value.
+ */
+public class ConfigKeyScheduledExecutionWrapper implements Runnable {
+    /** Seconds between checks of the config key while execution is disabled, unless overridden. */
+    private static final int DEFAULT_ENABLE_INTERVAL_SECONDS = 60;
+
+    protected Logger logger = LogManager.getLogger(getClass());
+    private final ScheduledExecutorService executorService;
+    private final Runnable command;
+    private final ConfigKey<?> configKey;
+    private final TimeUnit unit;
+    private final long enableIntervalSeconds;
+
+    /**
+     * Rejects arguments the wrapper cannot work with.
+     *
+     * @throws IllegalArgumentException if any argument is null or the current value of
+     *         {@code configKey} is neither a Long nor an Integer
+     */
+    private static void validateArgs(ScheduledExecutorService executorService, Runnable command,
+            ConfigKey<?> configKey, TimeUnit unit) {
+        if (executorService == null) {
+            throw new IllegalArgumentException("ExecutorService cannot be null");
+        }
+        if (command == null) {
+            throw new IllegalArgumentException("Command cannot be null");
+        }
+        if (configKey == null) {
+            throw new IllegalArgumentException("ConfigKey cannot be null");
+        }
+        // Throws for any value type other than Long or Integer.
+        toLong(configKey.value());
+        if (unit == null) {
+            throw new IllegalArgumentException("TimeUnit cannot be null");
+        }
+    }
+
+    /**
+     * Creates a wrapper that checks a disabled config key again every 60 seconds.
+     *
+     * @param executorService executor the runs are scheduled on
+     * @param command the task to run
+     * @param configKey key whose Long or Integer value is the interval between runs
+     * @param unit time unit of the config key value
+     * @throws IllegalArgumentException if any argument is null or the config key value is
+     *         neither a Long nor an Integer
+     */
+    public ConfigKeyScheduledExecutionWrapper(ScheduledExecutorService executorService, Runnable command,
+            ConfigKey<?> configKey, TimeUnit unit) {
+        this(executorService, command, configKey, DEFAULT_ENABLE_INTERVAL_SECONDS, unit);
+    }
+
+    /**
+     * Same as the public constructor, with a caller-chosen delay between checks of the
+     * config key while execution is disabled.
+     *
+     * @param enableIntervalSeconds seconds to wait before checking a disabled config key again
+     */
+    protected ConfigKeyScheduledExecutionWrapper(ScheduledExecutorService executorService, Runnable command,
+            ConfigKey<?> configKey, int enableIntervalSeconds, TimeUnit unit) {
+        validateArgs(executorService, command, configKey, unit);
+        this.executorService = executorService;
+        this.command = command;
+        this.configKey = configKey;
+        this.unit = unit;
+        this.enableIntervalSeconds = enableIntervalSeconds;
+    }
+
+    /**
+     * Schedules the first run after one interval; a negative config value is treated as zero.
+     *
+     * @return the future of the first scheduled run only; later runs are scheduled by
+     *         {@link #run()} and are not reachable through it
+     */
+    public ScheduledFuture<?> start() {
+        long initialDelay = Math.max(0, getConfigValue());
+        return this.executorService.schedule(this, initialDelay, this.unit);
+    }
+
+    /**
+     * Reads the config key once and returns its value as a long.
+     *
+     * @throws IllegalArgumentException if the value is neither a Long nor an Integer
+     */
+    long getConfigValue() {
+        return toLong(this.configKey.value());
+    }
+
+    /** Widens a Long or Integer config value to long and rejects every other type, including null. */
+    private static long toLong(Object value) {
+        if (value instanceof Long) {
+            return (Long) value;
+        }
+        if (value instanceof Integer) {
+            return (Integer) value;
+        }
+        throw new IllegalArgumentException("ConfigKey value must be a Long or Integer");
+    }
+
+    /**
+     * Executes the command when the config key is positive and schedules the following run.
+     * Every path through this method schedules this wrapper again, because nothing else does.
+     */
+    @Override
+    public void run() {
+        long interval;
+        try {
+            interval = getConfigValue();
+        } catch (RuntimeException e) {
+            // A failed lookup must not end the schedule; check again as for a disabled key.
+            logger.warn(String.format("Unable to read the interval for %s, checking again in %d seconds",
+                    this.command.getClass(), enableIntervalSeconds), e);
+            checkAgainLater();
+            return;
+        }
+        if (interval <= 0) {
+            checkAgainLater();
+            return;
+        }
+
+        long startTime = System.nanoTime();
+        try {
+            command.run();
+        } catch (Throwable t) {
+            // Deliberately broad: a failing command is logged and the schedule continues.
+            logger.warn(String.format("Last run of %s encountered an error", this.command.getClass()), t);
+        } finally {
+            executorService.schedule(this, nextDelayNanos(startTime, interval), NANOSECONDS);
+        }
+    }
+
+    /** Skips this run and schedules the next check of the config key. */
+    private void checkAgainLater() {
+        executorService.schedule(this, enableIntervalSeconds, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Computes the delay until the next run: the current interval minus the time the last run
+     * took, never negative. The interval is re-read so a change made during the run applies
+     * immediately; if that read fails, the interval read before the run is used instead.
+     */
+    private long nextDelayNanos(long startTime, long intervalBeforeRun) {
+        long elapsed = System.nanoTime() - startTime;
+        long interval = intervalBeforeRun;
+        try {
+            interval = getConfigValue();
+        } catch (RuntimeException e) {
+            logger.warn(String.format("Unable to re-read the interval for %s, reusing the previous value %d",
+                    this.command.getClass(), intervalBeforeRun), e);
+        }
+        return Math.max(0, this.unit.toNanos(interval) - elapsed);
+    }
+}
diff --git a/framework/config/src/test/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapperTest.java b/framework/config/src/test/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapperTest.java
new file mode 100644
index 00000000000..fbb4dc24fca
--- /dev/null
+++ b/framework/config/src/test/java/org/apache/cloudstack/framework/config/ConfigKeyScheduledExecutionWrapperTest.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.config;
+
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
+import static org.mockito.Mockito.when;
+
+/**
+ * Timing-based tests for {@link ConfigKeyScheduledExecutionWrapper}: each test starts the
+ * wrapper on a single-threaded scheduler, sleeps, and checks how often the command ran.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ConfigKeyScheduledExecutionWrapperTest {
+    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new NamedThreadFactory("TestExecutor"));
+
+    /** Value the mocked config key hands out; tests change it while the wrapper is running. */
+    private final AtomicInteger interval = new AtomicInteger();
+
+    @Mock
+    ConfigKey<Integer> configKey;
+
+    @After
+    public void tearDown() {
+        // The wrapper reschedules itself indefinitely, so the pool has to be stopped explicitly.
+        executorService.shutdownNow();
+    }
+
+    /**
+     * Makes the mocked config key return the current value of {@link #interval}.
+     * The mock is stubbed once, before the scheduler thread starts calling it; later changes
+     * go through the atomic instead of re-stubbing a mock another thread is using.
+     */
+    private void stubInterval(int initialValue) {
+        interval.set(initialValue);
+        when(configKey.value()).thenAnswer(invocation -> interval.get());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nullExecutorTest() {
+        new ConfigKeyScheduledExecutionWrapper(null, new TestRunnable(), configKey, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nullCommandTest() {
+        new ConfigKeyScheduledExecutionWrapper(executorService, null, configKey, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nullConfigKeyTest() {
+        new ConfigKeyScheduledExecutionWrapper(executorService, new TestRunnable(), null, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void invalidConfigKeyTest() {
+        // A String-valued key is neither Long nor Integer and must be rejected.
+        ConfigKey<String> stringKey = new ConfigKey<>(String.class, "test", "test", "test", "test", true,
+                ConfigKey.Scope.Global, null, null, null, null, null, ConfigKey.Kind.CSV, null);
+        new ConfigKeyScheduledExecutionWrapper(executorService, new TestRunnable(), stringKey, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void scheduleOncePerSecondTest() {
+        stubInterval(1);
+        TestRunnable runnable = new TestRunnable();
+        ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.SECONDS);
+        runner.start();
+
+        waitSeconds(3);
+        assertThat("Runnable ran once per second", runnable.getRunCount(), isOneOf(2, 3));
+    }
+
+    /** Sleeps for the given number of seconds plus 100 ms of slack for scheduling jitter. */
+    private void waitSeconds(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000L + 100);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void scheduleTwicePerSecondTest() {
+        stubInterval(500);
+        TestRunnable runnable = new TestRunnable();
+        ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS);
+        runner.start();
+
+        waitSeconds(2);
+        assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
+    }
+
+    @Test
+    public void scheduleDynamicTest() {
+        // start with twice per second, then switch to four times per second
+        stubInterval(500);
+        TestRunnable runnable = new TestRunnable();
+        ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS);
+        runner.start();
+
+        waitSeconds(2);
+        assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
+
+        runnable.resetRunCount();
+        interval.set(250);
+        waitSeconds(2);
+        assertThat("Runnable ran four times per second", runnable.getRunCount(), isOneOf(7, 8));
+    }
+
+    @Test
+    public void noOverlappingRunsTest() {
+        // The command takes 1 s but the interval is 200 ms, so runs can only happen back to back.
+        stubInterval(200);
+        TestRunnable runnable = new TestRunnable(1);
+        ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, TimeUnit.MILLISECONDS);
+        runner.start();
+
+        waitSeconds(3);
+        assertThat("Slow runnable on tight schedule runs without overlap", runnable.getRunCount(), isOneOf(2, 3));
+    }
+
+    @Test
+    public void temporaryDisableRunsTest() {
+        // start with twice per second, then disable, then start again
+        stubInterval(500);
+        TestRunnable runnable = new TestRunnable();
+        // Check a disabled key every second instead of every minute so re-enabling is picked up in time.
+        ConfigKeyScheduledExecutionWrapper runner = new ConfigKeyScheduledExecutionWrapper(executorService, runnable, configKey, 1, TimeUnit.MILLISECONDS);
+        runner.start();
+
+        waitSeconds(2);
+        assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
+
+        runnable.resetRunCount();
+        interval.set(0);
+        waitSeconds(2);
+        assertThat("Runnable ran zero times per second", runnable.getRunCount(), is(0));
+
+        runnable.resetRunCount();
+        interval.set(500);
+        waitSeconds(2);
+        assertThat("Runnable ran twice per second", runnable.getRunCount(), isOneOf(3, 4));
+    }
+
+    /** Command that counts its runs and optionally sleeps to simulate slow work. */
+    static class TestRunnable implements Runnable {
+        // Incremented on the scheduler thread, read and reset on the test thread.
+        private final AtomicInteger runCount = new AtomicInteger();
+        private final int waitSeconds;
+
+        TestRunnable(int waitSeconds) {
+            this.waitSeconds = waitSeconds;
+        }
+
+        TestRunnable() {
+            this(0);
+        }
+
+        @Override
+        public void run() {
+            runCount.incrementAndGet();
+            if (waitSeconds > 0) {
+                try {
+                    Thread.sleep(waitSeconds * 1000L);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        public int getRunCount() {
+            return this.runCount.get();
+        }
+
+        public void resetRunCount() {
+            this.runCount.set(0);
+        }
+    }
+}
diff --git a/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java b/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java
index 11ebc6da251..7962b380adc 100644
--- a/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java
+++ b/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java
@@ -43,6 +43,7 @@ import 
org.apache.cloudstack.api.response.TaggedResourceLimitAndCountResponse;
 import org.apache.cloudstack.context.CallContext;
 import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
 import org.apache.cloudstack.framework.config.ConfigKey;
+import 
org.apache.cloudstack.framework.config.ConfigKeyScheduledExecutionWrapper;
 import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
@@ -197,7 +198,6 @@ public class ResourceLimitManagerImpl extends ManagerBase 
implements ResourceLim
 
     protected SearchBuilder<ResourceCountVO> ResourceCountSearch;
     ScheduledExecutorService _rcExecutor;
-    long _resourceCountCheckInterval = 0;
     Map<String, Long> accountResourceLimitMap = new HashMap<>();
     Map<String, Long> domainResourceLimitMap = new HashMap<>();
     Map<String, Long> projectResourceLimitMap = new HashMap<>();
@@ -220,8 +220,9 @@ public class ResourceLimitManagerImpl extends ManagerBase 
implements ResourceLim
 
     @Override
     public boolean start() {
-        if (_resourceCountCheckInterval > 0) {
-            _rcExecutor.scheduleAtFixedRate(new ResourceCountCheckTask(), 
_resourceCountCheckInterval, _resourceCountCheckInterval, TimeUnit.SECONDS);
+        if (ResourceCountCheckInterval.value() >= 0) {
+            ConfigKeyScheduledExecutionWrapper runner = new 
ConfigKeyScheduledExecutionWrapper(_rcExecutor, new ResourceCountCheckTask(), 
ResourceCountCheckInterval, TimeUnit.SECONDS);
+            runner.start();
         }
         return true;
     }
@@ -258,8 +259,7 @@ public class ResourceLimitManagerImpl extends ManagerBase 
implements ResourceLim
         snapshotSizeSearch.join("snapshots", join2, 
snapshotSizeSearch.entity().getSnapshotId(), join2.entity().getId(), 
JoinBuilder.JoinType.INNER);
         snapshotSizeSearch.done();
 
-        _resourceCountCheckInterval = ResourceCountCheckInterval.value();
-        if (_resourceCountCheckInterval > 0) {
+        if (ResourceCountCheckInterval.value() >= 0) {
             _rcExecutor = Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("ResourceCountChecker"));
         }
 

Reply via email to