[GitHub] [spark] davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-20 Thread GitBox
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] 
Fix performance of ElementTrackingStore deletes when using InMemoryStore under 
high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r285792194
 
 

 ##
 File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -248,16 +337,16 @@ private int compare(T e1, T e2, KVTypeInfo.Accessor 
getter) {
   diff = compare(e1, natural, natural.get(e2));
 }
 return diff;
-  } catch (Exception e) {
-throw Throwables.propagate(e);
+  } catch (ReflectiveOperationException e) {
+throw new RuntimeException(e);
   }
 }
 
 private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
   try {
 return asKey(getter.get(e1)).compareTo(asKey(v2));
 
 Review comment:
   I have not attempted to apply the basic trick in getPredicate (of calling or 
not calling asKey() depending on whether the index requires it).  I agree that 
there might be a win there, but applying it seems difficult and unlikely to 
yield clear code.  getPredicate() is doing two things -- it's converting a 
Collection into a Set as well as converting all the entries through asKey as 
necessary, AND it's removing the need to call getClass.isArray when that check 
isn't required.  
   
   There appear to be three basic cases where this strategy would be useful -- 
during copyElements when parent is defined, and during iteration when first 
and/or last are defined.  Well, I don't see a contract about when first or last 
can be changed wrt when an iteration is started or running, so I'm a little 
worried about modifying that code, and I'm not sure what the use cases are for 
parents.  But let's assume that first and last are actually fixed during 
iteration (it would seem bad if they were not), and I'm willing to make changes 
to the parent code blind.  Presumably I'd want a set of compare() routines that 
took a Comparable as the second argument (the second argument being fixed 
locally to asKey(first|last|parent)) with two potential names -- one that would 
call asKey and one that wouldn't.  I'm blanking on ideas for names.  Ideas?
   
   What I'll do is create the "dumb" version of this compare call, and either 
we'll revert because we want to allow first and last to be modified during 
iteration, or we'll come up with some good names for dealing with the other 
half of the asKey calls.  Or we'll decide that the getClass.isArray calls for 
only one side of the compare aren't so bad.
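
   For illustration only, here is a minimal Java sketch of the two compare 
variants discussed above, where the second argument has already been wrapped 
once by the caller.  Everything in it is an assumption made for the example: 
the class KeyCompareSketch, the wrapper IntArrayKey (standing in for whatever 
array wrapper InMemoryStore really uses, and handling only int[]), and the 
method names compareWrapping and compareDirect are hypothetical and are not 
taken from the PR.

```java
/** Hypothetical sketch; none of these names come from the PR. */
final class KeyCompareSketch {

  /** Stand-in array wrapper (int[] only) that compares arrays by content. */
  static final class IntArrayKey implements Comparable<Object> {
    private final int[] values;

    IntArrayKey(int[] values) {
      this.values = values;
    }

    @Override
    public int compareTo(Object other) {
      int[] o = ((IntArrayKey) other).values;
      int n = Math.min(values.length, o.length);
      for (int i = 0; i < n; i++) {
        int diff = Integer.compare(values[i], o[i]);
        if (diff != 0) {
          return diff;
        }
      }
      return Integer.compare(values.length, o.length);
    }
  }

  /** Wraps arrays; any other value is assumed to be Comparable already. */
  @SuppressWarnings("unchecked")
  static Comparable<Object> asKey(Object in) {
    if (in.getClass().isArray()) {
      return new IntArrayKey((int[]) in);
    }
    return (Comparable<Object>) in;
  }

  /** Variant that still pays for the isArray check on the first argument. */
  static int compareWrapping(Object v1, Comparable<Object> fixedKey) {
    return asKey(v1).compareTo(fixedKey);
  }

  /** Variant that skips asKey; only valid when index values are never arrays. */
  @SuppressWarnings("unchecked")
  static int compareDirect(Object v1, Comparable<Object> fixedKey) {
    return ((Comparable<Object>) v1).compareTo(fixedKey);
  }

  public static void main(String[] args) {
    // The bound is wrapped once, outside the loop that would do the comparing.
    Comparable<Object> last = asKey(new int[] {1, 2, 3});
    System.out.println(compareWrapping(new int[] {1, 2, 2}, last) < 0);
    System.out.println(compareDirect(7, asKey(5)) > 0);
  }
}
```

   The sketch only works if the fixed bound (first, last, or parent) does not 
change while the comparisons are running, which is exactly the contract 
question raised above.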
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-16 Thread GitBox
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] 
Fix performance of ElementTrackingStore deletes when using InMemoryStore under 
high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284815319
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
 ##
 @@ -46,7 +50,26 @@ import org.apache.spark.util.kvstore._
  */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
 
-  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
+val countDeferred = new AtomicInteger(0)
+
+def fireOnce(f: Seq[Trigger[_]] => Unit): Boolean = {
+  val shouldExecute = countDeferred.compareAndSet(0, 1)
+  if (shouldExecute) {
+doAsync {
+  countDeferred.set(0)
+  f(triggers)
+}
+  }
+  shouldExecute
 
 Review comment:
   Hmm, my memory is faulty, this isn't a tri-value.  I'll inline here, sure.
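
   As a rough Java rendering of the latch in the quoted hunk (the PR code is 
Scala), the sketch below shows why the result is two-valued: compareAndSet 
either wins the 0 -> 1 transition or it does not.  The class name 
FireOnceLatchSketch and the Executor parameter (standing in for doAsync) are 
assumptions made for this example.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical Java analogue of the fireOnce latch; not code from the PR. */
final class FireOnceLatchSketch {
  private final AtomicInteger countDeferred = new AtomicInteger(0);
  private final Executor executor; // stands in for doAsync in the Scala code

  FireOnceLatchSketch(Executor executor) {
    this.executor = executor;
  }

  /** Schedules the task unless one is already pending; returns whether it did. */
  boolean fireOnce(Runnable task) {
    boolean shouldExecute = countDeferred.compareAndSet(0, 1);
    if (shouldExecute) {
      executor.execute(() -> {
        // Reopen the latch before running, so a request that arrives while
        // the task runs schedules another pass instead of being dropped.
        countDeferred.set(0);
        task.run();
      });
    }
    return shouldExecute;
  }

  public static void main(String[] args) {
    // A same-thread executor keeps the demo deterministic.
    FireOnceLatchSketch latch = new FireOnceLatchSketch(Runnable::run);
    System.out.println(latch.fireOnce(() -> System.out.println("cleanup ran")));
  }
}
```

   Callers that arrive while a task is pending simply get false back; there is 
no third state to distinguish.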
   





[GitHub] [spark] davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-16 Thread GitBox
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] 
Fix performance of ElementTrackingStore deletes when using InMemoryStore under 
high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284799718
 
 

 ##
 File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +132,144 @@ public void close() {
 return (Comparable) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static  KVStoreView emptyView() {
+return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map 
strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+private ConcurrentMap, InstanceList> data = new 
ConcurrentHashMap<>();
+
+@SuppressWarnings("unchecked")
+public  InstanceList get(Class type) {
+  return (InstanceList)data.get(type);
+}
+
+@SuppressWarnings("unchecked")
+public  void write(T value) throws Exception {
+  InstanceList list =
+(InstanceList) data.computeIfAbsent(value.getClass(), 
InstanceList::new);
+  list.put(value);
+}
+
+public void clear() {
+  data.clear();
+}
+  }
+
+  private static class InstanceList {
+
+private static class CountingRemoveIfForEach implements 
BiConsumer, T> {
+  ConcurrentMap, T> data;
+  Predicate filter;
+  int count = 0;
+
+  CountingRemoveIfForEach(
+  ConcurrentMap, T> data,
+  Predicate filter) {
+this.data = data;
+this.filter = filter;
+  }
+
+  public void accept(Comparable key, T value) {
+// To address https://bugs.openjdk.java.net/browse/JDK-8078645 which 
affects remove() on
+// all iterators of concurrent maps, and specifically makes 
countingRemoveIf difficult to
+// implement correctly against the values() iterator, we use forEach 
instead
+if (filter.test(value)) {
+  if (data.remove(key, value)) {
+count++;
+  }
+}
+  }
+}
 
 private final KVTypeInfo ti;
 private final KVTypeInfo.Accessor naturalKey;
-private final ConcurrentMap, Object> data;
-
-private int size;
+private final ConcurrentMap, T> data;
 
-private InstanceList(Class type) throws Exception {
-  this.ti = new KVTypeInfo(type);
+private InstanceList(Class klass) {
+  this.ti = new KVTypeInfo(klass);
   this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
   this.data = new ConcurrentHashMap<>();
-  this.size = 0;
 }
 
 KVTypeInfo.Accessor getIndexAccessor(String indexName) {
   return ti.getAccessor(indexName);
 }
 
-public Object get(Object key) {
+// Note: removeIf returns a boolean if any element has been removed.
+// While debugging this code, it was handy to have the count of elements
+// removed, rather than an indicator of whether something has been
+// removed, and a count is no more complicated than a boolean so I've
+// retained that behavior here, although there is no current requirement.
+@SuppressWarnings("unchecked")
+int countingRemoveAllByKeys(String index, Collection keys) {
 
 Review comment:
   "so better be a little bit more careful" - strongly agree.  I'll try 
removeAllByIndexValues (slight merging of our proposals) -- it's a little bit 
of a word salad, but nothing better really strikes me.  Thanks!





[GitHub] [spark] davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-16 Thread GitBox
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] 
Fix performance of ElementTrackingStore deletes when using InMemoryStore under 
high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284742856
 
 

 ##
 File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +132,144 @@ public void close() {
 return (Comparable) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static  KVStoreView emptyView() {
+return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map 
strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+private ConcurrentMap, InstanceList> data = new 
ConcurrentHashMap<>();
+
+@SuppressWarnings("unchecked")
+public  InstanceList get(Class type) {
+  return (InstanceList)data.get(type);
+}
+
+@SuppressWarnings("unchecked")
+public  void write(T value) throws Exception {
+  InstanceList list =
+(InstanceList) data.computeIfAbsent(value.getClass(), 
InstanceList::new);
+  list.put(value);
+}
+
+public void clear() {
+  data.clear();
+}
+  }
+
+  private static class InstanceList {
+
+private static class CountingRemoveIfForEach implements 
BiConsumer, T> {
+  ConcurrentMap, T> data;
+  Predicate filter;
+  int count = 0;
+
+  CountingRemoveIfForEach(
+  ConcurrentMap, T> data,
+  Predicate filter) {
+this.data = data;
+this.filter = filter;
+  }
+
+  public void accept(Comparable key, T value) {
+// To address https://bugs.openjdk.java.net/browse/JDK-8078645 which 
affects remove() on
+// all iterators of concurrent maps, and specifically makes 
countingRemoveIf difficult to
+// implement correctly against the values() iterator, we use forEach 
instead
+if (filter.test(value)) {
+  if (data.remove(key, value)) {
+count++;
+  }
+}
+  }
+}
 
 private final KVTypeInfo ti;
 private final KVTypeInfo.Accessor naturalKey;
-private final ConcurrentMap, Object> data;
-
-private int size;
+private final ConcurrentMap, T> data;
 
-private InstanceList(Class type) throws Exception {
-  this.ti = new KVTypeInfo(type);
+private InstanceList(Class klass) {
+  this.ti = new KVTypeInfo(klass);
   this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
   this.data = new ConcurrentHashMap<>();
-  this.size = 0;
 }
 
 KVTypeInfo.Accessor getIndexAccessor(String indexName) {
   return ti.getAccessor(indexName);
 }
 
-public Object get(Object key) {
+// Note: removeIf returns a boolean if any element has been removed.
+// While debugging this code, it was handy to have the count of elements
+// removed, rather than an indicator of whether something has been
+// removed, and a count is no more complicated than a boolean so I've
+// retained that behavior here, although there is no current requirement.
+@SuppressWarnings("unchecked")
+int countingRemoveAllByKeys(String index, Collection keys) {
 
 Review comment:
   Hmm, I suspect we have a terminology overload issue here.
   1) The read|write|view methods on KVStore refer to the things you read, 
write, and look at as "instance", "object", and "entities" respectively (and 
also "value" for write() as the name of the parameter).
   2) read() provides a "naturalKey" to access a specific instance, which is 
the value of the unique/primary/NATURAL_INDEX_NAME index
   3) KVStoreView refers to the values of an index as passed in first() and 
last() as values, not keys -- naturalKey or no
   4) InMemoryStore refers to the Comparable wrappers placed around the values 
of a particular index as a key.
   5) KVStore seems to say that keys are actually created per type written 
and are based on the type name (which itself is referred to as either klass or 
type)
   
   So, yes, I'm probably using terminology wrong, and I hereby declare myself 
confused :(
   
   With respect to klass vs type I had gone with klass in the removeAll___() as 
it was consistent with having 'klass' everywhere in the ElementTrackingStore, 
but please let me know if that should be changed!
   
   With respect to key, value, field-value-for-index, 
comparable-wrapper-around-field-value-for-index, I admit to not knowing what to 
call which things when.  One way to deal with this is to make the removeAll on 
the KVStoreView, where "value" is everywhere (I think) considered in the 
context of the type and index.  The upside is that we could naturally call such 
a method removeAll() as the class and index are owned by the View.  From an 
impl standpoint, it would make the definition of LevelDB's view quite a bit 

[GitHub] [spark] davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] 
Fix performance of ElementTrackingStore deletes when using InMemoryStore under 
high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284521016
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala
 ##
 @@ -46,7 +50,26 @@ import org.apache.spark.util.kvstore._
  */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
 
-  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
+val countDeferred = new AtomicInteger(0)
+
+def fireOnce(f: Seq[Trigger[_]] => Unit): Boolean = {
+  val shouldExecute = countDeferred.compareAndSet(0, 1)
+  if (shouldExecute) {
+doAsync {
+  countDeferred.set(0)
+  f(triggers)
+}
+  }
+  shouldExecute
 
 Review comment:
   I think there's a case where I needed the tri-value, but I'll take another 
peek.





[GitHub] [spark] davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] 
Fix performance of ElementTrackingStore deletes when using InMemoryStore under 
high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284520975
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
 ##
 @@ -1177,16 +1169,20 @@ private[spark] class AppStatusListener(
   }
 
   cleanupCachedQuantiles(key)
+  key
 }
 
+// Delete summaries in one pass, as deleting them for each stage is slow
+val totalSummariesDeleted = kvstore.removeAllByKeys(
 
 Review comment:
   excellent - yes, this used to be where I logged the count, thanks for 
catching!





[GitHub] [spark] davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

2019-05-15 Thread GitBox
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] 
Fix performance of ElementTrackingStore deletes when using InMemoryStore under 
high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284520604
 
 

 ##
 File path: 
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##
 @@ -126,64 +132,144 @@ public void close() {
 return (Comparable) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static  KVStoreView emptyView() {
+return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map 
strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+private ConcurrentMap, InstanceList> data = new 
ConcurrentHashMap<>();
+
+@SuppressWarnings("unchecked")
+public  InstanceList get(Class type) {
+  return (InstanceList)data.get(type);
+}
+
+@SuppressWarnings("unchecked")
+public  void write(T value) throws Exception {
+  InstanceList list =
+(InstanceList) data.computeIfAbsent(value.getClass(), 
InstanceList::new);
+  list.put(value);
+}
+
+public void clear() {
+  data.clear();
+}
+  }
+
+  private static class InstanceList {
+
+private static class CountingRemoveIfForEach implements 
BiConsumer, T> {
+  ConcurrentMap, T> data;
+  Predicate filter;
+  int count = 0;
+
+  CountingRemoveIfForEach(
+  ConcurrentMap, T> data,
+  Predicate filter) {
+this.data = data;
+this.filter = filter;
+  }
+
+  public void accept(Comparable key, T value) {
+// To address https://bugs.openjdk.java.net/browse/JDK-8078645 which 
affects remove() on
+// all iterators of concurrent maps, and specifically makes 
countingRemoveIf difficult to
+// implement correctly against the values() iterator, we use forEach 
instead
+if (filter.test(value)) {
+  if (data.remove(key, value)) {
+count++;
+  }
+}
+  }
+}
 
 private final KVTypeInfo ti;
 private final KVTypeInfo.Accessor naturalKey;
-private final ConcurrentMap, Object> data;
-
-private int size;
+private final ConcurrentMap, T> data;
 
-private InstanceList(Class type) throws Exception {
-  this.ti = new KVTypeInfo(type);
+private InstanceList(Class klass) {
+  this.ti = new KVTypeInfo(klass);
   this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
   this.data = new ConcurrentHashMap<>();
-  this.size = 0;
 }
 
 KVTypeInfo.Accessor getIndexAccessor(String indexName) {
   return ti.getAccessor(indexName);
 }
 
-public Object get(Object key) {
+// Note: removeIf returns a boolean if any element has been removed.
+// While debugging this code, it was handy to have the count of elements
+// removed, rather than an indicator of whether something has been
+// removed, and a count is no more complicated than a boolean so I've
+// retained that behavior here, although there is no current requirement.
+@SuppressWarnings("unchecked")
+int countingRemoveAllByKeys(String index, Collection keys) {
 
 Review comment:
   I'm removing values whose key, as indicated by "index", matches (one of) 
the passed keys.
   So, I'm matching keys, hence the ByKeys and the keyFromValue, which 
retrieves the key indicated by 'getter' from the passed value.
   BTW, I think I could add generic typing to Accessor and ensure that 
the getter and value objects match, but I don't think the K type would turn out 
to be terribly useful, as ultimately there's no match between the index (a 
String) and the key type.  Let me know if you think that's useful.
   
   Yes, I'll try to use the  in a number of the SuppressWarnings and see 
what happens.  It might be possible to get rid of a number of them, which would 
be fabulous.
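
   To make the "remove values whose index key matches one of the passed keys" 
behavior concrete, here is a minimal, self-contained Java sketch.  It is not 
the PR's implementation: the class CountingRemoveSketch, its put and size 
helpers, and the use of a plain Function in place of KVTypeInfo.Accessor are 
all assumptions made for the example.  It follows the forEach plus 
remove(key, value) approach described in the quoted code comment rather than 
removing through an iterator.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical sketch of a counting bulk delete; not code from the PR. */
final class CountingRemoveSketch<K, T> {
  // Values stored under their natural key, as in InstanceList.
  private final ConcurrentMap<K, T> data = new ConcurrentHashMap<>();

  void put(K naturalKey, T value) {
    data.put(naturalKey, value);
  }

  int size() {
    return data.size();
  }

  /**
   * Removes every value whose index key (extracted by keyFromValue) is one of
   * the given keys, and returns the number of entries actually removed.
   */
  <I> int countingRemoveAllByKeys(Function<T, I> keyFromValue, Collection<I> keys) {
    Set<I> wanted = new HashSet<>(keys); // one set lookup per stored value
    int[] count = {0};
    data.forEach((naturalKey, value) -> {
      // remove(key, value) only succeeds if the entry is still mapped to this
      // value, so an entry removed or replaced concurrently is not counted.
      if (wanted.contains(keyFromValue.apply(value)) && data.remove(naturalKey, value)) {
        count[0]++;
      }
    });
    return count[0];
  }

  public static void main(String[] args) {
    CountingRemoveSketch<String, String> store = new CountingRemoveSketch<>();
    store.put("s1", "stage1:summaryA");
    store.put("s2", "stage1:summaryB");
    store.put("s3", "stage2:summaryC");
    int removed = store.countingRemoveAllByKeys(
        v -> v.substring(0, v.indexOf(':')), Arrays.asList("stage1"));
    System.out.println("removed=" + removed + " remaining=" + store.size());
  }
}
```

   A single pass over the map with a set lookup per value is what makes 
deleting many keys at once cheaper than one scan per key.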

