[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73969296

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
```diff
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+/**
+ * Implementations of this interface are used by Spillable datastructures to spill data to disk.
+ */
+public interface SpillableStateStore extends BucketedState, Component,
+    Operator.CheckpointNotificationListener, WindowListener
+{
```
--- End diff --

Can we add ```hasBeenSetup()``` to this? I see in the tests that the ```setup()```

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
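The comment above is cut off, so its exact motivation cannot be recovered. One plausible reading is that several Spillable structures share one store and should not call `setup()` on it twice. A minimal sketch under that assumption follows. `SetupAwareStore`, `InMemSetupAwareStore` and `StoreSetupHelper` are hypothetical, simplified stand-ins, not classes from the PR. The real `SpillableStateStore` also extends `BucketedState`, `Component`, `CheckpointNotificationListener` and `WindowListener`, and the real `Component.setup()` takes a context argument, which is omitted here.

```java
// Hypothetical, simplified stand-in for SpillableStateStore exposing the proposed hasBeenSetup().
interface SetupAwareStore
{
  void setup();

  void teardown();

  /** Returns true after setup() has run and before teardown() is called. */
  boolean hasBeenSetup();
}

// Hypothetical in-memory store that tracks its own lifecycle state.
class InMemSetupAwareStore implements SetupAwareStore
{
  private boolean setupDone = false;
  private int setupCalls = 0;

  @Override
  public void setup()
  {
    setupCalls++;
    setupDone = true;
  }

  @Override
  public void teardown()
  {
    setupDone = false;
  }

  @Override
  public boolean hasBeenSetup()
  {
    return setupDone;
  }

  /** Number of times setup() actually ran; exposed only for this demonstration. */
  int getSetupCalls()
  {
    return setupCalls;
  }
}

class StoreSetupHelper
{
  /** Sets up a (possibly shared) store only if it has not been set up yet. */
  static void ensureSetup(SetupAwareStore store)
  {
    if (!store.hasBeenSetup()) {
      store.setup();
    }
  }
}
```

With a flag like this, each structure sharing the store can call `ensureSetup` without initializing the store a second time.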
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73966670

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java ---
```diff
@@ -0,0 +1,324 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
+ * @param The type of object stored in the {@link SpillableArrayListImpl}.
+ */
+@DefaultSerializer(FieldSerializer.class)
+public class SpillableArrayListImpl implements Spillable.SpillableArrayList, Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  private long bucketId;
```
--- End diff --

It seems ```bucketId``` and ```prefix``` are unused. Also, please add a getter/setter for ```batchSize```.
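A bean-style getter/setter pair for the batch size could look roughly like the sketch below. `SpillableListBatchConfig` is a hypothetical, trimmed-down stand-in for `SpillableArrayListImpl` that keeps only the property the review asks for (the unused `bucketId`/`prefix` fields are gone). The positivity check is an assumption about what a sensible batch size is; Guava's `Preconditions.checkArgument`, already imported in the quoted file, would serve equally well, but a plain exception keeps the snippet self-contained.

```java
// Hypothetical stand-in for SpillableArrayListImpl showing only the requested batchSize property.
class SpillableListBatchConfig
{
  public static final int DEFAULT_BATCH_SIZE = 1000;

  private int batchSize = DEFAULT_BATCH_SIZE;

  public int getBatchSize()
  {
    return batchSize;
  }

  public void setBatchSize(int batchSize)
  {
    // Assumed validation: a batch must hold at least one element.
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
    }
    this.batchSize = batchSize;
  }
}
```

A rejected value leaves the previous batch size in place, so a bad configuration fails fast instead of silently producing empty batches.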
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73941677

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java ---
```diff
@@ -0,0 +1,39 @@
+package org.apache.apex.malhar.lib.state.spillable;
```
--- End diff --

I think this should be moved to a more generic package, possibly in Apex Core, but it's okay to keep it here for now. Just annotate it with InterfaceStability.Unstable.
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73809225

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
```diff
@@ -0,0 +1,292 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+@DefaultSerializer(FieldSerializer.class)
+public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private transient WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+  private transient boolean isRunning = false;
+  private transient boolean isInWindow = false;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  @NotNull
+  private SpillableByteMapImpl map;
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  private SpillableByteArrayListMultimapImpl()
+  {
+    // for kryo
+  }
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde serdeKey,
+      Serde serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keySlice = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
```
--- End diff --

@davidyan74 No, the Spillable map prefixes each key with its identifier internally, so it is not necessary to prefix the keys manually.
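The reply's point is that the Spillable map itself prefixes every key with its identifier, so the multimap's size keys (`serialized key + SIZE_KEY_SUFFIX`) cannot collide with entries written through a map that has a different identifier, even when both share one store bucket. The sketch below models that idea in plain Java. `SharedBucket`, `PrefixingByteMap` and `SizeKeys` are hypothetical classes, not the PR's `SpillableByteMapImpl` or `SliceUtils`. The sketch also assumes identifiers are fixed-length (or otherwise prefix-free): with variable-length identifiers, identifier `a` plus key `bc` would yield the same bytes as identifier `ab` plus key `c`.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one bucket of a shared state store: raw byte[] keys to values.
class SharedBucket
{
  // ByteBuffer provides content-based equals/hashCode, which byte[] lacks.
  private final Map<ByteBuffer, Integer> entries = new HashMap<>();

  void put(byte[] key, int value)
  {
    entries.put(ByteBuffer.wrap(key.clone()), value);
  }

  Integer get(byte[] key)
  {
    return entries.get(ByteBuffer.wrap(key));
  }
}

// Hypothetical stand-in for a Spillable byte map that prefixes every key with its identifier.
class PrefixingByteMap
{
  private final SharedBucket bucket;
  private final byte[] identifier;

  PrefixingByteMap(SharedBucket bucket, byte[] identifier)
  {
    this.bucket = bucket;
    this.identifier = identifier.clone();
  }

  void put(byte[] key, int value)
  {
    bucket.put(SizeKeys.concat(identifier, key), value);
  }

  Integer get(byte[] key)
  {
    return bucket.get(SizeKeys.concat(identifier, key));
  }
}

class SizeKeys
{
  // Same three zero bytes as SIZE_KEY_SUFFIX in the quoted diff.
  static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};

  static byte[] concat(byte[] a, byte[] b)
  {
    byte[] result = new byte[a.length + b.length];
    System.arraycopy(a, 0, result, 0, a.length);
    System.arraycopy(b, 0, result, a.length, b.length);
    return result;
  }

  // Key under which a multimap would store the list size for serializedKey (no identifier added here).
  static byte[] sizeKey(byte[] serializedKey)
  {
    return concat(serializedKey, SIZE_KEY_SUFFIX);
  }
}
```

Two maps with identifiers `{1}` and `{2}` can both store a size under the same `sizeKey(...)` bytes and each reads back its own value, because the bucket actually sees `{1, ...}` and `{2, ...}`.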
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
GitHub user ilooner reopened a pull request:

https://github.com/apache/apex-malhar/pull/324

Spillable Datastructures PR for review only

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #324

```
commit b72ee18e6f7cf0a9d30b195439912d96acafb3b4
Author: Timothy Farkas
Date: 2016-07-17T21:32:34Z

    Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap

commit 184653a23f662f78e7e6a7e1d53ffb0efbdb7127
Author: Timothy Farkas
Date: 2016-07-18T01:56:41Z

    Added SpillableComplexComponentImpl

commit 9f17b4ba9233e3f46746505941a378236193f719
Author: Timothy Farkas
Date: 2016-07-18T03:29:07Z

    Added propagating callbacks to store

commit fe41f0c20235aac3ca57facc2491ecfba11d20a7
Author: Timothy Farkas
Date: 2016-07-21T06:38:48Z

    Added checkpoint callbacks to spillable complex components
    Added some half completed tests

commit e5cacbbc9e1ae18f5cf0938f13377f75a3a99cd2
Author: Timothy Farkas
Date: 2016-07-24T04:09:54Z

    Finished unit test for SpillableArrayListMultimap

commit dc258b8900688264f349307737b59b096dbc3d2b
Author: Timothy Farkas
Date: 2016-07-24T05:19:37Z

    Added unit test which uses managed state

commit ed9924b810a76c39404701da81f4753ab68af5a5
Author: Timothy Farkas
Date: 2016-07-24T06:55:18Z

    Finished adding managed state tests for SpillableByteMap

commit 43da17d9633dc2405b596b25697b6b4b0baef69f
Author: Timothy Farkas
Date: 2016-07-24T16:31:53Z

    Added ManagedStateTests For SpillableArrayList

commit 1343ccf4ccc5099d7abcf7f99ab2ed648baeef08
Author: Timothy Farkas
Date: 2016-07-24T16:49:00Z

    Added managed state tests for SpillableArrayListMultimap

commit 57c4e5e3c3e019613f31753be5052c32ba762e53
Author: Timothy Farkas
Date: 2016-07-24T16:57:35Z

    Added ManagedStateTest for SpillableComplexComponent

commit ebc822c4ba8752f464395cf01221addc6d34
Author: Timothy Farkas
Date: 2016-07-27T06:38:51Z

    Fixed broken containsKey method and added test

commit 983924215af2af7b683a3b654c320b7026b3165d
Author: David Yan
Date: 2016-07-28T17:27:01Z

    isolated unit test for spillablebytemapimpl recovery

commit 68ca96332dce24aa739e547ba87058dc35bb56df
Author: Timothy Farkas
Date: 2016-07-29T08:10:28Z

    Testing

commit a7796d61d80111faa24543ddc05b17c2944138be
Author: Timothy Farkas
Date: 2016-07-31T18:44:23Z

    Added recovery test for SpillableArrayListMultimap and added license headers

commit b3b4aacf44127c79553fc289f1f9e1f7c17b2ec6
Author: Timothy Farkas
Date: 2016-08-01T03:34:18Z

    - Fixed bug
    - Added more recovery tests
```
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73079738

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
```diff
@@ -0,0 +1,33 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
```
--- End diff --

The WindowedOperator only needs a storage it can grab data from, and that storage's interface is defined by the WindowedStorage interfaces. The user of WindowedOperator can set the storage implementation before setup(). That's why the operator does not have any knowledge of Spillable components.

Take a look at the interface WindowedStorage here: https://github.com/davidyan74/apex-malhar/blob/66da57e08e31edb15e83a050f3940dd276851c23/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java

If a user of the WindowedOperator wants a custom storage implementation, they can simply implement those interfaces without having to deal with Spillable components.
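The decoupling described in this comment can be sketched as follows. `SimpleWindowedStorage`, `InMemoryWindowedStorage` and `CountingWindowedOperator` are hypothetical and deliberately small; the actual `WindowedStorage` interfaces in the linked branch define their own methods, which may differ. The point illustrated is that the operator compiles against the storage interface only, so a Spillable-backed store and a plain in-memory store are interchangeable as long as they implement that interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical storage contract; the real WindowedStorage interfaces may look different.
interface SimpleWindowedStorage<K, V>
{
  void put(long windowId, K key, V value);

  V get(long windowId, K key);

  void remove(long windowId);
}

// One possible implementation; a Spillable-backed one could be plugged in instead.
class InMemoryWindowedStorage<K, V> implements SimpleWindowedStorage<K, V>
{
  private final Map<Long, Map<K, V>> data = new HashMap<>();

  @Override
  public void put(long windowId, K key, V value)
  {
    data.computeIfAbsent(windowId, w -> new HashMap<>()).put(key, value);
  }

  @Override
  public V get(long windowId, K key)
  {
    Map<K, V> window = data.get(windowId);
    return window == null ? null : window.get(key);
  }

  @Override
  public void remove(long windowId)
  {
    data.remove(windowId);
  }
}

// Hypothetical operator that depends only on the storage interface, never on Spillable classes.
class CountingWindowedOperator
{
  private SimpleWindowedStorage<String, Long> storage = new InMemoryWindowedStorage<>();

  // Expected to be called before setup(), as the comment describes.
  void setStorage(SimpleWindowedStorage<String, Long> storage)
  {
    this.storage = storage;
  }

  void accumulate(long windowId, String key)
  {
    Long current = storage.get(windowId, key);
    storage.put(windowId, key, current == null ? 1L : current + 1);
  }

  Long count(long windowId, String key)
  {
    return storage.get(windowId, key);
  }

  void purge(long windowId)
  {
    storage.remove(windowId);
  }
}
```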
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73079717

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
```diff
@@ -0,0 +1,33 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
```
--- End diff --

@ilooner Why does this not extend SpillableComponent?
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73076627

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
```diff
@@ -0,0 +1,33 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
```
--- End diff --

Maybe I am missing something, but please bear with my questions:

1. Why should a WindowedOperator not have any knowledge of SpillableComponents?
2. Let's say the above is the case; then what is going to be the API of WindowedOperator with respect to storage? If WindowedOperator is specifying the contract for a pluggable storage, then an implementation of SpillableStateStore can implement that and therefore can be set on WindowedOperator. However, I think SpillableStateStore itself can be that contract.
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73075544

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
```diff
@@ -0,0 +1,33 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
```
--- End diff --

Also, as you can see, we are duplicating the beginWindow and endWindow method declarations in multiple interfaces (see SpillableComponent and SpillableStateStore). This can be avoided with a single generic WindowListener interface.
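The suggestion amounts to declaring the window callbacks once and letting both interfaces inherit them. A hedged sketch follows. The `WindowListener` signatures here mirror `beginWindow(long)`/`endWindow()` from Apex's `Operator` interface; `SpillableComponentSketch`, `SpillableStateStoreSketch`, the recording classes and `WindowCallbacks` are hypothetical, heavily trimmed stand-ins rather than the PR's types.

```java
import java.util.ArrayList;
import java.util.List;

// One generic callback interface, declared once.
interface WindowListener
{
  void beginWindow(long windowId);

  void endWindow();
}

// Both stand-ins inherit the callbacks instead of redeclaring beginWindow/endWindow.
interface SpillableComponentSketch extends WindowListener
{
  void teardown();
}

interface SpillableStateStoreSketch extends WindowListener
{
  void put(long bucketId, byte[] key, byte[] value);
}

// Code that only knows WindowListener can drive either kind of object.
class WindowCallbacks
{
  static void runWindow(long windowId, List<? extends WindowListener> listeners)
  {
    for (WindowListener listener : listeners) {
      listener.beginWindow(windowId);
    }
    for (WindowListener listener : listeners) {
      listener.endWindow();
    }
  }
}

class RecordingStore implements SpillableStateStoreSketch
{
  final List<String> events = new ArrayList<>();

  @Override public void beginWindow(long windowId) { events.add("store begin " + windowId); }
  @Override public void endWindow() { events.add("store end"); }
  @Override public void put(long bucketId, byte[] key, byte[] value) { events.add("put " + bucketId); }
}

class RecordingComponent implements SpillableComponentSketch
{
  final List<String> events = new ArrayList<>();

  @Override public void beginWindow(long windowId) { events.add("component begin " + windowId); }
  @Override public void endWindow() { events.add("component end"); }
  @Override public void teardown() { events.add("teardown"); }
}
```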
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73075245

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
```diff
@@ -0,0 +1,33 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+public interface SpillableStateStore extends BucketedState, Component,
```
--- End diff --

The reason I brought this up is that the WindowedOperator should have no knowledge of any Spillable components. Yet I need a way for the WindowedOperator to call SpillableStateStore.beginWindow() and endWindow().
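One way to reconcile the two constraints in this comment (the operator must not know about Spillable types, yet the store needs `beginWindow()`/`endWindow()`) is for the operator to forward its window callbacks to whatever storage it was given, when that storage implements a generic `WindowListener`. This is a hedged sketch: `KeyValueStorage`, `SimpleStorage`, `WindowAwareStorage` and `StorageAwareOperator` are hypothetical names, and the `instanceof` check is only one option; another would be to make the storage interface itself extend `WindowListener`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Generic window-callback interface, as suggested in the review.
interface WindowListener
{
  void beginWindow(long windowId);

  void endWindow();
}

// Hypothetical storage contract the operator is written against.
interface KeyValueStorage
{
  void put(String key, long value);

  Long get(String key);
}

// Plain storage that needs no window callbacks.
class SimpleStorage implements KeyValueStorage
{
  private final Map<String, Long> data = new HashMap<>();

  @Override public void put(String key, long value) { data.put(key, value); }
  @Override public Long get(String key) { return data.get(key); }
}

// Storage that also wants window callbacks, e.g. one backed by a spillable store.
class WindowAwareStorage extends SimpleStorage implements WindowListener
{
  final List<String> events = new ArrayList<>();

  @Override public void beginWindow(long windowId) { events.add("begin " + windowId); }
  @Override public void endWindow() { events.add("end"); }
}

// Hypothetical operator: forwards callbacks without referencing any Spillable type.
class StorageAwareOperator
{
  private final KeyValueStorage storage;

  StorageAwareOperator(KeyValueStorage storage)
  {
    this.storage = storage;
  }

  void beginWindow(long windowId)
  {
    if (storage instanceof WindowListener) {
      ((WindowListener)storage).beginWindow(windowId);
    }
  }

  void endWindow()
  {
    if (storage instanceof WindowListener) {
      ((WindowListener)storage).endWindow();
    }
  }
}
```

Storage that does not care about windows keeps working unchanged, since the operator simply skips the forwarding for it.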
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73073791

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
```diff
@@ -0,0 +1,292 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+@DefaultSerializer(FieldSerializer.class)
+public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private transient WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+  private transient boolean isRunning = false;
+  private transient boolean isInWindow = false;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  @NotNull
+  private SpillableByteMapImpl map;
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  private SpillableByteArrayListMultimapImpl()
+  {
+    // for kryo
+  }
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde serdeKey,
+      Serde serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keySlice = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
```
--- End diff --

I see that you removed the identifier from the key that stores the size. Would this create a conflict between different identifiers?
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73046023

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java ---
```diff
@@ -0,0 +1,37 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+/**
+ * Classes implementing this interface can be used as generators for identifiers for Spillable data structures.
+ */
+public interface SpillableIdentifierGenerator
+{
+  /**
+   * Generators the next valid identifier for a Spillable data structure.
+   * @return A byte array which represents the next valid identifier for a Spillable data structure.
+   */
+  public byte[] next();
```
--- End diff --

nit: public is redundant.
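Methods declared in a Java interface are implicitly `public` (and `abstract`), so the modifier on `next()` adds nothing. The sketch below shows the interface with the nit applied and the javadoc verb corrected to "Generates". The interface is package-private here only so the snippet compiles as a single file, and `SequentialIdentifierGenerator` is a hypothetical counter-based implementation, not one from the PR. The implementing method must still be declared `public`, because an implementation cannot narrow the access of an interface method.

```java
import java.nio.ByteBuffer;

/**
 * Classes implementing this interface can be used as generators for identifiers for Spillable data structures.
 */
interface SpillableIdentifierGenerator
{
  /**
   * Generates the next valid identifier for a Spillable data structure.
   * @return A byte array which represents the next valid identifier for a Spillable data structure.
   */
  byte[] next(); // implicitly public and abstract
}

// Hypothetical generator: encodes an increasing counter as a fixed-length, 4-byte big-endian array.
class SequentialIdentifierGenerator implements SpillableIdentifierGenerator
{
  private int counter = 0;

  @Override
  public byte[] next() // must remain public in the implementing class
  {
    return ByteBuffer.allocate(Integer.BYTES).putInt(counter++).array();
  }
}
```

Fixed-length identifiers like these also keep "identifier + key" concatenations unambiguous when a store prefixes keys with identifiers.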
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73032779

--- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemorySpillableStateStoreTest.java ---
```diff
@@ -0,0 +1,63 @@
+package org.apache.apex.malhar.lib.state.spillable.inmem;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Created by tfarkas on 6/6/16.
```
--- End diff --

javadoc
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73032644 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java --- @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +/** + * Created by tfarkas on 7/17/16. --- End diff -- javadoc
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73032590 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java --- @@ -0,0 +1,487 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.KryoCloneUtils; + +/** + * Created by tfarkas on 6/6/16. --- End diff -- javadoc
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73032527 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java --- @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.List; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 7/17/16. 
--- End diff -- javadoc
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73032414 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java --- @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.util.TestUtils; + +/** + * Created by tfarkas on 6/5/16. --- End diff -- javadoc
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73032354 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java --- @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import com.datatorrent.netlet.util.Slice; + +public class SliceUtils --- End diff -- javadoc for each of the utility methods
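As an illustration of the per-method javadoc requested for SliceUtils, here is a stdlib-only sketch. `ByteArrayUtilsSketch.concatenate` is hypothetical. It works on plain `byte[]` rather than `com.datatorrent.netlet.util.Slice`, and its name and behavior are assumptions, not the real SliceUtils API. It shows the level of detail such a javadoc could cover: argument roles, result length, and failure behavior.

```java
import java.util.Arrays;

/**
 * Hypothetical, stdlib-only illustration of a documented byte-array utility.
 * The real SliceUtils methods operate on Slice and may differ.
 */
final class ByteArrayUtilsSketch
{
  private ByteArrayUtilsSketch()
  {
    // static utility class, not meant to be instantiated
  }

  /**
   * Concatenates two byte arrays into a newly allocated array.
   *
   * @param first the bytes placed at the start of the result; must not be null
   * @param second the bytes appended after {@code first}; must not be null
   * @return a new array of length {@code first.length + second.length}
   * @throws NullPointerException if either argument is null
   */
  static byte[] concatenate(byte[] first, byte[] second)
  {
    byte[] result = Arrays.copyOf(first, first.length + second.length);
    System.arraycopy(second, 0, result, first.length, second.length);
    return result;
  }

  public static void main(String[] args)
  {
    byte[] joined = concatenate(new byte[]{1, 2}, new byte[]{3});
    System.out.println(Arrays.toString(joined)); // [1, 2, 3]
  }
}
```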
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73032245 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java --- @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang3.mutable.MutableInt; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/11/16. --- End diff -- javadoc
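Ideally, a class-level javadoc for a list serde states the wire format. The following is a hypothetical sketch that assumes a simple format: a 4-byte element count, then a 4-byte length and the UTF-8 bytes for each element. The format actually used by SerdeListSlice is not visible in this diff. A `ByteBuffer`'s position stands in for the `MutableInt` offset that the real class imports, so values can be read back to back.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Serializes a list of strings as a 4-byte big-endian element count followed,
 * for each element, by a 4-byte length and that many UTF-8 bytes.
 *
 * <p>Hypothetical sketch: the real SerdeListSlice format may differ.</p>
 */
final class StringListSerdeSketch
{
  /**
   * @param list the strings to serialize; neither the list nor its elements may be null
   * @return the serialized bytes
   */
  static byte[] serialize(List<String> list)
  {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(list.size());
      for (String s : list) {
        byte[] encoded = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(encoded.length);
        out.write(encoded);
      }
    } catch (IOException e) {
      // An in-memory stream does not fail, but DataOutputStream declares IOException.
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /**
   * Reads one list starting at the buffer's current position and advances the
   * position past the consumed bytes.
   *
   * @param buffer a buffer positioned at the start of a serialized list
   * @return the deserialized list
   */
  static List<String> deserialize(ByteBuffer buffer)
  {
    int size = buffer.getInt();
    List<String> list = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      byte[] encoded = new byte[buffer.getInt()];
      buffer.get(encoded);
      list.add(new String(encoded, StandardCharsets.UTF_8));
    }
    return list;
  }

  public static void main(String[] args)
  {
    byte[] serialized = serialize(Arrays.asList("a", "bc"));
    System.out.println(deserialize(ByteBuffer.wrap(serialized))); // [a, bc]
  }
}
```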
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73032134 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java --- @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.commons.lang3.mutable.MutableInt; + +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/12/16. --- End diff -- javadoc
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73031870 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java --- @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.apache.apex.malhar.lib.state.BucketedState; + +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +public interface SpillableStateStore extends BucketedState, Component, --- End diff -- javadoc
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73031644 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java --- @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + +public class SequentialSpillableIdentifierGenerator implements SpillableIdentifierGenerator --- End diff -- javadoc
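Given its `Set` and `Preconditions` imports, one plausible reading of SequentialSpillableIdentifierGenerator is that it hands out increasing identifiers while skipping ones already taken. The sketch below assumes that behavior. The `register(int)` method and the 4-byte big-endian encoding are hypothetical, chosen only to show what the requested javadoc could describe. One design point does apply generally: taken identifiers are tracked as `Integer`, because `byte[]` uses identity-based `equals`/`hashCode`, and a `Set<byte[]>` therefore cannot detect duplicates.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Generates identifiers for spillable data structures by counting upward from
 * zero and skipping any identifier that was registered explicitly.
 *
 * <p>Hypothetical sketch; not the SequentialSpillableIdentifierGenerator API.</p>
 */
final class SequentialIdGeneratorSketch
{
  // Integer rather than byte[]: arrays compare by identity, not content.
  private final Set<Integer> registered = new HashSet<>();
  private int nextCandidate = 0;

  /**
   * Reserves an identifier so that {@link #next()} never returns it.
   *
   * @throws IllegalArgumentException if the id may already have been generated
   *         or is already registered
   */
  void register(int id)
  {
    if (id < nextCandidate) {
      throw new IllegalArgumentException("identifier " + id + " may already have been generated");
    }
    if (!registered.add(id)) {
      throw new IllegalArgumentException("identifier " + id + " is already registered");
    }
  }

  /**
   * @return the next identifier not previously generated or registered,
   *         encoded as 4 big-endian bytes
   */
  byte[] next()
  {
    while (registered.contains(nextCandidate)) {
      nextCandidate++;
    }
    int id = nextCandidate++;
    return new byte[]{(byte)(id >>> 24), (byte)(id >>> 16), (byte)(id >>> 8), (byte)id};
  }

  public static void main(String[] args)
  {
    SequentialIdGeneratorSketch generator = new SequentialIdGeneratorSketch();
    generator.register(1);
    System.out.println(Arrays.toString(generator.next())); // [0, 0, 0, 0]
    System.out.println(Arrays.toString(generator.next())); // [0, 0, 0, 2]
  }
}
```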
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73031608 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java --- @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.mutable.MutableInt; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +@DefaultSerializer(FieldSerializer.class) +public class SpillableByteMapImpl implements Spillable.SpillableByteMap, Spillable.SpillableComponent, --- End diff -- javadoc
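A javadoc for SpillableByteMapImpl would likely need to explain why the class takes an identifier. A common way for several data structures to share one store is to prefix every stored key with the structure's identifier. The sketch below assumes that approach. `PrefixedMapSketch` and its in-memory `SharedStore` are hypothetical stand-ins for the PR's classes. The single-byte identifiers mirror `ID1`/`ID2` in the tests.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: several maps share one key/value store by prefixing
 * their serialized keys with their own identifier, so equal user keys in
 * different maps never collide. Assumes all identifiers have the same length;
 * otherwise one map's prefix plus key could equal another map's.
 */
final class PrefixedMapSketch
{
  /** Stand-in for the shared store; ByteBuffer keys compare by content. */
  static final class SharedStore
  {
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();

    void put(byte[] key, byte[] value)
    {
      data.put(ByteBuffer.wrap(key), value);
    }

    byte[] get(byte[] key)
    {
      return data.get(ByteBuffer.wrap(key));
    }
  }

  private final SharedStore store;
  private final byte[] identifier;

  PrefixedMapSketch(SharedStore store, byte[] identifier)
  {
    this.store = store;
    this.identifier = identifier.clone();
  }

  /** Builds the store key: identifier bytes followed by the UTF-8 key bytes. */
  private byte[] storeKey(String key)
  {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    byte[] result = new byte[identifier.length + keyBytes.length];
    System.arraycopy(identifier, 0, result, 0, identifier.length);
    System.arraycopy(keyBytes, 0, result, identifier.length, keyBytes.length);
    return result;
  }

  void put(String key, String value)
  {
    store.put(storeKey(key), value.getBytes(StandardCharsets.UTF_8));
  }

  String get(String key)
  {
    byte[] value = store.get(storeKey(key));
    return value == null ? null : new String(value, StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    SharedStore store = new SharedStore();
    PrefixedMapSketch map1 = new PrefixedMapSketch(store, new byte[]{0});
    PrefixedMapSketch map2 = new PrefixedMapSketch(store, new byte[]{1});
    map1.put("x", "1");
    map2.put("x", "2");
    System.out.println(map1.get("x") + " " + map2.get("x")); // 1 2
  }
}
```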
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72913406 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java --- @@ -0,0 +1,458 @@ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.google.common.collect.Lists; + +/** + * Created by tfarkas on 6/19/16. + */ +public class SpillableArrayListImplTest +{ + public static final byte[] ID1 = new byte[]{(byte)0}; + public static final byte[] ID2 = new byte[]{(byte)1}; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleAddGetAndSetTest1() + { +InMemSpillableStateStore store = new InMemSpillableStateStore(); + +simpleAddGetAndSetTest1Helper(store); + } + + @Test + public void simpleAddGetAndSetManagedStateTest1() + { +simpleAddGetAndSetTest1Helper(testMeta.store); + } + + public void simpleAddGetAndSetTest1Helper(SpillableStateStore store) + { +SpillableArrayListImpl list = new SpillableArrayListImpl<>(0L, ID1, store, +new SerdeStringSlice(), 1); + +store.setup(testMeta.operatorContext); +list.setup(testMeta.operatorContext); + +long windowId = 0L; +store.beginWindow(windowId); +list.beginWindow(windowId); +windowId++; --- End diff -- Yes, will make the change.
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72829289 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java --- @@ -411,45 +441,48 @@ public void recoveryTest() testMeta.store.setup(testMeta.operatorContext); map1.setup(testMeta.operatorContext); -map1.beginWindow(1000); +System.out.println("0"); +testMeta.store.beginWindow(0); +map1.beginWindow(0); map1.put("x", "1"); map1.put("y", "2"); map1.put("z", "3"); map1.endWindow(); -map1.beginWindow(1001); +testMeta.store.endWindow(); + +System.out.println("1"); +testMeta.store.beginWindow(1); +map1.beginWindow(1); map1.put("x", "4"); map1.put("y", "5"); map1.endWindow(); - -testMeta.store.beforeCheckpoint(1001); -testMeta.store.checkpointed(1001); +testMeta.store.endWindow(); +testMeta.store.beforeCheckpoint(1); +testMeta.store.checkpointed(1); SpillableByteMapImpl clonedMap1 = KryoCloneUtils.cloneObject(map1); -map1.beginWindow(1002); -map1.put("x", "6"); -map1.put("y", "7"); -map1.endWindow(); - -Assert.assertEquals("6", map1.get("x")); -Assert.assertEquals("7", map1.get("y")); -Assert.assertEquals("3", map1.get("z")); - -map1.beginWindow(1003); -map1.put("x", "8"); -map1.put("y", "9"); +System.out.println("2"); +testMeta.store.beginWindow(2); +map1.beginWindow(2); +map1.put("x1", "6"); +map1.put("y1", "7"); map1.endWindow(); +testMeta.store.endWindow(); // simulating crash here map1.teardown(); testMeta.store.teardown(); +System.out.println("Recovering"); --- End diff -- nit: sys.out
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72829166 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java --- @@ -411,45 +441,48 @@ public void recoveryTest() testMeta.store.setup(testMeta.operatorContext); map1.setup(testMeta.operatorContext); -map1.beginWindow(1000); +System.out.println("0"); +testMeta.store.beginWindow(0); +map1.beginWindow(0); map1.put("x", "1"); map1.put("y", "2"); map1.put("z", "3"); map1.endWindow(); -map1.beginWindow(1001); +testMeta.store.endWindow(); + +System.out.println("1"); +testMeta.store.beginWindow(1); +map1.beginWindow(1); map1.put("x", "4"); map1.put("y", "5"); map1.endWindow(); - -testMeta.store.beforeCheckpoint(1001); -testMeta.store.checkpointed(1001); +testMeta.store.endWindow(); +testMeta.store.beforeCheckpoint(1); +testMeta.store.checkpointed(1); SpillableByteMapImpl clonedMap1 = KryoCloneUtils.cloneObject(map1); -map1.beginWindow(1002); -map1.put("x", "6"); -map1.put("y", "7"); -map1.endWindow(); - -Assert.assertEquals("6", map1.get("x")); -Assert.assertEquals("7", map1.get("y")); -Assert.assertEquals("3", map1.get("z")); - -map1.beginWindow(1003); -map1.put("x", "8"); -map1.put("y", "9"); +System.out.println("2"); +testMeta.store.beginWindow(2); +map1.beginWindow(2); +map1.put("x1", "6"); +map1.put("y1", "7"); map1.endWindow(); +testMeta.store.endWindow(); // simulating crash here map1.teardown(); testMeta.store.teardown(); +System.out.println("Recovering"); + map1 = clonedMap1; map1.getStore().setup(testMeta.operatorContext); map1.setup(testMeta.operatorContext); --- End diff -- If activation window = -1, then ManagedState will not care about the data saved after the activation window
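The activation-window point can be modelled with a toy store. This hypothetical sketch is not ManagedState. Following the comment above, it assumes that recovery purges data written after the activation window only when one is supplied, and that -1 means nothing is purged. Replaying the test's sequence shows why the recovery step in `recoveryTest()` would need a real activation window for the window-2 writes to disappear. In Apex this is presumably set via the `Context.OperatorContext.ACTIVATION_WINDOW_ID` attribute of the context passed to `setup()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model of the recovery behavior described in the review comment. Data
 * written after the activation window is discarded on recovery, but only when
 * an activation window is supplied; -1 (a fresh start) discards nothing.
 * Hypothetical sketch, not ManagedState's implementation.
 */
final class ActivationWindowSketch
{
  /** key -> (windowId -> value) for every write that reached "disk". */
  private final Map<String, TreeMap<Long, String>> persisted = new HashMap<>();
  private long currentWindowId;

  void beginWindow(long windowId)
  {
    currentWindowId = windowId;
  }

  void put(String key, String value)
  {
    persisted.computeIfAbsent(key, k -> new TreeMap<>()).put(currentWindowId, value);
  }

  /** Returns the most recent surviving value for the key, or null. */
  String get(String key)
  {
    TreeMap<Long, String> versions = persisted.get(key);
    return versions == null || versions.isEmpty() ? null : versions.lastEntry().getValue();
  }

  /** Simulates setup() after a crash with the given activation window id. */
  void recover(long activationWindowId)
  {
    if (activationWindowId == -1) {
      return; // fresh start: nothing is purged, so post-checkpoint writes stay visible
    }
    for (TreeMap<Long, String> versions : persisted.values()) {
      // tailMap(w, false) is a live view of windows > w; clearing it purges them.
      versions.tailMap(activationWindowId, false).clear();
    }
  }

  /** Replays the shape of recoveryTest(): checkpoint after window 1, crash after window 2. */
  static ActivationWindowSketch runScenario(long activationWindowId)
  {
    ActivationWindowSketch store = new ActivationWindowSketch();
    store.beginWindow(0);
    store.put("x", "1");
    store.beginWindow(1);
    store.put("x", "4");
    store.beginWindow(2);
    store.put("x1", "6");
    store.recover(activationWindowId);
    return store;
  }

  public static void main(String[] args)
  {
    System.out.println(runScenario(-1).get("x1")); // 6: window-2 data survives
    System.out.println(runScenario(1).get("x1"));  // null: window 2 was discarded
  }
}
```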
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72829218 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java --- @@ -411,45 +441,48 @@ public void recoveryTest() testMeta.store.setup(testMeta.operatorContext); map1.setup(testMeta.operatorContext); -map1.beginWindow(1000); +System.out.println("0"); +testMeta.store.beginWindow(0); +map1.beginWindow(0); map1.put("x", "1"); map1.put("y", "2"); map1.put("z", "3"); map1.endWindow(); -map1.beginWindow(1001); +testMeta.store.endWindow(); + +System.out.println("1"); --- End diff -- nit: System.out
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chandnisingh commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72828429 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java --- @@ -411,45 +441,48 @@ public void recoveryTest() testMeta.store.setup(testMeta.operatorContext); map1.setup(testMeta.operatorContext); -map1.beginWindow(1000); +System.out.println("0"); --- End diff -- Please remove the System.out calls.
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72123238 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java --- @@ -0,0 +1,262 @@ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/12/16. 
+ */ +public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap, +Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private int batchSize = DEFAULT_BATCH_SIZE; + + private WindowBoundedMapCache cache = new WindowBoundedMapCache<>(); + + @NotNull + private SpillableByteMapImpl map; + + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde serdeKey; + private Serde serdeValue; + + private boolean isRunning = false; + private boolean isInWindow = false; + + public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde serdeKey, + Serde serdeValue) + { +this.store = Preconditions.checkNotNull(store); +this.identifier = Preconditions.checkNotNull(identifier); +this.bucket = bucket; +this.serdeKey = Preconditions.checkNotNull(serdeKey); +this.serdeValue = Preconditions.checkNotNull(serdeValue); + +map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); --- End diff -- I got a ClassCastException due to this line in my unit tests: ``` java.lang.ClassCastException: com.datatorrent.netlet.util.Slice cannot be cast to [B at org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde.serialize(PassThruByteArraySliceSerde.java:10) at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.get(SpillableByteMapImpl.java:94) at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.containsKey(SpillableByteMapImpl.java:70) at org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl.containsKey(SpillableByteArrayListMultimapImpl.java:139) ``` Shouldn't the 4th parameter just be serdeKey instead of the PassThruByteArraySliceSerde?
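The reported `Slice cannot be cast to [B` failure is consistent with a generic serde receiving the wrong type through an unchecked call. The compiler-generated bridge method of a `Serde<byte[]>`-style implementation casts its argument to `byte[]` at runtime. The stripped-down sketch below reproduces that mechanism. `SimpleSerde`, `PassThruBytesSerde` and `FakeSlice` are hypothetical stand-ins, not the malhar `Serde`, `PassThruByteArraySliceSerde` or netlet `Slice` classes.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified serde; the real malhar Serde has a different shape. */
interface SimpleSerde<T>
{
  byte[] serialize(T object);
}

/** Pass-through serde: the object already is its serialized form. */
final class PassThruBytesSerde implements SimpleSerde<byte[]>
{
  @Override
  public byte[] serialize(byte[] object)
  {
    return object;
  }
}

/** Hypothetical wrapper standing in for a Slice-like type. */
final class FakeSlice
{
  final byte[] buffer;

  FakeSlice(byte[] buffer)
  {
    this.buffer = buffer;
  }
}

final class BridgeCastDemo
{
  @SuppressWarnings({"rawtypes", "unchecked"})
  static byte[] serializeRaw(SimpleSerde serde, Object key)
  {
    // The raw type defeats compile-time checking; the compiler-generated
    // bridge method serialize(Object) casts the argument to byte[] at runtime.
    return serde.serialize(key);
  }

  public static void main(String[] args)
  {
    SimpleSerde<byte[]> serde = new PassThruBytesSerde();
    byte[] key = "k".getBytes(StandardCharsets.UTF_8);
    System.out.println(serializeRaw(serde, key).length); // 1: a byte[] key passes through
    try {
      serializeRaw(serde, new FakeSlice(key));
    } catch (ClassCastException e) {
      System.out.println("ClassCastException: only byte[] keys may reach this serde");
    }
  }
}
```

Given this mechanism, the fix matches the reply that follows: callers of the inner map must pass already-serialized `byte[]` keys. Swapping the inner serde would not address it.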
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72124180 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java --- @@ -0,0 +1,262 @@ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/12/16. 
+ */ +public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap, +Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private int batchSize = DEFAULT_BATCH_SIZE; + + private WindowBoundedMapCache cache = new WindowBoundedMapCache<>(); + + @NotNull + private SpillableByteMapImpl map; + + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde serdeKey; + private Serde serdeValue; + + private boolean isRunning = false; + private boolean isInWindow = false; + + public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde serdeKey, + Serde serdeValue) + { +this.store = Preconditions.checkNotNull(store); +this.identifier = Preconditions.checkNotNull(identifier); +this.bucket = bucket; +this.serdeKey = Preconditions.checkNotNull(serdeKey); +this.serdeValue = Preconditions.checkNotNull(serdeValue); + +map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); --- End diff -- No, it shouldn't. That error existed before, but I fixed it in my latest commits. Do you have the latest? If so, I'll check tonight whether I can reproduce it in my tests.
Github user davidyan74 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72001616

--- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java ---
@@ -0,0 +1,458 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by tfarkas on 6/19/16.
+ */
+public class SpillableArrayListImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+  public static final byte[] ID2 = new byte[]{(byte)1};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleAddGetAndSetTest1()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleAddGetAndSetTest1Helper(store);
+  }
+
+  @Test
+  public void simpleAddGetAndSetManagedStateTest1()
+  {
+    simpleAddGetAndSetTest1Helper(testMeta.store);
+  }
+
+  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+  {
+    SpillableArrayListImpl list = new SpillableArrayListImpl<>(0L, ID1, store,
+        new SerdeStringSlice(), 1);
+
+    store.setup(testMeta.operatorContext);
+    list.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+    windowId++;
--- End diff --

Should this increment be done just before the next beginWindow()? It looks like the incremented windowId is used for the checkpoint callbacks later on, but shouldn't the callbacks get the windowId from before the increment?
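A minimal sketch of the ordering this comment proposes, assuming (as the comment does) that the checkpoint callbacks should receive the id of the window that just ended. The `WindowAndCheckpointCallbacks` interface, `RecordingCallbacks`, and `drive` are hypothetical stand-ins whose method names are modeled on Apex's `beginWindow`/`endWindow` and `beforeCheckpoint`/`checkpointed`/`committed` callbacks; for simplicity every window is checkpointed and committed.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical test-driver sketch: checkpoint callbacks receive the id of the window
 * that just ended, and windowId is advanced only right before the next beginWindow().
 */
public class WindowOrderSketch
{
  /** Stand-in for the window and checkpoint callbacks; not the actual Apex interfaces. */
  interface WindowAndCheckpointCallbacks
  {
    void beginWindow(long windowId);

    void endWindow();

    void beforeCheckpoint(long windowId);

    void checkpointed(long windowId);

    void committed(long windowId);
  }

  /** Records every callback so the order and the window ids can be inspected. */
  static class RecordingCallbacks implements WindowAndCheckpointCallbacks
  {
    final List<String> log = new ArrayList<>();

    @Override
    public void beginWindow(long windowId)
    {
      log.add("beginWindow " + windowId);
    }

    @Override
    public void endWindow()
    {
      log.add("endWindow");
    }

    @Override
    public void beforeCheckpoint(long windowId)
    {
      log.add("beforeCheckpoint " + windowId);
    }

    @Override
    public void checkpointed(long windowId)
    {
      log.add("checkpointed " + windowId);
    }

    @Override
    public void committed(long windowId)
    {
      log.add("committed " + windowId);
    }
  }

  /** Runs numWindows windows, checkpointing and committing after every one of them. */
  static List<String> drive(int numWindows)
  {
    RecordingCallbacks callbacks = new RecordingCallbacks();
    long windowId = 0L;

    for (int i = 0; i < numWindows; i++) {
      callbacks.beginWindow(windowId);
      callbacks.endWindow();

      // The callbacks refer to the window that just ended, so they use windowId before the increment.
      callbacks.beforeCheckpoint(windowId);
      callbacks.checkpointed(windowId);
      callbacks.committed(windowId);

      // Advance only now, immediately before the next beginWindow().
      windowId++;
    }
    return callbacks.log;
  }

  public static void main(String[] args)
  {
    for (String entry : drive(2)) {
      System.out.println(entry);
    }
  }
}
```

With this ordering, every callback for a window sees the same id that was passed to its `beginWindow()`.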
Github user sandeshh commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r69600767

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java ---
@@ -0,0 +1,128 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TimeBasedPriorityQueue
--- End diff --

Have you looked at https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/DelayQueue.html ?
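For reference, a minimal `java.util.concurrent.DelayQueue` example. The `Expiring` element type and the `demo` helper are illustrative only; the diff above shows just the `TimeBasedPriorityQueue` declaration, so whether its ordering matches `DelayQueue`'s wall-clock expiry semantics is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Minimal DelayQueue example: poll() hands out an element only after its delay has
 * expired, and expired elements come out earliest-expiry first.
 */
public class DelayQueueSketch
{
  /** Element that expires at an absolute wall-clock time in milliseconds. */
  static class Expiring implements Delayed
  {
    final String name;
    final long expiryMillis;

    Expiring(String name, long expiryMillis)
    {
      this.name = name;
      this.expiryMillis = expiryMillis;
    }

    @Override
    public long getDelay(TimeUnit unit)
    {
      // Time remaining until expiry; zero or negative means the element is available.
      return unit.convert(expiryMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other)
    {
      if (other instanceof Expiring) {
        return Long.compare(expiryMillis, ((Expiring)other).expiryMillis);
      }
      return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
  }

  /** Removes and returns the names of all elements whose delay has already expired. */
  static List<String> drainExpired(DelayQueue<Expiring> queue)
  {
    List<String> names = new ArrayList<>();
    Expiring next;
    while ((next = queue.poll()) != null) {
      names.add(next.name);
    }
    return names;
  }

  static List<String> demo()
  {
    long now = System.currentTimeMillis();
    DelayQueue<Expiring> queue = new DelayQueue<>();
    queue.add(new Expiring("later", now + 60_000)); // expires a minute from now
    queue.add(new Expiring("older", now - 3_000));  // expired three seconds ago
    queue.add(new Expiring("newer", now - 1_000));  // expired one second ago
    return drainExpired(queue);                     // "later" stays in the queue
  }

  public static void main(String[] args)
  {
    System.out.println(demo()); // prints [older, newer]
  }
}
```

`DelayQueue` is a thread-safe blocking queue driven by elapsed wall-clock time, so it fits best if entries are meant to expire by real time and the queue may be touched by more than one thread; for purely single-threaded use inside an operator, its locking is extra overhead.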
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68926798

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde serdeKey,
+      Serde serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
--- End diff --

I think we also need to provide an asynchronous get method.
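One possible shape for such a method, as a hypothetical sketch: `AsyncGetSketch`, `cachePut`, and the `storeRead` function are illustrative names, not part of the PR. Cache hits return an already-completed `CompletableFuture`, while misses run the store read on a caller-supplied executor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/**
 * Hypothetical sketch of offering getAsync() next to get(): cache hits complete
 * immediately, misses run the potentially slow store read on a background executor.
 */
public class AsyncGetSketch<K, V>
{
  private final Map<K, V> cache = new HashMap<>(); // touched only by the caller's thread
  private final Function<K, V> storeRead;          // stand-in for a blocking store read; must be thread-safe
  private final ExecutorService executor;

  public AsyncGetSketch(Function<K, V> storeRead, ExecutorService executor)
  {
    this.storeRead = storeRead;
    this.executor = executor;
  }

  public void cachePut(K key, V value)
  {
    cache.put(key, value);
  }

  /** Synchronous variant: blocks on the store read when the key is not cached. */
  public V get(K key)
  {
    V cached = cache.get(key);
    return cached != null ? cached : storeRead.apply(key);
  }

  /** Asynchronous variant: never blocks the caller. */
  public Future<V> getAsync(K key)
  {
    V cached = cache.get(key);
    if (cached != null) {
      return CompletableFuture.completedFuture(cached);
    }
    return CompletableFuture.supplyAsync(() -> storeRead.apply(key), executor);
  }

  public static void main(String[] args) throws Exception
  {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      AsyncGetSketch<String, Integer> lookup = new AsyncGetSketch<>(String::length, executor);
      lookup.cachePut("cached", 42);
      System.out.println(lookup.getAsync("cached").get()); // already completed from the cache
      System.out.println(lookup.getAsync("abcd").get());   // computed on the executor thread
    } finally {
      executor.shutdown();
    }
  }
}
```

Returning a `Future` keeps the call shape close to the blocking `get`, while a caller that must not stall a window can collect futures and check `isDone()` later.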
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68926769

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
--- End diff --

Why do we need to configure the bucket?
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68560022

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde serdeKey,
+      Serde serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keyPrefix = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX));
+
--- End diff --

Accessing the value from the map should be done as follows: map.get(SliceUtils.concatenate().buffer)
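Why the key type matters, shown with stand-in types: assuming the internal map is keyed by `byte[]` through a pass-through serde (as the constructor's use of `PassThruByteArraySliceSerde` suggests), `Map.get(Object)` accepts a `Slice` at compile time, and the erased cast to `K` only fails later, inside the serde call. `Holder`, `ByteKeyedLookup`, and `PassThruByteArraySerde` below are hypothetical, not the Malhar classes.

```java
/**
 * Stand-in types (not the Malhar classes) showing why the key handed to a Map-style
 * get(Object) must match the map's real key type when that type is byte[].
 */
public class ErasedKeyCastSketch
{
  interface Serde<T>
  {
    byte[] serialize(T object);
  }

  /** Stand-in for a pass-through serde over byte[] keys. */
  static class PassThruByteArraySerde implements Serde<byte[]>
  {
    @Override
    public byte[] serialize(byte[] object)
    {
      return object;
    }
  }

  /** Stand-in for a Slice-like holder: wraps a byte[] but is not itself a byte[]. */
  static class Holder
  {
    final byte[] buffer;

    Holder(byte[] buffer)
    {
      this.buffer = buffer;
    }
  }

  /** Mimics get(Object): the cast to K is unchecked and erased. */
  static class ByteKeyedLookup<K>
  {
    private final Serde<K> serdeKey;

    ByteKeyedLookup(Serde<K> serdeKey)
    {
      this.serdeKey = serdeKey;
    }

    @SuppressWarnings("unchecked")
    byte[] storeKeyFor(Object o)
    {
      K key = (K)o;                   // no runtime check happens here because of erasure
      return serdeKey.serialize(key); // the compiler-generated bridge method casts to byte[] here
    }
  }

  /** Returns true if passing the holder instead of its buffer fails with ClassCastException. */
  static boolean wrongKeyTypeFails()
  {
    ByteKeyedLookup<byte[]> lookup = new ByteKeyedLookup<>(new PassThruByteArraySerde());
    Holder holder = new Holder(new byte[]{1, 2, 3});
    try {
      lookup.storeKeyFor(holder);
      return false;
    } catch (ClassCastException expected) {
      return true;
    }
  }

  public static void main(String[] args)
  {
    ByteKeyedLookup<byte[]> lookup = new ByteKeyedLookup<>(new PassThruByteArraySerde());
    Holder holder = new Holder(new byte[]{1, 2, 3});
    System.out.println(lookup.storeKeyFor(holder.buffer).length); // prints 3: passing .buffer works
    System.out.println(wrongKeyTypeFails());                      // prints true
  }
}
```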
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68559313

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---
@@ -0,0 +1,52 @@
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SliceUtils
+{
+  private SliceUtils()
+  {
+  }
+
+  public static byte[] concatenate(byte[] a, byte[] b)
+  {
+    byte[] output = new byte[a.length + b.length];
+
+    System.arraycopy(a, 0, output, 0, a.length);
+    System.arraycopy(b, 0, output, a.length, b.length);
+    return output;
+  }
+
+  public static Slice concatenate(Slice a, Slice b)
+  {
+    int size = a.length + b.length;
+    byte[] bytes = new byte[size];
+
+    System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
+    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+    return new Slice(bytes);
+  }
+
+  public static Slice concatenate(byte[] a, Slice b)
+  {
+    int size = a.length + b.length;
+    byte[] bytes = new byte[size];
+
+    System.arraycopy(a, 0, bytes, 0, a.length);
+    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+    return new Slice(bytes);
+  }
+
+  public static Slice concatenate(Slice a, byte[] b)
+  {
+    int size = a.length + b.length;
+    byte[] bytes = new byte[size];
+
+    System.arraycopy(a, a.offset, bytes, 0, a.length);
--- End diff --

This should be a.buffer instead of a.
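A sketch of the corrected overload, using a minimal stand-in for `com.datatorrent.netlet.util.Slice` that mirrors only its `buffer`, `offset`, and `length` fields. It also shows why the original line compiles but fails: `System.arraycopy` takes `Object` parameters, and a `src` that is not an array throws `ArrayStoreException` at runtime.

```java
import java.util.Arrays;

/**
 * Sketch of the fix, with a minimal stand-in for Slice (only buffer/offset/length are mirrored).
 */
public class SliceConcatSketch
{
  /** Stand-in for Slice: a view of length bytes of buffer, starting at offset. */
  static class Slice
  {
    final byte[] buffer;
    final int offset;
    final int length;

    Slice(byte[] buffer, int offset, int length)
    {
      this.buffer = buffer;
      this.offset = offset;
      this.length = length;
    }

    Slice(byte[] buffer)
    {
      this(buffer, 0, buffer.length);
    }
  }

  /** Concatenates the bytes viewed by a with all of b. */
  static Slice concatenate(Slice a, byte[] b)
  {
    byte[] bytes = new byte[a.length + b.length];
    // Copy from a.buffer, not from a itself: arraycopy needs the backing array.
    System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
    System.arraycopy(b, 0, bytes, a.length, b.length);
    return new Slice(bytes);
  }

  /** Returns true if passing the Slice object itself as src throws ArrayStoreException. */
  static boolean passingSliceObjectFails()
  {
    Slice a = new Slice(new byte[]{1, 2});
    byte[] dest = new byte[2];
    try {
      System.arraycopy(a, a.offset, dest, 0, a.length); // compiles, because src is typed Object
      return false;
    } catch (ArrayStoreException expected) {
      return true;
    }
  }

  public static void main(String[] args)
  {
    Slice a = new Slice(new byte[]{9, 2, 3, 9}, 1, 2); // views {2, 3}
    Slice joined = concatenate(a, new byte[]{4, 5});
    System.out.println(Arrays.toString(joined.buffer)); // prints [2, 3, 4, 5]
    System.out.println(passingSliceObjectFails());      // prints true
  }
}
```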
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68559266

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java ---
@@ -0,0 +1,191 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+public class SpillableByteMapImpl implements Spillable.SpillableByteMap, Spillable.SpillableComponent
+{
+  @NotNull
+  private SpillableStateStore store;
+  @NotNull
+  private byte[] identifier;
+  private long bucket;
+  @NotNull
+  private Serde serdeKey;
+  @NotNull
+  private Serde serdeValue;
+
+  private int size = 0;
+
+  private transient WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+  private transient MutableInt tempOffset = new MutableInt();
+
+  private SpillableByteMapImpl()
+  {
+    //for kryo
+  }
+
+  public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde serdeKey,
+      Serde serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+  }
+
+  @Override
+  public int size()
+  {
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return size == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object o)
+  {
+    return get(o) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public V get(Object o)
+  {
+    K key = (K)o;
+
+    if (cache.getRemovedKeys().contains(key)) {
+      return null;
+    }
+
+    V val = cache.get(key);
+
+    if (val != null) {
+      return val;
+    }
+
+    Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+
+    if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
+      return null;
+    }
+
+    tempOffset.setValue(valSlice.offset + identifier.length);
+    return serdeValue.deserialize(valSlice, tempOffset);
+  }
+
+  @Override
+  public V put(K k, V v)
+  {
+    V value = get(k);
+
+    if (value == null) {
+      size++;
+    }
+
+    cache.put(k, v);
+
+    return value;
+  }
+
+  @Override
+  public V remove(Object o)
+  {
+    V value = get(o);
+
+    if (value != null) {
+      size--;
+    }
+
+    cache.remove((K)o);
+
+    return value;
+  }
+
+  @Override
+  public void putAll(Map map)
+  {
+    for (Map.Entry entry : map.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set entrySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
--- End diff --

I think store.setup() has to be called in the setup method, because SpillableStateStore extends the Component interface. The same applies to the other lifecycle methods: beginWindow, endWindow, and teardown.
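A hypothetical sketch of the forwarding suggested here, with a simplified `Lifecycle` interface standing in for `Component` and the window callbacks; `RecordingStore` and `ForwardingStructure` are illustrative names, not classes from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: a spillable structure that owns its store passes every lifecycle
 * call through to it. Lifecycle is a simplified stand-in, not the Apex API.
 */
public class LifecycleForwardingSketch
{
  interface Lifecycle
  {
    void setup(Object context); // stand-in for setup(Context.OperatorContext)

    void beginWindow(long windowId);

    void endWindow();

    void teardown();
  }

  /** Store stand-in that records the calls it receives. */
  static class RecordingStore implements Lifecycle
  {
    final List<String> calls = new ArrayList<>();

    @Override
    public void setup(Object context)
    {
      calls.add("setup");
    }

    @Override
    public void beginWindow(long windowId)
    {
      calls.add("beginWindow " + windowId);
    }

    @Override
    public void endWindow()
    {
      calls.add("endWindow");
    }

    @Override
    public void teardown()
    {
      calls.add("teardown");
    }
  }

  /** Structure stand-in that forwards each lifecycle call to the store it was given. */
  static class ForwardingStructure implements Lifecycle
  {
    private final Lifecycle store;

    ForwardingStructure(Lifecycle store)
    {
      this.store = store;
    }

    @Override
    public void setup(Object context)
    {
      store.setup(context);
    }

    @Override
    public void beginWindow(long windowId)
    {
      store.beginWindow(windowId);
    }

    @Override
    public void endWindow()
    {
      store.endWindow();
    }

    @Override
    public void teardown()
    {
      store.teardown();
    }
  }

  public static void main(String[] args)
  {
    RecordingStore store = new RecordingStore();
    ForwardingStructure structure = new ForwardingStructure(store);
    structure.setup(null);
    structure.beginWindow(1L);
    structure.endWindow();
    structure.teardown();
    System.out.println(store.calls); // prints [setup, beginWindow 1, endWindow, teardown]
  }
}
```

If several structures share one store, as the constructors in this PR allow, each of them forwarding `setup()` would set the store up more than once, so the store would need a setup-once check in that case.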
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r68559252

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl implements Spillable.SpillableByteArrayListMultimap,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde serdeKey,
+      Serde serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keyPrefix = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX));
+
+      if (size == null) {
+        return null;
+      }
+
+      spillableArrayList = new SpillableArrayListImpl(bucket, keyPrefix.buffer, store, serdeValue);
+      spillableArrayList.setSize(size);
+
+      cache.put(key, spillableArrayList);
+    }
+
+    return spillableArrayList;
+  }
+
+  @Override
+  public Set keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List removeAll(@Nullable Object key)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+    return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return map.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+    return map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), SIZE_KEY_SUFFIX));
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {
+    SpillableArrayListImpl spillableArrayList = getHelper(key);
+
GitHub user ilooner opened a pull request: https://github.com/apache/apex-malhar/pull/324

Spillable Datastructures PR for review only

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #324

commit c2c3f0acfcdf033a0e3044967ab3f8048f719259
Author: Timothy Farkas
Date: 2016-06-05T00:11:20Z

- Intermediate commit.

commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8
Author: Timothy Farkas
Date: 2016-06-13T06:03:21Z

Intermediate commit

commit 60acf68b96f2145af3d90b410c6b20613347f881
Author: Timothy Farkas
Date: 2016-06-21T06:58:09Z

Intermediate commit