[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r286131820 ## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ## @@ -126,64 +134,149 @@ public void close() { return (Comparable) in; } - private static class InstanceList { + @SuppressWarnings("unchecked") + private static KVStoreView emptyView() { +return (InMemoryView) InMemoryView.EMPTY_VIEW; + } + + /** + * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a + * class of type T to an InstanceList of type T. + */ + private static class InMemoryLists { +private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>(); + +@SuppressWarnings("unchecked") +public InstanceList get(Class type) { + return (InstanceList) data.get(type); +} + +@SuppressWarnings("unchecked") +public void write(T value) throws Exception { + InstanceList list = +(InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new); + list.put(value); +} + +public void clear() { + data.clear(); +} + } + + private static class InstanceList { + +/** + * A BiConsumer to control multi-entity removal. We use this in a forEach rather than an + * iterator because there is a bug in jdk8 which affects remove() on all concurrent map + * iterators. https://bugs.openjdk.java.net/browse/JDK-8078645 + */ +private static class CountingRemoveIfForEach implements BiConsumer, T> { + private final ConcurrentMap, T> data; + private final Predicate filter; + + /** + * Keeps a count of the number of elements removed. 
This count is not currently surfaced + * to clients of KVStore as Java's generic removeAll() construct returns only a boolean, + * but I found it handy to have the count of elements removed while debugging; a count being + * no more complicated than a boolean, I've retained that behavior here, even though there + * is no current requirement. + */ + private int count = 0; + + CountingRemoveIfForEach( + ConcurrentMap, T> data, + Predicate filter) { +this.data = data; +this.filter = filter; + } + + public void accept(Comparable key, T value) { Review comment: `@Override` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
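For readers following the `@Override` remark, here is a minimal sketch of a counting `BiConsumer` with the annotation and `final` fields in place. It is not the PR's class: the name `CountingRemover`, the type parameters `K` and `V`, and the helper `removeEvens` are assumptions made for illustration, since the quoted diff lost its generic arguments.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hypothetical stand-in for the PR's CountingRemoveIfForEach.
final class CountingRemover<K, V> implements BiConsumer<K, V> {
  private final ConcurrentMap<K, V> data;
  private final Predicate<? super V> filter;
  private int count = 0;

  CountingRemover(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
    this.data = data;
    this.filter = filter;
  }

  // @Override lets javac reject the method if its signature ever drifts
  // away from BiConsumer.accept.
  @Override
  public void accept(K key, V value) {
    // remove(key, value) only removes the entry if it still maps to `value`.
    if (filter.test(value) && data.remove(key, value)) {
      count++;
    }
  }

  int count() {
    return count;
  }

  // Hypothetical helper: fills a map with 0..n-1 and removes the even values
  // through forEach rather than through an iterator.
  static int removeEvens(int n) {
    ConcurrentMap<Integer, Integer> map = new ConcurrentHashMap<>();
    for (int i = 0; i < n; i++) {
      map.put(i, i);
    }
    CountingRemover<Integer, Integer> remover = new CountingRemover<>(map, v -> v % 2 == 0);
    map.forEach(remover);
    return remover.count();
  }
}
```

The counter is a plain `int`, so the sketch assumes a single thread drives the `forEach` call.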
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r286131574 ## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ## @@ -126,64 +134,149 @@ public void close() { return (Comparable) in; } - private static class InstanceList { + @SuppressWarnings("unchecked") + private static KVStoreView emptyView() { +return (InMemoryView) InMemoryView.EMPTY_VIEW; + } + + /** + * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a + * class of type T to an InstanceList of type T. + */ + private static class InMemoryLists { +private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>(); Review comment: final This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r286133405 ## File path: common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java ## @@ -132,6 +133,51 @@ public void testArrayIndices() throws Exception { assertEquals(o, store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next()); } + @Test + public void testRemoveAll() throws Exception { +KVStore store = new InMemoryStore(); + +for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { +ArrayKeyIndexType o = new ArrayKeyIndexType(); +o.key = new int[] { i, j, 0 }; +o.id = new String[] { "things" }; +store.write(o); + +o = new ArrayKeyIndexType(); +o.key = new int[] { i, j, 1 }; +o.id = new String[] { "more things" }; +store.write(o); + } +} + +ArrayKeyIndexType o = new ArrayKeyIndexType(); +o.key = new int[] { 2, 2, 2 }; +o.id = new String[] { "things" }; +store.write(o); + +assertEquals(9, store.count(ArrayKeyIndexType.class)); + + +store.removeAllByIndexValues( + ArrayKeyIndexType.class, + KVIndex.NATURAL_INDEX_NAME, + ImmutableSet.of(new int[] {0, 0, 0}, new int[] { 2, 2, 2 })); +assertEquals(7, store.count(ArrayKeyIndexType.class)); + +store.removeAllByIndexValues( + ArrayKeyIndexType.class, + "id", + ImmutableSet.of(new String [] { "things" })); +assertEquals(4, store.count(ArrayKeyIndexType.class)); + +store.removeAllByIndexValues( + ArrayKeyIndexType.class, + "id", +ImmutableSet.of(new String [] { "more things" })); Review comment: indentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285751231 ## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ## @@ -248,16 +337,16 @@ private int compare(T e1, T e2, KVTypeInfo.Accessor getter) { diff = compare(e1, natural, natural.get(e2)); } return diff; - } catch (Exception e) { -throw Throwables.propagate(e); + } catch (ReflectiveOperationException e) { +throw new RuntimeException(e); } } private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) { try { return asKey(getter.get(e1)).compareTo(asKey(v2)); Review comment: Did you try to use the `getPredicate` stuff in this class too? Seems like it could save some computation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285756405 ## File path: core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala ## @@ -17,14 +17,66 @@ package org.apache.spark.status +import java.util.concurrent.atomic.AtomicInteger + import org.mockito.Mockito._ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config.Status._ +import org.apache.spark.status.ElementTrackingStore._ import org.apache.spark.util.kvstore._ class ElementTrackingStoreSuite extends SparkFunSuite { + test("asynchronous tracking single-fire") { +val store = mock(classOf[KVStore]) +val tracking = new ElementTrackingStore(store, new SparkConf() + .set(ASYNC_TRACKING_ENABLED, true)) + +val waiter = new Object() +var done = false +var type1 = new AtomicInteger(0) +var queued0: WriteQueueResult = null +var queued1: WriteQueueResult = null +var queued2: WriteQueueResult = null +var queued3: WriteQueueResult = null + + +tracking.addTrigger(classOf[Type1], 1) { count => + val count = type1.getAndIncrement() + + count match { +case 0 => + // while in the asynchronous thread, attempt to increment twice. The first should + // succeed, the second should be skipped + queued1 = tracking.write(new Type1, checkTriggers = true) + queued2 = tracking.write(new Type1, checkTriggers = true) +case 1 => + // Verify that once we've started deliver again, that we can enqueue another + queued3 = tracking.write(new Type1, checkTriggers = true) +case 2 => + waiter.synchronized { +done = true +waiter.notifyAll() + } + } +} + +when(store.count(classOf[Type1])).thenReturn(2L) +queued0 = tracking.write(new Type1, checkTriggers = true) +waiter.synchronized { + if (!done) { +waiter.wait() Review comment: Hmmm... maybe add a timeout here (or use `eventually`)? 
The issue is that if there's a bug in your code, this may not actually return. (It's passing, but if a bug is added later then this would be an annoying thing to debug.)
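One way to bound the wait, sketched in Java rather than in the suite's Scala: replace the bare `wait()` with a `CountDownLatch` and a timed `await`. The class and method names below are hypothetical, and this is a stand-in for the idea, not ScalaTest's `eventually`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a bounded wait on an asynchronous callback.
final class BoundedWaitExample {
  // Starts a worker that signals completion after workMillis, then waits at
  // most timeoutMillis for that signal. Returns false on timeout instead of
  // blocking forever.
  static boolean runAndAwait(long workMillis, long timeoutMillis) throws InterruptedException {
    CountDownLatch done = new CountDownLatch(1);
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(workMillis);
        done.countDown();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.setDaemon(true); // a stuck worker must not keep the JVM alive
    worker.start();
    return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
  }
}
```

A test would then assert on the returned boolean, so a regression shows up as a failed assertion rather than a hung build.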
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285755199 ## File path: core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala ## @@ -17,14 +17,66 @@ package org.apache.spark.status +import java.util.concurrent.atomic.AtomicInteger + import org.mockito.Mockito._ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config.Status._ +import org.apache.spark.status.ElementTrackingStore._ import org.apache.spark.util.kvstore._ class ElementTrackingStoreSuite extends SparkFunSuite { + test("asynchronous tracking single-fire") { +val store = mock(classOf[KVStore]) +val tracking = new ElementTrackingStore(store, new SparkConf() + .set(ASYNC_TRACKING_ENABLED, true)) + +val waiter = new Object() +var done = false +var type1 = new AtomicInteger(0) +var queued0: WriteQueueResult = null +var queued1: WriteQueueResult = null +var queued2: WriteQueueResult = null +var queued3: WriteQueueResult = null + Review comment: remove one empty line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285753585 ## File path: common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java ## @@ -132,6 +133,48 @@ public void testArrayIndices() throws Exception { assertEquals(o, store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next()); } + @Test + public void testRemoveAll() throws Exception { +KVStore store = new InMemoryStore(); + +for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { +ArrayKeyIndexType o = new ArrayKeyIndexType(); +o.key = new int[] { i, j, 0 }; +o.id = new String[] { "things" }; +store.write(o); + +o = new ArrayKeyIndexType(); +o.key = new int[] { i, j, 1 }; +o.id = new String[] { "more things" }; +store.write(o); + } +} + +ArrayKeyIndexType o = new ArrayKeyIndexType(); +o.key = new int[] { 2, 2, 2 }; +o.id = new String[] { "things" }; +store.write(o); + +assertEquals(9, store.count(ArrayKeyIndexType.class)); + +HashSet set = new HashSet(); Review comment: Add generic type. Or you could use `ImmutableSet` and not need this local variable (and avoid re-using it for different types and avoid the warnings from javac at the same time). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
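To illustrate the suggestion, a small sketch of a typed, unmodifiable set built in one expression, so no raw `HashSet` local is reused across element types. It uses only the JDK as a stand-in for Guava's `ImmutableSet.of`, and `List<Integer>` stands in for the test's `int[]` keys; both substitutions, and the class name, are assumptions made to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; the suite itself would pass the set straight
// into removeAllByIndexValues.
final class TypedSetExample {
  // Raw form flagged in the review, for comparison:
  //   HashSet set = new HashSet();   // javac reports unchecked operations
  static Set<List<Integer>> keysToRemove() {
    Set<List<Integer>> keys = new HashSet<>(Arrays.asList(
        Arrays.asList(0, 0, 0),
        Arrays.asList(2, 2, 2)));
    return Collections.unmodifiableSet(keys);
  }
}
```

With the element type declared, javac checks every insertion and the unchecked warnings go away.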
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285754218 ## File path: common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java ## @@ -198,6 +199,46 @@ public void testUpdate() throws Exception { assertEquals(0, db.count(t.getClass(), "name", "name")); } + @Test + public void testRemoveAll() throws Exception { +for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { +ArrayKeyIndexType o = new ArrayKeyIndexType(); +o.key = new int[] { i, j, 0 }; +o.id = new String[] { "things" }; +db.write(o); + +o = new ArrayKeyIndexType(); +o.key = new int[] { i, j, 1 }; +o.id = new String[] { "more things" }; +db.write(o); + } +} + +ArrayKeyIndexType o = new ArrayKeyIndexType(); +o.key = new int[] { 2, 2, 2 }; +o.id = new String[] { "things" }; +db.write(o); + +assertEquals(9, db.count(ArrayKeyIndexType.class)); + +HashSet set = new HashSet(); Review comment: Same thing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285313680 ## File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ## @@ -1142,20 +1144,10 @@ private[spark] class AppStatusListener( s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING } -stages.foreach { s => +val stageIndexValues = stages.map { s => Review comment: `s/stageIndexValues/stageIds` (or `stageKeys`, since here they're actually the primary keys of the stages being removed...) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285313162 ## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ## @@ -126,64 +134,149 @@ public void close() { return (Comparable) in; } - private static class InstanceList { + @SuppressWarnings("unchecked") + private static KVStoreView emptyView() { +return (InMemoryView) InMemoryView.EMPTY_VIEW; + } + + /** + * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a + * class of type T to an InstanceList of type T. + */ + private static class InMemoryLists { +private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>(); + +@SuppressWarnings("unchecked") +public InstanceList get(Class type) { + return (InstanceList) data.get(type); +} + +@SuppressWarnings("unchecked") +public void write(T value) throws Exception { + InstanceList list = +(InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new); + list.put(value); +} + +public void clear() { + data.clear(); +} + } + + private static class InstanceList { + +/** + * A BiConsumer to control multi-entity removal. We use this in a forEach rather than an + * iterator because there is a bug in jdk8 which affects remove() on all concurrent map + * iterators. https://bugs.openjdk.java.net/browse/JDK-8078645 + */ +private static class CountingRemoveIfForEach implements BiConsumer, T> { + private final ConcurrentMap, T> data; + private final Predicate filter; + + /** + * Keeps a count of the number of elements removed. 
This count is not currently surfaced + * to clients of KVStore as Java's generic removeAll() construct returns only a boolean, + * but I found it handy to have the count of elements removed while debugging; a count being + * no more complicated than a boolean, I've retained that behavior here, even though there + * is no current requirement. + */ + private int count = 0; + + CountingRemoveIfForEach( + ConcurrentMap, T> data, + Predicate filter) { +this.data = data; +this.filter = filter; + } + + public void accept(Comparable key, T value) { +if (filter.test(value)) { + if (data.remove(key, value)) { +count++; + } +} + } + + public int count() { return count; } +} private final KVTypeInfo ti; private final KVTypeInfo.Accessor naturalKey; -private final ConcurrentMap, Object> data; +private final ConcurrentMap, T> data; -private int size; - -private InstanceList(Class type) throws Exception { - this.ti = new KVTypeInfo(type); +private InstanceList(Class klass) { + this.ti = new KVTypeInfo(klass); this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME); this.data = new ConcurrentHashMap<>(); - this.size = 0; } KVTypeInfo.Accessor getIndexAccessor(String indexName) { return ti.getAccessor(indexName); } -public Object get(Object key) { +int countingRemoveAllByIndexValues(String index, Collection indexValues) { + Predicate filter = getPredicate(ti.getAccessor(index), indexValues); + CountingRemoveIfForEach callback = new CountingRemoveIfForEach<>(data, filter); + + data.forEach(callback); + return callback.count(); +} + +public T get(Object key) { return data.get(asKey(key)); } -public void put(Object value) throws Exception { - Preconditions.checkArgument(ti.type().equals(value.getClass()), -"Unexpected type: %s", value.getClass()); - if (data.put(asKey(naturalKey.get(value)), value) == null) { -size++; - } +public void put(T value) throws Exception { + data.put(asKey(naturalKey.get(value)), value); } public void delete(Object key) { - if (data.remove(asKey(key)) 
!= null) { -size--; - } + data.remove(asKey(key)); } public int size() { - return size; + return data.size(); } -@SuppressWarnings("unchecked") -public InMemoryView view(Class type) { - Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type); - Collection all = (Collection) data.values(); - return new InMemoryView<>(type, all, ti); +public InMemoryView view() { + return new InMemoryView<>(data.values(), ti); +} + +private static Predicate getPredicate( +KVTypeInfo.Accessor getter, +Collection keys) { Review comment: s/keys/values
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285313350 ## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java ## @@ -126,4 +127,9 @@ */ long count(Class type, String index, Object indexedValue) throws Exception; + /** + * A cheaper way to remove multiple items from the KVStore + */ + boolean removeAllByIndexValues(Class klass, String index, Collection indexValues) + throws Exception; Review comment: nit: indented too much (2 or 4 spaces is fine here) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285313778 ## File path: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ## @@ -46,7 +50,28 @@ import org.apache.spark.util.kvstore._ */ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { - private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private class LatchedTriggers(val triggers: Seq[Trigger[_]]) { +private val pending = new AtomicBoolean(false) + +def fireOnce(f: Seq[Trigger[_]] => Unit): WriteQueueResult = { + val shouldEnqueue = pending.compareAndSet(false, true) Review comment: Actually don't need this variable now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
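The point about the unneeded variable can be shown with a Java sketch in which the result of `compareAndSet` is used directly as the condition. This is not the Scala `LatchedTriggers` class from the PR: the class name, the synchronous `action.run()`, and the `reset` method are assumptions, and the real code hands the work to an asynchronous queue.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical single-fire latch.
final class LatchedTrigger {
  private final AtomicBoolean pending = new AtomicBoolean(false);

  // Runs the action only if no earlier firing is still pending.
  // Returns true if it ran, false if it was skipped.
  boolean fireOnce(Runnable action) {
    // No intermediate `shouldEnqueue` local: the CAS result is the condition.
    if (pending.compareAndSet(false, true)) {
      action.run();
      return true;
    }
    return false;
  }

  // Assumed to be called once the pending work has completed.
  void reset() {
    pending.set(false);
  }
}
```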
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r285313219 ## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ## @@ -126,64 +134,149 @@ public void close() { return (Comparable) in; } - private static class InstanceList { + @SuppressWarnings("unchecked") + private static KVStoreView emptyView() { +return (InMemoryView) InMemoryView.EMPTY_VIEW; + } + + /** + * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a + * class of type T to an InstanceList of type T. + */ + private static class InMemoryLists { +private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>(); + +@SuppressWarnings("unchecked") +public InstanceList get(Class type) { + return (InstanceList) data.get(type); +} + +@SuppressWarnings("unchecked") +public void write(T value) throws Exception { + InstanceList list = +(InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new); + list.put(value); +} + +public void clear() { + data.clear(); +} + } + + private static class InstanceList { + +/** + * A BiConsumer to control multi-entity removal. We use this in a forEach rather than an + * iterator because there is a bug in jdk8 which affects remove() on all concurrent map + * iterators. https://bugs.openjdk.java.net/browse/JDK-8078645 + */ +private static class CountingRemoveIfForEach implements BiConsumer, T> { + private final ConcurrentMap, T> data; + private final Predicate filter; + + /** + * Keeps a count of the number of elements removed. 
This count is not currently surfaced + * to clients of KVStore as Java's generic removeAll() construct returns only a boolean, + * but I found it handy to have the count of elements removed while debugging; a count being + * no more complicated than a boolean, I've retained that behavior here, even though there + * is no current requirement. + */ + private int count = 0; + + CountingRemoveIfForEach( + ConcurrentMap, T> data, + Predicate filter) { +this.data = data; +this.filter = filter; + } + + public void accept(Comparable key, T value) { +if (filter.test(value)) { + if (data.remove(key, value)) { +count++; + } +} + } + + public int count() { return count; } +} private final KVTypeInfo ti; private final KVTypeInfo.Accessor naturalKey; -private final ConcurrentMap, Object> data; +private final ConcurrentMap, T> data; -private int size; - -private InstanceList(Class type) throws Exception { - this.ti = new KVTypeInfo(type); +private InstanceList(Class klass) { + this.ti = new KVTypeInfo(klass); this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME); this.data = new ConcurrentHashMap<>(); - this.size = 0; } KVTypeInfo.Accessor getIndexAccessor(String indexName) { return ti.getAccessor(indexName); } -public Object get(Object key) { +int countingRemoveAllByIndexValues(String index, Collection indexValues) { + Predicate filter = getPredicate(ti.getAccessor(index), indexValues); + CountingRemoveIfForEach callback = new CountingRemoveIfForEach<>(data, filter); + + data.forEach(callback); + return callback.count(); +} + +public T get(Object key) { return data.get(asKey(key)); } -public void put(Object value) throws Exception { - Preconditions.checkArgument(ti.type().equals(value.getClass()), -"Unexpected type: %s", value.getClass()); - if (data.put(asKey(naturalKey.get(value)), value) == null) { -size++; - } +public void put(T value) throws Exception { + data.put(asKey(naturalKey.get(value)), value); } public void delete(Object key) { - if (data.remove(asKey(key)) 
!= null) { -size--; - } + data.remove(asKey(key)); } public int size() { - return size; + return data.size(); } -@SuppressWarnings("unchecked") -public InMemoryView view(Class type) { - Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type); - Collection all = (Collection) data.values(); - return new InMemoryView<>(type, all, ti); +public InMemoryView view() { + return new InMemoryView<>(data.values(), ti); +} + +private static Predicate getPredicate( +KVTypeInfo.Accessor getter, +Collection keys) { + if (Comparable.class.isAssignableFrom(getter.getType())) { +HashSet set = new HashSet<>(keys); + +return (value)
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads URL: https://github.com/apache/spark/pull/24616#discussion_r284797365 ## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ## @@ -126,64 +132,144 @@ public void close() { return (Comparable) in; } - private static class InstanceList { + @SuppressWarnings("unchecked") + private static KVStoreView emptyView() { +return (InMemoryView) InMemoryView.EMPTY_VIEW; + } + + /** + * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a + * class of type T to an InstanceList of type T. + */ + private static class InMemoryLists { +private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>(); + +@SuppressWarnings("unchecked") +public InstanceList get(Class type) { + return (InstanceList)data.get(type); +} + +@SuppressWarnings("unchecked") +public void write(T value) throws Exception { + InstanceList list = +(InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new); + list.put(value); +} + +public void clear() { + data.clear(); +} + } + + private static class InstanceList { + +private static class CountingRemoveIfForEach implements BiConsumer, T> { + ConcurrentMap, T> data; + Predicate filter; + int count = 0; + + CountingRemoveIfForEach( + ConcurrentMap, T> data, + Predicate filter) { +this.data = data; +this.filter = filter; + } + + public void accept(Comparable key, T value) { +// To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on +// all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to +// implement correctly against the values() iterator, we use forEach instead +if (filter.test(value)) { + if (data.remove(key, value)) { +count++; + } +} + } +} private final KVTypeInfo ti; private final KVTypeInfo.Accessor naturalKey; -private final ConcurrentMap, 
Object> data; - -private int size; +private final ConcurrentMap, T> data; -private InstanceList(Class type) throws Exception { - this.ti = new KVTypeInfo(type); +private InstanceList(Class klass) { + this.ti = new KVTypeInfo(klass); this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME); this.data = new ConcurrentHashMap<>(); - this.size = 0; } KVTypeInfo.Accessor getIndexAccessor(String indexName) { return ti.getAccessor(indexName); } -public Object get(Object key) { +// Note: removeIf returns a boolean if any element has been removed. +// While debugging this code, it was handy to have the count of elements +// removed, rather than an indicator of whether something has been +// removed, and a count is no more complicated than a boolean so I've +// retained that behavior here, although there is no current requirement. +@SuppressWarnings("unchecked") +int countingRemoveAllByKeys(String index, Collection keys) { Review comment: I very much believe that there might be very confusing terminology in this code. I went back and forth on implementation and interfaces a ton of times before reaching something I was happy with, and by that time I didn't really bother with the internal naming of things so much. But here we're talking about a new method in a "public" interface (not this particular line, but the new method in `KVStore`), so better be a little bit more careful. And IMO index values are not keys, so "removeByKeys" is a little weird. Maybe "removeByIndexValues" is clearer. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
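The distinction between keys and index values may be easier to see in a toy model: each entry has one natural key, while several entries can share the same value for a secondary index, so removing by index value can delete many entries at once. The sketch below uses a plain, non-concurrent `Map` and a hypothetical helper; it is not the KVStore implementation and ignores the concurrency concerns discussed in the PR.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model: map keys play the role of natural keys, and indexOf
// extracts the secondary index value from each stored entry.
final class IndexValueRemovalSketch {
  // Removes every entry whose index value is in indexValues and returns how
  // many entries were removed.
  static <K, V, I> int removeAllByIndexValues(
      Map<K, V> data,
      Function<? super V, ? extends I> indexOf,
      Collection<? extends I> indexValues) {
    Set<I> wanted = new HashSet<>(indexValues);
    int before = data.size();
    data.values().removeIf(v -> wanted.contains(indexOf.apply(v)));
    return before - data.size();
  }
}
```

For example, with entries keyed 1, 2 and 3 whose index values are "things", "things" and "more things", removing by the single index value "things" deletes two entries, which is why "keys" is a misleading name for the argument.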
[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284493151

## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ##

@@ -126,64 +132,144 @@ public void close() {
     return (Comparable) in;
   }
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static KVStoreView emptyView() {
+    return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public InstanceList get(Class type) {
+      return (InstanceList)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void write(T value) throws Exception {
+      InstanceList list =
+        (InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList {
+
+    private static class CountingRemoveIfForEach implements BiConsumer, T> {
+      ConcurrentMap, T> data;
+      Predicate filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap, T> data,
+          Predicate filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+        // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+        // implement correctly against the values() iterator, we use forEach instead
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+    }
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap, Object> data;
-
-    private int size;
+    private final ConcurrentMap, T> data;
-    private InstanceList(Class type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }
     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }
-    public Object get(Object key) {
+    // Note: removeIf returns a boolean if any element has been removed.
+    // While debugging this code, it was handy to have the count of elements
+    // removed, rather than an indicator of whether something has been
+    // removed, and a count is no more complicated than a boolean so I've
+    // retained that behavior here, although there is no current requirement.
+    @SuppressWarnings("unchecked")
+    int countingRemoveAllByKeys(String index, Collection keys) {
+      Predicate filter = getPredicate(ti.getAccessor(index), keys);
+      CountingRemoveIfForEach callback = new CountingRemoveIfForEach<>(data, filter);
+
+      data.forEach(callback);
+      return callback.count;
+    }
+
+    public T get(Object key) {
       return data.get(asKey(key));
     }
-    public void put(Object value) throws Exception {
-      Preconditions.checkArgument(ti.type().equals(value.getClass()),
-        "Unexpected type: %s", value.getClass());
-      if (data.put(asKey(naturalKey.get(value)), value) == null) {
-        size++;
-      }
+    public void put(T value) throws Exception {
+      data.put(asKey(naturalKey.get(value)), value);
     }
     public void delete(Object key) {
-      if (data.remove(asKey(key)) != null) {
-        size--;
-      }
+      data.remove(asKey(key));
     }
     public int size() {
-      return size;
+      return data.size();
+    }
+
+    @SuppressWarnings("unchecked")
+    public InMemoryView view() {
+      return new InMemoryView<>(data.values(), ti);
     }
     @SuppressWarnings("unchecked")
-    public InMemoryView view(Class type) {
-      Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
-      Collection all = (Collection) data.values();
-      return new InMemoryView<>(type, all, ti);
+    private static Predicate getPredicate(
+        KVTypeInfo.Accessor getter,
+        Collection keys) {
+      if (Comparable.class.isAssignableFrom(getter.getType())) {
+        HashSet set = new HashSet(keys);
+
+        return (value) -> set.contains(keyFromValue(getter, value));
+      } else {
+        HashSet set = new
[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284491196

## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ##

@@ -126,64 +132,144 @@ public void close() {
     return (Comparable) in;
   }
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static KVStoreView emptyView() {
+    return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public InstanceList get(Class type) {
+      return (InstanceList)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void write(T value) throws Exception {
+      InstanceList list =
+        (InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList {
+
+    private static class CountingRemoveIfForEach implements BiConsumer, T> {
+      ConcurrentMap, T> data;
+      Predicate filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap, T> data,
+          Predicate filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on

Review comment:
This comment sounds like it belongs at this class's declaration, not inside this method?
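For illustration, here is a minimal, self-contained sketch of the forEach-based counting removal that the diff describes, with the JDK-8078645 note placed on the class declaration as the review comment asks. The outer class `CountingRemoveSketch`, the static helper `countingRemoveIf`, and the type parameters `K`/`V` are assumptions made for this example; the generic signatures in the PR itself are not recoverable from the flattened diff above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

public class CountingRemoveSketch {

  /**
   * Removal callback meant to be passed to ConcurrentMap.forEach. It goes through forEach
   * rather than Iterator.remove() because of https://bugs.openjdk.java.net/browse/JDK-8078645.
   */
  static final class CountingRemoveIfForEach<K, V> implements BiConsumer<K, V> {
    private final ConcurrentMap<K, V> data;
    private final Predicate<? super V> filter;
    private int count = 0;

    CountingRemoveIfForEach(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
      this.data = data;
      this.filter = filter;
    }

    @Override
    public void accept(K key, V value) {
      // remove(key, value) only succeeds if the key is still mapped to this exact value,
      // so an entry replaced concurrently is neither removed nor counted.
      if (filter.test(value) && data.remove(key, value)) {
        count++;
      }
    }

    int count() {
      return count;
    }
  }

  /** Hypothetical helper: removes all values matching the filter and returns how many. */
  static <K, V> int countingRemoveIf(ConcurrentMap<K, V> data, Predicate<? super V> filter) {
    CountingRemoveIfForEach<K, V> callback = new CountingRemoveIfForEach<>(data, filter);
    data.forEach(callback);
    return callback.count();
  }

  public static void main(String[] args) {
    ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
    map.put("a", 1);
    map.put("b", 2);
    map.put("c", 3);
    map.put("d", 4);

    int removed = countingRemoveIf(map, v -> v % 2 == 0);
    System.out.println(removed + " removed, " + map.size() + " left"); // 2 removed, 2 left
  }
}
```

The counter is a plain `int` because the callback is driven by a single `forEach` call on one thread; a shared callback would need something like an `AtomicInteger` instead.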
[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284490875

## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ##

@@ -126,64 +132,144 @@ public void close() {
     return (Comparable) in;
   }
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static KVStoreView emptyView() {
+    return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public InstanceList get(Class type) {
+      return (InstanceList)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void write(T value) throws Exception {
+      InstanceList list =
+        (InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList {
+
+    private static class CountingRemoveIfForEach implements BiConsumer, T> {
+      ConcurrentMap, T> data;

Review comment:
nit: private, final where it makes sense
[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284494996

## File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ##

@@ -1177,16 +1169,20 @@ private[spark] class AppStatusListener(
       }
       cleanupCachedQuantiles(key)
+      key
     }
+    // Delete summaries in one pass, as deleting them for each stage is slow
+    val totalSummariesDeleted = kvstore.removeAllByKeys(

Review comment:
Unused variable.
[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284494865

## File path: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ##

@@ -46,7 +50,26 @@ import org.apache.spark.util.kvstore._
  */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
-  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
+    val countDeferred = new AtomicInteger(0)
+
+    def fireOnce(f: Seq[Trigger[_]] => Unit): Boolean = {
+      val shouldExecute = countDeferred.compareAndSet(0, 1)
+      if (shouldExecute) {
+        doAsync {
+          countDeferred.set(0)
+          f(triggers)
+        }
+      }
+      shouldExecute

Review comment:
You could avoid the `WriteQueueResult` object with an inlined `if...else` here.
[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284494214

## File path: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ##

@@ -46,7 +50,26 @@ import org.apache.spark.util.kvstore._
  */
 private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
-  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private class LatchedTriggers(val triggers: Seq[Trigger[_]]) {
+    val countDeferred = new AtomicInteger(0)

Review comment:
You're kinda using this as a boolean, so `AtomicBoolean`? Also, `private`, and the variable name is a little cryptic (maybe just call it `busy` or something?).
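As a rough sketch of the suggestion above, the latch could look like the following when written with a private `AtomicBoolean` named `busy`. This is a Java transliteration for illustration only, not the Scala code from the PR: the class names `LatchSketch` and `Latch` are hypothetical, and a plain `java.util.concurrent.Executor` stands in for the store's `doAsync` helper, whose implementation is not shown in the diff.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchSketch {

  /** Lets at most one deferred action be pending at a time. */
  static final class Latch {
    private final AtomicBoolean busy = new AtomicBoolean(false);
    private final Executor executor; // stand-in for doAsync (assumption)

    Latch(Executor executor) {
      this.executor = executor;
    }

    /** Schedules the action unless one is already pending; returns whether it was scheduled. */
    boolean fireOnce(Runnable action) {
      if (busy.compareAndSet(false, true)) {
        executor.execute(() -> {
          // Reset before running, mirroring the order in the diff, so that requests
          // arriving while the action runs can schedule a follow-up.
          busy.set(false);
          action.run();
        });
        return true;
      } else {
        return false;
      }
    }
  }

  public static void main(String[] args) {
    // A queue-backed executor that only runs tasks when polled keeps the demo deterministic.
    Queue<Runnable> pending = new ArrayDeque<>();
    Latch latch = new Latch(pending::add);

    boolean first = latch.fireOnce(() -> System.out.println("cleanup ran"));
    boolean second = latch.fireOnce(() -> System.out.println("never scheduled"));
    pending.poll().run(); // runs the first action, prints "cleanup ran"
    boolean third = latch.fireOnce(() -> {});

    System.out.println(first + " " + second + " " + third); // true false true
  }
}
```

`compareAndSet(false, true)` carries the same meaning as the original `compareAndSet(0, 1)` on an `AtomicInteger`, but the type now states that only two states exist.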
[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284492184

## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ##

@@ -126,64 +132,144 @@ public void close() {
     return (Comparable) in;
   }
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static KVStoreView emptyView() {
+    return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public InstanceList get(Class type) {
+      return (InstanceList)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void write(T value) throws Exception {
+      InstanceList list =
+        (InstanceList) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList {
+
+    private static class CountingRemoveIfForEach implements BiConsumer, T> {
+      ConcurrentMap, T> data;
+      Predicate filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap, T> data,
+          Predicate filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+        // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+        // implement correctly against the values() iterator, we use forEach instead
+        if (filter.test(value)) {
+          if (data.remove(key, value)) {
+            count++;
+          }
+        }
+      }
+    }
     private final KVTypeInfo ti;
     private final KVTypeInfo.Accessor naturalKey;
-    private final ConcurrentMap, Object> data;
-
-    private int size;
+    private final ConcurrentMap, T> data;
-    private InstanceList(Class type) throws Exception {
-      this.ti = new KVTypeInfo(type);
+    private InstanceList(Class klass) {
+      this.ti = new KVTypeInfo(klass);
       this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
       this.data = new ConcurrentHashMap<>();
-      this.size = 0;
     }
     KVTypeInfo.Accessor getIndexAccessor(String indexName) {
       return ti.getAccessor(indexName);
     }
-    public Object get(Object key) {
+    // Note: removeIf returns a boolean if any element has been removed.
+    // While debugging this code, it was handy to have the count of elements
+    // removed, rather than an indicator of whether something has been
+    // removed, and a count is no more complicated than a boolean so I've
+    // retained that behavior here, although there is no current requirement.
+    @SuppressWarnings("unchecked")
+    int countingRemoveAllByKeys(String index, Collection keys) {

Review comment:
IIUC you're removing all entries whose value corresponding to the given index match the given `keys` list. So it sounds to me like the name here (and in related code) should use `ByValues` instead of `ByKeys`?

Also, you have a raw type in the argument list, which tells me that your `@SuppressWarnings` is either incorrect or perhaps not needed (if you make the argument `Collection`).
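To make the naming point concrete, here is a small sketch in which entries are stored under their natural key but removed by the value of a secondary index. Everything in it is hypothetical and exists only for this example: the `Task` type, the `removeAllByIndexValues` name, and the `Function` that stands in for `KVTypeInfo.Accessor`. It also assumes the wildcard type `Collection<?>` for the argument, which lets the method compile without a raw type or a `@SuppressWarnings` annotation.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

public class RemoveByIndexValuesSketch {

  /** Hypothetical stored type: "id" plays the natural key, "stageId" a secondary index. */
  static final class Task {
    final String id;
    final int stageId;

    Task(String id, int stageId) {
      this.id = id;
      this.stageId = stageId;
    }
  }

  /**
   * Removes every entry whose indexed value is contained in indexValues and returns the
   * number of entries removed. The map keys are never compared against indexValues.
   */
  static <K, V> int removeAllByIndexValues(
      ConcurrentMap<K, V> data,
      Function<? super V, ?> indexAccessor,
      Collection<?> indexValues) {
    Set<Object> wanted = new HashSet<>(indexValues);
    int[] removed = {0}; // single-element array so the lambda can update the counter
    data.forEach((key, value) -> {
      if (wanted.contains(indexAccessor.apply(value)) && data.remove(key, value)) {
        removed[0]++;
      }
    });
    return removed[0];
  }

  public static void main(String[] args) {
    ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
    tasks.put("t1", new Task("t1", 1));
    tasks.put("t2", new Task("t2", 1));
    tasks.put("t3", new Task("t3", 2));
    tasks.put("t4", new Task("t4", 3));

    // Remove all tasks belonging to stages 1 and 3, whatever their natural keys are.
    int removed = removeAllByIndexValues(tasks, t -> t.stageId, Arrays.asList(1, 3));
    System.out.println(removed + " removed, remaining keys: " + tasks.keySet());
    // prints: 3 removed, remaining keys: [t3]
  }
}
```

In the usage above the second argument list holds stage ids, not map keys, which is the distinction the `ByValues` naming is meant to capture.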
[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284490577

## File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java ##

@@ -126,64 +132,144 @@ public void close() {
     return (Comparable) in;
   }
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static KVStoreView emptyView() {
+    return (InMemoryView) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap, InstanceList> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public InstanceList get(Class type) {
+      return (InstanceList)data.get(type);

Review comment:
nit: space after `)`