Repository: apex-malhar
Updated Branches:
  refs/heads/master c19c80d88 -> f5f1943d2


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
deleted file mode 100644
index 22d317d..0000000
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable;
-
-import java.util.List;
-import java.util.Random;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-
-import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.util.KryoCloneUtils;
-import com.datatorrent.netlet.util.Slice;
-
-public class SpillableByteArrayListMultimapImplTest
-{
-  public static final byte[] ID1 = new byte[]{(byte)0};
-
-  @Rule
-  public SpillableTestUtils.TestMeta testMeta = new 
SpillableTestUtils.TestMeta();
-
-  @Test
-  public void simpleMultiKeyTest()
-  {
-    InMemSpillableStateStore store = new InMemSpillableStateStore();
-
-    simpleMultiKeyTestHelper(store);
-  }
-
-  @Test
-  public void simpleMultiKeyManagedStateTest()
-  {
-    simpleMultiKeyTestHelper(testMeta.store);
-  }
-
-  public void simpleMultiKeyTestHelper(SpillableStateStore store)
-  {
-    SpillableByteArrayListMultimapImpl<String, String> map =
-        new SpillableByteArrayListMultimapImpl<String, String>(store, ID1, 0L, 
new SerdeStringSlice(),
-        new SerdeStringSlice());
-
-    store.setup(testMeta.operatorContext);
-    map.setup(testMeta.operatorContext);
-
-    long nextWindowId = 0L;
-    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
-    nextWindowId++;
-
-    store.beginWindow(nextWindowId);
-    map.beginWindow(nextWindowId);
-
-    Assert.assertEquals(1, map.size());
-
-    map.endWindow();
-    store.endWindow();
-
-    nextWindowId++;
-    nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId);
-    nextWindowId++;
-
-    store.beginWindow(nextWindowId);
-    map.beginWindow(nextWindowId);
-
-    Assert.assertEquals(2, map.size());
-
-    map.endWindow();
-    store.endWindow();
-
-    nextWindowId++;
-    simpleMultiKeyTestHelper(store, map, "c", nextWindowId);
-
-    nextWindowId++;
-    store.beginWindow(nextWindowId);
-    map.beginWindow(nextWindowId);
-
-    Assert.assertEquals(3, map.size());
-
-    map.endWindow();
-    store.endWindow();
-
-    map.teardown();
-    store.teardown();
-  }
-
-  public long simpleMultiKeyTestHelper(SpillableStateStore store,
-      SpillableByteArrayListMultimapImpl<String, String> map, String key, long 
nextWindowId)
-  {
-    SerdeStringSlice serdeString = new SerdeStringSlice();
-    SerdeIntSlice serdeInt = new SerdeIntSlice();
-
-    Slice keySlice = serdeString.serialize(key);
-
-    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
-
-    nextWindowId++;
-    store.beginWindow(nextWindowId);
-    map.beginWindow(nextWindowId);
-
-    Assert.assertNull(map.get(key));
-
-    Assert.assertFalse(map.containsKey(key));
-
-    map.put(key, "a");
-
-    Assert.assertTrue(map.containsKey(key));
-
-    List<String> list1 = map.get(key);
-    Assert.assertEquals(1, list1.size());
-
-    Assert.assertEquals("a", list1.get(0));
-
-    list1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
-
-    Assert.assertEquals(8, list1.size());
-
-    Assert.assertEquals("a", list1.get(0));
-    Assert.assertEquals("a", list1.get(1));
-    Assert.assertEquals("b", list1.get(2));
-    Assert.assertEquals("c", list1.get(3));
-    Assert.assertEquals("d", list1.get(4));
-    Assert.assertEquals("e", list1.get(5));
-    Assert.assertEquals("f", list1.get(6));
-    Assert.assertEquals("g", list1.get(7));
-
-    map.endWindow();
-    store.endWindow();
-
-    nextWindowId++;
-    store.beginWindow(nextWindowId);
-    map.beginWindow(nextWindowId);
-
-    SpillableTestUtils.checkValue(store, 0L,
-        SliceUtils.concatenate(keyBytes, 
SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 8, 0, serdeInt);
-
-    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, 
Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
-        "f", "g"));
-
-    List<String> list2 = map.get(key);
-
-    Assert.assertEquals(8, list2.size());
-
-    Assert.assertEquals("a", list2.get(0));
-    Assert.assertEquals("a", list2.get(1));
-    Assert.assertEquals("b", list2.get(2));
-    Assert.assertEquals("c", list2.get(3));
-    Assert.assertEquals("d", list2.get(4));
-    Assert.assertEquals("e", list2.get(5));
-    Assert.assertEquals("f", list2.get(6));
-    Assert.assertEquals("g", list2.get(7));
-
-    list2.add("tt");
-    list2.add("ab");
-    list2.add("99");
-    list2.add("oo");
-
-    Assert.assertEquals("tt", list2.get(8));
-    Assert.assertEquals("ab", list2.get(9));
-    Assert.assertEquals("99", list2.get(10));
-    Assert.assertEquals("oo", list2.get(11));
-
-    Assert.assertEquals(12, list2.size());
-
-    map.endWindow();
-    store.endWindow();
-
-    nextWindowId++;
-    store.beginWindow(nextWindowId);
-    map.beginWindow(nextWindowId);
-
-    Assert.assertEquals(12, list2.size());
-
-    SpillableTestUtils.checkValue(store, 0L,
-        SliceUtils.concatenate(keyBytes, 
SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
-
-    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, 
Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
-        "f", "g", "tt", "ab", "99", "oo"));
-
-    List<String> list3 = map.get(key);
-
-    list3.set(1, "111");
-    list3.set(3, "222");
-    list3.set(5, "333");
-    list3.set(11, "444");
-
-    Assert.assertEquals("a", list3.get(0));
-    Assert.assertEquals("111", list3.get(1));
-    Assert.assertEquals("b", list3.get(2));
-    Assert.assertEquals("222", list3.get(3));
-    Assert.assertEquals("d", list3.get(4));
-    Assert.assertEquals("333", list3.get(5));
-    Assert.assertEquals("f", list3.get(6));
-    Assert.assertEquals("g", list3.get(7));
-    Assert.assertEquals("tt", list3.get(8));
-    Assert.assertEquals("ab", list3.get(9));
-    Assert.assertEquals("99", list3.get(10));
-    Assert.assertEquals("444", list3.get(11));
-
-    Assert.assertEquals(12, list2.size());
-
-    map.endWindow();
-    store.endWindow();
-
-    nextWindowId++;
-    store.beginWindow(nextWindowId);
-    map.beginWindow(nextWindowId);
-
-    SpillableTestUtils.checkValue(store, 0L,
-        SliceUtils.concatenate(keyBytes, 
SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
-
-    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, 
Lists.<String>newArrayList("a", "111", "b", "222", "d", "333",
-        "f", "g", "tt", "ab", "99", "444"));
-
-    map.endWindow();
-    store.endWindow();
-
-    return nextWindowId;
-  }
-
-  @Test
-  public void recoveryTestWithManagedState()
-  {
-    SpillableStateStore store = testMeta.store;
-
-    SpillableByteArrayListMultimapImpl<String, String> map =
-        new SpillableByteArrayListMultimapImpl<>(store, ID1, 0L, new 
SerdeStringSlice(), new SerdeStringSlice());
-
-    store.setup(testMeta.operatorContext);
-    map.setup(testMeta.operatorContext);
-
-    long nextWindowId = 0L;
-    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
-    long activationWindow = nextWindowId;
-    store.beforeCheckpoint(nextWindowId);
-    SpillableByteArrayListMultimapImpl<String, String> clonedMap = 
KryoCloneUtils.cloneObject(map);
-    store.checkpointed(nextWindowId);
-    store.committed(nextWindowId);
-
-    nextWindowId++;
-
-    store.beginWindow(nextWindowId);
-    map.beginWindow(nextWindowId);
-
-    List<String> list1 = map.get("a");
-
-    Assert.assertEquals(12, list1.size());
-
-    Assert.assertEquals("a", list1.get(0));
-    Assert.assertEquals("111", list1.get(1));
-    Assert.assertEquals("b", list1.get(2));
-    Assert.assertEquals("222", list1.get(3));
-    Assert.assertEquals("d", list1.get(4));
-    Assert.assertEquals("333", list1.get(5));
-    Assert.assertEquals("f", list1.get(6));
-    Assert.assertEquals("g", list1.get(7));
-    Assert.assertEquals("tt", list1.get(8));
-    Assert.assertEquals("ab", list1.get(9));
-    Assert.assertEquals("99", list1.get(10));
-    Assert.assertEquals("444", list1.get(11));
-
-    list1.add("111");
-
-    Assert.assertEquals("a", list1.get(0));
-    Assert.assertEquals("111", list1.get(1));
-    Assert.assertEquals("b", list1.get(2));
-    Assert.assertEquals("222", list1.get(3));
-    Assert.assertEquals("d", list1.get(4));
-    Assert.assertEquals("333", list1.get(5));
-    Assert.assertEquals("f", list1.get(6));
-    Assert.assertEquals("g", list1.get(7));
-    Assert.assertEquals("tt", list1.get(8));
-    Assert.assertEquals("ab", list1.get(9));
-    Assert.assertEquals("99", list1.get(10));
-    Assert.assertEquals("444", list1.get(11));
-    Assert.assertEquals("111", list1.get(12));
-
-    Assert.assertEquals(13, list1.size());
-
-    map.endWindow();
-    store.endWindow();
-
-    map.teardown();
-    store.teardown();
-
-    map = clonedMap;
-    store = map.getStore();
-
-    Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
-    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 
activationWindow);
-    Context.OperatorContext context =
-        new 
OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(),
 attributes);
-
-    store.setup(context);
-    map.setup(context);
-    nextWindowId = activationWindow + 1;
-    store.beginWindow(nextWindowId);
-    map.beginWindow(nextWindowId);
-
-    SerdeStringSlice serdeString = new SerdeStringSlice();
-    Slice keySlice = serdeString.serialize("a");
-    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
-
-    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, 
Lists.<String>newArrayList("a", "111", "b", "222", "d",
-        "333", "f", "g", "tt", "ab", "99", "444"));
-
-    Assert.assertEquals(1, map.size());
-    Assert.assertEquals(12, map.get("a").size());
-
-    map.endWindow();
-    store.endWindow();
-
-    map.teardown();
-    store.teardown();
-  }
-
-  @Test
-  public void testLoad()
-  {
-    Random random = new Random();
-    final int keySize = 1000000;
-    final int valueSize = 100000000;
-    final int numOfEntry = 100000;
-
-    SpillableStateStore store = testMeta.store;
-
-    SpillableByteArrayListMultimapImpl<String, String> multimap = new 
SpillableByteArrayListMultimapImpl<>(
-        this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new 
SerdeStringSlice());
-
-    Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
-    Context.OperatorContext context =
-        new 
OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(),
 attributes);
-    store.setup(context);
-    multimap.setup(context);
-
-    store.beginWindow(1);
-    multimap.beginWindow(1);
-    for (int i = 0; i < numOfEntry; ++i) {
-      multimap.put(String.valueOf(random.nextInt(keySize)), 
String.valueOf(random.nextInt(valueSize)));
-    }
-    multimap.endWindow();
-    store.endWindow();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
deleted file mode 100644
index 63f7b79..0000000
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-
-import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-public class SpillableByteMapImplTest
-{
-  public static final byte[] ID1 = new byte[]{(byte)0};
-  public static final byte[] ID2 = new byte[]{(byte)1};
-
-  @Rule
-  public SpillableTestUtils.TestMeta testMeta = new 
SpillableTestUtils.TestMeta();
-
-  @Test
-  public void simpleGetAndPutTest()
-  {
-    InMemSpillableStateStore store = new InMemSpillableStateStore();
-
-    simpleGetAndPutTestHelper(store);
-  }
-
-  @Test
-  public void simpleGetAndPutManagedStateTest()
-  {
-    simpleGetAndPutTestHelper(testMeta.store);
-  }
-
-  private void simpleGetAndPutTestHelper(SpillableStateStore store)
-  {
-    SerdeStringSlice sss = new SerdeStringSlice();
-
-    SpillableByteMapImpl<String, String> map = new 
SpillableByteMapImpl<>(store, ID1, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
-
-    store.setup(testMeta.operatorContext);
-    map.setup(testMeta.operatorContext);
-
-    long windowId = 0L;
-    store.beginWindow(windowId);
-    map.beginWindow(windowId);
-
-    Assert.assertEquals(0, map.size());
-
-    map.put("a", "1");
-    map.put("b", "2");
-    map.put("c", "3");
-
-    Assert.assertEquals(3, map.size());
-
-    Assert.assertEquals("1", map.get("a"));
-    Assert.assertEquals("2", map.get("b"));
-    Assert.assertEquals("3", map.get("c"));
-    Assert.assertEquals(null, map.get("d"));
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-
-    map.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-    store.committed(windowId);
-
-    windowId++;
-    store.beginWindow(windowId);
-    map.beginWindow(windowId);
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-
-    Assert.assertEquals(3, map.size());
-
-    Assert.assertEquals("1", map.get("a"));
-    Assert.assertEquals("2", map.get("b"));
-    Assert.assertEquals("3", map.get("c"));
-    Assert.assertEquals(null, map.get("d"));
-
-    map.put("d", "4");
-    map.put("e", "5");
-    map.put("f", "6");
-
-    Assert.assertEquals(6, map.size());
-
-    Assert.assertEquals("4", map.get("d"));
-    Assert.assertEquals("5", map.get("e"));
-    Assert.assertEquals("6", map.get("f"));
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
-
-    map.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-    store.committed(windowId);
-
-    windowId++;
-    store.beginWindow(windowId);
-    map.beginWindow(windowId);
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
-    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
-
-    map.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-    store.committed(windowId);
-
-    map.teardown();
-    store.teardown();
-  }
-
-  @Test
-  public void simpleRemoveTest()
-  {
-    InMemSpillableStateStore store = new InMemSpillableStateStore();
-
-    simpleRemoveTestHelper(store);
-  }
-
-  @Test
-  public void simpleRemoveManagedStateTest()
-  {
-    simpleRemoveTestHelper(testMeta.store);
-  }
-
-  private void simpleRemoveTestHelper(SpillableStateStore store)
-  {
-    SerdeStringSlice sss = new SerdeStringSlice();
-
-    SpillableByteMapImpl<String, String> map = new 
SpillableByteMapImpl<>(store, ID1, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
-
-    store.setup(testMeta.operatorContext);
-    map.setup(testMeta.operatorContext);
-
-    long windowId = 0L;
-    store.beginWindow(windowId);
-    map.beginWindow(windowId);
-
-    Assert.assertEquals(0, map.size());
-
-    map.put("a", "1");
-    map.put("b", "2");
-    map.put("c", "3");
-
-    Assert.assertEquals(3, map.size());
-
-    map.remove("b");
-    map.remove("c");
-
-    Assert.assertEquals("1", map.get("a"));
-    Assert.assertEquals(null, map.get("b"));
-    Assert.assertEquals(null, map.get("c"));
-    Assert.assertEquals(null, map.get("d"));
-
-    Assert.assertEquals(1, map.size());
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-
-    map.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-    store.committed(windowId);
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-
-    windowId++;
-    store.beginWindow(windowId);
-    map.beginWindow(windowId);
-
-    Assert.assertEquals(1, map.size());
-
-    Assert.assertEquals("1", map.get("a"));
-    Assert.assertEquals(null, map.get("b"));
-    Assert.assertEquals(null, map.get("c"));
-    Assert.assertEquals(null, map.get("d"));
-
-    map.put("d", "4");
-    map.put("e", "5");
-    map.put("f", "6");
-
-    Assert.assertEquals(4, map.size());
-
-    Assert.assertEquals("4", map.get("d"));
-    Assert.assertEquals("5", map.get("e"));
-    Assert.assertEquals("6", map.get("f"));
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
-
-    map.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-    store.committed(windowId);
-
-    windowId++;
-    store.beginWindow(windowId);
-    map.beginWindow(windowId);
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
-    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
-
-    map.remove("a");
-    map.remove("d");
-    Assert.assertEquals(null, map.get("a"));
-    Assert.assertEquals(null, map.get("b"));
-    Assert.assertEquals(null, map.get("c"));
-    Assert.assertEquals(null, map.get("d"));
-    Assert.assertEquals("5", map.get("e"));
-    Assert.assertEquals("6", map.get("f"));
-    Assert.assertEquals(null, map.get("g"));
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
-    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
-
-    map.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-    store.committed(windowId);
-
-    windowId++;
-    store.beginWindow(windowId);
-    map.beginWindow(windowId);
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
-    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
-    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
-
-    map.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-    store.committed(windowId);
-
-    map.teardown();
-    store.teardown();
-  }
-
-  @Test
-  public void multiMapPerBucketTest()
-  {
-    InMemSpillableStateStore store = new InMemSpillableStateStore();
-
-    multiMapPerBucketTestHelper(store);
-  }
-
-  @Test
-  public void multiMapPerBucketManagedStateTest()
-  {
-    multiMapPerBucketTestHelper(testMeta.store);
-  }
-
-  public void multiMapPerBucketTestHelper(SpillableStateStore store)
-  {
-    SerdeStringSlice sss = new SerdeStringSlice();
-
-    SpillableByteMapImpl<String, String> map1 = new 
SpillableByteMapImpl<>(store, ID1, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
-    SpillableByteMapImpl<String, String> map2 = new 
SpillableByteMapImpl<>(store, ID2, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
-
-    store.setup(testMeta.operatorContext);
-    map1.setup(testMeta.operatorContext);
-    map2.setup(testMeta.operatorContext);
-
-    long windowId = 0L;
-    store.beginWindow(windowId);
-    map1.beginWindow(windowId);
-    map2.beginWindow(windowId);
-
-    map1.put("a", "1");
-
-    Assert.assertEquals("1", map1.get("a"));
-    Assert.assertEquals(null, map2.get("a"));
-
-    map2.put("a", "a1");
-
-    Assert.assertEquals("1", map1.get("a"));
-    Assert.assertEquals("a1", map2.get("a"));
-
-    map1.put("b", "2");
-    map2.put("c", "3");
-
-    Assert.assertEquals("1", map1.get("a"));
-    Assert.assertEquals("2", map1.get("b"));
-
-    Assert.assertEquals("a1", map2.get("a"));
-    Assert.assertEquals(null, map2.get("b"));
-    Assert.assertEquals("3", map2.get("c"));
-
-    map1.endWindow();
-    map2.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-
-    windowId++;
-    store.beginWindow(windowId);
-    map1.beginWindow(windowId);
-    map2.beginWindow(windowId);
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
-    SpillableTestUtils.checkValue(store, 0L, "b", ID2, null);
-    SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3");
-
-    map1.remove("a");
-
-    Assert.assertEquals(null, map1.get("a"));
-    Assert.assertEquals("a1", map2.get("a"));
-
-    map1.endWindow();
-    map2.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-
-    windowId++;
-    store.beginWindow(windowId);
-    map1.beginWindow(windowId);
-    map2.beginWindow(windowId);
-
-    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
-    SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
-
-    map1.endWindow();
-    map2.endWindow();
-    store.endWindow();
-    store.beforeCheckpoint(windowId);
-    store.checkpointed(windowId);
-
-    map1.teardown();
-    map2.teardown();
-    store.teardown();
-  }
-
-  @Test
-  public void recoveryWithManagedStateTest() throws Exception
-  {
-    SerdeStringSlice sss = new SerdeStringSlice();
-
-    SpillableByteMapImpl<String, String> map1 = new 
SpillableByteMapImpl<>(testMeta.store, ID1, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
-
-    testMeta.store.setup(testMeta.operatorContext);
-    map1.setup(testMeta.operatorContext);
-
-    testMeta.store.beginWindow(0);
-    map1.beginWindow(0);
-    map1.put("x", "1");
-    map1.put("y", "2");
-    map1.put("z", "3");
-    map1.put("zz", "33");
-    Assert.assertEquals(4, map1.size());
-    map1.endWindow();
-    testMeta.store.endWindow();
-
-    testMeta.store.beginWindow(1);
-    map1.beginWindow(1);
-    Assert.assertEquals(4, map1.size());
-    map1.put("x", "4");
-    map1.put("y", "5");
-    map1.remove("zz");
-    Assert.assertEquals(3, map1.size());
-    map1.endWindow();
-    testMeta.store.endWindow();
-    testMeta.store.beforeCheckpoint(1);
-    testMeta.store.checkpointed(1);
-
-    SpillableByteMapImpl<String, String> clonedMap1 = 
KryoCloneUtils.cloneObject(map1);
-
-    testMeta.store.beginWindow(2);
-    map1.beginWindow(2);
-    Assert.assertEquals(3, map1.size());
-    map1.put("x", "6");
-    map1.put("y", "7");
-    map1.put("w", "8");
-    Assert.assertEquals(4, map1.size());
-    map1.endWindow();
-    testMeta.store.endWindow();
-
-    // simulating crash here
-    map1.teardown();
-    testMeta.store.teardown();
-
-    Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
-    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
-    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
-    Context.OperatorContext context =
-        new 
OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(),
 attributes);
-
-    map1 = clonedMap1;
-    map1.getStore().setup(context);
-    map1.setup(testMeta.operatorContext);
-
-    map1.getStore().beginWindow(2);
-    map1.beginWindow(2);
-    Assert.assertEquals(3, map1.size());
-    Assert.assertEquals("4", map1.get("x"));
-    Assert.assertEquals("5", map1.get("y"));
-    Assert.assertEquals("3", map1.get("z"));
-    map1.endWindow();
-    map1.getStore().endWindow();
-
-    map1.teardown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
index 96855e0..5c477b1 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
@@ -50,7 +50,7 @@ public class SpillableComplexComponentImplTest
     Spillable.SpillableComponent scList =
         (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new 
SerdeStringSlice());
     Spillable.SpillableComponent scMap =
-        (Spillable.SpillableComponent)sccImpl.newSpillableByteMap(0L, new 
SerdeStringSlice(), new SerdeStringSlice());
+        (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new 
SerdeStringSlice(), new SerdeStringSlice());
 
     sccImpl.setup(testMeta.operatorContext);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
new file mode 100644
index 0000000..e8aea46
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+public class SpillableMapImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+  public static final byte[] ID2 = new byte[]{(byte)1};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new 
SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleGetAndPutTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleGetAndPutTestHelper(store);
+  }
+
+  @Test
+  public void simpleGetAndPutManagedStateTest()
+  {
+    simpleGetAndPutTestHelper(testMeta.store);
+  }
+
+  private void simpleGetAndPutTestHelper(SpillableStateStore store)
+  {
+    SerdeStringSlice sss = new SerdeStringSlice();
+
+    SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 
0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    Assert.assertEquals(0, map.size());
+
+    map.put("a", "1");
+    map.put("b", "2");
+    map.put("c", "3");
+
+    Assert.assertEquals(3, map.size());
+
+    Assert.assertEquals("1", map.get("a"));
+    Assert.assertEquals("2", map.get("b"));
+    Assert.assertEquals("3", map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+    Assert.assertEquals(3, map.size());
+
+    Assert.assertEquals("1", map.get("a"));
+    Assert.assertEquals("2", map.get("b"));
+    Assert.assertEquals("3", map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+
+    map.put("d", "4");
+    map.put("e", "5");
+    map.put("f", "6");
+
+    Assert.assertEquals(6, map.size());
+
+    Assert.assertEquals("4", map.get("d"));
+    Assert.assertEquals("5", map.get("e"));
+    Assert.assertEquals("6", map.get("f"));
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    map.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void simpleRemoveTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleRemoveTestHelper(store);
+  }
+
+  @Test
+  public void simpleRemoveManagedStateTest()
+  {
+    simpleRemoveTestHelper(testMeta.store);
+  }
+
+  private void simpleRemoveTestHelper(SpillableStateStore store)
+  {
+    SerdeStringSlice sss = new SerdeStringSlice();
+
+    SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 
0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    Assert.assertEquals(0, map.size());
+
+    map.put("a", "1");
+    map.put("b", "2");
+    map.put("c", "3");
+
+    Assert.assertEquals(3, map.size());
+
+    map.remove("b");
+    map.remove("c");
+
+    Assert.assertEquals("1", map.get("a"));
+    Assert.assertEquals(null, map.get("b"));
+    Assert.assertEquals(null, map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+
+    Assert.assertEquals(1, map.size());
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    Assert.assertEquals(1, map.size());
+
+    Assert.assertEquals("1", map.get("a"));
+    Assert.assertEquals(null, map.get("b"));
+    Assert.assertEquals(null, map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+
+    map.put("d", "4");
+    map.put("e", "5");
+    map.put("f", "6");
+
+    Assert.assertEquals(4, map.size());
+
+    Assert.assertEquals("4", map.get("d"));
+    Assert.assertEquals("5", map.get("e"));
+    Assert.assertEquals("6", map.get("f"));
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+    map.remove("a");
+    map.remove("d");
+    Assert.assertEquals(null, map.get("a"));
+    Assert.assertEquals(null, map.get("b"));
+    Assert.assertEquals(null, map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+    Assert.assertEquals("5", map.get("e"));
+    Assert.assertEquals("6", map.get("f"));
+    Assert.assertEquals(null, map.get("g"));
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    map.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void multiMapPerBucketTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    multiMapPerBucketTestHelper(store);
+  }
+
+  @Test
+  public void multiMapPerBucketManagedStateTest()
+  {
+    multiMapPerBucketTestHelper(testMeta.store);
+  }
+
+  public void multiMapPerBucketTestHelper(SpillableStateStore store)
+  {
+    SerdeStringSlice sss = new SerdeStringSlice();
+
+    SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(store, ID1, 
0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+    SpillableMapImpl<String, String> map2 = new SpillableMapImpl<>(store, ID2, 
0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map1.setup(testMeta.operatorContext);
+    map2.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    map1.beginWindow(windowId);
+    map2.beginWindow(windowId);
+
+    map1.put("a", "1");
+
+    Assert.assertEquals("1", map1.get("a"));
+    Assert.assertEquals(null, map2.get("a"));
+
+    map2.put("a", "a1");
+
+    Assert.assertEquals("1", map1.get("a"));
+    Assert.assertEquals("a1", map2.get("a"));
+
+    map1.put("b", "2");
+    map2.put("c", "3");
+
+    Assert.assertEquals("1", map1.get("a"));
+    Assert.assertEquals("2", map1.get("b"));
+
+    Assert.assertEquals("a1", map2.get("a"));
+    Assert.assertEquals(null, map2.get("b"));
+    Assert.assertEquals("3", map2.get("c"));
+
+    map1.endWindow();
+    map2.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map1.beginWindow(windowId);
+    map2.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID2, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3");
+
+    map1.remove("a");
+
+    Assert.assertEquals(null, map1.get("a"));
+    Assert.assertEquals("a1", map2.get("a"));
+
+    map1.endWindow();
+    map2.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map1.beginWindow(windowId);
+    map2.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
+
+    map1.endWindow();
+    map2.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+
+    map1.teardown();
+    map2.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void recoveryWithManagedStateTest() throws Exception
+  {
+    SerdeStringSlice sss = new SerdeStringSlice();
+
+    SpillableMapImpl<String, String> map1 = new 
SpillableMapImpl<>(testMeta.store, ID1, 0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    testMeta.store.setup(testMeta.operatorContext);
+    map1.setup(testMeta.operatorContext);
+
+    testMeta.store.beginWindow(0);
+    map1.beginWindow(0);
+    map1.put("x", "1");
+    map1.put("y", "2");
+    map1.put("z", "3");
+    map1.put("zz", "33");
+    Assert.assertEquals(4, map1.size());
+    map1.endWindow();
+    testMeta.store.endWindow();
+
+    testMeta.store.beginWindow(1);
+    map1.beginWindow(1);
+    Assert.assertEquals(4, map1.size());
+    map1.put("x", "4");
+    map1.put("y", "5");
+    map1.remove("zz");
+    Assert.assertEquals(3, map1.size());
+    map1.endWindow();
+    testMeta.store.endWindow();
+    testMeta.store.beforeCheckpoint(1);
+    testMeta.store.checkpointed(1);
+
+    SpillableMapImpl<String, String> clonedMap1 = 
KryoCloneUtils.cloneObject(map1);
+
+    testMeta.store.beginWindow(2);
+    map1.beginWindow(2);
+    Assert.assertEquals(3, map1.size());
+    map1.put("x", "6");
+    map1.put("y", "7");
+    map1.put("w", "8");
+    Assert.assertEquals(4, map1.size());
+    map1.endWindow();
+    testMeta.store.endWindow();
+
+    // simulating crash here
+    map1.teardown();
+    testMeta.store.teardown();
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
+    Context.OperatorContext context =
+        new 
OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(),
 attributes);
+
+    map1 = clonedMap1;
+    map1.getStore().setup(context);
+    map1.setup(testMeta.operatorContext);
+
+    map1.getStore().beginWindow(2);
+    map1.beginWindow(2);
+    Assert.assertEquals(3, map1.size());
+    Assert.assertEquals("4", map1.get("x"));
+    Assert.assertEquals("5", map1.get("y"));
+    Assert.assertEquals("3", map1.get("z"));
+    map1.endWindow();
+    map1.getStore().endWindow();
+
+    map1.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
index 6b188e4..15970af 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
@@ -276,8 +276,7 @@ public class SpillableSetMultimapImplTest
     final int numOfEntry = 100000;
 
     SpillableStateStore store = testMeta.store;
-
-    SpillableByteArrayListMultimapImpl<String, String> multimap = new 
SpillableByteArrayListMultimapImpl<>(
+    SpillableSetMultimapImpl<String, String> multimap = new 
SpillableSetMultimapImpl<>(
         this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new 
SerdeStringSlice());
 
     Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
index 4edbcd0..bc5d80f 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java
@@ -91,7 +91,7 @@ public class WindowedOperatorTest
     WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new 
WindowedOperatorImpl<>();
     if (useSpillable) {
       sccImpl = new SpillableComplexComponentImpl(testMeta.store);
-      // TODO: We don't yet support Spillable data structures for window state 
storage because SpillableByteMapImpl does not yet support iterating over all 
keys.
+      // TODO: We don't yet support Spillable data structures for window state 
storage because SpillableMapImpl does not yet support iterating over all keys.
       windowStateStorage = new InMemoryWindowedStorage<>();
       SpillableWindowedPlainStorage<MutableLong> pds = new 
SpillableWindowedPlainStorage<>();
       pds.setSpillableComplexComponent(sccImpl);
@@ -117,7 +117,7 @@ public class WindowedOperatorTest
     KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> 
windowedOperator = new KeyedWindowedOperatorImpl<>();
     if (useSpillable) {
       sccImpl = new SpillableComplexComponentImpl(testMeta.store);
-      // TODO: We don't yet support Spillable data structures for window state 
storage because SpillableByteMapImpl does not yet support iterating over all 
keys.
+      // TODO: We don't yet support Spillable data structures for window state 
storage because SpillableMapImpl does not yet support iterating over all keys.
       windowStateStorage = new InMemoryWindowedStorage<>();
       if (forSession) {
         SpillableSessionWindowedStorage<String, MutableLong> sws = new 
SpillableSessionWindowedStorage<>();

Reply via email to