davidnavas commented on a change in pull request #24616: [SPARK-27726] [Core]
Fix performance of ElementTrackingStore deletes when using InMemoryStore under
high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284742856
##########
File path:
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
##########
@@ -126,64 +132,144 @@ public void close() {
return (Comparable<Object>) in;
}
- private static class InstanceList {
+ @SuppressWarnings("unchecked")
+ private static <T> KVStoreView<T> emptyView() {
+ return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+ }
+
+ /**
+ * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+ * class of type T to an InstanceList of type T.
+ */
+ private static class InMemoryLists {
+ private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+ @SuppressWarnings("unchecked")
+ public <T> InstanceList<T> get(Class<T> type) {
+ return (InstanceList<T>)data.get(type);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> void write(T value) throws Exception {
+ InstanceList<T> list =
+ (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+ list.put(value);
+ }
+
+ public void clear() {
+ data.clear();
+ }
+ }
+
+ private static class InstanceList<T> {
+
+ private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+ ConcurrentMap<Comparable<Object>, T> data;
+ Predicate<? super T> filter;
+ int count = 0;
+
+ CountingRemoveIfForEach(
+ ConcurrentMap<Comparable<Object>, T> data,
+ Predicate<? super T> filter) {
+ this.data = data;
+ this.filter = filter;
+ }
+
+ public void accept(Comparable<Object> key, T value) {
+ // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
+ // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
+ // implement correctly against the values() iterator, we use forEach instead....
+ if (filter.test(value)) {
+ if (data.remove(key, value)) {
+ count++;
+ }
+ }
+ }
+ }
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor naturalKey;
- private final ConcurrentMap<Comparable<Object>, Object> data;
-
- private int size;
+ private final ConcurrentMap<Comparable<Object>, T> data;
- private InstanceList(Class<?> type) throws Exception {
- this.ti = new KVTypeInfo(type);
+ private InstanceList(Class<?> klass) {
+ this.ti = new KVTypeInfo(klass);
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
this.data = new ConcurrentHashMap<>();
- this.size = 0;
}
KVTypeInfo.Accessor getIndexAccessor(String indexName) {
return ti.getAccessor(indexName);
}
- public Object get(Object key) {
+ // Note: removeIf returns a boolean if any element has been removed.
+ // While debugging this code, it was handy to have the count of elements
+ // removed, rather than an indicator of whether something has been
+ // removed, and a count is no more complicated than a boolean so I've
+ // retained that behavior here, although there is no current requirement.
+ @SuppressWarnings("unchecked")
+ int countingRemoveAllByKeys(String index, Collection keys) {
Review comment:
Hmm, I suspect we have a terminology overload issue here.
1) The read|write|view methods on KVStore refer to the things you read,
write, and look at as "instance", "object", and "entities" respectively (and
also "value", the name of write()'s parameter).
2) read() takes a "naturalKey" to access a specific instance, which is
the value of the unique/primary/NATURAL_INDEX_NAME index.
3) KVStoreView refers to the values of an index, as passed to first() and
last(), as values, not keys -- naturalKey or not.
4) InMemoryStore refers to the Comparable wrappers placed around the values
of a particular index as keys.
5) KVStore seems to say that keys are actually created per type written
and are based on the type name (which itself is referred to as either klass or
type).
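To make the overlapping terms above concrete, here is a minimal sketch of how
an entity, an indexed value, and a Comparable key relate in an in-memory map.
It is an illustration only: the Task class, the "id" and "stage" index names,
and the indexedValue() helper are hypothetical and are not taken from Spark;
asKey() mirrors only the cast visible at the top of the diff.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration; none of these names are Spark classes.
class TerminologySketch {

  // The "entity" (also called instance, object, or value in different places).
  static final class Task {
    final String id;    // field backing the natural (unique) index
    final int stageId;  // field backing a hypothetical secondary index "stage"

    Task(String id, int stageId) {
      this.id = id;
      this.stageId = stageId;
    }
  }

  // The "indexed value": the field value a given index reads from an entity.
  static Object indexedValue(Task entity, String index) {
    switch (index) {
      case "id": return entity.id;
      case "stage": return entity.stageId;
      default: throw new IllegalArgumentException("unknown index: " + index);
    }
  }

  // The "key" in the InMemoryStore sense: the indexed value viewed as a Comparable.
  @SuppressWarnings("unchecked")
  static Comparable<Object> asKey(Object indexedValue) {
    return (Comparable<Object>) indexedValue;
  }

  public static void main(String[] args) {
    ConcurrentMap<Comparable<Object>, Task> data = new ConcurrentHashMap<>();
    Task entity = new Task("task-1", 7);

    // write: the map key is the Comparable wrapper around the natural index value.
    data.put(asKey(indexedValue(entity, "id")), entity);

    // read: the caller supplies the natural key and gets the entity back.
    Task read = data.get(asKey("task-1"));
    System.out.println(read.stageId); // prints 7
  }
}
```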
So, yes, I'm probably using the terminology wrong, and I hereby declare myself
confused :(
With respect to klass vs type, I went with klass in the removeAll___() methods
as it was consistent with 'klass' being used everywhere in
ElementTrackingStore, but please let me know if that should be changed!
With respect to key, value, field-value-for-index, and
comparable-wrapper-around-field-value-for-index, I admit to not knowing what to
call which things when. One way to deal with this is to put removeAll on
KVStoreView, where "value" is everywhere (I think) understood in the
context of the type and index. The upside is that we could naturally call such
a method removeAll(), as the class and index are owned by the View. From an
impl standpoint, it would make the definition of LevelDB's view quite a bit
more complicated, and it would require work in InMemoryView as well -- I'd need
to pass down the containing hash of indexed-values->entity, rather than just
the entities (locally referred to as elements). The other downside is that
you'd be using a "View" to mutate....
Another approach would be to use 'values', as you suggested, with some
commentary to clear up what we mean by values in the parameter, and to call the
methods something like removeAllIndexedValues() and (internally)
indexedValueFromEntity() or some such.
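As a rough sketch of what that second naming could look like, the following
shows a counting removal driven by forEach plus remove(key, value), in the
spirit of the CountingRemoveIfForEach class in the diff. It is not the PR's
implementation: the signature of removeAllIndexedValues(), the Function used
as indexedValueFromEntity, and the Task class are all assumptions made for
illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical illustration; names and signatures are not from the PR.
class RemoveByIndexedValuesSketch {

  static final class Task {
    final String id;
    final int stageId;

    Task(String id, int stageId) {
      this.id = id;
      this.stageId = stageId;
    }
  }

  // Removes every entity whose indexed value is in indexedValues and returns
  // how many entries were actually removed by this call.
  static <K, T> int removeAllIndexedValues(
      ConcurrentMap<K, T> data,
      Function<? super T, ?> indexedValueFromEntity,
      Collection<?> indexedValues) {
    Set<Object> wanted = new HashSet<>(indexedValues);
    int[] count = {0};
    // forEach is used instead of an iterator's remove(); remove(key, entity)
    // only succeeds if the mapping is still present, so each hit counts once.
    data.forEach((key, entity) -> {
      if (wanted.contains(indexedValueFromEntity.apply(entity))
          && data.remove(key, entity)) {
        count[0]++;
      }
    });
    return count[0];
  }

  public static void main(String[] args) {
    ConcurrentMap<String, Task> data = new ConcurrentHashMap<>();
    data.put("a", new Task("a", 1));
    data.put("b", new Task("b", 1));
    data.put("c", new Task("c", 2));

    // Remove everything whose "stage" indexed value is 1.
    int removed = removeAllIndexedValues(data, t -> t.stageId, Arrays.asList(1));
    System.out.println(removed);     // prints 2
    System.out.println(data.size()); // prints 1
  }
}
```

Here "indexed values" means the raw field values for one index, not the
Comparable keys the store builds from them, which is the distinction the
method name is meant to carry.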
Thoughts?