denis-chudov commented on code in PR #7680: URL: https://github.com/apache/ignite-3/pull/7680#discussion_r2940113716
########## modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link ExponentialBackoffTimeoutStrategy}. + * + * <p>Verifies the correctness of exponential timeout progression, the maximum timeout + * ceiling, and optional jitter behavior. Tests use predictable integer arithmetic to + * make expected values easy to verify by hand. + */ +public class ExponentialBackoffTimeoutStrategyTest { + /** Initial timeout passed to {@link TimeoutStrategy#next(int)} as the starting value. */ + private static final int INITIAL_TIMEOUT = 20; + + /** Maximum timeout the strategy is allowed to produce. */ + private static final int MAX_TIMEOUT = 100; + + /** Backoff coefficient used by the default strategy instance. Doubles each timeout step. 
*/ + private static final double BACKOFF_COEFFICIENT = 2.0; + + /** Strategy instance under test, recreated before each test. */ + private TimeoutStrategy timeoutStrategy; + + /** + * Creates a fresh {@link ExponentialBackoffTimeoutStrategy} with {@link #MAX_TIMEOUT} + * and {@link #BACKOFF_COEFFICIENT}, without jitter, before each test. + */ + @BeforeEach + void setUp() { + timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT, BACKOFF_COEFFICIENT); + } + + /** + * Verifies that a single call to {@link TimeoutStrategy#next(int)} returns + * {@code currentTimeout * backoffCoefficient}. + * + * <p>This is the core contract of the exponential strategy — each step multiplies + * the current timeout by the configured coefficient. + */ + @Test + void testGettingNextTimeout() { + assertEquals(BACKOFF_COEFFICIENT * INITIAL_TIMEOUT, timeoutStrategy.next(INITIAL_TIMEOUT)); + } + + /** + * Verifies that the timeout progression reaches {@link #MAX_TIMEOUT} within the + * expected number of steps and does not exceed it on subsequent calls. + * + * <p>The upper bound on steps is computed from the coefficient and the ratio of + * {@code MAX_TIMEOUT} to {@code INITIAL_TIMEOUT}. If the strategy fails to reach + * the cap within this bound, the test fails with a descriptive message. Once the + * cap is reached, a further call to {@link TimeoutStrategy#next(int)} must return + * exactly {@link #MAX_TIMEOUT}. 
+ */ + @Test + void testMaxTimeoutNotExceeded() { + int maxSteps = 3; + int steps = 0; + + int timeout = INITIAL_TIMEOUT; + do { + timeout = timeoutStrategy.next(timeout); + + assertTrue(++steps <= maxSteps, + "Strategy did not reach MAX_TIMEOUT within expected steps, last timeout: " + timeout); + } while (timeout < MAX_TIMEOUT); + + assertEquals(MAX_TIMEOUT, timeout); + assertEquals(MAX_TIMEOUT, timeoutStrategy.next(timeout)); + } + + /** + * Verifies that when jitter is enabled, the returned timeout falls within the + * expected randomized range {@code [initialTimeout, initialTimeout * coefficient^2)}. + * + * <p>A separate strategy instance with jitter enabled is created for this test. + * The lower bound confirms the jitter does not produce values below the initial + * timeout; the upper bound confirms it does not jump more than two coefficient + * steps in a single call. + */ + @Test + void testJitterApplying() { Review Comment: Please add a test checking that jitter never causes the returned timeout to exceed the max timeout. ########## modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.util.retry; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * A {@link TimeoutStrategy} that increases retry timeouts exponentially on each attempt. + * + * <p>Each call to {@link #next(int)} multiplies the current timeout by {@code backoffCoefficient}, + * capping the result at {@link #maxTimeout()}. Optionally, random jitter can be applied to spread + * retry attempts across time and avoid thundering herd problems under high concurrency. + * + * <p>When jitter is enabled, the returned timeout is randomized within the range + * {@code [raw / 2, raw * 1.5]}, then capped at {@link #maxTimeout()}. + * + * <p>This class is stateless and thread-safe. A single instance can be shared across + * multiple retry contexts. + */ +public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy { + /** Default backoff coefficient applied on each retry step. Doubles the timeout per attempt. */ + private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0; + + /** + * Multiplier applied to the current timeout on each call to {@link #next(int)}. + * Must be greater than {@code 1.0} to produce a growing sequence. + */ + private final double backoffCoefficient; + + /** + * Whether to apply random jitter to the computed timeout. + * When {@code true}, the result is randomized within {@code [raw / 2, raw * 1.5]}. + */ + private final boolean jitter; + + /** + * Upper bound for the timeout produced by this strategy, in milliseconds. + * The result of {@link #next(int)} is always capped at this value. + */ + private final int maxTimeout; + + /** + * Creates a strategy with default max timeout and backoff coefficient, and no jitter. + * + * @see TimeoutStrategy#DEFAULT_TIMEOUT_MS_MAX + */ + public ExponentialBackoffTimeoutStrategy() { + this(DEFAULT_TIMEOUT_MS_MAX, DEFAULT_BACKOFF_COEFFICIENT); + } + + /** + * Creates a strategy with the given max timeout and backoff coefficient, and no jitter. 
+ * + * @param maxTimeout maximum timeout this strategy may produce, in milliseconds. + * @param backoffCoefficient multiplier applied to the current timeout on each step. + * Must be greater than {@code 1.0}. + */ + public ExponentialBackoffTimeoutStrategy( + int maxTimeout, + double backoffCoefficient + ) { + this(maxTimeout, backoffCoefficient, false); + } + + /** + * Creates a strategy with the given max timeout, backoff coefficient, and jitter setting. + * + * @param maxTimeout maximum timeout this strategy may produce, in milliseconds. + * @param backoffCoefficient multiplier applied to the current timeout on each step. + * Must be greater than {@code 1.0}. + * @param jitter if {@code true}, random jitter is applied to each computed timeout. + */ + public ExponentialBackoffTimeoutStrategy( + int maxTimeout, + double backoffCoefficient, + boolean jitter + ) { + this.maxTimeout = maxTimeout; + this.backoffCoefficient = backoffCoefficient; + this.jitter = jitter; + } + + /** + * Computes the next retry timeout by multiplying {@code currentTimeout} by + * {@link #backoffCoefficient}, then capping at {@link #maxTimeout}. + * If jitter is enabled, the result is further randomized via {@link #applyJitter(int)}. + * + * @param currentTimeout current retry timeout in milliseconds. + * @return next retry timeout in milliseconds, capped at {@link #maxTimeout}. + */ + @Override + public int next(int currentTimeout) { + int raw = (int) Math.min((currentTimeout * backoffCoefficient), maxTimeout); + + return jitter ? applyJitter(raw) : raw; Review Comment: Here you have two `Math.min` calls: one here and one in `applyJitter`. I would suggest applying the max timeout cap only once, after the jitter. ########## modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Optional.of; +import static java.util.Optional.ofNullable; +import static org.apache.ignite.internal.util.retry.TimeoutState.attempt; +import static org.apache.ignite.internal.util.retry.TimeoutState.timeout; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import org.jetbrains.annotations.TestOnly; + +/** + * A retry context that tracks timeout state independently per key. + * + * <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff progression + * for different retry targets — for example, different replication group IDs or transaction IDs. + * State updates are performed atomically per key using {@link ConcurrentHashMap#compute}. + * + * <p>To prevent unbounded memory growth, the registry is capped at {@link #REGISTRY_SIZE_LIMIT} + * entries. Once the limit is reached, untracked keys receive a fixed {@link #fallbackTimeoutState} + * that always returns {@link TimeoutStrategy#maxTimeout()}. The limit is a soft cap and may be + * slightly exceeded under concurrent insertions. + * + * <p>This class is thread-safe. 
+ */ +public class KeyBasedRetryContext { + /** + * Maximum number of keys tracked in {@link #registry}. + * Can be slightly exceeded under concurrent insertions. See class-level Javadoc. + */ + private static final int REGISTRY_SIZE_LIMIT = 1_000; + + /** + * Timeout used when creating a new {@link TimeoutState} for a key that has no prior state. + * Also used as the reset value when a key's state is removed. + */ + private final int initialTimeout; + + /** Strategy used to compute the next timeout from the current one on each advancement. */ + private final TimeoutStrategy timeoutStrategy; + + /** + * Sentinel state returned for keys that cannot be tracked because the registry is full. + * Initialized with {@link TimeoutStrategy#maxTimeout()} and attempt {@code -1} to distinguish + * it from legitimately tracked states. + */ + private final TimeoutState fallbackTimeoutState; + + /** Per-key timeout state registry. Keys are typically transaction IDs or replication group IDs. */ + private final ConcurrentHashMap<String, TimeoutState> registry = new ConcurrentHashMap<>(); + + /** + * Creates a new context with the given initial timeout and strategy. + * + * @param initialTimeout timeout used for the first retry attempt of any new key, in milliseconds. + * @param timeoutStrategy strategy used to compute subsequent timeout values. + */ + public KeyBasedRetryContext(int initialTimeout, TimeoutStrategy timeoutStrategy) { + this.initialTimeout = initialTimeout; + this.timeoutStrategy = timeoutStrategy; + + this.fallbackTimeoutState = new TimeoutState(timeoutStrategy.maxTimeout(), -1); + } + + /** + * Returns the current {@link TimeoutState} for the given key, if tracked. + * + * <p>Returns an empty {@link Optional} if the key has no recorded state yet. + * If the registry is full and the key is not already tracked, returns + * {@link Optional} containing the {@link #fallbackTimeoutState}. + * + * <p>This method does not insert the key into the registry. 
+ * + * @param key the key to look up, typically a transaction ID or replication group ID. + * @return current state for the key, fallback state if registry is full, or empty if not tracked. + */ + public Optional<TimeoutState> getState(String key) { + if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) { + return of(fallbackTimeoutState); + } + + return ofNullable(registry.get(key)); + } + + /** + * Atomically advances the retry state for the given key and returns the updated state. + * + * <p>If the key has no prior state, a new {@link TimeoutState} is created with + * {@link #initialTimeout} and attempt count {@code 1}. Otherwise, the timeout is + * advanced using {@link TimeoutStrategy#next(int)} and the attempt count is incremented. + * + * <p>The update is performed inside {@link ConcurrentHashMap#compute}, which holds + * an exclusive per-key lock for the duration of the lambda. The CAS on the + * {@link TimeoutState}'s internal {@link AtomicLong} is therefore always expected Review Comment: Please use the fully qualified name in this link ({@code {@link java.util.concurrent.atomic.AtomicLong}}). `AtomicLong` is not imported in this file, so Javadoc cannot resolve the short form. ########## modules/core/src/main/java/org/apache/ignite/internal/util/retry/SharedRetryContext.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.Optional.ofNullable; +import static org.apache.ignite.internal.util.retry.TimeoutState.attempt; +import static org.apache.ignite.internal.util.retry.TimeoutState.timeout; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A retry context that tracks a single shared timeout state across all callers. + * + * <p>Unlike {@link KeyBasedRetryContext}, this context does not distinguish between + * retry targets — all callers advance and observe the same {@link TimeoutState}. + * This is appropriate when a single backoff sequence should govern all retries + * regardless of which operation is being retried. + * + * <p>The state is initialized lazily on the first call to {@link #updateAndGetState()}, + * and can be reset to {@code null} via {@link #resetState()}, allowing the progression + * to restart from scratch. {@link #getState()} returns an empty {@link Optional} before + * the first call and after a reset. + * + * <p>Concurrent calls to {@link #updateAndGetState()} and {@link #resetState()} are safe. + * The {@link AtomicReference} handles structural transitions ({@code null ↔ initialized}), + * while the {@link TimeoutState}'s internal {@link AtomicLong} CAS handles concurrent Review Comment: Please use the fully qualified name here as well ({@code {@link java.util.concurrent.atomic.AtomicLong}}). This file imports only `Optional` and `AtomicReference`, so `AtomicLong` cannot be resolved. ########## modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Optional.of; +import static java.util.Optional.ofNullable; +import static org.apache.ignite.internal.util.retry.TimeoutState.attempt; +import static org.apache.ignite.internal.util.retry.TimeoutState.timeout; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import org.jetbrains.annotations.TestOnly; + +/** + * A retry context that tracks timeout state independently per key. + * + * <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff progression + * for different retry targets — for example, different replication group IDs or transaction IDs. + * State updates are performed atomically per key using {@link ConcurrentHashMap#compute}. + * + * <p>To prevent unbounded memory growth, the registry is capped at {@link #REGISTRY_SIZE_LIMIT} + * entries. Once the limit is reached, untracked keys receive a fixed {@link #fallbackTimeoutState} + * that always returns {@link TimeoutStrategy#maxTimeout()}. The limit is a soft cap and may be + * slightly exceeded under concurrent insertions. + * + * <p>This class is thread-safe. + */ +public class KeyBasedRetryContext { + /** + * Maximum number of keys tracked in {@link #registry}. + * Can be slightly exceeded under concurrent insertions. See class-level Javadoc. 
+ */ + private static final int REGISTRY_SIZE_LIMIT = 1_000; + + /** + * Timeout used when creating a new {@link TimeoutState} for a key that has no prior state. + * Also used as the reset value when a key's state is removed. + */ + private final int initialTimeout; + + /** Strategy used to compute the next timeout from the current one on each advancement. */ + private final TimeoutStrategy timeoutStrategy; + + /** + * Sentinel state returned for keys that cannot be tracked because the registry is full. + * Initialized with {@link TimeoutStrategy#maxTimeout()} and attempt {@code -1} to distinguish + * it from legitimately tracked states. + */ + private final TimeoutState fallbackTimeoutState; + + /** Per-key timeout state registry. Keys are typically transaction IDs or replication group IDs. */ + private final ConcurrentHashMap<String, TimeoutState> registry = new ConcurrentHashMap<>(); + + /** + * Creates a new context with the given initial timeout and strategy. + * + * @param initialTimeout timeout used for the first retry attempt of any new key, in milliseconds. + * @param timeoutStrategy strategy used to compute subsequent timeout values. + */ + public KeyBasedRetryContext(int initialTimeout, TimeoutStrategy timeoutStrategy) { + this.initialTimeout = initialTimeout; + this.timeoutStrategy = timeoutStrategy; + + this.fallbackTimeoutState = new TimeoutState(timeoutStrategy.maxTimeout(), -1); + } + + /** + * Returns the current {@link TimeoutState} for the given key, if tracked. + * + * <p>Returns an empty {@link Optional} if the key has no recorded state yet. + * If the registry is full and the key is not already tracked, returns + * {@link Optional} containing the {@link #fallbackTimeoutState}. + * + * <p>This method does not insert the key into the registry. + * + * @param key the key to look up, typically a transaction ID or replication group ID. + * @return current state for the key, fallback state if registry is full, or empty if not tracked. 
+ */ + public Optional<TimeoutState> getState(String key) { + if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) { + return of(fallbackTimeoutState); + } + + return ofNullable(registry.get(key)); + } + + /** + * Atomically advances the retry state for the given key and returns the updated state. + * + * <p>If the key has no prior state, a new {@link TimeoutState} is created with + * {@link #initialTimeout} and attempt count {@code 1}. Otherwise, the timeout is + * advanced using {@link TimeoutStrategy#next(int)} and the attempt count is incremented. + * + * <p>The update is performed inside {@link ConcurrentHashMap#compute}, which holds + * an exclusive per-key lock for the duration of the lambda. The CAS on the + * {@link TimeoutState}'s internal {@link AtomicLong} is therefore always expected + * to succeed on the first attempt within the lambda. + * + * <p>If the registry is full and the key is not already tracked, returns + * {@link #fallbackTimeoutState} without modifying the registry. + * + * @param key the key to advance state for, typically a transaction ID or replication group ID. + * @return updated {@link TimeoutState} for the key, or {@link #fallbackTimeoutState} + * if the registry is full. + */ + public TimeoutState updateAndGetState(String key) { + if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) { + return fallbackTimeoutState; + } + + return registry.compute(key, (k, state) -> { + if (state == null) { + return new TimeoutState(initialTimeout, 1); + } + + long currentState = state.getRawState(); + state.update(currentState, timeoutStrategy.next(timeout(currentState)), attempt(currentState) + 1); Review Comment: So, we get the raw state just to unpack its components and use them as arguments for another call to the same object. This looks like unnecessary leakage of internal state. 
I would suggest adding a `TimeoutState#update(timeoutStrategy)` method that keeps the internal state hidden. ########## modules/core/src/main/java/org/apache/ignite/internal/util/retry/SharedRetryContext.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.Optional.ofNullable; +import static org.apache.ignite.internal.util.retry.TimeoutState.attempt; +import static org.apache.ignite.internal.util.retry.TimeoutState.timeout; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A retry context that tracks a single shared timeout state across all callers. + * + * <p>Unlike {@link KeyBasedRetryContext}, this context does not distinguish between + * retry targets — all callers advance and observe the same {@link TimeoutState}. + * This is appropriate when a single backoff sequence should govern all retries + * regardless of which operation is being retried. + * + * <p>The state is initialized lazily on the first call to {@link #updateAndGetState()}, + * and can be reset to {@code null} via {@link #resetState()}, allowing the progression + * to restart from scratch.
{@link #getState()} returns an empty {@link Optional} before + * the first call and after a reset. + * + * <p>Concurrent calls to {@link #updateAndGetState()} and {@link #resetState()} are safe. + * The {@link AtomicReference} handles structural transitions ({@code null ↔ initialized}), + * while the {@link TimeoutState}'s internal {@link AtomicLong} CAS handles concurrent + * value updates without allocating new objects on the hot path. + * + * <p>This class is thread-safe. + */ +public class SharedRetryContext { Review Comment: Shouldn't it share the same interface with KeyBasedRetryContext? ########## modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Optional.of; +import static java.util.Optional.ofNullable; +import static org.apache.ignite.internal.util.retry.TimeoutState.attempt; +import static org.apache.ignite.internal.util.retry.TimeoutState.timeout; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import org.jetbrains.annotations.TestOnly; + +/** + * A retry context that tracks timeout state independently per key. + * + * <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff progression + * for different retry targets — for example, different replication group IDs or transaction IDs. + * State updates are performed atomically per key using {@link ConcurrentHashMap#compute}. + * + * <p>To prevent unbounded memory growth, the registry is capped at {@link #REGISTRY_SIZE_LIMIT} + * entries. Once the limit is reached, untracked keys receive a fixed {@link #fallbackTimeoutState} + * that always returns {@link TimeoutStrategy#maxTimeout()}. The limit is a soft cap and may be + * slightly exceeded under concurrent insertions. + * + * <p>This class is thread-safe. + */ +public class KeyBasedRetryContext { + /** + * Maximum number of keys tracked in {@link #registry}. + * Can be slightly exceeded under concurrent insertions. See class-level Javadoc. + */ + private static final int REGISTRY_SIZE_LIMIT = 1_000; + + /** + * Timeout used when creating a new {@link TimeoutState} for a key that has no prior state. + * Also used as the reset value when a key's state is removed. + */ + private final int initialTimeout; + + /** Strategy used to compute the next timeout from the current one on each advancement. */ + private final TimeoutStrategy timeoutStrategy; + + /** + * Sentinel state returned for keys that cannot be tracked because the registry is full. 
+ * Initialized with {@link TimeoutStrategy#maxTimeout()} and attempt {@code -1} to distinguish + * it from legitimately tracked states. + */ + private final TimeoutState fallbackTimeoutState; + + /** Per-key timeout state registry. Keys are typically transaction IDs or replication group IDs. */ + private final ConcurrentHashMap<String, TimeoutState> registry = new ConcurrentHashMap<>(); + + /** + * Creates a new context with the given initial timeout and strategy. + * + * @param initialTimeout timeout used for the first retry attempt of any new key, in milliseconds. + * @param timeoutStrategy strategy used to compute subsequent timeout values. + */ + public KeyBasedRetryContext(int initialTimeout, TimeoutStrategy timeoutStrategy) { + this.initialTimeout = initialTimeout; + this.timeoutStrategy = timeoutStrategy; + + this.fallbackTimeoutState = new TimeoutState(timeoutStrategy.maxTimeout(), -1); + } + + /** + * Returns the current {@link TimeoutState} for the given key, if tracked. + * + * <p>Returns an empty {@link Optional} if the key has no recorded state yet. + * If the registry is full and the key is not already tracked, returns + * {@link Optional} containing the {@link #fallbackTimeoutState}. + * + * <p>This method does not insert the key into the registry. + * + * @param key the key to look up, typically a transaction ID or replication group ID. + * @return current state for the key, fallback state if registry is full, or empty if not tracked. + */ + public Optional<TimeoutState> getState(String key) { + if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) { + return of(fallbackTimeoutState); + } + + return ofNullable(registry.get(key)); + } + + /** + * Atomically advances the retry state for the given key and returns the updated state. + * + * <p>If the key has no prior state, a new {@link TimeoutState} is created with + * {@link #initialTimeout} and attempt count {@code 1}. 
Otherwise, the timeout is + * advanced using {@link TimeoutStrategy#next(int)} and the attempt count is incremented. + * + * <p>The update is performed inside {@link ConcurrentHashMap#compute}, which holds + * an exclusive per-key lock for the duration of the lambda. The CAS on the + * {@link TimeoutState}'s internal {@link AtomicLong} is therefore always expected + * to succeed on the first attempt within the lambda. + * + * <p>If the registry is full and the key is not already tracked, returns + * {@link #fallbackTimeoutState} without modifying the registry. + * + * @param key the key to advance state for, typically a transaction ID or replication group ID. + * @return updated {@link TimeoutState} for the key, or {@link #fallbackTimeoutState} + * if the registry is full. + */ + public TimeoutState updateAndGetState(String key) { + if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) { + return fallbackTimeoutState; + } + + return registry.compute(key, (k, state) -> { + if (state == null) { + return new TimeoutState(initialTimeout, 1); + } + + long currentState = state.getRawState(); + state.update(currentState, timeoutStrategy.next(timeout(currentState)), attempt(currentState) + 1); + + return state; + }); + } + + /** + * Removes the retry state for the given key, resetting it as if no retries had occurred. + * + * <p>The next call to {@link #updateAndGetState(String)} for this key after a reset + * will create fresh state starting from {@link #initialTimeout}. + * + * @param key the key whose state should be removed. + */ + public void resetState(String key) { Review Comment: This is called when a retry attempt succeeds. But what if that never happens? Entries for up to 1000 transactions/groups would then stay in this map forever. Once the limit is reached, every other key would fall back to the maximum timeout.
########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java: ########## @@ -401,12 +412,15 @@ public TxManagerImpl( transactionExpirationRegistry = new TransactionExpirationRegistry(txStateVolatileStorage); + retryContext = new KeyBasedRetryContext(20, timeoutStrategy); Review Comment: You replaced `RETRY_INITIAL_TIMEOUT_MS` constant, but now the initial timeout became just a "magic number" ########## modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * A {@link TimeoutStrategy} that increases retry timeouts exponentially on each attempt. + * + * <p>Each call to {@link #next(int)} multiplies the current timeout by {@code backoffCoefficient}, + * capping the result at {@link #maxTimeout()}. Optionally, random jitter can be applied to spread + * retry attempts across time and avoid thundering herd problems under high concurrency. 
+ * + * <p>When jitter is enabled, the returned timeout is randomized within the range + * {@code [raw / 2, raw * 1.5]}, then capped at {@link #maxTimeout()}. + * + * <p>This class is stateless and thread-safe. A single instance can be shared across + * multiple retry contexts. + */ +public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy { Review Comment: I think we should add a TODO here to replace the timeout strategy in the `ignite-raft` module with this one. ########## modules/core/src/test/java/org/apache/ignite/internal/util/retry/SharedRetryContextTest.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Unit tests for {@link SharedRetryContext}. + * + * <p>Verifies lazy initialization, sequential timeout progression, reset behavior, + * and thread safety of concurrent updates. A deterministic {@link TestProgressiveTimeoutStrategy} + * with a fixed multiplier is used to make expected timeout values easy to compute by hand. + */ +public class SharedRetryContextTest { Review Comment: This is the single place where `SharedRetryContext` is created, is it really needed in this PR? ########## modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java: ########## @@ -52,6 +98,50 @@ public void setup() { sql(tableSql); } + /** + * Verifies that no retry occurs when the write intent cleanup succeeds on the first attempt. + * + * <p>Installs a message interceptor that counts {@link WriteIntentSwitchReplicaRequest} + * messages without dropping any of them. After all write intents are resolved, asserts + * that exactly one cleanup message was sent across all nodes. 
+ */ + @Test + public void testNoRetryOnSuccessfulCleanup() { + IgniteImpl node = anyNode(); + Transaction tx = node.transactions().begin(); + node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')"); + + AtomicInteger cleanupAttempts = new AtomicInteger(); + + for (IgniteImpl n : runningNodesIter()) { + n.dropMessages((dest, msg) -> { + if (msg instanceof WriteIntentSwitchReplicaRequest) { + cleanupAttempts.incrementAndGet(); + } + return false; + }); + } + + tx.commitAsync(); + + await().timeout(5, TimeUnit.SECONDS) Review Comment: `TimeUnit` not needed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
