This is an automated email from the ASF dual-hosted git repository.

mkataria pushed a commit to branch 1.22
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/1.22 by this push:
     new 372a5d5a7a OAK-10225 Utility to rate limit writes in case async 
indexing is delayed (#1048)
372a5d5a7a is described below

commit 372a5d5a7aeaa902b09ca96a66a53ec2524f88ba
Author: Mohit Kataria <[email protected]>
AuthorDate: Fri Aug 4 17:34:56 2023 +0530

    OAK-10225 Utility to rate limit writes in case async indexing is delayed 
(#1048)
    
    * OAK-10225 Utility to rate limit writes in case async indexing is delayed
    
    * Update 
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/RateLimitUtils.java
    
    Co-authored-by: Fabrizio Fortino <[email protected]>
    
    * OAK-10225 Utility to rate limit writes in case async indexing is delayed
    
    ---------
    
    Co-authored-by: Thomas Mueller <[email protected]>
    Co-authored-by: Fabrizio Fortino <[email protected]>
---
 .../oak/plugins/index/RateLimitUtils.java          | 138 +++++++++++++++++++++
 .../oak/plugins/index/RateLimitTest.java           |  67 ++++++++++
 2 files changed, 205 insertions(+)

diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/RateLimitUtils.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/RateLimitUtils.java
new file mode 100644
index 0000000000..a6dd017cfe
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/RateLimitUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import java.lang.management.ManagementFactory;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Calendar;
+import java.util.Set;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RateLimitUtils {
+    
+    private final static Logger LOG = 
LoggerFactory.getLogger(RateLimitUtils.class);    
+
+    // when getOldestAsyncIndexUpdate was called the last time 
+    private static Instant lastJmxBeanCall = Instant.now();
+
+    // the return value of the last call to getOldestAsyncIndexUpdate
+    private static Instant cachedOldestAsyncUpdate = Instant.now();
+    
+    // the last time the rateLimitWrites was called
+    private static Instant lastRateLimitCall = Instant.now();
+
+    private RateLimitUtils() {
+    }
+    
+    /**
+     * Rate limit writes in case indexes are lagging behind too far. The method
+     * returns immediately if all async indexes are up-to-date (updated in the 
last
+     * 30 seconds).
+     * 
+     * If indexing lanes are lagging behind, however, the method will wait 
(using
+     * Thread.sleep) for at most 1 minute. If the method is called more than 
once
+     * per minute, it will sleep for at most the time that passed until the 
last
+     * call; that is, an application that is calling it a lot will be paused 
for up
+     * to 50%. This assumes indexes will be able to catch up in this situation.
+     * 
+     * @return the number of milliseconds the call was sleeping
+     */
+    public static long rateLimitWrites() {
+        Instant now = Instant.now();
+        Duration delay = Duration.between(getOldestAsyncIndexUpdate(now), now);
+        return rateLimit(now, delay, true);
+    }
+    
+    static long rateLimit(Instant now, Duration delay, boolean 
callThreadSleep) {
+        long sleep;
+        if (delay.getSeconds()  < 30) {
+            // less than 30 seconds: no need to wait
+            sleep = 0;
+        } else {
+            if (delay.toMinutes() > 60) {
+                LOG.warn("Indexing is delayed for {} minutes", 
delay.toMinutes());
+            }
+            // sleep for as long as the duration between the last call and 
this call
+            sleep = Duration.between(lastRateLimitCall, now).toMillis();
+            // sleep at most 1 minute
+            sleep = Math.min(sleep, Duration.ofMinutes(1).toMillis());
+            // ensure we don't try to sleep negative values (it would throw 
IllegalArgumentException)
+            sleep = Math.max(0, sleep);
+            if (callThreadSleep) {
+                try {
+                    Thread.sleep(sleep);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            now = now.plus(Duration.ofMillis(sleep));
+        }
+        lastRateLimitCall = now;
+        return sleep;
+    }
+    
+    /**
+     * Get the timestamp of the oldest indexing lane. In the normal case, this 
is at
+     * most 5 seconds ago, if indexing lanes are updated every 5 seconds. If
+     * indexing is delayed, or paused, this can be in the past.
+     * 
+     * The method can be called often without issues. If the last call was 
less than
+     * one second ago, the last returned value is returned. This is to avoid
+     * unnecessary calls to the JMX beans.
+     * 
+     * @param now the current time
+     * @return the timestamp of the oldest indexing lane
+     */
+    private static Instant getOldestAsyncIndexUpdate(Instant now) {
+        if (Duration.between(lastJmxBeanCall, now).getSeconds() < 1) {
+            return cachedOldestAsyncUpdate;
+        }
+        lastJmxBeanCall = now;
+        Instant oldestAsyncUpdate = now;
+        MBeanServerConnection server = 
ManagementFactory.getPlatformMBeanServer();
+        try {
+            Set<ObjectName> beanNames;
+            beanNames = server.queryNames(new ObjectName(
+                "org.apache.jackrabbit.oak:type=IndexStats,*"), null);
+            for (ObjectName objectName : beanNames) {
+                Object time = server.getAttribute(objectName, "Done");
+                if (time == null || time.toString().isEmpty()) {
+                    time = server.getAttribute(objectName, "LastIndexedTime");
+                }
+                if (time != null && !time.toString().isEmpty()) {
+                    Calendar cal = ISO8601.parse(time.toString());
+                    Instant doneInstant = 
Instant.ofEpochMilli(cal.getTimeInMillis());
+                    if (doneInstant.isBefore(oldestAsyncUpdate)) {
+                        oldestAsyncUpdate = doneInstant;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Could not retrieve the async lane state", e);
+        }
+        cachedOldestAsyncUpdate = oldestAsyncUpdate;
+        return oldestAsyncUpdate;
+    }
+
+}
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/RateLimitTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/RateLimitTest.java
new file mode 100644
index 0000000000..91e0382d7a
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/RateLimitTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.junit.Assert.assertEquals;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import org.junit.Test;
+
+/**
+ * Tests for RateLimitUtils.rateLimit, driven by a synthetic clock and
+ * without actually sleeping (callThreadSleep is always false).
+ */
+public class RateLimitTest {
+
+    /** Indexing delays of 0 to 29 seconds must not cause any sleep. */
+    @Test
+    public void rateLimitNoDelay() {
+        Instant clock = Instant.now();
+        // reset the rate limit
+        RateLimitUtils.rateLimit(clock, Duration.ZERO, false);
+        for (int lagSeconds = 0; lagSeconds < 30; lagSeconds++) {
+            Duration lag = Duration.ofSeconds(lagSeconds);
+            long slept = RateLimitUtils.rateLimit(clock, lag, false);
+            assertEquals(0, slept);
+            clock = clock.plusSeconds(1);
+        }
+    }
+
+    /**
+     * With a delay of 30 to 59 seconds and one second between calls,
+     * each call must sleep for exactly that one second.
+     */
+    @Test
+    public void rateLimitDelay() {
+        Instant clock = Instant.now();
+        // reset the rate limit
+        RateLimitUtils.rateLimit(clock, Duration.ZERO, false);
+        clock = clock.plusSeconds(1);
+        for (int lagSeconds = 30; lagSeconds < 60; lagSeconds++) {
+            Duration lag = Duration.ofSeconds(lagSeconds);
+            long slept = RateLimitUtils.rateLimit(clock, lag, false);
+            assertEquals(1000, slept);
+            // advance by the simulated sleep plus one second of work
+            clock = clock.plusMillis(slept).plusSeconds(1);
+        }
+    }
+
+    /**
+     * Even if calls are 10 minutes apart, a single call must never
+     * sleep for more than one minute.
+     */
+    @Test
+    public void rateLimitMaxOneMinute() {
+        Instant clock = Instant.now();
+        // reset the rate limit
+        RateLimitUtils.rateLimit(clock, Duration.ZERO, false);
+        clock = clock.plusSeconds(60);
+        for (int lagMinutes = 60; lagMinutes < 120; lagMinutes++) {
+            Duration lag = Duration.ofMinutes(lagMinutes);
+            long slept = RateLimitUtils.rateLimit(clock, lag, false);
+            assertEquals(60 * 1000, slept);
+            // advance by the simulated sleep plus ten minutes of work
+            clock = clock.plusMillis(slept).plusSeconds(600);
+        }
+    }
+
+}

Reply via email to