Repository: apex-malhar Updated Branches: refs/heads/master d713e521e -> 7ac4a0ed7
APEXMALHAR-2248 #resolve Added interfaces and implementations of SpillableSet and SpillableSetMultimap Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7ac4a0ed Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7ac4a0ed Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7ac4a0ed Branch: refs/heads/master Commit: 7ac4a0ed759b06c382fa54fc148114709c1452f2 Parents: d713e52 Author: David Yan <da...@datatorrent.com> Authored: Mon Sep 19 17:58:12 2016 -0700 Committer: Siyuan Hua <hsy...@apache.org> Committed: Thu Sep 22 08:56:43 2016 -0700 ---------------------------------------------------------------------- .../malhar/lib/state/spillable/Spillable.java | 35 +- .../spillable/SpillableComplexComponent.java | 32 +- .../SpillableComplexComponentImpl.java | 9 + .../lib/state/spillable/SpillableSetImpl.java | 352 +++++++++++++++++++ .../spillable/SpillableSetMultimapImpl.java | 321 +++++++++++++++++ .../malhar/lib/utils/serde/SerdeKryoSlice.java | 100 ++++++ .../malhar/lib/utils/serde/SerdeLongSlice.java | 54 +++ .../malhar/lib/utils/serde/SerdePairSlice.java | 89 +++++ .../state/spillable/SpillableSetImplTest.java | 148 ++++++++ .../spillable/SpillableSetMultimapImplTest.java | 298 ++++++++++++++++ .../lib/utils/serde/SerdeKryoSliceTest.java | 79 +++++ .../lib/utils/serde/SerdePairSliceTest.java | 44 +++ 12 files changed, 1546 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java index 4c9b997..849389b 100644 --- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java @@ -21,9 +21,11 @@ package org.apache.apex.malhar.lib.state.spillable; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import com.google.common.collect.ListMultimap; import com.google.common.collect.Multiset; +import com.google.common.collect.SetMultimap; import com.datatorrent.api.Component; import com.datatorrent.api.Context.OperatorContext; @@ -37,7 +39,7 @@ public interface Spillable { /** * This represents a spillable {@link java.util.List}. The underlying implementation - * of this list is similar to that of an {@link java.util.ArrayList}. User's that receive an + * of this list is similar to that of an {@link java.util.ArrayList}. Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. * @param <T> The type of the data stored in the {@link SpillableArrayList}. @@ -47,9 +49,19 @@ public interface Spillable } /** + * This represents a spillable {@link java.util.Set}. Users that receive an + * implementation of this interface don't need to worry about propagating operator call-backs + * to the data structure. + * @param <T> The type of the data stored in the {@link SpillableSet}. + */ + interface SpillableSet<T> extends Set<T> + { + } + + /** * This represents a spillable {@link java.util.Map}. Implementations make * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is - * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). User's that receive an + * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. 
* @param <K> The type of the keys. @@ -62,7 +74,7 @@ public interface Spillable /** * This represents a spillable {@link com.google.common.collect.ListMultimap} implementation. Implementations make * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is - * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). User's that receive an + * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. * @param <K> The type of the keys. @@ -73,9 +85,22 @@ public interface Spillable } /** + * This represents a spillable {@link com.google.common.collect.SetMultimap} implementation. Implementations make + * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is + * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an + * implementation of this interface don't need to worry about propagating operator call-backs + * to the data structure. + * @param <K> The type of the keys. + * @param <V> The type of the values. + */ + interface SpillableSetMultimap<K, V> extends SetMultimap<K, V> + { + } + + /** * This represents a spillable {@link com.google.common.collect.Multiset} implementation. Implementations make * some assumptions about serialization and equality. Consider two elements T1 and T2. The assumption is - * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). User's that receive an + * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs to the data structure. 
*/ interface SpillableByteMultiset<T> extends Multiset<T> @@ -83,7 +108,7 @@ public interface Spillable } /** - * This represents a spillable {@link java.util.Queue} implementation. User's that receive an + * This represents a spillable {@link java.util.Queue} implementation. Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. * @param <T> The type of the data stored in the queue. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java index c63c7ef..e4836c4 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java @@ -39,7 +39,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * This is a method for creating a {@link SpillableArrayList}. This method * auto-generates an identifier for the data structure. * @param <T> The type of data stored in the {@link SpillableArrayList}. - * @param bucket The bucket that this {@link SpillableArrayList} will be spilled too. + * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to. * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}. * @return A {@link SpillableArrayList}. */ @@ -49,7 +49,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * This is a method for creating a {@link SpillableArrayList}. * @param <T> The type of data stored in the {@link SpillableArrayList}. 
* @param identifier The identifier for this {@link SpillableArrayList}. - * @param bucket The bucket that this {@link SpillableArrayList} will be spilled too. + * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to. * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}. * @return A {@link SpillableArrayList}. */ @@ -60,7 +60,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * auto-generates an identifier for the data structure. * @param <K> The type of the keys. * @param <V> The type of the values. - * @param bucket The bucket that this {@link SpillableByteMap} will be spilled too. + * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to. * @param serdeKey The Serializer/Deserializer to use for the map's keys. * @param serdeValue The Serializer/Deserializer to use for the map's values. * @return A {@link SpillableByteMap}. @@ -73,7 +73,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param <K> The type of the keys. * @param <V> The type of the values. * @param identifier The identifier for this {@link SpillableByteMap}. - * @param bucket The bucket that this {@link SpillableByteMap} will be spilled too. + * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to. * @param serdeKey The Serializer/Deserializer to use for the map's keys. * @param serdeValue The Serializer/Deserializer to use for the map's values. * @return A {@link SpillableByteMap}. @@ -86,7 +86,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * auto-generates an identifier for the data structure. * @param <K> The type of the keys. * @param <V> The type of the values in the map's lists. - * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled too. 
+ * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to. * @param serdeKey The Serializer/Deserializer to use for the map's keys. * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists. * @return A {@link SpillableByteArrayListMultimap}. @@ -99,7 +99,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * @param <K> The type of the keys. * @param <V> The type of the values in the map's lists. * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}. - * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled too. + * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to. * @param serdeKey The Serializer/Deserializer to use for the map's keys. * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists. * @return A {@link SpillableByteArrayListMultimap}. @@ -109,10 +109,22 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S Serde<V, Slice> serdeValue); /** + * This is a method for creating a {@link SpillableSetMultimap}. + * @param <K> The type of the keys. + * @param <V> The type of the values in the map's lists. + * @param bucket The bucket that this {@link SpillableSetMultimap} will be spilled to. + * @param serdeKey The Serializer/Deserializer to use for the map's keys. + * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists. + * @return A {@link SpillableSetMultimap}. + */ + <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K, + Slice> serdeKey, Serde<V, Slice> serdeValue); + + /** * This is a method for creating a {@link SpillableByteMultiset}. This method * auto-generates an identifier for the data structure. * @param <T> The type of the elements. - * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled too. 
+ * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to. * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}. * @return A {@link SpillableByteMultiset}. */ @@ -122,7 +134,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * This is a method for creating a {@link SpillableByteMultiset}. * @param <T> The type of the elements. * @param identifier The identifier for this {@link SpillableByteMultiset}. - * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled too. + * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to. * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}. * @return A {@link SpillableByteMultiset}. */ @@ -132,7 +144,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * This is a method for creating a {@link SpillableQueue}. This method * auto-generates an identifier for the data structure. * @param <T> The type of the data stored in the {@link SpillableQueue}. - * @param bucket The bucket that this {@link SpillableQueue} will be spilled too. + * @param bucket The bucket that this {@link SpillableQueue} will be spilled to. * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}. * @return A {@link SpillableQueue}. */ @@ -142,7 +154,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S * This is a method for creating a {@link SpillableQueue}. * @param <T> The type of the data stored in the {@link SpillableQueue}. * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}. - * @param bucket The bucket that this {@link SpillableQueue} will be spilled too. + * @param bucket The bucket that this {@link SpillableQueue} will be spilled to. 
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}. * @return A {@link SpillableQueue}. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java index aeb02c5..9c3defc 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java @@ -119,6 +119,15 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent return map; } + public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K, + Slice> serdeKey, Serde<V, Slice> serdeValue) + { + SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store, + identifierGenerator.next(), bucket, serdeKey, serdeValue); + componentList.add(map); + return map; + } + public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde) { throw new UnsupportedOperationException("Unsupported Operation"); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java new file mode 100644 index 0000000..122cd2d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java 
@@ -0,0 +1,352 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * A Spillable implementation of {@link java.util.Set} backed by a {@link SpillableStateStore}. + * <p> + * Each element is stored as a key of a {@link SpillableByteMapImpl}; its value is a node holding a + * validity flag and the element that was added before it. The elements therefore form a chain that + * starts at {@link #getHead()} (the most recently added new element) and ends at the first element + * ever added, whose node points to itself. 
Iteration follows this chain, i.e. it visits elements in + * reverse order of first insertion, unlike the insertion order of a {@link List}. + * <p> + * Removing an element only marks its node invalid; this class never deletes the entry from the backing map. + * @param <T> The type of object stored in the {@link SpillableSetImpl}.
+ * + * @since 3.5.0 + */ +@DefaultSerializer(FieldSerializer.class) +@InterfaceStability.Evolving +public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable.SpillableComponent +{ + private static class ListNode<T> + { + ListNode() + { + } + + ListNode(boolean valid, T next) + { + this.valid = valid; + this.next = next; + } + /** {@code false} once the element has been removed; the node itself stays in the map. */ + boolean valid; + /** The element added before this one, or this node's own element if it was the first one added. */ T next; + } + /** + * Serializes a {@link ListNode} as one flag byte (1 = valid, 0 = removed) followed by the wrapped + * serde's encoding of {@code next}. {@link #deserialize(Slice, MutableInt)} reads the flag at + * {@code slice.buffer[offset]}, i.e. it treats {@code offset} as an absolute index into the backing + * buffer, advances it by one and lets the wrapped serde read {@code next} from there. + * NOTE(review): {@link #deserialize(Slice)} therefore starts at index 0 of {@code slice.buffer}, + * not at {@code slice.offset}. + */ + public static class SerdeListNodeSlice<T> implements Serde<ListNode<T>, Slice> + { + private Serde<T, Slice> serde; + private static Slice falseSlice = new Slice(new byte[]{0}); + private static Slice trueSlice = new Slice(new byte[]{1}); + + public SerdeListNodeSlice(@NotNull Serde<T, Slice> serde) + { + this.serde = Preconditions.checkNotNull(serde); + } + + @Override + public Slice serialize(ListNode<T> object) + { + int size = 0; + + Slice slice1 = object.valid ? 
trueSlice : falseSlice; + size += 1; + Slice slice2 = serde.serialize(object.next); + size += slice2.length; + + byte[] bytes = new byte[size]; + System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length); + System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length); + + return new Slice(bytes); + } + + @Override + public ListNode<T> deserialize(Slice slice, MutableInt offset) + { + ListNode<T> result = new ListNode<>(); + result.valid = slice.buffer[offset.intValue()] != 0; + offset.add(1); + result.next = serde.deserialize(slice, offset); + return result; + } + + @Override + public ListNode<T> deserialize(Slice object) + { + return deserialize(object, new MutableInt(0)); + } + } + + @NotNull + private SpillableStateStore store; + @NotNull + private SpillableByteMapImpl<T, ListNode<T>> map; + /** Start of the iteration chain; {@code size} counts only valid (not removed) elements. */ + private T head; + private int size; + + private SpillableSetImpl() + { + //for kryo + } + + public SpillableStateStore getStore() + { + return store; + } + + /** + * Creates a {@link SpillableSetImpl}.
+ * @param bucketId The Id of the bucket used to store this + * {@link SpillableSetImpl} in the provided {@link SpillableStateStore}. + * @param prefix The Id of this {@link SpillableSetImpl}. + * @param store The {@link SpillableStateStore} in which to spill to. + * @param serde The {@link Serde} to use when serializing and deserializing data. + */ + public SpillableSetImpl(long bucketId, @NotNull byte[] prefix, + @NotNull SpillableStateStore store, + @NotNull Serde<T, Slice> serde) + { + this.store = Preconditions.checkNotNull(store); + + map = new SpillableByteMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde)); + } + /** Overwrites the element count, e.g. when {@link SpillableSetMultimapImpl} restores a set from its persisted metadata; must be non-negative. */ + public void setSize(int size) + { + Preconditions.checkArgument(size >= 0); + this.size = size; + } + /** Sets the start of the iteration chain; {@code head} must not be null. */ + public void setHead(T head) + { + Preconditions.checkNotNull(head); + this.head = head; + } + + public T getHead() + { + return head; + } + + @Override + public int size() + { + return size; + } + + @Override + public boolean isEmpty() + { + return size == 0; + } + + @Override + public boolean contains(Object o) + { + T t = (T)o; + ListNode<T> node = map.get(t); + return node != null && node.valid; + } + /** + * Returns an iterator that walks the chain from the head, skipping removed elements. + * NOTE(review): {@code remove()} does not check that {@code next()} was called since the last + * {@code remove()}: before the first {@code next()} it calls {@code map.get(null)}, and a repeated + * call decrements {@code size} again. {@link Iterator#remove()} specifies an + * {@link IllegalStateException} in both cases. 
+ */ + @Override + public Iterator<T> iterator() + { + return new Iterator<T>() + { + T cur = head; + T prev = null; + + @Override + public boolean hasNext() + { + while (cur != null) { + ListNode<T> node = map.get(cur); + if (node.valid) { + return true; + } + if (cur.equals(node.next)) { + break; + } else { + cur = node.next; + } + } + return false; + } + + @Override + public T next() + { + while (cur != null) { + ListNode<T> node = map.get(cur); + try { + if (node.valid) { + prev = cur; + return prev; + } + } finally { + if (cur.equals(node.next)) { + cur = null; + } else { + cur = node.next; + } + } + } + throw new NoSuchElementException(); + } + + @Override + public void remove() + { + ListNode<T> node = map.get(prev); + node.valid = false; + map.put(prev, node); + size--; + } + }; + } + + @Override + public Object[]
toArray() + { + throw new UnsupportedOperationException(); + } + + @Override + public <T1> T1[] toArray(T1[] t1s) + { + throw new UnsupportedOperationException(); + } + /** + * Adds {@code t} unless it is already present. A never-seen element becomes the new head and links + * to the previous head (or to itself when {@code head} is null); a previously removed element is + * revived in place and keeps its position in the chain. The precondition rejects the add when + * {@code size} would overflow an {@code int}. + */ + @Override + public boolean add(T t) + { + Preconditions.checkArgument((size() + 1) > 0); + ListNode<T> node = map.get(t); + if (node == null) { + map.put(t, new ListNode<>(true, head == null ? t : head)); + head = t; + size++; + return true; + } else if (!node.valid) { + node.valid = true; + map.put(t, node); + size++; + return true; + } else { + return false; + } + } + /** + * Marks {@code o} as removed if it is present. The node is kept (with {@code valid = false}) so that + * the chain still links through it. NOTE(review): nothing in this class ever deletes these entries + * from the backing map, so storage grows with the number of distinct elements ever added. + */ + @Override + public boolean remove(Object o) + { + T t = (T)o; + ListNode<T> node = map.get(t); + if (node == null || !node.valid) { + return false; + } else { + node.valid = false; + map.put(t, node); + size--; + return true; + } + } + + @Override + public boolean containsAll(Collection<?> collection) + { + for (Object item : collection) { + if (!contains(item)) { + return false; + } + } + return true; + } + /** + * Adds every element of {@code collection}. NOTE(review): always returns {@code true}, even when + * nothing was added; {@link Collection#addAll(Collection)} specifies {@code true} only if this set + * changed as a result of the call. + */ + @Override + public boolean addAll(Collection<?
extends T> collection) + { + for (T element: collection) { + add(element); + } + + return true; + } + + @Override + public boolean removeAll(Collection<?> collection) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection<?> collection) + { + throw new UnsupportedOperationException(); + } + /** + * Removes every element by iterating and calling {@code remove()} on each one; the nodes stay in the + * backing map as removed entries and {@code head} is left unchanged. + */ + @Override + public void clear() + { + Iterator<T> it = iterator(); + while (it.hasNext()) { + it.next(); + it.remove(); + } + } + + @Override + public void setup(Context.OperatorContext context) + { + map.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + map.beginWindow(windowId); + } + + @Override + public void endWindow() + { + map.endWindow(); + } + + @Override + public void teardown() + { + map.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java new file mode 100644 index 0000000..951ef76 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of Guava's SetMultimap which spills data to a {@link SpillableStateStore}. + * <p> + * The values of each key live in a {@link SpillableSetImpl} whose entries are stored under a prefix + * made of this multimap's identifier followed by the serialized key. 
A separate metadata entry, keyed + * by the serialized key followed by {@link #META_KEY_SUFFIX}, records each set's size and head so the + * set can be rebuilt later. These entries are written in {@link #endWindow()} for every key that the + * cache reports as changed, and deleted by {@link #removeAll(Object)}.
+ * + * @since 3.5.0 + */ +@DefaultSerializer(FieldSerializer.class) +@InterfaceStability.Evolving +public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMultimap<K, V>, + Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; /* NOTE(review): not referenced anywhere in this class. */ + public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; /* Appended to a serialized key to form the key of that key's (size, head) metadata entry. */ + + private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>(); + /** Metadata entries: serialized key followed by META_KEY_SUFFIX, mapped to (set size, set head). */ + @NotNull + private SpillableByteMapImpl<Slice, Pair<Integer, V>> map; + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde<K, Slice> serdeKey; + private Serde<V, Slice> serdeValue; + private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>(); /* NOTE(review): never cleared, so every set passed through removeAll() stays referenced and is end-windowed again in every later endWindow(). */ + + private SpillableSetMultimapImpl() + { + // for kryo + } + + /** + * Creates a {@link SpillableSetMultimapImpl}. + * @param store The {@link SpillableStateStore} in which to spill to. + * @param identifier The Id of this {@link SpillableSetMultimapImpl}. + * @param bucket The Id of the bucket used to store this + * {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}. + * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. + * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+ */ + public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { + this.store = Preconditions.checkNotNull(store); + this.identifier = Preconditions.checkNotNull(identifier); + this.bucket = bucket; + this.serdeKey = Preconditions.checkNotNull(serdeKey); + this.serdeValue = Preconditions.checkNotNull(serdeValue); + /* + * NOTE(review): this metadata map and every per-key SpillableSetImpl use the same store and bucket, + * and their identifiers/keys are formed by plain concatenation (see getHelper() and endWindow()) + * without a length or delimiter. Unless serdeKey's output is self-delimiting, entries belonging to + * different keys may collide; confirm against SpillableByteMapImpl's key layout. + */ + map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue)); + } + + public SpillableStateStore getStore() + { + return store; + } + /** + * Returns the set of values for {@code key}, or {@code null} if the key is neither cached nor has a + * metadata entry. NOTE(review): {@link Multimap#get} specifies an empty collection rather than + * {@code null} for absent keys. + */ + @Override + public Set<V> get(@NotNull K key) + { + return getHelper(key); + } + /** + * Returns the cached set for {@code key}, or rebuilds it from the key's (size, head) metadata entry, + * or returns {@code null} if neither exists. Every set that is found is (re-)put into the cache. 
+ */ + private SpillableSetImpl<V> getHelper(@NotNull K key) + { + SpillableSetImpl<V> spillableSet = cache.get(key); + + if (spillableSet == null) { + Slice keySlice = serdeKey.serialize(key); + Pair<Integer, V> meta = map.get(SliceUtils.concatenate(keySlice, META_KEY_SUFFIX)); + + if (meta == null) { + return null; + } + + Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice); + spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue); + spillableSet.setSize(meta.getLeft()); + spillableSet.setHead(meta.getRight()); + } + + cache.put(key, spillableSet); + + return spillableSet; + } + + @Override + public Set<K> keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Multiset<K> keys() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<V> values() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set<Map.Entry<K, V>> entries() + { + throw new UnsupportedOperationException(); + } + + /** + * Removes all values of {@code key}: evicts it from the cache, deletes its metadata entry and clears + * its set, which is remembered so that it is still end-windowed. + * Note that this always returns null because the set is no longer valid after this call + * NOTE(review): {@link Multimap#removeAll} specifies returning the removed values (or an empty + * collection). + * + * @param key the key whose values are removed + * @return null + */ + @Override + public Set<V> removeAll(@NotNull Object key) + { + SpillableSetImpl<V> spillableSet = getHelper((K)key); + if
(spillableSet != null) { + cache.remove((K)key); + Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX); + map.remove(keySlice); + spillableSet.clear(); + removedSets.add(spillableSet); + } + return null; + } + + @Override + public void clear() + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys + return map.size(); + } + /** + * NOTE(review): like size(), this consults only the metadata map, whose entries for new keys are + * written in endWindow(); keys first added in the current window are not reflected here. + */ + @Override + public boolean isEmpty() + { + return map.isEmpty(); + } + /** + * Returns true if {@code key} is cached or has a metadata entry. NOTE(review): remove(key, value) + * never deletes the metadata entry, so a key whose set has become empty is still reported. + */ + @Override + public boolean containsKey(Object key) + { + if (cache.contains((K)key)) { + return true; + } + Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX); + return map.containsKey(keySlice); + } + + @Override + public boolean containsValue(@NotNull Object value) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsEntry(Object key, Object value) + { + Set<V> set = get((K)key); + if (set == null) { + return false; + } else { + return set.contains(value); + } + } + /** + * Adds {@code value} to the set of {@code key}, creating the set if the key is unknown. + * NOTE(review): always returns {@code true}; {@link com.google.common.collect.SetMultimap#put} + * should return {@code false} when the pair is already present, but the result of + * {@code spillableSet.add(value)} is discarded. 
Also, after {@link #removeAll(Object)} the removed + * nodes appear to remain in the store under the same prefix, so re-adding a former value through a + * freshly created set revives that node without setting the set's head. + */ + @Override + public boolean put(K key, V value) + { + SpillableSetImpl<V> spillableSet = getHelper(key); + + if (spillableSet == null) { + Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key)); + spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue); + cache.put(key, spillableSet); + } + spillableSet.add(value); + return true; + } + + @Override + public boolean remove(@NotNull Object key, @NotNull Object value) + { + Set<V> set = get((K)key); + if (set == null) { + return false; + } else { + return set.remove(value); + } + } + + @Override + public boolean putAll(@Nullable K key, Iterable<? extends V> values) + { + boolean changed = false; + + for (V value: values) { + changed |= put(key, value); + } + + return changed; + } + + @Override + public boolean putAll(Multimap<?
extends K, ? extends V> multimap) + { + boolean changed = false; + + for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) { + changed |= put(entry.getKey(), entry.getValue()); + } + + return changed; + } + + @Override + public Set<V> replaceValues(K key, Iterable<? extends V> values) + { + throw new UnsupportedOperationException(); + } + + @Override + public Map<K, Collection<V>> asMap() + { + throw new UnsupportedOperationException(); + } + /** + * Sets up the metadata map. NOTE(review): setup(), beginWindow() and teardown() are not forwarded + * to the per-key SpillableSetImpl instances, which only receive endWindow() from this class; confirm + * that their backing maps tolerate this. + */ + @Override + public void setup(Context.OperatorContext context) + { + map.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + map.beginWindow(windowId); + } + /** + * For every key the cache reports as changed, end-windows its set and writes the key's (size, head) + * metadata entry; then end-windows the removed sets, the cache and the metadata map. + */ + @Override + public void endWindow() + { + for (K key: cache.getChangedKeys()) { + + SpillableSetImpl<V> spillableSet = cache.get(key); + spillableSet.endWindow(); + + map.put(SliceUtils.concatenate(serdeKey.serialize(key), META_KEY_SUFFIX), + new ImmutablePair<>(spillableSet.size(), spillableSet.getHead())); + } + + for (SpillableSetImpl removedSet : removedSets) { + removedSet.endWindow(); + } + + cache.endWindow(); + map.endWindow(); + } + + @Override + public void teardown() + { + map.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java new file mode 100644 
index 0000000..d4b9488 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.io.ByteArrayOutputStream; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import com.datatorrent.netlet.util.Slice; + +/** + * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because + * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or + * incompatible changes to the class being serialized. + * + * @param <T> The type being serialized + */ +@InterfaceStability.Evolving +public class SerdeKryoSlice<T> implements Serde<T, Slice> +{ + // Setup ThreadLocal of Kryo instances + private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() + { + protected Kryo initialValue() + { + Kryo kryo = new Kryo(); + // configure kryo instance, customize settings + return kryo; + } + }; + + private final Class<? extends T> clazz; + + /** + * Creates a serde that stores the class of each object together with its data, so the concrete type does not + * have to be known up front. + */ + public SerdeKryoSlice() + { + this.clazz = null; + } + + /** + * Creates a serde that writes objects without class information and always reads them back as {@code clazz}. + * + * @param clazz The class every serialized object is deserialized as + */ + public SerdeKryoSlice(Class<?
extends T> clazz) + { + this.clazz = clazz; + } + + @Override + public Slice serialize(T object) + { + Kryo kryo = kryos.get(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + Output output = new Output(stream); + if (clazz == null) { + kryo.writeClassAndObject(output, object); + } else { + kryo.writeObject(output, object); + } + // A stream-backed Output drains its fixed-size buffer into the stream whenever the buffer fills up, so + // output.toBytes() alone would return only the tail of a large object. Flush and use the whole stream. + output.flush(); + return new Slice(stream.toByteArray()); + } + + /** + * Deserializes one object starting at {@code offset} (relative to the start of {@code slice}) and advances + * {@code offset} past the bytes that were consumed. + */ + @Override + public T deserialize(Slice slice, MutableInt offset) + { + byte[] bytes = slice.toByteArray(); + Kryo kryo = kryos.get(); + Input input = new Input(bytes, offset.intValue(), bytes.length - offset.intValue()); + T object; + if (clazz == null) { + object = (T)kryo.readClassAndObject(input); + } else { + object = kryo.readObject(input, clazz); + } + // The Input starts at offset within bytes, so position() is already the index just past this object. + offset.setValue(input.position()); + return object; + } + + @Override + public T deserialize(Slice slice) + { + return deserialize(slice, new MutableInt(0)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java new file mode 100644 index 0000000..6fe07d9 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of {@link Serde} which deserializes and serializes {@code Long} values. + * + * @since 3.5.0 + */ +@InterfaceStability.Evolving +public class SerdeLongSlice implements Serde<Long, Slice> +{ + @Override + public Slice serialize(Long object) + { + return new Slice(GPOUtils.serializeLong(object)); + } + + /** + * Reads a long starting at {@code offset} (relative to the start of {@code slice}) and advances + * {@code offset} by 8. + */ + @Override + public Long deserialize(Slice slice, MutableInt offset) + { + long val = GPOUtils.deserializeLong(slice.buffer, new MutableInt(slice.offset + offset.intValue())); + // A serialized long always occupies 8 bytes, so move the caller's offset past it. + offset.add(8); + return val; + } + + @Override + public Long deserialize(Slice object) + { + return deserialize(object, new MutableInt(0)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java new file mode 100644 index 0000000..59cf282 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; + +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of {@link Serde} which serializes and deserializes pairs. + * <p> + * The serialized form is the bytes of the first element immediately followed by the bytes of the second element, + * with no length prefix or separator. Each element {@link Serde} must therefore advance the shared offset by exactly + * the number of bytes it consumed during deserialization. + */ +@InterfaceStability.Evolving +public class SerdePairSlice<T1, T2> implements Serde<Pair<T1, T2>, Slice> +{ + @NotNull + private Serde<T1, Slice> serde1; + @NotNull + private Serde<T2, Slice> serde2; + + private SerdePairSlice() + { + // for Kryo + } + + /** + * Creates a {@link SerdePairSlice}.
+ * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair + * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair + */ + public SerdePairSlice(@NotNull Serde<T1, Slice> serde1, @NotNull Serde<T2, Slice> serde2) + { + this.serde1 = Preconditions.checkNotNull(serde1); + this.serde2 = Preconditions.checkNotNull(serde2); + } + + @Override + public Slice serialize(Pair<T1, T2> pair) + { + int size = 0; + + Slice slice1 = serde1.serialize(pair.getLeft()); + size += slice1.length; + Slice slice2 = serde2.serialize(pair.getRight()); + size += slice2.length; + + byte[] bytes = new byte[size]; + System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length); + System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length); + + return new Slice(bytes); + } + + @Override + public Pair<T1, T2> deserialize(Slice slice, MutableInt offset) + { + // serde1 advances offset past the first element, which is exactly where serde2 starts reading. + T1 first = serde1.deserialize(slice, offset); + T2 second = serde2.deserialize(slice, offset); + return new ImmutablePair<>(first, second); + } + + @Override + public Pair<T1, T2> deserialize(Slice slice) + { + return deserialize(slice, new MutableInt(0)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java new file mode 100644 index 0000000..3883191 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.HashSet; +import java.util.Iterator; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.google.common.collect.Lists; + +/** + * Tests for {@link SpillableSetImpl}, run against both an {@link InMemSpillableStateStore} and the store provided by + * {@link SpillableTestUtils.TestMeta}. + */ +public class SpillableSetImplTest +{ + public static final byte[] ID1 = new byte[]{(byte)0}; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleAddGetAndSetTest1() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleAddGetAndSetTest1Helper(store); + } + + @Test + public void simpleAddGetAndSetManagedStateTest1() + { + simpleAddGetAndSetTest1Helper(testMeta.store); + } + + /** + * Adds, queries, iterates and removes elements (including through {@link Iterator#remove()}) across several + * windows of {@code store}, each followed by a checkpoint and commit. + */ + public void simpleAddGetAndSetTest1Helper(SpillableStateStore store) + { + SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + set.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + set.beginWindow(windowId); + + Assert.assertEquals(0, set.size()); + + set.add("a"); + + Assert.assertEquals(1,
set.size()); + + Assert.assertTrue(set.contains("a")); + + set.addAll(Lists.newArrayList("a", "b", "c")); + + Assert.assertEquals(3, set.size()); + + Assert.assertTrue(set.contains("a")); + Assert.assertTrue(set.contains("b")); + Assert.assertTrue(set.contains("c")); + + HashSet<String> result = new HashSet<>(); + Iterator<String> it = set.iterator(); + int i = 0; + while (it.hasNext()) { + result.add(it.next()); + i++; + } + Assert.assertTrue(result.containsAll(Lists.newArrayList("a", "b", "c"))); + Assert.assertEquals(3, i); + + // Removing through the iterator must also remove the element from the spillable set itself. + it = set.iterator(); + while (it.hasNext()) { + if ("b".equals(it.next())) { + it.remove(); + } + } + Assert.assertEquals(2, set.size()); + Assert.assertTrue(set.contains("a")); + Assert.assertFalse(set.contains("b")); + Assert.assertTrue(set.contains("c")); + + set.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + set.beginWindow(windowId); + + set.add("tt"); + set.add("ab"); + set.add("99"); + set.add("oo"); + + Assert.assertTrue(set.contains("tt")); + Assert.assertTrue(set.contains("ab")); + Assert.assertTrue(set.contains("99")); + Assert.assertTrue(set.contains("oo")); + + set.remove("ab"); + + Assert.assertTrue(set.contains("tt")); + Assert.assertFalse(set.contains("ab")); + Assert.assertTrue(set.contains("99")); + Assert.assertTrue(set.contains("oo")); + + set.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + set.beginWindow(windowId); + + set.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + set.teardown(); + store.teardown(); + } +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java new file mode 100644 index 0000000..e9903ec --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Iterator; +import java.util.Random; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.KryoCloneUtils; + +/** + * Tests for {@link SpillableSetMultimapImpl}. + */ +public class SpillableSetMultimapImplTest +{ + public static final byte[] ID1 = new byte[]{(byte)0}; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleMultiKeyTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleMultiKeyTestHelper(store); + } + + @Test + public void simpleMultiKeyManagedStateTest() + { + simpleMultiKeyTestHelper(testMeta.store); + } + + /** + * Runs the per-key scenario for keys "a", "b" and "c" in turn, checking after each one that the number of + * distinct keys in the multimap has grown by one. + */ + public void simpleMultiKeyTestHelper(SpillableStateStore store) + { + SpillableSetMultimapImpl<String, String> map = + new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), + new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long nextWindowId = 0L; + nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); + nextWindowId++; + + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertEquals(1, map.size()); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId); + nextWindowId++; + + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertEquals(2, map.size()); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + // Keep the returned window id so the following windows never reuse an id the helper already used. +
nextWindowId = simpleMultiKeyTestHelper(store, map, "c", nextWindowId); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertEquals(3, map.size()); + + map.endWindow(); + store.endWindow(); + + map.teardown(); + store.teardown(); + } + + /** + * Exercises put, removeAll, re-put and bulk adds for {@code key} over five consecutive windows starting right + * after {@code nextWindowId}. + * + * @return the last window id used, so callers can continue with strictly larger window ids + */ + public long simpleMultiKeyTestHelper(SpillableStateStore store, + SpillableSetMultimapImpl<String, String> map, String key, long nextWindowId) + { + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertNull(map.get(key)); + + Assert.assertFalse(map.containsKey(key)); + + map.put(key, "a"); + + Assert.assertTrue(map.containsKey(key)); + + Set<String> set1 = map.get(key); + Assert.assertEquals(1, set1.size()); + Iterator<String> it = set1.iterator(); + + Assert.assertEquals("a", it.next()); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + map.removeAll(key); + Assert.assertFalse(map.containsKey(key)); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertFalse(map.containsKey(key)); + map.put(key, "a"); + set1 = map.get(key); + Assert.assertEquals(1, set1.size()); + set1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g")); + Assert.assertEquals(7, set1.size()); + + Set<String> referenceSet = Sets.newHashSet("a", "b", "c", "d", "e", "f", "g"); + Assert.assertTrue(referenceSet.containsAll(set1)); + Assert.assertTrue(set1.containsAll(referenceSet)); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Set<String> set2 = map.get(key); + + Assert.assertEquals(7, set2.size()); + Assert.assertTrue(referenceSet.containsAll(set2)); + Assert.assertTrue(set2.containsAll(referenceSet)); + + set2.add("tt"); + set2.add("ab"); + set2.add("99"); + set2.add("oo"); + referenceSet = Sets.newHashSet("a",
"b", "c", "d", "e", "f", "g", "tt", "ab", "99", "oo"); + Assert.assertTrue(referenceSet.containsAll(set2)); + Assert.assertTrue(set2.containsAll(referenceSet)); + + Assert.assertEquals(11, set2.size()); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertEquals(11, set2.size()); + + map.endWindow(); + store.endWindow(); + + return nextWindowId; + } + + @Test + public void recoveryTestWithManagedState() + { + SpillableStateStore store = testMeta.store; + + SpillableSetMultimapImpl<String, String> map = + new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long nextWindowId = 0L; + nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); + long activationWindow = nextWindowId; + store.beforeCheckpoint(nextWindowId); + SpillableSetMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map); + store.checkpointed(nextWindowId); + store.committed(nextWindowId); + + nextWindowId++; + + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Set<String> set1 = map.get("a"); + + Assert.assertEquals(11, set1.size()); + + Set<String> referenceSet = Sets.newHashSet("a", "b", "c", "d", "e", "f", "g", "tt", "ab", "99", "oo"); + Assert.assertTrue(referenceSet.containsAll(set1)); + Assert.assertTrue(set1.containsAll(referenceSet)); + + set1.add("111"); + + Assert.assertTrue(set1.contains("111")); + + Assert.assertEquals(12, set1.size()); + + map.endWindow(); + store.endWindow(); + + map.teardown(); + store.teardown(); + + map = clonedMap; + store = map.getStore(); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); + attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow); +
Context.OperatorContext context = + new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); + + store.setup(context); + map.setup(context); + + nextWindowId = activationWindow + 1; + + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertEquals(1, map.size()); + Assert.assertTrue(map.containsKey("a")); + Assert.assertEquals(11, map.get("a").size()); + + map.endWindow(); + store.endWindow(); + + map.teardown(); + store.teardown(); + } + + @Test + public void testLoad() + { + Random random = new Random(); + final int keySize = 1000000; + final int valueSize = 100000000; + final int numOfEntry = 100000; + + SpillableStateStore store = testMeta.store; + + // Load-test the set multimap that this suite covers. + SpillableSetMultimapImpl<String, String> multimap = new SpillableSetMultimapImpl<>( + store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); + Context.OperatorContext context = + new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); + store.setup(context); + multimap.setup(context); + + store.beginWindow(1); + multimap.beginWindow(1); + for (int i = 0; i < numOfEntry; ++i) { + multimap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize))); + } + multimap.endWindow(); + store.endWindow(); + + multimap.teardown(); + store.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java new file mode 100644 index 0000000..b780f66 --- /dev/null +++
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import com.datatorrent.netlet.util.Slice; + +/** + * SerdeKryoSlice unit tests + */ +public class SerdeKryoSliceTest +{ + /** + * Simple POJO with a private no-arg constructor, used by pojoTest to check that SerdeKryoSlice round-trips it. + */ + public static class TestPojo + { + private TestPojo() + { + } + + public TestPojo(int intValue, String stringValue) + { + this.intValue = intValue; + this.stringValue = stringValue; + } + + @Override + public boolean equals(Object other) + { + if (this == other) { + return true; + } + if (!(other instanceof TestPojo)) { + return false; + } + TestPojo o = (TestPojo)other; + return intValue == o.intValue && (stringValue == null ? o.stringValue == null : stringValue.equals(o.stringValue)); + } + + // equals() is overridden, so hashCode() must be too, to keep equal objects hashing alike. + @Override + public int hashCode() + { + return 31 * intValue + (stringValue == null ? 0 : stringValue.hashCode()); + } + + int intValue; + String stringValue; + } + + @Test + public void stringListTest() + { + SerdeKryoSlice<ArrayList> serdeList = new SerdeKryoSlice<>(ArrayList.class); + + ArrayList<String> stringList = Lists.newArrayList("a", "b", "c"); + Slice slice = serdeList.serialize(stringList); + List<String> deserializedList = serdeList.deserialize(slice); + Assert.assertEquals(stringList,
deserializedList); + } + + @Test + public void pojoTest() + { + SerdeKryoSlice<TestPojo> serdePojo = new SerdeKryoSlice<>(); + TestPojo pojo = new TestPojo(345, "xyz"); + Slice slice = serdePojo.serialize(pojo); + TestPojo deserializedPojo = serdePojo.deserialize(slice); + Assert.assertEquals(pojo, deserializedPojo); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java new file mode 100644 index 0000000..6684a9f --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + +import com.datatorrent.netlet.util.Slice; + +/** + * {@link SerdePairSlice} unit tests + */ +public class SerdePairSliceTest +{ + /** + * Round-trips a (String, Integer) pair and checks that the deserialized pair equals the original. + */ + @Test + public void simpleSerdeTest() + { + SerdePairSlice<String, Integer> serdePair = new SerdePairSlice<>(new SerdeStringSlice(), new SerdeIntSlice()); + + Pair<String, Integer> pair = new ImmutablePair<>("abc", 123); + + Slice slice = serdePair.serialize(pair); + + Pair<String, Integer> deserializedPair = serdePair.deserialize(slice); + + Assert.assertEquals(pair, deserializedPair); + } +}