blambov commented on a change in pull request #1331: URL: https://github.com/apache/cassandra/pull/1331#discussion_r754893053
########## File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +/** + * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}. 
+ * <p> + * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like + * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)} + * with synchronization scope reduced to the single key - that is, when dealing with a single key, unline Review comment: nit: unline -> unlike ########## File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +/** + * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}. 
+ * <p> + * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like + * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)} + * with synchronization scope reduced to the single key - that is, when dealing with a single key, unline + * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help + * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such + * scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap} + * does not guarantee at-most-once sematnics of running the mapping function for a single key. + * + * @param <K> + * @param <V> + */ +public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>> +{ + /** + * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value + * if missing. It returns {@code null} regardless the value is missing or failed to initialize. + */ + public V getIfReady(K key) + { + Future<V> future = get(key); + if (future != null && future.isDone()) + return future.getNow(); + else + return null; + } + + /** + * If the value for the given key is missing, execute a load function to obtain a value and put it into the map. + * It is guaranteed that the load function will be executed only once when the key is missing and mulitple threads + * called this method for the same key. Review comment: I think we do need to mention the deadlock on nested reinitialization, and the behaviour on concurrent unload and failing load. 
########## File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +/** + * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}. + * <p> + * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like + * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)} + * with synchronization scope reduced to the single key - that is, when dealing with a single key, unline + * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. 
This may help + * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such + * scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap} + * does not guarantee at-most-once sematnics of running the mapping function for a single key. + * + * @param <K> + * @param <V> + */ +public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>> +{ + /** + * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value + * if missing. It returns {@code null} regardless the value is missing or failed to initialize. + */ + public V getIfReady(K key) + { + Future<V> future = get(key); + if (future != null && future.isDone()) + return future.getNow(); + else + return null; + } + + /** + * If the value for the given key is missing, execute a load function to obtain a value and put it into the map. + * It is guaranteed that the load function will be executed only once when the key is missing and mulitple threads + * called this method for the same key. + * <p> + * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function + * throws exception, it is rethrown by this method. In both cases nothing gets added to the map. + */ + public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException + { + while (true) + { + Future<V> future = get(key); + AsyncPromise<V> newEntry = null; + if (future == null) + { + newEntry = new AsyncPromise<>(); + future = putIfAbsent(key, newEntry); + if (future == null) + { + // We managed to create an entry for the value. Now initialize it. 
+ future = newEntry; + try + { + V v = loadFunction.get(); + if (v == null) + throw new NullPointerException("The mapping function returned null"); + else + newEntry.setSuccess(v); + } + catch (Throwable t) + { + newEntry.setFailure(t); + // Remove future so that construction can be retried later + remove(key, future); + } + } + else + { + newEntry = null; + } + + // Else some other thread beat us to it, but we now have the reference to the future which we can wait for. + } + + future.awaitUninterruptibly(); + + if (future.getNow() != null) // implies success + return future.getNow(); Review comment: Nit: the two separate calls make me wonder if the state couldn't change due to some race between them. That's not the case, but let's make it crystal clear by putting the value in a local variable. ########## File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +/** + * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}. + * <p> + * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like + * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)} + * with synchronization scope reduced to the single key - that is, when dealing with a single key, unline + * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help + * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such + * scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap} + * does not guarantee at-most-once sematnics of running the mapping function for a single key. Review comment: nit: sematnics -> semantics ########## File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +/** + * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}. + * <p> + * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like + * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)} + * with synchronization scope reduced to the single key - that is, when dealing with a single key, unline + * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help + * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such + * scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap} + * does not guarantee at-most-once sematnics of running the mapping function for a single key. + * + * @param <K> + * @param <V> + */ +public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>> +{ + /** + * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value + * if missing. 
It returns {@code null} regardless the value is missing or failed to initialize. + */ + public V getIfReady(K key) + { + Future<V> future = get(key); + if (future != null && future.isDone()) + return future.getNow(); + else + return null; + } + + /** + * If the value for the given key is missing, execute a load function to obtain a value and put it into the map. + * It is guaranteed that the load function will be executed only once when the key is missing and mulitple threads + * called this method for the same key. + * <p> + * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function + * throws exception, it is rethrown by this method. In both cases nothing gets added to the map. + */ + public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException + { + while (true) + { + Future<V> future = get(key); + AsyncPromise<V> newEntry = null; + if (future == null) + { + newEntry = new AsyncPromise<>(); + future = putIfAbsent(key, newEntry); + if (future == null) + { + // We managed to create an entry for the value. Now initialize it. + future = newEntry; + try + { + V v = loadFunction.get(); + if (v == null) + throw new NullPointerException("The mapping function returned null"); + else + newEntry.setSuccess(v); + } + catch (Throwable t) + { + newEntry.setFailure(t); + // Remove future so that construction can be retried later + remove(key, future); + } + } + else + { + newEntry = null; + } + + // Else some other thread beat us to it, but we now have the reference to the future which we can wait for. 
+ } + + future.awaitUninterruptibly(); + + if (future.getNow() != null) // implies success + return future.getNow(); + + // if blockingUnloadIfPresent was called in the meantime (success and getNow == null), we simply retry hoping that unloading gets finished soon + // also we retry if the concurrent attempt to load entry failed - but we do not retry if this attempt failed (failed and newEntry == null) + else if (future.isSuccess() || newEntry == null) Review comment: I think this would be clearer as ``` if (newEntry != null) future.rethrowIfFailed(); Thread.yield(); ``` I would also consider using a boolean (named e.g. `usingOurFuture`) instead of `newEntry != null`. ########## File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +/** + * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}. + * <p> + * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like + * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)} + * with synchronization scope reduced to the single key - that is, when dealing with a single key, unline + * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help + * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such + * scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap} + * does not guarantee at-most-once sematnics of running the mapping function for a single key. + * + * @param <K> + * @param <V> + */ +public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>> +{ + /** + * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value + * if missing. It returns {@code null} regardless the value is missing or failed to initialize. + */ + public V getIfReady(K key) + { + Future<V> future = get(key); + if (future != null && future.isDone()) + return future.getNow(); + else + return null; + } + + /** + * If the value for the given key is missing, execute a load function to obtain a value and put it into the map. 
+ * It is guaranteed that the load function will be executed only once when the key is missing and mulitple threads + * called this method for the same key. + * <p> + * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function + * throws exception, it is rethrown by this method. In both cases nothing gets added to the map. + */ + public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException + { + while (true) + { + Future<V> future = get(key); + AsyncPromise<V> newEntry = null; + if (future == null) + { + newEntry = new AsyncPromise<>(); + future = putIfAbsent(key, newEntry); + if (future == null) + { + // We managed to create an entry for the value. Now initialize it. + future = newEntry; + try + { + V v = loadFunction.get(); + if (v == null) + throw new NullPointerException("The mapping function returned null"); + else + newEntry.setSuccess(v); + } + catch (Throwable t) + { + newEntry.setFailure(t); + // Remove future so that construction can be retried later + remove(key, future); + } + } + else + { + newEntry = null; + } + + // Else some other thread beat us to it, but we now have the reference to the future which we can wait for. + } + + future.awaitUninterruptibly(); + + if (future.getNow() != null) // implies success + return future.getNow(); + + // if blockingUnloadIfPresent was called in the meantime (success and getNow == null), we simply retry hoping that unloading gets finished soon + // also we retry if the concurrent attempt to load entry failed - but we do not retry if this attempt failed (failed and newEntry == null) Review comment: Maybe "if the failing attempt was initiated by us" instead of "if this attempt failed"? ########## File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +import org.apache.cassandra.utils.Throwables; + +/** + * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}. + * <p> + * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like + * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)} + * with synchronization scope reduced to the single key - that is, when dealing with a single key, unline + * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help + * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such + * scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. 
On the other hand, {@link NonBlockingHashMap} + * does not guarantee at-most-once sematnics of running the mapping function for a single key. + * + * @param <K> + * @param <V> + */ +public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>> Review comment: Let's wrap the map for the extra safety (if the test really needs it, leave it package-private), and then we can call this something like `LoadingMap`. ########## File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +/** + * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}. 
+ * <p> + * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like + * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)} + * with synchronization scope reduced to the single key - that is, when dealing with a single key, unline + * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help + * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such + * scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap} + * does not guarantee at-most-once sematnics of running the mapping function for a single key. + * + * @param <K> + * @param <V> + */ +public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>> +{ + /** + * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value Review comment: This comment appears to be outdated: `getIfReady` does not wait for initialization. It returns the value only if the future is already done (`future.isDone()`), and returns `null` while the value is still loading. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

