[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-08 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73969296
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
 ---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+/**
+ * Implementations of this interface are used by Spillable datastructures to spill data to disk.
+ */
+public interface SpillableStateStore extends BucketedState, Component,
+Operator.CheckpointNotificationListener, WindowListener
+{
--- End diff --

Can we add ```hasBeenSetup()``` to this?
I see in the tests that the ```setup()``


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-08 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73966670
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
 ---
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
+ * @param  The type of object stored in the {@link SpillableArrayListImpl}.
+ */
+@DefaultSerializer(FieldSerializer.class)
+public class SpillableArrayListImpl implements Spillable.SpillableArrayList, Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  private long bucketId;
--- End diff --

It seems ```bucketId``` and ```prefix``` are unused. Also, please add a getter and setter for ```batchSize```.




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-08 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73941677
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java
 ---
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
--- End diff --

I think this should be moved to a more generic package, possibly in Apex Core. For now it is okay to keep it in this package; just annotate it with InterfaceStability.Unstable.
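
As a rough sketch of what that annotation could look like on the interface: the snippet below assumes the comment refers to Hadoop's `org.apache.hadoop.classification.InterfaceStability.Unstable`. To stay self-contained it declares a local stand-in annotation named `Unstable`, and the `beginWindow`/`endWindow` signatures are assumptions rather than the contents of the PR.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class UnstableAnnotationSketch
{
  // Local stand-in so the sketch compiles without extra dependencies.
  // Real code would import the annotation from a library instead.
  @Documented
  @Retention(RetentionPolicy.RUNTIME)
  @interface Unstable
  {
  }

  // Assumed shape of the interface under review; marked as subject to change.
  @Unstable
  interface WindowListener
  {
    void beginWindow(long windowId);

    void endWindow();
  }

  // Returns true when the given type carries the stand-in annotation.
  static boolean isMarkedUnstable(Class<?> type)
  {
    return type.isAnnotationPresent(Unstable.class);
  }

  public static void main(String[] args)
  {
    System.out.println(isMarkedUnstable(WindowListener.class));
  }
}
```

The annotation only documents intent: it tells users that the interface may move or change, which fits a type that is expected to be relocated later.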




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-07 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73809225
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+@DefaultSerializer(FieldSerializer.class)
+public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private transient WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+  private transient boolean isRunning = false;
+  private transient boolean isInWindow = false;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  @NotNull
+  private SpillableByteMapImpl map;
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  private SpillableByteArrayListMultimapImpl()
+  {
+// for kryo
+  }
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+  Serde serdeKey,
+  Serde serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  public SpillableStateStore getStore()
+  {
+return store;
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+if (spillableArrayList == null) {
+  Slice keySlice = serdeKey.serialize(key);
+  Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
--- End diff --

@davidyan74 No, the Spillable map prefixes each key with its identifier internally, so it is not necessary to prefix the keys manually.
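
A minimal sketch of the idea, assuming a simplified in-memory store: `PrefixedByteMap` and its methods are hypothetical and are not the SpillableByteMapImpl code. It shows why two maps that share one store do not collide when callers pass identical keys, as long as each map prepends its own identifier.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the Malhar implementation.
public class PrefixedByteMapSketch
{
  /** A map view over a shared store that prefixes every key with its identifier. */
  static class PrefixedByteMap
  {
    private final Map<ByteBuffer, Integer> sharedStore;
    private final byte[] identifier;

    PrefixedByteMap(Map<ByteBuffer, Integer> sharedStore, byte[] identifier)
    {
      this.sharedStore = sharedStore;
      this.identifier = identifier.clone();
    }

    // Builds identifier + key internally, so callers never prefix keys themselves.
    private ByteBuffer storeKey(byte[] key)
    {
      byte[] full = new byte[identifier.length + key.length];
      System.arraycopy(identifier, 0, full, 0, identifier.length);
      System.arraycopy(key, 0, full, identifier.length, key.length);
      return ByteBuffer.wrap(full);
    }

    void put(byte[] key, int value)
    {
      sharedStore.put(storeKey(key), value);
    }

    Integer get(byte[] key)
    {
      return sharedStore.get(storeKey(key));
    }
  }

  public static void main(String[] args)
  {
    Map<ByteBuffer, Integer> store = new HashMap<>();
    PrefixedByteMap first = new PrefixedByteMap(store, new byte[]{1});
    PrefixedByteMap second = new PrefixedByteMap(store, new byte[]{2});
    byte[] sameKey = {7, 0, 0, 0};
    first.put(sameKey, 10);
    second.put(sameKey, 20);
    System.out.println(first.get(sameKey) + " " + second.get(sameKey));
  }
}
```

The sketch assumes all identifiers have the same length; with variable-length identifiers, a prefix of one map could be mistaken for part of another map's key.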




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-03 Thread ilooner
GitHub user ilooner reopened a pull request:

https://github.com/apache/apex-malhar/pull/324

Spillable Datastructures PR for review only



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #324


commit b72ee18e6f7cf0a9d30b195439912d96acafb3b4
Author: Timothy Farkas 
Date:   2016-07-17T21:32:34Z

Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap

commit 184653a23f662f78e7e6a7e1d53ffb0efbdb7127
Author: Timothy Farkas 
Date:   2016-07-18T01:56:41Z

Added SpillableComplexComponentImpl

commit 9f17b4ba9233e3f46746505941a378236193f719
Author: Timothy Farkas 
Date:   2016-07-18T03:29:07Z

Added propagating callbacks to store

commit fe41f0c20235aac3ca57facc2491ecfba11d20a7
Author: Timothy Farkas 
Date:   2016-07-21T06:38:48Z

Added checkpoint callbacks to spillable complex components
Added some half completed tests

commit e5cacbbc9e1ae18f5cf0938f13377f75a3a99cd2
Author: Timothy Farkas 
Date:   2016-07-24T04:09:54Z

Finished unit test for SpillableArrayListMultimap

commit dc258b8900688264f349307737b59b096dbc3d2b
Author: Timothy Farkas 
Date:   2016-07-24T05:19:37Z

Added unit test which uses managed state

commit ed9924b810a76c39404701da81f4753ab68af5a5
Author: Timothy Farkas 
Date:   2016-07-24T06:55:18Z

Finished adding managed state tests for SpillableByteMap

commit 43da17d9633dc2405b596b25697b6b4b0baef69f
Author: Timothy Farkas 
Date:   2016-07-24T16:31:53Z

Added ManagedStateTests For SpillableArrayList

commit 1343ccf4ccc5099d7abcf7f99ab2ed648baeef08
Author: Timothy Farkas 
Date:   2016-07-24T16:49:00Z

Added managed state tests for SpillableArrayListMultimap

commit 57c4e5e3c3e019613f31753be5052c32ba762e53
Author: Timothy Farkas 
Date:   2016-07-24T16:57:35Z

Added ManagedStateTest for SpillableComplexComponent

commit ebc822c4ba8752f464395cf01221addc6d34
Author: Timothy Farkas 
Date:   2016-07-27T06:38:51Z

Fixed broken containsKey method and added test

commit 983924215af2af7b683a3b654c320b7026b3165d
Author: David Yan 
Date:   2016-07-28T17:27:01Z

isolated unit test for spillablebytemapimpl recovery

commit 68ca96332dce24aa739e547ba87058dc35bb56df
Author: Timothy Farkas 
Date:   2016-07-29T08:10:28Z

Testing

commit a7796d61d80111faa24543ddc05b17c2944138be
Author: Timothy Farkas 
Date:   2016-07-31T18:44:23Z

Added recovery test for SpillableArrayListMultimap and added license headers

commit b3b4aacf44127c79553fc289f1f9e1f7c17b2ec6
Author: Timothy Farkas 
Date:   2016-08-01T03:34:18Z

 - Fixed bug
 - Added more recovery tests






[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73079738
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
 ---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
--- End diff --

The WindowedOperator only needs a storage that it can read data from, and that contract is defined by the WindowedStorage interfaces. The user of WindowedOperator can set the storage implementation before setup(), which is why the operator does not need any knowledge of Spillable components. Take a look at the WindowedStorage interface here: 
https://github.com/davidyan74/apex-malhar/blob/66da57e08e31edb15e83a050f3940dd276851c23/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java

If a user of the WindowedOperator wants a custom storage implementation, they can simply implement those interfaces without having to deal with Spillable components.
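
The decoupling described above could look roughly like the following sketch. `KeyedStorage`, `InMemoryStorage`, and `CountingOperator` are hypothetical stand-ins invented for illustration; they do not reproduce the real WindowedStorage or WindowedOperator signatures.

```java
import java.util.HashMap;
import java.util.Map;

public class PluggableStorageSketch
{
  // Simplified stand-in for a storage contract; not the real WindowedStorage API.
  interface KeyedStorage<K, V>
  {
    void put(K key, V value);

    V get(K key);
  }

  // A custom storage that knows nothing about Spillable components.
  static class InMemoryStorage<K, V> implements KeyedStorage<K, V>
  {
    private final Map<K, V> data = new HashMap<>();

    @Override
    public void put(K key, V value)
    {
      data.put(key, value);
    }

    @Override
    public V get(K key)
    {
      return data.get(key);
    }
  }

  // Hypothetical operator that depends only on the storage contract.
  static class CountingOperator
  {
    private KeyedStorage<String, Long> storage;
    private boolean setupDone;

    // The storage implementation must be chosen before setup().
    void setStorage(KeyedStorage<String, Long> storage)
    {
      if (setupDone) {
        throw new IllegalStateException("storage must be set before setup()");
      }
      this.storage = storage;
    }

    void setup()
    {
      if (storage == null) {
        throw new IllegalStateException("no storage configured");
      }
      setupDone = true;
    }

    void process(String key)
    {
      Long current = storage.get(key);
      storage.put(key, current == null ? 1L : current + 1L);
    }

    long count(String key)
    {
      Long current = storage.get(key);
      return current == null ? 0L : current;
    }
  }

  public static void main(String[] args)
  {
    CountingOperator operator = new CountingOperator();
    operator.setStorage(new InMemoryStorage<String, Long>());
    operator.setup();
    operator.process("a");
    operator.process("a");
    System.out.println(operator.count("a"));
  }
}
```

Because the operator only sees the storage contract, a Spillable-backed implementation and a plain in-memory one are interchangeable from its point of view.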





[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73079717
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
 ---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
--- End diff --

@ilooner 
Why does this not extend SpillableComponent?




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73076627
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
 ---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
--- End diff --

Maybe I am missing something, but please bear with my questions:
1. Why should a WindowedOperator not have any knowledge of SpillableComponents?
2. Assuming the above is the case, what is the API of WindowedOperator going to be with respect to storage? If WindowedOperator specifies the contract for a pluggable storage, then an implementation of SpillableStateStore can implement that contract and can therefore be set on WindowedOperator. However, I think SpillableStateStore itself can be that contract.





[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73075544
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
 ---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
--- End diff --

Also, as you can see, we are duplicating the beginWindow and endWindow method declarations in multiple interfaces (see SpillableComponent and SpillableStateStore). This can be avoided with a single generic WindowListener interface.
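
A sketch of that refactoring, under the assumption that the callbacks are `beginWindow(long)` and `endWindow()`: the interfaces below are simplified stand-ins that omit everything else the real SpillableComponent and SpillableStateStore declare, and the `Recording*` classes exist only to make the example runnable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowListenerSketch
{
  /** The single place where the window callbacks are declared (signatures assumed). */
  interface WindowListener
  {
    void beginWindow(long windowId);

    void endWindow();
  }

  // Simplified stand-ins: both inherit the callbacks instead of redeclaring them.
  interface SpillableComponent extends WindowListener
  {
  }

  interface SpillableStateStore extends WindowListener
  {
  }

  /** Records each callback so the example can show what was invoked. */
  static class Recorder implements WindowListener
  {
    private final String name;
    private final List<String> log;

    Recorder(String name, List<String> log)
    {
      this.name = name;
      this.log = log;
    }

    @Override
    public void beginWindow(long windowId)
    {
      log.add(name + ".beginWindow(" + windowId + ")");
    }

    @Override
    public void endWindow()
    {
      log.add(name + ".endWindow()");
    }
  }

  static class RecordingStore extends Recorder implements SpillableStateStore
  {
    RecordingStore(List<String> log)
    {
      super("store", log);
    }
  }

  static class RecordingList extends Recorder implements SpillableComponent
  {
    RecordingList(List<String> log)
    {
      super("list", log);
    }
  }

  // Drives one window over both kinds of object through the shared interface.
  static List<String> runOneWindow(long windowId)
  {
    List<String> log = new ArrayList<>();
    List<WindowListener> listeners = Arrays.asList(new RecordingStore(log), new RecordingList(log));
    for (WindowListener listener : listeners) {
      listener.beginWindow(windowId);
    }
    for (WindowListener listener : listeners) {
      listener.endWindow();
    }
    return log;
  }

  public static void main(String[] args)
  {
    System.out.println(runOneWindow(5L));
  }
}
```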




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73075245
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
 ---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
--- End diff --

The reason I brought this up is that the WindowedOperator should have no knowledge of any Spillable components. Yet I need a way for WindowedOperator to call SpillableStateStore.beginWindow() and endWindow().
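
One possible way to satisfy both constraints is for the operator to forward window callbacks to any storage that happens to implement a generic listener interface. The sketch below is an assumption about how that could work, not the approach taken in the PR; `Storage`, `Operator`, and the storage classes are hypothetical names.

```java
public class WindowForwardingSketch
{
  // Generic callback interface with assumed signatures.
  interface WindowListener
  {
    void beginWindow(long windowId);

    void endWindow();
  }

  // Stand-in for the storage contract the operator depends on.
  interface Storage
  {
  }

  // A storage with no interest in window boundaries.
  static class PlainStorage implements Storage
  {
  }

  // A storage that wants window callbacks, e.g. one backed by a spillable store.
  static class WindowAwareStorage implements Storage, WindowListener
  {
    int begun;
    int ended;

    @Override
    public void beginWindow(long windowId)
    {
      begun++;
    }

    @Override
    public void endWindow()
    {
      ended++;
    }
  }

  // Hypothetical operator: it never references a Spillable type.
  static class Operator
  {
    private final Storage storage;

    Operator(Storage storage)
    {
      this.storage = storage;
    }

    void beginWindow(long windowId)
    {
      // Forward only when the storage opted in to window callbacks.
      if (storage instanceof WindowListener) {
        ((WindowListener)storage).beginWindow(windowId);
      }
    }

    void endWindow()
    {
      if (storage instanceof WindowListener) {
        ((WindowListener)storage).endWindow();
      }
    }
  }

  public static void main(String[] args)
  {
    WindowAwareStorage aware = new WindowAwareStorage();
    Operator operator = new Operator(aware);
    operator.beginWindow(1L);
    operator.endWindow();
    System.out.println(aware.begun + " " + aware.ended);
  }
}
```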




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73073791
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+@DefaultSerializer(FieldSerializer.class)
+public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private transient WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+  private transient boolean isRunning = false;
+  private transient boolean isInWindow = false;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  @NotNull
+  private SpillableByteMapImpl map;
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  private SpillableByteArrayListMultimapImpl()
+  {
+// for kryo
+  }
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+  Serde serdeKey,
+  Serde serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  public SpillableStateStore getStore()
+  {
+return store;
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+if (spillableArrayList == null) {
+  Slice keySlice = serdeKey.serialize(key);
+  Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
--- End diff --

I see that you removed the identifier from the key that stores the size. Would this cause conflicts between data structures with different identifiers?




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73046023
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java
 ---
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+/**
+ * Classes implementing this interface can be used as generators for identifiers for Spillable data structures.
+ */
+public interface SpillableIdentifierGenerator
+{
+  /**
+   * Generators the next valid identifier for a Spillable data structure.
+   * @return A byte array which represents the next valid identifier for a Spillable data structure.
+   */
+  public byte[] next();
--- End diff --

nit: ```public``` is redundant here, since interface methods are implicitly public.
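
A small check of that language rule, using a hypothetical `IdentifierGenerator` interface as a stand-in for the one in the diff: the method is declared without a modifier, and reflection still reports it as public.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ImplicitPublicSketch
{
  // Hypothetical stand-in for the interface under review, declared without 'public' on the method.
  interface IdentifierGenerator
  {
    byte[] next();
  }

  // Looks up next() reflectively and reports whether it carries the public modifier.
  static boolean nextIsPublic()
  {
    try {
      Method next = IdentifierGenerator.class.getMethod("next");
      return Modifier.isPublic(next.getModifiers());
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(nextIsPublic());
  }
}
```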




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73032779
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemorySpillableStateStoreTest.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.inmem;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Created by tfarkas on 6/6/16.
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73032644
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
 ---
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+/**
+ * Created by tfarkas on 7/17/16.
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73032590
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
 ---
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/**
+ * Created by tfarkas on 6/6/16.
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73032527
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
 ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 7/17/16.
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73032414
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java
 ---
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Created by tfarkas on 6/5/16.
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73032354
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SliceUtils
--- End diff --

Please add Javadoc for each of the utility methods.
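
As a rough illustration of the level of detail meant here, below is a sketch of a documented static helper. The class name and the `concatenate` method are hypothetical and are not taken from the actual `SliceUtils` in this PR; the point is only the shape of the comment (summary sentence, `@param` for each argument, `@return`).

```java
// Hypothetical helper class; the name and method are illustrative, not the SliceUtils API.
final class ByteArrayUtilsSketch
{
  private ByteArrayUtilsSketch()
  {
    // utility class, not meant to be instantiated
  }

  /**
   * Concatenates two byte arrays into a newly allocated array.
   *
   * @param a The bytes that come first in the result. Must not be null.
   * @param b The bytes that come second in the result. Must not be null.
   * @return A new array of length {@code a.length + b.length} holding the
   * contents of {@code a} followed by the contents of {@code b}.
   */
  static byte[] concatenate(byte[] a, byte[] b)
  {
    byte[] result = new byte[a.length + b.length];
    System.arraycopy(a, 0, result, 0, a.length);
    System.arraycopy(b, 0, result, a.length, b.length);
    return result;
  }
}
```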




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73032245
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
 ---
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/11/16.
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73032134
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
 ---
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73031870
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
 ---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73031644
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java
 ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+public class SequentialSpillableIdentifierGenerator implements SpillableIdentifierGenerator
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-01 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73031608
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
 ---
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+@DefaultSerializer(FieldSerializer.class)
+public class SpillableByteMapImpl implements Spillable.SpillableByteMap, Spillable.SpillableComponent,
--- End diff --

javadoc




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-31 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72913406
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
 ---
@@ -0,0 +1,458 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by tfarkas on 6/19/16.
+ */
+public class SpillableArrayListImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+  public static final byte[] ID2 = new byte[]{(byte)1};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleAddGetAndSetTest1()
+  {
+InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+simpleAddGetAndSetTest1Helper(store);
+  }
+
+  @Test
+  public void simpleAddGetAndSetManagedStateTest1()
+  {
+simpleAddGetAndSetTest1Helper(testMeta.store);
+  }
+
+  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+  {
+SpillableArrayListImpl list = new SpillableArrayListImpl<>(0L, ID1, store,
+new SerdeStringSlice(), 1);
+
+store.setup(testMeta.operatorContext);
+list.setup(testMeta.operatorContext);
+
+long windowId = 0L;
+store.beginWindow(windowId);
+list.beginWindow(windowId);
+windowId++;
--- End diff --

Yes, will make the change.




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-29 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72829289
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
 ---
@@ -411,45 +441,48 @@ public void recoveryTest()
 testMeta.store.setup(testMeta.operatorContext);
 map1.setup(testMeta.operatorContext);
 
-map1.beginWindow(1000);
+System.out.println("0");
+testMeta.store.beginWindow(0);
+map1.beginWindow(0);
 map1.put("x", "1");
 map1.put("y", "2");
 map1.put("z", "3");
 map1.endWindow();
-map1.beginWindow(1001);
+testMeta.store.endWindow();
+
+System.out.println("1");
+testMeta.store.beginWindow(1);
+map1.beginWindow(1);
 map1.put("x", "4");
 map1.put("y", "5");
 map1.endWindow();
-
-testMeta.store.beforeCheckpoint(1001);
-testMeta.store.checkpointed(1001);
+testMeta.store.endWindow();
+testMeta.store.beforeCheckpoint(1);
+testMeta.store.checkpointed(1);
 
 SpillableByteMapImpl clonedMap1 = KryoCloneUtils.cloneObject(map1);
 
-map1.beginWindow(1002);
-map1.put("x", "6");
-map1.put("y", "7");
-map1.endWindow();
-
-Assert.assertEquals("6", map1.get("x"));
-Assert.assertEquals("7", map1.get("y"));
-Assert.assertEquals("3", map1.get("z"));
-
-map1.beginWindow(1003);
-map1.put("x", "8");
-map1.put("y", "9");
+System.out.println("2");
+testMeta.store.beginWindow(2);
+map1.beginWindow(2);
+map1.put("x1", "6");
+map1.put("y1", "7");
 map1.endWindow();
+testMeta.store.endWindow();
 
 // simulating crash here
 map1.teardown();
 testMeta.store.teardown();
 
+System.out.println("Recovering");
--- End diff --

nit: System.out




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-29 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72829166
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
 ---
@@ -411,45 +441,48 @@ public void recoveryTest()
 testMeta.store.setup(testMeta.operatorContext);
 map1.setup(testMeta.operatorContext);
 
-map1.beginWindow(1000);
+System.out.println("0");
+testMeta.store.beginWindow(0);
+map1.beginWindow(0);
 map1.put("x", "1");
 map1.put("y", "2");
 map1.put("z", "3");
 map1.endWindow();
-map1.beginWindow(1001);
+testMeta.store.endWindow();
+
+System.out.println("1");
+testMeta.store.beginWindow(1);
+map1.beginWindow(1);
 map1.put("x", "4");
 map1.put("y", "5");
 map1.endWindow();
-
-testMeta.store.beforeCheckpoint(1001);
-testMeta.store.checkpointed(1001);
+testMeta.store.endWindow();
+testMeta.store.beforeCheckpoint(1);
+testMeta.store.checkpointed(1);
 
 SpillableByteMapImpl clonedMap1 = KryoCloneUtils.cloneObject(map1);
 
-map1.beginWindow(1002);
-map1.put("x", "6");
-map1.put("y", "7");
-map1.endWindow();
-
-Assert.assertEquals("6", map1.get("x"));
-Assert.assertEquals("7", map1.get("y"));
-Assert.assertEquals("3", map1.get("z"));
-
-map1.beginWindow(1003);
-map1.put("x", "8");
-map1.put("y", "9");
+System.out.println("2");
+testMeta.store.beginWindow(2);
+map1.beginWindow(2);
+map1.put("x1", "6");
+map1.put("y1", "7");
 map1.endWindow();
+testMeta.store.endWindow();
 
 // simulating crash here
 map1.teardown();
 testMeta.store.teardown();
 
+System.out.println("Recovering");
+
 map1 = clonedMap1;
 map1.getStore().setup(testMeta.operatorContext);
 map1.setup(testMeta.operatorContext);
--- End diff --

If the activation window is -1, then ManagedState will not care about the data saved after the activation window.




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-29 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72829218
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
 ---
@@ -411,45 +441,48 @@ public void recoveryTest()
 testMeta.store.setup(testMeta.operatorContext);
 map1.setup(testMeta.operatorContext);
 
-map1.beginWindow(1000);
+System.out.println("0");
+testMeta.store.beginWindow(0);
+map1.beginWindow(0);
 map1.put("x", "1");
 map1.put("y", "2");
 map1.put("z", "3");
 map1.endWindow();
-map1.beginWindow(1001);
+testMeta.store.endWindow();
+
+System.out.println("1");
--- End diff --

nit: System.out




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-29 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72828429
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
 ---
@@ -411,45 +441,48 @@ public void recoveryTest()
 testMeta.store.setup(testMeta.operatorContext);
 map1.setup(testMeta.operatorContext);
 
-map1.beginWindow(1000);
+System.out.println("0");
--- End diff --

Please remove System.out.




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-25 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72123238
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,262 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  private boolean isRunning = false;
+  private boolean isInWindow = false;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+  Serde serdeKey,
+  Serde serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
--- End diff --

I got a ClassCastException due to this line in my unit tests:
```
java.lang.ClassCastException: com.datatorrent.netlet.util.Slice cannot be cast to [B
    at org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde.serialize(PassThruByteArraySliceSerde.java:10)
    at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.get(SpillableByteMapImpl.java:94)
    at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.containsKey(SpillableByteMapImpl.java:70)
    at org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl.containsKey(SpillableByteArrayListMultimapImpl.java:139)
```
Shouldn't the 4th parameter be just serdeKey instead of the 
PassThruByteArraySliceSerde?
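
For illustration, here is a minimal sketch of one way this kind of failure can arise, assuming the serde is reached through a raw (non-generic) reference so the compiler cannot check the key type. Every name below is a hypothetical stand-in, not a Malhar class:

```java
// Hypothetical stand-ins for illustration only; these are not the Malhar classes.
final class SliceStandIn
{
  final byte[] buffer;

  SliceStandIn(byte[] buffer)
  {
    this.buffer = buffer;
  }
}

interface SerdeSketch<T>
{
  SliceStandIn serialize(T object);
}

// Pass-through serde that only accepts byte[] keys.
final class PassThruSketch implements SerdeSketch<byte[]>
{
  @Override
  public SliceStandIn serialize(byte[] object)
  {
    return new SliceStandIn(object);
  }
}

final class RawSerdeDemo
{
  // Returns true if handing the serde a slice key fails with a ClassCastException.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean failsWithSliceKey()
  {
    // Raw type: the compiler can no longer check what kind of key is passed in.
    SerdeSketch raw = new PassThruSketch();
    try {
      raw.serialize(new SliceStandIn(new byte[]{1}));
      return false;
    } catch (ClassCastException e) {
      // The compiler-generated bridge method casts the argument to byte[].
      return true;
    }
  }
}
```

If the real cause is similar, either the key passed to the map or the serde handed to the constructor would have to change so that both agree on the key type.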




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-25 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72124180
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,262 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements 
Spillable.SpillableByteArrayListMultimap,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, 
(byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new 
WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  private boolean isRunning = false;
+  private boolean isInWindow = false;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, 
byte[] identifier, long bucket,
+  Serde serdeKey,
+  Serde serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
--- End diff --

No, it shouldn't. That error existed before, but I fixed it in my latest 
commits. Do you have the latest? If so, I'll check tonight to see whether I 
can reproduce it in my tests.




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-24 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72001616
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
 ---
@@ -0,0 +1,458 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by tfarkas on 6/19/16.
+ */
+public class SpillableArrayListImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+  public static final byte[] ID2 = new byte[]{(byte)1};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new 
SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleAddGetAndSetTest1()
+  {
+InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+simpleAddGetAndSetTest1Helper(store);
+  }
+
+  @Test
+  public void simpleAddGetAndSetManagedStateTest1()
+  {
+simpleAddGetAndSetTest1Helper(testMeta.store);
+  }
+
+  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+  {
+SpillableArrayListImpl list = new SpillableArrayListImpl<>(0L, 
ID1, store,
+new SerdeStringSlice(), 1);
+
+store.setup(testMeta.operatorContext);
+list.setup(testMeta.operatorContext);
+
+long windowId = 0L;
+store.beginWindow(windowId);
+list.beginWindow(windowId);
+windowId++;
--- End diff --

Should this increment be done right before the next beginWindow() instead? 
It looks like the incremented windowId is used for the checkpoint callbacks 
later on, but shouldn't the callbacks get the windowId from before the 
increment?
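
The ordering in question can be sketched as follows, assuming the checkpoint callbacks are meant to carry the id of the window that just ended. The recorder class is a hypothetical stand-in for the store, and its method names only mirror the callbacks discussed here:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical recorder standing in for a store; it only logs the calls it receives.
final class RecordingStoreSketch
{
  final List<String> calls = new ArrayList<>();

  void beginWindow(long windowId)
  {
    calls.add("beginWindow:" + windowId);
  }

  void endWindow()
  {
    calls.add("endWindow");
  }

  void beforeCheckpoint(long windowId)
  {
    calls.add("beforeCheckpoint:" + windowId);
  }

  void checkpointed(long windowId)
  {
    calls.add("checkpointed:" + windowId);
  }

  void committed(long windowId)
  {
    calls.add("committed:" + windowId);
  }
}

final class WindowOrderSketch
{
  static List<String> runOneWindow(RecordingStoreSketch store)
  {
    long windowId = 0L;
    store.beginWindow(windowId);
    // Puts and gets for this window would go here.
    store.endWindow();

    // The callbacks refer to the window that just ended, so they reuse its id.
    store.beforeCheckpoint(windowId);
    store.checkpointed(windowId);
    store.committed(windowId);

    // Increment only once the callbacks for this window are done.
    windowId++;
    store.beginWindow(windowId);
    store.endWindow();
    return store.calls;
  }
}
```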




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-05 Thread sandeshh
Github user sandeshh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r69600767
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
 ---
@@ -0,0 +1,128 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TimeBasedPriorityQueue
--- End diff --

Have you looked at 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/DelayQueue.html ?
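
For reference, a minimal sketch of how DelayQueue is typically used. The element type and helper below are hypothetical and are not taken from the PR; whether this fits depends on what TimeBasedPriorityQueue needs:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type: a key that becomes available once its expiry time has passed.
final class ExpiringKeySketch implements Delayed
{
  final String key;
  final long expiryMillis;

  ExpiringKeySketch(String key, long expiryMillis)
  {
    this.key = key;
    this.expiryMillis = expiryMillis;
  }

  @Override
  public long getDelay(TimeUnit unit)
  {
    // Remaining delay; zero or negative means the element has expired.
    return unit.convert(expiryMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  }

  @Override
  public int compareTo(Delayed other)
  {
    return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
  }
}

final class DelayQueueSketch
{
  // Returns the key of the head element if it has expired, or null if nothing has expired yet.
  static String pollExpired(DelayQueue<ExpiringKeySketch> queue)
  {
    ExpiringKeySketch head = queue.poll();
    return head == null ? null : head.key;
  }
}
```

Note that DelayQueue measures expiry against the clock through getDelay(), so it is only a fit if expiry is meant to be driven by wall-clock time.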




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-29 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68926798
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements 
Spillable.SpillableByteArrayListMultimap,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, 
(byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new 
WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, 
byte[] identifier, long bucket,
+  Serde serdeKey,
+  Serde serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new 
PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
--- End diff --

I think we also need to provide an asynchronous get method.
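
A rough sketch of the shape such a method could take, assuming Java 8 or later. The class, the in-memory backing map and the getAsync name are all hypothetical and only stand in for the spillable structure and its store:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical multimap facade; the backing map stands in for the spillable store.
final class AsyncGetSketch<K, V>
{
  private final Map<K, List<V>> backing = new ConcurrentHashMap<>();

  void putAll(K key, List<V> values)
  {
    backing.put(key, values);
  }

  // Synchronous lookup, analogous to get(key) in the diff.
  List<V> get(K key)
  {
    return backing.get(key);
  }

  // Asynchronous variant: the caller gets a future and can keep working while the lookup runs.
  CompletableFuture<List<V>> getAsync(K key)
  {
    return CompletableFuture.supplyAsync(() -> get(key));
  }
}
```

A caller would hold on to the returned future and read the value later, for example at the end of the window.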




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-29 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68926769
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements 
Spillable.SpillableByteArrayListMultimap,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, 
(byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new 
WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
--- End diff --

Why do we need to configure the bucket?




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68560022
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements 
Spillable.SpillableByteArrayListMultimap,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, 
(byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new 
WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, 
byte[] identifier, long bucket,
+  Serde serdeKey,
+  Serde serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new 
PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+if (spillableArrayList == null) {
+  Slice keyPrefix = serdeKey.serialize(key);
+  Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX));
+
--- End diff --

The value should be fetched from the map with the byte array rather than 
the Slice: map.get(SliceUtils.concatenate(...).buffer)
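
One caveat, shown as a small sketch: using .buffer directly is only safe when the slice covers its whole backing array. The stand-in class below is hypothetical and only mirrors the buffer, offset and length fields used in the diff:

```java
// Hypothetical stand-in mirroring the buffer/offset/length fields used in the diff.
final class SliceView
{
  final byte[] buffer;
  final int offset;
  final int length;

  SliceView(byte[] buffer, int offset, int length)
  {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }
}

final class SliceKeySketch
{
  // Returns the bytes the slice covers, reusing the backing array when it is an exact fit.
  static byte[] toKeyBytes(SliceView slice)
  {
    if (slice.offset == 0 && slice.length == slice.buffer.length) {
      return slice.buffer;
    }

    // The slice is a window into a larger array, so copy only the covered range.
    byte[] copy = new byte[slice.length];
    System.arraycopy(slice.buffer, slice.offset, copy, 0, slice.length);
    return copy;
  }
}
```

Since the concatenate methods in this PR wrap a freshly allocated array, the exact-fit branch is the one that would apply here.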




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68559313
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---
@@ -0,0 +1,52 @@
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SliceUtils
+{
+  private SliceUtils()
+  {
+  }
+
+  public static byte[] concatenate(byte[] a, byte[] b)
+  {
+byte[] output = new byte[a.length + b.length];
+
+System.arraycopy(a, 0, output, 0, a.length);
+System.arraycopy(b, 0, output, a.length, b.length);
+return output;
+  }
+
+  public static Slice concatenate(Slice a, Slice b)
+  {
+int size = a.length + b.length;
+byte[] bytes = new byte[size];
+
+System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
+System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+return new Slice(bytes);
+  }
+
+  public static Slice concatenate(byte[] a, Slice b)
+  {
+int size = a.length + b.length;
+byte[] bytes = new byte[size];
+
+System.arraycopy(a, 0, bytes, 0, a.length);
+System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+return new Slice(bytes);
+  }
+
+  public static Slice concatenate(Slice a, byte[] b)
+  {
+int size = a.length + b.length;
+byte[] bytes = new byte[size];
+
+System.arraycopy(a, a.offset, bytes, 0, a.length);
--- End diff --

This should be a.buffer instead of a: the source argument has to be the 
backing byte array, not the Slice itself.
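
A minimal sketch of the corrected copy, using a hypothetical stand-in for Slice and returning a plain byte[] for brevity (the method in the diff returns a Slice and is cut off after the first copy):

```java
// Hypothetical stand-in mirroring the buffer/offset/length fields used in the diff.
final class SliceLike
{
  final byte[] buffer;
  final int offset;
  final int length;

  SliceLike(byte[] buffer, int offset, int length)
  {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }
}

final class ConcatSketch
{
  static byte[] concatenate(SliceLike a, byte[] b)
  {
    byte[] bytes = new byte[a.length + b.length];

    // Copy from the backing array. Passing the slice object itself still compiles,
    // because the parameter type is Object, but fails at runtime with ArrayStoreException.
    System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
    System.arraycopy(b, 0, bytes, a.length, b.length);
    return bytes;
  }
}
```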




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68559266
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
 ---
@@ -0,0 +1,191 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+public class SpillableByteMapImpl implements 
Spillable.SpillableByteMap, Spillable.SpillableComponent
+{
+  @NotNull
+  private SpillableStateStore store;
+  @NotNull
+  private byte[] identifier;
+  private long bucket;
+  @NotNull
+  private Serde serdeKey;
+  @NotNull
+  private Serde serdeValue;
+
+  private int size = 0;
+
+  private transient WindowBoundedMapCache cache = new 
WindowBoundedMapCache<>();
+  private transient MutableInt tempOffset = new MutableInt();
+
+  private SpillableByteMapImpl()
+  {
+//for kryo
+  }
+
+  public SpillableByteMapImpl(SpillableStateStore store, byte[] 
identifier, long bucket, Serde serdeKey,
+  Serde serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+  }
+
+  @Override
+  public int size()
+  {
+return size;
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+return size == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object o)
+  {
+return get(o) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object o)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public V get(Object o)
+  {
+K key = (K)o;
+
+if (cache.getRemovedKeys().contains(key)) {
+  return null;
+}
+
+V val = cache.get(key);
+
+if (val != null) {
+  return val;
+}
+
+Slice valSlice = store.getSync(bucket, 
SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+
+if (valSlice == null || valSlice == BucketedState.EXPIRED || 
valSlice.length == 0) {
+  return null;
+}
+
+tempOffset.setValue(valSlice.offset + identifier.length);
+return serdeValue.deserialize(valSlice, tempOffset);
+  }
+
+  @Override
+  public V put(K k, V v)
+  {
+V value = get(k);
+
+if (value == null) {
+  size++;
+}
+
+cache.put(k, v);
+
+return value;
+  }
+
+  @Override
+  public V remove(Object o)
+  {
+V value = get(o);
+
+if (value != null) {
+  size--;
+}
+
+cache.remove((K)o);
+
+return value;
+  }
+
+  @Override
+  public void putAll(Map map)
+  {
+for (Map.Entry entry : map.entrySet()) {
+  put(entry.getKey(), entry.getValue());
+}
+  }
+
+  @Override
+  public void clear()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set> entrySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
--- End diff --

I think store.setup() has to be called in the setup() method, because 
SpillableStateStore extends the Component interface. The same applies to 
the other lifecycle methods: beginWindow(), endWindow() and teardown().
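
The delegation could look roughly like this. The interface and both classes are hypothetical stand-ins, and the real callbacks take a context argument that is omitted here:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle interface standing in for the callbacks named above.
interface LifecycleSketch
{
  void setup();

  void beginWindow(long windowId);

  void endWindow();

  void teardown();
}

// Store stand-in that only records which lifecycle calls it received.
final class LoggingStoreSketch implements LifecycleSketch
{
  final List<String> log = new ArrayList<>();

  @Override
  public void setup()
  {
    log.add("setup");
  }

  @Override
  public void beginWindow(long windowId)
  {
    log.add("beginWindow:" + windowId);
  }

  @Override
  public void endWindow()
  {
    log.add("endWindow");
  }

  @Override
  public void teardown()
  {
    log.add("teardown");
  }
}

// Spillable structure stand-in that forwards every lifecycle call to its store.
final class DelegatingMapSketch implements LifecycleSketch
{
  private final LifecycleSketch store;

  DelegatingMapSketch(LifecycleSketch store)
  {
    this.store = store;
  }

  @Override
  public void setup()
  {
    store.setup();
  }

  @Override
  public void beginWindow(long windowId)
  {
    store.beginWindow(windowId);
  }

  @Override
  public void endWindow()
  {
    store.endWindow();
  }

  @Override
  public void teardown()
  {
    store.teardown();
  }
}
```

If several structures share one store, forwarding like this would call setup() on the store more than once, so either the store has to tolerate repeated calls or the caller has to own the store's lifecycle.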




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68559252
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements 
Spillable.SpillableByteArrayListMultimap,
+Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, 
(byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new 
WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, 
byte[] identifier, long bucket,
+  Serde serdeKey,
+  Serde serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+map = new SpillableByteMapImpl(store, identifier, bucket, new 
PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+if (spillableArrayList == null) {
+  Slice keyPrefix = serdeKey.serialize(key);
+  Integer size = map.get(SliceUtils.concatenate(keyPrefix, 
SIZE_KEY_SUFFIX));
+
+  if (size == null) {
+return null;
+  }
+
+  spillableArrayList = new SpillableArrayListImpl(bucket, 
keyPrefix.buffer, store, serdeValue);
+  spillableArrayList.setSize(size);
+
+  cache.put(key, spillableArrayList);
+}
+
+return spillableArrayList;
+  }
+
+  @Override
+  public Set keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection> entries()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List removeAll(@Nullable Object key)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+return map.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+return 
map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), 
SIZE_KEY_SUFFIX));
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object 
value)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {
+SpillableArrayListImpl spillableArrayList = getHelper(key);
+

[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-21 Thread ilooner
GitHub user ilooner opened a pull request:

https://github.com/apache/apex-malhar/pull/324

Spillable Datastructures PR for review only



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ilooner/incubator-apex-malhar 
APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #324


commit c2c3f0acfcdf033a0e3044967ab3f8048f719259
Author: Timothy Farkas 
Date:   2016-06-05T00:11:20Z

- Intermediate commit.

commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8
Author: Timothy Farkas 
Date:   2016-06-13T06:03:21Z

Intermediate commit

commit 60acf68b96f2145af3d90b410c6b20613347f881
Author: Timothy Farkas 
Date:   2016-06-21T06:58:09Z

Intermediate commit



