This is an automated email from the ASF dual-hosted git repository.
mkataria pushed a commit to branch 1.22
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/1.22 by this push:
new 372a5d5a7a OAK-10225 Utility to rate limit writes in case async
indexing is delayed (#1048)
372a5d5a7a is described below
commit 372a5d5a7aeaa902b09ca96a66a53ec2524f88ba
Author: Mohit Kataria <[email protected]>
AuthorDate: Fri Aug 4 17:34:56 2023 +0530
OAK-10225 Utility to rate limit writes in case async indexing is delayed
(#1048)
* OAK-10225 Utility to rate limit writes in case async indexing is delayed
* Update
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/RateLimitUtils.java
Co-authored-by: Fabrizio Fortino <[email protected]>
* OAK-10225 Utility to rate limit writes in case async indexing is delayed
---------
Co-authored-by: Thomas Mueller <[email protected]>
Co-authored-by: Fabrizio Fortino <[email protected]>
---
.../oak/plugins/index/RateLimitUtils.java | 138 +++++++++++++++++++++
.../oak/plugins/index/RateLimitTest.java | 67 ++++++++++
2 files changed, 205 insertions(+)
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/RateLimitUtils.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/RateLimitUtils.java
new file mode 100644
index 0000000000..a6dd017cfe
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/RateLimitUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import java.lang.management.ManagementFactory;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Calendar;
+import java.util.Set;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RateLimitUtils {
+
+ private final static Logger LOG =
LoggerFactory.getLogger(RateLimitUtils.class);
+
+ // when getOldestAsyncIndexUpdate was called the last time
+ private static Instant lastJmxBeanCall = Instant.now();
+
+ // the return value of the last call to getOldestAsyncIndexUpdate
+ private static Instant cachedOldestAsyncUpdate = Instant.now();
+
+ // the last time the rateLimitWrites was called
+ private static Instant lastRateLimitCall = Instant.now();
+
+ private RateLimitUtils() {
+ }
+
+ /**
+ * Rate limit writes in case indexes are lagging behind too far. The method
+ * returns immediately if all async indexes are up-to-date (updated in the
last
+ * 30 seconds).
+ *
+ * If indexing lanes are lagging behind, however, the method will wait
(using
+ * Thread.sleep) for at most 1 minute. If the method is called more than
once
+ * per minute, it will sleep for at most the time that passed until the
last
+ * call; that is, an application that is calling it a lot will be paused
for up
+ * to 50%. This assumes indexes will be able to catch up in this situation.
+ *
+ * @return the number of milliseconds the call was sleeping
+ */
+ public static long rateLimitWrites() {
+ Instant now = Instant.now();
+ Duration delay = Duration.between(getOldestAsyncIndexUpdate(now), now);
+ return rateLimit(now, delay, true);
+ }
+
+ static long rateLimit(Instant now, Duration delay, boolean
callThreadSleep) {
+ long sleep;
+ if (delay.getSeconds() < 30) {
+ // less than 30 seconds: no need to wait
+ sleep = 0;
+ } else {
+ if (delay.toMinutes() > 60) {
+ LOG.warn("Indexing is delayed for {} minutes",
delay.toMinutes());
+ }
+ // sleep for as long as the duration between the last call and
this call
+ sleep = Duration.between(lastRateLimitCall, now).toMillis();
+ // sleep at most 1 minute
+ sleep = Math.min(sleep, Duration.ofMinutes(1).toMillis());
+ // ensure we don't try to sleep negative values (it would throw
IllegalArgumentException)
+ sleep = Math.max(0, sleep);
+ if (callThreadSleep) {
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ now = now.plus(Duration.ofMillis(sleep));
+ }
+ lastRateLimitCall = now;
+ return sleep;
+ }
+
+ /**
+ * Get the timestamp of the oldest indexing lane. In the normal case, this
is at
+ * most 5 seconds ago, if indexing lanes are updated every 5 seconds. If
+ * indexing is delayed, or paused, this can be in the past.
+ *
+ * The method can be called often without issues. If the last call was
less than
+ * one second ago, the last returned value is returned. This is to avoid
+ * unnecessary calls to the JMX beans.
+ *
+ * @param now the current time
+ * @return the timestamp of the oldest indexing lane
+ */
+ private static Instant getOldestAsyncIndexUpdate(Instant now) {
+ if (Duration.between(lastJmxBeanCall, now).getSeconds() < 1) {
+ return cachedOldestAsyncUpdate;
+ }
+ lastJmxBeanCall = now;
+ Instant oldestAsyncUpdate = now;
+ MBeanServerConnection server =
ManagementFactory.getPlatformMBeanServer();
+ try {
+ Set<ObjectName> beanNames;
+ beanNames = server.queryNames(new ObjectName(
+ "org.apache.jackrabbit.oak:type=IndexStats,*"), null);
+ for (ObjectName objectName : beanNames) {
+ Object time = server.getAttribute(objectName, "Done");
+ if (time == null || time.toString().isEmpty()) {
+ time = server.getAttribute(objectName, "LastIndexedTime");
+ }
+ if (time != null && !time.toString().isEmpty()) {
+ Calendar cal = ISO8601.parse(time.toString());
+ Instant doneInstant =
Instant.ofEpochMilli(cal.getTimeInMillis());
+ if (doneInstant.isBefore(oldestAsyncUpdate)) {
+ oldestAsyncUpdate = doneInstant;
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not retrieve the async lane state", e);
+ }
+ cachedOldestAsyncUpdate = oldestAsyncUpdate;
+ return oldestAsyncUpdate;
+ }
+
+}
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/RateLimitTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/RateLimitTest.java
new file mode 100644
index 0000000000..91e0382d7a
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/RateLimitTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.junit.Assert.assertEquals;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import org.junit.Test;
+
+public class RateLimitTest {
+
+    // an indexing delay of zero, used to reset the rate limiter state
+    private static final Duration NO_DELAY = Duration.ofSeconds(0);
+
+    /**
+     * Indexing delays below 30 seconds never result in a sleep.
+     */
+    @Test
+    public void rateLimitNoDelay() {
+        Instant time = Instant.now();
+        // reset the rate limit
+        RateLimitUtils.rateLimit(time, NO_DELAY, false);
+        for (int seconds = 0; seconds < 30; seconds++) {
+            Duration delay = Duration.ofSeconds(seconds);
+            long sleep = RateLimitUtils.rateLimit(time, delay, false);
+            assertEquals(0, sleep);
+            time = time.plusSeconds(1);
+        }
+    }
+
+    /**
+     * With an indexing delay of 30 to 59 seconds, the sleep time equals
+     * the time that passed since the end of the previous call.
+     */
+    @Test
+    public void rateLimitDelay() {
+        Instant time = Instant.now();
+        // reset the rate limit
+        RateLimitUtils.rateLimit(time, NO_DELAY, false);
+        time = time.plusSeconds(1);
+        int seconds = 30;
+        while (seconds < 60) {
+            Duration delay = Duration.ofSeconds(seconds);
+            long sleep = RateLimitUtils.rateLimit(time, delay, false);
+            assertEquals(1000, sleep);
+            // the simulated sleep, then one second until the next call
+            time = time.plusMillis(sleep).plusSeconds(1);
+            seconds++;
+        }
+    }
+
+    /**
+     * Even if more than a minute passed since the previous call, the
+     * sleep time is capped at one minute.
+     */
+    @Test
+    public void rateLimitMaxOneMinute() {
+        Instant time = Instant.now();
+        // reset the rate limit
+        RateLimitUtils.rateLimit(time, NO_DELAY, false);
+        time = time.plus(Duration.ofMinutes(1));
+        int minutes = 60;
+        while (minutes < 120) {
+            Duration delay = Duration.ofMinutes(minutes);
+            long sleep = RateLimitUtils.rateLimit(time, delay, false);
+            assertEquals(60 * 1000, sleep);
+            // the simulated sleep, then ten minutes until the next call
+            time = time.plusMillis(sleep).plus(Duration.ofMinutes(10));
+            minutes++;
+        }
+    }
+
+}