Repository: apex-malhar Updated Branches: refs/heads/master c19c80d88 -> f5f1943d2
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java deleted file mode 100644 index 22d317d..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java +++ /dev/null @@ -1,371 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.spillable; - -import java.util.List; -import java.util.Random; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; - -import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; -import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; -import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; -import org.apache.apex.malhar.lib.utils.serde.SliceUtils; - -import com.google.common.collect.Lists; - -import com.datatorrent.api.Attribute; -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.util.KryoCloneUtils; -import com.datatorrent.netlet.util.Slice; - -public class SpillableByteArrayListMultimapImplTest -{ - public static final byte[] ID1 = new byte[]{(byte)0}; - - @Rule - public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); - - @Test - public void simpleMultiKeyTest() - { - InMemSpillableStateStore store = new InMemSpillableStateStore(); - - simpleMultiKeyTestHelper(store); - } - - @Test - public void simpleMultiKeyManagedStateTest() - { - simpleMultiKeyTestHelper(testMeta.store); - } - - public void simpleMultiKeyTestHelper(SpillableStateStore store) - { - SpillableByteArrayListMultimapImpl<String, String> map = - new SpillableByteArrayListMultimapImpl<String, String>(store, ID1, 0L, new SerdeStringSlice(), - new SerdeStringSlice()); - - store.setup(testMeta.operatorContext); - map.setup(testMeta.operatorContext); - - long nextWindowId = 0L; - nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); - nextWindowId++; - - store.beginWindow(nextWindowId); - map.beginWindow(nextWindowId); - - Assert.assertEquals(1, map.size()); - - map.endWindow(); - store.endWindow(); - - nextWindowId++; - nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId); - nextWindowId++; - - store.beginWindow(nextWindowId); - map.beginWindow(nextWindowId); - - Assert.assertEquals(2, map.size()); - - map.endWindow(); - store.endWindow(); - - nextWindowId++; - simpleMultiKeyTestHelper(store, map, "c", nextWindowId); - - nextWindowId++; - store.beginWindow(nextWindowId); - map.beginWindow(nextWindowId); - - Assert.assertEquals(3, map.size()); - - map.endWindow(); - store.endWindow(); - - map.teardown(); - store.teardown(); - } - - public long simpleMultiKeyTestHelper(SpillableStateStore store, - SpillableByteArrayListMultimapImpl<String, String> map, String key, long nextWindowId) - { - SerdeStringSlice serdeString = new SerdeStringSlice(); - SerdeIntSlice serdeInt = new SerdeIntSlice(); - - Slice keySlice = serdeString.serialize(key); - - byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray()); - - nextWindowId++; - store.beginWindow(nextWindowId); - map.beginWindow(nextWindowId); - - Assert.assertNull(map.get(key)); - - Assert.assertFalse(map.containsKey(key)); - - map.put(key, "a"); - - Assert.assertTrue(map.containsKey(key)); - - List<String> list1 = map.get(key); - Assert.assertEquals(1, list1.size()); - - Assert.assertEquals("a", list1.get(0)); - - list1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g")); - - Assert.assertEquals(8, list1.size()); - - Assert.assertEquals("a", list1.get(0)); - Assert.assertEquals("a", list1.get(1)); - Assert.assertEquals("b", list1.get(2)); - Assert.assertEquals("c", list1.get(3)); - Assert.assertEquals("d", list1.get(4)); - Assert.assertEquals("e", list1.get(5)); - Assert.assertEquals("f", list1.get(6)); - Assert.assertEquals("g", list1.get(7)); - - map.endWindow(); - store.endWindow(); - - nextWindowId++; - store.beginWindow(nextWindowId); - map.beginWindow(nextWindowId); - - SpillableTestUtils.checkValue(store, 0L, - SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 8, 0, serdeInt); - - SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e", - "f", "g")); - - List<String> list2 = map.get(key); - - Assert.assertEquals(8, list2.size()); - - Assert.assertEquals("a", list2.get(0)); - Assert.assertEquals("a", list2.get(1)); - Assert.assertEquals("b", list2.get(2)); - Assert.assertEquals("c", list2.get(3)); - Assert.assertEquals("d", list2.get(4)); - Assert.assertEquals("e", list2.get(5)); - Assert.assertEquals("f", list2.get(6)); - Assert.assertEquals("g", list2.get(7)); - - list2.add("tt"); - list2.add("ab"); - list2.add("99"); - list2.add("oo"); - - Assert.assertEquals("tt", list2.get(8)); - Assert.assertEquals("ab", list2.get(9)); - Assert.assertEquals("99", list2.get(10)); - Assert.assertEquals("oo", list2.get(11)); - - Assert.assertEquals(12, list2.size()); - - map.endWindow(); - store.endWindow(); - - nextWindowId++; - store.beginWindow(nextWindowId); - map.beginWindow(nextWindowId); - - Assert.assertEquals(12, list2.size()); - - SpillableTestUtils.checkValue(store, 0L, - SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt); - - SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e", - "f", "g", "tt", "ab", "99", "oo")); - - List<String> list3 = map.get(key); - - list3.set(1, "111"); - list3.set(3, "222"); - list3.set(5, "333"); - list3.set(11, "444"); - - Assert.assertEquals("a", list3.get(0)); - Assert.assertEquals("111", list3.get(1)); - Assert.assertEquals("b", list3.get(2)); - Assert.assertEquals("222", list3.get(3)); - Assert.assertEquals("d", list3.get(4)); - Assert.assertEquals("333", list3.get(5)); - Assert.assertEquals("f", list3.get(6)); - Assert.assertEquals("g", list3.get(7)); - Assert.assertEquals("tt", list3.get(8)); - Assert.assertEquals("ab", list3.get(9)); - Assert.assertEquals("99", list3.get(10)); - Assert.assertEquals("444", list3.get(11)); - - Assert.assertEquals(12, list2.size()); - - map.endWindow(); - store.endWindow(); - - nextWindowId++; - store.beginWindow(nextWindowId); - map.beginWindow(nextWindowId); - - SpillableTestUtils.checkValue(store, 0L, - SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt); - - SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d", "333", - "f", "g", "tt", "ab", "99", "444")); - - map.endWindow(); - store.endWindow(); - - return nextWindowId; - } - - @Test - public void recoveryTestWithManagedState() - { - SpillableStateStore store = testMeta.store; - - SpillableByteArrayListMultimapImpl<String, String> map = - new SpillableByteArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); - - store.setup(testMeta.operatorContext); - map.setup(testMeta.operatorContext); - - long nextWindowId = 0L; - nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); - long activationWindow = nextWindowId; - store.beforeCheckpoint(nextWindowId); - SpillableByteArrayListMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map); - store.checkpointed(nextWindowId); - store.committed(nextWindowId); - - nextWindowId++; - - store.beginWindow(nextWindowId); - map.beginWindow(nextWindowId); - - List<String> list1 = map.get("a"); - - Assert.assertEquals(12, list1.size()); - - Assert.assertEquals("a", list1.get(0)); - Assert.assertEquals("111", list1.get(1)); - Assert.assertEquals("b", list1.get(2)); - Assert.assertEquals("222", list1.get(3)); - Assert.assertEquals("d", list1.get(4)); - Assert.assertEquals("333", list1.get(5)); - Assert.assertEquals("f", list1.get(6)); - Assert.assertEquals("g", list1.get(7)); - Assert.assertEquals("tt", list1.get(8)); - Assert.assertEquals("ab", list1.get(9)); - Assert.assertEquals("99", list1.get(10)); - Assert.assertEquals("444", list1.get(11)); - - list1.add("111"); - - Assert.assertEquals("a", list1.get(0)); - Assert.assertEquals("111", list1.get(1)); - Assert.assertEquals("b", list1.get(2)); - Assert.assertEquals("222", list1.get(3)); - Assert.assertEquals("d", list1.get(4)); - Assert.assertEquals("333", list1.get(5)); - Assert.assertEquals("f", list1.get(6)); - Assert.assertEquals("g", list1.get(7)); - Assert.assertEquals("tt", list1.get(8)); - Assert.assertEquals("ab", list1.get(9)); - Assert.assertEquals("99", list1.get(10)); - Assert.assertEquals("444", list1.get(11)); - Assert.assertEquals("111", list1.get(12)); - - Assert.assertEquals(13, list1.size()); - - map.endWindow(); - store.endWindow(); - - map.teardown(); - store.teardown(); - - map = clonedMap; - store = map.getStore(); - - Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); - attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); - attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow); - Context.OperatorContext context = - new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); - - store.setup(context); - map.setup(context); - nextWindowId = activationWindow + 1; - store.beginWindow(nextWindowId); - map.beginWindow(nextWindowId); - - SerdeStringSlice serdeString = new SerdeStringSlice(); - Slice keySlice = serdeString.serialize("a"); - byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray()); - - SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d", - "333", "f", "g", "tt", "ab", "99", "444")); - - Assert.assertEquals(1, map.size()); - Assert.assertEquals(12, map.get("a").size()); - - map.endWindow(); - store.endWindow(); - - map.teardown(); - store.teardown(); - } - - @Test - public void testLoad() - { - Random random = new Random(); - final int keySize = 1000000; - final int valueSize = 100000000; - final int numOfEntry = 100000; - - SpillableStateStore store = testMeta.store; - - SpillableByteArrayListMultimapImpl<String, String> multimap = new SpillableByteArrayListMultimapImpl<>( - this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); - - Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); - attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); - Context.OperatorContext context = - new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); - store.setup(context); - multimap.setup(context); - - store.beginWindow(1); - multimap.beginWindow(1); - for (int i = 0; i < numOfEntry; ++i) { - multimap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize))); - } - multimap.endWindow(); - store.endWindow(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java deleted file mode 100644 index 63f7b79..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java +++ /dev/null @@ -1,484 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.spillable; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; - -import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; -import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; - -import com.datatorrent.api.Attribute; -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.util.KryoCloneUtils; - -public class SpillableByteMapImplTest -{ - public static final byte[] ID1 = new byte[]{(byte)0}; - public static final byte[] ID2 = new byte[]{(byte)1}; - - @Rule - public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); - - @Test - public void simpleGetAndPutTest() - { - InMemSpillableStateStore store = new InMemSpillableStateStore(); - - simpleGetAndPutTestHelper(store); - } - - @Test - public void simpleGetAndPutManagedStateTest() - { - simpleGetAndPutTestHelper(testMeta.store); - } - - private void simpleGetAndPutTestHelper(SpillableStateStore store) - { - SerdeStringSlice sss = new SerdeStringSlice(); - - SpillableByteMapImpl<String, String> map = new SpillableByteMapImpl<>(store, ID1, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); - - store.setup(testMeta.operatorContext); - map.setup(testMeta.operatorContext); - - long windowId = 0L; - store.beginWindow(windowId); - map.beginWindow(windowId); - - Assert.assertEquals(0, map.size()); - - map.put("a", "1"); - map.put("b", "2"); - map.put("c", "3"); - - Assert.assertEquals(3, map.size()); - - Assert.assertEquals("1", map.get("a")); - Assert.assertEquals("2", map.get("b")); - Assert.assertEquals("3", map.get("c")); - Assert.assertEquals(null, map.get("d")); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - - map.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - store.committed(windowId); - - windowId++; - store.beginWindow(windowId); - map.beginWindow(windowId); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - - Assert.assertEquals(3, map.size()); - - Assert.assertEquals("1", map.get("a")); - Assert.assertEquals("2", map.get("b")); - Assert.assertEquals("3", map.get("c")); - Assert.assertEquals(null, map.get("d")); - - map.put("d", "4"); - map.put("e", "5"); - map.put("f", "6"); - - Assert.assertEquals(6, map.size()); - - Assert.assertEquals("4", map.get("d")); - Assert.assertEquals("5", map.get("e")); - Assert.assertEquals("6", map.get("f")); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, null); - - map.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - store.committed(windowId); - - windowId++; - store.beginWindow(windowId); - map.beginWindow(windowId); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); - SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); - - map.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - store.committed(windowId); - - map.teardown(); - store.teardown(); - } - - @Test - public void simpleRemoveTest() - { - InMemSpillableStateStore store = new InMemSpillableStateStore(); - - simpleRemoveTestHelper(store); - } - - @Test - public void simpleRemoveManagedStateTest() - { - simpleRemoveTestHelper(testMeta.store); - } - - private void simpleRemoveTestHelper(SpillableStateStore store) - { - SerdeStringSlice sss = new SerdeStringSlice(); - - SpillableByteMapImpl<String, String> map = new SpillableByteMapImpl<>(store, ID1, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); - - store.setup(testMeta.operatorContext); - map.setup(testMeta.operatorContext); - - long windowId = 0L; - store.beginWindow(windowId); - map.beginWindow(windowId); - - Assert.assertEquals(0, map.size()); - - map.put("a", "1"); - map.put("b", "2"); - map.put("c", "3"); - - Assert.assertEquals(3, map.size()); - - map.remove("b"); - map.remove("c"); - - Assert.assertEquals("1", map.get("a")); - Assert.assertEquals(null, map.get("b")); - Assert.assertEquals(null, map.get("c")); - Assert.assertEquals(null, map.get("d")); - - Assert.assertEquals(1, map.size()); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - - map.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - store.committed(windowId); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - - windowId++; - store.beginWindow(windowId); - map.beginWindow(windowId); - - Assert.assertEquals(1, map.size()); - - Assert.assertEquals("1", map.get("a")); - Assert.assertEquals(null, map.get("b")); - Assert.assertEquals(null, map.get("c")); - Assert.assertEquals(null, map.get("d")); - - map.put("d", "4"); - map.put("e", "5"); - map.put("f", "6"); - - Assert.assertEquals(4, map.size()); - - Assert.assertEquals("4", map.get("d")); - Assert.assertEquals("5", map.get("e")); - Assert.assertEquals("6", map.get("f")); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, null); - - map.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - store.committed(windowId); - - windowId++; - store.beginWindow(windowId); - map.beginWindow(windowId); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); - SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); - - map.remove("a"); - map.remove("d"); - Assert.assertEquals(null, map.get("a")); - Assert.assertEquals(null, map.get("b")); - Assert.assertEquals(null, map.get("c")); - Assert.assertEquals(null, map.get("d")); - Assert.assertEquals("5", map.get("e")); - Assert.assertEquals("6", map.get("f")); - Assert.assertEquals(null, map.get("g")); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); - SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); - - map.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - store.committed(windowId); - - windowId++; - store.beginWindow(windowId); - map.beginWindow(windowId); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); - SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); - - map.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - store.committed(windowId); - - map.teardown(); - store.teardown(); - } - - @Test - public void multiMapPerBucketTest() - { - InMemSpillableStateStore store = new InMemSpillableStateStore(); - - multiMapPerBucketTestHelper(store); - } - - @Test - public void multiMapPerBucketManagedStateTest() - { - multiMapPerBucketTestHelper(testMeta.store); - } - - public void multiMapPerBucketTestHelper(SpillableStateStore store) - { - SerdeStringSlice sss = new SerdeStringSlice(); - - SpillableByteMapImpl<String, String> map1 = new SpillableByteMapImpl<>(store, ID1, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); - SpillableByteMapImpl<String, String> map2 = new SpillableByteMapImpl<>(store, ID2, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); - - store.setup(testMeta.operatorContext); - map1.setup(testMeta.operatorContext); - map2.setup(testMeta.operatorContext); - - long windowId = 0L; - store.beginWindow(windowId); - map1.beginWindow(windowId); - map2.beginWindow(windowId); - - map1.put("a", "1"); - - Assert.assertEquals("1", map1.get("a")); - Assert.assertEquals(null, map2.get("a")); - - map2.put("a", "a1"); - - Assert.assertEquals("1", map1.get("a")); - Assert.assertEquals("a1", map2.get("a")); - - map1.put("b", "2"); - map2.put("c", "3"); - - Assert.assertEquals("1", map1.get("a")); - Assert.assertEquals("2", map1.get("b")); - - Assert.assertEquals("a1", map2.get("a")); - Assert.assertEquals(null, map2.get("b")); - Assert.assertEquals("3", map2.get("c")); - - map1.endWindow(); - map2.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - - windowId++; - store.beginWindow(windowId); - map1.beginWindow(windowId); - map2.beginWindow(windowId); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); - - SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID2, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3"); - - map1.remove("a"); - - Assert.assertEquals(null, map1.get("a")); - Assert.assertEquals("a1", map2.get("a")); - - map1.endWindow(); - map2.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - - windowId++; - store.beginWindow(windowId); - map1.beginWindow(windowId); - map2.beginWindow(windowId); - - SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1"); - - map1.endWindow(); - map2.endWindow(); - store.endWindow(); - store.beforeCheckpoint(windowId); - store.checkpointed(windowId); - - map1.teardown(); - map2.teardown(); - store.teardown(); - } - - @Test - public void recoveryWithManagedStateTest() throws Exception - { - SerdeStringSlice sss = new SerdeStringSlice(); - - SpillableByteMapImpl<String, String> map1 = new SpillableByteMapImpl<>(testMeta.store, ID1, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); - - testMeta.store.setup(testMeta.operatorContext); - map1.setup(testMeta.operatorContext); - - testMeta.store.beginWindow(0); - map1.beginWindow(0); - map1.put("x", "1"); - map1.put("y", "2"); - map1.put("z", "3"); - map1.put("zz", "33"); - Assert.assertEquals(4, map1.size()); - map1.endWindow(); - testMeta.store.endWindow(); - - testMeta.store.beginWindow(1); - map1.beginWindow(1); - Assert.assertEquals(4, map1.size()); - map1.put("x", "4"); - map1.put("y", "5"); - map1.remove("zz"); - Assert.assertEquals(3, map1.size()); - map1.endWindow(); - testMeta.store.endWindow(); - testMeta.store.beforeCheckpoint(1); - testMeta.store.checkpointed(1); - - SpillableByteMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1); - - testMeta.store.beginWindow(2); - map1.beginWindow(2); - Assert.assertEquals(3, map1.size()); - map1.put("x", "6"); - map1.put("y", "7"); - map1.put("w", "8"); - Assert.assertEquals(4, map1.size()); - map1.endWindow(); - testMeta.store.endWindow(); - - // simulating crash here - map1.teardown(); - testMeta.store.teardown(); - - Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); - attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); - attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L); - Context.OperatorContext context = - new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); - - map1 = clonedMap1; - map1.getStore().setup(context); - map1.setup(testMeta.operatorContext); - - map1.getStore().beginWindow(2); - map1.beginWindow(2); - Assert.assertEquals(3, map1.size()); - Assert.assertEquals("4", map1.get("x")); - Assert.assertEquals("5", map1.get("y")); - Assert.assertEquals("3", map1.get("z")); - map1.endWindow(); - map1.getStore().endWindow(); - - map1.teardown(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java index 96855e0..5c477b1 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java @@ -50,7 +50,7 @@ public class SpillableComplexComponentImplTest Spillable.SpillableComponent scList = (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new SerdeStringSlice()); Spillable.SpillableComponent scMap = - (Spillable.SpillableComponent)sccImpl.newSpillableByteMap(0L, new SerdeStringSlice(), new SerdeStringSlice()); + (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new SerdeStringSlice(), new SerdeStringSlice()); sccImpl.setup(testMeta.operatorContext); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java new file mode 100644 index 0000000..e8aea46 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.KryoCloneUtils; + +public class SpillableMapImplTest +{ + public static final byte[] ID1 = new byte[]{(byte)0}; + public static final byte[] ID2 = new byte[]{(byte)1}; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleGetAndPutTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleGetAndPutTestHelper(store); + } + + @Test + public void simpleGetAndPutManagedStateTest() + { + simpleGetAndPutTestHelper(testMeta.store); + } + + private void simpleGetAndPutTestHelper(SpillableStateStore store) + { + SerdeStringSlice sss = new SerdeStringSlice(); + + SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(0, map.size()); + + map.put("a", "1"); + map.put("b", "2"); + map.put("c", "3"); + + Assert.assertEquals(3, map.size()); + + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals("2", map.get("b")); + Assert.assertEquals("3", map.get("c")); + Assert.assertEquals(null, map.get("d")); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + + Assert.assertEquals(3, map.size()); + + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals("2", map.get("b")); + Assert.assertEquals("3", map.get("c")); + Assert.assertEquals(null, map.get("d")); + + map.put("d", "4"); + map.put("e", "5"); + map.put("f", "6"); + + Assert.assertEquals(6, map.size()); + + Assert.assertEquals("4", map.get("d")); + Assert.assertEquals("5", map.get("e")); + Assert.assertEquals("6", map.get("f")); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); + SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + map.teardown(); + store.teardown(); + } + + @Test + public void simpleRemoveTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleRemoveTestHelper(store); + } + + @Test + public void simpleRemoveManagedStateTest() + { + simpleRemoveTestHelper(testMeta.store); + } + + private void simpleRemoveTestHelper(SpillableStateStore store) + { + SerdeStringSlice sss = new SerdeStringSlice(); + + SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(0, map.size()); + + map.put("a", "1"); + map.put("b", "2"); + map.put("c", "3"); + + Assert.assertEquals(3, map.size()); + + map.remove("b"); + map.remove("c"); + + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals(null, map.get("b")); + Assert.assertEquals(null, map.get("c")); + Assert.assertEquals(null, map.get("d")); + + Assert.assertEquals(1, map.size()); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(1, map.size()); + + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals(null, map.get("b")); + Assert.assertEquals(null, map.get("c")); + Assert.assertEquals(null, map.get("d")); + + map.put("d", "4"); + map.put("e", "5"); + map.put("f", "6"); + + Assert.assertEquals(4, map.size()); + + Assert.assertEquals("4", map.get("d")); + Assert.assertEquals("5", map.get("e")); + Assert.assertEquals("6", map.get("f")); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); + SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + + map.remove("a"); + map.remove("d"); + Assert.assertEquals(null, map.get("a")); + Assert.assertEquals(null, map.get("b")); + Assert.assertEquals(null, map.get("c")); + Assert.assertEquals(null, map.get("d")); + Assert.assertEquals("5", map.get("e")); + Assert.assertEquals("6", map.get("f")); + Assert.assertEquals(null, map.get("g")); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); + SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); + SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + map.teardown(); + store.teardown(); + } + + @Test + public void multiMapPerBucketTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + multiMapPerBucketTestHelper(store); + } + + @Test + public void multiMapPerBucketManagedStateTest() + { + multiMapPerBucketTestHelper(testMeta.store); + } + + public void multiMapPerBucketTestHelper(SpillableStateStore store) + { + SerdeStringSlice sss = new SerdeStringSlice(); + + SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + SpillableMapImpl<String, String> map2 = new SpillableMapImpl<>(store, ID2, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map1.setup(testMeta.operatorContext); + map2.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map1.beginWindow(windowId); + map2.beginWindow(windowId); + + map1.put("a", "1"); + + Assert.assertEquals("1", map1.get("a")); + Assert.assertEquals(null, map2.get("a")); + + map2.put("a", "a1"); + + Assert.assertEquals("1", map1.get("a")); + Assert.assertEquals("a1", map2.get("a")); + + map1.put("b", "2"); + map2.put("c", "3"); + + Assert.assertEquals("1", map1.get("a")); + Assert.assertEquals("2", map1.get("b")); + + Assert.assertEquals("a1", map2.get("a")); + Assert.assertEquals(null, map2.get("b")); + Assert.assertEquals("3", map2.get("c")); + + map1.endWindow(); + map2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + + windowId++; + store.beginWindow(windowId); + map1.beginWindow(windowId); + map2.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); + + SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID2, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3"); + + map1.remove("a"); + + Assert.assertEquals(null, map1.get("a")); + Assert.assertEquals("a1", map2.get("a")); + + map1.endWindow(); + map2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + + windowId++; + store.beginWindow(windowId); + map1.beginWindow(windowId); + map2.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1"); + + map1.endWindow(); + map2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + + map1.teardown(); + map2.teardown(); + store.teardown(); + } + + @Test + public void recoveryWithManagedStateTest() throws Exception + { + SerdeStringSlice sss = new SerdeStringSlice(); + + SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(testMeta.store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + + testMeta.store.setup(testMeta.operatorContext); + map1.setup(testMeta.operatorContext); + + testMeta.store.beginWindow(0); + map1.beginWindow(0); + map1.put("x", "1"); + map1.put("y", "2"); + map1.put("z", "3"); + map1.put("zz", "33"); + Assert.assertEquals(4, map1.size()); + map1.endWindow(); + testMeta.store.endWindow(); + + testMeta.store.beginWindow(1); + map1.beginWindow(1); + Assert.assertEquals(4, map1.size()); + map1.put("x", "4"); + map1.put("y", "5"); + map1.remove("zz"); + Assert.assertEquals(3, map1.size()); + map1.endWindow(); + testMeta.store.endWindow(); + testMeta.store.beforeCheckpoint(1); + testMeta.store.checkpointed(1); + + SpillableMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1); + + testMeta.store.beginWindow(2); + map1.beginWindow(2); + Assert.assertEquals(3, map1.size()); + map1.put("x", "6"); + map1.put("y", "7"); + map1.put("w", "8"); + Assert.assertEquals(4, map1.size()); + map1.endWindow(); + testMeta.store.endWindow(); + + // simulating crash here + map1.teardown(); + testMeta.store.teardown(); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); + attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L); + Context.OperatorContext context = + new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); + + map1 = clonedMap1; + map1.getStore().setup(context); + map1.setup(testMeta.operatorContext); + + map1.getStore().beginWindow(2); + map1.beginWindow(2); + Assert.assertEquals(3, map1.size()); + Assert.assertEquals("4", map1.get("x")); + Assert.assertEquals("5", map1.get("y")); + Assert.assertEquals("3", map1.get("z")); + map1.endWindow(); + map1.getStore().endWindow(); + + map1.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java index 6b188e4..15970af 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java @@ -276,8 +276,7 @@ public class SpillableSetMultimapImplTest final int numOfEntry = 100000; SpillableStateStore store = testMeta.store; - - SpillableByteArrayListMultimapImpl<String, String> multimap = new SpillableByteArrayListMultimapImpl<>( + SpillableSetMultimapImpl<String, String> multimap = new SpillableSetMultimapImpl<>( this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java index 4edbcd0..bc5d80f 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java @@ -91,7 +91,7 @@ public class WindowedOperatorTest WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>(); if (useSpillable) { sccImpl = new SpillableComplexComponentImpl(testMeta.store); - // TODO: We don't yet support Spillable data structures for window state storage because SpillableByteMapImpl does not yet support iterating over all keys. + // TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys. windowStateStorage = new InMemoryWindowedStorage<>(); SpillableWindowedPlainStorage<MutableLong> pds = new SpillableWindowedPlainStorage<>(); pds.setSpillableComplexComponent(sccImpl); @@ -117,7 +117,7 @@ public class WindowedOperatorTest KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>(); if (useSpillable) { sccImpl = new SpillableComplexComponentImpl(testMeta.store); - // TODO: We don't yet support Spillable data structures for window state storage because SpillableByteMapImpl does not yet support iterating over all keys. + // TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys. windowStateStorage = new InMemoryWindowedStorage<>(); if (forSession) { SpillableSessionWindowedStorage<String, MutableLong> sws = new SpillableSessionWindowedStorage<>();