[GitHub] [spark] davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r285792194

## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ##

@@ -248,16 +337,16 @@ private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
           diff = compare(e1, natural, natural.get(e2));
         }
         return diff;
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
+      } catch (ReflectiveOperationException e) {
+        throw new RuntimeException(e);
       }
     }

     private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
       try {
         return asKey(getter.get(e1)).compareTo(asKey(v2));

Review comment:
I have not attempted to apply the basic trick from getPredicate() (calling or not calling asKey() depending on whether the index requires it). I agree that there might be a win there, but applying it seems difficult and unlikely to yield clear code.

getPredicate() is doing two things: it converts a Collection into a Set, passing all the entries through asKey() as necessary, AND it removes the need to call getClass.isArray when the index doesn't require it.

There appear to be three places where this strategy would be useful: during copyElements when parent is defined, and during iteration when first and/or last are defined. I don't see a contract about when first or last can be changed relative to when an iteration is started or running, so I'm a little worried about modifying that code, and I'm not sure what the use cases are for parents.

But let's assume that first and last are actually fixed during iteration (it seems bad if they are not), and that I'm willing to make changes to the parent code blind. Presumably I'd want a set of compare() routines that take a Comparable as the second argument (the second argument being fixed locally to asKey(first|last|parent)), with two potential names: one that would call asKey and one that wouldn't. I'm blanking on ideas for names. Ideas?

What I'll do is create the "dumb" version of this compare call, and either we'll revert because we want to allow first and last to be modified during iteration, or we'll come up with some good names for dealing with the other half of the asKey calls. Or we'll decide that calling getClass.isArray for only one side of the compare isn't so bad.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
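The fixed-bound compare() idea from the comment above could look roughly like the following. This is only a sketch under assumptions: the class name FixedBoundCompare, the method names compareToKey and compareRawToKey, and the IntArrayKey wrapper are all hypothetical and do not come from the pull request, and asKey here is a simplified stand-in that only handles int[] arrays.

```java
import java.util.Arrays;

/** Hypothetical sketch; none of these names come from the pull request. */
final class FixedBoundCompare {

  /** Simplified stand-in for an array wrapper: makes an int[] usable as a Comparable key. */
  static final class IntArrayKey implements Comparable<Object> {
    final int[] values;

    IntArrayKey(int[] values) {
      this.values = values;
    }

    @Override
    public int compareTo(Object other) {
      return Arrays.compare(values, ((IntArrayKey) other).values);
    }
  }

  /** Simplified asKey: wraps int[] arrays, passes every other value through unchanged. */
  @SuppressWarnings("unchecked")
  static Comparable<Object> asKey(Object in) {
    if (in.getClass().isArray()) {
      return new IntArrayKey((int[]) in);
    }
    return (Comparable<Object>) in;
  }

  /** "Dumb" variant: converts the left side on every call; the bound is already a key. */
  static int compareToKey(Object v1, Comparable<Object> bound) {
    return asKey(v1).compareTo(bound);
  }

  /** Variant for an index known to hold no arrays: skips asKey (and isArray) entirely. */
  @SuppressWarnings("unchecked")
  static int compareRawToKey(Object v1, Comparable<Object> bound) {
    return ((Comparable<Object>) v1).compareTo(bound);
  }

  public static void main(String[] args) {
    // The bound (first, last, or parent) is converted once, outside the loop.
    Comparable<Object> last = asKey(7);
    for (int candidate : new int[] {5, 7, 9}) {
      System.out.println(candidate + " vs bound: " + compareToKey(candidate, last));
    }
  }
}
```

The sketch only pays off if the bound really is fixed for the lifetime of the iteration, which is exactly the contract question raised in the comment.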
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284815319

## File path: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ##

@@ -46,7 +50,26 @@ import org.apache.spark.util.kvstore._
  */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {

-  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
+    val countDeferred = new AtomicInteger(0)
+
+    def fireOnce(f: Seq[Trigger[_]] => Unit): Boolean = {
+      val shouldExecute = countDeferred.compareAndSet(0, 1)
+      if (shouldExecute) {
+        doAsync {
+          countDeferred.set(0)
+          f(triggers)
+        }
+      }
+      shouldExecute

Review comment:
Hmm, my memory is faulty; this isn't a tri-value. Sure, I'll inline it here.
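For readers following the fireOnce discussion, the latch in the quoted hunk can be restated in Java as below. This is a sketch, not the PR's code: the class name LatchedAction is hypothetical, and a plain java.util.concurrent.Executor stands in for the store's doAsync helper, whose behavior is assumed here.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical Java restatement of the LatchedTriggers idea from the quoted diff. */
final class LatchedAction {
  private final AtomicInteger countDeferred = new AtomicInteger(0);
  private final Executor executor; // assumed stand-in for doAsync

  LatchedAction(Executor executor) {
    this.executor = executor;
  }

  /**
   * Schedules the action unless one is already pending.
   * Returns true only for the caller that won the compare-and-set.
   */
  boolean fireOnce(Runnable action) {
    boolean shouldExecute = countDeferred.compareAndSet(0, 1);
    if (shouldExecute) {
      executor.execute(() -> {
        // Reset before running, so a write that arrives during the run can schedule again.
        countDeferred.set(0);
        action.run();
      });
    }
    return shouldExecute;
  }

  public static void main(String[] args) {
    LatchedAction latch = new LatchedAction(Runnable::run); // runs inline for the demo
    System.out.println("scheduled: " + latch.fireOnce(() -> System.out.println("cleanup ran")));
  }
}
```

Because the result is a plain boolean, callers only learn whether they scheduled the work, which matches the observation in the comment that no tri-value is involved.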
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284799718

## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ##

@@ -126,64 +132,144 @@ public void close() {
     return (Comparable) in;
   }

-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static KVStoreView emptyView() {
+    return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public InstanceList get(Class type) {
+      return (InstanceList)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void write(T value) throws Exception {
+      InstanceList list =
+        (InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList {
+
+    private static class CountingRemoveIfForEach implements BiConsumer, T> {
+      ConcurrentMap, T> data;
+      Predicate filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap, T> data,
+          Predicate filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+        // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+        // implement correctly against the values() iterator, we use forEach instead
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+    }

     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap, Object> data;
-
-    private int size;
+    private final ConcurrentMap, T> data;

-    private InstanceList(Class type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }

     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }

-    public Object get(Object key) {
+    // Note: removeIf returns a boolean if any element has been removed.
+    // While debugging this code, it was handy to have the count of elements
+    // removed, rather than an indicator of whether something has been
+    // removed, and a count is no more complicated than a boolean so I've
+    // retained that behavior here, although there is no current requirement.
+    @SuppressWarnings("unchecked")
+    int countingRemoveAllByKeys(String index, Collection keys) {

Review comment:
"so better be a little bit more careful": strongly agree. I'll try removeAllByIndexValues (a slight merging of our proposals). It's a bit of a word salad, but nothing better really strikes me. Thanks!
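The counting removal in the quoted hunk relies on ConcurrentMap.forEach together with the two-argument remove(key, value), rather than on iterator removal. A minimal generic sketch of that technique follows; the class name CountingRemove and the method signature are assumptions for illustration, not the code under review.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

/** Hypothetical sketch of a counting removeIf built on forEach. */
final class CountingRemove {

  /** Removes every entry whose value matches the filter and returns how many were removed. */
  static <K, V> int countingRemoveIf(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
    int[] count = {0};
    data.forEach((key, value) -> {
      // remove(key, value) succeeds only if the key is still mapped to this exact value,
      // so an entry replaced or removed by another thread is never counted.
      if (filter.test(value) && data.remove(key, value)) {
        count[0]++;
      }
    });
    return count[0];
  }

  public static void main(String[] args) {
    ConcurrentMap<String, Integer> data = new ConcurrentHashMap<>();
    for (int i = 1; i <= 5; i++) {
      data.put("k" + i, i);
    }
    int removed = countingRemoveIf(data, v -> v % 2 == 0);
    System.out.println("removed " + removed + ", remaining " + data.size());
  }
}
```

Returning a count costs nothing extra over a boolean here, which is the trade-off the code comment in the hunk describes.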
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284742856

## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ##

@@ -126,64 +132,144 @@ public void close() {
     return (Comparable) in;
   }

-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static KVStoreView emptyView() {
+    return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public InstanceList get(Class type) {
+      return (InstanceList)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void write(T value) throws Exception {
+      InstanceList list =
+        (InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList {
+
+    private static class CountingRemoveIfForEach implements BiConsumer, T> {
+      ConcurrentMap, T> data;
+      Predicate filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap, T> data,
+          Predicate filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+        // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+        // implement correctly against the values() iterator, we use forEach instead
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+    }

     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap, Object> data;
-
-    private int size;
+    private final ConcurrentMap, T> data;

-    private InstanceList(Class type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }

     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }

-    public Object get(Object key) {
+    // Note: removeIf returns a boolean if any element has been removed.
+    // While debugging this code, it was handy to have the count of elements
+    // removed, rather than an indicator of whether something has been
+    // removed, and a count is no more complicated than a boolean so I've
+    // retained that behavior here, although there is no current requirement.
+    @SuppressWarnings("unchecked")
+    int countingRemoveAllByKeys(String index, Collection keys) {

Review comment:
Hmm, I suspect we have a terminology overload issue here.

1) The read|write|view methods on KVStore refer to the things you read, write, and look at as "instance", "object", and "entities" respectively (and also "value", as the name of the parameter to write()).
2) read() takes a "naturalKey" to access a specific instance; this is the value of the unique/primary/NATURAL_INDEX_NAME index.
3) KVStoreView refers to the values of an index, as passed to first() and last(), as values, not keys, whether or not the index is the natural key.
4) InMemoryStore refers to the Comparable wrappers placed around the values of a particular index as keys.
5) KVStore seems to say that the key is actually created per type written and is based on the type name (which itself is referred to as either klass or type).

So, yes, I'm probably using the terminology wrong, and I hereby declare myself confused :(

With respect to klass vs type, I had gone with klass in the removeAll___() as it was consistent with having 'klass' everywhere in the ElementTrackingStore, but please let me know if that should be changed!

With respect to key, value, field-value-for-index, and comparable-wrapper-around-field-value-for-index, I admit to not knowing what to call which things when.

One way to deal with this is to put the removeAll on the KVStoreView, where "value" is everywhere (I think) considered in the context of the type and index. The upside is that we could naturally call such a method removeAll(), as the class and index are owned by the View. From an impl standpoint, it would make the definition of LevelDB's view quite a bit
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284521016

## File path: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ##

@@ -46,7 +50,26 @@ import org.apache.spark.util.kvstore._
  */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {

-  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
+    val countDeferred = new AtomicInteger(0)
+
+    def fireOnce(f: Seq[Trigger[_]] => Unit): Boolean = {
+      val shouldExecute = countDeferred.compareAndSet(0, 1)
+      if (shouldExecute) {
+        doAsync {
+          countDeferred.set(0)
+          f(triggers)
+        }
+      }
+      shouldExecute

Review comment:
I think there's a case where I needed the tri-value, but I'll take another peek.
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284520975

## File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ##

@@ -1177,16 +1169,20 @@ private[spark] class AppStatusListener(
       }

       cleanupCachedQuantiles(key)
+      key
     }

+    // Delete summaries in one pass, as deleting them for each stage is slow
+    val totalSummariesDeleted = kvstore.removeAllByKeys(

Review comment:
Excellent. Yes, this used to be where I logged the count. Thanks for catching that!
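The "one pass" in the quoted comment means collecting the index values to delete first and then scanning the stored entries a single time, instead of scanning once per stage. A rough sketch of that shape is below. The class name OnePassDelete and this signature are assumptions for illustration (removeAllByIndexValues is only the name floated elsewhere in this thread), and a plain ConcurrentMap stands in for the store.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical sketch: delete every entry whose index value is in a given collection. */
final class OnePassDelete {

  static <K, V, I> int removeAllByIndexValues(
      ConcurrentMap<K, V> data,
      Function<? super V, ? extends I> indexOf,
      Collection<? extends I> indexValues) {
    // Copy into a hash set once so each entry costs a single constant-time lookup.
    Set<I> wanted = new HashSet<>(indexValues);
    int[] removed = {0};
    data.forEach((key, value) -> {
      if (wanted.contains(indexOf.apply(value)) && data.remove(key, value)) {
        removed[0]++;
      }
    });
    return removed[0];
  }

  public static void main(String[] args) {
    // Keys name a summary; values are the stage id that summary belongs to (illustrative data).
    ConcurrentMap<String, Integer> stageOfSummary = new ConcurrentHashMap<>();
    stageOfSummary.put("summary-a", 1);
    stageOfSummary.put("summary-b", 2);
    stageOfSummary.put("summary-c", 3);
    int removed = removeAllByIndexValues(stageOfSummary, stage -> stage, Arrays.asList(1, 2));
    System.out.println("removed " + removed + ", remaining " + stageOfSummary.size());
  }
}
```

With n stored entries and s stages to clean up, this scans n entries once, where a per-stage delete would scan them s times.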
davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284520604

## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ##

@@ -126,64 +132,144 @@ public void close() {
     return (Comparable) in;
   }

-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static KVStoreView emptyView() {
+    return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public InstanceList get(Class type) {
+      return (InstanceList)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void write(T value) throws Exception {
+      InstanceList list =
+        (InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList {
+
+    private static class CountingRemoveIfForEach implements BiConsumer, T> {
+      ConcurrentMap, T> data;
+      Predicate filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap, T> data,
+          Predicate filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+        // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+        // implement correctly against the values() iterator, we use forEach instead
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+    }

     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap, Object> data;
-
-    private int size;
+    private final ConcurrentMap, T> data;

-    private InstanceList(Class type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }

     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }

-    public Object get(Object key) {
+    // Note: removeIf returns a boolean if any element has been removed.
+    // While debugging this code, it was handy to have the count of elements
+    // removed, rather than an indicator of whether something has been
+    // removed, and a count is no more complicated than a boolean so I've
+    // retained that behavior here, although there is no current requirement.
+    @SuppressWarnings("unchecked")
+    int countingRemoveAllByKeys(String index, Collection keys) {

Review comment:
I'm removing values whose key, as indicated by "index", matches (one of) the passed keys. So I'm matching keys, hence the ByKeys and the keyFromValue, which retrieves the key indicated by 'getter' from the passed value.

BTW, I think I could add generic typing to Accessor and ensure that the getter and value objects match, but I don't think the K type would turn out to be terribly useful, as ultimately there's no match between the index (a String) and the key type. Let me know if you think that's useful.

Yes, I'll try to use the in a number of the SuppressWarnings and see what happens. It might be possible to get rid of a number of them, which would be fabulous.