[GitHub] apex-malhar pull request #388: Tim serde interface
Github user ilooner closed the pull request at: https://github.com/apache/apex-malhar/pull/388 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #388: Tim serde interface
GitHub user ilooner opened a pull request: https://github.com/apache/apex-malhar/pull/388 Tim serde interface You can merge this pull request into a Git repository by running: $ git pull https://github.com/simplifi-it/kelp tim-serdeInterface Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/388.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #388 commit 3897e59b2f4bfc6958c14725d27dccf238f4fb5e Author: Chandni Singh <csi...@apache.org> Date: 2016-08-01T07:37:07Z APEXMALHAR-2063 Made window data manager use file system wal commit ac6873f6663f86f94050799ed241a702a8ae3573 Author: ilooner <timothytiborfar...@gmail.com> Date: 2016-08-08T04:17:13Z Merge pull request #1 from chandnisingh/APEXMALHAR-2063 APEXMALHAR-2063 Made window data manager use file system wal commit d54b2de283ce829dfa7756295f782a451f1e3472 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-17T21:32:34Z Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap commit f32c2eb3ac2b2067efefbb182cff6d5744b16b10 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-08-08T06:18:42Z - Deleted obsolete files - Ignored failing Spillable tests commit 960dcc0b2a09b91bd2d67e1c277033bdf3f9a369 Author: Chandni Singh <csi...@apache.org> Date: 2016-08-08T07:54:57Z Added kelp modules commit b95dcd64e308900d87aa7ed6f8690c31f8070f13 Author: Chandni Singh <csi...@apache.org> Date: 2016-08-09T01:46:51Z APEXMALHAR-2063 IncrementalCheckpointManager was using old api commit 1a140d3a89cb20a49cccaefa98e8d73bfbe5c577 Author: Timothy Farkas <t...@simplifi.it> Date: 2016-08-09T02:47:25Z Deleting unnecesary code commit b54aebcab53627757fc1405a499ee1de3b150f00 Author: Chandni Singh <csi...@apache.org> Date: 2016-08-09T03:21:03Z KELP-1: On local fs, output stream needs to closed to consume data commit 28be4dcb9bfc8b74bb49915ca45024439b6a712d 
Author: Chandni Singh <csi...@apache.org> Date: 2016-08-09T03:35:56Z Merge branch 'master' of github.com:simplifi-it/kelp commit e14ab2870a822c488c3c7812cbd87e3b409ccd70 Author: Timothy Farkas <t...@simplifi.it> Date: 2016-08-15T01:37:19Z Removed unnecessary operators commit 415c657ef3094744d4a5b035e039b1a624f0e6c5 Author: Timothy Farkas <t...@simplifi.it> Date: 2016-08-15T02:43:04Z Deleted uneeded operators and modules. commit e4327ad2d3a1e6355ac8d2e44e448ae49707d44c Author: Timothy Farkas <t...@simplifi.it> Date: 2016-08-21T18:58:48Z Deleted uneccessary operators and Added FieldDescriptorSerializer commit 2fef8276a8ef70699d61fca1ce8a97a6017e6b9a Author: Timothy Farkas <t...@simplifi.it> Date: 2016-08-21T19:33:40Z Made bytesList transient commit 1d6d1a31ddd32a90b4749b4ba92150d19b861803 Author: Timothy Farkas <t...@simplifi.it> Date: 2016-08-21T22:44:15Z Fix bug where incorrect offset will be set on mutable int commit 4fbdedcd95ad5945478dc8cb21c1adf5775c6ff9 Author: Timothy Farkas <t...@simplifi.it> Date: 2016-08-25T07:30:10Z Compile the kafka module by default commit d4fe8bb0df203161dce516f1bfb28db6dfec5b74 Author: Chandni Singh <csi...@apache.org> Date: 2016-08-27T03:16:17Z APEXMALHAR-2063 Made window data manager use file system wal commit ed16071bf959211a6636b337d4ea070bdd32e908 Author: Timothy Farkas <t...@simplifi.it> Date: 2016-08-28T22:37:27Z Added Serializer and Deserializer interfaces
[GitHub] apex-malhar pull request #322: APEXMALHAR-2063 Made window data manager use ...
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/322#discussion_r74370558 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java --- @@ -51,291 +66,607 @@ public class FSWindowDataManager implements WindowDataManager { private static final String DEF_RECOVERY_PATH = "idempotentState"; - - protected transient FSStorageAgent storageAgent; + private static final String WAL_FILE_NAME = "wal"; /** - * Recovery path relative to app path where state is saved. + * Recovery filePath relative to app filePath where state is saved. */ @NotNull - private String recoveryPath; + private String recoveryPath = DEF_RECOVERY_PATH; private boolean isRecoveryPathRelativeToAppPath = true; /** - * largest window for which there is recovery data across all physical operator instances. + * This is not null only for one physical instance. + * It consists of operator ids which have been deleted but have some state that can be replayed. + * Only one of the instances would be handling (modifying) the files that belong to this state. + * The value is assigned during partitioning. */ - protected transient long largestRecoveryWindow; + private Set deletedOperators; + + private boolean repartitioned; /** - * This is not null only for one physical instance. - * It consists of operator ids which have been deleted but have some state that can be replayed. - * Only one of the instances would be handling (modifying) the files that belong to this state. + * Used when it is not necessary to replay every streaming/app window. + * Used by {@link IncrementalCheckpointManager} */ - protected Set deletedOperators; + private boolean relyOnCheckpoints; /** - * Sorted mapping from window id to all the operators that have state to replay for that window. + * largest window for which there is recovery data across all physical operator instances. 
*/ - protected final transient TreeMultimap<Long, Integer> replayState; + private transient long largestRecoveryWindow = Stateless.WINDOW_ID; + + private final FSWindowReplayWAL wal = new FSWindowReplayWAL(); - protected transient FileSystem fs; - protected transient Path appPath; + //operator id -> wals (sorted) + private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>(); + + private transient String statePath; + private transient int operatorId; + + private final transient Kryo kryo = new Kryo(); + + private transient FileContext fileContext; public FSWindowDataManager() { -replayState = TreeMultimap.create(); -largestRecoveryWindow = Stateless.WINDOW_ID; -recoveryPath = DEF_RECOVERY_PATH; +kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); --- End diff -- for my learning :) why is this needed?
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r73809225 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java --- @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/12/16. 
+ */ +@DefaultSerializer(FieldSerializer.class) +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>, +Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private transient WindowBoundedMapCache<K, SpillableArrayListImpl> cache = new WindowBoundedMapCache<>(); + private transient boolean isRunning = false; + private transient boolean isInWindow = false; + + private int batchSize = DEFAULT_BATCH_SIZE; + @NotNull + private SpillableByteMapImpl<byte[], Integer> map; + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde<K, Slice> serdeKey; + private Serde<V, Slice> serdeValue; + + private SpillableByteArrayListMultimapImpl() + { +// for kryo + } + + public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { +this.store = Preconditions.checkNotNull(store); +this.identifier = Preconditions.checkNotNull(identifier); +this.bucket = bucket; +this.serdeKey = Preconditions.checkNotNull(serdeKey); +this.serdeValue = Preconditions.checkNotNull(serdeValue); + +map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); + } + + public SpillableStateStore getStore() + { +return store; + } + + @Override + public List get(@Nullable K key) + { +return getHelper(key); + } + + private SpillableArrayListImpl getHelper(@Nullable K key) + { +SpillableArrayListImpl spillableArrayList = cache.get(key); + +if (spillableArrayList == null) { + Slice keySlice = serdeKey.serialize(key); + Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray()); --- End diff -- @davidyan74 no the Spillable map prefixes each key with its identifier internally. 
It is not necessary for you to manually prefix the keys yourself.
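To illustrate the point about prefixing, here is a minimal sketch in plain Java, with no Malhar dependencies, of how a store-backed map could namespace caller keys by its own identifier. The class `KeyPrefixSketch` and its methods are hypothetical names made up for this illustration; the actual `SpillableByteMapImpl` code may work differently.

```java
import java.util.Arrays;

// Hypothetical illustration only; not the actual Malhar implementation.
final class KeyPrefixSketch
{
  // Returns a new array holding the bytes of a followed by the bytes of b.
  static byte[] concatenate(byte[] a, byte[] b)
  {
    byte[] out = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, out, a.length, b.length);
    return out;
  }

  // Assumed internal step of the map: prepend its identifier to the caller's key.
  static byte[] storeKey(byte[] identifier, byte[] callerKey)
  {
    return concatenate(identifier, callerKey);
  }

  public static void main(String[] args)
  {
    byte[] sizeKeySuffix = {0, 0, 0};
    // The caller builds only its logical key (serialized key plus size suffix).
    byte[] callerKey = concatenate(new byte[]{'k'}, sizeKeySuffix);
    // Two maps with different identifiers keep the same caller key apart.
    System.out.println(Arrays.toString(storeKey(new byte[]{0}, callerKey)));
    System.out.println(Arrays.toString(storeKey(new byte[]{1}, callerKey)));
  }
}
```

Under that assumption, code such as `getHelper` only needs to append the size suffix to the serialized key; the identifier is added once, inside the map.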
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
GitHub user ilooner reopened a pull request: https://github.com/apache/apex-malhar/pull/324 Spillable Datastructures PR for review only You can merge this pull request into a Git repository by running: $ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/324.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #324 commit b72ee18e6f7cf0a9d30b195439912d96acafb3b4 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-17T21:32:34Z Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap commit 184653a23f662f78e7e6a7e1d53ffb0efbdb7127 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-18T01:56:41Z Added SpillableComplexComponentImpl commit 9f17b4ba9233e3f46746505941a378236193f719 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-18T03:29:07Z Added propagating callbacks to store commit fe41f0c20235aac3ca57facc2491ecfba11d20a7 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-21T06:38:48Z Added checkpoint callbacks to spillable complex components Added some half completed tests commit e5cacbbc9e1ae18f5cf0938f13377f75a3a99cd2 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-24T04:09:54Z Finished unit test for SpillableArrayListMultimap commit dc258b8900688264f349307737b59b096dbc3d2b Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-24T05:19:37Z Added unit test which uses managed state commit ed9924b810a76c39404701da81f4753ab68af5a5 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-24T06:55:18Z Finished adding managed state tests for SpillableByteMap commit 43da17d9633dc2405b596b25697b6b4b0baef69f Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-24T16:31:53Z Added ManagedStateTests For SpillableArrayList commit 
1343ccf4ccc5099d7abcf7f99ab2ed648baeef08 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-24T16:49:00Z Added managed state tests for SpillableArrayListMultimap commit 57c4e5e3c3e019613f31753be5052c32ba762e53 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-24T16:57:35Z Added ManagedStateTest for SpillableComplexComponent commit ebc822c4ba8752f464395cf01221addc6d34 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-27T06:38:51Z Fixed broken containsKey method and added test commit 983924215af2af7b683a3b654c320b7026b3165d Author: David Yan <da...@datatorrent.com> Date: 2016-07-28T17:27:01Z isolated unit test for spillablebytemapimpl recovery commit 68ca96332dce24aa739e547ba87058dc35bb56df Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-29T08:10:28Z Testing commit a7796d61d80111faa24543ddc05b17c2944138be Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-07-31T18:44:23Z Added recovery test for SpillableArrayListMultimap and added license headers commit b3b4aacf44127c79553fc289f1f9e1f7c17b2ec6 Author: Timothy Farkas <t...@datatorrent.com> Date: 2016-08-01T03:34:18Z - Fixed bug - Added more recovery tests
[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-core/pull/364#discussion_r73450775 --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java --- @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.stram.engine; + +import javax.validation.constraints.NotNull; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.stram.api.OperatorDeployInfo; + +public class OperatorContextTest +{ + private static boolean foundOperatorName; + + @Test + public void testInjectionOfOperatorName() throws Exception + { +final LocalMode lma = LocalMode.newInstance(); +StreamingApplication testApp = new StreamingApplication() +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { +MockInputOperator input = dag.addOperator("input", new MockInputOperator()); +GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator()); --- End diff -- Critical catch. nice job @gauravgopi123
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72913406 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java --- @@ -0,0 +1,458 @@ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.google.common.collect.Lists; + +/** + * Created by tfarkas on 6/19/16. + */ +public class SpillableArrayListImplTest +{ + public static final byte[] ID1 = new byte[]{(byte)0}; + public static final byte[] ID2 = new byte[]{(byte)1}; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleAddGetAndSetTest1() + { +InMemSpillableStateStore store = new InMemSpillableStateStore(); + +simpleAddGetAndSetTest1Helper(store); + } + + @Test + public void simpleAddGetAndSetManagedStateTest1() + { +simpleAddGetAndSetTest1Helper(testMeta.store); + } + + public void simpleAddGetAndSetTest1Helper(SpillableStateStore store) + { +SpillableArrayListImpl list = new SpillableArrayListImpl<>(0L, ID1, store, +new SerdeStringSlice(), 1); + +store.setup(testMeta.operatorContext); +list.setup(testMeta.operatorContext); + +long windowId = 0L; +store.beginWindow(windowId); +list.beginWindow(windowId); +windowId++; --- End diff -- Yes will make change
[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...
Github user ilooner commented on the issue: https://github.com/apache/apex-core/pull/364 @chandnisingh lgtm once unit test is updated
[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-core/pull/364#discussion_r72900235 --- Diff: engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java --- @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.stram.engine; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; + +public class OperatorContextTest +{ + + @Test + public void testInjectionOfOperatorName() throws Exception + { +final LocalMode lma = LocalMode.newInstance(); +final CountDownLatch latch = new CountDownLatch(1); +StreamingApplication testApp = new StreamingApplication() +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { +MockInputOperator input = dag.addOperator("input", new MockInputOperator()); +input.countDownLatch = latch; +GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator()); + +dag.addStream("stream", input.output, output.ip1); + } +}; + +lma.prepareDAG(testApp, new Configuration()); +LocalMode.Controller lc = lma.getController(); +lc.runAsync(); +latch.await(); +lc.shutdown(); + } + + private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator + { +private transient CountDownLatch countDownLatch; + +@AutoMetric +String operatorName; + +public final transient DefaultOutputPort output = new DefaultOutputPort<>(); + +@Override +public void setup(Context.OperatorContext context) +{ + operatorName = Preconditions.checkNotNull(context.getOperatorName(), "operator name"); +} + +@Override +public void emitTuples() +{ +} + +@Override +public Map<String, Object> aggregate(long windowId, Collection physicalMetrics) +{ + 
String name = (String)physicalMetrics.iterator().next().getMetrics().get("operatorName"); + Assert.assertEquals("operator name", "input", name); --- End diff -- Could we just set a static variable and check it in the test instead of using a latch and the assert?
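The suggestion above can be sketched in plain Java without any Apex dependencies. This is only an illustration under stated assumptions: `StaticFlagSketch`, `setup(String)` and `runAndCheck` are hypothetical stand-ins for the operator's `setup(Context.OperatorContext)` callback and for a local run that finishes before the test inspects the flag. The approach relies on the operator running in the same JVM as the test, which is what a local-mode run provides.

```java
// Hypothetical stand-in for the test pattern; not the actual OperatorContextTest.
final class StaticFlagSketch
{
  // Written by the operator stand-in, read by the test once the run has finished.
  static volatile boolean foundOperatorName;

  // Plays the role of the operator's setup callback: record whether the name matched.
  static void setup(String operatorName)
  {
    foundOperatorName = "input".equals(operatorName);
  }

  // Runs setup on a separate thread, waits for it to finish, then reports the flag.
  static boolean runAndCheck(String operatorName)
  {
    foundOperatorName = false;
    Thread worker = new Thread(() -> setup(operatorName));
    worker.start();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for worker", e);
    }
    return foundOperatorName;
  }

  public static void main(String[] args)
  {
    System.out.println(runAndCheck("input"));
  }
}
```

With this shape the test body reduces to running the application to completion and asserting on the flag, so neither the latch nor the aggregator-side assert is needed.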
[GitHub] apex-malhar pull request #345: APEXMALHAR-2130 REVIEW ONLY (WindowedOperator...
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/345#discussion_r72389548 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java --- @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowedStorage; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures + * + * @param Type of the value per window + */ +public class SpillableWindowedPlainStorage implements WindowedStorage.WindowedPlainStorage +{ + private SpillableStateStore store; + private transient SpillableComplexComponentImpl sccImpl; + private long bucket; + private Serde<Window, Slice> windowSerde; + private Serde<T, Slice> valueSerde; + + protected transient Spillable.SpillableByteMap<Window, T> internMap; + + public SpillableWindowedPlainStorage() + { + } + + public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde) + { +this.bucket = bucket; +this.windowSerde = windowSerde; +this.valueSerde = valueSerde; + } + + public void setStore(SpillableStateStore store) + { +this.store = store; + } + + public void setBucket(long bucket) + { +this.bucket = bucket; + } + + public void setWindowSerde(Serde<Window, Slice> windowSerde) + { +this.windowSerde = windowSerde; + } + + public void setValueSerde(Serde<T, Slice> valueSerde) + { +this.valueSerde = valueSerde; + } + + @Override + public void put(Window window, T value) + { +internMap.put(window, value); + } + + @Override + public T 
get(Window window) + { +return internMap.get(window); + } + + @Override + public Iterable<Map.Entry<Window, T>> entrySet() + { +return internMap.entrySet(); + } + + @Override + public Iterator<Map.Entry<Window, T>> iterator() + { +return internMap.entrySet().iterator(); + } + + @Override + public boolean containsWindow(Window window) + { +return internMap.containsKey(window); + } + + @Override + public long size() + { +return internMap.size(); + } + + @Override + public void remove(Window window) + { +internMap.remove(window); + } + + @Override + public void migrateWindow(Window fromWindow, Window toWindow) + { +internMap.put(toWindow, internMap.remove(fromWindow)); + } + + @Override + public void setup(Context.OperatorContext context) + { +if (store == null) { + // provide a default store + store = new ManagedStateSpillableStateStore(); +} +if (bucket == 0) { + // choose a bucket that is almost guaranteed to be unique + bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode(); +} +// se
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/324#discussion_r72124180 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java --- @@ -0,0 +1,262 @@ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by tfarkas on 6/12/16. 
+ */ +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>, +Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private int batchSize = DEFAULT_BATCH_SIZE; + + private WindowBoundedMapCache<K, SpillableArrayListImpl> cache = new WindowBoundedMapCache<>(); + + @NotNull + private SpillableByteMapImpl<byte[], Integer> map; + + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde<K, Slice> serdeKey; + private Serde<V, Slice> serdeValue; + + private boolean isRunning = false; + private boolean isInWindow = false; + + public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { +this.store = Preconditions.checkNotNull(store); +this.identifier = Preconditions.checkNotNull(identifier); +this.bucket = bucket; +this.serdeKey = Preconditions.checkNotNull(serdeKey); +this.serdeValue = Preconditions.checkNotNull(serdeValue); + +map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); --- End diff -- No it shouldn't. That error existed before but I fixed it with my latest commits. Do you have the latest? If so I'll check tonight to see if I can reproduce it in my tests.
[GitHub] apex-malhar pull request #322: APEXMALHAR-2063 Made window data manager use ...
Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/322#discussion_r71995236 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java --- @@ -126,36 +146,24 @@ public void teardown() } @Override -public void save(Object object, int operatorId, long windowId) throws IOException +public void save(Object object, long windowId) throws IOException { } @Override -public Object load(int operatorId, long windowId) throws IOException +public Object retrieve(long windowId) throws IOException { return null; } @Override -public void delete(int operatorId, long windowId) throws IOException -{ -} - -@Override -public void deleteUpTo(int operatorId, long windowId) throws IOException --- End diff -- These are no longer needed?
[GitHub] apex-malhar pull request #322: APEXMALHAR-2063 Made window data manager use ...
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/322#discussion_r71995041

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java ---
@@ -41,15 +41,42 @@
  *
  * @since 2.0.0
  */
-public interface WindowDataManager extends StorageAgent, Component
+public interface WindowDataManager extends Component
 {
   /**
+   * Save the state for a window id.
+   * @param objectstate
+   * @param windowId window id
+   * @throws IOException
+   */
+  void save(Object object, long windowId) throws IOException;
+
+  /**
+   * Gets the object saved for the provided window id.
+   * Typically it is used to replay tuples of successive windows in input operators after failure.
+   *
+   * @param windowId window id
+   * @return saved state for the window id.
+   * @throws IOException
+   */
+  Object retrieve(long windowId) throws IOException;
+
+  /**
+   * Delete the artifact corresponding to the
--- End diff --

complete javadoc here?
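For readers following the thread, the save/retrieve contract described in the Javadoc above can be sketched roughly as below. This is only an illustration: `WindowStateStore` and `InMemoryWindowStateStore` are hypothetical names, the in-memory map stands in for the file-system WAL the PR actually uses, and returning `null` for an unknown window id is an assumption, not something the diff states.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in that mirrors only the two methods quoted in the diff. */
interface WindowStateStore
{
  void save(Object object, long windowId) throws IOException;

  Object retrieve(long windowId) throws IOException;
}

/** Illustrative in-memory implementation; NOT the file-system WAL from the PR. */
class InMemoryWindowStateStore implements WindowStateStore
{
  // One saved state object per window id.
  private final Map<Long, Object> states = new HashMap<>();

  @Override
  public void save(Object object, long windowId)
  {
    states.put(windowId, object);
  }

  @Override
  public Object retrieve(long windowId)
  {
    // Assumption: an unknown window id yields null.
    return states.get(windowId);
  }
}
```

An input operator would call `save` at the end of each window and, after a failure, call `retrieve` for each window id it has to replay.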
[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71096442

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---
@@ -0,0 +1,135 @@
+package org.apache.apex.malhar.lib.window.impl;
--- End diff --

The same comments as above would apply here as well.
[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71095327

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
@@ -0,0 +1,136 @@
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.concurrent.Immutable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by david on 7/15/16.
+ */
+public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
+{
+  @NotNull
+  private final ManagedStateSpillableStateStore store;
+
+  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
+  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
+
+  private SpillableWindowedKeyedStorage()
+  {
+    // for kryo
+    store = null;
+    internValues = null;
+    internKeys = null;
+  }
+
+  public SpillableWindowedKeyedStorage(long bucket, byte[] identifier, Serde<Window, Slice> serdeWindow, Serde<K, Slice> serdeKey, Serde<ImmutablePair<Window, K>, Slice> serdeWindowKey, Serde<V, Slice> serdeValue)
+  {
+    store = new ManagedStateSpillableStateStore();
+    store.getCheckpointManager().setNeedBucketFile(false);
+    internValues = new SpillableByteMapImpl<>(store, identifier, bucket, serdeWindowKey, serdeValue);
+    internKeys = new SpillableByteArrayListMultimapImpl<>(store, identifier, bucket, serdeWindow, serdeKey);
+  }
+
+  @Override
+  public boolean containsWindow(Window window)
+  {
+    return internKeys.containsKey(window);
+  }
+
+  @Override
+  public long size()
+  {
+    return internKeys.size();
+  }
+
+  @Override
+  public void remove(Window window)
+  {
+    List keys = internKeys.get(window);
+    for (K key : keys) {
+      internValues.remove(new ImmutablePair<>(window, key));
+    }
+    internKeys.removeAll(window);
+  }
+
+  @Override
+  public void migrateWindow(Window fromWindow, Window toWindow)
+  {
+    List keys = internKeys.get(fromWindow);
+    internValues.remove(toWindow);
+    for (K key : keys) {
+      internKeys.put(toWindow, key);
+      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
+      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
+
+      V value = internValues.get(oldKey);
+      internValues.remove(oldKey);
+      internValues.put(newKey, value);
+    }
+    internKeys.removeAll(fromWindow);
+  }
+
+  @Override
+  public void beginApexWindow(long windowId)
+  {
+    store.beginWindow(windowId);
--- End diff --

spillableComplexComponent.beginWindow(windowId)
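The class in the quoted diff keeps two structures in step: a window-to-keys multimap and a (window, key)-to-value map. A rough sketch of that bookkeeping with plain `java.util` collections might look like the following. `TwoIndexStorageSketch` and its `put`/`get` methods are hypothetical and are not part of the PR; nothing here spills to disk, and the real class uses the Spillable implementations instead of `HashMap`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory analogue of the two-structure layout; illustration only. */
class TwoIndexStorageSketch<W, K, V>
{
  // window -> keys seen in that window (plays the role of internKeys)
  private final Map<W, List<K>> keysByWindow = new HashMap<>();
  // (window, key) -> value (plays the role of internValues)
  private final Map<Map.Entry<W, K>, V> values = new HashMap<>();

  void put(W window, K key, V value)
  {
    Map.Entry<W, K> windowKey = new SimpleImmutableEntry<>(window, key);
    if (!values.containsKey(windowKey)) {
      // Record the key once per window so remove/migrate can find it later.
      keysByWindow.computeIfAbsent(window, w -> new ArrayList<>()).add(key);
    }
    values.put(windowKey, value);
  }

  V get(W window, K key)
  {
    return values.get(new SimpleImmutableEntry<>(window, key));
  }

  boolean containsWindow(W window)
  {
    return keysByWindow.containsKey(window);
  }

  void remove(W window)
  {
    List<K> keys = keysByWindow.remove(window);
    if (keys == null) {
      return;
    }
    for (K key : keys) {
      values.remove(new SimpleImmutableEntry<>(window, key));
    }
  }

  void migrateWindow(W fromWindow, W toWindow)
  {
    List<K> keys = keysByWindow.remove(fromWindow);
    if (keys == null) {
      return;
    }
    for (K key : keys) {
      // Move each value from the old composite key to the new one.
      V value = values.remove(new SimpleImmutableEntry<>(fromWindow, key));
      put(toWindow, key, value);
    }
  }
}
```

The key list per window is what makes `remove` and `migrateWindow` possible without scanning every (window, key) entry.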
[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71095257

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
@@ -0,0 +1,136 @@
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.concurrent.Immutable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by david on 7/15/16.
+ */
+public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
+{
+  @NotNull
+  private final ManagedStateSpillableStateStore store;
--- End diff --

The field here should be of type SpillableComplexComponent, so that ManagedState isn't hardcoded.
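The suggestion amounts to depending on an interface and delegating window callbacks to it. A minimal sketch of that shape, under stated assumptions, is below: `WindowCallbacks`, `RecordingComponent`, and `KeyedStorageSketch` are hypothetical names, and `WindowCallbacks` is a stand-in rather than the real `SpillableComplexComponent`, whose full signature is not shown in this thread.

```java
import java.util.Objects;

/** Hypothetical stand-in for the component interface; NOT the real SpillableComplexComponent. */
interface WindowCallbacks
{
  void beginWindow(long windowId);
}

/** Test double that records the calls it receives. */
class RecordingComponent implements WindowCallbacks
{
  final StringBuilder calls = new StringBuilder();

  @Override
  public void beginWindow(long windowId)
  {
    calls.append("begin:").append(windowId).append(';');
  }
}

/** Storage that knows only the interface, so no concrete store is hardcoded. */
class KeyedStorageSketch
{
  private final WindowCallbacks component;

  KeyedStorageSketch(WindowCallbacks component)
  {
    this.component = Objects.requireNonNull(component);
  }

  void beginApexWindow(long windowId)
  {
    // Delegate to whatever implementation was injected.
    component.beginWindow(windowId);
  }
}
```

With the dependency injected through the constructor, a managed-state-backed component or a test double like `RecordingComponent` can be swapped in without touching the storage class.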
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
GitHub user ilooner opened a pull request:

    https://github.com/apache/apex-malhar/pull/324

    Spillable Datastructures PR for review only

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #324

commit c2c3f0acfcdf033a0e3044967ab3f8048f719259
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-06-05T00:11:20Z

    - Intermediate commit.

commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-06-13T06:03:21Z

    Intermediate commit

commit 60acf68b96f2145af3d90b410c6b20613347f881
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-06-21T06:58:09Z

    Intermediate commit