[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-21 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r286131820
 
 

 ##
 File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +134,149 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>) data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList<T> {
+
+    /**
+     * A BiConsumer to control multi-entity removal.  We use this in a forEach rather than an
+     * iterator because there is a bug in jdk8 which affects remove() on all concurrent map
+     * iterators.  https://bugs.openjdk.java.net/browse/JDK-8078645
+     */
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      private final ConcurrentMap<Comparable<Object>, T> data;
+      private final Predicate<? super T> filter;
+
+      /**
+       * Keeps a count of the number of elements removed.  This count is not currently surfaced
+       * to clients of KVStore as Java's generic removeAll() construct returns only a boolean,
+       * but I found it handy to have the count of elements removed while debugging; a count being
+       * no more complicated than a boolean, I've retained that behavior here, even though there
+       * is no current requirement.
+       */
+      private int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap<Comparable<Object>, T> data,
+          Predicate<? super T> filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable<Object> key, T value) {
 
 Review comment:
   `@Override`
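   For reference, here is a minimal standalone sketch of the forEach-based counting removal with the `@Override` in place. It is not the kvstore code: `CountingRemoval`, `CountingRemoveIf` and `removeMatching` are hypothetical names, and a generic key type stands in for `Comparable<Object>`. It relies on `ConcurrentHashMap.forEach` tolerating removals during traversal, which is the property the javadoc above uses to sidestep the iterator `remove()` bug.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

public class CountingRemoval {

  // Removes entries whose value matches `filter` while ConcurrentHashMap.forEach walks the map,
  // counting only the removals that actually succeeded.
  static final class CountingRemoveIf<K, V> implements BiConsumer<K, V> {
    private final ConcurrentMap<K, V> data;
    private final Predicate<? super V> filter;
    private int count = 0;

    CountingRemoveIf(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
      this.data = data;
      this.filter = filter;
    }

    @Override  // the compiler now checks that this really implements BiConsumer.accept
    public void accept(K key, V value) {
      // remove(key, value) succeeds only if `key` is still mapped to an equal value, so an
      // entry replaced concurrently after the filter ran is left alone.
      if (filter.test(value) && data.remove(key, value)) {
        count++;
      }
    }

    int count() {
      return count;
    }
  }

  static <K, V> int removeMatching(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
    CountingRemoveIf<K, V> callback = new CountingRemoveIf<>(data, filter);
    data.forEach(callback);
    return callback.count();
  }

  public static void main(String[] args) {
    ConcurrentMap<String, Integer> m = new ConcurrentHashMap<>();
    for (int i = 0; i < 10; i++) {
      m.put("k" + i, i);
    }
    int removed = removeMatching(m, v -> v % 2 == 0);
    System.out.println(removed + " removed, " + m.size() + " left");  // 5 removed, 5 left
  }
}
```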


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-21 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r286131574
 
 

 ##
 File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +134,149 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
 
 Review comment:
   final





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-21 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r286133405
 
 

 ##
 File path: common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
 ##
 @@ -132,6 +133,51 @@ public void testArrayIndices() throws Exception {
     assertEquals(o, store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next());
   }
 
+  @Test
+  public void testRemoveAll() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        ArrayKeyIndexType o = new ArrayKeyIndexType();
+        o.key = new int[] { i, j, 0 };
+        o.id = new String[] { "things" };
+        store.write(o);
+
+        o = new ArrayKeyIndexType();
+        o.key = new int[] { i, j, 1 };
+        o.id = new String[] { "more things" };
+        store.write(o);
+      }
+    }
+
+    ArrayKeyIndexType o = new ArrayKeyIndexType();
+    o.key = new int[] { 2, 2, 2 };
+    o.id = new String[] { "things" };
+    store.write(o);
+
+    assertEquals(9, store.count(ArrayKeyIndexType.class));
+
+
+    store.removeAllByIndexValues(
+      ArrayKeyIndexType.class,
+      KVIndex.NATURAL_INDEX_NAME,
+      ImmutableSet.of(new int[] {0, 0, 0}, new int[] { 2, 2, 2 }));
+    assertEquals(7, store.count(ArrayKeyIndexType.class));
+
+    store.removeAllByIndexValues(
+      ArrayKeyIndexType.class,
+      "id",
+      ImmutableSet.of(new String [] { "things" }));
+    assertEquals(4, store.count(ArrayKeyIndexType.class));
+
+    store.removeAllByIndexValues(
+      ArrayKeyIndexType.class,
+      "id",
+    ImmutableSet.of(new String [] { "more things" }));
 
 Review comment:
   indentation
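   The counts the test asserts (9, then 7 after removing two natural keys, then 4 after removing the remaining "things" entries) can be reproduced with a small list-based model. This is a hypothetical sketch, not `InMemoryStore`: `Entity` stands in for `ArrayKeyIndexType`, and the local `asKey` wraps arrays in lists so lookups compare by content, on the assumption that the real `asKey` exists for a similar reason (plain Java arrays use identity `equals`/`hashCode`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RemoveAllModel {

  // Hypothetical stand-in for the test's ArrayKeyIndexType.
  static final class Entity {
    final int[] key;    // natural key
    final String[] id;  // value of the "id" index

    Entity(int[] key, String[] id) {
      this.key = key;
      this.id = id;
    }
  }

  // Arrays use identity equals/hashCode, so convert them to lists that compare by content.
  static Object asKey(Object value) {
    if (value instanceof int[]) {
      return IntStream.of((int[]) value).boxed().collect(Collectors.toList());
    }
    if (value instanceof Object[]) {
      return Arrays.asList((Object[]) value);
    }
    return value;
  }

  // Removes every entity whose indexed value is in `values` and returns how many were removed.
  static int removeAllByIndexValues(
      List<Entity> store, Function<Entity, Object> index, Collection<?> values) {
    Set<Object> keys = new HashSet<>();
    for (Object v : values) {
      keys.add(asKey(v));  // convert the lookup values once, up front
    }
    int before = store.size();
    store.removeIf(e -> keys.contains(asKey(index.apply(e))));
    return before - store.size();
  }

  // Same data as testRemoveAll: 8 entities from the loops plus { 2, 2, 2 }.
  static List<Entity> populate() {
    List<Entity> store = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      for (int j = 0; j < 2; j++) {
        store.add(new Entity(new int[] { i, j, 0 }, new String[] { "things" }));
        store.add(new Entity(new int[] { i, j, 1 }, new String[] { "more things" }));
      }
    }
    store.add(new Entity(new int[] { 2, 2, 2 }, new String[] { "things" }));
    return store;
  }

  public static void main(String[] args) {
    List<Entity> store = populate();
    System.out.println(store.size());  // 9
    removeAllByIndexValues(store, e -> e.key,
        Arrays.asList(new int[] { 0, 0, 0 }, new int[] { 2, 2, 2 }));
    System.out.println(store.size());  // 7
    // singletonList keeps the String[] as one element; Arrays.asList would spread it.
    removeAllByIndexValues(store, e -> e.id, Collections.singletonList(new String[] { "things" }));
    System.out.println(store.size());  // 4
  }
}
```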





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-20 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285751231
 
 

 ##
 File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -248,16 +337,16 @@ private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
           diff = compare(e1, natural, natural.get(e2));
         }
         return diff;
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
+      } catch (ReflectiveOperationException e) {
+        throw new RuntimeException(e);
       }
     }
 
     private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
       try {
         return asKey(getter.get(e1)).compareTo(asKey(v2));
 
 Review comment:
   Did you try to use the `getPredicate` stuff in this class too? Seems like it 
could save some computation.
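   One way to read the suggestion: when a view filters on a fixed index value, convert that value to its key once when the predicate is built, instead of calling `asKey(v2)` again for every element. The sketch below is hypothetical (`toKey` stands in for `asKey`, and the counter exists only to make the saving visible); it is not how `InMemoryView` is actually written.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class PrecomputedPredicate {

  // Counts how many times the key conversion runs.
  static final AtomicInteger conversions = new AtomicInteger();

  // Hypothetical stand-in for asKey(): here it just lower-cases strings.
  static Object toKey(Object value) {
    conversions.incrementAndGet();
    return value instanceof String ? ((String) value).toLowerCase() : value;
  }

  // Converts the fixed lookup value again on every test.
  static <T> Predicate<T> naive(Function<T, Object> getter, Object lookup) {
    return e -> Objects.equals(toKey(getter.apply(e)), toKey(lookup));
  }

  // Converts the lookup value once, when the predicate is built.
  static <T> Predicate<T> precomputed(Function<T, Object> getter, Object lookup) {
    Object lookupKey = toKey(lookup);
    return e -> Objects.equals(toKey(getter.apply(e)), lookupKey);
  }

  static <T> List<T> filter(List<T> items, Predicate<T> p) {
    return items.stream().filter(p).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> items = Arrays.asList("a", "B", "b", "c");

    conversions.set(0);
    Predicate<String> slow = naive(s -> s, "b");
    System.out.println(filter(items, slow) + " " + conversions.get());  // [B, b] 8

    conversions.set(0);
    Predicate<String> fast = precomputed(s -> s, "b");
    System.out.println(filter(items, fast) + " " + conversions.get());  // [B, b] 5
  }
}
```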





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-20 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285756405
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala
 ##
 @@ -17,14 +17,66 @@
 
 package org.apache.spark.status
 
+import java.util.concurrent.atomic.AtomicInteger
+
 import org.mockito.Mockito._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.status.ElementTrackingStore._
 import org.apache.spark.util.kvstore._
 
 class ElementTrackingStoreSuite extends SparkFunSuite {
 
+  test("asynchronous tracking single-fire") {
+    val store = mock(classOf[KVStore])
+    val tracking = new ElementTrackingStore(store, new SparkConf()
+      .set(ASYNC_TRACKING_ENABLED, true))
+
+    val waiter = new Object()
+    var done = false
+    var type1 = new AtomicInteger(0)
+    var queued0: WriteQueueResult = null
+    var queued1: WriteQueueResult = null
+    var queued2: WriteQueueResult = null
+    var queued3: WriteQueueResult = null
+
+
+    tracking.addTrigger(classOf[Type1], 1) { count =>
+      val count = type1.getAndIncrement()
+
+      count match {
+        case 0 =>
+          // while in the asynchronous thread, attempt to increment twice.  The first should
+          // succeed, the second should be skipped
+          queued1 = tracking.write(new Type1, checkTriggers = true)
+          queued2 = tracking.write(new Type1, checkTriggers = true)
+        case 1 =>
+          // Verify that once we've started deliver again, that we can enqueue another
+          queued3 = tracking.write(new Type1, checkTriggers = true)
+        case 2 =>
+          waiter.synchronized {
+            done = true
+            waiter.notifyAll()
+          }
+      }
+    }
+
+    when(store.count(classOf[Type1])).thenReturn(2L)
+    queued0 = tracking.write(new Type1, checkTriggers = true)
+    waiter.synchronized {
+      if (!done) {
+        waiter.wait()
 
 Review comment:
   Hmmm... maybe add a timeout here (or use `eventually`)? The issue is that if 
there's a bug in your code, this may not actually return. (It's passing, but if 
a bug is added later then this would be an annoying thing to debug.)
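   A bounded wait avoids a hung suite if the trigger never fires. In the Scala test, ScalaTest's `eventually` or `wait(timeout)` inside a deadline loop would do; as a sketch of the idea in Java, a `CountDownLatch` whose `await(timeout, unit)` returns `false` on timeout can stand in for the `waiter` object. `BoundedWait` and `awaitDone` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedWait {

  // Starts a worker that signals completion only if `signal` is true, then waits at most
  // `timeoutMs` for that signal. Returns false on timeout instead of blocking forever.
  static boolean awaitDone(boolean signal, long timeoutMs) throws InterruptedException {
    CountDownLatch done = new CountDownLatch(1);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.execute(() -> {
        if (signal) {
          done.countDown();  // plays the role of the trigger's "done = true; notifyAll()"
        }
      });
      return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(awaitDone(true, 5000));  // true
    System.out.println(awaitDone(false, 100));  // false, after roughly 100 ms
  }
}
```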





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-20 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285755199
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala
 ##
 @@ -17,14 +17,66 @@
 
 package org.apache.spark.status
 
+import java.util.concurrent.atomic.AtomicInteger
+
 import org.mockito.Mockito._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.status.ElementTrackingStore._
 import org.apache.spark.util.kvstore._
 
 class ElementTrackingStoreSuite extends SparkFunSuite {
 
+  test("asynchronous tracking single-fire") {
+    val store = mock(classOf[KVStore])
+    val tracking = new ElementTrackingStore(store, new SparkConf()
+      .set(ASYNC_TRACKING_ENABLED, true))
+
+    val waiter = new Object()
+    var done = false
+    var type1 = new AtomicInteger(0)
+    var queued0: WriteQueueResult = null
+    var queued1: WriteQueueResult = null
+    var queued2: WriteQueueResult = null
+    var queued3: WriteQueueResult = null
+
 
 Review comment:
   remove one empty line





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-20 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285753585
 
 

 ##
 File path: common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
 ##
 @@ -132,6 +133,48 @@ public void testArrayIndices() throws Exception {
     assertEquals(o, store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next());
   }
 
+  @Test
+  public void testRemoveAll() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        ArrayKeyIndexType o = new ArrayKeyIndexType();
+        o.key = new int[] { i, j, 0 };
+        o.id = new String[] { "things" };
+        store.write(o);
+
+        o = new ArrayKeyIndexType();
+        o.key = new int[] { i, j, 1 };
+        o.id = new String[] { "more things" };
+        store.write(o);
+      }
+    }
+
+    ArrayKeyIndexType o = new ArrayKeyIndexType();
+    o.key = new int[] { 2, 2, 2 };
+    o.id = new String[] { "things" };
+    store.write(o);
+
+    assertEquals(9, store.count(ArrayKeyIndexType.class));
+
+    HashSet set = new HashSet();
 
 Review comment:
   Add generic type.
   
   Or you could use `ImmutableSet` and not need this local variable (and avoid 
re-using it for different types and avoid the warnings from javac at the same 
time).
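   A minimal sketch of the typed alternative, using JDK collections as a stand-in for Guava's `ImmutableSet.of` (which is what the comment suggests). `removeAll` is a hypothetical stub that only reports how many values it received; the point is that each call gets its own correctly typed collection, and that `Arrays.asList` on a lone array spreads its elements, so a one-element collection of `String[]` needs `Collections.singleton` (or `ImmutableSet.of`).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class TypedIndexValues {

  // Hypothetical stand-in for removeAllByIndexValues: just reports how many values it received.
  static int removeAll(String index, Collection<?> indexValues) {
    return indexValues.size();
  }

  public static void main(String[] args) {
    // One typed collection per call instead of a shared raw HashSet: no unchecked warnings,
    // and int[] and String[] values are never mixed in the same collection.
    Collection<int[]> naturalKeys = Arrays.asList(new int[] { 0, 0, 0 }, new int[] { 2, 2, 2 });
    Collection<String[]> ids = Collections.singleton(new String[] { "things" });

    System.out.println(removeAll("natural", naturalKeys));  // 2
    System.out.println(removeAll("id", ids));               // 1

    // Pitfall: Arrays.asList spreads a lone array argument into its elements.
    System.out.println(Arrays.asList(new String[] { "a", "b" }).size());  // 2, not 1
  }
}
```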





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-20 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285754218
 
 

 ##
 File path: common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
 ##
 @@ -198,6 +199,46 @@ public void testUpdate() throws Exception {
     assertEquals(0, db.count(t.getClass(), "name", "name"));
   }
 
+  @Test
+  public void testRemoveAll() throws Exception {
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        ArrayKeyIndexType o = new ArrayKeyIndexType();
+        o.key = new int[] { i, j, 0 };
+        o.id = new String[] { "things" };
+        db.write(o);
+
+        o = new ArrayKeyIndexType();
+        o.key = new int[] { i, j, 1 };
+        o.id = new String[] { "more things" };
+        db.write(o);
+      }
+    }
+
+    ArrayKeyIndexType o = new ArrayKeyIndexType();
+    o.key = new int[] { 2, 2, 2 };
+    o.id = new String[] { "things" };
+    db.write(o);
+
+    assertEquals(9, db.count(ArrayKeyIndexType.class));
+
+    HashSet set = new HashSet();
 
 Review comment:
   Same thing.





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-17 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285313680
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
 ##
 @@ -1142,20 +1144,10 @@ private[spark] class AppStatusListener(
       s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
     }
 
-    stages.foreach { s =>
+    val stageIndexValues = stages.map { s =>
 
 Review comment:
   `s/stageIndexValues/stageIds` (or `stageKeys`, since here they're actually 
the primary keys of the stages being removed...)





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-17 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285313162
 
 

 ##
 File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +134,149 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>) data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList<T> {
+
+    /**
+     * A BiConsumer to control multi-entity removal.  We use this in a forEach rather than an
+     * iterator because there is a bug in jdk8 which affects remove() on all concurrent map
+     * iterators.  https://bugs.openjdk.java.net/browse/JDK-8078645
+     */
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      private final ConcurrentMap<Comparable<Object>, T> data;
+      private final Predicate<? super T> filter;
+
+      /**
+       * Keeps a count of the number of elements removed.  This count is not currently surfaced
+       * to clients of KVStore as Java's generic removeAll() construct returns only a boolean,
+       * but I found it handy to have the count of elements removed while debugging; a count being
+       * no more complicated than a boolean, I've retained that behavior here, even though there
+       * is no current requirement.
+       */
+      private int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap<Comparable<Object>, T> data,
+          Predicate<? super T> filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable<Object> key, T value) {
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+
+      public int count() { return count; }
+    }
 
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap<Comparable<Object>, Object> data;
+    private final ConcurrentMap<Comparable<Object>, T> data;
 
-    private int size;
-
-    private InstanceList(Class<?> type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class<?> klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }
 
     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }
 
-    public Object get(Object key) {
+    int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
+      Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
+      CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
+
+      data.forEach(callback);
+      return callback.count();
+    }
+
+    public T get(Object key) {
       return data.get(asKey(key));
     }
 
-    public void put(Object value) throws Exception {
-      Preconditions.checkArgument(ti.type().equals(value.getClass()),
-        "Unexpected type: %s", value.getClass());
-      if (data.put(asKey(naturalKey.get(value)), value) == null) {
-        size++;
-      }
+    public void put(T value) throws Exception {
+      data.put(asKey(naturalKey.get(value)), value);
     }
 
     public void delete(Object key) {
-      if (data.remove(asKey(key)) != null) {
-        size--;
-      }
+      data.remove(asKey(key));
     }
 
     public int size() {
-      return size;
+      return data.size();
     }
 
-    @SuppressWarnings("unchecked")
-    public <T> InMemoryView<T> view(Class<T> type) {
-      Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
-      Collection<T> all = (Collection<T>) data.values();
-      return new InMemoryView<>(type, all, ti);
+    public InMemoryView<T> view() {
+      return new InMemoryView<>(data.values(), ti);
+    }
+
+    private static <T> Predicate<? super T> getPredicate(
+        KVTypeInfo.Accessor getter,
+        Collection<?> keys) {
 
 Review comment:
   s/keys/values



[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-17 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285313350
 
 

 ##
 File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java
 ##
 @@ -126,4 +127,9 @@
    */
   long count(Class<?> type, String index, Object indexedValue) throws Exception;
 
+  /**
+   * A cheaper way to remove multiple items from the KVStore
+   */
+  <T> boolean removeAllByIndexValues(Class<T> klass, String index, Collection<?> indexValues)
+          throws Exception;
 
 Review comment:
   nit: indented too much (2 or 4 spaces is fine here)





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-17 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285313778
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
 ##
 @@ -46,7 +50,28 @@ import org.apache.spark.util.kvstore._
   */
  private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
 
-  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
+    private val pending = new AtomicBoolean(false)
+
+    def fireOnce(f: Seq[Trigger[_]] => Unit): WriteQueueResult = {
+      val shouldEnqueue = pending.compareAndSet(false, true)
 
 Review comment:
   Actually don't need this variable now.
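   Without the local, the compare-and-set result is tested directly in the `if`. Below is a Java sketch of the same latch pattern; `LatchedTrigger`, `fireOnce` and `Result` are hypothetical names loosely modeled on the Scala `LatchedTriggers` and `WriteQueueResult`, and re-arming the flag before running the callback (so that a write made from inside the callback can queue again, as the suite's `case 1` expects) is an assumption about the intended behavior.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchedTrigger {

  enum Result { QUEUED, SKIPPED }

  private final AtomicBoolean pending = new AtomicBoolean(false);
  private final ExecutorService executor;

  LatchedTrigger(ExecutorService executor) {
    this.executor = executor;
  }

  // Queues `action` unless an earlier call is still waiting to run.
  Result fireOnce(Runnable action) {
    if (pending.compareAndSet(false, true)) {  // test the CAS result directly, no local needed
      executor.execute(() -> {
        pending.set(false);  // re-arm first, so a fire issued from inside `action` can queue
        action.run();
      });
      return Result.QUEUED;
    }
    return Result.SKIPPED;
  }

  static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService single = Executors.newSingleThreadExecutor();
    LatchedTrigger trigger = new LatchedTrigger(single);
    AtomicInteger runs = new AtomicInteger();

    // Hold the only worker thread so the first queued action cannot start yet.
    CountDownLatch gate = new CountDownLatch(1);
    single.execute(() -> awaitQuietly(gate));

    System.out.println(trigger.fireOnce(runs::incrementAndGet));  // QUEUED
    System.out.println(trigger.fireOnce(runs::incrementAndGet));  // SKIPPED
    gate.countDown();

    single.shutdown();
    single.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println(runs.get());  // 1
  }
}
```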





[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-17 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r285313219
 
 

 ##
 File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +134,149 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>) data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList<T> {
+
+    /**
+     * A BiConsumer to control multi-entity removal.  We use this in a forEach rather than an
+     * iterator because there is a bug in jdk8 which affects remove() on all concurrent map
+     * iterators.  https://bugs.openjdk.java.net/browse/JDK-8078645
+     */
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      private final ConcurrentMap<Comparable<Object>, T> data;
+      private final Predicate<? super T> filter;
+
+      /**
+       * Keeps a count of the number of elements removed.  This count is not currently surfaced
+       * to clients of KVStore as Java's generic removeAll() construct returns only a boolean,
+       * but I found it handy to have the count of elements removed while debugging; a count being
+       * no more complicated than a boolean, I've retained that behavior here, even though there
+       * is no current requirement.
+       */
+      private int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap<Comparable<Object>, T> data,
+          Predicate<? super T> filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable<Object> key, T value) {
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+
+      public int count() { return count; }
+    }
 
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap<Comparable<Object>, Object> data;
+    private final ConcurrentMap<Comparable<Object>, T> data;
 
-    private int size;
-
-    private InstanceList(Class<?> type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class<?> klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }
 
     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }
 
-    public Object get(Object key) {
+    int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
+      Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
+      CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
+
+      data.forEach(callback);
+      return callback.count();
+    }
+
+    public T get(Object key) {
       return data.get(asKey(key));
     }
 
-    public void put(Object value) throws Exception {
-      Preconditions.checkArgument(ti.type().equals(value.getClass()),
-        "Unexpected type: %s", value.getClass());
-      if (data.put(asKey(naturalKey.get(value)), value) == null) {
-        size++;
-      }
+    public void put(T value) throws Exception {
+      data.put(asKey(naturalKey.get(value)), value);
     }
 
     public void delete(Object key) {
-      if (data.remove(asKey(key)) != null) {
-        size--;
-      }
+      data.remove(asKey(key));
     }
 
     public int size() {
-      return size;
+      return data.size();
     }
 
-    @SuppressWarnings("unchecked")
-    public <T> InMemoryView<T> view(Class<T> type) {
-      Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
-      Collection<T> all = (Collection<T>) data.values();
-      return new InMemoryView<>(type, all, ti);
+    public InMemoryView<T> view() {
+      return new InMemoryView<>(data.values(), ti);
+    }
+
+    private static <T> Predicate<? super T> getPredicate(
+        KVTypeInfo.Accessor getter,
+        Collection<?> keys) {
+      if (Comparable.class.isAssignableFrom(getter.getType())) {
+        HashSet<?> set = new HashSet<>(keys);
+
+        return (value) 

[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-16 GitBox
URL: https://github.com/apache/spark/pull/24616#discussion_r284797365
 
 

 ##
 File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +132,144 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList<T> {
+
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      ConcurrentMap<Comparable<Object>, T> data;
+      Predicate<? super T> filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap<Comparable<Object>, T> data,
+          Predicate<? super T> filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable<Object> key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+        // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+        // implement correctly against the values() iterator, we use forEach instead
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+    }
 
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap<Comparable<Object>, Object> data;
-
-    private int size;
+    private final ConcurrentMap<Comparable<Object>, T> data;
 
-    private InstanceList(Class<?> type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class<?> klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }
 
     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }
 
-    public Object get(Object key) {
+    // Note: removeIf returns a boolean if any element has been removed.
+    // While debugging this code, it was handy to have the count of elements
+    // removed, rather than an indicator of whether something has been
+    // removed, and a count is no more complicated than a boolean so I've
+    // retained that behavior here, although there is no current requirement.
+    @SuppressWarnings("unchecked")
+    int countingRemoveAllByKeys(String index, Collection<?> keys) {
 
 Review comment:
   I very much believe that there might be very confusing terminology in this 
code. I went back and forth on implementation and interfaces a ton of times 
before reaching something I was happy with, and by that time I didn't really 
bother with the internal naming of things so much.
   
   But here we're talking about a new method in a "public" interface (not this
particular line, but the new method in `KVStore`), so it's better to be a little
more careful. And IMO index values are not keys, so "removeByKeys" is a little
weird. Maybe "removeByIndexValues" is clearer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix 
performance of ElementTrackingStore deletes when using InMemoryStore under high 
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284493151
 
 

 ##
 File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +132,144 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList<T> {
+
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      ConcurrentMap<Comparable<Object>, T> data;
+      Predicate<? super T> filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap<Comparable<Object>, T> data,
+          Predicate<? super T> filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable<Object> key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+        // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+        // implement correctly against the values() iterator, we use forEach instead
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+    }
 
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap<Comparable<Object>, Object> data;
-
-    private int size;
+    private final ConcurrentMap<Comparable<Object>, T> data;
 
-    private InstanceList(Class<?> type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class<?> klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }
 
     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }
 
-    public Object get(Object key) {
+    // Note: removeIf returns a boolean if any element has been removed.
+    // While debugging this code, it was handy to have the count of elements
+    // removed, rather than an indicator of whether something has been
+    // removed, and a count is no more complicated than a boolean so I've
+    // retained that behavior here, although there is no current requirement.
+    @SuppressWarnings("unchecked")
+    int countingRemoveAllByKeys(String index, Collection keys) {
+      Predicate<? super T> filter = getPredicate(ti.getAccessor(index), keys);
+      CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
+
+      data.forEach(callback);
+      return callback.count;
+    }
+
+    public T get(Object key) {
       return data.get(asKey(key));
     }
 
-    public void put(Object value) throws Exception {
-      Preconditions.checkArgument(ti.type().equals(value.getClass()),
-        "Unexpected type: %s", value.getClass());
-      if (data.put(asKey(naturalKey.get(value)), value) == null) {
-        size++;
-      }
+    public void put(T value) throws Exception {
+      data.put(asKey(naturalKey.get(value)), value);
     }
 
     public void delete(Object key) {
-      if (data.remove(asKey(key)) != null) {
-        size--;
-      }
+      data.remove(asKey(key));
     }
 
     public int size() {
-      return size;
+      return data.size();
+    }
+
+    @SuppressWarnings("unchecked")
+    public InMemoryView<T> view() {
+      return new InMemoryView<>(data.values(), ti);
     }
 
     @SuppressWarnings("unchecked")
-    public <T> InMemoryView<T> view(Class<T> type) {
-      Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
-      Collection<T> all = (Collection<T>) data.values();
-      return new InMemoryView<>(type, all, ti);
+    private static <T> Predicate<? super T> getPredicate(
+        KVTypeInfo.Accessor getter,
+        Collection keys) {
+      if (Comparable.class.isAssignableFrom(getter.getType())) {
+        HashSet set = new HashSet(keys);
+
+        return (value) -> set.contains(keyFromValue(getter, value));
+      } else {
+        HashSet set = new 

[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix 
performance of ElementTrackingStore deletes when using InMemoryStore under high 
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284491196
 
 

 ##
 File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +132,144 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList<T> {
+
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      ConcurrentMap<Comparable<Object>, T> data;
+      Predicate<? super T> filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap<Comparable<Object>, T> data,
+          Predicate<? super T> filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable<Object> key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
 
 Review comment:
   This comment sounds like it belongs at this class's declaration, not inside 
this method?
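
A minimal Java sketch of what this comment suggests, assuming the JDK-8078645 note moves into the class Javadoc and the fields become `private final` (as asked in the separate nit on this class). `CountingRemoveIf` and its static `removeIf` helper are hypothetical names, and the class is generic over `K`/`V` rather than using the diff's `Comparable<Object>` keys; it illustrates the forEach-based removal technique and is not the code from the PR.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

/**
 * Removes every entry whose value matches a filter and counts how many were removed.
 * Removal is driven by ConcurrentMap.forEach instead of remove() on an iterator over the
 * map's views, to stay clear of https://bugs.openjdk.java.net/browse/JDK-8078645.
 */
final class CountingRemoveIf<K, V> implements BiConsumer<K, V> {
  private final ConcurrentMap<K, V> data;
  private final Predicate<? super V> filter;
  private int count = 0;

  CountingRemoveIf(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
    this.data = data;
    this.filter = filter;
  }

  @Override
  public void accept(K key, V value) {
    // remove(key, value) only succeeds while key is still mapped to a value equal to
    // `value`, so an entry a concurrent writer replaced is left alone and not counted.
    if (filter.test(value) && data.remove(key, value)) {
      count++;
    }
  }

  int count() {
    return count;
  }

  /** Runs a single pass over the map and returns the number of entries removed. */
  static <K, V> int removeIf(ConcurrentMap<K, V> map, Predicate<? super V> filter) {
    CountingRemoveIf<K, V> callback = new CountingRemoveIf<>(map, filter);
    map.forEach(callback);
    return callback.count();
  }
}
```

Because `ConcurrentHashMap` traversal is weakly consistent, removing entries from inside `forEach` is permitted and does not throw `ConcurrentModificationException`.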


[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix 
performance of ElementTrackingStore deletes when using InMemoryStore under high 
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284490875
 
 

 ##
 File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +132,144 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList<T> {
+
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      ConcurrentMap<Comparable<Object>, T> data;
 
 Review comment:
   nit: private, final where it makes sense


[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix 
performance of ElementTrackingStore deletes when using InMemoryStore under high 
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284494996
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
 ##
 @@ -1177,16 +1169,20 @@ private[spark] class AppStatusListener(
       }
 
       cleanupCachedQuantiles(key)
+      key
     }
 
+    // Delete summaries in one pass, as deleting them for each stage is slow
+    val totalSummariesDeleted = kvstore.removeAllByKeys(
 
 Review comment:
   Unused variable.


[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix 
performance of ElementTrackingStore deletes when using InMemoryStore under high 
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284494865
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
 ##
 @@ -46,7 +50,26 @@ import org.apache.spark.util.kvstore._
  */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
 
-  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
+    val countDeferred = new AtomicInteger(0)
+
+    def fireOnce(f: Seq[Trigger[_]] => Unit): Boolean = {
+      val shouldExecute = countDeferred.compareAndSet(0, 1)
+      if (shouldExecute) {
+        doAsync {
+          countDeferred.set(0)
+          f(triggers)
+        }
+      }
+      shouldExecute
 
 Review comment:
   You could avoid the `WriteQueueResult` object with an inlined `if...else` 
here.


[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix 
performance of ElementTrackingStore deletes when using InMemoryStore under high 
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284494214
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
 ##
 @@ -46,7 +50,26 @@ import org.apache.spark.util.kvstore._
  */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
 
-  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
+    val countDeferred = new AtomicInteger(0)
 
 Review comment:
   You're kinda using this as a boolean, so `AtomicBoolean`? Also, `private`, 
and the variable name is a little cryptic (maybe just call it `busy` or 
something?).
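
A minimal Java sketch of the latch written with a private `AtomicBoolean` named `busy`, as suggested above. `LatchedTask` and its `Executor`-based constructor are hypothetical stand-ins for `LatchedTriggers` and `doAsync` in the Scala diff; like the diff, it clears the flag before running the task and reports whether this call scheduled a run.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Schedules a task asynchronously, but never queues a second copy while one is still
 * pending: calls that arrive while the latch is set are skipped.
 */
final class LatchedTask {
  // true from the moment a run is scheduled until that run starts
  private final AtomicBoolean busy = new AtomicBoolean(false);
  private final Executor executor;
  private final Runnable task;

  LatchedTask(Executor executor, Runnable task) {
    this.executor = executor;
    this.task = task;
  }

  /** Returns true if this call scheduled the task, false if a run was already pending. */
  boolean fireOnce() {
    if (!busy.compareAndSet(false, true)) {
      return false; // a run is already pending; skip this request
    }
    try {
      executor.execute(() -> {
        // Reset before running (as the diff does with countDeferred.set(0)), so a write
        // that arrives while the task is running can still schedule another pass.
        busy.set(false);
        task.run();
      });
    } catch (RuntimeException e) {
      // The executor refused the task: release the latch instead of leaving it stuck.
      busy.set(false);
      throw e;
    }
    return true;
  }
}
```

`compareAndSet(false, true)` makes the check and the set a single atomic step, so among threads calling `fireOnce()` concurrently only one wins until the scheduled run clears the flag.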


[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix 
performance of ElementTrackingStore deletes when using InMemoryStore under high 
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284492184
 
 

 ##
 File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +132,144 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList<T> {
+
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      ConcurrentMap<Comparable<Object>, T> data;
+      Predicate<? super T> filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap<Comparable<Object>, T> data,
+          Predicate<? super T> filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable<Object> key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+        // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+        // implement correctly against the values() iterator, we use forEach instead
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+    }
 
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap<Comparable<Object>, Object> data;
-
-    private int size;
+    private final ConcurrentMap<Comparable<Object>, T> data;
 
-    private InstanceList(Class<?> type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class<?> klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }
 
     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }
 
-    public Object get(Object key) {
+    // Note: removeIf returns a boolean if any element has been removed.
+    // While debugging this code, it was handy to have the count of elements
+    // removed, rather than an indicator of whether something has been
+    // removed, and a count is no more complicated than a boolean so I've
+    // retained that behavior here, although there is no current requirement.
+    @SuppressWarnings("unchecked")
+    int countingRemoveAllByKeys(String index, Collection keys) {
 
 Review comment:
   IIUC you're removing all entries whose value for the given index matches one
of the given `keys`.
   
   So it sounds to me like the name here (and in related code) should use
`ByValues` instead of `ByKeys`?
   
   Also, you have a raw type in the argument list, which tells me that your
`@SuppressWarnings` is either incorrect or perhaps not needed (if you make the
argument `Collection<?>`).
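
A sketch of the predicate-building step with the parameter typed as `Collection<?>` and named after index values rather than keys. `IndexValueFilters.byIndexValues` and its `Function` accessor are hypothetical stand-ins for `getPredicate` and `KVTypeInfo.Accessor`. With `Collection<?>` no unchecked conversion remains, so the method needs no `@SuppressWarnings`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

final class IndexValueFilters {
  private IndexValueFilters() {}

  /**
   * Builds a predicate that matches entities whose indexed value is one of indexValues.
   * Accepting Collection<?> instead of a raw Collection keeps this free of unchecked warnings.
   */
  static <T> Predicate<T> byIndexValues(
      Function<? super T, ?> indexAccessor,
      Collection<?> indexValues) {
    // Copy once into a HashSet so each test is a hash lookup rather than a list scan.
    Set<Object> wanted = new HashSet<>(indexValues);
    return entity -> wanted.contains(indexAccessor.apply(entity));
  }
}
```

Matching relies on `equals`/`hashCode` of the index values, so array-valued indexes (which compare by identity) would not match here; the diff handles non-`Comparable` index types in a separate branch, which this sketch leaves out.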


[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix 
performance of ElementTrackingStore deletes when using InMemoryStore under high 
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284490577
 
 

 ##
 File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +132,144 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>)data.get(type);
 
 Review comment:
   nit: space after `)`

