jacek-lewandowski commented on a change in pull request #1331:
URL: https://github.com/apache/cassandra/pull/1331#discussion_r754853394



##########
File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
+ * <p>
+ * The main purpose of this class is to provide the functionality of a concurrent hash map which may perform operations like
+ * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
+ * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
+ * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This makes it
+ * possible to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
+ * a scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
+ * does not guarantee at-most-once semantics of running the mapping function for a single key.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>>
+{
+
+    /**
+     * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value
+     * if missing. It returns {@code null} regardless of whether the value is missing or failed to initialize.
+     */
+    public V blockingGet(K key)
+    {
+        Future<V> future = get(key);
+        if (future != null && future.isDone())
+            return future.awaitUninterruptibly().getNow();
+        else
+            return null;
+    }
+
+    /**
+     * If the value for the given key is missing, execute a load function to obtain a value and put it into the map.
+     * It is guaranteed that the load function will be executed only once when the key is missing and multiple threads
+     * call this method for the same key.
+     * <p>
+     * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function
+     * throws an exception, it is rethrown by this method. In both cases nothing gets added to the map.
+     */
+    public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException
+    {
+        while (true)
+        {
+            Future<V> future = get(key);
+            AsyncPromise<V> newEntry = null;
+            if (future == null)
+            {
+                newEntry = new AsyncPromise<>();
+                future = putIfAbsent(key, newEntry);
+                if (future == null)
+                {
+                    // We managed to create an entry for the value. Now initialize it.
+                    future = newEntry;
+                    try
+                    {
+                        V v = loadFunction.get();
+                        if (v == null)
+                        {
+                            newEntry.setFailure(new NullPointerException("The mapping function returned null"));
+                            remove(key, future);
+                        }
+                        else
+                        {
+                            newEntry.setSuccess(v);
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        newEntry.setFailure(t);
+                        // Remove future so that construction can be retried later
+                        remove(key, future);
+                    }
+                }
+                else
+                {
+                    newEntry = null;
+                }
+
+                // Else some other thread beat us to it, but we now have the reference to the future which we can wait for.
+            }
+
+            try
+            {
+                future.syncUninterruptibly();
+            }
+            catch (Throwable t)
+            {
+                // if blockingUnloadIfPresent was called in the meantime, we simply retry hoping that unloading gets
+                // finished soon
+                // also we retry if the concurrent attempt to load entry failed (but we do not retry if this attempt
+                // failed)
+                if (newEntry == null || Throwables.isCausedBy(t, ex -> ex instanceof KeyNotFoundException))
+                {
+                    Thread.yield();
+                    continue;
+                }
+            }
+
+            future.rethrowIfFailed();
+            return future.getNow();
+        }
+    }
+
+    /**
+     * If a value for the given key is present, the unload function is run and the value is removed from the map.
+     * Similarly to {@link #blockingLoadIfAbsent(Object, Supplier)} at-most-once semantics is guaranteed for the unload
+     * function.
+     * <p>
+     * When the unload function fails, the value is removed from the map anyway and the failure is rethrown.
+     * <p>
+     * When the key was not found, the method returns {@code null}.
+     *
+     * @throws UnloadExecutionException when the unloading failed to complete - this is a checked exception because
+     *                                  the value is removed from the map regardless of the result of unloading; therefore if the unloading failed, the
+     *                                  caller is responsible for handling that; the {@link UnloadExecutionException} encapsulates the value which
+     *                                  failed to unload.
+     */
+    public V blockingUnloadIfPresent(K key, Consumer<? super V> unloadFunction) throws UnloadExecutionException
+    {
+        Promise<V> droppedFuture = new AsyncPromise<V>().setFailure(new KeyNotFoundException());
+
+        Future<V> existingFuture;
+        do
+        {
+            existingFuture = get(key);
+            if (existingFuture == null || existingFuture.cause() != null)
+                return null;
+        } while (!replace(key, existingFuture, droppedFuture));
+
+        V v = existingFuture.awaitUninterruptibly().getNow();
+        if (v == null)
+        {
+            // which means that either the value failed to load or a concurrent attempt to unload already did the work
+            return null;

Review comment:
       Yes, thanks for this catch. I don't know how it happened. I must have changed something right before committing, because even the tests were not passing without fixing this :/
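
       For readers of the thread, here is a minimal sketch of the per-key, at-most-once load that the Javadoc in the quoted diff describes, written against JDK types only. It is not the code from this pull request: `PerKeyLoader` and `loadIfAbsent` are hypothetical names, `ConcurrentHashMap` with `putIfAbsent` stands in for `NonBlockingHashMap`, and `CompletableFuture` stands in for the PR's `Future`/`AsyncPromise`. Unloading is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical illustration only; not the class under review.
final class PerKeyLoader<K, V>
{
    private final ConcurrentMap<K, CompletableFuture<V>> map = new ConcurrentHashMap<>();

    V loadIfAbsent(K key, Supplier<? extends V> loader)
    {
        while (true)
        {
            CompletableFuture<V> existing = map.get(key);
            if (existing == null)
            {
                CompletableFuture<V> mine = new CompletableFuture<>();
                existing = map.putIfAbsent(key, mine);
                if (existing == null)
                {
                    // We own the slot. The loader runs outside any map operation,
                    // so it may itself call loadIfAbsent for a different key.
                    try
                    {
                        V v = loader.get();
                        if (v == null)
                            throw new NullPointerException("The load function returned null");
                        mine.complete(v);
                        return v;
                    }
                    catch (RuntimeException | Error e)
                    {
                        // Free the slot first so that woken waiters can retry, then wake them.
                        map.remove(key, mine);
                        mine.completeExceptionally(e);
                        throw e;
                    }
                }
            }

            try
            {
                // Another thread owns the slot: wait for its result.
                return existing.join();
            }
            catch (CompletionException e)
            {
                // The concurrent load failed and its owner removed the entry; try again.
                Thread.yield();
            }
        }
    }
}
```

       In this sketch a failed load is reported only to the thread that ran the loader; waiting threads retry with their own loader, which mirrors the retry comment in the quoted `blockingLoadIfAbsent`.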




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]