jacek-lewandowski commented on a change in pull request #1331: URL: https://github.com/apache/cassandra/pull/1331#discussion_r754853394
########## File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +import org.apache.cassandra.utils.Throwables; + +/** + * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}. + * <p> + * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like + * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)} + * with synchronization scope reduced to the single key - that is, when dealing with a single key, unline + * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. 
This may help + * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such + * scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap} + * does not guarantee at-most-once sematnics of running the mapping function for a single key. + * + * @param <K> + * @param <V> + */ +public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>> +{ + + /** + * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value + * if missing. It returns {@code null} regardless the value is missing or failed to initialize. + */ + public V blockingGet(K key) + { + Future<V> future = get(key); + if (future != null && future.isDone()) + return future.awaitUninterruptibly().getNow(); + else + return null; + } + + /** + * If the value for the given key is missing, execute a load function to obtain a value and put it into the map. + * It is guaranteed that the load function will be executed only once when the key is missing and mulitple threads + * called this method for the same key. + * <p> + * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function + * throws exception, it is rethrown by this method. In both cases nothing gets added to the map. + */ + public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException + { + while (true) + { + Future<V> future = get(key); + AsyncPromise<V> newEntry = null; + if (future == null) + { + newEntry = new AsyncPromise<>(); + future = putIfAbsent(key, newEntry); + if (future == null) + { + // We managed to create an entry for the value. Now initialize it. 
+ future = newEntry; + try + { + V v = loadFunction.get(); + if (v == null) + { + newEntry.setFailure(new NullPointerException("The mapping function returned null")); + remove(key, future); + } + else + { + newEntry.setSuccess(v); + } + } + catch (Throwable t) + { + newEntry.setFailure(t); + // Remove future so that construction can be retried later + remove(key, future); + } + } + else + { + newEntry = null; + } + + // Else some other thread beat us to it, but we now have the reference to the future which we can wait for. + } + + try + { + future.syncUninterruptibly(); + } + catch (Throwable t) + { + // if blockingUnloadIfPresent was called in the meantime, we simply retry hoping that unloading gets + // finished soon + // also we retry if the concurrent attempt to load entry failed (but we do not retry if this attempt + // failed) + if (newEntry == null || Throwables.isCausedBy(t, ex -> ex instanceof KeyNotFoundException)) + { + Thread.yield(); + continue; + } + } + + future.rethrowIfFailed(); + return future.getNow(); + } + } + + /** + * If a value for the given key is present, unload function is run and the value is removed from the map. + * Similarly to {@link #blockingLoadIfAbsent(Object, Supplier)} at-most-once semantics is guaranteed for unload + * function. + * <p> + * When unload function fails, the value is removed from the map anyway and the failure is rethrown. + * <p> + * When the key was not found, the method returns {@code null}. + * + * @throws UnloadExecutionException when the unloading failed to complete - this is checked exception because + * the value is removed from the map regardless of the result of unloading; therefore if the unloading failed, the + * called is responsible for handling that; the {@link UnloadExecutionException} encapsulates the value which was + * failed to unload. + */ + public V blockingUnloadIfPresent(K key, Consumer<? 
super V> unloadFunction) throws UnloadExecutionException + { + Promise<V> droppedFuture = new AsyncPromise<V>().setFailure(new KeyNotFoundException()); + + Future<V> existingFuture; + do + { + existingFuture = get(key); + if (existingFuture == null || existingFuture.cause() != null) + return null; + } while (!replace(key, existingFuture, droppedFuture)); + + V v = existingFuture.awaitUninterruptibly().getNow(); + if (v == null) + { + // which means that either the value failed to load or a concurrent attempt to unload already did the work + return null; Review comment: yes, thanks for this catch. I don't know how it happened. I must have changed something right before committing because even tests were not passing without fixing this :/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

