davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284742856

 File path: 
 @@ -126,64 +132,144 @@ public void close() {
     return (Comparable<Object>) in;
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new 
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>)data.get(type);
+    }
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), 
+      list.put(value);
+    }
+    public void clear() {
+      data.clear();
+    }
+  }
+  private static class InstanceList<T> {
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      ConcurrentMap<Comparable<Object>, T> data;
+      Predicate<? super T> filter;
+      int count = 0;
+      CountingRemoveIfForEach(
+          ConcurrentMap<Comparable<Object>, T> data,
+          Predicate<? super T> filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+      public void accept(Comparable<Object> key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+        // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+        // implement correctly against the values() iterator, we use forEach 
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+    }
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap<Comparable<Object>, Object> data;
-    private int size;
+    private final ConcurrentMap<Comparable<Object>, T> data;
-    private InstanceList(Class<?> type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class<?> klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
-    public Object get(Object key) {
+    // Note: removeIf returns a boolean if any element has been removed.
+    // While debugging this code, it was handy to have the count of elements
+    // removed, rather than an indicator of whether something has been
+    // removed, and a count is no more complicated than a boolean so I've
+    // retained that behavior here, although there is no current requirement.
+    @SuppressWarnings("unchecked")
+    int countingRemoveAllByKeys(String index, Collection keys) {
 Review comment:
   Hmm, I suspect we have a terminology overload issue here.
   1) The read|write|view methods on KVStore refer to the things you read, write, and look at as "instance", "object", and "entities" respectively (and also "value" for write(), as the name of its parameter).
   2) read() takes a "naturalKey" to access a specific instance, which is the value of the unique/primary/NATURAL_INDEX_NAME index.
   3) KVStoreView refers to the values of an index, as passed to first() and last(), as values, not keys, regardless of whether the index is the natural one.
   4) InMemoryStore refers to the Comparable wrappers placed around the values of a particular index as keys.
   5) KVStore seems to say that the key is actually created per type written and is based on the type name (which itself is referred to as either klass or 
   So, yes, I'm probably using terminology wrong, and I hereby declare myself confused :(
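   To pin down the vocabulary in points 1) to 5), here is a minimal Java sketch. All names in it (TerminologySketch, Task, write(), read()) are hypothetical and not Spark's API. The entity is the stored object, its natural key is the value of the natural index, and the map key is that value cast to Comparable<Object>. The asKey() helper is an assumed simplification modeled on the cast at the top of the diff; the real helper may do more (e.g. wrap arrays).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical types illustrating the KVStore vocabulary; not Spark's API.
class TerminologySketch {
  // The "entity": what KVStore calls the instance (read), object (view), or value (write).
  static final class Task {
    final long id;        // natural key: the value of the NATURAL_INDEX_NAME index
    final String stageId; // an ordinary indexed field; its contents are an "indexed value"

    Task(long id, String stageId) {
      this.id = id;
      this.stageId = stageId;
    }
  }

  // The "key" in InMemoryStore's sense: an indexed value viewed as a Comparable.
  // Assumed simplification: a plain unchecked cast.
  @SuppressWarnings("unchecked")
  static Comparable<Object> asKey(Object indexedValue) {
    return (Comparable<Object>) indexedValue;
  }

  // One map per entity type ("klass"), keyed by the wrapped natural key.
  static final ConcurrentMap<Comparable<Object>, Task> tasks = new ConcurrentHashMap<>();

  static void write(Task t) {
    tasks.put(asKey(t.id), t); // the long natural key is boxed to Long, then cast
  }

  static Task read(long naturalKey) {
    return tasks.get(asKey(naturalKey));
  }

  public static void main(String[] args) {
    write(new Task(7L, "stage-1"));
    System.out.println(read(7L).stageId); // prints "stage-1"
  }
}
```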
   With respect to klass vs. type, I went with klass in removeAll___() because it is consistent with using 'klass' everywhere in ElementTrackingStore, but please let me know if that should be changed!
   With respect to key, value, field-value-for-index, and comparable-wrapper-around-field-value-for-index, I admit to not knowing what to call which thing when. One way to deal with this is to put removeAll on the KVStoreView, where "value" is (I think) always considered in the context of the type and index. The upside is that we could naturally call such a method removeAll(), since the class and index are owned by the View. From an implementation standpoint, it would make the definition of LevelDB's view quite a bit more complicated, and it would require work in InMemoryView as well: I'd need to pass down the containing hash of indexed-values->entity, rather than just the entities (locally referred to as elements). The other downside is that you'd be using a "View" to mutate the store.
   Another approach would be to use 'values', as you suggested, with some commentary to clear up what we mean by values in the parameter, and to call the methods something like removeAllIndexedValues() and (internally) indexedValueFromEntity(), or some such.
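   To make the second approach concrete, here is a rough sketch of a store-level removeAllIndexedValues() built on the same forEach-plus-remove(key, value) idea as CountingRemoveIfForEach in the diff. It assumes a plain ConcurrentMap per type and a java.util.function.Function standing in for KVTypeInfo.Accessor; the class name, method name, and signature are hypothetical, not an existing Spark API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch; names and signature are illustrative only.
class RemoveByIndexedValueSketch {

  // Mirrors CountingRemoveIfForEach from the diff: iterate with forEach and
  // remove with remove(key, value) instead of Iterator.remove(), sidestepping
  // JDK-8078645.
  static final class CountingRemoveIf<K, V> implements BiConsumer<K, V> {
    private final ConcurrentMap<K, V> data;
    private final Predicate<? super V> filter;
    int count = 0;

    CountingRemoveIf(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
      this.data = data;
      this.filter = filter;
    }

    @Override
    public void accept(K key, V value) {
      // Counts a removal only if the entry still mapped key -> value.
      if (filter.test(value) && data.remove(key, value)) {
        count++;
      }
    }
  }

  // Removes every entity whose indexed value (extracted by indexAccessor)
  // appears in indexedValues; returns how many entities were removed.
  static <K, V> int removeAllIndexedValues(
      ConcurrentMap<K, V> data,
      Function<? super V, ?> indexAccessor,
      Collection<?> indexedValues) {
    Set<Object> targets = new HashSet<>(indexedValues); // O(1) membership checks
    CountingRemoveIf<K, V> remover =
        new CountingRemoveIf<K, V>(data, v -> targets.contains(indexAccessor.apply(v)));
    data.forEach(remover);
    return remover.count;
  }

  public static void main(String[] args) {
    // Here each entity is just its stage string, so the accessor is identity.
    ConcurrentMap<Integer, String> stageByTask = new ConcurrentHashMap<>();
    stageByTask.put(1, "stage-1");
    stageByTask.put(2, "stage-2");
    stageByTask.put(3, "stage-1");
    int removed = removeAllIndexedValues(stageByTask, v -> v, Arrays.asList("stage-1"));
    System.out.println(removed + " " + stageByTask); // prints "2 {2=stage-2}"
  }
}
```

   Removing inside ConcurrentHashMap.forEach() does not throw, because its traversal is weakly consistent, and the two-argument remove() only succeeds if the entry still holds the value the filter saw, so a concurrently replaced entity is not removed by mistake.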

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services

